technology-note/java/其他/4.使用redis构建mq.md
2022-01-20 16:33:50 +08:00

160 lines
5.4 KiB
Markdown
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

---
id: "20200605"
date: "2020-06-05 15:42:00"
title: "手把手教你用redis实现一个简单的mq消息队列"
tags: ["java", "redis", "mq"]
categories:
- "java"
- "其他"
---
众所周知,消息队列是应用系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。目前使用较多的消息队列有 ActiveMQRabbitMQZeroMQKafkaMetaMQRocketMQ.
但是如果你不想为你的系统引入一个重量级(相对 redis 来说)的 mq,但是想要享受解耦、异步消息等特性,通过本文你就 get 到了,通过 redis 实现一个简单版的 mq。
## 为什么是 redis
- redis 通常作为缓存服务引入,因此大部分系统都会有 redis
- redis 本身的资源消耗是极小的,符合我们的轻量要求
- redis 速度很快,几乎不会出现速度瓶颈
- redis 有持久化方案,调整配置项可以在数据安全和速度间进行取舍(参考这篇)[https://segmentfault.com/a/1190000002906345]
## 如何实现
利用 redis 的队列结构来实现消息队列。redis 单个队列最多支持 2\*32-1 条数据,对于大部分应用是完全够用的。
简单来说就是:
- 每个 topic 对应一条队列
- 从队列一段写入数据,从另一端读取数据
- 消费失败,重新将消息放入队列
**注意:代码仅供个人尝鲜使用,请勿用于真实生产环境**
代码仅可在 springboot 环境中使用
### 首先定义注解和接口类
注解代码如下:
```java
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface MqConsumer {
/**
* 队列主题
*/
String topic() default "default_es_topic";
}
```
被该注解修饰的类,将会接收 topic 下的消息。
接口代码如下:
```java
public interface RedisConsumer {
/**
* 功能描述: 消费方法,消费者类必须继承此方法
*
* @param message 数据载体
* @author 123
* @date 2020/3/28 22:41
*/
void deal(String message);
}
```
本接口用于定于接受消息的处理方法。
### 扫描注解修饰类
本部分为核心代码,首先需要获取代码中被注解修饰的类,然后建立一个循环从 redis 队列中取数据,最后调用类对象的 deal 方法消费消息,如果 deal 方法抛出错误,认为消费失败,重新将该数据放入队列中。
1. 扫描部分代码如下:
```java
/**
* MqConfiguration.java
*/
@Override
public void run(ApplicationArguments args) {
Map<String, Object> map = context.getBeansWithAnnotation(MqConsumer.class);
map.values().forEach(item -> {
if (!(item instanceof RedisConsumer)) {
log.warn("注意检测到被@EsConsumer注解的类{}未实现RedisConsumer接口", item.getClass().getCanonicalName());
return;
}
MqConsumer[] annotations = item.getClass().getAnnotationsByType(MqConsumer.class);
MqConsumer annotation = annotations[0];
String topic = annotation.topic();
if (topicMap.containsKey(topic)) {
log.error("多个消费者{},消费同一个消息:{},已忽略", item.getClass().getCanonicalName(), topic);
} else {
topicMap.put(topic, (RedisConsumer) item);
}
});
log.info("redis订阅信息汇总完毕");
//由一个线程始终循环获取es队列数据
threadPoolExecutor.execute(loop());
}
```
run 方法在 spring 扫描完毕后调用,通过实现`ApplicationRunner`接口实现,通过 spring 的方法来获取所有被`MqConsumer`接口注解的类(否则需要自己写类加载器)。数据汇总完毕后使用一个线程来进行无线循环从 redis 队列中取数据。
2. 执行线程部分代码如下:
```java
private Runnable loop() {
return () -> {
while (true) {
AtomicInteger count = new AtomicInteger(0);
topicMap.forEach((k, v) -> {
try {
String message = mqUtil.getRedisTemplate().opsForList().rightPop(k);
if (message == null) {
count.getAndIncrement();
} else {
pushTask(v, message, k);
}
} catch (RedisConnectionFailureException connException) {
log.error("redis无法连接,10s后重试", connException);
sleep(10);
} catch (Exception e) {
log.error("redis消息队列异常", e);
}
});
if (count.get() == topicMap.keySet().size()) {
//当所有的队列都为空时休眠1s
sleep(1);
}
}
};
}
private void pushTask(RedisConsumer item, String value, String key) {
threadPoolExecutor.execute(() -> {
try {
item.deal(value);
} catch (Exception e) {
log.error("执行消费任务出错", e);
//非广播消息进行数据回补
mqUtil.getRedisTemplate().opsForList().rightPush(key, value);
}
});
}
```
loop 方法无限循环根据 topic 从 redis 中取数据,如果取到数据,调用 pushTask 方法执行,如果执行报错将会进行数据回补。
**完整代码见本文结尾**
### 测试
运行项目后调用,`MainController`中的接口即可测试。
完整代码:[github](https://github.com/FleyX/demo-project/tree/master/4.redis-mq)