diff --git a/chushang-common/chushang-common-bom/pom.xml b/chushang-common/chushang-common-bom/pom.xml index 025d65c..65aa074 100644 --- a/chushang-common/chushang-common-bom/pom.xml +++ b/chushang-common/chushang-common-bom/pom.xml @@ -62,6 +62,11 @@ chushang-common-mongo ${common.version} + + com.chushang + chushang-common-mq + ${common.version} + com.chushang chushang-common-mybatis diff --git a/chushang-common/chushang-common-core/src/main/java/com/chushang/common/core/constant/ServiceConstant.java b/chushang-common/chushang-common-core/src/main/java/com/chushang/common/core/constant/ServiceConstant.java new file mode 100644 index 0000000..6bce2a5 --- /dev/null +++ b/chushang-common/chushang-common-core/src/main/java/com/chushang/common/core/constant/ServiceConstant.java @@ -0,0 +1,48 @@ +package com.chushang.common.core.constant; + +public interface ServiceConstant { + + /** + * 系统 + */ + String SYSTEM = "system-service"; + String AUTH = "auth-service"; + /** + * 系统模块消费者组 + */ + String SYSTEM_CONSUMER_GROUP = "system_consumer_group"; + /** + * 系统模块生产者组 -- 与配置文件对应 + */ + String SYSTEM_PRODUCE_GROUP = "system_produce_group"; + /** + * 文件 + */ + String OSS = "oss-service"; + /** + * 工单 + */ + String WRK = "wrk-service"; + /** + * 文件模块消费者组 + */ + String OSS_CONSUMER_GROUP = "oss_consumer_group"; + /** + * 文件模块生产者组 + */ + String OSS_PRODUCE_GROUP = "oss_produce_group"; + /** + * 后台任务 + */ + String TASK = "task-service"; + /** + * 后台任务模块消费者组 + */ + String TASK_CONSUMER_GROUP = "task_consumer_group"; + /** + * 后台任务模块生产者组 + */ + String TASK_PRODUCE_GROUP = "task_produce_group"; + + String QUEUE_NAME = "OPERATION_LOG"; +} diff --git a/chushang-common/chushang-common-feign/src/main/java/com/chushang/common/feign/interceptor/MdcInterceptor.java b/chushang-common/chushang-common-feign/src/main/java/com/chushang/common/feign/interceptor/MdcInterceptor.java index 0b0311c..1a3d6c3 100644 --- a/chushang-common/chushang-common-feign/src/main/java/com/chushang/common/feign/interceptor/MdcInterceptor.java +++ b/chushang-common/chushang-common-feign/src/main/java/com/chushang/common/feign/interceptor/MdcInterceptor.java @@ -17,7 +17,6 @@ public class MdcInterceptor implements RequestInterceptor { } public void apply(RequestTemplate template) { - log.info("TRACE_ID : -------- {}", MDC.get(CommonConstants.TRACE_ID)); template.header(CommonConstants.TRACE_ID, MDC.get(CommonConstants.TRACE_ID)); template.header(CommonConstants.SPAN, MDC.get(CommonConstants.SPAN)); } diff --git a/chushang-common/chushang-common-log/src/main/java/com/chushang/common/log/aspect/SysLogAspect.java b/chushang-common/chushang-common-log/src/main/java/com/chushang/common/log/aspect/SysLogAspect.java index 13aa22f..0185da9 100644 --- a/chushang-common/chushang-common-log/src/main/java/com/chushang/common/log/aspect/SysLogAspect.java +++ b/chushang-common/chushang-common-log/src/main/java/com/chushang/common/log/aspect/SysLogAspect.java @@ -18,7 +18,7 @@ import com.chushang.common.core.jackson.JacksonUtils; import com.chushang.common.core.util.IPUtils; import com.chushang.common.core.util.ServletUtils; import com.chushang.common.log.annotation.SysLog; -import com.chushang.common.log.constants.LogConstant; +import com.chushang.common.core.constant.ServiceConstant; import com.chushang.common.log.entity.SysLogEntity; import com.chushang.common.log.enums.BusinessType; import com.chushang.common.log.enums.LogTypeEnum; @@ -171,7 +171,7 @@ public class SysLogAspect { assert syslog != null; if (syslog.isDatabase()){ // 入队列, 方便在system 统一查询 - RList list = redissonClient.getList(LogConstant.QUEUE_NAME); + RList list = redissonClient.getList(ServiceConstant.QUEUE_NAME); list.add(sysLogEntity); // sysLogService.save(sysLogEntity); }else { diff --git a/chushang-common/chushang-common-log/src/main/java/com/chushang/common/log/constants/LogConstant.java b/chushang-common/chushang-common-log/src/main/java/com/chushang/common/log/constants/LogConstant.java deleted file mode 100644 index 7de5e3d..0000000 --- a/chushang-common/chushang-common-log/src/main/java/com/chushang/common/log/constants/LogConstant.java +++ /dev/null @@ -1,19 +0,0 @@ -package com.chushang.common.log.constants; - -public interface LogConstant { - - /** - * 系统 - */ - String SYSTEM = "system-service"; - /** - * 文件 - */ - String OSS = "oss-service"; - /** - * 后台任务 - */ - String TASK = "task-service"; - - String QUEUE_NAME = "OPERATION_LOG"; -} diff --git a/chushang-common/chushang-common-mq/pom.xml b/chushang-common/chushang-common-mq/pom.xml index 4114092..99361f8 100644 --- a/chushang-common/chushang-common-mq/pom.xml +++ b/chushang-common/chushang-common-mq/pom.xml @@ -10,7 +10,6 @@ 4.0.0 chushang-common-mq - mq模块 diff --git a/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/annotation/CanalEventListener.java b/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/annotation/CanalEventListener.java deleted file mode 100644 index fd1f55b..0000000 --- a/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/annotation/CanalEventListener.java +++ /dev/null @@ -1,24 +0,0 @@ -package com.chushang.common.canal.annotation; - -import org.springframework.core.annotation.AliasFor; -import org.springframework.stereotype.Component; - -import java.lang.annotation.*; - -/** - * inject the present class to the spring context - * as a listener of the canal event - * - * @author chen.qian - * @date 2018/3/19 - */ -@Target({ElementType.TYPE}) -@Retention(RetentionPolicy.RUNTIME) -@Documented -@Component -public @interface CanalEventListener { - - @AliasFor(annotation = Component.class) - String value() default ""; - -} diff --git a/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/annotation/DeleteListenPoint.java b/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/annotation/DeleteListenPoint.java deleted file mode 100644 index 246e399..0000000 --- a/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/annotation/DeleteListenPoint.java +++ /dev/null @@ -1,45 +0,0 @@ -package com.chushang.common.canal.annotation; - -import com.alibaba.otter.canal.protocol.CanalEntry; -import org.springframework.core.annotation.AliasFor; - -import java.lang.annotation.*; - -/** - * ListenPoint for delete - * - * @author chen.qian - * @date 2018/3/19 - */ - -@Target({ElementType.METHOD}) -@Retention(RetentionPolicy.RUNTIME) -@Documented -@ListenPoint(eventType = CanalEntry.EventType.DELETE) -public @interface DeleteListenPoint { - - /** - * canal destination - * default for all - * @return canal destination - */ - @AliasFor(annotation = ListenPoint.class) - String destination() default ""; - - /** - * database schema which you are concentrate on - * default for all - * @return canal destination - */ - @AliasFor(annotation = ListenPoint.class) - String[] schema() default {}; - - /** - * tables which you are concentrate on - * default for all - * @return canal destination - */ - @AliasFor(annotation = ListenPoint.class) - String[] table() default {}; - -} diff --git a/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/annotation/InsertListenPoint.java b/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/annotation/InsertListenPoint.java deleted file mode 100644 index 8c477a7..0000000 --- a/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/annotation/InsertListenPoint.java +++ /dev/null @@ -1,45 +0,0 @@ -package com.chushang.common.canal.annotation; - -import com.alibaba.otter.canal.protocol.CanalEntry; -import org.springframework.core.annotation.AliasFor; - -import java.lang.annotation.*; - -/** - * ListenPoint for insert - * - * @author chen.qian - * @date 2018/3/19 - */ - -@Target({ElementType.METHOD}) -@Retention(RetentionPolicy.RUNTIME) -@Documented -@ListenPoint(eventType = CanalEntry.EventType.INSERT) -public @interface InsertListenPoint { - - /** - * canal destination - * default for all - * @return canal destination - */ - @AliasFor(annotation = ListenPoint.class) - String destination() default ""; - - /** - * database schema which you are concentrate on - * default for all - * @return canal destination - */ - @AliasFor(annotation = ListenPoint.class) - String[] schema() default {}; - - /** - * tables which you are concentrate on - * default for all - * @return canal destination - */ - @AliasFor(annotation = ListenPoint.class) - String[] table() default {}; - -} diff --git a/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/annotation/ListenPoint.java b/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/annotation/ListenPoint.java deleted file mode 100644 index a5b4d1a..0000000 --- a/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/annotation/ListenPoint.java +++ /dev/null @@ -1,48 +0,0 @@ -package com.chushang.common.canal.annotation; - -import com.alibaba.otter.canal.protocol.CanalEntry; - -import java.lang.annotation.*; - -/** - * used to indicate that method(or methods) is(are) the candidate of the - * canal event distributor - * - * @author chen.qian - * @date 2018/3/19 - */ - -@Target({ElementType.METHOD, ElementType.TYPE}) -@Retention(RetentionPolicy.RUNTIME) -@Documented -public @interface ListenPoint { - - /** - * canal destination - * default for all - * @return canal destination - */ - String destination() default ""; - - /** - * database schema which you are concentrate on - * default for all - * @return canal destination - */ - String[] schema() default {}; - - /** - * tables which you are concentrate on - * default for all - * @return canal destination - */ - String[] table() default {}; - - /** - * canal event type - * default for all - * @return canal event type - */ - CanalEntry.EventType[] eventType() default {}; - -} diff --git a/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/annotation/UpdateListenPoint.java b/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/annotation/UpdateListenPoint.java deleted file mode 100644 index 9fe7fd5..0000000 --- a/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/annotation/UpdateListenPoint.java +++ /dev/null @@ -1,45 +0,0 @@ -package com.chushang.common.canal.annotation; - -import com.alibaba.otter.canal.protocol.CanalEntry; -import org.springframework.core.annotation.AliasFor; - -import java.lang.annotation.*; - -/** - * ListenPoint for update - * - * @author chen.qian - * @date 2018/3/19 - */ - -@Target({ElementType.METHOD}) -@Retention(RetentionPolicy.RUNTIME) -@Documented -@ListenPoint(eventType = CanalEntry.EventType.UPDATE) -public @interface UpdateListenPoint { - - /** - * canal destination - * default for all - * @return canal destination - */ - @AliasFor(annotation = ListenPoint.class) - String destination() default ""; - - /** - * database schema which you are concentrate on - * default for all - * @return canal destination - */ - @AliasFor(annotation = ListenPoint.class) - String[] schema() default {}; - - /** - * tables which you are concentrate on - * default for all - * @return canal destination - */ - @AliasFor(annotation = ListenPoint.class) - String[] table() default {}; - -} diff --git a/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/client/AbstractCanalClient.java b/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/client/AbstractCanalClient.java deleted file mode 100644 index 4c6b05c..0000000 --- a/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/client/AbstractCanalClient.java +++ /dev/null @@ -1,108 +0,0 @@ -package com.chushang.common.canal.client; - -import com.alibaba.otter.canal.client.CanalConnector; -import com.alibaba.otter.canal.client.CanalConnectors; -import com.alibaba.otter.canal.protocol.exception.CanalClientException; -import com.chushang.common.canal.client.transfer.TransponderFactory; -import com.chushang.common.canal.config.CanalConfig; - -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Objects; - -public abstract class AbstractCanalClient implements CanalClient { - - /** - * running flag - */ - private volatile boolean running; - - /** - * customer config - */ - private final CanalConfig canalConfig; - - - /** - * TransponderFactory - */ - protected final TransponderFactory factory; - - AbstractCanalClient(CanalConfig canalConfig, TransponderFactory factory) { - Objects.requireNonNull(canalConfig, "canalConfig can not be null!"); - Objects.requireNonNull(canalConfig, "transponderFactory can not be null!"); - this.canalConfig = canalConfig; - this.factory = factory; - } - - @Override - public void start() { - Map instanceMap = getConfig(); - for (Map.Entry instanceEntry : instanceMap.entrySet()) { - process(processInstanceEntry(instanceEntry), instanceEntry); - } - - } - - /** - * To initialize the canal connector - * @param connector CanalConnector - * @param config config - */ - protected abstract void process(CanalConnector connector, Map.Entry config); - - private CanalConnector processInstanceEntry(Map.Entry instanceEntry) { - CanalConfig.Instance instance = instanceEntry.getValue(); - CanalConnector connector; - if (instance.isClusterEnabled()) { - List addresses = new ArrayList<>(); - for (String s : instance.getZookeeperAddress()) { - String[] entry = s.split(":"); - if (entry.length != 2) - throw new CanalClientException("error parsing zookeeper address:" + s); - addresses.add(new InetSocketAddress(entry[0], Integer.parseInt(entry[1]))); - } - connector = CanalConnectors.newClusterConnector(addresses, instanceEntry.getKey(), - instance.getUserName(), - instance.getPassword()); - } else { - connector = CanalConnectors.newSingleConnector(new InetSocketAddress(instance.getHost(), instance.getPort()), - instanceEntry.getKey(), - instance.getUserName(), - instance.getPassword()); - } - return connector; - } - - /** - * get the config - * - * @return config - */ - protected Map getConfig() { - CanalConfig config = canalConfig; - Map instanceMap; - if (config != null && (instanceMap = config.getInstances()) != null && !instanceMap.isEmpty()) { - return config.getInstances(); - } else { - throw new CanalClientException("can not get the configuration of canal client!"); - } - } - - @Override - public void stop() { - setRunning(false); - } - - @Override - public boolean isRunning() { - return running; - } - - private void setRunning(boolean running) { - this.running = running; - } -} diff --git a/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/client/CanalClient.java b/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/client/CanalClient.java deleted file mode 100644 index 05531d4..0000000 --- a/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/client/CanalClient.java +++ /dev/null @@ -1,22 +0,0 @@ -package com.chushang.common.canal.client; - -public interface CanalClient { - - /** - * open the canal client - * to get the config and connect to the canal server (1 : 1 or 1 : n) - * and then transfer the event to the special listener - * */ - void start(); - - /** - * stop the client - */ - void stop(); - - /** - * is running - * @return yes or no - */ - boolean isRunning(); -} diff --git a/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/client/ListenerPoint.java b/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/client/ListenerPoint.java deleted file mode 100644 index ed030f5..0000000 --- a/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/client/ListenerPoint.java +++ /dev/null @@ -1,25 +0,0 @@ -package com.chushang.common.canal.client; - -import com.chushang.common.canal.annotation.ListenPoint; - -import java.lang.reflect.Method; -import java.util.HashMap; -import java.util.Map; - -public class ListenerPoint { - private final Object target; - private final Map invokeMap = new HashMap<>(); - - ListenerPoint(Object target, Method method, ListenPoint anno) { - this.target = target; - this.invokeMap.put(method, anno); - } - - public Object getTarget() { - return target; - } - - public Map getInvokeMap() { - return invokeMap; - } -} diff --git a/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/client/SimpleCanalClient.java b/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/client/SimpleCanalClient.java deleted file mode 100644 index 7fd010a..0000000 --- a/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/client/SimpleCanalClient.java +++ /dev/null @@ -1,85 +0,0 @@ -package com.chushang.common.canal.client; - -import com.alibaba.otter.canal.client.CanalConnector; -import com.chushang.common.canal.annotation.CanalEventListener; -import com.chushang.common.canal.util.BeanUtil; -import com.chushang.common.canal.annotation.ListenPoint; -import com.chushang.common.canal.client.transfer.TransponderFactory; -import com.chushang.common.canal.config.CanalConfig; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.core.annotation.AnnotationUtils; - -import java.lang.reflect.Method; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.Executors; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - -public class SimpleCanalClient extends AbstractCanalClient { - private final static Logger logger = LoggerFactory.getLogger(SimpleCanalClient.class); - - /** - * executor - */ - private final ThreadPoolExecutor executor; - - /** - * listeners which are used by implementing the Interface - */ - private final List listeners = new ArrayList<>(); - - /** - * listeners which are used by annotation - */ - private final List annoListeners = new ArrayList<>(); - - public SimpleCanalClient(CanalConfig canalConfig, TransponderFactory factory) { - super(canalConfig, factory); - executor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, - 60L, TimeUnit.SECONDS, - new SynchronousQueue<>(), Executors.defaultThreadFactory()); - initListeners(); - } - - @Override - protected void process(CanalConnector connector, Map.Entry config) { - executor.submit(factory.newTransponder(connector, config, listeners, annoListeners)); - } - - @Override - public void stop() { - super.stop(); - executor.shutdown(); - } - - /** - * init listeners - */ - private void initListeners() { - logger.info("{}: initializing the listeners....", Thread.currentThread().getName()); - List list = BeanUtil.getBeansOfType(com.chushang.common.canal.event.CanalEventListener.class); - if (list != null) { - listeners.addAll(list); - } - Map listenerMap = BeanUtil.getBeansWithAnnotation(CanalEventListener.class); - if (listenerMap != null) { - for (Object target : listenerMap.values()) { - Method[] methods = target.getClass().getDeclaredMethods(); - for (Method method : methods) { - ListenPoint l = AnnotationUtils.findAnnotation(method, ListenPoint.class); - if (l != null) { - annoListeners.add(new ListenerPoint(target, method, l)); - } - } - } - } - logger.info("{}: initializing the listeners end.", Thread.currentThread().getName()); - if (logger.isWarnEnabled() && listeners.isEmpty() && annoListeners.isEmpty()) { - logger.warn("{}: No listener found in context! ", Thread.currentThread().getName()); - } - } -} diff --git a/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/client/transfer/AbstractBasicMessageTransponder.java b/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/client/transfer/AbstractBasicMessageTransponder.java deleted file mode 100644 index 6361bf3..0000000 --- a/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/client/transfer/AbstractBasicMessageTransponder.java +++ /dev/null @@ -1,158 +0,0 @@ -package com.chushang.common.canal.client.transfer; - -import com.alibaba.otter.canal.client.CanalConnector; -import com.alibaba.otter.canal.protocol.CanalEntry; -import com.alibaba.otter.canal.protocol.exception.CanalClientException; -import com.chushang.common.canal.config.CanalConfig; -import com.chushang.common.canal.event.CanalEventListener; -import com.chushang.common.canal.annotation.ListenPoint; -import com.chushang.common.canal.client.ListenerPoint; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.lang.reflect.Method; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.function.Predicate; - -public abstract class AbstractBasicMessageTransponder extends AbstractMessageTransponder { - - private final static Logger logger = LoggerFactory.getLogger(AbstractBasicMessageTransponder.class); - - public AbstractBasicMessageTransponder(CanalConnector connector, Map.Entry config, List listeners, List annoListeners) { - super(connector, config, listeners, annoListeners); - } - - - @Override - protected void distributeEvent(List entryList) { - for (CanalEntry.Entry entry : entryList) { - //ignore the transaction operations - List ignoreEntryTypes = getIgnoreEntryTypes(); - if (ignoreEntryTypes != null - && ignoreEntryTypes.stream().anyMatch(t -> entry.getEntryType() == t)) { - continue; - } - CanalEntry.RowChange rowChange; - try { - rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); - } catch (Exception e) { - throw new CanalClientException("ERROR ## parser of event has an error , data:" + entry.toString(), - e); - } - //ignore the ddl operation - if (rowChange.hasIsDdl() && rowChange.getIsDdl()) { - processDdl(rowChange); - continue; - } - CanalEntry.Header header = entry.getHeader(); - String tableName = header.getTableName(); - String schemaName = header.getSchemaName(); - for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) { - //distribute to listener interfaces - distributeByImpl(rowChange.getEventType(),schemaName ,tableName, rowData); - //distribute to annotation listener interfaces - distributeByAnnotation(destination, - schemaName, - tableName, - rowChange.getEventType(), - rowData); - } - } - } - - /** - * process the ddl event - * @param rowChange rowChange - */ - protected void processDdl(CanalEntry.RowChange rowChange) {} - - /** - * distribute to listener interfaces - * - * @param eventType eventType - * @param schemaName 指定数据库名称 --> 一般由filter 指定, 此处不应当判断 - * @param tableName 指定数据表名 - * @param rowData rowData - */ - protected void distributeByImpl(CanalEntry.EventType eventType, String schemaName, String tableName, CanalEntry.RowData rowData) { - logger.info("schemaName : {}, tableName : {}", schemaName, tableName); - if (listeners != null) { - for (CanalEventListener listener : listeners) { - if (tableName.equals(listener.tableName()) && schemaName.equals(listener.schemaName())){ - listener.onEvent(eventType, rowData); - } - } - } - } - - /** - * distribute to annotation listener interfaces - * - * @param destination destination - * @param schemaName schema - * @param tableName table name - * @param eventType event type - * @param rowData row data - */ - protected void distributeByAnnotation(String destination, - String schemaName, - String tableName, - CanalEntry.EventType eventType, - CanalEntry.RowData rowData) { - //invoke the listeners - annoListeners.forEach(point -> point - .getInvokeMap() - .entrySet() - .stream() - .filter(getAnnotationFilter(destination, schemaName, tableName, eventType)) - .forEach(entry -> { - Method method = entry.getKey(); - method.setAccessible(true); - try { - Object[] args = getInvokeArgs(method, eventType, rowData); - method.invoke(point.getTarget(), args); - } catch (Exception e) { - logger.error("{}: Error occurred when invoke the listener's interface! class:{}, method:{}", - Thread.currentThread().getName(), - point.getTarget().getClass().getName(), method.getName()); - } - })); - } - - /** - * get the filters predicate - * - * @param destination destination - * @param schemaName schema - * @param tableName table name - * @param eventType event type - * @return predicate - */ - protected abstract Predicate> getAnnotationFilter(String destination, - String schemaName, - String tableName, - CanalEntry.EventType eventType); - - /** - * get the args - * - * @param method method - * @param eventType event type - * @param rowData row data - * @return args which will be used by invoking the annotation methods - */ - protected abstract Object[] getInvokeArgs(Method method, CanalEntry.EventType eventType, - CanalEntry.RowData rowData); - - /** - * get the ignore eventType list - * - * @return eventType list - */ - protected List getIgnoreEntryTypes() { - return Collections.emptyList(); - } - -} diff --git a/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/client/transfer/AbstractMessageTransponder.java b/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/client/transfer/AbstractMessageTransponder.java deleted file mode 100644 index 1cc73c1..0000000 --- a/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/client/transfer/AbstractMessageTransponder.java +++ /dev/null @@ -1,157 +0,0 @@ -package com.chushang.common.canal.client.transfer; - -import com.alibaba.otter.canal.client.CanalConnector; -import com.alibaba.otter.canal.protocol.CanalEntry; -import com.alibaba.otter.canal.protocol.Message; -import com.alibaba.otter.canal.protocol.exception.CanalClientException; -import com.chushang.common.canal.client.ListenerPoint; -import com.chushang.common.canal.config.CanalConfig; -import com.chushang.common.canal.event.CanalEventListener; -import com.chushang.common.core.util.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Objects; - -public abstract class AbstractMessageTransponder extends MessageTransponder { - - /** - * canal connector - */ - private final CanalConnector connector; - - /** - * custom config - */ - protected final CanalConfig.Instance config; - - /** - * destination of canal server - */ - protected final String destination; - - /** - * listeners which are used by implementing the Interface - */ - protected final List listeners = new ArrayList<>(); - - /** - * listeners which are used by annotation - */ - protected final List annoListeners = new ArrayList<>(); - - /** - * running flag - */ - private volatile boolean running = true; - - private static final Logger logger = LoggerFactory.getLogger(AbstractMessageTransponder.class); - - public AbstractMessageTransponder(CanalConnector connector, - Map.Entry config, - List listeners, - List annoListeners) { - Objects.requireNonNull(connector, "connector can not be null!"); - Objects.requireNonNull(config, "config can not be null!"); - this.connector = connector; - this.destination = config.getKey(); - this.config = config.getValue(); - if (listeners != null) - this.listeners.addAll(listeners); - if (annoListeners != null) - this.annoListeners.addAll(annoListeners); - } - - @Override - public void run() { - // 在 run 时 才进行连接 - connect(); - int errorCount = config.getRetryCount(); - final long interval = config.getAcquireInterval(); - final String threadName = Thread.currentThread().getName(); - boolean interrupted = Thread.currentThread().isInterrupted(); - do { - try { - Message message = connector.getWithoutAck(config.getBatchSize()); - long batchId = message.getId(); - int size = message.getEntries().size(); - if (logger.isDebugEnabled()) { - logger.debug("{}: Get message from canal server >>>>> size:{}", threadName, size); - } - //empty message - if (batchId == -1 || size == 0) { - if (logger.isDebugEnabled()) { - logger.debug("{}: Empty message... sleep for {} millis", threadName, interval); - } - Thread.sleep(interval); - } else { - distributeEvent(message.getEntries()); - } - - // commit ack - connector.ack(batchId); - if (logger.isDebugEnabled()) { - logger.debug("{}: Ack message. batchId:{}", threadName, batchId); - } - } catch (CanalClientException e) { - errorCount--; - logger.error(threadName + ": Error occurred!! ", e); - try { - Thread.sleep(interval); - } catch (InterruptedException e1) { - errorCount = 0; - } - } catch (InterruptedException e) { - errorCount = 0; - connector.rollback(); - } finally { - // 重试次数 为0, 不在进行重试, 等待重新连接 - if (errorCount <= 0) { - stop(); - logger.info("{}: Topping the client.. ", Thread.currentThread().getName()); - } - } - } while ((running && !interrupted)); - } - - protected abstract void distributeEvent(List entryList); - - private void connect() { - connector.connect(); - // 此处 添加 过滤 -> - if (StringUtils.isNotEmpty(config.getFilter())) { - connector.subscribe(config.getFilter()); - } else { - connector.subscribe(); - } - connector.rollback(); - - logger.info("connector is connect"); - } - - /** - * stop running - */ - void stop() { - // 此处应当就是失败了, 需要停止进行重试 - logger.info("{}: client stopped. ", Thread.currentThread().getName()); - running = false; - if (null == connector) { - return; - } - // 停止时, 关闭连接 - connector.disconnect(); - // 说明失败重试 - if (config.isErrorRetry()) { - logger.info("connector restart"); - - long errorRetryTime = config.getErrorRetryTime(); - running = true; - // 延时执行 - timer.schedule(this, errorRetryTime); - } - } -} diff --git a/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/client/transfer/DefaultMessageTransponder.java b/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/client/transfer/DefaultMessageTransponder.java deleted file mode 100644 index 16429fb..0000000 --- a/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/client/transfer/DefaultMessageTransponder.java +++ /dev/null @@ -1,68 +0,0 @@ -package com.chushang.common.canal.client.transfer; - -import com.alibaba.otter.canal.client.CanalConnector; -import com.alibaba.otter.canal.protocol.CanalEntry; -import com.chushang.common.canal.config.CanalConfig; -import com.chushang.common.canal.annotation.ListenPoint; -import com.chushang.common.canal.client.ListenerPoint; -import com.chushang.common.canal.event.CanalEventListener; -import com.chushang.common.core.util.StringUtils; - -import java.lang.reflect.Method; -import java.util.Arrays; -import java.util.List; -import java.util.Map; -import java.util.function.Predicate; - -public class DefaultMessageTransponder extends AbstractBasicMessageTransponder { - - public DefaultMessageTransponder(CanalConnector connector, - Map.Entry config, - List listeners, - List annoListeners) { - super(connector, config, listeners, annoListeners); - } - - /** - * get the filters predicate - * - * @param destination destination - * @param schemaName schema - * @param tableName table name - * @param eventType event type - * @return predicate - */ - @Override - protected Predicate> getAnnotationFilter(String destination, - String schemaName, - String tableName, - CanalEntry.EventType eventType) { - Predicate> df = e -> StringUtils.isEmpty(e.getValue().destination()) - || e.getValue().destination().equals(destination); - - Predicate> sf = e -> e.getValue().schema().length == 0 - || Arrays.asList(e.getValue().schema()).contains(schemaName); - - Predicate> tf = e -> e.getValue().table().length == 0 - || Arrays.asList(e.getValue().table()).contains(tableName); - - Predicate> ef = e -> (e.getValue().eventType().length == 0) - || Arrays.stream(e.getValue().eventType()).anyMatch(ev -> ev == eventType); - return df.and(sf).and(tf).and(ef); - } - - @Override - protected Object[] getInvokeArgs(Method method, CanalEntry.EventType eventType, CanalEntry.RowData rowData) { - return Arrays.stream(method.getParameterTypes()) - .map(p -> p == CanalEntry.EventType.class - ? eventType - : p == CanalEntry.RowData.class - ? rowData : null) - .toArray(); - } - - @Override - protected List getIgnoreEntryTypes() { - return Arrays.asList(CanalEntry.EntryType.TRANSACTIONBEGIN, CanalEntry.EntryType.TRANSACTIONEND, CanalEntry.EntryType.HEARTBEAT); - } -} diff --git a/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/client/transfer/DefaultTransponderFactory.java b/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/client/transfer/DefaultTransponderFactory.java deleted file mode 100644 index 8532ff3..0000000 --- a/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/client/transfer/DefaultTransponderFactory.java +++ /dev/null @@ -1,17 +0,0 @@ -package com.chushang.common.canal.client.transfer; - -import com.alibaba.otter.canal.client.CanalConnector; -import com.chushang.common.canal.config.CanalConfig; -import com.chushang.common.canal.client.ListenerPoint; -import com.chushang.common.canal.event.CanalEventListener; - -import java.util.List; -import java.util.Map; - -public class DefaultTransponderFactory implements TransponderFactory { - @Override - public MessageTransponder newTransponder(CanalConnector connector, Map.Entry config, List listeners, - List annoListeners) { - return new DefaultMessageTransponder(connector, config, listeners, annoListeners); - } -} diff --git a/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/client/transfer/MessageTransponder.java b/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/client/transfer/MessageTransponder.java deleted file mode 100644 index 50a1bed..0000000 --- a/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/client/transfer/MessageTransponder.java +++ /dev/null @@ -1,10 +0,0 @@ -package com.chushang.common.canal.client.transfer; - -import java.util.Timer; -import java.util.TimerTask; - -public abstract class MessageTransponder extends TimerTask { - - public static final Timer timer = new Timer(); - -} diff --git a/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/client/transfer/MessageTransponders.java b/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/client/transfer/MessageTransponders.java deleted file mode 100644 index 939beea..0000000 --- a/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/client/transfer/MessageTransponders.java +++ /dev/null @@ -1,9 +0,0 @@ -package com.chushang.common.canal.client.transfer; - -public class MessageTransponders { - - public static TransponderFactory defaultMessageTransponder() { - return new DefaultTransponderFactory(); - } - -} diff --git a/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/client/transfer/TransponderFactory.java b/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/client/transfer/TransponderFactory.java deleted file mode 100644 index 79a9f3d..0000000 --- a/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/client/transfer/TransponderFactory.java +++ /dev/null @@ -1,22 +0,0 @@ -package com.chushang.common.canal.client.transfer; - -import com.alibaba.otter.canal.client.CanalConnector; -import com.chushang.common.canal.client.ListenerPoint; -import com.chushang.common.canal.config.CanalConfig; -import com.chushang.common.canal.event.CanalEventListener; - -import java.util.List; -import java.util.Map; - -public interface TransponderFactory { - - /** - * @param connector connector - * @param config config - * @param listeners listeners - * @param annoListeners annoListeners - * @return MessageTransponder - */ - MessageTransponder newTransponder(CanalConnector connector, Map.Entry config, List listeners, - List annoListeners); -} diff --git a/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/config/CanalClientAutoConfiguration.java b/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/config/CanalClientAutoConfiguration.java deleted file mode 100644 index 7b29e8a..0000000 --- a/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/config/CanalClientAutoConfiguration.java +++ /dev/null @@ -1,43 +0,0 @@ -package com.chushang.common.canal.config; - - -import com.chushang.common.canal.client.SimpleCanalClient; -import com.chushang.common.canal.client.transfer.MessageTransponders; -import com.chushang.common.canal.util.BeanUtil; -import com.chushang.common.canal.client.CanalClient; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.context.annotation.Bean; -import org.springframework.core.Ordered; -import org.springframework.core.annotation.Order; - - -public class CanalClientAutoConfiguration { - - private final static Logger logger = LoggerFactory.getLogger(CanalClientAutoConfiguration.class); - - @Autowired - private CanalConfig canalConfig; - - @Bean - @Order(Ordered.HIGHEST_PRECEDENCE) - public BeanUtil beanUtil() { - return new BeanUtil(); - } - - @Bean - private CanalClient canalClient() { - - CanalClient canalClient = - new SimpleCanalClient(canalConfig, MessageTransponders.defaultMessageTransponder()); - - canalClient.start(); - - logger.info("Starting canal client...."); - - return canalClient; - } - - -} diff --git a/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/config/CanalConfig.java b/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/config/CanalConfig.java deleted file mode 100644 index c3c34dc..0000000 --- a/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/config/CanalConfig.java +++ /dev/null @@ -1,200 +0,0 @@ -package com.chushang.common.canal.config; - -import org.springframework.boot.context.properties.ConfigurationProperties; -import org.springframework.cloud.context.config.annotation.RefreshScope; -import org.springframework.context.annotation.Configuration; -import org.springframework.core.Ordered; -import org.springframework.core.annotation.Order; - -import java.util.LinkedHashMap; -import java.util.LinkedHashSet; -import java.util.Map; -import java.util.Set; - -@Order(Ordered.HIGHEST_PRECEDENCE) -@RefreshScope -@Configuration -@ConfigurationProperties(prefix = "canal.client") -public class CanalConfig { - - /** - * instance config - */ - private Map instances = new LinkedHashMap<>(); - - public Map getInstances() { - return instances; - } - - public void setInstances(Map instances) { - this.instances = instances; - } - - /** - * instance config class - */ - public static class Instance { - - /** - * is cluster-mod - */ - private boolean clusterEnabled; - /** - * zookeeper address - */ - private Set zookeeperAddress = new LinkedHashSet<>(); - - /** - * canal server host - */ - private String host = "127.0.0.1"; - - /** - * canal server port - */ - private int port = 10001; - - /** - * canal user name - */ - private String userName = ""; - - /** - * canal password - */ - private String password = ""; - - /** - * size when get messages from the canal server - */ - private int batchSize = 1000; - - /** - * filter - */ - private String filter; - - /** - * retry count when error occurred - * 单次异常 的重试次数 - */ - private int retryCount = 5; - - /** - * 失败后重新启动 默认不进行失败重新连接 - */ - private boolean errorRetry = false; - - /** - * 失败后间隔多长时间进行重启 - * 单位 ms - * 默认10分钟 一次进行重新连接 - */ - private long errorRetryTime = 10 * 60 * 1000; - - /** - * interval of the message-acquiring - */ - private long acquireInterval = 1000; - - public Instance() {} - - public boolean isClusterEnabled() { - return clusterEnabled; - } - - public void setClusterEnabled(boolean clusterEnabled) { - this.clusterEnabled = clusterEnabled; - } - - public Set getZookeeperAddress() { - return zookeeperAddress; - } - - public void setZookeeperAddress(Set zookeeperAddress) { - this.zookeeperAddress = zookeeperAddress; - } - - public String getHost() { - return host; - } - - public void setHost(String host) { - this.host = host; - } - - public int getPort() { - return port; - } - - public void setPort(int port) { - this.port = port; - } - - public String getUserName() { - return userName; - } - - public void setUserName(String userName) { - this.userName = userName; - } - - public String getPassword() { - return password; - } - - public void setPassword(String password) { - this.password = password; - } - - public int getBatchSize() { - return batchSize; - } - - public void setBatchSize(int batchSize) { - this.batchSize = batchSize; - } - - public String getFilter() { - return filter; - } - - public void setFilter(String filter) { - this.filter = filter; - } - - public int getRetryCount() { - return retryCount; - } - - public void setRetryCount(int retryCount) { - this.retryCount = retryCount; - } - - public long getAcquireInterval() { - return acquireInterval; - } - - public void setAcquireInterval(long acquireInterval) { - this.acquireInterval = acquireInterval; - } - - public boolean isErrorRetry() { - return errorRetry; - } - - public void setErrorRetry(boolean errorRetry) { - this.errorRetry = errorRetry; - } - - public long getErrorRetryTime() { - return errorRetryTime; - } - - public void setErrorRetryTime(long errorRetryTime) { - this.errorRetryTime = errorRetryTime; - } - - } - -} diff --git a/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/event/CanalEventListener.java b/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/event/CanalEventListener.java deleted file mode 100644 index 01b0fcf..0000000 --- a/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/event/CanalEventListener.java +++ /dev/null @@ -1,52 +0,0 @@ -package com.chushang.common.canal.event; - -import com.alibaba.otter.canal.protocol.CanalEntry; - -import java.util.Objects; - -public interface CanalEventListener { - - default void onEvent(CanalEntry.EventType eventType, CanalEntry.RowData rowData) { - Objects.requireNonNull(eventType); - switch (eventType) { - case INSERT: - onInsert(rowData); - break; - case UPDATE: - onUpdate(rowData); - break; - case DELETE: - onDelete(rowData); - break; - default: - break; - } - } - - - /** - * fired on insert event - * - * @param rowData rowData - */ - void onInsert(CanalEntry.RowData rowData); - - /** - * fired on update event - * - * @param rowData rowData - */ - void onUpdate(CanalEntry.RowData rowData); - - /** - * fired on delete event - * - * @param rowData rowData - */ - void onDelete(CanalEntry.RowData rowData); - - String tableName(); - - String schemaName(); - -} diff --git a/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/util/BeanUtil.java b/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/util/BeanUtil.java deleted file mode 100644 index c946c52..0000000 --- a/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/util/BeanUtil.java +++ /dev/null @@ -1,52 +0,0 @@ -package com.chushang.common.canal.util; - -import org.springframework.beans.BeansException; -import org.springframework.context.ApplicationContext; -import org.springframework.context.ApplicationContextAware; -import org.springframework.stereotype.Component; - -import java.lang.annotation.Annotation; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; - -@Component -public class BeanUtil implements ApplicationContextAware { - - private static ApplicationContext applicationContext; - - @Override - public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { - BeanUtil.applicationContext = applicationContext; - } - - public static T getBean(Class clazz) { - T obj; - try { - obj = applicationContext.getBean(clazz); - } catch (Exception e) { - obj = null; - } - return obj; - } - - public static List getBeansOfType(Class clazz) { - Map map; - try { - map = applicationContext.getBeansOfType(clazz); - } catch (Exception e) { - map = null; - } - return map == null ? null : new ArrayList<>(map.values()); - } - - public static Map getBeansWithAnnotation(Class anno) { - Map map; - try { - map = applicationContext.getBeansWithAnnotation(anno); - } catch (Exception e) { - map = null; - } - return map; - } -} \ No newline at end of file diff --git a/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/mq/produce/MqProduceService.java b/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/mq/produce/MqProduceService.java new file mode 100644 index 0000000..f05f791 --- /dev/null +++ b/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/mq/produce/MqProduceService.java @@ -0,0 +1,110 @@ +package com.chushang.common.mq.produce; + +import apache.rocketmq.v2.Message; +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.client.producer.SendCallback; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.spring.core.RocketMQTemplate; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; +import javax.validation.constraints.NotBlank; +import javax.validation.constraints.NotNull; + +/** + * @auther: zhao + * @date: 2024/6/12 14:18 + */ +@Component +public class MqProduceService { + + @Resource + RocketMQTemplate rocketMQTemplate; + + private String getDesi(String topic, String tag){ + if (StringUtils.isNotBlank(tag)) topic = topic + ":" + tag; + return topic; + } + + /** + * 发送单向消息, 不关心发送结果 + */ + public void sendOneWayMsg(@NotBlank String topic,@NotBlank T message){ + rocketMQTemplate.sendOneWay(topic, MessageBuilder.withPayload(message).build()); + } + + /** + * 发送单向消息, 不关心发送结果 + */ + public void sendOneWayMsg(@NotBlank String topic,String tag,@NotBlank T message){ + rocketMQTemplate.sendOneWay(getDesi(topic, tag), MessageBuilder.withPayload(message).build()); + } + + /** + * 发送 topic 消息 + */ + public void send(@NotBlank String topic, @NotBlank T message){ + rocketMQTemplate.send(topic, MessageBuilder.withPayload(message).build()); + } + + /** + * 带tag 的消息 + * tag 可为空 + */ + public void send(@NotBlank String topic,String tag, @NotBlank T message){ + rocketMQTemplate.send(getDesi(topic, tag), MessageBuilder.withPayload(message).build()); + } + + /** + * 发送同步消息 + */ + public SendResult sendMsg(@NotBlank String topic,@NotBlank T message){ + return rocketMQTemplate.syncSend(topic, MessageBuilder.withPayload(message) + + .build()); + } + + /** + * 发送同步消息 + */ + public SendResult sendMsg(@NotBlank String topic,String tag,@NotBlank T message){ + return rocketMQTemplate.syncSend(getDesi(topic, tag), MessageBuilder.withPayload(message).build()); + } + + /** + * 发送异步消息 + */ + public void sendAsyncMsg(@NotBlank String topic,@NotBlank T message, @NotNull SendCallback sendCallback){ + rocketMQTemplate.asyncSend(topic, MessageBuilder.withPayload(message).build(),sendCallback); + } + + /** + * 发送异步消息 + */ + public void sendAsyncMsg(@NotBlank String topic,String tag,@NotBlank T message, @NotNull SendCallback sendCallback){ + rocketMQTemplate.asyncSend(getDesi(topic, tag), MessageBuilder.withPayload(message).build(),sendCallback); + } + + /** + * 发送延时消息 + * 延时消息一共分为18个等级1-18分别含义为:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h + * delayLevel: 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h + * messageTimeOut: 超时时间, 用于重试 + */ + public void sendDelayMsg(@NotBlank String topic, @NotBlank T message, int messageTimeOut,int delayLevel){ + rocketMQTemplate.syncSend(topic, MessageBuilder.withPayload(message).build(),messageTimeOut,delayLevel); + } + + /** + * 发送延时消息 + * 延时消息一共分为18个等级分别为:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h + * delayLevel: 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h + * messageTimeOut: 超时时间, 用于重试 + */ + public void sendDelayMsg(@NotBlank String topic,String tag, @NotBlank T message,int messageTimeOut, int delayLevel){ + rocketMQTemplate.syncSend(getDesi(topic, tag), MessageBuilder.withPayload(message).build(),messageTimeOut,delayLevel); + } + + +} diff --git a/chushang-common/chushang-common-mq/src/main/resources/META-INF/spring.factories b/chushang-common/chushang-common-mq/src/main/resources/META-INF/spring.factories index 7acf598..cb21931 100644 --- a/chushang-common/chushang-common-mq/src/main/resources/META-INF/spring.factories +++ b/chushang-common/chushang-common-mq/src/main/resources/META-INF/spring.factories @@ -1,3 +1,2 @@ org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ - com.chushang.common.canal.config.CanalConfig, \ - com.chushang.common.canal.config.CanalClientAutoConfiguration + com.chushang.common.mq.produce.MqProduceService diff --git a/chushang-modules/chushang-module-oss/oss-feign/pom.xml b/chushang-modules/chushang-module-oss/oss-feign/pom.xml index e0abcc5..36e0734 100644 --- a/chushang-modules/chushang-module-oss/oss-feign/pom.xml +++ b/chushang-modules/chushang-module-oss/oss-feign/pom.xml @@ -34,5 +34,9 @@ com.chushang chushang-common-data-scope + + com.chushang + task-feign + diff --git a/chushang-modules/chushang-module-oss/oss-service/src/main/java/com/chushang/oss/consumer/TaskConsumerService.java b/chushang-modules/chushang-module-oss/oss-service/src/main/java/com/chushang/oss/consumer/TaskConsumerService.java new file mode 100644 index 0000000..a2a24f0 --- /dev/null +++ b/chushang-modules/chushang-module-oss/oss-service/src/main/java/com/chushang/oss/consumer/TaskConsumerService.java @@ -0,0 +1,101 @@ +package com.chushang.oss.consumer; + +import com.chushang.common.core.constant.SecurityConstants; +import com.chushang.common.core.constant.ServiceConstant; +import com.chushang.common.core.web.AjaxResult; +import com.chushang.common.core.web.Result; +import com.chushang.task.entity.TaskInfo; +import com.chushang.task.entity.dto.UpdateTaskDTO; +import com.chushang.task.enums.TaskStatusEnum; +import com.chushang.task.enums.TaskTypeEnum; +import com.chushang.task.feign.RemoteTaskService; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.apache.rocketmq.spring.core.RocketMQListener; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; +import java.lang.reflect.Constructor; +import java.lang.reflect.Method; +import java.time.LocalDateTime; +import java.util.Objects; + +/** + * @auther: zhao + * @date: 2024/6/12 16:59 + */ +@Slf4j +@Component +@RocketMQMessageListener(topic = ServiceConstant.OSS, consumerGroup = ServiceConstant.OSS_CONSUMER_GROUP) +public class TaskConsumerService implements RocketMQListener { + @Resource + RemoteTaskService remoteTaskService; + /** + * 执行 下发的 后台任务 + */ + @Override + public void onMessage(TaskInfo message) { + remoteTaskService.updateTask(message.getTaskId(), UpdateTaskDTO.builder() + .taskStatus(TaskStatusEnum.EXECUTING) + .build(), SecurityConstants.INNER); + // 通过 反射 去执行具体方法 + String className = message.getClassName(); + String methodName = message.getMethodName(); + String params = message.getParams(); + Integer taskType = message.getTaskType(); + UpdateTaskDTO updateTask = new UpdateTaskDTO(); + try { + // 包名+类名 + Class classzz = Class.forName(className); + // 获取构造器对象 + Constructor constructor = classzz.getConstructor(); + // 利用构造器对象创建一个对象 + Object o = constructor.newInstance(); + // 传递需要执行的方法 + Method method = classzz.getMethod(methodName, String.class); + // 调用的方法有多个参数 Method method = classzz.getMethod("class1method",long.class,String.class,String.class); + // 如果调用的方法有参数 invoke(o,param1,param2,param3,...) + updateTask.setLastRunTime(LocalDateTime.now()); + Object invoke = method.invoke(o, params); + if (invoke instanceof AjaxResult result){ + Integer code = (Integer) result.get(AjaxResult.DATA_TAG); + if (result.isSuccess() && 200 == code){ + updateTask.setTaskStatus(TaskStatusEnum.EXECUTE_SUCCESS); + String data = (String)result.get(AjaxResult.DATA_TAG); + updateTask.setLastRunResult("success"); + if (Objects.equals(taskType, TaskTypeEnum.DOWN.getCode())){ + updateTask.setRemark(data); + } + }else { + String errMsg = (String)result.get(AjaxResult.MSG_TAG); + updateTask.setTaskStatus(TaskStatusEnum.EXECUTE_FAIL); + updateTask.setLastRunResult(errMsg); + } + }else if (invoke instanceof Result result){ + if (result.isSuccess() && result.getCode() == 200){ + updateTask.setTaskStatus(TaskStatusEnum.EXECUTE_SUCCESS); + String data = (String)result.getData(); + updateTask.setLastRunResult("success"); + if (Objects.equals(taskType, TaskTypeEnum.DOWN.getCode())){ + updateTask.setRemark(data); + } + }else { + String errMsg = result.getMsg(); + updateTask.setTaskStatus(TaskStatusEnum.EXECUTE_FAIL); + updateTask.setLastRunResult(errMsg); + } + } + }catch (Exception e){ + log.error("执行失败", e); + updateTask.setTaskStatus(TaskStatusEnum.EXECUTE_FAIL); + StringBuilder stringBuilder = new StringBuilder(); + stringBuilder.append(e).append("\n"); + for (StackTraceElement s : e.getStackTrace()) { + stringBuilder.append(s.toString()).append("\n"); + } + updateTask.setLastRunResult(stringBuilder.toString()); + } + // 执行完毕后 更新 任务状态 + remoteTaskService.updateTask(message.getTaskId(), updateTask, SecurityConstants.INNER); + } +} diff --git a/chushang-modules/chushang-module-oss/oss-service/src/main/resources/application.yml b/chushang-modules/chushang-module-oss/oss-service/src/main/resources/application.yml index b82054f..c567d0b 100644 --- a/chushang-modules/chushang-module-oss/oss-service/src/main/resources/application.yml +++ b/chushang-modules/chushang-module-oss/oss-service/src/main/resources/application.yml @@ -93,4 +93,32 @@ management: # 日志监听配置 -- 如果启用的话,需要配置 kafka 的路径 logging: config: classpath:logback-nacos.xml - +rocketmq: + name-server: ${config.rocketmq.name-server} + producer: + # 生产者分组 + group: ${config.rocketmq.producer.group} + # 发送消息超时时间, 单位毫秒, 默认3000 + send-message-timeout: 10000 + # 消息压缩阈值, 当消息体的大小超过该阈值时进行消息压缩,默认为4*1024B + compress-message-body-threshold: 4096 + # 消息体的最大允许大小,默认为4*1024*1024B + max-message-size: 4194304 + # 同步发送消息时,失败重试次数,默认2 + retry-times-when-send-failed: 2 + # 异步发送消息时,失败重试次数,默认2 + retry-times-when-send-async-failed: 2 + # 发送消息给broker 时,发送失败是否重试另一台broker, 默认为false + retry-next-server: false + access-key: # Access Key ,可阅读 https://github.com/apache/rocketmq/blob/master/docs/cn/acl/user_guide.md 文档 + secret-key: # Secret Key + # 是否开启消息轨迹功能,默认为false 可阅读 https://github.com/apache/rocketmq/blob/master/docs/cn/msg_trace/user_guide.md 文档 + enable-msg-trace: false + # 自定义消息轨迹的 Topic 。默认为 RMQ_SYS_TRACE_TOPIC 。 + customized-trace-topic: RMQ_SYS_TRACE_TOPIC + consumer: + # 配置某个消费分组,是否监听指定 Topic 。结构为 Map<消费者分组, > 。默认情况下,不配置表示监听。 + listeners: + # 关闭 test-consumer-group 对 topic1 的监听消费 + test-consumer-group: + topic1: false # 关闭 test-consumer-group 对 topic1 的监听消费 diff --git a/chushang-modules/chushang-module-oss/pom.xml b/chushang-modules/chushang-module-oss/pom.xml index f100b66..d660ef7 100644 --- a/chushang-modules/chushang-module-oss/pom.xml +++ b/chushang-modules/chushang-module-oss/pom.xml @@ -29,6 +29,11 @@ oss-feign 1.0.0 + + com.chushang + task-feign + 1.0.0 + diff --git a/chushang-modules/chushang-module-system/pom.xml b/chushang-modules/chushang-module-system/pom.xml index e86488e..68ca0f6 100644 --- a/chushang-modules/chushang-module-system/pom.xml +++ b/chushang-modules/chushang-module-system/pom.xml @@ -31,6 +31,11 @@ system-feign 1.0.0 + + com.chushang + task-feign + 1.0.0 + diff --git a/chushang-modules/chushang-module-system/system-feign/pom.xml b/chushang-modules/chushang-module-system/system-feign/pom.xml index a270f9f..0821fc4 100644 --- a/chushang-modules/chushang-module-system/system-feign/pom.xml +++ b/chushang-modules/chushang-module-system/system-feign/pom.xml @@ -37,5 +37,9 @@ com.chushang chushang-common-dict + + com.chushang + task-feign + diff --git a/chushang-modules/chushang-module-system/system-feign/src/main/resources/META-INF/spring.factories b/chushang-modules/chushang-module-system/system-feign/src/main/resources/META-INF/spring.factories deleted file mode 100644 index 90344de..0000000 --- a/chushang-modules/chushang-module-system/system-feign/src/main/resources/META-INF/spring.factories +++ /dev/null @@ -1 +0,0 @@ -com.chushang.common.feign.ChushangFeignAutoConfiguration= diff --git a/chushang-modules/chushang-module-system/system-service/src/main/java/com/chushang/system/component/LogRedisComponent.java b/chushang-modules/chushang-module-system/system-service/src/main/java/com/chushang/system/component/LogRedisComponent.java index 6718c21..81e5fd0 100644 --- a/chushang-modules/chushang-module-system/system-service/src/main/java/com/chushang/system/component/LogRedisComponent.java +++ b/chushang-modules/chushang-module-system/system-service/src/main/java/com/chushang/system/component/LogRedisComponent.java @@ -1,6 +1,6 @@ package com.chushang.system.component; -import com.chushang.common.log.constants.LogConstant; +import com.chushang.common.core.constant.ServiceConstant; import com.chushang.common.log.entity.SysLogEntity; import com.chushang.system.service.SysLogService; import lombok.extern.slf4j.Slf4j; @@ -69,7 +69,7 @@ public class LogRedisComponent { Thread.currentThread().interrupt(); } if (!redissonClient.isShutdown()){ - RQueue list = redissonClient.getQueue(LogConstant.QUEUE_NAME); + RQueue list = redissonClient.getQueue(ServiceConstant.QUEUE_NAME); List reqList = list.poll(5000); logService.saveBatch(reqList); }else { diff --git a/chushang-modules/chushang-module-system/system-service/src/main/java/com/chushang/system/consumer/TaskConsumerService.java b/chushang-modules/chushang-module-system/system-service/src/main/java/com/chushang/system/consumer/TaskConsumerService.java new file mode 100644 index 0000000..cc197c2 --- /dev/null +++ b/chushang-modules/chushang-module-system/system-service/src/main/java/com/chushang/system/consumer/TaskConsumerService.java @@ -0,0 +1,102 @@ +package com.chushang.system.consumer; + +import com.alibaba.fastjson2.JSON; +import com.chushang.common.core.constant.SecurityConstants; +import com.chushang.common.core.constant.ServiceConstant; +import com.chushang.common.core.web.AjaxResult; +import com.chushang.common.core.web.Result; +import com.chushang.task.entity.TaskInfo; +import com.chushang.task.entity.dto.UpdateTaskDTO; +import com.chushang.task.enums.TaskStatusEnum; +import com.chushang.task.enums.TaskTypeEnum; +import com.chushang.task.feign.RemoteTaskService; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.apache.rocketmq.spring.core.RocketMQListener; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; +import java.lang.reflect.Constructor; +import java.lang.reflect.Method; +import java.time.LocalDateTime; +import java.util.Objects; + +/** + * @auther: zhao + * @date: 2024/6/12 16:59 + */ +@Slf4j +@Component +@RocketMQMessageListener(topic = ServiceConstant.SYSTEM, consumerGroup = ServiceConstant.SYSTEM_CONSUMER_GROUP) +public class TaskConsumerService implements RocketMQListener { + @Resource + RemoteTaskService remoteTaskService; + /** + * 执行 下发的 后台任务 + */ + @Override + public void onMessage(TaskInfo message) { + remoteTaskService.updateTask(message.getTaskId(), UpdateTaskDTO.builder() + .taskStatus(TaskStatusEnum.EXECUTING) + .build(), SecurityConstants.INNER); + // 通过 反射 去执行具体方法 + String className = message.getClassName(); + String methodName = message.getMethodName(); + String params = message.getParams(); + Integer taskType = message.getTaskType(); + UpdateTaskDTO updateTask = new UpdateTaskDTO(); + try { + // 包名+类名 + Class classzz = Class.forName(className); + // 获取构造器对象 + Constructor constructor = classzz.getConstructor(); + // 利用构造器对象创建一个对象 + Object o = constructor.newInstance(); + // 传递需要执行的方法 + Method method = classzz.getMethod(methodName, String.class); + // 调用的方法有多个参数 Method method = classzz.getMethod("class1method",long.class,String.class,String.class); + // 如果调用的方法有参数 invoke(o,param1,param2,param3,...) + updateTask.setLastRunTime(LocalDateTime.now()); + Object invoke = method.invoke(o, params); + if (invoke instanceof AjaxResult result){ + Integer code = (Integer) result.get(AjaxResult.DATA_TAG); + if (result.isSuccess() && 200 == code){ + updateTask.setTaskStatus(TaskStatusEnum.EXECUTE_SUCCESS); + String data = (String)result.get(AjaxResult.DATA_TAG); + updateTask.setLastRunResult("success"); + if (Objects.equals(taskType, TaskTypeEnum.DOWN.getCode())){ + updateTask.setRemark(data); + } + }else { + String errMsg = (String)result.get(AjaxResult.MSG_TAG); + updateTask.setTaskStatus(TaskStatusEnum.EXECUTE_FAIL); + updateTask.setLastRunResult(errMsg); + } + }else if (invoke instanceof Result result){ + if (result.isSuccess() && result.getCode() == 200){ + updateTask.setTaskStatus(TaskStatusEnum.EXECUTE_SUCCESS); + String data = (String)result.getData(); + updateTask.setLastRunResult("success"); + if (Objects.equals(taskType, TaskTypeEnum.DOWN.getCode())){ + updateTask.setRemark(data); + } + }else { + String errMsg = result.getMsg(); + updateTask.setTaskStatus(TaskStatusEnum.EXECUTE_FAIL); + updateTask.setLastRunResult(errMsg); + } + } + }catch (Exception e){ + log.error("执行失败", e); + updateTask.setTaskStatus(TaskStatusEnum.EXECUTE_FAIL); + StringBuilder stringBuilder = new StringBuilder(); + stringBuilder.append(e).append("\n"); + for (StackTraceElement s : e.getStackTrace()) { + stringBuilder.append(s.toString()).append("\n"); + } + updateTask.setLastRunResult(stringBuilder.toString()); + } + // 执行完毕后 更新 任务状态 + remoteTaskService.updateTask(message.getTaskId(), updateTask, SecurityConstants.INNER); + } +} diff --git a/chushang-modules/chushang-module-system/system-service/src/main/java/com/chushang/system/service/TaskServiceTest.java b/chushang-modules/chushang-module-system/system-service/src/main/java/com/chushang/system/service/TaskServiceTest.java new file mode 100644 index 0000000..f210f82 --- /dev/null +++ b/chushang-modules/chushang-module-system/system-service/src/main/java/com/chushang/system/service/TaskServiceTest.java @@ -0,0 +1,42 @@ +package com.chushang.system.service; + +import com.chushang.common.core.exception.ResultException; +import com.chushang.common.core.web.Result; +import com.chushang.system.entity.bo.LoginBody; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; + +/** + * @auther: zhao + * @date: 2024/6/12 17:26 + */ +@Slf4j +@Service +public class TaskServiceTest { + + public Result test(String code, LoginBody loginBody) + { + + log.info("code {}", code); + log.info("loginBody {}", loginBody); + return Result.ok("test"); + } + + public Result testError(String code, LoginBody loginBody) + { + log.info("code {}", code); + log.info("loginBody {}", loginBody); + + return Result.failed("test", "test"); + } + + public void testError2(String code, LoginBody loginBody) + { + + log.info("code {}", code); + log.info("loginBody {}", loginBody); + throw new ResultException("error"); + } + + +} diff --git a/chushang-modules/chushang-module-system/system-service/src/main/java/com/chushang/system/service/impl/SysLogServiceImpl.java b/chushang-modules/chushang-module-system/system-service/src/main/java/com/chushang/system/service/impl/SysLogServiceImpl.java index ddfcccc..3ae5099 100644 --- a/chushang-modules/chushang-module-system/system-service/src/main/java/com/chushang/system/service/impl/SysLogServiceImpl.java +++ b/chushang-modules/chushang-module-system/system-service/src/main/java/com/chushang/system/service/impl/SysLogServiceImpl.java @@ -1,18 +1,14 @@ package com.chushang.system.service.impl; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; -import com.chushang.common.log.constants.LogConstant; import com.chushang.common.log.entity.SysLogEntity; import com.chushang.system.mapper.SysLogMapper; import com.chushang.system.service.SysLogService; import lombok.extern.slf4j.Slf4j; -import org.redisson.api.RQueue; import org.redisson.api.RedissonClient; import org.springframework.stereotype.Service; -import org.springframework.transaction.annotation.Transactional; import javax.annotation.Resource; -import java.util.List; /** * by zhaowenyuan create 2022/3/16 15:35 diff --git a/chushang-modules/chushang-module-system/system-service/src/main/resources/application.yml b/chushang-modules/chushang-module-system/system-service/src/main/resources/application.yml index 5e802e4..c7716b0 100644 --- a/chushang-modules/chushang-module-system/system-service/src/main/resources/application.yml +++ b/chushang-modules/chushang-module-system/system-service/src/main/resources/application.yml @@ -93,7 +93,32 @@ management: # 日志监听配置 -- 如果启用的话,需要配置 kafka 的路径 logging: config: classpath:logback-nacos.xml -pagehelper: - async-count: - banner: false - +rocketmq: + name-server: ${config.rocketmq.name-server} + producer: + # 生产者分组 + group: ${config.rocketmq.producer.group} + # 发送消息超时时间, 单位毫秒, 默认3000 + send-message-timeout: 10000 + # 消息压缩阈值, 当消息体的大小超过该阈值时进行消息压缩,默认为4*1024B + compress-message-body-threshold: 4096 + # 消息体的最大允许大小,默认为4*1024*1024B + max-message-size: 4194304 + # 同步发送消息时,失败重试次数,默认2 + retry-times-when-send-failed: 2 + # 异步发送消息时,失败重试次数,默认2 + retry-times-when-send-async-failed: 2 + # 发送消息给broker 时,发送失败是否重试另一台broker, 默认为false + retry-next-server: false + access-key: # Access Key ,可阅读 https://github.com/apache/rocketmq/blob/master/docs/cn/acl/user_guide.md 文档 + secret-key: # Secret Key + # 是否开启消息轨迹功能,默认为false 可阅读 https://github.com/apache/rocketmq/blob/master/docs/cn/msg_trace/user_guide.md 文档 + enable-msg-trace: false + # 自定义消息轨迹的 Topic 。默认为 RMQ_SYS_TRACE_TOPIC 。 + customized-trace-topic: RMQ_SYS_TRACE_TOPIC + consumer: + # 配置某个消费分组,是否监听指定 Topic 。结构为 Map<消费者分组, > 。默认情况下,不配置表示监听。 + listeners: + # 关闭 test-consumer-group 对 topic1 的监听消费 + test-consumer-group: + topic1: false # 关闭 test-consumer-group 对 topic1 的监听消费 diff --git a/chushang-modules/chushang-module-system/system-service/src/test/java/DemoTest.java b/chushang-modules/chushang-module-system/system-service/src/test/java/DemoTest.java index 9e9fbdd..2d1e7c3 100644 --- a/chushang-modules/chushang-module-system/system-service/src/test/java/DemoTest.java +++ b/chushang-modules/chushang-module-system/system-service/src/test/java/DemoTest.java @@ -1,10 +1,17 @@ import cn.hutool.json.JSONUtil; +import com.alibaba.fastjson2.JSON; +import com.chushang.common.core.constant.SecurityConstants; import com.chushang.security.service.TokenService; import com.chushang.SystemApplication; +import com.chushang.system.entity.bo.LoginBody; import com.chushang.system.entity.po.SysMenu; import com.chushang.security.entity.po.SysUser; import com.chushang.system.entity.vo.RouterVo; import com.chushang.system.service.*; +import com.chushang.task.entity.dto.CreateTaskDTO; +import com.chushang.task.enums.ServiceEnum; +import com.chushang.task.enums.TaskTypeEnum; +import com.chushang.task.feign.RemoteTaskService; import lombok.extern.slf4j.Slf4j; import org.junit.Test; import org.junit.runner.RunWith; @@ -12,6 +19,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; +import javax.annotation.Resource; import java.util.*; /** @@ -22,67 +30,25 @@ import java.util.*; @SpringBootTest(classes = SystemApplication.class) public class DemoTest { - @Autowired - ISysMenuService menuService; - @Autowired - ISysDeptService sysDeptService; - @Autowired - ISysUserService sysUserService; - @Autowired - ISysRoleService sysRoleService; - @Autowired - ISysPermissionService permissionService; - @Autowired - TokenService tokenService; - -// @Test -// public void test(){ -// String username = "zhaowenyuan@31gamestudio.com"; -//// String username = "admin"; -// -// SysUser sysUser = sysUserService.selectUserByUserName(username); -// if (StringUtils.isNull(sysUser)) { -// log.error("密码错误"); -// } -// -// // 角色集合 -// Set roles = permissionService.getRolePermission(sysUser); -// // 权限集合 -// Set permissions = permissionService.getMenuPermission(sysUser); -// -// LoginUser sysUserVo = new LoginUser(); -// sysUserVo.setUserId(sysUser.getUserId()); -// sysUserVo.setUsername(sysUser.getUsername()); -// sysUserVo.setSysUser(sysUser); -// sysUserVo.setRoles(roles); -// sysUserVo.setPermissions(permissions); -// -// List userDataList = userDataService.listUserData(sysUser); -// // 默认传递一个空Map -// sysUserVo.setAuthDataMap(new HashMap<>()); -// if (CollectionUtil.isNotEmpty(userDataList)){ -// Map> authDataMap = new HashMap<>(); -// // 对应的数据权限 -// for (SysUserData sysUserData : userDataList) { -// authDataMap.put(sysUserData.getDataType().getCodeType(), -// new HashSet<>(JSONUtil.toList(sysUserData.getDataValue(), String.class))); -// } -// sysUserVo.setAuthDataMap(authDataMap); -// } -// -// log.info("{}", JSONUtil.toJsonStr(sysUserVo)); -// } - + @Resource + RemoteTaskService remoteTaskService; @Test public void menuTest(){ -// List menus = menuService.list( -// new LambdaQueryWrapper() -// .eq(SysMenu::getMenuId, 2136) -// ); - List menus = menuService.selectMenuTreeByUserId(new SysUser(1l)); - List routerVos = menuService.buildMenus(menus); - log.info("{}", JSONUtil.toJsonStr(routerVos)); + LoginBody loginBody = new LoginBody(); + loginBody.setPassword("1"); + loginBody.setUsername("2"); + remoteTaskService.createTask(CreateTaskDTO.builder() + .params(JSON.toJSONString(loginBody)) + .taskName("测试后台任务") + .applicationName(ServiceEnum.SYSTEM) + .methodName("methodName") + .className(this.getClass().getName()) + .deptId(1L) + .taskType(TaskTypeEnum.OTHER) + .remark("remark") + .createBy("system") + .build(), SecurityConstants.INNER); } } diff --git a/chushang-modules/chushang-module-task/task-feign/pom.xml b/chushang-modules/chushang-module-task/task-feign/pom.xml index 4a02531..ee4742d 100644 --- a/chushang-modules/chushang-module-task/task-feign/pom.xml +++ b/chushang-modules/chushang-module-task/task-feign/pom.xml @@ -13,10 +13,6 @@ com.chushang chushang-common-feign - - com.chushang - chushang-common-mybatis - com.chushang @@ -26,5 +22,13 @@ com.chushang chushang-common-dict + + com.chushang + chushang-common-mq + + + com.chushang + chushang-common-log + diff --git a/chushang-modules/chushang-module-task/task-feign/src/main/java/com/chushang/task/entity/TaskInfo.java b/chushang-modules/chushang-module-task/task-feign/src/main/java/com/chushang/task/entity/TaskInfo.java index 3fd1167..a201b77 100644 --- a/chushang-modules/chushang-module-task/task-feign/src/main/java/com/chushang/task/entity/TaskInfo.java +++ b/chushang-modules/chushang-module-task/task-feign/src/main/java/com/chushang/task/entity/TaskInfo.java @@ -4,8 +4,10 @@ import com.baomidou.mybatisplus.annotation.IdType; import com.baomidou.mybatisplus.annotation.TableField; import com.baomidou.mybatisplus.annotation.TableId; import com.baomidou.mybatisplus.annotation.TableName; +import com.chushang.common.dict.annotation.DictFormat; import com.chushang.common.mybatis.base.BaseEntity; import com.chushang.task.enums.ServiceEnum; +import com.chushang.task.enums.TaskStatusEnum; import lombok.*; import java.time.LocalDateTime; @@ -26,69 +28,61 @@ public class TaskInfo extends BaseEntity { */ @TableId(value = "task_id", type = IdType.ASSIGN_ID) private Long taskId; - /** * 后台任务名称 */ @TableField(value = "task_name") private String taskName; - /** * 所属服务名 * 取 ServiceEnum 的 value */ @TableField(value = "application_name") private String applicationName; - /** * 方法名称 */ @TableField(value = "method_name") private String methodName; - /** * 类名 */ @TableField(value = "class_name") private String className; - /** * 任务状态 * 字典表 task_status * */ @TableField(value = "task_status") + @DictFormat(dictType = "task_status") private Integer taskStatus; - + /** + * 后台任务类型 + */ + @TableField(value = "task_type") + @DictFormat(dictType = "task_type") + private Integer taskType; /** * 创建人 */ @TableField(value = "create_by") private String createBy; - /** * 修改人 */ @TableField(value = "update_by") private String updateBy; - /** * 后台任务执行参数 */ @TableField(value = "params") private String params; - /** * 备注信息 */ @TableField(value = "remark") private String remark; - - /** - * 导出或者下载时, 对应的返回的文件路径 - */ - @TableField(value = "down_url") - private String downUrl; /** * 最后一次执行时间 */ @@ -99,9 +93,6 @@ public class TaskInfo extends BaseEntity { */ @TableField("last_run_result") private String lastRunResult; - - @TableField("error_message") - private String errorMessage; /** * 部门id -- 根据部门查看区分 */ diff --git a/chushang-modules/chushang-module-task/task-feign/src/main/java/com/chushang/task/entity/dto/CreateTaskDTO.java b/chushang-modules/chushang-module-task/task-feign/src/main/java/com/chushang/task/entity/dto/CreateTaskDTO.java index a0e970e..90331a6 100644 --- a/chushang-modules/chushang-module-task/task-feign/src/main/java/com/chushang/task/entity/dto/CreateTaskDTO.java +++ b/chushang-modules/chushang-module-task/task-feign/src/main/java/com/chushang/task/entity/dto/CreateTaskDTO.java @@ -1,13 +1,20 @@ package com.chushang.task.entity.dto; import com.chushang.task.enums.ServiceEnum; +import com.chushang.task.enums.TaskTypeEnum; +import lombok.AllArgsConstructor; +import lombok.Builder; import lombok.Data; +import lombok.NoArgsConstructor; /** * @auther: zhao * @date: 2024/6/11 10:19 */ @Data +@Builder +@NoArgsConstructor +@AllArgsConstructor public class CreateTaskDTO { /** @@ -39,6 +46,10 @@ public class CreateTaskDTO { * 备注信息 */ private String remark; + /** + * 后台任务类型 + */ + private TaskTypeEnum taskType; /** * 部门id */ diff --git a/chushang-modules/chushang-module-task/task-feign/src/main/java/com/chushang/task/entity/dto/UpdateTaskDTO.java b/chushang-modules/chushang-module-task/task-feign/src/main/java/com/chushang/task/entity/dto/UpdateTaskDTO.java index 62aaac8..179fa4d 100644 --- a/chushang-modules/chushang-module-task/task-feign/src/main/java/com/chushang/task/entity/dto/UpdateTaskDTO.java +++ b/chushang-modules/chushang-module-task/task-feign/src/main/java/com/chushang/task/entity/dto/UpdateTaskDTO.java @@ -1,6 +1,10 @@ package com.chushang.task.entity.dto; +import com.chushang.task.enums.TaskStatusEnum; +import lombok.AllArgsConstructor; +import lombok.Builder; import lombok.Data; +import lombok.NoArgsConstructor; import java.time.LocalDateTime; @@ -9,32 +13,28 @@ import java.time.LocalDateTime; * @date: 2024/6/11 10:27 */ @Data +@Builder +@NoArgsConstructor +@AllArgsConstructor public class UpdateTaskDTO { /** * 任务状态 * 字典表 task_status * */ - private Integer taskStatus; - /** - * 导出或者下载时, 对应的返回的文件路径 - */ - private String downUrl; + private TaskStatusEnum taskStatus; /** * 最后一次执行时间 */ private LocalDateTime lastRunTime; /** * 最后一次执行结果 + * 成功时 填写 对应的路径 + * 失败填写失败原因 */ private String lastRunResult; /** - * 错误信息 + * 备注信息 */ - private String errorMessage; - /** - * 修改人 - */ - private String updateBy; - + private String remark; } diff --git a/chushang-modules/chushang-module-task/task-feign/src/main/java/com/chushang/task/enums/ServiceEnum.java b/chushang-modules/chushang-module-task/task-feign/src/main/java/com/chushang/task/enums/ServiceEnum.java index 061e4b0..c03bfd9 100644 --- a/chushang-modules/chushang-module-task/task-feign/src/main/java/com/chushang/task/enums/ServiceEnum.java +++ b/chushang-modules/chushang-module-task/task-feign/src/main/java/com/chushang/task/enums/ServiceEnum.java @@ -1,6 +1,7 @@ package com.chushang.task.enums; import com.baomidou.mybatisplus.annotation.IEnum; +import com.chushang.common.core.constant.ServiceConstant; import lombok.AllArgsConstructor; import lombok.Getter; @@ -14,23 +15,23 @@ public enum ServiceEnum { /** * 系统 */ - SYSTEM("system-service"), + SYSTEM(ServiceConstant.SYSTEM), /** * 授权 */ - AUTH("auth-service"), + AUTH(ServiceConstant.AUTH), /** * 文件 */ - OSS("oss-service"), + OSS(ServiceConstant.OSS), /** * 工单 */ - WRK("wrk-service"), + WRK(ServiceConstant.WRK), /** * 后台任务 */ - TASK("task-service"), + TASK(ServiceConstant.TASK), ; private final String serviceName; } diff --git a/chushang-modules/chushang-module-task/task-feign/src/main/java/com/chushang/task/enums/TaskStatusEnum.java b/chushang-modules/chushang-module-task/task-feign/src/main/java/com/chushang/task/enums/TaskStatusEnum.java new file mode 100644 index 0000000..6671fe0 --- /dev/null +++ b/chushang-modules/chushang-module-task/task-feign/src/main/java/com/chushang/task/enums/TaskStatusEnum.java @@ -0,0 +1,30 @@ +package com.chushang.task.enums; + +import com.baomidou.mybatisplus.annotation.IEnum; +import lombok.AllArgsConstructor; +import lombok.Getter; + +/** + * @auther: zhao + * @date: 2024/6/12 16:34 + */ +@Getter +@AllArgsConstructor +public enum TaskStatusEnum implements IEnum { + CREATE(0,"新建"), + DISPATCH(1, "已下发"), + EXECUTING(2, "执行中"), + + EXECUTE_FAIL(3, "执行失败"), + EXECUTE_SUCCESS(4, "执行成功"), + + CANCEL(5, "已取消"), + ; + private final Integer code; + private final String desc; + + @Override + public Integer getValue() { + return code; + } +} diff --git a/chushang-modules/chushang-module-task/task-feign/src/main/java/com/chushang/task/enums/TaskTypeEnum.java b/chushang-modules/chushang-module-task/task-feign/src/main/java/com/chushang/task/enums/TaskTypeEnum.java new file mode 100644 index 0000000..68be1d1 --- /dev/null +++ b/chushang-modules/chushang-module-task/task-feign/src/main/java/com/chushang/task/enums/TaskTypeEnum.java @@ -0,0 +1,29 @@ +package com.chushang.task.enums; + +import com.baomidou.mybatisplus.annotation.IEnum; +import lombok.AllArgsConstructor; +import lombok.Getter; + +/** + * @auther: zhao + * @date: 2024/6/12 18:02 + */ +@Getter +@AllArgsConstructor +public enum TaskTypeEnum implements IEnum { + /** + * 导出时, 需要取remark 为导出文件路径 + */ + DOWN(1, "导出任务"), + UPLOAD(2, "上传任务"), + OTHER(3, "其他后台任务"), + ; + + private final Integer code; + private final String desc; + + @Override + public Integer getValue() { + return code; + } +} diff --git a/chushang-modules/chushang-module-task/task-service/src/main/java/com/chushang/task/consumer/TaskConsumerService.java b/chushang-modules/chushang-module-task/task-service/src/main/java/com/chushang/task/consumer/TaskConsumerService.java new file mode 100644 index 0000000..ad560ea --- /dev/null +++ b/chushang-modules/chushang-module-task/task-service/src/main/java/com/chushang/task/consumer/TaskConsumerService.java @@ -0,0 +1,22 @@ +package com.chushang.task.consumer; + +import com.chushang.common.core.constant.ServiceConstant; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.apache.rocketmq.spring.core.RocketMQListener; +import org.springframework.stereotype.Component; + +/** + * @auther: zhao + * @date: 2024/6/12 15:03 + */ +@Slf4j +@Component +@RocketMQMessageListener(topic = ServiceConstant.TASK, consumerGroup = ServiceConstant.TASK_CONSUMER_GROUP) +public class TaskConsumerService implements RocketMQListener { + + @Override + public void onMessage(String message) { + log.info("msg {}", message); + } +} diff --git a/chushang-modules/chushang-module-task/task-service/src/main/java/com/chushang/task/controller/TaskController.java b/chushang-modules/chushang-module-task/task-service/src/main/java/com/chushang/task/controller/TaskController.java index f8546e2..1cda414 100644 --- a/chushang-modules/chushang-module-task/task-service/src/main/java/com/chushang/task/controller/TaskController.java +++ b/chushang-modules/chushang-module-task/task-service/src/main/java/com/chushang/task/controller/TaskController.java @@ -7,6 +7,7 @@ import com.chushang.security.annotation.RequiresPermissions; import com.chushang.task.entity.TaskInfo; import com.chushang.task.service.TaskInfoService; import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; @@ -31,4 +32,13 @@ public class TaskController { return AjaxResult.success(pageResult); } + /** + * 获取 后台任务信息 + */ + @GetMapping(value = "/{taskId}") + @RequiresPermissions("system:task:info") + public AjaxResult info(@PathVariable Long taskId){ + return AjaxResult.success(taskInfoService.getById(taskId)); + } + } diff --git a/chushang-modules/chushang-module-task/task-service/src/main/java/com/chushang/task/remote/RemoteTaskController.java b/chushang-modules/chushang-module-task/task-service/src/main/java/com/chushang/task/remote/RemoteTaskController.java index 098ce26..14fbfe0 100644 --- a/chushang-modules/chushang-module-task/task-service/src/main/java/com/chushang/task/remote/RemoteTaskController.java +++ b/chushang-modules/chushang-module-task/task-service/src/main/java/com/chushang/task/remote/RemoteTaskController.java @@ -1,11 +1,15 @@ package com.chushang.task.remote; +import com.alibaba.fastjson2.JSON; import com.chushang.common.core.web.Result; +import com.chushang.common.mq.produce.MqProduceService; import com.chushang.task.entity.TaskInfo; import com.chushang.task.entity.dto.CreateTaskDTO; import com.chushang.task.entity.dto.UpdateTaskDTO; +import com.chushang.task.enums.TaskStatusEnum; import com.chushang.task.feign.RemoteTaskService; import com.chushang.task.service.TaskInfoService; +import lombok.extern.slf4j.Slf4j; import org.springframework.web.bind.annotation.*; import javax.annotation.Resource; @@ -14,6 +18,7 @@ import javax.annotation.Resource; * @auther: zhao * @date: 2024/6/11 10:28 */ +@Slf4j @RestController @RequestMapping(value = "/remote") public class RemoteTaskController implements RemoteTaskService { @@ -32,13 +37,17 @@ public class RemoteTaskController implements RemoteTaskService { .className(task.getClassName()) .methodName(task.getMethodName()) .params(task.getParams()) - .taskStatus(0) + .taskStatus(TaskStatusEnum.CREATE.getCode()) .deptId(task.getDeptId()) + .taskType(task.getTaskType().getCode()) .taskName(task.getTaskName()) .build(); boolean save = taskInfoService.save(taskInfo); - // todo 需要创建队列数据 - + // 如果 入库成功, 则将其发送到消息队列中 + if (save){ + log.info("消息发送队列 {}", taskInfo.getTaskId()); + taskInfoService.joinQueue(taskInfo); + } return Result.ok(save ? taskInfo.getTaskId() : null); } @@ -49,12 +58,11 @@ public class RemoteTaskController implements RemoteTaskService { { TaskInfo taskInfo = TaskInfo.builder() .taskId(taskId) - .updateBy(task.getUpdateBy()) - .taskStatus(task.getTaskStatus()) - .downUrl(task.getDownUrl()) - .errorMessage(task.getErrorMessage()) + .updateBy("system") + .taskStatus(task.getTaskStatus().getCode()) .lastRunResult(task.getLastRunResult()) .lastRunTime(task.getLastRunTime()) + .remark(task.getRemark()) .build(); return Result.ok(taskInfoService.updateById(taskInfo)); } diff --git a/chushang-modules/chushang-module-task/task-service/src/main/java/com/chushang/task/service/TaskInfoService.java b/chushang-modules/chushang-module-task/task-service/src/main/java/com/chushang/task/service/TaskInfoService.java index 6e34680..ee9b073 100644 --- a/chushang-modules/chushang-module-task/task-service/src/main/java/com/chushang/task/service/TaskInfoService.java +++ b/chushang-modules/chushang-module-task/task-service/src/main/java/com/chushang/task/service/TaskInfoService.java @@ -1,21 +1,40 @@ package com.chushang.task.service; +import com.alibaba.fastjson2.JSON; import com.baomidou.mybatisplus.core.conditions.Wrapper; +import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.extension.service.IService; +import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; +import com.chushang.common.core.constant.ServiceConstant; +import com.chushang.common.mq.produce.MqProduceService; import com.chushang.common.mybatis.page.CommonParam; import com.chushang.common.mybatis.utils.PageResult; import com.chushang.datascope.annotation.DataScope; +import com.chushang.datascope.constants.ScopeConstants; import com.chushang.task.entity.TaskInfo; +import com.chushang.task.enums.TaskStatusEnum; +import com.chushang.task.mapper.TaskInfoMapper; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.ObjectUtils; +import org.apache.commons.lang3.StringUtils; +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; /** * @auther: zhao * @date: 2024/6/7 18:19 */ -public interface TaskInfoService extends IService { +@Slf4j +@Service +public class TaskInfoService extends ServiceImpl implements IService { + + @Resource + MqProduceService mqProduceService; @DataScope(tableAlias = "t") - default PageResult pagePostList(TaskInfo sysPost, CommonParam commonParam){ + public PageResult pagePostList(TaskInfo sysPost, CommonParam commonParam){ IPage page = this.page( new com.baomidou.mybatisplus.extension.plugins.pagination.Page<>(commonParam.getPage(), commonParam.getLimit()), buildWrapper(sysPost, commonParam) @@ -23,5 +42,24 @@ public interface TaskInfoService extends IService { return new PageResult(page); } - Wrapper buildWrapper(TaskInfo sysPost, CommonParam commonParam); + public Wrapper buildWrapper(TaskInfo taskInfo, CommonParam commonParam) { + return new QueryWrapper() + .orderBy(true, "asc".equals(commonParam.getIsAsc()), commonParam.getOrderBy()) + .lambda() + .eq(null != taskInfo.getTaskId(), TaskInfo::getTaskId, taskInfo.getTaskId()) + .like(null != taskInfo.getApplicationName(), TaskInfo::getApplicationName, taskInfo.getApplicationName()) + .eq(StringUtils.isNotEmpty(taskInfo.getTaskName()), TaskInfo::getTaskName, taskInfo.getTaskName()) + .eq(null != taskInfo.getTaskStatus(), TaskInfo::getTaskStatus, taskInfo.getTaskStatus()) + .like(StringUtils.isNotEmpty(taskInfo.getClassName()), TaskInfo::getClassName, taskInfo.getClassName()) + .like(StringUtils.isNotEmpty(taskInfo.getMethodName()), TaskInfo::getMethodName, taskInfo.getMethodName()) + .in(ObjectUtils.isNotEmpty(taskInfo.getSqlParam().get(ScopeConstants.DATA_SCOPE)), TaskInfo::getDeptId, taskInfo.getSqlParam().get(ScopeConstants.DATA_SCOPE)); + } + + public void joinQueue(TaskInfo taskInfo){ + mqProduceService.send(taskInfo.getApplicationName(), taskInfo); + + taskInfo.setTaskStatus(TaskStatusEnum.DISPATCH.getCode()); + updateById(taskInfo); + } + } diff --git a/chushang-modules/chushang-module-task/task-service/src/main/java/com/chushang/task/service/impl/TaskInfoServiceImpl.java b/chushang-modules/chushang-module-task/task-service/src/main/java/com/chushang/task/service/impl/TaskInfoServiceImpl.java deleted file mode 100644 index 4c08906..0000000 --- a/chushang-modules/chushang-module-task/task-service/src/main/java/com/chushang/task/service/impl/TaskInfoServiceImpl.java +++ /dev/null @@ -1,38 +0,0 @@ -package com.chushang.task.service.impl; - -import com.baomidou.mybatisplus.core.conditions.Wrapper; -import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; -import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; -import com.chushang.common.core.constant.SecurityConstants; -import com.chushang.common.mybatis.page.CommonParam; -import com.chushang.datascope.constants.ScopeConstants; -import com.chushang.task.entity.TaskInfo; -import com.chushang.task.mapper.TaskInfoMapper; -import com.chushang.task.service.TaskInfoService; -import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.ObjectUtils; -import org.apache.commons.lang3.StringUtils; -import org.springframework.stereotype.Service; - -/** - * @auther: zhao - * @date: 2024/6/7 18:19 - */ -@Slf4j -@Service -public class TaskInfoServiceImpl extends ServiceImpl implements TaskInfoService { - - @Override - public Wrapper buildWrapper(TaskInfo taskInfo, CommonParam commonParam) { - return new QueryWrapper() - .orderBy(true, "asc".equals(commonParam.getIsAsc()), commonParam.getOrderBy()) - .lambda() - .eq(null != taskInfo.getTaskId(), TaskInfo::getTaskId, taskInfo.getTaskId()) - .like(null != taskInfo.getApplicationName(), TaskInfo::getApplicationName, taskInfo.getApplicationName()) - .eq(StringUtils.isNotEmpty(taskInfo.getTaskName()), TaskInfo::getTaskName, taskInfo.getTaskName()) - .eq(null != taskInfo.getTaskStatus(), TaskInfo::getTaskStatus, taskInfo.getTaskStatus()) - .like(StringUtils.isNotEmpty(taskInfo.getClassName()), TaskInfo::getClassName, taskInfo.getClassName()) - .like(StringUtils.isNotEmpty(taskInfo.getMethodName()), TaskInfo::getMethodName, taskInfo.getMethodName()) - .in(ObjectUtils.isNotEmpty(taskInfo.getSqlParam().get(ScopeConstants.DATA_SCOPE)), TaskInfo::getDeptId, taskInfo.getSqlParam().get(ScopeConstants.DATA_SCOPE)); - } -} diff --git a/chushang-modules/chushang-module-task/task-service/src/main/resources/application.yml b/chushang-modules/chushang-module-task/task-service/src/main/resources/application.yml index c173f0f..a424c40 100644 --- a/chushang-modules/chushang-module-task/task-service/src/main/resources/application.yml +++ b/chushang-modules/chushang-module-task/task-service/src/main/resources/application.yml @@ -93,4 +93,34 @@ management: # 日志监听配置 -- 如果启用的话,需要配置 kafka 的路径 logging: config: classpath:logback-nacos.xml +rocketmq: + name-server: ${config.rocketmq.name-server} + producer: + # 生产者分组 + group: ${config.rocketmq.producer.group} + # 发送消息超时时间, 单位毫秒, 默认3000 + send-message-timeout: 10000 + # 消息压缩阈值, 当消息体的大小超过该阈值时进行消息压缩,默认为4*1024B + compress-message-body-threshold: 4096 + # 消息体的最大允许大小,默认为4*1024*1024B + max-message-size: 4194304 + # 同步发送消息时,失败重试次数,默认2 + retry-times-when-send-failed: 2 + # 异步发送消息时,失败重试次数,默认2 + retry-times-when-send-async-failed: 2 + # 发送消息给broker 时,发送失败是否重试另一台broker, 默认为false + retry-next-server: false + access-key: # Access Key ,可阅读 https://github.com/apache/rocketmq/blob/master/docs/cn/acl/user_guide.md 文档 + secret-key: # Secret Key + # 是否开启消息轨迹功能,默认为false 可阅读 https://github.com/apache/rocketmq/blob/master/docs/cn/msg_trace/user_guide.md 文档 + enable-msg-trace: false + # 自定义消息轨迹的 Topic 。默认为 RMQ_SYS_TRACE_TOPIC 。 + customized-trace-topic: RMQ_SYS_TRACE_TOPIC + consumer: + # 配置某个消费分组,是否监听指定 Topic 。结构为 Map<消费者分组, > 。默认情况下,不配置表示监听。 + listeners: + # 关闭 test-consumer-group 对 topic1 的监听消费 + test-consumer-group: + topic1: false # 关闭 test-consumer-group 对 topic1 的监听消费 +