Доклад Алинка (5): Итеративные вычисления и супершаг

машинное обучение

Раньше информация была неверная, и ее разделили на две статьи и выпустили, а теперь публикуют полностью.

0x00 сводка

Alink — это платформа алгоритмов машинного обучения нового поколения, разработанная Alibaba на основе вычислительного движка реального времени Flink, и первая в отрасли платформа машинного обучения, которая поддерживает как пакетные, так и потоковые алгоритмы. Итерационные алгоритмы используются во многих областях анализа данных, таких как машинное обучение или графические вычисления. Эта статья начнется с Superstep, чтобы увидеть, как Alink использует итеративный API Flink для реализации определенных алгоритмов.

Поскольку общедоступная информация Alink слишком мала, нижеследующее - все предположения, и обязательно будут упущения и ошибки. Я надеюсь, что все укажут, и я обновлю их в любое время.

0x01 Причина

Причина, по которой упоминается концепция супершага, заключается в том, что когда я писал код KMeans, я обнаружил несколько странных мест, например, в следующих трех шагах используется context.getStepNo(), и разные сервисы выполняются в соответствии с разными значения.

public class KMeansPreallocateCentroid extends ComputeFunction {
    public void calc(ComContext context) {
        LOG.info("liuhao  KMeansPreallocateCentroid ");
        if (context.getStepNo() == 1) {
          /** 具体业务逻辑代码
           * Allocate memory for pre-round centers and current centers.
           */        
        }
    }
}  

public class KMeansAssignCluster extends ComputeFunction {
    public void calc(ComContext context) {
        ......
        if (context.getStepNo() % 2 == 0) {
            stepNumCentroids = context.getObj(KMeansTrainBatchOp.CENTROID1);
        } else {
            stepNumCentroids = context.getObj(KMeansTrainBatchOp.CENTROID2);
        }
      /** 具体业务逻辑代码
       * Find the closest cluster for every point and calculate the sums of the points belonging to the same cluster.
       */
    }
}

public class KMeansUpdateCentroids extends ComputeFunction {
    public void calc(ComContext context) {
        if (context.getStepNo() % 2 == 0) {
            stepNumCentroids = context.getObj(KMeansTrainBatchOp.CENTROID2);
        } else {
            stepNumCentroids = context.getObj(KMeansTrainBatchOp.CENTROID1);
        }
      /** 具体业务逻辑代码
       * Update the centroids based on the sum of points and point number belonging to the same cluster.
       */
    }

Проверьте исходный код ComContext и обнаружите, что источник stepNo на самом делеruntimeContext.getSuperstepNumber().

public class ComContext {
   private final int taskId;
   private final int numTask;
   private final int stepNo; // 对,就是这里
   private final int sessionId;
	public ComContext(int sessionId, IterationRuntimeContext runtimeContext) {
		this.sessionId = sessionId;
		this.numTask = runtimeContext.getNumberOfParallelSubtasks();
		this.taskId = runtimeContext.getIndexOfThisSubtask();
		this.stepNo = runtimeContext.getSuperstepNumber(); // 这里进行了变量初始化
	}  
	/**
	 * Get current iteration step number, the same as {@link IterationRuntimeContext#getSuperstepNumber()}.
	 * @return iteration step number.
	 */
	public int getStepNo() {
		return stepNo; // 这里是使用
	}  
}

Увидев здесь некоторых братьев, может быть шокирован,Это не концепция модели BSP. Я просто хочу написать алгоритм KMeans, как я могу рассматривать модель BSP в дополнение к модели MPI. Давайте копать шаг за шагом, что именно сделал Alink.

0x02 Фоновая концепция

2.1 Четырехуровневая схема выполнения

Граф выполнения во Flink можно разделить на четыре слоя: StreamGraph -> JobGraph -> ExecutionGraph -> Граф физического выполнения.

  • StreamGraph: исходный график, сгенерированный кодом, написанным Stream API. Используется для представления топологии программы.
  • JobGraph: StreamGraph оптимизирован для создания JobGraph, который представляет собой структуру данных, отправляемую в JobManager. Основная оптимизация заключается в объединении нескольких подходящих узлов в один узел, что может уменьшить потребление сериализации/десериализации/передачи, необходимого для потока данных между узлами. JobGraph — единственная структура данных, распознаваемая механизмом потоков данных Flink для выражения заданий, и именно эта общая абстракция воплощает единство потоковой обработки и пакетной обработки во время выполнения.
  • ExecutionGraph: JobManager создает ExecutionGraph на основе JobGraph. ExecutionGraph — это распараллеленная версия JobGraph, которая является основной структурой данных уровня планирования.
  • Физический график выполнения: после того, как JobManager планирует задание в соответствии с ExecutionGraph, «граф», сформированный после развертывания задачи в каждом диспетчере задач, не является конкретной структурой данных.

2.2 Задача и подзадача

По какой-то причине использование этих двух концепций изначально сбивает с толку во Flink: концепция подзадач в диспетчере задач реализуется классом под названием Task. Объект Task, о котором говорится в диспетчере задач, на самом деле соответствует подзадаче в ExecutionGraph.

Поэтому эти два понятия нуждаются в уточнении.

