300字范文,内容丰富有趣,生活中的好帮手!
300字范文 > #rabbitMQ #重复消费 #可靠投递 #延时投递 #rabbitMQ交换机类型#重复消费#消息积压#消息丢失

#rabbitMQ #重复消费 #可靠投递 #延时投递 #rabbitMQ交换机类型#重复消费#消息积压#消息丢失

时间:2024-03-15 20:38:28

相关推荐

#rabbitMQ #重复消费 #可靠投递 #延时投递 #rabbitMQ交换机类型#重复消费#消息积压#消息丢失

exchange类型:

1, direct

指定direct后, 消息会根据你设置的routeing key(路由键), 发送到对应的队列中

1,新建direct交换机

2,添加队列, 并且绑定路由键

3,测试发送消息, 只有emp队列可以收到

2, fanout

fanout, 广播, 不管你指定不指定路由键, 都会向他绑定的所有队列发送消息

1, 新建fannout交换机

2, 绑定路由键, 因为他是fanout的, 其实这里绑定路由键或者不绑定, 都一样

3, 测试发送消息

4, 可以看到虽然发送指定了路由键是emps, 但是所有的队列都收到了

3, Topic

1, 新建topic交换机, 绑定路由键

2, 发送消息

3, 查看接收情况, 发现全部可以接收到

5, 测试发送消息

6, 查看接收结果, 只有路由键是*.news对应的交换机可以收到

整和SpringBoot

1, 引依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

2, 开启

@EnableRabbit

3, 配置

spring:rabbitmq:host: ${version3Ip}port: 5672virtual-host: /

Java实例

测试创建MQ组件

@Slf4j@SpringBootTestclass GulimallOrderApplicationTests {@Autowiredprivate AmqpAdmin amqpAdmin;//创建基础信息@Testvoid createExchange() {//AmqpAdmin创建/*name: 交换机名字, durable:是否持久化, autoDelete:是否自动删除, args:其他参数DirectExchange(String name, boolean durable, boolean autoDelete, Map<String, Object> arguments) {*/DirectExchange directExchange = new DirectExchange("hello-exchange",true,false );amqpAdmin.declareExchange(directExchange);log.info("交换机{},创建成功","hello-exchange");}/*** 创建队列*/@Testpublic void createuqeue(){Queue queue = new Queue("hello-queue");amqpAdmin.declareQueue(queue);log.info("队列创建成功!");}/*** 将交换机和队列绑定, 指定路由键*/@Testpublic void createBinding(){/*Binding(String destination, Binding.DestinationType destinationType, String exchange, String routingKey, @Nullable Map<String, Object> arguments) {destination: 目的地destinationType:目的地类型exchange:交换机routingKey: 路由键args: 自定义参数将exchange指定的交换机和destination目的地进行绑定, 使用routingKey作为指定的路由键*/Binding binding = new Binding("hello-queue", Binding.DestinationType.QUEUE, "hello-exchange", "hello.java", null);amqpAdmin.declareBinding(binding);log.info("绑定成功!");}//发送消息@Autowiredprivate RabbitTemplate rabbitTemplate;/*** 发送消息*/@Testpublic void sendMsg(){OrderEntity orderEntity = new OrderEntity();orderEntity.setBillContent("Java发送的消息内容!");/*** 如果发送的消息是个对象, 我们会使用序列化机制, 将对象写出去,* 所以对象必须实现 Serializable,* 但是注意这样序列化之后, 在rabbitMQ管理界面,只能看到序列化后的数据, 不能直接查看数据* 如下: rO0ABXNyAC1jb20uYXRndWlndS5ndWxpbWFsbC5vcmRlci5lbnRpdHkuT3JkZXJFbnRpdHkAAAAAAAAAAQIAKkwADmF1dG9Db25maXJtRGF5dAATTGphdmEv* 因此为了让界面直接可以看到, 可以新增一个配置, 改变rabbitMQ的默认转化方式*Jackson2JsonMessageConverter**/rabbitTemplate.convertAndSend("hello-exchange","hello.java",orderEntity);log.info("消息发送成功!");}}

测试接收消息

/*** 接收消息:* 1, @RabbitListener*标在类 + 方法上* 2, @RabbitHandler*标在方法上** @RabbitListener 这个注解必须在容器中才能生效, 比如@Service, @Component* queues:声明需要监听的所有队列** 接收到的消息到时候就会在形参里* 可以写Object msg 也可以是 Message msg* 1, (Object msg)* 2, (Message msg) 这种方式是可以得到原生消息的详细信息, 消息头+ 消息体* byte[] body = msg.getBody();* //消息头属性信息* MessageProperties messageProperties = msg.getMessageProperties();* 3, (Message msg, OrderEntity content)*T<发送的消息类型 OrderEntity entity* 4,(Message msg, OrderEntity content, Channel channel)*channel:当前传输数据的通道, 而且一个客户端连接rabbitMQ, 只会创建一个通道,** Queue: 可以很多人监听, 但是消息队列中的消息只能有一个接收到, 只要有一个人接收到, 消息就会被删除, 因此只能有一个人接收到* 比如: 订单服务启动了多个(三个A,B,C),但是同一个消息只能让一个服务消费, 比如A接收到了, BC就接收不到**/@RabbitListener(queues = "hello-queue")public void recieveMessage(Message msg, OrderEntity content, Channel channel){byte[] body = msg.getBody();//消息头属性信息MessageProperties messageProperties = msg.getMessageProperties();log.info("接收到的消息是:{},类型是:{},内容是: {}",msg,msg.getClass(),content);}

重点:

1, 接收消息:

1), @RabbitListener

标在类 + 方法上

2), @RabbitHandler

