300字范文,内容丰富有趣,生活中的好帮手!
300字范文 > producer send源码_RocketMq系列之Producer顺序消息发送源码分析(四)

producer send源码_RocketMq系列之Producer顺序消息发送源码分析(四)

时间:2022-10-29 04:59:47

相关推荐

producer send源码_RocketMq系列之Producer顺序消息发送源码分析(四)

有序消息

消息有序指的是可以按照消息的发送顺序来消费。

RocketMQ可以严格的保证消息有序。但这个顺序,不是全局顺序,只是分区(queue)顺序。

顺序消息生产者

public static void main(String[] args) throws UnsupportedEncodingException {try {DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");producer.setNamesrvAddr("localhost:9876");producer.start();String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};for (int i = 0; i < 100; i++) {int orderId = i % 10;Message msg =new Message("TopicTest2", tags[i % tags.length], "KEY" + i,("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));// 发送消息时,需要实现MessageQueueSelector , 用来选择合适的queueSendResult sendResult = producer.send(msg, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {Integer id = (Integer) arg;// int index = id % mqs.size();return mqs.get(index);}}, orderId);System.out.printf("%s%n", sendResult);}producer.shutdown();} catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {e.printStackTrace();}}

上面实现的顺序消息时,通过orderId来进行顺序消息,同一个订单ID的消息,发送到同一个Queue里面

顺序消息消费者

public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");// 设置NameServer地址consumer.setNamesrvAddr("localhost:9876");consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.subscribe("TopicTest1", "TagA || TagC || TagD");consumer.registerMessageListener(new MessageListenerOrderly() {AtomicLong consumeTimes = new AtomicLong(0);@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {context.setAutoCommit(true);System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);return ConsumeOrderlyStatus.SUCCESS;}});consumer.start();System.out.printf("Consumer Started.%n");}

需要注意,registerMessageListener注册的消息监听器 , 需要使用MessageListenerOrderly,ConsumeOrderlyContext, 不可以使用

MessageListenerConcurrently , ConsumeConcurrentlyContext , 否则消费的顺序无法保证。

源码分析

/*** @param msg 消息* @param selector 消息队列选择器* @param arg 分片值 (类似分库分表里面的分片键)*/@Overridepublic SendResult send(Message msg, MessageQueueSelector selector, Object arg)throws MQClientException, RemotingException, MQBrokerException, InterruptedException {msg.setTopic(withNamespace(msg.getTopic()));return this.defaultMQProducerImpl.send(msg, selector, arg);}

实际发送

public SendResult send(Message msg, MessageQueueSelector selector, Object arg, long timeout)throws MQClientException, RemotingException, MQBrokerException, InterruptedException {return this.sendSelectImpl(msg, selector, arg, CommunicationMode.SYNC, null, timeout);}private SendResult sendSelectImpl(Message msg,MessageQueueSelector selector,Object arg,final CommunicationMode communicationMode,final SendCallback sendCallback, final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {long beginStartTime = System.currentTimeMillis();this.makeSureStateOK();Validators.checkMessage(msg, this.defaultMQProducer);// 1. 获取topic信息,TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());if (topicPublishInfo != null && topicPublishInfo.ok()) {MessageQueue mq = null;try {// 2. 获取当前topic的内部队列信息List<MessageQueue> messageQueueList = mQClientFactory.getMQAdminImpl().parsePublishMessageQueues(topicPublishInfo.getMessageQueueList());// 复制一个消息Message userMessage = MessageAccessor.cloneMessage(msg);// topic信息String userTopic = NamespaceUtil.withoutNamespace(userMessage.getTopic(), mQClientFactory.getClientConfig().getNamespace());userMessage.setTopic(userTopic);//3. 获取消息队列mq = mQClientFactory.getClientConfig().queueWithNamespace(selector.select(messageQueueList, userMessage, arg));} catch (Throwable e) {throw new MQClientException("select message queue throwed exception.", e);}long costTime = System.currentTimeMillis() - beginStartTime;if (timeout < costTime) {throw new RemotingTooMuchRequestException("sendSelectImpl call timeout");}if (mq != null) {// 获取到队列了,执行发送消息, 跟普通消息的发送一样的return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout - costTime);} else {throw new MQClientException("select message queue return null.", null);}}throw new MQClientException("No route info for this topic, " + msg.getTopic(), null);}

步骤说明:

获取当前topic的信息,内部包含消息队列获取topic内部的队列信息获取消息队列,这个其实就是顺序消息实现的核心点selector.select(messageQueueList, userMessage, arg),通过自定义的消息队列选择器,返回相应的队列。 内部完全自定义获取到了消息队列之后,执行发送消息,跟普通消息一样,这里就没有什么重试的说法了。

总结: 顺序消息的核心就是将你希望按照顺序的消息,通过某种特定的条件,计算发送到对应的队列里面去。

顺序消息的缺点:

送顺序消息无法利用集群的Failover特性,因为不能更换broker,MessageQueue进行重试存在队列热点问题,当一个场景下消息非常多的情况,会导致个别队列非常繁忙消费失败时无法跳过, 会导致消费停止消息的并行度依赖于对列数量,不过可以增加队列数量,动态调整

思考: 通过上面那种顺序消息的模式,在broker发生宕机 , 队列数量发生变化时,会造成消费乱序

比如在多master集群的情况下 ,

topic: TP_TEST 总共8个队列MASTER-1 : 1,2,3,4MASTER-2 : 5,6,7,8

一个topic分别在多个master上面有队列, 如果其中一个master宕机了,那么队列数会变成4个,那么顺序消息通过 orderId % queueSize 的这种方式,会造成原来往一个队列里面发送的,会发送到另外一个队列里面去,造成消费乱序。

所以如果是要严格的顺序消息,则不要使用rocketMq, 在极端情况下会造成消费乱序。

/r/eC-YwJDE7s2RrdSj93pq (二维码自动识别)

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。