Alink talk (16): Анализ исходного кода Word2Vec для построения дерева Хаффмана

машинное обучение

0x00 сводка

Alink — это платформа алгоритмов машинного обучения нового поколения, разработанная Alibaba на основе вычислительного движка реального времени Flink. Это первая в отрасли платформа машинного обучения, поддерживающая как пакетные, так и потоковые алгоритмы. Эта и последующие статьи помогут вам проанализировать реализацию Word2Vec в Alink.

Поскольку общедоступная информация Alink слишком мала, нижеследующее - все предположения, и обязательно будут упущения и ошибки. Я надеюсь, что все укажут, и я обновлю их в любое время.

0x01 Фоновая концепция

1.1 Основы работы с векторами Word

1.1.1 Горячее кодирование

Горячее кодирование гарантирует, что только 1 бит одного признака в каждом образце находится в состоянии 1, а остальные равны 0. Конкретный пример кодирования выглядит следующим образом. В корпусе каждому из Ханчжоу, Шанхая, Нинбо и Пекина соответствует вектор. Только одно значение в векторе равно 1, а остальные равны 0.

杭州 [0,0,0,0,0,0,0,1,0,……,0,0,0,0,0,0,0]
上海 [0,0,0,0,1,0,0,0,0,……,0,0,0,0,0,0,0]
宁波 [0,0,0,1,0,0,0,0,0,……,0,0,0,0,0,0,0]
北京 [0,0,0,0,0,0,0,0,0,……,1,0,0,0,0,0,0]

Недостатки:

  • Размерность вектора будет увеличиваться с увеличением количества и типа слов в предложении; если векторы, соответствующие всем названиям городов мира, объединить в матрицу, матрица будет слишком разреженной и вызовет размерную катастрофу.
  • Код города является случайным, а векторы независимы друг от друга, что не может представлять связанную информацию между словами на семантическом уровне.

Итак, люди хотят улучшить горячее кодирование следующим образом:

  • Измените каждый элемент вектора с целого на тип с плавающей запятой, чтобы представить весь диапазон действительных чисел;
  • Преобразуется в низкоразмерные непрерывные значения, то есть в плотные векторы. Сжать и встроить исходное разреженное огромное измерение в пространство меньшего измерения. И слова с похожими значениями будут отображаться в одинаковых позициях в векторном пространстве.

Проще говоря, нам нужно найти пространственное отображение для встраивания многомерных векторов слов в низкоразмерное пространство. Затем можно продолжить обработку.

1.1.2 Распределенное представление

Распределенное представление (Distributed Representation) на самом деле было предложено Хинтоном еще в 1986 году. Основная идея состоит в том, чтобы выразить каждое слово как n-мерный плотный и непрерывный вектор вещественных чисел. Связь между реальными векторами может представлять сходство между словами, например косинус угла или евклидово расстояние векторов.

Существует специальный термин для описания техники распределенного представления векторов слов — встраивание слов. Как видно из названия, горячее кодирование эквивалентно кодированию слов, в то время как распределенное представление предназначено для сжатия и встраивания слов из разреженного большого измерения в векторное пространство меньшей размерности.

Самый большой вклад распределенного представления заключается в том, чтобы сделать родственные или похожие слова ближе на расстоянии. Его основная идея заключается в следующем:Слова с похожим контекстом имеют сходную семантику.это известномодель пространства слов.

Еще одно различие между распределенным представлением и One-hot заключается в том, что размерность значительно уменьшена.Для словаря 100W мы можем использовать 100-мерный реальный вектор для представления слова, в то время как для One-hot требуется 100W.

Почему вектор слова, отображаемый в векторное пространство, может представлять, какое слово определено, и сходство между ними?

  • К вопросу о том, почему оно может обозначать слово. Распределение на самом деле состоит в том, чтобы найти функцию отображения, эта функция отображения сжимает и проецирует исходное однократное представление каждого слова в пространство более низкого измерения и соответствует один к одному. Таким образом, распределенные могут представлять определенные слова.
  • О том, почему распределённые тоже можно знать отношения между словами. Необходимо понять гипотезу распределения. Он основан на распределенном предположении, что слова, встречающиеся в одном и том же контексте, должны иметь сходные значения. Все методы обучения встраиванию слов используют математические методы для моделирования связи между словами и контекстом.

Основная идея распределенного представления векторов слов состоит из двух частей:

  • Выберите способ описания контекста;
  • Выберите модель, чтобы охарактеризовать отношения между целевым словом и его контекстом.

Фактически, будь то скрытый слой нейронной сети или вероятностная тематическая модель множества скрытых переменных, применяется распределенное представление.

1.2 CBOW & Skip-Gram

До появления word2vec нейронная сеть DNN использовалась для обучения векторов слов для обработки отношений между словами. Используемый метод, как правило, представляет собой трехслойную структуру нейронной сети (конечно, она также может быть многослойной), которая делится на входной слой, скрытый слой и выходной слой (слой softmax).

Как эта модель определяет ввод и вывод данных? Обычно делится на две модели CBOW (непрерывная модель мешка слов) и Skip-gram (непрерывная модель Skip-gram).

1.2.1 CBOW

CBOW использует контекст для прогнозирования текущего значения. Это эквивалентно вычитанию слова из предложения и предоставлению вам возможности угадать, что это за слово. CBOW заключается в вычислении вероятности появления слова на основе C слов перед словом или C последовательных слов до и после него.

Процесс обучения CBOW выглядит следующим образом:

  1. Выходной слой входного слоя: один из самых популярных контекстных слов. Предположим, что размерность словарного векторного пространства равна V, то есть размер всего корпуса тезауруса равен V, а размер контекстного словарного окна равен C.
  2. Предполагая, что размерность конечного вектора слов равна N, матрица распределения веса равна W. W имеет размер V * N и инициализируется.
  3. Предположим, в корпусе есть предложение «Я люблю тебя». Если мы сейчас сосредоточимся на слове «любовь», пусть С=2, его контекст «я», «ты». Модель принимает в качестве входных данных одну горячую форму «я» и «ты». Легко узнать, что его размер равен 1V. Векторы размера C 1V умножаются на одну и ту же матрицу распределения весов размера V*N соответственно, и получаются скрытые слои размером C 1N.
  4. C Скрытые слои размером 1N усредняются для получения вектора размером 1N, то есть скрытого слоя.
  5. Матрица выходных весов W' равна NV, и выполняется соответствующая работа по инициализации.
  6. Умножьте полученный вектор скрытого слоя 1N на W' и обработайте его с помощью softmax, чтобы получить вектор 1V, каждое измерение которого представляет слово в корпусе. Слово, представленное наибольшим индексом вероятности, является предсказанным промежуточным словом.
  7. Сравните с одной горячей наземной истиной и найдите минимальное значение функции потерь.
  8. С помощью алгоритма обратного распространения DNN мы можем найти параметры модели DNN и получить векторы слов, соответствующие всем словам одновременно. Таким образом, когда у нас есть новое требование найти наиболее вероятное выходное центральное слово, соответствующее определенным 8 словам, мы можем использовать алгоритм прямого распространения DNN и использовать функцию активации softmax, чтобы найти нейрон, соответствующий слову с самым высоким значением. вероятность.

1.2.2 Skip-gram

Skip-gram использует текущее слово для предсказания контекста. Это равносильно тому, что вам дают слово и просят угадать, какие слова могут стоять до и после него. то есть по слову, тосоответственноВычислите вероятности того, что определенные слова появятся до и после него. Как видно из этого, для каждого слова Skip-gram нужно обучатьCраз, где C — предустановленный размер окна, а CBOW нужно рассчитать только один раз, поэтому количество вычислений CBOW равно Skip-gram1/C, но поскольку Skip-gram одновременно соответствует словам C, он лучше, чем CBOW, избегая переобучения.Поэтому при обучении больших корпусов Skip-gram лучше, чем CBOW.

