Package pyracmon

Base module of pyracmon exporting commonly used objects. Use * simply to import them.

>>> from pyracmon import *
Expand source code
"""
Base module of pyracmon exporting commonly used objects.  Use `*` simply to import them.

>>> from pyracmon import *
"""
import sys
import types
from typing import Union, Optional, Any, TypeVar, TYPE_CHECKING
from pyracmon.config import default_config
from pyracmon.connection import connect, Connection
from pyracmon.context import ConnectionContext
from pyracmon.graph.serialize import NodeSerializer
from pyracmon.mixin import CRUDMixin
from pyracmon.select import read_row
from pyracmon.model import define_model, Table, Column
from pyracmon.model_graph import GraphEntityMixin
from pyracmon.query import Q, Expression, Conditional, escape_like, where
from pyracmon.query_graph import append_rows
from pyracmon.clause import order_by, ranged_by, holders, values
from pyracmon.stub import output_stub
from pyracmon.graph import new_graph, S
from pyracmon.graph.graph import Graph, GraphView, NodeContainer, ContainerView, Node, NodeView
from pyracmon.graph.spec import GraphSpec
from pyracmon.graph.template import GraphTemplate
from pyracmon.graph.schema import document_type, Typeable, GraphSchema
from pyracmon.graph.serialize import NodeContext
from pyracmon.graph.typing import walk_schema
from pyracmon.testing import TestingMixin


# For static type checkers, `Model` is declared here as a subclass of
# `pyracmon.model.Model`; at runtime the name is imported from `pyracmon.model` as it is.
if TYPE_CHECKING:
    from pyracmon.model import Model as _Model
    class Model(_Model):
        pass
else:
    from pyracmon.model import Model


# Names exported by `from pyracmon import *`.
# The last four entries are the functions defined in this module.
__all__ = [
    "connect",
    "Connection",
    "ConnectionContext",
    "CRUDMixin",
    "read_row",
    "define_model",
    "Table",
    "Column",
    "Q",
    "Expression",
    "Conditional",
    "where",
    "append_rows",
    "escape_like",
    "order_by",
    "ranged_by",
    "holders",
    "values",
    "new_graph",
    "S",
    "Graph",
    "GraphView",
    "NodeContainer",
    "ContainerView",
    "Node",
    "NodeView",
    "document_type",
    "Typeable",
    "walk_schema",
    "GraphSchema",
    "NodeContext",
    "Model",
    "declare_models",
    "graph_template",
    "graph_dict",
    "graph_schema",
]


# Type variable bound to `Model`, used to type the model types returned by `declare_models`.
M = TypeVar('M', bound=Model)


def declare_models(
    dialect: types.ModuleType,
    db: Connection,
    module: Union[types.ModuleType, str] = __name__,
    mixins: list[type] = [],
    excludes: Optional[list[str]] = None,
    includes: Optional[list[str]] = None,
    *,
    testing: bool = False,
    model_type: type[M] = Model,
    write_stub: bool = False,
) -> list[type[M]]:
    """
    Read table schemas from the database and declare a model type for each of them.

    Each declared model type is stored in the target module under the name of its table.

    Args:
        dialect: A module exporting `read_schema` function and `mixins` classes.
            `pyracmon.dialect.postgresql` and `pyracmon.dialect.mysql` are available.
        db: Connection already connected to database.
        module: A module or module name where the declarations will be located.
            A name is looked up in `sys.modules`.
        mixins: Additional mixin classes for declaring model types.
            They precede the mixins of the dialect and the default ones.
        excludes: Excluding table names.
        includes: Including table names. When this argument is omitted, all tables except for specified in `excludes` are declared.
        testing: When `True`, `TestingMixin` is inserted in front of the default mixins.
        model_type: Type placed after the default mixins.
        write_stub: When `True`, `output_stub` is invoked for the declared models.
    Returns:
        Declared model types, one per table read by the dialect.
    """
    tables = dialect.read_schema(db, excludes, includes)

    if isinstance(module, types.ModuleType):
        target = module
    else:
        target = sys.modules[module]

    defaults = [CRUDMixin, GraphEntityMixin, model_type]
    if testing:
        defaults.insert(0, TestingMixin)

    declared = []
    for table in tables:
        # Concatenation builds a new list, so the shared default of `mixins` is never mutated.
        model = define_model(table, mixins + dialect.mixins + defaults)
        target.__dict__[table.name] = model
        declared.append(model)

    if write_stub:
        output_stub(None, target, declared, dialect, mixins, testing=testing)

    return declared


def graph_template(*bases: GraphTemplate, **definitions: type) -> GraphTemplate:
    """
    Build a graph template with the `GraphSpec` of the default configuration.

    See `pyracmon.graph.GraphSpec.new_template` for the detail of definitions.

    Args:
        bases: Base templates forwarded to `new_template`.
        definitions: Definitions of template properties forwarded to `new_template`.
    Returns:
        Graph template.
    """
    spec = default_config().graph_spec
    return spec.new_template(*bases, **definitions)


def graph_dict(graph: GraphView, **settings: NodeSerializer) -> dict[str, Any]:
    """
    Convert a graph into a `dict` with the `GraphSpec` of the default configuration.

    See `pyracmon.graph.GraphSpec.to_dict` for the detail of serialization settings.

    Args:
        graph: A view of the graph.
        settings: Serialization settings where each key denotes a node name.
    Returns:
        Serialization result.
    """
    spec = default_config().graph_spec
    # An empty dict is always given as the second positional argument of `to_dict`.
    return spec.to_dict(graph, {}, **settings)


def graph_schema(template: GraphTemplate, **settings: NodeSerializer) -> GraphSchema:
    """
    Build a `GraphSchema` with the `GraphSpec` of the default configuration.

    See `pyracmon.graph.GraphSpec.to_schema` for the detail of serialization settings.

    Args:
        template: A template of serializing graph.
        settings: Serialization settings where each key denotes a node name.
    Returns:
        Schema of serialization result.
    """
    spec = default_config().graph_spec
    return spec.to_schema(template, **settings)

Sub-modules

pyracmon.batch
pyracmon.clause

This module provides functions to generate miscellaneous clauses in query.

pyracmon.config

This module exports types and functions for configurations …

pyracmon.connection

This module provides types and functions for DB connections.

pyracmon.context

This module provides the context type which controls query execution as configured.

pyracmon.dbapi

This module provides interfaces defined in DB-API 2.0.

pyracmon.dialect
pyracmon.graph
pyracmon.marker

This module provides the abstraction mechanism for marker creation used to embed parameters in a query …

pyracmon.mixin

This module provides mixin type which supplies each model type various DB operations as class methods.

pyracmon.model
pyracmon.model_graph

This module provides graph specifications to deal with model types …

pyracmon.query

This module exports types and functions for query construction …

pyracmon.query_graph
pyracmon.select

This module exports types and functions used for SELECT queries …

pyracmon.sql

This module provides the type for query generation from a template string containing unified marker.

pyracmon.stub

This module exports functions to output type stub of model types.

pyracmon.testing
pyracmon.util

Utility types and functions for internal use.

Functions

def append_rows(cursor: Cursor, exp: collections.abc.Iterable[typing.Union[Consumable, typing.Any]], graph: Graph, /, **assign: Union[Selection, Any]) ‑> Graph

Adds all rows in cursor into the graph.

Values in assign are Selection or any kind of objects. If Selection is passed, the corresponding value in row is selected. In this case, the Selection must be contained in exp , otherwise ValueError is raised.

exp = ...
c = db.stmt().execute(...)
graph = append_rows(c, exp, new_graph(SomeGraph), a=exp.a, b=exp.b, c=0)

Args

cursor
Cursor obtained by query.
exp
Expressions used in the query.
graph
Graph to append rows.
assign
Mapping from graph property name to Selection or arbitrary value.

Returns

The same graph as passed one.

Expand source code
def append_rows(cursor: Cursor, exp: Iterable[Union[Consumable, Any]], graph: Graph, /, **assign: Union[Selection, Any]) -> Graph:
    """
    Adds all rows in cursor into the graph.

    Values in `assign` are `Selection` or any kind of objects.
    If `Selection` is passed, the corresponding value in row is selected.
    In this case, the `Selection` must be contained in `exp` , otherwise `ValueError` is raised.

    ```python
    exp = ...
    c = db.stmt().execute(...)
    graph = append_rows(c, exp, new_graph(SomeGraph), a=exp.a, b=exp.b, c=0)
    ```

    Args:
        cursor: Cursor obtained by query. All rows are fetched by `fetchall()`.
        exp: Expressions used in the query. Any iterable is accepted; it is iterated only once.
        graph: Graph to append rows.
        assign: Mapping from graph property name to `Selection` or arbitrary value.
    Returns:
        The same graph as passed one.
    """
    # `exp` is unpacked for every row, so a one-shot iterator would be exhausted
    # after the first row unless it is materialized beforehand.
    selections = tuple(exp)

    def resolve(values: Any, v: Any) -> Any:
        # A consumable is looked up by its name in the values read from the row;
        # any other object is used as it is.
        if isinstance(v, Consumable):
            return getattr(values, v.name)
        return v

    for row in cursor.fetchall():
        values = read_row(row, *selections)
        graph.append(**{k: resolve(values, v) for k, v in assign.items()})

    return graph
def connect(api: module, *args: Any, **kwargs: Any) ‑> Connection

Connects to DB by passing arguments to DB-API 2.0 module.

Every optional argument is passed to api.connect and returns the Connection object which wraps obtained DB connection.

import psycopg2
from pyracmon import connect
db = connect(psycopg2, host="localhost", port=5432, dbname="pyracmon", user="postgres", password="postgres")
c = db.stmt().execute("SELECT 1")
assert c.fetchone()[0] == 1

Args

api
DB-API 2.0 module which exports connect() function.
args
Positional arguments passed to api.connect.
kwargs
Keyword arguments passed to api.connect.

Returns

Wrapper of DB-API 2.0 connection.

Expand source code
def connect(api: types.ModuleType, *args: Any, **kwargs: Any) -> 'Connection':
    """
    Opens a DB connection through a DB-API 2.0 module and wraps it.

    All positional and keyword arguments are handed to `api.connect`, and the obtained DB connection is wrapped by `Connection`.

    ```python
    import psycopg2
    from pyracmon import connect
    db = connect(psycopg2, host="localhost", port=5432, dbname="pyracmon", user="postgres", password="postgres")
    c = db.stmt().execute("SELECT 1")
    assert c.fetchone()[0] == 1
    ```

    Args:
        api: DB-API 2.0 module which exports `connect` function.
        args: Positional arguments passed to `api.connect`.
        kwargs: Keyword arguments passed to `api.connect`.
    Returns:
        Wrapper of DB-API 2.0 connection.
    """
    raw_connection = api.connect(*args, **kwargs)
    return Connection(api, raw_connection, None)
def declare_models(dialect: module, db: Connection, module: Union[module, str] = 'pyracmon', mixins: list[type] = [], excludes: Optional[list[str]] = None, includes: Optional[list[str]] = None, *, testing: bool = False, model_type: type[~M] = pyracmon.model.Model, write_stub: bool = False) ‑> list[type[~M]]

Declare model types read from database into the specified module.

Args

dialect
A module exporting read_schema function and mixins classes. pyracmon.dialect.postgresql and pyracmon.dialect.mysql are available.
db
Connection already connected to database.
module
A module or module name where the declarations will be located.
mixins
Additional mixin classes for declaring model types.
excludes
Excluding table names.
includes
Including table names. When this argument is omitted, all tables except for specified in excludes are declared.

Returns

Declared model types.

Expand source code
def declare_models(
    dialect: types.ModuleType,
    db: Connection,
    module: Union[types.ModuleType, str] = __name__,
    mixins: list[type] = [],
    excludes: Optional[list[str]] = None,
    includes: Optional[list[str]] = None,
    *,
    testing: bool = False,
    model_type: type[M] = Model,
    write_stub: bool = False,
) -> list[type[M]]:
    """
    Read table schemas from the database and declare a model type for each of them.

    Each declared model type is stored in the target module under the name of its table.

    Args:
        dialect: A module exporting `read_schema` function and `mixins` classes.
            `pyracmon.dialect.postgresql` and `pyracmon.dialect.mysql` are available.
        db: Connection already connected to database.
        module: A module or module name where the declarations will be located.
            A name is looked up in `sys.modules`.
        mixins: Additional mixin classes for declaring model types.
            They precede the mixins of the dialect and the default ones.
        excludes: Excluding table names.
        includes: Including table names. When this argument is omitted, all tables except for specified in `excludes` are declared.
        testing: When `True`, `TestingMixin` is inserted in front of the default mixins.
        model_type: Type placed after the default mixins.
        write_stub: When `True`, `output_stub` is invoked for the declared models.
    Returns:
        Declared model types, one per table read by the dialect.
    """
    tables = dialect.read_schema(db, excludes, includes)

    if isinstance(module, types.ModuleType):
        target = module
    else:
        target = sys.modules[module]

    defaults = [CRUDMixin, GraphEntityMixin, model_type]
    if testing:
        defaults.insert(0, TestingMixin)

    declared = []
    for table in tables:
        # Concatenation builds a new list, so the shared default of `mixins` is never mutated.
        model = define_model(table, mixins + dialect.mixins + defaults)
        target.__dict__[table.name] = model
        declared.append(model)

    if write_stub:
        output_stub(None, target, declared, dialect, mixins, testing=testing)

    return declared
def define_model(table_: Table, mixins: Union[type[~MXT], list[type], ForwardRef(None)] = None, model_type: Optional[type[~M]] = pyracmon.model.Model) ‑> type[~M]

Create a model type representing a table.

Model type inherits all types in mixins in order. When the same attribute is defined in multiple mixin types, the former overrides the latter.

Every model type has following attributes:

name type description
name str Name of the table.
table Table Table schema.
columns List[Column] List of column schemas.
column Any An object whose attributes expose each column schema by column name.

Model instances are created by passing the constructor keyword arguments composed of column names and values like builtin dataclass. Unlike dataclass, the constructor does not require all of columns. Omitted columns don't affect predefined operations such as CRUDMixin.insert() . If not null constraint exists on the column, insertion will be denied at runtime and exception will be thrown.

>>> # CREATE TABLE t1 (col1 int, col2 text, col3 text);
>>> table = define_model("t1")
>>> model = table(col1=1, col2="a")

Attributes are also assignable by normal setter. If attribute name is not a valid column name, TypeError raises.

>>> model.col3 = "b"

Model instance supports iteration which yields pairs of assigned column schema and its value.

>>> for c, v in model:
>>>     print(f"{c.name} = {v}")
col1 = 1
col2 = a
col3 = b

Args

table_
Table schema.
mixins
Mixin types providing class methods to the model type.
model_type
Use this just for type hinting to determine returned model type.

Returns

Model type.

Expand source code
def define_model(table_: Table, mixins: Union[type[MXT], list[type], None] = None, model_type: Optional[type[M]] = Model) -> type[M]:
    """
    Create a model type representing a table.

    Model type inherits all types in `mixins` in order.
    When the same attribute is defined in multiple mixin types, the former overrides the latter.

    Every model type has following attributes:

    |name|type|description|
    |:---|:---|:---|
    |name|`str`|Name of the table.|
    |table|`Table`|Table schema.|
    |columns|`List[Column]`|List of column schemas.|
    |column|`Any`|An object whose attributes expose each column schema by column name.|

    Model instances are created by passing the constructor keyword arguments composed of column names and values like builtin dataclass.
    Unlike dataclass, the constructor does not require all of columns.
    Omitted columns don't affect predefined operations such as `CRUDMixin.insert` .
    If `not null` constraint exists on the column, insertion will be denied at runtime and exception will be thrown.

    ```python
    >>> # CREATE TABLE t1 (col1 int, col2 text, col3 text);
    >>> table = define_model(t1)  # t1: `Table` schema of the table above
    >>> model = table(col1=1, col2="a")
    ```

    Attributes are also assignable by normal setter. If attribute name is not a valid column name, `TypeError` raises.

    ```python
    >>> model.col3 = "b"
    ```

    Model instance supports iteration which yields pairs of assigned column schema and its value.

    ```python
    >>> for c, v in model:
    >>>     print(f"{c.name} = {v}")
    col1 = 1
    col2 = a
    col3 = b
    ```

    Args:
        table_: Table schema.
        mixins: Mixin types providing class methods to the model type.
            Either a list of types or a parameterized generic whose type arguments are the mixin types.
        model_type: Use this just for type hinting to determine returned model type.
            It is not referenced in the function body.
    Returns:
        Model type.
    Raises:
        ValueError: `mixins` is neither a list, a parameterized generic nor `None`.
    """
    # Names of all columns; used to validate attribute assignment and to compare instances.
    column_names = {c.name for c in table_.columns}

    # Holder object exposing each column schema as an attribute named after the column.
    class Columns:
        def __init__(self):
            for c in table_.columns:
                setattr(self, c.name, c)

    # Metaclass holding the table metadata, which makes it available as attributes of the model type.
    class Meta(type):
        name = table_.name
        table = table_
        columns = table_.columns
        column = Columns()

        @classmethod
        def shrink(cls, excludes: list[str], includes: Optional[list[str]] = None) -> Self:
            """
            Creates new model type containing subset of columns.

            Args:
                excludes: Column names to exclude.
                includes: Column names to include. When empty or `None`, every column not listed in `excludes` is kept.
            Returns:
                model type.
            """
            cols = [c for c in cls.columns if (not includes or c.name in includes) and c.name not in excludes]
            # The new model type is defined with the same `mixins` as this one.
            return define_model(Table(cls.name, cols, cls.table.comment), mixins) # type: ignore

    class Base(Model, metaclass=Meta):
        pass

    mixin_types: list[type] = []

    # Normalize `mixins` into a list of types.
    if isinstance(mixins, list):
        mixin_types = mixins
    elif get_origin(mixins) is not None:
        # A parameterized generic carries the mixin types as its type arguments.
        mixin_types = cast(list[type], list(get_args(mixins)))
    elif mixins is not None:
        raise ValueError(f"Model mixin types should be specified by Mixins or a list of types.")

    # The concrete model type derives from a dynamically created class whose bases are `Base` followed by the mixins.
    class _Model(type("ModelBase", tuple([Base] + mixin_types), {})):
        def __init__(self, **kwargs):
            # Every keyword goes through `__setattr__`, hence non-column names are rejected.
            for k, v in kwargs.items():
                setattr(self, k, v)

        def __repr__(self):
            cls = cast(type[Base], type(self))
            return f"{cls.name}({', '.join([f'{c.name}={repr(getattr(self, c.name))}' for c in cls.columns if hasattr(self, c.name)])})"

        def __str__(self):
            cls = cast(type[Base], type(self))
            return f"{cls.name}({', '.join([f'{c.name}={str(getattr(self, c.name))}' for c in cls.columns if hasattr(self, c.name)])})"

        def __iter__(self) -> Iterator[tuple[Column, Any]]:
            # Yields only the columns which have a value on this instance.
            cls = cast(type[Base], type(self))
            return map(lambda c: (c, getattr(self, c.name)), filter(lambda c: hasattr(self, c.name), cls.columns))

        def __setattr__(self, key, value):
            cls = cast(type[Base], type(self))
            if key not in column_names:
                raise TypeError(f"{key} is not a column of {cls.name}")
            object.__setattr__(self, key, value)

        def __getitem__(self, key):
            return getattr(self, key)

        def __contains__(self, key):
            return hasattr(self, key)

        def __eq__(self, other):
            # Instances are equal when they are of the same type, have values on the same columns
            # and every value is equal.
            cls = type(self)
            if cls != type(other):
                return False
            for k in column_names:
                if hasattr(self, k) ^ hasattr(other, k):
                    return False
                if getattr(self, k, None) != getattr(other, k, None):
                    return False
            return True

    return cast(type[M], _Model)
def document_type(t: type, doc: str) ‑> Annotated

Supplies a document to a type.

Args

t
A type.
doc
A document.

Returns

Documented type.

Expand source code
def document_type(t: type, doc: str) -> Annotated:
    """
    Attaches a document to a type.

    Args:
        t: A type.
        doc: A document.
    Returns:
        `Annotated` form of the type whose metadata is the document.
    """
    documented = Annotated[t, doc]
    return documented
def escape_like(v: str) ‑> str

Escape a string for the use in LIKE condition.

Args

v
A string.

Returns

Escaped string.

Expand source code
def escape_like(v: str) -> str:
    """
    Escape special characters of `LIKE` condition in a string.

    A backslash is replaced with four backslashes, while `%` and `_` are prefixed with a backslash.

    Args:
        v: A string.
    Returns:
        Escaped string.
    """
    replacements = {
        "\\": r"\\\\",
        "%": r"\%",
        "_": r"\_",
    }
    # Characters without an entry are kept as they are.
    return ''.join(replacements.get(ch, ch) for ch in v)
def graph_dict(graph: GraphView, **settings: NodeSerializer) ‑> dict[str, typing.Any]

Serialize a graph into a dict under the default GraphSpec .

See GraphSpec.to_dict() for the detail of serialization settings.

Args

graph
A view of the graph.
settings
Serialization settings where each key denotes a node name.

Returns

Serialization result.

Expand source code
def graph_dict(graph: GraphView, **settings: NodeSerializer) -> dict[str, Any]:
    """
    Convert a graph into a `dict` with the `GraphSpec` of the default configuration.

    See `pyracmon.graph.GraphSpec.to_dict` for the detail of serialization settings.

    Args:
        graph: A view of the graph.
        settings: Serialization settings where each key denotes a node name.
    Returns:
        Serialization result.
    """
    spec = default_config().graph_spec
    # An empty dict is always given as the second positional argument of `to_dict`.
    return spec.to_dict(graph, {}, **settings)
def graph_schema(template: GraphTemplate, **settings: NodeSerializer) ‑> GraphSchema

Creates GraphSchema under the default GraphSpec .

See GraphSpec.to_schema() for the detail of serialization settings.

Args

template
A template of serializing graph.
settings
Serialization settings where each key denotes a node name.

Returns

Schema of serialization result.

Expand source code
def graph_schema(template: GraphTemplate, **settings: NodeSerializer) -> GraphSchema:
    """
    Build a `GraphSchema` with the `GraphSpec` of the default configuration.

    See `pyracmon.graph.GraphSpec.to_schema` for the detail of serialization settings.

    Args:
        template: A template of serializing graph.
        settings: Serialization settings where each key denotes a node name.
    Returns:
        Schema of serialization result.
    """
    spec = default_config().graph_spec
    return spec.to_schema(template, **settings)
def graph_template(*bases: GraphTemplate, **definitions: type) ‑> GraphTemplate

Create a graph template on the default GraphSpec which handles model object in appropriate ways.

See GraphSpec.new_template() for the detail of definitions.

Args

bases
Base templates whose properties and relations are merged into new template.
definitions
Definitions of template properties.

Returns

Graph template.

Expand source code
def graph_template(*bases: GraphTemplate, **definitions: type) -> GraphTemplate:
    """
    Build a graph template with the `GraphSpec` of the default configuration.

    See `pyracmon.graph.GraphSpec.new_template` for the detail of definitions.

    Args:
        bases: Base templates forwarded to `new_template`.
        definitions: Definitions of template properties forwarded to `new_template`.
    Returns:
        Graph template.
    """
    spec = default_config().graph_spec
    return spec.new_template(*bases, **definitions)
def holders(length_or_keys: Union[int, collections.abc.Sequence[Union[str, int, NoneType, Expression]]], qualifier: Optional[collections.abc.Mapping[int, collections.abc.Callable[[str], str]]] = None) ‑> str

Generates partial query string containing placeholder markers separated by comma.

Args

length_or_keys
The number of placeholders or list of placeholder keys.
qualifier
Qualifying function for each index.

Returns

Query string.

Expand source code
def holders(length_or_keys: Union[int, Sequence[HolderKeys]], qualifier: Optional[Mapping[int, Qualifier]] = None) -> str:
    """
    Builds a comma-separated sequence of placeholder markers.

    Args:
        length_or_keys: The number of placeholders or list of placeholder keys.
            For keys, an `Expression` renders its `expression`, an integer `n` renders `${_n}`,
            a non-empty key `k` renders `${k}` and any other falsy key renders `${_}`.
        qualifier: Qualifying function for each index. Markers at indexes without an entry go through `_noop`.
    Returns:
        Query string.
    """
    def marker_of(key) -> str:
        if isinstance(key, Expression):
            return key.expression
        if isinstance(key, int):
            return f"${{_{key}}}"
        if key:
            return f"${{{key}}}"
        return "${_}"

    if isinstance(length_or_keys, int):
        markers = ["${_}" for _ in range(length_or_keys)]
    else:
        markers = list(map(marker_of, length_or_keys))

    if qualifier:
        markers = [qualifier.get(index, _noop)(marker) for index, marker in enumerate(markers)]

    return ', '.join(markers)
def new_graph(template: GraphTemplate, *bases: Union[Graph, GraphView]) ‑> Graph

Create a graph from a template.

Use this function instead of invoking constructor directly.

Args

template
A template of a graph.
bases
Other graphs whose nodes are appended to created graph.

Returns

Created graph.

Expand source code
def new_graph(template: GraphTemplate, *bases: Union[Graph, GraphView]) -> Graph:
    """
    Instantiate a graph of a template.

    Use this function instead of invoking constructor directly.

    Args:
        template: A template of a graph.
        bases: Other graphs which are added to created graph by `+=` in the given order.
    Returns:
        Created graph.
    """
    created = Graph(template)

    for base in bases:
        created += base

    return created
def order_by(columns: collections.abc.Mapping[typing.Union[str, AliasedColumn], typing.Union[bool, tuple[bool, bool], str]], **defaults: Union[bool, tuple[bool, bool], str]) ‑> str

Generates ORDER BY clause from columns and directions.

Args

columns
Columns and directions. Iteration order is kept in rendered clause.
defaults
Column names and directions appended to the clause if the column is not contained in columns .

Returns

ORDER BY clause.

Expand source code
def order_by(columns: Mapping[Union[str, AliasedColumn], ORDER], **defaults: ORDER) -> str:
    """
    Renders `ORDER BY` clause from columns and their directions.

    A direction is one of the following:

    - `bool`: `True` renders `ASC` and `False` renders `DESC`.
    - `str`: Rendered after the column as it is.
    - Pair of `bool`: The first decides `ASC` / `DESC` , the second decides `NULLS FIRST` / `NULLS LAST` .

    Args:
        columns: Columns and directions. Iteration order is kept in rendered clause.
        defaults: Column names and directions appended to the clause if the column is not contained in `columns` .
    Returns:
        `ORDER BY` clause, or an empty string when there is no column.
    Raises:
        ValueError: A direction is none of the forms above.
    """
    ordering = dict(columns)
    for name, direction in defaults.items():
        # Defaults never override what is given explicitly.
        if name not in columns:
            ordering[name] = direction

    if not ordering:
        return ''

    def render(column, direction) -> str:
        # bool has to be checked first to match the original precedence of the checks.
        if isinstance(direction, bool):
            return f"{column} ASC" if direction else f"{column} DESC"
        if isinstance(direction, str):
            return f"{column} {direction}"
        if isinstance(direction, tuple) and len(direction) == 2:
            ascending, nulls_first = direction
            return f"{column} {'ASC' if ascending else 'DESC'} NULLS {'FIRST' if nulls_first else 'LAST'}"
        raise ValueError(f"Directions must be specified by bool, pair of bools or string: {direction}")

    rendered = [render(column, direction) for column, direction in ordering.items()]
    return f"ORDER BY {', '.join(rendered)}"
def ranged_by(limit: Optional[int] = None, offset: Optional[int] = None) ‑> tuple[str, list[typing.Any]]

Generates LIMIT OFFSET clause using marker.

Args

limit
Limit value. None means no limitation.
offset
Offset value. None means 0.

Returns

LIMIT OFFSET clause and its parameters.

Expand source code
def ranged_by(limit: Optional[int] = None, offset: Optional[int] = None) -> tuple[str, list[Any]]:
    """
    Renders `LIMIT OFFSET` clause using marker.

    Args:
        limit: Limit value. `None` omits `LIMIT` from the clause.
        offset: Offset value. `None` omits `OFFSET` from the clause.
    Returns:
        `LIMIT OFFSET` clause and its parameters. The clause is an empty string when both values are `None`.
    """
    candidates = (("LIMIT $_", limit), ("OFFSET $_", offset))
    # Only `None` is regarded as absent; 0 is a valid value.
    given = [(marker, value) for marker, value in candidates if value is not None]

    clause = ' '.join(marker for marker, _ in given)
    params = [value for _, value in given]
    return clause, params
def read_row(row, *selections: Union[Consumable, str, tuple], allow_redundancy: bool = False) ‑> RowValues

Read values in a row according to given selections.

This function returns RowValues where each value is created by each selection respectively. The type of the selection determines how values in the row are handled:

  • Selection consumes as many values as the number of columns in it and creates a model instance.
  • Empty tuple or a string consumes a value, which is stored in RowValues as it is.

Args

selections
List of selections or their equivalents.
allow_redundancy
If False, ValueError is thrown when not all values in a row are consumed.

Returns

Values read from the row according to the selections.

Expand source code
def read_row(row, *selections: Union[Consumable, str, tuple], allow_redundancy: bool = False) -> RowValues:
    """
    Read values in a row according to given selections.

    Every selection is converted by `Consumable.to_consumable` .
    Each of them consumes values from the head of the remaining row and its result is appended to `RowValues` .

    - `Selection` consumes as many values as the number of columns in it and creates a model instance.
    - Empty tuple or a string consumes a value, which is stored in `RowValues` as it is.

    Args:
        selections: List of selections or their equivalents.
        allow_redundancy: If `False`, `ValueError` is thrown when not all values in a row are consumed.
    Returns:
        Values read from the row according to the selections.
    Raises:
        ValueError: Values remain in the row while `allow_redundancy` is `False`.
    """
    consumables = [Consumable.to_consumable(s) for s in selections]
    result = RowValues(consumables)

    remaining = row
    for consumer in consumables:
        result.append(consumer.consume(remaining))
        # Drop the values consumed by this selection.
        remaining = remaining[len(consumer):]

    if allow_redundancy:
        return result

    if len(remaining) > 0:
        raise ValueError("Not all elements in row is consumed.")

    return result
def values(length_or_key_gen: Union[int, collections.abc.Sequence[collections.abc.Callable[[int], Union[str, int, NoneType, Expression]]]], rows: int, qualifier: Optional[collections.abc.Mapping[int, collections.abc.Callable[[str], str]]] = None) ‑> str

Generates partial query string for VALUES clause in insertion query.

Args

length_or_key_gen
The number of placeholders or list of functions taking row index and returning key for each placeholder.
rows
The number of rows to insert.
qualifier
Qualifying function for each index.

Returns

Query string.

Expand source code
def values(length_or_key_gen: Union[int, Sequence[Callable[[int], HolderKeys]]], rows: int, qualifier: Optional[Mapping[int, Qualifier]] = None) -> str:
    """
    Builds the rows of `VALUES` clause in insertion query.

    Every row is the result of `holders` enclosed in parentheses, and rows are separated by comma.

    Args:
        length_or_key_gen: The number of placeholders or list of functions taking row index and returning key for each placeholder.
        rows: The number of rows to insert.
        qualifier: Qualifying function for each index.
    Returns:
        Query string.
    """
    fixed_length = isinstance(length_or_key_gen, int)

    def row_holders(index: int) -> str:
        if fixed_length:
            keys = length_or_key_gen
        else:
            # Each generator yields the key of its placeholder in the row of the index.
            keys = [gen(index) for gen in length_or_key_gen] # type: ignore
        return f"({holders(keys, qualifier)})"

    return ', '.join([row_holders(i) for i in range(rows)])
def walk_schema(td, with_doc=False) ‑> dict[str, typing.Union[type, typing.Annotated]]

Returns a dictionary as a result of walking a schema object from its root.

Args

td
A schema represented by TypedDict.
with_doc
Flag to include documentations into result.

Returns

Key value representation of the schema. If with_doc is True, each value is Annotated.

Expand source code
def walk_schema(td, with_doc=False) -> dict[str, Union[type, Annotated]]:
    """
    Walks a schema object from its root and collects its items into a dictionary.

    An item whose type is `TypedDict` , or optional `TypedDict` , is walked recursively.
    When the type of an item is a list, the type of its element is handled and the result is wrapped in a list.

    Args:
        td: A schema represented by `TypedDict`.
        with_doc: Flag to include documentations into result.
    Returns:
        Key value representation of the schema. If `with_doc` is `True`, each value is a pair of the value and its document.
        An empty dictionary is returned when `td` declares no annotations by itself.
    """
    if '__annotations__' not in td.__dict__:
        return {}

    schema = {}

    for key, hint in get_type_hints(td, include_extras=True).items():
        hint, doc = decompose_document(hint)

        # A list type is replaced with the type of its element; the result is wrapped in a list afterwards.
        in_list = issubgeneric(hint, list)
        if in_list:
            hint = get_args(hint)[0]

        optional_inner = is_optional(hint)

        if is_typeddict(hint):
            value = walk_schema(hint, with_doc)
        elif optional_inner is not None and is_typeddict(optional_inner):
            value = walk_schema(optional_inner, with_doc)
        else:
            value = hint

        if in_list:
            value = [value]

        schema[key] = (value, doc) if with_doc else value

    return schema
def where(condition: Conditional) ‑> tuple[str, list[typing.Any]]

Generates a WHERE clause and parameters representing given condition.

If the condition is empty, returned clause is an empty string which does not contain WHERE keyword.

Args

condition
Condition object.

Returns

Tuple of WHERE clause and parameters.

Expand source code
def where(condition: 'Conditional') -> tuple[str, list[Any]]:
    """
    Builds a `WHERE` clause and its parameters from given condition.

    An empty condition yields an empty clause, which does not contain `WHERE` keyword, and no parameters.

    Args:
        condition: Condition object.
    Returns:
        Tuple of `WHERE` clause and parameters.
    """
    expression = condition.expression
    if expression == '':
        return '', []
    return f'WHERE {expression}', condition.params

Classes

class CRUDMixin

Default mixin providing class methods available on all model types.

Every method takes the DB connection object as its first argument. Following arguments are defined in several methods commonly.

  • pks
    • Names and values of all primary key columns in form of dict .
    • A primary key value. This form is allowed when the table has just one primary key column.
    • e.g. If the table has a single primary key id of int, 1 is available to specify the row of id = 1 .
    • e.g. If the table has multiple primary keys intid and strid, dict(intid=1, strid="abc") is a valid argument.
  • record
    • A model object or a mapping from column name to its value, which corresponds to a table row.
    • Only columns contained in the record are affected by the operation.
    • e.g. When dict(c1=1, c2="abc") is passed for insertion, only c1 and c2 are set in INSERT query.
    • e.g. For update, only the columns will be updated. Other columns are not affected.
  • condition
    • Query condition which will compose WHERE clause.
    • Q is a factory class to create condition object.
    • When None is passed, all rows are subject to the operation.
  • qualifier
    • A mapping from column name to a function which qualifies a placeholder passed by an argument.
    • Detail of qualifier function is described below.
  • lock
    • This is reserved argument for locking statement but works just as the postfix of the query currently.
    • The usage will be changed in future version.

Qualifier function is used typically to convert or replace placeholder marker in insert/update query. By default, those queries contain markers like insert into t (c1, c2) values (?, ?) (Q parameter style). We sometimes need to qualify markers to apply DB function, calculation, type cast and so on. This feature enables them like below.

t.insert(db, dict(c1=1, c2=None), dict(c1=lambda x: f"{x}+1", c2=lambda x: "now()"))
# SQL: INSERT INTO t (c1, c2) VALUES (?+1, now())

Be aware that when model object is passed, its column values may differ from actual values in DB after query.

Expand source code
class CRUDMixin(SelectMixin, CRUDInternalMeta):
    """
    Default mixin providing class methods available on all model types.

    Every method takes the DB connection object as its first argument.
    Following arguments are defined in several methods commonly.

    - `pks`
        - Names and values of all primary key columns in form of `dict` .
        - A primary key value. This form is allowed when the table has just one primary key column.
        - e.g. If the table has a single primary key `id` of int, `1` is available to specify the row of `id = 1` .
        - e.g. If the table has multiple primary keys `intid` and `strid`, `dict(intid=1, strid="abc")` is a valid argument.
    - `record`
        - A model object or a mapping from column name to its value, which corresponds to a table row.
        - Only columns contained in the record are affected by the operation.
        - e.g. When `dict(c1=1, c2="abc")` is passed for insertion, only `c1` and `c2` are set in INSERT query.
        - e.g. For update, only the columns will be updated. Other columns are not affected.
    - `condition`
        - Query condition which will compose WHERE clause.
        - `pyracmon.query.Q` is a factory class to create condition object.
        - When `None` is passed, all rows are subject to the operation.
    - `qualifier`
        - A mapping from column name to a function which qualifies a placeholder passed by an argument.
        - Detail of qualifier function is described below.
    - `lock`
        - This is reserved argument for locking statement but works just as the postfix of the query currently.
        - The usage will be changed in future version.

    Qualifier function is used typically to convert or replace placeholder marker in insert/update query.
    By default, those queries contain markers like `insert into t (c1, c2) values (?, ?)` (`Q` parameter style).
    We sometimes need to qualify markers to apply DB function, calculation, type cast and so on. This feature enables them like below.

    ```python
    t.insert(db, dict(c1=1, c2=None), dict(c1=lambda x: f"{x}+1", c2=lambda x: "now()"))
    # SQL: INSERT INTO t (c1, c2) VALUES (?+1, now())
    ```

    Be aware that when model object is passed, its column values may differ from actual values in DB after query.
    """
    @classmethod
    def count(cls, db: Connection, condition: Conditional = Q.of()) -> int:
        """
        Count rows which satisfy the condition.

        ```python
        t.count(db, Q.eq(c1=1))
        # SQL: SELECT COUNT(*) FROM t WHERE c1 = 1
        ```

        Args:
            db: DB connection.
            condition: Query condition.
        Returns:
            The number of rows.
        """
        clause, params = where(condition)
        sql = f"SELECT COUNT(*) FROM {cls.name}{_spacer(clause)}"
        cursor = db.stmt().execute(sql, *params)
        # The first column of the single returned row holds the count.
        first_row = cursor.fetchone()
        return first_row[0] # type: ignore

    @classmethod
    def fetch(cls, db: Connection, pks: PKS, lock: Optional[Any] = None) -> Optional[Self]:
        """
        Fetch a record by primary key(s).

        ```python
        t.fetch(db, 1)
        # SQL: SELECT * FROM t WHERE id = 1
        ```

        Args:
            db: DB connection.
            pks: Primary key value(s).
            lock: Locking statement.
        Returns:
            A model object if exists, otherwise `None`.
        """
        pk_cols, pk_vals = parse_pks(cls, pks)
        # Every primary key column must match, so equality conditions are combined by `Conditional.all`.
        by_pk = Conditional.all([Q.eq(**{name: value}) for name, value in zip(pk_cols, pk_vals)])
        clause, params = where(by_pk)
        selection = cls.select()
        sql = f"SELECT {selection} FROM {cls.name}{_spacer(clause)}{_spacer(lock)}"
        found = db.stmt().execute(sql, *params).fetchone()
        if not found:
            return None
        return read_row(found, *selection)[0]

    @classmethod
    def fetch_many(cls, db: Connection, seq_pks: Sequence[PKS], lock: Optional[Any] = None, /, per_page: int = 1000) -> list[Self]:
        """
        Fetch records by sequence of primary key(s).

        This method simply concatenates equality conditions on primary key by OR operator.
        Keys are split into chunks of at most `per_page` keys and one query is executed for each chunk.

        ```python
        t.fetch_many(db, [1, 2, 3])
        # SQL: SELECT * FROM t WHERE id = 1 OR id = 2 OR id = 3
        ```

        Args:
            db: DB connection.
            seq_pks: Sequence of primary key value.
            lock: Locking statement.
            per_page: Maximum number of keys for an execution of query.
        Returns:
            Model objects in the same order as passed sequence. Keys which match no fetched row are skipped.
        Raises:
            ValueError: `per_page` is less than 1 while `seq_pks` is not empty.
        """
        if len(seq_pks) == 0:
            return []
        if per_page < 1:
            # A non-positive page size never advances the index and would make the loop below endless.
            raise ValueError(f"per_page must be a positive integer but {per_page} is given.")

        res = []
        index = 0
        while index < len(seq_pks):
            ordered_pks = []
            cond = Q.of()
            for pks in seq_pks[index:index+per_page]:
                cols, vals = parse_pks(cls, pks)
                ordered_pks.append(tuple(vals))
                cond |= Conditional.all([Q.eq(**{c: v}) for c, v in zip(cols, vals)])
            wc, wp = where(cond)
            s = cls.select()
            c = db.stmt().execute(f"SELECT {s} FROM {cls.name}{_spacer(wc)}{_spacer(lock)}", *wp)

            # Index fetched records by their primary key values to restore the requested order.
            record_map = {}
            for r in [read_row(row, *s)[0] for row in c.fetchall()]:
                pk_values = {col.name: val for col, val in r if col.pk}
                record_map[tuple([v for _, v in check_columns(cls, pk_values, lambda col: col.pk, True)])] = r

            res.extend([record_map[k] for k in ordered_pks if k in record_map])
            index += per_page

        return res

    @classmethod
    def fetch_where(
        cls,
        db: Connection,
        condition: Conditional = Q.of(),
        orders: Mapping[Union[str, AliasedColumn], ORDER] = {},
        limit: Optional[int] = None,
        offset: Optional[int] = None,
        lock: Optional[Any] = None,
    ) -> list[Self]:
        """
        Fetch records which satisfy the condition.

        ```python
        t.fetch_where(db, Q.eq(c1=1), dict(c2=True), 10, 5)
        # SQL: SELECT * FROM t WHERE c1 = 1 ORDER BY c2 ASC LIMIT 10 OFFSET 5
        ```

        Args:
            db: DB connection.
            condition: Query condition.
            orders: Ordering specification where key is column name and value denotes whether the order is ascending or not.
            limit: Maximum number of rows to fetch. If `None`, all rows are returned.
            offset: The number of rows to skip.
            lock: Locking statement.
        Returns:
            Model objects.
        """
        where_clause, where_params = where(condition)
        range_clause, range_params = ranged_by(limit, offset)
        selection = cls.select()
        # Clauses are appended in the order of WHERE, ORDER BY, range and lock.
        sql = f"SELECT {selection} FROM {cls.name}{_spacer(where_clause)}{_spacer(order_by(orders))}{_spacer(range_clause)}{_spacer(lock)}"
        params = where_params + range_params
        cursor = db.stmt().execute(sql, *params)
        return [read_row(fetched, *selection)[0] for fetched in cursor.fetchall()]

    @classmethod
    def fetch_one(
        cls,
        db: Connection,
        condition: Conditional = Q.of(),
        lock: Optional[Any] = None,
    ) -> Optional[Self]:
        """
        Fetch a record which satisfies the condition.

        `ValueError` is raised when multiple records are found.
        Use this method for queries which certainly return a single row, such as search by unique key.

        ```python
        t.fetch_one(db, Q.eq(c1=1))
        # SQL: SELECT * FROM t WHERE c1 = 1
        ```

        Args:
            db: DB connection.
            condition: Query condition.
            lock: Locking statement.
        Returns:
            A model object if exists, otherwise `None`.
        """
        found = cls.fetch_where(db, condition, lock=lock)

        if len(found) > 1:
            raise ValueError(f"{len(found)} records are found on the invocation of fetch_one().")

        return found[0] if found else None

    @classmethod
    def _insert_sql(cls, record: Union[Self, dict[str, Any]], qualifier: Mapping[str, Qualifier] = {}) -> tuple[str, list[str], list[Any]]:
        """
        Builds an INSERT statement for a single record.

        Args:
            record: Model object or mapping from column name to value.
            qualifier: Functions qualifying placeholder markers.
        Returns:
            Tuple of the SQL, inserted column names and query parameters.
            For a column whose value is an `Expression`, parameters of the expression are used in place of the value.
        """
        if isinstance(record, cls):
            model: Self = cast(Self, record)
        else:
            model = cls(**cast(dict, record))

        column_values = model_values(cls, model)
        check_columns(cls, column_values)
        cols = list(column_values.keys())
        vals = list(column_values.values())
        ordered_qs = key_to_index(qualifier, cols)

        def constant(v):
            # Binds `v` at call time so that each generator keeps its own value.
            return lambda i: v

        if not any(isinstance(v, Expression) for v in vals):
            values_clause = values(len(cols), 1, ordered_qs)
        else:
            key_gen = []
            params = []
            for v in vals:
                if isinstance(v, Expression):
                    key_gen.append(constant(v))
                    params.extend(v.params)
                else:
                    key_gen.append(lambda i: None)
                    params.append(v)
            vals = params
            values_clause = values(key_gen, 1, ordered_qs)

        return f"INSERT INTO {cls.name} ({', '.join(cols)}) VALUES {values_clause}", cols, vals

    @classmethod
    def insert(
        cls,
        db: Connection,
        record: Union[Self, dict[str, Any]],
        qualifier: Mapping[str, Qualifier] = {},
        /,
        returning: bool = False,
    ) -> Self:
        """
        Insert a record.

        If `returning` is `True` and the DBMS supports **RETURNING** clause,
        returned model object contains complete and correct column values.
        Otherwise, auto incremental value is set to the returned model object
        but other column values generated inside DBMS such as default value are not set.

        ```python
        t.insert(db, dict(c1=1, c2=2))
        # SQL: INSERT INTO t (c1, c2) VALUES (1, 2)
        ```

        Args:
            db: DB connection.
            record: Object contains column values.
            qualifier: Functions qualifying placeholder markers.
            returning: Flag to return inserted record with complete and correct column values.
        Returns:
            Model of inserted record.
        """
        model: Self = cast(Self, record) if isinstance(record, cls) else cls(**cast(dict, record))
        # Pass the model built above so that a mapping record is not converted into a model twice.
        sql, _, vals = cls._insert_sql(model, qualifier)

        if returning:
            if cls.support_returning(db):
                c = db.stmt().execute(f"{sql} RETURNING *", *vals)
                s = cls.select()
                return read_row(c.fetchone(), *s)[0]
            else:
                # REVIEW
                # Inserted row can't be specified from the table where no primary keys are defined .
                # Falls through to the plain insertion below.
                pass

        db.stmt().execute(sql, *vals)
        # Reflect values generated by sequences (auto increment) to the model.
        for c, v in cls.last_sequences(db, 1):
            setattr(model, c.name, v)
        return model

    @classmethod
    @overload
    def insert_many(cls, db: Connection, records: list[Union[Self, dict[str, Any]]], qualifier: Mapping[str, Qualifier] = {},
                    /, returning: Literal[False] = False) -> list[Self]: ...
    @classmethod
    @overload
    def insert_many(cls, db: Connection, records: list[Union[Self, dict[str, Any]]], qualifier: Mapping[str, Qualifier] = {},
                    /, returning: Literal[True] = True) -> list[Self]: ...
    @classmethod
    def insert_many(
        cls,
        db: Connection,
        records: list[Union[Self, dict[str, Any]]],
        qualifier: Mapping[str, Qualifier] = {},
        /,
        returning: bool = False,
    ):
        """
        Insert records.

        The statement is built from the first record and executed by `executemany` with parameters of all records.
        Auto incremental values are set to the model objects after the insertion.
        If `returning` is `True`, inserted records are fetched again by their primary keys with `fetch_many`.
        Otherwise, column values generated inside DBMS such as default value are not set to returned models.

        Args:
            db: DB connection.
            records: Objects each of which contains column values.
            qualifier: Functions qualifying placeholder markers.
            returning: Flag to return inserted records with complete and correct column values.
        Returns:
            Models of inserted records.
        """
        if len(records) == 0:
            return []

        models: list[Self] = [cast(Self, r) if isinstance(r, cls) else cls(**cast(dict, r)) for r in records]

        head, followers = models[0], models[1:]

        sql, cols, head_params = cls._insert_sql(head, qualifier)
        cols = set(cols)
        seq_of_params = [head_params]

        for follower in followers:
            # Every following record must have exactly the same columns as the first one.
            check_columns(cls, model_values(cls, follower), lambda c: c.name in cols, requires_all=True)
            # REVIEW:
            # The consistency among columns where expression is set is not checked.
            seq_of_params.append(cls._insert_sql(follower, qualifier)[2])

        db.stmt().executemany(sql, seq_of_params)

        # `last_sequences` gives the last value; preceding models get values counted back from it.
        num = len(records)
        for column, last in cls.last_sequences(db, num):
            for i, m in enumerate(models):
                setattr(m, column.name, last - (num - i - 1))

        if not returning:
            return models

        return cls.fetch_many(db, [extract_pks(cls, m) for m in models])

    @classmethod
    def _update_sql(cls, record: Record, condition: Conditional, qualifier: Mapping[str, Qualifier] = {}, allow_all: bool = True) -> tuple[str, list[str], list[Any]]:
        """
        Builds an UPDATE statement setting non primary key columns of the record.

        Args:
            record: Object contains column values.
            condition: Query condition.
            qualifier: Functions qualifying placeholder markers.
            allow_all: If `False`, empty condition raises `ValueError`.
        Returns:
            Tuple of the SQL, updated column names and query parameters (setter parameters followed by condition parameters).
        """
        column_values = model_values(cls, record, excludes_pk=True)
        check_columns(cls, column_values)
        cols = list(column_values.keys())
        vals = list(column_values.values())
        ordered_qs = key_to_index(qualifier, cols)

        setters = []
        params = []
        for i, (c, v) in enumerate(zip(cols, vals)):
            # Columns without a qualifier use the marker or expression as it is.
            qualify = ordered_qs.get(i, lambda x:x)
            if isinstance(v, Expression):
                setters.append(f"{c} = {qualify(v.expression)}")
                params.extend(v.params)
            else:
                setters.append(f"{c} = {qualify('$_')}")
                params.append(v)

        wc, wp = where(condition)
        if wc == "" and not allow_all:
            raise ValueError("Update query to update all records is not allowed.")

        return f"UPDATE {cls.name} SET {', '.join(setters)}{_spacer(wc)}", cols, params + wp

    @classmethod
    @overload
    def update(cls, db: Connection, pks: PKS, record: Record, qualifier: Mapping[str, Qualifier] = {},
               /, returning: Literal[False] = False) -> bool: ...
    @classmethod
    @overload
    def update(cls, db: Connection, pks: PKS, record: Record, qualifier: Mapping[str, Qualifier] = {},
               /, returning: Literal[True] = True) -> Optional[Self]: ...
    @classmethod
    def update(
        cls,
        db: Connection,
        pks: PKS,
        record: Record,
        qualifier: Mapping[str, Qualifier] = {},
        /,
        returning: bool = False,
    ):
        """
        Update a record by primary key(s).

        This method only updates columns which are found in `record` except for primary key(s).

        ```python
        t.update(db, 1, dict(c1=1, c2=2))
        # SQL: UPDATE t SET c1 = 1, c2 = 2 WHERE id = 1
        ```

        Args:
            db: DB connection.
            pks: Primary key value(s).
            record: Object contains column values.
            qualifier: Functions qualifying placeholder markers.
            returning: Flag to return updated record with complete and correct column values.
        Returns:
            Whether exactly one row was updated, or the updated record model (`None` if not found) when `returning` is `True`.
        """
        pk_cols, pk_vals = parse_pks(cls, pks)
        by_pk = Conditional.all([Q.eq(**{name: value}) for name, value in zip(pk_cols, pk_vals)])

        if not returning:
            return cls.update_where(db, record, by_pk, qualifier, returning=False) == 1

        if cls.support_returning(db):
            updated = cls.update_where(db, record, by_pk, qualifier, returning=True)
            return updated[0] if updated else None

        # Without RETURNING support, the row is fetched again by the primary key after the update.
        cls.update_where(db, record, by_pk, qualifier, returning=False)
        return cls.fetch(db, pks)

    @classmethod
    @overload
    def update_many(cls, db: Connection, records: Sequence[Record], qualifier: Mapping[str, Qualifier] = {},
               /, returning: Literal[False] = False) -> int: ...
    @classmethod
    @overload
    def update_many(cls, db: Connection, records: Sequence[Record], qualifier: Mapping[str, Qualifier] = {},
               /, returning: Literal[True] = True) -> list[Self]: ...
    @classmethod
    def update_many(
        cls,
        db: Connection,
        records: Sequence[Record],
        qualifier: Mapping[str, Qualifier] = {},
        /,
        returning: bool = False,
    ):
        """
        Update records by set of primary key(s).

        This method invokes `executemany` defined in DB-API 2.0.
        Whether it is optimized compared to `execute` depends on DB driver.

        Every record must contain values of all primary keys and the same set of columns to update.
        The statement generated for the first record is executed with parameters of all records.

        Args:
            db: DB connection.
            records: Sequence of objects contains column values.
            qualifier: Functions qualifying placeholder markers.
            returning: Flag to return updated records with complete and correct column values.
        Returns:
            The number of affected rows or updated records.
        """
        if len(records) == 0:
            return [] if returning else 0

        keys = {c.name for c in cls.columns if c.pk}
        if len(keys) == 0:
            raise ValueError(f"update_many is not available because {cls} does not have primary key columns.")

        # Values of all records are collected before any validation starts.
        all_values = [model_values(cls, r, excludes_pk=False) for r in records]

        seq_of_values: list[tuple[dict[str, Any], dict[str, Any]]] = []
        target_columns: Optional[set[str]] = None

        for vs in all_values:
            # Primary keys must be a proper subset, that is, at least one column to update is required.
            if not keys < vs.keys():
                raise ValueError(f"Every row must contain values of all primary keys and at least one update column value.")
            pks = {name: value for name, value in vs.items() if name in keys}
            rec = {name: value for name, value in vs.items() if name not in keys}
            if target_columns is None:
                check_columns(cls, rec)
                target_columns = set(rec.keys())
            else:
                check_columns(cls, rec, lambda c: c.name in target_columns, True) # type: ignore
            seq_of_values.append((pks, rec))

        statement = ""
        seq_of_params: list[list[Any]] = []

        for pks, rec in seq_of_values:
            pk_cols, pk_vals = parse_pks(cls, pks)
            by_pk = Conditional.all([Q.eq(**{c: v}) for c, v in zip(pk_cols, pk_vals)])

            sql, _, params = cls._update_sql(rec, by_pk, qualifier)
            # Only the statement of the first record is used for execution.
            statement = statement or sql
            seq_of_params.append(params)

        cursor = db.stmt().executemany(statement, seq_of_params)

        if returning:
            return cls.fetch_many(db, [pks for pks, _ in seq_of_values])

        return cursor.rowcount

    @classmethod
    @overload
    def update_where(cls, db: Connection, record: Record, condition: Conditional, qualifier: Mapping[str, Qualifier] = {},
                     /, returning: Literal[False] = False, allow_all: bool = True) -> int: ...
    @classmethod
    @overload
    def update_where(cls, db: Connection, record: Record, condition: Conditional, qualifier: Mapping[str, Qualifier] = {},
                     /, returning: Literal[True] = True, allow_all: bool = True) -> list[Self]: ...
    @classmethod
    def update_where(
        cls,
        db: Connection,
        record: Record,
        condition: Conditional,
        qualifier: Mapping[str, Qualifier] = {},
        /,
        returning: bool = False,
        allow_all: bool = True,
    ):
        """
        Update records which satisfy the condition.

        ```python
        t.update_where(db, dict(c2=2), Q.eq(c1=1))
        # SQL: UPDATE t SET c2 = 2 WHERE c1 = 1
        ```

        Args:
            db: DB connection.
            record: Object contains column values.
            condition: Query condition.
            qualifier: Functions qualifying placeholder markers.
            returning: Flag to return updated records with complete and correct column values.
            allow_all: If `False`, empty condition raises `ValueError`.
        Returns:
            The number of affected rows or updated records.
        Raises:
            ValueError: The condition is empty while `allow_all` is `False`.
            NotImplementedError: `returning` is `True` but **RETURNING** clause is not supported.
        """
        sql, _, params = cls._update_sql(record, condition, qualifier, allow_all)

        if not returning:
            return db.stmt().execute(sql, *params).rowcount

        if not cls.support_returning(db):
            # Rows matching the condition after the update may differ from rows actually updated.
            raise NotImplementedError(f"RETURNING is not supported and there is no way to fetch updated rows exactly.")

        c = db.stmt().execute(f"{sql} RETURNING *", *params)
        s = cls.select()
        return [read_row(row, *s)[0] for row in c.fetchall()]

    @classmethod
    @overload
    def delete(cls, db: Connection, pks: PKS, /, returning: Literal[False] = False) -> bool: ...
    @classmethod
    @overload
    def delete(cls, db: Connection, pks: PKS, /, returning: Literal[True] = True) -> Optional[Self]: ...
    @classmethod
    def delete(cls, db: Connection, pks: PKS, /, returning: bool = False):
        """
        Delete a record by primary key(s).

        ```python
        t.delete(db, 1)
        # SQL: DELETE FROM t WHERE id = 1
        ```

        Args:
            db: DB connection.
            pks: Primary key value(s).
            returning: Flag to return deleted record if any.
        Returns:
            Whether exactly one row was deleted, or the deleted record (`None` if not found) when `returning` is `True`.
        """
        pk_cols, pk_vals = parse_pks(cls, pks)
        by_pk = Conditional.all([Q.eq(**{name: value}) for name, value in zip(pk_cols, pk_vals)])

        if not returning:
            return cls.delete_where(db, by_pk) == 1

        deleted = cls.delete_where(db, by_pk, returning=True)
        return deleted[0] if deleted else None

    @classmethod
    @overload
    def delete_many(cls, db: Connection, pks: Union[Sequence[PKS], Sequence[Record]], /, returning: Literal[False] = False) -> int: ...
    @classmethod
    @overload
    def delete_many(cls, db: Connection, pks: Union[Sequence[PKS], Sequence[Record]], /, returning: Literal[True] = True) -> list[Self]: ...
    @classmethod
    def delete_many(cls, db: Connection, pks: Union[Sequence[PKS], Sequence[Record]], /, returning: bool = False):
        """
        Delete records by set of primary key(s).

        This method invokes `executemany` defined in DB-API 2.0.
        Whether it is optimized compared to `execute` depends on DB driver.

        When `returning` is `True`, records are fetched by `fetch_many` before the deletion and returned.

        Args:
            db: DB connection.
            pks: Primary keys or objects each of which contains values of all primary keys.
            returning: Flag to return deleted records.
        Returns:
            The number of affected rows or deleted records.
            For empty `pks`, `0` or an empty list is returned without executing any query.
        """
        if len(pks) == 0:
            # Keep the return type consistent with the non-empty case (and with `update_many`).
            return [] if returning else 0

        seq_of_pks: list[dict[str, Any]] = []
        for rec in pks:
            if isinstance(rec, (dict, cls)):
                seq_of_pks.append(extract_pks(cls, rec))
            else:
                cols, vals = parse_pks(cls, rec)
                seq_of_pks.append(dict(zip(cols, vals)))

        # The statement is built from the first key set and reused for the others.
        condition = Conditional.all([Q.eq(**{c: v}) for c, v in seq_of_pks[0].items()])
        wc, wp = where(condition)

        sql = f"DELETE FROM {cls.name}{_spacer(wc)}"
        seq_of_params: list[list[Any]] = [wp]

        for pk_values in seq_of_pks[1:]:
            condition = Conditional.all([Q.eq(**{c: v}) for c, v in pk_values.items()])
            _, wp = where(condition)
            seq_of_params.append(wp)

        if returning:
            models = cls.fetch_many(db, seq_of_pks)
            db.stmt().executemany(sql, seq_of_params)
            return models
        else:
            return db.stmt().executemany(sql, seq_of_params).rowcount

    @classmethod
    @overload
    def delete_where(cls, db: Connection, condition: Conditional, /, returning: Literal[False] = False, allow_all: bool = True) -> int: ...
    @classmethod
    @overload
    def delete_where(cls, db: Connection, condition: Conditional, /, returning: Literal[True] = True, allow_all: bool = True) -> list[Self]: ...
    @classmethod
    def delete_where(cls, db: Connection, condition: Conditional, /, returning: bool = False, allow_all: bool = True):
        """
        Delete records which satisfy the condition.

        ```python
        t.delete_where(db, Q.eq(c1=1))
        # SQL: DELETE FROM t WHERE c1 = 1
        ```

        Args:
            db: DB connection.
            condition: Query condition.
            returning: Flag to return deleted records.
            allow_all: If `False`, empty condition raises `ValueError`.
        Returns:
            The number of affected rows or deleted records.
        Raises:
            ValueError: The condition is empty while `allow_all` is `False`.
        """
        wc, wp = where(condition)
        if wc == "" and not allow_all:
            raise ValueError("Delete query to delete all records is not allowed.")

        sql = f"DELETE FROM {cls.name}{_spacer(wc)}"

        if not returning:
            return db.stmt().execute(sql, *wp).rowcount

        if cls.support_returning(db):
            c = db.stmt().execute(f"{sql} RETURNING *", *wp)
            # Resolve the selection once instead of once per row, as `update_where` does.
            s = cls.select()
            return [read_row(row, *s)[0] for row in c.fetchall()]

        # Without RETURNING support, matching rows are fetched before they are deleted.
        current = cls.fetch_where(db, condition)
        db.stmt().execute(sql, *wp)
        return current

    @classmethod
    def last_sequences(cls, db: Connection, num: int) -> list[tuple[Column, int]]:
        """
        Returns the sequential (auto incremental) values of a table generated by the latest insertion.

        Result contains every sequential column and its value.
        When the latest query inserted multiple rows, only the last (= biggest) value is returned.

        This default implementation returns no values.
        It should be overridden by another mixin class defined in dialect module.

        Args:
            db: DB connection.
            num: The number of records inserted by the latest query.
        Returns:
            List of pairs of column and its value.
        """
        return []

    @classmethod
    def support_returning(cls, db: Connection) -> bool:
        """
        Checks whether this DBMS supports **RETURNING** clause or not.

        This default implementation always returns `False`.

        Args:
            db: DB connection.
        Returns:
            Whether this DBMS supports **RETURNING** clause or not.
        """
        return False

Ancestors

Static methods

def count(db: Connection, condition: Conditional = Condition: '' -- []) ‑> int

Count rows which satisfy the condition.

t.count(db, Q.eq(c1=1))
# SQL: SELECT COUNT(*) FROM t WHERE c1 = 1

Args

db
DB connection.
condition
Query condition.

Returns

The number of rows.

Expand source code
@classmethod
def count(cls, db: Connection, condition: Conditional = Q.of()) -> int:
    """
    Count rows which satisfy the condition.

    ```python
    t.count(db, Q.eq(c1=1))
    # SQL: SELECT COUNT(*) FROM t WHERE c1 = 1
    ```

    Args:
        db: DB connection.
        condition: Query condition.
    Returns:
        The number of rows.
    """
    clause, params = where(condition)
    sql = f"SELECT COUNT(*) FROM {cls.name}{_spacer(clause)}"
    cursor = db.stmt().execute(sql, *params)
    # The first column of the single returned row holds the count.
    first_row = cursor.fetchone()
    return first_row[0] # type: ignore
def delete(db: Connection, pks: Union[Any, dict[str, Any]], /, returning: bool = False)

Delete a record by primary key(s).

t.delete(db, 1)
# SQL: DELETE FROM t WHERE id = 1

Args

db
DB connection.
pks
Primary key value(s).
returning
Flag to return deleted record if any.

Returns

Whether the record existed and was deleted, or the deleted record if any.

Expand source code
@classmethod
def delete(cls, db: Connection, pks: PKS, /, returning: bool = False):
    """
    Delete a record by primary key(s).

    ```python
    t.delete(db, 1)
    # SQL: DELETE FROM t WHERE id = 1
    ```

    Args:
        db: DB connection.
        pks: Primary key value(s).
        returning: Flag to return deleted record if any.
    Returns:
        Whether exactly one row was deleted, or the deleted record (`None` if not found) when `returning` is `True`.
    """
    pk_cols, pk_vals = parse_pks(cls, pks)
    by_pk = Conditional.all([Q.eq(**{name: value}) for name, value in zip(pk_cols, pk_vals)])

    if not returning:
        return cls.delete_where(db, by_pk) == 1

    deleted = cls.delete_where(db, by_pk, returning=True)
    return deleted[0] if deleted else None
def delete_many(db: Connection, pks: Union[collections.abc.Sequence[Union[Any, dict[str, Any]]], collections.abc.Sequence[Union[Meta, dict[str, Any]]]], /, returning: bool = False)

Delete records by a set of primary key(s).

This method invokes on executemany defined in DB-API 2.0. Whether it is optimized compared to execute depends on DB driver.

Args

db
DB connection.
pks
Primary keys or objects each of which contains values of all primary keys.
returning
Flag to return deleted records.

Returns

The number of affected rows or deleted records.

Expand source code
@classmethod
def delete_many(cls, db: Connection, pks: Union[Sequence[PKS], Sequence[Record]], /, returning: bool = False):
    """
    Delete records by a set of primary key(s).

    This method invokes `executemany` defined in DB-API 2.0.
    Whether it is optimized compared to `execute` depends on DB driver.

    Args:
        db: DB connection.
        pks: Primary keys or objects each of which contains values of all primary keys.
        returning: Flag to return deleted records.
    Returns:
        The number of affected rows or deleted records.
        When `pks` is empty, `0` or an empty list respectively.
    """
    if len(pks) == 0:
        # Keep the return type consistent with the non-empty case (and with `update_many`)
        # instead of returning `None`.
        return [] if returning else 0

    # Normalize every item into a {pk column: value} mapping.
    seq_of_pks: list[dict[str, Any]] = []
    for rec in pks:
        if isinstance(rec, (dict, cls)):
            seq_of_pks.append(extract_pks(cls, rec))
        else:
            cols, vals = parse_pks(cls, rec)
            seq_of_pks.append(dict(zip(cols, vals)))

    def key_condition(key_values: dict[str, Any]):
        # AND-ed equality over every primary key column of one record.
        return Conditional.all([Q.eq(**{col: val}) for col, val in key_values.items()])

    # The statement text is built from the first key set only;
    # the remaining key sets contribute parameters for `executemany`.
    wc, wp = where(key_condition(seq_of_pks[0]))
    sql = f"DELETE FROM {cls.name}{_spacer(wc)}"
    seq_of_params: list[list[Any]] = [wp]

    for key_values in seq_of_pks[1:]:
        _, params = where(key_condition(key_values))
        seq_of_params.append(params)

    if returning:
        # Read the rows before deleting them so that they can be returned.
        models = cls.fetch_many(db, seq_of_pks)
        db.stmt().executemany(sql, seq_of_params)
        return models
    else:
        return db.stmt().executemany(sql, seq_of_params).rowcount
def delete_where(db: Connection, condition: Conditional, /, returning: bool = False, allow_all: bool = True)

Delete records which satisfy the condition.

t.delete_where(db, Q.eq(c1=1))
# SQL: DELETE FROM t WHERE c1 = 1

Args

db
DB connection.
condition
Query condition.
returning
Flag to return deleted records.
allow_all
If False, empty condition raises ValueError.

Returns

The number of affected rows or deleted records.

Expand source code
@classmethod
def delete_where(cls, db: Connection, condition: Conditional, /, returning: bool = False, allow_all: bool = True):
    """
    Delete every record matching the condition.

    ```python
    t.delete_where(db, Q.eq(c1=1))
    # SQL: DELETE FROM t WHERE c1 = 1
    ```

    Args:
        db: DB connection.
        condition: Query condition.
        returning: Flag to return deleted records.
        allow_all: If `False`, empty condition raises `ValueError`.
    Returns:
        The number of affected rows or deleted records.
    Raises:
        ValueError: The condition produces no WHERE clause and `allow_all` is `False`.
    """
    clause, params = where(condition)
    if not allow_all and clause == "":
        raise ValueError("Delete query to delete all records is not allowed.")

    sql = f"DELETE FROM {cls.name}{_spacer(clause)}"

    if not returning:
        return db.stmt().execute(sql, *params).rowcount

    if cls.support_returning(db):
        cursor = db.stmt().execute(f"{sql} RETURNING *", *params)
        return [read_row(row, *cls.select())[0] for row in cursor.fetchall()]

    # Without RETURNING, read the matching rows first and delete them afterwards.
    snapshot = cls.fetch_where(db, condition)
    db.stmt().execute(sql, *params)
    return snapshot
def fetch(db: Connection, pks: Union[Any, dict[str, Any]], lock: Optional[Any] = None) ‑> Optional[typing_extensions.Self]

Fetch a record by primary key(s).

t.fetch(db, 1)
# SQL: SELECT * FROM t WHERE id = 1

Args

db
DB connection.
pks
Primary key value(s).
lock
Locking statement.

Returns

A model object if exists, otherwise None.

Expand source code
@classmethod
def fetch(cls, db: Connection, pks: PKS, lock: Optional[Any] = None) -> Optional[Self]:
    """
    Fetch the record identified by primary key(s).

    ```python
    t.fetch(db, 1)
    # SQL: SELECT * FROM t WHERE id = 1
    ```

    Args:
        db: DB connection.
        pks: Primary key value(s).
        lock: Locking statement appended to the query.
    Returns:
        A model object if exists, otherwise `None`.
    """
    cols, vals = parse_pks(cls, pks)
    by_key = Conditional.all([Q.eq(**{col: val}) for col, val in zip(cols, vals)])
    clause, params = where(by_key)
    selection = cls.select()
    query = f"SELECT {selection} FROM {cls.name}{_spacer(clause)}{_spacer(lock)}"
    row = db.stmt().execute(query, *params).fetchone()
    if not row:
        return None
    return read_row(row, *selection)[0]
def fetch_many(db: Connection, seq_pks: collections.abc.Sequence[typing.Union[typing.Any, dict[str, typing.Any]]], lock: Optional[Any] = None, /, per_page: int = 1000) ‑> list[typing_extensions.Self]

Fetch a record by sequence of primary key(s).

This method simply concatenates equality conditions on primary key by OR operator.

t.fetch_many(db, [1, 2, 3])
# SQL: SELECT * FROM t WHERE id = 1 OR id = 2 OR id = 3

Args

db
DB connection.
seq_pks
Sequence of primary key value.
lock
Locking statement.
per_page
Maximum number of keys for an execution of query.

Returns

Model objects in the same order as passed sequence.

Expand source code
@classmethod
def fetch_many(cls, db: Connection, seq_pks: Sequence[PKS], lock: Optional[Any] = None, /, per_page: int = 1000) -> list[Self]:
    """
    Fetch records by a sequence of primary key(s).

    Equality conditions on the primary key are simply concatenated by OR operator.
    Keys are processed in pages of `per_page` keys, one query per page.

    ```python
    t.fetch_many(db, [1, 2, 3])
    # SQL: SELECT * FROM t WHERE id = 1 OR id = 2 OR id = 3
    ```

    Args:
        db: DB connection.
        seq_pks: Sequence of primary key value.
        lock: Locking statement.
        per_page: Maximum number of keys for an execution of query.
    Returns:
        Model objects in the same order as passed sequence. Keys without a matching row are skipped.
    """
    results = []
    offset = 0
    while offset < len(seq_pks):
        page_keys = []
        page_cond = Q.of()
        for key in seq_pks[offset:offset+per_page]:
            cols, vals = parse_pks(cls, key)
            page_keys.append(tuple(vals))
            page_cond |= Conditional.all([Q.eq(**{col: val}) for col, val in zip(cols, vals)])

        clause, params = where(page_cond)
        selection = cls.select()
        cursor = db.stmt().execute(f"SELECT {selection} FROM {cls.name}{_spacer(clause)}{_spacer(lock)}", *params)

        # Index fetched records by their primary key tuple to restore the requested order.
        records = [read_row(row, *selection)[0] for row in cursor.fetchall()]
        by_key = {}
        for record in records:
            pk_values = {col.name: val for col, val in record if col.pk}
            checked = check_columns(cls, pk_values, lambda col: col.pk, True)
            by_key[tuple([val for _, val in checked])] = record

        results.extend(by_key[k] for k in page_keys if k in by_key)
        offset += per_page

    return results
def fetch_one(db: Connection, condition: Conditional = Condition: '' -- [], lock: Optional[Any] = None) ‑> Optional[typing_extensions.Self]

Fetch a record which satisfies the condition.

ValueError is raised when multiple records are found. Use this method for queries which certainly return a single row, such as a search by unique key.

t.fetch_one(db, Q.eq(c1=1))
# SQL: SELECT * FROM t WHERE c1 = 1

Args

db
DB connection.
condition
Query condition.
lock
Locking statement.

Returns

A model object if it exists, otherwise None.

Expand source code
@classmethod
def fetch_one(
    cls,
    db: Connection,
    condition: Conditional = Q.of(),
    lock: Optional[Any] = None,
) -> Optional[Self]:
    """
    Fetch the single record which satisfies the condition.

    `ValueError` is raised when multiple records are found.
    Use this method for queries which certainly return a single row, such as a search by unique key.

    ```python
    t.fetch_one(db, Q.eq(c1=1))
    # SQL: SELECT * FROM t WHERE c1 = 1
    ```

    Args:
        db: DB connection.
        condition: Query condition.
        lock: Locking statement.
    Returns:
        A model object if it exists, otherwise `None`.
    Raises:
        ValueError: More than one record matches the condition.
    """
    found = cls.fetch_where(db, condition, lock=lock)

    if not found:
        return None
    if len(found) != 1:
        raise ValueError(f"{len(found)} records are found on the invocation of fetch_one().")
    return found[0]
