AI に重たいジョブを任せたあと、3 時間かけて作った中間結果が最後のステップで例外を投げて吹き飛んだ経験はないでしょうか。私も以前、ユーザーから受け取った 600 件の問い合わせメールを Gemini で要約して Slack に投げる処理を「Vercel の Edge Function でやればいいや」と気軽に書いたら、途中で OpenAI のレート制限に引っかかって全部最初からやり直す羽目になりました。
「ジョブが落ちたときに、どこまで進んでいたかが分からない」— これがイベント駆動の AI ワークフローを書いていてもっとも辛い瞬間だと思います。ここではAntigravity でフロントとバックを横断的に書きながら、ジョブ実行基盤として Inngest を採用するパターンを、私が実プロダクトに組み込んだ構成そのままで紹介します。Temporal や Trigger.dev で 24 時間自律稼働の AI エージェントを作る も検討した上で、なぜこのスタックに落ち着いたのかから話を始めます。
なぜ AI ワークフローに Inngest を選ぶのか
最初に正直に書いておくと、ジョブ実行基盤に「絶対の正解」はありません。Temporal は機能豊富で大規模運用にも耐えますが、自前で Worker クラスタを運用するコストがそれなりにかかります。BullMQ は軽量ですが、リトライや並行制御の設計を全部自分で書く必要があります。Trigger.dev は Inngest と非常に近い思想で、こちらも有力な選択肢です。
そのうえで私が個人開発と小〜中規模プロダクトで Inngest を選ぶ理由は、おおまかに 3 つあります。第一に Function-as-Workflow という設計で、TypeScript の関数として書いたものが、勝手にステップごとに永続化・リトライされる開発体験。第二に Cloudflare Workers / Vercel / Next.js Route Handler にそのままデプロイできる軽さ。第三に AI 系の API コールでよくある「冪等キーで重複防止」「concurrency でレート制限調整」「waitForEvent で人間の承認を待つ」がワンライナーで書けることです。
設計思想の背景にある Durable Execution パターンの考え方は Antigravity × Durable Execution: 障害に強い長時間 AI タスクの設計パターン で詳しく扱いましたので、概念を押さえてから本記事に戻ってきていただくとスムーズです。
Antigravity からプロジェクトを立ち上げる
ここからは Next.js 16(App Router)+ Inngest + Gemini API の構成で実装を進めます。Antigravity でプロジェクトを開いたら、まずはエージェントに次の構造を作ってもらいます。私はこのとき、Antigravity の Manager Surface に直接タスクを書いて並列で走らせるのが好みです。
# Antigravity のターミナルから実行
npx create-next-app@latest ai-workflows --typescript --app --tailwind
cd ai-workflows
npm install inngest @google/genai zod
npm install -D @types/node tsx@google/genai は Gemini API の TypeScript SDK、zod は Inngest が型推論で利用するスキーマ定義に使います。Antigravity でプロジェクトを開き直し、エディタの右ペインで AI に「Inngest クライアントの初期化と、Next.js Route Handler の雛形を src/inngest/ に作って」と頼むと、ほぼ完成形が降ってきます。ただし、降ってきたコードを そのまま使わない のがコツです。Inngest はバージョン差で API 名が変わることがあるので、必ず公式の最新ドキュメントと突き合わせてください。
最低限のセットアップとして、src/inngest/client.ts は次のようになります。
// src/inngest/client.ts
import { Inngest } from "inngest";
import { z } from "zod";
// イベント型を Zod で定義しておくと、関数側で自動的に型が効きます
const eventSchemas = {
"support/inquiry.received": {
data: z.object({
inquiryId: z.string().uuid(),
userId: z.string(),
body: z.string().min(1).max(8000),
locale: z.enum(["ja", "en"]).default("ja"),
}),
},
"support/reply.approved": {
data: z.object({
inquiryId: z.string().uuid(),
approvedBy: z.string(),
}),
},
} as const;
export const inngest = new Inngest({
id: "antigravity-support-bot",
// 本番では環境変数経由で signing key を渡す
schemas: eventSchemas,
});ポイントは、イベント名を domain/action 形式に揃えていることと、z.object でデータの形を厳格に決めていることです。後者を省略すると、Inngest の Dashboard で「なぜか body が undefined」というデバッグに 30 分溶かす日が必ず来ます。
最初のイベント駆動関数を書く
最小構成の関数を 1 つ書いてみます。お題は「ユーザーから問い合わせを受け取ったら、Gemini で要約して、社内 Slack に投げる」というワークフローです。Antigravity の AI に「テスト可能な状態で書いて」と注文すると、依存を引数で受け取る形でジェネレートしてくれることが多いです。
// src/inngest/functions/handle-inquiry.ts
import { inngest } from "../client";
import { GoogleGenAI } from "@google/genai";
import { NonRetriableError } from "inngest";
const ai = new GoogleGenAI({ apiKey: process.env.GEMINI_API_KEY! });
export const handleInquiry = inngest.createFunction(
{
id: "handle-inquiry",
// 同一ユーザーから連投された場合、5 秒以内のものは 1 件として扱う
debounce: { period: "5s", key: "event.data.userId" },
retries: 3,
},
{ event: "support/inquiry.received" },
async ({ event, step, logger }) => {
const { inquiryId, body, locale } = event.data;
// 1. AI 要約(高コストなので step.run でチェックポイント化)
const summary = await step.run("summarize-with-gemini", async () => {
const resp = await ai.models.generateContent({
model: "gemini-2.5-flash",
contents: `次の問い合わせを ${locale === "ja" ? "日本語" : "英語"}で 80 字以内に要約してください:\n\n${body}`,
});
const text = resp.text?.trim();
if (!text) {
// 空応答は再試行しても無駄なので非リトライ扱い
throw new NonRetriableError("Gemini returned empty response");
}
return text;
});
// 2. DB に下書き保存(冪等キーは inquiryId)
await step.run("save-draft-reply", async () => {
await fetch(`${process.env.APP_URL}/api/internal/save-draft`, {
method: "POST",
headers: { "content-type": "application/json", "x-idempotency-key": inquiryId },
body: JSON.stringify({ inquiryId, summary }),
});
});
// 3. Slack に通知
await step.run("notify-slack", async () => {
await fetch(process.env.SLACK_WEBHOOK_URL!, {
method: "POST",
headers: { "content-type": "application/json" },
body: JSON.stringify({
text: `📩 新しい問い合わせ要約: ${summary}\n承認 → /approve ${inquiryId}`,
}),
});
});
logger.info({ inquiryId }, "inquiry summarized and notified");
return { inquiryId, summary };
}
);この関数で押さえておきたいのは 3 点です。第一に、AI コール・DB 書き込み・Slack 通知をそれぞれ step.run で囲んでいる点。これによって、もし Slack 通知が失敗してリトライされても、Gemini API は再呼び出しされず、すでに生成した要約がキャッシュから戻ります。Gemini の課金は決して安くないので、これは現場で効きます。第二に debounce を入れて、ユーザーが同じ問い合わせを連打したケースを 1 イベントに丸めている点。第三に NonRetriableError を使って「空応答」のような再試行が無意味なケースを明示的にマークしている点です。
step.run と冪等性 — 課金事故を防ぐ設計
Inngest の本質は step.run のセマンティクスにあります。ステップが一度成功すると、その結果は永続化され、後続のステップが失敗してもそのステップは再実行されません。これは Stripe の決済のように「絶対に二度実行してほしくない処理」と相性が抜群に良いです。
私が以前やらかしたのは、AI で生成した記事を Stripe の決済成功 webhook の中で「直接」プッシュしていて、webhook が再送されたタイミングで二重課金が発生したケースです。Inngest を間に挟んで、決済イベントをイベントとして発火 → 関数内で step.run("charge-once", ...) の冪等キーに event.data.checkoutSessionId を渡すように変えてから、この種の事故はゼロになりました。
冪等キー設計のコツは、そのステップが「論理的に同一」と見なされる粒度を意識すること です。たとえば「ユーザー A への 10 月分のサブスク請求」は userId + period で一意ですが、「メール送信」はメッセージ ID を含めないと「同じ内容の別メール」を排除できません。冪等性まわりは Antigravity と Effect-TS で堅牢なエラーハンドリングを設計する実装ガイド でも触れていますので、エラー設計を一緒に整える際の参考にしてください。
concurrency と throttle で AI 並列実行を制御する
AI のレート制限はワークフロー設計で一番ハマるポイントです。Gemini も OpenAI も Anthropic も、無料枠から有料枠に上げてもなお RPM(1 分あたりリクエスト数)の壁がそびえています。Inngest は関数定義の中で concurrency と throttle を組み合わせることで、複雑なキューイングを書かずに済みます。
たとえば「全ユーザーの月次レポートを AI で生成する」というジョブを考えます。素朴に並列で発火すると、5,000 ユーザーいたら一瞬で API が 429 を返します。次のように書きます。
// src/inngest/functions/generate-monthly-report.ts
import { inngest } from "../client";
export const generateMonthlyReport = inngest.createFunction(
{
id: "generate-monthly-report",
// 同時実行は最大 8 つ。ユーザー単位ではなく関数全体で制御
concurrency: {
limit: 8,
key: "event.data.tenantId", // テナントごとに独立した concurrency
},
// 1 分あたり最大 60 リクエストに絞る(Gemini Free の上限想定)
throttle: {
limit: 60,
period: "1m",
key: "event.data.tenantId",
},
retries: 5,
},
{ event: "report/generate.requested" },
async ({ event, step }) => {
const { userId, month } = event.data;
const sections = await step.run("collect-data", async () => {
// ユーザーのアクティビティを集計して返す
return await collectUserActivity(userId, month);
});
const draft = await step.run("ai-write-report", async () => {
// 集計結果を Gemini に渡してレポート本文を生成
return await composeReportWithGemini(sections);
});
await step.run("publish-report", async () => {
await publishToUser(userId, draft);
});
return { userId, month, draftId: draft.id };
}
);
// ヘルパは実装簡略化のため省略しています
async function collectUserActivity(userId: string, month: string) { /* ... */ }
async function composeReportWithGemini(sections: unknown) { /* ... */ }
async function publishToUser(userId: string, draft: unknown) { /* ... */ }concurrency.key をテナント ID にすることで、A 社のジョブが暴れても B 社のジョブが詰まらない、という設計が一行で書けます。これを自前のキューシステムでやろうとすると、Redis のキー設計やワーカープロセス数の調整に丸 1 日溶かすので、ここだけでも Inngest を選ぶ価値があると私は思っています。
似た用途で Temporal を選ぶ場合は、Antigravity と Temporal で本番運用に耐える AI ワークフロー基盤を構築する で扱っています。要件が「数千ユーザーまで」なら Inngest、「数十万ユーザーで自前運用したい」なら Temporal という棲み分けが私のなかでは整理できています。
step.waitForEvent でヒューマン・イン・ザ・ループを実装する
完全自動化が怖いユースケースもあります。たとえば、AI が下書きした問い合わせ返信を、人間が承認してから送りたい場合です。Inngest の step.waitForEvent を使うと、この「人間の承認待ち」を自然に書けます。
// src/inngest/functions/auto-reply-with-approval.ts
import { inngest } from "../client";
export const autoReplyWithApproval = inngest.createFunction(
{ id: "auto-reply-with-approval", retries: 2 },
{ event: "support/inquiry.received" },
async ({ event, step }) => {
const { inquiryId } = event.data;
// 下書き生成(別関数に丸投げしてもよい)
const draft = await step.run("generate-draft", async () => {
return await generateDraftWithGemini(event.data.body);
});
// 24 時間以内に承認イベントが来たら受け取り、来なければタイムアウト
const approval = await step.waitForEvent("wait-for-approval", {
event: "support/reply.approved",
timeout: "24h",
// 同じ inquiryId の承認だけを待つ
if: `event.data.inquiryId == "${inquiryId}"`,
});
if (!approval) {
// タイムアウト時は人間にエスカレーション
await step.run("escalate", async () => {
await sendEscalationEmail(inquiryId, draft);
});
return { status: "escalated", inquiryId };
}
// 承認されたら送信
await step.run("send-reply", async () => {
await sendEmail({
inquiryId,
body: draft.body,
approvedBy: approval.data.approvedBy,
});
});
return { status: "sent", inquiryId, approvedBy: approval.data.approvedBy };
}
);
async function generateDraftWithGemini(body: string) { /* ... */ return { body: "" }; }
async function sendEscalationEmail(inquiryId: string, draft: { body: string }) { /* ... */ }
async function sendEmail(args: { inquiryId: string; body: string; approvedBy: string }) { /* ... */ }このコードのどこにも setTimeout もポーリングも書かれていない点が重要です。Inngest が裏で 24 時間スリープし、承認イベントが届いた瞬間に再開してくれます。Antigravity 上で inngest dev を立ち上げてローカルでテストすれば、Slack の /approve コマンドから手で承認イベントを送って動作確認できます。
リトライ戦略とエラー分類
「リトライすればいつかは通る」という楽観論は本番では通用しません。エラーは大きく 3 種類に分けて扱うのが基本です。第一が 再試行で回復するエラー(ネットワーク瞬断、429 の Rate Limit、5xx)。第二が 再試行しても結果が変わらないエラー(400 Bad Request、ビジネスルール違反、バリデーションエラー)。第三が 再試行が危険なエラー(部分的に課金が完了しているなど、副作用が中途半端な状態)。
Inngest では NonRetriableError で第二種を、RetryAfterError で第一種の Rate Limit を、それぞれ明示的に表現できます。
// src/inngest/functions/safe-charge.ts
import { inngest } from "../client";
import { NonRetriableError, RetryAfterError } from "inngest";
export const safeCharge = inngest.createFunction(
{ id: "safe-charge", retries: 4 },
{ event: "billing/charge.requested" },
async ({ event, step }) => {
await step.run("charge-stripe", async () => {
const res = await fetch("https://api.stripe.com/v1/charges", {
method: "POST",
headers: {
authorization: `Bearer ${process.env.STRIPE_SECRET_KEY}`,
// 冪等キーを必ず付ける
"idempotency-key": event.data.checkoutSessionId,
},
body: new URLSearchParams({ amount: String(event.data.amount), currency: "jpy" }),
});
if (res.status === 429) {
const retryAfter = Number(res.headers.get("retry-after") ?? "10");
throw new RetryAfterError("rate limited by Stripe", `${retryAfter}s`);
}
if (res.status === 400) {
// 400 は再試行しても回復しないので非リトライ
throw new NonRetriableError(`Stripe rejected: ${await res.text()}`);
}
if (!res.ok) {
// 5xx は通常リトライに任せる
throw new Error(`Stripe error: ${res.status}`);
}
return await res.json();
});
}
);RetryAfterError の第二引数で次回試行までの待機時間を指定できるので、Stripe や OpenAI が返してくる Retry-After ヘッダの値をそのまま渡すのが定石です。エクスポネンシャルバックオフは Inngest 側のデフォルト挙動に任せて、明示的なヒントだけ渡す、というのが私の好みです。
オブザーバビリティ — Dashboard と OpenTelemetry の使い分け
Inngest の Dashboard は、関数ごとの成功率・遅延・失敗ログを GUI で見られる優秀なツールです。それでも私は「複数の関数にまたがるリクエスト全体の追跡」のために OpenTelemetry を併用しています。
具体的には、AI 呼び出しごとに traceId を発行してイベントの data に含め、step.run の中で @opentelemetry/api の Span として記録します。こうすると、Datadog / Grafana / Honeycomb などのトレース基盤で「このユーザーの問い合わせが受信されてから返信されるまでのタイムライン」を一画面で追えるようになります。
OpenTelemetry の組み込み詳細は Antigravity と OpenTelemetry で AI エージェントのトレース観測基盤を作る で解説していますので、本記事と組み合わせるとオブザーバビリティ層が一気に整います。
よくある落とし穴と対処
実プロダクトに投入してから気づくポイントを 5 つだけ書いておきます。
第一に、step.run の中で長時間ブロックする処理を 1 つの step に詰め込むと、Inngest 側の実行時間制限に引っかかります。「外部 API が返ってこないから 60 秒待つ」のではなく、ジョブをキックして step.waitForEvent で完了通知を待つ設計にすると安定します。
第二に、step.run の戻り値は JSON シリアライズ可能でなければなりません。Date オブジェクトをそのまま返すと、再起動後に文字列に化けてバグります。私はチームのコーディング規約として「step の戻り値は ISO 文字列か数値のみ」にしています。
第三に、Inngest Dev Server を立ち上げ忘れるとローカルで関数がトリガーされません。Antigravity の AI に「dev コマンドが見当たらない」と相談すると、npx inngest-cli@latest dev のような提案が返ってくるので、package.json の scripts に登録しておくのが楽です。
第四に、本番では Signing Key を必ず Cloudflare / Vercel の Secret に登録してください。.env.local をコミットしてしまうと、攻撃者があなたの関数を任意のイベントで起動できてしまいます。
第五に、debounce と concurrency を同時に使うと挙動が直感に反することがあります。debounce はイベント受信側の集約、concurrency は実行側の制御なので、両方使うときは「どっちが先に効くか」を Dashboard で必ず確認してください。私は本番で初めて気づいて 1 時間冷や汗をかきました。
全体を振り返って — 今日から手を動かす最小ステップ
ここまで読んでいただきありがとうございます。Inngest はドキュメントが充実していて、Antigravity の AI に「Inngest の step.run を使ってこの処理を冪等にして」と頼むと、想像以上に正確に書き直してくれます。逆に言うと、設計判断はあなた自身がしないと、AI は「動くけれど壊れやすい」コードを生成してしまいます。
最初の一歩としておすすめなのは、今あなたのプロダクトに 1 つだけある「失敗したら困るが、再実行が怖くてできない処理」を Inngest 関数に切り出すこと です。たとえば AI で生成した請求書の Stripe 送信、ニュースレターの一括送信、ユーザー登録時のウェルカムメール — こういう「副作用つき AI ジョブ」を 1 つだけ Inngest に逃がしてみてください。Dashboard で再生ボタンを押せば、本番事故のリプレイができる安心感が、運用の景色を変えます。
次に手を動かすなら、Inngest と並んで有力な選択肢である Trigger.dev を、自律エージェントの 24 時間運用という別角度から扱った Trigger.dev で 24 時間自律稼働の AI エージェントを作る と読み比べてみてください。同じ Durable Execution の世界でも、ツール選びの判断軸が立体的に見えてくるはずです。