Commit 7159f38b authored by Jean-Marie Lepioufle's avatar Jean-Marie Lepioufle
Browse files

clean code

parent 88daba84
# mtsaq
A package for testing air-quality (AQ) forecasting with different deep neural network (DNN) models.
Package under dev: changes might occur at anytime.
## Install
```bash
cd /tmp
git clone https://git.nilu.no/aqdl/mtsaq_pkg.git
cd mtsaq_pkg
pip3 install -e .
```
Package 'mtsaq' has been tested on python 3.8 and 3.9
## Usage
```python
py -3.9 mtsaq/k_fold_train.py -w "/path/to/workdir" -c "config_train.json" -d "/path/to/data.csv"
py -3.9 mtsaq/k_fold_eval.py -w "/path/to/workdir" -c "config_eval.json" -s "key_of_a_session" -d "/path/to/data.csv"
py -3.9 mtsaq/prediction.py -w "/path/to/workdir" -c "config_pred.json" -s "key_of_a_session" -m "core_name_model" -d "path/to/last_data.csv"
```
## Examples
Have a look at our [case studies](https://git.nilu.no/aqdl/case_studies) repository.
from typing import Dict
from datetime import datetime
from mtsaq.utils.utils import get_random_alphanumeric_string
from mtsaq.dictionnary.dictionnary_model import model_dict
import torch
import os
import json
class class_model():
    """Thin wrapper around a torch model.

    Builds a model from a parameter dict via the project's `model_dict`
    registry, saves/loads it together with its build parameters, and
    exports it to ONNX.
    """

    def __init__(self):
        # Run on GPU when available; every built/loaded model is moved there.
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        print("Torch is using " + str(self.device))
        # Random core name kept for callers that want an auto-generated file name.
        self.filename_core = get_random_alphanumeric_string(20)

    def build(self, params: dict):
        """Instantiate the model named params['name'] with params['param'] kwargs.

        The full dict is kept in self.dump so save() can round-trip it.

        Raises:
            Exception: if the name is not registered in model_dict.
        """
        self.dump = params
        self.name = params["name"]
        self.params = params['param']
        if self.name in model_dict:
            self.model = model_dict[self.name](**self.params)
            self.model.to(self.device)
        else:
            raise Exception("Model " + self.name + " not found.")

    def load(self, path: str, name: str):
        """Rebuild the model from <path>/<name>.json, load its weights from
        <path>/<name>_model.pth, and switch to eval mode.

        Raises:
            FileNotFoundError: if the directory or either file is missing
                (the original only printed a warning, then crashed on open).
            Exception: if the stored model name is not in model_dict.
        """
        if not os.path.exists(path):
            raise FileNotFoundError("'path' does not exist: " + path)
        model_path = os.path.join(path, name + "_model.pth")
        if not os.path.exists(model_path):
            raise FileNotFoundError("'model_path' does not exist: " + model_path)
        params_path = os.path.join(path, name + ".json")
        if not os.path.exists(params_path):
            raise FileNotFoundError("'param_path' does not exist: " + params_path)
        # Context manager guarantees the JSON file handle is closed.
        with open(params_path) as f:
            self.dump = json.load(f)
        self.name = self.dump["name"]
        self.params = self.dump['param']
        if self.name in model_dict:
            self.model = model_dict[self.name](**self.params)
            self.model.to(self.device)
        else:
            raise Exception("Model " + self.name + " not found.")
        self.model.load_state_dict(torch.load(model_path))
        self.model.eval()

    def save(self, path: str, name: str) -> None:
        """Write weights to <path>/<name>_model.pth and the full build
        parameters (self.dump) to <path>/<name>.json."""
        if not os.path.exists(path):
            os.mkdir(path)
        model_path = os.path.join(path, name + "_model.pth")
        params_path = os.path.join(path, name + ".json")
        torch.save(self.model.state_dict(), model_path)
        with open(params_path, "w+") as p:
            # Dump the whole params dict (not just self.params) so load()
            # can reconstruct the model name as well.
            json.dump(self.dump, p)

    def save_onnx(self, path: str, name: str, class_ts_) -> None:
        """Export the model to <path>/<name>_model.onnx, tracing with the
        first sample of class_ts_ (a tuple of tensors when multi-input)."""
        if not os.path.exists(path):
            os.mkdir(path)
        model_path = os.path.join(path, name + "_model.onnx")
        if class_ts_.nb_input() == 1:
            torch.onnx.export(self.model, class_ts_[0][0], model_path)
        elif class_ts_.nb_input() > 1:
            torch.onnx.export(self.model, tuple(class_ts_[0][0]), model_path)
#import numpy as np
import pandas as pd
import numpy as np
import torch
from mtsaq.utils.utils import encode_cos, encode_sin
from mtsaq.dictionnary.dictionnary_scaling import scaling_dict
class class_ts():
    """Sliding-window time-series dataset over a CSV file (torch Dataset protocol).

    Sample i pairs `history_length` consecutive feature rows starting at row i
    with `horizon_length` target rows starting `gap_length` rows after the
    history window ends.
    """

    def __init__(self, path: str, date: list, id_start: int, id_stop: int,
                 target: list, gap_length: int, horizon_length: int,
                 features: list, features_cyclic: list, history_length: int):
        """Load the CSV at `path`, sort by `date`, keep rows id_start..id_stop
        (label-based, inclusive) and precompute standard-scaling parameters."""
        self.gap_length = gap_length
        self.horizon_length = horizon_length
        self.history_length = history_length
        self.SCALED = False
        tmp = pd.read_csv(path).sort_values(by=date)
        if features_cyclic is not None:
            # Encode each cyclic feature (e.g. hour, month) as a sin/cos pair.
            for col in features_cyclic:
                tmp[col] = [float(ele) for ele in tmp[col]]
                tmp[col + '_sin'] = encode_sin(tmp[col], max(tmp[col]))
                tmp[col + '_cos'] = encode_cos(tmp[col], max(tmp[col]))
                del tmp[col]
            self.features = features + [s + '_sin' for s in features_cyclic] + [s + '_cos' for s in features_cyclic]
        else:
            self.features = features
        self.target = target
        # .loc slicing is label-based and INCLUDES id_stop.
        self.df_features = tmp.loc[id_start:id_stop, self.features]
        self.df_target = tmp.loc[id_start:id_stop, self.target]
        del tmp
        #################################
        # Standard-scaler parameters, stored as one-row DataFrames whose
        # columns match the feature/target columns.
        # TODO: replace with proper scaler objects — the scaling_dict entries
        # are module-level singletons, so fitting features then targets
        # clobbers the feature fit; hence this hand-rolled standard scaler.
        tmp = self.df_features.mean()
        self.mean_features = pd.DataFrame(tmp.values).transpose()
        self.mean_features.columns = tmp.index
        tmp = self.df_features.std()
        self.std_features = pd.DataFrame(tmp.values).transpose()
        self.std_features.columns = tmp.index
        tmp = self.df_target.mean()
        self.mean_target = pd.DataFrame(tmp.values).transpose()
        self.mean_target.columns = tmp.index
        tmp = self.df_target.std()
        self.std_target = pd.DataFrame(tmp.values).transpose()
        self.std_target.columns = tmp.index
        # Guard against zero variance: dividing by 0 would yield NaN/inf.
        for col in self.std_features:
            if self.std_features[col].values == 0:
                self.std_features[col] = 1
        for col in self.std_target:
            if self.std_target[col].values == 0:
                self.std_target[col] = 1
        ###################################

    # https://pytorch.org/docs/stable/data.html#torch.utils.data.Dataset
    def __getitem__(self, id):
        """Return (features, target) float tensors for window `id`."""
        window = self.df_features.iloc[id: self.history_length + id]
        features_data = torch.from_numpy(window.to_numpy()).float()
        window = self.df_target.iloc[(self.gap_length + self.history_length + id):
                                     (self.gap_length + self.horizon_length + self.history_length + id)]
        target_data = torch.from_numpy(window.to_numpy()).float()
        return features_data, target_data

    # https://pytorch.org/docs/stable/data.html#torch.utils.data.Dataset
    def __len__(self) -> int:
        # Number of complete (history + gap + horizon) windows.
        return (len(self.df_features.index) - self.history_length - self.gap_length - self.horizon_length - 1)

    def scale(self, scaling=None):
        """Standard-scale features and targets in place (no-op when already scaled)."""
        if scaling is None:
            print("scaling parameters required")
            return
        if self.SCALED:
            print('df already scaled')
            return
        # Only the standard scaler is supported so far; vectorized scalar
        # arithmetic replaces the original per-element Series.apply.
        for col in self.df_features:
            self.df_features[col] = (self.df_features[col] - self.mean_features[col].iloc[0]) / self.std_features[col].iloc[0]
        for col in self.df_target:
            self.df_target[col] = (self.df_target[col] - self.mean_target[col].iloc[0]) / self.std_target[col].iloc[0]
        self.SCALED = True

    def unscale(self, newdata=None, datatype=None):
        """Invert the standard scaling.

        With `newdata` — assumed shaped (batch, time, channel); TODO confirm —
        return it unscaled using the target or feature parameters, in the same
        container type as the input. Without `newdata`, unscale the stored
        dataframes in place.

        Raises:
            TypeError: unknown newdata container (the original printed a
                warning then crashed with an unbound variable).
            ValueError: datatype not 'target'/'features'.
        """
        if newdata is not None:
            if isinstance(newdata, torch.Tensor):
                tmp = newdata.numpy()
            elif isinstance(newdata, (pd.Series, pd.DataFrame)):
                tmp = np.asarray(newdata)
            elif isinstance(newdata, np.ndarray):
                tmp = newdata
            else:
                raise TypeError('instance of newdata not known')
            if datatype == 'target':
                for i in range(len(self.target)):
                    # iloc[0, i] selects the scalar for channel i; the original
                    # row-wise iloc[i] only worked with a single target column.
                    tmp[:, :, i] = tmp[:, :, i] * self.std_target.iloc[0, i] + self.mean_target.iloc[0, i]
            elif datatype == 'features':
                for i in range(len(self.features)):
                    tmp[:, :, i] = tmp[:, :, i] * self.std_features.iloc[0, i] + self.mean_features.iloc[0, i]
            else:
                raise ValueError('datatype either target or features')
            if isinstance(newdata, torch.Tensor):
                return torch.from_numpy(tmp)
            if isinstance(newdata, (pd.Series, pd.DataFrame)):
                return pd.DataFrame(tmp, index=newdata.index, columns=newdata.columns)
            return tmp
        elif self.SCALED:
            for col in self.df_features:
                self.df_features[col] = self.df_features[col] * self.std_features[col].iloc[0] + self.mean_features[col].iloc[0]
            for col in self.df_target:
                self.df_target[col] = self.df_target[col] * self.std_target[col].iloc[0] + self.mean_target[col].iloc[0]
            # The original never reset the flag here (reset was commented out),
            # so a second unscale() would silently double-unscale.
            self.SCALED = False
        else:
            print('df already unscaled')
import pandas as pd
import numpy as np
import torch
import os
from typing import Dict
from skimage import io, img_as_float
from mtsaq.utils.utils import encode_cos, encode_sin
from mtsaq.utils.scale import param_scale
from mtsaq.dictionnary.dictionnary_scaling import scaling_dict
class class_ts_img():
    """Time-series dataset combining tabular features, image inputs and targets.

    Configured by a params dict (path, date, features, images, target, scaling).
    Samples are (features, images[, target]) float tensors.
    """

    def __init__(self, params: dict, subset_indice=None):
        """Load the CSV, derive cyclic sin/cos features, and fit scaling
        parameters. `subset_indice` restricts the rows used (else all rows)."""
        path = params['path']
        self.date = params['date']
        features = params['features']['features']
        features_cyclic = params['features']['features_cyclic']
        images = params['features']['images']
        imgdir = params['features']['imgdir']
        self.history_length = params['features']['history_length']
        self.imgdir = imgdir
        self.images = images
        self.target = params['target']['target']
        if self.target is not None:
            self.gap_length = params['target']['gap_length']
            self.horizon_length = params['target']['horizon_length']
        self.SCALED = False
        self.scaler = scaling_dict[params['scaling']]
        # Read once (the original read the CSV twice back-to-back).
        tmp = pd.read_csv(path)  # .sort_values(by=date)
        ## subset indice
        if subset_indice is not None:
            self.indice = subset_indice
        else:
            self.indice = range(0, len(tmp))
        ## date
        self.df_date = tmp.loc[self.indice, self.date]
        ## features
        if features_cyclic is not None:
            # Encode each cyclic feature as a sin/cos pair.
            for col in features_cyclic:
                tmp[col] = [float(ele) for ele in tmp[col]]
                tmp[col + '_sin'] = encode_sin(tmp[col], max(tmp[col]))
                tmp[col + '_cos'] = encode_cos(tmp[col], max(tmp[col]))
                del tmp[col]
            self.features = features + [s + '_sin' for s in features_cyclic] + [s + '_cos' for s in features_cyclic]
        else:
            self.features = features
        if images is not None:
            self.df_images = tmp.loc[self.indice, self.images]
        self.df_features = tmp.loc[self.indice, self.features]
        self.scale_param_features = param_scale(self.df_features, params['scaling'])
        ## target
        if self.target is not None:
            self.df_target = tmp.loc[self.indice, self.target]
            self.scale_param_target = param_scale(self.df_target, params['scaling'])
        else:
            self.df_target = pd.DataFrame()
            self.scale_param_target = []
        del tmp

    def get_scale_param_target(self):
        """Return (feature params, target params), or feature params only when
        there is no target. NOTE(review): despite the name it always includes
        the feature parameters — confirm against callers."""
        if self.target is not None:
            return self.scale_param_features, self.scale_param_target
        else:
            return self.scale_param_features

    def __getitem__(self, id):
        """Return (features, images[, target]) float tensors for window `id`."""
        # features: a history window when history_length > 0, else a single row
        if self.history_length > 0:
            indice_i = range(id, self.history_length + id)
        else:
            indice_i = id
        window = self.df_features.iloc[indice_i]
        features_data = torch.from_numpy(window.to_numpy()).float()
        # images: load every image column of every history row from disk
        window = self.df_images.iloc[indice_i]
        np_images = []
        for _, row in window.iterrows():
            for k in self.images:
                filename = row[k]
                # TODO: lazy loading, like torchvision's CIFAR dataset.
                np_images.append(img_as_float(io.imread(os.path.join(self.imgdir, filename))))
        image_data = torch.from_numpy(np.array(np_images))
        image_data = image_data.permute((0, 3, 1, 2)).float()  # to (img, channel, row, col)
        # target
        if self.target is not None:
            # NOTE(review): unlike class_ts, the target window is not offset by
            # history_length — confirm this is intended.
            if self.history_length > 0 or self.gap_length > 0:
                indice_target_i = range(self.gap_length + id, self.gap_length + self.horizon_length + id)
            else:
                indice_target_i = id
            window = self.df_target.iloc[indice_target_i]
            target_data = torch.from_numpy(window.to_numpy()).float()
            return features_data, image_data, target_data
        else:
            return features_data, image_data

    def __len__(self) -> int:
        # NOTE(review): no window margin is subtracted (class_ts subtracts the
        # history/gap/horizon lengths) — the last windows may run off the end.
        return len(self.df_features.index)

    def nb_input(self) -> int:
        # Two model inputs: tabular features and images.
        return 2

    def scale(self):
        """Scale features (and targets when present) in place, once."""
        if self.SCALED == False:
            self.df_features = self.scaler(self.df_features, self.scale_param_features, True)
            if self.target is not None:
                self.df_target = self.scaler(self.df_target, self.scale_param_target, True)
            self.SCALED = True
        else:
            print('df already scaled')

    def unscale(self, newdata=None, datatype=None):
        """Invert scaling on `newdata` (returned, same container type as the
        input) or, when omitted, on the stored dataframes in place.

        Raises:
            TypeError: unknown newdata container.
            ValueError: datatype not 'target'/'features'.
        """
        if newdata is not None:
            if isinstance(newdata, torch.Tensor):
                tmp = newdata.numpy()
            elif isinstance(newdata, (pd.Series, pd.DataFrame)):
                tmp = np.asarray(newdata)
            elif isinstance(newdata, np.ndarray):
                tmp = newdata
            else:
                # Original printed a warning, then crashed on unbound `tmp`.
                raise TypeError('instance of newdata not known')
            if datatype == 'target':
                scale_param = self.scale_param_target
            elif datatype == 'features':
                scale_param = self.scale_param_features
            else:
                raise ValueError('datatype either target or features')
            # Keep the scaler's output — the original discarded it and wrapped
            # the raw input, which is only equivalent if the scaler mutates in
            # place (TODO confirm the scaler's semantics).
            tmp = self.scaler(tmp, scale_param, False)
            if isinstance(newdata, torch.Tensor):
                return torch.from_numpy(tmp)
            if isinstance(newdata, (pd.Series, pd.DataFrame)):
                return pd.DataFrame(tmp, index=newdata.index, columns=newdata.columns)
            return tmp
        elif self.SCALED == True:
            self.df_features = self.scaler(self.df_features, self.scale_param_features, False)
            if self.target is not None:
                self.df_target = self.scaler(self.df_target, self.scale_param_target, False)
            self.SCALED = False
        else:
            print('df already unscaled')
import pandas as pd
import numpy as np
import torch
import os
from typing import Dict
from skimage import io, img_as_float
from mtsaq.utils.utils import encode_cos, encode_sin
from mtsaq.utils.scale import param_scale
from mtsaq.dictionnary.dictionnary_scaling import scaling_dict
class class_ts_none():
    """Time-series dataset with tabular features (no images) and optional targets.

    Configured by a params dict (path, date, features, target, scaling).
    Samples are features[, target] float tensors.
    """

    def __init__(self, params: dict, subset_indice=None):
        """Load the CSV, derive cyclic sin/cos features, and fit scaling
        parameters. `subset_indice` restricts the rows used (else all rows)."""
        path = params['path']
        self.date = params['date']
        features = params['features']['features']
        features_cyclic = params['features']['features_cyclic']
        self.history_length = params['features']['history_length']
        self.target = params['target']['target']
        if self.target is not None:
            self.gap_length = params['target']['gap_length']
            self.horizon_length = params['target']['horizon_length']
        self.SCALED = False
        self.scaler = scaling_dict[params['scaling']]
        tmp = pd.read_csv(path)  # .sort_values(by=date)
        ## subset indice
        if subset_indice is not None:
            self.indice = subset_indice
        else:
            self.indice = range(0, len(tmp))
        ## date
        self.df_date = tmp.loc[self.indice, self.date]
        ## features
        if features_cyclic is not None:
            # Encode each cyclic feature as a sin/cos pair.
            for col in features_cyclic:
                tmp[col] = [float(ele) for ele in tmp[col]]
                tmp[col + '_sin'] = encode_sin(tmp[col], max(tmp[col]))
                tmp[col + '_cos'] = encode_cos(tmp[col], max(tmp[col]))
                del tmp[col]
            self.features = features + [s + '_sin' for s in features_cyclic] + [s + '_cos' for s in features_cyclic]
        else:
            self.features = features
        self.df_features = tmp.loc[self.indice, self.features]
        self.scale_param_features = param_scale(self.df_features, params['scaling'])
        ## target
        if self.target is not None:
            self.df_target = tmp.loc[self.indice, self.target]
            self.scale_param_target = param_scale(self.df_target, params['scaling'])
        else:
            self.df_target = pd.DataFrame()
            self.scale_param_target = []
        del tmp

    def get_scale_param_target(self):
        """Return (feature params, target params), or feature params only when
        there is no target. NOTE(review): despite the name it always includes
        the feature parameters — confirm against callers."""
        if self.target is not None:
            return self.scale_param_features, self.scale_param_target
        else:
            return self.scale_param_features

    def __getitem__(self, id):
        """Return features[, target] float tensors for window `id`."""
        # features: a history window when history_length > 0, else a single row
        if self.history_length > 0:
            indice_i = range(id, self.history_length + id)
        else:
            indice_i = id
        window = self.df_features.iloc[indice_i]
        features_data = torch.from_numpy(window.to_numpy()).float()
        # target
        if self.target is not None:
            # NOTE(review): unlike class_ts, the target window is not offset by
            # history_length — confirm this is intended.
            if self.history_length > 0 or self.gap_length > 0:
                indice_target_i = range(self.gap_length + id, self.gap_length + self.horizon_length + id)
            else:
                indice_target_i = id
            window = self.df_target.iloc[indice_target_i]
            target_data = torch.from_numpy(window.to_numpy()).float()
            return features_data, target_data
        else:
            return features_data

    def __len__(self) -> int:
        # NOTE(review): no window margin is subtracted (class_ts subtracts the
        # history/gap/horizon lengths) — the last windows may run off the end.
        return len(self.df_features.index)

    def nb_input(self) -> int:
        # Single model input: tabular features only.
        return 1

    def scale(self):
        """Scale features (and targets when present) in place, once."""
        if self.SCALED == False:
            self.df_features = self.scaler(self.df_features, self.scale_param_features, True)
            if self.target is not None:
                self.df_target = self.scaler(self.df_target, self.scale_param_target, True)
            self.SCALED = True
        else:
            print('df already scaled')

    def unscale(self, newdata=None, datatype=None):
        """Invert scaling on `newdata` (returned, same container type as the
        input) or, when omitted, on the stored dataframes in place.

        Raises:
            TypeError: unknown newdata container.
            ValueError: datatype not 'target'/'features'.
        """
        if newdata is not None:
            if isinstance(newdata, torch.Tensor):
                tmp = newdata.numpy()
            elif isinstance(newdata, (pd.Series, pd.DataFrame)):
                tmp = np.asarray(newdata)
            elif isinstance(newdata, np.ndarray):
                tmp = newdata
            else:
                # Original printed a warning, then crashed on unbound `tmp`.
                raise TypeError('instance of newdata not known')
            if datatype == 'target':
                scale_param = self.scale_param_target
            elif datatype == 'features':
                scale_param = self.scale_param_features
            else:
                raise ValueError('datatype either target or features')
            # Keep the scaler's output — the original discarded it and wrapped
            # the raw input, which is only equivalent if the scaler mutates in
            # place (TODO confirm the scaler's semantics).
            tmp = self.scaler(tmp, scale_param, False)
            if isinstance(newdata, torch.Tensor):
                return torch.from_numpy(tmp)
            if isinstance(newdata, (pd.Series, pd.DataFrame)):
                return pd.DataFrame(tmp, index=newdata.index, columns=newdata.columns)
            return tmp
        elif self.SCALED == True:
            self.df_features = self.scaler(self.df_features, self.scale_param_features, False)
            if self.target is not None:
                self.df_target = self.scaler(self.df_target, self.scale_param_target, False)
            self.SCALED = False
        else:
            print('df already unscaled')
from typing import Dict
from mtsaq.dictionnary.dictionnary_ts import class_ts_dict
def class_ts_x(params: dict, subset_indice=None):
    """Factory: instantiate the dataset class registered under params['class']
    in class_ts_dict, forwarding params and the optional row subset."""
    constructor = class_ts_dict[params['class']]
    return constructor(params, subset_indice)
import math
import torch
# https://discuss.pytorch.org/t/rmse-loss-function/16540/3
class RMSELoss(torch.nn.Module):
    """Root-mean-square error loss: sqrt(MSE(target, output)).

    Reference: https://discuss.pytorch.org/t/rmse-loss-function/16540/3
    """

    def __init__(self):
        super().__init__()
        self.mse = torch.nn.MSELoss()

    def forward(self, target: torch.Tensor, output: torch.Tensor):
        mean_squared = self.mse(target, output)
        return torch.sqrt(mean_squared)
class MAPELoss(torch.nn.Module):
    """Mean absolute percentage error: mean(|(target - output) / target|)."""

    def __init__(self):
        super().__init__()

    def forward(self, target: torch.Tensor, output: torch.Tensor):
        relative_error = (target - output) / target
        return relative_error.abs().mean()
# Source: https://arxiv.org/abs/1907.00235
# Compute the negative log likelihood of Gaussian Distribution
class GaussianLoss(torch.nn.Module):
def __init__(self, mu, sigma):
super(GaussianLoss, self).__init__()