Indexer
ChannelStatsService
Bases: HealthCheckable
Handles channel statistics updates
Source code in .venv/lib/python3.12/site-packages/ytindexer/indexer/stats.py
ensure_indices()
async
Ensure the required database indices exist
Source code in .venv/lib/python3.12/site-packages/ytindexer/indexer/stats.py
health_check()
async
Check MongoDB connection health for channels collection
Source code in .venv/lib/python3.12/site-packages/ytindexer/indexer/stats.py
update_channel_stats(video_data)
async
Update channel statistics based on video data with retry logic
Source code in .venv/lib/python3.12/site-packages/ytindexer/indexer/stats.py
ElasticsearchConfig
dataclass
Configuration for Elasticsearch indexing and search operations.
This class manages Elasticsearch-specific settings including index configuration, field mappings, and analysis settings. It provides a computed mapping property that generates the complete index configuration based on the configured parameters.
The mapping includes optimized field types for video metadata, proper analyzers for text search, and index settings for performance tuning.
Attributes:
| Name | Type | Description |
|---|---|---|
| index_name | str | Name of the Elasticsearch index for storing video documents. |
| shards | int | Number of primary shards for the index. More shards allow better distribution across nodes but increase overhead. |
| replicas | int | Number of replica shards for each primary shard. Replicas provide redundancy and can improve search throughput. |
Example

    >>> config = ElasticsearchConfig(
    ...     index_name="videos_production",
    ...     shards=3,
    ...     replicas=2
    ... )
    >>> # Create index with the computed mapping
    >>> es_client.indices.create(
    ...     index=config.index_name,
    ...     body=config.mapping
    ... )

Source code in .venv/lib/python3.12/site-packages/ytindexer/indexer/config.py
mapping
property
Generate the complete Elasticsearch index mapping and settings.
Creates a comprehensive mapping configuration that defines how video documents are indexed and stored. The mapping includes:
- Keyword fields for exact matching (video_id, channel_id, tags)
- Text fields with standard analyzer for full-text search
- Multi-field configurations for both search and aggregations
- Appropriate data types for metrics and timestamps
- Index settings based on configured shard and replica counts
Returns:
| Type | Description |
|---|---|
| Dict[str, Any] | Dictionary containing the complete Elasticsearch mapping configuration with both field mappings and index settings. |
Note
The mapping is generated dynamically based on current attribute values, so changes to shards or replicas will be reflected in subsequent calls.
Example

    >>> config = ElasticsearchConfig(shards=2, replicas=1)
    >>> mapping = config.mapping
    >>> print(mapping["settings"]["number_of_shards"])  # 2
    >>> # Text fields support both search and keyword aggregations
    >>> title_mapping = mapping["mappings"]["properties"]["title"]
    >>> print(title_mapping["type"])  # "text"
    >>> print(title_mapping["fields"]["keyword"]["type"])  # "keyword"

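As a rough illustration of how a computed mapping property of this kind could be put together, the sketch below uses a hypothetical `SketchElasticsearchConfig` dataclass. The default values and the field list (`published_at`, `view_count`, and so on) are assumptions made for the example; the real `ElasticsearchConfig.mapping` may define different fields and settings.

```python
from dataclasses import dataclass
from typing import Any, Dict


@dataclass
class SketchElasticsearchConfig:
    """Hypothetical stand-in for ElasticsearchConfig (not the library class)."""

    index_name: str = "videos"  # assumed default
    shards: int = 1             # assumed default
    replicas: int = 0           # assumed default

    @property
    def mapping(self) -> Dict[str, Any]:
        # Rebuilt on every access, so attribute changes show up immediately.
        return {
            "settings": {
                "number_of_shards": self.shards,
                "number_of_replicas": self.replicas,
            },
            "mappings": {
                "properties": {
                    # Keyword fields for exact matching.
                    "video_id": {"type": "keyword"},
                    "channel_id": {"type": "keyword"},
                    "tags": {"type": "keyword"},
                    # Text field with a keyword sub-field for aggregations.
                    "title": {
                        "type": "text",
                        "analyzer": "standard",
                        "fields": {"keyword": {"type": "keyword"}},
                    },
                    # Assumed metric and timestamp fields.
                    "published_at": {"type": "date"},
                    "view_count": {"type": "long"},
                }
            },
        }


config = SketchElasticsearchConfig(shards=2, replicas=1)
first = config.mapping
config.shards = 5
second = config.mapping  # reflects the updated shard count
```

Because the dictionary is built inside the property, there is no cached state to invalidate when `shards` or `replicas` change, which matches the behaviour described in the note above.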
MongoDBConfig
dataclass
Configuration for MongoDB collections and database indexes.
This class manages MongoDB-specific configuration including database and collection names, as well as index definitions for optimal query performance. It provides computed properties that generate index configurations for different collection types.
The index configurations are optimized for common query patterns including lookups by ID, filtering by channel, date-based queries, and subscription management operations.
Attributes:
| Name | Type | Description |
|---|---|---|
| database_name | str | Name of the MongoDB database containing the collections. |
| videos_collection | str | Name of the collection storing video metadata documents. |
| channels_collection | str | Name of the collection storing channel information. |
Example

    >>> config = MongoDBConfig(
    ...     database_name="video_platform",
    ...     videos_collection="video_metadata",
    ...     channels_collection="channel_data"
    ... )
    >>> # Create indexes for optimal query performance
    >>> db = mongo_client[config.database_name]
    >>> videos = db[config.videos_collection]
    >>> for index_config in config.video_indexes.values():
    ...     videos.create_index(index_config["key"], **index_config)

Source code in .venv/lib/python3.12/site-packages/ytindexer/indexer/config.py
channel_indexes
property
Generate index configurations for the channels collection.
Creates index definitions for channel-related queries, primarily focused on unique channel identification and lookups.
Returns:
| Type | Description |
|---|---|
| Dict[str, Dict[str, Any]] | Dictionary mapping index names to their MongoDB index configurations for the channels collection. |
Example

    >>> config = MongoDBConfig()
    >>> channel_indexes = config.channel_indexes
    >>> # Unique index for channel identification
    >>> channel_idx = channel_indexes["channel_id"]
    >>> print(channel_idx["unique"])  # True

subscription_indexes
property
Generate index configurations for the subscriptions collection.
Creates index definitions optimized for subscription management operations including unique subscription lookups, expiration queries, and active subscription filtering. Includes a compound index for efficient queries on expiring active subscriptions.
Returns:
| Type | Description |
|---|---|
| Dict[str, Dict[str, Any]] | Dictionary mapping index names to their MongoDB index configurations for subscription management. |
Note
The compound index on expires_at and is_active enables efficient queries for finding subscriptions that need renewal, which is a common operation in subscription management workflows.
Example

    >>> config = MongoDBConfig()
    >>> sub_indexes = config.subscription_indexes
    >>> # Unique constraint on channel subscriptions
    >>> channel_idx = sub_indexes["channel_id"]
    >>> print(channel_idx["unique"])  # True
    >>> # Compound index for expiration queries
    >>> compound_idx = sub_indexes["expires_at_active"]
    >>> print(compound_idx)  # [("expires_at", 1), ("is_active", 1)]

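To make the renewal use case concrete, here is a minimal sketch of a filter that a compound index on `expires_at` and `is_active` can serve. The helper name `expiring_subscriptions_filter` and the 24-hour window are assumptions for illustration and are not part of the package.

```python
from datetime import datetime, timedelta, timezone
from typing import Any, Dict


def expiring_subscriptions_filter(now: datetime, window: timedelta) -> Dict[str, Any]:
    """Build a MongoDB filter for active subscriptions expiring within `window`."""
    return {
        "is_active": True,
        "expires_at": {"$lte": now + window},
    }


# Fixed timestamp so the example is reproducible.
now = datetime(2024, 1, 1, tzinfo=timezone.utc)
query = expiring_subscriptions_filter(now, timedelta(hours=24))
# With a pymongo collection this could be passed to find(), for example:
#     subscriptions.find(query)
```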
video_indexes
property
Generate index configurations for the videos collection.
Creates index definitions optimized for common video query patterns including unique video lookups, channel-based filtering, and date-based sorting and filtering operations.
Returns:
| Type | Description |
|---|---|
| Dict[str, Dict[str, Any]] | Dictionary mapping index names to their MongoDB index configurations. Each configuration includes the key specification, uniqueness constraint, and index name. |
Note
The video_id index enforces uniqueness to prevent duplicate video documents, while other indexes are non-unique to support filtering and sorting operations.
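The shape of such a configuration might look like the sketch below. Only the `key`, uniqueness, and name entries are taken from the description above; the exact dictionary keys (`unique`, `name`), the index names, and the `published` field are assumptions for illustration.

```python
from typing import Any, Dict, List, Tuple

# Hypothetical index definitions; only video_id uniqueness comes from the docs.
video_indexes: Dict[str, Dict[str, Any]] = {
    "video_id": {"key": [("video_id", 1)], "unique": True, "name": "video_id_unique"},
    "channel_id": {"key": [("channel_id", 1)], "unique": False, "name": "channel_id_idx"},
    "published": {"key": [("published", -1)], "unique": False, "name": "published_desc"},
}


def split_index_config(config: Dict[str, Any]) -> Tuple[List[Tuple[str, int]], Dict[str, Any]]:
    """Separate the key specification from the remaining index options."""
    options = {k: v for k, v in config.items() if k != "key"}
    return config["key"], options


keys, options = split_index_config(video_indexes["video_id"])
unique_names = [name for name, cfg in video_indexes.items() if cfg["unique"]]
# With a pymongo collection this could be applied as:
#     collection.create_index(keys, **options)
```

Splitting the key specification from the options keeps the call explicit about what is the index key and what is passed through as index options.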
RetryConfig
dataclass
Configuration for retry logic with exponential backoff.
This class defines parameters for implementing robust retry mechanisms with exponential backoff to handle transient failures gracefully. The configuration controls retry attempts, timing, and backoff behavior.
The exponential backoff algorithm increases delays between retry attempts to reduce load on failing systems and improve the likelihood of eventual success. The max_delay parameter prevents delays from becoming excessive.
Attributes:
| Name | Type | Description |
|---|---|---|
| max_attempts | int | Maximum number of attempts before giving up. The count includes the initial attempt, so max_attempts=3 means 1 initial attempt plus 2 retries. |
| base_delay | float | Initial delay in seconds before the first retry attempt. Subsequent delays are calculated using exponential backoff. |
| max_delay | float | Maximum delay in seconds between retry attempts. Prevents exponential backoff from creating excessively long delays. |
| exponential_base | float | Base for exponential backoff calculation. Common values are 2.0 (doubling) or 1.5 (50% increase per attempt). |
Example

    >>> # Conservative retry configuration
    >>> config = RetryConfig(
    ...     max_attempts=3,
    ...     base_delay=1.0,
    ...     max_delay=30.0,
    ...     exponential_base=2.0
    ... )
    >>> # Aggressive retry for critical operations
    >>> critical_config = RetryConfig(
    ...     max_attempts=10,
    ...     base_delay=0.5,
    ...     max_delay=120.0,
    ...     exponential_base=1.5
    ... )
    >>> # Delay calculation example:
    >>> # Attempt 1: base_delay * (exponential_base ^ 0) = 1.0 * 1 = 1.0s
    >>> # Attempt 2: base_delay * (exponential_base ^ 1) = 1.0 * 2 = 2.0s
    >>> # Attempt 3: base_delay * (exponential_base ^ 2) = 1.0 * 4 = 4.0s

Note
Actual delays include random jitter to prevent thundering herd problems when multiple clients retry simultaneously. The jitter is typically 10-30% of the calculated delay.
Source code in .venv/lib/python3.12/site-packages/ytindexer/indexer/config.py
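The delay calculation described for `RetryConfig` can be sketched as follows. `SketchRetryConfig` and `backoff_delay` are hypothetical names, and the additive jitter scheme is an assumption; the package may compute jitter differently.

```python
import random
from dataclasses import dataclass


@dataclass
class SketchRetryConfig:
    """Hypothetical stand-in mirroring the documented attributes."""

    max_attempts: int = 3
    base_delay: float = 1.0
    max_delay: float = 30.0
    exponential_base: float = 2.0


def backoff_delay(config, retry_index, jitter_fraction=0.0, rng=random):
    """Delay before retry number `retry_index` (0-based), capped at max_delay."""
    delay = config.base_delay * (config.exponential_base ** retry_index)
    delay = min(delay, config.max_delay)
    # Assumed jitter scheme: add up to jitter_fraction of the capped delay.
    return delay + rng.uniform(0.0, jitter_fraction * delay)


config = SketchRetryConfig()
delays = [backoff_delay(config, i) for i in range(3)]  # no jitter
capped = backoff_delay(config, 10)                     # hits max_delay
jittered = backoff_delay(config, 1, jitter_fraction=0.3)
```

With jitter disabled the first three delays reproduce the 1.0 s, 2.0 s, 4.0 s sequence from the example, and a large retry index is clamped to `max_delay`.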
SearchIndexingService
Bases: HealthCheckable
Handles video search indexing in Elasticsearch
Source code in .venv/lib/python3.12/site-packages/ytindexer/indexer/indexing.py
close()
async
ensure_index()
async
Create Elasticsearch index if it doesn't exist
Source code in .venv/lib/python3.12/site-packages/ytindexer/indexer/indexing.py
health_check()
async
Check Elasticsearch cluster health
Source code in .venv/lib/python3.12/site-packages/ytindexer/indexer/indexing.py
index_video(video_data)
async
Index video metadata in Elasticsearch with retry logic
Source code in .venv/lib/python3.12/site-packages/ytindexer/indexer/indexing.py
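One way retry logic around an async operation can be structured is sketched below. The `retry_async` helper and the `flaky_index` operation are hypothetical and only illustrate the pattern; the retry behaviour inside `index_video` is not shown in this page and may differ.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def retry_async(
    operation: Callable[[], Awaitable[T]],
    max_attempts: int = 3,
    base_delay: float = 0.01,
    exponential_base: float = 2.0,
    max_delay: float = 1.0,
) -> T:
    """Call `operation` until it succeeds or max_attempts is exhausted."""
    for attempt in range(1, max_attempts + 1):
        try:
            return await operation()
        except Exception:
            # Broad catch for brevity; real code would narrow this.
            if attempt == max_attempts:
                raise  # out of attempts: surface the last error
            delay = min(base_delay * exponential_base ** (attempt - 1), max_delay)
            await asyncio.sleep(delay)
    raise RuntimeError("max_attempts must be at least 1")


calls = {"count": 0}


async def flaky_index() -> str:
    """Hypothetical operation that fails twice, then succeeds."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("transient failure")
    return "indexed"


result = asyncio.run(retry_async(flaky_index))
```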
VideoIndexingProcessor
Bases: HealthCheckable
Orchestrates the video indexing process with enhanced error handling and health checks
Source code in .venv/lib/python3.12/site-packages/ytindexer/indexer/processor.py
active_task_count
property
Get the number of currently active processing tasks
ensure_indices()
async
Ensure all required database indices and mappings exist
Source code in .venv/lib/python3.12/site-packages/ytindexer/indexer/processor.py
health_check()
async
Check health of all dependent services
Source code in .venv/lib/python3.12/site-packages/ytindexer/indexer/processor.py
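Checking several dependent services at once can be sketched with `asyncio.gather`. The `check_all` helper, the `healthy` key, and the report layout are assumptions for illustration; the structure returned by the real `health_check()` is not documented here.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict


async def check_all(
    checks: Dict[str, Callable[[], Awaitable[Dict[str, Any]]]]
) -> Dict[str, Any]:
    """Run every health check concurrently and summarise the results."""
    names = list(checks)
    results = await asyncio.gather(
        *(checks[name]() for name in names), return_exceptions=True
    )
    services: Dict[str, Any] = {}
    for name, result in zip(names, results):
        if isinstance(result, Exception):
            # A failing check is reported instead of aborting the others.
            services[name] = {"healthy": False, "error": str(result)}
        else:
            services[name] = result
    overall = all(s.get("healthy", False) for s in services.values())
    return {"healthy": overall, "services": services}


async def storage_ok() -> Dict[str, Any]:
    return {"healthy": True}


async def search_down() -> Dict[str, Any]:
    raise ConnectionError("cluster unreachable")


report = asyncio.run(check_all({"storage": storage_ok, "search": search_down}))
```

Using `return_exceptions=True` means one unreachable dependency does not hide the status of the others.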
is_running()
process_video(video_data)
async
Process a single video through all indexing services
Source code in .venv/lib/python3.12/site-packages/ytindexer/indexer/processor.py
run()
async
Main processing loop - consumes videos from queue and processes them
Source code in .venv/lib/python3.12/site-packages/ytindexer/indexer/processor.py
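A queue-consuming loop with task tracking could look roughly like the sketch below. `SketchProcessor` is a hypothetical class that borrows the member names listed on this page (`run`, `stop`, `is_running`, `active_task_count`, `process_video`); the queue type, timeout, and shutdown behaviour are assumptions and not the real `VideoIndexingProcessor`.

```python
import asyncio
from typing import Any, Dict, List, Set


class SketchProcessor:
    """Hypothetical queue consumer; not the real VideoIndexingProcessor."""

    def __init__(self, queue: asyncio.Queue) -> None:
        self._queue = queue
        self._running = False
        self._tasks: Set[asyncio.Task] = set()
        self.processed: List[str] = []

    @property
    def active_task_count(self) -> int:
        return len(self._tasks)

    def is_running(self) -> bool:
        return self._running

    async def process_video(self, video_data: Dict[str, Any]) -> None:
        await asyncio.sleep(0)  # placeholder for storage/indexing/stats calls
        self.processed.append(video_data["video_id"])

    async def run(self) -> None:
        self._running = True
        while self._running:
            try:
                video = await asyncio.wait_for(self._queue.get(), timeout=0.05)
            except asyncio.TimeoutError:
                continue  # nothing queued; re-check the running flag
            task = asyncio.create_task(self.process_video(video))
            self._tasks.add(task)
            task.add_done_callback(self._tasks.discard)

    async def stop(self) -> None:
        self._running = False
        if self._tasks:
            # Let in-flight work finish before returning.
            await asyncio.gather(*self._tasks, return_exceptions=True)


async def demo() -> SketchProcessor:
    queue: asyncio.Queue = asyncio.Queue()
    for vid in ("a1", "b2"):
        queue.put_nowait({"video_id": vid})
    processor = SketchProcessor(queue)
    runner = asyncio.create_task(processor.run())
    await asyncio.sleep(0.1)  # give the loop time to drain the queue
    await processor.stop()
    await runner
    return processor


processor = asyncio.run(demo())
```

Polling the queue with a short timeout lets the loop notice `stop()` promptly instead of blocking indefinitely on an empty queue.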
stop()
async
VideoStorageService
Bases: HealthCheckable
Handles video metadata storage in MongoDB
Source code in .venv/lib/python3.12/site-packages/ytindexer/indexer/storage.py
ensure_indices()
async
Ensure the required database indices exist
Source code in .venv/lib/python3.12/site-packages/ytindexer/indexer/storage.py
health_check()
async
Check MongoDB connection health
Source code in .venv/lib/python3.12/site-packages/ytindexer/indexer/storage.py
store_video(video_data)
async
Store video metadata in MongoDB with retry logic
Source code in .venv/lib/python3.12/site-packages/ytindexer/indexer/storage.py
VideoTranscriptService
Downloads YouTube video transcripts, with support for multiple preferred languages
Source code in .venv/lib/python3.12/site-packages/ytindexer/indexer/transcript.py
__init__(languages=None)
Initialize the transcript service.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| languages | Optional[List[str]] | Preferred languages in order of preference. Defaults to ['en', 'en-US'] if not provided. | None |
Source code in .venv/lib/python3.12/site-packages/ytindexer/indexer/transcript.py
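The "in order of preference" behaviour can be illustrated with a small selection helper. `pick_language` and `DEFAULT_LANGUAGES` are hypothetical names for this sketch; how the service actually matches languages against the available transcripts is not shown on this page.

```python
from typing import List, Optional, Sequence

DEFAULT_LANGUAGES = ["en", "en-US"]  # default stated in the parameter table


def pick_language(
    available: Sequence[str], preferred: Optional[List[str]] = None
) -> Optional[str]:
    """Return the first preferred language that is available, else None."""
    # Note: an empty list also falls back to the defaults in this sketch.
    for code in preferred or DEFAULT_LANGUAGES:
        if code in available:
            return code
    return None


default_choice = pick_language(["de", "en-US"])
custom_choice = pick_language(["de", "en"], preferred=["de", "en"])
no_match = pick_language(["fr"])
```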
get_transcript(video_id)
Download and format transcript for a YouTube video.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| video_id | str | YouTube video ID | required |
Returns:
| Type | Description |
|---|---|
| Optional[str] | Formatted transcript text or None if unavailable |
Source code in .venv/lib/python3.12/site-packages/ytindexer/indexer/transcript.py
get_transcript_with_timestamps(video_id)
Get transcript with timing information for each segment.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| video_id | str | YouTube video ID | required |
Returns:
| Type | Description |
|---|---|
| Optional[List[Dict]] | List of transcript segments with 'text', 'start', 'duration' keys, or None if transcript unavailable |
Source code in .venv/lib/python3.12/site-packages/ytindexer/indexer/transcript.py
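Given segments with the 'text', 'start', and 'duration' keys described above, a caller might render them as timestamped lines. The helpers `format_timestamp` and `render_segments`, the sample segments, and the `[HH:MM:SS]` layout are all assumptions for this sketch.

```python
from typing import Dict, List


def format_timestamp(seconds: float) -> str:
    """Convert a start offset in seconds to HH:MM:SS (fractions truncated)."""
    total = int(seconds)
    hours, remainder = divmod(total, 3600)
    minutes, secs = divmod(remainder, 60)
    return f"{hours:02d}:{minutes:02d}:{secs:02d}"


def render_segments(segments: List[Dict]) -> str:
    """Join segments into one line per segment, prefixed by its start time."""
    return "\n".join(
        f"[{format_timestamp(seg['start'])}] {seg['text']}" for seg in segments
    )


# Made-up segments in the documented shape.
segments = [
    {"text": "Hello and welcome.", "start": 0.0, "duration": 2.5},
    {"text": "Today we cover indexing.", "start": 62.4, "duration": 3.1},
]
rendered = render_segments(segments)
```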