PrestoとHiveでのORDER BYでの評価の仕方を見てみる(on TreasureData)


はじめに

Hive 0.13/Presto0.144 on TreasureDataでのメモ。
Order byで下記のケースでHiveではエラーになり、Prestoでは成功したので、実行計画を眺めてみた。
あくまで眺めただけなので、なぜ実行計画がそのようになるかについては触れないです。
(TD_TIME_FORMATはunixtime(bigint/long/int)を指定のフォーマットの文字列に変換するUDF)

原因としては、HiveのOrder byでのTD_TIME_FORMATの評価がSELECT内でtimeを変換した後に行われるために、文字列に対してTD_TIME_FORMATをかけるためにエラーになってしまう。
ORDER BYの評価順としてはこれが正しそうで、Prestoはなぜか成功する。

SELECT 
  TD_TIME_FORMAT(time,
    'yyyy‐MM‐dd',
    'JST') AS time,
  COUNT(*) AS cnt
FROM
  adjust
WHERE
  network = 'Organic'
  AND TD_TIME_RANGE(time,
    TD_TIME_ADD(TD_SCHEDULED_TIME(),
      '-1d',
      'JST'))
GROUP BY
  TD_TIME_FORMAT(time,
    'yyyy‐MM‐dd',
    'JST')
ORDER BY TD_TIME_FORMAT(time,
    'yyyy‐MM‐dd',
    'JST') DESC LIMIT 100

結論としては、Prestoはクエリの実行計画のオプティマイザが賢いので、うまくやってくれてるっぽい。
が、SELECTの前にEXPLAINをしてそれぞれの実行計画をみてみた。

Hiveの実行計画

上記のままだとエラーになるので、order byのtd_time_formatはtimeに直している。
実行計画をみると、Stage-1のsort orderの際には既に_col0がstringになっている。
stringになるのは、その前段階で実行されるSelect Operatorの中でTD_TIME_FORMATが適用されているからで、単純な処理の流れで考えるとそうなりそうです。

                  Reduce Output Operator
                    key expressions: _col0 (type: string)
                    sort order: +
                    Map-reduce partition columns: _col0 (type: string)

STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-2 depends on stages: Stage-1
  Stage-0 is a root stage

STAGE PLANS:
  Stage: Stage-1
    Map Reduce
      Map Operator Tree:
          TableScan
            alias: adjust
            Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
            Filter Operator
              predicate: ((network = 'Organic') and TD_TIME_RANGE(time, td_time_add(TD_SCHEDULED_TIME(), '-1d', 'JST'))) (type: boolean)
              Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
              Select Operator
                expressions: time (type: int)
                outputColumnNames: time
                Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
                Group By Operator
                  aggregations: count()
                  keys: td_time_format(time, 'yyyy‐MM‐dd', 'JST') (type: string)
                  mode: hash
                  outputColumnNames: _col0, _col1
                  Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
                  Reduce Output Operator
                    key expressions: _col0 (type: string)
                    sort order: +
                    Map-reduce partition columns: _col0 (type: string)
                    Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
                    value expressions: _col1 (type: bigint)
      Reduce Operator Tree:
        Group By Operator
          aggregations: count(VALUE._col0)
          keys: KEY._col0 (type: string)
          mode: mergepartial
          outputColumnNames: _col0, _col1
          Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
          Select Operator
            expressions: _col0 (type: string), _col1 (type: bigint)
            outputColumnNames: _col0, _col1
            Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
            File Output Operator
              compressed: true
              table:
                  input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                  output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                  serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe

  Stage: Stage-2
    Map Reduce
      Map Operator Tree:
          TableScan
            Reduce Output Operator
              key expressions: _col0 (type: string)
              sort order: -
              Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
              value expressions: _col0 (type: string), _col1 (type: bigint)
      Reduce Operator Tree:
        Extract
          Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
          Limit
            Number of rows: 100
            Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
            File Output Operator
              compressed: false
              Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
              table:
                  input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                  output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                  serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

  Stage: Stage-0
    Fetch Operator
      limit: 100

Prestoの実行計画

ORDER BY TD_~でもtimeでも1にしても全部下の実行計画になる。
下記の段階でlimitも含めて良きように計らってくれているみたい。
- TopN[100 by (td_time_format DESC_NULLS_LAST)] => [td_time_format:varchar, count:bigint]

- Output[time, cnt] => [td_time_format:varchar, count:bigint]
        time := td_time_format
        cnt := count
    - TopN[100 by (td_time_format DESC_NULLS_LAST)] => [td_time_format:varchar, count:bigint]
        - Exchange[GATHER] => td_time_format:varchar, count:bigint
            - TopN[100 by (td_time_format DESC_NULLS_LAST)] => [td_time_format:varchar, count:bigint]
                - Project => [td_time_format:varchar, count:bigint]
                    - Aggregate(FINAL)[td_time_format] => [td_time_format:varchar, $hashvalue:bigint, count:bigint]
                            count := ""count""(""count_8"")
                        - Exchange[REPARTITION] => td_time_format:varchar, count_8:bigint, $hashvalue:bigint
                            - Aggregate(PARTIAL)[td_time_format] => [td_time_format:varchar, $hashvalue_9:bigint, count_8:bigint]
                                    count_8 := ""count""(*)
                                - Project => [td_time_format:varchar, $hashvalue_9:bigint]
                                        $hashvalue_9 := ""combine_hash""(0, COALESCE(""$operator$hash_code""(""td_time_format""), 0))
                                    - Project => [td_time_format:varchar]
                                            td_time_format := ""td_time_format""(""time"", CAST('yyyy‐MM‐dd' AS VARCHAR), CAST('JST' AS VARCHAR))
                                        - Filter[((""time"" >= 1460446740) AND (""network"" = CAST('Organic' AS VARCHAR)))] => [network:varchar, time:bigint]
                                            - TableScan[td-presto:td:support.adjust, originalConstraint = ((""network"" = 'Organic') AND (""time"" >= 1460446740))] => [network:varchar, time:bigint]
                                                    LAYOUT: com.treasure_data.presto.connector.TDTableLayoutHandle@57531b8
                                                    network := td:network
                                                    time := td:time

そういえば、Prestoの- TableScan[td-presto:td:support.adjust, originalConstraint = ((""network"" = 'Organic') AND (""time"" >= 1460446740))] => [network:varchar, time:bigint]
を見ると、TableScan時にtimeカラムのパーティショニングとカラムナデータベースによるnetworkのカラムだけスキャン対象になっていることがわかったりする。

結論

ただの確認。
ORDER BY 1って使えば間違いが少ないかも。っておもったけど、hiveだとorder by 1ってやると1番目のカラムをそのまま利用するから同じくエラーになりますね。カラム名は変えていくのが良さそうです。