
ThreadPoolExecutor and ProcessPoolExecutor: Modern Python idioms

I've recently put online a simple HTTP/2 validation service based on asyncio and aiohttp.web.

It uses ThreadPoolExecutor and ProcessPoolExecutor in a couple of places to mix synchronous and asynchronous code. These are two powerful, high-level concurrency constructs that are useful in many asyncio-based projects.

ThreadPoolExecutor and ProcessPoolExecutor (and any other Executor subclass, for that matter) will do two main things for you:

  • They offer a very high-level interface to a common concurrency problem: executing a task against each item of an input data set.

  • They allow mixing asynchronous code with synchronous, blocking code. They will remain fundamental building blocks (at least, for the time being) for many projects based on asyncio.
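Besides `map`, which the example below uses, every Executor also exposes a lower-level `submit` method that returns a `concurrent.futures.Future`. A minimal sketch (the `square` function is just an illustration):

```python
import concurrent.futures

def square(n):
    """A trivial task to run in a worker thread."""
    return n * n

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    # submit() schedules the callable and immediately returns a Future
    future = executor.submit(square, 7)
    # result() blocks until the task has completed
    print(future.result())  # 49
```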

Example usage from synchronous code

As mentioned above, Executors are useful for executing a function against each item of a given data set, parallelizing the overall execution and thus lowering the time taken to complete the operation.

Let's imagine the following scenario: we have a function that performs a countdown starting from a given number. We want to execute many of these functions in parallel.

Using an executor, we can do it like this:

import time
import concurrent.futures

count_from = [40000000, 30000000, 20000000, 10000000]

def countdown(n):
    start = time.time()
    while n > 0:
        n -= 1
    return time.time() - start

def main():
    start = time.time()
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for number, time_taken in zip(count_from, executor.map(countdown, count_from)):
            print('Start: {} Time taken: {}'.format(number, time_taken))
    print('Total time taken: {}'.format(time.time() - start))

if __name__ == '__main__':
    main()

Which, on my computer, yields these results:

$ python3.5 proc.py
Start: 40000000 Time taken: 3.3081891536712646
Start: 30000000 Time taken: 2.957615852355957
Start: 20000000 Time taken: 2.144028902053833
Start: 10000000 Time taken: 1.3123805522918701
Total time taken: 3.3637709617614746

What happened here? The ProcessPoolExecutor spun up a pool of worker processes, distributed the workload among them, and collected the results as the tasks completed.

There is no explicit mention of threading or multiprocessing in the code. The ProcessPoolExecutor does pretty much everything on its own: it is a very high-level construct.

When to use ProcessPoolExecutor and ThreadPoolExecutor

Our countdown function is CPU-bound, and the CPython Global Interpreter Lock was not designed with multiple CPU-bound threads in mind. This gives us a simple rule for choosing between ProcessPoolExecutor and ThreadPoolExecutor:

  • For CPU-bound workloads: use ProcessPoolExecutor
  • For I/O-bound workloads: use ThreadPoolExecutor

If we swap the ProcessPoolExecutor for the ThreadPoolExecutor in the example above, the overall time taken by the script will be much higher (almost twice, in my case).

$ python3.5 proc.py
Start: 40000000 Time taken: 6.244499921798706
Start: 30000000 Time taken: 5.247483015060425
Start: 20000000 Time taken: 4.267640829086304
Start: 10000000 Time taken: 3.5105090141296387
Total time taken: 6.245465517044067
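The opposite holds for I/O-bound work: the GIL is released while a thread waits on I/O, so threads do run in parallel. A sketch, using `time.sleep` as a stand-in for a blocking I/O call:

```python
import time
import concurrent.futures

def fake_io(seconds):
    """Stand-in for a blocking I/O call: the GIL is released while sleeping."""
    time.sleep(seconds)
    return seconds

start = time.time()
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(fake_io, [0.2, 0.2, 0.2, 0.2]))
elapsed = time.time() - start

# The four 0.2 s waits overlap, so the total is close to 0.2 s, not 0.8 s
print('Total time taken: {:.2f}'.format(elapsed))
```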

Asyncio with ProcessPoolExecutor and ThreadPoolExecutor

Asyncio is getting traction, but many libraries haven't been adapted to work with it yet. For the time being, many asyncio-based Python projects will have to interface with blocking libraries.

An Executor will help you do that.
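The bridge between the two worlds is the event loop's `run_in_executor` method, used in the web application below. Here is a minimal, self-contained sketch; the `blocking_task` function is just an illustration:

```python
import asyncio
import concurrent.futures
import time

def blocking_task(n):
    """A synchronous, blocking function that cannot be awaited directly."""
    time.sleep(0.1)
    return n * 2

async def main(loop, executor):
    # run_in_executor returns a Future; awaiting it yields control back
    # to the event loop until the blocking call has finished
    return await loop.run_in_executor(executor, blocking_task, 21)

executor = concurrent.futures.ThreadPoolExecutor()
loop = asyncio.new_event_loop()
result = loop.run_until_complete(main(loop, executor))
loop.close()
executor.shutdown()
print(result)  # 42
```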

Example ProcessPoolExecutor and ThreadPoolExecutor

Let's say that we're writing a web application that processes image uploads and resizes them. We will use aiohttp.web, a web framework based on asyncio, and Pillow for the image processing.

A first version, which has some issues that will be outlined below, would look like this:

import io

from PIL import Image
from aiohttp import web


def resize(image_file):
    """Perform the actual resizing using Pillow blocking functions"""
    img = Image.open(image_file)
    img.thumbnail((100, 100), Image.LANCZOS)  # Image.ANTIALIAS in older Pillow versions
    buf = io.BytesIO()
    img.save(buf, 'PNG')
    return buf.getvalue()


class Handler(web.View):

    async def get(self):
        return web.Response(body=b'''
         <!DOCTYPE html>
         <html>
         <h1>Resize an image</h1>
         <form action="/" method="post" enctype="multipart/form-data">
           <label for="image">Image</label>
           <input id="image" name="image" type="file" />
           <input type="submit" />
         </form>
         </html>''', content_type='text/html')

    async def post(self):
        data = await self.request.post()
        thumbnail = resize(data['image'].file)
        return web.Response(body=thumbnail, content_type='image/png')


app = web.Application()
app.router.add_route('*', '/', Handler)
web.run_app(app)

The problem here is that the resize function will block the entire web application process until it has finished resizing the image. No concurrent connections will be served during the resize operation. This is expected, since it's a blocking operation being executed in the context of a single-threaded, asynchronous application.

To overcome the issue, we can offload the image processing function to an executor, which can be created with the following code:

from concurrent.futures import ProcessPoolExecutor

class Handler(web.View):
    def __init__(self, request):
        super().__init__(request)
        self.executor = ProcessPoolExecutor()

We will then submit our task to the executor:

    async def post(self):
        data = await self.request.post()
        thumbnail = await self.request.app.loop.run_in_executor(
            self.executor, resize, data['image'].file.read())
        return web.Response(body=thumbnail, content_type='image/png')

(Full source code)

The BaseEventLoop.run_in_executor method will cause our resize function to run in a background thread or process (depending on the executor being used). run_in_executor returns a Future which can be awaited on to regain control once the resize operation has finished.

Please note that we can no longer pass a file-like object to the resize function: the argument has to be something that can easily be serialized (pickled).

This is because the ProcessPoolExecutor has to serialize the resize function's parameters in order to distribute the work among a pool of worker processes. With a ThreadPoolExecutor, by contrast, there is no such serializability requirement.

In this specific case, we are using a ProcessPoolExecutor because we assume that the image resizing is a CPU-bound operation. Under some circumstances, for instance low-resolution images being fetched from an object storage or an NFS mount, one might actually prefer a ThreadPoolExecutor, because the I/O-bound traits might prevail over the CPU usage.

© Giuseppe Ciotta. Built using Pelican.