DolphinDB分散テーブルをOrcaで操作する方法


DolphinDBは分散シーケンスデータベースであり、豊富な計算と分析機能を内蔵しています.TB級の大量データを複数の物理機器に格納し、CPUを十分に利用し、大量データに対して高性能分析計算を行うことができる.Orcaでは、python環境でpandas構文と同じスクリプトを使用してDolphinDB分散データベースのデータを複雑かつ効率的に計算できます.このチュートリアルでは、DolphinDB分散テーブルに対するOrcaの操作について説明します.
この例ではDolphinDBスタンドアロンモードを使用します.まず、このチュートリアルのサンプル・データベースを作成します.dfs://orca_stock .データベースを作成するDolphinDBスクリプトは次のとおりです.
login("admin","123456")
if(existsDatabase("dfs://orca_stock")){
	dropDatabase("dfs://orca_stock")
}
dates=2019.01.01..2019.01.31
syms="A"+string(1..30)
sym_range=cutPoints(syms,3)
db1=database("",VALUE,dates)
db2=database("",RANGE,sym_range)
db=database("dfs://orca_stock",COMPO,[db1,db2])
n=10000000
datetimes=2019.01.01T00:00:00..2019.01.31T23:59:59
t=table(rand(datetimes,n) as trade_time,rand(syms,n) as sym,rand(1000,n) as qty,rand(500.0,n) as price)
trades=db.createPartitionedTable(t,`trades,`trade_time`sym).append!(t)

n=200000
datetimes=2019.01.01T00:00:00..2019.01.02T23:59:59
syms="A"+string(1..30)
t2=table(rand(datetimes,n) as trade_time,rand(syms,n) as sym,rand(500.0,n) as bid,rand(500.0,n) as offer)
quotes=db.createPartitionedTable(t2,`quotes,`trade_time`sym).append!(t2)

