Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • berntm/saqc
  • rdm-software/saqc
  • schueler/saqc
3 results
Show changes
Showing
with 0 additions and 1223 deletions
.. SPDX-FileCopyrightText: 2021 Helmholtz-Zentrum für Umweltforschung GmbH - UFZ
..
.. SPDX-License-Identifier: GPL-3.0-or-later
.. dios documentation master file, created by
sphinx-quickstart on Sun Apr 19 02:36:37 2020.
You can adapt this file completely to your liking, but it should at least
contain the root `toctree` directive.
Dios Docs
=========
.. currentmodule:: dios
The whole package :mod:`dios` is mainly a container for
the class :class:`dios.DictOfSeries`. See
.. toctree::
dios.DictOfSeries <_api/dios.DictOfSeries>
.. toctree::
:hidden:
Repository <https://git.ufz.de/rdm/dios>
example DictOfSeries <_api/dios.example_DictOfSeries>
Most magic happens in getting and setting elements.
To select any combination from columns and rows,
read the documentation about indexing:
.. toctree::
doc_indexing
.. toctree::
doc_cookbook
For the idea behind the Itype concept and its usage read:
.. toctree::
doc_itype
For implemented methods and module functions,
respectively the full module api, see:
.. toctree::
:maxdepth: 2
dios_api
or browse the Index..
.. toctree::
genindex
REM SPDX-FileCopyrightText: 2021 Helmholtz-Zentrum für Umweltforschung GmbH - UFZ
REM
REM SPDX-License-Identifier: GPL-3.0-or-later
@ECHO OFF

pushd %~dp0

REM Command file for Sphinx documentation

REM Fall back to the stock sphinx-build executable when not configured.
if "%SPHINXBUILD%" == "" (
	set SPHINXBUILD=sphinx-build
)
set SOURCEDIR=.
set BUILDDIR=_build

REM No build target given -> show the Sphinx help screen.
if "%1" == "" goto help

REM Probe that sphinx-build is runnable (errorlevel 9009: command not found).
%SPHINXBUILD% >NUL 2>NUL
if errorlevel 9009 (
	echo.
	echo.The 'sphinx-build' command was not found. Make sure you have Sphinx
	echo.installed, then set the SPHINXBUILD environment variable to point
	echo.to the full path of the 'sphinx-build' executable. Alternatively you
	echo.may add the Sphinx directory to PATH.
	echo.
	echo.If you don't have Sphinx installed, grab it from
	echo.http://sphinx-doc.org/
	exit /b 1
)

REM Delegate the requested target to Sphinx' make mode.
%SPHINXBUILD% -M %1 %SOURCEDIR% %BUILDDIR% %SPHINXOPTS% %O%
goto end

:help
%SPHINXBUILD% -M help %SOURCEDIR% %BUILDDIR% %SPHINXOPTS% %O%

:end
popd
# SPDX-FileCopyrightText: 2021 Helmholtz-Zentrum für Umweltforschung GmbH - UFZ
#
# SPDX-License-Identifier: GPL-3.0-or-later
alabaster==0.7.12
Babel==2.8.0
certifi==2020.6.20
chardet==3.0.4
commonmark==0.9.1
docutils==0.16
idna==2.10
imagesize==1.2.0
importlib-metadata==1.7.0
Jinja2==2.11.2
Markdown==3.2.2
MarkupSafe==1.1.1
numpy==1.19.1
packaging==20.4
pandas==1.1.1
Pygments==2.6.1
pyparsing==2.4.7
python-dateutil==2.8.1
pytz==2020.1
recommonmark==0.6.0
requests==2.24.0
six==1.15.0
snowballstemmer==2.0.0
Sphinx==3.2.1
sphinx-automodapi==0.12
sphinx-markdown-tables==0.0.15
sphinxcontrib-applehelp==1.0.2
sphinxcontrib-devhelp==1.0.2
sphinxcontrib-fulltoc==1.2.0
sphinxcontrib-htmlhelp==1.0.3
sphinxcontrib-jsmath==1.0.1
sphinxcontrib-qthelp==1.0.3
sphinxcontrib-serializinghtml==1.1.4
urllib3==1.25.10
zipp==3.1.0
# SPDX-FileCopyrightText: 2021 Helmholtz-Zentrum für Umweltforschung GmbH - UFZ
#
# SPDX-License-Identifier: GPL-3.0-or-later
from .generate_testsets import *
from .performance import find_index_range, gen_random_timestamps
# SPDX-FileCopyrightText: 2021 Helmholtz-Zentrum für Umweltforschung GmbH - UFZ
#
# SPDX-License-Identifier: GPL-3.0-or-later
import datetime as dt
import os
import pickle
import time
import numpy as np
import pandas as pd
from ..dios import DictOfSeries
var_prefix = "var"
def _gen_testset(rowsz, colsz, freq="1min", disalign=True, randstart=True):
    """Generate a DataFrame and a DictOfSeries filled with the same random data.

    :param rowsz: number of rows (timestamps) per column
    :param colsz: number of columns/variables (named ``var0`` .. ``var{colsz-1}``)
    :param freq: pandas frequency string for the time index
    :param disalign: if truthy, shift each column's index by some nanoseconds
        so the indices do not align; the string ``"random"`` uses a random
        shift per column, any other truthy value shifts column ``i`` by i ns
    :param randstart: if True, each column starts at a random offset from the
        common start timestamp
    :return: tuple ``(pandas.DataFrame, DictOfSeries)`` holding the same data
    """
    df = pd.DataFrame()
    dos = DictOfSeries()
    start = dt.datetime.strptime("2000-01-01 00:00:00", "%Y-%m-%d %H:%M:%S")
    times = pd.date_range(periods=rowsz, start=start, freq=freq)
    # unit part of the frequency string (e.g. "min" from "1min"); used to
    # build random start offsets below.  (The numeric part was previously
    # parsed into an unused local, which has been removed.)
    frequ = freq.strip("0123456789")
    for i in range(colsz):
        if randstart:
            # generate random startpoint for each series
            r = str(np.random.randint(int(rowsz * 0.05), int(rowsz * 0.6) + 2)) + frequ
            st = start + pd.Timedelta(r)
            times = pd.date_range(periods=rowsz, start=st, freq=freq)
        if disalign:
            if disalign == "random":
                r = np.random.randint(1, i + 2)
            else:
                # total disalign
                r = i
            times += pd.Timedelta(f"{r}ns")
        d = np.random.randint(1, 9, rowsz)
        v = f"var{i}"
        tmp = pd.DataFrame(index=times, data=d, columns=[v])
        # outer merge keeps every timestamp of every column (NaN-padded)
        df = pd.merge(df, tmp, left_index=True, right_index=True, how="outer")
        dos[v] = tmp.squeeze().copy()
    return df, dos
def get_random_df_and_dios(rowsz, colsz, freq="1min", disalign=True, randstart=True):
    """Convenience wrapper around :func:`get_testset` returning only the plain
    DataFrame and the DictOfSeries variant of the testset."""
    testset = get_testset(
        rowsz, colsz, freq=freq, disalign=disalign, randstart=randstart
    )
    return testset[0], testset[3]
def get_testset(
    rows,
    cols,
    freq="1s",
    disalign=True,
    randstart=True,
    storagedir=None,
    noresult=False,
):
    """Return a generated testset, caching it on disk as a pickle.

    The testset is the 4-tuple ``(df, df_type_a, df_type_b, dios)`` where
    ``df`` is the plain DataFrame, ``df_type_a``/``df_type_b`` are its
    stacked/unstacked variants and ``dios`` is the DictOfSeries version.

    :param rows: number of rows per column
    :param cols: number of columns
    :param freq: pandas frequency string of the time index
    :param disalign: see :func:`_gen_testset`
    :param randstart: see :func:`_gen_testset`
    :param storagedir: directory that holds the ``testsets`` pickle cache;
        defaults to the directory of this file
    :param noresult: if True, only ensure the pickle exists and return None
    """
    if storagedir is None:
        storagedir = os.path.dirname(__file__)
    storagedir = os.path.join(storagedir, "testsets")
    fname = f"set_f{freq}_d{disalign}_r{randstart}_dim{rows}x{cols}.pkl"
    fpath = os.path.join(storagedir, fname)
    # try to get pickled data
    try:
        with open(fpath, "rb") as fh:
            if noresult:
                return
            tup = pickle.load(fh)
            # file/data was present
            return tup
    except (pickle.UnpicklingError, FileNotFoundError):
        pass
    # generate testset(s)
    df, dios = _gen_testset(
        rowsz=rows, colsz=cols, freq=freq, disalign=disalign, randstart=randstart
    )
    df = df.sort_index(axis=0, level=0)
    df_type_a = df.copy().stack(dropna=False).sort_index(axis=0, level=0).copy()
    df_type_b = df.copy().unstack().sort_index(axis=0, level=0).copy()
    tup = df, df_type_a, df_type_b, dios
    # store testsets; the cache directory may not exist yet (its content is
    # gitignored), so create it first instead of failing on the open() below
    os.makedirs(storagedir, exist_ok=True)
    with open(fpath, "wb") as fh:
        pickle.dump(tup, fh)
    if noresult:
        return
    return tup
