wuxw лет назад: 3
Родитель
Сommit
0c77832e31

+ 3 - 1
service-api/src/main/java/com/java110/api/kafka/FrontServiceKafka.java

@@ -3,6 +3,7 @@ package com.java110.api.kafka;
 import com.alibaba.fastjson.JSONObject;
 import com.java110.api.websocket.MessageWebsocket;
 import com.java110.api.websocket.ParkingAreaWebsocket;
+import com.java110.api.websocket.ParkingBoxWebsocket;
 import com.java110.core.base.controller.BaseController;
 import com.java110.utils.constant.KafkaConstant;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -53,7 +54,8 @@ public class FrontServiceKafka extends BaseController {
         JSONObject param = null;
         try {
             param = JSONObject.parseObject(record.value().toString());
-            ParkingAreaWebsocket.sendInfo(param.toJSONString(), param.getString("extBoxId"));
+            ParkingBoxWebsocket.sendInfo(param.toJSONString(), param.getString("extBoxId"));
+            ParkingAreaWebsocket.sendInfo(param.toJSONString(), param.getString("extPaId"));
         } catch (Exception e) {
             logger.error("发送消息失败", e);
         } finally {

+ 8 - 1
service-api/src/main/java/com/java110/api/smo/impl/ApiCallBackInnerServiceSMOImpl.java

@@ -2,6 +2,7 @@ package com.java110.api.smo.impl;
 
 import com.alibaba.fastjson.JSONObject;
 import com.java110.api.websocket.ParkingAreaWebsocket;
+import com.java110.api.websocket.ParkingBoxWebsocket;
 import com.java110.intf.api.IApiCallBackInnerServiceSMO;
 import com.java110.utils.exception.SMOException;
 import org.springframework.web.bind.annotation.RequestBody;
@@ -13,7 +14,13 @@ public class ApiCallBackInnerServiceSMOImpl implements IApiCallBackInnerServiceS
     public int webSentParkingArea(@RequestBody JSONObject reqJson) {
         JSONObject param = JSONObject.parseObject(reqJson.toString());
         try {
-            ParkingAreaWebsocket.sendInfo(param.toJSONString(), param.getString("extBoxId"));
+            ParkingBoxWebsocket.sendInfo(param.toJSONString(), param.getString("extBoxId"));
+
+        } catch (Exception e) {
+            throw new SMOException(e.getMessage());
+        }
+        try {
+            ParkingAreaWebsocket.sendInfo(param.toJSONString(), param.getString("extPaId"));
         } catch (Exception e) {
             throw new SMOException(e.getMessage());
         }

+ 8 - 8
service-api/src/main/java/com/java110/api/websocket/ParkingAreaWebsocket.java

@@ -22,7 +22,7 @@ import java.util.concurrent.ConcurrentHashMap;
  * @Version 1.0
  * add by wuxw 2020/5/25
  **/
-@ServerEndpoint("/ws/parkingArea/{boxId}/{clientId}")
+@ServerEndpoint("/ws/parkingArea/{paId}/{clientId}")
 @Component
 public class ParkingAreaWebsocket {
 
@@ -50,16 +50,16 @@ public class ParkingAreaWebsocket {
      */
     private String clientId = "";
 
-    private String boxId = "";
+    private String paId = "";
 
     /**
      * 连接建立成功调用的方法
      */
     @OnOpen
-    public void onOpen(Session session, @PathParam("clientId") String clientId, @PathParam("boxId") String boxId) {
+    public void onOpen(Session session, @PathParam("clientId") String clientId, @PathParam("paId") String paId) {
         this.session = session;
         this.clientId = clientId;
-        this.boxId = boxId;
+        this.paId = paId;
         if (webSocketMap.containsKey(clientId)) {
             webSocketMap.remove(clientId);
             webSocketMap.put(clientId, this);
@@ -101,7 +101,7 @@ public class ParkingAreaWebsocket {
      */
     @OnMessage
     public void onMessage(String message, Session session) throws Exception {
-        logger.info("用户消息:" + boxId + ",客户端:" + clientId + ",报文:" + message);
+        logger.info("用户消息:" + paId + ",客户端:" + clientId + ",报文:" + message);
         //可以群发消息
         //消息保存到数据库、redis
         if (StringUtil.isEmpty(message)) {
@@ -146,10 +146,10 @@ public class ParkingAreaWebsocket {
     /**
      * 发送设备监控信息
      */
-    public static void sendInfo(String message, String boxId) throws IOException {
-        logger.info("发送消息到:" + boxId + ",报文:" + message);
+    public static void sendInfo(String message, String paId) throws IOException {
+        logger.info("发送消息到:" + paId + ",报文:" + message);
         for (ParkingAreaWebsocket server : webSocketMap.values()) {
-            if (boxId.equals(server.boxId)) {
+            if (paId.equals(server.paId)) {
                 webSocketMap.get(server.clientId).sendMessage(message);
             }
         }

+ 169 - 0
service-api/src/main/java/com/java110/api/websocket/ParkingBoxWebsocket.java

@@ -0,0 +1,169 @@
+package com.java110.api.websocket;
+
+import com.alibaba.fastjson.JSONObject;
+import com.java110.core.log.LoggerFactory;
+import com.java110.dto.WsDataDto;
+import com.java110.utils.util.StringUtil;
+import com.java110.vo.ResultVo;
+import org.slf4j.Logger;
+import org.springframework.stereotype.Component;
+
+import javax.websocket.*;
+import javax.websocket.server.PathParam;
+import javax.websocket.server.ServerEndpoint;
+import java.io.IOException;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * @ClassName 停车场 ws
+ * @Description TODO
+ * @Author wuxw
+ * @Date 2020/5/25 12:13
+ * @Version 1.0
+ * add by wuxw 2020/5/25
+ **/
+@ServerEndpoint("/ws/parkingBox/{boxId}/{clientId}")
+@Component
+public class ParkingBoxWebsocket {
+
+    private static Logger logger = LoggerFactory.getLogger(ParkingBoxWebsocket.class);
+
+    /**
+     * 静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
+     */
+    private static int onlineCount = 0;
+    /**
+     * concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
+     */
+    private static ConcurrentHashMap<String, ParkingBoxWebsocket> webSocketMap = new ConcurrentHashMap<>();
+
+    /**
+     * concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
+     */
+    private static ConcurrentHashMap<String, String> clientMachineMap = new ConcurrentHashMap<>();
+    /**
+     * 与某个客户端的连接会话,需要通过它来给客户端发送数据
+     */
+    private Session session;
+    /**
+     * 接收clientId
+     */
+    private String clientId = "";
+
+    private String boxId = "";
+
+    /**
+     * 连接建立成功调用的方法
+     */
+    @OnOpen
+    public void onOpen(Session session, @PathParam("clientId") String clientId, @PathParam("boxId") String boxId) {
+        this.session = session;
+        this.clientId = clientId;
+        this.boxId = boxId;
+        if (webSocketMap.containsKey(clientId)) {
+            webSocketMap.remove(clientId);
+            webSocketMap.put(clientId, this);
+            //加入set中
+        } else {
+            webSocketMap.put(clientId, this);
+            //加入set中
+            addOnlineCount();
+            //在线数加1
+        }
+
+
+        logger.debug("用户连接:" + clientId + ",当前在线人数为:" + getOnlineCount());
+
+        try {
+            sendMessage("连接成功");
+        } catch (IOException e) {
+            logger.error("用户:" + clientId + ",网络异常!!!!!!");
+        }
+    }
+
+    /**
+     * 连接关闭调用的方法
+     */
+    @OnClose
+    public void onClose() {
+        if (webSocketMap.containsKey(clientId)) {
+            webSocketMap.remove(clientId);
+            //从set中删除
+            subOnlineCount();
+        }
+        logger.info("用户退出:" + clientId + ",当前在线人数为:" + getOnlineCount());
+    }
+
+    /**
+     * 收到客户端消息后调用的方法
+     *
+     * @param message 客户端发送过来的消息
+     */
+    @OnMessage
+    public void onMessage(String message, Session session) throws Exception {
+        logger.info("用户消息:" + boxId + ",客户端:" + clientId + ",报文:" + message);
+        //可以群发消息
+        //消息保存到数据库、redis
+        if (StringUtil.isEmpty(message)) {
+            ResultVo resultVo = new ResultVo(ResultVo.CODE_ERROR, "未包含内容");
+            webSocketMap.get(clientId).sendMessage(resultVo.toString());
+            return;
+        }
+
+        if (!StringUtil.isJsonObject(message)) {
+            ResultVo resultVo = new ResultVo(ResultVo.CODE_ERROR, "不是有效数据格式");
+            webSocketMap.get(clientId).sendMessage(resultVo.toString());
+            return;
+        }
+
+        WsDataDto wsDataDto = JSONObject.parseObject(message, WsDataDto.class);
+
+        switch (wsDataDto.getCmd()) {
+            case WsDataDto.CMD_PING:
+                //webSocketMap.get(userId).sendMessage(wsDataDto.toString());
+                break;
+        }
+    }
+
+    /**
+     * @param session
+     * @param error
+     */
+    @OnError
+    public void onError(Session session, Throwable error) {
+        logger.error("用户错误:" + this.clientId + ",原因:" + error.getMessage());
+        error.printStackTrace();
+    }
+
+    /**
+     * 实现服务器主动推送
+     */
+    public void sendMessage(String message) throws IOException {
+        this.session.getBasicRemote().sendText(message);
+    }
+
+
+    /**
+     * 发送设备监控信息
+     */
+    public static void sendInfo(String message, String boxId) throws IOException {
+        logger.info("发送消息到:" + boxId + ",报文:" + message);
+        for (ParkingBoxWebsocket server : webSocketMap.values()) {
+            if (boxId.equals(server.boxId)) {
+                webSocketMap.get(server.clientId).sendMessage(message);
+            }
+        }
+    }
+
+    public static synchronized int getOnlineCount() {
+        return onlineCount;
+    }
+
+    public static synchronized void addOnlineCount() {
+        ParkingBoxWebsocket.onlineCount++;
+    }
+
+    public static synchronized void subOnlineCount() {
+        ParkingBoxWebsocket.onlineCount--;
+    }
+}

+ 28 - 1
springboot/src/main/java/com/java110/boot/smo/impl/ApiCallBackInnerServiceSMOImpl.java

@@ -2,18 +2,45 @@ package com.java110.boot.smo.impl;
 
 import com.alibaba.fastjson.JSONObject;
 import com.java110.boot.websocket.ParkingAreaWebsocket;
+import com.java110.boot.websocket.ParkingBoxWebsocket;
+import com.java110.dto.parkingBoxArea.ParkingBoxAreaDto;
 import com.java110.intf.api.IApiCallBackInnerServiceSMO;
+import com.java110.intf.community.IParkingBoxAreaV1InnerServiceSMO;
 import com.java110.utils.exception.SMOException;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.web.bind.annotation.RequestBody;
 import org.springframework.web.bind.annotation.RestController;
 
+import java.util.List;
+
 @RestController
 public class ApiCallBackInnerServiceSMOImpl implements IApiCallBackInnerServiceSMO {
+
+    @Autowired
+    private IParkingBoxAreaV1InnerServiceSMO parkingBoxAreaV1InnerServiceSMOImpl;
+
     @Override
     public int webSentParkingArea(@RequestBody JSONObject reqJson) {
         JSONObject param = JSONObject.parseObject(reqJson.toString());
         try {
-            ParkingAreaWebsocket.sendInfo(param.toJSONString(), param.getString("extBoxId"));
+            ParkingBoxWebsocket.sendInfo(param.toJSONString(), param.getString("extBoxId"));
+
+        } catch (Exception e) {
+            throw new SMOException(e.getMessage());
+        }
+
+        ParkingBoxAreaDto parkingBoxAreaDto = new ParkingBoxAreaDto();
+        parkingBoxAreaDto.setBoxId(reqJson.getString("extBoxId"));
+        parkingBoxAreaDto.setDefaultArea(ParkingBoxAreaDto.DEFAULT_AREA_TRUE);
+
+        List<ParkingBoxAreaDto> parkingBoxAreaDtos = parkingBoxAreaV1InnerServiceSMOImpl.queryParkingBoxAreas(parkingBoxAreaDto);
+
+        if(parkingBoxAreaDtos == null || parkingBoxAreaDtos.size()<1){
+            return 1;
+        }
+
+        try {
+            ParkingAreaWebsocket.sendInfo(param.toJSONString(), parkingBoxAreaDtos.get(0).getPaId());
         } catch (Exception e) {
             throw new SMOException(e.getMessage());
         }

+ 8 - 8
springboot/src/main/java/com/java110/boot/websocket/ParkingAreaWebsocket.java

@@ -22,7 +22,7 @@ import java.util.concurrent.ConcurrentHashMap;
  * @Version 1.0
  * add by wuxw 2020/5/25
  **/
-@ServerEndpoint("/ws/parkingArea/{boxId}/{clientId}")
+@ServerEndpoint("/ws/parkingArea/{paId}/{clientId}")
 @Component
 public class ParkingAreaWebsocket {
 
@@ -50,16 +50,16 @@ public class ParkingAreaWebsocket {
      */
     private String clientId = "";
 
-    private String boxId = "";
+    private String paId = "";
 
     /**
      * 连接建立成功调用的方法
      */
     @OnOpen
-    public void onOpen(Session session, @PathParam("clientId") String clientId, @PathParam("boxId") String boxId) {
+    public void onOpen(Session session, @PathParam("clientId") String clientId, @PathParam("paId") String paId) {
         this.session = session;
         this.clientId = clientId;
-        this.boxId = boxId;
+        this.paId = paId;
         if (webSocketMap.containsKey(clientId)) {
             webSocketMap.remove(clientId);
             webSocketMap.put(clientId, this);
@@ -101,7 +101,7 @@ public class ParkingAreaWebsocket {
      */
     @OnMessage
     public void onMessage(String message, Session session) throws Exception {
-        logger.info("用户消息:" + boxId + ",客户端:" + clientId + ",报文:" + message);
+        logger.info("用户消息:" + paId + ",客户端:" + clientId + ",报文:" + message);
         //可以群发消息
         //消息保存到数据库、redis
         if (StringUtil.isEmpty(message)) {
@@ -146,10 +146,10 @@ public class ParkingAreaWebsocket {
     /**
      * 发送设备监控信息
      */
-    public static void sendInfo(String message, String boxId) throws IOException {
-        logger.info("发送消息到:" + boxId + ",报文:" + message);
+    public static void sendInfo(String message, String paId) throws IOException {
+        logger.info("发送消息到:" + paId + ",报文:" + message);
         for (ParkingAreaWebsocket server : webSocketMap.values()) {
-            if (boxId.equals(server.boxId)) {
+            if (paId.equals(server.paId)) {
                 webSocketMap.get(server.clientId).sendMessage(message);
             }
         }

+ 169 - 0
springboot/src/main/java/com/java110/boot/websocket/ParkingBoxWebsocket.java

@@ -0,0 +1,169 @@
+package com.java110.boot.websocket;
+
+import com.alibaba.fastjson.JSONObject;
+import com.java110.core.log.LoggerFactory;
+import com.java110.dto.WsDataDto;
+import com.java110.utils.util.StringUtil;
+import com.java110.vo.ResultVo;
+import org.slf4j.Logger;
+import org.springframework.stereotype.Component;
+
+import javax.websocket.*;
+import javax.websocket.server.PathParam;
+import javax.websocket.server.ServerEndpoint;
+import java.io.IOException;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * @ClassName 停车场 ws
+ * @Description TODO
+ * @Author wuxw
+ * @Date 2020/5/25 12:13
+ * @Version 1.0
+ * add by wuxw 2020/5/25
+ **/
+@ServerEndpoint("/ws/parkingBox/{boxId}/{clientId}")
+@Component
+public class ParkingBoxWebsocket {
+
+    private static Logger logger = LoggerFactory.getLogger(ParkingBoxWebsocket.class);
+
+    /**
+     * 静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
+     */
+    private static int onlineCount = 0;
+    /**
+     * concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
+     */
+    private static ConcurrentHashMap<String, ParkingBoxWebsocket> webSocketMap = new ConcurrentHashMap<>();
+
+    /**
+     * concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
+     */
+    private static ConcurrentHashMap<String, String> clientMachineMap = new ConcurrentHashMap<>();
+    /**
+     * 与某个客户端的连接会话,需要通过它来给客户端发送数据
+     */
+    private Session session;
+    /**
+     * 接收clientId
+     */
+    private String clientId = "";
+
+    private String boxId = "";
+
+    /**
+     * 连接建立成功调用的方法
+     */
+    @OnOpen
+    public void onOpen(Session session, @PathParam("clientId") String clientId, @PathParam("boxId") String boxId) {
+        this.session = session;
+        this.clientId = clientId;
+        this.boxId = boxId;
+        if (webSocketMap.containsKey(clientId)) {
+            webSocketMap.remove(clientId);
+            webSocketMap.put(clientId, this);
+            //加入set中
+        } else {
+            webSocketMap.put(clientId, this);
+            //加入set中
+            addOnlineCount();
+            //在线数加1
+        }
+
+
+        logger.debug("用户连接:" + clientId + ",当前在线人数为:" + getOnlineCount());
+
+        try {
+            sendMessage("连接成功");
+        } catch (IOException e) {
+            logger.error("用户:" + clientId + ",网络异常!!!!!!");
+        }
+    }
+
+    /**
+     * 连接关闭调用的方法
+     */
+    @OnClose
+    public void onClose() {
+        if (webSocketMap.containsKey(clientId)) {
+            webSocketMap.remove(clientId);
+            //从set中删除
+            subOnlineCount();
+        }
+        logger.info("用户退出:" + clientId + ",当前在线人数为:" + getOnlineCount());
+    }
+
+    /**
+     * 收到客户端消息后调用的方法
+     *
+     * @param message 客户端发送过来的消息
+     */
+    @OnMessage
+    public void onMessage(String message, Session session) throws Exception {
+        logger.info("用户消息:" + boxId + ",客户端:" + clientId + ",报文:" + message);
+        //可以群发消息
+        //消息保存到数据库、redis
+        if (StringUtil.isEmpty(message)) {
+            ResultVo resultVo = new ResultVo(ResultVo.CODE_ERROR, "未包含内容");
+            webSocketMap.get(clientId).sendMessage(resultVo.toString());
+            return;
+        }
+
+        if (!StringUtil.isJsonObject(message)) {
+            ResultVo resultVo = new ResultVo(ResultVo.CODE_ERROR, "不是有效数据格式");
+            webSocketMap.get(clientId).sendMessage(resultVo.toString());
+            return;
+        }
+
+        WsDataDto wsDataDto = JSONObject.parseObject(message, WsDataDto.class);
+
+        switch (wsDataDto.getCmd()) {
+            case WsDataDto.CMD_PING:
+                //webSocketMap.get(userId).sendMessage(wsDataDto.toString());
+                break;
+        }
+    }
+
+    /**
+     * @param session
+     * @param error
+     */
+    @OnError
+    public void onError(Session session, Throwable error) {
+        logger.error("用户错误:" + this.clientId + ",原因:" + error.getMessage());
+        error.printStackTrace();
+    }
+
+    /**
+     * 实现服务器主动推送
+     */
+    public void sendMessage(String message) throws IOException {
+        this.session.getBasicRemote().sendText(message);
+    }
+
+
+    /**
+     * 发送设备监控信息
+     */
+    public static void sendInfo(String message, String boxId) throws IOException {
+        logger.info("发送消息到:" + boxId + ",报文:" + message);
+        for (ParkingBoxWebsocket server : webSocketMap.values()) {
+            if (boxId.equals(server.boxId)) {
+                webSocketMap.get(server.clientId).sendMessage(message);
+            }
+        }
+    }
+
+    public static synchronized int getOnlineCount() {
+        return onlineCount;
+    }
+
+    public static synchronized void addOnlineCount() {
+        ParkingBoxWebsocket.onlineCount++;
+    }
+
+    public static synchronized void subOnlineCount() {
+        ParkingBoxWebsocket.onlineCount--;
+    }
+}