300字范文,内容丰富有趣,生活中的好帮手!
300字范文 > SpringBoot整合RabbitMQ 消息可靠投递 手动ack 延迟队列 死信队列 消息幂等性保障 消息积压

SpringBoot整合RabbitMQ 消息可靠投递 手动ack 延迟队列 死信队列 消息幂等性保障 消息积压

时间:2018-09-28 15:10:54

相关推荐

SpringBoot整合RabbitMQ 消息可靠投递 手动ack 延迟队列 死信队列 消息幂等性保障 消息积压

1、消息可靠投递

在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式。

confirm 确认模式return 退回模式

rabbitmq 整个消息投递的路径为:

producer—>rabbitmq broker—>exchange—>queue—>consumer

消息从 producer 到 exchange 则会返回一个 confirmCallback 。

消息从 exchange–>queue 投递失败则会返回一个 returnCallback 。

将利用这两个 callback 控制消息的可靠性投递

因SpringBoot 整合RabbitMQ 当队列或交换机不存在是,自动创建,所以可靠性检测的一般是服务是否宕机。与消费者是否接收/确认消息无无关

1.1、SpringBoot整合

生产端

yaml

spring:rabbitmq:host: 192.168.0.134port: 5672username: adminpassword: adminvirtual-host: /admin# 开启publisher-confirm 有以下可选值# simple:同步等待confirm结果,直到超时# correlated:异步回调,定义ConfirmCallback。mq返回结果时会回调这个ConfirmCallback# NONE:默认不开启publisher-confirm-type: correlated# 开启publish-return功能。可以定义ReturnCallback# true:调用ReturnCallback# false:直接丢弃消息publisher-returns: true

自定义Callback类

