跳到主要内容

分布式锁

需求背景

在项目中,为了控制定时任务的执行以及解决数据一致性问题,分布式锁是一种重要的解决方案。

在一些场景中,例如定时任务的执行,如果多个服务器同时执行,可能会导致资源浪费(如 10000 台服务器同时执行)以及产生脏数据(如重复插入)。因此,需要控制同一时间只有一个服务器能执行特定的任务。

传统解决方案及其问题

  1. 分离定时任务和主程序:只在一个服务器运行定时任务,但成本太大。
  2. 写死配置:每个服务器都执行定时任务,只有 ip 符合配置的服务器才执行真实业务,其他的直接返回。该方法成本最低,但 ip 不固定,存在问题。
  3. 动态配置:
    • 数据库:可以实现配置的动态存储,但可能存在性能问题。
    • Redis:内存数据库,读写速度快,支持一些方便的操作,如setnxlua脚本。
    • 配置中心(nacos,Apollo,Spring Cloud Config):提供了集中式的配置管理,但服务器多且 IP 不可控时仍会很麻烦。
  4. 分布式锁:只有抢到锁的服务器才能执行业务逻辑。

锁的核心作用是在有限的资源情况下,控制同一时间只有某些线程能访问到资源。在 Java 中,可以使用synchronized关键字和并发包的类来实现锁,但这些只对单个 JVM 有效。

分布式锁实现的关键

  1. 抢锁机制:确保同一时间只有一个服务器能抢到锁。核心思想是先来的人将数据标记为自己的,表示占有锁,后来的人发现标识已存在,则抢锁失败并继续等待。等先来的人执行完方法后,清空标识,其他人才可继续抢锁。
  2. 实现方式:
    • Mysql 数据库:使用select for update行级锁,实现较为简单。
    • 乐观锁
    • Redis 实现:Redis 作为内存数据库,读写速度快,支持setnxlua脚本等,方便实现分布式锁。
    • zookeeper 实现:在企业中较少使用,不推荐。

注意事项

  1. 用户需要释放锁,否则可能导致其他线程无法获取锁。
  2. 锁一定要加过期时间,以防止死锁等问题。
  3. 如果方法执行时间过长,锁可能提前过期,可能会引发以下问题:
    • 连锁效应:释放掉别人的锁。
    • 多个方法同时执行:导致数据不一致。

解决方案:

  • 续期:通过开启一个新线程来实现续期操作。

伪代码如下:

boolean end=false;
new Thread(()->{
if(!end){
续期
}
})
  • 原子操作:在释放锁时,使用原子操作确保不会释放别人的锁。因为释放锁的时候,有可能先判断出是自己的锁,但这个时候锁过期了,最后还是释放了别人的锁
//原子操作
if (get lock==A){
del lock
}

Redis+lua脚本实现

Redisson实现分布式锁

Redisson 是一个 Java 操作 Redis 的客户端,它提供了大量的分布式数据集,能简化对 Redis 的操作和使用,使开发者能像使用本地集合一样使用 Redis,完全感知不到 Redis 的存在。

2种引入方式:

  1. springboot-starter
     <dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.24.2</version>
</dependency>
  1. 直接引入
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.24.2</version>
</dependency>

配置类:

package com.yunfei.ikunfriend.config;

/**
* Redisson配置类
*/
@Configuration
@ConfigurationProperties(prefix = "spring.redis")
@Data
public class RedissonConfig {
private String host;
private String port;
private String password;

private String database;

@Bean
public RedissonClient redissonClient() {
System.out.println(host);
//配置
Config config = new Config();
String redisAddress = String.format("redis://%s:%s", host, port);
config.useSingleServer().setAddress(redisAddress).setDatabase(3).setPassword(password);
//创建实例
RedissonClient redisson = Redisson.create(config);
return redisson;
}
}

测试 :

package com.yunfei.ikunfriend.service.impl;

import org.junit.jupiter.api.Test;
import org.redisson.api.RList;
import org.redisson.api.RedissonClient;
import org.springframework.boot.test.context.SpringBootTest;

import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;

@SpringBootTest
public class RedissonTest {


@Resource
private RedissonClient redissonClient;

@Test
void test() {
List<String> ls=new ArrayList<>();
ls.add("cxk");
System.out.println("list:"+ls);

RList<String> list = redissonClient.getList("test-list");
list.add("cxk");
System.out.println("redis:"+list);
list.get(0);
}
}

