import os
import json
import time
import pickle
import hashlib
import re
from pathlib import Path
from typing import List, Tuple, Optional, Dict

from langchain_community.vectorstores import FAISS
from langchain.schema import Document
from langchain.text_splitter import RecursiveCharacterTextSplitter

try:
    from backend.embeddings import get_embeddings_wrapper
    from backend.document_processor import DocumentProcessor
    from backend.chat_history import ChatHistory
except ModuleNotFoundError:
    from embeddings import get_embeddings_wrapper
    from document_processor import DocumentProcessor
    from chat_history import ChatHistory

from openai import OpenAI
import httpx
import tiktoken

# Try to import EnsembleRetriever and BM25Retriever for hybrid search
try:
    from langchain.retrievers import EnsembleRetriever
    from langchain_community.retrievers import BM25Retriever
    ENSEMBLE_AVAILABLE = True
except ImportError:
    ENSEMBLE_AVAILABLE = False
    EnsembleRetriever = None
    BM25Retriever = None


class NoProxyHTTPClient(httpx.Client):
    def __init__(self, *args, **kwargs):
        kwargs.pop("proxies", None)
        super().__init__(*args, **kwargs)


class RAGSystem:
    """RAG system that indexes summaries and retrieves full text from JSON"""

    def __init__(self, vectorstore_path: str = "vectorstore", json_path: Optional[str] = None, openai_api_key: Optional[str] = None):
        self.vectorstore_path = vectorstore_path

        # JSON path relative to project root
        if json_path is None:
            project_root = Path(__file__).resolve().parents[1]
            self.json_path = str(project_root / "processed_documents.json")
        else:
            self.json_path = json_path

        self.vectorstore = None

        # Chunk vectorstores directory path
        if json_path is None:
            project_root = Path(__file__).resolve().parents[1]
            self.chunk_vectorstores_path = str(project_root / "chunk_vectorstores")
        else:
            project_root = Path(json_path).parent
            self.chunk_vectorstores_path = str(project_root / "chunk_vectorstores")

        # Create directory if it doesn't exist
        os.makedirs(self.chunk_vectorstores_path, exist_ok=True)

        # Initialize embeddings (supports OpenAI or HuggingFace based on EMBEDDINGS_PROVIDER env var)
        provider = os.getenv("EMBEDDINGS_PROVIDER", "openai").lower()
        if provider in ["huggingface", "hf", "nebius"]:
            # For HuggingFace, use HF_TOKEN
            embeddings_api_key = os.getenv("HF_TOKEN")
            if not embeddings_api_key:
                raise ValueError("HF_TOKEN is required for HuggingFace embeddings. Set HF_TOKEN environment variable.")
        else:
            # For OpenAI, use OPENAI_API_KEY
            embeddings_api_key = openai_api_key or os.getenv("OPENAI_API_KEY")
            if not embeddings_api_key:
                raise ValueError("OpenAI API key is required. Set OPENAI_API_KEY environment variable.")

        print(f"[RAGSystem] Using embeddings provider: {provider}")
        self.embeddings = get_embeddings_wrapper(api_key=embeddings_api_key)

        # Initialize document processor (always uses OpenAI for LLM processing)
        openai_api_key_for_processor = openai_api_key or os.getenv("OPENAI_API_KEY")
        if not openai_api_key_for_processor:
            raise ValueError("OpenAI API key is required for document processing. Set OPENAI_API_KEY environment variable.")
Set OPENAI_API_KEY environment variable.") self.processor = DocumentProcessor(api_key=openai_api_key_for_processor) # Initialize LLM client for answering questions os.environ.setdefault("OPENAI_API_KEY", openai_api_key_for_processor) http_client = NoProxyHTTPClient(timeout=60.0) self.llm_client = OpenAI(http_client=http_client) self.llm_model = os.getenv("OPENAI_LLM_MODEL", "gpt-4o-mini") # Chat history manager self.chat_history = ChatHistory(max_history=int(os.getenv("CHAT_HISTORY_TURNS", "10"))) # Cache for JSON file contents and document texts self._json_cache = None self._json_cache_path = None self._text_cache: Dict[str, str] = {} # Cache for document texts by filename # Cache for per-document chunk vectorstores: {filename: {\"vectorstore\": FAISS, \"chunks\": List[Document]}} self._chunk_cache: Dict[str, Dict[str, object]] = {} # Hybrid search configuration (EnsembleRetriever: BM25 + Semantic) self.use_hybrid_search = os.getenv("USE_HYBRID_SEARCH", "false").lower() == "true" self.hybrid_weights = [ float(os.getenv("HYBRID_BM25_WEIGHT", "0.5")), # BM25 weight float(os.getenv("HYBRID_SEMANTIC_WEIGHT", "0.5")) # Semantic (FAISS) weight ] # Normalize weights to sum to 1.0 total_weight = sum(self.hybrid_weights) if total_weight > 0: self.hybrid_weights = [w / total_weight for w in self.hybrid_weights] if self.use_hybrid_search: if not ENSEMBLE_AVAILABLE: print("[RAGSystem] WARNING: USE_HYBRID_SEARCH enabled but EnsembleRetriever not available. Disabling hybrid search.") self.use_hybrid_search = False else: print(f"[RAGSystem] Hybrid search enabled (BM25: {self.hybrid_weights[0]:.2f}, Semantic: {self.hybrid_weights[1]:.2f})") else: print("[RAGSystem] Hybrid search disabled (using semantic search only)") # Try to load existing vectorstore self._load_vectorstore() def _load_vectorstore(self): """Load existing vectorstore if it exists""" if os.path.exists(self.vectorstore_path): try: self.vectorstore = FAISS.load_local( self.vectorstore_path, embeddings=self.embeddings, allow_dangerous_deserialization=True ) # Ensure embedding function is properly set. # LangChain now expects an Embeddings *object* here, not a raw function. if not hasattr(self.vectorstore, "embedding_function") or self.vectorstore.embedding_function is None: self.vectorstore.embedding_function = self.embeddings # Some versions may set a non-callable placeholder; override with our wrapper. elif not callable(getattr(self.vectorstore.embedding_function, "embed_query", None)): self.vectorstore.embedding_function = self.embeddings print(f"Loaded existing vectorstore from {self.vectorstore_path}") except Exception as e: print(f"Could not load existing vectorstore: {e}") self.vectorstore = None def _count_tokens(self, messages: List[Dict[str, str]]) -> int: """Count tokens in messages for the current model""" try: encoding = tiktoken.encoding_for_model(self.llm_model) except KeyError: # Fallback to cl100k_base for unknown models encoding = tiktoken.get_encoding("cl100k_base") tokens = 0 for message in messages: tokens += 4 # Every message follows {role/name}\n{content}\n for key, value in message.items(): tokens += len(encoding.encode(str(value))) if key == "name": # If there's a name, the role is omitted tokens -= 1 # Role is always required and always 1 token tokens += 2 # Every reply is primed with assistant return tokens def _filter_thinking_process(self, text: str) -> str: """Filter out thinking process markers from Qwen model responses""" if not text: return text # Quick path: remove ... 
        filtered = re.sub(r"<think>.*?</think>", "", text, flags=re.DOTALL | re.IGNORECASE)

        # Remove legacy line-based markers (think>, think:)
        lines = []
        for line in filtered.splitlines():
            strip = line.strip()
            if strip.lower().startswith("think>") or strip.lower().startswith("think:"):
                continue
            lines.append(line)
        return "\n".join(lines).strip()

    def process_and_index_documents(self, documents_folder: str) -> int:
        """
        Process all PDFs, save to JSON, and index summaries.
        Skips already processed documents and only indexes new ones.

        Returns:
            Number of new documents processed
        """
        # Step 1: Process all PDFs with LLM (skips existing ones)
        print("Processing PDFs with LLM...")
        new_processed_docs = self.processor.process_all_pdfs(documents_folder, skip_existing=True)

        # Step 2: Ensure vectorstore exists - build from existing documents if needed
        if self.vectorstore is None:
            # Load all existing documents to build initial vectorstore
            all_docs = self.processor.load_from_json()
            if all_docs:
                print("Building vectorstore from existing processed documents...")
                all_summary_docs = []
                for doc in all_docs:
                    all_summary_docs.append(Document(
                        page_content=doc["summary"],
                        metadata={"filename": doc["filename"]}
                    ))
                self.vectorstore = FAISS.from_documents(
                    all_summary_docs,
                    embedding=self.embeddings
                )
                os.makedirs(self.vectorstore_path, exist_ok=True)
                self.vectorstore.save_local(self.vectorstore_path)
                print(f"Built vectorstore from {len(all_docs)} existing documents")
            else:
                print("Warning: No processed documents found in JSON file.")

        # Step 3: Index summaries for new documents only
        if new_processed_docs:
            print("Indexing summaries for new documents...")
            summary_docs = []
            for doc in new_processed_docs:
                summary_docs.append(Document(
                    page_content=doc["summary"],
                    metadata={"filename": doc["filename"]}
                ))

            # Update vectorstore with new documents
            if self.vectorstore is None:
                print("Creating new vectorstore from new documents...")
                self.vectorstore = FAISS.from_documents(
                    summary_docs,
                    embedding=self.embeddings
                )
            else:
                print("Updating existing vectorstore with new documents...")
                self.vectorstore.add_documents(summary_docs)

            # Save vectorstore
            os.makedirs(self.vectorstore_path, exist_ok=True)
            self.vectorstore.save_local(self.vectorstore_path)
            print(f"Indexed {len(new_processed_docs)} new document summaries")
        else:
            if self.vectorstore is not None:
                print("No new documents to process. Using existing vectorstore.")
            else:
                print("Warning: No documents found to process or index. Vectorstore not available.")

        return len(new_processed_docs)

    @staticmethod
    def _parse_llm_response(raw_response: str) -> Tuple[str, bool]:
        """
        Parse LLM response to extract answer and found flag.
        Args:
            raw_response: The raw response from LLM

        Returns:
            Tuple of (answer text, found flag) where found=True means answer was found in context
        """
        # Try to parse as JSON first
        try:
            # Look for JSON in the response (might be wrapped in markdown code blocks)
            response_text = raw_response.strip()

            # Remove markdown code blocks if present
            if response_text.startswith("```json"):
                response_text = response_text[7:]  # Remove ```json
            elif response_text.startswith("```"):
                response_text = response_text[3:]  # Remove ```
            if response_text.endswith("```"):
                response_text = response_text[:-3]  # Remove closing ```
            response_text = response_text.strip()

            # Try to parse as JSON
            parsed = json.loads(response_text)
            answer = parsed.get("answer", raw_response)
            found = parsed.get("found", True)  # Default to True for backward compatibility
            return answer, bool(found)
        except (json.JSONDecodeError, ValueError):
            # If JSON parsing fails, return the raw response with found=True (backward compatibility)
            return raw_response, True

    def _load_json_cached(self) -> List[Dict[str, str]]:
        """Load JSON file with caching to avoid repeated file I/O"""
        json_path = Path(self.json_path)

        # Check if cache is valid (file hasn't changed)
        if self._json_cache is not None and self._json_cache_path == str(json_path):
            if json_path.exists():
                # Check if file modification time changed
                current_mtime = json_path.stat().st_mtime
                if hasattr(self, '_json_cache_mtime') and self._json_cache_mtime == current_mtime:
                    return self._json_cache

        # Load from file
        if not json_path.exists():
            return []
        try:
            with open(json_path, "r", encoding="utf-8") as f:
                docs = json.load(f)
            # Cache the results
            self._json_cache = docs if isinstance(docs, list) else []
            self._json_cache_path = str(json_path)
            self._json_cache_mtime = json_path.stat().st_mtime
            return self._json_cache
        except Exception:
            return []

    def _get_text_by_filename_cached(self, filename: str) -> Optional[str]:
        """Get full text for a document by filename using cache"""
        # Check text cache first
        if filename in self._text_cache:
            return self._text_cache[filename]

        # Load from JSON cache
        docs = self._load_json_cached()
        for doc in docs:
            if doc.get("filename") == filename:
                text = doc.get("text", "")
                # Cache the text
                self._text_cache[filename] = text
                return text
        return None

    def _sanitize_filename(self, filename: str) -> str:
        """
        Sanitize filename to create a safe directory name.
        Handles Arabic filenames and special characters.

        Args:
            filename: Original filename

        Returns:
            Sanitized directory name safe for filesystem
        """
        # Remove extension
        name_without_ext = Path(filename).stem

        # Create a hash of the original filename for uniqueness
        # This ensures Arabic and special characters are handled
        filename_hash = hashlib.md5(filename.encode('utf-8')).hexdigest()[:8]

        # Sanitize: keep alphanumeric, Arabic characters, spaces, hyphens, underscores
        # Replace other special chars with underscore
        sanitized = re.sub(r'[^\w\s\u0600-\u06FF\-]', '_', name_without_ext)
        # Replace multiple spaces/underscores with single underscore
        sanitized = re.sub(r'[\s_]+', '_', sanitized)
        # Remove leading/trailing underscores
        sanitized = sanitized.strip('_')

        # Combine sanitized name with hash for uniqueness
        if sanitized:
            return f"{sanitized}_{filename_hash}"
        else:
            return filename_hash

    def _get_chunk_vectorstore_path(self, filename: str) -> str:
        """
        Get the directory path for a document's chunk vectorstore.
        Args:
            filename: Document filename

        Returns:
            Path to the directory containing the chunk vectorstore
        """
        sanitized_name = self._sanitize_filename(filename)
        return str(Path(self.chunk_vectorstores_path) / sanitized_name)

    def _save_chunk_vectorstore(self, filename: str, vectorstore: FAISS, chunks: List[Document]) -> None:
        """
        Save chunk vectorstore and chunks metadata to disk.

        Args:
            filename: Document filename
            vectorstore: FAISS vectorstore to save
            chunks: List of Document objects (chunks metadata)
        """
        chunk_vs_path = self._get_chunk_vectorstore_path(filename)
        os.makedirs(chunk_vs_path, exist_ok=True)

        # Save FAISS vectorstore (saves index.faiss and index.pkl)
        vectorstore.save_local(chunk_vs_path)

        # Save chunks metadata as pickle
        chunks_path = Path(chunk_vs_path) / "chunks.pkl"
        with open(chunks_path, 'wb') as f:
            pickle.dump(chunks, f)

        print(f"[Chunk Vectorstore] Saved chunk vectorstore for '{filename}'")

    def _load_chunk_vectorstore(self, filename: str) -> Optional[Tuple[FAISS, List[Document]]]:
        """
        Load chunk vectorstore and chunks metadata from disk if they exist.

        Args:
            filename: Document filename

        Returns:
            Tuple of (FAISS vectorstore, List[Document]) if found, None otherwise
        """
        chunk_vs_path = self._get_chunk_vectorstore_path(filename)
        chunk_vs_path_obj = Path(chunk_vs_path)

        # Check if vectorstore files exist
        faiss_index = chunk_vs_path_obj / "index.faiss"
        faiss_pkl = chunk_vs_path_obj / "index.pkl"
        chunks_pkl = chunk_vs_path_obj / "chunks.pkl"
        if not (faiss_index.exists() and faiss_pkl.exists() and chunks_pkl.exists()):
            return None

        try:
            # Load FAISS vectorstore
            vectorstore = FAISS.load_local(
                chunk_vs_path,
                embeddings=self.embeddings,
                allow_dangerous_deserialization=True
            )
            # Ensure embedding function is properly set to the embeddings wrapper object.
            if not hasattr(vectorstore, "embedding_function") or vectorstore.embedding_function is None:
                vectorstore.embedding_function = self.embeddings
            elif not callable(getattr(vectorstore.embedding_function, "embed_query", None)):
                vectorstore.embedding_function = self.embeddings

            # Load chunks metadata
            with open(chunks_pkl, 'rb') as f:
                chunks = pickle.load(f)

            print(f"[Chunk Vectorstore] Loaded chunk vectorstore for '{filename}'")
            return vectorstore, chunks
        except Exception as e:
            print(f"[Chunk Vectorstore] Error loading chunk vectorstore for '{filename}': {e}")
            return None

    def _get_or_build_chunk_vectorstore(
        self,
        filename: str,
        full_text: str,
        chunk_size: int = 2000,
        chunk_overlap: int = 300
    ) -> Tuple[FAISS, List[Document]]:
        """
        Build or retrieve a FAISS vectorstore of semantic chunks for a single document.
        Checks the in-memory cache first, then disk, then builds if needed.

        Args:
            filename: Document filename used as key in cache/metadata
            full_text: Full document text to chunk
            chunk_size: Approximate character length for each chunk
            chunk_overlap: Overlap between consecutive chunks (characters)

        Returns:
            Tuple of (FAISS vectorstore over chunks, list of chunk Documents)
        """
        # Step 1: Return from memory cache if available (fastest)
        if filename in self._chunk_cache:
            entry = self._chunk_cache[filename]
            return entry["vectorstore"], entry["chunks"]  # type: ignore[return-value]

        # Step 2: Try to load from disk
        # NOTE: Existing chunk indexes on disk may have been created with a different distance
        # metric. To guarantee consistency, delete the `chunk_vectorstores/` directory so
        # these indexes are rebuilt on demand with the default L2 distance metric.
        loaded = self._load_chunk_vectorstore(filename)
        if loaded is not None:
            vectorstore, chunks = loaded
            # Cache in memory for faster access
            self._chunk_cache[filename] = {
                "vectorstore": vectorstore,
                "chunks": chunks,
            }
            return vectorstore, chunks

        # Step 3: Build new vectorstore (not found in cache or disk)
        print(f"[Chunk Vectorstore] Building new chunk vectorstore for '{filename}'")

        # Create text splitter tuned for Arabic legal text
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            separators=[
                "\n\n",
                "\n",
                "المادة ",
                "مادة ",
                ". ",
                " ",
                ""
            ],
        )
        chunks = text_splitter.split_text(full_text)

        chunk_docs: List[Document] = []
        for idx, chunk in enumerate(chunks):
            chunk_docs.append(
                Document(
                    page_content=chunk,
                    metadata={
                        "filename": filename,
                        "chunk_index": idx,
                    },
                )
            )

        if not chunk_docs:
            # Fallback: single chunk with entire text
            chunk_docs = [
                Document(
                    page_content=full_text,
                    metadata={
                        "filename": filename,
                        "chunk_index": 0,
                    },
                )
            ]

        # Build chunk-level FAISS index (uses default L2 distance metric)
        chunk_vectorstore = FAISS.from_documents(
            chunk_docs,
            embedding=self.embeddings
        )

        # Step 4: Save to disk for future use
        self._save_chunk_vectorstore(filename, chunk_vectorstore, chunk_docs)

        # Step 5: Cache in memory for current session
        self._chunk_cache[filename] = {
            "vectorstore": chunk_vectorstore,
            "chunks": chunk_docs,
        }
        return chunk_vectorstore, chunk_docs

    def _create_ensemble_retriever(
        self,
        chunk_vs: FAISS,
        chunk_docs: List[Document],
        top_k: int
    ) -> EnsembleRetriever:
        """
        Create an ensemble retriever combining BM25 and semantic search.

        Args:
            chunk_vs: FAISS vectorstore for semantic search
            chunk_docs: List of all chunk Documents for BM25
            top_k: Number of results to return

        Returns:
            EnsembleRetriever combining both methods
        """
        if not ENSEMBLE_AVAILABLE:
            raise ImportError("EnsembleRetriever and BM25Retriever are not available. Install rank-bm25 and ensure langchain-community is up to date.")

        # Create BM25 retriever from chunk documents
        bm25_retriever = BM25Retriever.from_documents(chunk_docs)
        bm25_retriever.k = top_k

        # Create FAISS retriever
        faiss_retriever = chunk_vs.as_retriever(
            search_kwargs={"k": top_k}
        )

        # Combine them with weighted ensemble
        ensemble_retriever = EnsembleRetriever(
            retrievers=[bm25_retriever, faiss_retriever],
            weights=self.hybrid_weights
        )
        return ensemble_retriever

    def _build_document_candidates(
        self,
        question: str,
        label: str,
        previous_document: Optional[str],
        k: int = 5
    ) -> List[str]:
        """
        Build a list of document candidates to try based on question label.
        Args:
            question: The user's question
            label: Question label ("law-new" or "law-followup")
            previous_document: Previous document filename if available
            k: Number of documents to retrieve from summary search

        Returns:
            List of document filenames to try in order
        """
        candidate_docs: List[str] = []

        if label == "law-followup" and previous_document:
            # For follow-up: start with previous document, then search summary
            candidate_docs.append(previous_document)
            print(f"[RAG] Follow-up question: starting with previous document: {previous_document}")

            # Search summary vectorstore excluding the previous document
            similar_docs_with_scores = self.vectorstore.similarity_search_with_score(question, k=k+1)
            # Filter out the previous document and add others
            for doc, score in similar_docs_with_scores:
                filename = doc.metadata.get("filename", "")
                if filename != previous_document and filename not in candidate_docs:
                    candidate_docs.append(filename)
                if len(candidate_docs) >= k + 1:  # +1 because we already have previous_doc
                    break
            if len(candidate_docs) > 1:
                print(f"[RAG] Added {len(candidate_docs) - 1} additional documents from summary search")
        else:
            # For new questions: search summary vectorstore
            similar_docs_with_scores = self.vectorstore.similarity_search_with_score(question, k=k)
            if not similar_docs_with_scores:
                return []
            for doc, score in similar_docs_with_scores:
                filename = doc.metadata.get("filename", "")
                if filename:
                    candidate_docs.append(filename)
            if candidate_docs:
                print(f"[RAG] New question: found {len(candidate_docs)} candidate documents")

        return candidate_docs

    def _classify_question(self, question: str, use_history: bool = True, model_provider: str = "openai") -> Tuple[str, Optional[str], Optional[List[str]], Optional[List[str]]]:
        """
        Classify question into one of three categories: law-new, law-followup, or general.

        Args:
            question: The user's question
            use_history: Whether to use chat history
            model_provider: Model provider to use

        Returns:
            Tuple of (label, answer, sources, chunks) where:
            - label: "law-new", "law-followup", or "general"
            - For "general": answer contains the answer string, sources=[], chunks=None
            - For "law-new" or "law-followup": answer=None, sources=None, chunks=None (RAG will handle answering)
        """
        # Get previous turn context for distinguishing law-new from law-followup
        previous_context = ""
        if use_history:
            last_turn = self.chat_history.get_last_turn()
            if last_turn and len(last_turn) >= 2:
                prev_user = last_turn[0].get("content", "") if last_turn[0].get("role") == "user" else ""
                prev_assistant = last_turn[1].get("content", "") if last_turn[1].get("role") == "assistant" else ""
                if prev_user and prev_assistant:
                    previous_context = f"\n\nPrevious conversation:\nUser: {prev_user}\nAssistant: {prev_assistant}"

        classification_prompt = f"""Classify the following question as one of: "law-new", "law-followup", or "general".
A "law-new" question is: - A law-related question that starts a new topic/thread or asking about different system (نظام) - Not primarily dependent on the immediately previous answer - About legal documents, regulations, laws, articles (المادة), legal cases, procedures, terms, definitions - Anything related to legal matters in documents, but as a new inquiry A "law-followup" question is: - A law-related question that is a follow-up, inference, or clarification based on the previous assistant response - Refers to or builds upon the previous answer (e.g., "what about...", "can you explain more about...", "based on that...", "how about...", "what if...") - Asks for clarification, elaboration, or related information about what was just discussed - Continues the conversation thread about the same legal topic - Uses pronouns or references that relate to the previous response A "general" question is: - Greetings (السلام عليكم, مرحبا, etc.) - Casual conversation - Questions not related to legal documents or law {previous_context} Current Question: {question} If the question is "general", provide a helpful answer in Arabic. If the question is "law-new", respond with only "law-new". If the question is "law-followup", respond with only "law-followup". """ try: # Initialize client based on model_provider if model_provider.lower() in ["qwen", "huggingface"]: hf_token = os.getenv("HF_TOKEN") if not hf_token: # Fallback to OpenAI if HF_TOKEN not available llm_client = self.llm_client llm_model = self.llm_model else: http_client = NoProxyHTTPClient(timeout=60.0) llm_client = OpenAI( base_url="https://router.huggingface.co/v1", api_key=hf_token, http_client=http_client ) llm_model = os.getenv("QWEN_MODEL", "Qwen/Qwen3-32B:nscale") else: llm_client = self.llm_client llm_model = self.llm_model # Build messages with chat history if enabled history_messages = [] if use_history: history_messages = self.chat_history.get_recent_history(n_turns=2) system_prompt = """You are a helpful assistant devloped by "الدكتور الدوسري ". Classify questions into one of three categories and answer general questions in Arabic. If the question is a greeting or general question, provide a friendly, helpful answer in Arabic and mention that you are developed to Answer questions relateed to the KSA personal laws and regulations. If the question is law-related and starts a new topic, respond with only "law-new". If the question is law-related and is a follow-up to the previous response, respond with only "law-followup". 
Respond with ONLY one of: "law-new", "law-followup", or provide an answer if it's general."""

            messages = [{"role": "system", "content": system_prompt}]

            # Add chat history (excluding the last user message if it's the current question)
            if history_messages:
                skip_last = history_messages[-1].get("content") == question
                for msg in history_messages[:-1] if skip_last else history_messages:
                    messages.append(msg)

            messages.append({"role": "user", "content": classification_prompt})

            response = llm_client.chat.completions.create(
                model=llm_model,
                messages=messages,
                temperature=0.3
            )
            raw_response = response.choices[0].message.content.strip()

            # Filter thinking process from Qwen responses
            if model_provider.lower() in ["qwen", "huggingface"]:
                raw_response = self._filter_thinking_process(raw_response)

            # Check classification result
            response_lower = raw_response.lower().strip()
            is_law_new = "law-new" in response_lower and len(response_lower) < 20
            is_law_followup = "law-followup" in response_lower and len(response_lower) < 20

            if is_law_new:
                print("[Classification] Question classified as: law-new")
                return ("law-new", None, None, None)  # Continue with RAG flow
            elif is_law_followup:
                print("[Classification] Question classified as: law-followup")
                return ("law-followup", None, None, None)  # Continue with RAG flow, will reuse chunks if available
            else:
                # General question - use the response as answer
                answer, _ = self._parse_llm_response(raw_response)  # Unpack tuple, ignore found flag for general questions

                # Update chat history
                self.chat_history.add_message("user", question)
                self.chat_history.add_message("assistant", answer)

                print("[Classification] Question classified as: general, answered directly")
                return ("general", answer, [], None)  # Return answer with empty sources and no chunks

        except Exception as e:
            # On error, default to law-new to use RAG flow
            print(f"[Classification] Error classifying question, defaulting to law-new: {e}")
            return ("law-new", None, None, None)

    def answer_question(
        self,
        question: str,
        use_history: bool = True,
        model_provider: str = "openai",
        context_mode: str = "full",
    ) -> Tuple[str, List[str], Optional[List[str]]]:
        """
        Answer a question using RAG with multi-turn chat history.

        Args:
            question: The user's question
            use_history: Whether to use chat history
            model_provider: Model provider to use - "openai" (default) or "qwen"/"huggingface" for Qwen model
            context_mode: Context construction mode - "full" (entire document) or "chunks" (top semantic chunks)

        Returns:
            Tuple of (answer, list of source filenames, optional list of chunk texts for testing)
        """
        start_time = time.perf_counter()

        # Log formatted query for testing
        print("[QUERY] Formatted Query Log")
        print("=" * 80)
        print(f"[QUERY] Question: {question}")
        print("=" * 80)

        if self.vectorstore is None:
            raise ValueError("No documents indexed. Please process documents first.")
Please process documents first.") # Step 0: Classify question into law-new, law-followup, or general classification_start = time.perf_counter() label, answer, sources, chunks = self._classify_question(question, use_history, model_provider) classification_time = (time.perf_counter() - classification_start) * 1000 print(f"[Timing] Question classification: {classification_time:.2f}ms") # If general question was handled, return the result immediately if label == "general": return answer, sources, chunks # Step 1: Build document candidate list based on label search_start = time.perf_counter() previous_document = None if use_history: previous_document = self.chat_history.get_last_document() # Build candidate documents list candidate_documents = self._build_document_candidates(question, label, previous_document, k=5) if not candidate_documents: return "I couldn't find any relevant information to answer your question.", [], None # Step 2: Prepare chat history and context mode history_messages = [] if use_history and label == "law-followup": # Get last 3 messages (get 2 turns = 4 messages, then take last 3) history_messages = self.chat_history.get_recent_history(n_turns=2) # Decide how to construct document context for the LLM context_mode_normalized = (context_mode or "full").lower() if context_mode_normalized not in ["full", "chunks"]: context_mode_normalized = "full" # Build prompts (reusable parts) mode_note = "" if context_mode_normalized == "chunks": mode_note = ( "\n\nNote: The provided document text consists of selected relevant excerpts (مقاطع) " "from the same document, not the full law. Answer strictly based on these excerpts." ) system_prompt = f"""You are a helpful legal document assistant. Answer questions based on the provided document text. {mode_note} MODE 1 - General Questions: - Understand the context and provide a clear, helpful answer - You may paraphrase or summarize information from the document - Explain concepts in your own words while staying true to the document's meaning MODE 2 - Legal Articles/Terms (المادة): - When the user asks about specific legal articles (المادة), legal terms, or exact regulations, you MUST quote the EXACT text from the document (con text) verbatim - Copy the complete text word-for-word, including all numbers, punctuation, and formatting - Do NOT paraphrase, summarize, or generate new text for legal articles (المادة) - NEVER create or generate legal text - only use what exists in the document IMPORTANT - Response Format: - Do NOT include source citations in your answer (e.g., do NOT write "المصدر: نظام الاحوال الشخصية.pdf" or similar source references) - Do NOT mention the document filename or source at the end of your answer - Simply provide the answer directly without any source attribution - Whenever you refere to the document (context or filename) in response, refer to it by the filename WITHOUT the extension such as ".pdf" or".doc" RESPONSE FORMAT (JSON) - CRITICAL: You MUST respond in JSON format with the following structure: {{ "answer": "your answer in Arabic here", "found": true or false }} - "answer": Your answer to the question in Arabic. - "found": Set to TRUE only if you can answer the question based on the provided document context. Set to FALSE if the question cannot be answered from the provided document excerpts OR if your answer says you could not find it. 
MUST Answer in Arabic."""

        # Check if question contains legal article/term keywords
        is_legal_term_question = any(keyword in question for keyword in ["المادة", "مادة", "article", "نص", "النص", "نص القانون"])

        legal_instruction = ""
        if is_legal_term_question:
            legal_instruction = "\n\nCRITICAL: The user is asking about a legal article or legal term. Carefully search the provided context to find the relevant article. Reference the article correctly as it has been stated in the context. Articles might be referenced by their content, position, or topic - for example, 'المادة الأولى' might refer to the first article in a section even if not explicitly numbered. Find and quote the relevant text accurately from the document, maintaining the exact wording as it appears. Do NOT create or generate legal text - only use what exists in the document."
        else:
            legal_instruction = "\n\nAnswer the question by understanding the context from the document. You may paraphrase or explain in your own words while staying true to the document's meaning."

        # Step 3: Initialize LLM client
        llm_start = time.perf_counter()
        try:
            # Initialize client based on model_provider
            if model_provider.lower() in ["qwen", "huggingface"]:
                # Use HuggingFace router with Qwen model
                hf_token = os.getenv("HF_TOKEN")
                if not hf_token:
                    raise ValueError("HF_TOKEN environment variable is required for Qwen model testing.")
                http_client = NoProxyHTTPClient(timeout=60.0)
                llm_client = OpenAI(
                    base_url="https://router.huggingface.co/v1",
                    api_key=hf_token,
                    http_client=http_client
                )
                llm_model = os.getenv("QWEN_MODEL", "Qwen/Qwen3-32B:nscale")
            else:
                # Use OpenAI (default)
                llm_client = self.llm_client
                llm_model = self.llm_model

            # Step 4: Iterate through candidate documents
            answer = None
            found = False
            final_matched_filename = None
            final_selected_chunks: Optional[List[str]] = None

            for doc_idx, current_filename in enumerate(candidate_documents):
                print(f"[RAG] Trying document {doc_idx + 1}/{len(candidate_documents)}: {current_filename}")

                # Get full text for this document
                current_full_text = self._get_text_by_filename_cached(current_filename)
                if not current_full_text:
                    print(f"[RAG] Could not retrieve text for {current_filename}, skipping...")
                    continue

                # Build context for this document
                prompt_start = time.perf_counter()
                document_context_label = "Document Text"
                selected_chunks: Optional[List[str]] = None

                # Check if we can reuse previous chunks (only for follow-up on first document)
                previous_chunks = None
                if label == "law-followup" and doc_idx == 0 and use_history and current_filename == previous_document:
                    previous_chunks = self.chat_history.get_last_chunks()
                    if previous_chunks:
                        print(f"[RAG] Reusing previous chunks for law-followup question ({len(previous_chunks)} chunks)")
                        selected_chunks = previous_chunks
                        document_context_label = "Selected Document Excerpts"
                        chunk_texts: List[str] = []
                        for idx, chunk_text in enumerate(previous_chunks, start=1):
                            chunk_texts.append(f"[مقطع {idx}]\n{chunk_text}")
                        document_context = "\n\n".join(chunk_texts)[:25000]

                # If not reusing chunks (none stored or none available), build context normally
                if not previous_chunks:
                    if context_mode_normalized == "full":
                        print(f"[RAG] Using full document mode for {current_filename}")
                        document_context = current_full_text[:16000]  # Limit to avoid token limits
                    else:
                        print(f"[RAG] Using chunk mode for {current_filename}")
                        # Build or load chunk vectorstore and retrieve top-k chunks
                        chunk_vs, chunk_docs = self._get_or_build_chunk_vectorstore(current_filename, current_full_text)
                        top_k = 4
                        try:
                            if self.use_hybrid_search:
                                # Use ensemble retriever for hybrid search (BM25 + Semantic)
                                ensemble_retriever = self._create_ensemble_retriever(chunk_vs, chunk_docs, top_k)
                                top_chunks = ensemble_retriever.get_relevant_documents(question)
                                print("[RAG] Used hybrid search (BM25 + Semantic) for chunk retrieval")
                            else:
                                # Use semantic search only
                                top_chunks = chunk_vs.similarity_search(question, k=top_k)
                        except Exception as e:
                            print(f"[RAG] Chunk retrieval failed for {current_filename}, falling back to full text: {e}")
                            document_context = current_full_text[:25000]
                            context_mode_normalized = "full"
                        else:
                            if not top_chunks:
                                print(f"[RAG] No chunks returned for {current_filename}, falling back to full text")
                                document_context = current_full_text[:8000]
                                context_mode_normalized = "full"
                            else:
                                document_context_label = "Selected Document Excerpts"
                                chunk_texts: List[str] = []
                                selected_chunks = []  # Store raw chunk texts for return
                                for idx, doc in enumerate(top_chunks, start=1):
                                    chunk_text = doc.page_content
                                    selected_chunks.append(chunk_text)  # Store raw chunk text
                                    chunk_texts.append(f"[مقطع {idx}]\n{chunk_text}")
                                document_context = "\n\n".join(chunk_texts)[:20000]

                # Build user prompt
                user_prompt = f"""{document_context_label}:

{document_context}

User Question: {question}
{legal_instruction}

Please answer the question based on the document text above. Preserve any numbering, bullet points, and headings from the document when they are relevant, and use clear Arabic headings and numbered lists in your answer.
MUST Answer the Question in Arabic."""

                # Build messages with chat history
                messages = [{"role": "system", "content": system_prompt}]

                # Add chat history (excluding the last user message if it's the current question)
                if history_messages:
                    skip_last = len(history_messages) > 0 and history_messages[-1].get("content") == question
                    for msg in history_messages[:-1] if skip_last else history_messages:
                        messages.append(msg)

                messages.append({"role": "user", "content": user_prompt})

                prompt_time = (time.perf_counter() - prompt_start) * 1000
                print(f"[Timing] Prompt construction: {prompt_time:.2f}ms")

                # Call LLM
                response = llm_client.chat.completions.create(
                    model=llm_model,
                    messages=messages,
                    temperature=0.3
                )
                raw_response = response.choices[0].message.content

                llm_time = (time.perf_counter() - llm_start) * 1000
                print(f"[Timing] LLM API call: {llm_time:.2f}ms")

                # Filter thinking process from Qwen responses
                if model_provider.lower() in ["qwen", "huggingface"]:
                    raw_response = self._filter_thinking_process(raw_response)

                # Parse LLM response to extract answer and found flag
                parse_start = time.perf_counter()
                answer, found = self._parse_llm_response(raw_response)
                parse_time = (time.perf_counter() - parse_start) * 1000

                if found:
                    final_matched_filename = current_filename
                    final_selected_chunks = selected_chunks
                    print(f"[RAG] Answer found in document: {final_matched_filename}")
                    break  # Found! Return immediately
                else:
                    print(f"[RAG] Answer not found in document: {current_filename}")
                    # For law-new: continue to next document
                    # For law-followup: continue searching until found
                    if label == "law-new":
                        print("[RAG] law-new: trying next document...")
                    elif label == "law-followup":
                        print("[RAG] law-followup: continuing search...")

            # Step 5: Handle final result
            if not found:
                # For law-new: return after trying all documents
                # For law-followup: should have kept searching, but if still not found, return a message
                answer = "لم أجد الإجابة في أي من الوثائق المتاحة." if answer is None else answer
                final_matched_filename = candidate_documents[0] if candidate_documents else None
                print(f"[RAG] Answer not found in any of the {len(candidate_documents)} documents tried")

            # Step 6: Update chat history with document source and chunks
            self.chat_history.add_message("user", question)
            self.chat_history.add_message("assistant", answer, source_document=final_matched_filename, chunks=final_selected_chunks)

            total_time = (time.perf_counter() - start_time) * 1000
            print(f"[Timing] Total inference time: {total_time:.2f}ms")

            return answer, [final_matched_filename], final_selected_chunks

        except Exception as e:
            total_time = (time.perf_counter() - start_time) * 1000
            print(f"[Timing] Total inference time (error): {total_time:.2f}ms")
            error_msg = f"Error generating answer: {str(e)}"

            # Get error filename safely (candidate_documents might not be defined if error occurs early)
            try:
                error_filename = candidate_documents[0] if candidate_documents else None
            except NameError:
                error_filename = None

            self.chat_history.add_message("user", question)
            self.chat_history.add_message("assistant", error_msg, source_document=error_filename, chunks=None)
            return error_msg, [error_filename] if error_filename else [], None

    def clear_chat_history(self):
        """Clear chat history"""
        self.chat_history.clear()
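

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only): shows how this class is typically wired
# together end-to-end. The "documents" folder name and the question text are
# assumptions for the example; the required environment variables
# (OPENAI_API_KEY, and optionally HF_TOKEN, EMBEDDINGS_PROVIDER,
# USE_HYBRID_SEARCH, OPENAI_LLM_MODEL) must already be set.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    # Build the RAG system; loads an existing summary vectorstore if present.
    rag = RAGSystem(vectorstore_path="vectorstore")

    # Process PDFs from a hypothetical "documents" folder, persist them to
    # processed_documents.json, and index their summaries.
    new_count = rag.process_and_index_documents("documents")
    print(f"Processed {new_count} new document(s)")

    # Ask a question; context_mode="chunks" sends only the top semantic chunks
    # of the matched document instead of its full text.
    answer, sources, chunks = rag.answer_question(
        "ما هي المادة الأولى من نظام الأحوال الشخصية؟",
        context_mode="chunks",
    )
    print(answer)
    print(sources)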