Метод обучения Skip-gram точно такой же, как и у CBOW, с той лишь разницей, что на вход Skip-gram подается вектор одного слова, а не сумма и среднее C слов. В то же время обучающие слова должны тренироваться C раз для центрального слова, каждый раз это другое контекстное слово, например, центральное словоПекин, слово окнаприходить,площадь ТяньаньмэньЭти два, тогда Skip-gram должен быть правПекин - приезжайте,Пекинская площадьТренируйтесь отдельно.

Есть проблема с текущей реализацией: вычисление от скрытого слоя к выходному softmax-слою очень велико, потому что нужно вычислить softmax-вероятность всех слов, а затем найти значение с наибольшей вероятностью. Например, размер Vocab равен 10^5, поэтому нереально вычислить 10^5 матричных умножений для каждой вероятности. Так появился Word2vec.

1.3 Word2vec

1.3.1 Основная идея Word2vec

Так называемая языковая модель относится к допущению и моделированию естественного языка, так что естественный язык может быть выражен так, чтобы его мог понять компьютер. word2vec используетn-граммная модель(модель n-грамм), то есть предполагается, что слово связано только с окружающими n словами и не имеет ничего общего с другими словами в тексте.

еслиВзяв слова в качестве признаков, эти признаки можно сопоставить с K-мерным векторным пространством, а для текстовых данных можно искать более глубокое представление признаков.. Итак, основная идея Word2vec заключается в том,Сопоставьте каждое слово с K-мерным реальным вектором путем обучения(K обычно является гиперпараметром в модели), и о семантическом сходстве между ними судят по расстоянию между словами (например, косинусное сходство, евклидово расстояние и т. д.).

Он используеттрехслойная нейронная сеть, входной слой - скрытый слой - выходной слой. Основной технологией являетсяКодирование Хаффмана по частоте слов, так что содержание активации скрытого слоя всех слов с одинаковой частотой слов в основном одинаково.Чем выше частота слов, тем меньше количество скрытых слоев, которые они активируют, что эффективно снижает вычислительную сложность.

Сама эта трехслойная нейронная сетьмоделирование языковых моделей, но такжеполучить представление слова в векторном пространстве, и этот побочный эффект является реальной целью Word2vec.

word2vec улучшил предыдущую модель,

  • Прежде всего, для отображения из входного слоя в скрытый слой вместо линейного преобразования нейронной сети и функции активации используется простой метод суммирования и усреднения всех векторов входных слов. Например, входными данными являются три 4-мерных вектора слов:(1,2,3,4),(9,6,11,8),(5,10,7,12), то вектор слов после нашего отображения word2vec равен **(5,6,7,8)**. Потому что здесь из нескольких векторов слов в один вектор слов.
  • Второе улучшение — это улучшение вычислений от скрытого слоя к выходному слою softmax. Чтобы не вычислять вероятности softmax для всех слов, word2vec выбирает дерево Хаффмана вместо сопоставления скрытых слоев с выходными слоями softmax.

1.3.2 Основная идея Hierarchical Softmax

Расчеты Word2vec можно использовать сИерархический алгоритм Softmax, этот алгоритм сочетает в себе кодирование Хаффмана, фактически с помощью идеи использования ряда бинарных классификаций для аппроксимации мультиклассификации в задачах классификации. Например, если мы возьмем все слова в качестве вывода, то «апельсин» и «машина» будут смешаны вместе. Учитывая контекст w_t, пусть модель сначала определит, является ли w_t существительным, затем оценит, является ли оно названием еды, затем оценит, является ли оно фруктом, а затем рассудит, является ли оно «апельсином».

Принимая окно соответствующего размера в качестве контекста, входной слой считывает слова в окне и складывает их векторы (K-мерные, начальные случайные) вместе, чтобы сформировать K узлов в скрытом слое. Выходной слой представляет собой огромное бинарное дерево, а листовые узлы представляют все слова в корпусе (корпус содержит V независимых слов, тогда бинарное дерево имеет |V| листовых узлов). Алгоритм, используемый для построения всего этого бинарного дерева, — это дерево Хаффмана.

Таким образом, слово w_t в корпусе соответствует определенному листовому узлу бинарного дерева, так что к каждому слову w можно получить доступ из корневого узла root дерева по уникальному пути, и его путь также образует его глобальный Unique двоичный код, например "010011".

Помните, что левое поддерево равно 1, а правое поддерево равно 0. Далее каждый узел скрытого слоя будет соединен с внутренним узлом бинарного дерева, поэтому для каждого внутреннего узла бинарного дерева будет K связанных ребер, и каждое ребро также будет иметь вес. Предполагая, что n(w, j) — это j-й узел на этом пути, а L(w) — длина этого пути, j кодируется из 1, то есть n(w, 1) = корень, n(w , L(ш)) = ш. Для j-го узла Метка, определяемая уровнем Softmax, равна 1 - code[j].

На этапе обучения, когда мы хотим предсказать следующее слово w_t с учетом контекста, мы начинаем обход с корневого узла двоичного дерева, Цель здесь состоит в том, чтобы предсказать каждый бит двоичного числа слова. То есть для заданного контекста наша цель — максимизировать вероятность двоичного кодирования предсказанного слова. Образно говоря, для «010011» мы надеемся, что вероятность бит=1, полученная логистическим вычислением в корневом узле, векторе слова и корневом узле, будет как можно ближе к 0. Во втором слое мы надеемся, что вероятность бит=1 максимально близок к 1. и так далее, мы умножаем вероятности, рассчитанные по пути, чтобы получить вероятность P(w_t) целевого слова w_t в текущей сети, тогда остаток для текущей выборки равен 1-P(w_t), поэтому теперь сеть можно обучать с использованием градиентного спуска для всех значений параметров. Очевидно, что окончательное значение вероятности, рассчитанное по двоичному коду искомого слова, нормировано.

Во время обучения модель назначает этим абстрактным промежуточным узлам соответствующий вектор, который представляет все соответствующие дочерние узлы. Поскольку реальные слова имеют общие векторы этих абстрактных узлов, метод Hierarchical Softmax не эквивалентен исходной задаче, но это приближение не приводит к значительным потерям производительности и значительно увеличивает размер модельного решения.

1.3.3 Математический вывод иерархического Softmax

Традиционный Softmax можно рассматривать как линейную таблицу, а среднее время поиска — O(n). Метод HS превращает Softmax в сбалансированное полное двоичное дерево, и после сохранения частоты слов оно становится деревом Хаффмана.

img

Поскольку мы преобразовали все расчеты вероятности из выходного слоя softmax в двоичное дерево Хаффмана, наш расчет вероятности softmax необходимо выполнять только вдоль древовидной структуры. слов, мы можем следовать дереву Хаффмана от корневого узла до наших листовых узлов.w2.

По сравнению с предыдущей языковой моделью нейронной сети все внутренние узлы нашего дерева Хаффмана подобны нейронам в скрытом слое предыдущей нейронной сети, где вектор слов корневого узла соответствует нашему проецируемому вектору слов, а все листья Узлы аналогичны нейронам в выходном слое softmax предыдущей нейронной сети, а количество листовых узлов соответствует размеру словаря. В дереве Хаффмана отображение softmax из скрытого слоя в выходной слой не завершается сразу, а выполняется шаг за шагом по дереву Хаффмана, поэтому этот softmax называется «Иерархический Softmax».

Как «шаг за шагом по дереву Хаффмана»? В word2vec мы используем метод бинарной логистической регрессии, то есть если мы идем по левому поддереву, то это отрицательный класс (код дерева Хаффмана 1), а если мы идем по правому поддереву, то это положительный класс (Код дерева Хаффмана 1. Код дерева Манна 0). Способ различать положительные и отрицательные классы состоит в использовании сигмовидной функции, а именно:

P(+)=о(xwTθ)=11+exwTθP(+) = \sigma(x_w^T\theta) = \frac{1}{1+e^{-x_w^T\theta}}

вxw— вектор слов текущего внутреннего узла, а θ — параметр модели логистической регрессии, который нам нужно найти из обучающих выборок.

Каковы преимущества использования деревьев Хаффмана?

  • Во-первых, поскольку это бинарное дерево, объем вычислений раньше был равен V, а теперь он становится log2V.
  • Во-вторых, поскольку высокочастотные слова находятся близко к корню дерева с использованием дерева Хаффмана, для поиска высокочастотных слов потребуется меньше времени, что соответствует нашей идее жадной оптимизации.

Легко понять, что вероятность разделить на левое поддерево и стать отрицательным классом равнаР(-)=1-Р(+). В некотором внутреннем узле критерием для решения, следует ли идти по левому поддереву или по правому поддереву, является просмотрП(-), П(+)у кого выше вероятность. контролируяП(-), П(+)Одним из факторов, значение вероятности которого больше, является вектор слова текущего узла, а другим является параметр модели текущего узла.θ.

Для картинки вышеw2, если это выход обучающей выборки, то ожидаем, что для скрытых узлов внутри **n(w2,1)изP(−)**вероятность,**n(w2,2)изP(−)** имеет высокую вероятность, **n(w2,3)изP(+)** имеет высокую вероятность.

Вернемся к самому word2vec, основанному на Hierarchical Softmax. Наша цель — найти соответствующие векторы слов всех узлов и всех внутренних узлов.θ, так что обучающие выборки достигают максимального правдоподобия.

Вероятность логистической регрессии определенного узла j дерева Хаффмана, через который проходит w, определяется как:

