package com.common.service;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.common.entity.AlarmVisit;
import com.common.entity.DeviceDevice;
import com.common.entity.SecExceptionAlgorithm;
import com.common.entity.SyslogNormalData;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.*;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.client.RestTemplate;
import javax.annotation.PostConstruct;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import com.common.mapper.*;
import java.net.InetAddress;
import java.net.URI;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import com.common.util.AlgorithmResultParser;

/**
 * Periodically reads newly arrived normalized syslog records, sends them to every enabled
 * detection algorithm over HTTP, and stores each result returned by an algorithm as an
 * {@link AlarmVisit} row.
 *
 * <p>Only {@link #safeProcessTask()} is scheduled. It uses an {@link AtomicBoolean} so that a
 * slow run is never overlapped by the next trigger.
 */
@Slf4j
@Service
@EnableScheduling
@EnableAsync
public class AccessLogAlertService {

    /**
     * Kept {@code public static} for backward compatibility. Spring does not inject static
     * fields, so the value is supplied through {@link #setDeviceDeviceService}.
     */
    public static DeviceDeviceService deviceDeviceService;

    @Autowired
    private AlgorithmResultParser algorithmResultParser;

    @Autowired
    private RestTemplate restTemplate;

    @Autowired
    private SyslogNormalDataMapper syslogNormalDataMapper;

    @Autowired
    private SecExceptionAlgorithmMapper secExceptionAlgorithmMapper;

    @Autowired
    private AlarmVisitMapper alarmVisitMapper;

    /** Lower bound (exclusive side decided by the mapper query) of the next log fetch. */
    private LocalDateTime lastProcessTime;

    /** Format used for {@code _source.access_time} and accepted back by {@link #parseDateTime}. */
    private static final DateTimeFormatter DATE_TIME_FORMATTER =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

    /** Date suffix of the synthetic {@code _index} name sent to the algorithms. */
    private static final DateTimeFormatter INDEX_DATE_FORMATTER =
            DateTimeFormatter.ofPattern("yyyy.MM.dd");

    /**
     * Enabled algorithm configurations keyed by {@code algorithm.getId()}.
     * The key is typed as Object because the id type is not visible from this file.
     */
    private final Map<Object, SecExceptionAlgorithm> algorithmCache = new ConcurrentHashMap<>();

    /** Re-entrancy guard: {@code true} while a processing run is in progress. */
    private final AtomicBoolean processing = new AtomicBoolean(false);

    /**
     * Supplies the shared static {@link #deviceDeviceService} reference.
     *
     * <p>Injection is optional so that the absence of the bean does not prevent this service
     * from starting; {@link #getDeviceID(String)} handles the missing case.
     *
     * @param service the device lookup service, may be {@code null}
     */
    @Autowired(required = false)
    public void setDeviceDeviceService(DeviceDeviceService service) {
        AccessLogAlertService.deviceDeviceService = service;
    }

    /**
     * Starts the processing window one minute in the past and warms the algorithm cache.
     */
    @PostConstruct
    public void init() {
        // Start one minute back so the first run picks up the most recent logs.
        lastProcessTime = LocalDateTime.now().minusMinutes(1);
        log.info("初始化AccessLogAlertService,上次处理时间: {}", lastProcessTime);

        loadAlgorithmConfigs();
    }

    /**
     * Reloads the enabled algorithm configurations into {@link #algorithmCache}.
     * If the mapper call fails, the previous cache content is left untouched.
     */
    private void loadAlgorithmConfigs() {
        try {
            List<SecExceptionAlgorithm> enabledAlgorithms = secExceptionAlgorithmMapper.findEnabledAlgorithms();
            algorithmCache.clear();
            for (SecExceptionAlgorithm algorithm : enabledAlgorithms) {
                algorithmCache.put(algorithm.getId(), algorithm);
            }
            log.info("加载了 {} 个启用的算法配置", algorithmCache.size());
        } catch (Exception e) {
            log.error("加载算法配置失败: {}", e.getMessage(), e);
        }
    }

    /**
     * Scheduled entry point, fired at second 0 of every minute. Skips the trigger when the
     * previous run has not finished yet.
     */
    @Scheduled(cron = "0 */1 * * * ?")
    public void safeProcessTask() {
        if (!processing.compareAndSet(false, true)) {
            log.warn("上一个任务仍在执行中,跳过本次执行");
            return;
        }
        try {
            runAlertCycle();
        } finally {
            processing.set(false);
        }
    }

    /**
     * Manual entry point kept for existing callers.
     *
     * <p>This method used to carry its own {@code @Scheduled} annotation with the same cron as
     * {@link #safeProcessTask()}, which made every cycle run twice (once outside the guard).
     * It now simply delegates to the guarded entry point.
     */
    @Async
    public void processAccessLogAlert() {
        safeProcessTask();
    }

    /**
     * Executes one cycle: refresh the algorithm cache, fetch logs newer than
     * {@link #lastProcessTime}, run every enabled algorithm over them, then advance the window.
     * The window is only advanced when logs were found and dispatched.
     */
    private void runAlertCycle() {
        log.info("开始执行访问日志告警处理任务");

        try {
            // 1. Refresh the algorithm configuration cache.
            loadAlgorithmConfigs();

            if (algorithmCache.isEmpty()) {
                log.warn("没有启用的算法配置,跳过本次处理");
                return;
            }

            // 2. Fetch the logs that arrived after the previous run.
            // NOTE(review): currentTime is captured before the query and the query appears to
            // have no upper bound, so rows written between these two statements may be fetched
            // again next cycle - confirm against the mapper SQL.
            LocalDateTime currentTime = LocalDateTime.now();
            List<SyslogNormalData> newLogs = syslogNormalDataMapper.findRequiredFieldsAfterTime(lastProcessTime);

            if (newLogs.isEmpty()) {
                log.info("没有发现新的日志数据,上次处理时间: {}", lastProcessTime);
                return;
            }

            log.info("获取到 {} 条新的日志数据,时间范围: {} 到 {}",
                    newLogs.size(), lastProcessTime, currentTime);

            // 3. Run every enabled algorithm; one failing algorithm must not stop the others.
            for (SecExceptionAlgorithm algorithm : algorithmCache.values()) {
                try {
                    processAlgorithm(algorithm, newLogs);
                } catch (Exception e) {
                    log.error("处理算法配置失败 [算法ID: {}, 算法名称: {}]: {}",
                            algorithm.getId(), algorithm.getAlgorithmName(), e.getMessage(), e);
                }
            }

            // 4. Advance the processing window.
            lastProcessTime = currentTime;
            log.info("访问日志告警处理任务完成,下次将从 {} 开始处理", lastProcessTime);

        } catch (Exception e) {
            log.error("访问日志告警处理任务异常: {}", e.getMessage(), e);
        }
    }

    /**
     * Sends the given logs to one algorithm and persists whatever alarms it returns.
     *
     * @param algorithm the algorithm configuration (URL, HTTP method, name, exception type)
     * @param logs      the log batch to analyse
     */
    private void processAlgorithm(SecExceptionAlgorithm algorithm, List<SyslogNormalData> logs) {
        log.info("开始处理算法: {} (ID: {})", algorithm.getAlgorithmName(), algorithm.getId());

        // 1. Build the request payload.
        JSONArray requestBody = buildRequestBody(logs);
        if (requestBody.isEmpty()) {
            log.info("算法 {} 无有效的请求参数,跳过", algorithm.getAlgorithmName());
            return;
        }

        // 2. Call the algorithm HTTP API.
        ResponseEntity<String> response = callAlgorithmApi(algorithm, requestBody);
        if (response == null || !response.getStatusCode().is2xxSuccessful()) {
            log.error("调用算法API失败: {} - {}", algorithm.getApiUrl(),
                    response != null ? response.getStatusCode() : "无响应");
            return;
        }

        // The response body is expected to be a JSON array of alarm objects.
        JSONArray results = JSON.parseArray(response.getBody());
        if (results == null || results.isEmpty()) {
            log.info("算法 {} 未检测到告警", algorithm.getAlgorithmName());
            return;
        }

        // 3. Convert the results into alarm rows.
        processApiResponse(algorithm, response.getBody(), logs);
    }

    /**
     * Returns the value itself, or an empty string when it is {@code null}.
     */
    private static Object orEmpty(Object value) {
        return value != null ? value : "";
    }

    /**
     * Converts the log batch into the flat, Elasticsearch-hit-like JSON structure sent to the
     * algorithms (keys such as {@code _id}, {@code _source.sip}, {@code _source.geo_dip}).
     * Missing values are sent as empty strings (or 0 for ports).
     *
     * @param logs the log batch
     * @return one JSON object per log record, in input order
     */
    private JSONArray buildRequestBody(List<SyslogNormalData> logs) {
        JSONArray requestArray = new JSONArray();
        // Loop-invariant: the index name only depends on today's date.
        String indexName = "es1:skyeye-file-" + LocalDateTime.now().format(INDEX_DATE_FORMATTER);

        for (SyslogNormalData logData : logs) {
            JSONObject logObject = new JSONObject();
            LocalDateTime logTime = logData.getLogTime();

            logObject.put("_id", orEmpty(logData.getId()));
            logObject.put("_index", indexName);
            // A record without a log time must not abort the whole batch with an NPE.
            logObject.put("_source.@timestamp", logTime != null
                    ? ZonedDateTime.of(logTime, ZoneId.systemDefault()).format(DateTimeFormatter.ISO_OFFSET_DATE_TIME)
                    : "");
            logObject.put("_source.@version", "6");
            logObject.put("_source.access_time", logTime != null ? logTime.format(DATE_TIME_FORMATTER) : "");
            logObject.put("_source.device_ip", orEmpty(logData.getDeviceIp()));
            logObject.put("_source.dip", orEmpty(logData.getDestIp()));
            logObject.put("_source.dport", logData.getDestPort() != null ? logData.getDestPort() : 0);
            logObject.put("_source.dst_mac", orEmpty(logData.getDestMac()));
            logObject.put("_source.file_dir", logData.getHostFilePath() != null ? 1 : 0);
            logObject.put("_source.file_md5", orEmpty(logData.getFileMd5()));
            logObject.put("_source.filename", orEmpty(logData.getFileName()));

            // Geo information of the destination and source addresses.
            JSONObject geoDip = new JSONObject();
            geoDip.put("city_name", orEmpty(logData.getDestCity()));
            geoDip.put("continent_code", "");
            geoDip.put("country_code2", "");
            geoDip.put("country_name", orEmpty(logData.getDestCountry()));
            geoDip.put("latitude", orEmpty(logData.getDestLat()));
            geoDip.put("longitude", orEmpty(logData.getDestLon()));

            JSONObject geoSip = new JSONObject();
            geoSip.put("city_name", orEmpty(logData.getSrcCity()));
            geoSip.put("continent_code", "");
            geoSip.put("country_code2", orEmpty(logData.getSrcCountryCode()));
            geoSip.put("country_name", orEmpty(logData.getSrcCountry()));
            geoSip.put("latitude", orEmpty(logData.getSrcLat()));
            geoSip.put("longitude", orEmpty(logData.getSrcLon()));

            logObject.put("_source.geo_dip", geoDip);
            logObject.put("_source.geo_sip", geoSip);

            // HTTP related fields.
            logObject.put("_source.host", orEmpty(logData.getHttpHost()));
            logObject.put("_source.host_md5", orEmpty(logData.getHostFileMd5()));
            logObject.put("_source.host_raw", orEmpty(logData.getHttpReqHeaderRaw()));
            logObject.put("_source.host_reraw", logData.getHttpReqHeaderRaw() != null
                    ? new StringBuilder(logData.getHttpReqHeaderRaw()).reverse().toString() : "");
            logObject.put("_source.method", orEmpty(logData.getHttpMethod()));
            logObject.put("_source.mime_type", orEmpty(logData.getHttpRespContentType()));
            logObject.put("_source.priv_info", "{\"product_type\": \"sensor\"}");
            logObject.put("_source.proto", orEmpty(logData.getProto()));
            logObject.put("_source.referer", orEmpty(logData.getHttpReferer()));
            // serial_num and sess_key are synthetic placeholder values, regenerated per record.
            logObject.put("_source.serial_num", UUID.randomUUID().toString().substring(0, 9));
            logObject.put("_source.sess_key", ThreadLocalRandom.current().nextInt());
            logObject.put("_source.sip", orEmpty(logData.getSrcIp()));
            logObject.put("_source.sport", logData.getSrcPort() != null ? logData.getSrcPort() : 0);
            logObject.put("_source.src_mac", orEmpty(logData.getSrcMac()));
            logObject.put("_source.status", logData.getHttpStatusCode() != null
                    ? String.valueOf(logData.getHttpStatusCode()) : "");
            logObject.put("_source.uri", orEmpty(logData.getHttpUrl()));
            // Name-based UUID of the URL; an explicit charset keeps it platform independent.
            logObject.put("_source.uri_md5", logData.getHttpUrl() != null
                    ? UUID.nameUUIDFromBytes(logData.getHttpUrl().getBytes(StandardCharsets.UTF_8)).toString() : "");
            logObject.put("_source.vendor_id", "");
            logObject.put("_source.vlan_id", "");
            logObject.put("_type", "skyeye-file");
            // Carries syslog_normal_data.device_id through to the alarm record.
            logObject.put("_source.device_id", orEmpty(logData.getDeviceId()));

            requestArray.add(logObject);
        }

        return requestArray;
    }

    /**
     * Calls the algorithm endpoint with the JSON payload.
     *
     * <p>For GET the payload is sent as the URL-encoded {@code data} query parameter through
     * the {@link URI} overload of {@code exchange}. Passing the raw JSON inside a String URL
     * would make RestTemplate treat the JSON braces as URI template variables.
     *
     * @param algorithm   the algorithm configuration (URL and HTTP method)
     * @param requestBody the payload built by {@link #buildRequestBody}
     * @return the HTTP response, or {@code null} when the call threw an exception
     */
    private ResponseEntity<String> callAlgorithmApi(SecExceptionAlgorithm algorithm, JSONArray requestBody) {
        try {
            HttpHeaders headers = new HttpHeaders();
            headers.setContentType(MediaType.APPLICATION_JSON);
            headers.set("Accept", MediaType.APPLICATION_JSON_VALUE);

            String payload = requestBody.toJSONString();

            if ("GET".equalsIgnoreCase(algorithm.getApiMethod())) {
                String apiUrl = algorithm.getApiUrl();
                String separator = apiUrl.contains("?") ? "&" : "?";
                URI uri = URI.create(apiUrl + separator + "data="
                        + URLEncoder.encode(payload, StandardCharsets.UTF_8.name()));
                HttpEntity<String> requestEntity = new HttpEntity<>(headers);
                return restTemplate.exchange(uri, HttpMethod.GET, requestEntity, String.class);
            }

            // Every method other than GET is sent as POST with the JSON body.
            HttpEntity<String> requestEntity = new HttpEntity<>(payload, headers);
            return restTemplate.exchange(algorithm.getApiUrl(), HttpMethod.POST, requestEntity, String.class);
        } catch (Exception e) {
            log.error("调用算法API异常 [URL: {}]: {}", algorithm.getApiUrl(), e.getMessage(), e);
            return null;
        }
    }

    /**
     * Turns every element of the algorithm's JSON array response into an {@link AlarmVisit}
     * row and inserts it.
     *
     * <p>NOTE(review): {@code @Transactional} on a private, self-invoked method is not applied
     * by Spring's default proxy-based transaction support, so the inserts are presumably not
     * atomic as a batch - confirm and move to a public method on another bean if atomicity is
     * required.
     *
     * @param algorithm    the algorithm that produced the response
     * @param responseBody the raw response body, expected to be a JSON array
     * @param logs         the log batch that was sent (currently unused)
     * @throws RuntimeException wrapping any failure while parsing or inserting
     */
    @Transactional
    private void processApiResponse(SecExceptionAlgorithm algorithm, String responseBody, List<SyslogNormalData> logs) {
        try {
            JSONArray results = JSON.parseArray(responseBody);

            int alarmCount = 0;
            for (int i = 0; i < results.size(); i++) {
                JSONObject alarmResult = results.getJSONObject(i);

                AlarmVisit alarmVisit = AlarmVisit.builder()
                        .id(UUID.randomUUID().toString())
                        .createdAt(LocalDateTime.now())
                        .alarmName(alarmResult.getString("dname"))
                        .alarmLevel("未知") // default level; could be derived from the algorithm type
                        .alarmType(algorithm.getExceptionType())
                        .victimWebUrl(new String[]{alarmResult.getString("url")})
                        .logStartAt(parseDateTime(alarmResult.getString("access_time")))
                        .attackIp(new String[]{alarmResult.getString("sip")})
                        .victimIp(new String[]{alarmResult.getString("dip")})
                        .httpStatus(alarmResult.getString("status_code"))
                        .comment(alarmResult.getString("reason") + " - " + alarmResult.getString("origin_field"))
                        .originLogIds(new String[]{alarmResult.getString("log_id")})
                        .engineType("Python算子")
                        .logCount(1)
                        .alarmSource(2) // alarm raised by an algorithm operator
                        .attackResult(-1)
                        .fall(0)
                        .attackDirection("other")
                        .etlTime(LocalDateTime.now())
                        .isAssetHit(0)
                        .focused(false)
                        .baseFocused(false)
                        .isUpdated(false)
                        .judgedState(0)
                        .disposedState(0)
                        .dispositionAdvice("研判后处置")
                        .dnsInfo(alarmResult.getString("host"))
                        .build();

                // Enrich the alarm with fields from the echoed original log, when present.
                addOriginLogFields(algorithm.getAlgorithmName(), alarmVisit, alarmResult);

                alarmVisitMapper.insert(alarmVisit);
                alarmCount++;
            }

            log.info("算法 {} 生成 {} 条告警记录", algorithm.getAlgorithmName(), alarmCount);

        } catch (Exception e) {
            log.error("处理API响应异常: {}", e.getMessage(), e);
            throw new RuntimeException("处理API响应失败", e);
        }
    }

    /**
     * Parses a timestamp in ISO local date-time form or in {@code yyyy-MM-dd HH:mm:ss.SSS} form.
     *
     * @param timeStr the text to parse, may be {@code null} or empty
     * @return the parsed time, or the current time when the text is missing or unparseable
     */
    private LocalDateTime parseDateTime(String timeStr) {
        if (timeStr == null || timeStr.isEmpty()) {
            return LocalDateTime.now();
        }
        try {
            return LocalDateTime.parse(timeStr);
        } catch (DateTimeParseException ignored) {
            // Not ISO formatted; fall through to the access_time format.
        }
        try {
            // Previously the parsed value was discarded and "now" was returned instead.
            return LocalDateTime.parse(timeStr, DATE_TIME_FORMATTER);
        } catch (DateTimeParseException e) {
            log.warn("时间解析失败: {}, 使用当前时间", timeStr);
            return LocalDateTime.now();
        }
    }

    /**
     * Copies source port, destination port, HTTP method and device id from the
     * {@code origin_log} node of an alarm result onto the alarm record.
     *
     * <p>The port and device id values are read as strings and converted with
     * {@link Integer#parseInt(String)}; a non-numeric value ends the enrichment and is logged.
     *
     * @param algorithmName name of the algorithm, used for logging only
     * @param alarmVisit    the alarm record to enrich
     * @param alarmResult   one element of the algorithm response
     * @return {@code true} when the fields were applied, {@code false} when {@code origin_log}
     *         is absent or empty or an error occurred
     */
    private boolean addOriginLogFields(String algorithmName, AlarmVisit alarmVisit, JSONObject alarmResult) {
        try {
            JSONObject originLogObject = alarmResult.getJSONObject("origin_log");
            // getJSONObject yields null for a missing key; treat it like an empty node.
            if (originLogObject == null || originLogObject.isEmpty()) {
                log.debug("算法:{},ID:{} ,AlarmNme:{} 没有返回 origin_log节点.", algorithmName, alarmVisit.getId(), alarmVisit.getAlarmName());
                return false;
            }

            String sportStr = originLogObject.getString("_source.sport");
            if (sportStr != null && !sportStr.isEmpty()) {
                alarmVisit.setAttackPort(new Integer[]{Integer.parseInt(sportStr)});
            }

            String dportStr = originLogObject.getString("_source.dport");
            if (dportStr != null && !dportStr.isEmpty()) {
                alarmVisit.setVictimPort(new Integer[]{Integer.parseInt(dportStr)});
            }

            alarmVisit.setAttackMethod(originLogObject.getString("_source.method"));

            // Populates alarm_visit.device_id from the value sent in the request.
            String deviceIdStr = originLogObject.getString("_source.device_id");
            if (deviceIdStr != null && !deviceIdStr.isEmpty()) {
                alarmVisit.setDeviceId(new Integer[]{Integer.parseInt(deviceIdStr)});
            }

            return true;
        } catch (Exception e) {
            log.error("算法:{} 补充原始记录日志字段异常。error:{} ", algorithmName, e.getMessage(), e);
            return false;
        }
    }

    /**
     * Looks up the id of the device registered for the given IP address.
     *
     * @param source_ip the device IP address
     * @return the device id, or {@code -1} when the lookup service is unavailable, no device
     *         is registered, or more than one device is registered for the address
     */
    public int getDeviceID(String source_ip) {
        // Default when no unique device can be resolved.
        int deviceId = -1;

        if (deviceDeviceService == null) {
            log.warn("DeviceDeviceService is not available; returning default device id {}", deviceId);
            return deviceId;
        }

        List<DeviceDevice> deviceList = deviceDeviceService.getByIpSafely(source_ip);
        if (deviceList == null || deviceList.isEmpty()) {
            return deviceId;
        }
        if (deviceList.size() > 1) {
            log.error("设备请求的Host IP注册超过一条记录,请联系管理员处理!");
            return deviceId;
        }
        return deviceList.get(0).getId();
    }
}