# TODO cleanup imports
from __future__ import annotations
import collections.abc as abc
from base64 import b64encode
from collections import defaultdict, deque
from dataclasses import is_dataclass, MISSING, Field
from datetime import datetime, time, date, timedelta
from decimal import Decimal
from enum import Enum
from pathlib import Path
# noinspection PyUnresolvedReferences,PyProtectedMember
from typing import (
cast, Any, Type, Dict, List, Tuple, Iterable, Sequence, Union,
NamedTupleMeta, SupportsFloat, AnyStr, Text, Callable, Optional,
Literal, Annotated, NamedTuple,
)
from uuid import UUID
from .decorators import (setup_recursive_safe_function,
setup_recursive_safe_function_for_generic)
from .enums import KeyCase, DateTimeTo
from .models import (Extras, TypeInfo, PatternBase,
LEAF_TYPES, LEAF_TYPES_NO_BYTES, UTC, ZERO)
from .type_conv import datetime_to_timestamp
from ..abstractions import AbstractDumperGenerator
from ..bases import AbstractMeta, BaseDumpHook, META
from ..class_helper import (
CLASS_TO_DUMP_FUNC,
DATACLASS_FIELD_TO_ALIAS_PATH_FOR_DUMP,
create_meta,
get_meta,
is_subclass_safe,
v1_dataclass_field_to_alias_for_dump,
dataclass_fields,
dataclass_field_to_default,
dataclass_field_names,
dataclass_field_to_skip_if,
)
from ..constants import CATCH_ALL, TAG, PACKAGE_NAME
from ..errors import (ParseError, MissingFields, MissingData, JSONWizardError)
from ..loader_selection import get_dumper, asdict
from ..log import LOG
from ..models import get_skip_if_condition, finalize_skip_if
from ..type_def import (
NoneType, JSONObject,
PyLiteralString,
T, ExplicitNull
)
# noinspection PyProtectedMember
from ..utils.dataclass_compat import _set_new_attribute
from ..utils.dict_helper import NestedDict
from ..utils.function_builder import FunctionBuilder
from ..utils.typing_compat import (
is_typed_dict, get_args, is_annotated,
eval_forward_ref_if_needed, get_origin_v2, is_union,
get_keys_for_typed_dict, is_typed_dict_type_qualifier,
)
def _type_returns_value_unchanged(arg, leaf_handling_as_subclass, origin=None):
# scalar type:
# (str, int, float, bool, complex, type, Literal, Any)
if origin is None:
origin = get_origin_v2(arg)
return (origin is Any
or origin is Literal
or origin in LEAF_TYPES_NO_BYTES
or (leaf_handling_as_subclass
and is_subclass_safe(origin, LEAF_TYPES_NO_BYTES)))
def _all_return_value_unchanged(args, leaf_handling_as_subclass):
for arg in args:
if not _type_returns_value_unchanged(arg, leaf_handling_as_subclass):
return False
return True
[docs]
class DumpMixin(AbstractDumperGenerator, BaseDumpHook):
"""
This Mixin class derives its name from the eponymous `json.dumps`
function. Essentially it contains helper methods to convert a `dataclass`
to JSON strings (or a Python dictionary object).
Refer to the :class:`AbstractDumper` class for documentation on any of the
implemented methods.
"""
# The mixin itself declares no per-instance attributes.
__slots__ = ()
# Runs once for every subclass: registers the default dump hooks on the
# new subclass via `setup_default_dumper`.
# NOTE(review): `kwargs` is accepted but not forwarded to `super()` -
# confirm that dropping subclass keyword arguments is intended.
def __init_subclass__(cls, **kwargs):
super().__init_subclass__()
setup_default_dumper(cls)
# Read by `dump_func_for_dataclass` (as `cls_dumper.transform_dataclass_field`)
# to decide how field names are turned into output keys; `None` keeps the
# field name unchanged.
transform_dataclass_field = None
[docs]
@staticmethod
def dump_fallback(tp: TypeInfo, _extras: Extras):
    """Identity dump: the generated code is just the value expression."""
    value_expr = tp.v()
    return value_expr


# Types that are emitted unchanged share the identity implementation.
dump_from_str = dump_fallback
dump_from_int = dump_fallback
dump_from_float = dump_fallback
dump_from_bool = dump_fallback
dump_from_literal = dump_fallback
[docs]
@staticmethod
def dump_from_bytes(tp: TypeInfo, extras: Extras):
    """Emit code that base64-encodes a ``bytes`` value into an ASCII ``str``."""
    # The generated code calls `b64encode`, so it must be resolvable there.
    tp.ensure_in_locals(extras, b64encode)
    value_expr = tp.v()
    return f"b64encode({value_expr}).decode('ascii')"
[docs]
@classmethod
def dump_from_bytearray(cls, tp: TypeInfo, extras: Extras):
    """Emit code that base64-encodes a ``bytearray`` into an ASCII ``str``.

    The value is converted to ``bytes`` first in the generated expression.
    """
    # The generated code calls `b64encode`, so it must be resolvable there.
    tp.ensure_in_locals(extras, b64encode)
    value_expr = tp.v()
    return f"b64encode(bytes({value_expr})).decode('ascii')"
[docs]
@staticmethod
def dump_from_none(tp: TypeInfo, extras: Extras):
    """Emit the literal ``None``; the value expression itself is not needed."""
    none_literal = 'None'
    return none_literal
[docs]
@staticmethod
def dump_from_enum(tp: TypeInfo, extras: Extras):
    """Emit code that reads the ``.value`` attribute of an enum member."""
    member_expr = tp.v()
    return f'{member_expr}.value'
[docs]
@staticmethod
def dump_from_uuid(tp: TypeInfo, extras: Extras):
    """Emit code that reads the ``.hex`` attribute of a ``UUID`` value."""
    uuid_expr = tp.v()
    return f'{uuid_expr}.hex'
