2022年10月: Python 3.11新機能asyncio.TaskGroup を使った予測可能でより安全な非同期処理(fukuda)

こんにちは福田(@JunyaFff)です。 第4回目である10月の「Python Monthly Topics」は、2022年10月24日にリリース予定[1]のPython 3.11に追加されるasyncioの新機能 asyncio.TaskGroup を紹介します。

Python 3.11の新機能

高速化が話題のPython 3.11ですが、今回取り上げるのは非同期I/Oで並行処理を実現する標準ライブラリasyncioの新機能 asyncio.TaskGroup です。 asyncio.TaskGroup は複数のタスクを並行処理する高レベルAPIになります。 同様の既存機能(asyncio.gather()asyncio.wait())と大きく違う2つの特徴があります。

  • 同時に発生した例外の捕捉

  • 例外発生時のタスクのキャンセル

この特徴によって、発生した例外を漏らさずに記録でき、制御しやすい並行処理の実装がより簡単にできるようになりました。私個人としては、かゆいところに手が届くイチオシの新機能です。[2]

asyncioを利用していてすでに例外やタスクのキャンセルを意識されている方も、あまりasyncioを使っておらず例外やキャンセルを意識されていない方も、この記事を通してPython 3.11の asyncio.TaskGroup を試してみよう!と思っていただけると幸いです。

なおこの記事は、Python 3.10.6、Python 3.11.0rc2で動作確認をしています。

asyncio.TaskGroup の基本の書き方

公式ドキュメントにあるサンプルコードを見てみましょう。 asyncio.TaskGroup はコンテキストマネージャーを利用し、その中で複数のタスクを同時に実行します。 サンプルコードでは some_coro()another_coro() という非同期関数(中身の実装はありません)を同時に実行しています。コンテキストマネージャーを抜けるタイミングでタスクが完了しています。

asyncio.TaskGroup のサンプルコード

import asyncio

async def main():
    async with asyncio.TaskGroup() as tg:
        task1 = tg.create_task(some_coro(...))
        task2 = tg.create_task(another_coro(...))
    print("Both tasks have completed now.")

引用元: コルーチンとTask — Python 3.11 documentation

しかし、複数のタスクを同時に処理するのであれば、既存機能の asyncio.gather()asyncio.wait() でも同様の書き方ができます。

asyncio.TaskGroup のサンプルコードを asyncio.gather() で書き換え

import asyncio

async def main():
    await asyncio.gather(
      some_coro(...),
      another_coro(...)
    )
    print("Both tasks have completed now.")

違いは 例外 の扱いにあります。まずは、「同時に発生した例外の捕捉」について詳しく見てみましょう。

同時に発生した例外の捕捉

ここでは同時に発生した例外処理をPython 3.10の asyncio.gather() と、 Python 3.11の asyncio.TaskGroup で、それぞれ比較してみましょう。

Python 3.10以前のasyncioの例外処理

Python 3.10以前の例外処理について見てみましょう。いくつか方法がありますが、ここでは asyncio.gather() の利用例を紹介します。 asyncio.gather() では最初の例外を送出するか、すべてのタスクを実行した後に結果をリストで取得するしかありませんでした。

  • asyncio.gather() での例外処理

    • return_exceptions 指定なし: 最初の例外を送出

    • return_exceptions=True 指定あり: すべてのタスクの完了を待ち、例外を含む結果をリストで取得

return_exceptions 指定なしの場合、try-exceptで囲い捕捉します。 return_exceptions=True の場合、戻り値のリストをチェックする必要があります。

実際にコードにして動きを見てみましょう。同時に実行するための簡易な非同期関数を3つ用意します。うち2つは、例外を送出します。 まずは return_exceptions を指定せずに asyncio.gather() で動かしてみます。

asyncio.gather() で最初の例外を送出

import asyncio

async def coro_success():
    return "成功"

async def coro_value_err():
    raise ValueError

async def coro_type_err():
    raise TypeError

async def main():
    """return_exceptionsなし"""
    try:
        results = await asyncio.gather(
            coro_success(), coro_value_err(), coro_type_err()
        )
        print(f"{results=}")
    except BaseException as err:
        print(f"{err=}")

asyncio.run(main())

実行すると次のような結果になります。最初の例外のみを捕捉できます。 同時に発生した coro_type_err() のTypeErrorは捕捉できません。

$ python3.10 asyncio_gather_except.py
err=ValueError()

次に asyncio.gather()return_exceptions=True を指定します。 return_exceptions=True はすべてのタスクを実行し、例外を含むすべてのタスクの結果をリストで取得します。

async def main():
    """return_exceptions=Trueあり"""
    results = await asyncio.gather(
        coro_success(), coro_value_err(), coro_type_err(), return_exceptions=True)
    print(f"{results=}")

これを実行すると、次のようになります。

$ python3.10 asyncio_gather_except.py
results=['成功', ValueError(), TypeError()]

この場合、例外を処理するには、戻り値のリストをチェックする必要があります。

async def main():
    """return_exceptions=Trueあり"""
    results = await asyncio.gather(
        coro_success(), coro_value_err(), coro_type_err(), return_exceptions=True)
    print(f"{results=}")

    for result in results:
        match result:
            case ValueError():
                print("ValueError")
            case TypeError():
                print("TypeError")

asyncio.gather() ではすべてのタスクを実行しないと、すべての例外を捕捉できないことが分かります。 たとえば、10個のHTTPリクエストを同時に行う処理があるとします。最初のタスクに例外が発生した場合、残りの失敗するかもしれない9つが完了するまで待つことになります(結果10個すべてが同じエラーになったとしてもです)。

asyncio.gather() での例外処理には、以下の2つの課題があります。

  • asyncio.gather() の例外処理での課題

    • return_exceptions 指定なし: 同時に発生した例外を1つしか捕捉できない

    • return_exceptions=True 指定あり: 戻り値のリストをチェックする必要がある。またすべてのタスクを実行しなければならない

これらの課題を解決し読みやすく書けるのが asyncio.TaskGroup です。

asyncio.TaskGroup による新しい例外処理の書き方

asyncio.TaskGroup は、 Python 3.11で追加された新しい構文 except* を利用することで、逐次関数の例外処理と同様に書けます。 見慣れない except* については後述します。

では asyncio.TaskGroup で同時に発生する例外を捕捉してみましょう。 先ほどの asyncio.gather() の例を asyncio.TaskGroup で書き換えてみましょう。

なお except* で捕捉した場合、 err はExceptionGroupオブジェクトになります。実際に発生した例外は、exceptions属性にタプルで格納されています。

async def main():
    try:
        async with asyncio.TaskGroup() as tg:
            task1 = tg.create_task(coro_success())
            task2 = tg.create_task(coro_value_err())
            task3 = tg.create_task(coro_type_err())
        results = [task1.result(), task2.result(), task3.result()]
    except* ValueError as err:
        print(f"{err.exceptions=}")
    except* TypeError as err:
        print(f"{err.exceptions=}")

実行すると次のような結果になります。複数のタスクから同時に発生する例外を捕捉できました。

$ python3.11 asyncio_taskgroup_except.py
err.exceptions=(ValueError(),)
err.exceptions=(TypeError(),)

同期処理を書くように、直感的に実装できるようになっています。

タスクのキャンセル

さて、 asyncio.TaskGroup のもう1つの特徴である、タスクのキャンセルについて見てみましょう。 Python 3.10の asyncio.gather() と、 Python 3.11の asyncio.TaskGroup をそれぞれ比較してみましょう。

タスクがキャンセルされずに残ってしまうケース

asyncioで複数のタスクを同時に処理する際に例外が送出されると、完了していないタスクが残り意図せず動作してしまっている場合があります。 asyncio.gather() を利用しタスクが残ることを確認してみましょう。

非同期関数の coro_long() を追加しています。 asyncio.sleep() を利用して、出力をします。

それぞれのタスクの状態を、Taskオブジェクトのインスタンス属性の _state で確認します。 _state は「PENDING」「FINISHED」「CANCELLED」が設定され、作られた時点で「PENDING」、終了が「FINISHED」、そしてキャンセル完了の「CANCELLED」を示します。

import asyncio

async def coro_success():
    return "成功"

async def coro_value_err():
    raise ValueError

async def coro_long():
    await asyncio.sleep(1)
    print("完了していないタスクが出力しています")
    return "成功?"

async def main():
    try:
        task1 = asyncio.create_task(coro_success())
        task2 = asyncio.create_task(coro_value_err())
        task3 = asyncio.create_task(coro_long(), name="残るコルーチン")  # 分かりやすくするためタスクに名づけ

        results = await asyncio.gather(*[task1, task2, task3])
        print(f"{results=}")
    except ValueError as err:
        print(f"{err=}")

    print(f"タスク1の状態 {task1._state=}")
    print(f"タスク2の状態 {task2._state=}")
    print(f"タスク3の状態 {task3._state=}")
    await asyncio.sleep(1.5)  # 1.5秒待つことでException発生後にcoro_log()が動いていることを確認できる

asyncio.run(main())

実行すると次のような結果になります。追加した非同期関数の coro_long()PENDING の状態で残っており、 "完了していないタスクが出力しています" が出力されていることから、例外発生後に coro_long() が動作しているとわかります。

$ python3.10 asyncio_gather_cancel.py
err=ValueError()
タスク1の状態 task1._state='FINISHED'
タスク2の状態 task2._state='FINISHED'
タスク3の状態 task3._state='PENDING'
完了していないタスクが出力しています

このように例外が発生すると、タスクが残ってしまう場合があります。 タスクが残ってしまうと、意図せず後続処理の間に動作が完了したり、実装によってはI/Oのコネクションが閉じられないというような可能性が残り、処理の予測が困難です。

Python 3.10以前のタスクのキャンセル

上記のサンプルコードを修正し、タスクのキャンセル処理を追加します。 修正のポイントは次の3つです。

  • キャンセルされるタスク(非同期関数)に asyncio.CancelledError の捕捉と再送出 [3] を追加する

  • 完了していないタスクに対しキャンセルを指示する

  • キャンセルの完了を待つ

まず、キャンセルする非同期関数の coro_long()asyncio.CancelledError の捕捉とキャンセル時の処理を追加します。[4]
次に asyncio.gather() を実行している main() にタスクの完了を判定する Task.done() と、タスクのキャンセルを指示する Task.cancel() を追加します。 最後にキャンセル処理の完了を待つ必要があるため、ここでは asyncio.sleep() を利用し完了を待ちます。


async def coro_long():
    try:
        await asyncio.sleep(1)
        print("完了していないタスクが出力しています")
        return "成功?"
    except asyncio.CancelledError as err:  # キャンセル処理を追加
        print("キャンセルされたタスクが出力しています")
        raise asyncio.CancelledError  # 再送出する

async def main():
    try:
        task1 = asyncio.create_task(coro_success())
        task2 = asyncio.create_task(coro_value_err())
        task3 = asyncio.create_task(coro_long(), name="残るコルーチン")  # 分かりやすくするためタスクに名づけ

        results = await asyncio.gather(*[task1, task2, task3])
        print(f"{results=}")
    except ValueError as err:
        print(f"{err=}")
    print(f"タスク1の状態 {task1._state=}")
    print(f"タスク2の状態 {task2._state=}")
    print(f"タスク3の状態 {task3._state=}")  # この時点ではキャンセルされていない
    for task in [task1, task2, task3]:
        if task.done() is False:
            task.cancel()  # 未完了のタスクをキャンセル
    print(f"タスク3の状態 {task3._state=}")  # この時点ではキャンセル依頼されただけでキャンセルされていない
    await asyncio.sleep(1)  # キャンセル処理完了を待つ必要がある
    print(f"タスク3の状態 {task3._state=}")  # CancelledError内の処理が完了後、キャンセルになる

実行すると次のような結果になります。 "完了していないタスクが出力しています" が出力されず、代わりに "キャンセルされたタスクが出力しています" が出力されます。 また、タスクの状態も task.cancel() 実行直後は「PENDING」ですが、 asyncio.sleep() を利用しキャンセル処理の完了を待つと「CANCELLED」になり、キャンセルされていることがわかります。

$ python3.10 asyncio_gather_cancel.py
err=ValueError()
タスク1の状態 task1._state='FINISHED'
タスク2の状態 task2._state='FINISHED'
タスク3の状態 task3._state='PENDING'
タスク3の状態 task3._state='PENDING'
キャンセルされたタスクが出力しています
タスク3の状態 task3._state='CANCELLED'

