Skip to content

Database

AsyncDatabaseConnection

Bases: Generic[T], ABC

Abstract base class for asynchronous database connections.

This class defines the interface for establishing and closing asynchronous connections to a database or similar resource. Intended to be used with dependency injection.

Parameters:

Name Type Description Default
*args

Variable length argument list for subclasses.

()
**kwargs

Arbitrary keyword arguments for subclasses.

{}
Source code in .venv/lib/python3.12/site-packages/ytindexer/database/base.py
class AsyncDatabaseConnection(Generic[T], ABC):
    """
    Interface for objects that manage an asynchronous connection.

    Concrete subclasses provide the two coroutines declared here:
    ``connect`` to obtain the underlying client of type ``T`` and
    ``close`` to release it. The class is meant to be wired in through
    dependency injection.

    Args:
        *args: Positional arguments; ignored here, available to subclasses.
        **kwargs: Keyword arguments; ignored here, available to subclasses.
    """

    def __init__(self, *args, **kwargs):
        """
        Create the connection object without doing any work.

        The base implementation ignores every argument. Subclasses may
        override it to accept settings such as database URLs or
        credentials.
        """

    @abstractmethod
    async def connect(self) -> T:
        """
        Open the connection and hand back the client object.

        Subclasses must override this coroutine with the logic that
        actually creates the connection.

        Returns:
            T: The object representing the database connection.
        """
        raise NotImplementedError()

    @abstractmethod
    async def close(self) -> None:
        """
        Release the connection.

        Subclasses must override this coroutine with their own cleanup
        and resource deallocation.

        Returns:
            None
        """
        raise NotImplementedError()

__init__(*args, **kwargs)

Initialize the connection instance.

Subclasses may override this to accept parameters such as database URLs or credentials.

Source code in .venv/lib/python3.12/site-packages/ytindexer/database/base.py
def __init__(self, *args, **kwargs):
    """
    Create the connection object without doing any work.

    Every positional and keyword argument is accepted and ignored.
    Subclasses may override this to accept settings such as database
    URLs or credentials.
    """

close() abstractmethod async

Close the database connection.

This method should be overridden by subclasses to implement proper cleanup and resource deallocation.

Returns:

Type Description
None

None

Source code in .venv/lib/python3.12/site-packages/ytindexer/database/base.py
@abstractmethod
async def close(self) -> None:
    """
    Release the connection.

    Subclasses must override this coroutine with their own cleanup
    and resource deallocation; awaiting this base version raises
    ``NotImplementedError``.

    Returns:
        None
    """
    raise NotImplementedError()

connect() abstractmethod async

Establish and return the asynchronous database connection.

This method should be overridden by subclasses to implement the logic for creating the actual connection.

Returns:

Name Type Description
T T

An instance representing the database connection.

Source code in .venv/lib/python3.12/site-packages/ytindexer/database/base.py
@abstractmethod
async def connect(self) -> T:
    """
    Open the connection and hand back the client object.

    Subclasses must override this coroutine with the logic that
    actually creates the connection; awaiting this base version raises
    ``NotImplementedError``.

    Returns:
        T: The object representing the database connection.
    """
    raise NotImplementedError()

ElasticConnection

Bases: AsyncDatabaseConnection[AsyncElasticsearch]

Concrete implementation of AsyncDatabaseConnection for Elasticsearch.

Parameters:

Name Type Description Default
dsn str

The connection string or DSN for ElasticSearch.

