from sklearn.compose import ColumnTransformer from sklearn.pipeline import Pipeline, FeatureUnion from sklearn.preprocessing import OrdinalEncoder, StandardScaler from sklearn.feature_extraction.text import CountVectorizer from sklearn.impute import SimpleImputer, MissingIndicator from sklearn.model_selection import cross_val_score from sklearn.exceptions import ConvergenceWarning from .preprocessing import OutlierRemover, TextPreprocessing from warnings import simplefilter import itertools def generate_model (classifier, categorical_variables, continuous_variables, text_variable=None, missing_indicator=True, OrdinalEncoder_kwargs={}, StandardScaler_kwargs={}, CountVectorizer_kwargs={}, SimpleImputer_kwargs={}, MissingIndicator_kwargs={}, remove_outliers=False, outliers_variables_ranges=None): """ Generate a model Pipeline containing features pre-processing Parameters ---------- classifier: sklearn classifier object with a fit and transform method categorical_variables: [str], list of categorical variables continuous_variables: [str], list of continuous variables text_variable: [str], text variables, None if missing missing_indicator: boolean, if True a missing indicator is added to the Pipeline OrdinalEncoder_kwargs: dict, argument passed to the ordinal encoder StandardScaler_kwargs: dict, argument passed to the standard scaler CountVectorizer_kwargs: dict, argument passed to the count vectorizer SimpleImputer_kwargs: dict, argument passed to the simple imputer MissingIndicator_kwargs: dict, argument passed to the missing indicator remove_outliers: boolean, if true the outliers are set to nan outliers_variables_ranges: Dict(variable:[range_inf, range_sup], ...), dictionnary containing for each variable the inferior and superior range """ variables = categorical_variables+continuous_variables if text_variable is not None: variables += [text_variable] # Features pre-processing features_preprocessing_list = [] ## Outliers removal : if remove_outliers==True and outliers_variables_ranges is not None: # Creating the range list outliers_variables_range_clean = dict([(x, y) for x,y in outliers_variables_ranges.items() if x in variables]) features_preprocessing_list.append(("outliers", OutlierRemover(variables_ranges=outliers_variables_range_clean), list(outliers_variables_range_clean.keys()))) if len(categorical_variables) > 0: features_preprocessing_list.append(("binary_encoder", OrdinalEncoder(**OrdinalEncoder_kwargs), categorical_variables)) if len(continuous_variables) > 0: features_preprocessing_list.append(("continuous_scaling", StandardScaler(**StandardScaler_kwargs), continuous_variables)) if text_variable is not None: # Text pre-processing then count vectorizer text_preprocessing_pipeline = Pipeline([ ("text_preprocessing", TextPreprocessing()), ("text_countvectorizer", CountVectorizer(**CountVectorizer_kwargs)) ]) features_preprocessing_list.append(("text_encoding", text_preprocessing_pipeline, text_variable)) # Imputation methods imputation_list = [] imputation_list.append(("missing_imputer", SimpleImputer(**SimpleImputer_kwargs))) if missing_indicator: imputation_list.append( ("missing_indicator", MissingIndicator(**MissingIndicator_kwargs)) ) # Creating the pipeline features_preprocessing = ColumnTransformer(features_preprocessing_list) full_preprocessing = Pipeline([ ("features", features_preprocessing), ("impute_and_store_missing", FeatureUnion(imputation_list)), ]) pipeline = Pipeline([ ("preprocessing", full_preprocessing), ("lr", classifier) ]) return pipeline def get_features_selection (X, y, classifier, categorical_variables, continuous_variables, text_variable=None, min_features=1, cv=3, metric_score="auc_score"): """ This function return the metrics score according to each variables combination Parameters ---------- X: Pandas Dataframe of features y: Pandas Dataframe of labels classifier: sklearn classifier object with a fit and transform method categorical_variables: [str], list of categorical variables continuous_variables: [str], list of continuous variables text_variable: [str], text variables, None if missing min_features: int, minimum number of features to include in the model cv: int, cross-validation splitting strategy according to the cross_val_score documentation metric_score: str, metric score to evaluate the model, according to sklearn.metrics.SCORERS.keys() Output ------ Dictionnary containing a list of combination and associated score for each label """ # Getting labels list labels = y.columns.tolist() # Getting the combinations variables = categorical_variables + continuous_variables if text_variable is not None: variables += [text_variable] variables_combinations = [] for i in range(min_features, len(variables)+1): variables_combinations += itertools.combinations(variables, i) # Getting global model global_pipeline = generate_model( classifier, categorical_variables, continuous_variables, text_variable, CountVectorizer_kwargs={"ngram_range":(1,1), "max_features":200} ) # Preprocessing the data : we accept here to mix train/eval in feature scaling to reduce execution time X_transformed = global_pipeline.steps[0][1].steps[0][1].fit_transform(X) # Storing scores dictionnary scores = dict(zip( labels, [[] for x in labels] )) # Getting the scores for variable_combination in variables_combinations: combination_categorical_variables, combination_continuous_variables = [x for x in categorical_variables if x in variable_combination], [x for x in continuous_variables if x in variable_combination] combination_text_variable = text_variable if (text_variable is not None and text_variable in variable_combination) else None pipeline = generate_model( classifier, combination_categorical_variables, combination_continuous_variables, combination_text_variable, CountVectorizer_kwargs={"ngram_range":(1,1), "max_features":200} ) # Get X index if text_variable is not None: variables_index = [variables.index(x) for x in variable_combination if x != text_variable] if text_variable in variable_combination: variables_index += list(range(X_transformed.shape[1]-200, X_transformed.shape[1])) else: variables_index = [variables.index(x) for x in variable_combination] pipeline.steps[0][1].steps.pop(0) # Removing preprocessing step for label in labels: score = cross_val_score(pipeline, X_transformed[:,variables_index], y[label], cv=cv, scoring="roc_auc").mean().mean() scores[label].append([variable_combination, score]) return scores def fit_all_classifiers(classifier, X_train, y_train, hide_warnings=True): """ This function fill all the models for each label. Parameters: ---------- model: Classifier with a fit method X: Pandas Dataframe of features y: Pandas Dataframe of labels hide_warnings: boolean, if true the warnings will be hidden Output: ------- Dictionnary containing a classifier per label """ if hide_warnings == True: simplefilter("ignore", category=ConvergenceWarning) labels = y_train.columns.tolist() classifiers = {} for label in labels: classifiers[label] = classifier.fit(X_train, y_train[label]) return classifiers