Java使用Redis实现分布式锁

1、概述

此处使用 Redis 的 setNx 命令和 expire 命令和 del 命令来实现分布式锁。

首先我们要知道, 我们的 redis 执行命令是队列方式的,并不存在多个命令同时运行,所有命令都是串行的访问。那么这就说明我们多个客户端连接 Redis 的时候不存在其并发的问题。

其实实现分布式锁并不仅仅可以使用 Redis 完成,也可以使用其他的方式来完成,最主要的目的就是有一个地方能作为锁状态,然后通过这个锁的状态来实现代码中的功能。只要我们这个锁操作的时候是是串行的,那么就能实现分布式锁。

其实有一个问题,为什么我们不使用 Java 中的 synchronized 而要去搞一个分布式锁呢?其实就是因为现在都是分布式环境,而 Java 内置的 synchronized 是针对单个 Java 进程的锁,而分布式环境下有 n 个 Java 进程,而分布式锁实现的多个 Java 进程之间的锁。

那么为什么我们要使用 setNx 命令,而不使用其他命令呢?例如 get 命令,这种当我们获取到 key 以后,可能已经是脏数据了,而我们的 setNx 的意思是,我们设置一个 key,如果此 key 已经存在,那么则返回 0,不存在则返回 1 并设置成功,我们就可以利用这个方式来实现所谓的分布式锁。

注意,分布式锁实现最重要的地方就是有一个步骤能做到串行且不会脏数据。

废话不多说直接上现成的方法。

2、代码

/**
* Redis 锁工具类
*
* @author dh
* @date 20211028103258
**/
@Component
public class RedisLockHelper {
@Autowired
public RedisTemplate redisTemplate;
/**
* 获取锁
* @param key 锁 key
* @param seconds 最大锁时间
* @return true: 成功,false: 失败
*/
public boolean lock(String key,Long seconds){
return (Boolean) redisTemplate.execute((RedisCallback) connection -> {
/** 如果不存在, 那么则 true, 则允许执行, */
Boolean acquire = connection.setNX(key.getBytes(), String.valueOf(key).getBytes());
/** 防止死锁, 将其 key 设置过期时间 */
connection.expire(key.getBytes(), seconds);
if (acquire) {
return true;
}
return false;
});
}
/**
* 删除锁
* @param key
*/
public void delete(String key) {
redisTemplate.delete(key);
}
}

3、案例

如果理解力强的朋友拿到这个方法就很快的能实现业务中的功能,我们这里给一个防止重复提交的实现案例。

防重复提交注解 RepeatSubmitIntercept

/**
* 重复提交拦截注解
* @author dh
*/
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface RepeatSubmitIntercept {
/**
* 最大阻挡时间,默认 5s
*/
long maxTime() default 5L;
/**
* 重复提交时返回 msg
*/
String errorTitle() default "当前操作重复!";
/**
* 拦截方式:
* 1、如果为 0: 那么则根据当前用户拦截, 那么当前方法该用户在上次请求完成前内只能访问一次.
* 2、如果为 1: 那么则根据当前指定参数名进行拦截, 那么当前方法该用户同一参数在上次请求完成前只能访问一次.
*/
int type() default 0;
/**
* 拦截方式:
* 如果拦截方式为 0, 那么根据请求头来判断用户
*/
String userHead() default CacheConstants.AUTHORIZATION_HEADER;
/**
* 如果拦截方式为 1 时, 指定的参数名称集合
* @return
*/
String []parameters() default {};
/**
* redis 中 key 前缀, 一般不需要修改此
*/
String redis_lock_prefix() default "super_bridal_repeat_submit_lock_prefix_";
/**
* 当该方法处于被拦截状态时, 重复尝试次数,0 则不尝试
* @return
*/
int rewaitCount() default 0;
}

aop

/**
* 防重复提交的注解
*
* @param point
* @return
* @throws Throwable
*/
@Around("@annotation( 包名.........RepeatSubmitIntercept)")
public Object noRepeatSubmitAround(ProceedingJoinPoint point) throws Throwable {
HttpServletRequest request = ServletUtils.getRequest();
String uriStringBase64 = Base64.getEncoder().encodeToString(request.getRequestURI().getBytes());
MethodSignature signature = (MethodSignature) point.getSignature();
Method method = signature.getMethod();
RepeatSubmitIntercept repeatSubmitIntercept = method.getAnnotation(RepeatSubmitIntercept.class);
if (repeatSubmitIntercept.maxTime() < 1L) {
throw new RepeatSubmitInterceptException("重复提交拦截器报错 -- 设置最大阻挡时间错误, 至少大于 1s", 500);
}
if (StringUtils.isBlank(repeatSubmitIntercept.errorTitle())) {
throw new RepeatSubmitInterceptException("重复提交拦截器报错 -- 错误信息提醒请勿设置为空 / 空串", 500);
}
if (StringUtils.isBlank(repeatSubmitIntercept.redis_lock_prefix())) {
throw new RepeatSubmitInterceptException("重复提交拦截器报错 -- 前缀 Key 不能为空 / 空串", 500);
}
String token = Convert.toStr(ServletUtils.getRequest().getHeader(repeatSubmitIntercept.userHead()));
StringBuilder key = new StringBuilder()
.append(repeatSubmitIntercept.redis_lock_prefix())
.append(token)
.append("/")
.append(uriStringBase64);
if (StringUtils.isEmpty(token)) {
throw new RepeatSubmitInterceptException("重复提交拦截器报错 -- 当前拦截方式为 [用户拦截], 但其请求头中 token 为空!", 500);
}
/** 用户拦截的方式 */
if (repeatSubmitIntercept.type() == 0) {
/** 此处应该使用请求头中 token 作为 key, 那么此处不做其他操作. */
} else if (repeatSubmitIntercept.type() == 1) {
/** 从请求参数中获取 key */
// ................... 省略
} else {
throw new RepeatSubmitInterceptException("重复提交拦截器报错 -- 当前拦截方式为未设置!", 500);
}
if (redisLockHelper.lock(key.toString(), repeatSubmitIntercept.maxTime())) {
return execute(key.toString(), point);
} else {
/**
* 1、判断允许重复等待
* 2、重复等待操作
* */
if (repeatSubmitIntercept.rewaitCount() > 0) {
int i = 0;
while (i < repeatSubmitIntercept.rewaitCount()) {
/** 暂停 100ms 再去拿 */
Thread.sleep(100);
i++;
if (redisLockHelper.lock(key.toString(), repeatSubmitIntercept.maxTime())) {
return execute(key.toString(), point);
}
}
}
}
throw new RepeatSubmitInterceptException(repeatSubmitIntercept.errorTitle(), 500);
}

注意这里的 RepeatSubmitInterceptException 是自定义的异常。

使用的地方

@GetMapping("/test1")
@RepeatSubmitIntercept()
public AjaxResult test1(){
System.out.println("进入了请求:" + System.currentTimeMillis());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return AjaxResult.success();
}

该实现中如有问题欢迎留言。