300字范文,内容丰富有趣,生活中的好帮手!
300字范文 > RabbitMq——扇出(fanout)交换机

RabbitMq——扇出(fanout)交换机

时间:2020-02-23 18:01:02

相关推荐

RabbitMq——扇出(fanout)交换机

扇出(fanout)交换机是将接收到到消息广播给它知道的所有队列,从而实现生产者发送一条消息,可以供多个消费者消费。

案例:一个生产者发送一条消息,通过扇出交换机,广播给两个消费者消费,如下图:

1、创建连接获取信道RabbitUtils工具类,linux上需要关闭防火墙才可以连接上

//连接工厂,创建信道工具类public class RabbitUtils {// 得到一个连接的 channelpublic static Channel getChannel() throws Exception {// 创建一个连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.23.129");factory.setUsername("user");factory.setPassword("123");Connection connection = factory.newConnection();Channel channel = connection.createChannel();return channel;}}

2、创建生产者发送消息

(1)调用工具类连接rabbitmq,获得信道;

(2)调用exchangeDeclare(交换机名称,交换机类型)方法声明交换机,此处交换机位fanout类型;

(3)调用basicPublish(交换机名称,交换机与队列连接路由,其他参数,消息体)方法发送消息。

public class Producer {//交换机名字public static final String FANOUT_EXCHANGE="fanout_exchange";public static void main(String[] args) throws Exception {//获得连接信道Channel channel = RabbitUtils.getChannel();//声明交换机channel.exchangeDeclare(FANOUT_EXCHANGE,"fanout");//发送消息体String message="你好,fanout型交换机";//发送消息channel.basicPublish(FANOUT_EXCHANGE,"",null,message.getBytes("UTF-8"));System.out.println("生产者发出消息:["+message+"]");}}

3、创建消费者接受消息

(1)调用工具类连接rabbitmq,获得信道;

(2)调用queueDeclare(队列名,是否持久化,是否只供一个消费者消费,是否自动删除,其他参数)方法声明队列;

(3)调用queueBind(队列名,交换机名,队列与交换机连接路由)绑定交换机与队列;

(4)调用 basicConsume(队列名,是否自动应答,接收成功回调,接收失败回调)消费消息。

public class Consumer1 {//交换机名字public static final String FANOUT_EXCHANGE="fanout_exchange";//接收消息public static void main(String[] args) throws Exception {//获得连接信道Channel channel = RabbitUtils.getChannel();//队列名称String queueName="queue1";//声明队列channel.queueDeclare(queueName,false,false,false,null);//绑定队列与交换机channel.queueBind(queueName,FANOUT_EXCHANGE,"");//接收消息回调函数DeliverCallback deliverCallback=(deliverTag,message)->{System.out.println("接收到从队列"+queueName+"传来到消息:"+new String(message.getBody()));};//消息中断回调函数CancelCallback cancelCallback=(consumerTag)->{System.out.println("消息接收中断");};//接收消息channel.basicConsume(queueName,true,deliverCallback,cancelCallback);}}

public class Consumer2 {//交换机名字public static final String FANOUT_EXCHANGE="fanout_exchange";//接收消息public static void main(String[] args) throws Exception {//获得连接信道Channel channel = RabbitUtils.getChannel();//队列名称String queueName="queue2";//声明队列channel.queueDeclare(queueName,false,false,false,null);//绑定队列与交换机channel.queueBind(queueName,FANOUT_EXCHANGE,"");//接收消息回调函数DeliverCallback deliverCallback=(deliverTag,message)->{System.out.println("接收到从队列"+queueName+"传来到消息:"+new String(message.getBody()));};//消息中断回调函数CancelCallback cancelCallback=(consumerTag)->{System.out.println("消息接收中断");};//接收消息channel.basicConsume(queueName,true,deliverCallback,cancelCallback);}}

4、测试,启动生产者和两个消费者

生产者发送一条消息,被两个消费者同时接收到,即fanout型交换机以广播形式将消息发送给所有它知道的队列

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。