-
David Schäfer authored
for bigger datasets.
00a10088
generate_testsets.py 3.13 KiB
import time
import pandas as pd
import numpy as np
import datetime as dt
from ..dios import DictOfSeries
import pickle
import os
# Prefix of the generated column names ("var0", "var1", ...).
# NOTE(review): _gen_testset builds its column names from a hard-coded "var"
# literal rather than from this constant - keep the two in sync.
var_prefix = "var"
def _gen_testset(rowsz, colsz, freq="1min", disalign=True, randstart=True):
    """Build one random testset as a wide DataFrame and as a DictOfSeries.

    ``colsz`` integer series named ``"var0"``, ``"var1"``, ... are created,
    each with ``rowsz`` rows of random values in ``[1, 8]`` on a regular
    datetime index beginning at (or after) 2000-01-01 00:00:00.

    Parameters
    ----------
    rowsz : int
        Number of rows of every single series.
    colsz : int
        Number of series (columns) to generate.
    freq : str
        Pandas frequency string of the index, e.g. ``"1min"`` or ``"s"``.
        Its unit part (leading/trailing digits stripped) is also used as the
        unit of the random start offset.
    disalign : bool or "random"
        If truthy, the index of every series is shifted by some nanoseconds
        so that the series do not share timestamps: by a random amount in
        ``[1, i + 1]`` for ``"random"``, otherwise by the column number ``i``.
    randstart : bool
        If true, every series gets its own random start point, offset from
        the base start by roughly 5% to 60% of ``rowsz`` frequency units.

    Returns
    -------
    tuple
        ``(df, dos)`` - the outer-joined DataFrame of all series and the
        DictOfSeries holding each series individually.
    """
    df = pd.DataFrame()
    dos = DictOfSeries()

    start = dt.datetime(2000, 1, 1)
    times = pd.date_range(periods=rowsz, start=start, freq=freq)
    # Unit part of the frequency, e.g. "min" for "1min". A frequency without
    # a numeric multiplier (e.g. "s") is valid as well.
    freq_unit = freq.strip("0123456789")

    for i in range(colsz):
        if randstart:
            # generate random startpoint for each series
            offset = np.random.randint(int(rowsz * 0.05), int(rowsz * 0.6) + 2)
            series_start = start + pd.Timedelta(str(offset) + freq_unit)
            times = pd.date_range(periods=rowsz, start=series_start, freq=freq)

        if disalign:
            if disalign == "random":
                shift = np.random.randint(1, i + 2)
            else:
                # total disalign
                shift = i
            # NOTE: without `randstart` the index is carried over between
            # iterations, so these nanosecond shifts accumulate.
            times = times + pd.Timedelta(f"{shift}ns")

        name = f"var{i}"
        values = np.random.randint(1, 9, rowsz)
        tmp = pd.DataFrame(index=times, data=values, columns=[name])
        df = pd.merge(df, tmp, left_index=True, right_index=True, how="outer")
        # Select the column explicitly: `squeeze()` would collapse a
        # single-row frame to a scalar instead of a Series.
        dos[name] = tmp[name].copy()

    return df, dos
def get_random_df_and_dios(rowsz, colsz, freq="1min", disalign=True, randstart=True):
    """Return the wide DataFrame and the DictOfSeries of a testset.

    Thin convenience wrapper around ``get_testset`` that drops the two
    stacked variants of the data. As ``get_testset`` caches its result on
    disk, repeated calls with equal arguments yield the same data.

    Parameters
    ----------
    rowsz : int
        Number of rows per series.
    colsz : int
        Number of series (columns).
    freq, disalign, randstart
        Passed through to ``get_testset`` unchanged.

    Returns
    -------
    tuple
        ``(df, dios)``
    """
    testset = get_testset(
        rows=rowsz, cols=colsz, freq=freq, disalign=disalign, randstart=randstart
    )
    # Layout of a testset: (df, df_type_a, df_type_b, dios, ...)
    df, _type_a, _type_b, dios, *_extra = testset
    return df, dios