[docs]
@classmethod
def dump_from_iterable(cls, tp: TypeInfo, extras: Extras):
    """Build the code string that dumps a list-like container to a ``list``.

    The element type is taken from the first type argument (``Any`` when the
    annotation carries none). When dumping an element is the identity, a
    shallow copy is emitted instead of a comprehension.

    :param tp: type information for the container being dumped.
    :param extras: shared code-generation state.
    :return: a Python expression (as a string) producing the dumped list.
    :raises TypeError: when the next variable name starts with ``'k'``,
        i.e. the container is being dumped in a mapping-key position.
    """
    v, v_next, i_next = tp.v_and_next()
    gorg = tp.origin

    if v_next[0] == 'k':
        # raise same error as `json` (not serializable)
        raise TypeError('keys must be str, int, float, bool or None, '
                        f'not {gorg.__qualname__}') from None

    # `tp.args` may be `None` (TypeError) or empty (IndexError) for a bare
    # annotation such as `list`. Catch only those, so that unrelated errors
    # and `KeyboardInterrupt` / `SystemExit` are no longer swallowed.
    try:
        elem_type = tp.args[0]
    except (IndexError, TypeError):
        elem_type = Any

    string = cls.dump_dispatcher_for_annotation(
        tp.replace(origin=elem_type, i=i_next, index=None, val_name=None), extras)

    if string == v_next:
        # Elements are emitted as-is: a shallow copy is enough.
        return f'{v}.copy()' if issubclass(gorg, list) else f'list({v})'

    return f'[{string} for {v_next} in {v}]'
[docs]
@classmethod
def dump_from_tuple(cls, tp: TypeInfo, extras: Extras):
    """Build the code string that dumps a ``tuple`` to a ``list``.

    Three annotation shapes are handled:

    * bare ``tuple`` (no type arguments): ``list(<value>)``;
    * fixed length, e.g. ``Tuple[int, str]``: one expression per position;
    * variadic, e.g. ``Tuple[str, ...]``: a comprehension over the
      elements, or ``list(<value>)`` when elements are emitted unchanged.
      Per PEP 484 only one type may precede the ``Ellipsis``, so only the
      first argument is used.
    """
    args = tp.args

    # Annotated without args, as simply `tuple`
    if not args:
        return f'list({tp.v()})'

    # Fixed-length form: dump each position with its own annotation.
    if args[-1] is not ...:
        per_position = []
        for position, arg in enumerate(args):
            item_code = cls.dump_dispatcher_for_annotation(
                tp.replace(origin=arg, index=position), extras)
            per_position.append(str(item_code))
        return f"[{', '.join(per_position)}]"

    # Variadic form: given `Tuple[T, ...]`, only the code for `T` is needed.
    v, v_next, i_next = tp.v_and_next()
    elem_code = cls.dump_dispatcher_for_annotation(
        tp.replace(origin=args[0], i=i_next, index=None, val_name=None), extras)

    if elem_code == v_next:
        return f'list({v})'

    return f'[{elem_code} for {v_next} in {v}]'
[docs]
@classmethod
def dump_from_named_tuple(cls, tp: TypeInfo, extras: Extras):
    """Build the code string that dumps a typed ``NamedTuple``.

    Each field is dumped according to its annotation (``Any`` when it has
    none). The result is a dict literal keyed by field name when
    ``config.v1_namedtuple_as_dict`` is set, otherwise a list literal in
    field order.
    """
    nt_cls = cast(NamedTuple, tp.origin)
    field_names = nt_cls._fields  # field names in order
    hints = nt_cls.__annotations__

    rendered = []
    for position, field_name in enumerate(field_names):
        field_tp = tp.replace(origin=hints.get(field_name, Any), index=position)
        rendered.append(
            str(cls.dump_dispatcher_for_annotation(field_tp, extras)))

    if extras['config'].v1_namedtuple_as_dict:
        pairs = [f'{field_name!r}: {code}'
                 for field_name, code in zip(field_names, rendered)]
        return f'{{{", ".join(pairs)}}}'

    joined = ', '.join(rendered)
    return f'[{joined}]'
[docs]
@classmethod
def dump_from_named_tuple_untyped(cls, tp: TypeInfo, extras: Extras):
    """Build the code string that dumps an un-annotated named tuple.

    Emits ``<value>._asdict()`` when ``config.v1_namedtuple_as_dict`` is
    set, otherwise ``list(<value>)``.
    """
    if extras['config'].v1_namedtuple_as_dict:
        return f'{tp.v()}._asdict()'
    return f'list({tp.v()})'
@classmethod
def _build_dict_comp(cls, tp, v, i_next, k_next, v_next, kt, vt, extras):
    """Build the code string that dumps a mapping with key type ``kt`` and
    value type ``vt``.

    Returns ``<v>.copy()`` when both keys and values are emitted unchanged,
    otherwise a dict comprehension over ``<v>.items()``.
    """
    key_tp = tp.replace(
        origin=kt, i=i_next, prefix='k', index=None, val_name=None)
    key_code = cls.dump_dispatcher_for_annotation(key_tp, extras)

    value_tp = tp.replace(
        origin=vt, i=i_next, prefix='v', index=None, val_name=None)
    value_code = cls.dump_dispatcher_for_annotation(value_tp, extras)

    needs_transform = not (k_next == key_code and v_next == value_code)
    if needs_transform:
        return (f'{{{key_code}: {value_code} '
                f'for {k_next}, {v_next} in {v}.items()}}')

    # Easy path; shallow copy
    return f'{v}.copy()'
[docs]
@classmethod
def dump_from_dict(cls, tp: TypeInfo, extras: Extras):
    """Build the code string that dumps a ``dict``-like value.

    When the annotation does not carry exactly two type arguments
    (e.g. ``dict`` or ``dict[str]``) a shallow copy is emitted.
    """
    try:
        key_type, value_type = tp.args
    except ValueError:
        # Annotated without two arguments,
        # e.g. like `dict[str]` or `dict`
        return f'{tp.v()}.copy()'

    v, k_next, v_next, i_next = tp.v_and_next_k_v()
    return cls._build_dict_comp(
        tp, v, i_next, k_next, v_next, key_type, value_type, extras)


# `defaultdict` values are dumped exactly like plain dicts.
dump_from_defaultdict = dump_from_dict
[docs]
@classmethod
def dump_from_typed_dict(cls, tp: TypeInfo, extras: Extras):
    """Build the code string that dumps a ``TypedDict`` value.

    With only required keys, an inline dict literal is emitted. As soon as
    optional keys exist, generation is delegated to
    ``_dump_from_typed_dict_fn``.
    """
    required_keys, optional_keys = get_keys_for_typed_dict(tp.origin)

    if optional_keys:
        return cls._dump_from_typed_dict_fn(tp, extras)

    hints = tp.origin.__annotations__
    entries = []
    for key in required_keys:
        key_tp = tp.replace(origin=hints.get(key, Any), index=repr(key))
        code = cls.dump_dispatcher_for_annotation(key_tp, extras)
        entries.append(f'{key!r}: {code}')

    dict_body = ', '.join(entries)
    return f'{{{dict_body}}}'
