みーのぺーじ

みーが趣味でやっているPCやソフトウェアについて.Python, Javascript, Processing, Unityなど.

Pythonのmultiprocessingで色々な並列処理をする

Pythonのmultiprocessingを使って,並列処理を色々とやってみて,動作を確認してみました.

環境

  • CPU 3.2 GHz Quad-Core Intel Core i5
  • macOS 10.15.7
  • Python 3.8.6

4個のプロセスで関数を4回並列実行する

import multiprocessing
import time
import os


def run(i):
    print(f"start worker={i} pid={os.getpid()}")
    time.sleep(1)
    print(f"stop worker={i}")
    return i*2


def main():
    with multiprocessing.Pool(processes=4) as pool:
        r = pool.map(run, range(4))
        print(r)


if __name__ == "__main__":
    main()

引数iを2倍するのに1秒かかるrun()関数を,4個のプロセスで関数を4回並列実行します.

これを実行すると,以下のようになりました.

start worker=0 pid=13516
start worker=1 pid=13517
start worker=2 pid=13518
start worker=3 pid=13515
stop worker=0
stop worker=1
stop worker=2
stop worker=3
[0, 2, 4, 6]

4個のプロセスで関数を10回並列実行する

import multiprocessing
import time
import os


def run(i):
    print(f"start worker={i} pid={os.getpid()}")
    time.sleep(1)
    print(f"stop worker={i}")
    return i*2


def main():
    with multiprocessing.Pool(processes=4) as pool:
        r = pool.map(run, range(10)) # ←変更
        print(r)


if __name__ == "__main__":
    main()

これを実行すると,以下のようになりました.いくつかの関数は同じpidのプロセスで実行されています.デフォルトでは作成済みのプロセスを再利用するようになっているようです.

start worker=0 pid=13732
start worker=1 pid=13731
start worker=2 pid=13733
start worker=3 pid=13734
stop worker=0
start worker=4 pid=13732
stop worker=1
start worker=5 pid=13731
stop worker=2
start worker=6 pid=13733
stop worker=3
start worker=7 pid=13734
stop worker=4
start worker=8 pid=13732
stop worker=5
start worker=9 pid=13731
stop worker=6
stop worker=7
stop worker=8
stop worker=9
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

新しいプロセスで関数を10回並列実行する

プロセスを使いまわしたくない場合は,maxtasksperchildを指定します.

multiprocessing --- プロセスベースの並列処理 — Python 3.9.1 ドキュメント

import multiprocessing
import time
import os


def run(i):
    print(f"start worker={i} pid={os.getpid()}")
    time.sleep(1)
    print(f"stop worker={i}")
    return i*2


def main():
    with multiprocessing.Pool(processes=4, maxtasksperchild=1) as pool: # ←変更
        r = pool.map(run, range(10))
        print(r)


if __name__ == "__main__":
    main()

これを実行すると,以下のようになりました.pidが全て異なっています.

start worker=0 pid=13989
start worker=1 pid=13991
start worker=2 pid=13990
start worker=3 pid=13992
stop worker=0
stop worker=1
stop worker=2
stop worker=3
start worker=4 pid=13994
start worker=5 pid=13995
start worker=6 pid=13996
start worker=7 pid=13997
stop worker=4
stop worker=5
stop worker=6
stop worker=7
start worker=8 pid=14000
start worker=9 pid=14001
stop worker=8
stop worker=9
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

Pythonソースファイルが読み込まれるタイミング

並列処理をするために,Pythonソースファイルは複数回読み込まれます.

import multiprocessing
import time
import os

print(f"loading. pid={os.getpid()}, __name__={__name__}") # ←変更


def run(i):
    time.sleep(1)
    return i*2


def main():
    with multiprocessing.Pool(processes=4) as pool:
        r = pool.map(run, range(10))
        print(r)


if __name__ == "__main__":
    main()

これを実行すると,以下のようになりました.

loading. pid=14824, __name__=__main__
loading. pid=14827, __name__=__mp_main__
loading. pid=14828, __name__=__mp_main__
loading. pid=14829, __name__=__mp_main__
loading. pid=14826, __name__=__mp_main__
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

maxtasksperchildを1に設定して実行すると,以下のようになりました.

import multiprocessing
import time
import os

print(f"loading. pid={os.getpid()}, __name__={__name__}")


def run(i):
    time.sleep(1)
    return i*2


def main():
    with multiprocessing.Pool(processes=4, maxtasksperchild=1) as pool: # ←変更
        r = pool.map(run, range(10))
        print(r)


if __name__ == "__main__":
    main()

これを実行すると,以下のようになりました.

loading. pid=14621, __name__=__main__
loading. pid=14623, __name__=__mp_main__
loading. pid=14624, __name__=__mp_main__
loading. pid=14625, __name__=__mp_main__
loading. pid=14626, __name__=__mp_main__
loading. pid=14631, __name__=__mp_main__
loading. pid=14633, __name__=__mp_main__
loading. pid=14634, __name__=__mp_main__
loading. pid=14635, __name__=__mp_main__
loading. pid=14642, __name__=__mp_main__
loading. pid=14643, __name__=__mp_main__
loading. pid=14644, __name__=__mp_main__
loading. pid=14645, __name__=__mp_main__
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

新しいプロセスが作成されるタイミングでソースファイルが読み込まれていることが分かります.

まとめ

multiprocessing.Pool() を用いることで,気軽に色々な並列処理ができることが分かりました.