前言
服务端推送消息我们采用SSE方式进行推送。
一、关于SSE
1. 概念介绍
sse(Server Sent Event),直译为服务器发送事件,顾名思义,也就是客户端可以获取到服务器发送的事件
我们常见的 http 交互方式是客户端发起请求,服务端响应,然后一次请求完毕;但是在 sse 的场景下,客户端发起请求,连接一直保持,服务端有数据就可以返回数据给客户端,这个返回可以是多次间隔的方式
2. 特点分析
SSE 最大的特点,可以简单规划为两个
- 长连接
- 服务端可以向客户端推送信息
了解 websocket 的小伙伴,可能也知道它也是长连接,可以推送信息,但是它们有一个明显的区别
sse 是单通道,只能服务端向客户端发消息;而 webscoket 是双通道
那么为什么有了 webscoket 还要搞出一个 sse 呢?既然存在,必然有着它的优越之处
sse | websocket |
---|---|
http 协议 | websocket 协议 |
轻量,使用简单 | 相对复杂 |
默认支持断线重连 | 需要自己实现断线重连 |
文本传输 | 二进制传输 |
支持自定义发送的消息类型 | - |
3. 应用场景
从 sse 的特点出发,我们可以大致的判断出它的应用场景,需要轮询获取服务端最新数据的 case 下,多半是可以用它的
比如显示当前网站在线的实时人数,法币汇率显示当前实时汇率,电商大促的实时成交额等等…
我们这里是硬件回调服务端接口插入报警数据的同时需要推送给前端进行提示。
二、SpringBoot实现
- ApplicationEvent以及Listener是Spring为我们提供的一个事件监听、订阅的实现。
- 如果一个bean实现ApplicationListener接口在容器中,每次一个ApplicationEvent被发布到ApplicationContext中,这类bean就会收到这些通知。
- 基于此我们可以通过监听数据保存事件来触发发送提示消息的目的。
- AlarmEvent 事件消息体
@Getter
@SetterpublicclassAlarmEvent extends ApplicationEvent{private Long seeId;//alarmPO 为消息体private AlarmPO alarmPO;publicAlarmEvent(Object source, Long seeId, AlarmPO alarmPO){super(source);this.seeId= seeId;this.alarmPO= alarmPO;}}
- AlarmListener 事件监听回调
@ComponentpublicclassAlarmListener{privatestatic Map<Long, SseEmitter> sseEmitters=new Hashtable<>();publicvoidaddSseEmitters(Long id, SseEmitter sseEmitter){
sseEmitters.put(id, sseEmitter);//sseEmitter.onTimeout(() -> sseEmitters.remove(sseId));
sseEmitter.onCompletion(()-> System.out.println("完成!!!"));}
@EventListenerpublicvoiddeployEventHandler(AlarmEvent alaramEvent){
Long id= alaramEvent.getId();
SseEmitter sseEmitter= sseEmitters.get(id);try{
sseEmitter.send(alaramEvent.getAlarmPO().toString());}catch(IOException e){
e.printStackTrace();}
sseEmitter.complete();}}
- 调用方法内触发监听事件回调
@Autowired
ApplicationContext applicationContext;
@Overridepublic StringstrangerEventRcv(StrangerAlarmInDTO alarmDTO){//陌生人闯入数据解析入库
AlarmPO po=newAlarmPO();
Arrays.stream(alarmDTO.getParams().getEvents()).forEach(e->{
po.setName(e.getSrcName());
po.setAlarmType(e.getEventType().toString());
po.setHappenTime(e.getHappenTime());
po.setUrl(e.getData().getFaceRecognitionResult().getSnap().getFaceUrl());
alarmDao.save(po);//触发监听,sse发送消息.此处sse的id为111,只要与前端一致即可
applicationContext.publishEvent(newAlarmEvent(this,111L,po));});//返回指定内容
JsonObject jsonObject=newJsonObject();
jsonObject.addProperty("code","0");
jsonObject.addProperty("msg","success");
jsonObject.addProperty("data","");return jsonObject.toString();}
- controller提供消息订阅接口(即sse通信连接)
/**
* 订阅sse消息
* @param id
* @return
*/
@GetMapping(path="/subscribe",produces={MediaType.TEXT_EVENT_STREAM_VALUE})public SseEmittersubscribe(@RequestParam Long id){
final SseEmitter emitter=newSseEmitter();try{
alarmListener.addSseEmitters(id,emitter);}catch(Exception e){
emitter.completeWithError(e);}return emitter;}
三、前端vue调用
if('EventSource' in window){const that=this;
var source=newEventSource("http://ip:port/api/perception/hik/subscribe?id=111",{
withCredentials:true});
source.onmessage=function(e){
console.log("消息 onmessage==>",e)};
source.onopen=function(e){
console.log("打开连接 onopen==>",e)};
source.onerror=function(e){
console.log('报错连接 οnerrοr==>',e)};}else{
console.log("改浏览器不支持sse");};
四、一些问题
- 前端sse连接不上:可能是跨域了,ie浏览器不支持
- 可以初始化设置sse超时时间
SseEmitter sseEmitter = new SseEmitter(3600_000L);
默认为2分钟,
每两分钟后端会报超时org.springframework.web.context.request.async.AsyncRequestTimeoutException: null
,但会立即重新连接上来 - 前端每次
source.onmessage
后会走source.onerror
然后重新source.onopen
参考:
https://www.cnblogs.com/yihuihui/p/12622729.html
https://www.baidu.com/link?url=v2fhMcQp7-4SD6BwHeMOLdfXvOVU_HaayfFwOMI3Rt1_S5pUwW6mcGr3BAFUwdmv&wd=&eqid=cd4f6bd5001f65f3000000036078f386