Просмотр исходного кода

2018-4-16 新增接受业务系统通知方法,并完善业务

wuxw7 лет назад: 8
Родитель
Сommit
7274c78616
29 измененных файлов с 1542 добавлено и 88 удалено
  1. 17 0
      CenterService/doc/businessNotifyProtocol.json
  2. 21 0
      CenterService/doc/businessRequestProtocol.json
  3. 42 16
      CenterService/doc/centerService.docx
  4. 17 0
      CenterService/doc/orderStatusNotifyProtocol.json
  5. BIN
      CenterService/doc/系统调用流程图.png
  6. 33 2
      CenterService/src/main/java/com/java110/center/dao/ICenterServiceDAO.java
  7. 62 7
      CenterService/src/main/java/com/java110/center/dao/impl/CenterServiceDAOImpl.java
  8. 33 0
      CenterService/src/main/java/com/java110/center/kaka/CenserServiceKafkaListener.java
  9. 16 0
      CenterService/src/main/java/com/java110/center/kaka/CenterServiceBean.java
  10. 7 0
      CenterService/src/main/java/com/java110/center/smo/ICenterServiceSMO.java
  11. 253 29
      CenterService/src/main/java/com/java110/center/smo/impl/CenterServiceSMOImpl.java
  12. 29 0
      CenterService/src/main/resources/application.yml
  13. 8 1
      CodingLog.txt
  14. 13 0
      java110-bean/src/main/java/com/java110/entity/center/AppService.java
  15. 25 1
      java110-bean/src/main/java/com/java110/entity/center/Business.java
  16. 5 0
      java110-common/pom.xml
  17. 3 0
      java110-common/src/main/java/com/java110/common/constant/CommonConstant.java
  18. 3 0
      java110-common/src/main/java/com/java110/common/constant/MappingConstant.java
  19. 9 0
      java110-common/src/main/java/com/java110/common/constant/StatusConstant.java
  20. 189 0
      java110-common/src/main/java/com/java110/common/exception/BusinessStatusException.java
  21. 189 0
      java110-common/src/main/java/com/java110/common/exception/InitConfigDataException.java
  22. 321 6
      java110-common/src/main/java/com/java110/common/factory/DataFlowFactory.java
  23. 30 0
      java110-common/src/main/java/com/java110/common/kafka/KafkaFactory.java
  24. 44 19
      java110-common/src/main/java/com/java110/common/util/SequenceUtil.java
  25. 47 6
      java110-config/src/main/resources/mapper/center/CenterServiceDAOImplMapper.xml
  26. 1 1
      java110-config/src/main/resources/mapper/order/OrderServiceDaoImplMapper.xml
  27. 64 0
      java110-core/src/main/java/com/java110/core/kafka/KafkaConsumerConfig.java
  28. 56 0
      java110-core/src/main/java/com/java110/core/kafka/KafkaProducerConfig.java
  29. 5 0
      pom.xml

+ 17 - 0
CenterService/doc/businessNotifyProtocol.json

@@ -0,0 +1,17 @@
+{
+  "orders": {
+    "transactionId": "100000000020180409224736000001",
+    "responseTime": "20180409224736",
+    "orderTypeCd":"订单类型,查询,受理",
+    "businessType":"N"
+  },
+  "business":[{//这个是相应的业务系统返回的结果,(受理为空,查询时不为空)
+    "response": {
+      "code": "1999",
+      "message": "错误消息"
+    },
+    "bId":"12345678",
+    "serviceCode": "querycustinfo"
+    //其他字段
+  }]
+}

+ 21 - 0
CenterService/doc/businessRequestProtocol.json

@@ -0,0 +1,21 @@
+{
+  "orders": {
+    "transactionId": "100000000020180409224736000001",
+    "requestTime": "20180409224736",
+    "orderTypeCd":"订单类型,查询,受理",
+    "businessType":"Q"
+  },
+  "business": [{
+    "bId":"12345678",
+    "serviceCode": "querycustinfo",
+    "serviceName": "查询客户",
+    "remark": "备注",
+    "datas": [{
+    //这里是具体业务
+    }],
+    "attrs": [{
+    "specCd": "配置的字段id",
+    "value": "具体值"
+    }]
+  }]
+}

+ 42 - 16
CenterService/doc/centerService.docx

@@ -370,8 +370,8 @@ varchar
 
 request_time
-date
-
+Varchar
+16
 请求时间
 
 create_time
@@ -759,7 +759,7 @@ varchar
  1-同步方式
  2-异步方式
 business_type_cd
-
+
 varchar
 4
 对应业务项类型
@@ -776,12 +776,18 @@ int
 
 顺序
  只有同步方式下根据seq从小到大调用接口
+messageQueueName
+是
+Varchar
+50
+消息队里名称
+ 只有异步时有用
 url
 varchar
 200
 目标地址
-
+Localhost 则调用中心服务自己,其他调用目录服务
 method
 varchar
@@ -957,39 +963,59 @@ date
  如果是同步方式,直接调用目标系统。
  如果是异步方式,直接发送kafka消息。
  如果是异步方式,kafka 监听修改状态消息,如果,下游处理失败,则作废订单和业务项数据,发送作废所有下游消息(这里传b_id 和o_id)。
- json协议模板
+ 外系统请求json协议模板
 1、请求模板
 {
 	"orders": {
-		"appid": "外系统id,分配得到",
-		"transactionid": "100000000020180409224736000001",
-		"userid": "用户id",
-		"ordertypecd": "订单类型,查询,受理",
-		"requesttime": "20180409224736",
+		"appId": "外系统id,分配得到",
+		"transactionId": "100000000020180409224736000001",
+		"userId": "用户Id",
+		"orderTypeCd": "订单类型,查询,受理",
+		"requestTime": "20180409224736",
 		"remark":"备注",
 		"sign": "这个服务是否要求md5签名",
 		"attrs": [{
-			"speccd": "配置的字段id",
+			"specCd": "配置的字段id",
 			"value": "具体值"
 		}]
 	},
 	"business": [{
-		"servicecode": "querycustinfo",
-		"servicename": "查询客户",
+		"serviceCode": "querycustinfo",
+		"serviceName": "查询客户",
 		"remark": "备注",
 		"datas": [{
 			//这里是具体业务
 		}],
 		"attrs": [{
-			"speccd": "配置的字段id",
+			"specCd": "配置的字段id",
 			"value": "具体值"
 		}]
 	}]
 }
 
 2、返回模板
