Skip to content
Snippets Groups Projects

Dendro

Merged Alexander Hinz requested to merge hinza/data_progs:dendro into meteo
All threads resolved!
1 file
+ 35
51
Compare changes
  • Side-by-side
  • Inline
@@ -11,22 +11,35 @@ from config.data import NODATA
from lib.faccess import getDevices
from lib.daccess import splitTable
from lib.logger import initLogger, exceptionLogged
from lib.flagging import getMaxflags
def filter(data, flags):
mask = flags == 9
# the filter function masks the data, leaving only usable data (not flagged)
# and chooses only the dendro columns
mask = getMaxflags(flags) == 0
mask = pd.DataFrame(mask)
mask.columns = data.columns
mask.index = data.index
data = data[mask]
return data[data.columns[data.columns.str.match(".*dendro(?!.*r)")]]
def grouper(n, df, fillvalue=None):
def grouper(df, n=3, fillvalue=None):
"""
This grouper function splits a pd.DataFrame into pd.DataFrames with `n` columns.
Background: Every dendrometer (i.e one tree) is represented by three columns:
1. calculated value
2. manually read value
3. quality flag
"""
args = [iter(df)] * n
return zip_longest(fillvalue=fillvalue, *args)
def fillNa(df):
for bhd_value, bhd_manu, flag in grouper(3, df):
for bhd_value, bhd_manu, flag in grouper(df):
# setting 91 flag on all values, where calculated value is nan
df.loc[df[bhd_value].isna(), flag] = 91
df[bhd_value] = df[bhd_value].fillna(df[bhd_manu])
@@ -52,14 +65,11 @@ def dendroCalculation(df, dendro, first_value):
c0 = math.pi * 2 * r0
v0 = df[dendro][0]
cwire = c0 + 2 * ((dini + r0 - v0) ** 2 - r0 ** 2) ** 0.5 - 2 * r0 * math.acos(r0 / (dini + r0 - v0))
for num, val in enumerate(df['rad'][1:], 1):
df['rad'][num] = (cwire
- 2 * ((dini + df['rad'].iloc[num - 1] - df[dendro].iloc[num]) ** 2
- df['rad'].iloc[num - 1] ** 2) ** 0.5
+ 2 * df['rad'].iloc[num - 1]
* math.acos(df['rad'].iloc[num - 1] / (dini + df['rad'].iloc[num - 1]
- df[dendro].iloc[num]))) / math.pi / 2
return df['rad'] * 2
df['rad_2'] = df['rad'].shift(1)
df['rad'] = (cwire
- 2 * ((dini + df['rad_2'] - df[dendro]) ** 2 - df['rad_2'] ** 2) ** .5
+ 2 * df['rad_2'] * np.arccos(df['rad_2'] / (dini + df['rad_2'] - df[dendro]))) / math.pi
return df['rad']
def dendroInterpolation(df, dendro):
@@ -71,52 +81,48 @@ def dendroInterpolation(df, dendro):
def dendroProcess(logger, start_date, end_date, data, manflags):
final_df = pd.DataFrame()
final = []
for dendro in data:
if data[dendro].isnull().all():
continue # table is empty
try:
dropped_data = data[dendro].dropna()
dropped_manflags = manflags.loc[dendro].dropna(how='all')
dropped_manflags.index = dropped_manflags.index.round('10min')
concat_df = pd.concat([dropped_data, dropped_manflags], axis=1)
except (pd.errors.InvalidIndexError, KeyError) as e:
continue # concat Error
dropped_data = data[dendro].dropna()
dropped_manflags = manflags.loc[dendro].dropna(how='all')
dropped_manflags.index = dropped_manflags.index.ceil('10min')
concat_df = pd.concat([dropped_data, dropped_manflags], axis=1)
concat_df = concat_df.loc[start_date:end_date]
first_value = data.loc[data[dendro].first_valid_index(), dendro]
d_ini_index_list = concat_df['d_ini'].dropna().index
temporary_df = pd.DataFrame()
for num, val in enumerate(d_ini_index_list):
try:
part_concat_df = concat_df.loc[val:d_ini_index_list[num + 1]][:-1]
except IndexError:
part_concat_df = concat_df.loc[val:]
if not part_concat_df.empty:
value_df = dendroCalculation(part_concat_df, dendro, first_value)
temporary_df = pd.concat([temporary_df, value_df], axis=0)
try:
temporary_df_regex = r'[0-9]+'
temporary_df.columns = [f'bhd_auto_{re.findall(temporary_df_regex, dendro)[0]} [mm]']
except ValueError:
continue # temporary_df is empty, because no d_ini value in this period of time
d_ini_index = concat_df['d_ini'].dropna().index
temporary = []
stop_list = d_ini_index[1:].tolist() + [pd.Timestamp.today()]
for start, stop in zip(d_ini_index, stop_list):
part_concat_df = concat_df.loc[start:stop][:-1]
if part_concat_df.empty:
continue
value_df = dendroCalculation(part_concat_df, dendro, first_value)
temporary.append(value_df)
temporary_df = pd.concat(temporary, axis=0)
temporary_df_regex = r'[0-9]+'
temporary_df.name = f'bhd_auto_{re.findall(temporary_df_regex, dendro)[0]} [mm]'
final.append(temporary_df)
# interpolation Column
df = manflags['dbh'].loc[dendro]
df_reindexed = dendroInterpolation(df, dendro)
final.append(df_reindexed * 10)
# Flags Column
flag = temporary_df.copy()
name_regex = r'[0-9]+'
name = f'bhd_{re.findall(name_regex, dendro)[0]}_f'
flag.columns = [name]
flag[name] = 9
flag_name = f'bhd_{re.findall(name_regex, dendro)[0]}_f'
flag.name = flag_name
flag[:] = 9
final.append(flag)
final_df = pd.concat([final_df, temporary_df, df_reindexed * 10, flag], axis=1)
final_df = pd.concat(final, axis=1)
final_df.index.name = 'Date Time'
final_df = final_df.loc[start_date:end_date]
return final_df
@@ -124,28 +130,17 @@ def writeData(data, device):
fname = Path(
f"{ROOT}/HohesHolz/derived/dendrometer/",
f"{device.station_key}_bhd_trees_10min.level2.csv")
try:
old = pd.read_csv(fname, index_col=0, parse_dates=True)
resulting_list = list(old.columns)
resulting_list.extend(x for x in data.columns if x not in resulting_list)
new = data.combine_first(old)[resulting_list]
# corrections for unset flags
for a, b, c in grouper(3, new):
new[a] = new[a].replace(np.nan, -9999)
new[b] = new[b].replace(np.nan, -9999)
new[c] = new[c].replace(np.nan, 92)
new.to_csv(fname)
except FileNotFoundError:
data.to_csv(fname)
data.to_csv(fname)
for time, values in data.groupby(by=pd.Grouper(freq='AS', closed="right")):
values.to_csv(f"{ROOT}/{device.station_path}/derived/dendrometer/{device.station_key}_bhd_trees_10min.level2_{time.year}.csv")
def procDevice(logger, device, **kwargs):
logger.info(f"processing: {device}")
logger.debug("reading data")
df = device.getL1Data(
df = device.getL2Data(
start_date=device.start_date,
end_date=device.end_date,
reindex=True, fill=True)
@@ -156,6 +151,9 @@ def procDevice(logger, device, **kwargs):
logger.debug("processing")
data, flags = splitTable(df, index_level="varnames")
data = filter(data, flags)
# in case an error in the pipeline has produced a level2 dataframe containing only -9999, the device is skipped here
if data.isna().values.all():
return
manflags.index = manflags.index.str.split(' ').str[0]
manflags = manflags.set_index('end', append=True)
manflags = manflags.drop(['start', 'flag', 'comment'], axis=1)
@@ -178,18 +176,16 @@ def main(station, device, start_date, end_date, debug):
with exceptionLogged(logger, f"{device}: failed", fail=debug):
final_out_list.append(procDevice(logger, device))
logger.debug("writing")
final = pd.concat(final_out_list, axis=1)
final.index.name = 'Date Time'
logger.debug("writing")
final = pd.concat(final_out_list, axis=1)
final.index.name = 'Date Time'
# corrections to index order and unset flags
final = final.sort_index()
for a, b, c in grouper(3, final):
final[a] = final[a].replace(np.nan, -9999)
final[b] = final[b].replace(np.nan, -9999)
final[c] = final[c].replace(np.nan, 92)
# corrections to index order and unset flags
final = final.sort_index()
for a, b, c in grouper(final):
final = final.fillna({a: NODATA, b: NODATA, c: 92})
writeData(final, device)
writeData(final, device)
if __name__ == "__main__":
@@ -197,4 +193,4 @@ if __name__ == "__main__":
pd.options.mode.chained_assignment = None
args = parseArguments("Calculate dendro data", {"ndays": 1})
main(args.station, args.logger, args.start_date, args.end_date, args.debug)
main(args.station, args.device, args.start_date, args.end_date, args.debug)
Loading