Spark 数据库 连接池 java DAO 工厂


在一般的 java 项目  以及 现在特别火的大数据分析项目中 ,用到数据库以及数据库资源池 连接的事情 是在稀松平常不过的了 。今天就简单的梳理下 这是一个怎样的过程:

我们按照代码的调度顺序审视下 :

Comment ,我们是从 Spark 数据分析做 demo 展开的  :

第一,假设读写数据库一定是从业务层面发出的 ,那么就应该有以下代码

这是我们众多代码中的最后一步 ,写数据到数据库的代码,将最后生成的数据写入数据库 ,假设现在数据库类型不能,就要求我们提供可配置的功能了 ,the most impotent code I marked in red and bold


JavaPairRDD<String, Tuple2<String, Row>> sessionDetailRDD =
      top10SessionRDD.join(sessionid2detailRDD);
sessionDetailRDD.foreach(new VoidFunction<Tuple2<String,Tuple2<String,Row>>>() {

private static final long serialVersionUID = 1L;

@Override
public void call(Tuple2<String, Tuple2<String, Row>> tuple) throws Exception {
Row row = tuple._2._2;

SessionDetail sessionDetail = new SessionDetail();
sessionDetail.setTaskid(taskid);
sessionDetail.setUserid(row.getLong(1));
。。。。。。。、//reomve some contents
ISessionDetailDAO sessionDetailDAO = DAOFactory.getSessionDetailDAO(); //
sessionDetailDAO.insert(sessionDetail)
;
}
});

public static ISessionDetailDAO getSessionDetailDAO() {
   return new SessionDetailDAOImpl();
}
第二 ,上面代码来自 DAOFactory 中定主意 Impl 的实现

接下来让看下 Impl 代码是如何实现的

第三  insert 中的数据是我们 insert 到 DB 中的数据 ,在这里要转化成参数 跟 sql 拼接起来