标在方法上

2, @RabbitListener 这个注解必须在容器中才能生效, 比如@Service, @Component
3, Queue: 可以很多人监听, 但是消息队列中的消息只能有一个接收到, 只要有一个人接收到, 消息就会被删除, 因此只能有一个人接收到; 比如: 订单服务启动了多个(三个A,B,C),但是同一个消息只能让一个服务消费, 比如A接收到了, BC就接收不到

消息确认

先看下雷丰阳老师讲的

根据图可以看到, 可靠投递分为两个角度,

publisher(生产端):

1, comfirmCallback: 消息推送到整个broker的过程, 进行确认回调, 如果是单机的, 只要broker接收到消息就会执行回调, `如果是集群模式, 需要所有的broker接收到才会回调`但是注意这一步, 只是说broker接收到了消息, 但是不保证可以正确的投递, 因此引入出了第2步 2,returnCallback: 在broker内部, exchange交换机根据routingKey路由键, 找到Queue队列的时候, 也是可能存在消息丢失, 比如路由键找不到, 这个时候就会回调, 因此也是需要加入确认回调

注意:

这个确认回调就类似于,ajax的提交, 成功回调success, 失败回调error, 而且这个是将生产者生产消息, 推送, 交换机寻找推送队列, 等都作为一个事务去处理的, 因此很消耗效率

consumer(消费端):

保证每个消息被正确消费掉后, broker才可以删除这个消息

1, 自动签收, 改成手动签收, 或者拒绝签收,拒绝签收有设置可以让他, 重新入队, 也可以直接丢弃, 就这一个好玩的, 其他没发现

消息确认总结

1, 生产端

生产端需要确认的地方有两个点,一个是消息抵达broker的过程中,另一个是broker内部通过exchange交换机根据routingKey找到队列的过程中, 因此生产端需要确认的点也有两个, 而代码实现上,第一个对应的是ConfirmCallback回调方法, 第二个对应的是ReturnCallback回调方法

需要注意的是

1, 发送消息的时候, 可以指定消息的id(CorrelationData),这样后续consumer接收到消息,可以记录在库, 后续对账, 找出没接收到消息的id

/*** 发送指定id, 这样就可以将生产者确认接收到的消息id都记录下来,* 后续就可以遍历就可以找到没收到的消息是那些了*/@Testpublic void sendMsg2(){OrderEntity orderEntity = new OrderEntity();orderEntity.setBillContent("Java发送的消息内容!");rabbitTemplate.convertAndSend("hello-exchange","hello11.java111",orderEntity, new CorrelationData(UUID.randomUUID().toString()));log.info("消息发送成功!");}

2, 修改rabbitTemplate的配置

