ParkingAreaWebsocket.java 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170
  1. package com.java110.api.websocket;
  2. import com.alibaba.fastjson.JSONObject;
  3. import com.java110.dto.WsDataDto;
  4. import com.java110.utils.util.StringUtil;
  5. import com.java110.vo.ResultVo;
  6. import org.slf4j.Logger;
  7. import org.slf4j.LoggerFactory;
  8. import org.springframework.stereotype.Component;
  9. import javax.websocket.*;
  10. import javax.websocket.server.PathParam;
  11. import javax.websocket.server.ServerEndpoint;
  12. import java.io.IOException;
  13. import java.util.concurrent.ConcurrentHashMap;
  14. /**
  15. * @ClassName 停车场 ws
  16. * @Description TODO
  17. * @Author wuxw
  18. * @Date 2020/5/25 12:13
  19. * @Version 1.0
  20. * add by wuxw 2020/5/25
  21. **/
  22. @ServerEndpoint("/ws/parkingArea/{paId}/{clientId}")
  23. @Component
  24. public class ParkingAreaWebsocket {
  25. private static Logger logger = LoggerFactory.getLogger(ParkingAreaWebsocket.class);
  26. /**
  27. * 静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
  28. */
  29. private static int onlineCount = 0;
  30. /**
  31. * concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
  32. */
  33. private static ConcurrentHashMap<String, ParkingAreaWebsocket> webSocketMap = new ConcurrentHashMap<>();
  34. /**
  35. * concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
  36. */
  37. private static ConcurrentHashMap<String, String> clientMachineMap = new ConcurrentHashMap<>();
  38. /**
  39. * 与某个客户端的连接会话,需要通过它来给客户端发送数据
  40. */
  41. private Session session;
  42. /**
  43. * 接收clientId
  44. */
  45. private String clientId = "";
  46. private String paId = "";
  47. /**
  48. * 连接建立成功调用的方法
  49. */
  50. @OnOpen
  51. public void onOpen(Session session, @PathParam("clientId") String clientId, @PathParam("paId") String paId) {
  52. this.session = session;
  53. this.clientId = clientId;
  54. this.paId = paId;
  55. if (webSocketMap.containsKey(clientId)) {
  56. webSocketMap.remove(clientId);
  57. webSocketMap.put(clientId, this);
  58. //加入set中
  59. } else {
  60. webSocketMap.put(clientId, this);
  61. //加入set中
  62. addOnlineCount();
  63. //在线数加1
  64. }
  65. logger.debug("用户连接:" + clientId + ",当前在线人数为:" + getOnlineCount());
  66. try {
  67. sendMessage("连接成功");
  68. } catch (IOException e) {
  69. logger.error("用户:" + clientId + ",网络异常!!!!!!");
  70. }
  71. }
  72. /**
  73. * 连接关闭调用的方法
  74. */
  75. @OnClose
  76. public void onClose() {
  77. if (webSocketMap.containsKey(clientId)) {
  78. webSocketMap.remove(clientId);
  79. //从set中删除
  80. subOnlineCount();
  81. }
  82. logger.info("用户退出:" + clientId + ",当前在线人数为:" + getOnlineCount());
  83. }
  84. /**
  85. * 收到客户端消息后调用的方法
  86. *
  87. * @param message 客户端发送过来的消息
  88. */
  89. @OnMessage
  90. public void onMessage(String message, Session session) throws Exception {
  91. logger.info("用户消息:" + paId + ",客户端:" + clientId + ",报文:" + message);
  92. //可以群发消息
  93. //消息保存到数据库、redis
  94. if (StringUtil.isEmpty(message)) {
  95. ResultVo resultVo = new ResultVo(ResultVo.CODE_ERROR, "未包含内容");
  96. webSocketMap.get(clientId).sendMessage(resultVo.toString());
  97. return;
  98. }
  99. if (!StringUtil.isJsonObject(message)) {
  100. ResultVo resultVo = new ResultVo(ResultVo.CODE_ERROR, "不是有效数据格式");
  101. webSocketMap.get(clientId).sendMessage(resultVo.toString());
  102. return;
  103. }
  104. WsDataDto wsDataDto = JSONObject.parseObject(message, WsDataDto.class);
  105. switch (wsDataDto.getCmd()) {
  106. case WsDataDto.CMD_PING:
  107. //webSocketMap.get(userId).sendMessage(wsDataDto.toString());
  108. break;
  109. }
  110. }
  111. /**
  112. * @param session
  113. * @param error
  114. */
  115. @OnError
  116. public void onError(Session session, Throwable error) {
  117. logger.error("用户错误:" + this.clientId + ",原因:" + error.getMessage());
  118. error.printStackTrace();
  119. }
  120. /**
  121. * 实现服务器主动推送
  122. */
  123. public void sendMessage(String message) throws IOException {
  124. this.session.getBasicRemote().sendText(message);
  125. }
  126. /**
  127. * 发送设备监控信息
  128. */
  129. public static void sendInfo(String message, String paId) throws IOException {
  130. logger.info("发送消息到:" + paId + ",报文:" + message);
  131. for (ParkingAreaWebsocket server : webSocketMap.values()) {
  132. if (paId.equals(server.paId)) {
  133. webSocketMap.get(server.clientId).sendMessage(message);
  134. }
  135. }
  136. }
  137. public static synchronized int getOnlineCount() {
  138. return onlineCount;
  139. }
  140. public static synchronized void addOnlineCount() {
  141. ParkingAreaWebsocket.onlineCount++;
  142. }
  143. public static synchronized void subOnlineCount() {
  144. ParkingAreaWebsocket.onlineCount--;
  145. }
  146. }