DatabusDataExecutor.java 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899
  1. package com.java110.job.databus;
  2. import com.java110.core.log.LoggerFactory;
  3. import com.java110.dto.data.DatabusQueueDataDto;
  4. import com.java110.job.adapt.IDatabusAdapt;
  5. import com.java110.job.importData.ImportDataQueue;
  6. import com.java110.utils.factory.ApplicationContextFactory;
  7. import com.java110.utils.util.Assert;
  8. import org.slf4j.Logger;
  9. import java.util.concurrent.*;
  10. /**
  11. * 导入资产数据执行器
  12. */
  13. public class DatabusDataExecutor implements Runnable {
  14. private static final Logger log = LoggerFactory.getLogger(ImportDataQueue.class);
  15. private static final int MAX_ROW = 200;
  16. private static final int DEFAULT_TIMEOUT_TIME = 5000; // 5秒超时
  17. //默认线程大小
  18. private static final int DEFAULT_EXPORT_POOL = 4;
  19. private boolean isRun = false;
  20. private ExecutorService executorService;
  21. public DatabusDataExecutor(boolean isRun) {
  22. this.isRun = isRun;
  23. }
  24. public DatabusDataExecutor() {
  25. }
  26. @Override
  27. public void run() {
  28. while (isRun) {
  29. log.debug("databus数据线程开始处理");
  30. try {
  31. doQueueData();
  32. } catch (Throwable e) {
  33. log.error("处理databus异常", e);
  34. e.printStackTrace();
  35. }
  36. log.debug("databus数据线程处理完成");
  37. }
  38. }
  39. private void doQueueData() throws Exception {
  40. DatabusQueueDataDto databusQueueDataDto = DatabusDataQueue.getData();
  41. if (databusQueueDataDto == null) {
  42. return;
  43. }
  44. String action = databusQueueDataDto.getBeanName();
  45. IDatabusAdapt databusAdaptImpl = ApplicationContextFactory.getBean(action, IDatabusAdapt.class);
  46. if (databusAdaptImpl == null) {
  47. return;
  48. }
  49. executorService = Executors.newSingleThreadExecutor();
  50. FutureTask<String> futureTask = new FutureTask<>(new Callable<String>() {
  51. @Override
  52. public String call() throws Exception {
  53. databusAdaptImpl.execute(databusQueueDataDto.getBusiness(), databusQueueDataDto.getBusinesses());
  54. return "";
  55. }
  56. });
  57. executorService.execute(futureTask);
  58. try {
  59. futureTask.get(DEFAULT_TIMEOUT_TIME, TimeUnit.MILLISECONDS);
  60. } catch (InterruptedException | ExecutionException |
  61. TimeoutException e) {//e.printStackTrace();
  62. futureTask.cancel(true);
  63. }
  64. executorService.shutdown();
  65. }
  66. /**
  67. * 线程启动器
  68. */
  69. public static void startQueueDataExecutor() {
  70. log.debug("开始初始化消息队列");
  71. ExecutorService executorService = Executors.newFixedThreadPool(DEFAULT_EXPORT_POOL);
  72. executorService.execute(new DatabusDataExecutor(true));
  73. log.debug("初始化导入消息完成");
  74. }
  75. }