agent-contracts

コアコンセプト

agent-contractsアーキテクチャの詳細


概要

agent-contractsはシンプルな原則に基づいています:ノードが何をするかを宣言し、どう接続するかは宣言しない

┌─────────────────────────────────────────────────────────────┐
│                        Registry                              │
│  ┌───────────┐  ┌───────────┐  ┌───────────┐               │
│  │ NodeA     │  │ NodeB     │  │ NodeC     │  ...          │
│  │ CONTRACT  │  │ CONTRACT  │  │ CONTRACT  │               │
│  └───────────┘  └───────────┘  └───────────┘               │
└─────────────────────────────────────────────────────────────┘
                           │
                           ▼
┌─────────────────────────────────────────────────────────────┐
│                     GraphBuilder                             │
│  • コントラクトを分析                                         │
│  • スーパーバイザーを作成                                     │
│  • LangGraphを自動配線                                       │
└─────────────────────────────────────────────────────────────┘
                           │
                           ▼
┌─────────────────────────────────────────────────────────────┐
│                      LangGraph                               │
│  START → Supervisor ⟷ Nodes → END                           │
└─────────────────────────────────────────────────────────────┘

NodeContract

NodeContractはライブラリの中心です。ノードに関するすべてを宣言します:

NodeContract(
    # === 識別 ===
    name="my_node",                    # 一意の識別子
    description="このノードの役割", # 人間が読める説明
    
    # === I/O定義 ===
    reads=["request", "context"],      # 読み取るステートスライス
    writes=["response"],               # 書き込むステートスライス
    
    # === 依存関係 ===
    requires_llm=True,                 # LLMが必要か
    services=["db_service"],           # 必要な外部サービス
    
    # === ルーティング ===
    supervisor="main",                 # 管理するスーパーバイザー
    trigger_conditions=[...],          # ノードを起動する条件
    is_terminal=False,                 # 実行後にフローを終了するか
)

なぜコントラクトか?

コントラクトなし コントラクトあり
手動でグラフを配線 自動グラフ構築
依存関係が隠れている 明示的なI/O宣言
ランタイムエラー 静的検証
ドキュメント化が困難 自動生成ドキュメント

ステートスライス

agent-contractsのステートは独立したスライスに整理されます:

state = {
    "request": {           # ユーザーからの入力
        "action": "search",
        "params": {"query": "laptop"}
    },
    "response": {          # ユーザーへの出力
        "response_type": "results",
        "data": [...]
    },
    "context": {           # 共有コンテキスト
        "user_preferences": {...}
    },
    "_internal": {         # フレームワーク内部
        "decision": "search_node",
        "iteration": 1
    }
}

設計原則

  1. 関心の分離: 各スライスは単一の目的を持つ
  2. 明示的アクセス: ノードは読み書きするスライスを宣言
  3. 検証: 不明なスライス名や、コントラクト外I/Oは警告/エラーで検知できる

コントラクト外I/Oの扱い(実行時):

YAML設定例:

io:
  strict: true                  # 例外で停止
  warn: true                    # 警告ログ
  drop_undeclared_writes: true  # コントラクト外writeを破棄

組み込みスライス

スライス 目的
request ユーザー入力(読み取り専用推奨)
response ユーザー出力
_internal フレームワークのルーティング/イテレーション

カスタムスライスを定義可能:

registry.add_valid_slice("orders")
registry.add_valid_slice("workflow")

TriggerCondition

トリガー条件はノードが選択されるタイミングを制御します:

TriggerCondition(
    priority=10,                           # 高い = 先に評価
    when={"request.action": "search"},     # マッチ条件
    when_not={"response.done": True},      # 否定マッチ
    llm_hint="商品検索に使用",               # LLMルーティングヒント
)

優先度レベル

範囲 用途
🔴 100+ クリティカル/即時 エラーハンドラ
🟡 50-99 主要ハンドラ メインビジネスロジック
🟢 1-49 フォールバック デフォルトハンドラ
⚪ 0 常にマッチ キャッチオール

条件マッチング

# 正確な値マッチ
when={"request.action": "search"}

# ブール値チェック
when={"context.authenticated": True}

# ネストパス
when={"request.params.category": "electronics"}

# 複数条件 (AND)
when={"request.action": "buy", "context.cart_ready": True}

GenericSupervisor

スーパーバイザーは多段階アプローチでノード選択を調整します。

LLM用データサニタイズ(v0.3.3+)

Supervisorはルーティング判断のためにLLMに送信する前に、ステートデータを自動的にサニタイズします:

自動処理:

メリット:

設定:

from agent_contracts import GenericSupervisor

supervisor = GenericSupervisor(
    supervisor_name="main",
    llm=llm,
    max_field_length=10000,  # デフォルト: 10000文字
)

決定フロー

┌─────────────────────────────────────────────────────────────┐
│                    決定フロー                                │
├─────────────────────────────────────────────────────────────┤
│  1. 終了状態チェック                                         │
│     └─ response_type が terminal_states に含まれる → done    │
│                                                              │
│  2. 明示的ルーティング                                       │
│     └─ action="answer" → 質問の送信元にルーティング          │
│                                                              │
│  3. ルールベース評価                                         │
│     └─ 全TriggerConditionを評価、候補を収集                  │
│                                                              │
│  4. LLM決定(利用可能な場合)                                │
│     └─ LLMがllm_hintsを使用して候補から選択                  │
│                                                              │
│  5. フォールバック                                           │
│     └─ 最高優先度のルールマッチを使用                        │
└─────────────────────────────────────────────────────────────┘

LLMあり vs なし

モード 動作
LLMあり LLMがルールヒントを使用して最終決定
LLMなし 純粋なルールベース、最高優先度マッチを使用

InteractiveNode

会話型エージェントにはInteractiveNodeを使用:

from agent_contracts import InteractiveNode, NodeContract, NodeOutputs, TriggerCondition


class QuestionerNode(InteractiveNode):
    CONTRACT = NodeContract(
        name="questioner",
        description="質問を行い、回答を処理する",
        reads=["request", "workflow"],
        writes=["response", "workflow", "_internal"],
        supervisor="main",
        trigger_conditions=[
            TriggerCondition(priority=10, llm_hint="次の質問を行うときに使用"),
        ],
    )
    
    def prepare_context(self, inputs):
        """Extract context from inputs."""
        return inputs.get_slice("workflow")
    
    def check_completion(self, context, inputs):
        """インタビュー完了をチェック"""
        return len(context.get("answers", [])) >= 5
    
    async def process_answer(self, context, inputs, config=None):
        """ユーザーの回答を処理"""
        answer = inputs.get_slice("request").get("answer")
        # 回答を保存...
        return True
    
    async def generate_question(self, context, inputs, config=None):
        """次の質問を生成"""
        # LLMで質問を生成...
        return NodeOutputs(
            response={
                "response_type": "question",
                "response_data": {"question": "どの色が好きですか?"},
            }
        )

ライフサイクル

┌─────────────────────────────────────────────────────────────┐
│                  InteractiveNode フロー                      │
├─────────────────────────────────────────────────────────────┤
│  1. prepare_context()     → 必要なデータを抽出               │
│  2. check_completion()    → 既に完了?                       │
│       └─ Yes → create_completion_output()                    │
│       └─ No ↓                                               │
│  3. process_answer()      → ユーザーの応答を処理             │
│  4. check_completion()    → 今完了?                         │
│       └─ Yes → create_completion_output()                    │
│       └─ No → generate_question()                           │
└─────────────────────────────────────────────────────────────┘

ContractValidator

実行前にコントラクトを検証:

from agent_contracts import ContractValidator

validator = ContractValidator(
    registry,
    known_services={"db_service", "cache_service"},
)
result = validator.validate()

