1. todo 待添加 mq 相关,
This commit is contained in:
parent
fee737f5e9
commit
0560d7244a
|
|
@ -0,0 +1,21 @@
|
||||||
|
<?xml version="1.0" encoding="UTF-8"?>
|
||||||
|
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||||
|
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||||
|
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||||
|
<parent>
|
||||||
|
<artifactId>chushang-common</artifactId>
|
||||||
|
<groupId>com.chushang</groupId>
|
||||||
|
<version>1.0.0</version>
|
||||||
|
</parent>
|
||||||
|
<modelVersion>4.0.0</modelVersion>
|
||||||
|
|
||||||
|
<artifactId>chushang-common-mq</artifactId>
|
||||||
|
<description>mq模块</description>
|
||||||
|
|
||||||
|
<dependencies>
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.chushang</groupId>
|
||||||
|
<artifactId>chushang-common-core</artifactId>
|
||||||
|
</dependency>
|
||||||
|
</dependencies>
|
||||||
|
</project>
|
||||||
|
|
@ -0,0 +1,24 @@
|
||||||
|
package com.chushang.common.canal.annotation;
|
||||||
|
|
||||||
|
import org.springframework.core.annotation.AliasFor;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import java.lang.annotation.*;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* inject the present class to the spring context
|
||||||
|
* as a listener of the canal event
|
||||||
|
*
|
||||||
|
* @author chen.qian
|
||||||
|
* @date 2018/3/19
|
||||||
|
*/
|
||||||
|
@Target({ElementType.TYPE})
|
||||||
|
@Retention(RetentionPolicy.RUNTIME)
|
||||||
|
@Documented
|
||||||
|
@Component
|
||||||
|
public @interface CanalEventListener {
|
||||||
|
|
||||||
|
@AliasFor(annotation = Component.class)
|
||||||
|
String value() default "";
|
||||||
|
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,45 @@
|
||||||
|
package com.chushang.common.canal.annotation;
|
||||||
|
|
||||||
|
import com.alibaba.otter.canal.protocol.CanalEntry;
|
||||||
|
import org.springframework.core.annotation.AliasFor;
|
||||||
|
|
||||||
|
import java.lang.annotation.*;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* ListenPoint for delete
|
||||||
|
*
|
||||||
|
* @author chen.qian
|
||||||
|
* @date 2018/3/19
|
||||||
|
*/
|
||||||
|
|
||||||
|
@Target({ElementType.METHOD})
|
||||||
|
@Retention(RetentionPolicy.RUNTIME)
|
||||||
|
@Documented
|
||||||
|
@ListenPoint(eventType = CanalEntry.EventType.DELETE)
|
||||||
|
public @interface DeleteListenPoint {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* canal destination
|
||||||
|
* default for all
|
||||||
|
* @return canal destination
|
||||||
|
*/
|
||||||
|
@AliasFor(annotation = ListenPoint.class)
|
||||||
|
String destination() default "";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* database schema which you are concentrate on
|
||||||
|
* default for all
|
||||||
|
* @return canal destination
|
||||||
|
*/
|
||||||
|
@AliasFor(annotation = ListenPoint.class)
|
||||||
|
String[] schema() default {};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* tables which you are concentrate on
|
||||||
|
* default for all
|
||||||
|
* @return canal destination
|
||||||
|
*/
|
||||||
|
@AliasFor(annotation = ListenPoint.class)
|
||||||
|
String[] table() default {};
|
||||||
|
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,45 @@
|
||||||
|
package com.chushang.common.canal.annotation;
|
||||||
|
|
||||||
|
import com.alibaba.otter.canal.protocol.CanalEntry;
|
||||||
|
import org.springframework.core.annotation.AliasFor;
|
||||||
|
|
||||||
|
import java.lang.annotation.*;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* ListenPoint for insert
|
||||||
|
*
|
||||||
|
* @author chen.qian
|
||||||
|
* @date 2018/3/19
|
||||||
|
*/
|
||||||
|
|
||||||
|
@Target({ElementType.METHOD})
|
||||||
|
@Retention(RetentionPolicy.RUNTIME)
|
||||||
|
@Documented
|
||||||
|
@ListenPoint(eventType = CanalEntry.EventType.INSERT)
|
||||||
|
public @interface InsertListenPoint {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* canal destination
|
||||||
|
* default for all
|
||||||
|
* @return canal destination
|
||||||
|
*/
|
||||||
|
@AliasFor(annotation = ListenPoint.class)
|
||||||
|
String destination() default "";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* database schema which you are concentrate on
|
||||||
|
* default for all
|
||||||
|
* @return canal destination
|
||||||
|
*/
|
||||||
|
@AliasFor(annotation = ListenPoint.class)
|
||||||
|
String[] schema() default {};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* tables which you are concentrate on
|
||||||
|
* default for all
|
||||||
|
* @return canal destination
|
||||||
|
*/
|
||||||
|
@AliasFor(annotation = ListenPoint.class)
|
||||||
|
String[] table() default {};
|
||||||
|
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,48 @@
|
||||||
|
package com.chushang.common.canal.annotation;
|
||||||
|
|
||||||
|
import com.alibaba.otter.canal.protocol.CanalEntry;
|
||||||
|
|
||||||
|
import java.lang.annotation.*;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* used to indicate that method(or methods) is(are) the candidate of the
|
||||||
|
* canal event distributor
|
||||||
|
*
|
||||||
|
* @author chen.qian
|
||||||
|
* @date 2018/3/19
|
||||||
|
*/
|
||||||
|
|
||||||
|
@Target({ElementType.METHOD, ElementType.TYPE})
|
||||||
|
@Retention(RetentionPolicy.RUNTIME)
|
||||||
|
@Documented
|
||||||
|
public @interface ListenPoint {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* canal destination
|
||||||
|
* default for all
|
||||||
|
* @return canal destination
|
||||||
|
*/
|
||||||
|
String destination() default "";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* database schema which you are concentrate on
|
||||||
|
* default for all
|
||||||
|
* @return canal destination
|
||||||
|
*/
|
||||||
|
String[] schema() default {};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* tables which you are concentrate on
|
||||||
|
* default for all
|
||||||
|
* @return canal destination
|
||||||
|
*/
|
||||||
|
String[] table() default {};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* canal event type
|
||||||
|
* default for all
|
||||||
|
* @return canal event type
|
||||||
|
*/
|
||||||
|
CanalEntry.EventType[] eventType() default {};
|
||||||
|
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,45 @@
|
||||||
|
package com.chushang.common.canal.annotation;
|
||||||
|
|
||||||
|
import com.alibaba.otter.canal.protocol.CanalEntry;
|
||||||
|
import org.springframework.core.annotation.AliasFor;
|
||||||
|
|
||||||
|
import java.lang.annotation.*;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* ListenPoint for update
|
||||||
|
*
|
||||||
|
* @author chen.qian
|
||||||
|
* @date 2018/3/19
|
||||||
|
*/
|
||||||
|
|
||||||
|
@Target({ElementType.METHOD})
|
||||||
|
@Retention(RetentionPolicy.RUNTIME)
|
||||||
|
@Documented
|
||||||
|
@ListenPoint(eventType = CanalEntry.EventType.UPDATE)
|
||||||
|
public @interface UpdateListenPoint {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* canal destination
|
||||||
|
* default for all
|
||||||
|
* @return canal destination
|
||||||
|
*/
|
||||||
|
@AliasFor(annotation = ListenPoint.class)
|
||||||
|
String destination() default "";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* database schema which you are concentrate on
|
||||||
|
* default for all
|
||||||
|
* @return canal destination
|
||||||
|
*/
|
||||||
|
@AliasFor(annotation = ListenPoint.class)
|
||||||
|
String[] schema() default {};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* tables which you are concentrate on
|
||||||
|
* default for all
|
||||||
|
* @return canal destination
|
||||||
|
*/
|
||||||
|
@AliasFor(annotation = ListenPoint.class)
|
||||||
|
String[] table() default {};
|
||||||
|
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,108 @@
|
||||||
|
package com.chushang.common.canal.client;
|
||||||
|
|
||||||
|
import com.alibaba.otter.canal.client.CanalConnector;
|
||||||
|
import com.alibaba.otter.canal.client.CanalConnectors;
|
||||||
|
import com.alibaba.otter.canal.protocol.exception.CanalClientException;
|
||||||
|
import com.chushang.common.canal.client.transfer.TransponderFactory;
|
||||||
|
import com.chushang.common.canal.config.CanalConfig;
|
||||||
|
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
|
import java.net.SocketAddress;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Objects;
|
||||||
|
|
||||||
|
public abstract class AbstractCanalClient implements CanalClient {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* running flag
|
||||||
|
*/
|
||||||
|
private volatile boolean running;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* customer config
|
||||||
|
*/
|
||||||
|
private final CanalConfig canalConfig;
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* TransponderFactory
|
||||||
|
*/
|
||||||
|
protected final TransponderFactory factory;
|
||||||
|
|
||||||
|
AbstractCanalClient(CanalConfig canalConfig, TransponderFactory factory) {
|
||||||
|
Objects.requireNonNull(canalConfig, "canalConfig can not be null!");
|
||||||
|
Objects.requireNonNull(canalConfig, "transponderFactory can not be null!");
|
||||||
|
this.canalConfig = canalConfig;
|
||||||
|
this.factory = factory;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void start() {
|
||||||
|
Map<String, CanalConfig.Instance> instanceMap = getConfig();
|
||||||
|
for (Map.Entry<String, CanalConfig.Instance> instanceEntry : instanceMap.entrySet()) {
|
||||||
|
process(processInstanceEntry(instanceEntry), instanceEntry);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* To initialize the canal connector
|
||||||
|
* @param connector CanalConnector
|
||||||
|
* @param config config
|
||||||
|
*/
|
||||||
|
protected abstract void process(CanalConnector connector, Map.Entry<String, CanalConfig.Instance> config);
|
||||||
|
|
||||||
|
private CanalConnector processInstanceEntry(Map.Entry<String, CanalConfig.Instance> instanceEntry) {
|
||||||
|
CanalConfig.Instance instance = instanceEntry.getValue();
|
||||||
|
CanalConnector connector;
|
||||||
|
if (instance.isClusterEnabled()) {
|
||||||
|
List<SocketAddress> addresses = new ArrayList<>();
|
||||||
|
for (String s : instance.getZookeeperAddress()) {
|
||||||
|
String[] entry = s.split(":");
|
||||||
|
if (entry.length != 2)
|
||||||
|
throw new CanalClientException("error parsing zookeeper address:" + s);
|
||||||
|
addresses.add(new InetSocketAddress(entry[0], Integer.parseInt(entry[1])));
|
||||||
|
}
|
||||||
|
connector = CanalConnectors.newClusterConnector(addresses, instanceEntry.getKey(),
|
||||||
|
instance.getUserName(),
|
||||||
|
instance.getPassword());
|
||||||
|
} else {
|
||||||
|
connector = CanalConnectors.newSingleConnector(new InetSocketAddress(instance.getHost(), instance.getPort()),
|
||||||
|
instanceEntry.getKey(),
|
||||||
|
instance.getUserName(),
|
||||||
|
instance.getPassword());
|
||||||
|
}
|
||||||
|
return connector;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* get the config
|
||||||
|
*
|
||||||
|
* @return config
|
||||||
|
*/
|
||||||
|
protected Map<String, CanalConfig.Instance> getConfig() {
|
||||||
|
CanalConfig config = canalConfig;
|
||||||
|
Map<String, CanalConfig.Instance> instanceMap;
|
||||||
|
if (config != null && (instanceMap = config.getInstances()) != null && !instanceMap.isEmpty()) {
|
||||||
|
return config.getInstances();
|
||||||
|
} else {
|
||||||
|
throw new CanalClientException("can not get the configuration of canal client!");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void stop() {
|
||||||
|
setRunning(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isRunning() {
|
||||||
|
return running;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void setRunning(boolean running) {
|
||||||
|
this.running = running;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,22 @@
|
||||||
|
package com.chushang.common.canal.client;
|
||||||
|
|
||||||
|
public interface CanalClient {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* open the canal client
|
||||||
|
* to get the config and connect to the canal server (1 : 1 or 1 : n)
|
||||||
|
* and then transfer the event to the special listener
|
||||||
|
* */
|
||||||
|
void start();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* stop the client
|
||||||
|
*/
|
||||||
|
void stop();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* is running
|
||||||
|
* @return yes or no
|
||||||
|
*/
|
||||||
|
boolean isRunning();
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,25 @@
|
||||||
|
package com.chushang.common.canal.client;
|
||||||
|
|
||||||
|
import com.chushang.common.canal.annotation.ListenPoint;
|
||||||
|
|
||||||
|
import java.lang.reflect.Method;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
public class ListenerPoint {
|
||||||
|
private final Object target;
|
||||||
|
private final Map<Method, ListenPoint> invokeMap = new HashMap<>();
|
||||||
|
|
||||||
|
ListenerPoint(Object target, Method method, ListenPoint anno) {
|
||||||
|
this.target = target;
|
||||||
|
this.invokeMap.put(method, anno);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Object getTarget() {
|
||||||
|
return target;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Map<Method, ListenPoint> getInvokeMap() {
|
||||||
|
return invokeMap;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,85 @@
|
||||||
|
package com.chushang.common.canal.client;
|
||||||
|
|
||||||
|
import com.alibaba.otter.canal.client.CanalConnector;
|
||||||
|
import com.chushang.common.canal.annotation.CanalEventListener;
|
||||||
|
import com.chushang.common.canal.util.BeanUtil;
|
||||||
|
import com.chushang.common.canal.annotation.ListenPoint;
|
||||||
|
import com.chushang.common.canal.client.transfer.TransponderFactory;
|
||||||
|
import com.chushang.common.canal.config.CanalConfig;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
import org.springframework.core.annotation.AnnotationUtils;
|
||||||
|
|
||||||
|
import java.lang.reflect.Method;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.SynchronousQueue;
|
||||||
|
import java.util.concurrent.ThreadPoolExecutor;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
public class SimpleCanalClient extends AbstractCanalClient {
|
||||||
|
private final static Logger logger = LoggerFactory.getLogger(SimpleCanalClient.class);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* executor
|
||||||
|
*/
|
||||||
|
private final ThreadPoolExecutor executor;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* listeners which are used by implementing the Interface
|
||||||
|
*/
|
||||||
|
private final List<com.chushang.common.canal.event.CanalEventListener> listeners = new ArrayList<>();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* listeners which are used by annotation
|
||||||
|
*/
|
||||||
|
private final List<ListenerPoint> annoListeners = new ArrayList<>();
|
||||||
|
|
||||||
|
public SimpleCanalClient(CanalConfig canalConfig, TransponderFactory factory) {
|
||||||
|
super(canalConfig, factory);
|
||||||
|
executor = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
|
||||||
|
60L, TimeUnit.SECONDS,
|
||||||
|
new SynchronousQueue<>(), Executors.defaultThreadFactory());
|
||||||
|
initListeners();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void process(CanalConnector connector, Map.Entry<String, CanalConfig.Instance> config) {
|
||||||
|
executor.submit(factory.newTransponder(connector, config, listeners, annoListeners));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void stop() {
|
||||||
|
super.stop();
|
||||||
|
executor.shutdown();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* init listeners
|
||||||
|
*/
|
||||||
|
private void initListeners() {
|
||||||
|
logger.info("{}: initializing the listeners....", Thread.currentThread().getName());
|
||||||
|
List<com.chushang.common.canal.event.CanalEventListener> list = BeanUtil.getBeansOfType(com.chushang.common.canal.event.CanalEventListener.class);
|
||||||
|
if (list != null) {
|
||||||
|
listeners.addAll(list);
|
||||||
|
}
|
||||||
|
Map<String, Object> listenerMap = BeanUtil.getBeansWithAnnotation(CanalEventListener.class);
|
||||||
|
if (listenerMap != null) {
|
||||||
|
for (Object target : listenerMap.values()) {
|
||||||
|
Method[] methods = target.getClass().getDeclaredMethods();
|
||||||
|
for (Method method : methods) {
|
||||||
|
ListenPoint l = AnnotationUtils.findAnnotation(method, ListenPoint.class);
|
||||||
|
if (l != null) {
|
||||||
|
annoListeners.add(new ListenerPoint(target, method, l));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
logger.info("{}: initializing the listeners end.", Thread.currentThread().getName());
|
||||||
|
if (logger.isWarnEnabled() && listeners.isEmpty() && annoListeners.isEmpty()) {
|
||||||
|
logger.warn("{}: No listener found in context! ", Thread.currentThread().getName());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,158 @@
|
||||||
|
package com.chushang.common.canal.client.transfer;
|
||||||
|
|
||||||
|
import com.alibaba.otter.canal.client.CanalConnector;
|
||||||
|
import com.alibaba.otter.canal.protocol.CanalEntry;
|
||||||
|
import com.alibaba.otter.canal.protocol.exception.CanalClientException;
|
||||||
|
import com.chushang.common.canal.config.CanalConfig;
|
||||||
|
import com.chushang.common.canal.event.CanalEventListener;
|
||||||
|
import com.chushang.common.canal.annotation.ListenPoint;
|
||||||
|
import com.chushang.common.canal.client.ListenerPoint;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import java.lang.reflect.Method;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.function.Predicate;
|
||||||
|
|
||||||
|
public abstract class AbstractBasicMessageTransponder extends AbstractMessageTransponder {
|
||||||
|
|
||||||
|
private final static Logger logger = LoggerFactory.getLogger(AbstractBasicMessageTransponder.class);
|
||||||
|
|
||||||
|
public AbstractBasicMessageTransponder(CanalConnector connector, Map.Entry<String, CanalConfig.Instance> config, List<CanalEventListener> listeners, List<ListenerPoint> annoListeners) {
|
||||||
|
super(connector, config, listeners, annoListeners);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void distributeEvent(List<CanalEntry.Entry> entryList) {
|
||||||
|
for (CanalEntry.Entry entry : entryList) {
|
||||||
|
//ignore the transaction operations
|
||||||
|
List<CanalEntry.EntryType> ignoreEntryTypes = getIgnoreEntryTypes();
|
||||||
|
if (ignoreEntryTypes != null
|
||||||
|
&& ignoreEntryTypes.stream().anyMatch(t -> entry.getEntryType() == t)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
CanalEntry.RowChange rowChange;
|
||||||
|
try {
|
||||||
|
rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new CanalClientException("ERROR ## parser of event has an error , data:" + entry.toString(),
|
||||||
|
e);
|
||||||
|
}
|
||||||
|
//ignore the ddl operation
|
||||||
|
if (rowChange.hasIsDdl() && rowChange.getIsDdl()) {
|
||||||
|
processDdl(rowChange);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
CanalEntry.Header header = entry.getHeader();
|
||||||
|
String tableName = header.getTableName();
|
||||||
|
String schemaName = header.getSchemaName();
|
||||||
|
for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
|
||||||
|
//distribute to listener interfaces
|
||||||
|
distributeByImpl(rowChange.getEventType(),schemaName ,tableName, rowData);
|
||||||
|
//distribute to annotation listener interfaces
|
||||||
|
distributeByAnnotation(destination,
|
||||||
|
schemaName,
|
||||||
|
tableName,
|
||||||
|
rowChange.getEventType(),
|
||||||
|
rowData);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* process the ddl event
|
||||||
|
* @param rowChange rowChange
|
||||||
|
*/
|
||||||
|
protected void processDdl(CanalEntry.RowChange rowChange) {}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* distribute to listener interfaces
|
||||||
|
*
|
||||||
|
* @param eventType eventType
|
||||||
|
* @param schemaName 指定数据库名称 --> 一般由filter 指定, 此处不应当判断
|
||||||
|
* @param tableName 指定数据表名
|
||||||
|
* @param rowData rowData
|
||||||
|
*/
|
||||||
|
protected void distributeByImpl(CanalEntry.EventType eventType, String schemaName, String tableName, CanalEntry.RowData rowData) {
|
||||||
|
logger.info("schemaName : {}, tableName : {}", schemaName, tableName);
|
||||||
|
if (listeners != null) {
|
||||||
|
for (CanalEventListener listener : listeners) {
|
||||||
|
if (tableName.equals(listener.tableName()) && schemaName.equals(listener.schemaName())){
|
||||||
|
listener.onEvent(eventType, rowData);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* distribute to annotation listener interfaces
|
||||||
|
*
|
||||||
|
* @param destination destination
|
||||||
|
* @param schemaName schema
|
||||||
|
* @param tableName table name
|
||||||
|
* @param eventType event type
|
||||||
|
* @param rowData row data
|
||||||
|
*/
|
||||||
|
protected void distributeByAnnotation(String destination,
|
||||||
|
String schemaName,
|
||||||
|
String tableName,
|
||||||
|
CanalEntry.EventType eventType,
|
||||||
|
CanalEntry.RowData rowData) {
|
||||||
|
//invoke the listeners
|
||||||
|
annoListeners.forEach(point -> point
|
||||||
|
.getInvokeMap()
|
||||||
|
.entrySet()
|
||||||
|
.stream()
|
||||||
|
.filter(getAnnotationFilter(destination, schemaName, tableName, eventType))
|
||||||
|
.forEach(entry -> {
|
||||||
|
Method method = entry.getKey();
|
||||||
|
method.setAccessible(true);
|
||||||
|
try {
|
||||||
|
Object[] args = getInvokeArgs(method, eventType, rowData);
|
||||||
|
method.invoke(point.getTarget(), args);
|
||||||
|
} catch (Exception e) {
|
||||||
|
logger.error("{}: Error occurred when invoke the listener's interface! class:{}, method:{}",
|
||||||
|
Thread.currentThread().getName(),
|
||||||
|
point.getTarget().getClass().getName(), method.getName());
|
||||||
|
}
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* get the filters predicate
|
||||||
|
*
|
||||||
|
* @param destination destination
|
||||||
|
* @param schemaName schema
|
||||||
|
* @param tableName table name
|
||||||
|
* @param eventType event type
|
||||||
|
* @return predicate
|
||||||
|
*/
|
||||||
|
protected abstract Predicate<Map.Entry<Method, ListenPoint>> getAnnotationFilter(String destination,
|
||||||
|
String schemaName,
|
||||||
|
String tableName,
|
||||||
|
CanalEntry.EventType eventType);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* get the args
|
||||||
|
*
|
||||||
|
* @param method method
|
||||||
|
* @param eventType event type
|
||||||
|
* @param rowData row data
|
||||||
|
* @return args which will be used by invoking the annotation methods
|
||||||
|
*/
|
||||||
|
protected abstract Object[] getInvokeArgs(Method method, CanalEntry.EventType eventType,
|
||||||
|
CanalEntry.RowData rowData);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* get the ignore eventType list
|
||||||
|
*
|
||||||
|
* @return eventType list
|
||||||
|
*/
|
||||||
|
protected List<CanalEntry.EntryType> getIgnoreEntryTypes() {
|
||||||
|
return Collections.emptyList();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,157 @@
|
||||||
|
package com.chushang.common.canal.client.transfer;
|
||||||
|
|
||||||
|
import com.alibaba.otter.canal.client.CanalConnector;
|
||||||
|
import com.alibaba.otter.canal.protocol.CanalEntry;
|
||||||
|
import com.alibaba.otter.canal.protocol.Message;
|
||||||
|
import com.alibaba.otter.canal.protocol.exception.CanalClientException;
|
||||||
|
import com.chushang.common.canal.client.ListenerPoint;
|
||||||
|
import com.chushang.common.canal.config.CanalConfig;
|
||||||
|
import com.chushang.common.canal.event.CanalEventListener;
|
||||||
|
import com.chushang.common.core.util.StringUtils;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Objects;
|
||||||
|
|
||||||
|
public abstract class AbstractMessageTransponder extends MessageTransponder {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* canal connector
|
||||||
|
*/
|
||||||
|
private final CanalConnector connector;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* custom config
|
||||||
|
*/
|
||||||
|
protected final CanalConfig.Instance config;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* destination of canal server
|
||||||
|
*/
|
||||||
|
protected final String destination;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* listeners which are used by implementing the Interface
|
||||||
|
*/
|
||||||
|
protected final List<CanalEventListener> listeners = new ArrayList<>();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* listeners which are used by annotation
|
||||||
|
*/
|
||||||
|
protected final List<ListenerPoint> annoListeners = new ArrayList<>();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* running flag
|
||||||
|
*/
|
||||||
|
private volatile boolean running = true;
|
||||||
|
|
||||||
|
private static final Logger logger = LoggerFactory.getLogger(AbstractMessageTransponder.class);
|
||||||
|
|
||||||
|
public AbstractMessageTransponder(CanalConnector connector,
|
||||||
|
Map.Entry<String, CanalConfig.Instance> config,
|
||||||
|
List<CanalEventListener> listeners,
|
||||||
|
List<ListenerPoint> annoListeners) {
|
||||||
|
Objects.requireNonNull(connector, "connector can not be null!");
|
||||||
|
Objects.requireNonNull(config, "config can not be null!");
|
||||||
|
this.connector = connector;
|
||||||
|
this.destination = config.getKey();
|
||||||
|
this.config = config.getValue();
|
||||||
|
if (listeners != null)
|
||||||
|
this.listeners.addAll(listeners);
|
||||||
|
if (annoListeners != null)
|
||||||
|
this.annoListeners.addAll(annoListeners);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
// 在 run 时 才进行连接
|
||||||
|
connect();
|
||||||
|
int errorCount = config.getRetryCount();
|
||||||
|
final long interval = config.getAcquireInterval();
|
||||||
|
final String threadName = Thread.currentThread().getName();
|
||||||
|
boolean interrupted = Thread.currentThread().isInterrupted();
|
||||||
|
do {
|
||||||
|
try {
|
||||||
|
Message message = connector.getWithoutAck(config.getBatchSize());
|
||||||
|
long batchId = message.getId();
|
||||||
|
int size = message.getEntries().size();
|
||||||
|
if (logger.isDebugEnabled()) {
|
||||||
|
logger.debug("{}: Get message from canal server >>>>> size:{}", threadName, size);
|
||||||
|
}
|
||||||
|
//empty message
|
||||||
|
if (batchId == -1 || size == 0) {
|
||||||
|
if (logger.isDebugEnabled()) {
|
||||||
|
logger.debug("{}: Empty message... sleep for {} millis", threadName, interval);
|
||||||
|
}
|
||||||
|
Thread.sleep(interval);
|
||||||
|
} else {
|
||||||
|
distributeEvent(message.getEntries());
|
||||||
|
}
|
||||||
|
|
||||||
|
// commit ack
|
||||||
|
connector.ack(batchId);
|
||||||
|
if (logger.isDebugEnabled()) {
|
||||||
|
logger.debug("{}: Ack message. batchId:{}", threadName, batchId);
|
||||||
|
}
|
||||||
|
} catch (CanalClientException e) {
|
||||||
|
errorCount--;
|
||||||
|
logger.error(threadName + ": Error occurred!! ", e);
|
||||||
|
try {
|
||||||
|
Thread.sleep(interval);
|
||||||
|
} catch (InterruptedException e1) {
|
||||||
|
errorCount = 0;
|
||||||
|
}
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
errorCount = 0;
|
||||||
|
connector.rollback();
|
||||||
|
} finally {
|
||||||
|
// 重试次数 为0, 不在进行重试, 等待重新连接
|
||||||
|
if (errorCount <= 0) {
|
||||||
|
stop();
|
||||||
|
logger.info("{}: Topping the client.. ", Thread.currentThread().getName());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} while ((running && !interrupted));
|
||||||
|
}
|
||||||
|
|
||||||
|
protected abstract void distributeEvent(List<CanalEntry.Entry> entryList);
|
||||||
|
|
||||||
|
private void connect() {
|
||||||
|
connector.connect();
|
||||||
|
// 此处 添加 过滤 ->
|
||||||
|
if (StringUtils.isNotEmpty(config.getFilter())) {
|
||||||
|
connector.subscribe(config.getFilter());
|
||||||
|
} else {
|
||||||
|
connector.subscribe();
|
||||||
|
}
|
||||||
|
connector.rollback();
|
||||||
|
|
||||||
|
logger.info("connector is connect");
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* stop running
|
||||||
|
*/
|
||||||
|
void stop() {
|
||||||
|
// 此处应当就是失败了, 需要停止进行重试
|
||||||
|
logger.info("{}: client stopped. ", Thread.currentThread().getName());
|
||||||
|
running = false;
|
||||||
|
if (null == connector) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
// 停止时, 关闭连接
|
||||||
|
connector.disconnect();
|
||||||
|
// 说明失败重试
|
||||||
|
if (config.isErrorRetry()) {
|
||||||
|
logger.info("connector restart");
|
||||||
|
|
||||||
|
long errorRetryTime = config.getErrorRetryTime();
|
||||||
|
running = true;
|
||||||
|
// 延时执行
|
||||||
|
timer.schedule(this, errorRetryTime);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,68 @@
|
||||||
|
package com.chushang.common.canal.client.transfer;
|
||||||
|
|
||||||
|
import com.alibaba.otter.canal.client.CanalConnector;
|
||||||
|
import com.alibaba.otter.canal.protocol.CanalEntry;
|
||||||
|
import com.chushang.common.canal.config.CanalConfig;
|
||||||
|
import com.chushang.common.canal.annotation.ListenPoint;
|
||||||
|
import com.chushang.common.canal.client.ListenerPoint;
|
||||||
|
import com.chushang.common.canal.event.CanalEventListener;
|
||||||
|
import com.chushang.common.core.util.StringUtils;
|
||||||
|
|
||||||
|
import java.lang.reflect.Method;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.function.Predicate;
|
||||||
|
|
||||||
|
public class DefaultMessageTransponder extends AbstractBasicMessageTransponder {
|
||||||
|
|
||||||
|
public DefaultMessageTransponder(CanalConnector connector,
|
||||||
|
Map.Entry<String, CanalConfig.Instance> config,
|
||||||
|
List<CanalEventListener> listeners,
|
||||||
|
List<ListenerPoint> annoListeners) {
|
||||||
|
super(connector, config, listeners, annoListeners);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* get the filters predicate
|
||||||
|
*
|
||||||
|
* @param destination destination
|
||||||
|
* @param schemaName schema
|
||||||
|
* @param tableName table name
|
||||||
|
* @param eventType event type
|
||||||
|
* @return predicate
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
protected Predicate<Map.Entry<Method, ListenPoint>> getAnnotationFilter(String destination,
|
||||||
|
String schemaName,
|
||||||
|
String tableName,
|
||||||
|
CanalEntry.EventType eventType) {
|
||||||
|
Predicate<Map.Entry<Method, ListenPoint>> df = e -> StringUtils.isEmpty(e.getValue().destination())
|
||||||
|
|| e.getValue().destination().equals(destination);
|
||||||
|
|
||||||
|
Predicate<Map.Entry<Method, ListenPoint>> sf = e -> e.getValue().schema().length == 0
|
||||||
|
|| Arrays.asList(e.getValue().schema()).contains(schemaName);
|
||||||
|
|
||||||
|
Predicate<Map.Entry<Method, ListenPoint>> tf = e -> e.getValue().table().length == 0
|
||||||
|
|| Arrays.asList(e.getValue().table()).contains(tableName);
|
||||||
|
|
||||||
|
Predicate<Map.Entry<Method, ListenPoint>> ef = e -> (e.getValue().eventType().length == 0)
|
||||||
|
|| Arrays.stream(e.getValue().eventType()).anyMatch(ev -> ev == eventType);
|
||||||
|
return df.and(sf).and(tf).and(ef);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected Object[] getInvokeArgs(Method method, CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
|
||||||
|
return Arrays.stream(method.getParameterTypes())
|
||||||
|
.map(p -> p == CanalEntry.EventType.class
|
||||||
|
? eventType
|
||||||
|
: p == CanalEntry.RowData.class
|
||||||
|
? rowData : null)
|
||||||
|
.toArray();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected List<CanalEntry.EntryType> getIgnoreEntryTypes() {
|
||||||
|
return Arrays.asList(CanalEntry.EntryType.TRANSACTIONBEGIN, CanalEntry.EntryType.TRANSACTIONEND, CanalEntry.EntryType.HEARTBEAT);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,17 @@
|
||||||
|
package com.chushang.common.canal.client.transfer;
|
||||||
|
|
||||||
|
import com.alibaba.otter.canal.client.CanalConnector;
|
||||||
|
import com.chushang.common.canal.config.CanalConfig;
|
||||||
|
import com.chushang.common.canal.client.ListenerPoint;
|
||||||
|
import com.chushang.common.canal.event.CanalEventListener;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
public class DefaultTransponderFactory implements TransponderFactory {
|
||||||
|
@Override
|
||||||
|
public MessageTransponder newTransponder(CanalConnector connector, Map.Entry<String, CanalConfig.Instance> config, List<CanalEventListener> listeners,
|
||||||
|
List<ListenerPoint> annoListeners) {
|
||||||
|
return new DefaultMessageTransponder(connector, config, listeners, annoListeners);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,10 @@
|
||||||
|
package com.chushang.common.canal.client.transfer;
|
||||||
|
|
||||||
|
import java.util.Timer;
|
||||||
|
import java.util.TimerTask;
|
||||||
|
|
||||||
|
public abstract class MessageTransponder extends TimerTask {
|
||||||
|
|
||||||
|
public static final Timer timer = new Timer();
|
||||||
|
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,9 @@
|
||||||
|
package com.chushang.common.canal.client.transfer;
|
||||||
|
|
||||||
|
public class MessageTransponders {
|
||||||
|
|
||||||
|
public static TransponderFactory defaultMessageTransponder() {
|
||||||
|
return new DefaultTransponderFactory();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,22 @@
|
||||||
|
package com.chushang.common.canal.client.transfer;
|
||||||
|
|
||||||
|
import com.alibaba.otter.canal.client.CanalConnector;
|
||||||
|
import com.chushang.common.canal.client.ListenerPoint;
|
||||||
|
import com.chushang.common.canal.config.CanalConfig;
|
||||||
|
import com.chushang.common.canal.event.CanalEventListener;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
public interface TransponderFactory {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param connector connector
|
||||||
|
* @param config config
|
||||||
|
* @param listeners listeners
|
||||||
|
* @param annoListeners annoListeners
|
||||||
|
* @return MessageTransponder
|
||||||
|
*/
|
||||||
|
MessageTransponder newTransponder(CanalConnector connector, Map.Entry<String, CanalConfig.Instance> config, List<CanalEventListener> listeners,
|
||||||
|
List<ListenerPoint> annoListeners);
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,43 @@
|
||||||
|
package com.chushang.common.canal.config;
|
||||||
|
|
||||||
|
|
||||||
|
import com.chushang.common.canal.client.SimpleCanalClient;
|
||||||
|
import com.chushang.common.canal.client.transfer.MessageTransponders;
|
||||||
|
import com.chushang.common.canal.util.BeanUtil;
|
||||||
|
import com.chushang.common.canal.client.CanalClient;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.context.annotation.Bean;
|
||||||
|
import org.springframework.core.Ordered;
|
||||||
|
import org.springframework.core.annotation.Order;
|
||||||
|
|
||||||
|
|
||||||
|
public class CanalClientAutoConfiguration {
|
||||||
|
|
||||||
|
private final static Logger logger = LoggerFactory.getLogger(CanalClientAutoConfiguration.class);
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private CanalConfig canalConfig;
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
@Order(Ordered.HIGHEST_PRECEDENCE)
|
||||||
|
public BeanUtil beanUtil() {
|
||||||
|
return new BeanUtil();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
private CanalClient canalClient() {
|
||||||
|
|
||||||
|
CanalClient canalClient =
|
||||||
|
new SimpleCanalClient(canalConfig, MessageTransponders.defaultMessageTransponder());
|
||||||
|
|
||||||
|
canalClient.start();
|
||||||
|
|
||||||
|
logger.info("Starting canal client....");
|
||||||
|
|
||||||
|
return canalClient;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,200 @@
|
||||||
|
package com.chushang.common.canal.config;
|
||||||
|
|
||||||
|
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||||
|
import org.springframework.cloud.context.config.annotation.RefreshScope;
|
||||||
|
import org.springframework.context.annotation.Configuration;
|
||||||
|
import org.springframework.core.Ordered;
|
||||||
|
import org.springframework.core.annotation.Order;
|
||||||
|
|
||||||
|
import java.util.LinkedHashMap;
|
||||||
|
import java.util.LinkedHashSet;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
|
@Order(Ordered.HIGHEST_PRECEDENCE)
|
||||||
|
@RefreshScope
|
||||||
|
@Configuration
|
||||||
|
@ConfigurationProperties(prefix = "canal.client")
|
||||||
|
public class CanalConfig {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* instance config
|
||||||
|
*/
|
||||||
|
private Map<String, Instance> instances = new LinkedHashMap<>();
|
||||||
|
|
||||||
|
public Map<String, Instance> getInstances() {
|
||||||
|
return instances;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setInstances(Map<String, Instance> instances) {
|
||||||
|
this.instances = instances;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* instance config class
|
||||||
|
*/
|
||||||
|
public static class Instance {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* is cluster-mod
|
||||||
|
*/
|
||||||
|
private boolean clusterEnabled;
|
||||||
|
/**
|
||||||
|
* zookeeper address
|
||||||
|
*/
|
||||||
|
private Set<String> zookeeperAddress = new LinkedHashSet<>();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* canal server host
|
||||||
|
*/
|
||||||
|
private String host = "127.0.0.1";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* canal server port
|
||||||
|
*/
|
||||||
|
private int port = 10001;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* canal user name
|
||||||
|
*/
|
||||||
|
private String userName = "";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* canal password
|
||||||
|
*/
|
||||||
|
private String password = "";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* size when get messages from the canal server
|
||||||
|
*/
|
||||||
|
private int batchSize = 1000;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* filter
|
||||||
|
*/
|
||||||
|
private String filter;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* retry count when error occurred
|
||||||
|
* 单次异常 的重试次数
|
||||||
|
*/
|
||||||
|
private int retryCount = 5;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 失败后重新启动 默认不进行失败重新连接
|
||||||
|
*/
|
||||||
|
private boolean errorRetry = false;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 失败后间隔多长时间进行重启
|
||||||
|
* 单位 ms
|
||||||
|
* 默认10分钟 一次进行重新连接
|
||||||
|
*/
|
||||||
|
private long errorRetryTime = 10 * 60 * 1000;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* interval of the message-acquiring
|
||||||
|
*/
|
||||||
|
private long acquireInterval = 1000;
|
||||||
|
|
||||||
|
public Instance() {}
|
||||||
|
|
||||||
|
public boolean isClusterEnabled() {
|
||||||
|
return clusterEnabled;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setClusterEnabled(boolean clusterEnabled) {
|
||||||
|
this.clusterEnabled = clusterEnabled;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Set<String> getZookeeperAddress() {
|
||||||
|
return zookeeperAddress;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setZookeeperAddress(Set<String> zookeeperAddress) {
|
||||||
|
this.zookeeperAddress = zookeeperAddress;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getHost() {
|
||||||
|
return host;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setHost(String host) {
|
||||||
|
this.host = host;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getPort() {
|
||||||
|
return port;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setPort(int port) {
|
||||||
|
this.port = port;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getUserName() {
|
||||||
|
return userName;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setUserName(String userName) {
|
||||||
|
this.userName = userName;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getPassword() {
|
||||||
|
return password;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setPassword(String password) {
|
||||||
|
this.password = password;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getBatchSize() {
|
||||||
|
return batchSize;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setBatchSize(int batchSize) {
|
||||||
|
this.batchSize = batchSize;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getFilter() {
|
||||||
|
return filter;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setFilter(String filter) {
|
||||||
|
this.filter = filter;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getRetryCount() {
|
||||||
|
return retryCount;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setRetryCount(int retryCount) {
|
||||||
|
this.retryCount = retryCount;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getAcquireInterval() {
|
||||||
|
return acquireInterval;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setAcquireInterval(long acquireInterval) {
|
||||||
|
this.acquireInterval = acquireInterval;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isErrorRetry() {
|
||||||
|
return errorRetry;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setErrorRetry(boolean errorRetry) {
|
||||||
|
this.errorRetry = errorRetry;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getErrorRetryTime() {
|
||||||
|
return errorRetryTime;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setErrorRetryTime(long errorRetryTime) {
|
||||||
|
this.errorRetryTime = errorRetryTime;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,52 @@
|
||||||
|
package com.chushang.common.canal.event;
|
||||||
|
|
||||||
|
import com.alibaba.otter.canal.protocol.CanalEntry;
|
||||||
|
|
||||||
|
import java.util.Objects;
|
||||||
|
|
||||||
|
public interface CanalEventListener {
|
||||||
|
|
||||||
|
default void onEvent(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
|
||||||
|
Objects.requireNonNull(eventType);
|
||||||
|
switch (eventType) {
|
||||||
|
case INSERT:
|
||||||
|
onInsert(rowData);
|
||||||
|
break;
|
||||||
|
case UPDATE:
|
||||||
|
onUpdate(rowData);
|
||||||
|
break;
|
||||||
|
case DELETE:
|
||||||
|
onDelete(rowData);
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* fired on insert event
|
||||||
|
*
|
||||||
|
* @param rowData rowData
|
||||||
|
*/
|
||||||
|
void onInsert(CanalEntry.RowData rowData);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* fired on update event
|
||||||
|
*
|
||||||
|
* @param rowData rowData
|
||||||
|
*/
|
||||||
|
void onUpdate(CanalEntry.RowData rowData);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* fired on delete event
|
||||||
|
*
|
||||||
|
* @param rowData rowData
|
||||||
|
*/
|
||||||
|
void onDelete(CanalEntry.RowData rowData);
|
||||||
|
|
||||||
|
String tableName();
|
||||||
|
|
||||||
|
String schemaName();
|
||||||
|
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,52 @@
|
||||||
|
package com.chushang.common.canal.util;
|
||||||
|
|
||||||
|
import org.springframework.beans.BeansException;
|
||||||
|
import org.springframework.context.ApplicationContext;
|
||||||
|
import org.springframework.context.ApplicationContextAware;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import java.lang.annotation.Annotation;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
@Component
|
||||||
|
public class BeanUtil implements ApplicationContextAware {
|
||||||
|
|
||||||
|
private static ApplicationContext applicationContext;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
|
||||||
|
BeanUtil.applicationContext = applicationContext;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static <T> T getBean(Class<T> clazz) {
|
||||||
|
T obj;
|
||||||
|
try {
|
||||||
|
obj = applicationContext.getBean(clazz);
|
||||||
|
} catch (Exception e) {
|
||||||
|
obj = null;
|
||||||
|
}
|
||||||
|
return obj;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static <T> List<T> getBeansOfType(Class<T> clazz) {
|
||||||
|
Map<String, T> map;
|
||||||
|
try {
|
||||||
|
map = applicationContext.getBeansOfType(clazz);
|
||||||
|
} catch (Exception e) {
|
||||||
|
map = null;
|
||||||
|
}
|
||||||
|
return map == null ? null : new ArrayList<>(map.values());
|
||||||
|
}
|
||||||
|
|
||||||
|
public static Map<String, Object> getBeansWithAnnotation(Class<? extends Annotation> anno) {
|
||||||
|
Map<String, Object> map;
|
||||||
|
try {
|
||||||
|
map = applicationContext.getBeansWithAnnotation(anno);
|
||||||
|
} catch (Exception e) {
|
||||||
|
map = null;
|
||||||
|
}
|
||||||
|
return map;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,3 @@
|
||||||
|
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
|
||||||
|
com.chushang.common.canal.config.CanalConfig, \
|
||||||
|
com.chushang.common.canal.config.CanalClientAutoConfiguration
|
||||||
|
|
@ -26,6 +26,7 @@
|
||||||
<module>chushang-common-log</module>
|
<module>chushang-common-log</module>
|
||||||
<module>chushang-common-mail</module>
|
<module>chushang-common-mail</module>
|
||||||
<module>chushang-common-mongo</module>
|
<module>chushang-common-mongo</module>
|
||||||
|
<module>chushang-common-mq</module>
|
||||||
<module>chushang-common-mybatis</module>
|
<module>chushang-common-mybatis</module>
|
||||||
<module>chushang-common-mybatis-plugin</module>
|
<module>chushang-common-mybatis-plugin</module>
|
||||||
<module>chushang-common-redis</module>
|
<module>chushang-common-redis</module>
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,12 @@
|
||||||
|
package com.chushang.task.constants;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @auther: zhao
|
||||||
|
* @date: 2024/6/11 10:07
|
||||||
|
*/
|
||||||
|
public interface TaskConstants {
|
||||||
|
|
||||||
|
String TASK_SERVICE = "task-service";
|
||||||
|
String APPLICATION_CONTENT_PATH = "task";
|
||||||
|
|
||||||
|
}
|
||||||
|
|
@ -5,10 +5,8 @@ import com.baomidou.mybatisplus.annotation.TableField;
|
||||||
import com.baomidou.mybatisplus.annotation.TableId;
|
import com.baomidou.mybatisplus.annotation.TableId;
|
||||||
import com.baomidou.mybatisplus.annotation.TableName;
|
import com.baomidou.mybatisplus.annotation.TableName;
|
||||||
import com.chushang.common.mybatis.base.BaseEntity;
|
import com.chushang.common.mybatis.base.BaseEntity;
|
||||||
import lombok.AllArgsConstructor;
|
import com.chushang.task.enums.ServiceEnum;
|
||||||
import lombok.Data;
|
import lombok.*;
|
||||||
import lombok.EqualsAndHashCode;
|
|
||||||
import lombok.NoArgsConstructor;
|
|
||||||
|
|
||||||
import java.time.LocalDateTime;
|
import java.time.LocalDateTime;
|
||||||
|
|
||||||
|
|
@ -21,6 +19,7 @@ import java.time.LocalDateTime;
|
||||||
@AllArgsConstructor
|
@AllArgsConstructor
|
||||||
@NoArgsConstructor
|
@NoArgsConstructor
|
||||||
@TableName(value = "tb_task_info")
|
@TableName(value = "tb_task_info")
|
||||||
|
@Builder
|
||||||
public class TaskInfo extends BaseEntity {
|
public class TaskInfo extends BaseEntity {
|
||||||
/**
|
/**
|
||||||
* 主键id
|
* 主键id
|
||||||
|
|
@ -36,6 +35,7 @@ public class TaskInfo extends BaseEntity {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 所属服务名
|
* 所属服务名
|
||||||
|
* 取 ServiceEnum 的 value
|
||||||
*/
|
*/
|
||||||
@TableField(value = "application_name")
|
@TableField(value = "application_name")
|
||||||
private String applicationName;
|
private String applicationName;
|
||||||
|
|
@ -54,9 +54,11 @@ public class TaskInfo extends BaseEntity {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 任务状态
|
* 任务状态
|
||||||
|
* 字典表 task_status
|
||||||
|
*
|
||||||
*/
|
*/
|
||||||
@TableField(value = "task_status")
|
@TableField(value = "task_status")
|
||||||
private Short taskStatus;
|
private Integer taskStatus;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 创建人
|
* 创建人
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,46 @@
|
||||||
|
package com.chushang.task.entity.dto;
|
||||||
|
|
||||||
|
import com.chushang.task.enums.ServiceEnum;
|
||||||
|
import lombok.Data;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @auther: zhao
|
||||||
|
* @date: 2024/6/11 10:19
|
||||||
|
*/
|
||||||
|
@Data
|
||||||
|
public class CreateTaskDTO {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 后台任务名称
|
||||||
|
*/
|
||||||
|
private String taskName;
|
||||||
|
/**
|
||||||
|
* 所属服务名
|
||||||
|
* 取 ServiceEnum 的 value
|
||||||
|
*/
|
||||||
|
private ServiceEnum applicationName;
|
||||||
|
/**
|
||||||
|
* 方法名称
|
||||||
|
*/
|
||||||
|
private String methodName;
|
||||||
|
/**
|
||||||
|
* 类名
|
||||||
|
*/
|
||||||
|
private String className;
|
||||||
|
/**
|
||||||
|
* 创建人
|
||||||
|
*/
|
||||||
|
private String createBy;
|
||||||
|
/**
|
||||||
|
* 后台任务执行参数
|
||||||
|
*/
|
||||||
|
private String params;
|
||||||
|
/**
|
||||||
|
* 备注信息
|
||||||
|
*/
|
||||||
|
private String remark;
|
||||||
|
/**
|
||||||
|
* 部门id
|
||||||
|
*/
|
||||||
|
private Long deptId;
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,40 @@
|
||||||
|
package com.chushang.task.entity.dto;
|
||||||
|
|
||||||
|
import lombok.Data;
|
||||||
|
|
||||||
|
import java.time.LocalDateTime;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @auther: zhao
|
||||||
|
* @date: 2024/6/11 10:27
|
||||||
|
*/
|
||||||
|
@Data
|
||||||
|
public class UpdateTaskDTO {
|
||||||
|
/**
|
||||||
|
* 任务状态
|
||||||
|
* 字典表 task_status
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
private Integer taskStatus;
|
||||||
|
/**
|
||||||
|
* 导出或者下载时, 对应的返回的文件路径
|
||||||
|
*/
|
||||||
|
private String downUrl;
|
||||||
|
/**
|
||||||
|
* 最后一次执行时间
|
||||||
|
*/
|
||||||
|
private LocalDateTime lastRunTime;
|
||||||
|
/**
|
||||||
|
* 最后一次执行结果
|
||||||
|
*/
|
||||||
|
private String lastRunResult;
|
||||||
|
/**
|
||||||
|
* 错误信息
|
||||||
|
*/
|
||||||
|
private String errorMessage;
|
||||||
|
/**
|
||||||
|
* 修改人
|
||||||
|
*/
|
||||||
|
private String updateBy;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,36 @@
|
||||||
|
package com.chushang.task.enums;
|
||||||
|
|
||||||
|
import com.baomidou.mybatisplus.annotation.IEnum;
|
||||||
|
import lombok.AllArgsConstructor;
|
||||||
|
import lombok.Getter;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @auther: zhao
|
||||||
|
* @date: 2024/6/11 10:01
|
||||||
|
*/
|
||||||
|
@Getter
|
||||||
|
@AllArgsConstructor
|
||||||
|
public enum ServiceEnum {
|
||||||
|
/**
|
||||||
|
* 系统
|
||||||
|
*/
|
||||||
|
SYSTEM("system-service"),
|
||||||
|
/**
|
||||||
|
* 授权
|
||||||
|
*/
|
||||||
|
AUTH("auth-service"),
|
||||||
|
/**
|
||||||
|
* 文件
|
||||||
|
*/
|
||||||
|
OSS("oss-service"),
|
||||||
|
/**
|
||||||
|
* 工单
|
||||||
|
*/
|
||||||
|
WRK("wrk-service"),
|
||||||
|
/**
|
||||||
|
* 后台任务
|
||||||
|
*/
|
||||||
|
TASK("task-service"),
|
||||||
|
;
|
||||||
|
private final String serviceName;
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,37 @@
|
||||||
|
package com.chushang.task.feign;
|
||||||
|
|
||||||
|
import com.chushang.common.core.constant.SecurityConstants;
|
||||||
|
import com.chushang.common.core.web.Result;
|
||||||
|
import com.chushang.task.constants.TaskConstants;
|
||||||
|
import com.chushang.task.entity.dto.CreateTaskDTO;
|
||||||
|
import com.chushang.task.entity.dto.UpdateTaskDTO;
|
||||||
|
import org.springframework.cloud.openfeign.FeignClient;
|
||||||
|
import org.springframework.web.bind.annotation.PathVariable;
|
||||||
|
import org.springframework.web.bind.annotation.PostMapping;
|
||||||
|
import org.springframework.web.bind.annotation.RequestBody;
|
||||||
|
import org.springframework.web.bind.annotation.RequestHeader;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @auther: zhao
|
||||||
|
* @date: 2024/6/11 10:04
|
||||||
|
*/
|
||||||
|
@FeignClient(contextId = "taskFeign",
|
||||||
|
value = TaskConstants.TASK_SERVICE,
|
||||||
|
path = TaskConstants.APPLICATION_CONTENT_PATH + "/remote"
|
||||||
|
)
|
||||||
|
public interface RemoteTaskService {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 创建 后台任务
|
||||||
|
*/
|
||||||
|
@PostMapping("/create")
|
||||||
|
Result<Long> createTask(@RequestBody CreateTaskDTO taskInfo, @RequestHeader(SecurityConstants.FROM_SOURCE) String source);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 修改后台任务
|
||||||
|
*/
|
||||||
|
@PostMapping(value = "/{taskId}")
|
||||||
|
Result<Boolean> updateTask(@PathVariable(value = "taskId") Long taskId,
|
||||||
|
@RequestBody UpdateTaskDTO taskDTO,
|
||||||
|
@RequestHeader(SecurityConstants.FROM_SOURCE) String source);
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,61 @@
|
||||||
|
package com.chushang.task.remote;
|
||||||
|
|
||||||
|
import com.chushang.common.core.web.Result;
|
||||||
|
import com.chushang.task.entity.TaskInfo;
|
||||||
|
import com.chushang.task.entity.dto.CreateTaskDTO;
|
||||||
|
import com.chushang.task.entity.dto.UpdateTaskDTO;
|
||||||
|
import com.chushang.task.feign.RemoteTaskService;
|
||||||
|
import com.chushang.task.service.TaskInfoService;
|
||||||
|
import org.springframework.web.bind.annotation.*;
|
||||||
|
|
||||||
|
import javax.annotation.Resource;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @auther: zhao
|
||||||
|
* @date: 2024/6/11 10:28
|
||||||
|
*/
|
||||||
|
@RestController
|
||||||
|
@RequestMapping(value = "/remote")
|
||||||
|
public class RemoteTaskController implements RemoteTaskService {
|
||||||
|
|
||||||
|
@Resource
|
||||||
|
TaskInfoService taskInfoService;
|
||||||
|
/**
|
||||||
|
* 创建 后台任务
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
@PostMapping("/create")
|
||||||
|
public Result<Long> createTask(@RequestBody CreateTaskDTO task, String source) {
|
||||||
|
TaskInfo taskInfo = TaskInfo.builder()
|
||||||
|
.createBy(task.getCreateBy())
|
||||||
|
.applicationName(task.getApplicationName().getServiceName())
|
||||||
|
.className(task.getClassName())
|
||||||
|
.methodName(task.getMethodName())
|
||||||
|
.params(task.getParams())
|
||||||
|
.taskStatus(0)
|
||||||
|
.deptId(task.getDeptId())
|
||||||
|
.taskName(task.getTaskName())
|
||||||
|
.build();
|
||||||
|
boolean save = taskInfoService.save(taskInfo);
|
||||||
|
// todo 需要创建队列数据
|
||||||
|
|
||||||
|
return Result.ok(save ? taskInfo.getTaskId() : null);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
@PostMapping(value = "/{taskId}")
|
||||||
|
public Result<Boolean> updateTask(@PathVariable Long taskId,
|
||||||
|
@RequestBody UpdateTaskDTO task, String source)
|
||||||
|
{
|
||||||
|
TaskInfo taskInfo = TaskInfo.builder()
|
||||||
|
.taskId(taskId)
|
||||||
|
.updateBy(task.getUpdateBy())
|
||||||
|
.taskStatus(task.getTaskStatus())
|
||||||
|
.downUrl(task.getDownUrl())
|
||||||
|
.errorMessage(task.getErrorMessage())
|
||||||
|
.lastRunResult(task.getLastRunResult())
|
||||||
|
.lastRunTime(task.getLastRunTime())
|
||||||
|
.build();
|
||||||
|
return Result.ok(taskInfoService.updateById(taskInfo));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -28,7 +28,7 @@ public class TaskInfoServiceImpl extends ServiceImpl<TaskInfoMapper, TaskInfo> i
|
||||||
.orderBy(true, "asc".equals(commonParam.getIsAsc()), commonParam.getOrderBy())
|
.orderBy(true, "asc".equals(commonParam.getIsAsc()), commonParam.getOrderBy())
|
||||||
.lambda()
|
.lambda()
|
||||||
.eq(null != taskInfo.getTaskId(), TaskInfo::getTaskId, taskInfo.getTaskId())
|
.eq(null != taskInfo.getTaskId(), TaskInfo::getTaskId, taskInfo.getTaskId())
|
||||||
.like(StringUtils.isNotEmpty(taskInfo.getApplicationName()), TaskInfo::getApplicationName, taskInfo.getApplicationName())
|
.like(null != taskInfo.getApplicationName(), TaskInfo::getApplicationName, taskInfo.getApplicationName())
|
||||||
.eq(StringUtils.isNotEmpty(taskInfo.getTaskName()), TaskInfo::getTaskName, taskInfo.getTaskName())
|
.eq(StringUtils.isNotEmpty(taskInfo.getTaskName()), TaskInfo::getTaskName, taskInfo.getTaskName())
|
||||||
.eq(null != taskInfo.getTaskStatus(), TaskInfo::getTaskStatus, taskInfo.getTaskStatus())
|
.eq(null != taskInfo.getTaskStatus(), TaskInfo::getTaskStatus, taskInfo.getTaskStatus())
|
||||||
.like(StringUtils.isNotEmpty(taskInfo.getClassName()), TaskInfo::getClassName, taskInfo.getClassName())
|
.like(StringUtils.isNotEmpty(taskInfo.getClassName()), TaskInfo::getClassName, taskInfo.getClassName())
|
||||||
|
|
|
||||||
6
pom.xml
6
pom.xml
|
|
@ -100,6 +100,7 @@
|
||||||
<com-jcraft-jsch.version>0.1.55</com-jcraft-jsch.version>
|
<com-jcraft-jsch.version>0.1.55</com-jcraft-jsch.version>
|
||||||
<fastjson2.version>2.0.43</fastjson2.version>
|
<fastjson2.version>2.0.43</fastjson2.version>
|
||||||
<ip2region.version>2.7.0</ip2region.version>
|
<ip2region.version>2.7.0</ip2region.version>
|
||||||
|
<rocket.version>4.9.4</rocket.version>
|
||||||
|
|
||||||
<maven-compiler-plugin.version>3.11.0</maven-compiler-plugin.version>
|
<maven-compiler-plugin.version>3.11.0</maven-compiler-plugin.version>
|
||||||
<maven-surefire-plugin.version>3.1.0</maven-surefire-plugin.version>
|
<maven-surefire-plugin.version>3.1.0</maven-surefire-plugin.version>
|
||||||
|
|
@ -561,6 +562,11 @@
|
||||||
<artifactId>ip2region</artifactId>
|
<artifactId>ip2region</artifactId>
|
||||||
<version>${ip2region.version}</version>
|
<version>${ip2region.version}</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId></groupId>
|
||||||
|
<artifactId>rocket-</artifactId>
|
||||||
|
</dependency>
|
||||||
</dependencies>
|
</dependencies>
|
||||||
</dependencyManagement>
|
</dependencyManagement>
|
||||||
<build>
|
<build>
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue