Module pyracmon.connection

This module provides types and functions for DB connections.

Expand source code
"""
This module provides types and functions for DB connections.
"""
from collections.abc import Sequence, Callable
import secrets
import string
import threading
import types
from typing import Any, Callable, Optional, Union
from typing_extensions import Self
from . import dbapi
from .sql import Sql
from .marker import Marker
from .context import ConnectionContext, PARAMS


def connect(api: types.ModuleType, *args: Any, **kwargs: Any) -> 'Connection':
    """
    Connects to DB by passing arguments to DB-API 2.0 module.

    Every optional argument is passed to `api.connect` and returns the `Connection` object which wraps obtained DB connection.

    ```python
    import psycopg2
    from pyracmon import connect
    db = connect(psycopg2, host="localhost", port=5432, dbname="pyracmon", user="postgres", password="postgres")
    c = db.stmt().execute("SELECT 1")
    assert c.fetchone()[0] == 1
    ```

    Args:
        api: DB-API 2.0 module which exports `connect` function.
        args: Positional arguments passed to `api.connect`.
        kwargs: Keyword arguments passed to `api.connect`.
    Returns:
        Wrapper of DB-API 2.0 connection.
    """
    return Connection(api, api.connect(*args, **kwargs), None)


class Connection(dbapi.Connection):
    """
    Wrapper class of DB-API 2.0 Connection.

    Every instance works as the proxy object to original connection, therefore any attribute in it is still available.
    """
    _characters = string.ascii_letters + string.digits + ".="

    def __init__(self, api, conn: dbapi.Connection, context_factory: Optional[Callable[[], ConnectionContext]] = None):
        #: A string which identifies a connection.
        self.identifier = self._gen_identifier()
        #: DB-API 2.0 module.
        self.api = api
        #: Original connection object.
        self.conn = conn
        self._context_factory = context_factory
        self._context = None

    def __getattr__(self, name):
        return getattr(self.conn, name)

    def __enter__(self):
        if hasattr(self.conn, "__enter__"):
            self.conn.__enter__() # type: ignore
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        if hasattr(self.conn, "__exit__"):
            self.conn.__exit__(exc_type, exc_value, traceback) # type: ignore
        else:
            if exc_value is None:
                self.conn.rollback()
            else:
                self.conn.commit()
            self.conn.close()

    def _gen_identifier(self):
        return threading.current_thread().name + "-" + secrets.token_hex(4)

    @property
    def context(self) -> ConnectionContext:
        """
        Context object used for this connection.
        """
        if not self._context:
            self._context = (self._context_factory or ConnectionContext)()
            self._context.identifier = self.identifier
        return self._context

    def close(self) -> None:
        return self.conn.close()

    def commit(self) -> None:
        return self.conn.commit()

    def rollback(self) -> None:
        return self.conn.rollback()

    def cursor(self) -> dbapi.Cursor:
        return self.conn.cursor()

    def use(self, factory: Callable[[], ConnectionContext]) -> Self:
        """
        Sets factory function of `ConnectionContext` to use custom context.

        When the context is already set, it will be replaced with new one.

        Args:
            factory: Function returning custom context.
        Returns:
            This instance.
        """
        self._context_factory = factory
        self._context = None
        return self

    def stmt(self, context: Optional[ConnectionContext] = None) -> 'Statement':
        """
        Creates new `Statement` which executes queries on this connection.

        Args:
            context: Context object used in the statement. If `None`, the context of this connection is used.
        Returns:
            Created statement.
        """
        return Statement(self, context or self.context)