なお、サンプルコードではキャンセル処理の完了待ちに asyncio.sleep() を利用していますが、本来であれば未完了のタスクに対し asyncio.gather() などを再度実行しなければなりません。


    for task in [task1, task2, task3]:
        if task.done() is False:
            task.cancel()  # 未完了のタスクをキャンセル

    # await asyncio.sleep(1)  # キャンセル処理完了を待つ必要がある
    # asyncio.sleep(1)の代わりに、キャンセル完了を待つため asyncio.gather() を実行する
    results = await asyncio.gather(*[task3], return_exceptions=True)

例外発生時にタスクが残ってしまうことを避けるため、キャンセル処理は必要です。 サンプルコードと出力からわかるように asyncio.gather() では、終了していないタスクをキャンセルする処理、そしてキャンセル完了を待つ処理の実装が必要です。

asyncio.TaskGroup によるタスクのキャンセル

asyncio.TaskGroup では、コンテキストマネージャーを抜けるタイミングで未完了のタスクをキャンセルします。 例外が発生した場合に開発者は、タスクがキャンセルされたか、またキャンセル処理が完了しているか、を意識する必要がなくなります。

非常にシンプルで読みやすい実装になります。

async def main():
    try:
        async with asyncio.TaskGroup() as g:
            task1 = g.create_task(coro_success())
            task2 = g.create_task(coro_value_err())
            task3 = g.create_task(coro_long(), name="残るコルーチン")
        results = [task1.result(), task2.result(), task3.result()]
        print(f"{results=}")
    except* ValueError as err:
        print(f"{err.exceptions=}")

    print(f"完了していないタスク {task1._state=}")
    print(f"完了していないタスク {task2._state=}")
    print(f"完了していないタスク {task3._state=}")

実行すると、キャンセル処理が実行され、またタスクも「CANCELLED」になっていることがわかります。

$ python3.11 asyncio_taskgroup_cancel.py
キャンセルされたタスクが出力しています
err.exceptions=(ValueError(),)
タスク1の状態 task1._state='FINISHED'
タスク2の状態 task2._state='FINISHED'
タスク3の状態 task3._state='CANCELLED'

【コラム】 何のための「PEP 654 Exception Groups and except」

少し asyncio.TaskGroup から離れ、 except* について見てみましょう。 Python 3.11では新たな構文として except* がPEP 654にて追加されました。Python 3.11のWhat's Newでは次のように紹介されています。

PEP 654 introduces language features that enable a program to raise and handle multiple unrelated exceptions simultaneously.

What’s New In Python 3.11 — Python 3.11 documentation

「handle multiple unrelated exceptions simultaneously(無関係な複数の例外を同時に処理する)」とあります。 該当の「PEP 654 – Exception Groups and except*」ではMotivation(動機)に、4つのケースが挙げられています。 中でも今回注目するのは「Concurrent errors(同時に発生するエラー)」です。

Libraries for async concurrency provide APIs to invoke multiple tasks and return their results in aggregate. There isn’t currently a good way for such libraries to handle situations where multiple tasks raise exceptions. The Python standard library’s asyncio.gather() [1] function provides two options: raise the first exception, or return the exceptions in the results list.

PEP 654 – Exception Groups and except* | peps.python.org

「There isn’t currently a good way for such libraries to handle situations where multiple tasks raise exceptions.(複数のタスクが送出する例外を処理する良い方法は、今のところありません。)」とあり、また asyncio.gather() の課題が記載されています。 except* 構文は、非同期処理のために実装された、と言っても過言ではないようです。[5]

まとめ

asyncioはI/Oバウンドな処理を効率的に実行できるため、データベースや外部APIの実行によく使われます。ただ、外部リソースに依存するため、エラーも発生しやすいです。 今回の asyncio.TaskGroup を利用すると、例外処理や例外発生時のキャンセルを非常に読みやすく書けるようになっています。 asyncio.TaskGroup を利用して「予測可能で、より安全」に非同期処理を実装しましょう。

最後に私事ですが、 asyncio.TaskGroup とasyncioについてもう少し掘り下げた内容をPyCon JP 2022で紹介します(Python 3.11新機能asyncio.TaskGroup()と2022年asyncioの"Hello-ish world" というトークです)。気になる方はぜひ現地でお会いしましょう!