springboot框架下的实时消息推送

2022-06-22 11:47:56

功能实现:在得到新数据后以最快的速度推送到前台。(springboot框架)

0.修改pom文件 加入需要的jar包

      
       	<dependency>
			<groupId>org.springframework.boot</groupId>
		    <artifactId>spring-boot-starter-websocket</artifactId>
		</dependency>
		<!-- 处理数据用 跟推送无关 -->
		<dependency>
		    <groupId>com.alibaba</groupId>
		    <artifactId>fastjson</artifactId>
		    <version>1.2.47</version>
		</dependency>

1.新建config配置文件(注意目录问题,最好是根目录下)

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
 
/**
 * @Description: socket配置类,往 spring 容器中注入ServerEndpointExporter实例
 */
@Configuration
public class WebSocketConfig {
 
    @Bean
    public ServerEndpointExporter serverEndpointExporter(){
        return new ServerEndpointExporter();
    }
}

2.新建websocket服务类

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;

import org.springframework.stereotype.Component;
 
/**
 * @Description: WebSocket服务端代码,包含接收消息,推送消息等接口
 * userName是接收的参数
 */
@Component
@ServerEndpoint(value = "/ws/{userName}")
public class WebSocketServer {
	static Map<Session, String> map = new HashMap<Session, String>();     //存储系统的用户信息
	 /**
    * @param session
    */
   @OnOpen
   public void OnOpen(@PathParam("userName")String userName,Session session) {
   	System.out.println(userName);
   	map.put(session, userName);
       System.out.println("OnOpen()方法被执行...");
       System.out.println("websocket连接建立成功...");
   }

   /**
    * 连接关闭的方法
    */
   @OnClose
   public void OnClose(Session session) {
   	if (map.containsKey(session)) {
   		map.remove(session);
		}
       System.out.println("OnClose()方法被执行...");
       System.out.println("websocket连接已经关闭...");
   }

   /**
    * 接收消息的方法
    * @param msg
    * @param session
    * @throws InterruptedException 
    */
   @OnMessage
   public void OnMessage(String msg, Session session) throws InterruptedException {
       System.out.println("已从客户端接收消息:" + msg);

       System.out.println("向客户端发送数据完毕...");
   }

	/**
    * 出错的方法,注意参数不能错
    * @param session
    * @param error
    */
   @OnError
   public void OnError(Session session,Throwable error) {
   	if (map.containsKey(session)) {
   		map.remove(session);
		}
       System.out.println("OnError()方法dewd被执行...");
       System.out.println("websocket出错...");
   }
   /**
    * 推送数据的方法
    * @param session map里存的登录信息
    * @param message 推送数据
    */
   public void sendMessage(Session session,String message){
		try {
			session.getBasicRemote().sendText(message);
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
   }
}

3.前台接收推送信息的页面增加如下代码:

<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>Insert title here</title>
<!--  websocket需要以来的js -->
<script src="reconnecting-websocket.js"></script>
</head>
<body>
<h2>推送内容:</h2>
<h3 id="ids"></h3>

<script type="text/javascript">

window.onload=function (){

	var address = "172.20.185.153:9991"
  	var url = "ws://"+address+"/Demon/ws/admin_hl-ceshi";     
    //判断浏览器是否支持websocket
    if('WebSocket' in window) {
        //如果支持,创建websocket对象,注意url格式
        websocket = new ReconnectingWebSocket(url);
    	websocket.debug=true;
    	//websocket.timeoutInterval=2000;重连的最长等待时间
    }else {
        alert('浏览器版本不支持websocket!');
    }
    //定义连接错误的回调方法
    websocket.onerror = function() {
        console.log('websocket连接出错!');
    }
    //定义连接成功的回调方法
    websocket.onopen = function() {
    	console.log('websocket连接成功!');
    }
    //定义websocket关闭的回调方法
    websocket.onclose = function() {
    	console.log('websocket已关闭!')
    }

    //当窗口关闭时,主动去关闭websocket连接
    window.onbeforeunload = function() {
        closeWebSocket();
    }

    //接收到消息的回调方法
    websocket.onmessage = function(event) {
    	var arr = event.data;
    	document.getElementById("ids").innerHTML = arr;
    }

}

</script>
</body>
</html>

到现在为止,启动服务 访问该页面 后台应该就会输出“连接成功“,前台页面的调试模式也会输出连接成功字样,没有或者显示连接出错的话就说明有地方没写对,注意检查url;

4.创建后台的推送类

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;

import javax.websocket.Session;

import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;

import com.alibaba.fastjson.JSON;

@Component//被spring容器管理
@Order(1)//如果多个自定义ApplicationRunner,用来标明执行顺序
public class PushAlarm implements ApplicationRunner {             //服务启动后自动加载该类

	
	 @Override
	    public void run(ApplicationArguments applicationArguments) throws Exception {
	        System.out.println("-------------->" + "项目启动,now=" + new Date());
	        myTimer();
	    }

