初次提交代码

This commit is contained in:
2026-01-11 15:33:22 +08:00
commit 6603c6f4a1
455 changed files with 32175 additions and 0 deletions
@@ -0,0 +1,4 @@
package com.Modules.Device;
public class deviceInfo {
}
@@ -0,0 +1,568 @@
package com.Modules.NormalData;
import cn.hutool.core.date.DateTime;
import com.common.entity.XdrHoneypot;
import com.common.mapper.XdrHoneypotMapper;
import com.common.service.SyslogNonNormalMessageService;
import com.common.util.*;
import com.config.AppConfig;
import org.apache.ibatis.session.SqlSession;
import org.springframework.beans.factory.annotation.Autowired;
import com.common.entity.SyslogMessage;
import com.influx.SyslogToInfluxApp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.common.service.DmNormalizeRuleService;
import com.common.service.DmColumnService;
import com.common.service.SyslogNormalDataService;
import com.common.entity.DmColumn;
import com.common.service.impl.DmColumnServiceImpl;
import com.common.mapper.DmColumnMapper;
import org.springframework.stereotype.Service;
import org.springframework.web.bind.annotation.RestController;
import com.common.mapper.DmColumnMapper;
import com.common.mapper.DmNormalizeRuleMapper;
import java.sql.Timestamp;
import java.util.*;
import org.springframework.stereotype.Component;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.json.JSONObject;
import com.common.entity.RuleContent.*;
import com.common.entity.SyslogNonNormalMessage;
import org.joda.time.LocalDateTime;
import com.common.service.LogDataFilterService;
import com.common.service.LogDataCompleteService;
import com.common.service.DeviceCollectTaskService;
import com.common.entity.DeviceCollectTask;
public class LogNormalProcessor {
private static final Logger logger = LoggerFactory.getLogger(LogNormalProcessor.class);
private String strLogMsg ;
private String strKafkaMessage;
private String strDeviceInfo;
private String strDataType ="json" ;
private Map<String, Object> messageMap ;
private Map<String, String> deviceInfoMap ;
private String strSyslogUUID ;
private String strSyslogTopic ;
private boolean isSaveNonNormal =false ;
@Autowired
public DmNormalizeRuleService dmNormalizeRuleService ;
@Autowired
public DmColumnService dmColumnService ;
@Autowired
public SyslogNormalDataService syslogNormalDataService =SpringContextUtil.getBean(SyslogNormalDataService.class);
@Autowired
private SyslogNonNormalMessageService messageService =SpringContextUtil.getBean(SyslogNonNormalMessageService.class);
@Autowired
private LogDataFilterService logDataFilterService= SpringContextUtil.getBean(LogDataFilterService.class);
@Autowired
private LogDataCompleteService logDataCompleteService= SpringContextUtil.getBean(LogDataCompleteService.class);
@Autowired
private DeviceCollectTaskService deviceCollectTaskService= SpringContextUtil.getBean(DeviceCollectTaskService.class);
@Autowired
SyslogNonNormalMessage syslogNonNormalMessage=new SyslogNonNormalMessage();
@Autowired
DmColumnMapper dmColumnMapper;
@Autowired
DmNormalizeRuleMapper dmNormalizeRuleMapper;
private static List<Map<String, Object>> dmNormalizeRuleList;
private static List<Map<String, Object>> dmColumnList;
private static LinkedHashMap<String, Object> OrginalColumnMap ;
public LogNormalProcessor( String LogMsg, String syslogUUID,String syslogTopic) {
/** 标准化数据处理步骤 */
//初始化 (获取syslog_normal_data 表全部字段的属性、设备ID对应的规则设置、转化成JSON)
//判断数据解析类型(json、键值、xml、正则表达式、分割符)解析syslogMessage 字段转成 HashMAP
//匹配规则内容,获取字段映射关系配置,抽取命中的字段名称、数值
//判断字段内容及类型,根据数据类型属性,进行内容转换。
//生成insert SQL语句、执行入库操作。
strKafkaMessage=LogMsg;
strSyslogUUID=syslogUUID;
strSyslogTopic=syslogTopic;
if(!LogMsg.isEmpty()) {
strLogMsg = SyslogParser.substringAfterFirstCloseBracket(LogMsg);
strDeviceInfo=SyslogParser.substringBeforeFirstChar(LogMsg,']');
}
else{
LogMsg="[receive_time=20251118165909470 device_id=1 device_name=honeypot vendor=changting data_type=json device_collect_id=1]<14>1 2025-09-24T11:52:26Z 5f46d3be75e1 supermario 128 honeypot_event - {\"source\":\"honeypot1\",\"id\":\"f6a13c35-bf9d-4da6-a181-50ce23e7ef6a\",\"start_time\":\"2023-09-03T11:07:02.50167643Z\",\"time\":\"2023-09-03T11:16:18.883885281Z\",\"risk_level\":4,\"connection\":\"b18f3fbe-3fbf-4495-815f-ff26f6fb0bdf\",\"file_info\":null,\"extra\":{\"payload\":{\"format\":\"line\",\"name\":{\"cn\":\"攻击载荷\",\"en\":\"payload\"},\"value\":\"\"},\"uid\":{\"format\":\"line\",\"name\":{\"cn\":\"\",\"en\":\"\"},\"uid\":\"b4cbc73c-25d0-4429-ae1b-a856cdf1a651\",\"value\":\"\"}},\"type\":\"WEB_ATTACK_SCANNER\",\"agent_sn\":\"caa7da42-0cca-4cb1-b501-1f1eb2b588d5\",\"agent_name\":\" 教育局蜜罐探针\",\"honeypot_id\":\"11a9ac6bdf38ae2aaa49ec4f1b4a921bff71952cb9f175bdd8ee1f0497057bc6\",\"honeypot_name\":\"茂名市中小学管理平台管理后台\",\"src_ip\":\"117.50.189.7\",\"src_port\":58512,\"src_mac\":\"\",\"dest_ip\":\"192.168.222.2\",\"dest_port\":9200,\"proxy_ip\":null,\"node\":\"WRx3\"}";
strLogMsg="<14>1 2025-09-24T11:52:26Z 5f46d3be75e1 supermario 128 honeypot_event - {\"source\":\"honeypot1\",\"id\":\"f6a13c35-bf9d-4da6-a181-50ce23e7ef6a\",\"start_time\":\"2023-09-03T11:07:02.50167643Z\",\"time\":\"2023-09-03T11:16:18.883885281Z\",\"risk_level\":4,\"connection\":\"b18f3fbe-3fbf-4495-815f-ff26f6fb0bdf\",\"file_info\":null,\"extra\":{\"payload\":{\"format\":\"line\",\"name\":{\"cn\":\"攻击载荷\",\"en\":\"payload\"},\"value\":\"\"},\"uid\":{\"format\":\"line\",\"name\":{\"cn\":\"\",\"en\":\"\"},\"uid\":\"b4cbc73c-25d0-4429-ae1b-a856cdf1a651\",\"value\":\"\"}},\"type\":\"WEB_ATTACK_SCANNER\",\"agent_sn\":\"caa7da42-0cca-4cb1-b501-1f1eb2b588d5\",\"agent_name\":\" 教育局蜜罐探针\",\"honeypot_id\":\"11a9ac6bdf38ae2aaa49ec4f1b4a921bff71952cb9f175bdd8ee1f0497057bc6\",\"honeypot_name\":\"茂名市中小学管理平台管理后台\",\"src_ip\":\"117.50.189.7\",\"src_port\":58512,\"src_mac\":\"\",\"dest_ip\":\"192.168.222.2\",\"dest_port\":9200,\"proxy_ip\":null,\"node\":\"WRx3\"}";
strDeviceInfo=SyslogParser.substringBeforeFirstChar(LogMsg,']');
}
}
/**
* 初始化数据init()
*/
public void init()
{
Map<String,String> mapdev =SyslogParser.parseKeyValuePairs(strDeviceInfo);
deviceInfoMap=mapdev;
//获取日志对应的设备ID
long deviceID=Long.parseLong(mapdev.get("device_id"));
System.out.println("device_id:"+deviceID );
strDataType=mapdev.get("data_type");
//getDeviceID(strLogMsg);
try{
// 通过工具类获取Service实例
dmColumnService= SpringContextUtil.getBean(DmColumnService.class);
dmNormalizeRuleService= SpringContextUtil.getBean(DmNormalizeRuleService.class);
if(deviceID>0) {
dmNormalizeRuleList = dmNormalizeRuleService.selectByDeviceIdAuto(deviceID);
}
//dmColumnList=dmColumnService.selectAllNormal();
//System.out.println("dmColumnList size:"+ dmColumnList.size());
//OrginalColumnMap=getMessageToMap(strLogMsg);
//解析SyslogMessage
//SyslogMessage logMsg = SyslogParser.parse(strLogMsg);
//设备对应的规则normal rule Map 进行标准化数据处理
for (int i = 0; i < dmNormalizeRuleList.size(); i++) {
try {
Map<String, Object> dmNormalizeRule = dmNormalizeRuleList.get(i);
String data_type = dmNormalizeRule.get("data_type").toString();
//数据类型及格式不符,则break;
System.out.println("索引: " + i + ", 值: " + dmNormalizeRule);
System.out.println("normalrule ID: " + dmNormalizeRule.get("id") + ", display_name:" + dmNormalizeRule.get("display_name") + " data_type:" + dmNormalizeRule.get("data_type"));
//数据类型不匹配,则跳过规则
//if (!data_type.equals(strDataType)) break;
//获取syslog message 文本解析配置项
OrginalColumnMap = getMessageToMap(dmNormalizeRule, strLogMsg, data_type);
if ((OrginalColumnMap == null) || (OrginalColumnMap.size()==0)) {
logger.error("OrginalColumnMap 对象获取为空");
//保存非标日志信息
syslogNonNormalMessage.setReason("Log解析异常");
syslogNonNormalMessage.setReasonDetail("log解析异常,返回规则名称:"+dmNormalizeRule.get("display_name") +",OrginalColumnMap 对象获取为空");
if(isSaveNonNormal==false)
SaveNonNormalMessage(deviceID, DateTime.now());
continue;
}
List<HashMap<String, Object>> destColumnList = getRuleContentMappers(dmNormalizeRule);
//解析字段匹配已命中的配置规则字段
List<HashMap<String, Object>> ruleColumnList = getNormalColumnList(OrginalColumnMap, destColumnList);
//System.out.println("ruleColumnList :"+ ruleColumnList);
Map<String, Object> destMap = getColumnMap(ruleColumnList);
//List<HashMap<String ,Object>> destMap2= getCompleteColumnsList(dmNormalizeRule);
//数据处理-过滤规则
if (logDataFilterService.evaluateFilterRule(dmNormalizeRule.get("rule_content").toString(), destMap))
continue;
//数据处理-补全规则
logDataCompleteService.processDataCompletion(dmNormalizeRule.get("rule_content").toString(), destMap);
//long ruleid=Long.parseLong(dmNormalizeRule.get("id").toString());
SaveNormalData(deviceID, DateTime.now(), destMap, Long.parseLong(dmNormalizeRule.get("id").toString()), dmNormalizeRule.get("name").toString());
} catch (Exception ex) {
logger.error("泛化规则处理失败:" + ex.getMessage());
System.out.println(ex.getMessage());
/**
//保存非标日志信息
syslogNonNormalMessage.setReason("泛化规则处理失败");
syslogNonNormalMessage.setReasonDetail("泛化规则处理失败,失败详情:"+ ex.getMessage());
SaveNonNormalMessage(deviceID,DateTime.now());
**/
}
}
}
catch (Exception ex)
{
logger.error("处理初始化异常:"+ex.getMessage());
System.out.println(ex.getMessage());
}
}
//获取数据泛化规则的列表字段,提取 Cropper_params
public Cropper_paramsType getCropperParams( Map<String, Object> dmNormalizeRule )
{
Cropper_paramsType cropperParams = new Cropper_paramsType();
try {
JSONObject jsonObject = new JSONObject( dmNormalizeRule.get("rule_content").toString());
if (jsonObject.isEmpty()) return null;
cropperParams.sethead_key(jsonObject.getJSONObject("cropper_params").get("head_key").toString());
//需要判断是否字符串head_offset是否空
cropperParams.sethead_offset(Integer.parseInt(jsonObject.getJSONObject("cropper_params").get("head_offset").toString()));
cropperParams.settail_key(jsonObject.getJSONObject("cropper_params").get("tail_key").toString());
//需要判断是否字符串tail_offset是否空
cropperParams.settail_offset( Integer.parseInt(jsonObject.getJSONObject("cropper_params").get("tail_offset").toString()));
return cropperParams;
}catch (Exception ex) {
logger.error("getCropperParams"+ex.getMessage());
return null;
}
}
public List<HashMap<String ,Object>> getRuleContentMappers( Map<String, Object> dmNormalizeRule )
{
List<HashMap<String, Object>> filedList =null;
ObjectMapper objectMapper = new ObjectMapper();
try {
// 转换为list Map
//Map<String, Object> rule_contentMap = objectMapper.readValue( dmNormalizeRule.get("rule_content").toString(), Map.class);
//if (rule_contentMap.isEmpty()) return null;
JSONObject jsonObject = new JSONObject( dmNormalizeRule.get("rule_content").toString());
if (jsonObject.isEmpty()) return null;
ObjectMapper objectMapper_mappers = new ObjectMapper();
filedList =readJsonToList( jsonObject.get("mappers").toString() );
}catch (Exception ex) {
logger.error("getRuleContentMappers异常:"+ex.getMessage());
}
return filedList;
}
public List<HashMap<String, Object>> getCompleteColumnsList( Map<String, Object> dmNormalizeRule )
{
List<HashMap<String, Object>> filedList =null;
ObjectMapper objectMapper = new ObjectMapper();
try {
// 转换为list Map
//Map<String, Object> rule_contentMap = objectMapper.readValue( dmNormalizeRule.get("rule_content").toString(), Map.class);
//if (rule_contentMap.isEmpty()) return null;
JSONObject jsonObject = new JSONObject( dmNormalizeRule.get("rule_content").toString());
if (jsonObject.isEmpty()) return null;
ObjectMapper objectMapper_mappers = new ObjectMapper();
filedList =readJsonToList( jsonObject.get("complete_columns").toString() );
}catch (Exception ex) {
logger.error("getCompleteColumnsList"+ex.getMessage());
}
return filedList;
}
/**
* 将JSON 字符串转换为 List<HashMap<String, Object>>
*/
public static List<HashMap<String, Object>> readJsonToList(String jsonStr) {
ObjectMapper objectMapper = new ObjectMapper();
try {
return objectMapper.readValue(
jsonStr,
new TypeReference<List<HashMap<String, Object>>>() {}
);
} catch (Exception e) {
logger.error("readJsonToList 解析失败:"+jsonStr);
//throw new RuntimeException("JSON 解析失败: " + e.getMessage(), e);
return null;
}
}
/**\
* 判断日志信息的数据类型
* @param logMsg
* @return
*/
public String getLogDataType( String logMsg)
{
//判断日志信息的数据类型(json、regex、kv、sep、xml
//默认返回json
return "json";
}
public int getDeviceID( String logMsg)
{
//解析日志信息头部,获取device_id
// 默认返回device_id =1
return 1;
}
//获取log 内容字段及数据值
public LinkedHashMap<String, Object> getMessageToMap( String MsgContent )
{
LinkedHashMap<String, Object> linkMap =null;
if(strDataType.equals("json")) {
//json 类型
//解析SyslogMessage
SyslogMessage msg = SyslogParser.parse(MsgContent);
System.out.println("解析结果-log content: " + msg.getMessage().toString());
String complexJson = "{\"source\":\"honeypot1\",\"id\":\"f6a13c35-bf9d-4da6-a181-50ce23e7ef6a\",\"start_time\":\"2023-09-03T11:07:02.50167643Z\",\"time\":\"2023-09-03T11:16:18.883885281Z\",\"risk_level\":4,\"connection\":\"b18f3fbe-3fbf-4495-815f-ff26f6fb0bdf\",\"file_info\":null,\"extra\":{\"payload\":{\"format\":\"line\",\"name\":{\"cn\":\"攻击载荷\",\"en\":\"payload\"},\"value\":\"\"},\"uid\":{\"format\":\"line\",\"name\":{\"cn\":\"\",\"en\":\"\"},\"uid\":\"b4cbc73c-25d0-4429-ae1b-a856cdf1a651\",\"value\":\"\"}},\"type\":\"WEB_ATTACK_SCANNER\",\"agent_sn\":\"caa7da42-0cca-4cb1-b501-1f1eb2b588d5\",\"agent_name\":\" 教育局蜜罐探针\",\"honeypot_id\":\"11a9ac6bdf38ae2aaa49ec4f1b4a921bff71952cb9f175bdd8ee1f0497057bc6\",\"honeypot_name\":\"茂名市中小学管理平台管理后台\",\"src_ip\":\"117.50.189.7\",\"src_port\":58512,\"src_mac\":\"\",\"dest_ip\":\"192.168.222.2\",\"dest_port\":9200,\"proxy_ip\":null,\"node\":\"WRx3\"}";
System.out.println("complexJson content: " + complexJson);
//静态字符串、编码有问题,临时用 静态字符串做测试,流程环境没问题
if(AppConfig.geRunEnvironment().equals("dev")) {
//LinkedHashMap<String, Object> flatMap = JsonParser.parseJsonToFlatMap(complexJson);
LinkedHashMap<String, Object> flatMap =JsonParser.jsonToMap(complexJson);
flatMap.forEach((key, value) -> System.out.println(key + " = " + value));
return flatMap;
}
else
{
//LinkedHashMap<String, Object> flatMap = JsonParser.parseJsonToFlatMap(msg.getMessage());
LinkedHashMap<String, Object> flatMap =JsonParser.jsonToMap(msg.getMessage());
flatMap.forEach((key, value) -> System.out.println(key + " = " + value));
return flatMap;
}
}
else if(strDataType.equals("kv")) //key-value 键值类型
{
return null;
}
else if(strDataType.equals("sep")) //分隔符
{
return null;
}
else if(strDataType.equals("xml")) //类型 xml
{
return null;
}
else if(strDataType.equals("regex")) //正则表达式
{
return null;
}
// List<Map<String, Object>> rulelst=dmNormalizeRuleService.selectByDeviceId((long)1);
return linkMap;
}
//获取log message内容字段及数据值
public LinkedHashMap<String, Object> getMessageToMap( Map<String, Object> dmNormalizeRule, String MsgContent , String dataType)
{
LinkedHashMap<String, Object> linkMap = new LinkedHashMap<>();
try {
//String decode =dmNormalizeRule.get("decode").toString();
Cropper_paramsType cropperParams = getCropperParams(dmNormalizeRule);
if (dataType.equals("json")) {
String strJson = logNormalData.ParserMessageJsonType(MsgContent, cropperParams);
LinkedHashMap<String, Object> result1 = NestedJsonParserUtil.safeParseJson(strJson);
LinkedHashMap<String, Object> flattened = NestedJsonUtils.flattenNestedJson(result1);
return flattened;
//parseJsonToFlatMap 复杂的json转换、解析过程有异常
//return JsonParser.parseJsonToFlatMap(strJson);
// return JsonParser.jsonToMap(strJson);
}
else if (dataType.equals("kv")) //key-value 键值类型
{
String strKeyVal = logNormalData.ParserMessageJsonType(MsgContent, cropperParams);
KvTextParser kvTextParser =new KvTextParser();
kv_paramsType kvparams=logNormalData.getkv_paramsType( dmNormalizeRule);
linkMap= kvTextParser.parseKvText(strKeyVal,kvparams);
return linkMap;
}
else if (dataType.equals("sep")) //分隔符
{
//获取分隔符
String SepKey=logNormalData.sepType(dmNormalizeRule);
String strSep = logNormalData.ParserMessageJsonType(MsgContent, cropperParams);
linkMap= TextParserUtil.parseSeparatedText(strSep, SepKey);
return linkMap;
}
else if (dataType.equals("xml")) //类型 xml
{
return linkMap;
}
else if (dataType.equals("regex")) //正则表达式
{
String strRegex = logNormalData.ParserMessageJsonType(MsgContent, cropperParams);
String regexp= logNormalData.Regexp(dmNormalizeRule);
linkMap= RegexTextParser.parseWithRegex(strRegex,regexp );
return linkMap;
}
}catch (Exception ex) {
logger.error("getMessageToMap"+ex.getMessage());
return null;
}
return linkMap;
}
/**
* 根据设备ID获取配置规则
* @param device_id
* @return List<Map<String, Object>>
*/
public List<Map<String, Object>> getRuleList( long device_id)
{
List<Map<String, Object>> rulelst=dmNormalizeRuleService.selectByDeviceId((long)1);
if (rulelst!=null)
{
System.out.println("rulelst: " + rulelst);
}
else{
logger.error(" List<Map<String, Object>> rulelst is null" );
}
return rulelst;
}
/**
* 查找命中配置规则的字段及数值
* @param destColumnList 泛化目标字段list
* @param destColumnList 日志源字段及数值list
* @return
*/
public List<HashMap<String,Object>> getNormalColumnList( LinkedHashMap<String, Object> orginColumnMap ,List<HashMap<String, Object>> destColumnList )
{
List<HashMap<String, Object>> columnlist =new ArrayList<>();
// 原始解析字段遍历泛化规则目标字段
for (Map.Entry<String, Object > entry : orginColumnMap.entrySet()) {
//System.out.println(entry.getKey() + ": " + entry.getValue());
for (Map<String, Object> map : destColumnList) {
//System.out.println( "origin_field: " + map.get("origin_field").toString());
if ( map.get("origin_field").toString().equals(entry.getKey()) ) {
System.out.println(map);
Map<String, Object> normalColumMap =new LinkedHashMap<>();
normalColumMap.put("origin_field",entry.getKey() );
normalColumMap.put("dest_field",map.get("dest_field").toString());
normalColumMap.put("action",(HashMap<String, Object>)map.get("action") );
//normalColumMap.put("mapping ",entry.getKey() );
normalColumMap.put("origin_field_value",entry.getValue() );
// System.out.println("action: " + map.get("action").toString());
if( ((HashMap<String, Object>)map.get("action")).get("type").equals("equal"))
//直接赋值
normalColumMap.put("dest_field_value",entry.getValue() );
else if(((HashMap<String, Object>)map.get("action")).get("type").equals("mapping"))
{
//mapping 映射枚举值
normalColumMap.put("dest_field_value",entry.getValue() );
normalColumMap.put("action_param",((HashMap<String, Object>)map.get("action")).get("param") );
HashMap<String, Object> action_param=(HashMap<String, Object>)((HashMap<String, Object>)map.get("action")).get("param") ;
//匹配并获取映射枚举值
normalColumMap.put("dest_field_value",getMappingValue(action_param ,entry.getValue().toString() ));
}
else if(((HashMap<String, Object>)map.get("action")).get("type").equals("time\""))
{
//time 类型
normalColumMap.put("dest_field_value",entry.getValue() );
}
columnlist.add((HashMap<String, Object>)normalColumMap);
//System.out.println( "normalColumMap: " +normalColumMap);
break;
}
}
}
return columnlist;
}
/**
* paramMapping Map匹配查找对应的值
* @param paramMappingValueMap
* @param
* @return
*/
public Object getMappingValue(HashMap<String, Object> paramMappingValueMap,String value)
{
HashMap<String, Object> Map= (HashMap<String, Object>)paramMappingValueMap.get("mapping");
for (Map.Entry<String, Object > entry : Map.entrySet()) {
if ( entry.getKey().equals(value) ) {
return entry.getValue();
}
}
return null;
}
/**
* 获取字段Map,包含字段field及Value值
* @param normalColumnList
* @return
*/
public Map<String, Object > getColumnMap( List<HashMap<String,Object>> normalColumnList )
{
Map<String, Object > columnMap= new HashMap<>();
for (Map<String, Object> map : normalColumnList) {
columnMap.put(map.get("dest_field").toString(),map.get("dest_field_value"));
}
return columnMap;
}
/**
* 保存数据到标准化表
* @param deviceId
* @param logtime
* @param logColumnMap
*/
public void SaveNormalData(long deviceId , DateTime logtime, Map<String, Object > logColumnMap, long normalizeRuleId ,String normalizeRuleName)
{
try {
if(logColumnMap.isEmpty() )
{
logger.error("SaveNormalData ->logColumnMap 为空,syslogUUID:" +this.strSyslogUUID);
//保存非标日志信息
syslogNonNormalMessage.setReason("未命中规则");
syslogNonNormalMessage.setReasonDetail("失败详情:logColumnMap对象字段为空" );
if(isSaveNonNormal==false)
SaveNonNormalMessage(deviceId,DateTime.now());
return ;
}
Map<String, Object> columnMap = logColumnMap;
columnMap.put("device_id", deviceId);
columnMap.put("log_time", logtime);
columnMap.put("id", UUID.randomUUID().toString());
columnMap.put("normalize_rule_id", normalizeRuleId);
columnMap.put("normalize_rule_name", normalizeRuleName);
columnMap.put("syslog_uuid", this.strSyslogUUID);
columnMap.put("syslog_topic", this.strSyslogTopic);
System.out.println("columnMap"+columnMap);
syslogNormalDataService.insertDynamic(columnMap);
} catch (Exception e) {
logger.error("SaveNormalData失败 " );
//保存非标日志信息
syslogNonNormalMessage.setReason("入库失败");
syslogNonNormalMessage.setReasonDetail("入库失败,normalizeRuleName :"+normalizeRuleName+",失败详情:"+ e.getMessage());
if(isSaveNonNormal==false)
SaveNonNormalMessage(deviceId,DateTime.now());
throw new RuntimeException("SaveNormalData 失败: " + e.getMessage(), e);
}
}
public void SaveNonNormalMessage(long deviceId , DateTime logtime)
{
try {
SyslogNonNormalMessage normalMessage =new SyslogNonNormalMessage() ;
//获取日志对应的设备ID
Integer collect_id= Integer.parseInt(this.deviceInfoMap.get("device_collect_id"));
DeviceCollectTask deviceCollectTask=deviceCollectTaskService.getById(collect_id);
normalMessage.setId( UUID.randomUUID().toString());
normalMessage.setDeviceId((int)deviceId);
normalMessage.setLogTime(logtime.toLocalDateTime());
normalMessage.setRuleTime(logtime.toLocalDateTime());
normalMessage.setSyslogMessage(this.strLogMsg);
normalMessage.setSyslogTopic(this.strSyslogTopic);
normalMessage.setSyslogUuid(this.strSyslogUUID);
normalMessage.setHeaderMessage(this.strDeviceInfo);
normalMessage.setEtlNode("etlgo");
normalMessage.setReason( syslogNonNormalMessage.getReason());
normalMessage.setReasonDetail( syslogNonNormalMessage.getReasonDetail());
//采集探针名称
normalMessage.setRuleResult("FAIL");
normalMessage.setDeviceName(deviceInfoMap.get("device_name"));
normalMessage.setCollectTaskId(collect_id);
if(deviceCollectTask!=null)
normalMessage.setCollectTaskName(deviceCollectTask.getTaskName());
this.messageService.saveMessage(normalMessage );
isSaveNonNormal=true;
} catch (Exception e) {
logger.error("SaveNonNormalMessage 失败:" );
throw new RuntimeException("SaveNonNormalMessage 失败: " + e.getMessage(), e);
}
}
public static void main(String[] args) {
String strlogMessage= "<14>1 2025-09-24T11:52:26Z 5f46d3be75e1 supermario 128 honeypot_event - {\"source\":\"honeypot1\",\"id\":\"f6a13c35-bf9d-4da6-a181-50ce23e7ef6a\",\"start_time\":\"2023-09-03T11:07:02.50167643Z\",\"time\":\"2023-09-03T11:16:18.883885281Z\",\"risk_level\":4,\"connection\":\"b18f3fbe-3fbf-4495-815f-ff26f6fb0bdf\",\"file_info\":null,\"extra\":{\"payload\":{\"format\":\"line\",\"name\":{\"cn\":\"攻击载荷\",\"en\":\"payload\"},\"value\":\"\"},\"uid\":{\"format\":\"line\",\"name\":{\"cn\":\"\",\"en\":\"\"},\"uid\":\"b4cbc73c-25d0-4429-ae1b-a856cdf1a651\",\"value\":\"\"}},\"type\":\"WEB_ATTACK_SCANNER\",\"agent_sn\":\"caa7da42-0cca-4cb1-b501-1f1eb2b588d5\",\"agent_name\":\" 教育局蜜罐探针\",\"honeypot_id\":\"11a9ac6bdf38ae2aaa49ec4f1b4a921bff71952cb9f175bdd8ee1f0497057bc6\",\"honeypot_name\":\"茂名市中小学管理平台管理后台\",\"src_ip\":\"117.50.189.7\",\"src_port\":58512,\"src_mac\":\"\",\"dest_ip\":\"192.168.222.2\",\"dest_port\":9200,\"proxy_ip\":null,\"node\":\"WRx3\"}";
String strMsgContent= "{\"source\":\"honeypot1\",\"id\":\"f6a13c35-bf9d-4da6-a181-50ce23e7ef6a\",\"start_time\":\"2023-09-03T11:07:02.50167643Z\",\"time\":\"2023-09-03T11:16:18.883885281Z\",\"risk_level\":4,\"connection\":\"b18f3fbe-3fbf-4495-815f-ff26f6fb0bdf\",\"file_info\":null,\"extra\":{\"payload\":{\"format\":\"line\",\"name\":{\"cn\":\"攻击载荷\",\"en\":\"payload\"},\"value\":\"\"},\"uid\":{\"format\":\"line\",\"name\":{\"cn\":\"\",\"en\":\"\"},\"uid\":\"b4cbc73c-25d0-4429-ae1b-a856cdf1a651\",\"value\":\"\"}},\"type\":\"WEB_ATTACK_SCANNER\",\"agent_sn\":\"caa7da42-0cca-4cb1-b501-1f1eb2b588d5\",\"agent_name\":\" 教育局蜜罐探针\",\"honeypot_id\":\"11a9ac6bdf38ae2aaa49ec4f1b4a921bff71952cb9f175bdd8ee1f0497057bc6\",\"honeypot_name\":\"茂名市中小学管理平台管理后台\",\"src_ip\":\"117.50.189.7\",\"src_port\":58512,\"src_mac\":\"\",\"dest_ip\":\"192.168.222.2\",\"dest_port\":9200,\"proxy_ip\":null,\"node\":\"WRx3\"}";
//Map<String, Object> flatMap =(new logNormalData()).getMessageToMap(strMsgContent);
//flatMap.forEach((key, value) -> System.out.println(key + " = " + value));
//LogNormalProcessor logData =new LogNormalProcessor(strlogMessage,);
//logData.init();
}
}
@@ -0,0 +1,404 @@
package com.Modules.NormalData;
import com.influx.InfluxDBClient;
import com.common.util.SyslogParser;
import com.config.AppConfig;
import com.influxdb.client.domain.WritePrecision;
import com.influxdb.client.write.Point;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.time.LocalDate;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
@Slf4j
@Component
public class SysLogProcessor {
@Autowired
@Qualifier("logNormalProcessorExecutor")
private Executor taskExecutor;
@Value("${app.processor.batch-size:100}")
private int batchSize;
@Value("${app.processor.process-timeout-ms:30000}")
private long processTimeoutMs;
private final AtomicInteger totalProcessed = new AtomicInteger(0);
private final AtomicInteger currentBatchCount = new AtomicInteger(0);
// 初始化 InfluxDB 客户端
private final com.influx.InfluxDBClient influxClient = new InfluxDBClient();
/**
* 方案一:直接多线程并行处理(推荐)
* 单线程消费,每条消息独立提交给线程池处理
*/
public void listen(List<String> records, Acknowledgment ack) {
try {
// 处理业务逻辑
for (String record : records) {
System.out.println(record);
}
// 手动提交偏移量
ack.acknowledge(); // 告诉Kafka这条消息已成功处理
} catch (Exception e) {
// 不提交偏移量,消息会被重新消费
}
}
@KafkaListener(
topics = "${spring.kafka.consumer.topic:agent-syslog-topic}",
concurrency = "2" // 多线程消费
)
public void consumeAndParallelProcess(List<ConsumerRecord<String, String>> records, Acknowledgment ack) {
if (records.isEmpty()) {
ack.acknowledge();
return;
}
log.info("开始处理批次消息,数量: {}", records.size());
currentBatchCount.set(records.size());
try {
List<CompletableFuture<Void>> processingFutures = new ArrayList<>();
// 为每条消息创建独立的处理任务
for (ConsumerRecord<String, String> record : records) {
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
try {
// 异步处理单条消息
log.info("收到syslogmessage"+ record.value());
processSingleMessageAsync(record);
} catch (Exception e) {
log.error("处理消息失败, topic: {}, partition: {}, offset: {}",
record.topic(), record.partition(), record.offset(), e);
} finally {
// 计数器递减
if (currentBatchCount.decrementAndGet() == 0) {
log.debug("批次所有消息处理完成");
}
}
}, taskExecutor);
processingFutures.add(future);
}
// 等待所有处理任务完成(带超时)
CompletableFuture<Void> allFutures = CompletableFuture.allOf(
processingFutures.toArray(new CompletableFuture[0])
);
try {
allFutures.get(processTimeoutMs, TimeUnit.MILLISECONDS);
log.info("批次处理完成,总数: {}", records.size());
} catch (java.util.concurrent.TimeoutException e) {
log.warn("批次处理超时,已处理: {}/{}",
records.size() - currentBatchCount.get(), records.size());
}
// 提交偏移量
ack.acknowledge();
totalProcessed.addAndGet(records.size());
} catch (Exception e) {
log.error("消费处理异常", e);
ack.acknowledge(); // 异常情况下也提交偏移量,避免阻塞
}
}
/**
* 方案二:分批多线程处理
* 将大批次拆分为小批次进行并行处理
*/
/**
@KafkaListener(
topics = "${kafka.topic.log-normal-batch:log-normal-topic-batch}",
concurrency = "1"
)
*/
public void consumeAndBatchProcess(List<ConsumerRecord<String, String>> records,
Acknowledgment ack) {
if (records.isEmpty()) {
ack.acknowledge();
return;
}
log.info("开始分批处理消息,总数: {}", records.size());
try {
List<CompletableFuture<Void>> batchFutures = new ArrayList<>();
// 将消息列表拆分为多个批次进行并行处理
for (int i = 0; i < records.size(); i += batchSize) {
int end = Math.min(i + batchSize, records.size());
final List<ConsumerRecord<String, String>> batch = records.subList(i, end);
final int batchIndex = i / batchSize + 1;
CompletableFuture<Void> batchFuture = CompletableFuture.runAsync(() -> {
try {
processMessageBatch(batch, batchIndex);
} catch (Exception e) {
log.error("批次处理异常, 批次: {}", batchIndex, e);
}
}, taskExecutor);
batchFutures.add(batchFuture);
}
// 等待所有批次处理完成
CompletableFuture<Void> allBatches = CompletableFuture.allOf(
batchFutures.toArray(new CompletableFuture[0])
);
allBatches.get(processTimeoutMs, TimeUnit.MILLISECONDS);
// 提交偏移量
ack.acknowledge();
log.info("全部分批处理完成,总数: {}", records.size());
totalProcessed.addAndGet(records.size());
} catch (Exception e) {
log.error("分批消费处理异常", e);
ack.acknowledge();
}
}
/**
* 方案三:带优先级的处理
* 根据消息内容决定处理优先级
*/
/**
@KafkaListener(
topics = "${kafka.topic.log-normal-priority:log-normal-topic-priority}",
concurrency = "1"
)
*/
public void consumeWithPriority(List<ConsumerRecord<String, String>> records,
Acknowledgment ack) {
if (records.isEmpty()) {
ack.acknowledge();
return;
}
log.info("开始优先级处理消息,数量: {}", records.size());
try {
List<CompletableFuture<Void>> highPriorityFutures = new ArrayList<>();
List<CompletableFuture<Void>> normalPriorityFutures = new ArrayList<>();
// 根据消息内容分类处理
for (ConsumerRecord<String, String> record : records) {
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
processSingleMessageAsync(record);
}, taskExecutor);
// 根据消息内容判断优先级(这里简单根据key判断)
if (isHighPriorityMessage(record)) {
highPriorityFutures.add(future);
} else {
normalPriorityFutures.add(future);
}
}
// 优先处理高优先级任务
if (!highPriorityFutures.isEmpty()) {
CompletableFuture.allOf(
highPriorityFutures.toArray(new CompletableFuture[0])
).get(processTimeoutMs / 2, TimeUnit.MILLISECONDS);
log.info("高优先级消息处理完成: {}", highPriorityFutures.size());
}
// 然后处理普通优先级任务
if (!normalPriorityFutures.isEmpty()) {
CompletableFuture.allOf(
normalPriorityFutures.toArray(new CompletableFuture[0])
).get(processTimeoutMs / 2, TimeUnit.MILLISECONDS);
log.info("普通优先级消息处理完成: {}", normalPriorityFutures.size());
}
ack.acknowledge();
totalProcessed.addAndGet(records.size());
} catch (Exception e) {
log.error("优先级消费处理异常", e);
ack.acknowledge();
}
}
/**
* 异步处理单条消息
*/
@Async("logNormalProcessorExecutor")
public void processSingleMessageAsync(ConsumerRecord<String, String> record) {
long startTime = System.currentTimeMillis();
try {
// 模拟消息解析
//String message = parseMessage(record);
// 模拟业务处理
//processBusinessLogic(message);
String sysLogUUID =getSysLogUUID();
String strDeviceInfo= SyslogParser.substringBeforeFirstChar(record.value(),']');
Map<String,String> mapdev =SyslogParser.parseKeyValuePairs(strDeviceInfo);
// 初始化 InfluxDB 客户端
Point point = Point.measurement("syslog_security_"+sysLogUUID.substring(0,8) )
.addTag("device_id", mapdev.get("device_id")) // 添加设备ID标签
.addTag("device_collect_id", mapdev.get("device_collect_id")) // 添加探针ID标签
.addTag("uuid", sysLogUUID) //syslog uuid
.addTag("topic", AppConfig.getTopic()) //kafka topic
.addField("message", record.value()) // 添加字段
.addField("receive_time", mapdev.get("receive_time")) // 添加字段
.addField("uuid", sysLogUUID)
.time(System.currentTimeMillis(), WritePrecision.MS) ;// 毫秒级时间戳
influxClient.writePointBlocking(point);
System.out.println("influxdb wirte syslog ,value:"+ record.key());
// 日志信息插入pg XdrHoneypot 表
//insertSingleRecord( record.value());
//String syslogMessage= AppConfig.geRunEnvironment().equals("test")? record.value().substring(34) : record.value();
String syslogMessage= record.value();
//剔除测试环境本机syslog新增的头部信息
LogNormalProcessor logNormalProcessor = new LogNormalProcessor(syslogMessage,sysLogUUID,AppConfig.getTopic());
//LogNormalProcessor logNormalProcessor =new LogNormalProcessor(record.value());
logNormalProcessor.init();
System.out.println("insert postgres syslog ,value:"+ record.key());
long costTime = System.currentTimeMillis() - startTime;
log.debug("消息处理完成, offset: {}, 耗时: {}ms", record.offset(), costTime);
} catch (Exception e) {
log.error("异步处理消息失败, offset: {}", record.offset(), e);
// 这里可以添加重试逻辑或死信队列处理
}
}
/**
* 处理消息批次
*/
private void processMessageBatch(List<ConsumerRecord<String, String>> batch, int batchIndex) {
long startTime = System.currentTimeMillis();
log.debug("开始处理批次: {}, 数量: {}", batchIndex, batch.size());
try {
List<CompletableFuture<Void>> futures = new ArrayList<>();
// 批次内并行处理
for (ConsumerRecord<String, String> record : batch) {
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
processSingleMessageAsync(record);
}, taskExecutor);
futures.add(future);
}
// 等待批次内所有任务完成
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.get(processTimeoutMs, TimeUnit.MILLISECONDS);
long costTime = System.currentTimeMillis() - startTime;
log.info("批次处理完成: {}, 数量: {}, 总耗时: {}ms",
batchIndex, batch.size(), costTime);
} catch (Exception e) {
log.error("批次处理异常: {}", batchIndex, e);
}
}
/**
* 判断是否为高优先级消息
*/
private boolean isHighPriorityMessage(ConsumerRecord<String, String> record) {
// 简单的优先级判断逻辑
// 可以根据key、header、或消息内容来判断
return record.key() != null &&
(record.key().contains("ERROR") || record.key().contains("URGENT"));
}
/**
* 消息解析(模拟)
*/
private String parseMessage(ConsumerRecord<String, String> record) {
// 模拟解析耗时
try {
Thread.sleep(1);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return record.value();
}
/**
* 业务逻辑处理(模拟)
*/
private void processBusinessLogic(String message) {
// 模拟业务处理耗时
try {
Thread.sleep(5); // 模拟5ms处理时间
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
// 这里可以添加具体的业务逻辑
// 例如:数据转换、验证、丰富等
}
/**
* 获取处理统计信息
*/
public ProcessingStats getProcessingStats() {
return new ProcessingStats(
totalProcessed.get(),
currentBatchCount.get()
);
}
/**
* 处理统计信息类
*/
public static class ProcessingStats {
private final int totalProcessed;
private final int currentBatchCount;
public ProcessingStats(int totalProcessed, int currentBatchCount) {
this.totalProcessed = totalProcessed;
this.currentBatchCount = currentBatchCount;
}
public int getTotalProcessed() { return totalProcessed; }
public int getCurrentBatchCount() { return currentBatchCount; }
}
/**
* 获取日志信息UUID,格式: yyyyMMddxxxxxxxx
* @return
*/
private static String getSysLogUUID ()
{
// 获取当前日期
LocalDate currentDate = LocalDate.now();
// 定义格式 (yyyyMMdd)
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyyMMdd");
// 格式化日期
String formattedDate = currentDate.format(formatter);
return formattedDate +"-"+ UUID.randomUUID() ;
}
}
@@ -0,0 +1,163 @@
package com.Modules.NormalData;
import com.common.entity.RuleContent.Cropper_paramsType;
import org.json.JSONObject;
import org.springframework.beans.factory.annotation.Autowired;
import com.common.entity.SyslogMessage;
import com.common.util.JsonParser;
import com.common.util.SyslogParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.common.service.DmNormalizeRuleService;
import com.common.service.DmColumnService;
import java.util.List;
import java.util.Map;
import com.common.util.StringExtractorUtil;
import com.common.entity.RuleContent.kv_paramsType;
public class logNormalData {
private static final Logger logger = LoggerFactory.getLogger(logNormalData.class);
private String strLogMsg ;
private Map<String, Object> messageMap ;
@Autowired
public DmNormalizeRuleService dmNormalizeRuleService =new DmNormalizeRuleService();
@Autowired
public DmColumnService dmColumnService ;
public logNormalData( String LogMsg) {
strLogMsg=LogMsg;
/** 处理步骤 */
//初始化 (获取syslog_normal_data 表全部字段的属性、设备ID对应的规则设置、转化成JSON)
//判断数据解析类型(json、键值、xml、正则表达式、分割符)解析syslogMessage 字段转成 HashMAP
//匹配规则内容,获取字段映射关系配置,抽取命中的字段名称、数值
//判断字段内容及类型,根据数据类型属性,进行内容转换。
//生成insert SQL语句、执行入库操作。
}
public Map<String, Object> getMessageToMap( String MsgContent)
{
//解析SyslogMessage
SyslogMessage msg = SyslogParser.parse(MsgContent);
System.out.println("解析结果-log content: " + msg.getMessage().toString());
Map<String, Object> flatMap = JsonParser.parseJsonToFlatMap(msg.getMessage().toString());
flatMap.forEach((key, value) -> System.out.println(key + " = " + value));
List<Map<String, Object>> rulelst=dmNormalizeRuleService.selectByDeviceId((long)1);
return flatMap;
}
public List<Map<String, Object>> getRuleList( long device_id)
{
List<Map<String, Object>> rulelst=dmNormalizeRuleService.selectByDeviceId((long)1);
if (rulelst!=null)
{
System.out.println("rulelst: " + rulelst);
}
return rulelst;
}
/**
* 解析数据项内容,Json格式类型
* @param MsgContent
* @param cropperParams
* @return
*/
public static String ParserMessageJsonType(String MsgContent , Cropper_paramsType cropperParams)
{
System.out.println("ParserMessageJsonType MsgContent:" +MsgContent );
if(cropperParams ==null) {
return removeBOM(JsonParser.extractJsonFromLogMessage(MsgContent));
}
String strJson ="";
if(cropperParams.gettail_key().equals("") && cropperParams.gettail_offset()==0 ) {
strJson = StringExtractorUtil.extractFromStartToEnd(MsgContent, cropperParams.gethead_key(), cropperParams.gethead_offset());
System.out.println("ParserMessageJsonType extractFromStartToEnd strJson" +strJson );
}
else {
strJson= StringExtractorUtil.extract(MsgContent,cropperParams.gethead_key(),cropperParams.gethead_offset(),cropperParams.gettail_key(),cropperParams.gettail_offset() );
//return removeBOM(MsgContent).trim();
}
return removeBOM(strJson).trim();
}
public static String removeBOM(String s) {
if (s != null && s.startsWith("\uFEFF")) {
return s.substring(1);
}
return s;
}
//获取数据泛化规则的rule_content列表字段,kv类型提取 kv_params 节点值
public static kv_paramsType getkv_paramsType( Map<String, Object> dmNormalizeRule )
{
kv_paramsType kvParams = new kv_paramsType();
try {
JSONObject jsonObject = new JSONObject( dmNormalizeRule.get("rule_content").toString());
if (jsonObject.isEmpty()) return null;
kvParams.setInternal(jsonObject.getJSONObject("kv_params").get("internal").toString());
kvParams.setExternal(jsonObject.getJSONObject("kv_params").get("external").toString());
kvParams.setL_trim(jsonObject.getJSONObject("kv_params").get("l_trim").toString());
kvParams.setR_trim( jsonObject.getJSONObject("kv_params").get("r_trim").toString());
return kvParams;
}catch (Exception ex) {
logger.error("getkv_paramsType"+ex.getMessage());
return null;
}
}
//获取数据泛化规则的rule_content列表字段,sep类型提取 sep 节点值(分隔符)
public static String sepType( Map<String, Object> dmNormalizeRule )
{
try {
JSONObject jsonObject = new JSONObject( dmNormalizeRule.get("rule_content").toString());
if (jsonObject.isEmpty()) return null;
return jsonObject.get("sep").toString();
}catch (Exception ex) {
logger.error("regexp"+ex.getMessage());
return null;
}
}
//获取数据泛化规则的rule_content列表字段,regex类型提取 regexp 节点值(分隔符)
public static String Regexp ( Map <String, Object> dmNormalizeRule )
{
try {
JSONObject jsonObject = new JSONObject( dmNormalizeRule.get("rule_content").toString());
if (jsonObject.isEmpty()) return null;
return jsonObject.get("regexp").toString();
}catch (Exception ex) {
logger.error("sepType"+ex.getMessage());
return null;
}
}
public static void main(String[] args) {
String strlogMessage= "<14>1 2025-09-24T11:52:26Z 5f46d3be75e1 supermario 128 honeypot_event - {\"source\":\"honeypot1\",\"id\":\"f6a13c35-bf9d-4da6-a181-50ce23e7ef6a\",\"start_time\":\"2023-09-03T11:07:02.50167643Z\",\"time\":\"2023-09-03T11:16:18.883885281Z\",\"risk_level\":4,\"connection\":\"b18f3fbe-3fbf-4495-815f-ff26f6fb0bdf\",\"file_info\":null,\"extra\":{\"payload\":{\"format\":\"line\",\"name\":{\"cn\":\"攻击载荷\",\"en\":\"payload\"},\"value\":\"\"},\"uid\":{\"format\":\"line\",\"name\":{\"cn\":\"\",\"en\":\"\"},\"uid\":\"b4cbc73c-25d0-4429-ae1b-a856cdf1a651\",\"value\":\"\"}},\"type\":\"WEB_ATTACK_SCANNER\",\"agent_sn\":\"caa7da42-0cca-4cb1-b501-1f1eb2b588d5\",\"agent_name\":\" 教育局蜜罐探针\",\"honeypot_id\":\"11a9ac6bdf38ae2aaa49ec4f1b4a921bff71952cb9f175bdd8ee1f0497057bc6\",\"honeypot_name\":\"茂名市中小学管理平台管理后台\",\"src_ip\":\"117.50.189.7\",\"src_port\":58512,\"src_mac\":\"\",\"dest_ip\":\"192.168.222.2\",\"dest_port\":9200,\"proxy_ip\":null,\"node\":\"WRx3\"}";
String strMsgContent= "{\"source\":\"honeypot1\",\"id\":\"f6a13c35-bf9d-4da6-a181-50ce23e7ef6a\",\"start_time\":\"2023-09-03T11:07:02.50167643Z\",\"time\":\"2023-09-03T11:16:18.883885281Z\",\"risk_level\":4,\"connection\":\"b18f3fbe-3fbf-4495-815f-ff26f6fb0bdf\",\"file_info\":null,\"extra\":{\"payload\":{\"format\":\"line\",\"name\":{\"cn\":\"攻击载荷\",\"en\":\"payload\"},\"value\":\"\"},\"uid\":{\"format\":\"line\",\"name\":{\"cn\":\"\",\"en\":\"\"},\"uid\":\"b4cbc73c-25d0-4429-ae1b-a856cdf1a651\",\"value\":\"\"}},\"type\":\"WEB_ATTACK_SCANNER\",\"agent_sn\":\"caa7da42-0cca-4cb1-b501-1f1eb2b588d5\",\"agent_name\":\" 教育局蜜罐探针\",\"honeypot_id\":\"11a9ac6bdf38ae2aaa49ec4f1b4a921bff71952cb9f175bdd8ee1f0497057bc6\",\"honeypot_name\":\"茂名市中小学管理平台管理后台\",\"src_ip\":\"117.50.189.7\",\"src_port\":58512,\"src_mac\":\"\",\"dest_ip\":\"192.168.222.2\",\"dest_port\":9200,\"proxy_ip\":null,\"node\":\"WRx3\"}";
//Map<String, Object> flatMap =(new logNormalData()).getMessageToMap(strMsgContent);
//flatMap.forEach((key, value) -> System.out.println(key + " = " + value));
logNormalData logData =new logNormalData( strlogMessage);
Cropper_paramsType cropperParams =new Cropper_paramsType();
cropperParams.settail_offset(0);
cropperParams.settail_key("");
cropperParams.sethead_key("{");
cropperParams.sethead_offset(-1);
String str= ParserMessageJsonType(strlogMessage,cropperParams);
System.out.println("parser msg:"+str);
//List<Map<String, Object>> rulelst= logData.getRuleList(1);
}
}
@@ -0,0 +1,77 @@
package com.Modules.etl;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
public class TimeWindowCalculator {
/**
* 获取前一个5分钟完整时间周期的开始时间和结束时间
* @param currentTime 当前任务调度时间
* @return 包含开始时间和结束时间的数组,[0]=开始时间,[1]=结束时间
*/
public static LocalDateTime[] getPrevious5MinuteWindow(LocalDateTime currentTime) {
// 获取当前时间的分钟数
int currentMinute = currentTime.getMinute();
// 计算前一个5分钟周期的结束分钟数(向下取整到5的倍数)
int previousWindowEndMinute = (currentMinute / 5) * 5;
// 构建前一个周期的结束时间
LocalDateTime previousWindowEnd = currentTime
.withMinute(previousWindowEndMinute)
.withSecond(0)
.withNano(0);
// 前一个周期的开始时间是结束时间减去5分钟
LocalDateTime previousWindowStart = previousWindowEnd.minusMinutes(5);
return new LocalDateTime[]{previousWindowStart, previousWindowEnd};
}
/**
* 重载方法:使用当前系统时间
*/
public static LocalDateTime[] getPrevious5MinuteWindow() {
return getPrevious5MinuteWindow(LocalDateTime.now());
}
public static void main(String[] args) {
// 测试用例
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
// 测试1: 15:06:00
LocalDateTime testTime1 = LocalDateTime.of(2025, 11, 18, 15, 7, 15);
LocalDateTime[] window1 = getPrevious5MinuteWindow(testTime1);
System.out.println("当前时间: " + testTime1.format(formatter));
System.out.println("前一个5分钟周期: " +
window1[0].format(formatter) + " - " +
window1[1].format(formatter));
System.out.println();
// 测试2: 15:10:00
LocalDateTime testTime2 = LocalDateTime.of(2025, 11, 18, 15, 12, 20);
LocalDateTime[] window2 = getPrevious5MinuteWindow(testTime2);
System.out.println("当前时间: " + testTime2.format(formatter));
System.out.println("前一个5分钟周期: " +
window2[0].format(formatter) + " - " +
window2[1].format(formatter));
System.out.println();
// 测试3: 整点情况 16:00:00
LocalDateTime testTime3 = LocalDateTime.of(2025, 11, 18, 16, 0, 15);
LocalDateTime[] window3 = getPrevious5MinuteWindow(testTime3);
System.out.println("当前时间: " + testTime3.format(formatter));
System.out.println("前一个5分钟周期: " +
window3[0].format(formatter) + " - " +
window3[1].format(formatter));
System.out.println();
// 测试4: 使用当前系统时间
LocalDateTime[] currentWindow = getPrevious5MinuteWindow();
System.out.println("当前系统时间的前一个5分钟周期: " +
currentWindow[0].format(formatter) + " - " +
currentWindow[1].format(formatter));
}
}
@@ -0,0 +1,47 @@
package com.Modules.etl.handler;
import org.apache.ibatis.type.BaseTypeHandler;
import org.apache.ibatis.type.JdbcType;
import org.apache.ibatis.type.MappedJdbcTypes;
import org.apache.ibatis.type.MappedTypes;
import java.sql.*;
@MappedTypes(byte[][].class)
@MappedJdbcTypes(JdbcType.ARRAY)
public class ArrayByteTypeHandler extends BaseTypeHandler<byte[][]> {
@Override
public void setNonNullParameter(PreparedStatement ps, int i, byte[][] parameter, JdbcType jdbcType) throws SQLException {
Array array = ps.getConnection().createArrayOf("bytea", parameter);
ps.setArray(i, array);
}
@Override
public byte[][] getNullableResult(ResultSet rs, String columnName) throws SQLException {
return getArray(rs.getArray(columnName));
}
@Override
public byte[][] getNullableResult(ResultSet rs, int columnIndex) throws SQLException {
return getArray(rs.getArray(columnIndex));
}
@Override
public byte[][] getNullableResult(CallableStatement cs, int columnIndex) throws SQLException {
return getArray(cs.getArray(columnIndex));
}
private byte[][] getArray(Array array) throws SQLException {
if (array != null) {
Object[] objArray = (Object[]) array.getArray();
byte[][] result = new byte[objArray.length][];
for (int i = 0; i < objArray.length; i++) {
result[i] = (byte[]) objArray[i];
}
return result;
}
return null;
}
}
@@ -0,0 +1,43 @@
package com.Modules.etl.handler;
import org.apache.ibatis.type.BaseTypeHandler;
import org.apache.ibatis.type.JdbcType;
import org.apache.ibatis.type.MappedJdbcTypes;
import org.apache.ibatis.type.MappedTypes;
import java.sql.*;
import java.util.Arrays;
@MappedTypes(Integer[].class)
@MappedJdbcTypes(JdbcType.ARRAY)
public class ArrayIntegerTypeHandler extends BaseTypeHandler<Integer[]> {
@Override
public void setNonNullParameter(PreparedStatement ps, int i, Integer[] parameter, JdbcType jdbcType) throws SQLException {
Array array = ps.getConnection().createArrayOf("integer", parameter);
ps.setArray(i, array);
}
@Override
public Integer[] getNullableResult(ResultSet rs, String columnName) throws SQLException {
return getArray(rs.getArray(columnName));
}
@Override
public Integer[] getNullableResult(ResultSet rs, int columnIndex) throws SQLException {
return getArray(rs.getArray(columnIndex));
}
@Override
public Integer[] getNullableResult(CallableStatement cs, int columnIndex) throws SQLException {
return getArray(cs.getArray(columnIndex));
}
private Integer[] getArray(Array array) throws SQLException {
if (array != null) {
Object[] objArray = (Object[]) array.getArray();
return Arrays.copyOf(objArray, objArray.length, Integer[].class);
}
return null;
}
}
@@ -0,0 +1,43 @@
package com.Modules.etl.handler;
import org.apache.ibatis.type.BaseTypeHandler;
import org.apache.ibatis.type.JdbcType;
import org.apache.ibatis.type.MappedJdbcTypes;
import org.apache.ibatis.type.MappedTypes;
import java.sql.*;
import java.util.Arrays;
@MappedTypes(String[].class)
@MappedJdbcTypes(JdbcType.ARRAY)
public class ArrayStringTypeHandler extends BaseTypeHandler<String[]> {
@Override
public void setNonNullParameter(PreparedStatement ps, int i, String[] parameter, JdbcType jdbcType) throws SQLException {
Array array = ps.getConnection().createArrayOf("text", parameter);
ps.setArray(i, array);
}
@Override
public String[] getNullableResult(ResultSet rs, String columnName) throws SQLException {
return getArray(rs.getArray(columnName));
}
@Override
public String[] getNullableResult(ResultSet rs, int columnIndex) throws SQLException {
return getArray(rs.getArray(columnIndex));
}
@Override
public String[] getNullableResult(CallableStatement cs, int columnIndex) throws SQLException {
return getArray(cs.getArray(columnIndex));
}
private String[] getArray(Array array) throws SQLException {
if (array != null) {
Object[] objArray = (Object[]) array.getArray();
return Arrays.copyOf(objArray, objArray.length, String[].class);
}
return null;
}
}
@@ -0,0 +1,36 @@
package com.Modules.etl.handler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.retry.annotation.Backoff;
import org.springframework.retry.annotation.Recover;
import org.springframework.retry.annotation.Retryable;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class ETLRetryHandler {
/**
* 带重试机制的ETL任务执行
*/
@Retryable(value = Exception.class, maxAttempts = 3, backoff = @Backoff(delay = 5000))
public void executeWithRetry(Runnable etlTask) {
try {
etlTask.run();
} catch (Exception e) {
log.warn("ETL任务执行失败,准备重试", e);
throw e;
}
}
/**
* 重试失败后的恢复处理
*/
@Recover
public void recover(Exception e, Runnable etlTask) {
log.error("ETL任务重试多次后仍然失败", e);
// 这里可以发送告警通知、记录失败状态等
throw new RuntimeException("ETL任务最终执行失败", e);
}
}
@@ -0,0 +1,67 @@
package com.Modules.etl.handler;
import org.apache.ibatis.type.BaseTypeHandler;
import org.apache.ibatis.type.JdbcType;
import org.apache.ibatis.type.MappedTypes;
import java.sql.*;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
@MappedTypes(LocalDateTime.class)
public class TimestamptzTypeHandler extends BaseTypeHandler<LocalDateTime> {
@Override
public void setNonNullParameter(PreparedStatement ps, int i,
LocalDateTime parameter, JdbcType jdbcType)
throws SQLException {
if (parameter == null) {
ps.setTimestamp(i, null);
} else {
// 转换为Timestamp
ps.setTimestamp(i, Timestamp.valueOf(parameter));
}
}
@Override
public LocalDateTime getNullableResult(ResultSet rs, String columnName)
throws SQLException {
Timestamp timestamp = rs.getTimestamp(columnName);
return convertToLocalDateTime(timestamp);
}
@Override
public LocalDateTime getNullableResult(ResultSet rs, int columnIndex)
throws SQLException {
Timestamp timestamp = rs.getTimestamp(columnIndex);
return convertToLocalDateTime(timestamp);
}
@Override
public LocalDateTime getNullableResult(CallableStatement cs, int columnIndex)
throws SQLException {
Timestamp timestamp = cs.getTimestamp(columnIndex);
return convertToLocalDateTime(timestamp);
}
private LocalDateTime convertToLocalDateTime(Timestamp timestamp) {
if (timestamp == null) {
return null;
}
try {
// 方法1: 直接从Timestamp转换
return timestamp.toLocalDateTime();
// 方法2: 通过Instant转换(处理时区)
// Instant instant = timestamp.toInstant();
// return LocalDateTime.ofInstant(instant, ZoneId.systemDefault());
} catch (Exception e) {
throw new RuntimeException("Failed to convert TIMESTAMPTZ to LocalDateTime", e);
}
}
}