# Source code for dataclass_wizard.v1.dumpers

# TODO cleanup imports
from __future__ import annotations

import collections.abc as abc
from base64 import b64encode
from collections import defaultdict, deque
from dataclasses import is_dataclass, MISSING, Field
from datetime import datetime, time, date, timedelta
from decimal import Decimal
from enum import Enum
from pathlib import Path
# noinspection PyUnresolvedReferences,PyProtectedMember
from typing import (
    cast, Any, Type, Dict, List, Tuple, Iterable, Sequence, Union,
    NamedTupleMeta, SupportsFloat, AnyStr, Text, Callable, Optional,
    Literal, Annotated, NamedTuple,
)
from uuid import UUID

from .decorators import (setup_recursive_safe_function,
                         setup_recursive_safe_function_for_generic)
from .enums import KeyCase, DateTimeTo
from .models import (Extras, TypeInfo, PatternBase,
                     LEAF_TYPES, LEAF_TYPES_NO_BYTES, UTC, ZERO)
from .type_conv import datetime_to_timestamp
from ..abstractions import AbstractDumperGenerator
from ..bases import AbstractMeta, BaseDumpHook, META
from ..class_helper import (
    CLASS_TO_DUMP_FUNC,
    DATACLASS_FIELD_TO_ALIAS_PATH_FOR_DUMP,
    create_meta,
    get_meta,
    is_subclass_safe,
    v1_dataclass_field_to_alias_for_dump,
    dataclass_fields,
    dataclass_field_to_default,
    dataclass_field_names,
    dataclass_field_to_skip_if,
)
from ..constants import CATCH_ALL, TAG, PACKAGE_NAME
from ..errors import (ParseError, MissingFields, MissingData, JSONWizardError)
from ..loader_selection import get_dumper, asdict
from ..log import LOG
from ..models import get_skip_if_condition, finalize_skip_if
from ..type_def import (
    NoneType, JSONObject,
    PyLiteralString,
    T, ExplicitNull
)
# noinspection PyProtectedMember
from ..utils.dataclass_compat import _set_new_attribute
from ..utils.dict_helper import NestedDict
from ..utils.function_builder import FunctionBuilder
from ..utils.typing_compat import (
    is_typed_dict, get_args, is_annotated,
    eval_forward_ref_if_needed, get_origin_v2, is_union,
    get_keys_for_typed_dict, is_typed_dict_type_qualifier,
)


def _type_returns_value_unchanged(arg, leaf_handling_as_subclass, origin=None):
    """Return True when serializing a value of this type is the identity.

    True for the `Any` and `Literal` special forms, for the scalar leaf
    types in ``LEAF_TYPES_NO_BYTES`` (str, int, float, bool, ...), and —
    when `leaf_handling_as_subclass` is enabled — for subclasses of those
    leaf types as well.
    """
    if origin is None:
        origin = get_origin_v2(arg)
    if origin is Any or origin is Literal:
        return True
    if origin in LEAF_TYPES_NO_BYTES:
        return True
    return (leaf_handling_as_subclass
            and is_subclass_safe(origin, LEAF_TYPES_NO_BYTES))


def _all_return_value_unchanged(args, leaf_handling_as_subclass):
    """Return True when *every* type in `args` serializes as the identity.

    Idiomatic rewrite: `all()` over a generator replaces the manual
    loop-and-early-return; behavior is unchanged (short-circuits on the
    first non-identity type).
    """
    return all(
        _type_returns_value_unchanged(arg, leaf_handling_as_subclass)
        for arg in args
    )


class DumpMixin(AbstractDumperGenerator, BaseDumpHook):
    """
    This Mixin class derives its name from the eponymous `json.dumps`
    function. Essentially it contains helper methods to convert a
    `dataclass` to JSON strings (or a Python dictionary object).

    Refer to the :class:`AbstractDumper` class for documentation on any
    of the implemented methods.
    """
    __slots__ = ()

    def __init_subclass__(cls, **kwargs):
        # Register the default dump hooks on every subclass, so each
        # subclass can override individual hooks independently.
        super().__init_subclass__()
        setup_default_dumper(cls)

    # Optional key-case transform applied to dataclass field names on
    # dump; `None` means field names are used as-is.
    transform_dataclass_field = None
[docs] @staticmethod def dump_fallback(tp: TypeInfo, _extras: Extras): # identity: o return tp.v()
dump_from_str = dump_fallback dump_from_int = dump_fallback dump_from_float = dump_fallback dump_from_bool = dump_fallback dump_from_literal = dump_fallback
[docs] @staticmethod def dump_from_bytes(tp: TypeInfo, extras: Extras): tp.ensure_in_locals(extras, b64encode) return f"b64encode({tp.v()}).decode('ascii')"
[docs] @classmethod def dump_from_bytearray(cls, tp: TypeInfo, extras: Extras): tp.ensure_in_locals(extras, b64encode) return f"b64encode(bytes({tp.v()})).decode('ascii')"
[docs] @staticmethod def dump_from_none(tp: TypeInfo, extras: Extras): return 'None'
[docs] @staticmethod def dump_from_enum(tp: TypeInfo, extras: Extras): # alias: o.value return f'{tp.v()}.value'
[docs] @staticmethod def dump_from_uuid(tp: TypeInfo, extras: Extras): # alias: o.hex return f'{tp.v()}.hex'
[docs] @classmethod def dump_from_iterable(cls, tp: TypeInfo, extras: Extras): v, v_next, i_next = tp.v_and_next() gorg = tp.origin if v_next[0] == 'k': # raise same error as `json` (not serializable) raise TypeError('keys must be str, int, float, bool or None, ' f'not {gorg.__qualname__}') from None # noinspection PyBroadException try: elem_type = tp.args[0] except: elem_type = Any string = cls.dump_dispatcher_for_annotation( tp.replace(origin=elem_type, i=i_next, index=None, val_name=None), extras) if string == v_next: return f'{v}.copy()' if issubclass(gorg, list) else f'list({v})' return f'[{string} for {v_next} in {v}]'
[docs] @classmethod def dump_from_tuple(cls, tp: TypeInfo, extras: Extras): args = tp.args # Determine the code string for the annotation # Check if the `Tuple` appears in the variadic form # i.e. Tuple[str, ...] if args: is_variadic = args[-1] is ... else: # Annotated without args, as simply `tuple` return f'list({tp.v()})' if is_variadic: # Logic that handles the variadic form of :class:`Tuple`'s, # i.e. ``Tuple[str, ...]`` # # Per `PEP 484`_, only **one** required type is allowed before the # ``Ellipsis``. That is, ``Tuple[int, ...]`` is valid whereas # ``Tuple[int, str, ...]`` would be invalid. `See here`_ for more info. # # .. _PEP 484: https://www.python.org/dev/peps/pep-0484/ # .. _See here: https://github.com/python/typing/issues/180 v, v_next, i_next = tp.v_and_next() # Given `Tuple[T, ...]`, we only need the generated string for `T` string = cls.dump_dispatcher_for_annotation( tp.replace(origin=args[0], i=i_next, index=None, val_name=None), extras) if string == v_next: return f'list({v})' result = f'[{string} for {v_next} in {v}]' else: string = ', '.join([ str(cls.dump_dispatcher_for_annotation( tp.replace(origin=arg, index=k), extras)) for k, arg in enumerate(args)]) result = f'[{string}]' return result
[docs] @classmethod def dump_from_named_tuple(cls, tp: TypeInfo, extras: Extras): nt_tp = cast(NamedTuple, tp.origin) fields = nt_tp._fields # field names in order ann = nt_tp.__annotations__ field_to_value = { name: str( cls.dump_dispatcher_for_annotation( tp.replace(origin=ann.get(name, Any), index=i), extras, ) ) for i, name in enumerate(fields) } if extras['config'].v1_namedtuple_as_dict: params = [f'{field!r}: {value}' for field, value in field_to_value.items()] return f'{{{", ".join(params)}}}' params = ', '.join(field_to_value.values()) return f'[{params}]'
[docs] @classmethod def dump_from_named_tuple_untyped(cls, tp: TypeInfo, extras: Extras): as_dict = extras['config'].v1_namedtuple_as_dict return f'{tp.v()}._asdict()' if as_dict else f'list({tp.v()})'
@classmethod def _build_dict_comp(cls, tp, v, i_next, k_next, v_next, kt, vt, extras): tp_k_next = tp.replace(origin=kt, i=i_next, prefix='k', index=None, val_name=None) string_k = cls.dump_dispatcher_for_annotation(tp_k_next, extras) tp_v_next = tp.replace(origin=vt, i=i_next, prefix='v', index=None, val_name=None) string_v = cls.dump_dispatcher_for_annotation(tp_v_next, extras) if k_next == string_k and v_next == string_v: # Easy path; shallow copy return f'{v}.copy()' return f'{{{string_k}: {string_v} for {k_next}, {v_next} in {v}.items()}}'
[docs] @classmethod def dump_from_dict(cls, tp: TypeInfo, extras: Extras): try: kt, vt = tp.args except ValueError: # Annotated without two arguments, # e.g. like `dict[str]` or `dict` return f'{tp.v()}.copy()' v, k_next, v_next, i_next = tp.v_and_next_k_v() return cls._build_dict_comp( tp, v, i_next, k_next, v_next, kt, vt, extras)
dump_from_defaultdict = dump_from_dict
[docs] @classmethod def dump_from_typed_dict(cls, tp: TypeInfo, extras: Extras): req_keys, opt_keys = get_keys_for_typed_dict(tp.origin) if opt_keys: return cls._dump_from_typed_dict_fn(tp, extras) ann = tp.origin.__annotations__ dict_body = ', '.join( f"""{name!r}: { cls.dump_dispatcher_for_annotation( tp.replace(origin=ann.get(name, Any), index=repr(name)), extras, ) }""" for name in req_keys ) return f'{{{dict_body}}}'
    @classmethod
    @setup_recursive_safe_function(prefix='dump')
    def _dump_from_typed_dict_fn(cls, tp: TypeInfo, extras: Extras):
        # Generates a standalone helper that serializes a TypedDict with
        # optional keys: required keys become a dict literal, and each
        # optional key is copied only when present on the input mapping.
        fn_gen = extras['fn_gen']
        req_keys, opt_keys = get_keys_for_typed_dict(tp.origin)
        td_annotations = tp.origin.__annotations__
        v = tp.v_for_def()
        result_list = []
        # Set required keys for the `TypedDict`
        for k in req_keys:
            field_tp = td_annotations[k]
            field_name = repr(k)
            string = cls.dump_dispatcher_for_annotation(
                tp.replace(origin=field_tp, index=field_name,
                           val_name=None),
                extras)
            result_list.append(f'{field_name}: {string}')
        fn_gen.add_lines('result = {',
                         *(f' {r},' for r in result_list),
                         '}')
        # Set optional keys for the `TypedDict` (if they exist).
        # Each optional key is probed with `.get(key, MISSING)` in the
        # generated code, so a present-but-falsy value is still dumped.
        next_i = tp.i + 1
        new_tp = tp.replace(i=next_i, index=None, val_name=None)
        v_next = new_tp.v()
        for k in opt_keys:
            field_tp = td_annotations[k]
            field_name = repr(k)
            string = cls.dump_dispatcher_for_annotation(
                new_tp.replace(origin=field_tp), extras)
            with fn_gen.if_(f'({v_next} := {v}.get({field_name}, MISSING))'
                            f' is not MISSING'):
                fn_gen.add_line(f'result[{field_name}] = {string}')
        fn_gen.add_line('return result')
    @classmethod
    @setup_recursive_safe_function_for_generic(prefix='dump',
                                               per_class_cache=True)
    def dump_from_union(cls, tp: TypeInfo, extras: Extras):
        # Generates a dispatch-by-class helper for `Union[...]`:
        #   * leaf types (str, int, ...) are returned unchanged,
        #   * dataclass members are dumped and, when tagged, annotated
        #     with their tag under `tag_key`,
        #   * remaining members are attempted in order via try/except,
        #   * anything else raises a ParseError.
        fn_gen = extras['fn_gen']
        config = extras['config']
        actual_cls = extras['cls']
        _locals = extras['locals']
        args = tp.args
        field_i = tp.field_i
        i = tp.i
        v = tp.v_for_def()
        tag_key = config.tag_key or TAG
        auto_assign_tags = config.auto_assign_tags
        leaf_handling_as_subclass = config.v1_leaf_handling == 'issubclass'
        in_optional = NoneType in args
        _locals['fields'] = args
        _locals['tag_key'] = tag_key
        leaf_types = []
        try_parse_lines = []
        dataclass_and_line = []
        has_dataclass = False
        for possible_tp in args:
            possible_tp = eval_forward_ref_if_needed(possible_tp,
                                                     actual_cls)
            tp_new = TypeInfo(possible_tp, field_i=field_i, i=i)
            tp_new.in_optional = in_optional
            if _type_returns_value_unchanged(
                    possible_tp, leaf_handling_as_subclass):
                # Leaf types are batched into a single membership check.
                leaf_types.append(possible_tp)
            elif is_dataclass(possible_tp):
                # we see a dataclass in `Union` declaration
                has_dataclass = True
                string = cls.dump_dispatcher_for_annotation(tp_new, extras)
                meta = get_meta(possible_tp)
                cls_name = possible_tp.__name__
                tag = meta.tag
                assign_tags_to_cls = (auto_assign_tags
                                      or meta.auto_assign_tags)
                if assign_tags_to_cls and not tag:
                    tag = cls_name
                    # We don't want to mutate the base Meta class here
                    if meta is AbstractMeta:
                        create_meta(possible_tp, cls_name, tag=tag)
                    else:
                        meta.tag = cls_name
                if tag:
                    # Tagged dataclass: inject the tag into the result.
                    dataclass_and_line.append(
                        (possible_tp, cls_name, tag,
                         f'result = {string}; '
                         f'result[tag_key] = {tag!r}; '
                         f'return result'))
                else:
                    dataclass_and_line.append(
                        (possible_tp, cls_name, tag,
                         f'return {string}'))
            else:
                # Any other member type is attempted best-effort below.
                try_parse_lines.append(
                    cls.dump_dispatcher_for_annotation(tp_new, extras))
        # Dispatch on the exact runtime class of the value.
        fn_gen.add_line(f't = {v}.__class__')
        if leaf_types:
            # a good heuristic: use tuples for smaller unions,
            # else frozenset
            container = tuple if len(leaf_types) <= 6 else frozenset
            _locals['leaf_types'] = container(leaf_types)
            leaf_type_names = ', '.join(
                getattr(t, '__name__', None) or str(t)
                for t in leaf_types)
            with fn_gen.if_('t in leaf_types',
                            comment=f'{{{leaf_type_names}}}'):
                fn_gen.add_line(f'return {v}')
        if has_dataclass:
            for field_i, (dataclass, name, tag, line) in enumerate(
                    dataclass_and_line, start=1):
                cls_name = TypeInfo(dataclass).type_name(extras)
                with fn_gen.if_(f't is {cls_name}',
                                comment=f'{tag!r}' if tag else ''):
                    fn_gen.add_line(line)
        for string in try_parse_lines:
            with fn_gen.try_():
                fn_gen.add_line(f'return {string}')
            with fn_gen.except_(Exception):
                fn_gen.add_line('pass')
        # Invalid type for Union
        fn_gen.add_line("raise ParseError("
                        "TypeError('Object was not in any of Union types'),"
                        f"{v},fields,'dump',"
                        "tag_key=tag_key"
                        ")")
