From efcc6de2aae8adb8083156e22ed56986d2592442 Mon Sep 17 00:00:00 2001 From: fanxb Date: Mon, 8 Jun 2020 17:16:06 +0800 Subject: [PATCH] =?UTF-8?q?add:=E5=A2=9E=E5=8A=A0redis=E6=9E=84=E5=BB=BA?= =?UTF-8?q?=E6=B6=88=E6=81=AF=E9=98=9F=E5=88=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- java/其他/4.使用redis构建mq.md | 155 ++++++++++++++++++++++++++++++++- 1 file changed, 152 insertions(+), 3 deletions(-) diff --git a/java/其他/4.使用redis构建mq.md b/java/其他/4.使用redis构建mq.md index 5eb9edf..3c6825d 100644 --- a/java/其他/4.使用redis构建mq.md +++ b/java/其他/4.使用redis构建mq.md @@ -1,9 +1,158 @@ --- -id: "20200424" -date: "2020-04-24 15:42:00" -title: "如何让apache集成php详细步骤" +id: "20200605" +date: "2020-06-05 15:42:00" +title: "手把手教你用redis实现一个简单的mq消息队列" tags: ["java", "redis", "mq"] categories: - "java" - "其他" --- + +众所周知,消息队列是应用系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。目前使用较多的消息队列有 ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ. + +但是如果你不想为你的系统引入一个重量级(相对 redis 来说)的 mq,但是想要享受解耦、异步消息等特性,通过本文你就 get 到了,通过 redis 实现一个简单版的 mq。 + +## 为什么是 redis + +- redis 通常作为缓存服务引入,因此大部分系统都会有 redis +- redis 本身的资源消耗是极小的,符合我们的轻量要求 +- redis 速度很快,几乎不会出现速度瓶颈 +- redis 有持久化方案,调整配置项可以在数据安全和速度间进行取舍(参考这篇)[https://segmentfault.com/a/1190000002906345] + +## 如何实现 + +利用 redis 的队列结构来实现消息队列。redis 单个队列最多支持 2\*32-1 条数据,对于大部分应用是完全够用的。 + +简单来说就是: + +- 每个 topic 对应一条队列 +- 从队列一段写入数据,从另一端读取数据 +- 消费失败,重新将消息放入队列 + +**注意:代码仅供个人尝鲜使用,请勿用于真实生产环境** + +代码仅可在 springboot 环境中使用 + +### 首先定义注解和接口类 + +注解代码如下: + +```java +@Target(ElementType.TYPE) +@Retention(RetentionPolicy.RUNTIME) +@Component +public @interface MqConsumer { + /** + * 队列主题 + */ + String topic() default "default_es_topic"; +} +``` + +被该注解修饰的类,将会接收 topic 下的消息。 + +接口代码如下: + +```java +public interface RedisConsumer { + + /** + * 功能描述: 消费方法,消费者类必须继承此方法 + * + * @param message 数据载体 + * @author 123 + * @date 2020/3/28 22:41 + */ + void deal(String message); +} +``` + +本接口用于定于接受消息的处理方法。 + +### 扫描注解修饰类 + +本部分为核心代码,首先需要获取代码中被注解修饰的类,然后建立一个循环从 redis 队列中取数据,最后调用类对象的 deal 方法消费消息,如果 deal 方法抛出错误,认为消费失败,重新将该数据放入队列中。 + +1. 扫描部分代码如下: + +```java +/** + * MqConfiguration.java + */ +@Override +public void run(ApplicationArguments args) { + Map map = context.getBeansWithAnnotation(MqConsumer.class); + map.values().forEach(item -> { + if (!(item instanceof RedisConsumer)) { + log.warn("注意检测到被@EsConsumer注解的类{}未实现RedisConsumer接口", item.getClass().getCanonicalName()); + return; + } + MqConsumer[] annotations = item.getClass().getAnnotationsByType(MqConsumer.class); + MqConsumer annotation = annotations[0]; + String topic = annotation.topic(); + if (topicMap.containsKey(topic)) { + log.error("多个消费者{},消费同一个消息:{},已忽略", item.getClass().getCanonicalName(), topic); + } else { + topicMap.put(topic, (RedisConsumer) item); + } + + }); + log.info("redis订阅信息汇总完毕!!!!!!"); + //由一个线程始终循环获取es队列数据 + threadPoolExecutor.execute(loop()); +} +``` + +run 方法在 spring 扫描完毕后调用,通过实现`ApplicationRunner`接口实现,通过 spring 的方法来获取所有被`MqConsumer`接口注解的类(否则需要自己写类加载器)。数据汇总完毕后使用一个线程来进行无线循环从 redis 队列中取数据。 + +2. 执行线程部分代码如下: + +```java +private Runnable loop() { + return () -> { + while (true) { + AtomicInteger count = new AtomicInteger(0); + topicMap.forEach((k, v) -> { + try { + String message = mqUtil.getRedisTemplate().opsForList().rightPop(k); + if (message == null) { + count.getAndIncrement(); + } else { + pushTask(v, message, k); + } + } catch (RedisConnectionFailureException connException) { + log.error("redis无法连接,10s后重试", connException); + sleep(10); + } catch (Exception e) { + log.error("redis消息队列异常", e); + } + }); + if (count.get() == topicMap.keySet().size()) { + //当所有的队列都为空时休眠1s + sleep(1); + } + } + }; +} +private void pushTask(RedisConsumer item, String value, String key) { + threadPoolExecutor.execute(() -> { + try { + item.deal(value); + } catch (Exception e) { + log.error("执行消费任务出错", e); + //非广播消息进行数据回补 + mqUtil.getRedisTemplate().opsForList().rightPush(key, value); + } + }); +} +``` + +loop 方法无限循环根据 topic 从 redis 中取数据,如果取到数据,调用 pushTask 方法执行,如果执行报错将会进行数据回补。 + +**完整代码见本文结尾** + +### 测试 + +运行项目后调用,`MainController`中的接口即可测试。 + +本文原创发布于:[手把手教你用 redis 实现一个简单的 mq 消息队列](http://www.tapme.top/blog/detail/20200605)