P(djwxw,θj1w)={о(xwTθj1w)djw=01о(xwTθj1w)djw=1P(d_j^w|x_w, \theta_{j-1}^w)= \begin{cases} \sigma(x_w^T\theta_{j-1}^w)& {d_j^w=0}\\ 1-\sigma(x_w^T\theta_{j-1}^w) & {d_j^w = 1} \end{cases}

Тогда для определенного целевого выходного слова w его максимальная вероятность равна:

j=2lwP(djwxw,θj1w)=j=2lw[о(xwTθj1w)]1djw[1о(xwTθj1w)]djw\prod_{j=2}^{l_w}P(d_j^w|x_w, \theta_{j-1}^w) = \prod_{j=2}^{l_w} [\sigma(x_w^T\theta_{j-1}^w)] ^{1-d_j^w}[1-\sigma(x_w^T\theta_{j-1}^w)]^{d_j^w}

В word2vec, поскольку используется метод стохастического градиентного подъема, вероятность всех выборок не умножается для получения истинной максимальной вероятности обучающего набора, а используется только одна выборка для обновления градиента за раз. чтобы уменьшить количество градиента расчета.

можно найтиx_wВыражение градиента выглядит следующим образом:

Lxw=j=2lw(1djwо(xwTθj1w))θj1w\frac{\partial L}{\partial x_w} = \sum\limits_{j=2}^{l_w}(1-d_j^w-\sigma(x_w^T\theta_{j-1}^w))\theta_{j-1}^w

С выражением градиента мы можем выполнять итерацию с градиентным восхождением, чтобы шаг за шагом решить все θwj−1 и xw, которые нам нужны.

Уведомление! word2vec нужно обучить два набора параметров: один — параметр скрытого слоя сети, а другой — параметр входного слова (1 * dim)

В пропускаемой грамме и CBOW вектор слова центрального слова не будет обновляться во время итеративного процесса, будет обновляться только вектор слова окна.Вектор слова, соответствующий этому центральному слову, необходимо итеративно обновить в следующий раз как не центральное слово.

0x02 Чтение с вопросами

Ядром реализации Alink являетсяGitHub.com/Sweet KO слегка V/I…Внесите изменения на его основе.На самом деле, если вы не очень устойчивы к языку C, рекомендуется сначала прочитать этот код. Потому что код параллельной обработки Alink действительно сложен для понимания, особенно часть предварительной обработки данных.

проблемно-ориентированный:

  • Какие модули используют возможности распределенной обработки Alink?
  • Какую модель Word2vec реализует Alink? Это модель CBOW или модель со скип-граммом?
  • Какой метод оптимизации использует Alink? Является ли Softmax иерархическим? Или отрицательная выборка?
  • Удаляются ли в этом алгоритме стоп-слова? Так называемые стоп-слова — это слова, которые появляются слишком часто, такие как запятые, точки и т. д., так что нет различия.
  • Вы использовали адаптивную скорость обучения?

0x03 Пример кода

Давайте изменим тестовый код Alink. Следует отметить, что Word2vec также ест память, поэтому мне нужно настроить параметры запуска ВМ на моей машине:Xms256m -Xmx640m -XX:PermSize=128m -XX:MaxPermSize=512m.

public class Word2VecTest {
    public static void main(String[] args) throws Exception {
        TableSchema schema = new TableSchema(
                new String[] {"docid", "content"},
                new TypeInformation <?>[] {Types.LONG(), Types.STRING()}
        );
        List <Row> rows = new ArrayList <>();
        rows.add(Row.of(0L, "老王 是 我们 团队 里 最胖 的"));
        rows.add(Row.of(1L, "老黄 是 第二 胖 的"));
        rows.add(Row.of(2L, "胖"));
        rows.add(Row.of(3L, "胖 胖 胖"));

        MemSourceBatchOp source = new MemSourceBatchOp(rows, schema);

        Word2Vec word2Vec = new Word2Vec()
                .setSelectedCol("content")
                .setOutputCol("output")
                .setMinCount(1);

        List<Row> result = word2Vec.fit(source).transform(source).collect();
        System.out.println(result);
    }
}

Вывод программы

[0,老王 是 我们 团队 里 最胖 的,0.8556591824716802 0.4185472857807756 0.5917632873908979 0.445803358747732 0.5351499521578621 0.6559828965377957 0.5965739474021792 0.473846881662404 0.516117276817363 0.3434555277582306 0.38403383919352685 ..., 
 
1,老黄 是 第二 胖 的,0.9227240557894372 0.5697617202790405 0.42338677208067105 0.5483285740408497 0.5950012315151869 0.4155926470754411 0.6283449603326386 0.47098108241809644 0.2874100346124693 0.41205111525453264 0.59972461077888 ..., 
 
3,胖 胖 胖,0.9220798404216994 0.8056990255747927 0.166767439210223 0.1651382099869762 0.7498624766177563 0.12363837145024788 0.16301554444226507 0.5992360550912706 0.6408649011941911 0.5504539398019214 0.4935531765920934 0.13805809361251292 0.2869384374291237 0.47796081976004645 0.6305720374272978 0.1745491550099714 ...]

0x04 Общая логика

4.1 Примерный процесс Word2vec

  1. Сегментация слов/образование корней и лемматизация. В китайском и английском НЛП есть свои трудности, сложность китайского языка заключается в необходимости выполнять сегментацию слов и разбивать каждое предложение на массив слов. Хотя английский язык не нуждается в сегментации слов, ему приходится иметь дело с различными временами, поэтому требуется извлечение основы и лемматизация.
  2. Составьте словарь и подсчитайте частоты слов. На этом шаге необходимо просмотреть все тексты, найти все появившиеся слова и подсчитать частоту появления каждого слова.
  3. Построить древовидную структуру. Постройте дерево Хаффмана по вероятности появления. Если это полное бинарное дерево, все намного проще. Следует отметить, что все категории должны находиться в листовых узлах.
  4. Сгенерируйте двоичный код, где находится узел. Этот двоичный код отражает положение узла в дереве, так же как и номер дома, соответствующий листовой узел можно найти шаг за шагом от корневого узла по коду.
  5. Инициализируйте промежуточный вектор каждого нелистового узла и вектор слова в листовом узле. В каждом узле дерева хранится вектор длины m, но значения векторов в листовых узлах и нелистовых узлах различны. Вектор слов каждого слова хранится в листовом узле, который используется в качестве входных данных нейронной сети. Вместо этого в нелистовом узле хранится промежуточный вектор, который соответствует параметрам скрытого слоя в нейронной сети и вместе с входом определяет результат классификации.
  6. Обучите промежуточные и словесные векторы. Для модели CBOW сначала добавьте векторы слов из n-1 слов рядом с определенным словом A в качестве входных данных системы и классифицируйте их шаг за шагом в соответствии с двоичным кодом, сгенерированным словом A на шаге 4, и обучите промежуточное звено в соответствии с результат классификации Векторы и векторы слов. Например, для узла мы уже знаем, что его двоичный код равен 100. Тогда в первом промежуточном узле соответствующий вход должен быть классифицирован вправо. Если он классифицируется слева, это указывает на то, что классификация неверна и вектор необходимо исправить. То же самое относится ко второму, третьему узлу и так далее, пока не будет достигнут конечный узел. Следовательно, для одного слова будет изменен не более чем промежуточный вектор узлов на его пути, а остальные узлы не будут изменены.

4.2 Тренировочный код

Класс Word2VecTrainBatchOp является кодовой реализацией обучения, а его функция linkFrom отражает общую логику программы.Пропущенный вариант кода выглядит следующим образом, и мы опишем его подробно позже.

  public Word2VecTrainBatchOp linkFrom(BatchOperator<?>... inputs) {
    BatchOperator<?> in = checkAndGetFirst(inputs);
    final int vectorSize = getVectorSize();
    
    // 计算单词出现次数
    DataSet <Row> wordCnt = WordCountUtil
      .splitDocAndCount(in, getSelectedCol(), getWordDelimiter())
      .filter("cnt >= " + String.valueOf(getMinCount()))
      .getDataSet();

    // 根据词频对单词进行排序
    DataSet <Row> sorted = sortedIndexVocab(wordCnt);
    // 计算排序之后单词数目
    DataSet <Long> vocSize = DataSetUtils
      .countElementsPerPartition(sorted)
      .sum(1)
      .map(new MapFunction <Tuple2 <Integer, Long>, Long>() {
        @Override
        public Long map(Tuple2 <Integer, Long> value) throws Exception {
          return value.f1;
        }
      });
    // 建立字典和二叉树
    DataSet <Tuple3 <Integer, String, Word>> vocab = sorted
      .reduceGroup(new CreateVocab())
      .withBroadcastSet(vocSize, "vocSize")
      .rebalance();
    // 再次分割单词
    DataSet <String[]> split = in
      .select("`" + getSelectedCol() + "`")
      .getDataSet()
      .flatMap(new WordCountUtil.WordSpliter(getWordDelimiter()))
      .rebalance();
    // 生成训练数据
    DataSet <int[]> trainData = encodeContent(split, vocab)
      .rebalance();

    final long seed = System.currentTimeMillis();
    // 获取精简词典
    DataSet <Tuple2 <Integer, Word>> vocabWithoutWordStr = vocab
      .map(new UseVocabWithoutWordString());
    
    // 初始化模型
    DataSet <Tuple2 <Integer, double[]>> initialModel = vocabWithoutWordStr
      .mapPartition(new initialModel(seed, vectorSize))
      .rebalance();
    // 计算迭代次数
    DataSet <Integer> syncNum = DataSetUtils
      .countElementsPerPartition(trainData)
      .sum(1)
      .map(new MapFunction <Tuple2 <Integer, Long>, Integer>() {
        @Override
        public Integer map(Tuple2 <Integer, Long> value) throws Exception {
          return Math.max((int) (value.f1 / 100000L), 5);
        }
      });
    
    // 迭代训练
    DataSet <Row> model = new IterativeComQueue()
      .initWithPartitionedData("trainData", trainData)
      .initWithBroadcastData("vocSize", vocSize)
      .initWithBroadcastData("initialModel", initialModel)
      .initWithBroadcastData("vocabWithoutWordStr", vocabWithoutWordStr)
      .initWithBroadcastData("syncNum", syncNum)
      .add(new InitialVocabAndBuffer(getParams()))
      .add(new UpdateModel(getParams()))
      .add(new AllReduce("input"))
      .add(new AllReduce("output"))
      .add(new AvgInputOutput())
      .setCompareCriterionOfNode0(new Criterion(getParams()))
      .closeWith(new SerializeModel(getParams()))
      .exec();
    
    // 输出模型
    model = model
      .map(new MapFunction <Row, Tuple2 <Integer, DenseVector>>() {
        @Override
        public Tuple2 <Integer, DenseVector> map(Row value) throws Exception {
          return Tuple2.of((Integer) value.getField(0), (DenseVector) value.getField(1));
        }
      })
      .join(vocab)
      .where(0)
      .equalTo(0)
      .with(new JoinFunction <Tuple2 <Integer, DenseVector>, Tuple3 <Integer, String, Word>, Row>() {
        @Override
        public Row join(Tuple2 <Integer, DenseVector> first, Tuple3 <Integer, String, Word> second)
          throws Exception {
          return Row.of(second.f1, first.f1);
        }
      })
      .mapPartition(new MapPartitionFunction <Row, Row>() {
        @Override
        public void mapPartition(Iterable <Row> values, Collector <Row> out) throws Exception {
          Word2VecModelDataConverter model = new Word2VecModelDataConverter();

          model.modelRows = StreamSupport
            .stream(values.spliterator(), false)
            .collect(Collectors.toList());

          model.save(model, out);
        }
      });

    setOutput(model, new Word2VecModelDataConverter().getModelSchema());

    return this;
  }

0x05 Вход процесса

Эта часть самая сложная и больше всего отличается от кода C. Поскольку Alink необходимо учитывать возможность обработки крупномасштабных входных данных, он выполняет распределенную обработку, и после распределенной обработки различные детали запутываются.

5.1 Подсчет вхождений слов

Эта часть кода выглядит следующим образом и разделена на две части.

DataSet <Row> wordCnt = WordCountUtil
      .splitDocAndCount(in, getSelectedCol(), getWordDelimiter())
      .filter("cnt >= " + String.valueOf(getMinCount()))
      .getDataSet();

5.1.1 Разделить слова и подсчитать

Логика здесь относительно понятна, то есть разделите слово splitDoc, а затем подсчитайте количество.

public static BatchOperator<?> splitDocAndCount(BatchOperator<?> input, String docColName, String wordDelimiter) {
  return count(splitDoc(input, docColName, wordDelimiter), WORD_COL_NAME, COUNT_COL_NAME);
}
5.1.1.1 Разделить слова

Разделить слова с помощью UDTF DocWordSplitCount.

public static BatchOperator splitDoc(BatchOperator<?> input, String docColName, String wordDelimiter) {
  return input.udtf(
    docColName,
    new String[] {WORD_COL_NAME, COUNT_COL_NAME},
    new DocWordSplitCount(wordDelimiter),
    new String[] {}
  );
}

Функция DocWordSplitCount состоит в том, чтобы разделить слова и подсчитать их.

public class DocWordSplitCount extends TableFunction <Row> {

  private String delimiter;

  public DocWordSplitCount(String delimiter) {
    this.delimiter = delimiter;
  }

  public void eval(String content) {
    String[] words = content.split(this.delimiter); // 分割单词
    HashMap <String, Long> map = new HashMap <>(0);

    for (String word : words) {
      if (word.length() > 0) {
        map.merge(word, 1L, Long::sum); // 计数
      }
    }

    for (Map.Entry <String, Long> entry : map.entrySet()) {
      collect(Row.of(entry.getKey(), entry.getValue())); // 发送二元组<单词,个数>
    }
  }
}

// runtime时候,变量如下:
content = "老王 是 我们 团队 里 最胖 的"
words = {String[7]@10021} 
 0 = "老王"
 1 = "是"
 2 = "我们"
 3 = "团队"
 4 = "里"
 5 = "最胖"
 6 = "的"
map = {HashMap@10024}  size = 7
 "最胖" -> {Long@10043} 1
 "的" -> {Long@10043} 1
 "里" -> {Long@10043} 1
 "老王" -> {Long@10043} 1
 "团队" -> {Long@10043} 1
 "我们" -> {Long@10043} 1
 "是" -> {Long@10043} 1
5.1.1.2 Подсчет

Здесь groupBy будет выполняться для двух кортежей , вычисленных распределенным образом, чтобы получить окончательное количество вхождений слова. Среди них ключевую роль играет groupBy of Flink.Если вам интересно, вы можете прочитать [Анализ исходного кода] Что именно делает группа Flink и сокращение?.

public static BatchOperator count(BatchOperator input, String wordColName) {
    return count(input, wordColName, null);
}

public static BatchOperator count(BatchOperator input, String wordColName, String wordValueColName) {
    if (null == wordValueColName) {
      return input.groupBy(wordColName,
        wordColName + " AS " + WORD_COL_NAME + ", COUNT(" + wordColName + ") AS " + COUNT_COL_NAME);
    } else {
      return input.groupBy(wordColName,
        wordColName + " AS " + WORD_COL_NAME + ", SUM(" + wordValueColName + ") AS " + COUNT_COL_NAME);
    }
}

5.1.2 Фильтр низкочастотных слов

Если количество вхождений слова слишком мало, нет необходимости добавлять его в словарь, поэтому его необходимо отфильтровать.

5.1.2.1 Конфигурация

Word2VecTrainBatchOp должен реализовать параметр конфигурации Word2VecTrainParams следующим образом:

public interface Word2VecTrainParams<T> extends
    HasNumIterDefaultAs1<T>,
  HasSelectedCol <T>,
  HasVectorSizeDv100 <T>,
  HasAlpha <T>,
  HasWordDelimiter <T>,
  HasMinCount <T>,
  HasRandomWindow <T>,
  HasWindow <T> {
}

Среди них HasMinCount — это порог, используемый для настройки низкочастотных слов.

public interface HasMinCount<T> extends WithParams<T> {
  ParamInfo <Integer> MIN_COUNT = ParamInfoFactory
    .createParamInfo("minCount", Integer.class)
    .setDescription("minimum count of word")
    .setHasDefaultValue(5)
    .build();

  default Integer getMinCount() {
    return get(MIN_COUNT);
  }

  default T setMinCount(Integer value) {
    return set(MIN_COUNT, value);
  }
}

В примере кода есть следующее, то есть минимальный порог установлен равным 1, это потому, что у нас очень мало входных данных и мы не будем фильтровать низкочастотные слова. Если словарный запас большой, его можно поставить на 5.

.setMinCount(1);
5.1.2.2 Фильтрация

Вынесем код использования.

DataSet <Row> wordCnt = WordCountUtil
      .splitDocAndCount(in, getSelectedCol(), getWordDelimiter())
      .filter("cnt >= " + String.valueOf(getMinCount()))
      .getDataSet();

можно увидеть,.filter("cnt >= " + String.valueOf(getMinCount()))Эта часть фильтрует. Это простое использование SQL.

Затем возвращается DataSet wordCnt.

5.2 Сортировка слов по частоте слов

После фильтрации низкочастотных слов полученные слова сортируются.

DataSet <Row> sorted = sortedIndexVocab(wordCnt);

Это относительно сложно и неясно, и нужно тщательно разобраться.Общая логика такова:

  • 1) использоватьSortUtils.pSortМассово-параллельная сортировка ;
  • 2) Разделите возвращаемое значение f0 предыдущего шагаsorted.f0.partitionCustom, так как f0 возвращаемого значения предыдущего шага равно , поэтому набор данных секционируется.
  • 3) Подсчитать количество слов в каждом разделеcountElementsPerPartition(partitioned); спускатьсяTuple2 ;результирующий набор данных cnt будет передан в эфир и использован в следующем расчете;
  • 4) В каждом разделе (то есть наборе данных, разделенном на втором шаге) используйте mapPartition для сортировки слов и используйте cnt из предыдущего шага;
    • В функции open вычисляется общее количество всех слов в этом разделе, total, количество слов в этой области, curLen и начальная позиция слов в этой области.
    • В функции mapPartition набор данных будет отсортирован, объединен и, наконец, отправлен.DataSet ;

