Browse Source

优化代码

java110 4 years ago
parent
commit
39d26b7244

+ 3 - 0
java110-utils/src/main/java/com/java110/utils/constant/KafkaConstant.java

@@ -19,4 +19,7 @@ public class KafkaConstant {
      * 通知 中心服务
      */
     public final static String TOPIC_NOTIFY_CENTER_SERVICE_NAME = "NOTIFY_CENTER_SERVICE";
+
+
+    public final static String TOPIC_API_SEND_WEB = "webSentMessageTopic";
 }

+ 8 - 2
service-api/src/main/java/com/java110/api/kafka/FrontServiceKafka.java

@@ -1,8 +1,9 @@
 package com.java110.api.kafka;
 
 import com.alibaba.fastjson.JSONObject;
-import com.java110.core.base.controller.BaseController;
 import com.java110.api.websocket.MessageWebsocket;
+import com.java110.core.base.controller.BaseController;
+import com.java110.utils.constant.KafkaConstant;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -17,7 +18,12 @@ public class FrontServiceKafka extends BaseController {
     private final static Logger logger = LoggerFactory.getLogger(FrontServiceKafka.class);
 
 
-    @KafkaListener(topics = {"webSentMessageTopic"})
+    /**
+     * 像前段返回内容
+     *
+     * @param record
+     */
+    @KafkaListener(topics = {KafkaConstant.TOPIC_API_SEND_WEB})
     public void listen(ConsumerRecord<?, ?> record) {
         logger.info("kafka的key: " + record.key());
         logger.info("kafka的value: " + record.value().toString());

+ 17 - 17
service-api/src/main/java/com/java110/api/websocket/MessageWebsocket.java

@@ -8,11 +8,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;
 
-import javax.websocket.OnClose;
-import javax.websocket.OnError;
-import javax.websocket.OnMessage;
-import javax.websocket.OnOpen;
-import javax.websocket.Session;
+import javax.websocket.*;
 import javax.websocket.server.PathParam;
 import javax.websocket.server.ServerEndpoint;
 import java.io.IOException;
@@ -26,7 +22,7 @@ import java.util.concurrent.ConcurrentHashMap;
  * @Version 1.0
  * add by wuxw 2020/6/5
  **/
-@ServerEndpoint("/ws/message/{userId}")
+@ServerEndpoint("/ws/message/{userId}/{clientId}")
 @Component
 public class MessageWebsocket {
     private final static Logger logger = LoggerFactory.getLogger(MessageWebsocket.class);
@@ -54,14 +50,17 @@ public class MessageWebsocket {
      */
     private String userId = "";
 
+    private String clientId = "";
+
 
     /**
      * 连接建立成功调用的方法
      */
     @OnOpen
-    public void onOpen(Session session, @PathParam("userId") String userId) {
+    public void onOpen(Session session, @PathParam("userId") String userId, @PathParam("clientId") String clientId) {
         this.session = session;
         this.userId = userId;
+        this.clientId = clientId;
         if (webSocketMap.containsKey(userId)) {
             webSocketMap.remove(userId);
             webSocketMap.put(userId, this);
@@ -74,12 +73,12 @@ public class MessageWebsocket {
         }
 
 
-        logger.debug("用户连接:" + userId + ",当前在线人数为:" + getOnlineCount());
+        logger.debug("用户连接:" + userId + ",客户端:" + clientId + ",当前在线人数为:" + getOnlineCount());
 
         try {
             sendMessage("连接成功");
         } catch (IOException e) {
-            logger.error("用户:" + userId + ",网络异常!!!!!!");
+            logger.error("用户:" + userId + ",客户端:" + clientId + ",网络异常!!!!!!");
         }
     }
 
@@ -93,7 +92,7 @@ public class MessageWebsocket {
             //从set中删除
             subOnlineCount();
         }
-        logger.info("用户退出:" + userId + ",当前在线人数为:" + getOnlineCount());
+        logger.info("用户退出:" + userId + ",客户端:" + clientId + ",当前在线人数为:" + getOnlineCount());
     }
 
     /**
@@ -103,7 +102,7 @@ public class MessageWebsocket {
      */
     @OnMessage
     public void onMessage(String message, Session session) throws IOException {
-        logger.info("用户消息:" + userId + ",报文:" + message);
+        logger.info("用户消息:" + userId + ",客户端:" + clientId + ",报文:" + message);
         //可以群发消息
         //消息保存到数据库、redis
         if (StringUtil.isEmpty(message)) {
@@ -120,7 +119,7 @@ public class MessageWebsocket {
 
         WsDataDto wsDataDto = JSONObject.parseObject(message, WsDataDto.class);
 
-        switch (wsDataDto.getCmd()){
+        switch (wsDataDto.getCmd()) {
             case WsDataDto.CMD_PING:
                 //webSocketMap.get(userId).sendMessage(wsDataDto.toString());
                 break;
@@ -148,8 +147,8 @@ public class MessageWebsocket {
      */
     @OnError
     public void onError(Session session, Throwable error) {
-        logger.error("用户错误:" + this.userId + ",原因:" + error.getMessage());
-        error.printStackTrace();
+        logger.error("用户错误:" + this.userId + ",客户端:" + clientId + ",原因:" + error.getMessage());
+        // error.printStackTrace();
     }
 
     /**
@@ -165,10 +164,11 @@ public class MessageWebsocket {
      */
     public static void sendInfo(String message, String userId) throws IOException {
         logger.info("发送消息到:" + userId + ",报文:" + message);
-
+        if (!webSocketMap.containsKey(userId)) {
+            //客户端未连接
+            return;
+        }
         webSocketMap.get(userId).sendMessage(message);
-
-
     }
 
     public static synchronized int getOnlineCount() {