何らかの時間がかかる処理を,非同期で実行するWebサービスを作成することを考えます.
しばしば用いられる方法の1つに,Workerプロセスに分離する方法があります.ユーザーのリクエストに対してレスポンスを即座に返すWebプロセスと,内部で実際の処理を実行するWorkerプロセスに分業させるわけです.このようにすることで,ユーザーがレスポンス待ちする状況が回避されるので,よいとされています.以下の記事に詳細が書かれています.
Worker Dynos, Background Jobs and Queueing | Heroku Dev Center
Herokuでは,Workerプロセスを実行するDynoが24時間稼働することを保証してくれるため,上記を実装しやすいです.たくさん課金すれば,自動でスケールさせることも可能のようです*1.しかし,必要でないときもDynoを稼働させておかなければならないというデメリットがあります.
HTTP/Sリクエストに対してレスポンスを返すまでしかインスタンスの動作が保証されないサービスを利用すれば,実際に必要なリソースのみに対して課金されるのでコストのメリットがあります.Google Cloud RunやGoogle Cloud Functionなどがあります.これらを利用してWorkerプロセスを実装をするときは,処理が終了するまでレスポンスを返却してはいけません.これをPythonで実験的に実装してみます.
要件
- HTTP/S リクエストをリッスンする.
- HTTP/S リクエストをトリガーとして,時間がかかる処理を実行する.
- 処理が終了したら,結果をレスポンスとして返す.
- 処理中でも別のHTTP/S リクエストを受け付ける.
実装
import datetime import multiprocessing import time from bottle import Bottle, run app = Bottle() app.process = None def task(): time.sleep(10) def start_task(): if app.process: return False app.process = multiprocessing.Process(target=task) app.process.start() app.process.join() app.process = None return True @app.route('/') def index(): dt = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') return f'index<br>{dt}' @app.route('/run') def run_task(): start_dt = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') response = start_task() end_dt = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') return f"task complete. <br>Status={response}<br>start={start_dt}<br>end={end_dt}" if __name__ == '__main__': run(app, host='localhost', port=8001)
HTTP/Sを処理するライブラリとして,PythonのBottleを使います.Bottleは小さくて軽量なWSGI Webフレームワークなので,今回の実装に適していると思います.
ブラウザーから/
にアクセスすると,現在時刻を返し,/run
にアクセスすると,multiprocessingを使ってtask()
関数を実行します.上記の例では10秒待つという処理を行っています.task()
関数の実行が完了したら,レスポンスを返すようにしています.
ブラウザーを3個開いて,1秒毎に/run
, /run
, /
にアクセスした結果は以下のようになりました.
3個のリクエストはすべて処理されレスポンスが正常に帰ってきました.開始時刻と終了時刻を見比べると,task()
関数は2回実行され,逐次的に処理されたことが分かります.2個目の/run
と3個目の/
の順番が入れ替わっています.どうやら,別の処理中に受け付けた複数のリクエストが処理される順番は入れ替わる場合があるようです.
Workerプロセスの雛形として使えそうです.