This commit is contained in:
fleyx 2025-01-09 22:40:54 +08:00
parent e775d389c0
commit 7148ad31fb
6 changed files with 126 additions and 47 deletions

View File

@ -11,6 +11,7 @@ public class DirectRabbitConfig {
public final static String DIRECT_ONE = "direct.one";
public final static String DIRECT_TWO= "direct.two";
public final static String DIRECT_ONE_COPY = "direct.one.copy";
public final static String DIRECT_EXCHANGE= "directExchange1";
@Bean
@ -18,6 +19,11 @@ public class DirectRabbitConfig {
return new Queue(DIRECT_ONE);
}
@Bean
public Queue directQueueOneCopy() {
return new Queue(DIRECT_ONE_COPY);
}
@Bean
public Queue directQueueTwo(){
return new Queue(DIRECT_TWO);
@ -33,11 +39,16 @@ public class DirectRabbitConfig {
return BindingBuilder.bind(directQueueOne).to(directExchange).with("direct.one");
}
@Bean
Binding bindingDirectExchangeOneCopy(Queue directQueueOneCopy, DirectExchange directExchange){
return BindingBuilder.bind(directQueueOneCopy).to(directExchange).with("direct.one");
}
@Bean
Binding bindingDirectExchangeTwo(Queue directQueueTwo, DirectExchange directExchange){
//# 表示零个或多个词
//* 表示一个词
return BindingBuilder.bind(directQueueTwo).to(directExchange).with("direct.#");
return BindingBuilder.bind(directQueueTwo).to(directExchange).with("direct.two");
}
}

View File

@ -1,5 +1,7 @@
package org.example.controller;
import cn.hutool.core.util.RandomUtil;
import org.example.config.DirectRabbitConfig;
import org.example.config.TopicRabbitConfig;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
@ -12,19 +14,15 @@ import org.springframework.web.bind.annotation.RestController;
public class MqDirectController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/simple")
public void sendSimple(){
this.rabbitTemplate.convertAndSend("topic.one","abc");
}
@GetMapping("/one")
public void sendOne(){
this.rabbitTemplate.convertAndSend(TopicRabbitConfig.TOPIC_EXCHANGE,"topic.one","abc");
this.rabbitTemplate.convertAndSend(DirectRabbitConfig.DIRECT_EXCHANGE,"direct.one", RandomUtil.randomString(10));
}
@GetMapping("/two")
public void sendTwo(){
this.rabbitTemplate.convertAndSend(TopicRabbitConfig.TOPIC_EXCHANGE,"topic.two",System.currentTimeMillis());
this.rabbitTemplate.convertAndSend(DirectRabbitConfig.DIRECT_EXCHANGE,"direct.two",System.currentTimeMillis());
}
}

View File

@ -0,0 +1,25 @@
package org.example.controller;
import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.RandomUtil;
import org.example.config.TopicRabbitConfig;
import org.example.entity.mq.MqMessage;
import org.example.listener.Listener;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/fanout")
public class MqFanoutController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/add")
public void add() {
this.rabbitTemplate.convertAndSend("fanoutExchange","", RandomUtil.randomString(10));
}
}

View File

