Python asyncio ベースの Web サーバーで実現する長期実行処理
こんにちは、PKSHA Communication ソフトウエアエンジニアの松下です。
私が担当しているプロダクト「PKSHA Voicebot」では、「アウトバウンドコール」という、長期実行処理が多数同時に発生しやすい機能を提供しています。
私たちのチームではこれを Python asyncio ベースの Web サーバー上で実装する少し変わった方法で実現・運用しており、この記事ではその設計や狙いについて共有していきたいと思います。少しニッチな話題とソリューションですが、同じような課題をお持ちの方の参考になれば幸いです。
PKSHA Voicebot とアウトバウンドコール機能
PKSHA Voicebot (以下 Voicebot)は、様々なお客様企業のコールセンターに入り込み、お問い合わせの電話をかけていただいた方(エンドユーザー)と “ことば” で対話する音声ボットです。
Voicebot では現場のオペレータに代わり、定型の問い合わせに自動対応するのはもちろん、人での対応が必要な場合にもエンドユーザーの意図を適切に把握してオペレーター・チャネルに振り分けることで、あらゆる入電に介在して価値を発揮します。
アウトバウンドコール機能
Voicebot では電話を受けるだけでなく、お客様の用意した発信先リストにもとづいて次々とこちらから電話をかける「アウトバウンドコール機能」を提供しています。アウトバウンドコール機能は主に、お客様企業が会員・エンドユーザー向けに未払い料金の督促や急ぎのご案内等を行う用途で活用いただいており、Voicebot で電話を受けた際と同様に自動音声シナリオを設定して、架電業務を代替することができます。
使い方は、電話が繋がった際の自動音声シナリオを選択し、発信先とシナリオ内で用いる変数を設定した CSV ファイルをアップロードすると、リストの先頭から架電処理を開始していく、といった形になります。電話は発信先のエンドユーザーの応答がなければ意味をなさないため、架電の開始〜終了時間帯を設定および予約する機能も付加されています。
アウトバウンドコール機能の要件
アウトバウンドコール機能を実現するにあたり特に考える必要のあった要件には、以下のようなものがあります。
契約や電話回線に由来する制限を守る
お問い合わせにリアルタイムで応える通常の入電応答とは異なり、電話発信は与えられたリストをどのようなタイミングで処理していくか、サービス側で決定する必要があります。
Voicebot では公平性の観点から、アウトバウンドコール機能におけるお客様毎の同時対話接続数を契約内容に基づいて定めており、こちらは月間のアウトバウンドご契約コール数に比例するような数となっています。また契約内容とは別に、Voicebot で利用する電話回線には通常、発信レート制限があり、多くは 1 秒あたり 1 発信等となっているため、この制限も顧客毎に遵守する必要があります。
許容された時間内で効率的に電話をかけ続ける
上記の制限とも関連しますが、例えば同時対話接続数が 6 であるお客様でいま 600 件の宛先に発信したい場合、1 コールが 1 分で完了するとしても、全ての電話をかけ終えるには最短 100 分もの時間を要します。こういった処理を顧客毎に同時に多数、基本的にはエンドユーザーの活動している日中に捌いていく必要があるのが、アウトバウンドコール機能の大きな特徴となります。
Starlette Background Tasks で動かす長期実行処理
Voicebot ではこの機能を、Python 製の asyncio ベースのWeb サーバーフレームワークである Starlette の Background Tasks で実装しています。
asyncio ベースのアプリケーションは Web サーバーであっても、見方によれば全体が多数のコルーチンの集合と考えることができます。 コルーチン達は互いに I/O 待ちを除いたあらゆる処理を譲り合いながら共存しており、CPU 時間を一人で長時間奪ってしまうような重い CPU バウンドの処理を含まないのであれば、こういった長期実行処理を多数並行で動かすようなワークロードも十分捌いていくことができます。 Voicebot では実際の電話シナリオの実行を含むほとんどの処理を API 経由で別のサービスに移譲しているため、アウトバウンドコール機能の長期実行処理にはどちらかといえば待ち時間が多く、この条件には合っていると言えるでしょう。
asyncio については、以下の記事もぜひ併せてご覧下さい。
2つの疑似プロセス:ジョブマネージャーと架電タスクブローカー
Voicebot では、発信先リストに含まれる 1 件の電話を「架電タスク」、一度のリスト登録に対応する架電タスクの集合を「ジョブ」と呼んでいます。 運用中の様子としては、あらゆる顧客が好きな時間に、いま電話発信したいリストをジョブとして登録・開始する形になります。また開始予約機能を用いて、今すぐには開始しないジョブを登録することもあります。
アウトバウンドコール機能の処理は、大まかに以下の 2 種類の疑似プロセス(実際には単に asyncio コルーチン)によって成り立っています。
ジョブマネージャー
1 分に 1 度定期起動し、全顧客の未完了のジョブを収集し、開始すべきものを開始、すべてのタスクが完了したジョブを終了するプロセス
ジョブの開始はすなわち顧客毎の架電タスクブローカーの起動であり、実際に電話発信を進める処理はそちらで実施される
▼コードのイメージ
class JobManagerProcess(object):
async def run(self) -> None:
# 未完扱いのジョブ、およびその中身の架電タスク数(未完のもの)を取得
queued_jobs: List[OutboundCallJob] = await self.job_repository.list_queued_jobs()
# 本当に未完のジョブ (まだ架電タスクの残っているジョブ) を集める
outstanding_jobs: List[OutboundCallJob] = []
for j in queued_jobs:
if (...): # 中身のタスクが全て完了していたら...
await self.job_repository.mark_as_completed(j.id)
continue
outstanding_jobs.append(j)
if not outstanding_jobs:
break
# 未完のジョブがあった顧客について、架電タスクブローカーを起動する
client_ids = set(j.client_id for j in outstanding_jobs)
for client_id in client_ids:
await self.task_broker_api.invoke(client_id=client_id)
架電タスクブローカー
ジョブマネージャーによって顧客ごとに起動され、その顧客の未完了の架電タスクを、同時対話接続数に配慮しながら開始(電話発信)していくプロセス
一度起動されると、所属ジョブに関わらず、その顧客のすべての未完了の架電タスクを処理し終えるまで長期的に動き続ける
実際の電話発信は API 経由で別のサービスに移譲される
▼コードのイメージ
class OutboundCallTaskBrokerProcess(object):
async def run(self, spec: OutboundCallTaskBrokerProcessSpec) -> None:
while True:
if (...): # この顧客で全ての架電タスクが処理されたら...
break
# 発信
await self._initiate_tasks(spec)
# 架電中の電話が減ることを期待して少し待機
await asyncio.sleep(spec.poll_interval_sec)
async def _initiate_tasks(self, spec: OutboundCallTaskBrokerProcessSpec) -> None:
# 顧客の同時対話接続数上限を取得
client_setting: Optional[OutboundCallClientSetting] = (
await self.client_setting_repository.get_by_client_id(spec.client_id))
# 今発信可能な架電数を、<契約上限> - <架電中のコールセッション数> の単純計算で求める
active_calls_count: int = (
await self.ongoing_calls_repository.count_active_calls(spec.client_id))
allowed_count = client_setting.concurrent_calls_limit - active_calls_count
if allowed_count <= 0:
return
# 今必要な分の架電タスクを取得して発信
outstanding_ids: List[int] = (
await self.task_repository.list_outstanding_task_ids(spec.client_id, allowed_count))
for task_id in outstanding_ids:
# 個別の架電で何かが起きても、できるだけ最後まで処理を進める
try:
await self.initiation_api.invoke(task_id)
except Exception as e:
await self.task_repository.mark_as_failed(task_id, e)
図にすると、以下のような形になります。
これらの疑似プロセスはそれぞれが対応する Starlette の API エンドポイントを持ち、単純な POST リクエストで起動できる形になっています。 ジョブマネージャーは同じ Web サーバーの架電タスクブローカーを起動しますが、こちらも API 経由で実施されます。
# ジョブマネージャーに対応 / 1分に1度の API 呼び出しで起動
async def invoke_batch_call_job_manager(_):
task = BackgroundTask(start_batch_call_job_manager)
return JSONResponse(dict(message='accepted!'), background=task)
# 架電タスクブローカーに対応 / ジョブマネージャーが API 経由で起動
async def invoke_call_task_broker(req):
task = BackgroundTask(start_outbound_call_v2_task_broker, ...)
return JSONResponse(dict(message='accepted!'), background=task)
routes = [
Route('/....../batch_call_job_manager/invoke', endpoint=invoke_batch_call_job_manager, methods=['POST']),
Route('/....../call_task_broker/invoke', endpoint=invoke_call_task_broker, methods=['POST']),
]
app = Starlette(routes=routes, ...)
上記の設計の留意事項として、各プロセス(ジョブマネージャー、顧客毎の架電タスクブローカー)は多重起動を避ける必要がある点があります。この機能では、プロセスに対応する排他ロック用のレコードを MySQL で作成することで、重複を回避しつつプロセスの起動状況を確認できるようになっています(詳細は記事末尾の補足に記載)。
この仕組みにより、各プロセスに関連する実装を以下のように単純化することができます。
ジョブマネージャー
定期呼び出しによる起動は At-least Once 程度の粗さで問題なくなる架電タスクブローカー
ある顧客のプロセスが既に実行中か?といったことを考慮せずに起動を試みても良くなる
設計判断と運用性
この設計の一番の利点は、機能全体が「手のひらに収まるアーキテクチャ」で運用できるという点です。 アウトバウンドコール機能はその性質だけ見れば、顧客毎のワークロードを本物の “プロセス” として実装、コンテナ化して信頼できるクラスタ上で動かす、といった設計が合っているようにも見え、それによって一部の実装考慮を不要にすることもできます。しかしその場合、この機能に必要な論理的な実行制御はインフラレイヤーにまで及んでしまい、見守るべきコンポーネントも増え、更にローカル環境ではどうしても本番環境と同等の仕組みを動作させることは難しいか、かなり大掛かりなものになってしまいます。
Voicebot は比較的少人数のチームで、かつ主に Ruby on Rails を中心とした開発スタイルを取っており、集約されたコードベースとコンポーネント、データベースを中心としたシステム運用といったところでこそ最大限の生産性を発揮できます。この設計にはいくつか限界もありますが、機能のほぼ全ての過程が Web サーバーという馴染み深く完結したコンポーネントに載り、またローカル環境でも本番環境と全く同じように動作します。各プロセスの実行状況はデータベース上のレコードとして、実行記録はサーバーアクセスログとして表現されることで、運用も Web サーバーと似たような形に近づきます。 少人数で可観測性・運用性を担保できることを目指した結果、私たちのチームに合っていそうなのがこのアーキテクチャだったのです。
安全な終了処理
Starlette は ASGI 以降を担当するフレームワークですので、このフロントには Web サーバー実装があり、Voicebot では Uvicorn を利用しています。
Uvicorn はシャットダウンの際、Starlette の Background Tasks を含めたすべてのタスクの終了を待機します。 それは都合が良いのですが、「架電タスクブローカー」について考えると、デプロイの際に問題になり得ます。例えば今からデプロイをしようとするとき、現在実行中の顧客について完了するまであと 2 時間を要するジョブがあった場合、終わるまでデプロイも保留状態となってしまいます。
夜間リリースや ”メンテナンス状態” を導入してジョブのない時間帯を作るといったことも考えられますが、不便ですし、そもそも日中のデプロイができないということは日中の障害にも耐えられないということを意味するため、ここではこの課題に真面目に向き合うこととしました。
asyncio.Event を用いたシャットダウン通知
asyncio.Event は asyncio に標準で組み込まれた同期要素で、asyncio.Event#set() することで asyncio コルーチンにそのタイミングを通知することができます。
具体的には、通知される側は asyncio.Event#wait() したりasyncio.Event#is_set() を確認したりすることで状況を知ることができ、架電タスクブローカーでは以下のように、発信処理ループを続けるべきか中断すべきかを判断するのに利用しています。
class OutboundCallTaskBrokerProcess(object):
async def run(self, spec: OutboundCallTaskBrokerProcessSpec) -> None:
while True:
# 発信処理ループ
.....
# ポーリング間隔待機 + シャットダウン通知の確認
should_continue = (
await self._should_continue_after_interval(spec.poll_interval_sec))
if not should_continue:
break
async def _should_continue_after_interval(self, interval_sec: int) -> bool:
if self.ev_shutdown.is_set():
return False
try:
await asyncio.wait_for(self.ev_shutdown.wait(), timeout=interval_sec)
except asyncio.TimeoutError:
return True
else:
return False
一方で通知する側は、シャットダウンシグナルを検出したら asyncio.Event#set() するという形になります。Uvicorn には、問答無用でユーザーが設定したシグナルハンドラを削除(※)し、終了シグナルを奪ってしまう挙動があるため、以下のようにモンキーパッチで対応しています。
※ 削除したユーザーのシグナルハンドラは Uvicorn 自身によって復旧されますが、これはタスクの終了待機後なので今回のユースケースには合いませんでした。
ev_shutdown = asyncio.Event()
_original_handle_exit = uvicorn.Server.handle_exit
def _handle_exit(*args, **kwargs):
ev_shutdown.set()
_original_handle_exit(*args, **kwargs)
uvicorn.Server.handle_exit = _handle_exit
server = uvicorn.Server(...)
asyncio.run(server.serve())
以上の機構下におけるデプロイ時の挙動は、プロセス排他制御の仕組みと上手く噛み合うことで、以下のようになめらかなものとなっています。
サーバーのシャットダウンに伴い、実行中のジョブがある顧客 A の「架電タスクブローカー」がシャットダウン通知を受け途中終了
定期起動された「ジョブマネージャー」が顧客 A の未完のジョブを認識、顧客 A の「架電タスクブローカー」を起動(「ジョブマネージャー」は通常と同じ動作をしているだけ)
(最初とは別のサーバーで) 顧客 A の架電タスクブローカーが起動、ジョブを途中から引き継ぎ
まとめ
この記事では、長時間にわたって実行される “アウトバウンドコール機能” のバックエンドを、asyncio ベースの Web サーバーのみでコンパクトに実現する設計を紹介しました。
Web サーバーのみではこの機能の全ての課題を解決できる訳ではありませんが、失敗を前提とした排他制御の仕組み、asyncio の同期プリミティブを用いた実行制御によるサポートで、シンプルさを維持しつつ堅牢なシステムにすることができました。
補足:MySQL のレコードによるプロセス排他制御
排他制御が必要なプロセスについて、以下のようなレコードを開始時に INSERT、終了時に DELETE するような仕組みとなっています。
craete table process_locks (
-- 排他ロックを取りたいプロセスを、以下2つの識別カラムで表現
-- process_name はプロセスの種別、lock_owner には顧客等が入る想定
process_name varchar, lock_owner varchar,
-- 同じ識別子の別のプロセスが勝手にロックを解除しないようにするための解除キー
unlock_key varchar,
-- 失効時刻: この時刻を過ぎていた場合、ロックは削除/上書きして問題ない
-- プロセスがロックを解放しないまま終了した場合の安全機構 (デプロイ時にも使われる)
expires_at datetime
)
このレコードをトランザクション内で、以下のように操作します:
プロセス開始時(ロックの取得)
ランダムな unlock_key を生成、適切な expires_at を決め、識別カラムでのレコードの有無を確認、なければ INSERT する(SELECT-INSERT の 1 ステートメントで行う必要がある)
ロックの延長
expires_at を過ぎる前に、開始時と同様の確認をしながら expires_at を更新する(この際は識別カラム + unlock_key の一致も確認が必要)
プロセス終了時(ロックの開放)
識別カラム + unlock_key が一致するレコードを削除
ロックの失効時間を保持するこの方法では排他制御と同時に、意図せぬサーバーダウン等で正常にプロセスが終了しなかった場合にも、被害を最小限に抑えつつ残りの仕事を再開できるよう工夫されています。
おわりに
この記事では、 “アウトバウンドコール機能” の少し変わった設計をご紹介しましたが、どう感じられたでしょうか?まだまだ課題はありますし、私たちも決してこれが究極の形だと思っている訳ではありません。 最適なアーキテクチャは事業状況とチームによって常に変化するものです。 PKSHA では日々、ユーザーの課題解決を進捗させるには何をすべきか、そのためにはどのようなチームでソフトウエアを進化させていくか、そのチームに適したアーキテクチャはどんなものか、といったことを考え続けています。この未来を共ににつくっていきたいと思っていただけた方は、ぜひ一度お声がけください。一緒に働けることを楽しみにしています!
―INFORMATION―
▼ 中途採用:ソフトウエアエンジニアの募集要項はこちら
▼ 新卒採用:ソフトウエアエンジニア 本選考
▼ PKSHA 採用サイト
▼ Wantedly はこちら:カジュアル面談も受付中です!