我应该如何单元测试多线程代码?

到目前为止,我避免了测试多线程代码的噩梦,因为它看起来太像雷区了。我想问一下人们是如何测试依赖线程成功执行的代码的,或者人们是如何测试那些只有当两个线程以给定方式交互时才会出现的问题的?

对于今天的程序员来说,这似乎是一个非常关键的问题,在这个问题上汇集我们的知识将是有用的。

200161 次浏览

我有过测试线程代码的不幸任务,它们绝对是我写过的最难的测试。

在编写测试时,我使用了委托和事件的组合。基本上,它是关于使用PropertyNotifyChanged事件和WaitCallback或某种ConditionalWaiter轮询。

我不确定这是否是最好的方法,但它对我来说已经奏效了。

确实很难!在我的(C++)单元测试中,我按照使用的并发模式将其分为几类:

  1. 在单个线程中操作且不具有线程感知的类的单元测试——简单,像往常一样测试。

  2. 监控对象的单元测试(那些在调用者的控制线程中执行同步方法的单元测试),它们公开了同步的公共API——实例化多个行使API的模拟线程。构造行使被动对象内部条件的场景。包括一个运行时间更长的测试,基本上可以在很长一段时间内从多个线程中击败它。我知道这不科学,但它确实建立了信心。

  3. 活动物体的单元测试(那些封装自己的线程或控制线程的单元测试)——类似于上面的#2,但根据类设计有所不同。公共API可能是阻塞的或非阻塞的,调用者可能获得期货,数据可能到达队列或需要出队。这里有许多可能的组合;白盒离开。仍然需要多个模拟线程来调用被测对象。

作为旁白:

在我进行的内部开发人员培训中,我教授并发支柱和这两种模式,作为思考和分解并发问题的主要框架。显然有更高级的概念,但我发现这组基础知识有助于让工程师远离困境。它还导致代码更具单元可测试性,如上所述。

听着,要做到这一点并不容易。我正在做一个本质上是多线程的项目。事件来自操作系统,我必须同时处理它们。

处理测试复杂的多线程应用程序代码的最简单方法是:如果它太复杂而无法测试,那么你就错了。如果你有一个具有多个线程作用的单个实例,并且你无法测试这些线程相互执行的情况,那么你的设计需要重做。这既简单又复杂。

有很多方法可以避免线程同时运行通过实例。最简单的是让你的所有对象都是不可变的。当然,这通常是不可能的。所以你必须在你的设计中确定线程与同一实例交互的那些地方,并减少这些地方的数量。通过这样做,你隔离了几个实际发生多线程的类,降低了测试系统的整体复杂性。

但你必须意识到,即使这样做,你仍然无法测试两个线程相互踩踏的每种情况。为此,你必须在同一测试中同时运行两个线程,然后准确控制它们在任何给定时刻执行的行。你能做的最好的就是模拟这种情况。但这可能需要你专门为测试编写代码,这充其量只是朝着真正的解决方案迈出的半步。

测试代码是否存在线程问题的最好方法可能是静态分析代码。如果你的线程代码不遵循一组有限的线程安全模式,那么你可能会遇到问题。我相信VS中的代码分析确实包含了一些线程知识,但可能不多。

看,就目前的情况来看(并且可能在未来的很长一段时间内都会如此),测试多线程应用的最佳方法是尽可能降低线程代码的复杂性。最小化线程交互的区域,尽可能测试,并使用代码分析来识别危险区域。

Pete Goodliffe螺纹单元试验代码上有一个序列。

这很难。我采取了更简单的方法,并尝试将线程代码从实际测试中抽象出来。Pete确实提到我这样做的方式是错误的,但我要么做对了分离,要么我只是很幸运。

我在测试多线程代码时也遇到了严重的问题。然后我在Gerard Meszaros的“xUnit Test Patterns”中找到了一个非常酷的解决方案。他描述的模式称为卑微的物体

基本上,它描述了如何将逻辑提取到一个独立的、易于测试的组件中,该组件与其环境解耦。测试此逻辑后,您可以测试复杂的行为(多线程、异步执行等…)

我喜欢编写两个或更多的测试方法来在并行线程上执行,它们中的每一个都调用被测对象。我一直在使用Slip()调用来协调来自不同线程的调用顺序,但这并不真正可靠。它也慢得多,因为你必须睡眠足够长的时间才能正常工作。

我从编写FindBugs的同一个组中找到了多线程TCJava库。它允许您指定事件的顺序而无需使用Slip(),并且它很可靠。我还没有尝试过。

