300字范文,内容丰富有趣,生活中的好帮手!
300字范文 > rocketmq发送顺序消息(四)

rocketmq发送顺序消息(四)

时间:2018-10-05 01:54:49

相关推荐

rocketmq发送顺序消息(四)

rocketmq怎么发送消息可参考我的上一篇博客:rocketmq发送第一条消息。此处我们讲解如何发送rocketmq顺序消息

producer

public class ProducerOrder {public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("testGrp");// 设置nameserver地址 nameserver具备路由功能(发现服务,有点注册中心的意思),让其分配合理的broker来进行消息发送producer.setNamesrvAddr("192.168.52.11:9876");try {producer.start();} catch (MQClientException e) {e.printStackTrace();}for (int i = 0; i < 20; i++) {Message message = new Message("monkeyOrderMsgTopic", ("这是顺序消息:" + i).getBytes());producer.send(message,// 自定义选择Queuenew MessageQueueSelector() {/**** @param list 当前topic里所有的queue* @param message 要发送的消息* @param o 对应到 send() 里的 args参数* @return*/@Overridepublic MessageQueue select(List<MessageQueue> list, Message message, Object o) {// 根据传入的参数决定QueueMessageQueue messageQueue = list.get((Integer)o);return messageQueue;}}, 0, 3000);}System.out.println("发送完成");}}

consumer

public class ConsumerOrder {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("testConsumerOrder");consumer.setNamesrvAddr("192.168.52.11:9876");consumer.subscribe("monkeyOrderMsgTopic","*");consumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {for (MessageExt messageExt : list) {System.out.println(new String(messageExt.getBody()) + "current Thread:" + Thread.currentThread().getName());}return ConsumeOrderlyStatus.SUCCESS;}});consumer.start();System.out.println("consumer start .....");}}

测试结果:

总结:

你们应该如何保证消息的顺序?

同一topic

同一个QUEUE

发消息的时候一个线程去发送消息

消费的时候 一个线程 消费一个queue里的消息或者使用MessageListenerOrderly

多个queue 只能保证单个queue里的顺序

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。