Most agents stay private. The developer uses them, maybe shares them with a team, but that's where it ends.
That's often a missed opportunity. A document analysis agent, a code review agent, a data extraction agent — these solve problems that other developers and businesses also have. Wrapping your Antigravity agent in a simple API layer and charging for access turns a personal tool into a recurring revenue stream.
Overall Architecture
Four components are required to ship a billable agent API.
API Gateway: Handles authentication, rate limiting, and request routing. This is the entry point for all external calls.
API Key Management: Issues keys to users, stores only hashed versions, tracks which plan each key belongs to.
Usage Measurement: Logs every call — tokens used, endpoint hit, user ID, timestamp — to a database.
Billing Integration: Reports accumulated usage to Stripe, triggering automatic monthly invoicing.
Build these in order. Each is independently testable before wiring to the next.
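To make the Usage Measurement component concrete, here is a minimal sketch of the record a single call might produce. The `UsageRecord` name, its fields, and the `total_tokens` helper are illustrative assumptions based on the list above, not part of any library.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
import uuid


@dataclass(frozen=True)
class UsageRecord:
    """One logged API call (hypothetical shape; adapt it to your own schema)."""
    user_id: str
    endpoint: str
    tokens_used: int
    # Unique per call so a retried write can be detected as a duplicate
    request_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))


def total_tokens(records: list[UsageRecord], user_id: str) -> int:
    """Sum the tokens logged for one user; this is what billing later reads."""
    return sum(r.tokens_used for r in records if r.user_id == user_id)


records = [
    UsageRecord("user_1", "/v1/agents/analyze", 1200),
    UsageRecord("user_1", "/v1/agents/analyze", 300),
    UsageRecord("user_2", "/v1/agents/analyze", 50),
]
print(total_tokens(records, "user_1"))  # 1500
```

Keeping the record this small makes each component testable on its own: the gateway only has to produce one of these, and billing only has to consume them.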
Step 1: API Gateway Implementation
from fastapi import FastAPI, Request, HTTPException, Depends
import hashlib
import time
import anthropic
import uuid
from collections import defaultdict
from datetime import datetime, UTC
app = FastAPI(title="AI Agent API")
claude_client = anthropic.Anthropic()
rate_limit_store: dict[str, list[float]] = defaultdict(list)
class APIKeyAuth:
    """
    Dependency class for API key authentication.
    In production, replace the in-memory store with a database lookup.
    """

    def __init__(self):
        self._keys: dict[str, dict] = {}  # key_hash -> user_info

    def register(self, api_key: str, user_id: str, plan: str) -> str:
        """Register an API key (for testing). Returns the hash."""
        key_hash = hashlib.sha256(api_key.encode()).hexdigest()
        self._keys[key_hash] = {
            "user_id": user_id,
            "plan": plan,
            "rate_limit": {"free": 5, "starter": 20, "pro": 100, "enterprise": 1000}.get(plan, 5),
        }
        return key_hash

    async def __call__(self, request: Request) -> dict:
        # Accept either "X-API-Key: <key>" or "Authorization: Bearer <key>"
        api_key = (
            request.headers.get("X-API-Key")
            or request.headers.get("Authorization", "").removeprefix("Bearer ").strip()
        )
        if not api_key:
            raise HTTPException(status_code=401, detail="API key required")
        key_hash = hashlib.sha256(api_key.encode()).hexdigest()
        user_info = self._keys.get(key_hash)
        if not user_info:
            raise HTTPException(status_code=401, detail="Invalid API key")

        # Rate limit check (sliding 60-second window)
        user_id = user_info["user_id"]
        now = time.time()
        rate_limit_store[user_id] = [t for t in rate_limit_store[user_id] if now - t < 60]
        if len(rate_limit_store[user_id]) >= user_info["rate_limit"]:
            # A slot frees up when the oldest request in the window turns 60 seconds old
            reset_at = rate_limit_store[user_id][0] + 60
            raise HTTPException(
                status_code=429,
                detail="Rate limit exceeded",
                headers={
                    "Retry-After": str(int(reset_at - now) + 1),
                    "X-RateLimit-Limit": str(user_info["rate_limit"]),
                    "X-RateLimit-Reset": str(int(reset_at) + 1),
                },
            )
        rate_limit_store[user_id].append(now)
        return user_info


auth = APIKeyAuth()
@app.post("/v1/agents/analyze")
async def analyze_document(
    request: Request,
    user_info: dict = Depends(auth),
):
    """
    Document analysis agent endpoint.
    Request: {"text": str, "task": str}
    Response: {"result": str, "tokens_used": int, "request_id": str}
    """
    try:
        body = await request.json()
    except ValueError:
        raise HTTPException(status_code=400, detail="Request body must be valid JSON")
    if not isinstance(body, dict):
        raise HTTPException(status_code=400, detail="Request body must be a JSON object")
    text = body.get("text", "")
    task = body.get("task", "summarize")
    if not text:
        raise HTTPException(status_code=400, detail="'text' field required")
    if len(text) > 100_000:
        raise HTTPException(status_code=400, detail="Text exceeds 100,000 character limit")
    request_id = str(uuid.uuid4())
    try:
        # Note: this synchronous call blocks the event loop while it runs.
        # Consider anthropic.AsyncAnthropic with `await` for production traffic.
        response = claude_client.messages.create(
            model="claude-sonnet-4-6",
            max_tokens=2048,
            system="You are a precise document analysis agent. Execute the requested task and return structured JSON output.",
            messages=[
                {
                    "role": "user",
                    "content": f"""Task: {task}
Document:
{text}
Return JSON: {{"result": "task output", "confidence": 0.0-1.0, "key_points": ["point1", "point2"]}}"""
                }
            ],
        )
    except anthropic.APIError:
        # Never expose internal error details to API consumers
        raise HTTPException(status_code=503, detail="Service temporarily unavailable")
    tokens_used = response.usage.input_tokens + response.usage.output_tokens
    # Usage is recorded inline here; move it to a background task in production
    await record_usage(user_info["user_id"], request_id, tokens_used, "/v1/agents/analyze")
    return {
        "result": response.content[0].text,
        "tokens_used": tokens_used,
        "request_id": request_id,
    }
async def record_usage(user_id: str, request_id: str, tokens: int, endpoint: str):
    """Record usage to database and queue Stripe reporting."""
    # Implement DB write + Stripe batch queue here
    print(f"USAGE: user={user_id}, tokens={tokens}, endpoint={endpoint}")
Step 2: Secure API Key Management
import secrets
import hashlib


def generate_api_key() -> tuple[str, str]:
    """
    Generate a new API key.
    Returns (raw_key, key_hash).
    Only the hash is stored in the database.
    The raw key is shown to the user exactly once.
    Never store raw keys: if the database is compromised, keys stay safe.
    """
    raw_key = f"ak_{secrets.token_urlsafe(32)}"
    key_hash = hashlib.sha256(raw_key.encode()).hexdigest()
    return raw_key, key_hash


def issue_key(db_conn, user_id: str, plan: str, stripe_customer_id: str) -> str:
    """
    Create and store a new API key for a user.
    Returns the raw key (display to user once, then discard).
    """
    raw_key, key_hash = generate_api_key()
    with db_conn.cursor() as cur:
        cur.execute("""
            INSERT INTO api_keys (
                id, user_id, key_hash, plan, stripe_customer_id, is_active, created_at
            ) VALUES (gen_random_uuid(), %s, %s, %s, %s, TRUE, NOW())
        """, (user_id, key_hash, plan, stripe_customer_id))
    db_conn.commit()
    return raw_key  # Show this to the user. Once. That's it.
# Database schema
SCHEMA = """
CREATE TABLE api_keys (
    id UUID PRIMARY KEY,
    user_id VARCHAR(255) NOT NULL,
    key_hash VARCHAR(64) UNIQUE NOT NULL,
    plan VARCHAR(50) NOT NULL DEFAULT 'free',
    stripe_customer_id VARCHAR(255),
    stripe_subscription_item_id VARCHAR(255),
    is_active BOOLEAN NOT NULL DEFAULT TRUE,
    last_used_at TIMESTAMPTZ,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE TABLE api_usage (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    user_id VARCHAR(255) NOT NULL,
    request_id VARCHAR(255) UNIQUE NOT NULL,
    endpoint VARCHAR(255) NOT NULL,
    tokens_used INTEGER NOT NULL DEFAULT 0,
    billable_units INTEGER NOT NULL DEFAULT 0,
    stripe_reported BOOLEAN NOT NULL DEFAULT FALSE,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX idx_usage_user ON api_usage(user_id, created_at);
CREATE INDEX idx_usage_unreported ON api_usage(stripe_reported) WHERE stripe_reported = FALSE;
"""
Step 3: Stripe Metered Billing
import stripe
import math
from datetime import datetime, UTC
stripe.api_key = "sk_live_..."
TOKENS_PER_UNIT = 1000 # 1 billable unit = 1,000 tokens
CENTS_PER_UNIT = 2 # $0.02 per 1,000 tokens (includes markup)
def tokens_to_units(tokens: int) -> int:
    """Convert raw token count to Stripe billing units. Always round up."""
    return math.ceil(tokens / TOKENS_PER_UNIT)
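As a worked example of the rounding rule, the sketch below contrasts rounding up on every request with rounding up once over a batch, using the same pricing as this step (1,000 tokens per unit, 2 cents per unit). The helper names `units_per_request` and `units_per_batch` are hypothetical and exist only for this illustration.

```python
import math

TOKENS_PER_UNIT = 1000  # same assumption as the constants in this step
CENTS_PER_UNIT = 2


def units_per_request(token_counts: list[int]) -> int:
    """Round each request up to a whole unit, then add the units together."""
    return sum(math.ceil(t / TOKENS_PER_UNIT) for t in token_counts)


def units_per_batch(token_counts: list[int]) -> int:
    """Add the raw tokens together first, then round the total up once."""
    return math.ceil(sum(token_counts) / TOKENS_PER_UNIT)


calls = [1200, 300, 2500]  # three requests, 4,000 tokens in total
per_request = units_per_request(calls)  # 2 + 1 + 3 = 6 units
per_batch = units_per_batch(calls)      # ceil(4000 / 1000) = 4 units
print(per_request * CENTS_PER_UNIT, per_batch * CENTS_PER_UNIT)  # 12 8
```

If the `billable_units` column is filled per row with `tokens_to_units`, the result matches the per-request variant, which charges more for many small calls. Either choice can work as long as the pricing documentation states which one applies.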
def report_usage_batch(db_conn, stripe_subscription_item_id: str, user_id: str) -> int:
    """
    Reports unreported usage to Stripe.
    Use FOR UPDATE SKIP LOCKED to prevent duplicate reporting from concurrent workers.
    Returns number of records processed.
    """
    with db_conn.cursor() as cur:
        # Row locks are held until the commit or rollback below
        cur.execute("""
            SELECT id, billable_units
            FROM api_usage
            WHERE user_id = %s AND stripe_reported = FALSE
            ORDER BY created_at ASC
            LIMIT 200
            FOR UPDATE SKIP LOCKED
        """, (user_id,))
        rows = cur.fetchall()
        if not rows:
            return 0
        total_units = sum(r[1] for r in rows)
        try:
            # Note: usage records belong to Stripe's legacy metered billing;
            # check that the API version you target still supports them.
            stripe.SubscriptionItem.create_usage_record(
                stripe_subscription_item_id,
                quantity=total_units,
                timestamp=int(datetime.now(UTC).timestamp()),
                action="increment",  # Critical: use increment, not set
            )
            # Caveat: if the commit below fails after Stripe accepted the record,
            # these rows will be reported again on the next retry.
            cur.execute("""
                UPDATE api_usage SET stripe_reported = TRUE
                WHERE id = ANY(%s)
            """, ([r[0] for r in rows],))
            db_conn.commit()
            return len(rows)
        except stripe.error.StripeError:
            db_conn.rollback()
            raise  # Let the batch job retry
def setup_metered_price() -> dict:
    """One-time setup: create Stripe product and metered price."""
    product = stripe.Product.create(name="AI Agent API (Usage-Based)")
    price = stripe.Price.create(
        product=product.id,
        currency="usd",
        billing_scheme="per_unit",
        unit_amount=CENTS_PER_UNIT,
        recurring={"interval": "month", "usage_type": "metered", "aggregate_usage": "sum"},
    )
    return {"product_id": product.id, "price_id": price.id}
Step 4: Plan Design and SLAs
from dataclasses import dataclass


@dataclass
class Plan:
    name: str
    monthly_usd: int
    included_tokens: int
    rate_limit_per_minute: int
    max_input_tokens: int
    uptime_sla: float  # 99.0 means 99.0%
    support: str  # none / email / priority / dedicated
    concurrent_requests: int


PLANS = {
    "free": Plan("Free", 0, 10_000, 5, 4_000, 99.0, "none", 1),
    "starter": Plan("Starter", 29, 100_000, 20, 20_000, 99.5, "email", 5),
    "pro": Plan("Pro", 99, 500_000, 100, 100_000, 99.9, "priority", 20),
    "enterprise": Plan("Enterprise", 0, 0, 1000, 200_000, 99.95, "dedicated", 100),
}
Three Failure Modes
Storing raw API keys in the database: If your database is ever compromised, every customer's key is exposed and must be rotated. Store only the SHA-256 hash. The raw key is shown exactly once — at issuance — and never stored.
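A minimal sketch of the hash-only approach follows, covering both issuance and the later lookup. The `hash_key` and `verify_key` helpers are hypothetical names; `hmac.compare_digest` is used for the comparison so that it runs in constant time.

```python
import hashlib
import hmac
import secrets


def hash_key(raw_key: str) -> str:
    """SHA-256 hex digest of the key; this is the only value that gets stored."""
    return hashlib.sha256(raw_key.encode()).hexdigest()


def verify_key(presented_key: str, stored_hash: str) -> bool:
    """Hash the presented key and compare it to the stored hash in constant time."""
    return hmac.compare_digest(hash_key(presented_key), stored_hash)


# Issuance: show raw_key to the user once, persist only stored_hash
raw_key = f"ak_{secrets.token_urlsafe(32)}"
stored_hash = hash_key(raw_key)

print(verify_key(raw_key, stored_hash))     # True
print(verify_key("ak_wrong", stored_hash))  # False
```

An unsalted SHA-256 is reasonable here only because the keys are long random strings. It would not be an appropriate way to store user-chosen passwords.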
Leaking internal errors to API consumers: A raw exception trace from anthropic.APIError can reveal your model name, prompts, and internal structure. Catch all AI provider exceptions at the gateway layer and return a generic HTTP 503 response. Log the details internally, alongside the request ID, so a failure can be traced without exposing it.
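One way to keep the two audiences separate is a small translation function: full detail goes to the log, and only a generic body goes to the caller. This is a sketch under assumptions: `UpstreamError` stands in for a provider exception such as anthropic.APIError, and `to_public_error` and the `agent_api` logger name are hypothetical.

```python
import logging
import uuid

logger = logging.getLogger("agent_api")  # hypothetical logger name


class UpstreamError(Exception):
    """Stand-in for a provider exception such as anthropic.APIError."""


def to_public_error(exc: Exception, request_id: str) -> tuple[int, dict]:
    """Log the full exception internally and return a generic status and body."""
    logger.error("upstream failure request_id=%s: %r", request_id, exc)
    body = {"detail": "Service temporarily unavailable", "request_id": request_id}
    return 503, body


request_id = str(uuid.uuid4())
try:
    # Simulate a provider failure whose message contains internal details
    raise UpstreamError("model=internal-name prompt=secret system prompt")
except UpstreamError as exc:
    status, body = to_public_error(exc, request_id)

print(status, body["detail"])  # 503 Service temporarily unavailable
```

Returning the request ID in the body gives the customer something to quote in a support ticket, which can then be matched against the internal log line.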
Promising SLAs without monitoring: Committing to 99.9% uptime requires knowing when you're down. Set up external uptime monitoring (UptimeRobot's free tier covers this) before launch, not after the first incident.
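To see what each SLA figure actually commits you to, the arithmetic below converts an uptime percentage into allowed downtime. It assumes a 30-day month; `allowed_downtime_minutes` is a hypothetical helper, and the percentages are the ones from the plan table in Step 4.

```python
def allowed_downtime_minutes(uptime_percent: float, days: int = 30) -> float:
    """Minutes of downtime permitted in the period at the given uptime percentage."""
    total_minutes = days * 24 * 60  # 43,200 minutes in a 30-day month
    return total_minutes * (1 - uptime_percent / 100)


for sla in (99.0, 99.5, 99.9, 99.95):
    print(f"{sla}% -> {allowed_downtime_minutes(sla):.1f} min per 30 days")
# 99.0% -> 432.0 min per 30 days
# 99.5% -> 216.0 min per 30 days
# 99.9% -> 43.2 min per 30 days
# 99.95% -> 21.6 min per 30 days
```

At 99.9%, a single 45-minute outage already breaches the monthly budget, which is why the monitoring has to exist before the promise does.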
Pre-Launch Checklist
Before making the API public:
Confirm that API keys are stored only as hashes, never in plaintext.
Verify that rate limiting enforces the correct limit for each plan.
Test that a usage record appears in the database after each request.
Confirm that the Stripe batch reporting job processes records and marks them as reported.
Verify that error responses don't expose internal information.
Set up uptime monitoring with alerting.
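The rate-limiting item on the checklist can be exercised without running a server. The sketch below re-implements the 60-second sliding window as a pure function with an injected clock and checks each plan's limit. The `allow` and `check_plan` names are hypothetical, and the limits mirror the ones used in Step 1.

```python
from collections import defaultdict

PLAN_LIMITS = {"free": 5, "starter": 20, "pro": 100, "enterprise": 1000}
WINDOW_SECONDS = 60


def allow(store: dict, user_id: str, limit: int, now: float) -> bool:
    """Sliding-window check: True if the request is admitted at time `now`."""
    recent = [t for t in store[user_id] if now - t < WINDOW_SECONDS]
    admitted = len(recent) < limit
    if admitted:
        recent.append(now)
    store[user_id] = recent
    return admitted


def check_plan(plan: str) -> bool:
    """Run the three checks for one plan; raises AssertionError on failure."""
    store = defaultdict(list)
    limit = PLAN_LIMITS[plan]
    # Every request up to the limit is admitted
    assert all(allow(store, "u", limit, now=0.0) for _ in range(limit))
    # The next request inside the window is rejected
    assert not allow(store, "u", limit, now=1.0)
    # Once the window has passed, requests are admitted again
    assert allow(store, "u", limit, now=60.0)
    return True


results = {plan: check_plan(plan) for plan in PLAN_LIMITS}
print(results)
```

Injecting `now` instead of calling `time.time()` keeps the test instant and deterministic. The same idea applies if the limiter later moves to a shared store.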
Each of these is an hour of work at most. All of them matter before the first paying customer arrives.