Просмотр исходного кода

完成 java110-logAgent 功能

wuxw7 лет назад: 7
Родитель
Сommit
37c6fc4c4d

+ 39 - 26
CenterService/src/main/java/com/java110/center/smo/impl/CenterServiceSMOImpl.java

@@ -21,6 +21,7 @@ import com.java110.entity.center.Business;
 import com.java110.entity.center.DataFlowLinksCost;
 import com.java110.event.center.DataFlowEventPublishing;
 
+import com.java110.log.agent.LogAgent;
 import com.java110.service.smo.IQueryServiceSMO;
 import org.apache.commons.lang3.math.NumberUtils;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -132,12 +133,14 @@ public class CenterServiceSMOImpl extends LoggerEngine implements ICenterService
                 //DataFlowFactory.addCostTime(dataFlow, "service", "业务处理总耗时", dataFlow.getStartDate(), dataFlow.getEndDate());
 
                 //这里保存耗时,以及日志
-                //saveLogMessage(dataFlow.getReqJson(), dataFlow.getResJson());
+                saveLogMessage(dataFlow,LogAgent.createLogMessage(dataFlow.getRequestHeaders(),dataFlow.getReqJson().toJSONString()),
+                        LogAgent.createLogMessage(dataFlow.getResponseHeaders(),dataFlow.getResJson().toJSONString()));
 
                 //保存耗时
                 //saveCostTimeLogMessage(dataFlow);
                 //处理返回报文鉴权
                 AuthenticationFactory.putSign(dataFlow, responseJson);
