Skip to content

Queues

MessageQueue

Bases: Queue

Queue implementation using Valkey/Redis for message tasks.

This queue stores serialized JSON-compatible tasks in a Redis list. Tasks can be enqueued, and dequeued either singly or in batches.

Attributes:

Name Type Description
client Any

Redis or Valkey client instance used for queue operations.

queue_name str

Name/key of the Redis list representing the queue.

Source code in .venv/lib/python3.12/site-packages/ytindexer/queues/message.py
class MessageQueue(Queue):
    """Queue implementation using Valkey/Redis for message tasks.

    Tasks are kept in a single Redis list. ``enqueue`` pushes onto the head of
    the list (``lpush``) and the dequeue methods pop from the tail
    (``brpop``/``rpop``), so tasks come out in the order they went in.

    Attributes:
        client (Any): Redis or Valkey client instance used for queue operations.
        queue_name (str): Name/key of the Redis list representing the queue.
    """

    def __init__(self, client: Any, queue_name: str = "queue"):
        """
        Initialize a MessageQueue instance.

        Args:
            client (Any): Redis/Valkey client instance.
            queue_name (str, optional): Name of the queue (Redis list key). Defaults to "queue".
        """
        self.client = client
        self.queue_name = queue_name
        logger.info(f"Initialized MessageQueue with name: {queue_name}")

    @staticmethod
    def _decode(raw: Any) -> Any:
        """
        Parse a popped payload as JSON, falling back to the payload itself.

        Args:
            raw (Any): Payload exactly as returned by the client (str or bytes).

        Returns:
            Any: The decoded JSON value, or ``raw`` unchanged when it is not valid JSON.
        """
        try:
            return json.loads(raw)
        except ValueError:
            # ValueError covers json.JSONDecodeError *and* the UnicodeDecodeError
            # that json.loads raises for bytes that are not valid UTF-8, so a
            # binary payload is handed back raw instead of being lost to an
            # exception after it was already popped from the list.
            return raw

    def enqueue(self, task_data: Any) -> None:
        """
        Add a task to the queue.

        Serializes the task to JSON if it is a dict or list before pushing;
        any other value is passed to the client unchanged.

        Args:
            task_data (Any): The task data to enqueue. Can be any JSON-serializable object or string.
        """
        if isinstance(task_data, (dict, list)):
            task_data = json.dumps(task_data)
        self.client.lpush(self.queue_name, task_data)
        logger.debug(f"Enqueued task to {self.queue_name}")

    def dequeue(self, timeout: float = 0.1) -> Any:
        """
        Remove and return a single task from the queue.

        Performs a blocking pop with a timeout.

        Args:
            timeout (float, optional): Timeout in seconds to wait for a task before returning None. Defaults to 0.1.

        Returns:
            Any: The dequeued task, parsed from JSON if possible, or the raw payload if not JSON. Returns None if no task is available.
        """
        result = self.client.brpop(self.queue_name, timeout=timeout)
        if not result:
            return None

        # brpop replies with a (key, value) pair; only the value is the task.
        return self._decode(result[1])

    def batch_dequeue(self, batch_size: int = 10) -> List[Any]:
        """
        Remove and return multiple tasks from the queue in a batch.

        Uses a Redis pipeline to pop multiple items atomically.

        Args:
            batch_size (int, optional): Number of tasks to dequeue. Defaults to 10.

        Returns:
            List[Any]: List of dequeued tasks, each parsed from JSON if possible, otherwise the raw payload. May be shorter than batch_size (or empty) when the queue holds fewer tasks.
        """
        if batch_size <= 0:
            # Nothing was requested, so skip building an empty transaction.
            return []

        pipeline = self.client.pipeline()
        pipeline.multi()
        for _ in range(batch_size):
            pipeline.rpop(self.queue_name)
        results = pipeline.execute()

        # Pops issued against an exhausted list come back as None; drop them.
        return [self._decode(item) for item in results if item is not None]

    def queue_size(self) -> int:
        """
        Get the current size of the queue.

        Returns:
            int: Number of tasks currently in the queue.
        """
        return self.client.llen(self.queue_name)