class Statement:
    """
    This class has methods to execute query on containing connection and context.

    Be sure to execute queries on this class to benefit from:

    - Query formatting using unified marker `$_`.
    - Query logging.
    """
    def __init__(self, conn: Connection, context: ConnectionContext):
        self.conn = conn
        self.context = context

    def prepare(self, sql: str, *args: Any, **kwargs: Any) -> tuple[str, PARAMS]:
        """
        Generates formatted query and a list of parameters.

        This method is invoked internally from `execute` to generate actual query and parameters.

        Args:
            sql: Query template which can contain unified marker.
            args: Positional parameters of query.
            kwargs: Keyword parameters of query.
        Returns:
            Formatted query and parameters.
        """
        paramstyle = self.context.config.paramstyle or self.conn.api.paramstyle

        return Sql(Marker.of(paramstyle), sql).render(*args, **kwargs)

    def execute(self, sql: str, *args: Any, **kwargs: Any) -> dbapi.Cursor:
        """
        Executes a query and returns a cursor object.

        Args:
            sql: Query template which can contain unified marker.
            args: Positional parameters of query.
            kwargs: Keyword parameters of query.
        Returns:
            Cursor object used for the query execution.
        """
        sql, params = self.prepare(sql, *args, **kwargs)

        c = self.conn.cursor()

        return self.context.execute(c, sql, params)

    def executemany(self, sql: str, seq_of_args: Sequence[PARAMS]) -> dbapi.Cursor:
        """
        Executes a query for each parameters in `seq_of_args` and returns a cursor object.

        Args:
            sql: Query template which can contain unified marker.
            seq_of_args: A sequence of parameters of the query.
        Returns:
            Cursor object used for the query execution.
        """
        def prepare(ps: Union[list[Any], dict[str, Any]]):
            args = list(ps) if isinstance(ps, (list, tuple)) else []
            kwargs = ps if isinstance(ps, dict) else {}
            return self.prepare(sql, *args, **kwargs)

        rendered, params = prepare(seq_of_args[0])
        seq_of_params: list[PARAMS] = [params]

        for i, ps in enumerate(seq_of_args[1:]):
            _, params = prepare(ps)
            seq_of_params.append(params)

        c = self.conn.cursor()

        return self.context.executemany(c, rendered, seq_of_params)

Functions

def connect(api: module, *args: Any, **kwargs: Any) ‑> Connection

Connects to DB by passing arguments to DB-API 2.0 module.

Every optional argument is passed to api.connect and returns the Connection object which wraps obtained DB connection.

import psycopg2
from pyracmon import connect
db = connect(psycopg2, host="localhost", port=5432, dbname="pyracmon", user="postgres", password="postgres")
c = db.stmt().execute("SELECT 1")
assert c.fetchone()[0] == 1

Args

api
DB-API 2.0 module which exports connect() function.
args
Positional arguments passed to api.connect.
kwargs
Keyword arguments passed to api.connect.

Returns

Wrapper of DB-API 2.0 connection.

Expand source code
def connect(api: types.ModuleType, *args: Any, **kwargs: Any) -> 'Connection':
    """
    Connects to DB by passing arguments to DB-API 2.0 module.

    Every optional argument is passed to `api.connect` and returns the `Connection` object which wraps obtained DB connection.

    ```python
    import psycopg2
    from pyracmon import connect
    db = connect(psycopg2, host="localhost", port=5432, dbname="pyracmon", user="postgres", password="postgres")
    c = db.stmt().execute("SELECT 1")
    assert c.fetchone()[0] == 1
    ```

    Args:
        api: DB-API 2.0 module which exports `connect` function.
        args: Positional arguments passed to `api.connect`.
        kwargs: Keyword arguments passed to `api.connect`.
    Returns:
        Wrapper of DB-API 2.0 connection.
    """
    return Connection(api, api.connect(*args, **kwargs), None)

Classes

class Connection (api, conn: Connection, context_factory: Optional[Callable[[], ConnectionContext]] = None)

Wrapper class of DB-API 2.0 Connection.

Every instance works as the proxy object to original connection, therefore any attribute in it is still available.

Expand source code
class Connection(dbapi.Connection):
    """
    Wrapper class of DB-API 2.0 Connection.

    Every instance works as the proxy object to original connection, therefore any attribute in it is still available.
    """
    _characters = string.ascii_letters + string.digits + ".="

    def __init__(self, api, conn: dbapi.Connection, context_factory: Optional[Callable[[], ConnectionContext]] = None):
        #: A string which identifies a connection.
        self.identifier = self._gen_identifier()
        #: DB-API 2.0 module.
        self.api = api
        #: Original connection object.
        self.conn = conn
        self._context_factory = context_factory
        self._context = None

    def __getattr__(self, name):
        return getattr(self.conn, name)

    def __enter__(self):
        if hasattr(self.conn, "__enter__"):
            self.conn.__enter__() # type: ignore
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        if hasattr(self.conn, "__exit__"):
            self.conn.__exit__(exc_type, exc_value, traceback) # type: ignore
        else:
            if exc_value is None:
                self.conn.rollback()
            else:
                self.conn.commit()
            self.conn.close()

    def _gen_identifier(self):
        return threading.current_thread().name + "-" + secrets.token_hex(4)

    @property
    def context(self) -> ConnectionContext:
        """
        Context object used for this connection.
        """
        if not self._context:
            self._context = (self._context_factory or ConnectionContext)()
            self._context.identifier = self.identifier
        return self._context

    def close(self) -> None:
        return self.conn.close()

    def commit(self) -> None:
        return self.conn.commit()

    def rollback(self) -> None:
        return self.conn.rollback()

    def cursor(self) -> dbapi.Cursor:
        return self.conn.cursor()

    def use(self, factory: Callable[[], ConnectionContext]) -> Self:
        """
        Sets factory function of `ConnectionContext` to use custom context.

        When the context is already set, it will be replaced with new one.

        Args:
            factory: Function returning custom context.
        Returns:
            This instance.
        """
        self._context_factory = factory
        self._context = None
        return self

    def stmt(self, context: Optional[ConnectionContext] = None) -> 'Statement':
        """
        Creates new `Statement` which executes queries on this connection.

        Args:
            context: Context object used in the statement. If `None`, the context of this connection is used.
        Returns:
            Created statement.
        """
        return Statement(self, context or self.context)

Ancestors

Instance variables

var api

DB-API 2.0 module.

var conn

Original connection object.

var contextConnectionContext

Context object used for this connection.

Expand source code
@property
def context(self) -> ConnectionContext:
    """
    Context object used for this connection.
    """
    if not self._context:
        self._context = (self._context_factory or ConnectionContext)()
        self._context.identifier = self.identifier
    return self._context
var identifier

A string which identifies a connection.

Methods

def close(self) ‑> None
Expand source code
def close(self) -> None:
    return self.conn.close()
def commit(self) ‑> None
Expand source code
def commit(self) -> None:
    return self.conn.commit()
def cursor(self) ‑> Cursor
Expand source code
def cursor(self) -> dbapi.Cursor:
    return self.conn.cursor()
def rollback(self) ‑> None
Expand source code
def rollback(self) -> None:
    return self.conn.rollback()
def stmt(self, context: Optional[ConnectionContext] = None) ‑> Statement

Creates new Statement which executes queries on this connection.

Args

context
Context object used in the statement. If None, the context of this connection is used.

Returns

Created statement.

Expand source code
def stmt(self, context: Optional[ConnectionContext] = None) -> 'Statement':
    """
    Creates new `Statement` which executes queries on this connection.

    Args:
        context: Context object used in the statement. If `None`, the context of this connection is used.
    Returns:
        Created statement.
    """
    return Statement(self, context or self.context)
def use(self, factory: Callable[[], ConnectionContext]) ‑> typing_extensions.Self

Sets factory function of ConnectionContext to use custom context.

When the context is already set, it will be replaced with new one.

Args

factory
Function returning custom context.

Returns

This instance.

Expand source code
def use(self, factory: Callable[[], ConnectionContext]) -> Self:
    """
    Sets factory function of `ConnectionContext` to use custom context.

    When the context is already set, it will be replaced with new one.

    Args:
        factory: Function returning custom context.
    Returns:
        This instance.
    """
    self._context_factory = factory
    self._context = None
    return self
class Statement (conn: Connection, context: ConnectionContext)

This class has methods to execute query on containing connection and context.

Be sure to execute queries on this class to benefit from:

  • Query formatting using unified marker $_.
  • Query logging.
Expand source code
class Statement:
    """
    This class has methods to execute query on containing connection and context.

    Be sure to execute queries on this class to benefit from:

    - Query formatting using unified marker `$_`.
    - Query logging.
    """
    def __init__(self, conn: Connection, context: ConnectionContext):
        self.conn = conn
        self.context = context

    def prepare(self, sql: str, *args: Any, **kwargs: Any) -> tuple[str, PARAMS]:
        """
        Generates formatted query and a list of parameters.

        This method is invoked internally from `execute` to generate actual query and parameters.

        Args:
            sql: Query template which can contain unified marker.
            args: Positional parameters of query.
            kwargs: Keyword parameters of query.
        Returns:
            Formatted query and parameters.
        """
        paramstyle = self.context.config.paramstyle or self.conn.api.paramstyle

        return Sql(Marker.of(paramstyle), sql).render(*args, **kwargs)

    def execute(self, sql: str, *args: Any, **kwargs: Any) -> dbapi.Cursor:
        """
        Executes a query and returns a cursor object.

        Args:
            sql: Query template which can contain unified marker.
            args: Positional parameters of query.
            kwargs: Keyword parameters of query.
        Returns:
            Cursor object used for the query execution.
        """
        sql, params = self.prepare(sql, *args, **kwargs)

        c = self.conn.cursor()

        return self.context.execute(c, sql, params)

    def executemany(self, sql: str, seq_of_args: Sequence[PARAMS]) -> dbapi.Cursor:
        """
        Executes a query for each parameters in `seq_of_args` and returns a cursor object.

        Args:
            sql: Query template which can contain unified marker.
            seq_of_args: A sequence of parameters of the query.
        Returns:
            Cursor object used for the query execution.
        """
        def prepare(ps: Union[list[Any], dict[str, Any]]):
            args = list(ps) if isinstance(ps, (list, tuple)) else []
            kwargs = ps if isinstance(ps, dict) else {}
            return self.prepare(sql, *args, **kwargs)

        rendered, params = prepare(seq_of_args[0])
        seq_of_params: list[PARAMS] = [params]

        for i, ps in enumerate(seq_of_args[1:]):
            _, params = prepare(ps)
            seq_of_params.append(params)

        c = self.conn.cursor()

        return self.context.executemany(c, rendered, seq_of_params)

Methods

def execute(self, sql: str, *args: Any, **kwargs: Any) ‑> Cursor

Executes a query and returns a cursor object.

Args

sql
Query template which can contain unified marker.
args
Positional parameters of query.
kwargs
Keyword parameters of query.

Returns

Cursor object used for the query execution.

Expand source code
def execute(self, sql: str, *args: Any, **kwargs: Any) -> dbapi.Cursor:
    """
    Executes a query and returns a cursor object.

    Args:
        sql: Query template which can contain unified marker.
        args: Positional parameters of query.
        kwargs: Keyword parameters of query.
    Returns:
        Cursor object used for the query execution.
    """
    sql, params = self.prepare(sql, *args, **kwargs)

    c = self.conn.cursor()

    return self.context.execute(c, sql, params)
def executemany(self, sql: str, seq_of_args: collections.abc.Sequence[typing.Union[list[typing.Any], dict[str, typing.Any]]]) ‑> Cursor

Executes a query for each parameters in seq_of_args and returns a cursor object.

Args

sql
Query template which can contain unified marker.
seq_of_args
A sequence of parameters of the query.

Returns

Cursor object used for the query execution.

Expand source code
def executemany(self, sql: str, seq_of_args: Sequence[PARAMS]) -> dbapi.Cursor:
    """
    Executes a query for each parameters in `seq_of_args` and returns a cursor object.

    Args:
        sql: Query template which can contain unified marker.
        seq_of_args: A sequence of parameters of the query.
    Returns:
        Cursor object used for the query execution.
    """
    def prepare(ps: Union[list[Any], dict[str, Any]]):
        args = list(ps) if isinstance(ps, (list, tuple)) else []
        kwargs = ps if isinstance(ps, dict) else {}
        return self.prepare(sql, *args, **kwargs)

    rendered, params = prepare(seq_of_args[0])
    seq_of_params: list[PARAMS] = [params]

    for i, ps in enumerate(seq_of_args[1:]):
        _, params = prepare(ps)
        seq_of_params.append(params)

    c = self.conn.cursor()

    return self.context.executemany(c, rendered, seq_of_params)
def prepare(self, sql: str, *args: Any, **kwargs: Any) ‑> tuple[str, typing.Union[list[typing.Any], dict[str, typing.Any]]]

Generates formatted query and a list of parameters.

This method is invoked internally from execute to generate actual query and parameters.

Args

sql
Query template which can contain unified marker.
args
Positional parameters of query.
kwargs
Keyword parameters of query.

Returns

Formatted query and parameters.

Expand source code
def prepare(self, sql: str, *args: Any, **kwargs: Any) -> tuple[str, PARAMS]:
    """
    Generates formatted query and a list of parameters.

    This method is invoked internally from `execute` to generate actual query and parameters.

    Args:
        sql: Query template which can contain unified marker.
        args: Positional parameters of query.
        kwargs: Keyword parameters of query.
    Returns:
        Formatted query and parameters.
    """
    paramstyle = self.context.config.paramstyle or self.conn.api.paramstyle

    return Sql(Marker.of(paramstyle), sql).render(*args, **kwargs)