MyBatis源码分析(5)——内置DataSource实现

@(MyBatis)[DataSource]

MyBatis 源码分析(5)——内置 DataSource 实现

MyBatis 内置了两个 DataSource 的实现:UnpooledDataSource,该数据源对于每次获取请求都简单的打开和关闭连接。PooledDataSource,该数据源在 Unpooled 的基础上构建了连接池。

UnpooledDataSource

配置

UNPOOLED 数据源只有 5 个属性需要配置:

driver:JDBC 具体数据库驱动
url:JDBC 连接
username:用户名
password:密码
defaultTransactionIsolationLevel:默认事务隔离级别

除了以上属性外,还可以配置驱动的连接信息,但是需要加前缀driver.,如:driver.encoding=UTF8,会将该配置作为参数传入 driverManager.getConnection。

如下:

<dataSource type="UNPOOLED">
	<property name="driver" value="${jdbc.driver}" />
	<property name="url" value="${jdbc.url}" />
	<property name="username" value="${jdbc.username}" />
	<property name="password" value="${jdbc.password}" />
</dataSource>

实现

UnpooledDataSource,内部通过调用DriverManager来实现,DriverManager是用于管理低层驱动以及创建连接的。

public class UnpooledDataSource implements DataSource {

private ClassLoader driverClassLoader;
// 驱动连接属性
private Properties driverProperties;
// 所有已注册的驱动,仅仅用于识别驱动在 DriverManager 中是否已经被加载进来了
private static Map<String, Driver> registeredDrivers = new ConcurrentHashMap<String, Driver>();

// 当前使用的驱动
private String driver;
private String url;
private String username;
private String password;

private Boolean autoCommit;
// 默认事务隔离级别
private Integer defaultTransactionIsolationLevel;

static {
// 静态代码块,当类加载的时候,就从 DriverManager 中获取所有的驱动信息,放到当前维护的 Map 中
Enumeration<Driver> drivers = DriverManager.getDrivers();
while (drivers.hasMoreElements()) {
Driver driver = drivers.nextElement();
registeredDrivers.put(driver.getClass().getName(), driver);
}
}

// 省略了部分代码...

public Connection getConnection() throws SQLException {
// 获取数据库连接
return doGetConnection(username, password);
}

public Connection getConnection(String username, String password) throws SQLException {
return doGetConnection(username, password);
}

private Connection doGetConnection(String username, String password) throws SQLException {
// 这里通过 url 加 Properties 来获取连接,是因为可以在配置文件中配置数据库连接的信息,比如编码之类的
Properties props = new Properties();
if (driverProperties != null) {
props.putAll(driverProperties);
}
if (username != null) {
props.setProperty("user", username);
}
if (password != null) {
props.setProperty("password", password);
}
return doGetConnection(props);
}

private Connection doGetConnection(Properties properties) throws SQLException {
// 初始化驱动信息
initializeDriver();
// 从 DriverManager 中获取数据库连接
Connection connection = DriverManager.getConnection(url, properties);
// 配置连接信息,自动提交以及事务隔离级别
configureConnection(connection);
return connection;
}

private synchronized void initializeDriver() throws SQLException {
// 如果没有包含在已注册 Map 中,则需要将该驱动加载进来
if (!registeredDrivers.containsKey(driver)) {
Class<?> driverType;
try {
// 加载数据库连接驱动
if (driverClassLoader != null) {
driverType = Class.forName(driver, true, driverClassLoader);
} else {
// Resources 为 MyBatis 内置的资源工具类,该方法依次尝试从多个 ClassLoader 中获取 Class 类,顺序为:配置的 classLoader,默认的 defaultClassLoader,当前线程的 getContextClassLoader,当前类的 getClass().getClassLoader(),系统的 systemClassLoader
driverType = Resources.classForName(driver);
}
// 创建驱动实例
Driver driverInstance = (Driver)driverType.newInstance();
// 注册到 DriverManager 中,用于创建数据库连接
DriverManager.registerDriver(new DriverProxy(driverInstance));
// 放到已注册 Map 中
registeredDrivers.put(driver, driverInstance);
} catch (Exception e) {
throw new SQLException("Error setting driver on UnpooledDataSource. Cause:" + e);
}
}
}

private void configureConnection(Connection conn) throws SQLException {
if (autoCommit != null && autoCommit != conn.getAutoCommit()) {
// 如果已开启了事务,则可以将自动提交关闭
conn.setAutoCommit(autoCommit);
}
if (defaultTransactionIsolationLevel != null) {
// 设置事务隔离级别
conn.setTransactionIsolation(defaultTransactionIsolationLevel);
}
}
}

