Sparkラーニングインスタンス(Python):入力ソースのリアルタイム処理Input Sources Streaming
以前に勉强したRDDとDataFrameデータセットは主にオフラインデータを処理しています.时代の発展と进歩につれて、私たちはますます多くのデータがデータセンターに戻ってくることを発见します.同时に、すぐにユーザーに応答する必要があります.このような状况はリアルタイムで処理します.よく使われるシーンは、あるデパートの1时间の人の流れの密度をリアルタイムで表示します.当日の駅の人口総数などをリアルタイムで表示します.次に、リアルタイム・データ・ソースから言えば、リアルタイム・データ・ソースは主に次のとおりです. File Source Socket Source Flume Source Kafka Source
File Sourceとは、ファイルをデータソースとして使用し、ローカルファイルfileと分散システムhdfsが一般的である.ここではローカルファイルで説明し、実装コードは以下の通りである.
その後、/home/llh/data/streaming/ディレクトリにファイルをコピーし続け、上記のコメントに示すようにします.
Socket Sourceとは、データソースとしてネットワークソケットを指し、コマンドncでネットワーク送信情報をシミュレートし、実装コードは以下の通りである.
コマンド側実行~$nc-lk 9999
hadoop spark
後でコードを実行すればいいです
Flumeは高可用性の大量収集ログシステムであるため、データソースとして実現可能なコードは以下の通りである.
Kafkaは分散型メッセージキューであり、常にミドルウェアとして伝送、隔離に用いられている.Kafkaは以上の4つの中で実際に開発された最も一般的なストリームデータソースであり、同じ実現コードは以下の通りである.
はい、以上が主なデータソースをリアルタイムで処理することであり、4つ目が最も重要であることを把握しなければなりません.
Spark学習ディレクトリ: Spark学習例1(Python):単語統計Word Count Sparkラーニングインスタンス2(Python):ロードデータソースLoad Data Source Spark学習例3(Python):保存データSave Data Spark学習例4(Python):RDD変換Transformations Spark学習例5(Python):RDD実行Actions Spark学習例6(Python):共有変数Shared Variables Spark学習例7(Python):RDD、DataFrame、DataSet相互変換 Spark学習例8(Python):入力元リアルタイム処理Input Sources Streaming Spark学習例9(Python):ウィンドウ操作Window Operations
File Sourceとは、ファイルをデータソースとして使用し、ローカルファイルfileと分散システムhdfsが一般的である.ここではローカルファイルで説明し、実装コードは以下の通りである.
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
if __name__ == '__main__':
sc = SparkContext(appName="inputSourceStreaming", master="local[*]")
#
ssc = StreamingContext(sc, 5)
lines = ssc.textFileStream("file:///home/llh/data/streaming")
counts = lines.flatMap(lambda line: line.split(" ")).map(lambda x: (x, 1)).reduceByKey(lambda a,b: a+b)
counts.pprint()
# -------------------------------------------
# Time: 2019-07-31 18:11:55
# -------------------------------------------
# ('hong', 2)
# ('zhang', 2)
# ('li', 2)
# ('san', 2)
# ('wang', 2)
ssc.start()
ssc.awaitTermination()
その後、/home/llh/data/streaming/ディレクトリにファイルをコピーし続け、上記のコメントに示すようにします.
Socket Sourceとは、データソースとしてネットワークソケットを指し、コマンドncでネットワーク送信情報をシミュレートし、実装コードは以下の通りである.
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
if __name__ == '__main__':
sc = SparkContext(appName="inputSourceStreaming", master="local[*]")
#
ssc = StreamingContext(sc, 5)
lines = ssc.socketTextStream("localhost", 9999)
counts = lines.flatMap(lambda line: line.split(" ")).map(lambda x: (x, 1)).reduceByKey(lambda a,b: a+b)
counts.pprint()
# -------------------------------------------
# Time: 2019-07-31 18: 43:25
# -------------------------------------------
# ('hadoop', 1)
# ('spark', 1)
ssc.start()
ssc.awaitTermination()
コマンド側実行~$nc-lk 9999
hadoop spark
後でコードを実行すればいいです
Flumeは高可用性の大量収集ログシステムであるため、データソースとして実現可能なコードは以下の通りである.
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.flume import FlumeUtils
if __name__ == '__main__':
sc = SparkContext(appName="inputSourceStreaming", master="local[*]")
#
ssc = StreamingContext(sc, 5)
lines = FlumeUtils.createStream("localhost", 34545)
counts = lines.flatMap(lambda line: line.split(" ")).map(lambda x: (x, 1)).reduceByKey(lambda a,b: a+b)
counts.pprint()
ssc.start()
ssc.awaitTermination()
Kafkaは分散型メッセージキューであり、常にミドルウェアとして伝送、隔離に用いられている.Kafkaは以上の4つの中で実際に開発された最も一般的なストリームデータソースであり、同じ実現コードは以下の通りである.
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
if __name__ == '__main__':
sc = SparkContext(appName="inputSourceStreaming", master="local[*]")
#
ssc = StreamingContext(sc, 5)
kvs = KafkaUtils.createDirectStream(ssc, "topic-name", "localhost:9092")
lines = kvs.map(lambda x: x[1])
counts = lines.flatMap(lambda line: line.split(" ")).map(lambda x: (x, 1)).reduceByKey(lambda a,b: a+b)
counts.pprint()
ssc.start()
ssc.awaitTermination()
はい、以上が主なデータソースをリアルタイムで処理することであり、4つ目が最も重要であることを把握しなければなりません.
Spark学習ディレクトリ: