Package pyracmon
Base module of pyracmon exporting commonly used objects.
Use `from pyracmon import *` to import them all at once.
>>> from pyracmon import *
Expand source code
"""
Base module of pyracmon exporting commonly used objects. Use `*` simply to import them.
>>> from pyracmon import *
"""
import sys
import types
from typing import Union, Optional, Any, TypeVar, TYPE_CHECKING
from pyracmon.config import default_config
from pyracmon.connection import connect, Connection
from pyracmon.context import ConnectionContext
from pyracmon.graph.serialize import NodeSerializer
from pyracmon.mixin import CRUDMixin
from pyracmon.select import read_row
from pyracmon.model import define_model, Table, Column
from pyracmon.model_graph import GraphEntityMixin
from pyracmon.query import Q, Expression, Conditional, escape_like, where
from pyracmon.query_graph import append_rows
from pyracmon.clause import order_by, ranged_by, holders, values
from pyracmon.stub import output_stub
from pyracmon.graph import new_graph, S
from pyracmon.graph.graph import Graph, GraphView, NodeContainer, ContainerView, Node, NodeView
from pyracmon.graph.spec import GraphSpec
from pyracmon.graph.template import GraphTemplate
from pyracmon.graph.schema import document_type, Typeable, GraphSchema
from pyracmon.graph.serialize import NodeContext
from pyracmon.graph.typing import walk_schema
from pyracmon.testing import TestingMixin
if TYPE_CHECKING:
    # For static type checkers only: re-declare `Model` as a local subclass of
    # the real `pyracmon.model.Model`. This gives analyzers a concrete,
    # subclassable symbol in this module's namespace without altering runtime
    # behavior.
    from pyracmon.model import Model as _Model
    class Model(_Model):
        pass
else:
    # At runtime, expose the actual `Model` type directly.
    from pyracmon.model import Model
# Public API exported by `from pyracmon import *`.
__all__ = [
    "connect",
    "Connection",
    "ConnectionContext",
    "CRUDMixin",
    "read_row",
    "define_model",
    "Table",
    "Column",
    "Q",
    "Expression",
    "Conditional",
    "where",
    "append_rows",
    "escape_like",
    "order_by",
    "ranged_by",
    "holders",
    "values",
    "new_graph",
    "S",
    "Graph",
    "GraphView",
    "NodeContainer",
    "ContainerView",
    "Node",
    "NodeView",
    "document_type",
    "Typeable",
    "walk_schema",
    "GraphSchema",
    "NodeContext",
    "Model",
    "declare_models",
    "graph_template",
    "graph_dict",
    "graph_schema",
]
# Type variable bound to `Model`, used by `declare_models` to propagate the
# concrete model type supplied via `model_type` into its return type.
M = TypeVar('M', bound=Model)
def declare_models(
    dialect: types.ModuleType,
    db: Connection,
    module: Union[types.ModuleType, str] = __name__,
    mixins: Optional[list[type]] = None,
    excludes: Optional[list[str]] = None,
    includes: Optional[list[str]] = None,
    *,
    testing: bool = False,
    model_type: type[M] = Model,
    write_stub: bool = False,
) -> list[type[M]]:
    """
    Declare model types read from database into the specified module.

    Args:
        dialect: A module exporting `read_schema` function and `mixins` classes.
            `pyracmon.dialect.postgresql` and `pyracmon.dialect.mysql` are available.
        db: Connection already connected to database.
        module: A module or module name where the declarations will be located.
        mixins: Additional mixin classes for declaring model types.
        excludes: Excluding table names.
        includes: Including table names. When this argument is omitted, all tables except for specified in `excludes` are declared.
        testing: When `True`, `TestingMixin` is prepended to base mixins so testing utilities are available on model types.
        model_type: Use this just for type hinting to determine returned model type.
        write_stub: When `True`, a type stub for the declared models is written via `output_stub`.
    Returns:
        Declared model types.
    """
    # Normalize here instead of using a mutable default argument, which would
    # be shared across calls.
    mixins = [] if mixins is None else mixins
    tables = dialect.read_schema(db, excludes, includes)
    models = []
    # Accept either a module object or an importable module name.
    mod = module if isinstance(module, types.ModuleType) else sys.modules[module]
    base_mixins = [CRUDMixin, GraphEntityMixin, model_type]
    if testing:
        # Prepend so that testing methods take precedence in the MRO.
        base_mixins[0:0] = [TestingMixin]
    for t in tables:
        m = define_model(t, mixins + dialect.mixins + base_mixins)
        # Publish the model type as an attribute of the target module so it
        # can be imported like a normal declaration.
        mod.__dict__[t.name] = m
        models.append(m)
    if write_stub:
        output_stub(None, mod, models, dialect, mixins, testing=testing)
    return models
def graph_template(*bases: GraphTemplate, **definitions: type) -> GraphTemplate:
    """
    Build a graph template using the default `GraphSpec`, which knows how to
    handle model objects appropriately.

    Refer to `pyracmon.graph.GraphSpec.new_template` for how `definitions`
    are interpreted.

    Args:
        bases: Base templates whose properties and relations are merged into new template.
        definitions: Definitions of template properties.
    Returns:
        Graph template.
    """
    spec = default_config().graph_spec
    return spec.new_template(*bases, **definitions)
def graph_dict(graph: GraphView, **settings: NodeSerializer) -> dict[str, Any]:
    """
    Turn a graph view into a plain `dict` using the default `GraphSpec`.

    Refer to `pyracmon.graph.GraphSpec.to_dict` for how serialization
    `settings` are interpreted.

    Args:
        graph: A view of the graph.
        settings: Serialization settings where each key denotes a node name.
    Returns:
        Serialization result.
    """
    spec = default_config().graph_spec
    return spec.to_dict(graph, {}, **settings)
def graph_schema(template: GraphTemplate, **settings: NodeSerializer) -> GraphSchema:
    """
    Build a `GraphSchema` from a template using the default `GraphSpec`.

    Refer to `pyracmon.graph.GraphSpec.to_schema` for how serialization
    `settings` are interpreted.

    Args:
        template: A template of serializing graph.
        settings: Serialization settings where each key denotes a node name.
    Returns:
        Schema of serialization result.
    """
    spec = default_config().graph_spec
    return spec.to_schema(template, **settings)
