For Claude: REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.
Goal: Implement semantic search-based curation of historical bookings for LLM context, replacing pg_trgm matching.
Architecture: Per-line-item semantic search with hybrid top-K selection, clustering by account/CC, and deduplication within clusters to preserve diversity while removing redundancy.
Tech Stack: Python, Flask, PostgreSQL/pgvector, Google text-multilingual-embedding-002, MiniLM (sentence-transformers)
Files:
init.sql
Step 1: Update init.sql with new schema
Remove normalized columns, Jina embedding, and pg_trgm index. Keep only raw text columns and Google/MiniLM embeddings.
-- Schema for the semantic-search store: one table of raw line items with two
-- embedding columns, plus a small registry describing each embedding model.
-- Enable pgvector extension
CREATE EXTENSION IF NOT EXISTS vector;
-- Line item table with raw text (no normalization)
-- NOTE(review): unlike embedding_model_config below, this statement has no
-- IF NOT EXISTS, so running the script against a database that already has a
-- line_item table fails here.
CREATE TABLE line_item (
id BIGSERIAL PRIMARY KEY,
-- Original values (used directly for embeddings)
supplier_name TEXT NOT NULL,
description TEXT NOT NULL,
debit_account TEXT NOT NULL,
credit_account TEXT,
cost_center TEXT,
net_amount NUMERIC,
-- Embedding columns (Google and MiniLM only)
-- Dimensions match the rows inserted into embedding_model_config below.
embedding_google vector(768),
embedding_minilm vector(384),
-- Metadata
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
is_test_set BOOLEAN DEFAULT FALSE
);
-- Embedding model configuration tracking
-- One row per model; column_name names the line_item column holding its vectors.
CREATE TABLE IF NOT EXISTS embedding_model_config (
id SERIAL PRIMARY KEY,
model_name TEXT NOT NULL UNIQUE,
column_name TEXT NOT NULL,
dimensions INT NOT NULL,
distance_metric TEXT NOT NULL,
task_type TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
-- Insert model configurations
-- ON CONFLICT on the unique model_name makes this insert safe to repeat.
INSERT INTO embedding_model_config (model_name, column_name, dimensions, distance_metric, task_type) VALUES
('text-multilingual-embedding-002', 'embedding_google', 768, 'cosine', 'RETRIEVAL_DOCUMENT'),
('all-MiniLM-L6-v2', 'embedding_minilm', 384, 'cosine', NULL)
ON CONFLICT (model_name) DO NOTHING;
-- HNSW indexes for fast similarity search (create after data load)
-- Left commented out on purpose: run them manually once the embeddings exist.
-- CREATE INDEX idx_embedding_google_hnsw ON line_item USING hnsw (embedding_google vector_cosine_ops) WITH (m = 16, ef_construction = 64);
-- CREATE INDEX idx_embedding_minilm_hnsw ON line_item USING hnsw (embedding_minilm vector_cosine_ops) WITH (m = 16, ef_construction = 64);
Step 2: Verify the file is correct
Read through init.sql to confirm no normalized columns or Jina references remain.
Step 3: Commit
git add init.sql
git commit -m "refactor: remove normalized columns and Jina from schema"
Files:
src/normalize.py
src/embeddings/jina_embed.py
src/search/llm_matching.py
Step 1: Remove the files
rm src/normalize.py
rm src/embeddings/jina_embed.py
rm src/search/llm_matching.py
Step 2: Verify files are deleted
ls src/normalize.py src/embeddings/jina_embed.py src/search/llm_matching.py
Expected: "No such file or directory" for each.
Step 3: Commit
git add -u
git commit -m "refactor: remove normalize.py, jina_embed.py, llm_matching.py"
Files:
src/embeddings/text_prep.py
Step 1: Update prepare_embedding_text
The function already takes supplier_name and description as arguments. Update the docstring to clarify we use raw text now.
"""Prepare text for embedding - consistent across all models."""
def prepare_embedding_text(supplier_name: str, description: str) -> str:
    """
    Build the single string that is handed to the embedding models.

    The two raw (un-normalized) fields are joined with a " | " separator;
    no other preprocessing is applied.

    Args:
        supplier_name: Raw supplier name
        description: Raw line item description

    Returns:
        "<supplier_name> | <description>"
    """
    separator = " | "
    return f"{supplier_name}{separator}{description}"
Step 2: Commit
git add src/embeddings/text_prep.py
git commit -m "refactor: update text_prep.py docstring for raw text"
Files:
src/import_csv.py
Step 1: Update import_historical_csv to remove normalization
Remove normalization imports and normalized column handling.
"""Import historical.csv into line_item table using COPY protocol."""
import sys
from io import StringIO
import csv
import pandas as pd
from db import get_connection
def import_historical_csv(csv_path: str) -> dict:
    """Import historical.csv into the line_item table using COPY.

    Rows missing any of supplier_name, description or debit_account (NaN or
    blank after stripping) are skipped. All surviving rows are loaded in one
    transaction: on any error the transaction is rolled back and the
    exception propagates.

    Args:
        csv_path: Path to the CSV file (UTF-8, optional BOM).

    Returns:
        dict with statistics: total_rows, skipped_rows, imported_rows

    Raises:
        KeyError: if a required column cannot be found in the CSV header.
    """
    # dtype=str keeps every cell exactly as written in the file. With type
    # inference, a code column containing gaps becomes float64, so an account
    # "4400" would be stored as "4400.0" and a leading zero ("0440") would be
    # lost - both break the later grouping by (debit_account, cost_center).
    # Empty cells are still read as NaN, so the missing-value checks below
    # behave as before.
    df = pd.read_csv(csv_path, encoding='utf-8-sig', dtype=str)
    total_rows = len(df)

    # Map CSV columns to schema (adjust if actual headers differ)
    column_mapping = {
        'Supplier Name': 'supplier_name',
        'Line Item Description': 'description',
        'Debit Account': 'debit_account',
        'Credit Account': 'credit_account',
        'Cost Center': 'cost_center',
        'Net Amount': 'net_amount',
    }

    # Fall back to case-insensitive / underscore-separated header variants.
    # Iterate over a snapshot because the mapping is edited inside the loop.
    for expected, target in list(column_mapping.items()):
        if expected in df.columns:
            continue
        alternatives = [
            c for c in df.columns
            if c.lower().replace('_', ' ') == expected.lower()
        ]
        if alternatives:
            column_mapping[alternatives[0]] = target
            del column_mapping[expected]

    df = df.rename(columns=column_mapping)

    # Skip rows with missing required fields (per user decision)
    required = ['supplier_name', 'description', 'debit_account']
    optional = ['credit_account', 'cost_center', 'net_amount']

    missing = [col for col in required if col not in df.columns]
    if missing:
        # Same exception type dropna() would raise, with an actionable message.
        raise KeyError(
            f"Required column(s) not found in CSV: {missing}. "
            f"Available columns: {list(df.columns)}"
        )

    before_filter = len(df)
    df = df.dropna(subset=required)
    for col in required:
        df = df[df[col].astype(str).str.strip() != '']
    skipped_rows = before_filter - len(df)

    # Prepare CSV buffer for COPY protocol (no normalized columns).
    # None is written as an empty unquoted field, which COPY ... WITH CSV
    # reads back as NULL.
    buffer = StringIO()
    writer = csv.writer(buffer)
    for _, row in df.iterrows():
        record = [row[col] for col in required]
        for col in optional:
            value = row.get(col)
            record.append(value if pd.notna(value) else None)
        writer.writerow(record)
    buffer.seek(0)

    # Bulk import using COPY (10-100x faster than INSERT)
    conn = get_connection(autocommit=False)
    try:
        with conn.cursor() as cur:
            with cur.copy("""
                COPY line_item (
                    supplier_name, description, debit_account,
                    credit_account, cost_center, net_amount
                ) FROM STDIN WITH CSV
            """) as copy:
                while data := buffer.read(8192):
                    copy.write(data)
        conn.commit()
    except Exception:
        conn.rollback()
        raise  # bare raise keeps the original traceback intact
    finally:
        conn.close()

    return {
        'total_rows': total_rows,
        'skipped_rows': skipped_rows,
        'imported_rows': len(df),
    }
# Command-line entry point: expects exactly one argument, the CSV path.
if __name__ == '__main__':
    if len(sys.argv) != 2:
        print("Usage: python import_csv.py <path_to_csv>")
        sys.exit(1)

    csv_path = sys.argv[1]
    print(f"Importing from: {csv_path}")

    stats = import_historical_csv(csv_path)

    # Report the three counters returned by the import.
    for message in (
        f"Total rows in CSV: {stats['total_rows']}",
        f"Skipped (missing required fields): {stats['skipped_rows']}",
        f"Imported: {stats['imported_rows']}",
    ):
        print(message)
Step 2: Commit
git add src/import_csv.py
git commit -m "refactor: remove normalization from import_csv.py"
Files:
src/embeddings/google_embed.py
src/embeddings/minilm_embed.py
Step 1: Update google_embed.py
Change SQL query to select raw columns instead of normalized.
"""Google text-multilingual-embedding-002 embeddings via Vertex AI."""
import os
from pathlib import Path
from dotenv import load_dotenv
from google import genai
from google.genai.types import EmbedContentConfig
# Load .env for GOOGLE_CLOUD_PROJECT and GOOGLE_CLOUD_LOCATION
load_dotenv(Path(__file__).parent.parent.parent / '.env')
# Configure for Vertex AI
os.environ.setdefault('GOOGLE_GENAI_USE_VERTEXAI', 'True')
# Shared Vertex AI client; stays None until _get_client() is first called.
_client = None


def _get_client():
    """Return the module-wide genai client, creating it on first use.

    Project and location are read from the GOOGLE_CLOUD_PROJECT and
    GOOGLE_CLOUD_LOCATION environment variables; the location falls back to
    'us-central1' when the variable is unset.
    """
    global _client
    if _client is not None:
        return _client

    _client = genai.Client(
        vertexai=True,
        project=os.environ.get('GOOGLE_CLOUD_PROJECT'),
        location=os.environ.get('GOOGLE_CLOUD_LOCATION', 'us-central1'),
    )
    return _client
def embed_google(texts: list[str]) -> list[list[float]]:
    """
    Embed texts using text-multilingual-embedding-002 via Vertex AI.

    Documents are embedded with task type RETRIEVAL_DOCUMENT.

    Args:
        texts: List of texts to embed (max 250 per call)

    Returns:
        List of 768-dimensional embedding vectors, one per input text.
        An empty input yields an empty list without contacting the API.
    """
    # Nothing to embed: skip client creation and the network round trip.
    if not texts:
        return []

    client = _get_client()
    response = client.models.embed_content(
        model='text-multilingual-embedding-002',
        contents=texts,
        config=EmbedContentConfig(
            task_type='RETRIEVAL_DOCUMENT',
        ),
    )
    return [embedding.values for embedding in response.embeddings]
def embed_google_batch(
    conn,
    batch_size: int = 100,  # Conservative, API max is 250
    delay: float = 0.2
) -> int:
    """
    Fill embedding_google for every line item that does not have one yet.

    Rows are selected by `embedding_google IS NULL`, so a re-run only
    processes what is still missing.

    Args:
        conn: psycopg connection
        batch_size: Texts per API call
        delay: Seconds between batches

    Returns:
        Number of items embedded (0 when nothing was pending)
    """
    from .text_prep import prepare_embedding_text
    from .batch_processor import batch_embed_with_progress, update_embeddings_batch

    # Pull the raw text columns of all rows still lacking a Google embedding.
    with conn.cursor() as cur:
        cur.execute("""
            SELECT id, supplier_name, description
            FROM line_item
            WHERE embedding_google IS NULL
            ORDER BY id
        """)
        pending = cur.fetchall()

    if not pending:
        print("All items already have Google embeddings")
        return 0

    # Column order in each row: id, supplier_name, description.
    ids = [row[0] for row in pending]
    texts = [prepare_embedding_text(row[1], row[2]) for row in pending]

    vectors = batch_embed_with_progress(
        texts,
        embed_google,
        batch_size=batch_size,
        delay_between_batches=delay,
        desc="Google embeddings"
    )

    # Persist (id, vector) pairs into the embedding_google column.
    updated = update_embeddings_batch(conn, 'embedding_google', list(zip(ids, vectors)))
    print(f"Generated {updated} Google embeddings (768 dimensions)")
    return updated
# Script entry point: embed every pending line item with the Google model.
if __name__ == "__main__":
    from src.db import get_connection

    conn = get_connection()
    try:
        embed_google_batch(conn)
    finally:
        # Close the connection even when embedding fails part-way.
        conn.close()
Step 2: Update minilm_embed.py
Same change - use raw text columns.
"""Local all-MiniLM-L6-v2 embeddings via sentence-transformers."""
from sentence_transformers import SentenceTransformer
from tqdm import tqdm
import numpy as np
# Shared MiniLM model (22MB download on first use); None until first needed.
_model = None


def _get_model():
    """Return the module-wide SentenceTransformer, loading it on first use."""
    global _model
    if _model is not None:
        return _model

    print("Loading MiniLM model (first time may download)...")
    _model = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')
    return _model
def embed_minilm(texts: list[str]) -> list[list[float]]:
    """
    Encode texts with the local MiniLM model.

    Vectors are L2-normalized by the encoder (normalize_embeddings=True) so
    they are ready for cosine similarity.

    Args:
        texts: List of texts to embed

    Returns:
        List of 384-dimensional embedding vectors
    """
    encode_options = {
        'batch_size': 128,
        'show_progress_bar': False,
        'normalize_embeddings': True,  # For cosine similarity
    }
    vectors = _get_model().encode(texts, **encode_options)
    return vectors.tolist()
def embed_minilm_batch(
    conn,
    batch_size: int = 500,  # Larger batches OK for local model
) -> int:
    """
    Fill embedding_minilm for every line item that does not have one yet.

    All pending texts are encoded in one model call; only the database
    writes are chunked by `batch_size`.

    Args:
        conn: psycopg connection
        batch_size: Items per database write batch

    Returns:
        Number of items embedded (0 when nothing was pending)
    """
    from .text_prep import prepare_embedding_text
    from .batch_processor import update_embeddings_batch

    # Pull the raw text columns of all rows still lacking a MiniLM embedding.
    with conn.cursor() as cur:
        cur.execute("""
            SELECT id, supplier_name, description
            FROM line_item
            WHERE embedding_minilm IS NULL
            ORDER BY id
        """)
        pending = cur.fetchall()

    if not pending:
        print("All items already have MiniLM embeddings")
        return 0

    # Column order in each row: id, supplier_name, description.
    ids = [row[0] for row in pending]
    texts = [prepare_embedding_text(row[1], row[2]) for row in pending]

    print(f"Generating {len(texts)} MiniLM embeddings...")
    vectors = _get_model().encode(
        texts,
        batch_size=128,
        show_progress_bar=True,
        normalize_embeddings=True,
    ).tolist()

    # Write back in slices of `batch_size` and sum the per-slice counts.
    total_updated = 0
    for start in tqdm(range(0, len(ids), batch_size), desc="Writing to DB"):
        stop = start + batch_size
        chunk = list(zip(ids[start:stop], vectors[start:stop]))
        total_updated += update_embeddings_batch(conn, 'embedding_minilm', chunk)

    print(f"Generated {total_updated} MiniLM embeddings (384 dimensions)")
    return total_updated
# Script entry point: embed every pending line item with the MiniLM model.
if __name__ == "__main__":
    from src.db import get_connection

    conn = get_connection()
    try:
        embed_minilm_batch(conn)
    finally:
        # Close the connection even when embedding fails part-way.
        conn.close()
Step 3: Commit
git add src/embeddings/google_embed.py src/embeddings/minilm_embed.py
git commit -m "refactor: use raw text columns in embedding scripts"
Files:
src/search/pgvector_search.py
Step 1: Remove Jina-related code
Remove embed_query_jina function and Jina references from search_all_models. Update ALLOWED_EMBEDDING_COLUMNS.
"""pgvector semantic search with query embedding functions for Google and MiniLM."""
import os
from pathlib import Path
from typing import Optional
import time
from dotenv import load_dotenv
from google import genai
from google.genai.types import EmbedContentConfig
from sentence_transformers import SentenceTransformer
# Load .env for API credentials
load_dotenv(Path(__file__).parent.parent.parent / '.env')
# Configure for Vertex AI
os.environ.setdefault('GOOGLE_GENAI_USE_VERTEXAI', 'True')
# Lazy-initialized clients/models
# Both stay None until the matching _get_* helper populates them on first use.
_google_client = None
_minilm_model = None
# Allowed embedding columns for validation
# search_pgvector interpolates the column name into the SQL text, so it
# rejects any name that is not listed here.
ALLOWED_EMBEDDING_COLUMNS = ['embedding_google', 'embedding_minilm']
def _get_google_client():
    """Return the shared Google Vertex AI client, creating it on first use.

    Project and location come from GOOGLE_CLOUD_PROJECT and
    GOOGLE_CLOUD_LOCATION (default 'us-central1').
    """
    global _google_client
    if _google_client is not None:
        return _google_client

    _google_client = genai.Client(
        vertexai=True,
        project=os.environ.get('GOOGLE_CLOUD_PROJECT'),
        location=os.environ.get('GOOGLE_CLOUD_LOCATION', 'us-central1'),
    )
    return _google_client
def _get_minilm_model():
    """Return the shared MiniLM model, loading it on first use."""
    global _minilm_model
    if _minilm_model is not None:
        return _minilm_model

    print("Loading MiniLM model for query embedding...")
    _minilm_model = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')
    return _minilm_model
def embed_query_google(query_text: str) -> list[float]:
    """
    Embed one search query with Google text-multilingual-embedding-002.

    CRITICAL: the request uses task type RETRIEVAL_QUERY (not
    RETRIEVAL_DOCUMENT), which is the counterpart to use when searching
    against stored document embeddings.

    Args:
        query_text: The search query text

    Returns:
        768-dimensional embedding vector
    """
    models_api = _get_google_client().models
    response = models_api.embed_content(
        model='text-multilingual-embedding-002',
        contents=[query_text],
        config=EmbedContentConfig(task_type='RETRIEVAL_QUERY'),
    )
    # Exactly one text was sent, so the first embedding is the query vector.
    query_vector = response.embeddings[0]
    return query_vector.values
def embed_query_minilm(query_text: str) -> list[float]:
    """
    Embed one search query with the local MiniLM model.

    Note: the same encode call is used for queries and documents; the vector
    is normalized so it can be compared by cosine similarity.

    Args:
        query_text: The search query text

    Returns:
        384-dimensional embedding vector (normalized)
    """
    encoder = _get_minilm_model()
    return encoder.encode(query_text, normalize_embeddings=True).tolist()
def search_pgvector(
    conn,
    query_embedding: list[float],
    embedding_column: str,
    k: int = 5,
    threshold: float = 0.0
) -> list[dict]:
    """
    Return the top-K line items closest to a query vector by cosine distance.

    Args:
        conn: psycopg database connection
        query_embedding: Query embedding vector
        embedding_column: Column to search (embedding_google or embedding_minilm)
        k: Maximum number of results to return
        threshold: Minimum similarity threshold (0.0 to 1.0)

    Returns:
        List of dicts with item fields and similarity scores, ordered by
        similarity descending

    Raises:
        ValueError: if embedding_column is not in ALLOWED_EMBEDDING_COLUMNS
    """
    # The column name is spliced into the SQL text below, so it must be one
    # of the known-safe names (prevents SQL injection).
    if embedding_column not in ALLOWED_EMBEDDING_COLUMNS:
        raise ValueError(
            f"Invalid embedding column: {embedding_column}. "
            f"Must be one of: {ALLOWED_EMBEDDING_COLUMNS}"
        )

    # Keys of each result dict, in SELECT order.
    field_names = (
        'id', 'supplier_name', 'description', 'debit_account',
        'credit_account', 'cost_center', 'net_amount', 'similarity',
    )

    # Cosine distance range is [0, 2]; similarity = 1 - distance.
    sql = f"""
        SELECT
            id,
            supplier_name,
            description,
            debit_account,
            credit_account,
            cost_center,
            net_amount,
            1 - ({embedding_column} <=> %s::vector) AS similarity
        FROM line_item
        WHERE {embedding_column} IS NOT NULL
          AND 1 - ({embedding_column} <=> %s::vector) >= %s
        ORDER BY {embedding_column} <=> %s::vector
        LIMIT %s
    """
    # The query vector is bound three times: SELECT, WHERE and ORDER BY.
    params = (query_embedding, query_embedding, threshold, query_embedding, k)

    with conn.cursor() as cur:
        # Set HNSW search parameters before running the similarity query.
        cur.execute("SET hnsw.ef_search = 40")
        cur.execute(sql, params)
        matches = [dict(zip(field_names, record)) for record in cur.fetchall()]

    return matches
def search_single_model(
    conn,
    query_text: str,
    model: str,
    k: int = 5,
    threshold: float = 0.0
) -> list[dict]:
    """
    Embed a query with one model and search that model's embedding column.

    Args:
        conn: psycopg database connection
        query_text: The search query text
        model: Model name ('google' or 'minilm')
        k: Number of results
        threshold: Minimum similarity threshold

    Returns:
        List of result dicts with similarity scores

    Raises:
        ValueError: if model is not a known model name
    """
    # model name -> (query embedding function, embedding column)
    registry = {
        'google': (embed_query_google, 'embedding_google'),
        'minilm': (embed_query_minilm, 'embedding_minilm'),
    }

    try:
        embed, column = registry[model]
    except KeyError:
        raise ValueError(
            f"Unknown model: {model}. Must be one of: {list(registry)}"
        ) from None

    return search_pgvector(conn, embed(query_text), column, k, threshold)
def search_all_models(
    conn,
    query_text: str,
    k: int = 5,
    thresholds: Optional[dict] = None
) -> dict:
    """
    Run the query through both embedding models and time each one.

    Args:
        conn: psycopg database connection
        query_text: The search query text
        k: Number of results per model
        thresholds: Dict with model-specific thresholds, e.g.,
            {'google': 0.6, 'minilm': 0.7}; a missing model defaults to 0.0

    Returns:
        Dict with keys 'google', 'minilm' (each containing list of result dicts)
        and '_timings' with per-model latency in milliseconds
    """
    if thresholds is None:
        thresholds = {'google': 0.0, 'minilm': 0.0}

    # Processed in this order: Google first, then MiniLM.
    pipeline = (
        ('google', embed_query_google, 'embedding_google'),
        ('minilm', embed_query_minilm, 'embedding_minilm'),
    )

    results = {}
    timings = {}
    for name, embed, column in pipeline:
        # The timer covers both the embedding call and the database search.
        started = time.perf_counter()
        vector = embed(query_text)
        results[name] = search_pgvector(
            conn, vector, column, k, thresholds.get(name, 0.0)
        )
        timings[name] = (time.perf_counter() - started) * 1000

    results['_timings'] = timings
    return results
Step 2: Commit
git add src/search/pgvector_search.py
git commit -m "refactor: remove Jina from pgvector_search.py, add threshold support"
Files:
src/search/__init__.py
Step 1: Remove Jina and llm_matching exports
"""Search module exports."""
from .pgvector_search import (
embed_query_google,
embed_query_minilm,
search_pgvector,
search_single_model,
search_all_models,
)
__all__ = [
'embed_query_google',
'embed_query_minilm',
'search_pgvector',
'search_single_model',
'search_all_models',
]
Step 2: Commit
git add src/search/__init__.py
git commit -m "refactor: remove Jina and llm_matching exports from search module"
Files:
src/search/curation.py
Step 1: Create the curation module
This module implements the clustering and deduplication logic.
"""Curation module for selecting diverse historical bookings for LLM context."""
from collections import defaultdict
from typing import Optional
from .pgvector_search import (
embed_query_google,
embed_query_minilm,
search_pgvector,
)
# Default configuration
# DEFAULT_K: default maximum number of search hits per line item.
DEFAULT_K = 10
# Default minimum cosine similarity per model, used when the caller passes
# no explicit threshold.
DEFAULT_THRESHOLDS = {
'google': 0.6,
'minilm': 0.7,
}
# Two descriptions in the same cluster whose embeddings are more similar than
# this are treated as duplicates of each other.
DEDUP_SIMILARITY_THRESHOLD = 0.9
def curate_bookings_for_invoice(
    conn,
    supplier_name: str,
    line_items: list[str],
    model: str = 'minilm',
    k: int = DEFAULT_K,
    threshold: Optional[float] = None,
) -> dict:
    """
    Curate historical bookings for an invoice with N line items.

    Algorithm:
        1. For each line item, semantic search top-K with threshold
        2. Merge all results into one pool (one entry per booking id,
           keeping its best-scoring hit)
        3. Cluster by (debit_account, cost_center)
        4. Deduplicate within clusters (similarity > 0.9 = duplicate)
        5. Return diverse representatives per cluster

    Args:
        conn: psycopg database connection
        supplier_name: Invoice supplier name
        line_items: List of line item descriptions
        model: 'google' or 'minilm'
        k: Max results per line item search
        threshold: Similarity threshold (uses model default if None)

    Returns:
        Dict with:
        - 'clusters': List of cluster dicts, sorted by total_matches
          descending, each with:
            - 'debit_account': str
            - 'cost_center': str or None
            - 'total_matches': int (original count before dedup)
            - 'items': List of diverse representative items
        - 'metadata': Dict with search stats

    Raises:
        ValueError: if model is neither 'google' nor 'minilm'
    """
    # Reject unknown models explicitly, as search_single_model does, instead
    # of silently running the MiniLM pipeline for a mistyped name.
    if model not in ('google', 'minilm'):
        raise ValueError(
            f"Unknown model: {model}. Must be one of: ['google', 'minilm']"
        )

    if threshold is None:
        threshold = DEFAULT_THRESHOLDS.get(model, 0.6)

    # Select embedding function and column based on model
    if model == 'google':
        embed_func = embed_query_google
        embedding_column = 'embedding_google'
    else:
        embed_func = embed_query_minilm
        embedding_column = 'embedding_minilm'

    # Step 1: Per-line-item semantic search
    all_results = []
    for line_item in line_items:
        # Same "<supplier> | <description>" layout the documents were embedded with.
        query = f"{supplier_name} | {line_item}"
        query_embedding = embed_func(query)
        results = search_pgvector(conn, query_embedding, embedding_column, k, threshold)
        # Tag each result with source line item
        for r in results:
            r['_source_line_item'] = line_item
        all_results.extend(results)

    # Step 2: Merge by ID. The same booking can match several line items with
    # different scores; keep the best-scoring hit so the later "keep the most
    # similar" dedup decision is based on the booking's strongest match.
    # Dict insertion order preserves first-seen order of the ids.
    best_by_id = {}
    for r in all_results:
        current = best_by_id.get(r['id'])
        if current is None or r['similarity'] > current['similarity']:
            best_by_id[r['id']] = r
    merged = list(best_by_id.values())

    # Step 3: Cluster by (debit_account, cost_center)
    clusters = defaultdict(list)
    for r in merged:
        clusters[(r['debit_account'], r.get('cost_center'))].append(r)

    # Step 4: Deduplicate within clusters
    output_clusters = [
        {
            'debit_account': debit_account,
            'cost_center': cost_center,
            'total_matches': len(items),
            'items': _deduplicate_cluster(conn, items, model),
        }
        for (debit_account, cost_center), items in clusters.items()
    ]

    # Sort clusters by total_matches descending (most frequent patterns first)
    output_clusters.sort(key=lambda c: c['total_matches'], reverse=True)

    return {
        'clusters': output_clusters,
        'metadata': {
            'model': model,
            'k': k,
            'threshold': threshold,
            'line_items_count': len(line_items),
            'total_raw_matches': len(all_results),
            'unique_matches': len(merged),
            'clusters_count': len(output_clusters),
        },
    }
def _deduplicate_cluster(conn, items: list[dict], model: str) -> list[dict]:
"""
Remove near-duplicate items within a cluster.
Items with description similarity > DEDUP_SIMILARITY_THRESHOLD are considered
duplicates. Keep the one with highest original search similarity.
Args:
conn: psycopg database connection
items: List of result dicts from same cluster
model: 'google' or 'minilm' (for embedding descriptions)
Returns:
Deduplicated list of items
"""
if len(items) <= 1:
return items
# Get embeddings for all descriptions
if model == 'google':
embed_func = embed_query_google
else:
embed_func = embed_query_minilm
descriptions = [item['description'] for item in items]
# Embed all descriptions
embeddings = []
for desc in descriptions:
embeddings.append(embed_func(desc))
# Compute pairwise similarities and mark duplicates
n = len(items)
is_duplicate = [False] * n
for i in range(n):
if is_duplicate[i]:
continue
for j in range(i + 1, n):
if is_duplicate[j]:
continue
sim = _cosine_similarity(embeddings[i], embeddings[j])
if sim > DEDUP_SIMILARITY_THRESHOLD:
# Keep the one with higher original similarity
if items[i]['similarity'] >= items[j]['similarity']:
is_duplicate[j] = True
else:
is_duplicate[i] = True
break # i is now duplicate, no need to check more
# Return non-duplicates
return [item for item, is_dup in zip(items, is_duplicate) if not is_dup]
def _cosine_similarity(vec1: list[float], vec2: list[float]) -> float:
"""Compute cosine similarity between two vectors."""
dot = sum(a * b for a, b in zip(vec1, vec2))
norm1 = sum(a * a for a in vec1) ** 0.5
norm2 = sum(b * b for b in vec2) ** 0.5
if norm1 == 0 or norm2 == 0:
return 0.0
return dot / (norm1 * norm2)
def curated_bookings_to_csv(curated_result: dict) -> str:
    """
    Convert curated bookings to CSV format for LLM context.

    Every field goes through the csv module, so commas, quotes and line
    breaks in any column (not only supplier and description) are quoted
    correctly. Missing values (None) are written as empty fields rather than
    the text "None".

    Args:
        curated_result: Output from curate_bookings_for_invoice

    Returns:
        CSV string with header and data rows, lines separated by "\\n" and no
        trailing newline
    """
    import csv
    import io

    buffer = io.StringIO()
    writer = csv.writer(buffer, lineterminator='\n')
    writer.writerow([
        'supplier_name', 'description', 'net_amount',
        'debit_account', 'cost_center', 'cluster_count',
    ])

    for cluster in curated_result['clusters']:
        cluster_count = cluster['total_matches']
        for item in cluster['items']:
            amount = item.get('net_amount')
            writer.writerow([
                item.get('supplier_name') or '',
                item.get('description') or '',
                # Explicit None check: a legitimate amount of 0 must survive.
                '' if amount is None else amount,
                item.get('debit_account', ''),
                item.get('cost_center') or '',
                cluster_count,
            ])

    # The writer terminates every row; drop the final terminator to keep the
    # original "joined with newlines" shape.
    return buffer.getvalue().rstrip('\n')
def _csv_escape(value: str) -> str:
"""Escape a value for CSV output."""
if not value:
return ''
value = str(value)
if ',' in value or '"' in value or '\n' in value:
return '"' + value.replace('"', '""') + '"'
return value
Step 2: Commit
git add src/search/curation.py
git commit -m "feat: add curation module for clustering and deduplication"
Files:
src/search/__init__.py
Step 1: Add curation exports
"""Search module exports."""
from .pgvector_search import (
embed_query_google,
embed_query_minilm,
search_pgvector,
search_single_model,
search_all_models,
)
from .curation import (
curate_bookings_for_invoice,
curated_bookings_to_csv,
DEFAULT_K,
DEFAULT_THRESHOLDS,
)
__all__ = [
'embed_query_google',
'embed_query_minilm',
'search_pgvector',
'search_single_model',
'search_all_models',
'curate_bookings_for_invoice',
'curated_bookings_to_csv',
'DEFAULT_K',
'DEFAULT_THRESHOLDS',
]
Step 2: Commit
git add src/search/__init__.py
git commit -m "feat: export curation functions from search module"
Files:
src/app.py
Step 1: Replace the search endpoint to use curation
This is a significant rewrite. The new / route accepts a query (supplier | description), runs curation with both models, and displays side-by-side curated CSV results.
"""Flask web application for semantic search curation."""
import csv
import html
import io
import os
import tempfile
import time
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import asdict
from datetime import datetime
from flask import Flask, render_template, request, send_file, Response
from src.db import get_connection
from src.search import search_all_models
from src.search.curation import (
curate_bookings_for_invoice,
curated_bookings_to_csv,
DEFAULT_K,
DEFAULT_THRESHOLDS,
)
from src.evaluation.benchmark import run_benchmark, fetch_test_queries, run_benchmark_with_raw_results
from src.reporting.report_generator import generate_report
app = Flask(__name__, template_folder='templates')
@app.route('/', methods=['GET', 'POST'])
def search():
    """Handle the search form and display curated results from both models.

    GET renders the empty form with default parameters. POST parses the form,
    runs curation once per model and renders the results side by side.
    Invalid numeric input and any failure during curation (including opening
    the database connection) are shown as an error message on the page
    instead of an unhandled server error.
    """
    results = None
    query = ''
    k = DEFAULT_K
    threshold_google = DEFAULT_THRESHOLDS['google']
    threshold_minilm = DEFAULT_THRESHOLDS['minilm']
    error = None

    if request.method == 'POST':
        query = request.form.get('query', '').strip()

        # Form values are untrusted text; a non-numeric value must not crash
        # the request.
        try:
            k = int(request.form.get('k', DEFAULT_K))
            threshold_google = float(request.form.get('threshold_google', DEFAULT_THRESHOLDS['google']))
            threshold_minilm = float(request.form.get('threshold_minilm', DEFAULT_THRESHOLDS['minilm']))
        except ValueError:
            error = 'Invalid parameters: k must be an integer and thresholds must be numbers.'

        if query and error is None:
            conn = None
            try:
                conn = get_connection()

                # Parse query: "supplier | description". Without a separator
                # the whole query serves as both supplier and line item.
                parts = query.split('|', 1)
                if len(parts) == 2:
                    supplier_name = parts[0].strip()
                    line_items = [parts[1].strip()]
                else:
                    supplier_name = query.strip()
                    line_items = [query.strip()]

                results = {}
                # Run curation for both models
                for model, threshold in [('google', threshold_google), ('minilm', threshold_minilm)]:
                    start = time.perf_counter()
                    curated = curate_bookings_for_invoice(
                        conn,
                        supplier_name,
                        line_items,
                        model=model,
                        k=k,
                        threshold=threshold,
                    )
                    elapsed_ms = (time.perf_counter() - start) * 1000
                    curated['csv'] = curated_bookings_to_csv(curated)
                    curated['timing_ms'] = elapsed_ms
                    results[model] = curated
            except Exception as e:
                error = str(e)
            finally:
                if conn is not None:
                    conn.close()

    return render_template(
        'curation.html',
        query=query,
        k=k,
        threshold_google=threshold_google,
        threshold_minilm=threshold_minilm,
        results=results,
        error=error,
    )
@app.route('/benchmark', methods=['GET', 'POST'])
def benchmark():
    """Handle the benchmark form and display results.

    Every request first counts the available test queries. A POST
    additionally runs the benchmark with the submitted `limit` and `k`.
    Database connections are always closed, and invalid form values are
    reported on the page like any other error.
    """
    results = None
    error = None
    limit = 20
    k = 5
    total_queries = 0

    # Get total available queries count
    try:
        conn = get_connection()
        try:
            total_queries = len(fetch_test_queries(conn))
        finally:
            conn.close()
    except Exception as e:
        error = f"Failed to fetch test queries: {e}"

    if request.method == 'POST':
        try:
            # Parsed inside the try so non-numeric input becomes an error
            # message instead of an unhandled exception.
            limit = int(request.form.get('limit', 20))
            k = int(request.form.get('k', 5))

            conn = get_connection()
            try:
                benchmark_results = run_benchmark(conn, k=k, limit=limit)
            finally:
                conn.close()

            # Convert dataclass results to dicts for template
            results = {
                model: asdict(data)
                for model, data in benchmark_results.items()
            }
        except Exception as e:
            error = str(e)

    return render_template(
        'benchmark.html',
        results=results,
        error=error,
        limit=limit,
        k=k,
        total_queries=total_queries
    )
@app.route('/benchmark/quick')
def benchmark_quick():
    """Run a quick benchmark with limit=20 and k=5 and render the results.

    On any failure the page is rendered with the error message, no results
    and a query count of 0. The database connection is closed in all cases.
    """
    error = None
    results = None
    k = 5
    total_queries = 0

    try:
        conn = get_connection()
        try:
            benchmark_results = run_benchmark(conn, k=k, limit=20)
            total_queries = len(fetch_test_queries(conn))
        finally:
            # Previously skipped when the benchmark raised, leaking the connection.
            conn.close()

        # Convert dataclass results to dicts for template
        results = {
            model: asdict(data)
            for model, data in benchmark_results.items()
        }
    except Exception as e:
        error = str(e)
        total_queries = 0

    return render_template(
        'benchmark.html',
        results=results,
        error=error,
        limit=20,
        k=k,
        total_queries=total_queries
    )
@app.route('/report/export', methods=['POST'])
def export_report():
    """Generate a static HTML benchmark report and return it as a download.

    Form parameters: `limit` (default 100) and `k` (default 5).

    Returns:
        The report as an attachment; a 400 response for invalid parameters or
        when the benchmark produced no results; a 500 response with the error
        text for any other failure.
    """
    # Get parameters from form
    try:
        limit = int(request.form.get('limit', 100))
        k = int(request.form.get('k', 5))
    except ValueError:
        return "Invalid parameters: limit and k must be integers", 400

    try:
        conn = get_connection()
        try:
            # Run benchmark with raw results for report
            benchmark_results, raw_results = run_benchmark_with_raw_results(
                conn, k=k, limit=limit
            )
        finally:
            conn.close()

        if not benchmark_results:
            return "No benchmark results available", 400

        # Convert BenchmarkResults to dicts for template
        results_dict = {
            model: asdict(data)
            for model, data in benchmark_results.items()
        }

        # NOTE(review): datetime.utcnow() is deprecated since Python 3.12;
        # kept here because the replacement needs a file-level `timezone` import.
        timestamp = datetime.utcnow().strftime('%Y%m%d_%H%M%S')
        filename = f'semantic_search_report_{timestamp}.html'

        # generate_report writes to a path, so render into a temp file, load
        # it into memory and delete the file right away. Previously the temp
        # file was never removed and accumulated on disk with every export.
        fd, temp_path = tempfile.mkstemp(suffix='.html')
        os.close(fd)
        try:
            generate_report(results_dict, raw_results, temp_path)
            with open(temp_path, 'rb') as report_file:
                payload = io.BytesIO(report_file.read())
        finally:
            os.remove(temp_path)

        # Return file as download
        return send_file(
            payload,
            mimetype='text/html',
            as_attachment=True,
            download_name=filename
        )
    except Exception as e:
        return f"Error generating report: {e}", 500
@app.route('/batch-report', methods=['GET', 'POST'])
def batch_report():
    """Handle the batch report form and stream an HTML curation report.

    GET renders the upload form. POST expects a UTF-8 CSV upload
    (`csv_file`) with the headers `supplier_name` and
    `line_item_description`, groups the line items by supplier and streams
    the curation results. Validation problems are rendered back into the
    form template as an error message.
    """
    if request.method == 'GET':
        return render_template('batch_report_curation.html')

    # POST: Process CSV and stream results
    if 'csv_file' not in request.files:
        return render_template('batch_report_curation.html', error='No file uploaded')

    csv_file = request.files['csv_file']
    if csv_file.filename == '':
        return render_template('batch_report_curation.html', error='No file selected')

    try:
        # Get parameters
        k = int(request.form.get('k', DEFAULT_K))
        threshold_google = float(request.form.get('threshold_google', DEFAULT_THRESHOLDS['google']))
        threshold_minilm = float(request.form.get('threshold_minilm', DEFAULT_THRESHOLDS['minilm']))

        # 'utf-8-sig' strips a leading byte-order mark (as the historical CSV
        # import does). With plain 'utf-8' the BOM sticks to the first header
        # name and a valid Excel-exported file fails header validation.
        content = csv_file.read().decode('utf-8-sig')
        reader = csv.DictReader(io.StringIO(content))

        # Validate headers
        fieldnames = reader.fieldnames or []
        if 'supplier_name' not in fieldnames or 'line_item_description' not in fieldnames:
            return render_template(
                'batch_report_curation.html',
                error='Invalid CSV format. Required headers: supplier_name, line_item_description'
            )

        # Group line items by supplier
        suppliers = defaultdict(list)
        for row in reader:
            # DictReader fills missing trailing cells of a short row with
            # None, so guard before calling strip().
            supplier = (row.get('supplier_name') or '').strip()
            description = (row.get('line_item_description') or '').strip()
            if supplier and description:
                suppliers[supplier].append(description)

        if not suppliers:
            return render_template('batch_report_curation.html', error='No items to process')

        # Return streaming response
        return Response(
            generate_batch_curation_report(suppliers, k, threshold_google, threshold_minilm),
            mimetype='text/html',
            headers={'X-Accel-Buffering': 'no'}
        )
    except UnicodeDecodeError:
        return render_template('batch_report_curation.html', error='Invalid file encoding. Please use UTF-8.')
    except csv.Error as e:
        return render_template('batch_report_curation.html', error=f'CSV parsing error: {e}')
    except Exception as e:
        return render_template('batch_report_curation.html', error=f'Error processing file: {e}')
def _cluster_badges_html(clusters, limit=5):
    """Render up to *limit* cluster summary badges, plus a "+N more" badge."""
    badges = []
    for cluster in clusters[:limit]:
        # Account / cost-center text originates from stored bookings, so it is
        # escaped like every other dynamic value written into the page.
        account = html.escape(str(cluster["debit_account"]))
        cost_center = html.escape(str(cluster["cost_center"] or "-"))
        total = html.escape(str(cluster["total_matches"]))
        badges.append(f'<span class="cluster-badge">{account}/{cost_center} ({total})</span>')
    if len(clusters) > limit:
        badges.append(f'<span class="cluster-badge">+{len(clusters) - limit} more</span>')
    return ''.join(badges)


def generate_batch_curation_report(suppliers: dict, k: int, threshold_google: float, threshold_minilm: float):
    """Yield HTML chunks for the streaming batch curation report.

    Args:
        suppliers: Mapping of supplier name to its list of line-item
            descriptions.
        k: Passed through to ``curate_bookings_for_invoice`` as ``k``.
        threshold_google: Threshold used for the ``google`` model run.
        threshold_minilm: Threshold used for the ``minilm`` model run.

    Yields:
        str: The page header, one section per supplier (with one column per
        model), and the page footer. A failure while curating one
        supplier/model pair is rendered as an error column and the report
        continues with the next pair.
    """
    # HTML header
    yield '''<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Batch Curation Results - Semantic Search</title>
<link href="https://cdn.jsdelivr.net/npm/bootstrap@5.3.0/dist/css/bootstrap.min.css" rel="stylesheet">
<style>
body { background-color: #f5f5f5; }
.supplier-section { margin-bottom: 30px; }
.supplier-header { background-color: #343a40; color: white; padding: 15px 20px; border-radius: 8px 8px 0 0; }
.model-results { display: flex; gap: 20px; padding: 15px; background: white; border: 1px solid #dee2e6; }
.model-column { flex: 1; }
.model-title { font-weight: bold; margin-bottom: 10px; }
.csv-output { font-family: monospace; font-size: 0.8rem; white-space: pre-wrap; background: #f8f9fa; padding: 10px; border-radius: 4px; max-height: 400px; overflow-y: auto; }
.cluster-badge { display: inline-block; padding: 2px 8px; margin: 2px; background: #e9ecef; border-radius: 4px; font-size: 0.8rem; }
.metadata { font-size: 0.8rem; color: #666; margin-top: 10px; }
</style>
</head>
<body>
<div class="container mt-4">
<h1 class="mb-4">Batch Curation Results</h1>
<div class="mb-3">
<a href="/batch-report" class="btn btn-outline-secondary">← Upload Another File</a>
</div>
'''
    # One connection is shared by every supplier/model run and closed in the
    # ``finally`` below, including when the client disconnects mid-stream.
    conn = get_connection()
    try:
        for supplier_name, line_items in suppliers.items():
            supplier_escaped = html.escape(supplier_name)
            yield f'''
<div class="supplier-section">
<div class="supplier-header">
<strong>{supplier_escaped}</strong> ({len(line_items)} line items)
</div>
<div class="model-results">
'''
            # Run curation for both models
            for model, threshold in [('google', threshold_google), ('minilm', threshold_minilm)]:
                try:
                    curated = curate_bookings_for_invoice(
                        conn,
                        supplier_name,
                        line_items,
                        model=model,
                        k=k,
                        threshold=threshold,
                    )
                    csv_output = curated_bookings_to_csv(curated)
                    meta = curated['metadata']
                    # Cluster summary badges (top 5, then a "+N more" badge)
                    cluster_badges = _cluster_badges_html(curated['clusters'])
                    yield f'''
<div class="model-column">
<div class="model-title">{model.upper()} (threshold: {threshold})</div>
<div>{cluster_badges}</div>
<div class="csv-output">{html.escape(csv_output)}</div>
<div class="metadata">
Raw matches: {meta['total_raw_matches']} |
Unique: {meta['unique_matches']} |
Clusters: {meta['clusters_count']}
</div>
</div>
'''
                except Exception as e:
                    # Best-effort per model: show the failure inline and keep
                    # streaming the remaining suppliers/models.
                    # NOTE(review): if the failure was a SQL error, the shared
                    # connection may be left in an aborted transaction and
                    # every later query would fail too - confirm whether
                    # get_connection() uses autocommit or a rollback is needed.
                    yield f'''
<div class="model-column">
<div class="model-title">{model.upper()}</div>
<div class="text-danger">Error: {html.escape(str(e))}</div>
</div>
'''
            yield '''
</div>
</div>
'''
    finally:
        conn.close()
    yield '''
</div>
</body>
</html>
'''
# Script entry point: start Flask's built-in server on port 5000 in debug mode
# when this module is executed directly.
if __name__ == "__main__":
    app.run(port=5000, debug=True)
Step 2: Commit
git add src/app.py
git commit -m "feat: update app.py with curation-based search UI"
Files:
src/templates/curation.html
Step 1: Create the template
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Semantic Search Curation</title>
<link href="https://cdn.jsdelivr.net/npm/bootstrap@5.3.0/dist/css/bootstrap.min.css" rel="stylesheet">
<style>
body { background-color: #f5f5f5; }
.nav-pills .nav-link { color: #0066cc; }
.nav-pills .nav-link.active { background-color: #0066cc; }
.model-results { display: flex; gap: 20px; margin-top: 20px; }
.model-column { flex: 1; background: white; border-radius: 8px; padding: 20px; box-shadow: 0 1px 3px rgba(0,0,0,0.1); }
.model-title { font-weight: bold; font-size: 1.2rem; margin-bottom: 15px; padding-bottom: 10px; border-bottom: 2px solid #dee2e6; }
.csv-output { font-family: monospace; font-size: 0.85rem; white-space: pre-wrap; background: #f8f9fa; padding: 15px; border-radius: 4px; max-height: 500px; overflow-y: auto; }
.cluster-badge { display: inline-block; padding: 4px 10px; margin: 3px; background: #e9ecef; border-radius: 4px; font-size: 0.85rem; }
.cluster-badge.high { background: #d4edda; color: #155724; }
.metadata { font-size: 0.85rem; color: #666; margin-top: 15px; padding-top: 10px; border-top: 1px solid #dee2e6; }
.timing { font-size: 0.8rem; color: #999; }
</style>
</head>
<body>
<div class="container mt-4">
<h1 class="mb-4">Semantic Search Curation</h1>
<!-- Navigation -->
<ul class="nav nav-pills mb-4">
<li class="nav-item">
<a class="nav-link active" href="/">Search</a>
</li>
<li class="nav-item">
<a class="nav-link" href="/benchmark">Benchmark</a>
</li>
<li class="nav-item">
<a class="nav-link" href="/batch-report">Batch Report</a>
</li>
</ul>
<!-- Search Form -->
<div class="card mb-4">
<div class="card-body">
<form method="POST">
<div class="mb-3">
<label for="query" class="form-label">Query (format: supplier | description)</label>
<input type="text" class="form-control" id="query" name="query"
value="{{ query }}" placeholder="Bechtle GmbH | Adobe Acrobat subscription">
</div>
<div class="row mb-3">
<div class="col-md-4">
<label for="k" class="form-label">Max results per search (K)</label>
<input type="number" class="form-control" id="k" name="k" value="{{ k }}" min="1" max="50">
</div>
<div class="col-md-4">
<label for="threshold_google" class="form-label">Google threshold</label>
<input type="number" class="form-control" id="threshold_google" name="threshold_google"
value="{{ threshold_google }}" min="0" max="1" step="0.05">
</div>
<div class="col-md-4">
<label for="threshold_minilm" class="form-label">MiniLM threshold</label>
<input type="number" class="form-control" id="threshold_minilm" name="threshold_minilm"
value="{{ threshold_minilm }}" min="0" max="1" step="0.05">
</div>
</div>
<button type="submit" class="btn btn-primary">Search & Curate</button>
</form>
</div>
</div>
{% if error %}
<div class="alert alert-danger">{{ error }}</div>
{% endif %}
{% if results %}
<div class="model-results">
{% for model in ['google', 'minilm'] %}
{% set data = results[model] %}
<div class="model-column">
<div class="model-title">
{{ model.upper() }}
<span class="timing">({{ "%.1f"|format(data.timing_ms) }}ms)</span>
</div>
<!-- Cluster badges -->
<div class="mb-3">
{% for cluster in data.clusters[:8] %}
<span class="cluster-badge {% if cluster.total_matches >= 5 %}high{% endif %}">
{{ cluster.debit_account }}/{{ cluster.cost_center or '-' }}
({{ cluster.total_matches }})
</span>
{% endfor %}
{% if data.clusters|length > 8 %}
<span class="cluster-badge">+{{ data.clusters|length - 8 }} more</span>
{% endif %}
</div>
<!-- CSV output -->
<div class="csv-output">{{ data.csv }}</div>
<!-- Metadata -->
<div class="metadata">
<strong>Stats:</strong>
Raw matches: {{ data.metadata.total_raw_matches }} |
Unique: {{ data.metadata.unique_matches }} |
Clusters: {{ data.metadata.clusters_count }} |
{# Count the items of each cluster and add the counts up. The previous
   `sum(attribute='items')` started from 0 and tried `0 + list`, which raises
   TypeError as soon as there is at least one cluster. #}
Items in CSV: {{ (data.clusters | map(attribute='items') | map('length') | sum) if data.clusters else 0 }}
</div>
</div>
{% endfor %}
</div>
{% endif %}
</div>
</body>
</html>
Step 2: Commit
git add src/templates/curation.html
git commit -m "feat: add curation.html template for search UI"
Files:
src/templates/batch_report_curation.html
Step 1: Create the template
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Batch Curation Report - Semantic Search</title>
<link href="https://cdn.jsdelivr.net/npm/bootstrap@5.3.0/dist/css/bootstrap.min.css" rel="stylesheet">
<style>
body { background-color: #f5f5f5; }
.nav-pills .nav-link { color: #0066cc; }
.nav-pills .nav-link.active { background-color: #0066cc; }
</style>
</head>
<body>
<div class="container mt-4">
<h1 class="mb-4">Batch Curation Report</h1>
<!-- Navigation -->
<ul class="nav nav-pills mb-4">
<li class="nav-item">
<a class="nav-link" href="/">Search</a>
</li>
<li class="nav-item">
<a class="nav-link" href="/benchmark">Benchmark</a>
</li>
<li class="nav-item">
<a class="nav-link active" href="/batch-report">Batch Report</a>
</li>
</ul>
{% if error %}
<div class="alert alert-danger">{{ error }}</div>
{% endif %}
<div class="card">
<div class="card-body">
<h5 class="card-title">Upload CSV for Batch Processing</h5>
<p class="text-muted">
Upload a CSV file with columns: <code>supplier_name</code>, <code>line_item_description</code>
</p>
<form method="POST" enctype="multipart/form-data">
<div class="mb-3">
<label for="csv_file" class="form-label">CSV File</label>
<input type="file" class="form-control" id="csv_file" name="csv_file" accept=".csv" required>
</div>
<div class="row mb-3">
<div class="col-md-4">
<label for="k" class="form-label">Max results per search (K)</label>
<input type="number" class="form-control" id="k" name="k" value="10" min="1" max="50">
</div>
<div class="col-md-4">
<label for="threshold_google" class="form-label">Google threshold</label>
<input type="number" class="form-control" id="threshold_google" name="threshold_google"
value="0.6" min="0" max="1" step="0.05">
</div>
<div class="col-md-4">
<label for="threshold_minilm" class="form-label">MiniLM threshold</label>
<input type="number" class="form-control" id="threshold_minilm" name="threshold_minilm"
value="0.7" min="0" max="1" step="0.05">
</div>
</div>
<button type="submit" class="btn btn-primary">Process & Generate Report</button>
</form>
</div>
</div>
<div class="card mt-4">
<div class="card-body">
<h6>Expected CSV Format</h6>
<pre class="bg-light p-3 rounded">supplier_name,line_item_description
Bechtle GmbH,Adobe Acrobat subscription
Bechtle GmbH,Microsoft 365 license
BearingPoint GmbH,Managed Platforms</pre>
</div>
</div>
</div>
</body>
</html>
Step 2: Commit
git add src/templates/batch_report_curation.html
git commit -m "feat: add batch_report_curation.html template"
Step 1: Stop any existing database connections
# If you have a running app, stop it first
Step 2: Recreate the database
# Connect to postgres and recreate the database
psql -h localhost -p 5433 -U dev -d postgres -c "DROP DATABASE IF EXISTS semantic_search;"
psql -h localhost -p 5433 -U dev -d postgres -c "CREATE DATABASE semantic_search;"
psql -h localhost -p 5433 -U dev -d semantic_search -f init.sql
Step 3: Re-import historical data
cd /home/volrath/code/worktrees-orcha-semantic-search/spikes/semantic-search
python src/import_csv.py historical.csv
Expected: "Imported: ~6000" rows
Step 4: Re-embed with Google
python -m src.embeddings.google_embed
Expected: "Generated ~6000 Google embeddings"
Step 5: Re-embed with MiniLM
python -m src.embeddings.minilm_embed
Expected: "Generated ~6000 MiniLM embeddings"
Step 6: Create HNSW indexes
psql -h localhost -p 5433 -U dev -d semantic_search -c "CREATE INDEX idx_embedding_google_hnsw ON line_item USING hnsw (embedding_google vector_cosine_ops) WITH (m = 16, ef_construction = 64);"
psql -h localhost -p 5433 -U dev -d semantic_search -c "CREATE INDEX idx_embedding_minilm_hnsw ON line_item USING hnsw (embedding_minilm vector_cosine_ops) WITH (m = 16, ef_construction = 64);"
Step 7: Verify
psql -h localhost -p 5433 -U dev -d semantic_search -c "SELECT COUNT(*) FROM line_item WHERE embedding_google IS NOT NULL AND embedding_minilm IS NOT NULL;"
Expected: ~6000
Step 1: Run the Flask app
cd /home/volrath/code/worktrees-orcha-semantic-search/spikes/semantic-search
python -m src.app
Step 2: Test single search
Open http://localhost:5000 and enter:
Bechtle GmbH | Adobe Acrobat subscription
Verify: Both models return curated CSV results with clusters.
Step 3: Test batch report
Open http://localhost:5000/batch-report and upload regnology.csv
Step 4: Commit any remaining changes
git status
# If any uncommitted changes, commit them
After completing all tasks:
The spike now demonstrates semantic search-based curation of historical bookings for LLM context, with side-by-side comparison of Google and MiniLM models.