Asynchronous concurrent processing with concurrent.futures


This note compares a simple numeric computation run sequentially with the same computation run asynchronously and concurrently.

ykishi@dezembro concurrent.futures-test % cat test2.py 
#!/usr/bin/env python3

import concurrent.futures
import datetime

TOKENS = ["a", "b", "c", "d", "e"]


def calc(token):
  # -------------------------------------
  #  Execute some simple calculation
  # -------------------------------------

  total = 0
  for i in range(100):
    print(token, i)
    total += i

  return token, total


if __name__ == "__main__":
  # =========================================
  # Execute sequentially
  # =========================================
  start_time = datetime.datetime.today()
  for token in TOKENS:
    print(calc(token))
  time_seq = datetime.datetime.today() - start_time

  # =========================================
  # Execute asynchronously and concurrently (thread pool)
  # =========================================
  start_time = datetime.datetime.today()
  with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    future_to_calc = {executor.submit(calc, token): token for token in TOKENS}
    for future in concurrent.futures.as_completed(future_to_calc):
      token = future_to_calc[future]
      try:
        data = future.result()
      except Exception as ex:
        print("%r generated an exception: %s" % (token, ex))
      else:
        print("%r:  %s" % (token, data))

  time_async_in_parallel = datetime.datetime.today() - start_time

  # ======================================
  # Print both elapsed times for comparison
  # ======================================
  print()
  print(time_seq)
  print()
  print(time_async_in_parallel)

ykishi@dezembro concurrent.futures-test % ./test2.py| tail -n 50             
e 56
e 57
e 58
e 59
e 60
e 61
e 62
e 63
e 64
e 65
e 66
e 67
e 68
e 69
e 70
e 71
e 72
e 73
e 74
e 75
e 76
e 77
e 78
e 79
e 80
e 81
e 82
e 83
e 84
e 85
e 86
e 87
e 88
e 89
e 90
e 91
e 92
e 93
e 94
e 95
e 96
e 97
e 98
e 99
'd':  ('d', 4950)
'e':  ('e', 4950)

0:00:00.000254

0:00:00.001977
ykishi@dezembro concurrent.futures-test % 

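In this case, the concurrent version is slower than the sequential one (about 1.98 ms versus 0.25 ms).
The amount of addition work is the same in both runs, so the difference is presumably dominated by the overhead of the threads themselves: creating the pool and switching between threads (context switches). In CPython, the global interpreter lock (GIL) also lets only one thread execute Python bytecode at a time, so pure-Python computation like this cannot run on several cores at once and gains nothing from threads to offset that overhead.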
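Note that test2.py also times the `print()` call inside every loop iteration. The sketch below is one way to isolate the arithmetic: `calc_quiet`, `run_sequential`, `run_threaded` and `timed` are hypothetical names, and it assumes `time.perf_counter()` (a monotonic, high-resolution clock) is a better fit than `datetime.datetime.today()` for intervals this short. The absolute numbers will differ from machine to machine; the point is only that the threaded run has no parallel speedup to offset its overhead.

```python
import concurrent.futures
import time

TOKENS = ["a", "b", "c", "d", "e"]


def calc_quiet(token):
    # Same summation as calc() in test2.py, minus the per-iteration print(),
    # so only the arithmetic is measured (hypothetical helper).
    total = 0
    for i in range(100):
        total += i
    return token, total


def run_sequential():
    return [calc_quiet(token) for token in TOKENS]


def run_threaded():
    # executor.map() yields results in input order, unlike as_completed(),
    # which yields futures in completion order.
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        return list(executor.map(calc_quiet, TOKENS))


def timed(func):
    # Measure the wall-clock time of func() with a monotonic clock.
    start = time.perf_counter()
    result = func()
    return result, time.perf_counter() - start


if __name__ == "__main__":
    seq_result, seq_time = timed(run_sequential)
    par_result, par_time = timed(run_threaded)
    assert seq_result == par_result
    print("sequential: %.6f s" % seq_time)
    print("threaded:   %.6f s" % par_time)
```

Both versions return the same `(token, 4950)` pairs; only the timing differs.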

When network I/O is involved

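On the other hand, when network processing is involved, a higher degree of concurrency should improve overall performance, because network latency clearly takes far longer than a context switch. While one thread is blocked waiting for a response, CPython releases the GIL, so the other threads can issue their own requests in the meantime.
I tried the following, and it had the expected effect.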
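The latency argument can be checked without touching the network. This minimal sketch assumes a fixed 0.2-second delay per request (an illustrative value) and stands in for the HTTP call with `time.sleep()`, which, like a blocking socket read, releases the GIL; `fake_request` and `measure` are hypothetical names. With `max_workers=1` the pool behaves like the sequential loop.

```python
import concurrent.futures
import time

TOKENS = ["a", "b", "c", "d", "e"]
LATENCY = 0.2  # assumed per-request latency in seconds (illustrative)


def fake_request(token):
    # Stand-in for a network round trip: the thread blocks in sleep()
    # without holding the GIL, so other threads keep running.
    time.sleep(LATENCY)
    return token


def measure(max_workers):
    # Run all requests through a pool of the given size and time the batch.
    start = time.perf_counter()
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        results = list(executor.map(fake_request, TOKENS))
    return results, time.perf_counter() - start


if __name__ == "__main__":
    for workers in (1, 5):
        results, elapsed = measure(workers)
        print("max_workers=%d: %.2f s %s" % (workers, elapsed, results))
```

With one worker the five delays add up to roughly 1 second; with five workers they overlap and the batch takes roughly one delay.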

ykishi@dezembro concurrent.futures-test % cat test3.py 
#!/usr/bin/env python3

import concurrent.futures
import urllib.request
import datetime

TOKENS = ["a", "b", "c", "d", "e"]


def calc(token):
  # -------------------------------------
  #  Execute some simple calculation
  # -------------------------------------
  total = 0
  for i in range(100):
    # print(token, i)
    total += i

  # ------------------------------------------------
  #  Handle HTTP request/response with some URLs
  # ------------------------------------------------
  urls = [
      "http://www.foxnews.com/",
      "http://www.oracle.com/",
      "http://www.microsoft.com/",
  ]
  for url in urls:
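    try:
      # The with-statement closes the response even if read() fails;
      # timeout=10 (an arbitrary value) stops blocking socket operations from hanging forever
      with urllib.request.urlopen(url, timeout=10) as response:
        content = response.read()
    except Exception as ex:
      print("%r generated an exception: %s" % (url, ex))
    else:
      print("%r:  %s" % (token, len(content)))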

  return token, total


if __name__ == "__main__":
  # =========================================
  # Execute sequentially
  # =========================================
  print("...START...")
  start_time = datetime.datetime.today()
  for token in TOKENS:
    print(calc(token))
  time_seq = datetime.datetime.today() - start_time
  print("...END...")

  # =========================================
  # Execute asynchronously and concurrently (thread pool)
  # =========================================
  print("...START...")
  start_time = datetime.datetime.today()
  with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    future_to_calc = {executor.submit(calc, token): token for token in TOKENS}
    for future in concurrent.futures.as_completed(future_to_calc):
      token = future_to_calc[future]
      try:
        data = future.result()
      except Exception as ex:
        print("%r generated an exception: %s" % (token, ex))
      else:
        print("%r:  %s" % (token, data))

  time_async_in_parallel = datetime.datetime.today() - start_time
  print("...END...")

  # ======================================
  # Print both elapsed times for comparison
  # ======================================
  print()
  print(time_seq)
  print()
  print(time_async_in_parallel)

ykishi@dezembro concurrent.futures-test % ./test3.py 
...START...
'a':  287515
'a':  92352
'a':  316
('a', 4950)
'b':  287515
'b':  92352
'b':  317
('b', 4950)
'c':  287515
'c':  92352
'c':  317
('c', 4950)
'd':  287515
'd':  92352
'd':  317
('d', 4950)
'e':  287515
'e':  92352
'e':  317
('e', 4950)
...END...
...START...
'e':  287515
'b':  287515
'a':  287515
'd':  287515
'c':  287515
'e':  92352
'b':  92352
'a':  92352
'e':  317
'e':  ('e', 4950)
'd':  92352
'c':  92352
'b':  316
'b':  ('b', 4950)
'a':  317
'a':  ('a', 4950)
'd':  316
'd':  ('d', 4950)
'c':  317
'c':  ('c', 4950)
...END...

0:00:01.751963

0:00:00.533492
ykishi@dezembro concurrent.futures-test % 
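In test3.py each task still fetches its three URLs one after another, so requests overlap only across tokens. A possible refinement, sketched here under the assumption that only the response size is needed, is to submit one task per URL so that every request can be in flight at once. `fetch_length`, `fetch_all`, the 10-second timeout and `max_workers=8` are hypothetical choices for illustration; the `fetch` parameter lets the network call be swapped out, for example in tests.

```python
import concurrent.futures
import urllib.request

URLS = [
    "http://www.foxnews.com/",
    "http://www.oracle.com/",
    "http://www.microsoft.com/",
]


def fetch_length(url, timeout=10):
    # Download one URL and return the body size in bytes; the with-statement
    # closes the response even if read() raises.
    with urllib.request.urlopen(url, timeout=timeout) as response:
        return len(response.read())


def fetch_all(urls, fetch=fetch_length, max_workers=8):
    # One task per URL; results are collected in completion order and
    # keyed by URL, so the order in which they finish does not matter.
    results = {}
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_url = {executor.submit(fetch, url): url for url in urls}
        for future in concurrent.futures.as_completed(future_to_url):
            url = future_to_url[future]
            try:
                results[url] = future.result()
            except Exception as ex:
                # Record the failure instead of aborting the whole batch.
                results[url] = ex
    return results


if __name__ == "__main__":
    for url, outcome in fetch_all(URLS).items():
        print("%r: %s" % (url, outcome))
```

A failed request appears in the result as its exception object, which mirrors the `except` branch in test3.py.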

Conclusion

Therefore, running the following tasks asynchronously and concurrently should yield a corresponding benefit:

  • Retrieving the country code
  • Retrieving the NETNAME
  • Reverse lookup from an IP address (obtaining the FQDN)
  • Retrieving the base domain
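Of these, the reverse lookup maps most directly onto the standard library. The following minimal sketch assumes the lookup is done with `socket.gethostbyaddr()`, which blocks on the system resolver and therefore fits the same thread-pool pattern as the HTTP requests; `reverse_lookup`, `reverse_lookup_all` and `max_workers=8` are hypothetical. It does not cover the country code, NETNAME or base domain items.

```python
import concurrent.futures
import socket


def reverse_lookup(ip):
    # socket.gethostbyaddr() returns (hostname, aliaslist, ipaddrlist).
    # Lookup failures raise socket.herror / socket.gaierror, both subclasses
    # of OSError; they are mapped to None here.
    try:
        return socket.gethostbyaddr(ip)[0]
    except OSError:
        return None


def reverse_lookup_all(ips, max_workers=8):
    # executor.map() preserves input order, so results line up with ips.
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        return dict(zip(ips, executor.map(reverse_lookup, ips)))


if __name__ == "__main__":
    # Example addresses only; substitute the IPs you actually need to resolve.
    for ip, fqdn in reverse_lookup_all(["127.0.0.1", "8.8.8.8"]).items():
        print(ip, fqdn)
```

Addresses without a PTR record come back as `None` rather than raising, so one unresolved address does not stop the rest of the batch.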