Spark Streaming foreachRDDの正しい使い方
エラー1:driver上に接続オブジェクト(ネットワーク接続やデータベース接続など)を作成する
driverで接続オブジェクトを作成し、RDDの演算子関数で接続オブジェクトを使用する場合は、接続オブジェクトをシーケンス化してdriverからworkerに渡す必要があります.接続オブジェクト(Connectionオブジェクトなど)は通常、シーケンス化がサポートされていません.この場合、シーケンス化の例外(serialization errors)が報告されます.したがって、接続オブジェクトはdriver上で作成しないでworker上に作成する必要があります.
エラー2:各レコードに接続オブジェクトを作成する
通常、接続オブジェクトの作成と破棄には時間がかかります.したがって、接続オブジェクトを頻繁に作成および破棄すると、sparkジョブ全体のパフォーマンスとスループットが低下する可能性があります.
正しい方法1:RDDパーティションごとに接続オブジェクトを作成する
正しい方法は、DStreamのRDDに対してforeachPartitionを呼び出し、RDDの各パーティションに対して接続オブジェクトを作成し、1つの接続オブジェクトを使用して1つのパーティション内のデータを最下位MySQLに書き込むことです.これにより、作成した接続オブジェクトの数を大幅に削減できます.
正しい方法2:RDDパーティションごとに接続プール内の接続オブジェクトを使用する
driverで接続オブジェクトを作成し、RDDの演算子関数で接続オブジェクトを使用する場合は、接続オブジェクトをシーケンス化してdriverからworkerに渡す必要があります.接続オブジェクト(Connectionオブジェクトなど)は通常、シーケンス化がサポートされていません.この場合、シーケンス化の例外(serialization errors)が報告されます.したがって、接続オブジェクトはdriver上で作成しないでworker上に作成する必要があります.
dstream.foreachRDD { rdd =>
val connection = createNewConnection() // driver
rdd.foreach { record =>
connection.send(record) // worker
}
}
エラー2:各レコードに接続オブジェクトを作成する
dstream.foreachRDD { rdd =>
rdd.foreach { record =>
val connection = createNewConnection()
connection.send(record)
connection.close()
}
}
通常、接続オブジェクトの作成と破棄には時間がかかります.したがって、接続オブジェクトを頻繁に作成および破棄すると、sparkジョブ全体のパフォーマンスとスループットが低下する可能性があります.
正しい方法1:RDDパーティションごとに接続オブジェクトを作成する
dstream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
val connection = createNewConnection()
partitionOfRecords.foreach(record => connection.send(record))
connection.close()
}
}
正しい方法は、DStreamのRDDに対してforeachPartitionを呼び出し、RDDの各パーティションに対して接続オブジェクトを作成し、1つの接続オブジェクトを使用して1つのパーティション内のデータを最下位MySQLに書き込むことです.これにより、作成した接続オブジェクトの数を大幅に削減できます.
正しい方法2:RDDパーティションごとに接続プール内の接続オブジェクトを使用する
dstream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
// ,
val connection = ConnectionPool.getConnection()
partitionOfRecords.foreach(record => connection.send(record))
ConnectionPool.returnConnection(connection) // ,
}
}