Purpose: Provides the Python tooling for embedding generation and creates the dataset for semantic search comparison.
Output: Python environment ready for development, ~6K line items imported into line_item table with normalized fields.
<execution_context> @./.claude/get-shit-done/workflows/execute-plan.md @./.claude/get-shit-done/templates/summary.md </execution_context>
# Initialize project (creates pyproject.toml)
uv init --name semantic-search-spike --python 3.12
# Create virtual environment
uv venv
# Add Phase 1 dependencies: psycopg (database driver), pgvector (vector type
# registration for psycopg) and pandas (CSV loading)
uv add psycopg pgvector pandas
# Add Phase 2+ dependencies (install now to satisfy INFRA-02)
uv add google-genai sentence-transformers ragas
The pyproject.toml should include:
Create src/ directory structure:
# Create the source directory (no error if it already exists) and an empty
# __init__.py inside it
mkdir -p src
touch src/__init__.py
"""Text normalization functions ported from Orcha's util.text namespace."""
import re
# German umlauts and sharp s -> ASCII digraph transliteration.
UMLAUT_MAP = {
    'ä': 'ae',
    'ö': 'oe',
    'ü': 'ue',
    'ß': 'ss',
    'Ä': 'Ae',
    'Ö': 'Oe',
    'Ü': 'Ue',
}

# Accented lowercase letter -> unaccented base letter, expanded from
# (base letter, accented variants) groups.
ACCENT_MAP = {
    accented: base
    for base, variants in (
        ('a', 'àáâãå'),
        ('e', 'èéêë'),
        ('i', 'ìíîï'),
        ('o', 'òóôõø'),
        ('u', 'ùúû'),
        ('n', 'ñ'),
        ('c', 'ç'),
    )
    for accented in variants
}

# Punctuation characters that the normalizers delete (they are not replaced
# by a space, so "a-b" becomes "ab").
PUNCTUATION_PATTERN = re.compile(r'[.,;:!?"\'`\-_/\\@#$%^&*()=+\[\]{}|<>]')

# One trailing legal-form suffix (case-insensitive), with surrounding spaces.
# NOTE(review): because the leading whitespace is optional (`\s*`), a suffix
# glued to the end of a word also matches, e.g. "verlag" -> "verl" and
# "tesco" -> "tes". Left unchanged because this module is meant to mirror the
# upstream implementation exactly; confirm upstream behaves the same way.
COMPANY_SUFFIX_PATTERN = re.compile(
    r'(?i)\s*(gmbh|ag|kg|ohg|ug|se|gbr|co|inc|ltd|llc|plc|sa|srl|bv|nv|ek)\s*$'
)

# "& Co", "& Co.", "& Partner", "& Söhne", "& Sohn" together with the
# whitespace around them.
CO_PARTNER_PATTERN = re.compile(r'(?i)\s*&\s*(co\.?|partner|söhne|sohn)\s*')

# Leading "the " (case-insensitive) at the very start of the string.
THE_PREFIX_PATTERN = re.compile(r'(?i)^the\s+')
def normalize_text(s: str | None) -> str | None:
    """Return a lowercase, ASCII-folded, punctuation-free form of ``s``.

    Steps, in order: lowercase; transliterate umlauts/sharp s (UMLAUT_MAP);
    strip accents (ACCENT_MAP); delete punctuation (PUNCTUATION_PATTERN);
    collapse whitespace runs to one space; trim both ends.

    Returns None when ``s`` is None.
    """
    if s is None:
        return None

    # The input is lowercased first, so umlaut keys and their replacements are
    # lowercased as well; umlaut substitutions run before the accent ones.
    substitutions = [
        (umlaut.lower(), digraph.lower()) for umlaut, digraph in UMLAUT_MAP.items()
    ]
    substitutions += list(ACCENT_MAP.items())

    text = s.lower()
    for needle, plain in substitutions:
        text = text.replace(needle, plain)

    text = PUNCTUATION_PATTERN.sub('', text)
    return re.sub(r'\s+', ' ', text).strip()
