Package pyracmon
Base module of pyracmon exporting commonly used objects.
Use * simply to import them.
>>> from pyracmon import *
Expand source code
"""
Base module of pyracmon exporting commonly used objects.  Use `*` simply to import them.
>>> from pyracmon import *
"""
import sys
import types
from typing import Union, Optional, Any, TypeVar, TYPE_CHECKING
from pyracmon.config import default_config
from pyracmon.connection import connect, Connection
from pyracmon.context import ConnectionContext
from pyracmon.graph.serialize import NodeSerializer
from pyracmon.mixin import CRUDMixin
from pyracmon.select import read_row
from pyracmon.model import define_model, Table, Column
from pyracmon.model_graph import GraphEntityMixin
from pyracmon.query import Q, Expression, Conditional, escape_like, where
from pyracmon.query_graph import append_rows
from pyracmon.clause import order_by, ranged_by, holders, values
from pyracmon.stub import output_stub
from pyracmon.graph import new_graph, S
from pyracmon.graph.graph import Graph, GraphView, NodeContainer, ContainerView, Node, NodeView
from pyracmon.graph.spec import GraphSpec
from pyracmon.graph.template import GraphTemplate
from pyracmon.graph.schema import document_type, Typeable, GraphSchema
from pyracmon.graph.serialize import NodeContext
from pyracmon.graph.typing import walk_schema
from pyracmon.testing import TestingMixin
# Expose `Model` from this module.
# While type checking, `Model` is a local, empty subclass of
# `pyracmon.model.Model`; at runtime the original class is imported unchanged.
# NOTE(review): presumably the local subclass exists so that static type
# checkers treat `pyracmon.Model` as a class declared here - confirm intent.
if TYPE_CHECKING:
    from pyracmon.model import Model as _Model
    class Model(_Model):
        pass
else:
    from pyracmon.model import Model
# Names exported by `from pyracmon import *`.
# The last four entries are the helper functions defined in this module.
__all__ = [
    "connect",
    "Connection",
    "ConnectionContext",
    "CRUDMixin",
    "read_row",
    "define_model",
    "Table",
    "Column",
    "Q",
    "Expression",
    "Conditional",
    "where",
    "append_rows",
    "escape_like",
    "order_by",
    "ranged_by",
    "holders",
    "values",
    "new_graph",
    "S",
    "Graph",
    "GraphView",
    "NodeContainer",
    "ContainerView",
    "Node",
    "NodeView",
    "document_type",
    "Typeable",
    "walk_schema",
    "GraphSchema",
    "NodeContext",
    "Model",
    "declare_models",
    "graph_template",
    "graph_dict",
    "graph_schema",
]
# Type variable bound to `Model`; `declare_models` uses it to tie the type of
# its `model_type` argument to the element type of the returned list.
M = TypeVar('M', bound=Model)
def declare_models(
    dialect: types.ModuleType,
    db: Connection,
    module: Union[types.ModuleType, str] = __name__,
    mixins: list[type] = [],
    excludes: Optional[list[str]] = None,
    includes: Optional[list[str]] = None,
    *,
    testing: bool = False,
    model_type: type[M] = Model,
    write_stub: bool = False,
) -> list[type[M]]:
    """
    Read table schemas from the database and declare a model type for each table.

    Each declared type is stored in the target module under the name of its table.

    Args:
        dialect: A module exporting `read_schema` function and `mixins` classes.
            `pyracmon.dialect.postgresql` and `pyracmon.dialect.mysql` are available.
        db: Connection already connected to database.
        module: A module or module name where the declarations will be located.
            A name is looked up in `sys.modules`.
        mixins: Additional mixin classes for declaring model types.
            They precede the mixins of the dialect and the default ones.
        excludes: Excluding table names.
        includes: Including table names. When this argument is omitted, all tables except for specified in `excludes` are declared.
        testing: When `True`, `TestingMixin` is placed ahead of the default mixins.
        model_type: Type appended after the default mixins `CRUDMixin` and `GraphEntityMixin`.
        write_stub: When `True`, `output_stub` is invoked for the declared models.
    Returns:
        Declared model types, in the order of tables returned by `dialect.read_schema`.
    """
    # The schema is read first so that a failure there surfaces before the module lookup.
    tables = dialect.read_schema(db, excludes, includes)

    if isinstance(module, types.ModuleType):
        target = module
    else:
        target = sys.modules[module]

    defaults = [CRUDMixin, GraphEntityMixin, model_type]
    if testing:
        defaults.insert(0, TestingMixin)

    declared = []
    for table in tables:
        model = define_model(table, mixins + dialect.mixins + defaults)
        target.__dict__[table.name] = model
        declared.append(model)

    if write_stub:
        output_stub(None, target, declared, dialect, mixins, testing=testing)

    return declared
def graph_template(*bases: GraphTemplate, **definitions: type) -> GraphTemplate:
    """
    Build a graph template with the `GraphSpec` held by the default configuration.

    The default `GraphSpec` handles model object in appropriate ways.
    See `pyracmon.graph.GraphSpec.new_template` for the detail of definitions.

    Args:
        bases: Base templates whose properties and relations are merged into new template.
        definitions: Definitions of template properties.
    Returns:
        Graph template.
    """
    spec = default_config().graph_spec
    return spec.new_template(*bases, **definitions)
def graph_dict(graph: GraphView, **settings: NodeSerializer) -> dict[str, Any]:
    """
    Convert a graph into a `dict` with the `GraphSpec` held by the default configuration.

    See `pyracmon.graph.GraphSpec.to_dict` for the detail of serialization settings.

    Args:
        graph: A view of the graph.
        settings: Serialization settings where each key denotes a node name.
    Returns:
        Serialization result.
    """
    spec = default_config().graph_spec
    # The second positional argument is always passed as an empty dict.
    return spec.to_dict(graph, {}, **settings)
def graph_schema(template: GraphTemplate, **settings: NodeSerializer) -> GraphSchema:
    """
    Creates `GraphSchema` under the default `GraphSpec` .
    See `pyracmon.graph.GraphSpec.to_schema` for the detail of serialization settings.
    Args:
        template: A template of serializing graph.
        settings: Serialization settings where each key denotes a node name.
    Returns:
        Schema of serialization result.
    """
    return default_config().graph_spec.to_schema(template, **settings)
Sub-modules
- pyracmon.batch
- pyracmon.clause
- 
This module provides functions to generate miscellaneous clauses in query. 
- pyracmon.config
- 
This module exports types and functions for configurations … 
- pyracmon.connection
- 
This module provides types and functions for DB connections. 
- pyracmon.context
- 
This module provides the context type which controls query execution as configured. 
- pyracmon.dbapi
- 
This module provides interfaces defined in DB-API 2.0. 
- pyracmon.dialect
- pyracmon.graph
- pyracmon.marker
- 
This module provides the abstraction mechanism for marker creation used to embed parameters in a query … 
- pyracmon.mixin
- 
This module provides mixin type which supplies each model type various DB operations as class methods. 
- pyracmon.model
- pyracmon.model_graph
- 
This module provides graph specifications to deal with model types … 
- pyracmon.query
- 
This module exports types and functions for query construction … 
- pyracmon.query_graph
- pyracmon.select
- 
This module exports types and functions used for SELECT queries …
- pyracmon.sql
- 
This module provides the type for query generation from a template string containing unified marker. 
- pyracmon.stub
- 
This module exports functions to output type stub of model types. 
- pyracmon.testing
- pyracmon.util
- 
Utility types and functions for internal use. 
Functions
- def append_rows(cursor: Cursor, exp: collections.abc.Iterable[typing.Union[Consumable, typing.Any]], graph: Graph, /, **assign: Union[Selection, Any]) ‑> Graph
- 
Adds all rows in cursor into the graph. Values in assign are Selection or any kind of objects. If Selection is passed, the corresponding value in row is selected. In this case, the Selection must be contained in exp, otherwise ValueError is raised.
exp = ...
c = db.stmt().execute(...)
graph = append_rows(c, exp, new_graph(SomeGraph), a=exp.a, b=exp.b, c=0)
Args
- cursor
- Cursor obtained by query.
- exp
- Expressions used in the query.
- graph
- Graph to append rows.
- assign
- Mapping from graph property name to Selection or arbitrary value.
 ReturnsThe same graph as passed one. Expand source codedef append_rows(cursor: Cursor, exp: Iterable[Union[Consumable, Any]], graph: Graph, /, **assign: Union[Selection, Any]) -> Graph: """ Adds all rows in cursor into the graph. Values in `assign` are `Selection` or any kind of objects. If `Selection` is passed, the corresponding value in row is selected. In this case, the `Selection` must be contained in `exp` , otherwise `ValueError` is raised. ```python exp = ... c = db.stmt().execute(...) graph = add_all(c, exp, new_graph(SomeGraph), a=exp.a, b=exp.b, c=0) ``` Args: cursor: Cursor obtained by query. exp: Expressions used in the query. graph: Graph to append rows. assign: Mapping from graph property name to `Selection` or arbitrary value. Returns: The same graph as passed one. """ def get(k: str) -> Any: v = assign[k] if isinstance(v, Consumable): return getattr(r, v.name) else: return v for row in cursor.fetchall(): r = read_row(row, *exp) graph.append(**{k:get(k) for k in assign.keys()}) return graph
- def connect(api: module, *args: Any, **kwargs: Any) ‑> Connection
- 
Connects to DB by passing arguments to DB-API 2.0 module. Every optional argument is passed to api.connect, and the Connection object which wraps the obtained DB connection is returned.
import psycopg2
from pyracmon import connect
db = connect(psycopg2, host="localhost", port=5432, dbname="pyracmon", user="postgres", password="postgres")
c = db.stmt().execute("SELECT 1")
assert c.fetchone()[0] == 1
Args
- api
- DB-API 2.0 module which exports connect() function.
- args
- Positional arguments passed to api.connect.
- kwargs
- Keyword arguments passed to api.connect.
 ReturnsWrapper of DB-API 2.0 connection. Expand source codedef connect(api: types.ModuleType, *args: Any, **kwargs: Any) -> 'Connection': """ Connects to DB by passing arguments to DB-API 2.0 module. Every optional argument is passed to `api.connect` and returns the `Connection` object which wraps obtained DB connection. ```python import psycopg2 from pyracmon import connect db = connect(psycopg2, host="localhost", port=5432, dbname="pyracmon", user="postgres", password="postgres") c = db.stmt().execute("SELECT 1") assert c.fetchone()[0] == 1 ``` Args: api: DB-API 2.0 module which exports `connect` function. args: Positional arguments passed to `api.connect`. kwargs: Keyword arguments passed to `api.connect`. Returns: Wrapper of DB-API 2.0 connection. """ return Connection(api, api.connect(*args, **kwargs), None)
- def declare_models(dialect: module, db: Connection, module: Union[module, str] = 'pyracmon', mixins: list[type] = [], excludes: Optional[list[str]] = None, includes: Optional[list[str]] = None, *, testing: bool = False, model_type: type[~M] = pyracmon.model.Model, write_stub: bool = False) ‑> list[type[~M]]
- 
Declare model types read from database into the specified module. Args- dialect
- A module exporting read_schema function and mixins classes. pyracmon.dialect.postgresql and pyracmon.dialect.mysql are available.
- db
- Connection already connected to database.
- module
- A module or module name where the declarations will be located.
- mixins
- Additional mixin classes for declaring model types.
- excludes
- Excluding table names.
- includes
- Including table names. When this argument is omitted, all tables except for those specified in excludes are declared.
 ReturnsDeclared model types. Expand source codedef declare_models( dialect: types.ModuleType, db: Connection, module: Union[types.ModuleType, str] = __name__, mixins: list[type] = [], excludes: Optional[list[str]] = None, includes: Optional[list[str]] = None, *, testing: bool = False, model_type: type[M] = Model, write_stub: bool = False, ) -> list[type[M]]: """ Declare model types read from database into the specified module. Args: dialect: A module exporting `read_schema` function and `mixins` classes. `pyracmon.dialect.postgresql` and `pyracmon.dialect.mysql` are available. db: Connection already connected to database. module: A module or module name where the declarations will be located. mixins: Additional mixin classes for declaring model types. excludes: Excluding table names. includes: Including table names. When this argument is omitted, all tables except for specified in `excludes` are declared. Returns: Declared model types. """ tables = dialect.read_schema(db, excludes, includes) models = [] mod = module if isinstance(module, types.ModuleType) else sys.modules[module] base_mixins = [CRUDMixin, GraphEntityMixin, model_type] if testing: base_mixins[0:0] = [TestingMixin] for t in tables: m = define_model(t, mixins + dialect.mixins + base_mixins) mod.__dict__[t.name] = m models.append(m) if write_stub: output_stub(None, mod, models, dialect, mixins, testing=testing) return models
- def define_model(table_: Table, mixins: Union[type[~MXT], list[type], ForwardRef(None)] = None, model_type: Optional[type[~M]] = pyracmon.model.Model) ‑> type[~M]
- 
Create a model type representing a table. Model type inherits all types in mixins in order. When the same attribute is defined in multiple mixin types, the former overrides the latter.
Every model type has following attributes:
| name | type | description |
|:---|:---|:---|
| name | str | Name of the table. |
| table | Table | Table schema. |
| columns | List[Column] | List of column schemas. |
| column | Any | An object whose attributes expose the column schema of the same name. |
Model instances are created by passing the constructor keyword arguments composed of column names and values like builtin dataclass. Unlike dataclass, the constructor does not require all of columns. Omitted columns don't affect predefined operations such as CRUDMixin.insert(). If a not null constraint exists on the column, insertion will be denied at runtime and an exception will be thrown.
>>> # CREATE TABLE t1 (col1 int, col2 text, col3 text);
>>> table = define_model("t1")
>>> model = table(col1=1, col2="a")
Attributes are also assignable by normal setter. If attribute name is not a valid column name, TypeError raises.
>>> model.col3 = "b"
Model instance supports iteration which yields pairs of assigned column schema and its value.
>>> for c, v in model:
>>>     print(f"{c.name} = {v}")
col1 = 1
col2 = a
col3 = b
Args
- table_
- Table schema.
- mixins
- Mixin types providing class methods to the model type.
- model_type
- Use this just for type hinting to determine returned model type.
 ReturnsModel type. Expand source codedef define_model(table_: Table, mixins: Union[type[MXT], list[type], None] = None, model_type: Optional[type[M]] = Model) -> type[M]: """ Create a model type representing a table. Model type inherits all types in `mixins` in order. When the same attribute is defined in multiple mixin types, the former overrides the latter. Every model type has following attributes: |name|type|description| |:---|:---|:---| |name|`str`|Name of the table.| |table|`Table`|Table schema.| |columns|`List[Column]`|List of column schemas.| |column|`Any`|An object whose attribute exposes of column schema of its name.| Model instances are created by passing the constructor keyword arguments composed of column names and values like builtin dataclass. Unlike dataclass, the constructor does not require all of columns. Omitted columns don't affect predefined operations such as `CRUDMixin.insert` . If `not null` constraint exists on the column, insertion will be denied at runtime and exception will be thrown. ```python >>> # CREATE TABLE t1 (col1 int, col2 text, col3 text); >>> table = define_model("t1") >>> model = table(col1=1, col2="a") ``` Attributes are also assignable by normal setter. If attribute name is not a valid column name, `TypeError` raises. ```python >>> model.col3 = "b" ``` Model instance supports iteration which yields pairs of assigned column schema and its value. ```python >>> for c, v in model: >>> print(f"{c.name} = {v}") col1 = 1 col2 = a col3 = b ``` Args: table__: Table schema. mixin: Mixin types providing class methods to the model type. model_type: Use this just for type hinting to determine returned model type. Returns: Model type. 
""" column_names = {c.name for c in table_.columns} class Columns: def __init__(self): for c in table_.columns: setattr(self, c.name, c) class Meta(type): name = table_.name table = table_ columns = table_.columns column = Columns() @classmethod def shrink(cls, excludes: list[str], includes: Optional[list[str]] = None) -> Self: """ Creates new model type containing subset of columns. Args: excludes: Column names to exclude. includes: Column names to include. Returns: model type. """ cols = [c for c in cls.columns if (not includes or c.name in includes) and c.name not in excludes] return define_model(Table(cls.name, cols, cls.table.comment), mixins) # type: ignore class Base(Model, metaclass=Meta): pass mixin_types: list[type] = [] if isinstance(mixins, list): mixin_types = mixins elif get_origin(mixins) is not None: mixin_types = cast(list[type], list(get_args(mixins))) elif mixins is not None: raise ValueError(f"Model mixin types should be specified by Mixins or a list of types.") class _Model(type("ModelBase", tuple([Base] + mixin_types), {})): def __init__(self, **kwargs): for k, v in kwargs.items(): setattr(self, k, v) def __repr__(self): cls = cast(type[Base], type(self)) return f"{cls.name}({', '.join([f'{c.name}={repr(getattr(self, c.name))}' for c in cls.columns if hasattr(self, c.name)])})" def __str__(self): cls = cast(type[Base], type(self)) return f"{cls.name}({', '.join([f'{c.name}={str(getattr(self, c.name))}' for c in cls.columns if hasattr(self, c.name)])})" def __iter__(self) -> Iterator[tuple[Column, Any]]: cls = cast(type[Base], type(self)) return map(lambda c: (c, getattr(self, c.name)), filter(lambda c: hasattr(self, c.name), cls.columns)) def __setattr__(self, key, value): cls = cast(type[Base], type(self)) if key not in column_names: raise TypeError(f"{key} is not a column of {cls.name}") object.__setattr__(self, key, value) def __getitem__(self, key): return getattr(self, key) def __contains__(self, key): return hasattr(self, key) def 
__eq__(self, other): cls = type(self) if cls != type(other): return False for k in column_names: if hasattr(self, k) ^ hasattr(other, k): return False if getattr(self, k, None) != getattr(other, k, None): return False return True return cast(type[M], _Model)
- def document_type(t: type, doc: str) ‑>
- 
Supplies a document to a type. Args- t
- A type.
- doc
- A document.
 ReturnsDocumented type. Expand source codedef document_type(t: type, doc: str) -> Annotated: """ Supplies a document to a type. Args: t: A type. doc: A document. Returns: Documented type. """ return Annotated[t, doc]
- def escape_like(v: str) ‑> str
- 
Escape a string for the use in LIKE condition.
Args
- v
- A string.
 ReturnsEscaped string. Expand source codedef escape_like(v: str) -> str: """ Escape a string for the use in `LIKE` condition. Args: v: A string. Returns: Escaped string. """ def esc(c): if c == "\\": return r"\\\\" elif c == "%": return r"\%" elif c == "_": return r"\_" else: return c return ''.join(map(esc, v))
- def graph_dict(graph: GraphView, **settings: NodeSerializer) ‑> dict[str, typing.Any]
- 
Serialize a graph into a dict under the default GraphSpec. See GraphSpec.to_dict() for the detail of serialization settings.
Args
- graph
- A view of the graph.
- settings
- Serialization settings where each key denotes a node name.
 ReturnsSerialization result. Expand source codedef graph_dict(graph: GraphView, **settings: NodeSerializer) -> dict[str, Any]: """ Serialize a graph into a `dict` under the default `GraphSpec` . See `pyracmon.graph.GraphSpec.to_dict` for the detail of serialization settings. Args: graph: A view of the graph. settings: Serialization settings where each key denotes a node name. Returns: Serialization result. """ return default_config().graph_spec.to_dict(graph, {}, **settings)
- def graph_schema(template: GraphTemplate, **settings: NodeSerializer) ‑> GraphSchema
- 
Creates GraphSchema under the default GraphSpec. See GraphSpec.to_schema() for the detail of serialization settings.
Args
- template
- A template of serializing graph.
- settings
- Serialization settings where each key denotes a node name.
 ReturnsSchema of serialization result. Expand source codedef graph_schema(template: GraphTemplate, **settings: NodeSerializer) -> GraphSchema: """ Creates `GraphSchema` under the default `GraphSpec` . See `pyracmon.graph.GraphSpec.to_schema` for the detail of serialization settings. Args: template: A template of serializing graph. settings: Serialization settings where each key denotes a node name. Returns: Schema of serialization result. """ return default_config().graph_spec.to_schema(template, **settings)
- def graph_template(*bases: GraphTemplate, **definitions: type) ‑> GraphTemplate
- 
Create a graph template on the default GraphSpec which handles model object in appropriate ways. See GraphSpec.new_template() for the detail of definitions.
Args
- bases
- Base templates whose properties and relations are merged into new template.
- definitions
- Definitions of template properties.
 ReturnsGraph template. Expand source codedef graph_template(*bases: GraphTemplate, **definitions: type) -> GraphTemplate: """ Create a graph template on the default `GraphSpec` which handles model object in appropriate ways. See `pyracmon.graph.GraphSpec.new_template` for the detail of definitions. Args: bases: Base templates whose properties and relations are merged into new template. definitions: Definitions of template properties. Returns: Graph template. """ return default_config().graph_spec.new_template(*bases, **definitions)
- def holders(length_or_keys: Union[int, collections.abc.Sequence[Union[str, int, NoneType, Expression]]], qualifier: Optional[collections.abc.Mapping[int, collections.abc.Callable[[str], str]]] = None) ‑> str
- 
Generates partial query string containing placeholder markers separated by comma. Args- length_or_keys
- The number of placeholders or list of placeholder keys.
- qualifier
- Qualifying function for each index.
 ReturnsQuery string. Expand source codedef holders(length_or_keys: Union[int, Sequence[HolderKeys]], qualifier: Optional[Mapping[int, Qualifier]] = None) -> str: """ Generates partial query string containing placeholder markers separated by comma. Args: length_or_keys: The number of placeholders or list of placeholder keys. qualifier: Qualifying function for each index. Returns: Query string. """ if isinstance(length_or_keys, int): hs = ["${_}"] * length_or_keys else: def key(k): if isinstance(k, Expression): return k.expression elif isinstance(k, int): return f"${{_{k}}}" elif k: return f"${{{k}}}" else: return "${_}" hs = [key(k) for k in length_or_keys] if qualifier: hs = [qualifier.get(i, _noop)(h) for i, h in enumerate(hs)] return ', '.join(hs)
- def new_graph(template: GraphTemplate, *bases: Union[Graph, GraphView]) ‑> Graph
- 
Create a graph from a template. Use this function instead of invoking constructor directly. Args- template
- A template of a graph.
- bases
- Other graphs whose nodes are appended to created graph.
 ReturnsCreated graph. Expand source codedef new_graph(template: GraphTemplate, *bases: Union[Graph, GraphView]) -> Graph: """ Create a graph from a template. Use this function instead of invoking constructor directly. Args: template: A template of a graph. bases: Other graphs whose nodes are appended to created graph. Returns: Created graph. """ graph = Graph(template) for b in bases: graph += b return graph
- def order_by(columns: collections.abc.Mapping[typing.Union[str, AliasedColumn], typing.Union[bool, tuple[bool, bool], str]], **defaults: Union[bool, tuple[bool, bool], str]) ‑> str
- 
Generates ORDER BY clause from columns and directions.
Args
- columns
- Columns and directions. Iteration order is kept in rendered clause.
- defaults
- Column names and directions appended to the clause if the column is not contained in columns.
 ReturnsORDER BYclause.Expand source codedef order_by(columns: Mapping[Union[str, AliasedColumn], ORDER], **defaults: ORDER) -> str: """ Generates `ORDER BY` clause from columns and directions. Args: columns: Columns and directions. Iteration order is kept in rendered clause. defaults: Column names and directions appended to the clause if the column is not contained in `columns` . Returns: `ORDER BY` clause. """ columns = dict(columns, **{c:v for c,v in defaults.items() if c not in columns}) def col(cd): if isinstance(cd[1], bool): return f"{cd[0]} ASC" if cd[1] else f"{cd[0]} DESC" elif isinstance(cd[1], str): return f"{cd[0]} {cd[1]}" elif isinstance(cd[1], tuple) and len(cd[1]) == 2: return f"{cd[0]} {'ASC' if cd[1][0] else 'DESC'} NULLS {'FIRST' if cd[1][1] else 'LAST'}" else: raise ValueError(f"Directions must be specified by bool, pair of bools or string: {cd[1]}") return '' if len(columns) == 0 else f"ORDER BY {', '.join(map(col, columns.items()))}"
- def ranged_by(limit: Optional[int] = None, offset: Optional[int] = None) ‑> tuple[str, list[typing.Any]]
- 
Generates LIMIT OFFSET clause using marker.
Args
- limit
- Limit value. None means no limitation.
- offset
- Offset value. None means 0.
 ReturnsLIMIT OFFSETclause and its parameters.Expand source codedef ranged_by(limit: Optional[int] = None, offset: Optional[int] = None) -> tuple[str, list[Any]]: """ Generates `LIMIT OFFSET` clause using marker. Args: limit: Limit value. `None` means no limitation. offset: Offset value. `None` means `0`. Returns: `LIMIT OFFSET` clause and its parameters. """ clause, params = [], [] if limit is not None: clause.append("LIMIT $_") params.append(limit) if offset is not None: clause.append("OFFSET $_") params.append(offset) return ' '.join(clause) if clause else '', params
- def read_row(row, *selections: Union[Consumable, str, tuple], allow_redundancy: bool = False) ‑> RowValues
- 
Read values in a row according to given selections. This function returns RowValues where each value is created by each selection respectively. The type of the selection determines how values in the row are handled:
- Selection consumes as many values as the number of columns in it and creates a model instance.
- Empty tuple or a string consumes a value, which is stored in RowValues as it is.
 Args- selections
- List of selections or their equivalents.
- allow_redundancy
- If False, ValueError is thrown when not all values in a row are consumed.
 ReturnsValues read from the row accoding to the selections. Expand source codedef read_row(row, *selections: Union[Consumable, str, tuple], allow_redundancy: bool = False) -> RowValues: """ Read values in a row according to given selections. This function returns `RowValues` where each value is created by each selection respectively. The type of the selection determines how values in the row are handled: - `Selection` consumes as many values as the number of columns in it and creates a model instance. - Empty tuple or a string consumes a value, which is stored in `RowValues` as it is. Args: selections: List of selections or their equivalents. allow_redundancy: If `False`, `ValueError` is thrown when not all values in a row are consumed. Returns: Values read from the row accoding to the selections. """ consumables = [Consumable.to_consumable(s) for s in selections] result = RowValues(consumables) for s in consumables: result.append(s.consume(row)) row = row[len(s):] if not allow_redundancy and len(row) > 0: raise ValueError("Not all elements in row is consumed.") return result
- def values(length_or_key_gen: Union[int, collections.abc.Sequence[collections.abc.Callable[[int], Union[str, int, NoneType, Expression]]]], rows: int, qualifier: Optional[collections.abc.Mapping[int, collections.abc.Callable[[str], str]]] = None) ‑> str
- 
Generates partial query string for VALUES clause in insertion query.
Args
- length_or_key_gen
- The number of placeholders or list of functions taking row index and returning key for each placeholder.
- rows
- The number of rows to insert.
- qualifier
- Qualifying function for each index.
 ReturnsQuery string. Expand source codedef values(length_or_key_gen: Union[int, Sequence[Callable[[int], HolderKeys]]], rows: int, qualifier: Optional[Mapping[int, Qualifier]] = None) -> str: """ Generates partial query string for `VALUES` clause in insertion query. Args: length_or_key_gen: The number of placeholders or list of functions taking row index and returning key for each placeholder. rows: The number of rows to insert. qualifier: Qualifying function for each index. Returns: Query string. """ if isinstance(length_or_key_gen, int): lok = lambda i: length_or_key_gen else: lok = lambda i: [g(i) for g in length_or_key_gen] # type: ignore return ', '.join([f"({holders(lok(i), qualifier)})" for i in range(rows)])
- def walk_schema(td, with_doc=False) ‑> dict[str, typing.Union[type, typing.Annotated]]
- 
Returns a dictionary as a result of walking a schema object from its root. Args- td
- A schema represented by TypedDict.
- with_doc
- Flag to include documentations into result.
 ReturnsKey value representation of the schema. If with_docisTrue, each value isAnnotated.Expand source codedef walk_schema(td, with_doc=False) -> dict[str, Union[type, Annotated]]: """ Returns a dictionary as a result of walking a schema object from its root. Args: td: A schema represented by `TypedDict`. with_doc: Flag to include documentations into result. Returns: Key value representation of the schema. If `with_doc` is `True`, each value is `Annotated`. """ if '__annotations__' not in td.__dict__: return {} result = {} def put(k, t, doc): if with_doc: result[k] = (t, doc) else: result[k] = t def expand(t): return (get_args(t)[0], lambda x:[x]) if issubgeneric(t, list) else (t, lambda x:x) for k, t in get_type_hints(td, include_extras=True).items(): t, doc = decompose_document(t) t, conv = expand(t) opt_type = is_optional(t) if is_typeddict(t): put(k, conv(walk_schema(t, with_doc)), doc) elif opt_type is not None and is_typeddict(opt_type): put(k, conv(walk_schema(opt_type, with_doc)), doc) else: put(k, conv(t), doc) return result
- def where(condition: Conditional) ‑> tuple[str, list[typing.Any]]
- 
Generates a WHERE clause and parameters representing given condition. If the condition is empty, returned clause is an empty string which does not contain WHERE keyword.
Args
- condition
- Condition object.
 ReturnsTuple of WHEREclause and parameters.Expand source codedef where(condition: 'Conditional') -> tuple[str, list[Any]]: """ Generates a `WHERE` clause and parameters representing given condition. If the condition is empty, returned clause is an empty string which does not contain `WHERE` keyword. Args: condition: Condition object. Returns: Tuple of `WHERE` clause and parameters. """ return ('', []) if condition.expression == '' else (f'WHERE {condition.expression}', condition.params)
Classes
- class CRUDMixin
- 
Default mixin providing class methods available on all model types. Every method takes the DB connection object as its first argument. The following arguments are common to several methods.
- pks
- Names and values of all primary key columns in form of dict.
- A primary key value. This form is allowed when the table has just one primary key column.
- e.g. If the table has a single primary key id of int, 1 is available to specify the row of id = 1.
- e.g. If the table has multiple primary keys intid and strid, dict(intid=1, strid="abc") is a valid argument.
 
- Names and values of all primary key columns in form of 
- record- A model object or a mapping from column name to its value, which corresponds to a table row.
- Only columns contained in the record is affected by the operation.
- e.g. When dict(c1=1, c2="abc") is passed for insertion, only c1 and c2 are set in INSERT query.
- e.g. For update, only the columns will be updated. Other columns are not affected.
 
- condition- Query condition which will compose WHERE clause.
- Q is a factory class to create condition object.
- When None is passed, all rows are subject to the operation.
 
- qualifier- A mapping from column name to a function which qualifies a placeholder passed by an argument.
- Detail of qualifier function is described below.
 
- lock- This is reserved argument for locking statement but works just as the postfix of the query currently.
- The usage will be changed in future version.
 
 Qualifier function is used typically to convert or replace placeholder marker in insert/update query. By default, those queries contain markers like insert into t (c1, c2) values (?, ?)(Qparameter style). We need sometimes qualify markers to apply DB function, calculation, type cast and so on. This feature enables them like below.t.insert(db, dict(c1=1, c2=None), dict(c1=lambda x: f"{x}+1", c2=lambda x: "now()")) # SQL: INSERT INTO t (c1, c2) VALUES (?+1, now())Be aware that when model object is passed, its column values may differ from actual values in DB after query. Expand source codeclass CRUDMixin(SelectMixin, CRUDInternalMeta): """ Default mixin providing class methods available on all model types. Every method takes the DB connection object as its first argument. Following arguments are defined in several methods commonly. - `pks` - Names and values of all primary key columns in form of `dict` . - A primary key value. This form is allowed when the table has just one primary key column. - e.g. If the table has a single primary key `id` of int, `1` is available to spcecify the row of `id = 1` . - e.g. If the table has multiple primary keys `intid` and `strid`, `dict(intid=1, strid="abc")` is a valid argument. - `record` - A model object or a mapping from column name to its value, which corresponds to a table row. - Only columns contained in the record is affected by the operation. - e.g. When `dict(c1=1, c2="abc")` is passed for insertion, only `c1` and `c2` are set in INSERT query. - e.g. For update, only the columns will be updated. Other columns are not affected. - `condition` - Query condition which will compose WHERE clause. - `pyracmon.query.Q` is a factory class to create condition object. - When `None` is passed, all rows are subject to the operation. - `qualifier` - A mapping from column name to a function which qualifies a placeholder passed by an argument. - Detail of qualifier function is described below. 
- `lock` - This is reserved argument for locking statement but works just as the postfix of the query currently. - The usage will be changed in future version. Qualifier function is used typically to convert or replace placeholder marker in insert/update query. By default, those queries contain markers like `insert into t (c1, c2) values (?, ?)` (`Q` parameter style). We need sometimes qualify markers to apply DB function, calculation, type cast and so on. This feature enables them like below. ```python t.insert(db, dict(c1=1, c2=None), dict(c1=lambda x: f"{x}+1", c2=lambda x: "now()")) # SQL: INSERT INTO t (c1, c2) VALUES (?+1, now()) ``` Be aware that when model object is passed, its column values may differ from actual values in DB after query. """ @classmethod def count(cls, db: Connection, condition: Conditional = Q.of()) -> int: """ Count rows which satisfies the condition. ```python t.count(db, Q.eq(c1=1)) # SQL: SELECT COUNT(*) FROM t WHERE c1 = 1 ``` Args: db: DB connection. condition: Query condition. Returns: The number of rows. """ wc, wp = where(condition) c = db.stmt().execute(f"SELECT COUNT(*) FROM {cls.name}{_spacer(wc)}", *wp) return c.fetchone()[0] # type: ignore @classmethod def fetch(cls, db: Connection, pks: PKS, lock: Optional[Any] = None) -> Optional[Self]: """ Fetch a record by primary key(s). ```python t.fetch(db, 1) # SQL: SELECT * FROM t WHERE id = 1 ``` Args: db: DB connection. pks: Primary key value(s). lock: Locking statement. Returns: A model object if exists, otherwise `None`. 
""" cols, vals = parse_pks(cls, pks) cond = Conditional.all([Q.eq(**{c: v}) for c, v in zip(cols, vals)]) wc, wp = where(cond) s = cls.select() c = db.stmt().execute(f"SELECT {s} FROM {cls.name}{_spacer(wc)}{_spacer(lock)}", *wp) row = c.fetchone() return read_row(row, *s)[0] if row else None @classmethod def fetch_many(cls, db: Connection, seq_pks: Sequence[PKS], lock: Optional[Any] = None, /, per_page: int = 1000) -> list[Self]: """ Fetch a record by sequence of primary key(s). This method simply concatenates equality conditions on primary key by OR operator. ```python t.fetch_many(db, [1, 2, 3]) # SQL: SELECT * FROM t WHERE id = 1 OR id = 2 OR id = 3 ``` Args: db: DB connection. seq_pks: Sequence of primary key value. lock: Locking statement. per_page: Maximum number of keys for an execution of query. Returns: Model objects in the same order as passed sequence. """ res = [] index = 0 while index < len(seq_pks): ordered_pks = [] cond = Q.of() for pks in seq_pks[index:index+per_page]: cols, vals = parse_pks(cls, pks) ordered_pks.append(tuple(v for v in vals)) cond |= Conditional.all([Q.eq(**{c: v}) for c, v in zip(cols, vals)]) wc, wp = where(cond) s = cls.select() c = db.stmt().execute(f"SELECT {s} FROM {cls.name}{_spacer(wc)}{_spacer(lock)}", *wp) record_map = {} for r in [read_row(row, *s)[0] for row in c.fetchall()]: pk_values = {c.name:v for c, v in r if c.pk} record_map[tuple([v for _, v in check_columns(cls, pk_values, lambda c: c.pk, True)])] = r res.extend([record_map[k] for k in ordered_pks if k in record_map]) index += per_page return res @classmethod def fetch_where( cls, db: Connection, condition: Conditional = Q.of(), orders: Mapping[Union[str, AliasedColumn], ORDER] = {}, limit: Optional[int] = None, offset: Optional[int] = None, lock: Optional[Any] = None, ) -> list[Self]: """ Fetch records which satisfy the condition. 
```python t.fetch_where(db, Q.eq(c1=1), dict(c2=True), 10, 5) # SQL: SELECT * FROM t WHERE c1 = 1 ORDER BY c2 ASC LIMIT 10 OFFSET 5 ``` Args: db: DB connection. condition: Query condition. orders: Ordering specification where key is column name and value denotes whether the order is ascending or not. limit: Maximum nuber of rows to fetch. If `None`, all rows are returned. offset: The number of rows to skip. lock: Locking statement. Returns: Model objects. """ wc, wp = where(condition) rc, rp = ranged_by(limit, offset) s = cls.select() c = db.stmt().execute(f"SELECT {s} FROM {cls.name}{_spacer(wc)}{_spacer(order_by(orders))}{_spacer(rc)}{_spacer(lock)}", *(wp + rp)) return [read_row(row, *s)[0] for row in c.fetchall()] @classmethod def fetch_one( cls, db: Connection, condition: Conditional = Q.of(), lock: Optional[Any] = None, ) -> Optional[Self]: """ Fetch a record which satisfies the condition. `ValueError` raises When multiple records are found. Use this method for queries which certainly returns a single row, such as search by unique key. ```python t.fetch_one(db, Q.eq(c1=1)) # SQL: SELECT * FROM t WHERE c1 = 1 ``` Args: db: DB connection. condition: Query condition. lock: Locking statement. Returns: Model objects If exists, otherwise `None`. 
""" rs = cls.fetch_where(db, condition, lock=lock) if not rs: return None elif len(rs) == 1: return rs[0] else: raise ValueError(f"{len(rs)} records are found on the invocation of fetch_one().") @classmethod def _insert_sql(cls, record: Union[Self, dict[str, Any]], qualifier: Mapping[str, Qualifier] = {}) -> tuple[str, list[str], list[Any]]: model: Self = cast(Self, record) if isinstance(record, cls) else cls(**cast(dict, record)) value_dict = model_values(cls, model) check_columns(cls, value_dict) cols, vals = list(value_dict.keys()), list(value_dict.values()) ordered_qs = key_to_index(qualifier, cols) def exp(v): return lambda i: v if any(isinstance(v, Expression) for v in vals): key_gen = [] org_vals = vals vals = [] for v in org_vals: if isinstance(v, Expression): key_gen.append(exp(v)) vals.extend(v.params) else: key_gen.append(lambda i: None) vals.append(v) values_clause = values(key_gen, 1, ordered_qs) else: values_clause = values(len(cols), 1, ordered_qs) return f"INSERT INTO {cls.name} ({', '.join(cols)}) VALUES {values_clause}", cols, vals @classmethod def insert( cls, db: Connection, record: Union[Self, dict[str, Any]], qualifier: Mapping[str, Qualifier] = {}, /, returning: bool = False, ) -> Self: """ Insert a record. If `returning` is `True` and the DBMS supports **RETURNING** clause, returned model object contains comple and correct column values. Otherwise, auto incremental value is set to the returned model object but other column values generated inside DBMS such as default value are not set. ```python t.insert(db, dict(c1=1, c2=2)) # SQL: INSERT INTO t (c1, c2) VALUES (1, 2) ``` Args: db: DB connection. record: Object contains column values. qualifier: Functions qualifying placeholder markers. returning: Flag to return inserted record with complete and correct column values. Returns: Model of inserted record. 
""" model: Self = cast(Self, record) if isinstance(record, cls) else cls(**cast(dict, record)) sql, _, vals = cls._insert_sql(record, qualifier) if returning: if cls.support_returning(db): c = db.stmt().execute(f"{sql} RETURNING *", *vals) s = cls.select() return read_row(c.fetchone(), *s)[0] else: # REVIEW # Inserted row can't be specified from the table where no primary keys are defined . pass db.stmt().execute(sql, *vals) for c, v in cls.last_sequences(db, 1): setattr(model, c.name, v) return model @classmethod @overload def insert_many(cls, db: Connection, records: list[Union[Self, dict[str, Any]]], qualifier: Mapping[str, Qualifier] = {}, /, returning: Literal[False] = False) -> list[Self]: ... @classmethod @overload def insert_many(cls, db: Connection, records: list[Union[Self, dict[str, Any]]], qualifier: Mapping[str, Qualifier] = {}, /, returning: Literal[True] = True) -> list[Self]: ... @classmethod def insert_many( cls, db: Connection, records: list[Union[Self, dict[str, Any]]], qualifier: Mapping[str, Qualifier] = {}, /, returning: bool = False, ): """ Insert records. If `returning` is `True` and the DBMS supports **RETURNING** clause, returned model object contains comple and correct column values. Otherwise, auto incremental value is set to the returned model object but other column values generated inside DBMS such as default value are not set. Args: db: DB connection. record: Object contains column values. qualifier: Functions qualifying placeholder markers. returning: Flag to return inserted records with complete and correct column values. Returns: Models of inserted records or cursor. 
""" if len(records) == 0: return [] models: list[Self] = [cast(Self, r) if isinstance(r, cls) else cls(**cast(dict, r)) for r in records] seq_of_params = [] sql, cols, params = cls._insert_sql(models[0], qualifier) cols = set(cols) seq_of_params.append(params) for m in models[1:]: value_dict = model_values(cls, m) check_columns(cls, value_dict, lambda c: c.name in cols, requires_all=True) # REVIEW: # The consistency among columns where expression is set is not checked. _, _, params = cls._insert_sql(m, qualifier) seq_of_params.append(params) db.stmt().executemany(sql, seq_of_params) num = len(records) for c, v in cls.last_sequences(db, num): for i, m in enumerate(models): setattr(m, c.name, v - (num - i - 1)) if returning: seq_pks = [extract_pks(cls, m) for m in models] return cls.fetch_many(db, seq_pks) else: return models @classmethod def _update_sql(cls, record: Record, condition: Conditional, qualifier: Mapping[str, Qualifier] = {}, allow_all: bool = True) -> tuple[str, list[str], list[Any]]: value_dict = model_values(cls, record, excludes_pk=True) check_columns(cls, value_dict) cols, vals = list(value_dict.keys()), list(value_dict.values()) ordered_qs = key_to_index(qualifier, cols) def set_col(acc, icv): i, (c, v) = icv if isinstance(v, Expression): clause = f"{c} = {ordered_qs.get(i, lambda x:x)(v.expression)}" params = v.params else: clause = f"{c} = {ordered_qs.get(i, lambda x:x)('$_')}" params = [v] acc[0].append(clause) acc[1].extend(params) return acc setters, params = reduce(set_col, enumerate(zip(cols, vals)), ([], [])) wc, wp = where(condition) if wc == "" and not allow_all: raise ValueError("Update query to update all records is not allowed.") return f"UPDATE {cls.name} SET {', '.join(setters)}{_spacer(wc)}", cols, params + wp @classmethod @overload def update(cls, db: Connection, pks: PKS, record: Record, qualifier: Mapping[str, Qualifier] = {}, /, returning: Literal[False] = False) -> bool: ... 
@classmethod @overload def update(cls, db: Connection, pks: PKS, record: Record, qualifier: Mapping[str, Qualifier] = {}, /, returning: Literal[True] = True) -> Optional[Self]: ... @classmethod def update( cls, db: Connection, pks: PKS, record: Record, qualifier: Mapping[str, Qualifier] = {}, /, returning: bool = False, ): """ Update a record by primary key(s). This method only updates columns which are found in `record` except for primary key(s). ```python t.update(db, 1, dict(c1=1, c2=2)) # SQL: UPDATE t SET c1 = 1, c2 = 2 WHERE id = 1 ``` Args: db: DB connection. pks: Primary key value(s). record: Object contains column values. qualifier: Functions qualifying placeholder markers. returning: Flag to return updated records with complete and correct column values. Returns: Whether the record exists and updated or updated record model. """ cols, vals = parse_pks(cls, pks) condition = Conditional.all([Q.eq(**{c: v}) for c, v in zip(cols, vals)]) if returning: if cls.support_returning(db): models = cls.update_where(db, record, condition, qualifier, returning=True) return models[0] if models else None else: models = cls.update_where(db, record, condition, qualifier, returning=False) return cls.fetch(db, pks) else: return cls.update_where(db, record, condition, qualifier, returning=False) == 1 @classmethod @overload def update_many(cls, db: Connection, records: Sequence[Record], qualifier: Mapping[str, Qualifier] = {}, /, returning: Literal[False] = False) -> int: ... @classmethod @overload def update_many(cls, db: Connection, records: Sequence[Record], qualifier: Mapping[str, Qualifier] = {}, /, returning: Literal[True] = True) -> list[Self]: ... @classmethod def update_many( cls, db: Connection, records: Sequence[Record], qualifier: Mapping[str, Qualifier] = {}, /, returning: bool = False, ): """ Update records by set of primary key(s). This method invokes on `executemany` defined in DB-API 2.0. Whether it is optimized compared to `execute` depends on DB driver. 
Args: db: DB connection. record: Sequence of objects contains column values. qualifier: Functions qualifying placeholder markers. returning: Flag to return updated records with complete and correct column values. Returns: The number of affected rows or updated records. """ if len(records) == 0: return [] if returning else 0 keys = {c.name for c in cls.columns if c.pk} if len(keys) == 0: raise ValueError(f"update_many is not available because {cls} does not have primary key columns.") def classify(acc: tuple[dict[str, Any], dict[str, Any]], cv: tuple[str, Any]): if cv[0] in keys: acc[0][cv[0]] = cv[1] else: acc[1][cv[0]] = cv[1] return acc seq_of_values: list[tuple[dict[str, Any], dict[str, Any]]] = [] target_columns: Optional[set[str]] = None for vs in [model_values(cls, r, excludes_pk=False) for r in records]: if not keys < vs.keys(): raise ValueError(f"Every row must contain values of all primary keys and at least one update column value.") pks, rec = reduce(classify, vs.items(), ({}, {})) if target_columns is None: check_columns(cls, rec) target_columns = set(rec.keys()) else: check_columns(cls, rec, lambda c: c.name in target_columns, True) # type: ignore seq_of_values.append((pks, rec)) sql_first = "" seq_of_params: list[list[Any]] = [] for pks, rec in seq_of_values: cols, vals = parse_pks(cls, pks) condition = Conditional.all([Q.eq(**{c: v}) for c, v in zip(cols, vals)]) sql, _, params = cls._update_sql(rec, condition, qualifier) if not sql_first: sql_first = sql seq_of_params.append(params) if returning: db.stmt().executemany(f"{sql_first}", seq_of_params) return cls.fetch_many(db, [pks for pks, _ in seq_of_values]) else: return db.stmt().executemany(sql_first, seq_of_params).rowcount @classmethod @overload def update_where(cls, db: Connection, record: Record, condition: Conditional, qualifier: Mapping[str, Qualifier] = {}, /, returning: Literal[False] = False, allow_all: bool = False) -> int: ... 
@classmethod @overload def update_where(cls, db: Connection, record: Record, condition: Conditional, qualifier: Mapping[str, Qualifier] = {}, /, returning: Literal[True] = True, allow_all: bool = False) -> list[Self]: ... @classmethod def update_where( cls, db: Connection, record: Record, condition: Conditional, qualifier: Mapping[str, Qualifier] = {}, /, returning: bool = False, allow_all: bool = True, ): """ Update records which satisfy the condition. ```python t.update(db, dict(c2=2), Q.eq(c1=1)) # SQL: UPDATE t SET c2 = 2 WHERE c1 = 1 ``` Args: db: DB connection. record: Object contains column values. condition: Query condition. qualifier: Functions qualifying placeholder markers. returning: Flag to return updated records with complete and correct column values. allow_all: If `False`, empty condition raises `ValueError`. Returns: The number of affected rows or updated records. """ sql, _, params = cls._update_sql(record, condition, qualifier, allow_all) if returning: if cls.support_returning(db): c = db.stmt().execute(f"{sql} RETURNING *", *params) s = cls.select() return [read_row(row, *s)[0] for row in c.fetchall()] else: raise NotImplementedError(f"RETURNING is not supported and there is no way to fetch updated rows exactly.") else: c = db.stmt().execute(sql, *params) return c.rowcount @classmethod @overload def delete(cls, db: Connection, pks: PKS, /, returning: Literal[False] = False) -> bool: ... @classmethod @overload def delete(cls, db: Connection, pks: PKS, /, returning: Literal[True] = True) -> Optional[Self]: ... @classmethod def delete(cls, db: Connection, pks: PKS, /, returning: bool = False): """ Delete a record by primary key(s). ```python t.delete(db, 1) # SQL: DELETE FROM t WHERE id = 1 ``` Args: db: DB connection. pks: Primary key value(s). returning: Flag to return deleted record if any. Returns: Whether the record exists and deleted or delete record if any. 
""" cols, vals = parse_pks(cls, pks) if returning: models = cls.delete_where(db, Conditional.all([Q.eq(**{c: v}) for c, v in zip(cols, vals)]), returning=True) return models[0] if models else None else: return cls.delete_where(db, Conditional.all([Q.eq(**{c: v}) for c, v in zip(cols, vals)])) == 1 @classmethod @overload def delete_many(cls, db: Connection, pks: Union[Sequence[PKS], Sequence[Record]], /, returning: Literal[False] = False) -> int: ... @overload @classmethod def delete_many(cls, db: Connection, pks: Union[Sequence[PKS], Sequence[Record]], /, returning: Literal[True] = True) -> list[Self]: ... @classmethod def delete_many(cls, db: Connection, pks: Union[Sequence[PKS], Sequence[Record]], /, returning: bool = False): """ Delete a records by set of primary key(s). This method invokes on `executemany` defined in DB-API 2.0. Whether it is optimized compared to `execute` depends on DB driver. Args: db: DB connection. pks: Primary keys or objects each of which contains values of all primary keys. returning: Flag to return deleted records. Returns: The number of affected rows or deleted records. 
""" if len(pks) == 0: return None seq_of_pks: list[dict[str, Any]] = [] for rec in pks: if isinstance(rec, (dict, cls)): seq_of_pks.append(extract_pks(cls, rec)) else: cols, vals = parse_pks(cls, rec) seq_of_pks.append(dict(zip(cols, vals))) condition = Conditional.all([Q.eq(**{c: v}) for c, v in seq_of_pks[0].items()]) wc, wp = where(condition) sql = f"DELETE FROM {cls.name}{_spacer(wc)}" seq_of_params: list[list[Any]] = [wp] for v in seq_of_pks[1:]: condition = Conditional.all([Q.eq(**{c: v}) for c, v in v.items()]) _, wp = where(condition) seq_of_params.append(wp) if returning: models = cls.fetch_many(db, seq_of_pks) db.stmt().executemany(sql, seq_of_params) return models else: return db.stmt().executemany(sql, seq_of_params).rowcount @classmethod @overload def delete_where(cls, db: Connection, condition: Conditional, /, returning: Literal[False] = False, allow_all: bool = True) -> int: ... @classmethod @overload def delete_where(cls, db: Connection, condition: Conditional, /, returning: Literal[True] = True, allow_all: bool = True) -> list[Self]: ... @classmethod def delete_where(cls, db: Connection, condition: Conditional, /, returning: bool = False, allow_all: bool = True): """ Delete records which satisfy the condition. ```python t.delete(db, Q.eq(c1=1)) # SQL: DELETE FROM t WHERE c1 = 1 ``` Args: db: DB connection. condition: Query condition. returning: Flag to return deleted records. allow_all: If `False`, empty condition raises `ValueError`. Returns: The number of affected rows or deleted records. 
""" wc, wp = where(condition) if wc == "" and not allow_all: raise ValueError("Delete query to delete all records is not allowed.") sql = f"DELETE FROM {cls.name}{_spacer(wc)}" if returning: if cls.support_returning(db): c = db.stmt().execute(f"{sql} RETURNING *", *wp) return [read_row(row, *cls.select())[0] for row in c.fetchall()] else: current = cls.fetch_where(db, condition) c = db.stmt().execute(sql, *wp) return current else: return db.stmt().execute(sql, *wp).rowcount @classmethod def last_sequences(cls, db: Connection, num: int) -> list[tuple[Column, int]]: """ Returns the sequential (auto incremental) values of a table generated by the latest insertion. Result contains every sequential columns and their values. When the latest query inserts multiple rows, only the last (= biggest) value is returned. This method should be overridden by another mixin class defined in dialect module. Args: db: DB connection. num: The number of records inserted by the latest query. Returns: List of pairs of column and its values. """ return [] @classmethod def support_returning(cls, db: Connection) -> bool: """ Checks whehter this DBMS support **RETURNING** clause or not. Args: db: DB connection. Returns: Whehter this DBMS support **RETURNING** clause or not. """ return FalseAncestorsStatic methods- def count(db: Connection, condition: Conditional = Condition: '' -- []) ‑> int
- 
Count rows which satisfy the condition. `t.count(db, Q.eq(c1=1)) # SQL: SELECT COUNT(*) FROM t WHERE c1 = 1`
Args
- db
- DB connection.
- condition
- Query condition.
 @classmethod
 def count(cls, db: Connection, condition: Conditional = Q.of()) -> int:
     """
     Count the rows which satisfy the condition.

     ```python
     t.count(db, Q.eq(c1=1)) # SQL: SELECT COUNT(*) FROM t WHERE c1 = 1
     ```

     Args:
         db: DB connection.
         condition: Query condition.
     Returns:
         The number of rows.
     """
     where_clause, where_params = where(condition)
     stmt = db.stmt()
     sql = f"SELECT COUNT(*) FROM {cls.name}{_spacer(where_clause)}"
     cursor = stmt.execute(sql, *where_params)
     # COUNT(*) always yields exactly one row holding a single column.
     first_row = cursor.fetchone()
     return first_row[0] # type: ignore
- def delete(db: Connection, pks: Union[Any, dict[str, Any]], /, returning: bool = False)
- 
Delete a record by primary key(s). `t.delete(db, 1) # SQL: DELETE FROM t WHERE id = 1`
Args
- db
- DB connection.
- pks
- Primary key value(s).
- returning
- Flag to return deleted record if any.
 @classmethod
 def delete(cls, db: Connection, pks: PKS, /, returning: bool = False):
     """
     Delete a record by primary key(s).

     ```python
     t.delete(db, 1) # SQL: DELETE FROM t WHERE id = 1
     ```

     Args:
         db: DB connection.
         pks: Primary key value(s).
         returning: Flag to return the deleted record if any.
     Returns:
         When `returning` is `False`, whether exactly one row was deleted.
         When `returning` is `True`, the deleted record, or `None` if no row matched.
     """
     pk_columns, pk_values = parse_pks(cls, pks)
     by_primary_key = Conditional.all(
         [Q.eq(**{name: value}) for name, value in zip(pk_columns, pk_values)]
     )

     if not returning:
         return cls.delete_where(db, by_primary_key) == 1

     deleted = cls.delete_where(db, by_primary_key, returning=True)
     return deleted[0] if deleted else None
- def delete_many(db: Connection, pks: Union[collections.abc.Sequence[Union[Any, dict[str, Any]]], collections.abc.Sequence[Union[Meta, dict[str, Any]]]], /, returning: bool = False)
- 
Delete records by a set of primary key(s). This method invokes `executemany` defined in DB-API 2.0. Whether it is optimized compared to `execute` depends on the DB driver.
Args
- db
- DB connection.
- pks
- Primary keys or objects each of which contains values of all primary keys.
- returning
- Flag to return deleted records.
 @classmethod
 def delete_many(cls, db: Connection, pks: Union[Sequence[PKS], Sequence[Record]], /, returning: bool = False):
     """
     Delete records by a set of primary key(s).

     This method invokes `executemany` defined in DB-API 2.0.
     Whether it is optimized compared to `execute` depends on the DB driver.

     Args:
         db: DB connection.
         pks: Primary keys or objects each of which contains values of all primary keys.
         returning: Flag to return deleted records.
     Returns:
         The number of affected rows, or the deleted records when `returning` is `True`.
         For an empty `pks`, `0` or an empty list respectively.
     """
     if len(pks) == 0:
         # Nothing to delete; keep the return type aligned with the non-empty case
         # instead of leaking `None` to callers expecting an int or a list.
         return [] if returning else 0

     # Normalize every item into a {primary key column: value} mapping.
     seq_of_pks: list[dict[str, Any]] = []
     for rec in pks:
         if isinstance(rec, (dict, cls)):
             seq_of_pks.append(extract_pks(cls, rec))
         else:
             cols, vals = parse_pks(cls, rec)
             seq_of_pks.append(dict(zip(cols, vals)))

     # The statement text is taken from the first key only; every following key
     # contributes just its parameters to the executemany() call.
     sql = ""
     seq_of_params: list[list[Any]] = []
     for key_values in seq_of_pks:
         condition = Conditional.all([Q.eq(**{c: v}) for c, v in key_values.items()])
         wc, wp = where(condition)
         if not sql:
             sql = f"DELETE FROM {cls.name}{_spacer(wc)}"
         seq_of_params.append(wp)

     if returning:
         # Rows are read before they are deleted because executemany() cannot return them.
         models = cls.fetch_many(db, seq_of_pks)
         db.stmt().executemany(sql, seq_of_params)
         return models
     else:
         return db.stmt().executemany(sql, seq_of_params).rowcount
- def delete_where(db: Connection, condition: Conditional, /, returning: bool = False, allow_all: bool = True)
- 
Delete records which satisfy the condition. `t.delete_where(db, Q.eq(c1=1)) # SQL: DELETE FROM t WHERE c1 = 1`
Args
- db
- DB connection.
- condition
- Query condition.
- returning
- Flag to return deleted records.
- allow_all
- If False, empty condition raises ValueError.
 @classmethod
 def delete_where(cls, db: Connection, condition: Conditional, /, returning: bool = False, allow_all: bool = True):
     """
     Delete records which satisfy the condition.

     ```python
     t.delete_where(db, Q.eq(c1=1)) # SQL: DELETE FROM t WHERE c1 = 1
     ```

     Args:
         db: DB connection.
         condition: Query condition.
         returning: Flag to return deleted records.
         allow_all: If `False`, empty condition raises `ValueError`.
     Returns:
         The number of affected rows or deleted records.
     Raises:
         ValueError: The condition is empty while `allow_all` is `False`.
     """
     wc, wp = where(condition)
     if wc == "" and not allow_all:
         raise ValueError("Delete query to delete all records is not allowed.")

     sql = f"DELETE FROM {cls.name}{_spacer(wc)}"

     if not returning:
         return db.stmt().execute(sql, *wp).rowcount

     if cls.support_returning(db):
         c = db.stmt().execute(f"{sql} RETURNING *", *wp)
         # The selection is the same for every row, so build it once.
         s = cls.select()
         return [read_row(row, *s)[0] for row in c.fetchall()]

     # Without RETURNING support, the matching rows are read first and then deleted;
     # the returned models are the rows as they were at the time of that read.
     current = cls.fetch_where(db, condition)
     db.stmt().execute(sql, *wp)
     return current
- def fetch(db: Connection, pks: Union[Any, dict[str, Any]], lock: Optional[Any] = None) ‑> Optional[typing_extensions.Self]
- 
Fetch a record by primary key(s). `t.fetch(db, 1) # SQL: SELECT * FROM t WHERE id = 1`
Args
- db
- DB connection.
- pks
- Primary key value(s).
- lock
- Locking statement.
 @classmethod
 def fetch(cls, db: Connection, pks: PKS, lock: Optional[Any] = None) -> Optional[Self]:
     """
     Fetch a record by primary key(s).

     ```python
     t.fetch(db, 1) # SQL: SELECT * FROM t WHERE id = 1
     ```

     Args:
         db: DB connection.
         pks: Primary key value(s).
         lock: Locking statement, appended to the end of the query.
     Returns:
         A model object if exists, otherwise `None`.
     """
     pk_columns, pk_values = parse_pks(cls, pks)
     equalities = [Q.eq(**{name: value}) for name, value in zip(pk_columns, pk_values)]
     where_clause, where_params = where(Conditional.all(equalities))

     selection = cls.select()
     stmt = db.stmt()
     cursor = stmt.execute(
         f"SELECT {selection} FROM {cls.name}{_spacer(where_clause)}{_spacer(lock)}",
         *where_params,
     )

     row = cursor.fetchone()
     if not row:
         return None
     return read_row(row, *selection)[0]
- def fetch_many(db: Connection, seq_pks: collections.abc.Sequence[typing.Union[typing.Any, dict[str, typing.Any]]], lock: Optional[Any] = None, /, per_page: int = 1000) ‑> list[typing_extensions.Self]
- 
Fetch records by a sequence of primary key(s). This method simply concatenates equality conditions on primary key by OR operator. `t.fetch_many(db, [1, 2, 3]) # SQL: SELECT * FROM t WHERE id = 1 OR id = 2 OR id = 3`
Args
- db
- DB connection.
- seq_pks
- Sequence of primary key value.
- lock
- Locking statement.
- per_page
- Maximum number of keys for an execution of query.
 @classmethod
 def fetch_many(cls, db: Connection, seq_pks: Sequence[PKS], lock: Optional[Any] = None, /, per_page: int = 1000) -> list[Self]:
     """
     Fetch records by a sequence of primary key(s).

     This method simply concatenates equality conditions on primary key by OR operator.
     Keys are processed in pages of `per_page` keys, one query per page.

     ```python
     t.fetch_many(db, [1, 2, 3]) # SQL: SELECT * FROM t WHERE id = 1 OR id = 2 OR id = 3
     ```

     Args:
         db: DB connection.
         seq_pks: Sequence of primary key value.
         lock: Locking statement, appended to the end of each query.
         per_page: Maximum number of keys for an execution of query. Must be positive.
     Returns:
         Model objects in the same order as passed sequence. Keys without a matching row are skipped.
     Raises:
         ValueError: `per_page` is less than 1.
     """
     if per_page < 1:
         # A non-positive page size would never advance through the keys.
         raise ValueError(f"per_page must be a positive integer but {per_page} was given.")

     # The selection does not depend on the page, so build it once.
     s = cls.select()

     res = []
     for start in range(0, len(seq_pks), per_page):
         ordered_pks = []
         cond = Q.of()

         for pks in seq_pks[start:start + per_page]:
             cols, vals = parse_pks(cls, pks)
             ordered_pks.append(tuple(vals))
             cond |= Conditional.all([Q.eq(**{c: v}) for c, v in zip(cols, vals)])

         wc, wp = where(cond)
         cursor = db.stmt().execute(f"SELECT {s} FROM {cls.name}{_spacer(wc)}{_spacer(lock)}", *wp)

         # Index fetched records by their primary key values to restore the requested order,
         # since the query itself specifies no ordering.
         record_map = {}
         for row in cursor.fetchall():
             r = read_row(row, *s)[0]
             pk_values = {col.name: v for col, v in r if col.pk}
             key = tuple(v for _, v in check_columns(cls, pk_values, lambda col: col.pk, True))
             record_map[key] = r

         res.extend(record_map[k] for k in ordered_pks if k in record_map)

     return res
- def fetch_one(db: Connection, condition: Conditional = Condition: '' -- [], lock: Optional[Any] = None) ‑> Optional[typing_extensions.Self]
- 
Fetch a record which satisfies the condition. `ValueError` is raised when multiple records are found. Use this method for queries which certainly return a single row, such as a search by unique key. `t.fetch_one(db, Q.eq(c1=1)) # SQL: SELECT * FROM t WHERE c1 = 1`
Args
- db
- DB connection.
- condition
- Query condition.
- lock
- Locking statement.
 @classmethod
 def fetch_one(
     cls,
     db: Connection,
     condition: Conditional = Q.of(),
     lock: Optional[Any] = None,
 ) -> Optional[Self]:
     """
     Fetch a record which satisfies the condition.

     `ValueError` is raised when multiple records are found.
     Use this method for queries which certainly return a single row, such as a search by unique key.

     ```python
     t.fetch_one(db, Q.eq(c1=1)) # SQL: SELECT * FROM t WHERE c1 = 1
     ```

     Args:
         db: DB connection.
         condition: Query condition.
         lock: Locking statement.
     Returns:
         A model object if exists, otherwise `None`.
     Raises:
         ValueError: More than one record satisfies the condition.
     """
     rs = cls.fetch_where(db, condition, lock=lock)

     # Ambiguous result: the caller promised at most one matching row.
     if len(rs) > 1:
         raise ValueError(f"{len(rs)} records are found on the invocation of fetch_one().")

     return rs[0] if rs else None
- def fetch_where(db: Connection, condition: Conditional = Condition: '' -- [], orders: collections.abc.Mapping[typing.Union[str, AliasedColumn], typing.Union[bool, tuple[bool, bool], str]] = {}, limit: Optional[int] = None, offset: Optional[int] = None, lock: Optional[Any] = None) ‑> list[typing_extensions.Self]
- 
Fetch records which satisfy the condition. `t.fetch_where(db, Q.eq(c1=1), dict(c2=True), 10, 5) # SQL: SELECT * FROM t WHERE c1 = 1 ORDER BY c2 ASC LIMIT 10 OFFSET 5`
Args
- db
- DB connection.
- condition
- Query condition.
- orders
- Ordering specification where key is column name and value denotes whether the order is ascending or not.
- limit
- Maximum number of rows to fetch. If None, all rows are returned.
- offset
- The number of rows to skip.
- lock
- Locking statement.
 @classmethod
 def fetch_where(
     cls,
     db: Connection,
     condition: Conditional = Q.of(),
     orders: Mapping[Union[str, AliasedColumn], ORDER] = {},
     limit: Optional[int] = None,
     offset: Optional[int] = None,
     lock: Optional[Any] = None,
 ) -> list[Self]:
     """
     Fetch records which satisfy the condition.

     ```python
     t.fetch_where(db, Q.eq(c1=1), dict(c2=True), 10, 5)
     # SQL: SELECT * FROM t WHERE c1 = 1 ORDER BY c2 ASC LIMIT 10 OFFSET 5
     ```

     Args:
         db: DB connection.
         condition: Query condition.
         orders: Ordering specification where key is column name and value denotes whether the order is ascending or not.
         limit: Maximum number of rows to fetch. If `None`, all rows are returned.
         offset: The number of rows to skip.
         lock: Locking statement, appended to the end of the query.
     Returns:
         Model objects.
     """
     where_clause, where_params = where(condition)
     range_clause, range_params = ranged_by(limit, offset)
     selection = cls.select()

     stmt = db.stmt()
     # Clause order: WHERE, ORDER BY, LIMIT/OFFSET, then the lock postfix.
     sql = (
         f"SELECT {selection} FROM {cls.name}"
         f"{_spacer(where_clause)}{_spacer(order_by(orders))}{_spacer(range_clause)}{_spacer(lock)}"
     )
     cursor = stmt.execute(sql, *(where_params + range_params))

     models = []
     for row in cursor.fetchall():
         models.append(read_row(row, *selection)[0])
     return models
- def insert(db: Connection, record: Union[typing_extensions.Self, dict[str, Any]], qualifier: collections.abc.Mapping[str, collections.abc.Callable[[str], str]] = {}, /, returning: bool = False) ‑> typing_extensions.Self
- 
Insert a record. If `returning` is `True` and the DBMS supports the RETURNING clause, the returned model object contains complete and correct column values. Otherwise, the auto incremental value is set to the returned model object but other column values generated inside the DBMS, such as default values, are not set. `t.insert(db, dict(c1=1, c2=2)) # SQL: INSERT INTO t (c1, c2) VALUES (1, 2)`
Args
- db
- DB connection.
- record
- Object contains column values.
- qualifier
- Functions qualifying placeholder markers.
- returning
- Flag to return inserted record with complete and correct column values.
 @classmethod
 def insert(
     cls,
     db: Connection,
     record: Union[Self, dict[str, Any]],
     qualifier: Mapping[str, Qualifier] = {},
     /,
     returning: bool = False,
 ) -> Self:
     """
     Insert a record.

     If `returning` is `True` and the DBMS supports **RETURNING** clause, the returned model object contains
     complete and correct column values. Otherwise, the auto incremental value is set to the returned model
     object but other column values generated inside the DBMS, such as default values, are not set.

     ```python
     t.insert(db, dict(c1=1, c2=2)) # SQL: INSERT INTO t (c1, c2) VALUES (1, 2)
     ```

     Args:
         db: DB connection.
         record: Object contains column values.
         qualifier: Functions qualifying placeholder markers.
         returning: Flag to return inserted record with complete and correct column values.
     Returns:
         Model of inserted record.
     """
     model: Self = cast(Self, record) if isinstance(record, cls) else cls(**cast(dict, record))

     # Build the statement from the model created above so that a dict record
     # is not converted into a model a second time.
     sql, _, vals = cls._insert_sql(model, qualifier)

     if returning and cls.support_returning(db):
         c = db.stmt().execute(f"{sql} RETURNING *", *vals)
         s = cls.select()
         return read_row(c.fetchone(), *s)[0]

     # REVIEW
     # When RETURNING is requested but not supported, the plain insertion below is used:
     # the inserted row can't be specified from the table where no primary keys are defined.
     db.stmt().execute(sql, *vals)

     # Reflect values generated by sequences (auto increment) into the model.
     for c, v in cls.last_sequences(db, 1):
         setattr(model, c.name, v)

     return model
- def insert_many(db: Connection, records: list[typing.Union[typing_extensions.Self, dict[str, typing.Any]]], qualifier: collections.abc.Mapping[str, collections.abc.Callable[[str], str]] = {}, /, returning: bool = False)
- 
Insert records. If returning is True and the DBMS supports the RETURNING clause, the returned model objects contain complete and correct column values. Otherwise, the auto-incremental value is set on the returned model objects, but other column values generated inside the DBMS, such as default values, are not set. Args- db
- DB connection.
- records
- Objects containing column values.
- qualifier
- Functions qualifying placeholder markers.
- returning
- Flag to return inserted records with complete and correct column values.
 ReturnsModels of inserted records or cursor. Expand source code@classmethod def insert_many( cls, db: Connection, records: list[Union[Self, dict[str, Any]]], qualifier: Mapping[str, Qualifier] = {}, /, returning: bool = False, ): """ Insert records. If `returning` is `True` and the DBMS supports **RETURNING** clause, returned model object contains comple and correct column values. Otherwise, auto incremental value is set to the returned model object but other column values generated inside DBMS such as default value are not set. Args: db: DB connection. record: Object contains column values. qualifier: Functions qualifying placeholder markers. returning: Flag to return inserted records with complete and correct column values. Returns: Models of inserted records or cursor. """ if len(records) == 0: return [] models: list[Self] = [cast(Self, r) if isinstance(r, cls) else cls(**cast(dict, r)) for r in records] seq_of_params = [] sql, cols, params = cls._insert_sql(models[0], qualifier) cols = set(cols) seq_of_params.append(params) for m in models[1:]: value_dict = model_values(cls, m) check_columns(cls, value_dict, lambda c: c.name in cols, requires_all=True) # REVIEW: # The consistency among columns where expression is set is not checked. _, _, params = cls._insert_sql(m, qualifier) seq_of_params.append(params) db.stmt().executemany(sql, seq_of_params) num = len(records) for c, v in cls.last_sequences(db, num): for i, m in enumerate(models): setattr(m, c.name, v - (num - i - 1)) if returning: seq_pks = [extract_pks(cls, m) for m in models] return cls.fetch_many(db, seq_pks) else: return models
- def last_sequences(db: Connection, num: int) ‑> list[tuple[Column, int]]
- 
Returns the sequential (auto-incremental) values of a table generated by the latest insertion. The result contains every sequential column and its value. When the latest query inserts multiple rows, only the last (= biggest) value is returned. This method should be overridden by another mixin class defined in a dialect module. Args- db
- DB connection.
- num
- The number of records inserted by the latest query.
 ReturnsList of pairs of column and its values. Expand source code@classmethod def last_sequences(cls, db: Connection, num: int) -> list[tuple[Column, int]]: """ Returns the sequential (auto incremental) values of a table generated by the latest insertion. Result contains every sequential columns and their values. When the latest query inserts multiple rows, only the last (= biggest) value is returned. This method should be overridden by another mixin class defined in dialect module. Args: db: DB connection. num: The number of records inserted by the latest query. Returns: List of pairs of column and its values. """ return []
- def support_returning(db: Connection) ‑> bool
- 
Checks whether this DBMS supports the RETURNING clause or not. Args- db
- DB connection.
 ReturnsWhehter this DBMS support RETURNING clause or not. Expand source code@classmethod def support_returning(cls, db: Connection) -> bool: """ Checks whehter this DBMS support **RETURNING** clause or not. Args: db: DB connection. Returns: Whehter this DBMS support **RETURNING** clause or not. """ return False
- def update(db: Connection, pks: Union[Any, dict[str, Any]], record: Union[Meta, dict[str, Any]], qualifier: collections.abc.Mapping[str, collections.abc.Callable[[str], str]] = {}, /, returning: bool = False)
- 
Update a record by primary key(s). This method only updates columns which are found in record, except for primary key(s). t.update(db, 1, dict(c1=1, c2=2)) # SQL: UPDATE t SET c1 = 1, c2 = 2 WHERE id = 1 Args- db
- DB connection.
- pks
- Primary key value(s).
- record
- Object contains column values.
- qualifier
- Functions qualifying placeholder markers.
- returning
- Flag to return updated records with complete and correct column values.
 ReturnsWhether the record exists and updated or updated record model. Expand source code@classmethod def update( cls, db: Connection, pks: PKS, record: Record, qualifier: Mapping[str, Qualifier] = {}, /, returning: bool = False, ): """ Update a record by primary key(s). This method only updates columns which are found in `record` except for primary key(s). ```python t.update(db, 1, dict(c1=1, c2=2)) # SQL: UPDATE t SET c1 = 1, c2 = 2 WHERE id = 1 ``` Args: db: DB connection. pks: Primary key value(s). record: Object contains column values. qualifier: Functions qualifying placeholder markers. returning: Flag to return updated records with complete and correct column values. Returns: Whether the record exists and updated or updated record model. """ cols, vals = parse_pks(cls, pks) condition = Conditional.all([Q.eq(**{c: v}) for c, v in zip(cols, vals)]) if returning: if cls.support_returning(db): models = cls.update_where(db, record, condition, qualifier, returning=True) return models[0] if models else None else: models = cls.update_where(db, record, condition, qualifier, returning=False) return cls.fetch(db, pks) else: return cls.update_where(db, record, condition, qualifier, returning=False) == 1
- def update_many(db: Connection, records: collections.abc.Sequence[typing.Union[Meta, dict[str, typing.Any]]], qualifier: collections.abc.Mapping[str, collections.abc.Callable[[str], str]] = {}, /, returning: bool = False)
- 
Update records by a set of primary key(s). This method invokes executemany defined in DB-API 2.0. Whether it is optimized compared to execute depends on the DB driver. Args- db
- DB connection.
- records
- Sequence of objects containing column values.
- qualifier
- Functions qualifying placeholder markers.
- returning
- Flag to return updated records with complete and correct column values.
 ReturnsThe number of affected rows or updated records. Expand source code@classmethod def update_many( cls, db: Connection, records: Sequence[Record], qualifier: Mapping[str, Qualifier] = {}, /, returning: bool = False, ): """ Update records by set of primary key(s). This method invokes on `executemany` defined in DB-API 2.0. Whether it is optimized compared to `execute` depends on DB driver. Args: db: DB connection. record: Sequence of objects contains column values. qualifier: Functions qualifying placeholder markers. returning: Flag to return updated records with complete and correct column values. Returns: The number of affected rows or updated records. """ if len(records) == 0: return [] if returning else 0 keys = {c.name for c in cls.columns if c.pk} if len(keys) == 0: raise ValueError(f"update_many is not available because {cls} does not have primary key columns.") def classify(acc: tuple[dict[str, Any], dict[str, Any]], cv: tuple[str, Any]): if cv[0] in keys: acc[0][cv[0]] = cv[1] else: acc[1][cv[0]] = cv[1] return acc seq_of_values: list[tuple[dict[str, Any], dict[str, Any]]] = [] target_columns: Optional[set[str]] = None for vs in [model_values(cls, r, excludes_pk=False) for r in records]: if not keys < vs.keys(): raise ValueError(f"Every row must contain values of all primary keys and at least one update column value.") pks, rec = reduce(classify, vs.items(), ({}, {})) if target_columns is None: check_columns(cls, rec) target_columns = set(rec.keys()) else: check_columns(cls, rec, lambda c: c.name in target_columns, True) # type: ignore seq_of_values.append((pks, rec)) sql_first = "" seq_of_params: list[list[Any]] = [] for pks, rec in seq_of_values: cols, vals = parse_pks(cls, pks) condition = Conditional.all([Q.eq(**{c: v}) for c, v in zip(cols, vals)]) sql, _, params = cls._update_sql(rec, condition, qualifier) if not sql_first: sql_first = sql seq_of_params.append(params) if returning: db.stmt().executemany(f"{sql_first}", seq_of_params) return 
cls.fetch_many(db, [pks for pks, _ in seq_of_values]) else: return db.stmt().executemany(sql_first, seq_of_params).rowcount
- def update_where(db: Connection, record: Union[Meta, dict[str, Any]], condition: Conditional, qualifier: collections.abc.Mapping[str, collections.abc.Callable[[str], str]] = {}, /, returning: bool = False, allow_all: bool = True)
- 
Update records which satisfy the condition. t.update_where(db, dict(c2=2), Q.eq(c1=1)) # SQL: UPDATE t SET c2 = 2 WHERE c1 = 1 Args- db
- DB connection.
- record
- Object contains column values.
- condition
- Query condition.
- qualifier
- Functions qualifying placeholder markers.
- returning
- Flag to return updated records with complete and correct column values.
- allow_all
- If False, an empty condition raises ValueError.
 ReturnsThe number of affected rows or updated records. Expand source code@classmethod def update_where( cls, db: Connection, record: Record, condition: Conditional, qualifier: Mapping[str, Qualifier] = {}, /, returning: bool = False, allow_all: bool = True, ): """ Update records which satisfy the condition. ```python t.update(db, dict(c2=2), Q.eq(c1=1)) # SQL: UPDATE t SET c2 = 2 WHERE c1 = 1 ``` Args: db: DB connection. record: Object contains column values. condition: Query condition. qualifier: Functions qualifying placeholder markers. returning: Flag to return updated records with complete and correct column values. allow_all: If `False`, empty condition raises `ValueError`. Returns: The number of affected rows or updated records. """ sql, _, params = cls._update_sql(record, condition, qualifier, allow_all) if returning: if cls.support_returning(db): c = db.stmt().execute(f"{sql} RETURNING *", *params) s = cls.select() return [read_row(row, *s)[0] for row in c.fetchall()] else: raise NotImplementedError(f"RETURNING is not supported and there is no way to fetch updated rows exactly.") else: c = db.stmt().execute(sql, *params) return c.rowcount
 Inherited members
- class Column (name: str, ptype: type, type_info: Optional[Any], pk: bool, fk: Optional[Relations], incremental: Optional[Any], nullable: bool, comment: str = '')
- 
This class represents a schema of a column. Expand source codeclass Column: """ This class represents a schema of a column. """ def __init__( self, name: str, ptype: type, type_info: Optional[Any], pk: bool, fk: Optional[Relations], incremental: Optional[Any], nullable: bool, comment: str = "", ): #: Column name. self.name = name #: Data type in python. self.ptype = ptype #: Type informations obtained from DB. self.type_info = type_info #: Is this column a primary key? self.pk = pk #: Foreign key constraints. self.fk = fk #: If this column is auto-incremental, this object contains the information of the feature, otherwise, `None`. self.incremental = incremental #: Can this column contain null? self.nullable = nullable #: Comment of the column. self.comment = commentInstance variables- var comment
- 
Comment of the column. 
- var fk
- 
Foreign key constraints. 
- var incremental
- 
If this column is auto-incremental, this object contains the information of the feature, otherwise, None.
- var name
- 
Column name. 
- var nullable
- 
Can this column contain null? 
- var pk
- 
Is this column a primary key? 
- var ptype
- 
Data type in python. 
- var type_info
- 
Type informations obtained from DB. 
 
- class Conditional (expression='', params=None)
- 
Represents a query condition composed of an expression and parameters. Parameters must be a list where the index of each parameter matches the index of placeholder for it. The expression accepts only the unified marker $_.Applying logical operators such as &,|and~generates new condition.>>> c1 = Q.of("a = $_", 0) >>> c2 = Q.of("b < $_", 1) >>> c3 = Q.of("c > $_", 2) >>> c = ~(c1 & c2 | c3) >>> c Condition: NOT (((a = $_) AND (b < $_)) OR (c > $_)) -- [0, 1, 2]Expand source codeclass Conditional(Expression): """ Represents a query condition composed of an expression and parameters. Parameters must be a list where the index of each parameter matches the index of placeholder for it. The expression accepts only the unified marker `$_`. Applying logical operators such as `&`, `|` and `~` generates new condition. ```python >>> c1 = Q.of("a = $_", 0) >>> c2 = Q.of("b < $_", 1) >>> c3 = Q.of("c > $_", 2) >>> c = ~(c1 & c2 | c3) >>> c Condition: NOT (((a = $_) AND (b < $_)) OR (c > $_)) -- [0, 1, 2] ``` """ @classmethod def all(cls, conditionals: Sequence['Conditional']) -> 'Conditional': """ Concatenates condition objects with `AND`. Args: conditionals: Condition objects. Returns: Concatenated condition object. """ return reduce(lambda acc, c: acc & c, conditionals, Conditional()) @classmethod def any(cls, conditionals: Sequence['Conditional']) -> 'Conditional': """ Concatenates condition objects with `OR`. Args: conditionals: Condition objects. Returns: Concatenated condition object. 
""" if len(conditionals) == 0: return Conditional("1 = 0") return reduce(lambda acc, c: acc | c, conditionals, Conditional()) def __init__(self, expression="", params=None): super().__init__(expression, params or []) def __repr__(self): return f"Condition: '{self.expression}' -- {self.params}" def __and__(self, other) -> 'Conditional': expression = "" if self.expression and other.expression: expression = f"({self.expression}) AND ({other.expression})" elif self.expression: expression = self.expression elif other.expression: expression = other.expression return Conditional(expression, self.params + other.params) def __or__(self, other) -> 'Conditional': expression = "" if self.expression and other.expression: expression = f"({self.expression}) OR ({other.expression})" elif self.expression: expression = self.expression elif other.expression: expression = other.expression return Conditional(expression, self.params + other.params) def __invert__(self) -> 'Conditional': if self.expression: return Conditional(f"NOT ({self.expression})", self.params) else: return Conditional(f"1 = 0", [])AncestorsStatic methods- def all(conditionals: collections.abc.Sequence['Conditional']) ‑> Conditional
- 
Concatenates condition objects with AND. Args- conditionals
- Condition objects.
 ReturnsConcatenated condition object. Expand source code@classmethod def all(cls, conditionals: Sequence['Conditional']) -> 'Conditional': """ Concatenates condition objects with `AND`. Args: conditionals: Condition objects. Returns: Concatenated condition object. """ return reduce(lambda acc, c: acc & c, conditionals, Conditional())
- def any(conditionals: collections.abc.Sequence['Conditional']) ‑> Conditional
- 
Concatenates condition objects with OR. Args- conditionals
- Condition objects.
 ReturnsConcatenated condition object. Expand source code@classmethod def any(cls, conditionals: Sequence['Conditional']) -> 'Conditional': """ Concatenates condition objects with `OR`. Args: conditionals: Condition objects. Returns: Concatenated condition object. """ if len(conditionals) == 0: return Conditional("1 = 0") return reduce(lambda acc, c: acc | c, conditionals, Conditional())
 Inherited members
- class Connection (api, conn: Connection, context_factory: Optional[Callable[[], ConnectionContext]] = None)
- 
Wrapper class of DB-API 2.0 Connection. Every instance works as the proxy object to original connection, therefore any attribute in it is still available. Expand source codeclass Connection(dbapi.Connection): """ Wrapper class of DB-API 2.0 Connection. Every instance works as the proxy object to original connection, therefore any attribute in it is still available. """ _characters = string.ascii_letters + string.digits + ".=" def __init__(self, api, conn: dbapi.Connection, context_factory: Optional[Callable[[], ConnectionContext]] = None): #: A string which identifies a connection. self.identifier = self._gen_identifier() #: DB-API 2.0 module. self.api = api #: Original connection object. self.conn = conn self._context_factory = context_factory self._context = None def __getattr__(self, name): return getattr(self.conn, name) def __enter__(self): if hasattr(self.conn, "__enter__"): self.conn.__enter__() # type: ignore return self def __exit__(self, exc_type, exc_value, traceback): if hasattr(self.conn, "__exit__"): self.conn.__exit__(exc_type, exc_value, traceback) # type: ignore else: if exc_value is None: self.conn.rollback() else: self.conn.commit() self.conn.close() def _gen_identifier(self): return threading.current_thread().name + "-" + secrets.token_hex(4) @property def context(self) -> ConnectionContext: """ Context object used for this connection. """ if not self._context: self._context = (self._context_factory or ConnectionContext)() self._context.identifier = self.identifier return self._context def close(self) -> None: return self.conn.close() def commit(self) -> None: return self.conn.commit() def rollback(self) -> None: return self.conn.rollback() def cursor(self) -> dbapi.Cursor: return self.conn.cursor() def use(self, factory: Callable[[], ConnectionContext]) -> Self: """ Sets factory function of `ConnectionContext` to use custom context. When the context is already set, it will be replaced with new one. 
Args: factory: Function returning custom context. Returns: This instance. """ self._context_factory = factory self._context = None return self def stmt(self, context: Optional[ConnectionContext] = None) -> 'Statement': """ Creates new `Statement` which executes queries on this connection. Args: context: Context object used in the statement. If `None`, the context of this connection is used. Returns: Created statement. """ return Statement(self, context or self.context)Ancestors- Connection
- typing.Protocol
- typing.Generic
 Instance variables- var api
- 
DB-API 2.0 module. 
- var conn
- 
Original connection object. 
- var context : ConnectionContext
- 
Context object used for this connection. Expand source code@property def context(self) -> ConnectionContext: """ Context object used for this connection. """ if not self._context: self._context = (self._context_factory or ConnectionContext)() self._context.identifier = self.identifier return self._context
- var identifier
- 
A string which identifies a connection. 
 Methods- def close(self) ‑> None
- 
Expand source codedef close(self) -> None: return self.conn.close()
- def commit(self) ‑> None
- 
Expand source codedef commit(self) -> None: return self.conn.commit()
- def cursor(self) ‑> Cursor
- 
Expand source codedef cursor(self) -> dbapi.Cursor: return self.conn.cursor()
- def rollback(self) ‑> None
- 
Expand source codedef rollback(self) -> None: return self.conn.rollback()
- def stmt(self, context: Optional[ConnectionContext] = None) ‑> Statement
- 
Creates a new Statement which executes queries on this connection. Args- context
- Context object used in the statement. If None, the context of this connection is used.
 ReturnsCreated statement. Expand source codedef stmt(self, context: Optional[ConnectionContext] = None) -> 'Statement': """ Creates new `Statement` which executes queries on this connection. Args: context: Context object used in the statement. If `None`, the context of this connection is used. Returns: Created statement. """ return Statement(self, context or self.context)
- def use(self, factory: Callable[[], ConnectionContext]) ‑> typing_extensions.Self
- 
Sets the factory function of ConnectionContext to use a custom context. When the context is already set, it will be replaced with a new one. Args- factory
- Function returning custom context.
 ReturnsThis instance. Expand source codedef use(self, factory: Callable[[], ConnectionContext]) -> Self: """ Sets factory function of `ConnectionContext` to use custom context. When the context is already set, it will be replaced with new one. Args: factory: Function returning custom context. Returns: This instance. """ self._context_factory = factory self._context = None return self
 
- class ConnectionContext (identifier: Optional[str] = None, **configurations)
- 
This class represents a context of query execution. By default, the context based on global configuration is used. You should declare your own context class and set it to Connection.use()if you want to change the behavior.Custom context is also useful to change cursor state before and after query execution. Overwrite executemethod to do it.Expand source codeclass ConnectionContext: """ This class represents a context of query execution. By default, the context based on global configuration is used. You should declare your own context class and set it to `Connection.use` if you want to change the behavior. Custom context is also useful to change cursor state before and after query execution. Overwrite `execute` method to do it. """ def __init__(self, identifier: Optional[str] = None, **configurations): #: Identifier of this context. `None` by default. self.identifier = identifier #: Configuration used in this context. self.config = default_config().derive(**configurations) def _message(self, message): return f"({self.identifier}) {message}" if self.identifier else message def configure(self, **configurations: Any) -> 'ConnectionContext': """ Change configurations of this context. Changes by this method never affect global configuration even if the context is based on it. Args: configurations: Configurations. See `pyracmon.config` to know available keys. Returns: This instance. """ self.config.set(**configurations) return self def execute(self, cursor: dbapi.Cursor, sql: str, params: PARAMS) -> dbapi.Cursor: """ Executes a query on a cursor. Args: cursor: Cursor object. sql: Query string. params: Query parameters. Returns: Given cursor object. Internal state may be changed by the execution of the query. """ return self._execute(cursor, sql, params, False) def executemany(self, cursor: dbapi.Cursor, sql: str, seq_of_params: Sequence[PARAMS]) -> dbapi.Cursor: """ Repeats query on a cursor for sequencee of parameters. 
This method works similar to `execute` but invoke `executemany` instead. Args: cursor: Cursor object. sql: Query string. seq_of_args: A sequence of parameters of the query. Returns: Given cursor object. Internal state may be changed by the execution of the query. """ return self._execute(cursor, sql, seq_of_params, True) @overload def _execute(self, cursor: dbapi.Cursor, sql: str, params: PARAMS, is_many: Literal[False] = False) -> dbapi.Cursor: ... @overload def _execute(self, cursor: dbapi.Cursor, sql: str, params: Sequence[PARAMS], is_many: Literal[True] = True) -> dbapi.Cursor: ... def _execute( self, cursor: dbapi.Cursor, sql: str, params, is_many: bool = False, ) -> dbapi.Cursor: logger = _logger(self.config) if logger: sql_log = sql if len(sql) <= self.config.sql_log_length else f"{sql[0:self.config.sql_log_length]}..." logger.log(self.config.log_level, self._message(sql_log)) if self.config.parameter_log: if is_many: for ps in params: logger.log(self.config.log_level, self._message(f"Parameters: {ps}")) else: logger.log(self.config.log_level, self._message(f"Parameters: {params}")) if is_many: cursor.executemany(sql, params) else: cursor.execute(sql, params) return cursorInstance variables- var config
- 
Configuration used in this context. 
- var identifier
- 
Identifier of this context. None by default.
 Methods- def configure(self, **configurations: Any) ‑> ConnectionContext
- 
Change configurations of this context. Changes by this method never affect global configuration even if the context is based on it. Args- configurations
- Configurations. See pyracmon.config for the available keys.
 ReturnsThis instance. Expand source codedef configure(self, **configurations: Any) -> 'ConnectionContext': """ Change configurations of this context. Changes by this method never affect global configuration even if the context is based on it. Args: configurations: Configurations. See `pyracmon.config` to know available keys. Returns: This instance. """ self.config.set(**configurations) return self
- def execute(self, cursor: Cursor, sql: str, params: Union[list[Any], dict[str, Any]]) ‑> Cursor
- 
Executes a query on a cursor. Args- cursor
- Cursor object.
- sql
- Query string.
- params
- Query parameters.
 ReturnsGiven cursor object. Internal state may be changed by the execution of the query. Expand source codedef execute(self, cursor: dbapi.Cursor, sql: str, params: PARAMS) -> dbapi.Cursor: """ Executes a query on a cursor. Args: cursor: Cursor object. sql: Query string. params: Query parameters. Returns: Given cursor object. Internal state may be changed by the execution of the query. """ return self._execute(cursor, sql, params, False)
- def executemany(self, cursor: Cursor, sql: str, seq_of_params: collections.abc.Sequence[typing.Union[list[typing.Any], dict[str, typing.Any]]]) ‑> Cursor
- 
Repeats a query on a cursor for a sequence of parameters. This method works similarly to execute but invokes executemany instead. Args- cursor
- Cursor object.
- sql
- Query string.
- seq_of_params
- A sequence of parameters of the query.
 ReturnsGiven cursor object. Internal state may be changed by the execution of the query. Expand source codedef executemany(self, cursor: dbapi.Cursor, sql: str, seq_of_params: Sequence[PARAMS]) -> dbapi.Cursor: """ Repeats query on a cursor for sequencee of parameters. This method works similar to `execute` but invoke `executemany` instead. Args: cursor: Cursor object. sql: Query string. seq_of_args: A sequence of parameters of the query. Returns: Given cursor object. Internal state may be changed by the execution of the query. """ return self._execute(cursor, sql, seq_of_params, True)
 
- class ContainerView (*args, **kwargs)
- 
The interface of the view of a node set, i.e. NodeContainerandNode.Children.Expand source codeclass ContainerView(Protocol, Generic[T]): """ The interface of the view of a node set, i.e. `NodeContainer` and `Node.Children` . """ def __bool__(self) -> bool: """Returns whether this container is not empty.""" ... def __call__(self) -> T: """Returns a base container.""" ... def __len__(self) -> int: """Returns the number of nodes.""" ... def __iter__(self) -> Iterator['NodeView']: """Iterates views of nodes.""" ... @overload def __getitem__(self, index: int) -> 'NodeView': ... @overload def __getitem__(self, index: slice) -> Iterable['NodeView']: ... def __getitem__(self, index: Union[int, slice]) -> Union['NodeView', Iterable['NodeView']]: """Returns a view of a node at the index.""" ... def __getattr__(self, key) -> 'ContainerView': """Returns a view of the first node or empty container view if it does not exist.""" ...Ancestors- typing.Protocol
- typing.Generic
 Subclasses- pyracmon.graph.graph._EmptyContainerView
 
- class Expression (expression: str, params: list[typing.Any])
- 
Abstraction of expression in any query. Expand source codeclass Expression: """ Abstraction of expression in any query. """ def __init__(self, expression: str, params: list[Any]): #: Expression string. self.expression = expression #: Parameters corresponding to placeholders in the expression. self.params = paramsSubclassesInstance variables- var expression
- 
Expression string. 
- var params
- 
Parameters corresponding to placeholders in the expression. 
 
- class Graph (template: GraphTemplate)
- 
This class represents a graph composed of tree-structured node containers. The structure is determined by GraphTemplate. Use new_graph() instead of the constructor to create a new graph instance. template = GraphSpec().new_template( a = (int, lambda x:x), b = (str, lambda x:x), c = (str, lambda x:x), ) template.a << template.b << template.c graph = new_graph(template) In the above code, a graph is created which has 3 properties (a, b, c) and a structure where a is the parent of b and b is the parent of c. append (replace) is a method to store entities in the graph, tying them to each other according to the structure. Entities are encapsulated by Node, which can have an edge to a parent node. graph.append(a=1, b="a", c="x").append(a=2, b="b", c="y") In append, entities are first sorted in descending order, and then: - Search for a node whose entity is identical to the first entity in the corresponding node container. - If found, a new node is not created and the identical node is set as the next parent.
- Otherwise, a new node is appended and it is set as the next parent.
 
- Apply this to the following entities in order. The difference is that the identical node is searched for in the sequence of parents in the session.
 In example here, the identification is done by entity value itself ( lambda x:x). Next code is the example where identical nodes are found.graph.append(a=1, b="a", c="z").append(a=2, b="c", c="y")In the first append,aandbhas its identical node andais identical in the second.cin the second one is not identical to any node because parent nodeb="c"is already added as new node.Due to the identification mechanism, entity relationships in the graph is guaranteed after repeating append.Expand source codeclass Graph: """ This class represents a graph composed of tree-structured node containers. The structure is determined by `GraphTemplate`. Use `new_graph` Instead of constructor to create new graph instance. ```python template = GraphSpac().new_template( a = (int, lambda x:x), b = (str, lambda x:x), c = (str, lambda x:x), ) template.a << template.b << template.c graph = new_graph(template) ``` In above code, a graph which has 3 properties ( `a` `b` `c` ) and a structure where `a` is parent of `b` and `b` is parent of `c` is created. `append` ( `replace` ) is a method to store entities in the graph with tying them each other according to the structure. Entites are encapsulated by `Node` which can have an edge to parent node. ```python graph.append(a=1, b="a", c="x").append(a=2, b="b", c="y") ``` In `append`, entities are first sorted in descending order, and then: - Search a node whose entity is *identical* to the first entity from the corresponding node container. - If found, new node is not created and the *identical* node is set to next parent. - Otherwise, new node is appended and it is set to next parent. - Apply this to following entities in order. A difference is that *identical* node is searched from the sequence of parents in the session. In example here, the identification is done by entity value itself ( `lambda x:x` ). Next code is the example where *identical* nodes are found. 
