300字范文,内容丰富有趣,生活中的好帮手!
300字范文 > RabbitMQ中Fanout交换机的使用

RabbitMQ中Fanout交换机的使用

时间:2022-06-03 15:07:48

相关推荐

RabbitMQ中Fanout交换机的使用

交换机的基本原理

案例

由于Fanout类型的交换机原理类似于广播的模式,所以需要先启动消息的消费者以避免消息的丢失。

接受方:

消费者1:

public class Reseive {public static void main(String[] args) {ConnectionFactory factory=new ConnectionFactory();factory.setHost("192.168.79.140");factory.setPort(5672);factory.setUsername("admin");factory.setPassword("123456");/**通道资源和连接资源不需要关闭* 关闭小概率会抛出异常* */Connection connection=null;Channel channel=null;try {connection=factory.newConnection();channel=connection.createChannel();/**由于Fanout类型的交换机的消息类似于广播模式,所以它不需要绑定RountingKey* 而又可能会有多个消息消费者来接受队列中的数据,因此从队列时要创建一个随机的队列名称。** 没有参数的queueDeclare函数会创建一个名字随机的队列* 该队列的数据是非持久化的、自动删除的、排外的(即只允许有一个监听者)** getQueue()函数用于获取该队列的名称* */String queueName= channel.queueDeclare().getQueue();/**创建交换机* */channel.exchangeDeclare("fanoutExchange","fanout",true);/**由于Fanout类型的交换机不需要绑定RoutingKey所以这里为空字符串* */channel.queueBind(queueName,"fanoutExchange","");/**获取消息* */channel.basicConsume(queueName,true,"",new DefaultConsumer(channel){/**获取队列中消息函数* */@Overridepublic void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException {String message =new String(body);System.out.println("消息消费者1:"+message);}});} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();}}}

消费者2:

public class Reseive02 {public static void main(String[] args) {ConnectionFactory factory=new ConnectionFactory();factory.setHost("192.168.79.140");factory.setPort(5672);factory.setUsername("admin");factory.setPassword("123456");Connection connection=null;Channel channel=null;try {connection=factory.newConnection();channel=connection.createChannel();String queueName= channel.queueDeclare().getQueue();channel.exchangeDeclare("fanoutExchange","fanout",true);channel.queueBind(queueName,"fanoutExchange","");channel.basicConsume(queueName,true,"",new DefaultConsumer(channel){/**获取队列中消息函数* */@Overridepublic void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException {String message =new String(body);System.out.println("消息消费者2:"+message);}});} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();}}}

执行结果:

从结果中可以看到它会随机的生成一个队列名称且创建的交换机绑定了队列中。当没有消息的消费者监听到这个队列时会自动删除。

发送方:

由于使用了Fanout交换机,所以消息的接受方可能会有多个因此不建议在消息发送的时候创建队列以及绑定交换机。但必须确保交换机存在。

public class Send {public static void main(String[] args) {ConnectionFactory factory=new ConnectionFactory();factory.setHost("192.168.79.140");factory.setPort(5672);factory.setUsername("admin");factory.setPassword("123456");Connection connection=null;Channel channel=null;try {connection=factory.newConnection();channel=connection.createChannel();channel.exchangeDeclare("fanoutExchange","fanout",true);/**发送消息* */String message="Fanout交换机测试消息";/**发送消息到队列* */channel.basicPublish("fanoutExchange","",null,message.getBytes("utf-8"));System.out.println("消息发送成功");} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();}finally{if(channel!=null){try {channel.close();} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();}}if(connection!=null){try {connection.close();} catch (IOException e) {e.printStackTrace();}}}}}

执行结果:

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。