def gen_all(rrange, crange):
    """Pre-generate (and disk-cache) testsets for every rows/cols combination,
    printing the wall-clock generation time of each."""
    for nrows in rrange:
        for ncols in crange:
            print(nrows, " x ", ncols)
            started = time.time()
            get_testset(nrows, ncols, noresult=True)
            print(time.time() - started)
if __name__ == "__main__":
    # import time
    #
    # t0 = time.time()
    # for i in range(7):
    #     get_testset(10**i, 10)
    # t1 = time.time()
    # print(t1-t0)

    # pre-generate testsets for 10 .. 100'000 rows x 10 .. 50 columns
    rr = [10**r for r in range(1, 6)]
    c = range(10, 60, 10)
    gen_all(rr, c)
# SPDX-FileCopyrightText: 2021 Helmholtz-Zentrum für Umweltforschung GmbH - UFZ
#
# SPDX-License-Identifier: GPL-3.0-or-later
import gc
from .generate_testsets import get_random_df_and_dios
def calc_mem(rows, cols, shifted=False, dtypesz=(64 / 8)):
    """Estimate the memory footprint (in bytes) of a DataFrame of the given
    dimensions: index bytes plus data bytes over all columns.

    With ``shifted`` the worst case is assumed: no two series share a
    timestamp, so the index grows to ``rows * cols`` entries and every column
    stores that many (mostly NaN) values.
    """
    if shifted:
        # each variable inserts its own rows -> index of rows*cols entries,
        # and every column is NaN-padded up to that length
        index_bytes = 8 * rows * cols
        column_bytes = rows * cols * dtypesz
    else:
        index_bytes = 8 * rows
        column_bytes = rows * dtypesz
    return index_bytes + column_bytes * cols
def bytes2hread(bytes):
    """Convert a byte count into a human-readable ``(value, unit)`` pair.

    NOTE(review): the loop switches units above 1000 but divides by 1024
    (mixing decimal and binary prefixes); kept as-is to preserve behavior.
    The parameter shadows the ``bytes`` builtin, also kept for compatibility.
    """
    units = ["B", "kB", "MB", "GB", "TB"]
    unit_idx = 0
    # stop once the value is small enough or the largest unit is reached
    while bytes > 1000 and unit_idx < len(units) - 1:
        bytes /= 1024
        unit_idx += 1
    return bytes, units[unit_idx]
def rows_by_time(nsec, mdays):
    """Calculate how many rows are needed to hold one value every ``nsec``
    seconds over ``mdays`` days.

    :param nsec: a value occurs every n seconds
    :param mdays: this many days of data
    :return: the number of rows needed (truncated to int)
    """
    values_per_minute = 60 / nsec
    values_per_day = values_per_minute * 60 * 24
    return int(values_per_day * mdays)
if __name__ == "__main__":
    # NOTE(review): indentation of this script section was reconstructed;
    # the nesting of the blocks below should be confirmed against the repo.
    # dios - linear in rows and columns, same size for r=10,c=100 or r=100,c=10
    do_real_check = True
    cols = 10
    rows = 100000
    # rows = rows_by_time(nsec=600, mdays=365*2)

    # theoretical best-case (aligned) and worst-case (fully shifted) footprint
    mem = calc_mem(rows, cols, shifted=False)
    memsh = calc_mem(rows, cols, shifted=True)

    # actual footprint of the DictOfSeries for comparison
    df, dios = get_random_df_and_dios(rows, cols, disalign=False, randstart=True)
    dios_mem = dios.memory_usage()
    print(f"dios:\n-----------")
    print("mem: ", *bytes2hread(dios_mem))
    print("entries:", sum([len(dios[e]) for e in dios]))
    print()

    # where between best (0%) and worst (100%) case the dios lies
    ratio = (1 / (memsh - mem)) * dios_mem

    mem = bytes2hread(mem)
    memsh = bytes2hread(memsh)

    print("df - best case\n---------")
    print("mem: ", *mem)
    print("entries:", rows)
    print()
    print("df - worst case\n---------")
    print("mem :", *memsh)
    print("entries:", rows * cols)
    print()
    print(f"dfbest, dios, dfworst: 0%, {round(ratio, 4)*100}%, 100% ")

    if not do_real_check:
        exit(0)

    proveMeRight = False

    if proveMeRight:
        # best case
        print()
        print("best case proove")
        dfb, _ = get_random_df_and_dios(rows, cols, disalign=False, randstart=False)
        dfb.info(memory_usage="deep", verbose=False)

    print()
    print("rand start, same freq")
    df.info(memory_usage="deep", verbose=False)
    print("entries:", sum([len(df[e]) for e in df]))

    print()
    print("rand start, rand freq")
    df, _ = get_random_df_and_dios(rows, cols, disalign="random", randstart=True)
    df.info(memory_usage="deep", verbose=False)
    print("entries:", sum([len(df[e]) for e in df]))

    if proveMeRight:
        # worst case
        print()
        print("worst case proove")
        df, _ = get_random_df_and_dios(rows, cols, disalign=True, randstart=False)
        df.info(memory_usage="deep", verbose=False)

    gc.collect()