def fetch_where(db: Connection, condition: Conditional = Condition: '' -- [], orders: collections.abc.Mapping[typing.Union[str, AliasedColumn], typing.Union[bool, tuple[bool, bool], str]] = {}, limit: Optional[int] = None, offset: Optional[int] = None, lock: Optional[Any] = None) ‑> list[typing_extensions.Self]

Fetch records which satisfy the condition.

t.fetch_where(db, Q.eq(c1=1), dict(c2=True), 10, 5)
# SQL: SELECT * FROM t WHERE c1 = 1 ORDER BY c2 ASC LIMIT 10 OFFSET 5

Args

db
DB connection.
condition
Query condition.
orders
Ordering specification where key is column name and value denotes whether the order is ascending or not.
limit
Maximum number of rows to fetch. If None, all rows are returned.
offset
The number of rows to skip.
lock
Locking statement.

Returns

Model objects.

Expand source code
@classmethod
def fetch_where(
    cls,
    db: Connection,
    condition: Conditional = Q.of(),
    orders: Mapping[Union[str, AliasedColumn], ORDER] = {},
    limit: Optional[int] = None,
    offset: Optional[int] = None,
    lock: Optional[Any] = None,
) -> list[Self]:
    """
    Fetch every record which satisfies the condition.

    ```python
    t.fetch_where(db, Q.eq(c1=1), dict(c2=True), 10, 5)
    # SQL: SELECT * FROM t WHERE c1 = 1 ORDER BY c2 ASC LIMIT 10 OFFSET 5
    ```

    Args:
        db: DB connection.
        condition: Query condition.
        orders: Ordering specification where key is column name and value denotes whether the order is ascending or not.
        limit: Maximum number of rows to fetch. If `None`, all rows are returned.
        offset: The number of rows to skip.
        lock: Locking statement.
    Returns:
        Model objects.
    """
    where_clause, where_params = where(condition)
    range_clause, range_params = ranged_by(limit, offset)
    selection = cls.select()

    # Clause order: WHERE, ORDER BY, LIMIT/OFFSET, then the locking statement.
    query = (
        f"SELECT {selection} FROM {cls.name}"
        f"{_spacer(where_clause)}{_spacer(order_by(orders))}{_spacer(range_clause)}{_spacer(lock)}"
    )
    cursor = db.stmt().execute(query, *(where_params + range_params))
    return [read_row(row, *selection)[0] for row in cursor.fetchall()]
def insert(db: Connection, record: Union[typing_extensions.Self, dict[str, Any]], qualifier: collections.abc.Mapping[str, collections.abc.Callable[[str], str]] = {}, /, returning: bool = False) ‑> typing_extensions.Self

Insert a record.

If returning is True and the DBMS supports RETURNING clause, the returned model object contains complete and correct column values. Otherwise, the auto-incremental value is set to the returned model object, but other column values generated inside the DBMS, such as default values, are not set.

t.insert(db, dict(c1=1, c2=2))
# SQL: INSERT INTO t (c1, c2) VALUES (1, 2)

Args

db
DB connection.
record
Object contains column values.
qualifier
Functions qualifying placeholder markers.
returning
Flag to return inserted record with complete and correct column values.

Returns

Model of inserted record.

Expand source code
@classmethod
def insert(
    cls,
    db: Connection,
    record: Union[Self, dict[str, Any]],
    qualifier: Mapping[str, Qualifier] = {},
    /,
    returning: bool = False,
) -> Self:
    """
    Insert a record.

    If `returning` is `True` and the DBMS supports **RETURNING** clause,
    the returned model object contains complete and correct column values.
    Otherwise, the auto-incremental value is set to the returned model object,
    but other column values generated inside the DBMS, such as default values, are not set.

    ```python
    t.insert(db, dict(c1=1, c2=2))
    # SQL: INSERT INTO t (c1, c2) VALUES (1, 2)
    ```

    Args:
        db: DB connection.
        record: Object contains column values.
        qualifier: Functions qualifying placeholder markers.
        returning: Flag to return inserted record with complete and correct column values.
    Returns:
        Model of inserted record.
    """
    if isinstance(record, cls):
        model: Self = cast(Self, record)
    else:
        model = cls(**cast(dict, record))
    sql, _, vals = cls._insert_sql(record, qualifier)

    if returning and cls.support_returning(db):
        cursor = db.stmt().execute(f"{sql} RETURNING *", *vals)
        selection = cls.select()
        return read_row(cursor.fetchone(), *selection)[0]

    # REVIEW
    # When `returning` is requested but RETURNING is unsupported, execution falls through to the plain insert:
    # inserted row can't be specified from the table where no primary keys are defined.
    db.stmt().execute(sql, *vals)
    for col, value in cls.last_sequences(db, 1):
        setattr(model, col.name, value)
    return model
def insert_many(db: Connection, records: list[typing.Union[typing_extensions.Self, dict[str, typing.Any]]], qualifier: collections.abc.Mapping[str, collections.abc.Callable[[str], str]] = {}, /, returning: bool = False)

Insert records.

If returning is True and the DBMS supports RETURNING clause, the returned model objects contain complete and correct column values. Otherwise, the auto-incremental value is set to the returned model objects, but other column values generated inside the DBMS, such as default values, are not set.

Args

db
DB connection.
records
Object contains column values.
qualifier
Functions qualifying placeholder markers.
returning
Flag to return inserted records with complete and correct column values.

Returns

Models of inserted records or cursor.

Expand source code
@classmethod
def insert_many(
    cls,
    db: Connection,
    records: list[Union[Self, dict[str, Any]]],
    qualifier: Mapping[str, Qualifier] = {},
    /,
    returning: bool = False,
):
    """
    Insert records.

    If `returning` is `True`, the inserted rows are fetched again by their primary keys after the insertion.
    Otherwise, the auto-incremental value is set to the returned model objects,
    but other column values generated inside the DBMS, such as default values, are not set.

    Args:
        db: DB connection.
        records: Objects containing column values.
        qualifier: Functions qualifying placeholder markers.
        returning: Flag to return inserted records with complete and correct column values.
    Returns:
        Models of inserted records. An empty list for empty `records`.
    """
    if len(records) == 0:
        return []

    models: list[Self] = [cast(Self, r) if isinstance(r, cls) else cls(**cast(dict, r)) for r in records]

    # The first model determines the statement and the set of inserted columns.
    sql, cols, first_params = cls._insert_sql(models[0], qualifier)
    expected = set(cols)
    seq_of_params = [first_params]

    for m in models[1:]:
        check_columns(cls, model_values(cls, m), lambda c: c.name in expected, requires_all=True)
        # REVIEW:
        # The consistency among columns where expression is set is not checked.
        _, _, params = cls._insert_sql(m, qualifier)
        seq_of_params.append(params)

    db.stmt().executemany(sql, seq_of_params)

    # Only the last generated value is reported; earlier rows are assigned by counting back from it.
    total = len(records)
    for col, last in cls.last_sequences(db, total):
        for i, m in enumerate(models):
            setattr(m, col.name, last - (total - i - 1))

    if not returning:
        return models
    return cls.fetch_many(db, [extract_pks(cls, m) for m in models])
def last_sequences(db: Connection, num: int) ‑> list[tuple[Column, int]]

Returns the sequential (auto incremental) values of a table generated by the latest insertion.

Result contains every sequential columns and their values. When the latest query inserts multiple rows, only the last (= biggest) value is returned.

This method should be overridden by another mixin class defined in dialect module.

Args

db
DB connection.
num
The number of records inserted by the latest query.

Returns

List of pairs of column and its values.

Expand source code
@classmethod
def last_sequences(cls, db: Connection, num: int) -> list[tuple[Column, int]]:
    """
    Returns the sequential (auto incremental) values of a table generated by the latest insertion.

    The result is meant to hold every sequential column paired with its value.
    When the latest query inserted multiple rows, only the last (= biggest) value is reported.

    This base implementation reports no sequential columns;
    it should be overridden by another mixin class defined in dialect module.

    Args:
        db: DB connection.
        num: The number of records inserted by the latest query.
    Returns:
        List of pairs of column and its value.
    """
    no_sequences: list[tuple[Column, int]] = []
    return no_sequences
def support_returning(db: Connection) ‑> bool

Checks whether this DBMS supports the RETURNING clause or not.

Args

db
DB connection.

Returns

Whether this DBMS supports the RETURNING clause or not.

Expand source code
@classmethod
def support_returning(cls, db: Connection) -> bool:
    """
    Checks whether this DBMS supports **RETURNING** clause or not.

    This implementation always reports no support.

    Args:
        db: DB connection.
    Returns:
        Whether this DBMS supports **RETURNING** clause or not.
    """
    supported = False
    return supported
def update(db: Connection, pks: Union[Any, dict[str, Any]], record: Union[Meta, dict[str, Any]], qualifier: collections.abc.Mapping[str, collections.abc.Callable[[str], str]] = {}, /, returning: bool = False)

Update a record by primary key(s).

This method only updates columns which are found in record except for primary key(s).

t.update(db, 1, dict(c1=1, c2=2))
# SQL: UPDATE t SET c1 = 1, c2 = 2 WHERE id = 1

Args

db
DB connection.
pks
Primary key value(s).
record
Object contains column values.
qualifier
Functions qualifying placeholder markers.
returning
Flag to return updated records with complete and correct column values.

Returns

Whether the record exists and updated or updated record model.

Expand source code
@classmethod
def update(
    cls,
    db: Connection,
    pks: PKS,
    record: Record,
    qualifier: Mapping[str, Qualifier] = {},
    /,
    returning: bool = False,
):
    """
    Update the record identified by primary key(s).

    Only columns found in `record`, except for primary key(s), are updated.

    ```python
    t.update(db, 1, dict(c1=1, c2=2))
    # SQL: UPDATE t SET c1 = 1, c2 = 2 WHERE id = 1
    ```

    Args:
        db: DB connection.
        pks: Primary key value(s).
        record: Object contains column values.
        qualifier: Functions qualifying placeholder markers.
        returning: Flag to return updated records with complete and correct column values.
    Returns:
        Without `returning`, whether exactly one row was updated.
        With `returning`, the updated record model or `None` if there is none.
    """
    cols, vals = parse_pks(cls, pks)
    by_key = Conditional.all([Q.eq(**{col: val}) for col, val in zip(cols, vals)])

    if not returning:
        return cls.update_where(db, record, by_key, qualifier, returning=False) == 1

    if not cls.support_returning(db):
        # Without RETURNING, update first and read the row back by its key.
        cls.update_where(db, record, by_key, qualifier, returning=False)
        return cls.fetch(db, pks)

    updated = cls.update_where(db, record, by_key, qualifier, returning=True)
    return updated[0] if updated else None
def update_many(db: Connection, records: collections.abc.Sequence[typing.Union[Meta, dict[str, typing.Any]]], qualifier: collections.abc.Mapping[str, collections.abc.Callable[[str], str]] = {}, /, returning: bool = False)

Update records by set of primary key(s).

This method invokes on executemany defined in DB-API 2.0. Whether it is optimized compared to execute depends on DB driver.

Args

db
DB connection.
records
Sequence of objects contains column values.
qualifier
Functions qualifying placeholder markers.
returning
Flag to return updated records with complete and correct column values.

Returns

The number of affected rows or updated records.

Expand source code
@classmethod
def update_many(
    cls,
    db: Connection,
    records: Sequence[Record],
    qualifier: Mapping[str, Qualifier] = {},
    /,
    returning: bool = False,
):
    """
    Update records by a set of primary key(s).

    This method invokes `executemany` defined in DB-API 2.0.
    Whether it is optimized compared to `execute` depends on DB driver.

    Args:
        db: DB connection.
        records: Sequence of objects containing column values.
        qualifier: Functions qualifying placeholder markers.
        returning: Flag to return updated records with complete and correct column values.
    Returns:
        The number of affected rows or updated records.
    Raises:
        ValueError: The model has no primary key column, or a row lacks a primary key value
            or has no column to update.
    """
    if len(records) == 0:
        return [] if returning else 0

    keys = {c.name for c in cls.columns if c.pk}
    if len(keys) == 0:
        raise ValueError(f"update_many is not available because {cls} does not have primary key columns.")

    # Each row is split into (primary key values, values to update).
    seq_of_values: list[tuple[dict[str, Any], dict[str, Any]]] = []
    target_columns: Optional[set[str]] = None

    all_values = [model_values(cls, r, excludes_pk=False) for r in records]
    for values in all_values:
        # Proper subset: every key is present and at least one more column exists.
        if not keys < values.keys():
            raise ValueError("Every row must contain values of all primary keys and at least one update column value.")

        key_part = {name: val for name, val in values.items() if name in keys}
        update_part = {name: val for name, val in values.items() if name not in keys}

        if target_columns is None:
            # The first row fixes the set of updated columns.
            check_columns(cls, update_part)
            target_columns = set(update_part.keys())
        else:
            check_columns(cls, update_part, lambda c: c.name in target_columns, True) # type: ignore
        seq_of_values.append((key_part, update_part))

    # The statement of the first row is used for all rows; the other rows only supply parameters.
    sql_first = ""
    seq_of_params: list[list[Any]] = []

    for key_part, update_part in seq_of_values:
        cols, vals = parse_pks(cls, key_part)
        by_key = Conditional.all([Q.eq(**{col: val}) for col, val in zip(cols, vals)])

        sql, _, params = cls._update_sql(update_part, by_key, qualifier)
        if not sql_first:
            sql_first = sql
        seq_of_params.append(params)

    if not returning:
        return db.stmt().executemany(sql_first, seq_of_params).rowcount

    db.stmt().executemany(sql_first, seq_of_params)
    return cls.fetch_many(db, [key_part for key_part, _ in seq_of_values])
def update_where(db: Connection, record: Union[Meta, dict[str, Any]], condition: Conditional, qualifier: collections.abc.Mapping[str, collections.abc.Callable[[str], str]] = {}, /, returning: bool = False, allow_all: bool = True)

Update records which satisfy the condition.

t.update_where(db, dict(c2=2), Q.eq(c1=1))
# SQL: UPDATE t SET c2 = 2 WHERE c1 = 1

Args

db
DB connection.
record
Object contains column values.
condition
Query condition.
qualifier
Functions qualifying placeholder markers.
returning
Flag to return updated records with complete and correct column values.
allow_all
If False, empty condition raises ValueError.

Returns

The number of affected rows or updated records.

Expand source code
@classmethod
def update_where(
    cls,
    db: Connection,
    record: Record,
    condition: Conditional,
    qualifier: Mapping[str, Qualifier] = {},
    /,
    returning: bool = False,
    allow_all: bool = True,
):
    """
    Update every record which satisfies the condition.

    ```python
    t.update_where(db, dict(c2=2), Q.eq(c1=1))
    # SQL: UPDATE t SET c2 = 2 WHERE c1 = 1
    ```

    Args:
        db: DB connection.
        record: Object contains column values.
        condition: Query condition.
        qualifier: Functions qualifying placeholder markers.
        returning: Flag to return updated records with complete and correct column values.
        allow_all: If `False`, empty condition raises `ValueError`.
    Returns:
        The number of affected rows or updated records.
    Raises:
        NotImplementedError: `returning` is set but **RETURNING** clause is not supported.
    """
    sql, _, params = cls._update_sql(record, condition, qualifier, allow_all)

    if not returning:
        return db.stmt().execute(sql, *params).rowcount

    if not cls.support_returning(db):
        raise NotImplementedError("RETURNING is not supported and there is no way to fetch updated rows exactly.")

    cursor = db.stmt().execute(f"{sql} RETURNING *", *params)
    selection = cls.select()
    return [read_row(row, *selection)[0] for row in cursor.fetchall()]

Inherited members

class Column (name: str, ptype: type, type_info: Optional[Any], pk: bool, fk: Optional[Relations], incremental: Optional[Any], nullable: bool, comment: str = '')

This class represents a schema of a column.

Expand source code
class Column:
    """
    This class represents a schema of a column.

    Every constructor argument is stored as-is in the attribute of the same name.
    """
    def __init__(
        self,
        name: str,
        ptype: type,
        type_info: Optional[Any],
        pk: bool,
        fk: Optional[Relations],
        incremental: Optional[Any],
        nullable: bool,
        comment: str = "",
    ):
        # Identification and typing.

        #: Column name.
        self.name = name
        #: Data type in python.
        self.ptype = ptype
        #: Type information obtained from DB.
        self.type_info = type_info

        # Constraints.

        #: Is this column a primary key?
        self.pk = pk
        #: Foreign key constraints.
        self.fk = fk
        #: Can this column contain null?
        self.nullable = nullable

        # Extras.

        #: If this column is auto-incremental, this object contains the information of the feature, otherwise, `None`.
        self.incremental = incremental
        #: Comment of the column.
        self.comment = comment

Instance variables

var comment

Comment of the column.

var fk

Foreign key constraints.

var incremental

If this column is auto-incremental, this object contains the information of the feature, otherwise, None.

var name

Column name.

var nullable

Can this column contain null?

var pk

Is this column a primary key?

var ptype

Data type in python.

var type_info

Type informations obtained from DB.

class Conditional (expression='', params=None)

Represents a query condition composed of an expression and parameters.

Parameters must be a list where the index of each parameter matches the index of placeholder for it. The expression accepts only the unified marker $_.

Applying logical operators such as &, | and ~ generates new condition.

>>> c1 = Q.of("a = $_", 0)
>>> c2 = Q.of("b < $_", 1)
>>> c3 = Q.of("c > $_", 2)
>>> c = ~(c1 & c2 | c3)
>>> c
Condition: NOT (((a = $_) AND (b < $_)) OR (c > $_)) -- [0, 1, 2]
Expand source code
class Conditional(Expression):
    """
    A query condition made of an expression and its parameters.

    Parameters must be a list where the index of each parameter matches the index of placeholder for it.
    The expression accepts only the unified marker `$_`.

    Logical operators `&`, `|` and `~` each produce a new condition.

    ```python
    >>> c1 = Q.of("a = $_", 0)
    >>> c2 = Q.of("b < $_", 1)
    >>> c3 = Q.of("c > $_", 2)
    >>> c = ~(c1 & c2 | c3)
    >>> c
    Condition: NOT (((a = $_) AND (b < $_)) OR (c > $_)) -- [0, 1, 2]
    ```
    """
    @classmethod
    def all(cls, conditionals: Sequence['Conditional']) -> 'Conditional':
        """
        Concatenates condition objects with `AND`.

        Args:
            conditionals: Condition objects.
        Returns:
            Concatenated condition object. An empty condition for an empty sequence.
        """
        combined = Conditional()
        for item in conditionals:
            combined = combined & item
        return combined

    @classmethod
    def any(cls, conditionals: Sequence['Conditional']) -> 'Conditional':
        """
        Concatenates condition objects with `OR`.

        Args:
            conditionals: Condition objects.
        Returns:
            Concatenated condition object. The condition `1 = 0` for an empty sequence.
        """
        if len(conditionals) == 0:
            return Conditional("1 = 0")
        combined = Conditional()
        for item in conditionals:
            combined = combined | item
        return combined

    def __init__(self, expression="", params=None):
        super().__init__(expression, params or [])

    def __repr__(self):
        return f"Condition: '{self.expression}' -- {self.params}"

    def __and__(self, other) -> 'Conditional':
        left, right = self.expression, other.expression
        if left and right:
            joined = f"({left}) AND ({right})"
        else:
            # At most one side has an expression; it is used unchanged.
            joined = left or right or ""

        return Conditional(joined, self.params + other.params)

    def __or__(self, other) -> 'Conditional':
        left, right = self.expression, other.expression
        if left and right:
            joined = f"({left}) OR ({right})"
        else:
            # At most one side has an expression; it is used unchanged.
            joined = left or right or ""

        return Conditional(joined, self.params + other.params)

    def __invert__(self) -> 'Conditional':
        if not self.expression:
            # Negating an empty condition gives the always-false condition.
            return Conditional("1 = 0", [])
        return Conditional(f"NOT ({self.expression})", self.params)

Ancestors

Static methods

def all(conditionals: collections.abc.Sequence['Conditional']) ‑> Conditional

Concatenates condition objects with AND.

Args

conditionals
Condition objects.

Returns

Concatenated condition object.

Expand source code
@classmethod
def all(cls, conditionals: Sequence['Conditional']) -> 'Conditional':
    """
    Concatenates condition objects with `AND`.

    Args:
        conditionals: Condition objects.
    Returns:
        Concatenated condition object. An empty condition for an empty sequence.
    """
    combined = Conditional()
    for item in conditionals:
        combined = combined & item
    return combined
def any(conditionals: collections.abc.Sequence['Conditional']) ‑> Conditional

Concatenates condition objects with OR.

Args

conditionals
Condition objects.

Returns

Concatenated condition object.

Expand source code
@classmethod
def any(cls, conditionals: Sequence['Conditional']) -> 'Conditional':
    """
    Concatenates condition objects with `OR`.

    Args:
        conditionals: Condition objects.
    Returns:
        Concatenated condition object. The condition `1 = 0` for an empty sequence.
    """
    if len(conditionals) == 0:
        return Conditional("1 = 0")
    combined = Conditional()
    for item in conditionals:
        combined = combined | item
    return combined

Inherited members

class Connection (api, conn: Connection, context_factory: Optional[Callable[[], ConnectionContext]] = None)

Wrapper class of DB-API 2.0 Connection.

Every instance works as the proxy object to original connection, therefore any attribute in it is still available.

Expand source code
class Connection(dbapi.Connection):
    """
    Wrapper class of DB-API 2.0 Connection.

    Every instance works as the proxy object to original connection, therefore any attribute in it is still available.

    It can be used as a context manager. If the original connection is a context manager itself,
    entering and exiting are delegated to it. Otherwise the transaction is committed on normal exit,
    rolled back when an exception is propagating, and the connection is closed in both cases.
    """
    _characters = string.ascii_letters + string.digits + ".="

    def __init__(self, api, conn: dbapi.Connection, context_factory: Optional[Callable[[], ConnectionContext]] = None):
        #: A string which identifies a connection.
        self.identifier = self._gen_identifier()
        #: DB-API 2.0 module.
        self.api = api
        #: Original connection object.
        self.conn = conn
        self._context_factory = context_factory
        self._context = None

    def __getattr__(self, name):
        # Only called for attributes not found on this wrapper; forward them to the original connection.
        return getattr(self.conn, name)

    def __enter__(self):
        if hasattr(self.conn, "__enter__"):
            self.conn.__enter__() # type: ignore
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        if hasattr(self.conn, "__exit__"):
            self.conn.__exit__(exc_type, exc_value, traceback) # type: ignore
        else:
            # Commit only when the block finished without an exception; otherwise discard the changes.
            if exc_value is None:
                self.conn.commit()
            else:
                self.conn.rollback()
            self.conn.close()

    def _gen_identifier(self):
        # Current thread name plus a random suffix.
        return threading.current_thread().name + "-" + secrets.token_hex(4)

    @property
    def context(self) -> ConnectionContext:
        """
        Context object used for this connection.

        It is created on first access by the factory given to the constructor or `use()`,
        or by `ConnectionContext` when no factory is set.
        """
        if not self._context:
            self._context = (self._context_factory or ConnectionContext)()
            self._context.identifier = self.identifier
        return self._context

    def close(self) -> None:
        """Close the original connection."""
        return self.conn.close()

    def commit(self) -> None:
        """Commit on the original connection."""
        return self.conn.commit()

    def rollback(self) -> None:
        """Roll back on the original connection."""
        return self.conn.rollback()

    def cursor(self) -> dbapi.Cursor:
        """Create a cursor from the original connection."""
        return self.conn.cursor()

    def use(self, factory: Callable[[], ConnectionContext]) -> Self:
        """
        Sets factory function of `ConnectionContext` to use custom context.

        When the context is already set, it will be replaced with new one.

        Args:
            factory: Function returning custom context.
        Returns:
            This instance.
        """
        self._context_factory = factory
        # Drop the current context so that the next access creates one from the new factory.
        self._context = None
        return self

    def stmt(self, context: Optional[ConnectionContext] = None) -> 'Statement':
        """
        Creates new `Statement` which executes queries on this connection.

        Args:
            context: Context object used in the statement. If `None`, the context of this connection is used.
        Returns:
            Created statement.
        """
        return Statement(self, context or self.context)

Ancestors

Instance variables

var api

DB-API 2.0 module.

var conn

Original connection object.

var contextConnectionContext

Context object used for this connection.

Expand source code
@property
def context(self) -> ConnectionContext:
    """
    Context object used for this connection.

    The context is created on first access, by the factory registered with `use`
    or by `ConnectionContext` when no factory is set, and receives the identifier of this connection.
    """
    if self._context:
        return self._context

    factory = self._context_factory or ConnectionContext
    self._context = factory()
    self._context.identifier = self.identifier
    return self._context
var identifier

A string which identifies a connection.

Methods

def close(self) ‑> None
Expand source code
def close(self) -> None:
    """Closes the wrapped connection by delegating to its `close`."""
    return self.conn.close()
def commit(self) ‑> None
Expand source code
def commit(self) -> None:
    """Commits the current transaction by delegating to the wrapped connection."""
    return self.conn.commit()
def cursor(self) ‑> Cursor
Expand source code
def cursor(self) -> dbapi.Cursor:
    """Returns a cursor obtained from the wrapped connection."""
    return self.conn.cursor()
def rollback(self) ‑> None
Expand source code
def rollback(self) -> None:
    """Rolls back the current transaction by delegating to the wrapped connection."""
    return self.conn.rollback()
def stmt(self, context: Optional[ConnectionContext] = None) ‑> Statement

Creates new Statement which executes queries on this connection.

Args

context
Context object used in the statement. If None, the context of this connection is used.

Returns

Created statement.

Expand source code
def stmt(self, context: Optional[ConnectionContext] = None) -> 'Statement':
    """
    Creates a new `Statement` which executes queries on this connection.

    Args:
        context: Context object used in the statement. A falsy value (including `None`) selects the context of this connection.
    Returns:
        Created statement.
    """
    effective = context or self.context
    return Statement(self, effective)
def use(self, factory: Callable[[], ConnectionContext]) ‑> typing_extensions.Self

Sets factory function of ConnectionContext to use custom context.

When the context is already set, it will be replaced with new one.

Args

factory
Function returning custom context.

Returns

This instance.

Expand source code
def use(self, factory: Callable[[], ConnectionContext]) -> Self:
    """
    Registers a factory of `ConnectionContext` so that this connection uses a custom context.

    A context which has already been created is discarded; the next access to `context` builds a new one with the given factory.

    Args:
        factory: Function returning custom context.
    Returns:
        This instance.
    """
    # Drop the cached context so that `context` rebuilds it with the new factory.
    self._context = None
    self._context_factory = factory
    return self
class ConnectionContext (identifier: Optional[str] = None, **configurations)

This class represents a context of query execution.

By default, the context based on global configuration is used. You should declare your own context class and set it to Connection.use() if you want to change the behavior.

Custom context is also useful to change cursor state before and after query execution. Overwrite execute method to do it.

Expand source code
class ConnectionContext:
    """
    Execution context of queries.

    A context derived from the global configuration is used by default.
    Declare your own context class and register it with `Connection.use` if you want to change the behavior.

    A custom context is also the place to change cursor state before and after query execution.  Override `execute` to do it.
    """
    def __init__(self, identifier: Optional[str] = None, **configurations):
        #: Identifier of this context. `None` by default.
        self.identifier = identifier
        #: Configuration used in this context.
        self.config = default_config().derive(**configurations)

    def _message(self, message):
        # Prefix the message with the identifier, when there is one, so that log lines can be told apart.
        if self.identifier:
            return f"({self.identifier}) {message}"
        return message

    def configure(self, **configurations: Any) -> 'ConnectionContext':
        """
        Updates configurations of this context.

        The values are set on the configuration object held by this context (`self.config`).

        Args:
            configurations: Configurations. See `pyracmon.config` to know available keys.
        Returns:
            This instance.
        """
        self.config.set(**configurations)
        return self

    def execute(self, cursor: dbapi.Cursor, sql: str, params: PARAMS) -> dbapi.Cursor:
        """
        Executes a query on a cursor.

        The query and, if configured, its parameters are logged before the execution.

        Args:
            cursor: Cursor object.
            sql: Query string.
            params: Query parameters.
        Returns:
            Given cursor object. Internal state may be changed by the execution of the query.
        """
        return self._execute(cursor, sql, params, False)

    def executemany(self, cursor: dbapi.Cursor, sql: str, seq_of_params: Sequence[PARAMS]) -> dbapi.Cursor:
        """
        Repeats a query on a cursor for a sequence of parameters.

        This method works similarly to `execute` but invokes `executemany` of the cursor instead.

        Args:
            cursor: Cursor object.
            sql: Query string.
            seq_of_params: A sequence of parameters of the query.
        Returns:
            Given cursor object. Internal state may be changed by the execution of the query.
        """
        return self._execute(cursor, sql, seq_of_params, True)

    @overload
    def _execute(self, cursor: dbapi.Cursor, sql: str, params: PARAMS, is_many: Literal[False] = False) -> dbapi.Cursor: ...
    @overload
    def _execute(self, cursor: dbapi.Cursor, sql: str, params: Sequence[PARAMS], is_many: Literal[True] = True) -> dbapi.Cursor: ...
    def _execute(
        self,
        cursor: dbapi.Cursor,
        sql: str,
        params,
        is_many: bool = False,
    ) -> dbapi.Cursor:
        # Shared implementation of `execute` and `executemany`: log first, then run the query.
        logger = _logger(self.config)

        if logger:
            config = self.config

            # Queries longer than the configured length are truncated in the log only.
            limit = config.sql_log_length
            shown = sql if len(sql) <= limit else f"{sql[0:limit]}..."
            logger.log(config.log_level, self._message(shown))

            if config.parameter_log:
                # One log line per parameter set for `executemany`, a single line otherwise.
                logged = params if is_many else [params]
                for ps in logged:
                    logger.log(config.log_level, self._message(f"Parameters: {ps}"))

        run = cursor.executemany if is_many else cursor.execute
        run(sql, params)

        return cursor

Instance variables

var config

Configuration used in this context.

var identifier

Identifier of this context. None by default.

Methods

def configure(self, **configurations: Any) ‑> ConnectionContext

Change configurations of this context.

Changes by this method never affect global configuration even if the context is based on it.

Args

configurations
Configurations. See pyracmon.config to know available keys.

Returns

This instance.

Expand source code
def configure(self, **configurations: Any) -> 'ConnectionContext':
    """
    Updates configurations of this context.

    The values are set on the configuration object held by this context (`self.config`).

    Args:
        configurations: Configurations. See `pyracmon.config` to know available keys.
    Returns:
        This instance.
    """
    self.config.set(**configurations)
    return self
def execute(self, cursor: Cursor, sql: str, params: Union[list[Any], dict[str, Any]]) ‑> Cursor

Executes a query on a cursor.

Args

cursor
Cursor object.
sql
Query string.
params
Query parameters.

Returns

Given cursor object. Internal state may be changed by the execution of the query.

Expand source code
def execute(self, cursor: dbapi.Cursor, sql: str, params: PARAMS) -> dbapi.Cursor:
    """
    Executes a query on a cursor.

    Delegates to `_execute` with `is_many` set to `False`.

    Args:
        cursor: Cursor object.
        sql: Query string.
        params: Query parameters.
    Returns:
        Given cursor object. Internal state may be changed by the execution of the query.
    """
    return self._execute(cursor, sql, params, False)
def executemany(self, cursor: Cursor, sql: str, seq_of_params: collections.abc.Sequence[typing.Union[list[typing.Any], dict[str, typing.Any]]]) ‑> Cursor

Repeats a query on a cursor for a sequence of parameters.

This method works similarly to execute but invokes executemany instead.

Args

cursor
Cursor object.
sql
Query string.
seq_of_params
A sequence of parameters of the query.

Returns

Given cursor object. Internal state may be changed by the execution of the query.

Expand source code
def executemany(self, cursor: dbapi.Cursor, sql: str, seq_of_params: Sequence[PARAMS]) -> dbapi.Cursor:
    """
    Repeats a query on a cursor for a sequence of parameters.

    This method works similarly to `execute` but invokes `executemany` of the cursor instead.

    Args:
        cursor: Cursor object.
        sql: Query string.
        seq_of_params: A sequence of parameters of the query.
    Returns:
        Given cursor object. Internal state may be changed by the execution of the query.
    """
    return self._execute(cursor, sql, seq_of_params, True)
class ContainerView (*args, **kwargs)

The interface of the view of a node set, i.e. NodeContainer and Node.Children .

Expand source code
class ContainerView(Protocol, Generic[T]):
    """
    The interface of the view of a node set, i.e. `NodeContainer` and `Node.Children` .

    The type parameter `T` is the type of the base container returned by calling the view.
    All methods here are declarations only; implementations are provided by the concrete views.
    """
    def __bool__(self) -> bool:
        """Returns whether this container is not empty."""
        ...
    def __call__(self) -> T:
        """Returns a base container."""
        ...
    def __len__(self) -> int:
        """Returns the number of nodes."""
        ...
    def __iter__(self) -> Iterator['NodeView']:
        """Iterates views of nodes."""
        ...
    @overload
    def __getitem__(self, index: int) -> 'NodeView': ...
    @overload
    def __getitem__(self, index: slice) -> Iterable['NodeView']: ...
    def __getitem__(self, index: Union[int, slice]) -> Union['NodeView', Iterable['NodeView']]:
        """Returns a view of a node at the index, or views of nodes in the range when a slice is given."""
        ...
    def __getattr__(self, key) -> 'ContainerView':
        """Returns a view of the first node or empty container view if it does not exist."""
        # NOTE(review): the return annotation is a container view while the docstring speaks of the first node;
        # presumably `key` names a child of the first node - confirm against the implementations.
        ...

Ancestors

  • typing.Protocol
  • typing.Generic

Subclasses

  • pyracmon.graph.graph._EmptyContainerView
class Expression (expression: str, params: list[typing.Any])

Abstraction of expression in any query.

Expand source code
class Expression:
    """
    Abstraction of expression in any query.

    An instance simply holds the expression string and the parameters given to the constructor.

    Args:
        expression: Expression string.
        params: Parameters corresponding to placeholders in the expression.
    """
    def __init__(self, expression: str, params: list[Any]):
        #: Expression string.
        self.expression = expression
        #: Parameters corresponding to placeholders in the expression.
        self.params = params

Subclasses

Instance variables

var expression

Expression string.

var params

Parameters corresponding to placeholders in the expression.

class Graph (template: GraphTemplate)

This class represents a graph composed of tree-structured node containers.

The structure is determined by GraphTemplate. Use new_graph() instead of the constructor to create a new graph instance.

template = GraphSpec().new_template(
    a = (int, lambda x:x),
    b = (str, lambda x:x),
    c = (str, lambda x:x),
)
template.a << template.b << template.c
graph = new_graph(template)

In above code, a graph which has 3 properties ( a b c ) and a structure where a is parent of b and b is parent of c is created.

append ( replace ) is a method to store entities in the graph, tying them to each other according to the structure. Entities are encapsulated by Node which can have an edge to parent node.

graph.append(a=1, b="a", c="x").append(a=2, b="b", c="y")

In append, entities are first sorted in descending order, and then:

  • Search a node whose entity is identical to the first entity from the corresponding node container.
    • If found, new node is not created and the identical node is set to next parent.
    • Otherwise, new node is appended and it is set to next parent.
  • Apply this to following entities in order. A difference is that identical node is searched from the sequence of parents in the session.

In example here, the identification is done by entity value itself ( lambda x:x ). Next code is the example where identical nodes are found.

graph.append(a=1, b="a", c="z").append(a=2, b="c", c="y")

In the first append, a and b have their identical nodes and a is identical in the second. c in the second one is not identical to any node because parent node b="c" is already added as new node.

Due to the identification mechanism, entity relationships in the graph are guaranteed after repeating append .

Expand source code
class Graph:
    """
    This class represents a graph composed of tree-structured node containers.

    The structure is determined by `GraphTemplate`. Use `new_graph` instead of the constructor to create a new graph instance.

    ```python
    template = GraphSpec().new_template(
        a = (int, lambda x:x),
        b = (str, lambda x:x),
        c = (str, lambda x:x),
    )
    template.a << template.b << template.c
    graph = new_graph(template)
    ```

    The code above creates a graph which has 3 properties ( `a` `b` `c` ) and a structure where `a` is parent of `b` and `b` is parent of `c`.

    `append` ( `replace` ) is a method to store entities in the graph, tying them to each other according to the structure.
    Entities are encapsulated by `Node` which can have an edge to parent node.

    ```python
    graph.append(a=1, b="a", c="x").append(a=2, b="b", c="y")
    ```

    In `append`, entities are first sorted in descending order, and then:

    - Search a node whose entity is *identical* to the first entity from the corresponding node container.
        - If found, new node is not created and the *identical* node is set to next parent.
        - Otherwise, new node is appended and it is set to next parent.
    - Apply this to following entities in order. A difference is that *identical* node is searched from the sequence of parents in the session.

    In the example here, the identification is done by entity value itself ( `lambda x:x` ). Next code is the example where *identical* nodes are found.

    ```python
    graph.append(a=1, b="a", c="z").append(a=2, b="c", c="y")
    ```

    In the first `append`, `a` and `b` have their *identical* nodes and `a` is *identical* in the second.
    `c` in the second one is not *identical* to any node because parent node `b="c"` is already added as new node.

    Due to the identification mechanism, entity relationships in the graph are guaranteed after repeating `append` .
    """
    def __init__(self, template: GraphTemplate):
        containers: dict[str, NodeContainer] = {}
        for prop in template:
            containers[prop.name] = self._to_container(prop)

        #: Graph template.
        self.template: GraphTemplate = template
        #: A `dict` containing node containers by their names.
        self.containers: dict[str, NodeContainer] = containers
        self._view = None

    def _to_container(self, prop: GraphTemplate.Property) -> 'NodeContainer':
        # A property whose kind is itself a template gets the graph-specific container.
        container_type = _GraphNodeContainer if isinstance(prop.kind, GraphTemplate) else NodeContainer
        return container_type(prop)

    def _container_of(self, prop: GraphTemplate.Property) -> Optional['NodeContainer']:
        # Finds the only container whose property is compatible with the given one; ambiguity is an error.
        compatible = [c for c in self.containers.values() if c.prop.is_compatible(prop)]
        if not compatible:
            return None
        if len(compatible) > 1:
            raise ValueError(f"Container can't be determined from property '{prop.name}'.")
        return compatible[0]

    def __add__(self, another: Union[Self, GraphView]) -> 'Graph':
        """
        Creates a new graph holding nodes of this graph and another graph.

        The new graph has the same template as this graph's.
        Because nodes are copied by `__iadd__()`, another graph does not have to share the template;
        its nodes are appended only to containers with compatible properties.

        Args:
            another: Graph or its view.
        Returns:
            Created graph.
        """
        merged = Graph(self.template)

        for source in (self, another):
            merged += source

        return merged

    def __iadd__(self, another: Union[Self, GraphView]) -> Self:
        """
        Appends nodes from another graph.

        Nodes of another graph are traversed from its roots and each entity is appended to the compatible container of this graph, if any.

        Args:
            another: Graph or its view.
        Returns:
            This graph.
        Raises:
            ValueError: More than one container is compatible with the property of a node.
        """
        source = another if isinstance(another, Graph) else another()

        def graft(node: Node, ancestors: dict[str, list[Node]]):
            target = self._container_of(node.prop)
            if target:
                target.append(node.entity, ancestors)
            for children in node.children.values():
                for child in children.nodes:
                    # Every child gets its own copy of the mapping so that branches don't share it
                    # (presumably `append` records nodes in it - confirm in `NodeContainer.append`).
                    graft(child, ancestors.copy())

        for root in source.roots:
            for node in root.nodes:
                graft(node, {})

        return self

    @property
    def roots(self) -> Iterable['NodeContainer']:
        """
        Returns root node containers, i.e. containers whose property has no parent.
        """
        def is_root(container):
            return container.prop.parent is None

        return filter(is_root, self.containers.values())

    @property
    def view(self) -> GraphView:
        """
        Returns an unmodifiable view of this graph.

        The view object works as the accessor to graph nodes. It is created on first access and reused afterwards.

        ```python
        >>> template = GraphSpec().new_template(a=int, b=str, c=str)
        >>> template.a << template.b
        >>> graph = new_graph(template)
        >>> view = graph.view
        >>> assert view() is graph                            # invocation
        >>> assert view.a is graph.containers["a"].view       # attribute
        >>> assert [name for name, _ in view] == ["a", "c"]   # iteration
        ```
        """
        if self._view is not None:
            return self._view

        graph = self

        class _GraphView:
            def __call__(self) -> Graph:
                """Returns the graph of this view."""
                return graph
            def __iter__(self) -> Iterator[tuple[str, ContainerView[NodeContainer]]]:
                """Iterates pairs of name and view of root containers."""
                return (
                    (c.name, c.view)
                    for c in graph.containers.values()
                    if c.prop.parent is None
                )
            def __getattr__(self, name: str) -> ContainerView:
                """Returns a view of a container of the name."""
                return graph.containers[name].view

        self._view = _GraphView()
        return self._view

    def _append(self, to_replace: bool, entities: dict[str, Any]) -> Self:
        # Properties are taken in the iteration order of the template.
        props = [p for p in self.template if p.name in entities]

        accepted = set()
        for p in props:
            parent = p.parent
            # Skip an entity whose parent is given in this call but was not accepted.
            if parent is not None and parent.name in entities and parent.name not in accepted:
                continue
            if p.entity_filter is None or p.entity_filter(entities[p.name]):
                accepted.add(p.name)

        # The same mapping is handed to every container in turn.
        ancestors = {}
        for p in props:
            if p.name in accepted:
                self.containers[p.name].append(entities[p.name], ancestors, to_replace)

        return self

    def append(self, **entities: Any) -> Self:
        """
        Appends entities with associated property names.

        Args:
            entities: Entities keyed with associated property names.
        Returns:
            This graph.
        """
        return self._append(False, entities)

    def replace(self, **entities: Any) -> Self:
        """
        Works similarly to `append`, but entities of identical nodes are replaced with given entities.

        Args:
            entities: Entities keyed with associated property names.
        Returns:
            This graph.
        """
        return self._append(True, entities)

Instance variables

var containers

A dict containing node containers by their names.

var roots : collections.abc.Iterable['NodeContainer']

Returns root node containers.

Expand source code
@property
def roots(self) -> Iterable['NodeContainer']:
    """
    Returns root node containers, i.e. containers whose property has no parent.
    """
    def is_root(container):
        return container.prop.parent is None

    return filter(is_root, self.containers.values())
var template

Graph template.

var viewGraphView

Returns an unmodifiable view of this graph.

The view object works as the accessor to graph nodes.

>>> template = GraphSpec().new_template(a=int, b=str, c=str)
>>> template.a << template.b
>>> graph = new_graph(template)
>>> view = graph.view
>>> assert view() is graph                        # invocation
>>> assert view.a is graph.containers["a"].view   # attribute
>>> assert [name for name, _ in view] == ["a", "c"] # iteration
Expand source code
@property
def view(self) -> GraphView:
    """
    Returns an unmodifiable view of this graph.

    The view object works as the accessor to graph nodes. It is created on first access and reused afterwards.

    ```python
    >>> template = GraphSpec().new_template(a=int, b=str, c=str)
    >>> template.a << template.b
    >>> graph = new_graph(template)
    >>> view = graph.view
    >>> assert view() is graph                            # invocation
    >>> assert view.a is graph.containers["a"].view       # attribute
    >>> assert [name for name, _ in view] == ["a", "c"]   # iteration
    ```
    """
    if self._view is not None:
        return self._view

    graph = self

    class _GraphView:
        def __call__(self) -> Graph:
            """Returns the graph of this view."""
            return graph
        def __iter__(self) -> Iterator[tuple[str, ContainerView[NodeContainer]]]:
            """Iterates pairs of name and view of root containers."""
            return (
                (c.name, c.view)
                for c in graph.containers.values()
                if c.prop.parent is None
            )
        def __getattr__(self, name: str) -> ContainerView:
            """Returns a view of a container of the name."""
            return graph.containers[name].view

    self._view = _GraphView()
    return self._view

Methods

def append(self, **entities: Any) ‑> typing_extensions.Self

Append entities with associated property names.

Args

entities
Entities keyed with associated property names.

Returns

This graph.

Expand source code
def append(self, **entities: Any) -> Self:
    """
    Appends entities with associated property names.

    Delegates to `_append` with the replacing flag set to `False`.

    Args:
        entities: Entities keyed with associated property names.
    Returns:
        This graph.
    """
    return self._append(False, entities)
def replace(self, **entities: Any) ‑> typing_extensions.Self

Works similarly to append, but entities of identical nodes are replaced with given entities.

Args

entities
Entities keyed with associated property names.

Returns

This graph.

Expand source code
def replace(self, **entities: Any) -> Self:
    """
    Works similarly to `append`, but entities of identical nodes are replaced with given entities.

    Delegates to `_append` with the replacing flag set to `True`.

    Args:
        entities: Entities keyed with associated property names.
    Returns:
        This graph.
    """
    return self._append(True, entities)
class GraphSchema (spec: Any, template: GraphTemplate, **serializers: NodeSerializer)

This class exposes a property to get the schema of serialization result of a graph.

TODO: Dependency to GraphSpec should be replaced in another way.

