Программирование MapReduce, которому может научиться Сяобай
Возвращаясь к MapReduce
Мы знаем, что ядро Hadoop состоит из четырех основных компонентов:
- HDFS
- MapReduce
- YARN
- Common
HDFS: Распределенная система хранения
MapReduce: Распределенная вычислительная системаYARN: система планирования ресурсов Hadoop.Common: базовые компоненты поддержки трех вышеупомянутых компонентов, в основном предоставляющие базовые наборы инструментов и инфраструктуры RPC (удаленный вызов процедур, вызов серверных служб) и т. д.
А у MapReduce, как у одной из вычислительных систем, есть три основных понятия в крупномасштабной обработке данных:
Как справляться с обработкой больших данных: разделяй и властвуй
Для больших данных, которые не имеют вычислительных зависимостей друг от друга, наиболее естественным способом достижения параллелизма является принятие стратегии «разделяй и властвуй».
Переход к абстрактной модели: Mapper и Reducer
В методах параллельных вычислений, таких как MPI, отсутствует модель параллельного программирования высокого уровня.Чтобы преодолеть этот недостаток, MapReduce опирается на идею функционального языка Lisp и предоставляет абстрактную модель параллельного программирования высокого уровня с двумя функциями, Map и Уменьшать.
Подъем к архитектуре: унифицированная архитектура, сокрытие деталей системного уровня для программистов
Методы параллельных вычислений, такие как MPI, не поддерживаются унифицированной вычислительной средой, и программистам необходимо учитывать множество деталей, таких как хранение данных, разделение, распределение, сбор результатов, устранение ошибок и т. д. По этой причине MapReduce разрабатывает и обеспечивает унифицированную вычислительную среду. framework, который скрывает данные для программистов.Большинство деталей обработки на уровне системы
Абстрактное описание Карта и уменьшение
MapReduce опирается на идеи функционального языка программирования Lisp и определяет следующие два абстрактных интерфейса программирования, Map и Reduce, которые реализуются пользователями посредством программирования:
map: (k1; v1) -> [(k2; v2)]
Вход: данные, представленные парой ключ-значение (k1; v1) Обработка: записи данных документа (такие как строки в текстовом файле или строки в таблице данных) будут переданы в функцию карты как «пары ключ-значение»; функция карты обработает эти пары ключ-значение и вернет их в другой Набор промежуточных результатов пары ключ-значение, обработанных формальным выходом [(k2; v2)] Выход: набор промежуточных данных, представленных парами ключ-значение [(k2; v2)]
reduce: (k2; [v2]) -> [(k3; v3)]
Ввод: набор пар ключ-значение [(k2; v2)], выводимый картой, будет объединен для объединения различных значений под одним и тем же первичным ключом в список [v2], поэтому ввод сокращения равен (k2; [ v2] ) Обработка: Выполните некоторую сортировку или дальнейшую обработку входящих данных списка промежуточных результатов и создайте окончательный вывод результата в некоторой форме [(k3; v3)]. вывод: окончательный результат вывода [(k3; v3)]
Map и Reduce предоставляют программистам четкое абстрактное описание интерфейса операции.
резюме
- Каждая функция карты обрабатывает разделенные данные параллельно и генерирует разные промежуточные результаты из разных входных данных.
- Каждое сокращение также рассчитывается параллельно, и каждое из них отвечает за обработку разных наборов данных промежуточных результатов.
- Перед обработкой редукции необходимо дождаться завершения всех функций карты.Поэтому перед вводом редукции должен быть барьер синхронизации; этот этап также отвечает за сбор и перетасовку обработки промежуточных данных результата карты, так что редукция более эффективно вычисляет конечный результат
- Окончательный результат можно получить путем окончательного суммирования выходных результатов всех операций сокращения.
Детали программирования MapReduce
Простая программа WordCount
Mapper
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
* LongWritable 偏移量 long,表示该行在文件中的位置,而不是行号
* Text map阶段的输入数据 一行文本信息 字符串类型 String
* Text map阶段的数据字符串类型 String
* IntWritable map阶段输出的value类型,对应java中的int型,表示行号
*/
public class WorkCountMap
extends Mapper<LongWritable, Text, Text, IntWritable>{
/**
* key 输入的 键
* value 输入的 值
* context 上下文对象
*/
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
String[] words = line.split("/t");//分词
for(String word : words) {
Text wordText = new Text(word);
IntWritable outValue = new IntWritable();
//写出
context.write(wordText, outValue);
}
}
}
- Угловые скобки — это дженерики JAVA, которые здесь ограничивают тип входных данных функции.
- В приведенном выше коде обратите внимание, что универсальный тип класса Mapper — это не базовый тип java, а тип данных Text и IntWritable Hadoop. Мы можем просто эквивалентно классу java String, int.
Дженерики класса Mapper в коде, в свою очередь,<k1,v1,k2,v2>
. Второй параметр метода карты — это текстовое содержимое строки, которое нас интересует. Основной код состоит в том, чтобы разделить текстовое содержимое строки по пробелам, извлечь данные каждой строки, использовать слово в качестве нового ключа и число в качестве нового значения и записать его в контекст.context
середина. Здесь, поскольку существует несколько наборов данных, каждый набор будет выводить<wordText, outValue>
пара ключ-значение.
Reducer
Вход стадии сокращения является выходом стадии картографа.
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/**
* Text 数据类型:字符串类型 String
* IntWritable reduce阶段的输入类型 int
* Text reduce阶段的输出数据类型 String类型
* IntWritable 输出词频个数 Int型
*/
public class WorkCountReduce extends Reducer<Text, IntWritable, Text, IntWritable>{
/**
* key 输入的 键
* value 输入的 值
* context 上下文对象,用于输出键值对
*/
@Override
protected void reduce(Text key, Iterable<IntWritable> value,
Context context) throws IOException, InterruptedException {
int sum=0;
for (IntWritable number : value) {
sum += number.get();
}
//单词 个数 hadoop,10
context.write(key, new IntWritable(sum));
}
}
основная функция
Вычислительная задача в Hadoop называется job, основная функция в основном отвечает за создание нового объекта Job и установку для него соответствующих классов Mapper и Reducer, а также входных и выходных путей и т.д.
public static void main(String[] args) throws Exception
{ //为任务设定配置文件
Configuration conf = new Configuration();
//命令行参数
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2)
{ System.err.println("Usage: wordcount <in> <out>");
System.exit(2);
}
Job job = new Job(conf, “word count”); //新建一个用户定义的Job
job.setJarByClass(WordCount.class); //设置执行任务的jar
job.setMapperClass(WorkCountMap.class); //设置Mapper类
job.setCombinerClass(WorkCountReduce.class); //设置Combine类
job.setReducerClass(WorkCountReduce.class); //设置Reducer类
job.setOutputKeyClass(Text.class); //设置job输出的key
//设置job输出的value
job.setOutputValueClass(IntWritable.class);
//设置输入文件的路径
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
//设置输出文件的路径
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
//提交任务并等待任务完成
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
Завершение основного рабочего механизма MapReduce
всегда говори
Полная программа mapreduce выполняется распределенным образом с двумя типами процессов экземпляра:
- MRAppMaster: отвечает за планирование процесса и координацию состояния всей программы (процесс находится на узле пряжи)
- Ярнчайлд: отвечает за весь процесс обработки данных на этапе карты.
- Ярнчайлд: отвечает за весь поток обработки данных на этапе сокращения.
Процессы maptask и reducetask на двух вышеприведенных стадиях являются yarnchild, что не означает, что maptask и reducetask выполняются в одном и том же процессе yarnchild (процесс Yarnchild находится на узле, выполняющем команду).
Работающий процесс программы MapReduce
- Когда запускается программа MapReduce, сначала запускается MRAppMaster. После запуска MRAppMaster вычисляет необходимое количество экземпляров maptask в соответствии с информацией описания этого задания, а затем применяет к кластеру запуск соответствующего количества процессов maptask.
- После запуска процесса maptask обработка данных выполняется в соответствии с заданным срезом данных (какой диапазон смещения какого файла), и основной процесс: A. Используйте входной формат, указанный клиентом, чтобы получить данные, считанные с помощью RecordReader, для формирования входной пары KV. B. Передать входную пару KV методу map(), определенному заказчиком, выполнить логические операции и собрать пару KV, выводимую методом map(), в кеш. C. После сортировки пар KV в кеше по разделу K переполнение непрерывно записывается в файл на диске (превышение объема кеша и запись во временный файл на диске, и, наконец, запись в файл, после того, как редюсер получит файл, удалите его)
- MRAppMaster следит за выполнением всех задач процесса maptask (в действительности, после обработки некоторых процессов maptask, запускает reducetask для выборки данных из выполненной maptask), и запускает соответствующее количество процессов reducetask по заданным заказчиком параметрам , И скажите процессу reducetask обработать диапазон данных (раздел данных)
- После запуска процесса Reducetask в соответствии с расположением данных, подлежащих обработке, о которых сообщает MRAppMaster, с машин, на которых запущено несколько задач maptask, получают несколько выходных файлов результатов maptask, повторно объединяют и сортируют локально, а затем группируют в соответствии с KV того же ключа Вызовите метод reduce(), определенный клиентом, для выполнения логической операции, соберите результат KV вывода операции, а затем вызовите outputformat, указанный клиентом, для вывода данных результата во внешнее хранилище.
Механизм принятия решения о параллелизме Maptask
Параллелизм maptask определяет параллелизм обработки задачи на этапе карты, что, в свою очередь, влияет на скорость обработки всего задания.
Параллелизм фазы карты задания определяется клиентом при отправке задания и планом клиента по параллелизму фазы карты.
Основная логика такова:
Выполнение логических срезов данных для обработки (то есть, в соответствии с определенным размером слайса, разделение данных для обработки на несколько логических сплитов), а затем выделение параллельного экземпляра mapTask для каждого сплита для обработки
Эта логика и результирующий файл описания планирования слайсов определяютсяFileInputFormat
Это делается методом getSplits() класса реализации.
Этот метод возвращаетList<InputSplit>
, InputSplit инкапсулирует информацию о каждом логическом срезе, включая информацию о длине и положении, а метод getSplits() возвращает набор InputSplit.