How to increment a shared counter from multiple processes?

I'm having trouble with the multiprocessing module. I'm using a Pool of workers with its map method to concurrently analyze lots of files. Each time a file has been processed I would like to have a counter updated so that I can keep track of how many files remain to be processed. Here is sample code:

import os
import multiprocessing


counter = 0


def analyze(file):
    # Analyze the file.
    global counter
    counter += 1
    print(counter)


if __name__ == '__main__':
    files = os.listdir('/some/directory')
    pool = multiprocessing.Pool(4)
    pool.map(analyze, files)

I can't find a solution for this.


The problem is that the counter variable is not shared between your processes: each separate process creates its own local instance and increments that.
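To see this, here is a minimal sketch (the question's code, with the process ID added; not part of the original answer) in which each worker increments its own private copy:

import os
import multiprocessing


counter = 0


def analyze(file):
    # Each worker process has its own copy of the module globals,
    # so this increments a per-process counter, not a shared one.
    global counter
    counter += 1
    print(os.getpid(), counter)


if __name__ == '__main__':
    with multiprocessing.Pool(4) as pool:
        pool.map(analyze, range(8))

Typically this prints low counter values repeated across several PIDs, rather than a single sequence running up to 8.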

See this section of the documentation for some techniques you can employ to share state between your processes. In your case you might want to share a Value instance between your workers.

Here's a working version of your example (with some dummy input data). Note that it uses global values, which I would really try to avoid in practice:

from multiprocessing import Pool, Value
from time import sleep


counter = None


def init(args):
    ''' store the counter for later use '''
    global counter
    counter = args


def analyze_data(args):
    ''' increment the global counter, do something with the input '''
    global counter
    # += operation is not atomic, so we need to get a lock:
    with counter.get_lock():
        counter.value += 1
    print(counter.value)
    return args * 10


if __name__ == '__main__':
    # inputs = os.listdir(some_directory)

    #
    # initialize a cross-process counter and the input lists
    #
    counter = Value('i', 0)
    inputs = [1, 2, 3, 4]

    #
    # create the pool of workers, ensuring each one receives the counter
    # as it starts.
    #
    p = Pool(initializer=init, initargs=(counter,))
    i = p.map_async(analyze_data, inputs, chunksize=1)
    i.wait()
    print(i.get())

Here is a Counter class without the race-condition bug:

import multiprocessing


class Counter(object):
    def __init__(self):
        self.val = multiprocessing.Value('i', 0)

    def increment(self, n=1):
        with self.val.get_lock():
            self.val.value += n

    @property
    def value(self):
        return self.val.value
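As a usage sketch (not part of the original answer; init_worker and work are illustrative names, and the initializer pattern mirrors jkp's answer above), the class can be shared with Pool workers like this, assuming Counter is defined as above:

counter = None


def init_worker(shared_counter):
    # Runs once in each worker process; stores the shared Counter.
    global counter
    counter = shared_counter


def work(item):
    counter.increment()
    return item * 2


if __name__ == '__main__':
    shared = Counter()
    with multiprocessing.Pool(2, initializer=init_worker,
                              initargs=(shared,)) as pool:
        pool.map(work, range(10))
    print(shared.value)  # 10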

A faster Counter class that avoids taking the built-in lock of Value twice:

class Counter(object):
    def __init__(self, initval=0):
        self.val = multiprocessing.RawValue('i', initval)
        self.lock = multiprocessing.Lock()

    def increment(self):
        with self.lock:
            self.val.value += 1

    @property
    def value(self):
        return self.val.value
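If you want to measure the difference yourself, here is a rough timing harness (a sketch with illustrative names; absolute numbers depend heavily on the platform), assuming one of the Counter variants above is defined:

import multiprocessing
import time


def hammer(counter, n):
    # Increment the shared counter n times from one process.
    for _ in range(n):
        counter.increment()


if __name__ == '__main__':
    c = Counter()  # either variant defined above
    start = time.perf_counter()
    workers = [multiprocessing.Process(target=hammer, args=(c, 10_000))
               for _ in range(4)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    print(c.value, time.perf_counter() - start)  # expect 40000 increments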

https://eli.thegreenplace.net/2012/01/04/shared-counter-with-pythons-multiprocessing
https://docs.python.org/2/library/multiprocessing.html#multiprocessing.sharedctypes.Value
https://docs.python.org/2/library/multiprocessing.html#multiprocessing.sharedctypes.RawValue

An extremely simple example, adapted from jkp's answer:

from multiprocessing import Pool, Value
from time import sleep


counter = Value('i', 0)


def f(x):
    global counter
    with counter.get_lock():
        counter.value += 1
    print("counter.value:", counter.value)
    sleep(1)
    return x


with Pool(4) as p:
    r = p.map(f, range(1000 * 1000))
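Note that this version relies on the workers inheriting counter from the parent process, which works with the fork start method but not with spawn (the default on Windows, and on macOS since Python 3.8). A spawn-safe variant of the same idea (a sketch, passing the counter through the pool's initializer as in jkp's answer) would look like this:

from multiprocessing import Pool, Value


counter = None


def init(c):
    # Store the shared counter in each worker process.
    global counter
    counter = c


def f(x):
    with counter.get_lock():
        counter.value += 1
    print("counter.value:", counter.value)
    return x


if __name__ == '__main__':
    counter = Value('i', 0)
    with Pool(4, initializer=init, initargs=(counter,)) as p:
        r = p.map(f, range(100))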

I'm working on a progress bar in PyQt5, so I use a thread and a pool together:

import threading
import multiprocessing as mp
from queue import Queue


def multi(x):
    return x * x


def pooler(q):
    with mp.Pool() as pool:
        count = 0
        for i in pool.imap_unordered(multi, range(100)):
            print(count, i)
            count += 1
            q.put(count)


def main():
    q = Queue()
    t = threading.Thread(target=pooler, args=(q,))
    t.start()
    print('start')
    process = 0
    while process < 100:
        process = q.get()
        print('p', process)


if __name__ == '__main__':
    main()

I put this in a QThread worker and it works with acceptable latency.
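For reference, a sketch of what such a QThread wrapper might look like (hypothetical; PoolWorker and its progress signal are illustrative names, and mp and multi are reused from the example above). The signal can be connected to a QProgressBar.setValue slot:

import multiprocessing as mp

from PyQt5.QtCore import QThread, pyqtSignal


class PoolWorker(QThread):
    progress = pyqtSignal(int)  # connect to QProgressBar.setValue

    def run(self):
        # Same idea as pooler() above, but emitting a Qt signal
        # with the running count instead of putting it on a queue.
        with mp.Pool() as pool:
            count = 0
            for _ in pool.imap_unordered(multi, range(100)):
                count += 1
                self.progress.emit(count)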

Here is a solution to your problem based on a different approach from the one proposed in the other answers. It uses message passing with multiprocessing.Queue objects (instead of shared memory with multiprocessing.Value objects) and the process-safe (atomic) built-in increment and decrement operators += and -= (instead of introducing custom increment and decrement methods), since you asked for it.

First, we define a class Subject for instantiating an object that will be local to the parent process and whose attributes are to be incremented or decremented:

import multiprocessing


class Subject:

    def __init__(self):
        self.x = 0
        self.y = 0

Next, we define a class Proxy for instantiating an object that will be the remote proxy through which the child processes will request the parent process to retrieve or update the attributes of the Subject object. The interprocess communication will use two multiprocessing.Queue attributes, one for exchanging requests and one for exchanging responses. Requests are of the form (sender, action, *args) where sender is the sender name, action is the action name ('get', 'set', 'increment', or 'decrement' the value of an attribute), and *args is the argument tuple. Responses are of the form value (to 'get' requests):

class Proxy(Subject):

    def __init__(self, request_queue, response_queue):
        self.__request_queue = request_queue
        self.__response_queue = response_queue

    def _getter(self, target):
        sender = multiprocessing.current_process().name
        self.__request_queue.put((sender, 'get', target))
        return Decorator(self.__response_queue.get())

    def _setter(self, target, value):
        sender = multiprocessing.current_process().name
        action = getattr(value, 'action', 'set')
        self.__request_queue.put((sender, action, target, value))

    @property
    def x(self):
        return self._getter('x')

    @property
    def y(self):
        return self._getter('y')

    @x.setter
    def x(self, value):
        self._setter('x', value)

    @y.setter
    def y(self, value):
        self._setter('y', value)

Then, we define the class Decorator to decorate the int objects returned by the getters of a Proxy object, in order to inform its setters whether the increment or decrement operators += and -= have been used (by adding an action attribute), in which case the setters request an 'increment' or 'decrement' operation instead of a 'set' operation. The increment and decrement operators += and -= call the corresponding augmented assignment special methods __iadd__ and __isub__ if they are defined, and fall back on the addition and subtraction special methods __add__ and __sub__, which are always defined for int objects (e.g. proxy.x += value is equivalent to proxy.x = proxy.x.__iadd__(value), which first calls the getter of x, then __iadd__ on the returned Decorator, and finally the setter of x with the result):

class Decorator(int):

    def __iadd__(self, other):
        value = Decorator(other)
        value.action = 'increment'
        return value

    def __isub__(self, other):
        value = Decorator(other)
        value.action = 'decrement'
        return value

Then, we define the function worker that will be run in the child processes and request the increment and decrement operations:

def worker(proxy):
    proxy.x += 1
    proxy.y -= 1

Finally, we define a single request queue to send requests to the parent process, and multiple response queues to send responses to the child processes:

if __name__ == '__main__':
    subject = Subject()
    request_queue = multiprocessing.Queue()
    response_queues = {}
    processes = []
    for index in range(4):
        sender = 'child {}'.format(index)
        response_queues[sender] = multiprocessing.Queue()
        proxy = Proxy(request_queue, response_queues[sender])
        process = multiprocessing.Process(
            target=worker, args=(proxy,), name=sender)
        processes.append(process)
    running = len(processes)
    for process in processes:
        process.start()
    while subject.x != 4 or subject.y != -4:
        sender, action, *args = request_queue.get()
        print(sender, 'requested', action, *args)
        if action == 'get':
            response_queues[sender].put(getattr(subject, args[0]))
        elif action == 'set':
            setattr(subject, args[0], args[1])
        elif action == 'increment':
            setattr(subject, args[0], getattr(subject, args[0]) + args[1])
        elif action == 'decrement':
            setattr(subject, args[0], getattr(subject, args[0]) - args[1])
    for process in processes:
        process.join()

The program is guaranteed to terminate when += and -= are process-safe. If you remove process-safety by commenting out the corresponding __iadd__ or __isub__ of Decorator, the program will only terminate by chance (e.g. proxy.x += value is equivalent to proxy.x = proxy.x.__iadd__(value), but falls back to proxy.x = proxy.x.__add__(value) if __iadd__ is not defined, which is equivalent to proxy.x = proxy.x + value; the action attribute is then not added, and the setter requests a 'set' operation instead of an 'increment' operation).
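The fallback is easy to observe with a plain int subclass (an illustrative snippet, separate from the classes above):

class Tagged(int):
    def __iadd__(self, other):
        result = Tagged(int(self) + other)
        result.action = 'increment'
        return result


x = Tagged(0)
x += 1                                 # calls Tagged.__iadd__
print(x, getattr(x, 'action', None))   # 1 increment

y = 0                                  # plain int defines no __iadd__,
y += 1                                 # so += falls back to __add__
print(y, getattr(y, 'action', None))   # 1 None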

Example process-safe session (atomic += and -=):

child 0 requested get x
child 0 requested increment x 1
child 0 requested get y
child 0 requested decrement y 1
child 3 requested get x
child 3 requested increment x 1
child 3 requested get y
child 2 requested get x
child 3 requested decrement y 1
child 1 requested get x
child 2 requested increment x 1
child 2 requested get y
child 2 requested decrement y 1
child 1 requested increment x 1
child 1 requested get y
child 1 requested decrement y 1

Example process-unsafe session (non-atomic += and -=):

child 2 requested get x
child 1 requested get x
child 0 requested get x
child 2 requested set x 1
child 2 requested get y
child 1 requested set x 1
child 1 requested get y
child 2 requested set y -1
child 1 requested set y -1
child 0 requested set x 1
child 0 requested get y
child 0 requested set y -2
child 3 requested get x
child 3 requested set x 2
child 3 requested get y
child 3 requested set y -3  # the program stalls here

A more sophisticated solution uses lock-free atomic operations, following the example in the atomics library's README:

from multiprocessing import Process, shared_memory

import atomics


def fn(shmem_name: str, width: int, n: int) -> None:
    shmem = shared_memory.SharedMemory(name=shmem_name)
    buf = shmem.buf[:width]
    with atomics.atomicview(buffer=buf, atype=atomics.INT) as a:
        for _ in range(n):
            a.inc()
    del buf
    shmem.close()


if __name__ == "__main__":
    # setup
    width = 4
    shmem = shared_memory.SharedMemory(create=True, size=width)
    buf = shmem.buf[:width]
    total = 10_000
    # run processes to completion
    p1 = Process(target=fn, args=(shmem.name, width, total // 2))
    p2 = Process(target=fn, args=(shmem.name, width, total // 2))
    p1.start(), p2.start()
    p1.join(), p2.join()
    # print results and cleanup
    with atomics.atomicview(buffer=buf, atype=atomics.INT) as a:
        print(f"a[{a.load()}] == total[{total}]")
    del buf
    shmem.close()
    shmem.unlink()

(atomics can be installed via pip install atomics on most major platforms.)