Skip to content
Snippets Groups Projects
Commit 5f9f19aa authored by Bert Palm's avatar Bert Palm 🎇
Browse files

introduced base-class, cleanup imports

parent d77f0c9b
No related branches found
Tags 0.5
No related merge requests found
#!/usr/bin/env python
from . import operators as ops
from . import lib
from .lib import (
_CAST_POLICIES,
_throw_MixedItype_err_or_warn,
_find_least_common_itype,
)
from abc import abstractmethod
import pandas as pd
import operator as op
import functools as ftools
from pandas.core.common import is_bool_indexer as _is_bool_indexer
import pandas.core.dtypes.common as pdcom
"""
Unlike the example says, `is_iterator` returns False for lists, not True:
>>is_iterator([1, 2, 3])
>>False
"""
from pandas.core.dtypes.common import is_iterator as _is_iterator
__author__ = "Bert Palm"
__email__ = "bert.palm@ufz.de"
__copyright__ = "Copyright 2018, Helmholtz-Zentrum für Umweltforschung GmbH - UFZ"
class _DiosBase:
    """Base class of a dict-of-series like container.

    The columns live in ``self._data``, an object-dtype ``pd.Series`` that is
    indexed by the column names and holds one ``pd.Series`` per column.
    ``self._itype`` stores the index type shared by the columns (or the marker
    string ``'INFER'`` while it is still undetermined) and ``self._policy``
    the cast policy passed on to ``lib.cast_to_itype``.

    Subclasses must provide ``_constructor``, which is called to build new
    objects of the concrete type.

    NOTE(review): ``_constructor`` is decorated with ``@abstractmethod``, but
    the class does not use ``abc.ABCMeta``, so this is not enforced when an
    instance is created.

    NOTE(review): the indentation of the reviewed source was lost; the nesting
    below was reconstructed from the control flow - please confirm, especially
    in ``__init__`` and ``_insert``.
    """

    @property
    @abstractmethod
    def _constructor(self):
        """Return the callable used to create new objects of the own type."""
        pass

    def __init__(self, data=None, columns=None, index=None, itype=None, cast_policy='save', fastpath=False):
        """Create a new object.

        Parameters
        ----------
        data : dios-like, dict, pd.Series, (nested) list-like or iterator, optional
            The data to insert, see ``_init_insert_data``. With ``fastpath``
            it is used as ``self._data`` without any checks.
        columns : list-like, optional
            Column names, must be unique.
        index : pd.Index or list-like, optional
            Index passed to every ``pd.Series`` that is created from `data`.
        itype : optional
            The index type. If ``None`` it is inferred from the data, or
            lazily on the first non-empty insert, if there is no data yet.
        cast_policy : str
            Must be one of ``_CAST_POLICIES``.
        fastpath : bool
            Internal use, skips the checks and the insertion of `data`.

        Raises
        ------
        ValueError
            If `columns` is not unique or `cast_policy` is unknown.
        """
        # set via the property, which validates the policy
        self.cast_policy = cast_policy

        # we are called internally
        if fastpath:
            self._itype = itype or lib.ObjItype
            if data is not None:
                self._data = data
            else:
                # it is significantly faster, to provide an index and fill it,
                # than to successively build the index by adding data
                self._data = pd.Series(dtype='O', index=columns)

        else:
            if index is not None and not isinstance(index, pd.Index):
                index = pd.Index(index)

            # itype=None means infer the itype by the data, so we first set to the highest
            # possible itype, then insert data, then infer the best-fitting itype.
            if itype is None and index is None:
                self._itype = lib.ObjItype
            else:
                if index is not None:
                    self._itype = lib.get_itype(index)
                # an explicitly given itype wins over the itype of the index
                if itype is not None:
                    self._itype = lib.get_itype(itype)

            cols = pd.Index([] if columns is None else columns)
            if not cols.is_unique:
                raise ValueError("columns must be unique")
            self._data = pd.Series(dtype='O', index=cols)

            if data is not None:
                self._init_insert_data(data, columns, index)

        # self._data still contain nans at all positions, where
        # no data was present, but a column-name was given
        if self._data.hasnans:
            e = pd.Series(dtype='O', index=index)
            for c in self.columns[self._data.isna()]:
                self._insert(c, e.copy())

        self._data.index.name = 'columns'

        # we try to infer the itype, but if we still have
        # no data, we will set the itype lazy, i.e. with
        # the first non-empty _insert()
        if itype is None:
            if self.empty:
                self._itype = 'INFER'
            else:
                self._itype = _find_least_common_itype(self._data)
                if not self._itype.unique:
                    _throw_MixedItype_err_or_warn(self.itype)

    def _init_insert_data(self, data, columns, index):
        """Insert the items of `data` as columns into self.

        `data` is first normalized to a mapping ``{column-name: values}``,
        then every item is wrapped in a ``pd.Series`` (with `index`) and
        inserted by ``_insert``.

        Raises
        ------
        ValueError
            If nested list-like `data` and `columns` differ in length.
        TypeError
            If the type of `data` is not supported.
        """
        if _is_iterator(data):
            data = list(data)

        if _is_dios_like(data) or isinstance(data, dict):
            # data is dict-like, only restrict it, if columns are given
            if columns is not None:
                data = {k: data[k] for k in data if k in columns}

        elif isinstance(data, pd.Series):
            name = data.name or 0
            if columns is not None and len(columns) > 0:
                name = self.columns[0]
            data = {name: data}

        elif pdcom.is_nested_list_like(data):
            if columns is None:
                data = {i: d for i, d in enumerate(data)}
            elif len(data) == len(columns):
                data = dict(zip(self.columns, data))
            else:
                raise ValueError(f"{len(columns)} columns passed, data implies {len(data)} columns")

        elif pdcom.is_list_like(data):
            name = 0 if columns is None or len(columns) < 1 else self.columns[0]
            data = {name: data}

        else:
            raise TypeError("data type not understood")

        for k in data:
            self._insert(k, pd.Series(data[k], index=index))

    # ----------------------------------------------------------------------
    # Indexing Methods

    def _insert(self, col, val):
        """Insert a fresh new value as pd.Series into self.

        Parameters
        ----------
        col : hashable
            The column name.
        val : pd.Series, dios-like with a single column, iterator or None
            ``None`` inserts an empty series.

        Raises
        ------
        ValueError
            If a dios-like `val` cannot be squeezed to a ``pd.Series``.
        TypeError
            If `val` is of any other type than listed above.
        """
        val = list(val) if _is_iterator(val) else val

        if _is_dios_like(val):
            val = val.squeeze()
            if not isinstance(val, pd.Series):
                raise ValueError("Cannot insert frame-like with more than one column")
        elif val is None:
            # explicit dtype: an empty series without dtype is deprecated and
            # all other empty series in this class use dtype 'O' as well
            val = pd.Series(dtype='O')
        elif not isinstance(val, pd.Series):
            raise TypeError(f"Only data of type pandas.Series can be inserted, passed was {type(val)}")

        # set the itype lazy, i.e. when first non-empty
        # column is inserted
        if self._itype == 'INFER':
            if not val.empty:
                self._itype = lib.get_itype(val.index)
                # cast all pre-inserted empty series
                self._cast_all(self._itype, self._policy)
                if not self._itype.unique:
                    _throw_MixedItype_err_or_warn(self._itype)
        else:
            val = lib.cast_to_itype(val, self.itype, policy=self._policy)

        val.name = col
        self._data.at[col] = val.copy(deep=True)

    def __getitem__(self, key):
        """ dios[key] -> dios/series

        A hashable key selects a single column and returns a copy of its
        series. A dios-like, a slice or a boolean list-like selects rows, any
        other key selects columns; all of these return a new dios.

        Raises
        ------
        KeyError
            If `key` is a tuple.
        """
        key = list(key) if _is_iterator(key) else key

        if isinstance(key, tuple):
            raise KeyError("tuples are not allowed")

        if pdcom.is_hashable(key):
            # NOTE: we use copy here to prevent index
            # changes, that could result in an invalid
            # itype. A shallow copy is not sufficient.

            # work on columns, return series
            return self._data.at[key].copy()

        if _is_dios_like(key):
            # work on rows and columns
            new = self._getitem_bool_dios(key)
        elif isinstance(key, slice):
            # work on rows
            new = self._slice(key)
        elif _is_bool_indexer(key):
            # work on rows
            new = self._getitem_bool_listlike(key)
        else:
            # work on columns
            data = self._data.loc[key]
            new = self._constructor(data=data, itype=self.itype, cast_policy=self._policy, fastpath=True)

        return new

    def _slice(self, key):
        """Slice every column of self with `key`, return a new object."""
        if key == slice(None):
            return self.copy()

        new = self.copy_empty(columns=True)
        for k in self.columns:
            new._data.at[k] = self._data.at[k][key]
        return new

    def _getitem_bool_dios(self, key):
        """ Select items by a boolean dios-like drop un-selected indices.

        Only columns present in both, self and `key`, are filled; all other
        columns of the result stay empty.

        Raises
        ------
        ValueError
            If `key` does not hold boolean values only.
        """
        if not _is_bool_dios_like(key):
            raise ValueError("Must pass DictOfSeries with boolean values only")

        new = self.copy_empty(columns=True)
        for k in self.columns.intersection(key.columns):
            dat = self._data.at[k]
            val = key[k]
            # align rows
            idx = val[val].index.intersection(dat.index)
            new._data.at[k] = dat[idx]
        return new

    def _getitem_bool_listlike(self, key):
        """Select rows of every column by the boolean list-like `key`."""
        new = self.copy_empty(columns=True)
        for k in self.columns:
            new._data.at[k] = self._data.at[k].loc[key]
        return new

    def __setitem__(self, key, value):
        """ dios[key] = value

        A hashable key addresses a single column: a ``pd.Series`` or a not
        yet existing column is inserted, any other value is written to all
        rows of the existing column. Any other key is resolved by
        ``__getitem__`` first and `value` is written to the selected
        locations.

        Raises
        ------
        KeyError
            If `key` is a tuple.
        ValueError
            If a multi-dimensional value is assigned to an existing column.
        """
        key = list(key) if _is_iterator(key) else key

        if isinstance(key, tuple):
            raise KeyError(f"{key}. tuples are not allowed")

        elif pdcom.is_hashable(key):
            if isinstance(value, pd.Series) or key not in self.columns:
                self._insert(key, value)
            elif _is_dios_like(value) or pdcom.is_nested_list_like(value):
                raise ValueError("Incompatible indexer with multi-dimensional value")
            else:
                self._data.at[key][:] = value

        else:
            data = self.__getitem__(key)
            assert isinstance(data, self.__class__), f"getitem returned data of type {type(data)}"

            # special cases
            if _is_dios_like(value):
                self._setitem_dios(data, value)
            # NOTE: pd.Series also considered list-like
            elif pdcom.is_list_like(value):
                self._setitem_listlike(data, value)
            # default case
            else:
                for k in data.columns:
                    s = data._data.at[k]
                    s[:] = value
                    self._data.at[k][s.index] = s

    def _setitem_listlike(self, data, value):
        """Write one item of the list-like `value` to each column of `data`.

        `data` is a (maybe trimmed) selection of self; the i-th item of
        `value` is written to all selected rows of the i-th column.

        Raises
        ------
        ValueError
            If `value` does not have one item per column of `data`.
        """
        value = value.values if isinstance(value, pd.Series) else value

        if len(value) != len(data.columns):
            raise ValueError(f"array-like value of length {len(value)} could "
                             f"not be broadcast to indexing result of shape "
                             f"(.., {len(data.columns)})")

        for i, k in enumerate(data.columns):
            s = data._data.at[k]
            s[:] = value[i]
            self._data.at[k][s.index] = s

    def _setitem_dios(self, data, value):
        """ Write values from a dios-like to self.

        No justification or alignment of columns, but of indices.
        If value has missing indices, nan's are inserted at that
        locations, just like `series.loc[:]=val` or `df[:]=val` do.

        Eg.
         di[::2]  =  di[::3]   ->   di[::2]

            x |        x |            x |
        ===== |     ==== |       ====== |
        0   x |     0  z |       0    z |
        2   x |  =  3  z |   ->  2  NaN |
        4   x |     6  z |       4  NaN |
        6   x |                  6    z |

        Parameter
        ----------
        data : dios
            A maybe trimmed version of self
        value : dios, pd.Dataframe
            The value to set with the same column dimension like data

        Raises
        ------
        ValueError
            If `data` and `value` differ in the number of columns.
        """
        if len(data) != len(value.columns):
            raise ValueError(f"shape mismatch: values array of shape "
                             f"(.., {len(value.columns)}) could not "
                             f"be broadcast to indexing result of "
                             f"shape (.., {len(data.columns)})")

        # columns are matched by position, not by name
        for i, k in enumerate(data):
            dat = data._data.at[k]
            # .loc cannot handle empty series,
            # like `emptySeries.loc[:] = [1,2]`
            if dat.empty:
                continue
            val = value[value.columns[i]]
            dat.loc[:] = val
            self._data.at[k].loc[dat.index] = dat

    def __delitem__(self, key):
        """Remove the column `key`."""
        del self._data[key]

    # ------------------------------------------------------------------------------
    # Base properties and basic dunder magic

    @property
    def columns(self):
        """The column names, i.e. the index of the underlying ``_data``."""
        return self._data.index

    @columns.setter
    def columns(self, cols):
        index = pd.Index(cols)
        if not index.is_unique:
            raise ValueError("columns index must have unique values")
        self._data.index = index

    @property
    def itype(self):
        """The index type, or ``None`` as long as it is not yet inferred."""
        if self._itype == 'INFER':
            return None
        return self._itype

    @itype.setter
    def itype(self, itype):
        itype = lib.get_itype(itype)
        # cast first, so a failing cast leaves the old itype in place
        self._cast_all(itype, policy=self._policy)
        self._itype = itype

    @property
    def cast_policy(self):
        """The policy that is passed to ``lib.cast_to_itype``."""
        return self._policy

    @cast_policy.setter
    def cast_policy(self, policy):
        if policy not in _CAST_POLICIES:
            raise ValueError(f"policy must be one of {_CAST_POLICIES}")
        self._policy = policy

    def _cast_all(self, itype, policy):
        """Cast all columns of self to `itype`, in place.

        All columns are cast first and only written back if every cast
        succeeded, so a failing cast leaves self unchanged.

        Raises
        ------
        Exception
            Whatever the cast raises, re-raised as the same type with the
            name of the failing column prepended to the message.
        """
        c = '?'
        casted = {}
        try:
            for c in self.columns:
                casted[c] = lib.cast_to_itype(self._data.at[c], itype, policy=policy)
        except Exception as e:
            raise type(e)(f"Column {c}: " + str(e)) from e

        # write back in place, so the identity of self._data is kept
        for c, s in casted.items():
            self._data.at[c] = s

    def __len__(self):
        """The number of columns."""
        return len(self.columns)

    @property
    def empty(self):
        """True if there are no columns or if all columns are empty."""
        return len(self) == 0 or all(s.empty for s in self._data)

    def __iter__(self):
        yield from self.columns

    def __reversed__(self):
        yield from reversed(self.columns)

    def __contains__(self, item):
        return item in self.columns

    # ----------------------------------------------------------------------
    # if copy.copy() is copy.copy(): return copy.copy().copy()

    def __copy__(self):
        # NOTE: copy.copy() also results in a deep copy
        return self.copy(deep=True)

    def __deepcopy__(self, memo=None):
        return self.copy(deep=True)

    def copy(self, deep=True):
        """Return a copy of self.

        With ``deep=True`` every column is copied, otherwise the new object
        shares the underlying ``_data`` with self.
        """
        if deep:
            data = pd.Series(dtype='O', index=self.columns)
            for c in self.columns:
                data.at[c] = self._data.at[c].copy(deep=True)
        else:
            data = self._data
        kws = dict(itype=self._itype, cast_policy=self._policy)
        return self._constructor(data=data, fastpath=True, **kws)

    def copy_empty(self, columns=True):
        """Return a new object with the same itype and cast policy, but without data.

        If `columns` is exactly ``True``, the new object has the same columns
        as self, each holding an empty series of the same dtype; otherwise it
        has no columns at all.
        """
        data = None
        if columns is True:  # is correct
            data = pd.Series(dtype='O', index=self.columns)
            for c in self.columns:
                data.at[c] = pd.Series(dtype=self._data.at[c].dtype)
        kws = dict(itype=self._itype, cast_policy=self._policy)
        return self._constructor(data=data, fastpath=True, **kws)

    # ------------------------------------------------------------------------------
    # Operators

    def _op1(self, op):
        """Apply the unary operator `op` to every column, return a new object.

        Any exception is re-raised as the same type, with the failing
        operation prepended to the message.
        """
        new = self.copy_empty(columns=True)
        try:
            for k in self.columns:
                new[k] = op(self[k])
        except Exception as e:
            raise type(e)(f"'{ops.OP_MAP[op]} dios' failed: " + str(e)) from e
        return new

    def _op2(self, op, other, align=True, inplace=False):
        """Apply the binary operator `op` column-wise to self and `other`.

        Parameters
        ----------
        op : callable
            A binary operator, must be a key of ``ops.OP_MAP``.
        other : dios-like, pd.Series, dict-like, (nested) list-like or scalar
            A dios-like or dict-like must have the same keys as self and is
            applied column by column, a nested list-like must have one item
            per column, everything else is applied to every column.
        align : bool
            If True, series are inner-joined on their index before `op` is
            applied. Only used, if `other` is a dios-like or a ``pd.Series``.
        inplace : bool
            If True, the results are written to self, otherwise to a new
            object.

        Returns
        -------
        The object that holds the results.

        Raises
        ------
        Exception
            Whatever fails, re-raised as the same type with the failing
            operation prepended to the message.
        """

        def raiseif(kself, kother, s):
            if kself != kother:
                raise ValueError(f"{s} does not match, {s} left: {kself}, {s} right: {kother}")

        def doalign(left, right):
            return left.align(right, join='inner') if align else (left, right)

        def gen():
            # yields (column-name, result) pairs
            if _is_dios_like(other):
                raiseif(list(self), list(other), 'keys')
                for k in self.columns:
                    left, right = self[k], other[k]
                    yield k, op(*doalign(left, right))
            elif isinstance(other, pd.Series):
                for k in self.columns:
                    left, right = self[k], other
                    yield k, op(*doalign(left, right))
            elif pdcom.is_dict_like(other):
                raiseif(sorted(self), sorted(other), 'keys')
                for k in self.columns:
                    yield k, op(self[k], other[k])
            elif pdcom.is_nested_list_like(other):
                raiseif(len(self), len(other), 'length')
                for i, k in enumerate(self.columns):
                    yield k, op(self[k], other[i])
            elif pdcom.is_scalar(other) or pdcom.is_list_like(other):
                for k in self.columns:
                    yield k, op(self[k], other)
            else:
                raise NotImplementedError

        new = self if inplace else self.copy_empty(columns=True)
        try:
            for k, val in gen():
                new[k] = val
        except Exception as e:
            raise type(e)(f"'dios {ops.OP_MAP[op]} other' failed: " + str(e)) from e
        return new

    __neg__ = ftools.partialmethod(_op1, op.neg)
    __abs__ = ftools.partialmethod(_op1, op.abs)
    __invert__ = ftools.partialmethod(_op1, op.inv)
    # comparisons do not align the indices of the operands
    __eq__ = ftools.partialmethod(_op2, op.eq, align=False)
    __ne__ = ftools.partialmethod(_op2, op.ne, align=False)
    __le__ = ftools.partialmethod(_op2, op.le, align=False)
    __ge__ = ftools.partialmethod(_op2, op.ge, align=False)
    __lt__ = ftools.partialmethod(_op2, op.lt, align=False)
    __gt__ = ftools.partialmethod(_op2, op.gt, align=False)
    __add__ = ftools.partialmethod(_op2, op.add)
    __sub__ = ftools.partialmethod(_op2, op.sub)
    __mul__ = ftools.partialmethod(_op2, op.mul)
    __mod__ = ftools.partialmethod(_op2, op.mod)
    __truediv__ = ftools.partialmethod(_op2, op.truediv)
    __floordiv__ = ftools.partialmethod(_op2, op.floordiv)
    __pow__ = ftools.partialmethod(_op2, op.pow)
    __and__ = ftools.partialmethod(_op2, op.and_)
    __or__ = ftools.partialmethod(_op2, op.or_)
    __xor__ = ftools.partialmethod(_op2, op.xor)

    # ------------------------------------------------------------------------------
    # Indexer

    @property
    def loc(self):
        return _LocIndexer(self)

    @property
    def iloc(self):
        return _iLocIndexer(self)

    @property
    def aloc(self):
        return _aLocIndexer(self)

    @property
    def at(self):
        return _AtIndexer(self)

    @property
    def iat(self):
        return _iAtIndexer(self)
