1. 后台任务项目完成
This commit is contained in:
parent
9c36e6855e
commit
1b90e3bfff
|
|
@ -62,6 +62,11 @@
|
|||
<artifactId>chushang-common-mongo</artifactId>
|
||||
<version>${common.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.chushang</groupId>
|
||||
<artifactId>chushang-common-mq</artifactId>
|
||||
<version>${common.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.chushang</groupId>
|
||||
<artifactId>chushang-common-mybatis</artifactId>
|
||||
|
|
|
|||
|
|
@ -0,0 +1,48 @@
|
|||
package com.chushang.common.core.constant;
|
||||
|
||||
public interface ServiceConstant {
|
||||
|
||||
/**
|
||||
* 系统
|
||||
*/
|
||||
String SYSTEM = "system-service";
|
||||
String AUTH = "auth-service";
|
||||
/**
|
||||
* 系统模块消费者组
|
||||
*/
|
||||
String SYSTEM_CONSUMER_GROUP = "system_consumer_group";
|
||||
/**
|
||||
* 系统模块生产者组 -- 与配置文件对应
|
||||
*/
|
||||
String SYSTEM_PRODUCE_GROUP = "system_produce_group";
|
||||
/**
|
||||
* 文件
|
||||
*/
|
||||
String OSS = "oss-service";
|
||||
/**
|
||||
* 工单
|
||||
*/
|
||||
String WRK = "wrk-service";
|
||||
/**
|
||||
* 文件模块消费者组
|
||||
*/
|
||||
String OSS_CONSUMER_GROUP = "oss_consumer_group";
|
||||
/**
|
||||
* 文件模块生产者组
|
||||
*/
|
||||
String OSS_PRODUCE_GROUP = "oss_produce_group";
|
||||
/**
|
||||
* 后台任务
|
||||
*/
|
||||
String TASK = "task-service";
|
||||
/**
|
||||
* 后台任务模块消费者组
|
||||
*/
|
||||
String TASK_CONSUMER_GROUP = "task_consumer_group";
|
||||
/**
|
||||
* 后台任务模块生产者组
|
||||
*/
|
||||
String TASK_PRODUCE_GROUP = "task_produce_group";
|
||||
|
||||
String QUEUE_NAME = "OPERATION_LOG";
|
||||
}
|
||||
|
|
@ -17,7 +17,6 @@ public class MdcInterceptor implements RequestInterceptor {
|
|||
}
|
||||
|
||||
public void apply(RequestTemplate template) {
|
||||
log.info("TRACE_ID : -------- {}", MDC.get(CommonConstants.TRACE_ID));
|
||||
template.header(CommonConstants.TRACE_ID, MDC.get(CommonConstants.TRACE_ID));
|
||||
template.header(CommonConstants.SPAN, MDC.get(CommonConstants.SPAN));
|
||||
}
|
||||
|
|
|
|||
|
|
@ -18,7 +18,7 @@ import com.chushang.common.core.jackson.JacksonUtils;
|
|||
import com.chushang.common.core.util.IPUtils;
|
||||
import com.chushang.common.core.util.ServletUtils;
|
||||
import com.chushang.common.log.annotation.SysLog;
|
||||
import com.chushang.common.log.constants.LogConstant;
|
||||
import com.chushang.common.core.constant.ServiceConstant;
|
||||
import com.chushang.common.log.entity.SysLogEntity;
|
||||
import com.chushang.common.log.enums.BusinessType;
|
||||
import com.chushang.common.log.enums.LogTypeEnum;
|
||||
|
|
@ -171,7 +171,7 @@ public class SysLogAspect {
|
|||
assert syslog != null;
|
||||
if (syslog.isDatabase()){
|
||||
// 入队列, 方便在system 统一查询
|
||||
RList<SysLogEntity> list = redissonClient.getList(LogConstant.QUEUE_NAME);
|
||||
RList<SysLogEntity> list = redissonClient.getList(ServiceConstant.QUEUE_NAME);
|
||||
list.add(sysLogEntity);
|
||||
// sysLogService.save(sysLogEntity);
|
||||
}else {
|
||||
|
|
|
|||
|
|
@ -1,19 +0,0 @@
|
|||
package com.chushang.common.log.constants;
|
||||
|
||||
public interface LogConstant {
|
||||
|
||||
/**
|
||||
* 系统
|
||||
*/
|
||||
String SYSTEM = "system-service";
|
||||
/**
|
||||
* 文件
|
||||
*/
|
||||
String OSS = "oss-service";
|
||||
/**
|
||||
* 后台任务
|
||||
*/
|
||||
String TASK = "task-service";
|
||||
|
||||
String QUEUE_NAME = "OPERATION_LOG";
|
||||
}
|
||||
|
|
@ -10,7 +10,6 @@
|
|||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<artifactId>chushang-common-mq</artifactId>
|
||||
<description>mq模块</description>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
|
|
|
|||
|
|
@ -1,24 +0,0 @@
|
|||
package com.chushang.common.canal.annotation;
|
||||
|
||||
import org.springframework.core.annotation.AliasFor;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.lang.annotation.*;
|
||||
|
||||
/**
|
||||
* inject the present class to the spring context
|
||||
* as a listener of the canal event
|
||||
*
|
||||
* @author chen.qian
|
||||
* @date 2018/3/19
|
||||
*/
|
||||
@Target({ElementType.TYPE})
|
||||
@Retention(RetentionPolicy.RUNTIME)
|
||||
@Documented
|
||||
@Component
|
||||
public @interface CanalEventListener {
|
||||
|
||||
@AliasFor(annotation = Component.class)
|
||||
String value() default "";
|
||||
|
||||
}
|
||||
|
|
@ -1,45 +0,0 @@
|
|||
package com.chushang.common.canal.annotation;
|
||||
|
||||
import com.alibaba.otter.canal.protocol.CanalEntry;
|
||||
import org.springframework.core.annotation.AliasFor;
|
||||
|
||||
import java.lang.annotation.*;
|
||||
|
||||
/**
|
||||
* ListenPoint for delete
|
||||
*
|
||||
* @author chen.qian
|
||||
* @date 2018/3/19
|
||||
*/
|
||||
|
||||
@Target({ElementType.METHOD})
|
||||
@Retention(RetentionPolicy.RUNTIME)
|
||||
@Documented
|
||||
@ListenPoint(eventType = CanalEntry.EventType.DELETE)
|
||||
public @interface DeleteListenPoint {
|
||||
|
||||
/**
|
||||
* canal destination
|
||||
* default for all
|
||||
* @return canal destination
|
||||
*/
|
||||
@AliasFor(annotation = ListenPoint.class)
|
||||
String destination() default "";
|
||||
|
||||
/**
|
||||
* database schema which you are concentrate on
|
||||
* default for all
|
||||
* @return canal destination
|
||||
*/
|
||||
@AliasFor(annotation = ListenPoint.class)
|
||||
String[] schema() default {};
|
||||
|
||||
/**
|
||||
* tables which you are concentrate on
|
||||
* default for all
|
||||
* @return canal destination
|
||||
*/
|
||||
@AliasFor(annotation = ListenPoint.class)
|
||||
String[] table() default {};
|
||||
|
||||
}
|
||||
|
|
@ -1,45 +0,0 @@
|
|||
package com.chushang.common.canal.annotation;
|
||||
|
||||
import com.alibaba.otter.canal.protocol.CanalEntry;
|
||||
import org.springframework.core.annotation.AliasFor;
|
||||
|
||||
import java.lang.annotation.*;
|
||||
|
||||
/**
|
||||
* ListenPoint for insert
|
||||
*
|
||||
* @author chen.qian
|
||||
* @date 2018/3/19
|
||||
*/
|
||||
|
||||
@Target({ElementType.METHOD})
|
||||
@Retention(RetentionPolicy.RUNTIME)
|
||||
@Documented
|
||||
@ListenPoint(eventType = CanalEntry.EventType.INSERT)
|
||||
public @interface InsertListenPoint {
|
||||
|
||||
/**
|
||||
* canal destination
|
||||
* default for all
|
||||
* @return canal destination
|
||||
*/
|
||||
@AliasFor(annotation = ListenPoint.class)
|
||||
String destination() default "";
|
||||
|
||||
/**
|
||||
* database schema which you are concentrate on
|
||||
* default for all
|
||||
* @return canal destination
|
||||
*/
|
||||
@AliasFor(annotation = ListenPoint.class)
|
||||
String[] schema() default {};
|
||||
|
||||
/**
|
||||
* tables which you are concentrate on
|
||||
* default for all
|
||||
* @return canal destination
|
||||
*/
|
||||
@AliasFor(annotation = ListenPoint.class)
|
||||
String[] table() default {};
|
||||
|
||||
}
|
||||
|
|
@ -1,48 +0,0 @@
|
|||
package com.chushang.common.canal.annotation;
|
||||
|
||||
import com.alibaba.otter.canal.protocol.CanalEntry;
|
||||
|
||||
import java.lang.annotation.*;
|
||||
|
||||
/**
|
||||
* used to indicate that method(or methods) is(are) the candidate of the
|
||||
* canal event distributor
|
||||
*
|
||||
* @author chen.qian
|
||||
* @date 2018/3/19
|
||||
*/
|
||||
|
||||
@Target({ElementType.METHOD, ElementType.TYPE})
|
||||
@Retention(RetentionPolicy.RUNTIME)
|
||||
@Documented
|
||||
public @interface ListenPoint {
|
||||
|
||||
/**
|
||||
* canal destination
|
||||
* default for all
|
||||
* @return canal destination
|
||||
*/
|
||||
String destination() default "";
|
||||
|
||||
/**
|
||||
* database schema which you are concentrate on
|
||||
* default for all
|
||||
* @return canal destination
|
||||
*/
|
||||
String[] schema() default {};
|
||||
|
||||
/**
|
||||
* tables which you are concentrate on
|
||||
* default for all
|
||||
* @return canal destination
|
||||
*/
|
||||
String[] table() default {};
|
||||
|
||||
/**
|
||||
* canal event type
|
||||
* default for all
|
||||
* @return canal event type
|
||||
*/
|
||||
CanalEntry.EventType[] eventType() default {};
|
||||
|
||||
}
|
||||
|
|
@ -1,45 +0,0 @@
|
|||
package com.chushang.common.canal.annotation;
|
||||
|
||||
import com.alibaba.otter.canal.protocol.CanalEntry;
|
||||
import org.springframework.core.annotation.AliasFor;
|
||||
|
||||
import java.lang.annotation.*;
|
||||
|
||||
/**
|
||||
* ListenPoint for update
|
||||
*
|
||||
* @author chen.qian
|
||||
* @date 2018/3/19
|
||||
*/
|
||||
|
||||
@Target({ElementType.METHOD})
|
||||
@Retention(RetentionPolicy.RUNTIME)
|
||||
@Documented
|
||||
@ListenPoint(eventType = CanalEntry.EventType.UPDATE)
|
||||
public @interface UpdateListenPoint {
|
||||
|
||||
/**
|
||||
* canal destination
|
||||
* default for all
|
||||
* @return canal destination
|
||||
*/
|
||||
@AliasFor(annotation = ListenPoint.class)
|
||||
String destination() default "";
|
||||
|
||||
/**
|
||||
* database schema which you are concentrate on
|
||||
* default for all
|
||||
* @return canal destination
|
||||
*/
|
||||
@AliasFor(annotation = ListenPoint.class)
|
||||
String[] schema() default {};
|
||||
|
||||
/**
|
||||
* tables which you are concentrate on
|
||||
* default for all
|
||||
* @return canal destination
|
||||
*/
|
||||
@AliasFor(annotation = ListenPoint.class)
|
||||
String[] table() default {};
|
||||
|
||||
}
|
||||
|
|
@ -1,108 +0,0 @@
|
|||
package com.chushang.common.canal.client;
|
||||
|
||||
import com.alibaba.otter.canal.client.CanalConnector;
|
||||
import com.alibaba.otter.canal.client.CanalConnectors;
|
||||
import com.alibaba.otter.canal.protocol.exception.CanalClientException;
|
||||
import com.chushang.common.canal.client.transfer.TransponderFactory;
|
||||
import com.chushang.common.canal.config.CanalConfig;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.SocketAddress;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
|
||||
public abstract class AbstractCanalClient implements CanalClient {
|
||||
|
||||
/**
|
||||
* running flag
|
||||
*/
|
||||
private volatile boolean running;
|
||||
|
||||
/**
|
||||
* customer config
|
||||
*/
|
||||
private final CanalConfig canalConfig;
|
||||
|
||||
|
||||
/**
|
||||
* TransponderFactory
|
||||
*/
|
||||
protected final TransponderFactory factory;
|
||||
|
||||
AbstractCanalClient(CanalConfig canalConfig, TransponderFactory factory) {
|
||||
Objects.requireNonNull(canalConfig, "canalConfig can not be null!");
|
||||
Objects.requireNonNull(canalConfig, "transponderFactory can not be null!");
|
||||
this.canalConfig = canalConfig;
|
||||
this.factory = factory;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start() {
|
||||
Map<String, CanalConfig.Instance> instanceMap = getConfig();
|
||||
for (Map.Entry<String, CanalConfig.Instance> instanceEntry : instanceMap.entrySet()) {
|
||||
process(processInstanceEntry(instanceEntry), instanceEntry);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* To initialize the canal connector
|
||||
* @param connector CanalConnector
|
||||
* @param config config
|
||||
*/
|
||||
protected abstract void process(CanalConnector connector, Map.Entry<String, CanalConfig.Instance> config);
|
||||
|
||||
private CanalConnector processInstanceEntry(Map.Entry<String, CanalConfig.Instance> instanceEntry) {
|
||||
CanalConfig.Instance instance = instanceEntry.getValue();
|
||||
CanalConnector connector;
|
||||
if (instance.isClusterEnabled()) {
|
||||
List<SocketAddress> addresses = new ArrayList<>();
|
||||
for (String s : instance.getZookeeperAddress()) {
|
||||
String[] entry = s.split(":");
|
||||
if (entry.length != 2)
|
||||
throw new CanalClientException("error parsing zookeeper address:" + s);
|
||||
addresses.add(new InetSocketAddress(entry[0], Integer.parseInt(entry[1])));
|
||||
}
|
||||
connector = CanalConnectors.newClusterConnector(addresses, instanceEntry.getKey(),
|
||||
instance.getUserName(),
|
||||
instance.getPassword());
|
||||
} else {
|
||||
connector = CanalConnectors.newSingleConnector(new InetSocketAddress(instance.getHost(), instance.getPort()),
|
||||
instanceEntry.getKey(),
|
||||
instance.getUserName(),
|
||||
instance.getPassword());
|
||||
}
|
||||
return connector;
|
||||
}
|
||||
|
||||
/**
|
||||
* get the config
|
||||
*
|
||||
* @return config
|
||||
*/
|
||||
protected Map<String, CanalConfig.Instance> getConfig() {
|
||||
CanalConfig config = canalConfig;
|
||||
Map<String, CanalConfig.Instance> instanceMap;
|
||||
if (config != null && (instanceMap = config.getInstances()) != null && !instanceMap.isEmpty()) {
|
||||
return config.getInstances();
|
||||
} else {
|
||||
throw new CanalClientException("can not get the configuration of canal client!");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() {
|
||||
setRunning(false);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isRunning() {
|
||||
return running;
|
||||
}
|
||||
|
||||
private void setRunning(boolean running) {
|
||||
this.running = running;
|
||||
}
|
||||
}
|
||||
|
|
@ -1,22 +0,0 @@
|
|||
package com.chushang.common.canal.client;
|
||||
|
||||
public interface CanalClient {
|
||||
|
||||
/**
|
||||
* open the canal client
|
||||
* to get the config and connect to the canal server (1 : 1 or 1 : n)
|
||||
* and then transfer the event to the special listener
|
||||
* */
|
||||
void start();
|
||||
|
||||
/**
|
||||
* stop the client
|
||||
*/
|
||||
void stop();
|
||||
|
||||
/**
|
||||
* is running
|
||||
* @return yes or no
|
||||
*/
|
||||
boolean isRunning();
|
||||
}
|
||||
|
|
@ -1,25 +0,0 @@
|
|||
package com.chushang.common.canal.client;
|
||||
|
||||
import com.chushang.common.canal.annotation.ListenPoint;
|
||||
|
||||
import java.lang.reflect.Method;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
public class ListenerPoint {
|
||||
private final Object target;
|
||||
private final Map<Method, ListenPoint> invokeMap = new HashMap<>();
|
||||
|
||||
ListenerPoint(Object target, Method method, ListenPoint anno) {
|
||||
this.target = target;
|
||||
this.invokeMap.put(method, anno);
|
||||
}
|
||||
|
||||
public Object getTarget() {
|
||||
return target;
|
||||
}
|
||||
|
||||
public Map<Method, ListenPoint> getInvokeMap() {
|
||||
return invokeMap;
|
||||
}
|
||||
}
|
||||
|
|
@ -1,85 +0,0 @@
|
|||
package com.chushang.common.canal.client;
|
||||
|
||||
import com.alibaba.otter.canal.client.CanalConnector;
|
||||
import com.chushang.common.canal.annotation.CanalEventListener;
|
||||
import com.chushang.common.canal.util.BeanUtil;
|
||||
import com.chushang.common.canal.annotation.ListenPoint;
|
||||
import com.chushang.common.canal.client.transfer.TransponderFactory;
|
||||
import com.chushang.common.canal.config.CanalConfig;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.core.annotation.AnnotationUtils;
|
||||
|
||||
import java.lang.reflect.Method;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.SynchronousQueue;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class SimpleCanalClient extends AbstractCanalClient {
|
||||
private final static Logger logger = LoggerFactory.getLogger(SimpleCanalClient.class);
|
||||
|
||||
/**
|
||||
* executor
|
||||
*/
|
||||
private final ThreadPoolExecutor executor;
|
||||
|
||||
/**
|
||||
* listeners which are used by implementing the Interface
|
||||
*/
|
||||
private final List<com.chushang.common.canal.event.CanalEventListener> listeners = new ArrayList<>();
|
||||
|
||||
/**
|
||||
* listeners which are used by annotation
|
||||
*/
|
||||
private final List<ListenerPoint> annoListeners = new ArrayList<>();
|
||||
|
||||
public SimpleCanalClient(CanalConfig canalConfig, TransponderFactory factory) {
|
||||
super(canalConfig, factory);
|
||||
executor = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
|
||||
60L, TimeUnit.SECONDS,
|
||||
new SynchronousQueue<>(), Executors.defaultThreadFactory());
|
||||
initListeners();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void process(CanalConnector connector, Map.Entry<String, CanalConfig.Instance> config) {
|
||||
executor.submit(factory.newTransponder(connector, config, listeners, annoListeners));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() {
|
||||
super.stop();
|
||||
executor.shutdown();
|
||||
}
|
||||
|
||||
/**
|
||||
* init listeners
|
||||
*/
|
||||
private void initListeners() {
|
||||
logger.info("{}: initializing the listeners....", Thread.currentThread().getName());
|
||||
List<com.chushang.common.canal.event.CanalEventListener> list = BeanUtil.getBeansOfType(com.chushang.common.canal.event.CanalEventListener.class);
|
||||
if (list != null) {
|
||||
listeners.addAll(list);
|
||||
}
|
||||
Map<String, Object> listenerMap = BeanUtil.getBeansWithAnnotation(CanalEventListener.class);
|
||||
if (listenerMap != null) {
|
||||
for (Object target : listenerMap.values()) {
|
||||
Method[] methods = target.getClass().getDeclaredMethods();
|
||||
for (Method method : methods) {
|
||||
ListenPoint l = AnnotationUtils.findAnnotation(method, ListenPoint.class);
|
||||
if (l != null) {
|
||||
annoListeners.add(new ListenerPoint(target, method, l));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
logger.info("{}: initializing the listeners end.", Thread.currentThread().getName());
|
||||
if (logger.isWarnEnabled() && listeners.isEmpty() && annoListeners.isEmpty()) {
|
||||
logger.warn("{}: No listener found in context! ", Thread.currentThread().getName());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -1,158 +0,0 @@
|
|||
package com.chushang.common.canal.client.transfer;
|
||||
|
||||
import com.alibaba.otter.canal.client.CanalConnector;
|
||||
import com.alibaba.otter.canal.protocol.CanalEntry;
|
||||
import com.alibaba.otter.canal.protocol.exception.CanalClientException;
|
||||
import com.chushang.common.canal.config.CanalConfig;
|
||||
import com.chushang.common.canal.event.CanalEventListener;
|
||||
import com.chushang.common.canal.annotation.ListenPoint;
|
||||
import com.chushang.common.canal.client.ListenerPoint;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.lang.reflect.Method;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.function.Predicate;
|
||||
|
||||
public abstract class AbstractBasicMessageTransponder extends AbstractMessageTransponder {
|
||||
|
||||
private final static Logger logger = LoggerFactory.getLogger(AbstractBasicMessageTransponder.class);
|
||||
|
||||
public AbstractBasicMessageTransponder(CanalConnector connector, Map.Entry<String, CanalConfig.Instance> config, List<CanalEventListener> listeners, List<ListenerPoint> annoListeners) {
|
||||
super(connector, config, listeners, annoListeners);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
protected void distributeEvent(List<CanalEntry.Entry> entryList) {
|
||||
for (CanalEntry.Entry entry : entryList) {
|
||||
//ignore the transaction operations
|
||||
List<CanalEntry.EntryType> ignoreEntryTypes = getIgnoreEntryTypes();
|
||||
if (ignoreEntryTypes != null
|
||||
&& ignoreEntryTypes.stream().anyMatch(t -> entry.getEntryType() == t)) {
|
||||
continue;
|
||||
}
|
||||
CanalEntry.RowChange rowChange;
|
||||
try {
|
||||
rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
|
||||
} catch (Exception e) {
|
||||
throw new CanalClientException("ERROR ## parser of event has an error , data:" + entry.toString(),
|
||||
e);
|
||||
}
|
||||
//ignore the ddl operation
|
||||
if (rowChange.hasIsDdl() && rowChange.getIsDdl()) {
|
||||
processDdl(rowChange);
|
||||
continue;
|
||||
}
|
||||
CanalEntry.Header header = entry.getHeader();
|
||||
String tableName = header.getTableName();
|
||||
String schemaName = header.getSchemaName();
|
||||
for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
|
||||
//distribute to listener interfaces
|
||||
distributeByImpl(rowChange.getEventType(),schemaName ,tableName, rowData);
|
||||
//distribute to annotation listener interfaces
|
||||
distributeByAnnotation(destination,
|
||||
schemaName,
|
||||
tableName,
|
||||
rowChange.getEventType(),
|
||||
rowData);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* process the ddl event
|
||||
* @param rowChange rowChange
|
||||
*/
|
||||
protected void processDdl(CanalEntry.RowChange rowChange) {}
|
||||
|
||||
/**
|
||||
* distribute to listener interfaces
|
||||
*
|
||||
* @param eventType eventType
|
||||
* @param schemaName 指定数据库名称 --> 一般由filter 指定, 此处不应当判断
|
||||
* @param tableName 指定数据表名
|
||||
* @param rowData rowData
|
||||
*/
|
||||
protected void distributeByImpl(CanalEntry.EventType eventType, String schemaName, String tableName, CanalEntry.RowData rowData) {
|
||||
logger.info("schemaName : {}, tableName : {}", schemaName, tableName);
|
||||
if (listeners != null) {
|
||||
for (CanalEventListener listener : listeners) {
|
||||
if (tableName.equals(listener.tableName()) && schemaName.equals(listener.schemaName())){
|
||||
listener.onEvent(eventType, rowData);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* distribute to annotation listener interfaces
|
||||
*
|
||||
* @param destination destination
|
||||
* @param schemaName schema
|
||||
* @param tableName table name
|
||||
* @param eventType event type
|
||||
* @param rowData row data
|
||||
*/
|
||||
protected void distributeByAnnotation(String destination,
|
||||
String schemaName,
|
||||
String tableName,
|
||||
CanalEntry.EventType eventType,
|
||||
CanalEntry.RowData rowData) {
|
||||
//invoke the listeners
|
||||
annoListeners.forEach(point -> point
|
||||
.getInvokeMap()
|
||||
.entrySet()
|
||||
.stream()
|
||||
.filter(getAnnotationFilter(destination, schemaName, tableName, eventType))
|
||||
.forEach(entry -> {
|
||||
Method method = entry.getKey();
|
||||
method.setAccessible(true);
|
||||
try {
|
||||
Object[] args = getInvokeArgs(method, eventType, rowData);
|
||||
method.invoke(point.getTarget(), args);
|
||||
} catch (Exception e) {
|
||||
logger.error("{}: Error occurred when invoke the listener's interface! class:{}, method:{}",
|
||||
Thread.currentThread().getName(),
|
||||
point.getTarget().getClass().getName(), method.getName());
|
||||
}
|
||||
}));
|
||||
}
|
||||
|
||||
/**
|
||||
* get the filters predicate
|
||||
*
|
||||
* @param destination destination
|
||||
* @param schemaName schema
|
||||
* @param tableName table name
|
||||
* @param eventType event type
|
||||
* @return predicate
|
||||
*/
|
||||
protected abstract Predicate<Map.Entry<Method, ListenPoint>> getAnnotationFilter(String destination,
|
||||
String schemaName,
|
||||
String tableName,
|
||||
CanalEntry.EventType eventType);
|
||||
|
||||
/**
|
||||
* get the args
|
||||
*
|
||||
* @param method method
|
||||
* @param eventType event type
|
||||
* @param rowData row data
|
||||
* @return args which will be used by invoking the annotation methods
|
||||
*/
|
||||
protected abstract Object[] getInvokeArgs(Method method, CanalEntry.EventType eventType,
|
||||
CanalEntry.RowData rowData);
|
||||
|
||||
/**
|
||||
* get the ignore eventType list
|
||||
*
|
||||
* @return eventType list
|
||||
*/
|
||||
protected List<CanalEntry.EntryType> getIgnoreEntryTypes() {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -1,157 +0,0 @@
|
|||
package com.chushang.common.canal.client.transfer;
|
||||
|
||||
import com.alibaba.otter.canal.client.CanalConnector;
|
||||
import com.alibaba.otter.canal.protocol.CanalEntry;
|
||||
import com.alibaba.otter.canal.protocol.Message;
|
||||
import com.alibaba.otter.canal.protocol.exception.CanalClientException;
|
||||
import com.chushang.common.canal.client.ListenerPoint;
|
||||
import com.chushang.common.canal.config.CanalConfig;
|
||||
import com.chushang.common.canal.event.CanalEventListener;
|
||||
import com.chushang.common.core.util.StringUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
|
||||
public abstract class AbstractMessageTransponder extends MessageTransponder {
|
||||
|
||||
/**
|
||||
* canal connector
|
||||
*/
|
||||
private final CanalConnector connector;
|
||||
|
||||
/**
|
||||
* custom config
|
||||
*/
|
||||
protected final CanalConfig.Instance config;
|
||||
|
||||
/**
|
||||
* destination of canal server
|
||||
*/
|
||||
protected final String destination;
|
||||
|
||||
/**
|
||||
* listeners which are used by implementing the Interface
|
||||
*/
|
||||
protected final List<CanalEventListener> listeners = new ArrayList<>();
|
||||
|
||||
/**
|
||||
* listeners which are used by annotation
|
||||
*/
|
||||
protected final List<ListenerPoint> annoListeners = new ArrayList<>();
|
||||
|
||||
/**
|
||||
* running flag
|
||||
*/
|
||||
private volatile boolean running = true;
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(AbstractMessageTransponder.class);
|
||||
|
||||
public AbstractMessageTransponder(CanalConnector connector,
|
||||
Map.Entry<String, CanalConfig.Instance> config,
|
||||
List<CanalEventListener> listeners,
|
||||
List<ListenerPoint> annoListeners) {
|
||||
Objects.requireNonNull(connector, "connector can not be null!");
|
||||
Objects.requireNonNull(config, "config can not be null!");
|
||||
this.connector = connector;
|
||||
this.destination = config.getKey();
|
||||
this.config = config.getValue();
|
||||
if (listeners != null)
|
||||
this.listeners.addAll(listeners);
|
||||
if (annoListeners != null)
|
||||
this.annoListeners.addAll(annoListeners);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
// 在 run 时 才进行连接
|
||||
connect();
|
||||
int errorCount = config.getRetryCount();
|
||||
final long interval = config.getAcquireInterval();
|
||||
final String threadName = Thread.currentThread().getName();
|
||||
boolean interrupted = Thread.currentThread().isInterrupted();
|
||||
do {
|
||||
try {
|
||||
Message message = connector.getWithoutAck(config.getBatchSize());
|
||||
long batchId = message.getId();
|
||||
int size = message.getEntries().size();
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("{}: Get message from canal server >>>>> size:{}", threadName, size);
|
||||
}
|
||||
//empty message
|
||||
if (batchId == -1 || size == 0) {
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("{}: Empty message... sleep for {} millis", threadName, interval);
|
||||
}
|
||||
Thread.sleep(interval);
|
||||
} else {
|
||||
distributeEvent(message.getEntries());
|
||||
}
|
||||
|
||||
// commit ack
|
||||
connector.ack(batchId);
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("{}: Ack message. batchId:{}", threadName, batchId);
|
||||
}
|
||||
} catch (CanalClientException e) {
|
||||
errorCount--;
|
||||
logger.error(threadName + ": Error occurred!! ", e);
|
||||
try {
|
||||
Thread.sleep(interval);
|
||||
} catch (InterruptedException e1) {
|
||||
errorCount = 0;
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
errorCount = 0;
|
||||
connector.rollback();
|
||||
} finally {
|
||||
// 重试次数 为0, 不在进行重试, 等待重新连接
|
||||
if (errorCount <= 0) {
|
||||
stop();
|
||||
logger.info("{}: Topping the client.. ", Thread.currentThread().getName());
|
||||
}
|
||||
}
|
||||
} while ((running && !interrupted));
|
||||
}
|
||||
|
||||
protected abstract void distributeEvent(List<CanalEntry.Entry> entryList);
|
||||
|
||||
private void connect() {
|
||||
connector.connect();
|
||||
// 此处 添加 过滤 ->
|
||||
if (StringUtils.isNotEmpty(config.getFilter())) {
|
||||
connector.subscribe(config.getFilter());
|
||||
} else {
|
||||
connector.subscribe();
|
||||
}
|
||||
connector.rollback();
|
||||
|
||||
logger.info("connector is connect");
|
||||
}
|
||||
|
||||
/**
|
||||
* stop running
|
||||
*/
|
||||
void stop() {
|
||||
// 此处应当就是失败了, 需要停止进行重试
|
||||
logger.info("{}: client stopped. ", Thread.currentThread().getName());
|
||||
running = false;
|
||||
if (null == connector) {
|
||||
return;
|
||||
}
|
||||
// 停止时, 关闭连接
|
||||
connector.disconnect();
|
||||
// 说明失败重试
|
||||
if (config.isErrorRetry()) {
|
||||
logger.info("connector restart");
|
||||
|
||||
long errorRetryTime = config.getErrorRetryTime();
|
||||
running = true;
|
||||
// 延时执行
|
||||
timer.schedule(this, errorRetryTime);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -1,68 +0,0 @@
|
|||
package com.chushang.common.canal.client.transfer;
|
||||
|
||||
import com.alibaba.otter.canal.client.CanalConnector;
|
||||
import com.alibaba.otter.canal.protocol.CanalEntry;
|
||||
import com.chushang.common.canal.config.CanalConfig;
|
||||
import com.chushang.common.canal.annotation.ListenPoint;
|
||||
import com.chushang.common.canal.client.ListenerPoint;
|
||||
import com.chushang.common.canal.event.CanalEventListener;
|
||||
import com.chushang.common.core.util.StringUtils;
|
||||
|
||||
import java.lang.reflect.Method;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.function.Predicate;
|
||||
|
||||
public class DefaultMessageTransponder extends AbstractBasicMessageTransponder {
|
||||
|
||||
public DefaultMessageTransponder(CanalConnector connector,
|
||||
Map.Entry<String, CanalConfig.Instance> config,
|
||||
List<CanalEventListener> listeners,
|
||||
List<ListenerPoint> annoListeners) {
|
||||
super(connector, config, listeners, annoListeners);
|
||||
}
|
||||
|
||||
/**
|
||||
* get the filters predicate
|
||||
*
|
||||
* @param destination destination
|
||||
* @param schemaName schema
|
||||
* @param tableName table name
|
||||
* @param eventType event type
|
||||
* @return predicate
|
||||
*/
|
||||
@Override
|
||||
protected Predicate<Map.Entry<Method, ListenPoint>> getAnnotationFilter(String destination,
|
||||
String schemaName,
|
||||
String tableName,
|
||||
CanalEntry.EventType eventType) {
|
||||
Predicate<Map.Entry<Method, ListenPoint>> df = e -> StringUtils.isEmpty(e.getValue().destination())
|
||||
|| e.getValue().destination().equals(destination);
|
||||
|
||||
Predicate<Map.Entry<Method, ListenPoint>> sf = e -> e.getValue().schema().length == 0
|
||||
|| Arrays.asList(e.getValue().schema()).contains(schemaName);
|
||||
|
||||
Predicate<Map.Entry<Method, ListenPoint>> tf = e -> e.getValue().table().length == 0
|
||||
|| Arrays.asList(e.getValue().table()).contains(tableName);
|
||||
|
||||
Predicate<Map.Entry<Method, ListenPoint>> ef = e -> (e.getValue().eventType().length == 0)
|
||||
|| Arrays.stream(e.getValue().eventType()).anyMatch(ev -> ev == eventType);
|
||||
return df.and(sf).and(tf).and(ef);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Object[] getInvokeArgs(Method method, CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
|
||||
return Arrays.stream(method.getParameterTypes())
|
||||
.map(p -> p == CanalEntry.EventType.class
|
||||
? eventType
|
||||
: p == CanalEntry.RowData.class
|
||||
? rowData : null)
|
||||
.toArray();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected List<CanalEntry.EntryType> getIgnoreEntryTypes() {
|
||||
return Arrays.asList(CanalEntry.EntryType.TRANSACTIONBEGIN, CanalEntry.EntryType.TRANSACTIONEND, CanalEntry.EntryType.HEARTBEAT);
|
||||
}
|
||||
}
|
||||
|
|
@ -1,17 +0,0 @@
|
|||
package com.chushang.common.canal.client.transfer;
|
||||
|
||||
import com.alibaba.otter.canal.client.CanalConnector;
|
||||
import com.chushang.common.canal.config.CanalConfig;
|
||||
import com.chushang.common.canal.client.ListenerPoint;
|
||||
import com.chushang.common.canal.event.CanalEventListener;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public class DefaultTransponderFactory implements TransponderFactory {
|
||||
@Override
|
||||
public MessageTransponder newTransponder(CanalConnector connector, Map.Entry<String, CanalConfig.Instance> config, List<CanalEventListener> listeners,
|
||||
List<ListenerPoint> annoListeners) {
|
||||
return new DefaultMessageTransponder(connector, config, listeners, annoListeners);
|
||||
}
|
||||
}
|
||||
|
|
@ -1,10 +0,0 @@
|
|||
package com.chushang.common.canal.client.transfer;
|
||||
|
||||
import java.util.Timer;
|
||||
import java.util.TimerTask;
|
||||
|
||||
public abstract class MessageTransponder extends TimerTask {
|
||||
|
||||
public static final Timer timer = new Timer();
|
||||
|
||||
}
|
||||
|
|
@ -1,9 +0,0 @@
|
|||
package com.chushang.common.canal.client.transfer;
|
||||
|
||||
public class MessageTransponders {
|
||||
|
||||
public static TransponderFactory defaultMessageTransponder() {
|
||||
return new DefaultTransponderFactory();
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -1,22 +0,0 @@
|
|||
package com.chushang.common.canal.client.transfer;
|
||||
|
||||
import com.alibaba.otter.canal.client.CanalConnector;
|
||||
import com.chushang.common.canal.client.ListenerPoint;
|
||||
import com.chushang.common.canal.config.CanalConfig;
|
||||
import com.chushang.common.canal.event.CanalEventListener;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public interface TransponderFactory {
|
||||
|
||||
/**
|
||||
* @param connector connector
|
||||
* @param config config
|
||||
* @param listeners listeners
|
||||
* @param annoListeners annoListeners
|
||||
* @return MessageTransponder
|
||||
*/
|
||||
MessageTransponder newTransponder(CanalConnector connector, Map.Entry<String, CanalConfig.Instance> config, List<CanalEventListener> listeners,
|
||||
List<ListenerPoint> annoListeners);
|
||||
}
|
||||
|
|
@ -1,43 +0,0 @@
|
|||
package com.chushang.common.canal.config;
|
||||
|
||||
|
||||
import com.chushang.common.canal.client.SimpleCanalClient;
|
||||
import com.chushang.common.canal.client.transfer.MessageTransponders;
|
||||
import com.chushang.common.canal.util.BeanUtil;
|
||||
import com.chushang.common.canal.client.CanalClient;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.core.Ordered;
|
||||
import org.springframework.core.annotation.Order;
|
||||
|
||||
|
||||
public class CanalClientAutoConfiguration {
|
||||
|
||||
private final static Logger logger = LoggerFactory.getLogger(CanalClientAutoConfiguration.class);
|
||||
|
||||
@Autowired
|
||||
private CanalConfig canalConfig;
|
||||
|
||||
@Bean
|
||||
@Order(Ordered.HIGHEST_PRECEDENCE)
|
||||
public BeanUtil beanUtil() {
|
||||
return new BeanUtil();
|
||||
}
|
||||
|
||||
@Bean
|
||||
private CanalClient canalClient() {
|
||||
|
||||
CanalClient canalClient =
|
||||
new SimpleCanalClient(canalConfig, MessageTransponders.defaultMessageTransponder());
|
||||
|
||||
canalClient.start();
|
||||
|
||||
logger.info("Starting canal client....");
|
||||
|
||||
return canalClient;
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
|
@ -1,200 +0,0 @@
|
|||
package com.chushang.common.canal.config;
|
||||
|
||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||
import org.springframework.cloud.context.config.annotation.RefreshScope;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.core.Ordered;
|
||||
import org.springframework.core.annotation.Order;
|
||||
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.LinkedHashSet;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
@Order(Ordered.HIGHEST_PRECEDENCE)
|
||||
@RefreshScope
|
||||
@Configuration
|
||||
@ConfigurationProperties(prefix = "canal.client")
|
||||
public class CanalConfig {
|
||||
|
||||
/**
|
||||
* instance config
|
||||
*/
|
||||
private Map<String, Instance> instances = new LinkedHashMap<>();
|
||||
|
||||
public Map<String, Instance> getInstances() {
|
||||
return instances;
|
||||
}
|
||||
|
||||
public void setInstances(Map<String, Instance> instances) {
|
||||
this.instances = instances;
|
||||
}
|
||||
|
||||
/**
|
||||
* instance config class
|
||||
*/
|
||||
public static class Instance {
|
||||
|
||||
/**
|
||||
* is cluster-mod
|
||||
*/
|
||||
private boolean clusterEnabled;
|
||||
/**
|
||||
* zookeeper address
|
||||
*/
|
||||
private Set<String> zookeeperAddress = new LinkedHashSet<>();
|
||||
|
||||
/**
|
||||
* canal server host
|
||||
*/
|
||||
private String host = "127.0.0.1";
|
||||
|
||||
/**
|
||||
* canal server port
|
||||
*/
|
||||
private int port = 10001;
|
||||
|
||||
/**
|
||||
* canal user name
|
||||
*/
|
||||
private String userName = "";
|
||||
|
||||
/**
|
||||
* canal password
|
||||
*/
|
||||
private String password = "";
|
||||
|
||||
/**
|
||||
* size when get messages from the canal server
|
||||
*/
|
||||
private int batchSize = 1000;
|
||||
|
||||
/**
|
||||
* filter
|
||||
*/
|
||||
private String filter;
|
||||
|
||||
/**
|
||||
* retry count when error occurred
|
||||
* 单次异常 的重试次数
|
||||
*/
|
||||
private int retryCount = 5;
|
||||
|
||||
/**
|
||||
* 失败后重新启动 默认不进行失败重新连接
|
||||
*/
|
||||
private boolean errorRetry = false;
|
||||
|
||||
/**
|
||||
* 失败后间隔多长时间进行重启
|
||||
* 单位 ms
|
||||
* 默认10分钟 一次进行重新连接
|
||||
*/
|
||||
private long errorRetryTime = 10 * 60 * 1000;
|
||||
|
||||
/**
|
||||
* interval of the message-acquiring
|
||||
*/
|
||||
private long acquireInterval = 1000;
|
||||
|
||||
public Instance() {}
|
||||
|
||||
public boolean isClusterEnabled() {
|
||||
return clusterEnabled;
|
||||
}
|
||||
|
||||
public void setClusterEnabled(boolean clusterEnabled) {
|
||||
this.clusterEnabled = clusterEnabled;
|
||||
}
|
||||
|
||||
public Set<String> getZookeeperAddress() {
|
||||
return zookeeperAddress;
|
||||
}
|
||||
|
||||
public void setZookeeperAddress(Set<String> zookeeperAddress) {
|
||||
this.zookeeperAddress = zookeeperAddress;
|
||||
}
|
||||
|
||||
public String getHost() {
|
||||
return host;
|
||||
}
|
||||
|
||||
public void setHost(String host) {
|
||||
this.host = host;
|
||||
}
|
||||
|
||||
public int getPort() {
|
||||
return port;
|
||||
}
|
||||
|
||||
public void setPort(int port) {
|
||||
this.port = port;
|
||||
}
|
||||
|
||||
public String getUserName() {
|
||||
return userName;
|
||||
}
|
||||
|
||||
public void setUserName(String userName) {
|
||||
this.userName = userName;
|
||||
}
|
||||
|
||||
public String getPassword() {
|
||||
return password;
|
||||
}
|
||||
|
||||
public void setPassword(String password) {
|
||||
this.password = password;
|
||||
}
|
||||
|
||||
public int getBatchSize() {
|
||||
return batchSize;
|
||||
}
|
||||
|
||||
public void setBatchSize(int batchSize) {
|
||||
this.batchSize = batchSize;
|
||||
}
|
||||
|
||||
public String getFilter() {
|
||||
return filter;
|
||||
}
|
||||
|
||||
public void setFilter(String filter) {
|
||||
this.filter = filter;
|
||||
}
|
||||
|
||||
public int getRetryCount() {
|
||||
return retryCount;
|
||||
}
|
||||
|
||||
public void setRetryCount(int retryCount) {
|
||||
this.retryCount = retryCount;
|
||||
}
|
||||
|
||||
public long getAcquireInterval() {
|
||||
return acquireInterval;
|
||||
}
|
||||
|
||||
public void setAcquireInterval(long acquireInterval) {
|
||||
this.acquireInterval = acquireInterval;
|
||||
}
|
||||
|
||||
public boolean isErrorRetry() {
|
||||
return errorRetry;
|
||||
}
|
||||
|
||||
public void setErrorRetry(boolean errorRetry) {
|
||||
this.errorRetry = errorRetry;
|
||||
}
|
||||
|
||||
public long getErrorRetryTime() {
|
||||
return errorRetryTime;
|
||||
}
|
||||
|
||||
public void setErrorRetryTime(long errorRetryTime) {
|
||||
this.errorRetryTime = errorRetryTime;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -1,52 +0,0 @@
|
|||
package com.chushang.common.canal.event;
|
||||
|
||||
import com.alibaba.otter.canal.protocol.CanalEntry;
|
||||
|
||||
import java.util.Objects;
|
||||
|
||||
public interface CanalEventListener {
|
||||
|
||||
default void onEvent(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
|
||||
Objects.requireNonNull(eventType);
|
||||
switch (eventType) {
|
||||
case INSERT:
|
||||
onInsert(rowData);
|
||||
break;
|
||||
case UPDATE:
|
||||
onUpdate(rowData);
|
||||
break;
|
||||
case DELETE:
|
||||
onDelete(rowData);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* fired on insert event
|
||||
*
|
||||
* @param rowData rowData
|
||||
*/
|
||||
void onInsert(CanalEntry.RowData rowData);
|
||||
|
||||
/**
|
||||
* fired on update event
|
||||
*
|
||||
* @param rowData rowData
|
||||
*/
|
||||
void onUpdate(CanalEntry.RowData rowData);
|
||||
|
||||
/**
|
||||
* fired on delete event
|
||||
*
|
||||
* @param rowData rowData
|
||||
*/
|
||||
void onDelete(CanalEntry.RowData rowData);
|
||||
|
||||
String tableName();
|
||||
|
||||
String schemaName();
|
||||
|
||||
}
|
||||
|
|
@ -1,52 +0,0 @@
|
|||
package com.chushang.common.canal.util;
|
||||
|
||||
import org.springframework.beans.BeansException;
|
||||
import org.springframework.context.ApplicationContext;
|
||||
import org.springframework.context.ApplicationContextAware;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.lang.annotation.Annotation;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
@Component
|
||||
public class BeanUtil implements ApplicationContextAware {
|
||||
|
||||
private static ApplicationContext applicationContext;
|
||||
|
||||
@Override
|
||||
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
|
||||
BeanUtil.applicationContext = applicationContext;
|
||||
}
|
||||
|
||||
public static <T> T getBean(Class<T> clazz) {
|
||||
T obj;
|
||||
try {
|
||||
obj = applicationContext.getBean(clazz);
|
||||
} catch (Exception e) {
|
||||
obj = null;
|
||||
}
|
||||
return obj;
|
||||
}
|
||||
|
||||
public static <T> List<T> getBeansOfType(Class<T> clazz) {
|
||||
Map<String, T> map;
|
||||
try {
|
||||
map = applicationContext.getBeansOfType(clazz);
|
||||
} catch (Exception e) {
|
||||
map = null;
|
||||
}
|
||||
return map == null ? null : new ArrayList<>(map.values());
|
||||
}
|
||||
|
||||
public static Map<String, Object> getBeansWithAnnotation(Class<? extends Annotation> anno) {
|
||||
Map<String, Object> map;
|
||||
try {
|
||||
map = applicationContext.getBeansWithAnnotation(anno);
|
||||
} catch (Exception e) {
|
||||
map = null;
|
||||
}
|
||||
return map;
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,110 @@
|
|||
package com.chushang.common.mq.produce;
|
||||
|
||||
import apache.rocketmq.v2.Message;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.rocketmq.client.producer.SendCallback;
|
||||
import org.apache.rocketmq.client.producer.SendResult;
|
||||
import org.apache.rocketmq.spring.core.RocketMQTemplate;
|
||||
import org.springframework.messaging.support.MessageBuilder;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import javax.validation.constraints.NotBlank;
|
||||
import javax.validation.constraints.NotNull;
|
||||
|
||||
/**
|
||||
* @auther: zhao
|
||||
* @date: 2024/6/12 14:18
|
||||
*/
|
||||
@Component
|
||||
public class MqProduceService<T> {
|
||||
|
||||
@Resource
|
||||
RocketMQTemplate rocketMQTemplate;
|
||||
|
||||
private String getDesi(String topic, String tag){
|
||||
if (StringUtils.isNotBlank(tag)) topic = topic + ":" + tag;
|
||||
return topic;
|
||||
}
|
||||
|
||||
/**
|
||||
* 发送单向消息, 不关心发送结果
|
||||
*/
|
||||
public void sendOneWayMsg(@NotBlank String topic,@NotBlank T message){
|
||||
rocketMQTemplate.sendOneWay(topic, MessageBuilder.withPayload(message).build());
|
||||
}
|
||||
|
||||
/**
|
||||
* 发送单向消息, 不关心发送结果
|
||||
*/
|
||||
public void sendOneWayMsg(@NotBlank String topic,String tag,@NotBlank T message){
|
||||
rocketMQTemplate.sendOneWay(getDesi(topic, tag), MessageBuilder.withPayload(message).build());
|
||||
}
|
||||
|
||||
/**
|
||||
* 发送 topic 消息
|
||||
*/
|
||||
public void send(@NotBlank String topic, @NotBlank T message){
|
||||
rocketMQTemplate.send(topic, MessageBuilder.withPayload(message).build());
|
||||
}
|
||||
|
||||
/**
|
||||
* 带tag 的消息
|
||||
* tag 可为空
|
||||
*/
|
||||
public void send(@NotBlank String topic,String tag, @NotBlank T message){
|
||||
rocketMQTemplate.send(getDesi(topic, tag), MessageBuilder.withPayload(message).build());
|
||||
}
|
||||
|
||||
/**
|
||||
* 发送同步消息
|
||||
*/
|
||||
public SendResult sendMsg(@NotBlank String topic,@NotBlank T message){
|
||||
return rocketMQTemplate.syncSend(topic, MessageBuilder.withPayload(message)
|
||||
|
||||
.build());
|
||||
}
|
||||
|
||||
/**
|
||||
* 发送同步消息
|
||||
*/
|
||||
public SendResult sendMsg(@NotBlank String topic,String tag,@NotBlank T message){
|
||||
return rocketMQTemplate.syncSend(getDesi(topic, tag), MessageBuilder.withPayload(message).build());
|
||||
}
|
||||
|
||||
/**
|
||||
* 发送异步消息
|
||||
*/
|
||||
public void sendAsyncMsg(@NotBlank String topic,@NotBlank T message, @NotNull SendCallback sendCallback){
|
||||
rocketMQTemplate.asyncSend(topic, MessageBuilder.withPayload(message).build(),sendCallback);
|
||||
}
|
||||
|
||||
/**
|
||||
* 发送异步消息
|
||||
*/
|
||||
public void sendAsyncMsg(@NotBlank String topic,String tag,@NotBlank T message, @NotNull SendCallback sendCallback){
|
||||
rocketMQTemplate.asyncSend(getDesi(topic, tag), MessageBuilder.withPayload(message).build(),sendCallback);
|
||||
}
|
||||
|
||||
/**
|
||||
* 发送延时消息
|
||||
* 延时消息一共分为18个等级1-18分别含义为:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
|
||||
* delayLevel: 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
|
||||
* messageTimeOut: 超时时间, 用于重试
|
||||
*/
|
||||
public void sendDelayMsg(@NotBlank String topic, @NotBlank T message, int messageTimeOut,int delayLevel){
|
||||
rocketMQTemplate.syncSend(topic, MessageBuilder.withPayload(message).build(),messageTimeOut,delayLevel);
|
||||
}
|
||||
|
||||
/**
|
||||
* 发送延时消息
|
||||
* 延时消息一共分为18个等级分别为:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
|
||||
* delayLevel: 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
|
||||
* messageTimeOut: 超时时间, 用于重试
|
||||
*/
|
||||
public void sendDelayMsg(@NotBlank String topic,String tag, @NotBlank T message,int messageTimeOut, int delayLevel){
|
||||
rocketMQTemplate.syncSend(getDesi(topic, tag), MessageBuilder.withPayload(message).build(),messageTimeOut,delayLevel);
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
|
@ -1,3 +1,2 @@
|
|||
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
|
||||
com.chushang.common.canal.config.CanalConfig, \
|
||||
com.chushang.common.canal.config.CanalClientAutoConfiguration
|
||||
com.chushang.common.mq.produce.MqProduceService
|
||||
|
|
|
|||
|
|
@ -34,5 +34,9 @@
|
|||
<groupId>com.chushang</groupId>
|
||||
<artifactId>chushang-common-data-scope</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.chushang</groupId>
|
||||
<artifactId>task-feign</artifactId>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</project>
|
||||
|
|
|
|||
|
|
@ -0,0 +1,101 @@
|
|||
package com.chushang.oss.consumer;
|
||||
|
||||
import com.chushang.common.core.constant.SecurityConstants;
|
||||
import com.chushang.common.core.constant.ServiceConstant;
|
||||
import com.chushang.common.core.web.AjaxResult;
|
||||
import com.chushang.common.core.web.Result;
|
||||
import com.chushang.task.entity.TaskInfo;
|
||||
import com.chushang.task.entity.dto.UpdateTaskDTO;
|
||||
import com.chushang.task.enums.TaskStatusEnum;
|
||||
import com.chushang.task.enums.TaskTypeEnum;
|
||||
import com.chushang.task.feign.RemoteTaskService;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
||||
import org.apache.rocketmq.spring.core.RocketMQListener;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.lang.reflect.Constructor;
|
||||
import java.lang.reflect.Method;
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
* @auther: zhao
|
||||
* @date: 2024/6/12 16:59
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
@RocketMQMessageListener(topic = ServiceConstant.OSS, consumerGroup = ServiceConstant.OSS_CONSUMER_GROUP)
|
||||
public class TaskConsumerService implements RocketMQListener<TaskInfo> {
|
||||
@Resource
|
||||
RemoteTaskService remoteTaskService;
|
||||
/**
|
||||
* 执行 下发的 后台任务
|
||||
*/
|
||||
@Override
|
||||
public void onMessage(TaskInfo message) {
|
||||
remoteTaskService.updateTask(message.getTaskId(), UpdateTaskDTO.builder()
|
||||
.taskStatus(TaskStatusEnum.EXECUTING)
|
||||
.build(), SecurityConstants.INNER);
|
||||
// 通过 反射 去执行具体方法
|
||||
String className = message.getClassName();
|
||||
String methodName = message.getMethodName();
|
||||
String params = message.getParams();
|
||||
Integer taskType = message.getTaskType();
|
||||
UpdateTaskDTO updateTask = new UpdateTaskDTO();
|
||||
try {
|
||||
// 包名+类名
|
||||
Class classzz = Class.forName(className);
|
||||
// 获取构造器对象
|
||||
Constructor constructor = classzz.getConstructor();
|
||||
// 利用构造器对象创建一个对象
|
||||
Object o = constructor.newInstance();
|
||||
// 传递需要执行的方法
|
||||
Method method = classzz.getMethod(methodName, String.class);
|
||||
// 调用的方法有多个参数 Method method = classzz.getMethod("class1method",long.class,String.class,String.class);
|
||||
// 如果调用的方法有参数 invoke(o,param1,param2,param3,...)
|
||||
updateTask.setLastRunTime(LocalDateTime.now());
|
||||
Object invoke = method.invoke(o, params);
|
||||
if (invoke instanceof AjaxResult result){
|
||||
Integer code = (Integer) result.get(AjaxResult.DATA_TAG);
|
||||
if (result.isSuccess() && 200 == code){
|
||||
updateTask.setTaskStatus(TaskStatusEnum.EXECUTE_SUCCESS);
|
||||
String data = (String)result.get(AjaxResult.DATA_TAG);
|
||||
updateTask.setLastRunResult("success");
|
||||
if (Objects.equals(taskType, TaskTypeEnum.DOWN.getCode())){
|
||||
updateTask.setRemark(data);
|
||||
}
|
||||
}else {
|
||||
String errMsg = (String)result.get(AjaxResult.MSG_TAG);
|
||||
updateTask.setTaskStatus(TaskStatusEnum.EXECUTE_FAIL);
|
||||
updateTask.setLastRunResult(errMsg);
|
||||
}
|
||||
}else if (invoke instanceof Result<?> result){
|
||||
if (result.isSuccess() && result.getCode() == 200){
|
||||
updateTask.setTaskStatus(TaskStatusEnum.EXECUTE_SUCCESS);
|
||||
String data = (String)result.getData();
|
||||
updateTask.setLastRunResult("success");
|
||||
if (Objects.equals(taskType, TaskTypeEnum.DOWN.getCode())){
|
||||
updateTask.setRemark(data);
|
||||
}
|
||||
}else {
|
||||
String errMsg = result.getMsg();
|
||||
updateTask.setTaskStatus(TaskStatusEnum.EXECUTE_FAIL);
|
||||
updateTask.setLastRunResult(errMsg);
|
||||
}
|
||||
}
|
||||
}catch (Exception e){
|
||||
log.error("执行失败", e);
|
||||
updateTask.setTaskStatus(TaskStatusEnum.EXECUTE_FAIL);
|
||||
StringBuilder stringBuilder = new StringBuilder();
|
||||
stringBuilder.append(e).append("\n");
|
||||
for (StackTraceElement s : e.getStackTrace()) {
|
||||
stringBuilder.append(s.toString()).append("\n");
|
||||
}
|
||||
updateTask.setLastRunResult(stringBuilder.toString());
|
||||
}
|
||||
// 执行完毕后 更新 任务状态
|
||||
remoteTaskService.updateTask(message.getTaskId(), updateTask, SecurityConstants.INNER);
|
||||
}
|
||||
}
|
||||
|
|
@ -93,4 +93,32 @@ management:
|
|||
# 日志监听配置 -- 如果启用的话,需要配置 kafka 的路径
|
||||
logging:
|
||||
config: classpath:logback-nacos.xml
|
||||
|
||||
rocketmq:
|
||||
name-server: ${config.rocketmq.name-server}
|
||||
producer:
|
||||
# 生产者分组
|
||||
group: ${config.rocketmq.producer.group}
|
||||
# 发送消息超时时间, 单位毫秒, 默认3000
|
||||
send-message-timeout: 10000
|
||||
# 消息压缩阈值, 当消息体的大小超过该阈值时进行消息压缩,默认为4*1024B
|
||||
compress-message-body-threshold: 4096
|
||||
# 消息体的最大允许大小,默认为4*1024*1024B
|
||||
max-message-size: 4194304
|
||||
# 同步发送消息时,失败重试次数,默认2
|
||||
retry-times-when-send-failed: 2
|
||||
# 异步发送消息时,失败重试次数,默认2
|
||||
retry-times-when-send-async-failed: 2
|
||||
# 发送消息给broker 时,发送失败是否重试另一台broker, 默认为false
|
||||
retry-next-server: false
|
||||
access-key: # Access Key ,可阅读 https://github.com/apache/rocketmq/blob/master/docs/cn/acl/user_guide.md 文档
|
||||
secret-key: # Secret Key
|
||||
# 是否开启消息轨迹功能,默认为false 可阅读 https://github.com/apache/rocketmq/blob/master/docs/cn/msg_trace/user_guide.md 文档
|
||||
enable-msg-trace: false
|
||||
# 自定义消息轨迹的 Topic 。默认为 RMQ_SYS_TRACE_TOPIC 。
|
||||
customized-trace-topic: RMQ_SYS_TRACE_TOPIC
|
||||
consumer:
|
||||
# 配置某个消费分组,是否监听指定 Topic 。结构为 Map<消费者分组, <Topic, Boolean>> 。默认情况下,不配置表示监听。
|
||||
listeners:
|
||||
# 关闭 test-consumer-group 对 topic1 的监听消费
|
||||
test-consumer-group:
|
||||
topic1: false # 关闭 test-consumer-group 对 topic1 的监听消费
|
||||
|
|
|
|||
|
|
@ -29,6 +29,11 @@
|
|||
<artifactId>oss-feign</artifactId>
|
||||
<version>1.0.0</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.chushang</groupId>
|
||||
<artifactId>task-feign</artifactId>
|
||||
<version>1.0.0</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</dependencyManagement>
|
||||
<dependencies>
|
||||
|
|
|
|||
|
|
@ -31,6 +31,11 @@
|
|||
<artifactId>system-feign</artifactId>
|
||||
<version>1.0.0</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.chushang</groupId>
|
||||
<artifactId>task-feign</artifactId>
|
||||
<version>1.0.0</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</dependencyManagement>
|
||||
<dependencies>
|
||||
|
|
|
|||
|
|
@ -37,5 +37,9 @@
|
|||
<groupId>com.chushang</groupId>
|
||||
<artifactId>chushang-common-dict</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.chushang</groupId>
|
||||
<artifactId>task-feign</artifactId>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</project>
|
||||
|
|
|
|||
|
|
@ -1 +0,0 @@
|
|||
com.chushang.common.feign.ChushangFeignAutoConfiguration=
|
||||
|
|
@ -1,6 +1,6 @@
|
|||
package com.chushang.system.component;
|
||||
|
||||
import com.chushang.common.log.constants.LogConstant;
|
||||
import com.chushang.common.core.constant.ServiceConstant;
|
||||
import com.chushang.common.log.entity.SysLogEntity;
|
||||
import com.chushang.system.service.SysLogService;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
|
@ -69,7 +69,7 @@ public class LogRedisComponent {
|
|||
Thread.currentThread().interrupt();
|
||||
}
|
||||
if (!redissonClient.isShutdown()){
|
||||
RQueue<SysLogEntity> list = redissonClient.getQueue(LogConstant.QUEUE_NAME);
|
||||
RQueue<SysLogEntity> list = redissonClient.getQueue(ServiceConstant.QUEUE_NAME);
|
||||
List<SysLogEntity> reqList = list.poll(5000);
|
||||
logService.saveBatch(reqList);
|
||||
}else {
|
||||
|
|
|
|||
|
|
@ -0,0 +1,102 @@
|
|||
package com.chushang.system.consumer;
|
||||
|
||||
import com.alibaba.fastjson2.JSON;
|
||||
import com.chushang.common.core.constant.SecurityConstants;
|
||||
import com.chushang.common.core.constant.ServiceConstant;
|
||||
import com.chushang.common.core.web.AjaxResult;
|
||||
import com.chushang.common.core.web.Result;
|
||||
import com.chushang.task.entity.TaskInfo;
|
||||
import com.chushang.task.entity.dto.UpdateTaskDTO;
|
||||
import com.chushang.task.enums.TaskStatusEnum;
|
||||
import com.chushang.task.enums.TaskTypeEnum;
|
||||
import com.chushang.task.feign.RemoteTaskService;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
||||
import org.apache.rocketmq.spring.core.RocketMQListener;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.lang.reflect.Constructor;
|
||||
import java.lang.reflect.Method;
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
* @auther: zhao
|
||||
* @date: 2024/6/12 16:59
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
@RocketMQMessageListener(topic = ServiceConstant.SYSTEM, consumerGroup = ServiceConstant.SYSTEM_CONSUMER_GROUP)
|
||||
public class TaskConsumerService implements RocketMQListener<TaskInfo> {
|
||||
@Resource
|
||||
RemoteTaskService remoteTaskService;
|
||||
/**
|
||||
* 执行 下发的 后台任务
|
||||
*/
|
||||
@Override
|
||||
public void onMessage(TaskInfo message) {
|
||||
remoteTaskService.updateTask(message.getTaskId(), UpdateTaskDTO.builder()
|
||||
.taskStatus(TaskStatusEnum.EXECUTING)
|
||||
.build(), SecurityConstants.INNER);
|
||||
// 通过 反射 去执行具体方法
|
||||
String className = message.getClassName();
|
||||
String methodName = message.getMethodName();
|
||||
String params = message.getParams();
|
||||
Integer taskType = message.getTaskType();
|
||||
UpdateTaskDTO updateTask = new UpdateTaskDTO();
|
||||
try {
|
||||
// 包名+类名
|
||||
Class classzz = Class.forName(className);
|
||||
// 获取构造器对象
|
||||
Constructor constructor = classzz.getConstructor();
|
||||
// 利用构造器对象创建一个对象
|
||||
Object o = constructor.newInstance();
|
||||
// 传递需要执行的方法
|
||||
Method method = classzz.getMethod(methodName, String.class);
|
||||
// 调用的方法有多个参数 Method method = classzz.getMethod("class1method",long.class,String.class,String.class);
|
||||
// 如果调用的方法有参数 invoke(o,param1,param2,param3,...)
|
||||
updateTask.setLastRunTime(LocalDateTime.now());
|
||||
Object invoke = method.invoke(o, params);
|
||||
if (invoke instanceof AjaxResult result){
|
||||
Integer code = (Integer) result.get(AjaxResult.DATA_TAG);
|
||||
if (result.isSuccess() && 200 == code){
|
||||
updateTask.setTaskStatus(TaskStatusEnum.EXECUTE_SUCCESS);
|
||||
String data = (String)result.get(AjaxResult.DATA_TAG);
|
||||
updateTask.setLastRunResult("success");
|
||||
if (Objects.equals(taskType, TaskTypeEnum.DOWN.getCode())){
|
||||
updateTask.setRemark(data);
|
||||
}
|
||||
}else {
|
||||
String errMsg = (String)result.get(AjaxResult.MSG_TAG);
|
||||
updateTask.setTaskStatus(TaskStatusEnum.EXECUTE_FAIL);
|
||||
updateTask.setLastRunResult(errMsg);
|
||||
}
|
||||
}else if (invoke instanceof Result<?> result){
|
||||
if (result.isSuccess() && result.getCode() == 200){
|
||||
updateTask.setTaskStatus(TaskStatusEnum.EXECUTE_SUCCESS);
|
||||
String data = (String)result.getData();
|
||||
updateTask.setLastRunResult("success");
|
||||
if (Objects.equals(taskType, TaskTypeEnum.DOWN.getCode())){
|
||||
updateTask.setRemark(data);
|
||||
}
|
||||
}else {
|
||||
String errMsg = result.getMsg();
|
||||
updateTask.setTaskStatus(TaskStatusEnum.EXECUTE_FAIL);
|
||||
updateTask.setLastRunResult(errMsg);
|
||||
}
|
||||
}
|
||||
}catch (Exception e){
|
||||
log.error("执行失败", e);
|
||||
updateTask.setTaskStatus(TaskStatusEnum.EXECUTE_FAIL);
|
||||
StringBuilder stringBuilder = new StringBuilder();
|
||||
stringBuilder.append(e).append("\n");
|
||||
for (StackTraceElement s : e.getStackTrace()) {
|
||||
stringBuilder.append(s.toString()).append("\n");
|
||||
}
|
||||
updateTask.setLastRunResult(stringBuilder.toString());
|
||||
}
|
||||
// 执行完毕后 更新 任务状态
|
||||
remoteTaskService.updateTask(message.getTaskId(), updateTask, SecurityConstants.INNER);
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,42 @@
|
|||
package com.chushang.system.service;
|
||||
|
||||
import com.chushang.common.core.exception.ResultException;
|
||||
import com.chushang.common.core.web.Result;
|
||||
import com.chushang.system.entity.bo.LoginBody;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
/**
|
||||
* @auther: zhao
|
||||
* @date: 2024/6/12 17:26
|
||||
*/
|
||||
@Slf4j
|
||||
@Service
|
||||
public class TaskServiceTest {
|
||||
|
||||
public Result<String> test(String code, LoginBody loginBody)
|
||||
{
|
||||
|
||||
log.info("code {}", code);
|
||||
log.info("loginBody {}", loginBody);
|
||||
return Result.ok("test");
|
||||
}
|
||||
|
||||
public Result<String> testError(String code, LoginBody loginBody)
|
||||
{
|
||||
log.info("code {}", code);
|
||||
log.info("loginBody {}", loginBody);
|
||||
|
||||
return Result.failed("test", "test");
|
||||
}
|
||||
|
||||
public void testError2(String code, LoginBody loginBody)
|
||||
{
|
||||
|
||||
log.info("code {}", code);
|
||||
log.info("loginBody {}", loginBody);
|
||||
throw new ResultException("error");
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
|
@ -1,18 +1,14 @@
|
|||
package com.chushang.system.service.impl;
|
||||
|
||||
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
|
||||
import com.chushang.common.log.constants.LogConstant;
|
||||
import com.chushang.common.log.entity.SysLogEntity;
|
||||
import com.chushang.system.mapper.SysLogMapper;
|
||||
import com.chushang.system.service.SysLogService;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.redisson.api.RQueue;
|
||||
import org.redisson.api.RedissonClient;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* by zhaowenyuan create 2022/3/16 15:35
|
||||
|
|
|
|||
|
|
@ -93,7 +93,32 @@ management:
|
|||
# 日志监听配置 -- 如果启用的话,需要配置 kafka 的路径
|
||||
logging:
|
||||
config: classpath:logback-nacos.xml
|
||||
pagehelper:
|
||||
async-count:
|
||||
banner: false
|
||||
|
||||
rocketmq:
|
||||
name-server: ${config.rocketmq.name-server}
|
||||
producer:
|
||||
# 生产者分组
|
||||
group: ${config.rocketmq.producer.group}
|
||||
# 发送消息超时时间, 单位毫秒, 默认3000
|
||||
send-message-timeout: 10000
|
||||
# 消息压缩阈值, 当消息体的大小超过该阈值时进行消息压缩,默认为4*1024B
|
||||
compress-message-body-threshold: 4096
|
||||
# 消息体的最大允许大小,默认为4*1024*1024B
|
||||
max-message-size: 4194304
|
||||
# 同步发送消息时,失败重试次数,默认2
|
||||
retry-times-when-send-failed: 2
|
||||
# 异步发送消息时,失败重试次数,默认2
|
||||
retry-times-when-send-async-failed: 2
|
||||
# 发送消息给broker 时,发送失败是否重试另一台broker, 默认为false
|
||||
retry-next-server: false
|
||||
access-key: # Access Key ,可阅读 https://github.com/apache/rocketmq/blob/master/docs/cn/acl/user_guide.md 文档
|
||||
secret-key: # Secret Key
|
||||
# 是否开启消息轨迹功能,默认为false 可阅读 https://github.com/apache/rocketmq/blob/master/docs/cn/msg_trace/user_guide.md 文档
|
||||
enable-msg-trace: false
|
||||
# 自定义消息轨迹的 Topic 。默认为 RMQ_SYS_TRACE_TOPIC 。
|
||||
customized-trace-topic: RMQ_SYS_TRACE_TOPIC
|
||||
consumer:
|
||||
# 配置某个消费分组,是否监听指定 Topic 。结构为 Map<消费者分组, <Topic, Boolean>> 。默认情况下,不配置表示监听。
|
||||
listeners:
|
||||
# 关闭 test-consumer-group 对 topic1 的监听消费
|
||||
test-consumer-group:
|
||||
topic1: false # 关闭 test-consumer-group 对 topic1 的监听消费
|
||||
|
|
|
|||
|
|
@ -1,10 +1,17 @@
|
|||
import cn.hutool.json.JSONUtil;
|
||||
import com.alibaba.fastjson2.JSON;
|
||||
import com.chushang.common.core.constant.SecurityConstants;
|
||||
import com.chushang.security.service.TokenService;
|
||||
import com.chushang.SystemApplication;
|
||||
import com.chushang.system.entity.bo.LoginBody;
|
||||
import com.chushang.system.entity.po.SysMenu;
|
||||
import com.chushang.security.entity.po.SysUser;
|
||||
import com.chushang.system.entity.vo.RouterVo;
|
||||
import com.chushang.system.service.*;
|
||||
import com.chushang.task.entity.dto.CreateTaskDTO;
|
||||
import com.chushang.task.enums.ServiceEnum;
|
||||
import com.chushang.task.enums.TaskTypeEnum;
|
||||
import com.chushang.task.feign.RemoteTaskService;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
|
|
@ -12,6 +19,7 @@ import org.springframework.beans.factory.annotation.Autowired;
|
|||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
import org.springframework.test.context.junit4.SpringRunner;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.util.*;
|
||||
|
||||
/**
|
||||
|
|
@ -22,67 +30,25 @@ import java.util.*;
|
|||
@SpringBootTest(classes = SystemApplication.class)
|
||||
public class DemoTest {
|
||||
|
||||
@Autowired
|
||||
ISysMenuService menuService;
|
||||
@Autowired
|
||||
ISysDeptService sysDeptService;
|
||||
@Autowired
|
||||
ISysUserService sysUserService;
|
||||
@Autowired
|
||||
ISysRoleService sysRoleService;
|
||||
@Autowired
|
||||
ISysPermissionService permissionService;
|
||||
@Autowired
|
||||
TokenService tokenService;
|
||||
|
||||
// @Test
|
||||
// public void test(){
|
||||
// String username = "zhaowenyuan@31gamestudio.com";
|
||||
//// String username = "admin";
|
||||
//
|
||||
// SysUser sysUser = sysUserService.selectUserByUserName(username);
|
||||
// if (StringUtils.isNull(sysUser)) {
|
||||
// log.error("密码错误");
|
||||
// }
|
||||
//
|
||||
// // 角色集合
|
||||
// Set<String> roles = permissionService.getRolePermission(sysUser);
|
||||
// // 权限集合
|
||||
// Set<String> permissions = permissionService.getMenuPermission(sysUser);
|
||||
//
|
||||
// LoginUser sysUserVo = new LoginUser();
|
||||
// sysUserVo.setUserId(sysUser.getUserId());
|
||||
// sysUserVo.setUsername(sysUser.getUsername());
|
||||
// sysUserVo.setSysUser(sysUser);
|
||||
// sysUserVo.setRoles(roles);
|
||||
// sysUserVo.setPermissions(permissions);
|
||||
//
|
||||
// List<SysUserData> userDataList = userDataService.listUserData(sysUser);
|
||||
// // 默认传递一个空Map
|
||||
// sysUserVo.setAuthDataMap(new HashMap<>());
|
||||
// if (CollectionUtil.isNotEmpty(userDataList)){
|
||||
// Map<String, Set<String>> authDataMap = new HashMap<>();
|
||||
// // 对应的数据权限
|
||||
// for (SysUserData sysUserData : userDataList) {
|
||||
// authDataMap.put(sysUserData.getDataType().getCodeType(),
|
||||
// new HashSet<>(JSONUtil.toList(sysUserData.getDataValue(), String.class)));
|
||||
// }
|
||||
// sysUserVo.setAuthDataMap(authDataMap);
|
||||
// }
|
||||
//
|
||||
// log.info("{}", JSONUtil.toJsonStr(sysUserVo));
|
||||
// }
|
||||
|
||||
@Resource
|
||||
RemoteTaskService remoteTaskService;
|
||||
|
||||
@Test
|
||||
public void menuTest(){
|
||||
// List<SysMenu> menus = menuService.list(
|
||||
// new LambdaQueryWrapper<SysMenu>()
|
||||
// .eq(SysMenu::getMenuId, 2136)
|
||||
// );
|
||||
List<SysMenu> menus = menuService.selectMenuTreeByUserId(new SysUser(1l));
|
||||
List<RouterVo> routerVos = menuService.buildMenus(menus);
|
||||
log.info("{}", JSONUtil.toJsonStr(routerVos));
|
||||
LoginBody loginBody = new LoginBody();
|
||||
loginBody.setPassword("1");
|
||||
loginBody.setUsername("2");
|
||||
remoteTaskService.createTask(CreateTaskDTO.builder()
|
||||
.params(JSON.toJSONString(loginBody))
|
||||
.taskName("测试后台任务")
|
||||
.applicationName(ServiceEnum.SYSTEM)
|
||||
.methodName("methodName")
|
||||
.className(this.getClass().getName())
|
||||
.deptId(1L)
|
||||
.taskType(TaskTypeEnum.OTHER)
|
||||
.remark("remark")
|
||||
.createBy("system")
|
||||
.build(), SecurityConstants.INNER);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -13,10 +13,6 @@
|
|||
<groupId>com.chushang</groupId>
|
||||
<artifactId>chushang-common-feign</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.chushang</groupId>
|
||||
<artifactId>chushang-common-mybatis</artifactId>
|
||||
</dependency>
|
||||
<!-- 权限控制 -->
|
||||
<dependency>
|
||||
<groupId>com.chushang</groupId>
|
||||
|
|
@ -26,5 +22,13 @@
|
|||
<groupId>com.chushang</groupId>
|
||||
<artifactId>chushang-common-dict</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.chushang</groupId>
|
||||
<artifactId>chushang-common-mq</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.chushang</groupId>
|
||||
<artifactId>chushang-common-log</artifactId>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</project>
|
||||
|
|
|
|||
|
|
@ -4,8 +4,10 @@ import com.baomidou.mybatisplus.annotation.IdType;
|
|||
import com.baomidou.mybatisplus.annotation.TableField;
|
||||
import com.baomidou.mybatisplus.annotation.TableId;
|
||||
import com.baomidou.mybatisplus.annotation.TableName;
|
||||
import com.chushang.common.dict.annotation.DictFormat;
|
||||
import com.chushang.common.mybatis.base.BaseEntity;
|
||||
import com.chushang.task.enums.ServiceEnum;
|
||||
import com.chushang.task.enums.TaskStatusEnum;
|
||||
import lombok.*;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
|
|
@ -26,69 +28,61 @@ public class TaskInfo extends BaseEntity {
|
|||
*/
|
||||
@TableId(value = "task_id", type = IdType.ASSIGN_ID)
|
||||
private Long taskId;
|
||||
|
||||
/**
|
||||
* 后台任务名称
|
||||
*/
|
||||
@TableField(value = "task_name")
|
||||
private String taskName;
|
||||
|
||||
/**
|
||||
* 所属服务名
|
||||
* 取 ServiceEnum 的 value
|
||||
*/
|
||||
@TableField(value = "application_name")
|
||||
private String applicationName;
|
||||
|
||||
/**
|
||||
* 方法名称
|
||||
*/
|
||||
@TableField(value = "method_name")
|
||||
private String methodName;
|
||||
|
||||
/**
|
||||
* 类名
|
||||
*/
|
||||
@TableField(value = "class_name")
|
||||
private String className;
|
||||
|
||||
/**
|
||||
* 任务状态
|
||||
* 字典表 task_status
|
||||
*
|
||||
*/
|
||||
@TableField(value = "task_status")
|
||||
@DictFormat(dictType = "task_status")
|
||||
private Integer taskStatus;
|
||||
|
||||
/**
|
||||
* 后台任务类型
|
||||
*/
|
||||
@TableField(value = "task_type")
|
||||
@DictFormat(dictType = "task_type")
|
||||
private Integer taskType;
|
||||
/**
|
||||
* 创建人
|
||||
*/
|
||||
@TableField(value = "create_by")
|
||||
private String createBy;
|
||||
|
||||
/**
|
||||
* 修改人
|
||||
*/
|
||||
@TableField(value = "update_by")
|
||||
private String updateBy;
|
||||
|
||||
/**
|
||||
* 后台任务执行参数
|
||||
*/
|
||||
@TableField(value = "params")
|
||||
private String params;
|
||||
|
||||
/**
|
||||
* 备注信息
|
||||
*/
|
||||
@TableField(value = "remark")
|
||||
private String remark;
|
||||
|
||||
/**
|
||||
* 导出或者下载时, 对应的返回的文件路径
|
||||
*/
|
||||
@TableField(value = "down_url")
|
||||
private String downUrl;
|
||||
/**
|
||||
* 最后一次执行时间
|
||||
*/
|
||||
|
|
@ -99,9 +93,6 @@ public class TaskInfo extends BaseEntity {
|
|||
*/
|
||||
@TableField("last_run_result")
|
||||
private String lastRunResult;
|
||||
|
||||
@TableField("error_message")
|
||||
private String errorMessage;
|
||||
/**
|
||||
* 部门id -- 根据部门查看区分
|
||||
*/
|
||||
|
|
|
|||
|
|
@ -1,13 +1,20 @@
|
|||
package com.chushang.task.entity.dto;
|
||||
|
||||
import com.chushang.task.enums.ServiceEnum;
|
||||
import com.chushang.task.enums.TaskTypeEnum;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
/**
|
||||
* @auther: zhao
|
||||
* @date: 2024/6/11 10:19
|
||||
*/
|
||||
@Data
|
||||
@Builder
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
public class CreateTaskDTO {
|
||||
|
||||
/**
|
||||
|
|
@ -39,6 +46,10 @@ public class CreateTaskDTO {
|
|||
* 备注信息
|
||||
*/
|
||||
private String remark;
|
||||
/**
|
||||
* 后台任务类型
|
||||
*/
|
||||
private TaskTypeEnum taskType;
|
||||
/**
|
||||
* 部门id
|
||||
*/
|
||||
|
|
|
|||
|
|
@ -1,6 +1,10 @@
|
|||
package com.chushang.task.entity.dto;
|
||||
|
||||
import com.chushang.task.enums.TaskStatusEnum;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
|
||||
|
|
@ -9,32 +13,28 @@ import java.time.LocalDateTime;
|
|||
* @date: 2024/6/11 10:27
|
||||
*/
|
||||
@Data
|
||||
@Builder
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
public class UpdateTaskDTO {
|
||||
/**
|
||||
* 任务状态
|
||||
* 字典表 task_status
|
||||
*
|
||||
*/
|
||||
private Integer taskStatus;
|
||||
/**
|
||||
* 导出或者下载时, 对应的返回的文件路径
|
||||
*/
|
||||
private String downUrl;
|
||||
private TaskStatusEnum taskStatus;
|
||||
/**
|
||||
* 最后一次执行时间
|
||||
*/
|
||||
private LocalDateTime lastRunTime;
|
||||
/**
|
||||
* 最后一次执行结果
|
||||
* 成功时 填写 对应的路径
|
||||
* 失败填写失败原因
|
||||
*/
|
||||
private String lastRunResult;
|
||||
/**
|
||||
* 错误信息
|
||||
* 备注信息
|
||||
*/
|
||||
private String errorMessage;
|
||||
/**
|
||||
* 修改人
|
||||
*/
|
||||
private String updateBy;
|
||||
|
||||
private String remark;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
package com.chushang.task.enums;
|
||||
|
||||
import com.baomidou.mybatisplus.annotation.IEnum;
|
||||
import com.chushang.common.core.constant.ServiceConstant;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Getter;
|
||||
|
||||
|
|
@ -14,23 +15,23 @@ public enum ServiceEnum {
|
|||
/**
|
||||
* 系统
|
||||
*/
|
||||
SYSTEM("system-service"),
|
||||
SYSTEM(ServiceConstant.SYSTEM),
|
||||
/**
|
||||
* 授权
|
||||
*/
|
||||
AUTH("auth-service"),
|
||||
AUTH(ServiceConstant.AUTH),
|
||||
/**
|
||||
* 文件
|
||||
*/
|
||||
OSS("oss-service"),
|
||||
OSS(ServiceConstant.OSS),
|
||||
/**
|
||||
* 工单
|
||||
*/
|
||||
WRK("wrk-service"),
|
||||
WRK(ServiceConstant.WRK),
|
||||
/**
|
||||
* 后台任务
|
||||
*/
|
||||
TASK("task-service"),
|
||||
TASK(ServiceConstant.TASK),
|
||||
;
|
||||
private final String serviceName;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,30 @@
|
|||
package com.chushang.task.enums;
|
||||
|
||||
import com.baomidou.mybatisplus.annotation.IEnum;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Getter;
|
||||
|
||||
/**
|
||||
* @auther: zhao
|
||||
* @date: 2024/6/12 16:34
|
||||
*/
|
||||
@Getter
|
||||
@AllArgsConstructor
|
||||
public enum TaskStatusEnum implements IEnum<Integer> {
|
||||
CREATE(0,"新建"),
|
||||
DISPATCH(1, "已下发"),
|
||||
EXECUTING(2, "执行中"),
|
||||
|
||||
EXECUTE_FAIL(3, "执行失败"),
|
||||
EXECUTE_SUCCESS(4, "执行成功"),
|
||||
|
||||
CANCEL(5, "已取消"),
|
||||
;
|
||||
private final Integer code;
|
||||
private final String desc;
|
||||
|
||||
@Override
|
||||
public Integer getValue() {
|
||||
return code;
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,29 @@
|
|||
package com.chushang.task.enums;
|
||||
|
||||
import com.baomidou.mybatisplus.annotation.IEnum;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Getter;
|
||||
|
||||
/**
|
||||
* @auther: zhao
|
||||
* @date: 2024/6/12 18:02
|
||||
*/
|
||||
@Getter
|
||||
@AllArgsConstructor
|
||||
public enum TaskTypeEnum implements IEnum<Integer> {
|
||||
/**
|
||||
* 导出时, 需要取remark 为导出文件路径
|
||||
*/
|
||||
DOWN(1, "导出任务"),
|
||||
UPLOAD(2, "上传任务"),
|
||||
OTHER(3, "其他后台任务"),
|
||||
;
|
||||
|
||||
private final Integer code;
|
||||
private final String desc;
|
||||
|
||||
@Override
|
||||
public Integer getValue() {
|
||||
return code;
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,22 @@
|
|||
package com.chushang.task.consumer;
|
||||
|
||||
import com.chushang.common.core.constant.ServiceConstant;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
||||
import org.apache.rocketmq.spring.core.RocketMQListener;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* @auther: zhao
|
||||
* @date: 2024/6/12 15:03
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
@RocketMQMessageListener(topic = ServiceConstant.TASK, consumerGroup = ServiceConstant.TASK_CONSUMER_GROUP)
|
||||
public class TaskConsumerService implements RocketMQListener<String> {
|
||||
|
||||
@Override
|
||||
public void onMessage(String message) {
|
||||
log.info("msg {}", message);
|
||||
}
|
||||
}
|
||||
|
|
@ -7,6 +7,7 @@ import com.chushang.security.annotation.RequiresPermissions;
|
|||
import com.chushang.task.entity.TaskInfo;
|
||||
import com.chushang.task.service.TaskInfoService;
|
||||
import org.springframework.web.bind.annotation.GetMapping;
|
||||
import org.springframework.web.bind.annotation.PathVariable;
|
||||
import org.springframework.web.bind.annotation.RequestMapping;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
|
||||
|
|
@ -31,4 +32,13 @@ public class TaskController {
|
|||
return AjaxResult.success(pageResult);
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取 后台任务信息
|
||||
*/
|
||||
@GetMapping(value = "/{taskId}")
|
||||
@RequiresPermissions("system:task:info")
|
||||
public AjaxResult info(@PathVariable Long taskId){
|
||||
return AjaxResult.success(taskInfoService.getById(taskId));
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,11 +1,15 @@
|
|||
package com.chushang.task.remote;
|
||||
|
||||
import com.alibaba.fastjson2.JSON;
|
||||
import com.chushang.common.core.web.Result;
|
||||
import com.chushang.common.mq.produce.MqProduceService;
|
||||
import com.chushang.task.entity.TaskInfo;
|
||||
import com.chushang.task.entity.dto.CreateTaskDTO;
|
||||
import com.chushang.task.entity.dto.UpdateTaskDTO;
|
||||
import com.chushang.task.enums.TaskStatusEnum;
|
||||
import com.chushang.task.feign.RemoteTaskService;
|
||||
import com.chushang.task.service.TaskInfoService;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.web.bind.annotation.*;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
|
|
@ -14,6 +18,7 @@ import javax.annotation.Resource;
|
|||
* @auther: zhao
|
||||
* @date: 2024/6/11 10:28
|
||||
*/
|
||||
@Slf4j
|
||||
@RestController
|
||||
@RequestMapping(value = "/remote")
|
||||
public class RemoteTaskController implements RemoteTaskService {
|
||||
|
|
@ -32,13 +37,17 @@ public class RemoteTaskController implements RemoteTaskService {
|
|||
.className(task.getClassName())
|
||||
.methodName(task.getMethodName())
|
||||
.params(task.getParams())
|
||||
.taskStatus(0)
|
||||
.taskStatus(TaskStatusEnum.CREATE.getCode())
|
||||
.deptId(task.getDeptId())
|
||||
.taskType(task.getTaskType().getCode())
|
||||
.taskName(task.getTaskName())
|
||||
.build();
|
||||
boolean save = taskInfoService.save(taskInfo);
|
||||
// todo 需要创建队列数据
|
||||
|
||||
// 如果 入库成功, 则将其发送到消息队列中
|
||||
if (save){
|
||||
log.info("消息发送队列 {}", taskInfo.getTaskId());
|
||||
taskInfoService.joinQueue(taskInfo);
|
||||
}
|
||||
return Result.ok(save ? taskInfo.getTaskId() : null);
|
||||
}
|
||||
|
||||
|
|
@ -49,12 +58,11 @@ public class RemoteTaskController implements RemoteTaskService {
|
|||
{
|
||||
TaskInfo taskInfo = TaskInfo.builder()
|
||||
.taskId(taskId)
|
||||
.updateBy(task.getUpdateBy())
|
||||
.taskStatus(task.getTaskStatus())
|
||||
.downUrl(task.getDownUrl())
|
||||
.errorMessage(task.getErrorMessage())
|
||||
.updateBy("system")
|
||||
.taskStatus(task.getTaskStatus().getCode())
|
||||
.lastRunResult(task.getLastRunResult())
|
||||
.lastRunTime(task.getLastRunTime())
|
||||
.remark(task.getRemark())
|
||||
.build();
|
||||
return Result.ok(taskInfoService.updateById(taskInfo));
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,21 +1,40 @@
|
|||
package com.chushang.task.service;
|
||||
|
||||
import com.alibaba.fastjson2.JSON;
|
||||
import com.baomidou.mybatisplus.core.conditions.Wrapper;
|
||||
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
|
||||
import com.baomidou.mybatisplus.core.metadata.IPage;
|
||||
import com.baomidou.mybatisplus.extension.service.IService;
|
||||
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
|
||||
import com.chushang.common.core.constant.ServiceConstant;
|
||||
import com.chushang.common.mq.produce.MqProduceService;
|
||||
import com.chushang.common.mybatis.page.CommonParam;
|
||||
import com.chushang.common.mybatis.utils.PageResult;
|
||||
import com.chushang.datascope.annotation.DataScope;
|
||||
import com.chushang.datascope.constants.ScopeConstants;
|
||||
import com.chushang.task.entity.TaskInfo;
|
||||
import com.chushang.task.enums.TaskStatusEnum;
|
||||
import com.chushang.task.mapper.TaskInfoMapper;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.ObjectUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
|
||||
/**
|
||||
* @auther: zhao
|
||||
* @date: 2024/6/7 18:19
|
||||
*/
|
||||
public interface TaskInfoService extends IService<TaskInfo> {
|
||||
@Slf4j
|
||||
@Service
|
||||
public class TaskInfoService extends ServiceImpl<TaskInfoMapper, TaskInfo> implements IService<TaskInfo> {
|
||||
|
||||
@Resource
|
||||
MqProduceService<TaskInfo> mqProduceService;
|
||||
|
||||
@DataScope(tableAlias = "t")
|
||||
default PageResult pagePostList(TaskInfo sysPost, CommonParam commonParam){
|
||||
public PageResult pagePostList(TaskInfo sysPost, CommonParam commonParam){
|
||||
IPage<TaskInfo> page = this.page(
|
||||
new com.baomidou.mybatisplus.extension.plugins.pagination.Page<>(commonParam.getPage(), commonParam.getLimit()),
|
||||
buildWrapper(sysPost, commonParam)
|
||||
|
|
@ -23,5 +42,24 @@ public interface TaskInfoService extends IService<TaskInfo> {
|
|||
return new PageResult(page);
|
||||
}
|
||||
|
||||
Wrapper<TaskInfo> buildWrapper(TaskInfo sysPost, CommonParam commonParam);
|
||||
public Wrapper<TaskInfo> buildWrapper(TaskInfo taskInfo, CommonParam commonParam) {
|
||||
return new QueryWrapper<TaskInfo>()
|
||||
.orderBy(true, "asc".equals(commonParam.getIsAsc()), commonParam.getOrderBy())
|
||||
.lambda()
|
||||
.eq(null != taskInfo.getTaskId(), TaskInfo::getTaskId, taskInfo.getTaskId())
|
||||
.like(null != taskInfo.getApplicationName(), TaskInfo::getApplicationName, taskInfo.getApplicationName())
|
||||
.eq(StringUtils.isNotEmpty(taskInfo.getTaskName()), TaskInfo::getTaskName, taskInfo.getTaskName())
|
||||
.eq(null != taskInfo.getTaskStatus(), TaskInfo::getTaskStatus, taskInfo.getTaskStatus())
|
||||
.like(StringUtils.isNotEmpty(taskInfo.getClassName()), TaskInfo::getClassName, taskInfo.getClassName())
|
||||
.like(StringUtils.isNotEmpty(taskInfo.getMethodName()), TaskInfo::getMethodName, taskInfo.getMethodName())
|
||||
.in(ObjectUtils.isNotEmpty(taskInfo.getSqlParam().get(ScopeConstants.DATA_SCOPE)), TaskInfo::getDeptId, taskInfo.getSqlParam().get(ScopeConstants.DATA_SCOPE));
|
||||
}
|
||||
|
||||
public void joinQueue(TaskInfo taskInfo){
|
||||
mqProduceService.send(taskInfo.getApplicationName(), taskInfo);
|
||||
|
||||
taskInfo.setTaskStatus(TaskStatusEnum.DISPATCH.getCode());
|
||||
updateById(taskInfo);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,38 +0,0 @@
|
|||
package com.chushang.task.service.impl;
|
||||
|
||||
import com.baomidou.mybatisplus.core.conditions.Wrapper;
|
||||
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
|
||||
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
|
||||
import com.chushang.common.core.constant.SecurityConstants;
|
||||
import com.chushang.common.mybatis.page.CommonParam;
|
||||
import com.chushang.datascope.constants.ScopeConstants;
|
||||
import com.chushang.task.entity.TaskInfo;
|
||||
import com.chushang.task.mapper.TaskInfoMapper;
|
||||
import com.chushang.task.service.TaskInfoService;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.ObjectUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
/**
|
||||
* @auther: zhao
|
||||
* @date: 2024/6/7 18:19
|
||||
*/
|
||||
@Slf4j
|
||||
@Service
|
||||
public class TaskInfoServiceImpl extends ServiceImpl<TaskInfoMapper, TaskInfo> implements TaskInfoService {
|
||||
|
||||
@Override
|
||||
public Wrapper<TaskInfo> buildWrapper(TaskInfo taskInfo, CommonParam commonParam) {
|
||||
return new QueryWrapper<TaskInfo>()
|
||||
.orderBy(true, "asc".equals(commonParam.getIsAsc()), commonParam.getOrderBy())
|
||||
.lambda()
|
||||
.eq(null != taskInfo.getTaskId(), TaskInfo::getTaskId, taskInfo.getTaskId())
|
||||
.like(null != taskInfo.getApplicationName(), TaskInfo::getApplicationName, taskInfo.getApplicationName())
|
||||
.eq(StringUtils.isNotEmpty(taskInfo.getTaskName()), TaskInfo::getTaskName, taskInfo.getTaskName())
|
||||
.eq(null != taskInfo.getTaskStatus(), TaskInfo::getTaskStatus, taskInfo.getTaskStatus())
|
||||
.like(StringUtils.isNotEmpty(taskInfo.getClassName()), TaskInfo::getClassName, taskInfo.getClassName())
|
||||
.like(StringUtils.isNotEmpty(taskInfo.getMethodName()), TaskInfo::getMethodName, taskInfo.getMethodName())
|
||||
.in(ObjectUtils.isNotEmpty(taskInfo.getSqlParam().get(ScopeConstants.DATA_SCOPE)), TaskInfo::getDeptId, taskInfo.getSqlParam().get(ScopeConstants.DATA_SCOPE));
|
||||
}
|
||||
}
|
||||
|
|
@ -93,4 +93,34 @@ management:
|
|||
# 日志监听配置 -- 如果启用的话,需要配置 kafka 的路径
|
||||
logging:
|
||||
config: classpath:logback-nacos.xml
|
||||
rocketmq:
|
||||
name-server: ${config.rocketmq.name-server}
|
||||
producer:
|
||||
# 生产者分组
|
||||
group: ${config.rocketmq.producer.group}
|
||||
# 发送消息超时时间, 单位毫秒, 默认3000
|
||||
send-message-timeout: 10000
|
||||
# 消息压缩阈值, 当消息体的大小超过该阈值时进行消息压缩,默认为4*1024B
|
||||
compress-message-body-threshold: 4096
|
||||
# 消息体的最大允许大小,默认为4*1024*1024B
|
||||
max-message-size: 4194304
|
||||
# 同步发送消息时,失败重试次数,默认2
|
||||
retry-times-when-send-failed: 2
|
||||
# 异步发送消息时,失败重试次数,默认2
|
||||
retry-times-when-send-async-failed: 2
|
||||
# 发送消息给broker 时,发送失败是否重试另一台broker, 默认为false
|
||||
retry-next-server: false
|
||||
access-key: # Access Key ,可阅读 https://github.com/apache/rocketmq/blob/master/docs/cn/acl/user_guide.md 文档
|
||||
secret-key: # Secret Key
|
||||
# 是否开启消息轨迹功能,默认为false 可阅读 https://github.com/apache/rocketmq/blob/master/docs/cn/msg_trace/user_guide.md 文档
|
||||
enable-msg-trace: false
|
||||
# 自定义消息轨迹的 Topic 。默认为 RMQ_SYS_TRACE_TOPIC 。
|
||||
customized-trace-topic: RMQ_SYS_TRACE_TOPIC
|
||||
consumer:
|
||||
# 配置某个消费分组,是否监听指定 Topic 。结构为 Map<消费者分组, <Topic, Boolean>> 。默认情况下,不配置表示监听。
|
||||
listeners:
|
||||
# 关闭 test-consumer-group 对 topic1 的监听消费
|
||||
test-consumer-group:
|
||||
topic1: false # 关闭 test-consumer-group 对 topic1 的监听消费
|
||||
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue