0x00 сводка
Alink — это платформа алгоритмов машинного обучения нового поколения, разработанная Alibaba на основе вычислительного движка реального времени Flink, и первая в отрасли платформа машинного обучения, которая поддерживает как пакетные, так и потоковые алгоритмы. В этой статье вы проанализируете реализацию MultiStringIndexer в Alink.
Поскольку общедоступная информация Alink слишком мала, нижеследующее - все предположения, и обязательно будут упущения и ошибки. Я надеюсь, что все укажут, и я обновлю их в любое время.
Цель этой статьи — проанализировать GBDT и обнаружить, что GBDT включает использование MultiStringIndexer, поэтому мы можем сначала проанализировать только MultiStringIndexer.
0x01 Концепция
Официальное представление Alink: Роль обучающего компонента MultiStringIndexer заключается в обучении модели отображению строк с несколькими столбцами в целые числа.
В частности, StringIndexer (преобразование строкового индекса) кодирует «столбец строк» меток как «столбец индексов меток».
- Диапазон значений последовательности индексов меток: [0, numLabels (сумма всех слов, которые появляются в строке после удаления повторяющихся слов)], отсортированных по частоте появления метки и индексу отображаемой метки. максимальное значение равно 0 (в частности, можно настроить порядок возрастания и убывания).
- Если ввод является числовым, мы сначала сопоставляем числовое значение со строкой, а затем индексируем строку.
- Если нижестоящему конвейеру (например, Estimator или Transformer) необходимо использовать индексированную последовательность меток, необходимо указать имя входного столбца этого конвейера в качестве имени индексированной последовательности. В большинстве случаев имена входных столбцов устанавливаются через setSelectedCols.
Возьмите эти входы в качестве примера:
("football", "can"),
("football", "hhh"),
("football", "zzz"),
("basketball", "zzz"),
("basketball", "can"),
("tennis", "can")
Для первого столбца MultiStringIndexer перенумеровывает метки набора данных. По частоте появления метки преобразуется в 0 ~ numOfLabels - 1 (количество категорий). Если он отсортирован от высокого к низкому, преобразование с самой высокой частотой равно 0 и т. д., например:
- футбол, имеет наибольшее количество вхождений, появляется 3 раза, а переход (число) равен 0
- Далее идет баскетбол, который появляется 2 раза и имеет номер 1 и так далее.
После использования StringIndexer для перенумерации меток данные обучаются с этими пронумерованными метками, а затем прогнозируются другие данные для получения результатов прогнозирования.Метки результатов прогнозирования также перенумеровываются, поэтому их необходимо преобразовать обратно.
Пример кода 0x02
Пример кода выглядит следующим образом В этом примере кода он расположен в порядке возрастания, то есть общее количество футбольных мячей равно 3, затем его idx равно 3, количество теннисных мячей равно 1, а его idx равно 0:
public class MultiStringIndexerExample {
static AlgoOperator getData(boolean isBatch) {
Row[] array = new Row[] {
Row.of("football", "can"),
Row.of("football", "hhh"),
Row.of("football", "zzz"),
Row.of("basketball", "zzz"),
Row.of("basketball", "can"),
Row.of("tennis", "can")
};
if (isBatch) {
return new MemSourceBatchOp(
Arrays.asList(array), new String[] {"a", "b"});
} else {
return new MemSourceStreamOp(
Arrays.asList(array), new String[] {"a", "b"});
}
}
public static void main(String[] args) throws Exception {
BatchOperator data = (BatchOperator)getData(true);
MultiStringIndexer stringindexer = new MultiStringIndexer()
.setSelectedCols("a", "b")
.setOutputCols("a_indexed", "b_indexed")
.setStringOrderType("frequency_asc");
stringindexer.fit(data).transform(data).print();
}
}
Результат выглядит следующим образом:
a|b|a_indexed|b_indexed
-|-|---------|---------
football|can|2|2
football|hhh|2|0
football|zzz|2|1
basketball|zzz|1|1
basketball|can|1|2
tennis|can|0|2
Преобразованный в таблицу, чтобы видеть более четко.
a | b | a_indexed | b_indexed |
---|---|---|---|
football | can | 2 | 2 |
football | hhh | 2 | 0 |
football | zzz | 2 | 1 |
basketball | zzz | 1 | 1 |
basketball | can | 1 | 2 |
tennis | can | 0 | 2 |
0x03 Общая логика
Сначала мы даем блок-схему
Старая процедура, мы начинаем майнинг с MultiStringIndexerTrainBatchOp.linkFrom.
@Override
public MultiStringIndexerTrainBatchOp linkFrom(BatchOperator<?>... inputs) {
BatchOperator<?> in = checkAndGetFirst(inputs);
// 示例中有 .setSelectedCols("a", "b"),这里是取出具体列名字
final String[] selectedColNames = getSelectedCols();
// 获取列的类型
final String[] selectedColSqlType = new String[selectedColNames.length];
for (int i = 0; i < selectedColNames.length; i++) {
selectedColSqlType[i] = FlinkTypeConverter.getTypeString(
TableUtil.findColTypeWithAssertAndHint(in.getSchema(), selectedColNames[i]));
}
// runtime打印数据
selectedColNames = {String[2]@2536}
0 = "a"
1 = "b"
selectedColSqlType = {String[2]@2537}
0 = "VARCHAR"
1 = "VARCHAR"
// 获取选取列对应的数据
DataSet<Row> inputRows = in.select(selectedColNames).getDataSet();
//
DataSet<Tuple3<Integer, String, Long>> indexedToken =
StringIndexerUtil.indexTokens(inputRows, getStringOrderType(), 0L, true);
DataSet<Row> values = indexedToken
.mapPartition(new RichMapPartitionFunction<Tuple3<Integer, String, Long>, Row>() {
@Override
public void mapPartition(Iterable<Tuple3<Integer, String, Long>> values, Collector<Row> out)
throws Exception {
Params meta = null;
if (getRuntimeContext().getIndexOfThisSubtask() == 0) {
// 第一个task会做这个计算,就是把列名,列类型作为元数据传送
meta = new Params().set(HasSelectedCols.SELECTED_COLS, selectedColNames)
.set(HasSelectedColTypes.SELECTED_COL_TYPES, selectedColSqlType);
}
// runtime打印数据
meta = {Params@9311} "Params {selectedCols=["a","b"], selectedColTypes=["VARCHAR","VARCHAR"]}"
params = {HashMap@9316} size = 2
new MultiStringIndexerModelDataConverter().save(Tuple2.of(meta, values), out);
}
})
.name("build_model");
this.setOutput(values, new MultiStringIndexerModelDataConverter().getModelSchema());
return this;
}
Общая логика тренировочного процесса сводится к следующему:
- Выньте конкретное имя столбца, тип столбца;
- Получить данные, соответствующие «выбранному столбцу»;
- Передайте имя столбца, тип столбца в качестве метаданных;
-
StringIndexerUtil.indexTokens
Назначьте последовательные индексы разным строкам в каждом столбце. Индексы каждого столбца не связаны друг с другом;- позвонить
indexSortedByFreq(data, startIndex, ignoreNull, true)
, функция заключается в присвоении последовательных индексов разным строкам в каждом столбце, а индексы сортируются в соответствии с частотой появления строк;- Функция вызова countTokens состоит в том, чтобы объединить и вычислить количество слов в соответствии с «idx столбца», «слово» и получить , например, количество слова футбол в первом столбце — 3, возвращаемая тройка — , где idx столбца начинается с 0.
- Вызовите flattenTokens, чтобы разбить строку входных данных и вернуть набор данных, состоящий из кортежей индекса столбца и маркера, то есть . Например, для ввода Row.of("футбол", "можно"), flattenTokens выводит два Tuple2 , и .
- Выполните операцию сопоставления с приведенной выше структурой и выведите
, например ; - Группировать по "idx столбца", "слову";
- Объедините и подсчитайте количество слов в соответствии с «idx столбца», «слово»;
- indexSortedByFreq будет обрабатывать результаты, возвращенные countTokens ;
- Первая группа по столбцу idx;
- Затем на основе приведенных выше результатов отсортируйте по количеству слов;
- Отсортированный индекс начинается с входного параметра startIndex, где startIndex равен 0;
- Наконец, первый столбец (0,футбол,0), (0,баскетбол,1), (0,футбол,2); второй столбец данных (1,ччч,0), (1,ззз,1) , (1, может, 2);
- Функция вызова countTokens состоит в том, чтобы объединить и вычислить количество слов в соответствии с «idx столбца», «слово» и получить , например, количество слова футбол в первом столбце — 3, возвращаемая тройка — , где idx столбца начинается с 0.
- позвонить
- Сохраните результат indexTokens как модель, которая использует «имя столбца, тип столбца в качестве метаданных», упомянутые ранее.
Два последних этапа подробно анализируются ниже.
0x04 Add Index to Token
Эта часть предназначена для присвоения последовательных индексов разным строкам каждого столбца. Индексы для каждого столбца не связаны друг с другом.
В частности, это делается с помощью StringIndexerUtil.indexTokens.
public static DataSet<Tuple3<Integer, String, Long>> indexTokens(
DataSet<Row> data, HasStringOrderTypeDefaultAsRandom.StringOrderType orderType,
final long startIndex, final boolean ignoreNull) {
case FREQUENCY_ASC:
return indexSortedByFreq(data, startIndex, ignoreNull, true);
}
4.1 Объединить количество слов
indexSortedByFreq вызовет countTokens для подсчета количества слов, поэтому сначала рассмотрим countTokens.
Функция countTokens состоит в том, чтобы объединить и подсчитать количество слов в соответствии с «idx столбца» и «слова». Например, в первом столбце количество слова футбол равно 3, тогда возвращенная тройка , где idx столбца начинается с 0.
Конкретная логика выглядит следующим образом:
- Вызовите flattenTokens, чтобы разбить строку входных данных и вернуть набор данных, состоящий из кортежей индекса столбца и маркера, то есть . Например, для ввода Row.of("футбол", "можно"), flattenTokens выводит два Tuple2 , и .
- Выполните операцию сопоставления с приведенными выше результатами и выведите
, например , это обычная операция подсчета. - Группировать по "idx столбца", "слову";
- В соответствии с «столбец idx», «слово» для слияния и подсчета количества слов, то есть для продолжения слияния выше 1L.
4.1.1 Разбить входные данные
Функция flattenTokens состоит в том, чтобы разбить строку входных данных и вернуть набор данных, состоящий из кортежей индекса столбца и токена.
Например, для ввода Row.of("футбол", "можно"), flattenTokens используетout.collect(Tuple2.of(i, String.valueOf(o)));
Выведите два Tuple2.
value = {Row@9212} "football,can"
fields = {Object[2]@9215}
0 = "football"
1 = "can"
输出 <0, "football"> 和 <1, "can">
4.1.2 Количество групповых расчетов
Это делается с помощью серии операций map, groupBy и reduce над результатами flattenTokens.
Конкретный код выглядит следующим образом:
public static DataSet<Tuple3<Integer, String, Long>> countTokens(DataSet<Row> data, final boolean ignoreNull) {
return flattenTokens(data, ignoreNull) // 把输入数据 Row 给打散
.map(new MapFunction<Tuple2<Integer, String>, Tuple3<Integer, String, Long>>() {
@Override
public Tuple3<Integer, String, Long> map(Tuple2<Integer, String> value) throws Exception {
return Tuple3.of(value.f0, value.f1, 1L); // 输出<column idx, word, 1L>,比如 <0, "football", 1L>
}
})
.groupBy(0, 1) // 按照 "列idx","word" 来分组
.reduce(new ReduceFunction<Tuple3<Integer, String, Long>>() {
@Override
public Tuple3<Integer, String, Long> reduce(Tuple3<Integer, String, Long> value1, Tuple3<Integer, String, Long> value2) throws Exception {
value1.f2 += value2.f2;
return value1; // 按照 "列idx","word" 来合并计算单词个数
}
})
.name("count_tokens");
}
// reduce之后发出
value1 = {Tuple3@9284} "(0,football,3)"
f0 = {Integer@9226} 0
f1 = "football"
f2 = {Long@9295} 3
4.2 Объединить количество слов
Возвращаемый триплет countTokens выше — это
indexSortedByFreq будет обрабатывать результаты, возвращенные countTokens ;
- Первая группа по столбцу idx;
- Затем на основе приведенных выше результатов отсортируйте по количеству слов;
- Отсортированный индекс начинается с входного параметра startIndex, где startIndex равен 0;
- Наконец, первый столбец (0,теннис,0), (0,баскетбол,1), (0,футбол,2); второй столбец данных (1,ччч,0), (1,ззз, 1) , (1,может,2);
Конкретный код выглядит следующим образом:
public static DataSet<Tuple3<Integer, String, Long>> indexSortedByFreq(
DataSet<Row> data, final long startIndex, final boolean ignoreNull, final boolean isAscending) {
return countTokens(data, ignoreNull)
.groupBy(0) //按照 列idx 做分组
.sortGroup(2, isAscending ? Order.ASCENDING : Order.DESCENDING) //按照单词个数排序
.reduceGroup(new GroupReduceFunction<Tuple3<Integer, String, Long>, Tuple3<Integer, String, Long>>() {
@Override
public void reduce(Iterable<Tuple3<Integer, String, Long>> values,
Collector<Tuple3<Integer, String, Long>> out) {
long id = startIndex;
for (Tuple3<Integer, String, Long> value : values) {
out.collect(Tuple3.of(value.f0, value.f1, id++)); // 归并
}
}
});
}
Модель вывода 0x05
Эта часть разделена на две части:
- Выходные метаданные — это «имя столбца, тип столбца в качестве метаданных», полученные ранее.
- Выведите информацию о каждом слове каждого столбца, например (0,теннис,0), (0,баскетбол,1), (0,футбол,2) в первом столбце; данные во втором столбце (1,ччч, 0 ), (1,zzz,1), (1,can,2);
public class MultiStringIndexerModelDataConverter implements
ModelDataConverter<Tuple2<Params, Iterable<Tuple3<Integer, String, Long>>>, MultiStringIndexerModelData> {
@Override
public void save(Tuple2<Params, Iterable<Tuple3<Integer, String, Long>>> modelData, Collector<Row> collector) {
if (modelData.f0 != null) {
collector.collect(Row.of(-1L, modelData.f0.toJson(), null));
}
modelData.f1.forEach(tuple -> {
collector.collect(Row.of(tuple.f0.longValue(), tuple.f1, tuple.f2));
});
}
}
tuple = {Tuple3@9405} "(0,tennis,0)"
f0 = {Integer@9406} 0
f1 = "tennis"
f2 = {Long@9408} 0
0x06 предсказание
Функция прогнозирования выполняется в ModelMapperAdapter.
public class ModelMapperAdapter extends RichMapFunction<Row, Row> implements Serializable {
private final ModelMapper mapper;
private final ModelSource modelSource;
@Override
public void open(Configuration parameters) throws Exception {
List<Row> modelRows = this.modelSource.getModelRows(getRuntimeContext());
this.mapper.loadModel(modelRows); //加载模型
}
@Override
public Row map(Row row) throws Exception {
return this.mapper.map(row); //预测
}
}
6.1 Загрузка модели
В MultiStringIndexerModelDataConverter мы загрузим модель.
- Метаинформация загружается первой
- Далее информация о модели будет загружаться одна за другой.
public MultiStringIndexerModelData load(List<Row> rows) {
MultiStringIndexerModelData modelData = new MultiStringIndexerModelData();
modelData.tokenAndIndex = new ArrayList<>();
modelData.tokenNumber = new HashMap<>();
for (Row row : rows) {
long colIndex = (Long) row.getField(0);
if (colIndex < 0L) { // 元数据
modelData.meta = Params.fromJson((String) row.getField(1));
} else { // 具体模型信息
int columnIndex = ((Long) row.getField(0)).intValue();
Long tokenIndex = Long.valueOf(String.valueOf(row.getField(2)));
modelData.tokenAndIndex.add(Tuple3.of(columnIndex, (String) row.getField(1), tokenIndex));
modelData.tokenNumber.merge(columnIndex, 1L, Long::sum); // 合并列数据个数
}
}
// To ensure that every columns has token number.
int numFields = 0;
if (modelData.meta != null) {
numFields = modelData.meta.get(HasSelectedCols.SELECTED_COLS).length;
}
for (int i = 0; i < numFields; i++) {
modelData.tokenNumber.merge(i, 0L, Long::sum);
}
return modelData;
}
Окончательное содержимое модели выглядит следующим образом, где tokenNumber представляет количество данных в каждом столбце, tokenAndIndex представляет конкретную информацию, например (0, теннис, 0), (0, баскетбол, 1), (0, футбол, 2). ) означает, что они все В первом столбце преобразованные данные баскетбольного мяча равны 1:
modelData = {MultiStringIndexerModelData@9348}
meta = {Params@9440} "Params {selectedCols=["a","b"], selectedColTypes=["VARCHAR","VARCHAR"]}"
tokenAndIndex = {ArrayList@9360} size = 6
0 = {Tuple3@9472} "(0,football,2)"
1 = {Tuple3@9511} "(0,tennis,0)"
2 = {Tuple3@9512} "(1,zzz,1)"
3 = {Tuple3@9513} "(1,hhh,0)"
4 = {Tuple3@9514} "(0,basketball,1)"
5 = {Tuple3@9515} "(1,can,2)"
tokenNumber = {HashMap@9385} size = 2
{Integer@9507} 0 -> {Long@9508} 3
{Integer@9509} 1 -> {Long@9508} 3
numFields = 2
6.2 Прогноз
Прогнозирование выполняется в MultiStringIndexerModelMapper.
// 假设输入是:row = {Row@9309} "football,can"
// 选择的列是:selectedColNames = {String[2]@9314} 0 = "a" 1 = "b"
// 模型映射器是:
this = {MultiStringIndexerModelMapper@9309}
indexMapper = {HashMap@9318} size = 2
{Integer@9357} 0 -> {HashMap@9314} size = 3
key = {Integer@9357} 0
value = 0
value = {HashMap@9314} size = 3
"basketball" -> {Long@9386} 1
"football" -> {Long@9332} 2
"tennis" -> {Long@9384} 0
{Integer@9352} 1 -> {HashMap@9358} size = 3
key = {Integer@9352} 1
value = 1
value = {HashMap@9358} size = 3
"can" -> {Long@9332} 2
"hhh" -> {Long@9384} 0
"zzz" -> {Long@9386} 1
Пройдя следующий код, вы, наконец, можете делать прогнозы
public Row map(Row row) throws Exception {
Row result = new Row(selectedColNames.length);
for (int i = 0; i < selectedColNames.length; i++) {
Map<String, Long> mapper = indexMapper.get(i);
int colIdxInData = selectedColIndicesInData[i];
Object val = row.getField(colIdxInData);
String key = val == null ? null : String.valueOf(val);
Long index = mapper.get(key);
if (index != null) {
result.setField(i, index); // 我们主要执行在这里
} else {
}
}
// 最后预测结果是:
row = {Row@9308} "football,can"
result = {Row@9313} "2,2"
return outputColsHelper.getResultRow(row, result);
}
0xEE Личная информация
★★★★★★Думая о жизни и технологиях★★★★★★
Публичный аккаунт WeChat: мысли Росси
Если вы хотите получать своевременные новости о статьях, написанных отдельными лицами, или хотите видеть технические материалы, рекомендованные отдельными лицами, обратите внимание.