PageRank — это итеративный алгоритм, выполняющий несколько объединений, поэтому он является хорошей демонстрацией для операций разбиения RDD. Алгоритм поддерживает два набора данных.
- (pageID,listList) содержит список смежных страниц для каждой страницы.
- (pageID,rank) содержит текущее значение рейтинга для каждой страницы, Процесс расчета pageRank примерно выглядит следующим образом:
- Инициализируйте значение сортировки для каждой страницы равным 1.0.
- В каждой итерации для страницы p значение вклада rank(p)/numNeighbors(p) ослабляется для каждой из соседних страниц (страниц с прямыми соединениями).
- Установите значение сортировки на странице 0,15 + 0,85 *contributionsReceived. Среди них 2 и 3 будут повторять цикл несколько раз.В ходе этого процесса алгоритм будет постепенно сходиться к фактическому значению PageRank каждой страницы.В реальной работе он обычно повторяется 10 раз.
package com.sowhat.spark
import org.apache.spark.rdd.RDD
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
/**
* links = (pageID,LinkList)
* ranks = (pageID,rank)
**/
object MyPageRank {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("pagerank")
//创建SparkContext,该对象是提交spark App的入口
val sc = new SparkContext(conf)
val links: RDD[(String, Seq[String])] = sc.objectFile[(String, Seq[String])]("filepwd").partitionBy(new HashPartitioner(100)).persist()
var ranks: RDD[(String, Double)] = links.mapValues(x => 1.0)
for (i <- 0 until 10) {
val totalRDD: RDD[(String, (Seq[String], Double))] = links.join(ranks)
val contributions: RDD[(String, Double)] = totalRDD.flatMap(
{
case (pageID, (links, rank)) => links.map(dest => (dest, rank / links.size))
}
)
ranks = contributions.reduceByKey(_ + _).mapValues(v => 0.15 + 0.85 * v)
}
ranks.saveAsTextFile("ranks")
}
}
Алгоритм начинается с инициализации значения каждого элементаrankRDD до 1,0, а затем непрерывно обновляет значение рангов на каждой итерации.Основная часть оптимизации заключается в следующем.
- Каждая итерация ссылокRDD будет соединяться с рангами, поэтому разделение ссылок на большие наборы данных сэкономит много накладных расходов на оптимизацию сетевого взаимодействия.
- По тем же причинам, что и выше, использование persist сохраняет данные в памяти для каждой итерации.
- Когда мы впервые создаем ранги, мы используем mapValues вместо map(), чтобы сохранить разделение родительских ссылок RDD, что значительно снижает накладные расходы на первую операцию соединения.
- Используйте mapValues после reduceByKey в теле цикла, потому что reduceByKey уже является хеш-разделом, а эффективность выше в следующей итерации.
建议
: чтобы максимизировать потенциальный эффект оптимизации, связанной с разделами, попробуйте использовать ее, когда вам не нужно менять ключи элементов.mapValuesилиflatMapValues.
В этой статье используетсяmdniceнабор текста