123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220 |
- from random import sample
- from sklearn.base import BaseEstimator
- from sklearn.metrics import SCORERS
- from sklearn.model_selection import train_test_split
- from sklearn.utils import check_X_y, check_array
- import torch
- from torch import nn, optim
- from torch.utils.data import DataLoader
- from scipy.sparse import issparse
- import numpy as np
class torchMLP(nn.Module):
    """
    Feed-forward neural network (multi-layer perceptron) with sigmoid outputs.

    Every hidden layer is a ``nn.Linear`` followed by ``nn.ReLU``. The output
    layer is a ``nn.Linear`` followed by ``nn.Sigmoid``, so each of the
    ``n_labels`` outputs lies in [0, 1].

    Parameters:
    -----------
    n_features: int, number of input features
    n_labels: int, number of outputs
    hidden_sizes: sequence of int, width of each hidden layer, in order.
        The default ``(200, 50)`` reproduces the architecture that used to be
        hard-coded; an empty sequence gives a single linear layer + sigmoid.
    """

    def __init__(self, n_features, n_labels, hidden_sizes=(200, 50)):
        super().__init__()
        layers = []
        in_size = n_features
        for out_size in hidden_sizes:
            layers.append(nn.Linear(in_size, out_size))
            layers.append(nn.ReLU())
            in_size = out_size
        layers.append(nn.Linear(in_size, n_labels))
        layers.append(nn.Sigmoid())
        # Kept as a single nn.Sequential named ``network`` so the parameter
        # names in the state_dict ("network.0.weight", ...) do not change.
        self.network = nn.Sequential(*layers)

    def forward(self, x):
        """
        Run the network on a batch.

        Parameters:
        -----------
        x: tensor whose last dimension is n_features

        Returns the sigmoid outputs, last dimension n_labels.
        """
        return self.network(x)
