修改redis实现消息队列代码

This commit is contained in:
fanxb 2020-06-08 17:17:54 +08:00
parent fc47e24267
commit 83b5e1b5e6
3 changed files with 25 additions and 21 deletions

View File

@ -13,6 +13,7 @@ import java.lang.annotation.Target;
* Created By Fxb * Created By Fxb
* Date: 2020/3/26 * Date: 2020/3/26
* Time: 15:26 * Time: 15:26
* @author fanxb
*/ */
@Target(ElementType.TYPE) @Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME) @Retention(RetentionPolicy.RUNTIME)
@ -23,5 +24,4 @@ public @interface MqConsumer {
* 队列主题 * 队列主题
*/ */
String topic() default "default_es_topic"; String topic() default "default_es_topic";
} }

View File

@ -27,6 +27,8 @@ import java.util.concurrent.atomic.AtomicInteger;
* Created By Fxb * Created By Fxb
* Date: 2020/3/24 * Date: 2020/3/24
* Time: 15:37 * Time: 15:37
*
* @author fanxb
*/ */
@Component @Component
public class MqConfiguration implements ApplicationRunner { public class MqConfiguration implements ApplicationRunner {
@ -35,9 +37,9 @@ public class MqConfiguration implements ApplicationRunner {
private RedisMqUtil mqUtil; private RedisMqUtil mqUtil;
/** /**
* 订阅对象与执行方法关系支持广播模式 * 订阅对象与执行方法关系
*/ */
private static final Map<String, List<RedisConsumer>> topicMap = new HashMap<>(); private static final Map<String, RedisConsumer> topicMap = new HashMap<>();
/** /**
* 执行线程池 * 执行线程池
*/ */
@ -56,7 +58,13 @@ public class MqConfiguration implements ApplicationRunner {
} }
MqConsumer[] annotations = item.getClass().getAnnotationsByType(MqConsumer.class); MqConsumer[] annotations = item.getClass().getAnnotationsByType(MqConsumer.class);
MqConsumer annotation = annotations[0]; MqConsumer annotation = annotations[0];
topicMap.computeIfAbsent(annotation.topic(), k -> new ArrayList<>()).add((RedisConsumer) item); String topic = annotation.topic();
if (topicMap.containsKey(topic)) {
log.error("多个消费者{},消费同一个消息:{},已忽略", item.getClass().getCanonicalName(), topic);
} else {
topicMap.put(topic, (RedisConsumer) item);
}
}); });
log.info("redis订阅信息汇总完毕"); log.info("redis订阅信息汇总完毕");
//由一个线程始终循环获取es队列数据 //由一个线程始终循环获取es队列数据
@ -76,8 +84,8 @@ public class MqConfiguration implements ApplicationRunner {
pushTask(v, message, k); pushTask(v, message, k);
} }
} catch (RedisConnectionFailureException connException) { } catch (RedisConnectionFailureException connException) {
log.error("redis无法连接", connException); log.error("redis无法连接,10s后重试", connException);
sleep(5); sleep(10);
} catch (Exception e) { } catch (Exception e) {
log.error("redis消息队列异常", e); log.error("redis消息队列异常", e);
} }
@ -99,20 +107,16 @@ public class MqConfiguration implements ApplicationRunner {
* @author 123 * @author 123
* @date 2020/3/28 23:52 * @date 2020/3/28 23:52
*/ */
private void pushTask(List<RedisConsumer> list, String value, String key) { private void pushTask(RedisConsumer item, String value, String key) {
for (RedisConsumer consumer : list) { threadPoolExecutor.execute(() -> {
threadPoolExecutor.execute(() -> { try {
try { item.deal(value);
consumer.deal(value); } catch (Exception e) {
} catch (Exception e) { log.error("执行消费任务出错", e);
log.error("执行消费任务出错", e); //非广播消息进行数据回补
if (list.size() == 1) { mqUtil.getRedisTemplate().opsForList().rightPush(key, value);
//非广播消息进行数据回补 }
mqUtil.getRedisTemplate().opsForList().rightPush(key, value); });
}
}
});
}
} }
private void sleep(int s) { private void sleep(int s) {

View File

@ -1,4 +1,4 @@
spring: spring:
redis: redis:
host: localhost host: localhost
port: 6380 port: 6379