Skip to content

API Reference

Core

GantryEngine

gantrygraph.engine.engine.GantryEngine(*, llm, perception=None, tools=None, approval_callback=None, on_event=None, max_steps=50, guardrail=None, system_prompt=None, memory=None, checkpointer=None, enable_suspension=False, budget=None, workspace_policy=None)

Autonomous agent engine backed by LangGraph.

Composes a perception source, a set of tools/MCP connectors, and an LLM into a self-correcting observe → think → act → review loop.

Parameters:

Name Type Description Default
llm BaseChatModel

Any LangChain BaseChatModel.

required
perception BasePerception | None

How the agent observes its environment.

None
tools list[_AnyTool] | None

BaseAction, BaseMCPConnector, or bare BaseTool instances. All are flattened into one tool registry.

None
approval_callback ApprovalCallback | None

Called before every tool execution. Return True to allow, False to deny.

None
on_event EventCallback | None

Called after each node transition with a GantryEvent. Use for logging, tracing, OTel, etc.

None
max_steps int

Hard upper bound on act-node executions.

50
guardrail GuardrailPolicy | None

Optional GuardrailPolicy for per-tool approval.

None
system_prompt str | None

Prepended as a SystemMessage before the task.

None
memory BaseMemory | None

Optional long-term memory backend. Past experiences are recalled at the start of each run and the result is stored automatically on completion.

None
checkpointer Any

LangGraph checkpointer for state persistence. When provided, enable_suspension defaults to True.

None
enable_suspension bool

Use LangGraph interrupt() inside act_node instead of approval_callback for HITL. Requires a checkpointer to be set (auto-creates MemorySaver if none provided).

False
budget BudgetPolicy | None

Optional BudgetPolicy. Enforces max_steps (caps the engine-level limit) and max_wall_seconds (wall-clock timeout per run via asyncio.wait_for). max_tokens is stored but not currently enforced by gantrygraph; configure token limits on the LLM itself.

None
workspace_policy WorkspacePolicy | None

Optional WorkspacePolicy. When set, automatically adds FileSystemTools and ShellTool locked to workspace_path. Equivalent to passing those tools in the tools list, but more declarative.

None

Example::

agent = GantryEngine(
    llm=ChatAnthropic(model="claude-sonnet-4-6"),
    tools=[ShellTool(workspace="/tmp")],
    on_event=lambda e: print(e),
    max_steps=20,
)
result = agent.run("List the 5 largest files in /tmp")

arun(task, *, thread_id=None) async

Primary async entry point.

Enters the resource lifecycle, runs the graph to completion, stores the result in long-term memory (if configured), and returns the final answer as a string.

When suspension is enabled and an approval is needed, raises :exc:AgentSuspended with the thread_id to resume from.

astream_events(task) async

Stream GantryEvents as they are emitted during execution.

get_graph()

Return the compiled LangGraph — the official escape hatch for loop customisation.

Use this when the default observe → think → act → review topology does not fit your use case. The returned CompiledStateGraph is a standard LangGraph object; you can call ainvoke, astream, get_state, etc. on it directly.

Pattern A — inspect or stream the existing graph:

.. code-block:: python

compiled = agent.get_graph()

# Invoke directly with a custom initial state
result = await compiled.ainvoke({
    "task": "my task",
    "messages": [],
    "step_count": 0,
    "is_done": False,
})

# Stream individual node outputs
async for chunk in compiled.astream(initial_state):
    print(chunk)

Pattern B — build a fully custom loop using gantrygraph's node primitives:

.. code-block:: python

from functools import partial
from gantrygraph.engine import (
    act_node, observe_node, review_node,
    should_continue, think_node,
)
from gantrygraph.core.state import GantryState
from langgraph.graph import END, START, StateGraph

async def my_pre_act_hook(state: GantryState) -> dict:
    """Validate tool calls before execution."""
    return {}