```python graph.append(a=1, b="a", c="z").append(a=2, b="c", c="y") ``` In the first `append`, `a` and `b` has its *identical* node and `a` is *identical* in the second. `c` in the second one is not *identical* to any node because parent node `b="c"` is already added as new node. Due to the identification mechanism, entity relationships in the graph is guaranteed after repeating `append` . """ def __init__(self, template: GraphTemplate): #: Graph template. self.template: GraphTemplate = template #: A `dict` containing node containers by their names. self.containers: dict[str, NodeContainer] = {p.name:self._to_container(p) for p in template} self._view = None def _to_container(self, prop: GraphTemplate.Property) -> 'NodeContainer': if isinstance(prop.kind, GraphTemplate): return _GraphNodeContainer(prop) else: return NodeContainer(prop) def _container_of(self, prop: GraphTemplate.Property) -> Optional['NodeContainer']: candidates = [c for c in self.containers.values() if c.prop.is_compatible(prop)] if len(candidates) > 1: raise ValueError(f"Container can't be determined from property '{prop.name}'.") return candidates[0] if candidates else None def __add__(self, another: Union[Self, GraphView]) -> 'Graph': """ Create new graph by adding this graph and another graph. New graph has the same template as this graph's. On the other hand, because this method depends on `__iadd__()`, another graph must not have the same template. Args: another: Graph or its view. Returns: Created graph. """ graph = Graph(self.template) graph += self graph += another return graph def __iadd__(self, another: Union[Self, GraphView]) -> Self: """ Append nodes from another graph. Nodes of another graph are traversed from its root and appended to compatible containers each other. Args: another: Graph or its view. Returns: This graph. 
""" graph = another if isinstance(another, Graph) else another() def add(n: Node, anc: dict[str, list[Node]]): c = self._container_of(n.prop) if c: c.append(n.entity, anc) for ch_ in n.children.values(): for m in ch_.nodes: add(m, anc.copy()) for c_ in graph.roots: for n_ in c_.nodes: add(n_, {}) return self @property def roots(self) -> Iterable['NodeContainer']: """ Returns root node containers. """ return filter(lambda c: c.prop.parent is None, self.containers.values()) @property def view(self) -> GraphView: """ Returns an unmodifiable view of this graph. The view object works as the accessor to graph nodes. ```python >>> template = GraphSpac().new_template(a=int, b=str, c=str) >>> template.a << template.b >>> graph = new_graph(template) >>> view = graph.view >>> assert view() is graph # invocation >>> assert view.a is graph.containers["a"].view # attribute >>> assert [c().name for c in view] == ["a", "c"] # iteration ``` """ if self._view is None: graph = self class _GraphView: def __call__(self) -> Graph: """Returns the greph of this view.""" return graph def __iter__(self) -> Iterator[tuple[str, ContainerView[NodeContainer]]]: """Iterates views of root containers.""" return map(lambda c: (c.name, c.view), filter(lambda c: c.prop.parent is None, graph.containers.values())) def __getattr__(self, name: str) -> ContainerView: """Returns a view of a container of the name.""" return graph.containers[name].view self._view = _GraphView() return self._view def _append(self, to_replace: bool, entities: dict[str, Any]) -> Self: props = [p for p in self.template if p.name in entities] filtered = set() for p in props: if (p.parent is None) or (p.parent.name not in entities) or (p.parent.name in filtered): if p.entity_filter is None or p.entity_filter(entities[p.name]): filtered.add(p.name) ancestors = {} for k in [p.name for p in props if p.name in filtered]: self.containers[k].append(entities[k], ancestors, to_replace) return self def append(self, **entities: Any) -> 
Self: """ Append entities with associated property names. Args: entities: Entities keyed with associated property names. Returns: This graph. """ return self._append(False, entities) def replace(self, **entities: Any) -> Self: """ Works similarly to `append`, but entities of identical nodes are replaced with given entities. Args: entities: Entities keyed with associated property names. Returns: This graph. """ return self._append(True, entities)Instance variables- var containers
- 
A dict containing node containers by their names.
- var roots : collections.abc.Iterable['NodeContainer']
- 
Returns root node containers. Expand source code@property def roots(self) -> Iterable['NodeContainer']: """ Returns root node containers. """ return filter(lambda c: c.prop.parent is None, self.containers.values())
- var template
- 
Graph template. 
- var view : GraphView
- 
Returns an unmodifiable view of this graph. The view object works as the accessor to graph nodes. >>> template = GraphSpac().new_template(a=int, b=str, c=str) >>> template.a << template.b >>> graph = new_graph(template) >>> view = graph.view >>> assert view() is graph # invocation >>> assert view.a is graph.containers["a"].view # attribute >>> assert [c().name for c in view] == ["a", "c"] # iterationExpand source code@property def view(self) -> GraphView: """ Returns an unmodifiable view of this graph. The view object works as the accessor to graph nodes. ```python >>> template = GraphSpac().new_template(a=int, b=str, c=str) >>> template.a << template.b >>> graph = new_graph(template) >>> view = graph.view >>> assert view() is graph # invocation >>> assert view.a is graph.containers["a"].view # attribute >>> assert [c().name for c in view] == ["a", "c"] # iteration ``` """ if self._view is None: graph = self class _GraphView: def __call__(self) -> Graph: """Returns the greph of this view.""" return graph def __iter__(self) -> Iterator[tuple[str, ContainerView[NodeContainer]]]: """Iterates views of root containers.""" return map(lambda c: (c.name, c.view), filter(lambda c: c.prop.parent is None, graph.containers.values())) def __getattr__(self, name: str) -> ContainerView: """Returns a view of a container of the name.""" return graph.containers[name].view self._view = _GraphView() return self._view
 Methods- def append(self, **entities: Any) ‑> typing_extensions.Self
- 
Append entities with associated property names. Args- entities
- Entities keyed with associated property names.
 ReturnsThis graph. Expand source codedef append(self, **entities: Any) -> Self: """ Append entities with associated property names. Args: entities: Entities keyed with associated property names. Returns: This graph. """ return self._append(False, entities)
- def replace(self, **entities: Any) ‑> typing_extensions.Self
- 
Works similarly to append, but the entities of identical nodes are replaced with the given entities. Args- entities
- Entities keyed with associated property names.
 ReturnsThis graph. Expand source codedef replace(self, **entities: Any) -> Self: """ Works similarly to `append`, but entities of identical nodes are replaced with given entities. Args: entities: Entities keyed with associated property names. Returns: This graph. """ return self._append(True, entities)
 
- Search a node whose entity is identical to the first entity from the corresponding node container.
- class GraphSchema (spec: Any, template: GraphTemplate, **serializers: NodeSerializer)
- 
This class exposes a property to get the schema of serialization result of a graph. TODO: Dependency to GraphSpecshould be replaced in another way.Expand source codeclass GraphSchema: """ This class exposes a property to get the schema of serialization result of a graph. TODO: Dependency to `GraphSpec` should be replaced in another way. """ def __init__(self, spec: Any, template: GraphTemplate, **serializers: NodeSerializer): #: Specification of graph operations. self.spec = spec #: Graph template to serialize. self.template = template #: `NodeSerializer`s used for the serialization. self.serializers = serializers def _return_from(self, prop: GraphTemplate.Property) -> type: """ Get a type the node of passed property will be serialized. """ ns = self.serializers[prop.name] # Type of the node entity. entity_type = prop.kind if isinstance(entity_type, GraphTemplate): # GraphTemplate type is ignored because serializer added by sub() resolve the type by iteself. entity_type = _templateType(entity_type) # Return type of the NodeSerializer. ns_type = signature(ns.serializer).return_annotation # Return type of base serializer obtained from GraphSpec. base = chain_serializers(self.spec.find_serializers(entity_type)) base_type = signature(base).return_annotation if base else Signature.empty #base_type = entity_type if base_type == Signature.empty else base_type # If the return type contains a single type parameter, previous type is applied to it. # Serializer without return annotation is supposed to return input type as it is. def next_resolvable(it: Iterator[type]) -> type: while True: res = next(it, None) if res is None: break elif res != Signature.empty: return res return Signature.empty def resolve(it: Iterator[type]) -> type: origin = next_resolvable(it) if origin == Signature.empty: return origin elif issubgeneric(origin, Typeable): if not Typeable.is_resolved(origin): param = resolve(it) if param == Signature.empty: # Type parameter is not known. 
return Signature.empty # Replace type parameter. origin = origin[param] # type: ignore return Typeable.resolve(origin, resolve(it), self.spec) else: args = get_args(origin) if args: # origin is generics. type_params = list(filter(lambda ia: isinstance(ia[1], TypeVar), enumerate(args))) # python < 3.10 param_num = len(type_params) if param_num == 0: return origin elif param_num == 1: # Replace type parameter param = resolve(it) return origin[param] # type: ignore else: return Signature.empty # python >= 3.10 #match len(type_params): # case 0: # return origin # case 1: # # Replace type parameter # param = resolve(it) # return origin[param] # type: ignore # case _: # return Signature.empty else: return origin return resolve(iter([ns_type, base_type, entity_type, entity_type])) def schema_of(self, prop: GraphTemplate.Property) -> Type[Annotated]: """ Generates structured and documented schema for a template property. Args: prop: A template property. Returns: Schema with documentation. """ return_type = self._return_from(prop) doc = self.serializers[prop.name]._doc or "" # TypedDict type is also a subclass of dict. 
if issubclass(return_type, dict): annotations = {} for c in filter(lambda c: c.name in self.serializers, prop.children): ns = self.serializers[c.name] cs = self.schema_of(c) t, d = decompose_document(cs) if ns.be_merged: if not issubclass(t, dict): raise ValueError(f"Property '{c.name}' is not configured to be serialized into dict.") annotations.update(**{ns.namer(k):t for k, t in get_type_hints(t, include_extras=True).items()}) elif ns.be_singular: rt = signature(ns.aggregator).return_annotation rt = replace_optional_typevar(rt, cs) annotations[ns.namer(c.name)] = rt else: annotations[ns.namer(c.name)] = document_type(list[t], d) td_type: Optional[type[TypedDict]] = cast(type[TypedDict], return_type) if is_typeddict(return_type) else None return document_type(generate_schema(annotations, td_type), doc) else: return document_type(return_type, doc) @property def schema(self) -> type[TypedDict]: """ Generates `TypedDict` which represents the schema of serialized graph. """ annotations: dict[str, Any] = {} def put_root_schema(p: GraphTemplate.Property): nonlocal annotations ns = self.serializers[p.name] dt = self.schema_of(p) if ns.be_merged: t, d = decompose_document(dt) annotations.update(**{ns.namer(k):t_ for k, t_ in get_type_hints(t, include_extras=True).items()}) elif ns.be_singular: rt = signature(ns.aggregator).return_annotation rt = replace_optional_typevar(rt, dt) annotations[ns.namer(p.name)] = rt else: t, d = decompose_document(dt) annotations[ns.namer(p.name)] = document_type(list[t], d) roots = filter(lambda p: p.parent is None and p.name in self.serializers, self.template._properties.values()) for p in roots: put_root_schema(p) return generate_schema(annotations) def serialize(self, graph: GraphView, **node_params: dict[str, Any]) -> dict[str, Any]: """ Serialize graph into a dictionary. Args: graph: A view of a graph. node_params: Parameters passed to `SerializationContext` and used by *serializer* s. Returns: Serialization result. 
""" return self.spec.to_dict(graph, node_params, **self.serializers)Instance variables- var schema : type[typing.TypedDict]
- 
Generates TypedDictwhich represents the schema of serialized graph.Expand source code@property def schema(self) -> type[TypedDict]: """ Generates `TypedDict` which represents the schema of serialized graph. """ annotations: dict[str, Any] = {} def put_root_schema(p: GraphTemplate.Property): nonlocal annotations ns = self.serializers[p.name] dt = self.schema_of(p) if ns.be_merged: t, d = decompose_document(dt) annotations.update(**{ns.namer(k):t_ for k, t_ in get_type_hints(t, include_extras=True).items()}) elif ns.be_singular: rt = signature(ns.aggregator).return_annotation rt = replace_optional_typevar(rt, dt) annotations[ns.namer(p.name)] = rt else: t, d = decompose_document(dt) annotations[ns.namer(p.name)] = document_type(list[t], d) roots = filter(lambda p: p.parent is None and p.name in self.serializers, self.template._properties.values()) for p in roots: put_root_schema(p) return generate_schema(annotations)
- var serializers
- 
NodeSerializer instances used for the serialization.
- var spec
- 
Specification of graph operations. 
- var template
- 
Graph template to serialize. 
 Methods- def schema_of(self, prop: GraphTemplate.Property) ‑> Type[Annotated]
- 
Generates structured and documented schema for a template property. Args- prop
- A template property.
 ReturnsSchema with documentation. Expand source codedef schema_of(self, prop: GraphTemplate.Property) -> Type[Annotated]: """ Generates structured and documented schema for a template property. Args: prop: A template property. Returns: Schema with documentation. """ return_type = self._return_from(prop) doc = self.serializers[prop.name]._doc or "" # TypedDict type is also a subclass of dict. if issubclass(return_type, dict): annotations = {} for c in filter(lambda c: c.name in self.serializers, prop.children): ns = self.serializers[c.name] cs = self.schema_of(c) t, d = decompose_document(cs) if ns.be_merged: if not issubclass(t, dict): raise ValueError(f"Property '{c.name}' is not configured to be serialized into dict.") annotations.update(**{ns.namer(k):t for k, t in get_type_hints(t, include_extras=True).items()}) elif ns.be_singular: rt = signature(ns.aggregator).return_annotation rt = replace_optional_typevar(rt, cs) annotations[ns.namer(c.name)] = rt else: annotations[ns.namer(c.name)] = document_type(list[t], d) td_type: Optional[type[TypedDict]] = cast(type[TypedDict], return_type) if is_typeddict(return_type) else None return document_type(generate_schema(annotations, td_type), doc) else: return document_type(return_type, doc)
- def serialize(self, graph: GraphView, **node_params: dict[str, typing.Any]) ‑> dict[str, typing.Any]
- 
Serialize graph into a dictionary. Args- graph
- A view of a graph.
- node_params
Parameters passed to SerializationContext and used by serializers.
 ReturnsSerialization result. Expand source codedef serialize(self, graph: GraphView, **node_params: dict[str, Any]) -> dict[str, Any]: """ Serialize graph into a dictionary. Args: graph: A view of a graph. node_params: Parameters passed to `SerializationContext` and used by *serializer* s. Returns: Serialization result. """ return self.spec.to_dict(graph, node_params, **self.serializers)
 
- class GraphView (*args, **kwargs)
- 
The interface of the view of graph. Expand source codeclass GraphView(Protocol): """ The interface of the view of graph. """ def __call__(self) -> 'Graph': ... def __iter__(self) -> Iterator[tuple[str, 'ContainerView[NodeContainer]']]: """ Iterates root container views. Returns: Iterator of pairs of name and container view. """ ... def __getattr__(self, name: str) -> 'ContainerView': """ Returns a container view by its name. Args: name: Container name. i.e. template property name for the node container. Returns: Container view. """ ...Ancestors- typing.Protocol
- typing.Generic
 
- class Model
- 
Expand source codeclass Model(Mixins[Unpack[MXS]], metaclass=Meta): """ Base type of model types. This class only works as a marker of model types and gives no functionalities to them. """ def __init__(self, **kwargs) -> None: ... # for typing
- class Node (prop: GraphTemplate.Property, entity: Any, key: Optional[Any], index: int)
- 
This class represents a node which contains an entity. Expand source codeclass Node: """ This class represents a node which contains an entity. """ class Children: """ This class represents a child nodes of a node. """ def __init__(self, prop: GraphTemplate.Property): #: Template property. self.prop = prop self.nodes: list[Node] = [] self.keys = set() self._view = None @property def name(self) -> str: """ Returns the name of corresponding template property. """ return self.prop.name @property def view(self) -> ContainerView['Node.Children']: """ Returns an unmodifiable view of child nodes. """ if self._view is None: base = self class _ChildrenView: def __bool__(self): """Returns whether this container is not empty.""" return len(base.nodes) != 0 def __call__(self): """Returns children container.""" return base def __iter__(self): """Iterates views of child nodes.""" return map(lambda n: n.view, base.nodes) def __len__(self): """Returns the number of child nodes.""" return len(base.nodes) @overload def __getitem__(self, index: int) -> 'NodeView': ... @overload def __getitem__(self, index: slice) -> Iterable['NodeView']: ... 
def __getitem__(self, index): """Returns a view of child node at the index.""" if isinstance(index, slice): return [n.view for n in base.nodes[index]] else: return base.nodes[index].view def __getattr__(self, key): """Returns a view of the first node or empty container view if it does not exist.""" child = next(filter(lambda c: c.name == key, base.prop.children), None) if child: return base.nodes[0].children[key].view if len(base.nodes) > 0 else _EmptyContainerView(child) else: raise KeyError(f"Graph property '{base.prop.name}' does not have a child property '{key}'.") self._view = _ChildrenView() return self._view def __contains__(self, node: 'Node') -> bool: return node in self.keys def __iter__(self) -> Iterator['Node']: return iter(self.nodes) def append(self, node): if node not in self.keys: self.keys.add(node) self.nodes.append(node) def __init__(self, prop: GraphTemplate.Property, entity: Any, key: Optional[Any], index: int): #: Template property. self.prop = prop #: An entity value. self.entity = entity self.key = key self.parents = set() self.children: dict[str, Node.Children] = {c.name: Node.Children(c) for c in prop.children} self._index = index self._view = None def __contains__(self, key: str) -> bool: return key in self.children @property def name(self) -> str: """ Returns the container name, which is same as the name of template property. """ return self.prop.name @property def view(self) -> NodeView: """ Returns an unmodifiable view of this node. The view object works as the accessor to entity and child nodes. 
""" if self._view is None: node = self class _NodeView(NodeView): def __call__(self, alt: Any = None) -> Any: """Returns an entity of this node.""" return node.entity def __getattr__(self, key: str) -> ContainerView: """Returns a view of child nodes by its name.""" return node.children[key].view def __iter__(self) -> Iterator[tuple[str, ContainerView]]: """Iterate key-value pairs of child nodes.""" return map(lambda nc: (nc[0], nc[1].view), node.children.items()) self._view = _NodeView() return self._view def add_child(self, child: 'Node') -> Self: """ Adds a child node. Args: child: Child node. Returns: This instance. """ if child.prop.template != self.prop.template: raise ValueError(f"Nodes from different graph template can't be associated.") self.children[child.prop.name].append(child) child.parents.add(self) return self def has_child(self, child: 'Node') -> bool: """ Checks this node contains the node identical to given node. Args: child: Node to search. Returns: `True` if exists. """ if child.prop.template != self.prop.template: return False elif child.prop.name in self.children: return child in self.children[child.prop.name].keys else: return FalseSubclasses- pyracmon.graph.graph._GraphNode
 Class variables- var Children
- 
This class represents the child nodes of a node. 
 Instance variables- var entity
- 
An entity value. 
- var name : str
- 
Returns the container name, which is same as the name of template property. Expand source code@property def name(self) -> str: """ Returns the container name, which is same as the name of template property. """ return self.prop.name
- var prop
- 
Template property. 
- var view : NodeView
- 
Returns an unmodifiable view of this node. The view object works as the accessor to entity and child nodes. Expand source code@property def view(self) -> NodeView: """ Returns an unmodifiable view of this node. The view object works as the accessor to entity and child nodes. """ if self._view is None: node = self class _NodeView(NodeView): def __call__(self, alt: Any = None) -> Any: """Returns an entity of this node.""" return node.entity def __getattr__(self, key: str) -> ContainerView: """Returns a view of child nodes by its name.""" return node.children[key].view def __iter__(self) -> Iterator[tuple[str, ContainerView]]: """Iterate key-value pairs of child nodes.""" return map(lambda nc: (nc[0], nc[1].view), node.children.items()) self._view = _NodeView() return self._view
 Methods- def add_child(self, child: Node) ‑> typing_extensions.Self
- 
Adds a child node. Args- child
- Child node.
 ReturnsThis instance. Expand source codedef add_child(self, child: 'Node') -> Self: """ Adds a child node. Args: child: Child node. Returns: This instance. """ if child.prop.template != self.prop.template: raise ValueError(f"Nodes from different graph template can't be associated.") self.children[child.prop.name].append(child) child.parents.add(self) return self
- def has_child(self, child: Node) ‑> bool
- 
Checks this node contains the node identical to given node. Args- child
- Node to search.
 ReturnsTrue if exists. Expand source codedef has_child(self, child: 'Node') -> bool: """ Checks this node contains the node identical to given node. Args: child: Node to search. Returns: `True` if exists. """ if child.prop.template != self.prop.template: return False elif child.prop.name in self.children: return child in self.children[child.prop.name].keys else: return False
 
- class NodeContainer (prop: GraphTemplate.Property)
- 
This class represents a container of nodes for a template property. Expand source codeclass NodeContainer: """ This class represents a container of nodes for a template property. """ def __init__(self, prop: GraphTemplate.Property): #: Template property. self.prop = prop self.nodes: list[Node] = [] self.keys: dict[Any, list[int]] = {} self._view = None @property def name(self) -> str: """ Returns the container name, which is same as the name of template property. """ return self.prop.name @property def view(self) -> ContainerView['NodeContainer']: """ Returns an unmodifiable view of this container. The view object works as the accessor to container components. ```python template = GraphSpac().new_template(a=int, b=str, c=str) template.a << template.b graph = new_graph(template).append(a=1, b="a").append(a=1, b="b").append(a=2, b="c") container = graph.containers["a"] view = graph.view.a assert view() is container # invocation assert view.b is container.nodes[0].children["b"].view # attribute assert view[1] is container.nodes[1].view # index assert [n() for n in view] == [1, 2] # iteration assert len(view) == 2 # length ``` """ if self._view is None: container = self class _ContainerView: def __bool__(self): """Returns whether this container is not empty.""" return len(container.nodes) != 0 def __call__(self) -> NodeContainer: """Returns a base container.""" return container def __len__(self): """Returns the number of nodes.""" return len(container.nodes) def __iter__(self): """Iterates views of nodes.""" return map(lambda n: n.view, container.nodes) @overload def __getitem__(self, index: int) -> 'NodeView': ... @overload def __getitem__(self, index: slice) -> Iterable['NodeView']: ... 
def __getitem__(self, index: Union[int, slice]) -> Union['NodeView', Iterable['NodeView']]: """Returns a view of a node at the index.""" if isinstance(index, slice): return [n.view for n in container.nodes[index]] else: return container.nodes[index].view def __getattr__(self, key) -> ContainerView: """Returns a view of the first node or empty container view if it does not exist.""" child = next(filter(lambda c: c.name == key, container.prop.children), None) if child: return container.nodes[0].children[key].view if len(container.nodes) > 0 else _EmptyContainerView(child) else: raise KeyError(f"Graph property '{container.prop.name}' does not have a child property '{key}'.") self._view = _ContainerView() return self._view def append(self, entity: Any, ancestors: MutableMapping[str, list['Node']], to_replace: bool = False): """ Add an entity to this container. Identical node is searched by examining whether this container already contains a node of the identical entity and its parent is found in `anscestors` . Args: entity: An entity to be stored in the node. ancestors: Parent nodes mapped by property names. to_replace: If `True`, the entity of identical node is replaced. Otherwise, it is not changed. """ policy: IdentifyPolicy = self.prop.policy or neverPolicy() key = policy.get_identifier(entity) parents, identicals = policy.identify(self.prop, [self.nodes[i] for i in self.keys.get(key, [])], ancestors) new_nodes = identicals.copy() for pn in parents: index = len(self.nodes) node = Node(self.prop, entity, key, index) self.nodes.append(node) if key is not None: self.keys.setdefault(key, []).append(index) new_nodes.append(node) if pn is not None: pn.add_child(node) if to_replace: for n in identicals: n.entity = entity ancestors[self.prop.name] = new_nodesSubclasses- pyracmon.graph.graph._GraphNodeContainer
 Instance variables- var name : str
- 
Returns the container name, which is same as the name of template property. Expand source code@property def name(self) -> str: """ Returns the container name, which is same as the name of template property. """ return self.prop.name
- var prop
- 
Template property. 
- var view : ContainerView[NodeContainer]
- 
Returns an unmodifiable view of this container. The view object works as the accessor to container components. template = GraphSpac().new_template(a=int, b=str, c=str) template.a << template.b graph = new_graph(template).append(a=1, b="a").append(a=1, b="b").append(a=2, b="c") container = graph.containers["a"] view = graph.view.a assert view() is container # invocation assert view.b is container.nodes[0].children["b"].view # attribute assert view[1] is container.nodes[1].view # index assert [n() for n in view] == [1, 2] # iteration assert len(view) == 2 # lengthExpand source code@property def view(self) -> ContainerView['NodeContainer']: """ Returns an unmodifiable view of this container. The view object works as the accessor to container components. ```python template = GraphSpac().new_template(a=int, b=str, c=str) template.a << template.b graph = new_graph(template).append(a=1, b="a").append(a=1, b="b").append(a=2, b="c") container = graph.containers["a"] view = graph.view.a assert view() is container # invocation assert view.b is container.nodes[0].children["b"].view # attribute assert view[1] is container.nodes[1].view # index assert [n() for n in view] == [1, 2] # iteration assert len(view) == 2 # length ``` """ if self._view is None: container = self class _ContainerView: def __bool__(self): """Returns whether this container is not empty.""" return len(container.nodes) != 0 def __call__(self) -> NodeContainer: """Returns a base container.""" return container def __len__(self): """Returns the number of nodes.""" return len(container.nodes) def __iter__(self): """Iterates views of nodes.""" return map(lambda n: n.view, container.nodes) @overload def __getitem__(self, index: int) -> 'NodeView': ... @overload def __getitem__(self, index: slice) -> Iterable['NodeView']: ... 
def __getitem__(self, index: Union[int, slice]) -> Union['NodeView', Iterable['NodeView']]: """Returns a view of a node at the index.""" if isinstance(index, slice): return [n.view for n in container.nodes[index]] else: return container.nodes[index].view def __getattr__(self, key) -> ContainerView: """Returns a view of the first node or empty container view if it does not exist.""" child = next(filter(lambda c: c.name == key, container.prop.children), None) if child: return container.nodes[0].children[key].view if len(container.nodes) > 0 else _EmptyContainerView(child) else: raise KeyError(f"Graph property '{container.prop.name}' does not have a child property '{key}'.") self._view = _ContainerView() return self._view
 Methods- def append(self, entity: Any, ancestors: collections.abc.MutableMapping[str, list['Node']], to_replace: bool = False)
- 
Add an entity to this container. An identical node is searched for by examining whether this container already contains a node of the identical entity whose parent is found in ancestors. Args- entity
- An entity to be stored in the node.
- ancestors
- Parent nodes mapped by property names.
- to_replace
- If True, the entity of identical node is replaced. Otherwise, it is not changed.
 Expand source codedef append(self, entity: Any, ancestors: MutableMapping[str, list['Node']], to_replace: bool = False): """ Add an entity to this container. Identical node is searched by examining whether this container already contains a node of the identical entity and its parent is found in `anscestors` . Args: entity: An entity to be stored in the node. ancestors: Parent nodes mapped by property names. to_replace: If `True`, the entity of identical node is replaced. Otherwise, it is not changed. """ policy: IdentifyPolicy = self.prop.policy or neverPolicy() key = policy.get_identifier(entity) parents, identicals = policy.identify(self.prop, [self.nodes[i] for i in self.keys.get(key, [])], ancestors) new_nodes = identicals.copy() for pn in parents: index = len(self.nodes) node = Node(self.prop, entity, key, index) self.nodes.append(node) if key is not None: self.keys.setdefault(key, []).append(index) new_nodes.append(node) if pn is not None: pn.add_child(node) if to_replace: for n in identicals: n.entity = entity ancestors[self.prop.name] = new_nodes
 
- class NodeContext (context: SerializationContext, params: NodeParams)
- 
A class containing information for serialization of a single node. The instance of this class is passed to the serialization function. Properties listed below are available to control serialization. - context: SerializationContext for the serialization of the graph.
- node: Node to serialize.
- value: Entity value of the Node.
- params: Arbitrary values which are passed from the invoking scope, bound to the key of the node name.
 Every serializer has to call serialize()to get the result of preceeding serializers, or make a result direcly from the node.Expand source codeclass NodeContext: """ A class containing informations for serialization of a single node. The instance of this class is passed to the serialization function. Properties listed below are available to control serialization. - context: `SerializationContext` for the serialization of the graph. - node: `Node` to serialize. - value: Entity value of the `Node` . - params: Arbitrary values which is passed from invoking scope with being bound to the key of node name. Every serializer has to call `serialize()` to get the result of preceeding serializers, or make a result direcly from the node. """ def __init__(self, context: 'SerializationContext', params: NodeParams) -> None: #: `SerializationContext` for the serializaion of the graph.` self.context = context #: Arbitrary values passed by outside for the node. self.params = params # Set on demand. self._node: Optional[Node] = None self._iterator: Optional[Iterator[Any]] = None def __enter__(self): return self def __exit__(self, exc_type, exc_value, traceback): self._node = None self._iterator = None @property def node(self) -> Node: # Node must be set when passed to serialization function. return cast(Node, self._node) @property def value(self) -> Any: return cast(Node, self._node).entity def serialize(self) -> Any: """ Obtain a value serialized by preceeding serializers. """ try: return next(cast(Iterator[Any], self._iterator))(self) except StopIteration: return self.node.entityInstance variables- var context
- 
SerializationContext for the serialization of the graph.
- var node : Node
- 
Expand source code@property def node(self) -> Node: # Node must be set when passed to serialization function. return cast(Node, self._node)
- var params
- 
Arbitrary values passed by outside for the node. 
- var value : Any
- 
Expand source code@property def value(self) -> Any: return cast(Node, self._node).entity
 Methods- def serialize(self) ‑> Any
- 
Obtain a value serialized by preceeding serializers. Expand source codedef serialize(self) -> Any: """ Obtain a value serialized by preceeding serializers. """ try: return next(cast(Iterator[Any], self._iterator))(self) except StopIteration: return self.node.entity
 
- context: 
- class NodeView
- 
Expand source codeclass NodeView: def __call__(self, alt: Any = None) -> Any: """Returns an entity of this node.""" ... def __getattr__(self, key: str) -> ContainerView: """Returns a view of child nodes by its name.""" ... def __iter__(self) -> Iterator[tuple[str, ContainerView]]: """Iterate key-value pairs of child nodes.""" ...
- class Q (**kwargs: Any)
- 
This class provides utility class methods creating conditions. Using of()is the most simple way to create a condition clause with parameters.>>> Q.of("a = $_", 1) Condition: 'a = $_' -- [1]Other utility methods correspond to basic operators defined in SQL. They takes keyword arguments and create conditions by applying operator to each item respectively. >>> Q.eq(a=1) Condition: 'a = %s' -- [1] >>> Q.in_(a=[1, 2, 3]) Condition: 'a IN (%s, %s, %s)' -- [1, 2, 3] >>> Q.like(a="abc") Condition: 'a LIKE %s' -- ["%abc%"]Multiple arguments generates a condition which concatenates conditions with logical operator, by default AND.>>> Q.eq(a=1, b=2) Condition: 'a = %s AND b = %s' -- [1, 2]Those methods also accept table alias which is prepended to columns. >>> Q.eq("t", a=1, b=2) Condition: 't.a = %s AND t.b = %s'Additionally, the instance of this class has its own functionality to generate condition. Each parameter passed to the constructor becomes an instance method of the instance, which takes a condition clause including placeholders which will take parameters in query execution phase. Statement.execute()allows unified marker$_in spite of DB driver.>>> q = Q(a=1) >>> q.a("a = $_") Condition: 'a = $_' -- [1]Method whose name is not passed to the constructor renders empty condition which has no effect on the query. >>> q.b("b = $_") Condition: '' -- []By default, Noneis equivalent to not being passed. GivingTrueat the first argument in constructor changes the behavior.>>> q = Q(a=1, b=None) >>> q.b("b = $_") Condition: '' -- [] >>> q = Q(True, a=1, b=None) >>> q.b("b = $_") Condition: 'b = $_' -- [None]This feature simplifies a query construction in cases some parameters are absent. 
>>> def search(db, q): >>> w, params = where(q.a("a = $_") & q.b("b = $_")) >>> db.stmt().execute(f"SELECT * FROM table {w}", *params) >>> >>> search(db, Q(a=1)) # SELECT * FROM table WHERE a = 1 >>> search(db, Q(a=1, b=2)) # SELECT * FROM table WHERE a = 1 AND b = 2 >>> search(db, Q()) # SELECT * FROM tableInitializes an instance. Args- _include_none_
Whether to include attributes whose value is None.
- kwargs
- Denotes pairs of attribute name and parameter.
 Expand source codeclass Q: """ This class provides utility class methods creating conditions. Using `of()` is the most simple way to create a condition clause with parameters. ```python >>> Q.of("a = $_", 1) Condition: 'a = $_' -- [1] ``` Other utility methods correspond to basic operators defined in SQL. They takes keyword arguments and create conditions by applying operator to each item respectively. ```python >>> Q.eq(a=1) Condition: 'a = %s' -- [1] >>> Q.in_(a=[1, 2, 3]) Condition: 'a IN (%s, %s, %s)' -- [1, 2, 3] >>> Q.like(a="abc") Condition: 'a LIKE %s' -- ["%abc%"] ``` Multiple arguments generates a condition which concatenates conditions with logical operator, by default `AND` . ```python >>> Q.eq(a=1, b=2) Condition: 'a = %s AND b = %s' -- [1, 2] ``` Those methods also accept table alias which is prepended to columns. ```python >>> Q.eq("t", a=1, b=2) Condition: 't.a = %s AND t.b = %s' ``` Additionally, the instance of this class has its own functionality to generate condition. Each parameter passed to the constructor becomes an instance method of the instance, which takes a condition clause including placeholders which will take parameters in query execution phase. `pyracmon.connection.Statement.execute` allows unified marker `$_` in spite of DB driver. ```python >>> q = Q(a=1) >>> q.a("a = $_") Condition: 'a = $_' -- [1] ``` Method whose name is not passed to the constructor renders empty condition which has no effect on the query. ```python >>> q.b("b = $_") Condition: '' -- [] ``` By default, `None` is equivalent to not being passed. Giving `True` at the first argument in constructor changes the behavior. ```python >>> q = Q(a=1, b=None) >>> q.b("b = $_") Condition: '' -- [] >>> q = Q(True, a=1, b=None) >>> q.b("b = $_") Condition: 'b = $_' -- [None] ``` This feature simplifies a query construction in cases some parameters are absent. 
```python >>> def search(db, q): >>> w, params = where(q.a("a = $_") & q.b("b = $_")) >>> db.stmt().execute(f"SELECT * FROM table {w}", *params) >>> >>> search(db, Q(a=1)) # SELECT * FROM table WHERE a = 1 >>> search(db, Q(a=1, b=2)) # SELECT * FROM table WHERE a = 1 AND b = 2 >>> search(db, Q()) # SELECT * FROM table ``` """ class Attribute(Queryable[str]): # type: ignore def __init__(self, value): self.value = value def __call__( self, expression: Union[str, Callable[[Any], str]], convert: Optional[Union[Callable[[Any], Any], Any]] = None, ) -> 'Conditional': """ Creates conditional object composed of given expression and the attribute value as parameters. Args: expression: A clause or a function generating a clause by taking the attribute value. convert: A function converting the attribute value to parameters. If this function returns a value which is not a list, a list having only the value is used. Returns: Condition. """ expression = expression if isinstance(expression, str) else expression(self.value) if callable(convert): params = convert(self.value) elif convert is not None: params = convert else: params = [self.value] return Conditional(expression, params if isinstance(params, list) else [params]) @property def all(self) -> 'Q.Attribute': """ Returns composite attribute which applies conditions to every values iterated from attribute value and join them with `AND`. Returns: Composite attribute. """ return Q.CompositeAttribute(self.value, True) @property def any(self) -> 'Q.Attribute': """ Returns composite attribute which applies conditions to every values iterated from attribute value and join them with `OR`. Returns: Composite attribute. 
""" return Q.CompositeAttribute(self.value, False) def __bool__(self): return True def __and__(self, other: 'Conditional') -> 'Conditional': return other if bool(self.value) else Conditional() def __or__(self, other: 'Conditional') -> 'Conditional': return other if not bool(self.value) else Conditional() def __getattr__(self, key): """ Exposes a method which works similarly to 'Q' 's utility method of the same name. ```python >>> q = Q(a = 1) >>> q.a.eq("col") Condition: 'col = $_' -- [1] >>> >>> q.a.eq("col", None, "t") Condition: 't.col = $_' -- [1] >>> >>> q.a.eq("col", lambda x: x*2, "t") Condition: 't.col = $_' -- [2] ``` """ method = getattr(Q, key) def invoke(col, convert=None, *args, **kwargs): if callable(convert): value = convert(self.value) else: value = convert if convert is not None else self.value kwargs.update({col: value}) return method(*args, **kwargs) return invoke class CompositeAttribute(Attribute): def __init__(self, value, and_): super().__init__(value) self._and = and_ def __call__(self, expression, convert=None): conds = [Q.Attribute(v)(expression, convert) for v in self.value] return Conditional.all(conds) if self._and else Conditional.any(conds) def __getattr__(self, key): method = getattr(Q, key) def invoke(col, convert=None, *args, **kwargs): def conv(v): if callable(convert): return convert(v) else: # REVIEW Replacing every parameter in the list with the same value is meaningless? 
return convert if convert is not None else v conds = [method(*args, **dict(chain(kwargs.items(), [(col, conv(v))]))) for v in self.value] return Conditional.all(conds) if self._and else Conditional.any(conds) return invoke class NoAttribute(Attribute): def __init__(self): super().__init__(None) def __call__(self, expression, holder=lambda x:x): return Conditional() @property def all(self): return self @property def any(self): return self def __bool__(self): return False def __and__(self, other: 'Conditional') -> 'Conditional': return Conditional() def __or__(self, other: 'Conditional') -> 'Conditional': return Conditional() def __getattr__(self, key): method = getattr(Q, key) def invoke(col, convert=None, *args): return Conditional() return invoke def __init__(self, _include_none_: bool = False, **kwargs: Any): """ Initializes an instance. Args: _include_none_: Whether include attributes whose value is `None`. kwargs: Denotes pairs of attribute name and parameter. """ self.attributes = dict([(k, v) for k, v in kwargs.items() if _include_none_ or v is not None]) def __getattr__(self, key) -> Attribute: if key in self.attributes: return Q.Attribute(self.attributes[key]) else: return Q.NoAttribute() @classmethod def of(cls, expression: str = "", *params: Any) -> 'Conditional': """ Creates a condition directly from an expression and parameters. Args: expression: Condition expression. params: Parameters used in the condition. Returns: Condition object. """ return Conditional(expression, list(params)) @classmethod def eq(cls, _alias_: Optional[str] = None, _and_: bool = True, **kwargs: Any) -> 'Conditional': """ Creates a condition applying `=` operator to columns. Args: _alias_: Table alias. _and_: Specifies concatenating logical operator is `AND` or `OR`. kwargs: Column names and parameters. Returns: Condition object. 
""" def is_null(col, val): if val is None: return f"{col} IS NULL", [] elif val is True: return f"{col}", [] elif val is False: return f"NOT {col}", [] return None return _conditional("=", _and_, kwargs, is_null, _alias_) @classmethod def neq(cls, _alias_: Optional[str] = None, _and_: bool = True, **kwargs: Any) -> 'Conditional': """ Works like `eq`, but applies `!=`. Args: _alias_: Table alias. _and_: Specifies concatenating logical operator is `AND` or `OR`. kwargs: Column names and parameters. Returns: Condition object. """ def is_null(col, val): if val is None: return f"{col} IS NOT NULL", [] elif val is True: return f"NOT {col}", [] elif val is False: return f"{col}", [] return None return _conditional("!=", _and_, kwargs, is_null, _alias_) @classmethod def in_(cls, _alias_: Optional[str] = None, _and_: bool = True, **kwargs: Sequence[Any]) -> 'Conditional': """ Works like `eq`, but applies `IN`. Args: _alias_: Table alias. _and_: Specifies concatenating logical operator is `AND` or `OR`. kwargs: Column names and parameters. Returns: Condition object. """ def in_list(col, val): if len(val) == 0: return "1 = 0", [] else: holder = ', '.join(['$_'] * len(val)) return f"{col} IN ({holder})", val return _conditional("IN", _and_, kwargs, in_list, _alias_) @classmethod def not_in(cls, _alias_: Optional[str] = None, _and_: bool = True, **kwargs: Sequence[Any]) -> 'Conditional': """ Works like `eq`, but applies `NOT IN`. Args: _alias_: Table alias. _and_: Specifies concatenating logical operator is `AND` or `OR`. kwargs: Column names and parameters. Returns: Condition object. """ def in_list(col, val): if len(val) == 0: return "", [] else: holder = ', '.join(['$_'] * len(val)) return f"{col} NOT IN ({holder})", val return _conditional("NOT IN", _and_, kwargs, in_list, _alias_) @classmethod def match(cls, _alias_: Optional[str] = None, _and_: bool = True, **kwargs: str) -> 'Conditional': """ Works like `eq`, but applies `LIKE`. 
Given parameters will be passed to query without being escaped or enclosed. Args: _alias_: Table alias. _and_: Specifies concatenating logical operator is `AND` or `OR`. kwargs: Column names and parameters. Returns: Condition object. """ return _conditional("LIKE", _and_, kwargs, None, _alias_) @classmethod def like(cls, _alias_: Optional[str] = None, _and_: bool = True, **kwargs: str) -> 'Conditional': """ Works like `eq`, but applies `LIKE`. Given parameters will be escaped and enclosed with wildcards (%) to execute partial match. Args: _alias_: Table alias. _and_: Specifies concatenating logical operator is `AND` or `OR`. kwargs: Column names and parameters. Returns: Condition object. """ return _conditional("LIKE", _and_, {k: f"%{escape_like(v)}%" for k, v in kwargs.items()}, None, _alias_) @classmethod def startswith(cls, _alias_: Optional[str] = None, _and_: bool = True, **kwargs: str) -> 'Conditional': """ Works like `eq`, but applies `LIKE`. Given parameters will be escaped and appended with wildcards (%) to execute prefix match. Args: _alias_: Table alias. _and_: Specifies concatenating logical operator is `AND` or `OR`. kwargs: Column names and parameters. Returns: Condition object. """ return _conditional("LIKE", _and_, {k: f"{escape_like(v)}%" for k, v in kwargs.items()}, None, _alias_) @classmethod def endswith(cls, _alias_: Optional[str] = None, _and_: bool = True, **kwargs: str) -> 'Conditional': """ Works like `eq`, but applies `LIKE`. Given parameters will be escaped and prepended with wildcards (%) to execute backward match. Args: _alias_: Table alias. _and_: Specifies concatenating logical operator is `AND` or `OR`. kwargs: Column names and parameters. Returns: Condition object. """ return _conditional("LIKE", _and_, {k: f"%{escape_like(v)}" for k, v in kwargs.items()}, None, _alias_) @classmethod def lt(cls, _alias_: Optional[str] = None, _and_: bool = True, **kwargs: Any) -> 'Conditional': """ Works like `eq`, but applies `<`. 
Args: _alias_: Table alias. _and_: Specifies concatenating logical operator is `AND` or `OR`. kwargs: Column names and parameters. Returns: Condition object. """ return _conditional("<", _and_, kwargs, None, _alias_) @classmethod def le(cls, _alias_: Optional[str] = None, _and_: bool = True, **kwargs: Any) -> 'Conditional': """ Works like `eq`, but applies `<=`. Args: _alias_: Table alias. _and_: Specifies concatenating logical operator is `AND` or `OR`. kwargs: Column names and parameters. Returns: Condition object. """ return _conditional("<=", _and_, kwargs, None, _alias_) @classmethod def gt(cls, _alias_: Optional[str] = None, _and_: bool = True, **kwargs: Any) -> 'Conditional': """ Works like `eq`, but applies `>`. Args: _alias_: Table alias. _and_: Specifies concatenating logical operator is `AND` or `OR`. kwargs: Column names and parameters. Returns: Condition object. """ return _conditional(">", _and_, kwargs, None, _alias_) @classmethod def ge(cls, _alias_: Optional[str] = None, _and_: bool = True, **kwargs: Any) -> 'Conditional': """ Works like `eq`, but applies `>=`. Args: _alias_: Table alias. _and_: Specifies concatenating logical operator is `AND` or `OR`. kwargs: Column names and parameters. Returns: Condition object. """ return _conditional(">=", _and_, kwargs, None, _alias_)Class variables- var Attribute
- 
Base class for protocol classes. Protocol classes are defined as:: class Proto(Protocol): def meth(self) -> int: ... Such classes are primarily used with static type checkers that recognize structural subtyping (static duck-typing), for example:: class C: def meth(self) -> int: return 0 def func(x: Proto) -> int: return x.meth() func(C()) # Passes static type check See PEP 544 for details. Protocol classes decorated with @typing.runtime_checkable act as simple-minded runtime protocols that check only the presence of given attributes, ignoring their type signatures. Protocol classes can be generic, they are defined as:: class GenProto(Protocol[T]): def meth(self) -> T: ...
- var CompositeAttribute
- 
Base class for protocol classes. Protocol classes are defined as:: class Proto(Protocol): def meth(self) -> int: ... Such classes are primarily used with static type checkers that recognize structural subtyping (static duck-typing), for example:: class C: def meth(self) -> int: return 0 def func(x: Proto) -> int: return x.meth() func(C()) # Passes static type check See PEP 544 for details. Protocol classes decorated with @typing.runtime_checkable act as simple-minded runtime protocols that check only the presence of given attributes, ignoring their type signatures. Protocol classes can be generic, they are defined as:: class GenProto(Protocol[T]): def meth(self) -> T: ...
- var NoAttribute
- 
Base class for protocol classes. Protocol classes are defined as:: class Proto(Protocol): def meth(self) -> int: ... Such classes are primarily used with static type checkers that recognize structural subtyping (static duck-typing), for example:: class C: def meth(self) -> int: return 0 def func(x: Proto) -> int: return x.meth() func(C()) # Passes static type check See PEP 544 for details. Protocol classes decorated with @typing.runtime_checkable act as simple-minded runtime protocols that check only the presence of given attributes, ignoring their type signatures. Protocol classes can be generic, they are defined as:: class GenProto(Protocol[T]): def meth(self) -> T: ...
 Static methods- def endswith(**kwargs: str) ‑> Conditional
