Создание конвейера машинного обучения с помощью Kubeflow

Kubernetes
Создание конвейера машинного обучения с помощью Kubeflow

существуетпредыдущая статьяИтак, я познакомил вас с Kubeflow, платформой машинного обучения, созданной для команд, которым необходимо создавать конвейеры машинного обучения.

В этой статье мы увидим, как подробно изучить существующее машинное обучение и превратить его в конвейер машинного обучения Kubeflow, который затем можно будет развернуть в Kubernetes. Выполняя это упражнение, подумайте, как бы вы преобразовали свои существующие проекты машинного обучения в Kubeflow.

Я буду использовать Fashion MNIST в качестве примера, так как сложность модели не является основной целью, которую мы должны решить в этом упражнении. Для этого простого примера я разделил пайплайн на 3 этапа:

  • Git клонирует кодовую базу

  • Загрузка и повторная обработка обучающих и тестовых данных

  • оценка обучения

Конечно, вы можете разделить конвейер по своему усмотрению, в зависимости от вашего варианта использования, и вы можете расширить конвейер по своему желанию.

получить код

Вы можете получить код из Github:

% git clone https://github.com/benjamintanweihao/kubeflow-mnist.git

Ниже приведен полный список, который мы использовали для создания нашего конвейера. На самом деле ваш код, скорее всего, будет охватывать несколько библиотек и файлов. В нашем случае мы разбили код на два скрипта,preprocessing.pyиtrain.py.

from tensorflow import keras
import argparse
import os
import pickle


def preprocess(data_dir: str):
    fashion_mnist = keras.datasets.fashion_mnist
    (train_images, train_labels), (test_images, test_labels) = fashion_mnist.load_data()

    train_images = train_images / 255.0
    test_images = test_images / 255.0

    os.makedirs(data_dir, exist_ok=True)

    with open(os.path.join(data_dir, 'train_images.pickle'), 'wb') as f:
  pickle.dump(train_images, f)

    with open(os.path.join(data_dir, 'train_labels.pickle'), 'wb') as f:
  pickle.dump(train_labels, f)

    with open(os.path.join(data_dir, 'test_images.pickle'), 'wb') as f:
        pickle.dump(test_images, f)

    with open(os.path.join(data_dir, 'test_labels.pickle'), 'wb') as f:
        pickle.dump(test_labels, f)

if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Kubeflow MNIST training script')
    parser.add_argument('--data_dir', help='path to images and labels.')
    args = parser.parse_args()

    preprocess(data_dir=args.data_dir)

Скрипт обработки принимает единственный параметрdata_dir. Он загружает и предварительно обрабатывает данные, аpickledверсия сохраняется вdata_dirсередина. В производственном коде это может быть каталог хранения для TFRecords.

train.py

import calendar
import os
import time

import tensorflow as tf
import pickle
import argparse

from tensorflow import keras
from constants import PROJECT_ROOT


def train(data_dir: str):
    # Training
    model = keras.Sequential([
          keras.layers.Flatten(input_shape=(28, 28)),
          keras.layers.Dense(128, activation='relu'),
          keras.layers.Dense(10)])

    model.compile(optimizer='adam',
              loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
                  metrics=['accuracy'])

    with open(os.path.join(data_dir, 'train_images.pickle'), 'rb') as f:
        train_images = pickle.load(f)

    with open(os.path.join(data_dir, 'train_labels.pickle'), 'rb') as f:
        train_labels = pickle.load(f)

    model.fit(train_images, train_labels, epochs=10)

    with open(os.path.join(data_dir, 'test_images.pickle'), 'rb') as f:
        test_images = pickle.load(f)

    with open(os.path.join(data_dir, 'test_labels.pickle'), 'rb') as f:
        test_labels = pickle.load(f)

    # Evaluation
    test_loss, test_acc = model.evaluate(test_images, test_labels, verbose=2)

    print(f'Test Loss: {test_loss}')
    print(f'Test Acc: {test_acc}')

    # Save model
    ts = calendar.timegm(time.gmtime())
    model_path = os.path.join(PROJECT_ROOT, f'mnist-{ts}.h5')
    tf.saved_model.save(model, model_path)

    with open(os.path.join(PROJECT_ROOT, 'output.txt'), 'w') as f:
        f.write(model_path)
        print(f'Model written to: {model_path}')


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Kubeflow FMNIST training script')
    parser.add_argument('--data_dir', help='path to images and labels.')
    args = parser.parse_args()

    train(data_dir=args.data_dir)

существуетtrain.py, модель будет построена и использованаdata_dirУкажите расположение обучающих и тестовых данных. После обучения модели и начала ее оценки запишите модель в путь с отметкой времени. Обратите внимание, что путь также записываетсяoutput.txt. Об этом будет сказано позже.

Разработайте пайплайн Kubeflow

Чтобы начать создание конвейера Kubeflow, нам нужно вытащить некоторые зависимости. я приготовил одинenvironment.yml, который включаетkfp 0.5.0,tensorflowи другие необходимые зависимости.

Вам необходимо установить Conda, затем выполните следующие действия:

% conda env create -f environment.yml
% source activate kubeflow-mnist
% python preprocessing.py --data_dir=/path/to/data
% python train.py --data_dir=/path/to/data

Теперь давайте рассмотрим несколько шагов в нашем конвейере:

  • Git клонирует кодовую базу

  • Загрузка и предварительная обработка обучающих и тестовых данных

  • Тренируйтесь и оценивайте

Прежде чем мы начнем писать код, нам нужно понять конвейер Kubeflow на макроуровне.

Конвейер состоит из связанных компонентов. Вывод одного компонента становится вводом другого компонента, каждый из которых фактически выполняется в контейнере (в данном случае Docker). Что произойдет, так это то, что мы выполним образ Docker, который мы укажем позже, который содержитpreprocessing.pyиtrain.pyвсе необходимое. Конечно, эти две фазы будут иметь свои составляющие.

Нам также нужно дополнительное зеркало, чтобыgit cloneпроект. Нам нужно запечь проект в образ Docker, но в реальном проекте это может привести к увеличению размера образа Docker.

Говоря об образах Docker, мы должны сначала создать один.

Step0: Создайте образ Docker

Если вы просто хотите протестировать, то этот шаг не нужен, потому что я подготовил образ на Docker Hub. ЭтоDockerfileПолная картина:

FROM tensorflow/tensorflow:1.14.0-gpu-py3
LABEL MAINTAINER "Benjamin Tan <benjamintanweihao@gmail.com>"
SHELL ["/bin/bash", "-c"]

# Set the locale
RUN echo 'Acquire {http::Pipeline-Depth "0";};' >> /etc/apt/apt.conf
RUN DEBIAN_FRONTEND="noninteractive"
RUN apt-get update  && apt-get -y install --no-install-recommends locales && locale-gen en_US.UTF-8
ENV LANG en_US.UTF-8
ENV LANGUAGE en_US:en
ENV LC_ALL en_US.UTF-8

RUN apt-get install -y --no-install-recommends \
    wget \
    git \
    python3-pip \
    openssh-client \
    python3-setuptools \
    google-perftools && \
    rm -rf /var/lib/apt/lists/*

# install conda
WORKDIR /tmp
RUN wget --quiet https://repo.anaconda.com/miniconda/Miniconda3-4.7.12-Linux-x86_64.sh -O ~/miniconda.sh && \
    /bin/bash ~/miniconda.sh -b -p /opt/conda && \
    rm ~/miniconda.sh && \
    ln -s /opt/conda/etc/profile.d/conda.sh /etc/profile.d/conda.sh && \
    echo ". /opt/conda/etc/profile.d/conda.sh" >> ~/.bashrc

# build conda environments
COPY environment.yml /tmp/kubeflow-mnist/conda/
RUN /opt/conda/bin/conda update -n base -c defaults conda
RUN /opt/conda/bin/conda env create -f /tmp/kubeflow-mnist/conda/environment.yml
RUN /opt/conda/bin/conda clean -afy

# Cleanup
RUN rm -rf /workspace/{nvidia,docker}-examples && rm -rf /usr/local/nvidia-examples && \
    rm /tmp/kubeflow-mnist/conda/environment.yml

# switch to the conda environment
RUN echo "conda activate kubeflow-mnist" >> ~/.bashrc
ENV PATH /opt/conda/envs/kubeflow-mnist/bin:$PATH
RUN /opt/conda/bin/activate kubeflow-mnist

# make /bin/sh symlink to bash instead of dash:
RUN echo "dash dash/sh boolean false" | debconf-set-selections && \
    DEBIAN_FRONTEND=noninteractive dpkg-reconfigure dash

# Set the new Allocator
ENV LD_PRELOAD /usr/lib/x86_64-linux-gnu/libtcmalloc.so.

В отношении Dockerfile важно отметить, настроена ли среда Conda и готова ли она к работе. Чтобы построить образ:

% docker build -t your-user-name/kubeflow-mnist . -f Dockerfile
% docker push your-user-name/kubeflow-mnist

Итак, теперь давайте создадим наш первый компонент!

существуетpipeline.pyСледующие фрагменты кода можно найти в файлах .

Шаг 1: Git-клон

На этом шаге мы выполнимgit clone. В частности, я хочу показать вам, как это сделать из частного репозитория.git clone, так как именно здесь расположено большинство проектов компаний. Конечно, это также отличная возможность продемонстрировать интересную функцию Rancher, которая просто добавляет ключи, такие как ключи SSH.

Добавить ключи с Rancher

Получите доступ к интерфейсу Rancher. В верхнем левом углу выберите «Локальный», затем «По умолчанию» во вторичном меню:

Затем выберите «Секреты» в разделе «Ресурсы».

Вы должны увидеть список ключей, которые используются только что выбранным кластером. Нажмите Добавить секрет:

Заполните страницу значениями, которые вы видите на изображении ниже. Если kubeflow не отображается в столбце пространства имен, вы можете выбратьAdd to a new namespaceи введитеkubeflowПросто создайте его.

Убедитесь, что Scope — это просто пространство имен. Установка Scope для всех пространств имен позволит любой рабочей нагрузке в проекте по умолчанию использовать ваш ключ ssh.

В секретных ценностях ключid_rsa, значениеid_rsaСодержание. Когда закончите, нажмите Сохранить.

Если что-то пойдет хорошо, вы увидите что-то вроде изображения ниже. Теперь вы успешно добавили свой SSH-ключ в пространство имен kubeflow, и вам не нужно использовать kubectl!

Теперь, когда мы добавили наш ключ SSH, пришло время вернуться к коду. Как мы можем использовать недавно добавленный ключ SSH для доступа к частному репозиторию git?

def git_clone_darkrai_op(repo_url: str):

    volume_op = dsl.VolumeOp(
        name="create pipeline volume",
        resource_name="pipeline-pvc",
        modes=["ReadWriteOnce"],
        size="3Gi"
    )

    image = 'alpine/git:latest'

    commands = [
        "mkdir ~/.ssh",
        "cp /etc/ssh-key/id_rsa ~/.ssh/id_rsa",
        "chmod 600 ~/.ssh/id_rsa",
        "ssh-keyscan bitbucket.org >> ~/.ssh/known_hosts",
        f"git clone {repo_url} {PROJECT_ROOT}",
        f"cd {PROJECT_ROOT}"]

    op = dsl.ContainerOp(
        name='git clone',
        image=image,
        command=['sh'],
        arguments=['-c', ' && '.join(commands)],
        container_kwargs={'image_pull_policy': 'IfNotPresent'},
        pvolumes={"/workspace": volume_op.volume}
    )

    # Mount Git Secrets
    op.add_volume(V1Volume(name='ssh-key-volume',
                           secret=V1SecretVolumeSource(secret_name='ssh-key-secret')))
    op.add_volume_mount(V1VolumeMount(mount_path='/etc/ssh-key', name='ssh-key-volume', read_only=True))

    return op

Сначала создайте том Kubernetes с предопределенным размером 3Gi. Во-вторых,imageПеременная указана как та, которую мы будем использоватьalpine/gitДокер образ. После этого идет список команд для выполнения в контейнере Docker. Эти команды, по сути, настраивают ключи SSH, чтобы конвейеры могли получать доступ к частным репозиториям изgit cloneили используйтеgit://URLзаменитьhttps://.

Ядром функции является следующая строка, которая возвращаетdsl.ContainerOp.

commandиargumentsУказывает команду, которая должна быть выполнена после выполнения зеркального отображения.

Последняя переменная очень интересна иpvolumes, что является сокращением от Pipeline Volumes. Он создает том Kubernetes и позволяет компонентам конвейера совместно использовать одно хранилище. Том смонтирован на/workspaceначальство. Итак, что должен сделать этот компонент, это поместить репозиторийgit cloneприбыть/workspaceсередина.

Используйте секреты

Посмотрите еще раз на команду и куда копировать ключ SSH.

Где создается том конвейера? Когда мы объединим все компоненты в конвейер, мы увидим созданный объем. мы в/etc/ssh-key/Установить секреты на:

op.add_volume_mount(V1VolumeMount(mount_path='/etc/ssh-key', name='ssh-key-volume', read_only=True))

Помните, мы назвали секрет какssh-key-secret:

op.add_volume(V1Volume(name='ssh-key-volume',
                           secret=V1SecretVolumeSource(secret_name='ssh-key-secret')))

используя то же имя томаssh-key-volume, мы можем связать все вместе.

Шаг 2: Предварительная обработка

def preprocess_op(image: str, pvolume: PipelineVolume, data_dir: str):
    return dsl.ContainerOp(
        name='preprocessing',
        image=image,
        command=[CONDA_PYTHON_CMD, f"{PROJECT_ROOT}/preprocessing.py"],
        arguments=["--data_dir", data_dir],
        container_kwargs={'image_pull_policy': 'IfNotPresent'},
        pvolumes={"/workspace": pvolume}
    )

Как видите, этапы предварительной обработки выглядят очень похоже.

imageУкажите на образ Docker, который мы создали на шаге 0.

здесьcommandпросто выполняется с использованием указанного conda pythonpreprocessing.pyсценарий. Переменнаяdata_dirиспользуется для выполненияpreprocessing.pyсценарий.

на этом этапеpvolumeбудет/workspaceВ нем есть репозитории, а значит все наши скрипты доступны на данном этапе. и на этом этапе предварительно обработанные данные будут сохранены в/workspaceвнизdata_dirсередина.

Шаг 3: Обучение и оценка

def train_and_eval_op(image: str, pvolume: PipelineVolume, data_dir: str, ):
    return dsl.ContainerOp(
        name='training and evaluation',
        image=image,
        command=[CONDA_PYTHON_CMD, f"{PROJECT_ROOT}/train.py"],
        arguments=["--data_dir", data_dir],
        file_outputs={'output': f'{PROJECT_ROOT}/output.txt'},
        container_kwargs={'image_pull_policy': 'IfNotPresent'},
        pvolumes={"/workspace": pvolume}
    )

Наконец, пришло время для этапа обучения и оценки. Единственная разница на этом шаге в том, чтоfile_outputsПеременная. Если мы снова посмотримtrain.py, следующий фрагмент кода:

    with open(os.path.join(PROJECT_ROOT, 'output.txt'), 'w') as f:
        f.write(model_path)
        print(f'Model written to: {model_path}')

Мы записываем путь модели в файл с именемoutput.txtв текстовом файле. Как правило, его можно отправить следующему компоненту конвейера, и в этом случае параметр будет содержать путь к модели.

собрать все вместе

Чтобы указать конвейер, вам нужно использоватьdsl.pipelineЧтобы аннотировать функцию конвейера:

@dsl.pipeline(
    name='Fashion MNIST Training Pipeline',
    description='Fashion MNIST Training Pipeline to be executed on KubeFlow.'
)
def training_pipeline(image: str = 'benjamintanweihao/kubeflow-mnist',
                      repo_url: str = 'https://github.com/benjamintanweihao/kubeflow-mnist.git',
                      data_dir: str = '/workspace'):
    git_clone = git_clone_darkrai_op(repo_url=repo_url)

    preprocess_data = preprocess_op(image=image,
                                    pvolume=git_clone.pvolume,
                                    data_dir=data_dir)

    _training_and_eval = train_and_eval_op(image=image,
                                           pvolume=preprocess_data.pvolume,
                                           data_dir=data_dir)

if __name__ == '__main__':
    import kfp.compiler as compiler
    compiler.Compiler().compile(training_pipeline, __file__ + '.tar.gz')

Помните, когда выходные данные конвейерного компонента были входными данными другого компонента? это здесь,git clone,container_opизpvolumeбудет переданоpreprocess_cp.

Последняя часть будетpipeline.pyПреобразование в исполняемый скрипт. Последним шагом является компиляция пайплайна:

% dsl-compile --py pipeline.py --output pipeline.tar.gz

Загрузите и выполните конвейер

Теперь самое интересное! Первым шагом является загрузка пайплайна. нажмитеUpload a pipeline:

Далее заполнитеPipeline NameиPipeline Description, затем выберитеChoose fileи указать наpipeline.tar.gzдля загрузки конвейера.

На следующей странице будет показан полный конвейер. Мы рассматриваем конвейерный ориентированный ациклический граф, что в данном случае означает, что зависимости идут в одном направлении и он не содержит циклов. Нажмите синюю кнопкуCreate runчтобы начать обучение.

Большинство полей уже заполнено. Пожалуйста, обрати внимание,Run parametersи использовать@ dsl.pipelineАннотированныйtraining_pipelineПараметры, указанные в функции, те же:

Наконец, когда вы нажимаете синюю кнопку «Пуск», весь конвейер начинает работать! Вы можете щелкнуть по каждому компоненту и просмотреть журналы, чтобы увидеть, что произошло. Когда весь конвейер будет завершен, справа от всех компонентов появится зеленый знак подтверждения, как показано ниже:

в заключении

Если вы следили за последней статьей, у вас уже должен быть установлен Kubeflow, и вы должны понимать сложности управления проектами машинного обучения в масштабе.

В этом посте мы сначала рассмотрели процесс подготовки проекта машинного обучения для Kubeflow, затем построили конвейер Kubeflow и, наконец, загрузили и выполнили конвейер с использованием интерфейса Kubeflow. Самое замечательное в этом подходе то, что ваш проект машинного обучения может быть как простым, так и сложным, и вы можете использовать одни и те же методы сколь угодно долго.

Поскольку Kubeflow использует контейнеры Docker в качестве компонентов, вы можете добавить любой инструмент, который вам нравится. А поскольку Kubeflow работает на Kubernetes, вы можете позволить Kubernetes управлять планированием рабочих нагрузок машинного обучения.

Мы также узнали о функции Rancher, которая мне нравится, она очень удобна и позволяет легко добавлять секреты. В мгновение ока вы можете легко организовать секреты (например, ключи SSH) и выбрать, какое пространство имен их назначить, не беспокоясь о кодировке Base64. Подобно магазину приложений Rancher, эти удобства делают работу с Kubernetes более приятной и менее подверженной ошибкам.

Конечно, Rancher предлагает гораздо больше, и я призываю вас исследовать его самостоятельно. Я уверен, что вы наткнетесь на некоторые функции, которые удивят вас. Как платформа управления Kubernetes корпоративного уровня с открытым исходным кодом, Rancher, Run Kubernetes Everywhere всегда была нашим видением и целью. Функции с открытым исходным кодом и отсутствие привязки к поставщику позволяют пользователям легко развертывать и использовать Rancher в различных инфраструктурах. Кроме того, минималистичный опыт работы Rancher также позволяет пользователям использовать Rancher для повышения эффективности в различных сценариях, помогая разработчикам сосредоточиться на инновациях, не тратя энергию на утомительные мелочи.