+
             }
             resJson = encrypt(responseJson.toJSONString(),headers);
             /*LogAgent.sendLog(dataFlow.reBuilder(dataFlow.getTransactionId(),
@@ -211,7 +214,8 @@ public class CenterServiceSMOImpl extends LoggerEngine implements ICenterService
                 DataFlowFactory.addCostTime(dataFlow, "service", "业务处理总耗时", dataFlow.getStartDate(), dataFlow.getEndDate());
 
                 //这里保存耗时,以及日志
-                saveLogMessage(dataFlow.getReqJson(), dataFlow.getResJson());
+                saveLogMessage(dataFlow,LogAgent.createLogMessage(dataFlow.getRequestHeaders(),dataFlow.getReqJson().toJSONString()),
+                        LogAgent.createLogMessage(dataFlow.getResponseHeaders(),dataFlow.getResJson().toJSONString()));
 
                 //保存耗时
                 saveCostTimeLogMessage(dataFlow);
@@ -579,7 +583,10 @@ public class CenterServiceSMOImpl extends LoggerEngine implements ICenterService
                     //发起撤单
                     KafkaFactory.sendKafkaMessage(appRoute.getAppService().getMessageQueueName(),"",
                             DataFlowFactory.getDeleteInstanceTableJson(dataFlow,completedBusiness,appRoute.getAppService()).toJSONString());
-                    saveLogMessage(DataFlowFactory.getDeleteInstanceTableJson(dataFlow,completedBusiness,appRoute.getAppService()),null);
+                    //saveLogMessage(DataFlowFactory.getDeleteInstanceTableJson(dataFlow,completedBusiness,appRoute.getAppService()),null);
+
+                    saveLogMessage(dataFlow,LogAgent.createLogMessage(dataFlow.getRequestCurrentHeaders(),DataFlowFactory.getDeleteInstanceTableJson(dataFlow,completedBusiness,appRoute.getAppService()).toJSONString()),
+                            LogAgent.createLogMessage(dataFlow.getResponseCurrentHeaders(),ResponseConstant.RESULT_CODE_SUCCESS));
                 }
             }
         }
@@ -716,7 +723,8 @@ public class CenterServiceSMOImpl extends LoggerEngine implements ICenterService
             updateBusinessNotifyError(dataFlow);
         }finally{
             DataFlowFactory.addCostTime(dataFlow, "receiveBusinessSystemNotifyMessage", "接受业务系统通知消息耗时", startDate);
-            saveLogMessage(dataFlow.getReqJson(),null);
+            saveLogMessage(dataFlow,LogAgent.createLogMessage(dataFlow.getRequestCurrentHeaders(),dataFlow.getReqJson().toJSONString()),
+                    LogAgent.createLogMessage(dataFlow.getResponseCurrentHeaders(),ResponseConstant.RESULT_CODE_SUCCESS));
         }
     }
 
@@ -864,7 +872,8 @@ public class CenterServiceSMOImpl extends LoggerEngine implements ICenterService
         KafkaFactory.sendKafkaMessage(
                 DataFlowFactory.getService(dataFlow,dataFlow.getBusinesses().get(0).getServiceCode()).getMessageQueueName(),"",DataFlowFactory.getNotifyBusinessSuccessJson(dataFlow).toJSONString());
 
-        saveLogMessage(DataFlowFactory.getNotifyBusinessSuccessJson(dataFlow),null);
+        saveLogMessage(dataFlow,LogAgent.createLogMessage(dataFlow.getRequestCurrentHeaders(),DataFlowFactory.getNotifyBusinessSuccessJson(dataFlow).toJSONString()),
+                LogAgent.createLogMessage(dataFlow.getResponseCurrentHeaders(),ResponseConstant.RESULT_CODE_SUCCESS));
     }
 
     /**
@@ -878,7 +887,8 @@ public class CenterServiceSMOImpl extends LoggerEngine implements ICenterService
         KafkaFactory.sendKafkaMessage(
                 DataFlowFactory.getService(dataFlow,dataFlow.getBusinesses().get(0).getServiceCode()).getMessageQueueName(),"",
                 DataFlowFactory.getNotifyBusinessErrorJson(dataFlow).toJSONString());
-        saveLogMessage(DataFlowFactory.getNotifyBusinessErrorJson(dataFlow),null);
+        saveLogMessage(dataFlow,LogAgent.createLogMessage(dataFlow.getRequestCurrentHeaders(),DataFlowFactory.getNotifyBusinessErrorJson(dataFlow).toJSONString()),
+                LogAgent.createLogMessage(dataFlow.getResponseCurrentHeaders(),ResponseConstant.RESULT_CODE_ERROR));
     }
 
     /**
@@ -970,7 +980,8 @@ public class CenterServiceSMOImpl extends LoggerEngine implements ICenterService
             JSONObject responseJson = doRequestBusinessSystem(dataFlow, service, requestBusinessJson);
 
             DataFlowFactory.addCostTime(dataFlow, business.getServiceCode(), "调用"+business.getServiceName()+"-doComplete耗时", businessStartDate);
-            saveLogMessage(requestBusinessJson,responseJson);
+            saveLogMessage(dataFlow,LogAgent.createLogMessage(dataFlow.getRequestCurrentHeaders(),requestBusinessJson.toJSONString()),
+                    LogAgent.createLogMessage(dataFlow.getResponseCurrentHeaders(),responseJson.toJSONString()));
         }
 
     }
@@ -999,7 +1010,8 @@ public class CenterServiceSMOImpl extends LoggerEngine implements ICenterService
 
             updateBusinessStatusCdByBId(business.getbId(),StatusConstant.STATUS_CD_COMPLETE);
             DataFlowFactory.addCostTime(dataFlow, business.getServiceCode(), "调用"+business.getServiceName()+"耗时", businessStartDate);
-            saveLogMessage(requestBusinessJson,responseJson);
+            saveLogMessage(dataFlow,LogAgent.createLogMessage(dataFlow.getRequestCurrentHeaders(),requestBusinessJson.toJSONString()),
+                    LogAgent.createLogMessage(dataFlow.getResponseCurrentHeaders(),responseJson.toJSONString()));
         }
 
         if(dataFlow.getCurrentBusiness() == null){
@@ -1029,7 +1041,8 @@ public class CenterServiceSMOImpl extends LoggerEngine implements ICenterService
             requestBusinessJson = DataFlowFactory.getDeleteInstanceTableJson(dataFlow,business);
             JSONObject responseJson = doRequestBusinessSystem(dataFlow, service, requestBusinessJson);
             DataFlowFactory.addCostTime(dataFlow, business.getServiceCode(), "调用"+business.getServiceName()+"-撤单 耗时", businessStartDate);
-            saveLogMessage(requestBusinessJson,responseJson);
+            saveLogMessage(dataFlow,LogAgent.createLogMessage(dataFlow.getRequestCurrentHeaders(),requestBusinessJson.toJSONString()),
+                    LogAgent.createLogMessage(dataFlow.getResponseCurrentHeaders(),responseJson.toJSONString()));
         }
     }
 
@@ -1066,8 +1079,8 @@ public class CenterServiceSMOImpl extends LoggerEngine implements ICenterService
         if(service.getMethod() == null || "".equals(service.getMethod())) {//post方式
             //http://user-service/test/sayHello
             HttpHeaders header = new HttpHeaders();
-            for(String key : dataFlow.getHeaders().keySet()){
-                header.add(key,dataFlow.getHeaders().get(key));
+            for(String key : dataFlow.getRequestCurrentHeaders().keySet()){
+                header.add(key,dataFlow.getRequestCurrentHeaders().get(key));
             }
             HttpEntity<String> httpEntity = new HttpEntity<String>(reqData, header);
             responseMessage = restTemplateNoLoadBalanced.postForObject(service.getUrl(),httpEntity,String.class);
@@ -1109,7 +1122,8 @@ public class CenterServiceSMOImpl extends LoggerEngine implements ICenterService
             responseBusinesses.add(dataFlow.getResponseBusinessJson());
 
             DataFlowFactory.addCostTime(dataFlow, business.getServiceCode(), "调用"+business.getServiceName()+"耗时", businessStartDate);
-            saveLogMessage(dataFlow.getRequestBusinessJson(),dataFlow.getResponseBusinessJson());
+            saveLogMessage(dataFlow,LogAgent.createLogMessage(dataFlow.getRequestCurrentHeaders(),dataFlow.getRequestBusinessJson().toJSONString()),
+                    LogAgent.createLogMessage(dataFlow.getResponseCurrentHeaders(),dataFlow.getResponseBusinessJson().toJSONString()));
         }
     }
 
@@ -1141,26 +1155,25 @@ public class CenterServiceSMOImpl extends LoggerEngine implements ICenterService
         dataFlow.setResponseBusinessJson(DataTransactionFactory.createOrderResponseJson(dataFlow.getTransactionId(),
                  ResponseConstant.RESULT_CODE_SUCCESS, "成功"));
         DataFlowFactory.addCostTime(dataFlow, "doSynchronousBusinesses", "异步调用业务系统总耗时", startDate);
-        saveLogMessage(dataFlow.getRequestBusinessJson(),dataFlow.getResponseBusinessJson());
+        saveLogMessage(dataFlow,dataFlow.getRequestBusinessJson(),dataFlow.getResponseBusinessJson());
     }
 
 
     /**
      * 保存日志信息
-     * @param requestJson
+     * @param dataFlow 数据流对象 封装用户请求的信息
+     *
+     * @param requestJson 请求报文 格式为
+     *                    {"headers":"",
+     *                     "body":""
+     *                     }
+     * @param responseJson 请求报文 格式为
+     *                    {"headers":"",
+     *                     "body":""
+     *                     }
      */
