Module pyracmon.mixin

This module provides mixin type which supplies each model type various DB operations as class methods.

Expand source code
"""
This module provides mixin type which supplies each model type various DB operations as class methods.
"""
from collections.abc import Mapping, Sequence, Callable
from functools import reduce
from typing import Any, Optional, Union, Literal, cast, overload, Protocol, TYPE_CHECKING
from typing_extensions import Self
from .connection import Connection
from .dbapi import Cursor
from .model import Meta, Column, Record, parse_pks, check_columns, model_values, extract_pks
from .select import SelectMixin, AliasedColumn, read_row
from .query import Q, Expression, Conditional, where
from .clause import ORDER, ranged_by, order_by, values
from .util import key_to_index, Qualifier, PKS


if TYPE_CHECKING:
    class CRUDInternalMeta(Meta):
        @classmethod
        def last_sequences(cls, db: Connection, num: int) -> list[tuple[Column, int]]: ...

        @classmethod
        def support_returning(cls, db: Connection) -> bool: ...
else:
    class CRUDInternalMeta:
        pass


class CRUDMixin(SelectMixin, CRUDInternalMeta):
    """
    Default mixin providing class methods available on all model types.

    Every method takes the DB connection object as its first argument.
    Following arguments are defined in several methods commonly.

    - `pks`
        - Names and values of all primary key columns in form of `dict` .
        - A primary key value. This form is allowed when the table has just one primary key column.
        - e.g. If the table has a single primary key `id` of int, `1` is available to spcecify the row of `id = 1` .
        - e.g. If the table has multiple primary keys `intid` and `strid`, `dict(intid=1, strid="abc")` is a valid argument.
    - `record`
        - A model object or a mapping from column name to its value, which corresponds to a table row.
        - Only columns contained in the record is affected by the operation.
        - e.g. When `dict(c1=1, c2="abc")` is passed for insertion, only `c1` and `c2` are set in INSERT query.
        - e.g. For update, only the columns will be updated. Other columns are not affected.
    - `condition`
        - Query condition which will compose WHERE clause.
        - `pyracmon.query.Q` is a factory class to create condition object.
        - When `None` is passed, all rows are subject to the operation.
    - `qualifier`
        - A mapping from column name to a function which qualifies a placeholder passed by an argument.
        - Detail of qualifier function is described below.
    - `lock`
        - This is reserved argument for locking statement but works just as the postfix of the query currently.
        - The usage will be changed in future version.

    Qualifier function is used typically to convert or replace placeholder marker in insert/update query.
    By default, those queries contain markers like `insert into t (c1, c2) values (?, ?)` (`Q` parameter style).
    We need sometimes qualify markers to apply DB function, calculation, type cast and so on. This feature enables them like below.

    ```python
    t.insert(db, dict(c1=1, c2=None), dict(c1=lambda x: f"{x}+1", c2=lambda x: "now()"))
    # SQL: INSERT INTO t (c1, c2) VALUES (?+1, now())
    ```

    Be aware that when model object is passed, its column values may differ from actual values in DB after query.
    """
    @classmethod
    def count(cls, db: Connection, condition: Conditional = Q.of()) -> int:
        """
        Count rows which satisfies the condition.

        ```python
        t.count(db, Q.eq(c1=1))
        # SQL: SELECT COUNT(*) FROM t WHERE c1 = 1
        ```

        Args:
            db: DB connection.
            condition: Query condition.
        Returns:
            The number of rows.
        """
        wc, wp = where(condition)
        c = db.stmt().execute(f"SELECT COUNT(*) FROM {cls.name}{_spacer(wc)}", *wp)
        return c.fetchone()[0] # type: ignore

    @classmethod
    def fetch(cls, db: Connection, pks: PKS, lock: Optional[Any] = None) -> Optional[Self]:
        """
        Fetch a record by primary key(s).

        ```python
        t.fetch(db, 1)
        # SQL: SELECT * FROM t WHERE id = 1
        ```

        Args:
            db: DB connection.
            pks: Primary key value(s).
            lock: Locking statement.
        Returns:
            A model object if exists, otherwise `None`.
        """
        cols, vals = parse_pks(cls, pks)
        cond = Conditional.all([Q.eq(**{c: v}) for c, v in zip(cols, vals)])
        wc, wp = where(cond)
        s = cls.select()
        c = db.stmt().execute(f"SELECT {s} FROM {cls.name}{_spacer(wc)}{_spacer(lock)}", *wp)
        row = c.fetchone()
        return read_row(row, *s)[0] if row else None

    @classmethod
    def fetch_many(cls, db: Connection, seq_pks: Sequence[PKS], lock: Optional[Any] = None, /, per_page: int = 1000) -> list[Self]:
        """
        Fetch a record by sequence of primary key(s).

        This method simply concatenates equality conditions on primary key by OR operator.

        ```python
        t.fetch_many(db, [1, 2, 3])
        # SQL: SELECT * FROM t WHERE id = 1 OR id = 2 OR id = 3
        ```

        Args:
            db: DB connection.
            seq_pks: Sequence of primary key value.
            lock: Locking statement.
            per_page: Maximum number of keys for an execution of query.
        Returns:
            Model objects in the same order as passed sequence.
        """
        res = []
        index = 0
        while index < len(seq_pks):
            ordered_pks = []
            cond = Q.of()
            for pks in seq_pks[index:index+per_page]:
                cols, vals = parse_pks(cls, pks)
                ordered_pks.append(tuple(v for v in vals))
                cond |= Conditional.all([Q.eq(**{c: v}) for c, v in zip(cols, vals)])
            wc, wp = where(cond)
            s = cls.select()
            c = db.stmt().execute(f"SELECT {s} FROM {cls.name}{_spacer(wc)}{_spacer(lock)}", *wp)

            record_map = {}
            for r in [read_row(row, *s)[0] for row in c.fetchall()]:
                pk_values = {c.name:v for c, v in r if c.pk}
                record_map[tuple([v for _, v in check_columns(cls, pk_values, lambda c: c.pk, True)])] = r

            res.extend([record_map[k] for k in ordered_pks if k in record_map])
            index += per_page

        return res

    @classmethod
    def fetch_where(
        cls,
        db: Connection,
        condition: Conditional = Q.of(),
        orders: Mapping[Union[str, AliasedColumn], ORDER] = {},
        limit: Optional[int] = None,
        offset: Optional[int] = None,
        lock: Optional[Any] = None,
    ) -> list[Self]:
        """
        Fetch records which satisfy the condition.

        ```python
        t.fetch_where(db, Q.eq(c1=1), dict(c2=True), 10, 5)
        # SQL: SELECT * FROM t WHERE c1 = 1 ORDER BY c2 ASC LIMIT 10 OFFSET 5
        ```

        Args:
            db: DB connection.
            condition: Query condition.
            orders: Ordering specification where key is column name and value denotes whether the order is ascending or not.
            limit: Maximum nuber of rows to fetch. If `None`, all rows are returned.
            offset: The number of rows to skip.
            lock: Locking statement.
        Returns:
            Model objects.
        """
        wc, wp = where(condition)
        rc, rp = ranged_by(limit, offset)
        s = cls.select()
        c = db.stmt().execute(f"SELECT {s} FROM {cls.name}{_spacer(wc)}{_spacer(order_by(orders))}{_spacer(rc)}{_spacer(lock)}", *(wp + rp))
        return [read_row(row, *s)[0] for row in c.fetchall()]

    @classmethod
    def fetch_one(
        cls,
        db: Connection,
        condition: Conditional = Q.of(),
        lock: Optional[Any] = None,
    ) -> Optional[Self]:
        """
        Fetch a record which satisfies the condition.

        `ValueError` raises When multiple records are found.
        Use this method for queries which certainly returns a single row, such as search by unique key.

        ```python
        t.fetch_one(db, Q.eq(c1=1))
        # SQL: SELECT * FROM t WHERE c1 = 1
        ```

        Args:
            db: DB connection.
            condition: Query condition.
            lock: Locking statement.
        Returns:
            Model objects If exists, otherwise `None`.
        """
        rs = cls.fetch_where(db, condition, lock=lock)

        if not rs:
            return None
        elif len(rs) == 1:
            return rs[0]
        else:
            raise ValueError(f"{len(rs)} records are found on the invocation of fetch_one().")

    @classmethod
    def _insert_sql(cls, record: Union[Self, dict[str, Any]], qualifier: Mapping[str, Qualifier] = {}) -> tuple[str, list[str], list[Any]]:
        model: Self = cast(Self, record) if isinstance(record, cls) else cls(**cast(dict, record))
        value_dict = model_values(cls, model)
        check_columns(cls, value_dict)
        cols, vals = list(value_dict.keys()), list(value_dict.values())
        ordered_qs = key_to_index(qualifier, cols)

        def exp(v):
            return lambda i: v

        if any(isinstance(v, Expression) for v in vals):
            key_gen = []
            org_vals = vals
            vals = []
            for v in org_vals:
                if isinstance(v, Expression):
                    key_gen.append(exp(v))
                    vals.extend(v.params)
                else:
                    key_gen.append(lambda i: None)
                    vals.append(v)
            values_clause = values(key_gen, 1, ordered_qs)
        else:
            values_clause = values(len(cols), 1, ordered_qs)

        return f"INSERT INTO {cls.name} ({', '.join(cols)}) VALUES {values_clause}", cols, vals

    @classmethod
    def insert(
        cls,
        db: Connection,
        record: Union[Self, dict[str, Any]],
        qualifier: Mapping[str, Qualifier] = {},
        /,
        returning: bool = False,
    ) -> Self:
        """
        Insert a record.

        If `returning` is `True` and the DBMS supports **RETURNING** clause,
        returned model object contains comple and correct column values.
        Otherwise, auto incremental value is set to the returned model object
        but other column values generated inside DBMS such as default value are not set.

        ```python
        t.insert(db, dict(c1=1, c2=2))
        # SQL: INSERT INTO t (c1, c2) VALUES (1, 2)
        ```

        Args:
            db: DB connection.
            record: Object contains column values.
            qualifier: Functions qualifying placeholder markers.
            returning: Flag to return inserted record with complete and correct column values.
        Returns:
            Model of inserted record.
        """
        model: Self = cast(Self, record) if isinstance(record, cls) else cls(**cast(dict, record))
        sql, _, vals = cls._insert_sql(record, qualifier)

        if returning:
            if cls.support_returning(db):
                c = db.stmt().execute(f"{sql} RETURNING *", *vals)
                s = cls.select()
                return read_row(c.fetchone(), *s)[0]
            else:
                # REVIEW
                # Inserted row can't be specified from the table where no primary keys are defined .
                pass

        db.stmt().execute(sql, *vals)
        for c, v in cls.last_sequences(db, 1):
            setattr(model, c.name, v)
        return model

    @classmethod
    @overload
    def insert_many(cls, db: Connection, records: list[Union[Self, dict[str, Any]]], qualifier: Mapping[str, Qualifier] = {},
                    /, returning: Literal[False] = False) -> list[Self]: ...
    @classmethod
    @overload
    def insert_many(cls, db: Connection, records: list[Union[Self, dict[str, Any]]], qualifier: Mapping[str, Qualifier] = {},
                    /, returning: Literal[True] = True) -> list[Self]: ...
    @classmethod
    def insert_many(
        cls,
        db: Connection,
        records: list[Union[Self, dict[str, Any]]],
        qualifier: Mapping[str, Qualifier] = {},
        /,
        returning: bool = False,
    ):
        """
        Insert records.

        If `returning` is `True` and the DBMS supports **RETURNING** clause,
        returned model object contains comple and correct column values.
        Otherwise, auto incremental value is set to the returned model object
        but other column values generated inside DBMS such as default value are not set.

        Args:
            db: DB connection.
            record: Object contains column values.
            qualifier: Functions qualifying placeholder markers.
            returning: Flag to return inserted records with complete and correct column values.
        Returns:
            Models of inserted records or cursor.
        """
        if len(records) == 0:
            return []

        models: list[Self] = [cast(Self, r) if isinstance(r, cls) else cls(**cast(dict, r)) for r in records]

        seq_of_params = []

        sql, cols, params = cls._insert_sql(models[0], qualifier)

        cols = set(cols)
        seq_of_params.append(params)

        for m in models[1:]:
            value_dict = model_values(cls, m)
            check_columns(cls, value_dict, lambda c: c.name in cols, requires_all=True)
            # REVIEW:
            # The consistency among columns where expression is set is not checked.
            _, _, params = cls._insert_sql(m, qualifier)
            seq_of_params.append(params)

        db.stmt().executemany(sql, seq_of_params)
        num = len(records)
        for c, v in cls.last_sequences(db, num):
            for i, m in enumerate(models):
                setattr(m, c.name, v - (num - i - 1))

        if returning:
            seq_pks = [extract_pks(cls, m) for m in models]
            return cls.fetch_many(db, seq_pks)
        else:
            return models

    @classmethod
    def _update_sql(cls, record: Record, condition: Conditional, qualifier: Mapping[str, Qualifier] = {}, allow_all: bool = True) -> tuple[str, list[str], list[Any]]:
        value_dict = model_values(cls, record, excludes_pk=True)
        check_columns(cls, value_dict)
        cols, vals = list(value_dict.keys()), list(value_dict.values())
        ordered_qs = key_to_index(qualifier, cols)

        def set_col(acc, icv):
            i, (c, v) = icv
            if isinstance(v, Expression):
                clause = f"{c} = {ordered_qs.get(i, lambda x:x)(v.expression)}"
                params = v.params
            else:
                clause = f"{c} = {ordered_qs.get(i, lambda x:x)('$_')}"
                params = [v]
            acc[0].append(clause)
            acc[1].extend(params)
            return acc

        setters, params = reduce(set_col, enumerate(zip(cols, vals)), ([], []))

        wc, wp = where(condition)
        if wc == "" and not allow_all:
            raise ValueError("Update query to update all records is not allowed.")

        return f"UPDATE {cls.name} SET {', '.join(setters)}{_spacer(wc)}", cols, params + wp

    @classmethod
    @overload
    def update(cls, db: Connection, pks: PKS, record: Record, qualifier: Mapping[str, Qualifier] = {},
               /, returning: Literal[False] = False) -> bool: ...
    @classmethod
    @overload
    def update(cls, db: Connection, pks: PKS, record: Record, qualifier: Mapping[str, Qualifier] = {},
               /, returning: Literal[True] = True) -> Optional[Self]: ...
    @classmethod
    def update(
        cls,
        db: Connection,
        pks: PKS,
        record: Record,
        qualifier: Mapping[str, Qualifier] = {},
        /,
        returning: bool = False,
    ):
        """
        Update a record by primary key(s).

        This method only updates columns which are found in `record` except for primary key(s).

        ```python
        t.update(db, 1, dict(c1=1, c2=2))
        # SQL: UPDATE t SET c1 = 1, c2 = 2 WHERE id = 1
        ```

        Args:
            db: DB connection.
            pks: Primary key value(s).
            record: Object contains column values.
            qualifier: Functions qualifying placeholder markers.
            returning: Flag to return updated records with complete and correct column values.
        Returns:
            Whether the record exists and updated or updated record model.
        """
        cols, vals = parse_pks(cls, pks)
        condition = Conditional.all([Q.eq(**{c: v}) for c, v in zip(cols, vals)])
        if returning:
            if cls.support_returning(db):
                models = cls.update_where(db, record, condition, qualifier, returning=True)
                return models[0] if models else None
            else:
                models = cls.update_where(db, record, condition, qualifier, returning=False)
                return cls.fetch(db, pks)
        else:
            return cls.update_where(db, record, condition, qualifier, returning=False) == 1

    @classmethod
    @overload
    def update_many(cls, db: Connection, records: Sequence[Record], qualifier: Mapping[str, Qualifier] = {},
               /, returning: Literal[False] = False) -> int: ...
    @classmethod
    @overload
    def update_many(cls, db: Connection, records: Sequence[Record], qualifier: Mapping[str, Qualifier] = {},
               /, returning: Literal[True] = True) -> list[Self]: ...
    @classmethod
    def update_many(
        cls,
        db: Connection,
        records: Sequence[Record],
        qualifier: Mapping[str, Qualifier] = {},
        /,
        returning: bool = False,
    ):
        """
        Update records by set of primary key(s).

        This method invokes on `executemany` defined in DB-API 2.0.
        Whether it is optimized compared to `execute` depends on DB driver.

        Args:
            db: DB connection.
            record: Sequence of objects contains column values.
            qualifier: Functions qualifying placeholder markers.
            returning: Flag to return updated records with complete and correct column values.
        Returns:
            The number of affected rows or updated records.
        """
        if len(records) == 0:
            return [] if returning else 0

        keys = {c.name for c in cls.columns if c.pk}
        if len(keys) == 0:
            raise ValueError(f"update_many is not available because {cls} does not have primary key columns.")

        def classify(acc: tuple[dict[str, Any], dict[str, Any]], cv: tuple[str, Any]):
            if cv[0] in keys:
                acc[0][cv[0]] = cv[1]
            else:
                acc[1][cv[0]] = cv[1]
            return acc

        seq_of_values: list[tuple[dict[str, Any], dict[str, Any]]] = []
        target_columns: Optional[set[str]] = None

        for vs in [model_values(cls, r, excludes_pk=False) for r in records]:
            if not keys < vs.keys():
                raise ValueError(f"Every row must contain values of all primary keys and at least one update column value.")
            pks, rec = reduce(classify, vs.items(), ({}, {}))
            if target_columns is None:
                check_columns(cls, rec)
                target_columns = set(rec.keys())
            else:
                check_columns(cls, rec, lambda c: c.name in target_columns, True) # type: ignore
            seq_of_values.append((pks, rec))

        sql_first = ""
        seq_of_params: list[list[Any]] = []

        for pks, rec in seq_of_values:
            cols, vals = parse_pks(cls, pks)
            condition = Conditional.all([Q.eq(**{c: v}) for c, v in zip(cols, vals)])

            sql, _, params = cls._update_sql(rec, condition, qualifier)
            if not sql_first:
                sql_first = sql
            seq_of_params.append(params)

        if returning:
            db.stmt().executemany(f"{sql_first}", seq_of_params)
            return cls.fetch_many(db, [pks for pks, _ in seq_of_values])
        else:
            return db.stmt().executemany(sql_first, seq_of_params).rowcount

    @classmethod
    @overload
    def update_where(cls, db: Connection, record: Record, condition: Conditional, qualifier: Mapping[str, Qualifier] = {},
                     /, returning: Literal[False] = False, allow_all: bool = False) -> int: ...
    @classmethod
    @overload
    def update_where(cls, db: Connection, record: Record, condition: Conditional, qualifier: Mapping[str, Qualifier] = {},
                     /, returning: Literal[True] = True, allow_all: bool = False) -> list[Self]: ...
    @classmethod
    def update_where(
        cls,
        db: Connection,
        record: Record,
        condition: Conditional,
        qualifier: Mapping[str, Qualifier] = {},
        /,
        returning: bool = False,
        allow_all: bool = True,
    ):
        """
        Update records which satisfy the condition.

        ```python
        t.update(db, dict(c2=2), Q.eq(c1=1))
        # SQL: UPDATE t SET c2 = 2 WHERE c1 = 1
        ```

        Args:
            db: DB connection.
            record: Object contains column values.
            condition: Query condition.
            qualifier: Functions qualifying placeholder markers.
            returning: Flag to return updated records with complete and correct column values.
            allow_all: If `False`, empty condition raises `ValueError`.
        Returns:
            The number of affected rows or updated records.
        """
        sql, _, params = cls._update_sql(record, condition, qualifier, allow_all)

        if returning:
            if cls.support_returning(db):
                c = db.stmt().execute(f"{sql} RETURNING *", *params)
                s = cls.select()
                return [read_row(row, *s)[0] for row in c.fetchall()]
            else:
                raise NotImplementedError(f"RETURNING is not supported and there is no way to fetch updated rows exactly.")
        else:
            c = db.stmt().execute(sql, *params)

            return c.rowcount

    @classmethod
    @overload
    def delete(cls, db: Connection, pks: PKS, /, returning: Literal[False] = False) -> bool: ...
    @classmethod
    @overload
    def delete(cls, db: Connection, pks: PKS, /, returning: Literal[True] = True) -> Optional[Self]: ...
    @classmethod
    def delete(cls, db: Connection, pks: PKS, /, returning: bool = False):
        """
        Delete a record by primary key(s).

        ```python
        t.delete(db, 1)
        # SQL: DELETE FROM t WHERE id = 1
        ```

        Args:
            db: DB connection.
            pks: Primary key value(s).
            returning: Flag to return deleted record if any.
        Returns:
            Whether the record exists and deleted or delete record if any.
        """
        cols, vals = parse_pks(cls, pks)

        if returning:
            models = cls.delete_where(db, Conditional.all([Q.eq(**{c: v}) for c, v in zip(cols, vals)]), returning=True)
            return models[0] if models else None
        else:
            return cls.delete_where(db, Conditional.all([Q.eq(**{c: v}) for c, v in zip(cols, vals)])) == 1

    @classmethod
    @overload
    def delete_many(cls, db: Connection, pks: Union[Sequence[PKS], Sequence[Record]], /, returning: Literal[False] = False) -> int: ...
    @overload
    @classmethod
    def delete_many(cls, db: Connection, pks: Union[Sequence[PKS], Sequence[Record]], /, returning: Literal[True] = True) -> list[Self]: ...
    @classmethod
    def delete_many(cls, db: Connection, pks: Union[Sequence[PKS], Sequence[Record]], /, returning: bool = False):
        """
        Delete a records by set of primary key(s).

        This method invokes on `executemany` defined in DB-API 2.0.
        Whether it is optimized compared to `execute` depends on DB driver.

        Args:
            db: DB connection.
            pks: Primary keys or objects each of which contains values of all primary keys.
            returning: Flag to return deleted records.
        Returns:
            The number of affected rows or deleted records.
        """
        if len(pks) == 0:
            return None

        seq_of_pks: list[dict[str, Any]] = []
        for rec in pks:
            if isinstance(rec, (dict, cls)):
                seq_of_pks.append(extract_pks(cls, rec))
            else:
                cols, vals = parse_pks(cls, rec)
                seq_of_pks.append(dict(zip(cols, vals)))

        condition = Conditional.all([Q.eq(**{c: v}) for c, v in seq_of_pks[0].items()])
        wc, wp = where(condition)

        sql = f"DELETE FROM {cls.name}{_spacer(wc)}"
        seq_of_params: list[list[Any]] = [wp]

        for v in seq_of_pks[1:]:
            condition = Conditional.all([Q.eq(**{c: v}) for c, v in v.items()])
            _, wp = where(condition)
            seq_of_params.append(wp)

        if returning:
            models = cls.fetch_many(db, seq_of_pks)
            db.stmt().executemany(sql, seq_of_params)
            return models
        else:
            return db.stmt().executemany(sql, seq_of_params).rowcount

    @classmethod
    @overload
    def delete_where(cls, db: Connection, condition: Conditional, /, returning: Literal[False] = False, allow_all: bool = True) -> int: ...
    @classmethod
    @overload
    def delete_where(cls, db: Connection, condition: Conditional, /, returning: Literal[True] = True, allow_all: bool = True) -> list[Self]: ...
    @classmethod
    def delete_where(cls, db: Connection, condition: Conditional, /, returning: bool = False, allow_all: bool = True):
        """
        Delete records which satisfy the condition.

        ```python
        t.delete(db, Q.eq(c1=1))
        # SQL: DELETE FROM t WHERE c1 = 1
        ```

        Args:
            db: DB connection.
            condition: Query condition.
            returning: Flag to return deleted records.
            allow_all: If `False`, empty condition raises `ValueError`.
        Returns:
            The number of affected rows or deleted records.
        """
        wc, wp = where(condition)
        if wc == "" and not allow_all:
            raise ValueError("Delete query to delete all records is not allowed.")

        sql = f"DELETE FROM {cls.name}{_spacer(wc)}"

        if returning:
            if cls.support_returning(db):
                c = db.stmt().execute(f"{sql} RETURNING *", *wp)
                return [read_row(row, *cls.select())[0] for row in c.fetchall()]
            else:
                current = cls.fetch_where(db, condition)
                c = db.stmt().execute(sql, *wp)
                return current
        else:
            return db.stmt().execute(sql, *wp).rowcount

    @classmethod
    def last_sequences(cls, db: Connection, num: int) -> list[tuple[Column, int]]:
        """
        Returns the sequential (auto incremental) values of a table generated by the latest insertion.

        Result contains every sequential columns and their values.
        When the latest query inserts multiple rows, only the last (= biggest) value is returned.

        This method should be overridden by another mixin class defined in dialect module.

        Args:
            db: DB connection.
            num: The number of records inserted by the latest query.
        Returns:
            List of pairs of column and its values.
        """
        return []

    @classmethod
    def support_returning(cls, db: Connection) -> bool:
        """
        Checks whehter this DBMS support **RETURNING** clause or not.

        Args:
            db: DB connection.
        Returns:
            Whehter this DBMS support **RETURNING** clause or not.
        """
        return False


def _spacer(s):
    return (" " + str(s)) if s else ""

Classes

class CRUDInternalMeta
Expand source code
class CRUDInternalMeta(Meta):
    @classmethod
    def last_sequences(cls, db: Connection, num: int) -> list[tuple[Column, int]]: ...

    @classmethod
    def support_returning(cls, db: Connection) -> bool: ...

Subclasses

class CRUDMixin

Default mixin providing class methods available on all model types.

Every method takes the DB connection object as its first argument. Following arguments are defined in several methods commonly.

  • pks
    • Names and values of all primary key columns in form of dict .
    • A primary key value. This form is allowed when the table has just one primary key column.
    • e.g. If the table has a single primary key id of int, 1 is available to spcecify the row of id = 1 .
    • e.g. If the table has multiple primary keys intid and strid, dict(intid=1, strid="abc") is a valid argument.
  • record
    • A model object or a mapping from column name to its value, which corresponds to a table row.
    • Only columns contained in the record is affected by the operation.
    • e.g. When dict(c1=1, c2="abc") is passed for insertion, only c1 and c2 are set in INSERT query.
    • e.g. For update, only the columns will be updated. Other columns are not affected.
  • condition
    • Query condition which will compose WHERE clause.
    • Q is a factory class to create condition object.
    • When None is passed, all rows are subject to the operation.
  • qualifier
    • A mapping from column name to a function which qualifies a placeholder passed by an argument.
    • Detail of qualifier function is described below.
  • lock
    • This is reserved argument for locking statement but works just as the postfix of the query currently.
    • The usage will be changed in future version.

