feat:增加redis实现mq demo

This commit is contained in:
fanxb 2020-04-28 11:21:22 +08:00
parent ccf975f02d
commit fc47e24267
11 changed files with 375 additions and 0 deletions

41
4.redis-mq/pom.xml Normal file
View File

@ -0,0 +1,41 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.fanxb</groupId>
<artifactId>redis-mq</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<project.build.sourceEncoding>utf-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>utf-8</project.reporting.outputEncoding>
<java.version>11</java.version>
</properties>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.6.RELEASE</version>
<relativePath/>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.56</version>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,21 @@
package com.fanxb.redismq;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.transaction.annotation.EnableTransactionManagement;
/**
* Created with IntelliJ IDEA
* Created By Fxb
* Date: 4/12/20
* Time: 11:27 PM
*/
@SpringBootApplication
@EnableTransactionManagement
@EnableScheduling
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}

View File

@ -0,0 +1,27 @@
package com.fanxb.redismq.annotation;
import org.springframework.stereotype.Component;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* 自定义消费者注解
* Created with IntelliJ IDEA
* Created By Fxb
* Date: 2020/3/26
* Time: 15:26
*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface MqConsumer {
/**
* 队列主题
*/
String topic() default "default_es_topic";
}

View File

@ -0,0 +1,126 @@
package com.fanxb.redismq.configuration;
import com.fanxb.redismq.annotation.MqConsumer;
import com.fanxb.redismq.entity.RedisConsumer;
import com.fanxb.redismq.util.RedisMqUtil;
import io.lettuce.core.RedisConnectionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.context.ApplicationContext;
import org.springframework.data.redis.RedisConnectionFailureException;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Created with IntelliJ IDEA
* Created By Fxb
* Date: 2020/3/24
* Time: 15:37
*/
@Component
public class MqConfiguration implements ApplicationRunner {
Logger log = LoggerFactory.getLogger(MqConfiguration.class);
@Autowired
private RedisMqUtil mqUtil;
/**
* 订阅对象与执行方法关系支持广播模式
*/
private static final Map<String, List<RedisConsumer>> topicMap = new HashMap<>();
/**
* 执行线程池
*/
private static final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 10, 10000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1000));
@Autowired
ApplicationContext context;
@Override
public void run(ApplicationArguments args) {
Map<String, Object> map = context.getBeansWithAnnotation(MqConsumer.class);
map.values().forEach(item -> {
if (!(item instanceof RedisConsumer)) {
log.warn("注意检测到被@EsConsumer注解的类{}未实现RedisConsumer接口", item.getClass().getCanonicalName());
return;
}
MqConsumer[] annotations = item.getClass().getAnnotationsByType(MqConsumer.class);
MqConsumer annotation = annotations[0];
topicMap.computeIfAbsent(annotation.topic(), k -> new ArrayList<>()).add((RedisConsumer) item);
});
log.info("redis订阅信息汇总完毕");
//由一个线程始终循环获取es队列数据
threadPoolExecutor.execute(loop());
}
private Runnable loop() {
return () -> {
while (true) {
AtomicInteger count = new AtomicInteger(0);
topicMap.forEach((k, v) -> {
try {
String message = mqUtil.getRedisTemplate().opsForList().rightPop(k);
if (message == null) {
count.getAndIncrement();
} else {
pushTask(v, message, k);
}
} catch (RedisConnectionFailureException connException) {
log.error("redis无法连接", connException);
sleep(5);
} catch (Exception e) {
log.error("redis消息队列异常", e);
}
});
if (count.get() == topicMap.keySet().size()) {
//当所有的队列都为空时休眠1s
sleep(1);
}
}
};
}
/**
* 功能描述: 推送任务到线程池中执行
*
* @param list list
* @param value value
* @param key key
* @author 123
* @date 2020/3/28 23:52
*/
private void pushTask(List<RedisConsumer> list, String value, String key) {
for (RedisConsumer consumer : list) {
threadPoolExecutor.execute(() -> {
try {
consumer.deal(value);
} catch (Exception e) {
log.error("执行消费任务出错", e);
if (list.size() == 1) {
//非广播消息进行数据回补
mqUtil.getRedisTemplate().opsForList().rightPush(key, value);
}
}
});
}
}
private void sleep(int s) {
try {
TimeUnit.SECONDS.sleep(s);
} catch (Exception e) {
log.error("休眠失败", e);
}
}
}

