FlowEngine Agent Basic Example
Examples demonstrating FlowEngine with normal node (non-streaming node) and run with invoke.
import asyncio
import os
from typing import List, TypedDict, cast
from dotenv import load_dotenv
from sqlalchemy.engine import URL
from typing_extensions import Annotated
from llmfy import (
BedrockConfig,
BedrockModel,
LLMfy,
Message,
Role,
Tool,
ToolRegistry,
tools_node,
RedisCheckpointer,
SQLCheckpointer,
FlowEngine,
END,
START,
)
load_dotenv()
db_url = URL.create(
drivername="mysql+pymysql",
username=os.getenv("MYSQL_USER", "root"),
password=os.getenv("MYSQL_PASSWORD", ""),
host=os.getenv("MYSQL_HOST", "localhost"),
port=int(os.getenv("MYSQL_PORT", 3306)),
database=os.getenv("MYSQL_DATABASE", ""),
query={"charset": "utf8mb4"},
)
def add_message(old_messages: List[Message], new_message: List[Message]):
"""Reducer function to append messages."""
if old_messages is None:
return new_message
return old_messages + new_message
class AppState(TypedDict):
messages: Annotated[list[Message], add_message]
status: str
def build_agent(use_redis: bool = True):
print("\n" + "=" * 60)
print("Example Agent: Complex State with Custom Objects")
print("=" * 60 + "\n")
if use_redis:
checkpointer = RedisCheckpointer(
redis_url="redis://localhost:6379/0",
prefix="flowengine:",
ttl=3600, # 1 hour TTL
)
else:
checkpointer = SQLCheckpointer(
connection_string=db_url.render_as_string(hide_password=False),
echo=False, # Set to True to see SQL queries
)
# checkpointer = MemoryCheckpointer()
# Define a sample tool
@Tool()
def get_current_weather(location: str, unit: str = "celsius") -> str:
return f"The weather in {location} is 22 degrees {unit}"
@Tool()
def get_current_time(location: str) -> str:
return f"The time in {location} is 09:00 AM"
model = BedrockModel(
# model="amazon.nova-pro-v1:0",
# model="amazon.nova-pro-v1:0",
# model="us.anthropic.claude-3-5-haiku-20241022-v1:0",
# model="anthropic.claude-3-haiku-20240307-v1:0",
# model="us.meta.llama3-3-70b-instruct-v1:0",
model="amazon.nova-lite-v1:0",
config=BedrockConfig(temperature=0.7),
)
# model = OpenAIModel(model="gpt-4o-mini", config=OpenAIConfig())
llm = LLMfy(model, system_message="You are Hoki a helpfull assistant.")
tools = [get_current_weather, get_current_time]
# Register tool
llm.register_tool(tools)
# Register to ToolRegistry
tool_registry = ToolRegistry(tools, model)
flow = FlowEngine(state_schema=AppState, checkpointer=checkpointer)
def main_orchestrator(state: AppState):
messages = state.get("messages", [])
# for msg in messages:
# print(f"- {msg}")
response = llm.chat(messages)
ai_response = response.messages[-1]
return {"messages": [ai_response], "status": "main"}
def tools_executor(state):
tool_results = tools_node(
messages=state.get("messages", []),
registry=tool_registry,
)
return {"messages": tool_results}
def should_continue(state):
messages = state.get("messages", [])
last_message = messages[-1]
if last_message.tool_calls:
return "tools"
return END
flow.add_node("main", main_orchestrator)
flow.add_node("tools", tools_executor)
flow.add_edge(START, "main")
flow.add_edge("tools", "main")
flow.add_conditional_edge("main", ["tools", END], should_continue)
return flow.build()
# ============================================================================
# Main execution
# ============================================================================
agent = build_agent(use_redis=False)
async def chat(message: str):
result = await agent.invoke(
{
"messages": [Message(role=Role.USER, content=message)],
},
session_id="cobalagi",
)
return cast(Message, result["messages"][-1])
async def main():
print("=== Terminal Chat ===")
print("Type 'exit' to quit.\n")
while True:
user_msg = input("You: ")
if user_msg.strip().lower() in ["exit", "quit"]:
print("Chatbot: Goodbye! 👋")
break
reply = await chat(user_msg)
print(f"Chatbot: {reply.content}")
if __name__ == "__main__":
asyncio.run(main())