Skip to content

API

ChannelStatsService

Bases: HealthCheckable

Handles channel statistics updates

Source code in .venv/lib/python3.12/site-packages/ytindexer/indexer/stats.py
class ChannelStatsService(HealthCheckable):
    """Maintains per-channel statistics in a MongoDB channels collection.

    Responsibilities:
      * ensuring the configured channel indexes exist,
      * incrementing channel video counts as new videos are processed,
      * reporting collection health for monitoring.
    """

    def __init__(self, client: Any, config: MongoDBConfig, retry_config: RetryConfig):
        """Initialize the service.

        Args:
            client: Async MongoDB client (indexed by database name).
            config: Database, collection, and index configuration.
            retry_config: Backoff parameters for transient write failures.
        """
        self.client = client
        self.config = config
        self.retry = RetryableOperation(retry_config)
        self.db = self.client[config.database_name]
        self.channels_collection = self.db[config.channels_collection]
        logger.info("Initialized ChannelStatsService")

    async def ensure_indices(self) -> OperationResult:
        """Ensure the required database indices exist.

        Returns:
            OperationResult: success listing the ensured indexes, or failure
            naming the indexes that could not be created.
        """
        created_indexes = []
        failed_indexes = []

        for index_name, index_config in self.config.channel_indexes.items():
            # BUG FIX: channel_indexes entries carry the key specification
            # under "key" (see MongoDBConfig.channel_indexes). Forwarding it
            # as a keyword argument corrupts the index document, because
            # pymongo merges extra kwargs into the index spec. Pass the key
            # spec positionally and the remaining entries as options.
            options = dict(index_config)
            keys = options.pop("key", index_name)
            try:
                await self.channels_collection.create_index(keys, **options)
                created_indexes.append(index_name)
                logger.debug(f"Created index: {index_name}")
            except OperationFailure as e:
                if "already exists" in str(e):
                    logger.debug(f"Index '{index_name}' already exists")
                    created_indexes.append(index_name)
                else:
                    failed_indexes.append(index_name)
                    logger.error(f"Failed to create index {index_name}: {str(e)}")

        if failed_indexes:
            return OperationResult.failure(
                f"Failed to create indexes: {failed_indexes}",
                metadata={"created": created_indexes, "failed": failed_indexes},
            )
        return OperationResult.success(
            f"Ensured indexes: {created_indexes}",
            metadata={"indexes": created_indexes},
        )

    async def update_channel_stats(self, video_data: Dict[str, Any]) -> OperationResult:
        """Update channel statistics based on video data with retry logic.

        Increments the channel's video_count and refreshes last_activity;
        a channel document is upserted (with first_seen) on first sight.

        Args:
            video_data: Video metadata; must contain a "channel_id" key.

        Returns:
            OperationResult: success noting whether the channel document was
            updated or created, or failure if the write ultimately failed.
        """
        channel_id = video_data.get("channel_id")
        if not channel_id:
            return OperationResult.failure("Video data missing channel_id")

        # Capture one timestamp so last_activity and first_seen agree exactly
        # on the upsert that creates the document (previously two separate
        # datetime.now() readings could differ).
        now = datetime.now(timezone.utc)

        async def _update_operation():
            return await self.channels_collection.update_one(
                {"channel_id": channel_id},
                {
                    "$inc": {"video_count": 1},
                    "$set": {"last_activity": now},
                    "$setOnInsert": {"first_seen": now},
                },
                upsert=True,
            )

        try:
            result = await self.retry.execute(
                _update_operation, f"update_channel_stats_{channel_id}"
            )
            logger.debug(f"Updated channel stats: {channel_id}")

            # matched_count > 0 means an existing document was modified;
            # otherwise the upsert created a new channel document.
            action = "updated" if result.matched_count > 0 else "created"
            return OperationResult.success(
                f"Channel stats {action}: {channel_id}",
                metadata={"channel_id": channel_id, "action": action},
            )
        except Exception as e:
            logger.error(f"Failed to update channel stats: {str(e)}")
            logger.debug(traceback.format_exc())
            return OperationResult.failure(
                f"Failed to update channel stats: {str(e)}", e
            )

    async def health_check(self) -> HealthStatus:
        """Check MongoDB connection health for the channels collection.

        Runs a count capped at one document as a cheap liveness probe and
        reports the round-trip time in milliseconds.
        """
        start_time = time.time()
        try:
            # Cheap probe: count at most one document.
            await self.channels_collection.count_documents({}, limit=1)
            response_time = (time.time() - start_time) * 1000

            return HealthStatus(
                service_name="mongodb_channels",
                is_healthy=True,
                response_time_ms=response_time,
                message="Channels collection accessible",
            )
        except Exception as e:
            response_time = (time.time() - start_time) * 1000
            return HealthStatus(
                service_name="mongodb_channels",
                is_healthy=False,
                response_time_ms=response_time,
                message=f"Health check failed: {str(e)}",
            )

ensure_indices() async

Ensure the required database indices exist

Source code in .venv/lib/python3.12/site-packages/ytindexer/indexer/stats.py
async def ensure_indices(self) -> OperationResult:
    """Ensure the required database indices exist.

    Returns:
        OperationResult: success listing the ensured indexes, or failure
        naming the indexes that could not be created.
    """
    created_indexes = []
    failed_indexes = []

    for index_name, index_config in self.config.channel_indexes.items():
        # BUG FIX: the configuration carries the key specification under
        # "key"; forwarding it as a keyword argument corrupts the index
        # document (pymongo merges extra kwargs into the index spec).
        # Pass the key spec positionally and the rest as options.
        options = dict(index_config)
        keys = options.pop("key", index_name)
        try:
            await self.channels_collection.create_index(keys, **options)
            created_indexes.append(index_name)
            logger.debug(f"Created index: {index_name}")
        except OperationFailure as e:
            if "already exists" in str(e):
                logger.debug(f"Index '{index_name}' already exists")
                created_indexes.append(index_name)
            else:
                failed_indexes.append(index_name)
                logger.error(f"Failed to create index {index_name}: {str(e)}")

    if failed_indexes:
        return OperationResult.failure(
            f"Failed to create indexes: {failed_indexes}",
            metadata={"created": created_indexes, "failed": failed_indexes},
        )
    return OperationResult.success(
        f"Ensured indexes: {created_indexes}",
        metadata={"indexes": created_indexes},
    )

health_check() async

Check MongoDB connection health for channels collection

Source code in .venv/lib/python3.12/site-packages/ytindexer/indexer/stats.py
async def health_check(self) -> HealthStatus:
    """Verify that the channels collection is reachable.

    Issues a count limited to a single document as a cheap probe and
    reports the outcome together with the round-trip time in milliseconds.
    """
    started = time.time()
    healthy = True
    message = "Channels collection accessible"
    try:
        # Probe the collection with a count capped at one document.
        await self.channels_collection.count_documents({}, limit=1)
    except Exception as exc:
        healthy = False
        message = f"Health check failed: {str(exc)}"
    elapsed_ms = (time.time() - started) * 1000
    return HealthStatus(
        service_name="mongodb_channels",
        is_healthy=healthy,
        response_time_ms=elapsed_ms,
        message=message,
    )

update_channel_stats(video_data) async

Update channel statistics based on video data with retry logic

Source code in .venv/lib/python3.12/site-packages/ytindexer/indexer/stats.py
async def update_channel_stats(self, video_data: Dict[str, Any]) -> OperationResult:
    """Update channel statistics based on video data with retry logic.

    Increments the channel's video_count and refreshes last_activity;
    a channel document is upserted (with first_seen) on first sight.

    Args:
        video_data: Video metadata; must contain a "channel_id" key.

    Returns:
        OperationResult: success noting whether the channel document was
        updated or created, or failure if the write ultimately failed.
    """
    channel_id = video_data.get("channel_id")
    if not channel_id:
        return OperationResult.failure("Video data missing channel_id")

    # Capture one timestamp so last_activity and first_seen agree exactly
    # on the upsert that creates the document (previously two separate
    # datetime.now() readings could differ).
    now = datetime.now(timezone.utc)

    async def _update_operation():
        return await self.channels_collection.update_one(
            {"channel_id": channel_id},
            {
                "$inc": {"video_count": 1},
                "$set": {"last_activity": now},
                "$setOnInsert": {"first_seen": now},
            },
            upsert=True,
        )

    try:
        result = await self.retry.execute(
            _update_operation, f"update_channel_stats_{channel_id}"
        )
        logger.debug(f"Updated channel stats: {channel_id}")

        # matched_count > 0 means an existing document was modified;
        # otherwise the upsert created a new channel document.
        action = "updated" if result.matched_count > 0 else "created"
        return OperationResult.success(
            f"Channel stats {action}: {channel_id}",
            metadata={"channel_id": channel_id, "action": action},
        )
    except Exception as e:
        logger.error(f"Failed to update channel stats: {str(e)}")
        logger.debug(traceback.format_exc())
        return OperationResult.failure(
            f"Failed to update channel stats: {str(e)}", e
        )