Qualifier function is used typically to convert or replace placeholder marker in insert/update query. By default, those queries contain markers like insert into t (c1, c2) values (?, ?) (Q parameter style). We need sometimes qualify markers to apply DB function, calculation, type cast and so on. This feature enables them like below.

t.insert(db, dict(c1=1, c2=None), dict(c1=lambda x: f"{x}+1", c2=lambda x: "now()"))
# SQL: INSERT INTO t (c1, c2) VALUES (?+1, now())

Be aware that when model object is passed, its column values may differ from actual values in DB after query.

Expand source code
class CRUDMixin(SelectMixin, CRUDInternalMeta):
    """
    Default mixin providing class methods available on all model types.

    Every method takes the DB connection object as its first argument.
    Following arguments are defined in several methods commonly.

    - `pks`
        - Names and values of all primary key columns in form of `dict` .
        - A primary key value. This form is allowed when the table has just one primary key column.
        - e.g. If the table has a single primary key `id` of int, `1` is available to spcecify the row of `id = 1` .
        - e.g. If the table has multiple primary keys `intid` and `strid`, `dict(intid=1, strid="abc")` is a valid argument.
    - `record`
        - A model object or a mapping from column name to its value, which corresponds to a table row.
        - Only columns contained in the record is affected by the operation.
        - e.g. When `dict(c1=1, c2="abc")` is passed for insertion, only `c1` and `c2` are set in INSERT query.
        - e.g. For update, only the columns will be updated. Other columns are not affected.
    - `condition`
        - Query condition which will compose WHERE clause.
        - `pyracmon.query.Q` is a factory class to create condition object.
        - When `None` is passed, all rows are subject to the operation.
    - `qualifier`
        - A mapping from column name to a function which qualifies a placeholder passed by an argument.
        - Detail of qualifier function is described below.
    - `lock`
        - This is reserved argument for locking statement but works just as the postfix of the query currently.
        - The usage will be changed in future version.

    Qualifier function is used typically to convert or replace placeholder marker in insert/update query.
    By default, those queries contain markers like `insert into t (c1, c2) values (?, ?)` (`Q` parameter style).
    We need sometimes qualify markers to apply DB function, calculation, type cast and so on. This feature enables them like below.

    ```python
    t.insert(db, dict(c1=1, c2=None), dict(c1=lambda x: f"{x}+1", c2=lambda x: "now()"))
    # SQL: INSERT INTO t (c1, c2) VALUES (?+1, now())
    ```

    Be aware that when model object is passed, its column values may differ from actual values in DB after query.
    """
    @classmethod
    def count(cls, db: Connection, condition: Conditional = Q.of()) -> int:
        """
        Count rows which satisfies the condition.

        ```python
        t.count(db, Q.eq(c1=1))
        # SQL: SELECT COUNT(*) FROM t WHERE c1 = 1
        ```

        Args:
            db: DB connection.
            condition: Query condition.
        Returns:
            The number of rows.
        """
        wc, wp = where(condition)
        c = db.stmt().execute(f"SELECT COUNT(*) FROM {cls.name}{_spacer(wc)}", *wp)
        return c.fetchone()[0] # type: ignore

    @classmethod
    def fetch(cls, db: Connection, pks: PKS, lock: Optional[Any] = None) -> Optional[Self]:
        """
        Fetch a record by primary key(s).

        ```python
        t.fetch(db, 1)
        # SQL: SELECT * FROM t WHERE id = 1
        ```

        Args:
            db: DB connection.
            pks: Primary key value(s).
            lock: Locking statement.
        Returns:
            A model object if exists, otherwise `None`.
        """
        cols, vals = parse_pks(cls, pks)
        cond = Conditional.all([Q.eq(**{c: v}) for c, v in zip(cols, vals)])
        wc, wp = where(cond)
        s = cls.select()
        c = db.stmt().execute(f"SELECT {s} FROM {cls.name}{_spacer(wc)}{_spacer(lock)}", *wp)
        row = c.fetchone()
        return read_row(row, *s)[0] if row else None

    @classmethod
    def fetch_many(cls, db: Connection, seq_pks: Sequence[PKS], lock: Optional[Any] = None, /, per_page: int = 1000) -> list[Self]:
        """
        Fetch a record by sequence of primary key(s).

        This method simply concatenates equality conditions on primary key by OR operator.

        ```python
        t.fetch_many(db, [1, 2, 3])
        # SQL: SELECT * FROM t WHERE id = 1 OR id = 2 OR id = 3
        ```

        Args:
            db: DB connection.
            seq_pks: Sequence of primary key value.
            lock: Locking statement.
            per_page: Maximum number of keys for an execution of query.
        Returns:
            Model objects in the same order as passed sequence.
        """
        res = []
        index = 0
        while index < len(seq_pks):
            ordered_pks = []
            cond = Q.of()
            for pks in seq_pks[index:index+per_page]:
                cols, vals = parse_pks(cls, pks)
                ordered_pks.append(tuple(v for v in vals))
                cond |= Conditional.all([Q.eq(**{c: v}) for c, v in zip(cols, vals)])
            wc, wp = where(cond)
            s = cls.select()
            c = db.stmt().execute(f"SELECT {s} FROM {cls.name}{_spacer(wc)}{_spacer(lock)}", *wp)

            record_map = {}
            for r in [read_row(row, *s)[0] for row in c.fetchall()]:
                pk_values = {c.name:v for c, v in r if c.pk}
                record_map[tuple([v for _, v in check_columns(cls, pk_values, lambda c: c.pk, True)])] = r

            res.extend([record_map[k] for k in ordered_pks if k in record_map])
            index += per_page

        return res

    @classmethod
    def fetch_where(
        cls,
        db: Connection,
        condition: Conditional = Q.of(),
        orders: Mapping[Union[str, AliasedColumn], ORDER] = {},
        limit: Optional[int] = None,
        offset: Optional[int] = None,
        lock: Optional[Any] = None,
    ) -> list[Self]:
        """
        Fetch records which satisfy the condition.

        ```python
        t.fetch_where(db, Q.eq(c1=1), dict(c2=True), 10, 5)
        # SQL: SELECT * FROM t WHERE c1 = 1 ORDER BY c2 ASC LIMIT 10 OFFSET 5
        ```

        Args:
            db: DB connection.
            condition: Query condition.
            orders: Ordering specification where key is column name and value denotes whether the order is ascending or not.
            limit: Maximum nuber of rows to fetch. If `None`, all rows are returned.
            offset: The number of rows to skip.
            lock: Locking statement.
        Returns:
            Model objects.
        """
        wc, wp = where(condition)
        rc, rp = ranged_by(limit, offset)
        s = cls.select()
        c = db.stmt().execute(f"SELECT {s} FROM {cls.name}{_spacer(wc)}{_spacer(order_by(orders))}{_spacer(rc)}{_spacer(lock)}", *(wp + rp))
        return [read_row(row, *s)[0] for row in c.fetchall()]

    @classmethod
    def fetch_one(
        cls,
        db: Connection,
        condition: Conditional = Q.of(),
        lock: Optional[Any] = None,
    ) -> Optional[Self]:
        """
        Fetch a record which satisfies the condition.

        `ValueError` raises When multiple records are found.
        Use this method for queries which certainly returns a single row, such as search by unique key.

        ```python
        t.fetch_one(db, Q.eq(c1=1))
        # SQL: SELECT * FROM t WHERE c1 = 1
        ```

        Args:
            db: DB connection.
            condition: Query condition.
            lock: Locking statement.
        Returns:
            Model objects If exists, otherwise `None`.
        """
        rs = cls.fetch_where(db, condition, lock=lock)

        if not rs:
            return None
        elif len(rs) == 1:
            return rs[0]
        else:
            raise ValueError(f"{len(rs)} records are found on the invocation of fetch_one().")

    @classmethod
    def _insert_sql(cls, record: Union[Self, dict[str, Any]], qualifier: Mapping[str, Qualifier] = {}) -> tuple[str, list[str], list[Any]]:
        model: Self = cast(Self, record) if isinstance(record, cls) else cls(**cast(dict, record))
        value_dict = model_values(cls, model)
        check_columns(cls, value_dict)
        cols, vals = list(value_dict.keys()), list(value_dict.values())
        ordered_qs = key_to_index(qualifier, cols)

        def exp(v):
            return lambda i: v

        if any(isinstance(v, Expression) for v in vals):
            key_gen = []
            org_vals = vals
            vals = []
            for v in org_vals:
                if isinstance(v, Expression):
                    key_gen.append(exp(v))
                    vals.extend(v.params)
                else:
                    key_gen.append(lambda i: None)
                    vals.append(v)
            values_clause = values(key_gen, 1, ordered_qs)
        else:
            values_clause = values(len(cols), 1, ordered_qs)

        return f"INSERT INTO {cls.name} ({', '.join(cols)}) VALUES {values_clause}", cols, vals

    @classmethod
    def insert(
        cls,
        db: Connection,
        record: Union[Self, dict[str, Any]],
        qualifier: Mapping[str, Qualifier] = {},
        /,
        returning: bool = False,
    ) -> Self:
        """
        Insert a record.

        If `returning` is `True` and the DBMS supports **RETURNING** clause,
        returned model object contains comple and correct column values.
        Otherwise, auto incremental value is set to the returned model object
        but other column values generated inside DBMS such as default value are not set.

        ```python
        t.insert(db, dict(c1=1, c2=2))
        # SQL: INSERT INTO t (c1, c2) VALUES (1, 2)
        ```

        Args:
            db: DB connection.
            record: Object contains column values.
            qualifier: Functions qualifying placeholder markers.
            returning: Flag to return inserted record with complete and correct column values.
        Returns:
            Model of inserted record.
        """
        model: Self = cast(Self, record) if isinstance(record, cls) else cls(**cast(dict, record))
        sql, _, vals = cls._insert_sql(record, qualifier)

        if returning:
            if cls.support_returning(db):
                c = db.stmt().execute(f"{sql} RETURNING *", *vals)
                s = cls.select()
                return read_row(c.fetchone(), *s)[0]
            else:
                # REVIEW
                # Inserted row can't be specified from the table where no primary keys are defined .
                pass

        db.stmt().execute(sql, *vals)
        for c, v in cls.last_sequences(db, 1):
            setattr(model, c.name, v)
        return model

    @classmethod
    @overload
    def insert_many(cls, db: Connection, records: list[Union[Self, dict[str, Any]]], qualifier: Mapping[str, Qualifier] = {},
                    /, returning: Literal[False] = False) -> list[Self]: ...
    @classmethod
    @overload
    def insert_many(cls, db: Connection, records: list[Union[Self, dict[str, Any]]], qualifier: Mapping[str, Qualifier] = {},
                    /, returning: Literal[True] = True) -> list[Self]: ...
    @classmethod
    def insert_many(
        cls,
        db: Connection,
        records: list[Union[Self, dict[str, Any]]],
        qualifier: Mapping[str, Qualifier] = {},
        /,
        returning: bool = False,
    ):
        """
        Insert records.

        If `returning` is `True` and the DBMS supports **RETURNING** clause,
        returned model object contains comple and correct column values.
        Otherwise, auto incremental value is set to the returned model object
        but other column values generated inside DBMS such as default value are not set.

        Args:
            db: DB connection.
            record: Object contains column values.
            qualifier: Functions qualifying placeholder markers.
            returning: Flag to return inserted records with complete and correct column values.
        Returns:
            Models of inserted records or cursor.
        """
        if len(records) == 0:
            return []

        models: list[Self] = [cast(Self, r) if isinstance(r, cls) else cls(**cast(dict, r)) for r in records]

        seq_of_params = []

        sql, cols, params = cls._insert_sql(models[0], qualifier)

        cols = set(cols)
        seq_of_params.append(params)

        for m in models[1:]:
            value_dict = model_values(cls, m)
            check_columns(cls, value_dict, lambda c: c.name in cols, requires_all=True)
            # REVIEW:
            # The consistency among columns where expression is set is not checked.
            _, _, params = cls._insert_sql(m, qualifier)
            seq_of_params.append(params)

        db.stmt().executemany(sql, seq_of_params)
        num = len(records)
        for c, v in cls.last_sequences(db, num):
            for i, m in enumerate(models):
                setattr(m, c.name, v - (num - i - 1))

        if returning:
            seq_pks = [extract_pks(cls, m) for m in models]
            return cls.fetch_many(db, seq_pks)
        else:
            return models

    @classmethod
    def _update_sql(cls, record: Record, condition: Conditional, qualifier: Mapping[str, Qualifier] = {}, allow_all: bool = True) -> tuple[str, list[str], list[Any]]:
        value_dict = model_values(cls, record, excludes_pk=True)
        check_columns(cls, value_dict)
        cols, vals = list(value_dict.keys()), list(value_dict.values())
        ordered_qs = key_to_index(qualifier, cols)

        def set_col(acc, icv):
            i, (c, v) = icv
            if isinstance(v, Expression):
                clause = f"{c} = {ordered_qs.get(i, lambda x:x)(v.expression)}"
                params = v.params
            else:
                clause = f"{c} = {ordered_qs.get(i, lambda x:x)('$_')}"
                params = [v]
            acc[0].append(clause)
            acc[1].extend(params)
            return acc

        setters, params = reduce(set_col, enumerate(zip(cols, vals)), ([], []))

        wc, wp = where(condition)
        if wc == "" and not allow_all:
            raise ValueError("Update query to update all records is not allowed.")

        return f"UPDATE {cls.name} SET {', '.join(setters)}{_spacer(wc)}", cols, params + wp

    @classmethod
    @overload
    def update(cls, db: Connection, pks: PKS, record: Record, qualifier: Mapping[str, Qualifier] = {},
               /, returning: Literal[False] = False) -> bool: ...
    @classmethod
    @overload
    def update(cls, db: Connection, pks: PKS, record: Record, qualifier: Mapping[str, Qualifier] = {},
               /, returning: Literal[True] = True) -> Optional[Self]: ...
    @classmethod
    def update(
        cls,
        db: Connection,
        pks: PKS,
        record: Record,
        qualifier: Mapping[str, Qualifier] = {},
        /,
        returning: bool = False,
    ):
        """
        Update a record by primary key(s).

        This method only updates columns which are found in `record` except for primary key(s).

        ```python
        t.update(db, 1, dict(c1=1, c2=2))
        # SQL: UPDATE t SET c1 = 1, c2 = 2 WHERE id = 1
        ```

        Args:
            db: DB connection.
            pks: Primary key value(s).
            record: Object contains column values.
            qualifier: Functions qualifying placeholder markers.
            returning: Flag to return updated records with complete and correct column values.
        Returns:
            Whether the record exists and updated or updated record model.
        """
        cols, vals = parse_pks(cls, pks)
        condition = Conditional.all([Q.eq(**{c: v}) for c, v in zip(cols, vals)])
        if returning:
            if cls.support_returning(db):
                models = cls.update_where(db, record, condition, qualifier, returning=True)
                return models[0] if models else None
            else:
                models = cls.update_where(db, record, condition, qualifier, returning=False)
                return cls.fetch(db, pks)
        else:
            return cls.update_where(db, record, condition, qualifier, returning=False) == 1

    @classmethod
    @overload
    def update_many(cls, db: Connection, records: Sequence[Record], qualifier: Mapping[str, Qualifier] = {},
               /, returning: Literal[False] = False) -> int: ...
    @classmethod
    @overload
    def update_many(cls, db: Connection, records: Sequence[Record], qualifier: Mapping[str, Qualifier] = {},
               /, returning: Literal[True] = True) -> list[Self]: ...
    @classmethod
    def update_many(
        cls,
        db: Connection,
        records: Sequence[Record],
        qualifier: Mapping[str, Qualifier] = {},
        /,
        returning: bool = False,
    ):
        """
        Update records by set of primary key(s).

        This method invokes on `executemany` defined in DB-API 2.0.
        Whether it is optimized compared to `execute` depends on DB driver.

        Args:
            db: DB connection.
            record: Sequence of objects contains column values.
            qualifier: Functions qualifying placeholder markers.
            returning: Flag to return updated records with complete and correct column values.
        Returns:
            The number of affected rows or updated records.
        """
        if len(records) == 0:
            return [] if returning else 0

        keys = {c.name for c in cls.columns if c.pk}
        if len(keys) == 0:
            raise ValueError(f"update_many is not available because {cls} does not have primary key columns.")

        def classify(acc: tuple[dict[str, Any], dict[str, Any]], cv: tuple[str, Any]):
            if cv[0] in keys:
                acc[0][cv[0]] = cv[1]
            else:
                acc[1][cv[0]] = cv[1]
            return acc

        seq_of_values: list[tuple[dict[str, Any], dict[str, Any]]] = []
        target_columns: Optional[set[str]] = None

        for vs in [model_values(cls, r, excludes_pk=False) for r in records]:
            if not keys < vs.keys():
                raise ValueError(f"Every row must contain values of all primary keys and at least one update column value.")
            pks, rec = reduce(classify, vs.items(), ({}, {}))
            if target_columns is None:
                check_columns(cls, rec)
                target_columns = set(rec.keys())
            else:
                check_columns(cls, rec, lambda c: c.name in target_columns, True) # type: ignore
            seq_of_values.append((pks, rec))

        sql_first = ""
        seq_of_params: list[list[Any]] = []

        for pks, rec in seq_of_values:
            cols, vals = parse_pks(cls, pks)
            condition = Conditional.all([Q.eq(**{c: v}) for c, v in zip(cols, vals)])

            sql, _, params = cls._update_sql(rec, condition, qualifier)
            if not sql_first:
                sql_first = sql
            seq_of_params.append(params)

        if returning:
            db.stmt().executemany(f"{sql_first}", seq_of_params)
            return cls.fetch_many(db, [pks for pks, _ in seq_of_values])
        else:
            return db.stmt().executemany(sql_first, seq_of_params).rowcount

    @classmethod
    @overload
    def update_where(cls, db: Connection, record: Record, condition: Conditional, qualifier: Mapping[str, Qualifier] = {},
                     /, returning: Literal[False] = False, allow_all: bool = False) -> int: ...
    @classmethod
    @overload
    def update_where(cls, db: Connection, record: Record, condition: Conditional, qualifier: Mapping[str, Qualifier] = {},
                     /, returning: Literal[True] = True, allow_all: bool = False) -> list[Self]: ...
    @classmethod
    def update_where(
        cls,
        db: Connection,
        record: Record,
        condition: Conditional,
        qualifier: Mapping[str, Qualifier] = {},
        /,
        returning: bool = False,
        allow_all: bool = True,
    ):
        """
        Update records which satisfy the condition.

        ```python
        t.update(db, dict(c2=2), Q.eq(c1=1))
        # SQL: UPDATE t SET c2 = 2 WHERE c1 = 1
        ```

        Args:
            db: DB connection.
            record: Object contains column values.
            condition: Query condition.
            qualifier: Functions qualifying placeholder markers.
            returning: Flag to return updated records with complete and correct column values.
            allow_all: If `False`, empty condition raises `ValueError`.
        Returns:
            The number of affected rows or updated records.
        """
        sql, _, params = cls._update_sql(record, condition, qualifier, allow_all)

        if returning:
            if cls.support_returning(db):
                c = db.stmt().execute(f"{sql} RETURNING *", *params)
                s = cls.select()
                return [read_row(row, *s)[0] for row in c.fetchall()]
            else:
                raise NotImplementedError(f"RETURNING is not supported and there is no way to fetch updated rows exactly.")
        else:
            c = db.stmt().execute(sql, *params)

            return c.rowcount

    @classmethod
    @overload
    def delete(cls, db: Connection, pks: PKS, /, returning: Literal[False] = False) -> bool: ...
    @classmethod
    @overload
    def delete(cls, db: Connection, pks: PKS, /, returning: Literal[True] = True) -> Optional[Self]: ...
    @classmethod
    def delete(cls, db: Connection, pks: PKS, /, returning: bool = False):
        """
        Delete a record by primary key(s).

        ```python
        t.delete(db, 1)
        # SQL: DELETE FROM t WHERE id = 1
        ```

        Args:
            db: DB connection.
            pks: Primary key value(s).
            returning: Flag to return deleted record if any.
        Returns:
            Whether the record exists and deleted or delete record if any.
        """
        cols, vals = parse_pks(cls, pks)

        if returning:
            models = cls.delete_where(db, Conditional.all([Q.eq(**{c: v}) for c, v in zip(cols, vals)]), returning=True)
            return models[0] if models else None
        else:
            return cls.delete_where(db, Conditional.all([Q.eq(**{c: v}) for c, v in zip(cols, vals)])) == 1

    @classmethod
    @overload
    def delete_many(cls, db: Connection, pks: Union[Sequence[PKS], Sequence[Record]], /, returning: Literal[False] = False) -> int: ...
    @overload
    @classmethod
    def delete_many(cls, db: Connection, pks: Union[Sequence[PKS], Sequence[Record]], /, returning: Literal[True] = True) -> list[Self]: ...
    @classmethod
    def delete_many(cls, db: Connection, pks: Union[Sequence[PKS], Sequence[Record]], /, returning: bool = False):
        """
        Delete a records by set of primary key(s).

        This method invokes on `executemany` defined in DB-API 2.0.
        Whether it is optimized compared to `execute` depends on DB driver.

        Args:
            db: DB connection.
            pks: Primary keys or objects each of which contains values of all primary keys.
            returning: Flag to return deleted records.
        Returns:
            The number of affected rows or deleted records.
        """
        if len(pks) == 0:
            return None

        seq_of_pks: list[dict[str, Any]] = []
        for rec in pks:
            if isinstance(rec, (dict, cls)):
                seq_of_pks.append(extract_pks(cls, rec))
            else:
                cols, vals = parse_pks(cls, rec)
                seq_of_pks.append(dict(zip(cols, vals)))

        condition = Conditional.all([Q.eq(**{c: v}) for c, v in seq_of_pks[0].items()])
        wc, wp = where(condition)

        sql = f"DELETE FROM {cls.name}{_spacer(wc)}"
        seq_of_params: list[list[Any]] = [wp]

        for v in seq_of_pks[1:]:
            condition = Conditional.all([Q.eq(**{c: v}) for c, v in v.items()])
            _, wp = where(condition)
            seq_of_params.append(wp)

        if returning:
            models = cls.fetch_many(db, seq_of_pks)
            db.stmt().executemany(sql, seq_of_params)
            return models
        else:
            return db.stmt().executemany(sql, seq_of_params).rowcount

    @classmethod
    @overload
    def delete_where(cls, db: Connection, condition: Conditional, /, returning: Literal[False] = False, allow_all: bool = True) -> int: ...
    @classmethod
    @overload
    def delete_where(cls, db: Connection, condition: Conditional, /, returning: Literal[True] = True, allow_all: bool = True) -> list[Self]: ...
    @classmethod
    def delete_where(cls, db: Connection, condition: Conditional, /, returning: bool = False, allow_all: bool = True):
        """
        Delete records which satisfy the condition.

        ```python
        t.delete(db, Q.eq(c1=1))
        # SQL: DELETE FROM t WHERE c1 = 1
        ```

        Args:
            db: DB connection.
            condition: Query condition.
            returning: Flag to return deleted records.
            allow_all: If `False`, empty condition raises `ValueError`.
        Returns:
            The number of affected rows or deleted records.
        """
        wc, wp = where(condition)
        if wc == "" and not allow_all:
            raise ValueError("Delete query to delete all records is not allowed.")

        sql = f"DELETE FROM {cls.name}{_spacer(wc)}"

        if returning:
            if cls.support_returning(db):
                c = db.stmt().execute(f"{sql} RETURNING *", *wp)
                return [read_row(row, *cls.select())[0] for row in c.fetchall()]
            else:
                current = cls.fetch_where(db, condition)
                c = db.stmt().execute(sql, *wp)
                return current
        else:
            return db.stmt().execute(sql, *wp).rowcount

    @classmethod
    def last_sequences(cls, db: Connection, num: int) -> list[tuple[Column, int]]:
        """
        Returns the sequential (auto incremental) values of a table generated by the latest insertion.

        Result contains every sequential columns and their values.
        When the latest query inserts multiple rows, only the last (= biggest) value is returned.

        This method should be overridden by another mixin class defined in dialect module.

        Args:
            db: DB connection.
            num: The number of records inserted by the latest query.
        Returns:
            List of pairs of column and its values.
        """
        return []

    @classmethod
    def support_returning(cls, db: Connection) -> bool:
        """
        Checks whehter this DBMS support **RETURNING** clause or not.

        Args:
            db: DB connection.
        Returns:
            Whehter this DBMS support **RETURNING** clause or not.
        """
        return False

