本文最后更新于 95 天前,其中的信息可能已经有所发展或是发生改变。
1.核心概念
生产者:负责发送消息。
消费者:负责订阅队列并消费队列里的消息。
交换机:RabbitMQ 非常重要的一个部件,一方面它接收来自生产者的消息,另一方面它将消息推送到队列中。交换机必须确切知道如何处理它接收到的消息,是将这些消息推送到特定队列还是推送到多个队列,或者是把消息丢弃,由交换机的类型决定。
队列:RabbitMQ 内部使用的一种数据结构,是消息的存储容器,负责暂存从交换机路由过来的消息,直到消费者处理完毕。队列仅受主机的内存和磁盘限制的约束,本质上是一个大的消息缓冲区。生产者可以将消息发送到队列,消费者可以从队列接收数据。
2.交换机类型
2.1Direct Exchange
直连交换机,根据消息携带的路由键(routing key)将消息投递给对应队列。
2.2Fanout Exchange
扇型交换机,广播消息到所有绑定的队列,忽略路由键。
2.3Topic Exchange
主题交换机,支持通配符(*、#)模式匹配路由键,再根据路由键将消息投递给对应队列。
* (星号)用来表示一个单词 (必须出现的)
# (警号)用来表示任意数量(零个或多个)单词
当一个队列的绑定键为 # 时,这个队列会无视消息的路由键,接收所有的消息,相当于扇型交换机。
当 * 和 # 都未在绑定键中出现的时候,相当于直连交换机。
3.队列模式
3.1简单队列模式
生产者、队列、单个消费者、没有交换机(RabbitMQ 也会有一个默认的交换机)
@Slf4j
@Component
public class Customer {
/**
* queuesToDeclare:支持多个队列,将队列绑定到默认交换机上,testQueue为队列名称。
* @param msg 接收到的消息
*/
@RabbitListener(queuesToDeclare = @Queue(value = "testQueue"))
public void listener(String msg) {
log.info("接收消息:{}", msg);
}
}
3.2工作队列模式
生产者、队列、多个消费者、没有交换机。
3.3发布/订阅模式
也称扇出模式,生产者、队列、多个消费者、交换机类型为 FanoutExchange。
@Slf4j
@Component
public class FanoutCustomer {
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(value = "fanoutCustomer"),
exchange = @Exchange(
value = "fanoutExchange",
type = ExchangeTypes.FANOUT
)
)
)
public void listener(String msg) {
log.info("接收消息【发布/订阅模式】:{}", msg);
}
}
3.4路由模式
生产者、队列、多个消费者、交换机类型为 Direct Exchange。
@Slf4j
@Component
public class RouteCustomer {
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(value = "routeQueue1"),
exchange = @Exchange(value = "routeExchange", type = ExchangeTypes.DIRECT),
key = "routeKey1"
)
)
public void listener01(String msg) {
log.info("接收消息【路由模式】:{}", msg);
}
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(value = "routeQueue2"),
exchange = @Exchange(value = "routeExchange", type = ExchangeTypes.DIRECT),
key = "routeKey2"
)
)
public void listener02(String msg) {
log.info("接收消息【路由模式】:{}", msg);
}
}
3.5主题模式
生产者、队列、多个消费者、交换机类型为 Topic Exchange。
@Slf4j
@Component
public class TopicCustomer {
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(value = "topicQueue1"),
exchange = @Exchange(value = "topicExchange", type = ExchangeTypes.TOPIC),
key = "topic.*"
)
)
public void listener01(String msg) {
log.info("接收消息【主题模式】listener01:{}", msg);
}
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(value = "topicQueue2"),
exchange = @Exchange(value = "topicExchange", type = ExchangeTypes.TOPIC),
key = "topic.#"
)
)
public void listener02(String msg) {
log.info("接收消息【主题模式】listener02:{}", msg);
}
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(value = "topicQueue3"),
exchange = @Exchange(value = "topicExchange", type = ExchangeTypes.TOPIC),
key = "topic.y.#"
)
)
public void listener03(String msg) {
log.info("接收消息【主题模式】listener03:{}", msg);
}
}
3.6 RPC 模式
生产者发送一条消息,消费者获取并把消费结果返还给生产者。
3.7发布者确认模式
RabbitMQ 提供的一种机制,用于确保消息被成功发送到交换机并被接收到,以及确保消息被正确地路由到队列中。
4.基本使用
@Autowired
private RabbitTemplate rabbitTemplate;
public void test_product_route() throws InterruptedException {
rabbitTemplate.convertAndSend("routeExchange", "routeKey1", "路由模式,消息1");
rabbitTemplate.convertAndSend("routeExchange", "routeKey2", "路由模式,消息2");
}