Как MapReduce элегантно реализует глобальную сортировку

Большие данные

считать

Думая о глобальной сортировке, первое, что приходит на ум, это собрать данные со стороны карты, перетасовать для редукции, настроить редукцию, а затем отсортировать данные в редукции.Очевидно, что это ничем не отличается от одиночной машины. , Вы должны знать, что фреймворк mapreduce по умолчанию используется для сортировки ключа, конечно, вы также можете поместить значение на ключ, чтобы отсортировать значение, и, наконец, переключить его обратно при сокращении, Кроме того, сортировка выполняется для того же партиция, то есть редукция, на самом деле она не может быть полностью применена к кластеру параллельно, так как же более изящно реализовать глобальную сортировку?

Резюме

Сортировка в Hadoop делится на частичную сортировку, глобальную сортировку, вспомогательную сортировку, вторичную сортировку и т. д. В этой статье в основном рассказывается о том, как реализовать глобальную сортировку ключей.Существует три метода реализации:

  1. установить сокращение
  2. Используйте настраиваемые разделы для разделения данных на несколько разделов в последовательных пакетах.
  3. Самостоятельная реализация с помощью фреймворковTotalOrderPartitionerразделитель для реализации

выполнить

Сначала подготовьте некоторые входные данные:GitHub.com/Fox в направлении/Be…

/data/job/file.txt
2
32
654
32
15
756
65223

Реализуйте глобальную сортировку, установив сокращение

Используя сокращение для достижения глобальной сортировки, можно сказать, что никаких специальных операций не требуется.Маппер, сокращение и драйвер реализованы следующим образом:

package com.hoult.mr.job;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class JobMapper extends Mapper<LongWritable, Text, IntWritable, IntWritable> {
    @Override
    protected void map(LongWritable key, Text value,
                       Context context) throws IOException, InterruptedException {
        IntWritable intWritable = new IntWritable(Integer.parseInt(value.toString()));
        context.write(intWritable, intWritable);
    }
}
package com.hoult.mr.job;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class JobReducer  extends
        Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {

    private int index = 0;//全局排序计数器
    @Override
    protected void reduce(IntWritable key, Iterable<IntWritable> values,
                          Context context) throws IOException, InterruptedException {
        for (IntWritable value : values)
            context.write(new IntWritable(++index), value);
    }
}
package com.hoult.mr.job;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class JobDriver extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("input-path output-path");
            System.exit(1);
        }

        Job job = Job.getInstance(getConf());
        job.setJarByClass(JobDriver.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(JobMapper.class);
        job.setReducerClass(JobReducer.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(NullWritable.class);
        //使用一个reduce来排序
        job.setNumReduceTasks(1);
        job.setJobName("JobDriver");
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args)throws Exception{

//        int exitCode = ToolRunner.run(new JobDriver(), args);
        int exitCode = ToolRunner.run(new JobDriver(), new String[] {"data/job/", "data/job/output"});
        System.exit(exitCode);
    }
}

//加了排序索引,最后输出一个文件,内容如下:
1	2
2	6
3	15
4	22
5	26
6	32
7	32
8	54
9	92
10	650
11	654
12	756
13	5956
14	65223

PS; Вышеупомянутые задачи запускаются через инструмент ToolRunner, который идет в комплекте с hadoop.Последующие коды, предполагающие повторение, больше не перечислены, а только для различий в коде.

Используйте настраиваемые разделы для разделения данных на несколько разделов в последовательных пакетах.

Как обеспечить глобальный порядок данных с помощью пользовательского разделения? Мы знаем, что раздел «ключ-значение» будет отправлять ключи разных диапазонов разным редукторам через функцию раздела по умолчанию HashPartition, поэтому используйте ее для реализации разделителя. данные 10 миллионов + 1-20 миллионов используются для сокращения2. . . . Наконец, эти десять файлов можно объединить, чтобы получить данные глобальной сортировки всех данных в порядке разделов.Из-за небольшого объема данных используются 11 разделов, 1-1000, 10001-2000 соответственно. В отличие от первого метода, есть следующие два момента:

//partitionner实现
package com.hoult.mr.job;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * @author hulichao
 * @date 20-9-20
 **/
public class JobPartitioner extends Partitioner<IntWritable, IntWritable> {
    @Override
    public int getPartition(IntWritable key, IntWritable value, int numPartitions) {
        int keyValue = Integer.parseInt(key.toString());

        for (int i = 0; i < 10; i++) {
            if (keyValue < 1000 * (i+1) && keyValue >= 1000 * (i-1)) {
                System.out.println("key:" + keyValue + ", part:" + i);
                return i;
            }
        }

        return 10;
    }
}

//driver处需要增加:
        //设置自定义分区器
        job.setPartitionerClass(JobPartitioner.class);
        
//driver处需要修改reduce数量
        job.setNumReduceTasks(10);

Выполните программу, в результате будет сгенерировано 10 файлов, файлы упорядочены.

part-r-00000
part-r-00001
part-r-00002
part-r-00003
part-r-00004
part-r-00005
part-r-00006
part-r-00007
part-r-00008
part-r-00009

Примечание. Следует отметить, что раздел, содержащий данные в разделе, должен быть меньше или равен количеству редукторов, в противном случае будет включена ошибка Illegal partition. Кроме того, реализация неблагоприятного раздела может привести к перекосу данных и проблемам с OOM, если он знает о данных меньше.

Самостоятельная реализация с помощью фреймворковTotalOrderPartitionerразделитель для реализации

Поскольку я подумал о втором пользовательском методе, он фактически может решить большинство проблем перекоса, но на самом деле, до того, как распределение данных не будет понято, единственный способ оценить распределение данных — это попытаться увидеть, каковы значения результата. есть, а затем настроить разделитель, разве это не просто выборка, способ выборки, а затем реализация разделителя, Hadoop уже помог нам реализовать его и решил проблемы перекоса данных и OOM, то естьTotalOrderPartitionerКласс, который обеспечивает выборку данных, частично выбирает значение ключа, а затем находит наилучшую точку разделения значения ключа в соответствии с результатом выборки, чтобы равномерно распределить ключ в разных разделах.

TotalOrderPartitionerТри пробоотборника предоставляются следующим образом:

  • Семплер сегментов SplitSampler, выборка данных из сегментов данных, этот сэмплер не подходит для уже отсортированных данных
  • Случайный сэмплер RandomSampler, выборка из набора данных в соответствии с заданной частотой дискретизации
  • Интервальный сэмплер IntervalSampler, выборка данных из осколков с фиксированными интервалами, очень хорошо работает с уже отсортированными данными.

В сэмплере реализован метод K[] getSample(InputFormat info, Job job), который возвращает массив семплирования, где InputFormat — входной вспомогательный класс перед вводом карты, который генерируется по длине возвращаемый K[] Длина массива равна -1 разделу, и, наконец, соответствующие данные отправляются в соответствующий раздел в соответствии с диапазоном точки разделения.

Код:

//mapper和driver的类型略有不同
package com.hoult.mr.job.totalsort;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * @author hulichao
 * @date 20-9-20
 **/
public class TotalMapper extends Mapper<Text, Text, Text, IntWritable> {
    @Override
    protected void map(Text key, Text value,
                       Context context) throws IOException, InterruptedException {
        System.out.println("key:" + key.toString() + ", value:" + value.toString());
        context.write(key, new IntWritable(Integer.parseInt(key.toString())));
    }
}
package com.hoult.mr.job.totalsort;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * @author hulichao
 * @date 20-9-20
 **/
public class TotalReducer extends Reducer<Text, IntWritable, IntWritable, NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values,
                          Context context) throws IOException, InterruptedException {
        for (IntWritable value : values)
            context.write(value, NullWritable.get());
    }
}
//比较器
package com.hoult.mr.job.totalsort;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * 自定义比较器来比较key的顺序
 * @author hulichao
 * @date 20-9-20
 **/
public class KeyComparator extends WritableComparator {
    protected KeyComparator() {
        super(Text.class, true);
    }

    @Override
    public int compare(WritableComparable w1, WritableComparable w2) {
        int num1 = Integer.valueOf(w1.toString());
        int num2 = Integer.valueOf(w2.toString());
        return num1 - num2;
    }
}
package com.hoult.mr.job.totalsort;

//driver 实现
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.InputSampler;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * @author hulichao
 * @date 20-9-20
 **/
public class TotalDriver extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        //设置非分区排序
        conf.set("mapreduce.totalorderpartitioner.naturalorder", "false");
        Job job = Job.getInstance(conf, "Total Driver");
        job.setJarByClass(TotalDriver.class);

        //设置读取文件的路径,都是从HDFS中读取。读取文件路径从脚本文件中传进来
        FileInputFormat.addInputPath(job,new Path(args[0]));
        //设置mapreduce程序的输出路径,MapReduce的结果都是输入到文件中
        FileOutputFormat.setOutputPath(job,new Path(args[1]));
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        //设置比较器,用于比较数据的大小,然后按顺序排序,该例子主要用于比较两个key的大小
        job.setSortComparatorClass(KeyComparator.class);
        job.setNumReduceTasks(10);//设置reduce数量

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(NullWritable.class);

        //设置保存partitions文件的路径
        TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), new Path(args[2]));
        //key值采样,0.01是采样率,
        InputSampler.Sampler<Text, Text> sampler = new InputSampler.RandomSampler<>(0.1, 3, 100);
        //将采样数据写入到分区文件中
        InputSampler.writePartitionFile(job, sampler);

        job.setMapperClass(TotalMapper.class);
        job.setReducerClass(TotalReducer.class);
        //设置分区类。
        job.setPartitionerClass(TotalOrderPartitioner.class);
        return job.waitForCompletion(true) ? 0 : 1;
    }
    public static void main(String[] args)throws Exception{
//        int exitCode = ToolRunner.run(new TotalDriver(), new String[] {"data/job/input", "data/job/output", "data/job/partition","data/job/partitio2"});
        int exitCode = ToolRunner.run(new TotalDriver(), args);
        System.exit(exitCode);
    }
}

Результат аналогичен второй реализации, следует отметитьДействительно только для кластерного тестирования, локальный тест может сообщить об ошибке

2020-09-20 16:36:10,664 WARN [org.apache.hadoop.util.NativeCodeLoader] - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException: 0
	at com.hoult.mr.job.totalsort.TotalDriver.run(TotalDriver.java:32)
	at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:76)
	at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:90)
	at com.hoult.mr.job.totalsort.TotalDriver.main(TotalDriver.java:60)

У Се, третий мастер, новичок в области больших данных и искусственного интеллекта. Больше, пожалуйста, следите