Sparkは非常に実用的なウィンドウ関数です。


SParkのアキュムレータの歴史は主にウィンドウ関数を使っていますが、すべての統計を行うには、rollup関数が必要です。
1応用シーン:
1、私達はユーザーの総使用時間を統計する必要があります。(累積履歴)
2、フロントの展示ページは複数の次元を照会する必要があります。例えば、製品、地域など。
3、展示が必要なテーブルヘッドは、例えば、製品、2015-04、2015-05、2015-06
2元のデータ:
プロジェクトコード
イベントダテ
duration
1438
2016-05-13
165
1438
2016-05-14
595
1438
2016-05-15
105
1629
2016-05-13
12340
1629
2016-05-14
13850
1629
2016-05-15
227
3業務シーンの実現
3.1業務シーン1:積算履歴:
データソースが示すように、私達はすでに当日のユーザーの使用時間が長いです。統計をする時、14日は13日の積算ができます。15日は14、13日の積算ができます。これを類推します。
3.1.1 spark-sql実現
//spark sqlウィンドウ関数を使って履歴データを蓄積する
sqlContext.sql(
"""
  select pcode,event_date,sum(duration) over (partition by pcode order by event_date asc) as sum_duration
  from userlogs_date
""").show
+-----+----------+------------+                                                 
|pcode|event_date|sum_duration|
+-----+----------+------------+
| 1438|2016-05-13|         165|
| 1438|2016-05-14|         760|
| 1438|2016-05-15|         865|
| 1629|2016-05-13|       12340|
| 1629|2016-05-14|       26190|
| 1629|2016-05-15|       26417|
+-----+----------+------------+
3.1.2 dataframe実現
//  Column   over   ,      
import org.apache.spark.sql.expressions._
val first_2_now_window = Window.partitionBy("pcode").orderBy("event_date")
df_userlogs_date.select(
    $"pcode",
    $"event_date",
    sum($"duration").over(first_2_now_window).as("sum_duration")
).show

+-----+----------+------------+                                                 
|pcode|event_date|sum_duration|
+-----+----------+------------+
| 1438|2016-05-13|         165|
| 1438|2016-05-14|         760|
| 1438|2016-05-15|         865|
| 1629|2016-05-13|       12340|
| 1629|2016-05-14|       26190|
| 1629|2016-05-15|       26417|
+-----+----------+------------+
3.1.3拡張アキュムレータの時間範囲内
実際の業務の中のアキュムレータロジックは上よりずっと複雑です。たとえば、アキュムレータの前のN日間、アキュムレータの前のN日間から後のN日間などです。以下は私達が実現します。
3.1.3.1累積履歴の所有:
select pcode,event_date,sum(duration) over (partition by pcode order by event_date asc) as sum_duration from userlogs_date
select pcode,event_date,sum(duration) over (partition by pcode order by event_date asc rows between unbounded preceding and current row) as sum_duration from userlogs_date
Window.partitionBy("pcode").orderBy("event_date").rowsBetween(Long.MinValue,0)
Window.partitionBy("pcode").orderBy("event_date")
上の四つの書き方はまったく同じだ。
3.1.3.2 N日間を積算する前に、N=3と仮定する。
//  ,     ,                   partition
select pcode,event_date,sum(duration) over (partition by pcode order by 
event_date asc rows between 3 preceding and current row) as sum_duration
 from userlogs_date
Window.partitionBy("pcode").orderBy("event_date").rowsBetween(-3,0) 
3.1.3.3累積前N日間、後M日間:N=3 M=5と仮定する。
select pcode,event_date,sum(duration) over (partition by pcode order by
 event_date asc rows between 3 preceding and 5 following ) as sum_duration
 from userlogs_date
