Skip to content

LLMfyPipe

llmfy.llmfypipe

LLMfyPipe

Source code in llmfy/llmfypipe/llmfypipe.py (lines 13–421):
class LLMfyPipe:
    """Run a graph of nodes between the reserved START and END nodes.

    Nodes are registered with ``add_node`` and connected with ``add_edge`` /
    ``add_conditional_edge``. Node results are merged into a shared
    ``WorkflowState``; when a ``MemoryManager`` is configured, updates are
    routed through it for the given ``thread_id``.
    """

    def __init__(
        self,
        initial_state: Optional[Dict[str, Any]] = None,
        memory: Optional[MemoryManager] = None,
    ):
        """Create a workflow that initially contains only START and END.

        Args:
            initial_state: Optional initial values for the workflow state.
            memory: Optional memory manager. When set, ``execute`` and
                ``stream`` require a ``thread_id``.
        """
        self.nodes: Dict[str, Node] = {
            START: Node(name=START, node_type=NodeType.START),
            END: Node(name=END, node_type=NodeType.END),
        }
        self.edges: List[Edge] = []
        self.state = WorkflowState(initial_state or {})
        self.visualizer = WorkflowVisualizer()
        self.memory = memory

    def validate_workflow(self) -> bool:
        """
        Validate the workflow structure.
        Ensures START has outgoing edges and END has incoming edges.

        Returns:
            True when the workflow is valid.

        Raises:
            ValueError: If no edge leaves START or no edge reaches END.
        """
        start_has_edge = False
        end_has_edge = False

        for edge in self.edges:
            if edge.source == START:
                start_has_edge = True
            if END in edge.targets:
                end_has_edge = True

        if not start_has_edge:
            raise ValueError("Workflow must have at least one edge from START")
        if not end_has_edge:
            raise ValueError("Workflow must have at least one edge to END")

        return True

    def add_node(
        self,
        name: str,
        func: Callable,
        node_type: NodeType = NodeType.FUNCTION,
        stream: bool = False,
    ) -> None:
        """
        Add a node to the workflow.
        Set ``stream=True`` for nodes that return streaming content.

        Args:
            name (str): Node name. Must not be START or END.
            func (Callable): Function (sync or async) executed when the node
                runs. Its parameter names become the node's inputs; if it
                declares a ``state`` parameter, the workflow state object is
                passed in. For streaming nodes it should produce an async
                iterable of chunks.
            node_type (NodeType, optional): Node type. Defaults to NodeType.FUNCTION.
            stream (bool, optional): Whether the node streams its output.
                Defaults to False.

        Raises:
            ValueError: If ``name`` is one of the reserved names START or END.
        """
        if name in (START, END):
            raise ValueError(f"Cannot add node with reserved name {name}")

        # The signature of ``func`` never changes, so inspect it once here
        # instead of on every invocation of the wrapper.
        sig = inspect.signature(func)
        wants_state = "state" in sig.parameters

        async def wrapped_func(*args, **kwargs):
            if wants_state:
                kwargs["state"] = self.state
            result = func(*args, **kwargs)
            # Await anything awaitable (coroutine functions, but also partials or
            # sync wrappers around async functions). Async generators are not
            # awaitable and are returned as-is for streaming nodes.
            if inspect.isawaitable(result):
                result = await result
            return result

        wrapped_func.__signature__ = sig  # type: ignore
        # getattr: callables such as functools.partial have no __annotations__.
        wrapped_func.__annotations__ = getattr(func, "__annotations__", {})

        inputs = list(sig.parameters.keys())

        # If the return annotation is a class with annotated fields (e.g. a
        # TypedDict), its field names are recorded as the node's outputs.
        outputs = []
        if sig.return_annotation != inspect.Signature.empty:
            if hasattr(sig.return_annotation, "__annotations__"):
                outputs = list(sig.return_annotation.__annotations__.keys())

        node = Node(
            name=name,
            node_type=node_type,
            func=wrapped_func,
            inputs=inputs,
            outputs=outputs,
            stream=stream,
        )
        self.nodes[name] = node

    def add_edge(self, source: str, target: str) -> None:
        """Add a direct edge between nodes.

        Raises:
            ValueError: If ``source`` or ``target`` is not a registered node.
        """
        if source not in self.nodes:
            raise ValueError(f"Source node '{source}' not found")
        if target not in self.nodes:
            raise ValueError(f"Target node '{target}' not found")

        edge = Edge(source=source, targets=[target])
        self.edges.append(edge)

    def add_conditional_edge(
        self, source: str, targets: Union[str, List[str]], condition: Callable
    ) -> None:
        """Add a conditional edge with multiple possible targets.

        At run time ``condition`` is called with the workflow state and must
        return the next node's name: one of ``targets`` or END.

        Args:
            source: Name of the node the edge leaves from.
            targets: A node name or a list of node names the edge may lead to.
            condition: Callable receiving the workflow state and returning a
                node name.

        Raises:
            ValueError: If ``source`` or any target is not a registered node.
                The wrapped condition raises ValueError at run time if it
                returns a name that is neither a listed target nor END.
        """
        if source not in self.nodes:
            raise ValueError(f"Source node '{source}' not found")

        # Copy so that later mutation of the caller's list cannot change which
        # targets this edge accepts.
        target_list = [targets] if isinstance(targets, str) else list(targets)

        for target in target_list:
            if target not in self.nodes:
                raise ValueError(f"Target node '{target}' not found")

        def wrapped_condition(*args, **kwargs):
            # Arguments are ignored: the condition always sees the live state.
            result = condition(self.state)
            if result not in target_list and result != END:
                raise ValueError(
                    f"Condition returned '{result}' which is not in targets: {target_list}"
                )
            return result

        edge = Edge(source=source, targets=target_list, condition=wrapped_condition)
        self.edges.append(edge)

    def _handle_state_update(
        self, additional_state: Dict[str, Any], thread_id: Optional[str] = None
    ) -> Dict[str, Any]:
        """Merge ``additional_state`` into the workflow state.

        When a memory manager is configured and ``thread_id`` is given, the
        update is first passed through ``memory.update_memory`` and the value
        it returns is merged instead.

        Returns:
            The current workflow state after the update.
        """
        updated_state = (
            self.memory.update_memory(thread_id, additional_state)
            if self.memory and thread_id
            else additional_state
        )
        self.state._update(updated_state)
        return self.state.get_current()

    def _collect_inputs(self, node: Node) -> Dict[str, Any]:
        """Build keyword arguments for ``node`` from values present in the state."""
        current_state = self.state.get_current()
        return {
            input_name: self.state.get(input_name)
            for input_name in node.inputs
            # ``state`` is injected by the node wrapper, not read from the state.
            if input_name != "state" and input_name in current_state
        }

    @staticmethod
    def _as_state_update(node: Node, result: Any) -> Optional[Dict[str, Any]]:
        """Normalise a node result into a state update, or None if not mappable.

        Dict results are used as-is. Any other result is stored under the
        node's single declared output, if it has exactly one.
        """
        if isinstance(result, dict):
            return result
        if len(node.outputs) == 1:
            return {node.outputs[0]: result}
        return None

    def _next_nodes(self, current_node: str) -> List[str]:
        """Return the nodes to schedule after ``current_node``.

        Direct edges contribute all their targets. Conditional edges contribute
        the target chosen by their condition, unless it is falsy or END.
        """
        next_nodes: List[str] = []
        for edge in self.edges:
            if edge.source != current_node:
                continue
            if edge.condition is None:
                next_nodes.extend(edge.targets)
            else:
                target = edge.condition()
                if target and target != END:
                    next_nodes.append(target)
        return next_nodes

    async def execute(
        self,
        additional_state: Dict[str, Any],
        thread_id: Optional[str] = None,
    ) -> Dict[str, Any]:
        """
        Execute the workflow in standard (non-streaming) mode.

        Nodes are taken from a FIFO queue seeded with START. Each node's
        result is merged into the state before its successors are queued.

        Args:
            additional_state: Initial state to add to the workflow
            thread_id: Optional thread ID for memory management

        Returns:
            The final workflow state

        Raises:
            LLMfyException: If ``thread_id`` is missing while memory is
                configured, the workflow is invalid, or a node fails. The
                underlying error is chained as ``__cause__``.
        """
        from collections import deque

        try:
            if self.memory and not thread_id:
                raise LLMfyException(
                    "`thread_id` required because workflow has memory."
                )

            if additional_state:
                self._handle_state_update(additional_state, thread_id)

            # Validate workflow before execution
            self.validate_workflow()

            # deque: O(1) pops from the left, unlike list.pop(0).
            nodes_to_process = deque([START])

            while nodes_to_process:
                current_node = nodes_to_process.popleft()
                if current_node == END:
                    continue

                if current_node != START:
                    node = self.nodes[current_node]

                    try:
                        if node.func:
                            result = await node.func(**self._collect_inputs(node))
                            update = self._as_state_update(node, result)
                            if update is not None:
                                self._handle_state_update(update, thread_id)
                    except Exception as e:
                        raise LLMfyException(
                            f"Error executing node {current_node}: {str(e)}"
                        ) from e

                nodes_to_process.extend(self._next_nodes(current_node))

            return self.state.get_current()
        except Exception as e:
            raise LLMfyException(e) from e

    async def stream(
        self,
        additional_state: Dict[str, Any],
        thread_id: Optional[str] = None,
        stream_callback: Optional[Callable] = None,
    ) -> AsyncGenerator[Dict[str, Any], None]:
        """
        Execute the workflow in streaming mode, yielding results as they become available.

        Yielded event dicts have a ``"type"`` key: ``state_update``,
        ``workflow_start``, ``node_start``, ``node_result``, ``stream_chunk``,
        ``node_complete``, ``next_nodes``, ``error``, ``workflow_complete`` or
        ``workflow_error``. ``"state"`` entries hold the workflow state after
        the corresponding update.

        Args:
            additional_state: Initial state to add to the workflow
            thread_id: Optional thread ID for memory management
            stream_callback: Optional callback called with each non-dict chunk
                from a streaming node. It may be a plain function or a
                coroutine function; awaitable return values are awaited.

        Returns:
            An async generator yielding state and event updates

        Raises:
            LLMfyException: After yielding an error event, if ``thread_id`` is
                missing while memory is configured, the workflow is invalid,
                or a node fails.
        """
        from collections import deque

        try:
            if self.memory and not thread_id:
                raise LLMfyException(
                    "`thread_id` required because workflow has memory."
                )

            if additional_state:
                updated_state = self._handle_state_update(additional_state, thread_id)
                yield {"type": "state_update", "state": updated_state}

            # Validate workflow before execution
            self.validate_workflow()
            yield {"type": "workflow_start"}

            nodes_to_process = deque([START])

            while nodes_to_process:
                current_node = nodes_to_process.popleft()

                if current_node == END:
                    yield {"type": "node_complete", "node": END}
                    continue

                yield {"type": "node_start", "node": current_node}

                if current_node != START:
                    node = self.nodes[current_node]

                    try:
                        if node.func:
                            func_inputs = self._collect_inputs(node)
                            if node.stream:
                                # Streaming node: the wrapper returns an async iterable.
                                async for chunk in await node.func(**func_inputs):
                                    if isinstance(chunk, dict):
                                        updated_state = self._handle_state_update(
                                            chunk, thread_id
                                        )
                                        yield {
                                            "type": "node_result",
                                            "node": current_node,
                                            "result": chunk,
                                            "state": updated_state,
                                        }
                                    else:
                                        # Raw non-dict chunks (e.g. string tokens)
                                        yield {
                                            "type": "stream_chunk",
                                            "node": current_node,
                                            "content": chunk,
                                        }

                                        if stream_callback:
                                            callback_result = stream_callback(chunk)
                                            if inspect.isawaitable(callback_result):
                                                await callback_result
                            else:
                                result = await node.func(**func_inputs)
                                update = self._as_state_update(node, result)
                                if update is not None:
                                    updated_state = self._handle_state_update(
                                        update, thread_id
                                    )
                                    yield {
                                        "type": "node_result",
                                        "node": current_node,
                                        "result": update,
                                        "state": updated_state,
                                    }

                    except Exception as e:
                        error_msg = f"Error executing node {current_node}: {str(e)}"
                        yield {
                            "type": "error",
                            "node": current_node,
                            "error": error_msg,
                        }
                        raise LLMfyException(error_msg) from e

                yield {"type": "node_complete", "node": current_node}

                next_nodes = self._next_nodes(current_node)
                nodes_to_process.extend(next_nodes)
                yield {"type": "next_nodes", "nodes": next_nodes}

            yield {"type": "workflow_complete", "state": self.state.get_current()}
        except Exception as e:
            yield {"type": "workflow_error", "error": str(e)}
            raise LLMfyException(e) from e

    def get_diagram_code(self) -> str:
        """Get code for the workflow diagram."""
        return self.visualizer.create_mermaid_diagram(self)

    def get_diagram_base64(self):
        """Generate Mermaid diagram base64 for the workflow.

        Returns:
            The Mermaid diagram source, wrapped in padding whitespace, encoded
            as UTF-8 and then as URL-safe base64. Prefix it with
            ``https://mermaid.ink/img/`` to get an image URL.
        """
        mermaid_code = self.get_diagram_code()
        graph = f"""
			 {mermaid_code}
			 """
        return base64.urlsafe_b64encode(graph.encode("utf8")).decode("ascii")

    def get_diagram_url(self):
        """Generate Mermaid diagram url for the workflow.

        Display with:
        ```py
        display(Image(url=your_diagram_url))
        ```
        """
        # Same payload as get_diagram_base64, rendered by the mermaid.ink service.
        return "https://mermaid.ink/img/" + self.get_diagram_base64()

add_conditional_edge(source, targets, condition)

Add a conditional edge with multiple possible targets.

Source code in llmfy/llmfypipe/llmfypipe.py
def add_conditional_edge(
    self, source: str, targets: Union[str, List[str]], condition: Callable
) -> None:
    """Add a conditional edge with multiple possible targets.

    At run time ``condition`` is called with the workflow state and must
    return the next node's name: one of ``targets`` or END.

    Args:
        source: Name of the node the edge leaves from.
        targets: A node name or a list of node names the edge may lead to.
        condition: Callable receiving the workflow state and returning a
            node name.

    Raises:
        ValueError: If ``source`` or any target is not a registered node.
            The wrapped condition raises ValueError at run time if it returns
            a name that is neither a listed target nor END.
    """
    if source not in self.nodes:
        raise ValueError(f"Source node '{source}' not found")

    # Copy so that later mutation of the caller's list cannot change which
    # targets this edge accepts.
    target_list = [targets] if isinstance(targets, str) else list(targets)

    for target in target_list:
        if target not in self.nodes:
            raise ValueError(f"Target node '{target}' not found")

    def wrapped_condition(*args, **kwargs):
        # Arguments are ignored: the condition always sees the live state.
        result = condition(self.state)
        if result not in target_list and result != END:
            raise ValueError(
                f"Condition returned '{result}' which is not in targets: {target_list}"
            )
        return result

    edge = Edge(source=source, targets=target_list, condition=wrapped_condition)
    self.edges.append(edge)

add_edge(source, target)

Add a direct edge between nodes.

Source code in llmfy/llmfypipe/llmfypipe.py
def add_edge(self, source: str, target: str) -> None:
    """Connect ``source`` to ``target`` with an unconditional edge.

    Raises:
        ValueError: If either endpoint has not been registered as a node
            (the source is checked first).
    """
    registered = self.nodes
    if source not in registered:
        raise ValueError(f"Source node '{source}' not found")
    if target not in registered:
        raise ValueError(f"Target node '{target}' not found")

    self.edges.append(Edge(source=source, targets=[target]))

add_node(name, func, node_type=NodeType.FUNCTION, stream=False)

Add a node to the workflow. Set stream=True for nodes that return streaming content.

Parameters:

Name Type Description Default
name str

Node name

required
func Callable

Function (sync or async) executed when the node runs

required
node_type NodeType

Node type. Defaults to NodeType.FUNCTION.

FUNCTION
stream bool

Whether the node streams its output; set to True for nodes that return streaming content. Defaults to False.

False

Raises:

Type Description
ValueError

If name is one of the reserved names START or END.

Returns:

Type Description
None

The node is registered on the workflow; nothing is returned.

Source code in llmfy/llmfypipe/llmfypipe.py
def add_node(
    self,
    name: str,
    func: Callable,
    node_type: NodeType = NodeType.FUNCTION,
    stream: bool = False,
) -> None:
    """
    Add a node to the workflow.
    Set ``stream=True`` for nodes that return streaming content.

    Args:
        name (str): Node name. Must not be START or END.
        func (Callable): Function (sync or async) executed when the node
            runs. Its parameter names become the node's inputs; if it
            declares a ``state`` parameter, the workflow state object is
            passed in. For streaming nodes it should produce an async
            iterable of chunks.
        node_type (NodeType, optional): Node type. Defaults to NodeType.FUNCTION.
        stream (bool, optional): Whether the node streams its output.
            Defaults to False.

    Raises:
        ValueError: If ``name`` is one of the reserved names START or END.
    """
    if name in (START, END):
        raise ValueError(f"Cannot add node with reserved name {name}")

    # The signature of ``func`` never changes, so inspect it once here
    # instead of on every invocation of the wrapper.
    sig = inspect.signature(func)
    wants_state = "state" in sig.parameters

    async def wrapped_func(*args, **kwargs):
        if wants_state:
            kwargs["state"] = self.state
        result = func(*args, **kwargs)
        # Await anything awaitable (coroutine functions, but also partials or
        # sync wrappers around async functions). Async generators are not
        # awaitable and are returned as-is for streaming nodes.
        if inspect.isawaitable(result):
            result = await result
        return result

    wrapped_func.__signature__ = sig  # type: ignore
    # getattr: callables such as functools.partial have no __annotations__.
    wrapped_func.__annotations__ = getattr(func, "__annotations__", {})

    inputs = list(sig.parameters.keys())

    # If the return annotation is a class with annotated fields (e.g. a
    # TypedDict), its field names are recorded as the node's outputs.
    outputs = []
    if sig.return_annotation != inspect.Signature.empty:
        if hasattr(sig.return_annotation, "__annotations__"):
            outputs = list(sig.return_annotation.__annotations__.keys())

    node = Node(
        name=name,
        node_type=node_type,
        func=wrapped_func,
        inputs=inputs,
        outputs=outputs,
        stream=stream,
    )
    self.nodes[name] = node

execute(additional_state, thread_id=None) async

Execute the workflow in standard (non-streaming) mode.

Parameters:

Name Type Description Default
additional_state Dict[str, Any]

Initial state to add to the workflow

required
thread_id Optional[str]

Optional thread ID for memory management

None

Returns:

Type Description
Dict[str, Any]

The final workflow state

Source code in llmfy/llmfypipe/llmfypipe.py
async def execute(
    self,
    additional_state: Dict[str, Any],
    thread_id: Optional[str] = None,
) -> Dict[str, Any]:
    """
    Execute the workflow in standard (non-streaming) mode.

    Nodes are taken from a FIFO queue seeded with START. Each node's result
    is merged into the state before its successors are queued.

    Args:
        additional_state: Initial state to add to the workflow
        thread_id: Optional thread ID for memory management

    Returns:
        The final workflow state

    Raises:
        LLMfyException: If ``thread_id`` is missing while memory is
            configured, the workflow is invalid, or a node fails. The
            underlying error is chained as ``__cause__``.
    """
    from collections import deque

    try:
        if self.memory and not thread_id:
            raise LLMfyException(
                "`thread_id` required because workflow has memory."
            )

        if additional_state:
            self._handle_state_update(additional_state, thread_id)

        # Validate workflow before execution
        self.validate_workflow()

        # deque: O(1) pops from the left, unlike list.pop(0).
        nodes_to_process = deque([START])

        while nodes_to_process:
            current_node = nodes_to_process.popleft()
            if current_node == END:
                continue

            if current_node != START:
                node = self.nodes[current_node]

                try:
                    current_state = self.state.get_current()
                    func_inputs = {
                        input_name: self.state.get(input_name)
                        for input_name in node.inputs
                        # ``state`` is injected by the node wrapper.
                        if input_name != "state" and input_name in current_state
                    }

                    if node.func:
                        result = await node.func(**func_inputs)
                        if isinstance(result, dict):
                            self._handle_state_update(result, thread_id)
                        elif len(node.outputs) == 1:
                            # Non-dict result stored under the single output.
                            self._handle_state_update(
                                {node.outputs[0]: result}, thread_id
                            )

                except Exception as e:
                    raise LLMfyException(
                        f"Error executing node {current_node}: {str(e)}"
                    ) from e

            # Get next nodes: all targets of direct edges, plus the chosen
            # target of each conditional edge (unless falsy or END).
            next_nodes = []
            for edge in self.edges:
                if edge.source == current_node:
                    if edge.condition is None:
                        next_nodes.extend(edge.targets)
                    else:
                        target = edge.condition()
                        if target and target != END:
                            next_nodes.append(target)

            nodes_to_process.extend(next_nodes)

        return self.state.get_current()
    except Exception as e:
        raise LLMfyException(e) from e

get_diagram_base64()

Generate Mermaid diagram base64 for the workflow.

Display with:

display(Image(url=your_diagram_url))

Source code in llmfy/llmfypipe/llmfypipe.py
    def get_diagram_base64(self):
        """Generate Mermaid diagram base64 for the workflow.

        Display with:
        ```py
        display(Image(url=your_diagram_url))
        ```
        """

        def get_base64(graph):
            graphbytes = graph.encode("utf8")
            base64_bytes = base64.urlsafe_b64encode(graphbytes)
            base64_string = base64_bytes.decode("ascii")
            return base64_string

        mermaid_code = self.get_diagram_code()
        return get_base64(
            f"""
			 {mermaid_code}
			 """
        )

get_diagram_code()

Get code for the workflow diagram.

Source code in llmfy/llmfypipe/llmfypipe.py
def get_diagram_code(self) -> str:
    """Return the Mermaid source describing this workflow's graph.

    Rendering is delegated to ``self.visualizer``, which receives the
    pipeline itself.
    """
    render = self.visualizer.create_mermaid_diagram
    return render(self)

get_diagram_url()

Generate Mermaid diagram url for the workflow.

Display with:

display(Image(url=your_diagram_url))

Source code in llmfy/llmfypipe/llmfypipe.py
    def get_diagram_url(self):
        """Generate Mermaid diagram url for the workflow.

        Display with:
        ```py
        display(Image(url=your_diagram_url))
        ```
        """

        def get_url(graph):
            graphbytes = graph.encode("utf8")
            base64_bytes = base64.urlsafe_b64encode(graphbytes)
            base64_string = base64_bytes.decode("ascii")
            return "https://mermaid.ink/img/" + base64_string

        mermaid_code = self.get_diagram_code()
        return get_url(
            f"""
			 {mermaid_code}
			 """
        )

stream(additional_state, thread_id=None, stream_callback=None) async

Execute the workflow in streaming mode, yielding results as they become available.

Parameters:

Name Type Description Default
additional_state Dict[str, Any]

Initial state to add to the workflow

required
thread_id Optional[str]

Optional thread ID for memory management

None
stream_callback Optional[Callable]

Optional callback function for handling streaming chunks (content only)

None

Returns:

Type Description
AsyncGenerator[Dict[str, Any], None]

An async generator yielding state and event updates

Source code in llmfy/llmfypipe/llmfypipe.py
async def stream(
    self,
    additional_state: Dict[str, Any],
    thread_id: Optional[str] = None,
    stream_callback: Optional[Callable] = None,
) -> AsyncGenerator[Dict[str, Any], None]:
    """
    Execute the workflow in streaming mode, yielding results as they become available.

    Yielded event dicts have a ``"type"`` key: ``state_update``,
    ``workflow_start``, ``node_start``, ``node_result``, ``stream_chunk``,
    ``node_complete``, ``next_nodes``, ``error``, ``workflow_complete`` or
    ``workflow_error``. ``"state"`` entries hold the workflow state after the
    corresponding update.

    Args:
        additional_state: Initial state to add to the workflow
        thread_id: Optional thread ID for memory management
        stream_callback: Optional callback called with each non-dict chunk
            from a streaming node. It may be a plain function or a coroutine
            function; awaitable return values are awaited.

    Returns:
        An async generator yielding state and event updates

    Raises:
        LLMfyException: After yielding an error event, if ``thread_id`` is
            missing while memory is configured, the workflow is invalid, or a
            node fails.
    """
    import inspect
    from collections import deque

    def apply_update(update: Dict[str, Any]) -> Dict[str, Any]:
        # Read the state back explicitly rather than relying on the return
        # value of _handle_state_update, which previously returned None and
        # made every event carry ``state: None``.
        self._handle_state_update(update, thread_id)
        return self.state.get_current()

    try:
        if self.memory and not thread_id:
            raise LLMfyException(
                "`thread_id` required because workflow has memory."
            )

        if additional_state:
            updated_state = apply_update(additional_state)
            yield {"type": "state_update", "state": updated_state}

        # Validate workflow before execution
        self.validate_workflow()
        yield {"type": "workflow_start"}

        # deque: O(1) pops from the left, unlike list.pop(0).
        nodes_to_process = deque([START])

        while nodes_to_process:
            current_node = nodes_to_process.popleft()

            if current_node == END:
                yield {"type": "node_complete", "node": END}
                continue

            yield {"type": "node_start", "node": current_node}

            if current_node != START:
                node = self.nodes[current_node]

                try:
                    current_state = self.state.get_current()
                    func_inputs = {
                        input_name: self.state.get(input_name)
                        for input_name in node.inputs
                        # ``state`` is injected by the node wrapper.
                        if input_name != "state" and input_name in current_state
                    }

                    if node.func:
                        if node.stream:
                            # Streaming node: the wrapper returns an async iterable.
                            async for chunk in await node.func(**func_inputs):
                                if isinstance(chunk, dict):
                                    updated_state = apply_update(chunk)
                                    yield {
                                        "type": "node_result",
                                        "node": current_node,
                                        "result": chunk,
                                        "state": updated_state,
                                    }
                                else:
                                    # Raw non-dict chunks (e.g. string tokens)
                                    yield {
                                        "type": "stream_chunk",
                                        "node": current_node,
                                        "content": chunk,
                                    }

                                    if stream_callback:
                                        callback_result = stream_callback(chunk)
                                        if inspect.isawaitable(callback_result):
                                            await callback_result
                        else:
                            result = await node.func(**func_inputs)
                            if isinstance(result, dict):
                                updated_state = apply_update(result)
                                yield {
                                    "type": "node_result",
                                    "node": current_node,
                                    "result": result,
                                    "state": updated_state,
                                }
                            elif len(node.outputs) == 1:
                                # Non-dict result stored under the single output.
                                result_dict = {node.outputs[0]: result}
                                updated_state = apply_update(result_dict)
                                yield {
                                    "type": "node_result",
                                    "node": current_node,
                                    "result": result_dict,
                                    "state": updated_state,
                                }

                except Exception as e:
                    error_msg = f"Error executing node {current_node}: {str(e)}"
                    yield {
                        "type": "error",
                        "node": current_node,
                        "error": error_msg,
                    }
                    raise LLMfyException(error_msg) from e

            yield {"type": "node_complete", "node": current_node}

            # Get next nodes: all targets of direct edges, plus the chosen
            # target of each conditional edge (unless falsy or END).
            next_nodes = []
            for edge in self.edges:
                if edge.source == current_node:
                    if edge.condition is None:
                        next_nodes.extend(edge.targets)
                    else:
                        target = edge.condition()
                        if target and target != END:
                            next_nodes.append(target)

            nodes_to_process.extend(next_nodes)
            yield {"type": "next_nodes", "nodes": next_nodes}

        yield {"type": "workflow_complete", "state": self.state.get_current()}
    except Exception as e:
        yield {"type": "workflow_error", "error": str(e)}
        raise LLMfyException(e) from e

validate_workflow()

Validate the workflow structure. Ensures START has outgoing edges and END has incoming edges.

Source code in llmfy/llmfypipe/llmfypipe.py
def validate_workflow(self) -> bool:
    """
    Check that the workflow graph is connected at both terminals.

    Returns:
        bool: Always ``True`` when validation succeeds.

    Raises:
        ValueError: If no edge leaves START (checked first), or if no edge
            lists END among its targets.
    """
    leaves_start = any(edge.source == START for edge in self.edges)
    reaches_end = any(END in edge.targets for edge in self.edges)

    if not leaves_start:
        raise ValueError("Workflow must have at least one edge from START")
    if not reaches_end:
        raise ValueError("Workflow must have at least one edge to END")
    return True

MemoryManager

Source code in llmfy/llmfypipe/state/memory_manager.py
class MemoryManager:
    """Lock-guarded store of state dictionaries keyed by ``thread_id``.

    All access to the stored memories goes through ``self._lock``. When
    ``cleanup`` is enabled, a background ``threading.Timer`` periodically drops
    threads that have not been updated for ``cleanup_time`` seconds.
    """

    def __init__(
        self,
        extend_list: bool = False,
        cleanup: bool = True,
        cleanup_time: int = 9000,
    ):
        """MemoryManager

        Args:
            extend_list (bool, optional): When True, updates are deep-merged into
                the stored memory (nested dicts are merged and new unique items
                are appended to list values) instead of overwriting top-level
                keys. Defaults to False.
            cleanup (bool, optional): Periodically remove the memories of threads
                that have not been updated for ``cleanup_time`` seconds.
                Defaults to True.
            cleanup_time (int, optional): Inactivity threshold and cleanup
                interval, in seconds. Defaults to 9000 (2.5 hours).
        """
        self._memories: Dict[str, Dict[str, Any]] = {}
        self._lock = Lock()
        self.always_extend_list = extend_list
        self._timestamps: Dict[str, datetime] = {}  # thread_id -> last update time
        self._cleanup_task = None  # pending cleanup Timer, or None when idle
        self._using_cleanup = cleanup
        self._cleanup_time = cleanup_time

    def get_memory(self, thread_id: str) -> Optional[Dict[str, Any]]:
        """Return a deep copy of the memory for ``thread_id``, or None if absent."""
        with self._lock:
            return deepcopy(self._memories.get(thread_id))

    def deep_merge(self, existing: Any, new_data: Any) -> Any:
        """Recursively merge ``new_data`` into ``existing``.

        - dict + dict: each key of ``new_data`` is merged recursively when it
          already exists, otherwise deep-copied in. ``existing`` is mutated.
        - list + list: items of ``new_data`` not already present (compared by
          equality) are deep-copied and appended. ``existing`` is mutated.
        - anything else: a deep copy of ``new_data`` replaces the value.

        Returns:
            The merged value (the mutated ``existing`` for dict/list merges).
        """
        if isinstance(existing, dict) and isinstance(new_data, dict):
            for key, value in new_data.items():
                if key in existing:
                    existing[key] = self.deep_merge(existing[key], value)
                else:
                    existing[key] = deepcopy(value)
            return existing

        if isinstance(existing, list) and isinstance(new_data, list):
            # Equality-based membership works for dicts holding unhashable values
            # (frozenset(d.items()) raised TypeError on those) and, because
            # `existing` grows as we append, also skips duplicates in `new_data`.
            for item in new_data:
                if item not in existing:
                    existing.append(deepcopy(item))
            return existing

        # Neither a dict nor a list merge: overwrite the value.
        return deepcopy(new_data)

    def update_memory(
        self,
        thread_id: str,
        state: Dict[str, Any],
    ) -> Dict[str, Any]:
        """Store ``state`` for ``thread_id`` and return the resulting memory.

        A new thread stores a deep copy of ``state``. For an existing thread the
        state is deep-merged (``extend_list=True``) or used to update the stored
        dict's top-level keys. Also refreshes the thread's last-used timestamp
        and, when cleanup is enabled, makes sure a cleanup timer is scheduled.

        Returns:
            A deep copy of the thread's memory after the update, so callers
            cannot mutate the stored state outside the lock.
        """
        with self._lock:
            self._timestamps[thread_id] = datetime.now()

            if self._using_cleanup and self._cleanup_task is None:
                self._start_cleanup_task(interval=self._cleanup_time)

            if thread_id not in self._memories:
                self._memories[thread_id] = deepcopy(state)
            elif self.always_extend_list:
                self._memories[thread_id] = self.deep_merge(
                    self._memories[thread_id], state
                )
            else:
                self._memories[thread_id].update(deepcopy(state))

            return deepcopy(self._memories[thread_id])

    def delete_memory(self, thread_id: str) -> None:
        """Delete the memory and last-used timestamp of ``thread_id``, if any."""
        with self._lock:
            self._memories.pop(thread_id, None)
            # A leftover timestamp would make the cleanup pass try to delete a
            # memory that no longer exists.
            self._timestamps.pop(thread_id, None)

    def list_threads(self) -> List[str]:
        """List all thread IDs with active memories."""
        with self._lock:
            return list(self._memories.keys())

    def _start_cleanup_task(self, interval: int):
        """Schedule periodic removal of inactive threads.

        Must be called with ``self._lock`` held (``update_memory`` does so); the
        lock is not reentrant, so this method does not acquire it itself.

        Args:
            interval (int): Seconds between cleanup runs; also the inactivity
                threshold passed to ``_remove_inactive_threads``.
        """

        def schedule():
            timer = Timer(interval, cleanup)
            # Daemon timer: a pending cleanup must not keep the interpreter
            # alive for up to `interval` seconds after the program is done.
            timer.daemon = True
            self._cleanup_task = timer
            timer.start()

        def cleanup():
            try:
                self._remove_inactive_threads(interval)
            finally:
                # Decide under the lock so a concurrent update_memory either runs
                # before this (and its thread keeps us rescheduling) or after
                # (and sees None and starts a fresh timer).
                with self._lock:
                    if self._memories:
                        schedule()
                    else:
                        self._cleanup_task = None

        if self._cleanup_task is None:  # Prevent multiple timers
            schedule()

    def _remove_inactive_threads(self, interval: int):
        """Remove every thread whose last update is older than ``interval`` seconds."""
        print("---- CLEAN-UP RUNNING ----")
        threshold = timedelta(seconds=interval)
        with self._lock:  # Ensure thread-safe access
            now = datetime.now()
            inactive_threads = [
                thread_id
                for thread_id, last_used in self._timestamps.items()
                if now - last_used > threshold
            ]
            for thread_id in inactive_threads:
                # pop(): tolerate a memory that is already gone.
                self._memories.pop(thread_id, None)
                del self._timestamps[thread_id]

            # Resetting self._cleanup_task is left to the timer callback in
            # _start_cleanup_task, which decides under the lock.
            if not self._memories:
                print("No threads left. Stopping cleanup task.")
                print("---- CLEAN-UP STOPPED ----")

__init__(extend_list=False, cleanup=True, cleanup_time=9000)

MemoryManager

Parameters:

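| Name | Type | Description | Default |
|------|------|-------------|---------|
| extend_list | bool | When True, updates are deep-merged into the stored memory: nested dicts are merged and new unique items are appended to list values instead of replacing them. | False |
| cleanup | bool | Periodically remove the memories of threads that have not been updated for `cleanup_time` seconds. | True |
| cleanup_time | int | Inactivity threshold and cleanup interval, in seconds (the default 9000 s is 2.5 hours). | 9000 |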
Source code in llmfy/llmfypipe/state/memory_manager.py
def __init__(
    self,
    extend_list: bool = False,
    cleanup: bool = True,
    cleanup_time: int = 9000,
):
    """MemoryManager

    Args:
        extend_list (bool, optional): When True, updates are deep-merged into
            the stored memory (nested dicts are merged and new unique items are
            appended to list values) instead of overwriting top-level keys.
            Defaults to False.
        cleanup (bool, optional): Periodically remove the memories of threads
            that have not been updated for ``cleanup_time`` seconds.
            Defaults to True.
        cleanup_time (int, optional): Inactivity threshold and cleanup
            interval, in seconds. Defaults to 9000 (2.5 hours).
    """
    self._memories: Dict[str, Dict[str, Any]] = {}
    self._lock = Lock()
    self.always_extend_list = extend_list
    self._timestamps = {}  # thread_id -> last update time
    self._cleanup_task = None  # pending cleanup Timer, or None when idle
    self._using_cleanup = cleanup
    self._cleanup_time = cleanup_time

deep_merge(existing, new_data)

Recursively merge dictionaries and append new unique items to lists.

Source code in llmfy/llmfypipe/state/memory_manager.py
def deep_merge(self, existing: Any, new_data: Any) -> Any:
    """Recursively merge ``new_data`` into ``existing``.

    - dict + dict: each key of ``new_data`` is merged recursively when it
      already exists, otherwise deep-copied in. ``existing`` is mutated.
    - list + list: items of ``new_data`` not already present (compared by
      equality) are deep-copied and appended. ``existing`` is mutated.
    - anything else: a deep copy of ``new_data`` replaces the value.

    Returns:
        The merged value (the mutated ``existing`` for dict/list merges).
    """
    if isinstance(existing, dict) and isinstance(new_data, dict):
        for key, value in new_data.items():
            if key in existing:
                existing[key] = self.deep_merge(existing[key], value)
            else:
                existing[key] = deepcopy(value)
        return existing

    if isinstance(existing, list) and isinstance(new_data, list):
        # Equality-based membership works for dicts holding unhashable values
        # (frozenset(d.items()) raised TypeError on those) and, because
        # `existing` grows as we append, also skips duplicates in `new_data`.
        for item in new_data:
            if item not in existing:
                existing.append(deepcopy(item))
        return existing

    # Neither a dict nor a list merge: overwrite the value.
    return deepcopy(new_data)

delete_memory(thread_id)

Delete memory for a specific thread.

Source code in llmfy/llmfypipe/state/memory_manager.py
def delete_memory(self, thread_id: str) -> None:
    """Delete the memory and last-used timestamp of ``thread_id``, if any."""
    with self._lock:
        self._memories.pop(thread_id, None)
        # A leftover timestamp would make the cleanup pass try to delete a
        # memory that no longer exists.
        self._timestamps.pop(thread_id, None)

get_memory(thread_id)

Get memory for a specific thread.

Source code in llmfy/llmfypipe/state/memory_manager.py
def get_memory(self, thread_id: str) -> Optional[Dict[str, Any]]:
    """Return a deep copy of the memory stored for ``thread_id`` (None if absent)."""
    with self._lock:
        stored = self._memories.get(thread_id)
        # Copy while still holding the lock so no writer can interleave.
        return deepcopy(stored)

list_threads()

List all thread IDs with active memories.

Source code in llmfy/llmfypipe/state/memory_manager.py
def list_threads(self) -> List[str]:
    """Return the ids of every thread that currently has stored memory."""
    with self._lock:
        thread_ids = list(self._memories)
    return thread_ids

update_memory(thread_id, state)

Update memory for a specific thread.

Source code in llmfy/llmfypipe/state/memory_manager.py
def update_memory(
    self,
    thread_id: str,
    state: Dict[str, Any],
) -> Dict[str, Any]:
    """Store ``state`` for ``thread_id`` and return the resulting memory.

    A new thread stores a deep copy of ``state``. For an existing thread the
    state is deep-merged (``always_extend_list``) or used to update the stored
    dict's top-level keys. Also refreshes the thread's last-used timestamp and,
    when cleanup is enabled, makes sure a cleanup timer is scheduled.

    Returns:
        A deep copy of the thread's memory after the update (consistent with
        ``get_memory``), so callers cannot mutate stored state outside the lock.
    """
    with self._lock:
        self._timestamps[thread_id] = datetime.now()

        if self._using_cleanup and self._cleanup_task is None:
            self._start_cleanup_task(interval=self._cleanup_time)

        if thread_id not in self._memories:
            self._memories[thread_id] = deepcopy(state)
        elif self.always_extend_list:
            self._memories[thread_id] = self.deep_merge(
                self._memories[thread_id], state
            )
        else:
            self._memories[thread_id].update(deepcopy(state))

        return deepcopy(self._memories[thread_id])

WorkflowState

Source code in llmfy/llmfypipe/state/workflow_state.py
class WorkflowState:
    """Mutable state of a workflow run plus a history of full-state snapshots.

    Readers receive deep copies; ``_update`` is the only mutator and is
    intended for internal use by the workflow.
    """

    def __init__(
        self,
        initial_state: Dict[str, Any],
    ):
        """
        Args:
            initial_state: Starting state. A falsy value (e.g. ``None`` or
                ``{}``) starts from an empty dict. The dict is deep-copied so
                later updates never mutate the caller's object.
        """
        self._state: Dict[str, Any] = deepcopy(initial_state) if initial_state else {}
        # Deep-copied snapshot of the full state after each _update call.
        self._history: List[Dict[str, Any]] = []

    def get(self, key: str, default: Any = None) -> Any:
        """Get a value from the state."""
        # Return a deep copy to prevent direct modifications
        value = self._state.get(key, default)
        return deepcopy(value)

    def get_current(self) -> Dict[str, Any]:
        """Get a deep copy of the entire current state."""
        return deepcopy(self._state)

    def _update(self, values: Dict[str, Any]) -> None:
        """Internal method to update state. Only used by Workflow class.

        Deep-copies ``values`` over the top-level keys of the state, then
        records a snapshot of the resulting state in ``_history``.
        """
        self._state.update(deepcopy(values))
        self._history.append(deepcopy(self._state))

get(key, default=None)

Get a value from the state.

Source code in llmfy/llmfypipe/state/workflow_state.py
def get(self, key: str, default: Any = None) -> Any:
    """Return a deep copy of ``self._state[key]``, or of ``default`` if absent."""
    # Copying means callers cannot mutate the stored state through the result.
    return deepcopy(self._state.get(key, default))

get_current()

Get the current state.

Source code in llmfy/llmfypipe/state/workflow_state.py
def get_current(self) -> Dict[str, Any]:
    """Return an independent deep copy of the whole current state."""
    snapshot = deepcopy(self._state)
    return snapshot

WorkflowVisualizer

Source code in llmfy/llmfypipe/visualizer/visualizer.py
class WorkflowVisualizer:
    """Render a workflow graph as Mermaid flowchart source or an image URL."""

    @staticmethod
    def create_mermaid_diagram(workflow) -> str:
        """Generate Mermaid diagram markdown from workflow.

        Args:
            workflow: Object exposing ``nodes`` (mapping keyed by node name) and
                ``edges`` (iterable of objects with ``source``, ``targets`` and
                ``condition`` attributes).

        Returns:
            str: Newline-joined ``graph TD`` source. START and END are drawn as
            stadium shapes and highlighted; other nodes are rounded boxes.
            Edges with a condition are dotted and labelled ``condition``.
        """
        mermaid = [
            "graph TD",
            "    %% Node Styles",
            "    %% Nodes",
        ]

        for name in workflow.nodes:
            if name in (START, END):
                mermaid.append(f"    {name}([{name}])")
            else:
                mermaid.append(f"    {name}({name})")

        mermaid.append("    %% Edges")

        for edge in workflow.edges:
            # Conditional edges are dotted: the target is picked at run time.
            arrow = "-->" if edge.condition is None else "-.->|condition|"
            for target in edge.targets:
                mermaid.append(f"    {edge.source} {arrow} {target}")

        # Style the terminal nodes by the same ids used when declaring them,
        # rather than hard-coded "START"/"END" literals.
        for terminal in (START, END):
            mermaid.append(
                f"    style {terminal} fill:#777EFE,stroke:#4F54AA,color:white"
            )

        return "\n".join(mermaid)

    @staticmethod
    def generate_diagram_url(mermaid_code: str) -> str:
        """Generate a mermaid.ink image URL for ``mermaid_code``.

        The source is UTF-8 encoded and URL-safe base64 encoded into the path.
        """
        graphbytes = mermaid_code.encode("utf8")
        base64_bytes = base64.urlsafe_b64encode(graphbytes)
        base64_string = base64_bytes.decode("ascii")
        return "https://mermaid.ink/img/" + base64_string

create_mermaid_diagram(workflow) staticmethod

Generate Mermaid diagram markdown from workflow.

Source code in llmfy/llmfypipe/visualizer/visualizer.py
@staticmethod
def create_mermaid_diagram(workflow) -> str:
    """Generate Mermaid diagram markdown from workflow.

    Args:
        workflow: Object exposing ``nodes`` (mapping keyed by node name) and
            ``edges`` (iterable of objects with ``source``, ``targets`` and
            ``condition`` attributes).

    Returns:
        str: Newline-joined ``graph TD`` source. START and END are drawn as
        stadium shapes and highlighted; other nodes are rounded boxes. Edges
        with a condition are dotted and labelled ``condition``.
    """
    mermaid = [
        "graph TD",
        "    %% Node Styles",
        "    %% Nodes",
    ]

    for name in workflow.nodes:
        if name in (START, END):
            mermaid.append(f"    {name}([{name}])")
        else:
            mermaid.append(f"    {name}({name})")

    mermaid.append("    %% Edges")

    for edge in workflow.edges:
        # Conditional edges are dotted: the target is picked at run time.
        arrow = "-->" if edge.condition is None else "-.->|condition|"
        for target in edge.targets:
            mermaid.append(f"    {edge.source} {arrow} {target}")

    # Style the terminal nodes by the same ids used when declaring them,
    # rather than hard-coded "START"/"END" literals.
    for terminal in (START, END):
        mermaid.append(
            f"    style {terminal} fill:#777EFE,stroke:#4F54AA,color:white"
        )

    return "\n".join(mermaid)

generate_diagram_url(mermaid_code) staticmethod

Generate URL for Mermaid diagram image.

Source code in llmfy/llmfypipe/visualizer/visualizer.py
@staticmethod
def generate_diagram_url(mermaid_code: str) -> str:
    """Return a mermaid.ink image URL that renders ``mermaid_code``.

    The diagram source is UTF-8 encoded, then URL-safe base64 encoded into the
    URL path.
    """
    encoded = base64.urlsafe_b64encode(mermaid_code.encode("utf8")).decode("ascii")
    return f"https://mermaid.ink/img/{encoded}"