Hmily:高性能异步分布式事务TCC框架

Hmily 框架特性

  • 无缝集成 Spring,Spring Boot Starter。

  • 无缝集成 Dubbo,SpringCloud,Motan 等 rpc 框架。

  • 多种事务日志的存储方式(redis,mongodb,mysql 等)。

  • 多种不同日志序列化方式(Kryo,protostuff,hessian)。

  • 事务自动恢复。

  • 支持内嵌事务的依赖传递。

  • 代码零侵入, 配置简单灵活。

Hmily 为什么这么高性能?

1. 采用 disruptor 进行事务日志的异步读写(disruptor 是一个无锁,无 GC 的并发编程框架)

package com.hmily.tcc.core.disruptor.publisher;
import com.hmily.tcc.common.bean.entity.TccTransaction;
import com.hmily.tcc.common.enums.EventTypeEnum;
import com.hmily.tcc.core.concurrent.threadpool.HmilyThreadFactory;
import com.hmily.tcc.core.coordinator.CoordinatorService;
import com.hmily.tcc.core.disruptor.event.HmilyTransactionEvent;
import com.hmily.tcc.core.disruptor.factory.HmilyTransactionEventFactory;
import com.hmily.tcc.core.disruptor.handler.HmilyConsumerDataHandler;
import com.hmily.tcc.core.disruptor.translator.HmilyTransactionEventTranslator;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.IgnoreExceptionHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Event publisher: owns the Disruptor ring buffer onto which transaction
 * events are published, and the worker pool that consumes them.
 *
 * <p>{@link #start(int, int)} must be called before {@link #publishEvent(TccTransaction, int)}.
 *
 * @author xiaoyu(Myth)
 */
@Component
public class HmilyTransactionEventPublisher implements DisposableBean {

    private Disruptor<HmilyTransactionEvent> disruptor;

    /** Pool handed to every consumer handler; kept so it can be shut down in {@link #destroy()}. */
    private ThreadPoolExecutor executor;

    private final CoordinatorService coordinatorService;

    @Autowired
    public HmilyTransactionEventPublisher(final CoordinatorService coordinatorService) {
        this.coordinatorService = coordinatorService;
    }

    /**
     * disruptor start.
     *
     * @param bufferSize this is disruptor buffer size.
     * @param threadSize this is disruptor consumer thread size; it is also used as the
     *                   core and maximum size of the executor passed to the consumers.
     */
    public void start(final int bufferSize, final int threadSize) {
        // The counter must live outside the factory lambda: creating it inside
        // the lambda would reset it on every call and name every thread
        // "disruptor-thread-1".
        final AtomicInteger index = new AtomicInteger(1);
        disruptor = new Disruptor<>(new HmilyTransactionEventFactory(), bufferSize, r -> {
            // Block lambda with an explicit return keeps this unambiguously a
            // ThreadFactory (value-returning) rather than an Executor.
            return new Thread(null, r, "disruptor-thread-" + index.getAndIncrement());
        }, ProducerType.MULTI, new BlockingWaitStrategy());

        executor = new ThreadPoolExecutor(threadSize, threadSize, 0, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(),
                HmilyThreadFactory.create("hmily-log-disruptor", false),
                new ThreadPoolExecutor.AbortPolicy());

        // One handler per consumer thread; all of them share the same executor.
        final HmilyConsumerDataHandler[] consumers = new HmilyConsumerDataHandler[threadSize];
        for (int i = 0; i < threadSize; i++) {
            consumers[i] = new HmilyConsumerDataHandler(executor, coordinatorService);
        }
        disruptor.handleEventsWithWorkerPool(consumers);
        disruptor.setDefaultExceptionHandler(new IgnoreExceptionHandler());
        disruptor.start();
    }

    /**
     * publish disruptor event.
     *
     * @param tccTransaction {@linkplain com.hmily.tcc.common.bean.entity.TccTransaction }
     * @param type           {@linkplain EventTypeEnum}
     */
    public void publishEvent(final TccTransaction tccTransaction, final int type) {
        final RingBuffer<HmilyTransactionEvent> ringBuffer = disruptor.getRingBuffer();
        ringBuffer.publishEvent(new HmilyTransactionEventTranslator(type), tccTransaction);
    }

    /**
     * Shuts down the disruptor and then the consumer executor. Safe to call
     * even if {@link #start(int, int)} was never invoked.
     */
    @Override
    public void destroy() {
        if (disruptor != null) {
            disruptor.shutdown();
        }
        if (executor != null) {
            // Orderly shutdown: tasks already handed to the pool still run,
            // and the worker threads are released afterwards.
            executor.shutdown();
        }
    }
}

在这里 bufferSize 的默认值是 4096 * 4, 用户可以根据自身的情况进行配置。

// Build one consumer handler per thread; each receives the same executor
// and coordinator service.
HmilyConsumerDataHandler[] consumers = new HmilyConsumerDataHandler[threadSize];
for (int slot = 0; slot < threadSize; slot++) {
    consumers[slot] = new HmilyConsumerDataHandler(executor, coordinatorService);
}
// Register the handlers with the disruptor as a worker pool.
disruptor.handleEventsWithWorkerPool(consumers);

这里是采用多个消费者去处理队列里面的任务。

2. 异步执行 confirm,cancel 方法。

package com.hmily.tcc.core.service.handler;

import com.hmily.tcc.common.bean.context.TccTransactionContext;
import com.hmily.tcc.common.bean.entity.TccTransaction;
import com.hmily.tcc.common.enums.TccActionEnum;
import com.hmily.tcc.core.concurrent.threadpool.HmilyThreadFactory;
import com.hmily.tcc.core.service.HmilyTransactionHandler;
import com.hmily.tcc.core.service.executor.HmilyTransactionExecutor;
import org.aspectj.lang.ProceedingJoinPoint;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Handler used on the transaction starter side: runs the try phase on the
 * calling thread and hands confirm / cancel to a thread pool.
 *
 * @author xiaoyu
 */
@Component
public class StarterHmilyTransactionHandler implements HmilyTransactionHandler {

    private static final int MAX_THREAD = Runtime.getRuntime().availableProcessors() << 1;

    private final HmilyTransactionExecutor hmilyTransactionExecutor;

    /** Fixed-size pool on which confirm and cancel are executed. */
    private final Executor executor = new ThreadPoolExecutor(
            MAX_THREAD, MAX_THREAD, 0, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(),
            HmilyThreadFactory.create("hmily-execute", false),
            new ThreadPoolExecutor.AbortPolicy());

    @Autowired
    public StarterHmilyTransactionHandler(final HmilyTransactionExecutor hmilyTransactionExecutor) {
        this.hmilyTransactionExecutor = hmilyTransactionExecutor;
    }

    /**
     * Begins a transaction, proceeds with the intercepted call, then submits
     * confirm (on success) or cancel (on failure) to the executor.
     *
     * @param point   the intercepted join point
     * @param context the transaction context (not read by this handler)
     * @return whatever the intercepted call returned
     * @throws Throwable anything thrown while running the try phase, rethrown
     *                   after cancel has been submitted
     */
    @Override
    public Object handler(final ProceedingJoinPoint point, final TccTransactionContext context)
            throws Throwable {
        try {
            final TccTransaction started = hmilyTransactionExecutor.begin(point);
            final Object result = runTryPhase(point, started);
            // try phase succeeded: execute confirm asynchronously
            final TccTransaction toConfirm = hmilyTransactionExecutor.getCurrentTransaction();
            executor.execute(() -> hmilyTransactionExecutor.confirm(toConfirm));
            return result;
        } finally {
            hmilyTransactionExecutor.remove();
        }
    }

    /**
     * Executes the try phase and records the TRYING status; on any failure
     * submits cancel for the current transaction and rethrows.
     */
    private Object runTryPhase(final ProceedingJoinPoint point, final TccTransaction started)
            throws Throwable {
        try {
            final Object result = point.proceed();
            started.setStatus(TccActionEnum.TRYING.getCode());
            hmilyTransactionExecutor.updateStatus(started);
            return result;
        } catch (Throwable failure) {
            // try phase failed: execute cancel asynchronously, then propagate
            final TccTransaction toCancel = hmilyTransactionExecutor.getCurrentTransaction();
            executor.execute(() -> hmilyTransactionExecutor.cancel(toCancel));
            throw failure;
        }
    }
}

当 try 方法的 AOP 切面有异常的时候,采用线程池异步去执行 cancel,无异常的时候去执行 confirm 方法。

这里有人可能会问:那么 cancel 方法异常,或者 confirm 方法异常怎么办呢?
答:首先这种情况是非常罕见的,因为你上一步才刚刚执行完 try。其次如果出现这种情况,由于在 try 阶段已经保存好日志,Hmily 有内置的调度线程池来进行恢复,不用担心。

有人又会问:这里如果日志保存异常了怎么办?
答:这又是一个牛角尖问题。首先,日志配置的参数在框架启动的时候就会要求你配置好。其次,就算在运行过程中日志保存异常,这时候框架会取缓存中的数据,并不会影响程序正确执行。最后,万一日志保存异常了,系统又在很极端的情况下宕机了,恭喜你,你可以去买彩票了,最好的解决办法就是不去解决它。

3.ThreadLocal 缓存的使用。
/**
 * Starts a transaction on the starter side.
 *
 * <p>Builds the transaction, stores it in the thread-local holder, publishes
 * a SAVE event for it, and installs a TRYING / START context for the
 * current thread.
 *
 * @param point cut point.
 * @return the newly built TccTransaction
 */
public TccTransaction begin(final ProceedingJoinPoint point) {
    LogUtil.debug(LOGGER, () -> "......hmily transaction!start....");

    // build the transaction and keep it in the threadLocal
    final TccTransaction started = buildTccTransaction(point, TccRoleEnum.START.getCode(), null);
    CURRENT.set(started);

    // publish the save event
    hmilyTransactionEventPublisher.publishEvent(started, EventTypeEnum.SAVE.getCode());

    // context that is transferred to remote participants: action is try, role is starter
    final TccTransactionContext startContext = new TccTransactionContext();
    startContext.setAction(TccActionEnum.TRYING.getCode());
    startContext.setTransId(started.getTransId());
    startContext.setRole(TccRoleEnum.START.getCode());
    TransactionContextLocal.getInstance().set(startContext);

    return started;
}

首先要理解,ThreadLocal 保存的是发起者一方的事务信息。这一点很重要,否则容易搞混。rpc 的调用会形成调用链,调用链上的参与者信息会被登记并保存。

/**
 * Registers a participant on the current transaction, if there is one,
 * and then updates the participant record. A null participant is ignored.
 *
 * @param participant {@linkplain Participant}
 */
public void enlistParticipant(final Participant participant) {
    if (Objects.nonNull(participant)) {
        Optional.ofNullable(getCurrentTransaction())
                .ifPresent(current -> {
                    current.registerParticipant(participant);
                    updateParticipant(current);
                });
    }
}
4.GuavaCache 的使用
package com.hmily.tcc.core.cache;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.Weigher;
import com.hmily.tcc.common.bean.entity.TccTransaction;
import com.hmily.tcc.core.coordinator.CoordinatorService;
import com.hmily.tcc.core.helper.SpringBeanUtils;
import org.apache.commons.lang3.StringUtils;

import java.util.Optional;
import java.util.concurrent.ExecutionException;

/**
 * Singleton wrapper around a Guava {@link LoadingCache} of
 * {@link TccTransaction}s keyed by transaction id. On a cache miss the
 * transaction is looked up through the {@link CoordinatorService}.
 *
 * @author xiaoyu
 */
public final class TccTransactionCacheManager {

    /** Maximum number of transactions kept in the cache. */
    private static final int MAX_COUNT = 10000;

    // Bounded by entry count. The previous maximumWeight + weigher returned the
    // current cache size as each entry's weight, so the total weight grew
    // quadratically and eviction started far below MAX_COUNT entries.
    private static final LoadingCache<String, TccTransaction> LOADING_CACHE =
            CacheBuilder.newBuilder()
                    .maximumSize(MAX_COUNT)
                    .build(new CacheLoader<String, TccTransaction>() {
                        @Override
                        public TccTransaction load(final String key) {
                            return cacheTccTransaction(key);
                        }
                    });

    private static CoordinatorService coordinatorService = SpringBeanUtils.getInstance().getBean(CoordinatorService.class);

    private static final TccTransactionCacheManager TCC_TRANSACTION_CACHE_MANAGER = new TccTransactionCacheManager();

    private TccTransactionCacheManager() {

    }

    /**
     * TccTransactionCacheManager.
     *
     * @return TccTransactionCacheManager
     */
    public static TccTransactionCacheManager getInstance() {
        return TCC_TRANSACTION_CACHE_MANAGER;
    }

    /**
     * Cache loader: fetch the transaction from the coordinator service, or
     * fall back to a new empty TccTransaction when nothing is found.
     */
    private static TccTransaction cacheTccTransaction(final String key) {
        // orElseGet builds the fallback only on a miss instead of on every load.
        return Optional.ofNullable(coordinatorService.findByTransId(key)).orElseGet(TccTransaction::new);
    }

    /**
     * cache tccTransaction.
     *
     * @param tccTransaction {@linkplain TccTransaction}
     */
    public void cacheTccTransaction(final TccTransaction tccTransaction) {
        LOADING_CACHE.put(tccTransaction.getTransId(), tccTransaction);
    }

    /**
     * acquire TccTransaction.
     *
     * @param key this guava key.
     * @return {@linkplain TccTransaction}; a new empty instance if loading
     *         failed with an ExecutionException
     */
    public TccTransaction getTccTransaction(final String key) {
        try {
            return LOADING_CACHE.get(key);
        } catch (ExecutionException e) {
            // Deliberate best-effort: callers get an empty transaction
            // rather than an exception.
            return new TccTransaction();
        }
    }

    /**
     * remove guava cache by key.
     *
     * @param key guava cache key; null or empty keys are ignored.
     */
    public void removeByKey(final String key) {
        if (StringUtils.isNotEmpty(key)) {
            LOADING_CACHE.invalidate(key);
        }
    }

}

在发起者中,我们使用了 ThreadLocal,而在参与者中,我们为什么不使用呢?

其实原因有两点:首先,try 和 confirm 可能不在同一个线程里执行,会造成 ThreadLocal 失效;其次,考虑到 RPC 集群的时候,请求可能会被负载到不同的机器上。

这里有一个细节就是:

/**
 * Loads a transaction by id through the coordinator service; when nothing
 * is found, returns a new empty TccTransaction instead of null.
 */
private static TccTransaction cacheTccTransaction(final String key) {
    // orElseGet builds the fallback only when the lookup returned null,
    // instead of allocating a throwaway instance on every call.
    return Optional.ofNullable(coordinatorService.findByTransId(key))
            .orElseGet(TccTransaction::new);
}

当 GuavaCache 里面没有的时候,会去查询日志返回,这样就保证了对集群环境的支持。以上 4 点,正是 Hmily 成为一个异步的高性能分布式事务 TCC 框架的原因。

Hmily 如何使用?
(https://github.com/yu199195/hmily/tree/master/hmily-tcc-demo)

首先因为之前的包命名问题,框架包并没有上传到 maven 中心仓库,故需要使用者自己拉取代码,编译 deploy 到自己的私服。

1.dubbo 用户

  • 在你的 Api 接口项目引入
    <dependency>
        <groupId>com.hmily.tcc</groupId>
        <artifactId>hmily-tcc-annotation</artifactId>
        <version>{your version}</version>
    </dependency>
  • 在你的服务提供者项目引入
    <dependency>
        <groupId>com.hmily.tcc</groupId>
        <artifactId>hmily-tcc-dubbo</artifactId>
        <version>{your version}</version>
    </dependency>
  • 配置启动 bean
    <!-- Aspect 切面配置,是否开启 AOP 切面-->
    <aop:aspectj-autoproxy expose-proxy="true"/>
    <!--扫描框架的包-->
    <context:component-scan base-package="com.hmily.tcc.*"/>
    <!--启动类属性配置-->
    <bean id="hmilyTransactionBootstrap" class="com.hmily.tcc.core.bootstrap.HmilyTransactionBootstrap">
        <property name="serializer" value="kryo"/>
        <property name="recoverDelayTime" value="120"/>
        <property name="retryMax" value="3"/>
        <property name="scheduledDelay" value="120"/>
        <property name="scheduledThreadMax" value="4"/>
        <property name="repositorySupport" value="db"/>
        <property name="tccDbConfig">
            <bean class="com.hmily.tcc.common.config.TccDbConfig">
                <property name="url"
                          value="jdbc:mysql://192.168.1.98:3306/tcc?useUnicode=true&amp;characterEncoding=utf8"/>
                <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
                <property name="username" value="root"/>
                <property name="password" value="123456"/>
            </bean>
        </property>
    </bean>

当然配置属性很多,这里只给出了 demo,具体可以参考这个类:

package com.hmily.tcc.common.config;

import com.hmily.tcc.common.enums.RepositorySupportEnum;
import lombok.Data;

/**
 * Hmily configuration properties.
 *
 * @author xiaoyu
 */
@Data
public class TccConfig {

    /**
     * Resource suffix, i.e. where the transaction log is stored.
     * For a table-based store this is the table suffix; other stores use it the same way.
     * If it is not filled in, the applicationName of the application is used by default.
     */
    private String repositorySuffix;

    /**
     * Log serializer.
     * {@linkplain com.hmily.tcc.common.enums.SerializeEnum}
     */
    private String serializer = "kryo";

    /**
     * Thread count of the scheduled pool.
     */
    private int scheduledThreadMax = 2 * Runtime.getRuntime().availableProcessors();

    /**
     * Delay of the scheduled pool, in seconds.
     */
    private int scheduledDelay = 60;

    /**
     * Maximum number of retries.
     */
    private int retryMax = 3;

    /**
     * Recover delay time, in seconds
     * (how many seconds after the local transaction was created before execution).
     */
    private int recoverDelayTime = 60;

    /**
     * Parameter used when participants perform their own recovery,
     * 1. such as when RPC calls time out
     * 2. such as when the starter machine goes down.
     */
    private int loadFactor = 2;

    /**
     * Repository support.
     * {@linkplain RepositorySupportEnum}
     */
    private String repositorySupport = "db";

    /**
     * Disruptor buffer size.
     */
    private int bufferSize = 4096 << 2;

    /**
     * Number of disruptor consumer threads.
     */
    private int consumerThreads = 2 * Runtime.getRuntime().availableProcessors();

    /**
     * Database config.
     */
    private TccDbConfig tccDbConfig;

    /**
     * Mongo config.
     */
    private TccMongoConfig tccMongoConfig;

    /**
     * Redis config.
     */
    private TccRedisConfig tccRedisConfig;

    /**
     * Zookeeper config.
     */
    private TccZookeeperConfig tccZookeeperConfig;

    /**
     * File config.
     */
    private TccFileConfig tccFileConfig;

}

2.SpringCloud 用户

  • 需要引入
     <dependency>
           <groupId>com.hmily.tcc</groupId>
           <artifactId>hmily-tcc-springcloud</artifactId>
           <version>{your version}</version>
    </dependency>
  • 配置启动 bean 如上。

3.Motan 用户

  • 需要引入
    <dependency>
        <groupId>com.hmily.tcc</groupId>
        <artifactId>hmily-tcc-motan</artifactId>
        <version>{your version}</version>
     </dependency>
  • 配置启动 bean 如上。

hmily-spring-boot-start

  • 那这个就更容易了,只需要根据你的 RPC 框架去引入不同的 jar 包。
  • 如果你是 dubbo 用户,那么引入
    <dependency>
         <groupId>com.hmily.tcc</groupId>
         <artifactId>hmily-tcc-spring-boot-starter-dubbo</artifactId>
         <version>${your version}</version>
     </dependency>
  • 如果你是 SpringCloud 用户,那么引入
    <dependency>
         <groupId>com.hmily.tcc</groupId>
         <artifactId>hmily-tcc-spring-boot-starter-springcloud</artifactId>
         <version>${your version}</version>
     </dependency>
  • 如果你是 Motan 用户,那么引入
    <dependency>
         <groupId>com.hmily.tcc</groupId>
         <artifactId>hmily-tcc-spring-boot-starter-motan</artifactId>
         <version>${your version}</version>
     </dependency>
  • 然后在你的 yml 里面进行如下配置:
    hmily:
        tcc :
            serializer : kryo
            recoverDelayTime : 128
            retryMax : 3
            scheduledDelay : 128
            scheduledThreadMax :  10
            repositorySupport : db
            tccDbConfig :
                     driverClassName  : com.mysql.jdbc.Driver
                     url :  jdbc:mysql://192.168.1.98:3306/tcc?useUnicode=true&characterEncoding=utf8
                     username : root
                     password : 123456
    
        #repositorySupport : redis
        #tccRedisConfig:
                 #masterName: mymaster
                 #sentinel : true
                 #sentinelUrl : 192.168.1.91:26379;192.168.1.92:26379;192.168.1.93:26379
                 #password  : foobaredbbexONE123
    
    
       # repositorySupport : zookeeper
       #         host      : 192.168.1.73:2181
       #         sessionTimeOut      :  100000
       #         rootPath  : /tcc
    
       # repositorySupport : mongodb
       #       mongoDbUrl  : 192.168.1.68:27017
       #       mongoDbName  :  happylife
       #       mongoUserName  : xiaoyu
       #       mongoUserPwd   : 123456
    
       # repositorySupport : file
       #         path      : /account
       #         prefix    :  account
    

就这么简单,然后就可以在接口方法上加上 @Tcc 注解,进行愉快的使用了。当然因为篇幅问题,很多东西只是简单的描述,尤其是逻辑方面的。

下面是 github 地址:https://github.com/yu199195/hmily