Sub-modules
pyracmon.batch
pyracmon.clause
-
This module provides functions to generate miscellaneous clauses in query.
pyracmon.config
-
This module exports types and functions for configurations …
pyracmon.connection
-
This module provides types and functions for DB connections.
pyracmon.context
-
This module provides the context type which controls query execution as configured.
pyracmon.dbapi
-
This module provides interfaces defined in DB-API 2.0.
pyracmon.dialect
pyracmon.graph
pyracmon.marker
-
This module provides the abstraction mechanism for marker creation used to embed parameters in a query …
pyracmon.mixin
-
This module provides mixin type which supplies each model type various DB operations as class methods.
pyracmon.model
pyracmon.model_graph
-
This module provides graph specifications to deal with model types …
pyracmon.query
-
This module exports types and functions for query construction …
pyracmon.query_graph
pyracmon.select
-
This module exports types and functions used for `SELECT` queries …
pyracmon.sql
-
This module provides the type for query generation from a template string containing unified marker.
pyracmon.stub
-
This module exports functions to output type stub of model types.
pyracmon.testing
pyracmon.util
-
Utility types and functions for internal use.
Functions
def append_rows(cursor: Cursor, exp: collections.abc.Iterable[typing.Union[Consumable, typing.Any]], graph: Graph, /, **assign: Union[Selection, Any]) ‑> Graph
-
Adds all rows in cursor into the graph.
Values in
assign
areSelection
or any kind of objects. IfSelection
is passed, the corresponding value in row is selected. In this case, theSelection
must be contained inexp
, otherwiseValueError
is raised.exp = ... c = db.stmt().execute(...) graph = add_all(c, exp, new_graph(SomeGraph), a=exp.a, b=exp.b, c=0)
Args
cursor
- Cursor obtained by query.
exp
- Expressions used in the query.
graph
- Graph to append rows.
assign
- Mapping from graph property name to
Selection
or arbitrary value.
Returns
The same graph as passed one.
Expand source code
def append_rows(cursor: Cursor, exp: Iterable[Union[Consumable, Any]], graph: Graph, /, **assign: Union[Selection, Any]) -> Graph: """ Adds all rows in cursor into the graph. Values in `assign` are `Selection` or any kind of objects. If `Selection` is passed, the corresponding value in row is selected. In this case, the `Selection` must be contained in `exp` , otherwise `ValueError` is raised. ```python exp = ... c = db.stmt().execute(...) graph = add_all(c, exp, new_graph(SomeGraph), a=exp.a, b=exp.b, c=0) ``` Args: cursor: Cursor obtained by query. exp: Expressions used in the query. graph: Graph to append rows. assign: Mapping from graph property name to `Selection` or arbitrary value. Returns: The same graph as passed one. """ def get(k: str) -> Any: v = assign[k] if isinstance(v, Consumable): return getattr(r, v.name) else: return v for row in cursor.fetchall(): r = read_row(row, *exp) graph.append(**{k:get(k) for k in assign.keys()}) return graph
def connect(api: module, *args: Any, **kwargs: Any) ‑> Connection
-
Connects to DB by passing arguments to DB-API 2.0 module.
Every optional argument is passed to
api.connect
and returns theConnection
object which wraps obtained DB connection.import psycopg2 from pyracmon import connect db = connect(psycopg2, host="localhost", port=5432, dbname="pyracmon", user="postgres", password="postgres") c = db.stmt().execute("SELECT 1") assert c.fetchone()[0] == 1
Args
api
- DB-API 2.0 module which exports
connect()
function. args
- Positional arguments passed to
api.connect
. kwargs
- Keyword arguments passed to
api.connect
.
Returns
Wrapper of DB-API 2.0 connection.
Expand source code
def connect(api: types.ModuleType, *args: Any, **kwargs: Any) -> 'Connection': """ Connects to DB by passing arguments to DB-API 2.0 module. Every optional argument is passed to `api.connect` and returns the `Connection` object which wraps obtained DB connection. ```python import psycopg2 from pyracmon import connect db = connect(psycopg2, host="localhost", port=5432, dbname="pyracmon", user="postgres", password="postgres") c = db.stmt().execute("SELECT 1") assert c.fetchone()[0] == 1 ``` Args: api: DB-API 2.0 module which exports `connect` function. args: Positional arguments passed to `api.connect`. kwargs: Keyword arguments passed to `api.connect`. Returns: Wrapper of DB-API 2.0 connection. """ return Connection(api, api.connect(*args, **kwargs), None)
def declare_models(dialect: module, db: Connection, module: Union[module, str] = 'pyracmon', mixins: list[type] = [], excludes: Optional[list[str]] = None, includes: Optional[list[str]] = None, *, testing: bool = False, model_type: type[~M] = pyracmon.model.Model, write_stub: bool = False) ‑> list[type[~M]]
-
Declare model types read from database into the specified module.
Args
dialect
- A module exporting
read_schema
function andmixins
classes.pyracmon.dialect.postgresql
andpyracmon.dialect.mysql
are available. db
- Connection already connected to database.
module
- A module or module name where the declarations will be located.
mixins
- Additional mixin classes for declaring model types.
excludes
- Excluding table names.
includes
- Including table names. When this argument is omitted, all tables except for specified in
excludes
are declared.
Returns
Declared model types.
Expand source code
def declare_models( dialect: types.ModuleType, db: Connection, module: Union[types.ModuleType, str] = __name__, mixins: list[type] = [], excludes: Optional[list[str]] = None, includes: Optional[list[str]] = None, *, testing: bool = False, model_type: type[M] = Model, write_stub: bool = False, ) -> list[type[M]]: """ Declare model types read from database into the specified module. Args: dialect: A module exporting `read_schema` function and `mixins` classes. `pyracmon.dialect.postgresql` and `pyracmon.dialect.mysql` are available. db: Connection already connected to database. module: A module or module name where the declarations will be located. mixins: Additional mixin classes for declaring model types. excludes: Excluding table names. includes: Including table names. When this argument is omitted, all tables except for specified in `excludes` are declared. Returns: Declared model types. """ tables = dialect.read_schema(db, excludes, includes) models = [] mod = module if isinstance(module, types.ModuleType) else sys.modules[module] base_mixins = [CRUDMixin, GraphEntityMixin, model_type] if testing: base_mixins[0:0] = [TestingMixin] for t in tables: m = define_model(t, mixins + dialect.mixins + base_mixins) mod.__dict__[t.name] = m models.append(m) if write_stub: output_stub(None, mod, models, dialect, mixins, testing=testing) return models
def define_model(table_: Table, mixins: Union[type[~MXT], list[type], ForwardRef(None)] = None, model_type: Optional[type[~M]] = pyracmon.model.Model) ‑> type[~M]
-
Create a model type representing a table.
Model type inherits all types in
mixins
in order. When the same attribute is defined in multiple mixin types, the former overrides the latter.Every model type has following attributes:
name type description name str
Name of the table. table Table
Table schema. columns List[Column]
List of column schemas. column Any
An object whose attribute exposes of column schema of its name. Model instances are created by passing the constructor keyword arguments composed of column names and values like builtin dataclass. Unlike dataclass, the constructor does not require all of columns. Omitted columns don't affect predefined operations such as
CRUDMixin.insert()
. Ifnot null
constraint exists on the column, insertion will be denied at runtime and exception will be thrown.>>> # CREATE TABLE t1 (col1 int, col2 text, col3 text); >>> table = define_model("t1") >>> model = table(col1=1, col2="a")
Attributes are also assignable by normal setter. If attribute name is not a valid column name,
TypeError
raises.>>> model.col3 = "b"
Model instance supports iteration which yields pairs of assigned column schema and its value.
>>> for c, v in model: >>> print(f"{c.name} = {v}") col1 = 1 col2 = a col3 = b
Args
table_
- Table schema.
mixins
- Mixin types providing class methods to the model type.
model_type
- Use this just for type hinting to determine returned model type.
Returns
Model type.
Expand source code
def define_model(table_: Table, mixins: Union[type[MXT], list[type], None] = None, model_type: Optional[type[M]] = Model) -> type[M]: """ Create a model type representing a table. Model type inherits all types in `mixins` in order. When the same attribute is defined in multiple mixin types, the former overrides the latter. Every model type has following attributes: |name|type|description| |:---|:---|:---| |name|`str`|Name of the table.| |table|`Table`|Table schema.| |columns|`List[Column]`|List of column schemas.| |column|`Any`|An object whose attribute exposes of column schema of its name.| Model instances are created by passing the constructor keyword arguments composed of column names and values like builtin dataclass. Unlike dataclass, the constructor does not require all of columns. Omitted columns don't affect predefined operations such as `CRUDMixin.insert` . If `not null` constraint exists on the column, insertion will be denied at runtime and exception will be thrown. ```python >>> # CREATE TABLE t1 (col1 int, col2 text, col3 text); >>> table = define_model("t1") >>> model = table(col1=1, col2="a") ``` Attributes are also assignable by normal setter. If attribute name is not a valid column name, `TypeError` raises. ```python >>> model.col3 = "b" ``` Model instance supports iteration which yields pairs of assigned column schema and its value. ```python >>> for c, v in model: >>> print(f"{c.name} = {v}") col1 = 1 col2 = a col3 = b ``` Args: table__: Table schema. mixin: Mixin types providing class methods to the model type. model_type: Use this just for type hinting to determine returned model type. Returns: Model type. 
""" column_names = {c.name for c in table_.columns} class Columns: def __init__(self): for c in table_.columns: setattr(self, c.name, c) class Meta(type): name = table_.name table = table_ columns = table_.columns column = Columns() @classmethod def shrink(cls, excludes: list[str], includes: Optional[list[str]] = None) -> Self: """ Creates new model type containing subset of columns. Args: excludes: Column names to exclude. includes: Column names to include. Returns: model type. """ cols = [c for c in cls.columns if (not includes or c.name in includes) and c.name not in excludes] return define_model(Table(cls.name, cols, cls.table.comment), mixins) # type: ignore class Base(Model, metaclass=Meta): pass mixin_types: list[type] = [] if isinstance(mixins, list): mixin_types = mixins elif get_origin(mixins) is not None: mixin_types = cast(list[type], list(get_args(mixins))) elif mixins is not None: raise ValueError(f"Model mixin types should be specified by Mixins or a list of types.") class _Model(type("ModelBase", tuple([Base] + mixin_types), {})): def __init__(self, **kwargs): for k, v in kwargs.items(): setattr(self, k, v) def __repr__(self): cls = cast(type[Base], type(self)) return f"{cls.name}({', '.join([f'{c.name}={repr(getattr(self, c.name))}' for c in cls.columns if hasattr(self, c.name)])})" def __str__(self): cls = cast(type[Base], type(self)) return f"{cls.name}({', '.join([f'{c.name}={str(getattr(self, c.name))}' for c in cls.columns if hasattr(self, c.name)])})" def __iter__(self) -> Iterator[tuple[Column, Any]]: cls = cast(type[Base], type(self)) return map(lambda c: (c, getattr(self, c.name)), filter(lambda c: hasattr(self, c.name), cls.columns)) def __setattr__(self, key, value): cls = cast(type[Base], type(self)) if key not in column_names: raise TypeError(f"{key} is not a column of {cls.name}") object.__setattr__(self, key, value) def __getitem__(self, key): return getattr(self, key) def __contains__(self, key): return hasattr(self, key) def 
__eq__(self, other): cls = type(self) if cls != type(other): return False for k in column_names: if hasattr(self, k) ^ hasattr(other, k): return False if getattr(self, k, None) != getattr(other, k, None): return False return True return cast(type[M], _Model)
def document_type(t: type, doc: str) ‑>
-
Supplies a document to a type.
Args
t
- A type.
doc
- A document.
Returns
Documented type.
Expand source code
def document_type(t: type, doc: str) -> Annotated: """ Supplies a document to a type. Args: t: A type. doc: A document. Returns: Documented type. """ return Annotated[t, doc]
def escape_like(v: str) ‑> str
-
Escape a string for the use in
LIKE
condition.Args
v
- A string.
Returns
Escaped string.
Expand source code
def escape_like(v: str) -> str: """ Escape a string for the use in `LIKE` condition. Args: v: A string. Returns: Escaped string. """ def esc(c): if c == "\\": return r"\\\\" elif c == "%": return r"\%" elif c == "_": return r"\_" else: return c return ''.join(map(esc, v))
def graph_dict(graph: GraphView, **settings: NodeSerializer) ‑> dict[str, typing.Any]
-
Serialize a graph into a
dict
under the defaultGraphSpec
.See
GraphSpec.to_dict()
for the detail of serialization settings.Args
graph
- A view of the graph.
settings
- Serialization settings where each key denotes a node name.
Returns
Serialization result.
Expand source code
def graph_dict(graph: GraphView, **settings: NodeSerializer) -> dict[str, Any]: """ Serialize a graph into a `dict` under the default `GraphSpec` . See `pyracmon.graph.GraphSpec.to_dict` for the detail of serialization settings. Args: graph: A view of the graph. settings: Serialization settings where each key denotes a node name. Returns: Serialization result. """ return default_config().graph_spec.to_dict(graph, {}, **settings)
def graph_schema(template: GraphTemplate, **settings: NodeSerializer) ‑> GraphSchema
-
Creates
GraphSchema
under the defaultGraphSpec
.See
GraphSpec.to_schema()
for the detail of serialization settings.Args
template
- A template of serializing graph.
settings
- Serialization settings where each key denotes a node name.
Returns
Schema of serialization result.
Expand source code
def graph_schema(template: GraphTemplate, **settings: NodeSerializer) -> GraphSchema: """ Creates `GraphSchema` under the default `GraphSpec` . See `pyracmon.graph.GraphSpec.to_schema` for the detail of serialization settings. Args: template: A template of serializing graph. settings: Serialization settings where each key denotes a node name. Returns: Schema of serialization result. """ return default_config().graph_spec.to_schema(template, **settings)
def graph_template(*bases: GraphTemplate, **definitions: type) ‑> GraphTemplate
-
Create a graph template on the default
GraphSpec
which handles model object in appropriate ways.See
GraphSpec.new_template()
for the detail of definitions.Args
bases
- Base templates whose properties and relations are merged into new template.
definitions
- Definitions of template properties.
Returns
Graph template.
Expand source code
def graph_template(*bases: GraphTemplate, **definitions: type) -> GraphTemplate: """ Create a graph template on the default `GraphSpec` which handles model object in appropriate ways. See `pyracmon.graph.GraphSpec.new_template` for the detail of definitions. Args: bases: Base templates whose properties and relations are merged into new template. definitions: Definitions of template properties. Returns: Graph template. """ return default_config().graph_spec.new_template(*bases, **definitions)
def holders(length_or_keys: Union[int, collections.abc.Sequence[Union[str, int, NoneType, Expression]]], qualifier: Optional[collections.abc.Mapping[int, collections.abc.Callable[[str], str]]] = None) ‑> str
-
Generates partial query string containing placeholder markers separated by comma.
Args
length_or_keys
- The number of placeholders or list of placeholder keys.
qualifier
- Qualifying function for each index.
Returns
Query string.
Expand source code
def holders(length_or_keys: Union[int, Sequence[HolderKeys]], qualifier: Optional[Mapping[int, Qualifier]] = None) -> str: """ Generates partial query string containing placeholder markers separated by comma. Args: length_or_keys: The number of placeholders or list of placeholder keys. qualifier: Qualifying function for each index. Returns: Query string. """ if isinstance(length_or_keys, int): hs = ["${_}"] * length_or_keys else: def key(k): if isinstance(k, Expression): return k.expression elif isinstance(k, int): return f"${{_{k}}}" elif k: return f"${{{k}}}" else: return "${_}" hs = [key(k) for k in length_or_keys] if qualifier: hs = [qualifier.get(i, _noop)(h) for i, h in enumerate(hs)] return ', '.join(hs)
def new_graph(template: GraphTemplate, *bases: Union[Graph, GraphView]) ‑> Graph
-
Create a graph from a template.
Use this function instead of invoking constructor directly.
Args
template
- A template of a graph.
bases
- Other graphs whose nodes are appended to created graph.
Returns
Created graph.
Expand source code
def new_graph(template: GraphTemplate, *bases: Union[Graph, GraphView]) -> Graph: """ Create a graph from a template. Use this function instead of invoking constructor directly. Args: template: A template of a graph. bases: Other graphs whose nodes are appended to created graph. Returns: Created graph. """ graph = Graph(template) for b in bases: graph += b return graph
def order_by(columns: collections.abc.Mapping[typing.Union[str, AliasedColumn], typing.Union[bool, tuple[bool, bool], str]], **defaults: Union[bool, tuple[bool, bool], str]) ‑> str
-
Generates
ORDER BY
clause from columns and directions.Args
columns
- Columns and directions. Iteration order is kept in rendered clause.
defaults
- Column names and directions appended to the clause if the column is not contained in
columns
.
Returns
ORDER BY
clause.Expand source code
def order_by(columns: Mapping[Union[str, AliasedColumn], ORDER], **defaults: ORDER) -> str: """ Generates `ORDER BY` clause from columns and directions. Args: columns: Columns and directions. Iteration order is kept in rendered clause. defaults: Column names and directions appended to the clause if the column is not contained in `columns` . Returns: `ORDER BY` clause. """ columns = dict(columns, **{c:v for c,v in defaults.items() if c not in columns}) def col(cd): if isinstance(cd[1], bool): return f"{cd[0]} ASC" if cd[1] else f"{cd[0]} DESC" elif isinstance(cd[1], str): return f"{cd[0]} {cd[1]}" elif isinstance(cd[1], tuple) and len(cd[1]) == 2: return f"{cd[0]} {'ASC' if cd[1][0] else 'DESC'} NULLS {'FIRST' if cd[1][1] else 'LAST'}" else: raise ValueError(f"Directions must be specified by bool, pair of bools or string: {cd[1]}") return '' if len(columns) == 0 else f"ORDER BY {', '.join(map(col, columns.items()))}"
def ranged_by(limit: Optional[int] = None, offset: Optional[int] = None) ‑> tuple[str, list[typing.Any]]
-
Generates
LIMIT OFFSET
clause using marker.Args
limit
- Limit value.
None
means no limitation. offset
- Offset value.
None
means0
.
Returns
LIMIT OFFSET
clause and its parameters.Expand source code
def ranged_by(limit: Optional[int] = None, offset: Optional[int] = None) -> tuple[str, list[Any]]: """ Generates `LIMIT OFFSET` clause using marker. Args: limit: Limit value. `None` means no limitation. offset: Offset value. `None` means `0`. Returns: `LIMIT OFFSET` clause and its parameters. """ clause, params = [], [] if limit is not None: clause.append("LIMIT $_") params.append(limit) if offset is not None: clause.append("OFFSET $_") params.append(offset) return ' '.join(clause) if clause else '', params
def read_row(row, *selections: Union[Consumable, str, tuple], allow_redundancy: bool = False) ‑> RowValues
-
Read values in a row according to given selections.
This function returns
RowValues
where each value is created by each selection respectively. The type of the selection determines how values in the row are handled:Selection
consumes as many values as the number of columns in it and creates a model instance.- Empty tuple or a string consumes a value, which is stored in
RowValues
as it is.
Args
selections
- List of selections or their equivalents.
allow_redundancy
- If
False
,ValueError
is thrown when not all values in a row are consumed.
Returns
Values read from the row according to the selections.
Expand source code
def read_row(row, *selections: Union[Consumable, str, tuple], allow_redundancy: bool = False) -> RowValues: """ Read values in a row according to given selections. This function returns `RowValues` where each value is created by each selection respectively. The type of the selection determines how values in the row are handled: - `Selection` consumes as many values as the number of columns in it and creates a model instance. - Empty tuple or a string consumes a value, which is stored in `RowValues` as it is. Args: selections: List of selections or their equivalents. allow_redundancy: If `False`, `ValueError` is thrown when not all values in a row are consumed. Returns: Values read from the row accoding to the selections. """ consumables = [Consumable.to_consumable(s) for s in selections] result = RowValues(consumables) for s in consumables: result.append(s.consume(row)) row = row[len(s):] if not allow_redundancy and len(row) > 0: raise ValueError("Not all elements in row is consumed.") return result
def values(length_or_key_gen: Union[int, collections.abc.Sequence[collections.abc.Callable[[int], Union[str, int, NoneType, Expression]]]], rows: int, qualifier: Optional[collections.abc.Mapping[int, collections.abc.Callable[[str], str]]] = None) ‑> str
-
Generates partial query string for
VALUES
clause in insertion query.Args
length_or_key_gen
- The number of placeholders or list of functions taking row index and returning key for each placeholder.
rows
- The number of rows to insert.
qualifier
- Qualifying function for each index.
Returns
Query string.
Expand source code
def values(length_or_key_gen: Union[int, Sequence[Callable[[int], HolderKeys]]], rows: int, qualifier: Optional[Mapping[int, Qualifier]] = None) -> str: """ Generates partial query string for `VALUES` clause in insertion query. Args: length_or_key_gen: The number of placeholders or list of functions taking row index and returning key for each placeholder. rows: The number of rows to insert. qualifier: Qualifying function for each index. Returns: Query string. """ if isinstance(length_or_key_gen, int): lok = lambda i: length_or_key_gen else: lok = lambda i: [g(i) for g in length_or_key_gen] # type: ignore return ', '.join([f"({holders(lok(i), qualifier)})" for i in range(rows)])
def walk_schema(td, with_doc=False) ‑> dict[str, typing.Union[type, typing.Annotated]]
-
Returns a dictionary as a result of walking a schema object from its root.
Args
td
- A schema represented by
TypedDict
. with_doc
- Flag to include documentations into result.
Returns
Key value representation of the schema. If
with_doc
isTrue
, each value isAnnotated
.Expand source code
def walk_schema(td, with_doc=False) -> dict[str, Union[type, Annotated]]: """ Returns a dictionary as a result of walking a schema object from its root. Args: td: A schema represented by `TypedDict`. with_doc: Flag to include documentations into result. Returns: Key value representation of the schema. If `with_doc` is `True`, each value is `Annotated`. """ if '__annotations__' not in td.__dict__: return {} result = {} def put(k, t, doc): if with_doc: result[k] = (t, doc) else: result[k] = t def expand(t): return (get_args(t)[0], lambda x:[x]) if issubgeneric(t, list) else (t, lambda x:x) for k, t in get_type_hints(td, include_extras=True).items(): t, doc = decompose_document(t) t, conv = expand(t) opt_type = is_optional(t) if is_typeddict(t): put(k, conv(walk_schema(t, with_doc)), doc) elif opt_type is not None and is_typeddict(opt_type): put(k, conv(walk_schema(opt_type, with_doc)), doc) else: put(k, conv(t), doc) return result
def where(condition: Conditional) ‑> tuple[str, list[typing.Any]]
-
Generates a
WHERE
clause and parameters representing given condition.If the condition is empty, returned clause is an empty string which does not contain
WHERE
keyword.Args
condition
- Condition object.
Returns
Tuple of
WHERE
clause and parameters.Expand source code
def where(condition: 'Conditional') -> tuple[str, list[Any]]: """ Generates a `WHERE` clause and parameters representing given condition. If the condition is empty, returned clause is an empty string which does not contain `WHERE` keyword. Args: condition: Condition object. Returns: Tuple of `WHERE` clause and parameters. """ return ('', []) if condition.expression == '' else (f'WHERE {condition.expression}', condition.params)
Classes
class CRUDMixin
-
Default mixin providing class methods available on all model types.
Every method takes the DB connection object as its first argument. Following arguments are defined in several methods commonly.
pks
- Names and values of all primary key columns in form of
dict
. - A primary key value. This form is allowed when the table has just one primary key column.
- e.g. If the table has a single primary key
id
of int,1
is available to specify the row of id = 1
. - e.g. If the table has multiple primary keys
intid
andstrid
,dict(intid=1, strid="abc")
is a valid argument.
- Names and values of all primary key columns in form of
record
- A model object or a mapping from column name to its value, which corresponds to a table row.
- Only columns contained in the record is affected by the operation.
- e.g. When
dict(c1=1, c2="abc")
is passed for insertion, onlyc1
andc2
are set in INSERT query. - e.g. For update, only the columns will be updated. Other columns are not affected.
condition
- Query condition which will compose WHERE clause.
Q
is a factory class to create condition object.- When
None
is passed, all rows are subject to the operation.
qualifier
- A mapping from column name to a function which qualifies a placeholder passed by an argument.
- Detail of qualifier function is described below.
lock
- This is reserved argument for locking statement but works just as the postfix of the query currently.
- The usage will be changed in future version.
Qualifier function is used typically to convert or replace placeholder marker in insert/update query. By default, those queries contain markers like
insert into t (c1, c2) values (?, ?)
(Q
parameter style). We need sometimes qualify markers to apply DB function, calculation, type cast and so on. This feature enables them like below.t.insert(db, dict(c1=1, c2=None), dict(c1=lambda x: f"{x}+1", c2=lambda x: "now()")) # SQL: INSERT INTO t (c1, c2) VALUES (?+1, now())
Be aware that when model object is passed, its column values may differ from actual values in DB after query.
Expand source code
class CRUDMixin(SelectMixin, CRUDInternalMeta):
    """
    Default mixin providing class methods available on all model types.

    Every method takes the DB connection object as its first argument. Following arguments are defined in several methods commonly.

    - `pks`
        - Names and values of all primary key columns in form of `dict`.
        - A primary key value. This form is allowed when the table has just one primary key column.
        - e.g. If the table has a single primary key `id` of int, `1` is available to specify the row of `id = 1`.
        - e.g. If the table has multiple primary keys `intid` and `strid`, `dict(intid=1, strid="abc")` is a valid argument.
    - `record`
        - A model object or a mapping from column name to its value, which corresponds to a table row.
        - Only columns contained in the record is affected by the operation.
        - e.g. When `dict(c1=1, c2="abc")` is passed for insertion, only `c1` and `c2` are set in INSERT query.
        - e.g. For update, only the columns will be updated. Other columns are not affected.
    - `condition`
        - Query condition which will compose WHERE clause.
        - `pyracmon.query.Q` is a factory class to create condition object.
        - When `None` is passed, all rows are subject to the operation.
    - `qualifier`
        - A mapping from column name to a function which qualifies a placeholder passed by an argument.
        - Detail of qualifier function is described below.
    - `lock`
        - This is reserved argument for locking statement but works just as the postfix of the query currently.
        - The usage will be changed in future version.

    Qualifier function is used typically to convert or replace placeholder marker in insert/update query. By default, those queries
    contain markers like `insert into t (c1, c2) values (?, ?)` (`Q` parameter style). We sometimes need to qualify markers to apply
    DB function, calculation, type cast and so on. This feature enables them like below.

    ```python
    t.insert(db, dict(c1=1, c2=None), dict(c1=lambda x: f"{x}+1", c2=lambda x: "now()"))
    # SQL: INSERT INTO t (c1, c2) VALUES (?+1, now())
    ```

    Be aware that when model object is passed, its column values may differ from actual values in DB after query.
    """
    @classmethod
    def count(cls, db: Connection, condition: Conditional = Q.of()) -> int:
        """
        Count rows which satisfy the condition.

        ```python
        t.count(db, Q.eq(c1=1))
        # SQL: SELECT COUNT(*) FROM t WHERE c1 = 1
        ```

        Args:
            db: DB connection.
            condition: Query condition.
        Returns:
            The number of rows.
        """
        wc, wp = where(condition)
        c = db.stmt().execute(f"SELECT COUNT(*) FROM {cls.name}{_spacer(wc)}", *wp)
        return c.fetchone()[0]  # type: ignore

    @classmethod
    def fetch(cls, db: Connection, pks: PKS, lock: Optional[Any] = None) -> Optional[Self]:
        """
        Fetch a record by primary key(s).

        ```python
        t.fetch(db, 1)
        # SQL: SELECT * FROM t WHERE id = 1
        ```

        Args:
            db: DB connection.
            pks: Primary key value(s).
            lock: Locking statement.
        Returns:
            A model object if exists, otherwise `None`.
        """
        cols, vals = parse_pks(cls, pks)
        cond = Conditional.all([Q.eq(**{c: v}) for c, v in zip(cols, vals)])
        wc, wp = where(cond)
        s = cls.select()
        c = db.stmt().execute(f"SELECT {s} FROM {cls.name}{_spacer(wc)}{_spacer(lock)}", *wp)
        row = c.fetchone()
        return read_row(row, *s)[0] if row else None

    @classmethod
    def fetch_many(cls, db: Connection, seq_pks: Sequence[PKS], lock: Optional[Any] = None, /, per_page: int = 1000) -> list[Self]:
        """
        Fetch records by a sequence of primary key(s).

        This method simply concatenates equality conditions on primary key by OR operator.

        ```python
        t.fetch_many(db, [1, 2, 3])
        # SQL: SELECT * FROM t WHERE id = 1 OR id = 2 OR id = 3
        ```

        Args:
            db: DB connection.
            seq_pks: Sequence of primary key value.
            lock: Locking statement.
            per_page: Maximum number of keys for an execution of query.
        Returns:
            Model objects in the same order as passed sequence.
        """
        res = []
        index = 0
        while index < len(seq_pks):
            ordered_pks = []
            cond = Q.of()
            for pks in seq_pks[index:index+per_page]:
                cols, vals = parse_pks(cls, pks)
                ordered_pks.append(tuple(v for v in vals))
                cond |= Conditional.all([Q.eq(**{c: v}) for c, v in zip(cols, vals)])
            wc, wp = where(cond)
            s = cls.select()
            c = db.stmt().execute(f"SELECT {s} FROM {cls.name}{_spacer(wc)}{_spacer(lock)}", *wp)
            record_map = {}
            for r in [read_row(row, *s)[0] for row in c.fetchall()]:
                pk_values = {c.name:v for c, v in r if c.pk}
                record_map[tuple([v for _, v in check_columns(cls, pk_values, lambda c: c.pk, True)])] = r
            # Preserve caller's key order; silently drop keys with no matching row.
            res.extend([record_map[k] for k in ordered_pks if k in record_map])
            index += per_page
        return res

    @classmethod
    def fetch_where(
        cls,
        db: Connection,
        condition: Conditional = Q.of(),
        orders: Mapping[Union[str, AliasedColumn], ORDER] = {},
        limit: Optional[int] = None,
        offset: Optional[int] = None,
        lock: Optional[Any] = None,
    ) -> list[Self]:
        """
        Fetch records which satisfy the condition.

        ```python
        t.fetch_where(db, Q.eq(c1=1), dict(c2=True), 10, 5)
        # SQL: SELECT * FROM t WHERE c1 = 1 ORDER BY c2 ASC LIMIT 10 OFFSET 5
        ```

        Args:
            db: DB connection.
            condition: Query condition.
            orders: Ordering specification where key is column name and value denotes whether the order is ascending or not.
            limit: Maximum number of rows to fetch. If `None`, all rows are returned.
            offset: The number of rows to skip.
            lock: Locking statement.
        Returns:
            Model objects.
        """
        wc, wp = where(condition)
        rc, rp = ranged_by(limit, offset)
        s = cls.select()
        c = db.stmt().execute(f"SELECT {s} FROM {cls.name}{_spacer(wc)}{_spacer(order_by(orders))}{_spacer(rc)}{_spacer(lock)}", *(wp + rp))
        return [read_row(row, *s)[0] for row in c.fetchall()]

    @classmethod
    def fetch_one(
        cls,
        db: Connection,
        condition: Conditional = Q.of(),
        lock: Optional[Any] = None,
    ) -> Optional[Self]:
        """
        Fetch a record which satisfies the condition.

        `ValueError` is raised when multiple records are found.
        Use this method for queries which certainly return a single row, such as search by unique key.

        ```python
        t.fetch_one(db, Q.eq(c1=1))
        # SQL: SELECT * FROM t WHERE c1 = 1
        ```

        Args:
            db: DB connection.
            condition: Query condition.
            lock: Locking statement.
        Returns:
            Model objects If exists, otherwise `None`.
        """
        rs = cls.fetch_where(db, condition, lock=lock)
        if not rs:
            return None
        elif len(rs) == 1:
            return rs[0]
        else:
            raise ValueError(f"{len(rs)} records are found on the invocation of fetch_one().")

    @classmethod
    def _insert_sql(cls, record: Union[Self, dict[str, Any]], qualifier: Mapping[str, Qualifier] = {}) -> tuple[str, list[str], list[Any]]:
        # Build "INSERT INTO ... VALUES ..." with qualified placeholders.
        # Expression values contribute their own SQL fragment and parameters.
        model: Self = cast(Self, record) if isinstance(record, cls) else cls(**cast(dict, record))
        value_dict = model_values(cls, model)
        check_columns(cls, value_dict)
        cols, vals = list(value_dict.keys()), list(value_dict.values())
        ordered_qs = key_to_index(qualifier, cols)
        def exp(v):
            # Bind v early; a plain lambda in the loop would late-bind.
            return lambda i: v
        if any(isinstance(v, Expression) for v in vals):
            key_gen = []
            org_vals = vals
            vals = []
            for v in org_vals:
                if isinstance(v, Expression):
                    key_gen.append(exp(v))
                    vals.extend(v.params)
                else:
                    key_gen.append(lambda i: None)
                    vals.append(v)
            values_clause = values(key_gen, 1, ordered_qs)
        else:
            values_clause = values(len(cols), 1, ordered_qs)
        return f"INSERT INTO {cls.name} ({', '.join(cols)}) VALUES {values_clause}", cols, vals

    @classmethod
    def insert(
        cls,
        db: Connection,
        record: Union[Self, dict[str, Any]],
        qualifier: Mapping[str, Qualifier] = {},
        /,
        returning: bool = False,
    ) -> Self:
        """
        Insert a record.

        If `returning` is `True` and the DBMS supports **RETURNING** clause, returned model object contains complete and correct
        column values. Otherwise, auto incremental value is set to the returned model object but other column values generated
        inside DBMS such as default value are not set.

        ```python
        t.insert(db, dict(c1=1, c2=2))
        # SQL: INSERT INTO t (c1, c2) VALUES (1, 2)
        ```

        Args:
            db: DB connection.
            record: Object contains column values.
            qualifier: Functions qualifying placeholder markers.
            returning: Flag to return inserted record with complete and correct column values.
        Returns:
            Model of inserted record.
        """
        model: Self = cast(Self, record) if isinstance(record, cls) else cls(**cast(dict, record))
        sql, _, vals = cls._insert_sql(record, qualifier)
        if returning:
            if cls.support_returning(db):
                c = db.stmt().execute(f"{sql} RETURNING *", *vals)
                s = cls.select()
                return read_row(c.fetchone(), *s)[0]
            else:
                # REVIEW
                # Inserted row can't be specified from the table where no primary keys are defined .
                pass
        db.stmt().execute(sql, *vals)
        for c, v in cls.last_sequences(db, 1):
            setattr(model, c.name, v)
        return model

    @classmethod
    @overload
    def insert_many(cls, db: Connection, records: list[Union[Self, dict[str, Any]]], qualifier: Mapping[str, Qualifier] = {}, /, returning: Literal[False] = False) -> list[Self]: ...
    @classmethod
    @overload
    def insert_many(cls, db: Connection, records: list[Union[Self, dict[str, Any]]], qualifier: Mapping[str, Qualifier] = {}, /, returning: Literal[True] = True) -> list[Self]: ...
    @classmethod
    def insert_many(
        cls,
        db: Connection,
        records: list[Union[Self, dict[str, Any]]],
        qualifier: Mapping[str, Qualifier] = {},
        /,
        returning: bool = False,
    ):
        """
        Insert records.

        If `returning` is `True` and the DBMS supports **RETURNING** clause, returned model object contains complete and correct
        column values. Otherwise, auto incremental value is set to the returned model object but other column values generated
        inside DBMS such as default value are not set.

        Args:
            db: DB connection.
            records: Objects containing column values.
            qualifier: Functions qualifying placeholder markers.
            returning: Flag to return inserted records with complete and correct column values.
        Returns:
            Models of inserted records.
        """
        if len(records) == 0:
            return []
        models: list[Self] = [cast(Self, r) if isinstance(r, cls) else cls(**cast(dict, r)) for r in records]
        seq_of_params = []
        sql, cols, params = cls._insert_sql(models[0], qualifier)
        cols = set(cols)
        seq_of_params.append(params)
        for m in models[1:]:
            value_dict = model_values(cls, m)
            # Every record must set exactly the columns used by the first record.
            check_columns(cls, value_dict, lambda c: c.name in cols, requires_all=True)
            # REVIEW:
            # The consistency among columns where expression is set is not checked.
            _, _, params = cls._insert_sql(m, qualifier)
            seq_of_params.append(params)
        db.stmt().executemany(sql, seq_of_params)
        num = len(records)
        # Back-fill sequential values: the driver reports the last value only,
        # earlier rows get decremented values.
        for c, v in cls.last_sequences(db, num):
            for i, m in enumerate(models):
                setattr(m, c.name, v - (num - i - 1))
        if returning:
            seq_pks = [extract_pks(cls, m) for m in models]
            return cls.fetch_many(db, seq_pks)
        else:
            return models

    @classmethod
    def _update_sql(cls, record: Record, condition: Conditional, qualifier: Mapping[str, Qualifier] = {}, allow_all: bool = True) -> tuple[str, list[str], list[Any]]:
        # Build "UPDATE ... SET ..." from non-PK columns found in the record.
        value_dict = model_values(cls, record, excludes_pk=True)
        check_columns(cls, value_dict)
        cols, vals = list(value_dict.keys()), list(value_dict.values())
        ordered_qs = key_to_index(qualifier, cols)
        def set_col(acc, icv):
            i, (c, v) = icv
            if isinstance(v, Expression):
                clause = f"{c} = {ordered_qs.get(i, lambda x:x)(v.expression)}"
                params = v.params
            else:
                clause = f"{c} = {ordered_qs.get(i, lambda x:x)('$_')}"
                params = [v]
            acc[0].append(clause)
            acc[1].extend(params)
            return acc
        setters, params = reduce(set_col, enumerate(zip(cols, vals)), ([], []))
        wc, wp = where(condition)
        if wc == "" and not allow_all:
            raise ValueError("Update query to update all records is not allowed.")
        return f"UPDATE {cls.name} SET {', '.join(setters)}{_spacer(wc)}", cols, params + wp

    @classmethod
    @overload
    def update(cls, db: Connection, pks: PKS, record: Record, qualifier: Mapping[str, Qualifier] = {}, /, returning: Literal[False] = False) -> bool: ...
    @classmethod
    @overload
    def update(cls, db: Connection, pks: PKS, record: Record, qualifier: Mapping[str, Qualifier] = {}, /, returning: Literal[True] = True) -> Optional[Self]: ...
    @classmethod
    def update(
        cls,
        db: Connection,
        pks: PKS,
        record: Record,
        qualifier: Mapping[str, Qualifier] = {},
        /,
        returning: bool = False,
    ):
        """
        Update a record by primary key(s).

        This method only updates columns which are found in `record` except for primary key(s).

        ```python
        t.update(db, 1, dict(c1=1, c2=2))
        # SQL: UPDATE t SET c1 = 1, c2 = 2 WHERE id = 1
        ```

        Args:
            db: DB connection.
            pks: Primary key value(s).
            record: Object contains column values.
            qualifier: Functions qualifying placeholder markers.
            returning: Flag to return updated records with complete and correct column values.
        Returns:
            Whether the record existed and was updated, or the updated record model.
        """
        cols, vals = parse_pks(cls, pks)
        condition = Conditional.all([Q.eq(**{c: v}) for c, v in zip(cols, vals)])
        if returning:
            if cls.support_returning(db):
                models = cls.update_where(db, record, condition, qualifier, returning=True)
                return models[0] if models else None
            else:
                # No RETURNING support: update first, then re-fetch by primary key.
                cls.update_where(db, record, condition, qualifier, returning=False)
                return cls.fetch(db, pks)
        else:
            return cls.update_where(db, record, condition, qualifier, returning=False) == 1

    @classmethod
    @overload
    def update_many(cls, db: Connection, records: Sequence[Record], qualifier: Mapping[str, Qualifier] = {}, /, returning: Literal[False] = False) -> int: ...
    @classmethod
    @overload
    def update_many(cls, db: Connection, records: Sequence[Record], qualifier: Mapping[str, Qualifier] = {}, /, returning: Literal[True] = True) -> list[Self]: ...
    @classmethod
    def update_many(
        cls,
        db: Connection,
        records: Sequence[Record],
        qualifier: Mapping[str, Qualifier] = {},
        /,
        returning: bool = False,
    ):
        """
        Update records by set of primary key(s).

        This method invokes `executemany` defined in DB-API 2.0.
        Whether it is optimized compared to `execute` depends on DB driver.

        Args:
            db: DB connection.
            records: Sequence of objects containing column values.
            qualifier: Functions qualifying placeholder markers.
            returning: Flag to return updated records with complete and correct column values.
        Returns:
            The number of affected rows or updated records.
        """
        if len(records) == 0:
            return [] if returning else 0
        keys = {c.name for c in cls.columns if c.pk}
        if len(keys) == 0:
            raise ValueError(f"update_many is not available because {cls} does not have primary key columns.")
        def classify(acc: tuple[dict[str, Any], dict[str, Any]], cv: tuple[str, Any]):
            # Split each (column, value) pair into primary keys and update targets.
            if cv[0] in keys:
                acc[0][cv[0]] = cv[1]
            else:
                acc[1][cv[0]] = cv[1]
            return acc
        seq_of_values: list[tuple[dict[str, Any], dict[str, Any]]] = []
        target_columns: Optional[set[str]] = None
        for vs in [model_values(cls, r, excludes_pk=False) for r in records]:
            # Strict subset: all PKs must be present plus at least one other column.
            if not keys < vs.keys():
                raise ValueError("Every row must contain values of all primary keys and at least one update column value.")
            pks, rec = reduce(classify, vs.items(), ({}, {}))
            if target_columns is None:
                check_columns(cls, rec)
                target_columns = set(rec.keys())
            else:
                check_columns(cls, rec, lambda c: c.name in target_columns, True)  # type: ignore
            seq_of_values.append((pks, rec))
        sql_first = ""
        seq_of_params: list[list[Any]] = []
        for pks, rec in seq_of_values:
            cols, vals = parse_pks(cls, pks)
            condition = Conditional.all([Q.eq(**{c: v}) for c, v in zip(cols, vals)])
            sql, _, params = cls._update_sql(rec, condition, qualifier)
            if not sql_first:
                sql_first = sql
            seq_of_params.append(params)
        if returning:
            db.stmt().executemany(sql_first, seq_of_params)
            return cls.fetch_many(db, [pks for pks, _ in seq_of_values])
        else:
            return db.stmt().executemany(sql_first, seq_of_params).rowcount

    @classmethod
    @overload
    def update_where(cls, db: Connection, record: Record, condition: Conditional, qualifier: Mapping[str, Qualifier] = {}, /, returning: Literal[False] = False, allow_all: bool = False) -> int: ...
    @classmethod
    @overload
    def update_where(cls, db: Connection, record: Record, condition: Conditional, qualifier: Mapping[str, Qualifier] = {}, /, returning: Literal[True] = True, allow_all: bool = False) -> list[Self]: ...
    @classmethod
    def update_where(
        cls,
        db: Connection,
        record: Record,
        condition: Conditional,
        qualifier: Mapping[str, Qualifier] = {},
        /,
        returning: bool = False,
        allow_all: bool = True,
    ):
        """
        Update records which satisfy the condition.

        ```python
        t.update_where(db, dict(c2=2), Q.eq(c1=1))
        # SQL: UPDATE t SET c2 = 2 WHERE c1 = 1
        ```

        Args:
            db: DB connection.
            record: Object contains column values.
            condition: Query condition.
            qualifier: Functions qualifying placeholder markers.
            returning: Flag to return updated records with complete and correct column values.
            allow_all: If `False`, empty condition raises `ValueError`.
        Returns:
            The number of affected rows or updated records.
        """
        sql, _, params = cls._update_sql(record, condition, qualifier, allow_all)
        if returning:
            if cls.support_returning(db):
                c = db.stmt().execute(f"{sql} RETURNING *", *params)
                s = cls.select()
                return [read_row(row, *s)[0] for row in c.fetchall()]
            else:
                raise NotImplementedError(f"RETURNING is not supported and there is no way to fetch updated rows exactly.")
        else:
            c = db.stmt().execute(sql, *params)
            return c.rowcount

    @classmethod
    @overload
    def delete(cls, db: Connection, pks: PKS, /, returning: Literal[False] = False) -> bool: ...
    @classmethod
    @overload
    def delete(cls, db: Connection, pks: PKS, /, returning: Literal[True] = True) -> Optional[Self]: ...
    @classmethod
    def delete(cls, db: Connection, pks: PKS, /, returning: bool = False):
        """
        Delete a record by primary key(s).

        ```python
        t.delete(db, 1)
        # SQL: DELETE FROM t WHERE id = 1
        ```

        Args:
            db: DB connection.
            pks: Primary key value(s).
            returning: Flag to return deleted record if any.
        Returns:
            Whether the record existed and was deleted, or the deleted record if any.
        """
        cols, vals = parse_pks(cls, pks)
        if returning:
            models = cls.delete_where(db, Conditional.all([Q.eq(**{c: v}) for c, v in zip(cols, vals)]), returning=True)
            return models[0] if models else None
        else:
            return cls.delete_where(db, Conditional.all([Q.eq(**{c: v}) for c, v in zip(cols, vals)])) == 1

    @classmethod
    @overload
    def delete_many(cls, db: Connection, pks: Union[Sequence[PKS], Sequence[Record]], /, returning: Literal[False] = False) -> int: ...
    @classmethod
    @overload
    def delete_many(cls, db: Connection, pks: Union[Sequence[PKS], Sequence[Record]], /, returning: Literal[True] = True) -> list[Self]: ...
    @classmethod
    def delete_many(cls, db: Connection, pks: Union[Sequence[PKS], Sequence[Record]], /, returning: bool = False):
        """
        Delete records by a set of primary key(s).

        This method invokes `executemany` defined in DB-API 2.0.
        Whether it is optimized compared to `execute` depends on DB driver.

        Args:
            db: DB connection.
            pks: Primary keys or objects each of which contains values of all primary keys.
            returning: Flag to return deleted records.
        Returns:
            The number of affected rows or deleted records.
        """
        # Fix: empty input previously returned None, contradicting the declared
        # overload returns (int / list). Mirror update_many's behavior instead.
        if len(pks) == 0:
            return [] if returning else 0
        seq_of_pks: list[dict[str, Any]] = []
        for rec in pks:
            if isinstance(rec, (dict, cls)):
                seq_of_pks.append(extract_pks(cls, rec))
            else:
                cols, vals = parse_pks(cls, rec)
                seq_of_pks.append(dict(zip(cols, vals)))
        condition = Conditional.all([Q.eq(**{c: v}) for c, v in seq_of_pks[0].items()])
        wc, wp = where(condition)
        sql = f"DELETE FROM {cls.name}{_spacer(wc)}"
        seq_of_params: list[list[Any]] = [wp]
        for key_values in seq_of_pks[1:]:
            condition = Conditional.all([Q.eq(**{c: v}) for c, v in key_values.items()])
            _, wp = where(condition)
            seq_of_params.append(wp)
        if returning:
            # Snapshot rows before deleting since executemany cannot RETURNING.
            models = cls.fetch_many(db, seq_of_pks)
            db.stmt().executemany(sql, seq_of_params)
            return models
        else:
            return db.stmt().executemany(sql, seq_of_params).rowcount

    @classmethod
    @overload
    def delete_where(cls, db: Connection, condition: Conditional, /, returning: Literal[False] = False, allow_all: bool = True) -> int: ...
    @classmethod
    @overload
    def delete_where(cls, db: Connection, condition: Conditional, /, returning: Literal[True] = True, allow_all: bool = True) -> list[Self]: ...
    @classmethod
    def delete_where(cls, db: Connection, condition: Conditional, /, returning: bool = False, allow_all: bool = True):
        """
        Delete records which satisfy the condition.

        ```python
        t.delete_where(db, Q.eq(c1=1))
        # SQL: DELETE FROM t WHERE c1 = 1
        ```

        Args:
            db: DB connection.
            condition: Query condition.
            returning: Flag to return deleted records.
            allow_all: If `False`, empty condition raises `ValueError`.
        Returns:
            The number of affected rows or deleted records.
        """
        wc, wp = where(condition)
        if wc == "" and not allow_all:
            raise ValueError("Delete query to delete all records is not allowed.")
        sql = f"DELETE FROM {cls.name}{_spacer(wc)}"
        if returning:
            if cls.support_returning(db):
                c = db.stmt().execute(f"{sql} RETURNING *", *wp)
                return [read_row(row, *cls.select())[0] for row in c.fetchall()]
            else:
                # No RETURNING support: snapshot matching rows, then delete.
                current = cls.fetch_where(db, condition)
                db.stmt().execute(sql, *wp)
                return current
        else:
            return db.stmt().execute(sql, *wp).rowcount

    @classmethod
    def last_sequences(cls, db: Connection, num: int) -> list[tuple[Column, int]]:
        """
        Returns the sequential (auto incremental) values of a table generated by the latest insertion.

        Result contains every sequential columns and their values.
        When the latest query inserts multiple rows, only the last (= biggest) value is returned.

        This method should be overridden by another mixin class defined in dialect module.

        Args:
            db: DB connection.
            num: The number of records inserted by the latest query.
        Returns:
            List of pairs of column and its values.
        """
        return []

    @classmethod
    def support_returning(cls, db: Connection) -> bool:
        """
        Checks whether this DBMS supports **RETURNING** clause or not.

        Args:
            db: DB connection.
        Returns:
            Whether this DBMS supports **RETURNING** clause or not.
        """
        return False
