Spark ListenerBusとMetricsSystemアーキテクチャ分析


Sparkイベント体系の中枢はListener Busであり,このクラスによってEventが受け入れられ,各Listenerに配布される.MetricsSystemは、システムの様々な指標を測定するための である.Listenerは、MetricsSystemの情報源の1つであってもよい.彼らの間には全体的に補完的な関係がある.
前言
モニタリングは大きなシステムが完了した後で最も重要な部分です.Sparkのシステム全体の動作はListenerBusおよびMetricsSystemによって行われる.この文章は彼らの間の仕事のメカニズムとどのようにこの2つのシステムを通じてより多くの指標の収集を完成するかを重点的に分析している.
ListenerBusがどのように機能しているか
Sparkのイベントシステムはどのように機能していますか?まず簡単に説明して、皆さんに大まかな理解をさせます.
まず、大部分の分類にはlistenerBusというオブジェクトが導入され、このクラスは具体的に何が実現するかを見なければならないが、org.apache.spark.util.ListenerBusから継承されているに違いない.
タスクセットをコミットするとします.この動作は多くの人が関心を持っているかもしれませんが、私はlistenerBusを使ってEventを出して、次の2行目のコードに似ています.
  def submitJobSet(jobSet: JobSet) {
    listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))    
    jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
    logInfo("Added jobs for time " + jobSet.time)
  }

ListenerBusには多くのリスナーが登録されています.listenerと呼ばれています.通常、listenerBusはスレッドを非同期に呼び出してこのEventを消費します. とは、情報格納などの動作を実行するために事前に設計されたコールバック関数をトリガすることである.
これがlistenerBus全体の働き方です.ここでは、実際には に似ています.これは侵入性があり、注目しなければならない場所ごとに、人に知られたいなら、特定のEventを出す必要があります.
ListenerBus分析
     <   AsynchronousListenerBus  < ListenerBus
     <   SparkListenerBus  < ListenerBus

ここの は、
   *  StreamingListenerBus extends  AsynchronousListenerBus 
   *  LiveListenerBus extends  AsynchronousListenerBus  with SparkListenerBus
   *  ReplayListenerBus extends SparkListenerBus 

AsynchronousListener Busは内部でqueueを維持し、イベントはこのqueueに先に配置され、スレッドを介してListenerにEventを処理させます.
SparkListenerBusもtraitですが、具体的な実装があり、onPostEventメソッドを定義して特定のイベントを処理しています.
他のより下のクラスは、必要に応じてSparkListener Bus、AsynchronousListener Busを混入または継承して、必要な機能を完了します.
異なるListener Busには異なるEventセットとListenerが必要です.たとえば、StreamingListener Busの署名を見ると、すべてのEventがStreamingListener Eventでなければならないことがわかります.すべてのListenerはStreamingListenerでなければなりません.
  StreamingListenerBus  
  extends AsynchronousListenerBus[StreamingListener, StreamingListenerEvent]

Listener(Listener)
通常、Listenerは状態があり、一般的に1つのEventを受け取ると、内部のデータ構造が更新される可能性があります.org.apache.spark.streaming.ui.StreamingJobProgressListenerを例にとると、彼はStreamingListenerで、内部にはいくつかのストレージ構造が含まれています.例えば、
  private val waitingBatchUIData = new HashMap[Time, BatchUIData]
  private val runningBatchUIData = new HashMap[Time, BatchUIData]

申明を見ると普通のHashMapなので、操作はsynchronized操作が必要です.次のようになります.
override def onReceiverError(receiverError: StreamingListenerReceiverError) {
    synchronized {
      receiverInfos(receiverError.receiverInfo.streamId) = receiverError.receiverInfo
    }
  }

