Is there a simple process-based parallel map for python?

I'm looking for a simple process-based parallel map for python, that is, a function

parmap(function,[data])

that would run function on each element of [data] on a different process (well, on a different core, but AFAIK, the only way to run stuff on different cores in python is to start multiple interpreters), and return a list of results.

Does something like this exist? I would like something simple, so a simple module would be nice. Of course, if no such thing exists, I will settle for a big library :-/


It seems like what you need is the map method of multiprocessing.Pool():

map(func, iterable[, chunksize])

A parallel equivalent of the map() built-in function (it supports only
one iterable argument though). It blocks till the result is ready.


This method chops the iterable into a number of chunks which it submits to the
process pool as separate tasks. The (approximate) size of these chunks can be
specified by setting chunksize to a positive integer.

For example, if you wanted to map this function:

def f(x):
    return x**2

to range(10), you could do it using the built-in map() function:

map(f, range(10))

or using a multiprocessing.Pool() object's method map():

import multiprocessing
pool = multiprocessing.Pool()
print(pool.map(f, range(10)))
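A self-contained Python 3 version of the same example (just a sketch: the chunksize value is arbitrary, and the if __name__ == '__main__' guard is there so the pool can be created safely under the spawn start method, e.g. on Windows):

from multiprocessing import Pool

def f(x):
    return x**2

if __name__ == '__main__':
    with Pool() as pool:
        # chunksize controls how many items are handed to each worker at a time
        print(pool.map(f, range(10), chunksize=2))
        # -> [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]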

For those who are looking for a Python equivalent of R's mclapply(), here is my implementation. It builds on two earlier examples and can be applied to map functions with single or multiple arguments.

import numpy as np, pandas as pd
from scipy import sparse
import functools, multiprocessing
from multiprocessing import Pool

num_cores = multiprocessing.cpu_count()


def parallelize_dataframe(df, func, U=None, V=None):

    #blockSize = 5000
    num_partitions = 5 # int( np.ceil(df.shape[0]*(1.0/blockSize)) )
    blocks = np.array_split(df, num_partitions)

    pool = Pool(num_cores)
    if V is not None and U is not None:
        # apply func with multiple arguments to dataframe (i.e. involves multiple columns)
        df = pd.concat(pool.map(functools.partial(func, U=U, V=V), blocks))
    else:
        # apply func with one argument to dataframe (i.e. involves single column)
        df = pd.concat(pool.map(func, blocks))

    pool.close()
    pool.join()

    return df


def square(x):
    return x**2


def test_func(data):
    print("Process working on: ", data.shape)
    data["squareV"] = data["testV"].apply(square)
    return data


def vecProd(row, U, V):
    return np.sum( np.multiply(U[int(row["obsI"]),:], V[int(row["obsJ"]),:]) )


def mProd_func(data, U, V):
    data["predV"] = data.apply( lambda row: vecProd(row, U, V), axis=1 )
    return data


def generate_simulated_data():

    N, D, nnz, K = [302, 184, 5000, 5]
    I = np.random.choice(N, size=nnz, replace=True)
    J = np.random.choice(D, size=nnz, replace=True)
    vals = np.random.sample(nnz)

    sparseY = sparse.csc_matrix((vals, (I, J)), shape=[N, D])

    # Generate parameters U and V which could be used to reconstruct the matrix Y
    U = np.random.sample(N*K).reshape([N,K])
    V = np.random.sample(D*K).reshape([D,K])

    return sparseY, U, V


def main():
    Y, U, V = generate_simulated_data()

    # find row, column indices and observed values for sparse matrix Y
    (testI, testJ, testV) = sparse.find(Y)

    colNames = ["obsI", "obsJ", "testV", "predV", "squareV"]
    dtypes = {"obsI":int, "obsJ":int, "testV":float, "predV":float, "squareV": float}

    obsValDF = pd.DataFrame(np.zeros((len(testV), len(colNames))), columns=colNames)
    obsValDF["obsI"] = testI
    obsValDF["obsJ"] = testJ
    obsValDF["testV"] = testV
    obsValDF = obsValDF.astype(dtype=dtypes)

    print("Y.shape: {!s}, #obsVals: {}, obsValDF.shape: {!s}".format(Y.shape, len(testV), obsValDF.shape))

    # calculate the square of testVals
    obsValDF = parallelize_dataframe(obsValDF, test_func)

    # reconstruct prediction of testVals using parameters U and V
    obsValDF = parallelize_dataframe(obsValDF, mProd_func, U, V)

    print("obsValDF.shape after reconstruction: {!s}".format(obsValDF.shape))
    print("First 5 elements of obsValDF:\n", obsValDF.iloc[:5,:])


if __name__ == '__main__':
    main()

This can be done elegantly with Ray, a system that allows you to easily parallelize and distribute your Python code.

To parallelize your example, you'd need to define the function you want to map with the @ray.remote decorator and then invoke it with .remote. This ensures that every instance of the remote function is executed in a different process.

import time
import ray


ray.init()


# Define the function you want to apply map on, as remote function.
@ray.remote
def f(x):
    # Do some work...
    time.sleep(1)
    return x*x


# Define a helper parmap(f, list) function.
# This function executes a copy of f() on each element in "list".
# Each copy of f() runs in a different process.
# Note f.remote(x) returns a future of its result (i.e.,
# an identifier of the result) rather than the result itself.
def parmap(f, list):
    return [f.remote(x) for x in list]


# Call parmap() on a list consisting of the first 5 integers.
result_ids = parmap(f, range(1, 6))


# Get the results
results = ray.get(result_ids)
print(results)

This will print:

[1, 4, 9, 16, 25]

and it will finish in approximately len(list)/p seconds (rounded up to the nearest integer), where p is the number of cores on your machine. Assuming a machine with 2 cores, our example will execute in 5/2 rounded up, i.e., in approximately 3 seconds.

There are a number of advantages of using Ray over the multiprocessing module. In particular, the same code will run on a single machine as well as on a cluster of machines. For more advantages of Ray see this related post.
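As a small sketch of the cluster point (the address value is an assumption and depends on how you started the cluster), only the init call changes; the remote functions and ray.get calls stay the same:

import ray

# Connect to an already-running Ray cluster (e.g. one started with `ray start`)
# instead of starting a local instance.
ray.init(address="auto")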

I know this is an old post, but just in case, I wrote a tool to make this super, super easy called parmapper (I actually call it parmap in my use but the name was taken).

It handles a lot of the setup and teardown of processes and adds tons of features. In rough order of importance:

  • Can take lambdas and other unpickleable functions
  • Supports starmap and other similar call methods, making it very easy to use directly
  • Can split work amongst threads and/or processes
  • Includes features such as progress bars

It does incur a small overhead, but for most uses that is negligible.

I hope you find it useful.

(Note: like map in Python 3+, it returns an iterable, so if you need all results computed immediately, wrap the call in list().)
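A usage sketch, assuming the package exposes a parmap(function, iterable) entry point (check the project's README for the exact name and signature):

import parmapper

def f(x):
    return x**2

# Assumed API: parmap(function, iterable) returning an iterable of results;
# wrap it in list() to force all results, as noted above.
results = list(parmapper.parmap(f, range(10)))
print(results)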

Python 3's multiprocessing.Pool class has a map() method, and that's all you need to parallelize map:

from multiprocessing import Pool


with Pool() as P:
    xtransList = P.map(some_func, a_list)

Using with Pool() as P creates a process pool (and cleans it up when the block exits), and P.map applies some_func to each item in the list in parallel. You can also specify the number of worker processes:

with Pool(processes=4) as P:
    xtransList = P.map(some_func, a_list)
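If your function takes more than one argument, the same pool also offers a starmap() method (Python 3) that unpacks argument tuples; a minimal sketch (the function and data here are just illustrative):

from multiprocessing import Pool

def add(x, y):
    return x + y

if __name__ == '__main__':
    pairs = [(1, 2), (3, 4), (5, 6)]
    with Pool(processes=4) as P:
        # starmap unpacks each tuple into the function's positional arguments
        print(P.starmap(add, pairs))   # [3, 7, 11]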