18 changed files with 270 additions and 426 deletions
-
125common/src/main/java/com/inscloudtech/alog/common/model/ActionLogMessage.java
-
5common/src/main/java/com/inscloudtech/alog/common/model/TracerData.java
-
2config/config-nacos/src/main/java/com/inscloudtech/alog/nacos/NacosListener.java
-
45example/pom.xml
-
12example/src/main/java/com/inscloudtech/alog/clientdemo/aspectj/ActionLogAspect.java
-
100example/src/main/java/com/inscloudtech/alog/clientdemo/custom/CustomConfigurator.java
-
20example/src/main/java/com/inscloudtech/alog/clientdemo/custom/Starter.java
-
45example/src/main/java/com/inscloudtech/alog/clientdemo/demo/controller/DemoUserController.java
-
5example/src/main/java/com/inscloudtech/alog/clientdemo/demo/domain/DemoUser.java
-
130example/src/main/java/com/inscloudtech/alog/clientdemo/demo/domain/R.java
-
4example/src/main/java/com/inscloudtech/alog/clientdemo/demo/service/impl/DemoUserServiceImpl.java
-
30example/src/main/java/com/inscloudtech/alog/clientdemo/utils/BeanCopyUtils.java
-
5example/src/main/java/com/inscloudtech/alog/clientdemo/utils/StringUtils.java
-
5example/src/main/resources/application.yml
-
6worker/pom.xml
-
116worker/src/main/java/com/inscloudtech/alog/worker/store/ActionLogToDbStore.java
-
7worker/src/main/resources/application.yml
-
34worker/src/main/resources/jlog.sql
@ -1,50 +1,50 @@ |
|||
package com.inscloudtech.alog.clientdemo.custom; |
|||
|
|||
import com.inscloudtech.alog.core.ConfiguratorFactory; |
|||
import com.inscloudtech.alog.core.FileConfigurator; |
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
import org.springframework.stereotype.Component; |
|||
|
|||
import java.io.IOException; |
|||
import java.util.List; |
|||
import java.util.Set; |
|||
import java.util.stream.Collectors; |
|||
|
|||
/** |
|||
* @author tangbohu |
|||
* @version 1.0.0 |
|||
* @desc 基于redis实现自定义配置器示例,主要用于重写获取workers实现(毕竟是动态的,文件配置器不好处理) |
|||
* @ClassName CustomConfigurator.java |
|||
* @createTime 2022年03月21日 11:37:00 |
|||
*/ |
|||
@Component |
|||
public class CustomConfigurator extends FileConfigurator { |
|||
private CustomConfigurator() throws IOException { |
|||
super(); |
|||
/** |
|||
* 由于配置器构建是static方法,在spring注入之前已经完成了 这里自定义配置器需要手动覆盖 |
|||
*/ |
|||
ConfiguratorFactory.cover(this); |
|||
} |
|||
|
|||
|
|||
@Autowired |
|||
private RedisUtil redisUtil; |
|||
|
|||
|
|||
/** |
|||
* 获取list |
|||
*/ |
|||
@Override |
|||
public List<String> getList(String key) { |
|||
Set<Object> set = redisUtil.getMembers(key); |
|||
return set.stream().map(v->(String)v).collect(Collectors.toList()); |
|||
} |
|||
|
|||
|
|||
@Override |
|||
public String getType() { |
|||
return "custom-redis"; |
|||
} |
|||
|
|||
} |
|||
//package com.inscloudtech.alog.clientdemo.custom; |
|||
// |
|||
//import com.inscloudtech.alog.core.ConfiguratorFactory; |
|||
//import com.inscloudtech.alog.core.FileConfigurator; |
|||
//import org.springframework.beans.factory.annotation.Autowired; |
|||
//import org.springframework.stereotype.Component; |
|||
// |
|||
//import java.io.IOException; |
|||
//import java.util.List; |
|||
//import java.util.Set; |
|||
//import java.util.stream.Collectors; |
|||
// |
|||
///** |
|||
// * @author tangbohu |
|||
// * @version 1.0.0 |
|||
// * @desc 基于redis实现自定义配置器示例,主要用于重写获取workers实现(毕竟是动态的,文件配置器不好处理) |
|||
// * @ClassName CustomConfigurator.java |
|||
// * @createTime 2022年03月21日 11:37:00 |
|||
// */ |
|||
//@Component |
|||
//public class CustomConfigurator extends FileConfigurator { |
|||
// private CustomConfigurator() throws IOException { |
|||
// super(); |
|||
// /** |
|||
// * 由于配置器构建是static方法,在spring注入之前已经完成了 这里自定义配置器需要手动覆盖 |
|||
// */ |
|||
// ConfiguratorFactory.cover(this); |
|||
// } |
|||
// |
|||
// |
|||
// @Autowired |
|||
// private RedisUtil redisUtil; |
|||
// |
|||
// |
|||
// /** |
|||
// * 获取list |
|||
// */ |
|||
// @Override |
|||
// public List<String> getList(String key) { |
|||
// Set<Object> set = redisUtil.getMembers(key); |
|||
// return set.stream().map(v->(String)v).collect(Collectors.toList()); |
|||
// } |
|||
// |
|||
// |
|||
// @Override |
|||
// public String getType() { |
|||
// return "custom-redis"; |
|||
// } |
|||
// |
|||
//} |
@ -1,130 +0,0 @@ |
|||
package com.inscloudtech.alog.clientdemo.demo.domain; |
|||
|
|||
import lombok.Data; |
|||
import lombok.NoArgsConstructor; |
|||
|
|||
import java.io.Serializable; |
|||
|
|||
/** |
|||
* 响应信息主体 |
|||
* |
|||
* @author Lion Li |
|||
*/ |
|||
|
|||
@NoArgsConstructor |
|||
public class R<T> implements Serializable { |
|||
private static final long serialVersionUID = 1L; |
|||
|
|||
/** |
|||
* 成功 |
|||
*/ |
|||
public static final int SUCCESS = 200; |
|||
|
|||
/** |
|||
* 失败 |
|||
*/ |
|||
public static final int FAIL = 500; |
|||
|
|||
private int code; |
|||
|
|||
private String msg; |
|||
|
|||
private T data; |
|||
|
|||
public int getCode() { |
|||
return code; |
|||
} |
|||
|
|||
public void setCode(int code) { |
|||
this.code = code; |
|||
} |
|||
|
|||
public String getMsg() { |
|||
return msg; |
|||
} |
|||
|
|||
public void setMsg(String msg) { |
|||
this.msg = msg; |
|||
} |
|||
|
|||
public T getData() { |
|||
return data; |
|||
} |
|||
|
|||
public void setData(T data) { |
|||
this.data = data; |
|||
} |
|||
|
|||
public static <T> R<T> ok() { |
|||
return restResult(null, SUCCESS, "操作成功"); |
|||
} |
|||
|
|||
public static <T> R<T> ok(T data) { |
|||
return restResult(data, SUCCESS, "操作成功"); |
|||
} |
|||
|
|||
public static <T> R<T> ok(String msg) { |
|||
return restResult(null, SUCCESS, msg); |
|||
} |
|||
|
|||
public static <T> R<T> ok(String msg, T data) { |
|||
return restResult(data, SUCCESS, msg); |
|||
} |
|||
|
|||
public static <T> R<T> fail() { |
|||
return restResult(null, FAIL, "操作失败"); |
|||
} |
|||
|
|||
public static <T> R<T> fail(String msg) { |
|||
return restResult(null, FAIL, msg); |
|||
} |
|||
|
|||
public static <T> R<T> fail(T data) { |
|||
return restResult(data, FAIL, "操作失败"); |
|||
} |
|||
|
|||
public static <T> R<T> fail(String msg, T data) { |
|||
return restResult(data, FAIL, msg); |
|||
} |
|||
|
|||
public static <T> R<T> fail(int code, String msg) { |
|||
return restResult(null, code, msg); |
|||
} |
|||
|
|||
/** |
|||
* 返回警告消息 |
|||
* |
|||
* @param msg 返回内容 |
|||
* @return 警告消息 |
|||
*/ |
|||
public static <T> R<T> warn(String msg) { |
|||
return restResult(null, 601, msg); |
|||
} |
|||
|
|||
/** |
|||
* 返回警告消息 |
|||
* |
|||
* @param msg 返回内容 |
|||
* @param data 数据对象 |
|||
* @return 警告消息 |
|||
*/ |
|||
public static <T> R<T> warn(String msg, T data) { |
|||
return restResult(data, 601, msg); |
|||
} |
|||
|
|||
private static <T> R<T> restResult(T data, int code, String msg) { |
|||
R<T> r = new R<>(); |
|||
r.setCode(code); |
|||
r.setData(data); |
|||
r.setMsg(msg); |
|||
return r; |
|||
} |
|||
|
|||
public static <T> Boolean isError(R<T> ret) { |
|||
return !isSuccess(ret); |
|||
} |
|||
|
|||
public static <T> Boolean isSuccess(R<T> ret) { |
|||
return R.SUCCESS == ret.getCode(); |
|||
} |
|||
} |
@ -0,0 +1,116 @@ |
|||
package com.inscloudtech.alog.worker.store; |
|||
|
|||
import com.inscloudtech.alog.common.utils.AsyncPool; |
|||
import com.inscloudtech.alog.common.utils.AsyncWorker; |
|||
import com.inscloudtech.alog.common.utils.CollectionUtil; |
|||
import com.inscloudtech.alog.worker.db.Db; |
|||
import org.slf4j.Logger; |
|||
import org.slf4j.LoggerFactory; |
|||
import org.springframework.beans.factory.annotation.Value; |
|||
import org.springframework.stereotype.Component; |
|||
|
|||
import javax.annotation.Resource; |
|||
import java.util.ArrayList; |
|||
import java.util.List; |
|||
import java.util.Map; |
|||
import java.util.Random; |
|||
import java.util.concurrent.LinkedBlockingQueue; |
|||
import java.util.concurrent.TimeUnit; |
|||
import java.util.concurrent.atomic.LongAdder; |
|||
|
|||
/** |
|||
* 解析好的数据暂存和入库 |
|||
* @author wuweifeng |
|||
* @version 1.0 |
|||
* @date 2021-08-21 |
|||
*/ |
|||
@Component |
|||
public class ActionLogToDbStore { |
|||
/** |
|||
* 待入库的数据 |
|||
*/ |
|||
private LinkedBlockingQueue<Map<String, Object>> modelQueue; |
|||
/** |
|||
* logger |
|||
*/ |
|||
private Logger logger = LoggerFactory.getLogger(getClass()); |
|||
/** |
|||
* db |
|||
*/ |
|||
@Resource |
|||
private Db db; |
|||
/** |
|||
* 已入库总数量 |
|||
*/ |
|||
private final LongAdder totalInsertCount = new LongAdder(); |
|||
/** |
|||
* 每批往ck写多少条 |
|||
*/ |
|||
@Value("${clickhouse.batchSize}") |
|||
private String batchSize; |
|||
/** |
|||
* 几个线程去入库 |
|||
*/ |
|||
@Value("${clickhouse.poolSize}") |
|||
private String poolSize; |
|||
/** |
|||
* 间隔几秒入库 |
|||
*/ |
|||
@Value("${clickhouse.insertInterval}") |
|||
private int interval; |
|||
/** |
|||
* 待入库队列长度 |
|||
*/ |
|||
@Value("${queue.preDbSize}") |
|||
private int preDbSize; |
|||
/** |
|||
* 写入队列 |
|||
*/ |
|||
public void offer(Map<String, Object> map) { |
|||
boolean success = modelQueue.offer(map); |
|||
//如果队列已满,则做其他处理 |
|||
if (!success) { |
|||
|
|||
} |
|||
} |
|||
|
|||
/** |
|||
* 入库 |
|||
*/ |
|||
public void beginIntoDb() { |
|||
//初始化队列长度 |
|||
modelQueue = new LinkedBlockingQueue<>(preDbSize); |
|||
int pool = Integer.parseInt(poolSize); |
|||
|
|||
for (int i = 0; i < pool; i++) { |
|||
AsyncPool.asyncDo(() -> { |
|||
try { |
|||
Thread.sleep(new Random().nextInt(8000)); |
|||
} catch (InterruptedException e) { |
|||
e.printStackTrace(); |
|||
} |
|||
while (true) { |
|||
try { |
|||
List<Map<String, Object>> tempModels = new ArrayList<>(); |
|||
//每1s入库一次 |
|||
AsyncWorker.drain(modelQueue, tempModels, Integer.parseInt(batchSize), interval, TimeUnit.SECONDS); |
|||
if (CollectionUtil.isEmpty(tempModels)) { |
|||
continue; |
|||
} |
|||
|
|||
//批量插入 |
|||
int successCount = db.insertAll("tracer_model", tempModels); |
|||
totalInsertCount.add(successCount); |
|||
logger.info("model成功入库 " + tempModels.size() + " 条, 累计已入库 " + totalInsertCount.longValue() + ", 待入库队列size " + modelQueue.size()); |
|||
|
|||
} catch (Exception e) { |
|||
e.printStackTrace(); |
|||
} |
|||
} |
|||
}); |
|||
} |
|||
|
|||
} |
|||
|
|||
|
|||
} |
Write
Preview
Loading…
Cancel
Save
Reference in new issue