The pipeline tool for defining, executing, and managing multi-agent execution graphs
What it does: Lets agents define, run, save, and monitor multi-agent execution graphs (DAGs). Each node is a sub-agent task; independent nodes run in parallel, dependent nodes wait their turn.

Who it’s for: Anyone who wants to coordinate more than one specialist agent on a single problem: research teams, code review rotations, debate / vote setups, multi-step content workflows, gated trade execution.

For the conceptual background (node types, barrier modes, context modes, scheduling, and the architecture of the graph coordinator), see Execution Graphs. This page is the developer reference for the pipeline tool itself.
When a user describes a multi-agent workflow in chat, the agent translates it into a pipeline call. For example:
You: Have four analysts research NVDA in parallel, then write a final buy/hold/sell recommendation that incorporates all four reports.
The agent calls pipeline action: execute with a 5-node graph. Four research nodes have no depends_on, so they run in parallel; the final recommendation node depends on all four (barrier_mode: all).
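A minimal sketch of what that call could look like, using only parameters documented on this page. The node IDs, task wording, and label are hypothetical, chosen for illustration:

```yaml
tool: pipeline
action: execute
nodes:
  # Four independent research nodes: no depends_on, so they run in parallel.
  - node_id: fundamentals
    task: "Research NVDA fundamentals: revenue, margins, and guidance"
  - node_id: technicals
    task: "Research NVDA price action and technical indicators"
  - node_id: sentiment
    task: "Research recent news and market sentiment for NVDA"
  - node_id: competition
    task: "Research NVDA's competitive position"
  # The final node waits for all four reports before it starts.
  - node_id: recommendation
    task: "Write a buy/hold/sell recommendation for NVDA using: fundamentals={{fundamentals.result}}, technicals={{technicals.result}}, sentiment={{sentiment.result}}, competition={{competition.result}}"
    depends_on: [fundamentals, technicals, sentiment, competition]
    barrier_mode: all
label: "NVDA Analyst Panel"
```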
The graph coordinator runs the four analysts concurrently, blocks the recommendation node until all four return, then injects each upstream result into the prompt via {{nodeId.result}}. You can monitor the run with pipeline action: status graph_id: <id> and grab final outputs with pipeline action: outputs graph_id: <id>.
The define action validates a graph structure without executing it. It returns the topological execution order and any warnings, so you can check a graph before running it.
tool: pipeline
action: define
nodes:
  - node_id: search
    task: "Search for TypeScript best practices"
  - node_id: summarize
    task: "Summarize the search results: {{search.result}}"
    depends_on: [search]
label: "TS Best Practices"
The status action checks the status of a specific graph or lists recent graph executions.
# Check a specific graph
tool: pipeline
action: status
graph_id: "abc-123-def"

# List recent graphs from the last 30 minutes
tool: pipeline
action: status
recent_minutes: 30
The cancel action stops a running graph. All running nodes are terminated and remaining nodes are skipped. It requires user confirmation (_confirmed: true) because it is a destructive action gated by the action classifier.
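A cancel call might look like the following sketch. The graph ID is a placeholder, and placing _confirmed alongside the other top-level parameters is an assumption based on the description above:

```yaml
tool: pipeline
action: cancel
graph_id: "abc-123-def"   # placeholder ID
_confirmed: true          # assumed top-level placement; set after the user confirms
```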
The outputs action retrieves node output values from a completed or running graph. It uses memory-first retrieval with disk fallback for expired graphs. Each node output is truncated at 12,000 characters.
Nodes that have not completed yet return null. The source field indicates whether outputs were retrieved from the in-memory coordinator ("memory") or from persisted output files on disk ("disk").
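An outputs call takes a graph_id, like status. A minimal sketch with a placeholder ID:

```yaml
tool: pipeline
action: outputs
graph_id: "abc-123-def"   # placeholder ID
```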
Each node in the nodes array accepts the following parameters:
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| node_id | string | Yes | Unique identifier within the graph |
| task | string | Yes | Task description for the sub-agent. Use {{nodeId.result}} to inline upstream output. Use ${VAR} for user-provided inputs. |
| depends_on | string[] | No | Node IDs that must complete before this node runs |
| agent | string | No | Agent ID to run this node (defaults to caller’s agent) |
| model | string | No | Model override for this node |
| timeout_ms | integer | No | Per-node timeout in milliseconds (default: 300000) |
| max_steps | integer | No | Maximum agentic steps for the sub-agent |
| barrier_mode | string | No | all (default), majority, or best-effort |
| retries | integer | No | Number of automatic retries on failure (0-3, default: 0). Exponential backoff delays: 1s, 2s, 4s. |
| type_id | string | No | Built-in node type: agent, debate, vote, refine, collaborate, approval-gate, or map-reduce. If omitted, the node runs as a regular single-agent task. |
| type_config | object | No | Type-specific configuration. Required when type_id is set. See Node Types for config schemas per type. |
| context_mode | string | No | Upstream output verbosity: full (default, complete outputs), summary (500 chars + shared dir reference), none (no automatic injection; use {{nodeId.result}} for explicit data). |
| mcp_servers | string[] | No | MCP server names whose tools should be pre-discovered for this node. Example: ["yfinance"] pre-seeds the yfinance tools so the sub-agent can skip discover_tools calls. |
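To show how several of these parameters combine on one node, here is a hedged sketch. The node IDs, task text, and chosen values are illustrative; only the parameter names and defaults come from the table above:

```yaml
nodes:
  - node_id: quotes
    task: "Fetch the latest quote and 30-day price history for NVDA"
    agent: data-collector        # illustrative agent ID
    timeout_ms: 120000           # 2 minutes instead of the 300000 default
    max_steps: 10
    retries: 2                   # two retries; per the table, delays of 1s then 2s
    mcp_servers: ["yfinance"]    # pre-seed tools so discover_tools can be skipped
  - node_id: brief
    task: "Summarize the data in three bullet points: {{quotes.result}}"
    depends_on: [quotes]
    context_mode: none           # no automatic injection; rely on the template above
```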
Top-level parameters that apply to the entire graph:
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| action | string | No | Pipeline action (default: execute) |
| nodes | array | For define/execute/save | Array of node definitions |
| label | string | No | Human-readable graph label |
| on_failure | string | No | fail-fast (default) or continue |
| timeout_ms | integer | No | Graph-level timeout in milliseconds (default: 1500000 / 25 min). Automatically raised to the DAG’s critical-path makespan (topological depth × concurrency waves × per-node timeout) when set too low, so that later phases, such as a debate or final-synthesis node, are not starved by earlier parallel ones. |
| graph_id | string | For status/cancel/outputs | ID of a running/completed graph |
| id | string | For save/load/delete | Named graph ID for persistence |
| budget | object | No | { max_tokens, max_cost }: resource limits |
| variables | object | No | Key-value pairs for ${VARIABLE_NAME} substitution in task text at execute time |
| edges | array | No | Explicit edge definitions (auto-derived if omitted) |
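The id parameter pairs with the save, load, and delete actions. The sketch below saves a graph under a name and loads it later. The ID value is hypothetical, and what load returns (the stored definition, a new run, or something else) is not specified on this page, so treat the second call as an assumption about usage:

```yaml
# Save a reusable graph under a named ID
tool: pipeline
action: save
id: "blog-pipeline"              # hypothetical name
label: "Blog Content Pipeline"
nodes:
  - node_id: outline
    task: "Create a detailed outline for a blog post about ${TOPIC}"
  - node_id: draft
    task: "Write the full blog post based on this outline: {{outline.result}}"
    depends_on: [outline]
---
# Load the saved graph by the same ID
tool: pipeline
action: load
id: "blog-pipeline"
```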
Nodes receive upstream results through {{nodeId.result}} template interpolation directly in the task text. The nodeId must appear in the node’s depends_on array.
nodes:
  - node_id: search
    task: "Search for information about quantum computing"
  - node_id: analyze
    task: "Analyze the search results: {{search.result}}"
    depends_on: [search]
In this example, search is the upstream node ID referenced in depends_on, and {{search.result}} is replaced with the search node’s output at runtime.
Use ${VARIABLE_NAME} for user-provided inputs that the web dashboard prompts for before execution. Use {{nodeId.result}} to inline upstream node outputs. These are two different interpolation syntaxes — ${VAR} is resolved at execute time from the variables parameter, while {{nodeId.result}} is resolved at runtime when the upstream node completes.
Important constraints:
Template references ({{X.result}}) must use a node ID that appears in the same node’s depends_on array
Outputs exceeding 12,000 characters are automatically truncated
If an upstream node did not complete (failed or skipped), the template is replaced with [unavailable: node "nodeId" did not complete]
Additionally, upstream outputs are automatically injected as context for downstream nodes. Set context_mode: "none" to disable automatic injection and rely solely on {{nodeId.result}} templates.
Each graph execution creates a temporary directory at ~/.comis/graph-runs/{graphId}/. All nodes can read and write files here, making it ideal for exchanging large payloads between nodes.
- node_id: generate
  task: "Generate a CSV report and save it to the shared pipeline folder"
- node_id: analyze
  task: "Read the CSV report from the shared pipeline folder and summarize it"
  depends_on: [generate]
The shared folder path is automatically provided to each sub-agent.
With fail-fast, any failed dependency cascades to skip all downstream nodes. With continue, the barrier mode determines whether a downstream node runs or is skipped:
all: skipped if any dependency failed
majority: skipped only if more than half the dependencies failed
best-effort: skipped only if every dependency failed
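As an illustration of these rules, the sketch below combines on_failure: continue with a majority barrier. Node IDs and tasks are hypothetical. With three dependencies, the digest node still runs if one source fails (the failed reference is replaced with the unavailable placeholder described earlier) and is skipped if two or three fail:

```yaml
tool: pipeline
action: execute
on_failure: continue
nodes:
  - node_id: news
    task: "Collect this week's headlines about renewable energy storage"
  - node_id: filings
    task: "Collect recent regulatory filings about renewable energy storage"
  - node_id: social
    task: "Collect notable social media discussion about renewable energy storage"
  - node_id: digest
    task: "Write a digest from whichever sources returned: news={{news.result}}, filings={{filings.result}}, social={{social.result}}"
    depends_on: [news, filings, social]
    barrier_mode: majority   # skipped only if more than half of the three failed
label: "Weekly Digest"
```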
A node starts as pending, becomes ready when its dependencies are satisfied (or immediately if it has none), transitions to running when spawned, and ends as completed, failed, or skipped.
If the graph exceeds its budget (max_tokens or max_cost) or timeout, all running nodes are killed and remaining nodes are skipped. The graph:completed event includes a cancelReason field (timeout, budget, or manual).
A sequential chain where each step builds on the previous:
tool: pipeline
action: execute
nodes:
  - node_id: search
    task: "Search the web for the latest developments in renewable energy storage"
  - node_id: analyze
    task: "Analyze the search results and identify the top 3 trends: {{search.result}}"
    depends_on: [search]
  - node_id: write
    task: "Write a 1000-word article about renewable energy storage trends based on: {{analyze.result}}"
    depends_on: [analyze]
label: "Renewable Energy Research"
Parallel analysis steps that feed into a final decision:
tool: pipeline
action: execute
nodes:
  - node_id: lint
    task: "Run the linter on the repository and report all findings"
  - node_id: test
    task: "Run the test suite and report results including any failures"
  - node_id: review
    task: "Review the code changes in the latest commit for security and performance issues"
  - node_id: merge
    task: "Based on lint results, test results, and code review, decide whether to approve the merge: lint={{lint.result}}, tests={{test.result}}, review={{review.result}}"
    depends_on: [lint, test, review]
    barrier_mode: all
label: "Code Review"
on_failure: continue
tool: pipeline
action: execute
nodes:
  - node_id: outline
    task: "Create a detailed outline for a blog post about ${TOPIC}"
  - node_id: draft
    task: "Write the full blog post based on this outline: {{outline.result}}"
    depends_on: [outline]
    model: "claude-sonnet-4-5-20250929"
  - node_id: edit
    task: "Edit and proofread the draft for clarity and grammar: {{draft.result}}"
    depends_on: [draft]
  - node_id: seo
    task: "Generate SEO metadata (title, description, keywords) for: {{draft.result}}"
    depends_on: [draft]
label: "Blog Content Pipeline"
budget:
  max_tokens: 100000
  max_cost: 2.00
timeout_ms: 300000
variables:
  TOPIC: "AI-powered code review tools"
A pipeline with adversarial debate for balanced analysis:
tool: pipeline
action: execute
nodes:
  - node_id: research
    task: "Research the pros and cons of microservices vs monolith architecture"
    retries: 2
  - node_id: evaluate
    task: "Is microservices the right architecture for a 10-person startup? Consider: {{research.result}}"
    depends_on: [research]
    type_id: debate
    type_config:
      agents: [optimist-architect, pragmatic-architect]
      rounds: 3
      synthesizer: lead-architect
  - node_id: recommend
    task: "Write a final recommendation based on the debate outcome: {{evaluate.result}}"
    depends_on: [evaluate]
    context_mode: none
label: "Architecture Decision"
A pipeline that explicitly sets type_id: agent with driver-level configuration for model and step limits:
tool: pipeline
action: execute
nodes:
  - node_id: gather
    task: "Collect the latest financial data for ${TICKER}"
    agent: data-collector
  - node_id: analyze
    depends_on: [gather]
    task: "Perform deep technical analysis on: {{gather.result}}"
    type_id: agent
    type_config:
      agent: senior-analyst
      model: "claude-sonnet-4-5-20250929"
      max_steps: 15
  - node_id: report
    depends_on: [analyze]
    task: "Write an executive summary from: {{analyze.result}}"
label: "Typed Agent Analysis"
variables:
  TICKER: "TSLA"
Setting type_id: agent explicitly is optional — a node without type_id runs as a single-agent task by default. Use the explicit form when you need to override the agent’s model or step limit via type_config while using the driver system.
A pipeline that fans out work to parallel mappers and then reduces their outputs into a single result:
tool: pipeline
action: execute
nodes:
  - node_id: plan
    task: "Break ${TOPIC} into 3 independent research areas"
  - node_id: research
    depends_on: [plan]
    task: "Research the assigned area in depth: {{plan.result}}"
    type_id: map-reduce
    type_config:
      mappers:
        - agent: researcher-1
          task_suffix: "Focus on market trends"
        - agent: researcher-2
          task_suffix: "Focus on technical feasibility"
        - agent: researcher-3
          task_suffix: "Focus on competitive landscape"
      reducer: lead-analyst
      reducer_prompt: "Synthesize all research into a unified report with key findings and recommendations"
  - node_id: summary
    depends_on: [research]
    task: "Write an executive summary from: {{research.result}}"
label: "Parallel Research Analysis"
variables:
  TOPIC: "enterprise AI adoption"