if result.has_errors:
    print(result)  # エラーを表示
    exit(1)

Strictモード(CI向け)

validator = ContractValidator(
    registry,
    known_services={"db_service", "cache_service"},
    strict=True,  # WARNINGもERRORとして扱う
)
result = validator.validate()

Strictモードは、WARNING(不明なサービス、到達不能ノード、requestへの書き込み等)を ERRORに昇格し、CIで早期に検知できます。

検証レベル

レベル
ERROR reads/writesの不明なスライス
WARNING 不明なサービス、到達不能ノード
INFO 共有ライター(複数ノードが同じスライスに書き込み)

トレーサブルルーティング

デバッグにはdecide_with_trace()を使用:

decision = await supervisor.decide_with_trace(state)

print(f"選択: {decision.selected_node}")
print(f"タイプ: {decision.reason.decision_type}")

for rule in decision.reason.matched_rules:
    print(f"  {rule.node} (P{rule.priority}): {rule.condition}")

決定タイプ

タイプ 意味
terminal_state レスポンスタイプが終了をトリガー
explicit_routing 回答が質問の送信元にルーティング
rule_match TriggerConditionがマッチ
llm_decision LLMが選択
fallback マッチなし、デフォルトを使用

TriggerMatch構造(v0.4.0+)

v0.4.0から、evaluate_triggers()TriggerMatchオブジェクトを返します:

from agent_contracts import TriggerMatch

# evaluate_triggers() の返り値
matches: list[TriggerMatch] = registry.evaluate_triggers("supervisor_name", state)

for match in matches:
    print(f"ノード: {match.node_name}")
    print(f"優先度: {match.priority}")
    print(f"条件インデックス: {match.condition_index}")  # 実際にマッチした条件

メリット:

マイグレーション (v0.3.x → v0.4.0):

# v0.3.x - tuple形式
matches: list[tuple[int, str]] = registry.evaluate_triggers("main", state)
for priority, node_name in matches:
    print(f"{node_name}: P{priority}")

# v0.4.0 - TriggerMatch形式
matches: list[TriggerMatch] = registry.evaluate_triggers("main", state)
for match in matches:
    print(f"{match.node_name}: P{match.priority}")

注意: GenericSupervisordecide()/decide_with_trace()を使用している場合は、変更不要です。

Supervisorのコンテキスト構築

GenericSupervisorはLLMベースのルーティング判断のためのコンテキストを自動的に構築します。

デフォルトのコンテキスト構築(v0.2.3+)

デフォルトでは、Supervisorは最小限のコンテキストをLLMに提供します:

  1. 基本スライスのみ: 常にrequestresponse_internalを含む
  2. 根拠:
    • 候補スライスはトリガー条件で既に評価済み
    • それらをLLMに渡すのは冗長でトークンを浪費
    • 明確な分離:トリガー = ルールベースフィルタリング、LLM = 最終選択
  3. メリット:
    • 大幅なトークン削減
    • パフォーマンス向上(シリアライズと送信データが少ない)
    • LLM理解のためresponse経由で会話コンテキストを維持

カスタムコンテキストビルダー(v0.3.0+)

追加のコンテキストが必要な複雑なシナリオでは、カスタムcontext_builder関数を提供できます:

from agent_contracts import GenericSupervisor

def my_context_builder(state: dict, candidates: list[str]) -> dict:
    """ルーティング判断用のカスタムコンテキストを構築"""
    return {
        "slices": {"request", "response", "_internal", "conversation"},
        "summary": {
            "total_turns": len(state.get("conversation", {}).get("messages", [])),
            "readiness_score": calculate_readiness(state),
        }
    }

supervisor = GenericSupervisor(
    supervisor_name="orders",
    llm=llm,
    context_builder=my_context_builder,
)

サマリーフォーマット(v0.3.1+)

context_builderの戻り値のsummaryフィールドはdictstrの両フォーマットをサポート:

