Spring, MyBatis 多数据源的配置和管理

同一个项目有时会涉及到多个数据库,也就是多数据源。多数据源又可以分为两种情况:

1)两个或多个数据库没有相关性,各自独立,其实这种可以作为两个项目来开发。比如在游戏开发中一个数据库是平台数据库,其它还有平台下的游戏对应的数据库;

2)两个或多个数据库是 master-slave 的关系,比如有 mysql 搭建一个 master-master,其后又带有多个 slave;或者采用 MHA 搭建的 master-slave 复制;

目前我所知道的 Spring 多数据源的搭建大概有两种方式,可以根据多数据源的情况进行选择。

1. 采用 spring 配置文件直接配置多个数据源

比如针对两个数据库没有相关性的情况,可以采用直接在 spring 的配置文件中配置多个数据源,然后分别进行事务的配置,如下所示:

    <context:component-scan base-package="net.aazj.service,net.aazj.aop" />
    <context:component-scan base-package="net.aazj.aop" />
    <!-- 引入属性文件 -->
    <context:property-placeholder location="classpath:config/db.properties" />
&lt;!-- 配置数据源 --&gt;
&lt;bean name="dataSource" <span style="color: rgba(0, 0, 255, 1)">class</span>="com.alibaba.druid.pool.DruidDataSource" init-method="init" destroy-method="close"&gt;
    &lt;property name="url" value="${jdbc_url}" /&gt;
    &lt;property name="username" value="${jdbc_username}" /&gt;
    &lt;property name="password" value="${jdbc_password}" /&gt;
    &lt;!-- 初始化连接大小 --&gt;
    &lt;property name="initialSize" value="0" /&gt;
    &lt;!-- 连接池最大使用连接数量 --&gt;
    &lt;property name="maxActive" value="20" /&gt;
    &lt;!-- 连接池最大空闲 --&gt;
    &lt;property name="maxIdle" value="20" /&gt;
    &lt;!-- 连接池最小空闲 --&gt;
    &lt;property name="minIdle" value="0" /&gt;
    &lt;!-- 获取连接最大等待时间 --&gt;
    &lt;property name="maxWait" value="60000" /&gt;
&lt;/bean&gt;

&lt;bean id="sqlSessionFactory" <span style="color: rgba(0, 0, 255, 1)">class</span>="org.mybatis.spring.SqlSessionFactoryBean"&gt;
  &lt;property name="dataSource" ref="dataSource" /&gt;
  &lt;property name="configLocation" value="classpath:config/mybatis-config.xml" /&gt;
  &lt;property name="mapperLocations" value="classpath*:config/mappers/**/*.xml" /&gt;
&lt;/bean&gt;

&lt;!-- Transaction manager <span style="color: rgba(0, 0, 255, 1)">for</span> a single JDBC DataSource --&gt;
&lt;bean id="transactionManager" <span style="color: rgba(0, 0, 255, 1)">class</span>="org.springframework.jdbc.datasource.DataSourceTransactionManager"&gt;
    &lt;property name="dataSource" ref="dataSource" /&gt;
&lt;/bean&gt;

&lt;!-- 使用annotation定义事务 --&gt;
&lt;tx:annotation-driven transaction-manager="transactionManager" /&gt; 

&lt;bean <span style="color: rgba(0, 0, 255, 1)">class</span>="org.mybatis.spring.mapper.MapperScannerConfigurer"&gt;
  &lt;property name="basePackage" value="net.aazj.mapper" /&gt;
  &lt;property name="sqlSessionFactoryBeanName" value="sqlSessionFactory"/&gt;
&lt;/bean&gt;<br>
&lt;!-- Enables the use of the @AspectJ style of Spring AOP --&gt;
&lt;aop:aspectj-autoproxy/&gt;