- 
Works like eq, but applies LIKE. Given parameters will be escaped and prepended with wildcards (%) to execute backward match. Args- _alias_
- Table alias.
- _and_
- Specifies whether the concatenating logical operator is AND or OR.
- kwargs
- Column names and parameters.
 ReturnsCondition object. Expand source code@classmethod def endswith(cls, _alias_: Optional[str] = None, _and_: bool = True, **kwargs: str) -> 'Conditional': """ Works like `eq`, but applies `LIKE`. Given parameters will be escaped and prepended with wildcards (%) to execute backward match. Args: _alias_: Table alias. _and_: Specifies concatenating logical operator is `AND` or `OR`. kwargs: Column names and parameters. Returns: Condition object. """ return _conditional("LIKE", _and_, {k: f"%{escape_like(v)}" for k, v in kwargs.items()}, None, _alias_)
- def eq(**kwargs: Any) ‑> Conditional
- 
Creates a condition applying the = operator to columns. Args- _alias_
- Table alias.
- _and_
- Specifies whether the concatenating logical operator is AND or OR.
- kwargs
- Column names and parameters.
 ReturnsCondition object. Expand source code@classmethod def eq(cls, _alias_: Optional[str] = None, _and_: bool = True, **kwargs: Any) -> 'Conditional': """ Creates a condition applying `=` operator to columns. Args: _alias_: Table alias. _and_: Specifies concatenating logical operator is `AND` or `OR`. kwargs: Column names and parameters. Returns: Condition object. """ def is_null(col, val): if val is None: return f"{col} IS NULL", [] elif val is True: return f"{col}", [] elif val is False: return f"NOT {col}", [] return None return _conditional("=", _and_, kwargs, is_null, _alias_)