@AutowiredRabbitTemplate rabitTemplate;/*** 生产者的可靠消息传递* 1, 开启配置*publisher-confirms: true #开启发送端确认* 2, 修改rabbitTemplate的ConfirmCallback回调方法 --> 这个是消息到broker的过程* 3, 开启配置*publisher-returns: true # 开启发送端消息抵达队列确认 第二步, broker内交换机到队列的过程*template:* mandatory: true #只要抵达队列, 以异步方式, 优先回调returnConfirm的方法* 3, 修改rabbitTemplate的ReturnCallback回调方法 --> 这个是broker内, 交换机到队列的过程***/@PostConstruct //当前类对象创建完成后, 执行这个方法public void initTemplate(){//注意这个方法是, 只要消息抵达到了broker, ack = true, 服务器收到消息就回调rabitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {/*** @param correlationData 当前消息的唯一关联数据, CorrelationData.id , 可以直接理解为这个消息的整体的唯一id* @param ack 消息是否成功收到* @param cause 失败的原因*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {log.info("消息生产确认!!!!correlationData:{},ack:{},cause:{} ",correlationData,ack,cause);}});//设置消息抵达队列的确认回调, 就是broker内, 交换机到队列的过程rabitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {//只要消息没有投递给指定的队列, 就会触发, 这个类似于一个失败回调/*** @param message 投递失败的消息, 的详细信息* @param replyCode 回复的状态码* @param replyText 回复的文本内容* @param exchange 当时这个消息发送给那个交换机* @param routingKey 当时这个消息用的哪个路由键*/@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {log.error("交换机内部投递失败, message:{},replayCode:{},replayText:{}, routingKey:{}",message,replyCode,replyText,exchange,routingKey);}});}

消费端

没啥好说的

延时投递

实现方式一, 给交换机exchange设置过期参数, 本交换机都是30分钟过期

生产者, 推送路由键给exchange1, exchange1接收到以后, 在里边保存30分钟后, exchange1将过期30分钟的消息推送给exchange2, 然后由消费者监听exchange2, 这样, exchange2里边就都是过期了30分钟的数据了;

简单说就是:

推送正常推送, 只不过以前是推送给A, 然后消费者就监听A,但是这里是为了做延时, 就有点不一样; 推送正常推, 推送给A, 但是A需要设置一些交换机参数, 让A将消息保存三十分钟后丢弃, 丢弃的时候, 就丢弃给B, 然后B接收到的消息, 就是过了30分钟后的消息, 然后消费者, 去监听B的消息;

也就是:生产者推送消息给A, A将消息攥在手里30分钟后, 丢给B, 用户去监听消费B的消息;

实现方式二: 给推送的消息设置30分钟过期时间, 交换机设置将过期消息(死信)推送给B交换机

简单说: 类似于1, 只是说1是给交换机设置全部的过期时间, 这里是给推送的消息设置过期时间

总结:

其实两种方式, 目的都是,先将推送的消息, 都变成死信,然后将死信推送给新的交换机, 然后监听这个新的交换机就好了;

代码模拟延时队列的第二种实现

主要参考雷神讲解, 链接:

/video/BV1np4y1C7Yf?p=292&vd_source=2f702df7504ba1aeb00e78f5a3c23047

1, 配置mq

主要是创建了两个队列, 一个交换机

然后进行了两次绑定