Expand source code
class GraphSchema:
    """
    This class exposes a property to get the schema of serialization result of a graph.

    The schema is computed from the return annotations of the configured serializers,
    and `serialize` runs the serialization itself with the same serializers.

    TODO: Dependency to `GraphSpec` should be replaced in another way.
    """
    def __init__(self, spec: Any, template: GraphTemplate, **serializers: NodeSerializer):
        #: Specification of graph operations.
        self.spec = spec
        #: Graph template to serialize.
        self.template = template
        #: `NodeSerializer`s used for the serialization.
        self.serializers = serializers

    def _return_from(self, prop: GraphTemplate.Property) -> type:
        """
        Get a type the node of passed property will be serialized.

        Candidate types are examined in this order: the return annotation of the `NodeSerializer`,
        the return annotation of the base serializer found in the spec, and the entity type of the property.

        Args:
            prop: A template property. A serializer must be registered under its name.
        Returns:
            Resolved type, or `Signature.empty` when it can't be determined.
        """
        ns = self.serializers[prop.name]

        # Type of the node entity.
        entity_type = prop.kind
        if isinstance(entity_type, GraphTemplate):
            # GraphTemplate type is ignored because serializer added by sub() resolves the type by itself.
            entity_type = _templateType(entity_type)

        # Return type of the NodeSerializer.
        ns_type = signature(ns.serializer).return_annotation

        # Return type of base serializer obtained from GraphSpec.
        base = chain_serializers(self.spec.find_serializers(entity_type))
        base_type = signature(base).return_annotation if base else Signature.empty
        #base_type = entity_type if base_type == Signature.empty else base_type

        # If the return type contains a single type parameter, previous type is applied to it.
        # Serializer without return annotation is supposed to return input type as it is.
        def next_resolvable(it: Iterator[type]) -> type:
            # Advances the iterator to the next annotated type; `Signature.empty` when it is exhausted.
            while True:
                res = next(it, None)
                if res is None:
                    break
                elif res != Signature.empty:
                    return res
            return Signature.empty

        def resolve(it: Iterator[type]) -> type:
            # Consumes as many candidates from the shared iterator as needed to get a concrete type.
            origin = next_resolvable(it)
            if origin == Signature.empty:
                return origin
            elif issubgeneric(origin, Typeable):
                if not Typeable.is_resolved(origin):
                    param = resolve(it)
                    if param == Signature.empty:
                        # Type parameter is not known.
                        return Signature.empty
                    # Replace type parameter.
                    origin = origin[param] # type: ignore
                return Typeable.resolve(origin, resolve(it), self.spec)
            else:
                args = get_args(origin)
                if args:
                    # origin is generics.
                    type_params = list(filter(lambda ia: isinstance(ia[1], TypeVar), enumerate(args)))
                    # python < 3.10
                    param_num = len(type_params)
                    if param_num == 0:
                        return origin
                    elif param_num == 1:
                        # Replace type parameter
                        param = resolve(it)
                        return origin[param] # type: ignore
                    else:
                        # Multiple unbound type parameters can't be filled from a single candidate.
                        return Signature.empty
                    # python >= 3.10
                    #match len(type_params):
                    #    case 0:
                    #        return origin
                    #    case 1:
                    #        # Replace type parameter
                    #        param = resolve(it)
                    #        return origin[param] # type: ignore
                    #    case _:
                    #        return Signature.empty
                else:
                    return origin

        # NOTE(review): the entity type is listed twice; presumably so that it is still available
        # after one occurrence has been consumed as a type parameter - confirm.
        return resolve(iter([ns_type, base_type, entity_type, entity_type]))

    def schema_of(self, prop: GraphTemplate.Property) -> Type[Annotated]:
        """
        Generates structured and documented schema for a template property.

        When the property is serialized into `dict`, schemas of its child properties which have serializers are merged into the result recursively.

        Args:
            prop: A template property.
        Returns:
            Schema with documentation.
        Raises:
            ValueError: A child configured to be merged is not serialized into `dict`.
        """
        return_type = self._return_from(prop)

        doc = self.serializers[prop.name]._doc or ""

        # TypedDict type is also a subclass of dict.
        if issubclass(return_type, dict):
            annotations = {}

            # Only children which have their own serializer appear in the schema.
            for c in filter(lambda c: c.name in self.serializers, prop.children):
                ns = self.serializers[c.name]
                cs = self.schema_of(c)

                t, d = decompose_document(cs)

                if ns.be_merged:
                    # Keys of the child are put directly into the schema of this property.
                    if not issubclass(t, dict):
                        raise ValueError(f"Property '{c.name}' is not configured to be serialized into dict.")
                    annotations.update(**{ns.namer(k):t for k, t in get_type_hints(t, include_extras=True).items()})
                elif ns.be_singular:
                    # The type comes from the return annotation of the aggregator.
                    rt = signature(ns.aggregator).return_annotation
                    rt = replace_optional_typevar(rt, cs)
                    annotations[ns.namer(c.name)] = rt
                else:
                    # Otherwise the child is represented as a list of its schema.
                    annotations[ns.namer(c.name)] = document_type(list[t], d)

            td_type: Optional[type[TypedDict]] = cast(type[TypedDict], return_type) if is_typeddict(return_type) else None

            return document_type(generate_schema(annotations, td_type), doc)
        else:
            return document_type(return_type, doc)

    @property
    def schema(self) -> type[TypedDict]:
        """
        Generates `TypedDict` which represents the schema of serialized graph.

        Only root properties of the template which have serializers contribute top-level keys.
        """
        annotations: dict[str, Any] = {}

        def put_root_schema(p: GraphTemplate.Property):
            # Adds key(s) for a root property in the same way as `schema_of` does for children.
            nonlocal annotations

            ns = self.serializers[p.name]
            dt = self.schema_of(p)

            if ns.be_merged:
                t, d = decompose_document(dt)
                annotations.update(**{ns.namer(k):t_ for k, t_ in get_type_hints(t, include_extras=True).items()})
            elif ns.be_singular:
                rt = signature(ns.aggregator).return_annotation
                rt = replace_optional_typevar(rt, dt)
                annotations[ns.namer(p.name)] = rt
            else:
                t, d = decompose_document(dt)
                annotations[ns.namer(p.name)] = document_type(list[t], d)

        roots = filter(lambda p: p.parent is None and p.name in self.serializers, self.template._properties.values())

        for p in roots:
            put_root_schema(p)

        return generate_schema(annotations)

    def serialize(self, graph: GraphView, **node_params: dict[str, Any]) -> dict[str, Any]:
        """
        Serialize graph into a dictionary.

        The work is delegated to `to_dict` of the spec with the serializers held by this object.

        Args:
            graph: A view of a graph.
            node_params: Parameters passed to `SerializationContext` and used by *serializer* s.
        Returns:
            Serialization result.
        """
        return self.spec.to_dict(graph, node_params, **self.serializers)

Instance variables

var schema : type[typing.TypedDict]

Generates TypedDict which represents the schema of serialized graph.

Expand source code
@property
def schema(self) -> type[TypedDict]:
    """
    Generates `TypedDict` which represents the schema of serialized graph.

    Only root properties of the template which have serializers contribute top-level keys.
    """
    annotations: dict[str, Any] = {}

    def put_root_schema(p: GraphTemplate.Property):
        # Adds key(s) for a root property according to how its serializer is configured.
        nonlocal annotations

        ns = self.serializers[p.name]
        dt = self.schema_of(p)

        if ns.be_merged:
            # Keys of the property's schema are put directly at the top level.
            t, d = decompose_document(dt)
            annotations.update(**{ns.namer(k):t_ for k, t_ in get_type_hints(t, include_extras=True).items()})
        elif ns.be_singular:
            # The type comes from the return annotation of the aggregator.
            rt = signature(ns.aggregator).return_annotation
            rt = replace_optional_typevar(rt, dt)
            annotations[ns.namer(p.name)] = rt
        else:
            # Otherwise the property is represented as a list of its schema.
            t, d = decompose_document(dt)
            annotations[ns.namer(p.name)] = document_type(list[t], d)

    roots = filter(lambda p: p.parent is None and p.name in self.serializers, self.template._properties.values())

    for p in roots:
        put_root_schema(p)

    return generate_schema(annotations)
var serializers

NodeSerializers used for the serialization.

var spec

Specification of graph operations.

var template

Graph template to serialize.

Methods

def schema_of(self, prop: GraphTemplate.Property) ‑> Type[Annotated]

Generates structured and documented schema for a template property.

Args

prop
A template property.

Returns

Schema with documentation.

Expand source code
def schema_of(self, prop: GraphTemplate.Property) -> Type[Annotated]:
    """
    Generates structured and documented schema for a template property.

    When the property is serialized into `dict`, schemas of its child properties which have serializers are merged into the result recursively.

    Args:
        prop: A template property.
    Returns:
        Schema with documentation.
    Raises:
        ValueError: A child configured to be merged is not serialized into `dict`.
    """
    return_type = self._return_from(prop)

    doc = self.serializers[prop.name]._doc or ""

    # TypedDict type is also a subclass of dict.
    if issubclass(return_type, dict):
        annotations = {}

        # Only children which have their own serializer appear in the schema.
        for c in filter(lambda c: c.name in self.serializers, prop.children):
            ns = self.serializers[c.name]
            cs = self.schema_of(c)

            t, d = decompose_document(cs)

            if ns.be_merged:
                # Keys of the child are put directly into the schema of this property.
                if not issubclass(t, dict):
                    raise ValueError(f"Property '{c.name}' is not configured to be serialized into dict.")
                annotations.update(**{ns.namer(k):t for k, t in get_type_hints(t, include_extras=True).items()})
            elif ns.be_singular:
                # The type comes from the return annotation of the aggregator.
                rt = signature(ns.aggregator).return_annotation
                rt = replace_optional_typevar(rt, cs)
                annotations[ns.namer(c.name)] = rt
            else:
                # Otherwise the child is represented as a list of its schema.
                annotations[ns.namer(c.name)] = document_type(list[t], d)

        td_type: Optional[type[TypedDict]] = cast(type[TypedDict], return_type) if is_typeddict(return_type) else None

        return document_type(generate_schema(annotations, td_type), doc)
    else:
        return document_type(return_type, doc)
def serialize(self, graph: GraphView, **node_params: dict[str, typing.Any]) ‑> dict[str, typing.Any]

Serialize graph into a dictionary.

Args

graph
A view of a graph.
node_params
Parameters passed to SerializationContext and used by serializers.

Returns

Serialization result.

Expand source code
def serialize(self, graph: GraphView, **node_params: dict[str, Any]) -> dict[str, Any]:
    """
    Converts a graph into a dictionary using the serializers of this schema.

    Args:
        graph: A view of a graph.
        node_params: Parameters passed to `SerializationContext` and used by serializers.
    Returns:
        Serialization result.
    """
    # The registered serializers are handed over as keyword arguments, keyed by name.
    to_dict = self.spec.to_dict
    return to_dict(graph, node_params, **self.serializers)
class GraphView (*args, **kwargs)

The interface of the view of graph.

Expand source code
class GraphView(Protocol):
    """
    The interface of the view of graph.

    Every member is a stub; the concrete behavior comes from the object implementing
    this protocol.
    """
    # Invoking the view yields a `Graph` (per the return annotation).
    def __call__(self) -> 'Graph': ...
    def __iter__(self) -> Iterator[tuple[str, 'ContainerView[NodeContainer]']]:
        """
        Iterates root container views.

        Returns:
            Iterator of pairs of name and container view.
        """
        ...
    def __getattr__(self, name: str) -> 'ContainerView':
        """
        Returns a container view by its name.

        Args:
            name: Container name, i.e. the template property name for the node container.
        Returns:
            Container view.
        """
        ...

Ancestors

  • typing.Protocol
  • typing.Generic
class Model
Expand source code
class Model(Mixins[Unpack[MXS]], metaclass=Meta):
    """
    Base type of model types.

    This class only works as a marker of model types and gives no functionality to them.
    """
    # Stub constructor accepting arbitrary keyword arguments; declared for type checkers only.
    def __init__(self, **kwargs) -> None: ... # for typing
class Node (prop: GraphTemplate.Property, entity: Any, key: Optional[Any], index: int)

This class represents a node which contains an entity.

Expand source code
class Node:
    """
    This class represents a node which contains an entity.

    A node keeps a set of parent nodes and, for every child property of its template
    property, a `Children` container holding the child nodes.
    """
    class Children:
        """
        This class represents the child nodes of a node for one child template property.
        """
        def __init__(self, prop: GraphTemplate.Property):
            #: Template property.
            self.prop = prop
            #: Child nodes in insertion order.
            self.nodes: list[Node] = []
            #: Nodes already appended; used by `append` and `__contains__` to detect duplicates.
            self.keys = set()
            # View object created lazily by the `view` property.
            self._view = None

        @property
        def name(self) -> str:
            """
            Returns the name of corresponding template property.
            """
            return self.prop.name

        @property
        def view(self) -> ContainerView['Node.Children']:
            """
            Returns an unmodifiable view of child nodes.

            The view is created on first access and cached for later accesses.
            """
            if self._view is None:
                # Captured by the closure class below; `self` inside it refers to the view.
                base = self
                class _ChildrenView:
                    def __bool__(self):
                        """Returns whether this container is not empty."""
                        return len(base.nodes) != 0
                    def __call__(self):
                        """Returns children container."""
                        return base
                    def __iter__(self):
                        """Iterates views of child nodes."""
                        return map(lambda n: n.view, base.nodes)
                    def __len__(self):
                        """Returns the number of child nodes."""
                        return len(base.nodes)
                    @overload
                    def __getitem__(self, index: int) -> 'NodeView': ...
                    @overload
                    def __getitem__(self, index: slice) -> Iterable['NodeView']: ...
                    def __getitem__(self, index):
                        """Returns a view of child node at the index, or a list of views for a slice."""
                        if isinstance(index, slice):
                            return [n.view for n in base.nodes[index]]
                        else:
                            return base.nodes[index].view
                    def __getattr__(self, key):
                        """
                        Returns the view of the child container `key` of the first node,
                        or an empty container view when there are no nodes.

                        Raises `KeyError` when the template property has no child named `key`.
                        """
                        child = next(filter(lambda c: c.name == key, base.prop.children), None)
                        if child:
                            return base.nodes[0].children[key].view if len(base.nodes) > 0 else _EmptyContainerView(child)
                        else:
                            raise KeyError(f"Graph property '{base.prop.name}' does not have a child property '{key}'.")
                self._view = _ChildrenView()
            return self._view

        def __contains__(self, node: 'Node') -> bool:
            """Returns whether the node has already been appended."""
            return node in self.keys

        def __iter__(self) -> Iterator['Node']:
            """Iterates child nodes in insertion order."""
            return iter(self.nodes)

        def append(self, node):
            """Appends the node unless it has already been appended."""
            if node not in self.keys:
                self.keys.add(node)
                self.nodes.append(node)

    def __init__(self, prop: GraphTemplate.Property, entity: Any, key: Optional[Any], index: int):
        #: Template property.
        self.prop = prop
        #: An entity value.
        self.entity = entity
        #: Key given at construction (may be `None`).
        self.key = key
        #: Nodes this node has been added to as a child (filled by `add_child`).
        self.parents = set()
        #: Child containers keyed by the name of each child template property.
        self.children: dict[str, Node.Children] = {c.name: Node.Children(c) for c in prop.children}
        # Index given at construction.
        self._index = index
        # View object created lazily by the `view` property.
        self._view = None

    def __contains__(self, key: str) -> bool:
        """Returns whether this node has a child container of the given name."""
        return key in self.children

    @property
    def name(self) -> str:
        """
        Returns the container name, which is same as the name of template property.
        """
        return self.prop.name

    @property
    def view(self) -> NodeView:
        """
        Returns an unmodifiable view of this node.

        The view object works as the accessor to entity and child nodes.
        It is created on first access and cached for later accesses.
        """
        if self._view is None:
            # Captured by the closure class below; `self` inside it refers to the view.
            node = self
            class _NodeView(NodeView):
                def __call__(self, alt: Any = None) -> Any:
                    """Returns an entity of this node. `alt` is not used here."""
                    return node.entity
                def __getattr__(self, key: str) -> ContainerView:
                    """Returns a view of child nodes by its name."""
                    return node.children[key].view
                def __iter__(self) -> Iterator[tuple[str, ContainerView]]:
                    """Iterate key-value pairs of child nodes."""
                    return map(lambda nc: (nc[0], nc[1].view), node.children.items())
            self._view = _NodeView()
        return self._view

    def add_child(self, child: 'Node') -> Self:
        """
        Adds a child node.

        The child is appended to the child container named after its template property
        and this node is registered in the child's `parents`.

        Args:
            child: Child node.
        Returns:
            This instance.
        Raises:
            ValueError: The child belongs to a different graph template.
        """
        if child.prop.template != self.prop.template:
            raise ValueError(f"Nodes from different graph template can't be associated.")
        self.children[child.prop.name].append(child)
        child.parents.add(self)
        return self

    def has_child(self, child: 'Node') -> bool:
        """
        Checks this node contains the node identical to given node.

        Args:
            child: Node to search.
        Returns:
            `True` if exists. `False` when the templates differ or no child container
            of the child's property name exists.
        """
        if child.prop.template != self.prop.template:
            return False
        elif child.prop.name in self.children:
            return child in self.children[child.prop.name].keys
        else:
            return False

Subclasses

  • pyracmon.graph.graph._GraphNode

Class variables

var Children

This class represents the child nodes of a node.

Instance variables

var entity

An entity value.

var name : str

Returns the container name, which is same as the name of template property.

Expand source code
@property
def name(self) -> str:
    """
    Returns the name of the template property, which is also the container name.
    """
    prop = self.prop
    return prop.name
var prop

Template property.

var view : NodeView

Returns an unmodifiable view of this node.

The view object works as the accessor to entity and child nodes.

Expand source code
@property
def view(self) -> NodeView:
    """
    Returns an unmodifiable view of this node.

    The view gives access to the entity (by invocation), to the child containers
    (by attribute) and to all child containers (by iteration). It is built on the
    first access and reused afterwards.
    """
    if self._view is not None:
        return self._view

    # Captured by the closure class; inside it, `self` denotes the view object.
    owner = self

    class _NodeView(NodeView):
        def __call__(self, alt: Any = None) -> Any:
            """Returns an entity of this node."""
            return owner.entity

        def __getattr__(self, key: str) -> ContainerView:
            """Returns a view of child nodes by its name."""
            children = owner.children[key]
            return children.view

        def __iter__(self) -> Iterator[tuple[str, ContainerView]]:
            """Iterate key-value pairs of child nodes."""
            return map(lambda item: (item[0], item[1].view), owner.children.items())

    self._view = _NodeView()
    return self._view

Methods

def add_child(self, child: Node) ‑> typing_extensions.Self

Adds a child node.

Args

child
Child node.

Returns

This instance.

Expand source code
def add_child(self, child: 'Node') -> Self:
    """
    Adds a child node.

    Args:
        child: Child node.
    Returns:
        This instance.
    """
    if child.prop.template != self.prop.template:
        raise ValueError(f"Nodes from different graph template can't be associated.")
    self.children[child.prop.name].append(child)
    child.parents.add(self)
    return self
def has_child(self, child: Node) ‑> bool

Checks this node contains the node identical to given node.

Args

child
Node to search.

Returns

True if exists.

Expand source code
def has_child(self, child: 'Node') -> bool:
    """
    Tells whether the given node is already registered as a child of this node.

    Args:
        child: Node to search.
    Returns:
        `True` if exists. `False` when the node comes from another template or
        this node has no child container for the node's property name.
    """
    if child.prop.template != self.prop.template:
        return False

    child_name = child.prop.name
    return child_name in self.children and child in self.children[child_name].keys
class NodeContainer (prop: GraphTemplate.Property)

This class represents a container of nodes for a template property.

Expand source code
class NodeContainer:
    """
    This class represents a container of nodes for a template property.
    """
    def __init__(self, prop: GraphTemplate.Property):
        #: Template property.
        self.prop = prop
        #: Nodes in insertion order.
        self.nodes: list[Node] = []
        #: Maps an identifier key to the indexes in `nodes` of the nodes having that key.
        self.keys: dict[Any, list[int]] = {}
        # View object created lazily by the `view` property.
        self._view = None

    @property
    def name(self) -> str:
        """
        Returns the container name, which is same as the name of template property.
        """
        return self.prop.name

    @property
    def view(self) -> ContainerView['NodeContainer']:
        """
        Returns an unmodifiable view of this container.

        The view object works as the accessor to container components.
        It is created on first access and cached for later accesses.

        ```python
        template = GraphSpec().new_template(a=int, b=str, c=str)
        template.a << template.b
        graph = new_graph(template).append(a=1, b="a").append(a=1, b="b").append(a=2, b="c")
        container = graph.containers["a"]
        view = graph.view.a
        assert view() is container                             # invocation
        assert view.b is container.nodes[0].children["b"].view # attribute
        assert view[1] is container.nodes[1].view              # index
        assert [n() for n in view] == [1, 2]                   # iteration
        assert len(view) == 2                                  # length
        ```
        """
        if self._view is None:
            # Captured by the closure class below; `self` inside it refers to the view.
            container = self
            class _ContainerView:
                def __bool__(self):
                    """Returns whether this container is not empty."""
                    return len(container.nodes) != 0
                def __call__(self) -> NodeContainer:
                    """Returns a base container."""
                    return container
                def __len__(self):
                    """Returns the number of nodes."""
                    return len(container.nodes)
                def __iter__(self):
                    """Iterates views of nodes."""
                    return map(lambda n: n.view, container.nodes)
                @overload
                def __getitem__(self, index: int) -> 'NodeView': ...
                @overload
                def __getitem__(self, index: slice) -> Iterable['NodeView']: ...
                def __getitem__(self, index: Union[int, slice]) -> Union['NodeView', Iterable['NodeView']]:
                    """Returns a view of a node at the index, or a list of views for a slice."""
                    if isinstance(index, slice):
                        return [n.view for n in container.nodes[index]]
                    else:
                        return container.nodes[index].view
                def __getattr__(self, key) -> ContainerView:
                    """
                    Returns the view of the child container `key` of the first node,
                    or an empty container view when there are no nodes.

                    Raises `KeyError` when the template property has no child named `key`.
                    """
                    child = next(filter(lambda c: c.name == key, container.prop.children), None)
                    if child:
                        return container.nodes[0].children[key].view if len(container.nodes) > 0 else _EmptyContainerView(child)
                    else:
                        raise KeyError(f"Graph property '{container.prop.name}' does not have a child property '{key}'.")
            self._view = _ContainerView()
        return self._view

    def append(self, entity: Any, ancestors: MutableMapping[str, list['Node']], to_replace: bool = False):
        """
        Add an entity to this container.

        Identical node is searched by examining whether this container already contains a node of the identical entity
        and its parent is found in `ancestors` .

        One new node is created for every item of the parents returned by the policy;
        items which are not `None` get the new node as their child. On return,
        `ancestors` maps this property name to the identical nodes followed by the new nodes.

        Args:
            entity: An entity to be stored in the node.
            ancestors: Parent nodes mapped by property names. Updated in place.
            to_replace: If `True`, the entity of identical node is replaced. Otherwise, it is not changed.
        """
        # Fall back to `neverPolicy()` when the property has no policy.
        policy: IdentifyPolicy = self.prop.policy or neverPolicy()

        key = policy.get_identifier(entity)

        # Only nodes registered under the same key are candidates for identity.
        parents, identicals = policy.identify(self.prop, [self.nodes[i] for i in self.keys.get(key, [])], ancestors)

        new_nodes = identicals.copy()

        for pn in parents:
            index = len(self.nodes)

            node = Node(self.prop, entity, key, index)
            self.nodes.append(node)
            # Nodes without a key are never indexed.
            if key is not None:
                self.keys.setdefault(key, []).append(index)
            new_nodes.append(node)

            if pn is not None:
                pn.add_child(node)

        if to_replace:
            for n in identicals:
                n.entity = entity

        ancestors[self.prop.name] = new_nodes

Subclasses

  • pyracmon.graph.graph._GraphNodeContainer

Instance variables

var name : str

Returns the container name, which is same as the name of template property.

Expand source code
@property
def name(self) -> str:
    """
    Returns the container name, i.e. the name of its template property.
    """
    prop = self.prop
    return prop.name
var prop

Template property.

var view : ContainerView[NodeContainer]

Returns an unmodifiable view of this container.

The view object works as the accessor to container components.

template = GraphSpec().new_template(a=int, b=str, c=str)
template.a << template.b
graph = new_graph(template).append(a=1, b="a").append(a=1, b="b").append(a=2, b="c")
container = graph.containers["a"]
view = graph.view.a
assert view() is container                             # invocation
assert view.b is container.nodes[0].children["b"].view # attribute
assert view[1] is container.nodes[1].view              # index
assert [n() for n in view] == [1, 2]                   # iteration
assert len(view) == 2                                  # length
Expand source code
@property
def view(self) -> ContainerView['NodeContainer']:
    """
    Returns an unmodifiable view of this container.

    The view exposes the container by invocation, its nodes by index, iteration
    and length, and the child containers of its first node by attribute. It is
    built on the first access and reused afterwards.

    ```python
    template = GraphSpec().new_template(a=int, b=str, c=str)
    template.a << template.b
    graph = new_graph(template).append(a=1, b="a").append(a=1, b="b").append(a=2, b="c")
    container = graph.containers["a"]
    view = graph.view.a
    assert view() is container                             # invocation
    assert view.b is container.nodes[0].children["b"].view # attribute
    assert view[1] is container.nodes[1].view              # index
    assert [n() for n in view] == [1, 2]                   # iteration
    assert len(view) == 2                                  # length
    ```
    """
    if self._view is not None:
        return self._view

    # Captured by the closure class; inside it, `self` denotes the view object.
    owner = self

    class _ContainerView:
        def __bool__(self):
            """Returns whether this container is not empty."""
            return len(owner.nodes) != 0

        def __call__(self) -> NodeContainer:
            """Returns a base container."""
            return owner

        def __len__(self):
            """Returns the number of nodes."""
            return len(owner.nodes)

        def __iter__(self):
            """Iterates views of nodes."""
            return map(lambda item: item.view, owner.nodes)

        @overload
        def __getitem__(self, index: int) -> 'NodeView': ...
        @overload
        def __getitem__(self, index: slice) -> Iterable['NodeView']: ...
        def __getitem__(self, index: Union[int, slice]) -> Union['NodeView', Iterable['NodeView']]:
            """Returns a view of a node at the index."""
            if not isinstance(index, slice):
                return owner.nodes[index].view
            return [item.view for item in owner.nodes[index]]

        def __getattr__(self, key) -> ContainerView:
            """Returns a view of the first node or empty container view if it does not exist."""
            child = next(filter(lambda c: c.name == key, owner.prop.children), None)
            if not child:
                raise KeyError(f"Graph property '{owner.prop.name}' does not have a child property '{key}'.")
            if len(owner.nodes) > 0:
                return owner.nodes[0].children[key].view
            return _EmptyContainerView(child)

    self._view = _ContainerView()
    return self._view

Methods

def append(self, entity: Any, ancestors: collections.abc.MutableMapping[str, list['Node']], to_replace: bool = False)

Add an entity to this container.

An identical node is searched for by examining whether this container already contains a node of the identical entity and its parent is found in ancestors.

Args