Примечание 1. pSort может относиться кAlink's Talk (6): Реализация алгоритма TF-IDF. SortUtils.pSort — это массово-параллельная сортировка. Возвращаемое значение pSort:@return f0: dataset which is indexed by partition id, f1: dataset which has partition id and count..

Конкретная реализация выглядит следующим образом:

private static DataSet <Row> sortedIndexVocab(DataSet <Row> vocab) {
    final int sortIdx = 1;
    Tuple2 <DataSet <Tuple2 <Integer, Row>>, DataSet <Tuple2 <Integer, Long>>> sorted
      = SortUtils.pSort(vocab, sortIdx); // 进行大规模并行排序

    DataSet <Tuple2 <Integer, Row>> partitioned = sorted.f0.partitionCustom(new Partitioner <Integer>() {
      @Override
      public int partition(Integer key, int numPartitions) {
        return key; // 利用分区 idx 进行分区
      }
    }, 0);

    DataSet <Tuple2 <Integer, Long>> cnt = DataSetUtils.countElementsPerPartition(partitioned);

    return partitioned.mapPartition(new RichMapPartitionFunction <Tuple2 <Integer, Row>, Row>() {
      int start;
      int curLen;
      int total;

      @Override
      public void open(Configuration parameters) throws Exception {
        List <Tuple2 <Integer, Long>> cnts = getRuntimeContext().getBroadcastVariable("cnt");
        int taskId = getRuntimeContext().getIndexOfThisSubtask();
        start = 0;
        curLen = 0;
        total = 0;

        for (Tuple2 <Integer, Long> val : cnts) {
          if (val.f0 < taskId) {
            start += val.f1; // 本区单词起始位置 
          }

          if (val.f0 == taskId) {  // 只计算本分区对应的记录,因为 f0 是分区idx
            curLen = val.f1.intValue(); // 本区单词数目curLen
          }

          total += val.f1.intValue(); // 得倒 本分区内 所有单词的总数total
        }
                
// runtime 打印如下                
val = {Tuple2@10585} "(7,0)"
 f0 = {Integer@10586} 7
 f1 = {Long@10587} 0                
                
      }

      @Override
      public void mapPartition(Iterable <Tuple2 <Integer, Row>> values, Collector <Row> out) throws Exception {

        Row[] all = new Row[curLen];

        int i = 0;
        for (Tuple2 <Integer, Row> val : values) {
          all[i++] = val.f1; // 得倒所有的单词
        }

        Arrays.sort(all, (o1, o2) -> (int) ((Long) o1.getField(sortIdx) - (Long) o2.getField(sortIdx))); // 排序

        i = start;
        for (Row row : all) {
          // 归并 & 发送
          out.collect(RowUtil.merge(row, -(i - total + 1)));
          ++i;
        }
                
// runtime时的变量如下:                
all = {Row[2]@10655} 
 0 = {Row@13346} "我们,1"
 1 = {Row@13347} "里,1"
i = 0
total = 10
start = 0
      }
    }).withBroadcastSet(cnt, "cnt"); // 广播进来的变量
}

5.2.1 Количество слов после сортировки

Вот чтобы подсчитать количество слов в каждом разделе после сортировки, Логика относительно проста, и полученный набор данных будет транслироваться для следующего шага.

DataSet <Long> vocSize = DataSetUtils // vocSize是词汇的个数
      .countElementsPerPartition(sorted)
      .sum(1) // 累计第一个key
      .map(new MapFunction <Tuple2 <Integer, Long>, Long>() {
        @Override
        public Long map(Tuple2 <Integer, Long> value) throws Exception {
          return value.f1;
        }
      });

