Программирование MapReduce, которому может научиться Сяобай

Большие данные

Программирование MapReduce, которому может научиться Сяобай

Возвращаясь к MapReduce

Мы знаем, что ядро ​​Hadoop состоит из четырех основных компонентов:

  • HDFS
  • MapReduce
  • YARN
  • Common

HDFS: Распределенная система хранения

MapReduce: Распределенная вычислительная системаYARN: система планирования ресурсов Hadoop.Common: базовые компоненты поддержки трех вышеупомянутых компонентов, в основном предоставляющие базовые наборы инструментов и инфраструктуры RPC (удаленный вызов процедур, вызов серверных служб) и т. д.

А у MapReduce, как у одной из вычислительных систем, есть три основных понятия в крупномасштабной обработке данных:

Как справляться с обработкой больших данных: разделяй и властвуй

Для больших данных, которые не имеют вычислительных зависимостей друг от друга, наиболее естественным способом достижения параллелизма является принятие стратегии «разделяй и властвуй».

Переход к абстрактной модели: Mapper и Reducer

В методах параллельных вычислений, таких как MPI, отсутствует модель параллельного программирования высокого уровня.Чтобы преодолеть этот недостаток, MapReduce опирается на идею функционального языка Lisp и предоставляет абстрактную модель параллельного программирования высокого уровня с двумя функциями, Map и Уменьшать.

Подъем к архитектуре: унифицированная архитектура, сокрытие деталей системного уровня для программистов

Методы параллельных вычислений, такие как MPI, не поддерживаются унифицированной вычислительной средой, и программистам необходимо учитывать множество деталей, таких как хранение данных, разделение, распределение, сбор результатов, устранение ошибок и т. д. По этой причине MapReduce разрабатывает и обеспечивает унифицированную вычислительную среду. framework, который скрывает данные для программистов.Большинство деталей обработки на уровне системы

Абстрактное описание Карта и уменьшение

MapReduce опирается на идеи функционального языка программирования Lisp и определяет следующие два абстрактных интерфейса программирования, Map и Reduce, которые реализуются пользователями посредством программирования:

map: (k1; v1) -> [(k2; v2)]

Вход: данные, представленные парой ключ-значение (k1; v1) Обработка: записи данных документа (такие как строки в текстовом файле или строки в таблице данных) будут переданы в функцию карты как «пары ключ-значение»; функция карты обработает эти пары ключ-значение и вернет их в другой Набор промежуточных результатов пары ключ-значение, обработанных формальным выходом [(k2; v2)] Выход: набор промежуточных данных, представленных парами ключ-значение [(k2; v2)]

reduce: (k2; [v2]) -> [(k3; v3)]

Ввод: набор пар ключ-значение [(k2; v2)], выводимый картой, будет объединен для объединения различных значений под одним и тем же первичным ключом в список [v2], поэтому ввод сокращения равен (k2; [ v2] ) Обработка: Выполните некоторую сортировку или дальнейшую обработку входящих данных списка промежуточных результатов и создайте окончательный вывод результата в некоторой форме [(k3; v3)]. вывод: окончательный результат вывода [(k3; v3)]

Map и Reduce предоставляют программистам четкое абстрактное описание интерфейса операции.

резюме

  • Каждая функция карты обрабатывает разделенные данные параллельно и генерирует разные промежуточные результаты из разных входных данных.
  • Каждое сокращение также рассчитывается параллельно, и каждое из них отвечает за обработку разных наборов данных промежуточных результатов.
  • Перед обработкой редукции необходимо дождаться завершения всех функций карты.Поэтому перед вводом редукции должен быть барьер синхронизации; этот этап также отвечает за сбор и перетасовку обработки промежуточных данных результата карты, так что редукция более эффективно вычисляет конечный результат
  • Окончательный результат можно получить путем окончательного суммирования выходных результатов всех операций сокращения.

Детали программирования MapReduce

Простая программа WordCount

Mapper

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * LongWritable 偏移量 long,表示该行在文件中的位置,而不是行号
 * Text map阶段的输入数据 一行文本信息 字符串类型 String
 * Text map阶段的数据字符串类型 String
 * IntWritable map阶段输出的value类型,对应java中的int型,表示行号
 */
public class WorkCountMap 
    extends Mapper<LongWritable, Text, Text, IntWritable>{
	/**
	 * key 输入的 键
	 * value 输入的 值
	 * context 上下文对象
	 */
	@Override
	protected void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
		
		String line = value.toString();
		String[] words = line.split("/t");//分词
		for(String word : words) {
			Text wordText = new Text(word);
			IntWritable outValue = new IntWritable();
			//写出
			context.write(wordText, outValue);
		}
	}
}

  • Угловые скобки — это дженерики JAVA, которые здесь ограничивают тип входных данных функции.
  • В приведенном выше коде обратите внимание, что универсальный тип класса Mapper — это не базовый тип java, а тип данных Text и IntWritable Hadoop. Мы можем просто эквивалентно классу java String, int.

Дженерики класса Mapper в коде, в свою очередь,<k1,v1,k2,v2>. Второй параметр метода карты — это текстовое содержимое строки, которое нас интересует. Основной код состоит в том, чтобы разделить текстовое содержимое строки по пробелам, извлечь данные каждой строки, использовать слово в качестве нового ключа и число в качестве нового значения и записать его в контекст.contextсередина. Здесь, поскольку существует несколько наборов данных, каждый набор будет выводить<wordText, outValue>пара ключ-значение.