Ancestors
Static methods
def count(db: Connection, condition: Conditional = Condition: '' -- []) ‑> int
-
Count rows which satisfy the condition.
t.count(db, Q.eq(c1=1)) # SQL: SELECT COUNT(*) FROM t WHERE c1 = 1
Args
db
- DB connection.
condition
- Query condition.
Returns
The number of rows.
Expand source code
@classmethod
def count(cls, db: Connection, condition: Conditional = Q.of()) -> int:
    """
    Count the rows matching `condition`.

    ```python
    t.count(db, Q.eq(c1=1))
    # SQL: SELECT COUNT(*) FROM t WHERE c1 = 1
    ```

    Args:
        db: DB connection.
        condition: Query condition.
    Returns:
        The number of rows.
    """
    clause, params = where(condition)
    cursor = db.stmt().execute(f"SELECT COUNT(*) FROM {cls.name}{_spacer(clause)}", *params)
    return cursor.fetchone()[0]  # type: ignore
def delete(db: Connection, pks: Union[Any, dict[str, Any]], /, returning: bool = False)
-
Delete a record by primary key(s).
t.delete(db, 1) # SQL: DELETE FROM t WHERE id = 1
Args
db
- DB connection.
pks
- Primary key value(s).
returning
- Flag to return deleted record if any.
Returns
Whether the record existed and was deleted, or the deleted record if any.
Expand source code
@classmethod
def delete(cls, db: Connection, pks: PKS, /, returning: bool = False):
    """
    Delete a single record identified by its primary key(s).

    ```python
    t.delete(db, 1)
    # SQL: DELETE FROM t WHERE id = 1
    ```

    Args:
        db: DB connection.
        pks: Primary key value(s).
        returning: Flag to return deleted record if any.
    Returns:
        Whether the record existed and was deleted, or the deleted record if any.
    """
    names, key_values = parse_pks(cls, pks)
    condition = Conditional.all([Q.eq(**{n: v}) for n, v in zip(names, key_values)])
    if returning:
        deleted = cls.delete_where(db, condition, returning=True)
        return deleted[0] if deleted else None
    return cls.delete_where(db, condition) == 1
def delete_many(db: Connection, pks: Union[collections.abc.Sequence[Union[Any, dict[str, Any]]], collections.abc.Sequence[Union[Meta, dict[str, Any]]]], /, returning: bool = False)
-
Delete records by a set of primary key(s).
This method invokes on
executemany
defined in DB-API 2.0. Whether it is optimized compared to execute
depends on DB driver.Args
db
- DB connection.
pks
- Primary keys or objects each of which contains values of all primary keys.
returning
- Flag to return deleted records.
Returns
The number of affected rows or deleted records.
Expand source code
@classmethod
def delete_many(cls, db: Connection, pks: Union[Sequence[PKS], Sequence[Record]], /, returning: bool = False):
    """
    Delete records by a set of primary key(s).

    This method invokes `executemany` defined in DB-API 2.0.
    Whether it is optimized compared to `execute` depends on DB driver.

    Args:
        db: DB connection.
        pks: Primary keys or objects each of which contains values of all primary keys.
        returning: Flag to return deleted records.
    Returns:
        The number of affected rows or deleted records.
    """
    # Fix: empty input previously returned None, contradicting the declared
    # overload returns (int / list). Mirror update_many's empty-case behavior.
    if len(pks) == 0:
        return [] if returning else 0
    seq_of_pks: list[dict[str, Any]] = []
    for rec in pks:
        if isinstance(rec, (dict, cls)):
            seq_of_pks.append(extract_pks(cls, rec))
        else:
            cols, vals = parse_pks(cls, rec)
            seq_of_pks.append(dict(zip(cols, vals)))
    condition = Conditional.all([Q.eq(**{c: v}) for c, v in seq_of_pks[0].items()])
    wc, wp = where(condition)
    sql = f"DELETE FROM {cls.name}{_spacer(wc)}"
    seq_of_params: list[list[Any]] = [wp]
    # Renamed loop variable (was `v`, shadowed by the inner comprehension's `v`).
    for key_values in seq_of_pks[1:]:
        condition = Conditional.all([Q.eq(**{c: v}) for c, v in key_values.items()])
        _, wp = where(condition)
        seq_of_params.append(wp)
    if returning:
        # Snapshot rows before deleting since executemany cannot use RETURNING.
        models = cls.fetch_many(db, seq_of_pks)
        db.stmt().executemany(sql, seq_of_params)
        return models
    else:
        return db.stmt().executemany(sql, seq_of_params).rowcount
def delete_where(db: Connection, condition: Conditional, /, returning: bool = False, allow_all: bool = True)
-
Delete records which satisfy the condition.
t.delete(db, Q.eq(c1=1)) # SQL: DELETE FROM t WHERE c1 = 1
Args
db
- DB connection.
condition
- Query condition.
returning
- Flag to return deleted records.
allow_all
- If
False
, empty condition raisesValueError
.
Returns
The number of affected rows or deleted records.
Expand source code
@classmethod
def delete_where(cls, db: Connection, condition: Conditional, /, returning: bool = False, allow_all: bool = True):
    """
    Delete every record matching `condition`.

    ```python
    t.delete(db, Q.eq(c1=1))
    # SQL: DELETE FROM t WHERE c1 = 1
    ```

    Args:
        db: DB connection.
        condition: Query condition.
        returning: Flag to return deleted records.
        allow_all: If `False`, empty condition raises `ValueError`.
    Returns:
        The number of affected rows or deleted records.
    """
    clause, params = where(condition)
    if not allow_all and clause == "":
        raise ValueError("Delete query to delete all records is not allowed.")
    sql = f"DELETE FROM {cls.name}{_spacer(clause)}"
    if not returning:
        return db.stmt().execute(sql, *params).rowcount
    if cls.support_returning(db):
        cursor = db.stmt().execute(f"{sql} RETURNING *", *params)
        return [read_row(row, *cls.select())[0] for row in cursor.fetchall()]
    # Without RETURNING support, snapshot the matching rows before deletion.
    snapshot = cls.fetch_where(db, condition)
    db.stmt().execute(sql, *params)
    return snapshot
def fetch(db: Connection, pks: Union[Any, dict[str, Any]], lock: Optional[Any] = None) ‑> Optional[typing_extensions.Self]
-
Fetch a record by primary key(s).
t.fetch(db, 1) # SQL: SELECT * FROM t WHERE id = 1
Args
db
- DB connection.
pks
- Primary key value(s).
lock
- Locking statement.
Returns
A model object if exists, otherwise
None
.Expand source code
@classmethod
def fetch(cls, db: Connection, pks: PKS, lock: Optional[Any] = None) -> Optional[Self]:
    """
    Fetch a single record identified by its primary key(s).

    ```python
    t.fetch(db, 1)
    # SQL: SELECT * FROM t WHERE id = 1
    ```

    Args:
        db: DB connection.
        pks: Primary key value(s).
        lock: Locking statement.
    Returns:
        A model object if exists, otherwise `None`.
    """
    names, key_values = parse_pks(cls, pks)
    condition = Conditional.all([Q.eq(**{n: v}) for n, v in zip(names, key_values)])
    clause, params = where(condition)
    selection = cls.select()
    cursor = db.stmt().execute(f"SELECT {selection} FROM {cls.name}{_spacer(clause)}{_spacer(lock)}", *params)
    row = cursor.fetchone()
    if not row:
        return None
    return read_row(row, *selection)[0]
def fetch_many(db: Connection, seq_pks: collections.abc.Sequence[typing.Union[typing.Any, dict[str, typing.Any]]], lock: Optional[Any] = None, /, per_page: int = 1000) ‑> list[typing_extensions.Self]
-
Fetch records by a sequence of primary key(s).
This method simply concatenates equality conditions on primary key by OR operator.
t.fetch_many(db, [1, 2, 3]) # SQL: SELECT * FROM t WHERE id = 1 OR id = 2 OR id = 3
Args
db
- DB connection.
seq_pks
- Sequence of primary key value.
lock
- Locking statement.
per_page
- Maximum number of keys for an execution of query.
Returns
Model objects in the same order as passed sequence.
Expand source code
@classmethod
def fetch_many(cls, db: Connection, seq_pks: Sequence[PKS], lock: Optional[Any] = None, /, per_page: int = 1000) -> list[Self]:
    """
    Fetch the records identified by a sequence of primary key(s).

    Equality conditions on primary key are simply concatenated with the OR operator,
    issuing one query per `per_page` keys.

    ```python
    t.fetch_many(db, [1, 2, 3])
    # SQL: SELECT * FROM t WHERE id = 1 OR id = 2 OR id = 3
    ```

    Args:
        db: DB connection.
        seq_pks: Sequence of primary key value.
        lock: Locking statement.
        per_page: Maximum number of keys for an execution of query.
    Returns:
        Model objects in the same order as passed sequence.
    """
    found: list[Self] = []
    for start in range(0, len(seq_pks), per_page):
        key_order = []
        condition = Q.of()
        for pks in seq_pks[start:start + per_page]:
            names, key_values = parse_pks(cls, pks)
            key_order.append(tuple(key_values))
            condition |= Conditional.all([Q.eq(**{n: v}) for n, v in zip(names, key_values)])
        clause, params = where(condition)
        selection = cls.select()
        cursor = db.stmt().execute(f"SELECT {selection} FROM {cls.name}{_spacer(clause)}{_spacer(lock)}", *params)
        by_key = {}
        for record in [read_row(row, *selection)[0] for row in cursor.fetchall()]:
            pk_values = {col.name: val for col, val in record if col.pk}
            key = tuple([v for _, v in check_columns(cls, pk_values, lambda c: c.pk, True)])
            by_key[key] = record
        # Preserve the caller's key order; keys with no matching row are skipped.
        found.extend([by_key[k] for k in key_order if k in by_key])
    return found
def fetch_one(db: Connection, condition: Conditional = Condition: '' -- [], lock: Optional[Any] = None) ‑> Optional[typing_extensions.Self]
-
Fetch a record which satisfies the condition.
ValueError
is raised when multiple records are found. Use this method for queries which certainly return a single row, such as search by unique key. t.fetch_one(db, Q.eq(c1=1)) # SQL: SELECT * FROM t WHERE c1 = 1
Args
db
- DB connection.
condition
- Query condition.
lock
- Locking statement.
Returns
Model objects If exists, otherwise
None
.Expand source code
@classmethod
def fetch_one(
    cls,
    db: Connection,
    condition: Conditional = Q.of(),
    lock: Optional[Any] = None,
) -> Optional[Self]:
    """
    Fetch the single record matching `condition`.

    `ValueError` is raised when multiple records are found.
    Use this method for queries which certainly return a single row, such as search by unique key.

    ```python
    t.fetch_one(db, Q.eq(c1=1))
    # SQL: SELECT * FROM t WHERE c1 = 1
    ```

    Args:
        db: DB connection.
        condition: Query condition.
        lock: Locking statement.
    Returns:
        Model objects If exists, otherwise `None`.
    """
    found = cls.fetch_where(db, condition, lock=lock)
    if len(found) > 1:
        raise ValueError(f"{len(found)} records are found on the invocation of fetch_one().")
    return found[0] if found else None
def fetch_where(db: Connection, condition: Conditional = Condition: '' -- [], orders: collections.abc.Mapping[typing.Union[str, AliasedColumn], typing.Union[bool, tuple[bool, bool], str]] = {}, limit: Optional[int] = None, offset: Optional[int] = None, lock: Optional[Any] = None) ‑> list[typing_extensions.Self]
-
Fetch records which satisfy the condition.
t.fetch_where(db, Q.eq(c1=1), dict(c2=True), 10, 5) # SQL: SELECT * FROM t WHERE c1 = 1 ORDER BY c2 ASC LIMIT 10 OFFSET 5
Args
db
- DB connection.
condition
- Query condition.
orders
- Ordering specification where key is column name and value denotes whether the order is ascending or not.
limit
- Maximum number of rows to fetch. If
None
, all rows are returned. offset
- The number of rows to skip.
lock
- Locking statement.
Returns
Model objects.
Expand source code
@classmethod
def fetch_where(
    cls,
    db: Connection,
    condition: Conditional = Q.of(),
    orders: Mapping[Union[str, AliasedColumn], ORDER] = {},
    limit: Optional[int] = None,
    offset: Optional[int] = None,
    lock: Optional[Any] = None,
) -> list[Self]:
    """
    Fetch every record matching `condition`, optionally ordered and paginated.

    ```python
    t.fetch_where(db, Q.eq(c1=1), dict(c2=True), 10, 5)
    # SQL: SELECT * FROM t WHERE c1 = 1 ORDER BY c2 ASC LIMIT 10 OFFSET 5
    ```

    Args:
        db: DB connection.
        condition: Query condition.
        orders: Ordering specification where key is column name and value denotes whether the order is ascending or not.
        limit: Maximum number of rows to fetch. If `None`, all rows are returned.
        offset: The number of rows to skip.
        lock: Locking statement.
    Returns:
        Model objects.
    """
    where_clause, where_params = where(condition)
    range_clause, range_params = ranged_by(limit, offset)
    selection = cls.select()
    sql = (
        f"SELECT {selection} FROM {cls.name}"
        f"{_spacer(where_clause)}{_spacer(order_by(orders))}{_spacer(range_clause)}{_spacer(lock)}"
    )
    cursor = db.stmt().execute(sql, *(where_params + range_params))
    return [read_row(row, *selection)[0] for row in cursor.fetchall()]
