# Source code for dataclass_wizard.v1.loaders

from __future__ import annotations

import collections.abc as abc
import dataclasses

from base64 import b64decode
from collections import defaultdict, deque
from dataclasses import is_dataclass, Field, MISSING
from datetime import date, datetime, time, timedelta
from decimal import Decimal
from enum import Enum
from pathlib import Path
from typing import Any, Callable, Literal, NamedTuple, cast
from uuid import UUID

from .decorators import (process_patterned_date_time,
                         setup_recursive_safe_function,
                         setup_recursive_safe_function_for_generic)
from .enums import KeyAction, KeyCase
from .models import Extras, PatternBase, TypeInfo, LEAF_TYPES, UTC
from .type_conv import (
    as_datetime_v1, as_date_v1, as_int_v1,
    as_time_v1, as_timedelta, TRUTHY_VALUES,
)
from ..abstractions import AbstractLoaderGenerator
from ..bases import AbstractMeta, BaseLoadHook, META
from ..class_helper import (create_meta,
                            dataclass_fields,
                            dataclass_field_to_default,
                            dataclass_init_fields,
                            dataclass_init_field_names,
                            get_meta,
                            is_subclass_safe,
                            v1_dataclass_field_to_alias_for_load,
                            CLASS_TO_LOAD_FUNC,
                            DATACLASS_FIELD_TO_ALIAS_PATH_FOR_LOAD,
                            dataclass_kw_only_init_field_names)
from ..constants import CATCH_ALL, TAG, PY311_OR_ABOVE, PACKAGE_NAME
from ..errors import (JSONWizardError,
                      MissingData,
                      MissingFields,
                      ParseError,
                      UnknownKeysError)
from ..loader_selection import fromdict, get_loader
from ..log import LOG
from ..type_def import DefFactory, JSONObject, NoneType, PyLiteralString, T
# noinspection PyProtectedMember
from ..utils.dataclass_compat import _set_new_attribute
from ..utils.function_builder import FunctionBuilder
from ..utils.object_path import v1_safe_get
from ..utils.string_conv import possible_json_keys
from ..utils.typing_compat import (eval_forward_ref_if_needed,
                                   get_args,
                                   get_keys_for_typed_dict,
                                   get_origin_v2,
                                   is_annotated,
                                   is_typed_dict,
                                   is_typed_dict_type_qualifier,
                                   is_union)


class LoadMixin(AbstractLoaderGenerator, BaseLoadHook):
    """
    This Mixin class derives its name from the eponymous `json.loads`
    function. Essentially it contains helper methods to convert JSON
    strings (or a Python dictionary object) to a `dataclass` which can
    often contain complex types such as lists, dicts, or even other
    dataclasses nested within it.

    Refer to the :class:`AbstractLoader` class for documentation on any
    of the implemented methods.
    """
    __slots__ = ()

    def __init_subclass__(cls, **kwargs):
        # Register the default set of load hooks on every new subclass.
        super().__init_subclass__()
        setup_default_loader(cls)

    # Optional hook to transform JSON field names before lookup;
    # disabled (``None``) by default.
    transform_json_field = None
[docs] @staticmethod def load_fallback(tp: TypeInfo, extras: Extras): # identity: o return tp.v()
[docs] @staticmethod def is_none(tp: TypeInfo, extras: Extras) -> str: return f'{tp.v()} is None'
    @classmethod
    def load_to_str(cls, tp: TypeInfo, extras: Extras):
        """Generate code to coerce a value into a ``str`` (or subclass)."""
        tn = tp.type_name(extras)
        o = tp.v()

        # str(v)
        if not extras['config'].v1_coerce_none_to_empty_str or tp.in_optional:
            return f'{tn}({o})'

        # '' if v is None else str(v)
        # With coercion enabled, a `None` value maps to an empty string
        # (or an empty instance of the annotated `str` subclass).
        default = "''" if tp.origin is str else f'{tn}()'
        return f'{default} if {cls.is_none(tp, extras)} else {tn}({o})'
    @staticmethod
    def load_to_int(tp: TypeInfo, extras: Extras):
        """
        Generate code to load a value into an integer field.

        Current logic to parse (an annotated) ``int`` returns:
          - ``v`` --> ``v`` is an ``int`` or similarly annotated type.
          - ``int(v)`` --> ``v`` is a ``str`` value of either a decimal
            integer (e.g. ``'123'``) or a non-fractional float value
            (e.g. ``42.0``).
          - ``as_int(v)`` --> ``v`` is a non-fractional ``float``, or in
            case of "less common" types / scenarios.

        Note that empty strings and ``None`` (e.g. null values) are not
        supported.
        """
        tn = tp.type_name(extras)
        o = tp.v()

        # Make the fallback helper available in the generated function.
        tp.ensure_in_locals(extras, as_int=as_int_v1)

        return (
            f'{o} '
            f'if (t := {o}.__class__) is {tn} '
            f"else {tn}(f if '.' in {o} and (f := float({o})).is_integer() else {o}) "
            'if t is str '
            f'else as_int({o}, t, {tn})'
        )
# TODO when `in_union`, we already know `o.__class__` # is not `tn`, and we already have a variable `tp`.
[docs] @staticmethod def load_to_float(tp: TypeInfo, extras: Extras): # alias: float(o) return tp.wrap_builtin(float, tp.v(), extras)
    @staticmethod
    def load_to_bool(tp: TypeInfo, extras: Extras):
        """Generate code mapping a value to ``bool``.

        Strings are lower-cased and compared against the truthy set;
        any other type is true only when it equals ``1``.
        """
        o = tp.v()
        tp.ensure_in_locals(extras, __TRUTHY=TRUTHY_VALUES)

        return (f'{o}.lower() in __TRUTHY '
                f'if {o}.__class__ is str '
                f'else {o} == 1')
    @staticmethod
    def load_to_bytes(tp: TypeInfo, extras: Extras):
        """Generate code to load a value into ``bytes``.

        ``bytes`` passes through unchanged, ``bytearray`` is copied,
        and anything else is base64-decoded (presumably a ``str`` —
        ``b64decode`` also accepts ascii ``str`` input).
        """
        tp.ensure_in_locals(extras, b64decode)
        o = tp.v()
        return (f'{o} if (t := {o}.__class__) is bytes '
                f'else bytes({o}) if t is bytearray '
                f'else b64decode({o})')
    @classmethod
    def load_to_bytearray(cls, tp: TypeInfo, extras: Extras):
        """Generate code to load a value into a ``bytearray``.

        Reuses the ``bytes`` loader and wraps its result in a
        ``bytearray`` constructor call.
        """
        # micro-optimization: avoid copying when already a bytearray
        # return f'{o} if (t := {o}.__class__) is bytearray else bytearray({o} if t is bytes else b64decode({o}))'
        as_bytes = cls.load_to_bytes(tp, extras)
        return tp.wrap_builtin(bytearray, as_bytes, extras)
[docs] @staticmethod def load_to_none(tp: TypeInfo, extras: Extras): return 'None'
[docs] @staticmethod def load_to_enum(tp: TypeInfo, extras: Extras): # alias: enum_cls(o) return tp.wrap(tp.v(), extras)
[docs] @staticmethod def load_to_uuid(tp: TypeInfo, extras: Extras): # alias: UUID(o) return tp.wrap_builtin(UUID, tp.v(), extras)
[docs] @classmethod def load_to_iterable(cls, tp: TypeInfo, extras: Extras): v, v_next, i_next = tp.v_and_next() gorg = tp.origin # noinspection PyBroadException try: elem_type = tp.args[0] except: elem_type = Any string = cls.load_dispatcher_for_annotation( tp.replace(origin=elem_type, i=i_next, index=None, val_name=None), extras) if issubclass(gorg, set): start_char = '{' end_char = '}' elif issubclass(gorg, frozenset): start_char = 'frozenset((' end_char = '))' else: start_char = '[' end_char = ']' result = f'{start_char}{string} for {v_next} in {v}{end_char}' return tp.wrap(result, extras)
    @classmethod
    def load_to_tuple(cls, tp: TypeInfo, extras: Extras):
        """Generate code to load a value into a ``tuple`` (fixed-length
        or variadic form).
        """
        args = tp.args

        # Determine the code string for the annotation

        # Check if the `Tuple` appears in the variadic form
        #   i.e. Tuple[str, ...]
        if args:
            is_variadic = args[-1] is ...
        else:
            # Annotated without args, as simply `tuple`
            args = (Any, ...)
            is_variadic = True

        if is_variadic:
            # Logic that handles the variadic form of :class:`Tuple`'s,
            #   i.e. ``Tuple[str, ...]``
            #
            # Per `PEP 484`_, only **one** required type is allowed before the
            # ``Ellipsis``. That is, ``Tuple[int, ...]`` is valid whereas
            # ``Tuple[int, str, ...]`` would be invalid. `See here`_ for more info.
            #
            # .. _PEP 484: https://www.python.org/dev/peps/pep-0484/
            # .. _See here: https://github.com/python/typing/issues/180
            v, v_next, i_next = tp.v_and_next()

            # Given `Tuple[T, ...]`, we only need the generated string for `T`
            string = cls.load_dispatcher_for_annotation(
                tp.replace(origin=args[0], i=i_next, index=None, val_name=None),
                extras)

            result = f'[{string} for {v_next} in {v}]'
            # Wrap because we need to create a tuple from list comprehension
            force_wrap = True
        else:
            # Fixed-length form: one generated sub-expression per
            # positional element.
            string = ', '.join([
                str(cls.load_dispatcher_for_annotation(
                    tp.replace(origin=arg, index=k, val_name=None),
                    extras))
                for k, arg in enumerate(args)])

            result = f'({string}, )'
            force_wrap = False

        return tp.wrap(result, extras, force=force_wrap)
    @classmethod
    def load_to_named_tuple(cls, tp: TypeInfo, extras: Extras):
        """Generate code to load a value into a typed ``NamedTuple``."""
        nt_tp = cast(NamedTuple, tp.origin)

        if nt_tp._field_defaults:  # has optionals
            # Fields with defaults need a dedicated helper function.
            return cls._load_to_named_tuple_fn(tp, extras)

        fields_in_order = nt_tp._fields  # field names in order
        ann = nt_tp.__annotations__

        if extras['config'].v1_namedtuple_as_dict:
            # Input is a mapping: index each field by its (repr'd) name.
            values_in_order = tuple(
                str(
                    cls.load_dispatcher_for_annotation(
                        tp.replace(origin=ann.get(name, Any), index=repr(name)),
                        extras,
                    )
                )
                for name in fields_in_order
            )
        else:
            # Input is a sequence: index each field positionally.
            values_in_order = tuple(
                str(
                    cls.load_dispatcher_for_annotation(
                        tp.replace(origin=ann.get(name, Any), index=i),
                        extras,
                    )
                )
                for i, name in enumerate(fields_in_order)
            )

        params = ', '.join(values_in_order)

        return tp.wrap(params, extras)
    @classmethod
    @setup_recursive_safe_function(per_class_cache=True)
    def _load_to_named_tuple_fn(cls, tp: TypeInfo, extras: Extras):
        """Build a helper function that loads a ``NamedTuple`` which has
        one or more fields with default values.

        Handles both input modes: mapping input (``v1_namedtuple_as_dict``)
        and positional sequence input.
        """
        fn_gen = extras['fn_gen']
        _locals = extras['locals']

        nt_tp = _locals['cls'] = cast(NamedTuple, tp.origin)

        fields_in_order = nt_tp._fields  # field names in order
        field_to_default = nt_tp._field_defaults
        ann = nt_tp.__annotations__

        req_field_to_value = {}
        opt_field_to_value = {}

        all_optionals = len(field_to_default) == len(fields_in_order)

        v = tp.v_for_def()

        if extras['config'].v1_namedtuple_as_dict:
            i_next = tp.i + 1
            v_next = f'{tp.prefix}{i_next}'

            for name in fields_in_order:
                field_tp = ann.get(name, Any)
                if name in field_to_default:
                    # Stash the default so the generated code can fall
                    # back to it when the key is absent.
                    _locals[f'_dflt_{name}'] = field_to_default[name]
                    new_tp = tp.replace(origin=field_tp, i=i_next,
                                        index=None, val_name=None)
                    value = cls.load_dispatcher_for_annotation(new_tp, extras)
                    opt_field_to_value[name] = value
                else:
                    new_tp = tp.replace(origin=field_tp, index=repr(name))
                    value = cls.load_dispatcher_for_annotation(new_tp, extras)
                    req_field_to_value[f'__{name}'] = value

            req_args = ', '.join(req_field_to_value)
            opt_args = ', '.join(f'__{f}' for f in opt_field_to_value)

            if all_optionals:  # NamedTuple has no required fields
                ret_value_with_input = f'return cls({opt_args})'
            else:
                ret_value_with_input = f'return cls({req_args}, {opt_args})'

            for name, value in req_field_to_value.items():
                fn_gen.add_line(f'{name} = {value}')

            # it's guaranteed the NamedTuple has at least one default field
            with fn_gen.if_(f'not {v}'):
                fn_gen.add_line('return cls()')

            for name, value in opt_field_to_value.items():
                with fn_gen.if_(f'({v_next} := {v}.get({name!r}, MISSING)) is MISSING'):
                    fn_gen.add_line(f'__{name} = _dflt_{name}')
                with fn_gen.else_():
                    fn_gen.add_line(f'__{name} = {value}')

            fn_gen.add_line(ret_value_with_input)

        else:  # list mode
            for i, name in enumerate(fields_in_order):
                field_tp = ann.get(name, Any)
                value = cls.load_dispatcher_for_annotation(
                    tp.replace(origin=field_tp, index=i), extras)
                if name in field_to_default:
                    opt_field_to_value[name] = value
                else:
                    req_field_to_value[f'__{name}'] = value

            req_args = ', '.join(req_field_to_value)
            opt_fields_start_i = len(req_field_to_value)

            if all_optionals:  # NamedTuple has no required fields
                len_condition = 'n'
                ret_value_with_input = f'return cls(*args)'
            else:
                len_condition = f'n > {opt_fields_start_i}'
                ret_value_with_input = f'return cls({req_args}, *args)'

            for name, value in req_field_to_value.items():
                fn_gen.add_line(f'{name} = {value}')

            # it's guaranteed the NamedTuple has at least one default field
            fn_gen.add_line(f'n = len({v})')
            with fn_gen.if_(len_condition):
                # Build `args` incrementally: each optional value is only
                # appended when the input sequence is long enough.
                opt_values = list(opt_field_to_value.values())
                fn_gen.add_line(f'args = [{opt_values.pop(0)}]')
                for i, value in enumerate(opt_values,
                                          start=opt_fields_start_i + 1):
                    with fn_gen.if_(f'n > {i}'):
                        fn_gen.add_line(f'args.append({value})')
                fn_gen.add_line(ret_value_with_input)
            # Fallback: only required values were provided.
            fn_gen.add_line(f'return cls({req_args})')
    @classmethod
    def load_to_named_tuple_untyped(cls, tp: TypeInfo, extras: Extras):
        """Generate code to load a value into an untyped ``namedtuple``."""
        # Check if input object is `dict` or `list`.
        #
        # Assuming `Point` is a `namedtuple`, this performs
        # the equivalent logic as:
        #   Point(**x) if isinstance(x, dict) else Point(*x)
        #
        # star, dbl_star = tp.multi_wrap(extras, 'nt_', f'*{v}', f'**{v}')
        v = tp.v()

        if extras['config'].v1_namedtuple_as_dict:
            return tp.wrap(f'**{v}', extras, prefix='nt_')

        def raise_():
            raise TypeError('Expected list/tuple for NamedTuple field') from None

        tp.ensure_in_locals(extras, raise_=raise_)

        star = tp.wrap(f'*{v}', extras, prefix='nt_')
        return f'{star} if (t := type({v})) is list or t is tuple else raise_()'