/*** 消息推送确认机制配置文件* @author codinganhour*/@Componentpublic class PublisherConfirmAndReturnConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {@AutowiredRabbitTemplate rabbitTemplate;/*** 初始化方法*/@PostConstructpublic void initMethod() {rabbitTemplate.setConfirmCallback(this);rabbitTemplate.setReturnsCallback(this);}@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String s) {Integer receivedDelay = null;if(null != correlationData){correlationData.getReturned().getMessage().getMessageProperties().getReceivedDelay();}if (receivedDelay != null && receivedDelay > 0) {// 是一个延迟消息,忽略这个错误提示return;}if (ack) {System.out.println("消息已经送达Exchange,ack已发");} else {System.out.println("消息没有送达Exchange");}}@Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {System.out.println("消息没有送到队列中");}}

2、手动ACK确认机制

在RabbitMQ中指代的是消费者收到消息后确认的一种行为,关注点在于消费者能否实际接收到MQ发送的消息。

自动Ack时,消费者接收消息后立即ack,然后慢慢处理,重启消费者会丢失消息。手动Ack时,消费者接收消息后,消息状态为 Unacked,如果消费的时候没有手动ack,则mq中的消息总量Total不会减少。

RabbitMQ默认的消息确认机制是:自动确认的

队列分配消息给监听消费者时,该消息处于未确认状态,不会被删除;当接收到消费者的确认回复才会将消息移除。

其提供了三种确认方式:

自动确认acknowledge=“none”:当消费者接收到消息的时候,就会自动给到RabbitMQ一个回执,告诉MQ我已经收到消息了,不在乎消费者接收到消息之后业务处理的成功与否。

手动确认acknowledge=“manual”:当消费者收到消息后,不会立刻告诉RabbitMQ已经收到消息了,而是等待业务处理成功后,通过调用代码的方式手动向MQ确认消息已经收到。当业务处理失败,就可以做一些重试机制,甚至让MQ重新向消费者发送消息都是可以的。

根据异常情况确认acknowledge=“auto”:该方式是通过抛出异常的类型,来做响应的处理(如重发、确认等)。这种方式比较麻烦。

1.1、SpringBoot 整合RabbitMQ ACK

消费端

manual方式

yaml配置文件

spring:rabbitmq:host: 192.168.0.134port: 5672username: adminpassword: adminvirtual-host: /adminlistener:# 容器类型simple或direct 简单理解为一对一;direct理解为一对多个消费者simple:# ACK模式(none,auto,manual,默认为auto)acknowledge-mode: manual# 开启重试retry:# 是否开启重试机制enabled: true

消费者

/*** @author*/@Slf4j@Componentpublic class DirectManualListener {/*** 消息最大重试次数*/private static final int MAX_RETRIES = 3;/*** 重试间隔(秒)*/private static final long RETRY_INTERVAL = 5;/*** 手动进入死信队列* RabbitListener中的参数用于表示监听的是哪一个队列*/@RabbitListener(queues = DirectManualConfig.DIRECT_QUEUE)public void manualListenerQueue(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws Exception {// 重试次数int retryCount = 0;boolean success = false;// 消费失败并且重试次数<=重试上限次数while (!success && retryCount < MAX_RETRIES) {retryCount++;// 具体业务逻辑System.out.println("处理业务逻辑");// 如果失败则重试if (!success) {String errorTip = "第" + retryCount + "次消费失败" +((retryCount < 3) ? "," + RETRY_INTERVAL + "s后重试" : ",进入死信队列");log.error(errorTip);Thread.sleep(RETRY_INTERVAL * 1000);}}if (success) {// 消费成功,确认channel.basicAck(deliveryTag, false);log.info("创建订单数据消费成功");} else {// requeue:false 手动拒绝,进入抛弃或进入死信队列channel.basicNack(deliveryTag, false, false);log.info("创建订单数据消费失败");}}}

auto方式

yaml配置文件

spring:rabbitmq:host: 192.168.0.134port: 5672username: adminpassword: adminvirtual-host: /adminlistener:simple:# ACK模式(none,auto,manual,默认为auto)acknowledge-mode: auto# 开启重试retry:# 是否开启重试机制enabled: true# 最大重试次数,默认3max-attempts: 5# 重试间隔(ms) 默认1秒initial-interval: 500# 重试因子,默认是1。本次推送时间间隔 = 上一次间隔时间 * multipliermultiplier: 2# 最大间隔时间(ms),默认10秒maxInterval: 20000

消费者

@Slf4j@Componentpublic class DirectAutoListener {/*** auto手动抛出异常方式进入死信队列,yaml中max-attempts,initial-interval生效* RabbitListener中的参数用于表示监听的是哪一个队列*/@RabbitListener(queues = DirectManualConfig.DIRECT_QUEUE)public void autoListenerQueue(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws Exception {log.info("消息信息:"+message+";消息deliveryTag="+deliveryTag);Thread.sleep(1000);if(deliveryTag != 8){throw new RuntimeException("操作异常");}else{log.info("消息Ack deliveryTag="+deliveryTag);channel.basicAck(deliveryTag, false);}}}

3、延迟队列

延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。

场景:

下单后,30分钟未支付,取消订单,回滚库存。

新用户注册成功7天后,发送短信问候。

实现方式:

定时器

缺点:触发时,会扫描数据库,难以精确定位触发时间,数据量大时数据库承受压力过大;

延迟队列(TTL+死信队列组合实现延迟队列的效果)

精确触发,触发时只查询单一数据即可

延迟队列

/*** 延迟队列* @author*/@Slf4j@Configurationpublic class DirectTtlConfig {/*** direct路由模式-交换机*/public static final String DIRECT_EXCHANGE = "direct_ttl_exchange";/*** direct路由模式-队列*/public static final String DIRECT_QUEUE = "direct_ttl_queue";/*** direct路由模式-路由键*/public static final String DIRECT_ROUTING = "direct.ttl.routing";/*** direct路由模式-死信交换机*/public static final String DIRECT_DLX_EXCHANGE = "direct_ttl_dlx_exchange";/*** direct路由模式-死信队列*/public static final String DIRECT_DLX_QUEUE = "direct_ttl_dlx_queue";/*** direct路由模式-路由键*/public static final String DIRECT_DLX_ROUTING = "direct.ttl.dlx.routing";/*** 1、声明交换机* direct路由模式,默认持久化,非自动删除* @return*/@Bean(DIRECT_EXCHANGE)public Exchange directTtlExchange(){return ExchangeBuilder.directExchange(DIRECT_EXCHANGE).build();}/*** 2、声明队列* direct路由模式* @return*/@Bean(DIRECT_QUEUE)public Queue directTtlQueue(){// ttl:延迟队列时间,超时为消费则进入死信队列中// deadLetterExchange:绑定死信交换机// deadLetterRoutingKey:绑定死信路由return QueueBuilder.durable(DIRECT_QUEUE).ttl(1000).deadLetterExchange(DIRECT_DLX_EXCHANGE).deadLetterRoutingKey(DIRECT_DLX_ROUTING).build();}/*** 3、队列与交换机进行绑定* direct路由模式* @param queue @Qualifier 将 value 对应的bean 注入到参数中* @param exchange @Qualifier 将 value 对应的bean 注入到参数中* @return*/@Beanpublic Binding directTtlQueueExchange(@Qualifier(DIRECT_QUEUE) Queue queue, @Qualifier(DIRECT_EXCHANGE) Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with(DIRECT_ROUTING).noargs();}/*** 1、声明死信交换机* direct路由模式,默认持久化,非自动删除* @return*/@Bean(DIRECT_DLX_EXCHANGE)public Exchange directDlxExchange(){return ExchangeBuilder.directExchange(DIRECT_DLX_EXCHANGE).build();}/*** 2、声明死信队列* direct路由模式* @return*/@Bean(DIRECT_DLX_QUEUE)public Queue directDlxQueue(){return QueueBuilder.durable(DIRECT_DLX_QUEUE).build();}/*** 3、死信队列与死信交换机进行绑定* direct路由模式* @param queue @Qualifier 将 value 对应的bean 注入到参数中* @param exchange @Qualifier 将 value 对应的bean 注入到参数中* @return*/@Beanpublic Binding directDlxQueueExchange(@Qualifier(DIRECT_DLX_QUEUE) Queue queue, @Qualifier(DIRECT_DLX_EXCHANGE) Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with(DIRECT_DLX_ROUTING).noargs();}}

消费者只需要监听死信队列中消息即可

4、死信队列

死信队列,英文缩写:DLX 。Dead Letter Exchange(死信交换机),当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX。

死信的三种情况:

队列消息长度到达限制;

消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false;(手动ack(auto,manual)都可以触发)

原队列存在消息过期设置,消息到达超时时间未被消费;

死信队列与延期队列实现方式一致,只是会监听2个消费者,正常队列采用ack(auto,manual)触发是否进入死信队列

QueueBuilder.durable(DIRECT_QUEUE).maxLength():队列中等待消费的数量大于maxLength的数量就会进入死信队列

5、消息幂等性保障

幂等性指一次和多次请求某一个资源,对于资源本身应该具有同样的结果。也就是说,其任意多次执行对资源本身所产生的影响均与一次执行的影响相同。

在MQ中指,消费多条相同的消息,得到与消费该消息一次相同的结果。

处理方式

传递消息唯一值记录数据库中或者redis中,消费时判断,防止重复消费

更新数据库时可以采用乐观锁方式,关键字段值发生变化则不消费

6、消息积压

消费者宕机积压消费者消费能力不足积压发送者发流量太大

解决方案:上线更多的消费者,进行正常消费上线专门的队列消费服务,将消息先批量取出来,记录数据库,再慢慢处理

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。