def insert(db: Connection, record: Union[typing_extensions.Self, dict[str, Any]], qualifier: collections.abc.Mapping[str, collections.abc.Callable[[str], str]] = {}, /, returning: bool = False) ‑> typing_extensions.Self
-
Insert a record.
If
returning
isTrue
and the DBMS supports RETURNING clause, returned model object contains complete and correct column values. Otherwise, auto incremental value is set to the returned model object but other column values generated inside DBMS such as default value are not set. t.insert(db, dict(c1=1, c2=2)) # SQL: INSERT INTO t (c1, c2) VALUES (1, 2)
Args
db
- DB connection.
record
- Object contains column values.
qualifier
- Functions qualifying placeholder markers.
returning
- Flag to return inserted record with complete and correct column values.
Returns
Model of inserted record.
Expand source code
@classmethod
def insert(
    cls,
    db: Connection,
    record: Union[Self, dict[str, Any]],
    qualifier: Mapping[str, Qualifier] = {},
    /,
    returning: bool = False,
) -> Self:
    """
    Insert a record.

    If `returning` is `True` and the DBMS supports **RETURNING** clause, returned model object
    contains complete and correct column values. Otherwise, auto incremental value is set to the
    returned model object but other column values generated inside DBMS such as default value are not set.

    ```python
    t.insert(db, dict(c1=1, c2=2))

    # SQL: INSERT INTO t (c1, c2) VALUES (1, 2)
    ```

    Args:
        db: DB connection.
        record: Object contains column values.
        qualifier: Functions qualifying placeholder markers.
        returning: Flag to return inserted record with complete and correct column values.
    Returns:
        Model of inserted record.
    """
    model: Self = cast(Self, record) if isinstance(record, cls) else cls(**cast(dict, record))

    sql, _, vals = cls._insert_sql(record, qualifier)

    if returning:
        if cls.support_returning(db):
            c = db.stmt().execute(f"{sql} RETURNING *", *vals)
            s = cls.select()
            return read_row(c.fetchone(), *s)[0]
        else:
            # REVIEW
            # Inserted row can't be specified from the table where no primary keys are defined.
            pass

    db.stmt().execute(sql, *vals)

    for c, v in cls.last_sequences(db, 1):
        setattr(model, c.name, v)

    return model
def insert_many(db: Connection, records: list[typing.Union[typing_extensions.Self, dict[str, typing.Any]]], qualifier: collections.abc.Mapping[str, collections.abc.Callable[[str], str]] = {}, /, returning: bool = False)
-
Insert records.
If
returning
isTrue
and the DBMS supports RETURNING clause, returned model object contains complete and correct column values. Otherwise, auto incremental value is set to the returned model object but other column values generated inside DBMS such as default value are not set. Args
db
- DB connection.
records
- Object contains column values.
qualifier
- Functions qualifying placeholder markers.
returning
- Flag to return inserted records with complete and correct column values.
Returns
Models of inserted records or cursor.
Expand source code
@classmethod
def insert_many(
    cls,
    db: Connection,
    records: list[Union[Self, dict[str, Any]]],
    qualifier: Mapping[str, Qualifier] = {},
    /,
    returning: bool = False,
):
    """
    Insert records.

    If `returning` is `True` and the DBMS supports **RETURNING** clause, returned model object
    contains complete and correct column values. Otherwise, auto incremental value is set to the
    returned model object but other column values generated inside DBMS such as default value are not set.

    Args:
        db: DB connection.
        records: Objects contain column values.
        qualifier: Functions qualifying placeholder markers.
        returning: Flag to return inserted records with complete and correct column values.
    Returns:
        Models of inserted records or cursor.
    """
    if len(records) == 0:
        return []

    models: list[Self] = [cast(Self, r) if isinstance(r, cls) else cls(**cast(dict, r)) for r in records]

    seq_of_params = []

    # The first record determines the set of columns every following record must provide.
    sql, cols, params = cls._insert_sql(models[0], qualifier)
    cols = set(cols)
    seq_of_params.append(params)

    for m in models[1:]:
        value_dict = model_values(cls, m)
        check_columns(cls, value_dict, lambda c: c.name in cols, requires_all=True)
        # REVIEW:
        # The consistency among columns where expression is set is not checked.
        _, _, params = cls._insert_sql(m, qualifier)
        seq_of_params.append(params)

    db.stmt().executemany(sql, seq_of_params)

    num = len(records)

    # Back-fill auto-increment values: the driver reports only the last value,
    # so earlier rows get consecutive values counted backwards from it.
    for c, v in cls.last_sequences(db, num):
        for i, m in enumerate(models):
            setattr(m, c.name, v - (num - i - 1))

    if returning:
        seq_pks = [extract_pks(cls, m) for m in models]
        return cls.fetch_many(db, seq_pks)
    else:
        return models
def last_sequences(db: Connection, num: int) ‑> list[tuple[Column, int]]
-
Returns the sequential (auto incremental) values of a table generated by the latest insertion.
Result contains every sequential columns and their values. When the latest query inserts multiple rows, only the last (= biggest) value is returned.
This method should be overridden by another mixin class defined in dialect module.
Args
db
- DB connection.
num
- The number of records inserted by the latest query.
Returns
List of pairs of column and its values.
Expand source code
@classmethod
def last_sequences(cls, db: Connection, num: int) -> list[tuple[Column, int]]:
    """
    Returns the sequential (auto incremental) values of a table generated by the latest insertion.

    Result contains every sequential columns and their values. When the latest query inserts multiple rows,
    only the last (= biggest) value is returned.

    This method should be overridden by another mixin class defined in dialect module.

    Args:
        db: DB connection.
        num: The number of records inserted by the latest query.
    Returns:
        List of pairs of column and its values.
    """
    # Base implementation: dialect mixins override this to query the DB.
    return []
def support_returning(db: Connection) ‑> bool
-
Checks whether this DBMS supports RETURNING clause or not.
Args
db
- DB connection.
Returns
Whether this DBMS supports RETURNING clause or not.
Expand source code
@classmethod
def support_returning(cls, db: Connection) -> bool:
    """
    Checks whether this DBMS supports **RETURNING** clause or not.

    Args:
        db: DB connection.
    Returns:
        Whether this DBMS supports **RETURNING** clause or not.
    """
    # Base implementation: dialect mixins override this when RETURNING is available.
    return False
def update(db: Connection, pks: Union[Any, dict[str, Any]], record: Union[Meta, dict[str, Any]], qualifier: collections.abc.Mapping[str, collections.abc.Callable[[str], str]] = {}, /, returning: bool = False)
-
Update a record by primary key(s).
This method only updates columns which are found in
record
except for primary key(s).t.update(db, 1, dict(c1=1, c2=2)) # SQL: UPDATE t SET c1 = 1, c2 = 2 WHERE id = 1
Args
db
- DB connection.
pks
- Primary key value(s).
record
- Object contains column values.
qualifier
- Functions qualifying placeholder markers.
returning
- Flag to return updated records with complete and correct column values.
Returns
Whether the record exists and updated or updated record model.
Expand source code
@classmethod
def update(
    cls,
    db: Connection,
    pks: PKS,
    record: Record,
    qualifier: Mapping[str, Qualifier] = {},
    /,
    returning: bool = False,
):
    """
    Update a record by primary key(s).

    This method only updates columns which are found in `record` except for primary key(s).

    ```python
    t.update(db, 1, dict(c1=1, c2=2))

    # SQL: UPDATE t SET c1 = 1, c2 = 2 WHERE id = 1
    ```

    Args:
        db: DB connection.
        pks: Primary key value(s).
        record: Object contains column values.
        qualifier: Functions qualifying placeholder markers.
        returning: Flag to return updated records with complete and correct column values.
    Returns:
        Whether the record exists and updated or updated record model.
    """
    cols, vals = parse_pks(cls, pks)

    condition = Conditional.all([Q.eq(**{c: v}) for c, v in zip(cols, vals)])

    if returning:
        if cls.support_returning(db):
            models = cls.update_where(db, record, condition, qualifier, returning=True)
            return models[0] if models else None
        else:
            # RETURNING is unavailable: update first, then re-fetch by primary key(s).
            # (The return value of update_where is the affected row count; it was
            # previously assigned to an unused variable.)
            cls.update_where(db, record, condition, qualifier, returning=False)
            return cls.fetch(db, pks)
    else:
        return cls.update_where(db, record, condition, qualifier, returning=False) == 1
def update_many(db: Connection, records: collections.abc.Sequence[typing.Union[Meta, dict[str, typing.Any]]], qualifier: collections.abc.Mapping[str, collections.abc.Callable[[str], str]] = {}, /, returning: bool = False)
-
Update records by set of primary key(s).
This method invokes on
executemany
defined in DB-API 2.0. Whether it is optimized compared toexecute
depends on DB driver.Args
db
- DB connection.
records
- Sequence of objects contains column values.
qualifier
- Functions qualifying placeholder markers.
returning
- Flag to return updated records with complete and correct column values.
Returns
The number of affected rows or updated records.
Expand source code
@classmethod
def update_many(
    cls,
    db: Connection,
    records: Sequence[Record],
    qualifier: Mapping[str, Qualifier] = {},
    /,
    returning: bool = False,
):
    """
    Update records by set of primary key(s).

    This method invokes on `executemany` defined in DB-API 2.0.
    Whether it is optimized compared to `execute` depends on DB driver.

    Args:
        db: DB connection.
        records: Sequence of objects contain column values.
        qualifier: Functions qualifying placeholder markers.
        returning: Flag to return updated records with complete and correct column values.
    Returns:
        The number of affected rows or updated records.
    """
    if len(records) == 0:
        return [] if returning else 0

    keys = {c.name for c in cls.columns if c.pk}

    if len(keys) == 0:
        raise ValueError(f"update_many is not available because {cls} does not have primary key columns.")

    def classify(acc: tuple[dict[str, Any], dict[str, Any]], cv: tuple[str, Any]):
        # Route a (column, value) pair into the primary-key dict or the update-target dict.
        if cv[0] in keys:
            acc[0][cv[0]] = cv[1]
        else:
            acc[1][cv[0]] = cv[1]
        return acc

    seq_of_values: list[tuple[dict[str, Any], dict[str, Any]]] = []
    target_columns: Optional[set[str]] = None

    for vs in [model_values(cls, r, excludes_pk=False) for r in records]:
        if not keys < vs.keys():
            # Strict subset check: all primary keys AND at least one non-key column are required.
            # (f-prefix removed: the message has no placeholders.)
            raise ValueError("Every row must contain values of all primary keys and at least one update column value.")

        pks, rec = reduce(classify, vs.items(), ({}, {}))

        if target_columns is None:
            check_columns(cls, rec)
            target_columns = set(rec.keys())
        else:
            # Every subsequent row must update exactly the same columns as the first.
            check_columns(cls, rec, lambda c: c.name in target_columns, True) # type: ignore

        seq_of_values.append((pks, rec))

    sql_first = ""
    seq_of_params: list[list[Any]] = []

    for pks, rec in seq_of_values:
        cols, vals = parse_pks(cls, pks)
        condition = Conditional.all([Q.eq(**{c: v}) for c, v in zip(cols, vals)])
        sql, _, params = cls._update_sql(rec, condition, qualifier)
        if not sql_first:
            sql_first = sql
        seq_of_params.append(params)

    if returning:
        # (Redundant f"{sql_first}" wrapper removed.)
        db.stmt().executemany(sql_first, seq_of_params)
        return cls.fetch_many(db, [pks for pks, _ in seq_of_values])
    else:
        return db.stmt().executemany(sql_first, seq_of_params).rowcount
def update_where(db: Connection, record: Union[Meta, dict[str, Any]], condition: Conditional, qualifier: collections.abc.Mapping[str, collections.abc.Callable[[str], str]] = {}, /, returning: bool = False, allow_all: bool = True)
-
Update records which satisfy the condition.
t.update_where(db, dict(c2=2), Q.eq(c1=1)) # SQL: UPDATE t SET c2 = 2 WHERE c1 = 1
Args
db
- DB connection.
record
- Object contains column values.
condition
- Query condition.
qualifier
- Functions qualifying placeholder markers.
returning
- Flag to return updated records with complete and correct column values.
allow_all
- If
False
, empty condition raisesValueError
.
Returns
The number of affected rows or updated records.
Expand source code
@classmethod
def update_where(
    cls,
    db: Connection,
    record: Record,
    condition: Conditional,
    qualifier: Mapping[str, Qualifier] = {},
    /,
    returning: bool = False,
    allow_all: bool = True,
):
    """
    Update records which satisfy the condition.

    ```python
    t.update_where(db, dict(c2=2), Q.eq(c1=1))

    # SQL: UPDATE t SET c2 = 2 WHERE c1 = 1
    ```

    Args:
        db: DB connection.
        record: Object contains column values.
        condition: Query condition.
        qualifier: Functions qualifying placeholder markers.
        returning: Flag to return updated records with complete and correct column values.
        allow_all: If `False`, empty condition raises `ValueError`.
    Returns:
        The number of affected rows or updated records.
    """
    sql, _, params = cls._update_sql(record, condition, qualifier, allow_all)

    if returning:
        if cls.support_returning(db):
            c = db.stmt().execute(f"{sql} RETURNING *", *params)
            s = cls.select()
            return [read_row(row, *s)[0] for row in c.fetchall()]
        else:
            raise NotImplementedError(f"RETURNING is not supported and there is no way to fetch updated rows exactly.")
    else:
        c = db.stmt().execute(sql, *params)
        return c.rowcount
Inherited members
class Column (name: str, ptype: type, type_info: Optional[Any], pk: bool, fk: Optional[Relations], incremental: Optional[Any], nullable: bool, comment: str = '')
-
This class represents a schema of a column.
Expand source code
class Column:
    """
    This class represents a schema of a column.
    """
    def __init__(
        self,
        name: str,
        ptype: type,
        type_info: Optional[Any],
        pk: bool,
        fk: Optional[Relations],
        incremental: Optional[Any],
        nullable: bool,
        comment: str = "",
    ):
        #: Column name.
        self.name = name
        #: Data type in python.
        self.ptype = ptype
        #: Type information obtained from DB.
        self.type_info = type_info
        #: Is this column a primary key?
        self.pk = pk
        #: Foreign key constraints.
        self.fk = fk
        #: If this column is auto-incremental, this object contains the information of the feature, otherwise, `None`.
        self.incremental = incremental
        #: Can this column contain null?
        self.nullable = nullable
        #: Comment of the column.
        self.comment = comment
Instance variables
var comment
-
Comment of the column.
var fk
-
Foreign key constraints.
var incremental
-
If this column is auto-incremental, this object contains the information of the feature, otherwise,
None
. var name
-
Column name.
var nullable
-
Can this column contain null?
var pk
-
Is this column a primary key?
var ptype
-
Data type in python.
var type_info
-
Type information obtained from DB.
class Conditional (expression='', params=None)
-
Represents a query condition composed of an expression and parameters.
Parameters must be a list where the index of each parameter matches the index of placeholder for it. The expression accepts only the unified marker
$_
.Applying logical operators such as
&
,|
and~
generates new condition.>>> c1 = Q.of("a = $_", 0) >>> c2 = Q.of("b < $_", 1) >>> c3 = Q.of("c > $_", 2) >>> c = ~(c1 & c2 | c3) >>> c Condition: NOT (((a = $_) AND (b < $_)) OR (c > $_)) -- [0, 1, 2]
Expand source code
class Conditional(Expression):
    """
    Represents a query condition composed of an expression and parameters.

    Parameters must be a list where the index of each parameter matches the index of placeholder for it.
    The expression accepts only the unified marker `$_`.

    Applying logical operators such as `&`, `|` and `~` generates new condition.

    ```python
    >>> c1 = Q.of("a = $_", 0)
    >>> c2 = Q.of("b < $_", 1)
    >>> c3 = Q.of("c > $_", 2)
    >>> c = ~(c1 & c2 | c3)
    >>> c
    Condition: NOT (((a = $_) AND (b < $_)) OR (c > $_)) -- [0, 1, 2]
    ```
    """
    @classmethod
    def all(cls, conditionals: Sequence['Conditional']) -> 'Conditional':
        """
        Concatenates condition objects with `AND`.

        Args:
            conditionals: Condition objects.
        Returns:
            Concatenated condition object.
        """
        return reduce(lambda acc, c: acc & c, conditionals, Conditional())

    @classmethod
    def any(cls, conditionals: Sequence['Conditional']) -> 'Conditional':
        """
        Concatenates condition objects with `OR`.

        Args:
            conditionals: Condition objects.
        Returns:
            Concatenated condition object.
        """
        if len(conditionals) == 0:
            # OR over an empty set is unsatisfiable.
            return Conditional("1 = 0")
        return reduce(lambda acc, c: acc | c, conditionals, Conditional())

    def __init__(self, expression="", params=None):
        super().__init__(expression, params or [])

    def __repr__(self):
        return f"Condition: '{self.expression}' -- {self.params}"

    def __and__(self, other) -> 'Conditional':
        # An empty side is the identity for AND.
        expression = ""
        if self.expression and other.expression:
            expression = f"({self.expression}) AND ({other.expression})"
        elif self.expression:
            expression = self.expression
        elif other.expression:
            expression = other.expression
        return Conditional(expression, self.params + other.params)

    def __or__(self, other) -> 'Conditional':
        # An empty side is the identity for OR.
        expression = ""
        if self.expression and other.expression:
            expression = f"({self.expression}) OR ({other.expression})"
        elif self.expression:
            expression = self.expression
        elif other.expression:
            expression = other.expression
        return Conditional(expression, self.params + other.params)

    def __invert__(self) -> 'Conditional':
        if self.expression:
            return Conditional(f"NOT ({self.expression})", self.params)
        else:
            # Negation of the empty (always-true) condition is always false.
            # (f-prefix removed: the literal has no placeholders.)
            return Conditional("1 = 0", [])
Ancestors
Static methods
def all(conditionals: collections.abc.Sequence['Conditional']) ‑> Conditional
-
Concatenates condition objects with
AND
.Args
conditionals
- Condition objects.
Returns
Concatenated condition object.
Expand source code
@classmethod
def all(cls, conditionals: Sequence['Conditional']) -> 'Conditional':
    """
    Concatenates condition objects with `AND`.

    An empty sequence yields the empty (always-true) condition.

    Args:
        conditionals: Condition objects.
    Returns:
        Concatenated condition object.
    """
    merged = Conditional()
    for cond in conditionals:
        merged = merged & cond
    return merged
def any(conditionals: collections.abc.Sequence['Conditional']) ‑> Conditional
-
Concatenates condition objects with
OR
.Args
conditionals
- Condition objects.
Returns
Concatenated condition object.
Expand source code
@classmethod
def any(cls, conditionals: Sequence['Conditional']) -> 'Conditional':
    """
    Concatenates condition objects with `OR`.

    An empty sequence yields the unsatisfiable condition `1 = 0`.

    Args:
        conditionals: Condition objects.
    Returns:
        Concatenated condition object.
    """
    if len(conditionals) == 0:
        return Conditional("1 = 0")
    merged = Conditional()
    for cond in conditionals:
        merged = merged | cond
    return merged
Inherited members
class Connection (api, conn: Connection, context_factory: Optional[Callable[[], ConnectionContext]] = None)
-
Wrapper class of DB-API 2.0 Connection.
Every instance works as the proxy object to original connection, therefore any attribute in it is still available.
Expand source code
class Connection(dbapi.Connection):
    """
    Wrapper class of DB-API 2.0 Connection.

    Every instance works as the proxy object to original connection,
    therefore any attribute in it is still available.
    """
    _characters = string.ascii_letters + string.digits + ".="

    def __init__(self, api, conn: dbapi.Connection, context_factory: Optional[Callable[[], ConnectionContext]] = None):
        #: A string which identifies a connection.
        self.identifier = self._gen_identifier()
        #: DB-API 2.0 module.
        self.api = api
        #: Original connection object.
        self.conn = conn
        self._context_factory = context_factory
        self._context = None

    def __getattr__(self, name):
        # Any attribute missing on the wrapper is proxied to the original connection.
        return getattr(self.conn, name)

    def __enter__(self):
        if hasattr(self.conn, "__enter__"):
            self.conn.__enter__() # type: ignore
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        if hasattr(self.conn, "__exit__"):
            self.conn.__exit__(exc_type, exc_value, traceback) # type: ignore
        else:
            # Emulate transactional context-manager semantics for drivers without __exit__.
            # BUGFIX: commit on success and roll back on failure — the original branches
            # were inverted (rollback when exc_value is None, commit on exception),
            # which contradicts DB-API 2.0 transaction conventions.
            if exc_value is None:
                self.conn.commit()
            else:
                self.conn.rollback()
            # NOTE(review): close() placement preserved from the original fallback path —
            # confirm the emulated path is meant to close the connection on scope exit.
            self.conn.close()

    def _gen_identifier(self):
        # Thread name plus a short random token keeps identifiers unique per connection.
        return threading.current_thread().name + "-" + secrets.token_hex(4)

    @property
    def context(self) -> ConnectionContext:
        """
        Context object used for this connection.
        """
        if not self._context:
            self._context = (self._context_factory or ConnectionContext)()
            self._context.identifier = self.identifier
        return self._context

    def close(self) -> None:
        """Closes the underlying DB-API connection."""
        return self.conn.close()

    def commit(self) -> None:
        """Commits the current transaction on the underlying connection."""
        return self.conn.commit()

    def rollback(self) -> None:
        """Rolls back the current transaction on the underlying connection."""
        return self.conn.rollback()

    def cursor(self) -> dbapi.Cursor:
        """Creates a cursor on the underlying connection."""
        return self.conn.cursor()

    def use(self, factory: Callable[[], ConnectionContext]) -> Self:
        """
        Sets factory function of `ConnectionContext` to use custom context.

        When the context is already set, it will be replaced with new one.

        Args:
            factory: Function returning custom context.
        Returns:
            This instance.
        """
        self._context_factory = factory
        self._context = None
        return self

    def stmt(self, context: Optional[ConnectionContext] = None) -> 'Statement':
        """
        Creates new `Statement` which executes queries on this connection.

        Args:
            context: Context object used in the statement. If `None`, the context of this connection is used.
        Returns:
            Created statement.
        """
        return Statement(self, context or self.context)
Ancestors
- Connection
- typing.Protocol
- typing.Generic
Instance variables
var api
-
DB-API 2.0 module.
var conn
-
Original connection object.
var context : ConnectionContext
-
Context object used for this connection.
Expand source code
@property
def context(self) -> ConnectionContext:
    """
    Context object used for this connection, created lazily on first access.
    """
    if not self._context:
        factory = self._context_factory or ConnectionContext
        ctx = factory()
        ctx.identifier = self.identifier
        self._context = ctx
    return self._context
var identifier
-
A string which identifies a connection.
Methods
def close(self) ‑> None
-
Expand source code
def close(self) -> None:
    """Closes the underlying DB-API connection."""
    return self.conn.close()
def commit(self) ‑> None
-
Expand source code
def commit(self) -> None:
    """Commits the current transaction on the underlying connection."""
    return self.conn.commit()
def cursor(self) ‑> Cursor
-
Expand source code
def cursor(self) -> dbapi.Cursor:
    """Creates a cursor on the underlying connection."""
    return self.conn.cursor()
