Spark修練の道(進級編)——Spark入門から精通へ:第十節Spark Streaming(一)
13361 ワード
本節の主な内容
このセクションの内容は、公式ドキュメントから来ています.http://spark.apache.org/docs/latest/streaming-programming-guide.html#mllib-operations Sparkフロー計算概要 Spark Streaming関連コアクラス 入門事例 1.Sparkフロー計算の概要
HadoopのMapReduceやSpark SQLなどはオフライン計算しかできず、リアルタイム性の要求が高いビジネスニーズを満たすことができません.例えば、リアルタイム推奨、リアルタイムサイト性能分析など、ストリーミング計算はこれらの問題を解決することができます.現在、Storm、Spark Streaming、Samzaの3つの比較的一般的なフロー計算フレームワークがあります.各フレームワークの比較と使用状況については、以下を参照してください.http://www.csdn.net/article/2015-03-09/2824135.このセクションでは、Spark Streamingについて重点的に説明します.Spark StreamingはSparkの5つのコアコンポーネントの1つとして、多くのデータソースへのアクセスを原生的にサポートし、Spark MLLib、Graphxと組み合わせて使用でき、分散環境でのオンラインマシン学習アルゴリズムの設計を簡単に完了します.Sparkがサポートする入力データソースおよび出力ファイルを下図に示します.
次のケースの実戦では、この部分に触れます.中間の「Spark Streaming」は、入力したデータソースを処理し、結果を出力します.その内部動作原理は次の図のようになります.
Spark Streamingリアルタイムで入力されたデータストリームを受け取り、バッチで分割した後、Sparkエンジンに渡して処理し、処理が完了したら結果を外部ファイルに出力します.
まず、次のSpark Streamingベースのword countコードを見てみましょう.ストリーム計算の初歩的な理解を助けることができます.
上記のプログラムを実行した後、コマンドラインインタフェースを介して、対応するファイルディレクトリにファイルをコピーします.具体的には、
プログラムが実行されると、ファイル作成時間に基づいてファイルが処理され、前回の実行時間以降に作成されたファイルが処理され、出力結果は以下の通りである:
2.Spark Streaming関連コアクラス
1. DStream(discretized stream)
Spark Streamingは、前述のKafka、Flumeなどのデータソースから作成できるDStreamであるデータストリームの抽象化を提供し、DStreamは本質的に一連のRDDから構成されている.各RDDのデータは、対応する時間間隔(interval)に流入するデータであり、下図のように
DStreamのすべての操作は、最終的にはRDDの操作に変換されます.たとえば、前のStreamingWordCountプログラムでは、flatMap操作はDStreamのすべてのRDDに作用します.次の図に示します.
2.StreamingContext Spark Streamingでは、StreamingContextはプログラム全体のエントリであり、その作成方法は様々であり、最も一般的なのはSparkConfによって作成される.
StreamingContextオブジェクトの作成時にSparkConfに基づいてSparkContextが作成されます
つまりStreamingContextはSparkContextのパッケージであり、StreamingContextには他にもいくつかの構造方法があり、興味のあることは、後期にソース解析時に詳細に説明し、StreamingContextを作成する際にbatchDurationを指定し、バッチ間隔を設定するために使用され、アプリケーションやクラスタリソースの状況に応じて設定する必要があることを理解することができます.StreamingContextの作成が完了したら、次の手順に従います.入力ソースによるInputDStreaim の作成 DStreamingに対してtransformationとoutput操作を行い、この操作は後期フロー計算の論理 を構成する. StreamingContextを通過する.start()メソッドは、データの受信および処理のプロセス を開始する. streamingContextを使用する.awaitTermination()メソッドは、プログラム処理の終了(手動停止またはエラー停止) を待つ. streamingContextを呼び出すこともできる.stop()メソッド終了プログラムの実行 StreamingContextについていくつか注意すべき点があります.
1.StreamingContextが起動すると、新しい操作を追加しても機能しません.つまり、StreamingContextが起動する前に、すべての計算ロジック2を定義する.StreamingContextが停止すると、再起動できません.つまり、再計算するには、プログラム全体を再実行する必要があります. 3.単一のJVMでは、一定期間に2つのactive状態のStreamingContext 4が発生しない.StreamingContextのstopメソッドを呼び出すとSparkContextもstopによって削除する、StreamingContextを閉じる場合にSparkContextを保持するにはstopメソッドにパラメータstopSparkContext=false/***Stop the execution of the streams immediately(does not wait for all received data*to be processed)を入力する必要がある.By default, if
3.InputDStreamsおよびReceivers InputDStreamとは、データストリームのソースから受け取る入力データストリームを指し、先のStreamingWordCountプログラムではval lines=ssc.textFileStream(args(0))はInputDStreamの一種である.ファイル・ストリームに加えて、各input DStreamは、データ・ソースから送信されたデータを受信し、後でSpark処理のためにメモリに保存するReceiverオブジェクトに関連付けられています.
Spark Streaimgは、2つのオリジナルサポートストリームデータソースを提供します. Basic sources(ベースストリームデータソース).ファイルシステム(ローカルファイルシステムおよび分散ファイルシステム)、Socket接続、AkkaのActorなど、StreamingContext APIから直接作成します.ファイルストリーム(File Streams)の作成方法:a.streamingContext.fileStreamKeyClass, ValueClass, InputFormatClass b. streamingContext.textFileStream(dataDirectory)リアルタイムでtextFileStreamメソッドが最終的に呼び出されるのもfileStreamメソッドdef textFileStream(directory:String):DStream[String]=withNamedScope("text file stream"){fileStream LongWritable,Text,TextInputFormat.map(._2.toString)}Akka Actorストリームデータに基づく作成方法:streamingContext.actorStream(actorProps,actor-name)Socketストリームデータに基づく作成方法:ssc.socketTextStream(hostname:String,port:Int,storageLevel:StorageLevel=StorageLevel.MEMORY_AND_DISK_SER_2)RDDキューに基づくストリームデータ作成方式:streamingContext.queueStream(queueOfRDDs) Advanced sources(高度なストリームデータソース).Kafka,Flume,Kinesis,Twitterなど,外部ツールクラスを借りる必要があり,実行時に外部依存が必要である(次節で説明する) .
Spark Streamingはユーザー3もサポートする.Custom Source(カスタムストリームデータソース)では、次のセクションで説明するreceiverを定義する必要があります.
最後に注意すべき点は2つあります.ローカルでSpark Streamingを実行する場合、master URLは「local」または「local[1]」を使用できません.なぜなら、input DStreamがreceiver(sockets、Kafka、Flumeなど)に関連付けられている場合、receiver自身が実行するためにスレッドが必要であり、この場合、スレッドが受信したデータを処理しないからです.したがって、SparkStreamingプログラムをローカルで実行する場合は、master URLとして「local[n]」を使用し、nはreceiverの数より大きい. クラスタ上でSpark Streamingを実行する場合、Spark Streamingプログラムに割り当てられたCPUコア数もreceiverの数より大きくなければなりません.そうしないと、システムはデータのみを受け入れ、データを処理できません.
3.導入事例
実行結果の後期表示を容易にするため、変更ログレベルはLevel.WARN NetworkWordCount Socketストリームデータ に基づく
ランタイムパラメータの構成
使用
NetworkWordCountプログラムを実行し、netcat serverが実行するコンソールに任意の文字列を入力します.
QueueStream RDDキューに基づくストリームデータ
このセクションの内容は、公式ドキュメントから来ています.http://spark.apache.org/docs/latest/streaming-programming-guide.html#mllib-operations
HadoopのMapReduceやSpark SQLなどはオフライン計算しかできず、リアルタイム性の要求が高いビジネスニーズを満たすことができません.例えば、リアルタイム推奨、リアルタイムサイト性能分析など、ストリーミング計算はこれらの問題を解決することができます.現在、Storm、Spark Streaming、Samzaの3つの比較的一般的なフロー計算フレームワークがあります.各フレームワークの比較と使用状況については、以下を参照してください.http://www.csdn.net/article/2015-03-09/2824135.このセクションでは、Spark Streamingについて重点的に説明します.Spark StreamingはSparkの5つのコアコンポーネントの1つとして、多くのデータソースへのアクセスを原生的にサポートし、Spark MLLib、Graphxと組み合わせて使用でき、分散環境でのオンラインマシン学習アルゴリズムの設計を簡単に完了します.Sparkがサポートする入力データソースおよび出力ファイルを下図に示します.
次のケースの実戦では、この部分に触れます.中間の「Spark Streaming」は、入力したデータソースを処理し、結果を出力します.その内部動作原理は次の図のようになります.
Spark Streamingリアルタイムで入力されたデータストリームを受け取り、バッチで分割した後、Sparkエンジンに渡して処理し、処理が完了したら結果を外部ファイルに出力します.
まず、次のSpark Streamingベースのword countコードを見てみましょう.ストリーム計算の初歩的な理解を助けることができます.
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
object StreamingWordCount {
def main(args: Array[String]) {
if (args.length < 1) {
System.err.println("Usage: StreamingWordCount ")
System.exit(1)
}
// SparkConf
val sparkConf = new SparkConf().setAppName("HdfsWordCount").setMaster("local[2]")
// Create the context
// StreamingContext ,
val ssc = new StreamingContext(sparkConf, Seconds(20))
// Create the FileInputDStream on the directory and use the
// stream to count words in new files created
// ,
val lines = ssc.textFileStream(args(0))
//
val words = lines.flatMap(_.split(" "))
//
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
//
wordCounts.print()
// Spark Streaming
ssc.start()
// ,
ssc.awaitTermination()
}
}
上記のプログラムを実行した後、コマンドラインインタフェースを介して、対応するファイルディレクトリにファイルをコピーします.具体的には、
プログラムが実行されると、ファイル作成時間に基づいてファイルが処理され、前回の実行時間以降に作成されたファイルが処理され、出力結果は以下の通りである:
2.Spark Streaming関連コアクラス
1. DStream(discretized stream)
Spark Streamingは、前述のKafka、Flumeなどのデータソースから作成できるDStreamであるデータストリームの抽象化を提供し、DStreamは本質的に一連のRDDから構成されている.各RDDのデータは、対応する時間間隔(interval)に流入するデータであり、下図のように
DStreamのすべての操作は、最終的にはRDDの操作に変換されます.たとえば、前のStreamingWordCountプログラムでは、flatMap操作はDStreamのすべてのRDDに作用します.次の図に示します.
2.StreamingContext Spark Streamingでは、StreamingContextはプログラム全体のエントリであり、その作成方法は様々であり、最も一般的なのはSparkConfによって作成される.
import org.apache.spark._
import org.apache.spark.streaming._
val conf = new SparkConf().setAppName(appName).setMaster(master)
val ssc = new StreamingContext(conf, Seconds(1))
StreamingContextオブジェクトの作成時にSparkConfに基づいてSparkContextが作成されます
/**
* Create a StreamingContext by providing the configuration necessary for a new SparkContext.
* @param conf a org.apache.spark.SparkConf object specifying Spark parameters
* @param batchDuration the time interval at which streaming data will be divided into batches
*/
def this(conf: SparkConf, batchDuration: Duration) = {
this(StreamingContext.createNewSparkContext(conf), null, batchDuration)
}
つまりStreamingContextはSparkContextのパッケージであり、StreamingContextには他にもいくつかの構造方法があり、興味のあることは、後期にソース解析時に詳細に説明し、StreamingContextを作成する際にbatchDurationを指定し、バッチ間隔を設定するために使用され、アプリケーションやクラスタリソースの状況に応じて設定する必要があることを理解することができます.StreamingContextの作成が完了したら、次の手順に従います.
1.StreamingContextが起動すると、新しい操作を追加しても機能しません.つまり、StreamingContextが起動する前に、すべての計算ロジック2を定義する.StreamingContextが停止すると、再起動できません.つまり、再計算するには、プログラム全体を再実行する必要があります. 3.単一のJVMでは、一定期間に2つのactive状態のStreamingContext 4が発生しない.StreamingContextのstopメソッドを呼び出すとSparkContextもstopによって削除する、StreamingContextを閉じる場合にSparkContextを保持するにはstopメソッドにパラメータstopSparkContext=false/***Stop the execution of the streams immediately(does not wait for all received data*to be processed)を入力する必要がある.By default, if
stopSparkContext
is not specified, the underlying * SparkContext will also be stopped. This implicit behavior can be configured using the * SparkConf configuration spark.streaming.stopSparkContextByDefault. * * @param stopSparkContext If true, stops the associated SparkContext. The underlying SparkContext * will be stopped regardless of whether this StreamingContext has been * started. */ def stop( stopSparkContext: Boolean = conf.getBoolean(“spark.streaming.stopSparkContextByDefault”, true) ): Unit = synchronized { stop(stopSparkContext, false) } 5.SparkContextオブジェクトは複数のStreamingContextsで繰り返し使用できますが、前のStreamingContextsが停止してから次のStreamingContextオブジェクトを作成する必要があります.3.InputDStreamsおよびReceivers InputDStreamとは、データストリームのソースから受け取る入力データストリームを指し、先のStreamingWordCountプログラムではval lines=ssc.textFileStream(args(0))はInputDStreamの一種である.ファイル・ストリームに加えて、各input DStreamは、データ・ソースから送信されたデータを受信し、後でSpark処理のためにメモリに保存するReceiverオブジェクトに関連付けられています.
Spark Streaimgは、2つのオリジナルサポートストリームデータソースを提供します.
Spark Streamingはユーザー3もサポートする.Custom Source(カスタムストリームデータソース)では、次のセクションで説明するreceiverを定義する必要があります.
最後に注意すべき点は2つあります.
3.導入事例
実行結果の後期表示を容易にするため、変更ログレベルはLevel.WARN
import org.apache.spark.Logging
import org.apache.log4j.{Level, Logger}
/** Utility functions for Spark Streaming examples. */
object StreamingExamples extends Logging {
/** Set reasonable logging levels for streaming if the user has not configured log4j. */
def setStreamingLogLevels() {
val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements
if (!log4jInitialized) {
// We first log something to initialize Spark's default logging, then we override the
// logging level.
logInfo("Setting log level to [WARN] for streaming example." +
" To override add a custom log4j.properties to the classpath.")
Logger.getRootLogger.setLevel(Level.WARN)
}
}
}
object NetworkWordCount {
def main(args: Array[String]) {
if (args.length < 2) {
System.err.println("Usage: NetworkWordCount ")
System.exit(1)
}
// Level.WARN
StreamingExamples.setStreamingLogLevels()
// Create the context with a 1 second batch size
val sparkConf = new SparkConf().setAppName("NetworkWordCount").setMaster("local[4]")
val ssc = new StreamingContext(sparkConf, Seconds(1))
// Create a socket stream on target ip:port and count the
// words in input stream of
delimited text (eg. generated by 'nc')
// Note that no duplication in storage level only for running locally.
// Replication necessary in distributed scenario for fault tolerance.
// SocketInputDStream, ip:port
val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
}
}
ランタイムパラメータの構成
使用
// netcat server
root@sparkmaster:~/streaming# nc -lk 9999
NetworkWordCountプログラムを実行し、netcat serverが実行するコンソールに任意の文字列を入力します.
root@sparkmaster:~/streaming# nc -lk 9999
Hello WORLD
HELLO WORLD WORLD
TEWST
NIMA
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutable
object QueueStream {
def main(args: Array[String]) {
StreamingExamples.setStreamingLogLevels()
val sparkConf = new SparkConf().setAppName("QueueStream").setMaster("local[4]")
// Create the context
val ssc = new StreamingContext(sparkConf, Seconds(1))
// Create the queue through which RDDs can be pushed to
// a QueueInputDStream
// RDD
val rddQueue = new mutable.SynchronizedQueue[RDD[Int]]()
// Create the QueueInputDStream and use it do some processing
// QueueInputDStream
val inputStream = ssc.queueStream(rddQueue)
// RDD
val mappedStream = inputStream.map(x => (x % 10, 1))
val reducedStream = mappedStream.reduceByKey(_ + _)
//
reducedStream.print()
//
ssc.start()
// Create and push some RDDs into
for (i