当业务需求从“爬取网页”转向“批量向多个 AI 平台提问并分析回答”时,后端架构面临的挑战就完全不一样了。上千条任务如何高效调度?调用失败如何自动处理?原始回答如何存储,提取出的指标怎样入库?这篇文章我们将聚焦消费品牌 AI 回答监测这一典型场景,深入剖析从队列调度、并发采集、失败重试到分层存储的完整数据链路。

一、场景特征:采集任务的三个特殊约束
消费品牌进行 AI 回答监测,与常规数据采集相比,存在以下几个显著差异:
任务量波动极大。一次测评可能涵盖 7-8 个消费场景,对接 5-6 个 AI 平台,每个组合再乘以 3-5 轮采样,任务总数轻松突破千级。响应时间不可控。AI 生成回答的耗时从几秒到几十秒不等,若开启联网搜索,响应时长更难以预估。结果非确定性。同一问题多次调用,AI 可能给出截然不同的回答,单次结果基本不具备统计意义。
这三个约束决定了:架构上必须将“任务生产”与“任务执行”彻底解耦,同时为每条回答保留完整的原始证据。
二、整体数据链路
整个数据链路可拆解为五个关键环节:任务生产、队列缓冲、并发消费、分层存储、失败兜底。下面逐步拆解每一层的设计要点。
三、任务生产:用“三要素”拼装消息
调度器不直接调用 AI 平台,而是先生成任务消息并投递到队列中。每条消息包含三个核心维度:
{"task_id": "uuid-xxxx","scene_type": "RECOMMEND","platform": "kimi","query_text": "有哪些适合敏感肌的防晒霜推荐?","target_brand": "某品牌","round": 3,"max_retries": 3,"created_at": "2026-06-29T09:00:00Z"}
问题本身来自预先搭建的问题库,场景标签随任务一同下发,便于后续按场景维度进行聚合分析。
CREATE TABLE question_library (id BIGINT AUTO_INCREMENT PRIMARY KEY,scene_type VARCHAR(30) NOT NULL COMMENT '场景: RECOMMEND/COMPARE/RISK等',query_text TEXT NOT NULL,target_brand VARCHAR(100),status TINYINT DEFAULT 1,created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,INDEX idx_scene (scene_type)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
四、队列调度:削峰填谷与并发控制
消息队列在此链路中发挥三大作用:解耦,使任务生产与执行互不阻塞;削峰,瞬间产生的上千条任务排队消费,避免下游 API 被击穿;并发控制,通过限制消费者实例数量,管理对 AI 平台的并发请求。
以腾讯云 TDMQ 为例,推荐配置包括:开启批量投递,降低网络开销;设置消费限流,防止短时间对同一 AI 平台发起过高并发;配置消息超时时间,与云函数执行超时保持一致,建议设为 120 秒。
云函数侧采用单条消息单次处理模式,核心消费逻辑的伪代码如下:
def handle_message(message):task = parse_message(message)adapter = PlatformAdapterFactory.get(task.platform)try:raw_response = adapter.call_api(task.query_text, timeout=60)cos_key = sa ve_raw_to_cos(task, raw_response)structured = extract_metrics(task, raw_response)sa ve_to_db(task, cos_key, structured)return "SUCCESS"except TimeoutError:handle_retry(task)except Exception as e:send_to_dlq(task, str(e))
五、失败重试:分级策略与死信兜底
在消费品牌监测场景中,失败主要分为三类,应对策略各不相同:
失败类型 | 典型原因 | 处理策略 |
|---|---|---|
临时故障 | AI 平台限流、网络抖动 | 指数退避重试(1min/2min/4min) |
超时 | 联网搜索耗时过长 | 延长云函数超时或降级为非联网模式 |
确定性失败 | API Key 失效、参数错误 | 直接进入死信队列,人工介入 |
重试次数由消息体中的 max_retries 字段控制,每次重试 retry_count 1。超出上限的消息将投递至死信队列,避免无限重试浪费资源。
def handle_retry(task):task['retry_count'] = task.get('retry_count', 0) 1if task['retry_count'] <= task['max_retries']:delay = 60 * (2 ** (task['retry_count'] - 1))# 指数退避requeue_with_delay(task, delay)else:send_to_dlq(task, "MAX_RETRIES_EXCEEDED")
六、分层存储:原始留档 结构化入库
存储分为两条独立路径,写入互不干扰。
路径一:对象存储 COS,用于留存原始回答。
// COS 文件命名: {date}/{platform}/{scene_type}/{task_id}.json{"task_id": "uuid-xxxx","platform": "kimi","scene_type": "RECOMMEND","query": "有哪些适合敏感肌的防晒霜推荐?","raw_response": "针对敏感肌,以下是几个值得考虑的品牌:...","response_at": "2026-06-29T09:00:35Z","duration_ms": 34800}
这一层不做任何加工,仅供事后复核。任何指标争议均可回溯至原始回答。
路径二:关系型数据库,用于结构化指标的落库。
CREATE TABLE monitoring_result (id BIGINT AUTO_INCREMENT PRIMARY KEY,task_id VARCHAR(64) NOT NULL UNIQUE,platform VARCHAR(30) NOT NULL,scene_type VARCHAR(30) NOT NULL COMMENT '场景分类',target_brand VARCHAR(100) NOT NULL,is_mentioned TINYINT(1) DEFAULT 0 COMMENT '是否被提及',is_recommended TINYINT(1) DEFAULT 0 COMMENT '是否被推荐',recommendation_rank INT DEFAULT 0 COMMENT '推荐位次,0表示未推荐',has_citation TINYINT(1) DEFAULT 0 COMMENT '是否引用来源',is_valid TINYINT(1) DEFAULT 1 COMMENT '样本有效性',raw_data_url VARCHAR(512) COMMENT 'COS文件链接',sampled_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,INDEX idx_brand_scene (target_brand, scene_type),INDEX idx_platform_date (platform, sampled_at)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
入库时需注意两个要点:task_id 设置唯一约束,防止消息重复消费导致数据重复;is_valid 标记有效样本与超时、报错等无效样本,后续指标计算仅统计 is_valid=1 的数据。
七、场景化查询:数据如何被消费
结构化入库后,按场景维度聚合变得非常便捷。消费品牌可以快速了解自身在不同决策环节的表现差异:
SELECT scene_type,COUNT(*) AS total,SUM(is_mentioned) AS mentioned,SUM(is_recommended) AS recommended,ROUND(SUM(is_mentioned)*100.0/COUNT(*), 1) AS mention_rate,ROUND(SUM(is_recommended)*100.0/COUNT(*), 1) AS recommend_rateFROM monitoring_resultWHERE target_brand = '某品牌防晒霜'AND is_valid = 1AND sampled_at >= '2026-06-01'GROUP BY scene_typeORDER BY mention_rate DESC;
输出示例:
场景 | 样本数 | 提及率 | 推荐率 |
|---|---|---|---|
推荐决策 | 45 | 73.3% | 51.1% |
对比分析 | 30 | 60.0% | 36.7% |
风险判断 | 20 | 25.0% | 5.0% |
这份数据能直观反映:该品牌在推荐场景中可见度较高,但在用户“查风险/口碑”环节几乎隐身。这提示品牌方需加强第三方测评、用户反馈等公开内容的建设。
八、三个工程实践建议
1. 云函数超时设置要留余量
AI 平台在联网搜索模式下,单次调用可能耗时 60-90 秒。云函数超时建议设为 120 秒,同时在代码层面设置 API 调用超时(例如 60 秒),两者配合使用。
2. 消息去重需要业务层保障
消息队列通常提供“至少一次”投递语义,可能引发重复消费。通过 task_id 唯一约束加插入前检查,或采用幂等写入(INSERT IGNORE / ON DUPLICATE KEY UPDATE),可避免同一条采集结果重复入库。
3. 无效样本单独标记而非直接丢弃
对于超时、报错、AI 回复“我还不清楚”等情况,不要直接删除。统一标记 is_valid=0 并保留数据。事后分析无效样本的比例,能反映系统稳定性。若某个平台的无效率突然飙升,很可能意味着接口变动或限流策略调整。
九、结语
场景化 AI 回答采集的工程核心,不在于“调用一次大模型”,而在于将成百上千次调用组织成一条可靠、可追溯、可分析的数据链路。消息队列负责调度与解耦,分层存储兼顾证据留存与指标提取,失败重试机制保障采集完整度,场景标签贯穿全链路,使数据从采集之初就具备业务语义。
这套设计已在消费品牌、企业服务等行业的多平台 AI 回答监测中实际应用。开发者可参考本文思路,结合自身业务场景,在腾讯云上快速搭建原型。
