300字范文,内容丰富有趣,生活中的好帮手!
300字范文 > RabbitMq(十一) 死信交换机DLX介绍及使用

RabbitMq(十一) 死信交换机DLX介绍及使用

时间:2023-10-13 02:07:47

相关推荐

RabbitMq(十一) 死信交换机DLX介绍及使用

概述:

死信交换机(DLX dead-letter-exchange)和普通交换机一样,也是一种普通的交换机,只不过一般交换机处理正常的消息,而死信交换机是接收被删除的消息。绑定到死信交换机的队列成为死信队列。死信队列由我们创建指定,而非有系统默认的死信队列。被删除的消息一般分为两类:1,时间上过期后被队列删除的消息;2,在指定了固定大小的队列中,由于消息数量超过队列指定的大小,最先进入消息队列的消息被顶出来的消息。下来我们通过一张图来了一个消息从发送到进入死信队列的过程

图中包含了一个普通的交换机my_normal_exchange,两个指定了不同处理死信消息规则的消息队列 my_ttl_dlx_queue和my_max_dlx_queue。以及一个死信交换机my_dlx_exchange,以及一个死信队列my_dlx_queue。

场景1:当有一个指定路由key为"my_ttl_dlx"的消息发送到普通交换机后,由普通交换机转发到my_ttl_dlx_queue队列上,当该条消息在该队列指定的时间内没有被消费则进入到死信交换机my_dlx_exchange中,并且根据路由key "my_ttl_dlx" 转发到对应的死信队列“my_dlx_queue”上。

场景2:当有一个指定路由key为"my_max_dlx"的消息发送到普通交换机后,由普通交换机转发到my_max_dlx_queue队列上,在后续仍有消息被转发进来,当数量达到队列设置的最大消息数量时,该队列中未被消费的且最先进入队列的消息则被删除进入到死信交换机my_dlx_exchange中,并且根据路由key "my_ttl_dlx" 转发到对应的死信队列“my_dlx_queue”上。

代码实现步骤:

1,使用SpringBoot工程pom引入web、ampq的starter依赖。web工程主要用来调用发送消息。

<?xml version="1.0" encoding="UTF-8"?><project xmlns="/POM/4.0.0"xmlns:xsi="/2001/XMLSchema-instance"xsi:schemaLocation="/POM/4.0.0 /xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.xiaohui.mqproducer</groupId><artifactId>MqProducer</artifactId><version>1.0-SNAPSHOT</version><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.1.4.RELEASE</version></parent><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency></dependencies></project>

2,在application.properties文件中配置web以及rabbitmq的配置信息

#端口server.port=8888#rabbitMq配置spring.rabbitmq.host=172.18.255.54spring.rabbitmq.port=5672spring.rabbitmq.virtual-host=/myhostspring.rabbitmq.username=xiaohuispring.rabbitmq.password=root

3,创建rabbitmq xml文件并完成如上图中消息队列以及交换机声明,及对应关系绑定。其中在消息队列中使用“x-message-ttl”、“x-max-length”分别指定消息有效时长和队列最大存在消息数量。resource/spring/rabbitmq.xml配置如下:

<?xml version="1.0" encoding="UTF-8"?><beans xmlns="/schema/beans"xmlns:xsi="/2001/XMLSchema-instance"xmlns:rabbit="/schema/rabbit"xsi:schemaLocation="/schema/beans /schema/beans/spring-beans.xsd/schema/rabbit /schema/rabbit/spring-rabbit.xsd"><!-- 定义死信消息接收队列 ,没有则自动创建--><rabbit:queue id="my_dlx_queue" name="my_dlx_queue" auto-declare="true" /><!-- 定义死信交换机 没有则自动创建--><rabbit:direct-exchange id="my_dlx_exchange" name="my_dlx_exchange" auto-declare="true"><!-- 绑定消息队列,并指定routingKey,将不同类型进来的消息都转发到 my_dlx_queue死信消息队列--><rabbit:bindings><rabbit:binding key="my_ttl_dlx" queue="my_dlx_queue" /><rabbit:binding key="my_max_dlx" queue="my_dlx_queue" /></rabbit:bindings></rabbit:direct-exchange><!-- 定义过期队列及其属性,不存在则自动创建--><rabbit:queue id="my_ttl_dlx_queue" name="my_ttl_dlx_queue" auto-declare="true" ><rabbit:queue-arguments><!-- 投递该消息的队列 ,中消息如果在6秒之内没有被消费则进行删除 --><entry key="x-message-ttl" value-type="long" value="6000" /><!-- 设置消息过期后投递到那个死信队列 --><entry key="x-dead-letter-exchange" value="my_dlx_exchange" /></rabbit:queue-arguments></rabbit:queue><!-- 定义一个最大消息数量的消息队列 --><rabbit:queue id="my_max_dlx_queue" name="my_max_dlx_queue" auto-declare="true" ><rabbit:queue-arguments><!-- 设置该队列最多2条消息,如果超过,最早的消息将被删除投递到私信队列 --><entry key="x-max-length" value-type="long" value="2" /><!-- 设置消息过期后投递到那个死信队列 --><entry key="x-dead-letter-exchange" value="my_dlx_exchange" /></rabbit:queue-arguments></rabbit:queue><!-- 定义一个正常的交换机,并绑定两个不同死信处理规则的消息队列及其路由key --><rabbit:direct-exchange id="my_normal_exchange" name="my_normal_exchange" auto-declare="true"><rabbit:bindings><rabbit:binding key="my_ttl_dlx" queue="my_ttl_dlx_queue"/><rabbit:binding key="my_max_dlx" queue="my_max_dlx_queue"/></rabbit:bindings></rabbit:direct-exchange></beans>

4,创建SpringBoot 主启动类ProducerApplication.java。并使用@ImportResource 引入我们第三步创建的rabbitmq.xml

package com.xiaohui.rabbitmq;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;import org.springframework.context.annotation.ImportResource;/*** 生产者启动类*/@SpringBootApplication@ImportResource("classpath:spring/rabbitmq.xml")public class ProducerApplication {public static void main(String[] args) {SpringApplication.run(ProducerApplication.class,args);}}

5,编写web模块controller,在Controller中我们注入RabbitTemplate 对象,通过该对象我们可以进行发送消息到交换机中。

package com.xiaohui.rabbitmq.controller;import com.xiaohui.rabbitmq.config.RabbitMqConfig;import org.springframework.amqp.core.Message;import org.springframework.amqp.core.MessageProperties;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.RequestParam;import org.springframework.web.bind.annotation.RestController;@RestControllerpublic class MsgController {@AutowiredRabbitTemplate rabbitTemplate;/*** 发送测试验证死信队列* @param msg* @return*/@GetMapping("/sendDlxMsg")public String sendDLXMsg(@RequestParam String msg){rabbitTemplate.convertAndSend("my_normal_exchange","my_ttl_dlx",msg);return "消息发送成功!";}@GetMapping("/sendMaxMsg")public String sendMaxMsg(@RequestParam String msg){rabbitTemplate.convertAndSend("my_normal_exchange","my_max_dlx",msg);return "消息发送成功!";}}

6,启动SpringBoot工程,通过url访问消息发送地址,进行消息发送测试。

6.1测试过期消息删除

登录控制台我们可以可以观察到当我们访问链接发送消息后,消息先出现在my_ttl_dlx_queue队列中,时隔6秒之后出现在my_dlx_queue队列中,我们在死信队列中查到该消息详细信息可以看到该消息的删除过程信息,那个交换机队列进行的删除以及删除原因。

6.2 测试最大消息数量队列

我们发送三次不同消息内容“1:测试最大消息数量,第1条”,“2:测试最大消息数量,第2条”,:3:测试最大消息数量,第3条”。

控制台中我们可以看到 my_max_dlx_queue中只存活了两条消息信息。而死信队列my_dlx_queue中又多了一条,根据我们的预期结果,最先发送的消息“1:测试最大消息数量,第1条”将会进入到死信队列我们打开死信队列查看,结果验证我们的结论正确。

工程目录文件如下(config目录无用):

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。