Piping Antigravity CLI's JSON Lines Output Into My Own Script — Notes on Partial Lines and Exit Codes
Feeding Antigravity CLI's machine-readable JSON Lines output into my own script broke with JSONDecodeError roughly one run in three. Here are the three traps — partial lines, exit codes, and stderr bleed — and the line-buffered consumer that finally made it stable.
I wired up a small pipeline: take the output of the Antigravity CLI (agy), run it through a quality check, and only then hand it to the next step. For the first few days it died with json.JSONDecodeError: Expecting value about one run in three. The agent itself was running fine — it was the code receiving its output that broke. The cause wasn't the agent or the model. It was how I was reading the stream.
You never notice this while watching the CLI interactively. It only surfaces the moment you start consuming the output by machine. Since plenty of people hit the same wall, let me walk through what was actually happening and how I fixed it.
Machine-readable output is line-based, but lines arrive in pieces
Separate from its human-friendly display, the Antigravity CLI can emit a machine-readable stream of events. The flag name varies by version (check agy --help), but it's usually JSON Lines (NDJSON) — one JSON object per line. Events like agent_started, tool_call, and agent_completed stream out a line at a time as the agent progresses.
My first misconception lived right here. "If it's one JSON per line, I'll just split the blob on newlines and call json.loads." That works fine if you receive everything in one batch. The trouble began the instant I started reading the stream incrementally, because I wanted to watch progress live.
Pipes and streams make no promise about "lines." If you read in fixed-size chunks like read(4096), you get a blob that's cut off mid-line. That line stays incomplete until the rest arrives on the next read. The code I first wrote stepped right off that assumption.
import jsonimport subprocessproc = subprocess.Popen( ["agy", "run", "--json", "task.md"], stdout=subprocess.PIPE, text=True,)# ❌ a single read can hand you only the first half of a linewhile True: chunk = proc.stdout.read(4096) if not chunk: break for line in chunk.split("\n"): event = json.loads(line) # partial / blank lines raise JSONDecodeError handle(event)
The last element of chunk.split("\n") is usually the front half of a line that belongs with the start of the next chunk. Pass it to json.loads on its own and it breaks, of course. Blank lines (the empty string produced by a trailing newline) fail the same way. Because the error showed up probabilistically, I first blamed the model for "occasionally emitting junk" and took the long way around before reaching the real cause.
The simplest correct consumer iterates one line at a time
A Python text-mode stream guarantees complete lines when you iterate it with for line in stream:. If you'd rather not think about partial chunks at all, moving to this shape is the shortest path. While I was at it, I built in line buffering (bufsize=1) and stderr separation from the start.
import jsonimport subprocessimport sysdef consume(cmd): """Consume the CLI's JSON Lines output one safe line at a time.""" proc = subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, # keep progress logs out of stdout text=True, bufsize=1, # line buffering ) events = [] for raw in proc.stdout: # one line at a time, always complete line = raw.strip() if not line: # skip blank lines continue try: events.append(json.loads(line)) except json.JSONDecodeError: # non-JSON lines (startup banners, etc.) get dropped, but recorded sys.stderr.write(f"skip non-json line: {line[:120]}\n") code = proc.wait() return code, events
for raw in proc.stdout: is the key. That alone makes the partial-line problem disappear. We strip() whitespace and newlines, drop blank lines, and discard non-JSON lines while logging them to stderr. If you swallow the exception with a bare pass, you fall into a different swamp later — "why are some events missing?" Even when you drop a line, leave a trace that you dropped it. That one is a note to myself.
✦
Thank you for reading this far.
Continue Reading
What follows includes implementation code, benchmarks, and practical content we hope you'll find useful. This site runs without ads — server and development costs are supported entirely by members like you. If it's been helpful, we'd be truly grateful for your support.
WHAT YOU'LL LEARN
✦You'll be able to stop JSON parsing from breaking on lines that arrive half-finished, by buffering on line boundaries and carrying the remainder forward
✦You'll learn a dual-gate design that decides success from both the exit code and a completion event, instead of trusting the last line you happened to read
✦You'll walk away with a consumer template ready for unattended runs — stdout/stderr separation, timeouts, and a fresh working directory per run
Secure payment via Stripe · Cancel anytime
✦
Unlock This Article
Get full access to the rest of this article. Buy once, read anytime. This site is ad-free — your support goes directly toward keeping it running.
When you can't iterate by line, carry the remainder yourself
There are cases where you can't lean on line iteration: multiplexing several streams with select, or receiving over a socket. There you read in fixed-size chunks and carry the leftover "half line" forward into a buffer yourself. Wrapping it in a generator lets the caller keep receiving one line at a time as before.
def iter_json_lines(stream, chunk_size=4096): """Read in fixed chunks, but carry line-spanning remainder so only complete lines surface.""" buffer = "" while True: chunk = stream.read(chunk_size) if not chunk: break buffer += chunk while "\n" in buffer: line, buffer = buffer.split("\n", 1) # keep the rest for next time line = line.strip() if line: yield line # end of stream: don't drop a final line that has no trailing newline tail = buffer.strip() if tail: yield tail
Two points matter. First, buffer.split("\n", 1) pulls only the leading complete line and puts the rest back into buffer. Second, you must catch the final tail when the stream closes without a trailing newline. Forget that tail handling and the very last event — often the completion event, the one that matters most — quietly vanishes. I hit exactly that once: "it succeeded, but it isn't being judged complete."
Don't decide success from "the last line" — gate on exit code and completion event
Once receiving is stable, the next question is judging success — and a naive implementation traps you here too. If you write "success if the last event is agent_completed," you'll misjudge a run that errored midway but still printed a summary line, and you'll miss a run that was killed on timeout and exited non-zero without ever emitting a completion event.
What I settled on is a dual judgment that looks at both the exit code and the completion event. Either one alone is not enough.
def run_and_gate(cmd): """Treat a run as successful only when both exit code and completion event agree.""" code, events = consume(cmd) completed = any(e.get("type") == "agent_completed" for e in events) had_error = any(e.get("type") == "error" for e in events) # an exit 0 can still leave the agent unfinished, so check both if code != 0 or had_error or not completed: reason = [] if code != 0: reason.append(f"exit={code}") if had_error: reason.append("error-event") if not completed: reason.append("no-completion") return False, ", ".join(reason), events return True, "ok", events
I'm this careful because, in unattended automation, the worst outcome is mistaking failure for success. Miss an error and let the next step run (in my case, a push), and a broken artifact flows straight downstream. The exit code tells you how the process died; the completion event tells you whether the agent finished its job. Only when both line up can you say "it really finished." I keep the reason as a string so that, reading the logs later, I can see at a glance why something was marked a failure.
Separate stdout and stderr — progress logs bleeding into the JSON
Even with the dual gate in place, JSONDecodeError still surfaced now and then. The culprit was a case where the CLI wrote progress messages to standard output. Once data (JSON Lines) and logs (human-facing progress) share the same stdout, there's no way for the receiver to separate them.
The fix is simple: have the CLI send logs to standard error, and have the receiver split the two into separate files. In shell, make the redirection explicit.
set -euo pipefail# a fresh working directory per run (reason in the next section)RUN="$(mktemp -d "${TMPDIR:-/tmp}/agy.XXXXXX")"trap 'rm -rf "$RUN"' EXIT# split stdout (NDJSON data) and stderr (progress logs) into separate files# timeout guards against a hung model response holding the job foreverif timeout 1200 agy run --json task.md \ >"$RUN/out.ndjson" 2>"$RUN/progress.log"; then status="ok"else status="failed(exit=$?)"fi# an exit 0 can still be incomplete, so check for the completion event tooif ! grep -q '"type":"agent_completed"' "$RUN/out.ndjson"; then status="incomplete"fiecho "status=$status"
That one line — >"$RUN/out.ndjson" 2>"$RUN/progress.log" — cleanly separates data from logs. When you want to watch progress, tail -f progress.log; out.ndjson stays free of non-JSON lines. I wrap it in timeout because, very occasionally, a model response hangs and the process never ends — and in unattended operation that ties up the job until morning. Cap the runtime and a hung job gets cut short as a failure.
In a terminal-less environment like CI there's another trap: the CLI may not switch into machine-readable mode at all and stdout goes missing. I've written up how to isolate that in running Antigravity CLI (agy) non-interactively in CI.
A fresh working directory per run — how fixed temp names invite contamination
The last point sits one layer outside the consumer itself. For a while I sent output to a fixed name like /tmp/agy_out.ndjson, and that occasionally carried the previous run's leftovers into the next one. If a write failed midway, or two jobs overlapped in time, a stale file would get read as the new job's output.
# ❌ fixed name: on a failed write or overlapping runs, stale leftovers mix in# OUT="/tmp/agy_out.ndjson"# ✅ a unique directory per run, always cleaned up on exitRUN="$(mktemp -d "${TMPDIR:-/tmp}/agy.XXXXXX")"trap 'rm -rf "$RUN"' EXITOUT="$RUN/out.ndjson"
mktemp -d makes a unique directory every time, and trap ... EXIT removes it whether the run ends normally or with an error. That alone removes any chance of jobs stepping on each other's output. Running several sites on a personal-developer automation, separate jobs sometimes overlap slightly in their off-peak windows — and a fixed name will bite you eventually. In the Dolice Labs auto-publishing pipeline I ended up with a two-step rule: cut temp files with a unique name per run, then verify their contents after writing.
If you go further and start bundling jobs in parallel, output separation and the join step are worth their own treatment — I've organized that in fanning out, polling, and joining nightly jobs with agy's async jobs. Read alongside this, the receiver and the bundling sit on the same continuous ground.
Build one hard-to-break consumer first, then widen the automation
It's tempting to focus on making the agent smarter, but the stability of unattended automation is mostly decided by the unglamorous part: how you receive the output. Read one line at a time with for line in stream:, judge on both the exit code and the completion event, separate stdout from stderr, and cut temp files fresh per run. Build one consumer with those four in place, and the foundation holds no matter what task you hand the CLI next.
For your next step, take just one of the automations you're already running and swap its output handling over to a for line in proc.stdout: shape, then add an assertion that checks for agent_completed. For me, once this small consumer was in order, the range of tasks I could trust an agent with quietly widened. I'd be glad if we could each make our own automation a little more robust, step by step.
Share
Thank You for Reading
Antigravity Lab is ad-free, supported entirely by members like you. We publish practical guides daily with implementation code, benchmarks, and production-ready patterns. If you've found it useful, we'd love to have you on board.