游乐游手机版
首页/AI教程/文章详情

基于多索引表架构的Vector大规模向量检索

时间:2026-06-22 15:05
针对多租户隔离和大规模向量检索延迟问题,提出基于多索引表架构的解决方案。通过将数据按租户拆分为独立索引,实现物理隔离与检索提速,支持并发检索多索引并合并结果,同时提供CLI和SDK实现按租户定向导入与检索,显著降低响应时延。

在做 RAG 或者语义搜索的时候,向量检索系统通常要面对两个比较棘手的问题。

一个是多租户隔离——比如你是一个SaaS服务商,手里几十上百个企业客户,每个客户的知识库必须天然隔离,谁的也不能串到谁家去。另一个就是数据规模太大,单索引的量级一旦到了千万甚至亿级,检索延迟就很难压下来,实时性往往跟不上。

针对这两个场景,OSS 向量 Bucket 提供了一种挺实用的思路:支持在同一账号、同一地域下创建大量的向量索引(Index)。通过将数据按租户或业务维度拆分到不同的索引中,隔离和性能的问题都能兼顾着解决。

多索引架构的优势

  • 数据隔离:让不同租户的数据放在各自独立的索引里,从物理层面杜绝了跨租户泄露的风险。
  • 检索提速:大表拆成多张小表之后,单次检索的范围立刻变小了。如果再配合并发检索多个索引再合并结果,总体响应时延还能进一步降下来。
  • 运维灵活:每个索引可以独立配置维度、模型和相似度算法。删某个租户的数据也很省事,直接删掉对应的索引就行,不用逐条过滤删除。

通过 CLI 按租户导入数据

oss-vectors-embed 这个 CLI 工具可以指定文件写入到指定的索引里,天然就支持按租户或业务做定向导入。

开始之前,需要确保下面几个条件已经满足:

  • 配置好环境变量 OSS_ACCESS_KEY_IDOSS_ACCESS_KEY_SECRETDASHSCOPE_API_KEY
  • 已经创建了向量 Bucket,以及每个租户对应的向量索引。

把下面示例里的占位符换成实际值就行:

占位符 说明
阿里云账号 ID
向量 Bucket 名称

按租户写入不同索引

不同租户的数据写入各自独立的索引,数据隔离自然就实现了。

# 将租户 A 的文档写入租户 A 的索引 oss-vectors-embed --account-id "" --vectors-region cn-hangzhou put --vector-bucket-name "" --index-name "tenantcompanya" --model-id text-embedding-v4 --text-value "租户A的知识库文档内容" --key "doc_001" --metadata '{"tenant": "company_a", "category": "faq"}' # 将租户 B 的文档写入租户 B 的索引 oss-vectors-embed --account-id "" --vectors-region cn-hangzhou put --vector-bucket-name "" --index-name "tenantcompanyb" --model-id text-embedding-v4 --text-value "租户B的知识库文档内容" --key "doc_001" --metadata '{"tenant": "company_b", "category": "manual"}'

按租户定向检索

检索时只查目标租户的索引,数据隔离天然生效。

# 仅在租户 A 的索引中检索 oss-vectors-embed --account-id "" --vectors-region cn-hangzhou query --vector-bucket-name "" --index-name "tenantcompanya" --model-id text-embedding-v4 --text-value "常见问题" --top-k 5 --return-metadata

通过 SDK 构建多索引架构

Python SDK

开始前记得安装 alibabacloud-oss-v2 SDK:

pip install alibabacloud-oss-v2

环境变量 OSS_ACCESS_KEY_IDOSS_ACCESS_KEY_SECRET 也要配置好。

创建多租户索引

以租户 ID 为后缀命名索引,批量创建独立的向量索引。

import alibabacloud_oss_v2 as oss import alibabacloud_oss_v2.vectors as oss_vectors ACCOUNT_ID = "" REGION = "cn-hangzhou" BUCKET = "" def create_vector_client(): credentials_provider = oss.credentials.EnvironmentVariableCredentialsProvider() cfg = oss.config.load_default() cfg.credentials_provider = credentials_provider cfg.region = REGION cfg.account_id = ACCOUNT_ID return oss_vectors.Client(cfg) client = create_vector_client() # 批量为租户创建索引 tenant_ids = ["companya", "companyb", "companyc"] for tenant_id in tenant_ids: index_name = f"tenant{tenant_id}" result = client.put_vector_index(oss_vectors.models.PutVectorIndexRequest( bucket=BUCKET, index_name=index_name, dimension=1024, data_type="float32", distance_metric="cosine", )) print(f"索引 {index_name} 创建完成,status_code={result.status_code}")

运行后会看到:

索引 tenantcompanya 创建完成,status_code=200 索引 tenantcompanyb 创建完成,status_code=200 索引 tenantcompanyc 创建完成,status_code=200

按租户写入数据

不同租户的数据写入各自对应的索引。

import alibabacloud_oss_v2 as oss import alibabacloud_oss_v2.vectors as oss_vectors ACCOUNT_ID = "" REGION = "cn-hangzhou" BUCKET = "" def create_vector_client(): credentials_provider = oss.credentials.EnvironmentVariableCredentialsProvider() cfg = oss.config.load_default() cfg.credentials_provider = credentials_provider cfg.region = REGION cfg.account_id = ACCOUNT_ID return oss_vectors.Client(cfg) client = create_vector_client() # 向租户 A 的索引写入数据 result = client.put_vectors(oss_vectors.models.PutVectorsRequest( bucket=BUCKET, index_name="tenantcompanya", vectors=[{ "key": "faq_001", "data": {"float32": [0.1] * 1024}, "metadata": {"tenant": "company_a", "category": "faq"} }] )) print(f"租户 A 写入完成,status_code={result.status_code}") # 向租户 B 的索引写入数据 result = client.put_vectors(oss_vectors.models.PutVectorsRequest( bucket=BUCKET, index_name="tenantcompanyb", vectors=[{ "key": "manual_001", "data": {"float32": [0.2] * 1024}, "metadata": {"tenant": "company_b", "category": "manual"} }] )) print(f"租户 B 写入完成,status_code={result.status_code}")

运行后输出:

租户 A 写入完成,status_code=200 租户 B 写入完成,status_code=200

并发检索多个索引并合并结果

大表拆分之后,用并发检索多张小表再合并排序的方式,确实可以显著降低总响应时延。

from concurrent.futures import ThreadPoolExecutor, as_completed import alibabacloud_oss_v2 as oss import alibabacloud_oss_v2.vectors as oss_vectors ACCOUNT_ID = "" REGION = "cn-hangzhou" BUCKET = "" def create_vector_client(): credentials_provider = oss.credentials.EnvironmentVariableCredentialsProvider() cfg = oss.config.load_default() cfg.credentials_provider = credentials_provider cfg.region = REGION cfg.account_id = ACCOUNT_ID return oss_vectors.Client(cfg) def search_index(client, index_name, query_vector, top_k=10): """检索单个索引""" result = client.query_vectors(oss_vectors.models.QueryVectorsRequest( bucket=BUCKET, index_name=index_name, query_vector=query_vector, return_metadata=True, return_distance=True, top_k=top_k, )) return {"index": index_name, "status_code": result.status_code, "vectors": result.vectors or []} def parallel_search(index_names, query_vector, top_k=10): """并发检索多个索引并合并结果""" client = create_vector_client() all_vectors = [] with ThreadPoolExecutor(max_workers=len(index_names)) as executor: futures = {executor.submit(search_index, client, idx, query_vector, top_k): idx for idx in index_names} for future in as_completed(futures): result = future.result() print(f"索引 {result['index']} 返回 {len(result['vectors'])} 条结果") all_vectors.extend(result["vectors"]) # 按 distance 升序排序,取全局 TopK all_vectors.sort(key=lambda v: v.get("distance", float("inf"))) return all_vectors[:top_k] # 并发检索 3 个分表索引 indices = ["tenantcompanya", "tenantcompanyb", "tenantcompanyc"] query_vec = {"float32": [0.1] * 1024} results = parallel_search(indices, query_vec, top_k=5) print(f"\n合并后全局 Top5:") for v in results: print(f"key={v.get('key')}, distance={v.get('distance')}, metadata={v.get('metadata')}")

运行后输出:

索引 tenantcompanya 返回 1 条结果 索引 tenantcompanyb 返回 1 条结果 索引 tenantcompanyc 返回 0 条结果 合并后全局 Top5: key=faq_001, distance=0.0, metadata={'tenant': 'company_a', 'category': 'faq'} key=manual_001, distance=0.19999998807907104, metadata={'tenant': 'company_b', 'category': 'manual'}

这里有个说明:并发检索多个索引之后,在客户端按 distance 排序合并就行了。如果对精度有更高要求,可以引入 Rerank 模型再做一次二次精排。

Go SDK

开始前安装 SDK:

go get github.com/aliyun/alibabacloud-oss-go-sdk-v2

同时确保已经配置了 OSS_ACCESS_KEY_IDOSS_ACCESS_KEY_SECRET 环境变量。

创建多租户索引

package main import ( "context" "fmt" "log" "github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss" "github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss/credentials" "github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss/vectors" ) const ( region = "cn-hangzhou" bucketName = "" accountId = "" ) func main() { cfg := oss.LoadDefaultConfig(). WithCredentialsProvider(credentials.NewEnvironmentVariableCredentialsProvider()). WithRegion(region). WithAccountId(accountId) client := vectors.NewVectorsClient(cfg) tenantIDs := []string{"companya", "companyb", "companyc"} for _, tenantID := range tenantIDs { indexName := fmt.Sprintf("tenant%s", tenantID) result, err := client.PutVectorIndex(context.TODO(), &vectors.PutVectorIndexRequest{ Bucket: oss.Ptr(bucketName), IndexName: oss.Ptr(indexName), Dimension: oss.Ptr(1024), DataType: oss.Ptr("float32"), DistanceMetric: oss.Ptr("cosine"), }) if err != nil { log.Printf("索引 %s 创建失败: %v", indexName, err) continue } fmt.Printf("索引 %s 创建完成,status_code=%d\n", indexName, result.StatusCode) } }

运行后输出:

索引 tenantcompanya 创建完成,status_code=200 索引 tenantcompanyb 创建完成,status_code=200 索引 tenantcompanyc 创建完成,status_code=200

并发检索多个索引并合并结果

package main import ( "context" "fmt" "log" "sort" "sync" "github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss" "github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss/credentials" "github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss/vectors" ) const ( region = "cn-hangzhou" bucketName = "" accountId = "" dimension = 1024 ) func makeVector(val float32, dim int) []float32 { v := make([]float32, dim) for i := range v { v[i] = val } return v } func main() { cfg := oss.LoadDefaultConfig(). WithCredentialsProvider(credentials.NewEnvironmentVariableCredentialsProvider()). WithRegion(region). WithAccountId(accountId) client := vectors.NewVectorsClient(cfg) indices := []string{"tenantcompanya", "tenantcompanyb", "tenantcompanyc"} queryVector := map[string]any{"float32": makeVector(0.1, dimension)} var mu sync.Mutex var allVectors []map[string]any var wg sync.WaitGroup for _, indexName := range indices { wg.Add(1) go func(idx string) { defer wg.Done() result, err := client.QueryVectors(context.TODO(), &vectors.QueryVectorsRequest{ Bucket: oss.Ptr(bucketName), IndexName: oss.Ptr(idx), QueryVector: queryVector, ReturnMetadata: oss.Ptr(true), ReturnDistance: oss.Ptr(true), TopK: oss.Ptr(10), }) if err != nil { log.Printf("索引 %s 检索失败: %v", idx, err) return } fmt.Printf("索引 %s 返回 %d 条结果\n", idx, len(result.Vectors)) mu.Lock() allVectors = append(allVectors, result.Vectors...) mu.Unlock() }(indexName) } wg.Wait() sort.Slice(allVectors, func(i, j int) bool { di, _ := allVectors[i]["distance"].(float64) dj, _ := allVectors[j]["distance"].(float64) return di < dj }) topK := 5 if len(allVectors) < topK { topK = len(allVectors) } fmt.Printf("\n合并后全局 Top%d:\n", topK) for _, v := range allVectors[:topK] { fmt.Printf("key=%v, distance=%v, metadata=%v\n", v["key"], v["distance"], v["metadata"]) } }

运行后输出:

索引 tenantcompanya 返回 1 条结果 索引 tenantcompanyc 返回 0 条结果 索引 tenantcompanyb 返回 1 条结果 合并后全局 Top2: key=faq_001, distance=0, metadata=map[category:faq tenant:company_a] key=manual_001, distance=0.19999998807907104, metadata=map[category:manual tenant:company_b]