package com.atguigu.gulimall.order.config;import com.atguigu.gulimall.order.entity.OrderEntity;import com.rabbitmq.client.Channel;import org.springframework.amqp.core.*;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import java.io.IOException;import java.util.HashMap;import java.util.Map;/*** 如果rabbitMQ里没有交换机和队列啥的, 系统运行就会自动创建, 如果MQ里已经有了, 就不会创建, 使用MQ里边的** 流程就是:* 1,* 生产者生产 路由键为: order.create.order 的消息, 根据这个消息, 找到对应的交换机 order-event-exchange* 参考orderCreateOrderBinging()方法, 这个交换机内部, 将order.create.order 和 order.delay.queue 绑定* 相当于又转发到了, order.delay.queue 这个路由键,* 通过order.delay.queue 这个路由键, 找到对应的交换机, 还是order-event-exchange 参考orderCreateOrderBinging()** 2,* 参考orderDelayQueue()方法, 上述过程已经到了order.delay.queue 这个路由键* 可以看到这个方法给这个路由键设置了, ttl等参数, 而且对应的交换机是order-event-exchange* 重要的是, 指定了另外一个路由的路由键 order.release.order** 3,* 等到1分钟一过, order.release.order 就会接收到新的消息, 也就是过了一分钟的死信消息, 参考orderReleaseOrderBinging()方法* 这个方法会给order.release.order.queue队列推送一个路由键为 order.release.order的消息,** 4,* 然后通过监听order.release.order.queue的队列, 接收到, 已经超时一分钟的死信消息了** 5,* 也就是:* Producer*生产--> 路由键为order.create.order 的消息* --> 到order-event-exchange交换机* --> 将这个消息变成时效性为1分钟的消息* -->推送给 路由键order.delay.queue的消息, delay的消息是具有时效性的, 这里相当于把没有时效的消息, 转换成了有时效的消息, 就是create到delay的过度*--> delay消息时效一过, 还是经过order-event-exchange交换机, 推送一个路由键为order.release.order的消息*--> 把order.release.order的消息推送给order.release.order.queue队列* --> 监听order.release.order.queue队列的消费者进行消费处理* 总结说就是,*生产者生create的普通消息, exchange将其变成具有时效性的delay消息*delay时效一过, 把时效性的消息变成普通消息, release消息,然后推送给order.release.order.queue队列*消费者监听release队列, 进行消费**/@Configurationpublic class MyMQConfig {/*@Bean 容器中的Binding, Queue, Exchange都会自动创建, 前提是RabbitMQ没有的情况下如果RabbitMQ里已经存在了, @Bean新设置的属性, 不会覆盖, 还是使用rabbitMQ原来有的*//*** 创建死信队列* 创建延时队列,* 其实就是配置消息什么时候变成死信, 死信之后, 交给那个交换机, 交给那个路由键* @return*/@Beanpublic Queue orderDelayQueue(){/*** 设置消息, 1分钟过期,*/Map<String,Object> args = new HashMap<>();args.put("x-dead-letter-exchange", "order-event-exchange"); //指定死信路由args.put("x-dead-letter-routing-key", "order.release.order"); //指定死信路由键args.put("x-message-ttl", 60000); // 消息过期时间 1分钟Queue queue = new Queue("order.delay.queue", true, false, false,args);return queue;}@Beanpublic Queue orderReleaseQueue(){Map<String,Object> args = new HashMap<>();Queue queue = new Queue("order.release.order.queue", true, false, false,args);return queue;}@Beanpublic Exchange orderEventExchange(){return new TopicExchange("order-event-exchange",true,false);}/*** 设置队列和交换机的绑定关系*/@Beanpublic Binding orderCreateOrderBinging(){return new Binding("order.delay.queue",Binding.DestinationType.QUEUE,"order-event-exchange","order.create.order",null);}@Beanpublic Binding orderReleaseOrderBinging(){return new Binding("order.release.order.queue",Binding.DestinationType.QUEUE,"order-event-exchange","order.release.order",null);}@RabbitListener(queues = "order.release.order.queue")public void listener(OrderEntity order, Channel channel, Message msg) throws IOException {System.out.println("收到过期的订单信息:"+order.getOrderSn()+"时间是: "+order.getModifyTime());channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false); //手动签收}}

2, 生产者发送消息

@AutowiredRabbitTemplate rabbitTemplate;@ResponseBody@GetMapping("/test/createOrder")public String createOrderTest(){//模拟订单下单成功OrderEntity order = new OrderEntity();order.setOrderSn(UUID.randomUUID().toString());order.setModifyTime(new Date());//给MQ发送消息rabbitTemplate.convertAndSend("order-event-exchange","order.create.order",order);return "ok";}

3, 消费者消费

主要就是第一步的最后一个方法

@RabbitListener(queues = "order.release.order.queue")public void listener(OrderEntity order, Channel channel, Message msg) throws IOException {System.out.println("收到过期的订单信息:"+order.getOrderSn()+"时间是: "+order.getModifyTime());channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false); //手动签收}

4, 总结

如果rabbitMQ里没有交换机和队列啥的, 系统运行就会自动创建, 如果MQ里已经有了, 就不会创建, 使用MQ里边的

流程就是:

1,生产者生产 路由键为: order.create.order 的消息, 根据这个消息, 找到对应的交换机 order-event-exchange

参考orderCreateOrderBinging()方法, 这个交换机内部, 将order.create.order 和 order.delay.queue 绑定

相当于又转发到了, order.delay.queue 这个路由键,

通过order.delay.queue 这个路由键, 找到对应的交换机, 还是order-event-exchange 参考orderCreateOrderBinging()

2,参考orderDelayQueue()方法, 上述过程已经到了order.delay.queue 这个路由键

可以看到这个方法给这个路由键设置了, ttl等参数, 而且对应的交换机是order-event-exchange

重要的是, 指定了另外一个路由的路由键 order.release.order

3, 等到1分钟一过, order.release.order 就会接收到新的消息, 也就是过了一分钟的死信消息, 参考orderReleaseOrderBinging()方法

这个方法会给order.release.order.queue队列推送一个路由键为 order.release.order的消息,

4,然后通过监听order.release.order.queue的队列, 接收到, 已经超时一分钟的死信消息了

5,

也就是:

Producer

生产--> 路由键为order.create.order 的消息

--> 到order-event-exchange交换机

--> 将这个消息变成时效性为1分钟的消息

-->推送给 路由键order.delay.queue的消息, delay的消息是具有时效性的, 这里相当于把没有时效的消息, 转换成了有时效的消息, 就是create到delay的过度

--> delay消息时效一过, 还是经过order-event-exchange交换机, 推送一个路由键为order.release.order的消息

--> 把order.release.order的消息推送给order.release.order.queue队列

--> 监听order.release.order.queue队列的消费者进行消费处理

总结说就是,

1,生产者生create的普通消息, exchange将其变成具有时效性的delay消息2,delay时效一过, 把时效性的消息变成普通消息, release消息,然后推送给order.release.order.queue队列3,消费者监听release队列, 进行消费

想要保证消息的可靠投递, 以及延时投递, 都得结合任务场景去做, 而且不能单方面的使用延时投递, 就保证万无一失了; 因为有时候由于网络, 宕机, 导致你延时投递的消息, 和解锁的消息, 并不是顺序执行, 比如解锁的消息先执行了, 这条消息已经消费了, 结果你延时投递的消息才刚到, 这样子, 这笔消息就再也没办法解锁了

生产者消息确保发送成功

生产者消息丢失

1, 业务代码做好try-catch补偿机制, 将消息的发送记录在日志, 失败的消息, 定期重新发送给MQ

// try-catch保证消息必然发送出去try{rabbitTemplate.convertAndSend("order-event-exchange","order.release.other",orderTo);}catch (Exception e){//TODO 设法将消息成功发出去给MQ/*可以将失败的消息记录在日志, 然后定时扫描, 重新发送消息给MQ*/}

2, 生产者消息做好手动签收, 防止即使消息发送给MQ了, 已经抵达到了Broker, 但是Broker还没完成就宕机了3, 保证broker实例的监控, 也就是集群的健康

另外, 消费者防止消息丢失, 也是类似, 一个就是手动ACK, 二就是kafka记录偏移量

消息重复

重复消费

重复原因:

1, 业务消费消息成功了, ack的时候, 结果MQ宕机, 或者业务系统宕机, 导致MQ以为你没消费呢, 然后又给了别人, 让别人去消费了2, 业务消费失败了, 由于重试机制, 自动又将消息发送给别人了

解决方式:

1, 设置幂等性2, 使用防重表, 将消费的每一个消息都有一个唯一标识, 记录在表, 已经消费的就不进行第二次消费3,rabbitMQ的每一个消息都有redelivered字段(), 可以获取到是否是被重新投递过来的

//redelivered 可以判断出, 当前消息是否被第二次及以后重新派发过来的Boolean redelivered = message.getMessageProperties().getRedelivered();

消息积压

解决方式:

1, 上线更多的消费者, 去消费2, 设置专门的消费者服务, 将消息先批量取出来, 记录数据库, 离线慢慢去处理

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。