ServiceDataFlowEventPublishing.java 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205
  1. package com.java110.event.service.api;
  2. import com.java110.common.constant.CommonConstant;
  3. import com.java110.common.constant.ResponseConstant;
  4. import com.java110.common.constant.ServiceCodeConstant;
  5. import com.java110.common.exception.BusinessException;
  6. import com.java110.common.exception.ListenerExecuteException;
  7. import com.java110.common.factory.ApplicationContextFactory;
  8. import com.java110.common.log.LoggerEngine;
  9. import com.java110.common.util.Assert;
  10. import com.java110.core.context.DataFlowContext;
  11. import com.java110.entity.center.AppService;
  12. import com.java110.event.center.DataFlowListenerOrderComparator;
  13. import com.java110.event.service.BusinessServiceDataFlowEvent;
  14. import com.java110.event.service.BusinessServiceDataFlowListener;
  15. import org.springframework.http.HttpMethod;
  16. import java.util.ArrayList;
  17. import java.util.HashMap;
  18. import java.util.List;
  19. import java.util.Map;
  20. import java.util.concurrent.Executor;
  21. import java.util.concurrent.Executors;
  22. /**
  23. * 数据流 事件发布
  24. * Created by wuxw on 2018/4/17.
  25. */
  26. public class ServiceDataFlowEventPublishing extends LoggerEngine {
  27. private static Executor taskExecutor;
  28. //默认 线程数 100
  29. private final static int DEFAULT_THREAD_NUM = 100;
  30. /**
  31. * 保存侦听实例信息,一般启动时加载
  32. */
  33. private final static List<String> listeners = new ArrayList<String>();
  34. /**
  35. * 根据 事件类型查询侦听
  36. */
  37. private final static Map<String, List<ServiceDataFlowListener>> cacheListenersMap = new HashMap<String, List<ServiceDataFlowListener>>();
  38. /**
  39. * 添加 侦听,这个只有启动时,单线程 处理,所以是线程安全的
  40. *
  41. * @param listener
  42. */
  43. public static void addListener(String listener) {
  44. listeners.add(listener);
  45. }
  46. /**
  47. * 获取侦听(全部侦听)
  48. *
  49. * @return
  50. */
  51. public static List<String> getListeners() {
  52. return listeners;
  53. }
  54. /**
  55. * 根据是否实现了某个接口,返回侦听
  56. *
  57. * @param serviceCode
  58. * @return
  59. * @since 1.8
  60. */
  61. public static List<ServiceDataFlowListener> getListeners(String serviceCode, String httpMethod) {
  62. Assert.hasLength(serviceCode, "获取需要发布的事件处理侦听时,传递事件为空,请检查");
  63. String needCachedServiceCode = serviceCode + httpMethod;
  64. //先从缓存中获取,为了提升效率
  65. if (cacheListenersMap.containsKey(needCachedServiceCode)) {
  66. return cacheListenersMap.get(needCachedServiceCode);
  67. }
  68. List<ServiceDataFlowListener> dataFlowListeners = new ArrayList<ServiceDataFlowListener>();
  69. for (String listenerBeanName : getListeners()) {
  70. ServiceDataFlowListener listener = ApplicationContextFactory.getBean(listenerBeanName, ServiceDataFlowListener.class);
  71. if (serviceCode.equals(listener.getServiceCode())
  72. && listener.getHttpMethod() == HttpMethod.valueOf(httpMethod)) {
  73. dataFlowListeners.add(listener);
  74. }
  75. //特殊处理 透传类接口
  76. if (ServiceCodeConstant.SERVICE_CODE_DO_SERVICE_TRANSFER.equals(listener.getServiceCode())
  77. && ServiceCodeConstant.SERVICE_CODE_DO_SERVICE_TRANSFER.equals(serviceCode)) {
  78. dataFlowListeners.add(listener);
  79. }
  80. }
  81. //这里排序
  82. DataFlowListenerOrderComparator.sort(dataFlowListeners);
  83. //将数据放入缓存中
  84. cacheListenersMap.put(needCachedServiceCode, dataFlowListeners);
  85. return dataFlowListeners;
  86. }
  87. /**
  88. * 发布事件
  89. *
  90. * @param dataFlowContext
  91. */
  92. public static void multicastEvent(DataFlowContext dataFlowContext, AppService appService) throws BusinessException {
  93. Assert.notNull(dataFlowContext.getServiceCode(), "当前没有可处理的业务信息!");
  94. multicastEvent(dataFlowContext.getServiceCode(), dataFlowContext, appService, null);
  95. }
  96. /**
  97. * 发布事件
  98. *
  99. * @param serviceCode
  100. * @param dataFlowContext
  101. */
  102. public static void multicastEvent(String serviceCode, DataFlowContext dataFlowContext, AppService appService) throws BusinessException {
  103. multicastEvent(serviceCode, dataFlowContext, appService, null);
  104. }
  105. /**
  106. * 发布事件
  107. *
  108. * @param serviceCode
  109. * @param dataFlowContext 这个订单信息,以便于 侦听那边需要用
  110. */
  111. public static void multicastEvent(String serviceCode, DataFlowContext dataFlowContext, AppService appService, String asyn) throws BusinessException {
  112. try {
  113. ServiceDataFlowEvent targetDataFlowEvent = new ServiceDataFlowEvent(serviceCode, dataFlowContext, appService);
  114. multicastEvent(serviceCode, targetDataFlowEvent, asyn);
  115. } catch (Exception e) {
  116. logger.error("发布侦听失败,失败原因为:", e);
  117. throw new BusinessException(ResponseConstant.RESULT_CODE_INNER_ERROR, "发布侦听失败,失败原因为:" + e.getMessage());
  118. }
  119. }
  120. /**
  121. * 发布事件
  122. *
  123. * @param event
  124. * @param asyn A 表示异步处理
  125. */
  126. public static void multicastEvent(String serviceCode, final ServiceDataFlowEvent event, String asyn) {
  127. String httpMethod = event.getDataFlowContext().getRequestCurrentHeaders().get(CommonConstant.HTTP_METHOD);
  128. List<ServiceDataFlowListener> listeners = getListeners(serviceCode, httpMethod);
  129. //这里判断 serviceCode + httpMethod 的侦听,如果没有注册直接报错。
  130. if (listeners == null || listeners.size() == 0) {
  131. throw new ListenerExecuteException(ResponseConstant.RESULT_CODE_ERROR,
  132. "服务【" + serviceCode + "】调用方式【" + httpMethod + "】当前不支持");
  133. }
  134. for (final ServiceDataFlowListener listener : listeners) {
  135. if (CommonConstant.PROCESS_ORDER_ASYNCHRONOUS.equals(asyn)) { //异步处理
  136. Executor executor = getTaskExecutor();
  137. executor.execute(new Runnable() {
  138. @Override
  139. public void run() {
  140. invokeListener(listener, event);
  141. }
  142. });
  143. break;
  144. } else {
  145. invokeListener(listener, event);
  146. break;
  147. }
  148. }
  149. }
  150. /**
  151. * Return the current task executor for this multicaster.
  152. */
  153. protected static synchronized Executor getTaskExecutor() {
  154. if (taskExecutor == null) {
  155. taskExecutor = Executors.newFixedThreadPool(DEFAULT_THREAD_NUM);
  156. }
  157. return taskExecutor;
  158. }
  159. /**
  160. * Invoke the given listener with the given event.
  161. *
  162. * @param listener the ApplicationListener to invoke
  163. * @param event the current event to propagate
  164. * @since 4.1
  165. */
  166. @SuppressWarnings({"unchecked", "rawtypes"})
  167. protected static void invokeListener(ServiceDataFlowListener listener, ServiceDataFlowEvent event) {
  168. try {
  169. listener.soService(event);
  170. } catch (Exception e) {
  171. LoggerEngine.error("发布侦听失败", e);
  172. throw new RuntimeException("发布侦听失败," + listener + event + e);
  173. }
  174. }
  175. }