Sparkラーニングインスタンス(Python):入力ソースのリアルタイム処理Input Sources Streaming


以前に勉强したRDDとDataFrameデータセットは主にオフラインデータを処理しています.时代の発展と进歩につれて、私たちはますます多くのデータがデータセンターに戻ってくることを発见します.同时に、すぐにユーザーに応答する必要があります.このような状况はリアルタイムで処理します.よく使われるシーンは、あるデパートの1时间の人の流れの密度をリアルタイムで表示します.当日の駅の人口総数などをリアルタイムで表示します.次に、リアルタイム・データ・ソースから言えば、リアルタイム・データ・ソースは主に次のとおりです.
  • File Source
  • Socket Source
  • Flume Source
  • Kafka Source

  • File Sourceとは、ファイルをデータソースとして使用し、ローカルファイルfileと分散システムhdfsが一般的である.ここではローカルファイルで説明し、実装コードは以下の通りである.
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    
    if __name__ == '__main__':
        sc = SparkContext(appName="inputSourceStreaming", master="local[*]")
        #                
        ssc = StreamingContext(sc, 5)
    
        lines = ssc.textFileStream("file:///home/llh/data/streaming")
        counts = lines.flatMap(lambda line: line.split(" ")).map(lambda x: (x, 1)).reduceByKey(lambda a,b: a+b)
        counts.pprint()
        # -------------------------------------------
        # Time: 2019-07-31 18:11:55
        # -------------------------------------------
        # ('hong', 2)
        # ('zhang', 2)
        # ('li', 2)
        # ('san', 2)
        # ('wang', 2)
        ssc.start()
        ssc.awaitTermination()

    その後、/home/llh/data/streaming/ディレクトリにファイルをコピーし続け、上記のコメントに示すようにします.
    Socket Sourceとは、データソースとしてネットワークソケットを指し、コマンドncでネットワーク送信情報をシミュレートし、実装コードは以下の通りである.
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    
    if __name__ == '__main__':
        sc = SparkContext(appName="inputSourceStreaming", master="local[*]")
        #                
        ssc = StreamingContext(sc, 5)
    
        lines = ssc.socketTextStream("localhost", 9999)
        counts = lines.flatMap(lambda line: line.split(" ")).map(lambda x: (x, 1)).reduceByKey(lambda a,b: a+b)
        counts.pprint()
        # -------------------------------------------
        # Time: 2019-07-31 18: 43:25
        # -------------------------------------------
        # ('hadoop', 1)
        # ('spark', 1)
        ssc.start()
        ssc.awaitTermination()

    コマンド側実行~$nc-lk 9999
    hadoop spark
    後でコードを実行すればいいです
    Flumeは高可用性の大量収集ログシステムであるため、データソースとして実現可能なコードは以下の通りである.
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.flume import FlumeUtils
    
    if __name__ == '__main__':
        sc = SparkContext(appName="inputSourceStreaming", master="local[*]")
        #                
        ssc = StreamingContext(sc, 5)
    
        lines = FlumeUtils.createStream("localhost", 34545)
        counts = lines.flatMap(lambda line: line.split(" ")).map(lambda x: (x, 1)).reduceByKey(lambda a,b: a+b)
        counts.pprint()
    
        ssc.start()
        ssc.awaitTermination()

    Kafkaは分散型メッセージキューであり、常にミドルウェアとして伝送、隔離に用いられている.Kafkaは以上の4つの中で実際に開発された最も一般的なストリームデータソースであり、同じ実現コードは以下の通りである.
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils
    
    if __name__ == '__main__':
        sc = SparkContext(appName="inputSourceStreaming", master="local[*]")
        #                
        ssc = StreamingContext(sc, 5)
    
        kvs = KafkaUtils.createDirectStream(ssc, "topic-name", "localhost:9092")
        lines = kvs.map(lambda x: x[1])
        counts = lines.flatMap(lambda line: line.split(" ")).map(lambda x: (x, 1)).reduceByKey(lambda a,b: a+b)
        counts.pprint()
    
        ssc.start()
        ssc.awaitTermination()

    はい、以上が主なデータソースをリアルタイムで処理することであり、4つ目が最も重要であることを把握しなければなりません.
     
    Spark学習ディレクトリ:
  • Spark学習例1(Python):単語統計Word Count
  • Sparkラーニングインスタンス2(Python):ロードデータソースLoad Data Source
  • Spark学習例3(Python):保存データSave Data
  • Spark学習例4(Python):RDD変換Transformations
  • Spark学習例5(Python):RDD実行Actions
  • Spark学習例6(Python):共有変数Shared Variables
  • Spark学習例7(Python):RDD、DataFrame、DataSet相互変換
  • Spark学習例8(Python):入力元リアルタイム処理Input Sources Streaming
  • Spark学習例9(Python):ウィンドウ操作Window Operations