@classmethod def _build_dict_comp(cls, tp, v, i_next, k_next, v_next, kt, vt, extras): tp_k_next = tp.replace(origin=kt, i=i_next, prefix='k', index=None, val_name=None) string_k = cls.load_dispatcher_for_annotation(tp_k_next, extras) tp_v_next = tp.replace(origin=vt, i=i_next, prefix='v', index=None, val_name=None) string_v = cls.load_dispatcher_for_annotation(tp_v_next, extras) return f'{{{string_k}: {string_v} for {k_next}, {v_next} in {v}.items()}}'
    @classmethod
    def load_to_dict(cls, tp: TypeInfo, extras: Extras):
        """Generate a dict comprehension to load a value into a ``dict``."""
        v, k_next, v_next, i_next = tp.v_and_next_k_v()

        try:
            kt, vt = tp.args
        except ValueError:
            # Annotated without two arguments,
            # e.g. like `dict[str]` or `dict`
            kt = vt = Any

        result = cls._build_dict_comp(
            tp, v, i_next, k_next, v_next, kt, vt, extras)

        return tp.wrap(result, extras)
    @classmethod
    def load_to_defaultdict(cls, tp: TypeInfo, extras: Extras):
        """Generate code to load a value into a ``defaultdict``."""
        v, k_next, v_next, i_next = tp.v_and_next_k_v()
        default_factory: DefFactory | None

        try:
            kt, vt = tp.args
            # Use the value type's origin as the default factory,
            # e.g. `list` for a value annotated as `list[int]`.
            default_factory = getattr(vt, '__origin__', vt)
        except ValueError:
            # Annotated without two arguments,
            # e.g. like `defaultdict[str]` or `defaultdict`
            kt = vt = Any
            default_factory = NoneType

        result = cls._build_dict_comp(
            tp, v, i_next, k_next, v_next, kt, vt, extras)

        return tp.wrap_dd(default_factory, result, extras)
    @classmethod
    def load_to_typed_dict(cls, tp: TypeInfo, extras: Extras):
        """Generate code to load a value into a ``TypedDict``."""
        req_keys, opt_keys = get_keys_for_typed_dict(tp.origin)

        if opt_keys:  # has optionals
            # Optional keys need a dedicated helper function.
            return cls._load_to_typed_dict_fn(tp, extras)

        ann = tp.origin.__annotations__

        # Inline dict literal: one entry per required key.
        dict_body = ', '.join(
            f"""{name!r}: {
                cls.load_dispatcher_for_annotation(
                    tp.replace(origin=ann.get(name, Any), index=repr(name)),
                    extras,
                )
            }"""
            for name in req_keys
        )

        return f'{{{dict_body}}}'
    @classmethod
    @setup_recursive_safe_function
    def _load_to_typed_dict_fn(cls, tp: TypeInfo, extras: Extras):
        """Build a helper function that loads a ``TypedDict`` which has
        optional (non-required) keys.

        The generated function raises a ``ParseError`` wrapping the
        underlying ``KeyError`` / ``TypeError`` on failure.
        """
        fn_gen = extras['fn_gen']

        req_keys, opt_keys = get_keys_for_typed_dict(tp.origin)

        result_list = []

        v = tp.v_for_def()

        # TODO set __annotations__?
        td_annotations = tp.origin.__annotations__

        # Set required keys for the `TypedDict`
        for k in req_keys:
            field_tp = td_annotations[k]
            field_name = repr(k)
            string = cls.load_dispatcher_for_annotation(
                tp.replace(origin=field_tp, index=field_name, val_name=None),
                extras)

            result_list.append(f'{field_name}: {string}')

        with fn_gen.try_():
            fn_gen.add_lines('result = {',
                             *(f' {r},' for r in result_list),
                             '}')

            # Set optional keys for the `TypedDict` (if they exist)
            next_i = tp.i + 1
            new_tp = tp.replace(i=next_i, index=None, val_name=None)
            v_next = new_tp.v()

            for k in opt_keys:
                field_tp = td_annotations[k]
                field_name = repr(k)
                string = cls.load_dispatcher_for_annotation(
                    new_tp.replace(origin=field_tp), extras)
                # Only assign the key when it is present in the input.
                with fn_gen.if_(f'({v_next} := {v}.get({field_name}, MISSING)) is not MISSING'):
                    fn_gen.add_line(f'result[{field_name}] = {string}')

            fn_gen.add_line('return result')

        with fn_gen.except_(Exception, 'e'):
            # Normalize the two common failure modes into clearer errors.
            with fn_gen.if_('type(e) is KeyError'):
                fn_gen.add_line('name = e.args[0]; e = KeyError(f"Missing required key: {name!r}")')
            with fn_gen.elif_(f'not isinstance({v}, dict)'):
                fn_gen.add_line('e = TypeError("Incorrect type for object")')
            fn_gen.add_line(f"raise ParseError(e, {v}, {{}}, 'load') from None")
    @classmethod
    @setup_recursive_safe_function_for_generic(per_class_cache=True)
    def load_to_union(cls, tp: TypeInfo, extras: Extras):
        """Build a helper function that loads a value annotated as a
        ``Union``.

        Strategy (in generated-code order):
          1. ``None`` short-circuits when ``NoneType`` is in the union.
          2. Tagged dataclasses dispatch on the tag key.
          3. Exact-type checks for "leaf" types return the value as-is.
          4. Remaining members are tried one-by-one (``try``/``except``).
          5. Anything else raises ``ParseError``.
        """
        fn_gen = extras['fn_gen']
        config = extras['config']
        actual_cls = extras['cls']

        tag_key = config.tag_key or TAG
        auto_assign_tags = config.auto_assign_tags
        leaf_handling_as_subclass = config.v1_leaf_handling == 'issubclass'

        args = tp.args
        in_optional = NoneType in args

        _locals = extras['locals']
        _locals['fields'] = args
        _locals['tag_key'] = tag_key

        dataclass_tag_to_lines: dict[str, list] = {}

        has_dataclass = any(is_dataclass(a) for a in args)

        i = tp.i
        v = tp.v_for_def()

        # TODO:
        #  Union handling here assumes `i == 1` (EnvWizard). If
        #  reused for multiple Union fields, cache/function-name
        #  collisions are possible.

        # Optionally run a user-supplied pre-decoder (best-effort: fall
        # back to the raw value if it raises).
        # noinspection PyUnboundLocalVariable
        if (has_dataclass
                and (pre_decoder := config.v1_pre_decoder) is not None
                and (new_v := pre_decoder(cls, dict, tp, extras).v()) != v):
            current_v = v
            tp = tp.replace(i=i+1)
            i = tp.i
            v = tp.v_for_def()
            with fn_gen.try_():
                fn_gen.add_line(f'{v} = {new_v}')
            with fn_gen.except_(Exception):
                fn_gen.add_line(f'{v} = {current_v}')

        type_checks = []
        try_parse_at_end = []

        for possible_tp in args:
            possible_tp = eval_forward_ref_if_needed(possible_tp, actual_cls)

            tp_new = TypeInfo(possible_tp, i=i)
            tp_new.in_optional = in_optional

            if possible_tp is NoneType:
                with fn_gen.if_(cls.is_none(tp, extras)):
                    fn_gen.add_line('return None')
                continue

            if has_dataclass and is_dataclass(possible_tp):
                # we see a dataclass in `Union` declaration
                meta = get_meta(possible_tp)
                tag = meta.tag
                assign_tags_to_cls = auto_assign_tags or meta.auto_assign_tags
                cls_name = possible_tp.__name__

                if assign_tags_to_cls and not tag:
                    tag = cls_name
                    # We don't want to mutate the base Meta class here
                    if meta is AbstractMeta:
                        create_meta(possible_tp, cls_name, tag=tag)
                    else:
                        meta.tag = cls_name

                if tag:
                    string = cls.load_dispatcher_for_annotation(tp_new, extras)

                    dataclass_tag_to_lines[tag] = [
                        f'if tag == {tag!r}:',
                        f' return {string}'
                    ]
                    continue

                elif not config.v1_unsafe_parse_dataclass_in_union:
                    e = ValueError(
                        'Cannot parse dataclass types in a Union without '
                        'one of the following `Meta` settings:\n\n'
                        ' * `auto_assign_tags = True`\n'
                        f' - Set on class `{extras["cls_name"]}`.\n\n'
                        f' * `tag = "{cls_name}"`\n'
                        f' - Set on class `{possible_tp.__qualname__}`.\n\n'
                        ' * `v1_unsafe_parse_dataclass_in_union = True`\n'
                        f' - Set on class `{extras["cls_name"]}`\n\n'
                        'For more information, refer to:\n'
                        ' https://dcw.ritviknag.com/en/latest/common_use_cases/dataclasses_in_union_types.html')
                    raise e from None

            string = cls.load_dispatcher_for_annotation(tp_new, extras)

            try_parse_lines = [
                'try:',
                f' return {string}',
                'except Exception:',
                ' pass',
            ]

            # "Leaf" types get an exact-type fast path; their try/except
            # fallback is deferred to the end so it cannot shadow other
            # members' parsing attempts.
            if (possible_tp in LEAF_TYPES
                    or (leaf_handling_as_subclass
                        and is_subclass_safe(
                            get_origin_v2(possible_tp), LEAF_TYPES))):
                # TODO disable for dataclasses
                tn = tp_new.type_name(extras)
                type_checks.extend([
                    f'if tp is {tn}:',
                    f' return {v}'
                ])
                list_to_add = try_parse_at_end
            else:
                list_to_add = type_checks

            list_to_add.extend(try_parse_lines)

        if dataclass_tag_to_lines:
            with fn_gen.try_():
                fn_gen.add_line(f'tag = {v}[tag_key]')
            with fn_gen.except_(Exception):
                fn_gen.add_line('pass')
            with fn_gen.else_():
                for lines in dataclass_tag_to_lines.values():
                    fn_gen.add_lines(*lines)

                fn_gen.add_line(
                    "raise ParseError("
                    "TypeError('Object with tag was not in any of Union types'),"
                    f"{v},fields,'load',"
                    "input_tag=tag,"
                    "tag_key=tag_key,"
                    f"valid_tags={list(dataclass_tag_to_lines)})"
                )

        fn_gen.add_line(f'tp = type({v})')

        if type_checks:
            fn_gen.add_lines(*type_checks)

        if try_parse_at_end:
            fn_gen.add_lines(*try_parse_at_end)

        # Invalid type for Union
        fn_gen.add_line(
            "raise ParseError("
            "TypeError('Object was not in any of Union types'),"
            f"{v},fields,'load',"
            "tag_key=tag_key"
            ")")
    @staticmethod
    @setup_recursive_safe_function_for_generic
    def load_to_literal(tp: TypeInfo, extras: Extras):
        """Build a helper function that validates a value against the
        allowed ``Literal`` choices, raising ``ParseError`` otherwise.
        """
        fn_gen = extras['fn_gen']

        v = tp.v_for_def()

        _locals = extras['locals']
        _locals['fields'] = frozenset(tp.args)

        with fn_gen.if_(f'{v} in fields', comment=repr(tp.args)):
            fn_gen.add_line(f'return {v}')

        # No such Literal with the value of `o`
        fn_gen.add_line("e = ValueError('Value not in expected Literal values')")
        fn_gen.add_line(f"raise ParseError(e, {v}, fields, 'load', "
                        f'allowed_values=list(fields))')
