State
State is a TypedDict shared across all nodes in a workflow. Each node receives the current state and returns a dict of updates.
Defining State
| from typing import Annotated
from typing_extensions import TypedDict
class AppState(TypedDict):
messages: list
status: str
counter: int
|
Reducers
By default, a node's returned value replaces the existing field. Use Annotated to attach a reducer function — the reducer receives (old_value, new_value) and returns the merged result.
| from typing import Annotated
from typing_extensions import TypedDict
def add_messages(old: list, new: list) -> list:
"""Append new messages to existing list instead of replacing."""
if old is None:
return new
return old + new
class AppState(TypedDict):
messages: Annotated[list, add_messages] # reducer: appends
status: str # no reducer: replaces
counter: int # no reducer: replaces
|
Reducer vs Replace behaviour
Given a node that returns {"messages": ["b"], "status": "done", "counter": 2}:
| Field |
Has Reducer |
Previous |
Returned |
Result |
messages |
Yes (add_messages) |
["a"] |
["b"] |
["a", "b"] |
status |
No |
"running" |
"done" |
"done" |
counter |
No |
1 |
2 |
2 |
Node State Updates
Node functions return a partial dict — only include keys that changed:
| async def my_node(state: AppState) -> dict:
new_counter = state.get("counter", 0) + 1
return {
"counter": new_counter,
"status": "processed",
# "messages" not returned — it keeps its current value
}
|
State with Custom Objects
State fields can hold any serialisable Python object. Custom objects are automatically serialised/deserialised when using checkpointers:
| from typing import Annotated, List
from typing_extensions import TypedDict
from llmfy import Message
def add_messages(old: List[Message], new: List[Message]) -> List[Message]:
if old is None:
return new
return old + new
class ChatState(TypedDict):
messages: Annotated[List[Message], add_messages]
session_id: str
|
Continuation with Reducers
When using a checkpointer and the same session_id, the apply_state passed to invoke is merged into the checkpointed state via reducers before the workflow runs:
| # First run — starts fresh
state = await flow.invoke(
{"messages": [], "status": "start", "counter": 0},
session_id="user-123",
)
# Second run — merged with checkpoint via reducers
state = await flow.invoke(
{"messages": ["new input"], "status": "continuing"},
session_id="user-123",
)
# messages field: reducer appends "new input" to previous messages
# status field: replaced with "continuing"
# counter field: carried over from checkpoint unchanged
# Pass None to continue from checkpoint without any updates
state = await flow.invoke(None, session_id="user-123")
|