300字范文,内容丰富有趣,生活中的好帮手!
300字范文 > Rabbitmq专题:rabbitMQ如何保证消息的可靠性投递?如何防止消息丢失

Rabbitmq专题:rabbitMQ如何保证消息的可靠性投递?如何防止消息丢失

时间:2021-03-19 11:58:40

相关推荐

Rabbitmq专题:rabbitMQ如何保证消息的可靠性投递?如何防止消息丢失

文章目录

1. 消息可能出现丢失的情况2. 生产者如何保证消息的可靠性投递2.1 消息落库打标 + confirm机制2.2 消息幂等性如何保证?2.3 延时消息确认3. rabbitMQ服务器如何防止消息丢失4. 消费者如何防止消息丢失

1. 消息可能出现丢失的情况

消息可能出现丢失的情况如上图所示,针对生产者、MQ、消费者三个维度都可能出现消息丢失

生产者在向MQ服务器Broker发送message时,可能由于网络原因,消息发送失败,在传输过程中丢失,此时消息还未到达MQ服务器

RabbitMq服务器接收到消息,此时RabbitMq服务器突然宕机,造成消息丢失

消费端拿到消息后,还未来得及处理就宕机或者被重启了,造成消息丢失

2. 生产者如何保证消息的可靠性投递

2.1 消息落库打标 + confirm机制

针对生产者在向Broker发送message时的消息丢失,可以使用消息入库打标记并配合mq的confirm机制来保证消息可靠性,首先要了解什么是mq 的confirm机制?生产者向mq服务端发送消息时,mq服务器会根据消息的接收情况给生产者一个应答,生产者根据应答情况来确保该条消息是否成功的发送到了mq服务器!而这个应答的过程就是mq的confirm机制。

confirm机制的现实步骤如下:

在生产者的channel 上开启confirm机制channel.confirmSelect();在生产者的channel上添加监听,用来监听mq-server返回的应答

伪代码如下:

//开启confirm机制channel.confirmSelect();//设置confirm 监听channel.addConfirmListener(new AngleConfirmListerner());//......发送消息 //注意:生产者连接不能断开,否则无法监听回调================= 消息监听器AngleConfirmListerner =================public class AngleConfirmListerner implements ConfirmListener {//broker正常签收@Overridepublic void handleAck(long deliveryTag, boolean multiple) throws IOException {System.out.println("消息deliveryTag" + deliveryTag + "被正常签收");}//broker异常签收@Overridepublic void handleNack(long deliveryTag, boolean multiple) throws IOException {System.out.println("消息deliveryTag" + deliveryTag + "没被签收");}}

消息入库打标解决思路

以订单系统向mq发消息为例

发消息链路流程

把订单数据入库,同时构建消息数据,消息数据同样入库,记录在message表中,消息状态初始标记为0生产者(订单服务)发送消息到mq服务器mq会通过confirm机制返回确认消息,订单服务监听回调的confirm结果正常情况下,mq接收消息成功并返回ACK,订单服务监听到此ACK,并更新message表中的消息状态标记为1,表示发送与接收正常!异常情况下,由于网络闪断,导致消费端监控mq服务访问的确认消息 没有收到,那么在msg_db中的那条消息的 状态永远就是0状态。这个时候,我们需要对这种情况下做出补偿根据消费者(库存服务)消费情况(消费正常、消费异常),分别修改message表中的消息状态为3或4!

异常情况下的补偿机制

启动一个定时任务,去扫描message这个消息记录表,针对消息状态为0的消息,根据业务来设置重发规则

①:插入message表中的消息,如果3分钟后状态还是0,代表未发送或发送失败,那么进行消息重发,并记录消息重发次数②:如果消息重发次数大于5次,且消息状态还是0的时候,就把这条消息状态设置为2,代表消息发送不成功,此时人工介入,调查未成功原因!

注意:消息落库打标,这种方式与RokcetMQ的事务消息的思想非常类似,只不过这里使用的是本地定时扫描发送失败的消息,而RokcetMQ则是在Broker中轮询扫描失败消息。两种方式的触发位置不同!

2.2 消息幂等性如何保证?

幂等性简而言之,就是对接口发起的一次调用和多次调用,所产生的结果都是一致的。某些接口具有天然的幂等性: 比如查询接口,不管是查询一次还是多次,返回的结果都是一致的。

但对于标题2.1的发消息链路中,mq返回成功的ACK时,如果因为网络原因ACK发送失败,就导致消息生产者(订单服务)无法修改message表中的消息状态,又因为补偿机制的存在,回轮询扫描、判断并重发,如果没有幂等性保证,就会造成订单的重复提交!再或者用户多次点击提交订单,如果没有接口幂等性,也会造成订单重复提交!

订单重复提交幂等性解决方案

如果使用户由于网络卡顿而心急,不断点击提交订单造成的订单重复提交,可以为订单生成一个全局唯一性ID(订单号+业务类型),并把该唯一id使用setnx命令保存在redis,在第一次保存的时候,由于redis中没有该key,那么就会把全局唯一ID 设置上,此时订单入库保存。若出现前端重复点击按钮, 由于第一步已经setnx上了 ,就会阻止后面的保存

mq服务端是如何保证幂等性的?

mq服务端在接受消息时,会对每一条消息都生成一个全局唯一的与业务无关的ID(inner_msg_id),先根据inner_msg_id是否需要重复发送,再决定消息是否落地 。这样保证每条消息都只会在mq服务端落地一次。如果由于网络原因mq落地成功,但返回ack失败,生产者由于补偿机制重发消息,mq服务端会对比新消息的inner_msg_id,由于此条消息在mq服务器已落地,所以id相等,不予处理!

2.3 延时消息确认

消息入库打标存在缺点:在消息入库打标第一步的过程中,既插入了业务数据表,也同时插入了消息记录表,进行了二次db操作,在分布式环境下,可能还要保证分布式事务。延时消息确认机制相比消息入库打标,减少了一次message入库操作,不用加分布式事务,系统速度显著提高。延时消息的思路如下:

步骤如下:

订单服务首先将业务代码入库,注意:消息并没有入库发送业务消息给mq发送第二个延迟确认消息,与业务消息发送时间有一定间隔(1分钟),保证消费端处理完毕并消息入库之后才发送!库存服务监听第二步发送的业务消息进行消费消费端(库存服务)发送确认消息ack到mq,此时第三步还没有执行,间隔时间未到!回调服务监听到这个确认消息把这个消费端完成消费的确认消息入库回调服务检查到延迟确认消息,会在数据库查询是否有这条消息如果没有查到这条消息,说明第五步库存服务消费消息失败了。此时回调服务通过RPC给一个重新发送命令到上游系统

3. rabbitMQ服务器如何防止消息丢失

RabbitMQ 的消息默认存放在内存上面,如果不特别声明设置,消息不会持久化保存到硬盘上面的,如果节点重启或者意外crash掉,消息就会丢失。所以就要对消息进行持久化处理。如何持久化?要想做到消息持久化,必须满足以下三个条件,缺一不可。下面具体说明下:

Exchange 设置持久化

/*** exchangeName:交换机名称* exchangeType:交换机类型* true: 开启消息持久化* false:代表连接停掉后不自动删除掉* null:其他参数*/channel.exchangeDeclare(exchangeName,exchangeType,true,false,null);

Queue 设置持久化

/*** queueName:队列名称* true: 开启消息持久化* false:代表所有消费者都可以访问,true代表只有第一次拥有它的消费者才能一直使用,其他消费者不让访问* false:代表连接停掉后不自动删除掉* null:其他参数*/channel.queueDeclare(queueName,true,false,false,null);

Message持久化发送:发送消息设置发送模式deliveryMode=2,代表持久化消息

//消息属性AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder().deliveryMode(2)//消息持久化.contentEncoding("UTF-8").correlationId(UUID.randomUUID().toString()).headers(infoMap).build();//生产者发送消息channel.basicPublish(exchangeName,routingKey,basicProperties,(msgBody+i).getBytes());

4. 消费者如何防止消息丢失

首先看一下,生产者、mq服务器、消费者之间的消息流转过程

第一步:消息生产者向Mq服务端发送消息第二步:mq 服务端把消息进行落地第三步:mq 服务端向消息生产者发送ack第四步:消息消费者从mq服务端拉取消息消费第五步:消费者向mq服务端发送ack第六步:mq服务端将落地消息删除

第四步消费者获取到消息之后,没有来得及处理完毕,自己直接宕机了,因为消息者默认采用的是自动ack,此时RabbitMQ的自动ack机制会通知MQ Server这条消息已经处理好了,此时消息就丢了,并不是预期的。

那么我们可以采用手动ack机制来解决这个问题,消费端处理完逻辑之后再通知MQ Server,这样消费者没处理完消息不会发送ack,如果在消费者拿到消息,没来得及处理的情况下自己挂了,此时MQ集群会自动感知到,它就会自觉的重发消息给其他的消费者服务实例。

根据上面的思路你需要完成下面的两步操作:

①:消费者关闭自动ack

//第二个参数为false代表不自动ackchannel.basicConsume(queueName,false,new TulingAckConsumer(channel));

②:消费完成才ack,否则不ack

try{//模拟业务Integer mark = (Integer) properties.getHeaders().get("mark");if(mark != 0 ) {//模拟消息消费System.out.println("消费消息:"+new String(body));//消费完成,发出ack通知channel.basicAck(envelope.getDeliveryTag(),false);}else{//否则抛异常throw new RuntimeException("模拟业务异常");}}catch (Exception e) {System.out.println("异常消费消息:"+new String(body));//捕捉异常,消息重回队列,让其他集群节点消费//channel.basicNack(envelope.getDeliveryTag(),false,true);//不重回队列,杀死这个消息channel.basicNack(envelope.getDeliveryTag(),false,false);}

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。