跳到主要内容

集群推送消息实现

RocketMQ部署

Mac M1直接使用Docker的RocketMQ会有问题,因为没有arm版本的,需要自己编排容器镜像docker-compose.yml

version: "3.0"
services:
namesrv:
image: candice0630/rocketmq:5.0.0-alpine
container_name: rocketmqNameServer
#restart: always
volumes:
#挂载路径,冒号左边为服务器本地路径,冒号右边为容器内部路径
- /Users/houyunfei/tools/docker-volumes/rocketmq/nameServer/logs:/root/logs
- /Users/houyunfei/tools/docker-volumes/rocketmq/nameServer/store:/root/store
# - /Users/houyunfei/tools/docker-volumes/rocketmq/mq/bin/runserver.sh:/home/rocketmq/rocketmq-5.0.0/bin/runserver.sh
# - /Users/houyunfei/tools/docker-volumes/rocketmq/mq/bin/runbroker.sh:/home/rocketmq/rocketmq-5.0.0/bin/runbroker.sh
# - /Users/houyunfei/tools/docker-volumes/rocketmq/mq/bin/tools.sh:/home/rocketmq/rocketmq-5.0.0/bin/tools.sh

environment:
- MEM_XMS=500m
- MEM_XMX=500m
- MEM_XMN=256m #MAX_POSSIBLE_HEAP: 100000000
command:
#服务启动
sh mqnamesrv
#platform: linux/amd64
ports:
- "9876:9876"

rocketmqBroker:
image: candice0630/rocketmq:5.0.0-alpine
container_name: rocketmqBroker
#restart: always
volumes:
#挂载路径,冒号左边为服务器本地路径,冒号右边为容器内部路径
- /Users/houyunfei/tools/docker-volumes/rocketmq/broker/logs:/root/logs
- /Users/houyunfei/tools/docker-volumes/rocketmq/broker/store:/root/store
- /Users/houyunfei/tools/docker-volumes/rocketmq/broker.conf:/home/rocketmq/rocketmq-5.0.0/conf/broker.conf
# - /Users/houyunfei/tools/docker-volumes/rocketmq/mq/bin/runserver.sh:/home/rocketmq/rocketmq-5.0.0/bin/runserver.sh
# - /Users/houyunfei/tools/docker-volumes/rocketmq/mq/bin/runbroker.sh:/home/rocketmq/rocketmq-5.0.0/bin/runbroker.sh
# - /Users/houyunfei/tools/docker-volumes/rocketmq/mq/bin/tools.sh:/home/rocketmq/rocketmq-5.0.0/bin/tools.sh

depends_on:
- namesrv

environment:
- MEM_XMS=500m
- MEM_XMX=500m
- MEM_XMN=256m
- NAMESRV_ADDR:物理ip地址:9876
- BROKER_ID=0
- BROKER_ROLE=ASYNC_MASTER
- FLUSH_DISK_TYPE=ASYNC_FLUSH #MAX_POSSIBLE_HEAP: 200000000
command:
# 服务启动
sh mqbroker -n namesrv:9876 -c /home/rocketmq/rocketmq-5.0.0/conf/broker.conf
#platform: linux/amd64
ports:
- "10909:10909"
- "10911:10911"
- "10912:10912"

rocketmqConsole:
image: candice0630/rocketmq-console-ng:2.0
container_name: rocketmqConsole
depends_on:
- namesrv
environment:
JAVA_OPTS: "-Drocketmq.namesrv.addr=namesrv:9876 -Drocketmq.config.isVIPChannel=false -Dcom.rocketmq.sendMessageWithVIPChannel=false"
#platform: linux/amd64
ports:
- 19876:8080

挂在目录需要设置为自己的,也就是代码中左侧目录部分,还需要配置一个配置文件

# 所属集群名称,如果节点较多可以配置多个
brokerClusterName = DefaultCluster
#broker名称,master和slave使用相同的名称,表明他们的主从关系
brokerName = broker-a
#0表示Master,大于0表示不同的slave
brokerId = 0
#表示几点做消息删除动作,默认是凌晨4点
deleteWhen = 04
#在磁盘上保留消息的时长,单位是小时
fileReservedTime = 48
#有三个值:SYNC_MASTER,ASYNC_MASTER,SLAVE;同步和异步表示Master和Slave之间同步数据的机制;
brokerRole = ASYNC_MASTER
#是否允许 Broker 自动创建 Topic,建议线下开启,线上关闭 !!!这里仔细看是 false,false,false
autoCreateTopicEnable=true
#刷盘策略,取值为:ASYNC_FLUSH,SYNC_FLUSH表示同步刷盘和异步刷盘;SYNC_FLUSH消息写入磁盘后才返回成功状态,ASYNC_FLUSH不需要;
flushDiskType = ASYNC_FLUSH
# 设置broker节点所在服务器的ip地址、物理ip,不能用127.0.0.1、localhost、docker内网ip
brokerIP1 = 192.168.111.182
# 磁盘使用达到95%之后,生产者再写入消息会报错 CODE: 14 DESC: service not available now, maybe disk full
diskMaxUsedSpaceRatio=95

上面的代码中,倒数第三行中的ip需要设置为自己本地的ip,否则可能会失败。具体可以ifconfig查看

然后启动容器:

docker-compose up -d 

image-20240529142734139

如果想要停止:该命令会停止并删除由 docker-compose up 命令启动的容器(只会关闭当前目录启动的,也就是docker-compose.yml文件所在目录)。

docker-compose down

RocketMQ推送消息

我们之前发送消息的逻辑中,发送消息的逻辑实在监听器里:

/**
* 发送消息
*/
@Override
@Transactional
public Long sendMsg(ChatMessageReq request, Long uid) {
check(request, uid);
// todo 保存消息
AbstractMsgHandler<?> msgHandler = MsgHandlerFactory.getStrategyNoNull(request.getMsgType());
Long msgId = msgHandler.checkAndSaveMsg(request, uid);
// 发布消息发送事件
applicationEventPublisher.publishEvent(new MessageSendEvent(this, msgId));
return msgId;
}

接着我们看监听器里的实现:

@TransactionalEventListener(phase = TransactionPhase.BEFORE_COMMIT, classes = MessageSendEvent.class, fallbackExecution = true)
public void messageRoute(MessageSendEvent event) {
Long msgId = event.getMsgId();
// todo RocketMQ 进行消息路由
mqProducer.sendSecureMsg(MQConstant.SEND_MSG_TOPIC, new MsgSendMessageDTO(msgId), msgId);
}

注意这里phase = TransactionPhase.BEFORE_COMMIT表示是在事务提交之前执行。

我们使用RocketMQ进行消息路由,发送到SEND_MSG_TOPIC节点

下面是MsgSendConsumer

/**
* Description: 发送消息更新房间收信箱,并同步给房间成员信箱
*/
@RocketMQMessageListener(consumerGroup = MQConstant.SEND_MSG_GROUP, topic = MQConstant.SEND_MSG_TOPIC)
@Component
public class MsgSendConsumer implements RocketMQListener<MsgSendMessageDTO> {
@Override
public void onMessage(MsgSendMessageDTO dto) {
Message message = messageDao.getById(dto.getMsgId());
Room room = roomCache.get(message.getRoomId());
ChatMessageResp msgResp = chatService.getMsgResp(message, null);
//所有房间更新房间最新消息
roomService.refreshActiveTime(room.getId(), message.getId(), message.getCreateTime());
roomCache.delete(room.getId());
if (room.isHotRoom()) {//热门群聊推送所有在线的人
//更新热门群聊时间-redis
hotRoomCache.refreshActiveTime(room.getId(), message.getCreateTime());
//推送所有人
pushService.sendPushMsg(WSAdapter.buildMsgSend(msgResp));
} else {
List<Long> memberUidList = new ArrayList<>();
if (Objects.equals(room.getType(), RoomTypeEnum.GROUP.getType())) {//普通群聊推送所有群成员
memberUidList = groupMemberCache.getMemberUidList(room.getId());
} else if (Objects.equals(room.getType(), RoomTypeEnum.FRIEND.getType())) {//单聊对象
//对单人推送
RoomFriend roomFriend = roomFriendService.getByRoomId(room.getId());
memberUidList = Arrays.asList(roomFriend.getUid1(), roomFriend.getUid2());
}
//更新所有群成员的会话时间
contactService.refreshOrCreateActiveTime(room.getId(), memberUidList, message.getId(), message.getCreateTime());
//推送房间成员
pushService.sendPushMsg(WSAdapter.buildMsgSend(msgResp), memberUidList);
}
}
}

这个是一个 RocketMQ 消息监听器,用于处理消息发送更新房间收信箱并同步给房间成员收信箱的任务。以下是对代码的解析:

  1. 该类 MsgSendConsumer 实现了 RocketMQListener 接口,监听 MQConstant.SEND_MSG_TOPIC 主题下的消息。
  2. 调用 roomService.refreshActiveTime 方法更新房间最新消息时间。目的是让左侧的消息列表进行排序
  3. 判断该房间是否为热门群聊:
    • 如果是热门群聊,则调用 hotRoomCache.refreshActiveTime 方法更新热门群聊最近活跃时间,并调用 pushService.sendPushMsg 方法推送消息给所有在线用户。
    • 如果不是热门群聊,则根据房间类型获取相应的成员 uid 列表,调用 contactService.refreshOrCreateActiveTime 方法更新成员最近活跃时间,并调用 pushService.sendPushMsg 方法推送消息给房间成员。

接下来我们去监听这个topic的消息,并通过WebSocketService推送给前端

@RocketMQMessageListener(topic = MQConstant.PUSH_TOPIC, consumerGroup = MQConstant.PUSH_GROUP, messageModel = MessageModel.BROADCASTING)
@Component
public class PushConsumer implements RocketMQListener<PushMessageDTO> {
@Autowired
private WebSocketService webSocketService;

@Override
public void onMessage(PushMessageDTO message) {
WSPushTypeEnum wsPushTypeEnum = WSPushTypeEnum.of(message.getPushType());
switch (wsPushTypeEnum) {
case USER:
message.getUidList().forEach(uid -> {
webSocketService.sendToUid(message.getWsBaseMsg(), uid);
});
break;
case ALL:
webSocketService.sendToAllOnline(message.getWsBaseMsg(), null);
break;
}
}
}

这个代码是一个 RocketMQ 消息监听器,用于处理推送消息的任务。以下是对代码的解析:

  1. 该类 PushConsumer 实现了 RocketMQListener 接口,监听 MQConstant.PUSH_TOPIC 主题下的消息。注意这里要将messageModel = MessageModel.BROADCASTING,因为原本的模式是CLUSTERING
  2. 根据不同的推送类型,执行不同的推送逻辑:
    • 如果是 USER 类型,则遍历 uidList 中的用户 ID,调用 webSocketService.sendToUid 方法分别推送消息给每个用户。
    • 如果是 ALL 类型,则调用 webSocketService.sendToAllOnline 方法推送消息给所有在线用户。