fix
This commit is contained in:
commit
96b0a347d4
35
java/springboot系列/数据库/springboot整合redis.md
Normal file
35
java/springboot系列/数据库/springboot整合redis.md
Normal file
@ -0,0 +1,35 @@
|
|||||||
|
---
|
||||||
|
id: "2019-02-22-14-59"
|
||||||
|
title: "springboot整合Mybatis(xml和注解)"
|
||||||
|
tag: ["java", "","spring-boot","redis","nosql"]
|
||||||
|
categories:
|
||||||
|
- "java"
|
||||||
|
- "spring boot学习"
|
||||||
|
---
|
||||||
|
|
||||||
|
  项目源代码在 github,地址为:[https://github.com/FleyX/demo-project/tree/master/mybatis-test](https://github.com/FleyX/demo-project/tree/master/mybatis-test),有需要的自取。
|
||||||
|
|
||||||
|
  redis作为一个高性能的内存数据库,如果不会用就太落伍了,之前在node.js中用过redis,本篇记录如何将redis集成到spring boot中。提供redis操作类,和注解使用redis两种方式。主要内容如下:
|
||||||
|
|
||||||
|
- docker安装redis
|
||||||
|
- springboot 集成redis
|
||||||
|
- 编写redis操作类
|
||||||
|
- 通过注解使用redis
|
||||||
|
|
||||||
|
# 安装redis
|
||||||
|
|
||||||
|
  通过docker安装,docker compose编排文件如下:
|
||||||
|
```yml
|
||||||
|
# docker-compose.yml
|
||||||
|
version: "3"
|
||||||
|
services:
|
||||||
|
redis:
|
||||||
|
container_name: redis
|
||||||
|
image: redis:3.2.10
|
||||||
|
ports:
|
||||||
|
- "6379:6379"
|
||||||
|
```
|
||||||
|
|
||||||
|
  然后在`docker-compose.yml`所在目录使用`docker-compose up -d`命令,启动redis。
|
||||||
|
|
||||||
|
# 集成springboot
|
@ -244,7 +244,7 @@ licensestatic:
|
|||||||
|
|
||||||
**_问题又来了_**
|
**_问题又来了_**
|
||||||
|
|
||||||
  _虽然上面的配置能够操作成功,但是在 Ribbon 中禁用 Eureka 支持会造成一个问题,那就是服务网关代理的所有服务的 Ribbon 支持都没有了,Zuul 无法使用 Ribbon 来缓存服务的查找,每次请求都要调用 Eureka 查询服务实例,这样会对 Eureka 服务器造成巨大的压力,显然不可取的。_
|
  _禁用eureka支持会导致所有服务的地址都需要手动指定,ribbon不会再从eureka中获取服务实例信息。所以没办法混合使用_
|
||||||
|
|
||||||
  目前有两种办法来规避这个问题:
|
  目前有两种办法来规避这个问题:
|
||||||
|
|
||||||
@ -366,6 +366,7 @@ public class IdFilter extends ZuulFilter {
|
|||||||
|
|
||||||
  现在从 zuul 服务网关发往许可证服务的 http 请求已经携带了 id。
|
  现在从 zuul 服务网关发往许可证服务的 http 请求已经携带了 id。
|
||||||
|
|
||||||
|
|
||||||
### b、后置过滤器
|
### b、后置过滤器
|
||||||
|
|
||||||
  后置过滤器通常用于进行敏感信息过滤和响应记录。这里我们实现一个后置过滤器,将许可证服务请求的响应内容打印到控制台上同时把`id`header 插入到服务客户端请求的 response 中。
|
  后置过滤器通常用于进行敏感信息过滤和响应记录。这里我们实现一个后置过滤器,将许可证服务请求的响应内容打印到控制台上同时把`id`header 插入到服务客户端请求的 response 中。
|
||||||
|
@ -2,12 +2,19 @@
|
|||||||
id: "2019-01-03-19-19"
|
id: "2019-01-03-19-19"
|
||||||
date: "2019-01-03-19-19"
|
date: "2019-01-03-19-19"
|
||||||
title: "springCloud学习4(Spring-Cloud-Stream事件驱动)"
|
title: "springCloud学习4(Spring-Cloud-Stream事件驱动)"
|
||||||
tags: ["spring-boot", "spring-cloud","spring-cloud-stream","kafka","事件驱动"]
|
tags:
|
||||||
|
["spring-boot", "spring-cloud", "spring-cloud-stream", "kafka", "事件驱动"]
|
||||||
categories:
|
categories:
|
||||||
- "java"
|
- "java"
|
||||||
- "springCloud实战"
|
- "springCloud实战"
|
||||||
---
|
---
|
||||||
|
|
||||||
|
![hei](https://raw.githubusercontent.com/FleyX/files/master/teachSystem/20190223170520.png)
|
||||||
|
|
||||||
|
**本篇原创发布于:**[FleyX 的个人博客](http://tapme.top/blog/detail/2019-01-03-19-19)
|
||||||
|
|
||||||
|
**本篇所用全部代码:**[FleyX 的 github](https://github.com/FleyX/demo-project/tree/master/springcloud/spring-cloud-stream%E6%B6%88%E6%81%AF%E9%98%9F%E5%88%97)
|
||||||
|
|
||||||
  想想平常生活中做饭的场景,在用电饭锅做饭的同时,我们可以洗菜、切菜,等待电饭锅发出饭做好的提示我们回去拔下电饭锅电源(或者什么也不知让它处于保温状态),反正这个时候我们知道饭做好了,接下来可以炒菜了。从这里可以看出我们在日常生活中与世界的互动并不是同步的、线性的,不是简单的请求--响应模型。它是事件驱动的,我们不断的发送消息、接受消息、处理消息。
|
  想想平常生活中做饭的场景,在用电饭锅做饭的同时,我们可以洗菜、切菜,等待电饭锅发出饭做好的提示我们回去拔下电饭锅电源(或者什么也不知让它处于保温状态),反正这个时候我们知道饭做好了,接下来可以炒菜了。从这里可以看出我们在日常生活中与世界的互动并不是同步的、线性的,不是简单的请求--响应模型。它是事件驱动的,我们不断的发送消息、接受消息、处理消息。
|
||||||
|
|
||||||
  同样在软件世界中也不全是请求--响应模型,也会需要进行异步的消息通信。使用消息实现事件通信的概念被称为消息驱动架构(Event Driven Architecture,EDA),也被称为消息驱动架构(Message Driven Architecture,MDA)。使用这类架构可以构建高度解耦的系统,该系统能够对变化做出响应,且不需要与特定的库或者服务紧密耦合。
|
  同样在软件世界中也不全是请求--响应模型,也会需要进行异步的消息通信。使用消息实现事件通信的概念被称为消息驱动架构(Event Driven Architecture,EDA),也被称为消息驱动架构(Message Driven Architecture,MDA)。使用这类架构可以构建高度解耦的系统,该系统能够对变化做出响应,且不需要与特定的库或者服务紧密耦合。
|
||||||
@ -49,21 +56,21 @@ categories:
|
|||||||
|
|
||||||
<!-- more -->
|
<!-- more -->
|
||||||
|
|
||||||
## spring cloud stream架构
|
## spring cloud stream 架构
|
||||||
|
|
||||||
  spring cloud stream中有4个组件涉及到消息发布和消息消费,分别为:
|
  spring cloud stream 中有 4 个组件涉及到消息发布和消息消费,分别为:
|
||||||
|
|
||||||
1. 发射器<br/>
|
1. 发射器<br/>
|
||||||
  当一个服务准备发送消息时,它将使用发射器发布消息。发射器是一个Spring注解接口,它接收一个普通Java对象,表示要发布的消息。发射器接收消息,然后序列化(默认序列化为JSON)后发布到通道中。
|
  当一个服务准备发送消息时,它将使用发射器发布消息。发射器是一个 Spring 注解接口,它接收一个普通 Java 对象,表示要发布的消息。发射器接收消息,然后序列化(默认序列化为 JSON)后发布到通道中。
|
||||||
|
|
||||||
2. 通道<br/>
|
2. 通道<br/>
|
||||||
  通道是对队列的一个抽象。通道名称是与目标队列名称相关联的。但是队列名称并不会直接公开在代码中,代码永远只会使用通道名。
|
  通道是对队列的一个抽象。通道名称是与目标队列名称相关联的。但是队列名称并不会直接公开在代码中,代码永远只会使用通道名。
|
||||||
|
|
||||||
3. 绑定器<br/>
|
3. 绑定器<br/>
|
||||||
  绑定器是spring cloud stream框架的一部分,它是与特定消息平台对话的Spring代码。通过绑定器,使得开发人员不必依赖于特定平台的库和API来发布和消费消息。
|
  绑定器是 spring cloud stream 框架的一部分,它是与特定消息平台对话的 Spring 代码。通过绑定器,使得开发人员不必依赖于特定平台的库和 API 来发布和消费消息。
|
||||||
|
|
||||||
4. 接收器<br/>
|
4. 接收器<br/>
|
||||||
  服务通过接收器来从队列中接收消息,并将消息反序列化。
|
  服务通过接收器来从队列中接收消息,并将消息反序列化。
|
||||||
|
|
||||||
处理逻辑如下:
|
处理逻辑如下:
|
||||||
|
|
||||||
@ -71,12 +78,275 @@ categories:
|
|||||||
|
|
||||||
## 实战
|
## 实战
|
||||||
|
|
||||||
  继续使用之前的项目,在许可证服务中缓存组织数据到redis中。
|
  继续使用之前的项目,在许可证服务中缓存组织数据到 redis 中。
|
||||||
|
|
||||||
### 建立redis服务
|
### 建立 redis 服务
|
||||||
|
|
||||||
  为方便起见,使用docker创建redis,建立脚本如下:
|
  为方便起见,使用 docker 创建 redis,建立脚本如下:
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
docker run -itd --name redis -p
|
docker run -itd --name redis --net host redis:
|
||||||
```
|
```
|
||||||
|
|
||||||
|
### 建立 kafka 服务
|
||||||
|
|
||||||
|
### 在组织服务中编写消息生产者
|
||||||
|
|
||||||
|
  首先在 organization 服务中引入 spring cloud stream 和 kafka 的依赖。
|
||||||
|
|
||||||
|
```xml
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.springframework.cloud</groupId>
|
||||||
|
<artifactId>spring-cloud-stream</artifactId>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.springframework.cloud</groupId>
|
||||||
|
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
|
||||||
|
</dependency>
|
||||||
|
```
|
||||||
|
|
||||||
|
  然后在 events 类中编写`SimpleSouce`类,用于组织数据修改,产生一条消息到队列中。代码如下:
|
||||||
|
|
||||||
|
```java
|
||||||
|
@EnableBinding(Source.class)
|
||||||
|
public class SimpleSource {
|
||||||
|
private Logger logger = LoggerFactory.getLogger(SimpleSource.class);
|
||||||
|
|
||||||
|
private Source source;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
public SimpleSource(Source source) {
|
||||||
|
this.source = source;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void publishOrChange(String action, String orgId) {
|
||||||
|
logger.info("在请求:{}中,发送kafka消息:{} for Organization Id:{}", UserContextHolder.getContext().id, action, orgId);
|
||||||
|
OrganizationChange change = new OrganizationChange(action, orgId, UserContextHolder.getContext().id);
|
||||||
|
source.output().send(MessageBuilder.withPayload(change).build());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
这里使用的是默认通道,Source 类定义的 output 通道发消息。后面通过 Sink 定义的 input 通道收消息。
|
||||||
|
|
||||||
|
  然后在`OrganizationController`类中定义一个 delete 方法,并注入 SimpleSouce 类,代码如下:
|
||||||
|
|
||||||
|
```java
|
||||||
|
@Autowired
|
||||||
|
private SimpleSource simpleSource;
|
||||||
|
|
||||||
|
@DeleteMapping(value = "/organization/{orgId}")
|
||||||
|
public void deleteOne(@PathVariable("orgId") String id) {
|
||||||
|
logger.debug("删除了组织:{}", id);
|
||||||
|
simpleSource.publishOrChange("delete", id);
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
  最后在配置文件中加入消息队列的配置:
|
||||||
|
|
||||||
|
```yml
|
||||||
|
# 省略了其他配置
|
||||||
|
spring:
|
||||||
|
cloud:
|
||||||
|
stream:
|
||||||
|
bindings:
|
||||||
|
output:
|
||||||
|
destination: orgChangeTopic
|
||||||
|
content-type: application/json
|
||||||
|
kafka:
|
||||||
|
binder:
|
||||||
|
# 替换为部署kafka的ip和端口
|
||||||
|
zk-nodes: 192.168.226.5:2181
|
||||||
|
brokers: 192.168.226.5:9092
|
||||||
|
```
|
||||||
|
|
||||||
|
  现在我们可以测试下访问[localhost:5555/apis/org/organization/12](localhost:5555/apis/org/organization/12),可以看到控制台打印消息生成的日志。
|
||||||
|
|
||||||
|
### 在许可证服务中编写消息消费者
|
||||||
|
|
||||||
|
  集成 redis 的方法,参看[]()。这里不作说明。
|
||||||
|
|
||||||
|
  首先引入依赖,依赖项同上面组织服务。
|
||||||
|
|
||||||
|
  然后在 event 包下创建`OrgChange`的类,代码如下:
|
||||||
|
|
||||||
|
```java
|
||||||
|
@EnableBinding(Sink.class) //使用Sink接口中定义的通道来监听传入消息
|
||||||
|
public class OrgChange {
|
||||||
|
|
||||||
|
private Logger logger = LoggerFactory.getLogger(OrgChange.class);
|
||||||
|
|
||||||
|
@StreamListener(Sink.INPUT)
|
||||||
|
public void loggerSink(OrganizationChange change){
|
||||||
|
logger.info("收到一个消息,组织id为:{},关联id为:{}",change.getOrgId(),change.getId());
|
||||||
|
//删除失效缓存
|
||||||
|
RedisUtils.del(RedisKeyUtils.getOrgCacheKey(change.getOrgId()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
//下面两个都在util包下
|
||||||
|
//RedisKeyUtils.java代码如下
|
||||||
|
public class RedisKeyUtils {
|
||||||
|
|
||||||
|
private static final String ORG_CACHE_PREFIX = "orgCache_";
|
||||||
|
|
||||||
|
public static String getOrgCacheKey(String orgId){
|
||||||
|
return ORG_CACHE_PREFIX+orgId;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
//RedisUtils.java代码如下
|
||||||
|
@Component
|
||||||
|
@SuppressWarnings("all")
|
||||||
|
public class RedisUtils {
|
||||||
|
|
||||||
|
public static RedisTemplate redisTemplate;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
public void setRedisTemplate(RedisTemplate redisTemplate) {
|
||||||
|
RedisUtils.redisTemplate = redisTemplate;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static boolean setObj(String key,Object value){
|
||||||
|
return setObj(key,value,0);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Description:
|
||||||
|
*
|
||||||
|
* @author fanxb
|
||||||
|
* @date 2019/2/21 15:21
|
||||||
|
* @param key 键
|
||||||
|
* @param value 值
|
||||||
|
* @param time 过期时间,单位ms
|
||||||
|
* @return boolean 是否成功
|
||||||
|
*/
|
||||||
|
public static boolean setObj(String key,Object value,long time){
|
||||||
|
try{
|
||||||
|
if(time<=0){
|
||||||
|
redisTemplate.opsForValue().set(key,value);
|
||||||
|
}else{
|
||||||
|
redisTemplate.opsForValue().set(key,value,time,TimeUnit.MILLISECONDS);
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}catch (Exception e){
|
||||||
|
e.printStackTrace();
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static Object get(String key){
|
||||||
|
if(key==null){
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
try{
|
||||||
|
Object obj = redisTemplate.opsForValue().get(key);
|
||||||
|
return obj;
|
||||||
|
}catch (Exception e){
|
||||||
|
e.printStackTrace();
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void del(String... key){
|
||||||
|
if(key!=null && key.length>0){
|
||||||
|
redisTemplate.delete(CollectionUtils.arrayToList(key));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
  上面用到的是 Sink.INPUT 通道,这个和之前的 Source.OUTPUT 通道刚好一队,一个负责收,一个负责发。
|
||||||
|
|
||||||
|
  然后修改`OrganizationByRibbonService.java`文件中的`getOrganizationWithRibbon`方法:
|
||||||
|
|
||||||
|
```java
|
||||||
|
public Organization getOrganizationWithRibbon(String id) {
|
||||||
|
String key = RedisKeyUtils.getOrgCacheKey(id);
|
||||||
|
//先从redis缓存取数据
|
||||||
|
Object res = RedisUtils.get(key);
|
||||||
|
if (res == null) {
|
||||||
|
logger.info("当前数据无缓存:{}", id);
|
||||||
|
try{
|
||||||
|
|
||||||
|
ResponseEntity<Organization> responseEntity = restTemplate.exchange("http://organizationservice/organization/{id}",
|
||||||
|
HttpMethod.GET, null, Organization.class, id);
|
||||||
|
res = responseEntity.getBody();
|
||||||
|
RedisUtils.setObj(key, res);
|
||||||
|
}catch (Exception e){
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
logger.info("当前数据为缓存数据:{}", id);
|
||||||
|
}
|
||||||
|
return (Organization) res;
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
  最后修改配置文件,为 input 通道指定 topic,配置如下:
|
||||||
|
|
||||||
|
```yaml
|
||||||
|
spring:
|
||||||
|
cloud:
|
||||||
|
stream:
|
||||||
|
bindings:
|
||||||
|
input:
|
||||||
|
destination: orgChangeTopic
|
||||||
|
content-type: application/json
|
||||||
|
# 定义将要消费消息的消费者组的名称
|
||||||
|
# 可能多个服务监听同一个消息队列。如果定义了消费者组,那么同组中只要有一个消费了消息,剩余的不会再次消费该消息,保证只有消息的
|
||||||
|
# 一个副本会被该组的某个实例所消费
|
||||||
|
group: licensingGroup
|
||||||
|
kafka:
|
||||||
|
binder:
|
||||||
|
zk-nodes: 192.168.226.5:2181
|
||||||
|
brokers: 192.168.226.5:9092
|
||||||
|
```
|
||||||
|
|
||||||
|
基本和发送的配置相同,只是这里是为`input`通道映射队列,然后还定义了一个组名,避免一个消息被重复消费。
|
||||||
|
|
||||||
|
  现在来多次访问[localhost:5555/apis/licensingservice/licensingByRibbon/12](localhost:5555/apis/licensingservice/licensingByRibbon/12),可以看到 licensingservice 控制台打印数据从缓存中读取,如下所示:
|
||||||
|
|
||||||
|
![缓存](https://raw.githubusercontent.com/FleyX/files/master/blogImg/linux/spring%20cloud/20190222164125.png)
|
||||||
|
|
||||||
|
然后再以 delete 访问[localhost:5555/apis/org/organization/12](localhost:5555/apis/org/organization/12)清除缓存,再次访问 licensingservice 服务,结果如下:
|
||||||
|
|
||||||
|
![清除缓存](https://raw.githubusercontent.com/FleyX/files/master/blogImg/linux/spring%20cloud/20190222164445.png)
|
||||||
|
|
||||||
|
### 自定义通道
|
||||||
|
|
||||||
|
  上面用的是`Spring Cloud Stream`自带的 input/output 通道,那么要如何自定义通道呢?下面以自定义`customInput/customOutput`通道为例。
|
||||||
|
|
||||||
|
#### 自定义发数据通道
|
||||||
|
|
||||||
|
```java
|
||||||
|
public interface CustomOutput {
|
||||||
|
@Output("customOutput")
|
||||||
|
MessageChannel out();
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
  对于每个自定义的发数据通道,需使用@OutPut 注解标记的返回 MessageChannel 类的方法。
|
||||||
|
|
||||||
|
#### 自定义收数据通道
|
||||||
|
|
||||||
|
```java
|
||||||
|
public interface CustomInput {
|
||||||
|
|
||||||
|
@Input("customInput")
|
||||||
|
SubscribableChannel in();
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
  同上,对应自定义的收数据通道,需要使用@Input 注解标记的返回 SubscribableChannel 类的方法。
|
||||||
|
|
||||||
|
# 结束
|
||||||
|
|
||||||
|
  看完本篇你应该已经能够在 Spring Cloud 中集成 Spring Cloud Stream 消息队列了,貌似这个也能用到普通的 spring boot 项目中,比直接集成 mq 更加的优雅。
|
||||||
|
|
||||||
|
_2019,Fighting!_
|
||||||
|
|
||||||
|
**本篇原创发布于:**[FleyX 的个人博客](http://tapme.top/blog/detail/2019-01-03-19-19)
|
||||||
|
|
||||||
|
**本篇所用全部代码:**[FleyX 的 github](https://github.com/FleyX/demo-project/tree/master/springcloud/spring-cloud-stream%E6%B6%88%E6%81%AF%E9%98%9F%E5%88%97)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user