发布时间:2025-12-09 20:32:53 浏览次数:4
推送的场景比较多,比如有人关注我的公众号,这时我就会收到一条推送消息,以此来吸引我点击打开应用。
消息推送(push)通常是指网站的运营工作等人员,通过某种工具对用户当前网页或移动设备APP进行的主动消息推送。
消息推送一般又分为web端消息推送和移动端消息推送。
上边的这种属于移动端消息推送,web端消息推送常见的诸如站内信、未读邮件数量、监控报警数量等,应用的也非常广泛。
如上图所示只要触发某个事件(主动分享了资源或者后台主动推送消息),web页面的通知小红点就会实时的+1就可以了。
通常在服务端会有若干张消息推送表,用来记录用户触发不同事件所推送不同类型的消息,前端主动查询(拉)或者被动接收(推)用户所有未读的消息数。
客户端向服务端发起请求,服务器端到请求后保持连接不断开,直到数据有更新才返回响应并关闭连接,客户端处理完响应信息后再向服务端发送新的请求。
要求:请求http://localhost:8080/get/requestId=1时,页面处于等待状态;当访问http://localhost:8080/set/requestId=1,前面的页面会返回"处理成功 1"。
@Controller@RequestMapping(value = "/")public class DeferredResultController {private Map<String, DeferredResult<String>> deferredResultMap = new ConcurrentHashMap<>();;/*** 为了方便测试,简单模拟一个 多个请求用同一个requestId会出问题*/@ResponseBody@GetMapping("/get")public DeferredResult<String> get(@RequestParam String requestId,@RequestParam(value = "timeout", required = false, defaultValue = "5000") Long timeout) {System.out.println("start get");//初始化延时对象,超时时间为5sDeferredResult<String> deferredResult = new DeferredResult<>(timeout);// 请求超时的回调函数deferredResult.onTimeout(() -> {//返回处理超时deferredResult.setResult("处理超时");//超时该处理任务deferredResultMap.remove(requestId);});//如果不存在的requestId直接抛异常Optional.ofNullable(deferredResultMap).filter(t -> !t.containsKey(requestId)).orElseThrow(() -> new IllegalArgumentException(String.format("requestId=%s is existing", requestId)));deferredResultMap.put(requestId,deferredResult);System.out.println("end get");return deferredResult;}/*** 设置DeferredResult对象的result属性,模拟异步操作*/@ResponseBody@GetMapping(value = "/set")public String settingResult(@RequestParam String requestId) {//--------------------这里相当于异步的操作方法 设置DeferredResult对象的setResult方法--------if (deferredResultMap.containsKey(requestId)) {DeferredResult<String> deferredResult = deferredResultMap.get(requestId);deferredResult.setResult("处理成功:"+requestId);deferredResultMap.remove(requestId);}return "Done";}}要求:接口/test 接收请求后,立即将请求入队receiveQueue,后台线程自旋执行队列receiveQueue任务,任务完成后将结果入队resultQueue,如果监听器线程监听resultQueue,如果有任务结果,则将结果赋值给DeferredResult,返回结果响应。
定义Task,封装了DeferredResult对象和收到的消息对象,以及一个是否超时标记,用于任务完成后取出每个请求消息对应的DeferredResult对象,返回消息给客户端.
@Data@AllArgsConstructor@NoArgsConstructorpublic class Task<T> {//延时返回对象private DeferredResult<String> result;//延时消息private T message;//是否超时private Boolean isTimeout;}定义TaskQueue,用于管理队列及处理数据:
/*** 模拟队列类*/@Componentpublic class TaskQueue {/*** 接收任务队列*/private BlockingQueue<Task<String>> receiveQueue = new LinkedBlockingDeque<>(5000);/*** 任务完成结果队列*/private BlockingQueue<Task<String>> resultQueue = new LinkedBlockingDeque<>(5000);/*** 初始化任务处理线程*/public TaskQueue() {this.run();}/*** 存入请求任务** @param task task实体* @throws InterruptedException*/public void put(Task<String> task) throws InterruptedException {receiveQueue.put(task);}/*** 获取任务完成结果** @return* @throws InterruptedException*/public Task<String> get() throws InterruptedException {return resultQueue.take();}/*** 处理任务* 开启一个新线程,自旋的从接收队列中取出数据,然后处理若干秒后,将成功数据放入成功队列.* ,如果任务超时标志isTimeout超时,可以中断该任务的进行,在正常的service中,可以替换为数据库回滚等操作.*/private void run() {new Thread(() -> {while (true) {try {//从接收队列中取出任务,处理,然后放入成功队列Task<String> task = receiveQueue.take();System.out.println("队列收到数据,处理中!");Thread.sleep(1000);task.setMessage("成功");//TODO:如果超时了,中断该任务-此处应该加锁if (task.getIsTimeout()) {System.out.println("任务超时,处理线程中断该任务");continue;}resultQueue.put(task);System.out.println("队列处理完成!");} catch (InterruptedException e) {e.printStackTrace();}}}).start();}}定义队列监听线程, 当spring容器加载完毕,开启新线程,自旋的从模拟队列的完成队列中获取数据,并使用ReferredResult返回
@Componentpublic class QueueResultListener implements ApplicationListener<ContextRefreshedEvent> {@AutowiredTaskQueue taskQueue;@Overridepublic void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {new Thread(() -> {try {Task<String> task = taskQueue.get();task.getResult().setResult(task.getMessage());System.out.println("监听器获取到结果:task=" + task);} catch (InterruptedException e) {e.printStackTrace();}}).start();}}实现Controller异步接口
@Controllerpublic class DeferredResultQueueController {@AutowiredTaskQueue taskQueue;@ResponseBody@GetMapping("/test")public DeferredResult<String> test(@RequestParam String requestId,@RequestParam(value = "timeout", required = false, defaultValue = "5000") Long timeout) throws InterruptedException {//新建延期返回对象并设置超时时间,优先级比configureAsyncSupport方法中默认配置中的高System.out.println("start test");//初始化延迟任务DeferredResult<String> deferredResult = new DeferredResult<>(timeout);//要执行的任务Task<String> task = new Task<String>(deferredResult, "任务", false);//设置超时后执行的任务,优先级比DeferredResultProcessingInterceptor拦截器中的高deferredResult.onTimeout(() -> {System.out.println("任务超时 id=" + requestId);//TODO:告知该任务已经超时-此处应该加锁task.setMessage("任务超时");task.setIsTimeout(true);});//任务入队taskQueue.put(task);System.out.println("end test");return deferredResult;}}参考文章
Spring MVC3.2之后支持异步请求,能够在controller中返回一个Callable或者DeferredResult。
什么是 MQTT协议?
MQTT 全称(Message Queue Telemetry Transport):一种基于发布/订阅(publish/subscribe)模式的轻量级通讯协议,通过订阅相应的主题来获取消息,是物联网(Internet of Thing)中的一个标准传输协议。
TCP协议位于传输层,MQTT 协议位于应用层,MQTT 协议构建于TCP/IP协议上,也就是说只要支持TCP/IP协议栈的地方,都可以使用MQTT协议。
MQTT协议为什么在物联网(IOT)中如此受偏爱?而不是其它协议,比如我们更为熟悉的 HTTP协议呢?
首先HTTP协议它是一种同步协议,客户端请求后需要等待服务端的响应。而在物联网(IOT)环境中,设备会很受制于环境影响,比如带宽低、网络延迟高、网络通信不稳定等,显然异步消息协议更为适合IOT应用程序。
HTTP是单向的,如果要获取消息客户端必须发起连接,而在物联网(IOT)应用程序中,设备或传感器往往都是客户端,这意味着它们无法被动地接收来自网络的命令。 通常需要将一条命令或者消息,发送到网络上的所有设备上。HTTP要实现这样的功能不但很困难,而且成本极高。
springboot+rabbitmq实现智能家居实例详解
**SSE( Server-sent Events )**是 WebSocket 的一种轻量代替方案,使用 HTTP 协议,在服务器和客户端之间打开一个单向通道,只能服务器向客户端发送消息,服务端响应的不再是一次性的数据包,而是text/event-stream类型的数据流信息,在有数据变更时从服务器流式传输到客户端。
SSE与WebSocket作用相似,都可以建立服务端与浏览器之间的通信,实现服务端向客户端推送消息,但还是有些许不同:
在 html5 的定义中,服务端 sse,一般需要遵循以下规范:
SSE 如何保证数据完整性
实例:客户端发送请求到服务端,服务端以流的形式不断向客户端推送数据示例,增加帅气值。
服务端代码(注意响应头以及固定返回数据格式)
@Controller@RequestMapping(value = "/sse")public class SEEController {//响应头为text/event-stream;charset=UTF-8@RequestMapping(value = "/get", produces = "text/event-stream;charset=UTF-8")public void push(HttpServletResponse response) {response.setContentType("text/event-stream");response.setCharacterEncoding("utf-8");int i = 0;while (true) {try {Thread.sleep(1000);PrintWriter pw = response.getWriter();//注意返回数据必须以data:开头,"\n\n"结尾pw.write("data:xdm帅气值加" + i + "\n\n");pw.flush();//检测异常时断开连接if (pw.checkError()) {log.error("客户端断开连接");return;}} catch (Exception e) {e.printStackTrace();}i++;}}}前端代码(重写message、open、error事件)
<html><head><meta http-equiv="Content-Type" content="text/html; charset=UTF-8"><title>SSE Demo</title></head><body><p id="msg_from_server">空白</p><script type="text/javascript" src="../js/jquery.js"></script><script type="text/javascript">if (!!window.EventSource) {var source = new EventSource('/sse/get'); s = '';//客户端收到服务器发来的数据 另一种写法:source.onmessage = function (event) {}source.addEventListener('message', function(e) {s += e.data + "<br/>"$("#msg_from_server").html(s);});// 连接一旦建立,就会触发open事件 另一种写法:source.onopen = function (event) {}source.addEventListener('open', function(e) {console.log("连接打开.");}, false);// 如果发生通信错误(比如连接中断),就会触发error事件 另一种写法:source.onerror = function (event) {}source.addEventListener('error', function(e) {if (e.readyState == EventSource.CLOSED) {console.log("连接关闭");} else {console.log(e.readyState);}}, false);} else {alert(4);console.log("没有sse");}</script></body></html>演示SSE的连接建立、接收数据和异常情况监听处理。
服务端
@Controller@RequestMapping(value = "/sse")@Slf4jpublic class SSEPlusController {private static Map<String, SseEmitter> cache = new ConcurrentHashMap<>();String clientId;int sseId;@GetMapping("/create")public SseEmitter create(@RequestParam(name = "clientId", required = false) String clientId) {// 设置超时时间,0表示不过期。默认30000毫秒//可以在客户端一直断网、直接关闭页面但未提醒后端的情况下,服务端在一定时间等待后自动关闭网络连接SseEmitter sseEmitter = new SseEmitter(0L);// 是否需要给客户端推送IDif (Strings.isBlank(clientId)) {clientId = UUID.randomUUID().toString();}this.clientId = clientId;cache.put(clientId, sseEmitter);log.info("sse连接,当前客户端:{}", clientId);return sseEmitter;}@Scheduled(cron = "0/3 * * * * ? ")public void pushMessage() {try {sseId++;SseEmitter sseEmitter = cache.get(clientId);sseEmitter.send(SseEmitter.event().data("帅气值暴增" + sseId).id("" + sseId).reconnectTime(3000));} catch (Exception e) {log.error(e.getMessage());sseId--;}}@GetMapping("/close")public void close(String clientId) {SseEmitter sseEmitter = cache.get(clientId);if (sseEmitter != null) {sseEmitter.complete();cache.remove(clientId);}}}复杂代码
/*** SSE长链接*/@RestController@RequestMapping("/sse")public class SseEmitterController {@Autowiredprivate SseEmitterService sseEmitterService;/*** 创建SSE长链接** @param clientId 客户端唯一ID(如果为空,则由后端生成并返回给前端)* @return org.springframework.web.servlet.mvc.method.annotation.SseEmitter* @author re* @date 2021/12/12**/@CrossOrigin //如果nginx做了跨域处理,此处可去掉@GetMapping("/CreateSseConnect")public SseEmitter createSseConnect(@RequestParam(name = "clientId", required = false) String clientId) {return sseEmitterService.createSseConnect(clientId);}/*** 关闭SSE连接** @param clientId 客户端ID* @author re* @date 2021/12/13**/@GetMapping("/CloseSseConnect")public Result closeSseConnect(String clientId) {sseEmitterService.closeSseConnect(clientId);return ResultGenerator.genSuccessResult(true);}} @Servicepublic class SseEmitterServiceImpl implements SseEmitterService {/*** 容器,保存连接,用于输出返回*/private static Map<String, SseEmitter> sseCache = new ConcurrentHashMap<>();@Overridepublic SseEmitter createSseConnect(String clientId) {// 设置超时时间,0表示不过期。默认30秒,超过时间未完成会抛出异常:AsyncRequestTimeoutExceptionSseEmitter sseEmitter = new SseEmitter(0L);// 是否需要给客户端推送IDif (StringUtils.isBlank(clientId)) {clientId = IdUtil.simpleUUID();}// 注册回调sseEmitter.onCompletion(completionCallBack(clientId));sseCache.put(clientId, sseEmitter);logger.info("创建新的sse连接,当前用户:{}", clientId);try {sseEmitter.send(SseEmitter.event().id(SseEmitterConstant.CLIENT_ID).data(clientId));} catch (IOException e) {logger.error("SseEmitterServiceImpl[createSseConnect]: 创建长链接异常,客户端ID:{}", clientId, e);throw new BusinessException("创建连接异常!", e);}return sseEmitter;}@Overridepublic void closeSseConnect(String clientId) {SseEmitter sseEmitter = sseCache.get(clientId);if (sseEmitter != null) {sseEmitter.complete();removeUser(clientId);}}// 根据客户端id获取SseEmitter对象@Overridepublic SseEmitter getSseEmitterByClientId(String clientId) {return sseCache.get(clientId);}// 推送消息到客户端,此处结合业务代码,业务中需要推送消息处调用即可向客户端主动推送消息@Overridepublic void sendMsgToClient(List<SseEmitterResultVO> sseEmitterResultVOList) {if (CollectionUtil.isEmpty(sseCache)) {return;}for (Map.Entry<String, SseEmitter> entry : sseCache.entrySet()) {sendMsgToClientByClientId(entry.getKey(), sseEmitterResultVOList, entry.getValue());}}/*** 推送消息到客户端* 此处做了推送失败后,重试推送机制,可根据自己业务进行修改** @param clientId 客户端ID* @param sseEmitterResultVOList 推送信息,此处结合具体业务,定义自己的返回值即可* @author re* @date 2022/3/30**/private void sendMsgToClientByClientId(String clientId, List<SseEmitterResultVO> sseEmitterResultVOList, SseEmitter sseEmitter) {if (sseEmitter == null) {logger.error("SseEmitterServiceImpl[sendMsgToClient]: 推送消息失败:客户端{}未创建长链接,失败消息:{}",clientId, sseEmitterResultVOList.toString());return;}SseEmitter.SseEventBuilder sendData = SseEmitter.event().id(SseEmitterConstant.TASK_RESULT).data(sseEmitterResultVOList, MediaType.APPLICATION_JSON);try {sseEmitter.send(sendData);} catch (IOException e) {// 推送消息失败,记录错误日志,进行重推logger.error("SseEmitterServiceImpl[sendMsgToClient]: 推送消息失败:{},尝试进行重推", sseEmitterResultVOList.toString(), e);boolean isSuccess = true;// 推送消息失败后,每隔10s推送一次,推送5次for (int i = 0; i < 5; i++) {try {Thread.sleep(10000);sseEmitter = sseCache.get(clientId);if (sseEmitter == null) {logger.error("SseEmitterServiceImpl[sendMsgToClient]:{}的第{}次消息重推失败,未创建长链接", clientId, i + 1);continue;}sseEmitter.send(sendData);} catch (Exception ex) {logger.error("SseEmitterServiceImpl[sendMsgToClient]:{}的第{}次消息重推失败", clientId, i + 1, ex);continue;}logger.info("SseEmitterServiceImpl[sendMsgToClient]:{}的第{}次消息重推成功,{}", clientId, i + 1, sseEmitterResultVOList.toString());return;}}}/*** 长链接完成后回调接口(即关闭连接时调用)** @param clientId 客户端ID* @return java.lang.Runnable* @author re* @date 2021/12/14**/private Runnable completionCallBack(String clientId) {return () -> {logger.info("结束连接:{}", clientId);removeUser(clientId);};}/*** 连接超时时调用** @param clientId 客户端ID* @return java.lang.Runnable* @author re* @date 2021/12/14**/private Runnable timeoutCallBack(String clientId) {return () -> {logger.info("连接超时:{}", clientId);removeUser(clientId);};}/*** 推送消息异常时,回调方法** @param clientId 客户端ID* @return java.util.function.Consumer<java.lang.Throwable>**/private Consumer<Throwable> errorCallBack(String clientId) {return throwable -> {logger.error("SseEmitterServiceImpl[errorCallBack]:连接异常,客户端ID:{}", clientId);// 推送消息失败后,每隔10s推送一次,推送5次for (int i = 0; i < 5; i++) {try {Thread.sleep(10000);SseEmitter sseEmitter = sseCache.get(clientId);if (sseEmitter == null) {logger.error("SseEmitterServiceImpl[errorCallBack]:第{}次消息重推失败,未获取到 {} 对应的长链接", i + 1, clientId);continue;}sseEmitter.send("失败后重新推送");} catch (Exception e) {e.printStackTrace();}}};}/*** 移除用户连接* @param clientId 客户端ID* @author re**/private void removeUser(String clientId) {sseCache.remove(clientId);logger.info("SseEmitterServiceImpl[removeUser]:移除用户:{}", clientId);}当请求超过设置的超时时间,会抛出AsyncRequestTimeoutException异常,这里直接用@ControllerAdvice全局捕获统一返回即可,前端获取约定好的状态码后再次发起长轮询请求,如此往复调用。
@ControllerAdvicepublic class AsyncRequestTimeoutHandler {@ResponseStatus(HttpStatus.NOT_MODIFIED)@ResponseBody@ExceptionHandler(AsyncRequestTimeoutException.class)public String asyncRequestTimeoutHandler(AsyncRequestTimeoutException e) {System.out.println("异步请求超时");return "304";}} SseEmitter.event()用来得到一个记录数据的容器。.data("帅气值暴增" + sseId)发送给客户端的数据。.id("" + sseId)记录发送数据的标识,服务端可以通过HttpServletRequest的请求头中拿到这个id,判断是否中间有误漏发数据。.reconnectTime(3000)定义在网络连接断开后,客户端向后端发起重连的时间间隔(以毫秒为单位)。客户端:
注:若浏览器不兼容在页面引入evensource.js。
如果项目中使用nginx对后端服务做了代理,nginx代理转发后,默认会在1min的时候断掉长链接,SSE需要设置自己的长链接时间,则需要在nginx中进行配置;
在反向代理的location块中加入如下配置proxy_set_header Host $http_host; ##proxy_set_header用来重定义发往后端服务器的请求头proxy_set_header X-Real-IP $remote_addr;proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;proxy_set_header X-Forwarded-Proto $scheme;proxy_buffering off;proxy_http_version 1.1;proxy_read_timeout 600s; ##设置SSE长链接保持时间为 600s前端报错:EventSource’s response has a MIME type (“application/json”) that is not “text/event-stream”. Aborting the connection
前端在创建SSE长链接时,完整的请求(包括参数和参数值)都必须放在new EventSource(完整请求)中;
创建长链接时,接口状态一直处于pending,检查后端nginx是否做相应配置;
推送消息失败:检查客户端创建链接时的id,和推送消息时的id是否一致;
特点
websocket运用场景:
支持WebSocket的主流浏览器如下:
发送文本的例子。
ws.send('your message');发送 Blob 对象的例子。
var file = document.querySelector('input[type="file"]').files[0];ws.send(file);发送 ArrayBuffer 对象的例子。
// Sending canvas ImageData as ArrayBuffervar img = canvas_context.getImageData(0, 0, 400, 320);var binary = new Uint8Array(img.data.length);for (var i = 0; i < img.data.length; i++) {binary[i] = img.data[i];}ws.send(binary.buffer);常用的 Node 实现
常用的 Java实现
使用@ServerEndpoint标注当前类为一个websocket服务器,客户端可以通过ws://localhost:8088/webSocketByTomcat/10086来连接到WebSocket服务器端。
@ServerEndpoint("/webSocketByTomcat/{username}")public class WebSocketServer {//在线人数private static int onlineCount = 0;//存储会话private static Map<String, WebSocketServer> clients = new ConcurrentHashMap<>();//当前会话private Session session;//当前用户private String username;//建立连接@OnOpenpublic void onOpen(@PathParam("username") String username, Session session) throws IOException {this.username = username;this.session = session;//自增在线人数addOnlineCount();//存储当前会话clients.put(username, this);System.out.println("已连接");}//连接关闭@OnClosepublic void onClose() throws IOException {//移除当前会话clients.remove(username);//自减在线人数subOnlineCount();}//发送消息客户客户端@OnMessagepublic void onMessage(String message) throws IOException {JSONObject jsonTo = JSONObject.fromObject(message);//单独发if (!jsonTo.get("To").equals("All")){sendMessageTo("给一个人", jsonTo.get("To").toString());}//群发else{sendMessageAll("给所有人");}}//连接失败@OnErrorpublic void onError(Session session, Throwable error) {error.printStackTrace();}//发送消息给指定客户端public void sendMessageTo(String message, String to) throws IOException {// session.getBasicRemote().sendText(message);//session.getAsyncRemote().sendText(message);for (WebSocketServer item : clients.values()) {if (item.username.equals(to) ) {item.session.getAsyncRemote().sendText(message);}}}//发送消息给所有客户端public void sendMessageAll(String message) throws IOException {for (WebSocketServer item : clients.values()) {item.session.getAsyncRemote().sendText(message);}}//获取在线人数public static synchronized int getOnlineCount() {return onlineCount;}//自增在线人数public static synchronized void addOnlineCount() {WebSocketServer.onlineCount++;}//自减在线人数public static synchronized void subOnlineCount() { WebSocketServer.onlineCount--; }//获取所有客户端public static synchronized Map<String, WebSocketServer> getClients() {return clients;}}客户端
配置类注入handler
/*** websocket的配置类*/@Configuration@EnableWebSocketpublic class CustomWebSocketConfig implements WebSocketConfigurer {@Overridepublic void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {registry.addHandler(customWebSocketHandler(), "/webSocketBySpring/customWebSocketHandler").addInterceptors(new CustomWebSocketInterceptor()).setAllowedOrigins("*");registry.addHandler(customWebSocketHandler(), "/sockjs/webSocketBySpring/customWebSocketHandler").addInterceptors(new CustomWebSocketInterceptor()).setAllowedOrigins("*").withSockJS();}@Beanpublic WebSocketHandler customWebSocketHandler() {return new CustomWebSocketHandler();}}使用withSockJS()的原因:
如果代码中添加了withSockJS()如下,服务器也会自动降级为轮询。
registry.addEndpoint("/coordination").withSockJS();