Просмотр исходного кода

针对quzrtz2.3.0版本做了大的改动 jobdatail等实现方式由原来的new改为了jar包提供的工具类产生

shiyj лет назад: 6
Родитель
Сommit
4c4d8653ab

+ 1 - 1
JobService/pom.xml

@@ -40,7 +40,7 @@
         <dependency>
             <groupId>org.quartz-scheduler</groupId>
             <artifactId>quartz</artifactId>
-            <version>1.8.6</version>
+            <version>2.3.0</version>
         </dependency>
     </dependencies>
 

+ 26 - 19
JobService/src/main/java/com/java110/job/Api/PrvncFtpToFileSystemConfigAction.java

@@ -8,9 +8,8 @@ import com.java110.job.dao.IPrvncFtpFileDAO;
 import com.java110.job.task.PrvncFtpToFileSystemJob;
 import org.apache.commons.validator.GenericValidator;
 import org.apache.commons.validator.util.ValidatorUtils;
-import org.quartz.CronTrigger;
-import org.quartz.JobDetail;
-import org.quartz.Scheduler;
+
+import org.quartz.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -53,6 +52,9 @@ public class PrvncFtpToFileSystemConfigAction{
 
 	public JSONObject resultMsg;
 
+	@Autowired
+	private Scheduler scheduler;
+
 	/**
 	 * 
 	 */
@@ -485,8 +487,6 @@ public class PrvncFtpToFileSystemConfigAction{
 
 		List<Map> doFtpItems = getPrvncFtpFileDAO().queryFtpItemsByTaskIds(info);
 
-		// 获取Spring调度器
-		Scheduler scheduler = (Scheduler) SpringBeanInvoker.getBean("schedulerFactoryBean");
 		int linstenCount = 0;
 		int updateTaskStateFailCount = 0;
 		try {
@@ -501,28 +501,31 @@ public class PrvncFtpToFileSystemConfigAction{
 
 				// 获取定时时间
 				String cronExpression = doFtpItem.get("TASKCRON") == null ? defaultCronExpression : doFtpItem.get("TASKCRON").toString();// 如果没有配置则,每一分运行一次
+				// 设置触发时间点
+				CronScheduleBuilder cronScheduleBuilder =CronScheduleBuilder.cronSchedule(cronExpression);
 
 				String jobName = prefixJobName + taskId;
 
 				String triggerName = prefixJobName + taskId;
-
-				JobDetail jobDetail = scheduler.getJobDetail(jobName, PrvncFtpToFileSystemJob.JOB_GROUP_NAME);
+				//设置任务名称
+				JobKey jobKey = new JobKey(jobName);
+				JobDetail jobDetail = scheduler.getJobDetail(jobKey);
 				// 说明这个没有启动,则需要重新启动,如果启动着不做处理
 				if (jobDetail == null) {
 					// 任务名称
 					String taskCfgName = (String) doFtpItem.get("TASKNAME");
-
-					JobDetail warnJob = new JobDetail(jobName, PrvncFtpToFileSystemJob.JOB_GROUP_NAME, PrvncFtpToFileSystemJob.class);
-
+					//构建job信息
+					JobDetail warnJob = JobBuilder.newJob(PrvncFtpToFileSystemJob.class).withIdentity(jobName,PrvncFtpToFileSystemJob.JOB_GROUP_NAME).withDescription("任务启动").build();
+					// job.getJobDataMap().put("params", param.toString());
 					warnJob.getJobDataMap().put(PrvncFtpToFileSystemJob.JOB_DATA_CONFIG_NAME, taskCfgName);
 
 					warnJob.getJobDataMap().put(PrvncFtpToFileSystemJob.JOB_DATA_TASK_ID, taskId);
-
-					CronTrigger warnTrigger = new CronTrigger(triggerName, triggerName, cronExpression);
+					// 触发时间点
+					CronTrigger warnTrigger = TriggerBuilder.newTrigger().withIdentity(triggerName, triggerName).withSchedule(cronScheduleBuilder).build();
 
 					// 错过执行后,立即执行
-					warnTrigger.setMisfireInstruction(CronTrigger.MISFIRE_INSTRUCTION_FIRE_ONCE_NOW);
-
+					//warnTrigger(CronTrigger.MISFIRE_INSTRUCTION_FIRE_ONCE_NOW);
+					//交由Scheduler安排触发
 					scheduler.scheduleJob(warnJob, warnTrigger);
 
 					// 修改数据状态,将任务数据状态改为运行状态
@@ -616,9 +619,6 @@ public class PrvncFtpToFileSystemConfigAction{
 
 		List<Map> doFtpItems = getPrvncFtpFileDAO().queryFtpItemsByTaskIds(info);
 
-		// 获取Spring调度器
-		Scheduler scheduler = (Scheduler) SpringBeanInvoker.getBean("schedulerFactoryBean");
-
 		int linstenCount = 0;
 		int updateTaskStateFailCount = 0;
 		try {
@@ -633,8 +633,15 @@ public class PrvncFtpToFileSystemConfigAction{
 				String jobName = prefixJobName + taskId;
 
 				String triggerName = prefixJobName + taskId;
-				scheduler.deleteJob(jobName, PrvncFtpToFileSystemJob.JOB_GROUP_NAME);
-
+				TriggerKey triggerKey = TriggerKey.triggerKey(jobName,PrvncFtpToFileSystemJob.JOB_GROUP_NAME);
+				// 停止触发器
+				scheduler.pauseTrigger(triggerKey);
+				// 移除触发器
+				scheduler.unscheduleJob(triggerKey);
+
+				JobKey jobKey = new JobKey(jobName,PrvncFtpToFileSystemJob.JOB_GROUP_NAME);
+				// 删除任务
+				scheduler.deleteJob(jobKey);
 				// 修改数据状态,将任务数据状态改为运行状态
 
 				Map updateTaskInfo = new HashMap();

+ 51 - 0
JobService/src/main/java/com/java110/job/myquartz/MonitorTriggerListener.java

@@ -0,0 +1,51 @@
+package com.java110.job.myquartz;
+
+import groovy.util.logging.Log4j;
+import org.quartz.JobExecutionContext;
+import org.quartz.Trigger;
+import org.quartz.TriggerListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * 个人练习
+ * 2019/07/26
+ * 师延俊
+ */
+@Log4j
+public class MonitorTriggerListener implements TriggerListener {
+    private final static Logger logger = LoggerFactory.getLogger(MonitorTriggerListener.class);
+
+    @Override
+    public String getName() {
+        // TODO Auto-generated method stub
+        return "MonitorTriggerListener";
+    }
+
+    @Override
+    public void triggerFired(Trigger trigger, JobExecutionContext context) {
+        logger.info("Trigger 被触发了,此时job上的execute()方法将要被执行");
+
+    }
+
+    @Override
+    public boolean vetoJobExecution(Trigger trigger, JobExecutionContext context) {
+        // TODO Auto-generated method stub
+        logger.info("trigger被触发后,job将要被执行时Scheduler调用该方法,如返回true则job此次将不被执行");
+        return false;
+    }
+
+    @Override
+    public void triggerMisfired(Trigger trigger) {
+        logger.info("当前Trigger触发错过了");
+
+    }
+
+    @Override
+    public void triggerComplete(Trigger trigger, JobExecutionContext context,
+                                Trigger.CompletedExecutionInstruction triggerInstructionCode) {
+        logger.info("Trigger被触发并且完成了job的执行,此方法被调用");
+
+    }
+
+}

+ 25 - 0
JobService/src/main/java/com/java110/job/myquartz/QuartzConfigurer.java

@@ -0,0 +1,25 @@
+package com.java110.job.myquartz;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.quartz.SchedulerFactoryBeanCustomizer;
+
+import org.springframework.context.annotation.Configuration;
+import org.springframework.scheduling.quartz.SchedulerFactoryBean;
+
+import javax.sql.DataSource;
+
+@Configuration
+public class QuartzConfigurer implements SchedulerFactoryBeanCustomizer {
+
+    @Autowired
+    private DataSource dataSource;
+
+
+    @Override
+    public void customize(SchedulerFactoryBean schedulerFactoryBean) {
+        schedulerFactoryBean.setDataSource(dataSource);
+        schedulerFactoryBean.setStartupDelay(2);
+        schedulerFactoryBean.setAutoStartup(true);
+        schedulerFactoryBean.setOverwriteExistingJobs(true);
+
+    }
+}

+ 102 - 0
JobService/src/main/java/com/java110/job/myquartz/QuartzEntity.java

@@ -0,0 +1,102 @@
+package com.java110.job.myquartz;
+
+/*
+* 暂时不需要使用该定义类 直接在代码里面写死了参数
+* */
+public class QuartzEntity {
+    private String jobName;//任务名称
+    private String jobGroup;//任务分组
+    private String description;//任务描述
+    private String jobClassName;//执行类
+    private String cronExpression;//执行时间
+    private String triggerName;//执行时间
+    private String triggerState;//任务状态
+
+    private String oldJobName;//任务名称 用于修改
+    private String oldJobGroup;//任务分组 用于修改
+
+    public QuartzEntity() {
+        super();
+    }
+    public QuartzEntity(String jobName, String jobGroup, String description, String jobClassName, String cronExpression, String triggerName) {
+        super();
+        this.jobName = jobName;
+        this.jobGroup = jobGroup;
+        this.description = description;
+        this.jobClassName = jobClassName;
+        this.cronExpression = cronExpression;
+        this.triggerName = triggerName;
+    }
+
+    public String getJobName() {
+        return jobName;
+    }
+
+    public void setJobName(String jobName) {
+        this.jobName = jobName;
+    }
+
+    public String getJobGroup() {
+        return jobGroup;
+    }
+
+    public void setJobGroup(String jobGroup) {
+        this.jobGroup = jobGroup;
+    }
+
+    public String getDescription() {
+        return description;
+    }
+
+    public void setDescription(String description) {
+        this.description = description;
+    }
+
+    public String getJobClassName() {
+        return jobClassName;
+    }
+
+    public void setJobClassName(String jobClassName) {
+        this.jobClassName = jobClassName;
+    }
+
+    public String getCronExpression() {
+        return cronExpression;
+    }
+
+    public void setCronExpression(String cronExpression) {
+        this.cronExpression = cronExpression;
+    }
+
+    public String getTriggerName() {
+        return triggerName;
+    }
+
+    public void setTriggerName(String triggerName) {
+        this.triggerName = triggerName;
+    }
+
+    public String getTriggerState() {
+        return triggerState;
+    }
+
+    public void setTriggerState(String triggerState) {
+        this.triggerState = triggerState;
+    }
+
+    public String getOldJobName() {
+        return oldJobName;
+    }
+
+    public void setOldJobName(String oldJobName) {
+        this.oldJobName = oldJobName;
+    }
+
+    public String getOldJobGroup() {
+        return oldJobGroup;
+    }
+
+    public void setOldJobGroup(String oldJobGroup) {
+        this.oldJobGroup = oldJobGroup;
+    }
+}

+ 52 - 0
JobService/src/main/java/com/java110/job/myquartz/QuartzUtil.java

@@ -0,0 +1,52 @@
+package com.java110.job.myquartz;
+
+import com.alibaba.fastjson.JSONObject;
+import org.quartz.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+/***
+ * 首先定义好工具类以备后面业务需要
+ * 2019/07/26 师延俊
+ */
+@Component
+public class QuartzUtil {
+    private final static Logger logger = LoggerFactory.getLogger(QuartzUtil.class);
+    @Autowired
+    private Scheduler scheduler;
+
+    public String seveJob(QuartzEntity quartzEntity, JSONObject param){
+        logger.info("开始新增任务");
+        String massage="新增任务成功!";
+        //如果保存的任务已经存在 侧先清理任务
+        try {
+            if(quartzEntity.getOldJobGroup()!=null){
+                JobKey jobKey = new JobKey(quartzEntity.getOldJobName(),quartzEntity.getOldJobName());
+
+                    scheduler.deleteJob(jobKey);
+            }
+            //获取到job执行内容的class类
+            Class clas = Class.forName(quartzEntity.getJobClassName());
+            //构建job信息
+           JobDetail jobDetail= JobBuilder.newJob(clas).withIdentity(quartzEntity.getJobName(),quartzEntity.getJobGroup()).withDescription("加入任务"+quartzEntity.getJobName()).build();
+            jobDetail.getJobDataMap().put("param",param);
+            //定义触发时间点
+            CronScheduleBuilder cronScheduleBuilder= CronScheduleBuilder.cronSchedule(quartzEntity.getCronExpression());
+            Trigger trigger=TriggerBuilder.newTrigger().withIdentity("trigger"+quartzEntity.getJobName(),quartzEntity.getJobGroup())
+                    .startNow().withSchedule(cronScheduleBuilder).build();
+
+            scheduler.scheduleJob(jobDetail,trigger);
+        } catch (SchedulerException e) {
+            e.printStackTrace();
+            massage="新增任务失败";
+        } catch (ClassNotFoundException e) {
+            e.printStackTrace();
+            massage="新增任务失败";
+        }
+        return massage;
+    }
+
+
+}

+ 1 - 1
JobService/src/main/java/com/java110/job/task/PrvncFtpToFileSystemJob.java

@@ -38,7 +38,7 @@ public class PrvncFtpToFileSystemJob implements Job {
 
 			if(logger.isDebugEnabled()){
 				logger.debug("FTP通用数据文件传接任务:" +
-						context.getJobDetail().getFullName() + " taskID:" +
+						context.getJobDetail().getJobClass().getName()+ " taskID:" +
 						context.getJobDetail().getJobDataMap().get(JOB_DATA_TASK_ID) +
 						" ftpfileTaskName:" +
 						context.getJobDetail().getJobDataMap().get(JOB_DATA_CONFIG_NAME), context);

+ 22 - 1
JobService/src/main/resources/application-dev.yml

@@ -91,4 +91,25 @@ kafka:
       size: 4096
     linger: 1
     buffer:
-      memory: 40960
+      memory: 40960
+
+  quartz:
+    properties:
+      org:
+        quartz:
+          scheduler:
+            instanceName: clusteredScheduler
+            instanceId: AUTO
+          jobStore:
+            class: org.quartz.impl.jdbcjobstore.JobStoreTX
+            driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate
+            tablePrefix: HC_
+            isClustered: false
+            clusterCheckinInterval: 10000
+            useProperties: false
+          threadPool:
+            class: org.quartz.simpl.SimpleThreadPool
+            threadCount: 10
+            threadPriority: 5
+            threadsInheritContextClassLoaderOfInitializingThread: true
+    job-store-type: jdbc

+ 10 - 9
JobService/src/main/resources/quartz.properties

@@ -1,4 +1,5 @@
 #quartz集群配置
+#该配置文件已经弃用 只是为了检查配置含义暂时保留
 # ===========================================================================
 # Configure Main Scheduler Properties 调度器属性
 # ===========================================================================
@@ -27,25 +28,25 @@ org.quartz.jobStore.driverDelegateClass = org.quartz.impl.jdbcjobstore.StdJDBCDe
 #JobDataMaps是否都为String类型
 org.quartz.jobStore.useProperties = false
 #数据库别名 随便取
-org.quartz.jobStore.dataSource = myDS
-#表的前缀,默认QRTZ_
+#org.quartz.jobStore.dataSource = myDS
+#表的前缀,默认QRTZ
 org.quartz.jobStore.tablePrefix = HC
 #是否加入集群
 org.quartz.jobStore.isClustered = true
 #调度实例失效的检查时间间隔
 org.quartz.jobStore.clusterCheckinInterval = 20000
 #============================================================================
-# Configure Datasources
+# Configure Datasources 使用HC框架自带的datasources
 #============================================================================
 #数据库引擎
-org.quartz.dataSource.myDS.driver = com.mysql.jdbc.Driver
+#org.quartz.dataSource.myDS.driver = com.mysql.jdbc.Driver
 #数据库连接
-org.quartz.dataSource.myDS.URL = jdbc:mysql://localhost:3306/testdb?characterEncoding=utf8&allowMultiQueries=true&useSSL=false&autoReconnect=true
+#org.quartz.dataSource.myDS.URL = jdbc:mysql://localhost:3306/testdb?characterEncoding=utf8&allowMultiQueries=true&useSSL=false&autoReconnect=true
 #数据库用户
-org.quartz.dataSource.myDS.user = root
+#org.quartz.dataSource.myDS.user = root
 #数据库密码
-org.quartz.dataSource.myDS.password = root
+#org.quartz.dataSource.myDS.password = root
 #允许最大连接
-org.quartz.dataSource.myDS.maxConnections = 5
+#org.quartz.dataSource.myDS.maxConnections = 5
 #验证查询sql,可以不设置
-org.quartz.dataSource.myDS.validationQuery=select 0 from dual
+#org.quartz.dataSource.myDS.validationQuery=select 0 from dual