pythonマルチプロセス実践

31941 ワード

トレーニングセットのサンプル数が少なすぎるため、同僚はネット上で550 W+の画像URLを登って、urlのtxtファイルだけを保存して500 Mになって、辛い問題が来て、もし1つのurlをダウンロードして1回保存するならば、いつまでこの550 Wの画像をすべてdownすることができますかを知りません.解決策は、マルチプロセスが同時にdownし、cpu、NIC、ハードディスクを並列に動作させるが、ディスクI/Oの効率が非常に遅いことを考慮して、一括を貯めて集中的に書き込むことである.ハードウェア構成I 7-7700 4コア8スレッド、32 Gメモリ、合計4時間30分程度かかる様子、urlタイムアウト設定1.5 s(最初は設定されておらず、いつもプロセスが詰まっていて、プログラムが実行できない!隠れた大きな穴ですね~)、合計510 W枚の画像をダウンロードします.次に、510 W枚の画像を重量除去し、サイズが100未満の画像をフィルタリングする(downの場合、このことを忘れ、めまいがする)、思想は同じであるが、重量除去の際に検索が必要であることを考慮すると、pythonのset()とdict()のkeyバックグラウンドは赤黒樹を採用して検索効率が高く、list()は線形構造であるべきであり、このような百万以上のデータ量の需要については考慮しない.510 W+の画像をフィルタリングし、合計3時間未満で7.1 Wの画像をフィルタリングし、効率は許容範囲内である.コードは細かく最適化されていません.そして、一度の小さなツールで、走ればいいので、少し乱れています.
============================================================get()はピクチャをダウンロードし,imagehashはピクチャのhashを計算して繰り返す
import datetime
import requests
from tqdm import tqdm
from PIL import Image
from multiprocessing import Process, Manager, Lock
import copy
import json
import time
import numpy as np
import imagehash
import shutil

sum = 0


def _save_img(url_str, fail_list):
    data_dir = r'D:\bdtb'
    nowTime = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    url_list = json.loads(url_str)
    guid = id(url_list)
    print(nowTime + ': process id {0} start save images...'.format(guid))
    img_list = []
    for index, url in enumerate(url_list):
        try:
            img = Image.open(requests.get(url, stream=True, timeout=1.5).raw)
            img = img.convert('RGB')
            img_list.append(img)
            if index % 2000 == 0:
                print('{0}: img_list length: {1}'.format(guid, len(img_list)))
                head = 'bdtb_' + datetime.datetime.now().strftime('%Y%m%d%H%M%S%f')
                img_copy = copy.copy(img_list)
                for i, img in enumerate(img_copy):
                    name = head + '_{0:05}.jpg'.format(i)
                    img.save(os.path.join(data_dir, name), quality=90)
                img_list.clear()
                nowTime = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                print('{0}: {1} have download {2} images.'.format(nowTime, guid, index))
        except Exception as e:
            fail_list.append(url)
            print('{0} have failed'.format(len(fail_list)))
            continue
    nowTime = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print(nowTime + ': {0} save images success!'.format(guid))


def download():
    #   url    , 550W   
    block_num = 20000
    task = []
    process_list = []
    manger = Manager()
    fail_list = manger.list()
    nowTime = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print('start: ' + nowTime)
    with open("url.txt") as f:
        for url in f:
            task.append(url.strip())
            sum += 1
            if sum % block_num == 0:
                task_copy = copy.copy(task)
                p = Process(target=_save_img, args=(json.dumps(task_copy), fail_list))
                process_list.append(p)
                task.clear()

    task_copy = copy.copy(task)
    p = Process(target=_save_img, args=(json.dumps(task_copy),))
    process_list.append(p)

    nowTime = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print(nowTime + ': read file end!')
    print('start download images...')
    _start_process(process_list)
    print('{0} images failed!'.format(len(fail_list)))
    with open('faile.txt', 'w') as sf:
        for url in fail_list:
            sf.write(url + '
'
) print('success!') def _start_process(process_list): process_sum = len(process_list) start = 0 end = 0 for i in range(8, process_sum, 8): end = i start_time = time.clock() for p in process_list[start:end]: p.start() for p in process_list[start:end]: p.join() end_time = time.clock() print('{0}:{1} cost {2}s'.format(start, end, end_time - start_time)) start = end if end < process_sum: for p in process_list[end:process_sum]: p.start() for p in process_list[end:process_sum]: p.join() def _filter_process(path_list): start = time.clock() local_dic = {} guid = id(local_dic) for path in path_list: img = Image.open(path) min_size = np.min(img.size) if min_size > 99: hash_value = str(imagehash.phash(img)) local_dic[hash_value] = path print('{0}: receive {1} images, output {1} images'.format(guid, len(path_list), len(local_dic))) for key in local_dic.keys(): src_path = local_dic[key] des_path = src_path.replace('bdtb', 'bdtb_filter') shutil.copyfile(src_path, des_path) end = time.clock() print('{0}: process {1} images cost {2}s'.format(guid, len(path_list), end - start)) def filter(): # , manger = Manager() src_dir = r'E:\code\data\bdtb' branch_size = 20000 process_list = [] start = time.clock() path_list = manger.list([os.path.join(src_dir, name) for name in os.listdir(src_dir)]) end = time.clock() print('read file list cost: {0}s'.format(end - start)) start_index = 0 end_index = 0 for i in range(branch_size, len(path_list), branch_size): end_index = i p = Process(target=_filter_process, args=(path_list[start_index:end_index],)) process_list.append(p) start_index = end_index if end_index < len(path_list): p = Process(target=_filter_process, args=(path_list[end_index:],)) process_list.append(p) nowTime = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') print('{0}:start images process...!'.format(nowTime)) _start_process(process_list) nowTime = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') print('{0}:success!'.format(nowTime)) if __name__ == '__main__': filter()