@ -4,9 +4,7 @@ import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.RandomUtil;
import org.example.config.TopicRabbitConfig;
import org.example.entity.mq.MqMessage;
import org.example.listener.AutoCreateListener;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.example.listener.Listener;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
@ -37,11 +35,12 @@ public class MqTopicController {
@GetMapping("/autoCreateQueue")
public void autoCreateQueue() {
//设置唯一id,用于防重复消费
MessagePostProcessor postProcessor = message -> {
message.getMessageProperties().setHeader("unique-id", IdUtil.fastSimpleUUID());
return message;
};
this.rabbitTemplate.convertAndSend(AutoCreateListener.EXCHANGE, AutoCreateListener.KEY,
this.rabbitTemplate.convertAndSend(Listener.EXCHANGE, Listener.KEY,
new MqMessage(RandomUtil.randomString(3), RandomUtil.randomString(5)), postProcessor);
}
}

View File

@ -1,36 +0,0 @@
package org.example.listener;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.example.entity.mq.MqMessage;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class AutoCreateListener {
public static final String QUEUE = "autoCreateQueue";
public static final String EXCHANGE = "autoCreateExchange";
public static final String KEY = "autoCreate";
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = QUEUE, durable = "true", autoDelete = "false"),
exchange = @Exchange(value = EXCHANGE, type = ExchangeTypes.TOPIC),
key = {KEY}
), concurrency = "4", ackMode = "MANUAL")
public void process(Channel channel, Message message, @Payload MqMessage body ) throws Exception {
MessageProperties properties = message.getMessageProperties();
log.info("收到消息:{},{},{},{}", properties.getDeliveryTag(), properties.getHeader("unique-id"), properties.getReceivedRoutingKey(), new ObjectMapper().writeValueAsString(body));
channel.basicAck(properties.getDeliveryTag(), false);
}
}

View File

@ -0,0 +1,82 @@
package org.example.listener;
import cn.hutool.core.util.RandomUtil;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.example.config.DirectRabbitConfig;
import org.example.entity.mq.MqMessage;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class Listener {
public static final String QUEUE = "autoCreateQueue";
public static final String EXCHANGE = "autoCreateExchange";
public static final String KEY = "autoCreate";
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = QUEUE, durable = "true", autoDelete = "false"),
exchange = @Exchange(value = EXCHANGE, type = ExchangeTypes.TOPIC),
key = {KEY}
), concurrency = "4", ackMode = "MANUAL")
public void autoCreateQueue(Channel channel, Message message, @Payload MqMessage body) throws Exception {
MessageProperties properties = message.getMessageProperties();
log.info("收到消息:{},{},{},{}", properties.getDeliveryTag(), properties.getHeader("unique-id"), properties.getReceivedRoutingKey(), new ObjectMapper().writeValueAsString(body));
channel.basicAck(properties.getDeliveryTag(), false);
}
@RabbitListener(queues = DirectRabbitConfig.DIRECT_ONE, ackMode = "AUTO")
public void directOneQueue(Message message, @Payload String body) {
log.info("directOne收到消息{},{}", message.getMessageProperties().getDeliveryTag(), body);
}
@RabbitListener(queues = DirectRabbitConfig.DIRECT_ONE_COPY, ackMode = "AUTO")
public void directOneCopyQueue(Message message, @Payload String body) {
log.info("directOneCopy收到消息{},{}", message.getMessageProperties().getDeliveryTag(), body);
}
@RabbitListener(queues = DirectRabbitConfig.DIRECT_TWO, ackMode = "AUTO")
public void directTwoQueue(Message message, @Payload String body) {
log.info("directTwo收到消息{},{}", message.getMessageProperties().getDeliveryTag(), body);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "fanout1Queue", durable = "true", autoDelete = "false"),
exchange = @Exchange(value = "fanoutExchange", type = ExchangeTypes.FANOUT)
), ackMode = "AUTO")
public void fanout1(Message message, @Payload String body) {
log.info("fanout1收到广播消息{},{}", message.getMessageProperties().getDeliveryTag(), body);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "fanout1Queue", durable = "true", autoDelete = "false"),
exchange = @Exchange(value = "fanoutExchange", type = ExchangeTypes.FANOUT)
), ackMode = "AUTO")
public void fanout11(Message message, @Payload String body) {
log.info("fanout11收到广播消息{},{}", message.getMessageProperties().getDeliveryTag(), body);
}
/**
* 测试AUTO ack
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "fanout2Queue", durable = "true", autoDelete = "false"),
exchange = @Exchange(value = "fanoutExchange", type = ExchangeTypes.FANOUT)
), ackMode = "AUTO")
public void fanout2(Message message, @Payload String body) {
log.info("fanout2收到广播消息{},{}", message.getMessageProperties().getDeliveryTag(), body);
if (RandomUtil.randomInt() % 2 == 1) {
throw new RuntimeException("error");
}
}
}