def _is_dios_like(obj) -> bool:
    """Return True if `obj` is a dios (``_DiosBase``) or a ``pd.DataFrame``.

    Objects of these types are expected to behave alike in the following ways:
     - they have columns, which are some kind of ``pd.Index``
     - iterating them iterates through the columns
     - ``a in obj`` checks if ``a`` is in the columns
     - ``obj[key]`` gives a ``pd.Series``
     - ``obj.squeeze()`` gives a ``pd.Series`` if ``len(obj) == 1``
    """
    return isinstance(obj, (_DiosBase, pd.DataFrame))
def _is_bool_series(obj) -> bool:
    """Return True if `obj` is a ``pd.Series`` of dtype bool."""
    if not isinstance(obj, pd.Series):
        return False
    return obj.dtype == bool
def _is_bool_dios_like(obj) -> bool:
    """Return True if `obj` is dios-like and holds boolean values only.

    If all columns are of dtype bool the answer is True right away. If at
    least one column is of dtype object, every column is checked with
    ``_is_bool_indexer`` instead.
    """
    if not _is_dios_like(obj):
        return False

    column_dtypes = obj.dtypes
    if (column_dtypes == bool).all():
        return True

    # without any object column, there is at least one
    # column of a real, non-boolean dtype
    if not (column_dtypes == 'O').any():
        return False
    return obj.apply(_is_bool_indexer).all()