- def ge(**kwargs: Any) ‑> Conditional
- 
Works like eq, but applies >=. Args- _alias_
- Table alias.
- _and_
- Specifies whether the concatenating logical operator is AND or OR.
- kwargs
- Column names and parameters.
 ReturnsCondition object. Expand source code@classmethod def ge(cls, _alias_: Optional[str] = None, _and_: bool = True, **kwargs: Any) -> 'Conditional': """ Works like `eq`, but applies `>=`. Args: _alias_: Table alias. _and_: Specifies concatenating logical operator is `AND` or `OR`. kwargs: Column names and parameters. Returns: Condition object. """ return _conditional(">=", _and_, kwargs, None, _alias_)
- def gt(**kwargs: Any) ‑> Conditional
- 
Works like eq, but applies >. Args- _alias_
- Table alias.
- _and_
- Specifies whether the concatenating logical operator is AND or OR.
- kwargs
- Column names and parameters.
 ReturnsCondition object. Expand source code@classmethod def gt(cls, _alias_: Optional[str] = None, _and_: bool = True, **kwargs: Any) -> 'Conditional': """ Works like `eq`, but applies `>`. Args: _alias_: Table alias. _and_: Specifies concatenating logical operator is `AND` or `OR`. kwargs: Column names and parameters. Returns: Condition object. """ return _conditional(">", _and_, kwargs, None, _alias_)