Ancestors

Static methods

def count(db: Connection, condition: Conditional = Condition: '' -- []) ‑> int

Count rows which satisfies the condition.

t.count(db, Q.eq(c1=1))
# SQL: SELECT COUNT(*) FROM t WHERE c1 = 1

Args

db
DB connection.
condition
Query condition.

Returns

The number of rows.

Expand source code
@classmethod
def count(cls, db: Connection, condition: Conditional = Q.of()) -> int:
    """
    Count rows which satisfies the condition.

    ```python
    t.count(db, Q.eq(c1=1))
    # SQL: SELECT COUNT(*) FROM t WHERE c1 = 1
    ```

    Args:
        db: DB connection.
        condition: Query condition.
    Returns:
        The number of rows.
    """
    wc, wp = where(condition)
    c = db.stmt().execute(f"SELECT COUNT(*) FROM {cls.name}{_spacer(wc)}", *wp)
    return c.fetchone()[0] # type: ignore
def delete(db: Connection, pks: Union[Any, dict[str, Any]], /, returning: bool = False)

Delete a record by primary key(s).

t.delete(db, 1)
# SQL: DELETE FROM t WHERE id = 1

Args

db
DB connection.
pks
Primary key value(s).
returning
Flag to return deleted record if any.

Returns

Whether the record exists and deleted or delete record if any.

