diff --git a/4.redis-mq/pom.xml b/4.redis-mq/pom.xml new file mode 100644 index 0000000..72b8fe9 --- /dev/null +++ b/4.redis-mq/pom.xml @@ -0,0 +1,41 @@ + + + 4.0.0 + + com.fanxb + redis-mq + 1.0-SNAPSHOT + + + utf-8 + utf-8 + 11 + + + + org.springframework.boot + spring-boot-starter-parent + 2.1.6.RELEASE + + + + + + org.springframework.boot + spring-boot-starter-web + + + org.springframework.boot + spring-boot-starter-data-redis + + + com.alibaba + fastjson + 1.2.56 + + + + + \ No newline at end of file diff --git a/4.redis-mq/src/main/java/com/fanxb/redismq/Application.java b/4.redis-mq/src/main/java/com/fanxb/redismq/Application.java new file mode 100644 index 0000000..71a83d2 --- /dev/null +++ b/4.redis-mq/src/main/java/com/fanxb/redismq/Application.java @@ -0,0 +1,21 @@ +package com.fanxb.redismq; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.scheduling.annotation.EnableScheduling; +import org.springframework.transaction.annotation.EnableTransactionManagement; + +/** + * Created with IntelliJ IDEA + * Created By Fxb + * Date: 4/12/20 + * Time: 11:27 PM + */ +@SpringBootApplication +@EnableTransactionManagement +@EnableScheduling +public class Application { + public static void main(String[] args) { + SpringApplication.run(Application.class, args); + } +} diff --git a/4.redis-mq/src/main/java/com/fanxb/redismq/annotation/MqConsumer.java b/4.redis-mq/src/main/java/com/fanxb/redismq/annotation/MqConsumer.java new file mode 100644 index 0000000..5a942f8 --- /dev/null +++ b/4.redis-mq/src/main/java/com/fanxb/redismq/annotation/MqConsumer.java @@ -0,0 +1,27 @@ +package com.fanxb.redismq.annotation; + +import org.springframework.stereotype.Component; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * 自定义消费者注解 + * Created with IntelliJ IDEA + * Created By Fxb + * Date: 2020/3/26 + * Time: 15:26 + */ +@Target(ElementType.TYPE) +@Retention(RetentionPolicy.RUNTIME) +@Component +public @interface MqConsumer { + + /** + * 队列主题 + */ + String topic() default "default_es_topic"; + +} diff --git a/4.redis-mq/src/main/java/com/fanxb/redismq/configuration/MqConfiguration.java b/4.redis-mq/src/main/java/com/fanxb/redismq/configuration/MqConfiguration.java new file mode 100644 index 0000000..8c6593c --- /dev/null +++ b/4.redis-mq/src/main/java/com/fanxb/redismq/configuration/MqConfiguration.java @@ -0,0 +1,126 @@ +package com.fanxb.redismq.configuration; + +import com.fanxb.redismq.annotation.MqConsumer; +import com.fanxb.redismq.entity.RedisConsumer; +import com.fanxb.redismq.util.RedisMqUtil; +import io.lettuce.core.RedisConnectionException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.ApplicationArguments; +import org.springframework.boot.ApplicationRunner; +import org.springframework.context.ApplicationContext; +import org.springframework.data.redis.RedisConnectionFailureException; +import org.springframework.stereotype.Component; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Created with IntelliJ IDEA + * Created By Fxb + * Date: 2020/3/24 + * Time: 15:37 + */ +@Component +public class MqConfiguration implements ApplicationRunner { + Logger log = LoggerFactory.getLogger(MqConfiguration.class); + @Autowired + private RedisMqUtil mqUtil; + + /** + * 订阅对象与执行方法关系(支持广播模式) + */ + private static final Map> topicMap = new HashMap<>(); + /** + * 执行线程池 + */ + private static final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 10, 10000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1000)); + + @Autowired + ApplicationContext context; + + @Override + public void run(ApplicationArguments args) { + Map map = context.getBeansWithAnnotation(MqConsumer.class); + map.values().forEach(item -> { + if (!(item instanceof RedisConsumer)) { + log.warn("注意检测到被@EsConsumer注解的类{}未实现RedisConsumer接口", item.getClass().getCanonicalName()); + return; + } + MqConsumer[] annotations = item.getClass().getAnnotationsByType(MqConsumer.class); + MqConsumer annotation = annotations[0]; + topicMap.computeIfAbsent(annotation.topic(), k -> new ArrayList<>()).add((RedisConsumer) item); + }); + log.info("redis订阅信息汇总完毕!!!!!!"); + //由一个线程始终循环获取es队列数据 + threadPoolExecutor.execute(loop()); + } + + private Runnable loop() { + return () -> { + while (true) { + AtomicInteger count = new AtomicInteger(0); + topicMap.forEach((k, v) -> { + try { + String message = mqUtil.getRedisTemplate().opsForList().rightPop(k); + if (message == null) { + count.getAndIncrement(); + } else { + pushTask(v, message, k); + } + } catch (RedisConnectionFailureException connException) { + log.error("redis无法连接", connException); + sleep(5); + } catch (Exception e) { + log.error("redis消息队列异常", e); + } + }); + if (count.get() == topicMap.keySet().size()) { + //当所有的队列都为空时休眠1s + sleep(1); + } + } + }; + } + + /** + * 功能描述: 推送任务到线程池中执行 + * + * @param list list + * @param value value + * @param key key + * @author 123 + * @date 2020/3/28 23:52 + */ + private void pushTask(List list, String value, String key) { + for (RedisConsumer consumer : list) { + threadPoolExecutor.execute(() -> { + try { + consumer.deal(value); + } catch (Exception e) { + log.error("执行消费任务出错", e); + if (list.size() == 1) { + //非广播消息进行数据回补 + mqUtil.getRedisTemplate().opsForList().rightPush(key, value); + } + } + }); + } + } + + private void sleep(int s) { + try { + TimeUnit.SECONDS.sleep(s); + } catch (Exception e) { + log.error("休眠失败", e); + } + } +} + diff --git a/4.redis-mq/src/main/java/com/fanxb/redismq/consumer/Topic1Receiver.java b/4.redis-mq/src/main/java/com/fanxb/redismq/consumer/Topic1Receiver.java new file mode 100644 index 0000000..88dba7a --- /dev/null +++ b/4.redis-mq/src/main/java/com/fanxb/redismq/consumer/Topic1Receiver.java @@ -0,0 +1,22 @@ +package com.fanxb.redismq.consumer; + +import com.fanxb.redismq.annotation.MqConsumer; +import com.fanxb.redismq.entity.RedisConsumer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Created with IntelliJ IDEA + * Created By Fxb + * Date: 4/15/20 + * Time: 4:10 AM + */ +@MqConsumer(topic = "topic1") +class Topic1Receiver implements RedisConsumer { + private static final Logger log = LoggerFactory.getLogger(Topic1Receiver.class); + + @Override + public void deal(String message) { + log.info("topic1收到信息:" + message); + } +} diff --git a/4.redis-mq/src/main/java/com/fanxb/redismq/consumer/Topic1Receiver2.java b/4.redis-mq/src/main/java/com/fanxb/redismq/consumer/Topic1Receiver2.java new file mode 100644 index 0000000..1a29557 --- /dev/null +++ b/4.redis-mq/src/main/java/com/fanxb/redismq/consumer/Topic1Receiver2.java @@ -0,0 +1,22 @@ +package com.fanxb.redismq.consumer; + +import com.fanxb.redismq.annotation.MqConsumer; +import com.fanxb.redismq.entity.RedisConsumer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Created with IntelliJ IDEA + * Created By Fxb + * Date: 4/15/20 + * Time: 4:10 AM + */ +@MqConsumer(topic = "topic1") +class Topic1Receiver2 implements RedisConsumer { + private static final Logger log = LoggerFactory.getLogger(Topic1Receiver2.class); + + @Override + public void deal(String message) { + log.info("topic1收到信息:" + message); + } +} diff --git a/4.redis-mq/src/main/java/com/fanxb/redismq/consumer/Topic2Receiver.java b/4.redis-mq/src/main/java/com/fanxb/redismq/consumer/Topic2Receiver.java new file mode 100644 index 0000000..9a7ac0f --- /dev/null +++ b/4.redis-mq/src/main/java/com/fanxb/redismq/consumer/Topic2Receiver.java @@ -0,0 +1,22 @@ +package com.fanxb.redismq.consumer; + +import com.fanxb.redismq.annotation.MqConsumer; +import com.fanxb.redismq.entity.RedisConsumer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Created with IntelliJ IDEA + * Created By Fxb + * Date: 4/15/20 + * Time: 4:10 AM + */ +@MqConsumer(topic = "topic2") +class Topic2Receiver implements RedisConsumer { + private static final Logger log = LoggerFactory.getLogger(Topic2Receiver.class); + + @Override + public void deal(String message) { + log.info("topic2收到信息:" + message); + } +} diff --git a/4.redis-mq/src/main/java/com/fanxb/redismq/controller/MainController.java b/4.redis-mq/src/main/java/com/fanxb/redismq/controller/MainController.java new file mode 100644 index 0000000..4a7e82e --- /dev/null +++ b/4.redis-mq/src/main/java/com/fanxb/redismq/controller/MainController.java @@ -0,0 +1,35 @@ +package com.fanxb.redismq.controller; + +import com.fanxb.redismq.util.RedisMqUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +/** + * Created with IntelliJ IDEA + * Created By Fxb + * Date: 4/14/20 + * Time: 6:08 AM + */ +@RestController +public class MainController { + private static final Logger log = LoggerFactory.getLogger(MainController.class); + + @Autowired + private RedisMqUtil mqUtil; + + @RequestMapping("/topic1") + public void addOne(String message) { + mqUtil.addToMq("topic1", message); + } + + @RequestMapping("/topic2") + public void addOne1(String message) { + mqUtil.addToMq("topic2", message); + } + + +} + diff --git a/4.redis-mq/src/main/java/com/fanxb/redismq/entity/RedisConsumer.java b/4.redis-mq/src/main/java/com/fanxb/redismq/entity/RedisConsumer.java new file mode 100644 index 0000000..43fcd47 --- /dev/null +++ b/4.redis-mq/src/main/java/com/fanxb/redismq/entity/RedisConsumer.java @@ -0,0 +1,19 @@ +package com.fanxb.redismq.entity; + +/** + * redis消费者类 + * Created By Fxb + * Date: 2020/3/28 + * Time: 22:41 + */ +public interface RedisConsumer { + + /** + * 功能描述: 消费方法,消费者类必须继承此方法 + * + * @param message 数据载体 + * @author 123 + * @date 2020/3/28 22:41 + */ + void deal(String message); +} diff --git a/4.redis-mq/src/main/java/com/fanxb/redismq/util/RedisMqUtil.java b/4.redis-mq/src/main/java/com/fanxb/redismq/util/RedisMqUtil.java new file mode 100644 index 0000000..0f49cd9 --- /dev/null +++ b/4.redis-mq/src/main/java/com/fanxb/redismq/util/RedisMqUtil.java @@ -0,0 +1,36 @@ +package com.fanxb.redismq.util; + +import com.alibaba.fastjson.JSON; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.stereotype.Component; + +/** + * redis-mq 工具类 + * Created with IntelliJ IDEA + * Created By Fxb + * Date: 4/12/20 + * Time: 11:36 PM + */ +@Component +public class RedisMqUtil { + @Autowired + private StringRedisTemplate redisTemplate; + + public StringRedisTemplate getRedisTemplate() { + return redisTemplate; + } + + /** + * 功能描述: + * + * @param topic 队列名 + * @param obj 数据载体 + * @author fanxb + * @date 4/13/20 1:40 AM + */ + public void addToMq(String topic, Object obj) { + String str = obj instanceof String ? (String) obj : JSON.toJSONString(obj); + redisTemplate.opsForList().leftPush(topic, str); + } +} diff --git a/4.redis-mq/src/main/resources/application.yml b/4.redis-mq/src/main/resources/application.yml new file mode 100644 index 0000000..ce2e13f --- /dev/null +++ b/4.redis-mq/src/main/resources/application.yml @@ -0,0 +1,4 @@ +spring: + redis: + host: localhost + port: 6380 \ No newline at end of file