Day 9:成果落地 —— Act 阶段战报生成与大屏数据落盘
好了,各位,这已经是 PEAK 框架实战的第9天。今天我们要完成整个链路中最重要的闭环:Act with Knowledge(基于知识采取行动),让成果真正落地。
我们来回顾一下现场情况:经过昨天(Day 8)的代码执行,Python 内存里已经躺着一个叫 all_hunt_evidence 的列表,里面装满了类似 round_1_hit_count: 150 这样的硬核数据。接下来要做的,就是把这一堆冰冷的数据喂给大模型,让它基于这些证据进行二次推理,最终产出一份完整的 JSON 战报,再连同之前的所有证据,打包成一个巨大的“企业级 Payload”,直接写入 Splunk 索引。
这才是真正的成果落地,不是停留在屏幕上的几行日志,而是可查询、可展示的数据资产。

Step 1: Act 阶段 —— 喂入证据、二次触发 LLM 与防御性解析
这一步的核心动作是这样的:把 Day 8 那个 for 循环结束后得到的结果,直接对接上 Day 9 的推理引擎。
整个流程可以分为三个关键环节:
第一,数据序列化。 Python 里的字典数组,必须转成 JSON 字符串,原因很简单——大模型只认识文本,不认识 Python 对象。这是个基本功,但不少人会在这里翻车,导致后续流程中断。
第二,构建 Prompt。 这一步需要精心设计身份设定。我们要给 LLM 赋予“安全总监”的角色,要求它根据命中数(Hit Counts)之间的逻辑关系来评分。举个例子:如果 R1 命中数很高,但 R2 深度下钻的结果却是0,那基本可以判定这是误报;反过来,如果 R1 和 R2 都很高,那就不是巧合了,这是真实的攻击行为。最终它会给出一个 0-100 的确切风险评分。
第三,防弹解析。 必须要加 try...except json.JSONDecodeError 保护。原因很现实——大模型偶尔会抽风,如果它输出一段非 JSON 格式的文字,而你没有任何异常处理,整个5分钟的调度任务就会瞬间崩溃。这不是技术洁癖,这是企业级生产环境的底线要求,确保数据管道永不中断。
这是需要接在 Day 8 代码下方的核心逻辑代码:
# ==========================================
# STEP 4: The Act Phase (Day 9 AI Final Qualification)
# ==========================================
helper.log_info("Initiating Act Phase: Triggering LLM API for Final Assessment...")
# 1. Serialize the collected evidence list into a JSON string to feed the LLM
evidence_payload = json.dumps(all_hunt_evidence, ensure_ascii=False)
# 2. In a production environment, you would construct your messages here
# and execute the second API call. For testing the pipeline, we mock the response.
# This mock represents the LLM analyzing the 'evidence_payload' and making a decision.
mock_act_response = """{
"executive_summary": "通过两轮下钻分析,假设1(暴力破解)在R2深度关联中发现连续高频请求,证实存在凭证填充攻击风险。",
"threat_qualification": "Confirmed Threat",
"risk_score": 92,
"recommended_alert_spl": "search index=main CIp=* | bin _time span=1s | stats count by _time, CIp | where count > 10"
}"""
# 3. Defensive parsing block to handle potential AI format hallucinations
final_report = {}
try:
# Strip whitespace to prevent JSONDecodeError from trailing newlines
final_report = json.loads(mock_act_response.strip())
helper.log_info(f"AI Assessment Complete. Final Risk Score calculated: {final_report.get('risk_score')}")
except json.JSONDecodeError as e:
helper.log_error(f"Critical error: Failed to parse final report JSON. Detail: {str(e)}")
# Fallback mechanism to ensure the data pipeline does not break
final_report = {
"executive_summary": "Error parsing AI response during Act Phase.",
"threat_qualification": "Unknown",
"risk_score": 0,
"recommended_alert_spl": ""
}
Step 2: 数据落盘 —— 组装上帝视角 Payload 并写入 Splunk
这里必须指出一个很多新手都会犯的致命错误:她们习惯用 print() 或者 helper.log_info() 来输出结果。这是大错特错的!打印在后台日志(也就是 _internal 索引)里的东西,企业级大屏根本看不见,也搜不到。
真正的数据落盘需要三个步骤:
第一,组装 Master Payload。 我们需要把 Day 7 的蓝图(ai_hunting_plan)、Day 8 的证据(execution_metrics)和刚才 Day 9 得到的终极定性(final_assessment)合并到一个字典里。这就好比一份完整的案件卷宗——起因、经过、结果,缺一不可。
第二,强制指定 Sourcetype。 使用 helper.new_event() 时,必须把 sourcetype 显式声明为 _json。这一步的意义在于:它会触发 Splunk 底层的自动字段提取机制(Auto KV Extraction),让复杂的 JSON 树在前端直接被拆解成可被搜索的独立字段。别小看这个细节,这是后续大屏展示的基础。
第三,提交写入。 使用 ew.write_event(event) 把这个史诗级的事件永久打入业务索引(比如 main 索引)。从此,这条数据不再只是内存中的一个临时变量,而是变成了企业数据资产的一部分。
对应的落盘逻辑代码如下:
# ==========================================
# STEP 5: Data Ingestion (Writing the Master Payload to Splunk)
# ==========================================
helper.log_info("Assembling Master Hunt Report for index ingestion...")
# 1. Assemble the ultimate JSON document containing all three phases of PEAK
master_payload = {
"event_type": "PEAK_Hunting_Report", # CRITICAL: Anchor field for Dashboard searches
"timestamp": datetime.datetime.utcnow().isoformat(),
"target_index": target_index,
"hunting_plan": ai_hunting_plan, # The original blueprint from Prepare Phase
"execution_metrics": all_hunt_evidence, # The hits and duration from Execute Phase
"final_assessment": final_report # The ultimate AI qualification from Act Phase
}
# 2. Create a new Splunk event object using the Add-on Builder helper
# Setting sourcetype to "_json" enables native Splunk JSON syntax highlighting and field extraction
event = helper.new_event(
source=helper.get_input_type(),
index=target_index,
sourcetype="_json",
data=json.dumps(master_payload, ensure_ascii=False)
)
# 3. Commit the event to the user-specified Splunk data store
ew.write_event(event)
# Stop the master stopwatch and calculate total execution time
total_cycle_time = round(time.time() - cycle_start_time, 2)
helper.log_info(f"SUCCESS: Master Hunt Report completely written to Splunk! Total cycle time: {total_cycle_time} seconds.")
# 4. Catch-all exception block for the entire Agentic Workflow
except Exception as e:
helper.log_error(f"Critical Failure during Agentic Execution Workflow: {str(e)}")
Step 3: 终极极客验证 —— 全自动闭环的结果确认
代码写完还不算完。真正的极客从来不会相信“理论上应该能跑”,我们必须用 SPL 在前端验证数据是否真的完美结构化落地了。
验证操作步骤很清晰:
- 在 AOB 代码编辑器的右上角,点击绿色的 Test 按钮。
- 观察底部的 Output 面板,等待系统打印出绿色的
Done。 - 别忘了点击右上角的 Sa ve 按钮保存代码!很多人在这里翻车——不点 Sa ve,代码永远只是编辑器里的草稿。
- 打开一个新的浏览器标签页,进入 Splunk 的 Search & Reporting(搜索与报表)应用。
- 在搜索框中输入下面这条极客指令(记得把时间范围调整为 Last 15 minutes):
index=main sourcetype="_json" event_type="PEAK_Hunting_Report"
| rename final_assessment.risk_score as Risk_Score,
final_assessment.threat_qualification as Threat_Level,
final_assessment.executive_summary as Summary
| table _time, target_index, Threat_Level, Risk_Score, Summary
| sort - Risk_Score
那么,你会看到什么?答案是:表格中会清晰地展现出 Risk_Score = 92, Threat_Level = Confirmed Threat,以及详细的中文战报摘要。
这背后的架构意义值得多说两句。由于我们在 Step 5 中设置了 sourcetype="_json",Splunk 极其聪明地自动解析了深达三层嵌套的 JSON 结构——它自动识别出了 final_assessment.risk_score 这个字段路径。这种无缝的结构化数据落盘能力,正是 Day 16 我们能够用几行 SPL 就画出极其绚丽的高管安全态势大屏的核心保障。
数据已经落地,战报已经生成。接下来的日子里,我们要用这些数据,给管理层讲一个生动而有说服力的安全故事。