class torchMLPClassifier_sklearn(BaseEstimator):
    """
    Pytorch neural network with a sklearn-like API

    The wrapped network is trained with Adam on a binary cross-entropy loss,
    so its outputs must already be probabilities in [0, 1] (e.g. a final
    sigmoid layer).
    """

    def __init__(
        self,
        model,
        n_epochs=50,
        early_stop=True,
        early_stop_metric="accuracy",
        early_stop_validations_size=0.1,
        batch_size=1024,
        learning_rate=1e-3,
        class_weight=None,
        device_train="cpu",
        device_predict="cpu",
    ):
        """
        Parameters:
        -----------
        model: uninstantiated pytorch neural network class accepting the n_features and n_labels keyword arguments
        n_epochs: int, maximum number of epochs
        early_stop: boolean, if true a validation dataset is held out and used to stop the training
        early_stop_metric: str, metric used to evaluate the model, one of sklearn.metrics.SCORERS.keys()
        early_stop_validations_size: int or float, if float fraction of the train dataset used for validation, otherwise number of samples to use
        batch_size: int, size of the training batch
        learning_rate: float, Adam optimizer learning rate
        class_weight: None, "balanced", or dict mapping a label value to its loss weight
        device_train: str, device on which to train
        device_predict: str, device on which to predict

        If early_stop is true but early_stop_metric is not a known scorer
        name, or early_stop_validations_size is not a number, early stopping
        is silently disabled when fit is called.
        """
        # Parameters are stored exactly as received: BaseEstimator.get_params()
        # reads them back by name and sklearn.base.clone() requires that they
        # round-trip unchanged. All validation is deferred to fit().
        self.model = model
        self.n_epochs = n_epochs
        self.early_stop = early_stop
        self.early_stop_metric = early_stop_metric
        self.early_stop_validations_size = early_stop_validations_size
        self.batch_size = batch_size
        self.learning_rate = learning_rate
        self.class_weight = class_weight
        self.device_train = device_train
        self.device_predict = device_predict

    def _resolve_early_stop_scorer(self):
        """Return the validation scorer, or None when early stopping is disabled or misconfigured."""
        if not self.early_stop:
            return None
        if self.early_stop_metric is None or self.early_stop_metric not in SCORERS:
            return None
        if not isinstance(self.early_stop_validations_size, (int, float)):
            return None
        return SCORERS[self.early_stop_metric]

    def _compute_loss_weights(self, y):
        """
        Build the per-element loss weights for the 2-D label array y.

        Returns None when class_weight is None, otherwise a float32 array with
        the same shape as y. Label values that have no entry in a class_weight
        dict keep a weight of 1.
        """
        if self.class_weight is None:
            return None

        weights = np.ones(y.shape, dtype=np.float32)
        if isinstance(self.class_weight, str):
            if self.class_weight != "balanced":
                raise ValueError(
                    'class_weight must be None, "balanced" or a dict, got %r' % (self.class_weight,)
                )
            # n_samples / (n_classes * class_count), computed independently
            # for every label column.
            n_samples = y.shape[0]
            for col in range(y.shape[1]):
                classes, counts = np.unique(y[:, col], return_counts=True)
                class_weights = n_samples / (len(classes) * counts)
                for label, weight in zip(classes, class_weights):
                    weights[y[:, col] == label, col] = weight
        else:
            # Masks are taken on the untouched labels and written to a separate
            # float array, so an already-assigned weight can never be mistaken
            # for a label and remapped a second time.
            for label, weight in self.class_weight.items():
                weights[y == label] = weight
        return weights

    def fit(self, X, y):
        """
        Training the model

        Parameters:
        -----------
        X: array-like or sparse matrix of the features, shape (n_samples, n_features)
        y: array-like of the labels, shape (n_samples,) or (n_samples, n_labels)

        Returns self. With early stopping, training ends at the first epoch
        whose validation score is lower than the previous epoch's score; the
        weights reached at that point are kept.
        """
        X, y = check_X_y(X, y, accept_sparse=True, multi_output=True)
        if y.ndim == 1:
            y = np.expand_dims(y, 1)

        # Validation split if early stopping
        scorer = self._resolve_early_stop_scorer()
        if scorer is not None:
            X_train, X_val, y_train, y_val = train_test_split(
                X, y, test_size=self.early_stop_validations_size
            )
            if issparse(X_val):  # To deal with the sparse matrix situations
                X_val = X_val.toarray()
        else:
            X_train, y_train = X, y
            X_val = y_val = None

        n_features = X.shape[1]
        n_labels = y_train.shape[1]

        # Instantiating the model on the training device before the optimizer
        # captures its parameters
        self.network = self.model(n_features=n_features, n_labels=n_labels).to(self.device_train)
        self.optimizer = optim.Adam(self.network.parameters(), lr=self.learning_rate)

        # The dataloader only shuffles and batches row indices, so sparse
        # inputs are densified one batch at a time
        data_loader = DataLoader(range(X_train.shape[0]), shuffle=True, batch_size=self.batch_size)

        # Element-wise loss so that optional weights can be applied before averaging
        criterion = nn.BCELoss(reduction="none")
        loss_weights = self._compute_loss_weights(y_train)

        # Scorers can return negative values (e.g. "neg_log_loss"), so the
        # first epoch must always be accepted
        last_score = -np.inf
        for _ in range(self.n_epochs):
            # Scoring at the end of the previous epoch goes through
            # predict_raw_proba, which switches the network to eval mode and
            # moves it to device_predict: restore the training state.
            self.network.to(self.device_train)
            self.network.train()

            for indices in data_loader:
                batch_idx = indices.numpy()
                X_batch = X_train[batch_idx, :]
                if issparse(X_batch):  # To deal with the sparse matrix situations
                    X_batch = X_batch.toarray()
                X_batch_tensor = torch.tensor(X_batch, dtype=torch.float32).to(self.device_train)
                y_batch_tensor = torch.tensor(y_train[batch_idx, :], dtype=torch.float32).to(self.device_train)

                self.optimizer.zero_grad()
                y_batch_hat = self.network(X_batch_tensor)
                loss_elements = criterion(y_batch_hat, y_batch_tensor)
                if loss_weights is not None:
                    weights_tensor = torch.tensor(
                        loss_weights[batch_idx, :], dtype=torch.float32
                    ).to(self.device_train)
                    loss_elements = loss_elements * weights_tensor
                loss = loss_elements.mean()
                loss.backward()
                self.optimizer.step()

            # End of the epoch : evaluating the score
            if scorer is not None:
                score = scorer(self, X_val, y_val)
                if score < last_score:
                    return self
                last_score = score
        return self

    def predict(self, X):
        """
        Getting the prediction

        Parameters:
        -----------
        X: array-like or sparse matrix of the features

        Returns the 0/1 predictions (score >= 0.5): shape (n_samples,) for a
        single label, (n_samples, n_labels) otherwise.
        """
        y_hat_proba = self.predict_raw_proba(X)
        y_hat = (y_hat_proba >= 0.5) * 1
        if y_hat.ndim == 2 and y_hat.shape[1] > 1:
            # Multi-label output: flattening would interleave the labels of
            # different samples
            return y_hat
        return y_hat.flatten()

    def predict_raw_proba(self, X):
        """
        Getting the prediction score as a numpy array of the raw network output

        Parameters:
        -----------
        X: array-like or sparse matrix of the features

        Note: the network is moved to device_predict and left in eval mode.
        """
        X = check_array(X, accept_sparse=True)
        if issparse(X):  # To deal with the sparse matrix situations
            X = X.toarray()
        with torch.no_grad():
            model_predict = self.network.to(self.device_predict)
            model_predict.eval()
            # Create a tensor from X
            X_tensor = torch.tensor(X, dtype=torch.float32).to(self.device_predict)
            y_hat_proba_torch = model_predict(X_tensor)
            y_hat_proba_torch = y_hat_proba_torch.detach().cpu().numpy()
        return y_hat_proba_torch

    def predict_proba(self, X):
        """
        Getting the prediction score in sklearn format

        Parameters:
        -----------
        X: array-like or sparse matrix of the features

        Returns the columns of (1 - score) followed by the columns of score,
        i.e. shape (n_samples, 2) for a single label.
        """
        y_hat_proba_torch = self.predict_raw_proba(X)
        y_hat_proba = np.concatenate([
            1 - y_hat_proba_torch,
            y_hat_proba_torch,
        ], axis=1)
        return y_hat_proba
|