from __future__ import annotations
from inspect import signature
import math
import pandas as pd
from typing import Callable, Iterable
from typing_extensions import Any
from covsirphy.util.error import NAFoundError, NotIncludedError, NotSubclassError, UnExpectedTypeError, EmptyError
from covsirphy.util.error import UnExpectedValueRangeError, UnExpectedValueError, UnExpectedLengthError, UnExpectedNoneError
[docs]
class Validator(object):
    """Validate objects and arguments.

    Args:
        target (object): target object to validate
        name (str): name of the target shown in error messages
        accept_none (bool): whether None is accepted as the target value or not

    Raises:
        UnExpectedNoneError: @accept_none is False, but @target is None

    Note:
        When @target is None, the converting methods (float, int, tau, date, sequence, dict, kwargs)
        fall back to their @default arguments.
    """

    def __init__(self, target: Any, name: str = "target", accept_none: bool = True) -> None:
        self._target = target
        self._name = name
        if target is None and not accept_none:
            raise UnExpectedNoneError(self._name)

    @staticmethod
    def _out_of_range(value: Any, value_range: Any) -> bool:
        """Return True when the value is outside of (lower, upper); a None bound means un-specified."""
        lower, upper = value_range[0], value_range[1]
        # Compare against None explicitly: a bound of 0 is a real bound, not "un-specified".
        return (lower is not None and value < lower) or (upper is not None and value > upper)

    def subclass(self, parent: Any) -> Any:
        """Ensure the target is a subclass of the parent class.

        Args:
            parent: parent class, or tuple/list of parent classes

        Raises:
            NotSubclassError: the target is not a class or is not a subclass of the parent

        Returns:
            the target itself
        """
        # issubclass() only accepts a class or a tuple of classes as the second argument.
        parents = tuple(parent) if isinstance(parent, list) else parent
        # issubclass() raises TypeError for non-class targets; report them as "not a subclass" instead.
        if isinstance(self._target, type) and issubclass(self._target, parents):
            return self._target
        raise NotSubclassError(self._name, self._target, parent)

    def instance(self, expected: Any) -> Any:
        """Ensure that the target is an instance of a specified class.

        Args:
            expected: expected class, or tuple/list of expected classes

        Raises:
            UnExpectedTypeError: the target is not an instance of the class

        Returns:
            the target itself
        """
        # isinstance() only accepts a class or a tuple of classes as the second argument.
        classes = tuple(expected) if isinstance(expected, list) else expected
        if isinstance(self._target, classes):
            return self._target
        raise UnExpectedTypeError(self._name, self._target, expected)

    def dataframe(self, time_index: bool = False, columns: list[str] | None = None, empty_ok: bool = True) -> pd.DataFrame:
        """Ensure the target is a dataframe.

        Args:
            time_index (bool): if True, the dataframe must have DatetimeIndex
            columns (list[str] or None): the columns the dataframe must have
            empty_ok (bool): whether an empty dataframe is permitted or not

        Raises:
            UnExpectedTypeError: the target is not a dataframe or that has un-expected-type index
            EmptyError: empty when @empty_ok is False
            NotIncludedError: expected columns were not included

        Returns:
            pandas.DataFrame: a copy of the target
        """
        if not isinstance(self._target, pd.DataFrame):
            raise UnExpectedTypeError(self._name, self._target, pd.DataFrame)
        df = self._target
        if not empty_ok and df.empty:
            raise EmptyError(name=self._name)
        if time_index and not isinstance(df.index, pd.DatetimeIndex):
            raise UnExpectedTypeError(f"Index of {self._name}", df.index, pd.DatetimeIndex)
        if columns is not None:
            present = set(df.columns)
            # Report the first missing column, in the order given by @columns.
            for col in columns:
                if col not in present:
                    # Column labels may be non-string (e.g. integers), so stringify before joining.
                    raise NotIncludedError(
                        col, f"column list of {self._name}",
                        details=f"The dataframe has {', '.join(map(str, df.columns.tolist()))} as columns")
        # Copy only after validation succeeded so that failures do not pay for the copy.
        return df.copy()

    def float(self, value_range: tuple[int | None, int | None] = (0, None), default: float | None = None, digits: int | None = None) -> float:
        """Convert a value to a float value.

        Args:
            value_range: value range (lower, upper), None means un-specified
            default: default value when the target is None
            digits: effective digits or None (skip rounding)

        Raises:
            UnExpectedNoneError: the default value is None when the target is None
            UnExpectedTypeError: the target cannot be converted to a float value
            UnExpectedValueRangeError: the value is out of value range

        Returns:
            converted float value
        """
        if self._target is None:
            return Validator(default, "default", accept_none=False).float(value_range=value_range, digits=digits)
        try:
            value = float(self._target)
        except (ValueError, TypeError, OverflowError):
            # TypeError: e.g. a list; OverflowError: an integer too large for a float.
            raise UnExpectedTypeError(self._name, self._target, float) from None
        if self._out_of_range(value, value_range):
            raise UnExpectedValueRangeError(self._name, value, value_range)
        # log10 is undefined for 0 and cannot be floored for inf/nan, so skip rounding for them.
        if digits is None or value == 0 or not math.isfinite(value):
            return value
        # Round to @digits significant digits, not @digits decimal places.
        return round(value, digits - 1 - math.floor(math.log10(abs(value))))

    def int(self, value_range: tuple[int | None, int | None] = (0, None), default: int | None = None, round_ok: bool = False) -> int:
        """Convert a value to an integer.

        Args:
            value_range: value range (lower, upper), None means un-specified
            default: default value when the target is None
            round_ok: whether a difference between the target and the converted integer is ignored

        Raises:
            UnExpectedNoneError: the default value is None when the target is None
            UnExpectedTypeError: the target cannot be converted to an integer,
                or the converted integer is not equal to the target when @round_ok is False
            UnExpectedValueRangeError: the value is out of value range

        Returns:
            converted integer
        """
        if self._target is None:
            return Validator(default, name="default", accept_none=False).int(value_range=value_range, round_ok=round_ok)
        try:
            value = int(self._target)
        except (ValueError, TypeError, OverflowError):
            # OverflowError: int(float("inf")).
            raise UnExpectedTypeError(self._name, self._target, int) from None
        if value != self._target and not round_ok:
            raise UnExpectedTypeError(
                self._name, self._target, int, details=f"This is because we cannot ignore round-off error, | {self._target} - {value} | > 0")
        if self._out_of_range(value, value_range):
            raise UnExpectedValueRangeError(self._name, value, value_range)
        return value

    def tau(self, default: int | None = None) -> int | None:
        """Validate the value can be used as tau value [min].

        Args:
            default: default value when the target is None

        Raises:
            UnExpectedTypeError: the target cannot be converted to an integer
            UnExpectedValueRangeError: the value is out of value range (1 to 1440)
            UnExpectedValueError: the value is not a divisor of 1440

        Returns:
            converted integer or None (when both of the target and @default are None)
        """
        if self._target is None:
            return None if default is None else Validator(default, name="default").tau()
        # The lower bound is 1: zero cannot be a divisor and would break the modulo below.
        value = self.int(value_range=(1, 1440), round_ok=False)
        if 1440 % value == 0:
            return value
        divisors = [str(i) for i in range(1, 1441) if 1440 % i == 0]
        raise UnExpectedValueError(
            self._name, value, divisors,
            details="Tau value [min], a divisor of 1440 [min], is a parameter used to convert actual time to time steps (without units)")

    def date(self, value_range: tuple[pd.Timestamp | None, pd.Timestamp | None] = (None, None), default: pd.Timestamp | None = None) -> pd.Timestamp:
        """Convert a value to a date object.

        Args:
            value_range: value range (lower, upper), None means un-specified
            default: default value when the target is None

        Raises:
            UnExpectedNoneError: the default value is None when the target is None
            UnExpectedTypeError: the target cannot be converted to a date object
            UnExpectedValueRangeError: the value is out of value range

        Returns:
            converted date (hour, minute, second and microsecond are set to zero)
        """
        if self._target is None:
            return Validator(default, name="default", accept_none=False).date(value_range=value_range)
        if isinstance(self._target, pd.Timestamp):
            value = self._target.replace(hour=0, minute=0, second=0, microsecond=0)
        else:
            try:
                value = pd.to_datetime(self._target).replace(hour=0, minute=0, second=0, microsecond=0)
            except (ValueError, TypeError, AttributeError):
                # TypeError: un-convertible object; AttributeError: conversion did not yield a scalar timestamp.
                raise UnExpectedTypeError(self._name, self._target, pd.Timestamp) from None
        if self._out_of_range(value, value_range):
            bounds = [None if bound is None else bound.strftime("%Y-%m-%d") for bound in value_range]
            raise UnExpectedValueRangeError(self._name, value.strftime("%Y-%m-%d"), bounds)
        return value

    def sequence(self, default: Iterable[Any] | None = None, flatten: bool = False, unique: bool = False, candidates: Iterable[Any] | None = None, length: int | None = None) -> list[Any]:
        """Convert a sequence (list, tuple) to a list.

        Args:
            default: default value when the target is None
            flatten: whether flatten the sequence (of lists) or not
            unique: whether remove duplicated values or not, the first value will remain
            candidates: list of candidates or None (no limitations)
            length: length of the sequence or None (no limitations)

        Raises:
            UnExpectedTypeError: the target cannot be converted to a list or failed in flattening
            UnExpectedValueError: the target has a value which is not included in the candidates
            UnExpectedLengthError: the number of elements is not the same as @length

        Returns:
            converted list or empty list (when both of the target and @default are None)

        Note:
            @length is checked only for the target, not for @default.
        """
        if self._target is None:
            return [] if default is None else Validator(default, name="default", accept_none=False).sequence(flatten=flatten, unique=unique, candidates=candidates)
        if not isinstance(self._target, (list, tuple)):
            raise UnExpectedTypeError(
                self._name, self._target, list, details="A tuple can be used, but it will be converted to a list")
        if flatten:
            # Every element must be a list; report the first one which is not.
            for value in self._target:
                if not isinstance(value, list):
                    raise UnExpectedTypeError(
                        f"A value of {self._name}", value, list, details="This is required to flatten the sequence")
            targets = [item for nested in self._target for item in nested]
        else:
            targets = list(self._target)
        if unique:
            # dict keys keep insertion order, so the first occurrence of each value remains.
            targets = list(dict.fromkeys(targets))
        if length is not None and len(targets) != length:
            raise UnExpectedLengthError(self._name, targets, length)
        if candidates is None:
            return targets
        # Materialize once so that one-shot iterables can be used for both the check and the message.
        candidate_list = list(candidates)
        candidate_set = set(candidate_list)
        for value in targets:
            if value not in candidate_set:
                raise UnExpectedValueError(self._name, value, [str(c) for c in candidate_list])
        return targets

    def dict(self, default: dict[str, Any] | None = None, required_keys: list[Any] | None = None, errors: str = "coerce") -> dict[str, Any]:
        """Ensure the target is a dictionary.

        Args:
            default: default values, when the target is None or key is not included in the target
            required_keys: keys which must be included
            errors: "coerce" or "raise"

        Raises:
            UnExpectedTypeError: the target is not a dictionary
            NAFoundError: values of the required keys are not specified when @errors is not "coerce"

        Returns:
            new dictionary built from the required keys, the default values and the target

        Note:
            All keys of @default will be included and the target will overwrite it.

        Note:
            If some keys of @required_keys are not included and @errors="coerce", None will be set as the values of the keys.
        """
        if self._target is not None and not isinstance(self._target, dict):
            raise UnExpectedTypeError(self._name, self._target, dict)
        # Priority (low to high): required keys with None, default values, the target.
        _dict = dict.fromkeys(required_keys or [])
        _dict.update(default or {})
        _dict.update(self._target or {})
        if required_keys is not None and errors != "coerce":
            for key in required_keys:
                # Identity check: "==" with None is ambiguous for array-like values.
                if _dict[key] is None:
                    raise NAFoundError(f"The value of key {key} in dictionary {self._name}")
        return _dict

    def kwargs(self, functions: list[Callable] | Callable, default: dict[str, Any] | None = None) -> dict[str, Any]:
        """Find keyword arguments of the functions.

        Args:
            functions: target function or list of target functions
            default: default values, when the target is None or key is not included in the target

        Raises:
            UnExpectedTypeError: the target is not a dictionary

        Returns:
            dict: items of the target (with default values) whose keys are parameter names of the functions
        """
        _dict = self.dict(default=default, required_keys=None, errors="coerce")
        func_list = functions if isinstance(functions, list) else [functions]
        keywords = {name for func in func_list for name in signature(func).parameters}
        # "self" and "cls" are bound automatically and must not be passed as keyword arguments.
        keywords -= {"self", "cls"}
        return {k: v for k, v in _dict.items() if k in keywords}