[docs] @staticmethod def dump_from_decimal(tp: TypeInfo, extras: Extras): return f'str({tp.v()})'
[docs] @staticmethod def dump_from_path(tp: TypeInfo, extras: Extras): return f'str({tp.v()})'
[docs] @classmethod def dump_from_date(cls, tp: TypeInfo, extras: Extras): o = tp.v() if extras['config'].v1_dump_date_time_as is DateTimeTo.TIMESTAMP: tp.ensure_in_locals(extras, datetime, UTC=UTC) return f'int(datetime((v0 := {o}).year, v0.month, v0.day, tzinfo=UTC).timestamp())' return f'{o}.isoformat()'
[docs] @classmethod def dump_from_datetime(cls, tp: TypeInfo, extras: Extras): o = tp.v() config = extras['config'] if config.v1_dump_date_time_as is DateTimeTo.TIMESTAMP: naive_tz = config.v1_assume_naive_datetime_tz if naive_tz is None: def raise_naive(): raise ValueError('Naive datetime has no timezone; ' 'set v1_assume_naive_datetime_tz to ' 'define how it should be interpreted.') tp.ensure_in_locals(extras, raise_naive, ZERO=ZERO) return f'raise_naive() if (v0 := {o}).tzinfo is None else int(v0.timestamp()) if v0.utcoffset() == ZERO else int(v0.astimezone(UTC).timestamp())' else: tp.ensure_in_locals(extras, datetime_to_timestamp, assume_naive_tz=naive_tz) return f'datetime_to_timestamp({o}, assume_naive_tz)' # Safe: source is datetime.isoformat(); '+00:00' only appears for UTC return f"{o}.isoformat().replace('+00:00', 'Z', 1)"
[docs] @staticmethod def dump_from_time(tp: TypeInfo, _extras: Extras): o = tp.v() # Safe: source is time.isoformat(); '+00:00' only appears for UTC return f"{o}.isoformat().replace('+00:00', 'Z', 1)"
[docs] @staticmethod def dump_from_timedelta(tp: TypeInfo, extras: Extras): return f'str({tp.v()})'
    @staticmethod
    @setup_recursive_safe_function(
        prefix='dump',
        fn_name=f'__{PACKAGE_NAME}_to_dict_{{cls_name}}__')
    def dump_from_dataclass(tp: TypeInfo, extras: Extras):
        # Delegate to the full dump-function generator for the nested
        # dataclass; the decorator supplies the function name, caching,
        # and recursion-guard handling.
        dump_func_for_dataclass(tp.origin, extras)