ElasticsearchConfig dataclass

Configuration for Elasticsearch indexing and search operations.

This class manages Elasticsearch-specific settings including index configuration, field mappings, and analysis settings. It provides a computed mapping property that generates the complete index configuration based on the configured parameters.

The mapping includes optimized field types for video metadata, proper analyzers for text search, and index settings for performance tuning.

Attributes:

Name Type Description
index_name str

Name of the Elasticsearch index for storing video documents.

shards int

Number of primary shards for the index. More shards allow better distribution across nodes but increase overhead.

replicas int

Number of replica shards for each primary shard. Replicas provide redundancy and can improve search throughput.

Example

config = ElasticsearchConfig( ... index_name="videos_production", ... shards=3, ... replicas=2 ... )

Create index with the computed mapping

es_client.indices.create( ... index=config.index_name, ... body=config.mapping ... )

Source code in .venv/lib/python3.12/site-packages/ytindexer/indexer/config.py
@dataclass
class ElasticsearchConfig:
    """Configuration for Elasticsearch indexing and search operations.

    Holds the index name plus shard and replica counts, and computes the
    full index mapping (field types, analyzers, and index settings) from
    those values on demand.

    Attributes:
        index_name: Name of the Elasticsearch index for video documents.
        shards: Number of primary shards for the index.
        replicas: Number of replica shards per primary shard.

    Example:
        >>> config = ElasticsearchConfig(index_name="videos_production",
        ...                              shards=3, replicas=2)
        >>> es_client.indices.create(index=config.index_name,
        ...                          body=config.mapping)
    """

    index_name: str = "videos"
    shards: int = 1
    replicas: int = 0

    @property
    def mapping(self) -> Dict[str, Any]:
        """Compute the complete index mapping and settings.

        Field layout: keyword fields for exact matching (IDs, tags,
        categories); analyzed text for title, description, and author, with
        keyword sub-fields on title and author for aggregations; date fields
        for timestamps; integer/long types for metrics. The settings section
        reflects the shard and replica counts at call time.

        Returns:
            Dict with "mappings" (field properties) and "settings"
            (number_of_shards / number_of_replicas).
        """
        def _text_field(analyzed: bool = False) -> Dict[str, Any]:
            # Text field with an exact-match keyword sub-field; values longer
            # than 256 characters are not indexed in the keyword sub-field.
            spec: Dict[str, Any] = {"type": "text"}
            if analyzed:
                spec["analyzer"] = "standard"
            spec["fields"] = {"keyword": {"type": "keyword", "ignore_above": 256}}
            return spec

        properties: Dict[str, Any] = {
            "video_id": {"type": "keyword"},
            "channel_id": {"type": "keyword"},
            "title": _text_field(analyzed=True),
            "description": {"type": "text", "analyzer": "standard"},
            "published": {"type": "date"},
            "updated": {"type": "date"},
            "author": _text_field(),
            "tags": {"type": "keyword"},
            "categories": {"type": "keyword"},
            "duration": {"type": "integer"},
        }
        for metric in ("view_count", "like_count", "comment_count"):
            properties[metric] = {"type": "long"}
        properties["processed_at"] = {"type": "date"}

        return {
            "mappings": {"properties": properties},
            "settings": {
                "number_of_shards": self.shards,
                "number_of_replicas": self.replicas,
            },
        }

mapping property

Generate the complete Elasticsearch index mapping and settings.

Creates a comprehensive mapping configuration that defines how video documents are indexed and stored. The mapping includes:

  • Keyword fields for exact matching (video_id, channel_id, tags)
  • Text fields with standard analyzer for full-text search
  • Multi-field configurations for both search and aggregations
  • Appropriate data types for metrics and timestamps
  • Index settings based on configured shard and replica counts

Returns:

Type Description
Dict[str, Any]

Dictionary containing the complete Elasticsearch mapping configuration with both field mappings and index settings.

Note

The mapping is generated dynamically based on current attribute values, so changes to shards or replicas will be reflected in subsequent calls.

Example

config = ElasticsearchConfig(shards=2, replicas=1) mapping = config.mapping print(mapping["settings"]["number_of_shards"]) # 2

Text fields support both search and keyword aggregations

title_mapping = mapping["mappings"]["properties"]["title"] print(title_mapping["type"]) # "text" print(title_mapping["fields"]["keyword"]["type"]) # "keyword"

MongoDBConfig dataclass

Configuration for MongoDB collections and database indexes.

This class manages MongoDB-specific configuration including database and collection names, as well as index definitions for optimal query performance. It provides computed properties that generate index configurations for different collection types.

The index configurations are optimized for common query patterns including lookups by ID, filtering by channel, date-based queries, and subscription management operations.

Attributes:

Name Type Description
database_name str

Name of the MongoDB database containing the collections.

videos_collection str

Name of the collection storing video metadata documents.

channels_collection str

Name of the collection storing channel information.

Example

config = MongoDBConfig( ... database_name="video_platform", ... videos_collection="video_metadata", ... channels_collection="channel_data" ... )

Create indexes for optimal query performance

db = mongo_client[config.database_name] videos = db[config.videos_collection]

for index_config in config.video_indexes.values(): ... videos.create_index(index_config["key"], **index_config)

Source code in .venv/lib/python3.12/site-packages/ytindexer/indexer/config.py
@dataclass
class MongoDBConfig:
    """Configuration for MongoDB collections and database indexes.

    Holds the database and collection names used by the indexer and exposes
    computed properties describing the indexes each collection should carry
    for the common query patterns: ID lookups, channel filtering, date-based
    queries, and subscription management.

    Attributes:
        database_name: Name of the MongoDB database containing the collections.
        videos_collection: Collection holding video metadata documents.
        channels_collection: Collection holding channel information.

    Example:
        >>> config = MongoDBConfig(
        ...     database_name="video_platform",
        ...     videos_collection="video_metadata",
        ...     channels_collection="channel_data"
        ... )
        >>> db = mongo_client[config.database_name]
        >>> videos = db[config.videos_collection]
    """

    database_name: str = "mongo"
    videos_collection: str = "videos"
    channels_collection: str = "channels"

    @property
    def video_indexes(self) -> Dict[str, Dict[str, Any]]:
        """Index definitions for the videos collection.

        Returns:
            Mapping of index name to its MongoDB index configuration. The
            video_id index is unique to prevent duplicate video documents;
            the channel and published-date indexes are non-unique and serve
            filtering and sorting.
        """
        def _spec(field: str, unique: bool, name: str) -> Dict[str, Any]:
            # Single-field ascending index specification.
            return {"key": [(field, 1)], "unique": unique, "name": name}

        return {
            "video_id": _spec("video_id", True, "video_id_idx"),
            "channel_id_non": _spec("channel_id", False, "channel_id_non_idx"),
            "published_non": _spec("published", False, "published_non_idx"),
        }

    @property
    def channel_indexes(self) -> Dict[str, Dict[str, Any]]:
        """Index definitions for the channels collection.

        Returns:
            Mapping with a single unique ascending index on channel_id,
            used for channel identification and lookups.
        """
        channel_id_spec: Dict[str, Any] = {
            "key": [("channel_id", 1)],
            "unique": True,
            "name": "channel_id_idx",
        }
        return {"channel_id": channel_id_spec}

    @property
    def subscription_indexes(self) -> Dict[str, Dict[str, Any]]:
        """Index definitions for the subscriptions collection.

        Covers unique channel lookups, expiration queries, active-flag
        filtering, and a compound entry for finding active subscriptions
        that are nearing expiry (a common renewal query).

        Returns:
            Mapping of index name to its configuration.

        Note:
            NOTE(review): the "expires_at_active" entry is a list of key
            tuples rather than an options dict, which does not match the
            declared return type or the shape used by the other index
            properties — confirm with consumers before normalizing it.
        """
        indexes: Dict[str, Any] = {}
        indexes["channel_id"] = {"unique": True}
        indexes["expires_at"] = {}
        indexes["is_active"] = {}
        # Compound key for "active subscriptions nearing expiry" queries.
        indexes["expires_at_active"] = [("expires_at", 1), ("is_active", 1)]
        return indexes

