[Alink Talk 3] Модель связи AllReduce

машинное обучение

Раньше информация была неверная, и ее разделили на две статьи и выпустили, а теперь публикуют полностью.

0x00 сводка

Alink — это платформа алгоритмов машинного обучения нового поколения, разработанная Alibaba на основе вычислительного движка реального времени Flink.Это первая в отрасли платформа машинного обучения, которая поддерживает как пакетные, так и потоковые алгоритмы. В этой статье вы проанализируете реализацию AllReduce, коммуникационной модели в Alink. AllReduce широко используется в Alink, например KMeans, LDA, Word2Vec, GD, lbfgs, метод Ньютона, owlqn, SGD, Gbdt, random forest — все используют эту модель связи.

Поскольку общедоступная информация Alink слишком мала, нижеследующее - все предположения, и обязательно будут упущения и ошибки. Я надеюсь, что все укажут, и я обновлю их в любое время.

0x01 Что такое MPI

MPI (Message-Passing Interface) — это межъязыковый коммуникационный протокол для написания параллельных вычислений, поддерживающий двухточечные и широковещательные передачи.

Целями MPI являются высокая производительность, масштабируемость и переносимость. MPI по-прежнему остается доминирующей моделью для высокопроизводительных вычислений.

Его характеристики

  • Разделенное адресное пространство. Каждый поток может читать нелокальные данные, только вызывая API. Все взаимодействия (нелокальная память) должны выполняться совместно (рукопожатие).

  • Поддерживает только явное распараллеливание Поддерживается только явное распараллеливание, и пользователь должен явно указать способ передачи сообщений.

AllReduce — это базовый примитив, предоставляемый MPI.Нам нужно понять сокращение, прежде чем мы сможем лучше понять AllReduce.

  • Функция сокращения MPI_Reduce: сокращение — это классическая концепция функционального программирования. Он принимает одну и ту же переменную каждого процесса в коммуникаторе для участия в вычислении редукции и выводит результат вычисления в указанный процесс. Например, пакет данных делится на более мелкие пакеты данных с помощью функции. Или уменьшить элемент массива до числа с помощью функции сложения.
  • Сокращение и широковещательная функция MPI_Allreduce: на основе спецификации расчета распределите результаты расчета по каждому процессу. Например, после того, как функция получает значение результата редукции, она распределяет значение результата каждому процессу, чтобы все значения процесса параллельно могли знать значение результата.

Одно различие между MPI_Allreduce и MPI_Reduce заключается в том, что функция MPI_Reduce передает окончательный результат только процессу с указанным номером dest_process, в то время как функция MPI_Allreduce может передавать результат всем процессам, поэтому все процессы могут получить результат. Таким образом, в прототипе функции MPI_Allreduce не требуется указывать номер целевого процесса.

0x02 Alink реализует идею MPI

AllReduce широко используется в Alink, например KMeans, LDA, Word2Vec, GD, lbfgs, метод Ньютона, owlqn, SGD, Gbdt, random forest — все используют эту модель связи.

AllReduce играет ключевую роль в реализации алгоритма, то есть принудительно прерывает исходные параллельные задачи, выполняющиеся последовательно, суммирует и распределяет результаты вычислений и позволяет продолжить последовательное выполнение. Это немного похоже на Barrier в параллелизме, с которым все знакомы.

Сравнивая родной алгоритм Flink KMeans, мы видим, что AllReduce соответствуетgroupBy(0).reduce. Операция groupBy может быть выполнена только после того, как будут сгенерированы все данные.

	DataSet<Centroid> newCentroids = points
		// compute closest centroid for each point
		.map(new SelectNearestCenter()).withBroadcastSet(loop, "centroids")
		// count and sum point coordinates for each centroid
		.map(new CountAppender())
        // 这里如果是Alink,就对应了AllReduce
		.groupBy(0).reduce(new CentroidAccumulator())
		// compute new centroids from point counts and coordinate sums
		.map(new CentroidAverager());

Из аннотаций AllReduce хорошо видна идея реализации MPI в Alink.

 * An implement of {@link CommunicateFunction} that do the AllReduce.
 *
 * AllReduce is a communication primitive widely used in MPI. In our implementation, all workers do reduce on a partition of the whole data and they all get the final reduce result.
 *
 * There're mainly three stages:
 *   1. All workers send the there partial data to other workers for reduce.
 *   2. All workers do reduce on all data it received and then send partial results to others.
 *   3. All workers merge partial results into final result and put it into session context with pre-defined object name.
 */

Перевод выглядит следующим образом:

所有的workers都在部分数据上做reduce操作,所有的workers都可以获取到reduce最终结果
    
主要有三个阶段:
1. 所有workers给其他workers发送需要reduce的部分数据
2. 所有workers在它收到的数据上做reduce,然后把这个部分reduce的结果发送给其他workers
3. 所有workers把部分reduce的结果合并成为最终结果,然后放入预定义的session 上下文变量中

«На бумаге это поверхностно, и я не знаю, должен ли я это делать».

Чтобы реализовать AllReduce, Alink проделала большую работу за кулисами, давайте проанализируем их один за другим.

0x03 Как добиться совместного использования

Совместное использование является первоочередной задачей для реализации AllReduce, поскольку в процессе слияния/трансляции требуются метаданные, а также ввод и вывод, а реализацию можно значительно упростить, если есть общие переменные. Давайте посмотрим, как Alink реализует совместное использование через диспетчер задач.

1. Понятия, связанные с задачей

  • Task(Задача): Задача — это набор нескольких подзадач с одной и той же функцией на этапе, аналогичный набору задач в Spark.
  • subTask(Подзадача): подзадача — это наименьшая исполнительная единица задачи во Flink. Это экземпляр класса Java. Этот класс Java имеет свойства и методы для выполнения определенной логики вычислений.
  • оптимизация цепи: Само собой разумеется, что экземпляр параллелизма каждого оператора является подзадачей. Затем это приносит много проблем, потому что, когда диспетчер задач flink запускает задачи, каждая задача использует отдельный поток, что приведет к большому количеству накладных расходов на переключение потоков, что повлияет на пропускную способность. Чтобы облегчить эту ситуацию, оптимизируется flink, то есть цепные операции выполняются над подзадачами, а задачи, полученные после выполнения цепных операций, используются как блок выполнения планирования и выполняются в потоке.
  • Operator Chains(Цепочка операторов): Flink объединяет несколько подзадач в одну задачу (задачу), этот процесс называется цепочкой операторов, и каждая задача выполняется потоком. Используя цепочки операторов, несколько отдельных подзадач могут быть объединены в одну задачу. Подобно конвейеру в Spark.
  • Slot(Слот): единица, которая изолирует вычислительные ресурсы в Flink. В слоте может выполняться несколько подзадач, но эти подзадачи должны быть подзадачами из разных этапов одного и того же приложения. В результате каждый слот может выполнять весь конвейер задания.

Программы во Flink изначально параллельны. Во время выполнения каждый оператор (преобразование) имеет одну или несколько подзадач оператора (подзадача оператора), причем подзадачи каждого оператора независимы друг от друга и выполняются в разных потоках и могут выполняться в разных исполняемых машинах или контейнерах.

Одно и то же приложение, несколько подзадач разных задач могут выполняться в одном слоте ресурсов слота. Несколько подзадач в одной задаче не могут выполняться в одном слоте ресурсов, они могут быть распределены по другим слотам ресурсов. Соответствующее обратное: несколько параллельных экземпляров AllReduceSend не могут работать в одном и том же слоте.

2. TaskManager

Каждый TaskManager в Flink — это процесс JVM, который может выполнять одну или несколько подзадач в отдельных потоках. TaskManager эквивалентен подчиненному узлу всего кластера, отвечающему за выполнение конкретных задач и использование ресурсов, а также управление соответствующими задачами на каждом узле.

Чтобы изолировать ресурсы и увеличить количество разрешенных задач, TaskManager вводит понятие слота, Изоляция ресурсов в этом слоте предназначена только для изоляции памяти, и стратегия заключается в равном распределении. TaskManager имеет по крайней мере один слот. Если TM имеет N слотов, размер памяти, выделенной для каждого слота, составляет 1/N от всей памяти TM.Слоты в одном и том же TM изолированы только памятью, а ЦП является общим.

Клиент компилирует и упаковывает написанное приложение Flink и отправляет его в JobManager, а затем JobManager назначит задачу узлу TaskManager с ресурсами в соответствии с ситуацией с ресурсами TaskManager, зарегистрированной в JobManager, а затем запустит и запустит задачу. .

TaskManager получает задачи для развертывания от JobManager, а затем использует ресурс Slot для запуска задачи, установления сетевого подключения для доступа к данным, получения данных и запуска обработки данных. При этом взаимодействие данных между диспетчерами задач осуществляется через поток данных.

Операция задачи Flink на самом деле многопоточная, и TaskManager (TM) выполняет несколько задач одновременно в нескольких потоках. Это сильно отличается от того, как MapReduce использует несколько JVM. Flink может значительно повысить эффективность использования ЦП. Системные ресурсы распределяются между несколькими задачами и задачами через TaskSlot. Каждый TaskManager управляет несколькими пулами ресурсов TaskSlot. Эффективно управляйте ресурсами.

Соответствующее обратное: несколько параллельных экземпляров AllReduceSend, работающих в середине TaskManager, будут совместно использовать все статические переменные в TaskManager.

3. Совместное использование состояния

Alink реализует совместное использование переменных с помощью статических переменных диспетчера задач. Некоторые из основных классов и концепций являются более сложными. Объясняем сверху вниз и видим, что по мере продвижения сверху вниз требуемые признаки и состояния постепенно увеличиваются.

3.1 Анализ концепции

Иерархия вызовов сверху вниз выглядит следующим образом:

Алгоритмическая перспектива: ComContext

Вызовы пользовательского кода: context.getObj(bufferName); Это идеально для пользователя, потому что пользователь знает имя переменной и может получить к ней доступ через контекст.

Но ComContext нужно знать больше, например, ему также нужно знать соответствующий sessioin и taskID, что будет объяснено ниже.

ComContext вызывается следующим образом: SessionSharedObjs.put(objName, sessionId, taskId, obj);

Перспектива фреймворка: IterativeComQueue

IterativeComQueue — это фреймворк. Принимая Kmeans в качестве примера, алгоритм Kmeans соответствует нескольким IterativeComQueues.

В IterativeComQueue есть много функций вычисления/коммуникации.Каждая функция должна знать, к какой IterativeComQueue она принадлежит, как взаимодействовать с другими функциями в этой очереди, и ее нельзя путать с другими очередями. Таким образом, должна быть концепция, указывающая на эту очередь. Итак, существует следующая концепция сеанса.

Угол сеанса: SessionSharedObjs

Чтобы различать каждую очередь IterativeComQueue, была создана концепция сеанса. Таким образом, все функции вычисления/коммуникации в IterativeComQueue будут привязаны к одному и тому же идентификатору сеанса, и все функции в одной и той же IterativeComQueue смогут обмениваться данными.

IterativeComQueue соответствует сеансу, поэтому соответствует переменной, к которой этот сеанс может получить доступ.

SessionSharedObjs содержит статические переменные-члены:

  • int sessionId = 0; Инкрементальный флаг, используемый для различения сеансов.
  • HashMap, Long> key2Handle. Отображение, указывающее, что имя переменной в сеансе соответствует дескриптору переменной.

Обычно «переменная с именем» соответствует «дескриптору переменной». То есть имя переменной в сеансе соответствует дескриптору переменной. Однако во Flink будет несколько подзадач, работающих параллельно, поэтому необходима новая концепция для обозначения переменной, соответствующей подзадаче, которая должна быть связана с идентификатором задачи. Итак, существует следующая концепция состояния.

SessionSharedObjs вызывает: IterTaskObjKeeper.put(handle, taskId, obj);

Угол подзадачи: IterTaskObjKeeper

Вот использование статических переменных для совместного использования. Является экземпляром общей переменной, к которой могут получить доступ все задачи (потоки) в диспетчере задач.

IterTaskObjKeeper содержит статические переменные-члены:

  • длинный дескриптор = 0L; знак приращения, используемый для различения состояния.
  • Map <Tuple2.of(handle, taskId), состояние> состояния; является картой. То есть, какой дескриптор состояния переменной представляет, а представляет экземпляр состояния, соответствующий «какой задаче» в этой переменной, которая является подзадачей для подзадач.

