Spark図処理GraphX学習ノート!


Spark図処理GraphX学習ノート!
一、GraphXとは何ですか.
GraphxはSparkのような並列処理フレームワークを用いて図上のいくつかの並列化実行可能なアルゴリズムを実現した.
  • アルゴリズムがSpark自体に関係なく並列化できるかどうか
  • アルゴリズムの並列化の有無自体は,
  • を数学的に証明する必要がある.
  • で実証された並列化可能なアルゴリズムは、Graphxがpregelの図計算モデル
  • をサポートするため、Sparkを用いて実現することは誤った選択である.
    二、Graphxにはどのようなコンポーネントと基本フレームワークが含まれていますか.
    1、メンバー変数
    graphで重要なメンバー変数はそれぞれ
  • vertices
  • edges
  • triplets

  • なぜtripletsを導入するのか、主にPregelという計算モデルに関連しており、tripletsにはedgeとvertexが同時に記録されている.具体的なコードは羅列しません.
    2、メンバー関数
    関数はいくつかのクラスに分けられます
  • はすべての頂点またはエッジに対して動作するが、mapEdges、mapVertices
  • のような図構造自体は変更されない.
  • サブ図は、集合動作におけるfilter subGraph
  • に類似する.
  • 図の分割、すなわちparitition操作は、Spark計算にとって重要であり、異なるパーティションがあるからこそ、並列処理の可能性があり、異なるパーティションStrategyは、その収益が異なる.最も容易に考えられるのは,Hashを用いて図全体を複数の領域に分割することである.
  • outerJoinVertices頂点の外部接続動作
  • 三、図の演算と操作GraphOps
    図の一般的なアルゴリズムはGraphOpsというクラスに集中して抽象化し、Graphで暗黙的に変換し、GraphをGraphOpsに変換し、具体的には以下の12の演算子がある.
  • collectNeighborIds
  • collectNeighbors
  • collectEdges
  • joinVertices
  • filter
  • pickRandomVertex
  • pregel
  • pageRank
  • staticPageRank
  • connectedComponents
  • triangleCount
  • stronglyConnectedComponents

  • RDD
    RDDはSparkシステムの核心であり、Graphxにはどのような新しいRDDが導入されているのだろうか.
  • VertexRDD
  • EdgeRDD

  • EdgeRddよりもVertexRDDの方が重要で、その上の操作も多く、主にVertex上の属性の合併に集中しており、合併といえば関係代数と集合論に巻き込まれざるを得ないため、VertexRddではsqlのような用語が多く見られます.
  • leftJoin
  • innerJoin

  • 四、GraphXシーン分析
    1、図の記憶とロード
    数学計算を行う場合、図は線形代数の行列で表されますが、どのように記憶しますか?
    データ構造を勉強するとき、先生はきっとたくさんの方法を言ったに違いない.もうくどくどしない.
    しかし、大きなデータの環境では、図が巨大であれば、頂点とエッジを表すデータが1つのファイルに置くのに十分ではありません.HDFSで
    ロードする時、1台の機械のメモリが足りなくてどうすればいいですか?遅延ロードは、本当にデータが必要な場合に、異なるマシンにデータを配布し、カスケード方式を採用します.
    一般的に、頂点に関連するすべてのコンテンツを1つのファイルに保存し、エッジに関連するすべての情報を別のファイルに保存します.
    ある具体的な図を生成する場合,edgeで図中の頂点の関連関係を表すとともに,図の構造も表す.
    以下にSparkの公式例を示し,2つのArrayでGraphを構築した.
    val users: RDD[(VertexId, (String, String))] =
      sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
                           (5L, ("franklin", "prof")), (2L, ("istoica", "prof"))))
    //Create an RDD for edges
    val relationships: RDD[Edge[String]] =
      sc.parallelize(Array(Edge(3L, 7L, "collab"),    Edge(5L, 3L, "advisor"),
                           Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi")))
    //Define a default user in case there are relationship with missing user
    val defaultUser = ("John Doe", "Missing")
    //Build the initial Graph
    val graph = Graph(users, relationships, defaultUser)
    2、GraphLoader
    graphLoaderはgraphxで図のロードと生成に特化しており、最も重要な関数はedgeListFileである.
    //頂点で区切って、4つのパーティションに分ける
    val graph = GraphLoader.edgeListFile(sc,"hdfs://192.168.0.10:9000/input/graph/web-Google.txt",minEdgePartitions = 4)
    五、GraphX応用例
    1行のコード:
    val rank = graph.pageRank(0.01).vertices
    RDDで実現:
    完全なコード
    // Connect to the Spark clusterval 
    sc = new SparkContext("spark://master.amplab.org", "research")
    // Load my user data and parse into tuples of user id and attribute list
    val users = (sc.textFile("graphx/data/users.txt")
      .map(line => line.split(","))
      .map( parts => (parts.head.toLong, parts.tail) ))
      // Parse the edge data which is already in userId -> userId format
      val followerGraph = GraphLoader.edgeListFile(sc, "graphx/data/followers.txt")
      // Attach the user attributes
      val graph = followerGraph.outerJoinVertices(users) { 
       case (uid, deg, Some(attrList)) => attrList  
       // Some users may not have attributes so we set them as empty
        case (uid, deg, None) => Array.empty[String]
        }
    // Restrict the graph to users with usernames and names
    val subgraph = graph.subgraph(vpred = (vid, attr) => attr.size == 2)
    // Compute the PageRank
    
    // Get the attributes of the top pagerank users
    val userInfoWithPageRank = subgraph.outerJoinVertices(pagerankGraph.vertices) { 
     case (uid, attrList, Some(pr)) => (pr, attrList.toList) 
      case (uid, attrList, None) => (0.0, attrList.toList)
    }
    
    println(userInfoWithPageRank.vertices.top(5)(Ordering.by(_._2._1)).mkString("
    "))