Processing a single file from multiple processes

I have a single big text file in which I want to process each line (do some operations) and store the results in a database. Since a simple program takes too long, I want this to be done via multiple processes or threads. Each thread/process should read DIFFERENT data (different lines) from that single file, do some operations on its piece of data (lines), and put it into the database, so that in the end all of the data is processed and my database contains the data I need.

But I am not able to figure out how to approach this.


Well, break the single big file into multiple smaller files and have each of them processed in separate threads.
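
For example, the splitting step could look something like this (a rough sketch; the file names 'big_file.txt' and 'part_N.txt' and the 100000-line piece size are made up for illustration):

import itertools

lines_per_piece = 100000  # made-up piece size; tune to taste

with open('big_file.txt') as source:          # hypothetical input file
    for piece_no in itertools.count():
        lines = list(itertools.islice(source, lines_per_piece))
        if not lines:
            break
        with open('part_%d.txt' % piece_no, 'w') as piece:
            piece.writelines(lines)

Each part_N.txt can then be handed to its own thread or process.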

What you are looking for is a Producer/Consumer pattern.

Basic threading example

Here is a basic example using the threading module (instead of multiprocessing):

import threading
import Queue
import sys


def do_work(in_queue, out_queue):
    while True:
        item = in_queue.get()
        # process
        result = item
        out_queue.put(result)
        in_queue.task_done()


if __name__ == "__main__":
    work = Queue.Queue()
    results = Queue.Queue()
    total = 20

    # start the workers
    for i in xrange(4):
        t = threading.Thread(target=do_work, args=(work, results))
        t.daemon = True
        t.start()

    # produce data
    for i in xrange(total):
        work.put(i)

    work.join()

    # get the results
    for i in xrange(total):
        print results.get()

    sys.exit()

You wouldn't share the file object with the threads. You would produce work for them by supplying the queue with lines of data. Then each thread would pick up a line, process it, and put the result on the results queue.
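
For the question's case, the producer loop would read lines instead of putting integers on the queue. A sketch along those lines (the file name 'input.txt' and the trivial per-line "processing" are placeholders I made up):

import threading
import Queue


def do_work(in_queue, out_queue):
    while True:
        line = in_queue.get()
        # placeholder for the real per-line processing / database insert
        out_queue.put(line.strip().upper())
        in_queue.task_done()


if __name__ == "__main__":
    # an unbounded queue will hold the whole file; pass a maxsize to
    # Queue.Queue() if you want to cap how far the producer runs ahead
    work = Queue.Queue()
    results = Queue.Queue()

    for i in xrange(4):
        t = threading.Thread(target=do_work, args=(work, results))
        t.daemon = True
        t.start()

    # the producer: hand out lines rather than sharing the file object
    total = 0
    with open("input.txt") as f:
        for line in f:
            work.put(line)
            total += 1

    work.join()

    for i in xrange(total):
        print results.get()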

There are some more advanced facilities built into the multiprocessing module to share data, like lists and a special kind of Queue. There are trade-offs to using multiprocessing vs. threads, and it depends on whether your work is CPU bound or IO bound.

Basic multiprocessing.Pool example

Here is a really basic example of a multiprocessing Pool:

from multiprocessing import Pool


def process_line(line):
    return "FOO: %s" % line


if __name__ == "__main__":
    pool = Pool(4)
    with open('file.txt') as source_file:
        # chunk the work into batches of 4 lines at a time
        results = pool.map(process_line, source_file, 4)

    print results

A Pool is a convenience object that manages its own processes. Since an open file can iterate over its lines, you can pass it to pool.map(), which will loop over it and deliver lines to the worker function. pool.map() blocks and returns the entire result when it's done. Be aware that this is an overly simplified example, and that pool.map() is going to read your entire file into memory all at once before dishing out work. If you expect to have large files, keep this in mind. There are more advanced ways to design a producer/consumer setup.
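
If holding the whole result list is a concern, one variation (just a sketch, not a full producer/consumer design) is Pool.imap, which hands back results as they complete instead of returning them all at once; the file name and chunksize here are the same placeholders as above:

from multiprocessing import Pool


def process_line(line):
    return "FOO: %s" % line


if __name__ == "__main__":
    pool = Pool(4)
    with open('file.txt') as source_file:
        # imap returns an iterator, so each result can be handled (printed,
        # written to the database, ...) as soon as a worker produces it
        for result in pool.imap(process_line, source_file, 4):
            print result

    pool.close()
    pool.join()

Whether this also limits how much of the input gets buffered ahead of the workers depends on the Pool internals, so for strict memory control the manual setup below is the safer route.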

Manual "pool" with limit and line re-sorting

This is a manual version of Pool.map, but instead of consuming an entire iterable in one go, you can set a queue size so that you only feed it piece by piece, as fast as it can process. I also added line numbers so that you can track them and refer to them later if you want.

from multiprocessing import Process, Manager
import time
import itertools


def do_work(in_queue, out_list):
    while True:
        item = in_queue.get()
        line_no, line = item

        # exit signal
        if line is None:
            return

        # fake work
        time.sleep(.5)
        result = (line_no, line)

        out_list.append(result)


if __name__ == "__main__":
    num_workers = 4

    manager = Manager()
    results = manager.list()
    work = manager.Queue(num_workers)

    # start the workers
    pool = []
    for i in xrange(num_workers):
        p = Process(target=do_work, args=(work, results))
        p.start()
        pool.append(p)

    # produce data
    with open("source.txt") as f:
        iters = itertools.chain(f, (None,) * num_workers)
        for num_and_line in enumerate(iters):
            work.put(num_and_line)

    for p in pool:
        p.join()

    # get the results
    # example:  [(1, "foo"), (10, "bar"), (0, "start")]
    print sorted(results)

Here's a really stupid example that I cooked up:

import os.path
import multiprocessing


def newlinebefore(f, n):
    f.seek(n)
    c = f.read(1)
    while c != '\n' and n > 0:
        n -= 1
        f.seek(n)
        c = f.read(1)

    f.seek(n)
    return n


filename = 'gpdata.dat'  # your filename goes here.
fsize = os.path.getsize(filename)  # size of file (in bytes)

# break the file into 20 chunks for processing.
nchunks = 20
initial_chunks = range(1, fsize, fsize / nchunks)

# You could also do something like:
# initial_chunks = range(1, fsize, max_chunk_size_in_bytes)  # this should work too.


with open(filename, 'r') as f:
    start_byte = sorted(set([newlinebefore(f, i) for i in initial_chunks]))

end_byte = [i - 1 for i in start_byte][1:] + [None]


def process_piece(filename, start, end):
    with open(filename, 'r') as f:
        # start is the position of the newline before this chunk, so skip it --
        # except at the very beginning of the file, where there is nothing to skip
        f.seek(start + 1 if start > 0 else 0)
        if end is None:
            text = f.read()
        else:
            # read up to and including the newline that ends this chunk
            nbytes = end + 2 - f.tell()
            text = f.read(nbytes)

        # process text here, creating some object to be returned.
        # You could wrap text into a StringIO object if you want to be able to
        # read from it the way you would a file.

        returnobj = text
        return returnobj


def wrapper(args):
    return process_piece(*args)


filename_repeated = [filename] * len(start_byte)
args = zip(filename_repeated, start_byte, end_byte)

pool = multiprocessing.Pool(4)
result = pool.map(wrapper, args)

# Now take your results and write them to the database.
print "".join(result)  # I just print it to make sure I get my file back ...

The tricky part here is to make sure that we split the file on newline characters so that you don't miss any lines (or only read partial lines). Then, each process reads its part of the file and returns an object which can be put into the database by the main thread. Of course, you may even need to do this part in chunks so that you don't have to keep all of the information in memory at once. (This is quite easily accomplished -- just split the "args" list into X chunks and call pool.map(wrapper, chunk) -- see here.)
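
A sketch of that last idea, reusing the pool, wrapper, and args from the example above; the sqlite3 database, table, and chunk size below are made-up stand-ins for whatever database and layout you actually have:

import sqlite3

# reusing pool, wrapper and args from the example above
conn = sqlite3.connect('output.db')
conn.execute('CREATE TABLE IF NOT EXISTS pieces (chunk_text TEXT)')

chunk_size = 5  # arbitrary; pick something that keeps memory use reasonable
for i in xrange(0, len(args), chunk_size):
    chunk = args[i:i + chunk_size]
    chunk_results = pool.map(wrapper, chunk)
    # write this batch out before computing the next one, so only one
    # batch of results is held in memory at a time
    conn.executemany('INSERT INTO pieces (chunk_text) VALUES (?)',
                     [(text,) for text in chunk_results])
    conn.commit()

conn.close()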