Во Flink алгоритм работает параллельно с несколькими подзадачами. Если есть только один дескриптор, то несколько подзадач будут обращаться вместе, и возникнут различные проблемы многопоточной работы, с которыми все знакомы. Поэтому Alink здесь разбивает дескриптор на несколько состояний. С точки зрения подзадачи каждое состояние использует taskId>Чтобы быть однозначно отмеченным.

Подводя итог, для одного и того же имени переменной общее состояние, соответствующее каждой подзадаче, фактически независимо, и все не мешают друг другу. Совместное использование — это фактически совместное использование между операторами, выполняющими эту подзадачу.

3.2 Анализ экземпляра переменной

Из фактической реализации переменных мы можем получить более четкое представление.

// 能看出来 session 0 中,centroidAllReduce这个变量 对应的handle是 7
SessionSharedObjs.key2Handle = {HashMap@10480}  size = 9
 {Tuple2@10492} "(initCentroid,0)" -> {Long@10493} 1
 {Tuple2@10494} "(statistics,0)" -> {Long@10495} 2
 {Tuple2@10496} "(362158a2-588b-429f-b848-c901a1e15e17,0)" -> {Long@10497} 8
 {Tuple2@10498} "(k,0)" -> {Long@10499} 6
 {Tuple2@10500} "(centroidAllReduce,0)" -> {Long@10501} 7 // 这里就是所说的
 {Tuple2@10502} "(trainData,0)" -> {Long@10503} 0
 {Tuple2@10504} "(vectorSize,0)" -> {Long@10505} 3
 {Tuple2@10506} "(centroid2,0)" -> {Long@10507} 5
 {Tuple2@10508} "(centroid1,0)" -> {Long@10509} 4

// 下面能看出来,handle 7 这一种变量,因为有 4 个subtask,所以细分为4个state。 
 com.alibaba.alink.common.comqueue.IterTaskObjKeeper.states = {HashMap@10520}  size = 36
 {Tuple2@10571} "(7,0)" -> {double[15]@10572} 
 {Tuple2@10573} "(7,1)" -> {double[15]@10574} 
 {Tuple2@10577} "(7,2)" -> {double[15]@10578} 
 {Tuple2@10581} "(7,3)" -> {double[15]@10582} 

 {Tuple2@10575} "(5,0)" -> {Tuple2@10576} "(10,com.alibaba.alink.operator.common.distance.FastDistanceMatrixData@29a72fbb)"
 {Tuple2@10579} "(5,1)" -> {Tuple2@10580} "(10,com.alibaba.alink.operator.common.distance.FastDistanceMatrixData@26c52354)"
 {Tuple2@10585} "(5,2)" -> {Tuple2@10586} "(10,com.alibaba.alink.operator.common.distance.FastDistanceMatrixData@7c6ed779)"
 {Tuple2@10588} "(5,3)" -> {Tuple2@10589} "(10,com.alibaba.alink.operator.common.distance.FastDistanceMatrixData@154b8a4d)"

Давайте объединим код и проанализируем задействованные классы один за другим.

3.3 ComContext

ComContext — это класс верхнего уровня, используемый для получения информации о времени выполнения и общих переменных. Все функции вычислений/коммуникаций в IterativeComQueue (BaseComQueue) обращаются к общим переменным через ComContext. Например:

public class BaseComQueue<Q extends BaseComQueue<Q>> implements Serializable {

    // 每一个BaseComQueue都会得到唯一一个sessionId。
    private final int sessionId = SessionSharedObjs.getNewSessionId();

    int taskId = getRuntimeContext().getIndexOfThisSubtask();

    public void mapPartition(Iterable<byte[]> values, Collector<byte[]> out) {
        // 获取到了一个ComContext
        ComContext context = new ComContext(sessionId, getIterationRuntimeContext());
        if (getIterationRuntimeContext().getSuperstepNumber() == maxIter || criterion) {
            // 利用ComContext继续访问共享变量
            List<Row> model = completeResult.calc(context);
        }
    }
}

// 用户类似这么调用

double[] sendBuf = context.getObj(bufferName);

Можно видеть, что ComContext — это концепция контекста верхнего уровня, которую должны видеть пользователи. taskId, sessionId — это ключи для использования.

  • sessionId — это статическая переменная-член класса, определенная в SessionSharedObjs, которая автоматически увеличивается. Каждая BaseComQueue получит уникальный идентификатор sessionId, то есть очередь поддерживает уникальный сеанс. Таким образом, ComContexts, сгенерированные в BaseComQueue, имеют один и тот же идентификатор сеанса.
  • taskId получается из среды выполнения.
/**
 * Encapsulates task-specific information: name, index of subtask, parallelism and attempt number.
 */
@Internal
public class TaskInfo {
	/**
	 * Gets the number of this parallel subtask. The numbering starts from 0 and goes up to parallelism-1 (parallelism as returned by {@link #getNumberOfParallelSubtasks()}).
	 *
	 * @return The index of the parallel subtask.
	 */
	public int getIndexOfThisSubtask() {
		return this.indexOfSubtask; // 这里获取taskId
	}
}

Конкретный класс ComContext определяется следующим образом

/**
 * Context used in BaseComQueue to access basic runtime information and shared objects.
 */
public class ComContext {
	private final int taskId;
	private final int numTask;
	private final int stepNo;
	private final int sessionId;

	public ComContext(int sessionId, IterationRuntimeContext runtimeContext) {
		this.sessionId = sessionId;
		this.numTask = runtimeContext.getNumberOfParallelSubtasks();
		this.taskId = runtimeContext.getIndexOfThisSubtask();
		this.stepNo = runtimeContext.getSuperstepNumber();
	}
    
	/**
	 * Put an object into shared objects for access of other QueueItem of the same taskId.
	 *
	 * @param objName object name
	 * @param obj     object itself.
	 */
	public void putObj(String objName, Object obj) {
		SessionSharedObjs.put(objName, sessionId, taskId, obj);
	}
}

// 比如具体举例如下
this = {ComContext@10578} 
 taskId = 4
 numTask = 8
 stepNo = 1
 sessionId = 0

3.4 SessionSharedObjs

