阅读量点赞量功能开发

  • 阅读文档时,更新阅读数
  • 文档的点赞功能,更新点赞数
  • 更新电子书的文档数、阅读数、点赞数
  • 有文档被点赞时,前端可以收到通知
  • SpringBoot异步化、WebSocket、RocketMQ

文档阅读数更新

前端点击某篇文档时,doc.view_count + 1

Mybatis的xml里的参数,使用#,尽量避免用$,会有被注入的风险

更新表字段view_count +1,如果原值是空,则+1没效果,也不会报错
update doc set view_count = view_count + 1 where id = xxx

insert的时候,如果写了某个字段,则该字段的default值不起作用。比如insert时没有值,是null,会使default 0失效

文档点赞功能开发

前端在文档内容的下方,增加点赞按钮,点击后doc.vote_count + 1

  • 线程本地变量的使用,参考RequestContext类
    在拦截器中赋值,在service中取值
    优点:线程间互不干扰;同个线程赋值取值,不需要参数传递
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
* 使用nginx做反向代理,需要用该方法才能取到真实的远程IP
* @param request
* @return
*/
public String getRemoteIp(HttpServletRequest request) {
String ip = request.getHeader("x-forwarded-for");
if (ip == null || ip.length() == 0 || "unknown".equalsIgnoreCase(ip)) {
ip = request.getHeader("Proxy-Client-IP");
}
if (ip == null || ip.length() == 0 || "unknown".equalsIgnoreCase(ip)) {
ip = request.getHeader("WL-Proxy-Client-IP");
}
if (ip == null || ip.length() == 0 || "unknown".equalsIgnoreCase(ip)) {
ip = request.getRemoteAddr();
}
return ip;
}
  • 扩展redis分布式锁setnx

电子书信息更新方案调研

电子书信息:文档数、阅读数、点赞数

  • 更新方式:
    • 实时更新
    • 定时批量更新

SpringBoot定时任务示例

  • 启用定时器,不需要引入依赖
  • 两种定时器写法

WikiApplication.java

1
2
3
4
5
6
7
8
9
10
11
12
13
@EnableScheduling
public class WikiApplication {

private static final Logger LOG = LoggerFactory.getLogger(WikiApplication.class);

public static void main(String[] args) {
SpringApplication app = new SpringApplication(WikiApplication.class);
Environment env = app.run(args).getEnvironment();
LOG.info("启动成功!!");
LOG.info("地址: \thttp://127.0.0.1:{}", env.getProperty("server.port"));
}

}

开启定时器@EnableScheduling

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.text.SimpleDateFormat;
import java.util.Date;

@Component
public class TestJob {

private static final Logger LOG = LoggerFactory.getLogger(TestJob.class);

/**
* 固定时间间隔,fixedRate单位毫秒
*/
@Scheduled(fixedRate = 1000)
public void simple() throws InterruptedException {
SimpleDateFormat formatter = new SimpleDateFormat("mm:ss");
String dateString = formatter.format(new Date());
Thread.sleep(2000);
LOG.info("每隔5秒钟执行一次: {}", dateString);
}

/**
* 自定义cron表达式跑批
* 只有等上一次执行完成,下一次才会在下一个时间点执行,错过就错过
*/
@Scheduled(cron = "*/1 * * * * ?")
public void cron() throws InterruptedException {
SimpleDateFormat formatter = new SimpleDateFormat("mm:ss SSS");
String dateString = formatter.format(new Date());
Thread.sleep(1500);
LOG.info("每隔1秒钟执行一次: {}", dateString);
}

}

完成电子书信息定时更新功能

  • 增加定时器,定时执行电子书信息更新SQL
  • 定时策略:每分钟?每小时?
  • 分层调用:job -> service -> mapper
  • 了解cron表达式的写法
  • 扩展:定时任务可以集成异步化+线程池解决单线程问题

DocMapperCust.xml

1
2
3
4
5
<update id="updateEbookInfo">
update ebook t1, (select ebook_id, count(1) doc_count, sum(view_count) view_count, sum(vote_count) vote_count from doc group by ebook_id) t2
set t1.doc_count = t2.doc_count, t1.view_count = t2.view_count, t1.vote_count = t2.vote_count
where t1.id = t2.ebook_id
</update>

