用java多线程模拟数据库连接池

模拟一个 ConnectionDriver,用于创建 Connection

package tread.demo.threadpool;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.sql.Connection;
import java.util.concurrent.TimeUnit;

public class ConnectionDriver {
static class ConnectionHandler implements InvocationHandler {

    </span><span style="color: rgba(0, 0, 255, 1)">public</span> Object invoke(Object proxy, Method method, Object[] args) <span style="color: rgba(0, 0, 255, 1)">throws</span><span style="color: rgba(0, 0, 0, 1)"> Throwable {
        </span><span style="color: rgba(0, 0, 255, 1)">return</span> <span style="color: rgba(0, 0, 255, 1)">null</span><span style="color: rgba(0, 0, 0, 1)">;
    }
}

</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">static</span> <span style="color: rgba(0, 0, 255, 1)">final</span><span style="color: rgba(0, 0, 0, 1)"> Connection createConnection() {
    </span><span style="color: rgba(0, 0, 255, 1)">return</span> (Connection) Proxy.newProxyInstance(ConnectionDriver.<span style="color: rgba(0, 0, 255, 1)">class</span>.getClassLoader(), <span style="color: rgba(0, 0, 255, 1)">new</span> Class&lt;?&gt;[]{Connection.<span style="color: rgba(0, 0, 255, 1)">class</span>}, <span style="color: rgba(0, 0, 255, 1)">new</span><span style="color: rgba(0, 0, 0, 1)"> ConnectionHandler());
}

}

线程池的实现:

package tread.demo.threadpool;

import java.sql.Connection;
import java.util.LinkedList;

public class ConnectionPool {
private LinkedList<Connection> pool = new LinkedList<Connection>();

</span><span style="color: rgba(0, 0, 255, 1)">public</span> ConnectionPool(<span style="color: rgba(0, 0, 255, 1)">int</span><span style="color: rgba(0, 0, 0, 1)"> initialSize) {
    </span><span style="color: rgba(0, 0, 255, 1)">if</span> (initialSize &gt; 0<span style="color: rgba(0, 0, 0, 1)">) {
        </span><span style="color: rgba(0, 0, 255, 1)">for</span> (<span style="color: rgba(0, 0, 255, 1)">int</span> i = 0; i &lt; initialSize; i++<span style="color: rgba(0, 0, 0, 1)">) {
            pool.addLast(ConnectionDriver.createConnection());
        }
    }
}

</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span><span style="color: rgba(0, 0, 0, 1)"> releaseConnection(Connection connection) {
    </span><span style="color: rgba(0, 0, 255, 1)">if</span> (connection != <span style="color: rgba(0, 0, 255, 1)">null</span><span style="color: rgba(0, 0, 0, 1)">) {
        </span><span style="color: rgba(0, 0, 255, 1)">synchronized</span><span style="color: rgba(0, 0, 0, 1)"> (pool) {
            pool.addLast(connection);</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">将Connection还回给Pool</span>
            pool.notifyAll();<span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">通知等待的线程</span>

}
}
}

</span><span style="color: rgba(0, 0, 255, 1)">public</span> Connection fetchConnection(<span style="color: rgba(0, 0, 255, 1)">long</span> mills) <span style="color: rgba(0, 0, 255, 1)">throws</span><span style="color: rgba(0, 0, 0, 1)"> Exception {
    </span><span style="color: rgba(0, 0, 255, 1)">synchronized</span><span style="color: rgba(0, 0, 0, 1)"> (pool) {
        </span><span style="color: rgba(0, 0, 255, 1)">if</span> (mills &lt;= 0<span style="color: rgba(0, 0, 0, 1)">) {
            </span><span style="color: rgba(0, 0, 255, 1)">while</span><span style="color: rgba(0, 0, 0, 1)"> (pool.isEmpty()) {
                pool.wait();</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">一直等带release-》Notify</span>

}
return pool.removeFirst();//得到一个 connection
} else {
long future = System.currentTimeMillis() + mills;
long remaining = mills;
while (pool.isEmpty() && remaining > 0) {//基于时间进行等待,一直到超时。
pool.wait();
remaining
= future - System.currentTimeMillis();
}
Connection result
= null;
if (!pool.isEmpty()) {
result
= pool.removeFirst();
}
return result;
}
}
}
}

两点:

  1. 对象的 wait 和 notify
  2. 基于超时时间的等待。

测试:


package tread.demo.threadpool;

import java.sql.Connection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