channel_indexes property

Generate index configurations for the channels collection.

Creates index definitions for channel-related queries, primarily focused on unique channel identification and lookups.

Returns:

Type Description
Dict[str, Dict[str, Any]]

Dictionary mapping index names to their MongoDB index configurations for the channels collection.

Example

config = MongoDBConfig() channel_indexes = config.channel_indexes

Unique index for channel identification

channel_idx = channel_indexes["channel_id"] print(channel_idx["unique"]) # True

subscription_indexes property

Generate index configurations for the subscriptions collection.

Creates index definitions optimized for subscription management operations including unique subscription lookups, expiration queries, and active subscription filtering. Includes a compound index for efficient queries on expiring active subscriptions.

Returns:

Type Description
Dict[str, Dict[str, Any]]

Dictionary mapping index names to their MongoDB index configurations for subscription management.

Note

The compound index on expires_at and is_active enables efficient queries for finding subscriptions that need renewal, which is a common operation in subscription management workflows.

Example

config = MongoDBConfig() sub_indexes = config.subscription_indexes

Unique constraint on channel subscriptions

channel_idx = sub_indexes["channel_id"] print(channel_idx["unique"]) # True

Compound index for expiration queries

compound_idx = sub_indexes["expires_at_active"] print(compound_idx) # [("expires_at", 1), ("is_active", 1)]

video_indexes property

Generate index configurations for the videos collection.

Creates index definitions optimized for common video query patterns including unique video lookups, channel-based filtering, and date-based sorting and filtering operations.

Returns:

Type Description
Dict[str, Dict[str, Any]]

Dictionary mapping index names to their MongoDB index configurations. Each configuration includes the key specification, uniqueness constraint, and index name.

Note

The video_id index enforces uniqueness to prevent duplicate video documents, while other indexes are non-unique to support filtering and sorting operations.

Example

config = MongoDBConfig() indexes = config.video_indexes

Unique index for video lookups

video_id_idx = indexes["video_id"] print(video_id_idx["unique"]) # True

Non-unique index for channel filtering

channel_idx = indexes["channel_id_non"] print(channel_idx["unique"]) # False

RetryConfig dataclass

Configuration for retry logic with exponential backoff.

This class defines parameters for implementing robust retry mechanisms with exponential backoff to handle transient failures gracefully. The configuration controls retry attempts, timing, and backoff behavior.

The exponential backoff algorithm increases delays between retry attempts to reduce load on failing systems and improve the likelihood of eventual success. The max_delay parameter prevents delays from becoming excessive.

Attributes:

Name Type Description
max_attempts int

Maximum number of retry attempts before giving up. Includes the initial attempt, so max_attempts=3 means 1 initial attempt plus 2 retries.

base_delay float

Initial delay in seconds before the first retry attempt. Subsequent delays are calculated using exponential backoff.

max_delay float

Maximum delay in seconds between retry attempts. Prevents exponential backoff from creating excessively long delays.

exponential_base float

Base for exponential backoff calculation. Common values are 2.0 (doubling) or 1.5 (50% increase per attempt).

Example

Conservative retry configuration

config = RetryConfig( ... max_attempts=3, ... base_delay=1.0, ... max_delay=30.0, ... exponential_base=2.0 ... )

Aggressive retry for critical operations

critical_config = RetryConfig( ... max_attempts=10, ... base_delay=0.5, ... max_delay=120.0, ... exponential_base=1.5 ... )

Delay calculation example:

Attempt 1: base_delay * (exponential_base ^ 0) = 1.0 * 1 = 1.0s

Attempt 2: base_delay * (exponential_base ^ 1) = 1.0 * 2 = 2.0s

Attempt 3: base_delay * (exponential_base ^ 2) = 1.0 * 4 = 4.0s

Note

Actual delays include random jitter to prevent thundering herd problems when multiple clients retry simultaneously. The jitter is typically 10-30% of the calculated delay.

Source code in .venv/lib/python3.12/site-packages/ytindexer/indexer/config.py
@dataclass
class RetryConfig:
    """Parameters for retry logic with exponential backoff.

    The delay before retry attempt ``k`` is
    ``min(base_delay * exponential_base ** (k - 1), max_delay)``: delays
    grow geometrically between attempts but are capped so they never become
    excessive.

    Attributes:
        max_attempts: Total attempts allowed, counting the initial try
            (max_attempts=3 means one initial attempt plus two retries).
        base_delay: Delay in seconds before the first retry.
        max_delay: Upper bound in seconds on any single delay.
        exponential_base: Growth factor between successive delays
            (2.0 doubles each time; 1.5 grows by half).

    Example:
        >>> # Conservative defaults
        >>> RetryConfig(max_attempts=3, base_delay=1.0,
        ...             max_delay=30.0, exponential_base=2.0)
        >>> # More persistent retrying for critical operations
        >>> RetryConfig(max_attempts=10, base_delay=0.5,
        ...             max_delay=120.0, exponential_base=1.5)

    Note:
        Implementations typically add random jitter (roughly 10-30% of the
        computed delay) so many clients do not retry in lockstep.
    """

    max_attempts: int = 3
    base_delay: float = 1.0
    max_delay: float = 60.0
    exponential_base: float = 2.0

SearchIndexingService

Bases: HealthCheckable

Handles video search indexing in Elasticsearch

Source code in .venv/lib/python3.12/site-packages/ytindexer/indexer/indexing.py
class SearchIndexingService(HealthCheckable):
    """Indexes video documents into Elasticsearch and reports cluster health."""

    def __init__(
        self, client: Any, config: ElasticsearchConfig, retry_config: RetryConfig
    ):
        """Store the client and configuration and set up retry handling."""
        self.client = client
        self.config = config
        self.retry = RetryableOperation(retry_config)
        logger.info("Initialized SearchIndexingService")

    async def ensure_index(self) -> OperationResult:
        """Create the Elasticsearch index unless it is already present."""
        index = self.config.index_name
        try:
            already_present = await self.client.indices.exists(index=index)
            if already_present:
                return OperationResult.success(
                    f"Index already exists: {index}"
                )
            await self.client.indices.create(index=index, body=self.config.mapping)
            logger.info(f"Created Elasticsearch index: {index}")
            return OperationResult.success(
                f"Created index: {index}"
            )
        except Exception as e:
            logger.error(f"Failed to ensure index: {str(e)}")
            return OperationResult.failure(f"Failed to ensure index: {str(e)}", e)

    async def index_video(self, video_data: Dict[str, Any]) -> OperationResult:
        """Index a single video document, retrying on transient failures."""
        video_id = video_data.get("video_id")
        if not video_id:
            return OperationResult.failure("Video data missing video_id")

        async def _write_document():
            await self.client.index(
                index=self.config.index_name, id=video_id, body=video_data, refresh=True
            )
            return video_id

        try:
            result_id = await self.retry.execute(
                _write_document, f"index_video_{video_id}"
            )
        except Exception as e:
            logger.error(f"Failed to index video in Elasticsearch: {str(e)}")
            logger.debug(traceback.format_exc())
            return OperationResult.failure(f"Failed to index video: {str(e)}", e)
        logger.debug(f"Indexed video in Elasticsearch: {result_id}")
        return OperationResult.success(
            f"Indexed video: {result_id}", metadata={"video_id": result_id}
        )

    async def health_check(self) -> HealthStatus:
        """Report Elasticsearch cluster health and the probe round-trip time."""
        started = time.time()
        try:
            cluster_state = await self.client.cluster.health()
        except Exception as e:
            return HealthStatus(
                service_name="elasticsearch",
                is_healthy=False,
                response_time_ms=(time.time() - started) * 1000,
                message=f"Health check failed: {str(e)}",
            )
        elapsed_ms = (time.time() - started) * 1000
        # Both green and yellow count as healthy (yellow = replicas unassigned).
        healthy = cluster_state.get("status") in ["green", "yellow"]
        return HealthStatus(
            service_name="elasticsearch",
            is_healthy=healthy,
            response_time_ms=elapsed_ms,
            message=f"Cluster status: {cluster_state.get('status', 'unknown')}",
            metadata=cluster_state,
        )

    async def close(self):
        """Close the Elasticsearch client."""
        await self.client.close()
