前情
本篇文章源于一个学习笔记——在掌握 Langchain 基础后,作者通过一个小型实战项目巩固理解,经过一段时间的整理才得以发布。从标题可以预见,该项目定位于 Langchain 中级实战路径,旨在将刚学到的抽象概念落地到具体场景中,帮助读者快速上手使用。
需求背景
业务场景
电商平台客户反馈自动化处理系统。
需求描述
某电商平台需要一套自动化客户反馈处理方案,实现以下核心能力:
- 情感分析:判断用户反馈的情感倾向(积极/中性/消极)
- 问题分类:识别反馈中涉及的具体问题类型
- 紧急程度评估:根据内容自动评定处理优先级
- 生成回复草稿:基于分析结果自动生成初步客服回复
核心技术
技术点
-
LangChain + LCEL 任务编排 —— 将复杂的 NLP 流程(分析、分类、生成)抽象为一组可组合的
Runnable组件。通过RunnableParallel、RunnableLambda、RunnablePassthrough等原语,构建了一条清晰的数据流管道:extract_chain→analysis_chain→generate_response。 -
“规则+大模型”的混合智能策略 —— 优先使用正则表达式(
re.search(r'ORD\d{10}', text))匹配结构化的订单 ID,仅在正则匹配失败时才降级调用大模型。这种策略兼顾了效率和成本,是实战中的常用技巧。 -
工程化设计 —— 容错与降级:
call_qwen_with_retry实现了指数退避重试(本示例采用简单重试),最终失败则返回友好的降级信息,保障系统鲁棒性。API 与监控:使用 FastAPI 快速封装为 RESTful 服务(/process-feedback),并内置处理耗时监控。注释中还给出了日志、批处理、缓存(如SQLiteCache)的增强方案,体现了从原型到生产的设计思路。配置化思维:optimized_qwen_call函数(见注释)展示了针对不同任务(情感分析、分类、生成)精细化调整大模型参数(temperature、max_tokens)的方法,这是优化效果与成本的关键。
项目架构
(此处应有架构图,但原文未展示具体图片,保留空位)
核心流程
(流程图的文字描述,保留位置)
项目代码
模型生成
# 使用通义千问模型
qwen = ChatTongyi(
model_name="qwen-max",
temperature=0.2, # 控制创造性
max_tokens=2000, # 最大输出长度
streaming=False, # 关闭流式输出
enable_search=True # 启用联网搜索增强
)
带重试的模型调用
容错与降级机制,防止“重试风暴”影响系统稳定性。
def call_qwen_with_retry(prompt, max_retries=3, retry_delay=2):
"""带错误重试的千问模型调用"""
for attempt in range(max_retries):
try:
response = qwen.invoke(prompt)
return response.content
except Exception as e:
print(f"模型调用失败 (尝试 {attempt + 1}/{max_retries}): {str(e)}")
time.sleep(retry_delay)
return "模型服务暂时不可用,请稍后再试。"
提取订单
def extract_order_id(text: str) -> dict:
"""使用千问模型提取订单ID"""
prompt = f"""你是一个电商订单处理专家,请从以下客户反馈中提取订单ID:
{text}
订单ID通常是"ORD"开头的10位数字组合。如果找不到订单ID,返回"NOT_FOUND"。
请严格按JSON格式返回结果:{{"order_id": "提取结果"}}"""
try:
# 正则提取
match = re.search(r'ORD\d{10}', text)
return {"order_id": match.group(0) if match else "NOT_FOUND"}
except:
# 备选方案:大模型提取
result = call_qwen_with_retry(prompt)
# 尝试解析JSON
return json.loads(result.strip())
情感分类
sentiment: 情感类型, 依赖 LLMconfidence: 置信度, 依赖 LLMkey_phrases: ["短语1", "短语2", "短语3"]
def analyze_sentiment(text: str) -> dict:
"""使用千问模型进行情感分析"""
prompt = f"""请分析以下客户反馈的情感倾向:
「{text}」
要求:
1. 判断情感类型:POSITIVE(积极)/NEUTRAL(中性)/NEGATIVE(消极)
2. 评估置信度(0.0-1.0)
3. 提取3个关键短语
返回JSON格式:{{"sentiment": "情感类型","confidence": 置信度,"key_phrases": ["短语1", "短语2", "短语3"]}}"""
try:
result = call_qwen_with_retry(prompt)
output_parser = JsonOutputParser()
result = output_parser.parse(result)
return result
except Exception as e:
print(f"情感分析失败: {e}")
return {"sentiment": "NEUTRAL", "confidence": 0.7, "key_phrases": []}
问题分类
def classify_issue(text: str) -> dict:
"""使用千问模型进行问题分类"""
prompt = f"""作为电商客服专家,请对以下客户反馈进行分类:
「{text}」
分类选项:
- 物流问题:配送延迟、物流损坏等
- 产品质量:商品瑕疵、功能故障等
- 客户服务:客服态度、响应速度等
- 支付问题:扣款异常、退款延迟等
- 退货退款:退货流程、退款金额等
- 其他:无法归类的反馈
要求:
1. 选择最相关的1-2个分类
2. 按相关性排序
返回JSON格式:{{"categories": ["分类1", "分类2"]}}"""
try:
result = call_qwen_with_retry(prompt)
output_parser = JsonOutputParser()
result = output_parser.parse(result)
return result
except Exception as e:
print(f"问题分类失败: {e}")
return {"categories": ["其他"]}
紧急状态
def assess_urgency(text: str) -> dict:
"""使用千问模型评估紧急程度"""
prompt = f"""作为客服主管,请评估以下客户反馈的紧急程度:
「{text}」
评估标准:
- HIGH(高):包含"紧急"、"立刻"、"马上"或威胁投诉
- MEDIUM(中):表达强烈不满但无立即行动要求
- LOW(低):一般反馈或建议
返回JSON格式:{{"urgency": "紧急级别","sla_hours": 响应时限(小时),"reason": "评估理由"}}"""
try:
result = call_qwen_with_retry(prompt)
output_parser = JsonOutputParser()
result = output_parser.parse(result)
# 确保数值类型
result["sla_hours"] = int(result["sla_hours"])
return result
except Exception as e:
print(f"紧急度评估失败: {e}")
return {"urgency": "MEDIUM", "sla_hours": 24, "reason": "评估失败"}
LLM定制化回复
def generate_response(data: dict) -> dict:
"""使用千问模型生成定制化回复"""
prompt_template = """你是一名资深电商客服专家,请根据以下分析结果生成客户回复:
### 客户反馈原文:
{feedback}
### 分析结果:
- 订单ID:{order_id}
- 情感倾向:{sentiment} (置信度:{confidence:.2f})
- 问题类型:{categories}
- 紧急程度:{urgency} (需在{sla_hours}小时内响应)
{key_phrases_section}
### 回复要求:
1. 根据情感倾向调整语气:
- 积极反馈:表达感谢,适当赞美
- 消极反馈:诚恳道歉,明确解决方案
2. 包含订单ID和问题分类
3. 明确说明处理时限和后续步骤
4. 长度100-150字,使用自然口语
5. 结尾询问是否还有其他问题
请直接输出回复内容,不需要额外说明。"""
# 构建关键短语部分
key_phrases = data.get("key_phrases", [])
if key_phrases:
key_phrases_section = "- 关键要点:" + ",".join(key_phrases[:3])
else:
key_phrases_section = ""
# 填充模板
prompt = prompt_template.format(
feedback=data["original_feedback"],
order_id=data["order_id"],
sentiment=data["sentiment"],
confidence=data.get("confidence", 0.8),
categories="、".join(data["categories"]),
urgency=data["urgency"],
sla_hours=data["sla_hours"],
key_phrases_section=key_phrases_section
)
try:
response = call_qwen_with_retry(prompt)
# 添加紧急标识
if data["urgency"] == "HIGH":
response = f"[紧急] {response}"
return {
"final_response": response,
"assigned_team": data["categories"][0] if data["categories"] else "General",
"result": data
}
except Exception as e:
print(f"回复生成失败: {e}")
return {
"final_response": "感谢您的反馈,我们的团队将尽快处理您的问题。",
"assigned_team": "General"
}
LCEL处理链
extract_chain = RunnableParallel(
order_id=RunnableLambda(extract_order_id),
original_feedback=lambda x: x
)
用户行为分析
之所以采用 RunnableParallel,是因为情感分析、问题分类、紧急程度评估三者之间没有依赖关系,完全可以并行执行,从而节省大模型调用时间,提升系统响应速度。
analysis_chain = RunnableParallel(
# 情感分析
sentiment=RunnableLambda(analyze_sentiment),
# 问题分类
categories=RunnableLambda(classify_issue),
# 紧急程度
urgency=RunnableLambda(assess_urgency)
)
# 步骤3: 组合完整流程
processing_chain = (
# 订单提取,正则命中则直接返回,否则走大模型
extract_chain
| RunnablePassthrough.assign(
analysis=lambda x: analysis_chain.invoke(x["original_feedback"])
)
| {
"original_feedback": lambda x: x["original_feedback"],
"order_id": lambda x: x["order_id"]["order_id"],
"sentiment": lambda x: x["analysis"]["sentiment"].get("sentiment", "NEUTRAL"),
"confidence": lambda x: x["analysis"]["sentiment"].get("confidence", 0.8),
"key_phrases": lambda x: x["analysis"]["sentiment"].get("key_phrases", []),
"categories": lambda x: x["analysis"]["categories"]["categories"],
"urgency": lambda x: x["analysis"]["urgency"]["urgency"],
"sla_hours": lambda x: x["analysis"]["urgency"]["sla_hours"],
"urgency_reason": lambda x: x["analysis"]["urgency"].get("reason", "")
}
| RunnableLambda(generate_response)
)
本地测试
具体测试代码已包含在源码中,此处不再展开说明。
前端和部署
使用 FastAPI 将整套逻辑封装为 RESTful 服务,前端模板仅包含一个简单的 index.html 文件。
app = FastAPI(title="电商客服系统")
主页
@app.get("/")
async def read_index():
return FileResponse("index.html")
反馈接口
核心逻辑在于 processing_chain.invoke(request.content),前端调用后触发完整的 LangChain 大模型处理链。
@app.post("/process-feedback")
async def process_feedback(request: FeedbackRequest):
try:
start = time.time()
result = processing_chain.invoke(request.content)
elapsed = time.time() - start
return {
"success": True,
"processing_time": f"{elapsed:.2f}s",
"result": result
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"处理失败: {str(e)}")
前端JS代码
前端主要实现两个功能:向后台发送消息,以及渲染返回的结果。核心函数 sendMessage 调用接口后,通过 addMessage 显示回复内容,通过 updateAnalysisPanel 更新分析面板数据。
// 发送消息到后端API
async function sendMessage(message) {
showLoading();
try {
const response = await fetch(`${API_BASE}/process-feedback`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({
content: message,
user_id: 'web_user'
})
});
if (!response.ok) {
throw new Error(`HTTP error! status: ${response.status}`);
}
const data = await response.json();
if (data.success) {
addMessage(data.result.final_response, false);
updateAnalysisPanel(data.result);
} else {
throw new Error('API returned unsuccessful response');
}
} catch (error) {
console.error('Error:', error);
addMessage('抱歉,系统暂时无法处理您的请求,请稍后再试。', false);
analysisResults.innerHTML = '分析失败,请重试
';
}
}
服务运行
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="localhost", port=8000)
Run
(运行截图和示例交互,原文未展示,保留)
源码
完整代码已上传至 GitHub,感兴趣的朋友可自行查阅。