# TODO Checks for Literal equivalence, as mentioned here: # https://www.python.org/dev/peps/pep-0586/#equivalence-of-two-literals # extras_cp['locals'][fields] = { # a: type(a) for a in tp.args # } # # with fn_gen.function(fn_name, ['v1'], None, _locals): # # with fn_gen.try_(): # with fn_gen.if_(f'type({tp.v()}) is {fields}[{tp.v()}]'): # fn_gen.add_line('return v1') # # # The value of `o` is in the ones defined for the Literal, but # # also confirm the type matches the one defined for the Literal. # fn_gen.add_line("e = TypeError('Value did not match expected type for the Literal')") # # fn_gen.add_line('raise ParseError(' # f'e, v1, {fields}, ' # 'have_type=type(v1), ' # f'desired_type={fields}[v1], ' # f'desired_value=next(v for v in {fields} if v == v1), ' # f'allowed_values=list({fields})' # ')') # with fn_gen.except_(KeyError): # # No such Literal with the value of `o` # fn_gen.add_line("e = ValueError('Value not in expected Literal values')") # fn_gen.add_line('raise ParseError(' # f'e, v1, {fields}, allowed_values=list({fields})' # f')')
[docs] @staticmethod def load_to_decimal(tp: TypeInfo, extras: Extras): o = tp.v() s = f'str({o}) if {o}.__class__ is float else {o}' return tp.wrap_builtin(Decimal, s, extras)
[docs] @staticmethod def load_to_path(tp: TypeInfo, extras: Extras): # alias: Path(o) return tp.wrap_builtin(Path, tp.v(), extras)
    @classmethod
    @process_patterned_date_time
    def load_to_date(cls, tp: TypeInfo, extras: Extras):
        """Generate loader code for a ``date`` field; delegates to the
        shared date/datetime helper.
        """
        return cls._load_to_date(tp, extras, date)
    @classmethod
    @process_patterned_date_time
    def load_to_datetime(cls, tp: TypeInfo, extras: Extras):
        """Generate loader code for a ``datetime`` field; delegates to
        the shared date/datetime helper.
        """
        return cls._load_to_date(tp, extras, datetime)
    @staticmethod
    @process_patterned_date_time
    def load_to_time(tp: TypeInfo, extras: Extras):
        """Generate code to load a value into a ``time`` (or subclass).

        ISO-format strings use ``time.fromisoformat``; any other input
        falls back to the ``as_time`` helper.
        """
        o = tp.v()
        tn = tp.type_name(extras, bound=time)
        tp_time = cast('type[time]', tp.origin)

        __fromisoformat = f'__{tn}_fromisoformat'

        tp.ensure_in_locals(
            extras,
            __as_time=as_time_v1,
            **{__fromisoformat: tp_time.fromisoformat}
        )

        if PY311_OR_ABOVE:
            _parse_iso_string = f'{__fromisoformat}({o})'
        else:  # pragma: no cover
            # `fromisoformat` before 3.11 can't handle a trailing 'Z'.
            _parse_iso_string = f"{__fromisoformat}({o}.replace('Z', '+00:00', 1))"

        return (f'{_parse_iso_string} if {o}.__class__ is str '
                f'else __as_time({o}, {tn})')
