文章目录
普通消息消息发送同步发送消息异步发送消息单向发送消息代码示例导入RocketMQ的依赖定义同步消息的发送者定义异步消息的发送者定义消费者普通消息
消息发送
同步发送消息
同步发送消息是指:Producer发出一条消息后,会在收到MQ返回的ACK后才发送下一条消息。该方式的消息可靠性最高,但是消息发送效率太低。
异步发送消息
异步发送消息是指:Producer发出消息后无须等待MQ返回ACK,直接发送下一条消息。该方式有一定可靠性,发送效率相对同步较高
单向发送消息
单向发送消息是指:Producer仅负责发送消息,不等待、不处理MQ的ACK,该发送方式MQ也不返回ACK。该方式发送效率最高,但可靠性最低。
代码示例
导入RocketMQ的依赖
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId></dependency>
定义同步消息的发送者
public class SynProducer {public void producer() throws MQClientException, MQBrokerException, RemotingException, InterruptedException {//创建一个Producer,参数为ProducerGroup名称DefaultMQProducer producer = new DefaultMQProducer("testGroup");//指定nameserver地址producer.setNamesrvAddr("Rocemq:9876");//设置失败重试次数producer.setRetryTimesWhenSendFailed(3);//设置发送超时时间为5秒producer.setSendMsgTimeout(5000);//开启生产者producer.start();//生产并发送100条消息for (int i = 0;i<100;i++){byte[] body = ("hello"+i).getBytes();//注意不要倒错包,org.mon.message.Message;Message msg = new Message("Topic","Tag",body);msg.setKeys("key:"+i);SendResult sendResult = producer.send(msg);System.out.println(sendResult);}producer.shutdown();}}
发送的状态:
SEND_OK
:发送成功FLUSH_DISK_TIMEOUT
:刷盘超时,仅在同步刷盘出现,异步不会出现FLUSH_SLAVE_TIMEOUT
:slave同步超时,同步复制会出现该状态,异步复制不会出现SLAVE_NOT_AVAILABLE
:Slave不可用,同步复制会出现该状态,异步复制不会出现
定义异步消息的发送者
public class AsyncProducer {public void Producer() throws MQClientException {DefaultMQProducer producer = new DefaultMQProducer("test");producer.setNamesrvAddr("rocket:9876");//指定异步发送失败后不进行重试producer.setRetryTimesWhenSendFailed(0);//指定新创建的Topic的Queue数量为2producer.setDefaultTopicQueueNums(2);producer.start();for (int i =0;i<100;i++){byte []body = ("hi:"+i).getBytes();try{Message msg = new Message("topicA","tag",body);//异步发送,指定回调producer.send(msg, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {System.out.println(sendResult);}@Overridepublic void onException(Throwable throwable) {System.out.println(throwable);}});//由于采用的是异步发送,若不sleep,会导致消息还未发送就会将producer给关闭,报错TimeUnit.SECONDS.sleep(3);producer.shutdown();} catch (RemotingException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}}}}
定义消费者
public class Consumer {public void Consumer() throws MQClientException {//定义一个pull消费者//DefaultMQPullConsumer pullConsumer = new DefaultMQPullConsumer("test");DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer("test");pushConsumer.setNamesrvAddr("rocketmq:9876");//指定从第一条消息开始消费pushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);//指定topic与TagpushConsumer.subscribe("Topic","*");//指定采用"广播模式"进行消费,默认为集群模式pushConsumer.setMessageModel(MessageModel.BROADCASTING);//注册监听器pushConsumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> lists,ConsumeConcurrentlyContext context) {//逐条消费消息for (MessageExt list:lists){System.out.println(list);}//返回消费状态return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//开启消费者消费pushConsumer.start();System.out.println("Consumer Started");}}