How should I do logging while using multiprocessing in Python?

Right now I have a central module in a framework that spawns multiple processes using the Python 2.6 multiprocessing module. Because it uses multiprocessing, there is a module-level multiprocessing-aware log, LOG = multiprocessing.get_logger(). Per the docs, this logger (EDIT) has process-shared locks so that you don't garble things up in sys.stderr (or whatever file handle) by having multiple processes writing to it simultaneously.

The issue I have now is that the other modules in the framework are not multiprocessing-aware. The way I see it, I need to make all dependencies on this central module use multiprocessing-aware logging. That's annoying within the framework, to say nothing of for all clients of the framework. Are there alternatives I'm not thinking of?


The only way to deal with this non-intrusively is to:

  1. Spawn each worker process such that its log goes to a different file descriptor (to disk or to a pipe). Ideally, all log entries should be timestamped.
  2. Your controller process can then do one of the following:
    • If using disk files: coalesce the log files at the end of the run, sorted by timestamp (a sketch of this follows the list)
    • If using pipes (recommended): coalesce log entries on-the-fly from all pipes into a central log file. (E.g., periodically select from the pipes' file descriptors, perform a merge-sort on the available log entries, and flush to the centralized log. Repeat.)
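
For the disk-file variant, here is a minimal sketch of the end-of-run merge; the filename pattern and the assumption that every line starts with a sortable timestamp are mine, not part of the answer above:

# Minimal sketch of the "merge at end of run" variant; assumes each worker wrote
# its own file and that every line starts with a sortable timestamp
# (e.g. "2024-01-01 12:00:00,123 ...").
import glob
import heapq

def merge_worker_logs(pattern="worker-*.log", out_path="combined.log"):
    files = [open(name) for name in sorted(glob.glob(pattern))]
    try:
        with open(out_path, "w") as out:
            # heapq.merge assumes each input is already sorted by timestamp,
            # which holds because every worker writes its own file in order.
            for line in heapq.merge(*files):
                out.write(line)
    finally:
        for f in files:
            f.close()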

Just publish your logger instance somewhere. That way, the other modules and clients can use your API to get the logger without having to import multiprocessing.

One alternative is to write the multiprocessing logging to a known file and register an atexit handler to join those processes and read it back on stderr; however, you won't get a real-time flow of the output messages on stderr that way.

Another option might be the various non-file-based logging handlers in the logging package:

  • SocketHandler
  • DatagramHandler
  • SysLogHandler

(and others)

This way, you could easily have a logging daemon somewhere that you can write to safely and that handles the results correctly. (For example, a simple socket server that just unpickles the messages and emits them to its own rotating file handler.)
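
For illustration, a minimal sketch of such a daemon, following the pattern from the logging cookbook; the port number, the single output file, and the use of basicConfig instead of a rotating file handler are simplifications of mine:

# Minimal sketch of a socket-based log daemon (assumed port 9020).
# Clients attach a SocketHandler, e.g.:
#     logging.getLogger().addHandler(logging.handlers.SocketHandler('localhost', 9020))
import logging
import logging.handlers
import pickle
import socketserver
import struct

class LogRecordHandler(socketserver.StreamRequestHandler):
    def handle(self):
        while True:
            # SocketHandler sends a 4-byte big-endian length, then a pickled record dict.
            header = self.connection.recv(4)
            if len(header) < 4:
                break
            length = struct.unpack('>L', header)[0]
            data = self.connection.recv(length)
            while len(data) < length:
                data += self.connection.recv(length - len(data))
            record = logging.makeLogRecord(pickle.loads(data))
            logging.getLogger(record.name).handle(record)

if __name__ == '__main__':
    # A RotatingFileHandler could be configured here instead of basicConfig.
    logging.basicConfig(filename='combined.log', level=logging.DEBUG)
    server = socketserver.ThreadingTCPServer(('localhost', 9020), LogRecordHandler)
    server.serve_forever()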

The SysLogHandler would take care of this for you, too. Of course, you could use your own instance of syslog rather than the system one.
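
For instance, a minimal sketch of that setup (the '/dev/log' address is the usual Linux syslog socket; use ('localhost', 514) for a network syslog daemon):

# Route records to syslog; the syslog daemon serializes writes across processes.
import logging
import logging.handlers

handler = logging.handlers.SysLogHandler(address='/dev/log')
logging.getLogger().addHandler(handler)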

I just now wrote a log handler of my own that feeds everything to the parent process via a pipe. I've only been testing it for ten minutes, but it seems to work pretty well.

(Note: this is hardcoded to RotatingFileHandler, which is my own use case.)


Update: @javier now maintains this approach as a package available on PyPI - see multiprocessing-logging on PyPI, source on GitHub at https://github.com/jruere/multiprocessing-logging


Update: New implementation!

This now uses a queue for correct handling of concurrency, and also recovers from errors correctly. I've been using this in production for several months now, and the current version below works without issue.

from logging.handlers import RotatingFileHandler
import multiprocessing, threading, logging, sys, traceback


class MultiProcessingLog(logging.Handler):
    def __init__(self, name, mode, maxsize, rotate):
        logging.Handler.__init__(self)

        self._handler = RotatingFileHandler(name, mode, maxsize, rotate)
        self.queue = multiprocessing.Queue(-1)

        t = threading.Thread(target=self.receive)
        t.daemon = True
        t.start()

    def setFormatter(self, fmt):
        logging.Handler.setFormatter(self, fmt)
        self._handler.setFormatter(fmt)

    def receive(self):
        while True:
            try:
                record = self.queue.get()
                self._handler.emit(record)
            except (KeyboardInterrupt, SystemExit):
                raise
            except EOFError:
                break
            except:
                traceback.print_exc(file=sys.stderr)

    def send(self, s):
        self.queue.put_nowait(s)

    def _format_record(self, record):
        # ensure that exc_info and args
        # have been stringified.  Removes any chance of
        # unpickleable things inside and possibly reduces
        # message size sent over the pipe
        if record.args:
            record.msg = record.msg % record.args
            record.args = None
        if record.exc_info:
            dummy = self.format(record)
            record.exc_info = None

        return record

    def emit(self, record):
        try:
            s = self._format_record(record)
            self.send(s)
        except (KeyboardInterrupt, SystemExit):
            raise
        except:
            self.handleError(record)

    def close(self):
        self._handler.close()
        logging.Handler.close(self)

I like zzzeek's answer. I would just substitute the Queue for the Pipe, since if multiple threads/processes use the same pipe end to generate log messages, they will get garbled.

I also liked zzzeek's answer, but Andre is correct that a queue is required to prevent garbling. I had some luck with the pipe, but did see garbling, which is somewhat expected. Implementing it turned out to be harder than I thought, particularly due to running on Windows, where there are some additional restrictions about global variables and such (see: How's Python Multiprocessing Implemented on Windows?).

But I finally got it working. This example probably isn't perfect, so comments and suggestions are welcome. It also does not support setting the formatter or anything other than the root logger. Basically, you have to re-initialize the logger in each of the pool processes with the queue, and set up the other attributes on the logger.

Again, any suggestions on how to make the code better are welcome. I certainly don't know all the Python tricks yet :-)

import multiprocessing, logging, sys, re, os, StringIO, threading, time, Queue


class MultiProcessingLogHandler(logging.Handler):
    def __init__(self, handler, queue, child=False):
        logging.Handler.__init__(self)

        self._handler = handler
        self.queue = queue

        # we only want one of the loggers to be pulling from the queue.
        # If there is a way to do this without needing to be passed this
        # information, that would be great!
        if child == False:
            self.shutdown = False
            self.polltime = 1
            t = threading.Thread(target=self.receive)
            t.daemon = True
            t.start()

    def setFormatter(self, fmt):
        logging.Handler.setFormatter(self, fmt)
        self._handler.setFormatter(fmt)

    def receive(self):
        #print "receive on"
        while (self.shutdown == False) or (self.queue.empty() == False):
            # so we block for a short period of time so that we can
            # check for the shutdown cases.
            try:
                record = self.queue.get(True, self.polltime)
                self._handler.emit(record)
            except Queue.Empty, e:
                pass

    def send(self, s):
        # send just puts it in the queue for the server to retrieve
        self.queue.put(s)

    def _format_record(self, record):
        ei = record.exc_info
        if ei:
            dummy = self.format(record) # just to get traceback text into record.exc_text
            record.exc_info = None  # to avoid Unpickleable error

        return record

    def emit(self, record):
        try:
            s = self._format_record(record)
            self.send(s)
        except (KeyboardInterrupt, SystemExit):
            raise
        except:
            self.handleError(record)

    def close(self):
        time.sleep(self.polltime+1) # give some time for messages to enter the queue.
        self.shutdown = True
        time.sleep(self.polltime+1) # give some time for the server to time out and see the shutdown


    def __del__(self):
        self.close() # hopefully this aids in orderly shutdown when things are going poorly.


def f(x):
    # just a logging command...
    logging.critical('function number: ' + str(x))
    # to make some calls take longer than others, so the output is "jumbled" as real MP programs are.
    time.sleep(x % 3)


def initPool(queue, level):
    """
    This causes the logging module to be initialized with the necessary info
    in pool threads to work correctly.
    """
    logging.getLogger('').addHandler(MultiProcessingLogHandler(logging.StreamHandler(), queue, child=True))
    logging.getLogger('').setLevel(level)


if __name__ == '__main__':
    stream = StringIO.StringIO()
    logQueue = multiprocessing.Queue(100)
    handler = MultiProcessingLogHandler(logging.StreamHandler(stream), logQueue)
    logging.getLogger('').addHandler(handler)
    logging.getLogger('').setLevel(logging.DEBUG)

    logging.debug('starting main')

    # when building the pool on a Windows machine we also have to init the logger in all the instances with the queue and the level of logging.
    pool = multiprocessing.Pool(processes=10, initializer=initPool, initargs=[logQueue, logging.getLogger('').getEffectiveLevel()] ) # start worker processes
    pool.map(f, range(0, 50))
    pool.close()

    logging.debug('done')
    logging.shutdown()
    print "stream output is:"
    print stream.getvalue()

A variant of the others that keeps the logging thread and the queue thread separate.

"""sample code for logging in subprocesses using multiprocessing


* Little handler magic - The main process uses loggers and handlers as normal.
* Only a simple handler is needed in the subprocess that feeds the queue.
* Original logger name from subprocess is preserved when logged in main
process.
* As in the other implementations, a thread reads the queue and calls the
handlers. Except in this implementation, the thread is defined outside of a
handler, which makes the logger definitions simpler.
* Works with multiple handlers.  If the logger in the main process defines
multiple handlers, they will all be fed records generated by the
subprocesses loggers.


tested with Python 2.5 and 2.6 on Linux and Windows


"""


import os
import sys
import time
import traceback
import multiprocessing, threading, logging, sys

DEFAULT_LEVEL = logging.DEBUG

formatter = logging.Formatter("%(levelname)s: %(asctime)s - %(name)s - %(process)s - %(message)s")


class SubProcessLogHandler(logging.Handler):
    """handler used by subprocesses

    It simply puts items on a Queue for the main process to log.

    """

    def __init__(self, queue):
        logging.Handler.__init__(self)
        self.queue = queue

    def emit(self, record):
        self.queue.put(record)


class LogQueueReader(threading.Thread):
    """thread to write subprocesses log records to main process log

    This thread reads the records written by subprocesses and writes them to
    the handlers defined in the main process's handlers.

    """

    def __init__(self, queue):
        threading.Thread.__init__(self)
        self.queue = queue
        self.daemon = True

    def run(self):
        """read from the queue and write to the log handlers

        The logging documentation says logging is thread safe, so there
        shouldn't be contention between normal logging (from the main
        process) and this thread.

        Note that we're using the name of the original logger.

        """
        # Thanks Mike for the error checking code.
        while True:
            try:
                record = self.queue.get()
                # get the logger for this record
                logger = logging.getLogger(record.name)
                logger.callHandlers(record)
            except (KeyboardInterrupt, SystemExit):
                raise
            except EOFError:
                break
            except:
                traceback.print_exc(file=sys.stderr)


class LoggingProcess(multiprocessing.Process):

    def __init__(self, queue):
        multiprocessing.Process.__init__(self)
        self.queue = queue

    def _setupLogger(self):
        # create the logger to use.
        logger = logging.getLogger('test.subprocess')
        # The only handler desired is the SubProcessLogHandler.  If any others
        # exist, remove them. In this case, on Unix and Linux the StreamHandler
        # will be inherited.
        for handler in logger.handlers:
            # just a check for my sanity
            assert not isinstance(handler, SubProcessLogHandler)
            logger.removeHandler(handler)
        # add the handler
        handler = SubProcessLogHandler(self.queue)
        handler.setFormatter(formatter)
        logger.addHandler(handler)

        # On Windows, the level will not be inherited.  Also, we could just
        # set the level to log everything here and filter it in the main
        # process handlers.  For now, just set it from the global default.
        logger.setLevel(DEFAULT_LEVEL)
        self.logger = logger

    def run(self):
        self._setupLogger()
        logger = self.logger
        # and here goes the logging
        p = multiprocessing.current_process()
        logger.info('hello from process %s with pid %s' % (p.name, p.pid))


if __name__ == '__main__':
    # queue used by the subprocess loggers
    queue = multiprocessing.Queue()
    # Just a normal logger
    logger = logging.getLogger('test')
    handler = logging.StreamHandler()
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    logger.setLevel(DEFAULT_LEVEL)
    logger.info('hello from the main process')
    # This thread will read from the subprocesses and write to the main log's
    # handlers.
    log_queue_reader = LogQueueReader(queue)
    log_queue_reader.start()
    # create the processes.
    for i in range(10):
        p = LoggingProcess(queue)
        p.start()
    # The way I read the multiprocessing warning about Queue, joining a
    # process before it has finished feeding the Queue can cause a deadlock.
    # Also, Queue.empty() is not realiable, so just make sure all processes
    # are finished.
    # active_children joins subprocesses when they're finished.
    while multiprocessing.active_children():
        time.sleep(.1)

I have a solution that's similar to ironhacker's, except that I use logging.exception in some of my code and found I needed to format the exception before passing it back over the queue, since tracebacks aren't picklable:

import logging
import traceback
import cStringIO


class QueueHandler(logging.Handler):
    def __init__(self, queue):
        logging.Handler.__init__(self)
        self.queue = queue

    def emit(self, record):
        if record.exc_info:
            # can't pass exc_info across processes so just format now
            record.exc_text = self.formatException(record.exc_info)
            record.exc_info = None
        self.queue.put(record)

    def formatException(self, ei):
        sio = cStringIO.StringIO()
        traceback.print_exception(ei[0], ei[1], ei[2], None, sio)
        s = sio.getvalue()
        sio.close()
        if s[-1] == "\n":
            s = s[:-1]
        return s

All of the current solutions are too coupled to the logging configuration by using a handler. My solution has the following architecture and features:

  • You can use any logging configuration you want
  • Logging is done in a daemon thread
  • Safe shutdown of the daemon by using a context manager
  • Communication with the logging thread is done via multiprocessing.Queue
  • In subprocesses, logging.Logger (and already defined instances) are patched to send all records to the queue
  • New: format the traceback and message before sending to the queue to prevent pickling errors

The code with a usage example and output can be found in the following Gist: https://gist.github.com/schlamar/7003737
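
The gist itself is not reproduced here, but a minimal sketch of the same architecture (daemon logging thread plus a context manager) could look like the following; it uses the stdlib QueueHandler/QueueListener (Python 3.2+) instead of the gist's Logger monkey-patching:

# Minimal sketch of the described architecture, using QueueHandler/QueueListener
# instead of the gist's Logger patching.
import contextlib
import logging
import logging.handlers
import multiprocessing


@contextlib.contextmanager
def queue_logging():
    # Daemon listener thread feeds records from the queue into whatever
    # handlers the main process has configured; stopped safely on exit.
    queue = multiprocessing.Queue()
    listener = logging.handlers.QueueListener(queue, *logging.getLogger().handlers)
    listener.start()
    try:
        yield queue
    finally:
        listener.stop()


def worker_init(queue):
    # In each subprocess, route every record to the queue instead of local handlers.
    root = logging.getLogger()
    root.handlers[:] = [logging.handlers.QueueHandler(queue)]
    root.setLevel(logging.DEBUG)


def work(i):
    logging.getLogger(__name__).info("processed %s", i)
    return i


if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)  # any logging configuration works here
    with queue_logging() as queue:
        with multiprocessing.Pool(2, worker_init, (queue,)) as pool:
            pool.map(work, range(4))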

How about delegating all the logging to another process that reads all log entries from a queue?

import logging
import multiprocessing

LOG_QUEUE = multiprocessing.JoinableQueue()


class CentralLogger(multiprocessing.Process):
    def __init__(self, queue):
        multiprocessing.Process.__init__(self)
        self.queue = queue
        self.log = logging.getLogger('some_config')
        self.log.info("Started Central Logging process")

    def run(self):
        while True:
            log_level, message = self.queue.get()
            if log_level is None:
                self.log.info("Shutting down Central Logging process")
                break
            else:
                self.log.log(log_level, message)


central_logger_process = CentralLogger(LOG_QUEUE)
central_logger_process.start()

Simply share LOG_QUEUE via any of the multiprocess mechanisms, or even inheritance, and it all works out fine!
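
For illustration, a worker might then use the queue like this (a minimal sketch of mine; the (None, None) shutdown sentinel follows from the run() loop above, and the function name is made up):

# Hypothetical worker: push (level, message) tuples onto the shared queue.
import logging

def worker(log_queue, n):
    log_queue.put((logging.INFO, "worker %d starting" % n))
    # ... do work ...
    log_queue.put((logging.INFO, "worker %d done" % n))

# Once all workers have finished, shut the logging process down:
# LOG_QUEUE.put((None, None))
# central_logger_process.join()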

If deadlocks occur in a combination of locks, threads and forks in the logging module, that is reported in bug report 6721 (see also the related SO question).

There is a small fixup solution posted here.

However, that will just fix any potential deadlocks in logging. It will not fix the fact that things may get garbled. See the other answers presented here.

QueueHandler is native in Python 3.2+ and does exactly this. It is easily replicated in previous versions.

The Python docs have two complete examples: Logging to a single file from multiple processes.

For those using Python < 3.2, just copy QueueHandler from https://gist.github.com/vsajip/591589 into your own code, or alternatively import logutils.

Each process (including the parent process) puts its logging on the Queue, and then a listener thread or process (one example of each is provided) picks them up and writes them all to a file - with no risk of corruption or garbling.

Below is another simple solution for anyone else who gets here from Google (like me). Logging should be easy! (Works only for Python 3.2 or higher.)

import multiprocessing
import logging
from logging.handlers import QueueHandler, QueueListener
import time
import random


def f(i):
    time.sleep(random.uniform(.01, .05))
    logging.info('function called with {} in worker thread.'.format(i))
    time.sleep(random.uniform(.01, .05))
    return i


def worker_init(q):
    # all records from worker processes go to qh and then into q
    qh = QueueHandler(q)
    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)
    logger.addHandler(qh)


def logger_init():
    q = multiprocessing.Queue()
    # this is the handler for all log records
    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter("%(levelname)s: %(asctime)s - %(process)s - %(message)s"))

    # ql gets records from the queue and sends them to the handler
    ql = QueueListener(q, handler)
    ql.start()

    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)
    # add the handler to the logger so records from this process are handled
    logger.addHandler(handler)

    return ql, q


def main():
    q_listener, q = logger_init()

    logging.info('hello from main thread')
    pool = multiprocessing.Pool(4, worker_init, [q])
    for result in pool.map(f, range(10)):
        pass
    pool.close()
    pool.join()
    q_listener.stop()


if __name__ == '__main__':
    main()

Below is a class that can be used in a Windows environment; it requires ActivePython. You can also inherit from other logging handlers (StreamHandler, etc.).

class SyncronizedFileHandler(logging.FileHandler):
    MUTEX_NAME = 'logging_mutex'

    def __init__(self, *args, **kwargs):
        self.mutex = win32event.CreateMutex(None, False, self.MUTEX_NAME)
        return super(SyncronizedFileHandler, self).__init__(*args, **kwargs)

    def emit(self, *args, **kwargs):
        try:
            win32event.WaitForSingleObject(self.mutex, win32event.INFINITE)
            ret = super(SyncronizedFileHandler, self).emit(*args, **kwargs)
        finally:
            win32event.ReleaseMutex(self.mutex)
        return ret

And here is an example that demonstrates usage:

import logging
import random, time, os, sys, datetime
from string import letters
import win32api, win32event
from multiprocessing import Pool


def f(i):
    time.sleep(random.randint(0, 10) * 0.1)
    ch = random.choice(letters)
    logging.info(ch * 30)


def init_logging():
    '''
    initilize the loggers
    '''
    formatter = logging.Formatter("%(levelname)s - %(process)d - %(asctime)s - %(filename)s - %(lineno)d - %(message)s")
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)

    file_handler = SyncronizedFileHandler(sys.argv[1])
    file_handler.setLevel(logging.INFO)
    file_handler.setFormatter(formatter)
    logger.addHandler(file_handler)


# must be called in the parent and in every worker process
init_logging()

if __name__ == '__main__':
    # multiprocessing stuff
    pool = Pool(processes=10)
    imap_result = pool.imap(f, range(30))
    for i, _ in enumerate(imap_result):
        pass

Here's my simple hack/workaround... not the most comprehensive, but easy to modify and, I think, simpler to read and understand than any other answer I found before writing this:

import logging
import multiprocessing


class FakeLogger(object):
    def __init__(self, q):
        self.q = q
    def info(self, item):
        self.q.put('INFO - {}'.format(item))
    def debug(self, item):
        self.q.put('DEBUG - {}'.format(item))
    def critical(self, item):
        self.q.put('CRITICAL - {}'.format(item))
    def warning(self, item):
        self.q.put('WARNING - {}'.format(item))


def some_other_func_that_gets_logger_and_logs(num):
    # notice the name get's discarded
    # of course you can easily add this to your FakeLogger class
    local_logger = logging.getLogger('local')
    local_logger.info('Hey I am logging this: {} and working on it to make this {}!'.format(num, num*2))
    local_logger.debug('hmm, something may need debugging here')
    return num*2


def func_to_parallelize(data_chunk):
    # unpack our args
    the_num, logger_q = data_chunk
    # since we're now in a new process, let's monkeypatch the logging module
    logging.getLogger = lambda name=None: FakeLogger(logger_q)
    # now do the actual work that happens to log stuff too
    new_num = some_other_func_that_gets_logger_and_logs(the_num)
    return (the_num, new_num)


if __name__ == '__main__':
    multiprocessing.freeze_support()
    m = multiprocessing.Manager()
    logger_q = m.Queue()
    # we have to pass our data to be parallel-processed
    # we also need to pass the Queue object so we can retrieve the logs
    parallelable_data = [(1, logger_q), (2, logger_q)]
    # set up a pool of processes so we can take advantage of multiple CPU cores
    pool_size = multiprocessing.cpu_count() * 2
    pool = multiprocessing.Pool(processes=pool_size, maxtasksperchild=4)
    worker_output = pool.map(func_to_parallelize, parallelable_data)
    pool.close() # no more tasks
    pool.join()  # wrap up current tasks
    # get the contents of our FakeLogger object
    while not logger_q.empty():
        print logger_q.get()
    print 'worker output contained: {}'.format(worker_output)

Since we can represent multiprocess logging as many publishers and one subscriber (listener), using ZeroMQ to implement PUB-SUB messaging is indeed an option.

Moreover, the PyZMQ module, the Python bindings for ZMQ, implements PUBHandler, an object for publishing logging messages over a zmq.PUB socket.

There is a solution on the web for centralized logging from a distributed application using PyZMQ and PUBHandler, which can easily be adapted to work locally with multiple publishing processes.

import os
import socket
import logging
from logging import handlers
from multiprocessing import Process

import zmq
from zmq.log.handlers import PUBHandler

formatters = {
    logging.DEBUG: logging.Formatter("[%(name)s] %(message)s"),
    logging.INFO: logging.Formatter("[%(name)s] %(message)s"),
    logging.WARN: logging.Formatter("[%(name)s] %(message)s"),
    logging.ERROR: logging.Formatter("[%(name)s] %(message)s"),
    logging.CRITICAL: logging.Formatter("[%(name)s] %(message)s")
}


# This one will be used by publishing processes
class PUBLogger:
    def __init__(self, host, port=config.PUBSUB_LOGGER_PORT):
        self._logger = logging.getLogger(__name__)
        self._logger.setLevel(logging.DEBUG)
        self.ctx = zmq.Context()
        self.pub = self.ctx.socket(zmq.PUB)
        self.pub.connect('tcp://{0}:{1}'.format(socket.gethostbyname(host), port))
        self._handler = PUBHandler(self.pub)
        self._handler.formatters = formatters
        self._logger.addHandler(self._handler)

    @property
    def logger(self):
        return self._logger


# This one will be used by listener process
class SUBLogger:
    def __init__(self, ip, output_dir="", port=config.PUBSUB_LOGGER_PORT):
        self.output_dir = output_dir
        self._logger = logging.getLogger()
        self._logger.setLevel(logging.DEBUG)

        self.ctx = zmq.Context()
        self._sub = self.ctx.socket(zmq.SUB)
        self._sub.bind('tcp://*:{1}'.format(ip, port))
        self._sub.setsockopt(zmq.SUBSCRIBE, "")

        handler = handlers.RotatingFileHandler(os.path.join(output_dir, "client_debug.log"), "w", 100 * 1024 * 1024, 10)
        handler.setLevel(logging.DEBUG)
        formatter = logging.Formatter("%(asctime)s;%(levelname)s - %(message)s")
        handler.setFormatter(formatter)
        self._logger.addHandler(handler)

    @property
    def sub(self):
        return self._sub

    @property
    def logger(self):
        return self._logger


#  And that's the way we actually run things:

# Listener process will forever listen on SUB socket for incoming messages
def run_sub_logger(ip, event):
    sub_logger = SUBLogger(ip)
    while not event.is_set():
        try:
            topic, message = sub_logger.sub.recv_multipart(flags=zmq.NOBLOCK)
            log_msg = getattr(logging, topic.lower())
            log_msg(message)
        except zmq.ZMQError as zmq_error:
            if zmq_error.errno == zmq.EAGAIN:
                pass


# Publisher processes loggers should be initialized as follows:

class Publisher:
    def __init__(self, stop_event, proc_id):
        self.stop_event = stop_event
        self.proc_id = proc_id
        self._logger = PUBLogger('127.0.0.1').logger

    def run(self):
        self._logger.info("{0} - Sending message".format(self.proc_id))


def run_worker(event, proc_id):
    worker = Publisher(event, proc_id)
    worker.run()


# Starting subscriber process so we won't lose publisher's messages
sub_logger_process = Process(target=run_sub_logger,
                             args=('127.0.0.1', stop_event,))
sub_logger_process.start()

# Starting publisher processes
for i in range(MAX_WORKERS_PER_CLIENT):
    processes.append(Process(target=run_worker,
                             args=(stop_event, i,)))
for p in processes:
    p.start()

There is a great package for this.

Package: https://pypi.python.org/pypi/multiprocessing-logging/

Code: https://github.com/jruere/multiprocessing-logging

Install it:

pip install multiprocessing-logging

Then add:

import multiprocessing_logging


# This enables logs inside process
multiprocessing_logging.install_mp_handler()

The simplest idea:

  • Grab the filename and the process id of the current process.
  • Set up a WatchedFileHandler. The reasons for this handler are discussed in detail here, but in short: there are certain worse race conditions with the other logging handlers. This one has the shortest window for the race condition.
    • Choose a path to save the logs to, such as "/var/log/..." (a minimal sketch of this setup follows the list)
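
A minimal sketch of that idea (the log directory, filename pattern and format string are assumptions of mine):

# Minimal sketch: one log file per process, written via WatchedFileHandler
# (which reopens the file if an external rotator such as logrotate moves it).
import logging
import logging.handlers
import os

def setup_per_process_logging(log_dir="/var/log/myapp"):
    filename = os.path.join(log_dir, "worker-{}.log".format(os.getpid()))
    handler = logging.handlers.WatchedFileHandler(filename)
    handler.setFormatter(logging.Formatter("%(asctime)s %(process)d %(levelname)s %(message)s"))
    root = logging.getLogger()
    root.addHandler(handler)
    root.setLevel(logging.INFO)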

For whoever might need it, I wrote a decorator for the multiprocessing_logging package that adds the current process name to logs, so it becomes clear who logs what.

It also runs install_mp_handler(), so there is no point in running that yourself before creating a pool.

This lets me see which worker creates which log messages.

Here's the blueprint with an example:

import sys
import logging
from functools import wraps
import multiprocessing
import multiprocessing_logging

# Setup basic console logger as 'logger'
logger = logging.getLogger()
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setFormatter(logging.Formatter(u'%(asctime)s :: %(levelname)s :: %(message)s'))
logger.setLevel(logging.DEBUG)
logger.addHandler(console_handler)


# Create a decorator for functions that are called via multiprocessing pools
def logs_mp_process_names(fn):
    class MultiProcessLogFilter(logging.Filter):
        def filter(self, record):
            try:
                process_name = multiprocessing.current_process().name
            except BaseException:
                process_name = __name__
            record.msg = f'{process_name} :: {record.msg}'
            return True

    multiprocessing_logging.install_mp_handler()
    f = MultiProcessLogFilter()

    # Wraps is needed here so apply / apply_async know the function name
    @wraps(fn)
    def wrapper(*args, **kwargs):
        logger.removeFilter(f)
        logger.addFilter(f)
        return fn(*args, **kwargs)

    return wrapper


# Create a test function and decorate it
@logs_mp_process_names
def test(argument):
    logger.info(f'test function called via: {argument}')


# You can also redefine undecored functions
def undecorated_function():
    logger.info('I am not decorated')


@logs_mp_process_names
def redecorated(*args, **kwargs):
    return undecorated_function(*args, **kwargs)


# Enjoy
if __name__ == '__main__':
    with multiprocessing.Pool() as mp_pool:
        # Also works with apply_async
        mp_pool.apply(test, ('mp pool',))
        mp_pool.apply(redecorated)
        logger.info('some main logs')
        test('main program')

As of 2020 there seems to be a simpler way of logging with multiprocessing.

This function will create the logger. You can define the format here, as well as where you want your output to go (file, stdout):

def create_logger():
    import multiprocessing, logging
    logger = multiprocessing.get_logger()
    logger.setLevel(logging.INFO)
    formatter = logging.Formatter(
        '[%(asctime)s| %(levelname)s| %(processName)s] %(message)s')
    handler = logging.FileHandler('logs/your_file_name.log')
    handler.setFormatter(formatter)

    # this bit will make sure you won't have
    # duplicated messages in the output
    if not len(logger.handlers):
        logger.addHandler(handler)
    return logger


In the init, instantiate the logger:

if __name__ == '__main__':
    from multiprocessing import Pool

    logger = create_logger()
    logger.info('Starting pooling')
    p = Pool()
    # rest of the code

Now, all you need to do is add this reference in each function where you need logging:

logger = create_logger()

And output messages:

logger.info(f'My message from {something}')

Hope this helps.

I would like to suggest using the logger_tt library: https://github.com/Dragon2fly/logger_tt

The multiprocessing_logging library did not work on my macOS, while logger_tt did.
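
For reference, a hedged sketch of the setup; the setup_logging call and its use_multiprocessing flag are taken from my reading of the project's README, so treat them as assumptions and check the docs:

# Hedged sketch based on logger_tt's README; verify the exact API against the project docs.
from logger_tt import setup_logging, logger

setup_logging(use_multiprocessing=True)
logger.info('records from any process end up in the same log file')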

concurrent-log-handler seems to do the job perfectly. Tested on Windows. It also supports POSIX systems.

Main idea

  • Create a separate file with a function that returns a logger. The logger must have a fresh instance of ConcurrentRotatingFileHandler for each process. The example function get_logger() given below does this.
  • Creating the logger is done at the initialization of the process. For a multiprocessing.Process subclass this means the beginning of the run() method.

Detailed instructions

In this example, I will use the following file structure

.
│-- child.py        <-- For a child process
│-- logs.py         <-- For setting up the logs for the app
│-- main.py         <-- For a main process
│-- myapp.py        <-- For starting the app
│-- somemodule.py   <-- For an example, a "3rd party module using standard logging"

Code

Child process

# child.py

import multiprocessing as mp
import time
from somemodule import do_something


class ChildProcess(mp.Process):
    def __init__(self):
        self.logger = None
        super().__init__()

    def run(self):
        from logs import get_logger
        self.logger = get_logger()

        while True:
            time.sleep(1)
            self.logger.info("Child process")
            do_something()

  • A simple child process that inherits multiprocessing.Process and simply logs the text "Child process" to a file.
  • Important: get_logger() is called inside run(), or elsewhere inside the child process (not at module level or in __init__()). This is required because get_logger() creates the ConcurrentRotatingFileHandler instance, and every process needs its own new instance.
  • do_something is used just to demonstrate that this works with 3rd-party library code which has no clue that you are using concurrent-log-handler.

Main process

# main.py

import logging
import multiprocessing as mp
import time

from child import ChildProcess
from somemodule import do_something


class MainProcess(mp.Process):
    def __init__(self):
        self.logger = logging.getLogger()
        super().__init__()

    def run(self):
        from logs import get_logger

        self.logger = get_logger()
        self.child = ChildProcess()
        self.child.daemon = True
        self.child.start()

        while True:
            time.sleep(0.5)
            self.logger.critical("Main process")
            do_something()




  • The main process, which logs "Main process" into the file twice a second. It also inherits from multiprocessing.Process.
  • The same comments about get_logger() and do_something() apply as for the child process.

Logging setup

# logs.py

import logging
import os

from concurrent_log_handler import ConcurrentRotatingFileHandler

LOGLEVEL = logging.DEBUG


def get_logger():
    logger = logging.getLogger()

    if logger.handlers:
        return logger

    # Use an absolute path to prevent file rotation trouble.
    logfile = os.path.abspath("mylog.log")

    logger.setLevel(LOGLEVEL)

    # Rotate log after reaching 512K, keep 5 old copies.
    filehandler = ConcurrentRotatingFileHandler(
        logfile, mode="a", maxBytes=512 * 1024, backupCount=5, encoding="utf-8"
    )
    filehandler.setLevel(LOGLEVEL)

    # create also handler for displaying output in the stdout
    ch = logging.StreamHandler()
    ch.setLevel(LOGLEVEL)

    formatter = logging.Formatter(
        "%(asctime)s - %(module)s - %(levelname)s - %(message)s [Process: %(process)d, %(filename)s:%(funcName)s(%(lineno)d)]"
    )

    # add formatter to ch
    ch.setFormatter(formatter)
    filehandler.setFormatter(formatter)

    logger.addHandler(ch)
    logger.addHandler(filehandler)

    return logger

  • This uses ConcurrentRotatingFileHandler from the concurrent-log-handler package. Each process needs a fresh ConcurrentRotatingFileHandler instance.
  • Note that all arguments to ConcurrentRotatingFileHandler should be the same in every process.

Example application

# myapp.py

if __name__ == "__main__":
    from main import MainProcess

    p = MainProcess()
    p.start()

  • Just a simple example of how to start the multiprocess application

Example of a 3rd-party module using standard logging

# somemodule.py

import logging

logger = logging.getLogger("somemodule")


def do_something():
    logging.info("doing something")


  • Just a simple example to test that loggers from 3rd-party code work normally.

Example output

2021-04-19 19:02:29,425 - main - CRITICAL - Main process [Process: 103348, main.py:run(23)]
2021-04-19 19:02:29,427 - somemodule - INFO - doing something [Process: 103348, somemodule.py:do_something(7)]
2021-04-19 19:02:29,929 - main - CRITICAL - Main process [Process: 103348, main.py:run(23)]
2021-04-19 19:02:29,931 - somemodule - INFO - doing something [Process: 103348, somemodule.py:do_something(7)]
2021-04-19 19:02:30,133 - child - INFO - Child process [Process: 76700, child.py:run(18)]
2021-04-19 19:02:30,137 - somemodule - INFO - doing something [Process: 76700, somemodule.py:do_something(7)]
2021-04-19 19:02:30,436 - main - CRITICAL - Main process [Process: 103348, main.py:run(23)]
2021-04-19 19:02:30,439 - somemodule - INFO - doing something [Process: 103348, somemodule.py:do_something(7)]
2021-04-19 19:02:30,944 - main - CRITICAL - Main process [Process: 103348, main.py:run(23)]
2021-04-19 19:02:30,946 - somemodule - INFO - doing something [Process: 103348, somemodule.py:do_something(7)]
2021-04-19 19:02:31,142 - child - INFO - Child process [Process: 76700, child.py:run(18)]
2021-04-19 19:02:31,145 - somemodule - INFO - doing something [Process: 76700, somemodule.py:do_something(7)]
2021-04-19 19:02:31,449 - main - CRITICAL - Main process [Process: 103348, main.py:run(23)]
2021-04-19 19:02:31,451 - somemodule - INFO - doing something [Process: 103348, somemodule.py:do_something(7)]


This is my workaround/fix: logging from threads is done normally; logging from processes is done via a queue. It is not the most elegant solution, but it works well on Windows, even in multi-file/class scenarios.

import logging
import multiprocessing as mp
import threading as th
from typing import Union


class MPLogger:
    def __init__(self, name: str, level: str, mp_queue: mp.Queue, file_name: Union[str, None] = None):
        self.queue = mp_queue
        self.logger = logging.getLogger(name)
        self.thread = None

        if level in ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]:
            self.logger.setLevel(level)
        else:
            self.logger.setLevel("INFO")
        if file_name:
            handler = logging.FileHandler(file_name)
        else:
            handler = logging.StreamHandler()
        formatter = logging.Formatter('[%(asctime)s/%(levelname)s] %(message)s')
        handler.setFormatter(formatter)
        self.logger.addHandler(handler)
        self.logger.debug(f"Logger: {name} with level: {level} initialized")

    def debug(self, msg) -> None:
        self.logger.debug(msg)
    def info(self, msg) -> None:
        self.logger.info(msg)
    def warning(self, msg) -> None:
        self.logger.warning(msg)
    def error(self, msg) -> None:
        self.logger.error(msg)
    def critical(self, msg) -> None:
        self.logger.critical(msg)

    def _mp_logger(self) -> None:
        while True:
            record = self.queue.get()
            if record is None:
                break

            if record[0] == "DEBUG":
                self.logger.debug(record[1])
            elif record[0] == "INFO":
                self.logger.info(record[1])
            elif record[0] == "WARNING":
                self.logger.warning(record[1])
            elif record[0] == "ERROR":
                self.logger.error(record[1])
            elif record[0] == "CRITICAL":
                self.logger.critical(record[1])
            else:
                self.logger.debug(f"Invalid log level: {record[1]}")

    def start_logging(self) -> None:
        self.stop_logging()
        self.thread = th.Thread(target=self._mp_logger)
        self.thread.start()
        self.logger.debug("MP Logging started.")

    def stop_logging(self) -> None:
        if self.thread is None:
            return
        if self.thread.is_alive():
            self.queue.put(None)
            self.thread.join()
        self.thread = None
        self.logger.debug("MP Logging stopped.")

    def is_running(self) -> bool:
        if self.thread is None:
            return False
        if self.thread.is_alive():
            return True
        else:
            return False

    def get_queue(self) -> mp.Queue:
        return self.queue

Usage:

class ExampleClass:
    def __init__(self, mp_logger: MPLogger):
        mp_logger.error("ExampleClass Hi")
        proc2 = mp.Process(target=self.process2, args=(mp_logger.get_queue(),))
        proc2.start()
        proc2.join()

    @staticmethod
    def process2(log_q: mp.Queue):
        log_q.put(("CRITICAL", "process2 Hi"))


def process1(log_q: mp.Queue):
    log_q.put(("WARNING", "process1 Hi"))


if __name__ == '__main__':
    mp_logger = MPLogger("mp_logger", "DEBUG", mp.Queue())
    mp_logger.start_logging()

    mp_logger.info("main Hi")

    proc1 = mp.Process(target=process1, args=(mp_logger.get_queue(),))
    proc1.start()
    proc1.join()

    example_class = ExampleClass(mp_logger)

    mp_logger.stop_logging()

Output:

[2022-09-11 12:50:14,354/DEBUG] Logger: mp_logger with level: DEBUG initialized
[2022-09-11 12:50:14,354/DEBUG] MP Logging started.
[2022-09-11 12:50:14,354/INFO] main Hi
[2022-09-11 12:50:14,465/WARNING] process1 Hi
[2022-09-11 12:50:14,480/ERROR] ExampleClass Hi
[2022-09-11 12:50:14,574/CRITICAL] process2 Hi
[2022-09-11 12:50:14,590/DEBUG] MP Logging stopped.