@staticmethod
def _load_to_date(tp: TypeInfo, extras: Extras,
                  cls: type[date] | type[datetime]):
    """Shared codegen for `load_to_date` / `load_to_datetime`.

    Emits an expression that handles three input shapes at runtime:

      * all-digit string -> epoch timestamp via ``fromtimestamp(int(o), UTC)``
      * other string     -> ISO-8601 via ``fromisoformat``
      * anything else    -> delegated to the `as_date_v1` / `as_datetime_v1`
                            helper.

    :param tp:     Type info for the annotated field being generated.
    :param extras: Shared codegen state (config, locals, function builder).
    :param cls:    Either `date` or `datetime`, selecting which parsing
                   helpers to emit.
    """
    o = tp.v()
    tn = tp.type_name(extras, bound=cls)
    tp_date_or_datetime = cast('type[date]', tp.origin)

    _fromisoformat = f'__{tn}_fromisoformat'
    name_to_func = {
        _fromisoformat: tp_date_or_datetime.fromisoformat,
    }

    if cls is datetime:
        # datetime or a subclass
        _fromtimestamp = f'__{tn}_fromtimestamp'
        name_to_func[_fromtimestamp] = tp_date_or_datetime.fromtimestamp
        _as_func = '__as_datetime'
        name_to_func[_as_func] = as_datetime_v1
        _date_part = _opt_cls = ''
    else:
        # date or a subclass: parse timestamps with `datetime.fromtimestamp`,
        # then truncate to a date via `.date()` below.
        # (fixed: was a needless f-string with no placeholders)
        _fromtimestamp = '__datetime_fromtimestamp'
        name_to_func[_fromtimestamp] = datetime.fromtimestamp
        _as_func = '__as_date'
        name_to_func[_as_func] = as_date_v1
        _date_part = '.date()'
        _opt_cls = f', {tn}'

    tp.ensure_in_locals(extras, UTC=UTC, **name_to_func)

    if PY311_OR_ABOVE:
        _parse_iso_string = f'{_fromisoformat}({o})'
    else:  # pragma: no cover
        # `fromisoformat` on Python < 3.11 can't parse a trailing 'Z' suffix
        _parse_iso_string = f"{_fromisoformat}({o}.replace('Z', '+00:00', 1))"

    return (f'({_fromtimestamp}(int({o}), UTC){_date_part} if {o}.isdigit() '
            f'else {_parse_iso_string}) if {o}.__class__ is str '
            f'else {_as_func}({o}, {_fromtimestamp}, UTC{_opt_cls})')
@staticmethod
def load_to_timedelta(tp: TypeInfo, extras: Extras):
    """Generate code that parses the input value into a `timedelta`.

    Emits a call to the `as_timedelta` runtime helper, which is made
    available in the generated function's locals.
    """
    bound_name = tp.type_name(extras, bound=timedelta)
    tp.ensure_in_locals(extras, as_timedelta)
    return f'as_timedelta({tp.v()}, {bound_name})'
@staticmethod
@setup_recursive_safe_function(
    fn_name=f'__{PACKAGE_NAME}_from_dict_{{cls_name}}__')
def load_to_dataclass(tp: TypeInfo, extras: Extras):
    # Build (and cache) the `from_dict` load function for the nested
    # dataclass `tp.origin`. The `setup_recursive_safe_function`
    # decorator supplies the recursion guard and produces the actual
    # return expression, so this body intentionally returns nothing.
    load_func_for_dataclass(tp.origin, extras)
@classmethod
def load_dispatcher_for_annotation(cls, tp, extras):
    """Resolve the load hook for a type annotation and generate its code.

    Normalizes the annotation in `tp` (unwrapping `Annotated`,
    `Required`/`NotRequired`, type aliases, and `LiteralString`),
    dispatches to the matching load hook, and returns the generated
    source-code expression for loading a value of this type.
    Raises `ParseError` when no hook matches.
    """
    hooks = cls.__LOAD_HOOKS__
    config = extras['config']
    pre_decoder = config.v1_pre_decoder
    type_hooks = config.v1_type_to_load_hook
    leaf_handling_as_subclass = config.v1_leaf_handling == 'issubclass'

    # type_ann = tp.origin
    type_ann = eval_forward_ref_if_needed(tp.origin, extras['cls'])

    origin = get_origin_v2(type_ann)
    name = getattr(origin, '__name__', origin)
    args = None
    container_tp = None

    if is_annotated(type_ann):
        # Given `Annotated[T, ...]`, we only need `T`
        type_ann, *field_extras = get_args(type_ann)
        type_ann = eval_forward_ref_if_needed(type_ann, extras['cls'])
        origin = get_origin_v2(type_ann)
        name = getattr(origin, '__name__', origin)
        # Check for Custom Patterns for date / time / datetime
        for extra in field_extras:
            if isinstance(extra, PatternBase):
                extras['pattern'] = extra
    elif is_typed_dict_type_qualifier(origin):
        # Given `Required[T]` or `NotRequired[T]`, we only need `T`
        type_ann = get_args(type_ann)[0]
        origin = get_origin_v2(type_ann)
        name = getattr(origin, '__name__', origin)

    # TypeAliasType: Type aliases are created through
    # the `type` statement
    if (value := getattr(origin, '__value__', None)) is not None:
        type_ann = value
        origin = get_origin_v2(type_ann)
        name = getattr(origin, '__name__', origin)

    # `LiteralString` enforces stricter rules at
    # type-checking but behaves like `str` at runtime.
    # TODO maybe add `load_to_literal_string`
    if origin is PyLiteralString:
        load_hook = cls.load_to_str
        origin = str
        name = 'str'

    # -> Atomic, immutable types which don't require
    #    any iterative / recursive handling.
    elif origin in LEAF_TYPES or (
            leaf_handling_as_subclass
            and is_subclass_safe(origin, LEAF_TYPES)):
        load_hook = hooks.get(origin)

    # -> User-registered per-type hook (`Meta.v1_type_to_load_hook`).
    elif (type_hooks is not None
            and (hook_info := type_hooks.get(origin)) is not None):
        mode, load_hook = hook_info
        if mode == 'runtime':
            # `runtime` hooks are called directly on the value at runtime,
            # rather than generating inline code.
            fn_name, = tp.ensure_in_locals(extras, load_hook)
            return f'{fn_name}({tp.v()})'
        try:
            args = get_args(type_ann)
        except ValueError:
            args = Any,

    # NOTE: the walrus binds `load_hook` (possibly to None) for every
    # branch below, so the `if load_hook is None` fallback is safe.
    elif (load_hook := hooks.get(origin)) is not None:
        try:
            args = get_args(type_ann)
        except ValueError:
            args = Any,

    # -> Union[x]
    elif is_union(origin):
        load_hook = cls.load_to_union
        args = get_args(type_ann)

        # Special case for Optional[x], which is actually Union[x, None]
        if len(args) == 2 and NoneType in args:
            new_tp = tp.replace(origin=args[0], args=None, name=None, val_name=None)
            new_tp.in_optional = True
            string = cls.load_dispatcher_for_annotation(new_tp, extras)
            return f'None if {cls.is_none(tp, extras)} else {string}'

    # -> Literal[X, Y, ...]
    elif origin is Literal:
        load_hook = cls.load_to_literal
        args = get_args(type_ann)

    # https://stackoverflow.com/questions/76520264/dataclasswizard-after-upgrading-to-python3-11-is-not-working-as-expected
    elif origin is Any:
        load_hook = cls.load_fallback

    # -> NamedTuple (has `_fields` and is a `tuple` subclass)
    elif is_subclass_safe(origin, tuple) and hasattr(origin, '_fields'):
        container_tp = dict if config.v1_namedtuple_as_dict else tuple
        if getattr(origin, '__annotations__', None):
            # Annotated as a `typing.NamedTuple` subtype
            load_hook = cls.load_to_named_tuple
        else:
            # Annotated as a `collections.namedtuple` subtype
            load_hook = cls.load_to_named_tuple_untyped

    elif is_typed_dict(origin):
        container_tp = dict
        load_hook = cls.load_to_typed_dict

    elif is_dataclass(origin):
        container_tp = dict
        # return a dynamically generated `fromdict`
        # for the `cls` (base_type)
        load_hook = cls.load_to_dataclass

    elif is_subclass_safe(origin, Enum):
        load_hook = cls.load_to_enum

    # -> Abstract container ABCs, re-mapped to a concrete builtin.
    elif origin in (abc.Sequence, abc.MutableSequence, abc.Collection):
        if origin is abc.Sequence:
            load_hook = cls.load_to_tuple
            # desired (non-generic) origin type
            name = 'tuple'
            origin = tuple
            # Re-map type arguments to variadic tuple format,
            # e.g. `Sequence[int]` -> `tuple[int, ...]`
            try:
                args = (get_args(type_ann)[0], ...)
            except (IndexError, ValueError):
                args = Any,
        else:
            load_hook = cls.load_to_iterable
            # desired (non-generic) origin type
            name = 'list'
            origin = list
            # Get type arguments, e.g. `Sequence[int]` -> `int`
            try:
                args = get_args(type_ann)
            except ValueError:
                args = Any,

    elif isinstance(origin, PatternBase):
        load_hook = origin.load_to_pattern

    else:
        # TODO everything should use `get_origin_v2`
        try:
            args = get_args(type_ann)
        except ValueError:
            args = Any,

    if load_hook is None:
        # TODO END
        # Fallback: match the origin against registered hook types via
        # `issubclass`, skipping leaf types unless subclass handling is on.
        for t in hooks:
            if (not leaf_handling_as_subclass) and (t in LEAF_TYPES):
                continue
            if issubclass(origin, t):
                container_tp = t
                load_hook = hooks[t]
                break

    # Write the normalized origin/args/name back onto the TypeInfo.
    tp.origin = origin
    tp.args = args
    tp.name = name

    if container_tp is None:
        container_tp = origin

    # Let a user-configured pre-decoder rewrite the TypeInfo first.
    if pre_decoder is not None:
        tp = pre_decoder(cls, container_tp, tp, extras)

    if load_hook is not None:
        result = load_hook(tp, extras)
        return result

    # No matching hook is found for the type.
    # TODO do we want to add a `Meta` field to not raise
    #   an error but perform a default action?
    err = TypeError('Provided type is not currently supported.')
    pe = ParseError(
        err, origin, type_ann, 'load',
        resolution=f'Register a load hook for {ParseError.name(origin)} '
                   f'(v1: `register_type` / `Meta.v1_type_to_load_hook`).',
        unsupported_type=origin
    )
    raise pe from None
