读写分离
This commit is contained in:
parent
68cd5d89c1
commit
bb64ddf34b
@ -1,48 +1,48 @@
|
||||
  前面的博客有说到spring boot搭建见另一篇博文,其实那篇博文还没写,现在来填个坑。我们使用spring initializr来构建,idea和eclipse都支持这种方式,构建过程类似,这里以idea为例,详细记录构建过程。
|
||||
|
||||
###1.选择spring initializr
|
||||
|
||||
![1532967570728](D:\笔记\markdown\springboot系列\springboot搭建.assets\1532967570728.png)
|
||||
|
||||
next
|
||||
|
||||
#### 2.设置参数
|
||||
|
||||
![1532967772110](D:\笔记\markdown\springboot系列\springboot搭建.assets\1532967772110.png)
|
||||
|
||||
next
|
||||
|
||||
#### 3.选择依赖
|
||||
|
||||
  在这里选择spring boot版本和web依赖(忽略sql的依赖,如有需要[点击这里](f),单独将mybatis的整合),后面也可手动编辑pom文件修改增加删除依赖
|
||||
|
||||
![1532967938985](D:\笔记\markdown\springboot系列\springboot搭建.assets\1532967938985.png)
|
||||
|
||||
这里我们选择web搭建一个简单的REST风格demo。然后next。
|
||||
|
||||
####4.设置项目存放地址
|
||||
|
||||
![1532968024509](D:\笔记\markdown\springboot系列\springboot搭建.assets\1532968024509.png)
|
||||
|
||||
这样就成功构建了一个springboot项目。
|
||||
|
||||
#### 5.测试
|
||||
|
||||
  现在新建一个controller包,包下新建一个HelloController,创建之后项目目录结构如下:
|
||||
|
||||
![1532969025023](D:\笔记\markdown\springboot系列\springboot搭建.assets\1532969025023.png)
|
||||
|
||||
HelloController代码如下:
|
||||
|
||||
```java
|
||||
@RestController
|
||||
@RequestMapping("/home")
|
||||
public class HelloController{
|
||||
@GetMapping("/hello")
|
||||
public String sayHello(){
|
||||
return "hello";
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
  前面的博客有说到spring boot搭建见另一篇博文,其实那篇博文还没写,现在来填个坑。我们使用spring initializr来构建,idea和eclipse都支持这种方式,构建过程类似,这里以idea为例,详细记录构建过程。
|
||||
|
||||
###1.选择spring initializr
|
||||
|
||||
![1532967570728](D:\笔记\markdown\springboot系列\springboot搭建.assets\1532967570728.png)
|
||||
|
||||
next
|
||||
|
||||
#### 2.设置参数
|
||||
|
||||
![1532967772110](D:\笔记\markdown\springboot系列\springboot搭建.assets\1532967772110.png)
|
||||
|
||||
next
|
||||
|
||||
#### 3.选择依赖
|
||||
|
||||
  在这里选择spring boot版本和web依赖(忽略sql的依赖,如有需要[点击这里](f),单独将mybatis的整合),后面也可手动编辑pom文件修改增加删除依赖
|
||||
|
||||
![1532967938985](D:\笔记\markdown\springboot系列\springboot搭建.assets\1532967938985.png)
|
||||
|
||||
这里我们选择web搭建一个简单的REST风格demo。然后next。
|
||||
|
||||
####4.设置项目存放地址
|
||||
|
||||
![1532968024509](D:\笔记\markdown\springboot系列\springboot搭建.assets\1532968024509.png)
|
||||
|
||||
这样就成功构建了一个springboot项目。
|
||||
|
||||
#### 5.测试
|
||||
|
||||
  现在新建一个controller包,包下新建一个HelloController,创建之后项目目录结构如下:
|
||||
|
||||
![1532969025023](D:\笔记\markdown\springboot系列\springboot搭建.assets\1532969025023.png)
|
||||
|
||||
HelloController代码如下:
|
||||
|
||||
```java
|
||||
@RestController
|
||||
@RequestMapping("/home")
|
||||
public class HelloController{
|
||||
@GetMapping("/hello")
|
||||
public String sayHello(){
|
||||
return "hello";
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
然后运行项目,访问localhost:8080/home/hello即可看到hello字符串。
|
@ -1,48 +1,48 @@
|
||||
  前面的博客有说到spring boot搭建见另一篇博文,其实那篇博文还没写,现在来填个坑。我们使用spring initializr来构建,idea和eclipse都支持这种方式,构建过程类似,这里以idea为例,详细记录构建过程。
|
||||
|
||||
###1.选择spring initializr
|
||||
|
||||
![1532967570728](.\springboot搭建.assets\1532967570728.png)
|
||||
|
||||
next
|
||||
|
||||
#### 2.设置参数
|
||||
|
||||
![1532967772110](D:\笔记\markdown\springboot系列\springboot搭建.assets\1532967772110.png)
|
||||
|
||||
next
|
||||
|
||||
#### 3.选择依赖
|
||||
|
||||
  在这里选择spring boot版本和web依赖(忽略sql的依赖,如有需要[点击这里](f),单独将mybatis的整合),后面也可手动编辑pom文件修改增加删除依赖
|
||||
|
||||
![1532967938985](D:\笔记\markdown\springboot系列\springboot搭建.assets\1532967938985.png)
|
||||
|
||||
这里我们选择web搭建一个简单的REST风格demo。然后next。
|
||||
|
||||
####4.设置项目存放地址
|
||||
|
||||
![1532968024509](D:\笔记\markdown\springboot系列\springboot搭建.assets\1532968024509.png)
|
||||
|
||||
这样就成功构建了一个springboot项目。
|
||||
|
||||
#### 5.测试
|
||||
|
||||
  现在新建一个controller包,包下新建一个HelloController,创建之后项目目录结构如下:
|
||||
|
||||
![1532969025023](D:\笔记\markdown\springboot系列\springboot搭建.assets\1532969025023.png)
|
||||
|
||||
HelloController代码如下:
|
||||
|
||||
```java
|
||||
@RestController
|
||||
@RequestMapping("/home")
|
||||
public class HelloController{
|
||||
@GetMapping("/hello")
|
||||
public String sayHello(){
|
||||
return "hello";
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
然后运行项目,访问localhost:8080/home/hello即可看到hello字符串。
|
||||
  前面的博客有说到spring boot搭建见另一篇博文,其实那篇博文还没写,现在来填个坑。我们使用spring initializr来构建,idea和eclipse都支持这种方式,构建过程类似,这里以idea为例,详细记录构建过程。
|
||||
|
||||
###1.选择spring initializr
|
||||
|
||||
![1532967570728](.\springboot搭建.assets\1532967570728.png)
|
||||
|
||||
next
|
||||
|
||||
#### 2.设置参数
|
||||
|
||||
![1532967772110](D:\笔记\markdown\springboot系列\springboot搭建.assets\1532967772110.png)
|
||||
|
||||
next
|
||||
|
||||
#### 3.选择依赖
|
||||
|
||||
  在这里选择spring boot版本和web依赖(忽略sql的依赖,如有需要[点击这里](f),单独将mybatis的整合),后面也可手动编辑pom文件修改增加删除依赖
|
||||
|
||||
![1532967938985](D:\笔记\markdown\springboot系列\springboot搭建.assets\1532967938985.png)
|
||||
|
||||
这里我们选择web搭建一个简单的REST风格demo。然后next。
|
||||
|
||||
####4.设置项目存放地址
|
||||
|
||||
![1532968024509](D:\笔记\markdown\springboot系列\springboot搭建.assets\1532968024509.png)
|
||||
|
||||
这样就成功构建了一个springboot项目。
|
||||
|
||||
#### 5.测试
|
||||
|
||||
  现在新建一个controller包,包下新建一个HelloController,创建之后项目目录结构如下:
|
||||
|
||||
![1532969025023](D:\笔记\markdown\springboot系列\springboot搭建.assets\1532969025023.png)
|
||||
|
||||
HelloController代码如下:
|
||||
|
||||
```java
|
||||
@RestController
|
||||
@RequestMapping("/home")
|
||||
public class HelloController{
|
||||
@GetMapping("/hello")
|
||||
public String sayHello(){
|
||||
return "hello";
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
然后运行项目,访问localhost:8080/home/hello即可看到hello字符串。
|
||||
|
File diff suppressed because one or more lines are too long
@ -1,89 +1,89 @@
|
||||
  紧接着上一篇,上一篇中登录验证都由security帮助我们完成了,如果我们想要增加一个验证码登录或者其它的自定义校验就没办法了,因此这一篇讲解如何实现这个功能。
|
||||
|
||||
##一、 实现自定义登录校验类
|
||||
|
||||
  继承UsernamePasswordAuthenticationFilter类来拓展登录校验,代码如下:
|
||||
```java
|
||||
public class MyUsernamePasswordAuthentication extends UsernamePasswordAuthenticationFilter{
|
||||
|
||||
private Logger log = LoggerFactory.getLogger(this.getClass());
|
||||
|
||||
@Override
|
||||
public Authentication attemptAuthentication(HttpServletRequest request, HttpServletResponse response)
|
||||
throws AuthenticationException {
|
||||
//我们可以在这里进行额外的验证,如果验证失败抛出继承AuthenticationException的自定义错误。
|
||||
log.info("在这里进行验证码判断");
|
||||
//只要最终的验证是账号密码形式就无需修改后续过程
|
||||
return super.attemptAuthentication(request, response);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setAuthenticationManager(AuthenticationManager authenticationManager) {
|
||||
// TODO Auto-generated method stub
|
||||
super.setAuthenticationManager(authenticationManager);
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
##二、 将自定义登录配置到security中
|
||||
  编写自定义登录过滤器后,configure Bean修改为如下:
|
||||
```java
|
||||
@Override
|
||||
protected void configure(HttpSecurity http) throws Exception {
|
||||
http
|
||||
.csrf() //跨站
|
||||
.disable() //关闭跨站检测
|
||||
//自定义鉴权过程,无需下面设置
|
||||
.authorizeRequests()//验证策略
|
||||
.antMatchers("/public/**").permitAll()//无需验证路径
|
||||
.antMatchers("/user/**").permitAll()
|
||||
.antMatchers("/login").permitAll()//放行登录
|
||||
.antMatchers(HttpMethod.GET, "/user").hasAuthority("getAllUser")//拥有权限才可访问
|
||||
.antMatchers(HttpMethod.GET, "/user").hasAnyAuthority("1","2")//拥有任一权限即可访问
|
||||
//角色类似,hasRole(),hasAnyRole()
|
||||
.anyRequest().authenticated()
|
||||
.and()
|
||||
//自定义异常处理
|
||||
.exceptionHandling()
|
||||
.authenticationEntryPoint(myAuthenticationEntryPoint)//未登录处理
|
||||
.accessDeniedHandler(myAccessDeniedHandler)//权限不足处理
|
||||
.and()
|
||||
//加入自定义登录校验
|
||||
.addFilterBefore(myUsernamePasswordAuthentication(),UsernamePasswordAuthenticationFilter.class)
|
||||
.rememberMe()//默认放在内存中
|
||||
.rememberMeServices(rememberMeServices())
|
||||
.key("INTERNAL_SECRET_KEY")
|
||||
// 重写usernamepasswordauthenticationFilter后,下面的formLogin()设置将失效,需要手动设置到个性化过滤器中
|
||||
// .and()
|
||||
// .formLogin()
|
||||
// .loginPage("/public/unlogin") //未登录跳转页面,设置了authenticationentrypoint后无需设置未登录跳转页面
|
||||
// .loginProcessingUrl("/public/login")//登录api
|
||||
// .successForwardUrl("/success")
|
||||
// .failureForwardUrl("/failed")
|
||||
// .usernameParameter("id")
|
||||
// .passwordParameter("password")
|
||||
// .failureHandler(myAuthFailedHandle) //登录失败处理
|
||||
// .successHandler(myAuthSuccessHandle)//登录成功处理
|
||||
// .usernameParameter("id")
|
||||
.and()
|
||||
.logout()//自定义登出
|
||||
.logoutUrl("/public/logout")
|
||||
.logoutSuccessUrl("public/logoutSuccess")
|
||||
.logoutSuccessHandler(myLogoutSuccessHandle);
|
||||
}
|
||||
```
|
||||
然后再编写Bean,代码如下:
|
||||
```java
|
||||
@Bean
|
||||
public MyUsernamePasswordAuthentication myUsernamePasswordAuthentication(){
|
||||
MyUsernamePasswordAuthentication myUsernamePasswordAuthentication = new MyUsernamePasswordAuthentication();
|
||||
myUsernamePasswordAuthentication.setAuthenticationFailureHandler(myAuthFailedHandle); //设置登录失败处理类
|
||||
myUsernamePasswordAuthentication.setAuthenticationSuccessHandler(myAuthSuccessHandle);//设置登录成功处理类
|
||||
myUsernamePasswordAuthentication.setFilterProcessesUrl("/public/login");
|
||||
myUsernamePasswordAuthentication.setRememberMeServices(rememberMeServices()); //设置记住我
|
||||
myUsernamePasswordAuthentication.setUsernameParameter("id");
|
||||
myUsernamePasswordAuthentication.setPasswordParameter("password");
|
||||
return myUsernamePasswordAuthentication;
|
||||
}
|
||||
```
|
||||
  紧接着上一篇,上一篇中登录验证都由security帮助我们完成了,如果我们想要增加一个验证码登录或者其它的自定义校验就没办法了,因此这一篇讲解如何实现这个功能。
|
||||
|
||||
##一、 实现自定义登录校验类
|
||||
|
||||
  继承UsernamePasswordAuthenticationFilter类来拓展登录校验,代码如下:
|
||||
```java
|
||||
public class MyUsernamePasswordAuthentication extends UsernamePasswordAuthenticationFilter{
|
||||
|
||||
private Logger log = LoggerFactory.getLogger(this.getClass());
|
||||
|
||||
@Override
|
||||
public Authentication attemptAuthentication(HttpServletRequest request, HttpServletResponse response)
|
||||
throws AuthenticationException {
|
||||
//我们可以在这里进行额外的验证,如果验证失败抛出继承AuthenticationException的自定义错误。
|
||||
log.info("在这里进行验证码判断");
|
||||
//只要最终的验证是账号密码形式就无需修改后续过程
|
||||
return super.attemptAuthentication(request, response);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setAuthenticationManager(AuthenticationManager authenticationManager) {
|
||||
// TODO Auto-generated method stub
|
||||
super.setAuthenticationManager(authenticationManager);
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
##二、 将自定义登录配置到security中
|
||||
  编写自定义登录过滤器后,configure Bean修改为如下:
|
||||
```java
|
||||
@Override
|
||||
protected void configure(HttpSecurity http) throws Exception {
|
||||
http
|
||||
.csrf() //跨站
|
||||
.disable() //关闭跨站检测
|
||||
//自定义鉴权过程,无需下面设置
|
||||
.authorizeRequests()//验证策略
|
||||
.antMatchers("/public/**").permitAll()//无需验证路径
|
||||
.antMatchers("/user/**").permitAll()
|
||||
.antMatchers("/login").permitAll()//放行登录
|
||||
.antMatchers(HttpMethod.GET, "/user").hasAuthority("getAllUser")//拥有权限才可访问
|
||||
.antMatchers(HttpMethod.GET, "/user").hasAnyAuthority("1","2")//拥有任一权限即可访问
|
||||
//角色类似,hasRole(),hasAnyRole()
|
||||
.anyRequest().authenticated()
|
||||
.and()
|
||||
//自定义异常处理
|
||||
.exceptionHandling()
|
||||
.authenticationEntryPoint(myAuthenticationEntryPoint)//未登录处理
|
||||
.accessDeniedHandler(myAccessDeniedHandler)//权限不足处理
|
||||
.and()
|
||||
//加入自定义登录校验
|
||||
.addFilterBefore(myUsernamePasswordAuthentication(),UsernamePasswordAuthenticationFilter.class)
|
||||
.rememberMe()//默认放在内存中
|
||||
.rememberMeServices(rememberMeServices())
|
||||
.key("INTERNAL_SECRET_KEY")
|
||||
// 重写usernamepasswordauthenticationFilter后,下面的formLogin()设置将失效,需要手动设置到个性化过滤器中
|
||||
// .and()
|
||||
// .formLogin()
|
||||
// .loginPage("/public/unlogin") //未登录跳转页面,设置了authenticationentrypoint后无需设置未登录跳转页面
|
||||
// .loginProcessingUrl("/public/login")//登录api
|
||||
// .successForwardUrl("/success")
|
||||
// .failureForwardUrl("/failed")
|
||||
// .usernameParameter("id")
|
||||
// .passwordParameter("password")
|
||||
// .failureHandler(myAuthFailedHandle) //登录失败处理
|
||||
// .successHandler(myAuthSuccessHandle)//登录成功处理
|
||||
// .usernameParameter("id")
|
||||
.and()
|
||||
.logout()//自定义登出
|
||||
.logoutUrl("/public/logout")
|
||||
.logoutSuccessUrl("public/logoutSuccess")
|
||||
.logoutSuccessHandler(myLogoutSuccessHandle);
|
||||
}
|
||||
```
|
||||
然后再编写Bean,代码如下:
|
||||
```java
|
||||
@Bean
|
||||
public MyUsernamePasswordAuthentication myUsernamePasswordAuthentication(){
|
||||
MyUsernamePasswordAuthentication myUsernamePasswordAuthentication = new MyUsernamePasswordAuthentication();
|
||||
myUsernamePasswordAuthentication.setAuthenticationFailureHandler(myAuthFailedHandle); //设置登录失败处理类
|
||||
myUsernamePasswordAuthentication.setAuthenticationSuccessHandler(myAuthSuccessHandle);//设置登录成功处理类
|
||||
myUsernamePasswordAuthentication.setFilterProcessesUrl("/public/login");
|
||||
myUsernamePasswordAuthentication.setRememberMeServices(rememberMeServices()); //设置记住我
|
||||
myUsernamePasswordAuthentication.setUsernameParameter("id");
|
||||
myUsernamePasswordAuthentication.setPasswordParameter("password");
|
||||
return myUsernamePasswordAuthentication;
|
||||
}
|
||||
```
|
||||
完成。
|
File diff suppressed because one or more lines are too long
@ -1,205 +1,205 @@
|
||||
<h3 id="#一、背景">一、背景</h3>
|
||||
|
||||
  我们都知道http协议只能浏览器单方面向服务器发起请求获得响应,服务器不能主动向浏览器推送消息。想要实现浏览器的主动推送有两种主流实现方式:
|
||||
|
||||
- 轮询:缺点很多,但是实现简单
|
||||
- websocket:在浏览器和服务器之间建立tcp连接,实现全双工通信
|
||||
|
||||
  springboot使用websocket有两种方式,一种是实现简单的websocket,另外一种是实现**STOMP**协议。这一篇实现简单的websocket,STOMP下一篇在讲。
|
||||
|
||||
**注意:如下都是针对使用springboot内置容器**
|
||||
|
||||
<h3 id="二、实现">二、实现</h3>
|
||||
|
||||
<h4 id="1、依赖引入">1、依赖引入</h4>
|
||||
|
||||
  要使用websocket关键是`@ServerEndpoint`这个注解,该注解是javaee标准中的注解,tomcat7及以上已经实现了,如果使用传统方法将war包部署到tomcat中,只需要引入如下javaee标准依赖即可:
|
||||
```xml
|
||||
<dependency>
|
||||
<groupId>javax</groupId>
|
||||
<artifactId>javaee-api</artifactId>
|
||||
<version>7.0</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
```
|
||||
如使用springboot内置容器,无需引入,springboot已经做了包含。我们只需引入如下依赖即可:
|
||||
```xml
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-websocket</artifactId>
|
||||
<version>1.5.3.RELEASE</version>
|
||||
<type>pom</type>
|
||||
</dependency>
|
||||
```
|
||||
|
||||
<h4 id="2、注入Bean">2、注入Bean</h4>
|
||||
|
||||
  首先注入一个**ServerEndpointExporter**Bean,该Bean会自动注册使用@ServerEndpoint注解申明的websocket endpoint。代码如下:
|
||||
```java
|
||||
@Configuration
|
||||
public class WebSocketConfig {
|
||||
@Bean
|
||||
public ServerEndpointExporter serverEndpointExporter(){
|
||||
return new ServerEndpointExporter();
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
<h4 id="3、申明endpoint">3、申明endpoint</h4>
|
||||
|
||||
  建立**MyWebSocket.java**类,在该类中处理websocket逻辑
|
||||
```java
|
||||
@ServerEndpoint(value = "/websocket") //接受websocket请求路径
|
||||
@Component //注册到spring容器中
|
||||
public class MyWebSocket {
|
||||
|
||||
|
||||
//保存所有在线socket连接
|
||||
private static Map<String,MyWebSocket> webSocketMap = new LinkedHashMap<>();
|
||||
|
||||
//记录当前在线数目
|
||||
private static int count=0;
|
||||
|
||||
//当前连接(每个websocket连入都会创建一个MyWebSocket实例
|
||||
private Session session;
|
||||
|
||||
private Logger log = LoggerFactory.getLogger(this.getClass());
|
||||
//处理连接建立
|
||||
@OnOpen
|
||||
public void onOpen(Session session){
|
||||
this.session=session;
|
||||
webSocketMap.put(session.getId(),this);
|
||||
addCount();
|
||||
log.info("新的连接加入:{}",session.getId());
|
||||
}
|
||||
|
||||
//接受消息
|
||||
@OnMessage
|
||||
public void onMessage(String message,Session session){
|
||||
log.info("收到客户端{}消息:{}",session.getId(),message);
|
||||
try{
|
||||
this.sendMessage("收到消息:"+message);
|
||||
}catch (Exception e){
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
//处理错误
|
||||
@OnError
|
||||
public void onError(Throwable error,Session session){
|
||||
log.info("发生错误{},{}",session.getId(),error.getMessage());
|
||||
}
|
||||
|
||||
//处理连接关闭
|
||||
@OnClose
|
||||
public void onClose(){
|
||||
webSocketMap.remove(this.session.getId());
|
||||
reduceCount();
|
||||
log.info("连接关闭:{}",this.session.getId());
|
||||
}
|
||||
|
||||
//群发消息
|
||||
|
||||
//发送消息
|
||||
public void sendMessage(String message) throws IOException {
|
||||
this.session.getBasicRemote().sendText(message);
|
||||
}
|
||||
|
||||
//广播消息
|
||||
public static void broadcast(){
|
||||
MyWebSocket.webSocketMap.forEach((k,v)->{
|
||||
try{
|
||||
v.sendMessage("这是一条测试广播");
|
||||
}catch (Exception e){
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
//获取在线连接数目
|
||||
public static int getCount(){
|
||||
return count;
|
||||
}
|
||||
|
||||
//操作count,使用synchronized确保线程安全
|
||||
public static synchronized void addCount(){
|
||||
MyWebSocket.count++;
|
||||
}
|
||||
|
||||
public static synchronized void reduceCount(){
|
||||
MyWebSocket.count--;
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
<h4 id="4、客户的实现">4、客户的实现</h4>
|
||||
|
||||
  客户端使用h5原生websocket,部分浏览器可能不支持。代码如下:
|
||||
```html
|
||||
<html>
|
||||
|
||||
<head>
|
||||
<title>websocket测试</title>
|
||||
<meta charset="utf-8">
|
||||
</head>
|
||||
|
||||
<body>
|
||||
<button onclick="sendMessage()">测试</button>
|
||||
<script>
|
||||
let socket = new WebSocket("ws://localhost:8080/websocket");
|
||||
socket.onerror = err => {
|
||||
console.log(err);
|
||||
};
|
||||
socket.onopen = event => {
|
||||
console.log(event);
|
||||
};
|
||||
socket.onmessage = mess => {
|
||||
console.log(mess);
|
||||
};
|
||||
socket.onclose = () => {
|
||||
console.log("连接关闭");
|
||||
};
|
||||
|
||||
function sendMessage() {
|
||||
if (socket.readyState === 1)
|
||||
socket.send("这是一个测试数据");
|
||||
else
|
||||
alert("尚未建立websocket连接");
|
||||
}
|
||||
</script>
|
||||
</body>
|
||||
|
||||
</html>
|
||||
```
|
||||
|
||||
<h3 id="三、测试">三、测试</h3>
|
||||
|
||||
  建立一个controller测试群发,代码如下:
|
||||
```java
|
||||
@RestController
|
||||
public class HomeController {
|
||||
|
||||
@GetMapping("/broadcast")
|
||||
public void broadcast(){
|
||||
MyWebSocket.broadcast();
|
||||
}
|
||||
}
|
||||
```
|
||||
然后打开上面的html,可以看到浏览器和服务器都输出连接成功的信息:
|
||||
```
|
||||
浏览器:
|
||||
Event {isTrusted: true, type: "open", target: WebSocket, currentTarget: WebSocket, eventPhase: 2, …}
|
||||
|
||||
服务端:
|
||||
2018-08-01 14:05:34.727 INFO 12708 --- [nio-8080-exec-1] com.fxb.h5websocket.MyWebSocket : 新的连接加入:0
|
||||
```
|
||||
点击测试按钮,可在服务端看到如下输出:
|
||||
```
|
||||
2018-08-01 15:00:34.644 INFO 12708 --- [nio-8080-exec-6] com.fxb.h5websocket.MyWebSocket : 收到客户端2消息:这是一个测试数据
|
||||
```
|
||||
再次打开html页面,这样就有两个websocket客户端,然后在浏览器访问[localhost:8080/broadcast](localhost:8080/broadcast)测试群发功能,每个客户端都会输出如下信息:
|
||||
```
|
||||
MessageEvent {isTrusted: true, data: "这是一条测试广播", origin: "ws://localhost:8080", lastEventId: "", source: null, …}
|
||||
```
|
||||
<br/>
|
||||
  源码可在[github]()上下载,记得点赞,star哦
|
||||
|
||||
<h3 id="#一、背景">一、背景</h3>
|
||||
|
||||
  我们都知道http协议只能浏览器单方面向服务器发起请求获得响应,服务器不能主动向浏览器推送消息。想要实现浏览器的主动推送有两种主流实现方式:
|
||||
|
||||
- 轮询:缺点很多,但是实现简单
|
||||
- websocket:在浏览器和服务器之间建立tcp连接,实现全双工通信
|
||||
|
||||
  springboot使用websocket有两种方式,一种是实现简单的websocket,另外一种是实现**STOMP**协议。这一篇实现简单的websocket,STOMP下一篇在讲。
|
||||
|
||||
**注意:如下都是针对使用springboot内置容器**
|
||||
|
||||
<h3 id="二、实现">二、实现</h3>
|
||||
|
||||
<h4 id="1、依赖引入">1、依赖引入</h4>
|
||||
|
||||
  要使用websocket关键是`@ServerEndpoint`这个注解,该注解是javaee标准中的注解,tomcat7及以上已经实现了,如果使用传统方法将war包部署到tomcat中,只需要引入如下javaee标准依赖即可:
|
||||
```xml
|
||||
<dependency>
|
||||
<groupId>javax</groupId>
|
||||
<artifactId>javaee-api</artifactId>
|
||||
<version>7.0</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
```
|
||||
如使用springboot内置容器,无需引入,springboot已经做了包含。我们只需引入如下依赖即可:
|
||||
```xml
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-websocket</artifactId>
|
||||
<version>1.5.3.RELEASE</version>
|
||||
<type>pom</type>
|
||||
</dependency>
|
||||
```
|
||||
|
||||
<h4 id="2、注入Bean">2、注入Bean</h4>
|
||||
|
||||
  首先注入一个**ServerEndpointExporter**Bean,该Bean会自动注册使用@ServerEndpoint注解申明的websocket endpoint。代码如下:
|
||||
```java
|
||||
@Configuration
|
||||
public class WebSocketConfig {
|
||||
@Bean
|
||||
public ServerEndpointExporter serverEndpointExporter(){
|
||||
return new ServerEndpointExporter();
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
<h4 id="3、申明endpoint">3、申明endpoint</h4>
|
||||
|
||||
  建立**MyWebSocket.java**类,在该类中处理websocket逻辑
|
||||
```java
|
||||
@ServerEndpoint(value = "/websocket") //接受websocket请求路径
|
||||
@Component //注册到spring容器中
|
||||
public class MyWebSocket {
|
||||
|
||||
|
||||
//保存所有在线socket连接
|
||||
private static Map<String,MyWebSocket> webSocketMap = new LinkedHashMap<>();
|
||||
|
||||
//记录当前在线数目
|
||||
private static int count=0;
|
||||
|
||||
//当前连接(每个websocket连入都会创建一个MyWebSocket实例
|
||||
private Session session;
|
||||
|
||||
private Logger log = LoggerFactory.getLogger(this.getClass());
|
||||
//处理连接建立
|
||||
@OnOpen
|
||||
public void onOpen(Session session){
|
||||
this.session=session;
|
||||
webSocketMap.put(session.getId(),this);
|
||||
addCount();
|
||||
log.info("新的连接加入:{}",session.getId());
|
||||
}
|
||||
|
||||
//接受消息
|
||||
@OnMessage
|
||||
public void onMessage(String message,Session session){
|
||||
log.info("收到客户端{}消息:{}",session.getId(),message);
|
||||
try{
|
||||
this.sendMessage("收到消息:"+message);
|
||||
}catch (Exception e){
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
//处理错误
|
||||
@OnError
|
||||
public void onError(Throwable error,Session session){
|
||||
log.info("发生错误{},{}",session.getId(),error.getMessage());
|
||||
}
|
||||
|
||||
//处理连接关闭
|
||||
@OnClose
|
||||
public void onClose(){
|
||||
webSocketMap.remove(this.session.getId());
|
||||
reduceCount();
|
||||
log.info("连接关闭:{}",this.session.getId());
|
||||
}
|
||||
|
||||
//群发消息
|
||||
|
||||
//发送消息
|
||||
public void sendMessage(String message) throws IOException {
|
||||
this.session.getBasicRemote().sendText(message);
|
||||
}
|
||||
|
||||
//广播消息
|
||||
public static void broadcast(){
|
||||
MyWebSocket.webSocketMap.forEach((k,v)->{
|
||||
try{
|
||||
v.sendMessage("这是一条测试广播");
|
||||
}catch (Exception e){
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
//获取在线连接数目
|
||||
public static int getCount(){
|
||||
return count;
|
||||
}
|
||||
|
||||
//操作count,使用synchronized确保线程安全
|
||||
public static synchronized void addCount(){
|
||||
MyWebSocket.count++;
|
||||
}
|
||||
|
||||
public static synchronized void reduceCount(){
|
||||
MyWebSocket.count--;
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
<h4 id="4、客户的实现">4、客户的实现</h4>
|
||||
|
||||
  客户端使用h5原生websocket,部分浏览器可能不支持。代码如下:
|
||||
```html
|
||||
<html>
|
||||
|
||||
<head>
|
||||
<title>websocket测试</title>
|
||||
<meta charset="utf-8">
|
||||
</head>
|
||||
|
||||
<body>
|
||||
<button onclick="sendMessage()">测试</button>
|
||||
<script>
|
||||
let socket = new WebSocket("ws://localhost:8080/websocket");
|
||||
socket.onerror = err => {
|
||||
console.log(err);
|
||||
};
|
||||
socket.onopen = event => {
|
||||
console.log(event);
|
||||
};
|
||||
socket.onmessage = mess => {
|
||||
console.log(mess);
|
||||
};
|
||||
socket.onclose = () => {
|
||||
console.log("连接关闭");
|
||||
};
|
||||
|
||||
function sendMessage() {
|
||||
if (socket.readyState === 1)
|
||||
socket.send("这是一个测试数据");
|
||||
else
|
||||
alert("尚未建立websocket连接");
|
||||
}
|
||||
</script>
|
||||
</body>
|
||||
|
||||
</html>
|
||||
```
|
||||
|
||||
<h3 id="三、测试">三、测试</h3>
|
||||
|
||||
  建立一个controller测试群发,代码如下:
|
||||
```java
|
||||
@RestController
|
||||
public class HomeController {
|
||||
|
||||
@GetMapping("/broadcast")
|
||||
public void broadcast(){
|
||||
MyWebSocket.broadcast();
|
||||
}
|
||||
}
|
||||
```
|
||||
然后打开上面的html,可以看到浏览器和服务器都输出连接成功的信息:
|
||||
```
|
||||
浏览器:
|
||||
Event {isTrusted: true, type: "open", target: WebSocket, currentTarget: WebSocket, eventPhase: 2, …}
|
||||
|
||||
服务端:
|
||||
2018-08-01 14:05:34.727 INFO 12708 --- [nio-8080-exec-1] com.fxb.h5websocket.MyWebSocket : 新的连接加入:0
|
||||
```
|
||||
点击测试按钮,可在服务端看到如下输出:
|
||||
```
|
||||
2018-08-01 15:00:34.644 INFO 12708 --- [nio-8080-exec-6] com.fxb.h5websocket.MyWebSocket : 收到客户端2消息:这是一个测试数据
|
||||
```
|
||||
再次打开html页面,这样就有两个websocket客户端,然后在浏览器访问[localhost:8080/broadcast](localhost:8080/broadcast)测试群发功能,每个客户端都会输出如下信息:
|
||||
```
|
||||
MessageEvent {isTrusted: true, data: "这是一条测试广播", origin: "ws://localhost:8080", lastEventId: "", source: null, …}
|
||||
```
|
||||
<br/>
|
||||
  源码可在[github]()上下载,记得点赞,star哦
|
||||
|
||||
|
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
@ -1,153 +1,153 @@
|
||||
  单个MQ节点总是不可靠的,一旦该节点出现故障,MQ服务就不可用了,势必会产生较大的损失。这里记录activeMQ如何开启主从备份,一旦master(主节点故障),slave(从节点)立即提供服务,实现原理是运行多个MQ使用同一个持久化数据源,这里以jdbc数据源为例。同一时间只有一个节点(节点A)能够抢到数据库的表锁,其他节点进入阻塞状态,一旦A发生错误崩溃,其他节点就会重新获取表锁,获取到锁的节点成为master,其他节点为slave,如果节点A重新启动,也将成为slave。
|
||||
|
||||
主从备份解决了单节点故障的问题,但是同一时间提供服务的只有一个master,显然是不能面对数据量的增长,所以需要一种横向拓展的集群方式来解决面临的问题。
|
||||
|
||||
### 一、activeMQ设置
|
||||
|
||||
#### 1、平台版本说明:
|
||||
|
||||
- 平台:windows
|
||||
- activeMQ版本:5.9.1,[下载地址](https://www.apache.org/dist/activemq/5.9.1/apache-activemq-5.9.1-bin.zip.asc)
|
||||
- jdk版本:1.8
|
||||
|
||||
#### 2、下载jdbc依赖
|
||||
|
||||
  下载下面三个依赖包,放到activeMQ安装目录下的lib文件夹中。
|
||||
|
||||
[mysql驱动](http://central.maven.org/maven2/mysql/mysql-connector-java/5.1.38/mysql-connector-java-5.1.38.jar)
|
||||
|
||||
[dhcp依赖](http://central.maven.org/maven2/org/apache/commons/commons-dbcp2/2.1.1/commons-dbcp2-2.1.1.jar)
|
||||
|
||||
[commons-pool2依赖](http://maven.aliyun.com/nexus/service/local/artifact/maven/redirect?r=jcenter&g=org.apache.commons&a=commons-pool2&v=2.6.0&e=jar)
|
||||
|
||||
###二、主从备份
|
||||
|
||||
####1、修改jettty
|
||||
|
||||
  首先修改conf->jetty.xml,这里是修改activemq的web管理端口,管理界面账号密码默认为admin/admin
|
||||
|
||||
```xml
|
||||
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
|
||||
<!-- the default port number for the web console -->
|
||||
<property name="port" value="8161"/>
|
||||
</bean>
|
||||
```
|
||||
|
||||
####2、修改activemq.xml
|
||||
|
||||
  然后修改conf->activemq.xml
|
||||
|
||||
- 设置连接方式
|
||||
|
||||
默认是下面五种连接方式都打开,这里我们只要tcp,把其他的都注释掉,然后在这里设置activemq的服务端口,可以看到每种连接方式都对应一个端口。
|
||||
|
||||
```xml
|
||||
<transportConnectors>
|
||||
<!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
|
||||
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
|
||||
<!-- <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
|
||||
<transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
|
||||
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
|
||||
<transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/> -->
|
||||
</transportConnectors>
|
||||
```
|
||||
|
||||
|
||||
|
||||
- 设置jdbc数据库
|
||||
|
||||
mysql数据库中创建activemq库,在`broker`标签的下面也就是根标签`beans`的下一级创建一个bean节点,内容如下:
|
||||
|
||||
```xml
|
||||
<bean id="mysql-qs" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close">
|
||||
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
|
||||
<property name="url" value="jdbc:mysql://localhost:3306/activemq?relaxAutoCommit=true"/>
|
||||
<property name="username" value="root"/>
|
||||
<property name="password" value="123456"/>
|
||||
<property name="poolPreparedStatements" value="true"/>
|
||||
</bean>
|
||||
```
|
||||
|
||||
- 设置数据源
|
||||
|
||||
首先修改broker节点,设置name和persistent(默认为true),也可不做修改,修改后如下:
|
||||
|
||||
```xml
|
||||
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="mq1" persistent="true" dataDirectory="${activemq.data}">
|
||||
```
|
||||
|
||||
然后设置持久化方式,使用到我们之前设置的mysql-qs
|
||||
|
||||
```xml
|
||||
<persistenceAdapter>
|
||||
<!-- <kahaDB directory="${activemq.data}/kahadb"/> -->
|
||||
<jdbcPersistenceAdapter dataDirectory="${activemq.base}/activemq-data" dataSource="#mysql-qs"/>
|
||||
</persistenceAdapter>
|
||||
```
|
||||
|
||||
#### 3、启动
|
||||
|
||||
  设置完毕后启动activemq(双击bin中的acitveMQ.jar),启动完成后可以看到如下日志信息:
|
||||
|
||||
```verilog
|
||||
INFO | Using a separate dataSource for locking: org.apache.commons.dbcp2.BasicDataSource@179ece50
|
||||
INFO | Attempting to acquire the exclusive lock to become the Master broker
|
||||
INFO | Becoming the master on dataSource: org.apache.commons.dbcp2.BasicDataSource@179ece50
|
||||
```
|
||||
|
||||
接着我们修改一下tcp服务端口,改为61617,然后重新启动,日志信息如下:
|
||||
|
||||
```verilog
|
||||
INFO | Using a separate dataSource for locking: org.apache.commons.dbcp2.BasicDataSource@179ece50
|
||||
INFO | Attempting to acquire the exclusive lock to become the Master broker
|
||||
INFO | Failed to acquire lock. Sleeping for 10000 milli(s) before trying again...
|
||||
INFO | Failed to acquire lock. Sleeping for 10000 milli(s) before trying again...
|
||||
INFO | Failed to acquire lock. Sleeping for 10000 milli(s) before trying again...
|
||||
INFO | Failed to acquire lock. Sleeping for 10000 milli(s) before trying again...
|
||||
```
|
||||
|
||||
可以看到从节点一直在尝试获取表锁成为主节点,这样一旦主节点失效,从节点能够立刻取代主节点提供服务。这样我们便实现了主从备份。
|
||||
|
||||
### 三、负载均衡
|
||||
|
||||
  activemq可以实现多个mq之间进行路由,假设有两个mq,分别为brokerA和brokerB,当一条消息发送到brokerA的队列test中,有一个消费者连上了brokerB,并且想要获取test队列,brokerA中的test队列就会路由到brokerB上。
|
||||
|
||||
   开启负载均衡需要设置`networkConnectors`节点,静态路由配置如下:
|
||||
|
||||
```xml
|
||||
<networkConnectors>
|
||||
<networkConnector uri="static:failover://(tcp://localhost:61616,tcp://localhost:61617)" duplex="false"/>
|
||||
</networkConnectors>
|
||||
```
|
||||
|
||||
brokerA和brokerB都要设置该配置,以连上对方。
|
||||
|
||||
### 四、测试
|
||||
|
||||
####1、建立mq
|
||||
|
||||
  组建两组broker,每组做主从配置。
|
||||
|
||||
- brokerA:
|
||||
- 主:设置web管理端口8761,设置mq名称`mq`,设置数据库地址为activemq,设置tcp服务端口61616,设置负载均衡静态路由`static:failover://(tcp://localhost:61618,tcp://localhost:61619)`,然后启动
|
||||
- 从:上面的基础上修改tcp服务端口为61617,然后启动
|
||||
- brokerB:
|
||||
- 主:设置web管理端口8762,设置mq名称`mq1`,设置数据库地址activemq1,设置tcp服务端口61618,设置负载均衡静态路由`static:failover://(tcp://localhost:61616,tcp://localhost:61617)`,然后启动
|
||||
- 从:上面的基础上修改tcp服务端口为61619,然后启动
|
||||
|
||||
#### 2、springboot测试
|
||||
|
||||
   沿用上一篇的项目,修改配置文件的broker-url为`failover:(tcp://localhost:61616,tcp://localhost:61617,tcp://localhost:61618,tcp://localhost:61619)`,然后启动项目访问会在控制台看到如下日志:
|
||||
|
||||
```java
|
||||
2018-07-31 15:09:25.076 INFO 12780 --- [ActiveMQ Task-1] o.a.a.t.failover.FailoverTransport : Successfully connected to tcp://localhost:61618
|
||||
1I'm from queue1:hello
|
||||
2018-07-31 15:09:26.599 INFO 12780 --- [ActiveMQ Task-1] o.a.a.t.failover.FailoverTransport : Successfully connected to tcp://localhost:61618
|
||||
2I'm from queue1:hello
|
||||
2018-07-31 15:09:29.002 INFO 12780 --- [ActiveMQ Task-1] o.a.a.t.failover.FailoverTransport : Successfully connected to tcp://localhost:61616
|
||||
1I'm from queue1:hello
|
||||
2018-07-31 15:09:34.931 INFO 12780 --- [ActiveMQ Task-1] o.a.a.t.failover.FailoverTransport : Successfully connected to tcp://localhost:61618
|
||||
2I'm from queue1:hello
|
||||
```
|
||||
|
||||
  单个MQ节点总是不可靠的,一旦该节点出现故障,MQ服务就不可用了,势必会产生较大的损失。这里记录activeMQ如何开启主从备份,一旦master(主节点故障),slave(从节点)立即提供服务,实现原理是运行多个MQ使用同一个持久化数据源,这里以jdbc数据源为例。同一时间只有一个节点(节点A)能够抢到数据库的表锁,其他节点进入阻塞状态,一旦A发生错误崩溃,其他节点就会重新获取表锁,获取到锁的节点成为master,其他节点为slave,如果节点A重新启动,也将成为slave。
|
||||
|
||||
主从备份解决了单节点故障的问题,但是同一时间提供服务的只有一个master,显然是不能面对数据量的增长,所以需要一种横向拓展的集群方式来解决面临的问题。
|
||||
|
||||
### 一、activeMQ设置
|
||||
|
||||
#### 1、平台版本说明:
|
||||
|
||||
- 平台:windows
|
||||
- activeMQ版本:5.9.1,[下载地址](https://www.apache.org/dist/activemq/5.9.1/apache-activemq-5.9.1-bin.zip.asc)
|
||||
- jdk版本:1.8
|
||||
|
||||
#### 2、下载jdbc依赖
|
||||
|
||||
  下载下面三个依赖包,放到activeMQ安装目录下的lib文件夹中。
|
||||
|
||||
[mysql驱动](http://central.maven.org/maven2/mysql/mysql-connector-java/5.1.38/mysql-connector-java-5.1.38.jar)
|
||||
|
||||
[dhcp依赖](http://central.maven.org/maven2/org/apache/commons/commons-dbcp2/2.1.1/commons-dbcp2-2.1.1.jar)
|
||||
|
||||
[commons-pool2依赖](http://maven.aliyun.com/nexus/service/local/artifact/maven/redirect?r=jcenter&g=org.apache.commons&a=commons-pool2&v=2.6.0&e=jar)
|
||||
|
||||
###二、主从备份
|
||||
|
||||
####1、修改jettty
|
||||
|
||||
  首先修改conf->jetty.xml,这里是修改activemq的web管理端口,管理界面账号密码默认为admin/admin
|
||||
|
||||
```xml
|
||||
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
|
||||
<!-- the default port number for the web console -->
|
||||
<property name="port" value="8161"/>
|
||||
</bean>
|
||||
```
|
||||
|
||||
####2、修改activemq.xml
|
||||
|
||||
  然后修改conf->activemq.xml
|
||||
|
||||
- 设置连接方式
|
||||
|
||||
默认是下面五种连接方式都打开,这里我们只要tcp,把其他的都注释掉,然后在这里设置activemq的服务端口,可以看到每种连接方式都对应一个端口。
|
||||
|
||||
```xml
|
||||
<transportConnectors>
|
||||
<!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
|
||||
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
|
||||
<!-- <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
|
||||
<transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
|
||||
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
|
||||
<transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/> -->
|
||||
</transportConnectors>
|
||||
```
|
||||
|
||||
|
||||
|
||||
- 设置jdbc数据库
|
||||
|
||||
mysql数据库中创建activemq库,在`broker`标签的下面也就是根标签`beans`的下一级创建一个bean节点,内容如下:
|
||||
|
||||
```xml
|
||||
<bean id="mysql-qs" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close">
|
||||
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
|
||||
<property name="url" value="jdbc:mysql://localhost:3306/activemq?relaxAutoCommit=true"/>
|
||||
<property name="username" value="root"/>
|
||||
<property name="password" value="123456"/>
|
||||
<property name="poolPreparedStatements" value="true"/>
|
||||
</bean>
|
||||
```
|
||||
|
||||
- 设置数据源
|
||||
|
||||
首先修改broker节点,设置name和persistent(默认为true),也可不做修改,修改后如下:
|
||||
|
||||
```xml
|
||||
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="mq1" persistent="true" dataDirectory="${activemq.data}">
|
||||
```
|
||||
|
||||
然后设置持久化方式,使用到我们之前设置的mysql-qs
|
||||
|
||||
```xml
|
||||
<persistenceAdapter>
|
||||
<!-- <kahaDB directory="${activemq.data}/kahadb"/> -->
|
||||
<jdbcPersistenceAdapter dataDirectory="${activemq.base}/activemq-data" dataSource="#mysql-qs"/>
|
||||
</persistenceAdapter>
|
||||
```
|
||||
|
||||
#### 3、启动
|
||||
|
||||
  设置完毕后启动activemq(双击bin中的acitveMQ.jar),启动完成后可以看到如下日志信息:
|
||||
|
||||
```verilog
|
||||
INFO | Using a separate dataSource for locking: org.apache.commons.dbcp2.BasicDataSource@179ece50
|
||||
INFO | Attempting to acquire the exclusive lock to become the Master broker
|
||||
INFO | Becoming the master on dataSource: org.apache.commons.dbcp2.BasicDataSource@179ece50
|
||||
```
|
||||
|
||||
接着我们修改一下tcp服务端口,改为61617,然后重新启动,日志信息如下:
|
||||
|
||||
```verilog
|
||||
INFO | Using a separate dataSource for locking: org.apache.commons.dbcp2.BasicDataSource@179ece50
|
||||
INFO | Attempting to acquire the exclusive lock to become the Master broker
|
||||
INFO | Failed to acquire lock. Sleeping for 10000 milli(s) before trying again...
|
||||
INFO | Failed to acquire lock. Sleeping for 10000 milli(s) before trying again...
|
||||
INFO | Failed to acquire lock. Sleeping for 10000 milli(s) before trying again...
|
||||
INFO | Failed to acquire lock. Sleeping for 10000 milli(s) before trying again...
|
||||
```
|
||||
|
||||
可以看到从节点一直在尝试获取表锁成为主节点,这样一旦主节点失效,从节点能够立刻取代主节点提供服务。这样我们便实现了主从备份。
|
||||
|
||||
### 三、负载均衡
|
||||
|
||||
  activemq可以实现多个mq之间进行路由,假设有两个mq,分别为brokerA和brokerB,当一条消息发送到brokerA的队列test中,有一个消费者连上了brokerB,并且想要获取test队列,brokerA中的test队列就会路由到brokerB上。
|
||||
|
||||
   开启负载均衡需要设置`networkConnectors`节点,静态路由配置如下:
|
||||
|
||||
```xml
|
||||
<networkConnectors>
|
||||
<networkConnector uri="static:failover://(tcp://localhost:61616,tcp://localhost:61617)" duplex="false"/>
|
||||
</networkConnectors>
|
||||
```
|
||||
|
||||
brokerA和brokerB都要设置该配置,以连上对方。
|
||||
|
||||
### 四、测试
|
||||
|
||||
####1、建立mq
|
||||
|
||||
  组建两组broker,每组做主从配置。
|
||||
|
||||
- brokerA:
|
||||
- 主:设置web管理端口8761,设置mq名称`mq`,设置数据库地址为activemq,设置tcp服务端口61616,设置负载均衡静态路由`static:failover://(tcp://localhost:61618,tcp://localhost:61619)`,然后启动
|
||||
- 从:上面的基础上修改tcp服务端口为61617,然后启动
|
||||
- brokerB:
|
||||
- 主:设置web管理端口8762,设置mq名称`mq1`,设置数据库地址activemq1,设置tcp服务端口61618,设置负载均衡静态路由`static:failover://(tcp://localhost:61616,tcp://localhost:61617)`,然后启动
|
||||
- 从:上面的基础上修改tcp服务端口为61619,然后启动
|
||||
|
||||
#### 2、springboot测试
|
||||
|
||||
   沿用上一篇的项目,修改配置文件的broker-url为`failover:(tcp://localhost:61616,tcp://localhost:61617,tcp://localhost:61618,tcp://localhost:61619)`,然后启动项目访问会在控制台看到如下日志:
|
||||
|
||||
```java
|
||||
2018-07-31 15:09:25.076 INFO 12780 --- [ActiveMQ Task-1] o.a.a.t.failover.FailoverTransport : Successfully connected to tcp://localhost:61618
|
||||
1I'm from queue1:hello
|
||||
2018-07-31 15:09:26.599 INFO 12780 --- [ActiveMQ Task-1] o.a.a.t.failover.FailoverTransport : Successfully connected to tcp://localhost:61618
|
||||
2I'm from queue1:hello
|
||||
2018-07-31 15:09:29.002 INFO 12780 --- [ActiveMQ Task-1] o.a.a.t.failover.FailoverTransport : Successfully connected to tcp://localhost:61616
|
||||
1I'm from queue1:hello
|
||||
2018-07-31 15:09:34.931 INFO 12780 --- [ActiveMQ Task-1] o.a.a.t.failover.FailoverTransport : Successfully connected to tcp://localhost:61618
|
||||
2I'm from queue1:hello
|
||||
```
|
||||
|
||||
证明负载均衡成功。
|
311
springboot系列/读写分离配置/springboot配置读写分离.md
Normal file
311
springboot系列/读写分离配置/springboot配置读写分离.md
Normal file
@ -0,0 +1,311 @@
|
||||
  近日工作任务较轻,有空学习学习技术,遂来研究如果实现读写分离。这里用博客记录下过程,一方面可备日后查看,同时也能分享给大家(网上的资料真的大都是抄来抄去,,还不带格式的,看的真心难受)。
|
||||
|
||||
[完整代码](https://github.com/FleyX/demo-project/tree/master/dxfl)
|
||||
|
||||
## 1、背景
|
||||
|
||||
  一个项目中数据库最基础同时也是最主流的是单机数据库,读写都在一个库中。当用户逐渐增多,单机数据库无法满足性能要求时,就会进行读写分离改造(适用于读多写少),写操作一个库,读操作多个库,通常会做一个数据库集群,开启主从备份,一主多从,以提高读取性能。当用户更多读写分离也无法满足时,就需要分布式数据库了(可能以后会学习怎么弄)。
|
||||
|
||||
  正常情况下读写分离的实现,首先要做一个一主多从的数据库集群,同时还需要进行数据同步。这一篇记录如何用mysql搭建一个一主多次的配置,下一篇记录代码层面如何实现读写分离。
|
||||
|
||||
## 2、搭建一主多从数据库集群
|
||||
|
||||
  主从备份需要多台虚拟机,我是用wmware完整克隆多个实例,注意直接克隆的虚拟机会导致每个数据库的uuid相同,需要修改为不同的uuid。修改方法参考这个:[点击跳转](https://blog.csdn.net/pratise/article/details/80413198)。
|
||||
|
||||
- 主库配置
|
||||
|
||||
主数据库(master)中新建一个用户用于从数据库(slave)读取主数据库二进制日志,sql语句如下:
|
||||
|
||||
```sql
|
||||
mysql> CREATE USER 'repl'@'%' IDENTIFIED BY '123456';#创建用户
|
||||
mysql> GRANT REPLICATION SLAVE ON *.* TO 'repl'@'%';#分配权限
|
||||
mysql>flush privileges; #刷新权限
|
||||
```
|
||||
|
||||
同时修改mysql配置文件开启二进制日志,新增部分如下:
|
||||
|
||||
```sql
|
||||
[mysqld]
|
||||
server-id=1
|
||||
log-bin=master-bin
|
||||
log-bin-index=master-bin.index
|
||||
```
|
||||
|
||||
然后重启数据库,使用`show master status;`语句查看主库状态,如下所示:
|
||||
|
||||
![主库状态](.\读写分离配置\pic1.png)
|
||||
|
||||
- 从库配置
|
||||
|
||||
同样先新增几行配置:
|
||||
|
||||
```sql
|
||||
[mysqld]
|
||||
server-id=2
|
||||
relay-log-index=slave-relay-bin.index
|
||||
relay-log=slave-relay-bin
|
||||
```
|
||||
|
||||
然后重启数据库,使用如下语句连接主库:
|
||||
|
||||
```sql
|
||||
CHANGE MASTER TO
|
||||
MASTER_HOST='192.168.226.5',
|
||||
MASTER_USER='root',
|
||||
MASTER_PASSWORD='123456',
|
||||
MASTER_LOG_FILE='master-bin.000003',
|
||||
MASTER_LOG_POS=154;
|
||||
```
|
||||
|
||||
接着运行`start slave;`开启备份,正常情况如下图所示:Slave_IO_Running和Slave_SQL_Running都为yes。
|
||||
|
||||
![1536223020742](.\读写分离配置\pic2.png)
|
||||
|
||||
可以用这个步骤开启多个从库。
|
||||
|
||||
  默认情况下备份是主库的全部操作都会备份到从库,实际可能需要忽略某些库,可以在主库中增加如下配置:
|
||||
|
||||
```sql
|
||||
# 不同步哪些数据库
|
||||
binlog-ignore-db = mysql
|
||||
binlog-ignore-db = test
|
||||
binlog-ignore-db = information_schema
|
||||
|
||||
# 只同步哪些数据库,除此之外,其他不同步
|
||||
binlog-do-db = game
|
||||
```
|
||||
|
||||
## 3、代码层面进行读写分离
|
||||
|
||||
  代码环境是springboot+mybatis+druib连接池。想要读写分离就需要配置多个数据源,在进行写操作是选择写的数据源,读操作时选择读的数据源。其中有两个关键点:
|
||||
|
||||
- 如何切换数据源
|
||||
- 如何根据不同的方法选择正确的数据源
|
||||
|
||||
### 1)、如何切换数据源
|
||||
|
||||
  通常用springboot时都是使用它的默认配置,只需要在配置文件中定义好连接属性就行了,但是现在我们需要自己来配置了,spring是支持多数据源的,多个datasource放在一个HashMap`TargetDataSource`中,通过`dertermineCurrentLookupKey`获取key来觉定要使用哪个数据源。因此我们的目标就很明确了,建立多个datasource放到TargetDataSource中,同时重写dertermineCurrentLookupKey方法来决定使用哪个key。
|
||||
|
||||
### 2)、如何选择数据源
|
||||
|
||||
  事务一般是注解在Service层的,因此在开始这个service方法调用时要确定数据源,有什么通用方法能够在开始执行一个方法前做操作呢?相信你已经想到了那就是**切面 **。怎么切有两种办法:
|
||||
|
||||
- 注解式,定义一个只读注解,被该数据标注的方法使用读库
|
||||
- 方法名,根据方法名写切点,比如getXXX用读库,setXXX用写库
|
||||
|
||||
### 3)、代码编写
|
||||
|
||||
#### a、编写配置文件,配置两个数据源信息
|
||||
|
||||
  只有必填信息,其他都有默认设置
|
||||
|
||||
```yml
|
||||
mysql:
|
||||
datasource:
|
||||
#读库数目
|
||||
num: 1
|
||||
type-aliases-package: com.example.dxfl.dao
|
||||
mapper-locations: classpath:/mapper/*.xml
|
||||
config-location: classpath:/mybatis-config.xml
|
||||
write:
|
||||
url: jdbc:mysql://192.168.226.5:3306/test?useUnicode=true&characterEncoding=utf-8&useSSL=true
|
||||
username: root
|
||||
password: 123456
|
||||
driver-class-name: com.mysql.jdbc.Driver
|
||||
read:
|
||||
url: jdbc:mysql://192.168.226.6:3306/test?useUnicode=true&characterEncoding=utf-8&useSSL=true
|
||||
username: root
|
||||
password: 123456
|
||||
driver-class-name: com.mysql.jdbc.Driver
|
||||
```
|
||||
|
||||
#### b、编写DbContextHolder类
|
||||
|
||||
  这个类用来设置数据库类别,其中有一个ThreadLocal用来保存每个线程的是使用读库,还是写库。代码如下:
|
||||
|
||||
```java
|
||||
/**
|
||||
* Description 这里切换读/写模式
|
||||
* 原理是利用ThreadLocal保存当前线程是否处于读模式(通过开始READ_ONLY注解在开始操作前设置模式为读模式,
|
||||
* 操作结束后清除该数据,避免内存泄漏,同时也为了后续在该线程进行写操作时任然为读模式
|
||||
* @author fxb
|
||||
* @date 2018-08-31
|
||||
*/
|
||||
public class DbContextHolder {
|
||||
|
||||
private static Logger log = LoggerFactory.getLogger(DbContextHolder.class);
|
||||
public static final String WRITE = "write";
|
||||
public static final String READ = "read";
|
||||
|
||||
private static ThreadLocal<String> contextHolder= new ThreadLocal<>();
|
||||
|
||||
public static void setDbType(String dbType) {
|
||||
if (dbType == null) {
|
||||
log.error("dbType为空");
|
||||
throw new NullPointerException();
|
||||
}
|
||||
log.info("设置dbType为:{}",dbType);
|
||||
contextHolder.set(dbType);
|
||||
}
|
||||
|
||||
public static String getDbType() {
|
||||
return contextHolder.get() == null ? WRITE : contextHolder.get();
|
||||
}
|
||||
|
||||
public static void clearDbType() {
|
||||
contextHolder.remove();
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
#### c、重写determineCurrentLookupKey方法
|
||||
|
||||
  spring在开始进行数据库操作时会通过这个方法来决定使用哪个数据库,因此我们在这里调用上面DbContextHolder类的`getDbType()`方法获取当前操作类别,同时可进行读库的负载均衡,代码如下:
|
||||
|
||||
```java
|
||||
public class MyAbstractRoutingDataSource extends AbstractRoutingDataSource {
|
||||
|
||||
@Value("${mysql.datasource.num}")
|
||||
private int num;
|
||||
|
||||
private final Logger log = LoggerFactory.getLogger(this.getClass());
|
||||
|
||||
@Override
|
||||
protected Object determineCurrentLookupKey() {
|
||||
String typeKey = DbContextHolder.getDbType();
|
||||
if (typeKey == DbContextHolder.WRITE) {
|
||||
log.info("使用了写库");
|
||||
return typeKey;
|
||||
}
|
||||
//使用随机数决定使用哪个读库
|
||||
int sum = NumberUtil.getRandom(1, num);
|
||||
log.info("使用了读库{}", sum);
|
||||
return DbContextHolder.READ + sum;
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
#### d、编写配置类
|
||||
|
||||
  由于要进行读写分离,不能再用springboot的默认配置,我们需要手动来进行配置。首先生成数据源,使用@ConfigurProperties自动生成数据源:
|
||||
|
||||
```java
|
||||
/**
|
||||
* 写数据源
|
||||
*
|
||||
* @Primary 标志这个 Bean 如果在多个同类 Bean 候选时,该 Bean 优先被考虑。
|
||||
* 多数据源配置的时候注意,必须要有一个主数据源,用 @Primary 标志该 Bean
|
||||
*/
|
||||
@Primary
|
||||
@Bean
|
||||
@ConfigurationProperties(prefix = "mysql.datasource.write")
|
||||
public DataSource writeDataSource() {
|
||||
return new DruidDataSource();
|
||||
}
|
||||
```
|
||||
|
||||
读数据源类似,注意有多少个读库就要设置多少个读数据源,Bean名为read+序号。
|
||||
|
||||
  然后设置数据源,使用的是我们之前写的MyAbstractRoutingDataSource类
|
||||
|
||||
```java
|
||||
/**
|
||||
* 设置数据源路由,通过该类中的determineCurrentLookupKey决定使用哪个数据源
|
||||
*/
|
||||
@Bean
|
||||
public AbstractRoutingDataSource routingDataSource() {
|
||||
MyAbstractRoutingDataSource proxy = new MyAbstractRoutingDataSource();
|
||||
Map<Object, Object> targetDataSources = new HashMap<>(2);
|
||||
targetDataSources.put(DbContextHolder.WRITE, writeDataSource());
|
||||
targetDataSources.put(DbContextHolder.READ+"1", read1());
|
||||
proxy.setDefaultTargetDataSource(writeDataSource());
|
||||
proxy.setTargetDataSources(targetDataSources);
|
||||
return proxy;
|
||||
}
|
||||
```
|
||||
|
||||
  接着需要设置sqlSessionFactory
|
||||
|
||||
```java
|
||||
/**
|
||||
* 多数据源需要自己设置sqlSessionFactory
|
||||
*/
|
||||
@Bean
|
||||
public SqlSessionFactory sqlSessionFactory() throws Exception {
|
||||
SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
|
||||
bean.setDataSource(routingDataSource());
|
||||
ResourcePatternResolver resolver = new PathMatchingResourcePatternResolver();
|
||||
// 实体类对应的位置
|
||||
bean.setTypeAliasesPackage(typeAliasesPackage);
|
||||
// mybatis的XML的配置
|
||||
bean.setMapperLocations(resolver.getResources(mapperLocation));
|
||||
bean.setConfigLocation(resolver.getResource(configLocation));
|
||||
return bean.getObject();
|
||||
}
|
||||
```
|
||||
|
||||
  最后还得配置下事务,否则事务不生效
|
||||
|
||||
```java
|
||||
/**
|
||||
* 设置事务,事务需要知道当前使用的是哪个数据源才能进行事务处理
|
||||
*/
|
||||
@Bean
|
||||
public DataSourceTransactionManager dataSourceTransactionManager() {
|
||||
return new DataSourceTransactionManager(routingDataSource());
|
||||
}
|
||||
```
|
||||
|
||||
### 4)、选择数据源
|
||||
|
||||
  多数据源配置好了,但是代码层面如何选择选择数据源呢?这里介绍两种办法:
|
||||
|
||||
#### a、注解式
|
||||
|
||||
  首先定义一个只读注解,被这个注解方法使用读库,其他使用写库,如果项目是中途改造成读写分离可使用这个方法,无需修改业务代码,只要在只读的service方法上加一个注解即可。
|
||||
|
||||
```java
|
||||
@Target({ElementType.METHOD,ElementType.TYPE})
|
||||
@Retention(RetentionPolicy.RUNTIME)
|
||||
public @interface ReadOnly {
|
||||
}
|
||||
```
|
||||
|
||||
  然后写一个切面来切换数据使用哪种数据源,重写getOrder保证本切面优先级高于事务切面优先级,在启动类加上`@EnableTransactionManagement(order = 10) `,为了代码如下:
|
||||
|
||||
```java
|
||||
@Aspect
|
||||
@Component
|
||||
public class ReadOnlyInterceptor implements Ordered {
|
||||
private static final Logger log= LoggerFactory.getLogger(ReadOnlyInterceptor.class);
|
||||
|
||||
@Around("@annotation(readOnly)")
|
||||
public Object setRead(ProceedingJoinPoint joinPoint,ReadOnly readOnly) throws Throwable{
|
||||
try{
|
||||
DbContextHolder.setDbType(DbContextHolder.READ);
|
||||
return joinPoint.proceed();
|
||||
}finally {
|
||||
//清楚DbType一方面为了避免内存泄漏,更重要的是避免对后续在本线程上执行的操作产生影响
|
||||
DbContextHolder.clearDbType();
|
||||
log.info("清除threadLocal");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getOrder() {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
#### b、方法名式
|
||||
|
||||
  这种方法不许要注解,但是需要事务名称按一定规则编写,然后通过切面来设置数据库类别,比如`setXXX`设置为写、`getXXX`设置为读,代码我就不写了,应该都知道怎么写。
|
||||
|
||||
### 4、测试
|
||||
|
||||
  编写好代码来试试结果如何,下面是运行截图:
|
||||
|
||||
![1536312274474](.\读写分离配置\pic3.png)
|
||||
|
||||
  断断续续写了好几天终于是写完了,,,如果有帮助到你,,欢迎star哦,,这里是完整代码地址:[点击跳转](https://github.com/FleyX/demo-project/tree/master/dxfl)
|
BIN
springboot系列/读写分离配置/读写分离配置/pic1.png
Normal file
BIN
springboot系列/读写分离配置/读写分离配置/pic1.png
Normal file
Binary file not shown.
After Width: | Height: | Size: 5.1 KiB |
BIN
springboot系列/读写分离配置/读写分离配置/pic2.png
Normal file
BIN
springboot系列/读写分离配置/读写分离配置/pic2.png
Normal file
Binary file not shown.
After Width: | Height: | Size: 12 KiB |
BIN
springboot系列/读写分离配置/读写分离配置/pic3.png
Normal file
BIN
springboot系列/读写分离配置/读写分离配置/pic3.png
Normal file
Binary file not shown.
After Width: | Height: | Size: 44 KiB |
Loading…
x
Reference in New Issue
Block a user