跳到主要内容

秒杀优化思路

在电商系统中,秒杀活动是一种常见的营销手段,但同时也对系统的性能和稳定性提出了极高的挑战。为了确保秒杀活动的顺利进行,需要对系统进行优化,包括用户模拟、Redis 优化秒杀、使用阻塞队列优化秒杀以及使用 Redis 消息队列等方面。本文将对这些优化措施进行深入探讨和实践。

用户模拟

为了模拟 1000 个用户同时发送请求进行压力测试,需要编写代码获取 1000 个用户的 token。具体实现如下:

@SpringBootTest
class HmDianPingApplicationTests {
@Resource
private StringRedisTemplate stringRedisTemplate;
@Resource
private IUserService userService;

@Test
@Transactional
public void insertUser() {
final String filePath = "src/main/resources/user.txt";
final int count = 1000;
BufferedWriter writer;
try {
writer = new BufferedWriter(new FileWriter(filePath));
for (int i = 0; i < count; i++) {
String phone = "13" + RandomUtil.randomNumbers(9);
String token = this.login(phone);
writer.write(token);
writer.newLine();
}
writer.close();
} catch (IOException e) {
throw new RuntimeException(e);
} finally {
System.out.println("生成用户token完毕");
}
}

public String login(String phone) {
User user = new User();
user.setPhone(phone);
user.setNickName(USER_NICK_NAME_PREFIX + RandomUtil.randomString(10));
userService.save(user);
// 生成token
String token = UUID.randomUUID().toString();
// 将User对象转为HashMap存储
UserDTO userDTO = BeanUtil.copyProperties(user, UserDTO.class);
Map<String, Object> userMap = BeanUtil.beanToMap(userDTO, new HashMap<>(),
CopyOptions.create()
.setIgnoreNullValue(true)
.setFieldValueEditor((fieldName, fieldvalue) -> fieldvalue.toString())
);
// 存储
String tokenKey = LOGIN_USER_KEY + token;
stringRedisTemplate.opsForHash().putAll(tokenKey, userMap);
// 设置有效期
stringRedisTemplate.expire(tokenKey, LOGIN_USER_TTL, TimeUnit.MINUTES);
return token;
}
}

生成1000个登录用户并生成token

image.png

jmeter设置,设置请求:

image.png

设置token

image.png

这里指定刚才生成token的位置

image.png

测试得到结果:

image.png

Redis优化秒杀

为了提高秒杀系统的性能,我们将耗时比较短的逻辑判断放入到 Redis 中,如库存是否足够、是否一人一单等。这样的操作只要能够完成,就意味着我们一定可以下单完成。我们只需要进行快速的逻辑判断,根本不用等下单逻辑走完,直接给用户返回成功,再在后台开一个线程,让后台线程慢慢去执行 queue 里边的消息。

1653561657295.png

当用户下单之后,判断库存是否充足只需要到 Redis 中根据 key 找对应的 value 是否大于 0 即可。如果不充足,则直接结束;如果充足,继续在 Redis 中判断用户是否可以下单。如果 set 集合中没有这条数据,说明他可以下单;如果 set 集合中有这条记录,则表示重复下单。整个过程需要保证是原子性的,我们可以使用 Lua 来操作。

当以上判断逻辑走完之后,我们可以判断当前 Redis 中返回的结果是否是 0,如果是 0,则表示可以下单,则将之前说的信息存入到到 queue 中去,然后返回,然后再来个线程异步的下单,前端可以通过返回的订单 id 来判断是否下单成功。

1653562234886.png

需求:

  • 新增秒杀优惠券的同时,将优惠券信息保存到Redis中
  • 基于Lua脚本,判断秒杀库存、一人一单,决定用户是否抢购成功
  • 如果抢购成功,将优惠券id和用户id封装后存入阻塞队列
  • 开启线程任务,不断从阻塞队列中获取信息,实现异步下单功能

