Spark 数据库 连接池 java DAO 工厂
在一般的 java 项目 以及 现在特别火的大数据分析项目中 ,用到数据库以及数据库资源池 连接的事情 是在稀松平常不过的了 。今天就简单的梳理下 这是一个怎样的过程:
我们按照代码的调度顺序审视下 :
Comment ,我们是从 Spark 数据分析做 demo 展开的 :
第一,假设读写数据库一定是从业务层面发出的 ,那么就应该有以下代码
这是我们众多代码中的最后一步 ,写数据到数据库的代码,将最后生成的数据写入数据库 ,假设现在数据库类型不能,就要求我们提供可配置的功能了 ,the most impotent code I marked in red and bold
JavaPairRDD<String, Tuple2<String, Row>> sessionDetailRDD = top10SessionRDD.join(sessionid2detailRDD); sessionDetailRDD.foreach(new VoidFunction<Tuple2<String,Tuple2<String,Row>>>() {private static final long serialVersionUID = 1L;
@Override
public void call(Tuple2<String, Tuple2<String, Row>> tuple) throws Exception {
Row row = tuple._2._2;
SessionDetail sessionDetail = new SessionDetail();
sessionDetail.setTaskid(taskid);
sessionDetail.setUserid(row.getLong(1));
。。。。。。。、//reomve some contents
ISessionDetailDAO sessionDetailDAO = DAOFactory.getSessionDetailDAO(); //
sessionDetailDAO.insert(sessionDetail);
}
});
public static ISessionDetailDAO getSessionDetailDAO() { return new SessionDetailDAOImpl(); }第二 ,上面代码来自 DAOFactory 中定主意 Impl 的实现
接下来让看下 Impl 代码是如何实现的
第三 insert 中的数据是我们 insert 到 DB 中的数据 ,在这里要转化成参数 跟 sql 拼接起来
public class SessionDetailDAOImpl implements ISessionDetailDAO {/**
* 插入一条数据
* @param sessionDetail
*/
public void insert(SessionDetail sessionDetail) {
String sql = "insert into table_name values(?,?,?,?,?,?,?,?,?,?,?,?)";
Object[] params = new Object[]{sessionDetail.getTaskid(),
sessionDetail.getUserid(),
sessionDetail.getSessionid(),
JDBCHelper jdbcHelper = JDBCHelper.getInstance();
jdbcHelper.executeUpdate(sql, params);
}
第四,介绍下 JDBChelper 是怎么实现的 ,这一步需要到数据库资源池中获取 db 链接
import com.zkys.spark.conf.ConfigurationManager; import com.zkys.spark.constant.Constants; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.util.LinkedList; import java.util.List; public class JDBCHelper { static { try { String driver = ConfigurationManager.getProperty(Constants.JDBC_DRIVER); Class.forName(driver); } catch (Exception e) {e.printStackTrace(); } } private static JDBCHelper instance = null; public static JDBCHelper getInstance() { if(instance == null) { synchronized(JDBCHelper.class) { //synchronized http://www.cnblogs.com/GnagWang/archive/2011/02/27/1966606.html if(instance == null) { instance = new JDBCHelper();//调用私有无参构造方法 ,需要在构造方法中实现数据库的链接 } } } return instance; }// 数据库连接池
private LinkedList<Connection> datasource = new LinkedList<Connection>();
//对于新增和删除操作add和remove,LinedList比较占优势,因为ArrayList要移动数据。
private JDBCHelper() {
int datasourceSize = ConfigurationManager.getInteger(
Constants.JDBC_DATASOURCE_SIZE);
for(int i = 0; i < datasourceSize; i++) {
boolean local = ConfigurationManager.getBoolean(Constants.SPARK_LOCAL);
String url = null;
String user = null;
String password = null;
url = ConfigurationManager.getProperty(Constants.JDBC_URL);
user = ConfigurationManager.getProperty(Constants.JDBC_USER);
password = ConfigurationManager.getProperty(Constants.JDBC_PASSWORD);
try {
//经过循环在这里面创建了10 个数据库链接 ,并把连接放到LinkList 里面了
Connection conn = DriverManager.getConnection(url, user, password);
datasource.push(conn);
} catch (Exception e) {
e.printStackTrace();
}
}
}
public synchronized Connection getConnection() {
while(datasource.size() == 0) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return datasource.poll();
}
public int executeUpdate(String sql, Object[] params) {
int rtn = 0;
Connection conn = null;
PreparedStatement pstmt = null;
try {
conn = getConnection();
conn.setAutoCommit(false);
pstmt = conn.prepareStatement(sql);
if(params != null && params.length > 0) {
for(int i = 0; i < params.length; i++) {
pstmt.setObject(i + 1, params[i]);
}
}rtn = pstmt.executeUpdate()<span style="color: rgba(204, 120, 50, 1)">;
conn.commit();
} catch (Exception e) {
e.printStackTrace();
} finally {
if(conn != null) {
datasource.push(conn);
}
}<span style="color: rgba(204, 120, 50, 1)">return </span>rtn<span style="color: rgba(204, 120, 50, 1)">;
}
public void executeQuery(String sql, Object[] params, QueryCallback callback) {
Connection conn = null;
PreparedStatement pstmt = null;
ResultSet rs = null;
try {
conn = getConnection();
pstmt = conn.prepareStatement(sql);
if(params != null && params.length > 0) {
for(int i = 0; i < params.length; i++) {
pstmt.setObject(i + 1, params[i]);
}
}rs = pstmt.executeQuery()<span style="color: rgba(204, 120, 50, 1)">;
callback.process(rs);
} catch (Exception e) {
e.printStackTrace();
} finally {
if(conn != null) {
datasource.push(conn);
}
}
}
public int[] executeBatch(String sql, List<Object[]> paramsList) {
int[] rtn = null;
Connection conn = null;
PreparedStatement pstmt = null;
try {
conn = getConnection();
// 第一步:使用Connection对象,取消自动提交
conn.setAutoCommit(false);
pstmt = conn.prepareStatement(sql);
// 第二步:使用PreparedStatement.addBatch()方法加入批量的SQL参数
if(paramsList != null && paramsList.size() > 0) {
for(Object[] params : paramsList) {
for(int i = 0; i < params.length; i++) {
pstmt.setObject(i + 1, params[i]);
}
pstmt.addBatch();
}
}<span style="color: rgba(128, 128, 128, 1)">// </span><span style="font-family: "宋体"; color: rgba(128, 128, 128, 1)">第三步:使用</span><span style="color: rgba(128, 128, 128, 1)">PreparedStatement.executeBatch()</span><span style="font-family: "宋体"; color: rgba(128, 128, 128, 1)">方法,执行批量的</span><span style="color: rgba(128, 128, 128, 1)">SQL</span><span style="font-family: "宋体"; color: rgba(128, 128, 128, 1)">语句
rtn = pstmt.executeBatch();
// 最后一步:使用Connection对象,提交批量的SQL语句
conn.commit();
} catch (Exception e) {
e.printStackTrace();
} finally {
if(conn != null) {
datasource.push(conn);
}
}<span style="color: rgba(204, 120, 50, 1)">return </span>rtn<span style="color: rgba(204, 120, 50, 1)">;
}
/**
* 静态内部类:查询回调接口
* @author Administrator
*
*/
public static interface QueryCallback {<span style="color: rgba(98, 151, 85, 1); font-style: italic">/**
* 处理查询结果
* @param rs
* @throws Exception
*/
void process(ResultSet rs) throws Exception;
}}
第五,由于是可以配置的,所以需要加入更多元素
import java.io.InputStream; import java.util.Properties; public class ConfigurationManager { private static Properties prop = new Properties(); static { try { InputStream in = ConfigurationManager.class .getClassLoader().getResourceAsStream("my.properties"); prop.load(in); } catch (Exception e) {e.printStackTrace(); } } public static String getProperty(String key) { return prop.getProperty(key); } public static Integer getInteger(String key) { String value = getProperty(key); try { return Integer.valueOf(value); } catch (Exception e) {e.printStackTrace(); } return 0; } public static Boolean getBoolean(String key) { String value = getProperty(key); try { return Boolean.valueOf(value); } catch (Exception e) {e.printStackTrace(); } return false; } public static Long getLong(String key) { String value = getProperty(key); try { return Long.valueOf(value); } catch (Exception e) {e.printStackTrace(); } return 0L; }}