diff --git a/4.redis-mq/src/main/java/com/fanxb/redismq/annotation/MqConsumer.java b/4.redis-mq/src/main/java/com/fanxb/redismq/annotation/MqConsumer.java index 5a942f8..e6f5a65 100644 --- a/4.redis-mq/src/main/java/com/fanxb/redismq/annotation/MqConsumer.java +++ b/4.redis-mq/src/main/java/com/fanxb/redismq/annotation/MqConsumer.java @@ -13,6 +13,7 @@ import java.lang.annotation.Target; * Created By Fxb * Date: 2020/3/26 * Time: 15:26 + * @author fanxb */ @Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @@ -23,5 +24,4 @@ public @interface MqConsumer { * 队列主题 */ String topic() default "default_es_topic"; - } diff --git a/4.redis-mq/src/main/java/com/fanxb/redismq/configuration/MqConfiguration.java b/4.redis-mq/src/main/java/com/fanxb/redismq/configuration/MqConfiguration.java index 8c6593c..34cfed0 100644 --- a/4.redis-mq/src/main/java/com/fanxb/redismq/configuration/MqConfiguration.java +++ b/4.redis-mq/src/main/java/com/fanxb/redismq/configuration/MqConfiguration.java @@ -27,6 +27,8 @@ import java.util.concurrent.atomic.AtomicInteger; * Created By Fxb * Date: 2020/3/24 * Time: 15:37 + * + * @author fanxb */ @Component public class MqConfiguration implements ApplicationRunner { @@ -35,9 +37,9 @@ public class MqConfiguration implements ApplicationRunner { private RedisMqUtil mqUtil; /** - * 订阅对象与执行方法关系(支持广播模式) + * 订阅对象与执行方法关系 */ - private static final Map> topicMap = new HashMap<>(); + private static final Map topicMap = new HashMap<>(); /** * 执行线程池 */ @@ -56,7 +58,13 @@ public class MqConfiguration implements ApplicationRunner { } MqConsumer[] annotations = item.getClass().getAnnotationsByType(MqConsumer.class); MqConsumer annotation = annotations[0]; - topicMap.computeIfAbsent(annotation.topic(), k -> new ArrayList<>()).add((RedisConsumer) item); + String topic = annotation.topic(); + if (topicMap.containsKey(topic)) { + log.error("多个消费者{},消费同一个消息:{},已忽略", item.getClass().getCanonicalName(), topic); + } else { + topicMap.put(topic, (RedisConsumer) item); + } + }); log.info("redis订阅信息汇总完毕!!!!!!"); //由一个线程始终循环获取es队列数据 @@ -76,8 +84,8 @@ public class MqConfiguration implements ApplicationRunner { pushTask(v, message, k); } } catch (RedisConnectionFailureException connException) { - log.error("redis无法连接", connException); - sleep(5); + log.error("redis无法连接,10s后重试", connException); + sleep(10); } catch (Exception e) { log.error("redis消息队列异常", e); } @@ -99,20 +107,16 @@ public class MqConfiguration implements ApplicationRunner { * @author 123 * @date 2020/3/28 23:52 */ - private void pushTask(List list, String value, String key) { - for (RedisConsumer consumer : list) { - threadPoolExecutor.execute(() -> { - try { - consumer.deal(value); - } catch (Exception e) { - log.error("执行消费任务出错", e); - if (list.size() == 1) { - //非广播消息进行数据回补 - mqUtil.getRedisTemplate().opsForList().rightPush(key, value); - } - } - }); - } + private void pushTask(RedisConsumer item, String value, String key) { + threadPoolExecutor.execute(() -> { + try { + item.deal(value); + } catch (Exception e) { + log.error("执行消费任务出错", e); + //非广播消息进行数据回补 + mqUtil.getRedisTemplate().opsForList().rightPush(key, value); + } + }); } private void sleep(int s) { diff --git a/4.redis-mq/src/main/resources/application.yml b/4.redis-mq/src/main/resources/application.yml index ce2e13f..351b164 100644 --- a/4.redis-mq/src/main/resources/application.yml +++ b/4.redis-mq/src/main/resources/application.yml @@ -1,4 +1,4 @@ spring: redis: host: localhost - port: 6380 \ No newline at end of file + port: 6379 \ No newline at end of file