diff --git a/chushang-common/chushang-common-mq/pom.xml b/chushang-common/chushang-common-mq/pom.xml new file mode 100644 index 0000000..f252de1 --- /dev/null +++ b/chushang-common/chushang-common-mq/pom.xml @@ -0,0 +1,21 @@ + + + + chushang-common + com.chushang + 1.0.0 + + 4.0.0 + + chushang-common-mq + mq模块 + + + + com.chushang + chushang-common-core + + + diff --git a/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/annotation/CanalEventListener.java b/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/annotation/CanalEventListener.java new file mode 100644 index 0000000..fd1f55b --- /dev/null +++ b/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/annotation/CanalEventListener.java @@ -0,0 +1,24 @@ +package com.chushang.common.canal.annotation; + +import org.springframework.core.annotation.AliasFor; +import org.springframework.stereotype.Component; + +import java.lang.annotation.*; + +/** + * inject the present class to the spring context + * as a listener of the canal event + * + * @author chen.qian + * @date 2018/3/19 + */ +@Target({ElementType.TYPE}) +@Retention(RetentionPolicy.RUNTIME) +@Documented +@Component +public @interface CanalEventListener { + + @AliasFor(annotation = Component.class) + String value() default ""; + +} diff --git a/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/annotation/DeleteListenPoint.java b/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/annotation/DeleteListenPoint.java new file mode 100644 index 0000000..246e399 --- /dev/null +++ b/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/annotation/DeleteListenPoint.java @@ -0,0 +1,45 @@ +package com.chushang.common.canal.annotation; + +import com.alibaba.otter.canal.protocol.CanalEntry; +import org.springframework.core.annotation.AliasFor; + +import java.lang.annotation.*; + +/** + * ListenPoint for delete + * + * @author chen.qian + * @date 2018/3/19 + */ + +@Target({ElementType.METHOD}) +@Retention(RetentionPolicy.RUNTIME) +@Documented +@ListenPoint(eventType = CanalEntry.EventType.DELETE) +public @interface DeleteListenPoint { + + /** + * canal destination + * default for all + * @return canal destination + */ + @AliasFor(annotation = ListenPoint.class) + String destination() default ""; + + /** + * database schema which you are concentrate on + * default for all + * @return canal destination + */ + @AliasFor(annotation = ListenPoint.class) + String[] schema() default {}; + + /** + * tables which you are concentrate on + * default for all + * @return canal destination + */ + @AliasFor(annotation = ListenPoint.class) + String[] table() default {}; + +} diff --git a/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/annotation/InsertListenPoint.java b/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/annotation/InsertListenPoint.java new file mode 100644 index 0000000..8c477a7 --- /dev/null +++ b/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/annotation/InsertListenPoint.java @@ -0,0 +1,45 @@ +package com.chushang.common.canal.annotation; + +import com.alibaba.otter.canal.protocol.CanalEntry; +import org.springframework.core.annotation.AliasFor; + +import java.lang.annotation.*; + +/** + * ListenPoint for insert + * + * @author chen.qian + * @date 2018/3/19 + */ + +@Target({ElementType.METHOD}) +@Retention(RetentionPolicy.RUNTIME) +@Documented +@ListenPoint(eventType = CanalEntry.EventType.INSERT) +public @interface InsertListenPoint { + + /** + * canal destination + * default for all + * @return canal destination + */ + @AliasFor(annotation = ListenPoint.class) + String destination() default ""; + + /** + * database schema which you are concentrate on + * default for all + * @return canal destination + */ + @AliasFor(annotation = ListenPoint.class) + String[] schema() default {}; + + /** + * tables which you are concentrate on + * default for all + * @return canal destination + */ + @AliasFor(annotation = ListenPoint.class) + String[] table() default {}; + +} diff --git a/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/annotation/ListenPoint.java b/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/annotation/ListenPoint.java new file mode 100644 index 0000000..a5b4d1a --- /dev/null +++ b/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/annotation/ListenPoint.java @@ -0,0 +1,48 @@ +package com.chushang.common.canal.annotation; + +import com.alibaba.otter.canal.protocol.CanalEntry; + +import java.lang.annotation.*; + +/** + * used to indicate that method(or methods) is(are) the candidate of the + * canal event distributor + * + * @author chen.qian + * @date 2018/3/19 + */ + +@Target({ElementType.METHOD, ElementType.TYPE}) +@Retention(RetentionPolicy.RUNTIME) +@Documented +public @interface ListenPoint { + + /** + * canal destination + * default for all + * @return canal destination + */ + String destination() default ""; + + /** + * database schema which you are concentrate on + * default for all + * @return canal destination + */ + String[] schema() default {}; + + /** + * tables which you are concentrate on + * default for all + * @return canal destination + */ + String[] table() default {}; + + /** + * canal event type + * default for all + * @return canal event type + */ + CanalEntry.EventType[] eventType() default {}; + +} diff --git a/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/annotation/UpdateListenPoint.java b/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/annotation/UpdateListenPoint.java new file mode 100644 index 0000000..9fe7fd5 --- /dev/null +++ b/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/annotation/UpdateListenPoint.java @@ -0,0 +1,45 @@ +package com.chushang.common.canal.annotation; + +import com.alibaba.otter.canal.protocol.CanalEntry; +import org.springframework.core.annotation.AliasFor; + +import java.lang.annotation.*; + +/** + * ListenPoint for update + * + * @author chen.qian + * @date 2018/3/19 + */ + +@Target({ElementType.METHOD}) +@Retention(RetentionPolicy.RUNTIME) +@Documented +@ListenPoint(eventType = CanalEntry.EventType.UPDATE) +public @interface UpdateListenPoint { + + /** + * canal destination + * default for all + * @return canal destination + */ + @AliasFor(annotation = ListenPoint.class) + String destination() default ""; + + /** + * database schema which you are concentrate on + * default for all + * @return canal destination + */ + @AliasFor(annotation = ListenPoint.class) + String[] schema() default {}; + + /** + * tables which you are concentrate on + * default for all + * @return canal destination + */ + @AliasFor(annotation = ListenPoint.class) + String[] table() default {}; + +} diff --git a/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/client/AbstractCanalClient.java b/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/client/AbstractCanalClient.java new file mode 100644 index 0000000..4c6b05c --- /dev/null +++ b/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/client/AbstractCanalClient.java @@ -0,0 +1,108 @@ +package com.chushang.common.canal.client; + +import com.alibaba.otter.canal.client.CanalConnector; +import com.alibaba.otter.canal.client.CanalConnectors; +import com.alibaba.otter.canal.protocol.exception.CanalClientException; +import com.chushang.common.canal.client.transfer.TransponderFactory; +import com.chushang.common.canal.config.CanalConfig; + +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +public abstract class AbstractCanalClient implements CanalClient { + + /** + * running flag + */ + private volatile boolean running; + + /** + * customer config + */ + private final CanalConfig canalConfig; + + + /** + * TransponderFactory + */ + protected final TransponderFactory factory; + + AbstractCanalClient(CanalConfig canalConfig, TransponderFactory factory) { + Objects.requireNonNull(canalConfig, "canalConfig can not be null!"); + Objects.requireNonNull(canalConfig, "transponderFactory can not be null!"); + this.canalConfig = canalConfig; + this.factory = factory; + } + + @Override + public void start() { + Map instanceMap = getConfig(); + for (Map.Entry instanceEntry : instanceMap.entrySet()) { + process(processInstanceEntry(instanceEntry), instanceEntry); + } + + } + + /** + * To initialize the canal connector + * @param connector CanalConnector + * @param config config + */ + protected abstract void process(CanalConnector connector, Map.Entry config); + + private CanalConnector processInstanceEntry(Map.Entry instanceEntry) { + CanalConfig.Instance instance = instanceEntry.getValue(); + CanalConnector connector; + if (instance.isClusterEnabled()) { + List addresses = new ArrayList<>(); + for (String s : instance.getZookeeperAddress()) { + String[] entry = s.split(":"); + if (entry.length != 2) + throw new CanalClientException("error parsing zookeeper address:" + s); + addresses.add(new InetSocketAddress(entry[0], Integer.parseInt(entry[1]))); + } + connector = CanalConnectors.newClusterConnector(addresses, instanceEntry.getKey(), + instance.getUserName(), + instance.getPassword()); + } else { + connector = CanalConnectors.newSingleConnector(new InetSocketAddress(instance.getHost(), instance.getPort()), + instanceEntry.getKey(), + instance.getUserName(), + instance.getPassword()); + } + return connector; + } + + /** + * get the config + * + * @return config + */ + protected Map getConfig() { + CanalConfig config = canalConfig; + Map instanceMap; + if (config != null && (instanceMap = config.getInstances()) != null && !instanceMap.isEmpty()) { + return config.getInstances(); + } else { + throw new CanalClientException("can not get the configuration of canal client!"); + } + } + + @Override + public void stop() { + setRunning(false); + } + + @Override + public boolean isRunning() { + return running; + } + + private void setRunning(boolean running) { + this.running = running; + } +} diff --git a/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/client/CanalClient.java b/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/client/CanalClient.java new file mode 100644 index 0000000..05531d4 --- /dev/null +++ b/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/client/CanalClient.java @@ -0,0 +1,22 @@ +package com.chushang.common.canal.client; + +public interface CanalClient { + + /** + * open the canal client + * to get the config and connect to the canal server (1 : 1 or 1 : n) + * and then transfer the event to the special listener + * */ + void start(); + + /** + * stop the client + */ + void stop(); + + /** + * is running + * @return yes or no + */ + boolean isRunning(); +} diff --git a/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/client/ListenerPoint.java b/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/client/ListenerPoint.java new file mode 100644 index 0000000..ed030f5 --- /dev/null +++ b/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/client/ListenerPoint.java @@ -0,0 +1,25 @@ +package com.chushang.common.canal.client; + +import com.chushang.common.canal.annotation.ListenPoint; + +import java.lang.reflect.Method; +import java.util.HashMap; +import java.util.Map; + +public class ListenerPoint { + private final Object target; + private final Map invokeMap = new HashMap<>(); + + ListenerPoint(Object target, Method method, ListenPoint anno) { + this.target = target; + this.invokeMap.put(method, anno); + } + + public Object getTarget() { + return target; + } + + public Map getInvokeMap() { + return invokeMap; + } +} diff --git a/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/client/SimpleCanalClient.java b/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/client/SimpleCanalClient.java new file mode 100644 index 0000000..7fd010a --- /dev/null +++ b/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/client/SimpleCanalClient.java @@ -0,0 +1,85 @@ +package com.chushang.common.canal.client; + +import com.alibaba.otter.canal.client.CanalConnector; +import com.chushang.common.canal.annotation.CanalEventListener; +import com.chushang.common.canal.util.BeanUtil; +import com.chushang.common.canal.annotation.ListenPoint; +import com.chushang.common.canal.client.transfer.TransponderFactory; +import com.chushang.common.canal.config.CanalConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.core.annotation.AnnotationUtils; + +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +public class SimpleCanalClient extends AbstractCanalClient { + private final static Logger logger = LoggerFactory.getLogger(SimpleCanalClient.class); + + /** + * executor + */ + private final ThreadPoolExecutor executor; + + /** + * listeners which are used by implementing the Interface + */ + private final List listeners = new ArrayList<>(); + + /** + * listeners which are used by annotation + */ + private final List annoListeners = new ArrayList<>(); + + public SimpleCanalClient(CanalConfig canalConfig, TransponderFactory factory) { + super(canalConfig, factory); + executor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, + 60L, TimeUnit.SECONDS, + new SynchronousQueue<>(), Executors.defaultThreadFactory()); + initListeners(); + } + + @Override + protected void process(CanalConnector connector, Map.Entry config) { + executor.submit(factory.newTransponder(connector, config, listeners, annoListeners)); + } + + @Override + public void stop() { + super.stop(); + executor.shutdown(); + } + + /** + * init listeners + */ + private void initListeners() { + logger.info("{}: initializing the listeners....", Thread.currentThread().getName()); + List list = BeanUtil.getBeansOfType(com.chushang.common.canal.event.CanalEventListener.class); + if (list != null) { + listeners.addAll(list); + } + Map listenerMap = BeanUtil.getBeansWithAnnotation(CanalEventListener.class); + if (listenerMap != null) { + for (Object target : listenerMap.values()) { + Method[] methods = target.getClass().getDeclaredMethods(); + for (Method method : methods) { + ListenPoint l = AnnotationUtils.findAnnotation(method, ListenPoint.class); + if (l != null) { + annoListeners.add(new ListenerPoint(target, method, l)); + } + } + } + } + logger.info("{}: initializing the listeners end.", Thread.currentThread().getName()); + if (logger.isWarnEnabled() && listeners.isEmpty() && annoListeners.isEmpty()) { + logger.warn("{}: No listener found in context! ", Thread.currentThread().getName()); + } + } +} diff --git a/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/client/transfer/AbstractBasicMessageTransponder.java b/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/client/transfer/AbstractBasicMessageTransponder.java new file mode 100644 index 0000000..6361bf3 --- /dev/null +++ b/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/client/transfer/AbstractBasicMessageTransponder.java @@ -0,0 +1,158 @@ +package com.chushang.common.canal.client.transfer; + +import com.alibaba.otter.canal.client.CanalConnector; +import com.alibaba.otter.canal.protocol.CanalEntry; +import com.alibaba.otter.canal.protocol.exception.CanalClientException; +import com.chushang.common.canal.config.CanalConfig; +import com.chushang.common.canal.event.CanalEventListener; +import com.chushang.common.canal.annotation.ListenPoint; +import com.chushang.common.canal.client.ListenerPoint; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.reflect.Method; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.function.Predicate; + +public abstract class AbstractBasicMessageTransponder extends AbstractMessageTransponder { + + private final static Logger logger = LoggerFactory.getLogger(AbstractBasicMessageTransponder.class); + + public AbstractBasicMessageTransponder(CanalConnector connector, Map.Entry config, List listeners, List annoListeners) { + super(connector, config, listeners, annoListeners); + } + + + @Override + protected void distributeEvent(List entryList) { + for (CanalEntry.Entry entry : entryList) { + //ignore the transaction operations + List ignoreEntryTypes = getIgnoreEntryTypes(); + if (ignoreEntryTypes != null + && ignoreEntryTypes.stream().anyMatch(t -> entry.getEntryType() == t)) { + continue; + } + CanalEntry.RowChange rowChange; + try { + rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); + } catch (Exception e) { + throw new CanalClientException("ERROR ## parser of event has an error , data:" + entry.toString(), + e); + } + //ignore the ddl operation + if (rowChange.hasIsDdl() && rowChange.getIsDdl()) { + processDdl(rowChange); + continue; + } + CanalEntry.Header header = entry.getHeader(); + String tableName = header.getTableName(); + String schemaName = header.getSchemaName(); + for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) { + //distribute to listener interfaces + distributeByImpl(rowChange.getEventType(),schemaName ,tableName, rowData); + //distribute to annotation listener interfaces + distributeByAnnotation(destination, + schemaName, + tableName, + rowChange.getEventType(), + rowData); + } + } + } + + /** + * process the ddl event + * @param rowChange rowChange + */ + protected void processDdl(CanalEntry.RowChange rowChange) {} + + /** + * distribute to listener interfaces + * + * @param eventType eventType + * @param schemaName 指定数据库名称 --> 一般由filter 指定, 此处不应当判断 + * @param tableName 指定数据表名 + * @param rowData rowData + */ + protected void distributeByImpl(CanalEntry.EventType eventType, String schemaName, String tableName, CanalEntry.RowData rowData) { + logger.info("schemaName : {}, tableName : {}", schemaName, tableName); + if (listeners != null) { + for (CanalEventListener listener : listeners) { + if (tableName.equals(listener.tableName()) && schemaName.equals(listener.schemaName())){ + listener.onEvent(eventType, rowData); + } + } + } + } + + /** + * distribute to annotation listener interfaces + * + * @param destination destination + * @param schemaName schema + * @param tableName table name + * @param eventType event type + * @param rowData row data + */ + protected void distributeByAnnotation(String destination, + String schemaName, + String tableName, + CanalEntry.EventType eventType, + CanalEntry.RowData rowData) { + //invoke the listeners + annoListeners.forEach(point -> point + .getInvokeMap() + .entrySet() + .stream() + .filter(getAnnotationFilter(destination, schemaName, tableName, eventType)) + .forEach(entry -> { + Method method = entry.getKey(); + method.setAccessible(true); + try { + Object[] args = getInvokeArgs(method, eventType, rowData); + method.invoke(point.getTarget(), args); + } catch (Exception e) { + logger.error("{}: Error occurred when invoke the listener's interface! class:{}, method:{}", + Thread.currentThread().getName(), + point.getTarget().getClass().getName(), method.getName()); + } + })); + } + + /** + * get the filters predicate + * + * @param destination destination + * @param schemaName schema + * @param tableName table name + * @param eventType event type + * @return predicate + */ + protected abstract Predicate> getAnnotationFilter(String destination, + String schemaName, + String tableName, + CanalEntry.EventType eventType); + + /** + * get the args + * + * @param method method + * @param eventType event type + * @param rowData row data + * @return args which will be used by invoking the annotation methods + */ + protected abstract Object[] getInvokeArgs(Method method, CanalEntry.EventType eventType, + CanalEntry.RowData rowData); + + /** + * get the ignore eventType list + * + * @return eventType list + */ + protected List getIgnoreEntryTypes() { + return Collections.emptyList(); + } + +} diff --git a/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/client/transfer/AbstractMessageTransponder.java b/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/client/transfer/AbstractMessageTransponder.java new file mode 100644 index 0000000..1cc73c1 --- /dev/null +++ b/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/client/transfer/AbstractMessageTransponder.java @@ -0,0 +1,157 @@ +package com.chushang.common.canal.client.transfer; + +import com.alibaba.otter.canal.client.CanalConnector; +import com.alibaba.otter.canal.protocol.CanalEntry; +import com.alibaba.otter.canal.protocol.Message; +import com.alibaba.otter.canal.protocol.exception.CanalClientException; +import com.chushang.common.canal.client.ListenerPoint; +import com.chushang.common.canal.config.CanalConfig; +import com.chushang.common.canal.event.CanalEventListener; +import com.chushang.common.core.util.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +public abstract class AbstractMessageTransponder extends MessageTransponder { + + /** + * canal connector + */ + private final CanalConnector connector; + + /** + * custom config + */ + protected final CanalConfig.Instance config; + + /** + * destination of canal server + */ + protected final String destination; + + /** + * listeners which are used by implementing the Interface + */ + protected final List listeners = new ArrayList<>(); + + /** + * listeners which are used by annotation + */ + protected final List annoListeners = new ArrayList<>(); + + /** + * running flag + */ + private volatile boolean running = true; + + private static final Logger logger = LoggerFactory.getLogger(AbstractMessageTransponder.class); + + public AbstractMessageTransponder(CanalConnector connector, + Map.Entry config, + List listeners, + List annoListeners) { + Objects.requireNonNull(connector, "connector can not be null!"); + Objects.requireNonNull(config, "config can not be null!"); + this.connector = connector; + this.destination = config.getKey(); + this.config = config.getValue(); + if (listeners != null) + this.listeners.addAll(listeners); + if (annoListeners != null) + this.annoListeners.addAll(annoListeners); + } + + @Override + public void run() { + // 在 run 时 才进行连接 + connect(); + int errorCount = config.getRetryCount(); + final long interval = config.getAcquireInterval(); + final String threadName = Thread.currentThread().getName(); + boolean interrupted = Thread.currentThread().isInterrupted(); + do { + try { + Message message = connector.getWithoutAck(config.getBatchSize()); + long batchId = message.getId(); + int size = message.getEntries().size(); + if (logger.isDebugEnabled()) { + logger.debug("{}: Get message from canal server >>>>> size:{}", threadName, size); + } + //empty message + if (batchId == -1 || size == 0) { + if (logger.isDebugEnabled()) { + logger.debug("{}: Empty message... sleep for {} millis", threadName, interval); + } + Thread.sleep(interval); + } else { + distributeEvent(message.getEntries()); + } + + // commit ack + connector.ack(batchId); + if (logger.isDebugEnabled()) { + logger.debug("{}: Ack message. batchId:{}", threadName, batchId); + } + } catch (CanalClientException e) { + errorCount--; + logger.error(threadName + ": Error occurred!! ", e); + try { + Thread.sleep(interval); + } catch (InterruptedException e1) { + errorCount = 0; + } + } catch (InterruptedException e) { + errorCount = 0; + connector.rollback(); + } finally { + // 重试次数 为0, 不在进行重试, 等待重新连接 + if (errorCount <= 0) { + stop(); + logger.info("{}: Topping the client.. ", Thread.currentThread().getName()); + } + } + } while ((running && !interrupted)); + } + + protected abstract void distributeEvent(List entryList); + + private void connect() { + connector.connect(); + // 此处 添加 过滤 -> + if (StringUtils.isNotEmpty(config.getFilter())) { + connector.subscribe(config.getFilter()); + } else { + connector.subscribe(); + } + connector.rollback(); + + logger.info("connector is connect"); + } + + /** + * stop running + */ + void stop() { + // 此处应当就是失败了, 需要停止进行重试 + logger.info("{}: client stopped. ", Thread.currentThread().getName()); + running = false; + if (null == connector) { + return; + } + // 停止时, 关闭连接 + connector.disconnect(); + // 说明失败重试 + if (config.isErrorRetry()) { + logger.info("connector restart"); + + long errorRetryTime = config.getErrorRetryTime(); + running = true; + // 延时执行 + timer.schedule(this, errorRetryTime); + } + } +} diff --git a/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/client/transfer/DefaultMessageTransponder.java b/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/client/transfer/DefaultMessageTransponder.java new file mode 100644 index 0000000..16429fb --- /dev/null +++ b/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/client/transfer/DefaultMessageTransponder.java @@ -0,0 +1,68 @@ +package com.chushang.common.canal.client.transfer; + +import com.alibaba.otter.canal.client.CanalConnector; +import com.alibaba.otter.canal.protocol.CanalEntry; +import com.chushang.common.canal.config.CanalConfig; +import com.chushang.common.canal.annotation.ListenPoint; +import com.chushang.common.canal.client.ListenerPoint; +import com.chushang.common.canal.event.CanalEventListener; +import com.chushang.common.core.util.StringUtils; + +import java.lang.reflect.Method; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.function.Predicate; + +public class DefaultMessageTransponder extends AbstractBasicMessageTransponder { + + public DefaultMessageTransponder(CanalConnector connector, + Map.Entry config, + List listeners, + List annoListeners) { + super(connector, config, listeners, annoListeners); + } + + /** + * get the filters predicate + * + * @param destination destination + * @param schemaName schema + * @param tableName table name + * @param eventType event type + * @return predicate + */ + @Override + protected Predicate> getAnnotationFilter(String destination, + String schemaName, + String tableName, + CanalEntry.EventType eventType) { + Predicate> df = e -> StringUtils.isEmpty(e.getValue().destination()) + || e.getValue().destination().equals(destination); + + Predicate> sf = e -> e.getValue().schema().length == 0 + || Arrays.asList(e.getValue().schema()).contains(schemaName); + + Predicate> tf = e -> e.getValue().table().length == 0 + || Arrays.asList(e.getValue().table()).contains(tableName); + + Predicate> ef = e -> (e.getValue().eventType().length == 0) + || Arrays.stream(e.getValue().eventType()).anyMatch(ev -> ev == eventType); + return df.and(sf).and(tf).and(ef); + } + + @Override + protected Object[] getInvokeArgs(Method method, CanalEntry.EventType eventType, CanalEntry.RowData rowData) { + return Arrays.stream(method.getParameterTypes()) + .map(p -> p == CanalEntry.EventType.class + ? eventType + : p == CanalEntry.RowData.class + ? rowData : null) + .toArray(); + } + + @Override + protected List getIgnoreEntryTypes() { + return Arrays.asList(CanalEntry.EntryType.TRANSACTIONBEGIN, CanalEntry.EntryType.TRANSACTIONEND, CanalEntry.EntryType.HEARTBEAT); + } +} diff --git a/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/client/transfer/DefaultTransponderFactory.java b/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/client/transfer/DefaultTransponderFactory.java new file mode 100644 index 0000000..8532ff3 --- /dev/null +++ b/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/client/transfer/DefaultTransponderFactory.java @@ -0,0 +1,17 @@ +package com.chushang.common.canal.client.transfer; + +import com.alibaba.otter.canal.client.CanalConnector; +import com.chushang.common.canal.config.CanalConfig; +import com.chushang.common.canal.client.ListenerPoint; +import com.chushang.common.canal.event.CanalEventListener; + +import java.util.List; +import java.util.Map; + +public class DefaultTransponderFactory implements TransponderFactory { + @Override + public MessageTransponder newTransponder(CanalConnector connector, Map.Entry config, List listeners, + List annoListeners) { + return new DefaultMessageTransponder(connector, config, listeners, annoListeners); + } +} diff --git a/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/client/transfer/MessageTransponder.java b/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/client/transfer/MessageTransponder.java new file mode 100644 index 0000000..50a1bed --- /dev/null +++ b/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/client/transfer/MessageTransponder.java @@ -0,0 +1,10 @@ +package com.chushang.common.canal.client.transfer; + +import java.util.Timer; +import java.util.TimerTask; + +public abstract class MessageTransponder extends TimerTask { + + public static final Timer timer = new Timer(); + +} diff --git a/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/client/transfer/MessageTransponders.java b/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/client/transfer/MessageTransponders.java new file mode 100644 index 0000000..939beea --- /dev/null +++ b/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/client/transfer/MessageTransponders.java @@ -0,0 +1,9 @@ +package com.chushang.common.canal.client.transfer; + +public class MessageTransponders { + + public static TransponderFactory defaultMessageTransponder() { + return new DefaultTransponderFactory(); + } + +} diff --git a/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/client/transfer/TransponderFactory.java b/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/client/transfer/TransponderFactory.java new file mode 100644 index 0000000..79a9f3d --- /dev/null +++ b/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/client/transfer/TransponderFactory.java @@ -0,0 +1,22 @@ +package com.chushang.common.canal.client.transfer; + +import com.alibaba.otter.canal.client.CanalConnector; +import com.chushang.common.canal.client.ListenerPoint; +import com.chushang.common.canal.config.CanalConfig; +import com.chushang.common.canal.event.CanalEventListener; + +import java.util.List; +import java.util.Map; + +public interface TransponderFactory { + + /** + * @param connector connector + * @param config config + * @param listeners listeners + * @param annoListeners annoListeners + * @return MessageTransponder + */ + MessageTransponder newTransponder(CanalConnector connector, Map.Entry config, List listeners, + List annoListeners); +} diff --git a/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/config/CanalClientAutoConfiguration.java b/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/config/CanalClientAutoConfiguration.java new file mode 100644 index 0000000..7b29e8a --- /dev/null +++ b/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/config/CanalClientAutoConfiguration.java @@ -0,0 +1,43 @@ +package com.chushang.common.canal.config; + + +import com.chushang.common.canal.client.SimpleCanalClient; +import com.chushang.common.canal.client.transfer.MessageTransponders; +import com.chushang.common.canal.util.BeanUtil; +import com.chushang.common.canal.client.CanalClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.core.Ordered; +import org.springframework.core.annotation.Order; + + +public class CanalClientAutoConfiguration { + + private final static Logger logger = LoggerFactory.getLogger(CanalClientAutoConfiguration.class); + + @Autowired + private CanalConfig canalConfig; + + @Bean + @Order(Ordered.HIGHEST_PRECEDENCE) + public BeanUtil beanUtil() { + return new BeanUtil(); + } + + @Bean + private CanalClient canalClient() { + + CanalClient canalClient = + new SimpleCanalClient(canalConfig, MessageTransponders.defaultMessageTransponder()); + + canalClient.start(); + + logger.info("Starting canal client...."); + + return canalClient; + } + + +} diff --git a/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/config/CanalConfig.java b/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/config/CanalConfig.java new file mode 100644 index 0000000..c3c34dc --- /dev/null +++ b/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/config/CanalConfig.java @@ -0,0 +1,200 @@ +package com.chushang.common.canal.config; + +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.cloud.context.config.annotation.RefreshScope; +import org.springframework.context.annotation.Configuration; +import org.springframework.core.Ordered; +import org.springframework.core.annotation.Order; + +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.Map; +import java.util.Set; + +@Order(Ordered.HIGHEST_PRECEDENCE) +@RefreshScope +@Configuration +@ConfigurationProperties(prefix = "canal.client") +public class CanalConfig { + + /** + * instance config + */ + private Map instances = new LinkedHashMap<>(); + + public Map getInstances() { + return instances; + } + + public void setInstances(Map instances) { + this.instances = instances; + } + + /** + * instance config class + */ + public static class Instance { + + /** + * is cluster-mod + */ + private boolean clusterEnabled; + /** + * zookeeper address + */ + private Set zookeeperAddress = new LinkedHashSet<>(); + + /** + * canal server host + */ + private String host = "127.0.0.1"; + + /** + * canal server port + */ + private int port = 10001; + + /** + * canal user name + */ + private String userName = ""; + + /** + * canal password + */ + private String password = ""; + + /** + * size when get messages from the canal server + */ + private int batchSize = 1000; + + /** + * filter + */ + private String filter; + + /** + * retry count when error occurred + * 单次异常 的重试次数 + */ + private int retryCount = 5; + + /** + * 失败后重新启动 默认不进行失败重新连接 + */ + private boolean errorRetry = false; + + /** + * 失败后间隔多长时间进行重启 + * 单位 ms + * 默认10分钟 一次进行重新连接 + */ + private long errorRetryTime = 10 * 60 * 1000; + + /** + * interval of the message-acquiring + */ + private long acquireInterval = 1000; + + public Instance() {} + + public boolean isClusterEnabled() { + return clusterEnabled; + } + + public void setClusterEnabled(boolean clusterEnabled) { + this.clusterEnabled = clusterEnabled; + } + + public Set getZookeeperAddress() { + return zookeeperAddress; + } + + public void setZookeeperAddress(Set zookeeperAddress) { + this.zookeeperAddress = zookeeperAddress; + } + + public String getHost() { + return host; + } + + public void setHost(String host) { + this.host = host; + } + + public int getPort() { + return port; + } + + public void setPort(int port) { + this.port = port; + } + + public String getUserName() { + return userName; + } + + public void setUserName(String userName) { + this.userName = userName; + } + + public String getPassword() { + return password; + } + + public void setPassword(String password) { + this.password = password; + } + + public int getBatchSize() { + return batchSize; + } + + public void setBatchSize(int batchSize) { + this.batchSize = batchSize; + } + + public String getFilter() { + return filter; + } + + public void setFilter(String filter) { + this.filter = filter; + } + + public int getRetryCount() { + return retryCount; + } + + public void setRetryCount(int retryCount) { + this.retryCount = retryCount; + } + + public long getAcquireInterval() { + return acquireInterval; + } + + public void setAcquireInterval(long acquireInterval) { + this.acquireInterval = acquireInterval; + } + + public boolean isErrorRetry() { + return errorRetry; + } + + public void setErrorRetry(boolean errorRetry) { + this.errorRetry = errorRetry; + } + + public long getErrorRetryTime() { + return errorRetryTime; + } + + public void setErrorRetryTime(long errorRetryTime) { + this.errorRetryTime = errorRetryTime; + } + + } + +} diff --git a/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/event/CanalEventListener.java b/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/event/CanalEventListener.java new file mode 100644 index 0000000..01b0fcf --- /dev/null +++ b/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/event/CanalEventListener.java @@ -0,0 +1,52 @@ +package com.chushang.common.canal.event; + +import com.alibaba.otter.canal.protocol.CanalEntry; + +import java.util.Objects; + +public interface CanalEventListener { + + default void onEvent(CanalEntry.EventType eventType, CanalEntry.RowData rowData) { + Objects.requireNonNull(eventType); + switch (eventType) { + case INSERT: + onInsert(rowData); + break; + case UPDATE: + onUpdate(rowData); + break; + case DELETE: + onDelete(rowData); + break; + default: + break; + } + } + + + /** + * fired on insert event + * + * @param rowData rowData + */ + void onInsert(CanalEntry.RowData rowData); + + /** + * fired on update event + * + * @param rowData rowData + */ + void onUpdate(CanalEntry.RowData rowData); + + /** + * fired on delete event + * + * @param rowData rowData + */ + void onDelete(CanalEntry.RowData rowData); + + String tableName(); + + String schemaName(); + +} diff --git a/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/util/BeanUtil.java b/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/util/BeanUtil.java new file mode 100644 index 0000000..c946c52 --- /dev/null +++ b/chushang-common/chushang-common-mq/src/main/java/com/chushang/common/canal/util/BeanUtil.java @@ -0,0 +1,52 @@ +package com.chushang.common.canal.util; + +import org.springframework.beans.BeansException; +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationContextAware; +import org.springframework.stereotype.Component; + +import java.lang.annotation.Annotation; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +@Component +public class BeanUtil implements ApplicationContextAware { + + private static ApplicationContext applicationContext; + + @Override + public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { + BeanUtil.applicationContext = applicationContext; + } + + public static T getBean(Class clazz) { + T obj; + try { + obj = applicationContext.getBean(clazz); + } catch (Exception e) { + obj = null; + } + return obj; + } + + public static List getBeansOfType(Class clazz) { + Map map; + try { + map = applicationContext.getBeansOfType(clazz); + } catch (Exception e) { + map = null; + } + return map == null ? null : new ArrayList<>(map.values()); + } + + public static Map getBeansWithAnnotation(Class anno) { + Map map; + try { + map = applicationContext.getBeansWithAnnotation(anno); + } catch (Exception e) { + map = null; + } + return map; + } +} \ No newline at end of file diff --git a/chushang-common/chushang-common-mq/src/main/resources/META-INF/spring.factories b/chushang-common/chushang-common-mq/src/main/resources/META-INF/spring.factories new file mode 100644 index 0000000..7acf598 --- /dev/null +++ b/chushang-common/chushang-common-mq/src/main/resources/META-INF/spring.factories @@ -0,0 +1,3 @@ +org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ + com.chushang.common.canal.config.CanalConfig, \ + com.chushang.common.canal.config.CanalClientAutoConfiguration diff --git a/chushang-common/pom.xml b/chushang-common/pom.xml index bdb6112..c2e9ba6 100644 --- a/chushang-common/pom.xml +++ b/chushang-common/pom.xml @@ -26,6 +26,7 @@ chushang-common-log chushang-common-mail chushang-common-mongo + chushang-common-mq chushang-common-mybatis chushang-common-mybatis-plugin chushang-common-redis diff --git a/chushang-modules/chushang-module-task/task-feign/src/main/java/com/chushang/task/constants/TaskConstants.java b/chushang-modules/chushang-module-task/task-feign/src/main/java/com/chushang/task/constants/TaskConstants.java new file mode 100644 index 0000000..5ae5777 --- /dev/null +++ b/chushang-modules/chushang-module-task/task-feign/src/main/java/com/chushang/task/constants/TaskConstants.java @@ -0,0 +1,12 @@ +package com.chushang.task.constants; + +/** + * @auther: zhao + * @date: 2024/6/11 10:07 + */ +public interface TaskConstants { + + String TASK_SERVICE = "task-service"; + String APPLICATION_CONTENT_PATH = "task"; + +} diff --git a/chushang-modules/chushang-module-task/task-feign/src/main/java/com/chushang/task/entity/TaskInfo.java b/chushang-modules/chushang-module-task/task-feign/src/main/java/com/chushang/task/entity/TaskInfo.java index 47ac33e..3fd1167 100644 --- a/chushang-modules/chushang-module-task/task-feign/src/main/java/com/chushang/task/entity/TaskInfo.java +++ b/chushang-modules/chushang-module-task/task-feign/src/main/java/com/chushang/task/entity/TaskInfo.java @@ -5,10 +5,8 @@ import com.baomidou.mybatisplus.annotation.TableField; import com.baomidou.mybatisplus.annotation.TableId; import com.baomidou.mybatisplus.annotation.TableName; import com.chushang.common.mybatis.base.BaseEntity; -import lombok.AllArgsConstructor; -import lombok.Data; -import lombok.EqualsAndHashCode; -import lombok.NoArgsConstructor; +import com.chushang.task.enums.ServiceEnum; +import lombok.*; import java.time.LocalDateTime; @@ -21,6 +19,7 @@ import java.time.LocalDateTime; @AllArgsConstructor @NoArgsConstructor @TableName(value = "tb_task_info") +@Builder public class TaskInfo extends BaseEntity { /** * 主键id @@ -36,6 +35,7 @@ public class TaskInfo extends BaseEntity { /** * 所属服务名 + * 取 ServiceEnum 的 value */ @TableField(value = "application_name") private String applicationName; @@ -54,9 +54,11 @@ public class TaskInfo extends BaseEntity { /** * 任务状态 + * 字典表 task_status + * */ @TableField(value = "task_status") - private Short taskStatus; + private Integer taskStatus; /** * 创建人 diff --git a/chushang-modules/chushang-module-task/task-feign/src/main/java/com/chushang/task/entity/dto/CreateTaskDTO.java b/chushang-modules/chushang-module-task/task-feign/src/main/java/com/chushang/task/entity/dto/CreateTaskDTO.java new file mode 100644 index 0000000..a0e970e --- /dev/null +++ b/chushang-modules/chushang-module-task/task-feign/src/main/java/com/chushang/task/entity/dto/CreateTaskDTO.java @@ -0,0 +1,46 @@ +package com.chushang.task.entity.dto; + +import com.chushang.task.enums.ServiceEnum; +import lombok.Data; + +/** + * @auther: zhao + * @date: 2024/6/11 10:19 + */ +@Data +public class CreateTaskDTO { + + /** + * 后台任务名称 + */ + private String taskName; + /** + * 所属服务名 + * 取 ServiceEnum 的 value + */ + private ServiceEnum applicationName; + /** + * 方法名称 + */ + private String methodName; + /** + * 类名 + */ + private String className; + /** + * 创建人 + */ + private String createBy; + /** + * 后台任务执行参数 + */ + private String params; + /** + * 备注信息 + */ + private String remark; + /** + * 部门id + */ + private Long deptId; +} diff --git a/chushang-modules/chushang-module-task/task-feign/src/main/java/com/chushang/task/entity/dto/UpdateTaskDTO.java b/chushang-modules/chushang-module-task/task-feign/src/main/java/com/chushang/task/entity/dto/UpdateTaskDTO.java new file mode 100644 index 0000000..62aaac8 --- /dev/null +++ b/chushang-modules/chushang-module-task/task-feign/src/main/java/com/chushang/task/entity/dto/UpdateTaskDTO.java @@ -0,0 +1,40 @@ +package com.chushang.task.entity.dto; + +import lombok.Data; + +import java.time.LocalDateTime; + +/** + * @auther: zhao + * @date: 2024/6/11 10:27 + */ +@Data +public class UpdateTaskDTO { + /** + * 任务状态 + * 字典表 task_status + * + */ + private Integer taskStatus; + /** + * 导出或者下载时, 对应的返回的文件路径 + */ + private String downUrl; + /** + * 最后一次执行时间 + */ + private LocalDateTime lastRunTime; + /** + * 最后一次执行结果 + */ + private String lastRunResult; + /** + * 错误信息 + */ + private String errorMessage; + /** + * 修改人 + */ + private String updateBy; + +} diff --git a/chushang-modules/chushang-module-task/task-feign/src/main/java/com/chushang/task/enums/ServiceEnum.java b/chushang-modules/chushang-module-task/task-feign/src/main/java/com/chushang/task/enums/ServiceEnum.java new file mode 100644 index 0000000..061e4b0 --- /dev/null +++ b/chushang-modules/chushang-module-task/task-feign/src/main/java/com/chushang/task/enums/ServiceEnum.java @@ -0,0 +1,36 @@ +package com.chushang.task.enums; + +import com.baomidou.mybatisplus.annotation.IEnum; +import lombok.AllArgsConstructor; +import lombok.Getter; + +/** + * @auther: zhao + * @date: 2024/6/11 10:01 + */ +@Getter +@AllArgsConstructor +public enum ServiceEnum { + /** + * 系统 + */ + SYSTEM("system-service"), + /** + * 授权 + */ + AUTH("auth-service"), + /** + * 文件 + */ + OSS("oss-service"), + /** + * 工单 + */ + WRK("wrk-service"), + /** + * 后台任务 + */ + TASK("task-service"), + ; + private final String serviceName; +} diff --git a/chushang-modules/chushang-module-task/task-feign/src/main/java/com/chushang/task/feign/RemoteTaskService.java b/chushang-modules/chushang-module-task/task-feign/src/main/java/com/chushang/task/feign/RemoteTaskService.java new file mode 100644 index 0000000..8656052 --- /dev/null +++ b/chushang-modules/chushang-module-task/task-feign/src/main/java/com/chushang/task/feign/RemoteTaskService.java @@ -0,0 +1,37 @@ +package com.chushang.task.feign; + +import com.chushang.common.core.constant.SecurityConstants; +import com.chushang.common.core.web.Result; +import com.chushang.task.constants.TaskConstants; +import com.chushang.task.entity.dto.CreateTaskDTO; +import com.chushang.task.entity.dto.UpdateTaskDTO; +import org.springframework.cloud.openfeign.FeignClient; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestHeader; + +/** + * @auther: zhao + * @date: 2024/6/11 10:04 + */ +@FeignClient(contextId = "taskFeign", + value = TaskConstants.TASK_SERVICE, + path = TaskConstants.APPLICATION_CONTENT_PATH + "/remote" +) +public interface RemoteTaskService { + + /** + * 创建 后台任务 + */ + @PostMapping("/create") + Result createTask(@RequestBody CreateTaskDTO taskInfo, @RequestHeader(SecurityConstants.FROM_SOURCE) String source); + + /** + * 修改后台任务 + */ + @PostMapping(value = "/{taskId}") + Result updateTask(@PathVariable(value = "taskId") Long taskId, + @RequestBody UpdateTaskDTO taskDTO, + @RequestHeader(SecurityConstants.FROM_SOURCE) String source); +} diff --git a/chushang-modules/chushang-module-task/task-service/src/main/java/com/chushang/task/remote/RemoteTaskController.java b/chushang-modules/chushang-module-task/task-service/src/main/java/com/chushang/task/remote/RemoteTaskController.java new file mode 100644 index 0000000..098ce26 --- /dev/null +++ b/chushang-modules/chushang-module-task/task-service/src/main/java/com/chushang/task/remote/RemoteTaskController.java @@ -0,0 +1,61 @@ +package com.chushang.task.remote; + +import com.chushang.common.core.web.Result; +import com.chushang.task.entity.TaskInfo; +import com.chushang.task.entity.dto.CreateTaskDTO; +import com.chushang.task.entity.dto.UpdateTaskDTO; +import com.chushang.task.feign.RemoteTaskService; +import com.chushang.task.service.TaskInfoService; +import org.springframework.web.bind.annotation.*; + +import javax.annotation.Resource; + +/** + * @auther: zhao + * @date: 2024/6/11 10:28 + */ +@RestController +@RequestMapping(value = "/remote") +public class RemoteTaskController implements RemoteTaskService { + + @Resource + TaskInfoService taskInfoService; + /** + * 创建 后台任务 + */ + @Override + @PostMapping("/create") + public Result createTask(@RequestBody CreateTaskDTO task, String source) { + TaskInfo taskInfo = TaskInfo.builder() + .createBy(task.getCreateBy()) + .applicationName(task.getApplicationName().getServiceName()) + .className(task.getClassName()) + .methodName(task.getMethodName()) + .params(task.getParams()) + .taskStatus(0) + .deptId(task.getDeptId()) + .taskName(task.getTaskName()) + .build(); + boolean save = taskInfoService.save(taskInfo); + // todo 需要创建队列数据 + + return Result.ok(save ? taskInfo.getTaskId() : null); + } + + @Override + @PostMapping(value = "/{taskId}") + public Result updateTask(@PathVariable Long taskId, + @RequestBody UpdateTaskDTO task, String source) + { + TaskInfo taskInfo = TaskInfo.builder() + .taskId(taskId) + .updateBy(task.getUpdateBy()) + .taskStatus(task.getTaskStatus()) + .downUrl(task.getDownUrl()) + .errorMessage(task.getErrorMessage()) + .lastRunResult(task.getLastRunResult()) + .lastRunTime(task.getLastRunTime()) + .build(); + return Result.ok(taskInfoService.updateById(taskInfo)); + } +} diff --git a/chushang-modules/chushang-module-task/task-service/src/main/java/com/chushang/task/service/impl/TaskInfoServiceImpl.java b/chushang-modules/chushang-module-task/task-service/src/main/java/com/chushang/task/service/impl/TaskInfoServiceImpl.java index a1e9e96..4c08906 100644 --- a/chushang-modules/chushang-module-task/task-service/src/main/java/com/chushang/task/service/impl/TaskInfoServiceImpl.java +++ b/chushang-modules/chushang-module-task/task-service/src/main/java/com/chushang/task/service/impl/TaskInfoServiceImpl.java @@ -28,7 +28,7 @@ public class TaskInfoServiceImpl extends ServiceImpl i .orderBy(true, "asc".equals(commonParam.getIsAsc()), commonParam.getOrderBy()) .lambda() .eq(null != taskInfo.getTaskId(), TaskInfo::getTaskId, taskInfo.getTaskId()) - .like(StringUtils.isNotEmpty(taskInfo.getApplicationName()), TaskInfo::getApplicationName, taskInfo.getApplicationName()) + .like(null != taskInfo.getApplicationName(), TaskInfo::getApplicationName, taskInfo.getApplicationName()) .eq(StringUtils.isNotEmpty(taskInfo.getTaskName()), TaskInfo::getTaskName, taskInfo.getTaskName()) .eq(null != taskInfo.getTaskStatus(), TaskInfo::getTaskStatus, taskInfo.getTaskStatus()) .like(StringUtils.isNotEmpty(taskInfo.getClassName()), TaskInfo::getClassName, taskInfo.getClassName()) diff --git a/pom.xml b/pom.xml index ccd7328..148d9d7 100644 --- a/pom.xml +++ b/pom.xml @@ -100,6 +100,7 @@ 0.1.55 2.0.43 2.7.0 + 4.9.4 3.11.0 3.1.0 @@ -561,6 +562,11 @@ ip2region ${ip2region.version} + + + + rocket- +