Data scientists often build machine learning pipelines that involve preprocessing (imputing null values, transforming features, creating new features), modeling, and hyper-parameter tuning. Many transformations have to be applied in a particular order before modeling. Scikit-learn provides the Pipeline class to chain those transformations and run them in one go.
Pipeline serves multiple purposes here (from documentation):
- Convenience and encapsulation: You only have to call fit and predict once on your data to fit a whole sequence of estimators.
- Joint parameter selection: You can grid search over parameters of all estimators in the pipeline at once (hyper-parameter tuning/optimization).
- Safety: Pipelines help avoid leaking statistics from your test data into the trained model in cross-validation, by ensuring that the same samples are used to train the transformers and predictors.
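The safety point is easiest to see with cross-validation. The snippet below is a minimal sketch on synthetic data (the arrays and step names are made up for illustration, not taken from the Titanic analysis): because the scaler lives inside the pipeline, it is re-fitted on the training part of every fold and never sees the held-out part.

```python
import numpy as np
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_val_score
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

# Synthetic data, used only for illustration
rng = np.random.default_rng(0)
X = rng.normal(size=(100, 3))
y = (X[:, 0] + X[:, 1] > 0).astype(int)

pipe = Pipeline([
    ("scaler", StandardScaler()),
    ("model", LogisticRegression()),
])

# For each of the 5 folds, the whole pipeline (scaler + model) is fitted
# on the training part only and scored on the held-out part
scores = cross_val_score(pipe, X, y, cv=5)
print(scores.mean())
```

Scaling the full dataset first and cross-validating only the model would let the scaler's mean and variance leak information from the held-out folds.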
In this article, I will show you how to:
- Build a complete pipeline using scikit-learn’s Pipeline class
- Create custom transformers
- Tune hyper-parameters
For the entire analysis, I am using the Titanic dataset. I chose it because most readers are already familiar with it. Let’s start the analysis by loading all the required libraries:
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import OneHotEncoder, OrdinalEncoder, StandardScaler, RobustScaler
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.metrics import f1_score, accuracy_score
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.linear_model import LogisticRegression
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier
import xgboost
from xgboost import XGBClassifier
from sklearn.model_selection import GridSearchCV
Load the Titanic dataset and split it into train and test sets:
# Load data
def load_data(PATH):
    data = pd.read_csv(PATH)
    return data

titanic_data = load_data('https://raw.githubusercontent.com/mattdelhey/kaggle-titanic/master/Data/train.csv')
titanic_data.shape
# (891, 11)
titanic_data.columns
# ['survived', 'pclass', 'name', 'sex', 'age', 'sibsp', 'parch', 'ticket', 'fare', 'cabin', 'embarked']
null_value_count = titanic_data.isnull().sum()
features_with_null_values = null_value_count[null_value_count != 0].index
features_with_null_values
# ['age', 'cabin', 'embarked']

# Split data 80:20
train_data, test_data = train_test_split(titanic_data, test_size=0.2, random_state=42)
X_train = train_data.drop(columns=["survived"])
y_train = train_data["survived"]
X_test = test_data.drop(columns=["survived"])
y_test = test_data["survived"]
There are 11 columns in total (10 features plus the target). For this analysis, we can drop the high-cardinality features (['name', 'ticket', 'cabin']). That leaves 7 features: 5 numerical ('pclass', 'age', 'sibsp', 'parch', 'fare') and 2 categorical ('sex', 'embarked'). Numerical and categorical features need different preprocessing, so let’s build a separate pipeline for each.
Preprocessing Numerical features:
- Impute null values with median
- Create new features. We can create 'family_count' by adding 'sibsp' (No. of siblings/spouses) and 'parch' (No. of parents/children)
- Feature Scaling
Scikit-learn provides a lot of transformers by default. For custom processing, such as creating new features, we can write our own custom transformers. As scikit-learn relies on duck typing (it checks only for the presence of a given method or attribute), it’s very easy to create a custom transformer just by implementing the fit, transform and fit_transform methods in a class.
The fit() method can simply return self. The main processing code goes into the transform() method. The fit_transform() method is automatically available if we add TransformerMixin as a base class. We can also add BaseEstimator as a base class, which automatically provides two more methods: get_params() and set_params().
class CreateNewFeatures(BaseEstimator, TransformerMixin):
    def __init__(self, indices):
        self.indices = indices

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        """
        Create a new feature 'family_count' by adding 'sibsp' and 'parch'
        """
        sibsp = self.indices[0]
        parch = self.indices[1]
        family_count = X[:, sibsp] + X[:, parch]
        X = np.c_[X, family_count]
        return X
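To see what the two base classes contribute, here is a small sketch that runs the transformer on a toy array. The two rows are invented values laid out in the same column order as the numerical features (pclass, age, sibsp, parch, fare); the class is repeated only so the snippet runs on its own.

```python
import numpy as np
from sklearn.base import BaseEstimator, TransformerMixin


class CreateNewFeatures(BaseEstimator, TransformerMixin):
    def __init__(self, indices):
        self.indices = indices

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        sibsp, parch = self.indices
        family_count = X[:, sibsp] + X[:, parch]
        return np.c_[X, family_count]


# Toy rows (made-up values): pclass, age, sibsp, parch, fare
X_toy = np.array([
    [3, 22.0, 1, 0, 7.25],
    [1, 38.0, 1, 2, 71.28],
])

transformer = CreateNewFeatures(indices=[2, 3])

# fit_transform() comes from TransformerMixin
X_new = transformer.fit_transform(X_toy)
print(X_new.shape)    # (2, 6)
print(X_new[:, -1])   # [1. 3.]

# get_params() comes from BaseEstimator
params = transformer.get_params()
print(params)         # {'indices': [2, 3]}
```

The get_params()/set_params() pair is what later lets GridSearchCV clone the transformer and change its constructor arguments.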
Let’s build a pipeline for processing numerical features. It’s very easy.
family_count_indices = [2, 3]
numerical_pipeline = Pipeline([
    ('numerical_imputer', SimpleImputer(strategy='median')),
    ('create_new_features', CreateNewFeatures(family_count_indices)),
    ('feature_scaling', StandardScaler())
])
Preprocessing Categorical features:
- Impute null values with mode
- One hot encoding
categorical_pipeline = Pipeline([
    ('categorical_imputer', SimpleImputer(strategy='most_frequent')),
    ('categorical_encoder', OneHotEncoder())
])
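One thing to keep in mind with one hot encoding: by default, OneHotEncoder raises an error when transform() meets a category it did not see during fit(). The sketch below uses invented toy values and shows the handle_unknown="ignore" option, which encodes such categories as all zeros instead. Whether you want that behaviour depends on your data; it is shown here as an option, not as part of the pipeline above.

```python
import numpy as np
from sklearn.preprocessing import OneHotEncoder

# Toy data (made up): the encoder only sees 'S' and 'C' during fit
train = np.array([["S"], ["C"], ["S"]], dtype=object)
unseen = np.array([["Q"]], dtype=object)

# Default behaviour: an unseen category raises a ValueError at transform time
strict_encoder = OneHotEncoder().fit(train)
raised = False
try:
    strict_encoder.transform(unseen)
except ValueError:
    raised = True
print(raised)  # True

# With handle_unknown="ignore", the unseen category becomes a row of zeros
lenient_encoder = OneHotEncoder(handle_unknown="ignore").fit(train)
encoded = lenient_encoder.transform(unseen).toarray()
print(encoded)  # [[0. 0.]]
```

This matters most during cross-validation, where a rare category can end up only in the validation fold.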
Column Transformer:
We have applied transformations separately for numerical and categorical features. Scikit learn provides ColumnTransformer through which we can apply different transformations on different columns at the same time. Let’s see how we can do this.
drop_columns = ["name", "ticket", "cabin"]
numerical_columns = X_train.drop(columns=drop_columns).select_dtypes(exclude="object").columns
categorical_columns = X_train.drop(columns=drop_columns).select_dtypes(include="object").columns

# Column Transformer
column_pipeline = ColumnTransformer([
    ("numerical_pipeline", numerical_pipeline, numerical_columns),
    ("categorical_pipeline", categorical_pipeline, categorical_columns)
])
Each tuple takes 3 inputs: (name, pipeline, columns to which the transformation is applied). Using ColumnTransformer, we can apply the numerical and categorical transformations in parallel on the same dataset. Now, let’s create the full pipeline, which drops the unnecessary features before transforming the rest.
class DropFeatures(BaseEstimator, TransformerMixin):
    """
    Drop features
    """
    def __init__(self, drop_columns):
        self.drop_columns = drop_columns

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        """
        Drop features
        """
        # 'columns=' already implies axis=1, so no separate axis argument is needed
        X = X.drop(columns=self.drop_columns)
        return X

drop_columns = ["name", "ticket", "cabin"]

# Full pipeline
full_pipeline = Pipeline([
    ('drop_features', DropFeatures(drop_columns)),
    ('column_transformer', column_pipeline)
])
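As a side note, ColumnTransformer has a remainder parameter whose default value is 'drop', so columns that are not listed in any tuple are discarded anyway. The explicit drop step above keeps the intent visible, but the toy sketch below (an invented three-row DataFrame) shows that an unlisted column such as 'name' never reaches the output.

```python
import pandas as pd
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import OneHotEncoder, StandardScaler

# Toy frame (made-up rows); 'name' is not listed in any transformer
df = pd.DataFrame({
    "name": ["A", "B", "C"],
    "age": [22.0, 38.0, 26.0],
    "sex": ["male", "female", "female"],
})

ct = ColumnTransformer([
    ("num", StandardScaler(), ["age"]),
    ("cat", OneHotEncoder(), ["sex"]),
])  # remainder='drop' is the default

out = ct.fit_transform(df)
print(out.shape)  # (3, 3): 1 scaled column + 2 one-hot columns
```

Passing remainder='passthrough' instead would keep the unlisted columns unchanged.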
Hurray! We created the pipeline for processing the input features.
NOTE: While building a pipeline, all the estimators except the last one must be transformers. The output of each estimator is passed as input to the next estimator in the pipeline.
Now, I will show you the different ways of using the pipelines:
- Use pipeline for preprocessing features only
- Include modeling in the pipeline
- Hyper-parameter tuning
1. Use pipeline for preprocessing features only
We use the pipeline to pre-process the features and then do modeling on top of the processed dataset.
# Transform input data
X_train_processed = full_pipeline.fit_transform(X_train)

# Train data using XGBoost
model = xgboost.XGBClassifier(max_depth=4)
model.fit(X_train_processed, y_train)
We use the same pipeline to transform the test data and predict using the trained model.
# Transform test data using full_pipeline
X_test_processed = full_pipeline.transform(X_test)

# Predict on the processed data
y_pred = model.predict(X_test_processed)

# Evaluate on test data
accuracy_score(y_test, y_pred), f1_score(y_test, y_pred)
# (0.815, 0.762)
2. Include modeling in the pipeline
In this case, we include the model (for example, DecisionTreeClassifier()) in the pipeline by adding it after full_pipeline.
# Add DecisionTreeClassifier to the end of processing pipeline
pipeline_modeling = Pipeline([
    ('preprocessing', full_pipeline),
    ('model', DecisionTreeClassifier())
])
Using the pipeline object, we can directly fit and predict on the data.
# Fit data
pipeline_modeling.fit(X_train, y_train)

# Predict on new data
y_pred = pipeline_modeling.predict(X_test)

# Score on new data. Returns the accuracy score
pipeline_modeling.score(X_test, y_test)
# 0.782
3. Hyper-parameter tuning
The most interesting part of the article.
Before diving into hyper-parameter tuning, let’s understand what pipelines make possible. The first advantage is that we can tune any parameter of any step in the pipeline. The second advantage is that we can swap out the steps themselves. For example, in the categorical pipeline we are using OneHotEncoder(), but there are alternatives such as OrdinalEncoder(). We can search over the choice of step along with its parameters.
Let’s define the parameter grid. We can also pass a list of parameter dictionaries, in which case each dictionary is searched as a separate grid. To refer to a parameter in the grid, we use the step names that were given while creating the pipeline, joined by a double underscore (__), going from the outermost pipeline down to the parameter. The code below makes the naming scheme clear.
parameter_grid = [
    {
        "preprocessing__column_transformer__numerical_pipeline__numerical_imputer__strategy": ['median', 'mean'],
        "preprocessing__column_transformer__numerical_pipeline__feature_scaling": [StandardScaler(), RobustScaler()],
        "model": [DecisionTreeClassifier()],
        "model__criterion": ["gini", "entropy"],
        "model__max_depth": [10, 20]
    },
    {
        "preprocessing__column_transformer__categorical_pipeline__categorical_encoder": [OneHotEncoder(), OrdinalEncoder()],
        "model": [RandomForestClassifier()],
        "model__max_depth": [10, 15, 25],
        "model__n_estimators": [100, 200],
        "model__bootstrap": [True, False]
    },
    {
        "model": [XGBClassifier()],
        "model__n_estimators": [10, 50, 100],
        "model__learning_rate": [0.01, 0.1, 1],
        "model__max_depth": [3, 6, 9],
        "model__min_child_weight": [1, 3]
    }
]
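If you are unsure of a parameter's full name, get_params() on the pipeline lists every valid key. The snippet below is a small sketch with a simplified two-level pipeline (the step names mirror the ones used above, but the pipeline itself is a cut-down stand-in, not the full Titanic pipeline).

```python
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.tree import DecisionTreeClassifier

# Simplified stand-in for the nested pipeline in this article
inner = Pipeline([
    ("numerical_imputer", SimpleImputer(strategy="median")),
    ("feature_scaling", StandardScaler()),
])
outer = Pipeline([
    ("preprocessing", inner),
    ("model", DecisionTreeClassifier()),
])

# Every key returned here is a valid name for a parameter grid
param_names = sorted(outer.get_params().keys())
for name in param_names:
    print(name)

# The same names work with set_params()
outer.set_params(
    preprocessing__numerical_imputer__strategy="mean",
    model__max_depth=5,
)
```

Note that the step names themselves (for example "model" or "preprocessing__feature_scaling") appear in the list too, which is why a whole step can be replaced from the grid.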
Pass the pipeline and parameter_grid to GridSearchCV, then call fit() to find the optimal parameters.
# Initialize grid search
grid_search = GridSearchCV(pipeline_modeling, parameter_grid, cv=5, verbose=0)

# Fit data
grid_search.fit(X_train, y_train)

# Get best estimator
grid_search.best_estimator_

# Score on new data
grid_search.score(X_test, y_test)
# 0.821
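There is a slight improvement after the grid search: the tuned pipeline scores 0.821 on the test set, compared with 0.782 for the untuned decision tree pipeline. In this way, we can easily try different transformations and select the best pipeline.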
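After fitting, best_params_ and cv_results_ tell you which combination won and how many were tried. The sketch below uses synthetic data and a deliberately tiny grid (both invented for illustration) so that it runs in a second or two; the attributes are the same ones available on the grid_search object above.

```python
import numpy as np
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import GridSearchCV
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import RobustScaler, StandardScaler
from sklearn.tree import DecisionTreeClassifier

# Synthetic data, used only for illustration
rng = np.random.default_rng(42)
X = rng.normal(size=(120, 4))
y = (X[:, 0] - X[:, 2] > 0).astype(int)

pipe = Pipeline([
    ("scaler", StandardScaler()),
    ("model", LogisticRegression()),
])

# Two small grids: one swaps the scaler, the other swaps the model
grid = [
    {
        "scaler": [StandardScaler(), RobustScaler()],
        "model": [LogisticRegression()],
        "model__C": [0.1, 1.0],
    },
    {
        "model": [DecisionTreeClassifier(random_state=0)],
        "model__max_depth": [2, 4],
    },
]

search = GridSearchCV(pipe, grid, cv=3)
search.fit(X, y)

best_params = search.best_params_           # winning combination
best_score = search.best_score_             # its mean cross-validated accuracy
n_candidates = len(search.cv_results_["params"])

print(best_params)
print(best_score)
print(n_candidates)  # 6 = 2*1*2 from the first grid + 1*2 from the second
```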
TIP: We can build pipelines with different transformations and save them for future purposes. We can add new transformations and functions as we go. This will be very helpful in times of competitions and personal use as well.
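One common way to save a fitted pipeline is joblib, which is installed alongside scikit-learn. The following is a minimal sketch on synthetic data; the file name pipeline.joblib is just a placeholder, and a temporary directory is used so the example cleans up after itself.

```python
import os
import tempfile

import joblib
import numpy as np
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

# Synthetic data, used only for illustration
rng = np.random.default_rng(1)
X = rng.normal(size=(50, 3))
y = (X[:, 0] > 0).astype(int)

pipe = Pipeline([
    ("scaler", StandardScaler()),
    ("model", LogisticRegression()),
])
pipe.fit(X, y)

with tempfile.TemporaryDirectory() as tmp_dir:
    path = os.path.join(tmp_dir, "pipeline.joblib")  # placeholder file name
    joblib.dump(pipe, path)        # save the fitted pipeline
    restored = joblib.load(path)   # load it back

# The restored pipeline gives the same predictions as the original
same_predictions = np.array_equal(pipe.predict(X), restored.predict(X))
print(same_predictions)  # True
```

A saved pipeline should be loaded with the same scikit-learn version it was saved with, and only from files you trust.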
Here is the full code for your reference:
# Import libraries
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import OneHotEncoder, OrdinalEncoder, StandardScaler, RobustScaler
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.metrics import f1_score, accuracy_score
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.linear_model import LogisticRegression
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier
import xgboost
from xgboost import XGBClassifier
from sklearn.model_selection import GridSearchCV

# Load data
def load_data(PATH):
    data = pd.read_csv(PATH)
    return data

titanic_data = load_data('https://raw.githubusercontent.com/mattdelhey/kaggle-titanic/master/Data/train.csv')

# Split data 80:20
train_data, test_data = train_test_split(titanic_data, test_size=0.2, random_state=42)
X_train = train_data.drop(columns=["survived"])
y_train = train_data["survived"]
X_test = test_data.drop(columns=["survived"])
y_test = test_data["survived"]

# Create new features
class CreateNewFeatures(BaseEstimator, TransformerMixin):
    def __init__(self, indices):
        self.indices = indices

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        """
        Create a new feature 'family_count' by adding 'sibsp' and 'parch'
        """
        sibsp = self.indices[0]
        parch = self.indices[1]
        family_count = X[:, sibsp] + X[:, parch]
        X = np.c_[X, family_count]
        return X

# Pipeline for processing numerical features
family_count_indices = [2, 3]
numerical_pipeline = Pipeline([
    ('numerical_imputer', SimpleImputer(strategy='median')),
    ('create_new_features', CreateNewFeatures(family_count_indices)),
    ('feature_scaling', StandardScaler())
])

# Pipeline for processing categorical features
categorical_pipeline = Pipeline([
    ('categorical_imputer', SimpleImputer(strategy='most_frequent')),
    ('categorical_encoder', OneHotEncoder())
])

drop_columns = ["name", "ticket", "cabin"]
numerical_columns = X_train.drop(columns=drop_columns).select_dtypes(exclude="object").columns
categorical_columns = X_train.drop(columns=drop_columns).select_dtypes(include="object").columns

# Column Transformer
column_pipeline = ColumnTransformer([
    ("numerical_pipeline", numerical_pipeline, numerical_columns),
    ("categorical_pipeline", categorical_pipeline, categorical_columns)
])

class DropFeatures(BaseEstimator, TransformerMixin):
    """
    Drop features
    """
    def __init__(self, drop_columns):
        self.drop_columns = drop_columns

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        """
        Drop features
        """
        X = X.drop(columns=self.drop_columns)
        return X

drop_columns = ["name", "ticket", "cabin"]

# Full pipeline
full_pipeline = Pipeline([
    ('drop_features', DropFeatures(drop_columns)),
    ('column_transformer', column_pipeline)
])

# 1. Use pipeline for preprocessing features only
# Transform the input data
X_train_processed = full_pipeline.fit_transform(X_train)

# Train data using XGBoost
model = xgboost.XGBClassifier(max_depth=4)
model.fit(X_train_processed, y_train)

# Transform the data using full_pipeline
X_test_processed = full_pipeline.transform(X_test)

# Predict on the processed data
y_pred = model.predict(X_test_processed)

# Evaluate on test data
accuracy_score(y_test, y_pred), f1_score(y_test, y_pred)
# (0.815, 0.762)

# 2. Include modeling in the pipeline
pipeline_modeling = Pipeline([
    ('preprocessing', full_pipeline),
    ('model', DecisionTreeClassifier())
])

# Fit data
pipeline_modeling.fit(X_train, y_train)

# Predict on new data
y_pred = pipeline_modeling.predict(X_test)

# Score on the new data. Returns the accuracy score
pipeline_modeling.score(X_test, y_test)
# 0.782

# 3. Hyper-parameter tuning
# Parameter grid
parameter_grid = [
    {
        "preprocessing__column_transformer__numerical_pipeline__numerical_imputer__strategy": ['median', 'mean'],
        "preprocessing__column_transformer__numerical_pipeline__feature_scaling": [StandardScaler(), RobustScaler()],
        "model": [DecisionTreeClassifier()],
        "model__criterion": ["gini", "entropy"],
        "model__max_depth": [10, 20]
    },
    {
        "preprocessing__column_transformer__categorical_pipeline__categorical_encoder": [OneHotEncoder(), OrdinalEncoder()],
        "model": [RandomForestClassifier()],
        "model__max_depth": [10, 15, 25],
        "model__n_estimators": [100, 200],
        "model__bootstrap": [True, False]
    },
    {
        "model": [XGBClassifier()],
        "model__n_estimators": [10, 50, 100],
        "model__learning_rate": [0.01, 0.1, 1],
        "model__max_depth": [3, 6, 9],
        "model__min_child_weight": [1, 3]
    }
]

# Initialize grid search
grid_search = GridSearchCV(pipeline_modeling, parameter_grid, cv=5, verbose=0)

# Fit data
grid_search.fit(X_train, y_train)

# Get best estimator
grid_search.best_estimator_

# Score on new data
grid_search.score(X_test, y_test)
# 0.821
That’s it for this post. Thanks for reading till the end. I hope this helps you write better code and achieve good ranks in competitions.
Thank you so much for reading my blog and supporting me. Stay tuned for my next article. If you want to receive email updates, don’t forget to subscribe to my blog.
Follow me here:
GitHub: https://github.com/Abhishekmamidi123
LinkedIn: https://www.linkedin.com/in/abhishekmamidi
Kaggle: https://www.kaggle.com/abhishekmamidi
If you are looking for any specific blog, please do comment in the comment section below.