Spark Graphx計算有向リング
5548 ワード
作業中にSpark Graphxを使用していくつかの図の計算を行い、開発環境は以下の通りである:開発ツール:IDEA JDK:1.7.0_80 Maven:3.3.9
図計算については、pythonのnetworx、sparkのgraphx、アリを用いたオープンソースフレームワークなど、多くの技術フレームワークがあります.ここでは、Graphxを計算フレームワークとして使用します.個人的な簡単なケースのため、単機環境で実現され、簡単なデータが入力されます.
Graphxでは、通常、類似の操作は集約方法またはPregelで実現できます.公式ドキュメント:spark graphxを参照してください.公式ドキュメントには類似のケースがあります.参考にしてください.
========================== EdgeInfo(fm=4, to=5, hashCode=1665) EdgeInfo(fm=5, to=1, hashCode=1692) EdgeInfo(fm=1, to=2, hashCode=1569) EdgeInfo(fm=2, to=3, hashCode=1601) EdgeInfo(fm=3, to=4, hashCode=1633) ************************* ========================== EdgeInfo(fm=4, to=5, hashCode=1665) EdgeInfo(fm=5, to=3, hashCode=1694) EdgeInfo(fm=3, to=4, hashCode=1633) *************************
簡単な例を書くので、考えが足りないかもしれませんが、問題があれば、皆さんに許してほしいです.問題があれば、みんなで討論してください.
ありがとうございます.
図計算については、pythonのnetworx、sparkのgraphx、アリを用いたオープンソースフレームワークなど、多くの技術フレームワークがあります.ここでは、Graphxを計算フレームワークとして使用します.個人的な簡単なケースのため、単機環境で実現され、簡単なデータが入力されます.
Graphxでは、通常、類似の操作は集約方法またはPregelで実現できます.公式ドキュメント:spark graphxを参照してください.公式ドキュメントには類似のケースがあります.参考にしてください.
package com.pnlorf.graphx.pnlorf.graphx.circle
import org.apache.spark.graphx._
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable.ArrayBuffer
/**
* description:
*
* @author:
* date: 2019/10/12
* package: com.pnlorf.graphx.pnlorf
*/
object CircleGraph {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("Graphx_Circle")
val sc: SparkContext = new SparkContext(conf)
val myVertices = sc.parallelize(Array((1L, "A"),
(2L, "B"),
(3L, "C"),
(4L, "D"),
(5L, "E"),
(6L, "F"),
(7L, "G"),
(8L, "H"),
(9L, "I")
))
val myEdges = sc.parallelize(Array(Edge(1L, 2L, new EdgeInfo("1", "2")),
Edge(2L, 3L, new EdgeInfo("2", "3")),
Edge(3L, 4L, new EdgeInfo("3", "4")),
Edge(4L, 5L, new EdgeInfo("4", "5")),
Edge(5L, 1L, new EdgeInfo("5", "1")),
Edge(5L, 3L, new EdgeInfo("5", "3")),
Edge(6L, 7L, new EdgeInfo("6", "7")),
Edge(7L, 6L, new EdgeInfo("7", "6")),
Edge(7L, 8L, new EdgeInfo("7", "8")),
Edge(8L, 7L, new EdgeInfo("8", "7")),
Edge(1L, 9L, new EdgeInfo("1", "9")),
Edge(9L, 1L, new EdgeInfo("9", "1"))
))
//
val minSize = 2
//
val maxSize = 5
val myGraph = Graph(myVertices, myEdges)
type A = ArrayBuffer[ArrayBuffer[EdgeInfo]]
type VD = ArrayBuffer[ArrayBuffer[EdgeInfo]]
type ED = EdgeInfo
val graph = myGraph.mapVertices((vid, vd) => new ArrayBuffer[ArrayBuffer[EdgeInfo]]())
/**
* msg,
*
* @return msg
*/
def initialMsg(): A = {
new ArrayBuffer[ArrayBuffer[EdgeInfo]]()
}
/**
*
*
* @param vid id
* @param vd
* @param A
* @return
*/
def vprog(vid: Long, vd: VD, A: A): VD = {
val retArray = new ArrayBuffer[ArrayBuffer[EdgeInfo]]()
retArray.appendAll(vd)
retArray.appendAll(A)
retArray
}
def sendMsg(edgeTriplet: EdgeTriplet[VD, ED]): Iterator[(Long, A)] = {
if (edgeTriplet.srcAttr.length == 0) {
return Iterator((edgeTriplet.dstId, ArrayBuffer[ArrayBuffer[EdgeInfo]](ArrayBuffer[EdgeInfo](edgeTriplet.attr))))
}
val msgInfos = edgeTriplet.srcAttr.filter(msg => !msg.map(_.to).contains(edgeTriplet.attr.to))
val newAttr = msgInfos.map(msg => {
val retA = new ArrayBuffer[EdgeInfo]()
retA.appendAll(msg)
retA.append(edgeTriplet.attr)
retA
})
Iterator((edgeTriplet.dstId, newAttr))
}
def mergeMsg(a1: A, a2: A): A = {
a1 ++ a2
}
val result = Pregel(graph, initialMsg(), maxSize, EdgeDirection.Out)(vprog, sendMsg, mergeMsg).cache()
result.vertices.map(v => v._2.filter(path => path.head.fm.equalsIgnoreCase(path.last.to)))
.flatMap(v => v.toIterator)
.map(v => {
val array = new ArrayBuffer[String]()
v.foreach(v1 => {
array += v1.fm
array += v1.to
})
(array.sorted.mkString("_"), v)
})
.groupByKey()
.map(_._2.head)
.filter(v => v.size > minSize)
.collect()
.foreach(v => {
println("==========================")
v.foreach(v1 => {
println(v1.toString)
})
println("*************************")
})
result.unpersist(false)
}
}
別のカスタムクラス:package com.pnlorf.graphx.pnlorf.graphx.circle
/**
* description:
*
* @author:
* date: 2019/12/9
* package: com.pnlorf.graphx.pnlorf
*/
class EdgeInfo extends Serializable {
var fm = ""
var to = ""
def this(fm: String, to: String) {
this()
this.fm = fm
this.to = to
}
def canEqual(other: Any): Boolean = other.isInstanceOf[EdgeInfo]
override def equals(other: Any): Boolean = other match {
case that: EdgeInfo =>
(that canEqual this) &&
fm == that.fm &&
to == that.to
case _ => false
}
override def hashCode(): Int = {
val state = Seq(fm, to)
state.map(_.hashCode()).foldLeft(0)((a, b) => 31 * a + b)
}
override def toString = s"EdgeInfo(fm=$fm, to=$to, hashCode=$hashCode)"
}
上記のコードは直接実行でき、結果は以下のように出力されます.========================== EdgeInfo(fm=4, to=5, hashCode=1665) EdgeInfo(fm=5, to=1, hashCode=1692) EdgeInfo(fm=1, to=2, hashCode=1569) EdgeInfo(fm=2, to=3, hashCode=1601) EdgeInfo(fm=3, to=4, hashCode=1633) ************************* ========================== EdgeInfo(fm=4, to=5, hashCode=1665) EdgeInfo(fm=5, to=3, hashCode=1694) EdgeInfo(fm=3, to=4, hashCode=1633) *************************
簡単な例を書くので、考えが足りないかもしれませんが、問題があれば、皆さんに許してほしいです.問題があれば、みんなで討論してください.
ありがとうございます.