Purpose: INFRA-04 and INFRA-05 require pre-computed embeddings from these two API-based models. This plan creates the embedding infrastructure and populates the embedding columns for all 6078 line items.
Output: Embedding modules for both APIs, all line items have Google (768-dim) and Jina (1024-dim) embeddings stored.
<execution_context> @./.claude/get-shit-done/workflows/execute-plan.md @./.claude/get-shit-done/templates/summary.md </execution_context>
@.planning/phases/02-embedding-generation/02-01-SUMMARY.md
@src/db.py @src/normalize.py @init.sql
src/embeddings/__init__.py:"""Embedding generation for semantic search comparison."""
from .text_prep import prepare_embedding_text
from .batch_processor import batch_embed_with_progress, update_embeddings_batch
# Public names of the embeddings package; this list controls what
# `from <package> import *` exposes.
__all__ = [
"prepare_embedding_text",
"batch_embed_with_progress",
"update_embeddings_batch",
]
src/embeddings/text_prep.py:"""Prepare text for embedding - consistent across all models."""
def prepare_embedding_text(supplier_name: str, description: str) -> str:
    """Build the single string that is handed to an embedding model.

    The supplier name and the description are joined with a ``" | "``
    separator. No other processing happens here, so callers are expected to
    pass the already-normalized values to keep matching consistent across
    models.

    Args:
        supplier_name: Normalized supplier name.
        description: Normalized description.

    Returns:
        The combined text, ``"<supplier_name> | <description>"``.
    """
    separator = " | "
    return "{}{}{}".format(supplier_name, separator, description)
src/embeddings/batch_processor.py:"""Generic batch processing utilities for embeddings."""
import time
from typing import Callable
from tqdm import tqdm
import numpy as np
from pgvector.psycopg import register_vector
def batch_embed_with_progress(
    texts: list[str],
    embed_fn: Callable[[list[str]], list[list[float]]],
    batch_size: int = 100,
    delay_between_batches: float = 0.1,
    desc: str = "Embedding",
) -> list[list[float]]:
    """Embed ``texts`` in consecutive batches while showing a progress bar.

    ``embed_fn`` is called once per batch, in order, with a slice of at most
    ``batch_size`` texts. The results are concatenated so that the returned
    list lines up index-for-index with ``texts``.

    Args:
        texts: List of texts to embed.
        embed_fn: Function that takes a list of texts and returns one
            embedding per text, in the same order.
        batch_size: Number of texts per batch; must be at least 1.
        delay_between_batches: Seconds to wait between batches (rate limiting).
        desc: Progress bar description.

    Returns:
        List of embedding vectors, one per input text.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1, or if ``embed_fn``
            returns a different number of embeddings than texts it was given.
    """
    # A negative step would make range() empty and silently return no
    # embeddings at all; a zero step fails with an unrelated-looking error.
    if batch_size < 1:
        raise ValueError(
            f"batch_size must be a positive integer, got {batch_size!r}"
        )

    all_embeddings: list[list[float]] = []
    total = len(texts)
    for start in tqdm(range(0, total, batch_size), desc=desc):
        batch = texts[start:start + batch_size]
        embeddings = embed_fn(batch)

        # Callers pair results with ids by position, so a short or long
        # response must fail loudly instead of shifting every later vector.
        if len(embeddings) != len(batch):
            raise ValueError(
                f"embed_fn returned {len(embeddings)} embeddings "
                f"for a batch of {len(batch)} texts"
            )
        all_embeddings.extend(embeddings)

        if start + batch_size < total:  # Don't delay after last batch
            time.sleep(delay_between_batches)
    return all_embeddings