PooledDataSource

配置

除了 UNPOOLED 的配置外,还可以配置其它的一些属性,如下:

poolMaximumActiveConnections:最大活动连接数(默认为10)
poolMaximumIdleConnections:最大空闲连接数(默认为5)
poolMaximumCheckoutTime:最大可回收时间,即当达到最大活动链接数时,此时如果有程序获取连接,则检查最先使用的连接,看其是否超出了该时间,如果超出了该时间,则可以回收该连接。(默认20s)
poolTimeToWait:没有连接时,重尝试获取连接以及打印日志的时间间隔(默认20s)
poolPingQuery:检查连接正确的语句,默认为"NO PING QUERY SET",即没有,使用会导致抛异常
poolPingEnabled:是否开启ping检测,(默认:false)
poolPingConnectionsNotUsedFor:设置ping检测时间间隔,通常用于检测超时连接(默认为0,即当开启检测后每次从连接词中获取连接以及放回连接池都需要检测)

示例配置如下:

<dataSource type="POOLED">
	<property name="driver" value="${jdbc.driver}" />
	<property name="url" value="${jdbc.url}" />
	<property name="username" value="${jdbc.username}" />
	<property name="password" value="${jdbc.password}" />
	<property name="poolMaximumActiveConnections" value="20" />
	<property name="poolMaximumIdleConnections" value="10" />
	<property name="poolMaximumCheckoutTime" value="15" />
	<property name="poolTimeToWait" value="10" />
	<property name="poolPingQuery" value="select 1 from dual" />
	<property name="poolPingEnabled" value="true" />
	<property name="poolPingConnectionsNotUsedFor" value="0" />
</dataSource>

当日志开启 DEBUG 输出,使用 Mapper 操作时,可以看到:

2016-07-31 21:30:45 [DEBUG]-[Thread: main]-[org.apache.ibatis.datasource.pooled.PooledDataSource.popConnection()]: 
Created connection 395084181.

2016-07-31 21:30:45 [DEBUG]-[Thread: main]-[org.apache.ibatis.datasource.pooled.PooledDataSource.pingConnection()]:
Testing connection 395084181 ...

2016-07-31 21:30:45 [DEBUG]-[Thread: main]-[org.apache.ibatis.datasource.pooled.PooledDataSource.pingConnection()]:
Connection 395084181 is GOOD!

实现

连接池的实现,核心的思想在于,将连接缓存起来,即可以通过两个链表 (/ 队列) 来实现,一个用于维持活动连接,另一个维持空闲连接。还有另外一点就是关闭连接后返回给连接池或者释放掉,这里可以通过代理来实现,当客户端调用 close 时,并不是直接关闭连接,而是将其缓存起来,放到空闲链表中。这里使用代理还有个优点,每次释放的时候,重新创建新的代理连接来封装,并且将原有的代理设置为无效,可以使得程序即使持有原有的代理,也不会影响到回收的连接。
在 MyBatis 中,主要是通过PoolState来维护连接状态,以及通过PooledConnection代理来实现归还连接操作。


PooledConnection

这里主要是通过 Java Proxy 代理来实现的。

// 实现代理接口
class PooledConnection implements InvocationHandler {

private static final String CLOSE = "close";
private static final Class<?>[] IFACES = new Class<?>[] { Connection.class };

// 省略了部分代码...

private PooledDataSource dataSource;
private Connection realConnection;
private Connection proxyConnection;

public PooledConnection(Connection connection, PooledDataSource dataSource) {
this.hashCode = connection.hashCode();
this.realConnection = connection;
this.dataSource = dataSource;
this.createdTimestamp = System.currentTimeMillis();
this.lastUsedTimestamp = System.currentTimeMillis();
this.valid = true;
// 创建代理
this.proxyConnection = (Connection) Proxy.newProxyInstance(Connection.class.getClassLoader(), IFACES, this);
}

// 代理实现 Connection 接口的所有方法,只对 CLOSE 方法特别处理
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String methodName = method.getName();
if (CLOSE.hashCode() == methodName.hashCode() && CLOSE.equals(methodName)) {
// 如果为 CLOSE 方法,那么就将其放回连接池中
dataSource.pushConnection(this);
return null;
} else {
try {
if (!Object.class.equals(method.getDeclaringClass())) {
checkConnection();
}
// 其他方法则直接调用实际的连接来处理
return method.invoke(realConnection, args);
} catch (Throwable t) {
throw ExceptionUtil.unwrapThrowable(t);
}
}
}
}


