扇出(fanout)交换机是将接收到到消息广播给它知道的所有队列,从而实现生产者发送一条消息,可以供多个消费者消费。
案例:一个生产者发送一条消息,通过扇出交换机,广播给两个消费者消费,如下图:
1、创建连接获取信道RabbitUtils工具类,linux上需要关闭防火墙才可以连接上
//连接工厂,创建信道工具类public class RabbitUtils {// 得到一个连接的 channelpublic static Channel getChannel() throws Exception {// 创建一个连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.23.129");factory.setUsername("user");factory.setPassword("123");Connection connection = factory.newConnection();Channel channel = connection.createChannel();return channel;}}
2、创建生产者发送消息
(1)调用工具类连接rabbitmq,获得信道;
(2)调用exchangeDeclare(交换机名称,交换机类型)方法声明交换机,此处交换机位fanout类型;
(3)调用basicPublish(交换机名称,交换机与队列连接路由,其他参数,消息体)方法发送消息。
public class Producer {//交换机名字public static final String FANOUT_EXCHANGE="fanout_exchange";public static void main(String[] args) throws Exception {//获得连接信道Channel channel = RabbitUtils.getChannel();//声明交换机channel.exchangeDeclare(FANOUT_EXCHANGE,"fanout");//发送消息体String message="你好,fanout型交换机";//发送消息channel.basicPublish(FANOUT_EXCHANGE,"",null,message.getBytes("UTF-8"));System.out.println("生产者发出消息:["+message+"]");}}
3、创建消费者接受消息
(1)调用工具类连接rabbitmq,获得信道;
(2)调用queueDeclare(队列名,是否持久化,是否只供一个消费者消费,是否自动删除,其他参数)方法声明队列;
(3)调用queueBind(队列名,交换机名,队列与交换机连接路由)绑定交换机与队列;
(4)调用 basicConsume(队列名,是否自动应答,接收成功回调,接收失败回调)消费消息。
public class Consumer1 {//交换机名字public static final String FANOUT_EXCHANGE="fanout_exchange";//接收消息public static void main(String[] args) throws Exception {//获得连接信道Channel channel = RabbitUtils.getChannel();//队列名称String queueName="queue1";//声明队列channel.queueDeclare(queueName,false,false,false,null);//绑定队列与交换机channel.queueBind(queueName,FANOUT_EXCHANGE,"");//接收消息回调函数DeliverCallback deliverCallback=(deliverTag,message)->{System.out.println("接收到从队列"+queueName+"传来到消息:"+new String(message.getBody()));};//消息中断回调函数CancelCallback cancelCallback=(consumerTag)->{System.out.println("消息接收中断");};//接收消息channel.basicConsume(queueName,true,deliverCallback,cancelCallback);}}
public class Consumer2 {//交换机名字public static final String FANOUT_EXCHANGE="fanout_exchange";//接收消息public static void main(String[] args) throws Exception {//获得连接信道Channel channel = RabbitUtils.getChannel();//队列名称String queueName="queue2";//声明队列channel.queueDeclare(queueName,false,false,false,null);//绑定队列与交换机channel.queueBind(queueName,FANOUT_EXCHANGE,"");//接收消息回调函数DeliverCallback deliverCallback=(deliverTag,message)->{System.out.println("接收到从队列"+queueName+"传来到消息:"+new String(message.getBody()));};//消息中断回调函数CancelCallback cancelCallback=(consumerTag)->{System.out.println("消息接收中断");};//接收消息channel.basicConsume(queueName,true,deliverCallback,cancelCallback);}}
4、测试,启动生产者和两个消费者
生产者发送一条消息,被两个消费者同时接收到,即fanout型交换机以广播形式将消息发送给所有它知道的队列