Искра в действии — найдите самого посещаемого человека из 500 миллионов посещений

Большие данные

описание проблемы

Для крупного веб-сайта количество посещений пользователей может достигать миллиардов. Для того, что это за понятие миллиарды, мы можем просто вычислить здесь. Что касается пользователя, какие данные мы обычно записываем за одно посещение?

  • 1. Идентификатор пользователя
  • 2. Время доступа пользователя
  • 3. Время, потраченное пользователями
  • 4. Действия пользователя
  • 5. Остальные данные пользователя (типа IP и т.д.)

Только из идентификатора пользователя, такого как 10011802330414, этот идентификатор, то наш идентификатор имеет почти длинный тип, потому что при сохранении большого объема данных мы используем хранилище текста. Таким образом, для 500 миллионов идентификаторов пользователей, полностью хранящихся на диске, примерно5GРазмер для этого размера не может рассматриваться как большие данные. Но для одного случая вполне достаточно.

Мы сгенерируем набор данных из 500 миллионов идентификаторов.Как мы сказали выше, размер этого набора данных составляет 5G (без сжатия), поэтому я не буду загружать такой набор данных на GitHub, но мы предоставляем метод для создания 500 миллионов данных. .

Конечно, для решения этой проблемы можно еще запустить проект в локальном режиме, но у вас должно быть достаточно места на диске и памяти, около 8G дискового пространства (потому что помимо самих данных, процесс запуска spark также генерирует некоторые временные data), 5G Memory (сделать reduceByKey). Чтобы действительно продемонстрировать возможности искры, наш случай будет работать на искровом кластере.

О том, как построить кластер, я планирую добавить в последующих главах. Однако в Интернете есть большое количество учебных пособий по созданию кластеров, многие из которых подробные и превосходные. Конечно же, мы не будем говорить о том, как настроить кластер в этом разделе, но мы все же можем начать наше дело.

анализ проблемы

Итак, теперь у нас есть 500 миллионов фрагментов данных (на самом деле эти данные не хранятся в тексте, а генерируются во время выполнения), из 500 миллионов фрагментов данных узнайте человека с наибольшим количеством посещений, что не кажется быть трудным. Но на самом деле мы хотим понять реальные преимущества искры через этот случай.

Для 500 миллионов данных идентификаторов вы можете сначала кэшировать их в RDD с помощью карты, затем выполнить reduceByKey для RDD и, наконец, найти идентификатор с наибольшим количеством вхождений. Идея очень проста, поэтому количество кода не слишком много

выполнить

Скала реализация

Первый — это метод генерации идентификатора:

RandomId.class

import scala.Serializable;

public class RandomId implements Serializable {

    private static final long twist(long u, long v) {
        return (((u & 0x80000000L) | (v & 0x7fffffffL)) >> 1) ^ ((v & 1) == 1 ? 0x9908b0dfL : 0);
    }
    private long[] state= new long[624];
    private int left = 1;
    public RandomId() {
        for (int j = 1; j < 624; j++) {
            state[j] = (1812433253L * (state[j - 1] ^ (state[j - 1] >> 30)) + j);
            state[j] &= 0xfffffffffL;
        }
    }
    public void next_state() {
        int p = 0;
        left = 624;
        for (int j = 228; --j > 0; p++)
            state[p] = state[p+397] ^ twist(state[p], state[p + 1]);

        for (int j=397;--j>0;p++)
            state[p] = state[p-227] ^ twist(state[p], state[p + 1]);

        state[p] = state[p-227] ^ twist(state[p], state[0]);
    }

    public long next() {
        if (--left == 0) next_state();
        return state[624-left];
    }

}

Затем используйте его для генерации 500 миллионов единиц данных.

import org.apache.spark.{SparkConf, SparkContext}

object ActiveVisitor {


  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("spark://master:7077").setAppName("ActiveVisitor")

    val sc = new SparkContext(conf)

    val list = 1 until 100000

    val id =new RandomId()

    var max = 0

    var maxId = 0L

    val lastNum = sc.parallelize(list).flatMap(num => {
      var list2 = List(id.next())
      for (i <- 1 to 50000){
        list2 = id.next() :: list2
      }
      println(num +"%")
      list2
    }).map((_,1)).reduceByKey(_+_).foreach(x => {
      if (x._2 > max){
        max = x._2
        maxId = x._1
        println(x)
      }
    })
  }

}

Обработка 500 миллионов единиц данных

import org.apache.spark.{SparkConf, SparkContext}

object ActiveVisitor {


  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("spark://master:7077").setAppName("ActiveVisitor")

    val sc = new SparkContext(conf)

    //生成一个0-9999的列表
    val list = 1 until 10000

    val id =new RandomId()

    //这里记录最大的次数
    var max = 0

    //这里记录最大次数的ID
    var maxId = 0L

    val lastNum = sc.parallelize(list)
      //第一步生成5亿条数据
      .flatMap(num => {
      //遍历list列表
      //总共遍历1万次每次生成5万个ID
      var list2 = List(id.next())
      for (i <- 1 to 50000){
        list2 = id.next() :: list2
      }
      //这里记录当前生成ID的百分比
      println(num/1000.0 +"%")
      
      //返回生成完成后的list
      //每次循环里面都包含5万个ID
      list2
    })
      //遍历5亿条数据
      //为每条数据出现标记1
      .map((_,1))
      //对标记后的数据进行处理
      //得到每个ID出现的次数,即(ID,Count)
      .reduceByKey(_+_)
      //遍历处理后的数据
      .foreach(x => {
      //将最大值存储在max中
      if (x._2 > max){
        max = x._2
        maxId = x._1
        //若X比之前记录的值大,则输出该id和次数
        //最后一次输出结果,则是出现次数最多的的ID和以及其出现的次数
        //当然出现次数最多的可能有多个ID
        //这里只输出一个
        println(x)
      }
    })
  }

}

