LMAX's disruptor模式是如何工作的?

我正在努力理解粉碎机模式。我看了InfoQ的视频,并试着读了他们的论文。我知道有一个环缓冲区涉及,它被初始化为一个非常大的数组,以利用缓存局部性,消除分配的新内存。

听起来好像有一个或多个原子整数来记录位置。每个“事件”似乎都有一个唯一的id,它在环中的位置是通过计算它的模量相对于环的大小,等等。

不幸的是,我对它的工作原理没有直观的感觉。我做了很多交易应用程序,研究了角色模型,看了SEDA等。

在他们的演讲中,他们提到这个模式基本上就是路由器的工作方式;然而,我也没有找到任何关于路由器如何工作的很好的描述。

有没有更好的解释?

76917 次浏览

谷歌代码项目在环形缓冲区的实现上做参考一篇技术论文,然而对于想要学习它如何工作的人来说,它有点枯燥,学术和艰难。然而,有一些博客文章已经开始以一种更易读的方式解释内部结构。有一个环形缓冲器说明是中断器模式的核心,一个消费者障碍的描述(与从中断器读取相关的部分)和一些可用的处理多个生产者的信息

对中断器最简单的描述是:它是一种在线程之间以最有效的方式发送消息的方法。它可以用作队列的替代品,但它还与SEDA和actor共享许多特性。

与队列相比:

Disruptor提供了将消息传递到另一个线程的能力,并在需要时将其唤醒(类似于BlockingQueue)。然而,有3个明显的区别。

  1. 中断器的用户通过扩展Entry类并提供一个工厂来进行预分配来定义消息如何存储。这允许内存重用(复制),或者Entry可以包含对另一个对象的引用。
  2. 将消息放入Disruptor是一个两阶段的过程,首先在环形缓冲区中声明一个插槽,它为用户提供了可以用适当的数据填充的条目。然后必须提交条目,这种两阶段的方法是必要的,以允许上面提到的灵活使用内存。正是提交使消息对使用者线程可见。
  3. 使用者负责跟踪已从循环缓冲区中消费的消息。将此责任从循环缓冲区本身移开有助于减少写争用的数量,因为每个线程都维护自己的计数器。

与演员相比

Actor模型比大多数其他编程模型更接近于Disruptor,特别是如果您使用所提供的BatchConsumer/BatchHandler类时。这些类隐藏了维护所使用的序列号的所有复杂性,并在重要事件发生时提供了一组简单的回调。然而,有一些细微的区别。

  1. 破坏者使用1线程- 1消费者模型,而参与者使用N:M模型,即你可以有尽可能多的参与者,他们将分布在固定数量的线程上(通常每个核心1个)。
  2. BatchHandler接口提供了一个额外的(非常重要的)回调onEndOfBatch()。这允许缓慢的消费者,例如那些做I/O的人将事件批处理在一起以提高吞吐量。在其他Actor框架中也可以进行批处理,但是由于几乎所有其他框架都没有在批处理结束时提供回调,因此您需要使用超时来确定批处理的结束时间,这导致了较差的延迟。

与SEDA相比

LMAX构建了破坏者模式来取代基于SEDA的方法。

  1. 与SEDA相比,它提供的主要改进是并行工作的能力。为此,Disruptor支持将相同的消息(以相同的顺序)多播给多个消费者。这避免了管道中分叉阶段的需要。
  2. 我们还允许消费者等待其他消费者的结果,而不必在它们之间放置另一个排队阶段。使用者可以简单地观察它所依赖的使用者的序列号。这避免了管道中连接阶段的需要。

与记忆障碍相比

另一种思考方式是,它是一个结构化的,有序的记忆屏障。其中生产者屏障形成写屏障,消费者屏障是读屏障。

首先,我们想了解它提供的编程模型。

有一个或多个作者。有一个或多个阅读器。有一行条目,完全按照从旧到新的顺序排列(如图从左到右)。作者可以在右端添加新条目。每个阅读器从左到右依次读取条目。显然,读者无法读懂过去的作家。

没有删除条目的概念。我使用“读者”而不是“消费者”来避免条目被消费的形象。然而,我们知道,最后一个读卡器左边的条目将变得无用。

一般来说,读者可以同时阅读和独立阅读。但是,我们可以在阅读器之间声明依赖关系。读卡器的依赖关系可以是任意的无循环图。如果读者B依赖于读者A,读者B就无法读过读者A。

产生阅读器依赖是因为阅读器A可以注释条目,而阅读器B依赖于该注释。例如,A对一个条目进行了一些计算,并将结果存储在条目中的a字段中。A然后继续,现在B可以读取该条目,并存储a A的值。如果reader C不依赖于A, C不应该试图读取a

这确实是一个有趣的编程模型。撇开性能不谈,模型本身就可以使许多应用受益。

当然,LMAX的主要目标是性能。它使用预先分配的条目环。这个环足够大,但它是有界限的,这样系统的负载就不会超过设计容量。如果圈满了,作者会等到最慢的读者前进并腾出空间。

条目对象是预先分配的,并且永远存在,以减少垃圾收集成本。我们不插入新的条目对象或删除旧的条目对象,相反,写入器请求一个预先存在的条目,填充它的字段,并通知读取器。这种明显的两相作用实际上只是原子作用

setNewEntry(EntryPopulator);


interface EntryPopulator{ void populate(Entry existingEntry); }

预分配条目还意味着相邻条目(很可能)位于相邻的内存单元中,并且由于读取器顺序读取条目,因此这对于利用CPU缓存非常重要。

并努力避免锁,CAS,甚至内存障碍(例如,如果只有一个写入器,则使用非易失性序列变量)

