夜間に静かに走らせている並列エージェントが、朝の自分の作業と重なった瞬間に 429 Too Many Requests を吐き始める。これは、同時実行数を一つの固定値で決め打ちしたときに、ほぼ必ず通る道です。
固定値は二重に外れます。低く取れば、誰も使っていない深夜にエージェントが順番待ちで遊んでしまいます。高く取れば、自分が同じアカウントで対話している昼間にクォータを食い合い、下流のAPIが詰まります。どちらに寄せても、片方の時間帯で間違えるのです。
ここで扱うのは、同時実行数を「固定して守る」のではなく「観測して動かす」設計です。題材はAntigravity 2.0 の並列エージェントと Managed Agents API ですが、考え方はどのエージェント実行基盤にもそのまま移せます。
固定した同時実行数は、たいてい間違っています
まず、固定値が外れる構造を具体的に置いておきます。あるワークロードで、同時実行数を 6 に固定したとします。
| 時間帯 | 下流の余裕 | 同時実行 6 の結果 |
| 深夜(自分は不在) | 大きい | 能力を遊ばせる。10 でも 12 でも捌けたはず |
| 夕方(自分も対話中) | 小さい | クォータを共有して食い合い、429 が連鎖する |
| 大型更新の直後 | 読めない | レイテンシが伸び、6 でも詰まることがある |
固定値はこの表のどの行に最適化しても、他の行で外れます。しかも「正解の同時実行数」は、自分の在席状況・クォータの残量・モデル側の混雑という、こちらが制御できない要因で刻々と変わります。動く標的に固定値を当てようとすること自体が、設計の誤りなのです。
なぜ「ちょうどいい同時実行数」は止まっていないのか
Antigravity 2.0 は Gemini 3.5 Flash を中心に、計画・コード生成・ブラウザでの実機テストを複数エージェントで並行させます。Flash が速いほど、単位時間あたりに下流へ投げる要求の密度が上がります。つまり、エージェントが賢く速くなるほど、同時実行数の上限はシビアになります。
さらに Managed Agents API は、単一の呼び出しで隔離環境のエージェントを起動できます。起動が手軽だからこそ、こちらが無自覚に並列度を上げてしまい、共有クォータの天井に頭をぶつけやすくなります。
ここで効くのが、ネットワークの世界で何十年も使われてきた発想です。TCP は回線の帯域を事前に知りません。少しずつ送信量を増やし、パケットの損失という「詰まりの兆候」を見たら一気に絞る。この繰り返しで、実際の帯域に追従し続けます。エージェントの同時実行数も、同じように「観測しながら追従する量」として扱えます。
TCP の輻輳制御から借りる — 加法的増加・乗法的減少
借りる中核は AIMD(Additive Increase / Multiplicative Decrease)です。
- 加法的増加: 成功が続く間は、同時実行数をゆっくり、一定の刻みで足していきます。
- 乗法的減少: 詰まりの兆候を一度でも見たら、掛け算で一気に下げます(たとえば半分に)。
増やすときは慎重に、減らすときは大胆に。この非対称性が肝です。詰まりは下流を壊しかけているサインなので、線形に少しずつ下げていては手遅れになります。逆に、増やすのは余裕の確認にすぎないので、急ぐ必要はありません。
import time
from dataclasses import dataclass, field
@dataclass
class AIMDController:
"""同時実行数を観測に応じて増減させるコントローラ。"""
min_limit: int = 1
max_limit: int = 12
limit: float = 2.0 # 現在の許容同時実行数(実数で保持し、丸めて使う)
increase_step: float = 0.5 # 成功ごとの加算(加法的増加)
decrease_factor: float = 0.5 # 詰まり検知時の乗算(乗法的減少)
cooldown_s: float = 8.0 # 減少直後に再増加を抑える猶予
_last_decrease: float = field(default=0.0)
def on_success(self) -> None:
# 減らした直後は、しばらく増やさない。揺り戻しを防ぐため。
if time.monotonic() - self._last_decrease < self.cooldown_s:
return
self.limit = min(self.max_limit, self.limit + self.increase_step)
def on_throttle(self) -> None:
self.limit = max(self.min_limit, self.limit * self.decrease_factor)
self._last_decrease = time.monotonic()
@property
def concurrency(self) -> int:
return max(self.min_limit, int(self.limit))
limit を整数ではなく実数で持つのは、increase_step を 1 未満にできるようにするためです。成功 2 回でようやく 1 枠増える、といったゆっくりした増やし方ができ、上限付近での暴れを抑えられます。
何を「詰まりの兆候」として観測するか
on_throttle() を呼ぶ判断材料が、この設計の精度を決めます。観測する信号を、強さの順に並べておきます。
| 信号 | 意味 | 反応 |
| HTTP 429 / クォータ超過エラー | 下流が明確に「もう無理」と言っている | 即座に乗法的減少 |
| クォータ残量の警告ヘッダ | 壁に近づいている前兆 | 増加を止める(減らしはしない) |
| レイテンシの継続的な上昇 | 渋滞が始まっている | 緩やかに減少、または増加停止 |
| タイムアウト・接続断 | 詰まりが実害に変わった | 乗法的減少+リトライ予算の消費 |
一番上の 429 だけを見るのでも、最初の一歩としては十分に機能します。レイテンシまで見るのは、ノイズに振り回されないよう移動平均を入れてからにするのが安全です。単発の遅いレスポンスで同時実行数を半減させると、今度は能力を遊ばせる側に倒れます。
レスポンスを「成功か詰まりか」に振り分ける分類器を、呼び出しのすぐ外側に置きます。
class ThrottleError(Exception):
"""下流が明確に過負荷を示したときに送出する。"""
def classify(status_code: int, headers: dict) -> str:
if status_code == 429 or status_code == 503:
return "throttled"
# 残量警告は「これ以上増やすな」のサイン。減らすほどではない。
remaining = headers.get("x-quota-remaining")
if remaining is not None and int(remaining) < 5:
return "near_limit"
return "ok"
適応スケジューラに組み込む
コントローラが持つ concurrency は時々刻々と変わります。固定サイズのセマフォでは表現できないので、現在の同時実行数を毎回チェックする可変ゲートを用意します。
import asyncio
class AdaptiveScheduler:
def __init__(self, controller: AIMDController):
self.ctrl = controller
self._inflight = 0
self._cond = asyncio.Condition()
async def _acquire(self) -> None:
async with self._cond:
# 現在の許容数を毎回読み直すので、上限が縮んだら即座に効く。
while self._inflight >= self.ctrl.concurrency:
await self._cond.wait()
self._inflight += 1
async def _release(self) -> None:
async with self._cond:
self._inflight -= 1
self._cond.notify_all()
async def run(self, call):
"""call() は (status_code, headers, body) を返す非同期関数。"""
await self._acquire()
try:
status, headers, body = await call()
outcome = classify(status, headers)
if outcome == "throttled":
self.ctrl.on_throttle()
elif outcome == "ok":
self.ctrl.on_success()
# near_limit のときは増やしも減らしもしない
return status, headers, body
except (ThrottleError, asyncio.TimeoutError):
self.ctrl.on_throttle()
raise
finally:
await self._release()
# 上限が減った可能性があるので、待機者を起こして再評価させる
async with self._cond:
self._cond.notify_all()
_acquire() がループの中で self.ctrl.concurrency を読み直しているのが要点です。on_throttle() で上限が 6 から 3 に落ちた瞬間、新しく枠を取りたいタスクはその場で待たされます。すでに走っている処理は止めません。あくまで「次に始める数」を絞ることで、いま動いているものを壊さずに流量を下げます。
これを Managed Agents API の起動呼び出しにかぶせれば、隔離環境のエージェントを何本同時に立ち上げるかが、観測に応じて自動で調整されます。手元の並列エージェント呼び出しでも同じです。
バックプレッシャー設計との違いと、重ね方
「同時実行数を制限する」という話は、待ち行列とセマフォによるバックプレッシャー設計と隣り合わせです。両者は競合せず、役割が違います。
| 観点 | 固定上限のバックプレッシャー | 適応制御(AIMD) |
| 上限の決め方 | 事前に人が決めた固定値 | 観測から自動で追従 |
| 守るもの | 下流が確実に壊れない天井 | そのときの最適点への接近 |
| 苦手なこと | 条件変化への追従 | 暴走時の絶対的な安全弁 |
実運用では両方を重ねるのが堅実です。AIMD が日々の最適点を探し、その上に「何があっても超えない絶対上限」を固定のセマフォとして置きます。max_limit がまさにその役割を兼ねていますが、コスト上の事故を避けたいなら、コントローラの外側にもう一段、人が決めた硬い天井を持たせておくと安心できます。適応は最適化のため、固定上限は事故防止のため、と分けて考えると設計が澄みます。
増やしすぎ・減らしすぎを防ぐチューニング
数値の決め方には、いくつか効く勘所があります。
| パラメータ | 大きすぎると | 小さすぎると | 出発点の目安 |
| increase_step | 上限付近で振動する | 回復が遅い | 0.5 前後 |
| decrease_factor | 絞りが甘く429が続く | 過剰に絞り遊ぶ | 0.5(半減) |
| cooldown_s | 変化への追従が鈍る | 揺り戻しで暴れる | レイテンシ中央値の5〜10倍 |
最初は減少を大胆に、増加を控えめに振っておくのが安全です。詰まりを早く確実に解消できれば、多少能力を遊ばせても下流は守れます。慣れてきたら increase_step を上げ、遊んでいる枠を回収していきます。
一つ落とし穴があります。リトライと適応制御を素朴に組み合わせると、429 を受けたタスクが即座にリトライし、それがまた on_throttle() を呼んで、二重に絞ってしまうことがあります。リトライは指数バックオフに任せ、適応コントローラへ詰まりを報告するのは最初の一回だけにする、と決めておくと混線を回避できます。これは本番運用でいちばん踏みやすい罠なので、先に手を打っておくことをお勧めします。
運用して見えたこと
個人開発で複数のサイトを自動更新するパイプラインと、いくつかのモバイルアプリのバックグラウンド処理を、同じ時間帯に並走させていた時期があります。当初は同時実行数を固定していて、深夜は明らかに余っているのに保守的な値のまま、私自身が日中に触り始めると今度は 429 が連鎖する、という両側の失敗を毎日くり返していました。
固定値を捨てて、429 を唯一の信号にした素朴な AIMD に切り替えたところ、まず夕方の連鎖的な失敗が静かになりました。コントローラが詰まりを見て勝手に絞るので、私が在席を意識して手で値を下げる作業がなくなったのです。夜は逆に、誰も競合しないので上限までゆっくり上がり、放っておいても枠を使い切るようになりました。賢い予測を足したわけではありません。ただ「結果を見て、増やすか減らすかを決める」という一点を機械に任せただけです。
制御の世界で長く使われてきた発想を、エージェントの並列度という新しい対象に当てはめ直しただけ、とも言えます。新しいものほど、古くから確かめられてきた原理がよく効くと感じています。固定値で消耗していた方は、まず 429 だけを見る小さなコントローラから試してみてください。
お読みいただきありがとうございました。