# SPDX-FileCopyrightText: 2021 Helmholtz-Zentrum für Umweltforschung GmbH - UFZ
#
# SPDX-License-Identifier: GPL-3.0-or-later
import time
import numpy as np
import pandas as pd
from .generate_testsets import get_testset, var_prefix
profile_assignment = False
idx = pd.IndexSlice
rows = 0
fir = ["var", "ts", "ass"]
sec = ["df", "a", "b", "dios"]
timingsdf = pd.DataFrame(columns=pd.MultiIndex.from_product([fir, sec]))
def df_timmings(df, t0, t1, v1, v2):
    """Time row-slicing, column-selection and (optionally) assignment on a
    plain DataFrame, accumulating elapsed seconds into the module-level
    ``timingsdf`` at row ``rows``.

    NOTE: the typo in the name is kept — call sites use ``df_timmings``.
    Returns the sliced objects and the frame so results stay referenced.
    """
    _t0 = time.time()
    a = df.loc[t0:t1, :]
    _t1 = time.time()
    b = df.loc[:, v1]
    _t2 = time.time()
    if profile_assignment:
        df.loc[t0:t1, v1] = df.loc[t0:t1, v1] * 1111
        _t3 = time.time()
    else:
        # bind _t3 unconditionally so the "ass" accumulation below can never
        # hit an unbound name when assignment profiling is disabled
        _t3 = _t2
    timingsdf.at[rows, ("ts", "df")] += _t1 - _t0
    timingsdf.at[rows, ("var", "df")] += _t2 - _t1
    timingsdf.at[rows, ("ass", "df")] += _t3 - _t2
    return a, b, df
def a_timings(df, t0, t1, v1, v2):
    """Time row-slicing, column-selection and (optionally) assignment on the
    stacked ("type a") frame, accumulating into ``timingsdf``."""
    _t0 = time.time()
    a = df.loc[t0:t1, :]
    _t1 = time.time()
    b = df.loc[:, v1]
    _t2 = time.time()
    if profile_assignment:
        df.loc[t0:t1, v1] = df.loc[t0:t1, v1] * 1111
        _t3 = time.time()
    else:
        # bind _t3 unconditionally so the "ass" accumulation below can never
        # hit an unbound name when assignment profiling is disabled
        _t3 = _t2
    timingsdf.at[rows, ("ts", "a")] += _t1 - _t0
    timingsdf.at[rows, ("var", "a")] += _t2 - _t1
    timingsdf.at[rows, ("ass", "a")] += _t3 - _t2
    return a, b, df
def b_timings(df, t0, t1, v1, v2):
    """Time slicing and (optionally) assignment on the unstacked ("type b")
    frame; note the swapped axes compared to the other timing helpers."""
    _t0 = time.time()
    a = df.loc[:, t0:t1]
    _t1 = time.time()
    b = df.loc[v1, :]
    _t2 = time.time()
    if profile_assignment:
        df.loc[v1, t0:t1] = df.loc[v1, t0:t1] * 1111
        _t3 = time.time()
    else:
        # bind _t3 unconditionally so the "ass" accumulation below can never
        # hit an unbound name when assignment profiling is disabled
        _t3 = _t2
    timingsdf.at[rows, ("ts", "b")] += _t1 - _t0
    timingsdf.at[rows, ("var", "b")] += _t2 - _t1
    timingsdf.at[rows, ("ass", "b")] += _t3 - _t2
    return a, b, df
def dios_timings(dios, t0, t1, v1, v2):
    """Time row-slicing, column-selection and (optionally) assignment on a
    DictOfSeries, accumulating into ``timingsdf``."""
    _t0 = time.time()
    a = dios.loc[t0:t1, :]
    _t1 = time.time()
    b = dios.loc[:, v1]
    _t2 = time.time()
    if profile_assignment:
        dios.loc[t0:t1, v1] = dios.loc[t0:t1, v1] * 1111
        _t3 = time.time()
    else:
        # bind _t3 unconditionally so the "ass" accumulation below can never
        # hit an unbound name when assignment profiling is disabled
        _t3 = _t2
    timingsdf.at[rows, ("ts", "dios")] += _t1 - _t0
    timingsdf.at[rows, ("var", "dios")] += _t2 - _t1
    timingsdf.at[rows, ("ass", "dios")] += _t3 - _t2
    return a, b, dios
def gen_random_timestamps(m, M):
    """Return an ordered random ``(start, end)`` pair inside the interval
    ``[m, M]``; the offset from both ends is 10-90 percent of the length."""
    offset = (M - m) * (np.random.randint(10, 90) + np.random.random()) * 0.01
    lo, hi = m + offset, M - offset
    return (lo, hi) if lo <= hi else (hi, lo)
def find_index_range(obj):
    """Return the smallest and largest index value over all columns of
    ``obj`` (a DataFrame or DictOfSeries-like mapping of series).

    Returns ``(None, None)`` for an object with no columns.
    """
    lo = hi = None
    for col in obj:
        idx = obj[col].index
        cur_min, cur_max = idx.min(), idx.max()
        if lo is None:
            # first column seen: take its bounds as the starting range
            lo, hi = cur_min, cur_max
        else:
            lo = min(lo, cur_min)
            hi = max(hi, cur_max)
    return lo, hi
if __name__ == "__main__":
    import matplotlib.pyplot as plt

    # NOTE(review): indentation of this script section was reconstructed;
    # the nesting of the blocks below should be confirmed against the repo.

    # do not touch
    rows = 1
    # max increase of rows
    # 1 = 10 # 2 = 100 # .... # 5 = 100'000
    iterations = 5
    runs = 1
    cols = 10
    profile_assignment = True

    # which to calc and plot
    use_df = False
    use_a = True
    use_b = True
    use_dios = True

    # plot options
    normalize_to_df = True
    plot_xlog = True
    plot_ylog = True

    # ########################

    v1 = "var1"
    v2 = "var2"
    for i in range(iterations):
        rows *= 10
        # fresh accumulator row for this problem size
        timingsdf.loc[rows] = (0,) * len(timingsdf.columns)

        df, a, b, dios = get_testset(rows, cols)
        t0, t4 = find_index_range(df)

        # df is always timed when normalizing, since it is the baseline
        if use_df or normalize_to_df:
            for r in range(runs):
                t1, t2 = gen_random_timestamps(t0, t4)
                vr1 = var_prefix + str(np.random.randint(0, cols))
                df_timmings(df, t1, t2, vr1, None)

        if use_a:
            for r in range(runs):
                t1, t2 = gen_random_timestamps(t0, t4)
                vr1 = var_prefix + str(np.random.randint(0, cols))
                a_timings(a, t1, t2, vr1, None)

        if use_b:
            for r in range(runs):
                t1, t2 = gen_random_timestamps(t0, t4)
                vr1 = var_prefix + str(np.random.randint(0, cols))
                b_timings(b, t1, t2, vr1, None)

        if use_dios:
            for r in range(runs):
                t1, t2 = gen_random_timestamps(t0, t4)
                vr1 = var_prefix + str(np.random.randint(0, cols))
                dios_timings(dios, t1, t2, vr1, None)

    # calc the average
    timingsdf /= runs

    pd.set_option("display.max_columns", 100)

    df = timingsdf
    if not profile_assignment:
        df.drop(labels="ass", axis=1, level=0, inplace=True)
    print("timings:")
    print(df)
    df = df.swaplevel(axis=1)
    if normalize_to_df:
        # express every container's timings relative to the df baseline
        a = df.loc[:, "a"] / df.loc[:, "df"]
        b = df.loc[:, "b"] / df.loc[:, "df"]
        c = df.loc[:, "df"] / df.loc[:, "df"]
        d = df.loc[:, "dios"] / df.loc[:, "df"]
        df.loc[:, "a"] = a.values
        df.loc[:, "b"] = b.values
        df.loc[:, "df"] = c.values
        df.loc[:, "dios"] = d.values
        all = df.copy()
        # NOTE(review): swaplevel returns a new frame; this result is
        # discarded, so `all` is printed with the unswapped column order
        all.swaplevel(axis=1)
        print("\n\ndiff:")
        print(all)

    # split per-container slices for plotting
    a = df.loc[:, ("a", slice(None))]
    b = df.loc[:, ("b", slice(None))]
    dios = df.loc[:, ("dios", slice(None))]
    df = df.loc[:, ("df", slice(None))]

    ax = plt.gca()
    ax.set_title(f"avg of: {runs} runs, columns: {cols}")
    if use_df:
        df.plot(logy=plot_ylog, logx=plot_xlog, linestyle="-", ax=ax)
    if use_a:
        a.plot(logy=plot_ylog, logx=plot_xlog, linestyle="--", ax=ax)
    if use_b:
        b.plot(logy=plot_ylog, logx=plot_xlog, linestyle=":", ax=ax)
    if use_dios:
        dios.plot(logy=plot_ylog, logx=plot_xlog, linestyle="-.", ax=ax)
    plt.show()
