Введение в Spark (4) — карта Spark, flatMap, mapToPair

Большие данные

Операции Spark RDD

в предыдущем разделеСчетчик слов Spark ClassicВ , я узнал о нескольких операциях RDD, включая flatMap, map, reduceByKey и упрощенную схему позже, countByValue. Затем в этом разделе будут представлены более часто используемые операции RDD, и для каждого RDD мы разберем его, чтобы увидеть, как он работает.

квартира искрыКарта

flatMap имеет представление «один ко многим» с одним входом и одним выходом. И он объединит несколько выходов, соответствующих каждому входу, в большой набор.Конечно, не нужно беспокоиться, что этот набор превысит диапазон памяти, потому что spark сознательно запишет слишком много контента на диск. Конечно если память работающей машиныиметь достаточно уверенности, также может хранить содержимое в памяти.

Чтобы лучше понять flatMap, мы возьмем пример для иллюстрации. Разумеется, как обычно, текст данных, соответствующий примеру, будет подготовлен, имя текста uv.txt, текст и программу-пример можно скачать сgithubскачать. Следующие три языка будут использоваться: scala, java, python для описания, а в java для реализации каждого примера будут использоваться java и java8. где находятся программы java и scalagithubЕго можно скачать напрямую, но python пока недоступен и будет добавлен позже.

Скала реализация


import org.apache.spark.{SparkConf, SparkContext}

object SparkFlatMap {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("SparkFlatMap")

    val sc = new SparkContext(conf)

    //设置数据路径
    val textData = sc.textFile("./uv.txt")

    //输出处理前总行数
    println("before:"+textData.count()+"行")

    //输出处理前第一行数据
    println("first line:"+textData.first())

    //进行flatMap处理
    val flatData = textData.flatMap(line => line.split(" "))

    //输出处理后总行数
    println("after:"+flatData.count())

    //输出处理后第一行数据
    println("first line:"+flatData.first())

    //将结果保存在flatResultScala文件夹中
    flatData.saveAsTextFile("./flatResultScala")

  }

}

Java-реализация

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;

import java.util.Arrays;
import java.util.Iterator;

public class SparkFlatMapJava {

    public static void main(String[] args){
        SparkConf conf = new SparkConf().setMaster("local").setAppName("SparkFlatMapJava");
        JavaSparkContext sc = new JavaSparkContext(conf);

        //java实现
        flatMapJava(sc);


        //java8实现
        flatMapJava8(sc);


    }

    public static void flatMapJava(JavaSparkContext sc){
        //设置数据路径
        JavaRDD<String> textData = sc.textFile("./uv.txt");

        //输出处理前总行数
        System.out.println("before:"+textData.count()+"行");

        //输出处理前第一行数据
        System.out.println("first line:"+textData.first()+"行");

        //进行flatMap处理
        JavaRDD<String> flatData = textData.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String s) throws Exception {
                return Arrays.asList(s.split(" ")).iterator();
            }
        });

        //输出处理后总行数
        System.out.println("after:"+flatData.count()+"行");

        //输出处理后第一行数据
        System.out.println("first line:"+flatData.first()+"行");

        //将结果保存在flatResultScala文件夹中
        flatData.saveAsTextFile("./flatResultJava");
    }


    public static void flatMapJava8(JavaSparkContext sc){
        sc.textFile("./uv.txt")
          .flatMap(line -> Arrays.asList(line.split(" ")).iterator())
          .saveAsTextFile("./flatResultJava8");
    }

}

реализация питона

from pyspark import SparkConf,SparkContext


conf = SparkConf().setMaster("local").setAppName("FlatMapPython")

sc = SparkContext(conf=conf)

textData = sc.textFile("./uv.txt")

print("before:"+str(textData.count())+"行")

print("first line"+textData.first())

flatData = textData.flatMap(lambda line:line.split(" "))

print("after:"+str(flatData.count())+"行")

print("first line"+flatData.first())

flatData.saveAsTextFile("./resultFlatMap")

Запустите любую программу с тем же результатом

before:86400行
first line:2015-08-24_00:00:00 55311 buy
after:259200
first line:2015-08-24_00:00:00

Просмотр файлов

Очевидно, что каждая строка разбивается на три строки в соответствии с пространством, поэтому общее количество строк в три раза больше, чем до разделения, а содержимое первой строки — это только первые данные, время исходной первой строки. Таким образом, эффект flatMap очевиден.

карта искры

Тот же метод используется для отображения операции карты.В отличие от flatMap, карта обычно является взаимно-однозначной, то есть один вход соответствует одному выходу. Но выходным результатом может быть кортеж, а кортеж может содержать несколько данных, но кортеж — это целое, поэтому он является элементом. Обратите внимание, что когда выходным результатом является кортеж, scala и python могут нормально его обрабатывать, но в java это немного отличается.

Скала реализация


import org.apache.spark.{SparkConf, SparkContext}

object SparkMap {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("SparkFlatMap")

    val sc = new SparkContext(conf)

    val textData = sc.textFile("./uv.txt")

    //得到一个最后一个操作值,前面的时间和次数舍弃
    val mapData1 = textData.map(line => line.split(" ")(2))

    println(mapData1.count())

    println(mapData1.first())

