Drop in Some Sample Data
For testing we'll use any plain-text source. Two options:
# Option A — use anything you have
cp ~/some-doc.txt data/
# Option B — grab the Wikipedia article on RAG (pure illustration)
curl -s "https://en.wikipedia.org/api/rest_v1/page/plain/Retrieval-augmented_generation" \
  > data/rag-wikipedia.txt

Two or three short files are enough to verify the pipeline. We'll add more once we know it works.
The Plan
The ingest script is ~70 lines and does exactly three things:
- Chunk: walk data/, split each file into overlapping ~400-token chunks
- Embed: batch-call text-embedding-005 for the chunks
- Insert: write (source, chunk_index, content, embedding) rows into Postgres
We're going to keep it dependency-light. No LlamaIndex, no LangChain. The chunker is 15 lines of Python.
A Tiny Chunker
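Most chunkers do one job: split on natural boundaries (sentences or paragraphs) and pack the pieces until the byte/token budget is full, with a small overlap between chunks so context isn't lost at the seams. Ours splits on blank-line paragraph breaks and measures the budget in characters.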
# src/chunker.py
"""Split text into overlapping chunks of roughly N characters."""
import re
from typing import Iterable

# Roughly: 400 tokens ≈ 1600 characters for English prose.
TARGET_CHARS = 1600
OVERLAP_CHARS = 200


def _split_paragraphs(text: str) -> list[str]:
    return [p.strip() for p in re.split(r"\n\s*\n", text) if p.strip()]


def chunk_text(text: str) -> Iterable[str]:
    paragraphs = _split_paragraphs(text)
    buf = ""
    for p in paragraphs:
        if len(buf) + len(p) + 2 <= TARGET_CHARS:
            buf = f"{buf}\n\n{p}".strip()
            continue
        if buf:
            yield buf
        # Start the new buffer with the tail of the previous one for overlap.
        tail = buf[-OVERLAP_CHARS:] if buf else ""
        buf = f"{tail}\n\n{p}".strip() if tail else p
    if buf:
        yield buf

That's it. Paragraph-aware, overlap-aware, no external dependency. It's not the smartest chunker on earth, but for prose it's perfectly fine, and you understand every line.
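To see the overlap at the seams, it can help to run the same packing logic with a tiny budget. The sketch below is a hypothetical parameterized variant: `chunk_text_sized` and its `target_chars` / `overlap_chars` arguments are illustrative names, not part of the module above. It also guards the `overlap_chars == 0` case, since `buf[-0:]` in Python returns the whole string rather than an empty one.

```python
import re
from typing import Iterator


def chunk_text_sized(text: str, target_chars: int, overlap_chars: int) -> Iterator[str]:
    """Same packing logic as chunk_text, with the budget passed in (hypothetical variant)."""
    paragraphs = [p.strip() for p in re.split(r"\n\s*\n", text) if p.strip()]
    buf = ""
    for p in paragraphs:
        # Keep packing while the paragraph (plus a blank-line separator) still fits.
        if len(buf) + len(p) + 2 <= target_chars:
            buf = f"{buf}\n\n{p}".strip()
            continue
        if buf:
            yield buf
        # buf[-0:] would be the whole buffer, so only slice when overlap is positive.
        tail = buf[-overlap_chars:] if (buf and overlap_chars > 0) else ""
        buf = f"{tail}\n\n{p}".strip() if tail else p
    if buf:
        yield buf


sample = "alpha alpha alpha\n\nbravo bravo bravo\n\ncharlie charlie"
chunks = list(chunk_text_sized(sample, target_chars=40, overlap_chars=5))
for i, chunk in enumerate(chunks):
    print(i, repr(chunk))
```

With this toy input the first two paragraphs fit in one chunk, and the second chunk begins with the last five characters of the first ("bravo") before the third paragraph. One property worth knowing: a single paragraph longer than the budget is never split, so it comes out as one oversized chunk.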
The Embedding Call
Vertex AI's text-embedding-005 takes a list of strings (up to 250 per call) and returns a list of vectors. We use it through the official google-cloud-aiplatform SDK:
# src/embeddings.py
"""Wrap Vertex AI text-embedding-005."""
from functools import lru_cache

from vertexai.language_models import TextEmbeddingInput, TextEmbeddingModel
import vertexai

from .config import load_config

EMBED_MODEL = "text-embedding-005"
EMBED_DIM = 768


@lru_cache(maxsize=1)
def _model() -> TextEmbeddingModel:
    cfg = load_config()
    vertexai.init(project=cfg.project, location=cfg.region)
    return TextEmbeddingModel.from_pretrained(EMBED_MODEL)


def embed(texts: list[str], task: str = "RETRIEVAL_DOCUMENT") -> list[list[float]]:
    """Embed a batch of texts. Use task='RETRIEVAL_QUERY' for question embedding."""
    inputs = [TextEmbeddingInput(text=t, task_type=task) for t in texts]
    response = _model().get_embeddings(inputs)
    return [r.values for r in response]

A couple of things to notice:
- task_type matters. Vertex AI embedding models produce slightly different vectors depending on whether the text is a document being indexed (RETRIEVAL_DOCUMENT) or a query being asked (RETRIEVAL_QUERY). Always pass RETRIEVAL_DOCUMENT during ingest and RETRIEVAL_QUERY at query time. Using the same task type for both is a common silent quality regression.
- lru_cache keeps the model object alive across calls so we don't reinitialize Vertex on every batch.
- EMBED_DIM = 768 matches the vector(768) column we created. If you ever change embedding models, you change this constant and migrate the column.
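Because the API accepts at most 250 strings per call, a caller that hands over a longer list has to split it first. The following is a minimal sketch of such a guard, not part of the original module: `embed_in_batches`, `MAX_TEXTS_PER_CALL`, and the `embed_fn` parameter are illustrative names, and a stand-in embedder replaces the real Vertex AI call so the snippet runs offline.

```python
from typing import Callable, Sequence

MAX_TEXTS_PER_CALL = 250  # per-call limit quoted in the text above

Vector = list[float]


def embed_in_batches(
    texts: Sequence[str],
    embed_fn: Callable[[list[str]], list[Vector]],
    batch_size: int = MAX_TEXTS_PER_CALL,
) -> list[Vector]:
    """Call embed_fn on slices of at most batch_size texts and concatenate the results."""
    if not 1 <= batch_size <= MAX_TEXTS_PER_CALL:
        raise ValueError(f"batch_size must be between 1 and {MAX_TEXTS_PER_CALL}")
    vectors: list[Vector] = []
    for start in range(0, len(texts), batch_size):
        batch = list(texts[start : start + batch_size])
        result = embed_fn(batch)
        # One vector per input keeps chunks and embeddings aligned downstream.
        if len(result) != len(batch):
            raise RuntimeError(f"expected {len(batch)} vectors, got {len(result)}")
        vectors.extend(result)
    return vectors


call_sizes: list[int] = []


def fake_embed(batch: list[str]) -> list[Vector]:
    """Stand-in for the real embedder: one 1-dimensional vector per text."""
    call_sizes.append(len(batch))
    return [[float(len(t))] for t in batch]


vectors = embed_in_batches([f"chunk {i}" for i in range(600)], fake_embed)
print(len(vectors), call_sizes)
```

In the real pipeline, `embed_fn` would be something like a wrapper around the `embed` function above. The length check is there because a mismatch between inputs and outputs would otherwise pair chunks with the wrong vectors without raising anything.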
The Ingest Script
Glue it together:
# src/ingest.py
"""Walk data/, chunk, embed, insert."""
from pathlib import Path

from .chunker import chunk_text
from .db import get_conn
from .embeddings import embed

DATA_DIR = Path("data")
BATCH_SIZE = 50  # how many chunks to embed per Vertex AI call


def gather_chunks() -> list[tuple[str, int, str]]:
    """Return a flat list of (source, chunk_index, content)."""
    out: list[tuple[str, int, str]] = []
    for path in sorted(DATA_DIR.glob("*.txt")):
        text = path.read_text(encoding="utf-8")
        for i, content in enumerate(chunk_text(text)):
            out.append((path.name, i, content))
    return out


def insert_chunks(rows: list[tuple[str, int, str, list[float]]]) -> None:
    """Insert (source, chunk_index, content, embedding) rows."""
    with get_conn() as conn:
        cur = conn.cursor()
        cur.executemany(
            """
            INSERT INTO chunks (source, chunk_index, content, embedding)
            VALUES (%s, %s, %s, %s)
            """,
            # pgvector accepts the standard string format '[1.0,2.0,...]'
            [(s, i, c, _vec_to_pg(v)) for s, i, c, v in rows],
        )
        conn.commit()


def _vec_to_pg(vec: list[float]) -> str:
    return "[" + ",".join(f"{x:.7f}" for x in vec) + "]"


def main() -> None:
    chunks = gather_chunks()
    print(f"Found {len(chunks)} chunks across {len(set(c[0] for c in chunks))} files.")
    rows: list[tuple[str, int, str, list[float]]] = []
    for start in range(0, len(chunks), BATCH_SIZE):
        batch = chunks[start : start + BATCH_SIZE]
        texts = [c[2] for c in batch]
        vectors = embed(texts, task="RETRIEVAL_DOCUMENT")
        rows.extend((s, i, c, v) for (s, i, c), v in zip(batch, vectors))
        print(f"Embedded {min(start + BATCH_SIZE, len(chunks))}/{len(chunks)}")
    insert_chunks(rows)
    print(f"Inserted {len(rows)} chunks into Postgres.")


if __name__ == "__main__":
    main()

Why _vec_to_pg?
pgvector accepts vectors as either a native binary type or the string '[1.0,2.0,3.0,...]'. The string form is fine for our throughput, doesn't need any extra adapter registration with pg8000, and survives every Postgres driver in existence. Don't optimize until you have a reason to.
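To make the string form concrete, here is a small standalone sketch of the same formatting logic. `vec_to_pg` mirrors the helper in the ingest script; `pg_to_vec` is a hypothetical inverse added only to show the round trip and is not part of the original code.

```python
def vec_to_pg(vec: list[float]) -> str:
    """Format a vector as a pgvector text literal, seven decimal places per value."""
    return "[" + ",".join(f"{x:.7f}" for x in vec) + "]"


def pg_to_vec(literal: str) -> list[float]:
    """Parse a '[a,b,c]' literal back into floats (hypothetical helper for illustration)."""
    inner = literal.strip()[1:-1]
    return [float(x) for x in inner.split(",")] if inner else []


literal = vec_to_pg([0.5, -0.25, 1.0])
print(literal)             # [0.5000000,-0.2500000,1.0000000]
print(pg_to_vec(literal))  # [0.5, -0.25, 1.0]
```

Note that the `.7f` format rounds each value to seven decimal places, so the literal is not bit-for-bit identical to the float the model returned. As far as I know pgvector stores single-precision floats anyway, so this should not matter in practice, but treat that as an assumption to check against the pgvector documentation.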
Run It
uv run python -m src.ingest

Expected output:
Found 17 chunks across 2 files.
Embedded 17/17
Inserted 17 chunks into Postgres.

Verify in psql
Connect back to the database and look:
gcloud sql connect rag-db --user=$USER_EMAIL --database=rag --quiet

-- Count rows
SELECT COUNT(*), COUNT(DISTINCT source) FROM chunks;
-- Peek at one
SELECT source, chunk_index, LEFT(content, 80) AS preview
FROM chunks
LIMIT 3;
-- Spot-check the embedding dim
SELECT vector_dims(embedding) FROM chunks LIMIT 1;

You should see your chunk count, your source filenames, the start of the chunks, and 768 for the dimension. If vector_dims returns anything other than 768, your embedding model and your column type are out of sync; fix one or the other before going further.
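The same dimension check can also run on the Python side before anything is written, so a mismatch fails fast instead of surfacing in psql afterwards. This is a minimal sketch under that assumption; `check_dims` is an illustrative name, not something the ingest script defines.

```python
EMBED_DIM = 768  # must match the vector(768) column


def check_dims(vectors: list[list[float]], expected: int = EMBED_DIM) -> None:
    """Raise ValueError if any vector's length differs from the expected dimension."""
    for idx, vec in enumerate(vectors):
        if len(vec) != expected:
            raise ValueError(f"vector {idx} has {len(vec)} dims, expected {expected}")


# A correctly sized batch passes silently.
check_dims([[0.0] * 768, [0.1] * 768])

# A vector from a different model size is rejected.
try:
    check_dims([[0.0] * 768, [0.0] * 1024])
except ValueError as err:
    message = str(err)
    print(message)
```

One place to call it would be right after the embedding step and before the insert, though where exactly it belongs is a design choice.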
On Re-Ingest
This script appends. Running it twice doubles the rows. For a real pipeline you'd want either:
- Delete-then-insert per source: DELETE FROM chunks WHERE source = %s before inserting that file's chunks.
- Upsert via a content hash: add a content_hash TEXT UNIQUE column and INSERT ... ON CONFLICT DO NOTHING.
For the blueprint we're going to keep it simple — if you re-ingest, run TRUNCATE chunks first. The companion repo includes a scripts/reset.py if you don't want to type it.
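For readers who want to try the content-hash option, the hashing half might look like the sketch below. It assumes the `content_hash TEXT UNIQUE` column has been added; `content_hash`, `with_hashes`, and `UPSERT_SQL` are illustrative names, SHA-256 is one reasonable choice of hash rather than a requirement, and the SQL string is shown but not executed here.

```python
import hashlib

# Assumes chunks has an extra column: content_hash TEXT UNIQUE
UPSERT_SQL = """
INSERT INTO chunks (source, chunk_index, content, embedding, content_hash)
VALUES (%s, %s, %s, %s, %s)
ON CONFLICT (content_hash) DO NOTHING
"""

Row = tuple[str, int, str, str]  # (source, chunk_index, content, embedding literal)


def content_hash(content: str) -> str:
    """Return the SHA-256 hex digest of the chunk text."""
    return hashlib.sha256(content.encode("utf-8")).hexdigest()


def with_hashes(rows: list[Row]) -> list[tuple[str, int, str, str, str]]:
    """Append a content hash to each row so it lines up with UPSERT_SQL's parameters."""
    return [(s, i, c, v, content_hash(c)) for s, i, c, v in rows]


rows = with_hashes([
    ("a.txt", 0, "hello", "[0.1000000]"),
    ("b.txt", 0, "hello", "[0.1000000]"),
])
print(rows[0][4] == rows[1][4])  # True: identical text hashes identically
```

The example also shows a trade-off of this design: because the hash covers only the text, two files containing the same chunk collapse to a single row. Hashing the source together with the content would keep them separate, if that is what you want.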
What You Have Now
- A working ingest pipeline that turns text files into searchable vectors
- A chunks table with real content and real embeddings inside Cloud SQL
- A clear separation: chunker, embedder, inserter, each replaceable on its own
Next: query.
Reference: Vertex AI text embeddings · pgvector data types · Task type for embeddings