300字范文,内容丰富有趣,生活中的好帮手!
300字范文 > RocketMQ快速入门之消息过滤器(用户自定义属性)

RocketMQ快速入门之消息过滤器(用户自定义属性)

时间:2022-02-03 18:53:25

相关推荐

RocketMQ快速入门之消息过滤器(用户自定义属性)

默认配置下,不支持自定义属性,需要设置开启:

#加入到broker的配置文件中enablePropertyFilter=true

package cn.learn.rocketmq.filter;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;import org.apache.rocketmq.client.consumer.MessageSelector;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;import org.mon.message.MessageExt;import java.io.UnsupportedEncodingException;import java.util.List;public class ConsumerFilter {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("learn-consumer");consumer.setNamesrvAddr("localhost:9876");// 订阅消息,接收的是所有消息// consumer.subscribe("my-topic", "*");consumer.subscribe("my-topic-filter", MessageSelector.bySql("sex='女' AND age>=18"));consumer.registerMessageListener(new MessageListenerConcurrently() {public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {try {for (MessageExt msg : msgs) {System.out.println("消息:" + new String(msg.getBody(), "UTF-8"));}} catch (UnsupportedEncodingException e) {e.printStackTrace();}System.out.println("接收到消息 -> " + msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 启动消费者consumer.start();}}

package cn.learn.rocketmq.filter;import org.apache.rocketmq.client.producer.DefaultMQProducer;import org.apache.rocketmq.client.producer.SendResult;import org.mon.message.Message;public class SyncProducer {public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("learn");producer.setNamesrvAddr("localhost:9876");producer.start();//发送消息String msg = "这是一个用户的消息, id = 1001";Message message = new Message("my-topic-filter", "delete", msg.getBytes("UTF-8"));message.putUserProperty("sex","女");message.putUserProperty("age","18");SendResult sendResult = producer.send(message);System.out.println("消息id:" + sendResult.getMsgId());System.out.println("消息队列:" + sendResult.getMessageQueue());System.out.println("消息offset值:" + sendResult.getQueueOffset());System.out.println(sendResult);producer.shutdown();}}

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。