feat:使用redis模拟消息队列,支持单/多实例的生产消费模式和单实例的广播模式

This commit is contained in:
fanxb 2020-03-29 00:24:46 +08:00
parent 0b062c0e1a
commit 160fa38c52
4 changed files with 196 additions and 0 deletions

View File

@ -0,0 +1,29 @@
package com.fanxb.bookmark.common.annotation;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.validation.constraints.Null;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* 自定义消费者注解
* Created with IntelliJ IDEA
* Created By Fxb
* Date: 2020/3/26
* Time: 15:26
*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface MqConsumer {
/**
* 队列主题
*/
String value() default "default_es_topic";
}

View File

@ -0,0 +1,127 @@
package com.fanxb.bookmark.common.configuration;
import cn.hutool.core.util.StrUtil;
import com.fanxb.bookmark.common.annotation.MqConsumer;
import com.fanxb.bookmark.common.entity.redis.RedisConsumer;
import com.fanxb.bookmark.common.exception.CustomException;
import com.fanxb.bookmark.common.factory.ThreadPoolFactory;
import com.fanxb.bookmark.common.util.RedisUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Created with IntelliJ IDEA
* Created By Fxb
* Date: 2020/3/24
* Time: 15:37
*/
@Component
@Slf4j
public class MqConfiguration implements ApplicationRunner {
/**
* 订阅对象与执行方法关系支持广播模式
*/
private static final Map<String, List<RedisConsumer>> topicMap = new HashMap<>();
/**
* 执行线程池
*/
private static final ThreadPoolExecutor threadPoolExecutor = ThreadPoolFactory.createPool(2, 8, 5000, 1000, "mqConsumer");
@Autowired
ApplicationContext context;
@Override
public void run(ApplicationArguments args) {
Map<String, Object> map = context.getBeansWithAnnotation(MqConsumer.class);
map.values().forEach(item -> {
if (!(item instanceof RedisConsumer)) {
log.warn("注意检测到被@EsConsumer注解的类{}未实现RedisConsumer接口", item.getClass().getCanonicalName());
return;
}
MqConsumer[] annotations = item.getClass().getAnnotationsByType(MqConsumer.class);
MqConsumer annotation = annotations[0];
topicMap.computeIfAbsent(annotation.value(), k -> new ArrayList<>()).add((RedisConsumer) item);
});
log.info("es订阅信息汇总完毕");
//由一个线程始终循环获取es队列数据
threadPoolExecutor.execute(loop());
}
private Runnable loop() {
return () -> {
while (true) {
AtomicInteger count = new AtomicInteger(0);
topicMap.forEach((k, v) -> {
try {
String message = RedisUtil.redisTemplate.opsForList().rightPop(k);
if (message == null) {
count.getAndIncrement();
} else {
pushTask(v, message, k);
}
} catch (Exception e) {
log.error("redis消息队列异常", e);
}
});
if (count.get() == topicMap.keySet().size()) {
//当所有的队列都为空时休眠1s
try {
TimeUnit.SECONDS.sleep(1);
} catch (Exception e) {
log.error("休眠出错", e);
}
}
}
};
}
/**
* 功能描述: 推送任务到线程池中执行
*
* @param list list
* @param value value
* @param key key
* @author 123
* @date 2020/3/28 23:52
*/
private void pushTask(List<RedisConsumer> list, String value, String key) {
for (RedisConsumer consumer : list) {
threadPoolExecutor.execute(() -> {
try {
consumer.deal(value);
} catch (Exception e) {
log.error("执行消费任务出错", e);
if (list.size() == 1) {
//非广播消息进行数据回补
RedisUtil.redisTemplate.opsForList().rightPush(key, value);
}
}
});
}
}
}
@MqConsumer("test1212")
class Test implements RedisConsumer {
@Override
public void deal(String message) {
System.out.println(message);
}
}

View File

@ -0,0 +1,19 @@
package com.fanxb.bookmark.common.entity.redis;
/**
* redis消费者类
* Created By Fxb
* Date: 2020/3/28
* Time: 22:41
*/
public interface RedisConsumer {
/**
* 功能描述: 消费方法消费者类必须继承此方法
*
* @param message 数据载体
* @author 123
* @date 2020/3/28 22:41
*/
void deal(String message);
}

View File

@ -1,6 +1,9 @@
package com.fanxb.bookmark.common.util;
import com.alibaba.fastjson.JSON;
import com.fanxb.bookmark.common.constant.RedisConstant;
import com.fanxb.bookmark.common.entity.User;
import com.fanxb.bookmark.common.entity.redis.UserBookmarkUpdate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
@ -79,4 +82,22 @@ public class RedisUtil {
}
}
}
/**
* 功能描述:推一条数据到mq队列中
*
* @param topic 队列名
* @param obj 数据
* @author 123
* @date 2020/3/24 14:32
*/
public static void addToMq(String topic, Object obj) {
String data;
if (obj instanceof String) {
data = (String) obj;
} else {
data = JSON.toJSONString(obj);
}
redisTemplate.opsForList().leftPush(topic, data);
}
}