新增spring cloud stream 消息队列

This commit is contained in:
fanxb 2019-02-22 17:20:41 +08:00
parent a73db3e26b
commit 284c57dbed
43 changed files with 1669 additions and 0 deletions

View File

@ -0,0 +1,65 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.fxb</groupId>
<artifactId>demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>confsvr</name>
<description>Demo project for Spring Boot</description>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.4.4.RELEASE</version>
</parent>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>Camden.SR5</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
<spring-cloud.version>Camden.SR5</spring-cloud.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-config-server</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-eureka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,16 @@
package com.fxb.demo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.config.server.EnableConfigServer;
@SpringBootApplication
@EnableConfigServer
@EnableDiscoveryClient
public class ConfsvrApplication {
public static void main(String[] args) {
SpringApplication.run(ConfsvrApplication.class, args);
}
}

View File

@ -0,0 +1,29 @@
server:
port: 8888
eureka:
instance:
#注册服务的IP而不是服务器名
prefer-ip-address: true
client:
#向eureka注册服务
register-with-eureka: true
#拉取注册表的本地副本
fetch-registry: true
service-url:
#Eureka服务的位置(如果有多个注册中心,使用,分隔)
defaultZone: http://localhost:8761/eureka/
spring:
profiles:
# 使用文件系统来存储配置信息需要设置为native
active: native
application:
name: confsvr
cloud:
config:
server:
native:
# 使用文件来存放配置文件,为每个应用程序提供用逗号分隔的文件夹列表
searchLocations: file:///D:/configFolder/licensingservice,file:///D:/configFolder/organizationservice

View File

@ -0,0 +1,16 @@
package com.fxb.demo;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest
public class ConfsvrApplicationTests {
@Test
public void contextLoads() {
}
}

View File

@ -0,0 +1,72 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.fxb</groupId>
<artifactId>eurekasvr</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>eurekasvr</name>
<description>Demo project for Spring Boot</description>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.4.4.RELEASE</version>
</parent>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>Camden.SR5</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
<spring-cloud.version>Camden.SR5</spring-cloud.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-eureka-server</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
<repositories>
<repository>
<id>spring-milestones</id>
<name>Spring Milestones</name>
<url>https://repo.spring.io/milestone</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>
</project>

View File

@ -0,0 +1,14 @@
package com.fxb.eurekasvr;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.server.EnableEurekaServer;
@SpringBootApplication
@EnableEurekaServer
public class EurekasvrApplication {
public static void main(String[] args) {
SpringApplication.run(EurekasvrApplication.class, args);
}
}

View File

@ -0,0 +1,12 @@
server:
port: 8761
eureka:
client:
#不注册自己
register-with-eureka: false
#不在本地缓存注册表信息
fetch-registry: false
server:
#接受请求前的等待实际,开发模式下不要开启
#wait-time-in-ms-when-sync-empty: 5

View File

@ -0,0 +1,16 @@
package com.fxb.eurekasvr;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest
public class EurekasvrApplicationTests {
@Test
public void contextLoads() {
}
}

View File

@ -0,0 +1,96 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.fxb</groupId>
<artifactId>licensingservice</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>licensingservice</name>
<description>Demo project for Spring Boot</description>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.4.4.RELEASE</version>
</parent>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>Camden.SR5</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
<spring-cloud.version>Camden.SR5</spring-cloud.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-eureka</artifactId>
</dependency>
<!--redis依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-config-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-feign</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-hystrix</artifactId>
</dependency>
<!--本依赖不是必须的spring-cloud-starter-hystrix已经带了但是在Camden.SR5发行版本中使用了1.5.6,这个版本有一个不一致的地方,在
没有后备的情况下会抛出java.lang.reflect.UndeclaredThrowableException而不是com.netflix.hystrix.exception.HystrixRuntimeException,
在后续版本中修复了这个问题-->
<dependency>
<groupId>com.netflix.hystrix</groupId>
<artifactId>hystrix-javanica</artifactId>
<version>1.5.9</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,32 @@
package com.fxb.licensingservice.Entity;
import java.io.Serializable;
/**
* 类功能简述
* 类功能详述
*
* @author fanxb
* @date 2018/11/22 19:52
*/
public class Licensing implements Serializable {
private Organization organization;
private boolean isValid;
public Organization getOrganization() {
return organization;
}
public void setOrganization(Organization organization) {
this.organization = organization;
}
public boolean isValid() {
return isValid;
}
public void setValid(boolean valid) {
isValid = valid;
}
}

View File

@ -0,0 +1,40 @@
package com.fxb.licensingservice.Entity;
import java.io.Serializable;
/**
* 类功能简述
* 类功能详述
*
* @author fanxb
* @date 2018/11/22 19:30
*/
public class Organization implements Serializable {
private String id;
private String name;
public Organization() {
}
public Organization(String id, String name) {
this.id = id;
this.name = name;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}

View File

@ -0,0 +1,58 @@
package com.fxb.licensingservice.Entity;
/**
* 类功能简述
* 类功能详述
*
* @author fanxb
* @date 2019/2/21 17:07
*/
public class OrganizationChange {
/**
* 动作:修改删除等
*/
private String action;
/**
* 组织id
*/
private String orgId;
/**
* 关联id
*/
private String id;
public OrganizationChange(String action, String orgId, String id) {
this.action = action;
this.orgId = orgId;
this.id = id;
}
public OrganizationChange(){}
public String getAction() {
return action;
}
public void setAction(String action) {
this.action = action;
}
public String getOrgId() {
return orgId;
}
public void setOrgId(String orgId) {
this.orgId = orgId;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
}

View File

@ -0,0 +1,17 @@
package com.fxb.licensingservice.Entity;
/**
* 类功能简述用于存储传入http请求中获取的值
*
* @author fanxb
* @date 2019/2/21 16:25
*/
public class UserContext {
public static final String ID_HEADER_KEY = "id";
/**
* 用户访问管理id
*/
public String id;
}

View File

@ -0,0 +1,48 @@
package com.fxb.licensingservice;
import com.fxb.licensingservice.interceptors.UserContextInterceptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.client.loadbalancer.LoadBalanced;
import org.springframework.context.annotation.Bean;
import org.springframework.web.client.RestTemplate;
import java.util.Collections;
import java.util.List;
@SpringBootApplication
@EnableCircuitBreaker //告诉Spring Cloud将要使用Hystrix
@EnableDiscoveryClient
public class LicensingserviceApplication {
private Logger logger = LoggerFactory.getLogger(LicensingserviceApplication.class);
/**
* 使用带有Ribbon 功能的Spring RestTemplate
*/
@LoadBalanced
@Bean
@SuppressWarnings("unchecked")
public RestTemplate getRestTemplate(){
RestTemplate restTemplate = new RestTemplate();
//加上拦截器发出请求前加入管理id Header
List interceptors = restTemplate.getInterceptors();
if(interceptors==null){
restTemplate.setInterceptors(Collections.singletonList(new UserContextInterceptor()));
}else{
interceptors.add(new UserContextInterceptor());
}
return restTemplate;
}
public static void main(String[] args) {
SpringApplication.run(LicensingserviceApplication.class, args);
}
}

View File

@ -0,0 +1,47 @@
package com.fxb.licensingservice.config;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.cache.annotation.CachingConfigurerSupport;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
/**
* 类功能简述
* 类功能详述
*
* @author fanxb
* @date 2019/2/20 17:07
*/
@Configuration
@EnableCaching
public class RedisConfig extends CachingConfigurerSupport {
@Bean(name = "redisTemplate")
@SuppressWarnings("all")
public RedisTemplate redisTemplate(RedisConnectionFactory factory) {
Jackson2JsonRedisSerializer serializer = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper mapper = new ObjectMapper();
mapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
mapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
serializer.setObjectMapper(mapper);
// 设置RedisTemplate
RedisTemplate<String, Object> template = new RedisTemplate<>();
template.setConnectionFactory(factory);
RedisSerializer stringSerializer = new StringRedisSerializer();
template.setKeySerializer(stringSerializer);
template.setValueSerializer(serializer);
template.setHashKeySerializer(stringSerializer);
template.setHashValueSerializer(serializer);
template.afterPropertiesSet();
return template;
}
}

View File

@ -0,0 +1,33 @@
package com.fxb.licensingservice.controller;
import com.fxb.licensingservice.Entity.Licensing;
import com.fxb.licensingservice.service.OrganizationByRibbonService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
/**
* 类功能简述
* 类功能详述
*
* @author fanxb
* @date 2018/11/22 19:51
*/
@RestController
public class LicensingController {
private OrganizationByRibbonService ribbonService;
@Autowired
public LicensingController(OrganizationByRibbonService ribbonService) {
this.ribbonService = ribbonService;
}
@GetMapping("/licensingByRibbon/{orgId}")
public Licensing getLicensingByRibbon(@PathVariable("orgId") String orgId) throws Exception {
Licensing licensing = new Licensing();
licensing.setOrganization(ribbonService.getOrganizationWithRibbon(orgId));
return licensing;
}
}

View File

@ -0,0 +1,17 @@
package com.fxb.licensingservice.event;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;
/**
* 类功能简述
* 类功能详述
*
* @author fanxb
* @date 2019/2/22 16:53
*/
public interface CustomInput {
@Input("customInput")
SubscribableChannel in();
}

View File

@ -0,0 +1,17 @@
package com.fxb.licensingservice.event;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
/**
* 类功能简述
* 类功能详述
*
* @author fanxb
* @date 2019/2/22 16:53
*/
public interface CustomOutput {
@Output("customOutput")
MessageChannel out();
}

View File

@ -0,0 +1,29 @@
package com.fxb.licensingservice.event;
import com.fxb.licensingservice.Entity.OrganizationChange;
import com.fxb.licensingservice.util.RedisKeyUtils;
import com.fxb.licensingservice.util.RedisUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
/**
* 类功能简述
* 类功能详述
*
* @author fanxb
* @date 2019/2/22 10:33
*/
@EnableBinding(Sink.class) //使用Sink接口中定义的通道来监听传入消息
public class OrgChange {
private Logger logger = LoggerFactory.getLogger(OrgChange.class);
@StreamListener(Sink.INPUT)
public void loggerSink(OrganizationChange change){
logger.info("收到一个消息组织id为{},关联id为{}",change.getOrgId(),change.getId());
RedisUtils.del(RedisKeyUtils.getOrgCacheKey(change.getOrgId()));
}
}

View File

@ -0,0 +1,51 @@
package com.fxb.licensingservice.filters;
import com.fxb.licensingservice.Entity.UserContext;
import com.fxb.licensingservice.util.UserContextHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.servlet.*;
import javax.servlet.annotation.WebFilter;
import javax.servlet.http.HttpServletRequest;
import java.io.IOException;
/**
* 类功能简述
* 类功能详述
*
* @author fanxb
* @date 2019/2/21 16:23
*/
@Component
@WebFilter(urlPatterns = "/*", filterName="userContextFilter")
public class UserContextFilter implements Filter {
private Logger logger = LoggerFactory.getLogger(UserContextHolder.class);
@Override
public void init(FilterConfig filterConfig) throws ServletException {
}
@Override
public void destroy() {
}
@Override
public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) throws IOException, ServletException {
HttpServletRequest request = (HttpServletRequest) servletRequest;
String id = request.getHeader(UserContext.ID_HEADER_KEY);
if (id == null || id.trim().length() == 0) {
UserContextHolder.remove();
} else {
UserContext context = new UserContext();
context.id = id;
UserContextHolder.setContext(context);
logger.info("当前入栈id:{}", id);
}
filterChain.doFilter(servletRequest,servletResponse);
}
}

View File

@ -0,0 +1,28 @@
package com.fxb.licensingservice.interceptors;
import com.fxb.licensingservice.Entity.UserContext;
import com.fxb.licensingservice.util.UserContextHolder;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpRequest;
import org.springframework.http.client.ClientHttpRequestExecution;
import org.springframework.http.client.ClientHttpRequestInterceptor;
import org.springframework.http.client.ClientHttpResponse;
import java.io.IOException;
/**
* 类功能简述用于发起实际http调用前在请求头中加入关联id
* 类功能详述
*
* @author fanxb
* @date 2019/2/21 16:43
*/
public class UserContextInterceptor implements ClientHttpRequestInterceptor {
@Override
public ClientHttpResponse intercept(HttpRequest httpRequest, byte[] bytes, ClientHttpRequestExecution clientHttpRequestExecution) throws IOException {
HttpHeaders headers = httpRequest.getHeaders();
headers.add(UserContext.ID_HEADER_KEY,UserContextHolder.getContext().id);
return clientHttpRequestExecution.execute(httpRequest,bytes);
}
}

View File

@ -0,0 +1,70 @@
package com.fxb.licensingservice.service;
import com.fxb.licensingservice.Entity.Organization;
import com.fxb.licensingservice.util.RedisKeyUtils;
import com.fxb.licensingservice.util.RedisUtils;
import com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand;
import com.netflix.hystrix.contrib.javanica.annotation.HystrixProperty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpMethod;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate;
/**
* 类功能简述
* 类功能详述
*
* @author fanxb
* @date 2018/11/23 15:36
*/
@Component
public class OrganizationByRibbonService {
private RestTemplate restTemplate;
private Logger logger = LoggerFactory.getLogger(this.getClass());
@Autowired
public OrganizationByRibbonService(RestTemplate restTemplate) {
this.restTemplate = restTemplate;
}
@HystrixCommand(commandProperties = {
@HystrixProperty(name = "execution.isolation.thread.timeoutInMilliseconds", value = "2000")
}, fallbackMethod = "getOrganizationWithRibbonBackup",
threadPoolKey = "licenseByOrgThreadPool",
threadPoolProperties = {
@HystrixProperty(name = "coreSize", value = "30"),
@HystrixProperty(name = "maxQueueSize", value = "10")
})
public Organization getOrganizationWithRibbon(String id) {
String key = RedisKeyUtils.getOrgCacheKey(id);
//先从redis缓存取数据
Object res = RedisUtils.get(key);
if (res == null) {
logger.info("当前数据无缓存:{}", id);
try{
ResponseEntity<Organization> responseEntity = restTemplate.exchange("http://organizationservice/organization/{id}",
HttpMethod.GET, null, Organization.class, id);
res = responseEntity.getBody();
RedisUtils.setObj(key, res);
}catch (Exception e){
e.printStackTrace();
}
} else {
logger.info("当前数据为缓存数据:{}", id);
}
return (Organization) res;
}
public Organization getOrganizationWithRibbonBackup(String id) {
Organization organization = new Organization();
organization.setId("0");
organization.setName("组织服务调用失败");
return organization;
}
}

View File

@ -0,0 +1,17 @@
package com.fxb.licensingservice.util;
/**
* 类功能简述
* 类功能详述
*
* @author fanxb
* @date 2019/2/22 10:36
*/
public class RedisKeyUtils {
private static final String ORG_CACHE_PREFIX = "orgCache_";
public static String getOrgCacheKey(String orgId){
return ORG_CACHE_PREFIX+orgId;
}
}

View File

@ -0,0 +1,74 @@
package com.fxb.licensingservice.util;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.util.concurrent.TimeUnit;
/**
* 类功能简述
* 类功能详述
*
* @author fanxb
* @date 2019/2/21 15:05
*/
@Component
@SuppressWarnings("all")
public class RedisUtils {
public static RedisTemplate redisTemplate;
@Autowired
public void setRedisTemplate(RedisTemplate redisTemplate) {
RedisUtils.redisTemplate = redisTemplate;
}
public static boolean setObj(String key,Object value){
return setObj(key,value,0);
}
/**
* Description:
*
* @author fanxb
* @date 2019/2/21 15:21
* @param key
* @param value
* @param time 过期时间,单位ms
* @return boolean 是否成功
*/
public static boolean setObj(String key,Object value,long time){
try{
if(time<=0){
redisTemplate.opsForValue().set(key,value);
}else{
redisTemplate.opsForValue().set(key,value,time,TimeUnit.MILLISECONDS);
}
return true;
}catch (Exception e){
e.printStackTrace();
return false;
}
}
public static Object get(String key){
if(key==null){
return null;
}
try{
Object obj = redisTemplate.opsForValue().get(key);
return obj;
}catch (Exception e){
e.printStackTrace();
return null;
}
}
public static void del(String... key){
if(key!=null && key.length>0){
redisTemplate.delete(CollectionUtils.arrayToList(key));
}
}
}

View File

@ -0,0 +1,33 @@
package com.fxb.licensingservice.util;
import com.fxb.licensingservice.Entity.UserContext;
import org.springframework.util.Assert;
/**
* 类功能简述存储每个传入请求id到ThreadLocal中
*
* @author fanxb
* @date 2019/2/21 16:29
*/
public class UserContextHolder {
private static final ThreadLocal<UserContext> USER_CONTEXT_HOLDER=new ThreadLocal<>();
public static UserContext getContext(){
UserContext context = USER_CONTEXT_HOLDER.get();
if(context==null){
context = new UserContext();
USER_CONTEXT_HOLDER.set(context);
}
return context;
}
public static void setContext(UserContext context){
Assert.notNull(context,"context不允许为空");
USER_CONTEXT_HOLDER.set(context);
}
public static void remove(){
USER_CONTEXT_HOLDER.remove();
}
}

View File

@ -0,0 +1,43 @@
spring:
application:
#指定名称以便spring cloud config客户端知道查找哪个配置
name: licensingservice
profiles:
#指定环境
active: dev
redis:
database: 0
host: 192.168.226.5
port: 6379
password:
timeout: 3000
pool:
max-active: 10
max-wait: -1
max-idle: 10
min-idle: 0
cloud:
config:
enabled: true
stream:
bindings:
input:
destination: orgChangeTopic
content-type: application/json
# 定义将要消费消息的消费者组的名称
# 可能多个服务监听同一个消息队列。如果定义了消费者组,那么同组中只要有一个消费了消息,剩余的不会再次消费该消息,保证只有消息的
# 一个副本会被该组的某个实例所消费
group: licensingGroup
kafka:
binder:
zk-nodes: 192.168.226.5:2181
brokers: 192.168.226.5:9092
eureka:
instance:
prefer-ip-address: true
client:
register-with-eureka: true
fetch-registry: true
service-url:
defaultZone: http://localhost:8761/eureka/

View File

@ -0,0 +1,16 @@
package com.fxb.licensingservice;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest
public class LicensingserviceApplicationTests {
@Test
public void contextLoads() {
}
}

View File

@ -0,0 +1,74 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.fxb</groupId>
<artifactId>organizationservice</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>organizationservice</name>
<description>Demo project for Spring Boot</description>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.4.4.RELEASE</version>
</parent>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>Camden.SR5</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
<spring-cloud.version>Camden.SR5</spring-cloud.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-eureka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-config-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,14 @@
package com.fxb.organizationservice;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
@SpringBootApplication
@EnableDiscoveryClient
public class OrganizationserviceApplication {
public static void main(String[] args) {
SpringApplication.run(OrganizationserviceApplication.class, args);
}
}

View File

@ -0,0 +1,43 @@
package com.fxb.organizationservice.controller;
import com.fxb.organizationservice.events.SimpleSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
import java.util.HashMap;
import java.util.Map;
/**
* 类功能简述
* 类功能详述
*
* @author fanxb
* @date 2018/11/22 18:23
*/
@RestController
public class OrganizationController {
@Autowired
private SimpleSource simpleSource;
private Logger logger = LoggerFactory.getLogger(OrganizationController.class);
@GetMapping(value = "/organization/{orgId}")
public Object getOrganizationInfo(@PathVariable("orgId") String orgId) throws Exception {
Map<String, String> data = new HashMap<>(2);
data.put("id", orgId);
data.put("name", orgId + "公司");
return data;
}
@DeleteMapping(value = "/organization/{orgId}")
public void deleteOne(@PathVariable("orgId") String id) {
logger.debug("删除了组织:{}", id);
simpleSource.publishOrChange("delete", id);
}
}

View File

@ -0,0 +1,56 @@
package com.fxb.organizationservice.entity;
/**
* 类功能简述
* 类功能详述
*
* @author fanxb
* @date 2019/2/21 17:07
*/
public class OrganizationChange {
/**
* 动作:修改删除等
*/
private String action;
/**
* 组织id
*/
private String orgId;
/**
* 关联id
*/
private String id;
public OrganizationChange(String action, String orgId,String id) {
this.action = action;
this.orgId = orgId;
this.id = id;
}
public String getAction() {
return action;
}
public void setAction(String action) {
this.action = action;
}
public String getOrgId() {
return orgId;
}
public void setOrgId(String orgId) {
this.orgId = orgId;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
}

View File

@ -0,0 +1,17 @@
package com.fxb.organizationservice.entity;
/**
* 类功能简述用于存储传入http请求中获取的值
*
* @author fanxb
* @date 2019/2/21 16:25
*/
public class UserContext {
public static final String ID_HEADER_KEY = "id";
/**
* 用户访问管理id
*/
public String id;
}

View File

@ -0,0 +1,39 @@
package com.fxb.organizationservice.events;
import com.fxb.organizationservice.controller.OrganizationController;
import com.fxb.organizationservice.entity.OrganizationChange;
import com.fxb.organizationservice.utils.UserContextHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
/**
* 类功能简述
* 类功能详述
*
* @author fanxb
* @date 2019/2/21 16:09
*/
@EnableBinding(Source.class)
public class SimpleSource {
private Logger logger = LoggerFactory.getLogger(SimpleSource.class);
private Source source;
@Autowired
public SimpleSource(Source source) {
this.source = source;
}
public void publishOrChange(String action, String orgId) {
logger.info("在请求:{}中发送kafka消息{} for Organization Id:{}", UserContextHolder.getContext().id, action, orgId);
OrganizationChange change = new OrganizationChange(action, orgId, UserContextHolder.getContext().id);
source.output().send(MessageBuilder.withPayload(change).build());
}
}

View File

@ -0,0 +1,51 @@
package com.fxb.organizationservice.filters;
import com.fxb.organizationservice.entity.UserContext;
import com.fxb.organizationservice.utils.UserContextHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.servlet.*;
import javax.servlet.annotation.WebFilter;
import javax.servlet.http.HttpServletRequest;
import java.io.IOException;
/**
* 类功能简述
* 类功能详述
*
* @author fanxb
* @date 2019/2/21 16:23
*/
@Component
@WebFilter(urlPatterns = "/*", filterName="userContextFilter")
public class UserContextFilter implements Filter {
private Logger logger = LoggerFactory.getLogger(UserContextHolder.class);
@Override
public void init(FilterConfig filterConfig) throws ServletException {
}
@Override
public void destroy() {
}
@Override
public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) throws IOException, ServletException {
HttpServletRequest request = (HttpServletRequest) servletRequest;
String id = request.getHeader(UserContext.ID_HEADER_KEY);
if (id == null || id.trim().length() == 0) {
UserContextHolder.remove();
} else {
UserContext context = new UserContext();
context.id = id;
UserContextHolder.setContext(context);
logger.info("当前入栈id:{}", id);
}
filterChain.doFilter(servletRequest,servletResponse);
}
}

View File

@ -0,0 +1,33 @@
package com.fxb.organizationservice.utils;
import com.fxb.organizationservice.entity.UserContext;
import org.springframework.util.Assert;
/**
* 类功能简述存储每个传入请求id到ThreadLocal中
*
* @author fanxb
* @date 2019/2/21 16:29
*/
public class UserContextHolder {
private static final ThreadLocal<UserContext> USER_CONTEXT_HOLDER=new ThreadLocal<>();
public static UserContext getContext(){
UserContext context = USER_CONTEXT_HOLDER.get();
if(context==null){
context = new UserContext();
USER_CONTEXT_HOLDER.set(context);
}
return context;
}
public static void setContext(UserContext context){
Assert.notNull(context,"context不允许为空");
USER_CONTEXT_HOLDER.set(context);
}
public static void remove(){
USER_CONTEXT_HOLDER.remove();
}
}

View File

@ -0,0 +1,27 @@
spring:
application:
#指定名称以便spring cloud config客户端知道查找哪个配置
name: organizationservice
profiles:
#指定环境
active: dev
cloud:
config:
enabled: true
stream:
bindings:
output:
destination: orgChangeTopic
content-type: application/json
kafka:
binder:
zk-nodes: 192.168.226.5:2181
brokers: 192.168.226.5:9092
eureka:
instance:
prefer-ip-address: true
client:
register-with-eureka: true
fetch-registry: true
service-url:
defaultZone: http://localhost:8761/eureka/

View File

@ -0,0 +1,58 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.4.4.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.fxb</groupId>
<artifactId>zuul_svr</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>zuul_svr</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>Camden.SR5</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-zuul</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-eureka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,18 @@
package com.fxb.zuul_svr;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.netflix.zuul.EnableZuulProxy;
@SpringBootApplication
@EnableZuulProxy
@EnableDiscoveryClient
public class ZuulSvrApplication {
public static void main(String[] args) {
SpringApplication.run(ZuulSvrApplication.class, args);
}
}

View File

@ -0,0 +1,66 @@
package com.fxb.zuul_svr.filter;
import com.netflix.zuul.ZuulFilter;
import com.netflix.zuul.context.RequestContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.util.UUID;
/**
* 类功能简述
* 类功能详述
*
* @author fanxb
* @date 2019/1/8 18:48
*/
@Component
public class IdFilter extends ZuulFilter {
private static final Logger LOGGER = LoggerFactory.getLogger(IdFilter.class);
/**
* 返回过滤器类型 pre:前置过滤器post后置过滤器routing:路由过滤器error错误过滤器
*/
@Override
public String filterType() {
return "pre";
}
/**
* 过滤器执行顺序
*/
@Override
public int filterOrder() {
return 1;
}
/**
* 是否启动此过滤器
*/
@Override
public boolean shouldFilter() {
return true;
}
@Override
public Object run() {
RequestContext ctx = RequestContext.getCurrentContext();
String id = ctx.getRequest().getHeader("id");
//如果request找不到再到zuul的方法中找id.request不允许直接修改response中的header
// 所以为了让后续的过滤器能够获取到id才有下面的语法
if(id==null){
id = ctx.getZuulRequestHeaders().get("id");
}
if (id == null) {
id = UUID.randomUUID().toString();
LOGGER.info("{} 无id生成id:{}",ctx.getRequest().getRequestURI(), id);
ctx.addZuulRequestHeader("id", id);
} else {
LOGGER.info("{}存在id{}", ctx.getRequest().getRequestURI(), id);
}
return null;
}
}

View File

@ -0,0 +1,64 @@
package com.fxb.zuul_svr.filter;
import com.netflix.zuul.ZuulFilter;
import com.netflix.zuul.context.RequestContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.io.BufferedReader;
import java.io.InputStreamReader;
/**
* 类功能简述
* 类功能详述
*
* @author fanxb
* @date 2019/1/8 18:48
*/
@Component
public class ResponseFilter extends ZuulFilter {
private static final Logger LOGGER = LoggerFactory.getLogger(ResponseFilter.class);
/**
* 返回过滤器类型 pre:前置过滤器post后置过滤器routing:路由过滤器error错误过滤器
*/
@Override
public String filterType() {
return "post";
}
/**
* 过滤器执行顺序
*/
@Override
public int filterOrder() {
return 1;
}
/**
* 是否启动此过滤器
*/
@Override
public boolean shouldFilter() {
return true;
}
@Override
public Object run(){
RequestContext ctx = RequestContext.getCurrentContext();
String id = ctx.getZuulRequestHeaders().get("id");
ctx.getResponse().addHeader("id", id);
try {
BufferedReader reader = new BufferedReader(new InputStreamReader(ctx.getResponseDataStream()));
String response = reader.readLine();
LOGGER.info("响应为:{}", response);
//写到输出流中,本来可以由zuul框架来操作但是我们已经读取了输入流zuul读不到数据了所以要手动写响应到response
ctx.getResponse().setHeader("Content-Type","application/json;charset=utf-8");
ctx.getResponse().getWriter().write(response);
} catch (Exception e) {
}
return null;
}
}

View File

@ -0,0 +1,56 @@
package com.fxb.zuul_svr.filter;
import com.netflix.zuul.ZuulFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
/**
* 类功能简述
* 类功能详述
*
* @author fanxb
* @date 2019/1/8 18:48
*/
@Component
public class RouteFilter extends ZuulFilter {
private static final Logger LOGGER = LoggerFactory.getLogger(RouteFilter.class);
/**
* 返回过滤器类型 pre:前置过滤器post后置过滤器routing:路由过滤器error错误过滤器
*/
@Override
public String filterType() {
return "routing";
}
/**
* 过滤器执行顺序
*/
@Override
public int filterOrder() {
return 1;
}
/**
* 是否启动此过滤器
*/
@Override
public boolean shouldFilter() {
return true;
}
@Override
public Object run(){
/**
* 下面只写出实现思路真的完全实现下面的功能代码量较大可以参考spring 微服务实战中的实现<a href="https://github.com/carnellj/spmia-chapter6/blob/master/zuulsvr/src/main/java/com/thoughtmechanix/zuulsvr/filters/SpecialRoutesFilter.java">点击跳转</a>
*
* 1.获取当前路径
* 2.判断是否需要进行特殊路由
* 3.如需要进行特殊路由在此发起http请求
* 3.将响应写到response返回给调用者
*/
return null;
}
}

View File

@ -0,0 +1,30 @@
spring:
application:
name: zuulservice
#服务发现配置
eureka:
instance:
prefer-ip-address: true
client:
register-with-eureka: true
fetch-registry: true
service-url:
defaultZone: http://localhost:8761/eureka/
server:
port: 5555
#zuul配置自定义服务路径
zuul:
routes:
#用于内部识别关键字
organizationservice: /org/**
# 使用","分隔,“*”表示全部忽略
ignored-services: organizationservice
prefix: /apis
ribbon:
readTimeOut: false
eureka:
#禁用Eureka支持
enabled: true

View File

@ -0,0 +1,17 @@
package com.fxb.zuul_svr;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest
public class ZuulSvrApplicationTests {
@Test
public void contextLoads() {
}
}