对于读取器的开发人员:不同注释的读取器应该写入不同的字段,以避免写入争用。(实际上它们应该写入不同的缓存线。)注释读者不应该接触其他非依赖读者可能阅读的任何内容。这就是为什么我说这些读取器是注释条目,而不是修改条目。

Martin Fowler写了一篇关于LMAX和颠覆者模式LMAX架构的文章,可能会进一步阐明它。

事实上,出于好奇心,我花时间研究了实际的来源,它背后的想法很简单。在写这篇文章时,最新的版本是3.2.1。

有一个缓冲区存储预先分配的事件,该事件将保存供使用者读取的数据。

缓冲区由其长度的标志数组(整数数组)支持,该数组描述了缓冲区插槽的可用性(详细信息请参阅进一步)。访问数组的方式类似于java的#AtomicIntegerArray,因此为了便于解释,您不妨假设它就是一个数组。

可以有任意数量的生产者。当生产者想要写入缓冲区时,会生成一个长数字(就像调用AtomicLong#getAndIncrement一样,破坏者实际上使用自己的实现,但它以相同的方式工作)。让我们称这个生成的长为producerCallId。以类似的方式,当消费者结束从缓冲区读取插槽时生成consumerCallId。访问最近的consumerCallId。

(如果有很多消费者,则选择id最低的呼叫。)

然后比较这些id,如果两者之间的差异小于缓冲区端,则允许生产者写入。

(如果producerCallId大于最近的consumerCallId + bufferSize,这意味着缓冲区已满,生产者被迫进行总线等待,直到有可用的位置。)

然后生产者根据他的callId在缓冲区中分配插槽(这是producerCallId模bufferSize,但由于bufferSize总是2的幂(在缓冲区创建时强制限制),实际使用的操作是producerCallId和amp;(bufferSize - 1))。然后,它可以自由地修改该插槽中的事件。

(实际的算法稍微复杂一些,为了优化,需要将最近的consumerId缓存在一个单独的原子引用中。)

当事件被修改时,更改将被“发布”。发布时,标志数组中的相应槽位将被更新的标志填充。标志值是循环的编号(producerCallId除以bufferSize(因为bufferSize是2的幂,所以实际操作是右移)。

以类似的方式,可以有任意数量的消费者。每次消费者想要访问缓冲区时,都会生成一个consumerCallId(根据消费者是如何添加到中断器的,id生成中使用的原子可能是共享的,也可能是单独的)。然后将这个consumerCallId与最近的producentCallId进行比较,如果它在两者中较低,则允许读取器继续前进。

(类似地,如果producerCallId是even to consumerCallId,这意味着缓冲区是空的,消费者被迫等待。等待的方式由创建中断器期间的WaitStrategy定义。)

对于单个消费者(具有自己的id生成器的消费者),检查的下一件事是批量消费的能力。缓冲区中的槽按照从对应于consumerCallId的槽(索引的确定方式与生产者相同)到对应于最近的producerCallId的槽的顺序进行检查。

通过将写入标记数组中的标记值与为consumerCallId生成的标记值进行比较,在循环中检查它们。如果标志匹配,则意味着填充插槽的生产者已经提交了他们的更改。如果不是,则循环被打破,并返回提交的最高的changeId。从ConsumerCallId到changeId中接收的槽可以批量使用。

如果一组消费者一起读取(使用共享id生成器的消费者),每个消费者只接受一个callId,并且只检查和返回该callId的插槽。

这篇文章:

中断模式是一个由循环备份的批处理队列 数组(即环形缓冲区)填充预分配的传输 对象,该对象使用内存屏障同步生产者和

记忆障碍很难解释,在我看来,Trisha的博客已经做了最好的尝试:http://mechanitis.blogspot.com/2011/08/dissecting-disruptor-why-its-so-fast.html

但是如果你不想深入研究底层的细节,你可以只知道Java中的内存壁垒是通过volatile关键字或java.util.concurrent.AtomicLong实现的。干扰器模式序列是__abc2,并通过内存屏障而不是锁在生产者和消费者之间来回通信。

我发现通过代码更容易理解一个概念,所以下面的代码是来自CoralQueue的一个简单的helloworld,这是由我所属的CoralBlocks完成的一个干扰模式实现。在下面的代码中,你可以看到干扰器模式如何实现批处理,以及环形缓冲区(即循环数组)如何允许两个线程之间无垃圾通信:

package com.coralblocks.coralqueue.sample.queue;


import com.coralblocks.coralqueue.AtomicQueue;
import com.coralblocks.coralqueue.Queue;
import com.coralblocks.coralqueue.util.MutableLong;


public class Sample {


public static void main(String[] args) throws InterruptedException {


final Queue<MutableLong> queue = new AtomicQueue<MutableLong>(1024, MutableLong.class);


Thread consumer = new Thread() {


@Override
public void run() {


boolean running = true;


while(running) {
long avail;
while((avail = queue.availableToPoll()) == 0); // busy spin
for(int i = 0; i < avail; i++) {
MutableLong ml = queue.poll();
if (ml.get() == -1) {
running = false;
} else {
System.out.println(ml.get());
}
}
queue.donePolling();
}
}


};


consumer.start();


MutableLong ml;


for(int i = 0; i < 10; i++) {
while((ml = queue.nextToDispatch()) == null); // busy spin
ml.set(System.nanoTime());
queue.flush();
}


// send a message to stop consumer...
while((ml = queue.nextToDispatch()) == null); // busy spin
ml.set(-1);
queue.flush();


consumer.join(); // wait for the consumer thread to die...
}
}