Машинное обучение: создание конвейера (1)

машинное обучение
Машинное обучение: создание конвейера (1)

0/Предисловие

一个完整的机器学习过程大致包括以下4个环节:
   ETL
   数据预处理
   特征工程
   模型训练和评估

如果我们能把这4部分做成自动化的标准工作流程,这就是一个pipeline。

pipeline可以理解为'一条龙服务',或者‘综合解决方法’。

机器学习项目中有可以自动化的标准工作流程。
在 Python scikit-learn 扩展包中,管道有助于明确定义和自动化这些工作流程。
在这系列文章中,您将了解 scikit-learn 扩展包中的流水线以及如何自动化常见的机器学习工作流程以及构建自定义的流水线处理。


在讲管道pipeline之前我们先了解一下scikit-learn中的两个概念Transformer和Estimator.
<1>Transformer:
      转换器
      是一个对象,是一个指具有 fit() 和 transform() 方法的对象,
      用于清理、减少、扩展或生成特征。
      简而言之,转换器可帮助您将数据转换为机器学习模型所需的数据格式。 
      LabelEncoder,OneHotEncoder 和 MinMaxScaler 是转换器的示例。
      Transformer主要用于在特征工程上。如特征编码,归一化,标准化等等。

<2>Estimator:
     估计器是指机器学习模型。它是一个带有 fit() 和 predict() 方法的对象。
     fit()就是训练模型
     predict()就是预测
     
     我们将在本文中交替使用估计器和模型。如RandomFroest(),SVM()。

1/Pipeline

我们首先从一个数据集合开始,逐步的讲解如何使用scikit-learn扩展包构建一个简单的pipeline。
# 导入数据集合
# 导入相应的包和模块
import pandas as pd
from sklearn.datasets import load_dataset
from sklearn.model_selection import train_test_split
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.preprocessing import OneHotEncoder, MinMaxScaler
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline, FeatureUnion
from sklearn.compose import ColumnTransformer
from sklearn.linear_model import LinearRegression  # 线性模型

# =加载数据集,我们无放回随机选取了5组数据进行测试
# 设置随机种子
seed = 123
df = load_dataset('tips')
df = df.drop( columns=['tip', 'sex'] )
df = df.sample( n=5, random_state=seed )

# 人为增加一些缺失值用于测试
# 1,2,4行的2,4列赋值为np.nan
df.iloc[[1, 2, 4], [2, 4]] = np.nan
print(df)

image.png

上面代码我们随机选取五行数据进行测试,可以方便的看到在pipeline中到底发生了怎样的操作。
在创建pipeline之前我们首先要做的一步就是划分训练集和测试集,
此处我们使用'total_bill'列作为需要预测的y,及标签列,
使用'smoker','day',time'作为特征。可以看到此时这三个特征全部类别特征。
代码如下
x = df.drop( columns=['total_bill', 'size'] ) # 特征
y = df['total_bill'] # 标签
X_train, X_test, y_train, y_test = train_test_split(x,y,test_size=.2,random_state=seed)

对于特征我们首先要做的就是将其转换成机器学习模型所能接受的数据形式,因此我们首先需要做两部分工作:
1.对特征进行缺失值填充,关于缺失值填充其实是有很多技巧的。
   好的缺失值处理对模型有很大的提升。
   本文会用简单的方式填充。
2.对类别型特征进行编码

我们先不用pipeline方式,
逐步的对训练数据和测试数据进行上面两步操作,缺失值填充和特征编码
# 使用scikit-learn中自带的模块处理缺失值,strategy='constant'表示使用常量填充缺失值
# 对训练数据进行缺失值补充
# 先调用SimpleImputer()方法初始化一个对象imputer
imputer = SimpleImputer(strategy='constant',fill_value='missing') # 如果是空,则赋值为‘missing’
X_train_imputed = imputer.fit_transform(X_train) 

# 对训练数据进行特征编码
encoder = OneHotEncoder(handle_unknown='ignore', sparse=False)
X_train_encoded = encoder.fit_transform(X_train_imputed)