Expand source code
@classmethod
def delete(cls, db: Connection, pks: PKS, /, returning: bool = False):
    """
    Delete a record by primary key(s).

    ```python
    t.delete(db, 1)
    # SQL: DELETE FROM t WHERE id = 1
    ```

    Args:
        db: DB connection.
        pks: Primary key value(s).
        returning: Flag to return deleted record if any.
    Returns:
        Whether the record exists and deleted or delete record if any.
    """
    cols, vals = parse_pks(cls, pks)

    if returning:
        models = cls.delete_where(db, Conditional.all([Q.eq(**{c: v}) for c, v in zip(cols, vals)]), returning=True)
        return models[0] if models else None
    else:
        return cls.delete_where(db, Conditional.all([Q.eq(**{c: v}) for c, v in zip(cols, vals)])) == 1
def delete_many(db: Connection, pks: Union[collections.abc.Sequence[Union[Any, dict[str, Any]]], collections.abc.Sequence[Union[Meta, dict[str, Any]]]], /, returning: bool = False)

Delete a records by set of primary key(s).

This method invokes on executemany defined in DB-API 2.0. Whether it is optimized compared to execute depends on DB driver.

Args

db
DB connection.
pks
Primary keys or objects each of which contains values of all primary keys.
returning
Flag to return deleted records.

Returns

The number of affected rows or deleted records.

Expand source code
@classmethod
def delete_many(cls, db: Connection, pks: Union[Sequence[PKS], Sequence[Record]], /, returning: bool = False):
    """
    Delete a records by set of primary key(s).

    This method invokes on `executemany` defined in DB-API 2.0.
    Whether it is optimized compared to `execute` depends on DB driver.

    Args:
        db: DB connection.
        pks: Primary keys or objects each of which contains values of all primary keys.
        returning: Flag to return deleted records.
    Returns:
        The number of affected rows or deleted records.
    """
    if len(pks) == 0:
        return None

    seq_of_pks: list[dict[str, Any]] = []
    for rec in pks:
        if isinstance(rec, (dict, cls)):
            seq_of_pks.append(extract_pks(cls, rec))
        else:
            cols, vals = parse_pks(cls, rec)
            seq_of_pks.append(dict(zip(cols, vals)))

    condition = Conditional.all([Q.eq(**{c: v}) for c, v in seq_of_pks[0].items()])
    wc, wp = where(condition)

    sql = f"DELETE FROM {cls.name}{_spacer(wc)}"
    seq_of_params: list[list[Any]] = [wp]

    for v in seq_of_pks[1:]:
        condition = Conditional.all([Q.eq(**{c: v}) for c, v in v.items()])
        _, wp = where(condition)
        seq_of_params.append(wp)

    if returning:
        models = cls.fetch_many(db, seq_of_pks)
        db.stmt().executemany(sql, seq_of_params)
        return models
    else:
        return db.stmt().executemany(sql, seq_of_params).rowcount
def delete_where(db: Connection, condition: Conditional, /, returning: bool = False, allow_all: bool = True)

Delete records which satisfy the condition.

t.delete(db, Q.eq(c1=1))
# SQL: DELETE FROM t WHERE c1 = 1

Args

db
DB connection.
condition
Query condition.
returning
Flag to return deleted records.
allow_all
If False, empty condition raises ValueError.

Returns

The number of affected rows or deleted records.

Expand source code
@classmethod
def delete_where(cls, db: Connection, condition: Conditional, /, returning: bool = False, allow_all: bool = True):
    """
    Delete records which satisfy the condition.

    ```python
    t.delete(db, Q.eq(c1=1))
    # SQL: DELETE FROM t WHERE c1 = 1
    ```

    Args:
        db: DB connection.
        condition: Query condition.
        returning: Flag to return deleted records.
        allow_all: If `False`, empty condition raises `ValueError`.
    Returns:
        The number of affected rows or deleted records.
    """
    wc, wp = where(condition)
    if wc == "" and not allow_all:
        raise ValueError("Delete query to delete all records is not allowed.")

    sql = f"DELETE FROM {cls.name}{_spacer(wc)}"

    if returning:
        if cls.support_returning(db):
            c = db.stmt().execute(f"{sql} RETURNING *", *wp)
            return [read_row(row, *cls.select())[0] for row in c.fetchall()]
        else:
            current = cls.fetch_where(db, condition)
            c = db.stmt().execute(sql, *wp)
            return current
    else:
        return db.stmt().execute(sql, *wp).rowcount
def fetch(db: Connection, pks: Union[Any, dict[str, Any]], lock: Optional[Any] = None) ‑> Optional[typing_extensions.Self]

Fetch a record by primary key(s).

t.fetch(db, 1)
# SQL: SELECT * FROM t WHERE id = 1

Args

db
DB connection.
pks
Primary key value(s).
lock
Locking statement.

Returns

A model object if exists, otherwise None.

Expand source code
@classmethod
def fetch(cls, db: Connection, pks: PKS, lock: Optional[Any] = None) -> Optional[Self]:
    """
    Fetch a record by primary key(s).

    ```python
    t.fetch(db, 1)
    # SQL: SELECT * FROM t WHERE id = 1
    ```

    Args:
        db: DB connection.
        pks: Primary key value(s).
        lock: Locking statement.
    Returns:
        A model object if exists, otherwise `None`.
    """
    cols, vals = parse_pks(cls, pks)
    cond = Conditional.all([Q.eq(**{c: v}) for c, v in zip(cols, vals)])
    wc, wp = where(cond)
    s = cls.select()
    c = db.stmt().execute(f"SELECT {s} FROM {cls.name}{_spacer(wc)}{_spacer(lock)}", *wp)
    row = c.fetchone()
    return read_row(row, *s)[0] if row else None
def fetch_many(db: Connection, seq_pks: collections.abc.Sequence[typing.Union[typing.Any, dict[str, typing.Any]]], lock: Optional[Any] = None, /, per_page: int = 1000) ‑> list[typing_extensions.Self]

Fetch a record by sequence of primary key(s).

This method simply concatenates equality conditions on primary key by OR operator.

t.fetch_many(db, [1, 2, 3])
# SQL: SELECT * FROM t WHERE id = 1 OR id = 2 OR id = 3

Args

db
DB connection.
seq_pks
Sequence of primary key value.
lock
Locking statement.
per_page
Maximum number of keys for an execution of query.

Returns

Model objects in the same order as passed sequence.

Expand source code
@classmethod
def fetch_many(cls, db: Connection, seq_pks: Sequence[PKS], lock: Optional[Any] = None, /, per_page: int = 1000) -> list[Self]:
    """
    Fetch a record by sequence of primary key(s).

    This method simply concatenates equality conditions on primary key by OR operator.

    ```python
    t.fetch_many(db, [1, 2, 3])
    # SQL: SELECT * FROM t WHERE id = 1 OR id = 2 OR id = 3
    ```

    Args:
        db: DB connection.
        seq_pks: Sequence of primary key value.
        lock: Locking statement.
        per_page: Maximum number of keys for an execution of query.
    Returns:
        Model objects in the same order as passed sequence.
    """
    res = []
    index = 0
    while index < len(seq_pks):
        ordered_pks = []
        cond = Q.of()
        for pks in seq_pks[index:index+per_page]:
            cols, vals = parse_pks(cls, pks)
            ordered_pks.append(tuple(v for v in vals))
            cond |= Conditional.all([Q.eq(**{c: v}) for c, v in zip(cols, vals)])
        wc, wp = where(cond)
        s = cls.select()
        c = db.stmt().execute(f"SELECT {s} FROM {cls.name}{_spacer(wc)}{_spacer(lock)}", *wp)

        record_map = {}
        for r in [read_row(row, *s)[0] for row in c.fetchall()]:
            pk_values = {c.name:v for c, v in r if c.pk}
            record_map[tuple([v for _, v in check_columns(cls, pk_values, lambda c: c.pk, True)])] = r

        res.extend([record_map[k] for k in ordered_pks if k in record_map])
        index += per_page

    return res
def fetch_one(db: Connection, condition: Conditional = Condition: '' -- [], lock: Optional[Any] = None) ‑> Optional[typing_extensions.Self]

Fetch a record which satisfies the condition.

ValueError raises When multiple records are found. Use this method for queries which certainly returns a single row, such as search by unique key.

t.fetch_one(db, Q.eq(c1=1))
# SQL: SELECT * FROM t WHERE c1 = 1

Args

db
DB connection.
condition
Query condition.
lock
Locking statement.

Returns

Model objects If exists, otherwise None.

Expand source code
@classmethod
def fetch_one(
    cls,
    db: Connection,
    condition: Conditional = Q.of(),
    lock: Optional[Any] = None,
) -> Optional[Self]:
    """
    Fetch a record which satisfies the condition.

    `ValueError` raises When multiple records are found.
    Use this method for queries which certainly returns a single row, such as search by unique key.

    ```python
    t.fetch_one(db, Q.eq(c1=1))
    # SQL: SELECT * FROM t WHERE c1 = 1
    ```

    Args:
        db: DB connection.
        condition: Query condition.
        lock: Locking statement.
    Returns:
        Model objects If exists, otherwise `None`.
    """
    rs = cls.fetch_where(db, condition, lock=lock)

    if not rs:
        return None
    elif len(rs) == 1:
        return rs[0]
    else:
        raise ValueError(f"{len(rs)} records are found on the invocation of fetch_one().")
def fetch_where(db: Connection, condition: Conditional = Condition: '' -- [], orders: collections.abc.Mapping[typing.Union[str, AliasedColumn], typing.Union[bool, tuple[bool, bool], str]] = {}, limit: Optional[int] = None, offset: Optional[int] = None, lock: Optional[Any] = None) ‑> list[typing_extensions.Self]

Fetch records which satisfy the condition.

t.fetch_where(db, Q.eq(c1=1), dict(c2=True), 10, 5)
# SQL: SELECT * FROM t WHERE c1 = 1 ORDER BY c2 ASC LIMIT 10 OFFSET 5

Args

db
DB connection.
condition
Query condition.
orders
Ordering specification where key is column name and value denotes whether the order is ascending or not.
limit
Maximum nuber of rows to fetch. If None, all rows are returned.
offset
The number of rows to skip.
lock
Locking statement.

Returns

Model objects.

Expand source code
@classmethod
def fetch_where(
    cls,
    db: Connection,
    condition: Conditional = Q.of(),
    orders: Mapping[Union[str, AliasedColumn], ORDER] = {},
    limit: Optional[int] = None,
    offset: Optional[int] = None,
    lock: Optional[Any] = None,
) -> list[Self]:
    """
    Fetch records which satisfy the condition.

    ```python
    t.fetch_where(db, Q.eq(c1=1), dict(c2=True), 10, 5)
    # SQL: SELECT * FROM t WHERE c1 = 1 ORDER BY c2 ASC LIMIT 10 OFFSET 5
    ```

    Args:
        db: DB connection.
        condition: Query condition.
        orders: Ordering specification where key is column name and value denotes whether the order is ascending or not.
        limit: Maximum nuber of rows to fetch. If `None`, all rows are returned.
        offset: The number of rows to skip.
        lock: Locking statement.
    Returns:
        Model objects.
    """
    wc, wp = where(condition)
    rc, rp = ranged_by(limit, offset)
    s = cls.select()
    c = db.stmt().execute(f"SELECT {s} FROM {cls.name}{_spacer(wc)}{_spacer(order_by(orders))}{_spacer(rc)}{_spacer(lock)}", *(wp + rp))
    return [read_row(row, *s)[0] for row in c.fetchall()]
def insert(db: Connection, record: Union[typing_extensions.Self, dict[str, Any]], qualifier: collections.abc.Mapping[str, collections.abc.Callable[[str], str]] = {}, /, returning: bool = False) ‑> typing_extensions.Self

Insert a record.

If returning is True and the DBMS supports RETURNING clause, returned model object contains comple and correct column values. Otherwise, auto incremental value is set to the returned model object but other column values generated inside DBMS such as default value are not set.

t.insert(db, dict(c1=1, c2=2))
# SQL: INSERT INTO t (c1, c2) VALUES (1, 2)

Args

db
DB connection.
record
Object contains column values.
qualifier
Functions qualifying placeholder markers.
returning
Flag to return inserted record with complete and correct column values.

Returns

Model of inserted record.

Expand source code
@classmethod
def insert(
    cls,
    db: Connection,
    record: Union[Self, dict[str, Any]],
    qualifier: Mapping[str, Qualifier] = {},
    /,
    returning: bool = False,
) -> Self:
    """
    Insert a record.

    If `returning` is `True` and the DBMS supports **RETURNING** clause,
    returned model object contains comple and correct column values.
    Otherwise, auto incremental value is set to the returned model object
    but other column values generated inside DBMS such as default value are not set.

    ```python
    t.insert(db, dict(c1=1, c2=2))
    # SQL: INSERT INTO t (c1, c2) VALUES (1, 2)
    ```

    Args:
        db: DB connection.
        record: Object contains column values.
        qualifier: Functions qualifying placeholder markers.
        returning: Flag to return inserted record with complete and correct column values.
    Returns:
        Model of inserted record.
    """
    model: Self = cast(Self, record) if isinstance(record, cls) else cls(**cast(dict, record))
    sql, _, vals = cls._insert_sql(record, qualifier)

    if returning:
        if cls.support_returning(db):
            c = db.stmt().execute(f"{sql} RETURNING *", *vals)
            s = cls.select()
            return read_row(c.fetchone(), *s)[0]
        else:
            # REVIEW
            # Inserted row can't be specified from the table where no primary keys are defined .
            pass

    db.stmt().execute(sql, *vals)
    for c, v in cls.last_sequences(db, 1):
        setattr(model, c.name, v)
    return model
def insert_many(db: Connection, records: list[typing.Union[typing_extensions.Self, dict[str, typing.Any]]], qualifier: collections.abc.Mapping[str, collections.abc.Callable[[str], str]] = {}, /, returning: bool = False)

Insert records.

If returning is True and the DBMS supports RETURNING clause, returned model object contains comple and correct column values. Otherwise, auto incremental value is set to the returned model object but other column values generated inside DBMS such as default value are not set.

Args

db
DB connection.
record
Object contains column values.
qualifier
Functions qualifying placeholder markers.
returning
Flag to return inserted records with complete and correct column values.

Returns

Models of inserted records or cursor.

Expand source code
@classmethod
def insert_many(
    cls,
    db: Connection,
    records: list[Union[Self, dict[str, Any]]],
    qualifier: Mapping[str, Qualifier] = {},
    /,
    returning: bool = False,
):
    """
    Insert records.

    If `returning` is `True` and the DBMS supports **RETURNING** clause,
    returned model object contains comple and correct column values.
    Otherwise, auto incremental value is set to the returned model object
    but other column values generated inside DBMS such as default value are not set.

    Args:
        db: DB connection.
        record: Object contains column values.
        qualifier: Functions qualifying placeholder markers.
        returning: Flag to return inserted records with complete and correct column values.
    Returns:
        Models of inserted records or cursor.
    """
    if len(records) == 0:
        return []

    models: list[Self] = [cast(Self, r) if isinstance(r, cls) else cls(**cast(dict, r)) for r in records]

    seq_of_params = []

    sql, cols, params = cls._insert_sql(models[0], qualifier)

    cols = set(cols)
    seq_of_params.append(params)

    for m in models[1:]:
        value_dict = model_values(cls, m)
        check_columns(cls, value_dict, lambda c: c.name in cols, requires_all=True)
        # REVIEW:
        # The consistency among columns where expression is set is not checked.
        _, _, params = cls._insert_sql(m, qualifier)
        seq_of_params.append(params)

    db.stmt().executemany(sql, seq_of_params)
    num = len(records)
    for c, v in cls.last_sequences(db, num):
        for i, m in enumerate(models):
            setattr(m, c.name, v - (num - i - 1))

    if returning:
        seq_pks = [extract_pks(cls, m) for m in models]
        return cls.fetch_many(db, seq_pks)
    else:
        return models
def last_sequences(db: Connection, num: int) ‑> list[tuple[Column, int]]

Returns the sequential (auto incremental) values of a table generated by the latest insertion.

Result contains every sequential columns and their values. When the latest query inserts multiple rows, only the last (= biggest) value is returned.

This method should be overridden by another mixin class defined in dialect module.

Args

db
DB connection.
num
The number of records inserted by the latest query.

Returns

List of pairs of column and its values.

Expand source code
@classmethod
def last_sequences(cls, db: Connection, num: int) -> list[tuple[Column, int]]:
    """
    Returns the sequential (auto incremental) values of a table generated by the latest insertion.

    Result contains every sequential columns and their values.
    When the latest query inserts multiple rows, only the last (= biggest) value is returned.

    This method should be overridden by another mixin class defined in dialect module.

    Args:
        db: DB connection.
        num: The number of records inserted by the latest query.
    Returns:
        List of pairs of column and its values.
    """
    return []
def support_returning(db: Connection) ‑> bool

Checks whehter this DBMS support RETURNING clause or not.

Args

db
DB connection.

Returns

Whehter this DBMS support RETURNING clause or not.

Expand source code
@classmethod
def support_returning(cls, db: Connection) -> bool:
    """
    Checks whehter this DBMS support **RETURNING** clause or not.

    Args:
        db: DB connection.
    Returns:
        Whehter this DBMS support **RETURNING** clause or not.
    """
    return False
def update(db: Connection, pks: Union[Any, dict[str, Any]], record: Union[Meta, dict[str, Any]], qualifier: collections.abc.Mapping[str, collections.abc.Callable[[str], str]] = {}, /, returning: bool = False)

Update a record by primary key(s).

This method only updates columns which are found in record except for primary key(s).

t.update(db, 1, dict(c1=1, c2=2))
# SQL: UPDATE t SET c1 = 1, c2 = 2 WHERE id = 1

Args

db
DB connection.
pks
Primary key value(s).
record
Object contains column values.
qualifier
Functions qualifying placeholder markers.
returning
Flag to return updated records with complete and correct column values.

Returns

Whether the record exists and updated or updated record model.

Expand source code
@classmethod
def update(
    cls,
    db: Connection,
    pks: PKS,
    record: Record,
    qualifier: Mapping[str, Qualifier] = {},
    /,
    returning: bool = False,
):
    """
    Update a record by primary key(s).

    This method only updates columns which are found in `record` except for primary key(s).

    ```python
    t.update(db, 1, dict(c1=1, c2=2))
    # SQL: UPDATE t SET c1 = 1, c2 = 2 WHERE id = 1
    ```

    Args:
        db: DB connection.
        pks: Primary key value(s).
        record: Object contains column values.
        qualifier: Functions qualifying placeholder markers.
        returning: Flag to return updated records with complete and correct column values.
    Returns:
        Whether the record exists and updated or updated record model.
    """
    cols, vals = parse_pks(cls, pks)
    condition = Conditional.all([Q.eq(**{c: v}) for c, v in zip(cols, vals)])
    if returning:
        if cls.support_returning(db):
            models = cls.update_where(db, record, condition, qualifier, returning=True)
            return models[0] if models else None
        else:
            models = cls.update_where(db, record, condition, qualifier, returning=False)
            return cls.fetch(db, pks)
    else:
        return cls.update_where(db, record, condition, qualifier, returning=False) == 1
def update_many(db: Connection, records: collections.abc.Sequence[typing.Union[Meta, dict[str, typing.Any]]], qualifier: collections.abc.Mapping[str, collections.abc.Callable[[str], str]] = {}, /, returning: bool = False)

Update records by set of primary key(s).

This method invokes on executemany defined in DB-API 2.0. Whether it is optimized compared to execute depends on DB driver.

Args

db
DB connection.
record
Sequence of objects contains column values.
qualifier
Functions qualifying placeholder markers.
returning
Flag to return updated records with complete and correct column values.

Returns

The number of affected rows or updated records.

Expand source code
@classmethod
def update_many(
    cls,
    db: Connection,
    records: Sequence[Record],
    qualifier: Mapping[str, Qualifier] = {},
    /,
    returning: bool = False,
):
    """
    Update records by set of primary key(s).

    This method invokes on `executemany` defined in DB-API 2.0.
    Whether it is optimized compared to `execute` depends on DB driver.

    Args:
        db: DB connection.
        record: Sequence of objects contains column values.
        qualifier: Functions qualifying placeholder markers.
        returning: Flag to return updated records with complete and correct column values.
    Returns:
        The number of affected rows or updated records.
    """
    if len(records) == 0:
        return [] if returning else 0

    keys = {c.name for c in cls.columns if c.pk}
    if len(keys) == 0:
        raise ValueError(f"update_many is not available because {cls} does not have primary key columns.")

    def classify(acc: tuple[dict[str, Any], dict[str, Any]], cv: tuple[str, Any]):
        if cv[0] in keys:
            acc[0][cv[0]] = cv[1]
        else:
            acc[1][cv[0]] = cv[1]
        return acc

    seq_of_values: list[tuple[dict[str, Any], dict[str, Any]]] = []
    target_columns: Optional[set[str]] = None

    for vs in [model_values(cls, r, excludes_pk=False) for r in records]:
        if not keys < vs.keys():
            raise ValueError(f"Every row must contain values of all primary keys and at least one update column value.")
        pks, rec = reduce(classify, vs.items(), ({}, {}))
        if target_columns is None:
            check_columns(cls, rec)
            target_columns = set(rec.keys())
        else:
            check_columns(cls, rec, lambda c: c.name in target_columns, True) # type: ignore
        seq_of_values.append((pks, rec))

    sql_first = ""
    seq_of_params: list[list[Any]] = []

    for pks, rec in seq_of_values:
        cols, vals = parse_pks(cls, pks)
        condition = Conditional.all([Q.eq(**{c: v}) for c, v in zip(cols, vals)])

        sql, _, params = cls._update_sql(rec, condition, qualifier)
        if not sql_first:
            sql_first = sql
        seq_of_params.append(params)

    if returning:
        db.stmt().executemany(f"{sql_first}", seq_of_params)
        return cls.fetch_many(db, [pks for pks, _ in seq_of_values])
    else:
        return db.stmt().executemany(sql_first, seq_of_params).rowcount
def update_where(db: Connection, record: Union[Meta, dict[str, Any]], condition: Conditional, qualifier: collections.abc.Mapping[str, collections.abc.Callable[[str], str]] = {}, /, returning: bool = False, allow_all: bool = True)

Update records which satisfy the condition.

t.update(db, dict(c2=2), Q.eq(c1=1))
# SQL: UPDATE t SET c2 = 2 WHERE c1 = 1

Args

db
DB connection.
record
Object contains column values.
condition
Query condition.
qualifier
Functions qualifying placeholder markers.
returning
Flag to return updated records with complete and correct column values.
allow_all
If False, empty condition raises ValueError.

Returns

The number of affected rows or updated records.

Expand source code
@classmethod
def update_where(
    cls,
    db: Connection,
    record: Record,
    condition: Conditional,
    qualifier: Mapping[str, Qualifier] = {},
    /,
    returning: bool = False,
    allow_all: bool = True,
):
    """
    Update records which satisfy the condition.

    ```python
    t.update(db, dict(c2=2), Q.eq(c1=1))
    # SQL: UPDATE t SET c2 = 2 WHERE c1 = 1
    ```

    Args:
        db: DB connection.
        record: Object contains column values.
        condition: Query condition.
        qualifier: Functions qualifying placeholder markers.
        returning: Flag to return updated records with complete and correct column values.
        allow_all: If `False`, empty condition raises `ValueError`.
    Returns:
        The number of affected rows or updated records.
    """
    sql, _, params = cls._update_sql(record, condition, qualifier, allow_all)

    if returning:
        if cls.support_returning(db):
            c = db.stmt().execute(f"{sql} RETURNING *", *params)
            s = cls.select()
            return [read_row(row, *s)[0] for row in c.fetchall()]
        else:
            raise NotImplementedError(f"RETURNING is not supported and there is no way to fetch updated rows exactly.")
    else:
        c = db.stmt().execute(sql, *params)

        return c.rowcount

Inherited members