日志流水号的使用

  • 现在的日志有什么问题?
  • logback增加自定义参数

使用MDC.put,可以为logback增加自定义参数

aspect/LogAspect.java

1
2
// 增加日志流水号
MDC.put("LOG_ID", String.valueOf(snowFlake.nextId()));

WebSocket使用示例

  • 功能:网站通知
    点赞时,前端收到通知

  • 定时轮询&被动通知

集成WebSocket

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

config/WebSocketConfig.java

1
2
3
4
5
6
7
8
9
10
11
12
13
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

@Configuration
public class WebSocketConfig {

@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}

}

websocket/WebSocketServer.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
package com.javami.wiki.websocket;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.HashMap;

@Component
@ServerEndpoint("/ws/{token}")
public class WebSocketServer {
private static final Logger LOG = LoggerFactory.getLogger(WebSocketServer.class);

/**
* 每个客户端一个token
*/
private String token = "";

private static HashMap<String, Session> map = new HashMap<>();

/**
* 连接成功
*/
@OnOpen
public void onOpen(Session session, @PathParam("token") String token) {
map.put(token, session);
this.token = token;
LOG.info("有新连接:token:{},session id:{},当前连接数:{}", token, session.getId(), map.size());
}

/**
* 连接关闭
*/
@OnClose
public void onClose(Session session) {
map.remove(this.token);
LOG.info("连接关闭,token:{},session id:{}!当前连接数:{}", this.token, session.getId(), map.size());
}

/**
* 收到消息
*/
@OnMessage
public void onMessage(String message, Session session) {
LOG.info("收到消息:{},内容:{}", token, message);
}

/**
* 连接错误
*/
@OnError
public void onError(Session session, Throwable error) {
LOG.error("发生错误", error);
}

/**
* 群发消息
*/
public void sendInfo(String message) {
for (String token : map.keySet()) {
Session session = map.get(token);
try {
session.getBasicRemote().sendText(message);
} catch (IOException e) {
LOG.error("推送消息失败:{},内容:{}", token, message);
}
LOG.info("推送消息:{},内容:{}", token, message);
}
}

}

components/the-footer.vue

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
<template>
<a-layout-footer style="text-align: center">
甲蛙电子书<span v-show="user.id">,欢迎:{{user.name}}</span>
</a-layout-footer>
</template>

<script lang="ts">
import { defineComponent, computed, onMounted } from 'vue';
import store from "@/store";
import {Tool} from "@/util/tool";

export default defineComponent({
name: 'the-footer',
setup() {
const user = computed(() => store.state.user);

let websocket: any;
let token: any;
const onOpen = () => {
console.log('WebSocket连接成功,状态码:', websocket.readyState)
};
const onMessage = (event: any) => {
console.log('WebSocket收到消息:', event.data);
};
const onError = () => {
console.log('WebSocket连接错误,状态码:', websocket.readyState)
};
const onClose = () => {
console.log('WebSocket连接关闭,状态码:', websocket.readyState)
};
const initWebSocket = () => {
// 连接成功
websocket.onopen = onOpen;
// 收到消息的回调
websocket.onmessage = onMessage;
// 连接错误
websocket.onerror = onError;
// 连接关闭的回调
websocket.onclose = onClose;
};

onMounted(() => {
// WebSocket
if ('WebSocket' in window) {
token = Tool.uuid(10);
// 连接地址:ws://127.0.0.1:8880/ws/xxx
websocket = new WebSocket(process.env.VUE_APP_WS_SERVER + '/ws/' + token);
initWebSocket()

// 关闭
// websocket.close();
} else {
alert('当前浏览器 不支持')
}
});

return {
user
}
}
});
</script>

util/tool.ts

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* 随机生成[len]长度的[radix]进制数
* @param len
* @param radix 默认62
* @returns {string}
*/
public static uuid (len: number, radix = 62) {
const chars = '0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz'.split('');
const uuid = [];
radix = radix || chars.length;

for (let i = 0; i < len; i++) {
uuid[i] = chars[0 | Math.random() * radix];
}

return uuid.join('');
}

