Python Large-Scale Data Storage, Reading, and Parallel Computing: An Overview of the Dask Library
This article originally appeared at: https://blog.csdn.net/sinat_26917383/article/details/78044437
Dask's data structures are very similar to pandas, which makes them relatively easy to pick up. Official documentation: http://dask.pydata.org/en/latest/index.html
GitHub: https://github.com/dask
Dask covers a lot of ground; below I focus on the parts I use most often.
I. Reading and Writing Data
First, let's see what Dask can read:
1. CSV
# pandas
import pandas as pd
df = pd.read_csv('2015-01-01.csv')
df.groupby(df.user_id).value.mean()
# dask
import dask.dataframe as dd
df = dd.read_csv('2015-*-*.csv')
df.groupby(df.user_id).value.mean().compute()
Very similar; the only difference is the trailing .compute().

2. Dask Array: reading HDF5
# NumPy
import numpy as np
import h5py

f = h5py.File('myfile.hdf5')
x = np.array(f['/small-data'])
x - x.mean(axis=1)

# Dask
import dask.array as da

f = h5py.File('myfile.hdf5')
x = da.from_array(f['/big-data'], chunks=(1000, 1000))
(x - x.mean(axis=1)).compute()
The first version is plain NumPy, which loads everything into memory; the second is Dask, which works on 1000x1000 chunks and evaluates only on .compute().
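Both the CSV groupby above and this array example scale the same way: a reduction is computed per chunk and the partial results are combined at the end. Here is a plain-Python sketch of that idea on toy data (this is the concept, not Dask's actual internals):

```python
# Toy 1-D "array" split into chunks, mimicking chunks=(5,) on 15 elements
data = list(range(15))
chunks = [data[i:i + 5] for i in range(0, len(data), 5)]

# A global mean combines per-chunk partial sums and counts,
# so no chunk ever needs to see the whole dataset
total = sum(sum(chunk) for chunk in chunks)
count = sum(len(chunk) for chunk in chunks)
mean = total / count
print(mean)  # 7.0
```

Because each chunk only contributes a partial sum and a count, the full dataset never has to fit in memory at once.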
3. Dask Bag
import json
import dask.bag as db

b = db.read_text('2015-*-*.json.gz').map(json.loads)
b.pluck('name').frequencies().topk(10, lambda pair: pair[1]).compute()
Reading large JSON files, even hundreds of millions of records, is easy.
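To make the pluck/frequencies/topk pipeline concrete, here is the equivalent logic in plain Python on a hypothetical list of records (a sketch of what the bag methods compute, not how Dask executes them):

```python
import heapq
from collections import Counter

# Hypothetical records, standing in for the parsed JSON lines
records = [{'name': 'Alice'}, {'name': 'Bob'}, {'name': 'Alice'},
           {'name': 'Carol'}, {'name': 'Alice'}, {'name': 'Bob'}]

names = [r['name'] for r in records]                      # .pluck('name')
freqs = Counter(names).items()                            # .frequencies()
top = heapq.nlargest(2, freqs, key=lambda pair: pair[1])  # .topk(2, ...)
print(top)  # [('Alice', 3), ('Bob', 2)]
```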
>>> b = db.read_text('myfile.txt')
>>> b = db.read_text(['myfile.1.txt', 'myfile.2.txt', ...])
>>> b = db.read_text('myfile.*.txt')
This reads plain text files.
>>> import dask.bag as db
>>> b = db.from_sequence([{'name': 'Alice', 'balance': 100},
...                       {'name': 'Bob', 'balance': 200},
...                       {'name': 'Charlie', 'balance': 300}],
...                      npartitions=2)
>>> df = b.to_dataframe()
This converts the bag into a DataFrame.
4. Parallel computing with Dask Delayed
from dask import delayed

L = []
for fn in filenames:                  # Use for loops to build up computation
    data = delayed(load)(fn)          # Delay execution of function
    L.append(delayed(process)(data))  # Build connections between variables

result = delayed(summarize)(L)
result.compute()
5. Custom tasks with concurrent.futures
from dask.distributed import Client

client = Client('scheduler:port')

futures = []
for fn in filenames:
    future = client.submit(load, fn)
    futures.append(future)

summary = client.submit(summarize, futures)
summary.result()
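Dask's Client deliberately mirrors the standard library's concurrent.futures interface, so the same submit/result pattern can be tried without a cluster. A minimal self-contained sketch, where load and summarize are hypothetical stand-ins for real work:

```python
from concurrent.futures import ThreadPoolExecutor

def load(fn):
    # Stand-in for reading a file: return its "contents" as a number
    return len(fn)

def summarize(futures):
    # Combine the results of the individual tasks
    return sum(f.result() for f in futures)

filenames = ['a.csv', 'bb.csv', 'ccc.csv']

with ThreadPoolExecutor() as executor:
    futures = [executor.submit(load, fn) for fn in filenames]
    summary = executor.submit(summarize, futures)
    print(summary.result())  # 5 + 6 + 7 = 18
```

Swapping ThreadPoolExecutor for a dask.distributed Client moves the same code from local threads to a cluster.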
II. The Delayed parallel computing module
First, the plain serial version:
def inc(x):
    return x + 1

def double(x):
    return x * 2

def add(x, y):
    return x + y

data = [1, 2, 3, 4, 5]

output = []
for x in data:
    a = inc(x)
    b = double(x)
    c = add(a, b)
    output.append(c)

total = sum(output)
Now let's look at the same computation accelerated with delayed:
from dask import delayed
output = []
for x in data:
    a = delayed(inc)(x)
    b = delayed(double)(x)
    c = delayed(add)(a, b)
    output.append(c)
total = delayed(sum)(output)
The computation graph can also be visualized:
total.visualize()  # render the task graph
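To build intuition for what delayed does, here is a toy sketch of lazy evaluation in plain Python. The wrapper is called toy_delayed to make clear it is a deliberately simplified stand-in, not Dask's actual implementation:

```python
class Lazy:
    """Record a function call instead of running it; evaluate on .compute()."""
    def __init__(self, func, args):
        self.func, self.args = func, args

    def compute(self):
        return self.func(*[_eval(a) for a in self.args])

def _eval(a):
    # Evaluate lazy values, including those nested inside lists
    if isinstance(a, Lazy):
        return a.compute()
    if isinstance(a, list):
        return [_eval(x) for x in a]
    return a

def toy_delayed(func):
    return lambda *args: Lazy(func, args)

def inc(x): return x + 1
def double(x): return x * 2
def add(x, y): return x + y

# Nothing runs here; we only build a graph of Lazy nodes
output = [toy_delayed(add)(toy_delayed(inc)(x), toy_delayed(double)(x))
          for x in [1, 2, 3, 4, 5]]
total = toy_delayed(sum)(output)

print(total.compute())  # 50
```

Because the graph is built before anything runs, a scheduler like Dask's can inspect it and execute independent branches in parallel.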
III. Parallel algorithms combined with scikit-learn
Generalized linear models (GLM): https://github.com/dask/dask-glm. A TensorFlow deep-learning bridge also exists: dask-tensorflow.
Let's take XGBoost as an example (official repo: https://github.com/dask/dask-xgboost) and walk through the example code.

1. Loading the data
import dask.dataframe as dd
from dask.distributed import persist, progress  # requires a running Client

# Subset of the columns to use
cols = ['Year', 'Month', 'DayOfWeek', 'Distance',
        'DepDelay', 'CRSDepTime', 'UniqueCarrier', 'Origin', 'Dest']

# Create the dataframe
df = dd.read_csv('s3://dask-data/airline-data/20*.csv',
                 usecols=cols, storage_options={'anon': True})

df = df.sample(frac=0.2)  # we blow out RAM otherwise

is_delayed = (df.DepDelay.fillna(16) > 15)

df['CRSDepTime'] = df['CRSDepTime'].clip(upper=2399)
del df['DepDelay']

df, is_delayed = persist(df, is_delayed)
progress(df, is_delayed)
2. One-hot encoding
df2 = dd.get_dummies(df.categorize()).persist()
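What get_dummies does to a categorical column can be sketched in plain Python. The carrier codes below are hypothetical, for illustration only:

```python
# Hypothetical categorical column of carrier codes
carriers = ['AA', 'UA', 'AA', 'DL']

# One output column per category; 1 marks the row's category
categories = sorted(set(carriers))
one_hot = [[1 if c == cat else 0 for cat in categories] for c in carriers]

print(categories)  # ['AA', 'DL', 'UA']
print(one_hot)     # [[1, 0, 0], [0, 0, 1], [1, 0, 0], [0, 1, 0]]
```

The categorize() call in the real code serves the same role as building the categories list: Dask must know the full set of categories before it can create the dummy columns.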
3. Train/test split and training
data_train, data_test = df2.random_split([0.9, 0.1], random_state=1234)
labels_train, labels_test = is_delayed.random_split([0.9, 0.1], random_state=1234)
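Both random_split calls use the same random_state so that rows and labels land on the same side of the split. The idea can be sketched with the standard library (a toy stand-in for Dask's splitting, not its actual algorithm):

```python
import random

rows = ['r0', 'r1', 'r2', 'r3', 'r4', 'r5']
labels = [0, 1, 0, 1, 1, 0]

def split(items, frac_train, seed):
    # The same seed produces the same draw sequence, so rows and labels
    # are assigned to the same side of the split.
    rng = random.Random(seed)
    train, test = [], []
    for item in items:
        (train if rng.random() < frac_train else test).append(item)
    return train, test

rows_train, rows_test = split(rows, 0.9, seed=1234)
labels_train, labels_test = split(labels, 0.9, seed=1234)
assert len(rows_train) == len(labels_train)  # splits stay aligned
```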
Training:
import dask_xgboost as dxgb
params = {'objective': 'binary:logistic', 'nround': 1000,
          'max_depth': 16, 'eta': 0.01,
          'subsample': 0.5, 'min_child_weight': 1}
bst = dxgb.train(client, params, data_train, labels_train)
bst
4. Prediction
# Use normal XGBoost model with normal Pandas
import xgboost as xgb

dtest = xgb.DMatrix(data_test.head())
bst.predict(dtest)
predictions = dxgb.predict(client, bst, data_test).persist()
predictions.head()
5. Model evaluation
from sklearn.metrics import roc_auc_score, roc_curve

print(roc_auc_score(labels_test.compute(), predictions.compute()))

import matplotlib.pyplot as plt
%matplotlib inline

fpr, tpr, _ = roc_curve(labels_test.compute(), predictions.compute())

# Taken from http://scikit-learn.org/stable/auto_examples/model_selection/plot_roc.html#sphx-glr-auto-examples-model-selection-plot-roc-py
plt.figure(figsize=(8, 8))
lw = 2
plt.plot(fpr, tpr, color='darkorange', lw=lw, label='ROC curve')
plt.plot([0, 1], [0, 1], color='navy', lw=lw, linestyle='--')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('Receiver operating characteristic example')
plt.legend(loc="lower right")
plt.show()
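ROC AUC has a simple rank-based interpretation: the probability that a randomly chosen positive example is scored higher than a randomly chosen negative one. A standard-library sketch on toy data (not the airline data above):

```python
def roc_auc(labels, scores):
    # Fraction of (positive, negative) pairs ranked correctly;
    # ties count as half a correct ranking.
    pos = [s for y, s in zip(labels, scores) if y == 1]
    neg = [s for y, s in zip(labels, scores) if y == 0]
    wins = sum((p > n) + 0.5 * (p == n) for p in pos for n in neg)
    return wins / (len(pos) * len(neg))

labels = [0, 0, 1, 1]
scores = [0.1, 0.4, 0.35, 0.8]
print(roc_auc(labels, scores))  # 0.75
```

sklearn's roc_auc_score computes the same quantity, just far more efficiently.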
IV. Visualizing the computation graph with dask.array
Source: https://gist.github.com/mrocklin/b61f795004ec0a70e43de350e453e97e
import numpy as np
import dask.array as da

x = da.ones(15, chunks=(5,))
x.visualize('dask.svg')
(x + 1).sum().visualize('dask.svg')
Now a 2-D example:
x = da.ones((15, 15), chunks=(5, 5))
x.visualize('dask.svg')
(x.dot(x.T + 1) - x.mean(axis=0)).std().visualize('dask.svg')
----------------------
Author: 悟乙己. Source: CSDN. Original: https://blog.csdn.net/sinat_26917383/article/details/78044437
This is the author's original article; please include a link to the original post when reposting!