  • Задача (task): Задача соответствует узлу JobGraph и является оператором. Задача — это набор нескольких подзадач с одной и той же функцией на этапе, аналогичный набору задач в Spark.
  • subTask (подзадача): subTask – это наименьшая исполнительная единица задачи во Flink. Это экземпляр класса Java. Этот класс Java имеет свойства и методы для выполнения определенной логики вычислений. В ExecutionGraph задача разбивается на несколько подзадач, которые выполняются параллельно. Каждая подзадача назначается Диспетчеру задач для выполнения в качестве выполнения.
  • Цепочки операторов: несколько операторов без перемешивания объединяются в подзадаче для формирования цепочек операторов, аналогично конвейеру в Spark. Количество подзадач оператора относится к степени параллелизма операторов. Различные операторы одной и той же программы также могут иметь разную степень параллелизма (поскольку степень параллелизма может быть изменена методом setParallelism()).

Программы во Flink по своей сути параллельны. Во время выполнения каждый оператор (преобразование) имеет одну или несколько подзадач оператора (подзадача оператора), причем подзадачи каждого оператора независимы друг от друга и выполняются в разных потоках, а также могут выполняться в разных потоках. контейнер.

Задача (подзадача) — это объект, который можно запускать.После получения TDD диспетчер задач будет использовать его для создания экземпляра объекта задачи и запускает поток для выполнения метода запуска задачи.

TaskDeploymentDescriptor(TDD): структура данных, которую диспетчер задач отправляет в TM в submitTask. Он содержит всю описательную информацию о Задаче. Например:

  • TaskInfo : содержит класс java, выполняемый Task, который является классом реализации AbstractInvokable и, конечно же, классом реализации оператора (например, DataSourceTask, DataSinkTask, BatchTask, StreamTask и т. д.).
  • Описание IG: обычно содержит один или два дескриптора InputGateDeploymentDescriptor (IGD).
  • Описание целевой RP: ParitionId, PartitionType, количество RS и т. д.

2.3 Основа разделения задач

Задачи будут перераспределены в следующих случаях

  • При изменении степени параллелизма
  • keyBy() /window()/apply() и т. д. Переназначение перебалансировки;
  • Вызовите метод startNewChain(), чтобы запустить новую цепочку операторов;
  • Вызвать метод diableChaining(), то есть указать текущей операции оператора не использовать операцию цепочки операторов.

Например, следующие операции

DataStream<String> text = env.socketTextStream(hostname, port);

DataStream counts = text
    .filter(new FilterClass())
    .map(new LineSplitter())
    .keyBy(0)
    .timeWindow(Time.seconds(10))
    .sum(2)

Тогда поток преобразования StreamGraph:

 Source --> Filter --> Map --> Timestamps/Watermarks --> Window(SumAggregator) --> Sink

Его задач четыре:

  • Source --> Filter --> Map
  • keyBy
  • timeWindow
  • Sink

Каждая задача будет разделена на несколько подзадач. Во время выполнения Задача будет распараллелена на несколько экземпляров подзадачи для выполнения, и одна подзадача соответствует одному потоку выполнения.

2.4 JobGraph

Сказав так много выше, я хочу поговорить о jobGraph и подзадаче, потому что в этой статье, когда мы анализируем исходный код и отлаживаем, мы в основном начинаем с jobGraph, чтобы посмотреть на подзадачу.

На основе StreamGraph JobGraph выполняет связанную операцию слияния на StreamNode.Например, для цепочки обработки данных, такой как источник -> flatMap -> уменьшить -> приемник, когда источник и flatMap соответствуют условиям ссылки, вы можете объединить два Операции каждого оператора выполняются параллельно в одном потоке, что может уменьшить передачу данных в сети.Поскольку данные, передаваемые между источником и flatMap, не нужно сериализовать и десериализовать, эффективность выполнения программы также улучшен.

По сравнению с StreamGraph и Batch Optimized Plan, JobGraph претерпел некоторые изменения и больше не является «статической» структурой данных, поскольку добавляет «динамическую» концепцию промежуточного набора данных.

Вершина задания (JobVertex), промежуточный набор данных (IntermediateDataSet) и граница задания (JobEdge) являются основными элементами, из которых состоит JobGraph. Эти три объекта зависят друг от друга:

  • JobVertex связан с несколькими JobEdge в качестве входных данных и несколькими IntermediateDataSet в качестве результирующих наборов, которые он производит; каждая JobVertex имеет такие свойства, как параллелизм и код выполнения.
  • IntermediateDataSet связан с JobVertex как производитель и несколько JobEdges как потребители;
  • JobEdge, связанный с IntermediateDataSet, может считаться источником, а JobVertex может считаться целевым потребителем;

Так как же JobGraph организует и хранит эти элементы? На самом деле JobGraph хранит только все JobVertex в виде карты, а ключ — это JobVertexID:

private final Map<JobVertexID, JobVertex> taskVertices = new LinkedHashMap<JobVertexID, JobVertex>();

Что касается других элементов, то их можно найти по связи через JobVertex. Следует отметить, что ребра обратной связи, используемые для итерации, в настоящее время не воплощены в JobGraph, а встроены в специальный JobVertex для установления связи между ними через канал обратной связи.

2.5 Модель BSP и Superstep

BSP-модель

Модель BSP представляет собой разновидность модели параллельных вычислений. Модель параллельных вычислений обычно относится к абстрагированию основных характеристик различных параллельных компьютеров (по крайней мере, определенного типа параллельных компьютеров) из разработки и анализа параллельных алгоритмов для формирования абстрактной модели вычислений.

Модель BSP представляет собой асинхронную модель MIMD-DM (DM: распределенная память, SM: общая память). Модель BSP поддерживает систему передачи сообщений, асинхронный параллелизм внутри блока и явную синхронизацию между блоками. Модель основана на мастере. согласование, все воркеры Синхронное (лок-шаговое) исполнение, данные считываются из входной очереди.

Вычислительная модель BSP — это не только архитектурная модель, но и метод проектирования параллельных программ. Принципом программирования BSP является объемная синхронизация, которая уникальна введением концепции супершага. Программа BSP имеет как горизонтальную, так и вертикальную структуру. По вертикали программа BSP состоит из серии последовательных супершагов.

Реализация модели BSP

Пример реализации модели BSP выглядит следующим образом:

  • Pregel: платформа крупномасштабных графовых вычислений Google, впервые предложившая применение модели BSP для графовых вычислений. Подробнее см. в Pregel — крупномасштабной системе обработки графов, но до сих пор она не была открыта.
  • Apache Giraph: Проект Incubator сообщества ASF, предоставленный Yahoo!, представляет собой реализацию BSP на Java, ориентированную на итерационные вычисления графа (такие как рейтинг страниц, кратчайшее соединение и т. д.), каждое задание представляет собой задание Hadoop без процесса редьюсера.
  • Apache Hama: это также инкубаторный проект сообщества ASF.В отличие от Giraph, это чистая Java-реализация модели BSP, и он не только используется для вычислений графа, но и предназначен для обеспечения общей прикладной среды для модели BSP.

Flink-Gelly

Flink-Gelly использует эффективные итерационные операторы Flink для поддержки итеративной графовой обработки массивных данных. В настоящее время Flink Gelly предоставляет реализации вычислительных моделей, таких как «Vertex-Centric», «Scatter-Gather» и «Gather-Sum-Apply».

Итеративная модель «Vertex-Centric», также известная как «Pregel», представляет собой метод вычисления графа с точки зрения Vertex. Среди них шаг синхронно-итерационного расчета называется «супершаг». В каждом «супершаге» каждая вершина выполняет определяемую пользователем функцию, и вершины обмениваются сообщениями.Когда вершина знает уникальный идентификатор любой другой вершины в графе, вершина может отправить ей сообщение.

Но на самом деле KMAIANS не является обработкой графа, а Alink не построена на мили. Может быть, только что заимствовал его концепцию. Так что нам все еще нужно выяснить.

0x03 Итеративный алгоритм Флинка (на основе супершагов)

Итерационные алгоритмы используются во многих областях анализа данных, таких как машинное обучение или графические вычисления. Чтобы извлечь полезную информацию из больших данных, в процессе обработки часто требуется итеративный расчет.

Так называемая итерационная операция заключается в задании начального значения, использовании данной формулы алгоритма для вычисления начального значения для получения промежуточного результата, а затем использовании промежуточного результата в качестве входного параметра для повторного расчета и получении результата расчета при определенных условиях. условия соблюдены.

Существует множество фреймворков для обработки больших данных, таких как spark и mr. На самом деле, эти итерационные вычисления очень трудно осуществить.

Flink напрямую поддерживает итерационные вычисления. Идея Flink для реализации итерации тоже очень проста, то есть реализовать пошаговую функцию, а затем встроить ее в итеративный оператор. Есть два итерационных оператора: Iterate и Delta Iterate. Оба оператора вызывают пошаговую функцию до тех пор, пока не будет получен сигнал завершения.

3.1 Bulk Iterate

Этот итерационный метод называется полной итерацией, Он будет вводить все данные, и после определенного количества итераций вы, наконец, получите желаемый результат.

Итеративный оператор включает в себя простую итеративную форму: на каждой итерации ступенчатая функция потребляет весь объем данных (вход за это время и результат предыдущей итерации), а затем вычисляет выходные данные следующей итерации (например, карта , уменьшать, объединять и т. д. )

Итерационный процесс в основном делится на следующие этапы:

  • Вход итерации: это начальное входное значение или результат предыдущего итеративного вычисления.
  • Пошаговая функция: пошаговая функция выполняется на каждой итерации. Он итеративно вычисляет DataSet, который состоит из ряда операторов, таких как map, flatMap, join и т. д., в зависимости от конкретной бизнес-логики.
  • Следующее частичное решение: результат расчета каждой итерации отправляется на расчет следующей итерации.
  • Результат итерации: результат последней выходной итерации, который выводится в приемник данных или отправляется на нижестоящую обработку.

Конечным условием его итерации является:

  • Достигнуто максимальное количество итераций
  • Пользовательская агрегатная функция конвергенции

При программировании вам нужно вызвать iterate(int), который возвращает IterativeDataSet.Конечно, мы можем выполнять над ним некоторые операции, например map. Единственным параметром функции Iterate является максимальное количество итераций.

Итерация представляет собой кольцо. Нам нужно выполнить операцию с обратной связью, затем нам нужно использовать операцию closeWith(Dataset) в это время, а параметром является набор данных, который необходимо зациклить и повторить. Вы также можете дополнительно указать критерий завершения, работать с closeWith(DataSet, DataSet), вы можете завершить итерацию, оценив, пуст ли второй набор данных. Если условия завершения не указаны, итерация завершится после максимального количества итераций.

3.2 Итеративный механизм

API DataSet представляет уникальный механизм синхронной итерации (на основе супершагов), который ограничен ограниченными потоками.

Мы называем выполнение каждой ступенчатой ​​функции итеративного оператора одной итерацией. В параллельной настройке несколько экземпляров ступенчатой ​​функции вычисляются параллельно в разных разделах состояния итерации. Во многих случаях одна оценка ступенчатой ​​функции для всех параллельных экземпляров формирует так называемый супершаг, который также является степенью детализации синхронизации. Следовательно, все параллельные задачи итерации должны завершить супершаг перед инициализацией следующего супершага. Критерии завершения также будут оцениваться как суперэтап.Синхронизироватьбарьер.

Ниже приведен исходный текст Apache

We referred to each execution of the step function of an iteration operator as a single iteration. In parallel setups, multiple instances of the step function are evaluated in parallel on different partitions of the iteration state. In many settings, one evaluation of the step function on all parallel instances forms a so called superstep, which is also the granularity of synchronization. Therefore, all parallel tasks of an iteration need to complete the superstep, before a next superstep will be initialized. Termination criteria will also be evaluated at superstep barriers.

Ниже приведен исходный образ apache.

Supersteps

Обобщенно следующим образом:

每次迭代都是一个superstep
    每次迭代中有若干subtask在不同的partition上分别执行step
      	 每个step有一个HeadTask,若干IntermediateTask,一个TailTask
    每个superstep有一个SynchronizationSinkTask 同步,因为迭代的所有并行任务需要在下一个迭代前完成

Из этого мы можем узнать, что супершаг — это концепция Flink DataSet API, но отсюда вы можете увидеть тень модели BSP, например:

  • В традиционной модели BSP супершаг делится на три этапа: локальные вычисления, доставка сообщений и синхронный барьер.
  • Барьерная синхронизация также называется барьерной синхронизацией или барьерной синхронизацией. Каждая синхронизация также является завершением супершага и началом следующего супершага;
  • Супершаг Супершаг — это вычислительная итерация, и каждый шаг вперед от начала соответствует супершагу.
  • Когда программа должна закончиться, контролируется самой программой

0x04 Как Alink использует итерацию

В функции KMeansTrainBatchOp.iterateICQ создается IterativeComQueue, а в IterativeComQueue используется итерация на основе супершагов.

return new IterativeComQueue()
   .initWithPartitionedData(TRAIN_DATA, data)
   .initWithBroadcastData(INIT_CENTROID, initCentroid)
   .initWithBroadcastData(KMEANS_STATISTICS, statistics)
   .add(new KMeansPreallocateCentroid())
   .add(new KMeansAssignCluster(distance))
   .add(new AllReduce(CENTROID_ALL_REDUCE))
   .add(new KMeansUpdateCentroids(distance))
   .setCompareCriterionOfNode0(new KMeansIterTermination(distance, tol)) // 终止条件
   .closeWith(new KMeansOutputModel(distanceType, vectorColName, latitudeColName, longitudeColName)) 
   .setMaxIter(maxIter) // 迭代最大次数
   .exec();

И функция BaseComQueue.exec имеет:

public DataSet<Row> exec() {
   IterativeDataSet<byte[]> loop // Flink 迭代API
      = loopStartDataSet(executionEnvironment)
      .iterate(maxIter);
     // 后续操作能看出来,之前添加在queue上的比如KMeansPreallocateCentroid,都是在loop之上运行的。
  		if (null == compareCriterion) {
        loopEnd = loop.closeWith...
     	} else {     
        // compare Criterion.
        DataSet<Boolean> criterion = input ... compareCriterion
        loopEnd = loop.closeWith( ... criterion ... )
      }   
}

Присмотревшись к коду, мы видим, что:

superstepвключают:

.add(new KMeansPreallocateCentroid()) .add(new KMeansAssignCluster(distance)) .add(new AllReduce(CENTROID_ALL_REDUCE)) .add(new KMeansUpdateCentroids(distance))

Критерии прекращенияэто

RichMapPartitionFunction создается с использованием KMeansIterTermination в качестве критерия завершения. В конце вызовите KMeansOutputModel, чтобы завершить бизнес-операцию.

максимальный циклэто

.setMaxIter(maxIter)

Таким образом, мы можем сделать вывод, чтоИтеративный оператор Bulk Iterate на основе супершагов используется для реализации общего алгоритма KMeans, который представляет собой итерацию супершагов. Однако, если в содержимом супершага требуется связь или синхронизация ограждения, используется allReduce MPI.

0x05 Углубитесь в исходный код и среду выполнения Flink, чтобы проверить

Нам нужно углубиться во Flink, чтобы копать и проверять.Если вам интересно, вы можете обратиться к стеку вызовов ниже и добавить точки останова для исследования.

execute:56, LocalExecutor (org.apache.flink.client.deployment.executors)
executeAsync:944, ExecutionEnvironment (org.apache.flink.api.java)
execute:860, ExecutionEnvironment (org.apache.flink.api.java)
execute:844, ExecutionEnvironment (org.apache.flink.api.java)
collect:413, DataSet (org.apache.flink.api.java)
sinkFrom:44, PrintBatchOp (com.alibaba.alink.operator.batch.utils)
sinkFrom:20, PrintBatchOp (com.alibaba.alink.operator.batch.utils)
linkFrom:31, BaseSinkBatchOp (com.alibaba.alink.operator.batch.sink)
linkFrom:17, BaseSinkBatchOp (com.alibaba.alink.operator.batch.sink)
link:89, BatchOperator (com.alibaba.alink.operator.batch)
linkTo:239, BatchOperator (com.alibaba.alink.operator.batch)
print:337, BatchOperator (com.alibaba.alink.operator.batch)
main:35, KMeansExample (com.alibaba.alink)

5.1 Отправить задание на Flink

Соединение между Alink и Flink устанавливается в вызове печати. Поскольку это локальная отладка, Flink запустит миникластер, а затем выполнит следующие операции.

  • Сначала сгенерируйте план выполнения Plan. План представляет пакетную программу в виде потока данных, но это только начальное представление пакетной программы, а затем план оптимизируется для создания более эффективного плана OptimizedPlan.
  • Затем план компилируется для создания JobGraph. Этот граф должен быть передан flink для создания графа задач.
  • Создайте серию конфигураций.
  • Передайте JobGraph и конфигурацию кластеру flink для запуска. Если он не работает локально, он также отправит файл jar на другие узлы по сети.
  • При работе в локальном режиме вы можете видеть процесс запуска, например показатели производительности при запуске, веб-модули, JobManager, ResourceManager, taskManager и т. д.

когда мы увиделиsubmitJobПозвонив, вы знаете, что код KMeans установил соединение с Flink.

@Internal
public class LocalExecutor implements PipelineExecutor {

   public static final String NAME = "local";

   @Override
   public CompletableFuture<JobClient> execute(Pipeline pipeline, Configuration configuration) throws Exception {

      // we only support attached execution with the local executor.
      checkState(configuration.getBoolean(DeploymentOptions.ATTACHED));

      final JobGraph jobGraph = getJobGraph(pipeline, configuration);
      final MiniCluster miniCluster = startMiniCluster(jobGraph, configuration);
      final MiniClusterClient clusterClient = new MiniClusterClient(configuration, miniCluster);

      CompletableFuture<JobID> jobIdFuture = clusterClient.submitJob(jobGraph);

      jobIdFuture
            .thenCompose(clusterClient::requestJobResult)
            .thenAccept((jobResult) -> clusterClient.shutDownCluster());

      return jobIdFuture.thenApply(jobID ->
            new ClusterClientJobClientAdapter<>(() -> clusterClient, jobID));
   }

5.2 Создание графа заданий

Конкретный процесс создания jobGraph:

  • IterativeDataSet.closeWith создает BulkIterationResultSet.
  • ExecutionEnvironment.executeAsync будет вызываться в PrintBatchOp.sinkFrom
  • Вызовите createProgramPlan для создания плана
  • Обнаружение функции OperatorTranslation.translateif (dataSet instanceof BulkIterationResultSet), затем позвонитеtranslateBulkIteration(bulkIterationResultSet);
  • В это время создается план выполнения.
  • ExecutionEnvironment.executeAsync вызывает LocalExecutor.execute
  • Затем вызовите FlinkPipelineTranslationUtil.getJobGraph для создания jobGraph.
  • GraphCreatingVisitor.preVisit оценитif (c instanceof BulkIterationBase), чтобы сгенерировать BulkIterationNode
  • PlanTranslator.translateToJobGraph вызовет JobGraphGenerator.compileJobGraph и, наконец, вызовет createBulkIterationHead для создания итеративно обработанного заголовка.
  • Наконец, jobGraph передается в Cluster, а jobGraph преобразуется в ExceutionGraph и выполняется на JM и TM.

5.3 Задача, соответствующая итерации

В предыдущем коде функция getJobGraph заключается в создании графа заданий.

Затем JobManager создает ExecutionGraph на основе JobGraph. ExecutionGraph — это распараллеленная версия JobGraph, которая является основной структурой данных уровня планирования.

Наконец, после того как JobManager планирует задание в соответствии с ExecutionGraph, задача развертывается в каждом диспетчере задач.

Поэтому нам нужно увидеть, каким задачам соответствует API итерации в окончательной среде выполнения.

Для IterativeDataSet, то есть массовой итерации на основе супершагов, Flink генерирует следующие задачи.

  • IterationHeadTask
  • IterationIntermediateTask
  • IterationTailTask
  • IterationSynchronizationSinkTask

5.3.1 IterationHeadTask

Основная функция IterationHeadTask — координировать итерацию.

Он считывает начальный ввод и перебирает хвост, чтобы построить BlockingBackChannel. После успешной обработки ввода он отправляет событие EndOfSuperstep на собственный вывод. Он связывается с задачей синхронизации после каждого супершага и ждет, пока не получит событие AllWorkersDoneEvent для синхронизации. AllWorkersDoneEvent указывает, что все остальные головки завершили свою итерацию.

На следующей итерации выход хвоста на предыдущей итерации передается через бэкканал, формируя вход головы. Когда вводить следующую итерацию, это делает HeadTask. После завершения итерации головка отправит TerminationEvent всем связанным с ней задачам, сообщая им о завершении работы.

				barrier.waitForOtherWorkers();

				if (barrier.terminationSignaled()) {
					requestTermination();
					nextStepKickoff.signalTermination();
				} else {
					incrementIterationCounter();
					String[] globalAggregateNames = barrier.getAggregatorNames();
					Value[] globalAggregates = barrier.getAggregates();
					aggregatorRegistry.updateGlobalAggregatesAndReset(globalAggregateNames, globalAggregates);
          // 在这里发起下一次Superstep。
					nextStepKickoff.triggerNextSuperstep();
				}
			}

IterationHeadTask создается в JobGraphGenerator.createBulkIterationHead. Пример этого следующий:

"PartialSolution (Bulk Iteration) (org.apache.flink.runtime.iterative.task.IterationHeadTask)"

5.3.2 IterationIntermediateTask

IterationIntermediateTask — это задача в середине супершага, которая будет передавать EndOfSuperstepEvent и TerminationEvent всем связанным с ней задачам. Кроме того, IterationIntermediateTask может обновлять состояние итерации рабочего набора или набора решений.

Если состояние итерации обновлено, выходные данные этой задачи будут переданы обратно в IterationHeadTask, и в этом случае эта задача будет снова запланирована как головная.

Пример IterationIntermediateTask выглядит следующим образом:

 "MapPartition (computation@KMeansUpdateCentroids) (org.apache.flink.runtime.iterative.task.IterationIntermediateTask)"
   
 "Combine (SUM(0), at kMeansPlusPlusInit(KMeansInitCentroids.java:135) (org.apache.flink.runtime.iterative.task.IterationIntermediateTask)"
   
 "MapPartition (AllReduceSend) (org.apache.flink.runtime.iterative.task.IterationIntermediateTask)"
   
"Filter (Filter at kMeansPlusPlusInit(KMeansInitCentroids.java:130)) (org.apache.flink.runtime.iterative.task.IterationIntermediateTask)"
   

5.3.3 IterationTailTask

IterationTailTask ​​— конец итерации. Если статус итерации обновлен, выходные данные этой задачи будут отправлены обратно в IterationHeadTask через BlockingBackChannel, а обратная связь с головкой итерации означает завершение итерационной полной логики, после чего замкнутый цикл итерации может быть закрыт. В этом случае задача будет перенесена на экземпляр, где находится голова.

Здесь следует отметить несколько ключевых моментов:

Как связаться с «Хед»

Flink имеет класс BlockingQueueBroker, который представляет собой блокирующий брокер очередей, роль которого заключается в управлении итеративным параллелизмом. Брокер является одноэлементным, и для головной задачи итерации и хвостовой задачи будет сгенерирован один и тот же идентификатор брокера, поэтому головная и хвостовая задачи будут взаимодействовать на основе одного и того же канала данных в одной и той же JVM. dataChannel создается заголовком итерации.

BlockingBackChannel создается в IterationHeadTask, которая представляет собой очередь блокировки с емкостью 1.

// 生成channel
BlockingBackChannel backChannel = new BlockingBackChannel(new SerializedUpdateBuffer(segments, segmentSize, this.getIOManager())); 

// 然后block在这里,等待Tail
superstepResult = backChannel.getReadEndAfterSuperstepEnded();

IterationTailTask ​​выглядит следующим образом:

// 在基类得到channel,因为是单例,所以会得到同一个
worksetBackChannel = BlockingBackChannelBroker.instance().getAndRemove(brokerKey());

// notify iteration head if responsible for workset update 在这里通知Head
worksetBackChannel.notifyOfEndOfSuperstep();

Оба они используют следующие методы для установления соединения: один и тот же BrokerKey будет использоваться в одной и той же подзадаче, так что начало и конец связаны.

public String brokerKey() {
    if (this.brokerKey == null) {
        int iterationId = this.config.getIterationId();
        this.brokerKey = this.getEnvironment().getJobID().toString() + '#' + iterationId + '#' + this.getEnvironment().getTaskInfo().getIndexOfThisSubtask();
    }

    return this.brokerKey;
}
Как передать значение, возвращенное пользователем, в Head

Это делается через output.collect.

Во-первых, при инициализации Tail будет сгенерирован outputCollector, и этот outputCollector будет установлен как выходной outputCollector этой задачи. Это гарантирует, что вывод пользовательской функции будет направлен в outputCollector.

Вывод outputCollector — это вывод workspaceBackChannel, для которого здесь задан тот же экземпляр. Таким образом, пользовательский вывод выводится в backChannel.

	@Override
	protected void initialize() throws Exception {
		super.initialize();
    
		// set the last output collector of this task to reflect the iteration tail state update:
		// a) workset update,
		// b) solution set update, or
		// c) merged workset and solution set update

		Collector<OT> outputCollector = null;
		if (isWorksetUpdate) {
      // 生成一个outputCollector
			outputCollector = createWorksetUpdateOutputCollector();

			// we need the WorksetUpdateOutputCollector separately to count the collected elements
			if (isWorksetIteration) {
				worksetUpdateOutputCollector = (WorksetUpdateOutputCollector<OT>) outputCollector;
			}
		}
    
    ......
    // 把outputCollector设置为本task的输出
		setLastOutputCollector(outputCollector);
	}

Результатом outputCollector является выходной буфер workspaceBackChannel, для которого здесь задан тот же экземпляр.

	protected Collector<OT> createWorksetUpdateOutputCollector(Collector<OT> delegate) {
		DataOutputView outputView = worksetBackChannel.getWriteEnd();
		TypeSerializer<OT> serializer = getOutputSerializer();
		return new WorksetUpdateOutputCollector<OT>(outputView, serializer, delegate);
	}

Время выполнения следующее:

	@Override
	public void run() throws Exception {

		SuperstepKickoffLatch nextSuperStepLatch = SuperstepKickoffLatchBroker.instance().get(brokerKey());

		while (this.running && !terminationRequested()) {

      // 用户在这里输出,最后会输出到output.collect,也就是worksetBackChannel的输出buffer。
			super.run();

      // 这时候以及输出到channel完毕,只是通知head进行读取。
			if (isWorksetUpdate) {
				// notify iteration head if responsible for workset update
				worksetBackChannel.notifyOfEndOfSuperstep();
			} else if (isSolutionSetUpdate) {
				// notify iteration head if responsible for solution set update
				solutionSetUpdateBarrier.notifySolutionSetUpdate();
			}

      ...
	}

Пример IterationTailTask ​​выглядит следующим образом:

"Pipe (org.apache.flink.runtime.iterative.task.IterationTailTask)"

5.3.4 IterationSynchronizationSinkTask

Функция IterationSynchronizationSinkTask заключается в синхронизации всех головок итераций, а IterationSynchronizationSinkTask реализована как выходная задача. Он используется только для координации и не обрабатывает никаких данных.

На каждом супершаге IterationSynchronizationSinkTask просто ждет, пока не получит WorkerDoneEvent от каждой головы. Это означает, что следующий супершаг может начаться.

Здесь следует обратить внимание на то, как SynchronizationSinkTask ожидает headTask для каждой степени параллелизма. Например, параллелизм Flink равен 5, так как же SynchronizationSinkTask ждет этих 5 headTasks.

В IterationSynchronizationSinkTask регистрируется SyncEventHandler для ожидания события WorkerDoneEvent головы.

this.eventHandler = new SyncEventHandler(numEventsTillEndOfSuperstep, this.aggregators, this.getEnvironment().getUserClassLoader());
this.headEventReader.registerTaskEventListener(this.eventHandler, WorkerDoneEvent.class);

В SyncEventHandler мы видим, что во время построения numberOfEventsUntilEndOfSuperstep устанавливается в степень параллелизма.Каждый раз, когда получен WorkerDoneEvent, workerDoneEventCounter увеличивается.Когда он равен numberOfEventsUntilEndOfSuperstep, то есть степени параллелизма, это означает, что в это супершаг, все головные задачи Все удалось.

    private void onWorkerDoneEvent(WorkerDoneEvent workerDoneEvent) {
        if (this.endOfSuperstep) {
            throw new RuntimeException("Encountered WorderDoneEvent when still in End-of-Superstep status.");
        } else {
          // 每次递增
            ++this.workerDoneEventCounter;
            String[] aggNames = workerDoneEvent.getAggregatorNames();
            Value[] aggregates = workerDoneEvent.getAggregates(this.userCodeClassLoader);
            if (aggNames.length != aggregates.length) {
                throw new RuntimeException("Inconsistent WorkerDoneEvent received!");
            } else {
                for(int i = 0; i < aggNames.length; ++i) {
                    Aggregator<Value> aggregator = (Aggregator)this.aggregators.get(aggNames[i]);
                    aggregator.aggregate(aggregates[i]);
                }

              // numberOfEventsUntilEndOfSuperstep就是并行度,等于并行度时候就说明所有head都成功了。
                if (this.workerDoneEventCounter % this.numberOfEventsUntilEndOfSuperstep == 0) {
                    this.endOfSuperstep = true;
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

Пример IterationSynchronizationSinkTask выглядит следующим образом:

"Sync (BulkIteration (Bulk Iteration)) (org.apache.flink.runtime.iterative.task.IterationSynchronizationSinkTask)"

5.4 superstep

Подводя итог, мы получаем супершаг следующим образом:

***** 文字描述如下 *****
  
每次迭代都是一个superstep
  每次迭代中有若干subtask在不同的partition上分别执行step
     每个step有一个HeadTask,若干IntermediateTask,一个TailTask
  每个superstep有一个SynchronizationSinkTask
  
***** 伪代码大致如下 *****
  
for maxIter :
  begin superstep
      for maxSubTask :
         begin step
           IterationHeadTask
           IterationIntermediateTask
           IterationIntermediateTask
           ...
           IterationIntermediateTask
           IterationIntermediateTask
           IterationTailTask
         end step
    IterationSynchronizationSinkTask
  end superstep

0x06 Просмотр надмножества в сочетании с кодом KMeans

6.1 Обзор алгоритма K-средних

Процесс алгоритма K-средних, чтобы попытаться не использовать математические символы, поэтому описание не очень строгое, что, вероятно, означает, «вещи сгруппированы по подобному, люди разделены на группы»:

  1. Сначала введите значение k, то есть мы хотим сгруппировать набор данных, чтобы получить k групп.
  2. Случайным образом выберите k точек данных из набора данных в качестве начального старшего брата (центроид, центроид)
  3. Для каждого младшего брата в наборе посчитайте расстояние с каждым старшим братом (значение расстояния будет обсуждаться позже) и решите, какой старший брат ближе всего к старшему брату.
  4. В это время подчиненные каждого старшего брата собирали голоса младших братьев, в это время проводился народный конгресс, и каждая группа избирала нового старшего брата (фактически новый центроид выбирался с помощью алгоритма).
  5. Если расстояние между новым старшим братом и старшим братом меньше определенного порога (указывающего на то, что положение пересчитанного центроида сильно не меняется, стремится к стабильности или сходится), можно считать, что наша кластеризация достигла желаемый результат, Алгоритм завершается.
  6. Если расстояние между новым старшим братом и старшим братом сильно меняется, необходимо повторить 3–5 шагов.

6.2 KMeansPreallocateCentroid

KMeansPreallocateCentroid также является членом superstep, но толькоcontext.getStepNo() == 1Когда будет введена фактическая бизнес-логика, Centroid будет предварительно выделен. Когда супершаг больше 1, эта задача будет выполнена, но не будет вводить определенный бизнес-код.

public class KMeansPreallocateCentroid extends ComputeFunction {
    private static final Logger LOG = LoggerFactory.getLogger(KMeansPreallocateCentroid.class);

    @Override
    public void calc(ComContext context) {
        // 每次superstep都会进到这里
        LOG.info("  KMeansPreallocateCentroid 我每次都会进的呀   ");
        if (context.getStepNo() == 1) {
          // 实际预分配业务只进入一次
        }
    }
}

6.3 KMeansAssignCluster и KMeansUpdateCentroids

Роль KMeansAssignCluster заключается в вычислении ближайшего центра кластера для каждой точки (точки), а также в подсчете и суммировании координат точек каждого центра кластера.

Роль KMeansUpdateCentroids заключается в вычислении новых центров кластеров на основе рассчитанного количества точек и координат.

Alink поддерживает специальный узел на протяжении всего процесса вычисления, чтобы запомнить текущий результат поиска центральной точки.

Вот почему необходимо различать нечетные и четные числа при повторении. Нечетное число означает старшего брата, четное число означает нового старшего брата. Каждый супершаг будет вычислять только одну группу старших братьев, оставляя другую группу старших братьев для сравнения расстояний.

Еще один момент, который следует отметить, заключается в том, что: Обычный итеративный расчет заключается в возврате пользовательских данных в Head-tail, но реализация KMeans здесь не использует этот метод, а сохраняет вычисленные центральные точки в общих переменных и взаимодействует друг с другом между промежуточными звеньями.

public class KMeansAssignCluster extends ComputeFunction {
    public void calc(ComContext context) {
        ......
        if (context.getStepNo() % 2 == 0) {
            stepNumCentroids = context.getObj(KMeansTrainBatchOp.CENTROID1);
        } else {
            stepNumCentroids = context.getObj(KMeansTrainBatchOp.CENTROID2);
        }
      /** 具体业务逻辑代码
       * Find the closest cluster for every point and calculate the sums of the points belonging to the same cluster.
       */
    }
}

public class KMeansUpdateCentroids extends ComputeFunction {
    public void calc(ComContext context) {
        if (context.getStepNo() % 2 == 0) {
            stepNumCentroids = context.getObj(KMeansTrainBatchOp.CENTROID2);
        } else {
            stepNumCentroids = context.getObj(KMeansTrainBatchOp.CENTROID1);
        }
      /** 具体业务逻辑代码
       * Update the centroids based on the sum of points and point number belonging to the same cluster.
       */
    }

6.4 KMeansOutputModel

Здесь требуется специальное пояснение, так как KMeansOutputModel является конечной моделью вывода, а реализация алгоритма KMeans такова: все подзадачи имеют все центральные точки, то есть все подзадачи будут иметь одну и ту же модель, поэтому нет необходимости выводить all, поэтому первый здесь ограничен, выводить могут только подзадачи, больше ничего не выводится.

	@Override
	public List <Row> calc(ComContext context) {
    // 只有第一个subtask才输出模型数据。
		if (context.getTaskId() != 0) {
			return null;
		}

    ....
      
		modelData.params = new KMeansTrainModelData.ParamSummary();
		modelData.params.k = k;
		modelData.params.vectorColName = vectorColName;
		modelData.params.distanceType = distanceType;
		modelData.params.vectorSize = vectorSize;
		modelData.params.latitudeColName = latitudeColName;
		modelData.params.longtitudeColName = longtitudeColName;

		RowCollector collector = new RowCollector();
		new KMeansModelDataConverter().save(modelData, collector);
		return collector.getRows();
	}

0xEE Личная информация

★★★★★★Думая о жизни и технологиях★★★★★★

Публичный аккаунт WeChat:мысли Росси

Если вы хотите получать своевременные новости о статьях, написанных отдельными лицами, или хотите видеть технические материалы, рекомендованные отдельными лицами, обратите внимание.

ссылка 0xFF

Разница между несколькими моделями параллельных вычислений (BSP LogP PRAM)

this.apache.org/projects/legal… Кластеризация, K-средние, примеры, детали

Flink-Gelly: итеративная обработка графов

От модели BSP к Apache Hama

Итеративная операция Flink DataSet

Разница между несколькими моделями параллельных вычислений (BSP LogP PRAM)

Архитектура Flink, исходный код и отладка

Введение в поток данных, задачу, подзадачу, цепочки операторов и слот Flink

Задачи Flink и планирование

Создание графа заданий в среде выполнения Flink