关联分析规则-数据降噪

This commit is contained in:
2026-03-18 18:00:25 +08:00
parent cf6b89ea94
commit c0063a5a44
64 changed files with 6642 additions and 2007 deletions
@@ -70,9 +70,9 @@ public class Alarm {
private Boolean baseFocused;
private Boolean isUpdated;
private int alarmSource;
private String[] httpReqHeaders;
private String[] httpReqBodys;
private String[] httpRespHeaders;
private String[] httpRespBodys;
private String[] httpReqHeader;
private String[] httpReqBody;
private String[] httpRespHeader;
private String[] httpRespBody;
}
@@ -71,8 +71,8 @@ public class AlarmVisit {
private Boolean baseFocused;
private Boolean isUpdated;
private int alarmSource;
private String[] httpReqHeaders;
private String[] httpReqBodys;
private String[] httpRespHeaders;
private String[] httpRespBodys;
private String[] httpReqHeader;
private String[] httpReqBody;
private String[] httpRespHeader;
private String[] httpRespBody;
}
@@ -0,0 +1,144 @@
package com.common.entity;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.time.LocalDateTime;
import java.util.UUID;
/**
* 分析规则实体类
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class AnalysisAnalysisRule {
/**
* 规则唯一标识(UUID)
*/
private String ruleId;
/**
* 创建用户账号
*/
private String createUser;
/**
* 规则名称(界面显示用)
*/
private String ruleName;
/**
* 分析方式:高级版/基础版
*/
private String analysisMethod;
/**
* 运行模式:realtime-实时流式/offline-离线批处理
*/
private String runMode;
/**
* 规则输出类型:risk_alarm-风险告警/statistics-统计报表
*/
private String ruleOutput;
/**
* 所属组织机构编码
*/
private String organization;
/**
* 任务状态:running-正在执行/stopped-已停止/waiting-等待中
*/
private String taskStatus;
/**
* 创建部门ID
*/
private Long createDept;
/**
* 删除标志:0-未删除 1-已删除
*/
private String delFlag;
/**
* 记录创建时间(带时区)
*/
private LocalDateTime createTime;
/**
* 记录更新时间(带时区)
*/
private LocalDateTime updateTime;
/**
* 创建人ID
*/
private Long createBy;
/**
* 更新人ID
*/
private Long updateBy;
/**
* 规则备注说明文本
*/
private String remark;
/**
* 多租户隔离ID
*/
private String tenantId;
/**
* 规则详细描述
*/
private String ruleDesc;
/**
* 规则分类编码(预留扩展用)
*/
private Integer ruleType;
/**
* 规则启用状态(0-禁用/1-启用等)
*/
private Integer ruleStatus;
/**
* 规则配置内容(JSON或结构化文本)
*/
private String ruleContent;
/**
* 规则计算表达式(如SQL片段或自定义脚本)
*/
private String ruleExpression;
/**
* 规则执行优先级(数值越大优先级越高)
*/
private Long priority;
/**
* 规则标签,多个标签用逗号分隔
*/
private String tags;
/**
* 规则版本号,用于版本管理
*/
private Integer version;
/**
* 分组ID
*/
private Integer subsetId;
}
@@ -0,0 +1,144 @@
package com.common.entity;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.time.LocalDateTime;
/**
* 分析字段配置实体类
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class AnalysisField {
/**
* 主键ID,自增序列
*/
private Integer id;
/**
* 关联的规则配置UUID
*/
private String ruleId;
/**
* 字段类型(如维度/指标/计算字段等)
*/
private String type;
/**
* 数据源名称,如数据库连接名或数据流标识
*/
private String dataSource;
/**
* 数据库名称
*/
private String database;
/**
* 表名
*/
private String tableName;
/**
* 表别名,用于SQL语句中的表引用
*/
private String tableAlias;
/**
* 字段名
*/
private String columnName;
/**
* 字段描述
*/
private String columnDesc;
/**
* 字段数据类型(如int/varchar/decimal等)
*/
private String dataType;
/**
* 计算函数名(如SUM/COUNT/自定义函数,NULL表示原始字段)
*/
private String fn;
/**
* 函数参数列表,JSON数组格式(即使无参数也保留空数组)
*/
private String arguments;
/**
* 字段显示占位符(用于前端展示的默认文本)
*/
private String placeholder;
/**
* 基础类型编码(预留分类扩展用)
*/
private Integer baseType;
/**
* 字段分类ID
*/
private Integer categoryId;
/**
* 创建部门ID
*/
private Long createDept;
/**
* 删除标志(0-未删除/1-已删除)
*/
private String delFlag;
/**
* 记录创建时间
*/
private LocalDateTime createTime;
/**
* 记录更新时间
*/
private LocalDateTime updateTime;
/**
* 创建人ID
*/
private Long createBy;
/**
* 更新人ID
*/
private Long updateBy;
/**
* 备注说明
*/
private String remark;
/**
* 租户ID,默认000000表示系统级配置
*/
private String tenantId;
/**
* 告警字段名(别名)
*/
private String alarmColumnName;
/**
* 告警字段描述
*/
private String alarmColumnDesc;
}
@@ -0,0 +1,147 @@
package com.common.entity;
import com.baomidou.mybatisplus.annotation.*;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.time.LocalDateTime;
/**
* 分析过滤条件实体类
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@TableName("analysis_filter")
public class AnalysisFilter {
/**
* 主键ID
*/
@TableId(value = "id", type = IdType.AUTO)
private Integer id;
/**
* 规则ID
*/
private String ruleId;
/**
* 数据源
*/
private String dataSource;
/**
* 数据库名
*/
private String database;
/**
* 表名
*/
private String tableName;
/**
* 表别名
*/
private String tableAlias;
/**
* 列名
*/
private String columnName;
/**
* 列描述
*/
private String columnDesc;
/**
* 数据类型
*/
private String dataType;
/**
* 函数名
*/
private String fn;
/**
* 函数参数
*/
private Object arguments;
/**
* 操作符
*/
private String operator;
/**
* 筛选值
*/
private Object value;
/**
* 基础类型
*/
private Integer baseType;
/**
* 分类ID
*/
private Integer categoryId;
/**
* 创建部门ID
*/
private Long createDept;
/**
* 删除标志:0-正常 1-删除
*/
@TableLogic
private String delFlag;
/**
* 创建时间
*/
private LocalDateTime createTime;
/**
* 更新时间
*/
private LocalDateTime updateTime;
/**
* 创建人ID
*/
private Long createBy;
/**
* 更新人ID
*/
private Long updateBy;
/**
* 备注信息
*/
private String remark;
/**
* 租户ID
*/
private String tenantId;
/**
* 关联条件逻辑表ID
*/
private Integer condId;
/**
* 表达式执行顺序号
*/
private Integer seqNum;
}
@@ -0,0 +1,81 @@
package com.common.entity;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* 分组规则配置实体类
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class AnalysisGroupBy {
/**
* 主键ID
*/
private Long id;
/**
* 规则ID
*/
private String ruleId;
/**
* 分组类型:1-标准分组 2-窗口分组
*/
private Integer groupType;
/**
* 窗口类型:tumble/hop/session
*/
private String windowType;
/**
* 窗口配置(关联AnalysisGroupByWindow
*/
private AnalysisGroupByWindow window;
/**
* 创建部门
*/
private Long createDept;
/**
* 删除标志:0-未删除,1-已删除
*/
private String delFlag;
/**
* 创建时间
*/
private String createTime;
/**
* 更新时间
*/
private String updateTime;
/**
* 创建人
*/
private Long createBy;
/**
* 更新人
*/
private Long updateBy;
/**
* 备注
*/
private String remark;
/**
* 租户ID
*/
private String tenantId;
}
@@ -0,0 +1,133 @@
package com.common.entity;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.time.LocalDateTime;
/**
* 分析分组字段配置实体类
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class AnalysisGroupByColumn {
/**
* 主键ID:自增唯一标识符
*/
private Integer id;
/**
* 分组ID:关联分析分组的外键
*/
private Integer groupById;
/**
* 数据源名称:数据来源系统标识
*/
private String dataSource;
/**
* 数据库名称:字段所在的数据库
*/
private String database;
/**
* 表名:字段所在的物理表名
*/
private String tableName;
/**
* 表别名:查询时使用的表别名
*/
private String tableAlias;
/**
* 字段名:物理字段名称
*/
private String columnName;
/**
* 字段描述:业务层面的字段说明
*/
private String columnDesc;
/**
* 数据类型:字段的数据类型(如varchar、int4
*/
private String dataType;
/**
* 基础类型:字段的业务基础类型编码
*/
private Integer baseType;
/**
* 分类ID:字段所属的业务分类ID
*/
private Integer categoryId;
/**
* 创建部门:创建记录的部门ID
*/
private Long createDept;
/**
* 删除标志:0-未删除,1-已删除
*/
private String delFlag;
/**
* 创建时间:记录创建时间戳
*/
private LocalDateTime createTime;
/**
* 更新时间:记录最后更新时间戳
*/
private LocalDateTime updateTime;
/**
* 创建人:记录创建者ID
*/
private Long createBy;
/**
* 更新人:记录最后更新者ID
*/
private Long updateBy;
/**
* 备注:扩展说明信息
*/
private String remark;
/**
* 租户ID:多租户环境下的租户标识
*/
private String tenantId;
/**
* 规则ID:关联的业务规则UUID
*/
private String ruleId;
/**
* 用户组ID:关联的权限用户组ID
*/
private Long groupId;
/**
* 字段唯一ID:跨表唯一的字段标识符
*/
private Long fieldId;
/**
* 排序号:字段展示顺序
*/
private Integer sort;
}
@@ -0,0 +1,138 @@
package com.common.entity;
import com.baomidou.mybatisplus.annotation.*;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.time.LocalDateTime;
/**
* 分析分组聚合条件配置实体类
* 存储GROUP BY后的HAVING筛选条件定义
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@TableName("analysis_group_by_having")
public class AnalysisGroupByHaving {
/**
* 主键ID
*/
@TableId(value = "id", type = IdType.AUTO)
private Integer id;
/**
* 分组ID
*/
private Integer groupById;
/**
* 数据源名称
*/
private String dataSource;
/**
* 数据库名称
*/
private String database;
/**
* 表名
*/
private String tableName;
/**
* 表别名
*/
private String tableAlias;
/**
* 字段名
*/
private String columnName;
/**
* 字段描述
*/
private String columnDesc;
/**
* 字段数据类型
*/
private String dataType;
/**
* 聚合函数名(如SUM/COUNT/AVG等)
*/
private String fn;
/**
* 聚合函数参数列表,JSON数组格式
*/
private Object arguments;
/**
* 比较运算符(如>、<、=、>=、<=、IN、LIKE等)
*/
private String operator;
/**
* 比较值,JSON格式存储
*/
private Object value;
/**
* 基础类型标识
*/
private Integer baseType;
/**
* 分类ID,用于条件分组归类
*/
private Integer categoryId;
/**
* 创建部门ID
*/
private Long createDept;
/**
* 删除标志:0-未删除 1-已删除
*/
@TableLogic
private String delFlag;
/**
* 记录创建时间
*/
private LocalDateTime createTime;
/**
* 记录更新时间
*/
private LocalDateTime updateTime;
/**
* 创建人ID
*/
private Long createBy;
/**
* 更新人ID
*/
private Long updateBy;
/**
* 备注说明
*/
private String remark;
/**
* 租户ID
*/
private String tenantId;
}
@@ -0,0 +1,139 @@
package com.common.entity;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* 分析时间窗口配置实体类
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class AnalysisGroupByWindow {
/**
* 主键ID
*/
private Integer id;
/**
* 分组ID
*/
private Integer groupById;
/**
* 窗口类型:tumble/hop/session
*/
private String windowType;
// ============ 滚动窗口配置 ============
/**
* 滚动窗口时间类型:second/minute/hour
*/
private String tumbleWindowTimeType;
/**
* 滚动窗口大小
*/
private Integer tumbleWindowSize;
/**
* 滚动窗口单位:s/m/h/d
*/
private String tumbleWindowSizeUnit;
// ============ 滑动窗口配置 ============
/**
* 滑动窗口时间类型
*/
private String hopWindowTimeType;
/**
* 滑动窗口大小
*/
private Integer hopWindowSize;
/**
* 滑动窗口单位:s/m/h/d
*/
private String hopWindowSizeUnit;
/**
* 滑动窗口步长
*/
private Integer hopWindowSlide;
/**
* 滑动窗口步长单位:s/m/h/d
*/
private String hopWindowSlideUnit;
/**
* 滑动窗口告警频率:是否每个窗口仅告警一次
*/
private Boolean hopWindowAlarmOncePerWindow;
// ============ 会话窗口配置 ============
/**
* 会话窗口时间类型
*/
private String sessionWindowTimeType;
/**
* 会话窗口超时时间
*/
private Integer sessionWindowSize;
/**
* 会话窗口单位:s/m/h/d
*/
private String sessionWindowSizeUnit;
// ============ 公共字段 ============
/**
* 创建部门
*/
private Long createDept;
/**
* 删除标志:0-未删除,1-已删除
*/
private String delFlag;
/**
* 创建时间
*/
private String createTime;
/**
* 更新时间
*/
private String updateTime;
/**
* 创建人
*/
private Long createBy;
/**
* 更新人
*/
private Long updateBy;
/**
* 备注
*/
private String remark;
/**
* 租户ID
*/
private String tenantId;
}
@@ -0,0 +1,103 @@
package com.common.entity;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.UUID;
import java.time.LocalDateTime;
/**
* 分析规则任务运行历史及状态记录
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class AnalysisTaskHistory {
/**
* 记录ID
*/
private Long id;
/**
* 规则ID
*/
private String ruleId;
/**
* 开始时间
*/
private LocalDateTime startTime;
/**
* 结束时间
*/
private LocalDateTime endTime;
/**
* 持续时间(秒)
*/
private Long durationTime;
/**
* 进度百分比
*/
private Integer progressPercent;
/**
* 输入数据量
*/
private Long inputCount;
/**
* 输出数据量
*/
private Long outputCount;
/**
* 状态(RUNNING/CANCELED/KILLED/COMPLETED/FAILED)
*/
private String status;
/**
* 创建部门ID
*/
private Long createDept;
/**
* 删除标志(0-正常,1-删除)
*/
private String delFlag;
/**
* 创建时间
*/
private LocalDateTime createTime;
/**
* 更新时间
*/
private LocalDateTime updateTime;
/**
* 创建人ID
*/
private Long createBy;
/**
* 更新人ID
*/
private Long updateBy;
/**
* 备注信息
*/
private String remark;
/**
* 租户ID
*/
private String tenantId;
}
@@ -0,0 +1,66 @@
package com.common.entity;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* 用户自定义函数实体类
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class AnalysisUserDefinedFunction {
/**
* 函数ID
*/
private String functionId;
/**
* 规则ID
*/
private String ruleId;
/**
* 函数名称
*/
private String functionName;
/**
* 函数类型:AGGREGATE-聚合函数/SCALAR-标量函数/WINDOW-窗口函数
*/
private String functionType;
/**
* 函数描述
*/
private String functionDesc;
/**
* 函数实现SQL
*/
private String functionSql;
/**
* 返回类型
*/
private String returnType;
/**
* 参数定义(JSON格式)
*/
private String params;
/**
* 启用状态
*/
private Boolean enabled;
/**
* 删除标志:0-未删除 1-已删除
*/
private String delFlag;
}
@@ -0,0 +1,83 @@
package com.common.entity;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.time.LocalDateTime;
/**
* WHERE条件逻辑表实体类(仅存储条件逻辑组信息)
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class AnalysisWhereCondition {
/**
* 条件ID
*/
private Integer condId;
/**
* 规则ID
*/
private String ruleId;
/**
* 逻辑运算符:AND, OR
*/
private String logicalOp;
/**
* 序号
*/
private Integer seqNum;
/**
* 父节点ID
*/
private Integer parentCondId;
/**
* 创建部门ID
*/
private Long createDept;
/**
* 删除标志:0-未删除 1-已删除
*/
private String delFlag;
/**
* 创建时间
*/
private LocalDateTime createTime;
/**
* 更新时间
*/
private LocalDateTime updateTime;
/**
* 创建人ID
*/
private Long createBy;
/**
* 更新人ID
*/
private Long updateBy;
/**
* 备注信息
*/
private String remark;
/**
* 租户ID
*/
private String tenantId;
}
@@ -29,6 +29,7 @@ public class GroupedSyslogData {
private String[] httpReqBodys;
private String[] httpRespHeaders;
private String[] httpRespBodys;
private String dnsInfo;
private String victimIpsStr;
private Integer[] allAttackResults; // 所有不同的attack_result值
private Integer mostCommonAttackResult; // 最常见的attack_result值
@@ -15,10 +15,10 @@ public interface AlarmMapper {
"INSERT INTO alarm (",
"id, created_at, alarm_name, alarm_level, alarm_type, ",
"alarm_major_type, alarm_minor_type,alarm_area_id, attack_ip, victim_ip, victim_web_url, ",
"device_id, comment,origin_log_ids,log_start_at, log_end_at, http_status, ",
"device_id, comment,origin_log_ids,log_start_at, log_end_at, window_time, http_status, ",
"attack_port, victim_port, attack_method, etl_time, log_count, ",
"attack_chain_phase, disposition_advice, attack_direction, ",
"judged_state, disposed_state, attack_result, fall, payload, engine_type, " ,
"judged_state, disposed_state, attack_result, fall, payload, dns_info, engine_type, " ,
"http_req_header , http_req_body,http_resp_header , http_resp_body ",
") VALUES ",
"<foreach collection='list' item='item' separator=','>",
@@ -30,18 +30,18 @@ public interface AlarmMapper {
"#{item.deviceId, typeHandler=com.Modules.etl.handler.ArrayIntegerTypeHandler}, ",
"#{item.comment}, " ,
"#{item.originLogIds, typeHandler=com.Modules.etl.handler.ArrayStringTypeHandler}, ",
"#{item.logStartAt}, #{item.logEndAt}, #{item.httpStatus}, ",
"#{item.logStartAt}, #{item.logEndAt}, #{item.windowTime}, #{item.httpStatus}, ",
"#{item.attackPort, typeHandler=com.Modules.etl.handler.ArrayIntegerTypeHandler}, ",
"#{item.victimPort, typeHandler=com.Modules.etl.handler.ArrayIntegerTypeHandler}, ",
"#{item.attackMethod}, #{item.etlTime}, #{item.logCount}, ",
"#{item.attackChainPhase, typeHandler=com.Modules.etl.handler.ArrayIntegerTypeHandler}, ",
"#{item.dispositionAdvice}, #{item.attackDirection}, ",
"#{item.judgedState}, #{item.disposedState}, #{item.attackResult}, #{item.fall}, ",
"#{item.payload}, #{item.engineType}, ",
"#{item.httpReqHeaders, typeHandler=com.Modules.etl.handler.ArrayStringTypeHandler}, ",
"#{item.httpReqBodys, typeHandler=com.Modules.etl.handler.ArrayStringTypeHandler}, ",
"#{item.httpRespHeaders, typeHandler=com.Modules.etl.handler.ArrayStringTypeHandler}, ",
"#{item.httpRespBodys, typeHandler=com.Modules.etl.handler.ArrayStringTypeHandler})",
"#{item.payload},#{item.dnsInfo}, #{item.engineType}, ",
"#{item.httpReqHeader, typeHandler=com.Modules.etl.handler.ArrayStringTypeHandler}, ",
"#{item.httpReqBody, typeHandler=com.Modules.etl.handler.ArrayStringTypeHandler}, ",
"#{item.httpRespHeader, typeHandler=com.Modules.etl.handler.ArrayStringTypeHandler}, ",
"#{item.httpRespBody, typeHandler=com.Modules.etl.handler.ArrayStringTypeHandler})",
"</foreach>",
"</script>"})
void batchInsert(@Param("list") List<Alarm> alarmList);
@@ -52,10 +52,10 @@ public interface AlarmMapper {
@Insert("INSERT INTO alarm (" +
"id, created_at, alarm_name, alarm_level, alarm_type, " +
"alarm_major_type, alarm_minor_type,alarm_area_id, attack_ip, victim_ip, victim_web_url, " +
"device_id, comment,origin_log_ids, log_start_at, log_end_at, http_status, " +
"device_id, comment,origin_log_ids, log_start_at, log_end_at, window_time, http_status, " +
"attack_port, victim_port, attack_method, etl_time, log_count, " +
"attack_chain_phase, disposition_advice, attack_direction, " +
"judged_state, disposed_state, attack_result, fall, payload, engine_type, " +
"judged_state, disposed_state, attack_result, fall, payload, dns_info, engine_type, " +
"http_req_header , http_req_body,http_resp_header , http_resp_body " +
") VALUES (" +
"#{id}, #{createdAt}, #{alarmName}, #{alarmLevel}, " +
@@ -66,17 +66,17 @@ public interface AlarmMapper {
"#{deviceId, typeHandler=com.Modules.etl.handler.ArrayIntegerTypeHandler}, " +
"#{comment}, " +
"#{originLogIds, typeHandler=com.Modules.etl.handler.ArrayStringTypeHandler}, " +
"#{logStartAt}, #{logEndAt}, #{httpStatus}, " +
"#{logStartAt}, #{logEndAt}, #{windowTime},#{httpStatus}, " +
"#{attackPort, typeHandler=com.Modules.etl.handler.ArrayIntegerTypeHandler}, " +
"#{victimPort, typeHandler=com.Modules.etl.handler.ArrayIntegerTypeHandler}, " +
"#{attackMethod}, #{etlTime}, #{logCount}, " +
"#{attackChainPhase, typeHandler=com.Modules.etl.handler.ArrayIntegerTypeHandler}, " +
"#{dispositionAdvice}, #{attackDirection}, " +
"#{judgedState}, #{disposedState}, #{attackResult}, #{fall}, #{payload}, #{engineType}, " +
"#{httpReqHeaders, typeHandler=com.Modules.etl.handler.ArrayStringTypeHandler}, " +
"#{httpReqBodys, typeHandler=com.Modules.etl.handler.ArrayStringTypeHandler}, " +
"#{httpRespHeaders, typeHandler=com.Modules.etl.handler.ArrayStringTypeHandler}, " +
"#{httpRespBodys, typeHandler=com.Modules.etl.handler.ArrayStringTypeHandler} " +
"#{judgedState}, #{disposedState}, #{attackResult}, #{fall}, #{payload}, #{dnsInfo},#{engineType}, " +
"#{httpReqHeader, typeHandler=com.Modules.etl.handler.ArrayStringTypeHandler}, " +
"#{httpReqBody, typeHandler=com.Modules.etl.handler.ArrayStringTypeHandler}, " +
"#{httpRespHeader, typeHandler=com.Modules.etl.handler.ArrayStringTypeHandler}, " +
"#{httpRespBody, typeHandler=com.Modules.etl.handler.ArrayStringTypeHandler} " +
")")
void insert(Alarm alarm);
}
@@ -19,10 +19,10 @@ public interface AlarmVisitMapper {
"INSERT INTO alarm_visit (",
"id, created_at, alarm_name, alarm_level, alarm_type, ",
"alarm_major_type, alarm_minor_type,alarm_area_id, attack_ip, victim_ip, victim_web_url, ",
"device_id, comment,origin_log_ids,log_start_at, log_end_at, http_status, ",
"device_id, comment,origin_log_ids,log_start_at, log_end_at,window_time, http_status, ",
"attack_port, victim_port, attack_method, etl_time, log_count, ",
"attack_chain_phase, disposition_advice, attack_direction, ",
"judged_state, disposed_state, attack_result, fall, payload, " ,
"judged_state, disposed_state, attack_result, fall, payload, dns_info, engine_type, " ,
"http_req_header , http_req_body,http_resp_header , http_resp_body ",
") VALUES ",
"<foreach collection='list' item='item' separator=','>",
@@ -34,18 +34,18 @@ public interface AlarmVisitMapper {
"#{item.deviceId, typeHandler=com.Modules.etl.handler.ArrayIntegerTypeHandler}, ",
"#{item.comment}, " ,
"#{item.originLogIds, typeHandler=com.Modules.etl.handler.ArrayStringTypeHandler}, ",
"#{item.logStartAt}, #{item.logEndAt}, #{item.httpStatus}, ",
"#{item.logStartAt}, #{item.logEndAt},, #{item.windowTime} #{item.httpStatus}, ",
"#{item.attackPort, typeHandler=com.Modules.etl.handler.ArrayIntegerTypeHandler}, ",
"#{item.victimPort, typeHandler=com.Modules.etl.handler.ArrayIntegerTypeHandler}, ",
"#{item.attackMethod}, #{item.etlTime}, #{item.logCount}, ",
"#{item.attackChainPhase, typeHandler=com.Modules.etl.handler.ArrayIntegerTypeHandler}, ",
"#{item.dispositionAdvice}, #{item.attackDirection}, ",
"#{item.judgedState}, #{item.disposedState}, #{item.attackResult}, #{item.fall}, ",
"#{item.payload}, ",
"#{item.httpReqHeaders, typeHandler=com.Modules.etl.handler.ArrayStringTypeHandler}, ",
"#{item.httpReqBodys, typeHandler=com.Modules.etl.handler.ArrayStringTypeHandler}, ",
"#{item.httpRespHeaders, typeHandler=com.Modules.etl.handler.ArrayStringTypeHandler}, ",
"#{item.httpRespBodys, typeHandler=com.Modules.etl.handler.ArrayStringTypeHandler}) ",
"#{item.payload},#{item.dnsInfo}, #{item.engineType}, ",
"#{item.httpReqHeader, typeHandler=com.Modules.etl.handler.ArrayStringTypeHandler}, ",
"#{item.httpReqBody, typeHandler=com.Modules.etl.handler.ArrayStringTypeHandler}, ",
"#{item.httpRespHeader, typeHandler=com.Modules.etl.handler.ArrayStringTypeHandler}, ",
"#{item.httpRespBody, typeHandler=com.Modules.etl.handler.ArrayStringTypeHandler}) ",
"</foreach>",
"</script>"})
void batchInsert(@Param("list") List<AlarmVisit> alarmList);
@@ -56,10 +56,10 @@ public interface AlarmVisitMapper {
@Insert("INSERT INTO alarm_visit (" +
"id, created_at, alarm_name, alarm_level, alarm_type, " +
"alarm_major_type, alarm_minor_type,alarm_area_id, attack_ip, victim_ip, victim_web_url, " +
"device_id, comment,origin_log_ids, log_start_at, log_end_at, http_status, " +
"device_id, comment,origin_log_ids, log_start_at, log_end_at, window_time,http_status, " +
"attack_port, victim_port, attack_method, etl_time, log_count, " +
"attack_chain_phase, disposition_advice, attack_direction, " +
"judged_state, disposed_state, attack_result, fall, payload, " +
"judged_state, disposed_state, attack_result, fall, payload, dns_info,engine_type, " +
"http_req_header , http_req_body,http_resp_header , http_resp_body " +
") VALUES (" +
"#{id}, #{createdAt}, #{alarmName}, #{alarmLevel}, " +
@@ -70,17 +70,17 @@ public interface AlarmVisitMapper {
"#{deviceId, typeHandler=com.Modules.etl.handler.ArrayIntegerTypeHandler}, " +
"#{comment}, " +
"#{originLogIds, typeHandler=com.Modules.etl.handler.ArrayStringTypeHandler}, " +
"#{logStartAt}, #{logEndAt}, #{httpStatus}, " +
"#{logStartAt}, #{logEndAt}, #{windowTime}, #{httpStatus}, " +
"#{attackPort, typeHandler=com.Modules.etl.handler.ArrayIntegerTypeHandler}, " +
"#{victimPort, typeHandler=com.Modules.etl.handler.ArrayIntegerTypeHandler}, " +
"#{attackMethod}, #{etlTime}, #{logCount}, " +
"#{attackChainPhase, typeHandler=com.Modules.etl.handler.ArrayIntegerTypeHandler}, " +
"#{dispositionAdvice}, #{attackDirection}, " +
"#{judgedState}, #{disposedState}, #{attackResult}, #{fall}, #{payload}, " +
"#{httpReqHeaders, typeHandler=com.Modules.etl.handler.ArrayStringTypeHandler}, " +
"#{httpReqBodys, typeHandler=com.Modules.etl.handler.ArrayStringTypeHandler}, " +
"#{httpRespHeaders, typeHandler=com.Modules.etl.handler.ArrayStringTypeHandler}, " +
"#{httpRespBodys, typeHandler=com.Modules.etl.handler.ArrayStringTypeHandler} " +
"#{judgedState}, #{disposedState}, #{attackResult}, #{fall}, #{payload},#{dnsInfo} ,#{engineType}, " +
"#{httpReqHeader, typeHandler=com.Modules.etl.handler.ArrayStringTypeHandler}, " +
"#{httpReqBody, typeHandler=com.Modules.etl.handler.ArrayStringTypeHandler}, " +
"#{httpRespHeader, typeHandler=com.Modules.etl.handler.ArrayStringTypeHandler}, " +
"#{httpRespBody, typeHandler=com.Modules.etl.handler.ArrayStringTypeHandler} " +
")")
void insert(AlarmVisit alarm);
}
@@ -0,0 +1,50 @@
package com.common.mapper;
import com.common.entity.AnalysisAnalysisRule;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import java.util.List;
import java.util.UUID;
/**
* 分析规则Mapper
*/
@Mapper
public interface AnalysisAnalysisRuleMapper {
/**
* 查询指定运行模式的活动规则
*
* @param runMode 运行模式:realtime-实时/offline-离线
* @return 规则列表
*/
List<AnalysisAnalysisRule> selectActiveRulesByRunMode(@Param("runMode") String runMode);
/**
* 根据规则ID查询规则
*
* @param ruleId 规则ID
* @return 规则信息
*/
AnalysisAnalysisRule selectByRuleId(@Param("ruleId") String ruleId);
/**
* 更新规则任务状态
*
* @param ruleId 规则ID
* @param taskStatus 任务状态
* @param updateBy 更新人ID
* @return 影响行数
*/
int updateTaskStatus(@Param("ruleId") String ruleId,
@Param("taskStatus") String taskStatus,
@Param("updateBy") Long updateBy);
/**
* 根据ID查询规则
*
* @param ruleId 规则ID
* @return 规则信息
*/
AnalysisAnalysisRule selectById(@Param("ruleId") String ruleId);
}
@@ -0,0 +1,38 @@
package com.common.mapper;
import com.common.entity.AnalysisField;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import java.util.List;
import java.util.UUID;
/**
* 分析字段配置Mapper
*/
@Mapper
public interface AnalysisFieldMapper {
/**
* 根据规则ID查询字段配置
*
* @param ruleId 规则ID
* @return 字段列表
*/
List<AnalysisField> selectByRuleId(@Param("ruleId") String ruleId);
/**
* 查询规则中用于SELECT的字段
*
* @param ruleId 规则ID
* @return 字段列表
*/
List<AnalysisField> selectSelectFieldsByRuleId(@Param("ruleId") String ruleId);
/**
* 查询规则中用于GROUP BY的字段
*
* @param ruleId 规则ID
* @return 字段列表
*/
List<AnalysisField> selectGroupByFieldsByRuleId(@Param("ruleId") String ruleId);
}
@@ -0,0 +1,38 @@
package com.common.mapper;
import com.common.entity.AnalysisFilter;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import java.util.List;
/**
* 分析过滤条件Mapper
*/
@Mapper
public interface AnalysisFilterMapper {
/**
* 根据规则ID查询过滤条件
*
* @param ruleId 规则ID
* @return 过滤条件列表
*/
List<AnalysisFilter> selectByRuleId(@Param("ruleId") String ruleId);
/**
* 根据字段ID查询过滤条件
*
* @param fieldId 字段ID
* @return 过滤条件列表
*/
List<AnalysisFilter> selectByFieldId(@Param("fieldId") Long fieldId);
/**
* 根据条件ID查询过滤条件(与where_condition表关联)
*
* @param condId 条件ID
* @return 过滤条件列表
*/
List<AnalysisFilter> selectByCondId(@Param("condId") Integer condId);
}
@@ -0,0 +1,38 @@
package com.common.mapper;
import com.common.entity.AnalysisGroupByColumn;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import java.util.List;
/**
* 分组字段配置Mapper
*/
@Mapper
public interface AnalysisGroupByColumnMapper {
/**
* 根据规则ID查询分组字段配置
*
* @param ruleId 规则ID
* @return 分组字段列表
*/
List<AnalysisGroupByColumn> selectByRuleId(@Param("ruleId") String ruleId);
/**
* 根据分组ID查询分组字段配置
*
* @param groupById 分组ID
* @return 分组字段列表
*/
List<AnalysisGroupByColumn> selectByGroupById(@Param("groupById") Integer groupById);
/**
* 根据用户组ID查询分组字段配置
*
* @param groupId 用户组ID
* @return 分组字段列表
*/
List<AnalysisGroupByColumn> selectByGroupId(@Param("groupId") Long groupId);
}
@@ -0,0 +1,30 @@
package com.common.mapper;
import com.common.entity.AnalysisGroupByHaving;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import java.util.List;
/**
* 分析分组聚合条件Mapper
*/
@Mapper
public interface AnalysisGroupByHavingMapper {
/**
* 根据分组ID查询HAVING条件
*
* @param groupById 分组ID
* @return HAVING条件列表
*/
List<AnalysisGroupByHaving> selectByGroupById(@Param("groupById") Integer groupById);
/**
* 根据规则ID查询HAVING条件(通过关联分组表)
*
* @param ruleId 规则ID
* @return HAVING条件列表
*/
List<AnalysisGroupByHaving> selectByRuleId(@Param("ruleId") String ruleId);
}
@@ -0,0 +1,22 @@
package com.common.mapper;
import com.common.entity.AnalysisGroupBy;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import java.util.List;
/**
* 分组规则配置Mapper
*/
@Mapper
public interface AnalysisGroupByMapper {
/**
* 根据规则ID查询分组配置
*
* @param ruleId 规则ID
* @return 分组配置列表
*/
List<AnalysisGroupBy> selectByRuleId(@Param("ruleId") String ruleId);
}
@@ -0,0 +1,20 @@
package com.common.mapper;
import com.common.entity.AnalysisGroupByWindow;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
/**
* 分析时间窗口配置Mapper
*/
@Mapper
public interface AnalysisGroupByWindowMapper {
/**
* 根据分组ID查询窗口配置
*
* @param groupById 分组ID
* @return 窗口配置
*/
AnalysisGroupByWindow selectByGroupById(@Param("groupById") Integer groupById);
}
@@ -0,0 +1,50 @@
package com.common.mapper;
import com.common.entity.AnalysisTaskHistory;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import java.util.List;
/**
* 分析任务历史Mapper
*/
@Mapper
public interface AnalysisTaskHistoryMapper {
/**
* 插入任务历史记录
*
* @param history 任务历史
* @return 影响行数
*/
int insert(AnalysisTaskHistory history);
/**
* 更新任务历史记录
*
* @param history 任务历史
* @return 影响行数
*/
int update(AnalysisTaskHistory history);
/**
* 根据规则ID查询最近的任务历史
*
* @param ruleId 规则ID
* @param limit 查询数量
* @return 任务历史列表
*/
List<AnalysisTaskHistory> selectRecentByRuleId(@Param("ruleId") String ruleId,
@Param("limit") int limit);
/**
* 根据规则ID和状态查询任务历史
*
* @param ruleId 规则ID
* @param status 状态
* @return 任务历史
*/
AnalysisTaskHistory selectByRuleIdAndStatus(@Param("ruleId") String ruleId,
@Param("status") String status);
}
@@ -0,0 +1,39 @@
package com.common.mapper;
import com.common.entity.AnalysisWhereCondition;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import java.util.List;
import java.util.UUID;
/**
* WHERE条件Mapper
*/
@Mapper
public interface AnalysisWhereConditionMapper {
/**
* 根据规则ID查询WHERE条件
*
* @param ruleId 规则ID
* @return WHERE条件列表
*/
List<AnalysisWhereCondition> selectByRuleId(@Param("ruleId") String ruleId);
/**
* 查询根节点条件(无父节点)
*
* @param ruleId 规则ID
* @return 根条件列表
*/
List<AnalysisWhereCondition> selectRootConditions(@Param("ruleId") String ruleId);
/**
* 查询指定条件的子条件
*
* @param parentCondId 父条件ID
* @return 子条件列表
*/
List<AnalysisWhereCondition> selectChildConditions(@Param("parentCondId") String parentCondId);
}
@@ -71,6 +71,7 @@ public interface SyslogNormalAlarmMapper {
"ARRAY_AGG(DISTINCT http_req_body) as httpReqBodys, " +
"ARRAY_AGG(DISTINCT http_resp_header) as httpRespHeaders, " +
"ARRAY_AGG(DISTINCT http_resp_body) as httpRespBodys, " +
"MODE() WITHIN GROUP (ORDER BY dest_domain) as dns_info, " +
"STRING_AGG(DISTINCT COALESCE(host(dest_ip)::text, ''), ',') as victim_ips_str " +
"FROM syslog_normal_alarm " +
"WHERE log_time >= #{startTime} AND log_time < #{endTime} " +
@@ -68,6 +68,7 @@ public interface SyslogNormalDataMapper {
"ARRAY_AGG(DISTINCT http_req_body) as httpReqBodys, " +
"ARRAY_AGG(DISTINCT http_resp_header) as httpRespHeaders, " +
"ARRAY_AGG(DISTINCT http_resp_body) as httpRespBodys, " +
"MODE() WITHIN GROUP (ORDER BY dest_domain) as dns_info, " +
"STRING_AGG(DISTINCT COALESCE(host(dest_ip)::text, ''), ',') as victim_ips_str " +
"FROM syslog_normal_data " +
"WHERE log_time >= #{startTime} AND log_time < #{endTime} " +
@@ -33,8 +33,9 @@ public class ETLOrchestrator {
private NormalizeRuleHitTimeService normalizeRuleHitTimeService;
/**
* 定时任务 - 从每小时第1分钟开始,5分钟间隔执行
* 20260317:暂定硬规则关联分析
*/
@Scheduled(cron = "0 1/5 * * * ?")
//@Scheduled(cron = "0 1/5 * * * ?")
public void scheduledETL() {
long startTime = System.currentTimeMillis();
@@ -46,7 +47,7 @@ public class ETLOrchestrator {
//泛化标准数据告警降噪任务
try {
//retryHandler.executeWithRetry(() -> dataExtractor.extractAndProcess24HoursGroupedData());
retryHandler.executeWithRetry(() -> dataExtractor.extractAndProcessQueryHoursGroupedData(strStartTime,strEndTime ));
//retryHandler.executeWithRetry(() -> dataExtractor.extractAndProcessQueryHoursGroupedData(strStartTime,strEndTime ));
retryHandler.executeWithRetry(() -> dataExtractor.extractAndProcessQueryHoursAlarm(strStartTime,strEndTime ));
long endTime = System.currentTimeMillis();
long duration = (endTime - startTime) / 1000;
@@ -0,0 +1,62 @@
package com.common.schedule;
import com.common.service.AnalysisRuleService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Map;
/**
* 离线分析定时任务
*/
@Slf4j
@Component
public class OfflineAnalysisScheduler {
@Autowired
private AnalysisRuleService analysisRuleService;
@Value("${analysis.offline.enabled:true}")
private boolean offlineEnabled;
/**
* 定时执行离线分析(使用cron表达式,默认每小时执行一次)
* 具体分析规则运行需要根据配置运行时间周期进行,离线暂停
*/
// @Scheduled(cron = "${analysis.offline.cron-expression:0 0 */1 * * ?}")
public void executeOfflineAnalysis() {
if (!offlineEnabled) {
log.debug("离线分析引擎已禁用,跳过执行");
return;
}
try {
log.info("========== 开始执行离线分析任务 ==========");
long startTime = System.currentTimeMillis();
List<Map<String, Object>> results = analysisRuleService.executeOfflineAnalysis();
long endTime = System.currentTimeMillis();
long duration = endTime - startTime;
log.info("========== 离线分析任务完成,耗时: {} ms,处理规则数: {} ==========",
duration, results.size());
// 输出执行结果摘要
for (Map<String, Object> result : results) {
log.info("规则: {}, 状态: {}, 处理记录数: {}, 生成告警数: {}",
result.get("ruleName"),
result.get("status"),
result.get("processedCount"),
result.get("alarmCount"));
}
} catch (Exception e) {
log.error("执行离线分析任务失败", e);
}
}
}
@@ -0,0 +1,196 @@
package com.common.schedule;
import com.common.entity.AnalysisAnalysisRule;
import com.common.entity.AnalysisGroupBy;
import com.common.entity.AnalysisGroupByWindow;
import com.common.mapper.AnalysisAnalysisRuleMapper;
import com.common.mapper.AnalysisGroupByMapper;
import com.common.mapper.AnalysisGroupByWindowMapper;
import com.common.service.AnalysisRuleService;
import com.common.service.RuleExecutionTimeService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;
/**
* 实时分析定时任务调度器
* 每个规则根据窗口类型(滚动、滑动、会话)独立管理下次运行时间
*/
@Slf4j
@Component
public class RealtimeAnalysisScheduler {
@Autowired
private AnalysisRuleService analysisRuleService;
@Autowired
private RuleExecutionTimeService ruleExecutionTimeService;
@Autowired
private AnalysisAnalysisRuleMapper ruleMapper;
@Autowired
private AnalysisGroupByMapper groupByMapper;
@Autowired
private AnalysisGroupByWindowMapper groupByWindowMapper;
@Value("${analysis.realtime.enabled:true}")
private boolean realtimeEnabled;
@Value("${analysis.realtime.check-interval-seconds:10}")
private int checkIntervalSeconds;
/**
* 应用启动时初始化所有规则的执行时间
*/
@PostConstruct
public void init() {
if (!realtimeEnabled) {
log.info("实时分析引擎已禁用,跳过初始化");
return;
}
log.info("========== 初始化实时分析调度器 ==========");
try {
initAllRules();
log.info("========== 实时分析调度器初始化完成 ==========");
} catch (Exception e) {
log.error("初始化实时分析调度器失败", e);
}
}
/**
* 定时检查规则是否需要执行(默认每10秒检查一次)
* 根据窗口类型(滚动、滑动、会话)独立计算下次执行时间
*/
@Scheduled(fixedDelayString = "${analysis.realtime.check-interval-seconds:10}000")
public void checkAndExecuteRules() {
if (!realtimeEnabled) {
return;
}
try {
LocalDateTime now = LocalDateTime.now();
// 查询所有启用的实时规则
List<AnalysisAnalysisRule> rules = ruleMapper.selectActiveRulesByRunMode("realtime");
if (rules.isEmpty()) {
return;
}
int executedCount = 0;
int skippedCount = 0;
for (AnalysisAnalysisRule rule : rules) {
try {
// 获取规则下次执行时间
LocalDateTime nextTime = ruleExecutionTimeService.getNextExecuteTime(rule.getRuleId());
// 如果未初始化或已到执行时间
if (nextTime == null || !nextTime.isAfter(now)) {
log.info("执行规则: ruleId={}, ruleName={}, nextTime={}, now={}",
rule.getRuleId(), rule.getRuleName(),
nextTime, now);
// 执行规则
Map<String, Object> result = analysisRuleService.executeRealtimeRule(rule.getRuleId());
// 计算下次执行时间
updateNextExecuteTime(rule);
executedCount++;
} else {
skippedCount++;
log.debug("规则未到执行时间: ruleId={}, nextTime={}, now={}, diff={}",
rule.getRuleId(), nextTime, now,
java.time.Duration.between(now, nextTime).getSeconds());
}
} catch (Exception e) {
log.error("检查和执行规则失败: ruleId={}", rule.getRuleId(), e);
}
}
if (executedCount > 0) {
log.info("本次调度执行规则数: {}, 跳过规则数: {}", executedCount, skippedCount);
}
} catch (Exception e) {
log.error("检查和执行规则失败", e);
}
}
/**
* 初始化所有规则的执行时间
*/
private void initAllRules() {
List<AnalysisAnalysisRule> rules = ruleMapper.selectActiveRulesByRunMode("realtime");
log.info("查询到 {} 个实时分析规则", rules.size());
for (AnalysisAnalysisRule rule : rules) {
try {
// 加载窗口配置
AnalysisGroupByWindow groupByWindow = loadGroupByWindow(rule.getRuleId());
// 初始化下次执行时间
ruleExecutionTimeService.initRuleExecuteTime(rule, groupByWindow);
log.info("初始化规则: ruleId={}, ruleName={}, windowType={}",
rule.getRuleId(), rule.getRuleName(),
groupByWindow != null ? groupByWindow.getWindowType() : "NONE");
} catch (Exception e) {
log.error("初始化规则执行时间失败: ruleId={}", rule.getRuleId(), e);
}
}
}
/**
* 加载规则的窗口配置
*/
private AnalysisGroupByWindow loadGroupByWindow(String ruleId) {
try {
List<AnalysisGroupBy> groupByList = groupByMapper.selectByRuleId(ruleId);
if (groupByList != null && !groupByList.isEmpty()) {
AnalysisGroupBy groupBy = groupByList.get(0);
if (groupBy.getId() != null) {
return groupByWindowMapper.selectByGroupById(groupBy.getId().intValue());
}
}
return null;
} catch (Exception e) {
log.error("加载窗口配置失败: ruleId={}", ruleId, e);
return null;
}
}
/**
* 更新规则下次执行时间
*/
private void updateNextExecuteTime(AnalysisAnalysisRule rule) {
try {
// 加载窗口配置
AnalysisGroupByWindow groupByWindow = loadGroupByWindow(rule.getRuleId());
// 更新下次执行时间
ruleExecutionTimeService.updateNextExecuteTime(rule, groupByWindow);
} catch (Exception e) {
log.error("更新规则下次执行时间失败: ruleId={}", rule.getRuleId(), e);
}
}
/**
* 应用关闭时清理
*/
@PreDestroy
public void destroy() {
log.info("========== 关闭实时分析调度器 ==========");
}
}
@@ -6,6 +6,7 @@ import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.common.entity.AlarmVisit;
import com.common.entity.DeviceDevice;
import com.common.entity.SecExceptionAlgorithm;
import com.common.entity.SyslogNormalData;
import lombok.extern.slf4j.Slf4j;
@@ -36,6 +37,10 @@ import com.common.util.AlgorithmResultParser;
@EnableScheduling
@EnableAsync
public class AccessLogAlertService {
@Autowired
public static DeviceDeviceService deviceDeviceService ;
@Autowired
private AlgorithmResultParser algorithmResultParser;
@Autowired
@@ -63,7 +68,7 @@ public class AccessLogAlertService {
@PostConstruct
public void init() {
// 初始化时设置为当前时间减2分钟
lastProcessTime = LocalDateTime.now().minusMinutes(2);
lastProcessTime = LocalDateTime.now().minusMinutes(1);
log.info("初始化AccessLogAlertService,上次处理时间: {}", lastProcessTime);
// 加载启用的算法配置到缓存
@@ -93,7 +98,7 @@ public class AccessLogAlertService {
/**
* 安全的定时任务入口
*/
@Scheduled(cron = "0 */2 * * * ?")
@Scheduled(cron = "0 */1 * * * ?")
public void safeProcessTask() {
if (processing.compareAndSet(false, true)) {
try {
@@ -108,7 +113,7 @@ public class AccessLogAlertService {
/**
* 定时任务入口 - 每2分钟执行一次
*/
@Scheduled(cron = "0 */2 * * * ?")
@Scheduled(cron = "0 */1 * * * ?")
@Async
public void processAccessLogAlert() {
log.info("开始执行访问日志告警处理任务");
@@ -351,8 +356,11 @@ public class AccessLogAlertService {
.judgedState(0)
.disposedState(0)
.dispositionAdvice("研判后处置")
.dnsInfo(alarmResult.getString("host"))
.build();
//补充返回结果的原始日志字段
AddOriginLogField(algorithm.getAlgorithmName(),alarmVisit,alarmResult);
// 保存告警记录
alarmVisitMapper.insert(alarmVisit);
alarmCount++;
@@ -366,6 +374,8 @@ public class AccessLogAlertService {
}
}
/**
* 解析时间字符串
*/
@@ -374,7 +384,6 @@ public class AccessLogAlertService {
if (timeStr == null || timeStr.isEmpty()) {
return LocalDateTime.now();
}
// 尝试多种时间格式
try {
return LocalDateTime.parse(timeStr);
@@ -392,6 +401,54 @@ public class AccessLogAlertService {
}
/**
* 补充返回结果的原始日志字段
* @param AlgorithmName
* @param alarmVisit
* @param alarmResult
* @return
*/
private boolean AddOriginLogField(String AlgorithmName, AlarmVisit alarmVisit ,JSONObject alarmResult )
{
try {
JSONObject originLogObject= alarmResult.getJSONObject("origin_log");
if(originLogObject.isEmpty()) {
log.debug("算法:{},ID:{} ,AlarmNme:{} 没有返回 origin_log节点.",AlgorithmName, alarmVisit.getId(), alarmVisit.getAlarmName());
return false;
}
alarmVisit.setAttackPort( new Integer[]{alarmResult.getInteger("_source.sport")} );
alarmVisit.setVictimPort( new Integer[]{alarmResult.getInteger("_source.dport")} );
alarmVisit.setAttackMethod(alarmResult.getString("_source.method") );
String deviceIp= alarmResult.getString("_source.device_ip");
//alarmVisit.setDeviceId( new Integer[]{ getDeviceID(deviceIp)} );
alarmVisit.setHttpStatus( alarmResult.getString("_source.status"));
return true;
} catch (Exception e) {
log.error("算法:{} 补充原始记录日志字段异常。error:{} ",AlgorithmName,e.getMessage(), e );
return false;
}
}
public int getDeviceID(String source_ip)
{
//默认deviceId =-1
int deviceId=-1 ;
List<DeviceDevice> deviceList= deviceDeviceService.getByIpSafely(source_ip);
if(deviceList.isEmpty()) {
return deviceId;
}
if(deviceList.size()>1)
{
log.error("设备请求的Host IP注册超过一条记录,请联系管理员处理!");
return deviceId;
}
return deviceList.get(0).getId();
}
}
@@ -0,0 +1,42 @@
package com.common.service;
import com.common.entity.AnalysisAnalysisRule;
import java.util.List;
import java.util.Map;
/**
* 分析引擎接口
*/
public interface AnalysisEngine {
/**
* 执行分析规则
*
* @param rule 规则
* @return 处理结果
*/
Map<String, Object> executeRule(AnalysisAnalysisRule rule);
/**
* 批量执行规则
*
* @param rules 规则列表
* @return 处理结果列表
*/
List<Map<String, Object>> executeRules(List<AnalysisAnalysisRule> rules);
/**
* 停止指定规则
*
* @param ruleId 规则ID
*/
void stopRule(String ruleId);
/**
* 获取引擎运行模式
*
* @return 运行模式
*/
String getRunMode();
}
@@ -0,0 +1,50 @@
package com.common.service;
import com.common.entity.AnalysisAnalysisRule;
import java.util.List;
import java.util.Map;
/**
* 分析规则服务接口
*/
public interface AnalysisRuleService {
/**
* 执行实时分析
*
* @return 执行结果
*/
List<Map<String, Object>> executeRealtimeAnalysis();
/**
* 执行离线分析
*
* @return 执行结果
*/
List<Map<String, Object>> executeOfflineAnalysis();
/**
* 查询活动规则
*
* @param runMode 运行模式
* @return 规则列表
*/
List<AnalysisAnalysisRule> getActiveRules(String runMode);
/**
* 停止规则
*
* @param ruleId 规则ID
*/
void stopRule(String ruleId);
/**
* 执行单个实时分析规则
*
* @param ruleId 规则ID
* @return 执行结果
*/
Map<String, Object> executeRealtimeRule(String ruleId);
}
@@ -84,7 +84,7 @@ public class DataTransformer {
.logStartAt(groupedData.getMinLogTime())
.logEndAt(groupedData.getMaxLogTime())
.httpStatus(convertHttpStatus(groupedData.getHttpStatusCodes()))
.dnsInfo(null)
.dnsInfo(groupedData.getDnsInfo())
.accountInfo(null)
.attackerInfo(null)
.victimInfo(null)
@@ -114,10 +114,10 @@ public class DataTransformer {
.focused(false)
.baseFocused(false)
.isUpdated(false)
.httpReqHeaders(groupedData.getHttpReqHeaders())
.httpReqBodys(groupedData.getHttpReqBodys())
.httpRespHeaders(groupedData.getHttpRespHeaders())
.httpRespBodys(groupedData.getHttpRespBodys())
.httpReqHeader(groupedData.getHttpReqHeaders())
.httpReqBody(groupedData.getHttpReqBodys())
.httpRespHeader(groupedData.getHttpRespHeaders())
.httpRespBody(groupedData.getHttpRespBodys())
.build();
} catch (Exception e) {
@@ -160,7 +160,7 @@ public class DataTransformer {
.logStartAt(groupedData.getMinLogTime())
.logEndAt(groupedData.getMaxLogTime())
.httpStatus(convertHttpStatus(groupedData.getHttpStatusCodes()))
.dnsInfo(null)
.dnsInfo(groupedData.getDnsInfo())
.accountInfo(null)
.attackerInfo(null)
.victimInfo(null)
@@ -190,10 +190,10 @@ public class DataTransformer {
.focused(false)
.baseFocused(false)
.isUpdated(false)
.httpReqHeaders(groupedData.getHttpReqHeaders())
.httpReqBodys(groupedData.getHttpReqBodys())
.httpRespHeaders(groupedData.getHttpRespHeaders())
.httpRespBodys(groupedData.getHttpRespBodys())
.httpReqHeader(groupedData.getHttpReqHeaders())
.httpReqBody(groupedData.getHttpReqBodys())
.httpRespHeader(groupedData.getHttpRespHeaders())
.httpRespBody(groupedData.getHttpRespBodys())
.build();
} catch (Exception e) {
@@ -0,0 +1,52 @@
package com.common.service;
import com.common.entity.AnalysisAnalysisRule;
import com.common.entity.AnalysisGroupByWindow;
import java.time.LocalDateTime;
/**
* 规则执行时间管理服务
* 根据窗口类型(滚动、滑动、会话)动态计算下次执行时间
*/
public interface RuleExecutionTimeService {
/**
* 获取规则下次执行时间
*
* @param ruleId 规则ID
* @return 下次执行时间,如果未初始化则返回null
*/
LocalDateTime getNextExecuteTime(String ruleId);
/**
* 更新规则下次执行时间
* 根据窗口类型(滚动、滑动、会话)动态计算
*
* @param rule 分析规则
* @param groupByWindow 窗口配置
*/
void updateNextExecuteTime(AnalysisAnalysisRule rule, AnalysisGroupByWindow groupByWindow);
/**
* 初始化规则执行时间
* 应用启动时调用,设置初始执行时间
*
* @param rule 分析规则
* @param groupByWindow 窗口配置
*/
void initRuleExecuteTime(AnalysisAnalysisRule rule, AnalysisGroupByWindow groupByWindow);
/**
* 删除规则执行时间
*
* @param ruleId 规则ID
*/
void removeRuleExecuteTime(String ruleId);
/**
* 清空所有规则执行时间
*/
void clearAllRuleExecuteTime();
}
@@ -0,0 +1,132 @@
package com.common.service;
import com.common.entity.*;
import java.util.List;
/**
* SQL生成服务接口
*/
public interface SqlGeneratorService {
/**
* 根据规则动态生成SQL
*
* @param rule 分析规则
* @param fields 字段配置
* @param whereConditions WHERE条件逻辑组
* @param filters 过滤条件列表
* @param groupByColumns GROUP BY字段
* @param havingConditions HAVING条件列表
* @param groupByWindow 时间窗口配置
* @param startTime 开始时间
* @param endTime 结束时间
* @return 生成的SQL语句
*/
String generateSql(AnalysisAnalysisRule rule,
List<AnalysisField> fields,
List<AnalysisWhereCondition> whereConditions,
List<AnalysisFilter> filters,
List<AnalysisGroupByColumn> groupByColumns,
List<AnalysisGroupByHaving> havingConditions,
AnalysisGroupByWindow groupByWindow,
String startTime,
String endTime);
/**
* 生成SELECT子句
*
* @param fields 字段配置
* @param groupByWindow 时间窗口配置
* @return SELECT子句SQL
*/
String generateSelectClause(List<AnalysisField> fields, AnalysisGroupByWindow groupByWindow);
/**
* 生成FROM子句
*
* @param fields 字段配置
* @param groupByWindow 时间窗口配置
* @return FROM子句SQL
*/
String generateFromClause(List<AnalysisField> fields, AnalysisGroupByWindow groupByWindow);
/**
* 生成WHERE子句(新版本,使用AnalysisFilter
*
* @param filters 过滤条件列表
* @param startTime 开始时间
* @param endTime 结束时间
* @return WHERE子句SQL
*/
String generateWhereClauseFromFilters(List<AnalysisFilter> filters,
String startTime,
String endTime);
/**
* 生成WHERE子句(旧版本,使用AnalysisWhereCondition
*
* @param conditions WHERE条件列表
* @param startTime 开始时间
* @param endTime 结束时间
* @return WHERE子句SQL
* @deprecated 使用 generateWhereClauseFromFilters 替代
*/
@Deprecated
String generateWhereClause(List<AnalysisWhereCondition> conditions,
String startTime,
String endTime);
/**
* 生成GROUP BY子句
*
* @param columns GROUP BY字段列表
* @param groupByWindow 时间窗口配置
* @return GROUP BY子句SQL
*/
String generateGroupByClause(List<AnalysisGroupByColumn> columns, AnalysisGroupByWindow groupByWindow);
/**
* 生成HAVING子句(新版本,使用AnalysisGroupByHaving
*
* @param havingConditions HAVING条件列表
* @return HAVING子句SQL
*/
String generateHavingClauseFromConditions(List<AnalysisGroupByHaving> havingConditions);
/**
* 生成HAVING子句(旧版本,从规则表达式获取)
*
* @param rule 规则
* @return HAVING子句SQL
* @deprecated 使用 generateHavingClauseFromConditions 替代
*/
@Deprecated
String generateHavingClause(AnalysisAnalysisRule rule);
/**
* 生成时间窗口SQL子句
*
* @param groupByWindow 时间窗口配置
* @return 窗口SQL子句
*/
String generateWindowClause(AnalysisGroupByWindow groupByWindow);
/**
* 构建完整的SQL语句
*
* @param selectClause SELECT子句
* @param fromClause FROM子句
* @param whereClause WHERE子句
* @param groupByClause GROUP BY子句
* @param havingClause HAVING子句
* @return 完整SQL
*/
String buildFullSql(String selectClause,
String fromClause,
String whereClause,
String groupByClause,
String havingClause);
}
@@ -0,0 +1,114 @@
package com.common.service.impl;
import com.common.entity.AnalysisAnalysisRule;
import com.common.mapper.AnalysisAnalysisRuleMapper;
import com.common.service.AnalysisEngine;
import com.common.service.AnalysisRuleService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.ArrayList;
/**
* 分析规则服务实现
*/
@Slf4j
@Service
public class AnalysisRuleServiceImpl implements AnalysisRuleService {
@Autowired
private AnalysisAnalysisRuleMapper ruleMapper;
@Autowired
@Qualifier("realtimeAnalysisEngine")
private AnalysisEngine realtimeAnalysisEngine;
@Autowired
@Qualifier("offlineAnalysisEngine")
private AnalysisEngine offlineAnalysisEngine;
@Override
public List<Map<String, Object>> executeRealtimeAnalysis() {
log.info("开始执行实时分析任务");
List<AnalysisAnalysisRule> rules = getActiveRules("realtime");
log.info("查询到 {} 条实时分析规则", rules.size());
return realtimeAnalysisEngine.executeRules(rules);
}
@Override
public List<Map<String, Object>> executeOfflineAnalysis() {
log.info("开始执行离线分析任务");
List<AnalysisAnalysisRule> rules = getActiveRules("offline");
log.info("查询到 {} 条离线分析规则", rules.size());
return offlineAnalysisEngine.executeRules(rules);
}
@Override
public List<AnalysisAnalysisRule> getActiveRules(String runMode) {
return ruleMapper.selectActiveRulesByRunMode(runMode);
}
@Override
public void stopRule(String ruleId) {
// 查询规则确定运行模式
AnalysisAnalysisRule rule = ruleMapper.selectByRuleId(ruleId);
if (rule != null) {
String runMode = rule.getRunMode();
if ("realtime".equalsIgnoreCase(runMode)) {
realtimeAnalysisEngine.stopRule(ruleId);
} else if ("offline".equalsIgnoreCase(runMode)) {
offlineAnalysisEngine.stopRule(ruleId);
}
}
}
@Override
public Map<String, Object> executeRealtimeRule(String ruleId) {
log.info("执行实时分析规则: ruleId={}", ruleId);
// 查询规则
AnalysisAnalysisRule rule = ruleMapper.selectByRuleId(ruleId);
if (rule == null) {
log.warn("规则不存在: ruleId={}", ruleId);
Map<String, Object> result = new HashMap<>();
result.put("success", false);
result.put("message", "规则不存在");
return result;
}
if (!"realtime".equalsIgnoreCase(rule.getRunMode())) {
log.warn("规则不是实时分析模式: ruleId={}, runMode={}", ruleId, rule.getRunMode());
Map<String, Object> result = new HashMap<>();
result.put("success", false);
result.put("message", "规则不是实时分析模式");
return result;
}
try {
// 执行单个规则
List<AnalysisAnalysisRule> ruleList = new ArrayList<>();
ruleList.add(rule);
List<Map<String, Object>> results = realtimeAnalysisEngine.executeRules(ruleList);
Map<String, Object> result = new HashMap<>();
result.put("success", true);
result.put("ruleId", ruleId);
result.put("ruleName", rule.getRuleName());
result.put("resultCount", results.size());
result.put("results", results);
return result;
} catch (Exception e) {
log.error("执行实时分析规则失败: ruleId={}", ruleId, e);
Map<String, Object> result = new HashMap<>();
result.put("success", false);
result.put("ruleId", ruleId);
result.put("message", e.getMessage());
return result;
}
}
}
@@ -0,0 +1,587 @@
package com.common.service.impl;
import com.common.entity.*;
import com.common.mapper.*;
import com.common.service.AnalysisEngine;
import com.common.service.SqlGeneratorService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.*;
/**
* 离线分析引擎实现
*/
@Slf4j
@Service("offlineAnalysisEngine")
public class OfflineAnalysisEngine implements AnalysisEngine {
@Autowired
private SqlGeneratorService sqlGeneratorService;
@Autowired
private AnalysisAnalysisRuleMapper ruleMapper;
@Autowired
private AnalysisFieldMapper fieldMapper;
@Autowired
private AnalysisWhereConditionMapper whereConditionMapper;
@Autowired
private AnalysisGroupByColumnMapper groupByColumnMapper;
@Autowired
private AnalysisFilterMapper filterMapper;
@Autowired
private AnalysisGroupByHavingMapper groupByHavingMapper;
@Autowired
private AnalysisGroupByMapper groupByMapper;
@Autowired
private AnalysisGroupByWindowMapper groupByWindowMapper;
@Autowired
private AnalysisTaskHistoryMapper taskHistoryMapper;
@Autowired
private AlarmMapper alarmMapper;
@Autowired
private JdbcTemplate jdbcTemplate;
private static final String RUN_MODE = "offline";
private static final DateTimeFormatter DATE_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
@Override
public Map<String, Object> executeRule(AnalysisAnalysisRule rule) {
String batchNo = generateBatchNo();
LocalDateTime startTime = LocalDateTime.now();
log.info("开始执行离线规则: ruleId={}, ruleName={}, batchNo={}",
rule.getRuleId(), rule.getRuleName(), batchNo);
// 从规则配置中获取时间范围
LocalDateTime dataStartTime = parseDataStartTime(rule);
LocalDateTime dataEndTime = parseDataEndTime(rule);
// 如果配置中没有指定时间范围,使用默认值
if (dataStartTime == null) {
dataStartTime = startTime.minusHours(1);
}
if (dataEndTime == null) {
dataEndTime = startTime;
}
// 创建任务历史记录
AnalysisTaskHistory history = AnalysisTaskHistory.builder()
.id(System.currentTimeMillis()) // 使用时间戳作为ID
.ruleId(rule.getRuleId())
.startTime(startTime)
.status("RUNNING")
.progressPercent(0)
.inputCount(0L)
.outputCount(0L)
.delFlag("0")
.createTime(startTime)
.updateTime(startTime)
.tenantId("000000")
.remark("离线分析任务 - " + batchNo)
.build();
taskHistoryMapper.insert(history);
Map<String, Object> result = new HashMap<>();
result.put("ruleId", rule.getRuleId());
result.put("ruleName", rule.getRuleName());
result.put("runMode", RUN_MODE);
result.put("batchNo", batchNo);
result.put("dataStartTime", dataStartTime);
result.put("dataEndTime", dataEndTime);
try {
// 更新规则状态为运行中
ruleMapper.updateTaskStatus(rule.getRuleId(), "running", 1L);
// 加载规则配置
List<AnalysisField> fields = fieldMapper.selectByRuleId(rule.getRuleId());
List<AnalysisWhereCondition> whereConditions = whereConditionMapper.selectByRuleId(rule.getRuleId());
List<AnalysisFilter> filters = filterMapper.selectByRuleId(rule.getRuleId());
List<AnalysisGroupByColumn> groupByColumns = groupByColumnMapper.selectByRuleId(rule.getRuleId());
List<AnalysisGroupByHaving> havingConditions = groupByHavingMapper.selectByRuleId(rule.getRuleId());
// 加载分组和窗口配置
AnalysisGroupByWindow groupByWindow = null;
List<AnalysisGroupBy> groupByList = groupByMapper.selectByRuleId(rule.getRuleId());
if (groupByList != null && !groupByList.isEmpty()) {
AnalysisGroupBy groupBy = groupByList.get(0);
if (groupBy.getId() != null) {
groupByWindow = groupByWindowMapper.selectByGroupById(groupBy.getId().intValue());
}
}
// 生成SQL
String sql = sqlGeneratorService.generateSql(
rule,
fields,
whereConditions,
filters,
groupByColumns,
havingConditions,
groupByWindow,
dataStartTime.format(DATE_FORMATTER),
dataEndTime.format(DATE_FORMATTER)
);
log.info("生成的SQL: {}", sql);
// 执行SQL
List<Map<String, Object>> queryResult = jdbcTemplate.queryForList(sql);
result.put("queryResult", queryResult);
// 处理结果,生成告警
long alarmCount = 0;
if (!queryResult.isEmpty()) {
List<Alarm> alarms = convertToAlarms(rule, queryResult);
if (!alarms.isEmpty()) {
//String tableName = "alarm_" + dataStartTime.format(DateTimeFormatter.ofPattern("yyyyMMdd"));
String tableName = "alarm";
alarmMapper.batchInsert( alarms);
alarmCount = alarms.size();
}
}
// 更新任务历史
LocalDateTime endTime = LocalDateTime.now();
long durationSeconds = java.time.Duration.between(startTime, endTime).getSeconds();
history.setEndTime(endTime);
history.setDurationTime(durationSeconds);
history.setProgressPercent(100);
history.setInputCount((long) queryResult.size());
history.setOutputCount(alarmCount);
history.setStatus("COMPLETED");
history.setUpdateTime(endTime);
taskHistoryMapper.update(history);
result.put("processedCount", queryResult.size());
result.put("alarmCount", alarmCount);
result.put("status", "success");
// 更新规则状态为等待
ruleMapper.updateTaskStatus(rule.getRuleId(), "waiting", 1L);
log.info("规则执行成功: ruleId={}, processedCount={}, alarmCount={}",
rule.getRuleId(), queryResult.size(), alarmCount);
} catch (Exception e) {
log.error("规则执行失败: ruleId={}", rule.getRuleId(), e);
// 更新任务历史
LocalDateTime endTime = LocalDateTime.now();
long durationSeconds = java.time.Duration.between(startTime, endTime).getSeconds();
history.setEndTime(endTime);
history.setDurationTime(durationSeconds);
history.setStatus("FAILED");
//history.setRemark("执行失败: " + e.getMessage());
history.setRemark("执行失败: " + (e.getMessage().length()>480? e.getMessage().substring(0,480):e.getMessage()));
history.setUpdateTime(endTime);
taskHistoryMapper.update(history);
// 更新规则状态
ruleMapper.updateTaskStatus(rule.getRuleId(), "stopped", 1L);
result.put("status", "failed");
result.put("errorMsg", e.getMessage());
}
return result;
}
@Override
public List<Map<String, Object>> executeRules(List<AnalysisAnalysisRule> rules) {
List<Map<String, Object>> results = new ArrayList<>();
for (AnalysisAnalysisRule rule : rules) {
try {
Map<String, Object> result = executeRule(rule);
results.add(result);
} catch (Exception e) {
log.error("执行规则失败: ruleId={}", rule.getRuleId(), e);
Map<String, Object> errorResult = new HashMap<>();
errorResult.put("ruleId", rule.getRuleId());
errorResult.put("ruleName", rule.getRuleName());
errorResult.put("status", "failed");
errorResult.put("errorMsg", e.getMessage());
results.add(errorResult);
}
}
return results;
}
@Override
public void stopRule(String ruleId) {
try {
ruleMapper.updateTaskStatus(ruleId, "stopped", 1L);
log.info("已停止规则: ruleId={}", ruleId);
} catch (Exception e) {
log.error("停止规则失败: ruleId={}", ruleId, e);
}
}
@Override
public String getRunMode() {
return RUN_MODE;
}
/**
* 生成批次号
*/
private String generateBatchNo() {
return LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSS"));
}
/**
* 解析数据开始时间
*/
private LocalDateTime parseDataStartTime(AnalysisAnalysisRule rule) {
// 从rule_content或rule_expression中解析时间范围
// 这里简化处理,实际需要解析JSON配置
/* if (StringUtils.isNotBlank(rule.getRuleContent())) {
// TODO: 解析JSON配置获取时间范围
}*/
return null;
}
/**
* 解析数据结束时间
*/
private LocalDateTime parseDataEndTime(AnalysisAnalysisRule rule) {
// 从rule_content或rule_expression中解析时间范围
/* if (StringUtils.isNotBlank(rule.getRuleContent())) {
// TODO: 解析JSON配置获取时间范围
}*/
return null;
}
/**
* 转换查询结果为告警对象
* 复用实时引擎的转换逻辑
*/
private List<Alarm> convertToAlarms(AnalysisAnalysisRule rule, List<Map<String, Object>> queryResult) {
List<Alarm> alarms = new ArrayList<>();
for (Map<String, Object> row : queryResult) {
Alarm alarm = Alarm.builder()
.id(UUID.randomUUID().toString())
.createdAt(LocalDateTime.now())
.updatedAt(LocalDateTime.now())
.alarmName(rule.getRuleName())
.engineType("offline")
.attackResult(-1)
.focused(false)
.fall(0)
.alarmLevel("未知")
.baseFocused(false)
.isUpdated(false)
.alarmSource(1)
.dispositionAdvice("研判后处置")
.disposedState(0)
.attackDirection("other")
.etlTime(LocalDateTime.now())
.alarmAreaId(0)
.build();
// 映射查询结果字段到告警对象(根据新表结构)
if (row.containsKey("log_start_at")) {
alarm.setLogStartAt(getTimestampValue(row.get("log_start_at")));
}
if (row.containsKey("log_end_at")) {
alarm.setLogEndAt(getTimestampValue(row.get("log_end_at")));
}
if (row.containsKey("alarm_name")) {
alarm.setComment(getStringValue(row.get("alarm_name")));
}
if (row.containsKey("alarm_type")) {
alarm.setComment(getStringValue(row.get("alarm_type")));
}
if (row.containsKey("alarm_level")) {
alarm.setAlarmLevel(convertAlarmLevel(getIntegerValue(row.get("alarm_level"))));
}
if (row.containsKey("attack_ip")) {
alarm.setAttackIp(getStringArray(row.get("attack_ip")));
}
if (row.containsKey("victim_ip")) {
alarm.setVictimIp(getStringArray(row.get("victim_ip")));
}
if (row.containsKey("victim_web_url")) {
alarm.setVictimWebUrl(getStringArray(row.get("victim_web_url")));
}
if (row.containsKey("attack_chain_phase")) {
alarm.setAttackChainPhase(getIntegerArray(row.get("attack_chain_phase")));
}
if (row.containsKey("device_id")) {
alarm.setDeviceId(getIntegerArray(row.get("device_id")));
}
if (row.containsKey("tag")) {
alarm.setTag(getStringArray(row.get("tag")));
}
if (row.containsKey("comment")) {
alarm.setComment(getStringValue(row.get("comment")));
}
if (row.containsKey("origin_log_ids")) {
alarm.setOriginLogIds(getStringArray(row.get("origin_log_ids")));
}
if (row.containsKey("query_id")) {
alarm.setQueryId(getStringValue(row.get("query_id")));
}
if (row.containsKey("attack_result")) {
alarm.setAttackResult(getIntegerValue(row.get("attack_result")));
}
if (row.containsKey("fall")) {
alarm.setFall(getIntegerValue(row.get("fall")));
}
if (row.containsKey("payload")) {
alarm.setPayload(getBytesValue(row.get("payload")));
}
if (row.containsKey("operate_event")) {
alarm.setOperateEvent(getIntegerArray(row.get("operate_event")));
}
if (row.containsKey("attack_port")) {
alarm.setAttackPort(getIntegerArray(row.get("attack_port")));
}
if (row.containsKey("victim_port")) {
alarm.setVictimPort(getIntegerArray(row.get("victim_port")));
}
if (row.containsKey("attack_method")) {
alarm.setAttackMethod(getStringValue(row.get("attack_method")));
}
if (row.containsKey("business_ext")) {
alarm.setBusinessExt(getStringValue(row.get("business_ext")));
}
if (row.containsKey("http_status")) {
alarm.setHttpStatus(getStringValue(row.get("http_status")));
}
if (row.containsKey("dns_info")) {
alarm.setDnsInfo(getStringValue(row.get("dns_info")));
}
if (row.containsKey("account_info")) {
alarm.setAccountInfo(getStringValue(row.get("account_info")));
}
if (row.containsKey("attacker_info")) {
alarm.setAttackerInfo(getStringValue(row.get("attacker_info")));
}
if (row.containsKey("victim_info")) {
alarm.setVictimInfo(getStringValue(row.get("victim_info")));
}
if (row.containsKey("suspicious_action")) {
alarm.setSuspiciousAction(getStringValue(row.get("suspicious_action")));
}
if (row.containsKey("vuln_info")) {
alarm.setVulnInfo(getStringValue(row.get("vuln_info")));
}
if (row.containsKey("weak_pwd")) {
alarm.setWeakPwd(getStringValue(row.get("weak_pwd")));
}
if (row.containsKey("compliance_baseline")) {
alarm.setComplianceBaseline(getStringValue(row.get("compliance_baseline")));
}
if (row.containsKey("file_info")) {
alarm.setFileInfo(getStringValue(row.get("file_info")));
}
if (row.containsKey("file_tags")) {
alarm.setFileTags(getStringValue(row.get("file_tags")));
}
if (row.containsKey("endpoint_info")) {
alarm.setEndpointInfo(getStringValue(row.get("endpoint_info")));
}
if (row.containsKey("origin_info")) {
alarm.setOriginInfo(getStringValue(row.get("origin_info")));
}
if (row.containsKey("protocol_info")) {
alarm.setProtocolInfo(getStringValue(row.get("protocol_info")));
}
if (row.containsKey("email_info")) {
alarm.setEmailInfo(getStringValue(row.get("email_info")));
}
if (row.containsKey("sensitive_data")) {
alarm.setSensitiveData(getStringValue(row.get("sensitive_data")));
}
if (row.containsKey("hit_intelligence")) {
alarm.setHitIntelligence(getIntegerValue(row.get("hit_intelligence")));
}
if (row.containsKey("window_time")) {
alarm.setWindowTime(getStringValue(row.get("window_time")));
}
if (row.containsKey("attack_ip_pic")) {
alarm.setAttackIpPic(getStringValue(row.get("attack_ip_pic")));
}
if (row.containsKey("victim_ip_pic")) {
alarm.setVictimIpPic(getStringValue(row.get("victim_ip_pic")));
}
if (row.containsKey("operation_at")) {
alarm.setOperationAt(getTimestampValue(row.get("operation_at")));
}
if (row.containsKey("attack_direction")) {
alarm.setAttackDirection(getStringValue(row.get("attack_direction")));
}
if (row.containsKey("etl_time")) {
alarm.setEtlTime(getTimestampValue(row.get("etl_time")));
}
if (row.containsKey("log_count")) {
alarm.setLogCount(getIntegerValue(row.get("log_count")));
}
if (row.containsKey("is_asset_hit")) {
alarm.setIsAssetHit(getIntegerValue(row.get("is_asset_hit")));
}
if (row.containsKey("http_req_header")) {
alarm.setHttpReqHeader(getStringArray(row.get("http_req_header")));
}
if (row.containsKey("http_req_body")) {
alarm.setHttpReqBody(getStringArray(row.get("http_req_body")));
}
if (row.containsKey("http_resp_header")) {
alarm.setHttpRespHeader(getStringArray(row.get("http_resp_header")));
}
if (row.containsKey("http_resp_body")) {
alarm.setHttpRespBody(getStringArray(row.get("http_resp_body")));
}
if (row.containsKey("window_time")) {
alarm.setLogEndAt(getTimestampValue(row.get("window_time")));
}
alarms.add(alarm);
}
return alarms;
}
private byte[] getBytesValue(Object value) {
if (value == null) {
return null;
}
if (value instanceof byte[]) {
return (byte[]) value;
}
return value.toString().getBytes();
}
// 以下为辅助方法,用于类型转换
private String getStringValue(Object value) {
if (value == null) {
return null;
}
return value.toString();
}
private Long getLongValue(Object value) {
if (value == null) {
return 0L;
}
if (value instanceof Number) {
return ((Number) value).longValue();
}
return Long.parseLong(value.toString());
}
private Integer getIntegerValue(Object value) {
if (value == null) {
return 0;
}
if (value instanceof Number) {
return ((Number) value).intValue();
}
return Integer.parseInt(value.toString());
}
private LocalDateTime getTimestampValue(Object value) {
if (value == null) {
return null;
}
if (value instanceof LocalDateTime) {
return (LocalDateTime) value;
}
return LocalDateTime.parse(value.toString());
}
@SuppressWarnings("unchecked")
private String[] getStringArray(Object value) {
if (value == null) {
return new String[0];
}
if (value instanceof String[]) {
return (String[]) value;
}
if (value instanceof Object[]) {
Object[] arr = (Object[]) value;
String[] result = new String[arr.length];
for (int i = 0; i < arr.length; i++) {
result[i] = arr[i] != null ? arr[i].toString() : null;
}
return result;
}
String str = value.toString();
if (str.startsWith("{") && str.endsWith("}")) {
str = str.substring(1, str.length() - 1);
return str.split(",");
}
return new String[]{str};
}
@SuppressWarnings("unchecked")
private Integer[] getIntegerArray(Object value) {
if (value == null) {
return new Integer[0];
}
String[] strArray = getStringArray(value);
Integer[] result = new Integer[strArray.length];
for (int i = 0; i < strArray.length; i++) {
try {
result[i] = Integer.parseInt(strArray[i]);
} catch (NumberFormatException e) {
result[i] = null;
}
}
return result;
}
@SuppressWarnings("unchecked")
private byte[][] getByteArrayArray(Object value) {
if (value == null) {
return new byte[0][];
}
if (value instanceof byte[][]) {
return (byte[][]) value;
}
if (value instanceof Object[]) {
Object[] arr = (Object[]) value;
byte[][] result = new byte[arr.length][];
for (int i = 0; i < arr.length; i++) {
if (arr[i] instanceof byte[]) {
result[i] = (byte[]) arr[i];
} else {
result[i] = arr[i] != null ? arr[i].toString().getBytes() : null;
}
}
return result;
}
return new byte[0][];
}
private String convertAlarmLevel(Integer eventLevel) {
if (eventLevel == null) return "未知";
switch (eventLevel) {
case 0: return "安全(无威胁)";
case 1: return "低危";
case 2: return "中危";
case 3: return "高危";
case 4: return "超危";
default: return "未知";
}
}
}
@@ -0,0 +1,847 @@
package com.common.service.impl;
import com.common.entity.*;
import com.common.mapper.*;
import com.common.service.AnalysisEngine;
import com.common.service.SqlGeneratorService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.*;
/**
* 实时分析引擎实现
*/
@Slf4j
@Service("realtimeAnalysisEngine")
public class RealtimeAnalysisEngine implements AnalysisEngine {
@Autowired
private SqlGeneratorService sqlGeneratorService;
@Autowired
private AnalysisAnalysisRuleMapper ruleMapper;
@Autowired
private AnalysisFieldMapper fieldMapper;
@Autowired
private AnalysisWhereConditionMapper whereConditionMapper;
@Autowired
private AnalysisGroupByColumnMapper groupByColumnMapper;
@Autowired
private AnalysisGroupByWindowMapper groupByWindowMapper;
@Autowired
private AnalysisFilterMapper filterMapper;
@Autowired
private AnalysisGroupByHavingMapper groupByHavingMapper;
@Autowired
private AnalysisTaskHistoryMapper taskHistoryMapper;
@Autowired
private AlarmMapper alarmMapper;
@Autowired
private AnalysisGroupByMapper groupByMapper;
@Autowired
private JdbcTemplate jdbcTemplate;
private static final String RUN_MODE = "realtime";
private static final DateTimeFormatter DATE_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
@Override
public Map<String, Object> executeRule(AnalysisAnalysisRule rule) {
String batchNo = generateBatchNo();
//LocalDateTime startTime = LocalDateTime.now();
//LocalDateTime dataEndTime = startTime;
//LocalDateTime dataStartTime = startTime.minusMinutes(30); // 默认查询最近30分钟
// 时间点取整分钟,避免秒级时间导致窗口计算不一致
LocalDateTime startTime = LocalDateTime.now().withSecond(0).withNano(0);
LocalDateTime dataEndTime = startTime;
// 加载分组和窗口配置(在计算时间范围之前)
AnalysisGroupByWindow groupByWindow = null;
List<AnalysisGroupBy> groupByList = groupByMapper.selectByRuleId(rule.getRuleId());
if (groupByList != null && !groupByList.isEmpty()) {
AnalysisGroupBy groupBy = groupByList.get(0);
if (groupBy.getId() != null) {
// groupBy.getId()是Long类型,需要转换为Integer
groupByWindow = groupByWindowMapper.selectByGroupById(groupBy.getId().intValue());
}
}
// 根据窗口类型动态计算数据查询时间范围
LocalDateTime dataStartTime = calculateDataStartTime(dataEndTime, groupByWindow);
log.info("开始执行实时规则: ruleId={}, ruleName={}, batchNo={}, windowType={}, dataStartTime={}, dataEndTime={}",
rule.getRuleId(), rule.getRuleName(), batchNo,
groupByWindow != null ? groupByWindow.getWindowType() : "NONE",
dataStartTime.format(DATE_FORMATTER),
dataEndTime.format(DATE_FORMATTER));
// 创建任务历史记录
AnalysisTaskHistory history = AnalysisTaskHistory.builder()
.id(System.currentTimeMillis()) // 使用时间戳作为ID
.ruleId(rule.getRuleId())
.startTime(startTime)
.status("RUNNING")
.progressPercent(0)
.inputCount(0L)
.outputCount(0L)
.delFlag("0")
.createTime(startTime)
.updateTime(startTime)
.tenantId("000000")
.remark("实时分析任务 - " + batchNo)
.build();
taskHistoryMapper.insert(history);
Map<String, Object> result = new HashMap<>();
result.put("ruleId", rule.getRuleId());
result.put("ruleName", rule.getRuleName());
result.put("runMode", RUN_MODE);
result.put("batchNo", batchNo);
try {
// 更新规则状态为运行中
ruleMapper.updateTaskStatus(rule.getRuleId(), "running", 1L);
// 加载规则配置
List<AnalysisField> fields = fieldMapper.selectByRuleId(rule.getRuleId());
List<AnalysisWhereCondition> whereConditions = whereConditionMapper.selectByRuleId(rule.getRuleId());
List<AnalysisFilter> filters = filterMapper.selectByRuleId(rule.getRuleId());
List<AnalysisGroupByColumn> groupByColumns = groupByColumnMapper.selectByRuleId(rule.getRuleId());
List<AnalysisGroupByHaving> havingConditions = groupByHavingMapper.selectByRuleId(rule.getRuleId());
/**
// 加载分组和窗口配置
AnalysisGroupByWindow groupByWindow = null;
List<AnalysisGroupBy> groupByList = groupByMapper.selectByRuleId(rule.getRuleId());
if (groupByList != null && !groupByList.isEmpty()) {
AnalysisGroupBy groupBy = groupByList.get(0);
if (groupBy.getId() != null) {
groupByWindow = groupByWindowMapper.selectByGroupById(groupBy.getId().intValue());
}
}
***/
// 生成SQL
String sql = sqlGeneratorService.generateSql(
rule,
fields,
whereConditions,
filters,
groupByColumns,
havingConditions,
groupByWindow,
dataStartTime.format(DATE_FORMATTER),
dataEndTime.format(DATE_FORMATTER)
);
log.info("生成的SQL: {}", sql);
// 执行SQL
List<Map<String, Object>> queryResult = jdbcTemplate.queryForList(sql);
result.put("queryResult", queryResult);
// 处理结果,生成告警
long alarmCount = 0;
if (!queryResult.isEmpty()) {
List<Alarm> alarms = convertToAlarms(rule, queryResult);
if (!alarms.isEmpty()) {
//String tableName = "alarm_" + dataStartTime.format(DateTimeFormatter.ofPattern("yyyyMMdd"));
String tableName = "alarm";
alarmMapper.batchInsert( alarms);
alarmCount = alarms.size();
}
}
// 更新任务历史
LocalDateTime endTime = LocalDateTime.now();
long durationSeconds = java.time.Duration.between(startTime, endTime).getSeconds();
history.setEndTime(endTime);
history.setDurationTime(durationSeconds);
history.setProgressPercent(100);
history.setInputCount((long) queryResult.size());
history.setOutputCount(alarmCount);
history.setStatus("COMPLETED");
history.setUpdateTime(endTime);
taskHistoryMapper.update(history);
result.put("processedCount", queryResult.size());
result.put("alarmCount", alarmCount);
result.put("status", "success");
// 更新规则状态为等待
ruleMapper.updateTaskStatus(rule.getRuleId(), "waiting", 1L);
log.info("规则执行成功: ruleId={}, processedCount={}, alarmCount={}",
rule.getRuleId(), queryResult.size(), alarmCount);
} catch (Exception e) {
log.error("规则执行失败: ruleId={}", rule.getRuleId(), e);
// 更新任务历史
LocalDateTime endTime = LocalDateTime.now();
long durationSeconds = java.time.Duration.between(startTime, endTime).getSeconds();
history.setEndTime(endTime);
history.setDurationTime(durationSeconds);
history.setStatus("FAILED");
history.setRemark("执行失败: " + (e.getMessage().length()>480? e.getMessage().substring(0,480):e.getMessage()));
history.setUpdateTime(endTime);
taskHistoryMapper.update(history);
// 更新规则状态
ruleMapper.updateTaskStatus(rule.getRuleId(), "stopped", 1L);
result.put("status", "failed");
result.put("errorMsg", e.getMessage());
}
return result;
}
@Override
public List<Map<String, Object>> executeRules(List<AnalysisAnalysisRule> rules) {
List<Map<String, Object>> results = new ArrayList<>();
for (AnalysisAnalysisRule rule : rules) {
try {
Map<String, Object> result = executeRule(rule);
results.add(result);
} catch (Exception e) {
log.error("执行规则失败: ruleId={}", rule.getRuleId(), e);
Map<String, Object> errorResult = new HashMap<>();
errorResult.put("ruleId", rule.getRuleId());
errorResult.put("ruleName", rule.getRuleName());
errorResult.put("status", "failed");
errorResult.put("errorMsg", e.getMessage());
results.add(errorResult);
}
}
return results;
}
@Override
public void stopRule(String ruleId) {
try {
ruleMapper.updateTaskStatus(ruleId, "stopped", 1L);
log.info("已停止规则: ruleId={}", ruleId);
} catch (Exception e) {
log.error("停止规则失败: ruleId={}", ruleId, e);
}
}
@Override
public String getRunMode() {
return RUN_MODE;
}
/**
* 生成批次号
*/
private String generateBatchNo() {
return LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSS"));
}
/**
* 根据窗口类型动态计算数据查询开始时间
*
* @param dataEndTime 数据查询结束时间
* @param groupByWindow 窗口配置
* @return 数据查询开始时间
*/
private LocalDateTime calculateDataStartTime(LocalDateTime dataEndTime, AnalysisGroupByWindow groupByWindow) {
// 如果没有配置窗口,使用默认30分钟
if (groupByWindow == null) {
log.warn("未配置窗口类型,使用默认查询范围:最近30分钟");
return dataEndTime.minusMinutes(30);
}
String windowType = groupByWindow.getWindowType();
if (windowType == null || windowType.trim().isEmpty()) {
log.warn("窗口类型为空,使用默认查询范围:最近30分钟");
return dataEndTime.minusMinutes(30);
}
switch (windowType.toUpperCase()) {
case "TUMBLE":
// 滚动窗口:查询最近一个完整窗口的数据
return calculateTumbleWindowStartTime(dataEndTime, groupByWindow);
case "HOP":
// 滑动窗口:查询覆盖当前时间点的所有滑动窗口
return calculateHopWindowStartTime(dataEndTime, groupByWindow);
case "SESSION":
// 会话窗口:查询最近一个会话超时时间的数据
return calculateSessionWindowStartTime(dataEndTime, groupByWindow);
default:
log.warn("未知窗口类型: {},使用默认查询范围:最近30分钟", windowType);
return dataEndTime.minusMinutes(30);
}
}
/**
* 计算滚动窗口的数据查询开始时间
*
* 滚动窗口特点:窗口之间不重叠,数据量固定
* 查询策略:查询最近1个窗口的数据(包含当前正在进行的窗口)
*/
private LocalDateTime calculateTumbleWindowStartTime(LocalDateTime dataEndTime, AnalysisGroupByWindow groupByWindow) {
Integer windowSize = groupByWindow.getTumbleWindowSize();
String windowSizeUnit = groupByWindow.getTumbleWindowSizeUnit();
if (windowSize == null || windowSize <= 0) {
log.warn("滚动窗口大小配置无效,使用默认值:5分钟");
windowSize = 5;
windowSizeUnit = "m";
}
if (windowSizeUnit == null || windowSizeUnit.trim().isEmpty()) {
windowSizeUnit = "m";
}
LocalDateTime dataStartTime;
switch (windowSizeUnit.toLowerCase()) {
case "s":
dataStartTime = dataEndTime.minusSeconds(windowSize);
break;
case "m":
dataStartTime = dataEndTime.minusMinutes(windowSize);
break;
case "h":
dataStartTime = dataEndTime.minusHours(windowSize);
break;
case "d":
dataStartTime = dataEndTime.minusDays(windowSize);
break;
default:
log.warn("滚动窗口单位无效: {},使用默认单位:分钟", windowSizeUnit);
dataStartTime = dataEndTime.minusMinutes(windowSize);
}
log.info("滚动窗口查询范围: 窗口大小={}{},查询时间范围=[{}, {}]",
windowSize, windowSizeUnit,
dataStartTime.format(DATE_FORMATTER),
dataEndTime.format(DATE_FORMATTER));
return dataStartTime;
}
/**
* 计算滑动窗口的数据查询开始时间
*
* 滑动窗口特点:窗口之间重叠,每个滑动步长触发一次计算
* 查询策略:查询覆盖当前时间点的完整窗口(窗口大小),而不是查询所有历史滑动窗口
* 原因:避免数据量过大,实时分析只需要分析当前活跃窗口的数据
*/
private LocalDateTime calculateHopWindowStartTime(LocalDateTime dataEndTime, AnalysisGroupByWindow groupByWindow) {
Integer windowSize = groupByWindow.getHopWindowSize();
String windowSizeUnit = groupByWindow.getHopWindowSizeUnit();
if (windowSize == null || windowSize <= 0) {
log.warn("滑动窗口大小配置无效,使用默认值:5分钟");
windowSize = 5;
windowSizeUnit = "m";
}
if (windowSizeUnit == null || windowSizeUnit.trim().isEmpty()) {
windowSizeUnit = "m";
}
LocalDateTime dataStartTime;
switch (windowSizeUnit.toLowerCase()) {
case "s":
dataStartTime = dataEndTime.minusSeconds(windowSize);
break;
case "m":
dataStartTime = dataEndTime.minusMinutes(windowSize);
break;
case "h":
dataStartTime = dataEndTime.minusHours(windowSize);
break;
case "d":
dataStartTime = dataEndTime.minusDays(windowSize);
break;
default:
log.warn("滑动窗口单位无效: {},使用默认单位:分钟", windowSizeUnit);
dataStartTime = dataEndTime.minusMinutes(windowSize);
}
log.info("滑动窗口查询范围: 窗口大小={}{},查询时间范围=[{}, {}]",
windowSize, windowSizeUnit,
dataStartTime.format(DATE_FORMATTER),
dataEndTime.format(DATE_FORMATTER));
return dataStartTime;
}
/**
* 计算会话窗口的数据查询开始时间
*
* 会话窗口特点:根据用户行为动态划分,超时后结束会话
* 查询策略:查询最近一个会话超时时间的数据,确保捕获活跃会话
* 额外策略:为避免遗漏跨天或长时间运行的会话,额外增加1天缓冲时间
*/
private LocalDateTime calculateSessionWindowStartTime(LocalDateTime dataEndTime, AnalysisGroupByWindow groupByWindow) {
Integer sessionTimeout = groupByWindow.getSessionWindowSize();
String sessionTimeoutUnit = groupByWindow.getSessionWindowSizeUnit();
if (sessionTimeout == null || sessionTimeout <= 0) {
log.warn("会话窗口超时时间配置无效,使用默认值:30分钟");
sessionTimeout = 30;
sessionTimeoutUnit = "m";
}
if (sessionTimeoutUnit == null || sessionTimeoutUnit.trim().isEmpty()) {
sessionTimeoutUnit = "m";
}
LocalDateTime dataStartTime;
switch (sessionTimeoutUnit.toLowerCase()) {
case "s":
dataStartTime = dataEndTime.minusSeconds(sessionTimeout);
break;
case "m":
dataStartTime = dataEndTime.minusMinutes(sessionTimeout);
break;
case "h":
dataStartTime = dataEndTime.minusHours(sessionTimeout);
break;
case "d":
dataStartTime = dataEndTime.minusDays(sessionTimeout);
break;
default:
log.warn("会话窗口超时单位无效: {},使用默认单位:分钟", sessionTimeoutUnit);
dataStartTime = dataEndTime.minusMinutes(sessionTimeout);
}
// 会话窗口额外增加1天缓冲时间,避免遗漏跨天或长时间运行的会话
dataStartTime = dataStartTime.minusDays(1);
log.info("会话窗口查询范围: 超时时间={}{},额外缓冲1天,查询时间范围=[{}, {}]",
sessionTimeout, sessionTimeoutUnit,
dataStartTime.format(DATE_FORMATTER),
dataEndTime.format(DATE_FORMATTER));
return dataStartTime;
}
/**
* 转换查询结果为告警对象
*/
private List<Alarm> convertToAlarms(AnalysisAnalysisRule rule, List<Map<String, Object>> queryResult) {
List<Alarm> alarms = new ArrayList<>();
for (Map<String, Object> row : queryResult) {
Alarm alarm = Alarm.builder()
.id(UUID.randomUUID().toString())
.createdAt(LocalDateTime.now())
.updatedAt(LocalDateTime.now())
.alarmName(rule.getRuleName())
.engineType("realtime")
.attackResult(-1)
.focused(false)
.fall(0)
.alarmLevel("未知")
.baseFocused(false)
.isUpdated(false)
.alarmSource(1)
.dispositionAdvice("研判后处置")
.disposedState(0)
.attackDirection("other")
.etlTime(LocalDateTime.now())
.alarmAreaId(0)
.comment(buildComment(row))
.attackChainPhase(getIntegerArray(-1))
.judgedState(0)
.build();
// 映射查询结果字段到告警对象
if (row.containsKey("log_start_at")) {
alarm.setLogStartAt(getTimestampValue(row.get("log_start_at")));
}
if (row.containsKey("log_end_at")) {
alarm.setLogEndAt(getTimestampValue(row.get("log_end_at")));
}
if (row.containsKey("alarm_name")) {
alarm.setAlarmName(getStringValue(row.get("alarm_name")));
}
if (row.containsKey("alarm_type")) {
alarm.setAlarmType(getStringValue(row.get("alarm_type")));
}
if (row.containsKey("alarm_level")) {
alarm.setAlarmLevel(convertAlarmLevel(getIntegerValue(row.get("alarm_level"))));
}
if (row.containsKey("attack_ip")) {
alarm.setAttackIp(getStringArray(row.get("attack_ip")));
}
if (row.containsKey("victim_ip")) {
alarm.setVictimIp(getStringArray(row.get("victim_ip")));
}
if (row.containsKey("victim_web_url")) {
alarm.setVictimWebUrl(getStringArray(row.get("victim_web_url")));
}
if (row.containsKey("attack_chain_phase")) {
alarm.setAttackChainPhase(getIntegerArray(row.get("attack_chain_phase")));
}
if (row.containsKey("device_id")) {
alarm.setDeviceId(getIntegerArray(row.get("device_id")));
}
if (row.containsKey("tag")) {
alarm.setTag(getStringArray(row.get("tag")));
}
if (row.containsKey("comment")) {
alarm.setComment(getStringValue(row.get("comment")));
}
if (row.containsKey("origin_log_ids")) {
alarm.setOriginLogIds(getStringArray(row.get("origin_log_ids")));
}
if (row.containsKey("query_id")) {
alarm.setQueryId(getStringValue(row.get("query_id")));
}
if (row.containsKey("attack_result")) {
alarm.setAttackResult(getIntegerValue(row.get("attack_result")));
}
if (row.containsKey("fall")) {
alarm.setFall(getIntegerValue(row.get("fall")));
}
if (row.containsKey("payload")) {
alarm.setPayload(getBytesValue(row.get("payload")));
}
if (row.containsKey("operate_event")) {
alarm.setOperateEvent(getIntegerArray(row.get("operate_event")));
}
if (row.containsKey("attack_port")) {
alarm.setAttackPort(getIntegerArray(row.get("attack_port")));
}
if (row.containsKey("victim_port")) {
alarm.setVictimPort(getIntegerArray(row.get("victim_port")));
}
if (row.containsKey("attack_method")) {
alarm.setAttackMethod(getStringValue(row.get("attack_method")));
}
if (row.containsKey("business_ext")) {
alarm.setBusinessExt(getStringValue(row.get("business_ext")));
}
if (row.containsKey("http_status")) {
alarm.setHttpStatus(getStringValue(row.get("http_status")));
}
if (row.containsKey("dns_info")) {
alarm.setDnsInfo(getStringValue(row.get("dns_info")));
}
if (row.containsKey("account_info")) {
alarm.setAccountInfo(getStringValue(row.get("account_info")));
}
if (row.containsKey("attacker_info")) {
alarm.setAttackerInfo(getStringValue(row.get("attacker_info")));
}
if (row.containsKey("victim_info")) {
alarm.setVictimInfo(getStringValue(row.get("victim_info")));
}
if (row.containsKey("suspicious_action")) {
alarm.setSuspiciousAction(getStringValue(row.get("suspicious_action")));
}
if (row.containsKey("vuln_info")) {
alarm.setVulnInfo(getStringValue(row.get("vuln_info")));
}
if (row.containsKey("weak_pwd")) {
alarm.setWeakPwd(getStringValue(row.get("weak_pwd")));
}
if (row.containsKey("compliance_baseline")) {
alarm.setComplianceBaseline(getStringValue(row.get("compliance_baseline")));
}
if (row.containsKey("file_info")) {
alarm.setFileInfo(getStringValue(row.get("file_info")));
}
if (row.containsKey("file_tags")) {
alarm.setFileTags(getStringValue(row.get("file_tags")));
}
if (row.containsKey("endpoint_info")) {
alarm.setEndpointInfo(getStringValue(row.get("endpoint_info")));
}
if (row.containsKey("origin_info")) {
alarm.setOriginInfo(getStringValue(row.get("origin_info")));
}
if (row.containsKey("protocol_info")) {
alarm.setProtocolInfo(getStringValue(row.get("protocol_info")));
}
if (row.containsKey("email_info")) {
alarm.setEmailInfo(getStringValue(row.get("email_info")));
}
if (row.containsKey("sensitive_data")) {
alarm.setSensitiveData(getStringValue(row.get("sensitive_data")));
}
if (row.containsKey("hit_intelligence")) {
alarm.setHitIntelligence(getIntegerValue(row.get("hit_intelligence")));
}
if (row.containsKey("window_time")) {
alarm.setWindowTime(getStringValue(row.get("window_time")));
}
if (row.containsKey("attack_ip_pic")) {
alarm.setAttackIpPic(getStringValue(row.get("attack_ip_pic")));
}
if (row.containsKey("victim_ip_pic")) {
alarm.setVictimIpPic(getStringValue(row.get("victim_ip_pic")));
}
if (row.containsKey("operation_at")) {
alarm.setOperationAt(getTimestampValue(row.get("operation_at")));
}
if (row.containsKey("attack_direction")) {
alarm.setAttackDirection(getStringValue(row.get("attack_direction")));
}
if (row.containsKey("etl_time")) {
alarm.setEtlTime(getTimestampValue(row.get("etl_time")));
}
if (row.containsKey("log_count")) {
alarm.setLogCount(getIntegerValue(row.get("log_count")));
}
if (row.containsKey("is_asset_hit")) {
alarm.setIsAssetHit(getIntegerValue(row.get("is_asset_hit")));
}
if (row.containsKey("http_req_header")) {
alarm.setHttpReqHeader(getStringArray(row.get("http_req_header")));
}
if (row.containsKey("http_req_body")) {
alarm.setHttpReqBody(getStringArray(row.get("http_req_body")));
}
if (row.containsKey("http_resp_header")) {
alarm.setHttpRespHeader(getStringArray(row.get("http_resp_header")));
}
if (row.containsKey("http_resp_body")) {
alarm.setHttpRespBody(getStringArray(row.get("http_resp_body")));
}
if (row.containsKey("window_time")) {
alarm.setLogEndAt(getTimestampValue(row.get("window_time")));
}
alarms.add(alarm);
}
return alarms;
}
private byte[] getBytesValue(Object value) {
if (value == null) {
return null;
}
if (value instanceof byte[]) {
return (byte[]) value;
}
return value.toString().getBytes();
}
// 以下为辅助方法,用于类型转换
private String getStringValue(Object value) {
if (value == null) {
return null;
}
return value.toString();
}
private Long getLongValue(Object value) {
if (value == null) {
return 0L;
}
if (value instanceof Number) {
return ((Number) value).longValue();
}
return Long.parseLong(value.toString());
}
private Integer getIntegerValue(Object value) {
if (value == null) {
return -1;
}
if (value instanceof Number) {
return ((Number) value).intValue();
}
return Integer.parseInt(value.toString());
}
private LocalDateTime getTimestampValue(Object value) {
if (value == null) {
return null;
}
if (value instanceof LocalDateTime) {
return (LocalDateTime) value;
}
String strValue = value.toString().trim();
// 尝试多种时间格式解析
String[] patterns = {
"yyyy-MM-dd HH:mm:ss.SSS", // 2026-02-05 18:14:25.824
"yyyy-MM-dd HH:mm:ss", // 2026-02-05 18:14:25
"yyyy-MM-dd'T'HH:mm:ss.SSS", // 2026-02-05T18:14:25.824
"yyyy-MM-dd'T'HH:mm:ss", // 2026-02-05T18:14:25
"yyyy-MM-dd HH:mm:ss.SSSSSS", // 带微秒
"yyyy-MM-dd HH:mm:ss.SSSSS", // 带10万分秒
"yyyy-MM-dd HH:mm:ss.S", // 1位毫秒
"yyyy-MM-dd HH:mm:ss.SS", // 2位毫秒
"yyyy-MM-dd'T'HH:mm:ss.SSSSSSS", // ISO带纳秒
"yyyy-MM-dd", // 只有日期
"yyyy/MM/dd HH:mm:ss", // 斜杠分隔
"yyyy/MM/dd HH:mm:ss.SSS" // 斜杠分隔+毫秒
};
for (String pattern : patterns) {
try {
return LocalDateTime.parse(strValue, java.time.format.DateTimeFormatter.ofPattern(pattern));
} catch (Exception ignored) {
// 继续尝试下一个格式
}
}
// 如果所有格式都失败,尝试直接解析(ISO格式)
try {
return LocalDateTime.parse(strValue);
} catch (Exception e) {
log.warn("无法解析时间字符串: {}", strValue);
return null;
}
}
@SuppressWarnings("unchecked")
private String[] getStringArray(Object value) {
if (value == null) {
return new String[0];
}
if (value instanceof String[]) {
return (String[]) value;
}
if (value instanceof Object[]) {
Object[] arr = (Object[]) value;
String[] result = new String[arr.length];
for (int i = 0; i < arr.length; i++) {
result[i] = arr[i] != null ? arr[i].toString() : null;
}
return result;
}
// PostgreSQL数组以字符串形式返回,如 "{ip1,ip2,ip3}"
String str = value.toString();
if (str.startsWith("{") && str.endsWith("}")) {
str = str.substring(1, str.length() - 1);
return str.split(",");
}
return new String[]{str};
}
@SuppressWarnings("unchecked")
private Integer[] getIntegerArray(Object value) {
if (value == null) {
return new Integer[0];
}
String[] strArray = getStringArray(value);
Integer[] result = new Integer[strArray.length];
for (int i = 0; i < strArray.length; i++) {
try {
result[i] = Integer.parseInt(strArray[i]);
} catch (NumberFormatException e) {
result[i] = null;
}
}
return result;
}
@SuppressWarnings("unchecked")
private byte[][] getByteArrayArray(Object value) {
if (value == null) {
return new byte[0][];
}
if (value instanceof byte[][]) {
return (byte[][]) value;
}
if (value instanceof Object[]) {
Object[] arr = (Object[]) value;
byte[][] result = new byte[arr.length][];
for (int i = 0; i < arr.length; i++) {
if (arr[i] instanceof byte[]) {
result[i] = (byte[]) arr[i];
} else {
result[i] = arr[i] != null ? arr[i].toString().getBytes() : null;
}
}
return result;
}
return new byte[0][];
}
private String convertAlarmLevel(Integer eventLevel) {
if (eventLevel == null) return "未知";
switch (eventLevel) {
case 0: return "安全(无威胁)";
case 1: return "低危";
case 2: return "中危";
case 3: return "高危";
case 4: return "超危";
default: return "未知";
}
}
/**
* 构建comment字段
*/
private String buildComment(Map<String, Object> row) {
String victimIpsStr;
if (row.containsKey("victim_ip")) {
victimIpsStr = String.join(",", getStringArray(row.get("victim_ip")));
}
else {
victimIpsStr = "未知";
}
String alarmName="";
if (row.containsKey("alarm_name")) {
alarmName=getStringValue(row.get("alarm_name"));
}
String AttackIps="";
if (row.containsKey("attack_ip")) {
AttackIps=String.join(",", getStringArray(row.get("attack_ip")));
}
return String.format(
"24小时内,检测到%s上产生%s告警:\n告警名称:%s\n攻击IP%s\n攻击结果:%d",
victimIpsStr,
convertAlarmLevel(getIntegerValue(row.get("alarm_level"))),
alarmName,
AttackIps,
determineAttackResult(row)
);
}
private String convertAttackIps(String[] attackIps) {
if (attackIps == null || attackIps.length == 0) {
return null;
}
return String.join(",", attackIps);
}
/**
* 确定attack_result的值
*/
private Integer determineAttackResult( Map<String, Object> row ) {
// 优先使用单个attack_result值
if (row.containsKey("attack_result")) {
return getIntegerValue(row.get("attack_result"));
}
else
{
return -1;
}
}
}
@@ -0,0 +1,281 @@
package com.common.service.impl;
import com.common.entity.AnalysisAnalysisRule;
import com.common.entity.AnalysisGroupByWindow;
import com.common.service.RuleExecutionTimeService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Set;
import java.time.temporal.ChronoUnit;
/**
* 规则执行时间管理服务实现(基于Redis)
* 根据窗口类型(滚动、滑动、会话)动态计算下次执行时间
*/
@Slf4j
@Service
public class RuleExecutionTimeServiceImpl implements RuleExecutionTimeService {
private static final String REDIS_KEY_PREFIX = "rule:next_execute:";
private static final DateTimeFormatter DATE_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
@Autowired
private RedisTemplate<String, String> redisTemplate;
@Override
public LocalDateTime getNextExecuteTime(String ruleId) {
String key = REDIS_KEY_PREFIX + ruleId;
String value = redisTemplate.opsForValue().get(key);
if (value == null) {
log.debug("规则首次执行,无下次执行时间记录,ruleId={}", ruleId);
return null;
}
try {
return LocalDateTime.parse(value, DATE_FORMATTER);
} catch (Exception e) {
log.error("解析下次执行时间失败,ruleId={}, value={}", ruleId, value, e);
return null;
}
}
@Override
public void updateNextExecuteTime(AnalysisAnalysisRule rule, AnalysisGroupByWindow groupByWindow) {
LocalDateTime nextTime = calculateNextExecuteTime(rule, groupByWindow);
String ruleId = rule.getRuleId();
String key = REDIS_KEY_PREFIX + ruleId;
String value = nextTime.format(DATE_FORMATTER);
// 设置过期时间:5天
redisTemplate.opsForValue().set(key, value, Duration.ofDays(5));
log.info("更新规则下次执行时间,ruleId={}, ruleName={}, windowType={}, nextExecuteTime={}",
ruleId, rule.getRuleName(),
groupByWindow != null ? groupByWindow.getWindowType() : "NONE",
value);
}
@Override
public void initRuleExecuteTime(AnalysisAnalysisRule rule, AnalysisGroupByWindow groupByWindow) {
String ruleId = rule.getRuleId();
String key = REDIS_KEY_PREFIX + ruleId;
// 如果已存在,则不覆盖(防止重启后重新计算)
if (redisTemplate.hasKey(key)) {
log.info("规则执行时间已存在,跳过初始化,ruleId={}", ruleId);
return;
}
// 计算初始执行时间(当前时间 + 窗口间隔)
LocalDateTime nextTime = calculateNextExecuteTime(rule, groupByWindow);
String value = nextTime.format(DATE_FORMATTER);
redisTemplate.opsForValue().set(key, value, Duration.ofDays(5));
log.info("初始化规则执行时间,ruleId={}, ruleName={}, windowType={}, nextExecuteTime={}",
ruleId, rule.getRuleName(),
groupByWindow != null ? groupByWindow.getWindowType() : "NONE",
value);
}
@Override
public void removeRuleExecuteTime(String ruleId) {
String key = REDIS_KEY_PREFIX + ruleId;
redisTemplate.delete(key);
log.debug("删除规则执行时间记录,ruleId={}", ruleId);
}
@Override
public void clearAllRuleExecuteTime() {
String pattern = REDIS_KEY_PREFIX + "*";
Set<String> keys = redisTemplate.keys(pattern);
if (keys != null && !keys.isEmpty()) {
redisTemplate.delete(keys);
log.info("清空所有规则执行时间,共 {} 条", keys.size());
}
}
/**
* 根据窗口类型动态计算下次执行时间
*
* @param rule 分析规则
* @param groupByWindow 窗口配置
* @return 下次执行时间
*/
private LocalDateTime calculateNextExecuteTime(AnalysisAnalysisRule rule, AnalysisGroupByWindow groupByWindow) {
LocalDateTime now = LocalDateTime.now().truncatedTo(ChronoUnit.MINUTES);
// 如果没有配置窗口,使用默认30秒
if (groupByWindow == null) {
log.warn("未配置窗口类型,使用默认执行间隔:60秒");
return now.plusSeconds(60);
}
String windowType = groupByWindow.getWindowType();
if (windowType == null || windowType.trim().isEmpty()) {
log.warn("窗口类型为空,使用默认执行间隔:60秒");
return now.plusSeconds(60);
}
switch (windowType.toUpperCase()) {
case "TUMBLE":
return calculateTumbleNextExecuteTime(now, groupByWindow);
case "HOP":
return calculateHopNextExecuteTime(now, groupByWindow);
case "SESSION":
return calculateSessionNextExecuteTime(now, groupByWindow);
default:
log.warn("未知窗口类型: {},使用默认执行间隔:60秒", windowType);
return now.plusSeconds(60);
}
}
/**
* 计算滚动窗口的下次执行时间
*
* 策略:执行间隔 = 窗口大小
* 示例:窗口大小5分钟,则每5分钟执行一次
*/
private LocalDateTime calculateTumbleNextExecuteTime(LocalDateTime now, AnalysisGroupByWindow groupByWindow) {
Integer windowSize = groupByWindow.getTumbleWindowSize();
String windowSizeUnit = groupByWindow.getTumbleWindowSizeUnit();
if (windowSize == null || windowSize <= 0) {
log.warn("滚动窗口大小配置无效,使用默认值:5分钟");
windowSize = 5;
windowSizeUnit = "m";
}
if (windowSizeUnit == null || windowSizeUnit.trim().isEmpty()) {
windowSizeUnit = "m";
}
LocalDateTime nextTime;
switch (windowSizeUnit.toLowerCase()) {
case "s":
nextTime = now.plusSeconds(windowSize);
break;
case "m":
nextTime = now.plusMinutes(windowSize);
break;
case "h":
nextTime = now.plusHours(windowSize);
break;
case "d":
nextTime = now.plusDays(windowSize);
break;
default:
log.warn("滚动窗口单位无效: {},使用默认单位:分钟", windowSizeUnit);
nextTime = now.plusMinutes(windowSize);
}
log.debug("滚动窗口下次执行时间: 窗口大小={}{}nextTime={}",
windowSize, windowSizeUnit, nextTime.format(DATE_FORMATTER));
return nextTime;
}
/**
* 计算滑动窗口的下次执行时间
*
* 策略:执行间隔 = 滑动步长(slide)
* 示例:窗口大小10分钟,步长5分钟,则每5分钟执行一次
*/
private LocalDateTime calculateHopNextExecuteTime(LocalDateTime now, AnalysisGroupByWindow groupByWindow) {
Integer slide = groupByWindow.getHopWindowSlide();
String slideUnit = groupByWindow.getHopWindowSizeUnit();
if (slide == null || slide <= 0) {
log.warn("滑动窗口步长配置无效,使用默认值:5分钟");
slide = 5;
slideUnit = "m";
}
if (slideUnit == null || slideUnit.trim().isEmpty()) {
slideUnit = "m";
}
LocalDateTime nextTime;
switch (slideUnit.toLowerCase()) {
case "s":
nextTime = now.plusSeconds(slide);
break;
case "m":
nextTime = now.plusMinutes(slide);
break;
case "h":
nextTime = now.plusHours(slide);
break;
case "d":
nextTime = now.plusDays(slide);
break;
default:
log.warn("滑动窗口步长单位无效: {},使用默认单位:分钟", slideUnit);
nextTime = now.plusMinutes(slide);
}
log.debug("滑动窗口下次执行时间: 步长={}{}nextTime={}",
slide, slideUnit, nextTime.format(DATE_FORMATTER));
return nextTime;
}
/**
* 计算会话窗口的下次执行时间
*
* 策略:执行间隔 = 会话超时时间(session window size
* 示例:超时时间30分钟,则每30分钟执行一次
*/
private LocalDateTime calculateSessionNextExecuteTime(LocalDateTime now, AnalysisGroupByWindow groupByWindow) {
Integer sessionTimeout = groupByWindow.getSessionWindowSize();
String sessionTimeoutUnit = groupByWindow.getSessionWindowSizeUnit();
if (sessionTimeout == null || sessionTimeout <= 0) {
log.warn("会话窗口超时时间配置无效,使用默认值:30分钟");
sessionTimeout = 30;
sessionTimeoutUnit = "m";
}
if (sessionTimeoutUnit == null || sessionTimeoutUnit.trim().isEmpty()) {
sessionTimeoutUnit = "m";
}
LocalDateTime nextTime;
switch (sessionTimeoutUnit.toLowerCase()) {
case "s":
nextTime = now.plusSeconds(sessionTimeout);
break;
case "m":
nextTime = now.plusMinutes(sessionTimeout);
break;
case "h":
nextTime = now.plusHours(sessionTimeout);
break;
case "d":
nextTime = now.plusDays(sessionTimeout);
break;
default:
log.warn("会话窗口超时单位无效: {},使用默认单位:分钟", sessionTimeoutUnit);
nextTime = now.plusMinutes(sessionTimeout);
}
log.debug("会话窗口下次执行时间: 超时时间={}{}nextTime={}",
sessionTimeout, sessionTimeoutUnit, nextTime.format(DATE_FORMATTER));
return nextTime;
}
}
@@ -0,0 +1,161 @@
package com.common.util;
import com.fasterxml.jackson.databind.ObjectMapper;
/**
* JSONB字段解析工具类
* 用于处理PostgreSQL JSONB类型字段的解析
*/
public class JsonbUtil {
private static final ObjectMapper objectMapper = new ObjectMapper();
/**
* 解析JSONB字段值为字符串
* 适用于JSONB存储的是带引号的字符串,如 "127.0.0.1"
*
* @param value JSONB字段的值
* @return 解析后的字符串
*/
public static String parseString(Object value) {
if (value == null) {
return null;
}
// 如果已经是字符串,直接返回
if (value instanceof String) {
String strValue = (String) value;
// 去除可能存在的引号
if (strValue.startsWith("\"") && strValue.endsWith("\"")) {
return strValue.substring(1, strValue.length() - 1);
}
return strValue;
}
// 其他类型转为字符串
return value.toString();
}
/**
* 解析JSONB字段值为Integer
*
* @param value JSONB字段的值
* @return 解析后的Integer
*/
public static Integer parseInteger(Object value) {
if (value == null) {
return null;
}
if (value instanceof Integer) {
return (Integer) value;
}
if (value instanceof Number) {
return ((Number) value).intValue();
}
if (value instanceof String) {
String strValue = parseString(value);
try {
return Integer.parseInt(strValue);
} catch (NumberFormatException e) {
return null;
}
}
return null;
}
/**
* 解析JSONB字段值为Long
*
* @param value JSONB字段的值
* @return 解析后的Long
*/
public static Long parseLong(Object value) {
if (value == null) {
return null;
}
if (value instanceof Long) {
return (Long) value;
}
if (value instanceof Number) {
return ((Number) value).longValue();
}
if (value instanceof String) {
String strValue = parseString(value);
try {
return Long.parseLong(strValue);
} catch (NumberFormatException e) {
return null;
}
}
return null;
}
/**
* 解析JSONB字段值为Boolean
*
* @param value JSONB字段的值
* @return 解析后的Boolean
*/
public static Boolean parseBoolean(Object value) {
if (value == null) {
return null;
}
if (value instanceof Boolean) {
return (Boolean) value;
}
if (value instanceof String) {
String strValue = parseString(value);
return Boolean.parseBoolean(strValue);
}
return null;
}
/**
* 从JSONB Object中解析字符串值
* 适用于复杂的JSON对象结构
*
* @param jsonbValue JSONB字段的值(可能是Map或字符串)
* @param fieldName JSON对象中的字段名
* @return 解析后的字符串
*/
@SuppressWarnings("unchecked")
public static String parseStringFromJson(Object jsonbValue, String fieldName) {
if (jsonbValue == null) {
return null;
}
try {
if (jsonbValue instanceof java.util.Map) {
java.util.Map<String, Object> map = (java.util.Map<String, Object>) jsonbValue;
Object fieldValue = map.get(fieldName);
return parseString(fieldValue);
}
// 如果是JSON字符串,尝试解析
if (jsonbValue instanceof String) {
String strValue = (String) jsonbValue;
if (strValue.startsWith("{")) {
java.util.Map<String, Object> map = objectMapper.readValue(strValue, java.util.Map.class);
Object fieldValue = map.get(fieldName);
return parseString(fieldValue);
}
}
} catch (Exception e) {
// 解析失败,返回null
return null;
}
return null;
}
}
@@ -0,0 +1,136 @@
package com.controllers;
import com.common.entity.AnalysisAnalysisRule;
import com.common.entity.AnalysisTaskHistory;
import com.common.mapper.AnalysisTaskHistoryMapper;
import com.common.service.AnalysisRuleService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* 分析规则控制器
*/
@Slf4j
@RestController
@RequestMapping("/api/analysis")
public class AnalysisRuleController {
@Autowired
private AnalysisRuleService analysisRuleService;
@Autowired
private AnalysisTaskHistoryMapper taskHistoryMapper;
/**
* 手动触发实时分析
*/
@PostMapping("/realtime/execute")
public Map<String, Object> executeRealtimeAnalysis() {
Map<String, Object> result = new HashMap<>();
try {
List<Map<String, Object>> results = analysisRuleService.executeRealtimeAnalysis();
result.put("code", 200);
result.put("message", "实时分析执行成功");
result.put("data", results);
} catch (Exception e) {
log.error("执行实时分析失败", e);
result.put("code", 500);
result.put("message", "执行实时分析失败: " + e.getMessage());
}
return result;
}
/**
* 手动触发离线分析
*/
@PostMapping("/offline/execute")
public Map<String, Object> executeOfflineAnalysis() {
Map<String, Object> result = new HashMap<>();
try {
List<Map<String, Object>> results = analysisRuleService.executeOfflineAnalysis();
result.put("code", 200);
result.put("message", "离线分析执行成功");
result.put("data", results);
} catch (Exception e) {
log.error("执行离线分析失败", e);
result.put("code", 500);
result.put("message", "执行离线分析失败: " + e.getMessage());
}
return result;
}
/**
* 查询活动规则
*/
@GetMapping("/rules")
public Map<String, Object> getActiveRules(@RequestParam String runMode) {
Map<String, Object> result = new HashMap<>();
try {
List<AnalysisAnalysisRule> rules = analysisRuleService.getActiveRules(runMode);
result.put("code", 200);
result.put("message", "查询成功");
result.put("data", rules);
} catch (Exception e) {
log.error("查询活动规则失败", e);
result.put("code", 500);
result.put("message", "查询活动规则失败: " + e.getMessage());
}
return result;
}
/**
* 停止规则
*/
@PostMapping("/rules/{ruleId}/stop")
public Map<String, Object> stopRule(@PathVariable String ruleId) {
Map<String, Object> result = new HashMap<>();
try {
analysisRuleService.stopRule(ruleId);
result.put("code", 200);
result.put("message", "规则已停止");
} catch (Exception e) {
log.error("停止规则失败", e);
result.put("code", 500);
result.put("message", "停止规则失败: " + e.getMessage());
}
return result;
}
/**
* 查询规则执行历史
*/
@GetMapping("/rules/{ruleId}/history")
public Map<String, Object> getRuleHistory(
@PathVariable String ruleId,
@RequestParam(defaultValue = "10") int limit) {
Map<String, Object> result = new HashMap<>();
try {
List<AnalysisTaskHistory> historyList = taskHistoryMapper.selectRecentByRuleId(ruleId, limit);
result.put("code", 200);
result.put("message", "查询成功");
result.put("data", historyList);
} catch (Exception e) {
log.error("查询规则执行历史失败", e);
result.put("code", 500);
result.put("message", "查询规则执行历史失败: " + e.getMessage());
}
return result;
}
/**
* 健康检查
*/
@GetMapping("/health")
public Map<String, Object> health() {
Map<String, Object> result = new HashMap<>();
result.put("code", 200);
result.put("message", "服务正常");
result.put("timestamp", System.currentTimeMillis());
return result;
}
}