What's the Concept?
The pipeline so far ends at a BigQuery table. The agent doesn't query BigQuery directly — it calls a tool. This final lesson builds that tool: a small Cloud Run service whose only job is to convert "a question from the agent" into "a ranked list of relevant doc chunks."
The tool implements the retrieval contract: a stable name, a typed input schema, a typed output schema, a freshness SLA, and a cost ceiling. Everything we set up in Modules 04–07 funnels into this last service.
How It Works
The Cloud Run service, in full:
```python
# search_docs/main.py
import os, time

from flask import Flask, request, jsonify
from google.cloud import bigquery
from pydantic import BaseModel, Field, ValidationError

PROJECT = os.environ["GCP_PROJECT"]
EMBED_MODEL = f"{PROJECT}.embedding_models.text_embedding_005"
CHUNKS_TABLE = f"{PROJECT}.gold.docs_chunks"
MAX_BYTES = 50_000_000  # 50 MB cost ceiling per call

bq = bigquery.Client(project=PROJECT)
app = Flask(__name__)


class SearchRequest(BaseModel):
    query: str = Field(..., min_length=2, max_length=500)
    top_k: int = Field(5, ge=1, le=20)
    section: str | None = None  # optional filter by docs section


class Chunk(BaseModel):
    chunk_id: str
    doc_path: str
    title: str | None
    summary: str | None
    chunk_text: str
    distance: float


class SearchResponse(BaseModel):
    chunks: list[Chunk]
    query_embedded_ms: int
    search_ms: int
    # Pydantic treats leading-underscore names as private attributes, so the
    # contract's `_refreshed_at` field is emitted via a serialization alias.
    refreshed_at: str = Field(serialization_alias="_refreshed_at")


@app.route("/search_docs", methods=["POST"])
def search_docs():
    try:
        req = SearchRequest(**request.get_json(force=True))
    except ValidationError as e:
        return jsonify({"error": "bad_request", "details": e.errors()}), 400

    section_clause = ""
    params = [
        bigquery.ScalarQueryParameter("query_text", "STRING", req.query),
        bigquery.ScalarQueryParameter("top_k", "INT64", req.top_k),
    ]
    if req.section:
        section_clause = "AND STARTS_WITH(doc_path, @section)"
        params.append(
            bigquery.ScalarQueryParameter("section", "STRING", req.section)
        )

    t0 = time.monotonic()
    sql = f"""
    WITH q AS (
      SELECT ml_generate_embedding_result AS qv
      FROM ML.GENERATE_EMBEDDING(
        MODEL `{EMBED_MODEL}`,
        (SELECT @query_text AS content)
      )
    ),
    candidates AS (
      SELECT * FROM `{CHUNKS_TABLE}`
      WHERE 1=1 {section_clause}
    )
    SELECT
      base.chunk_id,
      base.doc_path,
      base.title,
      base.summary,
      base.chunk_text,
      distance,
      base._chunked_at AS refreshed_at
    FROM VECTOR_SEARCH(
      TABLE candidates,
      'embedding',
      TABLE q,
      query_column_to_search => 'qv',  -- q's column isn't named 'embedding'
      top_k => @top_k,
      distance_type => 'COSINE'
    )
    ORDER BY distance ASC
    """
    job_config = bigquery.QueryJobConfig(
        query_parameters=params,
        maximum_bytes_billed=MAX_BYTES,
    )
    rows = list(bq.query(sql, job_config=job_config).result())
    t1 = time.monotonic()

    chunks = [
        Chunk(
            chunk_id=r.chunk_id,
            doc_path=r.doc_path,
            title=r.title,
            summary=r.summary,
            chunk_text=r.chunk_text,
            distance=float(r.distance),
        )
        for r in rows
    ]
    refreshed_at = max((r.refreshed_at.isoformat() for r in rows), default="")
    resp = SearchResponse(
        chunks=chunks,
        query_embedded_ms=int((t1 - t0) * 1000),  # rough; embedding runs inside BQ
        search_ms=int((t1 - t0) * 1000),
        refreshed_at=refreshed_at,
    )
    return jsonify(resp.model_dump(by_alias=True)), 200
```

The corresponding tool definition the agent sees:
```python
TOOLS = [
    {
        "name": "search_docs",
        "description": (
            "Search Brain Drip's product documentation. Use this whenever "
            "the user asks about how the product works, how to configure "
            "something, or what a feature does. Returns ranked passages "
            "from the docs with their source path."
        ),
        "input_schema": {
            "type": "object",
            "properties": {
                "query": {
                    "type": "string",
                    "description": "What to search for, in natural language.",
                    "minLength": 2,
                    "maxLength": 500,
                },
                "top_k": {
                    "type": "integer",
                    "description": "How many chunks to return (default 5, max 20).",
                    "default": 5,
                    "minimum": 1,
                    "maximum": 20,
                },
                "section": {
                    "type": "string",
                    "description": "Optional path prefix to filter results, e.g. 'docs/admin/'.",
                },
            },
            "required": ["query"],
        },
    }
]
```

That tool definition plus the Cloud Run service is the full agent interface. Whatever agent framework you use — Claude with tool use, Vertex AI Gemini with function calling, LangGraph, the OpenAI Agents SDK — points at this endpoint with this schema, and you're done.
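Wiring that up in any runtime comes down to translating the model's tool-use arguments into an authenticated POST. A minimal sketch, where `SERVICE_URL` is a placeholder for whatever URL Cloud Run assigned and the identity token comes from `gcloud auth print-identity-token` or `google.auth` in production:

```python
import json

SERVICE_URL = "https://search-docs-abc123-uc.a.run.app/search_docs"  # placeholder

def build_tool_request(tool_input: dict, token: str) -> tuple[str, dict, str]:
    """Turn the agent's tool-use input into (url, headers, body)."""
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
    }
    # Pass the agent's arguments through unchanged; the service's own
    # Pydantic schema is the source of truth for validation.
    body = json.dumps({
        "query": tool_input["query"],
        "top_k": tool_input.get("top_k", 5),
        **({"section": tool_input["section"]} if "section" in tool_input else {}),
    })
    return SERVICE_URL, headers, body

# At runtime you'd POST this with requests/httpx and hand the JSON
# response back to the model as the tool result:
#   url, headers, body = build_tool_request(tool_use_input, token)
#   result = requests.post(url, headers=headers, data=body, timeout=30).json()
```

The dispatch loop itself (reading tool-use blocks, returning tool results) is framework-specific; only this translation step is common to all of them.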
Why It Matters
- The pipeline becomes operationally useful. Every preceding module led to this; without it, the warehouse is just a warehouse.
- The contract closes the loop. The agent's behavior is now a function of: the docs you ingest + the chunking + the embedding model + this tool's input/output shape. Each is independently versionable.
- You can test the whole stack with `curl`. No agent framework needed for development; the tool is just an HTTP endpoint.
Deployment + Verification
```shell
gcloud run deploy search-docs \
  --source . \
  --region us-central1 \
  --service-account agent-runtime-sa@myco-prod.iam.gserviceaccount.com \
  --set-env-vars GCP_PROJECT=myco-prod \
  --no-allow-unauthenticated \
  --concurrency 80 \
  --memory 512Mi \
  --timeout 30

# Smoke test
TOKEN=$(gcloud auth print-identity-token)
curl -X POST https://search-docs-<hash>-uc.a.run.app/search_docs \
  -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" \
  -d '{"query": "how do I configure the search index?", "top_k": 3}'
```

You should see three JSON objects back, each a chunk with a distance score under 0.6 if the corpus actually contains the answer. Hook those JSON shapes into your agent's tool runtime and the agent can answer the question.
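To make that smoke test machine-checkable rather than eyeballed, a few lines of Python can assert the response shape. A sketch (the 0.6 threshold is this lesson's heuristic for a corpus that contains the answer, not a universal constant):

```python
def smoke_check(response: dict, max_distance: float = 0.6) -> None:
    """Fail loudly if the tool's response violates the expected shape."""
    chunks = response["chunks"]
    assert chunks, "no chunks returned -- is the corpus populated?"
    distances = [c["distance"] for c in chunks]
    assert distances == sorted(distances), "results not ranked by distance"
    assert distances[0] < max_distance, f"best match too far: {distances[0]}"
    for c in chunks:
        assert c["chunk_id"] and c["doc_path"] and c["chunk_text"]

# Example against a canned response:
smoke_check({
    "chunks": [
        {"chunk_id": "c1", "doc_path": "docs/admin/search.md",
         "chunk_text": "To configure the index...", "distance": 0.31},
        {"chunk_id": "c2", "doc_path": "docs/admin/ops.md",
         "chunk_text": "Index settings live in...", "distance": 0.44},
    ],
})
```

Pipe the `curl` output into this and the smoke test becomes a CI gate instead of a manual step.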
Key Technical Details
- The `maximum_bytes_billed=50_000_000` (50 MB) bound caps BigQuery cost per call at a fraction of a cent at on-demand rates. Predictable.
- Cloud Run scales to zero when idle; cold starts add ~500 ms latency on the first request after a quiet period. For latency-sensitive applications, set `--min-instances=1` to keep one warm.
- The service uses `--no-allow-unauthenticated` and requires a Google-signed identity token, the same pattern used between the agent runtime and any GCP service.
- All output goes through Pydantic validation. A schema-violating BigQuery response (which shouldn't happen, but could) fails closed rather than passing junk to the agent.
Common Misconceptions
"Skip the typed schema — JSON is JSON." Skipping validation is how you ship breakage. The schema is the contract; check it both directions.
"The tool should do its own auth and tenant filtering." It absolutely should, in real deployments — pass tenant ID from the agent's identity, filter `WHERE tenant_id = @caller_tenant` in the SQL. This capstone omits multi-tenancy for clarity; production should not.
"This is over-engineered for a docs search." The pieces — typed schema, parameterized query, cost ceiling, named service account, identity-token auth — are the minimum production-grade shape. Stripping any of them is how prototypes turn into incidents.
Connections to Other Concepts
- Course `05-serving-data-to-agents/04-the-retrieval-contract-between-pipeline-and-agent` — The contract pattern this lesson implements.
- Course `07-operating-the-system/01-observability-and-data-quality-monitoring` — Hooking the tool's metrics into the broader observability stack.
- Course `07-operating-the-system/03-iam-and-security-for-agent-data-paths` — The IAM topology this deployment relies on.
Further Reading
- Anthropic's tool-use cookbook + the OpenAI Agents SDK reference — Two of the most common agent runtimes; both call tools that look exactly like this one.
- "Vertex AI Agent Builder" docs — Google's first-party agent framework; same tool-call shape.
- Brain Drip course "Building a Multi-Skill AI Agent" — The agent-side counterpart to this data-side capstone.