减少 Haskell 程序中的垃圾收集暂停时间

我们正在开发一个接收和转发“消息”的程序,同时保留这些消息的临时历史记录,以便它可以告诉您的消息历史,如果需要的话。消息是通过数字识别的,通常大小在1 KB 左右,我们需要保存成千上万的这样的消息。

我们希望优化这个程序的延迟: 发送和接收消息之间的时间必须小于10毫秒。

该程序是在哈斯克尔编写的,并与 GHc 一起编译。然而,我们发现垃圾收集暂停对于我们的延迟需求来说太长了: 在我们的实际程序中超过100毫秒。

下面的程序是我们的应用程序的简化版本。它使用 Data.Map.Strict来存储消息。消息是由 Int标识的 ByteString。以递增的数字顺序插入1,000,000条消息,并且不断删除最老的消息,以将历史记录保持在最多200,000条消息。

module Main (main) where


import qualified Control.Exception as Exception
import qualified Control.Monad as Monad
import qualified Data.ByteString as ByteString
import qualified Data.Map.Strict as Map


data Msg = Msg !Int !ByteString.ByteString


type Chan = Map.Map Int ByteString.ByteString


message :: Int -> Msg
message n = Msg n (ByteString.replicate 1024 (fromIntegral n))


pushMsg :: Chan -> Msg -> IO Chan
pushMsg chan (Msg msgId msgContent) =
Exception.evaluate $
let
inserted = Map.insert msgId msgContent chan
in
if 200000 < Map.size inserted
then Map.deleteMin inserted
else inserted


main :: IO ()
main = Monad.foldM_ pushMsg Map.empty (map message [1..1000000])

我们编译并运行这个程序,使用:

$ ghc --version
The Glorious Glasgow Haskell Compilation System, version 7.10.3
$ ghc -O2 -optc-O3 Main.hs
$ ./Main +RTS -s
3,116,460,096 bytes allocated in the heap
385,101,600 bytes copied during GC
235,234,800 bytes maximum residency (14 sample(s))
124,137,808 bytes maximum slop
600 MB total memory in use (0 MB lost due to fragmentation)


Tot time (elapsed)  Avg pause  Max pause
Gen  0      6558 colls,     0 par    0.238s   0.280s     0.0000s    0.0012s
Gen  1        14 colls,     0 par    0.179s   0.250s     0.0179s    0.0515s


INIT    time    0.000s  (  0.000s elapsed)
MUT     time    0.652s  (  0.745s elapsed)
GC      time    0.417s  (  0.530s elapsed)
EXIT    time    0.010s  (  0.052s elapsed)
Total   time    1.079s  (  1.326s elapsed)


%GC     time      38.6%  (40.0% elapsed)


Alloc rate    4,780,213,353 bytes per MUT second


Productivity  61.4% of total user, 49.9% of total elapsed

这里的重要指标是0.0515秒或51毫秒的“最大停顿”。我们希望至少减少一个数量级。

实验表明,GC 暂停的长度由历史记录中的消息数量决定。这种关系大致是线性的,或者可能是超线性的。下表显示了此关系。(您可以在这里看到我们的基准测试这里有些图表)

msgs history length  max GC pause (ms)
===================  =================
12500                                3
25000                                6
50000                               13
100000                              30
200000                              56
400000                             104
800000                             199
1600000                            487
3200000                           1957
6400000                           5378

我们已经对其他几个变量进行了实验,以确定它们是否能够减少这种延迟,这些变量都不会产生很大的影响。这些不重要的变量包括: 优化(-O-O2) ; RTS GC 选项(-G-H-A-c) ,核心数量(-N) ,不同的数据结构(Data.Sequence) ,消息的大小,以及生成的短命垃圾的数量。最重要的决定因素是历史记录中的消息数量。

我们的工作原理是,停顿在消息数量上是线性的,因为每个 GC 周期必须遍历所有可访问的工作内存并复制它,这显然是线性操作。

