300字范文,内容丰富有趣,生活中的好帮手!
300字范文 > rocketmq 消费方式_RocketMQ事务消费和顺序消费详解

rocketmq 消费方式_RocketMQ事务消费和顺序消费详解

时间:2023-03-27 16:21:48

相关推荐

rocketmq 消费方式_RocketMQ事务消费和顺序消费详解

一、RocketMq有3中消息类型

1.普通消费

2. 顺序消费

3.事务消费

顺序消费场景

在网购的时候,我们需要下单,那么下单需要假如有三个顺序,第一、创建订单 ,第二:订单付款,第三:订单完成。也就是这个三个环节要有顺序,这个订单才有意义。RocketMQ可以保证顺序消费。

rocketMq实现顺序消费的原理

produce在发送消息的时候,把消息发到同一个队列(queue)中,消费者注册消息监听器为MessageListenerOrderly,这样就可以保证消费端只有一个线程去消费消息

注意:是把把消息发到同一个队列(queue),不是同一个topic,默认情况下一个topic包括4个queue

单个节点(Producer端1个、Consumer端1个)

1、Producer.java

packageorder;importjava.util.List;importcom.alibaba.rocketmq.client.exception.MQBrokerException;importcom.alibaba.rocketmq.client.exception.MQClientException;importcom.alibaba.rocketmq.client.producer.DefaultMQProducer;importcom.alibaba.rocketmq.client.producer.MessageQueueSelector;importcom.alibaba.rocketmq.client.producer.SendResult;importcom.mon.message.Message;importcom.mon.message.MessageQueue;importcom.alibaba.rocketmq.remoting.exception.RemotingException;/*** Producer,发送顺序消息*/

public classProducer {public static voidmain(String[] args) {try{

DefaultMQProducer producer= new DefaultMQProducer("order_Producer");

producer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");

producer.start();//String[] tags = new String[] { "TagA", "TagB", "TagC", "TagD",//"TagE" };

for (int i = 1; i <= 5; i++) {

Message msg= new Message("TopicOrderTest", "order_1", "KEY" + i, ("order_1 " +i).getBytes());

SendResult sendResult= producer.send(msg, newMessageQueueSelector() {public MessageQueue select(Listmqs, Message msg, Object arg) {

Integer id=(Integer) arg;int index = id %mqs.size();returnmqs.get(index);

}

},0);

System.out.println(sendResult);

}

producer.shutdown();

}catch(MQClientException e) {

e.printStackTrace();

}catch(RemotingException e) {

e.printStackTrace();

}catch(MQBrokerException e) {

e.printStackTrace();

}catch(InterruptedException e) {

e.printStackTrace();

}

}

}

2、Consumer.java

packageorder;importjava.util.List;importjava.util.concurrent.TimeUnit;importjava.util.concurrent.atomic.AtomicLong;importcom.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;importcom.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyContext;importcom.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;importcom.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly;importcom.alibaba.rocketmq.client.exception.MQClientException;importcom.mon.consumer.ConsumeFromWhere;importcom.mon.message.MessageExt;/*** 顺序消息消费,带事务方式(应用可控制Offset什么时候提交)*/

public classConsumer1 {public static void main(String[] args) throwsMQClientException {

DefaultMQPushConsumer consumer= new DefaultMQPushConsumer("order_Consumer");

consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");/*** 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费

* 如果非第一次启动,那么按照上次消费的位置继续消费*/consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

consumer.subscribe("TopicOrderTest", "*");

consumer.registerMessageListener(newMessageListenerOrderly() {

AtomicLong consumeTimes= new AtomicLong(0);public ConsumeOrderlyStatus consumeMessage(Listmsgs, ConsumeOrderlyContext context) {//设置自动提交

context.setAutoCommit(true);for(MessageExt msg : msgs) {

System.out.println(msg+ ",内容:" + newString(msg.getBody()));

}try{

TimeUnit.SECONDS.sleep(5L);

}catch(InterruptedException e) {

e.printStackTrace();

}

;returnConsumeOrderlyStatus.SUCCESS;

}

});

consumer.start();

System.out.println("Consumer1 Started.");

}

}

结果如下图所示:

这个五条数据被顺序消费了

多个节点(Producer端1个、Consumer端2个)

Producer.java

packageorder;importjava.util.List;importcom.alibaba.rocketmq.client.exception.MQBrokerException;importcom.alibaba.rocketmq.client.exception.MQClientException;importcom.alibaba.rocketmq.client.producer.DefaultMQProducer;importcom.alibaba.rocketmq.client.producer.MessageQueueSelector;importcom.alibaba.rocketmq.client.producer.SendResult;importcom.mon.message.Message;importcom.mon.message.MessageQueue;importcom.alibaba.rocketmq.remoting.exception.RemotingException;/*** Producer,发送顺序消息*/

