DolphinDB分散テーブルをOrcaで操作する方法
17021 ワード
DolphinDBは分散シーケンスデータベースであり、豊富な計算と分析機能を内蔵しています.TB級の大量データを複数の物理機器に格納し、CPUを十分に利用し、大量データに対して高性能分析計算を行うことができる.Orcaでは、python環境でpandas構文と同じスクリプトを使用してDolphinDB分散データベースのデータを複雑かつ効率的に計算できます.このチュートリアルでは、DolphinDB分散テーブルに対するOrcaの操作について説明します.
この例ではDolphinDBスタンドアロンモードを使用します.まず、このチュートリアルのサンプル・データベースを作成します.dfs://orca_stock .データベースを作成するDolphinDBスクリプトは次のとおりです.
注意:DolphinDBクライアントまたはDolphinDB Python APIで分散テーブルを作成する必要があります.Orcaで直接分散テーブルを作成することはできません.
Orcaで
ユーザーは、実際の状況に応じてIPアドレスとポート番号を変更する必要があります.
1分散テーブルの読み込み
Orcaは
tradesのカラム名を表示するには、次の手順に従います.
trades列のデータ型を表示するには、次の手順に従います.
tradesの行数を表示するには、次の手順に従います.
DolphinDB分散テーブルに対応するOrca Data Frameには、テーブル名、データのカラム名などの情報を含むメタデータのみが格納されます.分散テーブルは連続記憶ではなく,各パーティション間に厳密な順序関係がないため,分散テーブルに対応するDataFrameにはRangeIndexの概念がない.indexを設定する必要がある場合は、
index列をデータ列に変換する場合は、
2クエリーと計算
Orcaは不活性評価を採用しており、一部の計算はすぐにサービス側で計算されるのではなく、本当に必要なときに計算されるまで中間式に変換されます.ユーザーが計算を即時にトリガーする必要がある場合は、
サンプル・データベースdfs://orca_stockのデータはランダムに生成されるため,ユーザの実行結果は本章の結果と異なる.
2.1前のn条の記録を取る
2.2ソート
複数の列でソート:
2.3条件による照会
Orcaは、単一または複数の条件に従ってクエリーを考慮することをサポートします.たとえば、
tradesの2019年1月2日のデータを検索します.
照会tradesで2019年1月30日、株式コードはA 2のデータです.
2.4 groupbyグループクエリー tradesの毎日のレコード数を計算します.
tradesの1日あたりのレコード数を計算します.
Orcaはaggを介して複数の集約関数を一度に適用することをサポートする.pandasとは異なり、Orcaはaggで呼び出す集約関数を表す文字列を使用します.たとえば、tradesの1日あたりの価格の最大値、最小値、平均値を計算します.
Orca groupbyはフィルタ機能をサポートしています.pandasとは異なり,Orcaにおけるフィルタ条件はlambda関数ではなく文字列形式の式で表される.
たとえば、tradesの1日あたりの平均株価が200を超え、記録数が11000を超える記録を返します.
2.5 resample再サンプリング
Orcaはrule:DateOffset、文字列またはdateoffsetオブジェクト on:時間列、この列を用いて再サンプリング level:文字列または整数、MultiIndexの場合、levelで指定された列を再サンプリング OrcaがサポートするDateOffsetは次のとおりです.
たとえば、tradesのデータを再サンプリングし、3分ごとに計算します.
tradesがtrade_を設定した場合timeはindexであり、次の方法で再サンプリングすることもできます.
dateoffset関数で生成したオブジェクトでdateoffsetを表す場合は、pandasのdateoffsetをインポートする必要があります.3分ごとに再サンプリングする場合は、次のように書くこともできます.
2.6 rolling移動ウィンドウ
Orcaはwindow::整数、ウィンドウの長さを表す on:ウィンドウ を計算する文字列
次の関数はorca.DataFrame.rollingオブジェクトで使用できます. 分散テーブルに対応するDataFrameでは、スライドウィンドウで計算する場合、パーティション単位で個別に計算されるため、各パーティションの計算結果の前のwindow-1個の値が空になります.例えば、tradesの2019.01.01と2019.01.02のデータは、長さ3のスライドウィンドウでpriceの和を求める.
2.7データ接続
Orcaは、DataFrameを接続する機能を提供しています.分散テーブル対応のDataFrameは、通常のメモリテーブル対応のDataFrameに接続してもよいし、分散テーブル対応のDataFrameに接続してもよい.2つの分散テーブルに対応するData Frame接続は、次の条件を同時に満たす必要があります.2つの分散テーブルが同じデータベースにある 接続列はすべてのパーティション列 を含む必要があります.
Orcaは、right:Orca Data FrameまたはSeries how:接続のタイプを表す文字列.left、right、outer、innerであってもよく、デフォルト値はinner on:接続列 を示す文字列 left_on:左テーブルの接続列 を示す文字列 right_on:文字列、右表の接続列 を表す left_index:左テーブルのインデックス right_index:右テーブルのインデックス suffixes:文字列、重複列を表す接尾辞
たとえば、tradesとquotesを内部接続します.
3 dataframeをdfsテーブルに追加
Orcaはother:追加するDataFrame ignore_index:ブール値、インデックスを無視するかどうか.デフォルトはFalse verify_integrity:ブール値.デフォルトはFalse sort:ブール値.ソートするかどうかを示します.デフォルトはNone inplace:ブール値.dfsテーブルに挿入するかどうかを示します.デフォルトはFalse たとえば、dataframeにtrades対応の分散テーブルを追加します.
Orcaはappend関数を拡張し、inplaceパラメータをサポートします.つまり、その場でデータを追加できます.inplaceがFalseの場合、pandasと同じように表現されます.分散テーブルの内容はメモリにコピーされ、tradesはメモリテーブルに対応し、odfの内容はメモリテーブルに追加され、dfsテーブルに追加されません.
4まとめ
分散テーブルの場合、Orcaには、現在、パーティションテーブルに対応するDataFrameにRangeIndexの概念がない、分散テーブルでの使用がサポートされていない関数、テーブル内のデータの変更の制限など、いくつかの機能上の制限があります.詳細については、Orcaクイックエントリーガイドを参照してください.
この例ではDolphinDBスタンドアロンモードを使用します.まず、このチュートリアルのサンプル・データベースを作成します.dfs://orca_stock .データベースを作成するDolphinDBスクリプトは次のとおりです.
login("admin","123456")
if(existsDatabase("dfs://orca_stock")){
dropDatabase("dfs://orca_stock")
}
dates=2019.01.01..2019.01.31
syms="A"+string(1..30)
sym_range=cutPoints(syms,3)
db1=database("",VALUE,dates)
db2=database("",RANGE,sym_range)
db=database("dfs://orca_stock",COMPO,[db1,db2])
n=10000000
datetimes=2019.01.01T00:00:00..2019.01.31T23:59:59
t=table(rand(datetimes,n) as trade_time,rand(syms,n) as sym,rand(1000,n) as qty,rand(500.0,n) as price)
trades=db.createPartitionedTable(t,`trades,`trade_time`sym).append!(t)
n=200000
datetimes=2019.01.01T00:00:00..2019.01.02T23:59:59
syms="A"+string(1..30)
t2=table(rand(datetimes,n) as trade_time,rand(syms,n) as sym,rand(500.0,n) as bid,rand(500.0,n) as offer)
quotes=db.createPartitionedTable(t2,`quotes,`trade_time`sym).append!(t2)
syms="A"+string(1..30)
t3=table(syms as sym,rand(0 1,30) as type)
infos=db.createTable(t3,`infos).append!(t3)
注意:DolphinDBクライアントまたはDolphinDB Python APIで分散テーブルを作成する必要があります.Orcaで直接分散テーブルを作成することはできません.
Orcaで
connect
関数を使用してDolphinDBサーバに接続します.>>> import dolphindb.orca as orca
>>> orca.connect("localhost",8848,"admin","123456")
ユーザーは、実際の状況に応じてIPアドレスとポート番号を変更する必要があります.
1分散テーブルの読み込み
Orcaは
read_table
関数で分散テーブルを読み込み、結果としてOrca Data Frameを返します.例:サンプル・データベースの読み込みdfs://orca_stockのテーブルtrades:>>> trades = orca.read_table('dfs://orca_stock','trades')
>>> type(trades)
orca.core.frame.DataFrame
tradesのカラム名を表示するには、次の手順に従います.
>>> trades.columns
Index(['trade_time', 'sym', 'qty', 'price'], dtype='object')
trades列のデータ型を表示するには、次の手順に従います.
>>> trades.dtypes
trade_time datetime64[s]
sym object
qty int32
price float64
dtype: object
tradesの行数を表示するには、次の手順に従います.
>>> len(trades)
10000000
DolphinDB分散テーブルに対応するOrca Data Frameには、テーブル名、データのカラム名などの情報を含むメタデータのみが格納されます.分散テーブルは連続記憶ではなく,各パーティション間に厳密な順序関係がないため,分散テーブルに対応するDataFrameにはRangeIndexの概念がない.indexを設定する必要がある場合は、
set_index
関数を使用します.たとえば、tradesのtrade_timeがindexに設定されています.>>> trades.set_index('trade_time')
index列をデータ列に変換する場合は、
reset_index
関数を使用します.>>> trades.reset_index()
2クエリーと計算
Orcaは不活性評価を採用しており、一部の計算はすぐにサービス側で計算されるのではなく、本当に必要なときに計算されるまで中間式に変換されます.ユーザーが計算を即時にトリガーする必要がある場合は、
compute
関数を呼び出すことができます.サンプル・データベースdfs://orca_stockのデータはランダムに生成されるため,ユーザの実行結果は本章の結果と異なる.
2.1前のn条の記録を取る
head
関数は、前のnレコードをクエリーし、デフォルトでは前の5レコードを取得します.たとえば、tradesの最初の5つのレコードを取得します.>>> trades.head()
trade_time sym qty price
0 2019-01-01 18:04:33 A16 855 482.526769
1 2019-01-01 13:57:38 A12 244 61.675293
2 2019-01-01 23:58:15 A10 36 297.623295
3 2019-01-01 23:02:43 A16 426 109.041012
4 2019-01-01 04:33:53 A1 472 75.778951
2.2ソート
sort_values
メソッドは、カラムに基づいてソートできます.たとえば、tradesはprice降順にソートし、上位5つのレコードを取得します.>>> trades.sort_values(by='price', ascending=False).head()
trade_time sym qty price
0 2019-01-03 12:56:09 A22 861 499.999998
1 2019-01-18 17:25:21 A19 95 499.999963
2 2019-01-30 02:18:48 A30 114 499.999949
3 2019-01-23 08:31:56 A3 926 499.999926
4 2019-01-20 03:36:53 A3 719 499.999892
複数の列でソート:
>>> trades.sort_values(by=['qty','trade_time'], ascending=False).head()
trade_time sym qty price
0 2019-01-31 23:58:50 A24 999 359.887697
1 2019-01-31 23:57:26 A3 999 420.156175
2 2019-01-31 23:56:34 A2 999 455.228435
3 2019-01-31 23:52:58 A6 999 210.819227
4 2019-01-31 23:45:17 A14 999 310.813216
2.3条件による照会
Orcaは、単一または複数の条件に従ってクエリーを考慮することをサポートします.たとえば、
tradesの2019年1月2日のデータを検索します.
>>> tmp = trades[trades.trade_time.dt.date == "2019.01.01"]
>>> tmp.head()
trade_time sym qty price
0 2019-01-01 00:32:21 A2 139 383.971293
1 2019-01-01 21:19:09 A2 263 100.932553
2 2019-01-01 18:50:48 A2 890 335.614454
3 2019-01-01 23:29:16 A2 858 469.223992
4 2019-01-01 09:58:51 A2 883 235.753424
照会tradesで2019年1月30日、株式コードはA 2のデータです.
>>> tmp = trades[(trades.trade_time.dt.date == '2019.01.30') & (trades.sym == 'A2')]
>>> tmp.head()
trade_time sym qty price
0 2019-01-30 04:41:56 A2 880 428.552654
1 2019-01-30 14:13:53 A2 512 488.826978
2 2019-01-30 14:31:28 A2 536 478.578219
3 2019-01-30 04:09:41 A2 709 255.435903
4 2019-01-30 13:18:50 A2 355 404.782260
2.4 groupbyグループクエリー
groupby
関数は、パケット集約に使用されます.次の関数はgroupbyオブジェクトで使用できます.count
:NULL以外の要素を返す個数sum
:求和mean
:平均min
:最小値max
:最大値mode
:衆数abs
:絶対値prod
:積std
:標準差var
:分散sem
:平均値の標準誤差skew
:傾斜度kurtosis
:ピークcumsum
:累積合計cumprod
:累積積cummax
:累積最大値cummin
:累積最小値>>> trades.groupby(trades.trade_time.dt.date)['sym'].count()
trade_time
2019-01-01 322573
2019-01-02 322662
2019-01-03 323116
2019-01-04 322436
2019-01-05 322156
2019-01-06 324191
2019-01-07 321879
2019-01-08 323319
2019-01-09 322262
2019-01-10 322585
2019-01-11 322986
2019-01-12 322839
2019-01-13 322302
2019-01-14 322032
2019-01-15 322409
2019-01-16 321810
2019-01-17 321566
2019-01-18 323651
2019-01-19 323463
2019-01-20 322675
2019-01-21 322845
2019-01-22 322931
2019-01-23 322598
2019-01-24 322404
2019-01-25 322454
2019-01-26 321760
2019-01-27 321955
2019-01-28 322013
2019-01-29 322745
2019-01-30 322193
2019-01-31 323190
dtype: int64
tradesの1日あたりのレコード数を計算します.
>>> trades.groupby([trades.trade_time.dt.date,'sym'])['price'].count()
trade_time sym
2019-01-01 A1 10638
A10 10747
A11 10709
A12 10715
A13 10914
...
2019-01-31 A5 10717
A6 10934
A7 10963
A8 10907
A9 10815
Length: 930, dtype: int64
Orcaはaggを介して複数の集約関数を一度に適用することをサポートする.pandasとは異なり、Orcaはaggで呼び出す集約関数を表す文字列を使用します.たとえば、tradesの1日あたりの価格の最大値、最小値、平均値を計算します.
>>> trades.groupby(trades.trade_time.dt.date)['price'].agg(["min","max","avg"])
price
min max avg
trade_time
2019-01-01 0.003263 499.999073 249.913612
2019-01-02 0.000468 499.999533 249.956874
2019-01-03 0.000054 499.999998 249.927257
2019-01-04 0.000252 499.999762 249.982737
2019-01-05 0.001907 499.999704 250.097487
2019-01-06 0.000318 499.999824 249.991605
2019-01-07 0.003196 499.999548 249.560505
2019-01-08 0.000216 499.996703 250.024405
2019-01-09 0.002635 499.998985 249.966446
2019-01-10 0.000725 499.996717 249.663324
2019-01-11 0.003140 499.998267 250.243786
2019-01-12 0.000105 499.998453 250.077061
2019-01-13 0.004297 499.999139 250.097489
2019-01-14 0.003510 499.999452 249.775830
2019-01-15 0.002501 499.999638 250.021218
2019-01-16 0.000451 499.998059 250.044059
2019-01-17 0.002359 499.998462 249.808932
2019-01-18 0.000104 499.999963 249.918651
2019-01-19 0.000999 499.998000 249.899495
2019-01-20 0.000489 499.999892 249.606668
2019-01-21 0.000729 499.999774 249.839876
2019-01-22 0.000834 499.999331 249.632037
2019-01-23 0.001982 499.999926 249.955031
2019-01-24 0.000323 499.993956 249.557851
2019-01-25 0.000978 499.999716 249.722053
2019-01-26 0.002582 499.998753 249.897519
2019-01-27 0.000547 499.999809 250.404666
2019-01-28 0.002729 499.998545 249.622289
2019-01-29 0.000487 499.999598 249.950167
2019-01-30 0.000811 499.999949 250.182493
2019-01-31 0.000801 499.999292 249.317517
Orca groupbyはフィルタ機能をサポートしています.pandasとは異なり,Orcaにおけるフィルタ条件はlambda関数ではなく文字列形式の式で表される.
たとえば、tradesの1日あたりの平均株価が200を超え、記録数が11000を超える記録を返します.
>>> trades.groupby([trades.trade_time.dt.date,'sym'])['price'].filter("avg(price) > 200 and count(price) > 11000")
0 499.171179
1 375.553059
2 119.240890
3 370.198534
4 5.876941
...
88416 37.872317
88417 373.259785
88418 435.154484
88419 436.163806
88420 428.455914
Length: 88421, dtype: float64
2.5 resample再サンプリング
Orcaは
resample
関数をサポートし、通常の時系列データを再サンプリングし、周波数変換することができる.現在、resample関数のパラメータは次のとおりです.'B':BDay or BusinessDay
'WOM':WeekOfMonth
'LWOM':LastWeekOfMonth
'M':MonthEnd
'MS':MonthBegin
'BM':BMonthEnd or BusinessMonthEnd
'BMS':BMonthBegin or BusinessMonthBegin
'SM':SemiMonthEnd
'SMS':SemiMonthBegin
'Q':QuarterEnd
'QS':QuarterBegin
'BQ':BQuarterEnd
'BQS':BQuarterBegin
'REQ':FY5253Quarter
'A':YearEnd
'AS' or 'BYS':YearBegin
'BA':BYearEnd
'BAS':BYearBegin
'RE':FY5253
'D':Day
'H':Hour
'T' or 'min':Minute
'S':Second
'L' or 'ms':Milli
'U' or 'us':Micro
'N':Nano
たとえば、tradesのデータを再サンプリングし、3分ごとに計算します.
>>> trades.resample('3T', on='trade_time')['qty'].sum()
trade_time
2019-01-01 00:00:00 321063
2019-01-01 00:03:00 354917
2019-01-01 00:06:00 329419
2019-01-01 00:09:00 340880
2019-01-01 00:12:00 356612
...
2019-01-31 23:45:00 322829
2019-01-31 23:48:00 344753
2019-01-31 23:51:00 330959
2019-01-31 23:54:00 336712
2019-01-31 23:57:00 328730
Length: 14880, dtype: int64
tradesがtrade_を設定した場合timeはindexであり、次の方法で再サンプリングすることもできます.
>>> trades.resample('3T', level='trade_time')['qty'].sum()
dateoffset関数で生成したオブジェクトでdateoffsetを表す場合は、pandasのdateoffsetをインポートする必要があります.3分ごとに再サンプリングする場合は、次のように書くこともできます.
>>> from pandas.tseries.offsets import *
>>> ofst = Minute(n=3)
>>> trades.resample(ofst,on='trade_time')['qty'].sum()
2.6 rolling移動ウィンドウ
Orcaは
rolling
関数を提供し、モバイルウィンドウで計算することができます.現在、rolling
関数のパラメータは次のとおりです.次の関数はorca.DataFrame.rollingオブジェクトで使用できます.
count
:NULL以外の要素を返す個数sum
:求和min
:最小値max
:最大値std
:標準差var
:分散corr
:相関covar
:共分散skew
:傾斜度kurtosis
:ピーク>>> tmp = trades[(trades.trade_time.dt.date == '2019.01.01') | (trades.trade_time.dt.date == '2019.01.02')]
>>> re = tmp.rolling(window=3)['price'].sum()
0 NaN
1 NaN
2 792.386603
3 601.826312
4 444.858366
...
646057 1281.099161
646058 1287.816045
646059 963.262163
646060 865.797011
646061 719.050068
Name: price, Length: 646062, dtype: float64
2.7データ接続
Orcaは、DataFrameを接続する機能を提供しています.分散テーブル対応のDataFrameは、通常のメモリテーブル対応のDataFrameに接続してもよいし、分散テーブル対応のDataFrameに接続してもよい.2つの分散テーブルに対応するData Frame接続は、次の条件を同時に満たす必要があります.
Orcaは、
merge
およびjoin
関数を提供する.merge
関数は、次のパラメータをサポートします.join
関数はmerge
関数の特例であり、そのパラメータおよび意味はmerge
とほぼ同じであるが、join
のデフォルトは左外接続、すなわちhow='left'である.たとえば、tradesとquotesを内部接続します.
>>> quotes = orca.read_table('dfs://orca_stock','quotes')
>>> trades.merge(right=quotes, left_on=['trade_time','sym'], right_on=['trade_time','sym'], how='inner')
trade_time sym qty price bid offer
0 2019-01-01 02:36:34 A15 273 186.144261 317.458480 155.361661
1 2019-01-01 05:37:59 A13 185 420.397500 248.447426 115.722893
2 2019-01-01 00:59:43 A10 751 89.801687 193.925714 144.345473
3 2019-01-01 21:58:36 A16 175 251.753495 116.810807 439.178207
4 2019-01-01 10:53:54 A16 532 71.733640 240.927647 388.718680
... ... ... ... ... ... ...
25035 2019-01-02 03:59:51 A3 220 50.004418 107.905522 167.375994
25036 2019-01-02 17:54:01 A3 202 195.189216 134.463906 142.443428
25037 2019-01-02 16:57:50 A9 627 68.661644 440.421876 110.801070
25038 2019-01-02 10:27:43 A28 414 487.337282 169.081363 261.171073
25039 2019-01-02 17:02:51 A3 661 243.960836 92.999404 26.747609
[25040 rows x 6 columns]
join
関数を使用してtradesとquotesを左外部に接続します.>>> trades.set_index(['trade_time','sym'], inplace=True)
>>> quotes.set_index(['trade_time','sym'], inplace=True)
>>> trades.join(quotes)
qty price bid offer
trade_time sym
2019-01-01 18:04:25 A14 435 378.595626 NaN NaN
2019-01-01 20:38:47 A13 701 275.039372 NaN NaN
2019-01-01 02:43:03 A16 787 138.751605 NaN NaN
2019-01-01 20:32:42 A14 989 188.035335 NaN NaN
2019-01-01 16:59:16 A13 847 118.071427 NaN NaN
... ... ... ... ...
2019-01-31 17:21:27 A30 3 49.855063 NaN NaN
2019-01-31 13:49:01 A6 273 245.966115 NaN NaN
2019-01-31 16:42:29 A7 548 197.814548 NaN NaN
2019-01-31 03:42:11 A5 563 263.999224 NaN NaN
2019-01-31 20:48:57 A9 809 318.420522 NaN NaN
[10000481 rows x 4 columns]
3 dataframeをdfsテーブルに追加
Orcaは
append
関数を提供し、Orca Data Frameをdfsテーブルに追加できます.append
関数には、次のパラメータがあります.>>> import pandas as pd
>>> odf=orca.DataFrame({'trade_time':pd.date_range('20190101 12:30',periods=5,freq='T'),
'sym':['A1','A2','A3','A4','A5'],
'qty':[100,200,300,400,500],
'price':[100.5,263.1,254.9,215.1,245.6]})
>>> trades.append(odf,inplace=True)
>>> len(trades)
10000005
Orcaはappend関数を拡張し、inplaceパラメータをサポートします.つまり、その場でデータを追加できます.inplaceがFalseの場合、pandasと同じように表現されます.分散テーブルの内容はメモリにコピーされ、tradesはメモリテーブルに対応し、odfの内容はメモリテーブルに追加され、dfsテーブルに追加されません.
4まとめ
分散テーブルの場合、Orcaには、現在、パーティションテーブルに対応するDataFrameにRangeIndexの概念がない、分散テーブルでの使用がサポートされていない関数、テーブル内のデータの変更の制限など、いくつかの機能上の制限があります.詳細については、Orcaクイックエントリーガイドを参照してください.