Skip to content

InMemoryCheckpointer

llmfy.flow_engine.checkpointer.in_memory_checkpointer

InMemoryCheckpointer

Bases: BaseCheckpointer

In-memory checkpoint storage backend.

Source code in llmfy/flow_engine/checkpointer/in_memory_checkpointer.py
class InMemoryCheckpointer(BaseCheckpointer):
    """In-memory checkpoint storage backend.

    Checkpoints live in two structures that are kept in step with each other:
    a per-session list ordered newest-first by ``metadata.timestamp`` and a
    lookup table keyed by checkpoint ID. Every checkpoint is deep-copied on
    the way in and on the way out, so callers never share objects with the
    store. All data is held in plain Python containers on the instance.
    """

    def __init__(self):
        """Initialize the memory checkpointer."""
        # Storage: session_id -> list of checkpoints (kept newest first)
        self._storage: Dict[str, List[Checkpoint]] = defaultdict(list)
        # Index: checkpoint_id -> (session_id, checkpoint)
        self._index: Dict[str, tuple[str, Checkpoint]] = {}

    async def save(self, checkpoint: Checkpoint) -> None:
        """
        Save a checkpoint to memory.

        Saving a checkpoint whose ID is already stored replaces the earlier
        entry rather than adding a second one, so the per-session lists and
        the ID index always describe the same set of checkpoints.

        Args:
            checkpoint: The checkpoint to save
        """
        # Deep copy to prevent external modifications
        checkpoint_copy = deepcopy(checkpoint)

        session_id = checkpoint.metadata.session_id
        checkpoint_id = checkpoint.metadata.checkpoint_id

        # Drop any earlier checkpoint stored under this ID. Without this the
        # old copy would stay in its session list (and be returned by list())
        # even though the index no longer points at it.
        previous = self._index.pop(checkpoint_id, None)
        if previous is not None:
            previous_session_id = previous[0]
            remaining = [
                c for c in self._storage.get(previous_session_id, [])
                if c.metadata.checkpoint_id != checkpoint_id
            ]
            if remaining:
                self._storage[previous_session_id] = remaining
            else:
                # Do not leave an empty list behind for a session
                self._storage.pop(previous_session_id, None)

        # Add to storage
        self._storage[session_id].append(checkpoint_copy)

        # Sort by timestamp (newest first)
        self._storage[session_id].sort(
            key=lambda c: c.metadata.timestamp,
            reverse=True
        )

        # Add to index
        self._index[checkpoint_id] = (session_id, checkpoint_copy)

    async def load(self, session_id: str, checkpoint_id: Optional[str] = None) -> Optional[Checkpoint]:
        """
        Load a checkpoint from memory.

        Args:
            session_id: The session ID
            checkpoint_id: Specific checkpoint ID, or None for latest
                (any falsy value, such as an empty string, also selects
                the latest checkpoint)

        Returns:
            A deep copy of the checkpoint if found, None otherwise. A
            checkpoint ID that belongs to a different session is treated
            as not found.
        """
        if checkpoint_id:
            # Load specific checkpoint
            if checkpoint_id in self._index:
                stored_session_id, checkpoint = self._index[checkpoint_id]
                if stored_session_id == session_id:
                    return deepcopy(checkpoint)
            return None
        else:
            # Load latest checkpoint for session; the membership test comes
            # first so the defaultdict does not create an empty entry
            if session_id in self._storage and self._storage[session_id]:
                return deepcopy(self._storage[session_id][0])
            return None

    async def list(self, session_id: str, limit: int = 10) -> list[Checkpoint]:
        """
        List checkpoints for a session.

        Args:
            session_id: The session ID
            limit: Maximum number of checkpoints to return

        Returns:
            Deep copies of the checkpoints, newest first
        """
        if session_id not in self._storage:
            return []

        checkpoints = self._storage[session_id][:limit]
        return [deepcopy(c) for c in checkpoints]

    async def delete(self, session_id: str, checkpoint_id: Optional[str] = None) -> None:
        """
        Delete checkpoint(s) from memory.

        Unknown sessions, unknown checkpoint IDs, and checkpoint IDs that
        belong to a different session are ignored.

        Args:
            session_id: The session ID
            checkpoint_id: Specific checkpoint ID, or None to delete all for session
        """
        if checkpoint_id:
            # Delete specific checkpoint
            if checkpoint_id in self._index:
                stored_session_id, _ = self._index[checkpoint_id]
                if stored_session_id == session_id:
                    # Remove from storage
                    self._storage[session_id] = [
                        c for c in self._storage[session_id]
                        if c.metadata.checkpoint_id != checkpoint_id
                    ]
                    # Remove from index
                    del self._index[checkpoint_id]

                    # Clean up empty session storage
                    if not self._storage[session_id]:
                        del self._storage[session_id]
        else:
            # Delete all checkpoints for session
            if session_id in self._storage:
                # Remove from index
                for checkpoint in self._storage[session_id]:
                    self._index.pop(checkpoint.metadata.checkpoint_id, None)

                # Remove from storage
                del self._storage[session_id]

    async def clear_all(self) -> None:
        """Clear all checkpoints from memory."""
        self._storage.clear()
        self._index.clear()

    def get_stats(self) -> Dict[str, Any]:
        """
        Get storage statistics.

        Returns:
            Dictionary with the keys ``total_sessions``,
            ``total_checkpoints`` and ``checkpoints_per_session``
            (session ID -> number of checkpoints)
        """
        return {
            "total_sessions": len(self._storage),
            "total_checkpoints": len(self._index),
            "checkpoints_per_session": {
                session_id: len(checkpoints)
                for session_id, checkpoints in self._storage.items()
            }
        }

__init__()

Initialize the memory checkpointer.

Source code in llmfy/flow_engine/checkpointer/in_memory_checkpointer.py
def __init__(self):
    """Create an empty checkpointer with no sessions and no checkpoints."""
    # Lookup table for direct access by ID: checkpoint_id -> (session_id, checkpoint)
    self._index: Dict[str, tuple[str, Checkpoint]] = {}
    # Per-session history: session_id -> list of checkpoints; a missing
    # session yields a fresh empty list on first access
    self._storage: Dict[str, List[Checkpoint]] = defaultdict(list)

save(checkpoint) async

Save a checkpoint to memory.

Parameters:

Name Type Description Default
checkpoint Checkpoint

The checkpoint to save

required
Source code in llmfy/flow_engine/checkpointer/in_memory_checkpointer.py
async def save(self, checkpoint: Checkpoint) -> None:
    """
    Save a checkpoint to memory.

    A deep copy of the checkpoint is stored, so later changes to the
    caller's object do not affect the saved state. Saving a checkpoint
    whose ID is already stored replaces the earlier entry rather than
    adding a second one, which keeps the per-session lists and the ID
    index describing the same set of checkpoints.

    Args:
        checkpoint: The checkpoint to save
    """
    # Deep copy to prevent external modifications
    checkpoint_copy = deepcopy(checkpoint)

    session_id = checkpoint.metadata.session_id
    checkpoint_id = checkpoint.metadata.checkpoint_id

    # Drop any earlier checkpoint stored under this ID. Without this the
    # old copy would stay in its session list even though the index no
    # longer points at it.
    previous = self._index.pop(checkpoint_id, None)
    if previous is not None:
        previous_session_id = previous[0]
        remaining = [
            c for c in self._storage.get(previous_session_id, [])
            if c.metadata.checkpoint_id != checkpoint_id
        ]
        if remaining:
            self._storage[previous_session_id] = remaining
        else:
            # Do not leave an empty list behind for a session
            self._storage.pop(previous_session_id, None)

    # Add to storage
    self._storage[session_id].append(checkpoint_copy)

    # Sort by timestamp (newest first)
    self._storage[session_id].sort(
        key=lambda c: c.metadata.timestamp,
        reverse=True
    )

    # Add to index
    self._index[checkpoint_id] = (session_id, checkpoint_copy)

load(session_id, checkpoint_id=None) async

Load a checkpoint from memory.

Parameters:

Name Type Description Default
session_id str

The session ID

required
checkpoint_id Optional[str]

Specific checkpoint ID, or None for latest

None

Returns:

Type Description
Optional[Checkpoint]

The checkpoint if found, None otherwise

