diff --git a/saqc/funcs/harm_functions.py b/saqc/funcs/harm_functions.py
index 42ef0200c3210e4f7e34b83857876583a2f19e8a..b58070ec88d5a037654a3f198c2949e4593dc0b9 100644
--- a/saqc/funcs/harm_functions.py
+++ b/saqc/funcs/harm_functions.py
@@ -9,6 +9,7 @@ import dios
 from saqc.funcs.functions import flagMissing
 from saqc.funcs.register import register
 from saqc.lib.tools import toSequence, getFuncFromInput
+from saqc.lib.ts_operators import interpolateNANs
 
 logger = logging.getLogger("SaQC")
 
@@ -362,8 +363,9 @@ def _interpolateGrid(
         spec_case_mask = spec_case_mask.tshift(-1, freq)
 
     data = _insertGrid(data, freq)
-    data, chunk_bounds = _interpolate(
+    data, chunk_bounds = interpolateNANs(
         data, method, order=order, inter_limit=2, downcast_interpolation=downcast_interpolation,
+        return_chunk_bounds=True
     )
 
     # exclude falsely interpolated values:
@@ -382,78 +384,6 @@ def _interpolateGrid(
     return data, chunk_bounds
 
 
-def _interpolate(data, method, order=2, inter_limit=2, downcast_interpolation=False):
-    """
-    The function interpolates nan-values (and nan-grids) in timeseries data. It can be passed all the method keywords
-    from the pd.Series.interpolate method and will than apply this very methods. Note, that the inter_limit keyword
-    really restricts the interpolation to chunks, not containing more than "inter_limit" nan entries
-    (thereby opposing the limit keyword of pd.Series.interpolate).
-
-    :param data:                      pd.Series. The data series to be interpolated
-    :param method:                    String. Method keyword designating interpolation method to use.
-    :param order:                     Integer. If your desired interpolation method needs an order to be passed -
-                                      here you pass it.
-    :param inter_limit:               Integer. Default = 2. Limit up to wich nan - gaps in the data get interpolated.
-                                      Its default value suits an interpolation that only will apply on an inserted
-                                      frequency grid.
-    :param downcast_interpolation:    Boolean. Default False. If True:
-                                      If a data chunk not contains enough values for interpolation of the order "order",
-                                      the highest order possible will be selected for that chunks interpolation."
-    :return:
-    """
-
-    gap_mask = (data.rolling(inter_limit, min_periods=0).apply(lambda x: np.sum(np.isnan(x)), raw=True)) != inter_limit
-
-    if inter_limit == 2:
-        gap_mask = gap_mask & gap_mask.shift(-1, fill_value=True)
-    else:
-        gap_mask = (
-            gap_mask.replace(True, np.nan).fillna(method="bfill", limit=inter_limit).replace(np.nan, True).astype(bool)
-        )
-    # start end ending points of interpolation chunks have to be memorized to block their flagging:
-    chunk_switches = gap_mask.astype(int).diff()
-    chunk_starts = chunk_switches[chunk_switches == -1].index
-    chunk_ends = chunk_switches[(chunk_switches.shift(-1) == 1)].index
-    chunk_bounds = chunk_starts.join(chunk_ends, how="outer", sort=True)
-
-    data = data[gap_mask]
-
-    if method in ["linear", "time"]:
-
-        data.interpolate(method=method, inplace=True, limit=1, limit_area="inside")
-
-    else:
-        dat_name = data.name
-        gap_mask = (~gap_mask).cumsum()
-        data = pd.merge(gap_mask, data, how="inner", left_index=True, right_index=True)
-
-        def _interpolWrapper(x, wrap_order=order, wrap_method=method):
-            if x.count() > wrap_order:
-                try:
-                    return x.interpolate(method=wrap_method, order=int(wrap_order))
-                except (NotImplementedError, ValueError):
-                    logger.warning(
-                        "Interpolation with method {} is not supported at order {}. "
-                        "Interpolation will be performed with order {}".format(
-                            method, str(wrap_order), str(wrap_order - 1)
-                        )
-                    )
-                    return _interpolWrapper(x, int(wrap_order - 1), wrap_method)
-            elif x.size < 3:
-                return x
-            else:
-                if downcast_interpolation:
-                    return _interpolWrapper(x, int(x.count() - 1), wrap_method)
-                else:
-                    return x
-
-        data = data.groupby(data.columns[0]).transform(_interpolWrapper)
-        # squeezing the 1-dimensional frame resulting from groupby for consistency reasons
-        data = data.squeeze(axis=1)
-        data.name = dat_name
-        return data, chunk_bounds
-
-
 def _reshapeFlags(
     flagger,
     field,
diff --git a/saqc/lib/ts_operators.py b/saqc/lib/ts_operators.py
index 2683f39b2d1d11c5a7798673460b40657e5e8862..3d86af54e3d1c03ed4244f5a3290892a16fe688c 100644
--- a/saqc/lib/ts_operators.py
+++ b/saqc/lib/ts_operators.py
@@ -7,23 +7,6 @@ from sklearn.neighbors import NearestNeighbors
 from scipy.stats import iqr
 
 
-def _isValid(data, max_nan_total, max_nan_consec):
-    if (max_nan_total is np.inf) & (max_nan_consec is np.inf):
-        return True
-
-    nan_mask = data.isna()
-
-    if nan_mask.sum() <= max_nan_total:
-        if max_nan_consec is np.inf:
-            return True
-        elif nan_mask.rolling(window=max_nan_consec + 1).sum().max() <= max_nan_consec:
-            return True
-        else:
-            return False
-    else:
-        return False
-
-
 # ts_transformations
 def identity(ts):
     return ts
@@ -85,12 +68,6 @@ def nBallClustering(in_arr, ball_radius=None):
     return exemplars, members, ex_indices
 
 
-def kNN(in_arr, n_neighbors, algorithm="ball_tree"):
-    nbrs = NearestNeighbors(n_neighbors=n_neighbors, algorithm=algorithm).fit(in_arr)
-    return nbrs.kneighbors()
-
-
-
 def standardizeByMean(ts):
     return (ts - ts.mean())/ts.std()
 
@@ -99,8 +76,13 @@ def standardizeByMedian(ts):
     return (ts - ts.median())/iqr(ts, nan_policy='omit')
 
 
+def _kNN(in_arr, n_neighbors, algorithm="ball_tree"):
+    nbrs = NearestNeighbors(n_neighbors=n_neighbors, algorithm=algorithm).fit(in_arr)
+    return nbrs.kneighbors()
+
+
 def kNNMaxGap(in_arr, n_neighbors, algorithm='ball_tree'):
-    dist, *_ = kNN(in_arr, n_neighbors, algorithm=algorithm)
+    dist, *_ = _kNN(in_arr, n_neighbors, algorithm=algorithm)
     sample_size = dist.shape[0]
     to_gap = np.append(np.array([[0] * sample_size]).T, dist, axis=1)
     max_gap_ind = np.diff(to_gap, axis=1).argmax(axis=1)
@@ -108,10 +90,27 @@ def kNNMaxGap(in_arr, n_neighbors, algorithm='ball_tree'):
 
 
 def kNNSum(in_arr, n_neighbors, algorithm="ball_tree"):
-    dist, *_ = kNN(in_arr, n_neighbors, algorithm=algorithm)
+    dist, *_ = _kNN(in_arr, n_neighbors, algorithm=algorithm)
     return dist.sum(axis=1)
 
 
+def _isValid(data, max_nan_total, max_nan_consec):
+    if (max_nan_total is np.inf) & (max_nan_consec is np.inf):
+        return True
+
+    nan_mask = data.isna()
+
+    if nan_mask.sum() <= max_nan_total:
+        if max_nan_consec is np.inf:
+            return True
+        elif nan_mask.rolling(window=max_nan_consec + 1).sum().max() <= max_nan_consec:
+            return True
+        else:
+            return False
+    else:
+        return False
+
+
 def stdQC(data, max_nan_total=np.inf, max_nan_consec=np.inf):
     """Pandas built in function for statistical moments have quite poor nan- control, so here comes a wrapper that
     will return the standart deviation for a given series input, if the total number of nans in the series does
@@ -152,3 +151,87 @@ def meanQC(data, max_nan_total=np.inf, max_nan_consec=np.inf):
     if _isValid(data, max_nan_total, max_nan_consec):
         return data.mean()
     return np.nan
+
+
+def interpolateNANs(data, method, order=2, inter_limit=2, downcast_interpolation=False, return_chunk_bounds=False):
+    """
+    The function interpolates nan-values (and nan-grids) in timeseries data. It can be passed all the method keywords
+    from the pd.Series.interpolate method and will then apply that very method. Note that the inter_limit keyword
+    really restricts the interpolation to chunks not containing more than "inter_limit" nan entries
+    (thereby opposing the limit keyword of pd.Series.interpolate).
+
+    :param data:                      pd.Series. The data series to be interpolated.
+    :param method:                    String. Method keyword designating the interpolation method to use.
+    :param order:                     Integer. If your desired interpolation method needs an order to be passed -
+                                      here you pass it.
+    :param inter_limit:               Integer. Default = 2. Limit up to which nan-gaps in the data get interpolated.
+                                      Its default value suits an interpolation that is only applied to an inserted
+                                      frequency grid (regularization by interpolation).
+    :param downcast_interpolation:    Boolean. Default False. If True:
+                                      If a data chunk does not contain enough values for an interpolation of order
+                                      "order", the highest possible order is selected for that chunk's interpolation.
+    :param return_chunk_bounds:       Boolean. Default False. If True:
+                                      In addition to the interpolated data, the start and end points of the data
+                                      chunks not containing any nan series longer than "inter_limit" are calculated
+                                      and returned.
+                                      (This option mainly fits the use of this function in the context of saqc
+                                      harmonization.)
+
+    :return:
+    """
+
+    gap_mask = (data.rolling(inter_limit, min_periods=0).apply(lambda x: np.sum(np.isnan(x)), raw=True)) != inter_limit
+
+    if inter_limit == 2:
+        gap_mask = gap_mask & gap_mask.shift(-1, fill_value=True)
+    else:
+        gap_mask = (
+            gap_mask.replace(True, np.nan).fillna(method="bfill", limit=inter_limit).replace(np.nan, True).astype(bool)
+        )
+
+    if return_chunk_bounds:
+        # start and end points of interpolation chunks have to be memorized to block their flagging:
+        chunk_switches = gap_mask.astype(int).diff()
+        chunk_starts = chunk_switches[chunk_switches == -1].index
+        chunk_ends = chunk_switches[(chunk_switches.shift(-1) == 1)].index
+        chunk_bounds = chunk_starts.join(chunk_ends, how="outer", sort=True)
+
+    data = data[gap_mask]
+
+    if method in ["linear", "time"]:
+
+        data.interpolate(method=method, inplace=True, limit=1, limit_area="inside")
+
+    else:
+        dat_name = data.name
+        gap_mask = (~gap_mask).cumsum()
+        data = pd.merge(gap_mask, data, how="inner", left_index=True, right_index=True)
+
+        def _interpolWrapper(x, wrap_order=order, wrap_method=method):
+            if x.count() > wrap_order:
+                try:
+                    return x.interpolate(method=wrap_method, order=int(wrap_order))
+                except (NotImplementedError, ValueError):
+                    logger.warning(
+                        "Interpolation with method {} is not supported at order {}. "
+                        "Interpolation will be performed with order {}".format(
+                            method, str(wrap_order), str(wrap_order - 1)
+                        )
+                    )
+                    return _interpolWrapper(x, int(wrap_order - 1), wrap_method)
+            elif x.size < 3:
+                return x
+            else:
+                if downcast_interpolation:
+                    return _interpolWrapper(x, int(x.count() - 1), wrap_method)
+                else:
+                    return x
+
+        data = data.groupby(data.columns[0]).transform(_interpolWrapper)
+        # squeezing the 1-dimensional frame resulting from groupby for consistency reasons
+        data = data.squeeze(axis=1)
+        data.name = dat_name
+    if return_chunk_bounds:
+        return data, chunk_bounds
+    else:
+        return data
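
Usage sketch (not part of the patch): the snippet below assumes the patch above is applied and that saqc is importable; the sample series is made up for illustration. It exercises the inter_limit and return_chunk_bounds keywords of the relocated interpolateNANs: runs of inter_limit or more consecutive nan values are excluded from the interpolation rather than filled, and chunk_bounds reports the start and end index of each excluded run, which _interpolateGrid uses to mask falsely interpolated values.

    import numpy as np
    import pandas as pd

    from saqc.lib.ts_operators import interpolateNANs

    idx = pd.date_range("2020-01-01", periods=8, freq="10min")
    series = pd.Series([1.0, np.nan, 3.0, np.nan, np.nan, 6.0, 7.0, np.nan], index=idx)

    # With inter_limit=2, the isolated nan at 00:10 is interpolated, while the
    # two-nan run (00:30, 00:40) is dropped from the result instead of filled;
    # chunk_bounds then holds the first and last index of that excluded run.
    filled, chunk_bounds = interpolateNANs(
        series, "time", inter_limit=2, return_chunk_bounds=True
    )
    print(filled)
    print(chunk_bounds)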
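
A similarly small sketch for the renamed kNN helper (hypothetical data; again not part of the patch): kNNSum and kNNMaxGap keep their public names and now delegate to the private _kNN wrapper around sklearn's NearestNeighbors. kNNSum returns, per sample, the summed distance to its n_neighbors nearest neighbours, so isolated samples stand out with large values.

    import numpy as np

    from saqc.lib.ts_operators import kNNSum

    points = np.array([[0.0], [0.1], [0.2], [5.0]])  # one isolated sample at 5.0
    # Distances to the 2 nearest neighbours, summed per sample; the last entry
    # (the isolated sample) is far larger than the three clustered ones.
    print(kNNSum(points, n_neighbors=2))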