如何对线程池使用 MDC?

在我们的软件中,我们广泛使用 MDC 来跟踪诸如会话 ID 和 Web 请求的用户名之类的东西。这种方法在原始线程中运行时效果很好。

但是,有很多事情需要在后台处理。为此,我们使用 java.concurrent.ThreadPoolExecutorjava.util.Timer类以及一些自滚动的 异步执行服务。所有这些服务都管理自己的线程池。

这就是 日志回放手册对在这样的环境中使用 MDC 所要说的:

映射的诊断上下文的副本不能始终由工作线程从起始线程继承。这就是 java.util.while 的情况。执行器用于线程管理。例如,newCachedThreadPool 方法创建一个 ThreadPoolExecator,并且像其他线程池代码一样,它具有复杂的线程创建逻辑。

在这种情况下,建议在向执行者提交任务之前,在原始(主)线程上调用 MDC.getCopyOfContextMap ()。当任务运行时,作为第一个操作,它应该调用 MDC.setContextMapValue ()来将原始 MDC 值的存储副本与新的 Execator 托管线程关联。

这很好,但是很容易忘记添加这些调用,并且在为时已晚之前没有简单的方法来识别问题。使用 Log4j 的唯一标志是您在日志中丢失了 MDC 信息,而使用 Logback 您将获得过时的 MDC 信息(因为胎面池中的线程从在它上面运行的第一个任务继承了它的 MDC)。这两者都是生产系统中的严重问题。

我没有看到我们的情况在任何方面特殊,但我无法找到很多关于这个问题在网上。显然,这不是很多人都会遇到的问题,所以一定有办法避免。我们到底做错了什么?

93904 次浏览

我们也遇到过类似的问题。您可能希望在启动/停止新线程之前扩展 ThreadPoolExecator 并重写 before/after Execute 方法以执行所需的 MDC 调用。

是的,这也是我经常遇到的问题。有一些变通方法(如上所述,手动设置) ,但理想情况下,您需要的解决方案是

  • 设置一致的 MDC;
  • 避免 MDC 不正确但您不知道的隐式 bug; 以及
  • 最大限度地减少对线程池使用方式的更改(例如,到处使用 MyCallable子类化 Callable,或类似的丑陋)。

下面是我使用的一个满足这三个需求的解决方案。

(作为附带说明,这个执行者可以被创建,并提供给番石榴的 MoreExecutors.listeningDecorator(),如果 你可以使用番石榴的 ListanableFuture。)

import org.slf4j.MDC;


import java.util.Map;
import java.util.concurrent.*;


