Pythonのmultiprocessingを使って,並列処理を色々とやってみて,動作を確認してみました.
環境
- CPU 3.2 GHz Quad-Core Intel Core i5
- macOS 10.15.7
- Python 3.8.6
4個のプロセスで関数を4回並列実行する
import multiprocessing import time import os def run(i): print(f"start worker={i} pid={os.getpid()}") time.sleep(1) print(f"stop worker={i}") return i*2 def main(): with multiprocessing.Pool(processes=4) as pool: r = pool.map(run, range(4)) print(r) if __name__ == "__main__": main()
引数i
を2倍するのに1秒かかるrun()
関数を,4個のプロセスで関数を4回並列実行します.
これを実行すると,以下のようになりました.
start worker=0 pid=13516 start worker=1 pid=13517 start worker=2 pid=13518 start worker=3 pid=13515 stop worker=0 stop worker=1 stop worker=2 stop worker=3 [0, 2, 4, 6]
4個のプロセスで関数を10回並列実行する
import multiprocessing import time import os def run(i): print(f"start worker={i} pid={os.getpid()}") time.sleep(1) print(f"stop worker={i}") return i*2 def main(): with multiprocessing.Pool(processes=4) as pool: r = pool.map(run, range(10)) # ←変更 print(r) if __name__ == "__main__": main()
これを実行すると,以下のようになりました.いくつかの関数は同じpidのプロセスで実行されています.デフォルトでは作成済みのプロセスを再利用するようになっているようです.
start worker=0 pid=13732 start worker=1 pid=13731 start worker=2 pid=13733 start worker=3 pid=13734 stop worker=0 start worker=4 pid=13732 stop worker=1 start worker=5 pid=13731 stop worker=2 start worker=6 pid=13733 stop worker=3 start worker=7 pid=13734 stop worker=4 start worker=8 pid=13732 stop worker=5 start worker=9 pid=13731 stop worker=6 stop worker=7 stop worker=8 stop worker=9 [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
新しいプロセスで関数を10回並列実行する
プロセスを使いまわしたくない場合は,maxtasksperchild
を指定します.
multiprocessing --- プロセスベースの並列処理 — Python 3.9.1 ドキュメント
import multiprocessing import time import os def run(i): print(f"start worker={i} pid={os.getpid()}") time.sleep(1) print(f"stop worker={i}") return i*2 def main(): with multiprocessing.Pool(processes=4, maxtasksperchild=1) as pool: # ←変更 r = pool.map(run, range(10)) print(r) if __name__ == "__main__": main()
これを実行すると,以下のようになりました.pidが全て異なっています.
start worker=0 pid=13989 start worker=1 pid=13991 start worker=2 pid=13990 start worker=3 pid=13992 stop worker=0 stop worker=1 stop worker=2 stop worker=3 start worker=4 pid=13994 start worker=5 pid=13995 start worker=6 pid=13996 start worker=7 pid=13997 stop worker=4 stop worker=5 stop worker=6 stop worker=7 start worker=8 pid=14000 start worker=9 pid=14001 stop worker=8 stop worker=9 [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
Pythonソースファイルが読み込まれるタイミング
並列処理をするために,Pythonソースファイルは複数回読み込まれます.
import multiprocessing import time import os print(f"loading. pid={os.getpid()}, __name__={__name__}") # ←変更 def run(i): time.sleep(1) return i*2 def main(): with multiprocessing.Pool(processes=4) as pool: r = pool.map(run, range(10)) print(r) if __name__ == "__main__": main()
これを実行すると,以下のようになりました.
loading. pid=14824, __name__=__main__ loading. pid=14827, __name__=__mp_main__ loading. pid=14828, __name__=__mp_main__ loading. pid=14829, __name__=__mp_main__ loading. pid=14826, __name__=__mp_main__ [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
maxtasksperchild
を1に設定して実行すると,以下のようになりました.
import multiprocessing import time import os print(f"loading. pid={os.getpid()}, __name__={__name__}") def run(i): time.sleep(1) return i*2 def main(): with multiprocessing.Pool(processes=4, maxtasksperchild=1) as pool: # ←変更 r = pool.map(run, range(10)) print(r) if __name__ == "__main__": main()
これを実行すると,以下のようになりました.
loading. pid=14621, __name__=__main__ loading. pid=14623, __name__=__mp_main__ loading. pid=14624, __name__=__mp_main__ loading. pid=14625, __name__=__mp_main__ loading. pid=14626, __name__=__mp_main__ loading. pid=14631, __name__=__mp_main__ loading. pid=14633, __name__=__mp_main__ loading. pid=14634, __name__=__mp_main__ loading. pid=14635, __name__=__mp_main__ loading. pid=14642, __name__=__mp_main__ loading. pid=14643, __name__=__mp_main__ loading. pid=14644, __name__=__mp_main__ loading. pid=14645, __name__=__mp_main__ [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
新しいプロセスが作成されるタイミングでソースファイルが読み込まれていることが分かります.
まとめ
multiprocessing.Pool()
を用いることで,気軽に色々な並列処理ができることが分かりました.