def normalize_supplier_name(s: str | None) -> str | None:
    """Return a normalized supplier name with legal-form noise removed.

    Same folding as plain text normalization (lowercase, umlaut and accent
    transliteration, punctuation removal, whitespace collapse, trim), plus:
    "& Co" / "& Partner" style fragments are replaced by a space first, one
    trailing company suffix is removed, and a leading "the " is dropped.

    Returns None when ``s`` is None.
    """
    if s is None:
        return None

    # "& co." must go while the "&" and "." are still present, i.e. before
    # punctuation is deleted.
    name = CO_PARTNER_PATTERN.sub(' ', s.lower())

    # Umlauts first, then accents; keys are lowercased to match the input.
    transliterations = [(u.lower(), r.lower()) for u, r in UMLAUT_MAP.items()]
    transliterations.extend(ACCENT_MAP.items())
    for accented, ascii_form in transliterations:
        name = name.replace(accented, ascii_form)

    name = PUNCTUATION_PATTERN.sub('', name)
    # Suffix stripping relies on punctuation being gone ("g.m.b.h." -> "gmbh").
    name = COMPANY_SUFFIX_PATTERN.sub('', name)
    name = THE_PREFIX_PATTERN.sub('', name)

    collapsed = re.sub(r'\s+', ' ', name)
    return collapsed.strip()
This exactly ports Orcha's normalize-supplier-name logic per user decision.
"""Database connection utilities with pgvector support."""
import psycopg
from pgvector.psycopg import register_vector
def get_connection(autocommit: bool = True) -> psycopg.Connection:
    """Open a database connection with pgvector support registered.

    Connects to database ``semantic_search`` on ``localhost:5433`` as user
    ``dev`` and calls ``register_vector`` on the new connection.

    Args:
        autocommit: Forwarded to ``psycopg.connect``. Pass False when the
            caller wants to commit or roll back explicitly.

    Returns:
        The open connection. The caller is responsible for closing it.

    Raises:
        Any exception raised by ``psycopg.connect`` or ``register_vector``.
        If registration fails, the connection is closed before re-raising.
    """
    conn = psycopg.connect(
        host='localhost',
        port=5433,
        dbname='semantic_search',
        user='dev',
        password='dev',
        autocommit=autocommit,
    )
    try:
        register_vector(conn)
    except Exception:
        # The caller never receives the connection on this path, so it must
        # be closed here or it would leak.
        conn.close()
        raise
    return conn
Create src/import_csv.py with CSV import logic:
"""Import historical.csv into line_item table using COPY protocol."""
import sys
from io import StringIO
import csv
import pandas as pd
from db import get_connection
from normalize import normalize_supplier_name, normalize_text
def _value_or_none(row, column: str):
    """Return ``row[column]``, or None when the column is absent or the value is NaN."""
    value = row.get(column)
    return value if pd.notna(value) else None


def import_historical_csv(csv_path: str) -> dict:
    """Import historical.csv into the line_item table using COPY.

    Rows lacking a supplier name, description or debit account (missing or
    blank) are skipped. Normalized supplier-name and description columns are
    computed and loaded alongside the raw values. Rows are appended; nothing
    in this function removes rows already present in line_item.

    Args:
        csv_path: Path to the CSV file (UTF-8, optional BOM).

    Returns:
        dict with statistics: ``total_rows`` (rows read from the CSV),
        ``skipped_rows`` (rows dropped for missing required fields) and
        ``imported_rows`` (rows sent to the database).

    Raises:
        KeyError: If a required column cannot be found in the CSV headers.
        Exception: Any database error is re-raised after the transaction
            has been rolled back.
    """
    # Read every column as text. With dtype inference, account codes lose
    # leading zeros ("04400" -> 4400) and a numeric column containing blanks
    # becomes float64, so a cost center 1000 would be exported as "1000.0".
    df = pd.read_csv(csv_path, encoding='utf-8-sig', dtype=str)
    total_rows = len(df)

    # Map CSV columns to schema (adjust if actual headers differ)
    column_mapping = {
        'Supplier Name': 'supplier_name',
        'Line Item Description': 'description',
        'Debit Account': 'debit_account',
        'Credit Account': 'credit_account',
        'Cost Center': 'cost_center',
        'Net Amount': 'net_amount',
    }

    # For headers that are not present verbatim, accept a case-insensitive
    # variant in which underscores stand in for spaces.
    for expected, target in list(column_mapping.items()):
        if expected in df.columns:
            continue
        alternatives = [
            c for c in df.columns
            if c.lower().replace('_', ' ') == expected.lower()
        ]
        if alternatives:
            column_mapping[alternatives[0]] = target
            del column_mapping[expected]

    df = df.rename(columns=column_mapping)

    # Skip rows with missing required fields (per user decision)
    required = ['supplier_name', 'description', 'debit_account']
    missing = [col for col in required if col not in df.columns]
    if missing:
        # Fail with the actual headers in the message instead of an opaque
        # KeyError from dropna() below.
        raise KeyError(
            f"Required column(s) {missing} not found in {csv_path}; "
            f"CSV headers after mapping: {list(df.columns)}"
        )

    before_filter = len(df)
    df = df.dropna(subset=required)
    for col in required:
        df = df[df[col].astype(str).str.strip() != '']
    skipped_rows = before_filter - len(df)

    # Work on an independent frame before adding columns to the filtered view.
    df = df.copy()
    df['supplier_name_normalized'] = df['supplier_name'].apply(normalize_supplier_name)
    df['description_normalized'] = df['description'].apply(normalize_text)

    # Prepare CSV buffer for COPY protocol.
    # NOTE(review): csv.writer emits None and '' as the same empty unquoted
    # field, which COPY ... WITH CSV loads as NULL; a normalized value that
    # comes out empty is therefore stored as NULL - confirm that is acceptable.
    optional_columns = ('credit_account', 'cost_center', 'net_amount')
    buffer = StringIO()
    writer = csv.writer(buffer)
    for _, row in df.iterrows():
        writer.writerow([
            row['supplier_name'],
            row['description'],
            row['debit_account'],
            *(_value_or_none(row, column) for column in optional_columns),
            row['supplier_name_normalized'],
            row['description_normalized'],
        ])
    buffer.seek(0)

    # Bulk import using COPY (10-100x faster than INSERT)
    conn = get_connection(autocommit=False)
    try:
        with conn.cursor() as cur:
            with cur.copy("""
                COPY line_item (
                    supplier_name, description, debit_account,
                    credit_account, cost_center, net_amount,
                    supplier_name_normalized, description_normalized
                ) FROM STDIN WITH CSV
            """) as copy:
                while data := buffer.read(8192):
                    copy.write(data)
        conn.commit()
    except Exception:
        conn.rollback()
        # Bare raise keeps the original traceback intact.
        raise
    finally:
        conn.close()

    return {
        'total_rows': total_rows,
        'skipped_rows': skipped_rows,
        'imported_rows': len(df),
    }
if __name__ == '__main__':
    # Command-line entry point: exactly one argument, the path to the CSV.
    cli_args = sys.argv[1:]
    if len(cli_args) != 1:
        print("Usage: python import_csv.py <path_to_csv>")
        sys.exit(1)

    (csv_path,) = cli_args
    print(f"Importing from: {csv_path}")

    stats = import_historical_csv(csv_path)

    # Report the import statistics returned by the importer.
    print(f"Total rows in CSV: {stats['total_rows']}")
    print(f"Skipped (missing required fields): {stats['skipped_rows']}")
    print(f"Imported: {stats['imported_rows']}")
Note: Uses COPY protocol for efficiency (10-100x faster than row-by-row INSERT per research).
Start the database (it should already be running from Plan 01, but make sure it is up):
# Start the services and block until they report running/healthy (up to 60 s).
# A fixed sleep can return before the database is ready to accept connections.
docker compose up -d --wait --wait-timeout 60
# Show the resulting container status
docker compose ps
Run import script:
# Run from inside src/: import_csv.py imports its sibling modules (db,
# normalize) by bare name, and the CSV path below is relative to src/
cd src
uv run python import_csv.py ../orcha/dump/regnology/historical.csv
Verify import:
# Number of rows now in line_item
docker compose exec db psql -U dev -d semantic_search -c "SELECT COUNT(*) FROM line_item;"
# Sample of the raw imported fields
docker compose exec db psql -U dev -d semantic_search -c "SELECT supplier_name, description, debit_account, cost_center FROM line_item LIMIT 5;"
# Sample of the normalized fields
docker compose exec db psql -U dev -d semantic_search -c "SELECT supplier_name_normalized, description_normalized FROM line_item LIMIT 3;"
Expected: ~6K rows imported (exact count depends on rows skipped for missing fields).
If the CSV path differs, adjust the path in the import command.
<success_criteria>