# keep this here to prevent cyclic import
from .indexer import _aLocIndexer, _iLocIndexer, _LocIndexer, _iAtIndexer, _AtIndexer
This diff is collapsed.
from .dios import ( from .base import (
DictOfSeries, _DiosBase,
_is_dios_like, _is_dios_like,
_is_bool_series,
_is_list_like_not_nested,
_is_bool_dios_like, _is_bool_dios_like,
_is_iterator) _is_iterator)
...@@ -23,7 +21,7 @@ _is_null_slice = ccom.is_null_slice ...@@ -23,7 +21,7 @@ _is_null_slice = ccom.is_null_slice
class _Indexer: class _Indexer:
def __init__(self, obj: DictOfSeries): def __init__(self, obj: _DiosBase):
self.obj = obj self.obj = obj
self._data = obj._data self._data = obj._data
...@@ -271,7 +269,7 @@ class _aLocIndexer(_Indexer): ...@@ -271,7 +269,7 @@ class _aLocIndexer(_Indexer):
if lowdim: if lowdim:
return data.squeeze() return data.squeeze()
else: else:
return DictOfSeries(data=data, fastpath=True, **kws) return self.obj._constructor(data=data, fastpath=True, **kws)
def __setitem__(self, key, value): def __setitem__(self, key, value):
rowkeys, colkeys, _ = self._unpack_key_aloc(key) rowkeys, colkeys, _ = self._unpack_key_aloc(key)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment