RabbitMQ消息队列
RabbitMQ基础
同步与异步对比:
同步:两个人实时打电话,问题:拓展性差,性能下降,级联失败
异步:微信聊天,交互不实时
异步调用模型:
常见的MQ消息队列对比:
RabbitMQ | ActiveMQ | RocketMQ | Kafka | |
---|---|---|---|---|
公司/社区 | Rabbit | Apache | 阿里 | Apache |
开发语言 | Erlang | Java | Java | Scala&Java |
协议支持 | AMQP,XMPP,SMTP,STOMP | OpenWire,STOMP,REST,XMPP,AMQP | 自定义协议 | 自定义协议 |
可用性 | 高 | 一般 | 高 | 高 |
单机吞吐量 | 一般 | 差 | 高 | 非常高 |
消息延迟 | 微秒级 | 毫秒级 | 毫秒级 | 毫秒以内 |
消息可靠性 | 高 | 一般 | 高 | 一般 |
docker安装
拉取镜像
docker pull rabbitmq:3.8-management
运行:
docker run \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=admin \
--name mq \
--hostname mq \
-p 15672:15672 \
-p 5672:5672 \
-d \
rabbitmq:3.8-management
收发消息
创建消息队列:
将交换机与消息队列绑定:
发送消息:
接收消息:
数据隔离
添加一个用户:
新加虚拟主机
达到隔离效果:
AMQP
快速入门
AMQP的全称为:Advanced Message Queuing Protocol(高级消息队列协议)
Spring-AMQP:https://spring.io/projects/spring-amqp
Spring AMQP是一个基于AMQP协议的消息中间件框架,它提供了一个简单的API来发送和接收异步、可靠的消息。它是Spring框架的一部分,可以与Spring Boot和其他Spring项目一起使用。
依赖:
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
为了测试方便,我们也可以直接向队列发送消息,跳过交换机。
配置:
spring:
rabbitmq:
host: 192.168.150.101 # 你的虚拟机IP
port: 5672 # 端口
virtual-host: /hmall # 虚拟主机
username: hmall # 用户名
password: 123 # 密码
发送消息:
@SpringBootTest
public class PublisherApplicationTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void test(){
String queueName = "simple.queue";
String message = "hello springboot";
rabbitTemplate.convertAndSend(queueName,message);
}
}
接收消息:
@Component
public class SpringRabbitListener {
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage(String msg) throws InterruptedException {
System.out.println("spring 消费者接收到消息:【" + msg + "】");
}
}
WorkQueues模式
多个消费者共同处理消息处理,
发送消息:
@Test
public void testWorkQueue() throws InterruptedException {
// 队列名称
String queueName = "work.queue";
// 消息
String message = "hello, message_";
for (int i = 0; i < 50; i++) {
// 发送消息,每20毫秒发送一次,相当于每秒发送50条消息
rabbitTemplate.convertAndSend(queueName, message + i);
Thread.sleep(20);
}
}
接收消息:
@RabbitListener(queues = "work.queue")
public void listenWorkQueue1(String msg) throws InterruptedException {
System.out.println("消费者1接收到消息:【" + msg + "】" + LocalTime.now());
Thread.sleep(20);
}
@RabbitListener(queues = "work.queue")
public void listenWorkQueue2(String msg) throws InterruptedException {
System.err.println("消费者2........接收到消息:【" + msg + "】" + LocalTime.now());
Thread.sleep(200);
}
这样两个消费者接收到的消息数量是相同的,时间却没有均匀分配,导致第一个消费者处理完了,空闲了很多时间,后面都是2在干活
为了解决这个问题,使用能者多劳策略:
spring:
rabbitmq:
listener:
simple:
prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息
运行结果:
可以发现,由于消费者1处理速度较快,所以处理了更多的消息;消费者2处理速度较慢,所以处理了较少的消息
交换机
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
交换机的类型有四种:
- Fanout:广播,将消息交给所有绑定到交换机的队列。我们最早在控制台使用的正是Fanout交换机
- Direct:订阅,基于RoutingKey(路由key)发送给订阅了消息的队列
- Topic:通配符订阅,与Direct类似,只不过RoutingKey可以使用通配符
- Headers:头匹配,基于MQ的消息头匹配,用的较少。
Fanout交换机
发布者发布消息:
@Test
public void testFanoutExchange() {
// 交换机名称
String exchangeName = "cxk.fanout";
// 消息
String message = "hello, everyone!";
rabbitTemplate.convertAndSend(exchangeName, "", message);
}
消费者接收消息:
@RabbitListener(queues = "fanout.q1")
public void listenFanoutQueue1(String msg) {
System.out.println("消费者1接收到Fanout消息 :【" + msg + "】");
}
@RabbitListener(queues = "fanout.q2")
public void listenFanoutQueue2(String msg) {
System.out.println("消费者2接收到Fanout消息:【" + msg + "】");
}
Direct交换机
Fanout会被所有队列消费,direct需要指定key,根据消息的Routing Key
进行判断,只有队列的Routingkey
与消息的 Routing key
完全一致,才会接收到消息
创建交换机,和队列进行绑定,同时绑定key
消费者接收代码:
@RabbitListener(queues = "direct.q1")
public void listenDirectQueue1(String msg) {
System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");
}
@RabbitListener(queues = "direct.q2")
public void listenDirectQueue2(String msg) {
System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】");
}
生产者:
@Test
public void testSendDirectExchange() {
// 交换机名称
String exchangeName = "cxk.direct";
// 消息
String message = "红色警报!日本乱排核废水,导致海洋生物变异,惊现哥斯拉!";
// 发送消息
rabbitTemplate.convertAndSend(exchangeName, "red", message);
}
将red
改为blue,只有1可以接受