代码:

  1. 新增秒杀优惠券的同时,将优惠券信息保存到 Redis 中:

    @Override
    @Transactional
    public void addSeckillVoucher(Voucher voucher) {
    // 保存优惠券
    save(voucher);
    // 保存秒杀信息
    SeckillVoucher seckillVoucher = new SeckillVoucher();
    seckillVoucher.setVoucherId(voucher.getId());
    seckillVoucher.setStock(voucher.getStock());
    seckillVoucher.setBeginTime(voucher.getBeginTime());
    seckillVoucher.setEndTime(voucher.getEndTime());
    seckillVoucherService.save(seckillVoucher);
    // 保存秒杀到redis
    stringRedisTemplate.opsForValue().set(SECKILL_STOCK_KEY + voucher.getId(), voucher.getStock().toString());
    }
  2. 创建 Lua 脚本,用于判断秒杀库存和一人一单,决定用户是否抢购成功:

    -- 1. 参数列表
    -- 优惠卷id
    local voucherId = ARGV[1]
    -- 用户id
    local userId = ARGV[2]
    -- 数据库key
    -- 库存key
    local stockKey = "seckill:stock:".. voucherId
    -- 订单key
    local orderKey = "seckill:order:".. voucherId
    -- 业务脚本
    -- 判断库存是否充足
    if tonumber(redis.call('get', stockKey)) <= 0 then
    -- 库存不足
    return 1
    end
    -- 判断用户是否下单
    if (redis.call("sismember", orderKey, userId) == 1) then
    -- 存在,说明是重复下单
    return 2
    end
    -- 扣库存
    redis.call('incrby', stockKey, -1)
    -- 下单,保存用户
    redis.call('sadd', orderKey, userId)
    return 0
  3. 初步修改抢优惠券逻辑:

    private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
    static {
    SECKILL_SCRIPT = new DefaultRedisScript<>();
    SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
    SECKILL_SCRIPT.setResultType(Long.class);
    }

    @Override
    public Result seckillVoucher(Long voucherId) {
    // 执行lua脚本,得到购买资格
    Long userId = UserHolder.getUser().getId();
    Long result = stringRedisTemplate.execute(SECKILL_SCRIPT, Collections.emptyList(),
    voucherId.toString(), userId.toString());
    // 判断结果是否为0
    int r = result.intValue();
    // 0,没有购买资格
    if (r!= 0) {
    return Result.fail((r == 1)? "库存不足 " : "不能重复下单");
    }
    // 1,有购买资格,生成订单,保存到阻塞队列
    long orderId = redisIdWorker.nextId("order");
    // 返回订单id
    return Result.ok(orderId);
    }

使用阻塞队列优化秒杀

使用阻塞队列可以进一步优化秒杀系统的性能。具体实现如下:

private BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024 * 1024);
private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();

@PostConstruct
private void init() {
SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}