-    private void saveLogMessage(JSONObject requestJson,JSONObject responseJson){
-
-        try{
-            if(MappingConstant.VALUE_ON.equals(MappingCache.getValue(MappingConstant.KEY_LOG_ON_OFF))){
-                JSONObject log = new JSONObject();
-                log.put("request",requestJson);
-                log.put("response",responseJson);
-                KafkaFactory.sendKafkaMessage(KafkaConstant.TOPIC_LOG_NAME,"",log.toJSONString());
-            }
-        }catch (Exception e){
-            logger.error("报错日志出错了,",e);
-        }
+    private void saveLogMessage(DataFlow dataFlow,JSONObject requestJson,JSONObject responseJson){
+            LogAgent.sendLog(dataFlow,requestJson,responseJson);
     }
 
     /**

+ 18 - 3
java110-core/src/main/java/com/java110/core/context/AbstractDataFlowContext.java

@@ -56,7 +56,10 @@ public abstract class AbstractDataFlowContext extends AbstractTransactionLog imp
 
     private List<DataFlowLog> logDatas = new ArrayList<DataFlowLog>();
 
-    protected Map<String,String> headers = new HashMap<String,String>();
+    protected Map<String,String> requestHeaders = new HashMap<String,String>();
+    protected Map<String,String> requestCurrentHeaders = new HashMap<String,String>();
+    protected Map<String,String> responseHeaders = new HashMap<String,String>();
+    protected Map<String,String> responseCurrentHeaders = new HashMap<String,String>();
 
     //请求开始时间
     private Date startDate;
@@ -187,8 +190,20 @@ public abstract class AbstractDataFlowContext extends AbstractTransactionLog imp
         this.endDate = endDate;
     }
 
-    public Map<String, String> getHeaders() {
-        return headers;
+    public Map<String, String> getRequestHeaders() {
+        return requestHeaders;
+    }
+
+    public Map<String, String> getResponseHeaders() {
+        return responseHeaders;
+    }
+
+    public Map<String, String> getRequestCurrentHeaders() {
+        return requestHeaders;
+    }
+
+    public Map<String, String> getResponseCurrentHeaders() {
+        return responseHeaders;
     }
 
     public String getReqData() {

+ 2 - 1
java110-core/src/main/java/com/java110/core/context/BusinessServiceDataFlow.java

@@ -53,7 +53,8 @@ public class BusinessServiceDataFlow extends AbstractDataFlowContext {
             businesses.add(business);
             this.setCurrentBusiness(business);
             if (headerAll != null){
-                this.headers.putAll(headerAll);
+                this.requestCurrentHeaders.putAll(headerAll);
+                this.requestHeaders.putAll(headerAll);
             }
         }catch (Exception e){
             throw new InitDataFlowContextException(ResponseConstant.RESULT_PARAM_ERROR,"初始化对象 BusinessServiceDataFlow 失败 "+reqInfo);

+ 2 - 1
java110-core/src/main/java/com/java110/core/context/CodeDataFlow.java

@@ -59,7 +59,8 @@ public class CodeDataFlow extends AbstractDataFlowContext {
         this.setRequestTime(reqInfoObj.getString("requestTime"));
 
         if (headerAll != null && !headerAll.isEmpty()){
-           this.headers.putAll(headerAll);
+           this.requestCurrentHeaders.putAll(headerAll);
+           this.requestHeaders.putAll(headerAll);
         }
 
         if(headerAll != null && headerAll.containsKey("hostName")) {

+ 8 - 6
java110-core/src/main/java/com/java110/core/context/DataFlow.java

@@ -241,9 +241,10 @@ public class DataFlow extends AbstractDataFlowContext {
             }
 
             if (headerAll != null){
-                this.headers.putAll(headerAll);
-                this.setRequestURL(headers.get("REQUEST_URL"));
-                this.setIp(headers.get("IP"));
+                this.requestHeaders.putAll(headerAll);
+                this.requestCurrentHeaders.putAll(headerAll);
+                this.setRequestURL(requestHeaders.get("REQUEST_URL"));
+                this.setIp(requestHeaders.get("IP"));
             }
 
 
@@ -286,9 +287,10 @@ public class DataFlow extends AbstractDataFlowContext {
             }
 
             if (headerAll != null){
-                this.headers.putAll(headerAll);
-                this.setRequestURL(headers.get("REQUEST_URL"));
-                this.setIp(headers.get("IP"));
+                this.requestHeaders.putAll(headerAll);
+                this.requestCurrentHeaders.putAll(headerAll);
+                this.setRequestURL(requestHeaders.get("REQUEST_URL"));
+                this.setIp(requestHeaders.get("IP"));
             }
 
 

+ 22 - 1
java110-core/src/main/java/com/java110/core/context/DataFlowContext.java

@@ -44,7 +44,28 @@ public interface DataFlowContext {
 
     public List<Business> getBusinesses();
 
-    public Map<String, String> getHeaders();
+    /**
+     * 源请求头信息
+     * @return
+     */
+    public Map<String, String> getRequestHeaders();
+    /**
+     * 终返回头信息
+     * @return
+     */
+    public Map<String, String> getResponseHeaders();
+
+    /**
+     * 当前请求头信息
+     * @return
+     */
+    public Map<String, String> getRequestCurrentHeaders();
+
+    /**
+     * 当前返回头信息
+     * @return
+     */
+    public Map<String, String> getResponseCurrentHeaders();
 
 
     public Orders getOrder();

+ 3 - 0
java110-core/src/main/java/com/java110/core/context/TransactionLog.java

@@ -81,6 +81,9 @@ public interface TransactionLog extends Serializable {
 
     /**
      * 重新构建 TransactionLog 对象 主要用于服务提供方
+     * @param requestMessage 请求数据
+     * @param responseMessage 返回数据
+     * @param logStatus 数据交互状态
      * @return
      */
     public TransactionLog reBuilder(String requestMessage,String responseMessage,String logStatus);

+ 96 - 4
java110-logAgent/src/main/java/com/java110/log/agent/LogAgent.java

@@ -1,14 +1,21 @@
 package com.java110.log.agent;
 
+import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.JSONObject;
+import com.alibaba.fastjson.JSONPath;
 import com.java110.common.cache.MappingCache;
 import com.java110.common.constant.KafkaConstant;
 import com.java110.common.constant.MappingConstant;
+import com.java110.common.constant.ResponseConstant;
 import com.java110.common.factory.ApplicationContextFactory;
 import com.java110.common.kafka.KafkaFactory;
 import com.java110.common.log.LoggerEngine;
+import com.java110.common.util.Assert;
 import com.java110.core.context.DataFlow;
 import com.java110.core.context.TransactionLog;
 
+import java.util.Map;
+
 /**
  * 日志代理
  * 收集日志 发送至 日志服务
@@ -18,8 +25,6 @@ public class LogAgent extends LoggerEngine{
 
     public static final String LOG_STATUS_S = "S";
     public static final String LOG_STATUS_F = "F";
-    public static final String LOG_TYPE_S = "S";
-    public static final String LOG_TYPE_C = "C";
 
 
     /**
@@ -40,10 +45,97 @@ public class LogAgent extends LoggerEngine{
     }
 
 
-    public static boolean sendLog(DataFlow dataFlow){
-        return sendLog(dataFlow);
+    /**
+     * 发送交互日志
+     * @param dataFlow 数据流对象
+     * @param requestMessage 请求数据
+     * @param responseMessage 返回数据
+     * @param logStatus 日志状态
+     * @return
+     */
+    public static boolean sendLog(DataFlow dataFlow,String requestMessage,String responseMessage,String logStatus){
+        return sendLog(dataFlow.reBuilder(requestMessage,responseMessage,logStatus));
+    }
+
+    /**
+     * 发送交互日志
+     * 请求报文和返回报文必须组装成
+     * {"headers":"",
+     * "body":""
+     * }
+     * @param dataFlow 数据流对象
+     * @param requestMessage 请求数据
+     * @param responseMessage 返回数据
+     * @return
+     */
+    public static boolean sendLog(DataFlow dataFlow, JSONObject requestMessage, JSONObject responseMessage){
+        Assert.hasKey(responseMessage,"body","返回报文不满足 日志协议要求"+responseMessage.toJSONString());
+
+        String body = responseMessage.getString("body");
+        String logStatus = LOG_STATUS_F;
+        //如果是JSONObject
+        if(Assert.isJsonObject(body)){
+            JSONObject bodyObj = JSONObject.parseObject(body);
+
+            Object codeNode = JSONPath.eval(body,"$.orders.response.code");
+            //判断订单是否成功
+            if(codeNode != null && ResponseConstant.RESULT_CODE_SUCCESS.equals(codeNode.toString())){
+                //判断业务是否受理成功个,如果有一个业务受理失败,则认为失败
+                if(!bodyObj.containsKey("business")){
+                    return sendLog(dataFlow,requestMessage.toJSONString(),responseMessage.toJSONString(),LOG_STATUS_S);
+                }
+
+                if(bodyObj.get("business") instanceof JSONObject){
+                    JSONObject businessObj = bodyObj.getJSONObject("business");
+                    if(businessObj.containsKey("response")&&
+                            ResponseConstant.RESULT_CODE_SUCCESS.equals(businessObj.getJSONObject("response").getString("code"))){
+                        return sendLog(dataFlow,requestMessage.toJSONString(),responseMessage.toJSONString(),LOG_STATUS_S);
+                    }
+                }
 
+                if(bodyObj.get("business") instanceof JSONArray){
+                    JSONArray businessArrays = bodyObj.getJSONArray("business");
+                    if(businessArrays == null || businessArrays.size() == 0){
+                        return sendLog(dataFlow,requestMessage.toJSONString(),responseMessage.toJSONString(),LOG_STATUS_S);
+                    }
+
+                    logStatus = LOG_STATUS_S;
+                    for(int businessIndex = 0; businessIndex < businessArrays.size();businessIndex ++){
+                        JSONObject businessObj = businessArrays.getJSONObject(businessIndex);
+                        if(!businessObj.containsKey("response") ||
+                                !ResponseConstant.RESULT_CODE_SUCCESS.equals(businessObj.getJSONObject("response").getString("code"))){
+                            logStatus = LOG_STATUS_F;
+                        }
+                    }
+                }
+            }
+        }
+        //如果有xml交互,则扩展
+
+        //兼容kafka 传递消息
+        if(ResponseConstant.RESULT_CODE_SUCCESS.equals(body)){
+            logStatus = LOG_STATUS_S;
+        }
+        return sendLog(dataFlow,requestMessage.toJSONString(),responseMessage.toJSONString(),logStatus);
     }
 
 
+    /**
+     * 封装头信息和 消息信息至body中
+     * {"headers":"",
+     * "body":""
+     * }
+     * @param headers 头信息
+     * @param dataInfo 数据信息
+     * @return
+     */
+    public static JSONObject createLogMessage(Map<String,String> headers, String dataInfo){
+        JSONObject message = new JSONObject();
+        String headerMessage = (headers == null || headers.isEmpty())?"":JSONObject.toJSONString(headers);
+        message.put("headers",headerMessage);
+        message.put("body",dataInfo);
+
+        return message;
+    }
+
 }