def get_testset(
    rows,
    cols,
    freq="1s",
    disalign=True,
    randstart=True,
    storagedir=None,
    noresult=False,
):
    """Load a cached testset from disk, or generate and cache a new one.

    Testsets are pickled to ``set_f{freq}_d{disalign}_r{randstart}_dim{rows}x{cols}.pkl``
    inside the storage directory. If that file is missing or cannot be
    unpickled (corrupt or truncated), a new testset is generated with
    ``_gen_testset`` and written to that path.

    Parameters
    ----------
    rows : int
        Number of rows per series.
    cols : int
        Number of series (columns).
    freq : str
        Pandas frequency string of the datetime index.
    disalign : bool or "random"
        Passed to ``_gen_testset``.
    randstart : bool
        Passed to ``_gen_testset``.
    storagedir : str, optional
        Directory holding the pickle files. Defaults to the ``testsets``
        directory next to this module. It is created if it does not exist.
    noresult : bool
        If true, only make sure the pickle file exists and return ``None``.
        An existing file is not loaded (and therefore not validated).

    Returns
    -------
    tuple or None
        ``(df, df_type_a, df_type_b, dios)`` where ``df`` is the wide
        DataFrame, ``df_type_a`` is ``df.stack(dropna=False)``, ``df_type_b``
        is ``df.unstack()`` (each sorted by the first index level) and
        ``dios`` is the DictOfSeries; ``None`` if `noresult` is true.
    """
    if storagedir is None:
        storagedir = os.path.join(os.path.dirname(__file__), "testsets")

    fname = f"set_f{freq}_d{disalign}_r{randstart}_dim{rows}x{cols}.pkl"
    fpath = os.path.join(storagedir, fname)

    # try to get pickled data
    # NOTE: unpickling can execute arbitrary code - only point `storagedir`
    # at directories whose content is trusted.
    try:
        with open(fpath, "rb") as fh:
            if noresult:
                return None
            # file/data was present
            return pickle.load(fh)
    except (pickle.UnpicklingError, EOFError, FileNotFoundError):
        # EOFError: empty or truncated file, e.g. from an interrupted write.
        pass

    # generate testset(s)
    df, dios = _gen_testset(
        rowsz=rows, colsz=cols, freq=freq, disalign=disalign, randstart=randstart
    )
    df = df.sort_index(axis=0, level=0)
    df_type_a = df.stack(dropna=False).sort_index(axis=0, level=0)
    df_type_b = df.unstack().sort_index(axis=0, level=0)
    tup = df, df_type_a, df_type_b, dios

    # store testsets
    os.makedirs(storagedir, exist_ok=True)
    with open(fpath, "wb") as fh:
        pickle.dump(tup, fh)

    return None if noresult else tup
def gen_all(rrange, crange):
    """Generate and cache a testset for every rows x cols combination.

    For each pair the dimensions are printed, followed by the wall-clock
    seconds the ``get_testset`` call took.

    Parameters
    ----------
    rrange : iterable of int
        Row counts to generate.
    crange : iterable of int
        Column counts to generate; iterated once per row count.
    """
    for nrows in rrange:
        for ncols in crange:
            print(nrows, " x ", ncols)
            started = time.time()
            # noresult=True: only the pickle file is wanted, not the data.
            get_testset(nrows, ncols, noresult=True)
            elapsed = time.time() - started
            print(elapsed)
if __name__ == "__main__":
    # Pre-generate the cached testsets for 10 .. 100000 rows and
    # 10 .. 50 columns (in steps of 10).
    # For a quick manual timing, loop over `get_testset(10**i, 10)` for
    # i in range(7) and measure with `time.time()` instead.
    row_sizes = [10 ** exponent for exponent in range(1, 6)]
    col_sizes = range(10, 60, 10)
    gen_all(row_sizes, col_sizes)