Window.partitionBy("pcode").orderBy("event_date").rowsBetween(-3,5)
3.1.3.4このパーティション内のすべての行を積算します。
select pcode,event_date,sum(duration) over (partition by pcode order by 
event_date asc rows between unbounded preceding and unbounded following ) 
as sum_duration from userlogs_date
Window.partitionBy("pcode").orderBy("event_date").rowsBetween
(Long.MinValue,Long.MaxValue)
要約は以下の通りです。preceding:前のN行を積算するために使用します。パーティションの最初の行から開始するとunboundedです。Nは:現在の行に対して前のオフセット量following:precedingとは反対に、累積後N行(パーティション内)である。このパーティションが終了するまで累積するとunboundedとなります。Nは:現在の行に対して後方のオフセット量current row:名前の通り、現在の行、オフセット量は0で説明されています。上の前N、後M、およびcurrent rowはこのオフセット量の行を積算します。
3.1.3.4測定結果
    :             
1:select pcode,event_date,sum(duration) over (partition by pcode order by 
event_date asc) as sum_duration from userlogs_date




+-----+----------+------------+                                                 
|pcode|event_date|sum_duration|
+-----+----------+------------+
| 1438|2016-05-13|         165|
| 1438|2016-05-14|         760|
| 1438|2016-05-15|         865|
| 1629|2016-05-13|       12340|
| 1629|2016-05-14|       26190|
| 1629|2016-05-15|       26417|
+-----+----------+------------+

    :             2:
select pcode,event_date,sum(duration) over (partition by pcode order by 
event_date asc rows between unbounded preceding and current row) as 
sum_duration from userlogs_date



+-----+----------+------------+                                                 
|pcode|event_date|sum_duration|
+-----+----------+------------+
| 1438|2016-05-13|         165|
| 1438|2016-05-14|         760|
| 1438|2016-05-15|         865|
| 1629|2016-05-13|       12340|
| 1629|2016-05-14|       26190|
| 1629|2016-05-15|       26417|
+-----+----------+------------+

       :
select pcode,event_date,sum(duration) over (partition by pcode order by 
event_date asc rows between 1 preceding and current row) as sum_duration
 from userlogs_date



+-----+----------+------------+                                                 
|pcode|event_date|sum_duration|
+-----+----------+------------+
| 1438|2016-05-13|         165|
| 1438|2016-05-14|         760|
| 1438|2016-05-15|         700|
| 1629|2016-05-13|       12340|
| 1629|2016-05-14|       26190|
| 1629|2016-05-15|       14077|
+-----+----------+------------+
    、  、  :
select pcode,event_date,sum(duration) over (partition by pcode order by 
event_date asc rows between 1 preceding and 1 following ) as sum_duration
 from userlogs_date



+-----+----------+------------+                                                 
|pcode|event_date|sum_duration|
+-----+----------+------------+
| 1438|2016-05-13|         760|
| 1438|2016-05-14|         865|
| 1438|2016-05-15|         700|
| 1629|2016-05-13|       26190|
| 1629|2016-05-14|       26417|
| 1629|2016-05-15|       14077|
+-----+----------+------------+

       :         :
select pcode,event_date,sum(duration) over (partition by pcode order by 
event_date asc rows between unbounded preceding and unbounded following )
 as sum_duration from userlogs_date



+-----+----------+------------+                                                 
|pcode|event_date|sum_duration|
+-----+----------+------------+
| 1438|2016-05-13|         865|
| 1438|2016-05-14|         865|
| 1438|2016-05-15|         865|
| 1629|2016-05-13|       26417|
| 1629|2016-05-14|       26417|
| 1629|2016-05-15|       26417|
+-----+----------+------------+

3.2     2:    

3.2.1 spark sql  



//spark sql   rollup  all  
sqlContext.sql(
"""
  select pcode,event_date,sum(duration) as sum_duration
  from userlogs_date_1
  group by pcode,event_date with rollup
  order by pcode,event_date
""").show()

+-----+----------+------------+                                                 
|pcode|event_date|sum_duration|
+-----+----------+------------+
| null|      null|       27282|
| 1438|      null|         865|
| 1438|2016-05-13|         165|
| 1438|2016-05-14|         595|
| 1438|2016-05-15|         105|
| 1629|      null|       26417|
| 1629|2016-05-13|       12340|
| 1629|2016-05-14|       13850|
| 1629|2016-05-15|         227|
+-----+----------+------------