SessionSharedObjs — это класс следующего уровня, который поддерживает общие объекты сеанса. Это совместное использование сеанса осуществляется через sessionId.

SessionSharedObjs поддерживает статическую переменную класса sessionId, которая отличает каждый сеанс.

Ядром SessionSharedObjs являетсяHashMap<Tuple2<String, Integer>, Long> key2Handle . То есть сопоставление ---> .

IterativeComQueue соответствует сеансу, поэтому соответствует переменной, к которой может обращаться эта очередь IterativeComQueue.Обычно имеется дескриптор переменной.

Но потому чтоIterativeComQueue будет выполняться параллельно несколькими подзадачами, поэтому для взаимного исключения и дифференциации каждый дескриптор подразделяется на несколько состояний, и каждое состояние уникально помечается . будет упомянуто ниже.

/**
 * An static class that manage shared objects for {@link BaseComQueue}s.
 */
class SessionSharedObjs implements Serializable {
	private static HashMap<Tuple2<String, Integer>, Long> key2Handle = new HashMap<>();
	private static int sessionId = 0;
	private static ReadWriteLock rwlock = new ReentrantReadWriteLock();
    
	/**
	 * Get a new session id.
	 * All access operation should bind with a session id. This id is usually shared among compute/communicate function of an {@link IterativeComQueue}.
	 *
	 * @return new session id.
	 */
	synchronized static int getNewSessionId() {
		return sessionId++;
	}    
    
	static void put(String objName, int session, int taskId, Object obj) {
		rwlock.writeLock().lock();
		try {
			Long handle = key2Handle.get(Tuple2.of(objName, session));
			if (handle == null) {
				handle = IterTaskObjKeeper.getNewHandle();
				key2Handle.put(Tuple2.of(objName, session), handle);
			}
      // 这里进行调用。taskId也是辨识关键。
			IterTaskObjKeeper.put(handle, taskId, obj);
		} finally {
			rwlock.writeLock().unlock();
		}
	}    
}

3.5 IterTaskObjKeeper

Это общий класс самого низкого уровня и статический экземпляр в куче памяти процесса диспетчера задач. Все задачи (потоки) диспетчера задач могут быть общими.

Глядя на исходный код, можно увидеть, что IterTaskObjKeeper используется во всей JVM через статическую переменную состояния. Конкретное содержимое определяется дескриптором и идентификатором задачи.

IterTaskObjKeeper поддерживает приращения дескриптора как уникальный вид идентификатора для «переменного состояния».

Используйте в качестве уникального идентификатора «переменного состояния». Это переменная, совместно используемая всеми в памяти кучи процесса диспетчера задач.

То есть, какой тип дескриптора состояния переменной представляет, а представляет, какая переменная какой задачи соответствует этой переменной. Это подразделение задач.

/**
 * A 'state' is an object in the heap memory of task manager process,
 * shared across all tasks (threads) in the task manager.

 * Note that the 'state' is shared by all tasks on the same task manager,
 * users should guarantee that no two tasks modify a 'state' at the same time.

 * A 'state' is identified by 'handle' and 'taskId'.
 */
public class IterTaskObjKeeper implements Serializable {
	private static Map <Tuple2 <Long, Integer>, Object> states;

	/**
	 * A 'handle' is a unique identifier of a state.
	 */
	private static long handle = 0L;

	private static ReadWriteLock rwlock = new ReentrantReadWriteLock();

	static {
		states = new HashMap <>();
	}

	/**
	 * @note Should get a new handle on the client side and pass it to transformers.
	 */
	synchronized public static long getNewHandle() {
		return handle++;
	}

	public static void put(long handle, int taskId, Object state) {
		rwlock.writeLock().lock();
		try {
			states.put(Tuple2.of(handle, taskId), state); 
		} finally {
			rwlock.writeLock().unlock();
		}
	}
}

0x04 Пример кода

Наш пример кода по-прежнему выглядит следующим образом.

Вызов KMeansTrainBatchOp

	static DataSet <Row> iterateICQ(...省略...) {
		return new IterativeComQueue()
			.initWithPartitionedData(TRAIN_DATA, data)
			.initWithBroadcastData(INIT_CENTROID, initCentroid)
			.initWithBroadcastData(KMEANS_STATISTICS, statistics)
			.add(new KMeansPreallocateCentroid())
			.add(new KMeansAssignCluster(distance))
			.add(new AllReduce(CENTROID_ALL_REDUCE))
			.add(new KMeansUpdateCentroids(distance))
			.setCompareCriterionOfNode0(new KMeansIterTermination(distance, tol))
			.closeWith(new KMeansOutputModel(distanceType, vectorColName, latitudeColName, longitudeColName))
			.setMaxIter(maxIter)
			.exec();
	}

Реализация AllReduce

Основной код AllReduce от Alink выглядит следующим образом:

public static <T> DataSet <T> allReduce(
    return input
		.mapPartition(new AllReduceSend <T>(bufferName, lengthName, transferBufferName, sessionId))
		.withBroadcastSet(input, "barrier")
		.returns(
			new TupleTypeInfo <>(Types.INT, Types.INT, PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO))
		.name("AllReduceSend")
		.partitionCustom(new Partitioner <Integer>() {
			@Override
			public int partition(Integer key, int numPartitions) {
				return key;
			}
		}, 0)
		.name("AllReduceBroadcastRaw")
		.mapPartition(new AllReduceSum(bufferName, lengthName, sessionId, op))
		.returns(
			new TupleTypeInfo <>(Types.INT, Types.INT, PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO))
		.name("AllReduceSum")
		.partitionCustom(new Partitioner <Integer>() {
			@Override
			public int partition(Integer key, int numPartitions) {
				return key;
			}
		}, 0)
		.name("AllReduceBroadcastSum")
		.mapPartition(new AllReduceRecv <T>(bufferName, lengthName, sessionId))
		.returns(input.getType())
		.name("AllReduceRecv");
}

0x05 Реализация AllReduce

