diff --git a/rabbitmq/src/main/java/org/example/config/DirectRabbitConfig.java b/rabbitmq/src/main/java/org/example/config/DirectRabbitConfig.java index 40fd9c7..c585a37 100644 --- a/rabbitmq/src/main/java/org/example/config/DirectRabbitConfig.java +++ b/rabbitmq/src/main/java/org/example/config/DirectRabbitConfig.java @@ -11,6 +11,7 @@ public class DirectRabbitConfig { public final static String DIRECT_ONE = "direct.one"; public final static String DIRECT_TWO= "direct.two"; + public final static String DIRECT_ONE_COPY = "direct.one.copy"; public final static String DIRECT_EXCHANGE= "directExchange1"; @Bean @@ -18,6 +19,11 @@ public class DirectRabbitConfig { return new Queue(DIRECT_ONE); } + @Bean + public Queue directQueueOneCopy() { + return new Queue(DIRECT_ONE_COPY); + } + @Bean public Queue directQueueTwo(){ return new Queue(DIRECT_TWO); @@ -33,11 +39,16 @@ public class DirectRabbitConfig { return BindingBuilder.bind(directQueueOne).to(directExchange).with("direct.one"); } + @Bean + Binding bindingDirectExchangeOneCopy(Queue directQueueOneCopy, DirectExchange directExchange){ + return BindingBuilder.bind(directQueueOneCopy).to(directExchange).with("direct.one"); + } + @Bean Binding bindingDirectExchangeTwo(Queue directQueueTwo, DirectExchange directExchange){ //# 表示零个或多个词 //* 表示一个词 - return BindingBuilder.bind(directQueueTwo).to(directExchange).with("direct.#"); + return BindingBuilder.bind(directQueueTwo).to(directExchange).with("direct.two"); } } \ No newline at end of file diff --git a/rabbitmq/src/main/java/org/example/controller/MqDirectController.java b/rabbitmq/src/main/java/org/example/controller/MqDirectController.java index 3387972..fd76db0 100644 --- a/rabbitmq/src/main/java/org/example/controller/MqDirectController.java +++ b/rabbitmq/src/main/java/org/example/controller/MqDirectController.java @@ -1,5 +1,7 @@ package org.example.controller; +import cn.hutool.core.util.RandomUtil; +import org.example.config.DirectRabbitConfig; import org.example.config.TopicRabbitConfig; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; @@ -12,19 +14,15 @@ import org.springframework.web.bind.annotation.RestController; public class MqDirectController { @Autowired private RabbitTemplate rabbitTemplate; - @GetMapping("/simple") - public void sendSimple(){ - this.rabbitTemplate.convertAndSend("topic.one","abc"); - } @GetMapping("/one") public void sendOne(){ - this.rabbitTemplate.convertAndSend(TopicRabbitConfig.TOPIC_EXCHANGE,"topic.one","abc"); + this.rabbitTemplate.convertAndSend(DirectRabbitConfig.DIRECT_EXCHANGE,"direct.one", RandomUtil.randomString(10)); } @GetMapping("/two") public void sendTwo(){ - this.rabbitTemplate.convertAndSend(TopicRabbitConfig.TOPIC_EXCHANGE,"topic.two",System.currentTimeMillis()); + this.rabbitTemplate.convertAndSend(DirectRabbitConfig.DIRECT_EXCHANGE,"direct.two",System.currentTimeMillis()); } } diff --git a/rabbitmq/src/main/java/org/example/controller/MqFanoutController.java b/rabbitmq/src/main/java/org/example/controller/MqFanoutController.java new file mode 100644 index 0000000..4d2cebe --- /dev/null +++ b/rabbitmq/src/main/java/org/example/controller/MqFanoutController.java @@ -0,0 +1,25 @@ +package org.example.controller; + +import cn.hutool.core.util.IdUtil; +import cn.hutool.core.util.RandomUtil; +import org.example.config.TopicRabbitConfig; +import org.example.entity.mq.MqMessage; +import org.example.listener.Listener; +import org.springframework.amqp.core.MessagePostProcessor; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +@RestController +@RequestMapping("/fanout") +public class MqFanoutController { + @Autowired + private RabbitTemplate rabbitTemplate; + + @GetMapping("/add") + public void add() { + this.rabbitTemplate.convertAndSend("fanoutExchange","", RandomUtil.randomString(10)); + } +} diff --git a/rabbitmq/src/main/java/org/example/controller/MqTopicController.java b/rabbitmq/src/main/java/org/example/controller/MqTopicController.java index 5805d74..a357ccb 100644 --- a/rabbitmq/src/main/java/org/example/controller/MqTopicController.java +++ b/rabbitmq/src/main/java/org/example/controller/MqTopicController.java @@ -4,9 +4,7 @@ import cn.hutool.core.util.IdUtil; import cn.hutool.core.util.RandomUtil; import org.example.config.TopicRabbitConfig; import org.example.entity.mq.MqMessage; -import org.example.listener.AutoCreateListener; -import org.springframework.amqp.AmqpException; -import org.springframework.amqp.core.Message; +import org.example.listener.Listener; import org.springframework.amqp.core.MessagePostProcessor; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; @@ -37,11 +35,12 @@ public class MqTopicController { @GetMapping("/autoCreateQueue") public void autoCreateQueue() { + //设置唯一id,用于防重复消费 MessagePostProcessor postProcessor = message -> { message.getMessageProperties().setHeader("unique-id", IdUtil.fastSimpleUUID()); return message; }; - this.rabbitTemplate.convertAndSend(AutoCreateListener.EXCHANGE, AutoCreateListener.KEY, + this.rabbitTemplate.convertAndSend(Listener.EXCHANGE, Listener.KEY, new MqMessage(RandomUtil.randomString(3), RandomUtil.randomString(5)), postProcessor); } } diff --git a/rabbitmq/src/main/java/org/example/listener/AutoCreateListener.java b/rabbitmq/src/main/java/org/example/listener/AutoCreateListener.java deleted file mode 100644 index 101b8c2..0000000 --- a/rabbitmq/src/main/java/org/example/listener/AutoCreateListener.java +++ /dev/null @@ -1,36 +0,0 @@ -package org.example.listener; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.rabbitmq.client.Channel; -import lombok.extern.slf4j.Slf4j; -import org.example.entity.mq.MqMessage; -import org.springframework.amqp.core.ExchangeTypes; -import org.springframework.amqp.core.Message; -import org.springframework.amqp.core.MessageProperties; -import org.springframework.amqp.rabbit.annotation.Exchange; -import org.springframework.amqp.rabbit.annotation.Queue; -import org.springframework.amqp.rabbit.annotation.QueueBinding; -import org.springframework.amqp.rabbit.annotation.RabbitListener; -import org.springframework.amqp.support.AmqpHeaders; -import org.springframework.messaging.handler.annotation.Header; -import org.springframework.messaging.handler.annotation.Payload; -import org.springframework.stereotype.Component; - -@Component -@Slf4j -public class AutoCreateListener { - public static final String QUEUE = "autoCreateQueue"; - public static final String EXCHANGE = "autoCreateExchange"; - public static final String KEY = "autoCreate"; - - @RabbitListener(bindings = @QueueBinding( - value = @Queue(value = QUEUE, durable = "true", autoDelete = "false"), - exchange = @Exchange(value = EXCHANGE, type = ExchangeTypes.TOPIC), - key = {KEY} - ), concurrency = "4", ackMode = "MANUAL") - public void process(Channel channel, Message message, @Payload MqMessage body ) throws Exception { - MessageProperties properties = message.getMessageProperties(); - log.info("收到消息:{},{},{},{}", properties.getDeliveryTag(), properties.getHeader("unique-id"), properties.getReceivedRoutingKey(), new ObjectMapper().writeValueAsString(body)); - channel.basicAck(properties.getDeliveryTag(), false); - } -} diff --git a/rabbitmq/src/main/java/org/example/listener/Listener.java b/rabbitmq/src/main/java/org/example/listener/Listener.java new file mode 100644 index 0000000..c8b08c8 --- /dev/null +++ b/rabbitmq/src/main/java/org/example/listener/Listener.java @@ -0,0 +1,82 @@ +package org.example.listener; + +import cn.hutool.core.util.RandomUtil; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.rabbitmq.client.Channel; +import lombok.extern.slf4j.Slf4j; +import org.example.config.DirectRabbitConfig; +import org.example.entity.mq.MqMessage; +import org.springframework.amqp.core.ExchangeTypes; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.core.MessageProperties; +import org.springframework.amqp.rabbit.annotation.Exchange; +import org.springframework.amqp.rabbit.annotation.Queue; +import org.springframework.amqp.rabbit.annotation.QueueBinding; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.messaging.handler.annotation.Payload; +import org.springframework.stereotype.Component; + +@Component +@Slf4j +public class Listener { + public static final String QUEUE = "autoCreateQueue"; + public static final String EXCHANGE = "autoCreateExchange"; + public static final String KEY = "autoCreate"; + + @RabbitListener(bindings = @QueueBinding( + value = @Queue(value = QUEUE, durable = "true", autoDelete = "false"), + exchange = @Exchange(value = EXCHANGE, type = ExchangeTypes.TOPIC), + key = {KEY} + ), concurrency = "4", ackMode = "MANUAL") + public void autoCreateQueue(Channel channel, Message message, @Payload MqMessage body) throws Exception { + MessageProperties properties = message.getMessageProperties(); + log.info("收到消息:{},{},{},{}", properties.getDeliveryTag(), properties.getHeader("unique-id"), properties.getReceivedRoutingKey(), new ObjectMapper().writeValueAsString(body)); + channel.basicAck(properties.getDeliveryTag(), false); + } + + @RabbitListener(queues = DirectRabbitConfig.DIRECT_ONE, ackMode = "AUTO") + public void directOneQueue(Message message, @Payload String body) { + log.info("directOne收到消息:{},{}", message.getMessageProperties().getDeliveryTag(), body); + } + + @RabbitListener(queues = DirectRabbitConfig.DIRECT_ONE_COPY, ackMode = "AUTO") + public void directOneCopyQueue(Message message, @Payload String body) { + log.info("directOneCopy收到消息:{},{}", message.getMessageProperties().getDeliveryTag(), body); + } + + @RabbitListener(queues = DirectRabbitConfig.DIRECT_TWO, ackMode = "AUTO") + public void directTwoQueue(Message message, @Payload String body) { + log.info("directTwo收到消息:{},{}", message.getMessageProperties().getDeliveryTag(), body); + } + + @RabbitListener(bindings = @QueueBinding( + value = @Queue(value = "fanout1Queue", durable = "true", autoDelete = "false"), + exchange = @Exchange(value = "fanoutExchange", type = ExchangeTypes.FANOUT) + ), ackMode = "AUTO") + public void fanout1(Message message, @Payload String body) { + log.info("fanout1收到广播消息:{},{}", message.getMessageProperties().getDeliveryTag(), body); + } + + @RabbitListener(bindings = @QueueBinding( + value = @Queue(value = "fanout1Queue", durable = "true", autoDelete = "false"), + exchange = @Exchange(value = "fanoutExchange", type = ExchangeTypes.FANOUT) + ), ackMode = "AUTO") + public void fanout11(Message message, @Payload String body) { + log.info("fanout11收到广播消息:{},{}", message.getMessageProperties().getDeliveryTag(), body); + } + + /** + * 测试AUTO ack + */ + @RabbitListener(bindings = @QueueBinding( + value = @Queue(value = "fanout2Queue", durable = "true", autoDelete = "false"), + exchange = @Exchange(value = "fanoutExchange", type = ExchangeTypes.FANOUT) + ), ackMode = "AUTO") + public void fanout2(Message message, @Payload String body) { + log.info("fanout2收到广播消息:{},{}", message.getMessageProperties().getDeliveryTag(), body); + if (RandomUtil.randomInt() % 2 == 1) { + throw new RuntimeException("error"); + } + } + +}