定时任务与分布式锁的结合

在定时任务中使用分布式锁时,需要注意以下几点:

  1. waitTime设置为 0,只抢一次,抢不到就放弃。
  2. 释放锁的操作一定要写在finally块中,以确保锁能被正确释放。

看门狗机制:

Redisson 中提供了看门狗机制来实现续期。它会开一个监听线程,如果方法还没执行完,就会帮你重置 Redis 锁的过期时间。

原理:

  1. 监听当前线程,每 10 秒续期一次。
  2. 如果线程挂掉(包括在 debug 模式下),则锁不会过期。
    @Resource
private RedissonClient redissonClient;

//每天的23点59执行
@Scheduled(cron = "0 59 23 * * *")
public void doCacheRecommendUser() {
String redisKey1 = "ikun:precacheJob:docache:lock";
RLock lock = redissonClient.getLock(redisKey1);
try {
if (lock.tryLock(0, 30000, TimeUnit.MICROSECONDS)) {
for (Long userId : mainUserList) {
QueryWrapper<User> queryWrapper = new QueryWrapper<>();
Page<User> page = userService.page(new Page<>(1, 10), queryWrapper);
String redisKey = String.format("ikun:user:recommend:%s", userId);
ValueOperations valueOperations = redisTemplate.opsForValue();
try {
valueOperations.set(redisKey, page.getRecords());
} catch (Exception e) {
throw new RuntimeException(e);
}
}

}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
//只能释放自己的锁
if (lock.isHeldByCurrentThread()) {
lock.unlock();
}
}
}

加入队伍锁

为解决同一用户重复加入队伍、入队人数超限的问题,使用 Redisson 分布式锁来实现操作互斥,保证了接口幂等性。

    @Override
public boolean joinTeam(TeamJoinDTO teamJoinDTO, User loginUser) {
if (teamJoinDTO == null) {
throw new BussinessException(Code.PARAMS_ERROR);
}
Long teamId = teamJoinDTO.getTeamId();
Team team = getTeamById(teamId);
if (team.getExpireTime() != null && team.getExpireTime().before(new Date())) {
throw new BussinessException(Code.PARAMS_ERROR, "队伍已过期");
}
if (team.getStatus().equals(TeamStatusEnum.PRIVATE)) {
throw new BussinessException(Code.NULL_ERROR, "禁止加入私有队伍");
}
String password = teamJoinDTO.getPassword();
if (team.getStatus().equals(TeamStatusEnum.PASSWORD)) {
if (StringUtils.isBlank(password) || !password.equals(team.getPassword())) {
throw new BussinessException(Code.PARAMS_ERROR, "密码错误");
}
}

Long userId = loginUser.getId();
//分布式锁
RLock lock = redissonClient.getLock("ikun:join_team");
try {
while (true) {
if (lock.tryLock(0, 30000, TimeUnit.MICROSECONDS)) {
System.out.println("getLock" + Thread.currentThread().getId());
QueryWrapper<UserTeam> userTeamQueryWrapper = new QueryWrapper<>();
userTeamQueryWrapper.eq("userId", userId);
long count = userTeamService.count(userTeamQueryWrapper);
if (count > 5) {
throw new BussinessException(Code.PARAMS_ERROR, "最多创建和加入五个队伍");
}
//不能重复加入已加入的队伍
userTeamQueryWrapper = new QueryWrapper<>();
userTeamQueryWrapper.eq("userId", userId);
userTeamQueryWrapper.eq("teamId", teamId);
long count2 = userTeamService.count(userTeamQueryWrapper);
if (count2 > 0) {
throw new BussinessException(Code.PARAMS_ERROR, "不能重复加入已加入的队伍");
}

//已加入队伍的人数
long count1 = countTeamUserByTeamId(teamId);
if (count1 >= team.getMaxNum()) {
throw new BussinessException(Code.PARAMS_ERROR, "队伍已满");
}

//插入用户=>队伍关系到关系表
UserTeam userTeam = new UserTeam();
userTeam.setUserId(userId);
userTeam.setTeamId(teamId);
userTeam.setJoinTime(new Date());
return userTeamService.save(userTeam);
}
}
} catch (Exception e) {
throw new BussinessException(Code.SYSTEM_ERROR);
} finally {
//只能释放自己的锁
if (lock.isHeldByCurrentThread()) {
lock.unlock();
}
}
}