这种方法的最大限制是它只允许您测试您怀疑会造成麻烦的场景。正如其他人所说,您确实需要将多线程代码隔离到少数简单类中,才有希望彻底测试它们。

一旦你仔细测试了你预计会引起麻烦的场景,一个不科学的测试会在类中同时抛出一堆请求,这是寻找意外麻烦的好方法。

更新时间:我使用过Multithreaded TCJava库,它运行良好。我还将它的一些功能移植到我称之为测试信息的. NET版本。

有关Java,请查看JCIP的第12章。有一些编写确定性、多线程单元测试的具体示例,至少可以测试并发代码的正确性和不变性。

通过单元测试“证明”线程安全性要冒险得多。我的信念是,在各种平台/配置上进行自动化集成测试会更好。

我做了很多这样的事情,是的,它很糟糕。

一些小贴士:

  • 创建者用于运行多个测试线程
  • alphaWorks竞赛到仪器类以导致交错在迭代之间变化
  • 创建一个throwable字段并将其检查在tearDown中(参见清单1)。如果您在另一个线程中捕获了错误的异常,只需将其分配给throwable。
  • 我在清单2中创建了utils类,并发现它非常宝贵,尤其是waitForVerify和waitForConsole,它们将大大提高测试的性能。
  • 在测试中充分利用AtomicBoolean。它是线程安全的,您通常需要最终引用类型来存储来自回调类等的值。参见清单3中的示例。
  • 确保始终为您的测试提供超时(例如,@Test(timeout=60*1000)),因为并发测试有时会在中断时永远挂起。

清单1:

@Afterpublic void tearDown() {if ( throwable != null )throw throwable;}

清单2:

import static org.junit.Assert.fail;import java.io.File;import java.lang.reflect.InvocationHandler;import java.lang.reflect.Proxy;import java.util.Random;import org.apache.commons.collections.Closure;import org.apache.commons.collections.Predicate;import org.apache.commons.lang.time.StopWatch;import org.easymock.EasyMock;import org.easymock.classextension.internal.ClassExtensionHelper;import static org.easymock.classextension.EasyMock.*;
import ca.digitalrapids.io.DRFileUtils;
/*** Various utilities for testing*/public abstract class DRTestUtils{static private Random random = new Random();
/** Calls {@link #waitForCondition(Integer, Integer, Predicate, String)} with* default max wait and check period values.*/static public void waitForCondition(Predicate predicate, String errorMessage)throws Throwable{waitForCondition(null, null, predicate, errorMessage);}
/** Blocks until a condition is true, throwing an {@link AssertionError} if* it does not become true during a given max time.* @param maxWait_ms max time to wait for true condition. Optional; defaults* to 30 * 1000 ms (30 seconds).* @param checkPeriod_ms period at which to try the condition. Optional; defaults* to 100 ms.* @param predicate the condition* @param errorMessage message use in the {@link AssertionError}* @throws Throwable on {@link AssertionError} or any other exception/error*/static public void waitForCondition(Integer maxWait_ms, Integer checkPeriod_ms,Predicate predicate, String errorMessage) throws Throwable{waitForCondition(maxWait_ms, checkPeriod_ms, predicate, new Closure() {public void execute(Object errorMessage){fail((String)errorMessage);}}, errorMessage);}
/** Blocks until a condition is true, running a closure if* it does not become true during a given max time.* @param maxWait_ms max time to wait for true condition. Optional; defaults* to 30 * 1000 ms (30 seconds).* @param checkPeriod_ms period at which to try the condition. Optional; defaults* to 100 ms.* @param predicate the condition* @param closure closure to run* @param argument argument for closure* @throws Throwable on {@link AssertionError} or any other exception/error*/static public void waitForCondition(Integer maxWait_ms, Integer checkPeriod_ms,Predicate predicate, Closure closure, Object argument) throws Throwable{if ( maxWait_ms == null )maxWait_ms = 30 * 1000;if ( checkPeriod_ms == null )checkPeriod_ms = 100;StopWatch stopWatch = new StopWatch();stopWatch.start();while ( !predicate.evaluate(null) ) {Thread.sleep(checkPeriod_ms);if ( stopWatch.getTime() > maxWait_ms ) {closure.execute(argument);}}}
/** Calls {@link #waitForVerify(Integer, Object)} with <code>null</code>* for {@code maxWait_ms}*/static public void waitForVerify(Object easyMockProxy)throws Throwable{waitForVerify(null, easyMockProxy);}
/** Repeatedly calls {@link EasyMock#verify(Object[])} until it succeeds, or a* max wait time has elapsed.* @param maxWait_ms Max wait time. <code>null</code> defaults to 30s.* @param easyMockProxy Proxy to call verify on* @throws Throwable*/static public void waitForVerify(Integer maxWait_ms, Object easyMockProxy)throws Throwable{if ( maxWait_ms == null )maxWait_ms = 30 * 1000;StopWatch stopWatch = new StopWatch();stopWatch.start();for(;;) {try{verify(easyMockProxy);break;}catch (AssertionError e){if ( stopWatch.getTime() > maxWait_ms )throw e;Thread.sleep(100);}}}
/** Returns a path to a directory in the temp dir with the name of the given* class. This is useful for temporary test files.* @param aClass test class for which to create dir* @return the path*/static public String getTestDirPathForTestClass(Object object){
String filename = object instanceof Class ?((Class)object).getName() :object.getClass().getName();return DRFileUtils.getTempDir() + File.separator +filename;}
static public byte[] createRandomByteArray(int bytesLength){byte[] sourceBytes = new byte[bytesLength];random.nextBytes(sourceBytes);return sourceBytes;}
/** Returns <code>true</code> if the given object is an EasyMock mock object*/static public boolean isEasyMockMock(Object object) {try {InvocationHandler invocationHandler = Proxy.getInvocationHandler(object);return invocationHandler.getClass().getName().contains("easymock");} catch (IllegalArgumentException e) {return false;}}}

