java110 2 anni fa
parent
commit
2682d41ae6
23 ha cambiato i file con 373 aggiunte e 72 eliminazioni
  1. 6 6
      java110-bean/src/main/java/com/java110/dto/system/CustomBusinessDatabusDto.java
  2. 50 0
      java110-bean/src/main/java/com/java110/dto/data/DatabusQueueDataDto.java
  3. 10 0
      java110-bean/src/main/java/com/java110/dto/oaWorkflow/WorkflowStepStaffDto.java
  4. 2 1
      java110-db/src/main/resources/mapper/common/WorkflowStepStaffServiceDaoImplMapper.xml
  5. 9 8
      java110-interface/src/main/java/com/java110/intf/job/IDataBusInnerServiceSMO.java
  6. 2 2
      service-common/src/main/java/com/java110/common/bmo/machine/impl/SaveMachineRecordBMOImpl.java
  7. 4 3
      service-common/src/main/java/com/java110/common/smo/impl/AllocationStorehouseUserInnerServiceSMOImpl.java
  8. 2 2
      service-common/src/main/java/com/java110/common/smo/impl/GoodCollectionUserInnerServiceSMOImpl.java
  9. 3 3
      service-common/src/main/java/com/java110/common/smo/impl/PurchaseApplyUserInnerServiceSMOImpl.java
  10. 1 8
      service-job/src/main/java/com/java110/job/adapt/DatabusAdaptImpl.java
  11. 1 6
      service-job/src/main/java/com/java110/job/adapt/IDatabusAdapt.java
  12. 4 4
      service-job/src/main/java/com/java110/job/adapt/hcGov/inoutRecord/AddInoutRecordToHcGovAdapt.java
  13. 6 6
      service-job/src/main/java/com/java110/job/adapt/hcToTianchuang/PersonToTianchuangAdapt.java
  14. 4 3
      service-job/src/main/java/com/java110/job/adapt/purchase/allocationStorehouse/MachineAllocationStorehouse.java
  15. 4 3
      service-job/src/main/java/com/java110/job/adapt/purchase/purchaseApply/MachinePurchaseApplyAdapt.java
  16. 98 0
      service-job/src/main/java/com/java110/job/databus/DatabusDataExecutor.java
  17. 40 0
      service-job/src/main/java/com/java110/job/databus/DatabusDataQueue.java
  18. 9 0
      service-job/src/main/java/com/java110/job/databus/DatabusQueue.java
  19. 20 0
      service-job/src/main/java/com/java110/job/databus/DatabusQueueConfig.java
  20. 59 0
      service-job/src/main/java/com/java110/job/databus/TimeoutTest.java
  21. 33 14
      service-job/src/main/java/com/java110/job/smo/impl/DataBusInnerServiceSMOImpl.java
  22. 1 0
      service-store/src/main/java/com/java110/store/cmd/resourceStore/AuditAllocationStoreOrderCmd.java
  23. 5 3
      service-store/src/main/java/com/java110/store/cmd/resourceStore/ListAllocationStoreAuditOrdersCmd.java

+ 6 - 6
java110-bean/src/main/java/com/java110/dto/system/CustomBusinessDatabusDto.java

@@ -1,4 +1,4 @@
-package com.java110.dto.system;
+package com.java110.dto.data;
 
 import com.alibaba.fastjson.JSONObject;
 import com.java110.dto.PageDto;
@@ -13,22 +13,22 @@ import java.io.Serializable;
  * @Version 1.0
  * add by wuxw 2019/4/24
  **/