required
Source code in .venv/lib/python3.12/site-packages/ytindexer/database/elastic.py
class ElasticConnection(AsyncDatabaseConnection[AsyncElasticsearch]):
    """
    Concrete implementation of AsyncDatabaseConnection for Elasticsearch.

    A single client is created on the first successful ``connect()`` and
    reused until ``close()`` is called. Both operations are serialized
    with an ``asyncio.Lock``.

    Args:
        dsn (str): The connection string or DSN for Elasticsearch.
    """

    def __init__(self, dsn: str):
        super().__init__()
        self.dsn = dsn
        # Cached client; None until connect() has succeeded, and again
        # after close().
        self._client: AsyncElasticsearch | None = None
        self._lock = asyncio.Lock()

    async def connect(self) -> AsyncElasticsearch:
        """
        Establish and return an AsyncElasticsearch client instance.

        The client is cached only after ``info()`` has succeeded. If the
        attempt fails, the half-built client is closed and nothing is
        cached, so the next call starts a fresh attempt instead of
        returning a client that never connected.

        Returns:
            AsyncElasticsearch: The Elasticsearch async client.

        Raises:
            ConnectionError: If connection to Elasticsearch fails.
        """
        async with self._lock:
            if self._client is None:
                candidate: AsyncElasticsearch | None = None
                try:
                    candidate = AsyncElasticsearch(self.dsn)
                    await candidate.info()
                except BaseException as failure:
                    if isinstance(failure, ConnectionError):
                        logger.error(
                            "Couldn't connect to Elastic: {error}", error=failure
                        )
                    # Release the unverified client whatever went wrong
                    # (including cancellation) so it is not leaked.
                    if candidate is not None:
                        await candidate.close()
                    raise
                self._client = candidate
                # NOTE(review): the DSN is logged verbatim; if it can embed
                # credentials, consider redacting it here.
                logger.info(
                    "Successfully connected to Elastic at: {host}", host=self.dsn
                )
            return self._client

    async def close(self) -> None:
        """
        Close the AsyncElasticsearch client connection.

        Does nothing when no client is currently cached.
        """
        async with self._lock:
            if self._client is not None:
                await self._client.close()
                self._client = None
                # A routine shutdown is not an error condition.
                logger.info("Elastic client close")

close() async

Close the AsyncElasticsearch client connection.

Source code in .venv/lib/python3.12/site-packages/ytindexer/database/elastic.py
async def close(self) -> None:
    """
    Close the AsyncElasticsearch client connection.

    Does nothing when no client is currently cached. The cached
    reference is cleared once the client has been closed.
    """
    async with self._lock:
        if self._client is not None:
            await self._client.close()
            self._client = None
            # A routine shutdown is not an error condition.
            logger.info("Elastic client close")

connect() async

Establish and return an AsyncElasticsearch client instance.

Returns:

Name Type Description
AsyncElasticsearch AsyncElasticsearch

The ElasticSearch async client.

Raises:

Type Description
ConnectionError

If connection to ElasticSearch fails.

Source code in .venv/lib/python3.12/site-packages/ytindexer/database/elastic.py
async def connect(self) -> AsyncElasticsearch:
    """
    Establish and return an AsyncElasticsearch client instance.

    The client is cached only after ``info()`` has succeeded. If the
    attempt fails, the half-built client is closed and nothing is
    cached, so the next call starts a fresh attempt instead of
    returning a client that never connected.

    Returns:
        AsyncElasticsearch: The Elasticsearch async client.

    Raises:
        ConnectionError: If connection to Elasticsearch fails.
    """
    async with self._lock:
        if self._client is None:
            candidate: AsyncElasticsearch | None = None
            try:
                candidate = AsyncElasticsearch(self.dsn)
                await candidate.info()
            except BaseException as failure:
                if isinstance(failure, ConnectionError):
                    logger.error(
                        "Couldn't connect to Elastic: {error}", error=failure
                    )
                # Release the unverified client whatever went wrong
                # (including cancellation) so it is not leaked.
                if candidate is not None:
                    await candidate.close()
                raise
            self._client = candidate
            # NOTE(review): the DSN is logged verbatim; if it can embed
            # credentials, consider redacting it here.
            logger.info(
                "Successfully connected to Elastic at: {host}", host=self.dsn
            )
        return self._client

MongoConnection

Bases: AsyncDatabaseConnection[AsyncIOMotorClient]

Concrete implementation of AsyncDatabaseConnection for MongoDB using Motor.

Parameters:

Name Type Description Default
dsn str

MongoDB connection string.

