Spark Streaming foreachRDDの正しい使い方


エラー1:driver上に接続オブジェクト(ネットワーク接続やデータベース接続など)を作成する
driverで接続オブジェクトを作成し、RDDの演算子関数で接続オブジェクトを使用する場合は、接続オブジェクトをシーケンス化してdriverからworkerに渡す必要があります.接続オブジェクト(Connectionオブジェクトなど)は通常、シーケンス化がサポートされていません.この場合、シーケンス化の例外(serialization errors)が報告されます.したがって、接続オブジェクトはdriver上で作成しないでworker上に作成する必要があります.
dstream.foreachRDD { rdd =>
  val connection = createNewConnection()  //  driver   
  rdd.foreach { record =>
    connection.send(record) //  worker   
  }
}

エラー2:各レコードに接続オブジェクトを作成する
dstream.foreachRDD { rdd =>
  rdd.foreach { record =>
    val connection = createNewConnection()
    connection.send(record)
    connection.close()
  }
}

通常、接続オブジェクトの作成と破棄には時間がかかります.したがって、接続オブジェクトを頻繁に作成および破棄すると、sparkジョブ全体のパフォーマンスとスループットが低下する可能性があります.
正しい方法1:RDDパーティションごとに接続オブジェクトを作成する

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val connection = createNewConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    connection.close()
  }
}

正しい方法は、DStreamのRDDに対してforeachPartitionを呼び出し、RDDの各パーティションに対して接続オブジェクトを作成し、1つの接続オブジェクトを使用して1つのパーティション内のデータを最下位MySQLに書き込むことです.これにより、作成した接続オブジェクトの数を大幅に削減できます.
正しい方法2:RDDパーティションごとに接続プール内の接続オブジェクトを使用する
dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    //      ,         
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection)  //              ,    
  }
}