def setup_default_loader(cls=LoadMixin):
    """
    Register the default type hooks used when converting `str` (json) or a
    Python `dict` object to a `dataclass` instance.

    Note: `cls` must be :class:`LoadMixIn` or a subclass of it.
    """
    # TODO maybe `dict.update` might be better?
    default_hooks = (
        # Simple types
        (str, cls.load_to_str),
        (float, cls.load_to_float),
        (bool, cls.load_to_bool),
        (int, cls.load_to_int),
        (bytes, cls.load_to_bytes),
        (bytearray, cls.load_to_bytearray),
        (NoneType, cls.load_to_none),
        # Complex types
        (UUID, cls.load_to_uuid),
        (set, cls.load_to_iterable),
        (frozenset, cls.load_to_iterable),
        (deque, cls.load_to_iterable),
        (list, cls.load_to_iterable),
        (tuple, cls.load_to_tuple),
        (defaultdict, cls.load_to_defaultdict),
        (dict, cls.load_to_dict),
        (Decimal, cls.load_to_decimal),
        (Path, cls.load_to_path),
        # Dates and times
        (datetime, cls.load_to_datetime),
        (time, cls.load_to_time),
        (date, cls.load_to_date),
        (timedelta, cls.load_to_timedelta),
    )
    # Registration order matches the original hand-written sequence.
    for typ, hook in default_hooks:
        cls.register_load_hook(typ, hook)
def check_and_raise_missing_fields(
        _locals, o, cls,
        fields: tuple[Field, ...] | None,
        **kwargs):
    """Collect the missing required fields for `cls` and raise `MissingFields`.

    `fields` is None for a `typing.NamedTuple` / `collections.namedtuple`
    class, in which case dataclass-style `Field` objects are synthesized
    from the namedtuple metadata. `_locals` is the generated function's
    locals; a field is "present" when its `__<name>` variable was bound.
    """
    if fields is None:
        # `typing.NamedTuple` or `collections.namedtuple`
        nt_cls = cast(NamedTuple, cls)
        defaults = nt_cls._field_defaults
        names = nt_cls._fields

        synthesized = []
        for nm in names:
            fld = dataclasses.field(default=defaults.get(nm, MISSING))
            fld.name = nm
            synthesized.append(fld)
        fields = tuple(synthesized)

        missing_fields = [nm for nm in names
                          if f'__{nm}' not in _locals
                          and nm not in defaults]
        missing_keys = None
    else:
        missing_fields = []
        for fld in fields:
            if (fld.init
                    and f'__{fld.name}' not in _locals
                    and fld.default is MISSING
                    and fld.default_factory is MISSING):
                missing_fields.append(fld.name)

        # Map each missing field to its first JSON alias (or the field name).
        alias_map = v1_dataclass_field_to_alias_for_load(cls)
        missing_keys = [alias_map.get(nm, [nm])[0]
                        for nm in missing_fields]

    raise MissingFields(
        None, o, cls, fields, None, missing_fields,
        missing_keys, **kwargs,
    ) from None