В сочетании с приведенным выше кодом мы сначала резюмируем процесс использования AllReduce следующим образом.

  • KMeansAssignCluster: найти ближайший кластер для каждой точки и вычислить суммы точек, принадлежащих к одному кластеру. Затем запишите рассчитанный вами кластер в CENTROID_ALL_REDUCE вашего диспетчера задач.

  • Каждый AllReduceSend берет ранее сохраненный кластер из CENTROID_ALL_REDUCE своего диспетчера задач (кластер, полученный каждым AllReduceSend, виден только сам по себе), а затем отправляет его нижестоящей задаче. При отправке решите, какие задачи следует отправлять, исходя из «индекса нисходящей задачи и объема данных». Здесь следует отметить, что какая часть переменной отправляется в какую задачу, рассчитывается на основе индекса задачи и объема данных этой задачи. Этот механизм расчета (как его вычислить в коде, а часть его отправляется вместе с данными в качестве метаинформации) позже повторно используется AllReduceRecv.

  • Каждый AllReduceSum получает кластер, отправленный AllReduceSend, вычисляет сумму и затем отправляет результат вычисления. Каждый AllReduceSum единообразно отправляет рассчитанные и суммированные данные каждой нижестоящей задаче.

  • Каждый AllReduceRecv получает (суммированные) кластеры, отправленные всеми AllReduceSums. Хранится в общей переменной CENTROID_ALL_REDUCE. В частности, повторно используйте вычислительный механизм AllReduceSend, чтобы при сохранении общих переменных они не конфликтовали друг с другом. Это можно понимать как операцию слияния: например, есть 5 AllReduce, и данные каждого AllReduce отправляются в 5 AllReduceRecv. индекс подзадачи.Однако эти 5 фрагментов данных записываются в состоянии и указываются в информации метаданных, и не будет конфликта записи друг с другом, так что каждый AllReduceRecv имеет все 5 фрагментов данных.

  • KMeansUpdateCentroids: удалите переменную CENTROID_ALL_REDUCE, затем обновите центроиды на основе суммы точек и номера точки, принадлежащих одному и тому же кластеру.

1. KMeansAssignCluster

Роль этого класса заключается в вычислении ближайшего центра кластера для каждой точки, а также в подсчете и суммировании координат точек для каждого центра кластера.

Мы видим, что KMeansAssignCluster сохраняет CENTROID_ALL_REDUCE через ComContext для последующего использования AllReduce. Если имеется 5 KMeansAssignCluster, их расчетные результаты обычно отличаются. Хотя сохраняется одно и то же имя переменной CENTROID_ALL_REDUCE, ее состояние отличается.

Поскольку эти 5 KMeansAssignClusters должны соответствовать 5 подзадачам, их в общих переменных должны быть разными, соответствующими разным состояниям, поэтому они хранятся отдельно.

// Find the closest cluster for every point and calculate the sums of the points belonging to the same cluster.
public class KMeansAssignCluster extends ComputeFunction {
        // 存取共享变量
        double[] sumMatrixData = context.getObj(KMeansTrainBatchOp.CENTROID_ALL_REDUCE);
        if (sumMatrixData == null) {
            sumMatrixData = new double[k * (vectorSize + 1)];
            context.putObj(KMeansTrainBatchOp.CENTROID_ALL_REDUCE, sumMatrixData);
        }  
    
        for (FastDistanceVectorData sample : trainData) {
            // Find the closest centroid from centroids for sample, and add the sample to sumMatrix.
            KMeansUtil.updateSumMatrix(sample, 1, stepNumCentroids.f1, vectorSize, sumMatrixData, k, fastDistance, distanceMatrix);
        }    
}

// 程序中各个变量如下

sample = {FastDistanceVectorData@13274} 
 vector = {DenseVector@13281} "6.3 2.5 4.9 1.5"
 label = {DenseVector@13282} "72.2"
 rows = {Row[1]@13283} 

// 这个就是共享变量。4维向量 + 1 weight ---> 都是"sample和"。
sumMatrixData = {double[15]@10574} 
 0 = 23.6
 1 = 14.9
 2 = 8.7
 3 = 1.7000000000000002
 4 = 5.0
 5 = 52.400000000000006
 6 = 25.1
 7 = 39.699999999999996
 8 = 13.299999999999999
 9 = 9.0
 10 = 33.0
 11 = 16.9
 12 = 28.900000000000002
 13 = 11.4
 14 = 5.0
     
trainData = {ArrayList@10580}  size = 19
 0 = {FastDistanceVectorData@10590} 
  vector = {DenseVector@10595} "7.7 3.8 6.7 2.2"
   data = {double[4]@10601} 
    0 = 7.7
    1 = 3.8
    2 = 6.7
    3 = 2.2
  label = {DenseVector@10596} "123.46000000000001"
  rows = {Row[1]@10597} 
 1 = {FastDistanceVectorData@10603} 
  vector = {DenseVector@10623} "5.7 2.8 4.1 1.3"
  label = {DenseVector@10624} "58.83"
  rows = {Row[1]@10625} 
 2 = {FastDistanceVectorData@10604} 
 3 = {FastDistanceVectorData@10605} 
......
 17 = {FastDistanceVectorData@10619} 
 18 = {FastDistanceVectorData@10620} 
  vector = {DenseVector@10654} "6.5 3.0 5.2 2.0"
  label = {DenseVector@10655} "82.29"
  rows = {Row[1]@10656}      

2. AllReduceSend

Здесь код нужно снова вытащить, в основном потому, что есть withBroadcastSet. Его роль:

  • Это можно понимать как общедоступную общую переменную.Мы можем транслировать набор данных набора данных, а затем разные задачи могут получить его на узле.На каждом узле существует только одна копия этих данных.
  • Если широковещательная рассылка не используется, набор данных набора данных необходимо копировать в каждой задаче на каждом узле, что является пустой тратой памяти (то есть в узле может быть несколько копий данных набора данных).
		return input
			.mapPartition(new AllReduceSend <T>(bufferName, lengthName, transferBufferName, sessionId))
			.withBroadcastSet(input, "barrier")

KMeansAssignCluster добавит данные в переменную контекста centroidAllReduce. Таким образом, AllReduce фактически ожидает эту переменную.

Первый шаг AllReduce — взять общую переменную из контекста и отправить ее. Эта часть кода выполняется AllReduceSend.

Для каждой задачи AllReduceSend имя_буфера равно centroidAllReduce.

Поскольку каждый AllReduceSend также соответствует другой задаче, centroidAllReduce, прочитанный каждым AllReduceSend, должен быть другим, поэтому sendBuf, полученный каждой задачей, отличается. Они извлекают состояние «centroidAllReduce», соответствующее их , и отправляют их в нисходящий поток.