最佳实践

  • 索引命名规范:用租户 ID 或业务维度作为索引名称的后缀(像 tenant{tenantid} 这种格式)。需要注意索引名只支持小写字母和数字,不支持下划线和连字符。
  • 租户数比较多的时候:直接利用索引名称做逻辑隔离。OSS 向量索引的创建是秒级的,管理成本几乎可以忽略。
  • 追求极低延迟:一旦单索引的数据量超过千万级别,可以按业务逻辑(比如时间、类别)做水平拆分,再用并发检索多个索引再合并结果的方式来处理。
  • 结果重排(Rerank):多索引表的结果合并之后,可以按 distance 相似度做简单排序,也可以引入专门的 Rerank 模型来做二次排序。
  • 索引清理:删除某个租户或者业务的数据,直接调用 DeleteVectorIndex 删除对应索引就行,省去了逐条过滤删除的麻烦。
来源:https://juejin.cn/post/7628637478082920511
上一篇阿里云PAI-DLC PyTorchJob任务提交参数详解 下一篇AI搜索优化本质争夺知识叙事权而非优化
本站内容用于信息整理与展示,如有侵权或内容问题请及时联系处理。

相关推荐

补充同频道和同主题内容,方便继续浏览更多相关内容。

同类最新

继续查看同栏目最近更新的文章。

更多
Windows Docker Desktop RabbitMQ生产级部署完整指南
AI教程 · 2026-06-29

Windows Docker Desktop RabbitMQ生产级部署完整指南

前言 在 Windows 本地开发环境中,直接安装 RabbitMQ 确实颇为周折:需要单独配置 Erlang 运行环境、手动管理环境变量、服务启停全凭手工操作。更令人困扰的是,版本兼容冲突、端口占用、环境不一致等问题层出不穷。笔者见过不少开发者为搭建环境就得耗费整整半天时间。 相比之下,借助 Do

AI搜索重构制造业采购逻辑的阿里云企业级GEOCMS优化实践
AI教程 · 2026-06-29

AI搜索重构制造业采购逻辑的阿里云企业级GEOCMS优化实践

先分享一个切实感受。过去两年,我们与福建制造企业合作较为频繁,发现一个非常突出的现象:超过80%的企业官网,产品参数仍然存放在PDF或图片中。AI爬虫?根本无法抓取。这些企业技术实力不弱、资质证照齐全、应用案例也丰富,但在AI搜索这一全新战场上,它们几乎处于隐身状态。 一、一个正在发生的行业变化 A

阿里云Token Plan团队版功能价格与省钱购买指南
AI教程 · 2026-06-29

阿里云Token Plan团队版功能价格与省钱购买指南

阿里云百炼近期推出了名为“Token Plan 团队版”的全新服务,这一服务专为企业与开发者量身打造,定位为AI大模型订阅平台。通过引入Credits作为统一计量单位,将文本生成、图像生成等多模态AI能力纳入单一计费体系,同时无缝兼容主流AI编程工具及智能体(Agent)生态系统。其核心亮点包括:全

阿里云物联网.NET Core客户端位置信息上报
AI教程 · 2026-06-29

阿里云物联网.NET Core客户端位置信息上报

阿里云物联网平台的位置服务并非一个完全独立的功能模块。位置信息可包含二维坐标与三维坐标,而位置数据的来源本质上是借助设备属性进行上传。换言之,若要让设备上报位置,您需先将其视为一个普通属性进行处理。 1)添加二维位置数据 操作过程十分简洁。进入数据分析 → 空间数据可视化 → 二维数据,点击添加,将

年阿里云服务器选型配置与网站部署全攻略
AI教程 · 2026-06-29

年阿里云服务器选型配置与网站部署全攻略

2026年,阿里云服务器生态已高度成熟,形成了清晰的轻量应用服务器与ECS云服务器两大产品阵营。无论你是计划搭建个人博客、企业官网,还是运营电商平台、进行应用开发,基本都能找到理想的解决方案。本指南将从服务器选型、配置选择、部署流程到安全运维,系统梳理2026年最实用的操作要点,帮助你少走弯路,让网