完成点赞通知功能

  • 点赞时,组装消息内容,往WS推送
  • 前端收到WS消息后,弹出消息内容

WebSocket的测试最好每次都手动刷新页面,因为前端热部署会让WS重复连接

service/DocService.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public void vote(Long id) {
// docMapperCust.increaseVoteCount(id);
// 远程IP+doc.id作为key,24小时内不能重复
String ip = RequestContext.getRemoteAddr();
if (redisUtil.validateRepeat("DOC_VOTE_" + id + "_" + ip, 3600 * 24)) {
docMapperCust.increaseVoteCount(id);
} else {
throw new BusinessException(BusinessExceptionCode.VOTE_REPEAT);
}

// 推送消息
Doc docDb = docMapper.selectByPrimaryKey(id);
webSocketServer.sendInfo("【" + docDb.getName() + "】被点赞!");
}

使用异步化解耦点赞通知功能

SpringBoot异步化的使用

  • 使用@EnableAsync启用异步化功能

  • WebSocket的测试最好每次都手动刷新页面,因为后端热部署会让WS断开

  • 使用事务只需要在方法上加一个@Transactional注解,不需要加@EnableTransactionManagement,如果同时对两张表有增删加的操作,就要考虑加事务,否则会造成数据不准确。

同一个类里A调用B方法,B方法加异步化注解是不生效的,所以我们要新创建一个类,异步化解耦点赞&通知功能。
service/WsService.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
package com.javami.wiki.service;

import com.javami.wiki.websocket.WebSocketServer;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;

@Service
public class WsService {

@Resource
public WebSocketServer webSocketServer;

@Async
public void sendInfo(String message) {
webSocketServer.sendInfo(message);
}
}

使用MQ解耦点赞通知功能

  • 扩展:异步化可以配上线程池,线程池满后,会变成同步
  • 一般是把不是核心的,耗时长的功能做异步化处理。
  • 异步化存在的问题:如果异步线程里的内容耗时较长,业务量又较大,就会消耗大量服务器资源,影响核心功能
  • MQ:和redis一样,是一个中间件,需要单独安装。
  • 常用的MQ有rocketmq, kafka, rabbitmq等
  • MQ关键词:topic、服务端、发送方、消费方
  • autoCreateTopicEnable的作用
    加上这个参数后,SpringBoot可发送任意的topic到RocketMQ,否则需要在RocketMQ里先创建好Topic

使用RocketMQ解耦点赞通知功能,发送和接收调试成功
rocketmq本地启动:

  • mqnamesrv.cmd
  • mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true使用autoCreateTopicEnable,让客户端可以自由创建topic

pom.xml

1
2
3
4
5
6
 <!-- RocketMQ-->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.2</version>
</dependency>

rocketmq/VoteTopicConsumer.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

@Service
@RocketMQMessageListener(consumerGroup = "default", topic = "VOTE_TOPIC")
public class VoteTopicConsumer implements RocketMQListener<MessageExt> {

private static final Logger LOG = LoggerFactory.getLogger(VoteTopicConsumer.class);

@Override
public void onMessage(MessageExt messageExt) {
byte[] body = messageExt.getBody();
LOG.info("ROCKETMQ收到消息:{}", new String(body));
}
}

发MQ
service/DocService.java

1
2
3
4
@Resource
private RocketMQTemplate rocketMQTemplate;

rocketMQTemplate.convertAndSend("VOTE_TOPIC", "【" + docDb.getName() + "】被点赞!");

application.properties

1
2
3
# rocketmq配置
rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group=default

收MQ->推送WS消息->弹出通知
rocketmq/VoteTopicConsumer.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import com.jiawa.wiki.websocket.WebSocketServer;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;

@Service
@RocketMQMessageListener(consumerGroup = "default", topic = "VOTE_TOPIC")
public class VoteTopicConsumer implements RocketMQListener<MessageExt> {

private static final Logger LOG = LoggerFactory.getLogger(VoteTopicConsumer.class);

@Resource
public WebSocketServer webSocketServer;

@Override
public void onMessage(MessageExt messageExt) {
byte[] body = messageExt.getBody();
LOG.info("ROCKETMQ收到消息:{}", new String(body));
webSocketServer.sendInfo(new String(body));
}
}