-public class CustomBusinessDatabusDto extends PageDto implements Serializable {
+public class DatabusDataDto extends PageDto implements Serializable {
 
     private String businessTypeCd;
 
     private JSONObject data;
 
-    public CustomBusinessDatabusDto() {
+    public DatabusDataDto() {
     }
 
-    public CustomBusinessDatabusDto(String businessTypeCd, JSONObject data) {
+    public DatabusDataDto(String businessTypeCd, JSONObject data) {
         this.businessTypeCd = businessTypeCd;
         this.data = data;
     }
 
-    public static CustomBusinessDatabusDto getInstance(String businessTypeCd, JSONObject data) {
-        return new CustomBusinessDatabusDto(businessTypeCd, data);
+    public static DatabusDataDto getInstance(String businessTypeCd, JSONObject data) {
+        return new DatabusDataDto(businessTypeCd, data);
     }
 
 

+ 50 - 0
java110-bean/src/main/java/com/java110/dto/data/DatabusQueueDataDto.java

@@ -0,0 +1,50 @@
+package com.java110.dto.data;
+
+import com.java110.dto.system.Business;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * java110 队里数据封装
+ */
+public class DatabusQueueDataDto implements Serializable {
+
+    public DatabusQueueDataDto() {
+    }
+
+    public DatabusQueueDataDto(String beanName, Business business, List<Business> businesses) {
+        this.beanName = beanName;
+        this.business = business;
+        this.businesses = businesses;
+    }
+
+    private String beanName;
+
+    private Business business;
+    private List<Business> businesses;
+
+    public String getBeanName() {
+        return beanName;
+    }
+
+    public void setBeanName(String beanName) {
+        this.beanName = beanName;
+    }
+
+    public Business getBusiness() {
+        return business;
+    }
+
+    public void setBusiness(Business business) {
+        this.business = business;
+    }
+
+    public List<Business> getBusinesses() {
+        return businesses;
+    }
+
+    public void setBusinesses(List<Business> businesses) {
+        this.businesses = businesses;
+    }
+}

+ 10 - 0
java110-bean/src/main/java/com/java110/dto/oaWorkflow/WorkflowStepStaffDto.java

@@ -31,6 +31,8 @@ public class WorkflowStepStaffDto extends PageDto implements Serializable {
 
     private String processDefinitionKey;
 
+    private String flowId;
+
 
     public String getWssId() {
         return wssId;
@@ -120,4 +122,12 @@ public class WorkflowStepStaffDto extends PageDto implements Serializable {
     public void setProcessDefinitionKey(String processDefinitionKey) {
         this.processDefinitionKey = processDefinitionKey;
     }
+
+    public String getFlowId() {
+        return flowId;
+    }
+
+    public void setFlowId(String flowId) {
+        this.flowId = flowId;
+    }
 }

+ 2 - 1
java110-db/src/main/resources/mapper/common/WorkflowStepStaffServiceDaoImplMapper.xml

@@ -81,7 +81,8 @@
     <select id="getWorkflowStepStaffInfo" parameterType="Map" resultType="Map">
         select t.wss_id,t.wss_id wssId,t.step_id,t.step_id stepId,t.staff_name,t.staff_name
         staffName,t.status_cd,t.status_cd statusCd,t.b_id,t.b_id bId,t.community_id,t.community_id
-        communityId,t.staff_id,t.staff_id staffId,t.staff_role,t.staff_role staffRole,t.flow_type,t.flow_type flowType,w.process_definition_key processDefinitionKey
+        communityId,t.staff_id,t.staff_id staffId,t.staff_role,t.staff_role staffRole,t.flow_type,t.flow_type flowType,
+        w.process_definition_key processDefinitionKey,w.flow_id flowId
         from workflow_step_staff t
         left JOIN workflow_step ws on t.step_id = ws.step_id and ws.status_cd = '0'
         left join workflow w on ws.flow_id = w.flow_id and w.status_cd = '0'

+ 9 - 8
java110-interface/src/main/java/com/java110/intf/job/IDataBusInnerServiceSMO.java

@@ -2,7 +2,7 @@ package com.java110.intf.job;
 
 import com.alibaba.fastjson.JSONObject;
 import com.java110.config.feign.FeignConfiguration;
-import com.java110.dto.system.CustomBusinessDatabusDto;
+import com.java110.dto.data.DatabusDataDto;
 import com.java110.dto.machine.CarInoutDto;
 import com.java110.dto.machine.MachineDto;
 import com.java110.dto.fee.TempCarPayOrderDto;
@@ -36,6 +36,13 @@ public interface IDataBusInnerServiceSMO {
     @RequestMapping(value = "/exchange", method = RequestMethod.POST)
     boolean exchange(@RequestBody List<Business> businesses);
 
+    /**
+     * 自定义databus 数据 传输
+     * @param databusDataDto
+     * @return
+     */
+    @RequestMapping(value = "/databusData", method = RequestMethod.POST)
+    boolean databusData(@RequestBody DatabusDataDto databusDataDto);
 
     /**
      * <p>开门</p>
@@ -85,13 +92,7 @@ public interface IDataBusInnerServiceSMO {
     ResultVo notifyTempCarFeeOrder(@RequestBody TempCarPayOrderDto tempCarPayOrderDto);
 
 
-    /**
-     * 自定义databus 数据 传输
-     * @param customBusinessDatabusDto
-     * @return
-     */
-    @RequestMapping(value = "/customExchange", method = RequestMethod.POST)
-    void customExchange(@RequestBody CustomBusinessDatabusDto customBusinessDatabusDto);
+
 
     @RequestMapping(value = "/getQRcode", method = RequestMethod.POST)
     ResultVo getQRcode(@RequestBody JSONObject reqJson);

+ 2 - 2
service-common/src/main/java/com/java110/common/bmo/machine/impl/SaveMachineRecordBMOImpl.java

@@ -17,7 +17,7 @@ package com.java110.common.bmo.machine.impl;
 
 import com.java110.common.bmo.machine.ISaveMachineRecordBMO;
 import com.java110.core.factory.GenerateCodeFactory;
-import com.java110.dto.system.CustomBusinessDatabusDto;
+import com.java110.dto.data.DatabusDataDto;
 import com.java110.dto.file.FileDto;
 import com.java110.dto.machine.MachineDto;
 import com.java110.dto.machine.MachineRecordDto;
@@ -108,7 +108,7 @@ public class SaveMachineRecordBMOImpl implements ISaveMachineRecordBMO {
             return ResultVo.error("上传记录失败");
         }
         //传送databus
-        dataBusInnerServiceSMOImpl.customExchange(CustomBusinessDatabusDto.getInstance(
+        dataBusInnerServiceSMOImpl.databusData(DatabusDataDto.getInstance(
                 BusinessTypeConstant.BUSINESS_TYPE_DATABUS_SEND_OPEN_LOG, BeanConvertUtil.beanCovertJson(machineRecordPo)));
         return ResultVo.success();
     }

+ 4 - 3
service-common/src/main/java/com/java110/common/smo/impl/AllocationStorehouseUserInnerServiceSMOImpl.java

@@ -3,7 +3,7 @@ package com.java110.common.smo.impl;
 import com.java110.core.base.smo.BaseServiceSMO;
 import com.java110.dto.PageDto;
 import com.java110.dto.purchase.AllocationStorehouseApplyDto;
-import com.java110.dto.system.CustomBusinessDatabusDto;
+import com.java110.dto.data.DatabusDataDto;
 import com.java110.dto.purchase.PurchaseApplyDto;
 import com.java110.dto.store.StorehouseDto;
 import com.java110.dto.oaWorkflow.WorkflowDto;
@@ -115,7 +115,7 @@ public class AllocationStorehouseUserInnerServiceSMOImpl extends BaseServiceSMO
                 machineRecordPo.setApplyOrderId(businessKey);
                 machineRecordPo.setPurchaseUserId(actRuTaskUserId);
                 //传送databus
-                dataBusInnerServiceSMOImpl.customExchange(CustomBusinessDatabusDto.getInstance(
+                dataBusInnerServiceSMOImpl.databusData(DatabusDataDto.getInstance(
                         BusinessTypeConstant.BUSINESS_TYPE_DATABUS_ALLOCATION_STOREHOUSE_APPLY, BeanConvertUtil.beanCovertJson(machineRecordPo)));
             }
         }
@@ -361,6 +361,7 @@ public class AllocationStorehouseUserInnerServiceSMOImpl extends BaseServiceSMO
         variables.put("currentUserId", allocationStorehouseApplyDto.getCurrentUserId());
         variables.put("flag", "1200".equals(allocationStorehouseApplyDto.getAuditCode()) ? "false" : "true");
         variables.put("startUserId", allocationStorehouseApplyDto.getStartUserId());
+        variables.put("nextStaffId", allocationStorehouseApplyDto.getNextUserId());
         taskService.complete(allocationStorehouseApplyDto.getTaskId(), variables);
         ProcessInstance pi = runtimeService.createProcessInstanceQuery().processInstanceId(processInstanceId).singleResult();
         if (pi == null) {
@@ -384,7 +385,7 @@ public class AllocationStorehouseUserInnerServiceSMOImpl extends BaseServiceSMO
                 machineRecordPo.setNoticeState(noticeState);
                 machineRecordPo.setAuditMessage(auditMessage);
                 //传送databus
-                dataBusInnerServiceSMOImpl.customExchange(CustomBusinessDatabusDto.getInstance(
+                dataBusInnerServiceSMOImpl.databusData(DatabusDataDto.getInstance(
                         BusinessTypeConstant.BUSINESS_TYPE_DATABUS_ALLOCATION_STOREHOUSE_APPLY, BeanConvertUtil.beanCovertJson(machineRecordPo)));
             }
         }

+ 2 - 2
service-common/src/main/java/com/java110/common/smo/impl/GoodCollectionUserInnerServiceSMOImpl.java

@@ -3,7 +3,7 @@ package com.java110.common.smo.impl;
 import com.java110.core.base.smo.BaseServiceSMO;
 import com.java110.dto.PageDto;
 import com.java110.dto.audit.AuditMessageDto;
-import com.java110.dto.system.CustomBusinessDatabusDto;
+import com.java110.dto.data.DatabusDataDto;
 import com.java110.dto.purchase.PurchaseApplyDto;
 import com.java110.dto.user.UserDto;
 import com.java110.dto.oaWorkflow.WorkflowDto;
@@ -105,7 +105,7 @@ public class GoodCollectionUserInnerServiceSMOImpl extends BaseServiceSMO implem
                 machineRecordPo.setPurchaseUserId(actRuTaskUserId);
                 machineRecordPo.setResOrderType(resOrderType);
                 //传送databus
-                dataBusInnerServiceSMOImpl.customExchange(CustomBusinessDatabusDto.getInstance(
+                dataBusInnerServiceSMOImpl.databusData(DatabusDataDto.getInstance(
                         BusinessTypeConstant.BUSINESS_TYPE_DATABUS_PURCHASE_APPLY, BeanConvertUtil.beanCovertJson(machineRecordPo)));
             }
         }

+ 3 - 3
service-common/src/main/java/com/java110/common/smo/impl/PurchaseApplyUserInnerServiceSMOImpl.java

@@ -3,7 +3,7 @@ package com.java110.common.smo.impl;
 import com.java110.core.base.smo.BaseServiceSMO;
 import com.java110.dto.PageDto;
 import com.java110.dto.audit.AuditMessageDto;
-import com.java110.dto.system.CustomBusinessDatabusDto;
+import com.java110.dto.data.DatabusDataDto;
 import com.java110.dto.purchase.PurchaseApplyDto;
 import com.java110.dto.user.UserDto;
 import com.java110.dto.oaWorkflow.WorkflowDto;
@@ -103,7 +103,7 @@ public class PurchaseApplyUserInnerServiceSMOImpl extends BaseServiceSMO impleme
                 machineRecordPo.setPurchaseUserId(actRuTaskUserId);
                 machineRecordPo.setResOrderType(resOrderType);
                 //传送databus
-                dataBusInnerServiceSMOImpl.customExchange(CustomBusinessDatabusDto.getInstance(
+                dataBusInnerServiceSMOImpl.databusData(DatabusDataDto.getInstance(
                         BusinessTypeConstant.BUSINESS_TYPE_DATABUS_PURCHASE_APPLY, BeanConvertUtil.beanCovertJson(machineRecordPo)));
             }
         }
@@ -336,7 +336,7 @@ public class PurchaseApplyUserInnerServiceSMOImpl extends BaseServiceSMO impleme
                 machineRecordPo.setResOrderType(resOrderType);
                 machineRecordPo.setAuditMessage(auditMessage);
                 //传送databus
-                dataBusInnerServiceSMOImpl.customExchange(CustomBusinessDatabusDto.getInstance(
+                dataBusInnerServiceSMOImpl.databusData(DatabusDataDto.getInstance(
                         BusinessTypeConstant.BUSINESS_TYPE_DATABUS_PURCHASE_APPLY, BeanConvertUtil.beanCovertJson(machineRecordPo)));
             }
         }

+ 1 - 8
service-job/src/main/java/com/java110/job/adapt/DatabusAdaptImpl.java

@@ -19,7 +19,7 @@ import com.alibaba.fastjson.JSONObject;
 import com.java110.core.client.RestTemplate;
 import com.java110.core.factory.WechatFactory;
 import com.java110.core.log.LoggerFactory;
-import com.java110.dto.system.CustomBusinessDatabusDto;
+import com.java110.dto.data.DatabusDataDto;
 import com.java110.dto.machine.CarInoutDto;
 import com.java110.dto.machine.MachineDto;
 import com.java110.dto.wechat.SmallWeChatDto;
@@ -207,14 +207,7 @@ public abstract class DatabusAdaptImpl implements IDatabusAdapt {
 
     }
 
-    /**
-     * 手工 送数据
-     *
-     * @param customBusinessDatabusDto
-     */
-    public void customExchange(CustomBusinessDatabusDto customBusinessDatabusDto) {
 
-    }
 
     /**
      * 查询模板信息

+ 1 - 6
service-job/src/main/java/com/java110/job/adapt/IDatabusAdapt.java

@@ -16,7 +16,7 @@
 package com.java110.job.adapt;
 
 import com.alibaba.fastjson.JSONObject;
-import com.java110.dto.system.CustomBusinessDatabusDto;
+import com.java110.dto.data.DatabusDataDto;
 import com.java110.dto.machine.CarInoutDto;
 import com.java110.dto.machine.MachineDto;
 import com.java110.dto.fee.TempCarPayOrderDto;
@@ -71,11 +71,6 @@ public interface IDatabusAdapt {
     ResultVo notifyTempCarFeeOrder(TempCarPayOrderDto tempCarPayOrderDto);
 
 
-    /**
-     * 手工 送数据
-     * @param customBusinessDatabusDto
-     */
-    void customExchange(CustomBusinessDatabusDto customBusinessDatabusDto);
 
     ResultVo customCarInOut(JSONObject reqJson);
 

+ 4 - 4
service-job/src/main/java/com/java110/job/adapt/hcGov/inoutRecord/AddInoutRecordToHcGovAdapt.java

@@ -16,7 +16,7 @@
 package com.java110.job.adapt.hcGov.inoutRecord;
 
 import com.alibaba.fastjson.JSONObject;
-import com.java110.dto.system.CustomBusinessDatabusDto;
+import com.java110.dto.data.DatabusDataDto;
 import com.java110.dto.community.CommunityAttrDto;
 import com.java110.dto.community.CommunityDto;
 import com.java110.dto.community.CommunityLocationAttrDto;
@@ -68,11 +68,11 @@ public class AddInoutRecordToHcGovAdapt extends DatabusAdaptImpl {
 
 
     /**
-     * @param customBusinessDatabusDto 当前处理业务
+     * @param business 当前处理业务
      */
     @Override
-    public void customExchange(CustomBusinessDatabusDto customBusinessDatabusDto) {
-        JSONObject data = customBusinessDatabusDto.getData();
+    public void execute(Business business,List<Business> businesses) {
+        JSONObject data = business.getData();
         doInoutRecord(null, data);
     }
 

+ 6 - 6
service-job/src/main/java/com/java110/job/adapt/hcToTianchuang/PersonToTianchuangAdapt.java

@@ -21,7 +21,7 @@ import com.java110.core.client.RestTemplate;
 import com.java110.core.factory.GenerateCodeFactory;
 import com.java110.dto.room.RoomAttrDto;
 import com.java110.dto.room.RoomDto;
-import com.java110.dto.system.CustomBusinessDatabusDto;
+import com.java110.dto.data.DatabusDataDto;
 import com.java110.dto.community.CommunityAttrDto;
 import com.java110.dto.community.CommunityDto;
 import com.java110.dto.file.FileRelDto;
@@ -120,11 +120,11 @@ public class PersonToTianchuangAdapt extends DatabusAdaptImpl {
     private RestTemplate outRestTemplate;
 
     /**
-     * @param customBusinessDatabusDto 当前处理业务
+     * @param business 当前处理业务
      */
     @Override
-    public void customExchange(CustomBusinessDatabusDto customBusinessDatabusDto) {
-        JSONObject data = customBusinessDatabusDto.getData();
+    public void execute(Business business, List<Business> businesses) {
+        JSONObject data = business.getData();
         doInoutRecord(null, data);
     }
 
@@ -221,7 +221,7 @@ public class PersonToTianchuangAdapt extends DatabusAdaptImpl {
         fileRelDto.setObjId(machineRecordPo.getMachineRecordId());
         List<FileRelDto> fileRelDtos = fileRelInnerServiceSMOImpl.queryFileRels(fileRelDto);
         String url = "";
-        String imgUrl = MappingCache.getValue(MappingConstant.FILE_DOMAIN,"IMG_PATH");
+        String imgUrl = MappingCache.getValue(MappingConstant.FILE_DOMAIN, "IMG_PATH");
         if (fileRelDtos != null && fileRelDtos.size() > 0) {
             url = imgUrl + fileRelDtos.get(0).getFileRealName();
         }
@@ -280,7 +280,7 @@ public class PersonToTianchuangAdapt extends DatabusAdaptImpl {
         dataObj.put("lvgmsfhm", ownerDto.getIdCard());
         dataObj.put("lvxm", ownerDto.getName());
         dataObj.put("lvlxdh", ownerDto.getLink());
-        dataObj.put("lvdjsj", DateUtil.getFormatTimeString(ownerDto.getCreateTime(),"yyyyMMdd HH:mm:ss"));
+        dataObj.put("lvdjsj", DateUtil.getFormatTimeString(ownerDto.getCreateTime(), "yyyyMMdd HH:mm:ss"));
         dataObj.put("lvrybm", ownerDto.getMemberId());
         String qrCodeAddress = "";
         for (RoomAttrDto roomAttrDto : roomDtos.get(0).getRoomAttrDto()) {

+ 4 - 3
service-job/src/main/java/com/java110/job/adapt/purchase/allocationStorehouse/MachineAllocationStorehouse.java

@@ -4,7 +4,8 @@ import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONObject;
 import com.java110.core.factory.WechatFactory;
 import com.java110.dto.purchase.AllocationStorehouseApplyDto;
-import com.java110.dto.system.CustomBusinessDatabusDto;
+import com.java110.dto.data.DatabusDataDto;
+import com.java110.dto.system.Business;
 import com.java110.dto.wechat.SmallWeChatDto;
 import com.java110.dto.wechat.SmallWechatAttrDto;
 import com.java110.dto.user.StaffAppAuthDto;
@@ -61,8 +62,8 @@ public class MachineAllocationStorehouse extends DatabusAdaptImpl {
     private static String sendMsgUrl = "https://api.weixin.qq.com/cgi-bin/message/template/send?access_token=";
 
     @Override
-    public void customExchange(CustomBusinessDatabusDto customBusinessDatabusDto) {
-        JSONObject data = customBusinessDatabusDto.getData();
+    public void execute(Business business, List<Business> businesses) {
+        JSONObject data = business.getData();
         //获取下级处理人id
         String purchaseUserId = data.getString("purchaseUserId");
         //获取调拨申请id

+ 4 - 3
service-job/src/main/java/com/java110/job/adapt/purchase/purchaseApply/MachinePurchaseApplyAdapt.java

@@ -3,8 +3,9 @@ package com.java110.job.adapt.purchase.purchaseApply;
 import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONObject;
 import com.java110.core.factory.WechatFactory;
-import com.java110.dto.system.CustomBusinessDatabusDto;
+import com.java110.dto.data.DatabusDataDto;
 import com.java110.dto.purchase.PurchaseApplyDto;
+import com.java110.dto.system.Business;
 import com.java110.dto.wechat.SmallWeChatDto;
 import com.java110.dto.wechat.SmallWechatAttrDto;
 import com.java110.dto.user.StaffAppAuthDto;
@@ -59,8 +60,8 @@ public class MachinePurchaseApplyAdapt extends DatabusAdaptImpl {
     private static String sendMsgUrl = "https://api.weixin.qq.com/cgi-bin/message/template/send?access_token=";
 
     @Override
-    public void customExchange(CustomBusinessDatabusDto customBusinessDatabusDto) {
-        JSONObject data = customBusinessDatabusDto.getData();
+    public void execute(Business business, List<Business> businesses) {
+        JSONObject data = business.getData();
         //获取申请id
         String applyOrderId = data.getString("applyOrderId");
         //获取下级处理人id

+ 98 - 0
service-job/src/main/java/com/java110/job/databus/DatabusDataExecutor.java

@@ -0,0 +1,98 @@
+package com.java110.job.databus;
+
+import com.java110.core.log.LoggerFactory;
+import com.java110.dto.data.DatabusQueueDataDto;
+import com.java110.job.adapt.IDatabusAdapt;
+import com.java110.job.importData.ImportDataQueue;
+import com.java110.utils.factory.ApplicationContextFactory;
+import com.java110.utils.util.Assert;
+import org.slf4j.Logger;
+
+import java.util.concurrent.*;
+
+/**
+ * 导入资产数据执行器
+ */
+public class DatabusDataExecutor implements Runnable {
+    private static final Logger log = LoggerFactory.getLogger(ImportDataQueue.class);
+
+    private static final int MAX_ROW = 200;
+
+    private static final int DEFAULT_TIMEOUT_TIME = 5000; // 5秒超时
+
+    //默认线程大小
+    private static final int DEFAULT_EXPORT_POOL = 4;
+
+    private boolean isRun = false;
+
+    private ExecutorService executorService;
+
+    public DatabusDataExecutor(boolean isRun) {
+        this.isRun = isRun;
+    }
+
+    public DatabusDataExecutor() {
+    }
+
+    @Override
+    public void run() {
+
+        while (isRun) {
+            log.debug("databus数据线程开始处理");
+            try {
+                doQueueData();
+            } catch (Throwable e) {
+                log.error("处理databus异常", e);
+                e.printStackTrace();
+            }
+            log.debug("databus数据线程处理完成");
+
+        }
+
+    }
+
+    private void doQueueData() throws Exception {
+
+        DatabusQueueDataDto databusQueueDataDto = DatabusDataQueue.getData();
+        if (databusQueueDataDto == null) {
+            return;
+        }
+
+        String action = databusQueueDataDto.getBeanName();
+
+        IDatabusAdapt databusAdaptImpl = ApplicationContextFactory.getBean(action, IDatabusAdapt.class);
+
+        if (databusAdaptImpl == null) {
+            return;
+        }
+
+        executorService = Executors.newSingleThreadExecutor();
+        FutureTask<String> futureTask = new FutureTask<>(new Callable<String>() {
+            @Override
+            public String call() throws Exception {
+                databusAdaptImpl.execute(databusQueueDataDto.getBusiness(), databusQueueDataDto.getBusinesses());
+                return "";
+            }
+        });
+        executorService.execute(futureTask);
+        try {
+            futureTask.get(DEFAULT_TIMEOUT_TIME, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException | ExecutionException |
+                 TimeoutException e) {//e.printStackTrace();
+            futureTask.cancel(true);
+        }
+        executorService.shutdown();
+
+    }
+
+    /**
+     * 线程启动器
+     */
+    public static void startQueueDataExecutor() {
+        log.debug("开始初始化消息队列");
+        ExecutorService executorService = Executors.newFixedThreadPool(DEFAULT_EXPORT_POOL);
+        executorService.execute(new DatabusDataExecutor(true));
+        log.debug("初始化导入消息完成");
+
+    }
+}

+ 40 - 0
service-job/src/main/java/com/java110/job/databus/DatabusDataQueue.java

@@ -0,0 +1,40 @@
+package com.java110.job.databus;
+
+import com.java110.core.log.LoggerFactory;
+import com.java110.dto.data.DatabusQueueDataDto;
+import org.slf4j.Logger;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+public class DatabusDataQueue {
+    private static final Logger log = LoggerFactory.getLogger(DatabusDataQueue.class);
+
+    private static final BlockingQueue<DatabusQueueDataDto> msgs = new LinkedBlockingQueue<DatabusQueueDataDto>(100);
+
+    /**
+     * 添加导出数据消息
+     *
+     * @param databusQueueDataDto
+     */
+    public static void addMsg(DatabusQueueDataDto databusQueueDataDto) {
+        try {
+
+            msgs.offer(databusQueueDataDto, 3, TimeUnit.MILLISECONDS);
+        } catch (Exception e) {
+            log.error("写入队列失败", e);
+            e.printStackTrace();
+        }
+    }
+
+    public static DatabusQueueDataDto getData() {
+        try {
+            return msgs.take();
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+        return null;
+    }
+
+}

+ 9 - 0
service-job/src/main/java/com/java110/job/databus/DatabusQueue.java

@@ -0,0 +1,9 @@
+package com.java110.job.databus;
+
+public class DatabusQueue {
+
+    public void initExportQueue(){
+        //启动导出数据线程处理器
+        DatabusDataExecutor.startQueueDataExecutor();
+    }
+}

+ 20 - 0
service-job/src/main/java/com/java110/job/databus/DatabusQueueConfig.java

@@ -0,0 +1,20 @@
+package com.java110.job.databus;
+
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * 消息队里 可以少量小区时 使用如果 小区数量比较大时 可以选择切换为mq
+ *
+ */
+@Configuration
+public class DatabusQueueConfig {
+
+
+    @Bean
+    public DatabusQueue databusQueue(){
+        DatabusQueue databusQueue = new DatabusQueue();
+        databusQueue.initExportQueue();
+        return databusQueue;
+    }
+}

+ 59 - 0
service-job/src/main/java/com/java110/job/databus/TimeoutTest.java

@@ -0,0 +1,59 @@
+package com.java110.job.databus;
+
+import java.util.Random;
+import java.util.concurrent.*;
+
+public class TimeoutTest {
+    private static ExecutorService executorService = Executors.newSingleThreadExecutor();
+
+    /*** @param args*/
+    public static void main(String[] args) {
+// TODO Auto-generated method stub
+        long start = System.currentTimeMillis();
+        String result = timeoutMethod(5000);
+        System.out.println("方法实际耗时:" + (System.currentTimeMillis() - start) + "毫秒");
+        System.out.println("结果:" + result);
+        try {
+            Thread.sleep(8000);
+            long start1 = System.currentTimeMillis();
+            String result1 = timeoutMethod(5000);
+            System.out.println("方法实际耗时:" + (System.currentTimeMillis() - start1) + "毫秒");
+            System.out.println("结果:" + result1);
+            executorService.shutdown();
+        } catch (
+                Exception e) {// TODO: handle exception
+        }
+    }
+
+    // /*** 有超时时间的方法* @param timeout* @return*/
+    private static String timeoutMethod(int timeout) {
+        String result = "默认";
+        FutureTask<String> futureTask = new FutureTask<>(new Callable<String>() {
+            @Override
+            public String call() throws Exception {
+                return unknowMethod();
+            }
+        });
+        executorService.execute(futureTask);
+        try {
+            result = futureTask.get(timeout, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException | ExecutionException |
+                 TimeoutException e) {//e.printStackTrace();
+            futureTask.cancel(true);
+            result = "默认";
+        }
+        return result;
+    }
+
+    /*** 这个方法的耗时不确定* @return*/
+    private static String unknowMethod() {
+        Random random = new Random();
+        int time = 10000;
+        System.out.println("任务将耗时: " + time + "毫秒");
+        try {
+            Thread.sleep(time);
+        } catch (Exception e) {// TODO: handle exception
+        }
+        return "获得方法执行后的返回值";
+    }
+}

+ 33 - 14
service-job/src/main/java/com/java110/job/smo/impl/DataBusInnerServiceSMOImpl.java

@@ -3,14 +3,16 @@ package com.java110.job.smo.impl;
 
 import com.alibaba.fastjson.JSONObject;
 import com.java110.core.base.smo.BaseServiceSMO;
+import com.java110.dto.data.DatabusQueueDataDto;
 import com.java110.dto.system.BusinessDatabusDto;
-import com.java110.dto.system.CustomBusinessDatabusDto;
+import com.java110.dto.data.DatabusDataDto;
 import com.java110.dto.machine.CarInoutDto;
 import com.java110.dto.machine.MachineDto;
 import com.java110.dto.fee.TempCarPayOrderDto;
 import com.java110.dto.system.Business;
 import com.java110.intf.job.IDataBusInnerServiceSMO;
 import com.java110.job.adapt.IDatabusAdapt;
+import com.java110.job.databus.DatabusDataQueue;
 import com.java110.utils.cache.DatabusCache;
 import com.java110.utils.factory.ApplicationContextFactory;
 import com.java110.vo.ResultVo;
@@ -19,6 +21,7 @@ import com.java110.core.log.LoggerFactory;
 import org.springframework.web.bind.annotation.RequestBody;
 import org.springframework.web.bind.annotation.RestController;
 
+import java.util.ArrayList;
 import java.util.List;
 
 /**
@@ -48,7 +51,7 @@ public class DataBusInnerServiceSMOImpl extends BaseServiceSMO implements IDataB
         for (Business business : businesses) {
             doExchange(business, businesses, databusDtos);
         }
-        return false;
+        return true;
     }
 
     @Override
@@ -57,6 +60,7 @@ public class DataBusInnerServiceSMOImpl extends BaseServiceSMO implements IDataB
         return databusAdaptImpl.openDoor(reqJson);
 
     }
+
     @Override
     public ResultVo closeDoor(@RequestBody JSONObject reqJson) {
         IDatabusAdapt databusAdaptImpl = ApplicationContextFactory.getBean(DEFAULT_OPEN_DOOR_PROTOCOL, IDatabusAdapt.class);
@@ -90,8 +94,6 @@ public class DataBusInnerServiceSMOImpl extends BaseServiceSMO implements IDataB
     }
 
 
-
-
     @Override
     public ResultVo restartMachine(@RequestBody JSONObject reqJson) {
         IDatabusAdapt databusAdaptImpl = ApplicationContextFactory.getBean(DEFAULT_START_MACHINE_PROTOCOL, IDatabusAdapt.class);
@@ -145,23 +147,37 @@ public class DataBusInnerServiceSMOImpl extends BaseServiceSMO implements IDataB
     /**
      * 门禁开门记录
      *
-     * @param customBusinessDatabusDto
+     * @param databusDataDto
      * @return
      */
     @Override
-    public void customExchange(@RequestBody CustomBusinessDatabusDto customBusinessDatabusDto) {
-        IDatabusAdapt databusAdaptImpl = null;
+    public boolean databusData(@RequestBody DatabusDataDto databusDataDto) {
+        // IDatabusAdapt databusAdaptImpl = null;
         List<BusinessDatabusDto> databusDtos = DatabusCache.getDatabuss();
+        Business business = null;
+        List<Business> businesses = null;
         for (BusinessDatabusDto databusDto : databusDtos) {
             try {
-                if (customBusinessDatabusDto.getBusinessTypeCd().equals(databusDto.getBusinessTypeCd())) {
-                    databusAdaptImpl = ApplicationContextFactory.getBean(databusDto.getBeanName(), IDatabusAdapt.class);
-                    databusAdaptImpl.customExchange(customBusinessDatabusDto);
+                if (!databusDataDto.getBusinessTypeCd().equals(databusDto.getBusinessTypeCd())) {
+                    continue;
                 }
+                businesses = new ArrayList<>();
+                business = new Business();
+                business.setData(databusDataDto.getData());
+                business.setBusinessTypeCd(databusDataDto.getBusinessTypeCd());
+                businesses.add(business);
+                //todo 存放队列中
+                DatabusDataQueue.addMsg(new DatabusQueueDataDto(databusDto.getBeanName(), business, businesses));
+
+//                databusAdaptImpl = ApplicationContextFactory.getBean(databusDto.getBeanName(), IDatabusAdapt.class);
+//                databusAdaptImpl.customExchange(customBusinessDatabusDto);
             } catch (Exception e) {
                 logger.error("执行databus失败", e);
             }
         }
+
+        return true;
+
     }
 
 
@@ -173,13 +189,16 @@ public class DataBusInnerServiceSMOImpl extends BaseServiceSMO implements IDataB
      * @param databusDtos databus
      */
     private void doExchange(Business business, List<Business> businesses, List<BusinessDatabusDto> databusDtos) {
-        IDatabusAdapt databusAdaptImpl = null;
+        // IDatabusAdapt databusAdaptImpl = null;
         for (BusinessDatabusDto databusDto : databusDtos) {
             try {
-                if (business.getBusinessTypeCd().equals(databusDto.getBusinessTypeCd())) {
-                    databusAdaptImpl = ApplicationContextFactory.getBean(databusDto.getBeanName(), IDatabusAdapt.class);
-                    databusAdaptImpl.execute(business, businesses);
+                if (!business.getBusinessTypeCd().equals(databusDto.getBusinessTypeCd())) {
+                    continue;
                 }
+                //todo 存放队列中
+                DatabusDataQueue.addMsg(new DatabusQueueDataDto(databusDto.getBeanName(), business, businesses));
+//                    databusAdaptImpl = ApplicationContextFactory.getBean(databusDto.getBeanName(), IDatabusAdapt.class);
+//                    databusAdaptImpl.execute(business, businesses);
             } catch (Exception e) {
                 logger.error("执行databus失败", e);
             }

+ 1 - 0
service-store/src/main/java/com/java110/store/cmd/resourceStore/AuditAllocationStoreOrderCmd.java

@@ -82,6 +82,7 @@ public class AuditAllocationStoreOrderCmd extends Cmd {
         allocationStorehouseDto.setAuditMessage(reqJson.getString("remark"));
         allocationStorehouseDto.setCurrentUserId(reqJson.getString("userId"));
         allocationStorehouseDto.setNoticeState(reqJson.getString("noticeState"));
+        allocationStorehouseDto.setNextUserId(reqJson.getString("nextUserId"));
         AllocationStorehouseApplyDto tmpAllocationStorehouseDto = new AllocationStorehouseApplyDto();
         tmpAllocationStorehouseDto.setApplyId(reqJson.getString("applyId"));
         tmpAllocationStorehouseDto.setStoreId(reqJson.getString("storeId"));

+ 5 - 3
service-store/src/main/java/com/java110/store/cmd/resourceStore/ListAllocationStoreAuditOrdersCmd.java

@@ -80,6 +80,10 @@ public class ListAllocationStoreAuditOrdersCmd extends Cmd {
             return;
         }
 
+        for(AllocationStorehouseApplyDto allocationStorehouseApplyDto: allocationStorehouseApplyDtos){
+            allocationStorehouseApplyDto.setStoreManager("N");
+        }
+
         //todo 查询调拨 中是否为管理员
         WorkflowStepStaffDto workflowStepStaffDto = new WorkflowStepStaffDto();
         workflowStepStaffDto.setFlowType(WorkflowDto.FLOW_TYPE_ALLOCATION_STOREHOUSE);
@@ -90,11 +94,9 @@ public class ListAllocationStoreAuditOrdersCmd extends Cmd {
             return;
         }
 
-
-
         for(AllocationStorehouseApplyDto allocationStorehouseApplyDto: allocationStorehouseApplyDtos){
             for(WorkflowStepStaffDto tmpWorkflowStepStaffDto : workflowStepStaffDtos) {
-                if (allocationStorehouseApplyDto.getProcessDefinitionKey().equals(tmpWorkflowStepStaffDto.getProcessDefinitionKey())){
+                if (allocationStorehouseApplyDto.getProcessDefinitionKey().equals("java110_"+tmpWorkflowStepStaffDto.getFlowId())){
                     allocationStorehouseApplyDto.setStoreManager("Y");
                 }
             }