300字范文,内容丰富有趣,生活中的好帮手!
300字范文 > rocketmq 消息 自定义_跟我学RocketMQ[1-4]之消息消费及支持spring

rocketmq 消息 自定义_跟我学RocketMQ[1-4]之消息消费及支持spring

时间:2020-02-20 03:02:47

相关推荐

rocketmq 消息 自定义_跟我学RocketMQ[1-4]之消息消费及支持spring

博客地址:朝·闻·道​

本文我将继续讲解如何使用DefaultMQPushConsumer对RocketMQ中的消息进行消费,同时在文章的第二部分将继续带领读者朋友对DefaultMQPushConsumer进行薄封装,让我们在Spring中更容易对消息进行消费。DefaultMQPushConsumer不区分普通消息和事务消息,即我们能够利用DefaultMQPushConsumer实现对普通消息和事务消息的消费。

通过DefaultMQProducer消费消息

首先,声明一个DefaultMQPushConsumer客户端,并通过构造器初始化,构造参数为消费者组。官方建议消费者组以“CID_”开头。

DefaultMQPushConsumer consumer =

new DefaultMQPushConsumer("CID_SNOWALKER");

设置NameServer地址

defaultMQPushConsumer.setNamesrvAddr("127.0.0.1:9876");

设置Consumer第一次启动从队列头部开始消费

defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

设置消费模式为集群方式,CLUSTERING模式下每条消息只会被一个Consumer消费一次,如果设置为BROADCASTING则为广播模式,每个消费者都会将消息消费至少一次。一般我们使用的均为CLUSTERING模式。

defaultMQPushConsumer.setMessageModel(MessageModel.CLUSTERING);

注册消息监听器,这里需要实现MessageListenerConcurrently接口,并实现consumeMessage(List msgs, ConsumeConcurrentlyContext context) 方法,我这里的demo是lambda形式,实际上是一样的。如果你不喜欢lambda形式,可以继续使用匿名内部类或者自行定义一个类实现该接口。

defaultMQPushConsumer.registerMessageListener(

(msgs, context) -> {

for (MessageExt msg : msgs) {

String realMessage = new String(msg.getBody());

LOGGER.info("当前消费线程名={}, 消息id={}, 收到消息为={}",

Thread.currentThread().getName(),

msg.getMsgId(),

realMessage);

}

return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

}

)

这里注意,当消费逻辑执行成功,则返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS,后续将不再对该消息进行消费。如果消费逻辑失败,则需要设置为ConsumeConcurrentlyStatus.RECONSUME_LATER, RocketMQ会对消息进行重新推送,默认推送16次,目的是尽量保证消息消费成功,如果达到最大重试次数,还是失败则进入死信队列,等待人工干预。

调用start()方法,启动对队列的监听,开始进行消息的消费。

defaultMQPushConsumer.start();

我们尝试运行一下,这里我已经有了对应的消费者,可以看下运行的日志:

-01-23 09:55:25.022 INFO 18784 --- [ublicExecutor_8] c.s.shield.job.publisher.DemoPublisher :

消息id=AC1E5356496018B4AAC2736D06CF0002, 发送结果=SEND_OK

-01-23 09:55:27.519 INFO 18784 --- [MessageThread_8] c.s.shield.job.consumer.DemoConsumer :

当前消费线程名=ConsumeMessageThread_8, 消息id=AC1E5356496018B4AAC2736D06CF0002, 收到消息为={"msgName":"rocketmq-simple-msg-test","topicName":"SNOWALKER_TEST","tagName":"SNOWALKER_TEST-TAG","clusterName":"localhost.localdomain","taskName":"测试消息简单发送------第0次","threadSize":"10","threadName":"simple-msg-test-0"}

可以看到broker推送消息至消费端,并且被成功消费。

Spring框架整合DefaultMQPushConsumer

我们仍然基于Spring Boot v1.5.3.RELEASE, Spring v4.3.8.RELEASE 对DefaultMQPushConsumer进行整合,相关代码已经上传至github

这里对核心代码进行讲解。

首先定义RocketMQPushConsumerAgent.java并将其声明为spring的bean,作用域为prototype,即多例形式。

@Scope("prototype")

@Component

