用 Java 多线程模拟数据库连接池
模拟一个 ConnectionDriver,用于创建 Connection
package tread.demo.threadpool;import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.sql.Connection;
import java.util.concurrent.TimeUnit;

/**
 * Produces simulated {@link Connection} objects for exercising a connection pool.
 *
 * <p>No database is contacted: every connection is a dynamic proxy whose methods
 * do nothing and return a neutral value.
 */
public class ConnectionDriver {

    /**
     * Invocation handler backing every simulated connection.
     *
     * <p>Interface methods are no-ops. Reference-returning methods yield {@code null};
     * primitive-returning methods yield the zero value of their type, because a
     * {@code null} result for a primitive return type makes the proxy throw
     * {@link NullPointerException}.
     */
    static class ConnectionHandler implements InvocationHandler {

        /**
         * Handles a call made on the proxy.
         *
         * @param proxy  the proxy instance the method was invoked on
         * @param method the interface (or {@code Object}) method being invoked
         * @param args   the call arguments, or {@code null} when there are none
         * @return an identity-based answer for {@code equals}/{@code hashCode}/{@code toString},
         *         otherwise the default value for the method's return type
         */
        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            // equals/hashCode/toString are routed to the handler as well; answer them
            // by identity so the proxy can be compared, hashed and printed safely.
            if (method.getDeclaringClass() == Object.class) {
                String name = method.getName();
                if ("equals".equals(name)) {
                    return Boolean.valueOf(proxy == args[0]);
                }
                if ("hashCode".equals(name)) {
                    return Integer.valueOf(System.identityHashCode(proxy));
                }
                if ("toString".equals(name)) {
                    return "SimulatedConnection@" + Integer.toHexString(System.identityHashCode(proxy));
                }
            }
            return defaultValue(method.getReturnType());
        }

