Spark GraphXの簡単な紹介


Spark GraphXは分布式の図処理フレームワークである.ソーシャルネットワークでは、ユーザーとユーザーの間に複雑なつながりがあり、例えば微信、QQ、微博のユーザー間の親友、関心などの関係があり、巨大な図を構成しており、単機では処理できず、分布式図処理フレームワークしか使用できず、Spark GraphXは分布式図処理フレームワークである.
簡単な例で説明します.
1.POMファイル
プロジェクトのpomファイルにSpark GraphXのパッケージを追加します.

    org.apache.spark
    spark-graphx_2.10
    1.6.0

2.運転環境の設定
//       
    val conf = new SparkConf().setAppName("Simple GraphX").setMaster("spark://master:7077").setJars(Seq("E:\\Intellij\\Projects\\SimpleGraphX\\SimpleGraphX.jar"))
    val sc = new SparkContext(conf)


3.図の構成
図はいくつかの頂点とエッジで構成されており、Spark GraphXの中の図も同じですので、初期図の前にいくつかの頂点とエッジを定義します.
//                          
val vertexArray = Array(    
  (1L,("Alice", 38)),       
  (2L,("Henry", 27)),       
  (3L,("Charlie", 55)),     
  (4L,("Peter", 32)),       
  (5L,("Mike", 35)),        
  (6L,("Kate", 23))         
)                           

                          
//                          
val edgeArray = Array(      
  Edge(2L, 1L, 5),          
  Edge(2L, 4L, 2),          
  Edge(3L, 2L, 7),          
  Edge(3L, 6L, 3),          
  Edge(4L, 1L, 1),          
  Edge(5L, 2L, 3),          
  Edge(5L, 3L, 8),          
  Edge(5L, 6L, 8)           
)                           

次に、点とエッジを再利用して、それぞれのRDDを生成する.
//  vertexRDD edgeRDD
val vertexRDD:RDD[(Long,(String,Int))] = sc.parallelize(vertexArray)
val edgeRDD:RDD[Edge[Int]] = sc.parallelize(edgeArray)

最後に、2つのRDDを使用して図を生成する.
//    
val graph:Graph[(String,Int),Int] = Graph(vertexRDD, edgeRDD)

 
4.図の属性操作
Spark GraphX図のプロパティは次のとおりです.
    • Graph.vertices:図中のすべての頂点;
    • Graph.edges:図中のすべてのエッジ;
    • Graph.triplets:ソース頂点、宛先頂点、および2つの頂点間のエッジの3つの部分から構成されます.
    • Graph.degrees:図中のすべての頂点の度;
    • Graph.inDegrees:図中のすべての頂点の入度;
    • Graph.outDegrees:図中のすべての頂点の出度;
これらのプロパティの操作には、直接コードを付けます.

//      
println("*************************************************************")
println("    ")
println("*************************************************************")

//    
println("        20       :")
graph.vertices.filter{case(id,(name,age)) => age>20}.collect.foreach {
  case(id,(name,age)) => println(s"$name is $age")
}

//    
println("        20       :")
graph.vertices.filter(v => v._2._2>20).collect.foreach {
  v => println(s"${v._2._1} is ${v._2._2}")
}

//     
println("        3  :")
graph.edges.filter(e => e.attr>3).collect.foreach(e => println(s"${e.srcId} to ${e.dstId} att ${e.attr}"))
println

// Triplet  
println("     Triples:")
for(triplet 3 Triples:")
for(triplet  t.attr > 3).collect){
  println(s"${triplet.srcAttr._1} likes ${triplet.dstAttr._1}")
}
println

// Degree  
println("         ,  ,  :")
def max(a:(VertexId,Int), b:(VertexId,Int)):(VertexId,Int) = {
  if (a._2>b._2) a else b
}
println("Max of OutDegrees:" + graph.outDegrees.reduce(max))
println("Max of InDegrees:" + graph.inDegrees.reduce(max))
println("Max of Degrees:" + graph.degrees.reduce(max))
println

実行結果:
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
17/05/22 20:45:35 INFO Slf4jLogger: Slf4jLogger started
17/05/22 20:45:35 INFO Remoting: Starting remoting
17/05/22 20:45:35 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://[email protected]:53375]
*************************************************************
    
*************************************************************
        20       :
Peter is 32
Alice is 38
Charlie is 55
Mike is 35
        20       :
Peter is 32
Alice is 38
Charlie is 55
Mike is 35
        3  :
to 2 att 7
to 3 att 8
to 6 att 8

     Triples:
Henry likes Alice
Henry likes Peter
Charlie likes Henry
Charlie likes Kate
Peter likes Alice
Mike likes Henry
Mike likes Charlie
Mike likes Kate

     >3 Triples:
Charlie likes Henry
Mike likes Charlie
Mike likes Kate

         ,  ,  :
Max of OutDegrees:(5,3)
Max of InDegrees:(1,2)
Max of Degrees:(2,4)