Spark修練の道(進級編)——Spark入門から精通へ:第十一節Spark Streaming——DStream Transformation操作
6838 ワード
本節の主な内容
このセクションの内容は、公式ドキュメントから来ています.http://spark.apache.org/docs/latest/streaming-programming-guide.html DStream Transformationオペレーション 1.Transformation操作
Transformation
Meaning
map(func)
DStreamの各要素に対してfunc関数操作を行い、新しいDStreamを返す.
flatMap(func)
mapメソッドと同様ですが、各入力項目はゼロまたは複数の出力項目として出力できます.
filter(func)
すべての関数func戻り値trueのDStream要素をフィルタし、新しいDStreamを返します.
repartition(numPartitions)
DStreamのパーティション数を増やしたり減らしたりして、DStreamの並列度を変更します.
union(otherStream)
ソースDStreamと入力パラメータがotherDStreamの要素を結合し、新しいDStreamを返します.
count()
DStreaimの各RDDの要素をカウントし、1つの要素のみのRDDからなるDStreamを返す
reduce(func)
ソースDStream中の各RDD中の要素をfuncで集約操作し、1つの要素のみのRDDからなる新しいDStreamを返す.
countByValue()
要素タイプKのDStreamについて、(K,Long)キー値ペア形式の新しいDStreamを返し、Longに対応する値はソースDStreamの各RDDのkeyが現れる回数である
reduceByKey(func, [numTasks])
func関数を使用してソースDStreamのkeyを集約し、新しい(K,V)対で構成されたDStreamを返します.
join(otherStream, [numTasks])
(K,V)、(K,W)タイプのDStreamと入力し、新しい(K,(V,W)タイプのDStreamを返します.
cogroup(otherStream, [numTasks])
(K,V)、(K,W)タイプのDStreamと入力し、新しい(K,Seq[V],Seq[W])メタグループタイプのDStreamを返します.
transform(func)
RDD-to-RDD関数がソースコードDStream内の各RDDに作用することによって、任意のRDD動作であることができ、それによって新しいRDDが返される
updateStateByKey(func)
キーの前置状態とキーの新しい値に基づいてキーを更新し、新しい状態のDStreamを返します.
具体例:
次のコードは、実行時に追加されたファイルの内容です.
次に、前の各関数の結果を示します.
例2:前回のレッスンで示したWordCountコードは、入力した単語を別々にカウントするだけでなく、前回のカウントの状態は記録されていません.連続的にカウントしたい場合は、updateStateByKeyメソッドを使用して行います.次のコードは主にupdateStateByKeyの方法を示しています.
次の図は初期値です:次のコマンドを使用してnetcat serverを起動します.
そして入力
下図の結果が得られます
そしてworldを入力し、
次の結果が得られます
このセクションの内容は、公式ドキュメントから来ています.http://spark.apache.org/docs/latest/streaming-programming-guide.html
Transformation
Meaning
map(func)
DStreamの各要素に対してfunc関数操作を行い、新しいDStreamを返す.
flatMap(func)
mapメソッドと同様ですが、各入力項目はゼロまたは複数の出力項目として出力できます.
filter(func)
すべての関数func戻り値trueのDStream要素をフィルタし、新しいDStreamを返します.
repartition(numPartitions)
DStreamのパーティション数を増やしたり減らしたりして、DStreamの並列度を変更します.
union(otherStream)
ソースDStreamと入力パラメータがotherDStreamの要素を結合し、新しいDStreamを返します.
count()
DStreaimの各RDDの要素をカウントし、1つの要素のみのRDDからなるDStreamを返す
reduce(func)
ソースDStream中の各RDD中の要素をfuncで集約操作し、1つの要素のみのRDDからなる新しいDStreamを返す.
countByValue()
要素タイプKのDStreamについて、(K,Long)キー値ペア形式の新しいDStreamを返し、Longに対応する値はソースDStreamの各RDDのkeyが現れる回数である
reduceByKey(func, [numTasks])
func関数を使用してソースDStreamのkeyを集約し、新しい(K,V)対で構成されたDStreamを返します.
join(otherStream, [numTasks])
(K,V)、(K,W)タイプのDStreamと入力し、新しい(K,(V,W)タイプのDStreamを返します.
cogroup(otherStream, [numTasks])
(K,V)、(K,W)タイプのDStreamと入力し、新しい(K,Seq[V],Seq[W])メタグループタイプのDStreamを返します.
transform(func)
RDD-to-RDD関数がソースコードDStream内の各RDDに作用することによって、任意のRDD動作であることができ、それによって新しいRDDが返される
updateStateByKey(func)
キーの前置状態とキーの新しい値に基づいてキーを更新し、新しい状態のDStreamを返します.
具体例:
// ~/streaming
val lines = ssc.textFileStream(args(0))
val words = lines.flatMap(_.split(" "))
val wordMap = words.map(x => (x, 1))
val wordCounts=wordMap.reduceByKey(_ + _)
val filteredWordCounts=wordCounts.filter(_._2>1)
val numOfCount=filteredWordCounts.count()
val countByValue=words.countByValue()
val union=words.union(word1)
val transform=words.transform(x=>x.map(x=>(x,1)))
//
lines.print()
// flatMap
words.print()
// map
wordMap.print()
// reduceByKey
wordCounts.print()
// filter
filteredWordCounts.print()
// count
numOfCount.print()
// countByValue
countByValue.print()
// union
union.print()
// transform
transform.print()
次のコードは、実行時に追加されたファイルの内容です.
root@sparkmaster:~/streaming# echo "A B C D" >> test12.txt; echo "A B" >> test12.txt
次に、前の各関数の結果を示します.
-------------------------------------------
lines.print()
-------------------------------------------
A B C D
A B
-------------------------------------------
flatMap
-------------------------------------------
A
B
C
D
A
B
-------------------------------------------
map
-------------------------------------------
(A,1)
(B,1)
(C,1)
(D,1)
(A,1)
(B,1)
-------------------------------------------
reduceByKey
-------------------------------------------
(B,2)
(D,1)
(A,2)
(C,1)
-------------------------------------------
filter
-------------------------------------------
(B,2)
(A,2)
-------------------------------------------
count
-------------------------------------------
2
-------------------------------------------
countByValue
-------------------------------------------
(B,2)
(D,1)
(A,2)
(C,1)
-------------------------------------------
union
-------------------------------------------
A
B
C
D
A
B
A
B
C
D
...
-------------------------------------------
transform
-------------------------------------------
(A,1)
(B,1)
(C,1)
(D,1)
(A,1)
(B,1)
例2:前回のレッスンで示したWordCountコードは、入力した単語を別々にカウントするだけでなく、前回のカウントの状態は記録されていません.連続的にカウントしたい場合は、updateStateByKeyメソッドを使用して行います.次のコードは主にupdateStateByKeyの方法を示しています.
import org.apache.spark.SparkConf
import org.apache.spark.HashPartitioner
import org.apache.spark.streaming._
object StatefulNetworkWordCount {
def main(args: Array[String]) {
if (args.length < 2) {
System.err.println("Usage: StatefulNetworkWordCount ")
System.exit(1)
}
// ,
val updateFunc = (values: Seq[Int], state: Option[Int]) => {
val currentCount = values.sum
val previousCount = state.getOrElse(0)
Some(currentCount + previousCount)
}
// K,V,S, K,S
//V ,S
val newUpdateFunc = (iterator: Iterator[(String, Seq[Int], Option[Int])]) => {
iterator.flatMap(t => updateFunc(t._2, t._3).map(s => (t._1, s)))
}
val sparkConf = new SparkConf().setAppName("StatefulNetworkWordCount").setMaster("local[4]")
//
val ssc = new StreamingContext(sparkConf, Seconds(1))
// checkpoint , checkpoint Spark Streaming
ssc.checkpoint(".")
//RDD
val initialRDD = ssc.sparkContext.parallelize(List(("hello", 1), ("world", 1)))
// Socket , ip localhost, 9999
val lines = ssc.socketTextStream(args(0), args(1).toInt)
//flatMap
val words = lines.flatMap(_.split(" "))
//map
val wordDstream = words.map(x => (x, 1))
//updateStateByKey
val stateDstream = wordDstream.updateStateByKey[Int](newUpdateFunc,
new HashPartitioner (ssc.sparkContext.defaultParallelism), true, initialRDD)
stateDstream.print()
ssc.start()
ssc.awaitTermination()
}
}
次の図は初期値です:次のコマンドを使用してnetcat serverを起動します.
root@sparkmaster:~/streaming# nc -lk 9999
そして入力
root@sparkmaster:~/streaming# nc -lk 9999
hello
下図の結果が得られます
そしてworldを入力し、
root@sparkmaster:~/streaming# nc -lk 9999
hello
world
次の結果が得られます