def rollback(self) ‑> None
-
Expand source code
def rollback(self) -> None:
    """Rolls back the current transaction on the underlying connection."""
    return self.conn.rollback()
def stmt(self, context: Optional[ConnectionContext] = None) ‑> Statement
-
Creates new
Statement
which executes queries on this connection.Args
context
- Context object used in the statement. If
None
, the context of this connection is used.
Returns
Created statement.
Expand source code
def stmt(self, context: Optional[ConnectionContext] = None) -> 'Statement':
    """
    Creates new `Statement` which executes queries on this connection.

    Args:
        context: Context object used in the statement. If `None`, the context of this connection is used.
    Returns:
        Created statement.
    """
    effective = context if context else self.context
    return Statement(self, effective)
def use(self, factory: Callable[[], ConnectionContext]) ‑> typing_extensions.Self
-
Sets factory function of
ConnectionContext
to use custom context.When the context is already set, it will be replaced with new one.
Args
factory
- Function returning custom context.
Returns
This instance.
Expand source code
def use(self, factory: Callable[[], ConnectionContext]) -> Self:
    """
    Sets factory function of `ConnectionContext` to use custom context.

    When the context is already set, it will be replaced with new one.

    Args:
        factory: Function returning custom context.
    Returns:
        This instance.
    """
    # Drop any existing context so the next access rebuilds it with the new factory.
    self._context = None
    self._context_factory = factory
    return self
class ConnectionContext (identifier: Optional[str] = None, **configurations)
-
This class represents a context of query execution.
By default, the context based on global configuration is used. You should declare your own context class and set it to
Connection.use()
if you want to change the behavior.Custom context is also useful to change cursor state before and after query execution. Overwrite
execute
method to do it.Expand source code
class ConnectionContext:
    """
    This class represents a context of query execution.

    By default, the context based on global configuration is used. You should declare your own
    context class and set it to `Connection.use` if you want to change the behavior.

    Custom context is also useful to change cursor state before and after query execution.
    Override `execute` method to do it.
    """
    def __init__(self, identifier: Optional[str] = None, **configurations):
        #: Identifier of this context. `None` by default.
        self.identifier = identifier
        #: Configuration used in this context.
        self.config = default_config().derive(**configurations)

    def _message(self, message):
        # Prefix log messages with the context identifier when one is set.
        return f"({self.identifier}) {message}" if self.identifier else message

    def configure(self, **configurations: Any) -> 'ConnectionContext':
        """
        Change configurations of this context.

        Changes by this method never affect global configuration even if the context is based on it.

        Args:
            configurations: Configurations. See `pyracmon.config` to know available keys.
        Returns:
            This instance.
        """
        self.config.set(**configurations)
        return self

    def execute(self, cursor: dbapi.Cursor, sql: str, params: PARAMS) -> dbapi.Cursor:
        """
        Executes a query on a cursor.

        Args:
            cursor: Cursor object.
            sql: Query string.
            params: Query parameters.
        Returns:
            Given cursor object. Internal state may be changed by the execution of the query.
        """
        return self._execute(cursor, sql, params, False)

    def executemany(self, cursor: dbapi.Cursor, sql: str, seq_of_params: Sequence[PARAMS]) -> dbapi.Cursor:
        """
        Repeats query on a cursor for a sequence of parameters.

        This method works similar to `execute` but invoke `executemany` instead.

        Args:
            cursor: Cursor object.
            sql: Query string.
            seq_of_params: A sequence of parameters of the query.
        Returns:
            Given cursor object. Internal state may be changed by the execution of the query.
        """
        return self._execute(cursor, sql, seq_of_params, True)

    @overload
    def _execute(self, cursor: dbapi.Cursor, sql: str, params: PARAMS, is_many: Literal[False] = False) -> dbapi.Cursor: ...
    @overload
    def _execute(self, cursor: dbapi.Cursor, sql: str, params: Sequence[PARAMS], is_many: Literal[True] = True) -> dbapi.Cursor: ...
    def _execute(
        self,
        cursor: dbapi.Cursor,
        sql: str,
        params,
        is_many: bool = False,
    ) -> dbapi.Cursor:
        # Shared implementation of execute/executemany with optional SQL/parameter logging.
        logger = _logger(self.config)

        if logger:
            # Truncate long SQL in logs according to configuration.
            sql_log = sql if len(sql) <= self.config.sql_log_length else f"{sql[0:self.config.sql_log_length]}..."
            logger.log(self.config.log_level, self._message(sql_log))
            if self.config.parameter_log:
                if is_many:
                    for ps in params:
                        logger.log(self.config.log_level, self._message(f"Parameters: {ps}"))
                else:
                    logger.log(self.config.log_level, self._message(f"Parameters: {params}"))

        if is_many:
            cursor.executemany(sql, params)
        else:
            cursor.execute(sql, params)

        return cursor
Instance variables
var config
-
Configuration used in this context.
var identifier
-
Identifier of this context.
None
by default.
Methods
def configure(self, **configurations: Any) ‑> ConnectionContext
-
Change configurations of this context.
Changes by this method never affect global configuration even if the context is based on it.
Args
configurations
- Configurations. See
pyracmon.config
to know available keys.
Returns
This instance.
Expand source code
def configure(self, **configurations: Any) -> 'ConnectionContext':
    """
    Change configurations of this context.

    Changes by this method never affect global configuration even if the context is based on it.

    Args:
        configurations: Configurations. See `pyracmon.config` to know available keys.
    Returns:
        This instance.
    """
    self.config.set(**configurations)
    return self
def execute(self, cursor: Cursor, sql: str, params: Union[list[Any], dict[str, Any]]) ‑> Cursor
-
Executes a query on a cursor.
Args
cursor
- Cursor object.
sql
- Query string.
params
- Query parameters.
Returns
Given cursor object. Internal state may be changed by the execution of the query.
Expand source code
def execute(self, cursor: dbapi.Cursor, sql: str, params: PARAMS) -> dbapi.Cursor:
    """
    Executes a query on a cursor.

    Args:
        cursor: Cursor object.
        sql: Query string.
        params: Query parameters.
    Returns:
        Given cursor object. Internal state may be changed by the execution of the query.
    """
    return self._execute(cursor, sql, params, False)
def executemany(self, cursor: Cursor, sql: str, seq_of_params: collections.abc.Sequence[typing.Union[list[typing.Any], dict[str, typing.Any]]]) ‑> Cursor
-
Repeats query on a cursor for a sequence of parameters.
This method works similar to
execute
but invokeexecutemany
instead.Args
cursor
- Cursor object.
sql
- Query string.
seq_of_params
- A sequence of parameters of the query.
Returns
Given cursor object. Internal state may be changed by the execution of the query.
Expand source code
def executemany(self, cursor: dbapi.Cursor, sql: str, seq_of_params: Sequence[PARAMS]) -> dbapi.Cursor:
    """
    Repeats query on a cursor for a sequence of parameters.

    This method works similar to `execute` but invoke `executemany` instead.

    Args:
        cursor: Cursor object.
        sql: Query string.
        seq_of_params: A sequence of parameters of the query.
    Returns:
        Given cursor object. Internal state may be changed by the execution of the query.
    """
    return self._execute(cursor, sql, seq_of_params, True)
class ContainerView (*args, **kwargs)
-
The interface of the view of a node set, i.e.
NodeContainer
andNode.Children
.Expand source code
class ContainerView(Protocol, Generic[T]):
    """
    The interface of the view of a node set, i.e. `NodeContainer` and `Node.Children`.
    """
    def __bool__(self) -> bool:
        """Returns whether this container is not empty."""
        ...
    def __call__(self) -> T:
        """Returns a base container."""
        ...
    def __len__(self) -> int:
        """Returns the number of nodes."""
        ...
    def __iter__(self) -> Iterator['NodeView']:
        """Iterates views of nodes."""
        ...
    @overload
    def __getitem__(self, index: int) -> 'NodeView': ...
    @overload
    def __getitem__(self, index: slice) -> Iterable['NodeView']: ...
    def __getitem__(self, index: Union[int, slice]) -> Union['NodeView', Iterable['NodeView']]:
        """Returns a view of a node at the index."""
        ...
    def __getattr__(self, key) -> 'ContainerView':
        """Returns a view of the first node or empty container view if it does not exist."""
        ...
Ancestors
- typing.Protocol
- typing.Generic
Subclasses
- pyracmon.graph.graph._EmptyContainerView
class Expression (expression: str, params: list[typing.Any])
-
Abstraction of expression in any query.
Expand source code
class Expression:
    """
    Abstraction of expression in any query.
    """
    def __init__(self, expression: str, params: list[Any]):
        #: Expression string.
        self.expression = expression
        #: Parameters corresponding to placeholders in the expression.
        self.params = params
Subclasses
Instance variables
var expression
-
Expression string.
var params
-
Parameters corresponding to placeholders in the expression.
class Graph (template: GraphTemplate)
-
This class represents a graph composed of tree-structured node containers.
The structure is determined by
GraphTemplate
. Usenew_graph()
Instead of constructor to create new graph instance.template = GraphSpac().new_template( a = (int, lambda x:x), b = (str, lambda x:x), c = (str, lambda x:x), ) template.a << template.b << template.c graph = new_graph(template)
In above code, a graph which has 3 properties (
a
b
c
) and a structure wherea
is parent ofb
andb
is parent ofc
is created.append
(replace
) is a method to store entities in the graph with tying them each other according to the structure. Entites are encapsulated byNode
which can have an edge to parent node.graph.append(a=1, b="a", c="x").append(a=2, b="b", c="y")
In
append
, entities are first sorted in descending order, and then:- Search a node whose entity is identical to the first entity from the corresponding node container.
- If found, new node is not created and the identical node is set to next parent.
- Otherwise, new node is appended and it is set to next parent.
- Apply this to following entities in order. A difference is that identical node is searched from the sequence of parents in the session.
In example here, the identification is done by entity value itself (
lambda x:x
). Next code is the example where identical nodes are found.graph.append(a=1, b="a", c="z").append(a=2, b="c", c="y")
In the first
append
,a
andb
has its identical node anda
is identical in the second.c
in the second one is not identical to any node because parent nodeb="c"
is already added as new node.Due to the identification mechanism, entity relationships in the graph is guaranteed after repeating
append
.Expand source code
class Graph:
    """
    This class represents a graph composed of tree-structured node containers.

    The structure is determined by `GraphTemplate`. Use `new_graph` instead of the
    constructor to create a new graph instance.

    ```python
    template = GraphSpec().new_template(
        a = (int, lambda x:x),
        b = (str, lambda x:x),
        c = (str, lambda x:x),
    )
    template.a << template.b << template.c
    graph = new_graph(template)
    ```

    In the above code, a graph which has 3 properties (`a`, `b`, `c`) and a structure
    where `a` is parent of `b` and `b` is parent of `c` is created.

    `append` (`replace`) is a method to store entities in the graph, tying them to each
    other according to the structure. Entities are encapsulated by `Node` which can have
    an edge to its parent node.

    ```python
    graph.append(a=1, b="a", c="x").append(a=2, b="b", c="y")
    ```

    In `append`, entities are first sorted in descending order, and then:

    - Search a node whose entity is *identical* to the first entity from the corresponding node container.
    - If found, a new node is not created and the *identical* node is set to next parent.
    - Otherwise, a new node is appended and it is set to next parent.
    - Apply this to following entities in order. A difference is that the *identical* node
      is searched from the sequence of parents in the session.

    In the example here, the identification is done by the entity value itself
    (`lambda x:x`). The next code is an example where *identical* nodes are found.

    ```python
    graph.append(a=1, b="a", c="z").append(a=2, b="c", c="y")
    ```

    In the first `append`, `a` and `b` have their *identical* nodes and `a` is *identical*
    in the second. `c` in the second one is not *identical* to any node because the parent
    node `b="c"` is already added as a new node.

    Due to the identification mechanism, entity relationships in the graph are guaranteed
    after repeating `append`.
    """
    def __init__(self, template: GraphTemplate):
        #: Graph template determining the structure of this graph.
        self.template: GraphTemplate = template
        #: A `dict` containing node containers by their names.
        self.containers: dict[str, NodeContainer] = {p.name:self._to_container(p) for p in template}
        # Lazily-built unmodifiable view; created on first access of `view`.
        self._view = None

    def _to_container(self, prop: GraphTemplate.Property) -> 'NodeContainer':
        # A property whose kind is itself a GraphTemplate holds a nested graph,
        # so it needs the specialized container type.
        if isinstance(prop.kind, GraphTemplate):
            return _GraphNodeContainer(prop)
        else:
            return NodeContainer(prop)

    def _container_of(self, prop: GraphTemplate.Property) -> Optional['NodeContainer']:
        # Returns the unique container compatible with the property, or None when absent.
        # Raises when compatibility is ambiguous (more than one candidate).
        candidates = [c for c in self.containers.values() if c.prop.is_compatible(prop)]
        if len(candidates) > 1:
            raise ValueError(f"Container can't be determined from property '{prop.name}'.")
        return candidates[0] if candidates else None

    def __add__(self, another: Union[Self, GraphView]) -> 'Graph':
        """
        Create new graph by adding this graph and another graph.

        New graph has the same template as this graph's. On the other hand, because this
        method depends on `__iadd__()`, another graph must not have the same template.

        Args:
            another: Graph or its view.
        Returns:
            Created graph.
        """
        graph = Graph(self.template)
        graph += self
        graph += another
        return graph

    def __iadd__(self, another: Union[Self, GraphView]) -> Self:
        """
        Append nodes from another graph.

        Nodes of another graph are traversed from its roots and appended to compatible
        containers of this graph.

        Args:
            another: Graph or its view.
        Returns:
            This graph.
        """
        # A view is callable and returns its underlying graph.
        graph = another if isinstance(another, Graph) else another()
        def add(n: Node, anc: dict[str, list[Node]]):
            c = self._container_of(n.prop)
            if c:
                c.append(n.entity, anc)
                # NOTE(review): reconstructed from a flattened source; recursion into
                # children is assumed to happen only when a compatible container was
                # found for the parent — confirm against the original layout.
                for ch_ in n.children.values():
                    for m in ch_.nodes:
                        # Copy ancestors so siblings do not see each other's additions.
                        add(m, anc.copy())
        for c_ in graph.roots:
            for n_ in c_.nodes:
                add(n_, {})
        return self

    @property
    def roots(self) -> Iterable['NodeContainer']:
        """
        Returns root node containers, i.e. containers whose template property has no parent.
        """
        return filter(lambda c: c.prop.parent is None, self.containers.values())

    @property
    def view(self) -> GraphView:
        """
        Returns an unmodifiable view of this graph.

        The view object works as the accessor to graph nodes.

        ```python
        >>> template = GraphSpec().new_template(a=int, b=str, c=str)
        >>> template.a << template.b
        >>> graph = new_graph(template)
        >>> view = graph.view
        >>> assert view() is graph                           # invocation
        >>> assert view.a is graph.containers["a"].view      # attribute
        >>> assert [c().name for c in view] == ["a", "c"]    # iteration
        ```
        """
        if self._view is None:
            graph = self
            class _GraphView:
                def __call__(self) -> Graph:
                    """Returns the graph of this view."""
                    return graph
                def __iter__(self) -> Iterator[tuple[str, ContainerView[NodeContainer]]]:
                    """Iterates views of root containers."""
                    return map(lambda c: (c.name, c.view), filter(lambda c: c.prop.parent is None, graph.containers.values()))
                def __getattr__(self, name: str) -> ContainerView:
                    """Returns a view of a container of the name."""
                    return graph.containers[name].view
            self._view = _GraphView()
        return self._view

    def _append(self, to_replace: bool, entities: dict[str, Any]) -> Self:
        # Keep only properties actually supplied in `entities`.
        props = [p for p in self.template if p.name in entities]
        # Names that passed both the parent-presence check and the entity filter.
        filtered = set()
        for p in props:
            # A property is accepted when it is a root, its parent was not supplied,
            # or its parent was itself accepted.
            if (p.parent is None) or (p.parent.name not in entities) or (p.parent.name in filtered):
                if p.entity_filter is None or p.entity_filter(entities[p.name]):
                    filtered.add(p.name)
        # Shared across the appends so children can find parents added in this session.
        ancestors = {}
        for k in [p.name for p in props if p.name in filtered]:
            self.containers[k].append(entities[k], ancestors, to_replace)
        return self

    def append(self, **entities: Any) -> Self:
        """
        Append entities with associated property names.

        Args:
            entities: Entities keyed with associated property names.
        Returns:
            This graph.
        """
        return self._append(False, entities)

    def replace(self, **entities: Any) -> Self:
        """
        Works similarly to `append`, but entities of identical nodes are replaced with given entities.

        Args:
            entities: Entities keyed with associated property names.
        Returns:
            This graph.
        """
        return self._append(True, entities)
Instance variables
var containers
-
A
dict
containing node containers by their names. var roots : collections.abc.Iterable['NodeContainer']
-
Returns root node containers.
Expand source code
@property
def roots(self) -> Iterable['NodeContainer']:
    """
    Returns root node containers, i.e. containers whose template property has no parent.
    """
    return filter(lambda c: c.prop.parent is None, self.containers.values())
var template
-
Graph template.
var view : GraphView
-
Returns an unmodifiable view of this graph.
The view object works as the accessor to graph nodes.
>>> template = GraphSpec().new_template(a=int, b=str, c=str) >>> template.a << template.b >>> graph = new_graph(template) >>> view = graph.view >>> assert view() is graph # invocation >>> assert view.a is graph.containers["a"].view # attribute >>> assert [c().name for c in view] == ["a", "c"] # iteration
Expand source code
@property
def view(self) -> GraphView:
    """
    Returns an unmodifiable view of this graph.

    The view object works as the accessor to graph nodes.

    ```python
    >>> template = GraphSpec().new_template(a=int, b=str, c=str)
    >>> template.a << template.b
    >>> graph = new_graph(template)
    >>> view = graph.view
    >>> assert view() is graph                           # invocation
    >>> assert view.a is graph.containers["a"].view      # attribute
    >>> assert [c().name for c in view] == ["a", "c"]    # iteration
    ```
    """
    if self._view is None:
        graph = self
        class _GraphView:
            def __call__(self) -> Graph:
                """Returns the graph of this view."""
                return graph
            def __iter__(self) -> Iterator[tuple[str, ContainerView[NodeContainer]]]:
                """Iterates views of root containers."""
                return map(lambda c: (c.name, c.view), filter(lambda c: c.prop.parent is None, graph.containers.values()))
            def __getattr__(self, name: str) -> ContainerView:
                """Returns a view of a container of the name."""
                return graph.containers[name].view
        self._view = _GraphView()
    return self._view
Methods
def append(self, **entities: Any) ‑> typing_extensions.Self
-
Append entities with associated property names.
Args
entities
- Entities keyed with associated property names.
Returns
This graph.
Expand source code
def append(self, **entities: Any) -> Self:
    """
    Append entities with associated property names.

    Args:
        entities: Entities keyed with associated property names.
    Returns:
        This graph.
    """
    return self._append(False, entities)
def replace(self, **entities: Any) ‑> typing_extensions.Self
-
Works similarly to
append
, but entities of identical nodes are replaced with given entities.Args
entities
- Entities keyed with associated property names.
Returns
This graph.
Expand source code
def replace(self, **entities: Any) -> Self:
    """
    Works similarly to `append`, but entities of identical nodes are replaced with given entities.

    Args:
        entities: Entities keyed with associated property names.
    Returns:
        This graph.
    """
    return self._append(True, entities)
- Search a node whose entity is identical to the first entity from the corresponding node container.
class GraphSchema (spec: Any, template: GraphTemplate, **serializers: NodeSerializer)
-
This class exposes a property to get the schema of serialization result of a graph.
TODO: Dependency to
GraphSpec
should be replaced in another way.Expand source code
class GraphSchema:
    """
    This class exposes a property to get the schema of serialization result of a graph.

    TODO: Dependency to `GraphSpec` should be replaced in another way.
    """
    def __init__(self, spec: Any, template: GraphTemplate, **serializers: NodeSerializer):
        #: Specification of graph operations.
        self.spec = spec
        #: Graph template to serialize.
        self.template = template
        #: `NodeSerializer`s used for the serialization, keyed by property name.
        self.serializers = serializers

    def _return_from(self, prop: GraphTemplate.Property) -> type:
        """
        Get a type the node of the passed property will be serialized into.
        """
        ns = self.serializers[prop.name]
        # Type of the node entity.
        entity_type = prop.kind
        if isinstance(entity_type, GraphTemplate):
            # GraphTemplate type is ignored because the serializer added by sub() resolves the type by itself.
            entity_type = _templateType(entity_type)
        # Return type of the NodeSerializer.
        ns_type = signature(ns.serializer).return_annotation
        # Return type of the base serializer obtained from GraphSpec.
        base = chain_serializers(self.spec.find_serializers(entity_type))
        base_type = signature(base).return_annotation if base else Signature.empty
        #base_type = entity_type if base_type == Signature.empty else base_type
        # If the return type contains a single type parameter, the previous type is applied to it.
        # A serializer without a return annotation is supposed to return the input type as it is.
        def next_resolvable(it: Iterator[type]) -> type:
            # Skip unannotated entries (Signature.empty) until a concrete type is found.
            while True:
                res = next(it, None)
                if res is None:
                    break
                elif res != Signature.empty:
                    return res
            return Signature.empty
        def resolve(it: Iterator[type]) -> type:
            # Recursively fill the single unresolved type parameter of each candidate
            # type with the next resolvable type in the iterator.
            origin = next_resolvable(it)
            if origin == Signature.empty:
                return origin
            elif issubgeneric(origin, Typeable):
                if not Typeable.is_resolved(origin):
                    param = resolve(it)
                    if param == Signature.empty:
                        # Type parameter is not known.
                        return Signature.empty
                    # Replace type parameter.
                    origin = origin[param] # type: ignore
                return Typeable.resolve(origin, resolve(it), self.spec)
            else:
                args = get_args(origin)
                if args:
                    # origin is generics.
                    type_params = list(filter(lambda ia: isinstance(ia[1], TypeVar), enumerate(args)))
                    # python < 3.10
                    param_num = len(type_params)
                    if param_num == 0:
                        return origin
                    elif param_num == 1:
                        # Replace type parameter
                        param = resolve(it)
                        return origin[param] # type: ignore
                    else:
                        return Signature.empty
                    # python >= 3.10
                    #match len(type_params):
                    #    case 0:
                    #        return origin
                    #    case 1:
                    #        # Replace type parameter
                    #        param = resolve(it)
                    #        return origin[param] # type: ignore
                    #    case _:
                    #        return Signature.empty
                else:
                    return origin
        # Resolution order: serializer annotation, base serializer annotation, then the entity type.
        return resolve(iter([ns_type, base_type, entity_type, entity_type]))

    def schema_of(self, prop: GraphTemplate.Property) -> Type[Annotated]:
        """
        Generates structured and documented schema for a template property.

        Args:
            prop: A template property.
        Returns:
            Schema with documentation.
        """
        return_type = self._return_from(prop)
        doc = self.serializers[prop.name]._doc or ""
        # TypedDict type is also a subclass of dict.
        if issubclass(return_type, dict):
            annotations = {}
            # Only children that have a registered serializer participate in the schema.
            for c in filter(lambda c: c.name in self.serializers, prop.children):
                ns = self.serializers[c.name]
                cs = self.schema_of(c)
                t, d = decompose_document(cs)
                if ns.be_merged:
                    # Merged children flatten their keys into this node's schema.
                    if not issubclass(t, dict):
                        raise ValueError(f"Property '{c.name}' is not configured to be serialized into dict.")
                    annotations.update(**{ns.namer(k):t for k, t in get_type_hints(t, include_extras=True).items()})
                elif ns.be_singular:
                    # Singular children contribute the aggregator's return type.
                    rt = signature(ns.aggregator).return_annotation
                    rt = replace_optional_typevar(rt, cs)
                    annotations[ns.namer(c.name)] = rt
                else:
                    # Default: children serialize to a list of their schema type.
                    annotations[ns.namer(c.name)] = document_type(list[t], d)
            td_type: Optional[type[TypedDict]] = cast(type[TypedDict], return_type) if is_typeddict(return_type) else None
            return document_type(generate_schema(annotations, td_type), doc)
        else:
            return document_type(return_type, doc)

    @property
    def schema(self) -> type[TypedDict]:
        """
        Generates `TypedDict` which represents the schema of the serialized graph.
        """
        annotations: dict[str, Any] = {}
        def put_root_schema(p: GraphTemplate.Property):
            nonlocal annotations
            ns = self.serializers[p.name]
            dt = self.schema_of(p)
            if ns.be_merged:
                t, d = decompose_document(dt)
                annotations.update(**{ns.namer(k):t_ for k, t_ in get_type_hints(t, include_extras=True).items()})
            elif ns.be_singular:
                rt = signature(ns.aggregator).return_annotation
                rt = replace_optional_typevar(rt, dt)
                annotations[ns.namer(p.name)] = rt
            else:
                t, d = decompose_document(dt)
                annotations[ns.namer(p.name)] = document_type(list[t], d)
        # Root properties with serializers form the top level of the schema.
        roots = filter(lambda p: p.parent is None and p.name in self.serializers, self.template._properties.values())
        for p in roots:
            put_root_schema(p)
        return generate_schema(annotations)

    def serialize(self, graph: GraphView, **node_params: dict[str, Any]) -> dict[str, Any]:
        """
        Serialize graph into a dictionary.

        Args:
            graph: A view of a graph.
            node_params: Parameters passed to `SerializationContext` and used by *serializer* s.
        Returns:
            Serialization result.
        """
        return self.spec.to_dict(graph, node_params, **self.serializers)
