RAG 实战|用 StarRocks + DeepSeek 构建智能问答与企业知识库
RAG 实战|用 StarRocks + DeepSeek 构建智能问答与企业知识库文章作者:
免费影视、动漫、音乐、游戏、小说资源长期稳定更新! 👉 点此立即查看 👈
石强,镜舟科技解决方案架构师
赵恒,StarRocks TSC Member
RAG 和向量索引简介RAG(Retrieval-Augmented Generation,检索增强生成)是一种结合外部知识检索与 AI 生成的技术,弥补了传统大模型知识静态、易编造信息的缺陷,使回答更加准确且基于实时信息。
RAG 的核心流程检索(Retrieval)用户输入问题后,RAG 从外部数据库(如维基百科、企业文档、科研论文等)检索相关内容。检索工具可以是向量数据库、搜索引擎或传统数据库。生成(Generation)将检索到的相关信息与用户输入一起输入生成模型(如 GPT、LLaMA 等),生成更准确的回答。模型基于检索内容“增强”输出,而非仅依赖内部参数化知识。
上图展示了 RAG 的标准流程。首先,图片、文档、视频和音频等数据经过预处理,转换为 Embedding 并存入向量数据库。Embedding 通常是高维 float 数组,借助向量索引(如 HNSW、IVF)进行相似性搜索,加速高效检索。
向量索引通过近似最近邻(ANN)算法优化查询效率,减少高维计算负担。语义搜索匹配用户问题与知识库中的相关内容,使回答基于真实信息,从而降低大模型的“幻觉”风险,提升回答的自然性和可靠性。
StarRocks + DeepSeek 的典型 RAG 应用场景DeepSeek 负责生成高质量 Embedding 和回答,StarRocks 提供实时高效的向量检索,二者结合可构建更智能、更精准的 AI 解决方案。
企业级知识库适用场景:企业内部知识库(文档搜索、FAQ)法律、金融、医药等专业领域问答代码搜索、软件开发文档查询方案:文档嵌入(DeepSeek 负责): 将企业知识库、FAQ、技术文档等数据转换为向量。存储+索引(StarRocks 负责): 使用 HNSW 或 IVFPQ 存储向量存储在 StarRocks 中,支持高效检索。检索增强生成(RAG 负责): 用户输入问题 → DeepSeek 生成查询向量 → StarRocks 进行向量匹配 → 返回相关文档 → DeepSeek 结合文档生成最终回答。AI 客服与智能问答适用场景:智能客服(银行、证券、电商)法律、医疗等专业咨询技术支持自动问答方案:客户对话日志嵌入(DeepSeek 负责): 训练 LLM 处理用户意图,转换历史聊天记录为向量。存储+索引(StarRocks 负责): 采用向量索引让客服系统能够高效查找相似案例。检索增强(RAG 负责): 结合历史客服对话 + 知识库 + DeepSeek LLM 生成答案。示例流程:用户问:“我如何更改银行卡预留手机号?”StarRocks 检索到 3 个最相似的客户服务记录DeepSeek 结合这 3 条历史记录 + 预设 FAQ,生成精准回答操作演示系统组成DeepSeek:提供文本向量化(embedding)和答案生成能力StarRocks:高效存储和检索向量数据(3.4+版本支持向量索引)实现流程:步骤
负责组件
具体实现
环境准备
Ollama StarRocks
用 Ollama 在本地机器上便捷地部署和运行大型语言模型
数据向量化
DeepSeek-Embedding
文本 → 3584 维向量
存储向量
StarRocks
创建表,存入向量
近似最近邻搜索
StarRocks 向量索引
IVFPQ / HNSW 检索
检索增强
模拟 RAG 逻辑
结合检索数据
生成答案
DeepSeek LLM
生成基于真实数据的回答
1.环境准备1.1 DeepSeek 本地部署Tips: 以下内容使用的是 macbook 进行 demo 演示
1.1.1 使用 ollama 安装本地模型在本地部署 DeepSeek 时,Ollama 主要起到模型管理和提供推理接口的作用,支持运行多个不同的 LLM,并允许用户在本地切换和管理不同的模型。
下载 ollama:https://ollama.com/安装 deepseek-r1:7b代码语言:javascript代码运行次数:0运行复制# 该命令会自动下载并加载模型ollama run deepseek-r1:7b登录后复制