/**
* A SLF4J MDC-compatible {@link ThreadPoolExecutor}.
* <p/>
* In general, MDC is used to store diagnostic information (e.g. a user's session id) in per-thread variables, to facilitate
* logging. However, although MDC data is passed to thread children, this doesn't work when threads are reused in a
* thread pool. This is a drop-in replacement for {@link ThreadPoolExecutor} sets MDC data before each task appropriately.
* <p/>
* Created by jlevy.
* Date: 6/14/13
*/
public class MdcThreadPoolExecutor extends ThreadPoolExecutor {


final private boolean useFixedContext;
final private Map<String, Object> fixedContext;


/**
* Pool where task threads take MDC from the submitting thread.
*/
public static MdcThreadPoolExecutor newWithInheritedMdc(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue) {
return new MdcThreadPoolExecutor(null, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}


/**
* Pool where task threads take fixed MDC from the thread that creates the pool.
*/
@SuppressWarnings("unchecked")
public static MdcThreadPoolExecutor newWithCurrentMdc(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue) {
return new MdcThreadPoolExecutor(MDC.getCopyOfContextMap(), corePoolSize, maximumPoolSize, keepAliveTime, unit,
workQueue);
}


/**
* Pool where task threads always have a specified, fixed MDC.
*/
public static MdcThreadPoolExecutor newWithFixedMdc(Map<String, Object> fixedContext, int corePoolSize,
int maximumPoolSize, long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
return new MdcThreadPoolExecutor(fixedContext, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}


private MdcThreadPoolExecutor(Map<String, Object> fixedContext, int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
this.fixedContext = fixedContext;
useFixedContext = (fixedContext != null);
}


@SuppressWarnings("unchecked")
private Map<String, Object> getContextForTask() {
return useFixedContext ? fixedContext : MDC.getCopyOfContextMap();
}


/**
* All executions will have MDC injected. {@code ThreadPoolExecutor}'s submission methods ({@code submit()} etc.)
* all delegate to this.
*/
@Override
public void execute(Runnable command) {
super.execute(wrap(command, getContextForTask()));
}


public static Runnable wrap(final Runnable runnable, final Map<String, Object> context) {
return new Runnable() {
@Override
public void run() {
Map previous = MDC.getCopyOfContextMap();
if (context == null) {
MDC.clear();
} else {
MDC.setContextMap(context);
}
try {
runnable.run();
} finally {
if (previous == null) {
MDC.clear();
} else {
MDC.setContextMap(previous);
}
}
}
};
}
}

与前面提到的解决方案类似,在创建 RunnableFuture时,可以覆盖 RunnableCallablenewTaskFor方法以包装参数(参见公认的解决方案)。

注意: 因此,必须调用 executorServicesubmit方法而不是 execute方法。

对于 ScheduledThreadPoolExecutor,将改写 decorateTask方法。

这就是我使用固定线程池和执行器的方法:

ExecutorService executor = Executors.newFixedThreadPool(4);
Map<String, String> mdcContextMap = MDC.getCopyOfContextMap();

在穿线部分:

executor.submit(() -> {
MDC.setContextMap(mdcContextMap);
// my stuff
});

恕我直言,最好的解决办法是:

  • 使用 ThreadPoolTaskExecutor
  • 实现自己的 TaskDecorator
  • 使用它: executor.setTaskDecorator(new LoggingTaskDecorator());

室内设计师可以是这样的:

private final class LoggingTaskDecorator implements TaskDecorator {


@Override
public Runnable decorate(Runnable task) {
// web thread
Map<String, String> webThreadContext = MDC.getCopyOfContextMap();
return () -> {
// work thread
try {
// TODO: is this thread safe?
MDC.setContextMap(webThreadContext);
task.run();
} finally {
MDC.clear();
}
};
}


}

另一个类似于现有答案的变体是实现 ExecutorService并允许向其传递一个委托。然后使用泛型,它仍然可以公开实际的委托,以防需要获得一些统计信息(只要不使用其他修改方法)。

参考编号:

public class MDCExecutorService<D extends ExecutorService> implements ExecutorService {


private final D delegate;


public MDCExecutorService(D delegate) {
this.delegate = delegate;
}


@Override
public void shutdown() {
delegate.shutdown();
}


@Override
public List<Runnable> shutdownNow() {
return delegate.shutdownNow();
}


@Override
public boolean isShutdown() {
return delegate.isShutdown();
}


@Override
public boolean isTerminated() {
return delegate.isTerminated();
}


@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
return delegate.awaitTermination(timeout, unit);
}


@Override
public <T> Future<T> submit(Callable<T> task) {
return delegate.submit(wrap(task));
}


@Override
public <T> Future<T> submit(Runnable task, T result) {
return delegate.submit(wrap(task), result);
}


@Override
public Future<?> submit(Runnable task) {
return delegate.submit(wrap(task));
}


@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
return delegate.invokeAll(wrapCollection(tasks));
}


@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
return delegate.invokeAll(wrapCollection(tasks), timeout, unit);
}


@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
return delegate.invokeAny(wrapCollection(tasks));
}


@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return delegate.invokeAny(wrapCollection(tasks), timeout, unit);
}


@Override
public void execute(Runnable command) {
delegate.execute(wrap(command));
}


public D getDelegate() {
return delegate;
}


/* Copied from https://github.com/project-ncl/pnc/blob/master/common/src/main/java/org/jboss/pnc/common
/concurrent/MDCWrappers.java */


private static Runnable wrap(final Runnable runnable) {
final Map<String, String> context = MDC.getCopyOfContextMap();
return () -> {
Map previous = MDC.getCopyOfContextMap();
if (context == null) {
MDC.clear();
} else {
MDC.setContextMap(context);
}
try {
runnable.run();
} finally {
if (previous == null) {
MDC.clear();
} else {
MDC.setContextMap(previous);
}
}
};
}


private static <T> Callable<T> wrap(final Callable<T> callable) {
final Map<String, String> context = MDC.getCopyOfContextMap();
return () -> {
Map previous = MDC.getCopyOfContextMap();
if (context == null) {
MDC.clear();
} else {
MDC.setContextMap(context);
}
try {
return callable.call();
} finally {
if (previous == null) {
MDC.clear();
} else {
MDC.setContextMap(previous);
}
}
};
}


private static <T> Consumer<T> wrap(final Consumer<T> consumer) {
final Map<String, String> context = MDC.getCopyOfContextMap();
return (t) -> {
Map previous = MDC.getCopyOfContextMap();
if (context == null) {
MDC.clear();
} else {
MDC.setContextMap(context);
}
try {
consumer.accept(t);
} finally {
if (previous == null) {
MDC.clear();
} else {
MDC.setContextMap(previous);
}
}
};
}


private static <T> Collection<Callable<T>> wrapCollection(Collection<? extends Callable<T>> tasks) {
Collection<Callable<T>> wrapped = new ArrayList<>();
for (Callable<T> task : tasks) {
wrapped.add(wrap(task));
}
return wrapped;
}
}

如果您在与 Spring 框架相关的环境中遇到这个问题,您可以使用 @Async注释来运行任务,您可以使用 TaskDecorator方法来装饰任务。

这里提供了一个示例:

我遇到了这个问题,上面的文章帮助我解决了这个问题,所以我在这里分享它。