Transfer ICOS data to level 1

Merged Martin Quanz requested to merge (removed):ICOS into master

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import re
from datetime import date, datetime
from pathlib import Path

import pandas as pd

from config.data import NODATA
from lib.argparser import parseArguments
from lib.daccess import writeTable, readCaldataICOS
from lib.faccess import getDevices
from lib.logger import initLogger, exceptionLogged
from lib.tools import createPath
from transfer_level0_level1.do_transfer_level0_level1_soilnet import (
    dropDuplicates, Flds, FNAME)

def flattenData(df):
    # reshape the wide level-0 table (one column per sensor reading) into a
    # long table with one row per date/box/header combination
    out = (df
           .set_index([df.index, Flds.BOX])
           .stack()
           .rename_axis([Flds.DATE, Flds.BOX, Flds.HEADERS])
           .rename(Flds.VALUE)
           .reset_index(level=[1, 2])
           .sort_index())
    # prefix every header with its box, e.g. "T" of box "1" -> "Box1_T"
    out[Flds.BOX] = "Box" + out[Flds.BOX]
    out[Flds.HEADERS] = out[Flds.BOX] + "_" + out[Flds.HEADERS]
    return out
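
# A minimal sketch of the reshape above, with hypothetical sensor columns
# "T" and "M" and box id "1" (the real headers come from the level-0 files):
#
#                        Box    T    M
#   2020-01-01 00:00:00    1  1.5  0.2
#
# becomes
#
#                         box  headers  value
#   2020-01-01 00:00:00  Box1   Box1_T    1.5
#   2020-01-01 00:00:00  Box1   Box1_M    0.2
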
def writeFiles(df, device, logger):
    def colLabels(collabels):
        # interleave every data column with its quality-flag column,
        # e.g. "Box1_T [degC]" is followed by "Box1_T_f"
        datalabels = sorted(collabels)
        flaglabels = [re.sub(r"\s+\[.*?\]", r"_f", l) for l in datalabels]
        return [l for p in zip(datalabels, flaglabels) for l in p]

    dfp = pd.pivot_table(df, columns=Flds.HEADERS, values=Flds.VALUE,
                         dropna=False, index=df.index)
    # drop columns whose header carries the no-data marker instead of a unit
    dfp.drop([col for col in dfp.columns if str(NODATA) in col],
             axis=1, inplace=True)
    data = (dfp
            .round(4)
            .reindex(columns=colLabels(dfp.columns)))
    # write one level-1 file per day
    for day, out in data.groupby(by=pd.Grouper(freq="D")):
        if out.empty:
            continue
        fname = Path(device.l1path, str(day.year),
                     FNAME.format(
                         station=device.station_key, box=device.logger_key,
                         date=day.strftime("%Y%m%d")))
        createPath(fname)
        logger.debug("writing: %s", str(fname))
        writeTable(fname, out)
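
# With the hypothetical labels from the sketch above, a daily level-1 file
# would carry the column order produced by colLabels:
#
#   Box1_M [m3/m3], Box1_M_f, Box1_T [degC], Box1_T_f
#
# The *_f flag columns do not exist in the pivoted data, so the reindex
# creates them as empty (NaN); presumably they are filled by later
# quality-control steps.
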
def procDevice(device, raw, logger):
    def unitstodict(fname):
        # map every output header to its bracketed unit string,
        # e.g. "degC" -> " [degC]"
        units = pd.read_csv(fname, usecols=["Headerout", "Units"])
        units["Units"] = " [" + units["Units"].fillna("") + "]"
        units.set_index("Headerout", inplace=True)
        return units["Units"].to_dict()

    # merge the unit mappings of all calibration files;
    # later files take precedence over earlier ones
    cal = readCaldataICOS(device.calpath)
    units = {}
    for fname in cal["fname"]:
        units.update(unitstodict(fname))
    # process the data in weekly chunks
    # to keep the memory footprint within reasonable bounds
    grouper = pd.Grouper(freq="1W", closed="right")
    for _, chunk in raw.groupby(grouper):
        if chunk.empty:
            continue
        data = flattenData(chunk)
        ddata = data[data[Flds.BOX] == device.logger_key].copy()
        if ddata.empty:
            continue
        # headers without a known unit get the no-data marker appended,
        # so writeFiles can drop them
        unit = [units.get(x, str(NODATA)) for x in ddata[Flds.HEADERS]]
        ddata[Flds.HEADERS] = ddata[Flds.HEADERS] + unit
        writeFiles(ddata, device, logger)
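
# A minimal sketch of the unit mapping unitstodict builds (hypothetical file
# content; the real files are located via readCaldataICOS):
#
#   Headerout,Units              {"Box1_T": " [degC]",
#   Box1_T,degC            ->     "Box1_M": " [m3/m3]"}
#   Box1_M,m3/m3
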
def transferFilesSoilnetICOS(station=None, logger=None,
                             start_date=None, end_date=None,
                             debug=False):
    with initLogger(__file__, debug) as log:
        devices = getDevices(station=station, logger=logger,
                             tag="soilnet_icos",
                             start_date=start_date or date(2012, 1, 1),
                             end_date=end_date or datetime.today())
        cache = {}
        for device in devices:
            log.info("transferring: %s", device)
            # soilnet devices share the same rawdata,
            # do not read it multiple times
            if device.l0path not in cache:
                log.debug("reading data: %s", device)
                cache[device.l0path] = device.getL0Data()
            msg = "soilnet transfer level0 to level1 failed for: {:}".format(
                device)
            with exceptionLogged(log, msg, fail=debug):
                procDevice(device, cache[device.l0path], log)

if __name__ == "__main__":
    args = parseArguments("Transfer soilnet ICOS level0 data to level1",
                          {"ndays": 1})
    transferFilesSoilnetICOS(
        station=args.station, logger=args.logger,
        start_date=args.start_date, end_date=args.end_date,
        debug=args.debug)
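
A typical invocation might look like the sketch below. Only the parsed
attribute names (station, logger, start_date, end_date, debug) are visible
here; the actual flag spellings and the "ndays" default come from
lib.argparser, and the script name is assumed by analogy with the soilnet
module it imports:

    python do_transfer_level0_level1_soilnet_icos.py \
        --station <station_key> --logger <logger_key> \
        --start-date 2020-01-01 --end-date 2020-01-08 --debug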