1.1.2 Deepseek 初步使用
启动 deepseek
代码语言:javascript代码运行次数:0运行复制执行 ollama run deepseek-r1:7b 直接进入交互模式登录后复制1.1.3 Deepseek 性能优化
直接在命令行设置参数:(参数单次生效)
代码语言:javascript代码运行次数:0运行复制OLLAMA_GPU_LAYERS=35 \OLLAMA_CPU_THREADS=6 \OLLAMA_BATCH_SIZE=128 \OLLAMA_CONTEXT_SIZE=4096 \ollama run deepseek-r1:7b登录后复制1.1.4 deepseek 使用

显而易见:直接使用 deepseek 进行问答,返回的答案是不符合预期的,需要对知识进行修正
1.2 StarRocks 准备工作1.2.1 集群部署版本需求:3.4 及以上
1.2.2 配置设置打开 vector index
代码语言:javascript代码运行次数:0运行复制ADMIN SET FRONTEND CONFIG ("enable_experimental_vector" = "true");登录后复制1.2.3 建库建表建库:
代码语言:javascript代码运行次数:0运行复制create database knowledge_base;登录后复制
建表:存储知识库向量
代码语言:javascript代码运行次数:0运行复制CREATE TABLE enterprise_knowledge ( id BIGINT AUTO_INCREMENT, content TEXT NOT NULL, embedding ARRAY登录后复制NOT NULL, INDEX vec_idx (embedding) USING VECTOR ( "index_type" = "hnsw", "dim" = "3584", "metric_type" = "l2_distance", "M" = "16", "efconstruction" = "40" )) ENGINE=OLAPPRIMARY KEY(id)DISTRIBUTED BY HASH(id) BUCKETS 1PROPERTIES ( "replication_num" = "1" );
Tips: DeepSeek 的 deepseek-r1:7b 模型(7B 参数版本)默认生成高维嵌入向量,通常是 3584 维
2.将文本转成向量测试通过 deepseek 将文本转为 3584 维向量
代码语言:javascript代码运行次数:0运行复制curl -X POST https://localhost:11434/api/embeddings -d '{"model": "deepseek-r1:7b", "prompt": "产品保修期是一年。"}'登录后复制下面将转化的向量数据保存在 StarRocks 中
3.知识存储 (存储向量到 StarRocks)代码语言:javascript代码运行次数:0运行复制import pymysqlimport requestsdef get_embedding(text): url = "https://localhost:11434/api/embeddings" payload = {"model": "deepseek-r1:7b", "prompt": text} response = requests.post(url, json=payload) response.raise_for_status() return response.json()["embedding"]try: content = "StarRocks 的愿景是能够让用户的数据分析变得更加简单和敏捷。" embedding = get_embedding(content) # 将 Python 列表转换为 StarRocks 的数组格式 embedding_str = "[" + ",".join(map(str, embedding)) + "]" # 例如:[0.1,0.2,0.3] conn = pymysql.connect( host='X.X.X.X', port=9030, user='root', password='sr123456', database='knowledge_base' ) cursor = conn.cursor() # 使用格式化的数组字符串 sql = "INSERT INTO enterprise_knowledge (content, embedding) VALUES (%s, %s)" cursor.execute(sql, (content, embedding_str)) conn.commit() print(f"Inserted: {content} with embedding {embedding[:5]}...")except requests.RequestException as e: print(f"Embedding API error: {e}")except pymysql.Error as db_err: print(f"Database error: {db_err}")finally: if 'cursor' in locals(): cursor.close() if 'conn' in locals(): conn.close()登录后复制登录后复制操作演示
import pymysqlimport requests# 获取嵌入向量的函数def get_embedding(text): url = "https://localhost:11434/api/embeddings" payload = {"model": "deepseek-r1:7b", "prompt": text} response = requests.post(url, json=payload) response.raise_for_status() return response.json()["embedding"]# 从 StarRocks 查询相似内容的函数def search_knowledge_base(query_embedding): try: conn = pymysql.connect( host='39.98.110.249', port=9030, user='root', password='sr123456', database='knowledge_base' ) cursor = conn.cursor() # 将查询向量转换为 StarRocks 的数组格式 embedding_str = "[" + ",".join(map(str, query_embedding)) + "]" # 使用 L2 距离搜索最相似的记录 sql = """ SELECT content, l2_distance(embedding, %s) AS distance FROM enterprise_knowledge ORDER BY distance ASC LIMIT 1 """ cursor.execute(sql, (embedding_str,)) result = cursor.fetchone() if result: return result[0] # 返回最匹配的 content else: return "未找到相关信息。" except pymysql.Error as db_err: print(f"Database error: {db_err}") return "查询失败。" finally: if 'cursor' in locals(): cursor.close() if 'conn' in locals(): conn.close()# 主流程try: query = "StarRocks 的愿景是什么?" query_embedding = get_embedding(query) # 将查询转化为向量 answer = search_knowledge_base(query_embedding) # 从知识库检索答案 print(f"问题: {query}") print(f"回答: {answer}")except requests.RequestException as e: print(f"Embedding API error: {e}")except Exception as e: print(f"Error: {e}")登录后复制执行效果