&lt;!-- ===============第二个数据源的配置=============== --&gt;
&lt;bean name="dataSource_2" <span style="color: rgba(0, 0, 255, 1)">class</span>="com.alibaba.druid.pool.DruidDataSource" init-method="init" destroy-method="close"&gt;
    &lt;property name="url" value="${jdbc_url_2}" /&gt;
    &lt;property name="username" value="${jdbc_username_2}" /&gt;
    &lt;property name="password" value="${jdbc_password_2}" /&gt;
    &lt;!-- 初始化连接大小 --&gt;
    &lt;property name="initialSize" value="0" /&gt;
    &lt;!-- 连接池最大使用连接数量 --&gt;
    &lt;property name="maxActive" value="20" /&gt;
    &lt;!-- 连接池最大空闲 --&gt;
    &lt;property name="maxIdle" value="20" /&gt;
    &lt;!-- 连接池最小空闲 --&gt;
    &lt;property name="minIdle" value="0" /&gt;
    &lt;!-- 获取连接最大等待时间 --&gt;
    &lt;property name="maxWait" value="60000" /&gt;
&lt;/bean&gt;

&lt;bean id="sqlSessionFactory_slave" <span style="color: rgba(0, 0, 255, 1)">class</span>="org.mybatis.spring.SqlSessionFactoryBean"&gt;
  &lt;property name="dataSource" ref="dataSource_2" /&gt;
  &lt;property name="configLocation" value="classpath:config/mybatis-config-2.xml" /&gt;
  &lt;property name="mapperLocations" value="classpath*:config/mappers2/**/*.xml" /&gt;
&lt;/bean&gt;

&lt;!-- Transaction manager <span style="color: rgba(0, 0, 255, 1)">for</span> a single JDBC DataSource --&gt;
&lt;bean id="transactionManager_2" <span style="color: rgba(0, 0, 255, 1)">class</span>="org.springframework.jdbc.datasource.DataSourceTransactionManager"&gt;
    &lt;property name="dataSource" ref="dataSource_2" /&gt;
&lt;/bean&gt;

&lt;!-- 使用annotation定义事务 --&gt;
&lt;tx:annotation-driven transaction-manager="transactionManager_2" /&gt; 

&lt;bean <span style="color: rgba(0, 0, 255, 1)">class</span>="org.mybatis.spring.mapper.MapperScannerConfigurer"&gt;
  &lt;property name="basePackage" value="net.aazj.mapper2" /&gt;
  &lt;property name="sqlSessionFactoryBeanName" value="sqlSessionFactory_2"/&gt;
&lt;/bean&gt;</pre>

如上所示,我们分别配置了两个 dataSource,两个 sqlSessionFactory,两个 transactionManager,以及关键的地方在于 MapperScannerConfigurer 的配置——使用sqlSessionFactoryBeanName属性,注入不同的 sqlSessionFactory 的名称,这样的话,就为不同的数据库对应的 mapper 接口注入了对应的 sqlSessionFactory。

需要注意的是,多个数据库的这种配置是不支持分布式事务的,也就是同一个事务中,不能操作多个数据库。这种配置方式的优点是很简单,但是却不灵活。对于 master-slave 类型的多数据源配置而言不太适应,master-slave 性的多数据源的配置,需要特别灵活,需要根据业务的类型进行细致的配置。比如对于一些耗时特别大的 select 语句,我们希望放到 slave 上执行,而对于 update,delete 等操作肯定是只能在 master 上执行的,另外对于一些实时性要求很高的 select 语句,我们也可能需要放到 master 上执行——比如一个场景是我去商城购买一件兵器,购买操作的很定是 master,同时购买完成之后,需要重新查询出我所拥有的兵器和金币,那么这个查询可能也需要防止 master 上执行,而不能放在 slave 上去执行,因为 slave 上可能存在延时,我们可不希望玩家发现购买成功之后,在背包中却找不到兵器的情况出现。

所以对于 master-slave 类型的多数据源的配置,需要根据业务来进行灵活的配置,哪些 select 可以放到 slave 上,哪些 select 不能放到 slave 上。所以上面的那种所数据源的配置就不太适应了。

2. 基于 AbstractRoutingDataSource 和 AOP 的多数据源的配置

基本原理是,我们自己定义一个 DataSource 类 ThreadLocalRountingDataSource,来继承 AbstractRoutingDataSource,然后在配置文件中向 ThreadLocalRountingDataSource 注入 master 和 slave 的数据源,然后通过 AOP 来灵活配置,在哪些地方选择  master 数据源,在哪些地方需要选择 slave 数据源。下面看代码实现:

1)先定义一个 enum 来表示不同的数据源:

package net.aazj.enums;

/**

  • 数据源的类别:master/slave
    */
    public enum DataSources {
    MASTER, SLAVE
    }

2)通过 TheadLocal 来保存每个线程选择哪个数据源的标志 (key):

package net.aazj.util;

import net.aazj.enums.DataSources;

public class DataSourceTypeManager {
private static final ThreadLocal<DataSources> dataSourceTypes = new ThreadLocal<DataSources>(){
@Override
protected DataSources initialValue(){
return DataSources.MASTER;
}
};

</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">static</span><span style="color: rgba(0, 0, 0, 1)"> DataSources get(){
    </span><span style="color: rgba(0, 0, 255, 1)">return</span><span style="color: rgba(0, 0, 0, 1)"> dataSourceTypes.get();
}

</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">static</span> <span style="color: rgba(0, 0, 255, 1)">void</span><span style="color: rgba(0, 0, 0, 1)"> set(DataSources dataSourceType){
    dataSourceTypes.set(dataSourceType);
}

</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">static</span> <span style="color: rgba(0, 0, 255, 1)">void</span><span style="color: rgba(0, 0, 0, 1)"> reset(){
    dataSourceTypes.set(DataSources.MASTER0);
}

}

3)定义 ThreadLocalRountingDataSource,继承 AbstractRoutingDataSource:

package net.aazj.util;

import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;

public class ThreadLocalRountingDataSource extends AbstractRoutingDataSource {
@Override
protected Object determineCurrentLookupKey() {
return DataSourceTypeManager.get();
}
}

4)在配置文件中向 ThreadLocalRountingDataSource 注入 master 和 slave 的数据源:

    <context:component-scan base-package="net.aazj.service,net.aazj.aop" />
    <context:component-scan base-package="net.aazj.aop" />
    <!-- 引入属性文件 -->
    <context:property-placeholder location="classpath:config/db.properties" />    
    <!-- 配置数据源 Master -->
    <bean name="dataSourceMaster" class="com.alibaba.druid.pool.DruidDataSource" init-method="init" destroy-method="close">
        <property name="url" value="${jdbc_url}" />
        <property name="username" value="${jdbc_username}" />
        <property name="password" value="${jdbc_password}" />
        <!-- 初始化连接大小 -->
        <property name="initialSize" value="0" />
        <!-- 连接池最大使用连接数量 -->
        <property name="maxActive" value="20" />
        <!-- 连接池最大空闲 -->
        <property name="maxIdle" value="20" />
        <!-- 连接池最小空闲 -->
        <property name="minIdle" value="0" />
        <!-- 获取连接最大等待时间 -->
        <property name="maxWait" value="60000" />
    </bean>    
    <!-- 配置数据源 Slave -->
    <bean name="dataSourceSlave" class="com.alibaba.druid.pool.DruidDataSource" init-method="init" destroy-method="close">
        <property name="url" value="${jdbc_url_slave}" />
        <property name="username" value="${jdbc_username_slave}" />
        <property name="password" value="${jdbc_password_slave}" />
        <!-- 初始化连接大小 -->
        <property name="initialSize" value="0" />
        <!-- 连接池最大使用连接数量 -->
        <property name="maxActive" value="20" />
        <!-- 连接池最大空闲 -->
        <property name="maxIdle" value="20" />
        <!-- 连接池最小空闲 -->
        <property name="minIdle" value="0" />
        <!-- 获取连接最大等待时间 -->
        <property name="maxWait" value="60000" />
    </bean>    
    <bean id="dataSource" class="net.aazj.util.ThreadLocalRountingDataSource">
        <property name="defaultTargetDataSource" ref="dataSourceMaster" />
        <property name="targetDataSources">
            <map key-type="net.aazj.enums.DataSources">
                <entry key="MASTER" value-ref="dataSourceMaster"/>
                <entry key="SLAVE" value-ref="dataSourceSlave"/>
                <!-- 这里还可以加多个 dataSource -->
            </map>
        </property>
    </bean>    
    <bean id="sqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean">
      <property name="dataSource" ref="dataSource" />
      <property name="configLocation" value="classpath:config/mybatis-config.xml" />
      <property name="mapperLocations" value="classpath*:config/mappers/**/*.xml" />
    </bean>    
    <!-- Transaction manager for a single JDBC DataSource -->
    <bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
        <property name="dataSource" ref="dataSource" />
    </bean>    
    <!-- 使用 annotation 定义事务 -->
    <tx:annotation-driven transaction-manager="transactionManager" /> 
    <bean class="org.mybatis.spring.mapper.MapperScannerConfigurer">
      <property name="basePackage" value="net.aazj.mapper" />
      <!-- <property name="sqlSessionFactoryBeanName" value="sqlSessionFactory"/> -->
    </bean>        

