FlowEngine
FlowEngine is a state-based workflow orchestrator for building LLM-powered pipelines. It lets you define a graph of nodes (processing steps) connected by edges (transitions), manage shared state across nodes, and optionally persist state across sessions with checkpointers.
Key Concepts
| Concept | Description |
|---|---|
| State | A TypedDict shared across all nodes. Each node reads from state and returns updates. |
| Node | A Python function (sync or async) that processes state and returns a dict of updates. |
| Edge | A connection between two nodes. Can be direct or conditional. |
| START / END | Special constants marking the entry and exit of the workflow. |
| Checkpointer | Optional backend that saves state after each node so sessions can resume. |
Installation
FlowEngine state requires the typing_extensions package:
Quick Start
FlowEngine API
| Method | Description |
|---|---|
FlowEngine(state_schema, checkpointer=None) |
Create engine with a TypedDict state schema and optional checkpointer |
add_node(name, func, stream=False) |
Add a processing node. Set stream=True for generator nodes |
add_edge(source, target) |
Add a direct transition between nodes |
add_conditional_edge(source, targets, condition) |
Add conditional routing: condition(state) -> str returns the next node name |
build() |
Validate and compile the workflow. Returns the built FlowEngine |
invoke(apply_state, session_id=None) |
Run the workflow synchronously. Returns the final state dict |
stream(apply_state, session_id=None) |
Run the workflow with streaming. Returns an async generator of FlowEngineStreamResponse |
get_state(session_id) |
Retrieve the latest checkpointed state for a session |
reset_session(session_id) |
Clear all checkpoints for a session (start fresh) |
list_checkpoints(session_id, limit=10) |
List checkpoint metadata for a session |
details() |
Print a text representation of the workflow graph |
visualize() |
Return a Mermaid diagram URL for the workflow |
Visualization
After calling build(), inspect or visualize the workflow: