Procházet zdrojové kódy

优化定时任务功能

java110 před 5 roky
rodič
revize
4213b9c3d0

+ 105 - 0
JobService/src/main/java/com/java110/job/Api/JobApi.java

@@ -0,0 +1,105 @@
+package com.java110.job.Api;
+
+import com.alibaba.fastjson.JSONObject;
+import com.java110.core.base.controller.BaseController;
+import com.java110.core.context.BusinessServiceDataFlow;
+import com.java110.core.factory.DataTransactionFactory;
+import com.java110.job.smo.IJobServiceSMO;
+import com.java110.utils.constant.ResponseConstant;
+import com.java110.utils.exception.InitConfigDataException;
+import com.java110.utils.exception.InitDataFlowContextException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestMethod;
+import org.springframework.web.bind.annotation.RestController;
+
+import javax.servlet.http.HttpServletRequest;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * 用户服务类
+ * Created by wuxw on 2018/5/14.
+ */
+@RestController
+public class JobApi extends BaseController {
+
+    private static Logger logger = LoggerFactory.getLogger(JobApi.class);
+
+    @Autowired
+    IJobServiceSMO jobServiceSMOImpl;
+
+    /**
+     * @param request 页面信息封装
+     * @return
+     */
+    @RequestMapping(path = "/jobApi/service", method = RequestMethod.GET)
+    public String serviceGet(HttpServletRequest request) {
+        return DataTransactionFactory.createBusinessResponseJson(ResponseConstant.RESULT_CODE_ERROR, "不支持Get方法请求").toJSONString();
+    }
+
+    /**
+     * 用户服务统一处理接口
+     *
+     * @param orderInfo
+     * @param request
+     * @return
+     */
+    @RequestMapping(path = "/jobApi/service", method = RequestMethod.POST)
+    public String servicePost(@RequestBody String orderInfo, HttpServletRequest request) {
+        BusinessServiceDataFlow businessServiceDataFlow = null;
+        JSONObject responseJson = null;
+        try {
+            Map<String, String> headers = new HashMap<String, String>();
+            getRequestInfo(request, headers);
+            //预校验
+            preValiateOrderInfo(orderInfo);
+            businessServiceDataFlow = this.writeDataToDataFlowContext(orderInfo, headers);
+            responseJson = jobServiceSMOImpl.service(businessServiceDataFlow);
+        } catch (InitDataFlowContextException e) {
+            logger.error("请求报文错误,初始化 BusinessServiceDataFlow失败" + orderInfo, e);
+            responseJson = DataTransactionFactory.createNoBusinessTypeBusinessResponseJson(orderInfo, ResponseConstant.RESULT_PARAM_ERROR, e.getMessage(), null);
+        } catch (InitConfigDataException e) {
+            logger.error("请求报文错误,加载配置信息失败" + orderInfo, e);
+            responseJson = DataTransactionFactory.createNoBusinessTypeBusinessResponseJson(orderInfo, ResponseConstant.RESULT_PARAM_ERROR, e.getMessage(), null);
+        } catch (Exception e) {
+            logger.error("请求订单异常", e);
+            responseJson = DataTransactionFactory.createBusinessResponseJson(businessServiceDataFlow, ResponseConstant.RESULT_CODE_ERROR, e.getMessage() + e,
+                    null);
+        } finally {
+            return responseJson.toJSONString();
+        }
+    }
+
+
+    /**
+     * 这里预校验,请求报文中不能有 dataFlowId
+     *
+     * @param orderInfo
+     */
+    private void preValiateOrderInfo(String orderInfo) {
+       /* if(JSONObject.parseObject(orderInfo).getJSONObject("orders").containsKey("dataFlowId")){
+            throw new BusinessException(ResponseConstant.RESULT_CODE_ERROR,"报文中不能存在dataFlowId节点");
+        }*/
+    }
+
+    /**
+     * 获取请求信息
+     *
+     * @param request
+     * @param headers
+     * @throws RuntimeException
+     */
+    private void getRequestInfo(HttpServletRequest request, Map headers) throws Exception {
+        try {
+            super.initHeadParam(request, headers);
+            super.initUrlParam(request, headers);
+        } catch (Exception e) {
+            logger.error("加载头信息失败", e);
+            throw new InitConfigDataException(ResponseConstant.RESULT_PARAM_ERROR, "加载头信息失败");
+        }
+    }
+}

+ 16 - 0
JobService/src/main/java/com/java110/job/kafka/JobServiceBean.java

@@ -0,0 +1,16 @@
+package com.java110.job.kafka;
+
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * Created by wuxw on 2018/4/15.
+ */
+@Configuration
+public class JobServiceBean {
+    @Bean
+    public JobServiceKafka listener() {
+        return new JobServiceKafka();
+    }
+
+}

+ 85 - 0
JobService/src/main/java/com/java110/job/kafka/JobServiceKafka.java

