Commit 2848d66a authored by Martin Schrön

Improved neutron data functions for cleaning (wip)

parent 052f5fe5
......@@ -46,7 +46,8 @@ def safe_cast(var, to_type, default=None):
def col_is_unique(s):
"""
Check whether a pandas column has all identical values.
Usage:
if col_is_unique(data.lon): data = data.iloc[0]
"""
import numpy
if isinstance(s, numpy.ndarray):
......@@ -60,7 +61,7 @@ def col_is_unique(s):
if isinstance(s, pandas.Series):
a = s.to_numpy()
else:
print('Err','col_is_unique: variable is none of ndarray, pandas Series, or shapely Point.')
# Boolean array
b = a[0] == a
......@@ -70,9 +71,10 @@ def col_is_unique(s):
def str2dt(s, tz=None):
"""
Parse a date-time string into a datetime object.
Usage:
str2dt('2021-12-24', tz='UTC')
= datetime object
"""
dto = dateutil.parser.parse(s) # uses dateutil
if tz is None:
......@@ -120,11 +122,12 @@ def clean_interval_str(a):
def aggregate(data, aggstr, func='mean', set_std=[], adapt_err=[], min_count=None, verbose=False):
"""
Aggregate the whole data set over a given time window.
Adds standard deviation columns for selected columns.
Adapts the error by 1/sqrt(n) for selected columns.
Usage:
aggregate(data, '1H')
= pandas DataFrame
"""
# Clean aggregation window string
......@@ -141,7 +144,7 @@ def aggregate(data, aggstr, func='mean', set_std=[], adapt_err=[], min_count=Non
elif func == 'median': data = aggobj.median()
elif func == 'sum': data = aggobj.sum(min_count=min_count)
else:
print('Err','Aggregation method unknown: ' + func)
# record time resolution
tres_after = time_resolution(data)
......@@ -191,48 +194,82 @@ def moving_avg(D, column, window=1, err=True, std=True, center=True):
return(D)
def merge_datasets(data_1, data_2, name_1='', name_2=''):
"""
Merge columns of two DataFrames by index
"""
with Progress(' Merging dataset %6s into %6s ' % (name_2, name_1)) as Pr:
data = pandas.merge_asof(data_1, data_2, right_index=True, left_index=True)
Pr.update('(%5d lines x %2d columns)' % (len(data),len(data.columns)))
return(data)
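# Example (hypothetical frame names): merge GPS records onto the sensor index
# using a nearest-previous-timestamp join (pandas.merge_asof).
#   data = merge_datasets(sensor_df, gps_df, name_1='input', name_2='GPS')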
"""
cut_period(df, '2019-03-14', '2020-03-14')
cut_period(df, datetime, datetime)
cut_period(df, shorter_df)
= df
Cut period to that of another data frame.
"""
def cut_period_to(D, a, b=None):
"""
Cut the data to a given period, or to the period of another data frame.
Usage:
cut_period(df, '2019-03-14', '2020-03-14')
cut_period(df, datetime, datetime)
cut_period(df, shorter_df)
= df
"""
with Progress('Cutting period') as Pr:
if a is None or (isinstance(a, str) and a.strip() == ''):
a = D.index.min()
if b is None or (isinstance(b, str) and b.strip() == ''):
b = D.index.max()
# Cut to other DataFrame
if isinstance(a, pandas.DataFrame):
# read both bounds before overwriting a
b = a.index.max()
a = a.index.min()
# Strings to parse
elif isinstance(a, str):
if a.strip() == 'today':
a = datetime.now().strftime('%Y-%m-%d')
b = D.index.max().strftime('%Y-%m-%d %H:%M')
a = str2dt(a, 'UTC')
if isinstance(b, str):
b = str2dt(b, 'UTC')
# Datetimes
Pr.update('(%s to %s)' % (a,b))
return(D.loc[a:b])
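# Example (hypothetical frame names): cut to explicit dates, then to the
# period covered by a shorter reference frame.
#   data = cut_period_to(data, '2019-03-01', '2019-03-31')
#   data = cut_period_to(data, reference_df)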
def new_column(data, copy_from=None, fill=np.nan):
"""
Create new column from a copy of another column.
If the source column is missing or invalid, fill the new column with `fill` (default: np.nan).
Example #1:
data['lat'] = new_column(data, copy_from='Latitude')
# Falls back to NaN values if Latitude is not in data
Example #2:
data['pressure'] = new_column(data, copy_from='p4', fill=np.nan)
# Creates a new column filled with np.nan even if no p4 exists
"""
if copy_from:
if copy_from in data.columns:
return(data[copy_from].copy())
else:
print('Warn','Cannot find column "%s" in the dataset.' % copy_from)
"""
Performance(data[[a,b]]
= [kge, nse, rmse, correl, var, bias]
Evaluation of two columns of a df
"""
def Performance(data, title='Test', writeout=True):
return(fill)
def Performance(data, title='Test', writeout=True):
"""
Evaluate two columns of a data frame.
Usage:
Performance(data[[a, b]])
= [kge, nse, rmse, correl, var, bias]
"""
from hydroeval import evaluator, kge, rmse, nse
columns = data.columns
......@@ -256,7 +293,7 @@ def optimize(df, opt, cmp, span=(700,1100), func='kge'):
elif func=='rmse':
fi = 2
best = 100000
else: print('Err','Unknown function. Choose kge, nse, or rmse.')
best_i = 0
for i in np.arange(span[0],span[1]):
......@@ -287,8 +324,21 @@ def xsplit(s, range=0, type=str):
Split a string by , and ignore spaces,
convert to certain data type,
optionally generate an array of length `range`.
Example:
coords = xsplit('51.5, 12.1, 120', range=3, type=float)
"""
if range is None or range == []:
range = 0
if isinstance(type, str):
typedict = {'str':str,'float':float,'int':int,'bool':bool}
type = typedict[type]
if isinstance(s, str):
s = s.replace(',',' ')
a = np.array(s.split(), dtype=type)
......
......@@ -51,32 +51,126 @@ class CRNS():
@P.setter
def P(self, v): self.data[self.alias['P']] = v
def name_err(x):
if isinstance(x, str):
return(x+'_err')
elif isinstance(x, pandas.Series):
return(x.name+'_err')
def neutron_uncertainty(data, resolution=None, units='interval'):
"""
Return neutron error for raw data.
Special case if data is already averaged in units of cph.
"""
#if special_units == 'cps':
if units == 'cps':
# print('Err',' Special cps units for neutrons requires resolution data.')
# return(np.nan)
#else:
# Assume data is already averaged
r = np.sqrt(data)/np.sqrt(resolution)
else:
r = np.sqrt(data)
return(r)
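# Illustration (hypothetical numbers): a mean rate of 25 cps over a 20 s interval
# corresponds to 500 raw counts, so both branches agree:
#   neutron_uncertainty(25, resolution=20, units='cps')  # sqrt(25)/sqrt(20) ~= 1.12 cps
#   neutron_uncertainty(500) / 20                         # sqrt(500)/20     ~= 1.12 cps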
def clean_neutrons_list(data, tube_list, basename='N', uncertainty=None, resolution=None, units_from=None, units_to=None, range=None, n_sigma=None):
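"""
Apply clean_neutrons() to every tube column listed in `tube_list` and add
the combined columns <basename>_c, <basename>_c_err, and <basename>_raw
to the data frame.
"""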
N = []
N_err = []
N_raw = []
for tube in tube_list:
if tube in data.columns:
#N_tube, N_tube_err
data[tube+'_c'], data[tube+'_c_err'], data[tube+'_cph'] = clean_neutrons(data[tube],
resolution=resolution, units_from=units_from, units_to=units_to,
range=range, n_sigma=n_sigma)
print(data[[tube+'_c',tube+'_c_err',tube+'_cph']])
N.append(data[tube+'_c'])
N_err.append(data[tube+'_c_err'])
N_raw.append(data[tube+'_cph'])
# element-wise sum over tubes (axis=0 keeps the time dimension)
data[basename+'_c'] = np.sum(N, axis=0)
data[basename+'_c_err'] = np.sqrt(np.sum(np.square(N_err), axis=0))
data[basename+'_raw'] = np.sum(N_raw, axis=0)
return(data)
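# Example (hypothetical tube names and settings): clean three tubes and
# combine them into N_c, N_c_err and N_raw.
#   data = clean_neutrons_list(data, ['N1', 'N2', 'N3'], basename='N',
#                              resolution=data['resolution'], units_from='cps',
#                              units_to='cph', range=(180, 32000), n_sigma=3)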
def sum_neutrons(data):
return( data.sum(axis=1) )
def sum_neutrons_err(data):
return( np.sqrt(data.pow(2).sum(axis=1)) )
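# Example (hypothetical columns): per-tube errors are combined in quadrature,
# e.g. tubes with errors of 3 and 4 cph yield a total error of 5 cph.
#   data['N_cph_err'] = sum_neutrons_err(data[['N1_cph_err', 'N2_cph_err']])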
def clean_neutrons(N, uncertainty=None, resolution=None, units_from=None, units_to=None, range=None, n_sigma=None):
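"""
Clean a single neutron tube series: harmonize units to counts per interval,
estimate the stochastic uncertainty, convert both to the target units,
and flag outliers. Returns (N_cleaned, N_err, N_converted).
"""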
name = N.name
# Make sure units are in counts per measurement interval
N = convert_units(N, units_from=units_from, units_to='interval', resolution=resolution)
# Get the stochastic uncertainty of N
N_err = neutron_uncertainty(N) #, data['resolution'], interval=C.get('clean','neutrons_unit'))
# Convert to cph
N = convert_units(N, units_from='interval', units_to=units_to, resolution=resolution)
N_err = convert_units(N_err, units_from='interval', units_to=units_to, resolution=resolution)
# Find outlier
N.name = name
N_o = find_outliers(N, lower=range[0], upper=range[1], n_sigma=n_sigma, sigma=N_err)
return(N_o, N_err, N)
#####################
# Cleaning #
#####################
"""
def infer_tres(data, default=None, in_sec=False):
"""
Infer time resolution
"""
def infer_tres(data, default=None):
if default:
return(int(default))
else:
"""
with Progress('Infer time resolution') as Pr:
r = data.index.to_series().diff()
r_avg = r.median().total_seconds()
Pr.update('median: %.0f sec' % r_avg)
if default:
r = default
Pr.update('using default: %.0f sec' % r)
elif in_sec:
r = r_avg
return(r)
# infer from timestamp difference
try:
return(data.index.to_series().diff()) #.median().seconds
except:
# set 60 sec as the fallback time resolution
return(60)
#try:
#except:
# # set 60 sec as the fallback time resolution
# return(60)
"""
def Unit_convert(counts, units_from=None, units_to=None, resolution=None):
"""
Unit_convert(data[N], data[tres], 'cph')
"""
def Unit_convert(counts, unit1=None, unit2=None):
if unit2 == 'cph':
factor = 3600
if isinstance(unit1, pandas.Series):
return( counts / unit1 * factor )
"""
# to cps
if units_from == 'cps':
if units_to == 'interval': factor = resolution
elif units_to == 'cps': factor = 1
elif units_to == 'cpm': factor = 60
elif units_to == 'cph': factor = 3600
elif units_to == 'cpd': factor = 86400
else: factor = np.nan
return( counts * factor )
elif units_to == 'interval':
return(counts)
else:
return(convert_units(counts/resolution, units_from='cps', units_to=units_to, resolution=resolution))
convert_units = Unit_convert
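# Example (hypothetical numbers): 25 cps logged over a 20 s interval.
#   convert_units(25, units_from='cps', units_to='interval', resolution=20)  # 500 counts
#   convert_units(25, units_from='cps', units_to='cph')                      # 90000 cph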
def get_nans(data):
return(data.isnull().sum().sum())
......@@ -111,13 +205,50 @@ def find_outlier(data_orig, column, target=None, lower=None, upper=None, n_sigma
if n_outliers > 0:
if action == 'drop':
data = data.dropna(subset=[target])
print('i','Dropped %4d invalid values in "%s"' % (n_outliers, column))
else:
print('i','Found %4d invalid values in "%s"' % (n_outliers, column))
return(data[target])
def find_outliers(data, lower=None, upper=None, n_sigma=None, sigma=None, fill=None, action=None):
"""
Find outliers in a single pandas Series: values outside [lower, upper] and
jumps larger than n_sigma * sigma are replaced by `fill` (default: NaN).
"""
with Progress(' Outliers in %s (%.0f<x<%.0f, dx<%.1f sigma)' % (data.name, lower, upper, n_sigma)) as Pr:
if fill is None:
fill = np.nan
#data = data_orig.copy()
data = data.astype(float)
before = get_nans(data)
if not lower is None:
data[data < lower] = fill
if not upper is None:
data[data > upper] = fill
if n_sigma:
#temp = '__temp'
#diff = '__diff'
_max_dev = n_sigma * sigma
_abs_dev = np.abs(data.diff()) /2
data[_abs_dev > _max_dev] = fill
n_outliers = get_nans(data) - before
if n_outliers > 0:
if action == 'drop':
data = data.dropna()
Pr.update('%4s dropped' % n_outliers)
#print('i','Dropped %4d invalid values in "%s"' % (n_outliers, column))
else:
Pr.update('%4s replaced by %s' % (n_outliers, fill))
#print('i','Found %4d invalid values in "%s"' % (n_outliers, column))
return(data)
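# Example (hypothetical columns and limits): flag count rates outside
# 180..32000 cph or jumps larger than 3 sigma.
#   data['N1_c'] = find_outliers(data['N1_cph'], lower=180, upper=32000,
#                                n_sigma=3, sigma=data['N1_cph_err'])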
def fill_gaps(data, column, var='pressure', fill=None, fill_type=float,
n_stations=4, lon=None, lat=None, alt=None, T=None,
local_storage_name='', verbose=True):
......
......@@ -3,8 +3,8 @@
title = CRNS Styx Test Campaign
subtitle = operated by Helmholtz Centre for Environmental Research -- UFZ, Leipzig
contact = Martin Schrön
cover =
coords =
[update]
# Source where to check for new data
......@@ -16,9 +16,9 @@ FTP_user =
FTP_pswd =
FTP_prefix =
FTP_suffix =
SD_path =
SD_prefix =
SD_suffix =
[input]
# Folder containing the input files
......@@ -26,7 +26,7 @@ path = input/rover-styx.zip
# File pattern:
# Select all input files that start with prefix and end with suffix
prefix = ENV
suffix =
# Identify column names
## Define columns comma separated, leave blank for automatic header detection
......@@ -39,7 +39,7 @@ skip_lines = 0
# Separate GPS file?
separate_gps = yes
gps_prefix = GPS
gps_suffix =
gps_input_columns = SCS,Time,NSats, lat, lon, alt, Date, Time1, Datetime
gps_index_column_number = 6, 7
gps_timestamp = no
......@@ -49,7 +49,7 @@ gps_skip_lines = 0
neutron_columns = N0, N1, N3, N4, N5, N6, N7
thermal_neutron_columns =
## Temporal resolution (one column)
resolution_column =
resolution_default = 20
## Data column for air pressure
pressure_column = p
......@@ -103,19 +103,20 @@ invalid_data = replace by NaN
# - constant number
# - DWD (for nearest DWD station)
# Cite as: DWD Climate Data Center (CDC): Aktuelle 10-minütige Stationsmessungen des Luftdrucks, der Lufttemperatur (in 5cm und 2m Höhe), der Luftfeuchte und des Taupunkts in Deutschland, Version recent, 2019.
missing_pressure =
missing_humidity =
missing_temperature =
# Range of corrected neutron counts in cph
neutron_range = 100, 25000
neutron_range_per_tube = 180, 32000
#0.05, 1.1
# Treat every change that is bigger than n_sigma * sqrt(N) as outlier
neutron_n_sigma = 3
# Range for other variables
pressure_range = 1, 1100
humidity_range = 0, 100
temperature_range = -60, 60
timeres_range =
# soil moisture range, applied after conversion
sm_range = 0, 0.8
# interpolate of NaN data
......@@ -215,7 +216,7 @@ rainfall = 0
# Road effect
# Schroen et al. (2018)
# leave empty for no correction
road_method =
# Lookup table of road types and its attributes
road_type_table = input/road_type_correction.txt
# default values when no road data is available
......
......@@ -21,16 +21,18 @@ def main(configfile=None):
# Update data (FTP, SD, ...)
if 'update' in config:
data_source = csplit(config['update']['data_source'])
if 'SD' in data_source or 'FTP' in data_source:
print('Update')
for source in data_source:
print('Proc','Updating data from ' + source + ' into ' + config['input']['path'])
download(
archive_file = config['input']['path'], source = source,
server = config['update']['FTP_server'], user = config['update']['FTP_user'],
pswd = config['update']['FTP_pswd'], ftp_prefix = config['update']['FTP_prefix'],
ftp_suffix = config['update']['FTP_suffix'], sd_path = config['update']['SD_path'],
sd_prefix = config['update']['SD_prefix'], sd_suffix = config['update']['SD_suffix'],
update_all = yesno2bool(gc(config,'update','update_all','no')))
coordinates = xsplit(gc(config,'info','coords'), range=3, type=float)
......@@ -71,16 +73,14 @@ def main(configfile=None):
if not check_pandas(gps_data): exit()
data = merge_datasets(data, gps_data, 'input', 'GPS')
if not check_pandas(data): exit()
# Cut period
data = cut_period_to(data, C.get('clean','start'), C.get('clean','end'))
if not check_pandas(data): exit()
print('i','Data period from', data.index.min().strftime('%Y-%m-%d %H:%M:%S'),
'to', data.index.max().strftime('%Y-%m-%d %H:%M:%S'))
####################################################
###### Basic cleaning ############################
......@@ -90,14 +90,15 @@ def main(configfile=None):
bbox = C.list('clean','bbox', types='float')
lat = gc(config, 'input', 'latitude_column')
lon = gc(config, 'input', 'longitude_column')
alt = gc(config, 'input', 'altitude_column')
p = gc(config, 'input', 'pressure_column')
rh = gc(config, 'input', 'humidity_column')
T = gc(config, 'input', 'temperature_column')
ah = gc(config, 'correction', 'new_humidity_column')
inc = gc(config, 'correction', 'new_incoming_column')
# Set new data columns as copy of old ones
#data['Lat'] = new_column(data, copy_from=C.get('input','latitude_column'), dispense=True)
#data['Lon'] = new_column(data, copy_from=C.get('input','longitude_column'), dispense=True)
#data['Alt'] = new_column(data, copy_from=C.get('input','altitude_column'), dispense=True)
data['Pres'] = new_column(data, copy_from=C.get('input','pressure_column'))
data['rHum'] = new_column(data, copy_from=C.get('input','humidity_column'))
data['Temp'] = new_column(data, copy_from=C.get('input','temperature_column'))
#ah = gc(config, 'correction', 'new_humidity_column')
#inc = gc(config, 'correction', 'new_incoming_column')
# Thermal neutrons
th = '_thermal'
......@@ -118,54 +119,36 @@ def main(configfile=None):
# Road
road = '_road'
data['resolution'] = new_column(data, copy_from=C.get('input', 'resolution_column')) #, alt='N1ET_sec'))
if data['resolution'].isnull().all():
data['resolution'] = infer_tres(data, default=C.int('input', 'resolution_default'), in_sec=True)
# TODO: make diagnostic plot for resolution versus resolution column
## Sum up neutron columns
N = gc(config, 'correction', 'new_neutron_column')
data[N] = 0
### Clean neutrons
neutron_columns = C.list('input','neutron_columns')
for tube in neutron_columns:
# Stochastic Uncertainty
data[tube+'_err'] = neutron_uncertainty(data[tube], data['resolution'], units=C.get('clean','neutrons_unit'))
# Convert to cph
data[tube+'_cph'] = convert_units(data[tube], units_from=C.get('clean','neutrons_unit'), units_to='cph', resolution=data['resolution'])
data[tube+'_cph_err'] = convert_units(data[tube+'_err'], units_from=C.get('clean','neutrons_unit'), units_to='cph', resolution=data['resolution'])
# Filter for outlier
neutron_range_per_tube = C.list('clean','neutron_range_per_tube', range=2, types=float)
data[tube+'_c'] = find_outliers(data[tube+'_cph'], lower=neutron_range_per_tube[0], upper=neutron_range_per_tube[1],