"""
Retrieval API - Anchor-Grounded Recall Context System
Provides anchor-grounded context retrieval and recall capabilities
for the Cognitive Geo-Thermal Lore Engine v0.3.
"""
from typing import List, Dict, Any, Optional, Tuple, Union
import time
import hashlib
import logging
from dataclasses import dataclass
from enum import Enum
logger = logging.getLogger(__name__)
class RetrievalMode(Enum):
"""Types of retrieval operations."""
SEMANTIC_SIMILARITY = "semantic_similarity" # Semantically similar content
HYBRID_SEMANTIC_FRACTALSTAT = "hybrid_semantic_fractalstat" # Hybrid semantic + FractalStat scoring
TEMPORAL_SEQUENCE = "temporal_sequence" # Retrieve by time sequence
ANCHOR_NEIGHBORHOOD = "anchor_neighborhood" # Content around specific anchors
PROVENANCE_CHAIN = "provenance_chain" # Follow provenance relationships
CONFLICT_AWARE = "conflict_aware" # Exclude conflicting content
COMPOSITE = "composite" # Multi-modal retrieval
@dataclass
class RetrievalQuery:
"""Structured query for context retrieval."""
query_id: str
mode: RetrievalMode
anchor_ids: Optional[List[str]] = None
semantic_query: Optional[str] = None
temporal_range: Optional[Tuple[float, float]] = None # (start_time, end_time)
max_results: int = 10
confidence_threshold: float = 0.6
exclude_conflicts: bool = True
include_provenance: bool = True
    query_timestamp: Optional[float] = None
fractalstat_hybrid: bool = True # Enable FractalStat hybrid scoring
fractalstat_address: Optional[Dict[str, Any]] = None # FractalStat coordinates for hybrid scoring
weight_semantic: float = 0.6 # Weight for semantic similarity in hybrid mode
weight_fractalstat: float = 0.4 # Weight for FractalStat resonance in hybrid mode
def __post_init__(self):
if self.query_timestamp is None:
self.query_timestamp = time.time()
if self.fractalstat_hybrid and not self.fractalstat_address:
# Default 8D FractalStat address if not specified
self.fractalstat_address = {
"realm": {"type": "default", "label": "retrieval_query"},
"lineage": 0,
"adjacency": 0.5,
"horizon": "scene",
"luminosity": 70.0, # Updated scale 0-100
"polarity": 0.0, # Updated scale -1 to 1
"dimensionality": 3,
"alignment": {"type": "true_neutral"}, # 8th dimension added
}
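# Illustrative sketch (not executed): constructing a hybrid query with explicit
# weights. The field values below are assumptions chosen for demonstration only.
#
#     query = RetrievalQuery(
#         query_id="demo_query_001",
#         mode=RetrievalMode.HYBRID_SEMANTIC_FRACTALSTAT,
#         semantic_query="geothermal vent lore",
#         max_results=5,
#         weight_semantic=0.7,
#         weight_fractalstat=0.3,
#     )
#     # __post_init__ fills query_timestamp and a default 8D FractalStat address.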
@dataclass
class RetrievalResult:
"""Result from a retrieval operation."""
result_id: str
content_type: str # "anchor", "micro_summary", "macro_distillation", "molten_glyph"
content_id: str
content: str
relevance_score: float
temporal_distance: float # How far from query time
anchor_connections: List[str] # Connected anchor IDs
provenance_depth: int
conflict_flags: List[str] # Any conflicts detected
metadata: Dict[str, Any]
fractalstat_resonance: float = 0.0 # FractalStat hybrid scoring component (if enabled)
semantic_similarity: float = 0.0 # Semantic scoring component (if hybrid)
@dataclass
class ContextAssembly:
"""Assembled context from multiple retrieval results."""
assembly_id: str
query: RetrievalQuery
results: List[RetrievalResult]
total_relevance: float
temporal_span_hours: float
anchor_coverage: List[str]
assembly_quality: float # Overall quality score
conflict_summary: Dict[str, int]
retrieval_timestamp: float
class RetrievalAPI:
"""
Anchor-grounded context retrieval system with optional FractalStat hybrid scoring.
Provides intelligent context assembly by combining semantic anchors,
micro-summaries, macro distillations, and memory fragments with
conflict awareness and provenance tracking.
Supports FractalStat hybrid scoring for multi-dimensional retrieval when enabled.
"""
def __init__(
self,
config: Optional[Dict[str, Any]] = None,
semantic_anchors=None,
summarization_ladder=None,
conflict_detector=None,
embedding_provider=None,
fractalstat_bridge=None,
):
"""Initialize the retrieval API."""
self.config = config or {}
# Component dependencies
self.semantic_anchors = semantic_anchors
self.summarization_ladder = summarization_ladder
self.conflict_detector = conflict_detector
self.embedding_provider = embedding_provider
self.fractalstat_bridge = fractalstat_bridge # Optional FractalStat RAG bridge for hybrid scoring
# Configuration parameters
self.default_max_results = self.config.get("default_max_results", 10)
self.relevance_threshold = self.config.get("relevance_threshold", 0.5)
self.temporal_decay_hours = self.config.get("temporal_decay_hours", 24)
self.quality_threshold = self.config.get("quality_threshold", 0.6)
# FractalStat hybrid scoring configuration
self.enable_fractalstat_hybrid = self.config.get("enable_fractalstat_hybrid", False)
self.default_weight_semantic = self.config.get("default_weight_semantic", 0.6)
self.default_weight_fractalstat = self.config.get("default_weight_fractalstat", 0.4)
# Retrieval cache (for performance)
self.query_cache: Dict[str, ContextAssembly] = {}
self.cache_ttl_seconds = self.config.get("cache_ttl_seconds", 300) # 5 minutes
# Document FractalStat assignments cache (for rapid re-retrieval)
self.document_fractalstat_cache: Dict[str, Dict[str, Any]] = {}
# Simple in-memory document store for ingestion
self._context_store: Dict[str, Dict[str, Any]] = {}
# Metrics
self.metrics = {
"total_queries": 0,
"cache_hits": 0,
"cache_misses": 0,
"hybrid_queries": 0,
"average_results_per_query": 0.0,
"average_retrieval_time_ms": 0.0,
"quality_distribution": {"high": 0, "medium": 0, "low": 0},
}
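    # Illustrative config sketch: the keys recognized by __init__ above, shown with
    # their built-in defaults (values are the defaults, not recommendations).
    #
    #     api = RetrievalAPI(config={
    #         "default_max_results": 10,
    #         "relevance_threshold": 0.5,
    #         "temporal_decay_hours": 24,
    #         "quality_threshold": 0.6,
    #         "enable_fractalstat_hybrid": False,
    #         "default_weight_semantic": 0.6,
    #         "default_weight_fractalstat": 0.4,
    #         "cache_ttl_seconds": 300,
    #     })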
def retrieve_context(self, query: Union[RetrievalQuery, Dict[str, Any]]) -> ContextAssembly:
"""
Main retrieval method - assemble context based on query.
Args:
query: RetrievalQuery object or dict with query parameters
Returns:
ContextAssembly with retrieved and assembled context
"""
start_time = time.time()
# Convert dict to RetrievalQuery if needed
if isinstance(query, dict):
query = self._dict_to_query(query)
self.metrics["total_queries"] += 1
# Check cache first
cache_key = self._generate_cache_key(query)
cached_result = self._get_cached_result(cache_key)
if cached_result:
self.metrics["cache_hits"] += 1
return cached_result
self.metrics["cache_misses"] += 1
# Perform retrieval based on mode
results = []
if query.mode == RetrievalMode.SEMANTIC_SIMILARITY:
results = self._retrieve_semantic_similarity(query)
elif query.mode == RetrievalMode.HYBRID_SEMANTIC_FRACTALSTAT:
# Hybrid mode: semantic search with FractalStat hybrid scoring
results = self._retrieve_semantic_similarity(query)
elif query.mode == RetrievalMode.TEMPORAL_SEQUENCE:
results = self._retrieve_temporal_sequence(query)
elif query.mode == RetrievalMode.ANCHOR_NEIGHBORHOOD:
results = self._retrieve_anchor_neighborhood(query)
elif query.mode == RetrievalMode.PROVENANCE_CHAIN:
results = self._retrieve_provenance_chain(query)
elif query.mode == RetrievalMode.CONFLICT_AWARE:
results = self._retrieve_conflict_aware(query)
elif query.mode == RetrievalMode.COMPOSITE:
results = self._retrieve_composite(query)
else:
# Default to semantic similarity
results = self._retrieve_semantic_similarity(query)
# Filter and rank results
filtered_results = self._filter_and_rank_results(results, query)
# Assemble final context
assembly = self._assemble_context(query, filtered_results)
# Cache result
self._cache_result(cache_key, assembly)
# Update metrics
elapsed_ms = (time.time() - start_time) * 1000
self._update_metrics(assembly, elapsed_ms)
return assembly
def query_semantic_anchors(
self, query_text: str, max_results: int = 5
) -> List[RetrievalResult]:
"""
Quick semantic anchor query for simple use cases.
Args:
query_text: Text to find similar anchors for
max_results: Maximum number of results
Returns:
List of RetrievalResult objects for matching anchors
"""
query = RetrievalQuery(
query_id=f"quick_{int(time.time())}",
mode=RetrievalMode.SEMANTIC_SIMILARITY,
semantic_query=query_text,
max_results=max_results,
)
assembly = self.retrieve_context(query)
return assembly.results
def get_anchor_context(self, anchor_id: str, context_radius: int = 3) -> ContextAssembly:
"""
Get context around a specific anchor.
Args:
anchor_id: ID of anchor to get context for
context_radius: How many related items to include
Returns:
ContextAssembly with anchor neighborhood context
"""
query = RetrievalQuery(
query_id=f"anchor_ctx_{anchor_id}_{int(time.time())}",
mode=RetrievalMode.ANCHOR_NEIGHBORHOOD,
anchor_ids=[anchor_id],
max_results=context_radius * 2,
)
return self.retrieve_context(query)
def trace_provenance(self, content_id: str, max_depth: int = 5) -> ContextAssembly:
"""
Trace provenance chain for a piece of content.
Args:
content_id: ID of content to trace
max_depth: Maximum provenance depth
Returns:
ContextAssembly with provenance chain
"""
query = RetrievalQuery(
query_id=f"prov_{content_id}_{int(time.time())}",
mode=RetrievalMode.PROVENANCE_CHAIN,
anchor_ids=[content_id],
max_results=max_depth,
)
return self.retrieve_context(query)
def add_document(
self,
doc_id: str,
content: str,
        metadata: Optional[Dict[str, Any]] = None,
embedding: Optional[List[float]] = None,
fractalstat_coordinates: Optional[Dict[str, Any]] = None,
) -> bool:
"""
Add a document to the context store for retrieval.
Args:
doc_id: Unique document identifier
content: Document content
metadata: Optional metadata (realm, type, etc.)
embedding: Optional pre-computed embedding vector
fractalstat_coordinates: Optional pre-computed FractalStat coordinates
Returns:
True if added successfully
"""
if doc_id in self._context_store:
return False # Document already exists
doc_entry = {
"content": content,
"metadata": metadata or {},
"added_at": time.time(),
"length": len(content),
"content_hash": hashlib.sha256(content.encode()).hexdigest(),
}
if embedding is None and self.embedding_provider:
embedding = self.embedding_provider.embed_text(content)
if embedding:
doc_entry["embedding"] = embedding
if (
fractalstat_coordinates is None
and embedding
and hasattr(self.embedding_provider, "compute_fractalstat_from_embedding")
):
fractalstat_coordinates = self.embedding_provider.compute_fractalstat_from_embedding(embedding)
if fractalstat_coordinates:
doc_entry["fractalstat_coordinates"] = fractalstat_coordinates
self._context_store[doc_id] = doc_entry
return True
def get_context_store_size(self) -> int:
"""Get number of documents in context store."""
return len(self._context_store)
def get_retrieval_metrics(self) -> Dict[str, Any]:
"""Get retrieval performance and usage metrics."""
return {
"retrieval_metrics": self.metrics.copy(),
"cache_performance": {
"hit_rate": self._calculate_cache_hit_rate(),
"cache_size": len(self.query_cache),
"cache_efficiency": self._calculate_cache_efficiency(),
},
"context_store_size": self.get_context_store_size(),
"system_health": {
"components_available": self._check_component_availability(),
"average_quality": self._calculate_average_quality(),
"retrieval_success_rate": self._calculate_success_rate(),
},
}
def _dict_to_query(self, query_dict: Dict[str, Any]) -> RetrievalQuery:
"""Convert dictionary to RetrievalQuery object."""
return RetrievalQuery(
query_id=query_dict.get("query_id", f"query_{int(time.time())}"),
mode=RetrievalMode(query_dict.get("mode", "semantic_similarity")),
anchor_ids=query_dict.get("anchor_ids"),
semantic_query=query_dict.get("semantic_query"),
temporal_range=query_dict.get("temporal_range"),
max_results=query_dict.get("max_results", self.default_max_results),
confidence_threshold=query_dict.get("confidence_threshold", 0.6),
exclude_conflicts=query_dict.get("exclude_conflicts", True),
include_provenance=query_dict.get("include_provenance", True),
fractalstat_hybrid=query_dict.get("fractalstat_hybrid", self.enable_fractalstat_hybrid),
fractalstat_address=query_dict.get("fractalstat_address"),
weight_semantic=query_dict.get("weight_semantic", self.default_weight_semantic),
weight_fractalstat=query_dict.get("weight_fractalstat", self.default_weight_fractalstat),
)
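    # Illustrative sketch: retrieve_context() also accepts a plain dict, which
    # _dict_to_query above maps onto a RetrievalQuery (unspecified fields fall back
    # to the defaults shown). The values here are assumptions for demonstration.
    #
    #     assembly = api.retrieve_context({
    #         "mode": "anchor_neighborhood",
    #         "anchor_ids": ["anchor_123"],
    #         "max_results": 5,
    #     })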
def _retrieve_semantic_similarity(self, query: RetrievalQuery) -> List[RetrievalResult]:
"""Retrieve content based on semantic similarity."""
results = []
if not query.semantic_query:
return results
        logger.debug(
            "semantic retrieval: query=%r, store_size=%d, threshold=%.2f, max_results=%d",
            query.semantic_query,
            len(self._context_store),
            query.confidence_threshold,
            query.max_results,
        )
# If embedding provider available, use it
if self.embedding_provider:
# Get query embedding
            try:
                query_embedding = self.embedding_provider.embed_text(query.semantic_query)
            except Exception:
                # Embedding failed; abort semantic retrieval
                return results
            # Search semantic anchors
            if self.semantic_anchors:
                for anchor_id, anchor in self.semantic_anchors.anchors.items():
                    if anchor.embedding:
                        similarity = self.embedding_provider.calculate_similarity(
                            query_embedding, anchor.embedding
                        )
if similarity >= query.confidence_threshold:
result = RetrievalResult(
result_id=f"anchor_{anchor_id}",
content_type="anchor",
content_id=anchor_id,
content=anchor.concept_text,
relevance_score=similarity,
temporal_distance=self._calculate_temporal_distance(
anchor.provenance.first_seen, query.query_timestamp
),
anchor_connections=[anchor_id],
provenance_depth=1,
conflict_flags=[],
metadata={
"heat": anchor.heat,
"updates": anchor.provenance.update_count,
"semantic_drift": anchor.semantic_drift,
},
)
results.append(result)
# Search micro-summaries if available
if self.summarization_ladder:
for micro in self.summarization_ladder.micro_summaries:
if micro.semantic_centroid:
similarity = self.embedding_provider.calculate_similarity(
query_embedding, micro.semantic_centroid
)
if similarity >= query.confidence_threshold:
result = RetrievalResult(
result_id=f"micro_{micro.summary_id}",
content_type="micro_summary",
content_id=micro.summary_id,
content=micro.compressed_text,
relevance_score=similarity,
temporal_distance=self._calculate_temporal_distance(
micro.creation_timestamp, query.query_timestamp
),
anchor_connections=[],
provenance_depth=2,
conflict_flags=[],
metadata={
"window_size": micro.window_size,
"heat_aggregate": micro.heat_aggregate,
"fragments": micro.window_fragments,
},
)
results.append(result)
# Always search context store (uses embeddings if available, falls back to keyword)
context_results = self._search_context_store(query)
results.extend(context_results)
return results
def _search_context_store(self, query: RetrievalQuery) -> List[RetrievalResult]:
"""
Search context store using embeddings (semantic) or keyword fallback.
Prefers embedding-based semantic search when available.
"""
results = []
if not query.semantic_query or not self._context_store:
return results
        try:
            if self.embedding_provider and hasattr(self.embedding_provider, "semantic_search"):
                return self._search_context_store_semantic(query)
        except Exception:
            # Fall back to keyword search if the semantic path fails
            pass
return self._search_context_store_keyword(query)
def _search_context_store_semantic(self, query: RetrievalQuery) -> List[RetrievalResult]:
"""Search context store using semantic embeddings."""
results = []
if not query.semantic_query:
return results
embeddings_list = []
doc_ids = []
for doc_id, doc_data in self._context_store.items():
if "embedding" in doc_data:
embeddings_list.append(doc_data["embedding"])
doc_ids.append(doc_id)
        logger.debug("semantic context-store search over %d embedded documents", len(embeddings_list))
if not embeddings_list:
return self._search_context_store_keyword(query)
try:
similarities = self.embedding_provider.semantic_search(
query.semantic_query, embeddings_list, top_k=query.max_results
)
for doc_idx, sim_score in similarities:
doc_id = doc_ids[doc_idx]
doc_data = self._context_store[doc_id]
                fractalstat_resonance = 0.0
                if "fractalstat_coordinates" in doc_data and query.fractalstat_hybrid:
                    fractalstat_resonance = self._calculate_fractalstat_resonance(
                        doc_data["fractalstat_coordinates"], query.fractalstat_address
                    )
                hybrid_score = sim_score
                if query.fractalstat_hybrid:
                    hybrid_score = (
                        query.weight_semantic * sim_score
                        + query.weight_fractalstat * fractalstat_resonance
                    )
                logger.debug(
                    "context-store hit: doc=%s semantic=%.4f fractalstat=%.4f hybrid=%.4f",
                    doc_id, sim_score, fractalstat_resonance, hybrid_score,
                )
if hybrid_score >= query.confidence_threshold:
result = RetrievalResult(
result_id=f"ctx_{doc_id}",
content_type="context_store",
content_id=doc_id,
content=doc_data.get("content", "")[:500],
relevance_score=hybrid_score,
temporal_distance=0.0,
anchor_connections=[],
provenance_depth=1,
conflict_flags=[],
metadata=doc_data.get("metadata", {}),
semantic_similarity=sim_score,
                        fractalstat_resonance=fractalstat_resonance,
)
results.append(result)
        except Exception:
return self._search_context_store_keyword(query)
return results
def _search_context_store_keyword(self, query: RetrievalQuery) -> List[RetrievalResult]:
"""Fallback keyword-based search of context store."""
results = []
if not query.semantic_query:
return results
query_terms = query.semantic_query.lower().split()
scored_docs = []
for doc_id, doc_data in self._context_store.items():
content = doc_data.get("content", "").lower()
matches = sum(1 for term in query_terms if term in content)
if matches > 0:
relevance_score = (matches / len(query_terms)) ** 0.5
scored_docs.append((doc_id, doc_data, relevance_score))
scored_docs.sort(key=lambda x: x[2], reverse=True)
for doc_id, doc_data, relevance_score in scored_docs[: query.max_results]:
if relevance_score >= query.confidence_threshold:
result = RetrievalResult(
result_id=f"ctx_{doc_id}",
content_type="context_store",
content_id=doc_id,
content=doc_data.get("content", "")[:500],
relevance_score=relevance_score,
temporal_distance=0.0,
anchor_connections=[],
provenance_depth=1,
conflict_flags=[],
metadata=doc_data.get("metadata", {}),
semantic_similarity=relevance_score,
)
results.append(result)
return results
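    # Worked example of the keyword fallback score above: for the query
    # "geothermal heat flux" against a document containing "geothermal" and "heat"
    # but not "flux", matches = 2 of 3 terms, so
    # relevance_score = (2 / 3) ** 0.5 ≈ 0.816, which clears the default 0.6 threshold.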
def _calculate_fractalstat_resonance(
self, doc_fractalstat: Dict[str, Any], query_fractalstat: Optional[Dict[str, Any]]
) -> float:
"""Calculate 8D FractalStat resonance between document and query coordinates."""
if not query_fractalstat or not doc_fractalstat:
return 0.5
try:
            # Numeric distances over five core dimensions (lineage, adjacency, luminosity,
            # polarity, dimensionality); alignment, the 8th dimension, is handled separately below
lineage_dist = abs(doc_fractalstat.get("lineage", 0.5) - query_fractalstat.get("lineage", 0.5))
adjacency_dist = abs(
doc_fractalstat.get("adjacency", 0.5) - query_fractalstat.get("adjacency", 0.5)
)
luminosity_dist = abs(
doc_fractalstat.get("luminosity", 70.0) - query_fractalstat.get("luminosity", 70.0)
) / 100.0 # Normalize to 0-1 scale
polarity_dist = abs(
doc_fractalstat.get("polarity", 0.0) - query_fractalstat.get("polarity", 0.0)
) / 2.0 # Normalize from [-1,1] to [0,1]
dimensionality_dist = abs(
doc_fractalstat.get("dimensionality", 3) - query_fractalstat.get("dimensionality", 3)
) / 7.0 # Normalize from [1,8] to [0,1]
# 8th dimension: Alignment resonance with social dynamics
doc_alignment = doc_fractalstat.get("alignment", {}).get("type", "true_neutral")
query_alignment = query_fractalstat.get("alignment", {}).get("type", "true_neutral")
# Alignment synergy matrix for social coordination patterns - stricter resonance
alignment_synergy = {
("harmonic", "harmonic"): 1.0,
("harmonic", "symbiotic"): 0.9,
("symbiotic", "symbiotic"): 1.0,
("harmonic", "entropic"): 0.3, # Much lower for opposing dynamics
("entropic", "entropic"): 1.0,
("true_neutral", "true_neutral"): 0.7,
("balanced", "balanced"): 0.7,
("chaotic", "chaotic"): 1.0,
("harmonic", "chaotic"): 0.2, # Very low for harmonic-chaotic
("symbiotic", "chaotic"): 0.3,
("chaotic", "entropic"): 0.8,
}.get((doc_alignment, query_alignment), 0.4) # Lower default
alignment_resonance = alignment_synergy
            # Average distance across the five numeric core dimensions
core_avg_distance = (
lineage_dist
+ adjacency_dist
+ luminosity_dist
+ polarity_dist
+ dimensionality_dist
) / 5.0
            # Combine core-dimension proximity with alignment synergy.
            # Large core distances (>0.7) are dominated by the alignment term; moderate
            # ones (>0.25) still carry a heavy core penalty.
if core_avg_distance > 0.7:
# Very different: almost no core contribution
core_weight = 0.001 # Minimal core contribution
alignment_weight = 0.999 # Dominated by alignment, even if same
elif core_avg_distance > 0.25:
# Different: heavy core penalty
core_weight = 0.01
alignment_weight = 0.99
else:
# Moderate differences: balanced weights
core_weight = 0.8
alignment_weight = 0.2
total_resonance = (core_weight * (1.0 - core_avg_distance) + alignment_weight * alignment_resonance)
resonance = max(0.0, min(1.0, total_resonance ** 3.0)) # Even stronger decay
return resonance
        except Exception:
return 0.5
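    # Worked example of the resonance blend above: identical numeric coordinates give
    # core_avg_distance = 0.0, so core_weight = 0.8 and alignment_weight = 0.2.
    # With both sides "true_neutral" (synergy 0.7):
    #     total_resonance = 0.8 * 1.0 + 0.2 * 0.7 = 0.94
    #     resonance = 0.94 ** 3 ≈ 0.83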
def _retrieve_temporal_sequence(self, query: RetrievalQuery) -> List[RetrievalResult]:
"""Retrieve content based on temporal sequence."""
results = []
if not query.temporal_range:
# Default to last 24 hours
end_time = query.query_timestamp
start_time = end_time - (24 * 3600)
temporal_range = (start_time, end_time)
else:
temporal_range = query.temporal_range
# Collect items in temporal range
temporal_items = []
# Add anchors
if self.semantic_anchors:
for anchor_id, anchor in self.semantic_anchors.anchors.items():
if temporal_range[0] <= anchor.provenance.first_seen <= temporal_range[1]:
temporal_items.append(
("anchor", anchor_id, anchor.provenance.first_seen, anchor)
)
# Add micro-summaries
if self.summarization_ladder:
for micro in self.summarization_ladder.micro_summaries:
if temporal_range[0] <= micro.creation_timestamp <= temporal_range[1]:
temporal_items.append(
("micro_summary", micro.summary_id, micro.creation_timestamp, micro)
)
# Sort by timestamp
temporal_items.sort(key=lambda x: x[2])
# Convert to results
for item_type, item_id, timestamp, item_data in temporal_items[: query.max_results]:
if item_type == "anchor":
anchor = item_data
result = RetrievalResult(
result_id=f"temporal_anchor_{item_id}",
content_type="anchor",
content_id=item_id,
content=anchor.concept_text,
relevance_score=self._calculate_temporal_relevance(
timestamp, query.query_timestamp
),
temporal_distance=abs(timestamp - query.query_timestamp),
anchor_connections=[item_id],
provenance_depth=1,
conflict_flags=[],
metadata={"timestamp": timestamp, "heat": anchor.heat},
)
results.append(result)
elif item_type == "micro_summary":
micro = item_data
result = RetrievalResult(
result_id=f"temporal_micro_{item_id}",
content_type="micro_summary",
content_id=item_id,
content=micro.compressed_text,
relevance_score=self._calculate_temporal_relevance(
timestamp, query.query_timestamp
),
temporal_distance=abs(timestamp - query.query_timestamp),
anchor_connections=[],
provenance_depth=2,
conflict_flags=[],
metadata={"timestamp": timestamp, "window_size": micro.window_size},
)
results.append(result)
return results
def _retrieve_anchor_neighborhood(self, query: RetrievalQuery) -> List[RetrievalResult]:
"""Retrieve content in the neighborhood of specific anchors."""
results = []
if not query.anchor_ids or not self.semantic_anchors:
return results
for anchor_id in query.anchor_ids:
if anchor_id not in self.semantic_anchors.anchors:
continue
target_anchor = self.semantic_anchors.anchors[anchor_id]
# Find semantically similar anchors
for other_id, other_anchor in self.semantic_anchors.anchors.items():
if other_id == anchor_id:
continue
if target_anchor.embedding and other_anchor.embedding:
similarity = self.embedding_provider.calculate_similarity(
target_anchor.embedding, other_anchor.embedding
)
if similarity >= query.confidence_threshold:
result = RetrievalResult(
result_id=f"neighbor_{other_id}",
content_type="anchor",
content_id=other_id,
content=other_anchor.concept_text,
relevance_score=similarity,
temporal_distance=abs(
target_anchor.provenance.first_seen
- other_anchor.provenance.first_seen
),
anchor_connections=[anchor_id, other_id],
provenance_depth=1,
conflict_flags=[],
metadata={"neighbor_of": anchor_id, "similarity": similarity},
)
results.append(result)
return results
def _retrieve_provenance_chain(self, query: RetrievalQuery) -> List[RetrievalResult]:
"""Retrieve content following provenance relationships."""
results = []
# This would trace through the provenance chain of anchors, micro-summaries, etc.
# For now, implement a simplified version
if query.anchor_ids and self.semantic_anchors:
for anchor_id in query.anchor_ids:
if anchor_id in self.semantic_anchors.anchors:
anchor = self.semantic_anchors.anchors[anchor_id]
# Include the anchor itself
result = RetrievalResult(
result_id=f"prov_root_{anchor_id}",
content_type="anchor",
content_id=anchor_id,
content=anchor.concept_text,
relevance_score=1.0,
temporal_distance=0,
anchor_connections=[anchor_id],
provenance_depth=0,
conflict_flags=[],
metadata={
"provenance_role": "root",
"updates": anchor.provenance.update_count,
},
)
results.append(result)
# Add related content from update history
for i, update in enumerate(anchor.provenance.update_history):
if i >= query.max_results - 1:
break
result = RetrievalResult(
result_id=f"prov_update_{anchor_id}_{i}",
content_type="provenance_update",
content_id=f"{anchor_id}_update_{i}",
content=(
f"Update: {update.get('context', {}).get('mist_id', 'unknown')}"
),
relevance_score=0.8 - (i * 0.1),
temporal_distance=abs(update["timestamp"] - query.query_timestamp),
anchor_connections=[anchor_id],
provenance_depth=i + 1,
conflict_flags=[],
metadata={"update_context": update.get("context", {})},
)
results.append(result)
return results
def _retrieve_conflict_aware(self, query: RetrievalQuery) -> List[RetrievalResult]:
"""Retrieve content while avoiding conflicts."""
# First get base results
base_results = self._retrieve_semantic_similarity(query)
if not query.exclude_conflicts or not self.conflict_detector:
return base_results
# Filter out conflicting content
filtered_results = []
for result in base_results:
conflicts = []
# Check for conflicts involving this content
if hasattr(self.conflict_detector, "get_conflict_analysis"):
conflict_analysis = self.conflict_detector.get_conflict_analysis(result.content_id)
if conflict_analysis.get("conflicts_found", 0) > 0:
conflicts = [
f"conflict_confidence_{conflict_analysis.get('max_confidence', 0):.2f}"
]
# Include result but flag conflicts
result.conflict_flags = conflicts
if not conflicts or not query.exclude_conflicts:
filtered_results.append(result)
return filtered_results
def _retrieve_composite(self, query: RetrievalQuery) -> List[RetrievalResult]:
"""Retrieve using multiple modes and combine results."""
all_results = []
# Semantic similarity results (highest weight)
semantic_results = self._retrieve_semantic_similarity(query)
for result in semantic_results:
result.relevance_score *= 1.0 # Full weight
all_results.extend(semantic_results)
# Temporal sequence results (medium weight)
temporal_results = self._retrieve_temporal_sequence(query)
for result in temporal_results:
result.relevance_score *= 0.7 # Reduced weight
all_results.extend(temporal_results)
# Anchor neighborhood results (lower weight)
if query.anchor_ids:
neighborhood_results = self._retrieve_anchor_neighborhood(query)
for result in neighborhood_results:
result.relevance_score *= 0.5 # Lower weight
all_results.extend(neighborhood_results)
# Remove duplicates (by content_id)
seen_content_ids = set()
unique_results = []
for result in all_results:
if result.content_id not in seen_content_ids:
unique_results.append(result)
seen_content_ids.add(result.content_id)
return unique_results
def _filter_and_rank_results(
self, results: List[RetrievalResult], query: RetrievalQuery
) -> List[RetrievalResult]:
"""Filter and rank results based on query parameters."""
# Apply FractalStat hybrid scoring if enabled
if query.fractalstat_hybrid:
results = self._apply_hybrid_scoring(results, query)
self.metrics["hybrid_queries"] += 1
# Filter by confidence threshold
filtered = [r for r in results if r.relevance_score >= query.confidence_threshold]
# Apply temporal decay
for result in filtered:
age_hours = result.temporal_distance / 3600
decay_factor = max(0.1, 1.0 - (age_hours / self.temporal_decay_hours))
result.relevance_score *= decay_factor
# Sort by relevance score
filtered.sort(key=lambda x: x.relevance_score, reverse=True)
# Limit results
return filtered[: query.max_results]
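    # Worked example of the temporal decay above: a result 12 hours old with the
    # default temporal_decay_hours of 24 keeps
    #     decay_factor = max(0.1, 1.0 - 12 / 24) = 0.5
    # so its relevance_score is halved before ranking.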
def _assemble_context(
self, query: RetrievalQuery, results: List[RetrievalResult]
) -> ContextAssembly:
"""Assemble final context from filtered results."""
if not results:
# Empty assembly
return ContextAssembly(
assembly_id=f"empty_{query.query_id}",
query=query,
results=[],
total_relevance=0.0,
temporal_span_hours=0.0,
anchor_coverage=[],
assembly_quality=0.0,
conflict_summary={},
retrieval_timestamp=time.time(),
)
# Calculate metrics
total_relevance = sum(r.relevance_score for r in results)
# Temporal span
timestamps = [r.temporal_distance for r in results]
temporal_span_hours = (
(max(timestamps) - min(timestamps)) / 3600 if len(timestamps) > 1 else 0
)
# Anchor coverage
anchor_coverage = []
for result in results:
anchor_coverage.extend(result.anchor_connections)
anchor_coverage = list(set(anchor_coverage))
# Assembly quality score
assembly_quality = self._calculate_assembly_quality(results, query)
# Conflict summary
conflict_summary = {}
for result in results:
for flag in result.conflict_flags:
conflict_summary[flag] = conflict_summary.get(flag, 0) + 1
return ContextAssembly(
assembly_id=f"assembly_{query.query_id}_{int(time.time())}",
query=query,
results=results,
total_relevance=total_relevance,
temporal_span_hours=temporal_span_hours,
anchor_coverage=anchor_coverage,
assembly_quality=assembly_quality,
conflict_summary=conflict_summary,
retrieval_timestamp=time.time(),
)
def _calculate_temporal_distance(self, timestamp: float, reference_time: float) -> float:
"""Calculate temporal distance between two timestamps."""
return abs(timestamp - reference_time)
def _calculate_temporal_relevance(self, timestamp: float, reference_time: float) -> float:
"""Calculate relevance based on temporal proximity."""
distance_seconds = abs(timestamp - reference_time)
distance_hours = distance_seconds / 3600
# Exponential decay over 24 hours
return max(0.1, 1.0 - (distance_hours / 24.0))
def _calculate_assembly_quality(
self, results: List[RetrievalResult], query: RetrievalQuery
) -> float:
"""Calculate overall quality score for assembled context."""
if not results:
return 0.0
# Average relevance score
avg_relevance = sum(r.relevance_score for r in results) / len(results)
# Coverage score (how well we covered the query)
coverage_score = min(len(results) / query.max_results, 1.0)
# Conflict penalty
total_conflicts = sum(len(r.conflict_flags) for r in results)
conflict_penalty = max(0, 1.0 - (total_conflicts * 0.1))
# Diversity score (different content types)
content_types = set(r.content_type for r in results)
diversity_score = min(len(content_types) / 3.0, 1.0) # Max 3 types
# Weighted average
quality = (
avg_relevance * 0.4
+ coverage_score * 0.2
+ conflict_penalty * 0.2
+ diversity_score * 0.2
)
return quality
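    # Worked example of the quality blend above (values assumed for illustration):
    # avg_relevance 0.8, 5 of 10 requested results (coverage 0.5), no conflict flags
    # (penalty 1.0), and 2 content types (diversity 2/3):
    #     quality = 0.8 * 0.4 + 0.5 * 0.2 + 1.0 * 0.2 + (2 / 3) * 0.2 ≈ 0.75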
def _generate_cache_key(self, query: RetrievalQuery) -> str:
"""Generate cache key for query."""
key_parts = [
query.query_id,
query.mode.value,
str(query.anchor_ids) if query.anchor_ids else "none",
query.semantic_query or "none",
str(query.temporal_range) if query.temporal_range else "none",
str(query.max_results),
str(query.confidence_threshold),
]
key_string = "|".join(key_parts)
return hashlib.md5(key_string.encode()).hexdigest()
def _get_cached_result(self, cache_key: str) -> Optional[ContextAssembly]:
"""Get cached result if still valid."""
if cache_key in self.query_cache:
assembly = self.query_cache[cache_key]
age_seconds = time.time() - assembly.retrieval_timestamp
if age_seconds < self.cache_ttl_seconds:
return assembly
else:
# Remove stale cache entry
del self.query_cache[cache_key]
return None
def _cache_result(self, cache_key: str, assembly: ContextAssembly):
"""Cache retrieval result."""
self.query_cache[cache_key] = assembly
# Cleanup old cache entries
current_time = time.time()
stale_keys = [
key
for key, cached_assembly in self.query_cache.items()
if current_time - cached_assembly.retrieval_timestamp > self.cache_ttl_seconds
]
for key in stale_keys:
del self.query_cache[key]
def _update_metrics(self, assembly: ContextAssembly, elapsed_ms: float):
"""Update performance metrics."""
self.metrics["average_results_per_query"] = (
self.metrics["average_results_per_query"] * (self.metrics["total_queries"] - 1)
+ len(assembly.results)
) / self.metrics["total_queries"]
self.metrics["average_retrieval_time_ms"] = (
self.metrics["average_retrieval_time_ms"] * (self.metrics["total_queries"] - 1)
+ elapsed_ms
) / self.metrics["total_queries"]
# Quality distribution
if assembly.assembly_quality >= 0.8:
self.metrics["quality_distribution"]["high"] += 1
elif assembly.assembly_quality >= 0.6:
self.metrics["quality_distribution"]["medium"] += 1
else:
self.metrics["quality_distribution"]["low"] += 1
def _calculate_cache_hit_rate(self) -> float:
"""Calculate cache hit rate."""
total_requests = self.metrics["cache_hits"] + self.metrics["cache_misses"]
if total_requests == 0:
return 0.0
return self.metrics["cache_hits"] / total_requests
def _calculate_cache_efficiency(self) -> float:
"""Calculate cache efficiency score."""
hit_rate = self._calculate_cache_hit_rate()
cache_size_penalty = min(len(self.query_cache) / 100.0, 0.2) # Penalty for large cache
return max(0, hit_rate - cache_size_penalty)
def _check_component_availability(self) -> Dict[str, bool]:
"""Check availability of dependent components."""
return {
"semantic_anchors": self.semantic_anchors is not None,
"summarization_ladder": self.summarization_ladder is not None,
"conflict_detector": self.conflict_detector is not None,
"embedding_provider": self.embedding_provider is not None,
"fractalstat_bridge": self.fractalstat_bridge is not None,
}
def _calculate_average_quality(self) -> float:
"""Calculate average assembly quality."""
total_quality = sum(self.metrics["quality_distribution"].values())
if total_quality == 0:
return 0.0
weighted_quality = (
self.metrics["quality_distribution"]["high"] * 1.0
+ self.metrics["quality_distribution"]["medium"] * 0.7
+ self.metrics["quality_distribution"]["low"] * 0.3
)
return weighted_quality / total_quality
def _calculate_success_rate(self) -> float:
"""Calculate retrieval success rate."""
successful_retrievals = (
self.metrics["quality_distribution"]["high"]
+ self.metrics["quality_distribution"]["medium"]
)
total_retrievals = sum(self.metrics["quality_distribution"].values())
if total_retrievals == 0:
return 1.0
return successful_retrievals / total_retrievals
# ========================================================================
# FractalStat Hybrid Scoring Support (Phase 2)
# ========================================================================
def _auto_assign_fractalstat_address(
self, content_id: str, metadata: Dict[str, Any]
) -> Dict[str, Any]:
"""
Auto-assign FractalStat address to content based on metadata.
Supports fallback to default if metadata incomplete.
Args:
content_id: ID of the content being assigned
metadata: Document metadata with optional realm, lineage, etc.
Returns:
FractalStat address dictionary with all 8 dimensions
"""
# Check cache first
if content_id in self.document_fractalstat_cache:
return self.document_fractalstat_cache[content_id]
# Extract from metadata or use defaults
realm_type = metadata.get("realm_type", "data")
realm_label = metadata.get("realm_label", "content")
lineage = metadata.get("lineage", 0)
# Compute adjacency from connectivity hints
adjacency = min(1.0, metadata.get("connection_count", 0) / 10.0)
# Horizon from lifecycle stage
lifecycle = metadata.get("lifecycle_stage", "scene")
horizon_map = {
"genesis": "logline",
"emergence": "outline",
"peak": "scene",
"decay": "panel",
}
horizon = horizon_map.get(lifecycle, "scene")
        # Luminosity from activity/heat, scaled to the 0-100 range used elsewhere
        luminosity = min(100.0, max(0.0, metadata.get("activity_level", 0.5) * 100.0))
        # Polarity from resonance factor, mapped from [0, 1] to the [-1, 1] range used elsewhere
        polarity = max(-1.0, min(1.0, metadata.get("resonance_factor", 0.5) * 2.0 - 1.0))
# Dimensionality from thread count
thread_count = metadata.get("thread_count", 3)
        dimensionality = min(8, max(1, thread_count))  # Clamp to the 1-8 range
# Alignment from social/coordination hints
alignment_type = metadata.get("alignment_type", "true_neutral")
fractalstat_address = {
"realm": {"type": realm_type, "label": realm_label},
"lineage": lineage,
"adjacency": round(adjacency, 2),
"horizon": horizon,
"luminosity": round(luminosity, 2),
"polarity": round(polarity, 2),
"dimensionality": dimensionality,
"alignment": {"type": alignment_type},
}
# Cache the assignment
self.document_fractalstat_cache[content_id] = fractalstat_address
return fractalstat_address
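    # Illustrative sketch: metadata with lifecycle and connectivity hints (field values
    # are assumptions) maps to an address as follows:
    #
    #     _auto_assign_fractalstat_address("doc_42", {
    #         "realm_type": "lore",
    #         "connection_count": 5,           # adjacency -> min(1.0, 5 / 10) = 0.5
    #         "lifecycle_stage": "emergence",  # horizon -> "outline"
    #         "thread_count": 4,               # dimensionality -> 4
    #     })
    #     # Unspecified fields fall back to defaults (lineage 0, alignment "true_neutral", ...).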
def _apply_hybrid_scoring(
self, results: List[RetrievalResult], query: RetrievalQuery
) -> List[RetrievalResult]:
"""
Apply FractalStat hybrid scoring to retrieval results.
Combines semantic similarity with FractalStat resonance scoring.
Updates relevance_score to reflect hybrid score.
Args:
results: Initial retrieval results with semantic scores
query: Query object with FractalStat address and weights
Returns:
Results with updated hybrid relevance scores
"""
if not query.fractalstat_hybrid or not self.fractalstat_bridge:
return results # No hybrid scoring if not enabled or bridge missing
try:
            from warbler_cda.fractalstat_rag_bridge import (
                FractalStatAddress,
                Realm,
                Alignment as BridgeAlignment,
            )
except ImportError:
# Fallback if bridge not available
return results
# Convert query FractalStat dict to FractalStatAddress object
if not query.fractalstat_address:
return results
try:
q_fractalstat_dict = query.fractalstat_address
query_realm = Realm(
type=q_fractalstat_dict["realm"]["type"], label=q_fractalstat_dict["realm"]["label"]
)
# Get alignment from query address or default to true_neutral
query_alignment_type = q_fractalstat_dict.get("alignment", {}).get("type", "true_neutral")
query_alignment = BridgeAlignment(type=query_alignment_type)
query_fractalstat = FractalStatAddress(
realm=query_realm,
lineage=q_fractalstat_dict["lineage"],
adjacency=q_fractalstat_dict["adjacency"],
horizon=q_fractalstat_dict["horizon"],
luminosity=q_fractalstat_dict["luminosity"],
polarity=q_fractalstat_dict["polarity"],
dimensionality=q_fractalstat_dict["dimensionality"],
alignment=query_alignment,
)
        except Exception:
# Invalid FractalStat address, fall back to semantic
return results
# Apply hybrid scoring to each result
for result in results:
# Get or compute FractalStat address for this result's content
if "fractalstat" not in result.metadata:
# Auto-assign if not already present
result.metadata["fractalstat"] = self._auto_assign_fractalstat_address(
result.content_id, result.metadata
)
# Extract document FractalStat address
doc_fractalstat_dict = result.metadata.get("fractalstat", {})
if not doc_fractalstat_dict:
continue
try:
doc_realm = Realm(
type=doc_fractalstat_dict["realm"]["type"], label=doc_fractalstat_dict["realm"]["label"]
)
# Get alignment from document address or default to true_neutral
doc_alignment_type = doc_fractalstat_dict.get("alignment", {}).get("type", "true_neutral")
doc_alignment = BridgeAlignment(type=doc_alignment_type)
doc_fractalstat = FractalStatAddress(
realm=doc_realm,
lineage=doc_fractalstat_dict["lineage"],
adjacency=doc_fractalstat_dict["adjacency"],
horizon=doc_fractalstat_dict["horizon"],
luminosity=doc_fractalstat_dict["luminosity"],
polarity=doc_fractalstat_dict["polarity"],
dimensionality=doc_fractalstat_dict["dimensionality"],
alignment=doc_alignment,
)
            except Exception:
# Skip if document FractalStat invalid
continue
# Compute FractalStat resonance score
fractalstat_res = self.fractalstat_bridge.fractalstat_resonance(query_fractalstat, doc_fractalstat)
result.fractalstat_resonance = fractalstat_res
# Compute semantic similarity (if available)
semantic_sim = result.relevance_score # Current score is semantic
result.semantic_similarity = semantic_sim
# Combine into hybrid score
hybrid = (query.weight_semantic * semantic_sim) + (query.weight_fractalstat * fractalstat_res)
result.relevance_score = max(0.0, min(hybrid, 1.0))
return results
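    # Worked example of the hybrid blend above (scores assumed for illustration):
    # with weight_semantic 0.6, weight_fractalstat 0.4, semantic_similarity 0.82,
    # and fractalstat_resonance 0.55:
    #     relevance_score = 0.6 * 0.82 + 0.4 * 0.55 = 0.712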
def _get_fractalstat_address_for_content(
self, content_id: str, metadata: Dict[str, Any]
) -> Optional[Dict[str, Any]]:
"""
Get or compute FractalStat address for content with caching.
Args:
content_id: ID of content
metadata: Content metadata
Returns:
FractalStat address dictionary or None
"""
if content_id in self.document_fractalstat_cache:
return self.document_fractalstat_cache[content_id]
return self._auto_assign_fractalstat_address(content_id, metadata)
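if __name__ == "__main__":
    # Minimal smoke-test sketch, not part of the library API: exercises the keyword
    # fallback path (no embedding provider, semantic anchors, or FractalStat bridge
    # wired in). Document ids and text below are illustrative assumptions.
    api = RetrievalAPI()
    api.add_document("doc_vents", "Geothermal vents seed the molten glyph caverns with heat.")
    api.add_document("doc_lore", "The lore engine distills micro summaries into macro distillations.")
    results = api.query_semantic_anchors("geothermal vents heat", max_results=3)
    for r in results:
        print(f"{r.content_id}: score={r.relevance_score:.3f} type={r.content_type}")
    print("total queries:", api.get_retrieval_metrics()["retrieval_metrics"]["total_queries"])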