@@ -0,0 +1,85 @@
+package com.java110.job.kafka;
+
+import com.alibaba.fastjson.JSONObject;
+import com.java110.core.base.controller.BaseController;
+import com.java110.core.context.BusinessServiceDataFlow;
+import com.java110.core.factory.DataTransactionFactory;
+import com.java110.job.smo.IJobServiceSMO;
+import com.java110.utils.constant.KafkaConstant;
+import com.java110.utils.constant.ResponseConstant;
+import com.java110.utils.constant.StatusConstant;
+import com.java110.utils.exception.InitConfigDataException;
+import com.java110.utils.exception.InitDataFlowContextException;
+import com.java110.utils.kafka.KafkaFactory;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.kafka.annotation.KafkaListener;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * kafka侦听
+ * Created by wuxw on 2018/4/15.
+ */
+public class JobServiceKafka extends BaseController {
+
+    private final static Logger logger = LoggerFactory.getLogger(JobServiceKafka.class);
+
+    @Autowired
+    private IJobServiceSMO jobServiceSMOImpl;
+
+    @KafkaListener(topics = {"jobServiceTopic"})
+    public void listen(ConsumerRecord<?, ?> record) {
+        logger.info("kafka的key: " + record.key());
+        logger.info("kafka的value: " + record.value().toString());
+        String orderInfo = record.value().toString();
+        BusinessServiceDataFlow businessServiceDataFlow = null;
+        JSONObject responseJson = null;
+        try {
+            Map<String, String> headers = new HashMap<String, String>();
+            //预校验
+            preValiateOrderInfo(orderInfo);
+            businessServiceDataFlow = this.writeDataToDataFlowContext(orderInfo, headers);
+            responseJson = jobServiceSMOImpl.service(businessServiceDataFlow);
+        } catch (InitDataFlowContextException e) {
+            logger.error("请求报文错误,初始化 BusinessServiceDataFlow失败" + orderInfo, e);
+            responseJson = DataTransactionFactory.createNoBusinessTypeBusinessResponseJson(orderInfo, ResponseConstant.RESULT_PARAM_ERROR, e.getMessage(), null);
+        } catch (InitConfigDataException e) {
+            logger.error("请求报文错误,加载配置信息失败" + orderInfo, e);
+            responseJson = DataTransactionFactory.createNoBusinessTypeBusinessResponseJson(orderInfo, ResponseConstant.RESULT_PARAM_ERROR, e.getMessage(), null);
+        } catch (Exception e) {
+            logger.error("请求订单异常", e);
+            responseJson = DataTransactionFactory.createBusinessResponseJson(businessServiceDataFlow, ResponseConstant.RESULT_CODE_ERROR, e.getMessage() + e,
+                    null);
+        } finally {
+            logger.debug("当前请求报文:" + orderInfo + ", 当前返回报文:" + responseJson.toJSONString());
+            //只有business 和 instance 过程才做通知消息
+            if (!StatusConstant.REQUEST_BUSINESS_TYPE_BUSINESS.equals(responseJson.getString("businessType"))
+                    && !StatusConstant.REQUEST_BUSINESS_TYPE_INSTANCE.equals(responseJson.getString("businessType"))) {
+                return;
+            }
+            try {
+                KafkaFactory.sendKafkaMessage(KafkaConstant.TOPIC_NOTIFY_CENTER_SERVICE_NAME, "", responseJson.toJSONString());
+            } catch (Exception e) {
+                logger.error("用户服务通知centerService失败" + responseJson, e);
+                //这里保存异常信息
+            }
+        }
+    }
+
+
+    /**
+     * 这里预校验,请求报文中不能有 dataFlowId
+     *
+     * @param orderInfo
+     */
+    private void preValiateOrderInfo(String orderInfo) {
+       /* if(JSONObject.parseObject(orderInfo).getJSONObject("orders").containsKey("dataFlowId")){
+            throw new BusinessException(ResponseConstant.RESULT_CODE_ERROR,"报文中不能存在dataFlowId节点");
+        }*/
+    }
+
+}

+ 19 - 0
JobService/src/main/java/com/java110/job/smo/IJobServiceSMO.java

@@ -0,0 +1,19 @@
+package com.java110.job.smo;
+
+import com.alibaba.fastjson.JSONObject;
+import com.java110.core.context.BusinessServiceDataFlow;
+import com.java110.utils.exception.SMOException;
+
+/**
+ *
+ * 用户信息管理,服务
+ * Created by wuxw on 2017/4/5.
+ */
+public interface IJobServiceSMO {
+
+
+
+    public JSONObject service(BusinessServiceDataFlow businessServiceDataFlow) throws SMOException;
+
+
+}

+ 126 - 0
JobService/src/main/java/com/java110/job/smo/impl/JobServiceSMOImpl.java