    mapData1.saveAsTextFile("./resultMapScala")

    //得到一个最后两个值,前面的时间舍弃
    val mapData2 = textData.map(line => (line.split(" ")(1),line.split(" ")(2)))

    println(mapData2.count())

    println(mapData2.first())

    //将所有值存到元组中去
    val mapData3 = textData.map(line => (line.split(" ")(1),line.split(" ")(1),line.split(" ")(2)))

    println(mapData3.count())

    println(mapData3.first())
    
  }

}

Java-реализация

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.codehaus.janino.Java;
import scala.Tuple2;
import scala.Tuple3;

public class SparkMapJava {

    public static void main(String[] args){
        SparkConf conf = new SparkConf().setMaster("local").setAppName("SparkFlatMapJava");
        JavaSparkContext sc = new JavaSparkContext(conf);

        //java实现
        mapJava(sc);


        //java8实现
        mapJava8(sc);


    }

    public static void mapJava(JavaSparkContext sc){
        JavaRDD<String> txtData = sc.textFile("./uv.txt");

        //保留最后一个值
        JavaRDD<String> mapData1 = txtData.map(new Function<String, String>() {
            @Override
            public String call(String s) throws Exception {
                return s.split(" ")[2];
            }
        });

        System.out.println(mapData1.count());
        System.out.println(mapData1.first());

        //保留最后两个值
        JavaRDD<Tuple2<String,String>> mapData2 = txtData.map(new Function<String, Tuple2<String,String>>() {
            @Override
            public Tuple2<String,String> call(String s) throws Exception {
                return new Tuple2<>(s.split(" ")[1],s.split(" ")[2]);
            }
        });

        System.out.println(mapData2.count());
        System.out.println(mapData2.first());

        //保留最后三个值
        JavaRDD<Tuple3<String,String,String>> mapData3 = txtData.map(new Function<String, Tuple3<String,String,String>>() {
            @Override
            public Tuple3<String,String,String> call(String s) throws Exception {
                return new Tuple3<>(s.split(" ")[0],s.split(" ")[1],s.split(" ")[2]);
            }
        });

        System.out.println(mapData2.count());
        System.out.println(mapData2.first());


    }


    public static void mapJava8(JavaSparkContext sc){
        JavaRDD<String> mapData1 = sc.textFile("./uv.txt").map(line -> line.split(" ")[2]);
        System.out.println(mapData1.count());
        System.out.println(mapData1.first());

        JavaRDD<Tuple2<String,String>> mapData2 = sc.textFile("./uv.txt").map(line -> new Tuple2<String, String>(line.split(" ")[1],line.split(" ")[2]));
        System.out.println(mapData2.count());
        System.out.println(mapData2.first());

        JavaRDD<Tuple3<String,String,String>> mapData3 = sc.textFile("./uv.txt").map(line -> new Tuple3<String, String, String>(line.split(" ")[0],line.split(" ")[1],line.split(" ")[2]));
        System.out.println(mapData3.count());
        System.out.println(mapData3.first());

    }

}

реализация питона

from pyspark import SparkConf,SparkContext


conf = SparkConf().setMaster("local").setAppName("FlatMapPython")

sc = SparkContext(conf=conf)

textData = sc.textFile("./uv.txt")


mapData1 = textData.map(lambda line : line.split(" ")[2])

print(mapData1.count())
print(mapData1.first())

mapData2 = textData.map(lambda line : (line.split(" ")[1],line.split(" ")[2]))

print(mapData2.count())
print(mapData2.first())

mapData3 = textData.map(lambda line : (line.split(" ")[0],line.split(" ")[1],line.split(" ")[2]))

print(mapData3.count())
print(mapData3.first())

Запустите любую программу с тем же результатом

86400
buy
86400
(55311,buy)
86400
(55311,55311,buy)

mapToPair уникален для Java

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

public class SparkMapToPair {

    public static void main(String[] args){
        SparkConf conf = new SparkConf().setMaster("local").setAppName("SparkFlatMapJava");
        JavaSparkContext sc = new JavaSparkContext(conf);

        mapToPairJava(sc);

        mapToPairJava8(sc);

    }


    public static void mapToPairJava(JavaSparkContext sc){

        JavaPairRDD<String,String> pairRDD = sc.textFile("./uv.txt").mapToPair(new PairFunction<String, String, String>() {
            @Override
            public Tuple2<String, String> call(String s) throws Exception {
                return new Tuple2<>(s.split(" ")[1],s.split(" ")[2]);
            }
        });

        System.out.println(pairRDD.count());

        System.out.println(pairRDD.first());

    }

    public static void mapToPairJava8(JavaSparkContext sc){
        JavaPairRDD<String,String> pairRDD = sc.textFile("./uv.txt").mapToPair(line -> new Tuple2<>(line.split(" ")[1],line.split(" ")[2]));

        System.out.println(pairRDD.count());

        System.out.println(pairRDD.first());
    }
}

беги и получай результат

86400
(55311,buy)

Очевидно, мы обнаружили, что этот результат согласуется с результатами двух, полученных при обработке карты. Гибкое использование карт, flatMap, mapToPair будет очень важно, и позже будет множество операций для обработки сложных данных. Код всех вышеперечисленных программ можно найти вGitHubскачать