rocketmq怎么发送消息可参考我的上一篇博客:rocketmq发送第一条消息。此处我们讲解如何发送rocketmq顺序消息
producer
public class ProducerOrder {public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("testGrp");// 设置nameserver地址 nameserver具备路由功能(发现服务,有点注册中心的意思),让其分配合理的broker来进行消息发送producer.setNamesrvAddr("192.168.52.11:9876");try {producer.start();} catch (MQClientException e) {e.printStackTrace();}for (int i = 0; i < 20; i++) {Message message = new Message("monkeyOrderMsgTopic", ("这是顺序消息:" + i).getBytes());producer.send(message,// 自定义选择Queuenew MessageQueueSelector() {/**** @param list 当前topic里所有的queue* @param message 要发送的消息* @param o 对应到 send() 里的 args参数* @return*/@Overridepublic MessageQueue select(List<MessageQueue> list, Message message, Object o) {// 根据传入的参数决定QueueMessageQueue messageQueue = list.get((Integer)o);return messageQueue;}}, 0, 3000);}System.out.println("发送完成");}}
consumer
public class ConsumerOrder {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("testConsumerOrder");consumer.setNamesrvAddr("192.168.52.11:9876");consumer.subscribe("monkeyOrderMsgTopic","*");consumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {for (MessageExt messageExt : list) {System.out.println(new String(messageExt.getBody()) + "current Thread:" + Thread.currentThread().getName());}return ConsumeOrderlyStatus.SUCCESS;}});consumer.start();System.out.println("consumer start .....");}}
测试结果:
总结:
你们应该如何保证消息的顺序?
同一topic
同一个QUEUE
发消息的时候一个线程去发送消息
消费的时候 一个线程 消费一个queue里的消息或者使用MessageListenerOrderly
多个queue 只能保证单个queue里的顺序