5.3 Построить словарь и двоичное дерево

В этой части будут использованы результаты предыдущих двух шагов: «отсортированные слова» и «слова на раздел» для построения словаря и двоичного дерева.

DataSet <Tuple3 <Integer, String, Word>> vocab = sorted // 排序后的单词数据集
      .reduceGroup(new CreateVocab())
      .withBroadcastSet(vocSize, "vocSize") // 广播上一步产生的结果集
      .rebalance();

CreateVocab завершает определенную работу, и набор результатов выглядит следующим образом: Tuple3 .

private static class CreateVocab extends RichGroupReduceFunction <Row, Tuple3 <Integer, String, Word>> {
    int vocSize;

    @Override
    public void open(Configuration parameters) throws Exception {
      vocSize = getRuntimeContext().getBroadcastVariableWithInitializer("vocSize",
        new BroadcastVariableInitializer <Long, Integer>() {
          @Override
          public Integer initializeBroadcastVariable(Iterable <Long> data) {
            return data.iterator().next().intValue();
          }
        });
    }

    @Override
    public void reduce(Iterable <Row> values, Collector <Tuple3 <Integer, String, Word>> out) throws Exception {
      String[] words = new String[vocSize];
      Word[] vocab = new Word[vocSize];

            // 建立词典
      for (Row row : values) {
        Word word = new Word();
        word.cnt = (long) row.getField(1);
        vocab[(int) row.getField(2)] = word;
        words[(int) row.getField(2)] = (String) row.getField(0);
      }

// runtime变量如下
words = {String[10]@10606} 
 0 = "胖"
 1 = "的"
 2 = "是"
 3 = "团队"
 4 = "老王"
 5 = "第二"
 6 = "最胖"
 7 = "老黄"
 8 = "里"
 9 = "我们"            
            
      // 建立二叉树,建立过程中会更新词典内容
      createBinaryTree(vocab);

// runtime变量如下            
vocab = {Word2VecTrainBatchOp$Word[10]@10669} 
 0 = {Word2VecTrainBatchOp$Word@13372} 
  cnt = 5
  point = {int[2]@13382} 
   0 = 8
   1 = 7
  code = {int[2]@13383} 
   0 = 1
   1 = 1
 1 = {Word2VecTrainBatchOp$Word@13373} 
  cnt = 2
  point = {int[3]@13384} 
   0 = 8
   1 = 7
   2 = 5
  code = {int[3]@13385} 
   0 = 1
   1 = 0
   2 = 1            
            
      for (int i = 0; i < vocab.length; ++i) {
        // 结果集是:Tuple3<单词在词典的idx,单词,单词对应的词典元素>
        out.collect(Tuple3.of(i, words[i], vocab[i]));
      }        
    }
}

5.3.1 Структура данных

Структура данных словаря следующая:

private static class Word implements Serializable {
  public long cnt; // 词频,左右两个输入节点的词频之和
  public int[] point; //在树中的节点序列, 即从根结点到叶子节点的路径
  public int[] code; //霍夫曼码, HuffmanCode
}

Пункт путаницы:

  • vocab[word].code[d] относится к d-му коду текущего слова word, код не содержит корневого узла
  • vocab[word].point[d] относится к текущему слову word в кодировке dth, предшествующему узлу.

Например, vocab[word].point[0] должен быть корневым узлом, а vocab[word].code[0] должен быть кодом корневого узла до следующей точки.

5.3.2 Построение бинарного дерева

Здесь дерево Хаффмана (на основе частоты слов) устанавливается на основе обучающих выборок корпуса.

Alink в основном является java-реализацией языка C здесь. Может быть, многие братья не знакомы с ним, поэтому его нужно объяснить.

Word2vec использует перемещение нижнего индекса массива для завершения построения и кодирования. Самое главное, что он использует родительский массив только для отметки сгенерированного родительского узла (диапазон VocabSize, VocabSize*2−2). Наконец, вычтите VocabSize из родительского узла, чтобы получить массив путей точек, начиная с 0.

Основная рутина такова:

  • Во-первых, установите два указателя pos1 и pos2, соответственно указывающие на последнее слово и следующий бит последнего слова;
  • Затем выберите наименьшее значение из чисел, на которые указывают два указателя. Обозначим его как min1i. Если значение, на которое указывает pos1, является наименьшим, в это время переместите pos1 влево, а затем сравните числа, на которые указывают pos1 и pos2. Выберите наименьшее значение, обозначенное как min2i, и сохраните их сумму в месте, указанном pos2.
  • В это время установите позицию, на которую указывает pos2, как родительский узел min1i и min2i, В то же время запишите код позиции, на которую указывает min2i, как 1.
private static void createBinaryTree(Word[] vocab) {
    int vocabSize = vocab.length;

    int[] point = new int[MAX_CODE_LENGTH];
    int[] code = new int[MAX_CODE_LENGTH];
        // 首先定义了3个长度为vocab_size*2+1的数组
        // count数组中前vocab_size存储的是每个词的相应的词频。后面初始化的是非常大的数,已知词库中的词是依照降序排列的。
    long[] count = new long[vocabSize * 2 - 1];
    int[] binary = new int[vocabSize * 2 - 1];
    int[] parent = new int[vocabSize * 2 - 1];

      // 前半部分初始化为每个词出现的次数
    for (int i = 0; i < vocabSize; ++i) {
      count[i] = vocab[i].cnt;
    }
    // 后半部分初始化为一个固定的常数
    Arrays.fill(count, vocabSize, vocabSize * 2 - 1, Integer.MAX_VALUE);

    // pos1, pos2 可以理解为 下一步 将要构建的左右两个节点
    // min1i, min2i 是当前正在构建的左右两个节点
    int min1i, min2i, pos1, pos2;

    pos1 = vocabSize - 1; // pos1指向前半截的尾部
    pos2 = vocabSize; // pos2指向后半截的开始

    // 每次增加一个节点,构建Huffman树
    for (int a = 0; a < vocabSize - 1; ++a) {
      // First, find two smallest nodes 'min1, min2'
      // 选择最小的节点min1
      // 根据pos1, pos2找到目前的 左 min1i 的位置,并且调整下一次的pos1, pos2
      if (pos1 >= 0) {
        if (count[pos1] < count[pos2]) {
          min1i = pos1;
          pos1--;
        } else {
          min1i = pos2;
          pos2++;
        }
      } else {
        min1i = pos2;
        pos2++;
      }
            
      // 选择最小的节点min2
      // 根据上一步调整的pos1, pos2找到目前的 右 min2i 的位置,并且调整下一次的pos1, pos2
      if (pos1 >= 0) {
        if (count[pos1] < count[pos2]) {
          min2i = pos1;
          pos1--;
        } else {
          min2i = pos2;
          pos2++;
        }
      } else {
        min2i = pos2;
        pos2++;
      }

      // 新生成的节点的概率是两个输入节点的概率之和,其左右子节点即为输入的两个节点。值得注意的是,新生成的节点肯定不是叶节点,而非叶结点的value值是中间向量,初始化为零向量。
      count[vocabSize + a] = count[min1i] + count[min2i];
      parent[min1i] = vocabSize + a; // 设置父节点
      parent[min2i] = vocabSize + a;
      binary[min2i] = 1;  // 设置一个子树的编码为1
    }
    
// runtime变量如下:
binary = {int[19]@13405}  0 = 1 1 = 1 2 = 0 3 = 0 4 = 1 5 = 0 6 = 1 7 = 0 8 = 1 9 = 0 10 = 1 11 = 0 12 = 1 13 = 0 14 = 1 15 = 0 16 = 0 17 = 1 18 = 0
    
parent = {int[19]@13406}  0 = 17 1 = 15 2 = 15 3 = 13 4 = 12 5 = 12 6 = 11 7 = 11 8 = 10 9 = 10 10 = 13 11 = 14 12 = 14 13 = 16 14 = 16 15 = 17 16 = 18 17 = 18 18 = 0    
    
count = {long[19]@13374}  0 = 5 1 = 2 2 = 2 3 = 1 4 = 1 5 = 1 6 = 1 7 = 1 8 = 1 9 = 1 10 = 2 11 = 2 12 = 2 13 = 3 14 = 4 15 = 4 16 = 7 17 = 9 18 = 16    
    
      // Now assign binary code to each vocabulary word
      // 生成Huffman码,即找到每一个字的code,和对应的在树中的节点序列,在生成Huffman编码的过程中。针对每个词(词都在叶子节点上),从叶子节点開始。将编码存入到code数组中,如对于上图中的“R”节点来说。其code数组为{1,0}。再对其反转便是Huffman编码:
    for (int a = 0; a < vocabSize; ++a) { // 为每一个词分配二进制编码,即Huffman编码
      int b = a;
      int i = 0;

      do {
        code[i] = binary[b]; // 找到当前的节点的编码
        point[i] = b; // 记录从叶子节点到根结点的序列
        i++;
        b = parent[b]; // 找到当前节点的父节点
      } while (b != vocabSize * 2 - 2); // 已经找到了根结点,根节点是没有编码的

      vocab[a].code = new int[i];

      for (b = 0; b < i; ++b) {
        vocab[a].code[i - b - 1] = code[b]; // 编码的反转
      }

      vocab[a].point = new int[i];
      vocab[a].point[0] = vocabSize - 2;
      for (b = 1; b < i; ++b) {
        vocab[a].point[i - b] = point[b] - vocabSize; // 记录的是从根结点到叶子节点的路径
      }
    }
}