syms="A"+string(1..30)
t3=table(syms as sym,rand(0 1,30) as type)
infos=db.createTable(t3,`infos).append!(t3)

注意:DolphinDBクライアントまたはDolphinDB Python APIで分散テーブルを作成する必要があります.Orcaで直接分散テーブルを作成することはできません.
Orcaでconnect関数を使用してDolphinDBサーバに接続します.
>>> import dolphindb.orca as orca
>>> orca.connect("localhost",8848,"admin","123456")

ユーザーは、実際の状況に応じてIPアドレスとポート番号を変更する必要があります.
1分散テーブルの読み込み
Orcaはread_table関数で分散テーブルを読み込み、結果としてOrca Data Frameを返します.例:サンプル・データベースの読み込みdfs://orca_stockのテーブルtrades:
>>> trades = orca.read_table('dfs://orca_stock','trades')
>>> type(trades)
orca.core.frame.DataFrame

tradesのカラム名を表示するには、次の手順に従います.
>>> trades.columns
Index(['trade_time', 'sym', 'qty', 'price'], dtype='object')

trades列のデータ型を表示するには、次の手順に従います.
>>> trades.dtypes
trade_time    datetime64[s]
sym                  object
qty                   int32
price               float64
dtype: object

tradesの行数を表示するには、次の手順に従います.
>>> len(trades)
10000000

DolphinDB分散テーブルに対応するOrca Data Frameには、テーブル名、データのカラム名などの情報を含むメタデータのみが格納されます.分散テーブルは連続記憶ではなく,各パーティション間に厳密な順序関係がないため,分散テーブルに対応するDataFrameにはRangeIndexの概念がない.indexを設定する必要がある場合は、set_index関数を使用します.たとえば、tradesのtrade_timeがindexに設定されています.
>>> trades.set_index('trade_time')

index列をデータ列に変換する場合は、reset_index関数を使用します.
>>> trades.reset_index()

2クエリーと計算
Orcaは不活性評価を採用しており、一部の計算はすぐにサービス側で計算されるのではなく、本当に必要なときに計算されるまで中間式に変換されます.ユーザーが計算を即時にトリガーする必要がある場合は、compute関数を呼び出すことができます.
サンプル・データベースdfs://orca_stockのデータはランダムに生成されるため,ユーザの実行結果は本章の結果と異なる.
2.1前のn条の記録を取るhead関数は、前のnレコードをクエリーし、デフォルトでは前の5レコードを取得します.たとえば、tradesの最初の5つのレコードを取得します.
>>> trades.head()
           trade_time  sym  qty       price
0 2019-01-01 18:04:33  A16  855  482.526769
1 2019-01-01 13:57:38  A12  244   61.675293
2 2019-01-01 23:58:15  A10   36  297.623295
3 2019-01-01 23:02:43  A16  426  109.041012
4 2019-01-01 04:33:53   A1  472   75.778951

2.2ソートsort_valuesメソッドは、カラムに基づいてソートできます.たとえば、tradesはprice降順にソートし、上位5つのレコードを取得します.
>>> trades.sort_values(by='price', ascending=False).head()
           trade_time  sym  qty       price
0 2019-01-03 12:56:09  A22  861  499.999998
1 2019-01-18 17:25:21  A19   95  499.999963
2 2019-01-30 02:18:48  A30  114  499.999949
3 2019-01-23 08:31:56   A3  926  499.999926
4 2019-01-20 03:36:53   A3  719  499.999892

複数の列でソート:
>>> trades.sort_values(by=['qty','trade_time'], ascending=False).head()
           trade_time  sym  qty       price
0 2019-01-31 23:58:50  A24  999  359.887697
1 2019-01-31 23:57:26   A3  999  420.156175
2 2019-01-31 23:56:34   A2  999  455.228435
3 2019-01-31 23:52:58   A6  999  210.819227
4 2019-01-31 23:45:17  A14  999  310.813216

2.3条件による照会
Orcaは、単一または複数の条件に従ってクエリーを考慮することをサポートします.たとえば、
tradesの2019年1月2日のデータを検索します.
>>> tmp = trades[trades.trade_time.dt.date == "2019.01.01"]
>>> tmp.head()
           trade_time sym  qty       price
0 2019-01-01 00:32:21  A2  139  383.971293
1 2019-01-01 21:19:09  A2  263  100.932553
2 2019-01-01 18:50:48  A2  890  335.614454
3 2019-01-01 23:29:16  A2  858  469.223992
4 2019-01-01 09:58:51  A2  883  235.753424

照会tradesで2019年1月30日、株式コードはA 2のデータです.
>>> tmp = trades[(trades.trade_time.dt.date == '2019.01.30') & (trades.sym == 'A2')]
>>> tmp.head()
           trade_time sym  qty       price
0 2019-01-30 04:41:56  A2  880  428.552654
1 2019-01-30 14:13:53  A2  512  488.826978
2 2019-01-30 14:31:28  A2  536  478.578219
3 2019-01-30 04:09:41  A2  709  255.435903
4 2019-01-30 13:18:50  A2  355  404.782260

2.4 groupbyグループクエリーgroupby関数は、パケット集約に使用されます.次の関数はgroupbyオブジェクトで使用できます.
  • count:NULL以外の要素を返す個数
  • sum:求和
  • mean:平均
  • min:最小値
  • max:最大値
  • mode:衆数
  • abs:絶対値
  • prod:積
  • std:標準差
  • var:分散
  • sem:平均値の標準誤差
  • skew:傾斜度
  • kurtosis:ピーク
  • cumsum:累積合計
  • cumprod:累積積
  • cummax:累積最大値
  • cummin:累積最小値
  • tradesの毎日のレコード数を計算します.
    >>> trades.groupby(trades.trade_time.dt.date)['sym'].count()
    trade_time
    2019-01-01    322573
    2019-01-02    322662
    2019-01-03    323116
    2019-01-04    322436
    2019-01-05    322156
    2019-01-06    324191
    2019-01-07    321879
    2019-01-08    323319
    2019-01-09    322262
    2019-01-10    322585
    2019-01-11    322986
    2019-01-12    322839
    2019-01-13    322302
    2019-01-14    322032
    2019-01-15    322409
    2019-01-16    321810
    2019-01-17    321566
    2019-01-18    323651
    2019-01-19    323463
    2019-01-20    322675
    2019-01-21    322845
    2019-01-22    322931
    2019-01-23    322598
    2019-01-24    322404
    2019-01-25    322454
    2019-01-26    321760
    2019-01-27    321955
    2019-01-28    322013
    2019-01-29    322745
    2019-01-30    322193
    2019-01-31    323190
    dtype: int64

    tradesの1日あたりのレコード数を計算します.
    >>> trades.groupby([trades.trade_time.dt.date,'sym'])['price'].count()
    trade_time  sym
    2019-01-01  A1     10638
                A10    10747
                A11    10709
                A12    10715
                A13    10914
                       ...  
    2019-01-31  A5     10717
                A6     10934
                A7     10963
                A8     10907
                A9     10815
    Length: 930, dtype: int64

    Orcaはaggを介して複数の集約関数を一度に適用することをサポートする.pandasとは異なり、Orcaはaggで呼び出す集約関数を表す文字列を使用します.たとえば、tradesの1日あたりの価格の最大値、最小値、平均値を計算します.
    >>> trades.groupby(trades.trade_time.dt.date)['price'].agg(["min","max","avg"])
                   price                        
                     min         max         avg
    trade_time                                  
    2019-01-01  0.003263  499.999073  249.913612
    2019-01-02  0.000468  499.999533  249.956874
    2019-01-03  0.000054  499.999998  249.927257
    2019-01-04  0.000252  499.999762  249.982737
    2019-01-05  0.001907  499.999704  250.097487
    2019-01-06  0.000318  499.999824  249.991605
    2019-01-07  0.003196  499.999548  249.560505
    2019-01-08  0.000216  499.996703  250.024405
    2019-01-09  0.002635  499.998985  249.966446
    2019-01-10  0.000725  499.996717  249.663324
    2019-01-11  0.003140  499.998267  250.243786
    2019-01-12  0.000105  499.998453  250.077061
    2019-01-13  0.004297  499.999139  250.097489
    2019-01-14  0.003510  499.999452  249.775830
    2019-01-15  0.002501  499.999638  250.021218
    2019-01-16  0.000451  499.998059  250.044059
    2019-01-17  0.002359  499.998462  249.808932
    2019-01-18  0.000104  499.999963  249.918651
    2019-01-19  0.000999  499.998000  249.899495
    2019-01-20  0.000489  499.999892  249.606668
    2019-01-21  0.000729  499.999774  249.839876
    2019-01-22  0.000834  499.999331  249.632037
    2019-01-23  0.001982  499.999926  249.955031
    2019-01-24  0.000323  499.993956  249.557851
    2019-01-25  0.000978  499.999716  249.722053
    2019-01-26  0.002582  499.998753  249.897519
    2019-01-27  0.000547  499.999809  250.404666
    2019-01-28  0.002729  499.998545  249.622289
    2019-01-29  0.000487  499.999598  249.950167
    2019-01-30  0.000811  499.999949  250.182493
    2019-01-31  0.000801  499.999292  249.317517

    Orca groupbyはフィルタ機能をサポートしています.pandasとは異なり,Orcaにおけるフィルタ条件はlambda関数ではなく文字列形式の式で表される.
    たとえば、tradesの1日あたりの平均株価が200を超え、記録数が11000を超える記録を返します.
    >>> trades.groupby([trades.trade_time.dt.date,'sym'])['price'].filter("avg(price) > 200 and count(price) > 11000")
    0        499.171179
    1        375.553059
    2        119.240890
    3        370.198534
    4          5.876941
                ...    
    88416     37.872317
    88417    373.259785
    88418    435.154484
    88419    436.163806
    88420    428.455914
    Length: 88421, dtype: float64

    2.5 resample再サンプリング
    Orcaはresample関数をサポートし、通常の時系列データを再サンプリングし、周波数変換することができる.現在、resample関数のパラメータは次のとおりです.
  • rule:DateOffset、文字列またはdateoffsetオブジェクト
  • on:時間列、この列を用いて再サンプリング
  • level:文字列または整数、MultiIndexの場合、levelで指定された列を再サンプリング
  • OrcaがサポートするDateOffsetは次のとおりです.
    'B':BDay or BusinessDay
    'WOM':WeekOfMonth
    'LWOM':LastWeekOfMonth
    'M':MonthEnd
    'MS':MonthBegin
    'BM':BMonthEnd or BusinessMonthEnd
    'BMS':BMonthBegin or BusinessMonthBegin
    'SM':SemiMonthEnd
    'SMS':SemiMonthBegin
    'Q':QuarterEnd
    'QS':QuarterBegin
    'BQ':BQuarterEnd
    'BQS':BQuarterBegin
    'REQ':FY5253Quarter
    'A':YearEnd
    'AS' or 'BYS':YearBegin
    'BA':BYearEnd
    'BAS':BYearBegin
    'RE':FY5253
    'D':Day
    'H':Hour
    'T' or 'min':Minute
    'S':Second
    'L' or 'ms':Milli
    'U' or 'us':Micro
    'N':Nano

    たとえば、tradesのデータを再サンプリングし、3分ごとに計算します.
    >>> trades.resample('3T', on='trade_time')['qty'].sum()
    trade_time
    2019-01-01 00:00:00    321063
    2019-01-01 00:03:00    354917
    2019-01-01 00:06:00    329419
    2019-01-01 00:09:00    340880
    2019-01-01 00:12:00    356612
                            ...  
    2019-01-31 23:45:00    322829
    2019-01-31 23:48:00    344753
    2019-01-31 23:51:00    330959
    2019-01-31 23:54:00    336712
    2019-01-31 23:57:00    328730
    Length: 14880, dtype: int64

    tradesがtrade_を設定した場合timeはindexであり、次の方法で再サンプリングすることもできます.
    >>> trades.resample('3T', level='trade_time')['qty'].sum()

    dateoffset関数で生成したオブジェクトでdateoffsetを表す場合は、pandasのdateoffsetをインポートする必要があります.3分ごとに再サンプリングする場合は、次のように書くこともできます.
    >>> from pandas.tseries.offsets import *
    >>> ofst = Minute(n=3)
    >>> trades.resample(ofst,on='trade_time')['qty'].sum()

    2.6 rolling移動ウィンドウ
    Orcaはrolling関数を提供し、モバイルウィンドウで計算することができます.現在、rolling関数のパラメータは次のとおりです.
  • window::整数、ウィンドウの長さを表す
  • on:ウィンドウ
  • を計算する文字列
    次の関数はorca.DataFrame.rollingオブジェクトで使用できます.
  • count:NULL以外の要素を返す個数
  • sum:求和
  • min:最小値
  • max:最大値
  • std:標準差
  • var:分散
  • corr:相関
  • covar:共分散
  • skew:傾斜度
  • kurtosis:ピーク
  • 分散テーブルに対応するDataFrameでは、スライドウィンドウで計算する場合、パーティション単位で個別に計算されるため、各パーティションの計算結果の前のwindow-1個の値が空になります.例えば、tradesの2019.01.01と2019.01.02のデータは、長さ3のスライドウィンドウでpriceの和を求める.
    >>> tmp = trades[(trades.trade_time.dt.date == '2019.01.01') | (trades.trade_time.dt.date == '2019.01.02')]
    >>> re = tmp.rolling(window=3)['price'].sum()
    0                 NaN
    1                 NaN
    2          792.386603
    3          601.826312
    4          444.858366
                 ...     
    646057    1281.099161
    646058    1287.816045
    646059     963.262163
    646060     865.797011
    646061     719.050068
    Name: price, Length: 646062, dtype: float64

    2.7データ接続
    Orcaは、DataFrameを接続する機能を提供しています.分散テーブル対応のDataFrameは、通常のメモリテーブル対応のDataFrameに接続してもよいし、分散テーブル対応のDataFrameに接続してもよい.2つの分散テーブルに対応するData Frame接続は、次の条件を同時に満たす必要があります.
  • 2つの分散テーブルが同じデータベースにある
  • 接続列はすべてのパーティション列
  • を含む必要があります.
    Orcaは、mergeおよびjoin関数を提供する.merge関数は、次のパラメータをサポートします.
  • right:Orca Data FrameまたはSeries
  • how:接続のタイプを表す文字列.left、right、outer、innerであってもよく、デフォルト値はinner
  • on:接続列
  • を示す文字列
  • left_on:左テーブルの接続列
  • を示す文字列
  • right_on:文字列、右表の接続列
  • を表す
  • left_index:左テーブルのインデックス
  • right_index:右テーブルのインデックス
  • suffixes:文字列、重複列を表す接尾辞
  • join関数はmerge関数の特例であり、そのパラメータおよび意味はmergeとほぼ同じであるが、joinのデフォルトは左外接続、すなわちhow='left'である.
    たとえば、tradesとquotesを内部接続します.
    >>> quotes = orca.read_table('dfs://orca_stock','quotes')
    >>> trades.merge(right=quotes, left_on=['trade_time','sym'], right_on=['trade_time','sym'], how='inner')
                   trade_time  sym  qty       price         bid       offer
    0     2019-01-01 02:36:34  A15  273  186.144261  317.458480  155.361661
    1     2019-01-01 05:37:59  A13  185  420.397500  248.447426  115.722893
    2     2019-01-01 00:59:43  A10  751   89.801687  193.925714  144.345473
    3     2019-01-01 21:58:36  A16  175  251.753495  116.810807  439.178207
    4     2019-01-01 10:53:54  A16  532   71.733640  240.927647  388.718680
    ...                   ...  ...  ...         ...         ...         ...
    25035 2019-01-02 03:59:51   A3  220   50.004418  107.905522  167.375994
    25036 2019-01-02 17:54:01   A3  202  195.189216  134.463906  142.443428
    25037 2019-01-02 16:57:50   A9  627   68.661644  440.421876  110.801070
    25038 2019-01-02 10:27:43  A28  414  487.337282  169.081363  261.171073
    25039 2019-01-02 17:02:51   A3  661  243.960836   92.999404   26.747609
    
    [25040 rows x 6 columns]
    join関数を使用してtradesとquotesを左外部に接続します.
    >>> trades.set_index(['trade_time','sym'], inplace=True)
    >>> quotes.set_index(['trade_time','sym'], inplace=True)
    >>> trades.join(quotes)
                             qty       price  bid  offer
    trade_time          sym                             
    2019-01-01 18:04:25 A14  435  378.595626  NaN    NaN
    2019-01-01 20:38:47 A13  701  275.039372  NaN    NaN
    2019-01-01 02:43:03 A16  787  138.751605  NaN    NaN
    2019-01-01 20:32:42 A14  989  188.035335  NaN    NaN
    2019-01-01 16:59:16 A13  847  118.071427  NaN    NaN
    ...                      ...         ...  ...    ...
    2019-01-31 17:21:27 A30    3   49.855063  NaN    NaN
    2019-01-31 13:49:01 A6   273  245.966115  NaN    NaN
    2019-01-31 16:42:29 A7   548  197.814548  NaN    NaN
    2019-01-31 03:42:11 A5   563  263.999224  NaN    NaN
    2019-01-31 20:48:57 A9   809  318.420522  NaN    NaN
    
    [10000481 rows x 4 columns]

    3 dataframeをdfsテーブルに追加
    Orcaはappend関数を提供し、Orca Data Frameをdfsテーブルに追加できます.append関数には、次のパラメータがあります.
  • other:追加するDataFrame
  • ignore_index:ブール値、インデックスを無視するかどうか.デフォルトはFalse
  • verify_integrity:ブール値.デフォルトはFalse
  • sort:ブール値.ソートするかどうかを示します.デフォルトはNone
  • inplace:ブール値.dfsテーブルに挿入するかどうかを示します.デフォルトはFalse
  • たとえば、dataframeにtrades対応の分散テーブルを追加します.
    >>> import pandas as pd
    >>> odf=orca.DataFrame({'trade_time':pd.date_range('20190101 12:30',periods=5,freq='T'),
                       'sym':['A1','A2','A3','A4','A5'],
                       'qty':[100,200,300,400,500],
                       'price':[100.5,263.1,254.9,215.1,245.6]})
    >>> trades.append(odf,inplace=True)
    >>> len(trades)
    10000005

    Orcaはappend関数を拡張し、inplaceパラメータをサポートします.つまり、その場でデータを追加できます.inplaceがFalseの場合、pandasと同じように表現されます.分散テーブルの内容はメモリにコピーされ、tradesはメモリテーブルに対応し、odfの内容はメモリテーブルに追加され、dfsテーブルに追加されません.
    4まとめ
    分散テーブルの場合、Orcaには、現在、パーティションテーブルに対応するDataFrameにRangeIndexの概念がない、分散テーブルでの使用がサポートされていない関数、テーブル内のデータの変更の制限など、いくつかの機能上の制限があります.詳細については、Orcaクイックエントリーガイドを参照してください.