View File

@ -0,0 +1,22 @@
package com.fanxb.redismq.consumer;
import com.fanxb.redismq.annotation.MqConsumer;
import com.fanxb.redismq.entity.RedisConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Created with IntelliJ IDEA
* Created By Fxb
* Date: 4/15/20
* Time: 4:10 AM
*/
@MqConsumer(topic = "topic1")
class Topic1Receiver implements RedisConsumer {
private static final Logger log = LoggerFactory.getLogger(Topic1Receiver.class);
@Override
public void deal(String message) {
log.info("topic1收到信息:" + message);
}
}

View File

@ -0,0 +1,22 @@
package com.fanxb.redismq.consumer;
import com.fanxb.redismq.annotation.MqConsumer;
import com.fanxb.redismq.entity.RedisConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Created with IntelliJ IDEA
* Created By Fxb
* Date: 4/15/20
* Time: 4:10 AM
*/
@MqConsumer(topic = "topic1")
class Topic1Receiver2 implements RedisConsumer {
private static final Logger log = LoggerFactory.getLogger(Topic1Receiver2.class);
@Override
public void deal(String message) {
log.info("topic1收到信息:" + message);
}
}

View File

@ -0,0 +1,22 @@
package com.fanxb.redismq.consumer;
import com.fanxb.redismq.annotation.MqConsumer;
import com.fanxb.redismq.entity.RedisConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Created with IntelliJ IDEA
* Created By Fxb
* Date: 4/15/20
* Time: 4:10 AM
*/
@MqConsumer(topic = "topic2")
class Topic2Receiver implements RedisConsumer {
private static final Logger log = LoggerFactory.getLogger(Topic2Receiver.class);
@Override
public void deal(String message) {
log.info("topic2收到信息:" + message);
}
}

View File

@ -0,0 +1,35 @@
package com.fanxb.redismq.controller;
import com.fanxb.redismq.util.RedisMqUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* Created with IntelliJ IDEA
* Created By Fxb
* Date: 4/14/20
* Time: 6:08 AM
*/
@RestController
public class MainController {
private static final Logger log = LoggerFactory.getLogger(MainController.class);
@Autowired
private RedisMqUtil mqUtil;
@RequestMapping("/topic1")
public void addOne(String message) {
mqUtil.addToMq("topic1", message);
}
@RequestMapping("/topic2")
public void addOne1(String message) {
mqUtil.addToMq("topic2", message);
}
}

View File

@ -0,0 +1,19 @@
package com.fanxb.redismq.entity;
/**
* redis消费者类
* Created By Fxb
* Date: 2020/3/28
* Time: 22:41
*/
public interface RedisConsumer {
/**
* 功能描述: 消费方法消费者类必须继承此方法
*
* @param message 数据载体
* @author 123
* @date 2020/3/28 22:41
*/
void deal(String message);
}

View File

@ -0,0 +1,36 @@
package com.fanxb.redismq.util;
import com.alibaba.fastjson.JSON;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
/**
* redis-mq 工具类
* Created with IntelliJ IDEA
* Created By Fxb
* Date: 4/12/20
* Time: 11:36 PM
*/
@Component
public class RedisMqUtil {
@Autowired
private StringRedisTemplate redisTemplate;
public StringRedisTemplate getRedisTemplate() {
return redisTemplate;
}
/**
* 功能描述:
*
* @param topic 队列名
* @param obj 数据载体
* @author fanxb
* @date 4/13/20 1:40 AM
*/
public void addToMq(String topic, Object obj) {
String str = obj instanceof String ? (String) obj : JSON.toJSONString(obj);
redisTemplate.opsForList().leftPush(topic, str);
}
}

View File

@ -0,0 +1,4 @@
spring:
redis:
host: localhost
port: 6380