public class SessionDetailDAOImpl implements ISessionDetailDAO {

/**
* 插入一条数据
* @param sessionDetail
*/
public void insert(SessionDetail sessionDetail) {
String sql = "insert into table_name values(?,?,?,?,?,?,?,?,?,?,?,?)";

Object[] params = new Object[]{sessionDetail.getTaskid(),
sessionDetail.getUserid(),
sessionDetail.getSessionid(),


JDBCHelper jdbcHelper = JDBCHelper.getInstance();
jdbcHelper.executeUpdate(sql, params);
}

第四,介绍下 JDBChelper 是怎么实现的 ,这一步需要到数据库资源池中获取 db 链接




import com.zkys.spark.conf.ConfigurationManager;
import com.zkys.spark.constant.Constants;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.LinkedList;
import java.util.List;

public class JDBCHelper {

   static {
      try {
         String driver = ConfigurationManager.getProperty(Constants.JDBC_DRIVER);
         Class.forName(driver);
      } catch (Exception e) {e.printStackTrace();  
      }
   }


   private static JDBCHelper instance = null;

   public static JDBCHelper getInstance() {
      if(instance == null) {
         synchronized(JDBCHelper.class) {  //synchronized http://www.cnblogs.com/GnagWang/archive/2011/02/27/1966606.html
            if(instance == null) {
               instance = new JDBCHelper();//调用私有无参构造方法 ,需要在构造方法中实现数据库的链接
            }
         }
      }
      return instance;
   }

// 数据库连接池
private LinkedList<Connection> datasource = new LinkedList<Connection>();
//对于新增和删除操作addremoveLinedList比较占优势,因为ArrayList要移动数据。

private JDBCHelper() {

int datasourceSize = ConfigurationManager.getInteger(
Constants.JDBC_DATASOURCE_SIZE);
for(int i = 0; i < datasourceSize; i++) {
boolean local = ConfigurationManager.getBoolean(Constants.SPARK_LOCAL);
String url = null;
String user = null;
String password = null;
url = ConfigurationManager.getProperty(Constants.JDBC_URL);
user = ConfigurationManager.getProperty(Constants.JDBC_USER);
password = ConfigurationManager.getProperty(Constants.JDBC_PASSWORD);
try {
//经过循环在这里面创建了10 个数据库链接 ,并把连接放到LinkList 里面了
Connection conn = DriverManager.getConnection(url, user, password);
datasource.push(conn);
} catch (Exception e) {
e.printStackTrace();
}
}
}

public synchronized Connection getConnection() {
while(datasource.size() == 0) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return datasource.poll();
}

public int executeUpdate(String sql, Object[] params) {
int rtn = 0;
Connection conn = null;
PreparedStatement pstmt = null;

try {
conn = getConnection();
conn.setAutoCommit(false);

pstmt = conn.prepareStatement(sql);

if(params != null && params.length > 0) {
for(int i = 0; i < params.length; i++) {
pstmt.setObject(i + 1, params[i]);
}
}

     rtn = pstmt.executeUpdate()<span style="color: rgba(204, 120, 50, 1)">;


conn.commit();
} catch (Exception e) {
e.printStackTrace();
} finally {
if(conn != null) {
datasource.push(conn);
}
}

  <span style="color: rgba(204, 120, 50, 1)">return </span>rtn<span style="color: rgba(204, 120, 50, 1)">;

}

public void executeQuery(String sql, Object[] params, QueryCallback callback) {
Connection conn = null;
PreparedStatement pstmt = null;
ResultSet rs = null;

try {
conn = getConnection();
pstmt = conn.prepareStatement(sql);

if(params != null && params.length > 0) {
for(int i = 0; i < params.length; i++) {
pstmt.setObject(i + 1, params[i]);
}
}

     rs = pstmt.executeQuery()<span style="color: rgba(204, 120, 50, 1)">;


callback.process(rs);
} catch (Exception e) {
e.printStackTrace();
} finally {
if(conn != null) {
datasource.push(conn);
}
}
}

public int[] executeBatch(String sql, List<Object[]> paramsList) {
int[] rtn = null;
Connection conn = null;
PreparedStatement pstmt = null;

try {
conn = getConnection();

// 第一步:使用Connection对象,取消自动提交
conn.setAutoCommit(false);

pstmt = conn.prepareStatement(sql);

// 第二步:使用PreparedStatement.addBatch()方法加入批量的SQL参数
if(paramsList != null && paramsList.size() > 0) {
for(Object[] params : paramsList) {
for(int i = 0; i < params.length; i++) {
pstmt.setObject(i + 1, params[i]);
}
pstmt.addBatch();
}
}

     <span style="color: rgba(128, 128, 128, 1)">// </span><span style="font-family: &quot;宋体&quot;; color: rgba(128, 128, 128, 1)">第三步:使用</span><span style="color: rgba(128, 128, 128, 1)">PreparedStatement.executeBatch()</span><span style="font-family: &quot;宋体&quot;; color: rgba(128, 128, 128, 1)">方法,执行批量的</span><span style="color: rgba(128, 128, 128, 1)">SQL</span><span style="font-family: &quot;宋体&quot;; color: rgba(128, 128, 128, 1)">语句

rtn = pstmt.executeBatch();

// 最后一步:使用Connection对象,提交批量的SQL语句
conn.commit();
} catch (Exception e) {
e.printStackTrace();
} finally {
if(conn != null) {
datasource.push(conn);
}
}

  <span style="color: rgba(204, 120, 50, 1)">return </span>rtn<span style="color: rgba(204, 120, 50, 1)">;

}

/**
* 静态内部类:查询回调接口
* @author Administrator
*
*/
public static interface QueryCallback {

  <span style="color: rgba(98, 151, 85, 1); font-style: italic">/**

* 处理查询结果
* @param rs
* @throws Exception
*/
void process(ResultSet rs) throws Exception;

}

}


第五,由于是可以配置的,所以需要加入更多元素


import java.io.InputStream;
import java.util.Properties;

public class ConfigurationManager {

   private static Properties prop = new Properties();
   static {
      try {
        
         InputStream in = ConfigurationManager.class
               .getClassLoader().getResourceAsStream("my.properties");
         prop.load(in);  
      } catch (Exception e) {e.printStackTrace();  
      }
   }
   
   public static String getProperty(String key) {
      return prop.getProperty(key);
   }
  
   public static Integer getInteger(String key) {
      String value = getProperty(key);
      try {
         return Integer.valueOf(value);
      } catch (Exception e) {e.printStackTrace();
      }
      return 0;
   }
   
   public static Boolean getBoolean(String key) {
      String value = getProperty(key);
      try {
         return Boolean.valueOf(value);
      } catch (Exception e) {e.printStackTrace();
      }
      return false;
   }

   public static Long getLong(String key) {
      String value = getProperty(key);
      try {
         return Long.valueOf(value);
      } catch (Exception e) {e.printStackTrace();
      }
      return 0L;
   }

}