300字范文,内容丰富有趣,生活中的好帮手!
300字范文 > rocketmq消息积压监控java代码实现

rocketmq消息积压监控java代码实现

时间:2022-10-09 05:50:41

相关推荐

rocketmq消息积压监控java代码实现

最近在做彩信下发,需要下发的内容是以消息的形式存放在rocektMQ,遇上彩信消息未下发的情况,需要实时去查各topic的消息积压量

1、启动时装配监控客户端的bean

@Componentpublic class MQAdminExtConfig {private static final Logger log = LoggerFactory.getLogger(MQAdminExtConfig.class);@Value("${rocketmq.name-server}")private String nameServer;public static DefaultMQAdminExt defaultMQAdminExt;/*** 启动监控客户端*/@PostConstructpublic void initMqAdminExtConfig(){//初始化一个生产者,用于初始化参数log.info("init rocketMQ monitoer client,nameServer:{}....",nameServer);try {DefaultMQProducer producer = new DefaultMQProducer("GRP_P_MSG_PRIORITY_HIGH_BeiJing_8000");producer.setNamesrvAddr(nameServer);producer.start();} catch (MQClientException e) {e.printStackTrace();}try {defaultMQAdminExt = new DefaultMQAdminExt();defaultMQAdminExt.setNamesrvAddr(nameServer);defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));defaultMQAdminExt.start();} catch (Exception e) {e.printStackTrace();}}}

/*** * @param consumerGroup 消费者组* @param topic topic* @return当前topic的积压量*/private static long getBackLogMsg(String consumerGroup,String topic){long diff=0;log.info("BacklogMonitorUtil--getBackLogMsg param:consumerGroup:{},topic:{} ",consumerGroup,topic);try {ConsumeStats consumeStats = MQAdminExtConfig.defaultMQAdminExt.examineConsumeStats(consumerGroup);List<MessageQueue> mqList = new LinkedList();mqList.addAll(consumeStats.getOffsetTable().keySet());Collections.sort(mqList);for(MessageQueue queue :mqList){if(topic.equals(queue.getTopic())){OffsetWrapper offsetWrapper = (OffsetWrapper)consumeStats.getOffsetTable().get(queue);log.info("getBrokerOffset----------------{}",offsetWrapper.getBrokerOffset());log.info("getConsumerOffset-----------------{}",offsetWrapper.getConsumerOffset());diff = offsetWrapper.getBrokerOffset() - offsetWrapper.getConsumerOffset();}}} catch (Exception e) {//当消费者未消费时此除会报错diff=0;log.error("get offset error -----------------{}",e);}return diff;}

这里本来想探究一下为什么当消费者不消费时会报错,先把错误贴出来

上面报的是一个topic路由找不到的错误,且topic是%RETRY%开头的,但是通过查看源码发现defaultMQAdminExt.examineConsumeStats的实现类,查询的topic直接就是

关于%RETRY%开头的topic

consumer 消费失败,会把消息重新发往 %RETRY% + consumerGroup,这个 retry 消息会在一定时间后,真实送到 retry topic。

但是这里为什么会直接去查 %RETRY% + consumerGroup,没有搞明白,后续再继续跟踪~,有知道的老哥可以在评论区写下答案,感谢

修改

真实使用过程中还是出现了一些问题,即某些情况下

diff = offsetWrapper.getBrokerOffset() - offsetWrapper.getConsumerOffset();

这个diff会返回0,这里直接修改成

diff=puteTotalDiff();

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。