pands applyマルチスレッド実現コード
一、マルチスレッド化選択
一つのコードを並列化するには二つの選択があります。multithreadとmultiprocess。
Multithread,マルチスレッド,同じプロセスが複数のスレッドを開いて計算を実行できます。各スレッドはCPUコアを表しており、これだけ多くのスレッドは同じメモリアドレス(いわゆる共有メモリ)にアクセスでき、スレッド間の通信を実現し、最も単純な並列モデルといえる。
Multiprocessは、マルチプロセスは、複数のPythonインタプリタを同時に開くことに相当し、各インタプリタは独自のデータを持っています。もちろん、データの衝突はありません。
二、並行化思想
パラレル化の基本的な考え方は、dataframeをnp.arrayスプリット法は複数の背のdataframeにカットされます。Pool.map関数を再起動して並列に実行します。順序で実行されるpands.Data Frame.appyは、どのようにPool.mapに変換して並列に実行されるかに注目してください。
Poolオブジェクトは並行プロセスのセットで、オープンソースPoolクラスです。
オープンソースPoolクラス定義
withキーワードはcontext managerで、煩雑な処理スイッチプロセスのロジックを書かないようにします。
マルチスレッド時間比較とマルチスレッドのいくつかのアプリケーション
https://blog.fangzhou.me/posts/20170702-python-parallelism/
https://docs.python.org/3.7/library/multiprocessing.html
ここで、pands applyマルチスレッドコードに関する記事を紹介します。これに関連して、pands applyマルチスレッドの内容は以前の文章を検索したり、下記の関連記事を見たりしてください。これからもよろしくお願いします。
一つのコードを並列化するには二つの選択があります。multithreadとmultiprocess。
Multithread,マルチスレッド,同じプロセスが複数のスレッドを開いて計算を実行できます。各スレッドはCPUコアを表しており、これだけ多くのスレッドは同じメモリアドレス(いわゆる共有メモリ)にアクセスでき、スレッド間の通信を実現し、最も単純な並列モデルといえる。
Multiprocessは、マルチプロセスは、複数のPythonインタプリタを同時に開くことに相当し、各インタプリタは独自のデータを持っています。もちろん、データの衝突はありません。
二、並行化思想
パラレル化の基本的な考え方は、dataframeをnp.arrayスプリット法は複数の背のdataframeにカットされます。Pool.map関数を再起動して並列に実行します。順序で実行されるpands.Data Frame.appyは、どのようにPool.mapに変換して並列に実行されるかに注目してください。
Poolオブジェクトは並行プロセスのセットで、オープンソースPoolクラスです。
オープンソースPoolクラス定義
def Pool(self, processes=None, initializer=None, initargs=(),
maxtasksperchild=None):
'''Returns a process pool object'''
from .pool import Pool
return Pool(processes, initializer, initargs, maxtasksperchild,
context=self.get_context())
プロセス初期化関数の設定
def init_process(global_vars):
global a
a = global_vars
プロセス初期化関数の設定
Pool(processes=8,initializer=init_process,initargs=(a,))
ここで指定された8つのプロセスは、各プロセスの初期化にはinit_を実行する必要があります。process関数は、そのパラメータはsingleton tuple aであり、init_を利用する。processとinitargsは、プロセス間で共有するグローバル変数(ここではa)を設定しやすいです。withキーワードはcontext managerで、煩雑な処理スイッチプロセスのロジックを書かないようにします。
with Pool(processes=8,initializer=init_process,initargs=(a,)) as pool:
result_parts = pool.map(apply_f,df_parts)
三、マルチスレッド化アプリケーションマルチスレッド時間比較とマルチスレッドのいくつかのアプリケーション
import numpy as np
import pandas as pd
import time
from multiprocessing import Pool
def f(row):
#
return sum(row)+a
def f1_1(row):
# , columns=range(0,2), 0
return row[0]**2
def f1_2(row1):
# , columns=range(0,2), 0
return row1**2
def f2_1(row):
# , columns=range(0,2), 0,2
return pd.Series([row[0]**2,row[1]**2],index=['1_1','1_2'])
def f2_2(row1,row2):
# , columns=range(0,2), 0,2
return pd.Series([row1**2,row2**2],index=['2_1','2_2'])
def apply_f(df):
return df.apply(f,axis=1)
def apply_f1_1(df):
return df.apply(f1_1,axis=1)
def apply_f1_2(df):
return df[0].apply(f1_2)
def apply_f2_1(df):
return df.apply(f2_1,axis=1)
def apply_f2_2(df):
return df.apply(lambda row :f2_2(row[0],row[1]),axis=1)
def init_process(global_vars):
global a
a = global_vars
def time_compare():
''' '''
a = 2
np.random.seed(0)
df = pd.DataFrame(np.random.rand(10**5,2),columns=range(0,2))
print(df.columns)
t1= time.time()
result_serial = df.apply(f,axis=1)
t2 = time.time()
print("Serial time =",t2-t1)
print(result_serial.head())
df_parts=np.array_split(df,20)
print(len(df_parts),type(df_parts[0]))
with Pool(processes=8,initializer=init_process,initargs=(a,)) as pool:
#with Pool(processes=8) as pool:
result_parts = pool.map(apply_f,df_parts)
result_parallel= pd.concat(result_parts)
t3 = time.time()
print("Parallel time =",t3-t2)
print(result_parallel.head())
def apply_fun():
''' apply '''
a = 2
np.random.seed(0)
df = pd.DataFrame(np.random.rand(10**5,2),columns=range(0,2))
print(df.columns)
df_parts=np.array_split(df,20)
print(len(df_parts),type(df_parts[0]))
with Pool(processes=8,initializer=init_process,initargs=(a,)) as pool:
#with Pool(processes=8) as pool:
res_part0 = pool.map(apply_f,df_parts)
res_part1 = pool.map(apply_f1_1,df_parts)
res_part2 = pool.map(apply_f1_2,df_parts)
res_part3 = pool.map(apply_f2_1,df_parts)
res_part4 = pool.map(apply_f2_2,df_parts)
res_parallel0 = pd.concat(res_part0)
res_parallel1 = pd.concat(res_part1)
res_parallel2 = pd.concat(res_part2)
res_parallel3 = pd.concat(res_part3)
res_parallel4 = pd.concat(res_part4)
print("f:
",res_parallel0.head())
print("f1:
",res_parallel1.head())
print("f2:
",res_parallel2.head())
print("f3:
",res_parallel3.head())
print("f4:
",res_parallel4.head())
df=pd.concat([df,res_parallel0],axis=1)
df=pd.concat([df,res_parallel1],axis=1)
df=pd.concat([df,res_parallel2],axis=1)
df=pd.concat([df,res_parallel3],axis=1)
df=pd.concat([df,res_parallel4],axis=1)
print(df.head())
if __name__ == '__main__':
time_compare()
apply_fun()
参考URLhttps://blog.fangzhou.me/posts/20170702-python-parallelism/
https://docs.python.org/3.7/library/multiprocessing.html
ここで、pands applyマルチスレッドコードに関する記事を紹介します。これに関連して、pands applyマルチスレッドの内容は以前の文章を検索したり、下記の関連記事を見たりしてください。これからもよろしくお願いします。