Procházet zdrojové kódy

定时任务开发完成

java110 před 6 roky
rodič
revize
0698cd695e

+ 10 - 0
java110-bean/src/main/java/com/java110/dto/task/TaskDto.java

@@ -27,6 +27,8 @@ public class TaskDto extends PageDto implements Serializable {
 
     private List<TaskAttrDto> taskAttr;
 
+    private TaskTemplateDto taskTemplateDto;
+
 
     private Date createTime;
 
@@ -114,4 +116,12 @@ public class TaskDto extends PageDto implements Serializable {
     public void setTaskAttr(List<TaskAttrDto> taskAttr) {
         this.taskAttr = taskAttr;
     }
+
+    public TaskTemplateDto getTaskTemplateDto() {
+        return taskTemplateDto;
+    }
+
+    public void setTaskTemplateDto(TaskTemplateDto taskTemplateDto) {
+        this.taskTemplateDto = taskTemplateDto;
+    }
 }

+ 17 - 0
java110-core/src/main/java/com/java110/core/smo/task/ITaskInnerServiceSMO.java

@@ -9,6 +9,7 @@ import org.springframework.cloud.openfeign.FeignClient;
 import org.springframework.web.bind.annotation.RequestBody;
 import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RequestMethod;
+import org.springframework.web.bind.annotation.RequestParam;
 
 import java.util.List;
 
@@ -82,4 +83,20 @@ public interface ITaskInnerServiceSMO {
      */
     @RequestMapping(value = "/queryTaskTemplateSpec", method = RequestMethod.POST)
      List<TaskTemplateSpecDto> queryTaskTemplateSpec(@RequestBody TaskTemplateSpecDto taskTemplateSpecDto);
+
+    /**
+     * 启动任务
+     * @param taskDto
+     * @return
+     */
+    @RequestMapping(value = "/startTask", method = RequestMethod.POST)
+    int startTask(@RequestBody TaskDto taskDto);
+
+    /**
+     * 停止任务
+     * @param taskDto
+     * @return
+     */
+    @RequestMapping(value = "/stopTask", method = RequestMethod.POST)
+    int stopTask(@RequestBody TaskDto taskDto);
 }

+ 7 - 8
service-job/src/main/java/com/java110/job/Api/HcFtpToFileSystemConfigAction.java

@@ -4,8 +4,7 @@ import com.alibaba.fastjson.JSONArray;
 import com.alibaba.fastjson.JSONObject;
 import com.java110.core.factory.GenerateCodeFactory;
 import com.java110.job.dao.IHcFtpFileDAO;
-import com.java110.job.smo.DownloadFileFromFtpToTable;
-import com.java110.job.task.HcFtpToFileSystemJob;
+import com.java110.job.task.TaskSystemJob;
 import org.apache.commons.validator.GenericValidator;
 import org.apache.commons.validator.util.ValidatorUtils;
 import org.quartz.*;
@@ -502,18 +501,18 @@ public class HcFtpToFileSystemConfigAction {
 				String triggerName = triggerNames + taskId;
 
 				//设置任务名称
-				JobKey jobKey = new JobKey(jobName,HcFtpToFileSystemJob.JOB_GROUP_NAME);
+				JobKey jobKey = new JobKey(jobName, TaskSystemJob.JOB_GROUP_NAME);
 				JobDetail jobDetail = scheduler.getJobDetail(jobKey);
 
 				if (jobDetail == null) {
 					// 任务名称
 					String taskCfgName = (String) doFtpItem.get("TASKNAME");
 					//构建job信息
-					JobDetail warnJob = JobBuilder.newJob(HcFtpToFileSystemJob.class).withIdentity(jobName, HcFtpToFileSystemJob.JOB_GROUP_NAME).withDescription("任务启动").build();
+					JobDetail warnJob = JobBuilder.newJob(TaskSystemJob.class).withIdentity(jobName, TaskSystemJob.JOB_GROUP_NAME).withDescription("任务启动").build();
 
-					warnJob.getJobDataMap().put(HcFtpToFileSystemJob.JOB_DATA_CONFIG_NAME, taskCfgName);
+					warnJob.getJobDataMap().put(TaskSystemJob.JOB_DATA_CONFIG_NAME, taskCfgName);
 
-					warnJob.getJobDataMap().put(HcFtpToFileSystemJob.JOB_DATA_TASK_ID, taskId);
+					warnJob.getJobDataMap().put(TaskSystemJob.JOB_DATA_TASK_ID, taskId);
 
 					// 触发时间点
 					CronTrigger warnTrigger = TriggerBuilder.newTrigger().withIdentity(triggerName, triggerName+"_group").withSchedule(cronScheduleBuilder).build();
@@ -627,13 +626,13 @@ public class HcFtpToFileSystemConfigAction {
 
 				String triggerName = prefixJobName + taskId;
 
-				TriggerKey triggerKey = TriggerKey.triggerKey(jobName, HcFtpToFileSystemJob.JOB_GROUP_NAME);
+				TriggerKey triggerKey = TriggerKey.triggerKey(jobName, TaskSystemJob.JOB_GROUP_NAME);
 				// 停止触发器
 				scheduler.pauseTrigger(triggerKey);
 				// 移除触发器
 				scheduler.unscheduleJob(triggerKey);
 
-				JobKey jobKey = new JobKey(jobName, HcFtpToFileSystemJob.JOB_GROUP_NAME);
+				JobKey jobKey = new JobKey(jobName, TaskSystemJob.JOB_GROUP_NAME);
 				// 删除任务
 				scheduler.deleteJob(jobKey);
 				// 修改数据状态,将任务数据状态改为运行状态

+ 1 - 0
service-job/src/main/java/com/java110/job/smo/DownloadFileFromFtpToTable.java

@@ -1,5 +1,6 @@
 package com.java110.job.smo;
 
+import com.java110.job.task.TaskSystemQuartz;
 import com.java110.job.util.FTPClientTemplate;
 import org.apache.commons.net.ftp.FTP;
 import org.apache.commons.net.ftp.FTPClient;

+ 347 - 347
service-job/src/main/java/com/java110/job/smo/HcFtpToFileSystemQuartz.java

@@ -16,356 +16,356 @@ import java.text.SimpleDateFormat;
 import java.util.*;
 
 /**
- * 
+ *
  * @author
- * 
+ *
  */
 public abstract class HcFtpToFileSystemQuartz{
 
-	protected static final Logger logger = LoggerFactory.getLogger(HcFtpToFileSystemQuartz.class);
-	@Autowired
-	private IHcFtpFileDAO iHcFtpFileDAO;
-	@Autowired
-	private IHcFtpFileSMO iHcFtpFileSMO;
-
-	/*private IPrvncDumpSMO prvncDumpSMO;*/
-	// 运行状态,R:正在执行 T:等待运行 TD1:文件下载失败 TD2:文件内容保存失败 TU1:数据文件生成失败 TU2:数据文件上传失败
-	private static final String TASK_STATE_R = "R";// 正在运行
-	private static final String TASK_STATE_T = "T";// 等待运行
-	private static final String TASK_STATE_E1 = "E1";// 执行事前过程失败
-	private static final String TASK_STATE_E2 = "E2";// 处理数据失败
-	private static final String TASK_STATE_E3 = "E3";// 执行事后过程失败
-
-	public void initTask() {
-
-		// 将所有的任务状态改为等待运行状态
-		Map paramIn = new HashMap();
-		paramIn.put("oldRunState", "R");
-		paramIn.put("runState", "T");
-		int updateFtpItemRunStateFlag = iHcFtpFileDAO.updateFtpItemRunState(paramIn);
-		if (updateFtpItemRunStateFlag < 1) {
-			logger.error("--【PrvncFtpToFileSystemQuartz.initTask】,没有需要更新的内容(没有下载一半后停止应用的情况)", paramIn);
-		}
-	}
-
-	/**
-	 * 启动任务
-	 * 
-	 * @param ftpItemConfigInfo
-	 */
-	public  void startFtpTask(Map ftpItemConfigInfo) throws Exception {
-
-		// 这么做是为了,单线程调用,防止多线程导致数据重复处理
-		if (!ftpItemConfigInfo.containsKey("RUN_STATE") || "R".equals(ftpItemConfigInfo.get("RUN_STATE"))) {
-			return;
-		}
-
-		long taskId = Long.parseLong(ftpItemConfigInfo.get("TASKID").toString());
-
-		if (logger.isDebugEnabled()) {
-			logger.debug("---【PrvncFtpToFileSystemQuartz.startFtpTask】:任务【" + taskId + "】开始运行!", taskId);
-		}
-
-		// 保存任务执行主要日志信息
-		//获取LOGID 默认生成规则为tadkid去掉年月日之前的值+66
-		String id = ftpItemConfigInfo.get("TASKID").toString();
-		id = id.substring(10,id.length());
-		long logid = Long.parseLong (id+"22");
-		ftpItemConfigInfo.put("logid",logid);
-		long taskLogID = insertTaskInfo(ftpItemConfigInfo);
-
-		ftpItemConfigInfo.put("logid", taskLogID);
-		ftpItemConfigInfo.put("taskid", taskId);
-		ftpItemConfigInfo.put("threadrunstate", TASK_STATE_R);
-		ftpItemConfigInfo.put("tnum", 1);
-		// 修改任务状态为正在执行状态
-		updateTaskState(taskId, TASK_STATE_R);
-		// 方法调用是否成功,S成功(默认),E表示失败(在方法中失败时,需要修改)
-		ftpItemConfigInfo.put("PRE_METHOD_FLAG", "S");
-		try {
-			// 1.0空方法,让子类去实现
-			prepare(ftpItemConfigInfo);
-
-			// 2.0调用事前过程
-			if (ftpItemConfigInfo.containsKey("PREFLAG") && "0".equals(ftpItemConfigInfo.get("PREFLAG"))) {
-				callPreFunction(ftpItemConfigInfo);
-			}
-
-			if (ftpItemConfigInfo.containsKey("PRE_METHOD_FLAG") && "E".equals(ftpItemConfigInfo.get("PRE_METHOD_FLAG"))) {
-				// 此时调用事前过程失败,直接返回 查询标识为E,更新日志
-				udpateTaskLog(ftpItemConfigInfo);
-				updateTaskState(taskId, TASK_STATE_E1);
-				return;
-			}
-
-			// 3.0核心业务处理逻辑,需要子类去实现
-			process(ftpItemConfigInfo);
-
-			if (ftpItemConfigInfo.containsKey("PRE_METHOD_FLAG") && "E".equals(ftpItemConfigInfo.get("PRE_METHOD_FLAG"))) {
-				// 程序处理失败,直接返回
-				ftpItemConfigInfo.put("threadrunstate", TASK_STATE_E2);
-				updateTaskState(taskId, TASK_STATE_E2);
-				udpateTaskLog(ftpItemConfigInfo);
-				saveTaskLogDetail(ftpItemConfigInfo);// 保存detail
-				return;
-			}
-			// 记录详细日志
-			ftpItemConfigInfo.put("threadrunstate", "T");
-			saveTaskLogDetail(ftpItemConfigInfo);
-
-			// 4.0调用事后过程
-			if (ftpItemConfigInfo.containsKey("AFTERFLAG") && "0".equals(ftpItemConfigInfo.get("AFTERFLAG"))) {
-				callAfterFunction(ftpItemConfigInfo);
-			}
-			if (ftpItemConfigInfo.containsKey("PRE_METHOD_FLAG") && "E".equals(ftpItemConfigInfo.get("PRE_METHOD_FLAG"))) {
-				// 此时调用事前过程失败,直接返回 查询标识为E,更新日志
-				udpateTaskLog(ftpItemConfigInfo);
-				updateTaskState(taskId, TASK_STATE_E3);
-				return;
-			}
-			// 5.0空方法,让子类去实现
-			post(ftpItemConfigInfo);
-		} catch (Exception ex) {
-			ftpItemConfigInfo.put("threadrunstate", TASK_STATE_E2);
-			udpateTaskLog(ftpItemConfigInfo);
-			ftpItemConfigInfo.put("threadrunstate", TASK_STATE_E2);
-			ftpItemConfigInfo.put("remark", ex);
-			saveTaskLogDetail(ftpItemConfigInfo);
-			updateTaskState(taskId, TASK_STATE_E2);
-			// 接续向外抛出去
-			logger.error("处理出现问题:", ex);
-			return;
-		}
-
-		// 修改任务状态为执行完毕状态
-		updateTaskState(taskId, TASK_STATE_T);
-		ftpItemConfigInfo.put("threadrunstate", TASK_STATE_T);
-		udpateTaskLog(ftpItemConfigInfo);
-
-		// 发送任务运行结果通知短信给相关人员 **暂时不调用短信
-		if (!TASK_STATE_T.equals(ftpItemConfigInfo.get("RUN_STATE").toString())) {
-			/*sendErrLogPhoneMsg(ftpItemConfigInfo, taskLogID);*/
-		}
-
-	}
-
-	// 如果有事前存过需要调用,则先调用存过
-	public void callPreFunction(Map taskInfo) {
-
-		if (taskInfo.containsKey("PREFUNCTION") && taskInfo.get("PREFUNCTION") != null && !"".equals(taskInfo.get("PREFUNCTION"))) {
-			try {
-				iHcFtpFileSMO.saveDbFunction(taskInfo.get("PREFUNCTION").toString());
-				taskInfo.put("threadrunstate", "T");
-				taskInfo.put("remark", "调用事前存过结束");
-				saveTaskLogDetail(taskInfo);
-			} catch (Exception ex) {
-				logger.error("调用事前存过失败:", ex);
-				taskInfo.put("threadrunstate", "E1");
-				taskInfo.put("remark", "调用事前存过失败" + ex);
-				taskInfo.put("PRE_METHOD_FLAG", "E");
-				saveTaskLogDetail(taskInfo);
-			}
-		}
-	}
-
-	/**
-	 * 主要业务处理(上传下载),让子类去实现
-	 * 
-	 * @param ftpItemConfigInfo
-	 */
-	protected abstract void process(Map ftpItemConfigInfo) throws Exception;
-
-	// 如果有事后存过需要调用,则调用存过
-	public void callAfterFunction(Map taskInfo) {
-
-		if (taskInfo.containsKey("AFTERFUNCTION") && taskInfo.get("AFTERFUNCTION") != null && !"".equals(taskInfo.get("AFTERFUNCTION"))) {
-			try {
-				taskInfo.put("functionname", taskInfo.get("AFTERFUNCTION"));
-				// taskInfo 参数param需要在process方法中需要自己写入
-				iHcFtpFileSMO.saveDbFunctionWithParam(taskInfo);
-				taskInfo.put("threadrunstate", "T");
-				taskInfo.put("remark", "调用事后存过结束");
-				saveTaskLogDetail(taskInfo);
-			} catch (Exception ex) {
-				ex.printStackTrace();
-				taskInfo.put("threadrunstate", "E3");
-				taskInfo.put("remark", "调用事后存过失败" + ex);
-				taskInfo.put("PRE_METHOD_FLAG", "E");
-				saveTaskLogDetail(taskInfo);
-			}
-		}
-	}
-
-	/**
-	 * 修改任务状态
-	 * 
-	 * @return
-	 */
-	private void updateTaskState(long taskId, String state) {
-		Map info = new HashMap();
-
-		info.put("taskId", taskId);
-		info.put("runState", state);
-		int updateFtpItemFlag = iHcFtpFileDAO.updateFtpItemByTaskId(info);
-		// 这里只是后台提示,不进行日志保存
-		if (updateFtpItemFlag < 1) {
-			logger.error("---【PrvncFtpToFileSystemQuartz.updateTaskState】修改任务【" + taskId + "】的状态失败", info);
-		}
-	}
-
-	/**
-	 * 修改任务执行日志的状态
-	 */
-	private void udpateTaskLog(Map taskInfo) {
-		FtpTaskLog loginfo = new FtpTaskLog();
-		loginfo.setLogid(Long.valueOf(taskInfo.get("logid").toString()));
-		loginfo.setState(taskInfo.get("threadrunstate").toString());
-		iHcFtpFileSMO.updateTaskRunLog(loginfo);
-	}
-
-	/**
-	 * 保存任务执行的详细日志
-	 */
-	protected void saveTaskLogDetail(Map taskInfo) {
-		FtpTaskLogDetail logdetail = new FtpTaskLogDetail();
-		logdetail.setId(Long.valueOf(taskInfo.get("logid").toString()+"66"));
-		logdetail.setLogid(Long.valueOf(taskInfo.get("logid").toString()));
-		logdetail.setTaskid(Long.valueOf(taskInfo.get("taskid").toString()));
-		logdetail.setState((String) taskInfo.get("threadrunstate"));
-		logdetail.setTnum(Integer.valueOf(taskInfo.get("tnum").toString()));
-		if (taskInfo.get("begin") != null) {
-			logdetail.setBegin(Long.valueOf(taskInfo.get("begin").toString()));
-		}
-		if (taskInfo.get("end") != null) {
-			logdetail.setEnd(Long.valueOf(taskInfo.get("end").toString()));
-		}
-		if (taskInfo.get("havedown") != null) {
-			logdetail.setHavedown(Long.valueOf(taskInfo.get("havedown").toString()));
-		}
-		logdetail.setRemark(taskInfo.get("remark") == null ? "" : (taskInfo.get("remark").toString().trim().length() > 2000 ? taskInfo.get("remark").toString().trim().substring(0,
-				1600) : taskInfo.get("remark").toString().trim()));
-		logdetail.setData(taskInfo.get("data") == null ? "" : taskInfo.get("data").toString());
-		logdetail.setServerfilename(taskInfo.get("serverfilename") == null ? "" : taskInfo.get("serverfilename").toString());
-		logdetail.setLocalfilename(taskInfo.get("localfilename") == null ? "" : taskInfo.get("localfilename").toString());
-		int logdetailid = iHcFtpFileSMO.saveTaskRunDetailLog(logdetail);
-		taskInfo.put("logdetailid", logdetailid);
-	}
-
-	// /**
-	// * 修改任务执行的详细日志的状态
-	// */
-	// private void updateTaskLogDetail(Map taskInfo){
-	// FtpTaskLogDetail logdetail=new FtpTaskLogDetail();//
-	// logdetail.setId(Long.valueOf(taskInfo.get("logdetailid").toString()));
-	// logdetail.setState(taskInfo.get("threadrunstate").toString());
-	// logdetail.setRemark((String)taskInfo.get("remark"));
-	// logdetail.setData((String)taskInfo.get("data"));
-	// if(taskInfo.get("downedlength")!=null)
-	// logdetail.setHavedown(Long.valueOf(taskInfo.get("downedlength").toString()));
-	// prvncFtpFileSMO.updateTaskRunDetailLog(logdetail);
-	// }
-
-	/**
-	 * 生成任务执行日志
-	 */
-	private long insertTaskInfo(Map taskInfo) {
-		FtpTaskLog loginfo = new FtpTaskLog();
-		loginfo.setTaskid(Long.valueOf(taskInfo.get("TASKID").toString()));
-		loginfo.setState("R");
-		loginfo.setServerfilename("");// taskInfo.get("serverfilename").toString()
-		loginfo.setLocalfilename("");// taskInfo.get("localfilename").toString()
-		loginfo.setUord(taskInfo.get("U_OR_D").toString());
-		return iHcFtpFileSMO.saveTaskRunLog(loginfo);
-	}
-
-	/**
-	 * 如果任务运行有异常,则发送警告短信给配置的手机号码
-	 */
-	private void sendErrLogPhoneMsg(Map taskInfo, long taskLogID) {
-		Map msginfo = new HashMap();
-		String phone = (String) taskInfo.get("errphone");
-		if (phone != null && !"".equals(phone)) {
-			String[] phonelist = phone.split(",");
-			for (int i = 0; i < phonelist.length; i++) {
-				msginfo.put("taskid", taskInfo.get("taskid"));
-				msginfo.put("phone", phonelist[i]);
-				msginfo.put("msg", "通用FTP数据文件传接任务:" + (String) taskInfo.get("taskname") + "运行提示");
-
-				DateFormat df = new SimpleDateFormat("yyyy-mm-dd HH:mm:ss");
-				String detail = "任务已于" + df.format(new Date()) + "运行完毕。运行过程中出现异常,详情请登录系统查看!";
-				msginfo.put("detail", detail);
-				/*prvncDumpSMO.saveTaskErrInfoPhoneMsg(msginfo);*/
-			}
-		}
-	}
-
-	/**
-	 * 处理文件名,校验文件名是中是否存在****(4个),表示通配符,如果不存在就是确定唯一文件名
-	 * 文件名支持日期型的如CRM_########001.txt 程序处理后是 CRM_20170105001.txt 文件名支持sql 语句生成的
-	 * 文件名支持通配符的如863_****.txt 程序下载所有以863_开头的文件 863_****001.txt
-	 * 以863_开头,以001结尾,****001.txt 以001结尾的
-	 * 
-	 * @param fileName
-	 * @return
-	 */
-	protected List<String> dealFileName(String fileName) {
-		// TODO Auto-generated method stub
-		List<String> results = new ArrayList<String>();
-		String result = "";
-		// 文件中使用的日期
-		if (StringUtils.contains(fileName, RuleDomain.REPLAY_TYPE_F)) {
-			result = StringUtil.replace(fileName, RuleDomain.REPLAY_TYPE_F, DateUtil.getFormatTimeString(new Date(), "yyyyMMddHHmm"));
-		} else if (StringUtils.contains(fileName, RuleDomain.REPLAY_TYPE_E)) {
-			result = StringUtil.replace(fileName, RuleDomain.REPLAY_TYPE_E, DateUtil.getFormatTimeString(new Date(), "yyyyMMddHH"));
-		} else if (StringUtils.contains(fileName, RuleDomain.REPLAY_TYPE_A)) {
-			result = StringUtil.replace(fileName, RuleDomain.REPLAY_TYPE_A, DateUtil.getFormatTimeString(new Date(), "yyyyMMdd"));
-		} else if (StringUtils.contains(fileName == null ? "" : fileName.toLowerCase(), RuleDomain.REPLAY_TYPE_SQL)) {
-			// 后期改造,文件名如果配置的是sql的话,以sql查询文件名
-			List<String> fileNames = this.getPrvncFtpFileDAO().execConfigSql(fileName);
-			// if (fileNames != null && fileNames.size() > 0) {
-			// result = fileNames.get(0);
-			// }
-			return fileNames;
-		} else {
-			result = fileName;
-		}
-		results.add(result);
-		return results;
-	}
-
-	/**
-	 * 空方法,如果在事前过程处理前,还需要做一定的处理,需要子类重写这个方法,实现业务逻辑
-	 * 
-	 * @param ftpItemConfigInfo
-	 */
-	protected void prepare(Map ftpItemConfigInfo) {
-
-	}
-
-	/**
-	 * 空方法,如果在事后过程处理完后,还需要做一定的处理,需要子类重写这个方法,实现业务逻辑
-	 * 
-	 * @param ftpItemConfigInfo
-	 */
-	protected void post(Map ftpItemConfigInfo) {
-
-	}
-
-	public IHcFtpFileDAO getPrvncFtpFileDAO() {
-		return iHcFtpFileDAO;
-	}
-
-	public void setPrvncFtpFileDAO(IHcFtpFileDAO prvncFtpFileDAO) {
-		this.iHcFtpFileDAO = prvncFtpFileDAO;
-	}
-
-	public IHcFtpFileSMO getPrvncFtpFileSMO() {
-		return iHcFtpFileSMO;
-	}
-
-	public void setPrvncFtpFileSMO(IHcFtpFileSMO prvncFtpFileSMO) {
-		this.iHcFtpFileSMO = prvncFtpFileSMO;
-	}
+    protected static final Logger logger = LoggerFactory.getLogger(HcFtpToFileSystemQuartz.class);
+    @Autowired
+    private IHcFtpFileDAO iHcFtpFileDAO;
+    @Autowired
+    private IHcFtpFileSMO iHcFtpFileSMO;
+
+    /*private IPrvncDumpSMO prvncDumpSMO;*/
+    // 运行状态,R:正在执行 T:等待运行 TD1:文件下载失败 TD2:文件内容保存失败 TU1:数据文件生成失败 TU2:数据文件上传失败
+    private static final String TASK_STATE_R = "R";// 正在运行
+    private static final String TASK_STATE_T = "T";// 等待运行
+    private static final String TASK_STATE_E1 = "E1";// 执行事前过程失败
+    private static final String TASK_STATE_E2 = "E2";// 处理数据失败
+    private static final String TASK_STATE_E3 = "E3";// 执行事后过程失败
+
+    public void initTask() {
+
+        // 将所有的任务状态改为等待运行状态
+        Map paramIn = new HashMap();
+        paramIn.put("oldRunState", "R");
+        paramIn.put("runState", "T");
+        int updateFtpItemRunStateFlag = iHcFtpFileDAO.updateFtpItemRunState(paramIn);
+        if (updateFtpItemRunStateFlag < 1) {
+            logger.error("--【PrvncFtpToFileSystemQuartz.initTask】,没有需要更新的内容(没有下载一半后停止应用的情况)", paramIn);
+        }
+    }
+
+    /**
+     * 启动任务
+     *
+     * @param ftpItemConfigInfo
+     */
+    public  void startFtpTask(Map ftpItemConfigInfo) throws Exception {
+
+        // 这么做是为了,单线程调用,防止多线程导致数据重复处理
+        if (!ftpItemConfigInfo.containsKey("RUN_STATE") || "R".equals(ftpItemConfigInfo.get("RUN_STATE"))) {
+            return;
+        }
+
+        long taskId = Long.parseLong(ftpItemConfigInfo.get("TASKID").toString());
+
+        if (logger.isDebugEnabled()) {
+            logger.debug("---【PrvncFtpToFileSystemQuartz.startFtpTask】:任务【" + taskId + "】开始运行!", taskId);
+        }
+
+        // 保存任务执行主要日志信息
+        //获取LOGID 默认生成规则为tadkid去掉年月日之前的值+66
+        String id = ftpItemConfigInfo.get("TASKID").toString();
+        id = id.substring(10,id.length());
+        long logid = Long.parseLong (id+"22");
+        ftpItemConfigInfo.put("logid",logid);
+        long taskLogID = insertTaskInfo(ftpItemConfigInfo);
+
+        ftpItemConfigInfo.put("logid", taskLogID);
+        ftpItemConfigInfo.put("taskid", taskId);
+        ftpItemConfigInfo.put("threadrunstate", TASK_STATE_R);
+        ftpItemConfigInfo.put("tnum", 1);
+        // 修改任务状态为正在执行状态
+        updateTaskState(taskId, TASK_STATE_R);
+        // 方法调用是否成功,S成功(默认),E表示失败(在方法中失败时,需要修改)
+        ftpItemConfigInfo.put("PRE_METHOD_FLAG", "S");
+        try {
+            // 1.0空方法,让子类去实现
+            prepare(ftpItemConfigInfo);
+
+            // 2.0调用事前过程
+            if (ftpItemConfigInfo.containsKey("PREFLAG") && "0".equals(ftpItemConfigInfo.get("PREFLAG"))) {
+                callPreFunction(ftpItemConfigInfo);
+            }
+
+            if (ftpItemConfigInfo.containsKey("PRE_METHOD_FLAG") && "E".equals(ftpItemConfigInfo.get("PRE_METHOD_FLAG"))) {
+                // 此时调用事前过程失败,直接返回 查询标识为E,更新日志
+                udpateTaskLog(ftpItemConfigInfo);
+                updateTaskState(taskId, TASK_STATE_E1);
+                return;
+            }
+
+            // 3.0核心业务处理逻辑,需要子类去实现
+            process(ftpItemConfigInfo);
+
+            if (ftpItemConfigInfo.containsKey("PRE_METHOD_FLAG") && "E".equals(ftpItemConfigInfo.get("PRE_METHOD_FLAG"))) {
+                // 程序处理失败,直接返回
+                ftpItemConfigInfo.put("threadrunstate", TASK_STATE_E2);
+                updateTaskState(taskId, TASK_STATE_E2);
+                udpateTaskLog(ftpItemConfigInfo);
+                saveTaskLogDetail(ftpItemConfigInfo);// 保存detail
+                return;
+            }
+            // 记录详细日志
+            ftpItemConfigInfo.put("threadrunstate", "T");
+            saveTaskLogDetail(ftpItemConfigInfo);
+
+            // 4.0调用事后过程
+            if (ftpItemConfigInfo.containsKey("AFTERFLAG") && "0".equals(ftpItemConfigInfo.get("AFTERFLAG"))) {
+                callAfterFunction(ftpItemConfigInfo);
+            }
+            if (ftpItemConfigInfo.containsKey("PRE_METHOD_FLAG") && "E".equals(ftpItemConfigInfo.get("PRE_METHOD_FLAG"))) {
+                // 此时调用事前过程失败,直接返回 查询标识为E,更新日志
+                udpateTaskLog(ftpItemConfigInfo);
+                updateTaskState(taskId, TASK_STATE_E3);
+                return;
+            }
+            // 5.0空方法,让子类去实现
+            post(ftpItemConfigInfo);
+        } catch (Exception ex) {
+            ftpItemConfigInfo.put("threadrunstate", TASK_STATE_E2);
+            udpateTaskLog(ftpItemConfigInfo);
+            ftpItemConfigInfo.put("threadrunstate", TASK_STATE_E2);
+            ftpItemConfigInfo.put("remark", ex);
+            saveTaskLogDetail(ftpItemConfigInfo);
+            updateTaskState(taskId, TASK_STATE_E2);
+            // 接续向外抛出去
+            logger.error("处理出现问题:", ex);
+            return;
+        }
+
+        // 修改任务状态为执行完毕状态
+        updateTaskState(taskId, TASK_STATE_T);
+        ftpItemConfigInfo.put("threadrunstate", TASK_STATE_T);
+        udpateTaskLog(ftpItemConfigInfo);
+
+        // 发送任务运行结果通知短信给相关人员 **暂时不调用短信
+        if (!TASK_STATE_T.equals(ftpItemConfigInfo.get("RUN_STATE").toString())) {
+            /*sendErrLogPhoneMsg(ftpItemConfigInfo, taskLogID);*/
+        }
+
+    }
+
+    // 如果有事前存过需要调用,则先调用存过
+    public void callPreFunction(Map taskInfo) {
+
+        if (taskInfo.containsKey("PREFUNCTION") && taskInfo.get("PREFUNCTION") != null && !"".equals(taskInfo.get("PREFUNCTION"))) {
+            try {
+                iHcFtpFileSMO.saveDbFunction(taskInfo.get("PREFUNCTION").toString());
+                taskInfo.put("threadrunstate", "T");
+                taskInfo.put("remark", "调用事前存过结束");
+                saveTaskLogDetail(taskInfo);
+            } catch (Exception ex) {
+                logger.error("调用事前存过失败:", ex);
+                taskInfo.put("threadrunstate", "E1");
+                taskInfo.put("remark", "调用事前存过失败" + ex);
+                taskInfo.put("PRE_METHOD_FLAG", "E");
+                saveTaskLogDetail(taskInfo);
+            }
+        }
+    }
+
+    /**
+     * 主要业务处理(上传下载),让子类去实现
+     *
+     * @param ftpItemConfigInfo
+     */
+    protected abstract void process(Map ftpItemConfigInfo) throws Exception;
+
+    // 如果有事后存过需要调用,则调用存过
+    public void callAfterFunction(Map taskInfo) {
+
+        if (taskInfo.containsKey("AFTERFUNCTION") && taskInfo.get("AFTERFUNCTION") != null && !"".equals(taskInfo.get("AFTERFUNCTION"))) {
+            try {
+                taskInfo.put("functionname", taskInfo.get("AFTERFUNCTION"));
+                // taskInfo 参数param需要在process方法中需要自己写入
+                iHcFtpFileSMO.saveDbFunctionWithParam(taskInfo);
+                taskInfo.put("threadrunstate", "T");
+                taskInfo.put("remark", "调用事后存过结束");
+                saveTaskLogDetail(taskInfo);
+            } catch (Exception ex) {
+                ex.printStackTrace();
+                taskInfo.put("threadrunstate", "E3");
+                taskInfo.put("remark", "调用事后存过失败" + ex);
+                taskInfo.put("PRE_METHOD_FLAG", "E");
+                saveTaskLogDetail(taskInfo);
+            }
+        }
+    }
+
+    /**
+     * 修改任务状态
+     *
+     * @return
+     */
+    private void updateTaskState(long taskId, String state) {
+        Map info = new HashMap();
+
+        info.put("taskId", taskId);
+        info.put("runState", state);
+        int updateFtpItemFlag = iHcFtpFileDAO.updateFtpItemByTaskId(info);
+        // 这里只是后台提示,不进行日志保存
+        if (updateFtpItemFlag < 1) {
+            logger.error("---【PrvncFtpToFileSystemQuartz.updateTaskState】修改任务【" + taskId + "】的状态失败", info);
+        }
+    }
+
+    /**
+     * 修改任务执行日志的状态
+     */
+    private void udpateTaskLog(Map taskInfo) {
+        FtpTaskLog loginfo = new FtpTaskLog();
+        loginfo.setLogid(Long.valueOf(taskInfo.get("logid").toString()));
+        loginfo.setState(taskInfo.get("threadrunstate").toString());
+        iHcFtpFileSMO.updateTaskRunLog(loginfo);
+    }
+
+    /**
+     * 保存任务执行的详细日志
+     */
+    protected void saveTaskLogDetail(Map taskInfo) {
+        FtpTaskLogDetail logdetail = new FtpTaskLogDetail();
+        logdetail.setId(Long.valueOf(taskInfo.get("logid").toString()+"66"));
+        logdetail.setLogid(Long.valueOf(taskInfo.get("logid").toString()));
+        logdetail.setTaskid(Long.valueOf(taskInfo.get("taskid").toString()));
+        logdetail.setState((String) taskInfo.get("threadrunstate"));
+        logdetail.setTnum(Integer.valueOf(taskInfo.get("tnum").toString()));
+        if (taskInfo.get("begin") != null) {
+            logdetail.setBegin(Long.valueOf(taskInfo.get("begin").toString()));
+        }
+        if (taskInfo.get("end") != null) {
+            logdetail.setEnd(Long.valueOf(taskInfo.get("end").toString()));
+        }
+        if (taskInfo.get("havedown") != null) {
+            logdetail.setHavedown(Long.valueOf(taskInfo.get("havedown").toString()));
+        }
+        logdetail.setRemark(taskInfo.get("remark") == null ? "" : (taskInfo.get("remark").toString().trim().length() > 2000 ? taskInfo.get("remark").toString().trim().substring(0,
+                1600) : taskInfo.get("remark").toString().trim()));
+        logdetail.setData(taskInfo.get("data") == null ? "" : taskInfo.get("data").toString());
+        logdetail.setServerfilename(taskInfo.get("serverfilename") == null ? "" : taskInfo.get("serverfilename").toString());
+        logdetail.setLocalfilename(taskInfo.get("localfilename") == null ? "" : taskInfo.get("localfilename").toString());
+        int logdetailid = iHcFtpFileSMO.saveTaskRunDetailLog(logdetail);
+        taskInfo.put("logdetailid", logdetailid);
+    }
+
+    // /**
+    // * 修改任务执行的详细日志的状态
+    // */
+    // private void updateTaskLogDetail(Map taskInfo){
+    // FtpTaskLogDetail logdetail=new FtpTaskLogDetail();//
+    // logdetail.setId(Long.valueOf(taskInfo.get("logdetailid").toString()));
+    // logdetail.setState(taskInfo.get("threadrunstate").toString());
+    // logdetail.setRemark((String)taskInfo.get("remark"));
+    // logdetail.setData((String)taskInfo.get("data"));
+    // if(taskInfo.get("downedlength")!=null)
+    // logdetail.setHavedown(Long.valueOf(taskInfo.get("downedlength").toString()));
+    // prvncFtpFileSMO.updateTaskRunDetailLog(logdetail);
+    // }
+
+    /**
+     * 生成任务执行日志
+     */
+    private long insertTaskInfo(Map taskInfo) {
+        FtpTaskLog loginfo = new FtpTaskLog();
+        loginfo.setTaskid(Long.valueOf(taskInfo.get("TASKID").toString()));
+        loginfo.setState("R");
+        loginfo.setServerfilename("");// taskInfo.get("serverfilename").toString()
+        loginfo.setLocalfilename("");// taskInfo.get("localfilename").toString()
+        loginfo.setUord(taskInfo.get("U_OR_D").toString());
+        return iHcFtpFileSMO.saveTaskRunLog(loginfo);
+    }
+
+    /**
+     * 如果任务运行有异常,则发送警告短信给配置的手机号码
+     */
+    private void sendErrLogPhoneMsg(Map taskInfo, long taskLogID) {
+        Map msginfo = new HashMap();
+        String phone = (String) taskInfo.get("errphone");
+        if (phone != null && !"".equals(phone)) {
+            String[] phonelist = phone.split(",");
+            for (int i = 0; i < phonelist.length; i++) {
+                msginfo.put("taskid", taskInfo.get("taskid"));
+                msginfo.put("phone", phonelist[i]);
+                msginfo.put("msg", "通用FTP数据文件传接任务:" + (String) taskInfo.get("taskname") + "运行提示");
+
+                DateFormat df = new SimpleDateFormat("yyyy-mm-dd HH:mm:ss");
+                String detail = "任务已于" + df.format(new Date()) + "运行完毕。运行过程中出现异常,详情请登录系统查看!";
+                msginfo.put("detail", detail);
+                /*prvncDumpSMO.saveTaskErrInfoPhoneMsg(msginfo);*/
+            }
+        }
+    }
+
+    /**
+     * 处理文件名,校验文件名是中是否存在****(4个),表示通配符,如果不存在就是确定唯一文件名
+     * 文件名支持日期型的如CRM_########001.txt 程序处理后是 CRM_20170105001.txt 文件名支持sql 语句生成的
+     * 文件名支持通配符的如863_****.txt 程序下载所有以863_开头的文件 863_****001.txt
+     * 以863_开头,以001结尾,****001.txt 以001结尾的
+     *
+     * @param fileName
+     * @return
+     */
+    protected List<String> dealFileName(String fileName) {
+        // TODO Auto-generated method stub
+        List<String> results = new ArrayList<String>();
+        String result = "";
+        // 文件中使用的日期
+        if (StringUtils.contains(fileName, RuleDomain.REPLAY_TYPE_F)) {
+            result = StringUtil.replace(fileName, RuleDomain.REPLAY_TYPE_F, DateUtil.getFormatTimeString(new Date(), "yyyyMMddHHmm"));
+        } else if (StringUtils.contains(fileName, RuleDomain.REPLAY_TYPE_E)) {
+            result = StringUtil.replace(fileName, RuleDomain.REPLAY_TYPE_E, DateUtil.getFormatTimeString(new Date(), "yyyyMMddHH"));
+        } else if (StringUtils.contains(fileName, RuleDomain.REPLAY_TYPE_A)) {
+            result = StringUtil.replace(fileName, RuleDomain.REPLAY_TYPE_A, DateUtil.getFormatTimeString(new Date(), "yyyyMMdd"));
+        } else if (StringUtils.contains(fileName == null ? "" : fileName.toLowerCase(), RuleDomain.REPLAY_TYPE_SQL)) {
+            // 后期改造,文件名如果配置的是sql的话,以sql查询文件名
+            List<String> fileNames = this.getPrvncFtpFileDAO().execConfigSql(fileName);
+            // if (fileNames != null && fileNames.size() > 0) {
+            // result = fileNames.get(0);
+            // }
+            return fileNames;
+        } else {
+            result = fileName;
+        }
+        results.add(result);
+        return results;
+    }
+
+    /**
+     * 空方法,如果在事前过程处理前,还需要做一定的处理,需要子类重写这个方法,实现业务逻辑
+     *
+     * @param ftpItemConfigInfo
+     */
+    protected void prepare(Map ftpItemConfigInfo) {
+
+    }
+
+    /**
+     * 空方法,如果在事后过程处理完后,还需要做一定的处理,需要子类重写这个方法,实现业务逻辑
+     *
+     * @param ftpItemConfigInfo
+     */
+    protected void post(Map ftpItemConfigInfo) {
+
+    }
+
+    public IHcFtpFileDAO getPrvncFtpFileDAO() {
+        return iHcFtpFileDAO;
+    }
+
+    public void setPrvncFtpFileDAO(IHcFtpFileDAO prvncFtpFileDAO) {
+        this.iHcFtpFileDAO = prvncFtpFileDAO;
+    }
+
+    public IHcFtpFileSMO getPrvncFtpFileSMO() {
+        return iHcFtpFileSMO;
+    }
+
+    public void setPrvncFtpFileSMO(IHcFtpFileSMO prvncFtpFileSMO) {
+        this.iHcFtpFileSMO = prvncFtpFileSMO;
+    }
 
 	/*public IPrvncDumpSMO getPrvncDumpSMO() {
 		return prvncDumpSMO;
@@ -375,4 +375,4 @@ public abstract class HcFtpToFileSystemQuartz{
 		this.prvncDumpSMO = prvncDumpSMO;
 	}
 */
-}
+}

+ 126 - 3
service-job/src/main/java/com/java110/job/smo/impl/TaskInnerServiceSMOImpl.java

@@ -8,15 +8,31 @@ import com.java110.dto.PageDto;
 import com.java110.dto.task.TaskDto;
 import com.java110.dto.task.TaskTemplateDto;
 import com.java110.dto.task.TaskTemplateSpecDto;
+import com.java110.dto.taskAttr.TaskAttrDto;
 import com.java110.dto.user.UserDto;
+import com.java110.job.dao.ITaskAttrServiceDao;
 import com.java110.job.dao.ITaskServiceDao;
+import com.java110.job.task.TaskSystemJob;
+import com.java110.utils.util.Assert;
 import com.java110.utils.util.BeanConvertUtil;
+import org.quartz.CronScheduleBuilder;
+import org.quartz.CronTrigger;
+import org.quartz.JobBuilder;
+import org.quartz.JobDetail;
+import org.quartz.JobKey;
+import org.quartz.Scheduler;
+import org.quartz.TriggerBuilder;
+import org.quartz.TriggerKey;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.web.bind.annotation.RequestBody;
 import org.springframework.web.bind.annotation.RestController;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 /**
  * @ClassName FloorInnerServiceSMOImpl
@@ -29,9 +45,23 @@ import java.util.List;
 @RestController
 public class TaskInnerServiceSMOImpl extends BaseServiceSMO implements ITaskInnerServiceSMO {
 
+    private static final Logger logger = LoggerFactory.getLogger(TaskInnerServiceSMOImpl.class);
+
+    private static final String defaultCronExpression = "0 * * * * ?";// 每分钟执行一次
+
+    private static final String prefixJobName = "task_"; // job
+    private static final String triggerNames = "taskToData_"; // job
+
+    @Autowired
+    private Scheduler scheduler;
+
+
     @Autowired
     private ITaskServiceDao taskServiceDaoImpl;
 
+    @Autowired
+    private ITaskAttrServiceDao taskAttrServiceDaoImpl;
+
     @Autowired
     private IUserInnerServiceSMO userInnerServiceSMOImpl;
 
@@ -103,7 +133,6 @@ public class TaskInnerServiceSMOImpl extends BaseServiceSMO implements ITaskInne
     }
 
 
-
     @Override
     public List<TaskTemplateDto> queryTaskTemplate(@RequestBody TaskTemplateDto taskTemplateDto) {
 
@@ -121,14 +150,12 @@ public class TaskInnerServiceSMOImpl extends BaseServiceSMO implements ITaskInne
     }
 
 
-
     @Override
     public int queryTaskTemplateSpecCount(@RequestBody TaskTemplateSpecDto taskTemplateSpecDto) {
         return taskServiceDaoImpl.queryTaskTemplateSpecCount(BeanConvertUtil.beanCovertMap(taskTemplateSpecDto));
     }
 
 
-
     @Override
     public List<TaskTemplateSpecDto> queryTaskTemplateSpec(@RequestBody TaskTemplateSpecDto taskTemplateSpecDto) {
 
@@ -145,8 +172,104 @@ public class TaskInnerServiceSMOImpl extends BaseServiceSMO implements ITaskInne
         return taskTemplates;
     }
 
+    /**
+     * 启动任务
+     *
+     * @param taskDto
+     * @return
+     */
+    public int startTask(@RequestBody TaskDto taskDto) {
+//        List<TaskAttrDto> attrDtos = BeanConvertUtil.covertBeanList(taskAttrServiceDaoImpl.getTaskAttrInfo(BeanConvertUtil.beanCovertMap(taskDto)),
+//                TaskAttrDto.class);
+        Map info = new HashMap();
+        info.put("templateId", taskDto.getTemplateId());
+        List<TaskTemplateDto> taskTemplateDtos = BeanConvertUtil.covertBeanList(taskServiceDaoImpl.getTaskTemplateInfo(info), TaskTemplateDto.class);
+
+        Assert.listOnlyOne(taskTemplateDtos, "模板不存在或存在多个");
+
+        taskDto.setTaskTemplateDto(taskTemplateDtos.get(0));
 
+        try {
+            String cronExpression = taskDto.getTaskCron();// 如果没有配置则,每一分运行一次
 
+            CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule(cronExpression);
+
+            String jobName = prefixJobName + taskDto.getTaskId();
+
+            String triggerName = triggerNames + taskDto.getTaskId();
+
+            //设置任务名称
+            JobKey jobKey = new JobKey(jobName, TaskSystemJob.JOB_GROUP_NAME);
+            JobDetail jobDetail = scheduler.getJobDetail(jobKey);
+
+            if (jobDetail != null) {
+                return 0;
+            }
+
+            String taskCfgName = taskDto.getTaskName();
+            JobDetail warnJob = JobBuilder.newJob(TaskSystemJob.class).withIdentity(jobName, TaskSystemJob.JOB_GROUP_NAME).withDescription("任务启动").build();
+
+            warnJob.getJobDataMap().put(TaskSystemJob.JOB_DATA_CONFIG_NAME, taskCfgName);
+
+            warnJob.getJobDataMap().put(TaskSystemJob.JOB_DATA_TASK_ID, taskDto.getTaskId());
+            warnJob.getJobDataMap().put(TaskSystemJob.JOB_DATA_TASK, taskDto);
+            warnJob.getJobDataMap().put(TaskSystemJob.JOB_DATA_TASK_ATTR, taskDto);
+
+            // 触发时间点
+            CronTrigger warnTrigger = TriggerBuilder.newTrigger().withIdentity(triggerName, triggerName + "_group").withSchedule(cronScheduleBuilder).build();
+
+            // 错过执行后,立即执行
+            //warnTrigger(CronTrigger.MISFIRE_INSTRUCTION_FIRE_ONCE_NOW);
+            //交由Scheduler安排触发
+            scheduler.scheduleJob(warnJob, warnTrigger);
+            Map paramIn = new HashMap();
+            paramIn.put("taskId", taskDto.getTaskId());
+            paramIn.put("state", "002");
+            paramIn.put("statusCd", "0");
+            taskServiceDaoImpl.updateTaskInfoInstance(paramIn);
+
+        } catch (Exception e) {
+            logger.error("启动侦听失败", e);
+            return 0;
+        }
+        return 1;
+    }
+
+    /**
+     * 停止任务
+     *
+     * @param taskDto
+     * @return
+     */
+    public int stopTask(@RequestBody TaskDto taskDto) {
+
+        try {
+            String jobName = prefixJobName + taskDto.getTaskId();
+
+            String triggerName = prefixJobName + taskDto.getTaskId();
+
+            TriggerKey triggerKey = TriggerKey.triggerKey(jobName, TaskSystemJob.JOB_GROUP_NAME);
+            // 停止触发器
+            scheduler.pauseTrigger(triggerKey);
+            // 移除触发器
+            scheduler.unscheduleJob(triggerKey);
+
+            JobKey jobKey = new JobKey(jobName, TaskSystemJob.JOB_GROUP_NAME);
+            // 删除任务
+            scheduler.deleteJob(jobKey);
+
+            Map paramIn = new HashMap();
+            paramIn.put("taskId", taskDto.getTaskId());
+            paramIn.put("state", "001");
+            paramIn.put("statusCd", "0");
+            taskServiceDaoImpl.updateTaskInfoInstance(paramIn);
+
+        } catch (Exception e) {
+            logger.error("启动侦听失败", e);
+            return 0;
+        }
+        return 1;
+    }
 
 
     public ITaskServiceDao getTaskServiceDaoImpl() {

+ 0 - 97
service-job/src/main/java/com/java110/job/task/HcFtpToFileSystemJob.java

@@ -1,97 +0,0 @@
-package com.java110.job.task;
-
-import com.java110.utils.factory.ApplicationContextFactory;
-import com.java110.utils.util.SpringBeanInvoker;
-import com.java110.job.dao.IHcFtpFileDAO;
-import com.java110.job.smo.HcFtpToFileSystemQuartz;
-import org.quartz.Job;
-import org.quartz.JobExecutionContext;
-import org.quartz.JobExecutionException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-
-import java.util.HashMap;
-import java.util.Map;
-
-
-/**
- *
- * @author
- *
- */
-
-public class HcFtpToFileSystemJob implements Job {
-
-	private static final Logger logger = LoggerFactory.getLogger(HcFtpToFileSystemJob.class);
-
-	public static String JOB_DATA_CONFIG_NAME = "ftpToFileSystemTaskName";
-	public static String JOB_DATA_TASK_ID = "ftpToFileSystemTaskID";
-	public static String JOB_GROUP_NAME = "ftpToFileSystemJobGroup"; // 任务的 分组名称
-	@Autowired
-	private IHcFtpFileDAO iHcFtpFileDAO;
-
-	private HcFtpToFileSystemQuartz hcFtpToFileSystemQuartz;
-	protected void executeInternal(JobExecutionContext context) {
-		try {
-
-			if(logger.isDebugEnabled()){
-				logger.debug("FTP通用数据文件传接任务:" +
-						context.getJobDetail().getJobClass().getName()+ " taskID:" +
-						context.getJobDetail().getJobDataMap().get(JOB_DATA_TASK_ID) +
-						" ftpfileTaskName:" +
-						context.getJobDetail().getJobDataMap().get(JOB_DATA_CONFIG_NAME), context);
-			}
-
-			long taskId = Long.parseLong(context.getJobDetail().getJobDataMap()
-					.getString(JOB_DATA_TASK_ID));
-			// 根据taskId 查询配置信息
-			Map ftpItemConfigInfo = this.getFtpConfigInfo(taskId);
-
-			//如果查询不到数据,或者是没有处理class,不在运行
-			if(ftpItemConfigInfo == null || !ftpItemConfigInfo.containsKey("DEAL_CLASS") || ftpItemConfigInfo.get("DEAL_CLASS") == null){
-				logger.error("---【PrvncFtpToFileSystemQuartz.executeInternal】查询到的ftp配置数据为空,或没有处理类", ftpItemConfigInfo);
-				return;
-			}
-
-			String dealClass = ftpItemConfigInfo.get("DEAL_CLASS").toString();
-
-			hcFtpToFileSystemQuartz = (HcFtpToFileSystemQuartz) ApplicationContextFactory.getBean(dealClass);
-			hcFtpToFileSystemQuartz.startFtpTask(ftpItemConfigInfo);
-
-		} catch (Throwable ex) {
-			logger.error("执行任务失败:", ex);
-		}
-	}
-
-	/**
-	 * 查询配置相关信息
-	 *
-	 * @param taskId
-	 * @return
-	 */
-	private Map getFtpConfigInfo(long taskId) {
-		Map info = new HashMap();
-		info.put("taskId", taskId);
-		Map ftpItem = getiHcFtpFileDAO().queryFtpItemByTaskId(info);
-		if (logger.isDebugEnabled()) {
-			logger.debug(
-					"---【PrvncFtpToFileSystemQuartz.getFtpConfigInfo】查询到的配置数据为:"
-							+ ftpItem, ftpItem);
-		}
-		return ftpItem;
-	}
-
-	public IHcFtpFileDAO getiHcFtpFileDAO() {
-		if (this.iHcFtpFileDAO == null) {
-			this.iHcFtpFileDAO = ((IHcFtpFileDAO) SpringBeanInvoker
-					.getBean("provInner.iHcFtpFileDAO"));
-		}
-		return iHcFtpFileDAO;
-	}
-
-	@Override
-	public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
-		this.executeInternal(jobExecutionContext);
-	}
-}

+ 47 - 0
service-job/src/main/java/com/java110/job/task/TaskSystemJob.java

@@ -0,0 +1,47 @@
+package com.java110.job.task;
+
+import com.java110.dto.task.TaskDto;
+import com.java110.utils.factory.ApplicationContextFactory;
+import org.quartz.Job;
+import org.quartz.JobExecutionContext;
+import org.quartz.JobExecutionException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * @author
+ */
+
+public class TaskSystemJob implements Job {
+
+    private static final Logger logger = LoggerFactory.getLogger(TaskSystemJob.class);
+
+    public static String JOB_DATA_CONFIG_NAME = "taskSystemTaskName";
+    public static String JOB_DATA_TASK_ID = "taskSystemTaskId";
+    public static String JOB_DATA_TASK_ATTR = "taskSystemTaskAttr";
+    public static String JOB_DATA_TASK = "taskSystemTask";
+    public static String JOB_GROUP_NAME = "taskSystemJobGroup"; // 任务的 分组名称
+
+
+    private TaskSystemQuartz hcFtpToFileSystemQuartz;
+
+    protected void executeInternal(JobExecutionContext context) {
+        try {
+            TaskDto taskDto = (TaskDto) (context.getJobDetail().getJobDataMap()
+                    .get(JOB_DATA_TASK));
+
+
+            hcFtpToFileSystemQuartz = (TaskSystemQuartz) ApplicationContextFactory.getBean(taskDto.getTaskTemplateDto().getClassBean());
+            hcFtpToFileSystemQuartz.startTask(taskDto);
+
+        } catch (Throwable ex) {
+            logger.error("执行任务失败:", ex);
+        }
+    }
+
+    @Override
+    public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
+        this.executeInternal(jobExecutionContext);
+    }
+}

+ 80 - 0
service-job/src/main/java/com/java110/job/task/TaskSystemQuartz.java

@@ -0,0 +1,80 @@
+package com.java110.job.task;
+
+import com.java110.dto.task.TaskDto;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @author
+ */
+public abstract class TaskSystemQuartz {
+
+    protected static final Logger logger = LoggerFactory.getLogger(TaskSystemQuartz.class);
+
+
+    public void initTask() {
+
+    }
+
+    /**
+     * 启动任务
+     *
+     * @param taskDto
+     */
+    public void startTask(TaskDto taskDto) throws Exception {
+
+        // 这么做是为了,单线程调用,防止多线程导致数据重复处理
+        if (!"002".equals(taskDto.getState())) {
+            return;
+        }
+
+        String taskId = taskDto.getTaskId();
+
+        if (logger.isDebugEnabled()) {
+            logger.debug("---【TaskSystemQuartz.startFtpTask】:任务【" + taskId + "】开始运行!", taskId);
+        }
+
+        try {
+            // 1.0空方法,让子类去实现
+            prepare(taskDto);
+
+            // 3.0核心业务处理逻辑,需要子类去实现
+            process(taskDto);
+
+            // 5.0空方法,让子类去实现
+            after(taskDto);
+        } catch (Exception ex) {
+
+            // 接续向外抛出去
+            logger.error("处理出现问题:", ex);
+            return;
+        }
+
+    }
+
+
+    /**
+     * 主要业务处理(上传下载),让子类去实现
+     *
+     * @param taskDto
+     */
+    protected abstract void process(TaskDto taskDto) throws Exception;
+
+    /**
+     * 空方法,如果在事前过程处理前,还需要做一定的处理,需要子类重写这个方法,实现业务逻辑
+     *
+     * @param taskDto
+     */
+    protected void prepare(TaskDto taskDto) {
+
+    }
+
+    /**
+     * 空方法,如果在事后过程处理完后,还需要做一定的处理,需要子类重写这个方法,实现业务逻辑
+     *
+     * @param taskDto
+     */
+    protected void after(TaskDto taskDto) {
+
+    }
+}