private class VoucherOrderHandler implements Runnable {
@Override
public void run() {
while (true) {
try {
VoucherOrder voucherOrder = orderTasks.take();
// 6. 生成订单
handleVoucherOrder(voucherOrder);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}

IVoucherOrderService proxy;

private void handleVoucherOrder(VoucherOrder voucherOrder) {
Long userId = voucherOrder.getUserId();
RLock lock = redissonClient.getLock("lock:order:" + userId);
boolean isLock = lock.tryLock();
if (!isLock) {
// 获取失败,返回错误或者 重试
log.error("获取锁失败,用户id:{}", userId);
return;
}
try {
proxy.createVoucherOrder(voucherOrder);
} finally {
lock.unlock();
}
}

创建订单的代码如下:

@Override
@Transactional
public void createVoucherOrder(VoucherOrder voucherOrder) {
// 一人一单
Long userId = voucherOrder.getUserId();
int count = this.query().eq("user_id", userId)
.eq("voucher_id", voucherOrder.getVoucherId()).count();
if (count > 0) {
log.error("用户已经抢购过了,用户id:{}", userId);
return;
}
// 5. 扣减库存
boolean update = seckillVoucherService.update()
.setSql("stock=stock - 1")
.eq("voucher_id", voucherOrder.getVoucherId())
.gt("stock", 0)
.update();
if (!update) {
// 扣减失败
log.error("扣减库存失败,用户id:{}", userId);
return;
}
// 创建订单
this.save(voucherOrder);
}

修改后的抢优惠券逻辑如下:

@Override
public Result seckillVoucher(Long voucherId) {
// 执行lua脚本,得到购买资格
Long userId = UserHolder.getUser().getId();
long orderId = redisIdWorker.nextId("order");
Long result = stringRedisTemplate.execute(SECKILL_SCRIPT, Collections.emptyList(),
voucherId.toString(), userId.toString(), String.valueOf(orderId));
// 判断结果是否为0
int r = result.intValue();
// 0,没有购买资格
if (r!= 0) {
return Result.fail((r == 1)? "库存不足 " : "不能重复下单");
}
// 1,有购买资格,生成订单,保存到阻塞队列
VoucherOrder voucherOrder = new VoucherOrder();
voucherOrder.setId(orderId);
voucherOrder.setUserId(userId);
voucherOrder.setVoucherId(voucherId);
// 放到阻塞队列中
orderTasks.add(voucherOrder);
proxy = (IVoucherOrderService) AopContext.currentProxy();
// 返回订单id
return Result.ok(orderId);
}

使用 Redis 消息队列

消息队列的概念:消息队列是一种存放消息的队列,它包括消息队列、生产者和消费者三个角色。生产者发送消息到消息队列,消费者从消息队列获取消息并处理消息。

  • 消息队列:存储和管理消息,也被称为消息代理(Message Broker)
  • 生产者:发送消息到消息队列
  • 消费者:从消息队列获取消息并处理消息

1653574849336.png

基于List实现消息队列

Redis 的 list 数据结构是一个双向链表,可以很容易地模拟出队列效果。可以使用 LPUSH 结合 RPOP 或 RPUSH 结合 LPOP 来实现。但需要注意的是,当队列中没有消息时,RPOP 或 LPOP 操作会返回 null,并不像 JVM 的阻塞队列那样会阻塞并等待消息。因此,这里应该使用 BRPOP 或者 BLPOP 来实现阻塞效果。

  • 优点:利用 Redis 存储,不受限于 JVM 内存上限;基于 Redis 的持久化机制,数据安全性有保证;可以满足消息有序性。
  • 缺点:无法避免消息丢失;只支持单消费者。

1653575176451.png

基于 PubSub 实现消息队列

PubSub 是 Redis 2.0 版本引入的消息传递模型,消费者可以订阅一个或多个 channel,生产者向对应 channel 发送消息后,所有订阅者都能收到相关消息。

SUBSCRIBE channel [channel] :订阅一个或多个频道 PUBLISH channel msg :向一个频道发送消息 PSUBSCRIBE pattern[pattern]:订阅与pattern格式匹配的所有频道

1653575506373.png

  • 优点:采用发布订阅模型,支持多生产、多消费。
  • 缺点:不支持数据持久化;无法避免消息丢失;消息堆积有上限,超出时数据丢失。

基于 Stream 实现消息队列

  • 发送消息:创建消息队列 users,发送消息name=jack,age=18,Redis 会自动生成 ID,例如xadd users * name jack age 18
  • 读消息:读第一个消息可以使用xread count 1 streams users 0;XREAD 阻塞方式,读取最新的消息可以使用xread count 1 block 1000 streams users $
  • STREAM 类型消息队列的 XREAD 命令特点:消息可回溯;一个消息可以被多个消费者读取;可以阻塞读取;有消息漏读的风险。

基于 Stream 的消息队列-消费者组

消费者组(Consumer Group)将多个消费者划分到一个组中,监听同一个队列。

1653577801668.png

  • 可以使用XGROUP DESTORY key groupName删除指定的消费者组,

  • 使用XGROUP CREATECONSUMER key groupname consumername给指定的消费者组添加消费者,

  • 使用XGROUP DELCONSUMER key groupname consumername删除消费者组中的指定消费者,

  • 使用XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key...] ID [ID...]从消费者组读取消息。

参数解释:

  • group:消费组名称
  • consumer:消费者名称,如果消费者不存在,会自动创建一个消费者
  • count:本次查询的最大数量
  • BLOCK milliseconds:当没有消息时最长等待时间
  • NOACK:无需手动ACK,获取到消息后自动确认
  • STREAMS key:指定队列名称
  • ID:获取消息的起始ID:

几种方式对比

1653578560691.png

使用Stream完成秒杀优化

需求:

  1. 创建一个 Stream 类型的消息队列,名为 stream.orders。
  2. 修改之前的秒杀下单 Lua 脚本,在认定有抢购资格后,直接向 stream.orders 中添加消息,内容包含 voucherId、userId、orderId。
  3. 项目启动时,开启一个线程任务,尝试获取 stream.orders 中的消息,完成下单。

修改后的 Lua 脚本如下:

-- 1. 参数列表
-- 优惠卷id
local voucherId = ARGV[1]
-- 用户id
local userId = ARGV[2]
-- 订单id
local orderId = ARGV[3]
-- 数据库key
-- 库存key
local stockKey = "seckill:stock:".. voucherId
-- 订单key
local orderKey = "seckill:order:".. voucherId
-- 业务脚本
-- 判断库存是否充足
if tonumber(redis.call('get', stockKey)) <= 0 then
-- 库存不足
return 1
end
-- 判断用户是否下单
if (redis.call("sismember", orderKey, userId) == 1) then
-- 存在,说明是重复下单
return 2
end
-- 扣库存
redis.call('incrby', stockKey, -1)
-- 下单,保存用户
redis.call('sadd', orderKey, userId)
-- 发送消息到队列中 xadd stream.orders * k1 v1 k2 v2
redis.call('xadd', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId)
return 0

发送消息的代码如下:

@Override
public Result seckillVoucher(Long voucherId) {
// 执行lua脚本,得到购买资格
Long userId = UserHolder.getUser().getId();
long orderId = redisIdWorker.nextId("order");
Long result = stringRedisTemplate.execute(SECKILL_SCRIPT, Collections.emptyList(),
voucherId.toString(), userId.toString(), String.valueOf(orderId));
// 判断结果是否为0
int r = result.intValue();
// 0,没有购买资格
if (r!= 0) {
return Result.fail((r == 1)? "库存不足 " : "不能重复下单");
}
// 1,有购买资格,生成订单,保存到阻塞队列
VoucherOrder voucherOrder = new VoucherOrder();
voucherOrder.setId(orderId);
voucherOrder.setUserId(userId);
voucherOrder.setVoucherId(voucherId);
// 放到阻塞队列中
orderTasks.add(voucherOrder);
proxy = (IVoucherOrderService) AopContext.currentProxy();
// 返回订单id
return Result.ok(orderId);
}

修改后的业务代码如下:

private class VoucherOrderHandler implements Runnable {
final String queueName = "stream.orders";

@Override
public void run() {
while (true) {
try {
// 获取消息队列中的订单信息 xreadgroup group g1 c1 count 1 block 2000 streams streams.order >
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
StreamOffset.create(queueName, ReadOffset.lastConsumed())
);
// 判断是否获取成功
if (list == null || list.isEmpty()) {
// 失败,重试
continue;
}
// 解析消息
MapRecord<String, Object, Object> record = list.get(0);
Map<Object, Object> value = record.getValue();
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
// 成功,生成订单
handleVoucherOrder(voucherOrder);
// ACK确认 sack stream.orders g1 id
stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());
} catch (Exception e) {
log.error("处理订单失败", e);
handlePendingList();
}
}
}

private void handlePendingList() {
while (true) {
try {
// 获取消息队列中的订单信息 xreadgroup group g1 c1 count 1 block 2000 streams streams.order >
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1),
StreamOffset.create(queueName, ReadOffset.from("0"))
);
//