Получите PageRank в одной статье

Scala

在这里插入图片描述PageRank — это итеративный алгоритм, выполняющий несколько объединений, поэтому он является хорошей демонстрацией для операций разбиения RDD. Алгоритм поддерживает два набора данных.

  1. (pageID,listList) содержит список смежных страниц для каждой страницы.
  2. (pageID,rank) содержит текущее значение рейтинга для каждой страницы, Процесс расчета pageRank примерно выглядит следующим образом:
  3. Инициализируйте значение сортировки для каждой страницы равным 1.0.
  4. В каждой итерации для страницы p значение вклада rank(p)/numNeighbors(p) ослабляется для каждой из соседних страниц (страниц с прямыми соединениями).
  5. Установите значение сортировки на странице 0,15 + 0,85 *contributionsReceived. Среди них 2 и 3 будут повторять цикл несколько раз.В ходе этого процесса алгоритм будет постепенно сходиться к фактическому значению PageRank каждой страницы.В реальной работе он обычно повторяется 10 раз.
package com.sowhat.spark

import org.apache.spark.rdd.RDD
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

/**
 * links = (pageID,LinkList)
 * ranks = (pageID,rank)
 **/
object MyPageRank {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("pagerank")

    //创建SparkContext,该对象是提交spark App的入口
    val sc = new SparkContext(conf)

    val links: RDD[(String, Seq[String])] = sc.objectFile[(String, Seq[String])]("filepwd").partitionBy(new HashPartitioner(100)).persist()
    var ranks: RDD[(String, Double)] = links.mapValues(x => 1.0)

    for (i <- 0 until 10) {
      val totalRDD: RDD[(String, (Seq[String], Double))] = links.join(ranks)
      val contributions: RDD[(String, Double)] = totalRDD.flatMap(
        {
          case (pageID, (links, rank)) => links.map(dest => (dest, rank / links.size))
        }
      )
      ranks = contributions.reduceByKey(_ + _).mapValues(v => 0.15 + 0.85 * v)
    }
    ranks.saveAsTextFile("ranks")
  }

}

Алгоритм начинается с инициализации значения каждого элементаrankRDD до 1,0, а затем непрерывно обновляет значение рангов на каждой итерации.Основная часть оптимизации заключается в следующем.

  1. Каждая итерация ссылокRDD будет соединяться с рангами, поэтому разделение ссылок на большие наборы данных сэкономит много накладных расходов на оптимизацию сетевого взаимодействия.
  2. По тем же причинам, что и выше, использование persist сохраняет данные в памяти для каждой итерации.
  3. Когда мы впервые создаем ранги, мы используем mapValues ​​вместо map(), чтобы сохранить разделение родительских ссылок RDD, что значительно снижает накладные расходы на первую операцию соединения.
  4. Используйте mapValues ​​после reduceByKey в теле цикла, потому что reduceByKey уже является хеш-разделом, а эффективность выше в следующей итерации.

建议: чтобы максимизировать потенциальный эффект оптимизации, связанной с разделами, попробуйте использовать ее, когда вам не нужно менять ключи элементов.mapValuesилиflatMapValues.

В этой статье используетсяmdniceнабор текста