上面 spring 的配置文件中,我们针对 master 数据库和 slave 数据库分别定义了 dataSourceMaster 和 dataSourceSlave 两个 dataSource,然后注入到 <bean id="dataSource" class="net.aazj.util.ThreadLocalRountingDataSource"> 中,这样我们的 dataSource 就可以来根据 key 的不同来选择 dataSourceMaster 和 dataSourceSlave 了。

5)使用 Spring AOP 来指定 dataSource 的 key ,从而 dataSource 会根据 key 选择 dataSourceMaster 和 dataSourceSlave:

package net.aazj.aop;

import net.aazj.enums.DataSources;
import net.aazj.util.DataSourceTypeManager;

import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.aspectj.lang.annotation.Pointcut;
import org.springframework.stereotype.Component;

@Aspect // for aop
@Component // for auto scan
@Order(0)  // execute before @Transactional

public class DataSourceInterceptor {
@Pointcut(
"execution(public * net.aazj.service..*.getUser(..))")
public void dataSourceSlave(){};

@Before(</span>"dataSourceSlave()"<span style="color: rgba(0, 0, 0, 1)">)
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span><span style="color: rgba(0, 0, 0, 1)"> before(JoinPoint jp) {
    DataSourceTypeManager.set(DataSources.SLAVE);
}<br>    // ... ...

}

这里我们定义了一个 Aspect 类,我们使用 @Before 来在符合 @Pointcut("execution(public * net.aazj.service..*.getUser(..))") 中的方法被调用之前,调用 DataSourceTypeManager.set(DataSources.SLAVE) 设置了 key 的类型为 DataSources.SLAVE,所以 dataSource 会根据 key=DataSources.SLAVE 选择 dataSourceSlave 这个 dataSource。所以该方法对于的 sql 语句会在 slave 数据库上执行 ( 经网友老刘 1987提醒,这里存在多个Aspect 之间的一个执行顺序的问题,必须保证切换数据源的 Aspect 必须在 @Transactional 这个 Aspect 之前执行,所以这里使用了 @Order(0) 来保证切换数据源先于 @Transactional 执行)。

我们可以不断的扩充 DataSourceInterceptor  这个 Aspect,在中进行各种各样的定义,来为某个 service 的某个方法指定合适的数据源对应的 dataSource。

这样我们就可以使用 Spring AOP 的强大功能来,十分灵活进行配置了。

6)AbstractRoutingDataSource 原理剖析

ThreadLocalRountingDataSource 继承了 AbstractRoutingDataSource,实现其抽象方法 protected abstract Object determineCurrentLookupKey(); 从而实现对不同数据源的路由功能。我们从源码入手分析下其中原理:

public abstract class AbstractRoutingDataSource extends AbstractDataSource implements InitializingBean
AbstractRoutingDataSource 实现了 InitializingBean 那么 spring 在初始化该 bean 时,会调用 InitializingBean 的接口
void afterPropertiesSet() throws Exception; 我们看下 AbstractRoutingDataSource 是如何实现这个接口的:
    @Override
    public void afterPropertiesSet() {
        if (this.targetDataSources == null) {
            throw new IllegalArgumentException("Property'targetDataSources'is required");}
        this.resolvedDataSources = new HashMap<Object, DataSource>(this.targetDataSources.size());
        for (Map.Entry<Object, Object> entry : this.targetDataSources.entrySet()) {
            Object lookupKey = resolveSpecifiedLookupKey(entry.getKey());
            DataSource dataSource = resolveSpecifiedDataSource(entry.getValue());
            this.resolvedDataSources.put(lookupKey, dataSource);
        }
        if (this.defaultTargetDataSource != null) {
            this.resolvedDefaultDataSource = resolveSpecifiedDataSource(this.defaultTargetDataSource);}
    }
targetDataSources 是我们在 xml 配置文件中注入的 dataSourceMaster 和 dataSourceSlave. afterPropertiesSet方法就是使用注入的
dataSourceMaster 和 dataSourceSlave 来构造一个 HashMap——resolvedDataSources。方便后面根据 key 从该 map 中取得对应的 dataSource
我们在看下 AbstractDataSource 接口中的 Connection getConnection() throws SQLException; 是如何实现的:
    @Override
    public Connection getConnection() throws SQLException {
        return determineTargetDataSource().getConnection();
    }

关键在于 determineTargetDataSource(),根据方法名就可以看出,应该此处就决定了使用哪个 dataSource :

    protected DataSource determineTargetDataSource() {
        Assert.notNull(this.resolvedDataSources, "DataSource router not initialized");
        Object lookupKey = determineCurrentLookupKey();
        DataSource dataSource = this.resolvedDataSources.get(lookupKey);
        if (dataSource == null && (this.lenientFallback || lookupKey == null)) {
            dataSource = this.resolvedDefaultDataSource;
        }
        if (dataSource == null) {
            throw new IllegalStateException("Cannot determine target DataSource for lookup key [" + lookupKey + "]");}
        return dataSource;
    }
Object lookupKey = determineCurrentLookupKey(); 该方法是我们实现的,在其中获取 ThreadLocal 中保存的 key 值。获得了 key 之后,
在从afterPropertiesSet()中初始化好了的resolvedDataSources这个 map 中获得 key 对应的 dataSource。而ThreadLocal 中保存的 key 值
是通过 AOP 的方式在调用 service 中相关方法之前设置好的。OK,到此搞定!

7)扩展 ThreadLocalRountingDataSource

上面我们只是实现了 master-slave 数据源的选择。如果有多台 master 或者有多台 slave。多台 master 组成一个 HA,要实现当其中一台 master 挂了是,自动切换到另一台 master,这个功能可以使用 LVS/Keepalived 来实现,也可以通过进一步扩展 ThreadLocalRountingDataSource 来实现,可以另外加一个线程专门来每个一秒来测试 mysql 是否正常来实现。同样对于多台 slave 之间要实现负载均衡,同时当一台 slave 挂了时,要实现将其从负载均衡中去除掉,这个功能既可以使用 LVS/Keepalived 来实现,同样也可以通过近一步扩展ThreadLocalRountingDataSource 来实现

3. 总结

从本文中我们可以体会到 AOP 的强大和灵活。

本文使用的是 mybatis, 其实使用 Hibernate 也应该是相似的配置。