[docs] @classmethod def dump_dispatcher_for_annotation(cls, tp, extras): hooks = cls.__DUMP_HOOKS__ config = extras['config'] type_hooks = config.v1_type_to_dump_hook leaf_handling_as_subclass = config.v1_leaf_handling == 'issubclass' # type_ann = tp.origin type_ann = eval_forward_ref_if_needed(tp.origin, extras['cls']) origin = get_origin_v2(type_ann) name = getattr(origin, '__name__', origin) args = None if is_annotated(type_ann): # Given `Annotated[T, ...]`, we only need `T` type_ann, *field_extras = get_args(type_ann) type_ann = eval_forward_ref_if_needed(type_ann, extras['cls']) origin = get_origin_v2(type_ann) name = getattr(origin, '__name__', origin) # Check for Custom Patterns for date / time / datetime for extra in field_extras: if isinstance(extra, PatternBase): extras['pattern'] = extra elif is_typed_dict_type_qualifier(origin): # Given `Required[T]` or `NotRequired[T]`, we only need `T` type_ann = get_args(type_ann)[0] origin = get_origin_v2(type_ann) name = getattr(origin, '__name__', origin) # TypeAliasType: Type aliases are created through # the `type` statement if (value := getattr(origin, '__value__', None)) is not None: type_ann = value origin = get_origin_v2(type_ann) name = getattr(origin, '__name__', origin) # `LiteralString` enforces stricter rules at # type-checking but behaves like `str` at runtime. # TODO maybe add `load_to_literal_string` if origin is PyLiteralString: dump_hook = cls.dump_from_str origin = str name = 'str' # -> Atomic, immutable types which don't require # any iterative / recursive handling. 
elif origin in LEAF_TYPES or ( leaf_handling_as_subclass and is_subclass_safe(origin, LEAF_TYPES)): dump_hook = hooks.get(origin) elif (type_hooks is not None and (hook_info := type_hooks.get(origin)) is not None): mode, dump_hook = hook_info if mode == 'runtime': fn_name, = tp.ensure_in_locals(extras, dump_hook) return f'{fn_name}({tp.v()})' try: args = get_args(type_ann) except ValueError: args = Any, elif (dump_hook := hooks.get(origin)) is not None: try: args = get_args(type_ann) except ValueError: args = Any, # -> Union[x] elif is_union(origin): args = get_args(type_ann) # all args in `Union[...]` are simple types if _all_return_value_unchanged(args, leaf_handling_as_subclass): return tp.v() dump_hook = cls.dump_from_union # Special case for Optional[x], which is actually Union[x, None] if len(args) == 2 and NoneType in args: origin = args[0] if tp.val_name: val_name = 'v0' o = f'({val_name} := {tp.v()})' else: val_name = None o = tp.v() new_tp = tp.replace(origin=origin, args=None, name=None, val_name=val_name) new_tp.in_optional = True string = cls.dump_dispatcher_for_annotation(new_tp, extras) return f'None if {o} is None else {string}' # -> Literal[X, Y, ...] 
elif origin is Literal: dump_hook = cls.dump_from_literal args = get_args(type_ann) # https://stackoverflow.com/questions/76520264/dataclasswizard-after-upgrading-to-python3-11-is-not-working-as-expected elif origin is Any: dump_hook = cls.dump_fallback elif is_subclass_safe(origin, tuple) and hasattr(origin, '_fields'): if getattr(origin, '__annotations__', None): # Annotated as a `typing.NamedTuple` subtype dump_hook = cls.dump_from_named_tuple else: # Annotated as a `collections.namedtuple` subtype dump_hook = cls.dump_from_named_tuple_untyped elif is_typed_dict(origin): dump_hook = cls.dump_from_typed_dict elif is_dataclass(origin): # return a dynamically generated `asdict` # for the `cls` (base_type) dump_hook = cls.dump_from_dataclass elif is_subclass_safe(origin, Enum): dump_hook = cls.dump_from_enum elif origin in (abc.Sequence, abc.MutableSequence, abc.Collection): if origin is abc.Sequence: dump_hook = cls.dump_from_tuple # desired (non-generic) origin type name = 'tuple' origin = tuple # Re-map type arguments to variadic tuple format, # e.g. `Sequence[int]` -> `tuple[int, ...]` try: args = (get_args(type_ann)[0], ...) except (IndexError, ValueError): args = Any, else: dump_hook = cls.dump_from_iterable # desired (non-generic) origin type name = 'list' origin = list # Get type arguments, e.g. 
`Sequence[int]` -> `int` try: args = get_args(type_ann) except ValueError: args = Any, elif isinstance(origin, PatternBase): __base__ = origin.base if issubclass(__base__, datetime): dump_hook = cls.dump_from_datetime origin = datetime elif issubclass(__base__, date): dump_hook = cls.dump_from_date origin = date elif issubclass(__base__, time): dump_hook = cls.dump_from_time origin = time else: # TODO everything should use `get_origin_v2` try: args = get_args(type_ann) except ValueError: args = Any, if dump_hook is None: # TODO END for t in hooks: if (not leaf_handling_as_subclass) and (t in LEAF_TYPES): continue if issubclass(origin, t): dump_hook = hooks[t] break tp.origin = origin tp.args = args tp.name = name if dump_hook is not None: result = dump_hook(tp, extras) return result # No matching hook is found for the type. # TODO do we want to add a `Meta` field to not raise # an error but perform a default action? err = TypeError('Provided type is not currently supported.') pe = ParseError( err, origin, type_ann, 'dump', resolution=f'Register a dump hook for {ParseError.name(origin)} ' f'(v1: `register_type` / `Meta.v1_type_to_dump_hook`).', unsupported_type=origin ) raise pe from None
def setup_default_dumper(cls=DumpMixin):
    """
    Setup the default type hooks to use when converting a `dataclass`
    instance to a `str` (json) or a Python `dict` object.

    Note: `cls` must be :class:`DumpMixIn` or a sub-class of it.
    """
    # TODO maybe `dict.update` might be better?

    # Technically a complex type, however check this
    # first, since `StrEnum` and `IntEnum` are subclasses
    # of `str` and `int`
    cls.register_dump_hook(Enum, cls.dump_from_enum)
    # Simple types
    cls.register_dump_hook(str, cls.dump_from_str)
    cls.register_dump_hook(float, cls.dump_from_float)
    cls.register_dump_hook(bool, cls.dump_from_bool)
    cls.register_dump_hook(int, cls.dump_from_int)
    cls.register_dump_hook(bytes, cls.dump_from_bytes)
    cls.register_dump_hook(bytearray, cls.dump_from_bytearray)
    cls.register_dump_hook(NoneType, cls.dump_from_none)
    # Complex types
    cls.register_dump_hook(UUID, cls.dump_from_uuid)
    cls.register_dump_hook(set, cls.dump_from_iterable)
    cls.register_dump_hook(frozenset, cls.dump_from_iterable)
    cls.register_dump_hook(deque, cls.dump_from_iterable)
    cls.register_dump_hook(list, cls.dump_from_iterable)
    cls.register_dump_hook(tuple, cls.dump_from_tuple)
    # `typing` Generics
    # cls.register_dump_hook(Literal, cls.dump_from_literal)
    # noinspection PyTypeChecker
    cls.register_dump_hook(defaultdict, cls.dump_from_defaultdict)
    cls.register_dump_hook(dict, cls.dump_from_dict)
    cls.register_dump_hook(Decimal, cls.dump_from_decimal)
    cls.register_dump_hook(Path, cls.dump_from_path)
    # Dates and times
    cls.register_dump_hook(datetime, cls.dump_from_datetime)
    cls.register_dump_hook(time, cls.dump_from_time)
    cls.register_dump_hook(date, cls.dump_from_date)
    cls.register_dump_hook(timedelta, cls.dump_from_timedelta)