public class ConnectionPoolTest {
static ConnectionPool pool = new ConnectionPool(10);
static CountDownLatch start = new CountDownLatch(1);
static CountDownLatch end;

</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">static</span> <span style="color: rgba(0, 0, 255, 1)">void</span> main(String[] args) <span style="color: rgba(0, 0, 255, 1)">throws</span><span style="color: rgba(0, 0, 0, 1)"> Exception {
    </span><span style="color: rgba(0, 0, 255, 1)">int</span> threadCount = 1000<span style="color: rgba(0, 0, 0, 1)">;
    end </span>= <span style="color: rgba(0, 0, 255, 1)">new</span><span style="color: rgba(0, 0, 0, 1)"> CountDownLatch(threadCount);
    </span><span style="color: rgba(0, 0, 255, 1)">int</span> count = 20<span style="color: rgba(0, 0, 0, 1)">;
    AtomicInteger got </span>= <span style="color: rgba(0, 0, 255, 1)">new</span><span style="color: rgba(0, 0, 0, 1)"> AtomicInteger();
    AtomicInteger notGot </span>= <span style="color: rgba(0, 0, 255, 1)">new</span><span style="color: rgba(0, 0, 0, 1)"> AtomicInteger();
    </span><span style="color: rgba(0, 0, 255, 1)">for</span> (<span style="color: rgba(0, 0, 255, 1)">int</span> i = 0; i &lt; threadCount; i++<span style="color: rgba(0, 0, 0, 1)">) {
        Thread thread </span>= <span style="color: rgba(0, 0, 255, 1)">new</span> Thread(<span style="color: rgba(0, 0, 255, 1)">new</span> ConnectionRunner(count, got, notGot), "ConnectionRunnerThread"<span style="color: rgba(0, 0, 0, 1)">);
        thread.start();
    }
    start.countDown();</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">tart的CountDown为0,保证了所有线程同时执行。</span>
    end.await();<span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">等待所有线程执行完毕,</span>
    System.out.println("total invoke: " + (threadCount *<span style="color: rgba(0, 0, 0, 1)"> count));
    System.out.println(</span>"got connection: " +<span style="color: rgba(0, 0, 0, 1)"> got);
    System.out.println(</span>"not got connection: " +<span style="color: rgba(0, 0, 0, 1)"> notGot);

}

</span><span style="color: rgba(0, 0, 255, 1)">static</span> <span style="color: rgba(0, 0, 255, 1)">class</span> ConnectionRunner <span style="color: rgba(0, 0, 255, 1)">implements</span><span style="color: rgba(0, 0, 0, 1)"> Runnable {
    </span><span style="color: rgba(0, 0, 255, 1)">int</span><span style="color: rgba(0, 0, 0, 1)"> count;
    AtomicInteger got;
    AtomicInteger notGot;

    </span><span style="color: rgba(0, 0, 255, 1)">public</span> ConnectionRunner(<span style="color: rgba(0, 0, 255, 1)">int</span><span style="color: rgba(0, 0, 0, 1)"> count, AtomicInteger got, AtomicInteger notGot) {
        </span><span style="color: rgba(0, 0, 255, 1)">this</span>.count =<span style="color: rgba(0, 0, 0, 1)"> count;
        </span><span style="color: rgba(0, 0, 255, 1)">this</span>.got =<span style="color: rgba(0, 0, 0, 1)"> got;
        </span><span style="color: rgba(0, 0, 255, 1)">this</span>.notGot =<span style="color: rgba(0, 0, 0, 1)"> notGot;
    }

    </span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span><span style="color: rgba(0, 0, 0, 1)"> run() {
        </span><span style="color: rgba(0, 0, 255, 1)">try</span><span style="color: rgba(0, 0, 0, 1)"> {
            start.await();</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">等待start的CountDown为0.</span>
        } <span style="color: rgba(0, 0, 255, 1)">catch</span><span style="color: rgba(0, 0, 0, 1)"> (InterruptedException e) {
            e.printStackTrace();
        }
        </span><span style="color: rgba(0, 0, 255, 1)">while</span> (count &gt; 0<span style="color: rgba(0, 0, 0, 1)">) {
            </span><span style="color: rgba(0, 0, 255, 1)">try</span><span style="color: rgba(0, 0, 0, 1)"> {
                Connection connection </span>= pool.fetchConnection(1);<span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">超时时间</span>
                <span style="color: rgba(0, 0, 255, 1)">if</span> (connection != <span style="color: rgba(0, 0, 255, 1)">null</span><span style="color: rgba(0, 0, 0, 1)">) {
                    </span><span style="color: rgba(0, 0, 255, 1)">try</span><span style="color: rgba(0, 0, 0, 1)"> {
                        connection.createStatement();
                    } </span><span style="color: rgba(0, 0, 255, 1)">finally</span><span style="color: rgba(0, 0, 0, 1)"> {
                        pool.releaseConnection(connection);
                        got.incrementAndGet();
                    }
                } </span><span style="color: rgba(0, 0, 255, 1)">else</span><span style="color: rgba(0, 0, 0, 1)"> {
                    notGot.incrementAndGet();
                }
            } </span><span style="color: rgba(0, 0, 255, 1)">catch</span><span style="color: rgba(0, 0, 0, 1)"> (Exception ex) {
            } </span><span style="color: rgba(0, 0, 255, 1)">finally</span><span style="color: rgba(0, 0, 0, 1)"> {
                count</span>--<span style="color: rgba(0, 0, 0, 1)">;
            }
        }
        end.countDown();
    }
}

}

继续巧用了 CatdownLatch

结果:

total invoke: 20000
got connection: 11914
not got connection: 8086

如果调整超时时间,调整为 100ms

结果如下(大部分时候都能得到 connection)

total invoke: 20000
got connection: 19050
not got connection: 950