Browse Source

数据入clickhouse

master
583641232@qq.com 8 months ago
parent
commit
c70648a2b9
  1. 12
      client/src/main/java/com/inscloudtech/alog/client/task/Monitor.java
  2. 37
      common/src/main/java/com/inscloudtech/alog/common/model/ActionLogMessage.java
  3. 7
      common/src/main/java/com/inscloudtech/alog/common/utils/IdWorker.java
  4. 5
      example/pom.xml
  5. 23
      example/src/main/java/com/inscloudtech/alog/clientdemo/aspectj/ActionLogAspect.java
  6. 23
      worker/pom.xml
  7. 5
      worker/src/main/java/com/inscloudtech/alog/worker/Starter.java
  8. 6
      worker/src/main/java/com/inscloudtech/alog/worker/disruptor/DisruptorStarter.java
  9. 33
      worker/src/main/java/com/inscloudtech/alog/worker/disruptor/TracerConsumer.java
  10. 119
      worker/src/main/java/com/inscloudtech/alog/worker/domain/ActionLog.java
  11. 12
      worker/src/main/java/com/inscloudtech/alog/worker/mapper/ActionLogMapper.java
  12. 10
      worker/src/main/java/com/inscloudtech/alog/worker/service/ActionLogService.java
  13. 13
      worker/src/main/java/com/inscloudtech/alog/worker/service/impl/ActionLogServiceImpl.java
  14. 35
      worker/src/main/java/com/inscloudtech/alog/worker/store/ActionLogToDbStore.java
  15. 6
      worker/src/main/resources/alog.sql
  16. 39
      worker/src/main/resources/application.yml

12
client/src/main/java/com/inscloudtech/alog/client/task/Monitor.java