Source code in llmfy/flow_engine/checkpointer/in_memory_checkpointer.py
async def load(self, session_id: str, checkpoint_id: Optional[str] = None) -> Optional[Checkpoint]:
    """
    Fetch one checkpoint belonging to a session.

    Args:
        session_id: The session ID
        checkpoint_id: ID of the checkpoint to fetch; when omitted (or
            otherwise falsy) the first checkpoint in the session's list
            is returned

    Returns:
        A deep copy of the matching checkpoint, or None when the session
        has no checkpoints, the ID is unknown, or the ID is registered
        under a different session
    """
    if not checkpoint_id:
        # No ID requested: hand back the head of the session's list.
        # .get() avoids creating an entry for an unknown session.
        history = self._storage.get(session_id)
        return deepcopy(history[0]) if history else None

    entry = self._index.get(checkpoint_id)
    if entry is None:
        return None

    owner_session, found = entry
    # An ID registered under another session counts as "not found"
    return deepcopy(found) if owner_session == session_id else None

list(session_id, limit=10) async

List checkpoints for a session.

Parameters:

Name Type Description Default
session_id str

The session ID

required
limit int

Maximum number of checkpoints to return

10

Returns:

Type Description
list[Checkpoint]

List of checkpoints, newest first

Source code in llmfy/flow_engine/checkpointer/in_memory_checkpointer.py
async def list(self, session_id: str, limit: int = 10) -> list[Checkpoint]:
    """
    Return up to ``limit`` checkpoints stored for a session.

    Args:
        session_id: The session ID
        limit: Maximum number of checkpoints to return; applied as a
            slice bound on the session's stored list

    Returns:
        Deep copies of the checkpoints in stored order, or an empty list
        when the session is unknown
    """
    if session_id in self._storage:
        stored = self._storage[session_id]
        # Copy each item so callers cannot mutate the stored checkpoints
        return [deepcopy(entry) for entry in stored[:limit]]

    return []

delete(session_id, checkpoint_id=None) async

Delete checkpoint(s) from memory.

Parameters:

Name Type Description Default
session_id str

The session ID

required
checkpoint_id Optional[str]

Specific checkpoint ID, or None to delete all for session

None
Source code in llmfy/flow_engine/checkpointer/in_memory_checkpointer.py
async def delete(self, session_id: str, checkpoint_id: Optional[str] = None) -> None:
    """
    Remove one checkpoint, or every checkpoint of a session.

    Unknown sessions, unknown checkpoint IDs, and IDs registered under a
    different session are ignored.

    Args:
        session_id: The session ID
        checkpoint_id: ID of the single checkpoint to remove; when omitted
            (or otherwise falsy) all checkpoints of the session are removed
    """
    if not checkpoint_id:
        # Whole-session removal: unregister each checkpoint from the ID
        # index first, then drop the session's list itself.
        if session_id not in self._storage:
            return
        for item in self._storage[session_id]:
            self._index.pop(item.metadata.checkpoint_id, None)
        del self._storage[session_id]
        return

    entry = self._index.get(checkpoint_id)
    if entry is None:
        return

    owner_session, _ = entry
    if owner_session != session_id:
        # The ID exists but belongs to another session; leave it alone
        return

    survivors = [
        kept for kept in self._storage[session_id]
        if kept.metadata.checkpoint_id != checkpoint_id
    ]
    self._storage[session_id] = survivors
    del self._index[checkpoint_id]

    # A session with nothing left is removed entirely
    if not survivors:
        del self._storage[session_id]

clear_all() async

Clear all checkpoints from memory.

Source code in llmfy/flow_engine/checkpointer/in_memory_checkpointer.py
async def clear_all(self) -> None:
    """Empty the store: remove every session list and every index entry."""
    # Both containers are emptied in place, so the same objects stay bound
    # to the instance.
    for table in (self._storage, self._index):
        table.clear()

get_stats()

Get storage statistics.

Returns:

Type Description
Dict[str, Any]

Dictionary with statistics

Source code in llmfy/flow_engine/checkpointer/in_memory_checkpointer.py
def get_stats(self) -> Dict[str, Any]:
    """
    Summarize what is currently held in memory.

    Returns:
        Dictionary with three entries: ``total_sessions`` (number of
        session lists), ``total_checkpoints`` (number of index entries)
        and ``checkpoints_per_session`` (session ID -> list length)
    """
    per_session = {}
    for sid, stored in self._storage.items():
        per_session[sid] = len(stored)

    stats = {
        "total_sessions": len(self._storage),
        "total_checkpoints": len(self._index),
        "checkpoints_per_session": per_session,
    }
    return stats