def check_and_raise_missing_fields(
        _locals, o, cls, fields: tuple[Field, ...]):
    # Raise `MissingFields` for required init fields that are absent
    # from the generated function's locals (no default / factory).
    missing_fields = [f.name for f in fields
                      if f.init
                      and f'__{f.name}' not in _locals
                      and (f.default is MISSING
                           and f.default_factory is MISSING)]
    missing_keys = [v1_dataclass_field_to_alias_for_dump(cls)[field]
                    for field in missing_fields]
    raise MissingFields(
        None, o, cls, fields, None, missing_fields,
        missing_keys
    ) from None


def dump_func_for_dataclass(
        cls: type,
        extras: Extras | None = None,
        dumper_cls=DumpMixin,
        base_meta_cls: type = AbstractMeta,
) -> Union[Callable[[T], JSONObject], str]:
    # Generates (and for the main class, installs) the specialized
    # `to_dict` function for `cls`. When `extras` is provided, we are
    # generating code for a nested dataclass inside an outer builder.
    #
    # TODO dynamically generate for multiple nested classes at once

    # Tuple describing the fields of this dataclass.
    cls_fields = dataclass_fields(cls)
    cls_fields_list = list(cls_fields)
    cls_field_names = dataclass_field_names(cls)

    # Get the dumper for the class, or create a new one as needed.
    cls_dumper = get_dumper(cls, base_cls=dumper_cls, v1=True)
    cls_name = cls.__name__
    fn_name = f'__{PACKAGE_NAME}_to_dict_{cls_name}__'

    # Get the meta config for the class, or the default config otherwise.
    meta = get_meta(cls, base_meta_cls)

    if extras is None:
        # we are being run for the main dataclass
        is_main_class = True
        # If the `recursive` flag is enabled and a Meta config is
        # provided, apply the Meta recursively to any nested classes.
        #
        # Else, just use the base `AbstractMeta`.
        config: META = meta if meta.recursive else base_meta_cls
        # Initialize the FuncBuilder
        fn_gen = FunctionBuilder()
        new_locals = {
            'cls': cls,
            'fields': cls_fields,
        }
        # noinspection PyTypeChecker
        extras: Extras = {
            'config': config,
            'cls': cls,
            'cls_name': cls_name,
            'locals': new_locals,
            'recursion_guard': {cls: fn_name},
            'fn_gen': fn_gen,
        }
        _globals = {
            'MISSING': MISSING,
            'ParseError': ParseError,
            'raise_missing_fields': check_and_raise_missing_fields,
            're_raise': re_raise,
        }
    # we are being run for a nested dataclass
    else:
        is_main_class = False
        # config for nested dataclasses
        config = extras['config']
        # Initialize the FuncBuilder
        fn_gen = extras['fn_gen']
        if config is not base_meta_cls:
            # we want to apply the meta config from the main dataclass
            # recursively.
            meta = meta | config
            meta.bind_to(cls, is_default=False)
        new_locals = extras['locals']
        new_locals['fields'] = cls_fields
        # TODO need a way to auto-magically do this
        extras['cls'] = cls
        extras['cls_name'] = cls_name

    key_case: KeyCase | None = cls_dumper.transform_dataclass_field
    # TODO decide if different logic is needed for `AUTO` case
    if key_case is KeyCase.AUTO:
        key_case = None

    # A cached mapping of each dataclass field to the resolved key name
    # in a JSON or dictionary object; useful so we don't need to do a
    # case transformation (via regex) each time.
    field_to_alias = v1_dataclass_field_to_alias_for_dump(cls)
    check_aliases = True if field_to_alias else False

    field_to_path = DATACLASS_FIELD_TO_ALIAS_PATH_FOR_DUMP[cls]
    has_paths = True if field_to_path else False

    # A cached mapping of dataclass field name to its default value,
    # either via a `default` or `default_factory` argument.
    field_to_default = dataclass_field_to_default(cls)
    has_defaults = True if field_to_default else False

    # A cached mapping of dataclass field name to its SkipIf condition.
    field_to_skip_if = dataclass_field_to_skip_if(cls)

    skip_if_condition = get_skip_if_condition(
        meta.skip_if, new_locals, '_skip_value')
    skip_defaults_if_condition = get_skip_if_condition(
        meta.skip_defaults_if, new_locals, '_skip_defaults_value')

    skip_defaults = True if meta.skip_defaults else False
    skip_if = True if field_to_skip_if or skip_if_condition else False

    catch_all_name: 'str | None' = field_to_alias.pop(CATCH_ALL, None)
    has_catch_all = catch_all_name is not None
    if has_catch_all:
        catch_all_name_stripped = catch_all_name.rstrip('?')
        catch_all_idx = cls_field_names.index(catch_all_name_stripped)
        # remove catch all field from list, so we don't iterate over it
        # noinspection PyTypeChecker
        del cls_fields_list[catch_all_idx]
    else:
        catch_all_name_stripped = None

    cls_name = cls.__name__

    with fn_gen.function(
            fn_name,
            ['o',
             'dict_factory=dict',
             "exclude:'list[str]|None'=None",
             f'skip_defaults:bool={skip_defaults}'],
            MISSING,
            new_locals):

        # Optional user hook applied to the instance before dumping.
        if (_pre_to_dict := getattr(cls, '_pre_to_dict', None)) is not None:
            new_locals['__pre_to_dict__'] = _pre_to_dict
            fn_gen.add_line('o = __pre_to_dict__(o)')

        # Need to create a separate dictionary to copy over the
        # constructor args, as we don't want to mutate the original
        # dictionary object.
        if has_defaults:
            fn_gen.add_line('add_defaults = not skip_defaults')
        if has_paths:
            new_locals['NestedDict'] = NestedDict
            fn_gen.add_line('paths = NestedDict()')

        required_field_assigns = []
        default_assigns = []
        path_assigns = []
        name_to_skip_condition = {}

        if cls_fields_list:
            with fn_gen.try_():
                for i, f in enumerate(cls_fields_list):
                    name = f.name
                    default = field_to_default.get(name, ExplicitNull)
                    has_default = default is not ExplicitNull
                    has_skip_if = False
                    # TODO: This is if we want to check if field is in
                    #   `exclude` (not huge performance gain)
                    # skip_field = f'_skip_{i}'
                    default_value = f'_default_{i}'

                    # Check for Field Aliases + Paths
                    # NOTE: `key` is used later, so we need to capture it.
                    if check_aliases and (
                            key := field_to_alias.get(name)) is not None:
                        # special case: skip serialization for field,
                        # e.g. `Alias(..., skip=True)`
                        if key is ExplicitNull:
                            continue
                    elif key_case is None:
                        key = name
                    else:
                        key = key_case(name)

                    # If field has an explicit `SkipIf` condition
                    if skip_if:
                        has_skip_if = True
                        if (_skip_condition
                                := field_to_skip_if.get(name)) is not None:
                            _skip_if = get_skip_if_condition(
                                _skip_condition, new_locals, condition_i=i)
                            _final_skip_if = finalize_skip_if(
                                _skip_condition, '{}', _skip_if)
                            name_to_skip_condition[name] = (
                                f'not ({_final_skip_if})')
                        # If Meta `skip_if` has a value
                        elif skip_if_condition:
                            _final_skip_if = finalize_skip_if(
                                meta.skip_if, '{}', skip_if_condition)
                            name_to_skip_condition[name] = (
                                f'not ({_final_skip_if})')
                        # # Else, proceed as normal
                        # else:
                        #     field_assignments.append(
                        #         f"if not {skip_field}:")

                    # A dataclass field which specifies a "JSON Path".
                    if has_paths and (
                            path := field_to_path.get(name)) is not None:
                        # AliasPath(...)
                        lvalue = (
                            f"paths{''.join(f'[{p!r}]' for p in path)}")
                        if has_default:
                            new_locals[default_value] = default
                            string = generate_field_code(
                                cls_dumper, extras, f, i)
                            default_assigns.append(
                                (name, key, default_value, lvalue, string))
                        else:
                            var_name = ('v1' if has_skip_if
                                        else f'o.{name}')
                            string = generate_field_code(
                                cls_dumper, extras, f, i, var_name)
                            path_assigns.append(
                                (name, f"{lvalue} = {string}"))
                        continue

                    if has_default:
                        new_locals[default_value] = default
                        string = generate_field_code(
                            cls_dumper, extras, f, i)
                        lvalue = f'result[{key!r}]'
                        default_assigns.append(
                            (name, key, default_value, lvalue, string))
                    else:
                        # TODO confirm this is ok
                        # vars_for_fields.append(f'{name}={var}')
                        if has_skip_if:
                            string = generate_field_code(
                                cls_dumper, extras, f, i, 'v1')
                            lvalue = f'result[{key!r}]'
                            default_assigns.append(
                                (name, ExplicitNull, ExplicitNull,
                                 lvalue, string))
                        else:
                            string = generate_field_code(
                                cls_dumper, extras, f, i, f'o.{name}')
                            required_field_assigns.append(
                                (name, key, string))

                # Add assignments for `AliasPath(...)`
                for (name, line) in path_assigns:
                    if (condition
                            := name_to_skip_condition.get(name)) is not None:
                        fn_gen.add_line(f'v1 = o.{name}')
                        with fn_gen.if_(condition.format('v1')):
                            fn_gen.add_line(line)
                    else:
                        fn_gen.add_line(line)

                # Add required dataclass field assignments
                fn_gen.add_line('result = {')
                for (_, key, string) in required_field_assigns:
                    fn_gen.add_line(f' {key!r}: {string},')
                fn_gen.add_line('}')

                # Add default (optional) dataclass field assignments
                for (name, key, default_name,
                        lvalue, rvalue) in default_assigns:
                    var_name = 'v1'
                    if rvalue == var_name:
                        # and default_name is not ExplicitNull:
                        var_name = rvalue = f'o.{name}'
                    else:
                        fn_gen.add_line(f'{var_name} = o.{name}')
                    line = f'{lvalue} = {rvalue}'
                    def_condition = (
                        f'add_defaults or {var_name} != {default_name}')
                    if skip_defaults_if_condition:
                        _final_skip_if = finalize_skip_if(
                            meta.skip_defaults_if, var_name,
                            skip_defaults_if_condition)
                        # TODO missing skip individual condition!!
                        with fn_gen.if_(
                                f'(add_defaults or '
                                f'{var_name} != {default_name}) '
                                f'and not ({_final_skip_if})'):
                            fn_gen.add_line(line)
                    elif (condition
                            := name_to_skip_condition.get(name)) is not None:
                        condition = condition.format(var_name)
                        if default_name is ExplicitNull:
                            # Required field with skip condition
                            with fn_gen.if_(condition):
                                fn_gen.add_line(line)
                        else:
                            with fn_gen.if_(
                                    f'(add_defaults or '
                                    f'{var_name} != {default_name}) '
                                    f'and {condition}'):
                                fn_gen.add_line(line)
                    else:
                        with fn_gen.if_(def_condition):
                            fn_gen.add_line(line)

            # create a broad `except Exception` block, as we will be
            # re-raising all exception(s) as a custom `ParseError`.
            with fn_gen.except_(Exception, 'e', ParseError):
                fn_gen.add_line(
                    "re_raise(e, cls, o, fields, '<UNK>', "
                    "locals().get('v1'))")
        else:
            fn_gen.add_line('result = {}')

        if has_catch_all:
            # noinspection PyUnresolvedReferences,PyProtectedMember
            from dataclasses import (
                _asdict_inner as __dataclasses_asdict_inner__)
            if (default := field_to_default.get(
                    catch_all_name_stripped,
                    ExplicitNull)) is not ExplicitNull:
                default_value = f'_default_{len(cls_fields_list)}'
                new_locals[default_value] = default
                condition = (f"(v1 := o.{catch_all_name_stripped})"
                             f" != {default_value}")
            else:
                condition = f'v1 := o.{catch_all_name_stripped}'
            with fn_gen.if_(condition):
                with fn_gen.for_(f"k, v in v1.items()"):
                    fn_gen.globals['__asdict_inner__'] = (
                        __dataclasses_asdict_inner__)
                    fn_gen.add_line(
                        'result[k] = __asdict_inner__(v,dict_factory)')

        with fn_gen.if_('exclude'):
            with fn_gen.for_('k in exclude'):
                fn_gen.add_line('result.pop(k, None)')

        if has_paths:
            fn_gen.add_line('result.update(paths)')

        # Now pass the arguments to the dict_factory method, and return
        # the new dict_factory instance.
        fn_gen.add_line(
            f'return result if dict_factory is dict '
            f'else dict_factory(result)')

    # Save the dump function for the main dataclass, so we don't need to
    # run this logic each time.
    if is_main_class:
        # noinspection PyUnboundLocalVariable
        functions = fn_gen.create_functions(_globals)
        cls_todict = functions[fn_name]
        # Check if the class has a `to_dict`, and it's
        # a class method bound to `todict`.
        if getattr(cls, 'to_dict', None) is asdict:
            LOG.debug("setattr(%s, 'to_dict', %s)", cls_name, fn_name)
            # Marker reserved for future detection/debugging of
            # specialized dumpers.
            # setattr(cls_todict, _SPECIALIZED_TO_DICT, True)
            # safe to specialize only when user didn't define it on cls
            _set_new_attribute(cls, 'to_dict', cls_todict, force=True)
        _set_new_attribute(
            cls, f'__{PACKAGE_NAME}_to_dict__', cls_todict)
        LOG.debug(
            "setattr(%s, '__%s_to_dict__', %s)",
            cls_name, PACKAGE_NAME, fn_name)
        # TODO in `v1`, we will use class attribute (set above) instead.
        CLASS_TO_DUMP_FUNC[cls] = cls_todict
        return cls_todict


def generate_field_code(cls_dumper: DumpMixin,
                        extras: Extras,
                        field: Field,
                        field_i: int,
                        var_name=None) -> 'str | TypeInfo':
    # Generate the serialization code string for a single dataclass
    # field, attaching class/field context to any ParseError raised.
    cls = extras['cls']
    field_type = field.type = eval_forward_ref_if_needed(field.type, cls)
    try:
        return cls_dumper.dump_dispatcher_for_annotation(
            TypeInfo(field_type, field_i=field_i, val_name=var_name),
            extras
        )
    except ParseError as pe:
        pe.class_name = cls
        pe.field_name = field.name
        raise pe from None


def re_raise(e, cls, o, fields, field, value):
    # Wrap/enrich an exception raised while dumping `o`, then re-raise.
    #
    # If the object `o` is None, then raise an error with
    # the relevant info included.
    if o is None:
        raise MissingData(cls) from None

    add_fields = True
    if type(e) is not ParseError:
        if isinstance(e, JSONWizardError):
            add_fields = False
        else:
            tp = getattr(next((f for f in fields if f.name == field),
                              None), 'type', Any)
            e = ParseError(e, value, tp, 'dump')

    # If field name is missing or not known, make a "best effort"
    # to resolve it.
    if field == '<UNK>' and cls and fields:
        if len((names := [f.name for f in fields
                          if getattr(o, f.name, MISSING) == e.obj])) == 1:
            field = e.field_name = names[0]

    # We run into a parsing error while dumping the field value;
    # Add additional info on the Exception object before re-raising it.
    #
    # First confirm these values are not already set by an
    # inner dataclass. If so, it likely makes it easier to
    # debug the cause. Note that this should already be
    # handled by the `setter` methods.
    if add_fields:
        e.class_name, e.fields, e.field_name, e.json_object = (
            cls, fields, field, repr(o))
    else:
        e.class_name, e.field_name, e.json_object = cls, field, repr(o)

    raise e from None