This commit is contained in:
fleyx 2025-01-08 00:06:01 +08:00
parent 90b120b4ba
commit e775d389c0
10 changed files with 332 additions and 0 deletions

38
rabbitmq/.gitignore vendored Normal file
View File

@ -0,0 +1,38 @@
target/
!.mvn/wrapper/maven-wrapper.jar
!**/src/main/**/target/
!**/src/test/**/target/
### IntelliJ IDEA ###
.idea/modules.xml
.idea/jarRepositories.xml
.idea/compiler.xml
.idea/libraries/
*.iws
*.iml
*.ipr
### Eclipse ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache
### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/
build/
!**/src/main/**/build/
!**/src/test/**/build/
### VS Code ###
.vscode/
### Mac OS ###
.DS_Store

60
rabbitmq/pom.xml Normal file
View File

@ -0,0 +1,60 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.13</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>org.example</groupId>
<artifactId>rabbitmq</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.8.11</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,14 @@
package org.example;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication(scanBasePackages = "org.example")
public class BackendApplication {
public static void main(String[] args) {
SpringApplication.run(BackendApplication.class, args);
}
}

View File

@ -0,0 +1,43 @@
package org.example.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
@Configuration
@Component
public class DirectRabbitConfig {
public final static String DIRECT_ONE = "direct.one";
public final static String DIRECT_TWO= "direct.two";
public final static String DIRECT_EXCHANGE= "directExchange1";
@Bean
public Queue directQueueOne(){
return new Queue(DIRECT_ONE);
}
@Bean
public Queue directQueueTwo(){
return new Queue(DIRECT_TWO);
}
@Bean
DirectExchange directExchange(){
return new DirectExchange(DIRECT_EXCHANGE);
}
@Bean
Binding bindingDirectExchangeOne(Queue directQueueOne, DirectExchange directExchange){
return BindingBuilder.bind(directQueueOne).to(directExchange).with("direct.one");
}
@Bean
Binding bindingDirectExchangeTwo(Queue directQueueTwo, DirectExchange directExchange){
//# 表示零个或多个词
//* 表示一个词
return BindingBuilder.bind(directQueueTwo).to(directExchange).with("direct.#");
}
}

View File

@ -0,0 +1,43 @@
package org.example.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
@Configuration
@Component
public class TopicRabbitConfig {
public final static String TOPIC_ONE = "topic.one";
public final static String TOPIC_TWO = "topic.two";
public final static String TOPIC_EXCHANGE = "topicExchange1";
@Bean
public Queue queueOne(){
return new Queue(TOPIC_ONE);
}
@Bean
public Queue queueTwo(){
return new Queue(TOPIC_TWO);
}
@Bean
TopicExchange topicExchange(){
return new TopicExchange(TOPIC_EXCHANGE);
}
@Bean
Binding bindingExchangeOne(Queue queueOne, TopicExchange topicExchange){
return BindingBuilder.bind(queueOne).to(topicExchange).with("topic.one");
}
@Bean
Binding bindingExchangeTwo(Queue queueTwo, TopicExchange topicExchange){
//# 表示零个或多个词
//* 表示一个词
return BindingBuilder.bind(queueTwo).to(topicExchange).with("topic.#");
}
}

View File

@ -0,0 +1,30 @@
package org.example.controller;
import org.example.config.TopicRabbitConfig;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/direct")
public class MqDirectController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/simple")
public void sendSimple(){
this.rabbitTemplate.convertAndSend("topic.one","abc");
}
@GetMapping("/one")
public void sendOne(){
this.rabbitTemplate.convertAndSend(TopicRabbitConfig.TOPIC_EXCHANGE,"topic.one","abc");
}
@GetMapping("/two")
public void sendTwo(){
this.rabbitTemplate.convertAndSend(TopicRabbitConfig.TOPIC_EXCHANGE,"topic.two",System.currentTimeMillis());
}
}

View File

@ -0,0 +1,47 @@
package org.example.controller;
import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.RandomUtil;
import org.example.config.TopicRabbitConfig;
import org.example.entity.mq.MqMessage;
import org.example.listener.AutoCreateListener;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/topic")
public class MqTopicController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/simple")
public void sendSimple() {
this.rabbitTemplate.convertAndSend("topic.one", "abc");
}
@GetMapping("/one")
public void sendOne() {
this.rabbitTemplate.convertAndSend(TopicRabbitConfig.TOPIC_EXCHANGE, "topic.one", "abc");
}
@GetMapping("/two")
public void sendTwo() {
this.rabbitTemplate.convertAndSend(TopicRabbitConfig.TOPIC_EXCHANGE, "topic.two", System.currentTimeMillis());
}
@GetMapping("/autoCreateQueue")
public void autoCreateQueue() {
MessagePostProcessor postProcessor = message -> {
message.getMessageProperties().setHeader("unique-id", IdUtil.fastSimpleUUID());
return message;
};
this.rabbitTemplate.convertAndSend(AutoCreateListener.EXCHANGE, AutoCreateListener.KEY,
new MqMessage(RandomUtil.randomString(3), RandomUtil.randomString(5)), postProcessor);
}
}

View File

@ -0,0 +1,14 @@
package org.example.entity.mq;
import lombok.AllArgsConstructor;
import lombok.Data;
import java.io.Serializable;
@Data
@AllArgsConstructor
public class MqMessage implements Serializable {
private static final long serialVersionUID = -4465394232288598749L;
private String name;
private String code;
}

View File

@ -0,0 +1,36 @@
package org.example.listener;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.example.entity.mq.MqMessage;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class AutoCreateListener {
public static final String QUEUE = "autoCreateQueue";
public static final String EXCHANGE = "autoCreateExchange";
public static final String KEY = "autoCreate";
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = QUEUE, durable = "true", autoDelete = "false"),
exchange = @Exchange(value = EXCHANGE, type = ExchangeTypes.TOPIC),
key = {KEY}
), concurrency = "4", ackMode = "MANUAL")
public void process(Channel channel, Message message, @Payload MqMessage body ) throws Exception {
MessageProperties properties = message.getMessageProperties();
log.info("收到消息:{},{},{},{}", properties.getDeliveryTag(), properties.getHeader("unique-id"), properties.getReceivedRoutingKey(), new ObjectMapper().writeValueAsString(body));
channel.basicAck(properties.getDeliveryTag(), false);
}
}

View File

@ -0,0 +1,7 @@
spring:
rabbitmq:
host: 127.0.0.1 #地址
port: 5672 #端口
username: admin #用户名
password: admin #密码
virtual-host: test1 #虚拟主机路径