entity
An entity to be stored in the node.
ancestors
Parent nodes mapped by property names.
to_replace
If True, the entity of identical node is replaced. Otherwise, it is not changed.
Expand source code
def append(self, entity: Any, ancestors: MutableMapping[str, list['Node']], to_replace: bool = False):
    """
    Stores an entity in this container.

    The identify policy of the template property (or `neverPolicy()` when none is set)
    decides which already stored nodes are identical to the entity and under which
    parents new nodes have to be created. On return, `ancestors` maps this property
    name to the identical nodes followed by the newly created ones.

    Args:
        entity: An entity to be stored in the node.
        ancestors: Parent nodes mapped by property names. Updated in place.
        to_replace: If `True`, the entity of identical node is replaced. Otherwise, it is not changed.
    """
    identify_policy: IdentifyPolicy = self.prop.policy or neverPolicy()
    identifier = identify_policy.get_identifier(entity)

    # Only nodes stored under the same identifier can be identical.
    candidates = [self.nodes[i] for i in self.keys.get(identifier, [])]
    parents, identicals = identify_policy.identify(self.prop, candidates, ancestors)

    resolved = identicals.copy()

    for parent in parents:
        position = len(self.nodes)
        created = Node(self.prop, entity, identifier, position)

        self.nodes.append(created)
        # Entities without an identifier are never indexed.
        if identifier is not None:
            self.keys.setdefault(identifier, []).append(position)
        resolved.append(created)

        if parent is not None:
            parent.add_child(created)

    if to_replace:
        for existing in identicals:
            existing.entity = entity

    ancestors[self.prop.name] = resolved
class NodeContext (context: SerializationContext, params: NodeParams)

A class containing information for the serialization of a single node.

The instance of this class is passed to the serialization function. Properties listed below are available to control serialization.

  • context: SerializationContext for the serialization of the graph.
  • node: Node to serialize.
  • value: Entity value of the Node .
  • params: Arbitrary values which are passed from the invoking scope, bound to the key of the node name.

Every serializer has to call serialize() to get the result of preceding serializers, or make a result directly from the node.

Expand source code
class NodeContext:
    """
    A class containing information for the serialization of a single node.

    The instance of this class is passed to the serialization function.
    Properties listed below are available to control serialization.

    - context: `SerializationContext` for the serialization of the graph.
    - node: `Node` to serialize.
    - value: Entity value of the `Node` .
    - params: Arbitrary values which are passed from the invoking scope, bound to the key of the node name.

    Every serializer has to call `serialize()` to get the result of preceding serializers,
    or make a result directly from the node.

    The instance is a context manager; leaving the `with` block clears the node and
    the serializer iterator which are set on demand.
    """
    def __init__(self, context: 'SerializationContext', params: NodeParams) -> None:
        #: `SerializationContext` for the serialization of the graph.
        self.context = context
        #: Arbitrary values passed by outside for the node.
        self.params = params
        # Set on demand.
        self._node: Optional[Node] = None
        self._iterator: Optional[Iterator[Any]] = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Drop per-node state; exceptions are not suppressed (returns None).
        self._node = None
        self._iterator = None

    @property
    def node(self) -> Node:
        """Node being serialized."""
        # Node must be set when passed to serialization function.
        return cast(Node, self._node)

    @property
    def value(self) -> Any:
        """Entity value of the node being serialized."""
        return cast(Node, self._node).entity

    def serialize(self) -> Any:
        """
        Obtain a value serialized by preceding serializers.

        The next serializer is taken from the internal iterator and invoked with this
        context. When no serializer is left, the entity of the node is returned as is.

        Returns:
            Result of the next serializer, or the node entity if none is left.
        """
        try:
            # Guard only the iterator advance: a StopIteration escaping from the
            # serializer itself is an error of that serializer and must not be
            # mistaken for "no serializer left".
            serializer = next(cast(Iterator[Any], self._iterator))
        except StopIteration:
            return self.node.entity
        return serializer(self)

Instance variables

var context

SerializationContext for the serialization of the graph.

var node : Node
Expand source code
@property
def node(self) -> Node:
    """Node being serialized."""
    # Node must be set when passed to serialization function,
    # hence the cast from the optional attribute.
    current = self._node
    return cast(Node, current)
var params

Arbitrary values passed by outside for the node.

var value : Any
Expand source code
@property
def value(self) -> Any:
    """Entity value of the node being serialized."""
    current = cast(Node, self._node)
    return current.entity

Methods

def serialize(self) ‑> Any

Obtain a value serialized by preceding serializers.

Expand source code
def serialize(self) -> Any:
    """
    Obtain a value serialized by preceding serializers.

    The next serializer is taken from the internal iterator and invoked with this
    context. When no serializer is left, the entity of the node is returned as is.

    Returns:
        Result of the next serializer, or the node entity if none is left.
    """
    try:
        # Guard only the iterator advance: a StopIteration escaping from the
        # serializer itself is an error of that serializer and must not be
        # mistaken for "no serializer left".
        serializer = next(cast(Iterator[Any], self._iterator))
    except StopIteration:
        return self.node.entity
    return serializer(self)
class NodeView
Expand source code
class NodeView:
    """
    The interface of the view of a node.

    Every method body here is a stub (`...`).
    """
    def __call__(self, alt: Any = None) -> Any:
        """Returns an entity of this node."""
        ...
    def __getattr__(self, key: str) -> ContainerView:
        """Returns a view of child nodes by its name."""
        ...
    def __iter__(self) -> Iterator[tuple[str, ContainerView]]:
        """Iterate key-value pairs of child nodes."""
        ...
class Q (**kwargs: Any)

This class provides utility class methods creating conditions.

Using of() is the most simple way to create a condition clause with parameters.

>>> Q.of("a = $_", 1)
Condition: 'a = $_' -- [1]

Other utility methods correspond to basic operators defined in SQL. They take keyword arguments and create conditions by applying the operator to each item respectively.

>>> Q.eq(a=1)
Condition: 'a = %s' -- [1]
>>> Q.in_(a=[1, 2, 3])
Condition: 'a IN (%s, %s, %s)' -- [1, 2, 3]
>>> Q.like(a="abc")
Condition: 'a LIKE %s' -- ["%abc%"]

Multiple arguments generate a condition which concatenates conditions with a logical operator, by default AND .

>>> Q.eq(a=1, b=2)
Condition: 'a = %s AND b = %s' -- [1, 2]

Those methods also accept table alias which is prepended to columns.

>>> Q.eq("t", a=1, b=2)
Condition: 't.a = %s AND t.b = %s'

Additionally, the instance of this class has its own functionality to generate condition.

Each parameter passed to the constructor becomes an instance method of the instance, which takes a condition clause including placeholders which will take parameters in query execution phase. Statement.execute() allows unified marker $_ in spite of DB driver.

>>> q = Q(a=1)
>>> q.a("a = $_")
Condition: 'a = $_' -- [1]

Method whose name is not passed to the constructor renders empty condition which has no effect on the query.

>>> q.b("b = $_")
Condition: '' -- []

By default, None is equivalent to not being passed. Giving True at the first argument in constructor changes the behavior.

>>> q = Q(a=1, b=None)
>>> q.b("b = $_")
Condition: '' -- []
>>> q = Q(True, a=1, b=None)
>>> q.b("b = $_")
Condition: 'b = $_' -- [None]

This feature simplifies a query construction in cases some parameters are absent.

>>> def search(db, q):
>>>     w, params = where(q.a("a = $_") & q.b("b = $_"))
>>>     db.stmt().execute(f"SELECT * FROM table {w}", *params)
>>> 
>>> search(db, Q(a=1))      # SELECT * FROM table WHERE a = 1
>>> search(db, Q(a=1, b=2)) # SELECT * FROM table WHERE a = 1 AND b = 2
>>> search(db, Q())         # SELECT * FROM table

Initializes an instance.

Args

_include_none_
Whether include attributes whose value is None.
kwargs
Denotes pairs of attribute name and parameter.
Expand source code
class Q:
    """
    This class provides utility class methods creating conditions.

    Using `of()` is the simplest way to create a condition clause with parameters.

    ```python
    >>> Q.of("a = $_", 1)
    Condition: 'a = $_' -- [1]
    ```

    Other utility methods correspond to basic operators defined in SQL.
    They take keyword arguments and create conditions by applying the operator to each item respectively.

    ```python
    >>> Q.eq(a=1)
    Condition: 'a = %s' -- [1]
    >>> Q.in_(a=[1, 2, 3])
    Condition: 'a IN (%s, %s, %s)' -- [1, 2, 3]
    >>> Q.like(a="abc")
    Condition: 'a LIKE %s' -- ["%abc%"]
    ```

    Multiple arguments generate a condition which concatenates conditions with a logical operator, by default `AND` .

    ```python
    >>> Q.eq(a=1, b=2)
    Condition: 'a = %s AND b = %s' -- [1, 2]
    ```

    Those methods also accept a table alias which is prepended to columns.

    ```python
    >>> Q.eq("t", a=1, b=2)
    Condition: 't.a = %s AND t.b = %s' -- [1, 2]
    ```

    Additionally, the instance of this class has its own functionality to generate condition.

    Each parameter passed to the constructor becomes an instance method of the instance,
    which takes a condition clause including placeholders which will take parameters in query execution phase.
    `pyracmon.connection.Statement.execute` allows unified marker `$_` in spite of DB driver.

    ```python
    >>> q = Q(a=1)
    >>> q.a("a = $_")
    Condition: 'a = $_' -- [1]
    ```

    A method whose name is not passed to the constructor renders an empty condition which has no effect on the query.

    ```python
    >>> q.b("b = $_")
    Condition: '' -- []
    ```

    By default, `None` is equivalent to not being passed. Giving `True` at the first argument in constructor changes the behavior.

    ```python
    >>> q = Q(a=1, b=None)
    >>> q.b("b = $_")
    Condition: '' -- []
    >>> q = Q(True, a=1, b=None)
    >>> q.b("b = $_")
    Condition: 'b = $_' -- [None]
    ```

    This feature simplifies a query construction in cases some parameters are absent.

    ```python
    >>> def search(db, q):
    >>>     w, params = where(q.a("a = $_") & q.b("b = $_"))
    >>>     db.stmt().execute(f"SELECT * FROM table {w}", *params)
    >>> 
    >>> search(db, Q(a=1))      # SELECT * FROM table WHERE a = 1
    >>> search(db, Q(a=1, b=2)) # SELECT * FROM table WHERE a = 1 AND b = 2
    >>> search(db, Q())         # SELECT * FROM table
    ```
    """
    class Attribute(Queryable[str]): # type: ignore
        """
        Holds a value given to the `Q` constructor and builds conditions which take it as a parameter.
        """
        def __init__(self, value):
            self.value = value

        def __call__(
            self,
            expression: Union[str, Callable[[Any], str]],
            convert: Optional[Union[Callable[[Any], Any], Any]] = None,
        ) -> 'Conditional':
            """
            Creates conditional object composed of given expression and the attribute value as parameters.

            Args:
                expression: A clause or a function generating a clause by taking the attribute value.
                convert: A function converting the attribute value to parameters.
                    A non-callable value other than `None` is used as the parameters as it is.
                    If the result is not a list, a list having only the value is used.
            Returns:
                Condition.
            """
            expression = expression if isinstance(expression, str) else expression(self.value)

            if callable(convert):
                params = convert(self.value)
            elif convert is not None:
                params = convert
            else:
                params = [self.value]

            return Conditional(expression, params if isinstance(params, list) else [params])

        @property
        def all(self) -> 'Q.Attribute':
            """
            Returns composite attribute which applies conditions to every values iterated from attribute value and join them with `AND`.

            Returns:
                Composite attribute.
            """
            return Q.CompositeAttribute(self.value, True)

        @property
        def any(self) -> 'Q.Attribute':
            """
            Returns composite attribute which applies conditions to every values iterated from attribute value and join them with `OR`.

            Returns:
                Composite attribute.
            """
            return Q.CompositeAttribute(self.value, False)

        def __bool__(self):
            # An attribute which exists is truthy regardless of the value it holds.
            return True

        def __and__(self, other: 'Conditional') -> 'Conditional':
            # Keeps `other` only when the held value is truthy.
            return other if bool(self.value) else Conditional()

        def __or__(self, other: 'Conditional') -> 'Conditional':
            # Keeps `other` only when the held value is falsy.
            return other if not bool(self.value) else Conditional()

        def __getattr__(self, key):
            """
            Exposes a method which works similarly to 'Q' 's utility method of the same name.

            ```python
            >>> q = Q(a = 1)
            >>> q.a.eq("col")
            Condition: 'col = $_' -- [1]
            >>>
            >>> q.a.eq("col", None, "t")
            Condition: 't.col = $_' -- [1]
            >>>
            >>> q.a.eq("col", lambda x: x*2, "t")
            Condition: 't.col = $_' -- [2]
            ```
            """
            # Raises AttributeError for names `Q` does not define.
            method = getattr(Q, key)
            def invoke(col, convert=None, *args, **kwargs):
                if callable(convert):
                    value = convert(self.value)
                else:
                    value = convert if convert is not None else self.value
                # The column always wins over a keyword argument of the same name.
                kwargs[col] = value
                return method(*args, **kwargs)
            return invoke

    class CompositeAttribute(Attribute):
        """
        Attribute applying a condition to each item iterated from the held value and joining the results with `AND` or `OR`.
        """
        def __init__(self, value, and_):
            super().__init__(value)
            # True joins per-item conditions with AND, False with OR.
            self._and = and_

        def __call__(self, expression, convert=None):
            conds = [Q.Attribute(v)(expression, convert) for v in self.value]
            return Conditional.all(conds) if self._and else Conditional.any(conds)

        def __getattr__(self, key):
            # Raises AttributeError for names `Q` does not define.
            method = getattr(Q, key)
            def invoke(col, convert=None, *args, **kwargs):
                def conv(v):
                    if callable(convert):
                        return convert(v)
                    else:
                        # REVIEW Replacing every parameter in the list with the same value is meaningless?
                        return convert if convert is not None else v
                # One condition per item; the column always wins over a keyword argument of the same name.
                conds = [method(*args, **{**kwargs, col: conv(v)}) for v in self.value]
                return Conditional.all(conds) if self._and else Conditional.any(conds)
            return invoke

    class NoAttribute(Attribute):
        """
        Attribute standing for a name which was not given to the `Q` constructor. Every condition it builds is empty.
        """
        def __init__(self):
            super().__init__(None)

        def __call__(self, expression, convert=None, holder=None):
            # Accepts the same arguments as `Attribute.__call__` so that call sites do not
            # depend on whether the attribute exists. `holder` is the former name of the
            # second parameter and is kept only for backward compatibility. All are ignored.
            return Conditional()

        @property
        def all(self):
            return self

        @property
        def any(self):
            return self

        def __bool__(self):
            return False

        def __and__(self, other: 'Conditional') -> 'Conditional':
            return Conditional()

        def __or__(self, other: 'Conditional') -> 'Conditional':
            return Conditional()

        def __getattr__(self, key):
            # Looked up only to raise AttributeError for names `Q` does not define,
            # exactly like `Attribute.__getattr__` does.
            getattr(Q, key)
            def invoke(col, convert=None, *args, **kwargs):
                # Same signature as the function returned by `Attribute.__getattr__`,
                # keyword arguments included, so both kinds of attribute are interchangeable.
                return Conditional()
            return invoke

    def __init__(self, _include_none_: bool = False, **kwargs: Any):
        """
        Initializes an instance.

        Args:
            _include_none_: Whether to include attributes whose value is `None`.
            kwargs: Denotes pairs of attribute name and parameter.
        """
        self.attributes = {k: v for k, v in kwargs.items() if _include_none_ or v is not None}

    def __getattr__(self, key) -> Attribute:
        # This hook runs only after normal lookup fails. Special (dunder) names and
        # `attributes` itself must fail normally:
        #  - otherwise protocols probing the instance get a callable `NoAttribute`
        #    (e.g. `copy.deepcopy` would call it as `__deepcopy__` and return an empty condition),
        #  - and probing an instance whose `__init__` has not run (`copy.copy`, unpickling)
        #    would recurse endlessly through `self.attributes`.
        if key == "attributes" or (key.startswith("__") and key.endswith("__")):
            raise AttributeError(key)
        if key in self.attributes:
            return Q.Attribute(self.attributes[key])
        else:
            return Q.NoAttribute()

    @classmethod
    def of(cls, expression: str = "", *params: Any) -> 'Conditional':
        """
        Creates a condition directly from an expression and parameters.

        Args:
            expression: Condition expression.
            params: Parameters used in the condition.
        Returns:
            Condition object.
        """
        return Conditional(expression, list(params))

    @classmethod
    def eq(cls, _alias_: Optional[str] = None, _and_: bool = True, **kwargs: Any) -> 'Conditional':
        """
        Creates a condition applying `=` operator to columns.

        `None`, `True` and `False` are rendered as `IS NULL`, the bare column and `NOT` column respectively, without parameters.

        Args:
            _alias_: Table alias.
            _and_: Specifies concatenating logical operator is `AND` or `OR`.
            kwargs: Column names and parameters.
        Returns:
            Condition object.
        """
        def is_null(col, val):
            # Identity checks on purpose: only the singletons get a dedicated clause.
            if val is None:
                return f"{col} IS NULL", []
            elif val is True:
                return f"{col}", []
            elif val is False:
                return f"NOT {col}", []
            return None
        return _conditional("=", _and_, kwargs, is_null, _alias_)

    @classmethod
    def neq(cls, _alias_: Optional[str] = None, _and_: bool = True, **kwargs: Any) -> 'Conditional':
        """
        Works like `eq`, but applies `!=`.

        `None`, `True` and `False` are rendered as `IS NOT NULL`, `NOT` column and the bare column respectively, without parameters.

        Args:
            _alias_: Table alias.
            _and_: Specifies concatenating logical operator is `AND` or `OR`.
            kwargs: Column names and parameters.
        Returns:
            Condition object.
        """
        def is_null(col, val):
            # Identity checks on purpose: only the singletons get a dedicated clause.
            if val is None:
                return f"{col} IS NOT NULL", []
            elif val is True:
                return f"NOT {col}", []
            elif val is False:
                return f"{col}", []
            return None
        return _conditional("!=", _and_, kwargs, is_null, _alias_)

    @classmethod
    def in_(cls, _alias_: Optional[str] = None, _and_: bool = True, **kwargs: Sequence[Any]) -> 'Conditional':
        """
        Works like `eq`, but applies `IN`.

        Args:
            _alias_: Table alias.
            _and_: Specifies concatenating logical operator is `AND` or `OR`.
            kwargs: Column names and parameters.
        Returns:
            Condition object.
        """
        def in_list(col, val):
            if len(val) == 0:
                # Nothing can be contained in an empty list.
                return "1 = 0", []
            else:
                holder = ', '.join(['$_'] * len(val))
                return f"{col} IN ({holder})", val
        return _conditional("IN", _and_, kwargs, in_list, _alias_)

    @classmethod
    def not_in(cls, _alias_: Optional[str] = None, _and_: bool = True, **kwargs: Sequence[Any]) -> 'Conditional':
        """
        Works like `eq`, but applies `NOT IN`.

        Args:
            _alias_: Table alias.
            _and_: Specifies concatenating logical operator is `AND` or `OR`.
            kwargs: Column names and parameters.
        Returns:
            Condition object.
        """
        def in_list(col, val):
            if len(val) == 0:
                # An empty list yields an empty clause.
                return "", []
            else:
                holder = ', '.join(['$_'] * len(val))
                return f"{col} NOT IN ({holder})", val
        return _conditional("NOT IN", _and_, kwargs, in_list, _alias_)

    @classmethod
    def match(cls, _alias_: Optional[str] = None, _and_: bool = True, **kwargs: str) -> 'Conditional':
        """
        Works like `eq`, but applies `LIKE`. Given parameters will be passed to query without being escaped or enclosed.

        Args:
            _alias_: Table alias.
            _and_: Specifies concatenating logical operator is `AND` or `OR`.
            kwargs: Column names and parameters.
        Returns:
            Condition object.
        """
        return _conditional("LIKE", _and_, kwargs, None, _alias_)

    @classmethod
    def like(cls, _alias_: Optional[str] = None, _and_: bool = True, **kwargs: str) -> 'Conditional':
        """
        Works like `eq`, but applies `LIKE`. Given parameters will be escaped and enclosed with wildcards (%) to execute partial match.

        Args:
            _alias_: Table alias.
            _and_: Specifies concatenating logical operator is `AND` or `OR`.
            kwargs: Column names and parameters.
        Returns:
            Condition object.
        """
        return _conditional("LIKE", _and_, {k: f"%{escape_like(v)}%" for k, v in kwargs.items()}, None, _alias_)

    @classmethod
    def startswith(cls, _alias_: Optional[str] = None, _and_: bool = True, **kwargs: str) -> 'Conditional':
        """
        Works like `eq`, but applies `LIKE`. Given parameters will be escaped and appended with wildcards (%) to execute prefix match.

        Args:
            _alias_: Table alias.
            _and_: Specifies concatenating logical operator is `AND` or `OR`.
            kwargs: Column names and parameters.
        Returns:
            Condition object.
        """
        return _conditional("LIKE", _and_, {k: f"{escape_like(v)}%" for k, v in kwargs.items()}, None, _alias_)

    @classmethod
    def endswith(cls, _alias_: Optional[str] = None, _and_: bool = True, **kwargs: str) -> 'Conditional':
        """
        Works like `eq`, but applies `LIKE`. Given parameters will be escaped and prepended with wildcards (%) to execute backward match.

        Args:
            _alias_: Table alias.
            _and_: Specifies concatenating logical operator is `AND` or `OR`.
            kwargs: Column names and parameters.
        Returns:
            Condition object.
        """
        return _conditional("LIKE", _and_, {k: f"%{escape_like(v)}" for k, v in kwargs.items()}, None, _alias_)

    @classmethod
    def lt(cls, _alias_: Optional[str] = None, _and_: bool = True, **kwargs: Any) -> 'Conditional':
        """
        Works like `eq`, but applies `<`.

        Args:
            _alias_: Table alias.
            _and_: Specifies concatenating logical operator is `AND` or `OR`.
            kwargs: Column names and parameters.
        Returns:
            Condition object.
        """
        return _conditional("<", _and_, kwargs, None, _alias_)

    @classmethod
    def le(cls, _alias_: Optional[str] = None, _and_: bool = True, **kwargs: Any) -> 'Conditional':
        """
        Works like `eq`, but applies `<=`.

        Args:
            _alias_: Table alias.
            _and_: Specifies concatenating logical operator is `AND` or `OR`.
            kwargs: Column names and parameters.
        Returns:
            Condition object.
        """
        return _conditional("<=", _and_, kwargs, None, _alias_)

    @classmethod
    def gt(cls, _alias_: Optional[str] = None, _and_: bool = True, **kwargs: Any) -> 'Conditional':
        """
        Works like `eq`, but applies `>`.

        Args:
            _alias_: Table alias.
            _and_: Specifies concatenating logical operator is `AND` or `OR`.
            kwargs: Column names and parameters.
        Returns:
            Condition object.
        """
        return _conditional(">", _and_, kwargs, None, _alias_)

    @classmethod
    def ge(cls, _alias_: Optional[str] = None, _and_: bool = True, **kwargs: Any) -> 'Conditional':
        """
        Works like `eq`, but applies `>=`.

        Args:
            _alias_: Table alias.
            _and_: Specifies concatenating logical operator is `AND` or `OR`.
            kwargs: Column names and parameters.
        Returns:
            Condition object.
        """
        return _conditional(">=", _and_, kwargs, None, _alias_)

Class variables

var Attribute

Base class for protocol classes.

Protocol classes are defined as::

class Proto(Protocol):
    def meth(self) -> int:
        ...

Such classes are primarily used with static type checkers that recognize structural subtyping (static duck-typing), for example::

class C:
    def meth(self) -> int:
        return 0

def func(x: Proto) -> int:
    return x.meth()

func(C())  # Passes static type check

See PEP 544 for details. Protocol classes decorated with @typing.runtime_checkable act as simple-minded runtime protocols that check only the presence of given attributes, ignoring their type signatures. Protocol classes can be generic, they are defined as::

class GenProto(Protocol[T]):
    def meth(self) -> T:
        ...
var CompositeAttribute

Base class for protocol classes.

Protocol classes are defined as::

class Proto(Protocol):
    def meth(self) -> int:
        ...

Such classes are primarily used with static type checkers that recognize structural subtyping (static duck-typing), for example::

class C:
    def meth(self) -> int:
        return 0

def func(x: Proto) -> int:
    return x.meth()

func(C())  # Passes static type check

See PEP 544 for details. Protocol classes decorated with @typing.runtime_checkable act as simple-minded runtime protocols that check only the presence of given attributes, ignoring their type signatures. Protocol classes can be generic, they are defined as::

class GenProto(Protocol[T]):
    def meth(self) -> T:
        ...
var NoAttribute

Base class for protocol classes.

Protocol classes are defined as::

class Proto(Protocol):
    def meth(self) -> int:
        ...