public class RocketMQPushConsumerAgent {

声明消息监听器及消息消费者

private MessageListenerConcurrently messageListener;

private DefaultMQPushConsumer defaultMQPushConsumer;

init()方法为核心的初始化逻辑,在该方法中,初始化了DefaultMQPushConsumer,并设置NameServer地址、消费模式以及将外部实现的监听器设置给内部的messageListener引用。

接着对消息主题进行订阅,对该主题下所有的消息进行监听,这里有待优化,后续将把消息的过滤表达式也暴露给调用者。

所有的配置参数均通过RocketMQConsumerConfig进行设置,保证接口的整洁性,RocketMQConsumerConfig将在附录中进行简单讲解。

public RocketMQPushConsumerAgent init(RocketMQConsumerConfig consumerConfig, MessageListenerConcurrently messageListener) throws MQClientException {

defaultMQPushConsumer = new DefaultMQPushConsumer(consumerConfig.getConsumerGroup());

defaultMQPushConsumer.setNamesrvAddr(consumerConfig.getNameSrvAddr());

defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

// 消费模式

if (consumerConfig.getMessageModel() != null) {

defaultMQPushConsumer.setMessageModel(consumerConfig.getMessageModel());

}

// 注册监听器

this.messageListener = messageListener;

defaultMQPushConsumer.registerMessageListener(this.messageListener);

defaultMQPushConsumer.subscribe(consumerConfig.getTopic(), "*");

LOGGER.debug("com.shield.job.message.rocketmq.RocketMQConsumerAgent消费者客户端组装完成");

return this;

}

独立的启动方法

public void start() throws MQClientException {

this.defaultMQPushConsumer.start();

}

独立的关闭方法

public void destroy() {

defaultMQPushConsumer.shutdown();

LOGGER.debug("com.shield.job.message.rocketmq.RocketMQConsumerAgent消费者客户端[已关闭]");

}

为方便外部对消费者进行进一步的自定义设置,提供外部获取defaultMQPushConsumer的接口。

public DefaultMQPushConsumer getConsumer() {

return defaultMQPushConsumer;

}

RocketMQPushConsumerAgent使用案例

仍然依据开头的示例进行改造。

@Component

public class DemoConsumer {

private static final Logger LOGGER = LoggerFactory.getLogger(DemoConsumer.class);

使用@Resource(name = “rocketMQPushConsumerAgent”)或者直接@Autowired将自定义的消息消费者注入。

@Resource(name = "rocketMQPushConsumerAgent")

RocketMQPushConsumerAgent rocketMQConsumerAgent;

调用方需要实现一个返回值为void的方法,并标记为@PostConstruct,在该方法中进行rocketMQConsumerAgent的初始化。当spring在加载过程中,DemoConsumer初始化之前会调用该init()方法初始化rocketMQConsumerAgent。通过start()链式调用,启动消息消费者,内部是调用的defaultMQPushConsumer.start()方法。

@PostConstruct

void init() {

try {

rocketMQConsumerAgent.init(

new RocketMQConsumerConfig(

"snowalker-consumer-group",

"172.30.83.100:9876",

"SNOWALKER_TEST",

MessageModel.CLUSTERING),

(msgs, context) -> {

for (MessageExt msg : msgs) {

String realMessage = new String(msg.getBody());

LOGGER.info("当前消费线程名={}, 消息id={}, 收到消息为={}",

Thread.currentThread().getName(),

msg.getMsgId(),

realMessage);

}

return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

}

).start();

LOGGER.info("DemoConsumer 初始化RocketMQ简单消息消费者完成");

} catch (MQClientException e) {

e.printStackTrace();

LOGGER.info("DemoConsumer 初始化RocketMQ简单消息消费者失败");

}

}

}

在init()方法中同时将消息监听器的实现逻辑注入,消费者会加载该接口的实现。

附录:RocketMQConsumerConfig配置类

public class RocketMQConsumerConfig {

/**消费者组*/

private String consumerGroup;

/**nameServer地址*/

private String nameSrvAddr;

/**消息消费主题*/

private String topic;

private MessageModel messageModel;

public RocketMQConsumerConfig(String consumerGroup, String nameSrvAddr, String topic) {

Preconditions.checkNotNull(consumerGroup);

Preconditions.checkNotNull(nameSrvAddr);

Preconditions.checkNotNull(topic);

this.consumerGroup = consumerGroup;

this.nameSrvAddr = nameSrvAddr;

this.topic = topic;

}

public RocketMQConsumerConfig(String consumerGroup, String nameSrvAddr, String topic, MessageModel messageModel) {

Preconditions.checkNotNull(consumerGroup);

Preconditions.checkNotNull(nameSrvAddr);

Preconditions.checkNotNull(topic);

Preconditions.checkNotNull(messageModel);

this.consumerGroup = consumerGroup;

this.nameSrvAddr = nameSrvAddr;

this.topic = topic;

this.messageModel = messageModel;

}

public String getConsumerGroup() {

return consumerGroup;

}

public String getNameSrvAddr() {

return nameSrvAddr;

}

public String getTopic() {

return topic;

}

public MessageModel getMessageModel() {

return messageModel;

}

}

该配置类封装了消费者客户端初始化的必填参数,目的是收拢初始化参数,从而使初始化接口更加简洁,符合开闭原则。

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。