# 查看训练数据处理前后结果
print("******************** Training data ********************")
display(X_train)
display(pd.DataFrame(X_train_imputed, columns=X_train.columns))
display(pd.DataFrame(X_train_encoded, columns=encoder.get_feature_names(X_train.columns)))
#由于scikit-learn返回的都是numpy,因此我们通过encoder.get_feature_names可以得到编码之后的列名
#由其自动生成的

# 转换测试数据
# 对测试数据进行缺失值的补充
# 对测试数据进行特征编码
X_test_imputed = imputer.transform(X_test)
X_test_encoded = encoder.transform(X_test_imputed)

# 查看测试数据前后的结果
print("******************** Test data ********************")
display(X_test)
display(pd.DataFrame(X_test_imputed, columns=X_train.columns))
display(pd.DataFrame(X_test_encoded, columns=encoder.get_feature_names(X_train.columns)))
非pipeline方式处理结果如下:

image.png image.png

从上面的图我们可以看到,我们不用pipeline方式,也是可以把训练数据和测试数据经过缺失值填充以及编码之后已经转换成了机器模型接受的形式的。
但是纵观上述代码,我们需要手动一步步的对数据进行缺失值填充和编码处理,并且需要对测试集进行同样的重复的操作,这样还是挺麻烦的。
所以我们是否可以统一化的方式处理呢。
答案是肯定的,使用pipeline:
# 构建pipeline对象
pipe = Pipeline(
    step = [ ('imputer', SimpleImputer(strategy='constant', fill_value='missing')),
             ('encoder', OneHotEncoder(handle_unknown='ignore', sparse=False))
           ]
    )
pipe.fit(X_train) # 这一条代码就可以对训练数据进行缺失值处理和特征编码

# 处理和处理之前训练数据对比
print("******************** Training data ********************")
display(X_train)
display(pd.DataFrame(pipe.transform(X_train), columns=pipe['encoder'].get_feature_names(X_train.columns)))

# 处理和处理之前的测试集对比
print("******************** Test data ********************")
display(X_test)
display(pd.DataFrame(pipe.transform(X_test), columns=pipe['encoder'].get_feature_names(X_train.columns)))
上述代码中,pipeline的构建接受一个列表参数,
列表是由元组构成的。
每个元组都包含两个元素,分别为转换器或者估计器的名字(也就是变量名字),和转换器或者估计器本身,
注意:估计器只能放在pipeline的最后一个。名字用于使用pipeline进行字典访问。
通过pipeline方法,进行缺失值处理和编码的结果如下:

image.png

上面可以看到使用pipeline后,我们每一步的输出都会自动的作为下一个的输入。
此时我们可以在pipeline中加入模型,从而进行从特征到模型的完整的过程。
# 构建pipeline
pipe = Pipeline(
    [ ('imputer', SimpleImputer(strategy='constant', fill_value='missing')),
      ('encoder', OneHotEncoder(handle_unknown='ignore', sparse=False)), 
      ('model', LinearRegression())
    ]
    )
pipe.fit(X_train, y_train)

# 预测训练数据
y_train_pred = pipe.predict(X_train)
print(f"Predictions on training data: {y_train_pred}")

# 预测测试数据
y_test_pred = pipe.predict(X_test)
print(f"Predictions on test data: {y_test_pred}")
使用 Pipeline 不仅可以组织和简化您的代码,而且还有许多其他好处,以下是其中一些:

1.微调管道的能力:
  在构建模型时,您是否曾经不得不回过头来尝试不同的方法来预处理数据并再次运行模型,以查看预处理步骤中的调整是否提高了模型的适应度?
  在优化模型时,参数不仅存在于模型超参数中,还存在于预处理步骤的实现中。考虑到这一点,当我们有一个统一转换器和估计器的单个管道对象时,我们能够微调整个管道的超参数,包括转换器和具有 GridSearchCV 或 RandomizedSearchCV 的估计器
2. 更容易部署:在训练模型时用于准备数据的所有转换步骤也应在进行预测时应用于生产环境中的数据。
   当我们训练流水线时,我们训练包含数据转换器和模型的单个对象。
   经过训练后,此 Pipeline 对象可用于更顺畅的部署。
   
   

3/ColumnTransformer

