Commit b878699a authored by David Schäfer's avatar David Schäfer
Browse files

bump pipetools to stacked DataFrame version

parent 67ca743f
......@@ -10,9 +10,10 @@ import requests
import numpy as np
import pandas as pd
from saqc import fromConfig, flagging, register, Flags, UNFLAGGED
from saqc.core import flagging, register, Flags, fromConfig
from saqc.constants import UNFLAGGED
from pipetools.lib import splitData, mergeData, writeParquet
from pipetools.lib import splitData, mergeData, writeParquet, diosUnstack
NM_URL = "http://www.nmdb.eu/nest/draw_graph.php"
......@@ -66,14 +67,14 @@ def nmDownload(station: str, resolution: int, start_date: datetime, end_date: da
return out
def extractNM(data):
def extractNM(index):
resolution = 60
nm = getNMData(
station="JUNG", resolution=resolution,
start_date=data.index.min(), end_date=data.index.max()
start_date=index.min(), end_date=index.max()
)
nm = nm.reindex(
data.index, method="nearest", limit=1,
index, method="nearest", limit=1,
tolerance=pd.Timedelta(seconds=resolution*60/2-1)
).interpolate()
return nm
......@@ -109,8 +110,7 @@ def getManFlags(fname, field):
@flagging()
def flagManual(data, field, flags, fname, **kwargs):
def flagFromFile(data, field, flags, fname, **kwargs):
mflags = getManFlags(fname, field)
for _, (start, end, flag) in mflags.iterrows():
mask = pd.Series(data=0, index=data[field].index, dtype=bool)
......@@ -119,6 +119,16 @@ def flagManual(data, field, flags, fname, **kwargs):
return data, flags
def resultStack(result):
t0 = datetime.now()
variables = []
for col in result.columns:
var = result[col].round(8)
var.index = pd.MultiIndex.from_product([[col], var.index])
variables.append(var)
return pd.concat(variables)
@click.command()
@click.option("-i", "--infile", type=click.Path(exists=True), required=True)
@click.option("-o", "--outfile", type=click.Path(), required=True)
......@@ -126,11 +136,14 @@ def flagManual(data, field, flags, fname, **kwargs):
def main(infile, outfile, configfile):
data = pd.read_parquet(infile)
data.loc[:, "NM"] = extractNM(data)
dios = diosUnstack(data)
index = data.index.get_level_values("timestamp").drop_duplicates().sort_values()
dios["NM"] = extractNM(index)
saqc = fromConfig(fname=configfile, data=data, scheme="dmp")
saqc = fromConfig(fname=configfile, data=dios, scheme="dmp")
df_out = mergeData(saqc.data.round(8), saqc.flags)
df_out = resultStack(saqc.result)
writeParquet(df_out, outfile)
......
Subproject commit 1c4c9f4de23bd4d9791a32e9b8281a2cdbbd8e48
Subproject commit 5798e9cd713c90620de00546d0ea0bdb1b04fa7d
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment