300字范文,内容丰富有趣,生活中的好帮手!
300字范文 > springboot 演示 rabbitmq dlx 死信队列案例

springboot 演示 rabbitmq dlx 死信队列案例

时间:2021-11-02 09:39:16

相关推荐

springboot 演示 rabbitmq dlx 死信队列案例

springboot 演示 rabbitmq dlx 死信队列案例

说明: DLX,Dead Letter Exchange(死信交换机), 当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX。 消息成为死信的三种情况:

队列消息长度到达限制, 超过队列的最大容纳量原队列存在消息过期设置,消息到达超时时间未被消费消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false

1. 编写Rabbitmq 配置类

说明:

声明正常的队列(DLX_QUEUE_NORMAL)和交换机(DLX_EXCHANGE_NORMAL)声明死信队列(DLX_QUEUE)和死信交换机(DLX_EXCHANGE)正常队列绑定死信交换机,两个参数: x-dead-letter-exchange: 死信交换机名称 x-dead-letter-routing-key: 正常队列和死信交换机之间的路由

import org.springframework.amqp.core.*;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import java.util.HashMap;import java.util.Map;@Configurationpublic class RabbitMQConfig {/*** 测试 DLX 死信队列* 1. 声明正常的队列(DLX_QUEUE_NORMAL)和交换机(DLX_EXCHANGE_NORMAL)* 2. 声明死信队列(DLX_QUEUE)和死信交换机(DLX_EXCHANGE)* 3. 正常队列绑定死信交换机,两个参数:*x-dead-letter-exchange: 死信交换机名称*x-dead-letter-routing-key: 正常队列和死信交换机之间的路由* @return*/public static final String DLX_QUEUE_NORMAL = "springboot_dlx_queue_normal";public static final String DLX_EXCHANGE_NORMAL = "springboot_dlx_exchange_normal";public static final String DLX_QUEUE = "springboot_dlx_queue";public static final String DLX_EXCHANGE = "springboot_dlx_exchange";// 1. 定义交换机@Bean("DlxExchangeNormal")public Exchange dlxExchangeNormal(){return ExchangeBuilder.topicExchange(DLX_EXCHANGE_NORMAL).durable(true).build();}@Bean("DlxExchange")public Exchange dlxExchange(){return ExchangeBuilder.topicExchange(DLX_EXCHANGE).durable(true).build();}// 2. 定义队列@Bean("DlxQueue")public Queue dlxQueue(){return QueueBuilder.durable(DLX_QUEUE).build();}@Bean("DlxQueueNormal")public Queue dlxQueueNormal(){// 正常队列需要绑定死信交换机Map<String, Object> arguments = new HashMap<>();arguments.put("x-dead-letter-exchange",DLX_EXCHANGE); // 设置队列绑定的死信交换机名称arguments.put("x-dead-letter-routing-key","dlx.test"); // 设置队列与死信交换机的路由arguments.put("x-message-ttl",10000); // 设置队列过期时间arguments.put("x-max-length",10); // 设置队列最大长度, 最多容纳10条return QueueBuilder.durable(DLX_QUEUE_NORMAL).withArguments(arguments).build();}// 3. 绑定交换机与队列@Beanpublic Binding dlxBindQueueNormalAndExchange(@Qualifier("DlxQueueNormal") Queue dlxQueueNormal,@Qualifier("DlxExchangeNormal") Exchange dlxExchangeNormal){return BindingBuilder.bind(dlxQueueNormal).to(dlxExchangeNormal).with("dlx.normal.#").noargs();}@Beanpublic Binding dlxBindQueueAndExchange(@Qualifier("DlxQueue") Queue dlxQueue,@Qualifier("DlxExchange") Exchange dlxExchange){return BindingBuilder.bind(dlxQueue).to(dlxExchange).with("dlx.#").noargs();}}

2. 编写测试类, 发送消息

import com.itheima.rabbitmqConfig.RabbitMQConfig;import org.junit.Test;import org.junit.runner.RunWith;import org.springframework.amqp.AmqpException;import org.springframework.amqp.core.Message;import org.springframework.amqp.core.MessagePostProcessor;import org.springframework.amqp.rabbit.connection.CorrelationData;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.test.context.SpringBootTest;import org.springframework.test.context.junit4.SpringRunner;@SpringBootTest@RunWith(SpringRunner.class)public class ProducerTest {@Autowiredprivate RabbitTemplate rabbitTemplate;/*** 测试死信队列, 消息成为死信的三种情况:* -- 1. 队列消息长度到达限制* -- 2. 原队列存在消息过期设置,消息到达超时时间未被消费* -- 3. 消费者拒接消费消息,并且不重回队列*/@Testpublic void dlxTest(){// 测试队列消息过期rabbitTemplate.convertAndSend(RabbitMQConfig.DLX_EXCHANGE_NORMAL,"dlx.normal.TEST","it is a normal message,but it will change a dlx !");// 队列消息长度到达限制,该队列目前最多只能存储10条信息for (int i = 0; i < 20; i++) {rabbitTemplate.convertAndSend(RabbitMQConfig.DLX_EXCHANGE_NORMAL,"dlx.normal.TEST","it is a normal message,but it will change a dlx !");}}}

3. 模拟消费者拒绝接收消息, 让消息变成死信消息

说明: channel.basicNack(deliveryTag,true,false); requeue 必须设置为false

import com.rabbitmq.client.Channel;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;import org.ponent;/*** DLX(Dead Letter Exchange) 死信队列* 消息成为死信的三种情况:* -- 队列消息长度到达限制* -- 原队列存在消息过期设置,消息到达超时时间未被消费* -- 消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false*/@Componentpublic class ListenerConsumerDlx implements ChannelAwareMessageListener {@Override@RabbitListener(queues = "springboot_dlx_queue_normal")public void onMessage(Message message, Channel channel) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {System.out.println(new String(message.getBody()));System.out.println("逻辑业务处理完成...");int i = 3/0; //出错/*** 手动确认签收 basicAck(long deliveryTag, boolean multiple)* deliveryTag 标签* multiple接收多条信息*/channel.basicAck(deliveryTag,true);}catch (Exception e){/*** 签收失败 basicNack(long deliveryTag, boolean multiple, boolean requeue)* deliveryTag 标签* multiple接收多条信息* requeuetrue,消息重回Queue,broker重新发送该消息给消费者; false: 消息变成死信消息,存储到死信交换机*/System.out.println("it is dlx message!");channel.basicNack(deliveryTag,true,false);}}}

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。