
aggregate_logger: fix bug with timestamp at beginning of the new year

Merged: Alexander Hinz requested to merge (removed):master into master
All threads resolved!
2 files changed: +10 −13
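Context for the fix, as far as it can be read from the diff below: aggregateData groups with pd.Grouper(freq=freq, closed="right", label="right"), so the daily bin covering December 31st is labeled midnight of January 1st. The old code split the result into yearly files first and only shifted the daily index back by one day afterwards, which put the last day of a year into the next year's file. A minimal sketch of the labeling behaviour, with made-up sample data:

    import numpy as np
    import pandas as pd

    # 30-minute samples covering the last day of 2020
    idx = pd.date_range("2020-12-31 00:30", "2021-01-01 00:00", freq="30min")
    df = pd.DataFrame({"T": np.ones(len(idx))}, index=idx)

    daily = df.groupby(pd.Grouper(freq="1D", closed="right", label="right")).sum()
    print(daily.index[0])  # 2021-01-01 00:00:00 -- the 2020-12-31 period is labeled into the new year

    # the fix shifts the label back before any per-year split happens
    daily.index = daily.index - pd.Timedelta(days=1)
    print(daily.index[0])  # 2020-12-31 00:00:00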
@@ -14,6 +14,7 @@ from lib.logger import initLogger, exceptionLogged
 from lib.daccess import ConfigFields as F
 
+pd.options.mode.chained_assignment = None
 AGGREGATIONS = {
     "SUM": np.nansum,
@@ -73,8 +74,8 @@ def aggregatePeriod(df, pflags1, pflags2, aggregations):
     data = np.full(df_data.shape[1], NODATA, dtype="float64")
     with warnings.catch_warnings():
         warnings.simplefilter("ignore", category=RuntimeWarning)
-        for agg in np.unique(aggregations): #.drop_duplicates():
-            idx = (aggregations == agg) & np.isfinite(df_data).all(axis=0)
+        for agg in np.unique(aggregations):
+            idx = (aggregations == agg)
             if agg == "CIRCULAR_MAX_DIST":
                 for i, col in zip(np.where(idx)[0], columns[idx]):
                     wsi = columns.get_loc("WS" + col[2:])
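A note on the idx change, under my reading of the diff: the old mask additionally required every value in a column to be finite, so a single NaN excluded the whole column from aggregation; the aggregators registered in AGGREGATIONS are nan-aware (np.nansum and friends), so the new code keeps all columns of the matching aggregation and lets them handle the gaps. Illustrated with made-up values:

    import numpy as np

    vals = np.array([[1.0, 2.0],
                     [np.nan, 3.0]])

    # old behaviour: a column with any NaN fails the all-finite test ...
    print(np.isfinite(vals).all(axis=0))  # [False  True]

    # ... although the nan-aware aggregators cope with the gap by themselves
    print(np.nansum(vals, axis=0))        # [1. 5.]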
@@ -96,8 +97,8 @@ def aggregatePeriod(df, pflags1, pflags2, aggregations):
 def aggregateData(df, freq, config):
     groups = (reindexTable(df)
               .astype(float)
               .groupby(pd.Grouper(freq=freq, closed="right", label="right")))
     cols = df.columns.get_level_values("varnames").drop_duplicates()
     # insert potentially missing column labels into config
@@ -109,23 +110,19 @@ def aggregateData(df, freq, config):
     # that's still a long runinng call
     out = groups.apply(aggregatePeriod, pflags1.to_numpy(), pflags2.to_numpy(), aggregations.to_numpy())
     out.columns = df.columns
+    if freq == '1D':
+        out.index = out.index - pd.Timedelta(days=1)
     return out
 
 def writeData(logger, data, device, freq):
-    if freq == "1D":
-        change_freq = "Y"
-    else:
-        change_freq = "yearly"
-    for date, out in iterDataFreq(data, change_freq):
+    for date, out in iterDataFreq(data, "yearly"):
         fname = Path(
             device.derivedpath,
             freq,
             f"{device.station_key}_{device.logger_key}_{freq}_{date.year}.level2.csv"
         )
         logger.debug(fname)
 
-        if freq == '1D':
-            out.index = out.index - pd.Timedelta(days=1)
         writeTable(fname, out, make_path=True)
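The ordering is the actual fix: the one-day shift now happens in aggregateData, before writeData splits the table into per-year files, so the December 31st row already carries its final timestamp when the target year is chosen. A rough sketch with a hypothetical stand-in for the project-internal iterDataFreq(data, "yearly"):

    import pandas as pd

    # one daily row as the grouper labels it: 2021-01-01 stands for the 2020-12-31 period
    daily = pd.DataFrame({"T": [1.0]}, index=pd.DatetimeIndex(["2021-01-01"]))

    # old order: split by year first (row lands in the 2021 file), shift afterwards
    # new order: shift first, then split
    daily.index = daily.index - pd.Timedelta(days=1)
    for year, chunk in daily.groupby(daily.index.year):  # stand-in for iterDataFreq(data, "yearly")
        print(year)  # 2020 -- the correct yearly file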
@@ -178,7 +175,6 @@ if __name__ == "__main__":
     parser = ChainParser("Aggregate logger data to a frequency of 30 minutes", {"ndays": 1, "freq": "30min"})
     parser._add_argument("--freq", type=str, help="target time frequency as a pandas offset string")
-    pd.options.mode.chained_assignment = None
     args = parser.parse_args()
     main(