在上述的例子中,我们注意到只涉及到类别列。
因此我们将所有的列都进行了encode,然而我们有时候需要针对不同类型的列进行不同的处理。
举个例子:一般我们会有数值列和类别列,对于类别列我们需要onehot编码,对于数值列我们需要进行归一化等。
下面我们将特征数据中既包含数值列又包含类别列。
# 划分训练集和测试集
x = df.drop(columns=['total_bill'])
y = df['total_bill']
X_train,X_test,y_train,y_test = train_test_split(x,y, test_size=.2,random_state=seed)

# 筛选出类别列
# X_train是一个dataframe对象,select_dtypes()函数可以边指定类型的列筛选出来。
categorical = list( X_train.select_dtypes('category').columns )
print(f"Categorical columns are: {categorical}")

# 筛选出数值列
# X_train是一个dataframe对象,select_dtypes()函数可以边指定类型的列筛选出来。
numerical = list( X_train.select_dtypes('number').columns )
print(f"Numerical columns are: {numerical}")
我们根据数据类型将特征分为两组,categorical和numerical
可以根据适合数据的内容进行列分组。
例如,如果不同的预处理管道更适合分类列,则可以进一步将其拆分为多个组。

由于数据中有多种数据类型,上一节中的代码现在将不再适用。
让我们看一个例子,我们使用 ColumnTransformer 和 Pipeline 在存在多种数据类型的情况下执行与以前相同的转换:
# 定义类别列处理的管道
cat_pipe = Pipeline([
    ('imputer', SimpleImputer(strategy='constant', fill_value='missing')),
    ('encoder', OneHotEncoder(handle_unknown='ignore', sparse=False))
])

# 拟合训练数据,remainder='passthrough'参数的意思是对其余列保持不变
preprocessor = ColumnTransformer( 
             [('cat', cat_pipe, categorical)], 
             remainder='passthrough' )

preprocessor.fit(X_train)

# 准备列名,注意:都是通过名称以字典形式访问
cat_columns = preprocessor.named_transformers_['cat']['encoder'].get_feature_names(categorical)
columns = np.append(cat_columns, numerical)

# 转换前后对比
print("******************** Training data ********************")
display(X_train)
display(pd.DataFrame(preprocessor.transform(X_train), columns=columns))

# 测试集转换前后对比
print("******************** Test data ********************")
display(X_test)
display(pd.DataFrame(preprocessor.transform(X_test), columns=columns))

image.png

分类列的输出与上一节的输出相同。唯一的区别是这个版本有一个额外的列:size。
我们已将 cat_pipe(以前在第 1 节中称为管道)传递给 ColumnTransformer 以转换分类列并指定剩余 =“passthrough”以保持剩余列不变

也转换数值列不是很好吗?特别是,让我们用中值大小来估算缺失值并将其缩放到 0 和 1 之间.
# 定义分类列的处理管道
cat_pipe = Pipeline([
    ('imputer', SimpleImputer(strategy='constant', fill_value='missing')),
    ('encoder', OneHotEncoder(handle_unknown='ignore', sparse=False))
])

# 定义数值列的处理管道
num_pipe = Pipeline([
    ('imputer', SimpleImputer(strategy='median')),
    ('scaler', MinMaxScaler())
])

# 使用ColumnTransformer连结两个管道
preprocessor = ColumnTransformer([
    ('cat', cat_pipe, categorical),
    ('num', num_pipe, numerical)
])
preprocessor.fit(X_train)

# 准备列名
cat_columns = preprocessor.named_transformers_['cat']['encoder'].get_feature_names(categorical)
columns = np.append(cat_columns, numerical)

# 转换数据前后对比
print("******************** Training data ********************")
display(X_train)
display(pd.DataFrame(preprocessor.transform(X_train), columns=columns))

# 测试数据转换前后对比
print("******************** Test data ********************")
display(X_test)
display(pd.DataFrame(preprocessor.transform(X_test), columns=columns))

image.png

以上,现在所有的列都被处理了,范围在0和1之间。 
使用 ColumnTransformer 和 Pipeline,我们将数据分成两组,对每组应用不同的管道和不同的转换器集,然后将结果粘贴在一起。
尽管在我们的示例中,我们在数值和分类管道中具有相同数量的步骤,但您可以在管道中使用任意数量的步骤,因为它们彼此独立。
现在让我们在示例中添加一个模型:
#  定义分类列的处理管道
cat_pipe = Pipeline([
    ('imputer', SimpleImputer(strategy='constant', fill_value='missing')),
    ('encoder', OneHotEncoder(handle_unknown='ignore', sparse=False))
])

