算法的流程:
Spark PageRank。初始化:我们用 pages(pairRDD)来记录每个页面与其相邻(所链接)页面之间的关系,用 ranks(pairRDD)来记录每个页面的 rank,初始值为 1.0。在每次迭代的过程中,对于页面 p,我们向其每个相邻的页面发送一个值为 rank(p)/numNeighbors(p) 的贡献值;将每个页面收到的 contributions 相加,得到 contributionsReceived;再将每个页面的排序值设置为 0.15 + 0.85 * contributionsReceived。算法的实现:
public static JavaPairRDD> run_page(){ JavaPairRDD> res = sc.textFile( "/home/liang/workspace/learnSpark/pagerank.txt" ).mapToPair(new PairFunction>() { @Override public Tuple2> call(String s) throws Exception { String key = s.split(" ")[0]; String values = s.split(" ")[1]; ArrayList values_integer = new ArrayList(); for(String str : values.split(",")){ values_integer.add(Integer.parseInt(str)); } return new Tuple2>(Integer.parseInt(key), values_integer); } }); return res; } public static JavaPairRDD run_rank(){ JavaPairRDD res = sc.textFile( "/home/liang/workspace/learnSpark/pagerank.txt" ).mapToPair( line->new Tuple2(Integer.parseInt(line.split("")[0]),1.0) ); return res; }
public static JavaPairRDD> run_page(){ JavaPairRDD> res = sc.textFile( "/home/liang/workspace/learnSpark/pagerank.txt" ).mapToPair(new PairFunction>() { @Override public Tuple2> call(String s) throws Exception { String key = s.split(" ")[0]; String values = s.split(" ")[1]; ArrayList values_integer = new ArrayList(); for(String str : values.split(",")){ values_integer.add(Integer.parseInt(str)); } return new Tuple2>(Integer.parseInt(key), values_integer); } }); return res; } public static JavaPairRDD run_rank(){ JavaPairRDD res = sc.textFile( "/home/liang/workspace/learnSpark/pagerank.txt" ).mapToPair( line->new Tuple2(Integer.parseInt(line.split("")[0]),1.0) ); return res; }