        /** Returns the zero value for a primitive type, or {@code null} for void and reference types. */
        private static Object defaultValue(Class<?> type) {
            if (!type.isPrimitive() || type == void.class) {
                return null;
            }
            if (type == boolean.class) {
                return Boolean.FALSE;
            }
            if (type == char.class) {
                return Character.valueOf('\0');
            }
            if (type == byte.class) {
                return Byte.valueOf((byte) 0);
            }
            if (type == short.class) {
                return Short.valueOf((short) 0);
            }
            if (type == int.class) {
                return Integer.valueOf(0);
            }
            if (type == long.class) {
                return Long.valueOf(0L);
            }
            if (type == float.class) {
                return Float.valueOf(0f);
            }
            return Double.valueOf(0d);
        }
    }

    /**
     * Creates a new simulated connection.
     *
     * @return a proxy implementing {@link Connection}; each call returns a distinct instance
     */
    public static final Connection createConnection() {
        return (Connection) Proxy.newProxyInstance(
                ConnectionDriver.class.getClassLoader(),
                new Class<?>[]{Connection.class},
                new ConnectionHandler());
    }
}
连接池的实现:
package tread.demo.threadpool;import java.sql.Connection;
import java.util.LinkedList;public class ConnectionPool {
private LinkedList<Connection> pool = new LinkedList<Connection>();</span><span style="color: rgba(0, 0, 255, 1)">public</span> ConnectionPool(<span style="color: rgba(0, 0, 255, 1)">int</span><span style="color: rgba(0, 0, 0, 1)"> initialSize) { </span><span style="color: rgba(0, 0, 255, 1)">if</span> (initialSize > 0<span style="color: rgba(0, 0, 0, 1)">) { </span><span style="color: rgba(0, 0, 255, 1)">for</span> (<span style="color: rgba(0, 0, 255, 1)">int</span> i = 0; i < initialSize; i++<span style="color: rgba(0, 0, 0, 1)">) { pool.addLast(ConnectionDriver.createConnection()); } } } </span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span><span style="color: rgba(0, 0, 0, 1)"> releaseConnection(Connection connection) { </span><span style="color: rgba(0, 0, 255, 1)">if</span> (connection != <span style="color: rgba(0, 0, 255, 1)">null</span><span style="color: rgba(0, 0, 0, 1)">) { </span><span style="color: rgba(0, 0, 255, 1)">synchronized</span><span style="color: rgba(0, 0, 0, 1)"> (pool) { pool.addLast(connection);</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">将Connection还回给Pool</span> pool.notifyAll();<span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">通知等待的线程</span>
}
}
}</span><span style="color: rgba(0, 0, 255, 1)">public</span> Connection fetchConnection(<span style="color: rgba(0, 0, 255, 1)">long</span> mills) <span style="color: rgba(0, 0, 255, 1)">throws</span><span style="color: rgba(0, 0, 0, 1)"> Exception { </span><span style="color: rgba(0, 0, 255, 1)">synchronized</span><span style="color: rgba(0, 0, 0, 1)"> (pool) { </span><span style="color: rgba(0, 0, 255, 1)">if</span> (mills <= 0<span style="color: rgba(0, 0, 0, 1)">) { </span><span style="color: rgba(0, 0, 255, 1)">while</span><span style="color: rgba(0, 0, 0, 1)"> (pool.isEmpty()) { pool.wait();</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">一直等带release-》Notify</span>
}
return pool.removeFirst();//得到一个 connection
} else {
long future = System.currentTimeMillis() + mills;
long remaining = mills;
while (pool.isEmpty() && remaining > 0) {//基于时间进行等待,一直到超时。
pool.wait();
remaining = future - System.currentTimeMillis();
}
Connection result = null;
if (!pool.isEmpty()) {
result = pool.removeFirst();
}
return result;
}
}
}
}
两点:
- 对象的 wait 和 notify
- 基于超时时间的等待。
测试:
package tread.demo.threadpool;import java.sql.Connection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;public class ConnectionPoolTest {
static ConnectionPool pool = new ConnectionPool(10);
static CountDownLatch start = new CountDownLatch(1);
static CountDownLatch end;</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">static</span> <span style="color: rgba(0, 0, 255, 1)">void</span> main(String[] args) <span style="color: rgba(0, 0, 255, 1)">throws</span><span style="color: rgba(0, 0, 0, 1)"> Exception { </span><span style="color: rgba(0, 0, 255, 1)">int</span> threadCount = 1000<span style="color: rgba(0, 0, 0, 1)">; end </span>= <span style="color: rgba(0, 0, 255, 1)">new</span><span style="color: rgba(0, 0, 0, 1)"> CountDownLatch(threadCount); </span><span style="color: rgba(0, 0, 255, 1)">int</span> count = 20<span style="color: rgba(0, 0, 0, 1)">; AtomicInteger got </span>= <span style="color: rgba(0, 0, 255, 1)">new</span><span style="color: rgba(0, 0, 0, 1)"> AtomicInteger(); AtomicInteger notGot </span>= <span style="color: rgba(0, 0, 255, 1)">new</span><span style="color: rgba(0, 0, 0, 1)"> AtomicInteger(); </span><span style="color: rgba(0, 0, 255, 1)">for</span> (<span style="color: rgba(0, 0, 255, 1)">int</span> i = 0; i < threadCount; i++<span style="color: rgba(0, 0, 0, 1)">) { Thread thread </span>= <span style="color: rgba(0, 0, 255, 1)">new</span> Thread(<span style="color: rgba(0, 0, 255, 1)">new</span> ConnectionRunner(count, got, notGot), "ConnectionRunnerThread"<span style="color: rgba(0, 0, 0, 1)">); thread.start(); } start.countDown();</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">tart的CountDown为0,保证了所有线程同时执行。</span> end.await();<span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">等待所有线程执行完毕,</span> System.out.println("total invoke: " + (threadCount *<span style="color: rgba(0, 0, 0, 1)"> count)); System.out.println(</span>"got connection: " +<span style="color: rgba(0, 0, 0, 1)"> got); System.out.println(</span>"not got connection: " +<span style="color: rgba(0, 0, 0, 1)"> notGot); } </span><span style="color: rgba(0, 0, 255, 1)">static</span> 
<span style="color: rgba(0, 0, 255, 1)">class</span> ConnectionRunner <span style="color: rgba(0, 0, 255, 1)">implements</span><span style="color: rgba(0, 0, 0, 1)"> Runnable { </span><span style="color: rgba(0, 0, 255, 1)">int</span><span style="color: rgba(0, 0, 0, 1)"> count; AtomicInteger got; AtomicInteger notGot; </span><span style="color: rgba(0, 0, 255, 1)">public</span> ConnectionRunner(<span style="color: rgba(0, 0, 255, 1)">int</span><span style="color: rgba(0, 0, 0, 1)"> count, AtomicInteger got, AtomicInteger notGot) { </span><span style="color: rgba(0, 0, 255, 1)">this</span>.count =<span style="color: rgba(0, 0, 0, 1)"> count; </span><span style="color: rgba(0, 0, 255, 1)">this</span>.got =<span style="color: rgba(0, 0, 0, 1)"> got; </span><span style="color: rgba(0, 0, 255, 1)">this</span>.notGot =<span style="color: rgba(0, 0, 0, 1)"> notGot; } </span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span><span style="color: rgba(0, 0, 0, 1)"> run() { </span><span style="color: rgba(0, 0, 255, 1)">try</span><span style="color: rgba(0, 0, 0, 1)"> { start.await();</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">等待start的CountDown为0.</span> } <span style="color: rgba(0, 0, 255, 1)">catch</span><span style="color: rgba(0, 0, 0, 1)"> (InterruptedException e) { e.printStackTrace(); } </span><span style="color: rgba(0, 0, 255, 1)">while</span> (count > 0<span style="color: rgba(0, 0, 0, 1)">) { </span><span style="color: rgba(0, 0, 255, 1)">try</span><span style="color: rgba(0, 0, 0, 1)"> { Connection connection </span>= pool.fetchConnection(1);<span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">超时时间</span> <span style="color: rgba(0, 0, 255, 1)">if</span> (connection != <span style="color: rgba(0, 0, 255, 1)">null</span><span style="color: rgba(0, 0, 0, 1)">) { </span><span style="color: rgba(0, 0, 255, 1)">try</span><span 
style="color: rgba(0, 0, 0, 1)"> { connection.createStatement(); } </span><span style="color: rgba(0, 0, 255, 1)">finally</span><span style="color: rgba(0, 0, 0, 1)"> { pool.releaseConnection(connection); got.incrementAndGet(); } } </span><span style="color: rgba(0, 0, 255, 1)">else</span><span style="color: rgba(0, 0, 0, 1)"> { notGot.incrementAndGet(); } } </span><span style="color: rgba(0, 0, 255, 1)">catch</span><span style="color: rgba(0, 0, 0, 1)"> (Exception ex) { } </span><span style="color: rgba(0, 0, 255, 1)">finally</span><span style="color: rgba(0, 0, 0, 1)"> { count</span>--<span style="color: rgba(0, 0, 0, 1)">; } } end.countDown(); } }
}
继续巧用了 CountDownLatch
结果:
total invoke: 20000 got connection: 11914 not got connection: 8086
如果调整超时时间,调整为 100ms
结果如下(大部分时候都能得到 connection)
total invoke: 20000 got connection: 19050 not got connection: 950