@classmethod
@setup_recursive_safe_function(prefix='dump')
def _dump_from_typed_dict_fn(cls, tp: TypeInfo, extras: Extras):
    """Generate the body of a helper function dumping a ``TypedDict`` that
    has optional keys.

    Required keys are written into a ``result`` dict literal; each optional
    key is copied only when present in the source mapping (checked with
    ``.get(key, MISSING)``). The generated body ends with ``return result``.
    """
    fn_gen = extras['fn_gen']
    required_keys, optional_keys = get_keys_for_typed_dict(tp.origin)
    hints = tp.origin.__annotations__
    source_expr = tp.v_for_def()

    # Set required keys for the `TypedDict`
    literal_entries = []
    for key in required_keys:
        key_repr = repr(key)
        code = cls.dump_dispatcher_for_annotation(
            tp.replace(origin=hints[key], index=key_repr, val_name=None),
            extras)
        literal_entries.append(f'{key_repr}: {code}')

    fn_gen.add_lines('result = {',
                     *[f' {entry},' for entry in literal_entries],
                     '}')

    # Set optional keys for the `TypedDict` (if they exist)
    inner_tp = tp.replace(i=tp.i + 1, index=None, val_name=None)
    inner_var = inner_tp.v()

    for key in optional_keys:
        key_repr = repr(key)
        code = cls.dump_dispatcher_for_annotation(
            inner_tp.replace(origin=hints[key]), extras)
        presence_check = (f'({inner_var} := {source_expr}.get({key_repr}, MISSING))'
                          ' is not MISSING')
        with fn_gen.if_(presence_check):
            fn_gen.add_line(f'result[{key_repr}] = {code}')

    fn_gen.add_line('return result')
[docs]
@classmethod
@setup_recursive_safe_function_for_generic(prefix='dump', per_class_cache=True)
def dump_from_union(cls, tp: TypeInfo, extras: Extras):
# Generate the body of a helper function that dumps a value annotated as a
# `Union[...]`, dispatching on the runtime class of the value.
#
# The emitted code is, in order:
#   1. `t = <value>.__class__`
#   2. `if t in leaf_types: return <value>` for the members that are
#      returned unchanged
#   3. one `if t is <Dataclass>:` branch per dataclass member; when a tag
#      was resolved for that class, it is stored under `tag_key` in the result
#   4. a `try: return ... / except Exception: pass` attempt for each
#      remaining member, in declaration order
#   5. a final `raise ParseError(...)` when nothing matched
fn_gen = extras['fn_gen']
config = extras['config']
actual_cls = extras['cls']
_locals = extras['locals']
args = tp.args
field_i = tp.field_i
i = tp.i
v = tp.v_for_def()
# Fall back to the package-wide default key when the config sets none.
tag_key = config.tag_key or TAG
auto_assign_tags = config.auto_assign_tags
leaf_handling_as_subclass = config.v1_leaf_handling == 'issubclass'
in_optional = NoneType in args
# Both names are referenced by the `raise ParseError(...)` line emitted below.
# NOTE(review): 'fields' is also the key `dump_func_for_dataclass` stores the
# dataclass fields under - confirm these locals are not shared with it.
_locals['fields'] = args
_locals['tag_key'] = tag_key
# Union members, bucketed by how they are handled in the generated code.
leaf_types = []
try_parse_lines = []
dataclass_and_line = []
has_dataclass = False
for possible_tp in args:
possible_tp = eval_forward_ref_if_needed(possible_tp, actual_cls)
tp_new = TypeInfo(possible_tp, field_i=field_i, i=i)
tp_new.in_optional = in_optional
if _type_returns_value_unchanged(
possible_tp, leaf_handling_as_subclass):
leaf_types.append(possible_tp)
# if num_leaf_types_no_bytes > 0:
# fn_gen.add_line(f'return {v}')
elif is_dataclass(possible_tp):
# we see a dataclass in `Union` declaration
has_dataclass = True
string = cls.dump_dispatcher_for_annotation(tp_new, extras)
meta = get_meta(possible_tp)
cls_name = possible_tp.__name__
tag = meta.tag
assign_tags_to_cls = auto_assign_tags or meta.auto_assign_tags
# No explicit tag, but auto-assignment is on: use the class name as the
# tag and record it on the class's Meta.
if assign_tags_to_cls and not tag:
tag = cls_name
# We don't want to mutate the base Meta class here
if meta is AbstractMeta:
create_meta(possible_tp, cls_name, tag=tag)
else:
meta.tag = cls_name
if tag:
dataclass_and_line.append(
(possible_tp, cls_name, tag,
f'result = {string}; result[tag_key] = {tag!r}; return result'))
else:
dataclass_and_line.append(
(possible_tp, cls_name, tag,
f'return {string}'))
else:
try_parse_lines.append(
cls.dump_dispatcher_for_annotation(tp_new, extras))
fn_gen.add_line(f't = {v}.__class__')
if leaf_types:
# a good heuristic: use tuples for smaller unions, else frozenset
container = tuple if len(leaf_types) <= 6 else frozenset
_locals['leaf_types'] = container(leaf_types)
# Only used for the human-readable comment on the generated `if` line.
leaf_type_names = ', '.join(getattr(t, '__name__', None) or str(t)
for t in leaf_types)
with fn_gen.if_('t in leaf_types', comment=f'{{{leaf_type_names}}}'):
fn_gen.add_line(f'return {v}')
if has_dataclass:
# NOTE(review): the enumerate index (`field_i`) and `name` are not used in
# this loop body.
for field_i, (dataclass, name, tag, line) in enumerate(dataclass_and_line, start=1):
cls_name = TypeInfo(dataclass).type_name(extras)
with fn_gen.if_(f't is {cls_name}', comment=f'{tag!r}' if tag else ''):
fn_gen.add_line(line)
# Remaining members: try each in turn, ignoring any failure.
for string in try_parse_lines:
with fn_gen.try_():
fn_gen.add_line(f'return {string}')
with fn_gen.except_(Exception):
fn_gen.add_line('pass')
# Invalid type for Union
fn_gen.add_line("raise ParseError("
"TypeError('Object was not in any of Union types'),"
f"{v},fields,'dump',"
"tag_key=tag_key"
")")
[docs]
@staticmethod
def dump_from_decimal(tp: TypeInfo, extras: Extras):
    """Emit code that converts a ``Decimal`` value with ``str()``."""
    decimal_expr = tp.v()
    return f'str({decimal_expr})'