Instance variables
var schema : type[typing.TypedDict]
-
Generates
TypedDict
which represents the schema of serialized graph.Expand source code
@property
def schema(self) -> type[TypedDict]:
    """
    Generates `TypedDict` which represents the schema of the serialized graph.
    """
    annotations: dict[str, Any] = {}
    def put_root_schema(p: GraphTemplate.Property):
        nonlocal annotations
        ns = self.serializers[p.name]
        dt = self.schema_of(p)
        if ns.be_merged:
            # Merged roots flatten their keys into the top level.
            t, d = decompose_document(dt)
            annotations.update(**{ns.namer(k):t_ for k, t_ in get_type_hints(t, include_extras=True).items()})
        elif ns.be_singular:
            # Singular roots contribute the aggregator's return type.
            rt = signature(ns.aggregator).return_annotation
            rt = replace_optional_typevar(rt, dt)
            annotations[ns.namer(p.name)] = rt
        else:
            # Default: roots serialize to a list of their schema type.
            t, d = decompose_document(dt)
            annotations[ns.namer(p.name)] = document_type(list[t], d)
    # Only root properties that have a registered serializer appear in the schema.
    roots = filter(lambda p: p.parent is None and p.name in self.serializers, self.template._properties.values())
    for p in roots:
        put_root_schema(p)
    return generate_schema(annotations)
var serializers
-
NodeSerializer
s used for the serialization. var spec
-
Specification of graph operations.
var template
-
Graph template to serialize.
Methods
def schema_of(self, prop: GraphTemplate.Property) ‑> Type[Annotated]
-
Generates structured and documented schema for a template property.
Args
prop
- A template property.
Returns
Schema with documentation.
Expand source code
def schema_of(self, prop: GraphTemplate.Property) -> Type[Annotated]:
    """
    Generates structured and documented schema for a template property.

    Args:
        prop: A template property.
    Returns:
        Schema with documentation.
    """
    return_type = self._return_from(prop)
    doc = self.serializers[prop.name]._doc or ""
    # TypedDict type is also a subclass of dict.
    if issubclass(return_type, dict):
        annotations = {}
        # Only children that have a registered serializer participate in the schema.
        for c in filter(lambda c: c.name in self.serializers, prop.children):
            ns = self.serializers[c.name]
            cs = self.schema_of(c)
            t, d = decompose_document(cs)
            if ns.be_merged:
                # Merged children flatten their keys into this node's schema.
                if not issubclass(t, dict):
                    raise ValueError(f"Property '{c.name}' is not configured to be serialized into dict.")
                annotations.update(**{ns.namer(k):t for k, t in get_type_hints(t, include_extras=True).items()})
            elif ns.be_singular:
                # Singular children contribute the aggregator's return type.
                rt = signature(ns.aggregator).return_annotation
                rt = replace_optional_typevar(rt, cs)
                annotations[ns.namer(c.name)] = rt
            else:
                # Default: children serialize to a list of their schema type.
                annotations[ns.namer(c.name)] = document_type(list[t], d)
        td_type: Optional[type[TypedDict]] = cast(type[TypedDict], return_type) if is_typeddict(return_type) else None
        return document_type(generate_schema(annotations, td_type), doc)
    else:
        return document_type(return_type, doc)
def serialize(self, graph: GraphView, **node_params: dict[str, typing.Any]) ‑> dict[str, typing.Any]
-
Serialize graph into a dictionary.
Args
graph
- A view of a graph.
node_params
- Parameters passed to
SerializationContext
and used by serializer s.
Returns
Serialization result.
Expand source code
def serialize(self, graph: GraphView, **node_params: dict[str, Any]) -> dict[str, Any]:
    """
    Serialize graph into a dictionary.

    Args:
        graph: A view of a graph.
        node_params: Parameters passed to `SerializationContext` and used by *serializer* s.
    Returns:
        Serialization result.
    """
    return self.spec.to_dict(graph, node_params, **self.serializers)
class GraphView (*args, **kwargs)
-
The interface of the view of graph.
Expand source code
class GraphView(Protocol):
    """
    The interface of the view of a graph.

    Implementations are callable (returning the underlying `Graph`), iterable over root
    container views, and expose container views as attributes.
    """
    def __call__(self) -> 'Graph':
        """
        Returns the underlying graph of this view.
        """
        ...
    def __iter__(self) -> Iterator[tuple[str, 'ContainerView[NodeContainer]']]:
        """
        Iterates root container views.

        Returns:
            Iterator of pairs of name and container view.
        """
        ...
    def __getattr__(self, name: str) -> 'ContainerView':
        """
        Returns a container view by its name.

        Args:
            name: Container name, i.e. template property name for the node container.
        Returns:
            Container view.
        """
        ...
Ancestors
- typing.Protocol
- typing.Generic
class Model
-
Expand source code
class Model(Mixins[Unpack[MXS]], metaclass=Meta):
    """
    Base type of model types.

    This class only works as a marker of model types and gives no functionalities to them.
    """
    def __init__(self, **kwargs) -> None:
        # Empty body: exists only so type checkers accept keyword construction.
        ... # for typing
class Node (prop: GraphTemplate.Property, entity: Any, key: Optional[Any], index: int)
-
This class represents a node which contains an entity.
Expand source code
class Node:
    """
    This class represents a node which contains an entity.
    """
    class Children:
        """
        This class represents the child nodes of a node for a single template property.
        """
        def __init__(self, prop: GraphTemplate.Property):
            #: Template property.
            self.prop = prop
            # Child nodes in insertion order.
            self.nodes: list[Node] = []
            # Set of contained nodes for O(1) membership checks.
            self.keys = set()
            # Lazily-built unmodifiable view (see `view`).
            self._view = None

        @property
        def name(self) -> str:
            """
            Returns the name of the corresponding template property.
            """
            return self.prop.name

        @property
        def view(self) -> ContainerView['Node.Children']:
            """
            Returns an unmodifiable view of child nodes.
            """
            if self._view is None:
                base = self
                class _ChildrenView:
                    def __bool__(self):
                        """Returns whether this container is not empty."""
                        return len(base.nodes) != 0
                    def __call__(self):
                        """Returns children container."""
                        return base
                    def __iter__(self):
                        """Iterates views of child nodes."""
                        return map(lambda n: n.view, base.nodes)
                    def __len__(self):
                        """Returns the number of child nodes."""
                        return len(base.nodes)
                    @overload
                    def __getitem__(self, index: int) -> 'NodeView': ...
                    @overload
                    def __getitem__(self, index: slice) -> Iterable['NodeView']: ...
                    def __getitem__(self, index):
                        """Returns a view of the child node at the index."""
                        if isinstance(index, slice):
                            return [n.view for n in base.nodes[index]]
                        else:
                            return base.nodes[index].view
                    def __getattr__(self, key):
                        """Returns a view of the first node or an empty container view if it does not exist."""
                        child = next(filter(lambda c: c.name == key, base.prop.children), None)
                        if child:
                            return base.nodes[0].children[key].view if len(base.nodes) > 0 else _EmptyContainerView(child)
                        else:
                            raise KeyError(f"Graph property '{base.prop.name}' does not have a child property '{key}'.")
                self._view = _ChildrenView()
            return self._view

        def __contains__(self, node: 'Node') -> bool:
            return node in self.keys

        def __iter__(self) -> Iterator['Node']:
            return iter(self.nodes)

        def append(self, node):
            # NOTE(review): reconstructed from a flattened source; both statements are
            # assumed to be guarded by the membership check (append is a no-op for
            # nodes already contained) — confirm against the original layout.
            if node not in self.keys:
                self.keys.add(node)
                self.nodes.append(node)

    def __init__(self, prop: GraphTemplate.Property, entity: Any, key: Optional[Any], index: int):
        #: Template property.
        self.prop = prop
        #: An entity value.
        self.entity = entity
        # Identification key extracted by the identify policy; may be None.
        self.key = key
        # Parent nodes this node is attached to.
        self.parents = set()
        # Child containers keyed by child property name.
        self.children: dict[str, Node.Children] = {c.name: Node.Children(c) for c in prop.children}
        self._index = index
        # Lazily-built unmodifiable view (see `view`).
        self._view = None

    def __contains__(self, key: str) -> bool:
        return key in self.children

    @property
    def name(self) -> str:
        """
        Returns the container name, which is the same as the name of the template property.
        """
        return self.prop.name

    @property
    def view(self) -> NodeView:
        """
        Returns an unmodifiable view of this node.

        The view object works as the accessor to the entity and child nodes.
        """
        if self._view is None:
            node = self
            class _NodeView(NodeView):
                def __call__(self, alt: Any = None) -> Any:
                    """Returns the entity of this node."""
                    return node.entity
                def __getattr__(self, key: str) -> ContainerView:
                    """Returns a view of child nodes by its name."""
                    return node.children[key].view
                def __iter__(self) -> Iterator[tuple[str, ContainerView]]:
                    """Iterates key-value pairs of child nodes."""
                    return map(lambda nc: (nc[0], nc[1].view), node.children.items())
            self._view = _NodeView()
        return self._view

    def add_child(self, child: 'Node') -> Self:
        """
        Adds a child node.

        Args:
            child: Child node.
        Returns:
            This instance.
        """
        if child.prop.template != self.prop.template:
            raise ValueError(f"Nodes from different graph template can't be associated.")
        self.children[child.prop.name].append(child)
        child.parents.add(self)
        return self

    def has_child(self, child: 'Node') -> bool:
        """
        Checks this node contains the node identical to the given node.

        Args:
            child: Node to search.
        Returns:
            `True` if exists.
        """
        if child.prop.template != self.prop.template:
            return False
        elif child.prop.name in self.children:
            return child in self.children[child.prop.name].keys
        else:
            return False
Subclasses
- pyracmon.graph.graph._GraphNode
Class variables
var Children
-
This class represents a child nodes of a node.
Instance variables
var entity
-
An entity value.
var name : str
-
Returns the container name, which is same as the name of template property.
Expand source code
@property
def name(self) -> str:
    """
    Returns the container name, which is the same as the name of the template property.
    """
    return self.prop.name
var prop
-
Template property.
var view : NodeView
-
Returns an unmodifiable view of this node.
The view object works as the accessor to entity and child nodes.
Expand source code
@property
def view(self) -> NodeView:
    """
    Returns an unmodifiable view of this node.

    The view object works as the accessor to the entity and child nodes.
    """
    if self._view is None:
        node = self
        class _NodeView(NodeView):
            def __call__(self, alt: Any = None) -> Any:
                """Returns the entity of this node."""
                return node.entity
            def __getattr__(self, key: str) -> ContainerView:
                """Returns a view of child nodes by its name."""
                return node.children[key].view
            def __iter__(self) -> Iterator[tuple[str, ContainerView]]:
                """Iterates key-value pairs of child nodes."""
                return map(lambda nc: (nc[0], nc[1].view), node.children.items())
        self._view = _NodeView()
    return self._view
Methods
def add_child(self, child: Node) ‑> typing_extensions.Self
-
Adds a child node.
Args
child
- Child node.
Returns
This instance.
Expand source code
def add_child(self, child: 'Node') -> Self:
    """
    Adds a child node.

    Args:
        child: Child node.
    Returns:
        This instance.
    Raises:
        ValueError: When the child belongs to a different graph template.
    """
    if child.prop.template != self.prop.template:
        raise ValueError(f"Nodes from different graph template can't be associated.")
    self.children[child.prop.name].append(child)
    child.parents.add(self)
    return self
def has_child(self, child: Node) ‑> bool
-
Checks this node contains the node identical to given node.
Args
child
- Node to search.
Returns
True
if exists.Expand source code
def has_child(self, child: 'Node') -> bool:
    """
    Checks this node contains the node identical to the given node.

    Args:
        child: Node to search.
    Returns:
        `True` if exists.
    """
    if child.prop.template != self.prop.template:
        return False
    elif child.prop.name in self.children:
        return child in self.children[child.prop.name].keys
    else:
        return False
class NodeContainer (prop: GraphTemplate.Property)
-
This class represents a container of nodes for a template property.
Expand source code
class NodeContainer:
    """
    This class represents a container of nodes for a template property.
    """
    def __init__(self, prop: GraphTemplate.Property):
        #: Template property.
        self.prop = prop
        # Nodes in insertion order; a node's `_index` is its position here.
        self.nodes: list[Node] = []
        # Maps an identification key to the indexes of nodes having that key.
        self.keys: dict[Any, list[int]] = {}
        # Lazily-built unmodifiable view (see `view`).
        self._view = None

    @property
    def name(self) -> str:
        """
        Returns the container name, which is the same as the name of the template property.
        """
        return self.prop.name

    @property
    def view(self) -> ContainerView['NodeContainer']:
        """
        Returns an unmodifiable view of this container.

        The view object works as the accessor to container components.

        ```python
        template = GraphSpec().new_template(a=int, b=str, c=str)
        template.a << template.b
        graph = new_graph(template).append(a=1, b="a").append(a=1, b="b").append(a=2, b="c")
        container = graph.containers["a"]
        view = graph.view.a
        assert view() is container                                  # invocation
        assert view.b is container.nodes[0].children["b"].view      # attribute
        assert view[1] is container.nodes[1].view                   # index
        assert [n() for n in view] == [1, 2]                        # iteration
        assert len(view) == 2                                       # length
        ```
        """
        if self._view is None:
            container = self
            class _ContainerView:
                def __bool__(self):
                    """Returns whether this container is not empty."""
                    return len(container.nodes) != 0
                def __call__(self) -> NodeContainer:
                    """Returns a base container."""
                    return container
                def __len__(self):
                    """Returns the number of nodes."""
                    return len(container.nodes)
                def __iter__(self):
                    """Iterates views of nodes."""
                    return map(lambda n: n.view, container.nodes)
                @overload
                def __getitem__(self, index: int) -> 'NodeView': ...
                @overload
                def __getitem__(self, index: slice) -> Iterable['NodeView']: ...
                def __getitem__(self, index: Union[int, slice]) -> Union['NodeView', Iterable['NodeView']]:
                    """Returns a view of a node at the index."""
                    if isinstance(index, slice):
                        return [n.view for n in container.nodes[index]]
                    else:
                        return container.nodes[index].view
                def __getattr__(self, key) -> ContainerView:
                    """Returns a view of the first node or an empty container view if it does not exist."""
                    child = next(filter(lambda c: c.name == key, container.prop.children), None)
                    if child:
                        return container.nodes[0].children[key].view if len(container.nodes) > 0 else _EmptyContainerView(child)
                    else:
                        raise KeyError(f"Graph property '{container.prop.name}' does not have a child property '{key}'.")
            self._view = _ContainerView()
        return self._view

    def append(self, entity: Any, ancestors: MutableMapping[str, list['Node']], to_replace: bool = False):
        """
        Add an entity to this container.

        An identical node is searched by examining whether this container already contains
        a node of the identical entity and its parent is found in `ancestors`.

        Args:
            entity: An entity to be stored in the node.
            ancestors: Parent nodes mapped by property names.
            to_replace: If `True`, the entity of an identical node is replaced. Otherwise, it is not changed.
        """
        policy: IdentifyPolicy = self.prop.policy or neverPolicy()
        key = policy.get_identifier(entity)
        # The policy decides which parents need a new node and which existing nodes are identical.
        parents, identicals = policy.identify(self.prop, [self.nodes[i] for i in self.keys.get(key, [])], ancestors)
        new_nodes = identicals.copy()
        # NOTE(review): reconstructed from a flattened source; statement nesting inside
        # this loop is assumed as shown (key registration conditional, new node always
        # collected, child link only for a non-None parent) — confirm against the original.
        for pn in parents:
            index = len(self.nodes)
            node = Node(self.prop, entity, key, index)
            self.nodes.append(node)
            if key is not None:
                self.keys.setdefault(key, []).append(index)
            new_nodes.append(node)
            if pn is not None:
                pn.add_child(node)
        if to_replace:
            for n in identicals:
                n.entity = entity
        # Expose the nodes for this property so descendants appended later can find their parents.
        ancestors[self.prop.name] = new_nodes
Subclasses
- pyracmon.graph.graph._GraphNodeContainer
Instance variables
var name : str
-
Returns the container name, which is same as the name of template property.
Expand source code
@property
def name(self) -> str:
    """
    Returns the container name, which is the same as the name of the template property.
    """
    return self.prop.name
var prop
-
Template property.
var view : ContainerView[NodeContainer]
-
Returns an unmodifiable view of this container.
The view object works as the accessor to container components.
template = GraphSpec().new_template(a=int, b=str, c=str) template.a << template.b graph = new_graph(template).append(a=1, b="a").append(a=1, b="b").append(a=2, b="c") container = graph.containers["a"] view = graph.view.a assert view() is container # invocation assert view.b is container.nodes[0].children["b"].view # attribute assert view[1] is container.nodes[1].view # index assert [n() for n in view] == [1, 2] # iteration assert len(view) == 2 # length
Expand source code
@property
def view(self) -> ContainerView['NodeContainer']:
    """
    Returns an unmodifiable view of this container.

    The view object works as the accessor to container components.

    ```python
    template = GraphSpec().new_template(a=int, b=str, c=str)
    template.a << template.b
    graph = new_graph(template).append(a=1, b="a").append(a=1, b="b").append(a=2, b="c")
    container = graph.containers["a"]
    view = graph.view.a
    assert view() is container                                  # invocation
    assert view.b is container.nodes[0].children["b"].view      # attribute
    assert view[1] is container.nodes[1].view                   # index
    assert [n() for n in view] == [1, 2]                        # iteration
    assert len(view) == 2                                       # length
    ```
    """
    if self._view is None:
        container = self
        class _ContainerView:
            def __bool__(self):
                """Returns whether this container is not empty."""
                return len(container.nodes) != 0
            def __call__(self) -> NodeContainer:
                """Returns a base container."""
                return container
            def __len__(self):
                """Returns the number of nodes."""
                return len(container.nodes)
            def __iter__(self):
                """Iterates views of nodes."""
                return map(lambda n: n.view, container.nodes)
            @overload
            def __getitem__(self, index: int) -> 'NodeView': ...
            @overload
            def __getitem__(self, index: slice) -> Iterable['NodeView']: ...
            def __getitem__(self, index: Union[int, slice]) -> Union['NodeView', Iterable['NodeView']]:
                """Returns a view of a node at the index."""
                if isinstance(index, slice):
                    return [n.view for n in container.nodes[index]]
                else:
                    return container.nodes[index].view
            def __getattr__(self, key) -> ContainerView:
                """Returns a view of the first node or an empty container view if it does not exist."""
                child = next(filter(lambda c: c.name == key, container.prop.children), None)
                if child:
                    return container.nodes[0].children[key].view if len(container.nodes) > 0 else _EmptyContainerView(child)
                else:
                    raise KeyError(f"Graph property '{container.prop.name}' does not have a child property '{key}'.")
        self._view = _ContainerView()
    return self._view
Methods
def append(self, entity: Any, ancestors: collections.abc.MutableMapping[str, list['Node']], to_replace: bool = False)
-
Add an entity to this container.
Identical node is searched by examining whether this container already contains a node of the identical entity and its parent is found in
ancestors
.Args
entity
- An entity to be stored in the node.
ancestors
- Parent nodes mapped by property names.
to_replace
- If
True
, the entity of identical node is replaced. Otherwise, it is not changed.
Expand source code
def append(self, entity: Any, ancestors: MutableMapping[str, list['Node']], to_replace: bool = False):
    """
    Add an entity to this container.

    An identical node is searched by examining whether this container already contains
    a node of the identical entity and its parent is found in `ancestors`.

    Args:
        entity: An entity to be stored in the node.
        ancestors: Parent nodes mapped by property names.
        to_replace: If `True`, the entity of an identical node is replaced. Otherwise, it is not changed.
    """
    policy: IdentifyPolicy = self.prop.policy or neverPolicy()
    key = policy.get_identifier(entity)
    # The policy decides which parents need a new node and which existing nodes are identical.
    parents, identicals = policy.identify(self.prop, [self.nodes[i] for i in self.keys.get(key, [])], ancestors)
    new_nodes = identicals.copy()
    # NOTE(review): reconstructed from a flattened source; statement nesting inside
    # this loop is assumed as shown — confirm against the original.
    for pn in parents:
        index = len(self.nodes)
        node = Node(self.prop, entity, key, index)
        self.nodes.append(node)
        if key is not None:
            self.keys.setdefault(key, []).append(index)
        new_nodes.append(node)
        if pn is not None:
            pn.add_child(node)
    if to_replace:
        for n in identicals:
            n.entity = entity
    # Expose the nodes for this property so descendants appended later can find their parents.
    ancestors[self.prop.name] = new_nodes
