TreasureDataをSparkのSourceにしたいんだが
3277 ワード
最初は一旦Listに展開してからsc.parallelize
しようとしてたけど、masterのメモリに乗りそうにないし
どうやらJdbcRDDとかいうのがあるらしいのでそれをつかう
早速実装
val props = new Properties() {
setProperty("apikey", conf.getString("apikey"))
setProperty("useSSL", conf.getString("useSSL"))
setProperty("type", conf.getString("jobType"))
}
val connection = DriverManager.getConnection(conf.getString("url"), props)
val query = """
select *
from ad_impressions ai
where TD_TIME_RANGE(ai.time, ?, ?, 'UTC')
order by time asc
"""
val treasuredataLogs: JdbcRDD[ArticleImpression] = new JdbcRDD(
sc, // SparkContext
() => TreasureData.getConnection(), // Jdbc Connection
query, // Query String
from.map(_.toDateTime(DateTimeZone.UTC).getMillis / 1000) | Long.MinValue, // the minimum value of the first placeholder
to.map(_.toDateTime(DateTimeZone.UTC).getMillis / 1000) | Long.MaxValue, // the maximum value of the second placeholder
10, // num partition 1~20
rs => ArticleImpressionDxo.fromEntity(rs) // data exchange function
)
JdbcRddのソースを見た感じ、勝手にcloseしてくれるっぽいネ(´ω`*)
override def compute(thePart: Partition, context: TaskContext): Iterator[T] = new NextIterator[T]
{
context.addTaskCompletionListener{ context => closeIfNeeded() }
で、動いたけどなんかおっそいな。。。
なんでやろ
どうやら、クエリが何回も実行されてるっぽい?
TreasureDataのジョブ履歴を見たら10回クエリが実行されている。
* @param numPartitions the number of partitions.
* Given a lowerBound of 1, an upperBound of 20, and a numPartitions of 2,
* the query would be executed twice, once with (1, 10) and once with (11, 20)
lower と upper に timeの範囲を入れて、クエリにプレースホルダを2つ設定する。
もしnum partitionの数で範囲を分割して実行してくれるって感じかな
今回は10で指定したので10クエリに分けてくれたっぽいネ
実際ジョブの履歴見たら、rangeを10個に分割してくれてた
select * from ad_impressions ai where TD_TIME_RANGE(ai.time, 1468998720, 1469016000) order by time asc
select * from ad_impressions ai where TD_TIME_RANGE(ai.time, 1468981440, 1468998719) order by time asc
select * from ad_impressions ai where TD_TIME_RANGE(ai.time, 1468964160, 1468981439) order by time asc
select * from ad_impressions ai where TD_TIME_RANGE(ai.time, 1468946880, 1468964159) order by time asc
select * from ad_impressions ai where TD_TIME_RANGE(ai.time, 1468929600, 1468946879) order by time asc
select * from ad_impressions ai where TD_TIME_RANGE(ai.time, 1468912320, 1468929599) order by time asc
select * from ad_impressions ai where TD_TIME_RANGE(ai.time, 1468895040, 1468912319) order by time asc
select * from ad_impressions ai where TD_TIME_RANGE(ai.time, 1468877760, 1468895039) order by time asc
select * from ad_impressions ai where TD_TIME_RANGE(ai.time, 1468860480, 1468877759) order by time asc
select * from ad_impressions ai where TD_TIME_RANGE(ai.time, 1468843200, 1468860479) order by time asc
ありがとうJdbcRDD
Author And Source
この問題について(TreasureDataをSparkのSourceにしたいんだが), 我々は、より多くの情報をここで見つけました https://qiita.com/katsut/items/8aa48c4cfed818f5425c著者帰属:元の著者の情報は、元のURLに含まれています。著作権は原作者に属する。
Content is automatically searched and collected through network algorithms . If there is a violation . Please contact us . We will adjust (correct author information ,or delete content ) as soon as possible .