required
Source code in .venv/lib/python3.12/site-packages/ytindexer/database/mongo.py
class MongoConnection(AsyncDatabaseConnection[AsyncIOMotorClient]):
    """
    Concrete implementation of AsyncDatabaseConnection for MongoDB using Motor.

    A single client is created on the first successful ``connect()`` and
    reused until ``close()`` is called. Both operations are serialized
    with an ``asyncio.Lock``.

    Args:
        dsn (str): MongoDB connection string.
    """

    def __init__(self, dsn: str):
        super().__init__()
        self.dsn = dsn
        # Cached client; None until connect() has succeeded, and again
        # after close().
        self._client: AsyncIOMotorClient | None = None
        self._lock = asyncio.Lock()

    async def connect(self) -> AsyncIOMotorClient:
        """
        Establish and return an AsyncIOMotorClient instance.

        The client is cached only after the ``ping`` command has
        succeeded. If the attempt fails, the half-built client is closed
        and nothing is cached, so the next call starts a fresh attempt
        instead of returning a client that never connected.

        Returns:
            AsyncIOMotorClient: The async MongoDB client.

        Raises:
            ConnectionFailure: If connection to MongoDB fails.
        """
        async with self._lock:
            if self._client is None:
                candidate: AsyncIOMotorClient | None = None
                try:
                    candidate = AsyncIOMotorClient(self.dsn)
                    # Verify the server is reachable before caching.
                    await candidate.admin.command("ping")
                except BaseException as failure:
                    if isinstance(failure, ConnectionFailure):
                        logger.error(
                            "Couldn't connect to the MongoDB database: {error}",
                            error=failure,
                        )
                    # Release the unverified client whatever went wrong
                    # (including cancellation) so it is not leaked.
                    if candidate is not None:
                        candidate.close()
                    raise
                self._client = candidate
                # NOTE(review): the DSN is logged verbatim; MongoDB URIs can
                # embed credentials, so consider redacting it here.
                logger.info(
                    "Successfully connected to MongoDB at: {host}", host=self.dsn
                )
            return self._client

    async def close(self) -> None:
        """
        Close the MongoDB client connection.

        Does nothing when no client is currently cached.
        """
        async with self._lock:
            if self._client is not None:
                self._client.close()
                self._client = None

close() async

Close the MongoDB client connection.

Source code in .venv/lib/python3.12/site-packages/ytindexer/database/mongo.py
async def close(self) -> None:
    """
    Shut down the cached MongoDB client, if there is one.

    The cached reference is cleared after the client's ``close()`` has
    returned; with no cached client the call is a no-op.
    """
    async with self._lock:
        active = self._client
        if active is None:
            return
        active.close()
        self._client = None

connect() async

Establish and return an AsyncIOMotorClient instance.

Returns:

Name Type Description
AsyncIOMotorClient AsyncIOMotorClient

The async MongoDB client.

Raises:

Type Description
ConnectionFailure

If connection to MongoDB fails.

Source code in .venv/lib/python3.12/site-packages/ytindexer/database/mongo.py
async def connect(self) -> AsyncIOMotorClient:
    """
    Establish and return an AsyncIOMotorClient instance.

    The client is cached only after the ``ping`` command has succeeded.
    If the attempt fails, the half-built client is closed and nothing
    is cached, so the next call starts a fresh attempt instead of
    returning a client that never connected.

    Returns:
        AsyncIOMotorClient: The async MongoDB client.

    Raises:
        ConnectionFailure: If connection to MongoDB fails.
    """
    async with self._lock:
        if self._client is None:
            candidate: AsyncIOMotorClient | None = None
            try:
                candidate = AsyncIOMotorClient(self.dsn)
                # Verify the server is reachable before caching.
                await candidate.admin.command("ping")
            except BaseException as failure:
                if isinstance(failure, ConnectionFailure):
                    logger.error(
                        "Couldn't connect to the MongoDB database: {error}",
                        error=failure,
                    )
                # Release the unverified client whatever went wrong
                # (including cancellation) so it is not leaked.
                if candidate is not None:
                    candidate.close()
                raise
            self._client = candidate
            # NOTE(review): the DSN is logged verbatim; MongoDB URIs can
            # embed credentials, so consider redacting it here.
            logger.info(
                "Successfully connected to MongoDB at: {host}", host=self.dsn
            )
        return self._client

ValkeyConnection

Bases: AsyncDatabaseConnection[Valkey]

Concrete implementation of AsyncDatabaseConnection for Valkey client.

Parameters:

Name Type Description Default
host str

Valkey host address.

required
port int

Valkey port.

required
password str

Password for Valkey authentication.