def load_func_for_dataclass(
        cls: type,
        extras: Extras | None = None,
        loader_cls=LoadMixin,
        base_meta_cls: type = AbstractMeta,
) -> Callable[[JSONObject], T] | None:
    """Generate (and for a main class, install) a `from_dict` function for `cls`.

    When `extras` is None this is the top-level call: a new codegen
    context is created, the generated function is compiled, attached to
    `cls`, and returned. When `extras` is provided, we are generating a
    nested dataclass's loader inside the caller's codegen context, and
    no value is returned.
    """
    # Tuple describing the fields of this dataclass.
    fields = dataclass_fields(cls)

    cls_init_fields = dataclass_init_fields(cls, True)
    cls_init_field_names = dataclass_init_field_names(cls)
    cls_init_kw_only_field_names = dataclass_kw_only_init_field_names(cls)

    field_to_default = dataclass_field_to_default(cls)
    has_defaults = True if field_to_default else False

    # Get the loader for the class, or create a new one as needed.
    cls_loader = get_loader(cls, base_cls=loader_cls, v1=True)
    cls_name = cls.__name__

    fn_name = f'__{PACKAGE_NAME}_from_dict_{cls_name}__'

    # Get the meta config for the class, or the default config otherwise.
    meta = get_meta(cls, base_meta_cls)

    if extras is None:  # we are being run for the main dataclass
        is_main_class = True

        # If the `recursive` flag is enabled and a Meta config is provided,
        # apply the Meta recursively to any nested classes.
        #
        # Else, just use the base `AbstractMeta`.
        config: META = meta if meta.recursive else base_meta_cls

        # Initialize the FuncBuilder
        fn_gen = FunctionBuilder()

        new_locals = {
            'cls': cls,
            'fields': fields,
        }

        extras: Extras = {
            'config': config,
            'cls': cls,
            'cls_name': cls_name,
            'locals': new_locals,
            'recursion_guard': {cls: fn_name},
            'fn_gen': fn_gen,
        }

        # Globals for the generated module-level function.
        _globals = {
            'MISSING': MISSING,
            'ParseError': ParseError,
            'raise_missing_fields': check_and_raise_missing_fields,
            're_raise': re_raise,
        }

    # we are being run for a nested dataclass
    else:
        is_main_class = False

        # config for nested dataclasses
        config = extras['config']

        # Initialize the FuncBuilder
        fn_gen = extras['fn_gen']

        if config is not base_meta_cls:
            # we want to apply the meta config from the main dataclass
            # recursively.
            meta = meta | config
            meta.bind_to(cls, is_default=False)

        new_locals = extras['locals']
        new_locals['fields'] = fields

        # TODO need a way to auto-magically do this
        extras['cls'] = cls
        extras['cls_name'] = cls_name

    # Added for a `v1.EnvWizard` main class, which doesn't set this in globals
    fn_gen.globals.setdefault('raise_missing_fields', check_and_raise_missing_fields)

    key_case: KeyCase | None = cls_loader.transform_json_field
    auto_key_case = key_case is KeyCase.AUTO

    field_to_aliases = v1_dataclass_field_to_alias_for_load(cls)
    check_aliases = True if field_to_aliases else False

    field_to_paths = DATACLASS_FIELD_TO_ALIAS_PATH_FOR_LOAD[cls]
    has_alias_paths = True if field_to_paths else False

    # Fix for using `auto_assign_tags` and `raise_on_unknown_json_key` together
    # See https://github.com/rnag/dataclass-wizard/issues/137
    has_tag_assigned = meta.tag is not None
    if (has_tag_assigned and
            # Ensure `tag_key` isn't a dataclass field,
            # to avoid issues with our logic.
            # See https://github.com/rnag/dataclass-wizard/issues/148
            meta.tag_key not in cls_init_field_names):
        expect_tag_as_unknown_key = True
    else:
        expect_tag_as_unknown_key = False

    on_unknown_key = meta.v1_on_unknown_key

    catch_all_field: str | None = field_to_aliases.pop(CATCH_ALL, None)
    has_catch_all = catch_all_field is not None

    if has_catch_all:
        # `i` counts recognized keys; a `?` suffix marks a default value.
        pre_assign = 'i+=1; '
        catch_all_field_stripped = catch_all_field.rstrip('?')
        catch_all_idx = cls_init_field_names.index(catch_all_field_stripped)
        # remove catch all field from list, so we don't iterate over it
        del cls_init_fields[catch_all_idx]
    else:
        pre_assign = ''
        catch_all_field_stripped = catch_all_idx = None

    if on_unknown_key is not None:
        should_raise = on_unknown_key is KeyAction.RAISE
        should_warn = on_unknown_key is KeyAction.WARN
        if should_warn or should_raise:
            pre_assign = 'i+=1; '
            set_aliases = True
        else:
            set_aliases = has_catch_all
    else:
        should_raise = should_warn = None
        set_aliases = has_catch_all

    if set_aliases:
        if expect_tag_as_unknown_key:
            # add an alias for the tag key, so we don't
            # capture or raise an error when we see it
            aliases = {meta.tag_key}
        else:
            aliases = set()
        new_locals['aliases'] = aliases
    else:
        aliases = None

    if has_alias_paths:
        new_locals['safe_get'] = v1_safe_get

    # Build the body of the generated `fromdict` function.
    with fn_gen.function(fn_name, ['o'], MISSING, new_locals):

        if (_pre_from_dict := getattr(cls, '_pre_from_dict', None)) is not None:
            new_locals['__pre_from_dict__'] = _pre_from_dict
            fn_gen.add_line('o = __pre_from_dict__(o)')

        # Need to create a separate dictionary to copy over the constructor
        # args, as we don't want to mutate the original dictionary object.
        if has_defaults:
            fn_gen.add_line('init_kwargs = {}')
        if pre_assign:
            fn_gen.add_line('i = 0')

        args = []
        kwargs = []

        if cls_init_fields:

            with fn_gen.try_():

                if expect_tag_as_unknown_key and pre_assign:
                    # Count the tag key as "recognized" so it isn't
                    # treated as unknown below.
                    with fn_gen.if_(f'{meta.tag_key!r} in o'):
                        fn_gen.add_line('i+=1')

                val = 'v1'
                _val_is_found = f'{val} is not MISSING'

                for i, f in enumerate(cls_init_fields):
                    name = f.name
                    var = f'__{name}'
                    has_default = name in field_to_default
                    val_is_found = _val_is_found

                    # Case 1: field has explicit JSON alias(es).
                    if (check_aliases
                            and (_aliases := field_to_aliases.get(name)) is not None):

                        if len(_aliases) == 1:
                            alias = _aliases[0]
                            if set_aliases:
                                aliases.add(alias)
                            f_assign = f'field={name!r}; {val}=o.get({alias!r}, MISSING)'
                        else:
                            f_assign = None
                            # add possible JSON keys
                            if set_aliases:
                                aliases.update(_aliases)
                            fn_gen.add_line(f'field={name!r}')
                            condition = [f'({val} := o.get({alias!r}, MISSING)) is not MISSING'
                                         for alias in _aliases]
                            val_is_found = '(' + '\n or '.join(condition) + ')'

                    # Case 2: field is mapped to nested key path(s).
                    elif (has_alias_paths
                            and (paths := field_to_paths.get(name)) is not None):

                        if len(paths) == 1:
                            path = paths[0]
                            # add the first part (top-level key) of the path
                            if set_aliases:
                                aliases.add(path[0])
                            f_assign = f'field={name!r}; {val}=safe_get(o, {path!r}, {not has_default})'
                        else:
                            f_assign = None
                            fn_gen.add_line(f'field={name!r}')
                            condition = []
                            last_idx = len(paths) - 1
                            for k, path in enumerate(paths):
                                # add the first part (top-level key) of each path
                                if set_aliases:
                                    aliases.add(path[0])
                                if k == last_idx:
                                    condition.append(
                                        f'({val} := safe_get(o, {path!r}, {not has_default})) is not MISSING')
                                else:
                                    condition.append(
                                        f'({val} := safe_get(o, {path!r}, False)) is not MISSING')
                            val_is_found = '(' + '\n or '.join(condition) + ')'

                        # TODO raise some useful message like (ex. on IndexError):
                        #   Field "my_str" of type tuple[float, str] in A2 has invalid value ['123']

                    # Case 3: no key transform — use the field name as-is.
                    elif key_case is None:
                        if set_aliases:
                            aliases.add(name)
                        f_assign = f'field={name!r}; {val}=o.get(field, MISSING)'
                    # Case 4: AUTO key case — try all plausible JSON keys.
                    elif auto_key_case:
                        f_assign = None
                        _aliases = possible_json_keys(name)
                        if set_aliases:
                            # add field name itself
                            aliases.add(name)
                            # add possible JSON keys
                            aliases.update(_aliases)
                        fn_gen.add_line(f'field={name!r}')
                        condition = [f'({val} := o.get(field, MISSING)) is not MISSING']
                        for alias in _aliases:
                            condition.append(f'({val} := o.get({alias!r}, MISSING)) is not MISSING')
                        val_is_found = '(' + '\n or '.join(condition) + ')'
                    # Case 5: a specific key-case transform function.
                    else:
                        alias = key_case(name)
                        if set_aliases:
                            aliases.add(alias)
                        if alias != name:
                            field_to_aliases[name] = (alias, )
                        f_assign = f'field={name!r}; {val}=o.get({alias!r}, MISSING)'

                    # Expression that parses `v1` into the field's type.
                    string = generate_field_code(cls_loader, extras, f, i)

                    if f_assign is not None:
                        fn_gen.add_line(f_assign)

                    if has_default:
                        with fn_gen.if_(val_is_found):
                            fn_gen.add_line(f'{pre_assign}init_kwargs[field] = {string}')
                    else:
                        # Required field: positional unless kw-only.
                        if name in cls_init_kw_only_field_names:
                            kwargs.append(f'{name}={var}')
                        else:
                            args.append(var)
                        with fn_gen.if_(val_is_found):
                            fn_gen.add_line(f'{pre_assign}{var} = {string}')

            # create a broad `except Exception` block, as we will be
            # re-raising all exception(s) as a custom `ParseError`.
            with fn_gen.except_(Exception, 'e', ParseError):
                fn_gen.add_line("re_raise(e, cls, o, fields, field, locals().get('v1'))")

        if has_catch_all:
            catch_all_def = f'{{k: o[k] for k in o if k not in aliases}}'

            if catch_all_field.endswith('?'):  # Default value
                with fn_gen.if_('len(o) != i'):
                    fn_gen.add_line(f'init_kwargs[{catch_all_field_stripped!r}] = {catch_all_def}')
            else:
                var = f'__{catch_all_field_stripped}'
                fn_gen.add_line(f'{var} = {{}} if len(o) == i else {catch_all_def}')
                if catch_all_field_stripped in cls_init_kw_only_field_names:
                    kwargs.append(f'{catch_all_field_stripped}={var}')
                else:
                    args.insert(catch_all_idx, var)

        elif set_aliases:  # warn / raise on unknown key
            line = 'extra_keys = set(o) - aliases'
            with fn_gen.if_('len(o) != i'):
                fn_gen.add_line(line)
                if should_raise:
                    # Raise an error here (if needed)
                    new_locals['UnknownKeysError'] = UnknownKeysError
                    fn_gen.add_line("raise UnknownKeysError(extra_keys, o, cls, fields) from None")
                elif should_warn:
                    # Show a warning here
                    new_locals['LOG'] = LOG
                    fn_gen.add_line(r"LOG.warning('Found %d unknown keys %r not mapped to the dataclass schema.\n"
                                    r" Class: %r\n Dataclass fields: %r', "
                                    "len(extra_keys), extra_keys, "
                                    "cls.__qualname__, [f.name for f in fields])")

        # Now pass the arguments to the constructor method, and return
        # the new dataclass instance. If there are any missing fields,
        # we raise them here.
        if has_defaults:
            args.append('**init_kwargs')
        if kwargs:
            args.extend(kwargs)

        with fn_gen.try_():
            fn_gen.add_line(f'return cls({", ".join(args)})')
        with fn_gen.except_(UnboundLocalError):
            # raise `MissingFields`, as required dataclass fields
            # are not present in the input object `o`.
            fn_gen.add_line("raise_missing_fields(locals(), o, cls, fields)")

    # Save the load function for the main dataclass, so we don't need to run
    # this logic each time.
    if is_main_class:
        # noinspection PyUnboundLocalVariable
        functions = fn_gen.create_functions(_globals)

        cls_fromdict = functions[fn_name]

        # Check if the class has a `from_dict`, and it's
        # a class method bound to `fromdict`.
        if ((from_dict := getattr(cls, 'from_dict', None)) is not None
                and getattr(from_dict, '__func__', None) is fromdict):

            LOG.debug("setattr(%s, 'from_dict', %s)", cls_name, fn_name)

            # Marker reserved for future detection/debugging of specialized loaders.
            # setattr(cls_fromdict, _SPECIALIZED_FROM_DICT, True)

            # safe to specialize only when user didn't define it on cls
            _set_new_attribute(cls, 'from_dict', cls_fromdict, force=True)

        _set_new_attribute(
            cls, f'__{PACKAGE_NAME}_from_dict__', cls_fromdict)
        LOG.debug(
            "setattr(%s, '__%s_from_dict__', %s)",
            cls_name, PACKAGE_NAME, fn_name)

        # TODO in `v1`, we will use class attribute (set above) instead.
        CLASS_TO_LOAD_FUNC[cls] = cls_fromdict

        return cls_fromdict
