2019-12-08 09:07:04 +00:00
|
|
|
|
---
|
2020-06-08 17:16:06 +08:00
|
|
|
|
id: "20200605"
|
|
|
|
|
date: "2020-06-05 15:42:00"
|
|
|
|
|
title: "手把手教你用redis实现一个简单的mq消息队列"
|
2019-12-08 09:07:04 +00:00
|
|
|
|
tags: ["java", "redis", "mq"]
|
|
|
|
|
categories:
|
|
|
|
|
- "java"
|
|
|
|
|
- "其他"
|
|
|
|
|
---
|
2020-06-08 17:16:06 +08:00
|
|
|
|
|
|
|
|
|
众所周知,消息队列是应用系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。目前使用较多的消息队列有 ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ.
|
|
|
|
|
|
|
|
|
|
但是如果你不想为你的系统引入一个重量级(相对 redis 来说)的 mq,但是想要享受解耦、异步消息等特性,通过本文你就 get 到了,通过 redis 实现一个简单版的 mq。
|
|
|
|
|
|
|
|
|
|
## 为什么是 redis
|
|
|
|
|
|
|
|
|
|
- redis 通常作为缓存服务引入,因此大部分系统都会有 redis
|
|
|
|
|
- redis 本身的资源消耗是极小的,符合我们的轻量要求
|
|
|
|
|
- redis 速度很快,几乎不会出现速度瓶颈
|
|
|
|
|
- redis 有持久化方案,调整配置项可以在数据安全和速度间进行取舍(参考这篇)[https://segmentfault.com/a/1190000002906345]
|
|
|
|
|
|
|
|
|
|
## 如何实现
|
|
|
|
|
|
|
|
|
|
利用 redis 的队列结构来实现消息队列。redis 单个队列最多支持 2\*32-1 条数据,对于大部分应用是完全够用的。
|
|
|
|
|
|
|
|
|
|
简单来说就是:
|
|
|
|
|
|
|
|
|
|
- 每个 topic 对应一条队列
|
|
|
|
|
- 从队列一段写入数据,从另一端读取数据
|
|
|
|
|
- 消费失败,重新将消息放入队列
|
|
|
|
|
|
|
|
|
|
**注意:代码仅供个人尝鲜使用,请勿用于真实生产环境**
|
|
|
|
|
|
|
|
|
|
代码仅可在 springboot 环境中使用
|
|
|
|
|
|
|
|
|
|
### 首先定义注解和接口类
|
|
|
|
|
|
|
|
|
|
注解代码如下:
|
|
|
|
|
|
|
|
|
|
```java
|
|
|
|
|
@Target(ElementType.TYPE)
|
|
|
|
|
@Retention(RetentionPolicy.RUNTIME)
|
|
|
|
|
@Component
|
|
|
|
|
public @interface MqConsumer {
|
|
|
|
|
/**
|
|
|
|
|
* 队列主题
|
|
|
|
|
*/
|
|
|
|
|
String topic() default "default_es_topic";
|
|
|
|
|
}
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
被该注解修饰的类,将会接收 topic 下的消息。
|
|
|
|
|
|
|
|
|
|
接口代码如下:
|
|
|
|
|
|
|
|
|
|
```java
|
|
|
|
|
public interface RedisConsumer {
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 功能描述: 消费方法,消费者类必须继承此方法
|
|
|
|
|
*
|
|
|
|
|
* @param message 数据载体
|
|
|
|
|
* @author 123
|
|
|
|
|
* @date 2020/3/28 22:41
|
|
|
|
|
*/
|
|
|
|
|
void deal(String message);
|
|
|
|
|
}
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
本接口用于定于接受消息的处理方法。
|
|
|
|
|
|
|
|
|
|
### 扫描注解修饰类
|
|
|
|
|
|
|
|
|
|
本部分为核心代码,首先需要获取代码中被注解修饰的类,然后建立一个循环从 redis 队列中取数据,最后调用类对象的 deal 方法消费消息,如果 deal 方法抛出错误,认为消费失败,重新将该数据放入队列中。
|
|
|
|
|
|
|
|
|
|
1. 扫描部分代码如下:
|
|
|
|
|
|
|
|
|
|
```java
|
|
|
|
|
/**
|
|
|
|
|
* MqConfiguration.java
|
|
|
|
|
*/
|
|
|
|
|
@Override
|
|
|
|
|
public void run(ApplicationArguments args) {
|
|
|
|
|
Map<String, Object> map = context.getBeansWithAnnotation(MqConsumer.class);
|
|
|
|
|
map.values().forEach(item -> {
|
|
|
|
|
if (!(item instanceof RedisConsumer)) {
|
|
|
|
|
log.warn("注意检测到被@EsConsumer注解的类{}未实现RedisConsumer接口", item.getClass().getCanonicalName());
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
MqConsumer[] annotations = item.getClass().getAnnotationsByType(MqConsumer.class);
|
|
|
|
|
MqConsumer annotation = annotations[0];
|
|
|
|
|
String topic = annotation.topic();
|
|
|
|
|
if (topicMap.containsKey(topic)) {
|
|
|
|
|
log.error("多个消费者{},消费同一个消息:{},已忽略", item.getClass().getCanonicalName(), topic);
|
|
|
|
|
} else {
|
|
|
|
|
topicMap.put(topic, (RedisConsumer) item);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
});
|
|
|
|
|
log.info("redis订阅信息汇总完毕!!!!!!");
|
|
|
|
|
//由一个线程始终循环获取es队列数据
|
|
|
|
|
threadPoolExecutor.execute(loop());
|
|
|
|
|
}
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
run 方法在 spring 扫描完毕后调用,通过实现`ApplicationRunner`接口实现,通过 spring 的方法来获取所有被`MqConsumer`接口注解的类(否则需要自己写类加载器)。数据汇总完毕后使用一个线程来进行无线循环从 redis 队列中取数据。
|
|
|
|
|
|
|
|
|
|
2. 执行线程部分代码如下:
|
|
|
|
|
|
|
|
|
|
```java
|
|
|
|
|
private Runnable loop() {
|
|
|
|
|
return () -> {
|
|
|
|
|
while (true) {
|
|
|
|
|
AtomicInteger count = new AtomicInteger(0);
|
|
|
|
|
topicMap.forEach((k, v) -> {
|
|
|
|
|
try {
|
|
|
|
|
String message = mqUtil.getRedisTemplate().opsForList().rightPop(k);
|
|
|
|
|
if (message == null) {
|
|
|
|
|
count.getAndIncrement();
|
|
|
|
|
} else {
|
|
|
|
|
pushTask(v, message, k);
|
|
|
|
|
}
|
|
|
|
|
} catch (RedisConnectionFailureException connException) {
|
|
|
|
|
log.error("redis无法连接,10s后重试", connException);
|
|
|
|
|
sleep(10);
|
|
|
|
|
} catch (Exception e) {
|
|
|
|
|
log.error("redis消息队列异常", e);
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
if (count.get() == topicMap.keySet().size()) {
|
|
|
|
|
//当所有的队列都为空时休眠1s
|
|
|
|
|
sleep(1);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
}
|
|
|
|
|
private void pushTask(RedisConsumer item, String value, String key) {
|
|
|
|
|
threadPoolExecutor.execute(() -> {
|
|
|
|
|
try {
|
|
|
|
|
item.deal(value);
|
|
|
|
|
} catch (Exception e) {
|
|
|
|
|
log.error("执行消费任务出错", e);
|
|
|
|
|
//非广播消息进行数据回补
|
|
|
|
|
mqUtil.getRedisTemplate().opsForList().rightPush(key, value);
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
loop 方法无限循环根据 topic 从 redis 中取数据,如果取到数据,调用 pushTask 方法执行,如果执行报错将会进行数据回补。
|
|
|
|
|
|
|
|
|
|
**完整代码见本文结尾**
|
|
|
|
|
|
|
|
|
|
### 测试
|
|
|
|
|
|
|
|
|
|
运行项目后调用,`MainController`中的接口即可测试。
|
|
|
|
|
|
2020-06-09 10:05:25 +08:00
|
|
|
|
完整代码:[github](https://github.com/FleyX/demo-project/tree/master/4.redis-mq)
|
|
|
|
|
|
2020-06-08 17:16:06 +08:00
|
|
|
|
本文原创发布于:[手把手教你用 redis 实现一个简单的 mq 消息队列](http://www.tapme.top/blog/detail/20200605)
|