补充说明:到目前为止的流程仅依赖 StarRocks 进行向量检索,未利用 DeepSeek LLM 进行生成,导致回答生硬且缺乏上下文整合,影响自然性和准确性。为提升效果,应引入 RAG 机制,使检索结果与生成模型深度融合,从而优化回答质量并减少幻觉问题。
5.加入 RAG 增强5.1 将查询知识库的结果,返回给 DeepSeek LLM ,优化回答质量代码语言:javascript代码运行次数:0运行复制# 构造 RAG Promptdef build_rag_prompt(query, retrieved_content): prompt = f""" [系统指令] 你是企业智能客服,基于以下知识回答用户问题: [知识上下文] {retrieved_content} [用户问题] {query} """ return prompt# 调用 DeepSeek 生成回答def generate_answer(prompt): url = "https://localhost:11434/api/generate" payload = {"model": "deepseek-r1:7b", "prompt": prompt} try: response = requests.post(url, json=payload) response.raise_for_status() full_response = "" for line in response.text.splitlines(): if line.strip(): # 过滤空行 try: json_obj = json.loads(line) if "response" in json_obj: full_response += json_obj["response"] # 只提取答案 if json_obj.get("done", False): break except json.JSONDecodeError as e: print(f"JSON 解析错误: {e}, line: {line}") return clean_response(full_response.strip()) # 处理并去掉 XXX except requests.exceptions.RequestException as e: print(f"请求失败: {e}") return "生成失败。"登录后复制5.2 创建 RAG 过程表:用于记录用户问题、检索结果和生成回答,保存上下文,方便进行长对话,至于长对话,用户可自行探索。
customer_service_log 表建表语句如下:
代码语言:javascript代码运行次数:0运行复制CREATE TABLE customer_service_log ( id BIGINT AUTO_INCREMENT, user_id VARCHAR(50), question TEXT NOT NULL, question_embedding ARRAY登录后复制6.优化后的版本6.1 知识提取代码6.1.1 知识提取代码语言:javascript代码运行次数:0运行复制NOT NULL, retrieved_content TEXT, generated_answer TEXT, timestamp DATETIME NOT NULL, feedback TINYINT DEFAULT NULL) ENGINE=OLAPPRIMARY KEY(id)DISTRIBUTED BY HASH(id) BUCKETS 1PROPERTIES ( "replication_num" = "1");
import pymysqlimport requestsimport jsonfrom datetime import datetimeimport loggingimport re# 获取嵌入向量def get_embedding(text): url = "https://localhost:11434/api/embeddings" payload = {"model": "deepseek-r1:7b", "prompt": text,"stream": "true"} response = requests.post(url, json=payload) response.raise_for_status() return response.json()["embedding"]# 从 StarRocks 检索知识def search_knowledge_base(query_embedding): try: conn = pymysql.connect( host='X.X.X.X', port=9030, user='root', password='sr123456', database='knowledge_base' ) cursor = conn.cursor() embedding_str = "[" + ",".join(map(str, query_embedding)) + "]" sql = """ SELECT content, l2_distance(embedding, %s) AS distance FROM enterprise_knowledge ORDER BY distance ASC LIMIT 3 """ cursor.execute(sql, (embedding_str,)) results=cursor.fetchall() content="" for result in results: content+=result[0] return content except pymysql.Error as db_err: print(f"Database error: {db_err}") return "查询失败。" finally: cursor.close() conn.close()def build_rag_prompt(query, retrieved_content): prompt = f""" [系统指令] 你是企业智能客服,基于以下知识回答用户问题: [知识上下文] {retrieved_content} [用户问题] {query} """ return prompt# 调用 DeepSeek 生成回答def generate_answer(prompt): url = "https://localhost:11434/api/generate" payload = {"model": "deepseek-r1:7b", "prompt": prompt} try: response = requests.post(url, json=payload) response.raise_for_status() full_response = "" for line in response.text.splitlines(): if line.strip(): # 过滤空行 try: json_obj = json.loads(line) if "response" in json_obj: full_response += json_obj["response"] # 只提取答案 if json_obj.get("done", False): break except json.JSONDecodeError as e: print(f"JSON 解析错误: {e}, line: {line}") return clean_response(full_response.strip()) # 处理并去掉 XXX except requests.exceptions.RequestException as e: print(f"请求失败: {e}") return "生成失败。"# 记录对话日志def log_conversation(user_id, question, question_embedding, retrieved_content, generated_answer): try: conn = pymysql.connect( host='X.X.X.X', port=9030, user='root', password='sr123456', database='knowledge_base' ) cursor = conn.cursor() embedding_str = "[" + ",".join(map(str, question_embedding)) + "]" sql = """ INSERT INTO customer_service_log (user_id, question, question_embedding, retrieved_content, generated_answer, timestamp) VALUES (%s, %s, %s, %s, %s, NOW()) """ cursor.execute(sql, (user_id, question, embedding_str, retrieved_content, generated_answer)) conn.commit() except pymysql.Error as db_err: print(f"Database error: {db_err}") finally: cursor.close() conn.close()def clean_response(text): # 去掉所有 xxx 结构 return re.sub(r".*? ", "", text, flags=re.DOTALL).strip()# 主流程def rag_pipeline(user_id, query): try: logging.info(f"开始处理查询: {query}") query_embedding = get_embedding(query) logging.info("获取嵌入向量成功") retrieved_content = search_knowledge_base(query_embedding) logging.info(f"检索到内容: {retrieved_content[:50]}...") # 只展示前50字符 prompt = build_rag_prompt(query, retrieved_content) generated_answer = generate_answer(prompt) logging.info(f"生成回答: {generated_answer[:50]}...") log_conversation(user_id, query, query_embedding, retrieved_content, generated_answer) logging.info("日志记录完成") return generated_answer except Exception as e: logging.error(f"发生错误: {e}", exc_info=True) return "处理失败。"# 测试if __name__ == '__main__': logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") user_id = "user123" query = "StarRocks 的愿景是什么?" answer = rag_pipeline(user_id, query) print(f"问题: {query}") print(f"回答: {answer}")登录后复制6.1.2 操作演示
登录后复制6.3 完整问答后台服务代码6.3.1 代码结构如下智能问答客服系统 智能问答客服系统

