Multiprocessing Woes

A peloton in a bicycle race, moving quickly in parallel.
Photo by Markus Spiske on Unsplash

It is a truth universally acknowledged that parallel computing on Python is an experience that ranks somewhere alongside “facial exfoliation with a rusted cheese grater” in its pleasantness.

In this post I’m going to briefly review our particular (common!) problem, the state of multiprocessing in Python, and why it unfortunately appears necessary to continue using my own parallel processing layer in LensKit.

Shape of the Problem

LensKit supports parallelism in two different places: model training and batch evaluation. Model training uses the threading support of the underlying compute engine (BLAS and Numba, although I’m looking to get away from Numba). Batch evaluation uses the Python standard library’s ProcessPoolExecutor through a custom wrapper layer (more on this later).

In batch evaluation, we have a problem that is common to many machine learning batch “inference”1 problems: we have a large model object that is read-only (except for ephemeral caches), and multiple threads or processes need to compute results using this model object (in our case, computing recommendations for the thousands or millions of test users). Since this model is large, it presents two constraints:

  • We only want one copy in memory, no matter how many workers we use (if we eventually support multi-node evaluation, we would want one copy per node).
  • We don’t want to have to re-serialize it for each task (user).

Usual API design

Typical Python parallel processing APIs are designed around submitting tasks with arguments to the executor pool. At a surface level, our use case maps reasonably well to the typical map function provided by e.g. Executor:

def map(func, *iterables):
    # apply func to items in zip(iterables), in parallel

Most Python parallel APIs provide a function like this, and differ primarily in their implementation details and optimizations. However, there is one element of this API that does not match our use case: the function needs access to the large shared model object in addition to the per-task arguments it gets from iterables.

Current Solution

The ProcessPoolExecutor works on top of multiprocessing.Pool, which allows us to provide a worker setup function that will be called to instantiate each worker thread, as well as provide arguments to the setup function. This conceptually maps quite well: if we have a way to put a model in shared memory, we can use worker setup functions to store it in a global variable, and then supply a task function to map that fetches the shared model and passes it, along with the per-task arguments, to the batch evaluation function.

In Python 3.8, the pickle module gained support for the new Protocol 5 pickling format, which allows application code to provide a custom callback for serializing anything that implements the Python buffer API, which includes the NumPy arrays that make up the bulk of our model’s saved data. If we “serialize” those by copying them to shared memory, then the remaining object structure is a relatively small pickle. The binpickle library leverages this to provide an on-disk serialization format that supports memory-mapping buffer contents.

I created LensKit’s Invoker framework to abstract all of this this. Its ModelOpInvoker implementations take a model object, and do the following:

  • Serialize the model to shared memory (if it isn’t already serialized). It supports both Python’s SharedMemory and binpickle, which was created for the purpose and can be used to share through memory-mapped disk files if /dev/shm has insufficient space for the shared memory.
  • Deserialize the model in the worker processes, using shared memory for the buffers.
  • Wire up logging so that log messages are properly passed back to the lead process.
  • Provide the model object to a “model op”, a function that takes the model and the task (e.g. user ID) and returns the results.

It works pretty well, all things considered. There are a few outstanding problems, though:

  • I’m maintaining a parallel processing library.
  • Naïve use still has 2 copies of the model in memory, because we cannot move the model’s buffers, we can only copy them. I have workarounds, but they’re not pretty, and require binpickle (basically, serializing to a memory-mappable binpickle doesn’t require the output to be in memory, so you serialize to disk, drop the original model object, and then have the invoker use the pre-serialized model, which its’ capable of doing).
  • Excessive worker process logging can overwhelm the logging pipe.

So I spent some time this week trying to see if someone else has built a good solution I can use instead of continuing to maintain the invoker library.

Current Status

So far as I can tell, there really hasn’t been a lot of movement towards a solution for this specific problem shape. I also may not be looking in the right places, but I see surprisingly little uptake of Pickle 5’s ability to facilitate shared memory.

The things I’ve looked at in my current exploration include:

  • ProcessPoolExecutor, didn’t expect anything new here, and it’s what we’re already using.
  • joblib, which I used in an earlier version of LensKit. Joblib focuses on providing a very clean and robust parallel implementation of map, and admits that this comes at the expense of other functionality; there is no way to provide per-worker model data (at least not without configuring your own executor). It uses loky and cloudpickle to work around old bugs in Python’s concurrency library, improve performance, and increase the number of things that can be pickled; the relevant Python concurrency bug has been fixed in the versions of Python I support, and I haven’t found pickle’s limitations to be a problem in our code paths. Joblib also provides dump/load routines that allow memory-mapped Numpy arrays, but it’s a hacky solution that predates Pickle 5. binpickle was born from “what if we did Joblib’s dump/load but on top of Pickle 5?”. For my purposes in LensKit, joblib doesn’t provide much value on top of Python’s standard library, so it isn’t worth the dependency.
  • mpire supports large shared models, but it does so through fork, so that feature doesn’t work on Windows (or with any other multiprocessing spawn model). Good Windows support is a priority for LensKit.
  • ipyparallel claims support for “zero-copy” buffers, but the devil is in the details. This zero-copy support only works when you actually pass the NumPy array to the map or push function, not when it is an implementation data structure of an object you pass (with a few exceptions). The serialization layer is extensible, so I could use Pickle5 to serialize anything wtih zero-copy buffers, but it gets to the next problem: the zero copies are in the process of preparing the data to send, but as near as I can tell, it is still copied over the zeromq pipe. Therefore it isn’t zero-copy in the sense that I need, but rather 1 (instead of 2) copies in the driver process and one copy in each of the worker processes. Supporting true shared model objects still requires the same serialization logic I use to work with the standard library.
  • dask.distributed and ray are both interesting, but very much seem to want you to work within their framework. LensKit is designed to fit with whatever tools the user wants to use, particularly fro model implementation, so forcing evaluation into a heavier framework like them would be a paradigm change. It also appears like I would still need the custom shared memory serialization logic, although that may not be entirely correct. I also just haven’t had good success getting actual speedups from Dask when I have tried it in the past.
  • torch.multiprocessing does go there, for PyTorch tensors — when you send an object to a worker, it moves the tensor into shared memory and allows the workers to access it. Only works for PyTorch, not numpy or other buffer-based APIs.

That’s what I’ve been able to learn so far. Multiple operations with single large shared model object just does not seem to be a well-supported pattern outside of PyTorch and heavyweight cloud- and cluster-oriented frameworks. With LensKit’s goals of good single-node performance and minimal constraints on users’ environments, there doesn’t seem to be a better solution than what I’m doing now.

Moving Forward

It looks like I need to keep maintaining a parallel compute API for large shared model objects in order to enable LensKit’s parallel evaluation. I’m looking to make a few changes to make it more usable and ergonomic, and encourage adoption (and co-maintenance?) by others:

  • Create a new library for the invoker code so it can be maintained separately and used by non-LensKit projects. It will continue to support both Python shared memory and binpickle mapping, depending on the use case. I may also consider adding support for plasma, but I’m not sure that would gain us much on top of Python’s shared memory API.
  • Add support to Binpickle to deserialize directly to shared memory, so users don’t need to have separate compressed BinPickle files for model storage and mappable ones for parallelism, unless they specifically need the increased memory capacity afforded by BinPickle mapping.
  • Add progress bar support.
  • Add optional “destructive pickling” that tries to dismantle NumPy arrays (by resizing them to 0) once their bytes have been transferred, to enable “move to shared memory” semantics. This is possible because the buffer callback API in Pickle 5 makes the buffer’s owner available. Destructive pickling will leave objects in unusable states and must be used with care, but it seems worth trying for the common cases of models that will not be used after they have been serialized.
  • Eventually: consider adding support for ipyparallel, with the same custom deserialization logic currently used for process pool executors. When used with memory-mappable binpickle files stored on NFS, this will allow multi-node parallelism with one model copy per node.

I’ll update this post with links once the new library is up and available. Hopefully people will find it useful!

  1. Inference is a very questionable name for applying a model at runtime, because it makes it much more difficult to talk about the difference between prediction and inference tasks in a statistical sense. It seems to be the term we’re stuck with, but I hate it, and wish that machine learning would be less gratuituous with redefining already-established terms. I could also go on a rant about calling interpolation weights “attention” when “interpolation weights” is right there, especially when you’re trying to do information retrieval or recommendation research that involves user attention.↩︎