Building Idempotent Scheduled Agents with the Antigravity SDK
Scheduling an Antigravity SDK agent is almost a one-liner. The hard part is making it idempotent, so that a double trigger never runs the job twice, a crash never drops a day, and the output always converges to a single result. Here is how I build idempotent scheduled agents, learned from the maintenance jobs I run as an indie developer.
The first week my scheduled agent ran fine. The problem showed up the morning I manually triggered it again during a deploy. Two copies of the same report-aggregation job ran at once, one overwrote the file the other had just written, and that day's numbers ended up being a copy of the day before.
Adding a cron expression to run an agent on a schedule is nearly a single line in the Antigravity SDK. The genuinely hard part is making the outcome converge to exactly one result, whether the agent fired once or twice.
As an indie developer running several apps, I want an agent to handle dependency bumps, crash-report triage, and AdMob report aggregation every night. Idempotency is the foundation that makes that safe.
The minimal scheduled definition
Start with the smallest form of a scheduled agent. The key point is that you separate the agent itself (what it does) from the schedule (when it runs).
```python
# schedule_agent.py
from antigravity import Agent, Schedule, run

agent = Agent(
    name="daily-report-aggregator",
    model="gemini-3.5-flash",
    instructions="""
    Aggregate the AdMob report for the given date and write it to
    output/report-<date>.json.
    If the file already exists, exit without overwriting.
    """,
)

schedule = Schedule(
    agent=agent,
    # Daily at 09:00 JST (00:00 UTC), assuming the cron fields are
    # evaluated in the timezone given below
    cron="0 9 * * *",
    timezone="Asia/Tokyo",
    # Cap a single run so a runaway job is physically stopped
    max_duration_seconds=600,
)

if __name__ == "__main__":
    run(schedule)
```
With cron and timezone set, the scheduler starts the agent each time the clock matches. max_duration_seconds looks minor but matters: it is the safety valve that kills a job that has wandered into an unexpected loop.
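A quick way to sanity-check a schedule comment is to convert the intended local time to UTC with the standard library. This is a small sketch, not SDK code: `to_utc` is a hypothetical helper, and it uses a fixed +09:00 offset for JST because Japan has no daylight saving time. How the SDK itself combines `cron` with `timezone` is not confirmed here, so treat this as a check on the arithmetic only.

```python
import datetime

# Fixed offset for Japan Standard Time (no DST, so a constant offset is enough).
JST = datetime.timezone(datetime.timedelta(hours=9), name="JST")


def to_utc(hour: int, minute: int = 0) -> datetime.time:
    """Convert a JST wall-clock time to the equivalent UTC wall-clock time."""
    local = datetime.datetime(2026, 6, 13, hour, minute, tzinfo=JST)
    return local.astimezone(datetime.timezone.utc).time()


print(to_utc(9))  # 00:00:00
```

If a scheduler reads the cron fields in the configured timezone, 09:00 JST is written as `0 9 * * *`; if it reads them in UTC, the same moment is `0 0 * * *`.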
This much is close to the official examples. The trouble started when I misread how the startup actually works.
Don't misread the "fresh session every run" assumption
Antigravity scheduled execution creates a new session on every trigger. No prior conversation history, no in-memory variables carry over. This is good design for stopping runaway state from propagating — but it also means the agent itself has no idea where the last run left off.
My first version implicitly assumed the session continued. I had written "aggregate from yesterday onward," but a fresh session has no yesterday, so the agent re-aggregated the entire range from scratch every time. The numbers were correct, but the runtime crept up day by day.
The fix is to keep state externally and re-read it on every startup.
The session is blank each time, but the checkpoint persists on disk (or KV). Tell the agent in its instructions to "read state/last_processed.json and process from the day after," and a fresh session can still resume.
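As a rough illustration of that external state, here is a minimal sketch of checkpoint helpers. The path comes from the text above; the JSON shape (`{"date": "YYYY-MM-DD"}`), the `pending_dates` helper, and the choice to start from yesterday when no checkpoint exists are all assumptions made for this example.

```python
import datetime
import json
import pathlib
from typing import Optional

# Path from the article; the file layout {"date": "YYYY-MM-DD"} is an assumption.
CHECKPOINT = pathlib.Path("state/last_processed.json")


def load_checkpoint(path: pathlib.Path = CHECKPOINT) -> Optional[datetime.date]:
    """Return the last processed date, or None on the very first run."""
    if not path.exists():
        return None
    return datetime.date.fromisoformat(json.loads(path.read_text())["date"])


def save_checkpoint(date: str, path: pathlib.Path = CHECKPOINT) -> None:
    """Persist the last processed ISO date so the next fresh session can resume."""
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(json.dumps({"date": date}))


def pending_dates(today: datetime.date, path: pathlib.Path = CHECKPOINT) -> list:
    """ISO dates from the day after the checkpoint through yesterday, oldest first."""
    last = load_checkpoint(path)
    one_day = datetime.timedelta(days=1)
    # With no checkpoint yet, process yesterday only (an arbitrary starting point).
    start = last + one_day if last else today - one_day
    days = max((today - start).days, 0)
    return [(start + datetime.timedelta(days=i)).isoformat() for i in range(days)]
```

Because the list is derived from the file on every start, a run that was skipped or crashed simply shows up as extra pending dates the next time.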
Double start, dropped work, mid-run failure. Scheduled-job accidents almost always fall into these three. Add a guard for each, in order.
1. An execution lock stops double runs
This was the direct cause of the opening incident. When a scheduled start and a manual trigger overlap, the same job runs in parallel. A minimal file lock rejects the second one.
```python
import contextlib
import os
import pathlib
import time

LOCK = pathlib.Path("state/agent.lock")


@contextlib.contextmanager
def single_run(stale_seconds: int = 1800):
    # A lock newer than stale_seconds means another run is active: reject.
    # An older lock is treated as a leftover from a crash and overwritten.
    if LOCK.exists() and time.time() - LOCK.stat().st_mtime < stale_seconds:
        raise RuntimeError("already running; skip this trigger")
    LOCK.parent.mkdir(parents=True, exist_ok=True)
    LOCK.write_text(str(os.getpid()))
    try:
        yield
    finally:
        LOCK.unlink(missing_ok=True)
```
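stale_seconds exists so that if a process dies and leaves the lock behind, you are not blocked forever. Keep it longer than max_duration_seconds (600 in the schedule above), otherwise a run that is still alive could be mistaken for a leftover. A crash that leaves only the lock file is something you will hit at least once in production.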
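One gap in a check-then-write lock is the short window in which two processes can both see "no lock" and both proceed. A possible tightening, sketched here under the assumption of a local filesystem, is to create the lock with `O_CREAT | O_EXCL` so that checking and creating are a single step. The name `single_run_atomic` is hypothetical, and the stale-lock cleanup remains best-effort.

```python
import contextlib
import os
import pathlib
import time


@contextlib.contextmanager
def single_run_atomic(lock: pathlib.Path, stale_seconds: int = 1800):
    """Hold `lock` for the duration of the block; raise if another run holds it."""
    lock.parent.mkdir(parents=True, exist_ok=True)
    # Best-effort cleanup of a leftover lock from a crashed run.
    try:
        if time.time() - lock.stat().st_mtime >= stale_seconds:
            lock.unlink(missing_ok=True)
    except FileNotFoundError:
        pass  # no lock at all, nothing to clean up
    try:
        # O_CREAT | O_EXCL fails if the file already exists, so exactly one
        # caller can create the lock; everyone else gets FileExistsError.
        fd = os.open(lock, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    except FileExistsError:
        raise RuntimeError("already running; skip this trigger") from None
    try:
        with os.fdopen(fd, "w") as handle:
            handle.write(str(os.getpid()))
        yield
    finally:
        lock.unlink(missing_ok=True)
```

Usage is the same as before: wrap the job body in `with single_run_atomic(pathlib.Path("state/agent.lock")):` and treat the `RuntimeError` as a skipped trigger rather than a failure.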
2. An output key prevents drops and duplicate writes
The most effective idea in idempotency is to tie the result to a unique key derived from the input. For a per-day report, the filename itself is the output key.
```python
import json
import pathlib


def already_done(date: str) -> bool:
    return pathlib.Path(f"output/report-{date}.json").exists()


def process(date: str):
    if already_done(date):
        return "skip"  # never process twice, however many runs
    data = aggregate_admob(date)  # aggregation helper, not shown in this article
    pathlib.Path(f"output/report-{date}.json").write_text(json.dumps(data))
    save_checkpoint(date)  # advance the checkpoint only after the write
    return "done"
```
With this shape, no matter how many times the same day is triggered, the result converges to one. Defining correctness by the set of items to process rather than the number of runs is the whole advantage of idempotent design.
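Using file existence as the "done" marker works only if a file that exists is also complete. A plain write can leave a partial file behind if the process dies mid-write, and the next run would then skip that day. One common way to close that gap, sketched here with a hypothetical `write_report_atomically` helper, is to write to a temporary file in the same directory and rename it into place.

```python
import json
import os
import pathlib
import tempfile


def write_report_atomically(path: pathlib.Path, data: dict) -> None:
    """Write JSON so `path` either does not exist or holds the complete report."""
    path.parent.mkdir(parents=True, exist_ok=True)
    # Same directory as the target, so the final rename stays on one filesystem.
    fd, tmp_name = tempfile.mkstemp(dir=path.parent, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as handle:
            json.dump(data, handle)
            handle.flush()
            os.fsync(handle.fileno())  # push the bytes to disk before renaming
        os.replace(tmp_name, path)  # atomic rename on POSIX filesystems
    except BaseException:
        # Any failure leaves no final file, so the date stays "not done".
        pathlib.Path(tmp_name).unlink(missing_ok=True)
        raise
```

With this in place of the direct write, the existence check keeps its meaning: a report file is only ever visible under its final name once it is whole.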
3. Advance the checkpoint after the work
Get the order wrong and a mid-run failure drops work. If you advance the checkpoint before the work, a crash right after marks an unprocessed date as "done," and it is never picked up again.
Always advance the checkpoint only after the write succeeds. That is exactly why save_checkpoint(date) sits after write_text in process() above. It is unglamorous, but that ordering is everything.
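To see why the ordering matters, the following sketch simulates a crash between the write and the checkpoint, then reruns the same day. Everything here is illustrative: `run_day` is a hypothetical stand-in for the real job, and unlike the process() shown earlier it also advances the checkpoint on the skip path so that a rerun repairs a checkpoint the crash left behind.

```python
import json
import pathlib
import tempfile


def run_day(root: pathlib.Path, date: str, crash_before_checkpoint: bool = False) -> str:
    """Write the report first, advance the checkpoint second."""
    report = root / "output" / f"report-{date}.json"
    checkpoint = root / "state" / "last_processed.json"
    status = "skip"
    if not report.exists():
        report.parent.mkdir(parents=True, exist_ok=True)
        report.write_text(json.dumps({"date": date}))  # step 1: the work
        status = "done"
    if crash_before_checkpoint:
        raise RuntimeError("simulated crash between write and checkpoint")
    checkpoint.parent.mkdir(parents=True, exist_ok=True)
    checkpoint.write_text(json.dumps({"date": date}))  # step 2: the checkpoint
    return status


root = pathlib.Path(tempfile.mkdtemp())
try:
    run_day(root, "2026-06-13", crash_before_checkpoint=True)
except RuntimeError:
    pass
# The report exists but the checkpoint does not, so the date is still pending.
recovered = run_day(root, "2026-06-13")
print(recovered)  # skip
```

The worst case with this ordering is one redundant existence check on the next run. With the order reversed, the same crash would leave a checkpoint pointing at a date that has no report.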
Structured logs so failure is never silent
The scariest failure in a scheduled job is not crashing — it is emitting "success" as if nothing happened. If the target set is empty, or the write quietly failed, a log that only says OK gives you nothing to notice.
On every run, record the numbers you would judge health by, in structured form.
```python
import datetime
import json
import sys


def log_run(status: str, **fields):
    record = {
        "ts": datetime.datetime.now(datetime.UTC).isoformat(),
        "agent": "daily-report-aggregator",
        "status": status,
        **fields,
    }
    print(json.dumps(record, ensure_ascii=False), file=sys.stderr)


# examples
log_run("done", date="2026-06-13", rows=412, bytes_written=18044)
log_run("skip", date="2026-06-13", reason="already_done")
log_run("empty", date="2026-06-13", rows=0)  # <- catching this is the point
```
The trick is emitting rows=0 as its own status. By not folding "succeeded but empty" into done, a weekly review only needs to search status:empty to surface anomalies. After adding this small step, I found two jobs that had been silently spinning on empty.
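If the logs are collected as one JSON object per line, the weekly review can be a few lines of Python. This is a sketch with a hypothetical `find_anomalies` helper; it assumes the record shape produced by log_run and silently ignores any line that is not JSON.

```python
import json
from typing import Iterable, Iterator


def find_anomalies(lines: Iterable[str], statuses=("empty",)) -> Iterator[dict]:
    """Yield structured log records whose status marks a suspicious run."""
    for line in lines:
        line = line.strip()
        if not line:
            continue
        try:
            record = json.loads(line)
        except json.JSONDecodeError:
            continue  # skip non-JSON noise such as tracebacks
        if isinstance(record, dict) and record.get("status") in statuses:
            yield record
```

For example, `list(find_anomalies(open("agent.log")))` would return every run that succeeded with zero rows, where `agent.log` is a placeholder for wherever stderr is captured.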
Verify behavior before it ever fires
A scheduled job runs quietly at night even when it is wrong, so mistakes are easy to miss unless you verify before deploying. Use the SDK's local execution to run a single pass by hand without waiting for cron.
```bash
# Run once for a given date, without waiting for cron
python schedule_agent.py --once --date 2026-06-13 --dry-run

# Idempotency check: run twice and see the result converge to one
python schedule_agent.py --once --date 2026-06-13
python schedule_agent.py --once --date 2026-06-13  # -> should be skip
```
--dry-run performs no real writes and only shows the log_run output. Running twice and confirming the second pass is skip is my minimum pre-deploy check. If it ends in done, done, the output-key design is still too weak.
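The same check can be scripted so it runs without touching real output. The sketch below does not use the SDK at all: `process` is a simplified stand-in for the real job with a hypothetical `dry_run` parameter, and `check_idempotent` runs it in a temporary directory.

```python
import json
import pathlib
import tempfile


def process(date: str, out_dir: pathlib.Path, dry_run: bool = False) -> str:
    """Stand-in for the real job: returns the status the run would log."""
    report = out_dir / f"report-{date}.json"
    if report.exists():
        return "skip"
    if not dry_run:
        out_dir.mkdir(parents=True, exist_ok=True)
        report.write_text(json.dumps({"date": date, "rows": 1}))  # placeholder data
    return "done"


def check_idempotent(date: str = "2026-06-13") -> list:
    """Dry run first, then two real runs; return the three statuses in order."""
    out_dir = pathlib.Path(tempfile.mkdtemp()) / "output"
    statuses = [process(date, out_dir, dry_run=True)]
    # A dry run must leave no file behind.
    assert not (out_dir / f"report-{date}.json").exists()
    statuses += [process(date, out_dir), process(date, out_dir)]
    return statuses


print(check_idempotent())  # ['done', 'done', 'skip']
```

The first "done" is the dry run reporting what it would do; the pair that matters is the last two, which should read done then skip.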
Wire pause / resume into operations
Finally, you will always need a way to stop temporarily. Deploys, incident investigation, upstream API maintenance — the moments you want to pause arrive regularly.
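```python
import pathlib
import sys

PAUSE = pathlib.Path("state/PAUSED")


def guard_paused():
    if PAUSE.exists():
        # log_run is the structured logger from the logging section
        log_run("paused", reason=PAUSE.read_text().strip() or "manual")
        sys.exit(0)  # exit normally, not as a failure
```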
Drop a state/PAUSED file to skip future starts, delete it to resume — that simple. You can also call the SDK's schedule.pause() / schedule.resume() from a script, but keeping a file-based hook alongside means you can still stop the job by hand in an emergency when the SDK API is unavailable.
Pulled together, the skeleton of an idempotent scheduled agent is: lock to prevent parallel runs, an output key to reject duplicates, a checkpoint advanced after the work, and state preserved in structured logs.
For your next step, pick one scheduled job you run today and fire --once twice in a row. If the second run does not end in skip, that job is still weak to double starts — and that is exactly where idempotency begins.