PoolState

连接池的状态,用于维护活动连接,空闲连接,以及统计一些连接信息。

public class PoolState {

protected PooledDataSource dataSource;

// 空闲连接
protected final List<PooledConnection> idleConnections = new ArrayList<PooledConnection>();
// 当前活动连接
protected final List<PooledConnection> activeConnections = new ArrayList<PooledConnection>();
// 请求次数
protected long requestCount = 0;
// 请求获得连接所需时间
protected long accumulatedRequestTime = 0;
// 统计连接使用时间
protected long accumulatedCheckoutTime = 0;
// 统计过期回收连接数
protected long claimedOverdueConnectionCount = 0;
// 统计连接过期使用时间
protected long accumulatedCheckoutTimeOfOverdueConnections = 0;
// 统计获取连接需要等待的时间
protected long accumulatedWaitTime = 0;
// 统计获取连接需要等待的次数
protected long hadToWaitCount = 0;
// 统计无效连接个数
protected long badConnectionCount = 0;

public PoolState(PooledDataSource dataSource) {
this.dataSource = dataSource;
}

// 省略了部分 Getter 方法...
}


PooledDataSource获取连接

连接池主要是通过popConnection来实现连接的创建以及分配的,过程不复杂,当有空闲连接时则直接使用,否则再根据是否达到了设置的峰值,来决定是否需要创建新的连接等。

流程图如下:

popConnection 实现:
  private PooledConnection popConnection(String username, String password) throws SQLException {
    boolean countedWait = false;
    PooledConnection conn = null;
    long t = System.currentTimeMillis();
    int localBadConnectionCount = 0;
<span class="hljs-keyword">while</span> (conn == <span class="hljs-literal">null</span>) {
  <span class="hljs-comment">// 加锁访问</span>
  <span class="hljs-keyword">synchronized</span> (state) {
    <span class="hljs-keyword">if</span> (state.idleConnections.size() &gt; <span class="hljs-number">0</span>) {
      <span class="hljs-comment">// 有空闲连接,直接获取</span>
      conn = state.idleConnections.remove(<span class="hljs-number">0</span>);
      <span class="hljs-keyword">if</span> (log.isDebugEnabled()) {
        log.debug(<span class="hljs-string">"Checked out connection "</span> + conn.getRealHashCode() + <span class="hljs-string">" from pool."</span>);
      }
    } <span class="hljs-keyword">else</span> {
      <span class="hljs-comment">// 空闲连接不足</span>
      <span class="hljs-keyword">if</span> (state.activeConnections.size() &lt; poolMaximumActiveConnections) {
        <span class="hljs-comment">// 小于最大活动连接数,直接建立新的连接,并封装代理</span>
        conn = <span class="hljs-keyword">new</span> <span class="hljs-title class_">PooledConnection</span>(dataSource.getConnection(), <span class="hljs-built_in">this</span>);
        <span class="hljs-type">Connection</span> <span class="hljs-variable">realConn</span> <span class="hljs-operator">=</span> conn.getRealConnection();
        <span class="hljs-keyword">if</span> (log.isDebugEnabled()) {
          log.debug(<span class="hljs-string">"Created connection "</span> + conn.getRealHashCode() + <span class="hljs-string">"."</span>);
        }
      } <span class="hljs-keyword">else</span> {
        <span class="hljs-comment">// 超出最大活动连接数,不能创建连接</span>
        <span class="hljs-type">PooledConnection</span> <span class="hljs-variable">oldestActiveConnection</span> <span class="hljs-operator">=</span> state.activeConnections.get(<span class="hljs-number">0</span>);
        <span class="hljs-comment">// 获取使用时间最长的活动连接,并计算使用的时间</span>
        <span class="hljs-type">long</span> <span class="hljs-variable">longestCheckoutTime</span> <span class="hljs-operator">=</span> oldestActiveConnection.getCheckoutTime();
        <span class="hljs-keyword">if</span> (longestCheckoutTime &gt; poolMaximumCheckoutTime) {
          <span class="hljs-comment">// 超出了最大可回收时间,直接回收该连接,回收过期次数增加</span>
          state.claimedOverdueConnectionCount++;
          <span class="hljs-comment">// 统计过期回收时间增加</span>
          state.accumulatedCheckoutTimeOfOverdueConnections += longestCheckoutTime;
          <span class="hljs-comment">// 统计使用时间增加</span>
          state.accumulatedCheckoutTime += longestCheckoutTime;
          <span class="hljs-comment">// 将连接从活动队列中移除</span>
          state.activeConnections.remove(oldestActiveConnection);
          <span class="hljs-keyword">if</span> (!oldestActiveConnection.getRealConnection().getAutoCommit()) {
            <span class="hljs-comment">// 如果不是自动提交事务,则将其回滚,因为可能存在一些操作</span>
            oldestActiveConnection.getRealConnection().rollback();
          }
          <span class="hljs-comment">// 使用新的代理封装,可以使得不会被原有的影响</span>
          conn = <span class="hljs-keyword">new</span> <span class="hljs-title class_">PooledConnection</span>(oldestActiveConnection.getRealConnection(), <span class="hljs-built_in">this</span>);
          <span class="hljs-comment">// 将之前的代理设置为无效</span>
          oldestActiveConnection.invalidate();
          <span class="hljs-keyword">if</span> (log.isDebugEnabled()) {
            log.debug(<span class="hljs-string">"Claimed overdue connection "</span> + conn.getRealHashCode() + <span class="hljs-string">"."</span>);
          }
        } <span class="hljs-keyword">else</span> {
          <span class="hljs-comment">// Must wait</span>
          <span class="hljs-keyword">try</span> {
            <span class="hljs-keyword">if</span> (!countedWait) {
              <span class="hljs-comment">// 增加获取连接需要等待的次数</span>
              state.hadToWaitCount++;
              countedWait = <span class="hljs-literal">true</span>;
            }
            <span class="hljs-keyword">if</span> (log.isDebugEnabled()) {
              log.debug(<span class="hljs-string">"Waiting as long as "</span> + poolTimeToWait + <span class="hljs-string">" milliseconds for connection."</span>);
            }
            <span class="hljs-type">long</span> <span class="hljs-variable">wt</span> <span class="hljs-operator">=</span> System.currentTimeMillis();
            <span class="hljs-comment">// 等待</span>
            state.wait(poolTimeToWait);
            <span class="hljs-comment">// 增加获取连接的等待时间</span>
            state.accumulatedWaitTime += System.currentTimeMillis() - wt;
          } <span class="hljs-keyword">catch</span> (InterruptedException e) {
            <span class="hljs-comment">// 被中断,退出尝试以及等待</span>
            <span class="hljs-keyword">break</span>;
          }
        }
      }
    }
    <span class="hljs-keyword">if</span> (conn != <span class="hljs-literal">null</span>) {
      <span class="hljs-keyword">if</span> (conn.isValid()) {
        
        <span class="hljs-keyword">if</span> (!conn.getRealConnection().getAutoCommit()) {
          <span class="hljs-comment">// 连接为非自动提交事务,则将其回滚,可能存在一些未提交操作,并且防止影响下一次使用 </span>
          conn.getRealConnection().rollback();
        }
        <span class="hljs-comment">// 根据URL,用户名以及密码计算出一个Hash,用于标识此次连接</span>
        conn.setConnectionTypeCode(assembleConnectionTypeCode(dataSource.getUrl(), username, password));
        <span class="hljs-comment">// 设置当前连接开始使用时间</span>
        conn.setCheckoutTimestamp(System.currentTimeMillis());
        <span class="hljs-comment">// 设置最后一次使用时间</span>
        conn.setLastUsedTimestamp(System.currentTimeMillis());
        <span class="hljs-comment">// 加入活动队列中</span>
        state.activeConnections.add(conn);
        <span class="hljs-comment">// 统计请求次数</span>
        state.requestCount++;
        <span class="hljs-comment">// 统计获取连接所需时间</span>
        state.accumulatedRequestTime += System.currentTimeMillis() - t;
      } <span class="hljs-keyword">else</span> {
        <span class="hljs-comment">// 无效连接</span>
        <span class="hljs-keyword">if</span> (log.isDebugEnabled()) {
          log.debug(<span class="hljs-string">"A bad connection ("</span> + conn.getRealHashCode() + <span class="hljs-string">") was returned from the pool, getting another connection."</span>);
        }
        <span class="hljs-comment">// 统计无效连接个数</span>
        state.badConnectionCount++;
        localBadConnectionCount++;
        conn = <span class="hljs-literal">null</span>;
        <span class="hljs-keyword">if</span> (localBadConnectionCount &gt; (poolMaximumIdleConnections + <span class="hljs-number">3</span>)) {
          <span class="hljs-keyword">if</span> (log.isDebugEnabled()) {
            log.debug(<span class="hljs-string">"PooledDataSource: Could not get a good connection to the database."</span>);
          }
          <span class="hljs-keyword">throw</span> <span class="hljs-keyword">new</span> <span class="hljs-title class_">SQLException</span>(<span class="hljs-string">"PooledDataSource: Could not get a good connection to the database."</span>);
        }
      }
    }
  }

}

<span class="hljs-comment">// 从上面的循环退出,如果为null,则一定出现异常情况了</span>
<span class="hljs-keyword">if</span> (conn == <span class="hljs-literal">null</span>) {
  <span class="hljs-keyword">if</span> (log.isDebugEnabled()) {
    log.debug(<span class="hljs-string">"PooledDataSource: Unknown severe error condition.  The connection pool returned a null connection."</span>);
  }
  <span class="hljs-keyword">throw</span> <span class="hljs-keyword">new</span> <span class="hljs-title class_">SQLException</span>(<span class="hljs-string">"PooledDataSource: Unknown severe error condition.  The connection pool returned a null connection."</span>);
}

<span class="hljs-keyword">return</span> conn;

}


PooledDataSource回收连接

通过使用代理,就可以在用户调用 Close 关闭数据库连接的时候,根据当前状态来决定放入空闲链表中还是释放掉。

流程图如下:

pushConnection实现:
  // 省略了部分日志代码
  protected void pushConnection(PooledConnection conn) throws SQLException {
    synchronized (state) {
      // 从活动链表中移除当前连接
      state.activeConnections.remove(conn);
      if (conn.isValid()) {
        // 当前连接有效的话判断是否达到了最大空闲连接数,以及当前的连接是否变更过(即用户名,密码,Url 等变更)
        if (state.idleConnections.size() < poolMaximumIdleConnections && conn.getConnectionTypeCode() == expectedConnectionTypeCode) {
          // 统计使用连接时长
          state.accumulatedCheckoutTime += conn.getCheckoutTime();
          if (!conn.getRealConnection().getAutoCommit()) {
            // 没有自动提交的话,先回滚,防止影响下一次使用
            conn.getRealConnection().rollback();
          }
          // 重新创建一个代理连接来封装,可以使得当前使用的连接不会被原有的代理连接影响
          PooledConnection newConn = new PooledConnection(conn.getRealConnection(), this);
          // 放回空闲链表中
          state.idleConnections.add(newConn);
          newConn.setCreatedTimestamp(conn.getCreatedTimestamp());
          newConn.setLastUsedTimestamp(conn.getLastUsedTimestamp());
          // 将原有的代理连接设置为无效
          conn.invalidate();
          // 通知等待获取连接的线程(不去判断是否真的有线程在等待)
          state.notifyAll();
        } else {
          state.accumulatedCheckoutTime += conn.getCheckoutTime();
          if (!conn.getRealConnection().getAutoCommit()) {
            conn.getRealConnection().rollback();
          }
          // 超出空闲连接限制,则直接释放当前连接
          conn.getRealConnection().close();
          // 将原有的代理连接设置为无效
          conn.invalidate();
        }
      } else {
        // 连接无效,则统计无效连接个数
        state.badConnectionCount++;
      }
    }
  }

连接池调优

// 经验不足,后续补充

MyBatis DataSource 集成

// 后续看完 Spring-MyBatis 再补充

参考

  1. MyBatis 官方文档