I want to use TreasureData as a Spark source


At first I was going to expand everything into a List and then `sc.parallelize` it, but there's no way that fits in the master's memory.
Apparently there's something called JdbcRDD, so let's use that.

Straight to the implementation:

  val props = new Properties() {
    setProperty("apikey", conf.getString("apikey"))
    setProperty("useSSL", conf.getString("useSSL"))
    setProperty("type", conf.getString("jobType"))
  }

  // JdbcRDD needs a connection *factory*, not a single shared connection:
  // each partition opens its own connection on the executor.
  def getConnection() = DriverManager.getConnection(conf.getString("url"), props)

  val query = """
      select *
      from ad_impressions ai
      where TD_TIME_RANGE(ai.time, ?, ?, 'UTC')
      order by time asc
  """

  val treasuredataLogs: JdbcRDD[ArticleImpression] = new JdbcRDD(
    sc, // SparkContext
    () => getConnection(), // JDBC connection factory
    query, // query string with two placeholders
    from.map(_.toDateTime(DateTimeZone.UTC).getMillis / 1000) | Long.MinValue, // lower bound of the first placeholder (epoch seconds)
    to.map(_.toDateTime(DateTimeZone.UTC).getMillis / 1000) | Long.MaxValue, // upper bound of the second placeholder (epoch seconds)
    10, // number of partitions
    rs => ArticleImpressionDxo.fromEntity(rs) // maps each ResultSet row to a domain object
  )
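A side note on the `|` in the bound arguments: that isn't plain Scala — presumably it's Scalaz's alias for `getOrElse` on `Option`. A minimal sketch of the equivalent without Scalaz (the sample values here are made up for illustration):

```scala
// Without Scalaz, the lower/upper bounds above would read like this.
// `from`/`to` hold already-converted epoch seconds here, purely for illustration.
val from: Option[Long] = Some(1468843200L) // hypothetical start time (epoch seconds)
val to: Option[Long]   = None              // no end time given

val lowerBound: Long = from.getOrElse(Long.MinValue) // same as `from ... | Long.MinValue`
val upperBound: Long = to.getOrElse(Long.MaxValue)   // same as `to ... | Long.MaxValue`
```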

From reading the JdbcRDD source, it looks like it closes the connection for you (´ω`*)

  override def compute(thePart: Partition, context: TaskContext): Iterator[T] = new NextIterator[T] {
    context.addTaskCompletionListener{ context => closeIfNeeded() }

So it ran, but... it's pretty slow.
I wonder why.

Apparently the query is being executed multiple times?
Looking at TreasureData's job history, the query ran 10 times.

 * @param numPartitions the number of partitions.
 *   Given a lowerBound of 1, an upperBound of 20, and a numPartitions of 2,
 *   the query would be executed twice, once with (1, 10) and once with (11, 20)

You put the time range into lowerBound and upperBound, and set two placeholders in the query.
It then splits that range into numPartitions sub-ranges and runs one query per sub-range.

Since I specified 10 this time, it split the work into 10 queries.
And indeed, the job history shows the range divided into 10 chunks:

select * from ad_impressions ai where TD_TIME_RANGE(ai.time, 1468998720, 1469016000) order by time asc
select * from ad_impressions ai where TD_TIME_RANGE(ai.time, 1468981440, 1468998719) order by time asc
select * from ad_impressions ai where TD_TIME_RANGE(ai.time, 1468964160, 1468981439) order by time asc
select * from ad_impressions ai where TD_TIME_RANGE(ai.time, 1468946880, 1468964159) order by time asc
select * from ad_impressions ai where TD_TIME_RANGE(ai.time, 1468929600, 1468946879) order by time asc
select * from ad_impressions ai where TD_TIME_RANGE(ai.time, 1468912320, 1468929599) order by time asc
select * from ad_impressions ai where TD_TIME_RANGE(ai.time, 1468895040, 1468912319) order by time asc
select * from ad_impressions ai where TD_TIME_RANGE(ai.time, 1468877760, 1468895039) order by time asc
select * from ad_impressions ai where TD_TIME_RANGE(ai.time, 1468860480, 1468877759) order by time asc
select * from ad_impressions ai where TD_TIME_RANGE(ai.time, 1468843200, 1468860479) order by time asc
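Those ranges line up with how JdbcRDD's `getPartitions` carves up `[lowerBound, upperBound]`: the closed interval is divided into `numPartitions` near-equal closed sub-ranges using integer arithmetic. A standalone sketch of that computation (the function name is mine; the logic mirrors the Spark source):

```scala
// Splits the closed range [lower, upper] into n sub-ranges,
// the same way JdbcRDD.getPartitions computes per-partition bounds.
def splitRange(lower: Long, upper: Long, n: Int): Seq[(Long, Long)] = {
  val length = BigInt(upper) - BigInt(lower) + 1
  (0 until n).map { i =>
    val start = BigInt(lower) + (BigInt(i) * length) / n
    val end   = BigInt(lower) + (BigInt(i + 1) * length) / n - 1
    (start.toLong, end.toLong)
  }
}

// The scaladoc example: lowerBound = 1, upperBound = 20, numPartitions = 2
val doc = splitRange(1, 20, 2) // (1,10) and (11,20)

// The 10-partition run from the job history above
val parts = splitRange(1468843200L, 1469016000L, 10)
```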

Thank you, JdbcRDD.