300字范文,内容丰富有趣,生活中的好帮手!
300字范文 > Rabbitmq消息可靠投递和重复消费等问题解决方案

Rabbitmq消息可靠投递和重复消费等问题解决方案

时间:2019-06-07 15:51:31

相关推荐

Rabbitmq消息可靠投递和重复消费等问题解决方案

消息的可靠性投递

在一些对数据一致性要求较高的业务场景里面,如果消息在发布和消费过程中出现了问题(消息丢失,消息重复消费),就会导致数据不一致,要做到消息的可靠性投递。

在RabbitMq里面提供了很多保证消息可靠投递的机制,这也是RabbitMq的一个特性。

我们在讲可靠性投递的时候,必须要明确一个问题,因为效率与可靠性是无法兼得的,如果要保证每一个环节都成功,势必会对消息的收发效率造成影响。所以如果是一些业务实时一致性要求不是特别高的场合,可以牺牲一些可靠性来换取效率。

RabbitMq的功能模型:

有四个环节可能会导致消息出现问题。下面讲每个环节及其解决方案。

环节一:

消息从生产者发送到Broker,因为一个客观因素,比如网络问题,Broker问题导致消息可能没有发送到Broker中,生产者要确保知道消息是否成功发送到Broker上才行。

Rabbitmq提供了两种服务端确认机制来通知生产者消息是否发送到Broker成功,也就是生产者发送消息给RabbitMq服务端的时候,服务端会以某种方式应答生产者,只要生产者收到了这个应答,就代表消息发送到Broker成功。

第一种机制是事务模式:

我们通过一个 channel.txSelect()的方法把信道设置成事务模式,然后就可以发布消息给 RabbitMQ 了,如果 channel.txCommit();的方法调用成功,就说明事务提交成功,则消息一定到达了 RabbitMQ 中。

如果在事务提交执行之前由于 RabbitMQ 异常崩溃或者其他原因抛出异常,这个时候我们便可以将其捕获,进而通过执行 channel.txRollback()方法来实现事务回滚。

java原生API的实现:

