Python multiprocessing safely writing to a file

I am trying to solve a big numerical problem which involves lots of subproblems, and I'm using Python's multiprocessing module (specifically Pool.map) to split up different independent subproblems onto different cores. Each subproblem involves computing lots of sub-subproblems, and I'm trying to effectively memoize these results by storing them to a file if they have not been computed by any process yet, otherwise skipping the computation and just reading the results from the file.

I'm having concurrency issues with the files: different processes sometimes check whether a sub-subproblem has been computed yet (by looking for the file where the results would be stored), see that it hasn't, run the computation, and then try to write the results to the same file at the same time. How do I avoid writing collisions like this?
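To make the pattern concrete, here is a stripped-down sketch of what each worker does (the cache file names and the solve function are just placeholders); the race is in the window between the existence check and the write:

import os
import pickle
from multiprocessing import Pool


def solve(key):
    '''Placeholder for an expensive sub-subproblem.'''
    return key * key


def cached_solve(key):
    path = 'cache_%d.pkl' % key
    if os.path.exists(path):            # check whether it was computed...
        with open(path, 'rb') as f:
            return pickle.load(f)
    result = solve(key)                 # ...if not, compute it...
    with open(path, 'wb') as f:         # ...and write it: another process may
        pickle.dump(result, f)          # be writing the same file right now
    return result


if __name__ == '__main__':
    with Pool() as pool:
        print(pool.map(cached_solve, range(8)))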


@GP89 mentioned a good solution. Use a queue to send the writing tasks to a dedicated process that has sole write access to the file. All the other workers have read only access. This will eliminate collisions. Here is an example that uses apply_async, but it will work with map too:

import multiprocessing as mp
import time

fn = 'c:/temp/temp.txt'  # must point to a writable location


def worker(arg, q):
    '''stupidly simulates long running process'''
    start = time.perf_counter()  # time.clock() was removed in Python 3.8
    s = 'this is a test'
    txt = s
    for i in range(200000):
        txt += s
    done = time.perf_counter() - start
    with open(fn, 'rb') as f:
        size = len(f.read())
    res = 'Process' + str(arg), str(size), done
    q.put(res)
    return res


def listener(q):
    '''listens for messages on the q, writes to file. '''
    with open(fn, 'w') as f:
        while 1:
            m = q.get()
            if m == 'kill':
                f.write('killed')
                break
            f.write(str(m) + '\n')
            f.flush()


def main():
    # must use Manager queue here, or will not work
    manager = mp.Manager()
    q = manager.Queue()
    pool = mp.Pool(mp.cpu_count() + 2)

    # put listener to work first
    watcher = pool.apply_async(listener, (q,))

    # fire off workers
    jobs = []
    for i in range(80):
        job = pool.apply_async(worker, (i, q))
        jobs.append(job)

    # collect results from the workers through the pool result queue
    for job in jobs:
        job.get()

    # now we are done, kill the listener
    q.put('kill')
    pool.close()
    pool.join()


if __name__ == "__main__":
    main()
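If you want to use map instead of apply_async, here is a rough sketch of a drop-in replacement for main() above (untested, reusing the same worker and listener functions): the Manager queue proxy is picklable, so it can be bound to the worker with functools.partial, and the listener runs in its own Process.

import multiprocessing as mp
from functools import partial


def main():
    manager = mp.Manager()
    q = manager.Queue()

    # dedicated writer process: the only one that opens the file for writing
    writer = mp.Process(target=listener, args=(q,))
    writer.start()

    # each worker only puts its result on the queue
    with mp.Pool(mp.cpu_count()) as pool:
        results = pool.map(partial(worker, q=q), range(80))

    # tell the writer to finish, then wait for it
    q.put('kill')
    writer.join()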

It looks to me like you need to use a Manager to temporarily save your results to a list and then write the results from the list to a file. Also, use starmap to pass both the object you want to process and the managed list. The first step is to build the parameters to be passed to starmap, which include the managed list.

from multiprocessing import Manager
from multiprocessing import Pool
import pandas as pd


def worker(row, param):
    # do something here and then append it to row
    x = param**2
    row.append(x)


if __name__ == '__main__':
    pool_parameter = []  # list of objects to process
    with Manager() as mgr:
        row = mgr.list([])

        # build list of parameters to send to starmap
        params = []
        for param in pool_parameter:
            params.append([row, param])

        with Pool() as p:
            p.starmap(worker, params)

From this point you need to decide how you are going to handle the list. If you have tons of RAM and a huge data set, feel free to concatenate using pandas; then you can save the file very easily as a CSV or a pickle. Note that pd.concat expects the list items to be DataFrames or Series (so the worker would need to append those rather than plain numbers), and it has to run while the Manager block is still active, before the list proxy goes away.

df = pd.concat(row, ignore_index=True)
df.to_pickle('data.pickle')
df.to_csv('data.csv')
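If the results are plain values rather than DataFrames, a simpler option (sketch only, the output file name is made up) is to write the managed list out with the standard csv module before the Manager block exits:

import csv

# still inside the `with Manager() as mgr:` block, so `row` is alive
with open('data.csv', 'w', newline='') as f:
    writer = csv.writer(f)
    for item in list(row):  # copy the proxy's contents into a plain list
        writer.writerow([item])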