[docs]
@staticmethod
def dump_from_path(tp: TypeInfo, extras: Extras):
    """Emit code that converts a ``Path`` value with ``str()``."""
    path_expr = tp.v()
    return f'str({path_expr})'
[docs]
@classmethod
def dump_from_date(cls, tp: TypeInfo, extras: Extras):
    """Build the code string that dumps a ``date`` value.

    Emits ``<value>.isoformat()`` by default. When
    ``config.v1_dump_date_time_as`` is ``DateTimeTo.TIMESTAMP``, emits an
    integer timestamp computed from a ``datetime`` built from the date's
    year, month and day with ``tzinfo=UTC``.
    """
    value_expr = tp.v()

    if extras['config'].v1_dump_date_time_as is not DateTimeTo.TIMESTAMP:
        return f'{value_expr}.isoformat()'

    # The generated expression references both `datetime` and `UTC`.
    tp.ensure_in_locals(extras, datetime, UTC=UTC)
    return f'int(datetime((v0 := {value_expr}).year, v0.month, v0.day, tzinfo=UTC).timestamp())'
[docs]
@classmethod
def dump_from_datetime(cls, tp: TypeInfo, extras: Extras):
    """Build the code string that dumps a ``datetime`` value.

    By default an ISO 8601 string is emitted, with a ``+00:00`` offset
    rewritten to ``Z``. When ``config.v1_dump_date_time_as`` is
    ``DateTimeTo.TIMESTAMP`` an integer timestamp is emitted instead:

    * if ``config.v1_assume_naive_datetime_tz`` is ``None``, the generated
      code calls ``raise_naive()`` (which raises ``ValueError``) for a
      naive datetime;
    * otherwise the generated code delegates to ``datetime_to_timestamp``,
      passing the configured value as ``assume_naive_tz``.

    :param tp: type information for the value being dumped.
    :param extras: shared code-generation state.
    :return: a Python expression (as a string) producing the dumped value.
    """
    o = tp.v()
    config = extras['config']

    if config.v1_dump_date_time_as is DateTimeTo.TIMESTAMP:
        naive_tz = config.v1_assume_naive_datetime_tz

        if naive_tz is None:
            def raise_naive():
                raise ValueError('Naive datetime has no timezone; '
                                 'set v1_assume_naive_datetime_tz to '
                                 'define how it should be interpreted.')

            # The expression below references `raise_naive`, `ZERO` *and*
            # `UTC`; register all three so the generated code does not
            # rely on `UTC` having been added by another hook (such as
            # `dump_from_date`).
            tp.ensure_in_locals(extras, raise_naive, ZERO=ZERO, UTC=UTC)
            return f'raise_naive() if (v0 := {o}).tzinfo is None else int(v0.timestamp()) if v0.utcoffset() == ZERO else int(v0.astimezone(UTC).timestamp())'

        tp.ensure_in_locals(extras, datetime_to_timestamp, assume_naive_tz=naive_tz)
        return f'datetime_to_timestamp({o}, assume_naive_tz)'

    # Safe: source is datetime.isoformat(); '+00:00' only appears for UTC
    return f"{o}.isoformat().replace('+00:00', 'Z', 1)"
[docs]
@staticmethod
def dump_from_time(tp: TypeInfo, _extras: Extras):
    """Emit code that dumps a ``time`` value as an ISO string, with the
    first ``+00:00`` rewritten to ``Z``.
    """
    iso_call = f'{tp.v()}.isoformat()'
    # Safe: source is time.isoformat(); '+00:00' only appears for UTC
    return f"{iso_call}.replace('+00:00', 'Z', 1)"
[docs]
@staticmethod
def dump_from_timedelta(tp: TypeInfo, extras: Extras):
    """Emit code that converts a ``timedelta`` value with ``str()``."""
    delta_expr = tp.v()
    return f'str({delta_expr})'
[docs]
@staticmethod
@setup_recursive_safe_function(
    prefix='dump',
    fn_name=f'__{PACKAGE_NAME}_to_dict_{{cls_name}}__')
def dump_from_dataclass(tp: TypeInfo, extras: Extras):
    """Generate the dump function for a nested dataclass.

    The code generation itself is delegated to ``dump_func_for_dataclass``,
    which receives the current ``extras``.
    """
    nested_cls = tp.origin
    dump_func_for_dataclass(nested_cls, extras)
