300字范文,内容丰富有趣,生活中的好帮手!
300字范文 > rabbitmq4-工作队列及公平分发模式

rabbitmq4-工作队列及公平分发模式

时间:2020-11-05 04:31:18

相关推荐

rabbitmq4-工作队列及公平分发模式

建议大家如果没有看前一篇文章的时候,还是看一看第一篇文章,因为上篇文章的确把很多的概念都讲解的比较清楚。我发现有很多东西在单独使用rabbitmq是做不了的,例如自定义message投递的id,所以我希望快速的把这几篇介绍的博文写完,然后进入springboot的整合篇,但是我不建议新手一上来就开始使用springboot的整合,就想我在群里面听到的,不知道channel为何物更别提其他的概念了,只有一个稳扎稳打的基础在往高级的地方学习的时候才不费力。

一、简单工作队列

我想大概这种模式的应用场景也就剩下了应用层面的解耦了吧,话不多话,下面直接用代码展示

二、生产者代码:

public class Producer {public static final String QUEUE_NAME = "work_queue";public static void main(String[] args) throws IOException, TimeoutException{final Connection conn = ConnUtils.getConn();final Channel channel = conn.createChannel();boolean durable = true;boolean exclusive = false;boolean autoDelete = false;channel.queueDeclare(QUEUE_NAME, durable, exclusive, autoDelete, null);channel.confirmSelect();channel.addConfirmListener(new ConfirmListener() {@Overridepublic void handleAck(long deliveryTag, boolean multiple) throws IOException {// 这个目前在单独使用rabbitmq的时候没有办法找到自定义这个消息标识的办法,但是在和springboot整合之后会提供这样的方法System.out.println(multiple);System.out.println("wtf 需要这么热吗:::::"+deliveryTag);}@Overridepublic void handleNack(long deliveryTag, boolean multiple) throws IOException {System.out.println("啊哈哈,你被拒绝了……");}});// 这个地方也可以搞一个线程来进行发送channel.basicPublish("",QUEUE_NAME,null,"fuck 真他妈的热 ".getBytes());channel.basicPublish("",QUEUE_NAME,null,"fuck 真他妈的热 +1".getBytes());channel.basicPublish("",QUEUE_NAME,null,"fuck 真他妈的热 +2".getBytes());channel.basicPublish("",QUEUE_NAME,null,"fuck 真他妈的热 +3".getBytes());channel.basicPublish("",QUEUE_NAME,null,"fuck 真他妈的热 +4".getBytes());channel.basicPublish("",QUEUE_NAME,null,"fuck 真他妈的热 +5".getBytes());channel.close();conn.close();}}

三、两个消费者(只需要把代码拷贝一份就可以了)

public class Consumer01 {public static final String QUEUE_NAME = "work_queue";public static void main(String[] args) throws IOException, TimeoutException {Connection conn = ConnUtils.getConn();final Channel channel = conn.createChannel();channel.queueDeclare(QUEUE_NAME,true,false,false,null);Consumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {long deliveryTag = envelope.getDeliveryTag();System.out.println("Recv001"+"message == "+new String(body,"utf-8"));channel.basicAck(deliveryTag,false);}};channel.basicConsume(QUEUE_NAME,false,consumer);}}

先启动两个消费者,因为消息太少,如果先启动生产者,在启动消费者,一个消费者立马就消费完了。

四、结果分析

我们发现两个消费者总是已奇偶的形式出现的,加入两个消费者的消费能力不一样,消费者1消费能力比较高,但是以这种模式的话,那么整个系统的消费能力的上线就有比较弱的消费者2来决定了。所以下面介绍一种公平分发模式:公平指的是能者多劳

我们在channel申明的下面加一行代码:我们分别设置consumer1的消费能力为3,consumer2的消费者能力为1

/*** prefetchCount:告诉MQ不要同时给一个消费者推送超过prefetchCount个消息,* 即一点prefetchCount个消息没有应答,该消费者就会发生阻塞* global:指的是该设置是针对该consumer还是针对channel级别*/channel.basicQos(3,false);

下面我们在观察结果:

我们可以看到奇偶的模式不见了,而且消费者1的吞吐量是大于消费者2的

本节到这里就结束了,有很多的介绍希望大家多去看看前面的文章。

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。