みーのぺーじ

みーが趣味でやっているPCやソフトウェアについて.Python, Javascript, Processing, Unityなど.

Python で非同期処理の gRPC を使用する

gRPCが流行っているようなので,アプリケーションの開発に取り入れるために,Pythonで非同期処理に対応したgRPCを使用する方法について,検討してみます.

教科書を読む

gRPCは様々な言語に対応しており,多くのライブラリが開発されているため,汎用性が高い利点があります.逆に初学者にとっては情報がありすぎてPythonでの標準的な使用方法が分かりにくい状況です.

gRPCはGoogle社から生み出されたものなので,同社がどのように使用しているかを確認すれば参考になるだろうだと考えました.同社が管理しているプロジェクトのソースコードの中から,Pythonで実装され.非同期処理に対応しているライブラリがないかを検索したところ,Google APIs の中で,Cloud Tasks のクライアントが規模的にちょうど良さそうだと思われましたので,教科書のかわりとしてソースコードを読んでみました.

GitHub - googleapis/python-tasks

transports/grpc_asyncio.py を読む

python-tasks/grpc_asyncio.py at main · googleapis/python-tasks · GitHub

CloudTasksGrpcAsyncIOTransport class では最初に create_channel() が呼び出されており,ここで google.api_core. grpc_helpers_async.create_channel() で作成したchannel を利用するようです.

python-api-core/google/api_core/grpc_helpers_async.py を読む

python-api-core/grpc_helpers_async.py at main · googleapis/python-api-core · GitHub

grpc.aio.secure_channel(target, composite_credentials)関数を呼び出しています.

grpc/src/python/grpcio/grpc/aio/ を読む

grpc/src/python/grpcio/grpc/aio at master · grpc/grpc · GitHub

grpc.aio ライブラリを使用するのがよさそうだと分かりました.

gRPC AsyncIO API is the new version of gRPC Python whose architecture is tailored to AsyncIO. Underlying, it utilizes the same C-extension, gRPC C-Core, as existing stack, and it replaces all gRPC IO operations with methods provided by the AsyncIO library. *1

gRPC AsyncIO API は,gRPC Python の新しいバージョンであり,非同期処理に合わせた設計を採用しています.既存のスタックと同じC言語拡張である gRPC C-Core を使用して,gRPCの全ての IO 操作を非同期処理ライブラリが提供するメソッドに置き換えます. (拙訳: みー)

_channel.py で secure_channel() 関数が定義されており,Channel class が定義されています.このクラスのコンストラクタで,cygrpc.AioChannel() クラスが作成されます.これは GRPC Python Cython layer を参照しています.

教科書のまとめ

  • PythonでgRPCを非同期処理で使用するならば grpc.aio ライブラリ を使う.
  • このライブラリはCythonを利用して高速に動作するよう設計されている.
  • 非同期処理用のChannelクラスを作成して操作する.

専門用語の整理

gRPCは,クライアントからサーバーの関数を呼び出す仕組みです.

*2

クライアントとサーバー

リクエストする側がクライアントで,リクエストを受信してレスポンスする側がサーバーです.

スタブ (stub)

クライアントの中に存在し,サーバーにリクエストする部分のことをスタブと呼びます.まるでローカル環境の関数を呼び出すかのようにサーバーの機能が使えるのでスタブと命名されているだけであり,テストの文脈で登場するスタブと似ていますが,テストのスタブは機能の実装が限定的なのに対して,gRPCのスタブはサーバーに問い合わせた結果を返すことで機能するため,明確に別の用語です.

Protocol Buffers (Proto)

gRPCで用いられる構造データをシリアライズする仕組みです.

リクエストとレスポンス

HTTP/2 が用いられます.

チャンネル (Channel)

gRPC Channels provide the abstraction over which clients can communicate with servers. The client-side channel object can be constructed using little more than a DNS name. Channels encapsulate a range of functionality including name resolution, establishing a TCP connection (with retries and backoff) and TLS handshakes. *3

Channel はクライアントがサーバーと通信するための機能を抽象化します.クライアント側の channel は DNS の名前で構築されているに過ぎません.Channel は名前解決,TPC接続の確立と再試行・バックオフ,TLS ハンドシェイクを含む一連の機能をカプセル化します.(拙訳: みー)

Channel には CONNECTING, READY, TRANSIENT_FAILURE, IDLE, SHUTDOWN の 5 種類の状態があります.

unary と streaming

gRPCには4種類の通信方法があります.

  • Unary RPC
  • Server streaming RPC
  • Client streaming RPC
  • Bidirectional streaming RPC

最も簡単なのはUnary RPCです.1個のリクエストに対して1個のレスポンスで対応します.ストリーミングをサーバー,クライアント,両方で使用するかどうかにより残りの3個が定義されます.

簡単な proto を定義する

簡単な proto として,文字列に"Banana"が含まれるかどうかを判定する gRCP を作るための main.proto を以下のように定義しました.

main.proto

syntax = "proto3";

message SearchParam {
  string query = 1;
}
message SearchStatus {
  int32 status = 1;
}

service Search {
  rpc search(SearchParam) returns (SearchStatus) {}
}

これを buf で pythonスクリプトに変換します.main_pb2.py, main_pb2.pyi, main_pb2_grpc.py が生成されます.

サーバーを作成する

server.py

import asyncio
from grpc.aio import server

import main_pb2_grpc, main_pb2


class Search(main_pb2_grpc.SearchServicer):
    async def search(
        self, request: main_pb2.SearchParam, context
    ) -> main_pb2.SearchStatus:
        query = request.query
        if "banana" in query.lower():
            status = 1
        else:
            status = 0
        return main_pb2.SearchStatus(status=status)


host = "localhost:8100"


async def main():
    s = server()
    main_pb2_grpc.add_SearchServicer_to_server(Search(), s)
    bound_port = s.add_insecure_port(host)
    print(f"localhost:{bound_port}")
    await s.start()
    await s.wait_for_termination()


if __name__ == "__main__":
    asyncio.run(main())

文字列に"Banana"が含まれるかどうかを実際に判定するsearch()関数を定義したSearchクラスを用意し,gRPC server を main()関数で定義してSearchクラスを追加し,非同期処理として呼び出します.

クライアントを作成する

Cloud Tasks のクライアントのソースコードに戻ります.

python-tasks/grpc_asyncio.py at main · googleapis/python-tasks · GitHub

Channel を作成した後は,channel.unary_unary() 関数を使用してgRPCの関数を登録し,後で使えるようにself._stubs変数に追加しているだけです.まるでローカル関数を呼び出せるようにサーバーの機能が呼び出せるというgRPCのコンセプトが生きています.

client.py

import argparse
import asyncio
from typing import cast

#
import main_pb2
from grpc.aio import insecure_channel

target = "localhost:8100"


async def main(query: str):
    channel = insecure_channel(target)
    search = channel.unary_unary(
        "/Search/search",
        request_serializer=main_pb2.SearchParam.SerializeToString,
        response_deserializer=main_pb2.SearchStatus.FromString,
    )
    request = main_pb2.SearchParam(query=query)
    response = cast(main_pb2.SearchStatus, await search(request))
    print(f"status={response.status}")


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("query")
    args = parser.parse_args()
    asyncio.run(main(args.query))

クライアントには,文字列に"Banana"が含まれるかどうかを判定する部分はありませんが,引数から文字列を取得し,サーバーの関数を gRPC で呼び出して結果を表示する処理が記載されています.

channel を作成するときに指定するtargetは "localhost:8000" のように指定します."http://" などのスキーマを指定すると "DNS resolution failed" とエラーになるので注意してください.gRPC を使うのでスキーマの指定は不要です.

server.py を実行している状態にして,client.py を実行すると,以下のような出力が得られ,意図したとおりの動作をすることが確認できました.

サーバーの入出力

% python server.py
localhost:8100

クライアントの入出力

% python client.py Banana
status=1
% python client.py Apple 
status=0

gRPC の型と Python のアノテーション

せっかく main.proto で型定義をしているので,Python の型ヒントで活用したいです.

buf の "buf.build/protocolbuffers/python" プラグインで生成された pb2.py だけでは型が分からないため,"buf.build/protocolbuffers/pyi" プラグインを併用して,pb2.pyi も生成すると便利です.

Python のモジュールと gRPC の深いディレクトリ

Python では,ディレクトリに __init__.py を作成するとモジュールになりますが,gRPCで主流な深いディレクトリに出力するとインポートが大変です.以下の issue で相対インポートをどうするか議論されていますが,gRPCのディレクトリ構造は Python のモジュール構造に馴染まない印象を受けました.

python: use relative imports in generated modules · Issue #1491 · protocolbuffers/protobuf · GitHub

pb2_grpc.py には以下のようなインポート文が定義されているので,カレントディレクトリに保存してそのまま使用することが想定されているように思われました.

import main_pb2 as main__pb2

buf plugin と Connect

buf Connect が積極的に開発されているようですが, 2023/03/21現在は Python 用のライブラリは存在しないので,protocolbuffersのライブラリをプラグインとして使用するようです.

まとめ

ひとまず Python で非同期処理の gRPC が使用できるようになりました.製品に組み込むには,リトライ処理やタイムアウトを適切に設定した上で,負荷試験を実施する必要があるためさらなる検討が必要ですが,型情報をサーバーとクライアントで共有できるという gRPC のコンセプトはとても便利に感じましたので,積極的に使用していきたいです.