__init__(client, queue_name='queue')

Initialize a MessageQueue instance.

Parameters:

Name Type Description Default
client Any

Redis/Valkey client instance.

required
queue_name str

Name of the queue (Redis list key). Defaults to "queue".

'queue'
Source code in .venv/lib/python3.12/site-packages/ytindexer/queues/message.py
def __init__(self, client: Any, queue_name: str = "queue"):
    """
    Bind this queue to a client and to the Redis list it operates on.

    Args:
        client (Any): Redis/Valkey client instance that will execute the queue commands.
        queue_name (str, optional): Key of the Redis list backing the queue. Defaults to "queue".
    """
    self.queue_name = queue_name
    self.client = client
    logger.info(f"Initialized MessageQueue with name: {queue_name}")

batch_dequeue(batch_size=10)

Remove and return multiple tasks from the queue in a batch.

Uses a Redis pipeline to pop multiple items atomically.

Parameters:

Name Type Description Default
batch_size int

Number of tasks to dequeue. Defaults to 10.

10

Returns:

Type Description
List[Any]

List of dequeued tasks, each parsed from JSON if possible, otherwise the raw string.

Source code in .venv/lib/python3.12/site-packages/ytindexer/queues/message.py
def batch_dequeue(self, batch_size: int = 10) -> List[Any]:
    """
    Remove and return multiple tasks from the queue in a batch.

    Uses a Redis pipeline to pop multiple items atomically.

    Args:
        batch_size (int, optional): Number of tasks to dequeue. Defaults to 10.

    Returns:
        List[Any]: List of dequeued tasks, each parsed from JSON if possible, otherwise the raw payload. May be shorter than batch_size (or empty) when the queue holds fewer tasks.
    """
    if batch_size <= 0:
        # Nothing was requested, so skip building an empty transaction.
        return []

    pipeline = self.client.pipeline()
    pipeline.multi()
    for _ in range(batch_size):
        pipeline.rpop(self.queue_name)
    results = pipeline.execute()

    processed_results = []
    for result in results:
        # Pops issued against an exhausted list come back as None; drop them.
        if result is None:
            continue
        try:
            processed_results.append(json.loads(result))
        except ValueError:
            # ValueError covers json.JSONDecodeError and the UnicodeDecodeError
            # raised for bytes that are not valid UTF-8, so one undecodable
            # payload cannot discard the whole already-popped batch.
            processed_results.append(result)

    return processed_results

dequeue(timeout=0.1)

Remove and return a single task from the queue.

Performs a blocking pop with a timeout.

Parameters:

Name Type Description Default
timeout float

Timeout in seconds to wait for a task before returning None. Defaults to 0.1.

0.1

Returns:

Name Type Description
Any Any

The dequeued task, parsed from JSON if possible, or raw string if not JSON. Returns None if no task is available.

Source code in .venv/lib/python3.12/site-packages/ytindexer/queues/message.py
def dequeue(self, timeout: float = 0.1) -> Any:
    """
    Remove and return a single task from the queue.

    Performs a blocking pop with a timeout.

    Args:
        timeout (float, optional): Timeout in seconds to wait for a task before returning None. Defaults to 0.1.

    Returns:
        Any: The dequeued task, parsed from JSON if possible, or the raw payload if not JSON. Returns None if no task is available.
    """
    result = self.client.brpop(self.queue_name, timeout=timeout)
    if not result:
        return None

    # brpop replies with a (key, value) pair; only the value is the task.
    data = result[1]
    try:
        return json.loads(data)
    except ValueError:
        # ValueError covers json.JSONDecodeError and the UnicodeDecodeError
        # raised for bytes that are not valid UTF-8, so a binary payload is
        # returned raw instead of being lost after it was already popped.
        return data

enqueue(task_data)

Add a task to the queue.

Serializes the task to JSON if it is a dict or list before pushing.

Parameters:

Name Type Description Default
task_data Any

The task data to enqueue. Can be any JSON-serializable object or string.

required
Source code in .venv/lib/python3.12/site-packages/ytindexer/queues/message.py
def enqueue(self, task_data: Any) -> None:
    """
    Push one task onto the queue.

    A dict or list is converted to its JSON text first; every other value is
    handed to the client exactly as given.

    Args:
        task_data (Any): Task to enqueue, either a dict/list to be JSON-encoded or an already-serialized value.
    """
    needs_encoding = isinstance(task_data, (dict, list))
    payload = json.dumps(task_data) if needs_encoding else task_data
    self.client.lpush(self.queue_name, payload)
    logger.debug(f"Enqueued task to {self.queue_name}")

queue_size()

Get the current size of the queue.

Returns:

Name Type Description
int int

Number of tasks currently in the queue.

Source code in .venv/lib/python3.12/site-packages/ytindexer/queues/message.py
def queue_size(self) -> int:
    """
    Report how many tasks are waiting in the queue.

    Returns:
        int: Length of the Redis list as reported by the client.
    """
    pending = self.client.llen(self.queue_name)
    return pending

Queue

Bases: ABC

Abstract base class defining the interface for queue implementations.

This class enforces methods for enqueuing, dequeuing (single and batch), and querying the size of the queue. Subclasses must implement these methods according to the underlying queue mechanism.

Source code in .venv/lib/python3.12/site-packages/ytindexer/queues/base.py
class Queue(ABC):
    """Contract that every queue backend has to fulfil.

    Concrete subclasses supply the storage mechanism and must implement all
    four operations: adding an item, removing a single item, removing a batch
    of items, and reporting how many items are waiting.
    """

    @abstractmethod
    def enqueue(self, task_data: Any) -> None:
        """
        Put one item on the queue.

        Args:
            task_data (Any): The item to store.
        """

    @abstractmethod
    def dequeue(self) -> Any:
        """
        Take one item off the queue.

        Returns:
            Any: The removed item. Implementations should return None when the queue is empty.
        """

    @abstractmethod
    def batch_dequeue(self, batch_size: int) -> List[Any]:
        """
        Take several items off the queue in one call.

        Args:
            batch_size (int): How many items to remove.

        Returns:
            List[Any]: The removed items. Fewer than batch_size are returned
            when the queue does not hold that many.
        """

    @abstractmethod
    def queue_size(self) -> int:
        """
        Report how many items the queue currently holds.

        Returns:
            int: Number of items waiting in the queue.
        """

batch_dequeue(batch_size) abstractmethod

Remove and return multiple items from the queue.

Parameters:

Name Type Description Default
batch_size int

Number of items to remove from the queue.

required

Returns:

Type Description
List[Any]

A list of items removed from the queue. The list can be shorter than batch_size if the queue has fewer items.

Source code in .venv/lib/python3.12/site-packages/ytindexer/queues/base.py
@abstractmethod
def batch_dequeue(self, batch_size: int) -> List[Any]:
    """
    Take several items off the queue in one call.

    Args:
        batch_size (int): How many items to remove.

    Returns:
        List[Any]: The removed items. Fewer than batch_size are returned
        when the queue does not hold that many.
    """

dequeue() abstractmethod

Remove and return a single item from the queue.

Returns:

Name Type Description
Any Any

The item removed from the queue. Should return None if the queue is empty.

Source code in .venv/lib/python3.12/site-packages/ytindexer/queues/base.py
@abstractmethod
def dequeue(self) -> Any:
    """
    Take one item off the queue.

    Returns:
        Any: The removed item. Implementations should return None when the queue is empty.
    """

enqueue(task_data) abstractmethod

Add an item to the queue.

Parameters:

Name Type Description Default
task_data Any

The item to be added to the queue.

required
Source code in .venv/lib/python3.12/site-packages/ytindexer/queues/base.py
@abstractmethod
def enqueue(self, task_data: Any) -> None:
    """
    Put one item on the queue.

    Args:
        task_data (Any): The item to store.
    """

queue_size() abstractmethod

Return the current number of items in the queue.

Returns:

Name Type Description
int int

The count of items currently in the queue.

Source code in .venv/lib/python3.12/site-packages/ytindexer/queues/base.py
@abstractmethod
def queue_size(self) -> int:
    """
    Report how many items the queue currently holds.

    Returns:
        int: Number of items waiting in the queue.
    """