	    public static void myTimer(){
	    	
	    	//定时任务
	        /*Timer timer = new Timer();
	        timer.schedule(new TimerTask() {
	            @Override
	            public void run() {
	                System.out.println("------定时任务--------");
	            }
	        }, 0, 1000*60);*/
	    	
	    	//阻塞队列
	    	Runnable run = new Runnable() {
				
				@Override
				public void run() {
					while(true){
						RedisClient redisClient = new RedisClient();
						List<String> dataList = redisClient.blpopHdasAlertList();
						if(dataList!=null && dataList.size()>0){
							String message = dataList.get(1);
							System.out.println(message);
														
							Iterator<Entry<Session,String>> iterator = WebSocketServer.map.entrySet().iterator();
							Iterator<String> set = WebSocketServer.map.values().iterator();
							while (set.hasNext()) {
								System.out.println("登录的账户:"+set.next());
							}
							WebSocketServer webSocketServer = new WebSocketServer();
							SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
							System.out.println("推送时间:"+format.format(new Date()));
							while (iterator.hasNext()) {
								Entry<Session,String> entry = iterator.next();
								Session session = entry.getKey();
								String userPower = entry.getValue();
								System.out.println("推送内容:"+message);
								webSocketServer.sendMessage(session,message);
							}
						}
					}					
				}
			};
			Thread newThread = new Thread(run);
			newThread.start();
	    }
}
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

import redis.clients.jedis.JedisShardInfo;
import redis.clients.jedis.ShardedJedis;
import redis.clients.jedis.ShardedJedisPool;

/**
 *
 * redis工具类
 */
public class RedisClient{

	static ShardedJedisPool shardedJedisPool;// 切片连接池

	// 静态初始化块,只在类加载的时候初始化一次,且只能初始化静态成员变量,不能初始化普通变量
	// 1.初始化
	static {
		// 池基本配置
		GenericObjectPoolConfig config = new GenericObjectPoolConfig();
		config.setMaxTotal(30);
		config.setMaxWaitMillis(30000);
		config.setTestOnBorrow(false);
		// slave链接
		List<JedisShardInfo> shards = new ArrayList<JedisShardInfo>();

		String serverIP = "你的ip";
		int port = 6379;
		//String password = "haiyisoft";

		JedisShardInfo jShardInfo = new JedisShardInfo(serverIP, port);
		//jShardInfo.setPassword(password);
		shards.add(jShardInfo);
		// 构造池
		shardedJedisPool = new ShardedJedisPool(config, shards);
	}

	public List<String> blpopHdasAlertList() {
		try {
			ShardedJedis shardedJedis= shardedJedisPool.getResource(); 
			List<String> ll = shardedJedis.blpop(0,"myList");  //redis的队列名称,这里通过命令取
			shardedJedis.close();
			
			if(ll!=null && ll.size()>0){
				return ll;
			}else{
				return null;
			}
		} catch (Exception e) {
			e.printStackTrace();
			return null;
		}
	}

}

这里取数据的地方有两种实现方式(任选其一即可,这里用的redis):
(1)定时任务去数据库查
(2)redis 的阻塞队列 ,没数据时线程阻塞,当队列有数据后自动执行(需要有人维护这个队列)

效果:
在这里插入图片描述
最终可以看到redis的myList列表 每插入一条记录,这里就会刷新该记录 并且redis里新插入的记录会消失。
依赖js文件下载地址:https://download.csdn.net/download/qq_39731741/11202754

  • 作者:角谷
  • 原文链接:https://blog.csdn.net/qq_39731741/article/details/90520105
    更新时间:2022-06-22 11:47:56