public class TransactionProducer {public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setUri("amqp://guest:guest@192.168.18.140:5672");factory.setVirtualHost("/");Connection connection = factory.newConnection();Channel channel = connection.createChannel();//开启事务模式channel.txSelect();String message = "txtest";try {//发送消息channel.basicPublish("TransactionEx","TransactionMsg",false,false,null,message.getBytes());//int a = 1/0;//正常提交channel.txCommit();System.out.println("发送消息提交成功");//接下正确的业务逻辑}catch (Exception e){//出现异常,回滚消息,即使消息已经成功发送到Broker,也会回滚删除channel.txRollback();System.out.println("出现异常,回滚");//可以做一些补偿措施,比如重发等}}}

流程

现象,正常提交,队列会出现相应消息,异常回滚,队列没有出现相应消息。

弊端:

在事务模式下,只有收到了服务端的Commit-OK指令,才能提交成功,所以可以解决生产者和服务端确认的问题,但是事务模式有一个缺点,他是阻塞服务端的,一条消息没有发送完毕,就不能发送下一条消息,会榨干RabbitMq服务器的性能,所以不建议生产环境使用。官方说会降低250倍性能。

第二种机制是确认机制:

确认模式分为三种:

普通确认模式:在生产者这边通过调用channel.confirmSelect方法将信道设置为Confirm模式,然后发送消息,一旦消息被投递Broker后,Rabbitmq就会发送一个确认的Basic.Ack给生产者,也就是调用channel.waitForConfirms方法返回true,这样生产者就知道消息被服务端接收了。

java原生API的实现:

public class CommonConfirm {public static void main(String[] args) throws Exception{ConnectionFactory factory = new ConnectionFactory();factory.setUri("amqp://guest:guest@192.168.18.140:5672");factory.setVirtualHost("/");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.confirmSelect();String message = "commonConfirmTest";channel.basicPublish("TransactionEx","TransactionMsg",false,false,null,message.getBytes());//等待确认,也可以设置超时,也有个重载方法不设置超时时间的try {if (channel.waitForConfirms(5000)){//返回true的话确认发送成功System.out.println("发送成功");}else {System.out.println("发送失败");}}catch (Exception e){System.out.println("确认超时");//但是可能发送成功的情况,要做点补偿才行}}}

但是这个是一条一条的发送确认,等待确认的过程中时阻塞的,所以这种方式会阻塞客户端,效率也不会太高。

批量确认:就是在开启Confirm模式后,先发送一批消息,然后再批量确认,使用的是channel.waitForConfirmsOrDie方法,如果方法没有抛出异常,就代表消息都被服务端接受了。

java原生API的实现:

public class BatchConfirm {public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setUri("amqp://guest:guest@192.168.18.140:5672");factory.setVirtualHost("/");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.confirmSelect();String message = "batchConfirmTest-";try {//发五条消息for (int i = 0;i < 5 ;i++){channel.basicPublish("TransactionEx","TransactionMsg",false,false,null,(message+i).getBytes());}//批量确认channel.waitForConfirmsOrDie();System.out.println("发送成功");}catch (Exception e){//抛出异常说明至少一条消息发送失败System.out.println("至少一条消息发送失败");}}}

批量确认的方式比单条确认的方式效率要高,但是也有两个问题,第一个就是批量的数量的确定。对于不同的业务,到底发送多少条消息确认一次?数量太少,效率提升不上去。数量多的话,又会带来另一个问题,比如我们发 1000 条消息才确认一次,因为不确定哪条消息发送失败了,那么前面所有的消息都要重发。但是发送了也可能导致一些消息重复发送。所以该方法存在较大问题。

异步确认方式:

异步确认模式需要在添加一个ConfirmListener,并且使用一个sortset来维护一个没有被确认的消息。

java原生API实现:

public class AsyncConfirm {public static void main(String[] args) throws Exception{ConnectionFactory factory = new ConnectionFactory();factory.setUri("amqp://guest:guest@192.168.18.140:5672");factory.setVirtualHost("/");Connection connection = factory.newConnection();Channel channel = connection.createChannel();//用于保存维护未被确认的消息,要有序set 这里就是要treesetfinal SortedSet<Long> confirmSet = new TreeSet<>();channel.addConfirmListener(new ConfirmListener() {@Overridepublic void handleAck(long deliveryTag, boolean multiple) throws IOException {System.out.println(String.format("Broker已确认消息,标识:%d,多个消息:%b", deliveryTag, multiple));//处理发送成功的情况的方法// deliveryTag,发送成功的消息的标志,multiple是否批量发送的if (multiple){//如果true表示批量执行了deliveryTag这个值以前(小于deliveryTag的)的所有消息,如果为false的话表示单条确认//如果为true,表示确认了此条条以及前面的n条的消息,需要批量重Sort中删除//表示清除deliveryTag这条消息以及前面的消息,因为这个集合是维护未被确认的消息,而这些消息被确认了,就要删除掉了。confirmSet.headSet(deliveryTag + 1L).clear();//下面也可以做一些业务逻辑}else {//单条确认confirmSet.remove(deliveryTag);}System.out.println("未确认消息:" + confirmSet);}@Overridepublic void handleNack(long deliveryTag, boolean multiple) throws IOException {//这个是处理未发送成功的消息System.out.println("Broker未确认消息,标识:" + deliveryTag);if (multiple) {// headSet表示后面参数之前的所有元素,全部删除confirmSet.headSet(deliveryTag + 1L).clear();} else {confirmSet.remove(deliveryTag);}//这里可以写一些补偿逻辑}});String message = "asyncConfirmTest";channel.confirmSelect();for (int i = 0;i<10;i++){//获取下一次发送的发送Idlong nextPublishSeqNo = channel.getNextPublishSeqNo();//加到为确认队列中confirmSet.add(nextPublishSeqNo);channel.basicPublish("TransactionEx", "TransactionMsg", null, (message +"-"+ i).getBytes());}System.out.println("所有消息:"+confirmSet);}}

结果:

环节二:

上面使用异步确认是比较好的方法解决环节一的投递确认问题,现在进入环节二的问题。

环节二就是消息到了Broker交换机,可能因为路由键错误,或者队列不存在等原因导致交换机没有把消息路由到队列中的情况。

我们有两种方式处理无法路由的情况,一种是让服务端发送通知给生产者告诉生产者进行处理,另一种是让交换机路由到他的备份交换机上。

方式一:让服务端发送通知给生产者告诉生产者进行处理,使用java原生API的ReturnListener来实现。

实现:

public class ReturnListener {public static void main(String[] args) throws Exception{ConnectionFactory factory = new ConnectionFactory();factory.setUri("amqp://guest:guest@192.168.18.140:5672");factory.setVirtualHost("/");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.confirmSelect();String message = "returnListener";channel.addReturnListener(new com.rabbitmq.client.ReturnListener() {@Overridepublic void handleReturn(int replyCode,String replyText,String exchange,String routingKey,AMQP.BasicProperties properties,byte[] body) throws IOException {System.out.println("消息未能路由到队列");System.out.println("replyCode = " + replyCode);System.out.println("exchange = " + exchange );System.out.println("routingKey =" + routingKey);System.out.println("body = " + new String(body));}});//mandatory要设置为true才行,原因是//当mandatory标志位设置为true时,如果exchange根据自身类型和消息routingKey无法找到一个合适的queue存储消息,// 那么broker会调用basic.return方法将消息返还给生产者;当mandatory设置为false时,出现上述情况broker会直接将消息丢弃;// 通俗的讲,mandatory标志告诉broker代理服务器至少将消息route到一个队列中,否则就将消息return给发送者;channel.basicPublish("TransactionEx","TransactionMsg1",true,false,null,message.getBytes());}}

结果,把路由键故意修改错,导致路由不到队列,就会触发ReturnListener的回调,如果成功路由,就不会调用了。

方式二:使用交换器的备用交换器来接收为路由到的消息,此处使用的备用交换器可以使用fanout类型的,确保任何路由键的消息都能路由到队列中。

java原生API实现

public class BackExchange {public static void main(String[] args) throws Exception{ConnectionFactory factory = new ConnectionFactory();factory.setUri("amqp://guest:guest@192.168.18.140:5672");factory.setVirtualHost("/");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.confirmSelect();String message = "BackExchange";//创建备用交换器channel.exchangeDeclare("backExchange",BuiltinExchangeType.FANOUT,true,false,false,null);//创建交换器把主交换器并设置备用交换器Map<String,Object> properties = new HashMap<>();//设置备用交换器properties.put("alternate-exchange","backExchange");channel.exchangeDeclare("originalExchange",BuiltinExchangeType.DIRECT,true,false,false,properties);//创建主交换器的队列并绑定channel.queueDeclare("originalQueue",true,false,false,null);channel.queueBind("originalQueue","originalExchange","testback");//创建备用交换器的队列并绑定channel.queueDeclare("backQueue",true,false,false,null);//因为交换器的类型是fanout 所以绑定键无所谓channel.queueBind("backQueue","backExchange","xxx");channel.basicPublish("originalExchange","testback1",true,false,null,message.getBytes());}}

现象,首先输入正确的路由键确保路由成功,然后看到主队列中多了一条消息,其次输入错误的路由键确保路由失败,然后发现备用队列中对了一条消息,而主队列中没有多消息。

环节三

第三个环节是消息在队列存储,如果没有消费者的话,队列一直存在在数据库中。如果 RabbitMQ 的服务或者硬件发生故障,比如系统宕机、重启、关闭等等,可能会导致内存中的消息丢失,所以我们要把消息本身和元数据(队列、交换机、绑定)都保存到磁盘。

此处要实现 持久化交换器、持久化队列、持久化消息三部曲并使用集群保证环节三。三个持久化就不演示了,因为只是创建时的一个参数而已。集群的话专门有文章讲解。

环节四

消息投递到消费者,如果消费者收到消息后没来得及处理就发生了异常,这导致消息收到了,但是没有处理到消息,这就会导致业务数据的一致性发生问题,服务端应该以某种方式得知消费者对消息的接收情况,并决定是否重新投递这条消息给消费者。

RabbitMq提供了消费者确认机制,消费者可以自动或者手动地发送ACK确认给服务端。服务端时候ACK消息后,就会删除队列中的该条消息,然后继续推送下一条到消费者中。

如果服务端一直没有收到ACK确认,然后一直没有删除消息,然后消费者断开了,RabbitMQ会把这条消息发送给其他消费者,其他消费者,这就可能会导致消息重复消费的问题。

如果服务端一直没有收到ACK确认,然后消费者也没有断开,那么该队列就会阻塞住,直到得到确认删除消息才继续下一条消息。

消费者在订阅队列时,可以指定ACK参数,当autoAck等于false时,RabbitMQ会等待消费者显示手动地回复确认后才删除这条消息。

确认机制有自动和手动,自动的话,在消费者收到消息后就会立即给服务端发送确认,如果是手动的话,要靠我们自己控制应答的时机。

Java原生API可以通过绑定队列时设置确认模式:

//生产者正常发送public class ACKProducer {public static void main(String[] args) throws Exception{ConnectionFactory factory = new ConnectionFactory();factory.setUri("amqp://guest:guest@192.168.18.140:5672");factory.setVirtualHost("/");Connection connection = factory.newConnection();Channel channel = connection.createChannel();String msg = "test ack message ";// 声明队列(默认交换机AMQP default,Direct)// String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> argumentschannel.queueDeclare("testACKQueue", false, false, false, null);// 发送消息// String exchange, String routingKey, BasicProperties props, byte[] bodyfor (int i =0; i<5; i++){//交换器为空表示不经过交换器直接进入队列,第二个参数由路由键要改为队列名channel.basicPublish("", "testACKQueue", null, (msg+i).getBytes());}}}public class AckConsumer {public static void main(String[] args) throws Exception{ConnectionFactory factory = new ConnectionFactory();factory.setUri("amqp://guest:guest@192.168.18.140:5672");factory.setVirtualHost("/");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare("testACKQueue", false, false, false, null);System.out.println(" Waiting for message....");Consumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, //消费标志Envelope envelope, //AMQP.BasicProperties properties,byte[] body) throws IOException {String msg = new String(body, "UTF-8");System.out.println("接收到的消息为 : '" + msg + "'");long deliveryTag = envelope.getDeliveryTag();if (msg.contains("拒收")){//如果消息包含拒收字样,就拒绝收该消息//参数1:消息的发送标志//参数2:消息是否重新入队。channel.basicReject(deliveryTag,false);System.out.println("成功拒绝消息" + deliveryTag);}else if (msg.contains("异常")){//multiple 是否批量拒绝,如果是,那么这个id之前没发送确认的消息都会拒绝,,false的话值拒绝当前消息channel.basicNack(deliveryTag,true,false);System.out.println("成功拒绝消息" + deliveryTag);}else {//multiple true代表批量确认channel.basicAck(deliveryTag,true);System.out.println("成功消费消息" + deliveryTag);}}};//消费消息,autoAck设置为false。channel.basicConsume("testACKQueue",false,consumer);}}

如果消息无法处理或者消费失败,也有两种拒绝方式。Basic.Reject方法和Basic.Nack批量拒绝,如果requeue参数设置为true,会把这条队列重新存入队列,以便发给下一个消费者来消费,如果只有一个消费者(准确说就是一种消费者,就是代码都一样,只是服务集群中的多个节点,这种方式可能会出现无限循环消费的情况,因为这种消费者消费不了可能是代码逻辑问题,可以不入队而是把消息投入到另外的队列给另外的消费者消费或者打印日志、记录数据库等做个记录以做补偿)。

消费者回调

生产者最终确定消息有没有被消费者成功消费的两种方式:

消费者消费完消息后,可以调用生产者要调用的回调方法。弊端,可能会导致系统间一定程度耦合。好处,系统比较简单。消费者处理完消息后,可以发送一条消息给生产者,让生产者自己处理回调。弊端,因为又要考虑回调消息的可靠投递和重复消费,补偿等问题,会使得系统更复杂。

订单系统发送一条库存修改消息给库存系统,库存系统处理完后,订单系统要修改订单的状态,使用方式一就是库存系统远程Rpc,rest调用等方式调用订单系统相应接口来修改订单的状态,另一种方式是库存系统发送一条订单状态修改的消息给订单系统,给订单系统通过消费方式来修改订单状态。

消息补偿机制:

如果生产者的API没有被调用或者没有收到消费者的消息,或者消费者没有成功消费到生产者发送的消息时,可以通过一些补偿机制来保证消息数据的一致性。

比如设定一个定时重发机制,在某个时间内没有收到响应的消息,可以设置一个定时重发机制,但是要注意重发的次数和时间间隔,因为一个消息消费失败,立即重复发送会继续消费失败的概率是很大的,所以没必要把间隔设置的太频繁,次数也最好不要无限次,要设定有限的重发次数,别平白无故消费系统的资源,比如5秒重发一次,共重发三次,重试都失败后可以进行入库,然后进行人工的补偿,对消息事实一致性不是很高,只要最终一致性的业务场景,可以消费失败后不立即重复,而是对消费失败消息入库,然后用后台定时任务在某个空闲的时间段进行消息的重新发送。

消息幂等性

如果消息的消费者实际上是消费成功的,数据也发生了改变,但是只是在发送回调消息给生产者或者调用生产者的API时出现了问题,导致了生产者误认为消息消费失败重复消息,然后消费者再次消息,就可能导致消息的重复消费了,所以,为了避免相同消息重复处理,必须采取一定的措施,而RabbitMq服务器没有这种机制实现。

消息的重复消费有两个原因:

生产者的问题,环节1重复发送了消息,比如在开启了Confirm模式下但没有收到确认,生产者重复投递消息,到时队列中有重复的消息。环节4出现问题,由于消费者在发送回调消息给生产者或者调用生产者的API时出现了问题,或者其他原因,消息重复投递。生产者代码或者网络问题。

可以对每一条消息生成唯一的一个业务Id,通过日志或者消息落库来做消息的重复控制,比如每消费成功一条消息,就把这条消息落库,然后消费者每次消费消息时,就看库中有没有该消息的业务Id,有就是已经消费过了,就不重复消费了。如果为了提高效率还可以把消息落入类似redis之类的缓存中。

消息最终一致性

如果消费者确实宕机了,带着代码出现了问题,导致无法正常消费,在我们尝试多次重复后,消息最终都没有处理,可以做记录,日志、数据库等,然后进行人工补偿,比如可以人工修改数据库数据进行数据一致性的补偿。

消息有序性

消息的一致性讲的是消息消费的顺序跟消息生产的顺序是一致的,比如同步商户信息,生产消息的顺序是:新增门店、绑定产品、激活门店,消费的顺序也要这般,但是由于消费者消费的速度可能不一致,导致最终消息消费的速度可能不一致。

如果消息都在同一队列中,由于消费者消费的速度可能不一致,所以在队列有多个消费者的情况下是无法保证消息的顺序性的,因为假如消费者1拿到了新增门店的消息,消费者2拿到了绑定门店的消息,消费者3拿到了激活门店的消息,但是由于消费者2消费消息较快,先消费了绑定门店的消息,就会导致消息消费顺序不一致问题。只有一个队列一个消费者的情况下才能保证消息消费的有序性。

如果消息不在同一队列中,可以通过分布式锁来控制消息的消费顺序。

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。