合作机构:阿里云 / 腾讯云 / 亚马逊云 / DreamHost / NameSilo / INWX / GODADDY / 百度统计
大家好,我是小?,一个漂泊江湖多年的 985 非科班程序员,曾混迹于国企、互联网大厂和创业公司的后台开发攻城狮。
在今天的文章中,我们来聊一聊 RabbitMQ,这是小 ? 在工作中用的最早的消息中间件,主要用于大量数据的异步消费。
RabbitMQ 是一个开源的消息中间件,它实现了高级消息队列协议(AMQP),同时提供了各种重要组件来支持消息的生产、传输和消费。
图片
RabbitMQ 的工作方式是基于生产者、交换机和队列之间的协作。这是一个简单的消息传递过程:
这种模型具有高度的灵活性,可以轻松处理大量消息,同时确保消息的可靠传递。
说到消息中间件,很多人首先想到的就是 Kafka,但 RabbitMQ 也是许多金融或互联网公司构建可靠、可伸缩和高性能系统的首选。
这是为什么呢?
主要得从 RabbitMQ 的特性说起,主要有二:一个是功能强大,另一个是可靠性!
RabbitMQ 注重消息的可靠性和灵活性,适合任务排队和消息传递。而 Kafka 是分布式流式平台,注重日志存储和数据分发。
顺序消费也是可靠性的一种,RabbitMQ 可以使用单一队列或多个单一队列来确保顺序消费。
除此之外,RabbitMQ 还提供持久性队列和消息,以确保消息在 RabbitMQ 服务器宕机后不会丢失。另外,生产者可以使用发布确认机制来确认消息是否被接收。
RabbitMQ 相对 kafka 可靠性更好,数据更不易丢失,这对于一些数据敏感型的业务来说,显然更适合用前者。
并且,RabbitMQ 中原生支持死信队列,可以更好地处理未完成的业务消息,以及实现延时队列等特性,接下来我们一一介绍。
RabbitMQ 提供了多个队列模型来保证消息的顺序消费。这对于某些应用程序非常重要,例如处理订单、支付和库存管理。
图片
如上图所示,有三条业务消息分别是删除、增加和修改操作,但是 Consumer 没有按顺序消费,最终存储的顺序是增加、修改和删除,就会发生数据错乱。
针对消息有序性的问题,RabbitMQ 的解决方法是分三个阶段来保证。
消息发送时,需要业务来保证顺序性,就是保证生产者入队的顺序是有序的。
在分布式的场景下如果难以保证各个服务器的入队顺序,则可以加分布式锁的方式来解决。或者在业务生产方的消息里带上消息递增 ID,以及消息产生的时间戳。
在 RabbitMQ 的消息会保存在队列(Queue)中,在同一个队列里的消息是先进先出(FIFO)的,这个由 RabbitMQ 来帮我们保证顺序。
而不同队列中的消息,RabbitMQ 无法保证其顺序性,就像我们在食堂打饭一样,站在不同的排队队列,我们也无法保证会比其他队列的人先打上饭。
一般来说,出队后的顺序消费交给消费者去保证。我们说的保证消费顺序,通常也是指消费者消费消息的顺序。
有多个消费者的情况下,通常是无法保证消息顺序的。
这就相当于我们在排队打饭时,有多个打饭阿姨,但是每个阿姨打饭的速度不一致,对应我们消费者的消费能力也不同。
所以,为了保证消息的顺序性,我们可以只使用一个消费者来接收业务消息。
就好比只有一个阿姨在打饭,来得早就一定能早点打上饭。但很明显,这样效率不是很高,所以在使用时我们需要权衡利弊:看业务更需要顺序性,还是更需要消费效率。
在保证顺序消费时,另一个迂回策略是可以使用优先级队列(Priority Queue)。
在 RabbitMQ3.5 之后,当消费者数量较少,如果服务器检测到消费者不能及时消费消息的情况下,优先级队列就会生效。
具体有两种优先级策略:
在声明队列时,我们可以通过 x-max-priority 属性来设置队列的最大优先级,或通过 Priority 属性来设置消息的优先级,从 1~10。
Golang 实现代码如下:
// 队列属性
props := make(map[string]interface{})
// 设置队列最大优先级
props["x-max-priority"] = 10
ch.Publish(
"tizi365", // 交换机
"", // 路由参数
false,
false,
amqp.Publishing{
Priority:5, // 设置消息优先级
DeliveryMode:2, // 消息投递模式,1代表非持久化,2代表持久化,
ContentType: "text/plain",
Body: []byte(body),
})
TOP