Question

Python asyncio: handling exceptions in gather() - documentation unclear?

The documentation for asyncio.gather says that

If return_exceptions is False (default), the first raised exception is immediately propagated to the task that awaits on gather(). Other awaitables in the aws sequence won’t be cancelled and will continue to run.

However, a simple test suggests that if one of the tasks raises an exception while return_exceptions is False, all other awaitables are cancelled. To be more precise, in case I misunderstand the terminology: the other awaitables do not finish their job.

import asyncio

async def factorial(name, number, raise_exception=False):
    # If raise_exception is True, will raise an exception when
    # the loop counter > 3
    f = 1
    for i in range(2, number + 1):
        print(f'  Task {name}: Compute factorial({i})...')

        if raise_exception and i > 3:
            print(f'  Task {name}: raising Exception')
            raise Exception(f'Bad Task {name}')

        await asyncio.sleep(1)
        f *= i
    print(f'==>> Task {name} DONE: factorial({number}) = {f}')
    return f

async def main():
    tasks = [factorial('A', 5),  # this will not be finished
             factorial('B', 10, raise_exception=True),
             factorial('C', 2)]

    try:
        results = await asyncio.gather(*tasks)
        print('Results:', results)
    except Exception as e:
        print('Got an exception:', e)

asyncio.run(main())

To put it simply, this code defines 3 tasks and calls asyncio.gather() on them. One of the tasks raises an exception before another one is done, and that other task never finishes.

Actually, I cannot even make sense of what the documentation says. If an exception is raised and caught by the task awaiting gather(), I would not be able to get the returned results anyway (even if the other task somehow finished).

Am I missing anything, or is there a problem with the documentation?

This was tested with Python 3.7.2.

1 Jan 1970

Solution


Scatter-Gather pattern in parallelism

The gather function is primarily designed for the Scatter-Gather pattern. In this pattern, gather is handy when you need to compute something from all task results (e.g. in an aggregation function). The return_exceptions switch controls whether a single failure should invalidate the aggregated result. By default, any failure is immediately propagated to the task awaiting gather(). This is the desired behavior in aggregations like sum(await gather(*tasks)), because a single failure makes the sum invalid. However, when one awaitable fails, gather does not cancel or alter the other tasks in any way. It is merely one observable, quietly waiting and gathering information.
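To make the aggregation case concrete, here is a minimal sketch contrasting the default behavior with return_exceptions=True. The work() coroutine and its delays are hypothetical: the strict version loses the whole sum as soon as one worker fails, while the lenient version receives the exception as a list element and can drop it.

```python
import asyncio

async def work(n, fail=False):
    # Hypothetical worker: sleeps briefly, then returns n or raises.
    await asyncio.sleep(0.01 * n)
    if fail:
        raise ValueError(f'worker {n} failed')
    return n

async def aggregate_strict():
    # return_exceptions=False (default): the first failure propagates
    # to this awaiting task, so no partial sum is ever produced.
    try:
        return sum(await asyncio.gather(work(1), work(2, fail=True), work(3)))
    except ValueError as e:
        return f'aggregation failed: {e}'

async def aggregate_lenient():
    # return_exceptions=True: exceptions are returned in the result list,
    # at the positions of the awaitables that raised them.
    results = await asyncio.gather(work(1), work(2, fail=True), work(3),
                                   return_exceptions=True)
    return sum(r for r in results if not isinstance(r, Exception))

print(asyncio.run(aggregate_strict()))   # aggregation failed: worker 2 failed
print(asyncio.run(aggregate_lenient()))  # 4
```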

Your question 1:

I cannot even make sense with what the documentations says - if an exception is raised ..., I would not even be able to get the returned results

Usually you can, and need to, define multiple observables in a Scatter-Gather job. After one gather fails, the main routine should continue with another gather, wait, as_completed, or asyncio.sleep. Remember that the tasks are still running and their Future objects still exist. By design, Scatter-Gather tasks should be independent and the gather operation should have no side effects, so you can continue to handle those awaitables individually, e.g. by querying tasks[i].result() or explicitly cancelling them. This requires tasks to hold Task objects (e.g. created with asyncio.create_task()) rather than bare coroutines.
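A minimal sketch of that approach, assuming a condensed copy of the question's factorial() with much shorter sleeps. The coroutines are wrapped with asyncio.create_task() so we keep handles to them; the failed gather is caught, asyncio.wait() then lets the surviving task finish, and each task is inspected individually.

```python
import asyncio

async def factorial(name, number, raise_exception=False):
    # Condensed version of the question's coroutine (shorter sleeps, no prints).
    f = 1
    for i in range(2, number + 1):
        if raise_exception and i > 3:
            raise Exception(f'Bad Task {name}')
        await asyncio.sleep(0.01)
        f *= i
    return f

async def main():
    # Keep Task handles so each task can be queried after gather() fails.
    tasks = [asyncio.create_task(factorial('A', 5)),
             asyncio.create_task(factorial('B', 10, raise_exception=True)),
             asyncio.create_task(factorial('C', 2))]
    try:
        await asyncio.gather(*tasks)
    except Exception as e:
        print('Got an exception:', e)

    # gather() did not cancel anything: wait for the remaining tasks.
    await asyncio.wait(tasks)

    # Every task is now done; collect each outcome individually.
    outcome = {}
    for name, task in zip('ABC', tasks):
        if task.exception() is not None:
            outcome[name] = repr(task.exception())
        else:
            outcome[name] = task.result()
    return outcome

print(asyncio.run(main()))
```

Here task A still produces 120 even though gather() already raised on B's failure.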

Your question 2:

Am I missing anything, or is there a problem with the documentation?

In your test program, no other observables are defined after the exception is caught, so the main program simply exits, which gives the illusion that all tasks were cancelled by gather. The remaining tasks are indeed cancelled, but by asyncio.run(main()) when it shuts down the event loop, a few milliseconds after the exception is caught. If you add another wait at the end of main(), either await asyncio.wait(tasks) (with tasks created via asyncio.create_task(), since Python 3.11+ rejects bare coroutines there) or simply await asyncio.sleep(20), those worker tasks get their chance to complete. Thus the documentation is correct.
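The following sketch shows where the cancellation actually comes from. The slow_worker() and failing_worker() coroutines are hypothetical: gather() only propagates the error, and the still-pending worker is cancelled afterwards by asyncio.run() during shutdown.

```python
import asyncio

events = []  # records what happens, in order

async def slow_worker():
    try:
        await asyncio.sleep(10)
        events.append('finished')
    except asyncio.CancelledError:
        # asyncio.run() cancels tasks still pending when main() returns.
        events.append('cancelled')
        raise

async def failing_worker():
    await asyncio.sleep(0.01)
    raise RuntimeError('boom')

async def main():
    try:
        await asyncio.gather(slow_worker(), failing_worker())
    except RuntimeError as e:
        events.append(f'caught {e}')
    # main() returns here without waiting for slow_worker any further.

asyncio.run(main())
print(events)  # ['caught boom', 'cancelled']
```

Adding a long enough await asyncio.sleep(...) at the end of main() would let the worker record 'finished' instead.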

Your test program can be viewed as a DAG of calculation tasks. main(), which awaits the gather, is the root target passed to asyncio.run(). Therefore, once that single root target finishes (here right after the gather fails), all unfinished sub-tasks are aborted.

-- updated in 2023


original answer

I've run your code and got the following output, as expected from documentation.

  Task C: Compute factorial(2)...
  Task A: Compute factorial(2)...
  Task B: Compute factorial(2)...
==>> Task C DONE: factorial(2) = 2
  Task A: Compute factorial(3)...
  Task B: Compute factorial(3)...
  Task A: Compute factorial(4)...
  Task B: Compute factorial(4)...
  Task B: raising Exception
Got an exception: Bad Task B
  Task A: Compute factorial(5)...
==>> Task A DONE: factorial(5) = 120

What's going on

  1. Tasks A, B and C are scheduled on the event loop.
  2. All tasks run concurrently, and C finishes first.
  3. Task B raises an exception.
  4. The await asyncio.gather() raises immediately, and print('Got an exception:', e) writes to the screen.
  5. Task A continues to run and prints "==>> Task A DONE ...".

What's wrong with your test

As @deceze commented, your program exits right after the exception is caught and main() returns. Task C had already finished, and task A is terminated because asyncio.run() shuts everything down at that point, not because gather() cancelled it.

To fix it, add await asyncio.sleep(20) to the end of the main() function.

2019-03-04

Solution


The answer to the main question here is to use asyncio.as_completed. Change your main() function to:

async def main():
    tasks = [factorial('A', 5),
             factorial('B', 10, raise_exception=True),
             factorial('C', 2)]

    # Handle results in the order the tasks complete;
    # an exception only affects the iteration that awaits it.
    for coroutine in asyncio.as_completed(tasks):
        try:
            result = await coroutine
        except Exception as e:
            print('Got an exception:', e)
        else:
            print('Result:', result)
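A self-contained variant of the same idea, using a hypothetical job() coroutine, that collects successful results and error messages separately in completion order instead of printing them:

```python
import asyncio

async def job(n, fail=False):
    # Hypothetical job: finishes after n * 20 ms, optionally raising.
    await asyncio.sleep(0.02 * n)
    if fail:
        raise ValueError(f'job {n} failed')
    return n * n

async def main():
    coros = [job(3), job(1, fail=True), job(2)]
    results, errors = [], []
    # as_completed yields awaitables in completion order, so a failure
    # only affects the iteration that awaits it.
    for next_done in asyncio.as_completed(coros):
        try:
            results.append(await next_done)
        except ValueError as e:
            errors.append(str(e))
    return results, errors

print(asyncio.run(main()))  # ([4, 9], ['job 1 failed'])
```

Unlike gather(), every job is awaited to completion, so the loop only ends once all of them have either returned or raised.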
2023-01-18