Queues
MessageQueue
Bases: Queue
Queue implementation using Valkey/Redis for message tasks.
This queue stores serialized, JSON-compatible tasks in a Redis list. Tasks can be enqueued, and dequeued either one at a time or in batches.
Attributes:
| Name | Type | Description |
|---|---|---|
| client | Any | Redis or Valkey client instance used for queue operations. |
| queue_name | str | Name/key of the Redis list representing the queue. |
Source code in .venv/lib/python3.12/site-packages/ytindexer/queues/message.py
__init__(client, queue_name='queue')
Initialize a MessageQueue instance.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| client | Any | Redis/Valkey client instance. | required |
| queue_name | str | Name of the queue (Redis list key). Defaults to "queue". | 'queue' |
Source code in .venv/lib/python3.12/site-packages/ytindexer/queues/message.py
batch_dequeue(batch_size=10)
Remove and return multiple tasks from the queue in a batch.
Uses a Redis pipeline to pop multiple items atomically.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| batch_size | int | Number of tasks to dequeue. Defaults to 10. | 10 |
Returns:
| Type | Description |
|---|---|
| List[Any] | List of dequeued tasks, each parsed from JSON if possible, otherwise the raw string. |
Source code in .venv/lib/python3.12/site-packages/ytindexer/queues/message.py
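The batching idea described above can be sketched as follows. This is a minimal illustration and not the package's source: `FakeClient` and `FakePipeline` are hypothetical in-memory stand-ins so the example runs without a server, `batch_pop` is a hypothetical helper name, and the choice of `lpush` for producing and `rpop` for consuming (FIFO order) is an assumption. With a real client, the same `pipeline()`, `rpop()`, and `execute()` calls would be sent to the server in a single round trip.

```python
from collections import deque


class FakePipeline:
    """Hypothetical stand-in that buffers RPOP commands like a client pipeline."""

    def __init__(self, store):
        self._store = store
        self._keys = []

    def rpop(self, key):
        self._keys.append(key)  # queue the command; nothing runs yet
        return self

    def execute(self):
        # Run every buffered pop in order and collect one result per command.
        results = []
        for key in self._keys:
            items = self._store.get(key)
            results.append(items.pop() if items else None)
        self._keys = []
        return results


class FakeClient:
    """Hypothetical in-memory client exposing only what this sketch needs."""

    def __init__(self):
        self._store = {}

    def lpush(self, key, *values):
        items = self._store.setdefault(key, deque())
        for value in values:
            items.appendleft(value)
        return len(items)

    def pipeline(self):
        return FakePipeline(self._store)


def batch_pop(client, key, batch_size=10):
    """Queue batch_size pops on one pipeline and drop the empty results."""
    pipe = client.pipeline()
    for _ in range(batch_size):
        pipe.rpop(key)
    return [item for item in pipe.execute() if item is not None]


client = FakeClient()
client.lpush("queue", "a", "b", "c")
popped = batch_pop(client, "queue", batch_size=5)
```

Here `popped` holds three items even though five were requested, because pops against an empty list return nothing and are filtered out.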
dequeue(timeout=0.1)
Remove and return a single task from the queue.
Performs a blocking pop with a timeout.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| timeout | float | Timeout in seconds to wait for a task before returning None. Defaults to 0.1. | 0.1 |
Returns:
| Name | Type | Description |
|---|---|---|
| Any | Any | The dequeued task, parsed from JSON if possible, or raw string if not JSON. Returns None if no task is available. |
Source code in .venv/lib/python3.12/site-packages/ytindexer/queues/message.py
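The "parsed from JSON if possible, or raw string" behaviour described above might look like the sketch below. `decode_task` is a hypothetical helper name, and the bytes handling is an assumption about how the client is configured; the actual implementation may differ.

```python
import json
from typing import Any, Optional, Union


def decode_task(raw: Optional[Union[str, bytes]]) -> Any:
    """Return parsed JSON when possible, the raw string otherwise, None for no task."""
    if raw is None:
        return None  # no task arrived before the timeout
    if isinstance(raw, bytes):
        # Assumption: the client returns bytes unless it is configured to decode.
        raw = raw.decode("utf-8")
    try:
        return json.loads(raw)
    except json.JSONDecodeError:
        return raw  # not valid JSON, so hand back the string unchanged


parsed = decode_task('{"video_id": "abc"}')
plain = decode_task("plain text")
empty = decode_task(None)
```

One consequence of this fallback approach is that a plain string which happens to be valid JSON, such as "42", comes back as the parsed value (the integer 42) rather than as the original string.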
enqueue(task_data)
Add a task to the queue.
Serializes the task to JSON if it is a dict or list before pushing.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| task_data | Any | The task data to enqueue. Can be any JSON-serializable object or string. | required |
Source code in .venv/lib/python3.12/site-packages/ytindexer/queues/message.py
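The serialization rule described above (dicts and lists become JSON, other values are pushed as given) can be sketched like this. `serialize_task` is a hypothetical helper name used only for illustration, and passing non-container values through unchanged is an assumption based on the description.

```python
import json
from typing import Any


def serialize_task(task_data: Any) -> Any:
    """Encode dicts and lists as JSON text; leave other values untouched."""
    if isinstance(task_data, (dict, list)):
        return json.dumps(task_data)
    return task_data


as_json = serialize_task({"id": 1})
as_text = serialize_task("hello")
```

Under this rule, a payload and its decoded form round-trip cleanly for dicts and lists, while plain strings are stored exactly as supplied.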
queue_size()
Get the current size of the queue.
Returns:
| Name | Type | Description |
|---|---|---|
| int | int | Number of tasks currently in the queue. |
Queue
Bases: ABC
Abstract base class defining the interface for queue implementations.
This class enforces methods for enqueuing, dequeuing (single and batch), and querying the size of the queue. Subclasses must implement these methods according to the underlying queue mechanism.
Source code in .venv/lib/python3.12/site-packages/ytindexer/queues/base.py
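To make the interface concrete, the sketch below reconstructs an abstract base class from the method signatures documented on this page and adds a small in-memory subclass. It is an illustration under assumptions, not the contents of base.py: `InMemoryQueue` is a hypothetical class, and the `None` return type of `enqueue` is a guess, since the page does not document a return value.

```python
from abc import ABC, abstractmethod
from collections import deque
from typing import Any, List


class Queue(ABC):
    """Interface reconstructed from the documented method signatures."""

    @abstractmethod
    def enqueue(self, task_data: Any) -> None: ...

    @abstractmethod
    def dequeue(self) -> Any: ...

    @abstractmethod
    def batch_dequeue(self, batch_size: int) -> List[Any]: ...

    @abstractmethod
    def queue_size(self) -> int: ...


class InMemoryQueue(Queue):
    """Hypothetical FIFO implementation, handy for tests without a server."""

    def __init__(self) -> None:
        self._items: deque = deque()

    def enqueue(self, task_data: Any) -> None:
        self._items.append(task_data)

    def dequeue(self) -> Any:
        # Return None on an empty queue, as the interface recommends.
        return self._items.popleft() if self._items else None

    def batch_dequeue(self, batch_size: int) -> List[Any]:
        # The result may be shorter than batch_size if fewer items are queued.
        count = min(batch_size, len(self._items))
        return [self._items.popleft() for _ in range(count)]

    def queue_size(self) -> int:
        return len(self._items)


queue = InMemoryQueue()
for task in ("a", "b", "c"):
    queue.enqueue(task)
first = queue.dequeue()
rest = queue.batch_dequeue(10)
```

Because every method is marked abstract, instantiating `Queue` directly raises a `TypeError`; only subclasses that implement all four methods can be created.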
batch_dequeue(batch_size)
abstractmethod
Remove and return multiple items from the queue.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| batch_size | int | Number of items to remove from the queue. | required |
Returns:
| Type | Description |
|---|---|
| List[Any] | A list of items removed from the queue. The list can be shorter than batch_size if the queue has fewer items. |
Source code in .venv/lib/python3.12/site-packages/ytindexer/queues/base.py
dequeue()
abstractmethod
Remove and return a single item from the queue.
Returns:
| Name | Type | Description |
|---|---|---|
| Any | Any | The item removed from the queue. Should return None if the queue is empty. |
enqueue(task_data)
abstractmethod
Add an item to the queue.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| task_data | Any | The item to be added to the queue. | required |
queue_size()
abstractmethod
Return the current number of items in the queue.
Returns:
| Name | Type | Description |
|---|---|---|
| int | int | The count of items currently in the queue. |