close() async

Close the Elasticsearch client

Source code in .venv/lib/python3.12/site-packages/ytindexer/indexer/indexing.py
async def close(self):
    """Close the Elasticsearch client"""
    await self.client.close()

ensure_index() async

Create Elasticsearch index if it doesn't exist

Source code in .venv/lib/python3.12/site-packages/ytindexer/indexer/indexing.py
async def ensure_index(self) -> OperationResult:
    """Create the Elasticsearch index when missing; no-op when it already exists."""
    index_name = self.config.index_name
    try:
        exists = await self.client.indices.exists(index=index_name)
        if exists:
            return OperationResult.success(
                f"Index already exists: {index_name}"
            )
        await self.client.indices.create(
            index=index_name, body=self.config.mapping
        )
        logger.info(f"Created Elasticsearch index: {index_name}")
        return OperationResult.success(f"Created index: {index_name}")
    except Exception as e:
        logger.error(f"Failed to ensure index: {str(e)}")
        return OperationResult.failure(f"Failed to ensure index: {str(e)}", e)

health_check() async

Check Elasticsearch cluster health

Source code in .venv/lib/python3.12/site-packages/ytindexer/indexer/indexing.py
async def health_check(self) -> HealthStatus:
    """Report Elasticsearch cluster health as a HealthStatus.

    Healthy when the cluster status is green or yellow; any exception from
    the health call is reported as unhealthy with the error message.
    """
    started = time.time()
    try:
        health = await self.client.cluster.health()
    except Exception as e:
        return HealthStatus(
            service_name="elasticsearch",
            is_healthy=False,
            response_time_ms=(time.time() - started) * 1000,
            message=f"Health check failed: {str(e)}",
        )

    elapsed_ms = (time.time() - started) * 1000
    status = health.get("status")
    return HealthStatus(
        service_name="elasticsearch",
        is_healthy=status in ("green", "yellow"),
        response_time_ms=elapsed_ms,
        message=f"Cluster status: {health.get('status', 'unknown')}",
        metadata=health,
    )

index_video(video_data) async

Index video metadata in Elasticsearch with retry logic

Source code in .venv/lib/python3.12/site-packages/ytindexer/indexer/indexing.py
async def index_video(self, video_data: Dict[str, Any]) -> OperationResult:
    """Push one video document into the Elasticsearch index with retry logic.

    Returns a success OperationResult carrying the video id, or a failure
    result when the id is missing or every retry attempt fails.
    """
    video_id = video_data.get("video_id")
    if not video_id:
        return OperationResult.failure("Video data missing video_id")

    async def _do_index():
        await self.client.index(
            index=self.config.index_name,
            id=video_id,
            body=video_data,
            refresh=True,
        )
        return video_id

    try:
        indexed_id = await self.retry.execute(_do_index, f"index_video_{video_id}")
    except Exception as e:
        logger.error(f"Failed to index video in Elasticsearch: {str(e)}")
        logger.debug(traceback.format_exc())
        return OperationResult.failure(f"Failed to index video: {str(e)}", e)

    logger.debug(f"Indexed video in Elasticsearch: {indexed_id}")
    return OperationResult.success(
        f"Indexed video: {indexed_id}", metadata={"video_id": indexed_id}
    )

VideoIndexingProcessor

Bases: HealthCheckable

Orchestrates the video indexing process with enhanced error handling and health checks

