pands applyマルチスレッド実現コード


一、マルチスレッド化選択
     一つのコードを並列化するには二つの選択があります。multithreadとmultiprocess。
     Multithread,マルチスレッド,同じプロセスが複数のスレッドを開いて計算を実行できます。各スレッドはCPUコアを表しており、これだけ多くのスレッドは同じメモリアドレス(いわゆる共有メモリ)にアクセスでき、スレッド間の通信を実現し、最も単純な並列モデルといえる。
    Multiprocessは、マルチプロセスは、複数のPythonインタプリタを同時に開くことに相当し、各インタプリタは独自のデータを持っています。もちろん、データの衝突はありません。
二、並行化思想
パラレル化の基本的な考え方は、dataframeをnp.arrayスプリット法は複数の背のdataframeにカットされます。Pool.map関数を再起動して並列に実行します。順序で実行されるpands.Data Frame.appyは、どのようにPool.mapに変換して並列に実行されるかに注目してください。
Poolオブジェクトは並行プロセスのセットで、オープンソースPoolクラスです。
オープンソースPoolクラス定義

 def Pool(self, processes=None, initializer=None, initargs=(),
       maxtasksperchild=None):
    '''Returns a process pool object'''
    from .pool import Pool
    return Pool(processes, initializer, initargs, maxtasksperchild,
          context=self.get_context())
プロセス初期化関数の設定

def init_process(global_vars):
  global a
  a = global_vars
プロセス初期化関数の設定

Pool(processes=8,initializer=init_process,initargs=(a,))
ここで指定された8つのプロセスは、各プロセスの初期化にはinit_を実行する必要があります。process関数は、そのパラメータはsingleton tuple aであり、init_を利用する。processとinitargsは、プロセス間で共有するグローバル変数(ここではa)を設定しやすいです。
withキーワードはcontext managerで、煩雑な処理スイッチプロセスのロジックを書かないようにします。

 with Pool(processes=8,initializer=init_process,initargs=(a,)) as pool:    
    result_parts = pool.map(apply_f,df_parts)
三、マルチスレッド化アプリケーション
マルチスレッド時間比較とマルチスレッドのいくつかのアプリケーション

import numpy as np
import pandas as pd
import time
from multiprocessing import Pool

def f(row):
  #         
  return sum(row)+a

def f1_1(row):
  #        ,    columns=range(0,2),     0     
  return row[0]**2

def f1_2(row1):
  #        ,    columns=range(0,2),     0     
  return row1**2

def f2_1(row):
  #        ,    columns=range(0,2),     0,2     
  return pd.Series([row[0]**2,row[1]**2],index=['1_1','1_2'])

def f2_2(row1,row2):
  #        ,    columns=range(0,2),     0,2     
  return pd.Series([row1**2,row2**2],index=['2_1','2_2'])

def apply_f(df):
  return df.apply(f,axis=1)

def apply_f1_1(df):
  return df.apply(f1_1,axis=1)

def apply_f1_2(df):
  return df[0].apply(f1_2)

def apply_f2_1(df):
  return df.apply(f2_1,axis=1)

def apply_f2_2(df):
  return df.apply(lambda row :f2_2(row[0],row[1]),axis=1)
 
def init_process(global_vars):
  global a
  a = global_vars
  
def time_compare():
  '''              '''
  a = 2
  np.random.seed(0)
  df = pd.DataFrame(np.random.rand(10**5,2),columns=range(0,2))
  print(df.columns)
   
  t1= time.time()
  result_serial = df.apply(f,axis=1)
  t2 = time.time()
  print("Serial time =",t2-t1)
  print(result_serial.head())

  
  df_parts=np.array_split(df,20)
  print(len(df_parts),type(df_parts[0]))
  with Pool(processes=8,initializer=init_process,initargs=(a,)) as pool: 
  #with Pool(processes=8) as pool:    
    result_parts = pool.map(apply_f,df_parts)
  result_parallel= pd.concat(result_parts)
  t3 = time.time()
  print("Parallel time =",t3-t2)
  print(result_parallel.head())


def apply_fun():
  '''  apply     '''
  a = 2
  np.random.seed(0)
  df = pd.DataFrame(np.random.rand(10**5,2),columns=range(0,2))
  print(df.columns)
  df_parts=np.array_split(df,20)
  print(len(df_parts),type(df_parts[0]))
  with Pool(processes=8,initializer=init_process,initargs=(a,)) as pool: 
  #with Pool(processes=8) as pool:    
    res_part0 = pool.map(apply_f,df_parts)
    res_part1 = pool.map(apply_f1_1,df_parts)
    res_part2 = pool.map(apply_f1_2,df_parts)
    res_part3 = pool.map(apply_f2_1,df_parts)
    res_part4 = pool.map(apply_f2_2,df_parts)

  res_parallel0 = pd.concat(res_part0)
  res_parallel1 = pd.concat(res_part1)
  res_parallel2 = pd.concat(res_part2)
  res_parallel3 = pd.concat(res_part3)
  res_parallel4 = pd.concat(res_part4)
  
  print("f:
",res_parallel0.head()) print("f1:
",res_parallel1.head()) print("f2:
",res_parallel2.head()) print("f3:
",res_parallel3.head()) print("f4:
",res_parallel4.head()) df=pd.concat([df,res_parallel0],axis=1) df=pd.concat([df,res_parallel1],axis=1) df=pd.concat([df,res_parallel2],axis=1) df=pd.concat([df,res_parallel3],axis=1) df=pd.concat([df,res_parallel4],axis=1) print(df.head()) if __name__ == '__main__': time_compare() apply_fun()
参考URL
https://blog.fangzhou.me/posts/20170702-python-parallelism/
https://docs.python.org/3.7/library/multiprocessing.html
ここで、pands applyマルチスレッドコードに関する記事を紹介します。これに関連して、pands applyマルチスレッドの内容は以前の文章を検索したり、下記の関連記事を見たりしてください。これからもよろしくお願いします。