class NodeContext (context: SerializationContext, params: NodeParams)
-
A class containing information for serialization of a single node.
The instance of this class is passed to the serialization function. Properties listed below are available to control serialization.
- context:
SerializationContext
for the serialization of the graph. - node:
Node
to serialize. - value: Entity value of the
Node
. - params: Arbitrary values which is passed from invoking scope with being bound to the key of node name.
Every serializer has to call
serialize()
to get the result of preceding serializers, or make a result directly from the node. Expand source code
class NodeContext:
    """
    A class containing information for serialization of a single node.

    The instance of this class is passed to the serialization function.
    Properties listed below are available to control serialization.

    - context: `SerializationContext` for the serialization of the graph.
    - node: `Node` to serialize.
    - value: Entity value of the `Node`.
    - params: Arbitrary values which is passed from invoking scope with being bound to the key of node name.

    Every serializer has to call `serialize()` to get the result of preceding serializers,
    or make a result directly from the node.
    """
    def __init__(self, context: 'SerializationContext', params: NodeParams) -> None:
        #: `SerializationContext` for the serialization of the graph.
        self.context = context
        #: Arbitrary values passed by outside for the node.
        self.params = params
        # Set on demand, i.e. assigned just before the serialization function runs.
        self._node: Optional[Node] = None
        self._iterator: Optional[Iterator[Any]] = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Drop per-node state so the context can be reused for the next node.
        self._node = None
        self._iterator = None

    @property
    def node(self) -> Node:
        # Node must be set when passed to serialization function.
        return cast(Node, self._node)

    @property
    def value(self) -> Any:
        # Entity of the node currently being serialized.
        return cast(Node, self._node).entity

    def serialize(self) -> Any:
        """
        Obtain a value serialized by preceding serializers.
        """
        try:
            # Invoke the next serializer in the chain with this context.
            return next(cast(Iterator[Any], self._iterator))(self)
        except StopIteration:
            # Chain exhausted: fall back to the raw entity of the node.
            return self.node.entity
Instance variables
var context
-
SerializationContext
for the serialization of the graph. var node : Node
-
Expand source code
@property
def node(self) -> Node:
    """`Node` currently being serialized."""
    # Node must be set when passed to serialization function.
    # The cast is safe only under that precondition — TODO confirm at the call site.
    return cast(Node, self._node)
var params
-
Arbitrary values passed by outside for the node.
var value : Any
-
Expand source code
@property
def value(self) -> Any:
    """Entity value of the node currently being serialized."""
    # Assumes _node is already assigned; cast silences the Optional type.
    return cast(Node, self._node).entity
Methods
def serialize(self) ‑> Any
-
Obtain a value serialized by preceding serializers.
Expand source code
def serialize(self) -> Any:
    """
    Obtain a value serialized by preceding serializers.
    """
    try:
        # Invoke the next serializer in the chain, passing this context along.
        return next(cast(Iterator[Any], self._iterator))(self)
    except StopIteration:
        # No serializer remains: return the raw entity unchanged.
        return self.node.entity
- context:
class NodeView
-
Expand source code
class NodeView:
    """
    Read-only accessor to a single graph node and its children.

    Methods here carry `...` bodies; this class appears to document the view
    interface only, with concrete behavior supplied elsewhere — TODO confirm.
    """
    def __call__(self, alt: Any = None) -> Any:
        """Returns an entity of this node."""
        ...
    def __getattr__(self, key: str) -> ContainerView:
        """Returns a view of child nodes by its name."""
        ...
    def __iter__(self) -> Iterator[tuple[str, ContainerView]]:
        """Iterate key-value pairs of child nodes."""
        ...
class Q (**kwargs: Any)
-
This class provides utility class methods creating conditions.
Using
of()
is the most simple way to create a condition clause with parameters.>>> Q.of("a = $_", 1) Condition: 'a = $_' -- [1]
Other utility methods correspond to basic operators defined in SQL. They take keyword arguments and create conditions by applying the operator to each item respectively.
>>> Q.eq(a=1) Condition: 'a = %s' -- [1] >>> Q.in_(a=[1, 2, 3]) Condition: 'a IN (%s, %s, %s)' -- [1, 2, 3] >>> Q.like(a="abc") Condition: 'a LIKE %s' -- ["%abc%"]
Multiple arguments generate a condition which concatenates conditions with a logical operator, by default
AND
.>>> Q.eq(a=1, b=2) Condition: 'a = %s AND b = %s' -- [1, 2]
Those methods also accept table alias which is prepended to columns.
>>> Q.eq("t", a=1, b=2) Condition: 't.a = %s AND t.b = %s'
Additionally, the instance of this class has its own functionality to generate condition.
Each parameter passed to the constructor becomes an instance method of the instance, which takes a condition clause including placeholders which will take parameters in query execution phase.
Statement.execute()
allows unified marker$_
in spite of DB driver.>>> q = Q(a=1) >>> q.a("a = $_") Condition: 'a = $_' -- [1]
Method whose name is not passed to the constructor renders empty condition which has no effect on the query.
>>> q.b("b = $_") Condition: '' -- []
By default,
None
is equivalent to not being passed. GivingTrue
at the first argument in constructor changes the behavior.>>> q = Q(a=1, b=None) >>> q.b("b = $_") Condition: '' -- [] >>> q = Q(True, a=1, b=None) >>> q.b("b = $_") Condition: 'b = $_' -- [None]
This feature simplifies a query construction in cases some parameters are absent.
>>> def search(db, q): >>> w, params = where(q.a("a = $_") & q.b("b = $_")) >>> db.stmt().execute(f"SELECT * FROM table {w}", *params) >>> >>> search(db, Q(a=1)) # SELECT * FROM table WHERE a = 1 >>> search(db, Q(a=1, b=2)) # SELECT * FROM table WHERE a = 1 AND b = 2 >>> search(db, Q()) # SELECT * FROM table
Initializes an instance.
Args
_include_none_
- Whether include attributes whose value is
None
. kwargs
- Denotes pairs of attribute name and parameter.
Expand source code
class Q:
    """
    This class provides utility class methods creating conditions.

    Using `of()` is the most simple way to create a condition clause with parameters.

    ```python
    >>> Q.of("a = $_", 1)
    Condition: 'a = $_' -- [1]
    ```

    Other utility methods correspond to basic operators defined in SQL. They take keyword
    arguments and create conditions by applying the operator to each item respectively.

    ```python
    >>> Q.eq(a=1)
    Condition: 'a = %s' -- [1]
    >>> Q.in_(a=[1, 2, 3])
    Condition: 'a IN (%s, %s, %s)' -- [1, 2, 3]
    >>> Q.like(a="abc")
    Condition: 'a LIKE %s' -- ["%abc%"]
    ```

    Multiple arguments generate a condition which concatenates conditions with a logical
    operator, by default `AND`.

    ```python
    >>> Q.eq(a=1, b=2)
    Condition: 'a = %s AND b = %s' -- [1, 2]
    ```

    Those methods also accept table alias which is prepended to columns.

    ```python
    >>> Q.eq("t", a=1, b=2)
    Condition: 't.a = %s AND t.b = %s'
    ```

    Additionally, the instance of this class has its own functionality to generate condition.
    Each parameter passed to the constructor becomes an instance method of the instance, which
    takes a condition clause including placeholders which will take parameters in query
    execution phase. `pyracmon.connection.Statement.execute` allows unified marker `$_`
    in spite of DB driver.

    ```python
    >>> q = Q(a=1)
    >>> q.a("a = $_")
    Condition: 'a = $_' -- [1]
    ```

    Method whose name is not passed to the constructor renders empty condition which has
    no effect on the query.

    ```python
    >>> q.b("b = $_")
    Condition: '' -- []
    ```

    By default, `None` is equivalent to not being passed. Giving `True` at the first
    argument in constructor changes the behavior.

    ```python
    >>> q = Q(a=1, b=None)
    >>> q.b("b = $_")
    Condition: '' -- []
    >>> q = Q(True, a=1, b=None)
    >>> q.b("b = $_")
    Condition: 'b = $_' -- [None]
    ```

    This feature simplifies a query construction in cases some parameters are absent.

    ```python
    >>> def search(db, q):
    >>>     w, params = where(q.a("a = $_") & q.b("b = $_"))
    >>>     db.stmt().execute(f"SELECT * FROM table {w}", *params)
    >>>
    >>> search(db, Q(a=1))       # SELECT * FROM table WHERE a = 1
    >>> search(db, Q(a=1, b=2))  # SELECT * FROM table WHERE a = 1 AND b = 2
    >>> search(db, Q())          # SELECT * FROM table
    ```
    """
    class Attribute(Queryable[str]):  # type: ignore
        # Wraps a single attribute value and turns it into condition clauses.
        def __init__(self, value):
            self.value = value

        def __call__(
            self,
            expression: Union[str, Callable[[Any], str]],
            convert: Optional[Union[Callable[[Any], Any], Any]] = None,
        ) -> 'Conditional':
            """
            Creates conditional object composed of given expression and the attribute value as parameters.

            Args:
                expression: A clause or a function generating a clause by taking the attribute value.
                convert: A function converting the attribute value to parameters.
                    If this function returns a value which is not a list, a list having only the value is used.
            Returns:
                Condition.
            """
            expression = expression if isinstance(expression, str) else expression(self.value)
            if callable(convert):
                params = convert(self.value)
            elif convert is not None:
                # A non-callable convert is used verbatim as the parameter(s).
                params = convert
            else:
                params = [self.value]
            return Conditional(expression, params if isinstance(params, list) else [params])

        @property
        def all(self) -> 'Q.Attribute':
            """
            Returns composite attribute which applies conditions to every values iterated
            from attribute value and join them with `AND`.

            Returns:
                Composite attribute.
            """
            return Q.CompositeAttribute(self.value, True)

        @property
        def any(self) -> 'Q.Attribute':
            """
            Returns composite attribute which applies conditions to every values iterated
            from attribute value and join them with `OR`.

            Returns:
                Composite attribute.
            """
            return Q.CompositeAttribute(self.value, False)

        def __bool__(self):
            # A present attribute is always truthy regardless of its value.
            return True

        def __and__(self, other: 'Conditional') -> 'Conditional':
            # Keep the other condition only when this attribute's value is truthy.
            return other if bool(self.value) else Conditional()

        def __or__(self, other: 'Conditional') -> 'Conditional':
            # Keep the other condition only when this attribute's value is falsy.
            return other if not bool(self.value) else Conditional()

        def __getattr__(self, key):
            """
            Exposes a method which works similarly to 'Q' 's utility method of the same name.

            ```python
            >>> q = Q(a = 1)
            >>> q.a.eq("col")
            Condition: 'col = $_' -- [1]
            >>>
            >>> q.a.eq("col", None, "t")
            Condition: 't.col = $_' -- [1]
            >>>
            >>> q.a.eq("col", lambda x: x*2, "t")
            Condition: 't.col = $_' -- [2]
            ```
            """
            method = getattr(Q, key)
            def invoke(col, convert=None, *args, **kwargs):
                if callable(convert):
                    value = convert(self.value)
                else:
                    value = convert if convert is not None else self.value
                # Bind the (possibly converted) value to the column keyword and delegate.
                kwargs.update({col: value})
                return method(*args, **kwargs)
            return invoke

    class CompositeAttribute(Attribute):
        # Applies a condition to each element of an iterable attribute value and
        # joins the resulting conditions with AND (and_=True) or OR (and_=False).
        def __init__(self, value, and_):
            super().__init__(value)
            self._and = and_

        def __call__(self, expression, convert=None):
            conds = [Q.Attribute(v)(expression, convert) for v in self.value]
            return Conditional.all(conds) if self._and else Conditional.any(conds)

        def __getattr__(self, key):
            method = getattr(Q, key)
            def invoke(col, convert=None, *args, **kwargs):
                def conv(v):
                    if callable(convert):
                        return convert(v)
                    else:
                        # REVIEW Replacing every parameter in the list with the same value is meaningless?
                        return convert if convert is not None else v
                conds = [method(*args, **dict(chain(kwargs.items(), [(col, conv(v))]))) for v in self.value]
                return Conditional.all(conds) if self._and else Conditional.any(conds)
            return invoke

    class NoAttribute(Attribute):
        # Null-object attribute: every operation renders an empty, no-op condition.
        def __init__(self):
            super().__init__(None)

        def __call__(self, expression, holder=lambda x:x):
            return Conditional()

        @property
        def all(self):
            return self

        @property
        def any(self):
            return self

        def __bool__(self):
            return False

        def __and__(self, other: 'Conditional') -> 'Conditional':
            return Conditional()

        def __or__(self, other: 'Conditional') -> 'Conditional':
            return Conditional()

        def __getattr__(self, key):
            method = getattr(Q, key)
            def invoke(col, convert=None, *args):
                return Conditional()
            return invoke

    def __init__(self, _include_none_: bool = False, **kwargs: Any):
        """
        Initializes an instance.

        Args:
            _include_none_: Whether include attributes whose value is `None`.
            kwargs: Denotes pairs of attribute name and parameter.
        """
        # None-valued attributes are dropped unless explicitly included.
        self.attributes = dict([(k, v) for k, v in kwargs.items() if _include_none_ or v is not None])

    def __getattr__(self, key) -> Attribute:
        # Missing attributes resolve to the null-object so generated conditions vanish.
        if key in self.attributes:
            return Q.Attribute(self.attributes[key])
        else:
            return Q.NoAttribute()

    @classmethod
    def of(cls, expression: str = "", *params: Any) -> 'Conditional':
        """
        Creates a condition directly from an expression and parameters.

        Args:
            expression: Condition expression.
            params: Parameters used in the condition.
        Returns:
            Condition object.
        """
        return Conditional(expression, list(params))

    @classmethod
    def eq(cls, _alias_: Optional[str] = None, _and_: bool = True, **kwargs: Any) -> 'Conditional':
        """
        Creates a condition applying `=` operator to columns.

        Args:
            _alias_: Table alias.
            _and_: Specifies concatenating logical operator is `AND` or `OR`.
            kwargs: Column names and parameters.
        Returns:
            Condition object.
        """
        def is_null(col, val):
            # Special-case NULL and boolean values; other values use the generic `=` path.
            if val is None:
                return f"{col} IS NULL", []
            elif val is True:
                return f"{col}", []
            elif val is False:
                return f"NOT {col}", []
            return None
        return _conditional("=", _and_, kwargs, is_null, _alias_)

    @classmethod
    def neq(cls, _alias_: Optional[str] = None, _and_: bool = True, **kwargs: Any) -> 'Conditional':
        """
        Works like `eq`, but applies `!=`.

        Args:
            _alias_: Table alias.
            _and_: Specifies concatenating logical operator is `AND` or `OR`.
            kwargs: Column names and parameters.
        Returns:
            Condition object.
        """
        def is_null(col, val):
            # Negated counterpart of eq's NULL/boolean special cases.
            if val is None:
                return f"{col} IS NOT NULL", []
            elif val is True:
                return f"NOT {col}", []
            elif val is False:
                return f"{col}", []
            return None
        return _conditional("!=", _and_, kwargs, is_null, _alias_)

    @classmethod
    def in_(cls, _alias_: Optional[str] = None, _and_: bool = True, **kwargs: Sequence[Any]) -> 'Conditional':
        """
        Works like `eq`, but applies `IN`.

        Args:
            _alias_: Table alias.
            _and_: Specifies concatenating logical operator is `AND` or `OR`.
            kwargs: Column names and parameters.
        Returns:
            Condition object.
        """
        def in_list(col, val):
            if len(val) == 0:
                # IN over an empty list can never match; render an always-false clause.
                return "1 = 0", []
            else:
                holder = ', '.join(['$_'] * len(val))
                return f"{col} IN ({holder})", val
        return _conditional("IN", _and_, kwargs, in_list, _alias_)

    @classmethod
    def not_in(cls, _alias_: Optional[str] = None, _and_: bool = True, **kwargs: Sequence[Any]) -> 'Conditional':
        """
        Works like `eq`, but applies `NOT IN`.

        Args:
            _alias_: Table alias.
            _and_: Specifies concatenating logical operator is `AND` or `OR`.
            kwargs: Column names and parameters.
        Returns:
            Condition object.
        """
        def in_list(col, val):
            if len(val) == 0:
                # NOT IN over an empty list excludes nothing; render no condition.
                return "", []
            else:
                holder = ', '.join(['$_'] * len(val))
                return f"{col} NOT IN ({holder})", val
        return _conditional("NOT IN", _and_, kwargs, in_list, _alias_)

    @classmethod
    def match(cls, _alias_: Optional[str] = None, _and_: bool = True, **kwargs: str) -> 'Conditional':
        """
        Works like `eq`, but applies `LIKE`.

        Given parameters will be passed to query without being escaped or enclosed.

        Args:
            _alias_: Table alias.
            _and_: Specifies concatenating logical operator is `AND` or `OR`.
            kwargs: Column names and parameters.
        Returns:
            Condition object.
        """
        return _conditional("LIKE", _and_, kwargs, None, _alias_)

    @classmethod
    def like(cls, _alias_: Optional[str] = None, _and_: bool = True, **kwargs: str) -> 'Conditional':
        """
        Works like `eq`, but applies `LIKE`.

        Given parameters will be escaped and enclosed with wildcards (%) to execute partial match.

        Args:
            _alias_: Table alias.
            _and_: Specifies concatenating logical operator is `AND` or `OR`.
            kwargs: Column names and parameters.
        Returns:
            Condition object.
        """
        return _conditional("LIKE", _and_, {k: f"%{escape_like(v)}%" for k, v in kwargs.items()}, None, _alias_)

    @classmethod
    def startswith(cls, _alias_: Optional[str] = None, _and_: bool = True, **kwargs: str) -> 'Conditional':
        """
        Works like `eq`, but applies `LIKE`.

        Given parameters will be escaped and appended with wildcards (%) to execute prefix match.

        Args:
            _alias_: Table alias.
            _and_: Specifies concatenating logical operator is `AND` or `OR`.
            kwargs: Column names and parameters.
        Returns:
            Condition object.
        """
        return _conditional("LIKE", _and_, {k: f"{escape_like(v)}%" for k, v in kwargs.items()}, None, _alias_)

    @classmethod
    def endswith(cls, _alias_: Optional[str] = None, _and_: bool = True, **kwargs: str) -> 'Conditional':
        """
        Works like `eq`, but applies `LIKE`.

        Given parameters will be escaped and prepended with wildcards (%) to execute backward match.

        Args:
            _alias_: Table alias.
            _and_: Specifies concatenating logical operator is `AND` or `OR`.
            kwargs: Column names and parameters.
        Returns:
            Condition object.
        """
        return _conditional("LIKE", _and_, {k: f"%{escape_like(v)}" for k, v in kwargs.items()}, None, _alias_)

    @classmethod
    def lt(cls, _alias_: Optional[str] = None, _and_: bool = True, **kwargs: Any) -> 'Conditional':
        """
        Works like `eq`, but applies `<`.

        Args:
            _alias_: Table alias.
            _and_: Specifies concatenating logical operator is `AND` or `OR`.
            kwargs: Column names and parameters.
        Returns:
            Condition object.
        """
        return _conditional("<", _and_, kwargs, None, _alias_)

    @classmethod
    def le(cls, _alias_: Optional[str] = None, _and_: bool = True, **kwargs: Any) -> 'Conditional':
        """
        Works like `eq`, but applies `<=`.

        Args:
            _alias_: Table alias.
            _and_: Specifies concatenating logical operator is `AND` or `OR`.
            kwargs: Column names and parameters.
        Returns:
            Condition object.
        """
        return _conditional("<=", _and_, kwargs, None, _alias_)

    @classmethod
    def gt(cls, _alias_: Optional[str] = None, _and_: bool = True, **kwargs: Any) -> 'Conditional':
        """
        Works like `eq`, but applies `>`.

        Args:
            _alias_: Table alias.
            _and_: Specifies concatenating logical operator is `AND` or `OR`.
            kwargs: Column names and parameters.
        Returns:
            Condition object.
        """
        return _conditional(">", _and_, kwargs, None, _alias_)

    @classmethod
    def ge(cls, _alias_: Optional[str] = None, _and_: bool = True, **kwargs: Any) -> 'Conditional':
        """
        Works like `eq`, but applies `>=`.

        Args:
            _alias_: Table alias.
            _and_: Specifies concatenating logical operator is `AND` or `OR`.
            kwargs: Column names and parameters.
        Returns:
            Condition object.
        """
        return _conditional(">=", _and_, kwargs, None, _alias_)
Class variables
var Attribute
-
Base class for protocol classes.
Protocol classes are defined as::
class Proto(Protocol): def meth(self) -> int: ...
Such classes are primarily used with static type checkers that recognize structural subtyping (static duck-typing), for example::
class C: def meth(self) -> int: return 0 def func(x: Proto) -> int: return x.meth() func(C()) # Passes static type check
See PEP 544 for details. Protocol classes decorated with @typing.runtime_checkable act as simple-minded runtime protocols that check only the presence of given attributes, ignoring their type signatures. Protocol classes can be generic, they are defined as::
class GenProto(Protocol[T]): def meth(self) -> T: ...
var CompositeAttribute
-
Base class for protocol classes.
Protocol classes are defined as::
class Proto(Protocol): def meth(self) -> int: ...
Such classes are primarily used with static type checkers that recognize structural subtyping (static duck-typing), for example::
class C: def meth(self) -> int: return 0 def func(x: Proto) -> int: return x.meth() func(C()) # Passes static type check
See PEP 544 for details. Protocol classes decorated with @typing.runtime_checkable act as simple-minded runtime protocols that check only the presence of given attributes, ignoring their type signatures. Protocol classes can be generic, they are defined as::
class GenProto(Protocol[T]): def meth(self) -> T: ...
var NoAttribute
-
Base class for protocol classes.
Protocol classes are defined as::
class Proto(Protocol): def meth(self) -> int: ...
Such classes are primarily used with static type checkers that recognize structural subtyping (static duck-typing), for example::
class C: def meth(self) -> int: return 0 def func(x: Proto) -> int: return x.meth() func(C()) # Passes static type check
See PEP 544 for details. Protocol classes decorated with @typing.runtime_checkable act as simple-minded runtime protocols that check only the presence of given attributes, ignoring their type signatures. Protocol classes can be generic, they are defined as::
class GenProto(Protocol[T]): def meth(self) -> T: ...
Static methods
def endswith(**kwargs: str) ‑> Conditional
-
Works like
eq
, but appliesLIKE
. Given parameters will be escaped and prepended with wildcards (%) to execute backward match.Args
_alias_
- Table alias.
_and_
- Specifies concatenating logical operator is
AND
orOR
. kwargs
- Column names and parameters.
Returns
Condition object.
Expand source code
@classmethod
def endswith(cls, _alias_: Optional[str] = None, _and_: bool = True, **kwargs: str) -> 'Conditional':
    """
    Build a suffix-match condition using `LIKE`, analogous to `eq`.

    Every value is escaped and prefixed with a `%` wildcard.

    Args:
        _alias_: Table alias prepended to column names.
        _and_: `True` joins clauses with `AND`, `False` with `OR`.
        kwargs: Column names mapped to match strings.
    Returns:
        Condition object.
    """
    patterns = {col: "%" + escape_like(text) for col, text in kwargs.items()}
    return _conditional("LIKE", _and_, patterns, None, _alias_)