import pymysqlimport requestsdef get_embedding(text): url = "https://localhost:11434/api/embeddings" payload = {"model": "deepseek-r1:7b", "prompt": text} response = requests.post(url, json=payload) response.raise_for_status() return response.json()["embedding"]try: content = "StarRocks 的愿景是能够让用户的数据分析变得更加简单和敏捷。" embedding = get_embedding(content) # 将 Python 列表转换为 StarRocks 的数组格式 embedding_str = "[" + ",".join(map(str, embedding)) + "]" # 例如:[0.1,0.2,0.3] conn = pymysql.connect( host='X.X.X.X', port=9030, user='root', password='sr123456', database='knowledge_base' ) cursor = conn.cursor() # 使用格式化的数组字符串 sql = "INSERT INTO enterprise_knowledge (content, embedding) VALUES (%s, %s)" cursor.execute(sql, (content, embedding_str)) conn.commit() print(f"Inserted: {content} with embedding {embedding[:5]}...")except requests.RequestException as e: print(f"Embedding API error: {e}")except pymysql.Error as db_err: print(f"Database error: {db_err}")finally: if 'cursor' in locals(): cursor.close() if 'conn' in locals(): conn.close()登录后复制登录后复制6.3.3 知识提取代码语言:javascript代码运行次数:0运行复制import pymysqlimport requestsimport jsonimport loggingimport refrom flask import Flask, request, jsonify, render_templateapp = Flask(__name__)# 配置日志logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")# 获取嵌入向量def get_embedding(text): url = "https://localhost:11434/api/embeddings" payload = {"model": "deepseek-r1:7b", "prompt": text, "stream": "true"} response = requests.post(url, json=payload) response.raise_for_status() return response.json()["embedding"]# 从 StarRocks 检索知识def search_knowledge_base(query_embedding): try: conn = pymysql.connect( host='X.X.X.X', port=9030, user='root', password='sr123456', database='knowledge_base' ) cursor = conn.cursor() embedding_str = "[" + ",".join(map(str, query_embedding)) + "]" sql = """ SELECT content, l2_distance(embedding, %s) AS distance FROM enterprise_knowledge ORDER BY distance ASC LIMIT 3 """ cursor.execute(sql, (embedding_str,)) results=cursor.fetchall() content="" for result in results: content+=result[0] # result = cursor.fetchone() return content except pymysql.Error as db_err: print(f"Database error: {db_err}") return "查询失败。" finally: cursor.close() conn.close()# 构造 RAG Promptdef build_rag_prompt(query, retrieved_content): return f""" [系统指令] 你是企业智能客服,基于以下知识回答用户问题: [知识上下文] {retrieved_content} [用户问题] {query} """# 调用 DeepSeek 生成回答def generate_answer(prompt): url = "https://localhost:11434/api/generate" payload = {"model": "deepseek-r1:7b", "prompt": prompt} try: response = requests.post(url, json=payload) response.raise_for_status() full_response = "" for line in response.text.splitlines(): if line.strip(): try: json_obj = json.loads(line) if "response" in json_obj: full_response += json_obj["response"] if json_obj.get("done", False): break except json.JSONDecodeError as e: logging.warning(f"JSON 解析错误: {e}, line: {line}") return clean_response(full_response.strip()) # 处理并去掉 XXX except requests.exceptions.RequestException as e: logging.error(f"请求失败: {e}") return "生成失败。"# 记录对话日志def log_conversation(user_id, question, question_embedding, retrieved_content, generated_answer): try: conn = pymysql.connect( host='X.X.X.X', port=9030, user='root', password='sr123456', database='knowledge_base' ) cursor = conn.cursor() embedding_str = "[" + ",".join(map(str, question_embedding)) + "]" sql = """ INSERT INTO customer_service_log (user_id, question, question_embedding, retrieved_content, generated_answer, timestamp) VALUES (%s, %s, %s, %s, %s, NOW()) """ cursor.execute(sql, (user_id, question, embedding_str, retrieved_content, generated_answer)) conn.commit() except pymysql.Error as db_err: logging.error(f"数据库错误: {db_err}") finally: cursor.close() conn.close()# 清理回答内容,去掉 XXX def clean_response(text): return re.sub(r".*? ", "", text, flags=re.DOTALL).strip()# RAG 处理流程def rag_pipeline(user_id,query): try: logging.info(f"开始处理查询: {query}") query_embedding = get_embedding(query) logging.info("获取嵌入向量成功") retrieved_content = search_knowledge_base(query_embedding) logging.info(f"检索到内容: {retrieved_content[:50]}...") # 只展示前50字符 prompt = build_rag_prompt(query, retrieved_content) generated_answer = generate_answer(prompt) logging.info(f"生成回答: {generated_answer[:50]}...") log_conversation(user_id, query, query_embedding, retrieved_content, generated_answer) logging.info("日志记录完成") return generated_answer except Exception as e: logging.error(f"发生错误: {e}", exc_info=True) return "处理失败。"# Flask API@app.route("/")def index(): return render_template("index.html") # 渲染前端页面@app.route("/ask", methods=["POST"])def ask(): user_id="sr_01" data = request.json question = data.get("question", "") result=rag_pipeline(user_id,question) answer = f"问题:{question}。\n 回答:{result}" return jsonify({"answer": answer})if __name__ == "__main__": user_id = "sr" app.run(host="0.0.0.0", port=9033, debug=True)登录后复制6.3.4 效果演示