graph = StateGraph(GantryState)
graph.add_node("observe",  partial(observe_node, perception=None, event_cb=None))
graph.add_node("think",    partial(think_node,   llm_with_tools=my_llm, event_cb=None))
graph.add_node("pre_act",  my_pre_act_hook)       # custom hook
graph.add_node("act",      partial(act_node, tool_map=tool_map,
                                   approval_cb=None, guardrail=None,
                                   event_cb=None, use_interrupt=False))
graph.add_node("review",   review_node)
graph.add_edge(START,      "observe")
graph.add_edge("observe",  "think")
graph.add_edge("think",    "pre_act")
graph.add_edge("pre_act",  "act")
graph.add_edge("act",      "review")
graph.add_conditional_edges(
    "review",
    partial(should_continue, max_steps=30),
    {"observe": "observe", END: END},
)
compiled = graph.compile()

All node functions are exported from :mod:gantrygraph.engine and accept only keyword-only arguments (bound via functools.partial), so they remain pure and testable in isolation.

resume(thread_id, *, approved=True) async

Resume a suspended agent run.

Parameters:

Name Type Description Default
thread_id str

The thread_id from the :exc:AgentSuspended exception raised by :meth:arun.

required
approved bool

Decision to pass back to the interrupted node. True → execute the tool, False → deny it.

True

Returns:

Type Description
str

The agent's final answer after resuming.

Raises:

Type Description

exc:AgentSuspended again if the agent suspends a second time.

run(task, *, thread_id=None)

Synchronous entry point. Blocks until the task completes.


AgentSuspended

gantrygraph.engine.engine.AgentSuspended(thread_id, data=None)

Bases: Exception

Raised by arun() when the agent is suspended awaiting human approval.

Resume execution with GantryEngine.resume(thread_id, approved=True/False).


GantryConfig

gantrygraph.config.GantryConfig

Bases: BaseModel

Full declarative configuration for a GantryEngine instance.

All fields have sensible defaults so you only need to set what you change.

Attributes:

Name Type Description
max_steps int

Hard upper bound on act-node executions.

system_prompt str | None

Optional extra system prompt prepended to every run.

workspace str | None

If set, attaches FileSystemTools and ShellTool locked to this directory.

shell_allowed_commands list[str] | None

Allowlist for ShellTool. None = allow all executables.

shell_timeout float

Per-command wall-clock limit (seconds).

perception Literal['none', 'desktop', 'web']

Perception backend to attach. "none" | "desktop" | "web"

browser_headless bool

Run browser in headless mode.

memory Literal['none', 'in_memory', 'chroma']

Long-term memory backend. "none" | "in_memory" | "chroma"

memory_persist_directory str | None

On-disk path for ChromaMemory.

memory_collection str

ChromaDB collection name.

telemetry_service_name str

service.name OTel resource attribute.

telemetry_otlp_endpoint str | None

If set, enables OTel export to this OTLP gRPC endpoint (e.g. Grafana Alloy, Datadog agent, Jaeger).

enable_suspension bool

Enable LangGraph interrupt-based HITL.

guardrail_requires_approval list[str]

Tool names that require approval_callback confirmation before execution.

max_wall_seconds float | None

Wall-clock timeout for the entire arun() call. None = no timeout.

build(llm, *, approval_callback=None, on_event=None, extra_tools=None)

Assemble and return a configured GantryEngine.

Parameters:

Name Type Description Default
llm Any

Any LangChain BaseChatModel.

required
approval_callback Any

Optional HITL callback ((tool, args) → bool).

None
on_event Any

Optional event callback; merged with the OTel callback when telemetry_otlp_endpoint is set.

None
extra_tools list[Any] | None

Additional tools appended after the built-in set.

None

Returns:

Type Description
GantryEngine

A ready-to-use GantryEngine instance.

from_env(prefix='CLAW_') classmethod

Load config from environment variables.

Each field maps to {prefix}{FIELD_NAME_UPPERCASE}.

Example::

CLAW_MAX_STEPS=30
CLAW_WORKSPACE=/app
CLAW_MEMORY=chroma
CLAW_MEMORY_PERSIST_DIRECTORY=/data/memory
CLAW_TELEMETRY_OTLP_ENDPOINT=http://collector:4317
CLAW_ENABLE_SUSPENSION=true
CLAW_GUARDRAIL_REQUIRES_APPROVAL=shell_run,file_delete
CLAW_DESKTOP_MAX_RESOLUTION=1280,720

from_yaml(path) classmethod

Load config from a YAML file.

Requires pyyaml::

pip install pyyaml

Example YAML::

max_steps: 30
workspace: /app
memory: chroma
memory_persist_directory: /data/memory
telemetry_otlp_endpoint: http://localhost:4317
guardrail_requires_approval:
  - shell_run
  - file_delete

State & Events

GantryState

gantrygraph.core.state.GantryState

Bases: TypedDict

LangGraph state dict for the gantrygraph agent loop.

All graph nodes receive the full state and return a partial update dict. The messages field uses the add_messages reducer, so a node can append a new message by returning {"messages": [new_msg]} without reading the current list first.

Fields

task: The original task string passed to GantryEngine.run(). messages: Full conversation history, auto-appended via reducer. step_count: Number of act-node executions so far; used by budget guard. is_done: Set to True by the review node to terminate the loop. last_error: Most recent tool error message (for self-correction context). last_observation: Raw PerceptionResult.model_dump() from the last observe node; stored so nodes can access it without re-capturing.

GantryEvent

gantrygraph.core.events.GantryEvent(event_type, step, data=dict()) dataclass

Emitted by GantryEngine at each state transition for observability.

Pass an on_event callback to GantryEngine to receive these events. The callback may be either a plain function or an async coroutine.

Example::

def my_logger(event: GantryEvent) -> None:
    print(f"[step {event.step}] {event.event_type}: {event.data}")

agent = GantryEngine(..., on_event=my_logger)

PerceptionResult

gantrygraph.core.events.PerceptionResult

Bases: BaseModel

Serialisable snapshot from any BasePerception.

Passed to the agent as a LangChain multimodal message via to_message_content(). Pydantic is used here so the result can be stored in GantryState.last_observation as a plain dict via .model_dump().

to_message_content()

Convert to LangChain multimodal message content blocks.


Base Classes

BasePerception

gantrygraph.core.base_perception.BasePerception

Bases: ABC

Abstract base class for all perception sources.

Subclass this to add new ways for an agent to observe its environment (desktop screenshot, web page, terminal output, REST API, database state — anything that can be turned into text or an image).

The engine calls observe() once per loop iteration and attaches the result as a multimodal HumanMessage before invoking the LLM. If you return a screenshot_b64, the LLM receives a vision block; if you return an accessibility_tree, it receives a text block; you can return both at the same time.

Optionally override close() to release resources on shutdown (file handles, network sockets, subprocesses). GantryEngine calls it automatically at the end of every arun() / run() call.

Minimal example — observe a REST API:

.. code-block:: python

from gantrygraph.core.base_perception import BasePerception
from gantrygraph.core.events import PerceptionResult
import httpx

class APIStatusPerception(BasePerception):
    def __init__(self, url: str) -> None:
        self._url = url
        self._client = httpx.AsyncClient()

    async def observe(self) -> PerceptionResult:
        resp = await self._client.get(self._url)
        return PerceptionResult(
            accessibility_tree=f"HTTP {resp.status_code}\n{resp.text[:2000]}"
        )

    async def close(self) -> None:
        await self._client.aclose()

agent = GantryEngine(
    llm=my_llm,
    perception=APIStatusPerception("https://api.example.com/status"),
)

Terminal / subprocess example:

.. code-block:: python

import asyncio
from gantrygraph.core.base_perception import BasePerception
from gantrygraph.core.events import PerceptionResult

class TerminalPerception(BasePerception):
    async def observe(self) -> PerceptionResult:
        proc = await asyncio.create_subprocess_shell(
            "ps aux --sort=-%cpu | head -20",
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.DEVNULL,
        )
        stdout, _ = await proc.communicate()
        return PerceptionResult(accessibility_tree=stdout.decode())

Combining multiple sources — use MultiPerception:

.. code-block:: python

from gantrygraph.perception import MultiPerception, DesktopScreen, WebPage

agent = GantryEngine(
    llm=my_llm,
    perception=MultiPerception([DesktopScreen(), WebPage("https://example.com")]),
)

close() async

Optional cleanup hook called by GantryEngine on shutdown.

observe() abstractmethod async

Capture the current environment state.

BaseAction

gantrygraph.core.base_action.BaseAction

Bases: ABC

Abstract base class for an action set — a bundle of LangChain tools.

Subclass this to expose a group of related tools to the agent. GantryEngine collects tools from all registered action sets, flattens them into one registry, and binds them to the LLM via llm.bind_tools().

Grouping tools into a BaseAction subclass is the recommended pattern when your tools share state (e.g., an HTTP session, a DB connection) or when you want them to appear as a logical unit. For single stateless tools, use the @gantry_tool decorator instead.

Optionally override close() to release resources on shutdown. GantryEngine calls it automatically at the end of every run.

Minimal example — HTTP client tools:

.. code-block:: python

import httpx
from langchain_core.tools import StructuredTool
from gantrygraph.core.base_action import BaseAction

class HTTPTools(BaseAction):
    def __init__(self, base_url: str) -> None:
        self._client = httpx.AsyncClient(base_url=base_url)

    def get_tools(self) -> list:
        async def http_get(path: str) -> str:
            """Make a GET request to *path* and return the response body."""
            r = await self._client.get(path)
            return r.text

        async def http_post(path: str, body: str) -> str:
            """POST *body* (JSON string) to *path*, return the response."""
            r = await self._client.post(path, content=body)
            return r.text

        return [
            StructuredTool.from_function(coroutine=http_get,  name="http_get"),
            StructuredTool.from_function(coroutine=http_post, name="http_post"),
        ]

    async def close(self) -> None:
        await self._client.aclose()

agent = GantryEngine(llm=my_llm, tools=[HTTPTools("https://api.example.com")])

Using @gantry_tool for simple stateless tools:

.. code-block:: python

from gantrygraph import gantry_tool

@gantry_tool
def calculator(expression: str) -> str:
    """Evaluate a Python math expression and return the result."""
    return str(eval(expression))  # noqa: S307

agent = GantryEngine(llm=my_llm, tools=[calculator])

close() async

Optional cleanup hook called by GantryEngine on shutdown.

get_tools() abstractmethod

Return the LangChain tools this action set provides.

BaseMCPConnector

gantrygraph.core.base_mcp.BaseMCPConnector

Bases: ABC

Abstract base class for MCP server connections.

MCP connectors own a subprocess (or network connection) lifetime. They must be used as async context managers: the server starts in __aenter__ and shuts down in __aexit__.

GantryEngine enters all connectors automatically inside its _lifecycle() context manager before building the graph, so user code never needs to manage the lifecycle manually.

Implement this ABC when you need to wrap an MCP-compatible server that is not covered by the built-in MCPClient.

Standalone usage (e.g., in scripts):

.. code-block:: python

async with MCPClient("npx -y @mcp/github") as client:
    tools = client.get_tools()   # list[BaseTool]
    print([t.name for t in tools])

Passing to GantryEngine (lifecycle managed automatically):

.. code-block:: python

agent = GantryEngine(
    llm=my_llm,
    tools=[MCPClient("npx -y @mcp/github")],
)
agent.run("Open a pull request")

Custom connector example (Python-based MCP server):

.. code-block:: python

from types import TracebackType
from langchain_core.tools import BaseTool, StructuredTool
from gantrygraph.core.base_mcp import BaseMCPConnector

class InProcessMCPConnector(BaseMCPConnector):
    """Wraps an in-process FastMCP server (no subprocess)."""

    def __init__(self) -> None:
        self._tools: list[BaseTool] = []

    async def __aenter__(self) -> "InProcessMCPConnector":
        # spin up in-process server, discover tools
        self._tools = [
            StructuredTool.from_function(
                func=lambda x: x.upper(), name="shout",
                description="Convert text to uppercase.",
            )
        ]
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        self._tools = []   # cleanup

    def get_tools(self) -> list[BaseTool]:
        return self._tools

__aenter__() abstractmethod async

Start the MCP server subprocess and initialise the client session.

__aexit__(exc_type, exc_val, exc_tb) abstractmethod async

Shut down the MCP server subprocess.

get_tools() abstractmethod

Return dynamically generated StructuredTool instances.

Only valid after __aenter__ has been called.

BaseMemory

gantrygraph.memory.base.BaseMemory

Bases: ABC

Pluggable long-term memory backend.

Subclasses must implement :meth:add and :meth:search. All methods are async so implementations can use I/O-bound vector DBs.

add(text, metadata=None) abstractmethod async

Persist text with optional metadata for future retrieval.

close() async

Release any held resources (connections, files, etc.).

search(query, k=5) abstractmethod async

Return up to k results most relevant to query, sorted by score desc.


Security

GuardrailPolicy

gantrygraph.security.policies.GuardrailPolicy

Bases: BaseModel

Configures which tools require explicit human approval before execution.

Pass this to GantryEngine together with an approval_callback to gate dangerous operations.

Example::

from gantrygraph.security import GuardrailPolicy

policy = GuardrailPolicy(
    requires_approval={"shell_run", "file_delete"},
)
agent = GantryEngine(..., guardrail=policy,
                   approval_callback=my_slack_approval_fn)

WorkspacePolicy

gantrygraph.security.policies.WorkspacePolicy

Bases: BaseModel

Restrict filesystem and shell operations to a specific directory.

Pass to GantryEngine via workspace_policy= to automatically add FileSystemTools and ShellTool locked to workspace_path. This is more declarative than listing the tools manually.

Example::

from gantrygraph import GantryEngine
from gantrygraph.security import WorkspacePolicy

agent = GantryEngine(
    llm=my_llm,
    workspace_policy=WorkspacePolicy(workspace_path="/home/user/project"),
)
# Equivalent to:
# GantryEngine(llm=..., tools=[FileSystemTools("/home/user/project"),
#                            ShellTool("/home/user/project")])
Note

allow_read_outside and allow_write_outside are reserved for future fine-grained enforcement. Currently the workspace boundary is enforced at the tool level (path traversal blocked in FileSystemTools).

BudgetPolicy

gantrygraph.security.policies.BudgetPolicy

Bases: BaseModel

Hard limits to prevent runaway costs and infinite loops.

Pass to GantryEngine via budget= to enforce spending limits.

Enforcement
  • max_steps: caps GantryEngine.max_steps (whichever is lower wins).
  • max_wall_seconds: wraps the full arun() call in asyncio.wait_for; raises TimeoutError on breach.
  • max_tokens: stored but not currently enforced by gantrygraph. Configure token limits on the LLM itself (e.g. max_tokens= in ChatAnthropic).

Example::

from gantrygraph import GantryEngine
from gantrygraph.security import BudgetPolicy

agent = GantryEngine(
    llm=my_llm,
    budget=BudgetPolicy(max_steps=30, max_wall_seconds=120.0),
)

Swarm

GantrySupervisor

gantrygraph.swarm.supervisor.GantrySupervisor(*, llm, worker_factory=None, workers=None, max_workers=5)

Decompose a task into parallel subtasks and synthesize the results.

The supervisor uses the LLM twice:

  1. Decompose — break the input task into N independent subtasks, optionally assigning each to a named specialist worker.
  2. Synthesize — merge all worker answers into a final response.

Worker agents run concurrently via asyncio.gather.

Homogeneous workers (original API — all workers share the same tools):

.. code-block:: python

from gantrygraph.swarm import GantrySupervisor
from gantrygraph import GantryEngine
from langchain_anthropic import ChatAnthropic

llm = ChatAnthropic(model="claude-sonnet-4-6")
supervisor = GantrySupervisor(
    llm=llm,
    worker_factory=lambda: GantryEngine(llm=llm, tools=[...]),
    max_workers=4,
)
result = await supervisor.run("Analyse these 10 documents and summarise findings")

Heterogeneous workers (new API — each worker has its own tools):

.. code-block:: python

