最近在做彩信下发,需要下发的内容是以消息的形式存放在rocektMQ,遇上彩信消息未下发的情况,需要实时去查各topic的消息积压量
1、启动时装配监控客户端的bean
@Componentpublic class MQAdminExtConfig {private static final Logger log = LoggerFactory.getLogger(MQAdminExtConfig.class);@Value("${rocketmq.name-server}")private String nameServer;public static DefaultMQAdminExt defaultMQAdminExt;/*** 启动监控客户端*/@PostConstructpublic void initMqAdminExtConfig(){//初始化一个生产者,用于初始化参数log.info("init rocketMQ monitoer client,nameServer:{}....",nameServer);try {DefaultMQProducer producer = new DefaultMQProducer("GRP_P_MSG_PRIORITY_HIGH_BeiJing_8000");producer.setNamesrvAddr(nameServer);producer.start();} catch (MQClientException e) {e.printStackTrace();}try {defaultMQAdminExt = new DefaultMQAdminExt();defaultMQAdminExt.setNamesrvAddr(nameServer);defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));defaultMQAdminExt.start();} catch (Exception e) {e.printStackTrace();}}}
/*** * @param consumerGroup 消费者组* @param topic topic* @return当前topic的积压量*/private static long getBackLogMsg(String consumerGroup,String topic){long diff=0;log.info("BacklogMonitorUtil--getBackLogMsg param:consumerGroup:{},topic:{} ",consumerGroup,topic);try {ConsumeStats consumeStats = MQAdminExtConfig.defaultMQAdminExt.examineConsumeStats(consumerGroup);List<MessageQueue> mqList = new LinkedList();mqList.addAll(consumeStats.getOffsetTable().keySet());Collections.sort(mqList);for(MessageQueue queue :mqList){if(topic.equals(queue.getTopic())){OffsetWrapper offsetWrapper = (OffsetWrapper)consumeStats.getOffsetTable().get(queue);log.info("getBrokerOffset----------------{}",offsetWrapper.getBrokerOffset());log.info("getConsumerOffset-----------------{}",offsetWrapper.getConsumerOffset());diff = offsetWrapper.getBrokerOffset() - offsetWrapper.getConsumerOffset();}}} catch (Exception e) {//当消费者未消费时此除会报错diff=0;log.error("get offset error -----------------{}",e);}return diff;}
这里本来想探究一下为什么当消费者不消费时会报错,先把错误贴出来
上面报的是一个topic路由找不到的错误,且topic是%RETRY%开头的,但是通过查看源码发现defaultMQAdminExt.examineConsumeStats的实现类,查询的topic直接就是
关于%RETRY%开头的topic,
consumer 消费失败,会把消息重新发往 %RETRY% + consumerGroup,这个 retry 消息会在一定时间后,真实送到 retry topic。
但是这里为什么会直接去查 %RETRY% + consumerGroup,没有搞明白,后续再继续跟踪~,有知道的老哥可以在评论区写下答案,感谢
修改
真实使用过程中还是出现了一些问题,即某些情况下
diff = offsetWrapper.getBrokerOffset() - offsetWrapper.getConsumerOffset();
这个diff会返回0,这里直接修改成
diff=puteTotalDiff();