Окончательный результат бинарного дерева выглядит следующим образом:

vocab = {Word2VecTrainBatchOp$Word[10]@10608} 
 0 = {Word2VecTrainBatchOp$Word@13314} 
  cnt = 5
  point = {int[2]@13329} 
   0 = 8
   1 = 7
  code = {int[2]@13330} 
   0 = 1
   1 = 1
 1 = {Word2VecTrainBatchOp$Word@13320} 
  cnt = 2
  point = {int[3]@13331} 
   0 = 8
   1 = 7
   2 = 5
  code = {int[3]@13332} 
   0 = 1
   1 = 0
   2 = 1
 2 = {Word2VecTrainBatchOp$Word@13321} 
 3 = {Word2VecTrainBatchOp$Word@13322} 
 ......
 9 = {Word2VecTrainBatchOp$Word@13328} 

5.4 Разделить слова

Здесь исходный ввод будет снова сегментирован.Всегда кажется, что этот шаг можно оптимизировать вместе с предыдущими шагами.

DataSet <String[]> split = in
      .select("`" + getSelectedCol() + "`")
      .getDataSet()
      .flatMap(new WordCountUtil.WordSpliter(getWordDelimiter()))
      .rebalance();

5.5 Генерация обучающих данных

Код для генерации обучающих данных выглядит следующим образом, что тоже довольно неясно.

DataSet <int[]> trainData = encodeContent(split, vocab).rebalance();

Конечная цельДа, перевести каждое предложение в последовательность словарных idx, например:

Первоначальный ввод: «Фараон самый толстый в нашей команде».

После кодирования: «4,1,9,3,8,6,2», где каждое число — это порядковый номер каждого слова в предложении в словаре.

Вход для encodeContentДа:

  • Исходный ввод, который был разделен (на самом деле, исходный ввод в примере из этой статьи разделен пробелами) — это предложение для encodeContent;
  • Набор словарных данных Tuple3;

Логика процессаследующее:

  • Раздельная обработка входных предложенийcontent.mapPartition, получить набор данныхTuple4 <>(taskId, localCnt, i, val[i]), соответственноTuple4 <>(taskId, 本分区句子数目, 本单词在本句子中的idx, 本单词), поэтому ядро, отправленное сюда, является словом.
  • использовалFlink coGroupЭта функция дополняет функцию сопоставления и слияния двух потоков, которая сочетает в себе поток слов и фильтрацию по словарю.(where(3).equalTo(1)), в предыдущей обработке f3 — это слово, vocab.f1 — это слово, поэтому необходимо найти одно и то же слово в двух потоках, а затем выполнить операцию. спускатьсяTuple4.of(tuple.f0, tuple.f1, tuple.f2, row.getField(0))), то есть результирующий наборTuple4 <taskId, 本分区句子数目, 本单词在本句子中的idx,单词在词典的idx>.
  • Сортировать по группам, объединятьgroupBy(0, 1).reduceGroup, а затем отсортировать (отсортировать по идентификатору этого слова в этом предложении); результирующий наборDataSet <int[]>, то есть вернуть «idx этого слова в словаре», например[4,1,9,3,8,6,2]. Это порядковый номер каждого слова в предложении в словаре.

Конкретный код выглядит следующим образом:

private static DataSet <int[]> encodeContent(
    DataSet <String[]> content,
    DataSet <Tuple3 <Integer, String, Word>> vocab) {
    return content
      .mapPartition(new RichMapPartitionFunction <String[], Tuple4 <Integer, Long, Integer, String>>() {
        @Override
        public void mapPartition(Iterable <String[]> values,
                     Collector <Tuple4 <Integer, Long, Integer, String>> out)
          throws Exception {
          int taskId = getRuntimeContext().getIndexOfThisSubtask();
          long localCnt = 0L;
          for (String[] val : values) {
            if (val == null || val.length == 0) {
              continue;
            }

            for (int i = 0; i < val.length; ++i) {
              // 核心是发送单词
              out.collect(new Tuple4 <>(taskId, localCnt, i, val[i]));
            }

            ++localCnt; // 这里注意,发送时候 localCnt 还没有更新

// runtime 的数据如下:
val = {String[7]@10008} 
 0 = "老王"
 1 = "是"
 2 = "我们"
 3 = "团队"
 4 = "里"
 5 = "最胖"
 6 = "的"                    
                    }
        }
      }).coGroup(vocab)
      .where(3) // 上步处理中,f3是word
      .equalTo(1) // vocab.f1 是word
      .with(new CoGroupFunction <Tuple4 <Integer, Long, Integer, String>, Tuple3 <Integer, String, Word>,
        Tuple4 <Integer, Long, Integer, Integer>>() {
        @Override
        public void coGroup(Iterable <Tuple4 <Integer, Long, Integer, String>> first,
                  Iterable <Tuple3 <Integer, String, Word>> second,
                  Collector <Tuple4 <Integer, Long, Integer, Integer>> out) {
          for (Tuple3 <Integer, String, Word> row : second) {
            for (Tuple4 <Integer, Long, Integer, String> tuple : first) {
              out.collect(
                Tuple4.of(tuple.f0, tuple.f1, tuple.f2,
                  row.getField(0))); // 将单词和词典筛选合并, 返回 <taskId, 本分区句子数目, 本单词在本句子中的idx,单词在词典的idx>
// runtime的变量是:
row = {Tuple3@10640}  // Tuple3<单词在词典的idx,单词,单词在词典中对应的元素>
 f0 = {Integer@10650} 7
 f1 = "老黄"
 f2 = {Word2VecTrainBatchOp$Word@10652} 
                            
tuple = {Tuple4@10641} // (taskId, 本分区句子数目, 本单词在本句子中的idx, 本单词)
 f0 = {Integer@10642} 1
 f1 = {Long@10643} 0
 f2 = {Integer@10644} 0
 f3 = "老黄"                        
                        
                        }
          }
        }
      }).groupBy(0, 1) // 分组排序
      .reduceGroup(new GroupReduceFunction <Tuple4 <Integer, Long, Integer, Integer>, int[]>() {
        @Override
        public void reduce(Iterable <Tuple4 <Integer, Long, Integer, Integer>> values, Collector <int[]> out) {
          ArrayList <Tuple2 <Integer, Integer>> elements = new ArrayList <>();

          for (Tuple4 <Integer, Long, Integer, Integer> val : values) {
            // 得到 (本单词在本句子中的idx, 本单词在词典的idx)
            elements.add(Tuple2.of(val.f2, val.f3));
          }
 
// runtime变量如下:
val = {Tuple4@10732} "(2,0,0,0)" //  <taskId, 本分区句子数目, 本单词在本句子中的idx,单词在词典的idx>
 f0 = {Integer@10737} 2
 f1 = {Long@10738} 0
 f2 = {Integer@10733} 0
 f3 = {Integer@10733} 0  
    
elements = {ArrayList@10797}  size = 7
 0 = {Tuple2@10803} "(0,4)"
 1 = {Tuple2@10804} "(1,1)"
 2 = {Tuple2@10805} "(2,9)"
 3 = {Tuple2@10806} "(3,3)"
 4 = {Tuple2@10807} "(4,8)"
 5 = {Tuple2@10808} "(5,6)"
 6 = {Tuple2@10809} "(6,2)"                 

          Collections.sort(elements, new Comparator <Tuple2 <Integer, Integer>>() {
            @Override
            public int compare(Tuple2 <Integer, Integer> o1, Tuple2 <Integer, Integer> o2) {
              return o1.f0.compareTo(o2.f0);
            }
          });

          int[] ret = new int[elements.size()];

          for (int i = 0; i < elements.size(); ++i) {
            ret[i] = elements.get(i).f1; // 返回 "本单词在词典的idx"
          }

// runtime变量如下:                    
ret = {int[7]@10799} 
 0 = 4
 1 = 1
 2 = 9
 3 = 3
 4 = 8
 5 = 6
 6 = 2                    
          out.collect(ret);
        }
      });
}

