How to use the multiprocessing queue in Python?

I'm having a lot of trouble understanding how the multiprocessing queue works in Python and how to implement it. Let's say I have two Python modules that access data from a shared file; let's call these two modules a writer and a reader. My plan is to have both the reader and the writer put requests into two separate multiprocessing queues, and then have a third process pop these requests in a loop and execute them.

My main problem is that I really don't know how to implement multiprocessing.Queue correctly. You can't really instantiate the object in each process, since those would be separate queues, so how do you make sure that all processes work with a shared queue (or, in this case, queues)?
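(For illustration, a minimal sketch of the layout described above, with hypothetical names — the key point being that both queues are created once in the parent and passed to every Process, so all processes share the same queue objects:)

import queue
from multiprocessing import Process, Queue

def writer_proc(write_requests):
    write_requests.put(("write", "some data"))       # the writer posts a request

def reader_proc(read_requests):
    read_requests.put(("read", "some key"))          # the reader posts a request

def executor_proc(write_requests, read_requests):
    # Third process: pop requests from both queues in a loop and "execute" them.
    handled = 0
    while handled < 2:
        for q in (write_requests, read_requests):
            try:
                request = q.get(timeout=0.1)
            except queue.Empty:
                continue
            print("handling", request)
            handled += 1

if __name__ == "__main__":
    write_requests, read_requests = Queue(), Queue()  # created once, in the parent
    procs = [
        Process(target=writer_proc, args=(write_requests,)),
        Process(target=reader_proc, args=(read_requests,)),
        Process(target=executor_proc, args=(write_requests, read_requests)),
    ]
    for p in procs:
        p.start()
    for p in procs:
        p.join()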


"My main problem is that I really don't know how to implement multiprocessing.Queue correctly. You can't really instantiate the object in each process, since those would be separate queues, so how do you make sure that all processes work with a shared queue (or, in this case, queues)?"

Here is a simple example of a reader and a writer sharing a single queue... The writer sends a bunch of integers to the reader; when the writer runs out of numbers, it sends "DONE", which lets the reader know to break out of its read loop.

You can spawn as many reader processes as you like...

from multiprocessing import Process, Queue
import time
import sys


def reader_proc(queue):
    """Read from the queue; this spawns as a separate Process"""
    while True:
        msg = queue.get()  # Read from the queue and do nothing
        if msg == "DONE":
            break


def writer(count, num_of_reader_procs, queue):
    """Write integers into the queue.  A reader_proc() will read them from the queue"""
    for ii in range(0, count):
        queue.put(ii)  # Put 'count' numbers into queue

    ### Tell all readers to stop...
    for ii in range(0, num_of_reader_procs):
        queue.put("DONE")


def start_reader_procs(qq, num_of_reader_procs):
    """Start the reader processes and return all in a list to the caller"""
    all_reader_procs = list()
    for ii in range(0, num_of_reader_procs):
        ### reader_p() reads from qq as a separate process...
        ###    you can spawn as many reader_p() as you like
        ###    however, there is usually a point of diminishing returns
        reader_p = Process(target=reader_proc, args=((qq),))
        reader_p.daemon = True
        reader_p.start()  # Launch reader_p() as another proc

        all_reader_procs.append(reader_p)

    return all_reader_procs


if __name__ == "__main__":
    num_of_reader_procs = 2
    qq = Queue()  # writer() writes to qq from _this_ process
    for count in [10**4, 10**5, 10**6]:
        assert 0 < num_of_reader_procs < 4
        all_reader_procs = start_reader_procs(qq, num_of_reader_procs)

        writer(count, len(all_reader_procs), qq)  # Queue stuff to all reader_p()
        print("All reader processes are pulling numbers from the queue...")

        _start = time.time()
        for idx, a_reader_proc in enumerate(all_reader_procs):
            print("    Waiting for reader_p.join() index %s" % idx)
            a_reader_proc.join()  # Wait for a_reader_proc() to finish

            print("        reader_p() idx:%s is done" % idx)

        print(
            "Sending {0} integers through Queue() took {1} seconds".format(
                count, (time.time() - _start)
            )
        )
        print("")

The Queue you get from "from queue import Queue" is the thread-only queue; it does not share data between processes. You should use multiprocessing instead, so the import should look like "from multiprocessing import Queue".
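A tiny sketch of the difference, for reference — queue.Queue only passes objects between threads of one process, while multiprocessing.Queue can be shared with child processes:

from multiprocessing import Process, Queue   # not: from queue import Queue

def worker(q):
    q.put("hello from the child process")

if __name__ == "__main__":
    q = Queue()
    p = Process(target=worker, args=(q,))
    p.start()
    print(q.get())   # receives the message put by the child process
    p.join()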

Here is a dead-simple usage of multiprocessing.Queue and multiprocessing.Process that allows callers to send an "event" plus arguments to a separate process, which dispatches the event to a "do_" method on the process. (Python 3.4+)

import multiprocessing as mp
import collections

Msg = collections.namedtuple('Msg', ['event', 'args'])


class BaseProcess(mp.Process):
    """A process backed by an internal queue for simple one-way message passing.
    """
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.queue = mp.Queue()

    def send(self, event, *args):
        """Puts the event and args as a `Msg` on the queue
        """
        msg = Msg(event, args)
        self.queue.put(msg)

    def dispatch(self, msg):
        event, args = msg

        handler = getattr(self, "do_%s" % event, None)
        if not handler:
            raise NotImplementedError("Process has no handler for [%s]" % event)

        handler(*args)

    def run(self):
        while True:
            msg = self.queue.get()
            self.dispatch(msg)

Usage:

class MyProcess(BaseProcess):
    def do_helloworld(self, arg1, arg2):
        print(arg1, arg2)


if __name__ == "__main__":
    process = MyProcess()
    process.start()
    process.send('helloworld', 'hello', 'world')

The send happens in the parent process; the do_* happens in the child process.

I left out any exception handling, which would obviously interrupt the run loop and exit the child process. You can also customize it by overriding run, to control blocking or whatever else.
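As a purely hypothetical illustration of that kind of customization (not part of the code above), run() could be overridden so the child survives handler errors and exits on an explicit 'stop' event:

class StoppableProcess(BaseProcess):
    """Hypothetical variant of BaseProcess: exits on a 'stop' event, survives handler errors."""

    def do_helloworld(self, arg1, arg2):
        print(arg1, arg2)

    def run(self):
        while True:
            msg = self.queue.get()
            if msg.event == 'stop':
                break                           # explicit shutdown event
            try:
                self.dispatch(msg)
            except Exception as exc:
                print('handler failed:', exc)   # log and keep serving


if __name__ == "__main__":
    proc = StoppableProcess()
    proc.start()
    proc.send('helloworld', 'hello', 'world')
    proc.send('stop')
    proc.join()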

This is really only useful in situations where you have a single worker process, but I think it is a relevant answer to this question because it demonstrates a common scenario with a bit more object orientation.

We implemented two versions of this: first, a simple multi-thread pool that can execute many types of callables, making our lives much easier; and second, a version that uses processes, which is less flexible in terms of callables and requires an extra call to dill.

Setting frozen_pool to true will freeze execution until finish_pool_queue is called in either class.
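For illustration, here is a hedged usage sketch of that flag against the ThreadPool defined below (some_function is a placeholder; the exact behaviour depends on the author's helium helpers):

# Start the pool frozen: workers sleep until the pool is released.
tp = ThreadPool(queue_threads=4, frozen_pool=True)

for n in range(10):
    tp.queue.put({'call': some_function, 'args': [n], 'keep_results': True})

tp.finish_pool_queue()            # unfreezes the pool and waits for the queue to drain
results = tp.get_pool_results()   # (item, result) tuples for items queued with keep_results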

Thread version (note: Python 2 syntax, with some project-specific imports):

'''
Created on Nov 4, 2019

@author: Kevin
'''
from threading import Lock, Thread
from Queue import Queue
import traceback
from helium.loaders.loader_retailers import print_info
from time import sleep
import signal
import os


class ThreadPool(object):
    def __init__(self, queue_threads, *args, **kwargs):
        self.frozen_pool = kwargs.get('frozen_pool', False)
        self.print_queue = kwargs.get('print_queue', True)
        self.pool_results = []
        self.lock = Lock()
        self.queue_threads = queue_threads
        self.queue = Queue()
        self.threads = []

        for i in range(self.queue_threads):
            t = Thread(target=self.make_pool_call)
            t.daemon = True
            t.start()
            self.threads.append(t)

    def make_pool_call(self):
        while True:
            if self.frozen_pool:
                #print '--> Queue is frozen'
                sleep(1)
                continue

            item = self.queue.get()
            if item is None:
                break

            call = item.get('call', None)
            args = item.get('args', [])
            kwargs = item.get('kwargs', {})
            keep_results = item.get('keep_results', False)

            try:
                result = call(*args, **kwargs)

                if keep_results:
                    self.lock.acquire()
                    self.pool_results.append((item, result))
                    self.lock.release()

            except Exception as e:
                self.lock.acquire()
                print e
                traceback.print_exc()
                self.lock.release()
                os.kill(os.getpid(), signal.SIGUSR1)

            self.queue.task_done()

    def finish_pool_queue(self):
        self.frozen_pool = False

        while self.queue.unfinished_tasks > 0:
            if self.print_queue:
                print_info('--> Thread pool... %s' % self.queue.unfinished_tasks)
            sleep(5)

        self.queue.join()

        for i in range(self.queue_threads):
            self.queue.put(None)

        for t in self.threads:
            t.join()

        del self.threads[:]

    def get_pool_results(self):
        return self.pool_results

    def clear_pool_results(self):
        del self.pool_results[:]

Process version:

'''
Created on Nov 4, 2019

@author: Kevin
'''
import traceback
from helium.loaders.loader_retailers import print_info
from time import sleep
import signal
import os
from multiprocessing import Queue, Process, Value, Array, JoinableQueue, Lock,\
    RawArray, Manager
from dill import dill
import ctypes
from helium.misc.utils import ignore_exception
from mem_top import mem_top
import gc


class ProcessPool(object):
    def __init__(self, queue_processes, *args, **kwargs):
        self.frozen_pool = Value(ctypes.c_bool, kwargs.get('frozen_pool', False))
        self.print_queue = kwargs.get('print_queue', True)
        self.manager = Manager()
        self.pool_results = self.manager.list()
        self.queue_processes = queue_processes
        self.queue = JoinableQueue()
        self.processes = []

        for i in range(self.queue_processes):
            p = Process(target=self.make_pool_call)
            p.start()
            self.processes.append(p)

        print 'Processes', self.queue_processes

    def make_pool_call(self):
        while True:
            if self.frozen_pool.value:
                sleep(1)
                continue

            item_pickled = self.queue.get()

            if item_pickled is None:
                #print '--> Ending'
                self.queue.task_done()
                break

            item = dill.loads(item_pickled)

            call = item.get('call', None)
            args = item.get('args', [])
            kwargs = item.get('kwargs', {})
            keep_results = item.get('keep_results', False)

            try:
                result = call(*args, **kwargs)

                if keep_results:
                    self.pool_results.append(dill.dumps((item, result)))
                else:
                    del call, args, kwargs, keep_results, item, result

            except Exception as e:
                print e
                traceback.print_exc()
                os.kill(os.getpid(), signal.SIGUSR1)

            self.queue.task_done()

    def finish_pool_queue(self, callable=None):
        self.frozen_pool.value = False

        while self.queue._unfinished_tasks.get_value() > 0:
            if self.print_queue:
                print_info('--> Process pool... %s' % (self.queue._unfinished_tasks.get_value()))

            if callable:
                callable()

            sleep(5)

        for i in range(self.queue_processes):
            self.queue.put(None)

        self.queue.join()
        self.queue.close()

        for p in self.processes:
            with ignore_exception: p.join(10)
            with ignore_exception: p.terminate()

        with ignore_exception: del self.processes[:]

    def get_pool_results(self):
        return self.pool_results

    def clear_pool_results(self):
        del self.pool_results[:]


def test(eg):
    print 'EG', eg

Call with either:

tp = ThreadPool(queue_threads=2)
tp.queue.put({'call': test, 'args': [random.randint(0, 100)]})
tp.finish_pool_queue()

or

pp = ProcessPool(queue_processes=2)
pp.queue.put(dill.dumps({'call': test, 'args': [random.randint(0, 100)]}))
pp.queue.put(dill.dumps({'call': test, 'args': [random.randint(0, 100)]}))
pp.finish_pool_queue()

I just made a simple and general example demonstrating how to pass a message over a Queue between two standalone programs. It doesn't directly answer the OP's question, but it should make the concept clear enough.

Server:

multiprocessing-queue-manager-server.py

import asyncio
import concurrent.futures
import multiprocessing
import multiprocessing.managers
import queue
import sys
import threading
from typing import Any, AnyStr, Dict, Union


class QueueManager(multiprocessing.managers.BaseManager):

    def get_queue(self, ident: Union[AnyStr, int, type(None)] = None) -> multiprocessing.Queue:
        pass


def get_queue(ident: Union[AnyStr, int, type(None)] = None) -> multiprocessing.Queue:
    global q

    if not ident in q:
        q[ident] = multiprocessing.Queue()

    return q[ident]


q: Dict[Union[AnyStr, int, type(None)], multiprocessing.Queue] = dict()
delattr(QueueManager, 'get_queue')


def init_queue_manager_server():
    if not hasattr(QueueManager, 'get_queue'):
        QueueManager.register('get_queue', get_queue)


def serve(no: int, term_ev: threading.Event):
    manager: QueueManager
    with QueueManager(authkey=QueueManager.__name__.encode()) as manager:
        print(f"Server address {no}: {manager.address}")

        while not term_ev.is_set():
            try:
                item: Any = manager.get_queue().get(timeout=0.1)
                print(f"Client {no}: {item} from {manager.address}")
            except queue.Empty:
                continue


async def main(n: int):
    init_queue_manager_server()
    term_ev: threading.Event = threading.Event()
    executor: concurrent.futures.ThreadPoolExecutor = concurrent.futures.ThreadPoolExecutor()

    i: int
    for i in range(n):
        asyncio.ensure_future(asyncio.get_running_loop().run_in_executor(executor, serve, i, term_ev))

    # Gracefully shut down
    try:
        await asyncio.get_running_loop().create_future()
    except asyncio.CancelledError:
        term_ev.set()
        executor.shutdown()
        raise


if __name__ == '__main__':
    asyncio.run(main(int(sys.argv[1])))

Client:

multiprocessing-queue-manager-client.py

import multiprocessing
import multiprocessing.managers
import os
import sys
from typing import AnyStr, Union


class QueueManager(multiprocessing.managers.BaseManager):

    def get_queue(self, ident: Union[AnyStr, int, type(None)] = None) -> multiprocessing.Queue:
        pass


delattr(QueueManager, 'get_queue')


def init_queue_manager_client():
    if not hasattr(QueueManager, 'get_queue'):
        QueueManager.register('get_queue')


def main():
    init_queue_manager_client()

    manager: QueueManager = QueueManager(sys.argv[1], authkey=QueueManager.__name__.encode())
    manager.connect()

    message = f"A message from {os.getpid()}"
    print(f"Message to send: {message}")
    manager.get_queue().put(message)


if __name__ == '__main__':
    main()

Usage

Server:

$ python3 multiprocessing-queue-manager-server.py N

N is an integer indicating how many servers should be created. Copy one of the <server-address-N> addresses printed by the server and pass it as the first argument to each multiprocessing-queue-manager-client.py invocation.

Client:

python3 multiprocessing-queue-manager-client.py <server-address-1>

Result

Server:

Client 1: <item> from <server-address-1>

Gist: https://gist.github.com/89062d639e40110c61c2f88018a8b0e5


UPD: A package, ipcq, has since been created for this.

Server:

import ipcq




with ipcq.QueueManagerServer(address=ipcq.Address.AUTO, authkey=ipcq.AuthKey.AUTO) as server:
    server.get_queue().get()

Client:

import ipcq




client = ipcq.QueueManagerClient(address=ipcq.Address.AUTO, authkey=ipcq.AuthKey.AUTO)
client.get_queue().put('a message')


While trying to set up a multiprocessing approach that passes pandas DataFrames around via queues, I went through multiple answers on Stack Overflow and around the web. To me, every answer seemed to repeat the same solution without considering the multitude of edge cases one will definitely run into when setting up computations like these. The problem is that many things are at play at the same time: the number of tasks, the number of workers, the duration of each task, and possible exceptions during task execution. All of these make synchronization tricky, and most answers do not address how to go about it. So this is my take after fiddling around for a few hours; hopefully it is generic enough for most people to find it useful.

A few thoughts before the code examples. Since queue.Empty, queue.qsize(), and any other similar method is unreliable for flow control, any code like the following

while True:
    try:
        task = pending_queue.get_nowait()
    except queue.Empty:
        break

is bogus. It will kill the worker even if, milliseconds later, another task turns up in the queue. The worker will not recover, and after a while ALL the workers will be gone, as each one randomly finds the queue momentarily empty. The end result is that the main multiprocessing function (the one with join() on the processes) returns without all the tasks having completed. Nice. Good luck debugging that if you have thousands of tasks and a few are missing.
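A sketch of a more forgiving worker loop (the same idea the full example further down uses): treat an empty queue as a transient state and only exit on an explicit sentinel. handle_task here is a placeholder for the real work:

import queue
import time

def worker_loop(pending_queue, SENTINEL=None):
    while True:
        try:
            task = pending_queue.get_nowait()
        except queue.Empty:
            time.sleep(0.01)       # the queue may only be momentarily empty; retry
            continue
        if task == SENTINEL:
            break                  # only an explicit sentinel ends the worker
        handle_task(task)          # placeholder for the actual task execution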

The other issue is the use of sentinel values. Many people suggest adding one sentinel value to the queue to flag its end. But to flag it to whom, exactly? If there are N workers, assuming N is roughly the number of available cores, then a single sentinel value will only flag the end of the queue to one worker. All the other workers will sit waiting for more work when there is none left. A typical example I've seen is

while True:
    task = pending_queue.get()
    if task == SOME_SENTINEL_VALUE:
        break

One worker will get the sentinel value while the rest wait indefinitely. None of the posts I came across mentioned that you need to submit the sentinel value to the queue AT LEAST as many times as you have workers, so that ALL of them get it.
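In other words, a minimal sketch of the fix (mirroring what the full example below does), assuming pending_queue and the worker count from the snippets above:

SENTINEL = None
NUM_WORKERS = 4                      # however many worker processes were started

# One sentinel per worker, so that every worker eventually sees one and exits.
for _ in range(NUM_WORKERS):
    pending_queue.put(SENTINEL)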

The other issue is handling exceptions during task execution. Again, these should be caught and managed. Moreover, if you have a completed_tasks queue, you should count, independently and in a deterministic way, how many items are in it before you decide that the job is done. Once again, relying on queue sizes is bound to fail and return unexpected results.

In the example below, the par_proc() function receives a list of jobs, each including the function with which its tasks should be executed along with any named arguments and values.

import multiprocessing as mp
import dill as pickle
import queue
import time
import psutil

SENTINEL = None


def do_work(tasks_pending, tasks_completed):
    # Get the current worker's name
    worker_name = mp.current_process().name

    while True:
        try:
            task = tasks_pending.get_nowait()
        except queue.Empty:
            print(worker_name + ' found an empty queue. Sleeping for a while before checking again...')
            time.sleep(0.01)
        else:
            try:
                if task == SENTINEL:
                    print(worker_name + ' no more work left to be done. Exiting...')
                    break

                print(worker_name + ' received some work... ')
                time_start = time.perf_counter()
                work_func = pickle.loads(task['func'])
                result = work_func(**task['task'])
                tasks_completed.put({work_func.__name__: result})
                time_end = time.perf_counter() - time_start
                print(worker_name + ' done in {} seconds'.format(round(time_end, 5)))
            except Exception as e:
                print(worker_name + ' task failed. ' + str(e))
                tasks_completed.put({work_func.__name__: None})


def par_proc(job_list, num_cpus=None):

    # Get the number of cores
    if not num_cpus:
        num_cpus = psutil.cpu_count(logical=False)

    print('* Parallel processing')
    print('* Running on {} cores'.format(num_cpus))

    # Set-up the queues for sending and receiving data to/from the workers
    tasks_pending = mp.Queue()
    tasks_completed = mp.Queue()

    # Gather processes and results here
    processes = []
    results = []

    # Count tasks
    num_tasks = 0

    # Add the tasks to the queue
    for job in job_list:
        for task in job['tasks']:
            expanded_job = {}
            num_tasks = num_tasks + 1
            expanded_job.update({'func': pickle.dumps(job['func'])})
            expanded_job.update({'task': task})
            tasks_pending.put(expanded_job)

    # Use as many workers as there are cores (usually chokes the system so better use less)
    num_workers = num_cpus

    # We need as many sentinels as there are worker processes so that ALL processes exit when there is no more
    # work left to be done.
    for c in range(num_workers):
        tasks_pending.put(SENTINEL)

    print('* Number of tasks: {}'.format(num_tasks))

    # Set-up and start the workers
    for c in range(num_workers):
        p = mp.Process(target=do_work, args=(tasks_pending, tasks_completed))
        p.name = 'worker' + str(c)
        processes.append(p)
        p.start()

    # Gather the results
    completed_tasks_counter = 0
    while completed_tasks_counter < num_tasks:
        results.append(tasks_completed.get())
        completed_tasks_counter = completed_tasks_counter + 1

    for p in processes:
        p.join()

    return results

Here is a test you can run against the above code

def test_parallel_processing():
    def heavy_duty1(arg1, arg2, arg3):
        return arg1 + arg2 + arg3

    def heavy_duty2(arg1, arg2, arg3):
        return arg1 * arg2 * arg3

    task_list = [
        {'func': heavy_duty1, 'tasks': [{'arg1': 1, 'arg2': 2, 'arg3': 3}, {'arg1': 1, 'arg2': 3, 'arg3': 5}]},
        {'func': heavy_duty2, 'tasks': [{'arg1': 1, 'arg2': 2, 'arg3': 3}, {'arg1': 1, 'arg2': 3, 'arg3': 5}]},
    ]

    results = par_proc(task_list)

    job1 = sum([y for x in results if 'heavy_duty1' in x.keys() for y in list(x.values())])
    job2 = sum([y for x in results if 'heavy_duty2' in x.keys() for y in list(x.values())])

    assert job1 == 15
    assert job2 == 21

and another one, with some exceptions thrown in

def test_parallel_processing_exceptions():
    def heavy_duty1_raises(arg1, arg2, arg3):
        raise ValueError('Exception raised')
        return arg1 + arg2 + arg3

    def heavy_duty2(arg1, arg2, arg3):
        return arg1 * arg2 * arg3

    task_list = [
        {'func': heavy_duty1_raises, 'tasks': [{'arg1': 1, 'arg2': 2, 'arg3': 3}, {'arg1': 1, 'arg2': 3, 'arg3': 5}]},
        {'func': heavy_duty2, 'tasks': [{'arg1': 1, 'arg2': 2, 'arg3': 3}, {'arg1': 1, 'arg2': 3, 'arg3': 5}]},
    ]

    results = par_proc(task_list)

    job1 = sum([y for x in results if 'heavy_duty1' in x.keys() for y in list(x.values())])
    job2 = sum([y for x in results if 'heavy_duty2' in x.keys() for y in list(x.values())])

    assert not job1
    assert job2 == 21

Hope that helps.

Here is a verified example with multiple producers and multiple consumers. It should be easy to modify it to cover other cases: single/multiple producers, single/multiple consumers.

from multiprocessing import Process, JoinableQueue
import time
import os

q = JoinableQueue()


def producer():
    for item in range(30):
        time.sleep(2)
        q.put(item)
    pid = os.getpid()
    print(f'producer {pid} done')


def worker():
    while True:
        item = q.get()
        pid = os.getpid()
        print(f'pid {pid} Working on {item}')
        print(f'pid {pid} Finished {item}')
        q.task_done()


for i in range(5):
    p = Process(target=worker, daemon=True).start()

# send thirty task requests to the worker
producers = []
for i in range(2):
    p = Process(target=producer)
    producers.append(p)
    p.start()

# make sure producers done
for p in producers:
    p.join()

# block until all workers are done
q.join()
print('All work completed')

Explanation:

  1. There are two producers and five consumers in this example.
  2. A JoinableQueue is used to make sure every element stored in the queue gets processed. 'task_done' is how a worker notifies that an element is finished, and 'q.join()' waits until all elements have been marked as done (see the minimal sketch after this list).
  3. Because of #2, there is no need to join-wait on every worker.
  4. But it is important to join-wait on every producer, so they have stored their elements into the queue. Otherwise the program exits immediately.
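A minimal sketch isolating the contract from point 2 — q.join() only returns once task_done() has been called for every item that was put():

from multiprocessing import JoinableQueue, Process

def consume(q):
    for _ in range(3):
        item = q.get()
        print('processed', item)
        q.task_done()              # mark this element as finished

if __name__ == '__main__':
    q = JoinableQueue()
    for item in range(3):
        q.put(item)
    Process(target=consume, args=(q,), daemon=True).start()
    q.join()                       # blocks until task_done() was called for all three items
    print('all elements processed')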