Reducer

Вход стадии сокращения является выходом стадии картографа.

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/**
 * Text  数据类型:字符串类型 String
 * IntWritable reduce阶段的输入类型 int 
 * Text reduce阶段的输出数据类型 String类型
 * IntWritable 输出词频个数 Int型
 */
public class WorkCountReduce extends Reducer<Text, IntWritable, Text, IntWritable>{
	/**
	 * key 输入的 键
	 * value 输入的 值
	 * context 上下文对象,用于输出键值对
	 */
	@Override
	protected void reduce(Text key, Iterable<IntWritable> value,
			Context context) throws IOException, InterruptedException {

		int sum=0;
		for (IntWritable number : value) {
			sum += number.get();
		}
		//单词  个数  hadoop,10
		context.write(key, new IntWritable(sum));
	}	
}

основная функция

Вычислительная задача в Hadoop называется job, основная функция в основном отвечает за создание нового объекта Job и установку для него соответствующих классов Mapper и Reducer, а также входных и выходных путей и т.д.

public static void main(String[] args) throws Exception 
{    //为任务设定配置文件 
    Configuration conf = new Configuration();	 
    //命令行参数 
   String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();	 
    if (otherArgs.length != 2) 
    {    System.err.println("Usage: wordcount <in> <out>");
         System.exit(2);
    } 	
    Job job = new Job(conf, “word count”);	//新建一个用户定义的Job
    job.setJarByClass(WordCount.class);	//设置执行任务的jar
    job.setMapperClass(WorkCountMap.class);	//设置Mapper类 
    job.setCombinerClass(WorkCountReduce.class);	//设置Combine类 
    job.setReducerClass(WorkCountReduce.class);	//设置Reducer类 
    job.setOutputKeyClass(Text.class);	//设置job输出的key
    //设置job输出的value 
    job.setOutputValueClass(IntWritable.class);	
    //设置输入文件的路径 
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));	
    //设置输出文件的路径 
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));	 
    //提交任务并等待任务完成  
    System.exit(job.waitForCompletion(true) ? 0 : 1);		 
}

Завершение основного рабочего механизма MapReduce

всегда говори

Полная программа mapreduce выполняется распределенным образом с двумя типами процессов экземпляра:

  1. MRAppMaster: отвечает за планирование процесса и координацию состояния всей программы (процесс находится на узле пряжи)
  2. Ярнчайлд: отвечает за весь процесс обработки данных на этапе карты.
  3. Ярнчайлд: отвечает за весь поток обработки данных на этапе сокращения.

Процессы maptask и reducetask на двух вышеприведенных стадиях являются yarnchild, что не означает, что maptask и reducetask выполняются в одном и том же процессе yarnchild (процесс Yarnchild находится на узле, выполняющем команду).

Работающий процесс программы MapReduce

  1. Когда запускается программа MapReduce, сначала запускается MRAppMaster. После запуска MRAppMaster вычисляет необходимое количество экземпляров maptask в соответствии с информацией описания этого задания, а затем применяет к кластеру запуск соответствующего количества процессов maptask.
  2. После запуска процесса maptask обработка данных выполняется в соответствии с заданным срезом данных (какой диапазон смещения какого файла), и основной процесс: A. Используйте входной формат, указанный клиентом, чтобы получить данные, считанные с помощью RecordReader, для формирования входной пары KV. B. Передать входную пару KV методу map(), определенному заказчиком, выполнить логические операции и собрать пару KV, выводимую методом map(), в кеш. C. После сортировки пар KV в кеше по разделу K переполнение непрерывно записывается в файл на диске (превышение объема кеша и запись во временный файл на диске, и, наконец, запись в файл, после того, как редюсер получит файл, удалите его)
  3. MRAppMaster следит за выполнением всех задач процесса maptask (в действительности, после обработки некоторых процессов maptask, запускает reducetask для выборки данных из выполненной maptask), и запускает соответствующее количество процессов reducetask по заданным заказчиком параметрам , И скажите процессу reducetask обработать диапазон данных (раздел данных)
  4. После запуска процесса Reducetask в соответствии с расположением данных, подлежащих обработке, о которых сообщает MRAppMaster, с машин, на которых запущено несколько задач maptask, получают несколько выходных файлов результатов maptask, повторно объединяют и сортируют локально, а затем группируют в соответствии с KV того же ключа Вызовите метод reduce(), определенный клиентом, для выполнения логической операции, соберите результат KV вывода операции, а затем вызовите outputformat, указанный клиентом, для вывода данных результата во внешнее хранилище.

Механизм принятия решения о параллелизме Maptask

Параллелизм maptask определяет параллелизм обработки задачи на этапе карты, что, в свою очередь, влияет на скорость обработки всего задания. Параллелизм фазы карты задания определяется клиентом при отправке задания и планом клиента по параллелизму фазы карты. Основная логика такова: Выполнение логических срезов данных для обработки (то есть, в соответствии с определенным размером слайса, разделение данных для обработки на несколько логических сплитов), а затем выделение параллельного экземпляра mapTask для каждого сплита для обработки Эта логика и результирующий файл описания планирования слайсов определяютсяFileInputFormatЭто делается методом getSplits() класса реализации. Этот метод возвращаетList<InputSplit>, InputSplit инкапсулирует информацию о каждом логическом срезе, включая информацию о длине и положении, а метод getSplits() возвращает набор InputSplit.