Когда AllReduceSend отправляется в свой нижестоящий поток, он отправляется в каждую задачу на основе порядкового номера подзадачи, то есть в каждую задачу будет отправлена ​​общая переменная, полученная в этой задаче, но какая часть переменной отправляется в какую задача основана на том, что рассчитывается индекс задачи и объем данных задачи. Если объем данных небольшой, их можно отправить только одной или нескольким задачам.

Идентификатор задачи в последующем — это идентификатор всех подзадач.

Среди них, как рассчитать, сколько отправить, на какую задачу делается в DefaultDistributedInfo. Это должно быть объединено с функцией частей для анализа. Следует отметить, что: AllReduceSend отправляется так, и AllReduceRecv также принимается в соответствии с этой процедурой. Таким образом, AllReduceRecv может объединяться.

AllReduceSend这么发送,AllReduceRecv后面也按照这个套路接受

int pieces = pieces(sendLen);//表示本人这次send的数据分成几片,比如分成50片。每片大小是TRANSFER_BUFFER_SIZE

// 将要发给 8 个 subtask
for (int i = 0; i < numOfSubTasks; ++i) {
      // 假如第5个subtask,那么它发送的起始位置就是50/8 * 4
      int startPos = (int) distributedInfo.startPos(i, numOfSubTasks, pieces);
      // 给第5个subtask发送多少片
      int cnt = (int) distributedInfo.localRowCnt(i, numOfSubTasks, pieces);

Конкретный код выглядит следующим образом:

	private static int pieces(int len) {
		int div = len / TRANSFER_BUFFER_SIZE; //本人这次send的数据分成几片,每片大小是TRANSFER_BUFFER_SIZE
		int mod = len % TRANSFER_BUFFER_SIZE;

		return mod == 0 ? div : div + 1;
	}

public class DefaultDistributedInfo implements DistributedInfo {

	public long startPos(long taskId, long parallelism, long globalRowCnt) {
		long div = globalRowCnt / parallelism;
		long mod = globalRowCnt % parallelism;

		if (mod == 0) {
			return div * taskId;
		} else if (taskId >= mod) {
			return div * taskId + mod;
		} else {
			return div * taskId + taskId;
		}
	}
    
	public long localRowCnt(long taskId, long parallelism, long globalRowCnt) {
		long div = globalRowCnt / parallelism;
		long mod = globalRowCnt % parallelism;

		if (mod == 0) {
			return div;
		} else if (taskId >= mod) {
			return div;
		} else {
			return div + 1;
		}
	}     
}

Конкретный код AllReduceSend выглядит следующим образом и подробно описан в комментариях.

// 这里是变量名字定义。	
public static final String CENTROID_ALL_REDUCE = "centroidAllReduce";

private static class AllReduceSend<T> extends RichMapPartitionFunction <T, Tuple3 <Integer, Integer, double[]>> {
        
    	int numOfSubTasks = getRuntimeContext().getNumberOfParallelSubtasks();
		// 与并行度相关,每个task都会执行相同操作
		// bufferName都是 centroidAllReduce,每个task获取的sendBuf都不一样
    
        // 计算怎么发送所需要的数据结构
    	int pieces = pieces(sendLen);
    	DistributedInfo distributedInfo = new DefaultDistributedInfo();

        // 从上下文中获取需要传送的数据
		double[] sendBuf = context.getObj(bufferName);
        
			int agg = 0;
    		// 可以看出来,是把需要传送的数据给每个task都发送。当然这个发送是根据发送数据的大小来确定的,如果数据量小,可能就只给一个或者几个task发送。
			for (int i = 0; i < numOfSubTasks; ++i) {
                // startPos : 具体发送变量的那一部分,是依据task index来决定的。
                // cnt : 具体哪一个下游 task i 发送多少数据由此决定,如果是0,就不给task i发送数据。
				int startPos = (int) distributedInfo.startPos(i, numOfSubTasks, pieces);
				int cnt = (int) distributedInfo.localRowCnt(i, numOfSubTasks, pieces);

				for (int j = 0; j < cnt; ++j) {
                    // 发送哪一个部分
					int bufStart = (startPos + j) * TRANSFER_BUFFER_SIZE;
					// the last
					if (startPos + j == pieces - 1) {
						System.arraycopy(sendBuf, bufStart, transBuf, 0, lastLen(sendLen));
					} else {
						System.arraycopy(sendBuf, bufStart, transBuf, 0, TRANSFER_BUFFER_SIZE);
					}
					agg++;
                    
          // i 是subTasks的index,startPos + j是buffer内的位置,后续分区实际就是按照这个 i 来分区的。本AllReduceSend就是发送到numOfSubTasks这些task中。
					out.collect(Tuple3.of(i, startPos + j, transBuf));
				}
			}
}

	private static int pieces(int len) {
		int div = len / TRANSFER_BUFFER_SIZE; // 4096
		int mod = len % TRANSFER_BUFFER_SIZE;
		return mod == 0 ? div : div + 1;
	}

sendBuf = {double[15]@10602} 
 0 = 40.3
 1 = 18.200000000000003
 2 = 33.6
 3 = 12.5
 4 = 6.0
 5 = 45.3
 6 = 30.599999999999998
 7 = 12.4
 8 = 2.0
 9 = 9.0
 10 = 24.0
 11 = 10.4
 12 = 17.1
 13 = 5.199999999999999
 14 = 4.0

this = {AllReduce$AllReduceSend@10598} 
 bufferName = "centroidAllReduce"
 lengthName = null
 transferBufferName = "3dfb2aae-683d-4497-91fc-30b8d6853bce"
 sessionId = 0
 runtimeContext = {AbstractIterativeTask$IterativeRuntimeUdfContext@10606}       

3. AllReduceBroadcastRaw

Когда AllReduceSend отправляет переменные в нисходящий поток, используется настраиваемый раздел (partitionCustom). Он использует индекс подзадачи в качестве ключевого раздела. Это соответствует out.collect для AllReduceSend.

			.partitionCustom(new Partitioner <Integer>() {
				@Override
				public int partition(Integer key, int numPartitions) {
					return key;
				}
			}, 0)
			.name("AllReduceBroadcastRaw")
               
// 调用到这个partition函数的调用栈
                
partition:102, AllReduce$2 (com.alibaba.alink.common.comqueue.communication)
partition:99, AllReduce$2 (com.alibaba.alink.common.comqueue.communication)
customPartition:235, OutputEmitter (org.apache.flink.runtime.operators.shipping)
selectChannel:149, OutputEmitter (org.apache.flink.runtime.operators.shipping)
selectChannel:36, OutputEmitter (org.apache.flink.runtime.operators.shipping)
emit:120, RecordWriter (org.apache.flink.runtime.io.network.api.writer)
collect:65, OutputCollector (org.apache.flink.runtime.operators.shipping)
collect:35, CountingCollector (org.apache.flink.runtime.operators.util.metrics)
mapPartition:257, AllReduce$AllReduceSend (com.alibaba.alink.common.comqueue.communication)
run:103, MapPartitionDriver (org.apache.flink.runtime.operators)
run:504, BatchTask (org.apache.flink.runtime.operators)
run:157, AbstractIterativeTask (org.apache.flink.runtime.iterative.task)
run:107, IterationIntermediateTask (org.apache.flink.runtime.iterative.task)
invoke:369, BatchTask (org.apache.flink.runtime.operators)
doRun:705, Task (org.apache.flink.runtime.taskmanager)
run:530, Task (org.apache.flink.runtime.taskmanager)
run:745, Thread (java.lang)                  
                
                 
 // @AllReduceSend.mapPartition 这里开始调用   
 for (int i = 0; i < numOfSubTasks; ++i) {   
     // i 是subTasks的index,后续分区实际就是按照这个 i 来分区的。本AllReduceSend就是发送到numOfSubTasks这些task中。
	 out.collect(Tuple3.of(i, startPos + j, transBuf));     
 }
                
 // 从后续调用序列可以看出来,最终是用 index of subtask 来作为key分区。    

// 这里发送record

 public class CountingCollector<OUT> implements Collector<OUT> {
	public void collect(OUT record) {
		this.numRecordsOut.inc();
		this.collector.collect(record);
	}     
 }
             
 record = {Tuple3@10586} "(0,0,[40.50000000000001, 18.7, 33.300000000000004, 12.8, 6.0, 29.7, 21.0, 8.4, 1.7, 6.0, 48.1, 22.199999999999996, 36.0, 12.200000000000001, 8.0, 0.0,"
 f0 = {Integer@10583} 0
 f1 = {Integer@10583} 0
 f2 = {double[4096]@10598}                
       
// 这里开始分区

public class OutputEmitter<T> implements ChannelSelector<SerializationDelegate<T>> {
	private int customPartition(T record, int numberOfChannels) {
		if (extractedKeys == null) {
			extractedKeys = new Object[1];
		}

		if (comparator.extractKeys(record, extractedKeys, 0) == 1) {
            // 所以 key 是 0
			final Object key = extractedKeys[0];
			return partitioner.partition(key, numberOfChannels);
		}            
	}    
}

public final class TupleComparator<T extends Tuple> extends TupleComparatorBase<T> {
	public int extractKeys(Object record, Object[] target, int index) {
		int localIndex = index;
		for(int i = 0; i < comparators.length; i++) {
			localIndex += comparators[i].extractKeys(((Tuple) record).getField(keyPositions[i]), target, localIndex);
		}
		return localIndex - index;
	}    
}

// 就是取出第一个field的数值

key = {Integer@10583} 0
 value = 0
    
extractedKeys = {Object[1]@10587} 
 0 = {Integer@10583} 0
  value = 0

4. AllReduceSum

Все рабочие процессы сокращают данные, которые они получают, а затем отправляют частичные результаты этого частичного сокращения другим рабочим процессам.

Частичные результаты связаны с тем, что данные, полученные каждой задачей, различаются, а восходящий поток вычисляет позицию в соответствии с индексом задачи и отправляет ее.

Но результат вычисления AllReduceSum будет отправлен в индекс каждой нижестоящей задачи.

private static class AllReduceSum extends RichMapPartitionFunction <Tuple3 <Integer, Integer, double[]>, Tuple3 <Integer, Integer, double[]>> {
    
    	public void mapPartition(Iterable <Tuple3 <Integer, Integer, double[]>> values,Collector <Tuple3 <Integer, Integer, double[]>> out) {
            
            // 这时候虽然也用到了context取出了sendBuf,但是只是用来获取其长度而已。
    		int taskId = getRuntimeContext().getIndexOfThisSubtask();
			int numOfSubTasks = getRuntimeContext().getNumberOfParallelSubtasks();

			double[] sendBuf = context.getObj(bufferName);
			int sendLen = lengthName != null ? context.getObj(lengthName) : sendBuf.length;
			int pieces = pieces(sendLen);
			DistributedInfo distributedInfo = new DefaultDistributedInfo();

            // startPos : 本task接受的数据,startPos 是应该从原始数据的哪个位置开始。是依据task index来决定的。
            // cnt : 具体哪一个下游 task i 发送多少数据由此决定。   
			int startPos = (int) distributedInfo.startPos(taskId, numOfSubTasks, pieces);
			int cnt = (int) distributedInfo.localRowCnt(taskId, numOfSubTasks, pieces);
    
    		// 这里进行了reduce SUM工作
			double[][] sum = new double[cnt][];
			double[] agg = new double[cnt];
			do {
				Tuple3 <Integer, Integer, double[]> val = it.next();
				int localPos = val.f1 - startPos;
				if (sum[localPos] == null) {
					sum[localPos] = val.f2;
					agg[localPos]++;
				} else {
					op.accept(sum[localPos], val.f2);
				}
			} while (it.hasNext());    
    
    		// 依然发送给下游,依然是用subtask index来作为partition key。
            // 注意,这里是把结果发送给所有的下游task。
			for (int i = 0; i < numOfSubTasks; ++i) {
				for (int j = 0; j < cnt; ++j) {
          // startPos是本task发送的数据应该从原始数据的哪个位置开始。
          // 但是给每一个 task i 发的都是同样的数据。但是 startPos + j 很重要,下游task i 会根据这个知道它应该把接收到的数据存储在预定义变量的什么地方。
					out.collect(Tuple3.of(i, startPos + j, sum[j]));
				}
			}   
        }
}

sum = {double[1][]@10605} 
 0 = {double[4096]@10613} 
  0 = 118.50000000000001
  1 = 77.7
  2 = 37.2
  3 = 5.9
  4 = 25.0
  5 = 621.1000000000001
  6 = 284.7
  7 = 487.59999999999997
  8 = 166.5
  9 = 99.0
  10 = 136.9
  11 = 95.7
  12 = 39.0
  13 = 7.4
  14 = 26.0

5. AllReduceBroadcastSum

Когда AllReduceSum отправляет переменные в нисходящий поток, он использует пользовательский раздел (partitionCustom). Он использует индекс подзадачи в качестве ключевого раздела.

Его значение такое же, как у предыдущего раздела Custom.

6. AllReduceRecv

All workers merge partial results into final result and put it into session context with pre-defined object name.

КаждыйОба нижестоящих AllReduceRecv получаютКаждыйКластер отправляет восходящий AllReduceSum (после суммирования), а затем сохраняет каждый фрагмент данных в другой части предопределенной переменной состояния, соответствующей его диспетчеру задач (эта другая часть вычисляется на основе полученных данных val.f1).

В сочетании с вышеизложенным видно, что отправка AllReduceSend и получение AllReduceRecv основаны на одной и той же процедуре для вычисления местоположения данных в общей переменной. Таким образом, AllReduceRecv может объединяться.

Это завершает все рабочие процессы, чтобы объединить результаты частичной суммы сокращения в окончательный результат, а затем поместить его в предопределенную контекстную переменную.

	private static class AllReduceRecv<T> extends RichMapPartitionFunction <Tuple3 <Integer, Integer, double[]>, T> {
		private final String bufferName;
		private final String lengthName;
		private final int sessionId;

		@Override
		public void mapPartition(Iterable <Tuple3 <Integer, Integer, double[]>> values, Collector <T> out) throws Exception {
			ComContext context = new ComContext(sessionId, getIterationRuntimeContext());
			Iterator <Tuple3 <Integer, Integer, double[]>> it = values.iterator();
			if (!it.hasNext()) {
				return;
			}
			double[] recvBuf = context.getObj(bufferName);
			int recvLen = lengthName != null ? context.getObj(lengthName) : recvBuf.length;
			int pieces = pieces(recvLen); // 和之前AllReduceSend一样的套路计算应该存储在共享变量什么位置。
			do {
				Tuple3 <Integer, Integer, double[]> val = it.next();
				if (val.f1 == pieces - 1) {
					System.arraycopy(val.f2, 0, recvBuf, val.f1 * TRANSFER_BUFFER_SIZE, lastLen(recvLen));
				} else {
           // 拷贝到共享变量的相应部位。val.f1 是上游发送过来的。作为merge功能的起始位置。
					System.arraycopy(val.f2, 0, recvBuf, val.f1 * TRANSFER_BUFFER_SIZE, TRANSFER_BUFFER_SIZE);
				}
			} while (it.hasNext());
		}
	}

val = {Tuple3@10672} "(3,0,[335.3, 150.89999999999998, 277.5, 99.79999999999998, 50.0, 290.9, 136.3, 213.1, 67.8, 50.0, 250.3, 170.89999999999998, 73.2, 12.2, 50.0, 0.0....."
 f0 = {Integer@10682} 3
  value = 3
 f1 = {Integer@10638} 0
  value = 0
 f2 = {double[4096]@10674} 
  0 = 335.3
  1 = 150.89999999999998
  2 = 277.5
  3 = 99.79999999999998
  4 = 50.0
  5 = 290.9
  6 = 136.3
  7 = 213.1
  8 = 67.8
  9 = 50.0
  10 = 250.3
  11 = 170.89999999999998
  12 = 73.2
  13 = 12.2
  14 = 50.0
  15 = 0.0
  ......
      
// 每个task都收到了reduce sum结果。      
recvBuf = {double[15]@10666} 
 0 = 404.3
 1 = 183.1
 2 = 329.3
 3 = 117.2
 4 = 61.0
 5 = 250.3
 6 = 170.89999999999998
 7 = 73.20000000000002
 8 = 12.2
 9 = 50.0
 10 = 221.89999999999998
 11 = 104.1
 12 = 161.29999999999998
 13 = 50.4
 14 = 39.0      
      

7. KMeansUpdateCentroids

На основе подсчета точек и координат рассчитываются новые центры кластеров. Вот общая переменная CENTROID_ALL_REDUCE, сохраненная AllReduce из диспетчера задач.

/**
 * Update the centroids based on the sum of points and point number belonging to the same cluster.
 */
public class KMeansUpdateCentroids extends ComputeFunction {
    public void calc(ComContext context) {

        Integer vectorSize = context.getObj(KMeansTrainBatchOp.VECTOR_SIZE);
        Integer k = context.getObj(KMeansTrainBatchOp.K);

        // 这里取出AllReduce存储的共享变量
        double[] sumMatrixData = context.getObj(KMeansTrainBatchOp.CENTROID_ALL_REDUCE);

        Tuple2<Integer, FastDistanceMatrixData> stepNumCentroids;
        if (context.getStepNo() % 2 == 0) {
            stepNumCentroids = context.getObj(KMeansTrainBatchOp.CENTROID2);
        } else {
            stepNumCentroids = context.getObj(KMeansTrainBatchOp.CENTROID1);
        }

        stepNumCentroids.f0 = context.getStepNo();

        context.putObj(KMeansTrainBatchOp.K,
            updateCentroids(stepNumCentroids.f1, k, vectorSize, sumMatrixData, distance));
    }
}

0xEE Личная информация

★★★★★★Думая о жизни и технологиях★★★★★★

Публичный аккаунт WeChat:мысли Росси

Если вы хотите получать своевременные новости о статьях, написанных отдельными лицами, или хотите видеть технические материалы, рекомендованные отдельными лицами, обратите внимание.

ссылка 0x0FF

Мой путь к параллельным вычислениям (4) Уменьшить и полностью сократить коллективную коммуникацию MPI

Message Passing Interface(MPI)

Введение в поток данных, задачу, подзадачу, цепочки операторов и слот Flink

TaskManager выполняет задачу во время выполнения Flink