秒杀优化思路
在电商系统中,秒杀活动是一种常见的营销手段,但同时也对系统的性能和稳定性提出了极高的挑战。为了确保秒杀活动的顺利进行,需要对系统进行优化,包括用户模拟、Redis 优化秒杀、使用阻塞队列优化秒杀以及使用 Redis 消息队列等方面。本文将对这些优化措施进行深入探讨和实践。
用户模拟
为了模拟 1000 个用户同时发送请求进行压力测试,需要编写代码获取 1000 个用户的 token。具体实现如下:
@SpringBootTest
class HmDianPingApplicationTests {
@Resource
private StringRedisTemplate stringRedisTemplate;
@Resource
private IUserService userService;
@Test
@Transactional
public void insertUser() {
final String filePath = "src/main/resources/user.txt";
final int count = 1000;
BufferedWriter writer;
try {
writer = new BufferedWriter(new FileWriter(filePath));
for (int i = 0; i < count; i++) {
String phone = "13" + RandomUtil.randomNumbers(9);
String token = this.login(phone);
writer.write(token);
writer.newLine();
}
writer.close();
} catch (IOException e) {
throw new RuntimeException(e);
} finally {
System.out.println("生成用户token完毕");
}
}
public String login(String phone) {
User user = new User();
user.setPhone(phone);
user.setNickName(USER_NICK_NAME_PREFIX + RandomUtil.randomString(10));
userService.save(user);
// 生成token
String token = UUID.randomUUID().toString();
// 将User对象转为HashMap存储
UserDTO userDTO = BeanUtil.copyProperties(user, UserDTO.class);
Map<String, Object> userMap = BeanUtil.beanToMap(userDTO, new HashMap<>(),
CopyOptions.create()
.setIgnoreNullValue(true)
.setFieldValueEditor((fieldName, fieldvalue) -> fieldvalue.toString())
);
// 存储
String tokenKey = LOGIN_USER_KEY + token;
stringRedisTemplate.opsForHash().putAll(tokenKey, userMap);
// 设置有效期
stringRedisTemplate.expire(tokenKey, LOGIN_USER_TTL, TimeUnit.MINUTES);
return token;
}
}
生成1000个登录用户并生成token
jmeter设置,设置请求:
设置token
这里指定刚才生成token的位置
测试得到结果:
Redis优化秒杀
为了提高秒杀系统的性能,我们将耗时比较短的逻辑判断放入到 Redis 中,如库存是否足够、是否一人一单等。这样的操作只要能够完成,就意味着我们一定可以下单完成。我们只需要进行快速的逻辑判断,根本不用等下单逻辑走完,直接给用户返回成功,再在后台开一个线程,让后台线程慢慢去执行 queue 里边的消息。
当用户下单之后,判断库存是否充足只需要到 Redis 中根据 key 找对应的 value 是否大于 0 即可。如果不充足,则直接结束;如果充足,继续在 Redis 中判断用户是否可以下单。如果 set 集合中没有这条数据,说明他可以下单;如果 set 集合中有这条记录,则表示重复下单。整个过程需要保证是原子性的,我们可以使用 Lua 来操作。
当以上判断逻辑走完之后,我们可以判断当前 Redis 中返回的结果是否是 0,如果是 0,则表示可以下单,则将之前说的信息存入到到 queue 中去,然后返回,然后再来个线程异步的下单,前端可以通过返回的订单 id 来判断是否下单成功。
需求:
- 新增秒杀优惠券的同时,将优惠券信息保存到Redis中
- 基于Lua脚本,判断秒杀库存、一人一单,决定用户是否抢购成功
- 如果抢购成功,将优惠券id和用户id封装后存入阻塞队列
- 开启线程任务,不断从阻塞队列中获取信息,实现异步下单功能
代码:
-
新增秒杀优惠券的同时,将优惠券信息保存到 Redis 中:
@Override
@Transactional
public void addSeckillVoucher(Voucher voucher) {
// 保存优惠券
save(voucher);
// 保存秒杀信息
SeckillVoucher seckillVoucher = new SeckillVoucher();
seckillVoucher.setVoucherId(voucher.getId());
seckillVoucher.setStock(voucher.getStock());
seckillVoucher.setBeginTime(voucher.getBeginTime());
seckillVoucher.setEndTime(voucher.getEndTime());
seckillVoucherService.save(seckillVoucher);
// 保存秒杀到redis
stringRedisTemplate.opsForValue().set(SECKILL_STOCK_KEY + voucher.getId(), voucher.getStock().toString());
} -
创建 Lua 脚本,用于判断秒杀库存和一人一单,决定用户是否抢购成功:
-- 1. 参数列表
-- 优惠卷id
local voucherId = ARGV[1]
-- 用户id
local userId = ARGV[2]
-- 数据库key
-- 库存key
local stockKey = "seckill:stock:".. voucherId
-- 订单key
local orderKey = "seckill:order:".. voucherId
-- 业务脚本
-- 判断库存是否充足
if tonumber(redis.call('get', stockKey)) <= 0 then
-- 库存不足
return 1
end
-- 判断用户是否下单
if (redis.call("sismember", orderKey, userId) == 1) then
-- 存在,说明是重复下单
return 2
end
-- 扣库存
redis.call('incrby', stockKey, -1)
-- 下单,保存用户
redis.call('sadd', orderKey, userId)
return 0 -
初步修改抢优惠券逻辑:
private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
static {
SECKILL_SCRIPT = new DefaultRedisScript<>();
SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
SECKILL_SCRIPT.setResultType(Long.class);
}
@Override
public Result seckillVoucher(Long voucherId) {
// 执行lua脚本,得到购买资格
Long userId = UserHolder.getUser().getId();
Long result = stringRedisTemplate.execute(SECKILL_SCRIPT, Collections.emptyList(),
voucherId.toString(), userId.toString());
// 判断结果是否为0
int r = result.intValue();
// 0,没有购买资格
if (r!= 0) {
return Result.fail((r == 1)? "库存不足 " : "不能重复下单");
}
// 1,有购买资格,生成订单,保存到阻塞队列
long orderId = redisIdWorker.nextId("order");
// 返回订单id
return Result.ok(orderId);
}
使用阻塞队列优化秒杀
使用阻塞队列可以进一步优化秒杀系统的性能。具体实现如下:
private BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024 * 1024);
private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();
@PostConstruct
private void init() {
SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}
private class VoucherOrderHandler implements Runnable {
@Override
public void run() {
while (true) {
try {
VoucherOrder voucherOrder = orderTasks.take();
// 6. 生成订单
handleVoucherOrder(voucherOrder);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
IVoucherOrderService proxy;
private void handleVoucherOrder(VoucherOrder voucherOrder) {
Long userId = voucherOrder.getUserId();
RLock lock = redissonClient.getLock("lock:order:" + userId);
boolean isLock = lock.tryLock();
if (!isLock) {
// 获取失败,返回错误或者 重试
log.error("获取锁失败,用户id:{}", userId);
return;
}
try {
proxy.createVoucherOrder(voucherOrder);
} finally {
lock.unlock();
}
}
创建订单的代码如下:
@Override
@Transactional
public void createVoucherOrder(VoucherOrder voucherOrder) {
// 一人一单
Long userId = voucherOrder.getUserId();
int count = this.query().eq("user_id", userId)
.eq("voucher_id", voucherOrder.getVoucherId()).count();
if (count > 0) {
log.error("用户已经抢购过了,用户id:{}", userId);
return;
}
// 5. 扣减库存
boolean update = seckillVoucherService.update()
.setSql("stock=stock - 1")
.eq("voucher_id", voucherOrder.getVoucherId())
.gt("stock", 0)
.update();
if (!update) {
// 扣减失败
log.error("扣减库存失败,用户id:{}", userId);
return;
}
// 创建订单
this.save(voucherOrder);
}
修改后的抢优惠券逻辑如下:
@Override
public Result seckillVoucher(Long voucherId) {
// 执行lua脚本,得到购买资格
Long userId = UserHolder.getUser().getId();
long orderId = redisIdWorker.nextId("order");
Long result = stringRedisTemplate.execute(SECKILL_SCRIPT, Collections.emptyList(),
voucherId.toString(), userId.toString(), String.valueOf(orderId));
// 判断结果是否为0
int r = result.intValue();
// 0,没有购买资格
if (r!= 0) {
return Result.fail((r == 1)? "库存不足 " : "不能重复下单");
}
// 1,有购买资格,生成订单,保存到阻塞队列
VoucherOrder voucherOrder = new VoucherOrder();
voucherOrder.setId(orderId);
voucherOrder.setUserId(userId);
voucherOrder.setVoucherId(voucherId);
// 放到阻塞队列中
orderTasks.add(voucherOrder);
proxy = (IVoucherOrderService) AopContext.currentProxy();
// 返回订单id
return Result.ok(orderId);
}
使用 Redis 消息队列
消息队列的概念:消息队列是一种存放消息的队列,它包括消息队列、生产者和消费者三个角色。生产者发送消息到消息队列,消费者从消息队列获取消息并处理消息。
- 消息队列:存储和管理消息,也被称为消息代理(Message Broker)
- 生产者:发送消息到消息队列
- 消费者:从消息队列获取消息并处理消息
基于List实现消息队列
Redis 的 list 数据结构是一个双向链表,可以很容易地模拟出队列效果。可以使用 LPUSH 结合 RPOP 或 RPUSH 结合 LPOP 来实现。但需要注意的是,当队列中没有消息时,RPOP 或 LPOP 操作会返回 null,并不像 JVM 的阻塞队列那样会阻塞并等待消息。因此,这里应该使用 BRPOP 或者 BLPOP 来实现阻塞效果。
- 优点:利用 Redis 存储,不受限于 JVM 内存上限;基于 Redis 的持久化机制,数据安全性有保证;可以满足消息有序性。
- 缺点:无法避免消息丢失;只支持单消费者。
基于 PubSub 实现消息队列
PubSub 是 Redis 2.0 版本引入的消息传递模型,消费者可以订阅一个或多个 channel,生产者向对应 channel 发送消息后,所有订阅者都能收到相关消息。
SUBSCRIBE channel [channel]
:订阅一个或多个频道
PUBLISH channel msg
:向一个频道发送消息
PSUBSCRIBE pattern[pattern]
:订阅与pattern格式匹配的所有频道
- 优点:采用发布订阅模型,支持多生产、多消费。
- 缺点:不支持数据持久化;无法避免消息丢失;消息堆积有上限,超出时数据丢失。
基于 Stream 实现消息队列
- 发送消息:创建消息队列 users,发送消息
name=jack,age=18
,Redis 会自动生成 ID,例如xadd users * name jack age 18
。 - 读消息:读第一个消息可以使用
xread count 1 streams users 0
;XREAD 阻塞方式,读取最新的消息可以使用xread count 1 block 1000 streams users $
。 - STREAM 类型消息队列的 XREAD 命令特点:消息可回溯;一个消息可以被多个消费者读取;可以阻塞读取;有消息漏读的风险。
基于 Stream 的消息队列-消费者组
消费者组(Consumer Group)将多个消费者划分到一个组中,监听同一个队列。
-
可以使用
XGROUP DESTORY key groupName
删除指定的消费者组, -
使用
XGROUP CREATECONSUMER key groupname consumername
给指定的消费者组添加消费者, -
使用
XGROUP DELCONSUMER key groupname consumername
删除消费者组中的指定消费者, -
使用
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key...] ID [ID...]
从消费者组读取消息。
参数解释:
- group:消费组名称
- consumer:消费者名称,如果消费者不存在,会自动创建一个消费者
- count:本次查询的 最大数量
- BLOCK milliseconds:当没有消息时最长等待时间
- NOACK:无需手动ACK,获取到消息后自动确认
- STREAMS key:指定队列名称
- ID:获取消息的起始ID:
几种方式对比
使用Stream完成秒杀优化
需求:
- 创建一个 Stream 类型的消息队列,名为 stream.orders。
- 修改之前的秒杀下单 Lua 脚本,在认定有抢购资格后,直接向 stream.orders 中添加消息,内容包含 voucherId、userId、orderId。
- 项目启动时,开启一个线程任务,尝试获取 stream.orders 中的消息,完成下单。
修改后的 Lua 脚本如下:
-- 1. 参数列表
-- 优惠卷id
local voucherId = ARGV[1]
-- 用户id
local userId = ARGV[2]
-- 订单id
local orderId = ARGV[3]
-- 数据库key
-- 库存key
local stockKey = "seckill:stock:".. voucherId
-- 订单key
local orderKey = "seckill:order:".. voucherId
-- 业务脚本
-- 判断库存是否充足
if tonumber(redis.call('get', stockKey)) <= 0 then
-- 库存不足
return 1
end
-- 判断用户是否下单
if (redis.call("sismember", orderKey, userId) == 1) then
-- 存在,说明是重复下单
return 2
end
-- 扣库存
redis.call('incrby', stockKey, -1)
-- 下单,保存用户
redis.call('sadd', orderKey, userId)
-- 发送消息到队列中 xadd stream.orders * k1 v1 k2 v2
redis.call('xadd', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId)
return 0
发送消息的代码如下:
@Override
public Result seckillVoucher(Long voucherId) {
// 执行lua脚本,得到购买资格
Long userId = UserHolder.getUser().getId();
long orderId = redisIdWorker.nextId("order");
Long result = stringRedisTemplate.execute(SECKILL_SCRIPT, Collections.emptyList(),
voucherId.toString(), userId.toString(), String.valueOf(orderId));
// 判断结果是否为0
int r = result.intValue();
// 0,没有购买资格
if (r!= 0) {
return Result.fail((r == 1)? "库存不足 " : "不能重复下单");
}
// 1,有购买资格,生成订单,保存到阻塞队列
VoucherOrder voucherOrder = new VoucherOrder();
voucherOrder.setId(orderId);
voucherOrder.setUserId(userId);
voucherOrder.setVoucherId(voucherId);
// 放到阻塞队列中
orderTasks.add(voucherOrder);
proxy = (IVoucherOrderService) AopContext.currentProxy();
// 返回订单id
return Result.ok(orderId);
}
修改后的业务代码如下:
private class VoucherOrderHandler implements Runnable {
final String queueName = "stream.orders";
@Override
public void run() {
while (true) {
try {
// 获取消息队列中的订单信息 xreadgroup group g1 c1 count 1 block 2000 streams streams.order >
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
StreamOffset.create(queueName, ReadOffset.lastConsumed())
);
// 判断是否获取成功
if (list == null || list.isEmpty()) {
// 失败,重试
continue;
}
// 解析消息
MapRecord<String, Object, Object> record = list.get(0);
Map<Object, Object> value = record.getValue();
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
// 成功,生成订单
handleVoucherOrder(voucherOrder);
// ACK确认 sack stream.orders g1 id
stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());
} catch (Exception e) {
log.error("处理订单失败", e);
handlePendingList();
}
}
}
private void handlePendingList() {
while (true) {
try {
// 获取消息队列中的订单信息 xreadgroup group g1 c1 count 1 block 2000 streams streams.order >
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1),
StreamOffset.create(queueName, ReadOffset.from("0"))
);
//