from __future__ import annotations
import json
import logging
import os
from collections import ChainMap
from dataclasses import Field, MISSING
# noinspection PyUnresolvedReferences,PyProtectedMember
from dataclasses import _FIELD_INITVAR, _POST_INIT_NAME
from typing import (Any, Callable, Mapping, TYPE_CHECKING)
from ._path_util import get_secrets_map, get_dotenv_map
from .enums import EnvKeyStrategy, EnvPrecedence
from .loaders import LoadMixin as V1LoadMixin
from .models import Extras, TypeInfo, SEQUENCE_ORIGINS, MAPPING_ORIGINS
from .type_conv import as_list_v1, as_dict_v1
from ..bases import META, AbstractEnvMeta, ENV_META
from ..bases_meta import BaseEnvWizardMeta, EnvMeta, register_type
from ..class_helper import (dataclass_fields,
dataclass_field_to_default,
dataclass_init_fields,
dataclass_init_field_names,
get_meta,
v1_dataclass_field_to_env_for_load,
CLASS_TO_LOAD_FUNC,
DATACLASS_FIELD_TO_ALIAS_PATH_FOR_LOAD,
call_meta_initializer_if_needed,
dataclass_field_names)
from ..constants import CATCH_ALL, PACKAGE_NAME
from ..decorators import cached_class_property
from ..errors import (JSONWizardError,
MissingData,
ParseError,
type_name, MissingVars)
from ..loader_selection import get_loader, asdict
from ..log import LOG, enable_library_debug_logging
from ..type_def import T, JSONObject, dataclass_transform
# noinspection PyProtectedMember
from ..utils.dataclass_compat import (_apply_env_wizard_dataclass,
_dataclass_needs_refresh,
_set_new_attribute)
from ..utils.function_builder import FunctionBuilder
from ..utils.object_path import v1_env_safe_get
from ..utils.string_conv import possible_env_vars
from ..utils.typing_compat import (eval_forward_ref_if_needed)
if TYPE_CHECKING:
from ._env import EnvInit, E_
[docs]
def env_config(**kw):
return kw
_PRECEDENCE_ORDER: dict[EnvPrecedence, tuple[str, ...]] = {
EnvPrecedence.SECRETS_ENV_DOTENV: ('secrets', 'env', 'dotenv'),
EnvPrecedence.SECRETS_DOTENV_ENV: ('secrets', 'dotenv', 'env'),
EnvPrecedence.ENV_ONLY: ('env', ),
}
def _pre_decoder(_cls: V1LoadMixin, container_tp: type, tp: TypeInfo, extras: Extras):
if tp.i == 1: # Outermost container (first seen in field annotation)
if container_tp in SEQUENCE_ORIGINS:
tp.ensure_in_locals(extras, as_list=as_list_v1)
return tp.replace(val_name=f'as_list({tp.v()})')
elif container_tp in MAPPING_ORIGINS:
tp.ensure_in_locals(extras, as_dict=as_dict_v1)
return tp.replace(val_name=f'as_dict({tp.v()})')
return tp
[docs]
@dataclass_transform(kw_only_default=True)
class EnvWizard:
__slots__ = ()
def __init__(self, **kwargs):
__init_fn__ = load_func_for_dataclass(
self.__class__,
loader_cls=LoadMixin,
base_meta_cls=AbstractEnvMeta,
)
__init_fn__(self, **kwargs)
def __init_subclass__(cls,
debug: bool = False,
_apply_dataclass=True,
**dc_kwargs):
super().__init_subclass__()
# skip classes provided by this library.
if cls.__module__.startswith(f'{PACKAGE_NAME}.'): # pragma: no cover
return
# Apply the @dataclass decorator.
if _apply_dataclass and _dataclass_needs_refresh(cls):
# noinspection PyArgumentList
_apply_env_wizard_dataclass(cls, dc_kwargs)
load_meta_kwargs = {'v1': True, 'v1_pre_decoder': _pre_decoder}
if debug:
lvl = logging.DEBUG if isinstance(debug, bool) else debug
enable_library_debug_logging(lvl)
# set `v1_debug` flag for the class's Meta
load_meta_kwargs['v1_debug'] = lvl
EnvMeta(**load_meta_kwargs).bind_to(cls)
# Calls the Meta initializer when inner :class:`Meta` is sub-classed.
call_meta_initializer_if_needed(cls)
_dcw_env_cache_secrets = classmethod(get_secrets_map)
_dcw_env_cache_dotenv = classmethod(get_dotenv_map)
__field_names__ = cached_class_property(dataclass_field_names)
register_type = classmethod(register_type)
[docs]
def raw_dict(self: E_) -> JSONObject:
"""
Same as ``__dict__``, but only returns values for fields defined
on the `EnvWizard` instance. See :attr:`__field_names__` for more info.
.. NOTE::
The values in the returned dictionary object are not needed to be
JSON serializable. Use :meth:`to_dict` if this is required.
"""
to_dict = asdict
[docs]
def to_json(self, *,
encoder=json.dumps,
**encoder_kwargs):
return encoder(asdict(self), **encoder_kwargs)
def load_func_for_dataclass(
cls,
extras: Extras | None = None,
loader_cls=None,
base_meta_cls: ENV_META = AbstractEnvMeta,
) -> Callable[[T, dict[str, Any]], None] | None:
# Tuple describing the fields of this dataclass.
fields = dataclass_fields(cls)
cls_init_fields = dataclass_init_fields(cls, True)
cls_init_field_names = dataclass_init_field_names(cls)
field_to_default = dataclass_field_to_default(cls)
has_defaults = True if field_to_default else False
# Does this class have a post-init function?
has_post_init = hasattr(cls, _POST_INIT_NAME)
# Get the loader for the class, or create a new one as needed.
cls_loader = get_loader(cls, base_cls=loader_cls or LoadMixin, v1=True)
cls_name = cls.__name__
fn_name = f'__{PACKAGE_NAME}_init_{cls_name}__'
raw_dict_name = f'__{PACKAGE_NAME}_raw_dict_{cls_name}__'
# Get the meta config for the class, or the default config otherwise.
meta = get_meta(cls, base_meta_cls)
# if extras is None: # we are being run for the main dataclass
# If the `recursive` flag is enabled and a Meta config is provided,
# apply the Meta recursively to any nested classes.
#
# Else, just use the base `AbstractMeta`.
config: META = meta if meta.recursive else base_meta_cls
# Initialize the FuncBuilder
fn_gen = FunctionBuilder()
new_locals = {
'cls': cls,
'fields': fields,
}
# noinspection PyTypeChecker
extras: Extras = {
'config': config,
'cls': cls,
'cls_name': cls_name,
'locals': new_locals,
'recursion_guard': {cls: fn_name},
'fn_gen': fn_gen,
}
_globals = {
'os': os,
'ChainMap': ChainMap,
'MISSING': MISSING,
'ParseError': ParseError,
'MissingVars': MissingVars,
'add': _add_missing_var,
're_raise': re_raise,
}
# we are being run for a nested dataclass
# NOTE: I don't believe this path exists, since `v1.loaders.from_dict`
# is used for nested dataclasses.
#
# else:
# is_main_class = False
# default `v1_load_case` to `EnvKeyStrategy.ENV` if not set
env_key_strat: EnvKeyStrategy | None = meta.v1_load_case or EnvKeyStrategy.ENV
default_strat = env_key_strat is not EnvKeyStrategy.STRICT
# default `v1_env_precedence` to SECRETS_ENV_DOTENV if not set
env_precedence: EnvPrecedence = meta.v1_env_precedence or EnvPrecedence.SECRETS_ENV_DOTENV
field_to_env_vars = v1_dataclass_field_to_env_for_load(cls)
check_env_vars = True if field_to_env_vars else False
field_to_paths = DATACLASS_FIELD_TO_ALIAS_PATH_FOR_LOAD[cls]
has_alias_paths = True if field_to_paths else False
# Fix for using `auto_assign_tags` and `raise_on_unknown_json_key` together
# See https://github.com/rnag/dataclass-wizard/issues/137
has_tag_assigned = getattr(meta, 'tag', None) is not None
if (has_tag_assigned and
# Ensure `tag_key` isn't a dataclass field,
# to avoid issues with our logic.
# See https://github.com/rnag/dataclass-wizard/issues/148
meta.tag_key not in cls_init_field_names):
expect_tag_as_unknown_key = True
else:
expect_tag_as_unknown_key = False
# on_unknown_key = meta.v1_on_unknown_key
catch_all_field: str | None = field_to_env_vars.pop(CATCH_ALL, None)
has_catch_all = catch_all_field is not None
if has_catch_all:
pre_assign = 'i+=1; '
catch_all_field_stripped = catch_all_field.rstrip('?')
catch_all_idx = cls_init_field_names.index(catch_all_field_stripped)
# remove catch all field from list, so we don't iterate over it
del cls_init_fields[catch_all_idx]
else:
pre_assign = ''
catch_all_field_stripped = catch_all_idx = None
# if on_unknown_key is not None:
# should_raise = on_unknown_key is KeyAction.RAISE
# should_warn = on_unknown_key is KeyAction.WARN
# if should_warn or should_raise:
# pre_assign = 'i+=1; '
# set_aliases = True
# else:
# set_aliases = has_catch_all
# else:
# should_raise = should_warn = None
set_aliases = has_catch_all
if set_aliases:
if expect_tag_as_unknown_key:
# add an alias for the tag key, so we don't
# capture or raise an error when we see it
aliases = {meta.tag_key}
else:
aliases = set()
new_locals['aliases'] = aliases
else:
aliases = None
if has_alias_paths:
new_locals['safe_get'] = v1_env_safe_get
add_body_lines = cls_init_fields or has_catch_all
_env_defaults: EnvInit = {}
if _env_file := meta.env_file:
_env_defaults['file'] = _env_file
if (_env_prefix := meta.env_prefix) is not None:
_env_defaults['prefix'] = _env_prefix
LOG.debug('__env__ defaults = %r', _env_defaults)
if (_secrets_dir := meta.secrets_dir) is not None:
_env_defaults['secrets_dir'] = _secrets_dir
LOG.debug('secrets_dir = <configured>')
new_locals['_env_defaults'] = _env_defaults
init_params = ['self',
"__env__:'EnvInit'=None",
'*']
with fn_gen.function(fn_name, init_params, MISSING, new_locals):
if add_body_lines:
fn_gen.add_line('cfg = _env_defaults if __env__ is None else _env_defaults | __env__')
fn_gen.add_line("reload = cfg.get('reload', False)")
fn_gen.add_line("env_file = cfg.get('file')")
fn_gen.add_line("secrets_dir = cfg.get('secrets_dir')")
fn_gen.add_line("pfx = cfg.get('prefix', '')")
# Need to create a separate dictionary to copy over the constructor
# args, as we don't want to mutate the original dictionary object.
if pre_assign:
fn_gen.add_line('i = 0')
env_map_assign = "cfg.get('mapping') or os.environ"
if env_precedence is EnvPrecedence.ENV_ONLY:
fn_gen.add_line(f'env = {env_map_assign}')
else:
fn_gen.add_line(f'env_map = {env_map_assign}')
order = _PRECEDENCE_ORDER[env_precedence]
fn_gen.add_line('maps = []')
fn_gen.add_line(f'# precedence: {env_precedence.value}')
for src in order:
if src == 'secrets':
with fn_gen.if_('secrets_dir is not None'):
fn_gen.add_line('maps.append(cls._dcw_env_cache_secrets(secrets_dir, reload=reload))')
elif src == 'dotenv':
with fn_gen.if_('env_file'):
fn_gen.add_line('maps.append(cls._dcw_env_cache_dotenv(env_file, reload=reload))')
elif src == 'env':
fn_gen.add_line('maps.append(env_map)')
fn_gen.add_line('env = env_map if len(maps) == 1 else ChainMap(*maps)')
if (_pre_from_dict := getattr(cls, '_pre_from_dict', None)) is not None:
new_locals['__pre_from_dict__'] = _pre_from_dict
fn_gen.add_line('env = __pre_from_dict__(env)')
fn_gen.add_line('_vars = None')
with fn_gen.try_():
if expect_tag_as_unknown_key and pre_assign:
with fn_gen.if_(f'{meta.tag_key!r} in env'):
fn_gen.add_line('i+=1')
val = 'v1'
_val_is_found = f'{val} is not MISSING'
for i, f in enumerate(cls_init_fields):
name = f.name
preferred_env_var = f"f'{{pfx}}{name}'"
has_default = has_defaults and name in field_to_default
val_is_found = _val_is_found
tp_var = f'tp_{i}'
new_locals[tp_var] = f.type
init_params.append(f'{name}:{tp_var}=MISSING')
f_assign = f'field={name!r}; {val}={name}'
condition = [val_is_found]
if (has_alias_paths
and (paths := field_to_paths.get(name)) is not None):
if len(paths) == 1:
first_key, *path = paths[0]
# add the first part (top-level key) of the path
if set_aliases:
aliases.add(first_key)
condition.append(
f'({val} := safe_get(env, {first_key!r}, {path!r}, {not has_default})) is not MISSING'
)
else:
last_idx = len(paths) - 1
for k, path in enumerate(paths):
first_key, *path = path
# add the first part (top-level key) of each path
if set_aliases:
aliases.add(first_key)
if k == last_idx:
condition.append(
f'({val} := safe_get(env, {first_key!r}, {path!r}, {not has_default})) is not MISSING')
else:
condition.append(
f'({val} := safe_get(env, {first_key!r}, {path!r}, False)) is not MISSING')
# TODO raise some useful message like (ex. on IndexError):
# Field "my_str" of type tuple[float, str] in A2 has invalid value ['123']
else:
if (check_env_vars
and (_initial_env_vars := field_to_env_vars.get(name)) is not None):
if len(_initial_env_vars) == 1:
_aliases = [_initial_env_vars[0]]
else:
_aliases = list(_initial_env_vars)
_has_alias = True
# No prefix for explicit aliases!
condition.extend([
f'({val} := env.get({alias!r}, MISSING)) is not MISSING'
for alias in _initial_env_vars
])
preferred_env_var = repr(_initial_env_vars[0])
else:
_aliases = []
_has_alias = False
if default_strat:
_env_vars = possible_env_vars(name, env_key_strat)
condition.extend([
f"({val} := env.get(f'{{pfx}}{alias}', MISSING)) is not MISSING"
for alias in _env_vars
])
_aliases.extend(_env_vars)
if not _has_alias:
preferred_env_var = f"f'{{pfx}}{_env_vars[0]}'"
else: # EnvKeyStrategy.STRICT
pass
if set_aliases:
# add field name itself
aliases.add(name)
# add possible JSON keys
aliases.update(_aliases)
if len(condition) > 1:
val_is_found = '(' + '\n or '.join(condition) + ')'
else:
val_is_found = condition[0]
string = generate_field_code(cls_loader, extras, f, i)
if f_assign is not None:
fn_gen.add_line(f_assign)
if has_default:
with fn_gen.if_(val_is_found):
fn_gen.add_line(f'{pre_assign}self.{name} = {string}')
if (default_factory := f.default_factory) is not MISSING:
with fn_gen.else_():
default_factory_name = f'_dflt{i}'
new_locals[default_factory_name] = default_factory
fn_gen.add_line(f'self.{name} = {default_factory_name}()')
else:
with fn_gen.if_(val_is_found):
fn_gen.add_line(f'{pre_assign}self.{name} = {string}')
with fn_gen.else_():
fn_gen.add_line(f'_vars = add(_vars, field, {preferred_env_var}, {tp_var})')
# check for any required fields with missing values
with fn_gen.if_('_vars is not None'):
fn_gen.add_line('raise MissingVars(cls, _vars) from None')
# create a broad `except Exception` block, as we will be
# re-raising all exception(s) as a custom `ParseError`.
with fn_gen.except_(Exception, 'e', ParseError):
fn_gen.add_line("re_raise(e, cls, env, fields, field, locals().get('v1'))")
elif not has_post_init:
fn_gen.add_line('pass')
if not cls_init_fields:
init_params.pop() # remove trailing `*` in function params
if has_catch_all:
catch_all_def = f'{{k: env[k] for k in env if k not in aliases}}'
if catch_all_field.endswith('?'): # Default value
with fn_gen.if_('len(env) != i'):
fn_gen.add_line(f'self.{catch_all_field_stripped} = {catch_all_def}')
else:
fn_gen.add_line(f'self.{catch_all_field_stripped} = {{}} if len(env) == i else {catch_all_def}')
# elif set_aliases: # warn / raise on unknown key
# line = 'extra_keys = set(env) - aliases'
#
# with fn_gen.if_('len(env) != i'):
# fn_gen.add_line(line)
# if should_raise:
# # Raise an error here (if needed)
# new_locals['UnknownKeysError'] = UnknownKeysError
# fn_gen.add_line('raise UnknownKeysError(extra_keys, env, cls, fields) from None')
# elif should_warn:
# # Show a warning here
# new_locals['LOG'] = LOG
# fn_gen.add_line(r"LOG.warning('Found %d unknown keys %r not mapped to the dataclass schema.\n"
# r" Class: %r\n Dataclass fields: %r', "
# "len(extra_keys), extra_keys, "
# "cls.__qualname__, [f.name for f in fields])")
# Does this class have a post-init function?
if has_post_init:
# noinspection PyUnresolvedReferences,PyProtectedMember
params_str = ','.join(f.name for f in fields
if f._field_type is _FIELD_INITVAR)
fn_gen.add_line(f'self.{_POST_INIT_NAME}({params_str})')
# Save the load function for the main dataclass, so we don't need to run
# this logic each time.
_locals = {}
with fn_gen.function(raw_dict_name, ['self'], JSONObject, _locals):
parts = ','.join([f'{name!r}:self.{name}' for name in cls.__field_names__])
fn_gen.add_line(f'return {{{parts}}}')
# noinspection PyUnboundLocalVariable
functions = fn_gen.create_functions(_globals)
cls_init = functions[fn_name]
cls_raw_dict = functions[raw_dict_name]
_set_new_attribute(
cls, '__init__', cls_init)
LOG.debug("setattr(%s, '__init__', %s)",
cls_name, fn_name)
_set_new_attribute(
cls, 'raw_dict', cls_raw_dict)
LOG.debug("setattr(%s, 'raw_dict', %s)",
cls_name, raw_dict_name)
# TODO in `v1`, we will use class attribute (set above) instead.
CLASS_TO_LOAD_FUNC[cls] = cls_init
return cls_init
def _add_missing_var(missing_vars: dict | None, name, var_name, tp):
tn = type_name(tp)
# noinspection PyBroadException
try:
suggested = tp()
except Exception:
suggested = None
if missing_vars is None:
missing_vars = []
missing_vars.append((name, var_name, tn, suggested))
return missing_vars
# def _handle_parse_error(e, cls, name, env_prefix, var_name):
#
# # We run into a parsing error while loading the field
# # value; Add additional info on the Exception object
# # before re-raising it.
# e.class_name = cls
# e.field_name = name
# e.kwargs['env_variable'] = _get_var_name(name, env_prefix, var_name)
#
# raise
def generate_field_code(cls_loader: LoadMixin,
extras: Extras,
field: Field,
field_i: int) -> 'str | TypeInfo':
cls = extras['cls']
field_type = field.type = eval_forward_ref_if_needed(field.type, cls)
try:
return cls_loader.load_dispatcher_for_annotation(
TypeInfo(field_type, field_i=field_i), extras
)
# except Exception as e:
# re_raise(e, cls, None, dataclass_init_fields(cls), field, None)
except ParseError as pe:
pe.class_name = cls
# noinspection PyPropertyAccess
pe.field_name = field.name
raise pe from None
def re_raise(e, cls, o, fields, field, value):
# If the object `o` is None, then raise an error with
# the relevant info included.
if o is None:
raise MissingData(cls) from None
# Check if the object `o` is some other type than what we expect -
# for example, we could be passed in a `list` type instead.
if not isinstance(o, Mapping):
base_err = TypeError('Incorrect type for `from_dict()`')
e = ParseError(base_err, o, dict, cls, desired_type=dict)
add_fields = True
if type(e) is not ParseError:
if isinstance(e, JSONWizardError):
add_fields = False
else:
tp = getattr(next((f for f in fields if f.name == field), None), 'type', Any)
e = ParseError(e, value, tp, 'load')
# We run into a parsing error while loading the field value;
# Add additional info on the Exception object before re-raising it.
#
# First confirm these values are not already set by an
# inner dataclass. If so, it likely makes it easier to
# debug the cause. Note that this should already be
# handled by the `setter` methods.
if add_fields:
e.class_name, e.fields, e.field_name, e.json_object = cls, fields, field, o
else:
e.class_name, e.field_name, e.json_object = cls, field, o
raise e from None
class LoadMixin(V1LoadMixin):
"""
This Mixin class derives its name from the eponymous `json.loads`
function. Essentially it contains helper methods to convert JSON strings
(or a Python dictionary object) to a `dataclass` which can often contain
complex types such as lists, dicts, or even other dataclasses nested
within it.
Refer to the :class:`AbstractLoader` class for documentation on any of the
implemented methods.
"""
__slots__ = ()
def __init_subclass__(cls, **kwargs):
super().__init_subclass__()
@staticmethod
def is_none(tp: TypeInfo, extras: Extras) -> str:
o = tp.v()
return f"{o} is None or {o} == 'null'"
@staticmethod
def load_to_bytes(tp: TypeInfo, extras: Extras):
# could add support for b64-encoded strings later:
# bytes(__b64decode(o)) if (o.__class__ is str and __env_b64)
o = tp.v()
return (f"{o} if (t := {o}.__class__) is bytes "
f"else {o}.encode('utf-8') if t is str "
f"else bytes({o})")
@classmethod
def load_to_bytearray(cls, tp: TypeInfo, extras: Extras):
o = tp.v()
as_bytes = cls.load_to_bytes(tp, extras)
return (f'{o} if {o}.__class__ is bytearray '
f'else {tp.wrap_builtin(bytearray, as_bytes, extras)}')
@classmethod
def load_to_dataclass(cls, tp: TypeInfo, extras: Extras):
# pre-decoder wraps `v()` in `asdict(...)`, so use the wrapped value
o = tp.v_for_def()
tn = tp.type_name(extras)
from_dict = super().load_to_dataclass(tp, extras)
return f'{o} if {o}.__class__ is {tn} else {from_dict}'