Решение Hive Hive Hundred Billion для обработки данных

искусственный интеллект Большие данные

Анализ проблемы перекоса данных

Перекос данных - неизбежная проблема распределенных систем. Любая распределенная система имеет шанс перекоса данных, но некоторые мелкие партнеры не очень четко это ощущают в своей обычной работе. Обратите внимание на название этой статьи здесь-"Сотни миллиардов данных",Почему вы говорите миллиарды, потому что, если объем данных задачи составляет всего несколько миллионов, даже если данные искажены, все данные будут запущены на одну машину для выполнения. В это время перекос данных для нас не заметен. Только когда данные достигают порядка величины, машина не может обработать такой объем данных.Если произойдет перекос данных, будет сложно вычислить результат в конце.

Первый публичный аккаунт этой статьи [Learn Big Data in Five Minutes]

Следовательно, нам нужно оптимизировать проблему перекоса данных и попытаться избежать или уменьшить влияние перекоса данных.

Прежде чем решать проблему перекоса данных, я должен упомянуть еще одну вещь: говорить об оптимизации без узких мест — значит напрашиваться на неприятности.

Подумайте об этом, в двух фазах карты и сокращения наиболее подвержена перекосу данных фаза уменьшения, потому что карта, которую нужно уменьшить, пройдет фазу перемешивания, а при перемешивании хэш будет выполняться в соответствии с ключом. по умолчанию.Если одинаковых ключей слишком много, результатом хэширования является то, что большое количество одинаковых ключей входит в один и тот же срез., что приводит к искажению данных.

Так возможен ли перекос данных на этапе карты?Такая возможность есть.

В задаче файл данных будет разделен перед входом в этап карты.По умолчанию блок данных 128M, но еслиПри использовании метода сжатия, такого как сжатие GZIP, для файлов, которые не поддерживают операции разделения файлов.Когда задача MR читает сжатый файл, он не может быть сегментирован, и сжатый файл может быть прочитан только одной задачей.Происходит перекос данных на этапе сопоставления.

Итак, по существу,Существует две причины перекоса данных: во-первых, в задаче необходимо обработать большой объем данных с одним и тем же ключом. Вторая — задача чтения неделимых больших файлов.

Решения для искажения данных

Принцип решения проблемы перекоса данных в MapReduce и Spark аналогичен. Ниже обсуждается перекос данных, вызванный Hive с использованием механизма MapReduce. Перекос данных Spark также можно использовать в качестве справочного материала.

1. Перекос данных, вызванный нулевыми значениями

В реальном бизнесе существует большое количество нулевых значений или некоторых бессмысленных данных, участвующих в операции вычисления.В таблице большое количество нулевых значений.Если между таблицами выполняется операция соединения, будет сгенерировано перемешивание, так что будут распределены все нулевые значения.При редукции неизбежно произойдет перекос данных.

Друг спрашивал раньше, если таблицы A и B объединены, если поле, которое необходимо объединить в таблице A, равно нулю, но поле, которое необходимо объединить в таблице B, не равно нулю, эти два поля не могут быть объединены в все, почему?

Здесь нам нужно прояснить концепцию.Причина, по которой данные помещаются в один и тот же редуктор, заключается не в том, что поля могут быть объединены или нет, а из-за хеш-операции на этапе перемешивания.Пока результат хеширования ключа то же самое, они будут тянуться к тому же.

решение:

Первый: вы можете напрямую запретить нулевому значению участвовать в операции соединения, то есть не позволять нулевому значению иметь фазу перемешивания

SELECT *
FROM log a
	JOIN users b
	ON a.user_id IS NOT NULL
		AND a.user_id = b.user_id
UNION ALL
SELECT *
FROM log a
WHERE a.user_id IS NULL;

Второе: поскольку результаты хеширования нулевых значений, участвующих в тасовке, одинаковы, то мы можем присвоить нулевым значениям случайные значения, чтобы их результаты хеширования были разными, и они попали в разные редюсеры:

SELECT *
FROM log a
	LEFT JOIN users b ON CASE 
			WHEN a.user_id IS NULL THEN concat('hive_', rand())
			ELSE a.user_id
		END = b.user_id;

2. Неравномерность данных, вызванная разными типами данных

Для соединения двух таблиц ключ поля, которое должно быть объединено в таблице a, имеет тип int, а ключевое поле в таблице b имеет как тип string, так и тип int. Когда операция соединения двух таблиц выполняется в соответствии с ключом, хеш-операция по умолчанию будет выделяться в соответствии с идентификатором типа int, так что всем строковым типам назначается один и тот же идентификатор, и в результате все поля строкового типа введите a При уменьшении возникает перекос данных.

решение:

Если ключевое поле имеет как строковый тип, так и тип int, хэш по умолчанию будет выделен в соответствии с типом int, тогда мы можем напрямую преобразовать тип int в строку, чтобы все ключевые поля были строками, а хэш был бы выделен в соответствии с типом строки.

SELECT *
FROM users a
	LEFT JOIN logs b ON a.usr_id = CAST(b.user_id AS string);

3. Перекос данных, вызванный неразделенными большими файлами

Когда объем данных кластера увеличивается до определенного масштаба, и некоторые данные необходимо архивировать или создавать дамп, данные часто сжимаются;При использовании метода сжатия, такого как сжатие GZIP, для файлов, которые не поддерживают операции разделения файлов, когда задание предполагает чтение сжатого файла в будущем, сжатый файл будет прочитан только одной задачей.. Если сжатый файл большой, обработка карты файла займет гораздо больше времени, чем чтение карты обычного файла, и задача карты станет узким местом задания. Эта ситуация также является перекосом данных в файле чтения карты.

решение:

У этой проблемы перекоса данных нет хорошего решения.Вы можете конвертировать только файлы, которые не поддерживают разделение файлов, например сжатие GZIP, в методы сжатия, поддерживающие разделение файлов, такие как bzip и zip.

так,Когда мы сжимаем файлы, чтобы избежать наклона чтения данных, вызванного неразделяемыми большими файлами, мы можем использовать алгоритмы сжатия, такие как bzip2 и Zip, которые поддерживают сегментацию файлов при сжатии данных..

4. Перекос данных, вызванный инфляцией данных

При расчете многомерной агрегации, если слишком много полей для группировки агрегации, следующим образом:

select a,b,c,count(1)from log group by a,b,c with rollup;

Примечание: напоследокwith rollupЯ не знаю, использовали ли вы ключевое слово, которое используется для выполнения статистической сводки на основе сгруппированных статистических данных, то есть для получения сводной информации о группировке.

Если объем данных в приведенной выше таблице журнала велик, а агрегация на стороне карты не может хорошо выполнять сжатие данных, выходные данные на стороне карты будут быстро увеличиваться, что может легко привести к исключению переполнения памяти задания. Если таблица журнала содержит ключ искажения данных, искажение данных в процессе тасования будет усугубляться.

решение:

Приведенный выше sql можно разделить, аwith rollupРазделить на следующий sql:

SELECT a, b, c, COUNT(1)
FROM log
GROUP BY a, b, c;

SELECT a, b, NULL, COUNT(1)
FROM log
GROUP BY a, b;

SELECT a, NULL, NULL, COUNT(1)
FROM log
GROUP BY a;

SELECT NULL, NULL, NULL, COUNT(1)
FROM log;

Однако вышеописанный способ не очень хорош, так как теперь групповая агрегация выполняется по полям 3. Если это поля 5 или 10, то будет больше операторов SQL, которые нужно разобрать.

В Hive вы можете передавать параметрыhive.new.job.grouping.set.cardinalityМетод конфигурации автоматически контролирует демонтаж задания.Значение этого параметра по умолчанию равно 30. Указывает операции для многомерных агрегаций, таких как группировка наборов/сверток/кубов. Если последняя разобранная комбинация клавиш превышает это значение, новая задача будет включена для обработки комбинаций, отличных от этого значения. Если столбец группирующего агрегирования имеет большой перекос при обработке данных, вы можете соответствующим образом скорректировать значение.

5. Перекос данных, вызванный объединением таблиц

Когда две таблицы выполняют общее соединение с перераспределением, если ключи соединения таблиц искажены, во время фазы перемешивания неизбежно произойдет искажение данных.

решение:

Обычной практикой является сохранение искаженных данных в распределенном кеше и их распределение по узлам, где находится каждая задача Map. Операция соединения, то есть MapJoin, завершается на этапе сопоставления, что позволяет избежать случайного воспроизведения и, таким образом, избежать перекоса данных.

MapJoin — это операция оптимизации Hive,Он подходит для сценариев, в которых маленькие таблицы СОЕДИНЯЮТСЯ с большими таблицами., так как операция JOIN таблицы выполняется на стороне Map и в памяти, для нее не требуется запускать задачу Reduce и не нужно проходить стадию перемешивания, что может в определенной степени сэкономить ресурсы и улучшить ПРИСОЕДИНЯЙТЕСЬ к эффективности.