@ -24,6 +24,7 @@ public class Monitor {
private final static Logger LOGGER = LoggerFactory.getLogger(Monitor.class);
private static long appId;
/**
* 开始获取workerIp地址并保存</>
* 监听workerIp地址变化
@ -32,6 +33,15 @@ public class Monitor {
fetchWorkerInfo();
}
private static void setAppIdId(final long appId) {
Monitor.appId = appId;
}
public static long getAppIdId() {
return Monitor.appId;
}
/**
* 每隔30秒拉取worker信息
*/
@ -66,7 +76,7 @@ public class Monitor {
//如果设置了机房属性则拉取同机房的worker如果同机房没worker则拉取所有
ALogConfig aLogConfig = configurator.getObject("aLog", ALogConfig.class);
List<String> addresses = aLogConfig.getWorkers();
IdWorker.setAppIdId(aLogConfig.getAppId());
setAppIdId(aLogConfig.getAppId());
//全是空给个警告
if (addresses == null || addresses.isEmpty()) {
LOGGER.warn("very important warn !!! workers ip info is null!!!");

37
common/src/main/java/com/inscloudtech/alog/common/model/ActionLogMessage.java

@ -1,7 +1,6 @@
package com.inscloudtech.alog.common.model;
import java.util.Date;
import java.util.Map;
/**
* classNameRunLogMessage
@ -14,12 +13,12 @@ public class ActionLogMessage {
/**
* 时间创建时间
*/
private long createTime;
private Date createTime;
/**
* 日志主键
* 应用id
*/
private long logId;
private long appId;
/**
* 操作模块
*/
@ -44,7 +43,7 @@ public class ActionLogMessage {
/**
* 操作人员Id
*/
private String operUserId;
private String operUid;
/**
* 操作人员
@ -163,12 +162,12 @@ public class ActionLogMessage {
this.operatorType = operatorType;
}
public String getOperUserId() {
return operUserId;
public String getOperUid() {
return operUid;
}
public void setOperUserId(String operUserId) {
this.operUserId = operUserId;
public void setoperUid(String operUid) {
this.operUid = operUid;
}
public String getDeptName() {
@ -267,23 +266,19 @@ public class ActionLogMessage {
this.businessId = businessId;
}
public long getLogId() {
return logId;
}
public void setLogId(long logId) {
this.logId = logId;
}
public long getCreateTime() {
public Date getCreateTime() {
return createTime;
}
public void setCreateTime(long createTime) {
public void setCreateTime(Date createTime) {
this.createTime = createTime;
}
public long getAppId() {
return appId;
}
public void setAppId(long appId) {
this.appId = appId;
}
}

7
common/src/main/java/com/inscloudtech/alog/common/utils/IdWorker.java

@ -30,7 +30,7 @@ public class IdWorker {
private static long workerId;
private static long appId;
static {
Calendar calendar = Calendar.getInstance();
@ -87,9 +87,6 @@ public class IdWorker {
}
}
public static void setAppIdId(final Long appId) {
IdWorker.appId = appId;
}
//下一个ID生成算法
public static long nextId() {
@ -117,7 +114,7 @@ public class IdWorker {
}
//下一个ID生成算法
public static long nextIdWithAppId() {
public static long nextIdWithAppId(long appId) {
long time = System.currentTimeMillis();
if (lastTime > time) {
throw new RuntimeException("Clock is moving backwards, last time is %d milliseconds, current time is %d milliseconds" + lastTime);

5
example/pom.xml

@ -30,6 +30,11 @@
</exclusions>
</dependency>
<!-- SpringBoot 拦截器 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>
<dependency>
<groupId>com.inscloudtech</groupId>

23
example/src/main/java/com/inscloudtech/alog/clientdemo/aspectj/ActionLogAspect.java

@ -2,14 +2,15 @@ package com.inscloudtech.alog.clientdemo.aspectj;
import com.baomidou.mybatisplus.annotation.TableId;
import com.inscloudtech.alog.client.task.Monitor;
import com.inscloudtech.alog.clientdemo.utils.*;
import com.inscloudtech.alog.common.enums.BusinessStatus;
import com.inscloudtech.alog.common.enums.BusinessType;
import com.inscloudtech.alog.common.model.ActionLogMessage;
import com.inscloudtech.alog.client.udp.UdpSender;
import com.inscloudtech.alog.common.annotation.ActionLog;
import com.inscloudtech.alog.common.model.ActionLogMessage;
import com.inscloudtech.alog.common.utils.FastJsonUtils;
import com.inscloudtech.alog.common.utils.IdWorker;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
@ -22,10 +23,7 @@ import org.springframework.web.multipart.MultipartFile;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.lang.reflect.*;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.stream.Collectors;
/**
@ -40,8 +38,8 @@ public class ActionLogAspect {
@Around("@annotation(actionLog)")
public Object doAround(ProceedingJoinPoint joinPoint, ActionLog actionLog) throws Throwable {
ActionLogMessage actionLogMessage = new ActionLogMessage();
Object[] args = joinPoint.getArgs();
Object[] args = joinPoint.getArgs();
//当操作行为是修改数据时需要上报修改前后数据值
if(actionLog.businessType().equals(BusinessType.UPDATE)){
setBeforeAfterValue(args,actionLog,actionLogMessage);
@ -56,7 +54,6 @@ public class ActionLogAspect {
throw e;
}
//当操作行为是新增数据时需要上报数据业务id
if(actionLog.businessType().equals(BusinessType.INSERT)){
setBusinessId(args,actionLog,actionLogMessage);
@ -67,17 +64,18 @@ public class ActionLogAspect {
// 计算响应时间
long responseTime = endTime - startTime;
actionLogMessage.setResponseTime(responseTime);
// 请求的地址
String ip = ServletUtils.getClientIP();
actionLogMessage.setOperIp(ip);
actionLogMessage.setOperUrl(StringUtils.sub(ServletUtils.getRequest().getRequestURI(), 0, 255));
//从业务系统获取传入
// LoginUser loginUser = LoginHelper.getLoginUser();//获取当前用户应用自行实现
// loginUser.getUserId();
// loginUser.getNickname();
// loginUser.getDeptName();
actionLogMessage.setOperUserId("2322434253");
actionLogMessage.setAppId(Monitor.getAppIdId());
actionLogMessage.setoperUid("2322434253");
actionLogMessage.setOperUserName("演示用户");
actionLogMessage.setDeptName("演示部门");
@ -216,9 +214,8 @@ public class ActionLogAspect {
actionLogMessage.setRequestMethod(ServletUtils.getRequest().getMethod());
// 处理设置注解上的参数
getControllerMethodDescription(joinPoint, actionLog, actionLogMessage, jsonResult);
long tracerId = IdWorker.nextIdWithAppId();
actionLogMessage.setLogId(tracerId);
actionLogMessage.setCreateTime(System.currentTimeMillis());
actionLogMessage.setCreateTime(new Date());
UdpSender.offerActionLogger(actionLogMessage);
} catch (Exception exp) {
// 记录本地异常日志

23
worker/pom.xml

@ -55,11 +55,24 @@
<optional>true</optional>
</dependency>
<!-- <dependency>-->
<!-- <groupId>com.baomidou</groupId>-->
<!-- <artifactId>mybatis-plus-boot-starter</artifactId>-->
<!-- <version>3.4.3</version>-->
<!-- </dependency>-->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.4.3</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.26</version>
</dependency>
<!-- hutool 的依赖配置-->
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.8.18</version>
</dependency>
</dependencies>
<build>

5
worker/src/main/java/com/inscloudtech/alog/worker/Starter.java

@ -1,6 +1,7 @@
package com.inscloudtech.alog.worker;
import com.inscloudtech.alog.worker.config.CenterStarter;
import com.inscloudtech.alog.worker.store.ActionLogToDbStore;
import com.inscloudtech.alog.worker.store.TracerLogToDbStore;
import com.inscloudtech.alog.worker.store.TracerModelToDbStore;
import com.inscloudtech.alog.worker.udp.UdpServer;
@ -33,6 +34,9 @@ public class Starter {
*/
@Resource
private TracerLogToDbStore tracerLogToDbStore;
@Resource
private ActionLogToDbStore actionLogToDbStore;
/**
* udp server
*/
@ -56,6 +60,7 @@ public class Starter {
//入库
tracerModelToDbStore.beginIntoDb();
tracerLogToDbStore.beginIntoDb();
actionLogToDbStore.beginIntoDb();
//上报自己ip到配置中心
centerStarter.uploadSelfInfo();

6
worker/src/main/java/com/inscloudtech/alog/worker/disruptor/DisruptorStarter.java

@ -1,5 +1,6 @@
package com.inscloudtech.alog.worker.disruptor;
import com.inscloudtech.alog.worker.store.ActionLogToDbStore;
import com.inscloudtech.alog.worker.store.TracerLogToDbStore;
import com.inscloudtech.alog.worker.store.TracerModelToDbStore;
import com.lmax.disruptor.BlockingWaitStrategy;
@ -38,6 +39,9 @@ public class DisruptorStarter {
*/
@Resource
private TracerLogToDbStore tracerLogToDbStore;
@Resource
private ActionLogToDbStore actionLogToDbStore;
/**
* 队列最大容量1024*16也就是RingBuffer大小必须是2的N次方
*/
@ -55,7 +59,7 @@ public class DisruptorStarter {
// 创建10个消费者来处理同一个生产者发的消息(这10个消费者不重复消费消息)
TracerConsumer[] consumers = new TracerConsumer[threadCount];
for (int i = 0; i < consumers.length; i++) {
consumers[i] = new TracerConsumer(tracerModelToDbStore, tracerLogToDbStore);
consumers[i] = new TracerConsumer(tracerModelToDbStore, tracerLogToDbStore,actionLogToDbStore);
}
disruptor.handleEventsWithWorkerPool(consumers);

33
worker/src/main/java/com/inscloudtech/alog/worker/disruptor/TracerConsumer.java

@ -1,13 +1,18 @@
package com.inscloudtech.alog.worker.disruptor;
import com.inscloudtech.alog.common.constant.Constant;
import com.inscloudtech.alog.common.constant.LogTypeEnum;
import com.inscloudtech.alog.common.model.ActionLogMessage;
import com.inscloudtech.alog.common.model.RunLogMessage;
import com.inscloudtech.alog.common.utils.FastJsonUtils;
import cn.hutool.core.bean.BeanUtil;
import com.inscloudtech.alog.common.utils.IdWorker;
import com.inscloudtech.alog.common.utils.ProtostuffUtils;
import com.inscloudtech.alog.common.utils.ZstdUtils;
import com.inscloudtech.alog.common.model.TracerBean;
import com.inscloudtech.alog.common.model.TracerData;
import com.inscloudtech.alog.worker.domain.ActionLog;
import com.inscloudtech.alog.worker.store.ActionLogToDbStore;
import com.inscloudtech.alog.worker.store.TracerLogToDbStore;
import com.inscloudtech.alog.worker.store.TracerModelToDbStore;
import com.lmax.disruptor.WorkHandler;
@ -21,7 +26,6 @@ import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.concurrent.atomic.LongAdder;
/**
* TracerConsumer
*
@ -47,12 +51,15 @@ public class TracerConsumer implements WorkHandler<OneTracer> {
*/
private TracerLogToDbStore tracerLogToDbStore;
private ActionLogToDbStore actionLogToDbStore;
private static final DateTimeFormatter DEFAULT_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
public TracerConsumer(TracerModelToDbStore tracerModelToDbStore, TracerLogToDbStore tracerLogToDbStore) {
public TracerConsumer(TracerModelToDbStore tracerModelToDbStore, TracerLogToDbStore tracerLogToDbStore,ActionLogToDbStore actionLogToDbStore) {
this.tracerModelToDbStore = tracerModelToDbStore;
this.tracerLogToDbStore = tracerLogToDbStore;
this.actionLogToDbStore = actionLogToDbStore;
}
@Override
@ -87,9 +94,23 @@ public class TracerConsumer implements WorkHandler<OneTracer> {
} else if (LogTypeEnum.SPAN.equals(tracerData.getType())){
dealFilterModel(tracerData.getTracerBeanList());
}else{
String s = FastJsonUtils.convertObjectToJSON(tracerData.getActionLogs());
// ACTION_LOG
logger.info("接收到行为日志{}",s);
dealActionLog(tracerData.getActionLogs());
}
}
/**
* 处理中途日志
*/
private void dealActionLog(List<ActionLogMessage> actionLogMessages) {
if (actionLogMessages == null) {
return;
}
for (ActionLogMessage runLogMessage : actionLogMessages) {
ActionLog actionLog = new ActionLog();
BeanUtil.copyProperties(runLogMessage,actionLog);
long tracerId = IdWorker.nextIdWithAppId(actionLog.getAppId());
actionLog.setLogId(tracerId);
actionLogToDbStore.offer(actionLog);
}
}

119
worker/src/main/java/com/inscloudtech/alog/worker/domain/ActionLog.java

@ -0,0 +1,119 @@
package com.inscloudtech.alog.worker.domain;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import java.util.Date;
/**
* 示例用户 stock
* @author inscloudtech
* @date 2023-07-27
*/
@Data
@TableName(autoResultMap = true)
public class ActionLog {
private static final long serialVersionUID=1L;
/**
* 时间创建时间
*/
private Date createTime;
/**
* 日志主键
*/
@TableId
private long logId;
private long appId;
/**
* 操作模块
*/
private String businessName;
/**
* 业务类型0其它 1新增 2修改 3删除
*/
private Integer businessType;
/**
* 请求方法
*/
private String method;
/**
* 请求方式
*/
private String requestMethod;
/**
* 操作人员0用户 1系统自动
*/
private Integer operatorType;
/**
* 操作人员Id
*/
private String operUid;
/**
* 操作人员
*/
private String operUserName;
/**
* 部门名称
*/
private String deptName;
/**
* 请求url
*/
private String operUrl;
/**
* 操作地址
*/
private String operIp;
/**
* 操作地点
*/
private String operLocation;
/**
* 请求参数
*/
private String operParam;
/**
* 返回参数
*/
private String jsonResult;
/**
* 操作状态0正常 1异常
*/
private Integer status;
/**
* 错误消息
*/
private String errorMsg;
/**
* 操作时间
*/
private Date operTime;
/**
* 更新前值
*/
private String beforeValue;
/**
* 更新新后
*/
private String afterValue;
/**
* 业务数据id
*/
private Long businessId;
/**
* 响应时间
*/
private long responseTime;
}

12
worker/src/main/java/com/inscloudtech/alog/worker/mapper/ActionLogMapper.java

@ -0,0 +1,12 @@
package com.inscloudtech.alog.worker.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.inscloudtech.alog.worker.domain.ActionLog;
import org.apache.ibatis.annotations.Mapper;
@Mapper
public interface ActionLogMapper extends BaseMapper<ActionLog> {
}

10
worker/src/main/java/com/inscloudtech/alog/worker/service/ActionLogService.java

@ -0,0 +1,10 @@
package com.inscloudtech.alog.worker.service;
import com.baomidou.mybatisplus.extension.service.IService;
import com.inscloudtech.alog.worker.domain.ActionLog;
public interface ActionLogService extends IService<ActionLog> {
}

13
worker/src/main/java/com/inscloudtech/alog/worker/service/impl/ActionLogServiceImpl.java

@ -0,0 +1,13 @@
package com.inscloudtech.alog.worker.service.impl;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.inscloudtech.alog.worker.domain.ActionLog;
import com.inscloudtech.alog.worker.mapper.ActionLogMapper;
import com.inscloudtech.alog.worker.service.ActionLogService;
import org.springframework.stereotype.Service;
@Service
public class ActionLogServiceImpl extends ServiceImpl<ActionLogMapper, ActionLog> implements ActionLogService {
}

35
worker/src/main/java/com/inscloudtech/alog/worker/store/ActionLogToDbStore.java

@ -4,8 +4,12 @@ import com.inscloudtech.alog.common.utils.AsyncPool;
import com.inscloudtech.alog.common.utils.AsyncWorker;
import com.inscloudtech.alog.common.utils.CollectionUtil;
import com.inscloudtech.alog.worker.db.Db;
import com.inscloudtech.alog.worker.domain.ActionLog;
import com.inscloudtech.alog.worker.service.ActionLogService;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
@ -25,20 +29,16 @@ import java.util.concurrent.atomic.LongAdder;
* @date 2021-08-21
*/
@Component
@Slf4j
public class ActionLogToDbStore {
/**
* 待入库的数据
*/
private LinkedBlockingQueue<Map<String, Object>> modelQueue;
/**
* logger
*/
private Logger logger = LoggerFactory.getLogger(getClass());
private LinkedBlockingQueue<ActionLog> modelQueue;
/**
* db
*/
@Resource
private Db db;
/**
* 已入库总数量
*/
@ -47,7 +47,7 @@ public class ActionLogToDbStore {
* 每批往ck写多少条
*/
@Value("${clickhouse.batchSize}")
private String batchSize;
private int batchSize;
/**
* 几个线程去入库
*/
@ -66,14 +66,17 @@ public class ActionLogToDbStore {
/**
* 写入队列
*/
public void offer(Map<String, Object> map) {
boolean success = modelQueue.offer(map);
public void offer(ActionLog actionLog) {
boolean success = modelQueue.offer(actionLog);
//如果队列已满则做其他处理
if (!success) {
}
}
@Autowired
ActionLogService actionLogService;
/**
* 入库
*/
@ -91,17 +94,19 @@ public class ActionLogToDbStore {
}
while (true) {
try {
List<Map<String, Object>> tempModels = new ArrayList<>();
List<ActionLog> tempModels = new ArrayList<>();
//每1s入库一次
AsyncWorker.drain(modelQueue, tempModels, Integer.parseInt(batchSize), interval, TimeUnit.SECONDS);
AsyncWorker.drain(modelQueue, tempModels, batchSize, interval, TimeUnit.SECONDS);
if (CollectionUtil.isEmpty(tempModels)) {
continue;
}
//批量插入
int successCount = db.insertAll("tracer_model", tempModels);
totalInsertCount.add(successCount);
logger.info("model成功入库 " + tempModels.size() + " 条, 累计已入库 " + totalInsertCount.longValue() + ", 待入库队列size " + modelQueue.size());
actionLogService.saveBatch(tempModels,batchSize);
totalInsertCount.add(tempModels.size());
log.info("model成功入库 " + tempModels.size() + " 条, 累计已入库 " + totalInsertCount.longValue() + ", 待入库队列size " + modelQueue.size());
} catch (Exception e) {
e.printStackTrace();

6
worker/src/main/resources/jlog.sql → worker/src/main/resources/alog.sql

@ -39,8 +39,10 @@ CREATE TABLE tracer_log (
CREATE TABLE action_log (
log_id Int64 COMMENT '日志主键',
app_id Int8 COMMENT '应用id',
title String COMMENT '分类名称',
method String COMMENT '方法名称',
business_name String COMMENT '操作模块',
business_type String COMMENT '业务类型',
request_method String COMMENT '请求方式',
operator_type Int8 COMMENT '操作人员(0用户 1系统自动)',
oper_uid String COMMENT '操作人员id',
@ -61,6 +63,6 @@ CREATE TABLE action_log (
)
ENGINE = MergeTree()
PARTITION BY toYYYYMMDD ( create_time )
ORDER BY ( app_id, create_time )
ORDER BY ( log_id, create_time )
PRIMARY key log_id;

39
worker/src/main/resources/application.yml

@ -9,12 +9,16 @@ queue:
server:
port: 8086
#spring:
# datasource:
# driverClassName: ru.yandex.clickhouse.ClickHouseDriver
# url: jdbc:clickhouse://192.168.3.20:8123/default
# username: default
# password:
spring:
datasource:
driverClassName: ru.yandex.clickhouse.ClickHouseDriver
url: jdbc:clickhouse://192.168.3.20:8123/default
username: default
password:
initialSize: 5
maxActive: 100
minIdle: 10
maxWait: 6000
#ck信息,自行修改
clickhouse:
@ -35,3 +39,26 @@ log:
# 此处仅供参考和测试
workers: "['192.268.1.2:8888','192.268.1.3:8888']"
mybatis-plus:
# 不支持多包, 如有需要可在注解配置 或 提升扫包等级
# 例如 com.**.**.mapper
mapperPackage: com.inscloudtech.**.mapper
# 对应的 XML 文件位置
mapperLocations: classpath*:mapper/**/*Mapper.xml
# 实体扫描,多个package用逗号或者分号分隔
typeAliasesPackage: com.inscloudtech.**.domain
# 启动时是否检查 MyBatis XML 文件的存在,默认不检查
checkConfigLocation: false
configuration:
# 自动驼峰命名规则(camel case)映射
mapUnderscoreToCamelCase: true
# MyBatis 自动映射策略
# NONE:不启用 PARTIAL:只对非嵌套 resultMap 自动映射 FULL:对所有 resultMap 自动映射
autoMappingBehavior: PARTIAL
# MyBatis 自动映射时未知列或未知属性处理策
# NONE:不做处理 WARNING:打印相关警告 FAILING:抛出异常和详细信息
autoMappingUnknownColumnBehavior: NONE
# 更详细的日志输出 会有性能损耗 org.apache.ibatis.logging.stdout.StdOutImpl
# 关闭日志记录 (可单纯使用 p6spy 分析) org.apache.ibatis.logging.nologging.NoLoggingImpl
# 默认日志输出 org.apache.ibatis.logging.slf4j.Slf4jImpl
logImpl: org.apache.ibatis.logging.nologging.NoLoggingImpl
Loading…
Cancel
Save