Skip to content

Worker

YouTubeNotificationParser

Parser for YouTube PubSubHubbub notification XML data.

Attributes:

| Name | Type | Description |
|------|------|-------------|
| `namespaces` | `dict` | XML namespaces used for parsing. |

Source code in .venv/lib/python3.12/site-packages/ytindexer/worker/parser.py
class YouTubeNotificationParser:
    """
    Convert YouTube PubSubHubbub Atom notifications into ``YouTubeNotification`` objects.

    Attributes:
        namespaces (dict): Prefix-to-URI map passed to every ElementTree lookup;
            ``xmlns`` is the Atom namespace and ``yt`` the YouTube extension namespace.
    """

    namespaces = {
        "xmlns": "http://www.w3.org/2005/Atom",
        "yt": "http://www.youtube.com/xml/schemas/2015",
    }

    @staticmethod
    def _find_text(entry: ET.Element, tag: str) -> Optional[str]:
        """
        Return the text of the first element under ``entry`` matching ``tag``.

        Args:
            entry (ET.Element): Element to search beneath.
            tag (str): Namespaced ElementTree path, e.g. ``"yt:videoId"``.

        Returns:
            Optional[str]: The matched element's ``.text`` (which is itself None for an
            element without text), or None when nothing matches.
        """
        match = entry.find(tag, YouTubeNotificationParser.namespaces)
        if match is None:
            return None
        return match.text

    @staticmethod
    def _find_link(entry: ET.Element) -> Optional[str]:
        """
        Return the ``href`` of the first Atom ``<link>`` child of ``entry``.

        Args:
            entry (ET.Element): Element to search beneath.

        Returns:
            Optional[str]: The ``href`` attribute value, or None when there is no
            link element or it has no ``href``.
        """
        link = entry.find("xmlns:link", YouTubeNotificationParser.namespaces)
        if link is None:
            return None
        return link.get("href")

    @staticmethod
    def parse(xml_data: str) -> Optional[YouTubeNotification]:
        """
        Build a ``YouTubeNotification`` from a PubSubHubbub XML payload.

        Args:
            xml_data (str): The notification body as an XML string.

        Returns:
            Optional[YouTubeNotification]: The parsed notification. Returns None, after
            logging, when the feed has no ``<entry>``, the entry has no (or an empty)
            ``yt:videoId``, or any exception occurs while parsing or building the model.
        """
        owner = YouTubeNotificationParser
        try:
            # NOTE(review): xml_data comes from an external hub. The stdlib ElementTree
            # parser is not hardened against every malicious-XML construct (see the
            # Python "XML vulnerabilities" docs). Consider defusedxml if that matters.
            document = ET.fromstring(xml_data)
            entry = document.find("xmlns:entry", owner.namespaces)
            if entry is None:
                logger.warning("No entry found in notification XML")
                return None

            video_id = owner._find_text(entry, "yt:videoId")
            if not video_id:
                logger.warning("Missing video ID in notification")
                return None

            def text_at(path: str) -> Optional[str]:
                return owner._find_text(entry, path)

            # Everything except the video ID may legitimately be absent (None).
            optional_fields = {
                "channel_id": text_at("yt:channelId"),
                "title": text_at("xmlns:title"),
                "published": text_at("xmlns:published"),
                "updated": text_at("xmlns:updated"),
                "link": owner._find_link(entry),
                "author": text_at("./xmlns:author/xmlns:name"),
            }
            return YouTubeNotification(
                video_id=video_id,
                processed_at=datetime.now(timezone.utc),
                source="pubsubhubbub",
                **optional_fields,
            )
        except Exception as exc:
            # Best-effort parser: any failure is logged and reported as None.
            logger.error(f"Failed to parse notification XML: {exc}")
            logger.debug(f"Notification data: {xml_data}")
            return None

parse(xml_data) staticmethod

Parse YouTube PubSubHubbub notification XML string into a YouTubeNotification model.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `xml_data` | `str` | XML string of the notification. | *required* |

Returns:

| Type | Description |
|------|-------------|
| `Optional[YouTubeNotification]` | Parsed notification object, or `None` if parsing fails. |

Source code in .venv/lib/python3.12/site-packages/ytindexer/worker/parser.py
@staticmethod
def parse(xml_data: str) -> Optional[YouTubeNotification]:
    """
    Build a ``YouTubeNotification`` from a PubSubHubbub XML payload.

    Args:
        xml_data (str): The notification body as an XML string.

    Returns:
        Optional[YouTubeNotification]: The parsed notification. Returns None, after
        logging, when the feed has no ``<entry>``, the entry has no (or an empty)
        ``yt:videoId``, or any exception occurs while parsing or building the model.
    """
    owner = YouTubeNotificationParser
    try:
        # NOTE(review): xml_data comes from an external hub. The stdlib ElementTree
        # parser is not hardened against every malicious-XML construct (see the
        # Python "XML vulnerabilities" docs). Consider defusedxml if that matters.
        document = ET.fromstring(xml_data)
        entry = document.find("xmlns:entry", owner.namespaces)
        if entry is None:
            logger.warning("No entry found in notification XML")
            return None

        video_id = owner._find_text(entry, "yt:videoId")
        if not video_id:
            logger.warning("Missing video ID in notification")
            return None

        def text_at(path: str) -> Optional[str]:
            return owner._find_text(entry, path)

        # Everything except the video ID may legitimately be absent (None).
        optional_fields = {
            "channel_id": text_at("yt:channelId"),
            "title": text_at("xmlns:title"),
            "published": text_at("xmlns:published"),
            "updated": text_at("xmlns:updated"),
            "link": owner._find_link(entry),
            "author": text_at("./xmlns:author/xmlns:name"),
        }
        return YouTubeNotification(
            video_id=video_id,
            processed_at=datetime.now(timezone.utc),
            source="pubsubhubbub",
            **optional_fields,
        )
    except Exception as exc:
        # Best-effort parser: any failure is logged and reported as None.
        logger.error(f"Failed to parse notification XML: {exc}")
        logger.debug(f"Notification data: {xml_data}")
        return None

YouTubeNotificationProcessor

Processes YouTube PubSubHubbub notifications from the queue using the parser.

Source code in .venv/lib/python3.12/site-packages/ytindexer/worker/processor.py
class YouTubeNotificationProcessor:
    """
    Processes YouTube PubSubHubbub notifications from the queue using the parser.
    """

    def __init__(self, notification_queue: Queue, output_queue: Queue, parser: Any):
        self.notification_queue = notification_queue
        self.output_queue = output_queue
        self.parser = parser
        self._shutdown_event = asyncio.Event()

    async def process_notification(self, xml_data: str) -> Optional[Dict[str, Any]]:
        """
        Process a single YouTube notification XML.

        Args:
            xml_data (str): Raw XML data from the notification.

        Returns:
            Optional[Dict[str, Any]]: Extracted video metadata if successful, otherwise None.
        """
        try:
            metadata = self.parser.parse(xml_data)
            if metadata is None:
                logger.warning("Notification processing returned None metadata")
            return metadata
        except Exception as e:
            logger.error(f"Failed to process notification: {e}")
            logger.debug(traceback.format_exc())
            return None

    async def process_batch(self, batch_size: int = 10) -> int:
        """
        Process a batch of notifications from the queue.

        Args:
            batch_size (int): Number of notifications to process in a batch.

        Returns:
            int: Number of successfully processed notifications.
        """
        notifications = []
        processed = 0

        for _ in range(batch_size):
            notification = self.notification_queue.dequeue(timeout=0.1)
            if notification is None:
                break
            notifications.append(notification)

        if not notifications:
            return 0

        tasks = [
            self.process_notification(notification) for notification in notifications
        ]
        results = await asyncio.gather(*tasks, return_exceptions=True)

        for result in results:
            if isinstance(result, Exception):
                logger.error(f"Error during notification processing: {result}")
                continue
            if result is not None:
                if isinstance(result, dict):
                    self.output_queue.enqueue(result)
                elif isinstance(result, YouTubeNotification):
                    self.output_queue.enqueue(result.model_dump_json())
                else:
                    raise ValueError(f"Can't equeue payload with type: {type(result)}")
                processed += 1

        logger.info(f"Processed {processed}/{len(notifications)} notifications")
        return processed

    async def run(self, poll_interval: float = 0.5):
        """
        Run the worker process to continuously process notifications.

        Args:
            poll_interval (float): Time to wait between polling the queue.
        """
        logger.info("Starting YouTube notification processor worker")

        try:
            while not self._shutdown_event.is_set():
                queue_size = self.notification_queue.queue_size()

                if queue_size > 0:
                    logger.debug(f"Queue has {queue_size} notifications pending")
                    await self.process_batch()
                else:
                    await asyncio.sleep(poll_interval)

        except asyncio.CancelledError:
            logger.info("Worker cancelled gracefully")
            raise
        except Exception as e:
            logger.error(f"Worker encountered an error: {str(e)}")
            logger.error(traceback.format_exc())

    def shutdown(self):
        """
        Signal the processor to shut down gracefully.
        """
        self._shutdown_event.set()

process_batch(batch_size=10) async

Process a batch of notifications from the queue.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `batch_size` | `int` | Number of notifications to process in a batch. | `10` |

Returns:

| Type | Description |
|------|-------------|
| `int` | Number of successfully processed notifications. |

Source code in .venv/lib/python3.12/site-packages/ytindexer/worker/processor.py
async def process_batch(self, batch_size: int = 10) -> int:
    """
    Dequeue up to ``batch_size`` notifications, parse them, and forward results.

    Results of an unsupported type are logged and skipped instead of raising.
    Raising would drop the rest of this already-dequeued batch and end ``run()``.

    Args:
        batch_size (int): Maximum number of notifications to dequeue in this batch.

    Returns:
        int: Number of results enqueued on the output queue.
    """
    notifications = []
    for _ in range(batch_size):
        # NOTE(review): dequeue() is called synchronously. If the queue
        # implementation blocks for the timeout, the event loop blocks with it.
        notification = self.notification_queue.dequeue(timeout=0.1)
        if notification is None:
            break
        notifications.append(notification)

    if not notifications:
        return 0

    results = await asyncio.gather(
        *(self.process_notification(n) for n in notifications),
        return_exceptions=True,
    )

    processed = 0
    for result in results:
        # gather(return_exceptions=True) may also return BaseException
        # subclasses such as CancelledError, which are not Exception.
        if isinstance(result, BaseException):
            logger.error(f"Error during notification processing: {result}")
            continue
        if result is None:
            continue
        if isinstance(result, dict):
            payload = result
        elif isinstance(result, YouTubeNotification):
            payload = result.model_dump_json()
        else:
            logger.error(f"Can't enqueue payload with type: {type(result)}")
            continue
        self.output_queue.enqueue(payload)
        processed += 1

    logger.info(f"Processed {processed}/{len(notifications)} notifications")
    return processed

process_notification(xml_data) async

Process a single YouTube notification XML.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `xml_data` | `str` | Raw XML data from the notification. | *required* |

Returns:

| Type | Description |
|------|-------------|
| `Optional[Dict[str, Any]]` | Extracted video metadata if successful, otherwise `None`. |

Source code in .venv/lib/python3.12/site-packages/ytindexer/worker/processor.py
async def process_notification(self, xml_data: str) -> Optional[Dict[str, Any]]:
    """
    Run the configured parser over one raw notification payload.

    Args:
        xml_data (str): The notification body exactly as it was dequeued.

    Returns:
        Optional[Dict[str, Any]]: Whatever ``self.parser.parse`` produced. Returns None
        when the parser returned None (a warning is logged) or raised (the error is
        logged and the traceback is logged at debug level).
    """
    try:
        parsed = self.parser.parse(xml_data)
        if parsed is not None:
            return parsed
        logger.warning("Notification processing returned None metadata")
        return None
    except Exception as err:
        # Swallow per-notification failures so the surrounding batch keeps going.
        logger.error(f"Failed to process notification: {err}")
        logger.debug(traceback.format_exc())
        return None

run(poll_interval=0.5) async

Run the worker process to continuously process notifications.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `poll_interval` | `float` | Time to wait between polling the queue. | `0.5` |

Source code in .venv/lib/python3.12/site-packages/ytindexer/worker/processor.py
async def run(self, poll_interval: float = 0.5):
    """
    Run the worker process to continuously process notifications.

    Args:
        poll_interval (float): Time to wait between polling the queue.
    """
    logger.info("Starting YouTube notification processor worker")

    try:
        while not self._shutdown_event.is_set():
            queue_size = self.notification_queue.queue_size()

            if queue_size > 0:
                logger.debug(f"Queue has {queue_size} notifications pending")
                await self.process_batch()
            else:
                await asyncio.sleep(poll_interval)

    except asyncio.CancelledError:
        logger.info("Worker cancelled gracefully")
        raise
    except Exception as e:
        logger.error(f"Worker encountered an error: {str(e)}")
        logger.error(traceback.format_exc())

shutdown()

Signal the processor to shut down gracefully.

Source code in .venv/lib/python3.12/site-packages/ytindexer/worker/processor.py
def shutdown(self):
    """
    Request a graceful stop of the processing loop.

    Only sets the internal shutdown event. ``run()`` sees it before its next loop
    iteration and returns.
    """
    stop_event = self._shutdown_event
    stop_event.set()