def eq(**kwargs: Any) ‑> Conditional
-
Creates a condition applying
=
operator to columns.Args
_alias_
- Table alias.
_and_
- Specifies concatenating logical operator is
AND
orOR
. kwargs
- Column names and parameters.
Returns
Condition object.
Expand source code
@classmethod
def eq(cls, _alias_: Optional[str] = None, _and_: bool = True, **kwargs: Any) -> 'Conditional':
    """
    Build an equality (`=`) condition on the given columns.

    `None` renders `IS NULL`; `True`/`False` render the bare/negated column.

    Args:
        _alias_: Table alias prepended to column names.
        _and_: `True` joins clauses with `AND`, `False` with `OR`.
        kwargs: Column names mapped to values.
    Returns:
        Condition object.
    """
    def is_null(col, val):
        # Special handling for NULL and boolean columns.
        if val is None:
            return f"{col} IS NULL", []
        if val is True:
            return f"{col}", []
        if val is False:
            return f"NOT {col}", []
        return None
    return _conditional("=", _and_, kwargs, is_null, _alias_)
def ge(**kwargs: Any) ‑> Conditional
-
Works like
eq
, but applies>=
.Args
_alias_
- Table alias.
_and_
- Specifies concatenating logical operator is
AND
orOR
. kwargs
- Column names and parameters.
Returns
Condition object.
Expand source code
@classmethod
def ge(cls, _alias_: Optional[str] = None, _and_: bool = True, **kwargs: Any) -> 'Conditional':
    """
    Build a `>=` comparison condition, analogous to `eq`.

    Args:
        _alias_: Table alias prepended to column names.
        _and_: `True` joins clauses with `AND`, `False` with `OR`.
        kwargs: Column names mapped to values.
    Returns:
        Condition object.
    """
    return _conditional(">=", _and_, dict(kwargs), None, _alias_)
def gt(**kwargs: Any) ‑> Conditional
-
Works like
eq
, but applies>
.Args
_alias_
- Table alias.
_and_
- Specifies concatenating logical operator is
AND
orOR
. kwargs
- Column names and parameters.
Returns
Condition object.
Expand source code
@classmethod
def gt(cls, _alias_: Optional[str] = None, _and_: bool = True, **kwargs: Any) -> 'Conditional':
    """
    Build a `>` comparison condition, analogous to `eq`.

    Args:
        _alias_: Table alias prepended to column names.
        _and_: `True` joins clauses with `AND`, `False` with `OR`.
        kwargs: Column names mapped to values.
    Returns:
        Condition object.
    """
    return _conditional(">", _and_, dict(kwargs), None, _alias_)
def in_(**kwargs: collections.abc.Sequence[typing.Any]) ‑> Conditional
-
Works like
eq
, but appliesIN
.Args
_alias_
- Table alias.
_and_
- Specifies concatenating logical operator is
AND
orOR
. kwargs
- Column names and parameters.
Returns
Condition object.
Expand source code
@classmethod
def in_(cls, _alias_: Optional[str] = None, _and_: bool = True, **kwargs: Sequence[Any]) -> 'Conditional':
    """
    Build an `IN` condition over the given column/collection pairs, analogous to `eq`.

    Args:
        _alias_: Table alias prepended to column names.
        _and_: `True` joins clauses with `AND`, `False` with `OR`.
        kwargs: Column names mapped to value collections.
    Returns:
        Condition object.
    """
    def in_list(col, val):
        count = len(val)
        if count == 0:
            # Nothing can be IN an empty list: emit an always-false clause.
            return "1 = 0", []
        holder = ', '.join(['$_'] * count)
        return f"{col} IN ({holder})", val
    return _conditional("IN", _and_, kwargs, in_list, _alias_)
def le(**kwargs: Any) ‑> Conditional
-
Works like
eq
, but applies<=
.Args
_alias_
- Table alias.
_and_
- Specifies concatenating logical operator is
AND
orOR
. kwargs
- Column names and parameters.
Returns
Condition object.
Expand source code
@classmethod
def le(cls, _alias_: Optional[str] = None, _and_: bool = True, **kwargs: Any) -> 'Conditional':
    """
    Build a `<=` comparison condition, analogous to `eq`.

    Args:
        _alias_: Table alias prepended to column names.
        _and_: `True` joins clauses with `AND`, `False` with `OR`.
        kwargs: Column names mapped to values.
    Returns:
        Condition object.
    """
    return _conditional("<=", _and_, dict(kwargs), None, _alias_)
def like(**kwargs: str) ‑> Conditional
-
Works like
eq
, but appliesLIKE
. Given parameters will be escaped and enclosed with wildcards (%) to execute partial match.Args
_alias_
- Table alias.
_and_
- Specifies concatenating logical operator is
AND
orOR
. kwargs
- Column names and parameters.
Returns
Condition object.
Expand source code
@classmethod
def like(cls, _alias_: Optional[str] = None, _and_: bool = True, **kwargs: str) -> 'Conditional':
    """
    Build a partial-match condition using `LIKE`, analogous to `eq`.

    Every value is escaped and wrapped with `%` wildcards on both sides.

    Args:
        _alias_: Table alias prepended to column names.
        _and_: `True` joins clauses with `AND`, `False` with `OR`.
        kwargs: Column names mapped to match strings.
    Returns:
        Condition object.
    """
    patterns = {col: "%" + escape_like(text) + "%" for col, text in kwargs.items()}
    return _conditional("LIKE", _and_, patterns, None, _alias_)
def lt(**kwargs: Any) ‑> Conditional
-
Works like
eq
, but applies<
.Args
_alias_
- Table alias.
_and_
- Specifies concatenating logical operator is
AND
orOR
. kwargs
- Column names and parameters.
Returns
Condition object.
Expand source code
@classmethod
def lt(cls, _alias_: Optional[str] = None, _and_: bool = True, **kwargs: Any) -> 'Conditional':
    """
    Build a `<` comparison condition, analogous to `eq`.

    Args:
        _alias_: Table alias prepended to column names.
        _and_: `True` joins clauses with `AND`, `False` with `OR`.
        kwargs: Column names mapped to values.
    Returns:
        Condition object.
    """
    return _conditional("<", _and_, dict(kwargs), None, _alias_)
def match(**kwargs: str) ‑> Conditional
-
Works like
eq
, but appliesLIKE
. Given parameters will be passed to query without being escaped or enclosed.Args
_alias_
- Table alias.
_and_
- Specifies concatenating logical operator is
AND
orOR
. kwargs
- Column names and parameters.
Returns
Condition object.
Expand source code
@classmethod
def match(cls, _alias_: Optional[str] = None, _and_: bool = True, **kwargs: str) -> 'Conditional':
    """
    Build a raw `LIKE` condition, analogous to `eq`.

    Values are forwarded verbatim — no escaping and no wildcard wrapping.

    Args:
        _alias_: Table alias prepended to column names.
        _and_: `True` joins clauses with `AND`, `False` with `OR`.
        kwargs: Column names mapped to LIKE patterns.
    Returns:
        Condition object.
    """
    return _conditional("LIKE", _and_, dict(kwargs), None, _alias_)
def neq(**kwargs: Any) ‑> Conditional
-
Works like
eq
, but applies!=
.Args
_alias_
- Table alias.
_and_
- Specifies concatenating logical operator is
AND
orOR
. kwargs
- Column names and parameters.
Returns
Condition object.
Expand source code
@classmethod
def neq(cls, _alias_: Optional[str] = None, _and_: bool = True, **kwargs: Any) -> 'Conditional':
    """
    Build an inequality (`!=`) condition, analogous to `eq`.

    `None` renders `IS NOT NULL`; `True`/`False` render the negated/bare column.

    Args:
        _alias_: Table alias prepended to column names.
        _and_: `True` joins clauses with `AND`, `False` with `OR`.
        kwargs: Column names mapped to values.
    Returns:
        Condition object.
    """
    def is_null(col, val):
        # Negated counterpart of eq's NULL/boolean handling.
        if val is None:
            return f"{col} IS NOT NULL", []
        if val is True:
            return f"NOT {col}", []
        if val is False:
            return f"{col}", []
        return None
    return _conditional("!=", _and_, kwargs, is_null, _alias_)
def not_in(**kwargs: collections.abc.Sequence[typing.Any]) ‑> Conditional
-
Works like
eq
, but appliesNOT IN
.Args
_alias_
- Table alias.
_and_
- Specifies concatenating logical operator is
AND
orOR
. kwargs
- Column names and parameters.
Returns
Condition object.
Expand source code
@classmethod
def not_in(cls, _alias_: Optional[str] = None, _and_: bool = True, **kwargs: Sequence[Any]) -> 'Conditional':
    """
    Build a `NOT IN` condition over the given column/collection pairs, analogous to `eq`.

    Args:
        _alias_: Table alias prepended to column names.
        _and_: `True` joins clauses with `AND`, `False` with `OR`.
        kwargs: Column names mapped to value collections.
    Returns:
        Condition object.
    """
    def in_list(col, val):
        count = len(val)
        if count == 0:
            # NOT IN an empty list excludes nothing: emit no clause at all.
            return "", []
        holder = ', '.join(['$_'] * count)
        return f"{col} NOT IN ({holder})", val
    return _conditional("NOT IN", _and_, kwargs, in_list, _alias_)
def of(expression: str = '', *params: Any) ‑> Conditional
-
Creates a condition directly from an expression and parameters.
Args
expression
- Condition expression.
params
- Parameters used in the condition.
Returns
Condition object.
Expand source code
@classmethod
def of(cls, expression: str = "", *params: Any) -> 'Conditional':
    """
    Wrap an expression string and its parameters into a condition object.

    Args:
        expression: Condition expression.
        params: Parameters used in the condition.
    Returns:
        Condition object.
    """
    return Conditional(expression, [*params])
def startswith(**kwargs: str) ‑> Conditional
-
Works like
eq
, but appliesLIKE
. Given parameters will be escaped and appended with wildcards (%) to execute prefix match.Args
_alias_
- Table alias.
_and_
- Specifies concatenating logical operator is
AND
orOR
. kwargs
- Column names and parameters.
Returns
Condition object.
Expand source code
@classmethod
def startswith(cls, _alias_: Optional[str] = None, _and_: bool = True, **kwargs: str) -> 'Conditional':
    """
    Build a prefix-match condition using `LIKE`, analogous to `eq`.

    Every value is escaped and suffixed with a `%` wildcard.

    Args:
        _alias_: Table alias prepended to column names.
        _and_: `True` joins clauses with `AND`, `False` with `OR`.
        kwargs: Column names mapped to match strings.
    Returns:
        Condition object.
    """
    patterns = {col: escape_like(text) + "%" for col, text in kwargs.items()}
    return _conditional("LIKE", _and_, patterns, None, _alias_)
class S
-
A utility class to build
NodeSerializer
.This class provides factory class methods to create
NodeSerializer
each of which works in the same way as the method of the same name declared onNodeSerializer
.Use them to supply
NodeSerializer
s to functions to serialize a graph or to create a graph schema such asgraph_dict()
orgraph_schema()
.graph_dict( graph, a = S.of(), b = S.head(), )
Expand source code
class S(metaclass=SerializerMeta):
    """
    A utility class to build `NodeSerializer`.

    This class provides factory class methods to create `NodeSerializer` each of which
    works in the same way as the method of the same name declared on `NodeSerializer`.

    Use them to supply `NodeSerializer`s to functions to serialize a graph or to create
    a graph schema such as `graph_dict` or `graph_schema`.

    ```python
    graph_dict(
        graph,
        a = S.of(),
        b = S.head(),
    )
    ```
    """
    @classmethod
    def of(
        cls,
        namer: Optional[Union[str, Callable[[str], str]]] = None,
        aggregator: Optional[Union[Callable[[list[Node]], Node], Callable[[list[Node]], list[Node]]]] = None,
        *serializers: Serializer,
    ) -> 'NodeSerializer':
        """
        Create an instance of `NodeSerializer`.

        Args:
            namer: A string or naming function.
            aggregator: An aggregation function or an index of node to select in node container.
            serializers: A list of *serializer*s.
        Returns:
            Created `NodeSerializer`.
        """
        return NodeSerializer(namer, aggregator, *serializers)
Static methods
def of(namer: Union[str, Callable[[str], str], ForwardRef(None)] = None, aggregator: Union[Callable[[list[Node]], Node], Callable[[list[Node]], list[Node]], ForwardRef(None)] = None, *serializers: Callable[[ForwardRef('NodeContext')], Any]) ‑> NodeSerializer
-
Create an instance of
NodeSerializer
.Args
namer
- A string or naming function.
aggregator
- An aggregation function or an index of node to select in node container.
serializer
- A list of *serializers*.
Returns
Created
NodeSerializer
.Expand source code
@classmethod
def of(
    cls,
    namer: Optional[Union[str, Callable[[str], str]]] = None,
    aggregator: Optional[Union[Callable[[list[Node]], Node], Callable[[list[Node]], list[Node]]]] = None,
    *serializers: Serializer,
) -> 'NodeSerializer':
    """
    Create an instance of `NodeSerializer`.

    Args:
        namer: A string or naming function.
        aggregator: An aggregation function or an index of node to select in node container.
        serializers: A list of *serializer*s.
    Returns:
        Created `NodeSerializer`.
    """
    return NodeSerializer(namer, aggregator, *serializers)
Methods
def alter(*args, **kwargs)
-
Expand source code
def g(*args, **kwargs):
    """Delegate to the `NodeSerializer` method named by the enclosing variable `n`."""
    # NOTE(review): `n` is captured from the enclosing scope — confirm it names a NodeSerializer method.
    fresh = NodeSerializer()
    return getattr(fresh, n)(*args, **kwargs)
def at(*args, **kwargs)
-
Expand source code
def g(*args, **kwargs):
    """Delegate to the `NodeSerializer` method named by the enclosing variable `n`."""
    # NOTE(review): `n` is captured from the enclosing scope — confirm it names a NodeSerializer method.
    fresh = NodeSerializer()
    return getattr(fresh, n)(*args, **kwargs)
def doc(*args, **kwargs)
-
Expand source code
def g(*args, **kwargs):
    """Delegate to the `NodeSerializer` method named by the enclosing variable `n`."""
    # NOTE(review): `n` is captured from the enclosing scope — confirm it names a NodeSerializer method.
    fresh = NodeSerializer()
    return getattr(fresh, n)(*args, **kwargs)
def each(*args, **kwargs)
-
Expand source code
def g(*args, **kwargs):
    """Forward the call to the method named by the enclosing variable `n` on a fresh `NodeSerializer`."""
    serializer = NodeSerializer()
    bound_method = getattr(serializer, n)
    return bound_method(*args, **kwargs)
def fold(*args, **kwargs)
-
Expand source code
def g(*args, **kwargs):
    """Forward the call to the method named by the enclosing variable `n` on a fresh `NodeSerializer`."""
    serializer = NodeSerializer()
    bound_method = getattr(serializer, n)
    return bound_method(*args, **kwargs)
def head(*args, **kwargs)
-
Expand source code
def g(*args, **kwargs):
    """Forward the call to the method named by the enclosing variable `n` on a fresh `NodeSerializer`."""
    serializer = NodeSerializer()
    bound_method = getattr(serializer, n)
    return bound_method(*args, **kwargs)
def last(*args, **kwargs)
-
Expand source code
def g(*args, **kwargs):
    """Forward the call to the method named by the enclosing variable `n` on a fresh `NodeSerializer`."""
    serializer = NodeSerializer()
    bound_method = getattr(serializer, n)
    return bound_method(*args, **kwargs)
def merge(*args, **kwargs)
-
Expand source code
def g(*args, **kwargs):
    """Forward the call to the method named by the enclosing variable `n` on a fresh `NodeSerializer`."""
    serializer = NodeSerializer()
    bound_method = getattr(serializer, n)
    return bound_method(*args, **kwargs)
def name(*args, **kwargs)
-
Expand source code
def g(*args, **kwargs):
    """Forward the call to the method named by the enclosing variable `n` on a fresh `NodeSerializer`."""
    serializer = NodeSerializer()
    bound_method = getattr(serializer, n)
    return bound_method(*args, **kwargs)
def select(*args, **kwargs)
-
Expand source code
def g(*args, **kwargs):
    """Forward the call to the method named by the enclosing variable `n` on a fresh `NodeSerializer`."""
    serializer = NodeSerializer()
    bound_method = getattr(serializer, n)
    return bound_method(*args, **kwargs)
def sub(*args, **kwargs)
-
Expand source code
def g(*args, **kwargs):
    """Forward the call to the method named by the enclosing variable `n` on a fresh `NodeSerializer`."""
    serializer = NodeSerializer()
    bound_method = getattr(serializer, n)
    return bound_method(*args, **kwargs)
class Table (name: str, columns: list[Column], comment: str = '')
-
This class represents a schema of a table.
Expand source code
class Table:
    """
    This class represents a schema of a table.
    """
    def __init__(self, name: str, columns: list[Column], comment: str = ""):
        #: Table name.
        self.name = name
        #: Columns in the table.
        self.columns = columns
        #: Comment of the table.
        self.comment = comment

    def find(self, name: str) -> Optional[Column]:
        """
        Find a column by name.

        Args:
            name: Column name.
        Returns:
            The column if exists, otherwise `None`.
        """
        for column in self.columns:
            if column.name == name:
                return column
        return None
Instance variables
var columns
-
Columns in the table.
var comment
-
Comment of the table.
var name
-
Table name.
Methods
def find(self, name: str) ‑> Optional[Column]
-
Find a column by name.
Args
name
- Column name.
Returns
The column if exists, otherwise
None
.Expand source code
def find(self, name: str) -> Optional[Column]:
    """
    Find a column by name.

    Args:
        name: Column name.
    Returns:
        The column if exists, otherwise `None`.
    """
    for column in self.columns:
        if column.name == name:
            return column
    return None
class Typeable
-
An interface for generic type which is resolved into a concrete type by a type parameter.
Inherit this class and declare static method whose signature is
resolve(me, bound, arg, spec) -> type
.

>>> class A(Typeable[T]):
>>>     @staticmethod
>>>     def resolve(me, bound, arg, spec):
>>>         ...
>>>         return some_type
>>>
>>> Typeable.resolve(A[T], int, spec)
Type resolution starts from
Typeable.resolve()
which invokes the static method with the following arguments.
- Type to resolve itself, in this case,
A[T]
. - A resolved type which replace
T
.arg
is the first candidate.- When
arg
is also Typeable
, this resolution flow is applied to it recursively until a concrete type is determined.
arg
is passed through as it is.spec
is passed through as it is.
Expand source code
class Typeable(Generic[T]):
    """
    An interface for generic type which is resolved into a concrete type by a type parameter.

    Inherit this class and declare static method whose signature is `resolve(me, bound, arg, spec) -> type`.

    ```python
    >>> class A(Typeable[T]):
    >>>     @staticmethod
    >>>     def resolve(me, bound, arg, spec):
    >>>         ...
    >>>         return some_type
    >>>
    >>> Typeable.resolve(A[T], int, spec)
    ```

    Type resolution starts from `Typeable.resolve` which invokes the static method with following arguments.

    - Type to resolve itself, in this case, `A[T]`.
    - A resolved type which replaces `T`.
        - `arg` is the first candidate.
        - When `arg` is also `Typeable`, this resolution flow is applied to it recursively until a concrete type is determined.
    - `arg` is passed through as it is.
    - `spec` is passed through as it is.
    """
    @staticmethod
    def resolve(typeable, arg: type, spec: Any) -> type:
        """
        Resolve a `Typeable` type into a concrete type by a type for its type parameter.

        Args:
            typeable: `Typeable` type having a generic type parameter.
            arg: Type to replace a type parameter.
            spec: An object containing information for schema generation.
        Returns:
            Resolved type.
        """
        if get_origin(typeable) is Typeable:
            # Typeable itself carries no resolution logic; only subclasses may be resolved.
            raise ValueError("Typeable should not be used directly. Use inheriting class instead.")

        bound = get_args(typeable)[0]

        if isinstance(bound, TypeVar):
            # Type parameter is still unbound: bind it to arg and resolve again.
            return Typeable.resolve(typeable[arg], arg, spec)
        if issubgeneric(bound, Typeable):
            # Bound type is itself Typeable: resolve it recursively first.
            bound = Typeable.resolve(bound, arg, spec)
        # Delegate to the subclass's own resolve() with the (possibly resolved) bound type.
        return typeable.resolve(typeable, bound, arg, spec)

    @staticmethod
    def is_resolved(typeable: type['Typeable']) -> bool:
        """
        Checks a type parameter of given `Typeable` is already resolved.

        Args:
            typeable: `Typeable` type having a generic type parameter.
        Returns:
            Whether the type parameter is already resolved or not.
        """
        bound = get_args(typeable)[0]
        if isinstance(bound, TypeVar):
            return False
        if issubgeneric(bound, Typeable):
            # Nested Typeable: resolution state depends on its own parameter.
            return Typeable.is_resolved(bound)
        return True
Ancestors
- typing.Generic
Subclasses
Static methods
def is_resolved(typeable: type['Typeable']) ‑> bool
-
Checks a type parameter of given
Typeable
is already resolved.
Args
typeable
Typeable
type having a generic type parameter.
Returns
Whether the type parameter is already resolved or not.
Expand source code
@staticmethod
def is_resolved(typeable: type['Typeable']) -> bool:
    """
    Checks a type parameter of given `Typeable` is already resolved.

    Args:
        typeable: `Typeable` type having a generic type parameter.
    Returns:
        Whether the type parameter is already resolved or not.
    """
    bound = get_args(typeable)[0]
    if isinstance(bound, TypeVar):
        return False
    if issubgeneric(bound, Typeable):
        # Nested Typeable: check its own parameter recursively.
        return Typeable.is_resolved(bound)
    return True
def resolve(typeable, arg: type, spec: Any) ‑> type
-
Resolve a
Typeable
type into a concrete type by a type for its type parameter.Args
typeable
Typeable
type having a generic type parameter.arg
- Type to replace a type parameter.
spec
- An object containing information for schema generation.
Returns
Resolved type.
Expand source code
@staticmethod
def resolve(typeable, arg: type, spec: Any) -> type:
    """
    Resolve a `Typeable` type into a concrete type by a type for its type parameter.

    Args:
        typeable: `Typeable` type having a generic type parameter.
        arg: Type to replace a type parameter.
        spec: An object containing information for schema generation.
    Returns:
        Resolved type.
    """
    if get_origin(typeable) is Typeable:
        # Typeable itself carries no resolution logic; only subclasses may be resolved.
        raise ValueError("Typeable should not be used directly. Use inheriting class instead.")

    bound = get_args(typeable)[0]

    if isinstance(bound, TypeVar):
        # Type parameter is still unbound: bind it to arg and resolve again.
        return Typeable.resolve(typeable[arg], arg, spec)
    if issubgeneric(bound, Typeable):
        # Bound type is itself Typeable: resolve it recursively first.
        bound = Typeable.resolve(bound, arg, spec)
    # Delegate to the subclass's own resolve() with the (possibly resolved) bound type.
    return typeable.resolve(typeable, bound, arg, spec)
- Type to resolve itself, in this case,