Why This Step Exists

The retriever from Step 6 always considers every chunk in the database. That's fine when "all of your docs" is the right scope for every question. The moment it isn't — "only billing-team docs," "only the v2 API reference," "only documents updated since the migration" — pure cosine ranking starts mixing relevant and irrelevant chunks together. You get correct-shaped answers grounded in the wrong material.

Context-aware retrieval fixes this by handing the database two pieces of information per query: the question's embedding (similarity) and the filters that scope the answer (relevance). The database applies the WHERE clause and the distance ranking in a single query and returns the top K rows that satisfy both. No extra LLM call, no extra service, no re-ranking pass.

The Three Layers (Recap)

We already do layer 1; this step builds layer 2; the optional sidebar covers layer 3.

| Layer | What it is | Where it lives in this blueprint |
|---|---|---|
| 1. Task-typed embeddings | RETRIEVAL_DOCUMENT at ingest vs. RETRIEVAL_QUERY at query time | Step 5 (embed(..., task=...)) |
| 2. Metadata-filtered retrieval | Structured columns + WHERE clause that the index pushes through | This step |
| 3. Contextual chunking | Prepend a per-doc context line to each chunk before embedding | Sidebar at the bottom of this step |

1. Extend the Schema

Three columns cover most real filtering needs. Apply this against the existing table. The migration is additive and idempotent: every statement uses IF NOT EXISTS, so re-running it is safe.

-- sql/002_add_metadata.sql
ALTER TABLE chunks
  ADD COLUMN IF NOT EXISTS category     TEXT,
  ADD COLUMN IF NOT EXISTS tags         TEXT[]      NOT NULL DEFAULT '{}',
  ADD COLUMN IF NOT EXISTS published_at TIMESTAMPTZ;
 
-- B-tree indexes for hot single-column filters
CREATE INDEX IF NOT EXISTS chunks_category_idx     ON chunks (category);
CREATE INDEX IF NOT EXISTS chunks_published_at_idx ON chunks (published_at DESC);
 
-- GIN index for tag containment / overlap queries (tags @> ARRAY['x'])
CREATE INDEX IF NOT EXISTS chunks_tags_gin_idx     ON chunks USING gin (tags);

Apply it:

gcloud sql connect rag-db --user=$USER_EMAIL --database=rag --quiet < sql/002_add_metadata.sql

Why these three columns:

  • category TEXT — a single discriminator ("billing", "engineering", "v2-api") is what 80% of filters look like in practice. One value per chunk, B-tree index, fast.
  • tags TEXT[] — Postgres native arrays. Multiple labels per chunk ({"v2", "rate-limits"}). The GIN index makes tags @> ARRAY['v2'] a millisecond operation even at scale.
  • published_at TIMESTAMPTZ — for "recent first" / date-bounded filters. We keep created_at separately (when the row was inserted); published_at is when the source document was published.

Skip anything you don't need. Adding a column is cheap; building a feature on one you didn't need is not.

2. Capture Metadata at Ingest Time

The cleanest pattern: a sidecar JSON file next to each text file. data/billing-faq.txt gets data/billing-faq.meta.json:

{
  "category": "billing",
  "tags": ["faq", "refunds"],
  "published_at": "2026-03-12T00:00:00Z"
}

Update the ingest to read sidecars when they exist, and to write the metadata into the new columns:

# src/ingest.py — diff against Step 5
import json
from datetime import datetime
from pathlib import Path
 
from .chunker import chunk_text
from .db import get_conn
from .embeddings import embed, vec_to_pg
 
DATA_DIR = Path("data")
BATCH_SIZE = 50
 
 
def _load_meta(text_path: Path) -> dict:
    """Look for <name>.meta.json next to <name>.txt. Return {} if missing."""
    meta_path = text_path.with_suffix(".meta.json")
    if not meta_path.exists():
        return {}
    raw = json.loads(meta_path.read_text(encoding="utf-8"))
    if raw.get("published_at"):
        raw["published_at"] = datetime.fromisoformat(
            raw["published_at"].replace("Z", "+00:00")
        )
    raw.setdefault("category", None)
    raw.setdefault("tags", [])
    raw.setdefault("published_at", None)
    return raw
 
 
def gather_chunks() -> list[tuple[str, int, str, dict]]:
    """Return (source, chunk_index, content, meta) for every chunk."""
    out: list[tuple[str, int, str, dict]] = []
    for path in sorted(DATA_DIR.glob("*.txt")):
        meta = _load_meta(path)
        text = path.read_text(encoding="utf-8")
        for i, content in enumerate(chunk_text(text)):
            out.append((path.name, i, content, meta))
    return out
 
 
def insert_chunks(rows):
    """rows: list of (source, chunk_index, content, embedding, meta)."""
    with get_conn() as conn:
        cur = conn.cursor()
        cur.executemany(
            """
            INSERT INTO chunks
              (source, chunk_index, content, embedding,
               category, tags, published_at)
            VALUES (%s, %s, %s, %s, %s, %s, %s)
            """,
            [
                (s, i, c, vec_to_pg(v),
                 m.get("category"), m.get("tags", []), m.get("published_at"))
                for s, i, c, v, m in rows
            ],
        )
        conn.commit()

The main() driver changes only to pass meta through:

def main() -> None:
    chunks = gather_chunks()
    rows = []
    for start in range(0, len(chunks), BATCH_SIZE):
        batch = chunks[start : start + BATCH_SIZE]
        vectors = embed([c[2] for c in batch], task="RETRIEVAL_DOCUMENT")
        rows.extend(
            (s, i, c, v, m) for (s, i, c, m), v in zip(batch, vectors)
        )
    insert_chunks(rows)
    print(f"Inserted {len(rows)} chunks.")

Re-run ingest (truncate first so you don't double-up):

uv run python -m scripts.reset   # TRUNCATE chunks RESTART IDENTITY
uv run python -m src.ingest
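
The ingest above trusts each sidecar's shape, so a typo such as "tags": "faq" would only surface as an insert error partway through a run. A minimal sketch of an up-front check follows. The validate_meta name is hypothetical (it is not part of the blueprint), and the rules it enforces are assumptions drawn from the column types in sql/002_add_metadata.sql.

```python
from datetime import datetime


def validate_meta(raw: dict) -> dict:
    """Hypothetical helper: normalise one sidecar dict or raise ValueError."""
    category = raw.get("category")
    if category is not None and not isinstance(category, str):
        raise ValueError("category must be a string or null")

    # tags is NOT NULL in the schema, so null is rejected here, not at insert.
    tags = raw.get("tags", [])
    if not isinstance(tags, list) or not all(isinstance(t, str) for t in tags):
        raise ValueError("tags must be a list of strings")

    published_at = raw.get("published_at")
    if published_at is not None:
        if not isinstance(published_at, str):
            raise ValueError("published_at must be an ISO 8601 string or null")
        # Same "Z" handling as _load_meta; fromisoformat raises ValueError
        # on strings it cannot parse.
        published_at = datetime.fromisoformat(published_at.replace("Z", "+00:00"))
        if published_at.tzinfo is None:
            raise ValueError("published_at needs a UTC offset (TIMESTAMPTZ column)")

    return {"category": category, "tags": tags, "published_at": published_at}


meta = validate_meta(
    {"category": "billing", "tags": ["faq", "refunds"],
     "published_at": "2026-03-12T00:00:00Z"}
)
print(meta["published_at"].isoformat())
```

One way to use it would be to call it from _load_meta right after json.loads, so a bad sidecar stops the run before any embedding calls are made.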

3. Filtered Retrieval

This is where the work pays off. One SQL statement; the database does the rest.

# src/retrieve.py — new function alongside the existing retrieve()
from datetime import datetime
from typing import Optional
 
from .db import get_conn
from .embeddings import embed, vec_to_pg
 
 
def retrieve_filtered(
    question: str,
    k: int = 5,
    *,
    category: Optional[str] = None,
    tags: Optional[list[str]] = None,
    after: Optional[datetime] = None,
    max_distance: Optional[float] = None,
) -> list[dict]:
    """Top-K chunks closest to the question, scoped by metadata.
 
    - category: exact match
    - tags: ARRAY @> match — chunks must include every tag listed
    - after: published_at >= after
    - max_distance: drop chunks whose cosine distance exceeds this threshold
    """
    [query_vec] = embed([question], task="RETRIEVAL_QUERY")
    query_str = vec_to_pg(query_vec)
 
    clauses = []
    params: list = []
 
    if category is not None:
        clauses.append("category = %s")
        params.append(category)
    if tags:
        clauses.append("tags @> %s")
        params.append(tags)
    if after is not None:
        clauses.append("published_at >= %s")
        params.append(after)
    if max_distance is not None:
        clauses.append("embedding <=> %s <= %s")
        params.append(query_str)
        params.append(max_distance)
 
    where = ("WHERE " + " AND ".join(clauses)) if clauses else ""
    sql = f"""
        SELECT id, source, chunk_index, content,
               category, tags, published_at,
               embedding <=> %s AS distance
        FROM chunks
        {where}
        ORDER BY embedding <=> %s
        LIMIT %s
    """
    # Placeholder order must match the SQL text: the SELECT-distance vector
    # comes first, then the filter params, then the ORDER BY vector and the limit.
    final_params = [query_str, *params, query_str, k]
 
    with get_conn() as conn:
        cur = conn.cursor()
        cur.execute(sql, final_params)
        rows = cur.fetchall()
 
    return [
        {
            "id": r[0],
            "source": r[1],
            "chunk_index": r[2],
            "content": r[3],
            "category": r[4],
            "tags": list(r[5]) if r[5] is not None else [],
            "published_at": r[6].isoformat() if r[6] is not None else None,
            "distance": float(r[7]),
        }
        for r in rows
    ]

A few things worth flagging:

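  • Where the filter runs depends on the query plan. If Postgres drives the query from the B-tree or GIN index, it filters first and ranks the survivors exactly. If it drives the query from the HNSW index, pgvector applies the WHERE clause to the candidates the index returns, so a selective filter can leave fewer than K rows. On pgvector 0.8.0 or later, run SET LOCAL hnsw.iterative_scan = relaxed_order inside the query's transaction (SET LOCAL lasts for one transaction, not the whole session) and pgvector will keep pulling candidates from the index until enough pass the filter. No application-side mask, no two-stage query.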
  • tags @> %s requires all the listed tags. Use tags && %s for "any of these tags" (set overlap).
  • max_distance is the cosine-distance counterpart of a min_score threshold. Cosine distance is 1 minus cosine similarity, so lower distance = more similar. 0.5 is a generous cutoff; 0.35 is strict. Tune against your data: log the distances for a few real questions before picking a number.
  • The embedding <=> %s expression appears in both the SELECT (to return the distance) and the ORDER BY (to drive the index), plus the WHERE clause when max_distance is set. That is deliberate, not a copy-paste slip: each %s placeholder is bound separately, so you pass the vector once per occurrence.
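
The difference between the two tag operators is easiest to see on plain sets. The sketch below mirrors their semantics in Python; the function names are illustrative only, and the real evaluation happens inside Postgres.

```python
def contains_all(chunk_tags: list[str], wanted: list[str]) -> bool:
    """Mirror of `tags @> wanted`: every wanted tag is on the chunk."""
    return set(wanted) <= set(chunk_tags)


def overlaps(chunk_tags: list[str], wanted: list[str]) -> bool:
    """Mirror of `tags && wanted`: at least one wanted tag is on the chunk."""
    return bool(set(wanted) & set(chunk_tags))


chunk = ["v2", "rate-limits"]
print(contains_all(chunk, ["v2"]))             # True
print(contains_all(chunk, ["v2", "billing"]))  # False: "billing" is missing
print(overlaps(chunk, ["v2", "billing"]))      # True: "v2" matches
print(overlaps(chunk, ["billing"]))            # False
```

Containment suits "narrow to exactly this topic" requests; overlap suits "anything from these teams" requests.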

4. Try It

A quick check that filters actually do what you expect:

# scripts/test_filtered.py
from datetime import datetime, timezone
 
from src.retrieve import retrieve_filtered
 
# All categories, no filters — should match Step 6 behavior
print("\n--- no filter ---")
for r in retrieve_filtered("What is RAG?", k=3):
    print(f"[{r['distance']:.3f}] {r['source']} cat={r['category']} tags={r['tags']}")
 
# Only billing
print("\n--- category=billing ---")
for r in retrieve_filtered("What is RAG?", k=3, category="billing"):
    print(f"[{r['distance']:.3f}] {r['source']} cat={r['category']}")
 
# Only docs published in 2026
print("\n--- after 2026-01-01 ---")
for r in retrieve_filtered(
    "What is RAG?", k=3, after=datetime(2026, 1, 1, tzinfo=timezone.utc)
):
    print(f"[{r['distance']:.3f}] {r['source']} pub={r['published_at']}")

Run it:

uv run python -m scripts.test_filtered

You should see the category and date filters narrowing the result set. Top-K chunks change as you change the scope.
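
The script above needs a populated database. The placeholder bookkeeping in retrieve_filtered() can be checked without one. This is a minimal sketch, assuming a hypothetical build_query() that mirrors the same clause assembly with a shortened SELECT list; it is not a function the blueprint defines.

```python
from datetime import datetime, timezone


def build_query(query_str, k, *, category=None, tags=None, after=None,
                max_distance=None):
    """Hypothetical DB-free mirror of the SQL assembly in retrieve_filtered()."""
    clauses, params = [], []
    if category is not None:
        clauses.append("category = %s")
        params.append(category)
    if tags:
        clauses.append("tags @> %s")
        params.append(tags)
    if after is not None:
        clauses.append("published_at >= %s")
        params.append(after)
    if max_distance is not None:
        clauses.append("embedding <=> %s <= %s")
        params.extend([query_str, max_distance])
    where = ("WHERE " + " AND ".join(clauses)) if clauses else ""
    sql = (
        "SELECT id, embedding <=> %s AS distance FROM chunks "
        f"{where} ORDER BY embedding <=> %s LIMIT %s"
    )
    # Same ordering as the real function: SELECT vector, filters, ORDER BY vector, k.
    return sql, [query_str, *params, query_str, k]


cases = [
    {},
    {"category": "billing"},
    {"tags": ["refunds"], "after": datetime(2026, 1, 1, tzinfo=timezone.utc)},
    {"category": "billing", "tags": ["refunds"], "max_distance": 0.5},
]
for filters in cases:
    sql, params = build_query("[0.1,0.2]", 3, **filters)
    # Every %s in the SQL text must have exactly one bound parameter.
    assert sql.count("%s") == len(params), (sql, params)
    assert params[0] == params[-2] == "[0.1,0.2]" and params[-1] == 3
print("placeholder counts line up for", len(cases), "filter combinations")
```

A check like this catches a misordered parameter list at test time, before it shows up as a confusing type error from the database driver.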

5. Expose Filters Through the API

Make the /ask endpoint accept the same filters:

# src/api.py — extended AskRequest
from datetime import datetime
from typing import Optional
 
from pydantic import BaseModel, Field
 
 
class AskRequest(BaseModel):
    question: str = Field(..., min_length=1, max_length=2000)
    k: int = Field(5, ge=1, le=20)
    category: Optional[str] = None
    tags: Optional[list[str]] = None
    after: Optional[datetime] = None
    max_distance: Optional[float] = Field(None, ge=0.0, le=2.0)
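
The 0.0 to 2.0 bounds on max_distance follow from the definition of cosine distance, which is what pgvector's <=> operator computes: 1 minus the cosine similarity. A small pure-Python sketch of that formula (illustrative only; the database does the real computation):

```python
import math


def cosine_distance(a: list[float], b: list[float]) -> float:
    """1 - cosine similarity; assumes both vectors are non-zero."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return 1.0 - dot / (norm_a * norm_b)


print(cosine_distance([1.0, 0.0], [2.0, 0.0]))   # 0.0 (same direction)
print(cosine_distance([1.0, 0.0], [0.0, 3.0]))   # 1.0 (orthogonal)
print(cosine_distance([1.0, 0.0], [-1.0, 0.0]))  # 2.0 (opposite direction)
```

Vector length does not matter, only direction, which is why the first pair scores 0.0 despite the different magnitudes.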

And the handler delegates to the filtered retriever when any filter is present, falling back to the simpler one otherwise:

from .retrieve import retrieve, retrieve_filtered
from .generate import generate_answer
 
@app.post("/ask")
def ask(req: AskRequest) -> dict:
    if req.category or req.tags or req.after or req.max_distance is not None:
        chunks = retrieve_filtered(
            req.question,
            k=req.k,
            category=req.category,
            tags=req.tags,
            after=req.after,
            max_distance=req.max_distance,
        )
    else:
        chunks = retrieve(req.question, k=req.k)
 
    return {
        "question": req.question,
        "answer": generate_answer(req.question, chunks),
        "sources": [
            {"source": c["source"], "chunk_index": c["chunk_index"],
             "category": c.get("category"), "tags": c.get("tags", []),
             # .get() because the unfiltered retrieve() may not return this key
             "published_at": c.get("published_at"),
             "distance": c["distance"]}
            for c in chunks
        ],
    }

A scoped call now looks like:

curl -X POST $SERVICE_URL/ask \
  -H "Content-Type: application/json" \
  -d '{
    "question": "What is the refund policy?",
    "category": "billing",
    "tags": ["refunds"],
    "after": "2026-01-01T00:00:00Z",
    "max_distance": 0.5
  }'

The unfiltered call still works exactly like before. Filters are additive and optional, and the answer payload now carries enough metadata back that callers can show "answered from billing/refunds, published 2026-03-12."
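
On the caller's side, turning the sources list into that kind of attribution line could look like the sketch below. format_attribution is a hypothetical client helper, and it assumes each source entry may carry category, tags and, when the handler includes it, an ISO 8601 published_at string; missing fields are tolerated.

```python
def format_attribution(sources: list[dict]) -> str:
    """Hypothetical client-side helper: summarise where an answer came from."""
    if not sources:
        return "no sources returned"
    top = sources[0]  # results are ordered by distance, closest first
    parts = [top.get("category") or "uncategorized", *(top.get("tags") or [])]
    published = top.get("published_at")
    # Keep only the date portion of the ISO timestamp (YYYY-MM-DD).
    suffix = f", published {published[:10]}" if published else ""
    return f"answered from {'/'.join(parts)}{suffix}"


example = [{
    "source": "billing-faq.txt",
    "category": "billing",
    "tags": ["refunds"],
    "published_at": "2026-03-12T00:00:00+00:00",
    "distance": 0.31,  # illustrative value, not a measurement
}]
print(format_attribution(example))
# answered from billing/refunds, published 2026-03-12
```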

Sidebar: Contextual Chunking

Layer 3, optional but striking when it works. The premise: a chunk in isolation often lacks the context that the surrounding document gives it. "The limit is 30 per minute" embedded on its own is ambiguous; embedded as "[Rate Limiting → Free Tier] The limit is 30 per minute" is not.

Anthropic's "contextual retrieval" technique formalizes this — before embedding each chunk, ask a small LLM to write a one-line summary of where the chunk sits in the document, and prepend that summary to the chunk's text. The embedding now encodes both the chunk's content and its document context.

A minimal implementation, slotted into the ingest pipeline:

# src/contextual_chunker.py (optional)
from google import genai
from google.genai.types import GenerateContentConfig
 
from .config import load_config
 
_cfg = load_config()
_client = genai.Client(vertexai=True, project=_cfg.project, location=_cfg.region)
 
CONTEXT_PROMPT = """Write one short sentence describing where the chunk
below sits in the broader document. The sentence will be prepended to the
chunk before embedding, to help retrieval. Be terse and specific.
 
<document>
{doc}
</document>
 
<chunk>
{chunk}
</chunk>
 
One sentence:"""
 
 
def contextualize(doc: str, chunk: str) -> str:
    response = _client.models.generate_content(
        model="gemini-flash-latest",
        contents=CONTEXT_PROMPT.format(doc=doc[:8000], chunk=chunk),
        config=GenerateContentConfig(temperature=0.0),
    )
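    # An empty response yields no lines, so guard before indexing and
    # fall back to the raw chunk.
    lines = (response.text or "").strip().splitlines()
    summary = lines[0].strip() if lines else ""
    return f"[{summary}]\n\n{chunk}" if summary else chunk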

Wire it into ingest (one extra call per chunk):

# In gather_chunks or alongside the embed() call:
content = contextualize(text, content)

Trade-offs to flag:

  • Cost: one Gemini Flash call per chunk at ingest. Cheap (fractions of a cent each) but not free. Worth caching by content hash.
  • Quality: Anthropic reports a 35% reduction in top-20 retrieval failure rate (5.7% to 3.7%) with contextual embeddings alone, and a 49% reduction (5.7% to 2.9%) when combined with contextual BM25. Those figures come from their own eval sets; your mileage will vary.
  • Re-embedding: changing the chunk text means re-embedding. Treat contextual chunking as an ingest-time decision; don't toggle it on existing data without TRUNCATE + re-ingest.

If you turn this on, leave a note in the row so you can A/B test. Add a chunking_strategy TEXT column and set it to "contextual" vs "plain", then compare retrieval quality with the same eval set across both strategies. Don't enable it everywhere without checking — for short, self-contained docs it rarely helps.
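
One simple way to run that comparison is a hit rate at K: the fraction of eval questions whose expected source shows up in the top K results. The sketch below assumes a hypothetical eval set that maps each question to the source file that should be retrieved; the data is a toy illustration, not a measurement.

```python
def hit_rate_at_k(
    results: dict[str, list[str]], expected: dict[str, str], k: int = 5
) -> float:
    """Fraction of questions whose expected source is in the top-k results."""
    if not expected:
        raise ValueError("eval set is empty")
    hits = sum(
        1 for question, want in expected.items()
        if want in results.get(question, [])[:k]
    )
    return hits / len(expected)


# Toy eval set and toy retrieval outputs, invented for illustration only.
expected = {"q1": "billing-faq.txt", "q2": "rate-limits.txt"}
plain = {"q1": ["billing-faq.txt", "intro.txt"], "q2": ["intro.txt"]}
contextual = {"q1": ["billing-faq.txt"], "q2": ["rate-limits.txt", "intro.txt"]}

print("plain     ", hit_rate_at_k(plain, expected, k=3))       # 0.5
print("contextual", hit_rate_at_k(contextual, expected, k=3))  # 1.0
```

In practice the result lists would come from running retrieve_filtered() against rows tagged with each chunking_strategy value, using the same questions and the same K for both.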

What You Have Now

  • A chunks table with three new searchable metadata columns and the right indexes
  • An ingest pipeline that picks up sidecar .meta.json files automatically
  • A retrieve_filtered() function that filters by metadata and ranks by distance in a single query
  • A /ask endpoint that takes optional filters in the request body
  • An optional contextual-chunking pattern you can layer on top when you need higher recall

The retriever still falls back to the simple top-K when no filters are passed, so nothing breaks on day one.

Next: ship it to Cloud Run.


Reference: pgvector iterative scan · Postgres array operators · Anthropic — Contextual Retrieval · GIN indexes for tags