RabbitMQ 的扇出(fanout)交换机是将接收到到消息广播给它知道的所有队列,从而实现生产者发送一条消息,可以供多个消费者消费。
我们的计划是这样的:
创建一个交换机business.test.exchange.fanout
,类型是Fanout创建两个队列business.test.queue1
和business.test.queue2
,绑定到交换机business.test.exchange.fanout
pom
<dependencies><!--RabbitMQ 依赖--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.47</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency></dependencies>
yml
server:port: 8080spring:rabbitmq:host: **.***.**.***port: 5672username: **password: **
config
import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.BindingBuilder;import org.springframework.amqp.core.FanoutExchange;import org.springframework.amqp.core.Queue;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configurationpublic class FanoutConfig {/** 创建一个交换机 类型是fanout ,创建两个队列 绑定到该交换机上 *//*** 声明交换机* @return Fanout类型交换机*/@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange("business.test.exchange.fanout");}/*** 第1个队列*/@Beanpublic Queue fanoutQueue1(){return new Queue("business.test.queue1");}/*** 绑定队列和交换机*/@Beanpublic Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}/*** 第2个队列*/@Beanpublic Queue fanoutQueue2(){return new Queue("business.test.queue2");}/*** 绑定队列和交换机*/@Beanpublic Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}}
生产者
import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;import java.time.LocalTime;/*** @author: Snow* @date: /1/5* *************************************************** 修改记录(时间--修改人--修改说明):*/@RestController@RequestMapping("/fanout")public class SendController {@Autowiredprivate RabbitTemplate rabbitTemplate;@GetMapping("/send")public String send(){// 队列名称String exchangeName = "business.test.exchange.fanout";// 消息String message = "hello, fanout! " + LocalTime.now();rabbitTemplate.convertAndSend(exchangeName, "", message);return message;}}
消费者
import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.ponent;/*** @author: Snow* @date: /1/5* *************************************************** 修改记录(时间--修改人--修改说明):*/@Componentpublic class Consumer {@RabbitListener(queues = "business.test.queue1")public void listenFanoutQueue1(String msg) {System.out.println("消费者1接收到Fanout消息:【" + msg + "】");}@RabbitListener(queues = "business.test.queue2")public void listenFanoutQueue2(String msg) {System.out.println("消费者2接收到Fanout消息:【" + msg + "】");}}
测试结果
访问 http://localhost:8080/fanout/send
控制台打印
消费者1接收到Fanout消息:【hello, fanout! 09:06:01.540】消费者2接收到Fanout消息:【hello, fanout! 09:06:01.540】