问题:

  • 这个线性时间理论正确吗?GC 暂停的长度可以用这种简单的方式来表示吗,还是实际情况更复杂?
  • 如果 GC 暂停在工作内存中是线性的,有没有办法减少所涉及的常数因素?
  • 有没有增量 GC 的选项,或者类似的选项?我们只能看到研究论文。我们非常愿意以吞吐量换取更低的延迟。
  • 除了拆分成多个进程之外,还有什么方法可以为更小的 GC 周期“分区”内存吗?
8518 次浏览

你实际上做得很好,有一个51毫秒的暂停时间超过200Mb 的实时数据。我工作的系统有一个较大的最大暂停时间与一半的实时数据量。

您的假设是正确的,主要的 GC 暂停时间与实时数据的数量成正比,不幸的是,对于现有的 GHC 来说,没有办法绕过这个问题。我们过去曾试验过增量 GC,但那只是一个研究项目,还没有达到将其折叠成已发布的 GHC 所需的成熟度水平。

我们希望在未来能够帮助解决这个问题的一件事情是紧凑的区域: https://phabricator.haskell.org/D1264。这是一种手动内存管理,您在堆中压缩一个结构,GC 不必遍历它。它最适合长期存在的数据,但也许它足以用于您的设置中的单个消息。我们的目标是在 GHC 8.2.0中使用它。

如果你在一个分布式设置中,并且有一个负载平衡器,你可以使用一些技巧来避免暂停,你基本上可以确保负载平衡器不会发送请求给即将执行一个主要 GC 的机器,当然还要确保机器仍然完成 GC,即使它没有收到请求。

您发现了 GC 语言的局限性: 它们不适合硬核实时系统。

你有两个选择:

增加堆大小并使用2级缓存系统,将最旧的消息发送到磁盘,并将最新的消息保存在内存中,可以通过使用操作系统分页来实现。但这种解决方案的问题在于,根据所使用的辅助内存单元的读取能力,分页可能会非常昂贵。

用“ C”编写该解决方案并与 FFI 接口到 haskell。这样你就可以自己管理内存了。这将是最好的选择,因为你可以控制你自己需要的记忆。

我不得不同意其他人的观点——如果您有严格的实时约束,那么使用 GC 语言是不理想的。

但是,您可以考虑尝试使用其他可用的数据结构,而不仅仅是 Data.Map。

我用 Data. 序列重写了一下,得到了一些有希望的改进:

msgs history length  max GC pause (ms)
===================  =================
12500                              0.7
25000                              1.4
50000                              2.8
100000                             5.4
200000                            10.9
400000                            21.8
800000                            46
1600000                           87
3200000                          175
6400000                          350

尽管您正在优化延迟,但我注意到其他指标也有所改进。在200000的情况下,执行时间从1.5秒下降到0.2秒,总内存使用量从600 MB 下降到27 MB。

我应该注意到,我通过调整设计作弊了:

  • 我从 Msg上取下了 Int所以不是在两个地方。
  • 我没有使用从 IntByteString的 Map,而是使用了 ByteStringSequence,我认为对于整个 Sequence,可以使用一个 Int来代替每条消息一个 Int。假设消息不能被重新排序,您可以使用一个偏移量来将您想要的消息转换到它在队列中的位置。

(为了证明这一点,我附加了一个函数 getMsg。)

{-# LANGUAGE BangPatterns #-}


import qualified Control.Exception as Exception
import qualified Control.Monad as Monad
import qualified Data.ByteString as ByteString
import Data.Sequence as S


newtype Msg = Msg ByteString.ByteString


data Chan = Chan Int (Seq ByteString.ByteString)


message :: Int -> Msg
message n = Msg (ByteString.replicate 1024 (fromIntegral n))


maxSize :: Int
maxSize = 200000


pushMsg :: Chan -> Msg -> IO Chan
pushMsg (Chan !offset sq) (Msg msgContent) =
Exception.evaluate $
let newSize = 1 + S.length sq
newSq = sq |> msgContent
in
if newSize <= maxSize
then Chan offset newSq
else
case S.viewl newSq of
(_ :< newSq') -> Chan (offset+1) newSq'
S.EmptyL -> error "Can't happen"


getMsg :: Chan -> Int -> Maybe Msg
getMsg (Chan offset sq) i_ = getMsg' (i_ - offset)
where
getMsg' i
| i < 0            = Nothing
| i >= S.length sq = Nothing
| otherwise        = Just (Msg (S.index sq i))


main :: IO ()
main = Monad.foldM_ pushMsg (Chan 0 S.empty) (map message [1..5 * maxSize])

我已经尝试过使用 ringbuffer 方法使用 IOVector作为底层数据结构的代码片段。在我的系统上(GHC 7.10.3,相同的编译选项) ,这导致最大时间(OP 中提到的度量)减少了约22% 。

注意: 我在这里做了两个假设:

  1. 可变的数据结构很适合这个问题(我猜消息传递意味着 IO)
  2. 你的留言是连续的

使用一些额外的 Int参数和算术(比如将 messageId 重置回0或 minBound) ,就可以直接确定某条消息是否仍然在历史记录中,然后从环形缓冲区中的相应索引中检索该消息。

为了您的测试乐趣:

import qualified Control.Exception as Exception
import qualified Control.Monad as Monad
import qualified Data.ByteString as ByteString
import qualified Data.Map.Strict as Map


import qualified Data.Vector.Mutable as Vector


data Msg = Msg !Int !ByteString.ByteString


type Chan = Map.Map Int ByteString.ByteString


data Chan2 = Chan2
{ next          :: !Int
, maxId         :: !Int
, ringBuffer    :: !(Vector.IOVector ByteString.ByteString)
}


chanSize :: Int
chanSize = 200000


message :: Int -> Msg
message n = Msg n (ByteString.replicate 1024 (fromIntegral n))




newChan2 :: IO Chan2
newChan2 = Chan2 0 0 <$> Vector.unsafeNew chanSize


pushMsg2 :: Chan2 -> Msg -> IO Chan2
pushMsg2 (Chan2 ix _ store) (Msg msgId msgContent) =
let ix' = if ix == chanSize then 0 else ix + 1
in Vector.unsafeWrite store ix' msgContent >> return (Chan2 ix' msgId store)


pushMsg :: Chan -> Msg -> IO Chan
pushMsg chan (Msg msgId msgContent) =
Exception.evaluate $
let
inserted = Map.insert msgId msgContent chan
in
if chanSize < Map.size inserted
then Map.deleteMin inserted
else inserted


main, main1, main2 :: IO ()


main = main2


main1 = Monad.foldM_ pushMsg Map.empty (map message [1..1000000])


main2 = newChan2 >>= \c -> Monad.foldM_ pushMsg2 c (map message [1..1000000])

正如在其他答案中提到的,GHC 中的垃圾收集器遍历活动数据,这意味着存储在内存中的活动数据越长,GC 暂停时间就越长。

GHC 8.2

为了部分地克服这个问题,在 GHC-8.2中引入了一个称为 紧凑区域的特性。它既是 GHC 运行时系统的一个特性,也是一个公开 很方便接口的库。紧凑区域特性允许将数据放入内存中的单独位置,而 GC 在垃圾收集阶段不会遍历它。因此,如果希望在内存中保留较大的结构,可以考虑使用紧凑区域。然而,紧凑的区域本身 没有 迷你垃圾收集器内,它的工作原理更好的 仅附加数据结构,而不是像 HashMap的东西,你也想删除的东西。虽然你可以克服这个问题。详情请参阅以下博客文章:

GHC 8.10

此外,自 GHC-8.10以来,实现了一种新的 低延迟增量垃圾收集器算法。它是另一种 GC 算法,默认情况下不启用,但是如果需要,您可以选择使用它。因此,您可以将默认 GC 切换到较新的 GC,以自动获取 紧凑区域提供的特性,而不需要手动包装和展开。然而,新的 GC 并不是万灵药,也不能自动解决所有问题,而且它有自己的权衡。关于新 GC 的基准,请参考以下 GitHub 存储库: