基于deepseek-v3大模型的应用日志安全分析程序(附完整python源代码)
时间:2025-2-27 23:31 作者:Anglei 分类: 自动化运维
在大语言模型盛世空前的年代,我们做什么事不带大模型玩那都是对不起这个产品,在安全圈外围混了20多年,一直想把安全监管简单化,但面对动辄几百G的LOG日志面前就是一头雾水,一条一条分析真是头都大了,于是想到了deepseek,毕竟是收费的API接口,也不敢太浪费TOKEN,于是仅仅对100多K的日志进行了取样分析,你别说,这分析完后的报告有模有样,一定可以把甲方哄住。快来跟我一起动手操作吧!
新创建两个目录,一个input,一个output,用于存放LOG日志,和分析输出报告。
Python程序如下:
import os
import json
import requests
from datetime import datetime
from pathlib import Path
# 配置参数
INPUT_DIR = "input" # 输入目录
OUTPUT_DIR = "output" # 输出目录
API_URL = "https://api.lkeap.cloud.tencent.com/v1" # 确认API地址
MODEL_NAME = "deepseek-v3" # 使用模型名称
DEEPSEEK_API_KEY= "your api key"
TIMEOUT = 120 # API超时时间
def load_log_files(input_dir):
"""遍历输入目录获取所有日志文件"""
log_files = []
for root, _, files in os.walk(input_dir):
for file in files:
if file.endswith(".log"):
log_files.append(Path(root) / file)
return log_files
def analyze_log_with_deepseek(api_key, log_content):
"""调用DeepSeek API进行日志分析"""
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
}
payload = {
"model": MODEL_NAME,
"messages": [
{
"role": "system",
"content": "你是一个专业的安全分析专家,需要仔细分析以下日志内容,检查是否存在安全威胁、攻击迹象、异常行为或潜在风险。请用中文给出详细的分析报告,包含发现的可疑内容和安全建议。"
},
{
"role": "user",
"content": f"请分析以下日志内容:\n{log_content}"
}
],
"temperature": 0.3
}
try:
response = requests.post(API_URL, headers=headers, json=payload, timeout=TIMEOUT)
response.raise_for_status()
result = response.json()
return result['choices'][0]['message']['content']
except requests.exceptions.RequestException as e:
return f"API请求失败: {str(e)}"
except KeyError:
return "API响应解析失败,请检查返回格式"
def generate_report(results, output_dir):
"""生成分析报告"""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
report_file = Path(output_dir) / f"security_report_{timestamp}.md"
report_content = [
"# 日志安全分析报告\n",
f"**生成时间**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n",
f"**分析模型**: {MODEL_NAME}\n\n"
]
for filepath, analysis in results:
report_content.append(f"## 文件: {filepath.name}\n")
report_content.append(f"**路径**: {filepath}\n")
report_content.append("### 分析结果:\n")
report_content.append(f"{analysis}\n\n")
report_content.append("---\n")
with open(report_file, "w", encoding="utf-8") as f:
f.writelines(report_content)
return report_file
def main():
# 检查环境变量
api_key = os.getenv("DEEPSEEK_API_KEY")
if not api_key:
print("请设置环境变量: DEEPSEEK_API_KEY")
return
# 创建输出目录
Path(OUTPUT_DIR).mkdir(parents=True, exist_ok=True)
# 获取日志文件
log_files = load_log_files(INPUT_DIR)
if not log_files:
print(f"在 {INPUT_DIR} 目录中未找到日志文件")
return
results = []
for log_file in log_files:
try:
# 读取日志内容
with open(log_file, "r", encoding="utf-8") as f:
log_content = f.read()
# 调用API分析
print(f"正在分析: {log_file}...")
analysis_result = analyze_log_with_deepseek(api_key, log_content)
# 保存结果
results.append((log_file, analysis_result))
except Exception as e:
error_msg = f"处理文件 {log_file} 时发生错误: {str(e)}"
results.append((log_file, error_msg))
# 生成报告
report_path = generate_report(results, OUTPUT_DIR)
print(f"分析完成!报告已生成至: {report_path}")
if __name__ == "__main__":
main()
是不是很帅?几百M的日志要分割进行,不然一个文件上去,系统就噶掉了,或者根据起止时间进行自动截取取样分析。

推荐阅读:
![]() 路过(0) |
![]() 雷人(0) |
![]() 握手(2) |
![]() 鲜花(1) |
![]() 鸡蛋(0) |