参考文档:
Deepseek 搭建:https://zhuanlan.zhihu.com/p/20803691410
Vector index 资料:https://docs.starrocks.io/zh/docs/table_design/indexes/vector_index/
AI 增强分析:用 LLM、RAG 优化查询、智能 SQL 生成、自然语言交互工具 & 插件:开发 AI 扩展、模型集成、自动化运维方案实战案例:分享你的 AI+StarRocks 应用 Demo(附代码/视频更佳!)? 丰厚奖励
Top 10 优秀贡献者将获得 StarRocks 社区荣誉 + 2000 积分奖励(详情参考 StarRocks 布道师计划)优秀项目有机会被最新推荐,并整合进 StarRocks 生态? 立即行动!? 在社区论坛分享你的创意或 AI 实践:https://forum.mirrorship.cn/
相关攻略
Socket连接(准确说是Unix域套接字,Unix Domain Socket,UDS)是MySQL为本地进程间通信设计的专属连接方式,它并非网络协议,而是基于操作系统文件系统实现的进程通信机制。
新智元报道编辑:LRST【新智元导读】ContextBench首次从「过程」评测代码智能体,不再只看是否修好代码,而是追踪它是否精准找到并真正使用了关键代码片段,揭示了当前模型多读少用、被关键词误导
在之前的文章中,举了一个强制类型转换导致死锁的例子,有朋友询问是不是类型转换都不能命中索引,花1分钟细说一下。 《两个小公举,调试MySQL死锁必备!》中,举了一个强制类型转换导致死锁的例子,有朋友
MySQL 索引优化不用追求复杂,把以下五个基础技巧用熟,就能解决80%的索引问题。 MySQL索引优化是提升SQL查询效率的核心方法,用好索引能让慢查询“飞起来”,用不好反而会拖垮数据库。今天整理
今天和大家聊一个让无数 DBA 抓狂的问题:MySQL 异常宕机后,重启卡在 InnoDB。 今天想和大家聊一个让无数DBA抓狂的问题:MySQL异常宕机后,重启卡在“InnoDB: Startin
热门专题
热门推荐
豆包上线视频通话功能:支持实时视频问答 最近,豆包官方放出了一个大消息:App正式上线了实时视频通话功能。这可不是简单的功能叠加,它实实在在地将用户交互体验,带进了一个全新的维度。 那么,这个新功能具体能做什么?简单来说,当你在豆包的电话界面开启视频画面,你面前的就不再是一个冷冰冰的聊天窗口了。豆包
苹果的AI新棋局:向开发者敞开设备端智能的大门 科技圈最近有个消息传得挺热:苹果正计划在2025年的全球开发者大会(WWDC)上,向第三方开发者开放其设备端AI模型。这步棋的目的很明确,就是要激发一波应用创新,并扩展其智能生态的边界。具体来说,苹果正在开发一个软件开发工具包(SDK),这个工具包将允
OpenAI联合创始人:AI智能体十年内难堪大用,“智能体之年”言过其实 最近,OpenAI的联合创始人安德烈·卡帕西(Andrej Karpathy)给出了一个颇为清醒的判断。他公开表示,我们今天谈论的AI智能体,距离真正意义上的“功能完善”,还有很长一段路要走。 话说回来,他的观点非常直接:眼下
《王者荣耀世界》:付费机制革新,从“数值碾压”到“个性表达” 随着《王者荣耀世界》正式上线,其独特的付费设计理念引发了广泛关注。一个明确的趋势是,这款游戏并未延续传统网游依赖数值付费的陈旧模式。其核心设计逻辑,旨在构建一个以深度叙事与自由探索为核心的开放世界。在此框架下,付费机制的角色发生了本质性转
Windows 10 安装 Redis 的六种方法详解 在 Windows 10 操作系统上安装 Redis 数据库,许多开发者会遇到官方不再提供原生 Windows 版本支持的难题。常见问题包括命令无法识别、Windows 服务注册失败或配置文件加载错误。这些问题通常源于版本兼容性、系统环境变量配