# SPDX-FileCopyrightText: 2021 Helmholtz-Zentrum für Umweltforschung GmbH - UFZ
#
# SPDX-License-Identifier: GPL-3.0-or-later
# ignore all
*
# except ourself, to ensure the `testsets`-dir isn't ignored
!.gitignore
\ No newline at end of file
# SPDX-FileCopyrightText: 2021 Helmholtz-Zentrum für Umweltforschung GmbH - UFZ
#
# SPDX-License-Identifier: GPL-3.0-or-later
numpy==1.21.2
pandas==1.3.5
python-dateutil==2.8.2
pytz==2022.2.1
six==1.16.0
# SPDX-FileCopyrightText: 2021 Helmholtz-Zentrum für Umweltforschung GmbH - UFZ
#
# SPDX-License-Identifier: GPL-3.0-or-later
import subprocess
from setuptools import find_packages, setup
with open("Readme.md", "r") as fh:
    long_description = fh.read()

# derive a version suggestion from git (only printed; the released version
# is entered interactively below)
cmd = "git describe --tags --always --dirty"
version = (
    subprocess.run(cmd, shell=True, check=False, stdout=subprocess.PIPE)
    .stdout.decode()
    .strip()
)
print(f"git version: {version}")
# if '-dirty' in version:
#     print("Do not make a version from a dirty repro. Exiting now")
#     exit(1)

# ask the maintainer which version string to publish
txt = "enter version\n>"
version = input(txt)

