Files
ai-security-xdr/haobang-security-xdr/syslog-consumer/src/main/java/com/common/service/AccessLogAlertService.java
T

454 lines
19 KiB
Java

package com.common.service;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.common.entity.AlarmVisit;
import com.common.entity.DeviceDevice;
import com.common.entity.SecExceptionAlgorithm;
import com.common.entity.SyslogNormalData;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.*;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.client.RestTemplate;
import javax.annotation.PostConstruct;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import com.common.mapper.*;
import java.net.InetAddress;
import com.common.util.AlgorithmResultParser;
@Slf4j
@Service
@EnableScheduling
@EnableAsync
public class AccessLogAlertService {
@Autowired
public static DeviceDeviceService deviceDeviceService ;
@Autowired
private AlgorithmResultParser algorithmResultParser;
@Autowired
private RestTemplate restTemplate;
@Autowired
private SyslogNormalDataMapper syslogNormalDataMapper;
@Autowired
private SecExceptionAlgorithmMapper secExceptionAlgorithmMapper;
@Autowired
private AlarmVisitMapper alarmVisitMapper;
// 记录上次处理的时间戳
private LocalDateTime lastProcessTime;
// 时间格式化器
private static final DateTimeFormatter DATE_TIME_FORMATTER =
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
// 存储算法配置的缓存
private Map<Long, SecExceptionAlgorithm> algorithmCache = new ConcurrentHashMap<>();
@PostConstruct
public void init() {
// 初始化时设置为当前时间减2分钟
lastProcessTime = LocalDateTime.now().minusMinutes(1);
log.info("初始化AccessLogAlertService,上次处理时间: {}", lastProcessTime);
// 加载启用的算法配置到缓存
loadAlgorithmConfigs();
}
/**
* 加载启用的算法配置
*/
private void loadAlgorithmConfigs() {
try {
List<SecExceptionAlgorithm> enabledAlgorithms = secExceptionAlgorithmMapper.findEnabledAlgorithms();
algorithmCache.clear();
for (SecExceptionAlgorithm algorithm : enabledAlgorithms) {
algorithmCache.put(algorithm.getId(), algorithm);
}
log.info("加载了 {} 个启用的算法配置", algorithmCache.size());
} catch (Exception e) {
log.error("加载算法配置失败: {}", e.getMessage(), e);
}
}
/**
* 批量处理开关 - 避免重复处理
*/
private AtomicBoolean processing = new AtomicBoolean(false);
/**
* 安全的定时任务入口
*/
@Scheduled(cron = "0 */1 * * * ?")
public void safeProcessTask() {
if (processing.compareAndSet(false, true)) {
try {
processAccessLogAlert();
} finally {
processing.set(false);
}
} else {
log.warn("上一个任务仍在执行中,跳过本次执行");
}
}
/**
* 定时任务入口 - 每2分钟执行一次
*/
@Scheduled(cron = "0 */1 * * * ?")
@Async
public void processAccessLogAlert() {
log.info("开始执行访问日志告警处理任务");
try {
// 1. 刷新算法配置缓存
loadAlgorithmConfigs();
if (algorithmCache.isEmpty()) {
log.warn("没有启用的算法配置,跳过本次处理");
return;
}
// 2. 获取上次处理时间之后的日志数据
LocalDateTime currentTime = LocalDateTime.now();
//List<SyslogNormalData> newLogs = syslogNormalDataMapper.findAfterTime(lastProcessTime);
List<SyslogNormalData> newLogs = syslogNormalDataMapper.findRequiredFieldsAfterTime(lastProcessTime);
//List<SyslogNormalData> newLogs =algorithmResultParser.buildNewLogsFromExample();
if (newLogs.isEmpty()) {
log.info("没有发现新的日志数据,上次处理时间: {}", lastProcessTime);
return;
}
log.info("获取到 {} 条新的日志数据,时间范围: {} 到 {}",
newLogs.size(), lastProcessTime, currentTime);
// 3. 处理每条启用的算法配置
for (SecExceptionAlgorithm algorithm : algorithmCache.values()) {
try {
processAlgorithm(algorithm, newLogs);
} catch (Exception e) {
log.error("处理算法配置失败 [算法ID: {}, 算法名称: {}]: {}",
algorithm.getId(), algorithm.getAlgorithmName(), e.getMessage(), e);
}
}
// 4. 更新处理时间
lastProcessTime = currentTime;
log.info("访问日志告警处理任务完成,下次将从 {} 开始处理", lastProcessTime);
} catch (Exception e) {
log.error("访问日志告警处理任务异常: {}", e.getMessage(), e);
}
}
/**
* 处理单个算法配置
*/
private void processAlgorithm(SecExceptionAlgorithm algorithm, List<SyslogNormalData> logs) {
log.info("开始处理算法: {} (ID: {})", algorithm.getAlgorithmName(), algorithm.getId());
// 1. 构建API请求参数
JSONArray requestBody = buildRequestBody(logs);
if (requestBody.isEmpty()) {
log.info("算法 {} 无有效的请求参数,跳过", algorithm.getAlgorithmName());
return;
}
// 2. 调用Python算子API
ResponseEntity<String> response = callAlgorithmApi(algorithm, requestBody);
if (response == null || !response.getStatusCode().is2xxSuccessful()) {
log.error("调用算法API失败: {} - {}", algorithm.getApiUrl(),
response != null ? response.getStatusCode() : "无响应");
return;
}
// 直接解析响应体为JSONArray
JSONArray results = JSON.parseArray(response.getBody());
if (results == null || results.isEmpty()) {
log.info("算法 {} 未检测到告警", algorithm.getAlgorithmName());
return;
}
// 3. 解析API响应并写入告警表
processApiResponse(algorithm, response.getBody(), logs);
}
/**
* 构建API请求体
*/
private JSONArray buildRequestBody(List<SyslogNormalData> logs) {
JSONArray requestArray = new JSONArray();
for (SyslogNormalData logData : logs) {
JSONObject logObject = new JSONObject();
// 按照要求的格式构建请求参数
logObject.put("_id", logData.getId() != null ? logData.getId() : "");
logObject.put("_index", "es1:skyeye-file-" +
LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy.MM.dd")));
logObject.put("_source.@timestamp",
ZonedDateTime.of(logData.getLogTime(), ZoneId.systemDefault())
.format(DateTimeFormatter.ISO_OFFSET_DATE_TIME));
logObject.put("_source.@version", "6");
logObject.put("_source.access_time",
logData.getLogTime().format(DATE_TIME_FORMATTER));
logObject.put("_source.device_ip",
logData.getDeviceIp() != null ? logData.getDeviceIp() : "");
logObject.put("_source.dip",
logData.getDestIp() != null ? logData.getDestIp() : "");
logObject.put("_source.dport", logData.getDestPort() != null ? logData.getDestPort() : 0);
logObject.put("_source.dst_mac", logData.getDestMac() != null ? logData.getDestMac() : "");
logObject.put("_source.file_dir", logData.getHostFilePath() != null ? 1 : 0);
logObject.put("_source.file_md5", logData.getFileMd5() != null ? logData.getFileMd5() : "");
logObject.put("_source.filename", logData.getFileName() != null ? logData.getFileName() : "");
// Geo信息
JSONObject geoDip = new JSONObject();
geoDip.put("city_name", logData.getDestCity() != null ? logData.getDestCity() : "");
geoDip.put("continent_code", "");
geoDip.put("country_code2", "");
geoDip.put("country_name", logData.getDestCountry() != null ? logData.getDestCountry() : "");
geoDip.put("latitude", logData.getDestLat() != null ? logData.getDestLat() : "");
geoDip.put("longitude", logData.getDestLon() != null ? logData.getDestLon() : "");
JSONObject geoSip = new JSONObject();
geoSip.put("city_name", logData.getSrcCity() != null ? logData.getSrcCity() : "");
geoSip.put("continent_code", "");
geoSip.put("country_code2", logData.getSrcCountryCode() != null ? logData.getSrcCountryCode() : "");
geoSip.put("country_name", logData.getSrcCountry() != null ? logData.getSrcCountry() : "");
geoSip.put("latitude", logData.getSrcLat() != null ? logData.getSrcLat() : "");
geoSip.put("longitude", logData.getSrcLon() != null ? logData.getSrcLon() : "");
logObject.put("_source.geo_dip", geoDip);
logObject.put("_source.geo_sip", geoSip);
// HTTP相关字段
logObject.put("_source.host", logData.getHttpHost() != null ? logData.getHttpHost() : "");
logObject.put("_source.host_md5", logData.getHostFileMd5() != null ? logData.getHostFileMd5() : "");
logObject.put("_source.host_raw", logData.getHttpReqHeaderRaw() != null ? logData.getHttpReqHeaderRaw() : "");
logObject.put("_source.host_reraw",
logData.getHttpReqHeaderRaw() != null ?
new StringBuilder(logData.getHttpReqHeaderRaw()).reverse().toString() : "");
logObject.put("_source.method", logData.getHttpMethod() != null ? logData.getHttpMethod() : "");
logObject.put("_source.mime_type",
logData.getHttpRespContentType() != null ? logData.getHttpRespContentType() : "");
logObject.put("_source.priv_info", "{\"product_type\": \"sensor\"}");
logObject.put("_source.proto", logData.getProto() != null ? logData.getProto() : "");
logObject.put("_source.referer", logData.getHttpReferer() != null ? logData.getHttpReferer() : "");
logObject.put("_source.serial_num", UUID.randomUUID().toString().substring(0, 9));
logObject.put("_source.sess_key", new Random().nextInt());
logObject.put("_source.sip",
logData.getSrcIp() != null ? logData.getSrcIp() : "");
logObject.put("_source.sport", logData.getSrcPort() != null ? logData.getSrcPort() : 0);
logObject.put("_source.src_mac", logData.getSrcMac() != null ? logData.getSrcMac() : "");
logObject.put("_source.status",
logData.getHttpStatusCode() != null ? String.valueOf(logData.getHttpStatusCode()) : "");
logObject.put("_source.uri", logData.getHttpUrl() != null ? logData.getHttpUrl() : "");
logObject.put("_source.uri_md5",
logData.getHttpUrl() != null ?
UUID.nameUUIDFromBytes(logData.getHttpUrl().getBytes()).toString() : "");
logObject.put("_source.vendor_id", "");
logObject.put("_source.vlan_id", "");
logObject.put("_type", "skyeye-file");
requestArray.add(logObject);
}
return requestArray;
}
/**
* 调用算法API
*/
private ResponseEntity<String> callAlgorithmApi(SecExceptionAlgorithm algorithm, JSONArray requestBody) {
try {
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
headers.set("Accept", MediaType.APPLICATION_JSON_VALUE);
HttpEntity<String> requestEntity;
if ("GET".equalsIgnoreCase(algorithm.getApiMethod())) {
// 对于GET请求,将参数放在URL中
String url = algorithm.getApiUrl() + "?data=" + requestBody.toJSONString();
requestEntity = new HttpEntity<>(headers);
return restTemplate.exchange(url, HttpMethod.GET, requestEntity, String.class);
} else {
// POST请求
requestEntity = new HttpEntity<>(requestBody.toJSONString(), headers);
return restTemplate.exchange(algorithm.getApiUrl(), HttpMethod.POST, requestEntity, String.class);
}
} catch (Exception e) {
log.error("调用算法API异常 [URL: {}]: {}", algorithm.getApiUrl(), e.getMessage(), e);
return null;
}
}
/**
* 处理API响应
*/
@Transactional
private void processApiResponse(SecExceptionAlgorithm algorithm, String responseBody, List<SyslogNormalData> logs) {
try {
//JSONObject responseJson = JSON.parseObject(responseBody);
JSONArray results = JSON.parseArray(responseBody);
/**
if (responseJson == null || !responseJson.containsKey("results")) {
log.warn("算法API返回格式异常: {}", responseBody);
return;
}
JSONArray results = responseJson.getJSONArray("results");
if (results == null || results.isEmpty()) {
log.info("算法 {} 未检测到告警", algorithm.getAlgorithmName());
return;
}
**/
int alarmCount = 0;
for (int i = 0; i < results.size(); i++) {
JSONObject alarmResult = results.getJSONObject(i);
// 创建告警记录
AlarmVisit alarmVisit = AlarmVisit.builder()
.id(UUID.randomUUID().toString())
.createdAt(LocalDateTime.now())
.alarmName(alarmResult.getString("dname"))
.alarmLevel("未知") // 可根据算法类型设置,默认未知
.alarmType(algorithm.getExceptionType())
.victimWebUrl(new String[]{alarmResult.getString("url")})
.logStartAt(parseDateTime(alarmResult.getString("access_time")))
.attackIp(new String[]{alarmResult.getString("sip")})
.victimIp(new String[]{alarmResult.getString("dip")})
.httpStatus(alarmResult.getString("status_code"))
.comment(alarmResult.getString("reason") + " - " + alarmResult.getString("origin_field"))
.originLogIds(new String[]{alarmResult.getString("log_id")})
.engineType("Python算子")
.logCount(1)
.alarmSource(2) // 算子告警
.attackResult(-1)
.fall(0)
.attackDirection("other")
.etlTime(LocalDateTime.now())
.isAssetHit(0)
.focused(false)
.baseFocused(false)
.isUpdated(false)
.judgedState(0)
.disposedState(0)
.dispositionAdvice("研判后处置")
.dnsInfo(alarmResult.getString("host"))
.build();
//补充返回结果的原始日志字段
AddOriginLogField(algorithm.getAlgorithmName(),alarmVisit,alarmResult);
// 保存告警记录
alarmVisitMapper.insert(alarmVisit);
alarmCount++;
}
log.info("算法 {} 生成 {} 条告警记录", algorithm.getAlgorithmName(), alarmCount);
} catch (Exception e) {
log.error("处理API响应异常: {}", e.getMessage(), e);
throw new RuntimeException("处理API响应失败", e);
}
}
/**
* 解析时间字符串
*/
private LocalDateTime parseDateTime(String timeStr) {
try {
if (timeStr == null || timeStr.isEmpty()) {
return LocalDateTime.now();
}
// 尝试多种时间格式
try {
return LocalDateTime.parse(timeStr);
} catch (Exception e) {
LocalDateTime localDateTime = LocalDateTime.parse(
timeStr,
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")
);
return LocalDateTime.now(ZoneId.systemDefault());
}
} catch (Exception e) {
log.warn("时间解析失败: {}, 使用当前时间", timeStr);
return LocalDateTime.now();
}
}
/**
* 补充返回结果的原始日志字段
* @param AlgorithmName
* @param alarmVisit
* @param alarmResult
* @return
*/
private boolean AddOriginLogField(String AlgorithmName, AlarmVisit alarmVisit ,JSONObject alarmResult )
{
try {
JSONObject originLogObject= alarmResult.getJSONObject("origin_log");
if(originLogObject.isEmpty()) {
log.debug("算法:{},ID:{} ,AlarmNme:{} 没有返回 origin_log节点.",AlgorithmName, alarmVisit.getId(), alarmVisit.getAlarmName());
return false;
}
alarmVisit.setAttackPort( new Integer[]{alarmResult.getInteger("_source.sport")} );
alarmVisit.setVictimPort( new Integer[]{alarmResult.getInteger("_source.dport")} );
alarmVisit.setAttackMethod(alarmResult.getString("_source.method") );
String deviceIp= alarmResult.getString("_source.device_ip");
//alarmVisit.setDeviceId( new Integer[]{ getDeviceID(deviceIp)} );
alarmVisit.setHttpStatus( alarmResult.getString("_source.status"));
return true;
} catch (Exception e) {
log.error("算法:{} 补充原始记录日志字段异常。error:{} ",AlgorithmName,e.getMessage(), e );
return false;
}
}
public int getDeviceID(String source_ip)
{
//默认deviceId =-1
int deviceId=-1 ;
List<DeviceDevice> deviceList= deviceDeviceService.getByIpSafely(source_ip);
if(deviceList.isEmpty()) {
return deviceId;
}
if(deviceList.size()>1)
{
log.error("设备请求的Host IP注册超过一条记录,请联系管理员处理!");
return deviceId;
}
return deviceList.get(0).getId();
}
}