# 定义数值列的处理管道
num_pipe = Pipeline([
    ('imputer', SimpleImputer(strategy='median')),
    ('scaler', MinMaxScaler())
])

# 使用ColumnTransformer连结两个管道
preprocessor = ColumnTransformer([
    ('cat', cat_pipe, categorical),
    ('num', num_pipe, numerical)
])

# 再构建一个管道加上分类器,因为preprocessor是ColumnTransformer对象具有fit_transform方法因此是一个
#Transformer所以可以放在Pipeline中
pipe = Pipeline([
    ('preprocessor', preprocessor),
    ('model', LinearRegression())
])
pipe.fit(X_train, y_train)

# 预测训练数据
y_train_pred = pipe.predict(X_train)
print(f"Predictions on training data: {y_train_pred}")

# 预测测试数据
y_test_pred = pipe.predict(X_test)
print(f"Predictions on test data: {y_test_pred}")
当我们需要对不同的列执行不同的操作时,ColumnTransformer 很好地补充了 Pipeline。
我们针对分类列和数字列的时候我们是自己手动筛选除了两种类型的列。
其实还可以使用scikit-learn的make_column_selector进行筛选。这样代码的量将更少
#直接使用make_column_selector类进行列筛选
from sklearn.compose import make_column_selector as selector

preprocessor = ColumnTransformer(transformers=[
    ('num', numeric_transformer, selector(dtype_exclude="category")),
    ('cat', categorical_transformer, selector(dtype_include="category"))
])
clf = Pipeline(steps=[('preprocessor', preprocessor),
                      ('classifier', LogisticRegression())])


clf.fit(X_train, y_train)
print("model score: %.3f" % clf.score(X_test, y_test))

3/FeatureUnion

FeatureUnion 是另一个有用的工具。它能够做 ColumnTransformer 刚刚做的事情,但方式更长,
刚才我们看到针对分类列和数值列使用了Transformer连结起了两个管道,但是有时候,我们想要根据我们自己选一些列做一些操作,另一些列做另一些操作。
然后将这些处理之后的列拼接起来。
这个时候Feature Union就很有用处了。
# 定义分类列数据处理管道
from sklearn.compose import make_column_selector as selector
cat_pipe = Pipeline([
    ('selector', selector(categorical)),
    ('imputer', SimpleImputer(strategy='constant', fill_value='missing')),
    ('encoder', OneHotEncoder(handle_unknown='ignore', sparse=False))
])

# 定义数值列处理管道
num_pipe = Pipeline([
    ('selector', selector(numerical)),
    ('imputer', SimpleImputer(strategy='median')),
    ('scaler', MinMaxScaler())
])

# 连结两个管道
preprocessor = FeatureUnion([
    ('cat', cat_pipe),
    ('num', num_pipe)
])

# 给管道加入模型
pipe = Pipeline([
    ('preprocessor', preprocessor),
    ('model', LinearRegression())
])
pipe.fit(X_train, y_train)

# 预测训练数据
y_train_pred = pipe.predict(X_train)
print(f"Predictions on training data: {y_train_pred}")

# 预测测试数据
y_test_pred = pipe.predict(X_test)
print(f"Predictions on test data: {y_test_pred}")
如本例所示,使用 FeatureUnion 比使用 ColumnTransformer 更加冗长。
因此,在我看来,在类似的情况下最好使用 ColumnTransformer。
然而,FeatureUnion 绝对有它的位置。
如果您需要以不同的方式转换相同的输入数据并将它们用作特征,那么 FeatureUnion 就是其中之一。
使用Feature Union更加灵活,将在下一章构建自定义的Transformer的时候讲解。

4/Резюме

您可能已经注意到,Pipeline 是超级巨星。 
ColumnTransformer 和 FeatureUnion 是与 Pipeline 一起使用的附加工具。
当我们想要并行分治时,ColumnTransformer 更合适,而 FeatureUnion 允许我们在相同的输入数据上并行应用多个转换器。
我们将在下一节讲解如何自定义Transformer