setup(
    name="dios",
    version=version,
    author="Bert Palm",
    author_email="bert.palm@ufz.de",
    description="Dictionary of Series - a kind of pandas extension",
    long_description=long_description,
    long_description_content_type="text/markdown",
    url="https://git.ufz.de/rdm/dios",
    packages=["dios"],
    install_requires=[
        "pandas",
    ],
    license="GPLv3",
)
# SPDX-FileCopyrightText: 2021 Helmholtz-Zentrum für Umweltforschung GmbH - UFZ
#
# SPDX-License-Identifier: GPL-3.0-or-later
from .test_setup import *
# SPDX-FileCopyrightText: 2021 Helmholtz-Zentrum für Umweltforschung GmbH - UFZ
#
# SPDX-License-Identifier: GPL-3.0-or-later
from builtins import range
import numpy as np
from dios import *
if __name__ == "__main__":
    # NOTE(review): indentation of this scratch script was reconstructed;
    # confirm the nesting against the repo.

    # warn when series with mixed itypes are combined
    dios_options[OptsFields.mixed_itype_warn_policy] = Opts.itype_warn
    print(dios_options)

    df = pd.DataFrame(columns=range(1000))
    pd.Series()
    # print(df)
    # exit(99)

    # dios_options[OptsFields.disp_max_cols] = 5
    # dios_options[OptsFields.disp_max_rows] = 100
    dios_options[OptsFields.disp_min_rows] = 50
    # dios_options[OptsFields.dios_repr] = Opts.repr_aligned

    # build a dios with columns of differing lengths and index ranges
    n = 10
    d = DictOfSeries(
        dict(
            l=pd.Series(0, index=range(0, 30)),
            # i123=pd.Series(dtype='O'),
            a=pd.Series(1, index=range(0, n)),
            nan=pd.Series(np.nan, index=range(3, n + 3)),
            b=pd.Series(2, index=range(0, n * 2, 2)),
            c=pd.Series(3, index=range(n, n * 2)),
            d=pd.Series(4, index=range(-n // 2, n // 2)),
            # z=pd.Series([1, 2, 3], index=list("abc"))
        )
    )

    def f(s):
        # reinterpret the integer index as (second-resolution) datetimes
        sec = 10**9
        s.index = pd.to_datetime(s.index * sec)
        return s

    dd = d.apply(f)
    print(d)
    # print(d.to_df())
    # print(pd.options.display.max_rows)
    # print(d.to_str(col_delim=' | ', col_space=20, header_delim='0123456789'))
    # print(d.to_str(col_delim=' | ', col_space=20, max_cols=4 ))

    # an empty dios
    di = DictOfSeries(columns=[])
    print(di)
    # print(DictOfSeries(data=1, columns=['a']))
#!/usr/bin/env python
# SPDX-FileCopyrightText: 2021 Helmholtz-Zentrum für Umweltforschung GmbH - UFZ
#
# SPDX-License-Identifier: GPL-3.0-or-later
import pytest
from .test_setup import *
__author__ = "Bert Palm"
__email__ = "bert.palm@ufz.de"
__copyright__ = "Copyright 2018, Helmholtz-Centrum für Umweltforschung GmbH - UFC"
@pytest.mark.parametrize("left", diosFromMatr(DATA_ALIGNED))
@pytest.mark.parametrize("right", diosFromMatr(DATA_ALIGNED))
def test__eq__(left, right):
    """Elementwise `==` on two aligned dios must match the scalar comparison
    of the corresponding entries."""
    result = left == right
    for col in result:
        for idx in result[col].index:
            expected = left[col][idx] == right[col][idx]
            assert result[col][idx] == expected
# fix: the filterwarnings mark takes a single "action:message:category" spec
# string; a `category=` kwarg is silently ignored and a space after the colon
# becomes part of the message regex, so the old filters never matched
@pytest.mark.filterwarnings(
    "ignore:invalid value encountered in .*_scalars:RuntimeWarning"
)
@pytest.mark.filterwarnings(
    "ignore:divide by zero encountered in .*_scalars:RuntimeWarning"
)
@pytest.mark.parametrize("left", diosFromMatr(DATA_ALIGNED))
@pytest.mark.parametrize("right", diosFromMatr(DATA_ALIGNED))
@pytest.mark.parametrize("op", OP2)
def test__op2__aligningops(left, right, op):
    """Binary ops on aligned dios must equal the elementwise scalar op."""
    a, b = left, right
    test = op(a, b)
    for c in test:
        for j in test[c].index:
            exp = op(a[c][j], b[c][j])
            res = test[c][j]
            if not np.isfinite(res):
                # non-finite results (inf/nan) cannot be compared with ==
                print(f"\n\n{a[c][j]} {OP_MAP[op]} {b[c][j]}")
                print(f"\nres: {res}, exp:{exp}, op: {OP_MAP[op]}")
                pytest.skip("test not support non-finite values")
                return
            assert res == exp
# fix: the filterwarnings mark takes a single "action:message:category" spec
# string; a `category=` kwarg is silently ignored and a space after the colon
# becomes part of the message regex, so the old filters never matched
@pytest.mark.filterwarnings(
    "ignore:invalid value encountered in .*_scalars:RuntimeWarning"
)
@pytest.mark.filterwarnings(
    "ignore:divide by zero encountered in .*_scalars:RuntimeWarning"
)
@pytest.mark.parametrize("left", diosFromMatr(DATA_UNALIGNED))
@pytest.mark.parametrize("right", diosFromMatr(DATA_UNALIGNED))
@pytest.mark.parametrize("op", OPNOCOMP)
def test__op2__UNaligningops(left, right, op):
    """Binary ops on unaligned dios must equal the elementwise scalar op
    wherever the result is defined and finite."""
    try:
        a, b = left, right
        test = op(a, b)
        for c in test:
            for j in test[c].index:
                exp = op(a[c][j], b[c][j])
                res = test[c][j]
                if not np.isfinite(res):
                    # non-finite results (inf/nan) cannot be compared with ==
                    print(f"\n\n{a[c][j]} {OP_MAP[op]} {b[c][j]}")
                    print(f"\nres: {res}, exp:{exp}, op: {OP_MAP[op]}")
                    pytest.skip("test not support non-finite values")
                    return
                assert res == exp
    except ZeroDivisionError:
        pytest.skip("ZeroDivisionError")
@pytest.mark.parametrize("data", diosFromMatr(ALL))
@pytest.mark.parametrize("op", OP1)
def test__op1__(data, op):
    """A unary op applied to a dios equals the op applied to each element."""
    applied = op(data)
    flat_result = [value for col in applied for value in applied[col]]
    flat_input = [value for col in data for value in data[col]]
    for got, raw in zip(flat_result, flat_input):
        assert got == op(raw)
# SPDX-FileCopyrightText: 2021 Helmholtz-Zentrum für Umweltforschung GmbH - UFZ
#
# SPDX-License-Identifier: GPL-3.0-or-later
from pandas.core.dtypes.common import is_scalar
from .test_setup import *
@pytest.mark.parametrize(("idxer", "exp"), [("a", s1), ("c", s3)])
def test__getitem_single(dios_aligned, idxer, exp):
    """Selecting a single column returns the matching pandas Series."""
    selected = dios_aligned[idxer]
    assert isinstance(selected, pd.Series)
    assert (selected == exp).all()
@pytest.mark.parametrize("idxer", ["x", "2", 1000, None])
def test__getitem_single_fail(dios_aligned, idxer):
    """Unknown or invalid single indexers must raise."""
    with pytest.raises((KeyError, ValueError)):
        dios_aligned[idxer]
@pytest.mark.parametrize("idxer", BASIC_INDEXER)
def test__getitem_(dios_aligned, idxer):
    """Every basic indexer selects a DictOfSeries."""
    result = dios_aligned[idxer]
    assert isinstance(result, DictOfSeries)
@pytest.mark.parametrize("idxer", BASIC_INDEXER_FAIL)
def test__getitem_fail(dios_aligned, idxer):
    """Invalid basic indexers must raise."""
    with pytest.raises((ValueError, KeyError)):
        dios_aligned[idxer]
@pytest.mark.parametrize(
    ("idxer", "exp"),
    [
        (slice(None), [s1 == s1, s2 == s2, s3 == s3, s4 == s4]),
        (dios_aligned__() > 5, [s1 > 5, s2 > 5, s3 > 5, s4 > 5]),
    ],
)
def test__setitem_single(dios_aligned, idxer, exp):
    """Assigning a scalar through an indexer hits exactly the expected mask."""
    dios_aligned[idxer] = 99
    for pos, col in enumerate(dios_aligned):
        assert ((dios_aligned[col] == 99) == exp[pos]).all()
@pytest.mark.parametrize("idxer", BASIC_INDEXER_FAIL)
def test__setitem__fail(dios_aligned, idxer):
    """Assignment through an invalid indexer must raise."""
    with pytest.raises((ValueError, KeyError, IndexError)):
        dios_aligned[idxer] = 99
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
# SPDX-FileCopyrightText: 2021 Helmholtz-Zentrum für Umweltforschung GmbH - UFZ
#
# SPDX-License-Identifier: GPL-3.0-or-later
import pytest
from .test_setup import *
#
# s1 = pd.Series(range(10), index=range(10))
# s2 = pd.Series(range(5, 10), index=range(5, 10))
# s3 = pd.Series(range(1, 30, 2), index=range(1, 30, 2))
# s4 = pd.Series(np.linspace(7, 13, 9), index=range(3, 12))
# s1.name, s2.name, s3.name, s4.name = 'a', 'b', 'c', 'd'
# d1 = DictOfSeries(data=dict(a=s1.copy(), b=s2.copy(), c=s3.copy(), d=s4.copy()))
#
# blist = [True, False, False, True]
# b = pd.Series([True, False] * 5, index=[1, 2, 3, 4, 5] + [6, 8, 10, 12, 14])
# B = d1 > 5
#
#
#
#
# BLIST = [True, False, False, True]
#
# LISTIDXER = [['a'], ['a', 'c'], pd.Series(['a', 'c'])]
# BOOLIDXER = [pd.Series(BLIST), d1.copy() > 10]
# SLICEIDXER = [slice(None), slice(-3, -1), slice(-1, 3), slice(None, None, 3)]
# MULTIIDXER = [] # [d1 > 9, d1 != d1, d1 == d1]
# EMPTYIDEXER = [[], pd.Series(), slice(3, 3), slice(3, -1), DictOfSeries()]
#
# INDEXERS = LISTIDXER + BOOLIDXER + SLICEIDXER + MULTIIDXER + EMPTYIDEXER
#
#
This diff is collapsed.
This diff is collapsed.