Multiprocessing Woes
It is a truth universally acknowledged that parallel computing on Python is an experience that ranks somewhere alongside “facial exfoliation with a rusted cheese grater” in its pleasantness.

In this post I’m going to briefly review our particular (common!) problem, the state of multiprocessing in Python, and why it unfortunately appears necessary to continue using my own parallel processing layer in LensKit.

LensKit supports parallelism in two different places: model training and batch evaluation. Model training uses the threading support of the underlying compute engine (BLAS and Numba, although I’m looking to get away from Numba). Batch evaluation uses the Python standard library’s `ProcessPoolExecutor` through a custom wrapper layer (more on this later).

Shape of the Problem

In batch evaluation, we have a problem that is common to many machine learning batch “inference”¹ problems: we have a large model object that is read-only (except for ephemeral caches), and multiple threads or processes need to compute results using this model object (in our case, computing recommendations for the thousands or millions of test users). Since this model is large, it presents two constraints: we can’t afford to re-serialize it and send it along with every task, and we would rather not have each worker process hold its own full copy in memory.

Usual API design

Typical Python parallel processing APIs are designed around submitting tasks with arguments to the executor pool. At a surface level, our use case maps reasonably well to the typical `map` function provided by e.g. `Executor`:

```python
def map(func, *iterables):
    # apply func to items in zip(iterables), in parallel
    ...
```

Most Python parallel APIs provide a function like this, and differ primarily in their implementation details and optimizations. However, there is one element of this API that does not match our use case: the function needs access to the large shared model object in addition to the per-task arguments it gets from `iterables`.

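For illustration, here is the naive way to shoehorn our problem into that API (the `score` function and toy `model` are made up for this sketch, not LensKit code): binding the model into the task function means the executor re-pickles it for the tasks it ships to workers, rather than sharing it.

```python
from concurrent.futures import ProcessPoolExecutor
from functools import partial


def score(model, user_id):
    # stand-in for the real per-user recommendation work
    return user_id, len(model["item_factors"])


if __name__ == "__main__":
    model = {"item_factors": list(range(1_000_000))}  # pretend this is a big model

    with ProcessPoolExecutor() as pool:
        # partial(score, model) is pickled along with the tasks it dispatches,
        # so the large model is serialized and copied instead of shared
        results = list(pool.map(partial(score, model), range(100)))
```
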
Current Solution

The `ProcessPoolExecutor` works on top of `multiprocessing`, and like `multiprocessing.Pool` it allows us to provide a worker setup function that will be called when each worker process starts, as well as arguments for that setup function. This conceptually maps quite well: if we have a way to put a model in shared memory, we can use the worker setup function to store it in a global variable, and then supply a task function to `map` that fetches the shared model and passes it, along with the per-task arguments, to the batch evaluation function.

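Here is a minimal sketch of that worker-setup pattern using only the standard library; the helper names (`load_shared_model`, `recommend_for_user`, `run_batch`) are placeholders for whatever the application provides, not LensKit’s API.

```python
from concurrent.futures import ProcessPoolExecutor

_MODEL = None  # per-process global that holds the shared model


def load_shared_model(model_ref):
    # placeholder: attach to the model in shared memory (see the next section)
    return model_ref


def recommend_for_user(model, user_id):
    # placeholder for the real per-user scoring work
    return user_id, model


def _init_worker(model_ref):
    # worker setup function: runs once in each worker process
    global _MODEL
    _MODEL = load_shared_model(model_ref)


def _run_task(user_id):
    # the task only ships the small per-task argument;
    # the large model comes from the per-process global
    return recommend_for_user(_MODEL, user_id)


def run_batch(model_ref, user_ids, n_jobs=4):
    with ProcessPoolExecutor(n_jobs, initializer=_init_worker,
                             initargs=(model_ref,)) as pool:
        return list(pool.map(_run_task, user_ids))


if __name__ == "__main__":
    print(run_batch("model-handle", range(10))[:3])
```
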
In Python 3.8, the `pickle` module gained support for the new Protocol 5 pickling format, which allows application code to provide a custom callback for serializing anything that implements the Python buffer API, which includes the NumPy arrays that make up the bulk of our model’s saved data. If we “serialize” those by copying them to shared memory, then the remaining object structure is a relatively small pickle. The binpickle library leverages this to provide an on-disk serialization format that supports memory-mapping buffer contents.

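As a rough sketch of how that buffer callback can be combined with Python’s shared memory (this is illustrative standard-library code, not binpickle’s or the invoker’s implementation):

```python
import pickle
from multiprocessing import shared_memory


def pickle_into_shm(model):
    """Pickle `model` with protocol 5, copying each buffer into shared memory."""
    blocks = []  # keep these handles alive for as long as workers need the model

    def stash(buf: pickle.PickleBuffer):
        view = buf.raw()  # flat byte view of the buffer (e.g. a NumPy array's data)
        shm = shared_memory.SharedMemory(create=True, size=view.nbytes)
        shm.buf[:view.nbytes] = view
        blocks.append((shm, view.nbytes))
        return False  # False = leave this buffer out of the pickle byte stream

    payload = pickle.dumps(model, protocol=5, buffer_callback=stash)
    # payload is now small; workers get it plus [(shm.name, size) for shm, size in blocks]
    return payload, blocks


def load_from_shm(payload, names_and_sizes):
    """In a worker process: attach to the shared blocks and rebuild the model."""
    # (in real code, keep the SharedMemory handles around so the segments stay open)
    shms = [shared_memory.SharedMemory(name=name) for name, _ in names_and_sizes]
    buffers = [shm.buf[:size] for shm, (_, size) in zip(shms, names_and_sizes)]
    return pickle.loads(payload, buffers=buffers)
```

Because the worker-side buffers come straight from the shared segments, the rebuilt NumPy arrays can reference the shared data rather than private copies.
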
I created LensKit’s Invoker framework to abstract all of this. Its `ModelOpInvoker` implementations take a model object, and do the following:

- Put the model into shared memory, using Python’s `SharedMemory` and binpickle, which was created for the purpose and can be used to share through memory-mapped disk files if `/dev/shm` has insufficient space for the shared memory.
- Start worker processes whose setup function loads the shared model into a per-process global variable.
- Run the requested operation through `map`, fetching the shared model and passing it, along with the per-task arguments, to the operation function.

It works pretty well, all things considered. There are a few outstanding problems, though. One of them is the memory needed to serialize the model for sharing; that can be avoided by pre-serializing it to binpickle (basically, serializing to a memory-mappable binpickle doesn’t require the output to be in memory, so you serialize to disk, drop the original model object, and then have the invoker use the pre-serialized model, which it’s capable of doing).

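To make the pre-serialization trick concrete, here is a rough, hand-rolled version of the idea using `mmap` and pickle protocol 5; binpickle packages this up properly, and none of these helper names are binpickle’s API.

```python
import mmap
import pickle
from pathlib import Path


def dump_mappable(model, path: Path):
    """Write buffer data to <path>.dat and the small pickle structure to <path>.pkl."""
    offsets = []
    with open(path.with_suffix(".dat"), "wb") as dat:

        def stash(buf: pickle.PickleBuffer):
            view = buf.raw()
            offsets.append((dat.tell(), view.nbytes))
            dat.write(view)  # bulk data goes to disk, not into the pickle
            return False

        payload = pickle.dumps(model, protocol=5, buffer_callback=stash)
    path.with_suffix(".pkl").write_bytes(pickle.dumps((payload, offsets)))


def load_mapped(path: Path):
    """Rebuild the model with its buffers memory-mapped from disk (no large copy)."""
    payload, offsets = pickle.loads(path.with_suffix(".pkl").read_bytes())
    dat = open(path.with_suffix(".dat"), "rb")  # keep open while the mapping is in use
    mem = mmap.mmap(dat.fileno(), 0, access=mmap.ACCESS_READ)
    buffers = [memoryview(mem)[off:off + size] for off, size in offsets]
    return pickle.loads(payload, buffers=buffers)
```

Once a model has been dumped this way, the original object can be dropped, and the driver and every worker can map the same file, sharing one page-cache copy of the bulk data.
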
Current Status

So I spent some time this week trying to see if someone else has built a good solution I can use instead of continuing to maintain the invoker library. So far as I can tell, there really hasn’t been a lot of movement towards a solution for this specific problem shape. I also may not be looking in the right places, but I see surprisingly little uptake of Pickle 5’s ability to facilitate shared memory. The things I’ve looked at in my current exploration include:

- The standard library’s `ProcessPoolExecutor`: didn’t expect anything new here, and it’s what we’re already using.
- joblib: its parallel API focuses on `map`, and admits that this comes at the expense of other functionality; there is no way to provide per-worker model data (at least not without configuring your own executor). It uses `loky` and `cloudpickle` to work around old bugs in Python’s concurrency library, improve performance, and increase the number of things that can be pickled; the relevant Python concurrency bug has been fixed in the versions of Python I support, and I haven’t found `pickle`’s limitations to be a problem in our code paths. Joblib also provides `dump`/`load` routines that allow memory-mapped Numpy arrays, but it’s a hacky solution that predates Pickle 5. binpickle was born from “what if we did Joblib’s dump/load but on top of Pickle 5?”. For my purposes in LensKit, joblib doesn’t provide much value on top of Python’s standard library, so it isn’t worth the dependency.
- Some options only share worker data by inheriting it across `fork`, so that feature doesn’t work on Windows (or with any other multiprocessing spawn model). Good Windows support is a priority for LensKit.
- ipyparallel: zero-copy transfers only apply when the buffer is an argument to the `map` or `push` function, not when it is an implementation data structure of an object you pass (with a few exceptions). The serialization layer is extensible, so I could use Pickle 5 to serialize anything with zero-copy buffers, but that gets to the next problem: the zero copies are in the process of preparing the data to send, and as near as I can tell, it is still copied over the zeromq pipe. Therefore it isn’t zero-copy in the sense that I need, but rather one copy (instead of two) in the driver process and one copy in each of the worker processes. Supporting true shared model objects still requires the same serialization logic I use to work with the standard library.
- `dask.distributed` and `ray` are both interesting, but very much seem to want you to work within their framework. LensKit is designed to fit with whatever tools the user wants to use, particularly for model implementation, so forcing evaluation into a heavier framework like them would be a paradigm change. It also appears that I would still need the custom shared memory serialization logic, although that may not be entirely correct. I also just haven’t had good success getting actual speedups from Dask when I have tried it in the past.
- `torch.multiprocessing` does go there, for PyTorch tensors — when you send an object to a worker, it moves the tensor into shared memory and allows the workers to access it. It only works for PyTorch, though, not NumPy or other buffer-based APIs.

That’s what I’ve been able to learn so far. Multiple operations with a single large shared model object just does not seem to be a well-supported pattern outside of PyTorch and heavyweight cloud- and cluster-oriented frameworks. With LensKit’s goals of good single-node performance and minimal constraints on users’ environments, there doesn’t seem to be a better solution than what I’m doing now.

Sidebar: Subinterpreters

Python recently added “subinterpreters”, which are multiple isolated Python interpreters in a single process, and Python 3.12 allows each subinterpreter to have its own global interpreter lock. This seems to open the door for a very good solution: parallelize with threads, giving each thread its own subinterpreter; share the buffer memory between subinterpreters and keep using pickle for the object structures and other inter-thread communication. However, there is significant work to upgrade extension modules to support subinterpreters, and it doesn’t look like a priority for NumPy yet.

Moving Forward

It looks like I need to keep maintaining a parallel compute API for large shared model objects in order to enable LensKit’s parallel evaluation. I’m looking to make a few changes to make it more usable and ergonomic, and to encourage adoption (and co-maintenance?) by others:

- Spin this parallel-compute layer out into its own small library that other projects can adopt without depending on LensKit.
- Keep supporting model sharing through both Python’s shared memory and binpickle mapping, depending on the use case. I may also consider adding support for `plasma`, but I’m not sure that would gain us much on top of Python’s shared memory API.
- With binpickle files stored on NFS, this will allow multi-node parallelism with one model copy per node.

I’ll update this post with links once the new library is up and available. Hopefully people will find it useful!

¹ Inference is a very questionable name for applying a model at runtime, because it makes it much more difficult to talk about the difference between prediction and inference tasks in a statistical sense. It seems to be the term we’re stuck with, but I hate it, and wish that machine learning would be less gratuitous with redefining already-established terms. I could also go on a rant about calling interpolation weights “attention” when “interpolation weights” is right there, especially when you’re trying to do information retrieval or recommendation research that involves user attention.