- def in_(**kwargs: collections.abc.Sequence[typing.Any]) ‑> Conditional
- 
Works like eq, but applies IN. Args- _alias_
- Table alias.
- _and_
- Specifies whether the concatenating logical operator is AND or OR.
- kwargs
- Column names and parameters.
 ReturnsCondition object. Expand source code@classmethod def in_(cls, _alias_: Optional[str] = None, _and_: bool = True, **kwargs: Sequence[Any]) -> 'Conditional': """ Works like `eq`, but applies `IN`. Args: _alias_: Table alias. _and_: Specifies concatenating logical operator is `AND` or `OR`. kwargs: Column names and parameters. Returns: Condition object. """ def in_list(col, val): if len(val) == 0: return "1 = 0", [] else: holder = ', '.join(['$_'] * len(val)) return f"{col} IN ({holder})", val return _conditional("IN", _and_, kwargs, in_list, _alias_)
- def le(**kwargs: Any) ‑> Conditional
- 
Works like eq, but applies <=. Args- _alias_
- Table alias.
- _and_
- Specifies whether the concatenating logical operator is AND or OR.
- kwargs
- Column names and parameters.
 ReturnsCondition object. Expand source code@classmethod def le(cls, _alias_: Optional[str] = None, _and_: bool = True, **kwargs: Any) -> 'Conditional': """ Works like `eq`, but applies `<=`. Args: _alias_: Table alias. _and_: Specifies concatenating logical operator is `AND` or `OR`. kwargs: Column names and parameters. Returns: Condition object. """ return _conditional("<=", _and_, kwargs, None, _alias_)
- def like(**kwargs: str) ‑> Conditional
- 
Works like eq, but applies LIKE. Given parameters will be escaped and enclosed with wildcards (%) to execute partial match. Args- _alias_
- Table alias.
- _and_
- Specifies whether the concatenating logical operator is AND or OR.
- kwargs
- Column names and parameters.
 ReturnsCondition object. Expand source code@classmethod def like(cls, _alias_: Optional[str] = None, _and_: bool = True, **kwargs: str) -> 'Conditional': """ Works like `eq`, but applies `LIKE`. Given parameters will be escaped and enclosed with wildcards (%) to execute partial match. Args: _alias_: Table alias. _and_: Specifies concatenating logical operator is `AND` or `OR`. kwargs: Column names and parameters. Returns: Condition object. """ return _conditional("LIKE", _and_, {k: f"%{escape_like(v)}%" for k, v in kwargs.items()}, None, _alias_)
- def lt(**kwargs: Any) ‑> Conditional
- 
Works like eq, but applies <. Args- _alias_
- Table alias.
- _and_
- Specifies whether the concatenating logical operator is AND or OR.
- kwargs
- Column names and parameters.
 ReturnsCondition object. Expand source code@classmethod def lt(cls, _alias_: Optional[str] = None, _and_: bool = True, **kwargs: Any) -> 'Conditional': """ Works like `eq`, but applies `<`. Args: _alias_: Table alias. _and_: Specifies concatenating logical operator is `AND` or `OR`. kwargs: Column names and parameters. Returns: Condition object. """ return _conditional("<", _and_, kwargs, None, _alias_)
- def match(**kwargs: str) ‑> Conditional
- 
Works like eq, but applies LIKE. Given parameters will be passed to query without being escaped or enclosed. Args- _alias_
- Table alias.
- _and_
- Specifies whether the concatenating logical operator is AND or OR.
- kwargs
- Column names and parameters.
 ReturnsCondition object. Expand source code@classmethod def match(cls, _alias_: Optional[str] = None, _and_: bool = True, **kwargs: str) -> 'Conditional': """ Works like `eq`, but applies `LIKE`. Given parameters will be passed to query without being escaped or enclosed. Args: _alias_: Table alias. _and_: Specifies concatenating logical operator is `AND` or `OR`. kwargs: Column names and parameters. Returns: Condition object. """ return _conditional("LIKE", _and_, kwargs, None, _alias_)
- def neq(**kwargs: Any) ‑> Conditional
- 
Works like eq, but applies !=. Args- _alias_
- Table alias.
- _and_
- Specifies whether the concatenating logical operator is AND or OR.
- kwargs
- Column names and parameters.
 ReturnsCondition object. Expand source code@classmethod def neq(cls, _alias_: Optional[str] = None, _and_: bool = True, **kwargs: Any) -> 'Conditional': """ Works like `eq`, but applies `!=`. Args: _alias_: Table alias. _and_: Specifies concatenating logical operator is `AND` or `OR`. kwargs: Column names and parameters. Returns: Condition object. """ def is_null(col, val): if val is None: return f"{col} IS NOT NULL", [] elif val is True: return f"NOT {col}", [] elif val is False: return f"{col}", [] return None return _conditional("!=", _and_, kwargs, is_null, _alias_)
- def not_in(**kwargs: collections.abc.Sequence[typing.Any]) ‑> Conditional
- 
Works like eq, but applies NOT IN. Args- _alias_
- Table alias.
- _and_
- Specifies whether the concatenating logical operator is AND or OR.
- kwargs
- Column names and parameters.
 ReturnsCondition object. Expand source code@classmethod def not_in(cls, _alias_: Optional[str] = None, _and_: bool = True, **kwargs: Sequence[Any]) -> 'Conditional': """ Works like `eq`, but applies `NOT IN`. Args: _alias_: Table alias. _and_: Specifies concatenating logical operator is `AND` or `OR`. kwargs: Column names and parameters. Returns: Condition object. """ def in_list(col, val): if len(val) == 0: return "", [] else: holder = ', '.join(['$_'] * len(val)) return f"{col} NOT IN ({holder})", val return _conditional("NOT IN", _and_, kwargs, in_list, _alias_)
- def of(expression: str = '', *params: Any) ‑> Conditional
- 
Creates a condition directly from an expression and parameters. Args- expression
- Condition expression.
- params
- Parameters used in the condition.
 ReturnsCondition object. Expand source code@classmethod def of(cls, expression: str = "", *params: Any) -> 'Conditional': """ Creates a condition directly from an expression and parameters. Args: expression: Condition expression. params: Parameters used in the condition. Returns: Condition object. """ return Conditional(expression, list(params))
- def startswith(**kwargs: str) ‑> Conditional
- 
Works like eq, but applies LIKE. Given parameters will be escaped and appended with wildcards (%) to execute prefix match. Args- _alias_
- Table alias.
- _and_
- Specifies whether the concatenating logical operator is AND or OR.
- kwargs
- Column names and parameters.
 ReturnsCondition object. Expand source code@classmethod def startswith(cls, _alias_: Optional[str] = None, _and_: bool = True, **kwargs: str) -> 'Conditional': """ Works like `eq`, but applies `LIKE`. Given parameters will be escaped and appended with wildcards (%) to execute prefix match. Args: _alias_: Table alias. _and_: Specifies concatenating logical operator is `AND` or `OR`. kwargs: Column names and parameters. Returns: Condition object. """ return _conditional("LIKE", _and_, {k: f"{escape_like(v)}%" for k, v in kwargs.items()}, None, _alias_)
 
- class S
- 
An utility class to build NodeSerializer.This class provides factory class methods to create NodeSerializereach of which works in the same way as the method of the same name declared onNodeSerializer.Use them to supply NodeSerializers to functions to serialize a graph or to create a graph schema such asgraph_dict()orgraph_schema().graph_dict( graph, a = S.of(), b = S.head(), )Expand source codeclass S(metaclass=SerializerMeta): """ An utility class to build `NodeSerializer` . This class provides factory class methods to create `NodeSerializer` each of which works in the same way as the method of the same name declared on `NodeSerializer` . Use them to supply `NodeSerializer`s to functions to serialize a graph or to create a graph schema such as `graph_dict` or `graph_schema` . ```python graph_dict( graph, a = S.of(), b = S.head(), ) ``` """ @classmethod def of( cls, namer: Optional[Union[str, Callable[[str], str]]] = None, aggregator: Optional[Union[Callable[[list[Node]], Node], Callable[[list[Node]], list[Node]]]] = None, *serializers: Serializer, ) -> 'NodeSerializer': """ Create an instance of `NodeSerializer`. Args: namer: A string or naming function. aggregator: An aggregation function or an index of node to select in node container. serializer: A list of *serializer* s. Returns: Created `NodeSerializer` . """ return NodeSerializer(namer, aggregator, *serializers)Static methods- def of(namer: Union[str, Callable[[str], str], ForwardRef(None)] = None, aggregator: Union[Callable[[list[Node]], Node], Callable[[list[Node]], list[Node]], ForwardRef(None)] = None, *serializers: Callable[[ForwardRef('NodeContext')], Any]) ‑> NodeSerializer
- 
Create an instance of NodeSerializer. Args- namer
- A string or naming function.
- aggregator
- An aggregation function or an index of the node to select in the node container.
- serializers
- A list of serializers.
 ReturnsCreated NodeSerializer.Expand source code@classmethod def of( cls, namer: Optional[Union[str, Callable[[str], str]]] = None, aggregator: Optional[Union[Callable[[list[Node]], Node], Callable[[list[Node]], list[Node]]]] = None, *serializers: Serializer, ) -> 'NodeSerializer': """ Create an instance of `NodeSerializer`. Args: namer: A string or naming function. aggregator: An aggregation function or an index of node to select in node container. serializer: A list of *serializer* s. Returns: Created `NodeSerializer` . """ return NodeSerializer(namer, aggregator, *serializers)
 Methods- def alter(*args, **kwargs)
- 
Expand source codedef g(*args, **kwargs): ns = NodeSerializer() nf = getattr(ns, n) return nf(*args, **kwargs)
- def at(*args, **kwargs)
- 
Expand source codedef g(*args, **kwargs): ns = NodeSerializer() nf = getattr(ns, n) return nf(*args, **kwargs)
- def doc(*args, **kwargs)
- 
Expand source codedef g(*args, **kwargs): ns = NodeSerializer() nf = getattr(ns, n) return nf(*args, **kwargs)
- def each(*args, **kwargs)
- 
Expand source codedef g(*args, **kwargs): ns = NodeSerializer() nf = getattr(ns, n) return nf(*args, **kwargs)
- def fold(*args, **kwargs)
- 
Expand source codedef g(*args, **kwargs): ns = NodeSerializer() nf = getattr(ns, n) return nf(*args, **kwargs)
- def head(*args, **kwargs)
- 
Expand source codedef g(*args, **kwargs): ns = NodeSerializer() nf = getattr(ns, n) return nf(*args, **kwargs)
- def last(*args, **kwargs)
- 
Expand source codedef g(*args, **kwargs): ns = NodeSerializer() nf = getattr(ns, n) return nf(*args, **kwargs)
- def merge(*args, **kwargs)
- 
Expand source codedef g(*args, **kwargs): ns = NodeSerializer() nf = getattr(ns, n) return nf(*args, **kwargs)
- def name(*args, **kwargs)
- 
Expand source codedef g(*args, **kwargs): ns = NodeSerializer() nf = getattr(ns, n) return nf(*args, **kwargs)
- def select(*args, **kwargs)
- 
Expand source codedef g(*args, **kwargs): ns = NodeSerializer() nf = getattr(ns, n) return nf(*args, **kwargs)
- def sub(*args, **kwargs)
- 
Expand source codedef g(*args, **kwargs): ns = NodeSerializer() nf = getattr(ns, n) return nf(*args, **kwargs)
 
- class Table (name: str, columns: list[Column], comment: str = '')
- 
This class represents a schema of a table. Expand source codeclass Table: """ This class represents a schema of a table. """ def __init__(self, name: str, columns: list[Column], comment: str = ""): #: Table name. self.name = name #: Columns in the table. self.columns = columns #: Comment of the table. self.comment = comment def find(self, name: str) -> Optional[Column]: """ Find a column by name. Args: name: Column name. Returns: The column if exists, otherwise `None`. """ return next(filter(lambda c: c.name == name, self.columns), None)Instance variables- var columns
- 
Columns in the table. 
- var comment
- 
Comment of the table. 
- var name
- 
Table name. 
 Methods- def find(self, name: str) ‑> Optional[Column]
- 
Find a column by name. Args- name
- Column name.
 Returns: The column if it exists, otherwise None. Expand source codedef find(self, name: str) -> Optional[Column]: """ Find a column by name. Args: name: Column name. Returns: The column if exists, otherwise `None`. """ return next(filter(lambda c: c.name == name, self.columns), None)
 
- class Typeable
- 
An interface for a generic type which is resolved into a concrete type by a type parameter. Inherit this class and declare a static method whose signature is resolve(me, bound, arg, spec) -> type. >>> class A(Typeable[T]): >>> @staticmethod >>> def resolve(me, bound, arg, spec): >>> ... >>> return some_type >>> >>> Typeable.resolve(A[T], int, spec) Type resolution starts from Typeable.resolve(), which invokes the static method with the following arguments. - Type to resolve itself, in this case, A[T].
- A resolved type which replaces T. - arg is the first candidate.
- When arg is also Typeable, this resolution flow is applied to it recursively until a concrete type is determined.
 
- arg is passed through as it is.
- spec is passed through as it is.
 Expand source codeclass Typeable(Generic[T]): """ An interface for generic type which is resolved into a concrete type by a type parameter. Inherit this class and declare static method whose signature is `resolve(me, bound, arg, spec) -> type`. ```python >>> class A(Typeable[T]): >>> @staticmethod >>> def resolve(me, bound, arg, spec): >>> ... >>> return some_type >>> >>> Typeable.resolve(A[T], int, spec) ``` Type resolution starts from `Typeable.resolve` which invokes the static method with following arguments. - Type to resolve itself, in this case, `A[T]`. - A resolved type which replace `T`. - `arg` is the first candidate. - When `arg` is also `Typeable` , this resolution flow is applied to it recursively until concrete type if determined. - `arg` is passed through as it is. - `spec` is passed through as it is. """ @staticmethod def resolve(typeable, arg: type, spec: Any) -> type: """ Resolve a `Typeable` type into a concrete type by a type for its type parameter. Args: typeable: `Typeable` type having a generic type parameter. arg: Type to replace a type parameter. spec: An object containing information for schema generation. Returns: Resolved type. """ if get_origin(typeable) is Typeable: raise ValueError(f"Typeable should not be used directly. Use inheriting class instead.") bound = get_args(typeable)[0] if isinstance(bound, TypeVar): return Typeable.resolve(typeable[arg], arg, spec) elif issubgeneric(bound, Typeable): bound = Typeable.resolve(bound, arg, spec) return typeable.resolve(typeable, bound, arg, spec) else: return typeable.resolve(typeable, bound, arg, spec) @staticmethod def is_resolved(typeable: type['Typeable']) -> bool: """ Checks a type parameter of given `Typeable` is alredy resolved. Args: typeable: `Typeable` type having a generic type parameter. Returns: Whether the type parameter is already resolved or not. 
""" bound = get_args(typeable)[0] if isinstance(bound, TypeVar): return False elif issubgeneric(bound, Typeable): return Typeable.is_resolved(bound) else: return TrueAncestors- typing.Generic
 SubclassesStatic methods- def is_resolved(typeable: type['Typeable']) ‑> bool
- 
Checks whether the type parameter of the given Typeable is already resolved. Args- typeable
- Typeable type having a generic type parameter.
 ReturnsWhether the type parameter is already resolved or not. Expand source code@staticmethod def is_resolved(typeable: type['Typeable']) -> bool: """ Checks a type parameter of given `Typeable` is alredy resolved. Args: typeable: `Typeable` type having a generic type parameter. Returns: Whether the type parameter is already resolved or not. """ bound = get_args(typeable)[0] if isinstance(bound, TypeVar): return False elif issubgeneric(bound, Typeable): return Typeable.is_resolved(bound) else: return True
- def resolve(typeable, arg: type, spec: Any) ‑> type
- 
Resolve a Typeable type into a concrete type by a type for its type parameter. Args- typeable
- Typeable type having a generic type parameter.
- arg
- Type to replace a type parameter.
- spec
- An object containing information for schema generation.
 ReturnsResolved type. Expand source code@staticmethod def resolve(typeable, arg: type, spec: Any) -> type: """ Resolve a `Typeable` type into a concrete type by a type for its type parameter. Args: typeable: `Typeable` type having a generic type parameter. arg: Type to replace a type parameter. spec: An object containing information for schema generation. Returns: Resolved type. """ if get_origin(typeable) is Typeable: raise ValueError(f"Typeable should not be used directly. Use inheriting class instead.") bound = get_args(typeable)[0] if isinstance(bound, TypeVar): return Typeable.resolve(typeable[arg], arg, spec) elif issubgeneric(bound, Typeable): bound = Typeable.resolve(bound, arg, spec) return typeable.resolve(typeable, bound, arg, spec) else: return typeable.resolve(typeable, bound, arg, spec)
 
- Type to resolve itself, in this case,