public classProducer {public static voidmain(String[] args) {try{

DefaultMQProducer producer= new DefaultMQProducer("order_Producer");

producer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");

producer.start();//String[] tags = new String[] { "TagA", "TagB", "TagC", "TagD",//"TagE" };

for (int i = 1; i <= 5; i++) {

Message msg= new Message("TopicOrderTest", "order_1", "KEY" + i, ("order_1 " +i).getBytes());

SendResult sendResult= producer.send(msg, newMessageQueueSelector() {public MessageQueue select(Listmqs, Message msg, Object arg) {

Integer id=(Integer) arg;int index = id %mqs.size();returnmqs.get(index);

}

},0);

System.out.println(sendResult);

}for (int i = 1; i <= 5; i++) {

Message msg= new Message("TopicOrderTest", "order_2", "KEY" + i, ("order_2 " +i).getBytes());

SendResult sendResult= producer.send(msg, newMessageQueueSelector() {public MessageQueue select(Listmqs, Message msg, Object arg) {

Integer id=(Integer) arg;int index = id %mqs.size();returnmqs.get(index);

}

},1);

System.out.println(sendResult);

}for (int i = 1; i <= 5; i++) {

Message msg= new Message("TopicOrderTest", "order_3", "KEY" + i, ("order_3 " +i).getBytes());

SendResult sendResult= producer.send(msg, newMessageQueueSelector() {public MessageQueue select(Listmqs, Message msg, Object arg) {

Integer id=(Integer) arg;int index = id %mqs.size();returnmqs.get(index);

}

},2);

System.out.println(sendResult);

}

producer.shutdown();

}catch(MQClientException e) {

e.printStackTrace();

}catch(RemotingException e) {

e.printStackTrace();

}catch(MQBrokerException e) {

e.printStackTrace();

}catch(InterruptedException e) {

e.printStackTrace();

}

}

}

Consumer1.java

/*** 顺序消息消费,带事务方式(应用可控制Offset什么时候提交)*/

public classConsumer1 {public static void main(String[] args) throwsMQClientException {

DefaultMQPushConsumer consumer= new DefaultMQPushConsumer("order_Consumer");

consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");/*** 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费

* 如果非第一次启动,那么按照上次消费的位置继续消费*/consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

consumer.subscribe("TopicOrderTest", "*");/*** 实现了MessageListenerOrderly表示一个队列只会被一个线程取到

*,第二个线程无法访问这个队列*/consumer.registerMessageListener(newMessageListenerOrderly() {

AtomicLong consumeTimes= new AtomicLong(0);public ConsumeOrderlyStatus consumeMessage(Listmsgs, ConsumeOrderlyContext context) {//设置自动提交

context.setAutoCommit(true);for(MessageExt msg : msgs) {

System.out.println(msg+ ",内容:" + newString(msg.getBody()));

}try{

TimeUnit.SECONDS.sleep(5L);

}catch(InterruptedException e) {

e.printStackTrace();

}

;returnConsumeOrderlyStatus.SUCCESS;

}

});

consumer.start();

System.out.println("Consumer1 Started.");

}

}

Consumer2.java

/*** 顺序消息消费,带事务方式(应用可控制Offset什么时候提交)*/

public classConsumer2 {public static void main(String[] args) throwsMQClientException {

DefaultMQPushConsumer consumer= new DefaultMQPushConsumer("order_Consumer");

consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");/*** 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费

* 如果非第一次启动,那么按照上次消费的位置继续消费*/consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

consumer.subscribe("TopicOrderTest", "*");/*** 实现了MessageListenerOrderly表示一个队列只会被一个线程取到

*,第二个线程无法访问这个队列*/consumer.registerMessageListener(newMessageListenerOrderly() {

AtomicLong consumeTimes= new AtomicLong(0);public ConsumeOrderlyStatus consumeMessage(Listmsgs, ConsumeOrderlyContext context) {//设置自动提交

context.setAutoCommit(true);for(MessageExt msg : msgs) {

System.out.println(msg+ ",内容:" + newString(msg.getBody()));

}try{

TimeUnit.SECONDS.sleep(5L);

}catch(InterruptedException e) {

e.printStackTrace();

}

;returnConsumeOrderlyStatus.SUCCESS;

}

});

consumer.start();

System.out.println("Consumer2 Started.");

}

}

先启动Consumer1和Consumer2,然后启动Producer,Producer会发送15条消息

Consumer1消费情况如图,都按照顺序执行了

Consumer2消费情况如图,都按照顺序执行了

二、事务消费

这里说的主要是分布式事物。下面的例子的数据库分别安装在不同的节点上。

事物消费需要先说说什么是事务。比如说:我们跨行转账,从工商银行转到建设银行,也就是我从工商银行扣除1000元之后,我的建设银行也必须加1000元。这样才能保证数据的一致性。假如工商银行转1000元之后,建设银行的服务器突然宕机,那么我扣除了1000,但是并没有在建设银行给我加1000,就出现了数据的不一致。因此加1000和减1000才行,减1000和减1000必须一起成功,一起失败。

再比如,我们进行网购的时候,我们下单之后,订单提交成功,仓库商品的数量必须减一。但是订单可能是一个数据库,仓库数量可能又是在另个数据库里面。有可能订单提交成功之后,仓库数量服务器突然宕机。这样也出现了数据不一致的问题。

使用消息队列来解决分布式事物:

现在我们去外面饭店吃饭,很多时候都不会直接给了钱之后直接在付款的窗口递饭菜,而是付款之后他会给你一张小票,你拿着这个小票去出饭的窗口取饭。这里和我们的系统类似,提高了吞吐量。即使你到第二个窗口,师傅告诉你已经没饭了,你可以拿着这个凭证去退款,即使中途由于出了意外你无法到达窗口进行取饭,但是只要凭证还在,可以将钱退给你。这样就保证了数据的一致性。

如何保证凭证(消息)有2种方法:

1、在工商银行扣款的时候,余额表扣除1000,同时记录日志,而且这2个表是在同一个数据库实例中,可以使用本地事物解决。然后我们通知建设银行需要加1000给该用户,建设银行收到之后给我返回已经加了1000给用户的确认信息之后,我再标记日志表里面的日志为已经完成。

2、通过消息中间件

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。