-{  "orders": {    "transactionid": "100000000020180409224736000001",    "responsetime": "20180409224736",    "remark":"备注",    "sign": "这个服务是否要求md5签名",    "response": {//这个是centerorder 返回的状态结果      "code": "1999",      "message": "错误信息"    }  },  "business":[{//这个是相应的业务系统返回的结果,(受理为空,查询时不为空)    "response": {      "code": "1999",      "message": "错误消息"    }    //相应内容  }]}
-六、状态码说明
+{  "orders": {    "transactionId": "100000000020180409224736000001",    "responseTime": "20180409224736",    "remark":"备注",    "sign": "这个服务是否要求md5签名",    "response": {//这个是centerorder 返回的状态结果      "code": "1999",      "message": "错误信息"    }  },  "business":[{//这个是相应的业务系统返回的结果,(受理为空,查询时不为空)    "response": {      "code": "1999",      "message": "错误消息"    }    //相应内容  }]}
+
+ 系统调用方式
+
+ 请求下游系统json协议模板
+1、请求模板
+{  "orders": {    "transactionId": "100000000020180409224736000001",    "requestTime": "20180409224736",
+"orderTypeCd":"订单类型,查询,受理",    "businessType":"Q"  },  "business": [{    "bId":"12345678",    "serviceCode": "querycustinfo",    "serviceName": "查询客户",    "remark": "备注",    "datas": [{    //这里是具体业务    }],    "attrs": [{    "specCd": "配置的字段id",    "value": "具体值"    }]  }]}
+
+businessType Q 表示 请求报文 N通知报文
+
+2、返回或通知模板
+{  "orders": {    "transactionId": "100000000020180409224736000001",    "responseTime": "20180409224736",
+"orderTypeCd":"订单类型,查询,受理",    "businessType":"N"  },  "business":[{//这个是相应的业务系统返回的结果,(受理为空,查询时不为空)    "response": {      "code": "1999",      "message": "错误消息"    },    "bId":"12345678",    "serviceCode": "querycustinfo"    //其他字段  }]}
+  注意:code 为0000 表示成功
+3、竣工模板
+{  "orders": {    "transactionId": "100000000020180409224736000001",    "requestTime": "20180409224736",
+"orderTypeCd":"订单类型,查询,受理",    "businessType":"N"  },  "business":[{//这个是相应的业务系统返回的结果,(受理为空,查询时不为空)    "response": {      "code": "1999",      "message": "错误消息"    },    "bId":"12345678",    "serviceCode": "querycustinfo"    //其他字段  }]}
+  注意:code 为0000 表示成功
+
+
+七、状态码说明
                                  状态编码
                                     描述
                                      0000

+ 17 - 0
CenterService/doc/orderStatusNotifyProtocol.json

@@ -0,0 +1,17 @@
+{
+  "orders": {
+    "transactionId": "100000000020180409224736000001",
+    "requestTime": "20180409224736",
+    "orderTypeCd":"订单类型,查询,受理",
+    "businessType":"N"
+  },
+  "business":[{//这个是相应的业务系统返回的结果,(受理为空,查询时不为空)
+    "response": {
+      "code": "1999",
+      "message": "错误消息"
+    },
+    "bId":"12345678",
+    "serviceCode": "querycustinfo"
+    //其他字段
+  }]
+}

BIN
CenterService/doc/系统调用流程图.png


+ 33 - 2
CenterService/src/main/java/com/java110/center/dao/ICenterServiceDAO.java

@@ -27,10 +27,10 @@ public interface ICenterServiceDAO {
 
     /**
      * 保存订单项信息
-     * @param business 订单项信息
+     * @param businesses 订单项信息
      * @return
      */
-    public void saveBusiness(Map business) throws DAOException;
+    public void saveBusiness(List<Map> businesses) throws DAOException;
 
     /**
      * 保存属性信息
@@ -52,4 +52,35 @@ public interface ICenterServiceDAO {
      * @throws DAOException
      */
     public void updateBusiness(Map order) throws DAOException;
+
+    /**
+     * 根据bId 修改业务项信息
+     * @param business
+     * @throws DAOException
+     */
+    public void updateBusinessByBId(Map business) throws DAOException;
+
+    /**
+     * 当所有业务动作是否都是C,将订单信息改为 C
+     * @param bId
+     * @return
+     * @throws DAOException
+     */
+    public void completeOrderByBId(String bId) throws DAOException;
+
+    /**
+     * 根据bId查询订单信息
+     * @param bId
+     * @return
+     * @throws DAOException
+     */
+    public Map getOrderInfoByBId(String bId)throws DAOException;
+
+    /**
+     * 获取同个订单中已经完成的订单项
+     * @param bId
+     * @return
+     * @throws DAOException
+     */
+    public List<Map> getCommonOrderCompledBusinessByBId(String bId) throws DAOException;
 }

+ 62 - 7
CenterService/src/main/java/com/java110/center/dao/impl/CenterServiceDAOImpl.java

@@ -57,16 +57,17 @@ public class CenterServiceDAOImpl extends BaseServiceDao implements ICenterServi
 
     /**
      * 保存订单项信息
-     * @param business 订单项信息
+     * @param businesses 订单项信息
      */
     @Override
-    public void saveBusiness(Map business) throws DAOException {
+    public void saveBusiness(List<Map> businesses) throws DAOException {
 
-        LoggerEngine.debug("----【CenterServiceDAOImpl.saveBusiness】保存数据入参 : " + JSONObject.toJSONString(business));
-
-        int saveFlag = sqlSessionTemplate.insert("centerServiceDAOImpl.saveBusiness",business);
-        if(saveFlag < 1){
-            throw new DAOException(ResponseConstant.RESULT_CODE_INNER_ERROR,"保存订单项信息失败:"+ JSONObject.toJSONString(business));
+        LoggerEngine.debug("----【CenterServiceDAOImpl.saveBusiness】保存数据入参 : " + JSONObject.toJSONString(businesses));
+        for(Map business:businesses) {
+            int saveFlag = sqlSessionTemplate.insert("centerServiceDAOImpl.saveBusiness", business);
+            if (saveFlag < 1) {
+                throw new DAOException(ResponseConstant.RESULT_CODE_INNER_ERROR, "保存订单项信息失败:" + JSONObject.toJSONString(business));
+            }
         }
     }
 
@@ -116,4 +117,58 @@ public class CenterServiceDAOImpl extends BaseServiceDao implements ICenterServi
             throw new DAOException(ResponseConstant.RESULT_CODE_INNER_ERROR,"更新订单项信息失败:"+ JSONObject.toJSONString(order));
         }
     }
+
+    /**
+     * 根据bId 修改业务项信息
+     * @param business
+     * @throws DAOException
+     */
+    public void updateBusinessByBId(Map business) throws DAOException{
+        LoggerEngine.debug("----【CenterServiceDAOImpl.updateBusinessByBId】保存数据入参 : " + JSONObject.toJSONString(business));
+
+        int saveFlag = sqlSessionTemplate.update("centerServiceDAOImpl.updateBusinessByBId",business);
+        if(saveFlag < 1){
+            throw new DAOException(ResponseConstant.RESULT_CODE_INNER_ERROR,"更新订单项信息失败:"+ JSONObject.toJSONString(business));
+        }
+    }
+
+    /**
+     * 当所有业务动作是否都是C,将订单信息改为 C
+     * @param bId
+     * @return
+     * @throws DAOException
+     */
+    public void completeOrderByBId(String bId) throws DAOException{
+        LoggerEngine.debug("----【CenterServiceDAOImpl.completeOrderByBId】数据入参 : " + bId);
+
+        int updateFlag = sqlSessionTemplate.update("centerServiceDAOImpl.completeOrderByBId",bId);
+
+        if(updateFlag < 1){
+            throw new DAOException(ResponseConstant.RESULT_CODE_INNER_ERROR,"当前业务还没有全完成(C):"+ bId);
+        }
+    }
+
+    /**
+     * 根据bId查询订单信息
+     * @param bId
+     * @return
+     * @throws DAOException
+     */
+    public Map getOrderInfoByBId(String bId)throws DAOException{
+        List<Map> orders = sqlSessionTemplate.selectList("centerServiceDAOImpl.getOrderInfoByBId",bId);
+        if(orders !=null){
+            return orders.get(0);
+        }
+        return null;
+    }
+    /**
+     * 获取同个订单中已经完成的订单项
+     * @param bId
+     * @return
+     * @throws DAOException
+     */
+    public List<Map> getCommonOrderCompledBusinessByBId(String bId) throws DAOException{
+        LoggerEngine.debug("----【CenterServiceDAOImpl.getCommonOrderCompledBusinessByBId】数据入参 : " + bId);
+        return sqlSessionTemplate.selectList("centerServiceDAOImpl.getCommonOrderCompledBusinessByBId",bId);
+    }
 }

+ 33 - 0
CenterService/src/main/java/com/java110/center/kaka/CenserServiceKafkaListener.java

@@ -0,0 +1,33 @@
+package com.java110.center.kaka;
+
+import com.java110.center.smo.ICenterServiceSMO;
+import com.java110.core.base.AppBase;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.kafka.annotation.KafkaListener;
+
+/**
+ * kafka侦听
+ * Created by wuxw on 2018/4/15.
+ */
+public class CenserServiceKafkaListener extends AppBase {
+
+    @Autowired
+    private ICenterServiceSMO centerServiceSMOImpl;
+
+    @KafkaListener(topics = {"notifyMessageTopic"})
+    public void listen(ConsumerRecord<?, ?> record) {
+        logger.info("kafka的key: " + record.key());
+        logger.info("kafka的value: " + record.value().toString());
+        centerServiceSMOImpl.receiveBusinessSystemNotifyMessage(record.value().toString());
+    }
+
+
+    public ICenterServiceSMO getCenterServiceSMOImpl() {
+        return centerServiceSMOImpl;
+    }
+
+    public void setCenterServiceSMOImpl(ICenterServiceSMO centerServiceSMOImpl) {
+        this.centerServiceSMOImpl = centerServiceSMOImpl;
+    }
+}

+ 16 - 0
CenterService/src/main/java/com/java110/center/kaka/CenterServiceBean.java

@@ -0,0 +1,16 @@
+package com.java110.center.kaka;
+
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * Created by wuxw on 2018/4/15.
+ */
+@Configuration
+public class CenterServiceBean {
+    @Bean
+    public CenserServiceKafkaListener listener() {
+        return new CenserServiceKafkaListener();
+    }
+
+}

+ 7 - 0
CenterService/src/main/java/com/java110/center/smo/ICenterServiceSMO.java

@@ -17,4 +17,11 @@ public interface ICenterServiceSMO {
      * @return
      */
     public String service(String reqJson, Map<String,String> headers) throws SMOException;
+
+    /**
+     * 接受业务系统通知消息
+     * @param receiveJson 接受报文
+     * @throws SMOException
+     */
+    public void receiveBusinessSystemNotifyMessage(String receiveJson) throws SMOException;
 }

+ 253 - 29
CenterService/src/main/java/com/java110/center/smo/impl/CenterServiceSMOImpl.java

@@ -1,29 +1,24 @@
 package com.java110.center.smo.impl;
 
 import com.java110.center.dao.ICenterServiceDAO;
-import com.java110.common.cache.AppRouteCache;
 import com.java110.center.smo.ICenterServiceSMO;
+import com.java110.common.cache.AppRouteCache;
 import com.java110.common.cache.MappingCache;
 import com.java110.common.constant.MappingConstant;
 import com.java110.common.constant.ResponseConstant;
 import com.java110.common.exception.*;
 import com.java110.common.factory.DataFlowFactory;
+import com.java110.common.kafka.KafkaFactory;
 import com.java110.common.log.LoggerEngine;
 import com.java110.common.util.DateUtil;
 import com.java110.common.util.ResponseTemplateUtil;
 import com.java110.common.util.StringUtil;
-import com.java110.entity.center.AppRoute;
-import com.java110.entity.center.AppService;
-import com.java110.entity.center.Business;
-import com.java110.entity.center.DataFlow;
-import com.java110.entity.rule.RuleEntrance;
+import com.java110.entity.center.*;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Transactional;
 
-import java.util.Date;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 
 /**
  * 中心服务处理类
@@ -34,7 +29,7 @@ import java.util.Map;
 public class CenterServiceSMOImpl implements ICenterServiceSMO {
 
     @Autowired
-    ICenterServiceDAO orderServiceDaoImpl;
+    ICenterServiceDAO centerServiceDaoImpl;
 
     @Override
     public String service(String reqJson, Map<String, String> headers) throws SMOException{
@@ -60,11 +55,14 @@ public class CenterServiceSMOImpl implements ICenterServiceSMO {
                 //7.0 作废订单和业务项
                 invalidOrderAndBusiness(dataFlow);
                 //8.0 广播作废业务系统订单信息
-                invalidBusinessSystem(dataFlow);
+                //想法:这里可以直接不广播,只有在业务返回时才广播,
+                // 疑问:在这里部分消息发出去了,如果在receiveBusinessSystemNotifyMessage这个方法中依然没有收到,我们认为是下游系统也是失败了不用处理,
+                //目前看逻辑也是对的
+                //invalidBusinessSystem(dataFlow);
             } catch (Exception e1) {
                 LoggerEngine.error("作废订单失败", e);
-                //9.0 记录作废失败的单子,人工处理。
-                saveInvalidBusinessError(dataFlow);
+                //9.0 将订单状态改为失败,人工处理。
+                updateOrderAndBusinessError(dataFlow);
             } finally {
                 return ResponseTemplateUtil.createOrderResponseJson(dataFlow.getTransactionId(),
                         ResponseConstant.NO_NEED_SIGN, e.getResult().getCode(), e.getMessage());
@@ -116,7 +114,7 @@ public class CenterServiceSMOImpl implements ICenterServiceSMO {
         if (appRoute == null) {
             //添加耗时
             DataFlowFactory.addCostTime(dataFlow, "initConfigData", "加载配置耗时", startDate);
-            throw new RuntimeException("当前没有获取到AppId对应的信息");
+            throw new InitConfigDataException(ResponseConstant.RESULT_CODE_INNER_ERROR,"当前没有获取到AppId对应的信息");
         }
         dataFlow.setAppRoute(appRoute);
         //添加耗时
@@ -242,11 +240,18 @@ public class CenterServiceSMOImpl implements ICenterServiceSMO {
             return ;
         }
 
+
         //1.0 保存 orders信息
+        centerServiceDaoImpl.saveOrder(DataFlowFactory.getOrder(dataFlow));
+
 
+        centerServiceDaoImpl.saveOrderAttrs(DataFlowFactory.getOrderAttrs(dataFlow));
 
         //2.0 保存 business信息
 
+        centerServiceDaoImpl.saveBusiness(DataFlowFactory.getBusiness(dataFlow));
+
+        centerServiceDaoImpl.saveBusinessAttrs(DataFlowFactory.getBusinessAttrs(dataFlow));
 
         DataFlowFactory.addCostTime(dataFlow, "saveOrdersAndBusiness", "保存订单和业务项耗时", startDate);
     }
@@ -266,9 +271,20 @@ public class CenterServiceSMOImpl implements ICenterServiceSMO {
             return ;
         }
 
+        //6.1 先处理同步方式的服务,每一同步后发布事件广播
+
+        doSynchronousBusinesses(dataFlow);
+
+
+        //6.2 处理异步服务
+        doAsynchronousBusinesses(dataFlow);
+
+
         DataFlowFactory.addCostTime(dataFlow, "invokeBusinessSystem", "调用下游系统耗时", startDate);
     }
 
+
+
     /**
      * 7.0 作废订单和业务项
      *
@@ -283,48 +299,256 @@ public class CenterServiceSMOImpl implements ICenterServiceSMO {
             return ;
         }
 
+        //作废 订单
+        centerServiceDaoImpl.updateOrder(DataFlowFactory.getNeedInvalidOrder(dataFlow));
+
+        //作废订单项
+        centerServiceDaoImpl.updateBusiness(DataFlowFactory.getNeedInvalidOrder(dataFlow));
+
+
         DataFlowFactory.addCostTime(dataFlow, "invalidOrderAndBusiness", "作废订单和业务项耗时", startDate);
     }
 
+
     /**
-     * 8.0 广播作废业务系统订单信息
+     * 8.0 广播作废已经完成业务系统订单信息
      *
      * @param dataFlow
      */
-    private void invalidBusinessSystem(DataFlow dataFlow) {
+    private void invalidCompletedBusinessSystem(DataFlow dataFlow) throws Exception{
+        // 根据 c_business 表中的字段business_type_cd 找到对应的消息队列名称
+        List<Map> completedBusinesses = centerServiceDaoImpl.getCommonOrderCompledBusinessByBId(dataFlow.getBusinesses().get(0).getbId());
+        for(AppServiceStatus serviceStatus :dataFlow.getAppRoute().getAppServices()){
+            for(Map completedBusiness : completedBusinesses){
+                if(completedBusiness.get("business_type_cd").equals(serviceStatus.getAppService().getBusinessTypeCd())){
+                    KafkaFactory.sendKafkaMessage(serviceStatus.getAppService().getMessageQueueName(),"",
+                            DataFlowFactory.getCompletedBusinessErrorJson(completedBusiness,serviceStatus.getAppService()));
+                }
+            }
+        }
+
+    }
+
+    /**
+     * 9.0 将订单状态改为失败,人工处理。
+     *
+     * @param dataFlow
+     */
+    private void updateOrderAndBusinessError(DataFlow dataFlow) {
+
         Date startDate = DateUtil.getCurrentDate();
-        if(MappingCache.getValue(MappingConstant.KEY_NO_INVALID_BUSINESS_SYSTEM) != null
-                &&MappingCache.getValue(MappingConstant.KEY_NO_INVALID_BUSINESS_SYSTEM).contains(dataFlow.getOrderTypeCd())){
-            //不用调用 下游系统的配置(一般不存在这种情况,这里主要是在没有下游系统的情况下测试中心服务用)
-            DataFlowFactory.addCostTime(dataFlow, "invalidBusinessSystem", "作废业务耗时", startDate);
-            return ;
+
+        //作废 订单
+        centerServiceDaoImpl.updateOrder(DataFlowFactory.getNeedErrorOrder(dataFlow));
+
+        //作废订单项
+        centerServiceDaoImpl.updateBusiness(DataFlowFactory.getNeedErrorOrder(dataFlow));
+
+
+        DataFlowFactory.addCostTime(dataFlow, "updateOrderAndBusinessError", "订单状态改为失败耗时", startDate);
+
+    }
+
+
+    /**
+     * 接受业务系统通知消息
+     * @param receiveJson 接受报文
+     * @throws SMOException
+     */
+    @Override
+    public void receiveBusinessSystemNotifyMessage(String receiveJson) throws SMOException{
+        DataFlow dataFlow = null;
+        try {
+            //1.0 创建数据流
+            dataFlow = DataFlowFactory.newInstance().builder(receiveJson, null);
+            //如果订单都没有保存,则再不要处理
+            if(MappingCache.getValue(MappingConstant.KEY_NO_SAVE_ORDER) != null
+                    &&MappingCache.getValue(MappingConstant.KEY_NO_SAVE_ORDER).contains(dataFlow.getOrderTypeCd())){
+                //不保存订单信息
+                return ;
+            }
+
+            //2.0加载数据,没有找到appId 及配置信息 则抛出InitConfigDataException
+            reloadOrderInfoAndConfigData(dataFlow);
+
+            //3.0 判断是否成功,失败会抛出BusinessStatusException异常
+            judgeBusinessStatus(dataFlow);
+
+            //4.0 修改业务为成功,如果发现业务项已经是作废或失败状态(D或E)则抛出BusinessException异常
+            completeBusiness(dataFlow);
+            //5.0当所有业务动作是否都是C,将订单信息改为 C 并且发布竣工消息,这里在广播之前确认
+            completeOrderAndNotifyBusinessSystem(dataFlow);
+
+        }catch (BusinessStatusException e){
+            try {
+                //6.0 作废订单和所有业务项
+                invalidOrderAndBusiness(dataFlow);
+                //7.0 广播作废业务系统订单信息
+                //想法,这里只广播已经完成的订单项
+                invalidCompletedBusinessSystem(dataFlow);
+            } catch (Exception e1) {
+                LoggerEngine.error("作废订单失败", e1);
+                //8.0 将订单状态改为失败,人工处理。
+                updateOrderAndBusinessError(dataFlow);
+            }
+        }catch (BusinessException e) {
+            //9.0说明这个订单已经失败了,再不需要
+            //想法,这里广播当前失败业务
+            try {
+                notifyBusinessSystemErrorMessage(dataFlow);
+            }catch (Exception e1){
+                //这里记录日志
+            }
+        }catch (InitConfigDataException e){ //这种一般不会出现,除非人工改了数据
+            LoggerEngine.error("加载配置数据出错", e);
+            try {
+                //6.0 作废订单和所有业务项
+                invalidOrderAndBusiness(dataFlow);
+                //7.0 广播作废业务系统订单信息
+                //想法,这里只广播已经完成的订单项
+                invalidCompletedBusinessSystem(dataFlow);
+            } catch (Exception e1) {
+                LoggerEngine.error("作废订单失败", e1);
+                //8.0 将订单状态改为失败,人工处理。
+                updateOrderAndBusinessError(dataFlow);
+            }
+
+        }catch (Exception e){
+            LoggerEngine.error("作废订单失败", e);
+            //10.0 成功的情况下通知下游系统失败将状态改为NE,人工处理。
+            updateBusinessNotifyError(dataFlow);
         }
+    }
+
 
+    /**
+     * 2.0重新加载订单信息到dataFlow 中
+     *
+     * @param dataFlow
+     */
+    private void reloadOrderInfoAndConfigData(DataFlow dataFlow) {
 
-        DataFlowFactory.addCostTime(dataFlow, "invalidBusinessSystem", "作废业务耗时", startDate);
+        Map order = centerServiceDaoImpl.getOrderInfoByBId(dataFlow.getBusinesses().get(0).getbId());
+        dataFlow.setoId(order.get("o_id").toString());
+        //重新加载配置
+        initConfigData(dataFlow);
     }
 
     /**
-     * 9.0 记录作废失败的单子,人工处理。
+     * 9.0 成功的情况下通知下游系统失败将状态改为NE,人工处理。
      *
      * @param dataFlow
      */
-    private void saveInvalidBusinessError(DataFlow dataFlow) {
+    private void updateBusinessNotifyError(DataFlow dataFlow) {
 
         Date startDate = DateUtil.getCurrentDate();
+            //完成订单项
+        centerServiceDaoImpl.updateBusinessByBId(DataFlowFactory.getNeedNotifyErrorBusiness(dataFlow));
+
+        DataFlowFactory.addCostTime(dataFlow, "updateBusinessNotifyError", "订单状态改为失败耗时", startDate);
+
+    }
+
+    /**
+     * 判断是否都成功了
+     * @param dataFlow
+     */
+    private void judgeBusinessStatus(DataFlow dataFlow) throws BusinessStatusException{
+
+        List<Business> businesses = dataFlow.getBusinesses();
+
+        for(Business business: businesses){
+            if(!ResponseConstant.RESULT_CODE_SUCCESS.equals(business.getCode())){
+                throw new BusinessStatusException(business.getCode(),"业务bId= "+business.getbId() + " 处理失败,需要作废订单");
+            }
+        }
+
+    }
+
+    /**
+     * 3.0 修改业务为成功,如果发现业务项已经是作废或失败状态(D或E)则抛出BusinessException异常
+     *
+     * @param dataFlow
+     */
+    private void completeBusiness(DataFlow dataFlow) throws BusinessException{
+        try {
+            //完成订单项
+            centerServiceDaoImpl.updateBusinessByBId(DataFlowFactory.getNeedCompleteBusiness(dataFlow));
+
+        }catch (DAOException e){
+            throw new BusinessException(e.getResult(),e);
+        }
+    }
 
+    /**
+     * //4.0当所有业务动作是否都是C,将订单信息改为 C 并且发布竣工消息,这里在广播之前确认
+     * @param dataFlow
+     */
+    private void completeOrderAndNotifyBusinessSystem(DataFlow dataFlow) throws Exception{
+        try {
+            centerServiceDaoImpl.completeOrderByBId(DataFlowFactory.getMoreBId(dataFlow));
+            //通知成功消息
+            notifyBusinessSystemSuccessMessage(dataFlow);
+        }catch (DAOException e){
+            //这里什么都不做,说明订单没有完成
+        }
+    }
 
+    /**
+     * 通知 订单已经完成,后端需要完成数据
+     * @param dataFlow
+     */
+    private void notifyBusinessSystemSuccessMessage(DataFlow dataFlow) throws Exception{
 
-        DataFlowFactory.addCostTime(dataFlow, "saveInvalidBusinessError", "保存作废业务失败耗时", startDate);
+        //拼装报文通知业务系统
+        KafkaFactory.sendKafkaMessage(
+                DataFlowFactory.getService(dataFlow,dataFlow.getBusinesses().get(0).getServiceCode()).getMessageQueueName(),"",DataFlowFactory.getNotifyBusinessSuccessJson(dataFlow).toJSONString());
 
     }
 
+    /**
+     * 8.0 广播作废业务系统订单信息
+     *
+     * @param dataFlow
+     */
+    private void notifyBusinessSystemErrorMessage(DataFlow dataFlow) throws Exception{
+
+        //拼装报文通知业务系统
+        KafkaFactory.sendKafkaMessage(
+                DataFlowFactory.getService(dataFlow,dataFlow.getBusinesses().get(0).getServiceCode()).getMessageQueueName(),"",DataFlowFactory.getNotifyBusinessErrorJson(dataFlow).toJSONString());
+    }
+
+    /**
+     * 处理同步业务
+     * @param dataFlow
+     */
+    private void doSynchronousBusinesses(DataFlow dataFlow) {
+        List<Business> synchronousBusinesses = DataFlowFactory.getSynchronousBusinesses(dataFlow);
+        //6.2处理同步服务
+    }
+
+    /**
+     * 处理异步业务
+     * @param
+     */
+    private void doAsynchronousBusinesses(DataFlow dataFlow) throws BusinessException{
+        //6.3 处理异步,按消息队里处理
+        List<Business> asynchronousBusinesses = DataFlowFactory.getAsynchronousBusinesses(dataFlow);
+        try {
+            for (Business business : asynchronousBusinesses) {
+                KafkaFactory.sendKafkaMessage(DataFlowFactory.getService(dataFlow, business.getServiceCode()).getMessageQueueName(), "",
+                        DataFlowFactory.getRequestBusinessJson(business).toJSONString());
+            }
+        }catch (Exception e){
+            throw new BusinessException(ResponseConstant.RESULT_CODE_INNER_ERROR,e.getMessage());
+        }
+    }
 
-    public ICenterServiceDAO getOrderServiceDaoImpl() {
-        return orderServiceDaoImpl;
+    public ICenterServiceDAO getCenterServiceDaoImpl() {
+        return centerServiceDaoImpl;
     }
 
-    public void setOrderServiceDaoImpl(ICenterServiceDAO orderServiceDaoImpl) {
-        this.orderServiceDaoImpl = orderServiceDaoImpl;
+    public void setCenterServiceDaoImpl(ICenterServiceDAO centerServiceDaoImpl) {
+        this.centerServiceDaoImpl = centerServiceDaoImpl;
     }
 }

+ 29 - 0
CenterService/src/main/resources/application.yml

@@ -17,3 +17,32 @@ spring:
   application:
     name: center-service
 
+#============== kafka ===================
+kafka:
+  consumer:
+    zookeeper:
+      connect: 10.93.21.21:2181
+    services: 10.93.21.21:9092
+    enable:
+      auto:
+        commit: true
+    session:
+      timeout: 6000
+    auto:
+      commit:
+        interval: 100
+      offset:
+        reset: latest
+    topic: test
+    group:
+      id: test
+    concurrency: 10
+  producer:
+    servers: 10.93.21.21:9092
+    retries: 0
+    batch:
+      size: 4096
+    linger: 1
+    buffer:
+      memory: 40960
+

+ 8 - 1
CodingLog.txt

@@ -6,4 +6,11 @@
 
 --------------------2018年04月14日-----------------------
 1、httpApi service 请求常规校验
-2、加入Redis 缓存系统
+2、加入Redis 缓存系统
+--------------------2018年04月15日-----------------------
+1、加入kafka
+2、调整依赖关系
+--------------------2018年04月16日-----------------------
+1、实现接受通知消息流程 CenterServiceSMOImpl receiveBusinessSystemNotifyMessage方法
+2、实现失败和成功模式下通知业务系统,竣工数据或作废数据
+3、实现异步同步数据到业务系统处理(同步方式,需要做事件监听等业务未完成,做事件监听主要为了对报文重构)

+ 13 - 0
java110-bean/src/main/java/com/java110/entity/center/AppService.java

@@ -20,6 +20,9 @@ public class AppService implements Serializable{
 
     private int seq;
 
+    //消息队里名称 只有异步时有用
+    private String messageQueueName;
+
     private String url;
 
     //只有webservice时才有用
@@ -128,4 +131,14 @@ public class AppService implements Serializable{
     public void setStatusCd(String statusCd) {
         this.statusCd = statusCd;
     }
+
+    public String getMessageQueueName() {
+        return messageQueueName;
+    }
+
+    public void setMessageQueueName(String messageQueueName) {
+        this.messageQueueName = messageQueueName;
+    }
+
+
 }

+ 25 - 1
java110-bean/src/main/java/com/java110/entity/center/Business.java

@@ -7,7 +7,7 @@ import com.alibaba.fastjson.JSONObject;
  * 业务数据
  * Created by wuxw on 2018/4/13.
  */
-public class Business {
+public class Business implements Comparable{
 
     private String bId;
 
@@ -26,6 +26,8 @@ public class Business {
 
     private String message;
 
+    private int seq;
+
 
     public String getbId() {
         return bId;
@@ -91,6 +93,14 @@ public class Business {
         this.message = message;
     }
 
+    public int getSeq() {
+        return seq;
+    }
+
+    public void setSeq(int seq) {
+        this.seq = seq;
+    }
+
     /**
      * 构建成对象
      * @return
@@ -99,14 +109,28 @@ public class Business {
     public Business builder(JSONObject businessObj) throws Exception{
 
         try{
+            this.setbId(businessObj.getString("bId"));
             this.setServiceCode(businessObj.getString("serviceCode"));
             this.setServiceName(businessObj.getString("serviceName"));
             this.setRemark(businessObj.getString("remark"));
             this.setDatas(businessObj.getJSONArray("datas"));
             this.setAttrs(businessObj.getJSONArray("attrs"));
+            if(businessObj.containsKey("response")){
+                this.setCode(businessObj.getJSONObject("response").getString("code"));
+                this.setMessage(businessObj.getJSONObject("response").getString("message"));
+            }
         }catch (Exception e){
             throw e;
         }
         return this;
     }
+
+    @Override
+    public int compareTo(Object o) {
+        Business otherBusiness = (Business)o;
+        if(this.getSeq() > otherBusiness.getSeq()) {
+            return -1;
+        }
+        return 0;
+    }
 }

+ 5 - 0
java110-common/pom.xml

@@ -110,6 +110,11 @@
             <artifactId>jedis</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>org.springframework.kafka</groupId>
+            <artifactId>spring-kafka</artifactId>
+        </dependency>
+
         <dependency>
             <groupId>commons-net</groupId>
             <artifactId>commons-net</artifactId>

+ 3 - 0
java110-common/src/main/java/com/java110/common/constant/CommonConstant.java

@@ -23,4 +23,7 @@ public class CommonConstant {
      */
     public final static String PROCESS_ORDER_ASYNCHRONOUS = "A";
 
+    public final static String ORDER_INVOKE_METHOD_SYNCHRONOUS = "S"; //同步
+    public final static String ORDER_INVOKE_METHOD_ASYNCHRONOUS = "A"; //同步
+
 }

+ 3 - 0
java110-common/src/main/java/com/java110/common/constant/MappingConstant.java

@@ -26,4 +26,7 @@ public class MappingConstant {
 
     // 不用调用 作废下游系统的配置(一般不存在这种情况,这里主要是在没有下游系统的情况下测试中心服务用)
     public  final static String KEY_NO_INVALID_BUSINESS_SYSTEM = "NO_INVALID_BUSINESS_SYSTEM";//
+
+    //需要调用服务生成各个ID
+    public final static String KEY_NEED_INVOKE_GENERATE_ID = "NEED_INVOKE_SERVICE_GENERATE_ID";
 }

+ 9 - 0
java110-common/src/main/java/com/java110/common/constant/StatusConstant.java

@@ -13,6 +13,12 @@ public class StatusConstant {
     public final static String STATUS_CD_SAVE = "S";//保存成功
 
 
+    public final static String STATUS_CD_DELETE = "D";//作废订单
+    public final static String STATUS_CD_ERROR = "E";//错误订单
+    public final static String STATUS_CD_NOTIFY_ERROR = "NE";//通知错误订单
+    public final static String STATUS_CD_COMPLETE = "C";//错误订单
+
+
 
     /**
      * 有效的,在用的
@@ -23,4 +29,7 @@ public class StatusConstant {
      * 无效的,不在用的
      */
     public final static String STATUS_CD_INVALID = "1";
+
+    public final static String REQUEST_BUSINESS_TYPE = "Q";
+    public final static String NOTIFY_BUSINESS_TYPE = "N";
 }

+ 189 - 0
java110-common/src/main/java/com/java110/common/exception/BusinessStatusException.java

@@ -0,0 +1,189 @@
+package com.java110.common.exception;
+
+
+import com.alibaba.fastjson.JSONObject;
+
+import java.io.PrintStream;
+import java.io.PrintWriter;
+
+/**
+ * 无权限异常
+ * Created by wuxw on 2018/4/14.
+ */
+public class BusinessStatusException extends RuntimeException {
+
+
+    private Result result;
+    private Throwable cause = this;
+
+    public BusinessStatusException(){}
+
+    /**
+     * 构造方法
+     * @param result 返回值
+     * @param cause  异常堆栈
+     */
+    public BusinessStatusException(Result result, Throwable cause) {
+        super(result.getMsg(), cause);
+        this.result = result;
+    }
+
+    /**
+     * 构造方法
+     * @param code 返回码
+     * @param msg  错误消息
+     */
+    public BusinessStatusException(int code, String msg) {
+        super(msg);
+        this.result = new Result(code, msg);
+    }
+
+    public BusinessStatusException(String code, String msg) {
+        super(msg);
+        this.result = new Result(code, msg);
+    }
+
+    /**
+     * 构造方法
+     * @param result 返回值
+     * @param detail 具体的返回消息
+     */
+    public BusinessStatusException(Result result, String detail) {
+        super(result.getMsg() + "," + detail);
+        this.result = new Result(result.getCode(), result.getMsg() + "," + detail);
+    }
+
+    /**
+     * 构造方法
+     * @param result 返回值
+     * @param detail 具体的返回消息
+     * @param cause  异常堆栈
+     */
+    public BusinessStatusException(Result result, String detail, Throwable cause) {
+        super(result.getMsg() + "," + detail, cause);
+        this.result = new Result(result.getCode(), result.getMsg() + "," + detail);
+    }
+
+    /**
+     * 构造方法
+     * @param code	返回码
+     * @param msg	返回消息
+     * @param cause 异常堆栈
+     */
+    public BusinessStatusException(int code, String msg, Throwable cause) {
+        super(msg, cause);
+
+        if(cause != null) {
+            if(cause.getCause() != null) {
+                msg += " cause:" + ExceptionUtils.populateExecption(cause.getCause(), 500);
+            }
+            msg += " StackTrace:"+ExceptionUtils.populateExecption(cause, 500);
+        }
+        this.result = new Result(code, msg);
+    }
+
+    /**
+     * 构造方法
+     * @param code	返回码
+     * @param cause	异常堆栈
+     */
+    public BusinessStatusException(int code, Throwable cause) {
+        super(cause);
+        String msg = "";
+
+        if(cause != null) {
+            if(cause.getCause() != null) {
+                msg += " cause:" + ExceptionUtils.populateExecption(cause.getCause(), 500);
+            }
+            msg += " StackTrace:"+ExceptionUtils.populateExecption(cause, 500);
+        }
+        this.result = new Result(code, msg);
+    }
+
+    /**
+     *
+     * TODO 简单描述该方法的实现功能(可选).
+     * @see Throwable#getCause()
+     */
+    public synchronized Throwable getCause() {
+        return (cause==this ? super.getCause() : cause);
+    }
+
+
+    /**
+     * 返回异常消息
+     * @return 异常消息
+     */
+    @Override
+    public String getMessage() {
+        return ExceptionUtils.buildMessage(super.getMessage(), getCause());
+    }
+
+    /**
+     * 异常
+     * @return
+     */
+    public String toJsonString() {
+        JSONObject exceptionJson = JSONObject.parseObject("{\"exception\":{}");
+        JSONObject exceptionJsonObj = exceptionJson.getJSONObject("exception");
+
+        if (getResult() != null)
+            exceptionJsonObj.putAll(JSONObject.parseObject(result.toString()));
+
+        exceptionJsonObj.put("exceptionTrace",getMessage());
+
+        return exceptionJsonObj.toString();
+    }
+    @Override
+    public void printStackTrace(PrintStream ps) {
+        ps.print("<exception>");
+        if (getResult() != null) {
+            ps.print(result.toString());
+        }
+        ps.append("<exceptionTrace>");
+
+        Throwable cause = getCause();
+        if (cause == null) {
+            super.printStackTrace(ps);
+        } else {
+            ps.println(this);
+            ps.print("Caused by: ");
+            cause.printStackTrace(ps);
+        }
+        ps.append("</exceptionTrace>");
+        ps.println("</exception>");
+    }
+
+    @Override
+    public void printStackTrace(PrintWriter pw) {
+        pw.print("<exception>");
+        if (getResult() != null) {
+            pw.print(result.toString());
+        }
+        pw.append("<exceptionTrace>");
+
+        Throwable cause = getCause();
+        if (cause == null) {
+            super.printStackTrace(pw);
+        } else {
+            pw.println(this);
+            pw.print("Caused by: ");
+            cause.printStackTrace(pw);
+        }
+        pw.append("</exceptionTrace>");
+        pw.println("</exception>");
+    }
+
+    /**
+     * 返回异常值
+     * @return	异常值对象
+     */
+    public Result getResult() {
+        return result;
+    }
+
+    public void setResult(Result result) {
+        this.result = result;
+    }
+
+}

+ 189 - 0
java110-common/src/main/java/com/java110/common/exception/InitConfigDataException.java

@@ -0,0 +1,189 @@
+package com.java110.common.exception;
+
+
+import com.alibaba.fastjson.JSONObject;
+
+import java.io.PrintStream;
+import java.io.PrintWriter;
+
+/**
+ * 无权限异常
+ * Created by wuxw on 2018/4/14.
+ */
+public class InitConfigDataException extends RuntimeException {
+
+
+    private Result result;
+    private Throwable cause = this;
+
+    public InitConfigDataException(){}
+
+    /**
+     * 构造方法
+     * @param result 返回值
+     * @param cause  异常堆栈
+     */
+    public InitConfigDataException(Result result, Throwable cause) {
+        super(result.getMsg(), cause);
+        this.result = result;
+    }
+
+    /**
+     * 构造方法
+     * @param code 返回码
+     * @param msg  错误消息
+     */
+    public InitConfigDataException(int code, String msg) {
+        super(msg);
+        this.result = new Result(code, msg);
+    }
+
+    public InitConfigDataException(String code, String msg) {
+        super(msg);
+        this.result = new Result(code, msg);
+    }
+
+    /**
+     * 构造方法
+     * @param result 返回值
+     * @param detail 具体的返回消息
+     */
+    public InitConfigDataException(Result result, String detail) {
+        super(result.getMsg() + "," + detail);
+        this.result = new Result(result.getCode(), result.getMsg() + "," + detail);
+    }
+
+    /**
+     * 构造方法
+     * @param result 返回值
+     * @param detail 具体的返回消息
+     * @param cause  异常堆栈
+     */
+    public InitConfigDataException(Result result, String detail, Throwable cause) {
+        super(result.getMsg() + "," + detail, cause);
+        this.result = new Result(result.getCode(), result.getMsg() + "," + detail);
+    }
+
+    /**
+     * 构造方法
+     * @param code	返回码
+     * @param msg	返回消息
+     * @param cause 异常堆栈
+     */
+    public InitConfigDataException(int code, String msg, Throwable cause) {
+        super(msg, cause);
+
+        if(cause != null) {
+            if(cause.getCause() != null) {
+                msg += " cause:" + ExceptionUtils.populateExecption(cause.getCause(), 500);
+            }
+            msg += " StackTrace:"+ExceptionUtils.populateExecption(cause, 500);
+        }
+        this.result = new Result(code, msg);
+    }
+
+    /**
+     * 构造方法
+     * @param code	返回码
+     * @param cause	异常堆栈
+     */
+    public InitConfigDataException(int code, Throwable cause) {
+        super(cause);
+        String msg = "";
+
+        if(cause != null) {
+            if(cause.getCause() != null) {
+                msg += " cause:" + ExceptionUtils.populateExecption(cause.getCause(), 500);
+            }
+            msg += " StackTrace:"+ExceptionUtils.populateExecption(cause, 500);
+        }
+        this.result = new Result(code, msg);
+    }
+
+    /**
+     *
+     * TODO 简单描述该方法的实现功能(可选).
+     * @see Throwable#getCause()
+     */
+    public synchronized Throwable getCause() {
+        return (cause==this ? super.getCause() : cause);
+    }
+
+
+    /**
+     * 返回异常消息
+     * @return 异常消息
+     */
+    @Override
+    public String getMessage() {
+        return ExceptionUtils.buildMessage(super.getMessage(), getCause());
+    }
+
+    /**
+     * 异常
+     * @return
+     */
+    public String toJsonString() {
+        JSONObject exceptionJson = JSONObject.parseObject("{\"exception\":{}");
+        JSONObject exceptionJsonObj = exceptionJson.getJSONObject("exception");
+
+        if (getResult() != null)
+            exceptionJsonObj.putAll(JSONObject.parseObject(result.toString()));
+
+        exceptionJsonObj.put("exceptionTrace",getMessage());
+
+        return exceptionJsonObj.toString();
+    }
+    @Override
+    public void printStackTrace(PrintStream ps) {
+        ps.print("<exception>");
+        if (getResult() != null) {
+            ps.print(result.toString());
+        }
+        ps.append("<exceptionTrace>");
+
+        Throwable cause = getCause();
+        if (cause == null) {
+            super.printStackTrace(ps);
+        } else {
+            ps.println(this);
+            ps.print("Caused by: ");
+            cause.printStackTrace(ps);
+        }
+        ps.append("</exceptionTrace>");
+        ps.println("</exception>");
+    }
+
+    @Override
+    public void printStackTrace(PrintWriter pw) {
+        pw.print("<exception>");
+        if (getResult() != null) {
+            pw.print(result.toString());
+        }
+        pw.append("<exceptionTrace>");
+
+        Throwable cause = getCause();
+        if (cause == null) {
+            super.printStackTrace(pw);
+        } else {
+            pw.println(this);
+            pw.print("Caused by: ");
+            cause.printStackTrace(pw);
+        }
+        pw.append("</exceptionTrace>");
+        pw.println("</exception>");
+    }
+
+    /**
+     * 返回异常值
+     * @return	异常值对象
+     */
+    public Result getResult() {
+        return result;
+    }
+
+    public void setResult(Result result) {
+        this.result = result;
+    }
+
+}

+ 321 - 6
java110-common/src/main/java/com/java110/common/factory/DataFlowFactory.java

@@ -1,17 +1,18 @@
 package com.java110.common.factory;
 
+import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.JSONObject;
 import com.java110.common.cache.MappingCache;
+import com.java110.common.constant.CommonConstant;
 import com.java110.common.constant.MappingConstant;
 import com.java110.common.constant.ResponseConstant;
 import com.java110.common.constant.StatusConstant;
 import com.java110.common.util.DateUtil;
-import com.java110.entity.center.AppService;
-import com.java110.entity.center.AppServiceStatus;
-import com.java110.entity.center.DataFlow;
-import com.java110.entity.center.DataFlowLinksCost;
+import com.java110.common.util.SequenceUtil;
+import com.java110.entity.center.*;
 
-import java.util.Date;
-import java.util.List;
+import javax.xml.crypto.Data;
+import java.util.*;
 
 /**
  * 数据流工厂类
@@ -78,6 +79,320 @@ public class DataFlowFactory {
         return null;
     }
 
+    /**
+     * 获取Order信息
+     * @param dataFlow
+     * @return
+     */
+    public static Map getOrder(DataFlow dataFlow){
+        Map order = new HashMap();
+        dataFlow.setoId(SequenceUtil.getOId());
+        order.put("oId",dataFlow.getoId());
+        order.put("appId",dataFlow.getAppId());
+        order.put("extTransactionId",dataFlow.getTransactionId());
+        order.put("userId",dataFlow.getUserId());
+        order.put("requestTime",dataFlow.getRequestTime());
+        order.put("orderTypeCd",dataFlow.getOrderTypeCd());
+        order.put("remark",dataFlow.getRemark());
+        order.put("statusCd",StatusConstant.STATUS_CD_SAVE);
+        return order ;
+    }
+
+
+    /**
+     * 获取订单属性
+     * @param dataFlow
+     * @return
+     */
+    public static List<Map> getOrderAttrs(DataFlow dataFlow){
+        List<Map> orderAttrs = new ArrayList<Map>();
+        JSONObject reqOrders = dataFlow.getReqOrders();
+        if(!reqOrders.containsKey("attrs") && reqOrders.getJSONArray("attrs").size() ==0){
+            return orderAttrs;
+        }
+        JSONArray attrs = reqOrders.getJSONArray("attrs");
+        Map attrMap = null;
+        for(int attrIndex = 0;attrIndex <attrs.size();attrIndex ++ )
+        {
+            attrMap = new HashMap();
+            attrMap.put("oId",dataFlow.getoId());
+            attrMap.put("attrId",SequenceUtil.getAttrId());
+            attrMap.put("specCd",attrs.getJSONObject(attrIndex).getString("specCd"));
+            attrMap.put("value",attrs.getJSONObject(attrIndex).getString("value"));
+            orderAttrs.add(attrMap);
+        }
+        return orderAttrs;
+    }
+
+    /**
+     * 获取订单项
+     * @param dataFlow
+     * @return
+     */
+    public static List<Map> getBusiness(DataFlow dataFlow){
+        List<Map> businesss = new ArrayList<Map>();
+        JSONArray reqBusiness = dataFlow.getReqBusiness();
+        Map busiMap = null;
+        for(int businessIndex = 0 ; businessIndex < reqBusiness.size();businessIndex ++) {
+            JSONObject business = reqBusiness.getJSONObject(businessIndex);
+            if(business == null){
+                continue;
+            }
+            business.put("bId",SequenceUtil.getBId());
+            busiMap = new HashMap();
+            busiMap.put("oId",dataFlow.getoId());
+            busiMap.put("businessTypeCd",getService(dataFlow,business.getString("serviceCode")).getBusinessTypeCd());
+            busiMap.put("remark",business.getString("remark"));
+            busiMap.put("status_cd",StatusConstant.STATUS_CD_SAVE);
+            businesss.add(busiMap);
+        }
+        return businesss;
+    }
+
+    /**
+     * 获取订单属性
+     * @param dataFlow
+     * @return
+     */
+    public static List<Map> getBusinessAttrs(DataFlow dataFlow){
+        List<Map> businessAttrs = new ArrayList<Map>();
+        JSONArray reqBusiness = dataFlow.getReqBusiness();
+        for(int businessIndex = 0 ; businessIndex < reqBusiness.size();businessIndex ++) {
+            JSONObject business = reqBusiness.getJSONObject(businessIndex);
+            if (!business.containsKey("attrs") && business.getJSONArray("attrs").size() == 0) {
+                continue;
+            }
+            JSONArray attrs = business.getJSONArray("attrs");
+            Map attrMap = null;
+            for (int attrIndex = 0; attrIndex < attrs.size(); attrIndex++) {
+                attrMap = new HashMap();
+                attrMap.put("bId", business.getString("bId"));
+                attrMap.put("attrId", SequenceUtil.getAttrId());
+                attrMap.put("specCd", attrs.getJSONObject(attrIndex).getString("specCd"));
+                attrMap.put("value", attrs.getJSONObject(attrIndex).getString("value"));
+                businessAttrs.add(attrMap);
+            }
+        }
+        return businessAttrs;
+    }
+
+    /**
+     * 获取将要作废的订单
+     * @param dataFlow
+     * @return
+     */
+    public static Map getNeedInvalidOrder(DataFlow dataFlow){
+        Map order = new HashMap();
+        order.put("oId",dataFlow.getoId());
+       // order.put("finishTime",DateUtil.getCurrentDate());
+        order.put("statusCd",StatusConstant.STATUS_CD_DELETE);
+        return order;
+    }
+
+    /**
+     * 获取将要作废的订单
+     * @param dataFlow
+     * @return
+     */
+    public static Map getNeedErrorOrder(DataFlow dataFlow){
+        Map order = new HashMap();
+        order.put("oId",dataFlow.getoId());
+        //order.put("finishTime",DateUtil.getCurrentDate());
+        order.put("statusCd",StatusConstant.STATUS_CD_ERROR);
+        return order;
+    }
+
+    public static Map getNeedCompleteBusiness(DataFlow dataFlow){
+        Map business = new HashMap();
+        String bId = "";
+        for(Business busi:dataFlow.getBusinesses()){
+            bId += "'"+busi.getbId()+"',";
+        }
+        business.put("bId",bId.substring(0,bId.length()-1));
+        business.put("finishTime",DateUtil.getCurrentDate());
+        business.put("statusCd",StatusConstant.STATUS_CD_COMPLETE);
+        return business;
+    }
+
+    public static Map getNeedNotifyErrorBusiness(DataFlow dataFlow){
+        Map business = new HashMap();
+        String bId = getMoreBId(dataFlow);
+        business.put("bId",bId.substring(0,bId.length()-1));
+        //business.put("finishTime",DateUtil.getCurrentDate());
+        business.put("statusCd",StatusConstant.STATUS_CD_NOTIFY_ERROR);
+        return business;
+    }
+
+    /**
+     * 获取DataFlow 对象中的所有bId
+     * @param dataFlow
+     * @return
+     */
+    public static String getMoreBId(DataFlow dataFlow){
+        String bId = "";
+        for(Business busi:dataFlow.getBusinesses()){
+            bId += "'"+busi.getbId()+"',";
+        }
+        return bId;
+    }
+
+
+    /**
+     * 获取将要完成的订单
+     * @param dataFlow
+     * @return
+     */
+    public static Map getNeedCompleteOrder(DataFlow dataFlow){
+        Map order = new HashMap();
+        order.put("oId",dataFlow.getoId());
+        order.put("finishTime",DateUtil.getCurrentDate());
+        order.put("statusCd",StatusConstant.STATUS_CD_COMPLETE);
+        return order;
+    }
+
+    /**
+     * 获取竣工消息的报文(订单完成后通知业务系统)
+     * @param dataFlow
+     * @return
+     */
+    public static JSONObject getNotifyBusinessSuccessJson(DataFlow dataFlow){
+        JSONObject notifyMessage = getTransactionBusinessBaseJson(StatusConstant.NOTIFY_BUSINESS_TYPE);
+        JSONArray businesses = notifyMessage.getJSONArray("business");
+
+        JSONObject busi = null;
+        JSONObject response = null;
+        for(Business business :dataFlow.getBusinesses()){
+            busi = new JSONObject();
+            busi.put("bId",business.getbId());
+            busi.put("serviceCode",business.getServiceCode());
+            response = new JSONObject();
+            response.put("code",ResponseConstant.RESULT_CODE_SUCCESS);
+            response.put("message","成功");
+            busi.put("response",response);
+            businesses.add(busi);
+        }
+        return notifyMessage;
+    }
+
+    /**
+     * 获取失败消息的报文(订单失败后通知业务系统)
+     * @param dataFlow
+     * @return
+     */
+    public static JSONObject getNotifyBusinessErrorJson(DataFlow dataFlow){
+
+        JSONObject notifyMessage = getTransactionBusinessBaseJson(StatusConstant.NOTIFY_BUSINESS_TYPE);
+        JSONArray businesses = notifyMessage.getJSONArray("business");
+
+        JSONObject busi = null;
+        JSONObject response = null;
+        for(Business business :dataFlow.getBusinesses()){
+            busi = new JSONObject();
+            busi.put("bId",business.getbId());
+            busi.put("serviceCode",business.getServiceCode());
+            response = new JSONObject();
+            response.put("code",ResponseConstant.RESULT_CODE_INNER_ERROR);
+            response.put("message","失败");
+            busi.put("response",response);
+            businesses.add(busi);
+        }
+        return notifyMessage;
+    }
+
+    public static JSONObject getCompletedBusinessErrorJson(Map business,AppService appService){
+        JSONObject notifyMessage = getTransactionBusinessBaseJson(StatusConstant.NOTIFY_BUSINESS_TYPE);
+        JSONArray businesses = notifyMessage.getJSONArray("business");
+
+        JSONObject busi = null;
+        JSONObject response = null;
+        busi = new JSONObject();
+        busi.put("bId",business.get("b_id"));
+        busi.put("serviceCode",appService.getServiceCode());
+        response = new JSONObject();
+        response.put("code",ResponseConstant.RESULT_CODE_INNER_ERROR);
+        response.put("message","失败");
+        busi.put("response",response);
+        businesses.add(busi);
+        return notifyMessage;
+
+    }
+
+    /**
+     * 获取失败消息的报文(订单失败后通知业务系统)
+     * @param business
+     * @return
+     */
+    public static JSONObject getRequestBusinessJson(Business business){
+
+        JSONObject notifyMessage = getTransactionBusinessBaseJson(StatusConstant.REQUEST_BUSINESS_TYPE);
+        JSONArray businesses = notifyMessage.getJSONArray("business");
+
+        JSONObject busi = null;
+        JSONObject response = null;
+            busi = new JSONObject();
+            busi.put("bId",business.getbId());
+            busi.put("serviceCode",business.getServiceCode());
+            busi.put("serviceName",business.getServiceName());
+            busi.put("remark",business.getRemark());
+            busi.put("datas",business.getDatas());
+            businesses.add(busi);
+        return notifyMessage;
+    }
+
+
+    /**
+     * 业务系统交互
+     * @return
+     */
+    private static JSONObject getTransactionBusinessBaseJson(String businessType){
+        JSONObject notifyMessage = JSONObject.parseObject("{\"orders\":{},\"business\":[]}");
+        JSONObject orders = notifyMessage.getJSONObject("orders");
+        orders.put("transactionId",SequenceUtil.getTransactionId());
+        orders.put("requestTime",DateUtil.getyyyyMMddhhmmssDateString());
+        orders.put("businessType",businessType);
+        return notifyMessage;
+    }
+
+    /**
+     * 获取同步处理业务
+     * @param dataFlow
+     * @return
+     */
+    public static List<Business> getSynchronousBusinesses(DataFlow dataFlow){
+        AppService service = null;
+        List<Business> syschronousBusinesses = new ArrayList<Business>();
+        for(Business business :dataFlow.getBusinesses()){
+            service = DataFlowFactory.getService(dataFlow,business.getServiceCode());
+            if(CommonConstant.ORDER_INVOKE_METHOD_SYNCHRONOUS.equals(service.getInvokeMethod())){
+                business.setSeq(service.getSeq());
+                syschronousBusinesses.add(business);
+            }
+        }
+        if(syschronousBusinesses.size() > 0) {
+            Collections.sort(syschronousBusinesses);
+        }
+
+        return syschronousBusinesses;
+    }
+
+
+    /**
+     * 获取异步处理业务
+     * @param dataFlow
+     * @return
+     */
+    public static List<Business> getAsynchronousBusinesses(DataFlow dataFlow){
+        AppService service = null;
+        List<Business> syschronousBusinesses = new ArrayList<Business>();
+        for(Business business :dataFlow.getBusinesses()){
+            service = DataFlowFactory.getService(dataFlow,business.getServiceCode());
+            if(CommonConstant.ORDER_INVOKE_METHOD_ASYNCHRONOUS.equals(service.getInvokeMethod())){
+                syschronousBusinesses.add(business);
+            }
+        }
+
+        return syschronousBusinesses;
+    }
 
 
 }

+ 30 - 0
java110-common/src/main/java/com/java110/common/kafka/KafkaFactory.java

@@ -0,0 +1,30 @@
+package com.java110.common.kafka;
+
+import com.java110.common.factory.ApplicationContextFactory;
+import org.springframework.kafka.core.KafkaTemplate;
+
+/**
+ * kafka 工厂类
+ * Created by wuxw on 2018/4/15.
+ */
+public class KafkaFactory {
+
+    /**
+     * 获取kafka template
+     * @return
+     */
+    private static KafkaTemplate getKafkaTemplate(){
+        return (KafkaTemplate) ApplicationContextFactory.getBean("kafkaTemplate");
+    }
+
+    /**
+     * 发送kafka消息
+     * @param topic
+     * @param key
+     * @param message
+     * @throws Exception
+     */
+    public static void sendKafkaMessage(String topic,String key,Object message) throws Exception{
+        getKafkaTemplate().send(topic,key,message);
+    }
+}

+ 44 - 19
java110-common/src/main/java/com/java110/common/util/SequenceUtil.java

@@ -1,8 +1,13 @@
 package com.java110.common.util;
 
+import com.java110.common.cache.MappingCache;
+import com.java110.common.constant.MappingConstant;
+
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
 import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -16,10 +21,24 @@ public class SequenceUtil {
     private static final Lock LOCK = new ReentrantLock();
     private static short lastCount = 1;
     private static int count = 0;
-    private static DateFormat dateFormatDay = new SimpleDateFormat("yyyyMMdd");
-    private static DateFormat dateFormatMinute = new SimpleDateFormat("yyyyMMddhhmmss");
     private static final String first = "10";
 
+    /**
+     *
+     * 只有在不调用服务生成ID时有用
+     */
+    private static Map prefixMap = null;
+    static {
+        prefixMap = new HashMap();
+        //10+yyyymmdd+八位序列
+        prefixMap.put("oId","10");
+        //(20+yyyymmdd+八位序列)
+        prefixMap.put("bId","20");
+        //(11+yyyymmdd+八位序列)
+        prefixMap.put("attrId","11");
+        prefixMap.put("transactionId","1000001");
+    }
+
     private static String PLATFORM_CODE = "0001";
 
     @SuppressWarnings("finally")
@@ -49,26 +68,32 @@ public class SequenceUtil {
 
         //从内存中获取平台随机码
 
-        return first + PLATFORM_CODE + dateFormatDay.format(new Date()) + nextId();
+        return prefixMap.get("transactionId") + DateUtil.getNow(DateUtil.DATE_FORMATE_STRING_H) + nextId();
     }
 
-    /**
-     * 创建能力平台交互 流水
-     * SVC90005
-     *
-     * 90001 20170314094355 10000018
-     * @return
-     */
-    public static String getSVC90005TransactionId(){
-        return "90001"+ dateFormatMinute.format(new Date()) +"99" + nextId();
+    public static String getOId(){
+        if(!MappingConstant.VALUE_ON.equals(MappingCache.getValue(MappingConstant.KEY_NEED_INVOKE_GENERATE_ID))){
+            return prefixMap.get("oId") + DateUtil.getNow(DateUtil.DATE_FORMATE_STRING_H) + nextId("%08d");
+        }
+        //调用服务
+        return null;
     }
 
-    /**
-     * 6004050001201703141105137879
-     * dateFormatMinute
-     * @return
-     */
-    public static String getInvokeSAOPTransactionId(){
-        return "6004050001"+dateFormatMinute.format(new Date())+nextId("%04d");
+    public static String getBId(){
+        if(!MappingConstant.VALUE_ON.equals(MappingCache.getValue(MappingConstant.KEY_NEED_INVOKE_GENERATE_ID))){
+            return prefixMap.get("bId") + DateUtil.getNow(DateUtil.DATE_FORMATE_STRING_H) + nextId("%08d");
+        }
+        //调用服务
+        return null;
     }
+
+    public static String getAttrId(){
+        if(!MappingConstant.VALUE_ON.equals(MappingCache.getValue(MappingConstant.KEY_NEED_INVOKE_GENERATE_ID))){
+            return prefixMap.get("attrId") + DateUtil.getNow(DateUtil.DATE_FORMATE_STRING_H) + nextId("%08d");
+        }
+        //调用服务
+        return null;
+    }
+
+
 }

+ 47 - 6
java110-config/src/main/resources/mapper/center/CenterServiceDAOImplMapper.xml

@@ -5,35 +5,35 @@
 <mapper namespace="centerServiceDAOImpl">
 
     <!--保存订单信息 c_orders 中 -->
-    <insert id="saveOrder" parameterType="map">
+    <insert id="saveOrder" parameterType="Map">
         <![CDATA[
             insert into c_orders(o_id,app_id,ext_transaction_id,user_id,request_time,order_type_cd,remark,status_cd)
             values(#{oId},#{appId},#{extTransactionId},#{userId},#{requestTime},#{orderTypeCd},#{remark},#{statusCd})
         ]]>
     </insert>
     <!-- 保存属性信息c_orders_attrs 中-->
-    <insert id="saveOrderAttrs" parameterType="map">
+    <insert id="saveOrderAttrs" parameterType="Map">
         <![CDATA[
             insert into c_orders_attrs(o_id,attr_id,spec_cd,value)
             values(#{oId},#{attrId},#{specCd},#{value})
         ]]>
     </insert>
     <!-- 保存订单项信息 c_business -->
-    <insert id="saveBusiness" parameterType="map">
+    <insert id="saveBusiness" parameterType="Map">
         <![CDATA[
             insert into c_business(b_id,o_id,business_type_cd,remark,status_cd)
             values(#{bId},#{oId},#{businessTypeCd},#{remark},#{statusCd})
         ]]>
     </insert>
     <!-- 保存属性信息 c_business_attrs -->
-    <insert id="saveBusinessAttrs" parameterType="map">
+    <insert id="saveBusinessAttrs" parameterType="Map">
         <![CDATA[
             insert into c_business_attrs(b_id,attr_id,spec_cd,value)
             values(#{bId},#{attrId},#{specCd},#{value})
         ]]>
     </insert>
     <!-- 更新订单信息(一般就更新订单状态) -->
-    <update id="updateOrder" parameterType="map" >
+    <update id="updateOrder" parameterType="Map" >
         <![CDATA[
             update c_orders co set
             co.status_cd=#{statusCd},
@@ -42,7 +42,7 @@
          ]]>
     </update>
     <!-- 更新订单项信息(一般就更新订单项状态)-->
-    <update id="updateBusiness" parameterType="map">
+    <update id="updateBusiness" parameterType="Map">
         <![CDATA[
             update c_business cb set
             cb.status_cd=#{statusCd},
@@ -50,5 +50,46 @@
             where cb.o_id=#{oId}
          ]]>
     </update>
+    <!--根据bId 修改业务项信息-->
+    <update id="updateBusinessByBId" parameterType="Map">
+        <![CDATA[
+            update c_business cb set
+            cb.status_cd=#{statusCd},
+            cb.finish_time=#{finishTime}
+            where cb.b_id in (#{bId})
+            and cb.status_cd not in ('D','E')
+         ]]>
+    </update>
+    <!-- 当所有业务动作是否都是C,将订单信息改为 C-->
+    <update id="completeOrderByBId" parameterType="String" >
+        <![CDATA[
+            update c_orders co set co.status_cd = 'C' where co.status='S'
+                and not exists(
+                        select 1 from c_business cb where cb.status_cd <> 'C'
+                        and cb.o_id = co.o_id
+                        and cb.b_id in (#{bId})
+                )
+        ]]>
+    </update>
+
+    <select id="getOrderInfoByBId" parameterType="String" resultType="Map">
+        <![CDATA[
+        select co.* from c_orders co where 1 = 1 and exists
+        (
+            select 1 from c_business cb where cb.o_id = co.o_id
+            and cb.b_id = #{bId}
+        )
+    ]]>
+    </select>
+
+    <!-- 获取同个订单中已经完成的订单项-->
+    <select id="getCommonOrderCompledBusinessByBId" parameterType="String" resultType="Map">
+        <![CDATA[
+            select * from c_business cb where cb.finish_time is not null
+            and cb.o_id in (
+                select cb1.o_id from c_business cb1 where cb1.b_id = #{bId}
+            )
+        ]]>
+    </select>
 
 </mapper>

+ 1 - 1
java110-config/src/main/resources/mapper/order/OrderServiceDaoImplMapper.xml

@@ -2,7 +2,7 @@
 <!DOCTYPE mapper
         PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
         "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
-<mapper namespace="orderServiceDaoImpl">
+<mapper namespace="centerServiceDaoImpl">
 
     <!--order_list order_list_attr  返回结果封装-->
     <resultMap type="com.java110.entity.order.OrderList" id="orderListMap">

+ 64 - 0
java110-core/src/main/java/com/java110/core/kafka/KafkaConsumerConfig.java

@@ -0,0 +1,64 @@
+package com.java110.core.kafka;
+
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.annotation.EnableKafka;
+import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
+import org.springframework.kafka.config.KafkaListenerContainerFactory;
+import org.springframework.kafka.core.ConsumerFactory;
+import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
+import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
+
+import java.util.HashMap;
+import java.util.Map;
+
+@Configuration
+@EnableKafka
+public class KafkaConsumerConfig {
+
+    @Value("${kafka.consumer.servers}")
+    private String servers;
+    @Value("${kafka.consumer.enable.auto.commit}")
+    private boolean enableAutoCommit;
+    @Value("${kafka.consumer.session.timeout}")
+    private String sessionTimeout;
+    @Value("${kafka.consumer.auto.commit.interval}")
+    private String autoCommitInterval;
+    @Value("${kafka.consumer.group.id}")
+    private String groupId;
+    @Value("${kafka.consumer.auto.offset.reset}")
+    private String autoOffsetReset;
+    @Value("${kafka.consumer.concurrency}")
+    private int concurrency;
+    @Bean
+    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
+        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
+        factory.setConsumerFactory(consumerFactory());
+        factory.setConcurrency(concurrency);
+        factory.getContainerProperties().setPollTimeout(1500);
+        return factory;
+    }
+
+    public ConsumerFactory<String, String> consumerFactory() {
+        return new DefaultKafkaConsumerFactory<String, String>(consumerConfigs());
+    }
+
+
+    public Map<String, Object> consumerConfigs() {
+        Map<String, Object> propsMap = new HashMap<String, Object>();
+        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
+        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
+        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
+        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
+        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
+        return propsMap;
+    }
+
+}

+ 56 - 0
java110-core/src/main/java/com/java110/core/kafka/KafkaProducerConfig.java

@@ -0,0 +1,56 @@
+package com.java110.core.kafka;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.annotation.EnableKafka;
+import org.springframework.kafka.core.DefaultKafkaProducerFactory;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.core.ProducerFactory;
+
+/**
+ * Created by wuxw on 2018/4/15.
+ */
+@Configuration
+@EnableKafka
+public class KafkaProducerConfig {
+
+
+        @Value("${kafka.producer.servers}")
+        private String servers;
+        @Value("${kafka.producer.retries}")
+        private int retries;
+        @Value("${kafka.producer.batch.size}")
+        private int batchSize;
+        @Value("${kafka.producer.linger}")
+        private int linger;
+        @Value("${kafka.producer.buffer.memory}")
+        private int bufferMemory;
+
+
+        public Map<String, Object> producerConfigs() {
+            Map<String, Object> props = new HashMap<String, Object> ();
+            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
+            props.put(ProducerConfig.RETRIES_CONFIG, retries);
+            props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
+            props.put(ProducerConfig.LINGER_MS_CONFIG, linger);
+            props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
+            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+            return props;
+        }
+
+        public ProducerFactory<String, String> producerFactory() {
+            return new DefaultKafkaProducerFactory<String, String> (producerConfigs());
+        }
+
+        @Bean
+        public KafkaTemplate<String, String> kafkaTemplate() {
+            return new KafkaTemplate<String, String>(producerFactory());
+        }
+
+}

+ 5 - 0
pom.xml

@@ -84,6 +84,11 @@
                 <artifactId>spring-context-support</artifactId>
                 <version>4.2.7.RELEASE</version>
             </dependency>
+            <dependency>
+                <groupId>org.springframework.kafka</groupId>
+                <artifactId>spring-kafka</artifactId>
+                <version>1.1.1.RELEASE</version>
+            </dependency>
             <dependency>
                 <groupId>com.alibaba</groupId>
                 <artifactId>fastjson</artifactId>