MetricsSystemの紹介
MetricsSystemは、システムの様々な指標の を測定するために一般的によく理解されている.key-value形態のものです.簡単な例を挙げて、私はどのように現在のJVM関連情報を展示しますか?作り方はもちろん多く、MetricsSystemを通じてより標準化することができます.具体的な方法は以下の通りです.
  • Source .データソース.例えばorg.apache.spark.metrics.source.JvmSource
  • が対応する.
  • Sink. データはどこへ送信されますか.受動的で積極的である.一般にアクティブなのはタイマによる出力であり,例えばCSVsink,MetricsServeretなどの受動的なユーザによるアクティブな呼び出しが必要である.
  • はSourceとSinkをブリッジしているのはMetricRegistryです.

  • Sparkは、最下位のMetricsの機能を実装するのではなく、サードパーティ製ライブラリを使用しています.http://metrics.codahale.com .興味のある人は見てもいいです.もっと完全な認識があります.
    MetricsSystemの構成方法
    MetricsSystemの構成は2種類あり,1つ目はmetrics.propertiesプロファイルの形態である.2つ目はspark confで完了し、パラメータはspark.metrics.conf.で始まる.
    ここでは2つ目の方法を簡単に紹介します.
    例えば私はJVMの情報を見たいです.GCとMemoryの使用状況を含めて、私は類似の
     conf.set("spark.metrics.conf.driver.source.jvm.class","org.apache.spark.metrics.source.JvmSource")
    

    デフォルトでは、MetricsSystemはグローバルなSink,MetricsServiceを構成します.追加したソースはpath /metrics/jsonで入手できますプログラム設定が上記の設定をしてspark-uiのパスを/metrics/jsonに変更すると、jvmソースの情報が表示されます.
    通常、カスタムソースを実装する場合は、次の手順に従います(ここではJvmSourceを例に挙げます).
    -Sourceを作成
    private[spark] class JvmSource extends Source {
      override val sourceName = "jvm"
      override val metricRegistry = new MetricRegistry()
    
      metricRegistry.registerAll(new GarbageCollectorMetricSet)
      metricRegistry.registerAll(new MemoryUsageGaugeSet)
    }

    ここでsourceNameは、上記のように構成用に設定されています.
    spark.metrics.conf.driver.source.jvm.class
    

    中のjvmがJvmSourceに設置されているsourceNameです
    各Sourceは通常、MetricRegistryを独自に構築します.上記の例では、具体的なデータ収集作業は、GarbageCollectorMetricSetMemoryUsageGaugeSetによって行われる.
    具体的には、クラス継承com.codahale.metrics.MetricSetを書き、Map<String, Metric> getMetrics()メソッドを実装すればよい.
    そしてmetricRegistry.registerAllで書いたMetricSetを登録すればいいです.
    –構成の追加
    conf.set("spark.metrics.conf.driver.source.jvm.class","org.apache.spark.metrics.source.JvmSource")

    -呼び出し結果
    Spark UIのアドレスを/metrics/jsonに変更すると、出力結果が表示されます.もちろん、ここではデフォルトのシステムがSink実装を提供しているためです:org.apache.spark.metrics.sink.MetricsServlet、あなたは自分で実装することができます.
    より多くのモニタリング指標をカスタマイズする方法
    以前私が書いたSpark UI(Yarnベース)の分析とカスタマイズを通じて、新しいページをSpark UIに追加する方法を学ぶべきです.
    この記事では、データ・ソースには2つあることを理解する必要があります.
  • 各Listener
  • MetricsSystem

  • 既存のListenerとMetrics Sourceを組み合わせて、任意のコンテンツを表示できます.
    既存のニーズが満たされていない場合は、通常、新しいニーズは次の2つの方法で満たすことができます.
  • 新しいイベントを監視する必要があります.新しいListener Bus、Listener、Eventを追加し、必要な場所に (postイベント)に行く必要があります.これはspark-coreのコードを修正する必要があるに違いない.
  • 既存のlistenerまたは既知のオブジェクトの変数を表示する必要がある場合は、MetricsSystemを使用して新しいSourceを定義します.

  • これにより、これらのオブジェクトをあなたのPageに渡すことで、展示することができます.