def update_embeddings_batch(
    conn,
    column_name: str,
    id_embedding_pairs: list[tuple[int, list[float]]],
) -> int:
    """Write embedding vectors into one column of ``line_item`` and commit.

    Args:
        conn: psycopg connection.
        column_name: Name of the embedding column (embedding_google,
            embedding_jina, embedding_minilm). Must be a plain ASCII
            identifier because it is interpolated into the SQL text.
        id_embedding_pairs: List of (id, embedding_vector) tuples.

    Returns:
        Number of (id, embedding) pairs submitted. This is the length of
        ``id_embedding_pairs``, not a row count reported by the database.

    Raises:
        ValueError: If ``column_name`` is not a plain ASCII identifier.
    """
    # Column names cannot be bound as query parameters, so the name ends up
    # inside the statement text. Reject anything that is not a bare
    # identifier before it can reach the database.
    if not (column_name.isascii() and column_name.isidentifier()):
        raise ValueError(f"Invalid embedding column name: {column_name!r}")

    # Registers the pgvector adapters on this connection so numpy arrays can
    # be passed as vector parameters.
    register_vector(conn)

    params = [(np.array(emb), id_) for id_, emb in id_embedding_pairs]
    with conn.cursor() as cur:
        cur.executemany(
            f"UPDATE line_item SET {column_name} = %s WHERE id = %s",
            params,
        )
    conn.commit()
    return len(id_embedding_pairs)
Add tqdm to dependencies if not already present:
uv add tqdm
src/embeddings/google_embed.py:"""Google text-multilingual-embedding-002 embeddings via Vertex AI."""
import os
from google import genai
from google.genai.types import EmbedContentConfig
# Configure the SDK for Vertex AI, unless the variable is already set in the
# environment (setdefault never overrides an existing value).
os.environ.setdefault('GOOGLE_GENAI_USE_VERTEXAI', 'True')

# Module-level cache for the client; filled on first use.
_client = None


def _get_client():
    """Return the shared ``genai.Client``, creating it on the first call."""
    global _client
    if _client is not None:
        return _client
    _client = genai.Client()
    return _client
def embed_google(texts: list[str]) -> list[list[float]]:
    """Embed texts using text-multilingual-embedding-002 via Vertex AI.

    Inputs longer than the per-call limit are split into consecutive chunks,
    one request per chunk, and the results are concatenated in input order.
    An empty input returns an empty list without contacting the API.

    Args:
        texts: List of texts to embed. The API accepts at most 250 texts per
            call; longer lists are chunked transparently.

    Returns:
        List of 768-dimensional embedding vectors, one per input text.
    """
    if not texts:
        return []

    max_per_call = 250  # documented per-request cap for this model
    client = _get_client()
    config = EmbedContentConfig(
        task_type='RETRIEVAL_DOCUMENT',
    )

    vectors: list[list[float]] = []
    for start in range(0, len(texts), max_per_call):
        response = client.models.embed_content(
            model='text-multilingual-embedding-002',
            contents=texts[start:start + max_per_call],
            config=config,
        )
        vectors.extend(embedding.values for embedding in response.embeddings)
    return vectors