Such classes are primarily used with static type checkers that recognize structural subtyping (static duck-typing), for example::

class C:
    def meth(self) -> int:
        return 0

def func(x: Proto) -> int:
    return x.meth()

func(C())  # Passes static type check

See PEP 544 for details. Protocol classes decorated with @typing.runtime_checkable act as simple-minded runtime protocols that check only the presence of given attributes, ignoring their type signatures. Protocol classes can be generic, they are defined as::

class GenProto(Protocol[T]):
    def meth(self) -> T:
        ...

Static methods

def endswith(**kwargs: str) ‑> Conditional

Works like eq, but applies LIKE. Given parameters will be escaped and prepended with wildcards (%) to execute backward match.

Args

_alias_
Table alias.
_and_
Specifies concatenating logical operator is AND or OR.
kwargs
Column names and parameters.

Returns

Condition object.

Expand source code
@classmethod
def endswith(cls, _alias_: Optional[str] = None, _and_: bool = True, **kwargs: str) -> 'Conditional':
    """
    Builds `LIKE` conditions executing backward (suffix) match.

    Every given parameter is escaped and gets a wildcard (%) prepended. Other than that, this works like `eq`.

    Args:
        _alias_: Table alias.
        _and_: Specifies concatenating logical operator is `AND` or `OR`.
        kwargs: Column names and parameters.
    Returns:
        Condition object.
    """
    patterns = {}
    for column, text in kwargs.items():
        patterns[column] = f"%{escape_like(text)}"
    return _conditional("LIKE", _and_, patterns, None, _alias_)
def eq(**kwargs: Any) ‑> Conditional

Creates a condition applying = operator to columns.

Args

_alias_
Table alias.
_and_
Specifies concatenating logical operator is AND or OR.
kwargs
Column names and parameters.

Returns

Condition object.

Expand source code
@classmethod
def eq(cls, _alias_: Optional[str] = None, _and_: bool = True, **kwargs: Any) -> 'Conditional':
    """
    Builds conditions comparing columns with `=` operator.

    `None`, `True` and `False` get dedicated clauses which take no parameters.

    Args:
        _alias_: Table alias.
        _and_: Specifies concatenating logical operator is `AND` or `OR`.
        kwargs: Column names and parameters.
    Returns:
        Condition object.
    """
    def singleton_clause(col, val):
        # Identity checks on purpose: only the three singletons are special-cased.
        if val is None:
            return f"{col} IS NULL", []
        if val is True:
            return f"{col}", []
        if val is False:
            return f"NOT {col}", []
        return None

    return _conditional("=", _and_, kwargs, singleton_clause, _alias_)
def ge(**kwargs: Any) ‑> Conditional

Works like eq, but applies >=.

Args

_alias_
Table alias.
_and_
Specifies concatenating logical operator is AND or OR.
kwargs
Column names and parameters.

Returns

Condition object.

Expand source code
@classmethod
def ge(cls, _alias_: Optional[str] = None, _and_: bool = True, **kwargs: Any) -> 'Conditional':
    """
    Builds conditions comparing columns with `>=` operator. Other than the operator, this works like `eq`.

    Args:
        _alias_: Table alias.
        _and_: Specifies concatenating logical operator is `AND` or `OR`.
        kwargs: Column names and parameters.
    Returns:
        Condition object.
    """
    operator = ">="
    return _conditional(operator, _and_, kwargs, None, _alias_)
def gt(**kwargs: Any) ‑> Conditional

Works like eq, but applies >.

Args

_alias_
Table alias.
_and_
Specifies concatenating logical operator is AND or OR.
kwargs
Column names and parameters.

Returns

Condition object.

Expand source code
@classmethod
def gt(cls, _alias_: Optional[str] = None, _and_: bool = True, **kwargs: Any) -> 'Conditional':
    """
    Builds conditions comparing columns with `>` operator. Other than the operator, this works like `eq`.

    Args:
        _alias_: Table alias.
        _and_: Specifies concatenating logical operator is `AND` or `OR`.
        kwargs: Column names and parameters.
    Returns:
        Condition object.
    """
    operator = ">"
    return _conditional(operator, _and_, kwargs, None, _alias_)
def in_(**kwargs: collections.abc.Sequence[typing.Any]) ‑> Conditional

Works like eq, but applies IN.

Args

_alias_
Table alias.
_and_
Specifies concatenating logical operator is AND or OR.
kwargs
Column names and parameters.

Returns

Condition object.

Expand source code
@classmethod
def in_(cls, _alias_: Optional[str] = None, _and_: bool = True, **kwargs: Sequence[Any]) -> 'Conditional':
    """
    Builds conditions applying `IN` to columns. Other than the operator, this works like `eq`.

    Args:
        _alias_: Table alias.
        _and_: Specifies concatenating logical operator is `AND` or `OR`.
        kwargs: Column names and parameters.
    Returns:
        Condition object.
    """
    def membership_clause(col, val):
        count = len(val)
        if count == 0:
            # Nothing can be contained in an empty list.
            return "1 = 0", []
        markers = ', '.join('$_' for _ in range(count))
        return f"{col} IN ({markers})", val

    return _conditional("IN", _and_, kwargs, membership_clause, _alias_)
def le(**kwargs: Any) ‑> Conditional

Works like eq, but applies <=.

Args

_alias_
Table alias.
_and_
Specifies concatenating logical operator is AND or OR.
kwargs
Column names and parameters.

Returns

Condition object.

Expand source code
@classmethod
def le(cls, _alias_: Optional[str] = None, _and_: bool = True, **kwargs: Any) -> 'Conditional':
    """
    Builds conditions comparing columns with `<=` operator. Other than the operator, this works like `eq`.

    Args:
        _alias_: Table alias.
        _and_: Specifies concatenating logical operator is `AND` or `OR`.
        kwargs: Column names and parameters.
    Returns:
        Condition object.
    """
    operator = "<="
    return _conditional(operator, _and_, kwargs, None, _alias_)
def like(**kwargs: str) ‑> Conditional

Works like eq, but applies LIKE. Given parameters will be escaped and enclosed with wildcards (%) to execute partial match.

Args

_alias_
Table alias.
_and_
Specifies concatenating logical operator is AND or OR.
kwargs
Column names and parameters.

Returns

Condition object.

Expand source code
@classmethod
def like(cls, _alias_: Optional[str] = None, _and_: bool = True, **kwargs: str) -> 'Conditional':
    """
    Builds `LIKE` conditions executing partial match.

    Every given parameter is escaped and enclosed with wildcards (%). Other than that, this works like `eq`.

    Args:
        _alias_: Table alias.
        _and_: Specifies concatenating logical operator is `AND` or `OR`.
        kwargs: Column names and parameters.
    Returns:
        Condition object.
    """
    patterns = {}
    for column, text in kwargs.items():
        patterns[column] = f"%{escape_like(text)}%"
    return _conditional("LIKE", _and_, patterns, None, _alias_)
def lt(**kwargs: Any) ‑> Conditional

Works like eq, but applies <.

Args

_alias_
Table alias.
_and_
Specifies concatenating logical operator is AND or OR.
kwargs
Column names and parameters.

Returns

Condition object.

Expand source code
@classmethod
def lt(cls, _alias_: Optional[str] = None, _and_: bool = True, **kwargs: Any) -> 'Conditional':
    """
    Builds conditions comparing columns with `<` operator. Other than the operator, this works like `eq`.

    Args:
        _alias_: Table alias.
        _and_: Specifies concatenating logical operator is `AND` or `OR`.
        kwargs: Column names and parameters.
    Returns:
        Condition object.
    """
    operator = "<"
    return _conditional(operator, _and_, kwargs, None, _alias_)
def match(**kwargs: str) ‑> Conditional

Works like eq, but applies LIKE. Given parameters will be passed to query without being escaped or enclosed.

Args

_alias_
Table alias.
_and_
Specifies concatenating logical operator is AND or OR.
kwargs
Column names and parameters.

Returns

Condition object.

Expand source code
@classmethod
def match(cls, _alias_: Optional[str] = None, _and_: bool = True, **kwargs: str) -> 'Conditional':
    """
    Builds `LIKE` conditions from patterns given as they are.

    Unlike `like`, given parameters are passed to the query without being escaped or enclosed.

    Args:
        _alias_: Table alias.
        _and_: Specifies concatenating logical operator is `AND` or `OR`.
        kwargs: Column names and parameters.
    Returns:
        Condition object.
    """
    operator = "LIKE"
    return _conditional(operator, _and_, kwargs, None, _alias_)
def neq(**kwargs: Any) ‑> Conditional

Works like eq, but applies !=.

Args

_alias_
Table alias.
_and_
Specifies concatenating logical operator is AND or OR.
kwargs
Column names and parameters.

Returns

Condition object.

Expand source code
@classmethod
def neq(cls, _alias_: Optional[str] = None, _and_: bool = True, **kwargs: Any) -> 'Conditional':
    """
    Builds conditions comparing columns with `!=` operator. Other than the operator, this works like `eq`.

    `None`, `True` and `False` get dedicated clauses which take no parameters.

    Args:
        _alias_: Table alias.
        _and_: Specifies concatenating logical operator is `AND` or `OR`.
        kwargs: Column names and parameters.
    Returns:
        Condition object.
    """
    def singleton_clause(col, val):
        # Identity checks on purpose: only the three singletons are special-cased.
        if val is None:
            return f"{col} IS NOT NULL", []
        if val is True:
            return f"NOT {col}", []
        if val is False:
            return f"{col}", []
        return None

    return _conditional("!=", _and_, kwargs, singleton_clause, _alias_)
def not_in(**kwargs: collections.abc.Sequence[typing.Any]) ‑> Conditional

Works like eq, but applies NOT IN.

Args

_alias_
Table alias.
_and_
Specifies concatenating logical operator is AND or OR.
kwargs
Column names and parameters.

Returns

Condition object.

Expand source code
@classmethod
def not_in(cls, _alias_: Optional[str] = None, _and_: bool = True, **kwargs: Sequence[Any]) -> 'Conditional':
    """
    Builds conditions applying `NOT IN` to columns. Other than the operator, this works like `eq`.

    Args:
        _alias_: Table alias.
        _and_: Specifies concatenating logical operator is `AND` or `OR`.
        kwargs: Column names and parameters.
    Returns:
        Condition object.
    """
    def exclusion_clause(col, val):
        count = len(val)
        if count == 0:
            # An empty list yields an empty clause.
            return "", []
        markers = ', '.join('$_' for _ in range(count))
        return f"{col} NOT IN ({markers})", val

    return _conditional("NOT IN", _and_, kwargs, exclusion_clause, _alias_)
def of(expression: str = '', *params: Any) ‑> Conditional

Creates a condition directly from an expression and parameters.

Args

expression
Condition expression.
params
Parameters used in the condition.

Returns

Condition object.

Expand source code
@classmethod
def of(cls, expression: str = "", *params: Any) -> 'Conditional':
    """
    Builds a condition from an expression and its parameters as they are.

    Args:
        expression: Condition expression.
        params: Parameters used in the condition.
    Returns:
        Condition object.
    """
    # The condition receives the parameters as a list, not as the variadic tuple.
    parameters = [*params]
    return Conditional(expression, parameters)
def startswith(**kwargs: str) ‑> Conditional

Works like eq, but applies LIKE. Given parameters will be escaped and appended with wildcards (%) to execute prefix match.

Args

_alias_
Table alias.
_and_
Specifies concatenating logical operator is AND or OR.
kwargs
Column names and parameters.

Returns

Condition object.

Expand source code
@classmethod
def startswith(cls, _alias_: Optional[str] = None, _and_: bool = True, **kwargs: str) -> 'Conditional':
    """
    Builds `LIKE` conditions executing prefix match.

    Every given parameter is escaped and gets a wildcard (%) appended. Other than that, this works like `eq`.

    Args:
        _alias_: Table alias.
        _and_: Specifies concatenating logical operator is `AND` or `OR`.
        kwargs: Column names and parameters.
    Returns:
        Condition object.
    """
    patterns = {}
    for column, text in kwargs.items():
        patterns[column] = f"{escape_like(text)}%"
    return _conditional("LIKE", _and_, patterns, None, _alias_)
class S

A utility class to build NodeSerializer .

This class provides factory class methods to create NodeSerializer each of which works in the same way as the method of the same name declared on NodeSerializer .

Use them to supply NodeSerializers to functions to serialize a graph or to create a graph schema such as graph_dict() or graph_schema() .

graph_dict(
    graph,
    a = S.of(),
    b = S.head(),
)
Expand source code
class S(metaclass=SerializerMeta):
    """
    A utility class to build `NodeSerializer` .

    The factory class methods of this class create a `NodeSerializer`,
    each working in the same way as the method of the same name declared on `NodeSerializer` .

    Pass the created `NodeSerializer`s to functions which serialize a graph or create a graph schema,
    such as `graph_dict` or `graph_schema` .

    ```python
    graph_dict(
        graph,
        a = S.of(),
        b = S.head(),
    )
    ```
    """
    @classmethod
    def of(
        cls,
        namer: Optional[Union[str, Callable[[str], str]]] = None,
        aggregator: Optional[Union[Callable[[list[Node]], Node], Callable[[list[Node]], list[Node]]]] = None,
        *serializers: Serializer,
    ) -> 'NodeSerializer':
        """
        Builds a `NodeSerializer` from the given arguments.

        Args:
            namer: A string or naming function.
            aggregator: An aggregation function or an index of node to select in node container.
            serializers: A list of *serializer* s.
        Returns:
            Created `NodeSerializer` .
        """
        # All arguments are handed over to the constructor unchanged.
        created = NodeSerializer(namer, aggregator, *serializers)
        return created

Static methods

def of(namer: Union[str, Callable[[str], str], ForwardRef(None)] = None, aggregator: Union[Callable[[list[Node]], Node], Callable[[list[Node]], list[Node]], ForwardRef(None)] = None, *serializers: Callable[[ForwardRef('NodeContext')], Any]) ‑> NodeSerializer

Create an instance of NodeSerializer.

Args

namer
A string or naming function.
aggregator
An aggregation function or an index of node to select in node container.
serializers
Serializer functions, each of which takes a NodeContext.

Returns

Created NodeSerializer .

Expand source code
@classmethod
def of(
    cls,
    namer: Optional[Union[str, Callable[[str], str]]] = None,
    aggregator: Optional[Union[Callable[[list[Node]], Node], Callable[[list[Node]], list[Node]]]] = None,
    *serializers: Serializer,
) -> 'NodeSerializer':
    """
    Builds a `NodeSerializer` from the given arguments.

    Args:
        namer: A string or naming function.
        aggregator: An aggregation function or an index of node to select in node container.
        serializers: A list of *serializer* s.
    Returns:
        Created `NodeSerializer` .
    """
    # All arguments are handed over to the constructor unchanged.
    created = NodeSerializer(namer, aggregator, *serializers)
    return created

Methods

def alter(*args, **kwargs)
Expand source code
def g(*args, **kwargs):
    # Creates a NodeSerializer for this call and forwards all arguments to its
    # method named by `n`, a variable bound outside of this function.
    return getattr(NodeSerializer(), n)(*args, **kwargs)
def at(*args, **kwargs)
Expand source code
def g(*args, **kwargs):
    # Creates a NodeSerializer for this call and forwards all arguments to its
    # method named by `n`, a variable bound outside of this function.
    return getattr(NodeSerializer(), n)(*args, **kwargs)
def doc(*args, **kwargs)
Expand source code
def g(*args, **kwargs):
    # Creates a NodeSerializer for this call and forwards all arguments to its
    # method named by `n`, a variable bound outside of this function.
    return getattr(NodeSerializer(), n)(*args, **kwargs)
def each(*args, **kwargs)
Expand source code
def g(*args, **kwargs):
    # Creates a NodeSerializer for this call and forwards all arguments to its
    # method named by `n`, a variable bound outside of this function.
    return getattr(NodeSerializer(), n)(*args, **kwargs)
def fold(*args, **kwargs)
Expand source code
def g(*args, **kwargs):
    # Creates a NodeSerializer for this call and forwards all arguments to its
    # method named by `n`, a variable bound outside of this function.
    return getattr(NodeSerializer(), n)(*args, **kwargs)
def head(*args, **kwargs)
Expand source code
def g(*args, **kwargs):
    # Forward the call to the method named `n` on a brand-new NodeSerializer.
    # `n` is not a local; it is looked up outside this function when g runs.
    return getattr(NodeSerializer(), n)(*args, **kwargs)
def last(*args, **kwargs)
Expand source code
def g(*args, **kwargs):
    # Forward the call to the method named `n` on a brand-new NodeSerializer.
    # `n` is not a local; it is looked up outside this function when g runs.
    return getattr(NodeSerializer(), n)(*args, **kwargs)
def merge(*args, **kwargs)
Expand source code
def g(*args, **kwargs):
    # Forward the call to the method named `n` on a brand-new NodeSerializer.
    # `n` is not a local; it is looked up outside this function when g runs.
    return getattr(NodeSerializer(), n)(*args, **kwargs)
def name(*args, **kwargs)
Expand source code
def g(*args, **kwargs):
    # Forward the call to the method named `n` on a brand-new NodeSerializer.
    # `n` is not a local; it is looked up outside this function when g runs.
    return getattr(NodeSerializer(), n)(*args, **kwargs)
def select(*args, **kwargs)
Expand source code
def g(*args, **kwargs):
    # Forward the call to the method named `n` on a brand-new NodeSerializer.
    # `n` is not a local; it is looked up outside this function when g runs.
    return getattr(NodeSerializer(), n)(*args, **kwargs)
def sub(*args, **kwargs)
Expand source code
def g(*args, **kwargs):
    # Forward the call to the method named `n` on a brand-new NodeSerializer.
    # `n` is not a local; it is looked up outside this function when g runs.
    return getattr(NodeSerializer(), n)(*args, **kwargs)
class Table (name: str, columns: list[Column], comment: str = '')

This class represents a schema of a table.

Expand source code
class Table:
    """
    Schema of a table: its name, its columns and a comment.
    """
    def __init__(self, name: str, columns: list[Column], comment: str = ""):
        #: Table name.
        self.name = name
        #: Columns in the table.
        self.columns = columns
        #: Comment of the table.
        self.comment = comment

    def find(self, name: str) -> Optional[Column]:
        """
        Look up a column of this table by its name.

        Args:
            name: Column name.
        Returns:
            The first column whose `name` equals the given one, otherwise `None`.
        """
        # Columns are scanned in their stored order; the first match wins.
        for column in self.columns:
            if column.name == name:
                return column
        return None

Instance variables

var columns

Columns in the table.

var comment

Comment of the table.

var name

Table name.

Methods

def find(self, name: str) ‑> Optional[Column]

Find a column by name.

Args

name
Column name.

Returns

The column if exists, otherwise None.

Expand source code
def find(self, name: str) -> Optional[Column]:
    """
    Look up a column of this table by its name.

    Args:
        name: Column name.
    Returns:
        The first column whose `name` equals the given one, otherwise `None`.
    """
    # Columns are scanned in their stored order; the first match wins.
    for column in self.columns:
        if column.name == name:
            return column
    return None
class Typeable

An interface for generic type which is resolved into a concrete type by a type parameter.

Inherit this class and declare static method whose signature is resolve(me, bound, arg, spec) -> type.

>>> class A(Typeable[T]):
>>>     @staticmethod
>>>     def resolve(me, bound, arg, spec):
>>>         ...
>>>         return some_type
>>>
>>> Typeable.resolve(A[T], int, spec)

Type resolution starts from Typeable.resolve() which invokes the static method with following arguments.

  • Type to resolve itself, in this case, A[T].
  • A resolved type which replaces T.
    • arg is the first candidate.
    • When arg is also Typeable, this resolution flow is applied to it recursively until a concrete type is determined.
  • arg is passed through as it is.
  • spec is passed through as it is.
Expand source code
class Typeable(Generic[T]):
    """
    An interface for generic type which is resolved into a concrete type by a type parameter.

    Inherit this class and declare a static method whose signature is `resolve(me, bound, arg, spec) -> type`.

    ```python
    >>> class A(Typeable[T]):
    >>>     @staticmethod
    >>>     def resolve(me, bound, arg, spec):
    >>>         ...
    >>>         return some_type
    >>>
    >>> Typeable.resolve(A[T], int, spec)
    ```

    Type resolution starts from `Typeable.resolve` which invokes the static method with the following arguments.

    - Type to resolve itself, in this case, `A[T]`.
    - A resolved type which replaces `T`.
        - `arg` is the first candidate.
        - When `arg` is also `Typeable` , this resolution flow is applied to it recursively until a concrete type is determined.
    - `arg` is passed through as it is.
    - `spec` is passed through as it is.
    """
    @staticmethod
    def resolve(typeable, arg: type, spec: Any) -> type:
        """
        Resolve a `Typeable` type into a concrete type by a type for its type parameter.

        Args:
            typeable: `Typeable` type having a generic type parameter.
            arg: Type to replace a type parameter.
            spec: An object containing information for schema generation.
        Returns:
            Resolved type.
        Raises:
            ValueError: If `typeable` is a parameterization of `Typeable` itself instead of an inheriting class.
        """
        if get_origin(typeable) is Typeable:
            raise ValueError("Typeable should not be used directly. Use inheriting class instead.")

        bound = get_args(typeable)[0]

        if isinstance(bound, TypeVar):
            # The parameter is still a type variable: bind it to `arg` and start over.
            return Typeable.resolve(typeable[arg], arg, spec)

        if issubgeneric(bound, Typeable):
            # The parameter is itself a `Typeable`: resolve it first and use the result as the bound.
            bound = Typeable.resolve(bound, arg, spec)

        # Hand over to the `resolve(me, bound, arg, spec)` declared by the inheriting class.
        return typeable.resolve(typeable, bound, arg, spec)

    @staticmethod
    def is_resolved(typeable: type['Typeable']) -> bool:
        """
        Checks whether the type parameter of given `Typeable` is already resolved.

        Args:
            typeable: `Typeable` type having a generic type parameter.
        Returns:
            Whether the type parameter is already resolved or not.
        """
        bound = get_args(typeable)[0]

        if isinstance(bound, TypeVar):
            # A bare type variable means nothing has been bound yet.
            return False

        if issubgeneric(bound, Typeable):
            # A nested `Typeable` is resolved only when its own parameter is.
            return Typeable.is_resolved(bound)

        return True

Ancestors

  • typing.Generic

Subclasses

Static methods

def is_resolved(typeable: type['Typeable']) ‑> bool

Checks whether the type parameter of the given Typeable is already resolved.

Args

typeable
Typeable type having a generic type parameter.

Returns

Whether the type parameter is already resolved or not.

Expand source code
@staticmethod
def is_resolved(typeable: type['Typeable']) -> bool:
    """
    Checks whether the type parameter of given `Typeable` is already resolved.

    Args:
        typeable: `Typeable` type having a generic type parameter.
    Returns:
        Whether the type parameter is already resolved or not.
    """
    bound = get_args(typeable)[0]

    if isinstance(bound, TypeVar):
        # A bare type variable means nothing has been bound yet.
        return False

    if issubgeneric(bound, Typeable):
        # A nested `Typeable` is resolved only when its own parameter is.
        return Typeable.is_resolved(bound)

    return True
def resolve(typeable, arg: type, spec: Any) ‑> type

Resolve a Typeable type into a concrete type by a type for its type parameter.

Args

typeable
Typeable type having a generic type parameter.
arg
Type to replace a type parameter.
spec
An object containing information for schema generation.

Returns

Resolved type.

Expand source code
@staticmethod
def resolve(typeable, arg: type, spec: Any) -> type:
    """
    Resolve a `Typeable` type into a concrete type by a type for its type parameter.

    Args:
        typeable: `Typeable` type having a generic type parameter.
        arg: Type to replace a type parameter.
        spec: An object containing information for schema generation.
    Returns:
        Resolved type.
    Raises:
        ValueError: If `typeable` is a parameterization of `Typeable` itself instead of an inheriting class.
    """
    if get_origin(typeable) is Typeable:
        raise ValueError("Typeable should not be used directly. Use inheriting class instead.")

    bound = get_args(typeable)[0]

    if isinstance(bound, TypeVar):
        # The parameter is still a type variable: bind it to `arg` and start over.
        return Typeable.resolve(typeable[arg], arg, spec)

    if issubgeneric(bound, Typeable):
        # The parameter is itself a `Typeable`: resolve it first and use the result as the bound.
        bound = Typeable.resolve(bound, arg, spec)

    # Hand over to the `resolve(me, bound, arg, spec)` declared by the inheriting class.
    return typeable.resolve(typeable, bound, arg, spec)