5.5.1 Flink coGroup

Здесь функция Flink coGroup используется для завершения функции сопоставления и слияния двух потоков. Разница между coGroup и Join заключается в следующем:

  • Присоединение: Flink выводит пользователю только те пары элементов, которые соответствуют условиям;
  • coGroup : помимо вывода совпадающих пар элементов, он также выводит несопоставленные элементы;

В CoGroupFunction coGroup можно выводить элементы любой формы, и это полностью зависит от конкретной реализации пользователя.

5.6 Получить сокращенный словарь

На данный момент каждое предложение было переведено в последовательность словарных idx, например:

Первоначальный ввод: «Фараон самый толстый в нашей команде».

После кодирования: «4,1,9,3,8,6,2», где каждое число — это порядковый номер каждого слова в предложении в словаре.

Затем Alink применил другой подход, упростив словарь, то есть удалив исходный текст слова.

DataSet <Tuple2 <Integer, Word>> vocabWithoutWordStr = vocab
      .map(new UseVocabWithoutWordString());

Оригинальный словарь этоTuple3<单词在词典的idx,单词,单词在词典中对应的元素>:

"(1,的,com.alibaba.alink.operator.batch.nlp.Word2VecTrainBatchOp$Word@13099fc)"

Сокращенный словарьTuple2<单词在词典的idx,单词在词典中对应的元素>:

"(1, com.alibaba.alink.operator.batch.nlp.Word2VecTrainBatchOp$Word@13099fc)"

код показывает, как показано ниже:

private static class UseVocabWithoutWordString
    implements MapFunction <Tuple3 <Integer, String, Word>, Tuple2 <Integer, Word>> {
    @Override
    public Tuple2 <Integer, Word> map(Tuple3 <Integer, String, Word> value) throws Exception {
      return Tuple2.of(value.f0, value.f2); // 去掉单词原始文字 f1
    }
}

// runtime变量如下:
value = {Tuple3@10692} "(1,的,com.alibaba.alink.operator.batch.nlp.Word2VecTrainBatchOp$Word@13099fc)"
 f0 = {Integer@10693} 1
  value = 1
 f1 = "的"
  value = {char[1]@10700} 
  hash = 0
 f2 = {Word2VecTrainBatchOp$Word@10694} 
  cnt = 2
  point = {int[3]@10698} 
   0 = 8
   1 = 7
   2 = 5
  code = {int[3]@10699} 
   0 = 1
   1 = 0
   2 = 1

5.7 Инициализация модели

Инициализировать модель с уменьшенным словарем, то есть инициализировать все весовые параметры модели случайным образомθ, все векторы словw.

DataSet <Tuple2 <Integer, double[]>> initialModel = vocabWithoutWordStr
      .mapPartition(new initialModel(seed, vectorSize))
      .rebalance();

Теперь словарь такой: Tuple2, здесь используется только idx.

Окончательная инициализированная модель: . Размер веса по умолчанию равен 100.

Конкретный код

private static class initialModel
    extends RichMapPartitionFunction <Tuple2 <Integer, Word>, Tuple2 <Integer, double[]>> {
    private final long seed;
    private final int vectorSize;
    Random random;

    public initialModel(long seed, int vectorSize) {
      this.seed = seed;
      this.vectorSize = vectorSize;
      random = new Random();
    }

    @Override
    public void open(Configuration parameters) throws Exception {
      random.setSeed(seed + getRuntimeContext().getIndexOfThisSubtask());
    }

    @Override
    public void mapPartition(Iterable <Tuple2 <Integer, Word>> values,
                 Collector <Tuple2 <Integer, double[]>> out) throws Exception {
      for (Tuple2 <Integer, Word> val : values) {
        double[] inBuf = new double[vectorSize];

        for (int i = 0; i < vectorSize; ++i) {
          inBuf[i] = random.nextFloat();
        }

        // 发送 <每个单词在词典中的idx,随机初始化的系数>
        out.collect(Tuple2.of(val.f0, inBuf));
      }
    }
}

5.8 Расчет количества итераций

Теперь рассчитаем количество итераций обучения, которое является максимальным значением между «количество всех слов в обучающем корпусе / 100000L» и 5.

DataSet <Integer> syncNum = DataSetUtils
      .countElementsPerPartition(trainData)
      .sum(1)
      .map(new MapFunction <Tuple2 <Integer, Long>, Integer>() {
        @Override
        public Integer map(Tuple2 <Integer, Long> value) throws Exception {
          return Math.max((int) (value.f1 / 100000L), 5);
        }
      });

На данный момент узел предварительной обработки завершен: обработка входных данных, создание словаря и бинарного дерева. Следующий шаг – начать обучение.

0xEE Личная информация

★★★★★★Думая о жизни и технологиях★★★★★★

Публичный аккаунт WeChat:мысли Росси

Если вы хотите получать своевременные новости о статьях, написанных отдельными лицами, или хотите видеть технические материалы, рекомендованные отдельными лицами, обратите внимание.

ссылка 0xFF

Вывод принципа word2vec и анализ кода

Модель представления глубины текста Word2Vec

Принцип word2vec (2) Иерархическая модель на основе Softmax

принцип word2vec (1) основа модели CBOW и Skip-Gram

Принцип модели word2vec (3), основанной на отрицательной выборке

обзор word2vec

Понимание Word2Vec

Написать word2vec самостоятельно (1): основные понятия и процессы

Напишите word2vec самостоятельно (2): подсчитайте частоту слов

Напишите word2vec самостоятельно (3): постройте дерево Хаффмана

Напишите сами word2vec (4): CBOW и модель skip-gram

Подробное объяснение математических принципов в Word2vec (1) Содержание и предисловие

Иерархическая модель на основе Softmax

Отрицательные модели на основе выборки

Анализ реализации алгоритма машинного обучения - анализ исходного кода word2vec

Анализ исходного кода Word2Vec

исходные идеи word2vec и ключевые переменные

Самый подробный анализ исходного кода Word2Vec (ниже)

исходные идеи word2vec и ключевые переменные