lanny xu commited on
Commit
8f47b0a
·
1 Parent(s): 83b14fe

optimize query speed

Browse files
Files changed (3) hide show
  1. config.py +10 -2
  2. document_processor.py +197 -160
  3. requirements.txt +1 -1
config.py CHANGED
@@ -60,8 +60,8 @@ CHUNK_SIZE = 250
60
  CHUNK_OVERLAP = 50 # 添加重叠以保持上下文连贯性,提升检索准确率
61
 
62
  # 向量数据库配置
63
- VECTOR_STORE_TYPE = os.environ.get("VECTOR_STORE_TYPE", "chroma") # 可选: "chroma", "milvus"
64
- COLLECTION_NAME = "rag-chroma"
65
  EMBEDDING_MODEL = "sentence-transformers/all-MiniLM-L6-v2" # HuggingFace嵌入模型
66
 
67
  # Milvus 配置 (仅当 VECTOR_STORE_TYPE="milvus" 时生效)
@@ -75,6 +75,14 @@ MILVUS_PASSWORD = os.environ.get("MILVUS_PASSWORD", "") # Zilliz Cloud 的 API K
75
  # Milvus Lite 配置: 如果设置了 MILVUS_URI (如 "./milvus_demo.db"),将优先使用本地文件模式
76
  MILVUS_URI = os.environ.get("MILVUS_URI", "./milvus_rag.db")
77
 
 
 
 
 
 
 
 
 
78
  # 搜索配置
79
  WEB_SEARCH_RESULTS_COUNT = 3
80
 
 
60
  CHUNK_OVERLAP = 50 # 添加重叠以保持上下文连贯性,提升检索准确率
61
 
62
  # 向量数据库配置
63
+ VECTOR_STORE_TYPE = "milvus" # 强制使用 Milvus
64
+ COLLECTION_NAME = "rag-milvus"
65
  EMBEDDING_MODEL = "sentence-transformers/all-MiniLM-L6-v2" # HuggingFace嵌入模型
66
 
67
  # Milvus 配置 (仅当 VECTOR_STORE_TYPE="milvus" 时生效)
 
75
  # Milvus Lite 配置: 如果设置了 MILVUS_URI (如 "./milvus_demo.db"),将优先使用本地文件模式
76
  MILVUS_URI = os.environ.get("MILVUS_URI", "./milvus_rag.db")
77
 
78
+ # Milvus 性能调优 (百万级数据推荐配置)
79
+ # 索引类型: HNSW (最快/吃内存), IVF_SQ8 (省内存/速度快/轻微精度损失), IVF_FLAT (平衡)
80
+ MILVUS_INDEX_TYPE = "HNSW"
81
+ # 索引构建参数 (M: 邻居数, efConstruction: 构建深度)
82
+ MILVUS_INDEX_PARAMS = {"M": 8, "efConstruction": 64}
83
+ # 搜索参数 (ef: 搜索范围,值越小越快但精度越低。默认是 10,百万级建议设为 30-50)
84
+ MILVUS_SEARCH_PARAMS = {"ef": 10}
85
+
86
  # 搜索配置
87
  WEB_SEARCH_RESULTS_COUNT = 3
88
 
document_processor.py CHANGED
@@ -9,7 +9,7 @@ except ImportError:
9
  from langchain.text_splitter import RecursiveCharacterTextSplitter
10
 
11
  from langchain_community.document_loaders import WebBaseLoader
12
- from langchain_community.vectorstores import Chroma
13
  from langchain_community.embeddings import HuggingFaceEmbeddings
14
  from langchain_community.retrievers import BM25Retriever
15
 
@@ -32,6 +32,9 @@ from config import (
32
  MILVUS_USER,
33
  MILVUS_PASSWORD,
34
  MILVUS_URI,
 
 
 
35
  # 查询扩展配置
36
  ENABLE_QUERY_EXPANSION,
37
  QUERY_EXPANSION_MODEL,
@@ -81,6 +84,29 @@ class CustomEnsembleRetriever:
81
  doc.metadata["retriever_weight"] = self.weights[i]
82
  all_results.append(doc)
83
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
84
  # 根据权重排序并去重
85
  # 简单实现:先按检索器索引排序,再按权重排序
86
  all_results.sort(key=lambda x: (x.metadata["retriever_index"], -x.metadata["retriever_weight"]))
@@ -239,71 +265,86 @@ class DocumentProcessor:
239
  if persist_directory is None:
240
  import os
241
  current_dir = os.path.dirname(os.path.abspath(__file__))
242
- persist_directory = os.path.join(current_dir, 'chroma_db')
243
  os.makedirs(persist_directory, exist_ok=True)
244
- print(f"💾 使用默认持久化目录: {persist_directory}")
245
 
246
- if VECTOR_STORE_TYPE.lower() == "milvus":
247
- try:
248
- from langchain_community.vectorstores import Milvus
 
 
 
 
 
 
 
 
249
 
250
- # 准备连接参数
251
- connection_args = {}
252
 
253
- # 优先使用 URI (支持 Milvus Lite 本地文件 或 Zilliz Cloud)
254
- # 只要 MILVUS_URI 被设置(config中默认是 ./milvus_rag.db),且不是空字符串
255
- if MILVUS_URI and len(MILVUS_URI.strip()) > 0:
256
- # 判断是本地文件还是云服务
257
- is_local_file = not (MILVUS_URI.startswith("http://") or MILVUS_URI.startswith("https://"))
258
- mode_name = "Lite (Local File)" if is_local_file else "Cloud (HTTP)"
259
-
260
- print(f"🔄 正在连接 Milvus {mode_name} ({MILVUS_URI})...")
261
- connection_args["uri"] = MILVUS_URI
262
-
263
- # 如果是云服务,通常需要 token (使用 password 字段作为 token)
264
- if not is_local_file and MILVUS_PASSWORD:
265
- connection_args["token"] = MILVUS_PASSWORD
266
- else:
267
- # 传统的 Host/Port 连接
268
- print(f"🔄 正在连接 Milvus Server ({MILVUS_HOST}:{MILVUS_PORT})...")
269
- connection_args = {
270
- "host": MILVUS_HOST,
271
- "port": MILVUS_PORT,
272
- "user": MILVUS_USER,
273
- "password": MILVUS_PASSWORD
274
- }
275
 
276
- self.vectorstore = Milvus.from_documents(
277
- documents=doc_splits,
278
- embedding=self.embeddings,
279
- collection_name=COLLECTION_NAME,
280
- connection_args=connection_args,
281
- drop_old=True # 重新创建索引
282
- )
283
- print("✅ Milvus 向量数据库初始化成功")
284
- except ImportError:
285
- print("❌ 未安装 pymilvus,请运行: pip install pymilvus")
286
- raise
287
- except Exception as e:
288
- print(f"❌ Milvus 连接失败: {e}")
289
- print("⚠️ 回退到 Chroma 数据库...")
290
- # Fallback to Chroma
291
- self.vectorstore = Chroma.from_documents(
292
- documents=doc_splits,
293
- collection_name=COLLECTION_NAME,
294
- embedding=self.embeddings,
295
- persist_directory=persist_directory
296
- )
297
- else:
298
- # Default: Chroma
299
- self.vectorstore = Chroma.from_documents(
300
  documents=doc_splits,
301
- collection_name=COLLECTION_NAME,
302
  embedding=self.embeddings,
303
- persist_directory=persist_directory # 添加持久化目录
 
 
 
 
 
 
 
 
 
 
 
304
  )
305
-
306
- self.retriever = self.vectorstore.as_retriever()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
307
 
308
  # 如果启用混合检索,创建BM25检索器和集成检索器
309
  if ENABLE_HYBRID_SEARCH:
@@ -394,17 +435,48 @@ class DocumentProcessor:
394
  print(f"⚠️ 异步查询扩展失败: {e}")
395
  return [query]
396
 
397
- async def async_hybrid_retrieve(self, query: str, top_k: int = 5) -> List:
398
- """异步混合检索"""
 
 
 
 
 
 
 
 
 
399
  if not ENABLE_HYBRID_SEARCH or not self.ensemble_retriever:
 
 
 
400
  return await self.retriever.ainvoke(query)
401
 
402
  try:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
403
  results = await self.ensemble_retriever.ainvoke(query)
404
  return results[:top_k]
405
  except Exception as e:
406
  print(f"⚠️ 异步混合检索失败: {e}")
407
  print("回退到向量检索")
 
 
408
  return await self.retriever.ainvoke(query)
409
 
410
  async def async_enhanced_retrieve(self, query: str, top_k: int = 5, rerank_candidates: int = 20,
@@ -431,11 +503,34 @@ class DocumentProcessor:
431
  # 混合检索或向量检索
432
  all_candidate_docs = []
433
 
 
 
 
 
 
 
 
 
 
 
 
 
434
  async def retrieve_single(q):
435
  if ENABLE_HYBRID_SEARCH:
436
- docs = await self.async_hybrid_retrieve(q, rerank_candidates)
 
437
  else:
438
- docs = await self.retriever.ainvoke(q)
 
 
 
 
 
 
 
 
 
 
439
  if len(docs) > rerank_candidates:
440
  docs = docs[:rerank_candidates]
441
  return docs
@@ -534,15 +629,23 @@ class DocumentProcessor:
534
  # 如果多模态未启用,回退到文本检索
535
  return self.hybrid_retrieve(query, top_k) if ENABLE_HYBRID_SEARCH else self.retriever.invoke(query)[:top_k]
536
 
537
- # 文本检索
538
- text_docs = self.hybrid_retrieve(query, top_k) if ENABLE_HYBRID_SEARCH else self.retriever.invoke(query)[:top_k]
 
539
 
540
- # 如果没有提供图像,直接返回文本检索结果
 
 
 
 
 
 
 
541
  if not image_paths:
542
  return text_docs
543
 
544
  try:
545
- # 图像检索
546
  image_results = []
547
  for image_path in image_paths:
548
  # 检查文件格式
@@ -554,13 +657,30 @@ class DocumentProcessor:
554
  # 编码图像
555
  image_embedding = self.encode_image(image_path)
556
 
557
- # 这里应该实现图像到文本的匹配逻辑
558
- # 由于原始实现中没有图像数据库,我们简化处理
559
- # 在实际应用中,应该有一个图像数据库和相应的检索逻辑
 
 
 
 
 
 
560
 
561
- # 合并文本和图像结果(简化版本)
562
- # 在实际应用中,应该有更复杂的融合逻辑
563
- final_docs = text_docs # 简化版本,仅返回文本结果
 
 
 
 
 
 
 
 
 
 
 
564
 
565
  print(f"✅ 多模态检索完成,返回 {len(final_docs)} 个结果")
566
  return final_docs
@@ -734,95 +854,12 @@ class DocumentProcessor:
734
 
735
 
736
  def initialize_document_processor():
737
- """初始化文档处理器并设置知识库,支持持久化加载和去重"""
738
- import os
739
- import json
740
- import hashlib
741
-
742
- # 设置持久化目录(相对路径)
743
- current_dir = os.path.dirname(os.path.abspath(__file__))
744
- persist_dir = os.path.join(current_dir, 'chroma_db')
745
- metadata_file = os.path.join(current_dir, 'document_metadata.json')
746
 
747
- processor: DocumentProcessor = DocumentProcessor()
748
-
749
- # 加载已处理文档的元数据
750
- processed_sources = set()
751
- if os.path.exists(metadata_file):
752
- try:
753
- with open(metadata_file, 'r', encoding='utf-8') as f:
754
- metadata = json.load(f)
755
- processed_sources = set(metadata.get('processed_sources', []))
756
- print(f"📊 已加载元数据,发现 {len(processed_sources)} 个已处理的数据源")
757
- except Exception as e:
758
- print(f"⚠️ 加载元数据失败: {e}")
759
-
760
- # 检查是否已存在持久化的向量数据库
761
- if os.path.exists(persist_dir) and os.listdir(persist_dir):
762
- print(f"✅ 检测到已存在的向量数据库: {persist_dir}")
763
- print("📂 正在加载持久化的向量数据库...")
764
- try:
765
- # 加载已有的向量数据库
766
- vectorstore = Chroma(
767
- persist_directory=persist_dir,
768
- embedding_function=processor.embeddings,
769
- collection_name=COLLECTION_NAME
770
- )
771
- retriever = vectorstore.as_retriever()
772
-
773
- # 获取文档数量
774
- doc_count = vectorstore._collection.count()
775
- print(f"✅ 已加载持久化的向量数据库,共 {doc_count} 个文档块")
776
-
777
- # 设置processor的vectorstore和retriever
778
- processor.vectorstore = vectorstore
779
- processor.retriever = retriever
780
-
781
- # 检查是否需要添加新数据源
782
- default_urls = set(KNOWLEDGE_BASE_URLS)
783
- new_urls = default_urls - processed_sources
784
-
785
- if new_urls:
786
- print(f"🆕 检测到 {len(new_urls)} 个新的数据源,正在添加...")
787
- try:
788
- # 加载新数据源
789
- new_docs = processor.load_documents(list(new_urls))
790
- new_doc_splits = processor.split_documents(new_docs)
791
-
792
- # 添加到现有向量数据库
793
- vectorstore.add_documents(new_doc_splits)
794
- print(f"✅ 已添加 {len(new_doc_splits)} 个新文档块")
795
-
796
- # 更新元数据
797
- processed_sources.update(new_urls)
798
- with open(metadata_file, 'w', encoding='utf-8') as f:
799
- json.dump({'processed_sources': list(processed_sources)}, f, ensure_ascii=False, indent=2)
800
-
801
- except Exception as e:
802
- print(f"⚠️ 添加新数据源失败: {e}")
803
- else:
804
- print("✅ 所有默认数据源已处理,无需重复加载")
805
-
806
- # doc_splits 设置为 None,因为已经持久化了
807
- doc_splits = None
808
-
809
- return processor, vectorstore, retriever, doc_splits
810
-
811
- except Exception as e:
812
- print(f"⚠️ 加载持久化向量数据库失败: {e}")
813
- print("🔧 将重新创建向量数据库...")
814
-
815
- # 如果没有持久化数据或加载失败,创建新的
816
- print("🔧 正在创建新的向量数据库...")
817
  vectorstore, retriever, doc_splits = processor.setup_knowledge_base()
818
 
819
- # 保存元数据
820
- try:
821
- processed_sources.update(KNOWLEDGE_BASE_URLS)
822
- with open(metadata_file, 'w', encoding='utf-8') as f:
823
- json.dump({'processed_sources': list(processed_sources)}, f, ensure_ascii=False, indent=2)
824
- print(f"✅ 元数据已保存到: {metadata_file}")
825
- except Exception as e:
826
- print(f"⚠️ 保存元数据失败: {e}")
827
-
828
  return processor, vectorstore, retriever, doc_splits
 
9
  from langchain.text_splitter import RecursiveCharacterTextSplitter
10
 
11
  from langchain_community.document_loaders import WebBaseLoader
12
+ from langchain_community.vectorstores import Milvus
13
  from langchain_community.embeddings import HuggingFaceEmbeddings
14
  from langchain_community.retrievers import BM25Retriever
15
 
 
32
  MILVUS_USER,
33
  MILVUS_PASSWORD,
34
  MILVUS_URI,
35
+ MILVUS_INDEX_TYPE,
36
+ MILVUS_INDEX_PARAMS,
37
+ MILVUS_SEARCH_PARAMS,
38
  # 查询扩展配置
39
  ENABLE_QUERY_EXPANSION,
40
  QUERY_EXPANSION_MODEL,
 
84
  doc.metadata["retriever_weight"] = self.weights[i]
85
  all_results.append(doc)
86
 
87
+ return self._process_results(all_results)
88
+
89
+ async def ainvoke(self, query):
90
+ """异步执行检索并合并结果"""
91
+ import asyncio
92
+
93
+ # 并发获取各检索器的结果
94
+ # 注意:假设所有 retriever 都支持 ainvoke
95
+ tasks = [retriever.ainvoke(query) for retriever in self.retrievers]
96
+ results_list = await asyncio.gather(*tasks)
97
+
98
+ all_results = []
99
+ for i, results in enumerate(results_list):
100
+ for doc in results:
101
+ # 添加检索器索引和权重信息
102
+ doc.metadata["retriever_index"] = i
103
+ doc.metadata["retriever_weight"] = self.weights[i]
104
+ all_results.append(doc)
105
+
106
+ return self._process_results(all_results)
107
+
108
+ def _process_results(self, all_results):
109
+ """排序和去重处理"""
110
  # 根据权重排序并去重
111
  # 简单实现:先按检索器索引排序,再按权重排序
112
  all_results.sort(key=lambda x: (x.metadata["retriever_index"], -x.metadata["retriever_weight"]))
 
265
  if persist_directory is None:
266
  import os
267
  current_dir = os.path.dirname(os.path.abspath(__file__))
268
+ persist_directory = os.path.join(current_dir, 'milvus_data')
269
  os.makedirs(persist_directory, exist_ok=True)
270
+ # print(f"💾 使用默认持久化目录: {persist_directory}") # Milvus 不需要这个
271
 
272
+ # 强制使用 Milvus
273
+ try:
274
+ # 准备连接参数
275
+ connection_args = {}
276
+
277
+ # 优先使用 URI (支持 Milvus Lite 本地文件 或 Zilliz Cloud)
278
+ # 只要 MILVUS_URI 被设置(config中默认是 ./milvus_rag.db),且不是空字符串
279
+ if MILVUS_URI and len(MILVUS_URI.strip()) > 0:
280
+ # 判断是本地文件还是云服务
281
+ is_local_file = not (MILVUS_URI.startswith("http://") or MILVUS_URI.startswith("https://"))
282
+ mode_name = "Lite (Local File)" if is_local_file else "Cloud (HTTP)"
283
 
284
+ print(f"🔄 正在连接 Milvus {mode_name} ({MILVUS_URI})...")
285
+ connection_args["uri"] = MILVUS_URI
286
 
287
+ # 如果是云服务,通常需要 token (使用 password 字段作为 token)
288
+ if not is_local_file and MILVUS_PASSWORD:
289
+ connection_args["token"] = MILVUS_PASSWORD
290
+ else:
291
+ # 传统的 Host/Port 连接
292
+ print(f"🔄 正在连接 Milvus Server ({MILVUS_HOST}:{MILVUS_PORT})...")
293
+ connection_args = {
294
+ "host": MILVUS_HOST,
295
+ "port": MILVUS_PORT,
296
+ "user": MILVUS_USER,
297
+ "password": MILVUS_PASSWORD
298
+ }
 
 
 
 
 
 
 
 
 
 
299
 
300
+ # 添加元数据标签 (Metadata Filtering)
301
+ # 假设 doc_splits 中的文档根据来源或其他属性进行了分类
302
+ # 这里简单示例:如果文档有 'source_type' 元数据,可以利用它
303
+ # 实际应用中,你应该在 split_documents 阶段就给文档打好标签
304
+ for doc in doc_splits:
305
+ if 'source_type' not in doc.metadata:
306
+ # 简单逻辑:根据内容判断是文本还是图像描述(如果是多模态)
307
+ # 或者根据文件名后缀判断
308
+ source = doc.metadata.get('source', '')
309
+ if any(fmt in source.lower() for fmt in SUPPORTED_IMAGE_FORMATS):
310
+ doc.metadata['data_type'] = 'image'
311
+ else:
312
+ doc.metadata['data_type'] = 'text'
313
+
314
+ self.vectorstore = Milvus.from_documents(
 
 
 
 
 
 
 
 
 
315
  documents=doc_splits,
 
316
  embedding=self.embeddings,
317
+ collection_name=COLLECTION_NAME,
318
+ connection_args=connection_args,
319
+ index_params={
320
+ "metric_type": "L2",
321
+ "index_type": MILVUS_INDEX_TYPE,
322
+ "params": MILVUS_INDEX_PARAMS
323
+ },
324
+ search_params={
325
+ "metric_type": "L2",
326
+ "params": MILVUS_SEARCH_PARAMS
327
+ },
328
+ drop_old=True # 重新创建索引
329
  )
330
+ print("✅ Milvus 向量数据库初始化成功")
331
+ except ImportError:
332
+ print("❌ 未安装 pymilvus,请运行: pip install pymilvus")
333
+ raise
334
+ except Exception as e:
335
+ print(f"❌ Milvus 连接失败: {e}")
336
+ raise # 不再回退到 Chroma
337
+
338
+ # 配置检索器参数,应用元数据过滤
339
+ # 默认情况下不添加严格过滤,由上层逻辑决定
340
+ # 但如果只启用纯文本检索,可以默认只检索文本
341
+ retriever_kwargs = {}
342
+ # if ENABLE_MULTIMODAL:
343
+ # 针对文本检索,过滤出 data_type='text' 的数据
344
+ # 注意:这里注释掉是为了支持通过文本检索图像的场景
345
+ # retriever_kwargs["expr"] = "data_type == 'text'"
346
+
347
+ self.retriever = self.vectorstore.as_retriever(search_kwargs=retriever_kwargs)
348
 
349
  # 如果启用混合检索,创建BM25检索器和集成检索器
350
  if ENABLE_HYBRID_SEARCH:
 
435
  print(f"⚠️ 异步查询扩展失败: {e}")
436
  return [query]
437
 
438
+ async def async_hybrid_retrieve(self, query: str, top_k: int = 5, filter_type: str = "text") -> List:
439
+ """异步混合检索
440
+
441
+ Args:
442
+ filter_type: 数据类型过滤,"text" (默认), "image", 或 "all" (不过滤)
443
+ """
444
+ # 构建搜索参数
445
+ search_kwargs = {}
446
+ if filter_type != "all" and ENABLE_MULTIMODAL:
447
+ search_kwargs["expr"] = f"data_type == '{filter_type}'"
448
+
449
  if not ENABLE_HYBRID_SEARCH or not self.ensemble_retriever:
450
+ # 纯向量检索,��接支持 search_kwargs
451
+ if self.vectorstore:
452
+ return await self.vectorstore.asimilarity_search(query, k=top_k, **search_kwargs)
453
  return await self.retriever.ainvoke(query)
454
 
455
  try:
456
+ # 混合检索
457
+ # 注意:目前 CustomEnsembleRetriever 的 invoke/ainvoke 尚未透传 search_kwargs
458
+ # 为了让混合检索也享受到过滤优化,我们需要修改 CustomEnsembleRetriever 或者在这里处理
459
+ # 鉴于 CustomEnsembleRetriever 比较简单,我们假设它主要用于文本
460
+ # 如果需要严格过滤,最好在 vectorstore 层面处理
461
+
462
+ # 临时方案:如果是混合检索且需要过滤,我们可能需要传递给 retriever
463
+ # 但标准 retriever 接口不支持动态传参。
464
+ # 策略:如果 filter_type 是 text (默认),且我们在 init 时已经设置了默认不严格过滤,
465
+ # 这里其实无法动态改变 retriever 的行为,除非我们重新生成一个 retriever 或者修改 retriever.search_kwargs
466
+
467
+ # 动态修改 retriever 的 search_kwargs (这是 LangChain retriever 的特性)
468
+ if filter_type != "all" and ENABLE_MULTIMODAL:
469
+ self.retriever.search_kwargs["expr"] = f"data_type == '{filter_type}'"
470
+ else:
471
+ self.retriever.search_kwargs.pop("expr", None)
472
+
473
  results = await self.ensemble_retriever.ainvoke(query)
474
  return results[:top_k]
475
  except Exception as e:
476
  print(f"⚠️ 异步混合检索失败: {e}")
477
  print("回退到向量检索")
478
+ if self.vectorstore:
479
+ return await self.vectorstore.asimilarity_search(query, k=top_k, **search_kwargs)
480
  return await self.retriever.ainvoke(query)
481
 
482
  async def async_enhanced_retrieve(self, query: str, top_k: int = 5, rerank_candidates: int = 20,
 
503
  # 混合检索或向量检索
504
  all_candidate_docs = []
505
 
506
+ # 决定过滤策略
507
+ # 默认情况下,如果只是文本查询,为了性能优化,我们只检索文本数据
508
+ # 如果提供了图像,或者用户显式要求,可以放开限制
509
+ filter_type = "text" # 默认只搜文本,实现百万级数据的性能优化
510
+ if image_paths:
511
+ filter_type = "all" # 跨模态时搜所有
512
+
513
+ # 构建过滤表达式 (仅用于直接调用 vectorstore 的情况,async_hybrid_retrieve 内部已处理)
514
+ search_kwargs = {}
515
+ if filter_type != "all" and ENABLE_MULTIMODAL:
516
+ search_kwargs["expr"] = f"data_type == '{filter_type}'"
517
+
518
  async def retrieve_single(q):
519
  if ENABLE_HYBRID_SEARCH:
520
+ # 使用支持动态过滤的 hybrid retrieve
521
+ docs = await self.async_hybrid_retrieve(q, rerank_candidates, filter_type=filter_type)
522
  else:
523
+ # 使用带有过滤条件的检索
524
+ if self.vectorstore:
525
+ docs = await self.vectorstore.asimilarity_search(
526
+ q,
527
+ k=rerank_candidates,
528
+ **search_kwargs # 传入 expr
529
+ )
530
+ else:
531
+ # Fallback
532
+ docs = await self.retriever.ainvoke(q)
533
+
534
  if len(docs) > rerank_candidates:
535
  docs = docs[:rerank_candidates]
536
  return docs
 
629
  # 如果多模态未启用,回退到文本检索
630
  return self.hybrid_retrieve(query, top_k) if ENABLE_HYBRID_SEARCH else self.retriever.invoke(query)[:top_k]
631
 
632
+ # 1. 文本查询 (Text-to-Text & Text-to-Image)
633
+ # 如果提供了文本查询,我们希望它能检索到文本和相关图像
634
+ # 此时不应该限制 data_type,或者应该显式包含两者
635
 
636
+ # 如果没有提供图像,这可能是一个纯文本查询,但也可能想搜图
637
+ # 这里我们让 self.retriever (或 hybrid) 负责所有模态的检索
638
+ # (前提是它们都在同一个向量空间,CLIP 可以做到这一点)
639
+ text_docs = []
640
+ if query:
641
+ text_docs = self.hybrid_retrieve(query, top_k) if ENABLE_HYBRID_SEARCH else self.retriever.invoke(query)[:top_k]
642
+
643
+ # 如果没有提供图像输入,直接返回文本查询的结果
644
  if not image_paths:
645
  return text_docs
646
 
647
  try:
648
+ # 2. 图像查询 (Image-to-Text & Image-to-Image)
649
  image_results = []
650
  for image_path in image_paths:
651
  # 检查文件格式
 
657
  # 编码图像
658
  image_embedding = self.encode_image(image_path)
659
 
660
+ # 使用图像嵌入进行检索
661
+ if self.vectorstore:
662
+ # 图像可以检索文本描述,也可以检索相似图像
663
+ # 这里我们不做限制,检索所有类型
664
+ img_docs = self.vectorstore.similarity_search_by_vector(
665
+ embedding=image_embedding,
666
+ k=top_k
667
+ )
668
+ image_results.extend(img_docs)
669
 
670
+ # 合并文本查询结果和图像查询结果
671
+ # 简单合并并去重
672
+ all_docs = text_docs + image_results
673
+
674
+ # 去重
675
+ unique_docs = []
676
+ seen_content = set()
677
+ for doc in all_docs:
678
+ content = doc.page_content
679
+ if content not in seen_content:
680
+ seen_content.add(content)
681
+ unique_docs.append(doc)
682
+
683
+ final_docs = unique_docs[:top_k]
684
 
685
  print(f"✅ 多模态检索完成,返回 {len(final_docs)} 个结果")
686
  return final_docs
 
854
 
855
 
856
  def initialize_document_processor():
857
+ """初始化文档处理器并设置知识库"""
858
+ print("🚀 初始化文档处理器 (Milvus 版)...")
859
+ processor = DocumentProcessor()
 
 
 
 
 
 
860
 
861
+ # 直接设置知识库
862
+ # Milvus 的连接和索引逻辑在 DocumentProcessor.create_vectorstore 中处理
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
863
  vectorstore, retriever, doc_splits = processor.setup_knowledge_base()
864
 
 
 
 
 
 
 
 
 
 
865
  return processor, vectorstore, retriever, doc_splits
requirements.txt CHANGED
@@ -12,7 +12,7 @@ langchain-text-splitters>=0.0.1
12
  langchain-ollama>=0.1.0
13
 
14
  # 向量数据库和嵌入
15
- chromadb>=0.4.0
16
  pymilvus>=2.4.2 # Milvus 客户端及 Lite 本地模式支持
17
  sentence-transformers>=2.2.0
18
  torch>=2.0.0
 
12
  langchain-ollama>=0.1.0
13
 
14
  # 向量数据库和嵌入
15
+ # chromadb>=0.4.0 # 已移除,统一使用 Milvus
16
  pymilvus>=2.4.2 # Milvus 客户端及 Lite 本地模式支持
17
  sentence-transformers>=2.2.0
18
  torch>=2.0.0