from gantrygraph.swarm import GantrySupervisor, WorkerSpec
from gantrygraph import GantryEngine
from gantrygraph.actions import ShellTool, FileSystemTools
from langchain_anthropic import ChatAnthropic

llm = ChatAnthropic(model="claude-sonnet-4-6")
supervisor = GantrySupervisor(
    llm=llm,
    workers=[
        WorkerSpec(
            name="shell_expert",
            engine=GantryEngine(llm=llm, tools=[ShellTool(workspace="/tmp")]),
            description="Runs shell commands and explores the filesystem.",
        ),
        WorkerSpec(
            name="file_editor",
            engine=GantryEngine(llm=llm, tools=[FileSystemTools(workspace="/tmp")]),
            description="Reads, writes, and edits files.",
        ),
    ],
)
result = await supervisor.run(
    "Find all .log files in /tmp and summarise their contents."
)

When workers is provided the supervisor asks the LLM to assign each subtask to the most appropriate specialist by name. If a subtask is not assigned (or the name is unrecognised), it falls back to the first worker in the list.

run(task) async

Decompose task, run workers concurrently, and synthesise results.

WorkerSpec

gantrygraph.swarm.worker.WorkerSpec(name, engine, description='') dataclass

Named specialist worker with its own pre-configured engine.

Use WorkerSpec when different subtasks require different tools, LLMs, or configurations. Pass a list of WorkerSpec instances to GantrySupervisor instead of a worker_factory.

The supervisor LLM reads all description fields and automatically routes each decomposed subtask to the most appropriate specialist.

Parameters:

Name Type Description Default
name str

Short identifier used in routing (e.g. "analyst").

required
engine Any

A fully-configured GantryEngine instance.

required
description str

One-sentence description of what this worker can do. Used by the supervisor LLM to route subtasks.

''

Example::

from gantrygraph import GantryEngine
from gantrygraph.actions import ShellTool, FileSystemTools
from gantrygraph.swarm import GantrySupervisor, WorkerSpec
from langchain_anthropic import ChatAnthropic

llm = ChatAnthropic(model="claude-sonnet-4-6")

supervisor = GantrySupervisor(
    llm=llm,
    workers=[
        WorkerSpec(
            name="shell_expert",
            engine=GantryEngine(llm=llm, tools=[ShellTool(workspace="/tmp")]),
            description="Runs shell commands, explores the filesystem, executes scripts.",
        ),
        WorkerSpec(
            name="file_editor",
            engine=GantryEngine(llm=llm, tools=[FileSystemTools(workspace="/tmp")]),
            description="Reads, writes, and edits files.",
        ),
    ],
)
result = await supervisor.run(
    "Explore /tmp, find all .log files, then read their first 10 lines."
)

MCP

MCPClient

gantrygraph.mcp.client.MCPClient(server_command, env=None)

Bases: BaseMCPConnector

Connect to an MCP server process and expose its tools as LangChain tools.

The server is started as a subprocess via stdio transport when the client is used as an async context manager. GantryEngine manages this lifecycle automatically.

Example — standalone::

async with MCPClient("npx -y @modelcontextprotocol/server-filesystem /tmp") as c:
    tools = c.get_tools()
    result = await tools[0].ainvoke({"path": "/tmp"})

Example — with engine (lifecycle managed automatically)::

agent = GantryEngine(
    ...,
    tools=[MCPClient("npx -y @mcp/github")],
)

MCPToolRegistry

gantrygraph.mcp.registry.MCPToolRegistry(clients)

Bases: BaseMCPConnector

Manage multiple MCPClient instances as a single pluggable unit.

Useful when an agent needs tools from several MCP servers — pass the registry to GantryEngine instead of individual clients.

Example::

registry = MCPToolRegistry([
    MCPClient("npx -y @mcp/github"),
    MCPClient("npx -y @mcp/sqlite ./db.sqlite"),
])
agent = GantryEngine(..., tools=[registry])

Memory

InMemoryVector

gantrygraph.memory.in_memory.InMemoryVector()

Bases: BaseMemory

Ephemeral in-process memory backed by trigram Jaccard similarity.

All entries are held in RAM and lost when the process exits. Thread-safe for single-threaded asyncio use (no locking needed).


Telemetry

OTelExporter

gantrygraph.telemetry.otel.OTelExporter(service_name='gantry-agent', otlp_endpoint=None)

Converts GantryEvent callbacks into OpenTelemetry spans.

Usage::

exporter = OTelExporter(service_name="qa-agent")
agent = GantryEngine(llm=..., on_event=exporter.as_event_callback())

Parameters:

Name Type Description Default
service_name str

service.name attribute attached to all spans.

'gantry-agent'
otlp_endpoint str | None

gRPC endpoint for an OTLP collector (e.g. "http://localhost:4317" for Grafana Alloy). None prints spans to stdout via ConsoleSpanExporter.

None

as_event_callback()

Return a GantryEvent callback that creates OTel spans.

Each call to the returned function is stateful — the same callback instance tracks the root span across the full run. Do not share a single callback instance between concurrent agent runs.

force_flush(timeout_ms=5000)

Flush pending spans to the exporter (call before process exit).


Decorators

gantry_tool

gantrygraph.tool.gantry_tool(fn=None, *, name=None, description=None)

gantry_tool(fn: Callable[..., Any]) -> BaseTool
gantry_tool(fn: None = None, *, name: str | None = None, description: str | None = None) -> Callable[[Callable[..., Any]], BaseTool]

Decorate a function (sync or async) and return a LangChain BaseTool.

Can be used bare (@gantry_tool) or with keyword arguments (@gantry_tool(name="x", description="y")).

Parameters:

Name Type Description Default
fn Callable[..., Any] | None

The function to wrap. Passed automatically when the decorator is used bare. Leave None when passing keyword arguments.

None
name str | None

Tool name visible to the LLM. Defaults to the function's __name__.

None
description str | None

Tool description visible to the LLM. Defaults to the function's docstring. Required if the function has no docstring.

None

Returns:

Type Description
BaseTool | Callable[[Callable[..., Any]], BaseTool]

A BaseTool instance (when used bare or the inner function is

BaseTool | Callable[[Callable[..., Any]], BaseTool]

decorated), or a decorator factory (when keyword arguments are

BaseTool | Callable[[Callable[..., Any]], BaseTool]

provided).

Raises:

Type Description
ValueError

If neither a docstring nor an explicit description is provided.


Graph primitives

These are exported from gantrygraph for custom graph topologies:

gantrygraph.engine.nodes.observe_node(state, *, perception, event_cb) async

Capture the current environment and append it as a HumanMessage.

gantrygraph.engine.nodes.think_node(state, *, llm_with_tools, event_cb) async

Invoke the LLM with the full message history and get the next action.

gantrygraph.engine.nodes.act_node(state, *, tool_map, approval_cb, guardrail, event_cb, use_interrupt=False) async

Execute tool calls from the last AIMessage.

For each tool call: 1. Check the guardrail approval list — pause and ask if required. 2. Locate the tool in the registry — return an error message if not found. 3. Execute the tool — catch all exceptions and return them as error messages so the LLM can self-correct on the next think step.

When use_interrupt is True and a checkpointer is configured on the graph, tool calls that need approval use LangGraph's interrupt() to suspend execution and persist state. Resume via GantryEngine.resume(thread_id, approved=True).

gantrygraph.engine.nodes.review_node(state)

Decide whether the task is complete.

Termination condition: the last message is an AIMessage with no tool calls, meaning the LLM chose to stop calling tools and produce a final answer. This is a pure function — no I/O.

gantrygraph.engine.nodes.should_continue(state, *, max_steps)

Conditional edge: loop back to observe, or terminate.

gantrygraph.engine.graph.build_graph(*, perception, llm_with_tools, tool_map, approval_cb, guardrail, event_cb, max_steps, memory=None, use_interrupt=False, checkpointer=None)

Build and compile the gantrygraph agent StateGraph.

Nodes are bound to configuration via functools.partial so the node functions themselves remain pure and testable without constructing a full engine.

Graph structure::

START → memory_recall → observe → think → act → review → should_continue
                                                               ↙         ↘
                                                           observe        END