TL;DR
- agent loopの脆弱点は「状態がメモリにしかない」こと → checkpointで永続化することで再起動・クラッシュ後も途中から再開できる
- retry/saga patternで部分失敗を冪等にリカバリ → 全体を最初からやり直す「巻き戻し地獄」を回避できる
- step-level tracing(OTel)でどのステップで詰まったか即特定 → 障害対応の MTTR を大幅に短縮できる
はじめに
こんにちは、みねです。
AIエージェントの agent loop は「plan → execute → verify → replan」の繰り返しです。autonomous agent loop の基本設計で説明したように、短いタスクなら問題ありませんが、複数のAPIコールや長時間のLLM推論を伴うタスクでは、ネットワーク障害・LLMタイムアウト・メモリ不足により頻繁に失敗します。最も痛いのは「どこまで進んだか分からない」状態で全部やり直しになることです。durable workflowを組み合わせることで、この問題を構造的に解決できます。
この記事では プロンプトからエージェントエンジニアリングへの転換 で述べた「仕組みで制御する」思想を、耐障害性の文脈で実装レベルまで掘り下げます。
agent loopの失敗パターン
実運用で踏む失敗を整理します。
| 失敗タイプ | 原因 | 影響 |
|---|---|---|
| LLMタイムアウト | APIレスポンス遅延・モデル過負荷 | ループ全体が例外で終了 |
| ネットワーク断 | 一時的な接続不安定 | 実行済みstepの結果が消える |
| OOM(メモリ不足) | コンテキストの肥大化 | プロセス強制終了、進捗消失 |
| 冪等性の欠如 | 外部APIへの重複書き込み | データ不整合・課金の二重発生 |
| 状態の競合 | 並列agentが同一リソースを更新 | 上書き・データ破損 |
これら全てに共通する根本原因は「状態がメモリにしかない」ことです。
checkpointによる状態永続化
設計方針
各stepの完了時に状態をストレージへ書き込みます。再実行時はcheckpointを読み込んでそのstepから再開します。
[plan] → checkpoint → [execute:step1] → checkpoint → [execute:step2] → checkpoint → [verify] → checkpoint → [done]
実装例(TypeScript / PostgreSQL)
// checkpoint.ts
interface CheckpointRecord {
runId: string;
stepName: string;
stepIndex: number;
status: "running" | "completed" | "failed";
output: unknown;
createdAt: Date;
}
class AgentCheckpointStore {
constructor(private db: DatabaseClient) {}
async save(runId: string, stepName: string, stepIndex: number, output: unknown): Promise<void> {
await this.db.upsert("agent_checkpoints", {
run_id: runId,
step_name: stepName,
step_index: stepIndex,
status: "completed",
output: JSON.stringify(output),
created_at: new Date(),
});
}
async load(runId: string): Promise<CheckpointRecord | null> {
return this.db.findOne("agent_checkpoints", {
run_id: runId,
status: "completed",
order: { step_index: "desc" },
});
}
}
checkpointを使ったagent loop
// durable-agent-loop.ts
async function durableAgentLoop(runId: string, task: Task): Promise<Result> {
const store = new AgentCheckpointStore(db);
const steps: Step[] = [planStep, executeStep, verifyStep];
const lastCheckpoint = await store.load(runId);
const startIndex = lastCheckpoint ? lastCheckpoint.stepIndex + 1 : 0;
let context = lastCheckpoint?.output ?? {};
for (let i = startIndex; i < steps.length; i++) {
const step = steps[i];
context = await step.run(context, task);
await store.save(runId, step.name, i, context);
}
return context as Result;
}
startIndex を checkpoint から決定することで、再起動後でも完了済みのstepをスキップして続きから実行できます。
retry / saga pattern の実装
指数バックオフ付きretry
一時的な障害(APIレート制限・タイムアウト)はretryで対処します。無限リトライを防ぐために上限と指数バックオフを設けます。
// retry.ts
interface RetryOptions {
maxAttempts: number;
baseDelayMs: number;
maxDelayMs: number;
retryableErrors: string[];
}
async function withRetry<T>(
fn: () => Promise<T>,
opts: RetryOptions
): Promise<T> {
let lastError: Error;
for (let attempt = 1; attempt <= opts.maxAttempts; attempt++) {
try {
return await fn();
} catch (err) {
lastError = err as Error;
const isRetryable = opts.retryableErrors.some((code) =>
lastError.message.includes(code)
);
if (!isRetryable || attempt === opts.maxAttempts) throw lastError;
const delay = Math.min(
opts.baseDelayMs * 2 ** (attempt - 1),
opts.maxDelayMs
);
await sleep(delay);
}
}
throw lastError!;
}
// 使用例
const result = await withRetry(() => llm.complete(prompt), {
maxAttempts: 3,
baseDelayMs: 1000,
maxDelayMs: 30_000,
retryableErrors: ["rate_limit_exceeded", "timeout", "service_unavailable"],
});
saga patternによる補償トランザクション
複数のstepにまたがる処理で一部が失敗した場合、実行済みのstepを逆順に「補償(undo)」します。マルチエージェント間のタスク分割設計は 実践マルチエージェント設計パターン で詳しく扱っています。
// saga.ts
interface SagaStep<T> {
name: string;
execute: (ctx: T) => Promise<T>;
compensate: (ctx: T) => Promise<void>;
}
async function executeSaga<T>(steps: SagaStep<T>[], initialCtx: T): Promise<T> {
const completed: SagaStep<T>[] = [];
let ctx = initialCtx;
try {
for (const step of steps) {
ctx = await step.execute(ctx);
completed.push(step);
}
return ctx;
} catch (err) {
// 補償処理を逆順で実行
for (const step of completed.reverse()) {
await step.compensate(ctx).catch((compensateErr) => {
console.error(`compensation failed for ${step.name}:`, compensateErr);
});
}
throw err;
}
}
// agent loopへの適用例
const result = await executeSaga(
[
{
name: "createDraftDocument",
execute: async (ctx) => {
const doc = await docStore.create(ctx.content);
return { ...ctx, docId: doc.id };
},
compensate: async (ctx) => {
if (ctx.docId) await docStore.delete(ctx.docId);
},
},
{
name: "publishToExternalAPI",
execute: async (ctx) => {
await externalApi.publish(ctx.docId, ctx.metadata);
return { ...ctx, published: true };
},
compensate: async (ctx) => {
if (ctx.published) await externalApi.unpublish(ctx.docId);
},
},
],
{ content: task.content, metadata: task.metadata }
);
compensate を各stepに定義することで、「途中で失敗したときにどこまで巻き戻すか」が明示的になります。
step-level observability の仕込み
OpenTelemetryによるstep tracing
各stepをOTelのspanで囲み、step名・入出力サイズ・所要時間・エラーを記録します。observabilityの全体設計については AIエージェントの観測基盤 eBPFで実行時の挙動を追う も参照してください。
// tracing.ts
import { trace, SpanStatusCode } from "@opentelemetry/api";
const tracer = trace.getTracer("agent-loop", "1.0.0");
async function tracedStep<T>(stepName: string, fn: () => Promise<T>): Promise<T> {
return tracer.startActiveSpan(`agent.step.${stepName}`, async (span) => {
span.setAttribute("agent.step.name", stepName);
span.setAttribute("agent.run.id", currentRunId);
try {
const result = await fn();
span.setAttribute("agent.step.output.size", JSON.stringify(result).length);
span.setStatus({ code: SpanStatusCode.OK });
return result;
} catch (err) {
span.setStatus({ code: SpanStatusCode.ERROR, message: (err as Error).message });
span.recordException(err as Error);
throw err;
} finally {
span.end();
}
});
}
// 使用例
const planResult = await tracedStep("plan", () => planner.run(task));
const execResult = await tracedStep("execute", () => executor.run(planResult));
const verifyResult = await tracedStep("verify", () => verifier.run(execResult));
ログ設計の原則
| ログレベル | 記録するイベント |
|---|---|
INFO | step開始・完了・checkpoint保存 |
WARN | retry試行・レート制限ヒット |
ERROR | step失敗・補償処理の失敗 |
DEBUG | LLMプロンプト・レスポンス全文 |
// structured logging
logger.info({
event: "step.completed",
runId,
stepName,
stepIndex,
durationMs: Date.now() - startMs,
outputTokens: result.usage?.totalTokens,
});
runId をすべてのログに含めることで、1回の実行に関するログをグレップで即座に集約できます。
実装チェックリスト
agent loopをdurable workflowで安定化するための確認項目です。
checkpoint設計
- 各stepの完了直後にcheckpointを保存しているか
- checkpoint読み込みでstartIndexを決定し、完了済みstepをスキップしているか
- checkpointのストレージが冪等な書き込みに対応しているか(upsert)
- checkpointのTTLまたはクリーンアップ戦略を定義しているか
retry設計
- リトライ可能なエラーコードを明示的に列挙しているか
- 指数バックオフとジッターを実装しているか
- 最大リトライ回数を設定しているか(無限ループ防止)
- べき等でない操作にリトライを適用していないか
saga / 補償設計
- 各stepに補償処理(
compensate)を定義しているか - 補償処理の失敗をログに記録し、手動介入できるようにしているか
- 外部APIへの書き込みが冪等キー(idempotency key)付きで実行されているか
observability
- 各stepをOTelのspanで囲んでいるか
-
runIdを全ログ・トレースに伝播しているか - 失敗時に
span.recordExceptionを呼んでいるか - step所要時間・トークン数をattributeとして記録しているか
テスト
- checkpoint途中からの再開をテストしているか
- 特定stepでの失敗時に補償処理が正しく動作することをテストしているか
- リトライ上限到達時の動作をテストしているか
FAQ
Q1. checkpointのストレージは何を使えばよいですか?
PostgreSQL、Redis(永続化設定あり)、S3(JSON形式)が実績があります。選択基準は「runIdで高速に最終checkpointを引けるか」と「書き込みが冪等か」の2点です。PostgreSQLのupsertが最も実装しやすく、大量のagentを並列実行する場合はRedisのハッシュ型が向いています。S3は長期保存・監査ログ用途に適しています。
Q2. saga patternは全てのagent loopに必要ですか?
必須ではありません。step間に副作用(外部APIへの書き込み・課金処理)がない場合はretryのみで十分です。saga patternが必要になるのは「一部のstepが成功し、後続stepが失敗したとき、成功済みstepの結果を取り消す必要がある」ケースに限定されます。最初はretry+checkpointから始め、副作用が増えてきた段階でsagaを導入するのが現実的なアプローチです。
Q3. OTelを導入すると実行速度に影響しますか?
本番実績では、step単位のspan生成オーバーヘッドは通常1ms未満です。LLMのAPIコール(数百ms〜数秒)と比べると無視できるレベルです。ただし、LLMのプロンプト・レスポンス全文をattributeに含める場合はデータ量が増加するため、DEBUGレベルで環境変数で切り替えられるようにしておくことを推奨します。
まとめ
checkpoint・retry/saga・OTelの3点セットで、agent loopは長時間実行に耐えられる基盤になります。まずcheckpointから始めて、段階的に追加してください。
agent loop の実装が安定したら、運用上の最初の壁は「黙って止まる」エージェントの観測です。アプリ層のトレース・ログ・error taxonomy で MTTR を縮める設計は、AIエージェントの可観測性と障害解析で詳しく扱っています。loop 内のツール呼び出し API(Responses API / Anthropic Tool Use)自体の設計判断は Responses API 時代のツール呼び出し設計 を参照してください。
:::message 実際の設計・実装について相談したい場合は、お気軽にお問い合わせください。 :::