[docs]
@classmethod
def dump_dispatcher_for_annotation(cls,
tp,
extras):
# Resolve the dump hook for the annotation held in `tp.origin` and return the
# code string that hook produces.
#
#   tp:     TypeInfo describing the annotation and the value expression.
#   extras: shared code-generation state ('config', 'cls', ...).
#
# The annotation is first unwrapped (forward refs, `Annotated[...]`,
# `Required[...]` / `NotRequired[...]`, `type` aliases), then matched against
# the branches below in order. Before the hook is called, `tp.origin`,
# `tp.args` and `tp.name` are overwritten with the resolved values.
#
# Raises ParseError when no hook can be found for the type.
hooks = cls.__DUMP_HOOKS__
config = extras['config']
type_hooks = config.v1_type_to_dump_hook
leaf_handling_as_subclass = config.v1_leaf_handling == 'issubclass'
# type_ann = tp.origin
type_ann = eval_forward_ref_if_needed(tp.origin, extras['cls'])
origin = get_origin_v2(type_ann)
# Falls back to `origin` itself when it has no `__name__`.
name = getattr(origin, '__name__', origin)
args = None
if is_annotated(type_ann):
# Given `Annotated[T, ...]`, we only need `T`
type_ann, *field_extras = get_args(type_ann)
type_ann = eval_forward_ref_if_needed(type_ann, extras['cls'])
origin = get_origin_v2(type_ann)
name = getattr(origin, '__name__', origin)
# Check for Custom Patterns for date / time / datetime
for extra in field_extras:
if isinstance(extra, PatternBase):
extras['pattern'] = extra
elif is_typed_dict_type_qualifier(origin):
# Given `Required[T]` or `NotRequired[T]`, we only need `T`
type_ann = get_args(type_ann)[0]
origin = get_origin_v2(type_ann)
name = getattr(origin, '__name__', origin)
# TypeAliasType: Type aliases are created through
# the `type` statement
if (value := getattr(origin, '__value__', None)) is not None:
type_ann = value
origin = get_origin_v2(type_ann)
name = getattr(origin, '__name__', origin)
# `LiteralString` enforces stricter rules at
# type-checking but behaves like `str` at runtime.
# TODO maybe add `load_to_literal_string`
if origin is PyLiteralString:
dump_hook = cls.dump_from_str
origin = str
name = 'str'
# -> Atomic, immutable types which don't require
# any iterative / recursive handling.
elif origin in LEAF_TYPES or (
leaf_handling_as_subclass
and is_subclass_safe(origin, LEAF_TYPES)):
dump_hook = hooks.get(origin)
# -> User-configured hooks (`config.v1_type_to_dump_hook`); each entry is
# a `(mode, hook)` pair.
elif (type_hooks is not None
and (hook_info := type_hooks.get(origin)) is not None):
mode, dump_hook = hook_info
# 'runtime' hooks are called on the value from the generated code.
if mode == 'runtime':
fn_name, = tp.ensure_in_locals(extras, dump_hook)
return f'{fn_name}({tp.v()})'
try:
args = get_args(type_ann)
except ValueError:
args = Any,
# -> Hook registered for exactly this origin type. The walrus also leaves
# `dump_hook` bound to `None` for every branch below when nothing is found.
elif (dump_hook := hooks.get(origin)) is not None:
try:
args = get_args(type_ann)
except ValueError:
args = Any,
# -> Union[x]
elif is_union(origin):
args = get_args(type_ann)
# all args in `Union[...]` are simple types
if _all_return_value_unchanged(args, leaf_handling_as_subclass):
return tp.v()
dump_hook = cls.dump_from_union
# Special case for Optional[x], which is actually Union[x, None]
if len(args) == 2 and NoneType in args:
origin = args[0]
# Bind the value to `v0` once so the generated expression does not
# evaluate it twice.
if tp.val_name:
val_name = 'v0'
o = f'({val_name} := {tp.v()})'
else:
val_name = None
o = tp.v()
new_tp = tp.replace(origin=origin, args=None, name=None, val_name=val_name)
new_tp.in_optional = True
string = cls.dump_dispatcher_for_annotation(new_tp, extras)
return f'None if {o} is None else {string}'
# -> Literal[X, Y, ...]
elif origin is Literal:
dump_hook = cls.dump_from_literal
args = get_args(type_ann)
# https://stackoverflow.com/questions/76520264/dataclasswizard-after-upgrading-to-python3-11-is-not-working-as-expected
elif origin is Any:
dump_hook = cls.dump_fallback
# -> Named tuples: a `tuple` subclass exposing `_fields`.
elif is_subclass_safe(origin, tuple) and hasattr(origin, '_fields'):
if getattr(origin, '__annotations__', None):
# Annotated as a `typing.NamedTuple` subtype
dump_hook = cls.dump_from_named_tuple
else:
# Annotated as a `collections.namedtuple` subtype
dump_hook = cls.dump_from_named_tuple_untyped
elif is_typed_dict(origin):
dump_hook = cls.dump_from_typed_dict
elif is_dataclass(origin):
# return a dynamically generated `asdict`
# for the `cls` (base_type)
dump_hook = cls.dump_from_dataclass
elif is_subclass_safe(origin, Enum):
dump_hook = cls.dump_from_enum
# -> Abstract collection types are mapped to a concrete `tuple` / `list`.
elif origin in (abc.Sequence, abc.MutableSequence, abc.Collection):
if origin is abc.Sequence:
dump_hook = cls.dump_from_tuple
# desired (non-generic) origin type
name = 'tuple'
origin = tuple
# Re-map type arguments to variadic tuple format,
# e.g. `Sequence[int]` -> `tuple[int, ...]`
try:
args = (get_args(type_ann)[0], ...)
except (IndexError, ValueError):
args = Any,
else:
dump_hook = cls.dump_from_iterable
# desired (non-generic) origin type
name = 'list'
origin = list
# Get type arguments, e.g. `Sequence[int]` -> `int`
try:
args = get_args(type_ann)
except ValueError:
args = Any,
# -> Pattern instances: pick the hook matching the pattern's base type.
# `datetime` is tested before `date`.
elif isinstance(origin, PatternBase):
__base__ = origin.base
if issubclass(__base__, datetime):
dump_hook = cls.dump_from_datetime
origin = datetime
elif issubclass(__base__, date):
dump_hook = cls.dump_from_date
origin = date
elif issubclass(__base__, time):
dump_hook = cls.dump_from_time
origin = time
else:
# TODO everything should use `get_origin_v2`
try:
args = get_args(type_ann)
except ValueError:
args = Any,
# No hook chosen yet: scan the registered hooks for a base class of `origin`.
# Leaf types are skipped unless subclass handling is enabled.
if dump_hook is None:
# TODO END
for t in hooks:
if (not leaf_handling_as_subclass) and (t in LEAF_TYPES):
continue
if issubclass(origin, t):
dump_hook = hooks[t]
break
# Hand the resolved type details to the hook through `tp`.
tp.origin = origin
tp.args = args
tp.name = name
if dump_hook is not None:
result = dump_hook(tp, extras)
return result
# No matching hook is found for the type.
# TODO do we want to add a `Meta` field to not raise
# an error but perform a default action?
err = TypeError('Provided type is not currently supported.')
pe = ParseError(
err, origin, type_ann, 'dump',
resolution=f'Register a dump hook for {ParseError.name(origin)} '
f'(v1: `register_type` / `Meta.v1_type_to_dump_hook`).',
unsupported_type=origin
)
raise pe from None
def setup_default_dumper(cls=DumpMixin):
    """
    Register the default type hooks used when converting a `dataclass`
    instance to a `str` (json) or a Python `dict` object.

    Hooks are registered in the order listed below.

    Note: `cls` must be :class:`DumpMixin` or a sub-class of it.
    """
    # TODO maybe `dict.update` might be better?
    default_hooks = (
        # Technically a complex type, however check this
        # first, since `StrEnum` and `IntEnum` are subclasses
        # of `str` and `int`
        (Enum, cls.dump_from_enum),
        # Simple types
        (str, cls.dump_from_str),
        (float, cls.dump_from_float),
        (bool, cls.dump_from_bool),
        (int, cls.dump_from_int),
        (bytes, cls.dump_from_bytes),
        (bytearray, cls.dump_from_bytearray),
        (NoneType, cls.dump_from_none),
        # Complex types
        (UUID, cls.dump_from_uuid),
        (set, cls.dump_from_iterable),
        (frozenset, cls.dump_from_iterable),
        (deque, cls.dump_from_iterable),
        (list, cls.dump_from_iterable),
        (tuple, cls.dump_from_tuple),
        # `typing` Generics
        # (Literal, cls.dump_from_literal),
        (defaultdict, cls.dump_from_defaultdict),
        (dict, cls.dump_from_dict),
        (Decimal, cls.dump_from_decimal),
        (Path, cls.dump_from_path),
        # Dates and times
        (datetime, cls.dump_from_datetime),
        (time, cls.dump_from_time),
        (date, cls.dump_from_date),
        (timedelta, cls.dump_from_timedelta),
    )

    for hooked_type, hook in default_hooks:
        # noinspection PyTypeChecker
        cls.register_dump_hook(hooked_type, hook)