@@ -0,0 +1,126 @@
+package com.java110.job.smo.impl;
+
+import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.JSONObject;
+import com.java110.core.base.smo.BaseServiceSMO;
+import com.java110.core.context.BusinessServiceDataFlow;
+import com.java110.core.event.service.BusinessServiceDataFlowEventPublishing;
+import com.java110.core.factory.DataFlowFactory;
+import com.java110.entity.center.DataFlowLinksCost;
+import com.java110.entity.center.DataFlowLog;
+import com.java110.job.smo.IJobServiceSMO;
+import com.java110.utils.cache.MappingCache;
+import com.java110.utils.constant.KafkaConstant;
+import com.java110.utils.constant.MappingConstant;
+import com.java110.utils.constant.ResponseConstant;
+import com.java110.utils.exception.SMOException;
+import com.java110.utils.kafka.KafkaFactory;
+import com.java110.utils.util.Assert;
+import com.java110.utils.util.DateUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Transactional;
+
+import java.util.Date;
+import java.util.List;
+
+/**
+ * 用户服务信息管理业务信息实现
+ * Created by wuxw on 2017/4/5.
+ */
+@Service("jobServiceSMOImpl")
+@Transactional
+public class JobServiceSMOImpl extends BaseServiceSMO implements IJobServiceSMO {
+
+    private static Logger logger = LoggerFactory.getLogger(JobServiceSMOImpl.class);
+
+
+    //新增用户
+    private static final String USER_ACTION_ADD = "ADD";
+
+    //新增用户
+    private static final String USER_ACTION_KIP = "KIP";
+
+    //新增用户
+    private static final String USER_ACTION_DEL = "DEL";
+
+
+    @Override
+    public JSONObject service(BusinessServiceDataFlow businessServiceDataFlow) throws SMOException {
+        try {
+            Assert.hasLength(businessServiceDataFlow.getbId(), "bId 不能为空");
+
+            BusinessServiceDataFlowEventPublishing.multicastEvent(businessServiceDataFlow);
+            Assert.notEmpty(businessServiceDataFlow.getResJson(), "定时任务服务[" + businessServiceDataFlow.getCurrentBusiness().getServiceCode() + "]没有返回内容");
+        } catch (Exception e) {
+            logger.error("定时任务处理异常", e);
+            throw new SMOException(ResponseConstant.RESULT_PARAM_ERROR, "定时任务处理异常" + e.getMessage());
+        } finally {
+            if (businessServiceDataFlow == null) {
+                return null;
+            }
+
+            //这里记录日志
+            Date endDate = DateUtil.getCurrentDate();
+
+            businessServiceDataFlow.setEndDate(endDate);
+            //添加耗时
+            DataFlowFactory.addCostTime(businessServiceDataFlow, "service", "业务处理总耗时",
+                    businessServiceDataFlow.getStartDate(), businessServiceDataFlow.getEndDate());
+            //保存耗时
+            saveCostTimeLogMessage(businessServiceDataFlow);
+            //保存日志
+            saveLogMessage(businessServiceDataFlow);
+        }
+        return businessServiceDataFlow.getResJson();
+    }
+
+
+    /**
+     * 保存日志信息
+     *
+     * @param businessServiceDataFlow 业务日志对象
+     */
+    private void saveLogMessage(BusinessServiceDataFlow businessServiceDataFlow) {
+
+        try {
+            if (MappingConstant.VALUE_ON.equals(MappingCache.getValue(MappingConstant.KEY_LOG_ON_OFF))) {
+                for (DataFlowLog dataFlowLog : businessServiceDataFlow.getLogDatas()) {
+                    KafkaFactory.sendKafkaMessage(KafkaConstant.TOPIC_LOG_NAME, "", JSONObject.toJSONString(dataFlowLog));
+                }
+            }
+        } catch (Exception e) {
+            logger.error("报错日志出错了,", e);
+        }
+    }
+
+    /**
+     * 保存耗时信息
+     *
+     * @param businessServiceDataFlow
+     */
+    private void saveCostTimeLogMessage(BusinessServiceDataFlow businessServiceDataFlow) {
+        try {
+            if (MappingConstant.VALUE_ON.equals(MappingCache.getValue(MappingConstant.KEY_COST_TIME_ON_OFF))) {
+                List<DataFlowLinksCost> dataFlowLinksCosts = businessServiceDataFlow.getLinksCostDates();
+                JSONObject costDate = new JSONObject();
+                JSONArray costDates = new JSONArray();
+                JSONObject newObj = null;
+                for (DataFlowLinksCost dataFlowLinksCost : dataFlowLinksCosts) {
+                    newObj = JSONObject.parseObject(JSONObject.toJSONString(dataFlowLinksCost));
+                    newObj.put("dataFlowId", businessServiceDataFlow.getDataFlowId());
+                    newObj.put("transactionId", businessServiceDataFlow.getTransactionId());
+                    costDates.add(newObj);
+                }
+                costDate.put("costDates", costDates);
+
+                KafkaFactory.sendKafkaMessage(KafkaConstant.TOPIC_COST_TIME_LOG_NAME, "", costDate.toJSONString());
+            }
+        } catch (Exception e) {
+            logger.error("报错日志出错了,", e);
+        }
+    }
+
+
+}