Flow
A Flow orchestrates how Nodes connect and run, based on Actions returned from each Node’s post()
method. You can chain Nodes in a sequence or create branching logic depending on the Action string.
1. Action-based Transitions
Each Node’s post(shared, prep_res, exec_res)
method returns an Action string. By default, if post()
doesn’t explicitly return anything, we treat that as "default"
.
You define transitions with the syntax:
-
Basic default transition:
node_a >> node_b
This means ifnode_a.post()
returns"default"
(orNone
), go tonode_b
. (Equivalent tonode_a - "default" >> node_b
) -
Named action transition:
node_a - "action_name" >> node_b
This means ifnode_a.post()
returns"action_name"
, go tonode_b
.
It’s possible to create loops, branching, or multi-step flows.
2. Creating a Flow
A Flow begins with a start node (or flow). You call Flow(start=some_node)
to specify the entry point. When you call flow.run(shared)
, it executes the first node, looks at its post()
return Action, follows the corresponding transition, and continues until there’s no next node or you explicitly stop.
Example: Simple Sequence
Here’s a minimal flow of two nodes in a chain:
node_a >> node_b
flow = Flow(start=node_a)
flow.run(shared)
- When you run the flow, it executes
node_a
. - Suppose
node_a.post()
returns"default"
. - The flow then sees
"default"
Action is linked tonode_b
and runsnode_b
. - If
node_b.post()
returns"default"
but we didn’t definenode_b >> something_else
, the flow ends there.
Example: Branching & Looping
Here’s a simple expense approval flow that demonstrates branching and looping. The ReviewExpense
node can return three possible Actions:
"approved"
: expense is approved, move to payment processing"needs_revision"
: expense needs changes, send back for revision"rejected"
: expense is denied, finish the process
We can wire them like this:
# Define the flow connections
review - "approved" >> payment # If approved, process payment
review - "needs_revision" >> revise # If needs changes, go to revision
review - "rejected" >> finish # If rejected, finish the process
revise >> review # After revision, go back for another review
payment >> finish # After payment, finish the process
flow = Flow(start=review)
Let’s see how it flows:
- If
review.post()
returns"approved"
, the expense moves topayment
node - If
review.post()
returns"needs_revision"
, it goes torevise
node, which then loops back toreview
- If
review.post()
returns"rejected"
, it moves tofinish
node and stops
flowchart TD
review[Review Expense] -->|approved| payment[Process Payment]
review -->|needs_revision| revise[Revise Report]
review -->|rejected| finish[Finish Process]
revise --> review
payment --> finish
Running Individual Nodes vs. Running a Flow
node.run(shared)
: Just runs that node alone (callsprep()
,exec()
,post()
), returns an Action.flow.run(shared)
: Executes from the start node, follows Actions to the next node, and so on until the flow can’t continue (no next node or no next Action).
node.run(shared) does not proceed automatically to the successor and may use incorrect parameters. This is mainly for debugging or testing a single node. Always use
flow.run(...)
in production to ensure the full pipeline runs correctly.
3. Nested Flows
A Flow can act like a Node, which enables powerful composition patterns. This means you can:
- Use a Flow as a Node within another Flow’s transitions.
- Combine multiple smaller Flows into a larger Flow for reuse.
- Node
params
will be a merging of all parents’params
.
Basic Flow Nesting
Here’s how to connect a flow to another node:
# Create a sub-flow
node_a >> node_b
subflow = Flow(start=node_a)
# Connect it to another node
subflow >> node_c
# Create the parent flow
parent_flow = Flow(start=subflow)
When parent_flow.run()
executes:
- It starts
subflow
subflow
runs through its nodes (node_a
thennode_b
)- After
subflow
completes, execution continues tonode_c
Example: Order Processing Pipeline
Here’s a practical example that breaks down order processing into nested flows:
# Payment processing sub-flow
validate_payment >> process_payment >> payment_confirmation
payment_flow = Flow(start=validate_payment)
# Inventory sub-flow
check_stock >> reserve_items >> update_inventory
inventory_flow = Flow(start=check_stock)
# Shipping sub-flow
create_label >> assign_carrier >> schedule_pickup
shipping_flow = Flow(start=create_label)
# Connect the flows into a main order pipeline
payment_flow >> inventory_flow >> shipping_flow
# Create the master flow
order_pipeline = Flow(start=payment_flow)
# Run the entire pipeline
order_pipeline.run(shared_data)
This creates a clean separation of concerns while maintaining a clear execution path:
flowchart LR
subgraph order_pipeline[Order Pipeline]
subgraph paymentFlow["Payment Flow"]
A[Validate Payment] --> B[Process Payment] --> C[Payment Confirmation]
end
subgraph inventoryFlow["Inventory Flow"]
D[Check Stock] --> E[Reserve Items] --> F[Update Inventory]
end
subgraph shippingFlow["Shipping Flow"]
G[Create Label] --> H[Assign Carrier] --> I[Schedule Pickup]
end
paymentFlow --> inventoryFlow
inventoryFlow --> shippingFlow
end