300字范文,内容丰富有趣,生活中的好帮手!
300字范文 > rabbitmq如何保证消息不被重复消费_RabbitMQ保证消息可靠投递与消费的正确使用姿势...

rabbitmq如何保证消息不被重复消费_RabbitMQ保证消息可靠投递与消费的正确使用姿势...

时间:2022-08-12 03:06:35

相关推荐

rabbitmq如何保证消息不被重复消费_RabbitMQ保证消息可靠投递与消费的正确使用姿势...

前言

MQ 是什么?MQ 我们可以理解为消息队列。

队列是什么?队列我们可以理解为管道。

即以管道的方式做消息传递。

场景展示:

1.我们在双11的凌晨大量秒杀和抢购商品,然后去结算的时候,发现界面会通过图片或文字的方式提醒我们,让我们稍等。而不是动不动就页面卡死、报错。

在这个业务场景中,我们就可以采用队列的机制来处理,因为同时结算就只能达到这么多。

2.平时的超市中购物也是一样,当我们在结算的时候,并不会一窝蜂一样涌入收银台,而是排队结算。这也是队列机制。

综上所述:就是排队。一个接着一个的处理,不能插队。

优秀队列管理专家:RabbitMQ

RabbitMQ的是由Erlang语言编写的实现了高级消息队列协议(AMQP)的开源消息代理软件(也可称为:面向消息的中间件)。

支持Windows、Linux/Unix、MAC OS X操作系统和包括JAVA在内的多种编程语言。同时支持大量协议,如AMQP、XMPP、SMTP、STOMP。并且支持消息的持久化,负载均衡和集群。另外支持消息确认机制,如事务、异步确认等。本文重点阐述RabbitMQ的消息确认机制。

留住每一条消息

因为服务器的异常崩溃导致的消息丢失?

不存在的。

RabbitMQ的消息持久化可以让我们不丢失任何消息。

万一消息在到达broker之前已经丢失怎么办?

我们还有办法!

如果不进行特殊配置的话,默认情况下发布操作是不会返回任何信息给生产者的,也就是默认情况下我们的生产者并不知道消息有没有正确到达broker。这个问题该怎么解决?

RabbitMQ为我们提供了两种方式,第一,通过AMQP事务机制实现,这也是AMQP协议层面提供的解决方案;第二,通过将channel设置成confirm模式来实现。

事务机制:

该模式与数据库的事务非常相似。RabbitMQ中与事务机制有关的方法有txSelect(), txCommit()以及txRollback()。txSelect用于将当前channel设置成transaction模式,txCommit用于提交事务,txRollback用于回滚事务。在通过txSelect开启事务之后,我们便可以发布消息给broker了,如果txCommit提交成功了,则消息一定到达了broker了,如果在txCommit执行之前broker异常崩溃或者由于其他原因抛出异常,这个时候我们便可以捕获异常通过txRollback回滚事务了。示例代码如下:

该模式用法简单,但是有个致命的缺点,那就是事务提交非常慢,会严重降低系统吞吐量,所以一般不推荐使用该模式,而改用confirm模式。

Confirm模式

Productor Confirm模式:

生产者将channel设置成confirm模式,一旦channel进入confirm模式,所有在该channel上面发布的消息都会被指派一个唯一的ID(Correlation Id从1开始)。一旦消息被投递到所匹配的队列之后,broker就会发送一个确认给生产者(包含消息的唯一ID)。这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会将消息写入磁盘之后发出。broker回传给生产者的确认消息中deliver-tag域包含了确认消息的序列号,此外broker也可以设置basic.ack的multiple域,表示到这个序列号之前的所有消息都已经得到了处理。

confirm模式最大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就可以在等channel返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果RabbitMQ因为自身内部错误导致消息丢失,就会发送一条nack消息,生产者应用程序同样可以在回调方法中处理该nack消息。

在channel 被设置成 confirm 模式之后,所有被 publish 的后续消息都将被 confirm(即 ack) 或者被nack一次。但是没有对消息被 confirm 的快慢做任何保证,并且同一条消息不会既被 confirm又被nack (注:已经在transaction事务模式的channel是不能再设置成confirm模式的,即这两种模式是不能共存的)。示例代码如下:

Consumer Confirm模式:

如果处理一条消息需要几秒钟的时间,你可能会想,如果在处理消息的过程中,消费者服务器、网络、网卡出现故障挂了,那可能这条正在处理的消息或者任务就没有完成,就会失去这个消息和任务。为了确保消息或者任务不会丢失,RabbitMQ支持消息确认–ACK。ACK机制是消费端从RabbitMQ收到消息并处理完成后,反馈给RabbitMQ,RabbitMQ收到反馈后才将此消息从队列中删除。如果一个消费者在处理消息时挂掉(网络不稳定、服务器异常、网站故障等原因导致频道、连接关闭或者TCP连接丢失等),那么他就不会有ACK反馈,RabbitMQ会认为这个消息没有正常消费,会将此消息重新放入队列中。如果有其他消费者同时在线,RabbitMQ会立即将这个消息推送给这个在线的消费者。这种机制保证了在消费者服务器故障的时候,能不丢失任何消息和任务。

如果RabbitMQ向消费者发送消息时,消费者服务器挂了,消息也不会有超时;即使一个消息需要非常长的时间处理,也不会导致消息超时。这样消息永远不会从RabbitMQ服务器中删除。只有当消费者正确的发送ACK确认反馈,RabbitMQ确认收到后,消息才会从RabbitMQ服务器的数据中删除。

消息的ACK确认机制默认是开启的。ACK分为自动(auto)和手动(manul)2种模式:

自动确认模式,消息被认为是在发送后立即成功发送的,这种模式可以实现更高的吞吐量,但却是不安全的。如果在成功发送之前,消费者的TCP连接或通道关闭,服务器发送的消息将丢失。在使用自动确认模式时,需要考虑的另一件事是消费者过载。示例代码如下:

手动确认模式可以使用 prefetch,限制通道上未完成的(“正在进行中的”)发送的数量。然而,在自动确认的情况下,没有这样的限制。示例代码如下:

忘记确认

忘记通过basicAck返回确认信息是常见的错误。这个错误非常严重,将导致消费者客户端退出或者关闭后,消息会被退回RabbitMQ服务器,这会使RabbitMQ服务器内存爆满,而且RabbitMQ也不会主动删除这些被退回的消息。如果要监控这种错误,可以使用rabbitmqctl messages_unacknowledged命令打印出出相关的信息。

当autoAck参数置为false,对于RabbitMQ服务端而言,队列中的消息分成了两个部分:一部分是等待投递给消费者的消息;一部分是已经投递给消费者,但是还没有收到消费者ack的消息。如果RabbitMQ一直没有收到消费者的确认信号,并且消费此消息的消费者已经断开连接,则RabbitMQ会安排该消息重新进入队列,等待投递给下一个消费者,当然也有可能还是原来的那个消费者。

RabbitMQ不会为未确认的消息设置过期时间,它判断此消息是否需要重新投递给消费者的唯一依据是消费该消息的消费者连接是否已经断开,这么设计的原因是RabbitMQ允许消费者消费一条消息的时间可以很久很久。

如果消息消费失败,也可以调用Basic.Reject或者Basic.Nack来拒绝当前消息而不是确认,如果只是简单的拒绝那么消息会丢失,需要将相应的requeue参数设置为true,那么RabbitMQ会重新将这条消息存入队列,以便可以发送给下一个订阅的消费者。如果requeue参数设置为false的话,RabbitMQ立即会把消息从队列中移除,而不会把它发送给新的消费者。示例代码如下:

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。