required
Source code in .venv/lib/python3.12/site-packages/ytindexer/database/valkey.py
class ValkeyConnection(AsyncDatabaseConnection[valkey.client.Valkey]):
    """
    Concrete implementation of AsyncDatabaseConnection for Valkey client.

    A single client is created on the first successful ``connect()`` and
    reused until ``close()`` is called. Both operations are serialized
    with an ``asyncio.Lock``.

    Args:
        host (str): Valkey host address.
        port (int): Valkey port.
        password (str): Password for Valkey authentication.
    """

    def __init__(self, host: str, port: int, password: str):
        super().__init__()
        self.host = host
        self.port = port
        self.password = password
        # Cached client; None until connect() has succeeded, and again
        # after close().
        self._client: valkey.client.Valkey | None = None
        self._lock = asyncio.Lock()

    async def connect(self) -> valkey.client.Valkey:
        """
        Establish and return a Valkey client connection.

        The client is cached only after ``ping()`` has succeeded. If the
        attempt fails, the half-built client is closed and nothing is
        cached, so the next call starts a fresh attempt instead of
        returning a client that never connected.

        Returns:
            valkey.client.Valkey: The Valkey client instance.

        Raises:
            valkey.exceptions.ConnectionError: If connection to Valkey fails.
        """
        async with self._lock:
            if self._client is None:
                candidate: valkey.client.Valkey | None = None
                try:
                    candidate = valkey.Valkey(
                        host=self.host,
                        port=self.port,
                        username=None,
                        password=self.password,
                        db=0,
                    )
                    # NOTE(review): ping() is called without await, so this
                    # looks like the synchronous client and would block the
                    # event loop while connecting - confirm whether
                    # valkey.asyncio should be used instead.
                    candidate.ping()
                except BaseException as failure:
                    if isinstance(failure, valkey.exceptions.ConnectionError):
                        logger.error(
                            "Couldn't connect to the Valkey: {error}", error=failure
                        )
                    # Release the unverified client whatever went wrong
                    # so it is not leaked.
                    if candidate is not None:
                        candidate.close()
                    raise
                self._client = candidate
                logger.info(
                    "Successfully connected to Valkey at: {host}", host=self.host
                )
            return self._client

    async def close(self) -> None:
        """
        Close the Valkey client connection.

        Does nothing when no client is currently cached.
        """
        async with self._lock:
            if self._client is not None:
                self._client.close()
                self._client = None

close() async

Close the Valkey client connection.

Source code in .venv/lib/python3.12/site-packages/ytindexer/database/valkey.py
async def close(self) -> None:
    """
    Shut down the cached Valkey client, if there is one.

    The cached reference is cleared after the client's ``close()`` has
    returned; with no cached client the call is a no-op.
    """
    async with self._lock:
        active = self._client
        if active is None:
            return
        active.close()
        self._client = None

connect() async

Establish and return a Valkey client connection.

Returns:

Type Description
Valkey

The Valkey client instance.

Raises:

Type Description
ConnectionError

If connection to Valkey fails.

Source code in .venv/lib/python3.12/site-packages/ytindexer/database/valkey.py
async def connect(self) -> valkey.client.Valkey:
    """
    Establish and return a Valkey client connection.

    The client is cached only after ``ping()`` has succeeded. If the
    attempt fails, the half-built client is closed and nothing is
    cached, so the next call starts a fresh attempt instead of
    returning a client that never connected.

    Returns:
        valkey.client.Valkey: The Valkey client instance.

    Raises:
        valkey.exceptions.ConnectionError: If connection to Valkey fails.
    """
    async with self._lock:
        if self._client is None:
            candidate: valkey.client.Valkey | None = None
            try:
                candidate = valkey.Valkey(
                    host=self.host,
                    port=self.port,
                    username=None,
                    password=self.password,
                    db=0,
                )
                # NOTE(review): ping() is called without await, so this
                # looks like the synchronous client and would block the
                # event loop while connecting - confirm whether
                # valkey.asyncio should be used instead.
                candidate.ping()
            except BaseException as failure:
                if isinstance(failure, valkey.exceptions.ConnectionError):
                    logger.error(
                        "Couldn't connect to the Valkey: {error}", error=failure
                    )
                # Release the unverified client whatever went wrong
                # so it is not leaked.
                if candidate is not None:
                    candidate.close()
                raise
            self._client = candidate
            logger.info(
                "Successfully connected to Valkey at: {host}", host=self.host
            )
        return self._client