def generate_field_code(cls_loader: LoadMixin,
                        extras: Extras,
                        field: Field,
                        field_i: int,
                        var_name=None) -> 'str | TypeInfo':
    """Generate the load expression for a single dataclass field.

    Any `ParseError` raised during dispatch is annotated with the owning
    class and field name before being re-raised.
    """
    owner = extras['cls']
    # Resolve string / forward-referenced annotations, caching the
    # resolved type back onto the `Field` object.
    field.type = resolved_type = eval_forward_ref_if_needed(field.type, owner)

    try:
        return cls_loader.load_dispatcher_for_annotation(
            TypeInfo(resolved_type, field_i=field_i, val_name=var_name),
            extras
        )
    except ParseError as pe:
        pe.class_name = owner
        # noinspection PyPropertyAccess
        pe.field_name = field.name
        raise pe from None
def re_raise(e, cls, o, fields, field, value):
    """Re-raise a load-time exception `e` as a contextualized error.

    Wraps non-`ParseError` exceptions in a `ParseError`, attaches the
    class / field / input-object context, and adds NamedTuple-specific
    diagnostics before re-raising.
    """
    # If the object `o` is None, then raise an error with
    # the relevant info included.
    if o is None:
        raise MissingData(cls) from None

    # Check if the object `o` is some other type than what we expect -
    # for example, we could be passed in a `list` type instead.
    if not isinstance(o, dict):
        base_err = TypeError('Incorrect type for `from_dict()`')
        e = ParseError(base_err, o, dict, 'load', cls, desired_type=dict)

    add_fields = True

    if type(e) is not ParseError:
        if isinstance(e, JSONWizardError):
            # Project errors already carry their own context; don't
            # overwrite the `fields` they may have set.
            add_fields = False
        else:
            # Wrap an arbitrary exception, using the failing field's
            # declared type (or `Any` if the field can't be found).
            tp = getattr(next((f for f in fields if f.name == field), None), 'type', Any)
            e = ParseError(e, value, tp, 'load')

    # We run into a parsing error while loading the field value;
    # Add additional info on the Exception object before re-raising it.
    #
    # First confirm these values are not already set by an
    # inner dataclass. If so, it likely makes it easier to
    # debug the cause. Note that this should already be
    # handled by the `setter` methods.
    if add_fields:
        e.class_name, e.fields, e.field_name, e.json_object = cls, fields, field, o
    else:
        e.class_name, e.field_name, e.json_object = cls, field, o

    # noinspection PyUnboundLocalVariable
    if (isinstance(e, ParseError)
            # `typing.NamedTuple` or `collections.namedtuple`
            and (origin := e.ann_type) is not None
            and is_subclass_safe(origin, tuple)
            and (_fields := getattr(origin, '_fields', None))):

        meta = get_meta(cls)

        nt_tp = cast(NamedTuple, origin)
        field_to_default = nt_tp._field_defaults
        num_req_fields = len(_fields) - len(field_to_default)

        e_cls = getattr(e.base_error, '__class__', None)

        if e_cls in (IndexError, KeyError, TypeError):
            # raise `MissingFields`, as required NamedTuple fields
            # are not present in the input object `o`.
            if isinstance(value, (list, tuple)):
                # Positional input: the first len(value) fields were bound.
                # noinspection PyUnboundLocalVariable
                _locals = {f'__{f}' for f in _fields[:len(value)]}
                num_req_fields_provided = min(len(value), num_req_fields)
            elif isinstance(value, dict):
                # Keyed input: fields present as dict keys were bound.
                _locals = {f'__{f}' for f in _fields if f in value}
                num_req_fields_provided = len(
                    [f for f in _fields if f in value and f not in field_to_default]
                )
            else:
                # Unknown input shape; assume all required fields provided
                # so we fall through to the resolution hints below.
                _locals = _fields
                num_req_fields_provided = num_req_fields

            if num_req_fields_provided < num_req_fields:
                check_and_raise_missing_fields(
                    _locals, value, origin, None,
                    **(
                        {'field': f'{ParseError.name(cls)}.{field}'}
                        if cls and field else {}
                    ))

            # Otherwise, add a resolution hint for the common
            # list-vs-dict NamedTuple input mix-up.
            if meta.v1_namedtuple_as_dict:
                if e_cls is TypeError and type(value) is not dict:
                    e.kwargs['resolution'] = (
                        'List/tuple input is not supported for NamedTuple fields in dict mode. '
                        'Pass a dict, or set Meta.v1_namedtuple_as_dict = False.'
                    )
                    e.kwargs['unsupported_type'] = type(value)
            else:
                if e_cls is KeyError and type(value) is dict:
                    e.kwargs['resolution'] = (
                        'Dict input is not supported for NamedTuple fields in list mode. '
                        'Pass a list/tuple, or set Meta.v1_namedtuple_as_dict = True.'
                    )
                    e.kwargs['unsupported_type'] = dict

    raise e from None