Source code in .venv/lib/python3.12/site-packages/ytindexer/indexer/processor.py
class VideoIndexingProcessor(HealthCheckable):
    """Orchestrates the video indexing process with enhanced error handling and health checks.

    Pulls video metadata dicts off ``input_queue`` and runs each through
    transcript enrichment, MongoDB storage, Elasticsearch indexing, and
    channel-stats updates. At most ``max_concurrent_tasks`` videos are
    processed concurrently; ``poll_interval`` throttles polling when the
    queue is empty or the task pool is full.
    """

    def __init__(
        self,
        input_queue: Queue,
        video_storage: VideoStorageService,
        search_indexing: SearchIndexingService,
        channel_stats: ChannelStatsService,
        transcript_service: VideoTranscriptService,
        max_concurrent_tasks: int = 10,
        poll_interval: float = 1.0,
    ):
        self.input_queue = input_queue
        self.video_storage = video_storage
        self.search_indexing = search_indexing
        self.channel_stats = channel_stats
        self.transcript_service = transcript_service
        self.max_concurrent_tasks = max_concurrent_tasks
        self.poll_interval = poll_interval
        self._running = False
        self._shutdown_event = asyncio.Event()
        # In-flight processing tasks; size is bounded by max_concurrent_tasks.
        self._active_tasks: set = set()
        logger.info("Initialized VideoIndexingProcessor")

    async def ensure_indices(self) -> OperationResult:
        """Ensure all required database indices and mappings exist.

        Runs storage, search-indexing, and stats setup sequentially and
        aggregates the outcomes into a single OperationResult.
        """
        results = []

        storage_result = await self.video_storage.ensure_indices()
        results.append(("storage", storage_result))

        indexing_result = await self.search_indexing.ensure_index()
        results.append(("indexing", indexing_result))

        stats_result = await self.channel_stats.ensure_indices()
        results.append(("stats", stats_result))

        logger.info(results)

        failed_services = [name for name, result in results if result.is_failure]

        if failed_services:
            return OperationResult.failure(
                f"Failed to ensure indices for: {failed_services}",
                metadata={
                    "results": {name: result.message for name, result in results}
                },
            )
        else:
            return OperationResult.success(
                "All indices ensured successfully",
                metadata={
                    "results": {name: result.message for name, result in results}
                },
            )

    async def _enrich_video_with_transcript(
        self, video_data: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Enrich video data with transcript information.

        Sets video_data["transcript"] to the fetched text or None and returns
        the dict. Transcript failures are non-fatal and only logged.
        """
        video_id = video_data.get("video_id")
        if not video_id:
            logger.warning(
                "Video data missing video_id, skipping transcript enrichment"
            )
            return video_data

        try:
            # Transcript download is blocking; run it in the default executor
            # so it does not stall the event loop.
            loop = asyncio.get_event_loop()
            transcript = await loop.run_in_executor(
                None, self.transcript_service.get_transcript, video_id
            )

            if transcript:
                video_data["transcript"] = transcript
                logger.debug(f"Added transcript to video {video_id}")
            else:
                logger.debug(f"No transcript available for video {video_id}")
                video_data["transcript"] = None

        except Exception as e:
            logger.warning(f"Failed to get transcript for video {video_id}: {e}")
            video_data["transcript"] = None

        return video_data

    async def process_video(self, video_data: Dict[str, Any]) -> ProcessingResult:
        """Process a single video through all indexing services.

        Storage is the critical step; indexing and stats failures downgrade
        the result to partial success rather than failing the video outright.
        """
        video_id = video_data.get("video_id", "unknown")

        # Enrich with transcript data (non-critical operation)
        enriched_video_data = await self._enrich_video_with_transcript(video_data)

        # Pass the enriched dict to every downstream service explicitly; the
        # previous code passed video_data and only worked because enrichment
        # mutates it in place.
        # Store in MongoDB (critical operation)
        storage_result = await self.video_storage.store_video(enriched_video_data)

        # Index in Elasticsearch (non-critical)
        indexing_result = await self.search_indexing.index_video(
            enriched_video_data
        )

        # Update channel statistics (non-critical)
        stats_result = await self.channel_stats.update_channel_stats(
            enriched_video_data
        )

        result = ProcessingResult(
            video_id=video_id,
            storage_result=storage_result,
            indexing_result=indexing_result,
            stats_result=stats_result,
        )

        if result.is_success:
            has_transcript = enriched_video_data.get("transcript") is not None
            logger.debug(
                f"Successfully processed video: {video_id} (transcript: {has_transcript})"
            )
        elif result.overall_status == OperationStatus.PARTIAL_SUCCESS:
            logger.warning(
                f"Partially processed video {video_id}: storage succeeded but other operations failed"
            )
        else:
            logger.error(f"Failed to process video {video_id}: storage failed")

        return result

    async def _process_video_with_cleanup(
        self, video_data: Dict[str, Any]
    ) -> ProcessingResult:
        """Process a video, removing this task from the active set when done."""
        task = asyncio.current_task()
        try:
            return await self.process_video(video_data)
        finally:
            # Clean up task reference
            if task:
                self._active_tasks.discard(task)

    @staticmethod
    def _reap_done_tasks(done) -> None:
        """Consume results of finished tasks so exceptions are not lost.

        Without this, a task that raised would emit "Task exception was never
        retrieved" at garbage collection instead of being logged promptly.
        """
        for task in done:
            if task.cancelled():
                continue
            exc = task.exception()
            if exc is not None:
                logger.error(f"Video processing task raised: {exc}")

    async def run(self) -> None:
        """Main processing loop - consumes videos from queue and processes them."""
        logger.info("Starting VideoIndexingProcessor main loop")
        self._running = True

        try:
            while self._running and not self._shutdown_event.is_set():
                try:
                    # Check if we have capacity for more tasks
                    if len(self._active_tasks) >= self.max_concurrent_tasks:
                        if self._active_tasks:
                            # Wait for some tasks to complete, then reap their
                            # results so failures are surfaced in the log.
                            done, self._active_tasks = await asyncio.wait(
                                self._active_tasks,
                                return_when=asyncio.FIRST_COMPLETED,
                                timeout=self.poll_interval,
                            )
                            self._reap_done_tasks(done)
                        else:
                            # max_concurrent_tasks <= 0 leaves nothing to wait on
                            await asyncio.sleep(self.poll_interval)
                        continue

                    # Try to get video data from queue
                    video_data = await self._get_next_video()

                    if video_data is None:
                        # No video available, wait before trying again
                        await asyncio.sleep(self.poll_interval)
                        continue

                    # Create and track processing task
                    task = asyncio.create_task(
                        self._process_video_with_cleanup(video_data)
                    )
                    self._active_tasks.add(task)

                    logger.debug(
                        f"Started processing task for video: {video_data.get('video_id', 'unknown')}"
                    )

                except asyncio.CancelledError:
                    logger.info("Processing loop cancelled")
                    break
                except Exception as e:
                    logger.error(f"Error in main processing loop: {e}")
                    await asyncio.sleep(self.poll_interval)

        finally:
            logger.info("Shutting down VideoIndexingProcessor")
            await self._cleanup_active_tasks()

    async def _get_next_video(self) -> Optional[Dict[str, Any]]:
        """Return the next video dict from the queue, or None when unavailable."""
        try:
            # NOTE(review): dequeue() is called synchronously here — confirm
            # the Queue implementation is non-blocking, or the event loop
            # will stall while waiting on it.
            result = self.input_queue.dequeue()
            return result
        except Exception as e:
            logger.error(f"Error getting video from queue: {e}")
            return None

    async def _cleanup_active_tasks(self) -> None:
        """Wait for in-flight tasks to finish, cancelling stragglers after 30s."""
        if not self._active_tasks:
            return

        logger.info(f"Waiting for {len(self._active_tasks)} active tasks to complete")

        # Give tasks a chance to complete gracefully
        try:
            await asyncio.wait_for(
                asyncio.gather(*self._active_tasks, return_exceptions=True),
                timeout=30.0,  # 30 second timeout for graceful shutdown
            )
        except asyncio.TimeoutError:
            logger.warning(
                "Timeout waiting for tasks to complete, cancelling remaining tasks"
            )
            for task in self._active_tasks:
                if not task.done():
                    task.cancel()

            # Wait for cancellations to complete
            await asyncio.gather(*self._active_tasks, return_exceptions=True)

        self._active_tasks.clear()

    async def stop(self) -> None:
        """Gracefully stop the processor; run() drains tasks before exiting."""
        logger.info("Stopping VideoIndexingProcessor")
        self._running = False
        self._shutdown_event.set()

    def is_running(self) -> bool:
        """Check if the processor is currently running."""
        return self._running

    @property
    def active_task_count(self) -> int:
        """Get the number of currently active processing tasks."""
        return len(self._active_tasks)

    async def health_check(self) -> HealthStatus:
        """Check health of all dependent services concurrently.

        Healthy only when every service is healthy AND the processing loop
        is running.
        """
        start_time = time.time()

        try:
            # Check all services concurrently
            health_checks = await asyncio.gather(
                self.video_storage.health_check(),
                self.search_indexing.health_check(),
                self.channel_stats.health_check(),
                return_exceptions=True,
            )

            response_time = (time.time() - start_time) * 1000

            # Determine overall health
            unhealthy_services = []
            service_statuses = {}

            # zip replaces the previous enumerate(), whose index was unused.
            for service_name, check in zip(
                ("storage", "indexing", "stats"), health_checks
            ):
                if isinstance(check, Exception):
                    unhealthy_services.append(service_name)
                    service_statuses[service_name] = f"Error: {str(check)}"
                elif not check.is_healthy:
                    unhealthy_services.append(service_name)
                    service_statuses[service_name] = check.message
                else:
                    service_statuses[service_name] = "healthy"

            is_healthy = len(unhealthy_services) == 0 and self._running
            processor_status = (
                f"running ({self.active_task_count} active tasks)"
                if self._running
                else "stopped"
            )
            message = f"Processor {processor_status}. " + (
                "All services healthy"
                if len(unhealthy_services) == 0
                else f"Unhealthy services: {unhealthy_services}"
            )

            return HealthStatus(
                service_name="video_processor",
                is_healthy=is_healthy,
                response_time_ms=response_time,
                message=message,
                metadata={
                    "services": service_statuses,
                    "active_tasks": self.active_task_count,
                    "is_running": self._running,
                },
            )

        except Exception as e:
            response_time = (time.time() - start_time) * 1000
            return HealthStatus(
                service_name="video_processor",
                is_healthy=False,
                response_time_ms=response_time,
                message=f"Health check failed: {str(e)}",
            )

active_task_count property

Get the number of currently active processing tasks

ensure_indices() async

Ensure all required database indices and mappings exist

Source code in .venv/lib/python3.12/site-packages/ytindexer/indexer/processor.py
async def ensure_indices(self) -> OperationResult:
    """Ensure all required database indices and mappings exist.

    Runs the storage, search-indexing, and stats setup sequentially and
    folds the three outcomes into one aggregate OperationResult.
    """
    results = [
        ("storage", await self.video_storage.ensure_indices()),
        ("indexing", await self.search_indexing.ensure_index()),
        ("stats", await self.channel_stats.ensure_indices()),
    ]

    logger.info(results)

    messages = {name: outcome.message for name, outcome in results}
    failed_services = [name for name, outcome in results if outcome.is_failure]

    if failed_services:
        return OperationResult.failure(
            f"Failed to ensure indices for: {failed_services}",
            metadata={"results": messages},
        )
    return OperationResult.success(
        "All indices ensured successfully",
        metadata={"results": messages},
    )

health_check() async

Check health of all dependent services

Source code in .venv/lib/python3.12/site-packages/ytindexer/indexer/processor.py
async def health_check(self) -> HealthStatus:
    """Check health of all dependent services concurrently.

    Healthy only when every service is healthy AND the processing loop is
    running.
    """
    start_time = time.time()

    try:
        # Check all services concurrently
        health_checks = await asyncio.gather(
            self.video_storage.health_check(),
            self.search_indexing.health_check(),
            self.channel_stats.health_check(),
            return_exceptions=True,
        )

        response_time = (time.time() - start_time) * 1000

        # Determine overall health
        unhealthy_services = []
        service_statuses = {}

        # zip replaces the previous enumerate(), whose index was unused.
        for service_name, check in zip(
            ("storage", "indexing", "stats"), health_checks
        ):
            if isinstance(check, Exception):
                unhealthy_services.append(service_name)
                service_statuses[service_name] = f"Error: {str(check)}"
            elif not check.is_healthy:
                unhealthy_services.append(service_name)
                service_statuses[service_name] = check.message
            else:
                service_statuses[service_name] = "healthy"

        is_healthy = len(unhealthy_services) == 0 and self._running
        processor_status = (
            f"running ({self.active_task_count} active tasks)"
            if self._running
            else "stopped"
        )
        message = f"Processor {processor_status}. " + (
            "All services healthy"
            if len(unhealthy_services) == 0
            else f"Unhealthy services: {unhealthy_services}"
        )

        return HealthStatus(
            service_name="video_processor",
            is_healthy=is_healthy,
            response_time_ms=response_time,
            message=message,
            metadata={
                "services": service_statuses,
                "active_tasks": self.active_task_count,
                "is_running": self._running,
            },
        )

    except Exception as e:
        response_time = (time.time() - start_time) * 1000
        return HealthStatus(
            service_name="video_processor",
            is_healthy=False,
            response_time_ms=response_time,
            message=f"Health check failed: {str(e)}",
        )

is_running()

Check if the processor is currently running

Source code in .venv/lib/python3.12/site-packages/ytindexer/indexer/processor.py
def is_running(self) -> bool:
    """Return True while the processing loop is active."""
    running = self._running
    return running

process_video(video_data) async

Process a single video through all indexing services

Source code in .venv/lib/python3.12/site-packages/ytindexer/indexer/processor.py
async def process_video(self, video_data: Dict[str, Any]) -> ProcessingResult:
    """Process a single video through all indexing services.

    Storage is the critical step; indexing and stats failures downgrade the
    result to partial success rather than failing the video outright.
    """
    video_id = video_data.get("video_id", "unknown")

    # Enrich with transcript data (non-critical operation)
    enriched_video_data = await self._enrich_video_with_transcript(video_data)

    # Pass the enriched dict to every downstream service explicitly; the old
    # code passed video_data and only worked because enrichment mutates it
    # in place.
    # Store in MongoDB (critical operation)
    storage_result = await self.video_storage.store_video(enriched_video_data)

    # Index in Elasticsearch (non-critical)
    indexing_result = await self.search_indexing.index_video(enriched_video_data)

    # Update channel statistics (non-critical)
    stats_result = await self.channel_stats.update_channel_stats(enriched_video_data)

    result = ProcessingResult(
        video_id=video_id,
        storage_result=storage_result,
        indexing_result=indexing_result,
        stats_result=stats_result,
    )

    if result.is_success:
        has_transcript = enriched_video_data.get("transcript") is not None
        logger.debug(
            f"Successfully processed video: {video_id} (transcript: {has_transcript})"
        )
    elif result.overall_status == OperationStatus.PARTIAL_SUCCESS:
        logger.warning(
            f"Partially processed video {video_id}: storage succeeded but other operations failed"
        )
    else:
        logger.error(f"Failed to process video {video_id}: storage failed")

    return result

run() async

Main processing loop - consumes videos from queue and processes them

Source code in .venv/lib/python3.12/site-packages/ytindexer/indexer/processor.py
async def run(self) -> None:
    """Main processing loop - consumes videos from queue and processes them.

    Spawns up to max_concurrent_tasks concurrent processing tasks, then
    blocks until capacity frees up. On shutdown, drains or cancels any
    in-flight tasks via _cleanup_active_tasks().
    """
    logger.info("Starting VideoIndexingProcessor main loop")
    self._running = True

    try:
        while self._running and not self._shutdown_event.is_set():
            try:
                # Check if we have capacity for more tasks
                if len(self._active_tasks) >= self.max_concurrent_tasks:
                    if self._active_tasks:
                        # Wait for some tasks to complete
                        done, self._active_tasks = await asyncio.wait(
                            self._active_tasks,
                            return_when=asyncio.FIRST_COMPLETED,
                            timeout=self.poll_interval,
                        )
                        # Consume finished tasks' exceptions so they are
                        # logged now instead of surfacing later as
                        # "Task exception was never retrieved".
                        for finished in done:
                            if finished.cancelled():
                                continue
                            exc = finished.exception()
                            if exc is not None:
                                logger.error(
                                    f"Video processing task raised: {exc}"
                                )
                    else:
                        # max_concurrent_tasks <= 0 leaves nothing to wait on
                        await asyncio.sleep(self.poll_interval)
                    continue

                # Try to get video data from queue
                video_data = await self._get_next_video()

                if video_data is None:
                    # No video available, wait before trying again
                    await asyncio.sleep(self.poll_interval)
                    continue

                # Create and track processing task
                task = asyncio.create_task(
                    self._process_video_with_cleanup(video_data)
                )
                self._active_tasks.add(task)

                logger.debug(
                    f"Started processing task for video: {video_data.get('video_id', 'unknown')}"
                )

            except asyncio.CancelledError:
                logger.info("Processing loop cancelled")
                break
            except Exception as e:
                logger.error(f"Error in main processing loop: {e}")
                await asyncio.sleep(self.poll_interval)

    finally:
        logger.info("Shutting down VideoIndexingProcessor")
        await self._cleanup_active_tasks()

stop() async

Gracefully stop the processor

Source code in .venv/lib/python3.12/site-packages/ytindexer/indexer/processor.py
async def stop(self) -> None:
    """Signal the processing loop to exit; run() performs the actual drain."""
    logger.info("Stopping VideoIndexingProcessor")
    self._shutdown_event.set()
    self._running = False

VideoStorageService

Bases: HealthCheckable

Handles video metadata storage in MongoDB

Source code in .venv/lib/python3.12/site-packages/ytindexer/indexer/storage.py
class VideoStorageService(HealthCheckable):
    """Handles video metadata storage in MongoDB"""

    def __init__(self, client: Any, config: MongoDBConfig, retry_config: RetryConfig):
        self.client = client
        self.config = config
        # DuplicateKeyError is excluded from retries and fails immediately.
        # NOTE(review): concurrent upserts on the same video_id can race and
        # raise a transient DuplicateKeyError that a retry would resolve —
        # confirm this exclusion is intentional.
        self.retry = RetryableOperation(
            retry_config, non_retry_exceptions=[DuplicateKeyError]
        )
        self.db = self.client[config.database_name]
        self.videos_collection = self.db[config.videos_collection]
        logger.info("Initialized VideoStorageService")

    async def ensure_indices(self) -> OperationResult:
        """Ensure the required database indices exist.

        Iterates config.video_indexes, creating each index and treating an
        "already exists" OperationFailure as success. Returns a failure
        result listing the indexes that could not be created, otherwise a
        success result naming the ensured indexes.
        """
        created_indexes = []
        failed_indexes = []

        for index_name, index_config in self.config.video_indexes.items():
            try:
                await self.videos_collection.create_index(index_name, **index_config)
                created_indexes.append(index_name)
                logger.debug(f"Created index: {index_name}")
            except OperationFailure as e:
                # Substring match on the server message: an index with the
                # same name already existing counts as ensured, not failed.
                if "already exists" in str(e):
                    logger.debug(f"Index '{index_name}' already exists")
                    created_indexes.append(index_name)
                else:
                    failed_indexes.append(index_name)
                    logger.error(f"Failed to create index {index_name}: {str(e)}")

        if failed_indexes:
            return OperationResult.failure(
                f"Failed to create indexes: {failed_indexes}",
                metadata={"created": created_indexes, "failed": failed_indexes},
            )
        else:
            return OperationResult.success(
                f"Ensured indexes: {created_indexes}",
                metadata={"indexes": created_indexes},
            )

    async def store_video(self, video_data: Dict[str, Any]) -> OperationResult:
        """Store video metadata in MongoDB with retry logic.

        Upserts on video_id, stamping updated_at with the current UTC time.
        Returns success metadata indicating whether the document was
        "updated" or "inserted", or a failure result once retries are
        exhausted.
        """
        video_id = video_data.get("video_id")
        if not video_id:
            return OperationResult.failure("Video data missing video_id")

        async def _store_operation():
            result = await self.videos_collection.update_one(
                {"video_id": video_id},
                {"$set": {**video_data, "updated_at": datetime.now(timezone.utc)}},
                upsert=True,
            )
            return result

        try:
            result = await self.retry.execute(
                _store_operation, f"store_video_{video_id}"
            )
            logger.debug(f"Stored video in MongoDB: {video_id}")

            # matched_count > 0 means an existing document was updated;
            # otherwise the upsert inserted a new one.
            action = "updated" if result.matched_count > 0 else "inserted"

            return OperationResult.success(
                f"Video {action}: {video_id}",
                metadata={"video_id": video_id, "action": action},
            )
        except Exception as e:
            logger.error(f"Failed to store video in MongoDB: {str(e)}")
            logger.debug(traceback.format_exc())
            return OperationResult.failure(f"Failed to store video: {str(e)}", e)

    async def health_check(self) -> HealthStatus:
        """Check MongoDB connection health.

        Issues an admin "ping" and reports round-trip time in milliseconds;
        any exception is reported as unhealthy with the error message.
        """
        start_time = time.time()
        try:
            # Simple ping to check connection
            await self.client.admin.command("ping")
            response_time = (time.time() - start_time) * 1000

            return HealthStatus(
                service_name="mongodb",
                is_healthy=True,
                response_time_ms=response_time,
                message="Connection healthy",
            )
        except Exception as e:
            response_time = (time.time() - start_time) * 1000
            return HealthStatus(
                service_name="mongodb",
                is_healthy=False,
                response_time_ms=response_time,
                message=f"Health check failed: {str(e)}",
            )

ensure_indices() async

Ensure the required database indices exist

Source code in .venv/lib/python3.12/site-packages/ytindexer/indexer/storage.py
async def ensure_indices(self) -> OperationResult:
    """Create every configured video-collection index, tolerating ones that
    already exist, and report the aggregate outcome."""
    created_indexes = []
    failed_indexes = []

    for index_name, index_config in self.config.video_indexes.items():
        try:
            await self.videos_collection.create_index(index_name, **index_config)
        except OperationFailure as e:
            if "already exists" not in str(e):
                failed_indexes.append(index_name)
                logger.error(f"Failed to create index {index_name}: {str(e)}")
                continue
            logger.debug(f"Index '{index_name}' already exists")
            created_indexes.append(index_name)
        else:
            created_indexes.append(index_name)
            logger.debug(f"Created index: {index_name}")

    if failed_indexes:
        return OperationResult.failure(
            f"Failed to create indexes: {failed_indexes}",
            metadata={"created": created_indexes, "failed": failed_indexes},
        )
    return OperationResult.success(
        f"Ensured indexes: {created_indexes}",
        metadata={"indexes": created_indexes},
    )

health_check() async

Check MongoDB connection health

Source code in .venv/lib/python3.12/site-packages/ytindexer/indexer/storage.py
async def health_check(self) -> HealthStatus:
    """Report MongoDB connectivity as a HealthStatus via an admin ping."""
    started = time.time()
    try:
        # A ping round-trip is the cheapest way to verify the connection.
        await self.client.admin.command("ping")
    except Exception as e:
        return HealthStatus(
            service_name="mongodb",
            is_healthy=False,
            response_time_ms=(time.time() - started) * 1000,
            message=f"Health check failed: {str(e)}",
        )

    return HealthStatus(
        service_name="mongodb",
        is_healthy=True,
        response_time_ms=(time.time() - started) * 1000,
        message="Connection healthy",
    )

store_video(video_data) async

Store video metadata in MongoDB with retry logic

Source code in .venv/lib/python3.12/site-packages/ytindexer/indexer/storage.py
async def store_video(self, video_data: Dict[str, Any]) -> OperationResult:
    """Upsert one video metadata document into MongoDB with retry logic.

    Keys on video_id and stamps updated_at with the current UTC time.
    Returns success metadata indicating whether the document was "updated"
    or "inserted", or a failure result once retries are exhausted.
    """
    video_id = video_data.get("video_id")
    if not video_id:
        return OperationResult.failure("Video data missing video_id")

    async def _upsert():
        return await self.videos_collection.update_one(
            {"video_id": video_id},
            {"$set": {**video_data, "updated_at": datetime.now(timezone.utc)}},
            upsert=True,
        )

    try:
        outcome = await self.retry.execute(_upsert, f"store_video_{video_id}")
    except Exception as e:
        logger.error(f"Failed to store video in MongoDB: {str(e)}")
        logger.debug(traceback.format_exc())
        return OperationResult.failure(f"Failed to store video: {str(e)}", e)

    logger.debug(f"Stored video in MongoDB: {video_id}")
    action = "inserted" if outcome.matched_count == 0 else "updated"
    return OperationResult.success(
        f"Video {action}: {video_id}",
        metadata={"video_id": video_id, "action": action},
    )

VideoTranscriptService

Downloads YouTube video transcripts with multiple language support

Source code in .venv/lib/python3.12/site-packages/ytindexer/indexer/transcript.py
class VideoTranscriptService:
    """Downloads YouTube video transcripts with multiple language support.

    Both public entry points share one three-stage lookup:
      1. each preferred language, in order;
      2. an auto-generated English transcript;
      3. any transcript the video offers at all.

    ``get_transcript`` formats the result as plain text while
    ``get_transcript_with_timestamps`` returns the raw timed segments.
    """

    def __init__(self, languages: Optional[List[str]] = None):
        """
        Initialize the transcript service.

        Args:
            languages: Preferred languages in order of preference.
                      Defaults to ['en', 'en-US'] if not provided.
        """
        self.languages = ["en", "en-US"] if languages is None else languages
        self.formatter = TextFormatter()

    def _fetch_segments(self, video_id: str, label: str) -> Optional[List[Dict]]:
        """Run the three-stage transcript lookup and return raw segments.

        Args:
            video_id: YouTube video ID (assumed non-empty; validated by callers).
            label: Interpolated into log messages ("" or "timestamped ") so
                each public entry point keeps its original log wording.

        Returns:
            Raw transcript segments, or None if nothing could be fetched.
        """
        try:
            transcript_list = YouTubeTranscriptApi.list_transcripts(video_id)

            # Stage 1: preferred languages, in caller-specified order.
            for language in self.languages:
                try:
                    transcript = transcript_list.find_transcript([language])
                    transcript_data = transcript.fetch()
                    logger.info(
                        f"Successfully retrieved {label}transcript for {video_id} in {language}"
                    )
                    return transcript_data
                except NoTranscriptFound:
                    logger.debug(
                        f"No transcript found for {video_id} in language {language}"
                    )
                except Exception as e:
                    logger.debug(
                        f"Error getting {label}transcript in {language} for {video_id}: {e}"
                    )

            # Stage 2: YouTube's auto-generated English transcript.
            try:
                transcript = transcript_list.find_generated_transcript(["en"])
                transcript_data = transcript.fetch()
                logger.info(
                    f"Successfully retrieved generated English {label}transcript for {video_id}"
                )
                return transcript_data
            except NoTranscriptFound:
                logger.debug(f"No generated English transcript found for {video_id}")
            except Exception as e:
                logger.debug(
                    f"Error getting generated {label}transcript for {video_id}: {e}"
                )

            # Stage 3: last resort — the first transcript of any language.
            try:
                available_transcripts = list(transcript_list)
                if not available_transcripts:
                    logger.warning(f"No transcripts available for video {video_id}")
                    return None
                first_transcript = available_transcripts[0]
                transcript_data = first_transcript.fetch()
                logger.info(
                    f"Retrieved fallback {label}transcript for {video_id} in {first_transcript.language}"
                )
                return transcript_data
            except Exception as e:
                logger.error(
                    f"Error fetching fallback {label}transcript for {video_id}: {e}"
                )
            # Explicit: previously an implicit fall-through to None.
            return None

        except TranscriptsDisabled:
            logger.warning(f"Transcripts are disabled for video {video_id}")
            return None
        except VideoUnavailable:
            logger.error(f"Video {video_id} is unavailable")
            return None
        except NoTranscriptFound:
            logger.warning(f"No transcripts available for video {video_id}")
            return None
        except CouldNotRetrieveTranscript as e:
            logger.error(f"Could not retrieve {label}transcript for {video_id}: {e}")
            return None
        except Exception as e:
            logger.error(
                f"Unexpected error getting {label}transcript for video {video_id}: {e}"
            )
            return None

    def get_transcript(self, video_id: str) -> Optional[str]:
        """
        Download and format transcript for a YouTube video.

        Args:
            video_id: YouTube video ID

        Returns:
            Formatted transcript text or None if unavailable
        """
        if not video_id or not video_id.strip():
            logger.error("Empty or invalid video ID provided")
            return None

        transcript_data = self._fetch_segments(video_id, "")
        if transcript_data is None:
            return None
        try:
            return self.formatter.format_transcript(transcript_data)
        except Exception as e:
            # Formatting failures were previously swallowed by the broad
            # per-stage handlers; keep the None contract rather than raising.
            logger.error(f"Error formatting transcript for {video_id}: {e}")
            return None

    def get_transcript_with_timestamps(self, video_id: str) -> Optional[List[Dict]]:
        """
        Get transcript with timing information for each segment.

        Args:
            video_id: YouTube video ID

        Returns:
            List of transcript segments with 'text', 'start', 'duration' keys,
            or None if transcript unavailable
        """
        if not video_id or not video_id.strip():
            logger.error("Empty or invalid video ID provided")
            return None
        return self._fetch_segments(video_id, "timestamped ")

__init__(languages=None)

Initialize the transcript service.

Parameters:

| Name        | Type                  | Description                                                                                | Default |
| ----------- | --------------------- | ------------------------------------------------------------------------------------------ | ------- |
| `languages` | `Optional[List[str]]` | Preferred languages in order of preference. Defaults to `['en', 'en-US']` if not provided. | `None`  |
Source code in .venv/lib/python3.12/site-packages/ytindexer/indexer/transcript.py
def __init__(self, languages: Optional[List[str]] = None):
    """Set up the transcript service.

    Args:
        languages: Languages to try, most preferred first. Falls back to
            ['en', 'en-US'] when not supplied.
    """
    self.languages = ["en", "en-US"] if languages is None else languages
    self.formatter = TextFormatter()

get_transcript(video_id)

Download and format transcript for a YouTube video.

Parameters:

| Name       | Type  | Description      | Default  |
| ---------- | ----- | ---------------- | -------- |
| `video_id` | `str` | YouTube video ID | required |

Returns:

| Type            | Description                                      |
| --------------- | ------------------------------------------------ |
| `Optional[str]` | Formatted transcript text or None if unavailable |

Source code in .venv/lib/python3.12/site-packages/ytindexer/indexer/transcript.py
def get_transcript(self, video_id: str) -> Optional[str]:
    """Download a YouTube transcript and return it as formatted text.

    Args:
        video_id: YouTube video ID

    Returns:
        Formatted transcript text, or None when no transcript can be fetched.
    """
    if not video_id or not video_id.strip():
        logger.error("Empty or invalid video ID provided")
        return None

    try:
        listing = YouTubeTranscriptApi.list_transcripts(video_id)

        # First pass: the caller's preferred languages, in order.
        for lang in self.languages:
            try:
                fetched = listing.find_transcript([lang]).fetch()
                text = self.formatter.format_transcript(fetched)
                logger.info(
                    f"Successfully retrieved transcript for {video_id} in {lang}"
                )
                return text
            except NoTranscriptFound:
                logger.debug(
                    f"No transcript found for {video_id} in language {lang}"
                )
            except Exception as e:
                logger.debug(
                    f"Error getting transcript in {lang} for {video_id}: {e}"
                )

        # Second pass: YouTube's auto-generated English transcript.
        try:
            fetched = listing.find_generated_transcript(["en"]).fetch()
            text = self.formatter.format_transcript(fetched)
            logger.info(
                f"Successfully retrieved generated English transcript for {video_id}"
            )
            return text
        except NoTranscriptFound:
            logger.debug(f"No generated English transcript found for {video_id}")
        except Exception as e:
            logger.debug(f"Error getting generated transcript for {video_id}: {e}")

        # Final pass: take whatever transcript exists, if any.
        try:
            candidates = list(listing)
            if not candidates:
                logger.warning(f"No transcripts available for video {video_id}")
                return None
            chosen = candidates[0]
            text = self.formatter.format_transcript(chosen.fetch())
            logger.info(
                f"Retrieved fallback transcript for {video_id} in {chosen.language}"
            )
            return text
        except Exception as e:
            logger.error(f"Error fetching fallback transcript for {video_id}: {e}")

    except TranscriptsDisabled:
        logger.warning(f"Transcripts are disabled for video {video_id}")
        return None
    except VideoUnavailable:
        logger.error(f"Video {video_id} is unavailable")
        return None
    except NoTranscriptFound:
        logger.warning(f"No transcripts available for video {video_id}")
        return None
    except CouldNotRetrieveTranscript as e:
        logger.error(f"Could not retrieve transcript for {video_id}: {e}")
        return None
    except Exception as e:
        logger.error(
            f"Unexpected error getting transcript for video {video_id}: {e}"
        )
        return None

get_transcript_with_timestamps(video_id)

Get transcript with timing information for each segment.

Parameters:

| Name       | Type  | Description      | Default  |
| ---------- | ----- | ---------------- | -------- |
| `video_id` | `str` | YouTube video ID | required |

Returns:

| Type                   | Description                                                                                       |
| ---------------------- | ------------------------------------------------------------------------------------------------- |
| `Optional[List[Dict]]` | List of transcript segments with 'text', 'start', 'duration' keys, or None if transcript unavailable |

Source code in .venv/lib/python3.12/site-packages/ytindexer/indexer/transcript.py
def get_transcript_with_timestamps(self, video_id: str) -> Optional[List[Dict]]:
    """Fetch a transcript as raw timed segments.

    Args:
        video_id: YouTube video ID

    Returns:
        A list of segment dicts ('text', 'start', 'duration'), or None when
        no transcript can be fetched.
    """
    if not video_id or not video_id.strip():
        logger.error("Empty or invalid video ID provided")
        return None

    try:
        listing = YouTubeTranscriptApi.list_transcripts(video_id)

        # First pass: the caller's preferred languages, in order.
        for lang in self.languages:
            try:
                segments = listing.find_transcript([lang]).fetch()
                logger.info(
                    f"Successfully retrieved timestamped transcript for {video_id} in {lang}"
                )
                return segments
            except NoTranscriptFound:
                logger.debug(
                    f"No transcript found for {video_id} in language {lang}"
                )
            except Exception as e:
                logger.debug(
                    f"Error getting timestamped transcript in {lang} for {video_id}: {e}"
                )

        # Second pass: YouTube's auto-generated English transcript.
        try:
            segments = listing.find_generated_transcript(["en"]).fetch()
            logger.info(
                f"Successfully retrieved generated English timestamped transcript for {video_id}"
            )
            return segments
        except NoTranscriptFound:
            logger.debug(f"No generated English transcript found for {video_id}")
        except Exception as e:
            logger.debug(
                f"Error getting generated timestamped transcript for {video_id}: {e}"
            )

        # Final pass: take whatever transcript exists, if any.
        try:
            candidates = list(listing)
            if not candidates:
                logger.warning(f"No transcripts available for video {video_id}")
                return None
            chosen = candidates[0]
            segments = chosen.fetch()
            logger.info(
                f"Retrieved fallback timestamped transcript for {video_id} in {chosen.language}"
            )
            return segments
        except Exception as e:
            logger.error(
                f"Error fetching fallback timestamped transcript for {video_id}: {e}"
            )

    except TranscriptsDisabled:
        logger.warning(f"Transcripts are disabled for video {video_id}")
        return None
    except VideoUnavailable:
        logger.error(f"Video {video_id} is unavailable")
        return None
    except NoTranscriptFound:
        logger.warning(f"No transcripts available for video {video_id}")
        return None
    except CouldNotRetrieveTranscript as e:
        logger.error(
            f"Could not retrieve timestamped transcript for {video_id}: {e}"
        )
        return None
    except Exception as e:
        logger.error(
            f"Unexpected error getting timestamped transcript for video {video_id}: {e}"
        )
        return None