循环无锁缓冲区

我正在设计一个系统,它连接到一个或多个数据源流,并对数据做一些分析,而不是根据结果触发事件。在典型的多线程生产者/消费者设置中,我将有多个生产者线程将数据放入队列,多个消费者线程读取数据,消费者只对最新的数据点和 n 个点感兴趣。如果缓慢的消费者无法跟上,生产者线程将必须阻塞,当然,如果没有未处理的更新,消费者线程将阻塞。使用带有读/写锁的典型并发队列将会很好地工作,但是数据传入的速率可能会很快,所以我想减少我的锁开销,特别是为生产者的写锁。我想我需要的是一个环形的无锁缓冲区。

现在有两个问题:

  1. 循环无锁缓冲区是答案吗?

  2. 如果是的话,在我开始我自己的工作之前,你知道有什么公开的实现可以满足我的需要吗?

实现循环无锁缓冲区的任何指针都是受欢迎的。

顺便说一句,在 Linux 上用 C + + 做这件事。

附加信息:

响应时间对我的系统至关重要。理想情况下,使用者线程希望尽快看到任何更新,因为额外的1毫秒延迟可能会使系统变得毫无价值,或者价值大大降低。

我倾向的设计理念是一个半无锁循环缓冲区,生产者线程尽可能快地将数据放入缓冲区,让我们调用缓冲区的头 A,除非缓冲区已满,当 A 满足缓冲区 Z 的末尾时,不会阻塞。消费者线程将每个持有两个指向循环缓冲区的指针,P 和 PN,其中 P 是线程的本地缓冲区头,PN是 P 之后的第 n 个项目。每个消费者线程将推进它的 P 和 PN,一旦它完成处理当前的 P 和缓冲区指针 Z 的末尾是推进最慢的 PN。当 P 赶上 A 时,这意味着不再有新的更新需要处理,使用者旋转并忙碌地等待 A 再次前进。如果使用者线程运行太长时间,它可以进入休眠状态,等待条件变量,但我不介意使用者占用 CPU 周期等待更新,因为这不会增加我的延迟(我将有更多的 CPU 核心比线程)。假设你有一个环形轨道,生产者在一群消费者面前运行,关键是调整系统,使生产者通常比消费者快几步,大多数操作可以使用无锁技术完成。我知道获得正确的实现细节并不容易... ... 好吧,非常困难,这就是为什么我想从别人的错误中学习,然后再犯一些我自己的错误。

67246 次浏览

关于这个 在 DDJ 上有一系列相当不错的文章。作为一个迹象,这个东西可以是多么困难,这是一个纠正 早期的一篇文章的错误。在你卷自己的卷之前,确保你了解错误) ;

缓冲区为空或满时,生产者或消费者阻塞的要求表明,您应该使用常规的锁定数据结构,使用信号量或条件变量使生产者和消费者阻塞,直到数据可用。无锁代码通常不会在这种情况下阻塞——它旋转或放弃无法执行的操作,而不是使用操作系统阻塞。(如果您可以等到另一个线程生成或使用数据,那么为什么等待另一个线程完成数据结构的更新会更糟糕呢?)

在(x86/x64) Linux 上,如果没有争用,使用互斥对象的线程内同步相当便宜。集中精力减少生产者和消费者持有锁的时间。考虑到您已经说过您只关心最后 N 个记录的数据点,我认为循环缓冲区可以很好地完成这项工作。然而,我真的不明白这如何符合阻塞需求和消费者实际使用(删除)他们读取的数据的想法。(您是否希望使用者只查看最后 N 个数据点,而不删除它们?你希望生产商不在乎消费者是否跟不上,而只是覆盖旧的数据吗?)

另外,正如 Zan Lynx 所说,当有大量数据进入时,您可以将数据聚合/缓冲到更大的块中。您可以缓冲固定数量的点,或在一定时间内收到的所有数据。这意味着将有更少的同步操作。不过,它确实引入了延迟,但是如果您不使用实时 Linux,那么在某种程度上您将不得不处理这个问题。

你想要的艺术术语是 无锁队列无锁队列。这是罗斯 · 本西纳的 出色的一套笔记与代码和文件的链接。我最信任的作品是 Maurice Herlihy(对美国人来说,他的名字读起来像“ Morris”)。

减少争用的一种有用技术是将项散列到多个队列中,并让每个使用者专门处理一个“主题”。

对于消费者感兴趣的大多数最近的项目——您不想锁定整个队列并在其上迭代以找到要覆盖的项目——只需以 N 个元组(即所有 N 个最近的项目)的形式发布项目即可。如果生产者在超时的情况下阻塞整个队列(当消费者无法跟上时) ,更新其本地元组缓存(这样就不会给数据源带来压力) ,那么实现的好处就是。

我同意 这篇文章,建议不要使用无锁数据结构。最近一篇关于无锁 FIFO 队列的论文是 这个,搜索同一作者的更多论文; 还有一篇关于 Chalmers 的博士论文是关于无锁数据结构的(我丢失了链接)。但是,您没有说明您的元素有多大——无锁数据结构仅对字大小的项目有效,因此如果元素大于机器单词(32或64位) ,则必须动态分配元素。如果你动态地分配元素,你就把瓶颈转移到了内存分配器上(假设,因为你还没有分析你的程序,你基本上是在做过早的优化) ,所以你需要一个无锁的内存分配器,比如 溪流,然后把它和你的应用程序集成起来。

萨特的队列是次优的,他知道这一点。多核编程的艺术是一个很好的参考,但是在内存模型上不要相信 Java。Ross 的链接将得不到明确的答案,因为它们的库存在诸如此类的问题中。

进行无锁编程是自找麻烦,除非你想在解决问题之前花大量时间在一些明显没有过度设计的事情上(根据对它的描述判断,这是一种在缓存一致性中“寻求完美”的常见疯狂行为)。这需要数年时间,并导致不首先解决问题,然后再优化,这是一种常见的疾病。

在过去的几年里,我对无锁数据结构做了一个特别的研究。我已经阅读了该领域的大部分论文(只有大约四十篇左右——尽管只有大约十篇或十五篇是真正有用的: -)

AFAIK,一种无锁循环缓冲区尚未被发明。问题在于如何处理读者超过作者或者作者超过读者的复杂情况。

如果您还没有花费至少六个月的时间来研究无锁数据结构,那么不要尝试自己编写一个。您可能会出现错误,并且在新平台上部署后代码失败之前,您可能看不出错误的存在。

不过我相信你的要求是有解决办法的。

您应该将无锁队列与无锁自由列表配对。

Free-list 将提供预分配,从而避免了对无锁分配器的要求(在财政上是昂贵的) ; 当 free-list 为空时,通过立即从队列中删除一个元素并使用该元素,可以复制循环缓冲区的行为。

(当然,在一个基于锁的循环缓冲区中,一旦获得了锁,获得一个元素是非常快的——基本上只是一个取消引用的指针——但是你不会在任何无锁算法中获得这一点; 他们经常不得不走出他们的方式来做事情; 失败的自由列表弹出后接着一个队列的开销与任何无锁算法将需要做的工作量相当)。

Michael 和 Scott 在1996年开发了一个非常好的无锁队列。下面的链接将给你足够的细节,以跟踪他们的论文的 PDF; 迈克尔和斯科特,先进先出

无锁免费列表(lock-free-list)是最简单的无锁算法,事实上,我还没有看过关于它的论文。

我会这么做:

  • 将队列映射到数组中
  • 使用下一个读索引和下一个写索引保持状态
  • 保持一个空的全位向量

插入包括在下一次写操作中使用一个 CAS 并进行增量和滚动。一旦您有了一个槽,添加您的值,然后设置与之匹配的空/满位。

移除操作需要在测试底流之前检查位,但除此之外,与写操作相同,但使用读索引并清除空/满位。

我警告你,

  1. 我不是这方面的专家
  2. 原子 ASM 操作在我使用它们的时候似乎非常慢,所以如果您最终使用了多个原子 ASM 操作,那么使用嵌入在 insert/delete 函数中的锁可能会更快。该理论认为,一个单一的原子操作获取锁,然后(非常)几个非原子 ASM 操作可能比同样的事情做了几个原子操作。但是要使这个工作将需要手动或自动内联,所以它是所有 ASM 的一个短块。

仅仅为了完整性: 在 OtlContainer中有经过良好测试的无锁循环缓冲区,但它是用 Delphi 编写的(TOmniBaseBoundedQueue 是循环缓冲区,TOmniBaseBoundedStack 是有界堆栈)。在同一个单元中还有一个无界队列(TOmniBaseQueue)。无界队列在 动态无锁队列-正确执行中描述。有界队列(循环缓冲区)的初始实现在 终于有一个无锁队列了!中描述,但是此后代码被更新。

看看 干扰器(怎么用) ,它是一个多线程可以订阅的环形缓冲区:

我不是硬件内存模型和自由锁定数据结构的专家,我倾向于避免在我的项目中使用这些,我选择传统的锁定数据结构。

然而,我最近注意到一个视频: 基于环形缓冲区的无锁 SPSC 队列

这是基于交易系统 LMAX 干扰器使用的名为 LMAX 批发商的开源高性能 Java 库

基于上面的演示,您将头部和尾部指针设置为原子的,并自动检查头部从后面捕捉尾部的情况,反之亦然。

下面你可以看到一个非常基本的 C + + 11实现:

// USING SEQUENTIAL MEMORY
#include<thread>
#include<atomic>
#include <cinttypes>
using namespace std;


#define RING_BUFFER_SIZE 1024  // power of 2 for efficient %
class lockless_ring_buffer_spsc
{
public :


lockless_ring_buffer_spsc()
{
write.store(0);
read.store(0);
}


bool try_push(int64_t val)
{
const auto current_tail = write.load();
const auto next_tail = increment(current_tail);
if (next_tail != read.load())
{
buffer[current_tail] = val;
write.store(next_tail);
return true;
}


return false;
}


void push(int64_t val)
{
while( ! try_push(val) );
// TODO: exponential backoff / sleep
}


bool try_pop(int64_t* pval)
{
auto currentHead = read.load();


if (currentHead == write.load())
{
return false;
}


*pval = buffer[currentHead];
read.store(increment(currentHead));


return true;
}


int64_t pop()
{
int64_t ret;
while( ! try_pop(&ret) );
// TODO: exponential backoff / sleep
return ret;
}


private :
std::atomic<int64_t> write;
std::atomic<int64_t> read;
static const int64_t size = RING_BUFFER_SIZE;
int64_t buffer[RING_BUFFER_SIZE];


int64_t increment(int n)
{
return (n + 1) % size;
}
};


int main (int argc, char** argv)
{
lockless_ring_buffer_spsc queue;


std::thread write_thread( [&] () {
for(int i = 0; i<1000000; i++)
{
queue.push(i);
}
}  // End of lambda expression
);
std::thread read_thread( [&] () {
for(int i = 0; i<1000000; i++)
{
queue.pop();
}
}  // End of lambda expression
);
write_thread.join();
read_thread.join();


return 0;
}

在升级库中的实现是值得考虑的。它易于使用,性能相当高。我编写了一个测试,并在一台四核 i7笔记本电脑(8个线程)上运行它,每秒钟可以获得 ~ 4M 的队列/取队列操作。到目前为止还没有提到的另一个实现是 http://moodycamel.com/blog/2014/detailed-design-of-a-lock-free-queue处的 MPMC 队列。我在同一台笔记本电脑上用这个实现对32个生产商和32个消费者进行了一些简单的测试。正如所宣传的那样,它比升压无锁队列更快。

正如大多数其他答案所说,无锁编程很难。大多数实现将很难检测到需要大量测试和调试才能修复的角落情况。这些问题通常是通过在代码中仔细放置内存屏障来解决的。你还可以在许多学术文章中找到正确性的证据。我更喜欢用蛮力工具测试这些实现。您计划在生产中使用的任何无锁算法都应该使用像 http://research.microsoft.com/en-us/um/people/lamport/tla/tla.html这样的工具进行正确性检查。

这是一个老线程,但是因为它还没有被提及,所以——在 JUCE C + + 框架中有一个无锁的、循环的、1生产者-> 1消费者、 FIFO 可用。

Https://www.juce.com/doc/classabstractfifo#details

虽然这是一个老问题,没有人提到 DPDK的无锁环缓冲区。它是支持多个生产者和多个消费者的高吞吐量环缓冲区。它还提供了单消费者和单生产者模式,环缓冲区是无等待 SPSC 模式。它是用 C 编写的,支持多种体系结构。

此外,它还支持批量和突发模式,在这种模式下,项目可以批量进入/离开队列。该设计允许多个使用者或多个生产者通过移动一个原子指针来保留空间,从而同时写入队列。

你可以试试 排队

它使用简单,它是圆形设计锁免费

int *ret;


lfqueue_t results;


lfqueue_init(&results);


/** Wrap This scope in multithread testing **/
int_data = (int*) malloc(sizeof(int));
assert(int_data != NULL);
*int_data = i++;
/*Enqueue*/
while (lfqueue_enq(&results, int_data) != 1) ;


/*Dequeue*/
while ( (ret = lfqueue_deq(&results)) == NULL);


// printf("%d\n", *(int*) ret );
free(ret);
/** End **/


lfqueue_clear(&results);

有些情况下,您不需要锁定来防止竞态条件,特别是当您只有一个生产者和消费者时。

考虑第三个最不发达国家中的这一段:

在仔细实现时,循环缓冲区不需要在没有多个生产者或使用者的情况下进行锁定。生成器是唯一允许修改写入索引及其指向的数组位置的线程。只要写入器在更新写入索引之前将新值存储到缓冲区中,读取器就会始终看到一致的视图。而读取器则是唯一可以访问读取索引及其指向的值的线程。为了确保两个指针不会相互溢出,生产者和使用者可以在没有竞态条件的情况下并发访问缓冲区。

不久前,我发现 一个不错的解决方案对这个问题。我相信它是迄今为止发现的最小的。

该存储库有一个示例,说明如何使用它创建 N 个线程(读取器和写入器) ,然后使其共享单个席位。

我在测试示例上做了一些基准测试,得到了以下结果(以百万操作/秒为单位) :

按缓冲区大小

throughput

根据线程的数量

enter image description here

注意线程的数量不会改变吞吐量。

我认为这是这个问题的最终解决方案。它的工作,是令人难以置信的快速和简单。即使有数百个线程和一个单一位置的队列。它可以用作线程之间的管道,在队列内部分配空间。

你能打破它吗?

如果将缓冲区永远不会满作为一个先决条件,那么可以考虑使用这种无锁算法:

capacity must be a power of 2
buffer = new T[capacity] ~ on different cache line
mask = capacity - 1
write_index ~ on different cache line
read_index ~ on different cache line


enqueue:
write_i = write_index.fetch_add(1) & mask
buffer[write_i] = element ~ release store


dequeue:
read_i = read_index.fetch_add(1) & mask
element
while ((element = buffer[read_i] ~ acquire load) == NULL) {
spin loop
}
buffer[read_i] = NULL ~ relaxed store
return element