SparkとSparkSQLに基づくNetFlowトラフィックの初歩的な分析-scala言語


SparkとSparkSQLに基づくNetFlowトラフィックの初歩的な分析-scala言語
ラベル:NetFlow Spark SparkSQL本文は主にSparkを使って簡単なNetFlowデータの処理をする方法を紹介して、IntelliJ IDEAに基づいてSparkのMavenプロジェクトを開発して、本文はいくつかの簡単なNetFlowの基礎知識を紹介して、およびどのようにIntelliJ IDEAの上でMavenプロジェクトを開発して、Scalaで書いたいくつかの簡単なNetFlowフィールドで統計のコードを分析して、SparkCoreとSparkSQLの2つのバージョンが含まれています.
NetFlowの初歩的な認識
具体的な分析コードを書く前に、NetFlowの基本的な知識とその具体的なフィールドがどのような意味を表しているかを理解してください.私の前の記事にはNetFlowについて簡単な紹介がありました.http://blog.csdn.net/u012462093/article/details/78251470#0-qzone-1-66165-d020d2d2a4e8d1a374a433f596ad1440
以下はNetFlowデータと対応するフィールドの意味(各行はNetFlowデータを表す)[外部チェーン画像の転送に失敗した場合、ソース局は盗難防止チェーンメカニズムがある可能性があり、画像を保存して直接アップロードすることを提案する(img-u 8 lzVkxQ-15922048881867)(https://i.imgur.com/Wfj5s4M.png)]
[外部チェーン画像の転送に失敗しました.ソース局には盗難防止チェーンのメカニズムがある可能性があります.画像を保存して直接アップロードすることをお勧めします(img-7 y 7 fDBkm-15922048881869)(https://i.imgur.com/MqeATZb.png)]
NetFlowは、フィールドの抽出と処理を容易にするために必要な情報を明確に示すことができます.
IDEAを使用したSparkのMavenプロジェクトの開発
IDEAのインストール方法やMavenプロジェクトのブログの構築方法についても詳しく紹介されていますhttp://blog.csdn.net/kwu_ganymede/article/details/51832427
Maven管理プロジェクトはJavaEEで一般的に使用されており、Sparkプロジェクトの開発も例外ではないため、Sparkプロジェクトを開発するためにMaven-Scalaプロジェクトを構築する必要がある.本文で採用したツールはIntelliJ IDEA 2017で、IDEAツールはますますみんなに認められて、java、python、scalaのサポートはすべてとても良くて、Scala言語はSparkプロジェクトの第一選択を開発します.
上のブログでは、Sparkを構築するMavenプロジェクトのインストール方法について、Scalaプラグインのインストールも含めて詳しく紹介していますが、ここではあまり説明しません.私たちはSparkプロジェクトだけでなくSparkSqlも使うので、そのブログのpomファイルは使えません.私のpomファイルをここに貼り付けます.

4.0.0
MDemo4
scm4
1.0-SNAPSHOT
2008

2.4.2
2.10.5
1.6.1
1.5.2






  scala-tools.org
  Scala-Tools Maven2 Repository
  http://scala-tools.org/repo-releases





  scala-tools.org
  Scala-Tools Maven2 Repository
  http://scala-tools.org/repo-releases





  org.scala-lang
  scala-library
  ${scala.version}




  junit
  junit
  4.4
  test


  org.specs
  specs
  1.2.5
  test


  org.scala-lang
  scala-compiler
  ${scala.version}
  compile



  org.apache.spark
  spark-core_2.10
  ${spark.version}



  org.apache.spark
  spark-streaming_2.10
  ${spark.version}



  org.apache.spark
  spark-streaming-kafka_2.10
  ${spark.version}



  org.apache.spark
  spark-mllib_2.10
  ${spark.version}



  org.apache.spark
  spark-sql_2.10
  ${spark.version}



  com.databricks
  spark-csv_2.10
  1.4.0



  org.slf4j
  slf4j-api
  ${org.slft4j.version}




  org.slf4j
  jcl-over-slf4j
  ${org.slft4j.version}



  org.slf4j
  slf4j-log4j12
  ${org.slft4j.version}



  com.alibaba
  fastjson
  1.2.15




src/main/scala
src/test


  
    org.scala-tools
    maven-scala-plugin
    
      
        
          compile
          testCompile
        
      
    
    
      ${scala.version}
      
        -target:jvm-1.5
      
    
  






  
    org.scala-tools
    maven-scala-plugin
    
      ${scala.version}
    
  




このpomファイルにはSpark、SparkSQL、SparkStreamingの依存関係が詳細に含まれているので、他のSparkプロジェクトを開発するには基本的に十分です.
次に、SparkでNetFlowデータを分析する方法について説明します.
Sparkで宛先IPからバイト数を統計する
テストデータソース:
まず解決しなければならないのはデータソースの問題で、2つの方法があり、flowdプログラムによって取得されたり、flowfakeによってデータを偽造したりして、地面に落ちてファイルを形成したりします.本稿では,2つ目の方法でデータを偽造したり,自分でスクリプトを書いてデータを生成したりするが,いずれの方法でも最後にファイルを作成しなければならない.私たちがデータを読み取る方法はファイルから読み取ることです.
データ生成後はそのまま使ってもHDFSにアップロードしても使えますが、2つの方法の違いは大きくありません.
ダイレクトコード
import org.apache.spark.{SparkConf, SparkContext}    
object SparkDemo1 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SparkDemo").setMaster("local")
    val sc = new SparkContext(conf)
    val rdd1 = sc.textFile("hdfs:///home/fake/*")
    //    ip         
    val rdd2 = rdd1
      .map(m=>{(m.split("\\\t")(2),m.split("\\\t")(6).toInt)})
      .reduceByKey(_+_)
      .sortBy(_._2,false)
      .repartition(1)
    rdd2.saveAsTextFile("hdfs:///home/results")
  }
}

テストなので、単機運転でもクラスタ運転に変更できます.
  • textFileはhdfsからデータを取得することを示し、ローカルファイルを読むにはhdfsをfileに置き換えるだけで
  • になる.
  • /*は、そのディレクトリの下にあるすべてのファイル
  • を読み取ることができることを示す.
  • netflowデータフィールド間はタブで分割する
  • である.
  • データ量が大きい場合はデフォルトパーティションに戻り、repartition(1)で結果ファイルを1つにまとめることができ、
  • を簡単に表示できます.
    Sparkシステムにコミットして実行
    まず、Mavenコマンドでコードをjarパッケージにし、IDEAはMavenをよくサポートし、右側のMaven Projectsバーでpackageコマンドをクリックしてパッケージ化します.[外部チェーン画像の転送に失敗しました.ソース局には盗難防止チェーンのメカニズムがある可能性があります.画像を保存して直接アップロードすることをお勧めします(img-u 8 eroIO 0-1592204881872)(https://i.imgur.com/btXt4jy.png)]
    Terminalウィンドウにコマンドmvn clean packageを入力してパッケージ化することもでき、作成したjarパッケージはtargetフォルダの下に保存されます.
    jarパケットをSparkが存在するシステムに転送するにはSparkのsubmit方式で実行する.
    spark-submit --queue flow \
    	--class fake.SparkDemo1 \
    /home/scm4-1.0-SNAPSHOT.jar
    

    2行目はmain関数が存在するパスを指定し、最後の行はjarパッケージが存在するディレクトリを指定します.
    最後に実行した結果ファイルは次のとおりです.
    [外部チェーン画像の転送に失敗しました.ソース局には盗難防止チェーンのメカニズムがある可能性があります.画像を保存して直接アップロードすることをお勧めします(img-zl 2 Gy 2 Fi-15922048881875).https://i.imgur.com/58fn8se.png)]
    SparkSQLによるマルチフィールドグループ集約クエリー
    古いルール、まずコードをつけます
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql._    
    object SparkDemo3 {
      def main(args: Array[String]): Unit = {
        val conf=new SparkConf().setAppName("SparkDemo").setMaster("local")
        val sc=new SparkContext(conf)
        val sqlc=new SQLContext(sc)
        import sqlc.implicits._
        //    , ip,  ip,  4   ,        
        sc.textFile("hdfs:///home/fake/*")
          .map(x=>{(x.split("\\\t")(8),x.split("\\\t")(0),x.split("\\\t")(1),x.split("\\\t")(7))})
          .repartition(1)
          .toDF("time","rip","sip","bytes").registerTempTable("temp")
        sqlc.sql("select max(time), rip, sip, sum(bytes) as bytes from temp group by rip, sip order by bytes desc limit 10")
          .toJavaRDD
          .saveAsTextFile("hdfs:///home/results")
    
      }
    
    }
    
  • ファイルから直接データを読み込むので、RDDとして読んでからDataFreamに変換し、最後にファイルを書く前にRDDに戻る必要があります.本稿ではテストのみを行い,実際の環境ではhiveからデータを読み出すのが一般的である.

  • 実行結果は次のとおりです.
    [外部チェーン画像の転送に失敗しました.ソース局には盗難防止チェーンのメカニズムがある可能性があります.画像を保存して直接アップロードすることをお勧めします(img-PXbXqEoF-1592204881877).https://i.imgur.com/gIaFemK.png)]
    まとめ
    本稿では、基本的なSparkの使い方を紹介します.入門にのみ適用されます.皆さん、一緒に勉強を検討してください.その他の関連資料はすでに私のブログのウェブサイトに同期して、私の個人のブログにアクセスしてもっと多くの内容を理解することを歓迎します.