本ドキュメントは agent_contracts のPublic APIを、
「目的 → 署名 → 最小例 → 主要引数/戻り値 → 注意点」のテンプレで整理したものです。
__version__: strimport agent_contracts as ac
print(ac.__version__)
NodeContract(name: str, description: str, reads: list[str], writes: list[str], requires_llm: bool = False, services: list[str] = [], supervisor: str, trigger_conditions: list[TriggerCondition] = [], is_terminal: bool = False, icon: str | None = None)from agent_contracts import NodeContract, TriggerCondition
CONTRACT = NodeContract(
name="search",
description="Search knowledge base",
reads=["request"],
writes=["response"],
supervisor="main",
trigger_conditions=[TriggerCondition(when={"request.action": "search"})],
)
reads/writes が参照・更新するスライス、trigger_conditions が起動条件。requires_llm=True の場合はノード初期化時にLLMを注入する。TriggerCondition(priority: int = 0, when: dict[str, Any] | None = None, when_not: dict[str, Any] | None = None, llm_hint: str | None = None)from agent_contracts import TriggerCondition
condition = TriggerCondition(
priority=100,
when={"request.action": "search"},
llm_hint="Search when action is search",
)
when/when_not が状態マッチ条件、priority が優先度。when と when_not の両方指定も可能(両方を満たす必要あり)。NodeInputs(**slices)async def execute(self, inputs):
req = inputs.get_slice("request")
get_slice(name) が辞書スライスを返す。NodeOutputs(**slices)from agent_contracts import NodeOutputs
return NodeOutputs(response={"response_type": "done"})
to_state_updates() がLangGraph向け更新辞書を返す。None の値は更新辞書から除外される。class ModularNode(ABC) / __init__(llm: BaseChatModel | None = None, **services) / execute(inputs: NodeInputs, config: RunnableConfig | None = None) -> NodeOutputsfrom agent_contracts import ModularNode, NodeOutputs
class MyNode(ModularNode):
CONTRACT = ...
async def execute(self, inputs, config=None):
return NodeOutputs(response={"response_type": "done"})
execute が実処理、__call__ がLangGraph互換エントリ。CONTRACT に requires_llm=True がある場合、llm を渡す。class InteractiveNode(ModularNode) / prepare_context(...) / process_answer(...) / check_completion(...) / generate_question(...)class InterviewNode(InteractiveNode):
CONTRACT = ...
def prepare_context(self, inputs):
return {}
def check_completion(self, context, inputs):
return False
async def process_answer(self, context, inputs, config=None):
return False
async def generate_question(self, context, inputs, config=None):
return NodeOutputs(response={"response_type": "question"})
execute は抽象メソッド群を順に呼ぶ。process_answer は bool を返すが状態更新は自前で行う。class BaseRequestSlice(TypedDict, total=False)class BaseResponseSlice(TypedDict, total=False)class BaseInternalSlice(TypedDict, total=False)from agent_contracts import BaseRequestSlice
req: BaseRequestSlice = {"session_id": "s1", "action": "ask"}
request/response/_internal を含む最小状態構造のTypedDict。class BaseAgentState(TypedDict, total=False)class MyState(BaseAgentState):
user: dict
get_slice(state: dict, slice_name: str) -> dictfrom agent_contracts import get_slice
request = get_slice(state, "request")
merge_slice_updates(state: dict, updates: dict[str, Any] | None) -> dict[str, Any]updates = merge_slice_updates(state, {"response": {"response_type": "done"}})
updates に既存スライスを統合して返す。updates が None の場合は空辞書を返す。StateAccessor(slice_name: str, field_name: str, default: T)from agent_contracts import StateAccessor
count = StateAccessor("_internal", "turn_count", 0)
value = count.get(state)
state = count.set(state, value + 1)
get(state) -> T / set(state, value) -> dictclass Internalclass Requestclass Responsefrom agent_contracts import Internal, Request
turn = Internal.turn_count.get(state)
action = Request.action.get(state)
StateAccessor。Internal は原則ノードから直接操作しない。reset_response(state: dict) -> dictincrement_turn(state: dict) -> dictset_error(state: dict, error: str) -> dictclear_error(state: dict) -> dictstate = increment_turn(state)
state = set_error(state, "invalid")
RoutingDecision(selected_node: str, reason: RoutingReason)RoutingReason(decision_type: str, matched_rules: list[MatchedRule] = [], llm_used: bool = False, llm_reasoning: str | None = None)MatchedRule(node: str, condition: str, priority: int)decision = await supervisor.decide_with_trace(state)
print(decision.selected_node)
RoutingDecision.to_supervisor_decision() で簡易形式に変換可能。decision_type は terminal_state/rule_match/llm_decision などを想定。SupervisorDecision(next_node: str, reasoning: str = "")decision = await supervisor.decide(state)
print(decision.next_node)
next_node が次に実行するノード名。decide_with_trace を使用。class BaseActionRouter(ABC) / route(action: str, state: dict | None = None) -> strclass MyRouter(BaseActionRouter):
def route(self, action, state=None):
return "default_supervisor"
__call__(state) でLangGraphノードとして利用可能。route が ValueError を投げるとエラー応答を作成する。ContractValidator(registry: NodeRegistry, known_services: set[str] | None = None, strict: bool = False)ValidationResult(errors: list[str] = [], warnings: list[str] = [], info: list[str] = [])validator = ContractValidator(get_node_registry())
result = validator.validate()
print(result)
validate() -> ValidationResult。strict=True で警告もエラー扱いにできる。ContractVisualizer(registry: NodeRegistry, graph: CompiledStateGraph | None = None)visualizer = ContractVisualizer(get_node_registry())
markdown = visualizer.generate_architecture_doc()
generate_architecture_doc() -> str。graph を渡すとLangGraphフローのMermaidを含められる。ContractDiffReport(added: list[str], removed: list[str], changes: list[NodeChange])NodeChange(node: str, severity: str, details: list[str] = [])diff_contracts(before: dict[str, dict], after: dict[str, dict]) -> ContractDiffReportbefore = registry.export_contracts()
# ... registry update ...
after = registry.export_contracts()
report = diff_contracts(before, after)
print(report.to_text())
ContractDiffReport.has_breaking() で破壊的変更を判定。before/after は NodeContract.model_dump() 形式を想定。class ContractViolationError(RuntimeError)try:
outputs = await node(state)
except ContractViolationError:
...
NodeRegistry(valid_slices: set[str] | None = None)registry = NodeRegistry()
registry.register(MyNode)
matches = registry.evaluate_triggers("main", state)
register(node_class) / evaluate_triggers(supervisor, state)。valid_slices に含まれないスライス名は警告対象。TriggerMatch(priority: int, node_name: str, condition_index: int)matches = registry.evaluate_triggers("main", state)
top = matches[0]
priority が高い順にソートされる。condition_index は trigger_conditions 配列のインデックス。get_node_registry() -> NodeRegistryreset_registry() -> Noneregistry = get_node_registry()
reset_registry()
reset_registry を使うと安全。GenericSupervisor(supervisor_name: str, llm: BaseChatModel | None = None, registry: NodeRegistry | None = None, max_iterations: int | None = None, terminal_response_types: set[str] | None = None, context_builder: ContextBuilder | None = None, max_field_length: int | None = None, explicit_routing_handler: ExplicitRoutingHandler | None = None)supervisor = GenericSupervisor("main", llm=llm)
decision = await supervisor.decide(state)
decide(state) / decide_with_trace(state) / run(state)。decide_with_trace は RoutingDecision を返す。GraphBuilder(registry: NodeRegistry | None = None, state_class: type | None = None, llm_provider: Callable[[], Any] | None = None, dependency_provider: Callable[[NodeContract], dict] | None = None, supervisor_factory: Callable[[str, Any], GenericSupervisor] | None = None)builder = GraphBuilder(get_node_registry())
builder.add_supervisor("main", llm=llm)
# builder.create_node_wrapper(...) などを使って StateGraph を組み立てる
add_supervisor(name, llm, **services) / create_node_wrapper(...) / create_supervisor_wrapper(...)。llm_provider を使うとノードごとにLLMを生成できる。build_graph_from_registry(registry: NodeRegistry | None = None, llm=None, llm_provider: Callable[[], Any] | None = None, dependency_provider: Callable[[NodeContract], dict] | None = None, supervisor_factory: Callable[[str, Any], GenericSupervisor] | None = None, entrypoint: tuple[str, Callable, Callable] | None = None, supervisors: list[str] | None = None, state_class: type | None = None, **services) -> StateGraphgraph = build_graph_from_registry(get_node_registry(), llm=llm, supervisors=["main"])
compiled = graph.compile()
StateGraph。entrypoint を指定すると外部ルーターを起点にできる。RequestContext(session_id: str, action: str, params: dict[str, Any] | None = None, message: str | None = None, image: str | None = None, resume_session: bool = False, metadata: dict[str, Any] = {})from agent_contracts import RequestContext
ctx = RequestContext(session_id="s1", action="answer", message="hi")
get_param(key, default) でparams取得。resume_session=True でセッション復元を試みる。ExecutionResult(state: dict[str, Any], response_type: str | None = None, response_data: dict[str, Any] | None = None, response_message: str | None = None, success: bool = True, error: str | None = None)result = await runtime.execute(ctx)
print(result.response_type)
from_state(state) / error_result(error, state=None) / to_response_dict()。to_response_dict() は response_type を優先して type に反映。RuntimeHooks.prepare_state(state: dict, request: RequestContext) -> dictRuntimeHooks.after_execution(state: dict, result: ExecutionResult) -> Noneclass DefaultHooksclass MyHooks:
async def prepare_state(self, state, request):
return state
async def after_execution(self, state, result):
pass
prepare_state は状態辞書を返す。SessionStore.load(session_id: str) -> dict | NoneSessionStore.save(session_id: str, data: dict, ttl_seconds: int = 3600) -> NoneSessionStore.delete(session_id: str) -> NoneInMemorySessionStore()store = InMemorySessionStore()
await store.save("s1", {"_internal": {"turn_count": 1}})
load はセッション辞書または None。InMemorySessionStore は開発/テスト用途専用。AgentRuntime(graph: Any, hooks: RuntimeHooks | None = None, session_store: SessionStore | None = None, slices_to_restore: list[str] | None = None)runtime = AgentRuntime(compiled_graph, session_store=InMemorySessionStore())
result = await runtime.execute(RequestContext(session_id="s1", action="ask"))
execute(request: RequestContext) -> ExecutionResult。ExecutionResult.error_result(...) に変換される。SubgraphContract(subgraph_id: str, description: str, reads: list[str], writes: list[str], entrypoint: str, input_schema: type[BaseModel] | None = None, output_schema: type[BaseModel] | None = None)from agent_contracts import SubgraphContract
contract = SubgraphContract(
subgraph_id="fashion",
description="Fashion trend subgraph",
reads=["request"],
writes=["response"],
entrypoint="fashion_supervisor",
)
subgraph_id がサブグラフの一意識別子、entrypoint がサブグラフ内のエントリーポイント。call_subgraph::<subgraph_id> を返すことでサブグラフを呼び出す。SubgraphDefinition(subgraph_id: str, supervisors: list[str] | None = None, nodes: list[str] | None = None)from agent_contracts import SubgraphDefinition
definition = SubgraphDefinition(
subgraph_id="fashion",
supervisors=["fashion_supervisor"],
nodes=["trend_node", "style_node"],
)
supervisors と nodes でサブグラフの構成要素を指定。registry.register_subgraph(contract, definition) で登録する。Budgets(max_depth: int = 2, max_steps: int = 40, max_reentry: int = 2)from agent_contracts import Budgets
budgets = Budgets(max_depth=3, max_steps=50, max_reentry=2)
state = {"_internal": {"budgets": {"max_depth": 3, "max_steps": 50}}}
termination_reason が記録される。CallStackFrame(subgraph_id: str, depth: int, entry_step: int, locals: dict[str, Any] = {})from agent_contracts import CallStackFrame
frame = CallStackFrame(
subgraph_id="fashion",
depth=1,
entry_step=5,
)
depth がコールスタックの深さ、entry_step がサブグラフに入った時点のステップ数。_internal.call_stack にリストとして保持される。DecisionTraceItem(step: int, depth: int, supervisor: str | None, decision_kind: DecisionKind, target: str | None = None, reason: str | None = None, termination_reason: str | None = None)from agent_contracts import DecisionTraceItem
trace = DecisionTraceItem(
step=1,
depth=0,
supervisor="domain",
decision_kind="SUBGRAPH",
target="fashion",
)
decision_kind は NODE, SUBGRAPH, STOP_LOCAL, STOP_GLOBAL, FALLBACK のいずれか。_internal.decision_trace にリストとして記録される。