Spark GraphXの簡単な紹介
Spark GraphXは分布式の図処理フレームワークである.ソーシャルネットワークでは、ユーザーとユーザーの間に複雑なつながりがあり、例えば微信、QQ、微博のユーザー間の親友、関心などの関係があり、巨大な図を構成しており、単機では処理できず、分布式図処理フレームワークしか使用できず、Spark GraphXは分布式図処理フレームワークである.
簡単な例で説明します.
1.POMファイル
プロジェクトのpomファイルにSpark GraphXのパッケージを追加します.
2.運転環境の設定
3.図の構成
図はいくつかの頂点とエッジで構成されており、Spark GraphXの中の図も同じですので、初期図の前にいくつかの頂点とエッジを定義します.
次に、点とエッジを再利用して、それぞれのRDDを生成する.
最後に、2つのRDDを使用して図を生成する.
4.図の属性操作
Spark GraphX図のプロパティは次のとおりです.
• Graph.vertices:図中のすべての頂点;
• Graph.edges:図中のすべてのエッジ;
• Graph.triplets:ソース頂点、宛先頂点、および2つの頂点間のエッジの3つの部分から構成されます.
• Graph.degrees:図中のすべての頂点の度;
• Graph.inDegrees:図中のすべての頂点の入度;
• Graph.outDegrees:図中のすべての頂点の出度;
これらのプロパティの操作には、直接コードを付けます.
実行結果:
簡単な例で説明します.
1.POMファイル
プロジェクトのpomファイルにSpark GraphXのパッケージを追加します.
org.apache.spark
spark-graphx_2.10
1.6.0
2.運転環境の設定
//
val conf = new SparkConf().setAppName("Simple GraphX").setMaster("spark://master:7077").setJars(Seq("E:\\Intellij\\Projects\\SimpleGraphX\\SimpleGraphX.jar"))
val sc = new SparkContext(conf)
3.図の構成
図はいくつかの頂点とエッジで構成されており、Spark GraphXの中の図も同じですので、初期図の前にいくつかの頂点とエッジを定義します.
//
val vertexArray = Array(
(1L,("Alice", 38)),
(2L,("Henry", 27)),
(3L,("Charlie", 55)),
(4L,("Peter", 32)),
(5L,("Mike", 35)),
(6L,("Kate", 23))
)
//
val edgeArray = Array(
Edge(2L, 1L, 5),
Edge(2L, 4L, 2),
Edge(3L, 2L, 7),
Edge(3L, 6L, 3),
Edge(4L, 1L, 1),
Edge(5L, 2L, 3),
Edge(5L, 3L, 8),
Edge(5L, 6L, 8)
)
次に、点とエッジを再利用して、それぞれのRDDを生成する.
// vertexRDD edgeRDD
val vertexRDD:RDD[(Long,(String,Int))] = sc.parallelize(vertexArray)
val edgeRDD:RDD[Edge[Int]] = sc.parallelize(edgeArray)
最後に、2つのRDDを使用して図を生成する.
//
val graph:Graph[(String,Int),Int] = Graph(vertexRDD, edgeRDD)
4.図の属性操作
Spark GraphX図のプロパティは次のとおりです.
• Graph.vertices:図中のすべての頂点;
• Graph.edges:図中のすべてのエッジ;
• Graph.triplets:ソース頂点、宛先頂点、および2つの頂点間のエッジの3つの部分から構成されます.
• Graph.degrees:図中のすべての頂点の度;
• Graph.inDegrees:図中のすべての頂点の入度;
• Graph.outDegrees:図中のすべての頂点の出度;
これらのプロパティの操作には、直接コードを付けます.
//
println("*************************************************************")
println(" ")
println("*************************************************************")
//
println(" 20 :")
graph.vertices.filter{case(id,(name,age)) => age>20}.collect.foreach {
case(id,(name,age)) => println(s"$name is $age")
}
//
println(" 20 :")
graph.vertices.filter(v => v._2._2>20).collect.foreach {
v => println(s"${v._2._1} is ${v._2._2}")
}
//
println(" 3 :")
graph.edges.filter(e => e.attr>3).collect.foreach(e => println(s"${e.srcId} to ${e.dstId} att ${e.attr}"))
println
// Triplet
println(" Triples:")
for(triplet 3 Triples:")
for(triplet t.attr > 3).collect){
println(s"${triplet.srcAttr._1} likes ${triplet.dstAttr._1}")
}
println
// Degree
println(" , , :")
def max(a:(VertexId,Int), b:(VertexId,Int)):(VertexId,Int) = {
if (a._2>b._2) a else b
}
println("Max of OutDegrees:" + graph.outDegrees.reduce(max))
println("Max of InDegrees:" + graph.inDegrees.reduce(max))
println("Max of Degrees:" + graph.degrees.reduce(max))
println
実行結果:
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
17/05/22 20:45:35 INFO Slf4jLogger: Slf4jLogger started
17/05/22 20:45:35 INFO Remoting: Starting remoting
17/05/22 20:45:35 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://[email protected]:53375]
*************************************************************
*************************************************************
20 :
Peter is 32
Alice is 38
Charlie is 55
Mike is 35
20 :
Peter is 32
Alice is 38
Charlie is 55
Mike is 35
3 :
to 2 att 7
to 3 att 8
to 6 att 8
Triples:
Henry likes Alice
Henry likes Peter
Charlie likes Henry
Charlie likes Kate
Peter likes Alice
Mike likes Henry
Mike likes Charlie
Mike likes Kate
>3 Triples:
Charlie likes Henry
Mike likes Charlie
Mike likes Kate
, , :
Max of OutDegrees:(5,3)
Max of InDegrees:(1,2)
Max of Degrees:(2,4)