分布式锁
需求背景
在项目中,为了控制定时任务的执行以及解决数据一致性问题,分布式锁是一种重要的解决方案。
在一些场景中,例如定时任务的执行,如果多个服务器同时执行,可能会导致资源浪费(如 10000 台服务器同时执行)以及产生脏数据(如重复插入)。因此,需要控制同一时间只有一个服务器能执行特定的任务。
传统解决方案及其问题
- 分离定时任务和主程序:只在一个服务器运行定时任务,但成本太大。
- 写死配置:每个服务器都执行定时任务,只有 ip 符合配置的服务器才执行真实业务,其他的直接返回。该方法成本最低,但 ip 不固定,存在问题。
- 动态配置:
- 数据库:可以实现配置的动态存储,但可能存在性能问题。
- Redis:内存数据库,读写速度快,支持一些方便的操作,如
setnx
、lua
脚本。 - 配置中心(nacos,Apollo,Spring Cloud Config):提供了集中式的配置管理,但服务器多且 IP 不可控时仍 会很麻烦。
- 分布式锁:只有抢到锁的服务器才能执行业务逻辑。
锁的核心作用是在有限的资源情况下,控制同一时间只有某些线程能访问到资源。在 Java 中,可以使用synchronized
关键字和并发包的类来实现锁,但这些只对单个 JVM 有效。
分布式锁实现的关键
- 抢锁机制:确保同一时间只有一个服务器能抢到锁。核心思想是先来的人将数据标记为自己的,表示占有锁,后来的人发现标识已存在,则抢锁失败并继续等待。等先来的人执行完方法后,清空标识,其他人才可继续抢锁。
- 实现方式:
- Mysql 数据库:使用
select for update
行级锁,实现较为简单。 - 乐观锁。
- Redis 实现:Redis 作为内存数据库,读写速度快,支持
setnx
、lua
脚本等,方便实现分布式锁。 - zookeeper 实现:在企业中较少使用,不推荐。
- Mysql 数据库:使用
注意事项
- 用户需要释放锁,否则可能导致其他线程无法获取锁。
- 锁一定要加过期时间,以防止死锁等问题。
- 如果方法执行时间过长,锁可能提前过期,可能会引发以下问题:
- 连锁效应:释放掉别人的锁。
- 多个方法同时执行:导致数据不一致。
解决方案:
- 续期:通过开启一个新线程来实现续期操 作。
伪代码如下:
boolean end=false;
new Thread(()->{
if(!end){
续期
}
})
- 原子操作:在释放锁时,使用原子操作确保不会释放别人的锁。因为释放锁的时候,有可能先判断出是自己的锁,但这个时候锁过期了,最后还是释放了别人的锁
//原子操作
if (get lock==A){
del lock
}
Redis+lua脚本实现
Redisson实现分布式锁
Redisson 是一个 Java 操作 Redis 的客户端,它提供了大量的分布式数据集,能简化对 Redis 的操作和使用,使开发者能像使用本地集合一样使用 Redis,完全感知不到 Redis 的存在。
2种引入 方式:
- springboot-starter
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.24.2</version>
</dependency>
- 直接引入
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.24.2</version>
</dependency>
配置类:
package com.yunfei.ikunfriend.config;
/**
* Redisson配置类
*/
@Configuration
@ConfigurationProperties(prefix = "spring.redis")
@Data
public class RedissonConfig {
private String host;
private String port;
private String password;
private String database;
@Bean
public RedissonClient redissonClient() {
System.out.println(host);
//配置
Config config = new Config();
String redisAddress = String.format("redis://%s:%s", host, port);
config.useSingleServer().setAddress(redisAddress).setDatabase(3).setPassword(password);
//创建实例
RedissonClient redisson = Redisson.create(config);
return redisson;
}
}
测试 :
package com.yunfei.ikunfriend.service.impl;
import org.junit.jupiter.api.Test;
import org.redisson.api.RList;
import org.redisson.api.RedissonClient;
import org.springframework.boot.test.context.SpringBootTest;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
@SpringBootTest
public class RedissonTest {
@Resource
private RedissonClient redissonClient;
@Test
void test() {
List<String> ls=new ArrayList<>();
ls.add("cxk");
System.out.println("list:"+ls);
RList<String> list = redissonClient.getList("test-list");
list.add("cxk");
System.out.println("redis:"+list);
list.get(0);
}
}
定时任务与分布式锁的结合
在定时任务中使用分布式锁时,需要注意以下几点:
waitTime
设置为 0,只抢一次,抢不到就放弃。- 释放锁的操作一定要写在
finally
块中,以确保锁能被正确释放。
看门狗机制:
Redisson 中提供了看门狗机制来实现续期。它会开一个监听线程,如果方法还没执行完,就会帮你重置 Redis 锁的过期时间。
原理:
- 监听当前线程,每 10 秒续期一次。
- 如果线程挂掉(包括在 debug 模式下),则锁不会过期。
@Resource
private RedissonClient redissonClient;
//每天的23点59执行
@Scheduled(cron = "0 59 23 * * *")
public void doCacheRecommendUser() {
String redisKey1 = "ikun:precacheJob:docache:lock";
RLock lock = redissonClient.getLock(redisKey1);
try {
if (lock.tryLock(0, 30000, TimeUnit.MICROSECONDS)) {
for (Long userId : mainUserList) {
QueryWrapper<User> queryWrapper = new QueryWrapper<>();
Page<User> page = userService.page(new Page<>(1, 10), queryWrapper);
String redisKey = String.format("ikun:user:recommend:%s", userId);
ValueOperations valueOperations = redisTemplate.opsForValue();
try {
valueOperations.set(redisKey, page.getRecords());
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
//只能释放自己的锁
if (lock.isHeldByCurrentThread()) {
lock.unlock();
}
}
}
加入队伍锁
为解决同一用户重复加入队伍、入队人数超限的问题,使用 Redisson 分布式锁来实现操作互斥,保证了接口幂等性。
@Override
public boolean joinTeam(TeamJoinDTO teamJoinDTO, User loginUser) {
if (teamJoinDTO == null) {
throw new BussinessException(Code.PARAMS_ERROR);
}
Long teamId = teamJoinDTO.getTeamId();
Team team = getTeamById(teamId);
if (team.getExpireTime() != null && team.getExpireTime().before(new Date())) {
throw new BussinessException(Code.PARAMS_ERROR, "队伍已过期");
}
if (team.getStatus().equals(TeamStatusEnum.PRIVATE)) {
throw new BussinessException(Code.NULL_ERROR, "禁止加入私有队伍");
}
String password = teamJoinDTO.getPassword();
if (team.getStatus().equals(TeamStatusEnum.PASSWORD)) {
if (StringUtils.isBlank(password) || !password.equals(team.getPassword())) {
throw new BussinessException(Code.PARAMS_ERROR, "密码错误");
}
}
Long userId = loginUser.getId();
//分布式锁
RLock lock = redissonClient.getLock("ikun:join_team");
try {
while (true) {
if (lock.tryLock(0, 30000, TimeUnit.MICROSECONDS)) {
System.out.println("getLock" + Thread.currentThread().getId());
QueryWrapper<UserTeam> userTeamQueryWrapper = new QueryWrapper<>();
userTeamQueryWrapper.eq("userId", userId);
long count = userTeamService.count(userTeamQueryWrapper);
if (count > 5) {
throw new BussinessException(Code.PARAMS_ERROR, "最多创建和加入五个队伍");
}
//不能重复加入已加入的队伍
userTeamQueryWrapper = new QueryWrapper<>();
userTeamQueryWrapper.eq("userId", userId);
userTeamQueryWrapper.eq("teamId", teamId);
long count2 = userTeamService.count(userTeamQueryWrapper);
if (count2 > 0) {
throw new BussinessException(Code.PARAMS_ERROR, "不能重复加入已加入的队伍");
}
//已加入队伍的人数
long count1 = countTeamUserByTeamId(teamId);
if (count1 >= team.getMaxNum()) {
throw new BussinessException(Code.PARAMS_ERROR, "队伍已满");
}
//插入用户=>队伍关系到关系表
UserTeam userTeam = new UserTeam();
userTeam.setUserId(userId);
userTeam.setTeamId(teamId);
userTeam.setJoinTime(new Date());
return userTeamService.save(userTeam);
}
}
} catch (Exception e) {
throw new BussinessException(Code.SYSTEM_ERROR);
} finally {
//只能释放自己的锁
if (lock.isHeldByCurrentThread()) {
lock.unlock();
}
}
}