From 160fa38c52267a4e80b2a55f7a2d4849d3d550ab Mon Sep 17 00:00:00 2001 From: fanxb Date: Sun, 29 Mar 2020 00:24:46 +0800 Subject: [PATCH] =?UTF-8?q?feat:=E4=BD=BF=E7=94=A8redis=E6=A8=A1=E6=8B=9F?= =?UTF-8?q?=E6=B6=88=E6=81=AF=E9=98=9F=E5=88=97=EF=BC=8C=E6=94=AF=E6=8C=81?= =?UTF-8?q?=E5=8D=95/=E5=A4=9A=E5=AE=9E=E4=BE=8B=E7=9A=84=E7=94=9F?= =?UTF-8?q?=E4=BA=A7=E6=B6=88=E8=B4=B9=E6=A8=A1=E5=BC=8F=E5=92=8C=E5=8D=95?= =?UTF-8?q?=E5=AE=9E=E4=BE=8B=E7=9A=84=E5=B9=BF=E6=92=AD=E6=A8=A1=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../common/annotation/MqConsumer.java | 29 ++++ .../common/configuration/MqConfiguration.java | 127 ++++++++++++++++++ .../common/entity/redis/RedisConsumer.java | 19 +++ .../fanxb/bookmark/common/util/RedisUtil.java | 21 +++ 4 files changed, 196 insertions(+) create mode 100644 bookMarkService/common/src/main/java/com/fanxb/bookmark/common/annotation/MqConsumer.java create mode 100644 bookMarkService/common/src/main/java/com/fanxb/bookmark/common/configuration/MqConfiguration.java create mode 100644 bookMarkService/common/src/main/java/com/fanxb/bookmark/common/entity/redis/RedisConsumer.java diff --git a/bookMarkService/common/src/main/java/com/fanxb/bookmark/common/annotation/MqConsumer.java b/bookMarkService/common/src/main/java/com/fanxb/bookmark/common/annotation/MqConsumer.java new file mode 100644 index 0000000..f00d3be --- /dev/null +++ b/bookMarkService/common/src/main/java/com/fanxb/bookmark/common/annotation/MqConsumer.java @@ -0,0 +1,29 @@ +package com.fanxb.bookmark.common.annotation; + +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +import javax.validation.constraints.Null; +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * 自定义消费者注解 + * Created with IntelliJ IDEA + * Created By Fxb + * Date: 2020/3/26 + * Time: 15:26 + */ +@Target(ElementType.TYPE) +@Retention(RetentionPolicy.RUNTIME) +@Component +public @interface MqConsumer { + + /** + * 队列主题 + */ + String value() default "default_es_topic"; + +} diff --git a/bookMarkService/common/src/main/java/com/fanxb/bookmark/common/configuration/MqConfiguration.java b/bookMarkService/common/src/main/java/com/fanxb/bookmark/common/configuration/MqConfiguration.java new file mode 100644 index 0000000..5b04061 --- /dev/null +++ b/bookMarkService/common/src/main/java/com/fanxb/bookmark/common/configuration/MqConfiguration.java @@ -0,0 +1,127 @@ +package com.fanxb.bookmark.common.configuration; + +import cn.hutool.core.util.StrUtil; +import com.fanxb.bookmark.common.annotation.MqConsumer; +import com.fanxb.bookmark.common.entity.redis.RedisConsumer; +import com.fanxb.bookmark.common.exception.CustomException; +import com.fanxb.bookmark.common.factory.ThreadPoolFactory; +import com.fanxb.bookmark.common.util.RedisUtil; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.ApplicationArguments; +import org.springframework.boot.ApplicationRunner; +import org.springframework.context.ApplicationContext; +import org.springframework.stereotype.Component; + +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Created with IntelliJ IDEA + * Created By Fxb + * Date: 2020/3/24 + * Time: 15:37 + */ +@Component +@Slf4j +public class MqConfiguration implements ApplicationRunner { + + /** + * 订阅对象与执行方法关系(支持广播模式) + */ + private static final Map> topicMap = new HashMap<>(); + /** + * 执行线程池 + */ + private static final ThreadPoolExecutor threadPoolExecutor = ThreadPoolFactory.createPool(2, 8, 5000, 1000, "mqConsumer"); + + @Autowired + ApplicationContext context; + + @Override + public void run(ApplicationArguments args) { + Map map = context.getBeansWithAnnotation(MqConsumer.class); + map.values().forEach(item -> { + if (!(item instanceof RedisConsumer)) { + log.warn("注意检测到被@EsConsumer注解的类{}未实现RedisConsumer接口", item.getClass().getCanonicalName()); + return; + } + MqConsumer[] annotations = item.getClass().getAnnotationsByType(MqConsumer.class); + MqConsumer annotation = annotations[0]; + topicMap.computeIfAbsent(annotation.value(), k -> new ArrayList<>()).add((RedisConsumer) item); + }); + log.info("es订阅信息汇总完毕!!!!!!"); + //由一个线程始终循环获取es队列数据 + threadPoolExecutor.execute(loop()); + } + + private Runnable loop() { + return () -> { + while (true) { + AtomicInteger count = new AtomicInteger(0); + topicMap.forEach((k, v) -> { + try { + String message = RedisUtil.redisTemplate.opsForList().rightPop(k); + if (message == null) { + count.getAndIncrement(); + } else { + pushTask(v, message, k); + } + } catch (Exception e) { + log.error("redis消息队列异常", e); + } + }); + if (count.get() == topicMap.keySet().size()) { + //当所有的队列都为空时休眠1s + try { + TimeUnit.SECONDS.sleep(1); + } catch (Exception e) { + log.error("休眠出错", e); + } + } + } + }; + } + + /** + * 功能描述: 推送任务到线程池中执行 + * + * @param list list + * @param value value + * @param key key + * @author 123 + * @date 2020/3/28 23:52 + */ + private void pushTask(List list, String value, String key) { + for (RedisConsumer consumer : list) { + threadPoolExecutor.execute(() -> { + try { + consumer.deal(value); + } catch (Exception e) { + log.error("执行消费任务出错", e); + if (list.size() == 1) { + //非广播消息进行数据回补 + RedisUtil.redisTemplate.opsForList().rightPush(key, value); + } + } + }); + } + } + + +} + +@MqConsumer("test1212") +class Test implements RedisConsumer { + @Override + public void deal(String message) { + System.out.println(message); + } +} diff --git a/bookMarkService/common/src/main/java/com/fanxb/bookmark/common/entity/redis/RedisConsumer.java b/bookMarkService/common/src/main/java/com/fanxb/bookmark/common/entity/redis/RedisConsumer.java new file mode 100644 index 0000000..a64a2a4 --- /dev/null +++ b/bookMarkService/common/src/main/java/com/fanxb/bookmark/common/entity/redis/RedisConsumer.java @@ -0,0 +1,19 @@ +package com.fanxb.bookmark.common.entity.redis; + +/** + * redis消费者类 + * Created By Fxb + * Date: 2020/3/28 + * Time: 22:41 + */ +public interface RedisConsumer { + + /** + * 功能描述: 消费方法,消费者类必须继承此方法 + * + * @param message 数据载体 + * @author 123 + * @date 2020/3/28 22:41 + */ + void deal(String message); +} diff --git a/bookMarkService/common/src/main/java/com/fanxb/bookmark/common/util/RedisUtil.java b/bookMarkService/common/src/main/java/com/fanxb/bookmark/common/util/RedisUtil.java index e769c2d..73aa235 100644 --- a/bookMarkService/common/src/main/java/com/fanxb/bookmark/common/util/RedisUtil.java +++ b/bookMarkService/common/src/main/java/com/fanxb/bookmark/common/util/RedisUtil.java @@ -1,6 +1,9 @@ package com.fanxb.bookmark.common.util; import com.alibaba.fastjson.JSON; +import com.fanxb.bookmark.common.constant.RedisConstant; +import com.fanxb.bookmark.common.entity.User; +import com.fanxb.bookmark.common.entity.redis.UserBookmarkUpdate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Component; @@ -79,4 +82,22 @@ public class RedisUtil { } } } + + /** + * 功能描述:推一条数据到mq队列中 + * + * @param topic 队列名 + * @param obj 数据 + * @author 123 + * @date 2020/3/24 14:32 + */ + public static void addToMq(String topic, Object obj) { + String data; + if (obj instanceof String) { + data = (String) obj; + } else { + data = JSON.toJSONString(obj); + } + redisTemplate.opsForList().leftPush(topic, data); + } }