
Aggregate

Merged Alexander Hinz requested to merge hinza/data_progs:aggregate into meteo
1 file   +13   −19
@@ -7,14 +7,18 @@ import pandas as pd
import numpy as np
from config.data import NODATA
from config.uri import CACHEPATH
from lib.faccess import getDevices
from lib.daccess import splitTable, reindexTable, writeTable, iterDataFreq
from lib.tools import firstOfYear, lastOfYear
from lib.logger import initLogger, exceptionLogged
from lib.flagging import getMaxflags
from lib.argparser import ChainParser
from lib.daccess import ConfigFields as F
pd.options.mode.chained_assignment = None
warnings.simplefilter(action='ignore', category=FutureWarning)
AGGREGATIONS = {
"SUM": np.nansum,
@@ -43,26 +47,41 @@ def wind(deg, speed):
def aggregatePeriod(df, pflags1, pflags2, aggregations):
# in case of completely missing data the Grouper produces an error,
# so this if-clause adds a NODATA line to the output
if df.empty:
out = pd.Series(NODATA, index=range(df.shape[1]))
out[1::2] = 92
return out
# NOTE:
# Not the most expressive version possible, but this
# is called a lot, so performance matters.
# Maybe there is a way to numba-boost this?
df_data, df_flag = splitTable(df, index_level="varnames")
data, flag = splitTable(df, index_level="varnames")
# let's work on numpy arrays.
columns = df_data.columns
df_flag = df_flag.to_numpy()
df_data = df_data.to_numpy()
# normalise all flags that are not 9 uniformly to 91 or 92
columns = data.columns
df_flag = getMaxflags(flag)
df_flag_rule_1 = df_flag.copy()
df_flag_rule_2 = df_flag.copy()
df_input = data.to_numpy()
mask = df_flag == 0
# mask all flags != 9
mask = df_flag == 9
df_flag[~mask] = np.nan
df_flag = np.where(df_flag == 0, df_flag, np.nan)
# mask all flags != 91
df_flag_rule_1 = np.where(df_flag_rule_1 == 1, df_flag_rule_1, np.nan)
# mask all flags != 92
df_flag_rule_2 = np.where(df_flag_rule_2 == 2, df_flag_rule_2, np.nan)
# mask all NODATA values
mask |= df_data != NODATA
df_data[~mask] = np.nan
mask &= df_input != NODATA
df_input[~mask] = np.nan
# aggregate flags
ps = np.isfinite(df_flag).sum(axis=0) * 100 / df_flag.shape[0]
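The masking above boils down to counting, per column, how many timestamps carry a good flag; a sketch under the assumption that getMaxflags maps the 9/91/92 flags onto 0/1/2 (the values below are invented):

```python
# Sketch: percentage of "good" values per column, assuming getMaxflags
# maps flags 9/91/92 onto 0/1/2 (an assumption; values are invented).
import numpy as np

flags = np.array([[0, 1, 2],
                  [0, 0, 2],
                  [0, 1, 0]], dtype=float)

good = np.where(flags == 0, flags, np.nan)                 # keep only flag 0
ps = np.isfinite(good).sum(axis=0) * 100 / good.shape[0]   # share of good values
print(ps)  # 100%, ~33%, ~33% good values per column
```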
@@ -70,8 +89,16 @@ def aggregatePeriod(df, pflags1, pflags2, aggregations):
flags[ps > (100 - pflags1)] = 9
flags[(ps <= (100 - pflags1)) & (ps > (100-pflags2))] = 91
# Rules that set the aggregated flag to 91
# Rule 1: if all flags are 91, then the aggregated flag should also be 91
flags[(df_flag_rule_1 == 1).all(axis=0)] = 91
# Rule 2: if at least one but fewer than 30% of the flags are 92, then the aggregated flag should be 91
flags[((df_flag_rule_2 == 2).sum(axis=0) / df_flag_rule_2.shape[0] > 0) &
((df_flag_rule_2 == 2).sum(axis=0) / df_flag_rule_2.shape[0] < .3)] = 91
# aggregate data
data = np.full(df_data.shape[1], NODATA, dtype="float64")
agg_data = np.full(df_input.shape[1], NODATA, dtype="float64")
with warnings.catch_warnings():
warnings.simplefilter("ignore", category=RuntimeWarning)
for agg in np.unique(aggregations):
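A small worked example of the two rules above, again assuming the 0/1/2 encoding from getMaxflags (all numbers invented):

```python
# Worked example of Rule 1 and Rule 2, assuming 9 -> 0, 91 -> 1, 92 -> 2
# after getMaxflags (all numbers invented).
import numpy as np

flags = np.array([[1, 1, 0],
                  [1, 0, 0],
                  [1, 1, 0],
                  [1, 0, 2]], dtype=float)
agg_flags = np.full(flags.shape[1], 9)

# Rule 1: every flag in the period is 91 -> aggregated flag 91
agg_flags[(flags == 1).all(axis=0)] = 91

# Rule 2: share of 92 flags is above zero but below 0.3 -> aggregated flag 91
share_92 = (flags == 2).sum(axis=0) / flags.shape[0]
agg_flags[(share_92 > 0) & (share_92 < 0.3)] = 91

print(agg_flags)  # [91  9 91] -> Rule 1 hits column 0, Rule 2 hits column 2
```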
@@ -79,20 +106,15 @@ def aggregatePeriod(df, pflags1, pflags2, aggregations):
if agg == "CIRCULAR_MAX_DIST":
for i, col in zip(np.where(idx)[0], columns[idx]):
wsi = columns.get_loc("WS" + col[2:])
data[i] = wind(df_data[:, i], df_data[:, wsi])
agg_data[i] = wind(df_input[:, i], df_input[:, wsi])
else:
data[idx] = AGGREGATIONS[agg](df_data[:, idx], axis=0)
# replace everything else with NODATA, the inverse formulation
# of the condition ensures we hit np.nan
data[~(ps > (100 - pflags2))] = NODATA
agg_data[idx] = AGGREGATIONS[agg](df_input[:, idx], axis=0)
# merge data and flags back together - again, performance matters
out = np.zeros((len(flags) + len(data),))
out[::2] = data
out = np.zeros((len(flags) + len(agg_data),))
out[::2] = agg_data
out[1::2] = flags
return pd.Series(out)
return pd.Series(out).replace(np.nan, NODATA)
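The returned row interleaves aggregated values and their flags (value, flag, value, flag, ...); a minimal sketch with made-up numbers:

```python
# Minimal sketch of the value/flag interleaving in the returned row
# (made-up numbers; NODATA stands in for config.data.NODATA).
import numpy as np
import pandas as pd

NODATA = -9999.0
agg_data = np.array([4.2, np.nan, 17.0])
flags = np.array([9.0, 92.0, 91.0])

out = np.zeros(len(agg_data) + len(flags))
out[::2] = agg_data      # even positions: aggregated values
out[1::2] = flags        # odd positions: the matching flags
row = pd.Series(out).replace(np.nan, NODATA)
print(row.tolist())  # [4.2, 9.0, -9999.0, 92.0, 17.0, 91.0]
```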
def aggregateData(df, freq, config):
@@ -114,6 +136,12 @@ def aggregateData(df, freq, config):
def writeData(logger, data, device, freq):
cache_name = Path(
CACHEPATH,
f"{device.station_key}-{device.logger_key}-aggregate-{freq}-level2.parquet"
)
writeTable(cache_name, data, make_path=True, format='parquet')
for date, out in iterDataFreq(data, "yearly"):
fname = Path(
device.derivedpath,
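The new level-2 cache file name is composed from the device keys and the target frequency; a rough illustration with hypothetical stand-in values for CACHEPATH, station_key and logger_key:

```python
# Rough illustration of the new level-2 cache file name; CACHEPATH,
# station_key and logger_key below are hypothetical stand-ins.
from pathlib import Path

CACHEPATH = "/tmp/cache"
station_key, logger_key, freq = "ST01", "LG02", "30min"

cache_name = Path(CACHEPATH, f"{station_key}-{logger_key}-aggregate-{freq}-level2.parquet")
print(cache_name)  # /tmp/cache/ST01-LG02-aggregate-30min-level2.parquet
```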
@@ -130,12 +158,13 @@ def procDevice(logger, device, freq, start_date, end_date):
logger.debug("reading data")
df = device.getL1Data(reindex=True,
df = device.getL2Data(reindex=True,
fill=True,
start_date=start_date,
end_date=end_date)
if df.empty:
logger.info(f"{device}: no data found for period - skipping")
return
config = device.readExcel()
config = config[[F.VARNAME, F.FLAG1, F.FLAG2, F.AGGREGATION_TYPE]]
@@ -149,7 +178,6 @@ def procDevice(logger, device, freq, start_date, end_date):
def main(station, device, start_date, end_date, freq, debug):
start_date = firstOfYear(start_date)
end_date = lastOfYear(end_date)
with initLogger(__file__, debug) as logger:
@@ -166,19 +194,18 @@ def main(station, device, start_date, end_date, freq, debug):
if __name__ == "__main__":
from lib.argparser import ChainParser
# TODO:
# - we could add a cli option to pass the intended frequency, if needed
# - this should also work to aggregate soilnet data
parser = ChainParser("Aggregate logger data to a frequency of 30 minutes", {"ndays": 1, "freq": "30min"})
parser._add_argument("--freq", type=str, help="target time frequency as a pandas offset string")
args = parser.parse_args()
parser = ChainParser("Aggregate logger data to a frequency of 30 minutes",
{"ndays": 1,
"freq": "30min"})
parser.add_argument("--freq", type=str, help="target time frequency as a pandas offset string")
args = parser.parseArgs()
main(
station=args.station, device=args.logger,
station=args.station, device=args.device,
start_date=args.start_date, end_date=args.end_date,
freq=args.freq, debug=args.debug
)
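The reworked argument handling delegates to ChainParser, which is project-internal; a standalone sketch with plain argparse that only mirrors the --freq option added above:

```python
# Standalone sketch with plain argparse; ChainParser is project-internal,
# this only mirrors the --freq option added above.
import argparse

parser = argparse.ArgumentParser(
    description="Aggregate logger data to a frequency of 30 minutes")
parser.add_argument("--freq", type=str, default="30min",
                    help="target time frequency as a pandas offset string")
args = parser.parse_args(["--freq", "1h"])
print(args.freq)  # 1h
```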