Pythonにおける巨大データセット上のピアソン相関行列の計算


GoodIPにおける最新のタスクのうちの1つは、0~50 kの範囲でおよそ800の観測を持っているおよそ480 kアイテムの間の類似点を計算することでした.次元性を減らすことはロングテール結果の品質を損なうことになり、望ましくない.次の記事では、さまざまな実装のパフォーマンスを評価し、データセットを複数のチャンクに分割して並列処理する方法について説明します.私も興味深い、異なるアプローチを提供するDeepGraphに言及したい.
短い復習として、ピアソン相関は−1、負の相関を示す(増加する値が減少すると、Bの対応する値が減少する)、1、正の相関(増加Bが増加する場合).0の値は相関を持たない.と定義される

xとyはxとyの値の意味です.
私たちの場合、データセットを含んでいるCSVファイルは、1 GBのサイズのまわりにあります.私たちはそれをロードし、それからnumpy ndarrayを作成します.
items = pd.read_csv("./items.csv")
numpy_items = items.to_numpy() 

単にnpを実行しようとします.Corrcoef ( NumpyRule項目)は、MemoryError例外を送出します.実際に処理する必要があるペアの実際の数は、異なるオブジェクトの数です.ncr ( n , r )= n !/R!( n−——この場合NCR(480000、2)であるが、まだ115の199の760の000の組合せ.

実装性能比較
まず、Pandas , Numpy , Cuppyからの実装の違いを比較して、並列処理で動作します.したがって、データの1 %のサブセットのみを使用します.
import numpy as np
import cupy as cp
import pandas as pd
from scipy.stats import pearsonr
cp.cuda.set_allocator(None) # Disable cache
items = items[0:5000]
numpy_items = items.to_numpy()
cupy_items = cp.array(numpy_items)

# Pandas implementation
%%timeit -n 1 -r 5
r1 = items.T.corr()
# 1 loop, best of 5: 37 s per loop

# NumPy implementation
%%timeit -n 1 -r 5
r2 = np.corrcoef(numpy_items)
# 1 loop, best of 5: 1.21 s per loop

# CuPy implementation (on Tesla K80)
%%timeit -n 1 -r 5
r3 = cp.corrcoef(cupy_items)
# 1 loop, best of 5: 66 ms per loop

# NumPy custom
%%timeit -n 1 -r 5
ms = numpy_profiles.mean(axis=1)[(slice(None,None,None),None)]
datam = numpy_profiles - ms
datass = np.sqrt(np.sum(datam*datam, axis=1))
r4 = []
for i in range(numpy_profiles.shape[0]):
    temp = np.dot(datam[i:], datam[i].T)
    rs = temp / (datass[i:] * datass[i])
    r4.append(rs)
# 1 loop, best of 5: 44.7 s per loop

# CuPy custom version (on Tesla K80)
%%timeit -n 1 -r 5
ms = cupy_profiles.mean(axis=1)[(slice(None,None,None),None)]
datam = cupy_profiles - ms
datass = cp.sqrt(cp.sum(datam*datam, axis=1))
r5 = []
for i in range(cupy_profiles.shape[0]):
    temp = cp.dot(datam[i:], datam[i].T)
    rs = temp / (datass[i:] * datass[i])
    r5.append(rs)
# 1 loop, best of 5: 2.35 s per loop

GPUsは行列演算で大きくなります.そして、それはCuppyがこれまでの他の実装を上回る理由でありえました.このアプローチはまた、並列化されたCPUアプローチを外すことができ、フォローアップ記事で評価される.しかし、以下では、Numpyを使用したカスタム実装を使用しています.

データセットのチャンク
メモリ制約のためにすべてのペアリングを事前計算したくないので、アプローチは1行を取り、インデックスの下の他のすべての行との相関を計算し、インデックスをインクリメントします.この方法では、再帰的な比較を除外し、順序を無視して関数の可換性を使用します.第1のインデックスのための計算努力はそれから最後のものより高い方法です、そして、我々は完全なインデックス位置で裂くだけです.合計組み合わせ数のNCR関数が必要です.
import operator as op
def ncr(n: int, r: int) -> int:
    """
    Calculates the number of different, unordered combination 
    of r objects from a set of n objects.
    nCr(n,r) = nPr(n,r) / r!

    Args:
        n (int): Number of objects in set
        r (int): Number of objects to combine
    Returns:
        int: Number of combinations
    """
    r = min(r, n-r)
    numer = reduce(op.mul, range(n, n-r, -1), 1)
    denom = reduce(op.mul, range(1, r+1), 1)
    return numer // denom
Python 3.8以降、数学も使えます.櫛.次のコードは、インデックスを等しいサイズの部分に近いNSUNEチャンクに分割するインデックスを検索します.
n_chunks = 50 # Number of chunks
n = numpy_items.shape[0] # Number items
nr_pairings = ncr(numpy_items.shape[0], 2) # Number of all pairings
# Minimum nr. of pairings per chunk
per_chunk = int(math.ceil(nr_pairings/(n_chunks - 1)))
split_indices = [] # Array containing the indices to split at
t = 0
for i in range(n + 1):
    # There are n - i pairings at index i
    t += n - i
    # If the current chunk has enough pairings 
    if t >= per_chunk:
        split_indices.append(i)
        t = 0
各チャンクでのペアリング数を確認できます.
s_indices = [0] + split_indices + [n]
pairings_chunks = []
for i in range(len(s_indices)-1):
    start = s_indices[i]
    end = s_indices[i + 1]
    pairings_chunks.append(sum(range(n - end, n - start)))

データセットの処理
まず、データセットの手段をあらかじめ計算します.データが読み取り専用の場合、グローバルスコープで定義できます.すべての子プロセスはそれからそれにアクセスすることができます、そして、それはコピーされません.
ms = numpy_profiles.mean(axis=1)[(slice(None,None,None),None)]
datam = numpy_profiles - ms
datass = np.sqrt(np.sum(datam*datam, axis=1))
最初のアイデアは、労働者からの結果を送るためには、専用のストレージワーカーにキューを介して.残念なことに、これはブロック化と直列化のためにかなりパフォーマンスを下げました.実装する最速のオプションは、作業者から直接ディスクに結果を書き込むことでした.
output_dir = "/var/tmp/"
def processor_fun(*, i_start: int, i_end: int) -> None:
    """
    Calculate correlation of rows in the range of i_start and i_end
    and the rows below these indices.
Args:
        i_start (int): Index of the start row
        i_end (int): Index of the end row
    Returns:
        None
    """    
    for i in range(i_start, i_end):
        temp = np.dot(datam[i:], datam[i].T)
        rs = temp / (datass[i:] * datass[i])
        rs = np.delete(rs, [0])
        # Create nparray that contains information about the indices
        res = np.zeros((rs.shape[0], 3))
        res[:, 0] = i
        res[:, 1] = list(range(i+1, i + rs.shape[0] + 1))
        res[:, 2] = rs
        # Save CSV file
        np.savetxt(f'{output_dir}/{i}.csv', res,
                   delimiter=',',
                   newline='\n', 
                   fmt=['%d','%d','%0.13f'])
# Create pool of worker processes which will carry out the
# computation
n_cpus = mp.cpu_count()
pool = mp.Pool(n_cpus)

# Start workers
s_indices = [0] + split_indices
workers = []
for i in range(0, len(s_indices)):
    start = s_indices[i]
    end = s_indices[i+1] if i < len(s_indices)-1 else n-1
    workers.append(pool.apply_async(
        processor_fun, 
        kwds={'i_start': start, 'i_end': end}))
for r in workers:
    r.wait()
# Close the pool and wait till all workers are done
pool.close()
pool.terminate()
pool.join()
我々のデータセットで、これは約4 TBの合計サイズで480 KのCSVファイルを作成します.我々はバッチし、gzipの結果を4 GBの部品に圧縮し、Google BigQueryにそれらをロードし、生産用に使用します.