# 文字列フォーマット - プロンプトに直接含まれる(整形テキストに最適)
def context_builder(state, candidates):
    return {
        "slices": {"request", "response", "conversation"},
        "summary": f"最近の会話:\n{format_messages(state)}"
    }

# 辞書フォーマット - 含まれる前にJSON化(構造を保持)
def context_builder(state, candidates):
    return {
        "slices": {"request", "response", "conversation"},
        "summary": {
            "turn_count": 5,
            "topics": ["orders", "preferences"]
        }
    }

レジストリベースグラフでの使用(v0.3.1+)

build_graph_from_registry()llm_providerと共に使用する場合、supervisor_factoryを使用してカスタムスーパーバイザーを注入:

from agent_contracts import build_graph_from_registry, GenericSupervisor

def my_context_builder(state, candidates):
    return {
        "slices": {"request", "response", "conversation"},
        "summary": f"会話履歴:\n{format_history(state)}"
    }

def supervisor_factory(name: str, llm):
    return GenericSupervisor(
        supervisor_name=name,
        llm=llm,
        context_builder=my_context_builder,  # カスタムコンテキストが保持される!
    )

graph = build_graph_from_registry(
    llm_provider=get_llm,
    supervisor_factory=supervisor_factory,  # カスタムスーパーバイザーを注入
    supervisors=["orders", "notifications"],
)

コンテキストビルダープロトコル

from typing import Protocol

class ContextBuilder(Protocol):
    def __call__(self, state: dict, candidates: list[str]) -> dict:
        """
        LLMルーティング判断用のコンテキストを構築
        
        Args:
            state: 現在のエージェント状態
            candidates: 候補ノード名のリスト
            
        Returns:
            以下を含む辞書:
            - slices (set[str]): 含めるスライス名のセット
            - summary (dict | str | None): オプションの追加コンテキスト
              - str: プロンプトに直接含まれる(整形テキスト)
              - dict: 含まれる前にJSON化
        """
        ...

ユースケース

シナリオ カスタムコンテキスト
ECサイト 購入認識ルーティングのためにcartinventoryを含める
カスタマーサポート コンテキスト認識レスポンスのためにticket_historysentimentを含める
教育 適応型指導のためにlearning_progresspaceを含める
会話 ターン数と履歴を含むconversationを含める

例: 会話認識ルーティング

def conversation_context_builder(state: dict, candidates: list[str]) -> dict:
    """より良いルーティングのために会話履歴を含める"""
    messages = state.get("conversation", {}).get("messages", [])
    
    # LLM可読性向上のため文字列としてフォーマット
    formatted = "\n".join([
        f"{m['role']}: {m['content']}"
        for m in messages[-5:]  # 最後の5メッセージ
    ])
    
    return {
        "slices": {"request", "response", "_internal", "conversation"},
        "summary": f"最近の会話 ({len(messages)} ターン):\n{formatted}"
    }

メリット

移行ノート


StateAccessorパターン

型安全でイミュータブルな状態フィールドアクセス:

from agent_contracts import Internal, Request, Response, reset_response

# 状態の読み取り
count = Internal.turn_count.get(state)
action = Request.action.get(state)

# 状態の書き込み(イミュータブル - 新しいstateを返す)
state = Internal.turn_count.set(state, 5)
state = reset_response(state)

利用可能なアクセサー

クラス フィールド
Internal turn_count, is_first_turn, active_mode, next_node, error
Request session_id, action, params, message, image
Response response_type, response_data, response_message

便利関数

from agent_contracts import increment_turn, set_error, clear_error

state = increment_turn(state)  # turn_count++, is_first_turn=False
state = set_error(state, "エラーが発生しました")
state = clear_error(state)

Runtimeレイヤー

本番アプリケーションでは、統合実行のためにRuntimeレイヤーを使用:

AgentRuntime

from agent_contracts import AgentRuntime, RequestContext, InMemorySessionStore

runtime = AgentRuntime(
    graph=compiled_graph,
    session_store=InMemorySessionStore(),
)

result = await runtime.execute(RequestContext(
    session_id="abc123",
    action="answer",
    message="カジュアルが好き",
    resume_session=True,
))

実行ライフサイクル

┌─────────────────────────────────────────────────────────────┐
│                  AgentRuntime ライフサイクル                 │
├─────────────────────────────────────────────────────────────┤
│  1. 初期状態を作成                                           │
│  2. セッションを復元(resume_session=True の場合)            │
│  3. hooks.prepare_state() → 実行前カスタマイズ               │
│  4. graph.ainvoke() → LangGraphを実行                        │
│  5. ExecutionResultを構築                                    │
│  6. hooks.after_execution() → 永続化、クリーンアップ         │
└─────────────────────────────────────────────────────────────┘

カスタムフック

from agent_contracts import RuntimeHooks

class MyHooks(RuntimeHooks):
    async def prepare_state(self, state, request):
        # 状態の正規化、リソースの読み込み
        return state
    
    async def after_execution(self, state, result):
        # セッション保存、ログなど
        await self.session_store.save(...)

StreamingRuntime(SSE対応)

from agent_contracts.runtime import StreamingRuntime, StreamEventType

runtime = (
    StreamingRuntime()
    .add_node("search", search_node, "検索中...")
    .add_node("stylist", stylist_node, "生成中...")
)

async for event in runtime.stream(request):
    if event.type == StreamEventType.NODE_END:
        print(f"ノード {event.node_name} 完了")
    yield event.to_sse()

階層型スーパーバイザー (v0.6.0)

v0.6.0から、親Supervisorが子Subgraphを呼び出し、子グラフ終了後に親へ戻る階層実行がサポートされます。

概要

階層実行は opt-in です。親Supervisorは以下の形式を返すことでサブグラフを呼び出します:

call_subgraph::<subgraph_id>
┌─────────────────────────────────────────────────────────────┐
│                     Domain Supervisor                        │
│                                                              │
│  decision = "call_subgraph::fashion"                        │
│       │                                                      │
│       ▼                                                      │
│  ┌─────────────────────────────────────────────────────┐    │
│  │              CallSubgraph (fashion)                  │    │
│  │  ┌─────────────────────────────────────────────┐    │    │
│  │  │        Fashion Supervisor                    │    │    │
│  │  │             │                                │    │    │
│  │  │             ▼                                │    │    │
│  │  │        TrendNode → END                       │    │    │
│  │  └─────────────────────────────────────────────┘    │    │
│  │                    │                                 │    │
│  └────────────────────┼─────────────────────────────────┘    │
│                       ▼                                      │
│              Back to Domain Supervisor                       │
└─────────────────────────────────────────────────────────────┘

SubgraphContract と SubgraphDefinition

from agent_contracts import SubgraphContract, SubgraphDefinition

# サブグラフの契約を定義
contract = SubgraphContract(
    subgraph_id="fashion",
    description="Fashion trend subgraph",
    reads=["request"],
    writes=["response"],
    entrypoint="fashion_supervisor",
)

# サブグラフの構成を定義
definition = SubgraphDefinition(
    subgraph_id="fashion",
    supervisors=["fashion_supervisor"],
    nodes=["trend_node"],
)

# レジストリに登録
registry.register_subgraph(contract, definition)

安全制限 (Budgets)

階層実行では以下の制限が適用されます:

制限 デフォルト 説明
max_depth 2 最大コールスタック深度
max_steps 40 最大総ステップ数
max_reentry 2 同一サブグラフへの最大再入回数
state = {
    "_internal": {
        "budgets": {"max_depth": 3, "max_steps": 50, "max_reentry": 2}
    }
}

制限を超えると安全停止し、termination_reason が記録されます。

DecisionTrace

enable_subgraphs=True のとき、ルーティング履歴が _internal.decision_trace に記録されます:

詳細は 階層型スーパーバイザーガイド を参照してください。


次のステップ