清单3:

@Testpublic void testSomething() {final AtomicBoolean called = new AtomicBoolean(false);subject.setCallback(new SomeCallback() {public void callback(Object arg) {// check arg herecalled.set(true);}});subject.run();assertTrue(called.get());}

测试线程代码和非常复杂的系统的另一种方法是通过模糊测试。它不是很好,它不会找到所有的东西,但它可能是有用的,而且很容易做到。

引用:

模糊测试是一种软件测试技术,它向程序的输入提供随机数据(“模糊”)。如果程序失败(例如,崩溃或内置代码断言失败),可以注意到缺陷。模糊测试的最大优点是测试设计极其简单,并且没有对系统行为的先入为主。

模糊测试通常用于使用黑盒测试的大型软件开发项目。这些项目通常有开发测试工具的预算,模糊测试是提供高效益成本比的技术之一。

然而,模糊测试不能代替穷举测试或形式化方法:它只能提供系统行为的随机样本,并且在许多情况下,通过模糊测试可能只表明软件处理了异常而没有崩溃,而不是行为正确。因此,模糊测试只能被视为一种bug工具,而不是质量的保证。

这个问题已经发布了一段时间,但它仍然没有得到回答……

kleolb02的答案很好,我会解释得更详细一些。#2

有一种方法,我是为C#代码练习的。对于单元测试,你应该能够编写再现测试,这是多线程代码中最大的挑战。所以我的回答旨在强制异步代码进入测试工具,这适用于同步

这是Gerard Meszaros的书“xUnit测试模式”中的一个想法,被称为“Humble Object”(第695页):你必须将核心逻辑代码和任何闻起来像异步代码的东西分开。这将导致一个核心逻辑的类,它的工作原理同步

这使你能够以同步的方式测试核心逻辑代码。你对核心逻辑上的调用时间有绝对的控制权,因此可以进行再现的测试。这是你从分离核心逻辑和异步逻辑中获得的好处。

这个核心逻辑需要由另一个类封装,该类负责异步接收对核心逻辑的调用,并代表这些对核心逻辑的调用。生产代码只会通过该类访问核心逻辑。因为这个类应该只委托调用,所以它是一个没有太多逻辑的非常“哑”的类。因此你可以将这个异步工作类的单元测试保持在最低限度。

除此之外的任何东西(测试类之间的交互)都是组件测试。同样在这种情况下,如果你坚持“Humble Object”模式,你应该能够绝对控制时间。

上周的大部分时间我都在大学图书馆研究并发代码的调试。核心问题是并发代码是不确定的。通常,学术调试在这里分为三个阵营之一:

  1. 事件跟踪/重播。这需要一个事件监视器,然后审查发送的事件。在UT框架中,这将涉及手动发送事件作为测试的一部分,然后进行事后审查。
  2. 可编写脚本。这是您使用一组触发器与运行代码交互的地方。"On x>foo, baz()"。这可以解释为UT框架,其中您有一个运行时系统在特定条件下触发给定测试。
  3. 交互式。这显然不会在自动测试的情况下工作。;)

现在,正如上面的评论员所注意到的,你可以将并发系统设计成更确定的状态。然而,如果你做得不好,你就又回到了设计顺序系统的过程中。

我的建议是专注于有一个非常严格的设计协议,关于什么被线程化,什么没有被线程化。如果你限制你的接口,使元素之间的依赖最小化,这就容易得多。

祝你好运,继续解决这个问题。

我处理线程组件的单元测试的方式与处理任何单元测试的方式相同,即使用控制和隔离框架的反转。我在. Net领域开发,并且开箱即用,线程(除其他外)非常难以(我想说几乎不可能)完全隔离。

因此,我编写了类似这样的包装器(简化):

public interface IThread{void Start();...}
public class ThreadWrapper : IThread{private readonly Thread _thread;     
public ThreadWrapper(ThreadStart threadStart){_thread = new Thread(threadStart);}
public Start(){_thread.Start();}}    
public interface IThreadingManager{IThread CreateThread(ThreadStart threadStart);}
public class ThreadingManager : IThreadingManager{public IThread CreateThread(ThreadStart threadStart){return new ThreadWrapper(threadStart)}}

从那里,我可以轻松地将IThreadingManager注入到我的组件中,并使用我选择的隔离框架使线程在测试期间按我期望的方式运行。

到目前为止,这对我来说效果很好,我对线程池、System. Environment中的东西、睡眠等使用相同的方法。

身边有几个工具相当不错,下面总结几个Java的。

一些好的静态分析工具包括搜索结果(给出了一些有用的提示)、JLintJava探路者(JPF和JPF2)和茂物

多线程事务处理是一个非常好的动态分析工具(集成到JUnit中),您必须在其中设置自己的测试用例。

来自IBMResearch的竞赛很有趣。它通过插入各种线程修改行为(例如睡眠和产量)来检测您的代码,试图随机发现错误。

SPIN是一个非常酷的工具,用于建模您的Java(和其他)组件,但您需要有一些有用的框架。它很难按原样使用,但如果您知道如何使用它,它会非常强大。相当多的工具在引擎盖下使用SPIN。

MultithreadedTC可能是最主流的,但上面列出的一些静态分析工具绝对值得一看。

放弃也可以帮助您编写确定性单元测试。它允许您等待系统中某处的某些状态更新。例如:

await().untilCall( to(myService).myMethod(), greaterThan(3) );

await().atMost(5,SECONDS).until(fieldIn(myObject).ofType(int.class), equalTo(1));

它还支持Scala和Groovy。

await until { something() > 4 } // Scala example

看看我的相关回答在

为自定义Barrier设计Test类

它偏向于Java,但对选项有一个合理的总结。

总之,(IMO)不是使用一些花哨的框架来确保正确性,而是如何设计多线程代码。拆分关注点(并发和功能)对提高信心有很大的帮助。基于测试的面向对象软件开发比我更好地解释了一些选项。

静态分析和形式化方法(参见并发:状态模型和Java程序)是一种选择,但我发现它们在商业开发中的用途有限。

不要忘记,任何负载/浸泡样式测试都很少能保证突出问题。

祝你好运!

(如果可能)不要使用线程,使用参与者/活动对象。易于测试。

下面的文章提出了两种解决方案。包装一个信号量(CountDownLatch)并添加诸如从内部线程外部化数据之类的功能。实现此目的的另一种方法是使用线程池(请参阅兴趣点)。

Sprinkler-高级同步对象

您可以使用EasyMock.makeThreadSafe使测试实例线程安全

我最近刚刚发现(Java)一个名为ThreadSafety的工具。它是一个静态分析工具,很像findbugs,但专门用于发现多线程问题。它不是测试的替代品,但我可以推荐它作为编写可靠的多线程Java的一部分。

它甚至捕捉到一些非常微妙的潜在问题,例如类包含、通过并发类访问不安全对象以及在使用双重检查锁定范式时发现丢失的易失性修饰符。

如果您编写多线程Java给它一个镜头。

如前所述,测试MT代码的正确性是一个相当困难的问题。最终,它归结为确保你的代码中没有不正确的同步数据竞争。这样做的问题是,有无限多的线程执行(交错)的可能性,你对它们没有太多的控制(不过一定要阅读0文章)。在简单的场景中,可能有可能通过推理来证明正确性,但通常不是这样。特别是如果你想避免/最小化同步,而不是选择最明显/最简单的同步选项。

我遵循的一种方法是编写高并发测试代码,以使潜在未检测到的数据竞争有可能发生。然后我运行这些测试一段时间:)我曾经偶然发现一个演讲,一些计算机科学家在展示一种可以做到这一点的工具(从规范随机设计测试,然后疯狂地、并发地运行它们,检查定义的不变量是否被破坏)。

顺便说一句,我觉得测试MT代码这方面在这里没有提到:识别代码中可以随机检查的不变量。不幸的是,找到这些不变量也是一个相当困难的问题。而且它们可能在执行过程中并不总是成立,所以你必须找到/强制执行期望它们为真的执行点。将代码执行到这样的状态也是一个难题(它本身可能会导致并发问题。哇,太难了!

一些有趣的链接阅读:

近年来,当我为多个项目编写线程处理代码时,我多次遇到这个问题。我提供一个较晚的答案,因为大多数其他答案,虽然提供了替代方案,但实际上并没有回答关于测试的问题。我的答案是针对除了多线程代码之外没有替代方案的情况;我确实涵盖了完整性的代码设计问题,但也讨论了单元测试。

编写可测试的多线程代码

首先要做的是将生产线程处理代码与所有进行实际数据处理的代码分开。这样,数据处理可以作为单线程代码进行测试,而多线程代码唯一做的就是协调线程。

第二件要记住的事情是多线程代码中的bug是概率性的;最不频繁出现的bug是那些会潜入生产环境的bug,即使在生产环境中也很难重现,从而导致最大的问题。出于这个原因,快速编写代码然后调试直到它正常工作的标准编码方法对多线程代码来说是个坏主意;它将导致代码中容易的bug被修复,危险的bug仍然存在。

相反,在编写多线程代码时,你必须以从一开始就避免编写错误的态度编写代码。如果你已经正确地删除了数据处理代码,线程处理代码应该足够小——最好是几行,最糟糕的是几十行——这样你就有机会在不编写bug的情况下编写它,当然也不会编写很多错误,如果你了解线程,慢慢来,小心。

为多线程代码编写单元测试

一旦尽可能仔细地编写了多线程代码,仍然值得为该代码编写测试。测试的主要目的与其说是测试高度依赖于时间的竞争条件错误——不可能重复测试这种竞争条件——不如说是测试你防止此类错误的锁定策略是否允许多个线程按预期交互。

为了正确测试正确的锁定行为,测试必须启动多个线程。为了使测试可重复,我们希望线程之间的交互以可预测的顺序发生。我们不希望外部同步测试中的线程,因为这将掩盖生产中线程不外部同步可能发生的错误。这就剩下了使用时间延迟进行线程同步,这是我每次编写多线程代码测试时都成功使用的技术。

如果延迟太短,那么测试就会变得脆弱,因为微小的时序差异——比如可能运行测试的不同机器之间的时序差异——可能会导致时序关闭,测试失败。我通常做的是从导致测试失败的延迟开始,增加延迟,以便测试在我的开发机器上可靠地通过,然后将延迟增加一倍,以便测试有很大的机会在其他机器上通过。这确实意味着测试将花费宏观的时间,尽管根据我的经验,仔细的测试设计可以将时间限制在十几秒。由于您的应用程序中不应该有很多需要线程协调代码的地方,因此您的测试套件应该可以接受。

最后,跟踪测试发现的bug数量。如果你的测试有80%的代码覆盖率,那么它可以捕获大约80%的bug。如果你的测试设计良好但没有发现bug,那么你很有可能没有其他只会出现在生产中的bug。如果测试捕获了一两个bug,你可能仍然很幸运。除此之外,你可能需要考虑仔细审查甚至完全重写你的线程处理代码,因为代码很可能仍然包含隐藏的bug,这些bug在代码投入生产之前很难找到,并且很难修复。

对于J2E代码,我使用了SilkPerron、LoadRunner和JMeter来进行线程的并发测试。它们都做同样的事情。基本上,它们为你提供了一个相对简单的界面,用于管理所需的代理服务器版本,以分析TCP/IP数据流,并模拟多个用户同时向你的应用服务器发出请求。代理服务器可以为你提供诸如分析所发出的请求的能力,方法是在处理请求后呈现发送到服务器的整个页面和URL,以及服务器的响应。

你可以在不安全的超文本传输协议模式下找到一些错误,在这种模式下,你至少可以分析正在发送的表单数据,并系统地为每个用户更改它。但真正的测试是在https(安全套接字层)中运行时。然后,你还必须应对系统地更改会话和cookie数据,这可能会更复杂一些。

在测试并发时,我发现最好的bug是当我发现开发人员依赖Java垃圾回收机制来关闭登录时建立的与LDAP服务器的连接请求时。这导致用户暴露在其他用户的会话中,当试图分析服务器瘫痪时发生了什么时,结果非常混乱,几乎每隔几秒钟就完成一个事务。

最后,你或者其他人可能还得认真分析代码,找出我刚才提到的那种错误。跨部门的公开讨论,比如我们发现上面描述的问题时发生的那次,是最有用的。但这些工具是测试多线程代码的最佳解决方案。JMeter是开源的。SilkPerron和LoadRunner是专有的。如果你真的想知道你的应用程序是否线程安全,大公司就是这么做的。我专业地为非常大的公司做过这件事,所以我不是猜测。我是从个人经验来说的。

提醒一句:理解这些工具确实需要一些时间。除非你已经接触过多线程编程,否则这不是简单地安装软件和启动GUI的问题。我试图确定需要理解的3个关键类别(表单、会话和cookie数据),希望至少从理解这些主题开始,将帮助你专注于快速结果,而不是通读整个留档。

如果您正在测试简单的new Thread(runnable). run()你可以模拟线程顺序运行可运行的

例如,如果被测试对象的代码调用了这样的新线程

Class TestedClass {public void doAsychOp() {new Thread(new myRunnable()).start();}}

然后模拟新线程并按顺序运行可运行参数可以帮助

@Mockprivate Thread threadMock;
@Testpublic void myTest() throws Exception {PowerMockito.mockStatic(Thread.class);//when new thread is created execute runnable immediatelyPowerMockito.whenNew(Thread.class).withAnyArguments().then(new Answer<Thread>() {@Overridepublic Thread answer(InvocationOnMock invocation) throws Throwable {// immediately run the runnableRunnable runnable = invocation.getArgumentAt(0, Runnable.class);if(runnable != null) {runnable.run();}return threadMock;//return a mock so Thread.start() will do nothing}});TestedClass testcls = new TestedClass()testcls.doAsychOp(); //will invoke myRunnable.run in current thread//.... check expected}

并发是内存模型、硬件、缓存和我们的代码之间复杂的相互作用。就Java而言,至少这些测试已经部分由jc应力解决。众所周知,该库的创建者是许多JVM、GC和Java并发特性的作者。

但即使是这个库也需要很好地了解Java内存模型规范,以便我们确切地知道我们正在测试什么。但我认为这项工作的重点是mirco基准测试。不是巨大的业务应用程序。

假设在“多线程”代码下意味着

  • 有状态和可变
  • AND被多个线程访问/修改并发

换句话说,我们正在谈论测试自定义有状态线程安全类/方法/单元-这应该是当今非常罕见的野兽。

因为这种野兽是罕见的,首先我们需要确保有所有有效的借口来写它。

步骤1。考虑在相同的同步上下文中修改状态。

今天很容易编写可组合的并发和异步代码,其中IO或其他慢操作卸载到后台,但共享状态在一个同步上下文中更新和查询。例如async/wait任务和Rx。NET等-它们都是可测试的设计,“真实”任务和调度程序可以被替换以使测试具有确定性(然而这超出了问题的范围)。

这听起来可能非常受限制,但这种方法效果出奇地好。可以用这种风格编写整个应用程序,而无需使任何状态成为线程安全的(我做的)。

步骤2。如果在单个同步上下文上操作共享状态是绝对不可能的。

确保轮子没有被重新发明/绝对没有可以适应这项工作的标准替代方案。代码应该非常有凝聚力并包含在一个单元中,例如,它很有可能是一些标准线程安全数据结构的特例,如哈希map或集合或其他什么。

注意:如果代码很大/跨越多个类并且需要多线程状态操作,那么很有可能设计不好,请重新考虑步骤1

步骤3。如果达到这一步,那么我们需要测试我们自己的自定义有状态线程安全类/方法/单元

老实说:我从来都不需要为这样的代码编写合适的测试。大多数时候我都在步骤1,有时在步骤2。上一次我不得不编写自定义线程安全代码是在很多年前,那是在我采用单元测试之前/可能无论如何我都不必用当前的知识编写它。

如果我真的必须测试这样的代码(最后是实际答案),那么我会尝试下面的几件事

  1. 非确定性压力测试.例如同时运行100个线程并检查最终结果是否一致。这对于多用户场景的更高级别/集成测试更典型,但也可以在单元级别使用。

  2. 暴露一些测试“挂钩”,其中测试可以注入一些代码来帮助创建一个线程必须在另一个线程之前执行操作的确定性场景。尽管它很丑陋,我想不出更好的东西。

  3. 延迟驱动的测试,使线程以特定的顺序运行和执行操作。严格来说,这样的测试也是不确定的(有可能系统冻结/停止世界GC收集,这可能会扭曲编排的延迟),它也很丑陋,但允许避免钩子。

有一篇关于这个主题的文章,在示例代码中使用Rust作为语言:

它并不完美,但我为我的C#测试编写了这个助手:

using System;using System.Collections.Generic;using System.Threading;using System.Threading.Tasks;
namespace Proto.Promises.Tests.Threading{public class ThreadHelper{public static readonly int multiThreadCount = Environment.ProcessorCount * 100;private static readonly int[] offsets = new int[] { 0, 10, 100, 1000 };
private readonly Stack<Task> _executingTasks = new Stack<Task>(multiThreadCount);private readonly Barrier _barrier = new Barrier(1);private int _currentParticipants = 0;private readonly TimeSpan _timeout;
public ThreadHelper() : this(TimeSpan.FromSeconds(10)) { } // 10 second timeout should be enough for most cases.
public ThreadHelper(TimeSpan timeout){_timeout = timeout;}
/// <summary>/// Execute the action multiple times in parallel threads./// </summary>public void ExecuteMultiActionParallel(Action action){for (int i = 0; i < multiThreadCount; ++i){AddParallelAction(action);}ExecutePendingParallelActions();}
/// <summary>/// Execute the action once in a separate thread./// </summary>public void ExecuteSingleAction(Action action){AddParallelAction(action);ExecutePendingParallelActions();}
/// <summary>/// Add an action to be run in parallel./// </summary>public void AddParallelAction(Action action){var taskSource = new TaskCompletionSource<bool>();lock (_executingTasks){++_currentParticipants;_barrier.AddParticipant();_executingTasks.Push(taskSource.Task);}new Thread(() =>{try{_barrier.SignalAndWait(); // Try to make actions run in lock-step to increase likelihood of breaking race conditions.action.Invoke();taskSource.SetResult(true);}catch (Exception e){taskSource.SetException(e);}}).Start();}
/// <summary>/// Runs the pending actions in parallel, attempting to run them in lock-step./// </summary>public void ExecutePendingParallelActions(){Task[] tasks;lock (_executingTasks){_barrier.SignalAndWait();_barrier.RemoveParticipants(_currentParticipants);_currentParticipants = 0;tasks = _executingTasks.ToArray();_executingTasks.Clear();}try{if (!Task.WaitAll(tasks, _timeout)){throw new TimeoutException($"Action(s) timed out after {_timeout}, there may be a deadlock.");}}catch (AggregateException e){// Only throw one exception instead of aggregate to try to avoid overloading the test error output.throw e.Flatten().InnerException;}}
/// <summary>/// Run each action in parallel multiple times with differing offsets for each run./// <para/>The number of runs is 4^actions.Length, so be careful if you don't want the test to run too long./// </summary>/// <param name="expandToProcessorCount">If true, copies each action on additional threads up to the processor count. This can help test more without increasing the time it takes to complete./// <para/>Example: 2 actions with 6 processors, runs each action 3 times in parallel.</param>/// <param name="setup">The action to run before each parallel run.</param>/// <param name="teardown">The action to run after each parallel run.</param>/// <param name="actions">The actions to run in parallel.</param>public void ExecuteParallelActionsWithOffsets(bool expandToProcessorCount, Action setup, Action teardown, params Action[] actions){setup += () => { };teardown += () => { };int actionCount = actions.Length;int expandCount = expandToProcessorCount ? Math.Max(Environment.ProcessorCount / actionCount, 1) : 1;foreach (var combo in GenerateCombinations(offsets, actionCount)){setup.Invoke();for (int k = 0; k < expandCount; ++k){for (int i = 0; i < actionCount; ++i){int offset = combo[i];Action action = actions[i];AddParallelAction(() =>{for (int j = offset; j > 0; --j) { } // Just spin in a loop for the offset.action.Invoke();});}}ExecutePendingParallelActions();teardown.Invoke();}}
// Input: [1, 2, 3], 3// Ouput: [//          [1, 1, 1],//          [2, 1, 1],//          [3, 1, 1],//          [1, 2, 1],//          [2, 2, 1],//          [3, 2, 1],//          [1, 3, 1],//          [2, 3, 1],//          [3, 3, 1],//          [1, 1, 2],//          [2, 1, 2],//          [3, 1, 2],//          [1, 2, 2],//          [2, 2, 2],//          [3, 2, 2],//          [1, 3, 2],//          [2, 3, 2],//          [3, 3, 2],//          [1, 1, 3],//          [2, 1, 3],//          [3, 1, 3],//          [1, 2, 3],//          [2, 2, 3],//          [3, 2, 3],//          [1, 3, 3],//          [2, 3, 3],//          [3, 3, 3]//        ]private static IEnumerable<int[]> GenerateCombinations(int[] options, int count){int[] indexTracker = new int[count];int[] combo = new int[count];for (int i = 0; i < count; ++i){combo[i] = options[0];}// Same algorithm as picking a combination lock.int rollovers = 0;while (rollovers < count){yield return combo; // No need to duplicate the array since we're just reading it.for (int i = 0; i < count; ++i){int index = ++indexTracker[i];if (index == options.Length){indexTracker[i] = 0;combo[i] = options[0];if (i == rollovers){++rollovers;}}else{combo[i] = options[index];break;}}}}}}

示例用法:

[Test]public void DeferredMayBeBeResolvedAndPromiseAwaitedConcurrently_void0(){Promise.Deferred deferred = default(Promise.Deferred);Promise promise = default(Promise);
int invokedCount = 0;
var threadHelper = new ThreadHelper();threadHelper.ExecuteParallelActionsWithOffsets(false,// Setup() =>{invokedCount = 0;deferred = Promise.NewDeferred();promise = deferred.Promise;},// Teardown() => Assert.AreEqual(1, invokedCount),// Parallel Actions() => deferred.Resolve(),() => promise.Then(() => { Interlocked.Increment(ref invokedCount); }).Forget());}

一种适用于一些(不是全部!)情况的简单测试模式是多次重复相同的测试。例如,假设您有一个方法:

def process(input):# Spawns several threads to do the job# ...return output

创建一组测试:

process(input1) -> expect to return output1process(input2) -> expect to return output2...

现在多次运行这些测试中的每一个。

如果process的实现包含一个微妙的bug(例如死锁、竞争条件等),出现的概率为0.1%,那么运行测试1000次,bug至少出现一次的概率为64%。运行测试10000次,概率>99%。

运行多个线程并不困难;这是小菜一碟。不幸的是,线程通常需要相互通信;这才是困难的。

最初发明的允许模块之间通信的机制是函数调用;当模块A想要与模块B通信时,它只是调用模块B中的函数。不幸的是,这对线程不起作用,因为当你调用一个函数时,该函数仍然在当前线程中运行。

为了克服这个问题,人们决定退回到一种更原始的通信机制:只需声明一个特定的变量,并让两个线程都可以访问该变量。换句话说,允许线程共享数据。共享数据是我们自然而然想到的第一件事,这似乎是一个不错的选择,因为它看起来非常简单。我的意思是,这有多难,对吧?可能会出什么问题?

种族条件。这就是可能,也将会出错的地方。

当人们意识到他们的软件由于竞争条件而遭受随机、不可重现的灾难性故障时,他们开始发明精细的机制,如锁、比较和交换,旨在防止此类事情发生。这些机制属于广泛的“同步”类别。不幸的是,同步有两个问题:

  1. 这是很难得到它的权利,所以它很容易出现错误。
  2. 它是完全不可测试的,因为您无法测试竞争条件。

精明的读者可能会注意到“非常容易出现错误”和“完全不可测试”是<强>致命组合

现在,在自动化软件测试的概念流行之前,我上面提到的机制已经被行业的很大一部分发明和采用;所以,没有人能看到这个问题有多致命;他们只是认为这是一个需要大师程序员的困难话题,每个人都可以接受。

如今,无论我们做什么,我们都把测试放在第一位。所以,如果某种机制是不可测试的,那么使用该机制就是不可能的,句号。因此,同步已经失宠了;很少有人还在实践它,而且每天都在变得越来越少。

没有同步的线程不能共享数据;然而,最初的需求不是共享数据;而是允许线程相互通信。除了共享数据之外,还有其他更优雅的线程间通信机制。

一种这样的机制是消息传递,也称为事件。

对于消息传递,整个软件系统中只有一个地方使用同步,那就是我们用于存储消息的并发阻塞队列集合类。(这个想法是我们应该能够至少正确处理那一小部分。)

消息传递的伟大之处在于它不受竞争条件的影响并且完全可测试。