1、完善kafka 接收消息进行sm4 解密
2、新增IP联动封禁相关的API接口,供探针模块进行调用。
This commit is contained in:
+12
-4
@@ -79,9 +79,9 @@ public class LogNormalProcessor {
|
||||
@Autowired
|
||||
DmNormalizeRuleMapper dmNormalizeRuleMapper;
|
||||
|
||||
private static List<Map<String, Object>> dmNormalizeRuleList;
|
||||
private static List<Map<String, Object>> dmColumnList;
|
||||
private static LinkedHashMap<String, Object> OrginalColumnMap ;
|
||||
private List<Map<String, Object>> dmNormalizeRuleList;
|
||||
private List<Map<String, Object>> dmColumnList;
|
||||
private LinkedHashMap<String, Object> OrginalColumnMap ;
|
||||
|
||||
public LogNormalProcessor( String LogMsg, String syslogUUID,String syslogTopic) {
|
||||
|
||||
@@ -489,7 +489,15 @@ public class LogNormalProcessor {
|
||||
{
|
||||
Map<String, Object > columnMap= new HashMap<>();
|
||||
for (Map<String, Object> map : normalColumnList) {
|
||||
columnMap.put(map.get("dest_field").toString(),map.get("dest_field_value"));
|
||||
|
||||
Object destFieldValue = map.get("dest_field_value");
|
||||
// 判断 dest_field_value 是否为 String 且包含 "\u0000"
|
||||
if (destFieldValue instanceof String && ((String) destFieldValue).contains("\u0000")) {
|
||||
// 替换掉所有 "\u0000" 字符
|
||||
destFieldValue = ((String) destFieldValue).replace("\u0000", "");
|
||||
}
|
||||
columnMap.put(map.get("dest_field").toString(), destFieldValue);
|
||||
//columnMap.put(map.get("dest_field").toString(),map.get("dest_field_value"));
|
||||
}
|
||||
return columnMap;
|
||||
}
|
||||
|
||||
+16
-4
@@ -26,6 +26,9 @@ import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.time.LocalDate;
|
||||
import java.time.LocalDate;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
|
||||
import com.common.util.Sm4Util;
|
||||
import com.config.AppConfig;
|
||||
@Slf4j
|
||||
@Component
|
||||
public class SysLogProcessor {
|
||||
@@ -40,10 +43,14 @@ public class SysLogProcessor {
|
||||
@Value("${app.processor.process-timeout-ms:30000}")
|
||||
private long processTimeoutMs;
|
||||
|
||||
private static String strhexKey=AppConfig.getSM4Key();
|
||||
private final AtomicInteger totalProcessed = new AtomicInteger(0);
|
||||
private final AtomicInteger currentBatchCount = new AtomicInteger(0);
|
||||
// 初始化 InfluxDB 客户端
|
||||
private final com.influx.InfluxDBClient influxClient = new InfluxDBClient();
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* 方案一:直接多线程并行处理(推荐)
|
||||
* 单线程消费,每条消息独立提交给线程池处理
|
||||
@@ -80,7 +87,7 @@ public class SysLogProcessor {
|
||||
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
|
||||
try {
|
||||
// 异步处理单条消息
|
||||
log.info("收到syslogmessage:"+ record.value());
|
||||
log.info("收到syslogmessage:"+ Sm4Util.decryptCbc(record.value(), strhexKey));
|
||||
processSingleMessageAsync(record);
|
||||
} catch (Exception e) {
|
||||
log.error("处理消息失败, topic: {}, partition: {}, offset: {}",
|
||||
@@ -251,8 +258,13 @@ public class SysLogProcessor {
|
||||
// 模拟业务处理
|
||||
//processBusinessLogic(message);
|
||||
|
||||
//Message进行SM4解密
|
||||
String Sm4message=Sm4Util.decryptCbc(record.value(), strhexKey);
|
||||
System.out.println("Sm4message:"+Sm4message);
|
||||
|
||||
|
||||
String sysLogUUID =getSysLogUUID();
|
||||
String strDeviceInfo= SyslogParser.substringBeforeFirstChar(record.value(),']');
|
||||
String strDeviceInfo= SyslogParser.substringBeforeFirstChar(Sm4message,']');
|
||||
Map<String,String> mapdev =SyslogParser.parseKeyValuePairs(strDeviceInfo);
|
||||
|
||||
// 初始化 InfluxDB 客户端
|
||||
@@ -261,7 +273,7 @@ public class SysLogProcessor {
|
||||
.addTag("device_collect_id", mapdev.get("device_collect_id")) // 添加探针ID标签
|
||||
.addTag("uuid", sysLogUUID) //syslog uuid
|
||||
.addTag("topic", AppConfig.getTopic()) //kafka topic
|
||||
.addField("message", record.value()) // 添加字段
|
||||
.addField("message", Sm4message) // 添加字段
|
||||
.addField("receive_time", mapdev.get("receive_time")) // 添加字段
|
||||
.addField("uuid", sysLogUUID)
|
||||
.time(System.currentTimeMillis(), WritePrecision.MS) ;// 毫秒级时间戳
|
||||
@@ -272,7 +284,7 @@ public class SysLogProcessor {
|
||||
//insertSingleRecord( record.value());
|
||||
|
||||
//String syslogMessage= AppConfig.geRunEnvironment().equals("test")? record.value().substring(34) : record.value();
|
||||
String syslogMessage= record.value();
|
||||
String syslogMessage= Sm4message;
|
||||
//剔除测试环境本机syslog新增的头部信息
|
||||
LogNormalProcessor logNormalProcessor = new LogNormalProcessor(syslogMessage,sysLogUUID,AppConfig.getTopic());
|
||||
//LogNormalProcessor logNormalProcessor =new LogNormalProcessor(record.value());
|
||||
|
||||
+23
-15
@@ -33,11 +33,27 @@ public class ETLOrchestrator {
|
||||
private NormalizeRuleHitTimeService normalizeRuleHitTimeService;
|
||||
/**
|
||||
* 定时任务 - 从每小时第1分钟开始,5分钟间隔执行
|
||||
* 20260317:暂定硬规则关联分析
|
||||
* 20260317:暂停硬规则关联分析,由可配置关联分析规则取代
|
||||
* 泛化规则最新命中时间更新任务保留
|
||||
*/
|
||||
//@Scheduled(cron = "0 1/5 * * * ?")
|
||||
@Scheduled(cron = "0 1/5 * * * ?")
|
||||
public void scheduledETL() {
|
||||
|
||||
//暂停ETL数据降噪任务(关联分析)
|
||||
//RunETL();
|
||||
|
||||
//泛化规则最新命中时间更新任务
|
||||
try {
|
||||
normalizeRuleHitTimeService.updateRuleHitTimeTask();
|
||||
} catch (Exception e) {
|
||||
log.error("泛化规则最新命中时间更新任务执行失败", e);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
//ETL数据降噪任务处理
|
||||
private void RunETL()
|
||||
{
|
||||
long startTime = System.currentTimeMillis();
|
||||
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
|
||||
LocalDateTime[] currentWindow=TimeWindowCalculator.getPrevious5MinuteWindow();
|
||||
@@ -48,7 +64,7 @@ public class ETLOrchestrator {
|
||||
try {
|
||||
//retryHandler.executeWithRetry(() -> dataExtractor.extractAndProcess24HoursGroupedData());
|
||||
//retryHandler.executeWithRetry(() -> dataExtractor.extractAndProcessQueryHoursGroupedData(strStartTime,strEndTime ));
|
||||
retryHandler.executeWithRetry(() -> dataExtractor.extractAndProcessQueryHoursAlarm(strStartTime,strEndTime ));
|
||||
//retryHandler.executeWithRetry(() -> dataExtractor.extractAndProcessQueryHoursAlarm(strStartTime,strEndTime ));
|
||||
long endTime = System.currentTimeMillis();
|
||||
long duration = (endTime - startTime) / 1000;
|
||||
log.info("定时ETL任务执行完成,耗时: {} 秒", duration);
|
||||
@@ -56,14 +72,6 @@ public class ETLOrchestrator {
|
||||
} catch (Exception e) {
|
||||
log.error("定时ETL任务执行失败", e);
|
||||
}
|
||||
|
||||
//泛化规则最新命中时间更新任务
|
||||
try {
|
||||
normalizeRuleHitTimeService.updateRuleHitTimeTask();
|
||||
} catch (Exception e) {
|
||||
log.error("泛化规则最新命中时间更新任务执行失败", e);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -105,12 +113,12 @@ public class ETLOrchestrator {
|
||||
* 每天凌晨3点清理2天前的数据
|
||||
*/
|
||||
@Scheduled(cron = "0 0 3 * * ?")
|
||||
//@Scheduled(cron = "0 * * * * ?")
|
||||
public void cleanupOldLogs() {
|
||||
public void cleanupOldLogs() {
|
||||
try {
|
||||
LocalDateTime cutoffTime = LocalDateTime.now().minusDays(2);
|
||||
//默认删除7天内接收日志记录
|
||||
LocalDateTime cutoffTime = LocalDateTime.now().minusDays(7);
|
||||
int deleted = deviceReceiveLogService.deleteOldLogs(cutoffTime);
|
||||
log.info("定时清理任务完成,删除{}条2天前的日志", deleted);
|
||||
log.info("定时清理任务完成,删除{}条7天前的日志", deleted);
|
||||
} catch (Exception e) {
|
||||
log.error("定时清理日志失败", e);
|
||||
}
|
||||
|
||||
+1
-9
@@ -52,15 +52,7 @@ public class PartitionTableSchedule {
|
||||
logger.info("测试任务: 分区表创建完成");
|
||||
}
|
||||
|
||||
/**
|
||||
* 每天检查一次分区表状态(可选)
|
||||
*/
|
||||
@Scheduled(cron = "0 0 2 * * ?")
|
||||
public void checkPartitionTableStatus() {
|
||||
logger.info("开始检查分区表状态...");
|
||||
// 这里可以添加分区表状态检查逻辑
|
||||
logger.info("分区表状态检查完成");
|
||||
}
|
||||
|
||||
|
||||
|
||||
/**
|
||||
|
||||
+2
-2
@@ -85,9 +85,9 @@ public class DeviceStatsUpdateService {
|
||||
" updated_at = NOW() " ;
|
||||
|
||||
/**
|
||||
* 每分钟执行一次统计更新(秒:0,分:*,时:*)
|
||||
* 每5分钟执行一次设备统计更新(秒:0,分:*,时:*)
|
||||
*/
|
||||
@Scheduled(cron = "0 * * * * ?")
|
||||
@Scheduled(cron = "0 */5 * * * ?")
|
||||
@Transactional
|
||||
public void updateDeviceStats() {
|
||||
long startTime = System.currentTimeMillis();
|
||||
|
||||
@@ -101,5 +101,7 @@ public class AppConfig {
|
||||
|
||||
public static String geRunEnvironment() { return config.getString("server.run.environment"); }
|
||||
|
||||
|
||||
public static String getSM4Key() {
|
||||
return config.getString("syslog.sm4.generateKey");
|
||||
}
|
||||
}
|
||||
|
||||
+1
-2
@@ -95,8 +95,7 @@ public class SyslogNonNormalMessageController {
|
||||
/**
|
||||
* 删除非标日志
|
||||
*/
|
||||
@DeleteMapping("/delete/{id}")
|
||||
|
||||
//@DeleteMapping("/delete/{id}")
|
||||
public ResponseEntity<Map<String, Object>> deleteMessage(
|
||||
@PathVariable String id) {
|
||||
Map<String, Object> result = new HashMap<>();
|
||||
|
||||
Reference in New Issue
Block a user