For the first few weeks after I put an Antigravity-built agent into production, my Grafana dashboard stayed green. Total cost was smooth, P95 latency sat inside the threshold, and the error rate was essentially zero. And yet the month-end bill came in at 1.6× my estimate, and one feature's answers had visibly gotten sloppier.
Totals dissolve problems into their average. When you're a solo or indie developer running several features through a single API key, that property bites especially hard. Here I want to write down, in the order I actually rebuilt them, the instrumentation pieces that let me catch the two drifts that move underneath the totals: the billing drift and the quality drift.
A total tells you how much, not who is eating it
The first monitoring I built just summed token counts and cost by model. That answers "how much did we spend," but not "where did it grow." In production, cost skews hard by feature and by tenant. The summarize feature might eat 60% of the total; one particular customer might burn 20× the average. The aggregate graph flattens all of that out.
So I changed the unit of instrumentation from "model" to "feature × tenant × prompt version." Every OpenTelemetry span and metric carries those three as attributes. As long as the attributes are present, you can pivot to any slice later in PromQL.
# llm_telemetry.py
import time
import anthropic
from opentelemetry import trace, metrics
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter
_tp = TracerProvider()
_tp.add_span_processor(BatchSpanProcessor(OTLPSpanExporter(endpoint="http://otel-collector:4317")))
trace.set_tracer_provider(_tp)
metrics.set_meter_provider(MeterProvider(metric_readers=[
    PeriodicExportingMetricReader(
        OTLPMetricExporter(endpoint="http://otel-collector:4317"),
        export_interval_millis=30_000,
    )
]))
tracer = trace.get_tracer("llm-app")
meter = metrics.get_meter("llm-app")
tokens = meter.create_counter("llm.tokens", description="tokens by direction")
cost = meter.create_counter("llm.cost_usd", unit="USD", description="API cost in USD")
latency = meter.create_histogram("llm.latency_ms", unit="ms", description="end-to-end latency")
errors = meter.create_counter("llm.errors", description="error count by type")
# Prices are $/1M tokens. Revisit on every model change (see below).
MODEL_PRICES = {
    "claude-sonnet-4-6": {"in": 3.0, "out": 15.0},
    "claude-haiku-4-5-20251001": {"in": 0.25, "out": 1.25},
    "claude-opus-4-8": {"in": 5.0, "out": 25.0},
}
class TelemetryClient:
    """Wrapper that attributes telemetry by feature, tenant, and prompt version."""

    def __init__(self):
        self._client = anthropic.Anthropic()

    def call(self, *, model, messages, feature, tenant, prompt_version,
             max_tokens=2048, system=None, role="product"):
        # role="product" is a live response; role="eval" is quality scoring
        # (the key that keeps the two budgets separate).
        attrs = {
            "feature": feature, "tenant": tenant,
            "prompt_version": prompt_version, "model": model, "role": role,
        }
        with tracer.start_as_current_span("llm.call") as span:
            for k, v in attrs.items():
                span.set_attribute(k, v)
            t0 = time.monotonic()
            try:
                kwargs = {"model": model, "max_tokens": max_tokens, "messages": messages}
                if system:
                    kwargs["system"] = system
                res = self._client.messages.create(**kwargs)
                ms = (time.monotonic() - t0) * 1000
                tin, tout = res.usage.input_tokens, res.usage.output_tokens
                # Models missing from the table fall back to the Sonnet-level price.
                price = MODEL_PRICES.get(model, {"in": 3.0, "out": 15.0})
                usd = tin / 1e6 * price["in"] + tout / 1e6 * price["out"]
                tokens.add(tin, {**attrs, "direction": "in"})
                tokens.add(tout, {**attrs, "direction": "out"})
                cost.add(usd, attrs)
                latency.record(ms, attrs)
                span.set_attribute("llm.cost_usd", usd)
                span.set_attribute("llm.latency_ms", ms)
                return {"text": res.content[0].text, "cost_usd": usd,
                        "latency_ms": ms, "in": tin, "out": tout}
            except Exception as e:
                errors.add(1, {**attrs, "error_type": type(e).__name__})
                span.record_exception(e)
                span.set_status(trace.Status(trace.StatusCode.ERROR, str(e)))
                raise

I include prompt_version as an attribute because it pays off in the quality tracking later. I want to be able to cut cost and quality at the version boundary and see how both moved the moment I changed a prompt. The version string can be a hash of the prompt template or a short hand-assigned identifier like chat-v7. What matters is that "when did I change what" survives as an axis on the metrics.
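If you go the hash route, something like the following could work. This is a minimal sketch, not what my production code does verbatim: the helper name prompt_version_for, the label argument, the 8-character digest length, and the example template are all assumptions made for illustration.

```python
import hashlib


def prompt_version_for(template: str, label: str = "") -> str:
    """Derive a short, stable version string from a prompt template.

    Hypothetical helper: hash the *template* (before variables are filled in),
    so the version changes only when the prompt itself changes.
    """
    # Normalize line endings and outer whitespace so cosmetic edits
    # (a trailing newline, CRLF vs LF) do not mint a new version.
    normalized = template.replace("\r\n", "\n").strip()
    digest = hashlib.sha256(normalized.encode("utf-8")).hexdigest()[:8]
    return f"{label}-{digest}" if label else digest


# Hypothetical template, for illustration only.
SUMMARIZE_TEMPLATE = "Summarize the following text in three bullet points:\n\n{text}"
SUMMARIZE_VERSION = prompt_version_for(SUMMARIZE_TEMPLATE, label="summarize")
```

The resulting string is what gets passed as prompt_version on every call. A hash never forgets to change when the template does, while a hand-assigned name like chat-v7 is easier to read on a dashboard, so which one fits is a matter of taste.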
With attributes in place, the queries that expand cost by feature or tenant are straightforward to write.
# Cost per feature over the last hour (which feature is eating the budget)
sum(increase(llm_cost_usd_total{role="product"}[1h])) by (feature)
# Top tenant consumers (spot a single customer running away)
topk(5, sum(increase(llm_cost_usd_total[24h])) by (tenant))

Eval-model spend sneaks into production cost
To score quality automatically with an LLM-as-a-Judge, the evaluation itself calls the API. At first I didn't separate this, so the eval spend rode on top of my production cost graph, and I couldn't tell whether "production is expensive" or "I'm running too many evaluations." That's why I split it out with role="eval". Evaluation runs on a cheap model, and in PromQL I look at production cost alone with role="product".
Scoring every request inflates the eval budget on its own, so I made it a rolling sample. Instead of grading everything every minute, I pull a fixed fraction per feature and evaluate that. This keeps eval spend at a few percent of production cost while still capturing trend changes.
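To get a feel for how sample rate and model choice drive the eval budget, here is a back-of-the-envelope sketch. The per-1M-token prices are the ones from the MODEL_PRICES table earlier in this post; the token counts per request are hypothetical numbers I picked for illustration, and the helper names are not part of the telemetry module.

```python
def request_cost(tokens_in: int, tokens_out: int,
                 price_in: float, price_out: float) -> float:
    """Cost of one call in USD; prices are in $ per 1M tokens."""
    return tokens_in / 1e6 * price_in + tokens_out / 1e6 * price_out


def eval_share(sample_rate: float, product_cost: float, eval_cost: float) -> float:
    """Expected eval spend as a fraction of production spend."""
    return sample_rate * eval_cost / product_cost


# Hypothetical production request: 2,000 tokens in, 500 out, Sonnet-level price.
product = request_cost(2_000, 500, price_in=3.0, price_out=15.0)

# The judge reads the question, the context, and the answer, so its input is
# roughly the production input plus output plus the rubric. Haiku-level price.
judge = request_cost(2_700, 100, price_in=0.25, price_out=1.25)

share = eval_share(0.05, product, judge)
print(f"eval spend is about {share:.2%} of production spend")
```

The share scales linearly with the sample rate and with the judge's price, so grading every request, or grading with the same model that serves production, multiplies it accordingly.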
# quality_sampler.py
import json
import random
from dataclasses import dataclass

from opentelemetry import metrics

# Create the instrument once at import time rather than on every scored request.
quality = metrics.get_meter("llm-app").create_histogram(
    "llm.quality_overall", description="judge overall score, 0.0-1.0")


@dataclass
class Score:
    relevance: float
    grounded: float  # is it grounded in the reference (for RAG)
    overall: float
    note: str


class QualitySampler:
    """Rolling-sample scoring, aggregated by prompt version."""

    def __init__(self, telemetry, sample_rate=0.05, eval_model="claude-haiku-4-5-20251001"):
        self.t = telemetry
        self.rate = sample_rate
        self.eval_model = eval_model

    def maybe_score(self, *, question, answer, context, feature, tenant, prompt_version):
        if random.random() >= self.rate:
            return None  # not sampled this time
        ctx = f"\n\nReference:\n{context}" if context else ""
        prompt = (
            "Grade the following answer as JSON only. "
            "Each field is 0.0-1.0.\n"
            f"Question: {question}{ctx}\n\nAnswer: {answer}\n\n"
            '{"relevance": fit, "grounded": faithfulness to reference, '
            '"overall": overall, "note": "one-sentence remark"}'
        )
        res = self.t.call(
            model=self.eval_model,
            messages=[{"role": "user", "content": prompt}],
            feature=feature, tenant=tenant, prompt_version=prompt_version,
            role="eval", max_tokens=300,
        )
        try:
            score = Score(**json.loads(res["text"]))
        except (json.JSONDecodeError, TypeError):
            # The judge returned non-JSON or unexpected fields; skip this sample.
            return None
        # Push quality into metrics too, so it can be tracked per version.
        quality.record(score.overall, {"feature": feature, "prompt_version": prompt_version})
        return score

Aggregating scores by prompt_version makes the start of a quality regression appear right at the version boundary. That becomes the foundation for catching the nastiest drift of all.
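As an offline illustration of what aggregating by version means, the sketch below groups sampled scores by prompt_version and compares two versions. It is an assumption-laden toy, not my dashboard logic: the function names, the 0.05 drop threshold, the minimum of 20 samples, and the chat-v6/chat-v7 scores are all hypothetical.

```python
from collections import defaultdict
from statistics import mean


def mean_by_version(samples):
    """samples: iterable of (prompt_version, overall_score) pairs.

    Returns {version: (mean_score, sample_count)}.
    """
    buckets = defaultdict(list)
    for version, overall in samples:
        buckets[version].append(overall)
    return {v: (mean(scores), len(scores)) for v, scores in buckets.items()}


def regressed(stats, old, new, max_drop=0.05, min_samples=20):
    """True when `new` averages more than `max_drop` below `old`
    and both versions have at least `min_samples` scored answers."""
    old_mean, old_n = stats[old]
    new_mean, new_n = stats[new]
    if min(old_n, new_n) < min_samples:
        return False  # too few samples to call it either way
    return old_mean - new_mean > max_drop


# Hypothetical sampled scores around a prompt change.
samples = [("chat-v6", 0.85)] * 30 + [("chat-v7", 0.70)] * 30
stats = mean_by_version(samples)
print(regressed(stats, old="chat-v6", new="chat-v7"))
```

The minimum-sample guard matters with a rolling sample: at a 5% rate, a freshly deployed version has very few scored answers for a while, and a mean over a handful of judge scores is mostly noise.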