def embed_google_batch(
    conn,
    batch_size: int = 100,  # Conservative, API max is 250
    delay: float = 0.2,
) -> int:
    """Generate and store Google embeddings for line items that lack one.

    Rows whose ``embedding_google`` is NULL are read in ``id`` order and
    their text is built with ``prepare_embedding_text``. Each batch is
    written to the database (and committed by ``update_embeddings_batch``)
    as soon as its vectors come back. If the API fails part-way through, the
    batches already stored are kept, and a rerun only picks up the rows that
    are still NULL.

    Args:
        conn: psycopg connection.
        batch_size: Texts per API call.
        delay: Seconds between batches.

    Returns:
        Number of items embedded and stored (0 when nothing was pending).

    Raises:
        RuntimeError: If the API returns a different number of vectors than
            texts sent, which would otherwise attach vectors to wrong rows.
    """
    from .text_prep import prepare_embedding_text
    from .batch_processor import batch_embed_with_progress, update_embeddings_batch

    # Fetch all items needing embeddings
    with conn.cursor() as cur:
        cur.execute("""
            SELECT id, supplier_name_normalized, description_normalized
            FROM line_item
            WHERE embedding_google IS NULL
            ORDER BY id
        """)
        rows = cur.fetchall()

    if not rows:
        print("All items already have Google embeddings")
        return 0

    # Prepare texts
    ids = [r[0] for r in rows]
    texts = [prepare_embedding_text(r[1], r[2]) for r in rows]

    stored = 0

    def _embed_and_store(batch: list[str]) -> list[list[float]]:
        """Embed one batch and persist it immediately."""
        nonlocal stored
        vectors = embed_google(batch)
        if len(vectors) != len(batch):
            raise RuntimeError(
                f"Google API returned {len(vectors)} embeddings "
                f"for {len(batch)} texts"
            )
        # batch_embed_with_progress hands over consecutive slices of `texts`
        # in order, so the ids for this batch start at the running offset.
        batch_ids = ids[stored:stored + len(batch)]
        update_embeddings_batch(
            conn, 'embedding_google', list(zip(batch_ids, vectors))
        )
        stored += len(batch)
        return vectors

    # Generate embeddings, writing each batch as it completes
    batch_embed_with_progress(
        texts,
        _embed_and_store,
        batch_size=batch_size,
        delay_between_batches=delay,
        desc="Google embeddings",
    )

    updated = stored
    print(f"Generated {updated} Google embeddings (768 dimensions)")
    return updated
if __name__ == "__main__":
    # Script entry point: embed every pending line item with the Google model.
    from src.db import get_connection

    conn = get_connection()
    try:
        embed_google_batch(conn)
    finally:
        # Release the connection even when embedding fails part-way through.
        conn.close()
uv run python -m src.embeddings.google_embed
Expected: ~6078 embeddings generated in ~1-2 minutes (61 batches at 100 items each).
docker compose exec -T postgres psql -U dev -d dev -c "SELECT COUNT(*) FROM line_item WHERE embedding_google IS NOT NULL"
Expected: 6078 (all items have Google embeddings)
src/embeddings/jina_embed.py:"""Jina embeddings-v3 via REST API."""
import os
import time
import requests
from typing import Optional
JINA_API_URL = 'https://api.jina.ai/v1/embeddings'


def embed_jina(
    texts: list[str],
    api_key: Optional[str] = None,
    max_retries: int = 3,
    timeout: float = 60.0,
) -> list[list[float]]:
    """Embed texts using the Jina embeddings-v3 REST API.

    Rate-limit responses (HTTP 429) are retried with exponential backoff
    (1s, 2s, 4s, ...). Any other HTTP error is raised immediately. An empty
    input returns an empty list without contacting the API.

    Args:
        texts: List of texts to embed.
        api_key: Jina API key (defaults to JINA_API_KEY env var).
        max_retries: Maximum number of attempts when rate limited.
        timeout: Seconds to wait for the HTTP request before giving up, so a
            stalled connection cannot hang the run indefinitely.

    Returns:
        List of 1024-dimensional embedding vectors, in input order.

    Raises:
        ValueError: If no API key is supplied or found in the environment.
        requests.HTTPError: For non-429 error responses.
        RuntimeError: If every attempt was rate limited.
    """
    api_key = api_key or os.environ.get('JINA_API_KEY')
    if not api_key:
        raise ValueError("JINA_API_KEY environment variable not set")
    if not texts:
        return []

    for attempt in range(max_retries):
        response = requests.post(
            JINA_API_URL,
            headers={
                'Content-Type': 'application/json',
                'Authorization': f'Bearer {api_key}',
            },
            json={
                'input': texts,
                'model': 'jina-embeddings-v3',
                'dimensions': 1024,
                'task': 'retrieval.passage',
            },
            timeout=timeout,
        )
        if response.status_code == 429:
            # Rate limited - exponential backoff. Skip the wait after the
            # final attempt: there is nothing left to retry.
            if attempt + 1 < max_retries:
                wait_time = 2 ** attempt
                print(f"Rate limited, waiting {wait_time}s...")
                time.sleep(wait_time)
            continue
        response.raise_for_status()

        data = response.json()['data']
        # Order by the per-item 'index' when the API provides it, so vectors
        # line up with `texts`; the sort is stable, so items without an
        # index keep the order in which they were returned.
        ordered = sorted(data, key=lambda item: item.get('index', 0))
        return [item['embedding'] for item in ordered]

    raise RuntimeError("Max retries exceeded for Jina API")
def embed_jina_batch(
    conn,
    batch_size: int = 50,  # Conservative for rate limits
    delay: float = 0.6,  # 100 RPM = one request per 0.6s; latency adds headroom
) -> int:
    """Generate and store Jina embeddings for line items that lack one.

    Rows whose ``embedding_jina`` is NULL are read in ``id`` order and their
    text is built with ``prepare_embedding_text``. Each batch is written to
    the database (and committed by ``update_embeddings_batch``) as soon as
    its vectors come back. If the API fails or the rate limit is exhausted
    part-way through, the batches already stored are kept, and a rerun only
    picks up the rows that are still NULL.

    Args:
        conn: psycopg connection.
        batch_size: Texts per API call.
        delay: Seconds between batches (default respects free tier 100 RPM).

    Returns:
        Number of items embedded and stored (0 when nothing was pending).

    Raises:
        RuntimeError: If the API returns a different number of vectors than
            texts sent, which would otherwise attach vectors to wrong rows.
    """
    from .text_prep import prepare_embedding_text
    from .batch_processor import batch_embed_with_progress, update_embeddings_batch

    # Fetch all items needing embeddings
    with conn.cursor() as cur:
        cur.execute("""
            SELECT id, supplier_name_normalized, description_normalized
            FROM line_item
            WHERE embedding_jina IS NULL
            ORDER BY id
        """)
        rows = cur.fetchall()

    if not rows:
        print("All items already have Jina embeddings")
        return 0

    # Prepare texts
    ids = [r[0] for r in rows]
    texts = [prepare_embedding_text(r[1], r[2]) for r in rows]

    stored = 0

    def _embed_and_store(batch: list[str]) -> list[list[float]]:
        """Embed one batch and persist it immediately."""
        nonlocal stored
        vectors = embed_jina(batch)
        if len(vectors) != len(batch):
            raise RuntimeError(
                f"Jina API returned {len(vectors)} embeddings "
                f"for {len(batch)} texts"
            )
        # batch_embed_with_progress hands over consecutive slices of `texts`
        # in order, so the ids for this batch start at the running offset.
        batch_ids = ids[stored:stored + len(batch)]
        update_embeddings_batch(
            conn, 'embedding_jina', list(zip(batch_ids, vectors))
        )
        stored += len(batch)
        return vectors

    # Generate embeddings with rate limit awareness, writing each batch as
    # it completes
    batch_embed_with_progress(
        texts,
        _embed_and_store,
        batch_size=batch_size,
        delay_between_batches=delay,
        desc="Jina embeddings",
    )

    updated = stored
    print(f"Generated {updated} Jina embeddings (1024 dimensions)")
    return updated
if __name__ == "__main__":
    # Script entry point: embed every pending line item with the Jina model.
    from src.db import get_connection

    conn = get_connection()
    try:
        embed_jina_batch(conn)
    finally:
        # Release the connection even when embedding fails part-way through.
        conn.close()
uv run python -m src.embeddings.jina_embed
Expected: ~6078 embeddings generated in ~2-3 minutes (122 batches of 50 items, paced to stay under ~100 RPM).
Note: Jina free tier is 100 RPM. With batch_size=50 and delay=0.6s, we process ~80 batches/min = ~4000 items/min. For 6078 items, expect ~2 minutes. Adjust delay if rate limit errors occur.
docker compose exec -T postgres psql -U dev -d dev -c "SELECT COUNT(*) FROM line_item WHERE embedding_jina IS NOT NULL"
Expected: 6078 (all items have Jina embeddings)
<success_criteria>