Skip to content
Snippets Groups Projects
Commit 26c7fa2f authored by Lennart Schmidt's avatar Lennart Schmidt
Browse files

merge MLinterface

parent 2a8d3b8d
No related branches found
No related tags found
No related merge requests found
*.feather filter=lfs diff=lfs merge=lfs -text
*.pkl filter=lfs diff=lfs merge=lfs -text
......@@ -2,3 +2,4 @@
*.pyc
.idea/
testspace/
.DS_Store
\ No newline at end of file
**/__pycache__
*.pyc
.idea/
testspace/
atomicwrites==1.3.0
attrs==19.1.0
joblib==0.14.0
llvmlite==0.30.0
more-itertools==7.0.0
numba==0.46.0
......@@ -7,9 +8,11 @@ numpy==1.16.2
pandas==0.24.2
pluggy==0.9.0
py==1.8.0
pyarrow==0.15.1
pytest==4.4.0
python-dateutil==2.8.0
python-intervals==1.10.0
pytz==2018.9
scikit-learn==0.21.2
scipy==1.3.0
six==1.12.0
File added
source diff could not be displayed: it is stored in LFS. Options to address this: view the blob.
import pandas as pd
import numpy as np
import random #for random sampling of training/test
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import recall_score, precision_score, classification_report
import joblib # for saving of model objects
###--------------------
### EXAMPLE PARAMETRIZATION:
###--------------------
# pd.options.mode.chained_assignment = None # default='warn'
# data = pd.read_feather("data/sm/02_data.feather")
# data = data.reset_index()#data.index has to be reset as I use row nos only for indexing
#
# ### Reflagging
# index_manual = data.Flag == "Manual"
# data["FlagMan"] = index_manual.astype("int")# True/False as 0 or 1
# index_auto = data.Flag.str.contains("Auto")
# data["flag_bin"] = index_auto.astype("int")# True/False as 0 or 1
#
# field = "Target"
# references = ["Var1","Var2"]
# window_values = 20
# window_flags = 20
# modelname="testmodel"
# #groupvar = 0.2
# path = "saqc/ressources/machine_learning/models/"
# sensor_field="SensorID"
# group_field = "GroupVar"
def trainML(data, field, references, sensor_field:str, group_field:str, window_values:int = 20, window_flags:int = 20, path:str, modelname:str, testratio:float, **kwargs):
"""This Function trains machine-learning models to reproduce manual flags that were
set for a specific variable. Inputs to the model are the timeseries of the
respective target variable at multiple sensors, the automatic flags that were assigned by SaQC as well as multiple reference series.
Internally, context information for each point is gathered in form of moving windows to improve the flagging algorithm. By default, for both the target timeseries and the automatic flags, the
information of the previous and preceeding timestep are gathered. Next, according to user inputs of window_flags and window_values, the number of flags
and the mean gradient of the specified windows is calculated, both for t+windowsize and t-windowsize. The moving window calculations are executed for each sensor, seperately,
and multiple models are trained, one for each level a grouping variable that can be defined by the user. The model objects that can be used for future flagging are stored
along with log-files that store the models`accuracy on training and test.
:param data: The pandas dataframe holding the data of the target variable at multiple sensors in long format, i.e. concatenated row-wise.
Along with this, there should be columns with the respective series of reference variables and a column of quality flags. The latter
should contain both automatic and manual flags.
:param field: Fieldname of the field in data that is to be flagged
:param references: A list of strings, denoting the fieldnames of the data series that should be used as reference variables
:parameters sensor_field: A string denoting the fieldname of unique sensor-IDs
:parameter group_field: A string denoting the fieldname of the grouping variable. For each level of this variable, a seperate model will be trained.
:param window_values: An integer, denoting the window size that is used to derive the gradients of both the field- and reference-series inside the moving window
:param window_flags: An integer, denoting the window size that is used to count the surrounding automatic flags that have been set before
:param path: A string denoting the path to the folder where the model objects along with log-files should be saved to
:param modelname: A string denoting the name of the model. The name is used for naming of the model objects as well as log-files. Naming will
be: 'modelname'_'value of group_field'.pkl
:param testratio A float denoting the ratio of the test- vs. training-set to be drawn from the data, e.g. 0.3
"""
randomseed = 36
### Prepare data, i.e. compute moving windows
print("Computing time-lags")
# save original row index for merging into original dataframe, as NAs will be introduced
data = data.rename(columns={"index":"RowIndex"})
# define Test/Training
data = data.assign(TeTr = "Tr")
# create empty df for training data
traindata = pd.DataFrame()
# calculate windows
for sensor_id in data[sensor_field].unique():
print(sensor_id)
sensordf = data.loc[data[sensor_field]==sensor_id]
index_test = sensordf.RowIndex.sample(n=int(testratio*len(sensordf)), random_state=randomseed)#draw random sample
sensordf.TeTr[index_test] = "Te"#assign test samples
sensordf["flag_bin_t_1"] = sensordf["flag_bin"]-sensordf["flag_bin"].shift(1)# Flag at t-1
sensordf["flag_bin_t1"] = sensordf["flag_bin"]-sensordf["flag_bin"].shift(-1)# Flag at t+1
sensordf["flag_bin_t_"+str(window_flags)] = sensordf["flag_bin"].rolling(window_flags+1,center=False).sum()# n Flags in interval t to t-window_flags
sensordf["flag_bin_t"+str(window_flags)] = sensordf["flag_bin"].iloc[::-1].rolling(window_flags+1,center=False).sum()[::-1]# n Flags in interval t to t+window_flags
# forward-orientation not possible, so right-orientation on reversed data an reverse result
# Add context information for field+references
for i in [field]+references:
sensordf = pd.concat([sensordf,refCalc(reference=sensordf[i],window_values=window_values)],axis=1)
# write back into new dataframe
traindata = traindata.append(sensordf)
# remove rows that contain NAs (new ones occured during predictor calculation)
traindata = traindata.dropna(axis=0,how="any")
################
### FIT Model
################
n_cores = os.getenv('NSLOTS', 1)
print("MODEL TRAINING ON "+str(n_cores)+" CORES")
# make column in "traindata" to store predictions
traindata = traindata.assign(PredMan=0)
outinfo_df = []
resultfile = open(os.path.join(path,modelname+"_resultfile.txt"),"w")
starttime = time.time()
# For each category of groupvar, fit a separate model
for groupvar in traindata[group_field].unique():
resultfile.write("GROUPVAR: " + str(groupvar)+"\n")
print("GROUPVAR: " + str(groupvar))
print("TRAINING MODEL...")
# drop unneeded columns
groupdata = traindata[traindata[group_field]==groupvar].drop(columns=["Time","RowIndex","Flag","flag_bin","PredMan",group_field, sensor_field])
forest = RandomForestClassifier(n_estimators=500, random_state=randomseed,oob_score=True,n_jobs=-1)
X_tr = groupdata.drop(columns=["TeTr","FlagMan"])[groupdata.TeTr=="Tr"]
Y_tr = groupdata.FlagMan[groupdata.TeTr=="Tr"]
forest.fit(y=Y_tr, X=X_tr)
# save model object
joblib.dump(forest, os.path.join(path,modelname+"_"+str(groupvar)+".pkl"))
# retrieve training predictions
print("PREDICTING...")
preds_tr = forest.oob_decision_function_[:,1]>forest.oob_decision_function_[:,0]#training, derive from OOB class votes
preds_tr = preds_tr.astype("int")
# get test predictions
X_te = groupdata.drop(columns=["TeTr","FlagMan"])[groupdata.TeTr=="Te"]
Y_te = groupdata.FlagMan[groupdata.TeTr=="Te"]
preds_te = forest.predict(X_te)#test
# Collect info on model run (n datapoints, share of flags, Test/Training accuracy...)
outinfo = [groupvar,groupdata.shape[0],len(preds_tr),len(preds_te),sum(groupdata.FlagMan[groupdata.TeTr=="Tr"])/len(preds_tr)*100,sum(groupdata.FlagMan[groupdata.TeTr=="Te"])/len(preds_te)*100,\
recall_score(Y_tr,preds_tr),recall_score(Y_te,preds_te),\
precision_score(Y_tr,preds_tr),precision_score(Y_te,preds_te)]
resultfile.write("TRAINING RECALL:"+"\n")
resultfile.write(str(recall_score(groupdata.FlagMan[groupdata.TeTr=="Tr"],preds_tr))+"\n")# Training error (Out-of-Bag)
resultfile.write("TEST RECALL:"+"\n")
resultfile.write(str(recall_score(groupdata.FlagMan[groupdata.TeTr=="Te"],preds_te))+"\n"+"\n")# Test error
outinfo_df.append(outinfo)
# save back to dataframe
traindata.PredMan[(traindata.TeTr=="Tr") & (traindata[group_field]==groupvar)] = preds_tr
traindata.PredMan[(traindata.TeTr=="Te") & (traindata[group_field]==groupvar)] = preds_te
endtime = time.time()
print("TIME ELAPSED: "+str(timedelta(seconds=endtime-starttime))+" min")
outinfo_df = pd.DataFrame.from_records(outinfo_df,columns=[group_field,"n","n_Tr", "n_Te", "Percent_Flags_Tr","Percent_Flags_Te", "Recall_Tr", "Recall_Te", "Precision_Tr","Precision_Te"])
outinfo_df = outinfo_df.assign(Modelname=modelname)
resultfile.write(str(outinfo_df))
outinfo_df.to_csv(os.path.join(path,modelname+"_outinfo.csv"),index=False)
resultfile.close()
# write results back into original "data" dataframe
data = data.assign(PredMan=np.nan)
data.PredMan[traindata.RowIndex] = traindata.PredMan# based on RowIndex as NAs were created in traindata
data.to_feather("data/sm/03_data_preds")
trainML(data,field, references, sensor_field,group_field, window_values, window_flags, path, modelname,0.3)
......@@ -9,3 +9,4 @@ from .constants_detection import *
from .soil_moisture_tests import *
from .spike_detection import *
from .harm_functions import *
from .machine_learning import *
import pandas as pd
import numpy as np
from saqc.funcs.register import register
# NEW
from sklearn.ensemble import RandomForestClassifier
import joblib
@register("machinelearning")
def flagML(data, flags, field, flagger, references, window_values:int, window_flags:int, path:str,**kwargs):
"""This Function uses pre-trained machine-learning model objects for flagging of a specific variable. The model is supposed to be trained using the script provided in "ressources/machine_learning/train_machine_learning.py".
For flagging, Inputs to the model are the timeseries of the respective target at one specific sensors, the automatic flags that were assigned by SaQC as well as multiple reference series.
Internally, context information for each point is gathered in form of moving windows to improve the flagging algorithm according to user input during model training.
For the model to work, the parameters 'references', 'window_values' and 'window_flags' have to be set to the same values as during training.
:param data: The pandas dataframe holding the data-to-be flagged, as well as the reference series. Data must be indexed by a datetime index.
:param flags: A dataframe holding the flags
:param field: Fieldname of the field in data that is to be flagged.
:param flagger: A flagger - object.
:param references: A strong or list of strings, denoting the fieldnames of the data series that should be used as reference variables
:param window_values: An integer, denoting the window size that is used to derive the gradients of both the field- and reference-series inside the moving window
:param window_flags: An integer, denoting the window size that is used to count the surrounding automatic flags that have been set before
:param path: A string giving the path to the respective model object, i.e. its name and the respective value of the grouping variable. e.g. "models/model_0.2.pkl"
"""
# TO DO:
# - make flagger iterate over groupvar if multiple sensors are passed into saqc (Does that happen?)
# Function for moving window calculations
def refCalc (reference,window_values):
# Helper function for calculation of moving window values
outdata = pd.DataFrame()
name = reference.name
# derive gradients from reference series
outdata[name+"_Dt_1"] = reference-reference.shift(1)# gradient t vs. t-1
outdata[name+"_Dt1"] = reference-reference.shift(-1)# gradient t vs. t+1
# moving mean of gradients var1 and var2 before/after
outdata[name+"_Dt_"+str(window_values)] = outdata[name+"_Dt_1"].rolling(window_values,center=False).mean()# mean gradient t to t-window
outdata[name+"_Dt"+str(window_values)] = outdata[name+"_Dt_1"].iloc[::-1].rolling(window_values,center=False).mean()[::-1]# mean gradient t to t+window
return(outdata)
# Create custom df for easier processing
df = data.loc[:,[field]+references]
# Create binary column of BAD-Flags
df["flag_bin"] = flagger.isFlagged(flags, field, flag=flagger.BAD, comparator='==').astype("int")#get "BAD"-flags and turn into binary
# Add context information of flags
df["flag_bin_t_1"] = df["flag_bin"]-df["flag_bin"].shift(1)# Flag at t-1
df["flag_bin_t1"] = df["flag_bin"]-df["flag_bin"].shift(-1)# Flag at t+1
df["flag_bin_t_"+str(window_flags)] = df["flag_bin"].rolling(window_flags+1,center=False).sum()# n Flags in interval t to t-window_flags
df["flag_bin_t"+str(window_flags)] = df["flag_bin"].iloc[::-1].rolling(window_flags+1,center=False).sum()[::-1]# n Flags in interval t to t+window_flags
# forward-orientation not possible, so right-orientation on reversed data an reverse result
# Add context information for field+references
for i in [field]+references:
df = pd.concat([df,refCalc(reference=df[i],window_values=window_values)],axis=1)
# remove rows that contain NAs (new ones occured during predictor calculation)
df = df.dropna(axis=0,how="any")
# drop column of automatic flags at time t
df = df.drop(columns="flag_bin")
# Load model and predict on df:
model = joblib.load(path)
preds = model.predict(df)
print(str(preds.sum())+" Values flagged")
#Get indices of flagged values
flag_indices = df[preds.astype('bool')].index
# set Flags
flags = flagger.setFlags(flags, field, loc = flag_indices, **kwargs)
return data, flags
import pytest
import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from saqc.funcs.machine_learning import flagML
from saqc.flagger.categoricalflagger import CategoricalBaseFlagger
from saqc.flagger.dmpflagger import DmpFlagger
from saqc.flagger.simpleflagger import SimpleFlagger
from saqc.flagger.continuousflagger import ContinuousBaseFlagger
TESTFLAGGERS = [
CategoricalBaseFlagger(['NIL', 'GOOD', 'BAD']),
DmpFlagger(),
SimpleFlagger(),
ContinuousBaseFlagger()]
@pytest.mark.parametrize('flagger', TESTFLAGGERS)
def test_flagML(flagger):
### CREATE MWE DATA
data = pd.read_feather("ressources/machine_learning/data/soil_moisture_mwe.feather")
data = data.set_index(pd.DatetimeIndex(data.Time))
flags_raw = data[["SM1_Flag","SM2_Flag","SM3_Flag"]]
flags_raw.columns = ["SM1","SM2","SM3"]
# masks for flag preparation
mask_bad = flags_raw.isin(['Auto:BattV','Auto:Range','Auto:Spike'])
mask_unflagged = flags_raw.isin(['Manual'])
mask_good = flags_raw.isin(['OK'])
field = "SM2"
# prepare flagsframe
flags = flagger.initFlags(data)
flags = flagger.setFlags(flags,field,loc=mask_bad[field])
flags = flagger.setFlags(flags,field,loc=mask_unflagged[field],flag=flagger.UNFLAGGED)
flags = flagger.setFlags(flags,field,loc=mask_good[field],flag=flagger.GOOD)
references = ["Temp2","BattV"]
window_values = 20
window_flags = 20
groupvar = 0.2
modelname="testmodel"
path = "ressources/machine_learning/models/"+modelname+"_"+str(groupvar)+".pkl"
outdat, outflags = flagML(data,flags,field, flagger, references, window_values, window_flags, path)
# compare
#assert resulting no of bad flags
badflags = flagger.isFlagged(outflags,field)
assert(badflags.sum()==10447)#assert
# Have the right values been flagged?
checkdates = pd.DatetimeIndex(['2014-08-05 23:03:59', '2014-08-06 01:35:44',
'2014-08-06 01:50:54', '2014-08-06 02:06:05','2014-08-06 02:21:15', '2014-08-06 04:22:38',
'2014-08-06 04:37:49', '2014-08-06 04:52:59'])
assert(badflags[checkdates].all())
# IN CASE OF FUTURE changes
# Get indices of values that were flagged
#wasmanual = flags_raw[field]=="Manual"
#a = badflags & wasmanual
#data.loc[a,:].index
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment