300字范文,内容丰富有趣,生活中的好帮手!
300字范文 > RabbitMQ 死信队列 DLX配置 延迟队列实现

RabbitMQ 死信队列 DLX配置 延迟队列实现

时间:2020-02-02 10:51:54

相关推荐

RabbitMQ 死信队列 DLX配置 延迟队列实现

先明白几个概念:

死信

消息如何变成死信:

1.消息被拒绝(Basic.Reject/Basic.Nack),并且设置requeue参数为false;

2.消息过期;

3.队列达到最大长度。

死信邮箱DLX

当消息在一个队列中变成死信之后,它可以被重新被发送到另一个交换器中,这个交换器就是DLX ,绑定DLX 的队列就称之为死信队列。

死信队列:

1.当某个队列(非死信队列)中存在死信时

2.RabbitMQ 就会自动地将这个消息重新发布到设置的DLX上去

3.进而被路由到另一个队列,即死信队列

死信队列的运行图;消息最后会被绑定在死信队列的消费者消费;生产者发送一条消息,然后经过普通交换器存储到消息队列中。没有消费者去消费,导致消息过期。设置了DLX后,消息被丢给死信邮箱中,最后被路由到跟死信邮箱绑定的死信队列中

死信队列的实现:

创建一个死信路由器DLX.创建时可以为这个DLX 指定路由键,则死信邮箱内的死信路由键则更改。如果没有特殊指定,则使用原队列的路由键;创建一个死信队列,绑定死信邮箱.在声明消息队列时候方,设置x-dead-letter-exchange 参数,为这个队列添加死信队列

实现:

public class Send {//队列名private final static String QUEUE_NAME = "queue";//死信队列名private final static String DLX_QUEUE_NAME = "dlx_queue";//路由器名private final static String EXCHANGE_NAME = "exchange";//死信路由器名private final static String DLX_EXCHANGE_NAME = "dlx_exchange";//绑定键private final static String BINDING_KEY = "exchange";//路由键private final static String ROUTING_KEY = "exchange";private static Connection connection =null;private static Channel channel = null;public static void main(String[] args) {Map<String, Object> ttlQueue = new HashMap<String, Object>(4);ttlQueue.put( "x-message-ttl" , 6000);ttlQueue.put("x-dead-letter-exchange","dlx_exchange");ttlQueue.put("x-dead-letter-routing-key", "dlx-routing-key");try{// 获取到连接以及mq通道connection = ConnectionUtil.getConnection();// 从连接中创建通道channel = connection.createChannel();//声明了一个direct 类型的交换器channel.exchangeDeclare(EXCHANGE_NAME,"direct",true,false,null);//声明一个死信邮箱channel.exchangeDeclare("dlx_exchange", "direct");// 声明队列QUEUE_NAMEchannel.queueDeclare(QUEUE_NAME, false, false, false, ttlQueue);//将路由与队列绑定,再为绑定的路径赋值一个绑定键channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,BINDING_KEY);//声明死信队列channel.queueDeclare(DLX_QUEUE_NAME, false, false, false,null);//将路由与队列绑定,再为绑定的路径赋值一个绑定键channel.queueBind(DLX_QUEUE_NAME,DLX_EXCHANGE_NAME,"dlx-routing-key");//发送数据for (int i=0;i<10;i++){// 消息内容String message = "Hello World!"+i;AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();// 设置TTL=6000msbuilder.expiration("5000");AMQP.BasicProperties properties = builder.build();//指定发送消息到哪个路由,以及他的路由键,消息等channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, true,properties, message.getBytes());System.out.println(" [x] Sent '" + message + "'");Thread.sleep(2000);}}catch (Exception e){e.printStackTrace();}finally {//关闭通道和连接try {channel.close();connection.close();} catch (IOException e) {e.printStackTrace();}}}}

延迟队列:

延迟队列存储延迟消息,"延迟消息"指当消息被发送以后,并不想让消费者立刻拿到消息而是等待特定时间后,消费者才能消费这个消息

实现:

RabbitMQ本身没有延迟队列,可以通过DLX和TTL模拟延迟队列

运作流程:

死信队列的用法,也就是延迟队列的用法。假设需要10秒的延迟,生产者通过路由器将发送的消息存储在消息队列中。消费者订阅是死信队列。当消息从消息队列中过期之后被DLX路由器存入死信队列中这样消费者就恰巧消费到了延迟10 秒的这条息。

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。