До версии Hive 0.11, если вы хотите завершить операцию соединения на этапе сопоставления, вы должны использовать MAPJOIN, чтобы явно отметить и запустить операцию оптимизации.Поскольку необходимо загрузить небольшую таблицу в память, обратите внимание на размер маленькой таблицы..

Если таблица помещается в память на стороне карты для выполнения, ее необходимо записать следующим образом до версии Hive 0.11:

select /* +mapjoin(a) */ a.id , a.name, b.age 
from a join b 
on a.id = b.id;

Если вы хотите поместить несколько таблиц в память на стороне карты, вам нужно всего лишь написать несколько имен таблиц в mapjoin(), разделенных запятыми.Например, если вы поместите таблицу и c-таблицу в память на стороне карты,/* +mapjoin(a,c) */.

В Hive версии 0.11 и более поздних версиях Hive запускает эту оптимизацию по умолчанию, то есть он не использует тег MAPJOIN, когда его необходимо отобразить. Он запускает операцию оптимизации для преобразования обычного JOIN в MapJoin, когда это необходимо. Оптимизация может быть устанавливается следующими двумя свойствами Время запуска:

hive.auto.convert.join=trueЗначение по умолчанию — true, что автоматически включает оптимизацию MAPJOIN.

hive.mapjoin.smalltable.filesize=2500000Значение по умолчанию 2500000 (25M).При настройке этого свойства определяется размер таблицы с использованием этой оптимизации.Если размер таблицы меньше этого значения, она будет загружена в память.

Уведомление: Используйте способ запуска оптимизации по умолчанию. Если есть необъяснимая ошибка (например, MAPJOIN не работает), установите для следующих двух свойств значение false и вручную используйте флаг MAPJOIN, чтобы начать оптимизацию:

hive.auto.convert.join=false(Отключите автоматические операции преобразования MAPJOIN)

hive.ignore.mapjoin.hint=false(не игнорирует тег MAPJOIN)

Еще одно слово: При размещении таблицы в Map-side memory, если память узла очень большая, но все равно происходит переполнение памяти, мы можем передать этот параметрmapreduce.map.memory.mbОтрегулируйте размер памяти на стороне карты.

6. Действительно невозможно уменьшить перекос данных, вызванный объемом данных.

В некоторых операциях у нас нет возможности уменьшить количество данных, например, при использовании функции collect_list:

select s_age,collect_list(s_score) list_score
from student
group by s_age

collect_list: преобразовать столбец в группе в массив и вернуть его.

В приведенном выше sql s_age имеет перекос данных, но если объем данных достаточно велик, это вызовет исключение переполнения памяти для перекошенной задачи Reduce.

collect_list выводит массив, а промежуточные результаты будут помещены в память, поэтому, если collect_list агрегирует слишком много данных, это вызовет переполнение памяти.

Некоторые друзья сказали, что это перекос данных, вызванный группировкой, которую можно включить.hive.groupby.skewindataпараметры для оптимизации. Давайте проанализируем следующее:

При включении этой конфигурации задание будет разделено на два задания. Первое задание будет максимально равномерно распределять данные карты на этапе сокращения и предварительно агрегировать данные на этом этапе, чтобы уменьшить объем данных, обрабатываемых вторым заданием. Второе задание агрегирует результаты на основе данных, обработанных первым заданием.

hive.groupby.skewindataОсновная роль заключается в том, что первое созданное задание может эффективно сократить количество. Однако для таких функций, как collect_list, требующих полной обработки промежуточных результатов всех данных, это явно не работает, наоборот, введение новых заданий увеличивает нагрузку на дисковый и сетевой ввод-вывод, что приводит к еще большему снижению производительности. .

решение:

Самый простой способ решить эту проблему — настроить размер памяти для выполнения сокращения.

Отрегулируйте размер памяти, чтобы уменьшитьmapreduce.reduce.memory.mbэта конфигурация.

Суммировать

Из вышеизложенного мы выяснили, чтоФаза перемешивания — убийца производительности, почему вы говорите, что, с одной стороны, этап тасования, скорее всего, вызовет перекос данных; с другой стороны, процесс тасования будет генерировать большой объем дискового ввода-вывода, сетевого ввода-вывода, сжатия, распаковка, сериализация и десериализация и т.д. Эти операции могут серьезно повлиять на производительность.

Таким образом, есть много преимуществ настройки при случайном перемешивании и перекосе данных:

  • Насколько велик буфер на стороне картографа? Установка большого буфера может повысить производительность и уменьшить дисковый ввод-вывод, но

Это требует памяти и оказывает давление на GC; если Buffer установлен маленьким, он может не занимать так много памяти, но может Может частый дисковый ввод-вывод, частый сетевой ввод-вывод.


--END--