def check_and_raise_missing_fields(
        _locals, o, cls, fields: tuple[Field, ...]):
    """Raise :class:`MissingFields` describing the fields without a value.

    A field is reported as missing when it is an ``init`` field, there is
    no ``__<name>`` entry in ``_locals``, and it declares neither a
    ``default`` nor a ``default_factory``.

    This function always raises.
    """
    missing_fields = []
    for f in fields:
        if not f.init:
            continue
        if f'__{f.name}' in _locals:
            continue
        if f.default is MISSING and f.default_factory is MISSING:
            missing_fields.append(f.name)

    # Translate each missing field name to the key used when dumping.
    missing_keys = []
    for field_name in missing_fields:
        missing_keys.append(
            v1_dataclass_field_to_alias_for_dump(cls)[field_name])

    raise MissingFields(
        None, o, cls, fields, None, missing_fields,
        missing_keys
    ) from None
def dump_func_for_dataclass(
cls: type,
extras: Extras | None = None,
dumper_cls=DumpMixin,
base_meta_cls: type = AbstractMeta,
) -> Union[Callable[[T], JSONObject], str]:
# Generate the source of a specialized "to dict" function for dataclass `cls`.
#
#   cls:           the dataclass to generate a dump function for.
#   extras:        `None` when called for the top-level ("main") dataclass;
#                  otherwise the shared generation state of the enclosing
#                  call, in which case `cls` is a nested dataclass.
#   dumper_cls:    base dumper class handed to `get_dumper`.
#   base_meta_cls: Meta class handed to `get_meta`, and used as the config
#                  when the class's Meta is not recursive.
#
# The generated function has the signature
#   (o, dict_factory=dict, exclude=None, skip_defaults=<Meta.skip_defaults>)
#
# For the main dataclass the generated functions are compiled with
# `fn_gen.create_functions`, attached to `cls` and cached in
# `CLASS_TO_DUMP_FUNC`. For a nested dataclass the code is only added to the
# shared `fn_gen`.
# TODO dynamically generate for multiple nested classes at once
# Tuple describing the fields of this dataclass.
cls_fields = dataclass_fields(cls)
cls_fields_list = list(cls_fields)
cls_field_names = dataclass_field_names(cls)
# Get the dumper for the class, or create a new one as needed.
cls_dumper = get_dumper(cls, base_cls=dumper_cls, v1=True)
cls_name = cls.__name__
fn_name = f'__{PACKAGE_NAME}_to_dict_{cls_name}__'
# Get the meta config for the class, or the default config otherwise.
meta = get_meta(cls, base_meta_cls)
if extras is None: # we are being run for the main dataclass
is_main_class = True
# If the `recursive` flag is enabled and a Meta config is provided,
# apply the Meta recursively to any nested classes.
#
# Else, just use the base `AbstractMeta`.
config: META = meta if meta.recursive else base_meta_cls
# Initialize the FuncBuilder
fn_gen = FunctionBuilder()
new_locals = {
'cls': cls,
'fields': cls_fields,
}
# noinspection PyTypeChecker
extras: Extras = {
'config': config,
'cls': cls,
'cls_name': cls_name,
'locals': new_locals,
'recursion_guard': {cls: fn_name},
'fn_gen': fn_gen,
}
# Names made available to the generated code when it is compiled below.
_globals = {
'MISSING': MISSING,
'ParseError': ParseError,
'raise_missing_fields': check_and_raise_missing_fields,
're_raise': re_raise,
}
# we are being run for a nested dataclass
else:
is_main_class = False
# config for nested dataclasses
config = extras['config']
# Initialize the FuncBuilder
fn_gen = extras['fn_gen']
if config is not base_meta_cls:
# we want to apply the meta config from the main dataclass
# recursively.
meta = meta | config
meta.bind_to(cls, is_default=False)
new_locals = extras['locals']
new_locals['fields'] = cls_fields
# TODO need a way to auto-magically do this
extras['cls'] = cls
extras['cls_name'] = cls_name
key_case: KeyCase | None = cls_dumper.transform_dataclass_field
# TODO decide if different logic is needed for `AUTO` case
if key_case is KeyCase.AUTO:
key_case = None
# A cached mapping of each dataclass field to the resolved key name in a
# JSON or dictionary object; useful so we don't need to do a case
# transformation (via regex) each time.
field_to_alias = v1_dataclass_field_to_alias_for_dump(cls)
check_aliases = True if field_to_alias else False
field_to_path = DATACLASS_FIELD_TO_ALIAS_PATH_FOR_DUMP[cls]
has_paths = True if field_to_path else False
# A cached mapping of dataclass field name to its default value, either
# via a `default` or `default_factory` argument.
field_to_default = dataclass_field_to_default(cls)
has_defaults = True if field_to_default else False
# A cached mapping of dataclass field name to its SkipIf condition.
field_to_skip_if = dataclass_field_to_skip_if(cls)
skip_if_condition = get_skip_if_condition(
meta.skip_if, new_locals, '_skip_value')
skip_defaults_if_condition = get_skip_if_condition(
meta.skip_defaults_if, new_locals, '_skip_defaults_value')
skip_defaults = True if meta.skip_defaults else False
skip_if = True if field_to_skip_if or skip_if_condition else False
# NOTE: `pop` removes the catch-all entry from the mapping returned above.
catch_all_name: 'str | None' = field_to_alias.pop(CATCH_ALL, None)
has_catch_all = catch_all_name is not None
if has_catch_all:
# The stored name may carry a trailing '?', which is not part of the
# field name.
catch_all_name_stripped = catch_all_name.rstrip('?')
catch_all_idx = cls_field_names.index(catch_all_name_stripped)
# remove catch all field from list, so we don't iterate over it
# noinspection PyTypeChecker
del cls_fields_list[catch_all_idx]
else:
catch_all_name_stripped = None
cls_name = cls.__name__
with fn_gen.function(
fn_name, [
'o',
'dict_factory=dict',
"exclude:'list[str]|None'=None",
f'skip_defaults:bool={skip_defaults}',
], MISSING, new_locals):
# Optional user hook: when the class defines `_pre_to_dict`, the generated
# code passes the object through it first.
if (_pre_to_dict := getattr(cls, '_pre_to_dict', None)) is not None:
new_locals['__pre_to_dict__'] = _pre_to_dict
fn_gen.add_line('o = __pre_to_dict__(o)')
# Need to create a separate dictionary to copy over the constructor
# args, as we don't want to mutate the original dictionary object.
if has_defaults:
fn_gen.add_line('add_defaults = not skip_defaults')
if has_paths:
new_locals['NestedDict'] = NestedDict
fn_gen.add_line('paths = NestedDict()')
# Per-field code, collected first and emitted after the field loop:
#   required_field_assigns: (name, key, code) entries of the `result` literal
#   default_assigns:        (name, key, default_name, lvalue, code) entries
#                           emitted as conditional assignments
#   path_assigns:           (name, line) entries for `AliasPath(...)` fields
#   name_to_skip_condition: field name -> 'not (...)' condition template
required_field_assigns = []
default_assigns = []
path_assigns = []
name_to_skip_condition = {}
if cls_fields_list:
with fn_gen.try_():
for i, f in enumerate(cls_fields_list):
name = f.name
# `ExplicitNull` is used as the "no default" sentinel here.
default = field_to_default.get(name, ExplicitNull)
has_default = default is not ExplicitNull
has_skip_if = False
# TODO: This is if we want to check if field is in `exclude`
# (not huge performance gain)
# skip_field = f'_skip_{i}'
default_value = f'_default_{i}'
# Check for Field Aliases + Paths
# NOTE: `key` is used later, so we need to capture it.
if check_aliases and (key := field_to_alias.get(name)) is not None:
# special case: skip serialization for field, e.g. `Alias(..., skip=True)`
if key is ExplicitNull:
continue
elif key_case is None:
key = name
else:
key = key_case(name)
# If field has an explicit `SkipIf` condition
if skip_if:
has_skip_if = True
if (_skip_condition := field_to_skip_if.get(name)) is not None:
_skip_if = get_skip_if_condition(
_skip_condition, new_locals, condition_i=i)
_final_skip_if = finalize_skip_if(_skip_condition, '{}', _skip_if)
name_to_skip_condition[name] = f'not ({_final_skip_if})'
# If Meta `skip_if` has a value
elif skip_if_condition:
_final_skip_if = finalize_skip_if(
meta.skip_if, '{}', skip_if_condition)
name_to_skip_condition[name] = f'not ({_final_skip_if})'
# # Else, proceed as normal
# else:
# field_assignments.append(f"if not {skip_field}:")
# A dataclass field which specifies a "JSON Path".
if has_paths and (
path := field_to_path.get(name)
) is not None: # AliasPath(...)
lvalue = f"paths{''.join(f'[{p!r}]' for p in path)}"
if has_default:
new_locals[default_value] = default
string = generate_field_code(cls_dumper, extras, f, i)
default_assigns.append((name, key, default_value, lvalue, string))
else:
var_name = 'v1' if has_skip_if else f'o.{name}'
string = generate_field_code(cls_dumper, extras, f, i, var_name)
path_assigns.append((name, f"{lvalue} = {string}"))
continue
if has_default:
new_locals[default_value] = default
string = generate_field_code(cls_dumper, extras, f, i)
lvalue = f'result[{key!r}]'
default_assigns.append((name, key, default_value, lvalue, string))
else:
# TODO confirm this is ok
# vars_for_fields.append(f'{name}={var}')
if has_skip_if:
string = generate_field_code(cls_dumper, extras, f, i, 'v1')
lvalue = f'result[{key!r}]'
default_assigns.append((name, ExplicitNull, ExplicitNull, lvalue, string))
else:
string = generate_field_code(cls_dumper, extras, f, i, f'o.{name}')
required_field_assigns.append((name, key, string))
# Add assignments for `AliasPath(...)`
for (name, line) in path_assigns:
if (condition := name_to_skip_condition.get(name)) is not None:
fn_gen.add_line(f'v1 = o.{name}')
with fn_gen.if_(condition.format('v1')):
fn_gen.add_line(line)
else:
fn_gen.add_line(line)
# Add required dataclass field assignments
fn_gen.add_line('result = {')
for (_, key, string) in required_field_assigns:
fn_gen.add_line(f' {key!r}: {string},')
fn_gen.add_line('}')
# Add default (optional) dataclass field assignments
for (name, key, default_name, lvalue, rvalue) in default_assigns:
var_name = 'v1'
# When the field code is just the bare variable, read the attribute
# directly instead of going through a temporary.
if rvalue == var_name: # and default_name is not ExplicitNull:
var_name = rvalue = f'o.{name}'
else:
fn_gen.add_line(f'{var_name} = o.{name}')
line = f'{lvalue} = {rvalue}'
def_condition = f'add_defaults or {var_name} != {default_name}'
if skip_defaults_if_condition:
_final_skip_if = finalize_skip_if(
meta.skip_defaults_if, var_name, skip_defaults_if_condition)
# TODO missing skip individual condition!!
with fn_gen.if_(
f'(add_defaults or {var_name} != {default_name}) '
f'and not ({_final_skip_if})'):
fn_gen.add_line(line)
elif (condition := name_to_skip_condition.get(name)) is not None:
condition = condition.format(var_name)
if default_name is ExplicitNull: # Required field with skip condition
with fn_gen.if_(condition):
fn_gen.add_line(line)
else:
with fn_gen.if_(
f'(add_defaults or {var_name} != {default_name}) '
f'and {condition}'):
fn_gen.add_line(line)
else:
with fn_gen.if_(def_condition):
fn_gen.add_line(line)
# create a broad `except Exception` block, as we will be
# re-raising all exception(s) as a custom `ParseError`.
with fn_gen.except_(Exception, 'e', ParseError):
fn_gen.add_line("re_raise(e, cls, o, fields, '<UNK>', locals().get('v1'))")
else:
fn_gen.add_line('result = {}')
if has_catch_all:
# The catch-all mapping's entries are copied into `result`, each value
# converted with the stdlib's private `_asdict_inner` helper.
# noinspection PyUnresolvedReferences,PyProtectedMember
from dataclasses import _asdict_inner as __dataclasses_asdict_inner__
if (default := field_to_default.get(catch_all_name_stripped, ExplicitNull)) is not ExplicitNull:
default_value = f'_default_{len(cls_fields_list)}'
new_locals[default_value] = default
condition = f"(v1 := o.{catch_all_name_stripped}) != {default_value}"
else:
condition = f'v1 := o.{catch_all_name_stripped}'
with fn_gen.if_(condition):
with fn_gen.for_(f"k, v in v1.items()"):
fn_gen.globals['__asdict_inner__'] = __dataclasses_asdict_inner__
fn_gen.add_line('result[k] = __asdict_inner__(v,dict_factory)')
# Keys listed in `exclude` are removed from the finished result.
with fn_gen.if_('exclude'):
with fn_gen.for_('k in exclude'):
fn_gen.add_line('result.pop(k, None)')
if has_paths:
fn_gen.add_line('result.update(paths)')
# Now pass the arguments to the dict_factory method, and return
# the new dict_factory instance.
fn_gen.add_line(f'return result if dict_factory is dict else dict_factory(result)')
# Save the dump function for the main dataclass, so we don't need to run
# this logic each time.
if is_main_class:
# noinspection PyUnboundLocalVariable
functions = fn_gen.create_functions(_globals)
cls_todict = functions[fn_name]
# Check if the class has a `to_dict`, and it's
# a class method bound to `todict`.
if getattr(cls, 'to_dict', None) is asdict:
LOG.debug("setattr(%s, 'to_dict', %s)", cls_name, fn_name)
# Marker reserved for future detection/debugging of specialized dumpers.
# setattr(cls_todict, _SPECIALIZED_TO_DICT, True)
# safe to specialize only when user didn't define it on cls
_set_new_attribute(cls, 'to_dict', cls_todict, force=True)
_set_new_attribute(
cls, f'__{PACKAGE_NAME}_to_dict__', cls_todict)
LOG.debug(
"setattr(%s, '__%s_to_dict__', %s)",
cls_name, PACKAGE_NAME, fn_name)
# TODO in `v1`, we will use class attribute (set above) instead.
CLASS_TO_DUMP_FUNC[cls] = cls_todict
# NOTE(review): `cls_todict` is only assigned for the main dataclass -
# confirm this `return` is not reached for nested dataclasses.
return cls_todict
def generate_field_code(cls_dumper: DumpMixin,
                        extras: Extras,
                        field: Field,
                        field_i: int,
                        var_name=None) -> 'str | TypeInfo':
    """Return the generated dump code for a single dataclass field.

    The field's type is resolved (forward references evaluated against the
    current class) and stored back on ``field.type`` before dispatching.

    :param cls_dumper: dumper whose dispatcher produces the code.
    :param extras: shared code-generation state; ``extras['cls']`` is the
        dataclass owning the field.
    :param field: the dataclass field to generate code for.
    :param field_i: index of the field, passed on to ``TypeInfo``.
    :param var_name: optional name of the variable holding the field value.
    :raises ParseError: re-raised from the dispatcher with ``class_name``
        and ``field_name`` filled in.
    """
    owner_cls = extras['cls']

    resolved_type = eval_forward_ref_if_needed(field.type, owner_cls)
    field.type = resolved_type

    try:
        type_info = TypeInfo(resolved_type, field_i=field_i, val_name=var_name)
        return cls_dumper.dump_dispatcher_for_annotation(type_info, extras)
    except ParseError as pe:
        # Attach the location of the failure before propagating.
        pe.class_name = owner_cls
        pe.field_name = field.name
        raise pe from None
def re_raise(e, cls, o, fields, field, value):
# Re-raise `e` (caught by a generated dump function) with details about the
# dataclass being dumped attached to it. This function never returns.
#
#   e:      the exception that was caught
#   cls:    the dataclass being dumped
#   o:      the object being dumped
#   fields: the dataclass fields of `cls`
#   field:  name of the offending field, or '<UNK>' when it is not known
#   value:  passed as the object when `e` is wrapped in a `ParseError`
#
# Raises `MissingData` when `o` is None. Otherwise raises `e` itself when it
# is a `ParseError` or another `JSONWizardError`, or a new `ParseError`
# wrapping it.
# If the object `o` is None, then raise an error with
# the relevant info included.
if o is None:
raise MissingData(cls) from None
add_fields = True
# Exact type check: subclasses of `ParseError` also go through this branch.
if type(e) is not ParseError:
if isinstance(e, JSONWizardError):
add_fields = False
else:
# Look up the annotated type of `field`; falls back to `Any` when no
# field of that name exists.
tp = getattr(next((f for f in fields if f.name == field), None), 'type', Any)
e = ParseError(e, value, tp, 'dump')
# If field name is missing or not known, make a "best effort"
# to resolve it.
if field == '<UNK>' and cls and fields:
# Only accept the guess when exactly one field's current value equals the
# object recorded on the error.
if len((names := [f.name for f in fields
if getattr(o, f.name, MISSING) == e.obj])) == 1:
field = e.field_name = names[0]
# We run into a parsing error while dumping the field value;
# Add additional info on the Exception object before re-raising it.
#
# First confirm these values are not already set by an
# inner dataclass. If so, it likely makes it easier to
# debug the cause. Note that this should already be
# handled by the `setter` methods.
if add_fields:
e.class_name, e.fields, e.field_name, e.json_object = cls, fields, field, repr(o)
else:
e.class_name, e.field_name, e.json_object = cls, field, repr(o)
raise e from None