запустить и получить результат

Отправьте его на spark для запуска и посмотрите журнал

1%
5000%
2%
5001%
3%
5002%
4%
5003%
5%
5004%
6%
5005%
7%
5006%
8%
5007%
9%
5008%
10%
5009%
11%
5010%
12%
5011%
5012%
13%
5013%
14%
15%
5014%

...
...
...

Вот частичный лог вывода, из лога мы явно обнаружили, что программа параллельна. Кластер, который я использовал, состоит из четырех узлов, каждый узел обеспечивает пространство памяти 5G, кластер работает на разных узлах, разделы, назначенные некоторым узлам, начинаются с 1, а есть узлы, начинающиеся с 5000, поэтому программа не с 1%- 9999%, как мы думаем. К счастью, он выполняется не по порядку и на конечный результат не влияет, ведь в конце концов выполняется reduceByKey, именно здесь нам и нужно получить результат.

Посмотрите на другую часть журнала

5634%
5635%
5636%
5637%
5638%
5639%
5640%
5641%
5642%
5643%
5644%
5645%
2019-03-05 11:52:14 INFO  ExternalSorter:54 - Thread 63 spilling in-memory map of 1007.3 MB to disk (2 times so far)
647%
648%
649%
650%
651%
652%
653%
654%
655%
656%

Обратите внимание, что при сбросе карты в памяти размером 1007,3 МБ на диск операция сброса переполняет 1007,3 МБ данных карты на диск. Это связано с тем, что spark записывает слишком много данных на диск из-за огромного количества данных в процессе обработки, при повторном использовании он будет прочитан с диска. Для программ, работающих в режиме реального времени, категорически не разрешается читать и записывать большое количество дисков. Но при обработке больших данных сброс на диск — очень распространенная операция.

На самом деле в полном логе мы видим, что немалая часть лога генерируется при переполнении диска, примерно 49 раз (это общее количество за время моей работы)

Как показано на рисунке:

Всего имеется 49 журналов операций записи с переполнением, каждый размером около 1G, что также подтверждает наше заявление о том, что 500 миллионов фрагментов данных занимают 5G пространства. Фактически, я сохранил эти 500 миллионов фрагментов данных на диске, и они занимают около 5 ГБ места.

результат

В конце концов, мы можем увидеть результат в журнале.

Весь процесс длился почти 47 минут.Конечно, в огромном кластере время может сильно сократиться.Вы должны знать, что мы используем сейчас только 4 узла.

Мы видели, что количество раз 2, 4, 6 и 8 появилось дважды, что неудивительно, потому что кластер работает параллельно, асинхронные операции и повторяющиеся результаты — это очень нормально.Конечно, мы также можем использовать механизм параллелизма. бороться с этим явлением. В последующем случае мы продолжим оптимизировать результаты.

Из результатов мы обнаружили, что среди 500 миллионов фрагментов данных идентификатор с наибольшим количеством вхождений появлялся только 8 раз, что показывает, что в большом объеме данных многие идентификаторы могут появляться только один или два раза. Вот почему в конце концов я использовал метод foreach для поиска максимального значения вместо следующего метода

import org.apache.spark.{SparkConf, SparkContext}

object ActiveVisitor {


  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("spark://master:7077").setAppName("ActiveVisitor")

    val sc = new SparkContext(conf)

    //生成一个0-9999的列表
    val list = 1 until 10000

    val id =new RandomId()

    //这里记录最大的次数
    var max = 0

    //这里记录最大次数的ID
    var maxId = 0L

    val lastNum = sc.parallelize(list)
      //第一步生成5亿条数据
      .flatMap(num => {
      //遍历list列表
      //总共遍历1万次每次生成5万个ID
      var list2 = List(id.next())
      for (i <- 1 to 50000){
        list2 = id.next() :: list2
      }
      //这里记录当前生成ID的百分比
      println(num/1000.0 +"%")
      
      //返回生成完成后的list
      //每次循环里面都包含5万个ID
      list2
    })
      //遍历5亿条数据
      //为每条数据出现标记1
      .map((_,1))
      //对标记后的数据进行处理
      //得到每个ID出现的次数,即(ID,Count)
      .reduceByKey(_+_)
      //为数据进行排序
      //倒序
      .sortByKey(false)

    //次数最多的,在第一个,将其输出
    println(lastNum.first())
  }

}

В этом методе мы сортируем результаты reduceByKey и выводим первый из отсортированных результатов, то есть ID с наибольшим количеством раз. Это больше соответствует нашим требованиям. Но на самом деле, чтобы получить тот же результат, потребуется больше ресурсов. Как мы уже говорили, многие идентификаторы появляются только один или два раза, в процессе сортировки их еще нужно отсортировать. Имейте в виду, что, поскольку многие идентификаторы встречаются только один раз, размер отсортированного набора данных, вероятно, будет исчисляться сотнями миллионов записей.

Согласно нашему пониманию алгоритма сортировки, сортировка такого огромного набора данных требует больших ресурсов. Поэтому мы можем допустить некоторый избыточный вывод информации, но это не повлияет на наши правильные результаты.

На данный момент мы завершили 500 миллионов данных, чтобы найти данные с наибольшим количеством вхождений. Если вам интересно, вы можете попробовать использовать этот метод для решения 5 миллиардов данных, большинства записей данных. Но для этого вам нужно подготовить 50G пространства. Несмотря на использование вышеупомянутой процедуры, она относится к записи после чтения, но 5 миллиардов данных все равно будут занимать много времени.