Implementing a Simulated Database Connection Pool in Java
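Having covered the wait/notify mechanism earlier, we now add a timeout on top of it and simulate fetching, using, and releasing connections from a connection pool. Clients fetch connections in wait-with-timeout mode: if no connection becomes available within 1000 ms, the client receives null. The pool size is fixed at 10, and we vary the number of client threads to reproduce the situation in which a connection cannot be obtained.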
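Because java.sql.Connection is only an interface whose implementation is ultimately supplied by the database driver vendor, and this example is purely a demonstration, we construct a Connection through a dynamic proxy. The only thing the proxy does is sleep for 100 ms when commit() is called.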
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.sql.Connection;
import java.util.concurrent.TimeUnit;

public class ConnectionDriver {
    static class ConnectionHandler implements InvocationHandler {
        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            // Simulate the cost of a commit by sleeping 100 ms
            if ("commit".equals(method.getName())) {
                TimeUnit.MILLISECONDS.sleep(100);
            }
            return null;
        }
    }

    /**
     * Creates a Connection proxy that sleeps 100 ms on commit().
     */
    public static Connection createConnection() {
        return (Connection) Proxy.newProxyInstance(ConnectionDriver.class.getClassLoader(),
                new Class<?>[]{Connection.class}, new ConnectionHandler());
    }
}
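One consequence of a handler that always returns null is worth seeing in isolation. The sketch below is not part of the original example: `NullProxyDemo` and its helper methods are hypothetical names, and the handler is a simplified stand-in that omits the sleep. It illustrates that proxy methods with an object or void return type simply yield null, while a method with a primitive return type such as isClosed() fails with a NullPointerException, which is the behavior documented for java.lang.reflect.Proxy.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.sql.Connection;
import java.sql.SQLException;

// Hypothetical demo class, not part of the original example.
final class NullProxyDemo {

    // Builds a Connection proxy whose handler returns null for every call,
    // mirroring the return value of the handler in the article.
    static Connection createNullConnection() {
        InvocationHandler handler = (proxy, method, args) -> null;
        return (Connection) Proxy.newProxyInstance(
                NullProxyDemo.class.getClassLoader(),
                new Class<?>[]{Connection.class}, handler);
    }

    // True if a method with an object return type yields null.
    static boolean objectMethodReturnsNull() throws SQLException {
        return createNullConnection().createStatement() == null;
    }

    // True if a method with a primitive return type throws NullPointerException,
    // because the proxy cannot unbox the handler's null into a boolean.
    static boolean primitiveMethodThrowsNpe() throws SQLException {
        try {
            createNullConnection().isClosed();
            return false;
        } catch (NullPointerException expected) {
            return true;
        }
    }

    public static void main(String[] args) throws SQLException {
        System.out.println("createStatement() == null : " + objectMethodReturnsNull());
        System.out.println("isClosed() throws NPE     : " + primitiveMethodThrowsNpe());
    }
}
```

The test client in this article only calls createStatement() and commit(), so it never runs into the primitive-return case.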
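Next comes the connection pool implementation. The pool keeps its connections in a double-ended queue. A caller first invokes fetchConnection(long) with the maximum number of milliseconds to wait for a connection and, once finished with the connection, calls releaseConnection(Connection) to return it to the pool.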
import java.sql.Connection;
import java.util.LinkedList;

public class ConnectionPool {
    private final LinkedList<Connection> pool = new LinkedList<>();

    public ConnectionPool(int initialSize) {
        // Pre-create the connections; initialSize is also the pool's upper bound
        if (initialSize > 0) {
            for (int i = 0; i < initialSize; i++) {
                pool.addLast(ConnectionDriver.createConnection());
            }
        }
    }

    public void releaseConnection(Connection connection) {
        if (connection != null) {
            synchronized (pool) {
                // Return the connection, then notify so that waiting
                // consumers learn that a connection is available again
                pool.addLast(connection);
                pool.notifyAll();
            }
        }
    }

    /**
     * Fetches a connection, waiting at most mills milliseconds.
     * Returns null on timeout; a negative mills waits indefinitely.
     */
    public Connection fetchConnection(long mills) throws InterruptedException {
        synchronized (pool) {
            // Negative timeout: wait without a deadline
            if (mills < 0) {
                while (pool.isEmpty()) {
                    pool.wait();
                }
                return pool.removeFirst();
            } else {
                // Absolute deadline, so that wake-ups without a free
                // connection do not extend the total waiting time
                long future = System.currentTimeMillis() + mills;
                long remaining = mills;
                while (pool.isEmpty() && remaining > 0) {
                    pool.wait(remaining);
                    remaining = future - System.currentTimeMillis();
                }
                Connection result = null;
                if (!pool.isEmpty()) {
                    result = pool.removeFirst();
                }
                return result;
            }
        }
    }
}
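The wait-with-timeout loop above is not specific to connections. As a hedged illustration, the sketch below restates it for an arbitrary item type. `TimedPool`, `fetch`, and `release` are hypothetical names, not from the original. It also swaps in System.nanoTime() for the deadline, on the assumption that a monotonic clock is preferable because System.currentTimeMillis() can jump when the wall clock is adjusted. Unlike the original, this variant does not implement the negative "wait indefinitely" case.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.TimeUnit;

// Hypothetical generic variant of the pool; names are illustrative only.
final class TimedPool<T> {
    private final Deque<T> items = new ArrayDeque<>();

    // Puts an item back and wakes up every thread waiting in fetch().
    void release(T item) {
        if (item == null) {
            return;
        }
        synchronized (items) {
            items.addLast(item);
            items.notifyAll();
        }
    }

    // Waits at most timeoutMillis for an item; returns null on timeout.
    // A timeout of zero or less checks the pool once without waiting.
    T fetch(long timeoutMillis) throws InterruptedException {
        synchronized (items) {
            long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
            long remainingNanos = deadline - System.nanoTime();
            // Re-check the condition after every wake-up and wait only for
            // the time that is left until the deadline.
            while (items.isEmpty() && remainingNanos > 0) {
                TimeUnit.NANOSECONDS.timedWait(items, remainingNanos);
                remainingNanos = deadline - System.nanoTime();
            }
            return items.pollFirst(); // null if the pool is still empty
        }
    }

    public static void main(String[] args) throws InterruptedException {
        TimedPool<String> pool = new TimedPool<>();
        // Empty pool: fetch gives up after roughly 50 ms and returns null.
        System.out.println("first fetch  : " + pool.fetch(50));
        // A second thread releases an item while the main thread is waiting.
        Thread releaser = new Thread(() -> pool.release("conn-1"));
        releaser.start();
        System.out.println("second fetch : " + pool.fetch(1000));
        releaser.join();
    }
}
```

Computing the remaining time from a fixed deadline on every iteration is what keeps the total wait bounded: a thread that is woken by notifyAll() but loses the race for the item goes back to waiting only for the time still left.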
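Finally, we write an example that simulates clients fetching connections: multiple threads fetch from the pool concurrently, and the program records the total number of attempts, the number of successes, and the number of failures.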
import java.sql.Connection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

public class ConnectionPoolTest {
    static ConnectionPool pool = new ConnectionPool(10);
    // Makes all ConnectionRunner threads start at the same moment
    static CountDownLatch start = new CountDownLatch(1);
    // Lets main wait until every ConnectionRunner has finished
    static CountDownLatch end;

    public static void main(String[] args) throws InterruptedException {
        // Number of client threads
        int threadCount = 200;
        end = new CountDownLatch(threadCount);
        // Number of fetch attempts per thread
        int count = 20;
        AtomicInteger got = new AtomicInteger();
        AtomicInteger notGot = new AtomicInteger();
        for (int i = 0; i < threadCount; i++) {
            Thread thread = new Thread(new ConnectionRunner(count, got, notGot), "ConnectionRunnerThread");
            thread.start();
        }
        start.countDown();
        end.await();
        System.out.println("total invoke : " + (threadCount * count));
        System.out.println("got connection : " + got);
        System.out.println("not got connection : " + notGot);
    }

    static class ConnectionRunner implements Runnable {
        int count;
        AtomicInteger got;
        AtomicInteger notGot;

        public ConnectionRunner(int count, AtomicInteger got, AtomicInteger notGot) {
            this.count = count;
            this.got = got;
            this.notGot = notGot;
        }

        @Override
        public void run() {
            try {
                start.await();
            } catch (Exception e) {
                e.printStackTrace();
            }
            while (count > 0) {
                try {
                    // Fetch a connection from the connection pool; null is returned
                    // if none can be obtained within 1000 ms.
                    // Count successful fetches in got and failed ones in notGot.
                    Connection connection = pool.fetchConnection(1000);
                    if (connection != null) {
                        try {
                            connection.createStatement();
                            connection.commit();
                        } finally {
                            pool.releaseConnection(connection);
                            got.incrementAndGet();
                        }
                    } else {
                        notGot.incrementAndGet();
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    count--;
                }
            }
            end.countDown();
        }
    }
}
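With the thread count set to 200, the author obtained the following result
With it set to 500, the result was as follows; the exact numbers will of course vary with machine performance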
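As the number of client threads grows, the proportion of clients that time out without obtaining a connection keeps rising. The wait-with-timeout mode ensures that when something goes wrong, a thread does not block indefinitely on fetching a connection; it returns on time and tells the client that the fetch failed. The design of a database connection pool also carries over to other resource-acquisition scenarios: access to any expensive resource should be limited in this way.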
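To illustrate how the same idea might carry over to other expensive resources, here is a minimal sketch that swaps the hand-written wait/notify code for the JDK's ArrayBlockingQueue, whose poll(timeout, unit) already returns null when the timeout elapses. `BoundedResourcePool` and its methods are hypothetical names introduced for this illustration; this is one possible approach under those assumptions, not the method used in the article.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical generic pool built on a JDK BlockingQueue; not from the original article.
final class BoundedResourcePool<T> {
    private final BlockingQueue<T> idle;

    // Pre-creates size resources (size must be at least 1); the queue
    // capacity caps how many resources can ever be idle in the pool.
    BoundedResourcePool(int size, Supplier<T> factory) {
        idle = new ArrayBlockingQueue<>(size);
        for (int i = 0; i < size; i++) {
            idle.add(factory.get());
        }
    }

    // Returns an idle resource, or null if none becomes idle within the timeout.
    T fetch(long timeout, TimeUnit unit) throws InterruptedException {
        return idle.poll(timeout, unit);
    }

    // Hands a resource back. offer() never blocks: it returns false
    // when the pool is already full, and the resource is not stored.
    boolean release(T resource) {
        return resource != null && idle.offer(resource);
    }

    public static void main(String[] args) throws InterruptedException {
        BoundedResourcePool<StringBuilder> pool =
                new BoundedResourcePool<>(1, StringBuilder::new);
        StringBuilder first = pool.fetch(100, TimeUnit.MILLISECONDS);
        // The only resource is in use, so this fetch times out with null.
        StringBuilder second = pool.fetch(100, TimeUnit.MILLISECONDS);
        System.out.println("first fetched : " + (first != null));
        System.out.println("second is null: " + (second == null));
        pool.release(first);
    }
}
```

The blocking queue handles the locking, the deadline arithmetic, and the wake-ups internally, so the pool class itself only has to decide what a timeout means for its callers.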