滚动方差算法

我试图找到一个有效的,数值稳定的算法来计算滚动方差(例如,一个20周期滚动窗口的方差)。我知道 Welford 算法可以有效地计算数字流的运行方差(它只需要一次通过) ,但是不确定这是否可以适用于滚动窗口。我也希望解决方案,以避免准确性问题在 这篇文章的顶部讨论约翰 D 库克。任何语言的解决方案都可以。

44018 次浏览

我期待着在这一点上被证明是错误的,但我不认为这可以做到“迅速”也就是说,很大一部分的计算是保持跟踪电动汽车的窗口,可以很容易地做到这一点。

我留下一个问题: 你确定 need是一个窗口函数吗?除非您使用的是非常大的窗口,否则最好使用一个众所周知的预定义算法。

我猜是为了记录你的20个样本,求和(X ^ 2从1。. 20) ,和(X 从1。. 20)然后在每次迭代中连续重新计算两个和不够有效吗?重新计算新的方差是可能的,不需要每次对所有样本进行加和、平方等操作。

例如:

Sum(X^2 from 2..21) = Sum(X^2 from 1..20) - X_1^2 + X_21^2
Sum(X from 2..21) = Sum(X from 1..20) - X_1 + X_21

下面是一个分而治之的方法,它具有 O(log k)时间更新,其中 k是示例的数量。它应该是相对稳定的,这与成对求和和 FFT 是稳定的原因相同,但是它有点复杂,常数也不大。

假设我们有一个长度为 m且具有平均 E(A)和方差 V(A)的序列 A,以及一个长度为 n且具有平均 E(B)和方差 V(B)的序列 B。让 C成为 AB的串联。是的

p = m / (m + n)
q = n / (m + n)
E(C) = p * E(A) + q * E(B)
V(C) = p * (V(A) + (E(A) + E(C)) * (E(A) - E(C))) + q * (V(B) + (E(B) + E(C)) * (E(B) - E(C)))

现在,将元素填充到一个红黑树中,其中每个节点都用根植于该节点的子树的均值和方差进行装饰。在右边插入; 在左边删除。(因为我们只访问末端,所以分期树 也许吧是分期的,但我猜分期对您的应用程序来说是个问题。)如果在编译时知道 k,那么可以采用 FFTW 样式展开内部循环。

Here's another O(log k) solution: find squares the original sequence, then sum pairs, then quadruples, etc.. (You'll need a bit of a buffer to be able to find all of these efficiently.) Then add up those values that you need to to get your answer. For example:

|||||||||||||||||||||||||  // Squares
| | | | | | | | | | | | |  // Sum of squares for pairs
|   |   |   |   |   |   |  // Pairs of pairs
|       |       |       |  // (etc.)
|               |
^------------------^    // Want these 20, which you can get with
|       |          // one...
|   |       |   |      // two, three...
| |    // four...
||                      // five stored values.

现在使用标准的 E (x ^ 2)-E (x) ^ 2公式,就完成了。 (如果您需要小型数字集的良好稳定性,则不需要; 这是假设只有滚动错误的累积才会导致问题。)

也就是说,如今在大多数体系结构中,将20个平方数加起来就是 非常。如果你做的更多——比如说,几百个——一个更有效的方法显然会更好。但我不确定暴力不是解决问题的方法。

对于仅仅20个值,调整暴露 给你的方法是微不足道的(但我没有说快)。

您可以简单地选择20个这样的 RunningStat类的数组。

The first 20 elements of the stream are somewhat special, however once this is done, it's much more simple:

  • 当一个新元素到达时,清除当前的 RunningStat实例,将该元素添加到所有20个实例中,并增加“ counter”(模数20) ,它标识新的“ full”RunningStat实例
  • 在任何给定的时刻,您都可以查询当前的“完整”实例以获得运行变量。

You will obviously note that this approach isn't really scalable...

You can also note that there is some redudancy in the numbers we keep (if you go with the RunningStat full class). An obvious improvement would be to keep the 20 lasts Mk and Sk directly.

我想不出一个更好的公式来使用这个特殊的算法,恐怕它的递归公式有点束缚我们的手脚。

我一直在处理同样的问题。

Mean 迭代计算起来很简单,但是需要在循环缓冲区中保存值的完整历史记录。

next_index = (index + 1) % window_size;    // oldest x value is at next_index, wrapping if necessary.


new_mean = mean + (x_new - xs[next_index])/window_size;

I have adapted Welford's algorithm and it works for all the values that I have tested with.

varSum = var_sum + (x_new - mean) * (x_new - new_mean) - (xs[next_index] - mean) * (xs[next_index] - new_mean);


xs[next_index] = x_new;
index = next_index;

要获得当前方差,只需要将 varSum 除以窗口大小: variance = varSum / window_size;

我也碰到过这个问题。有一些伟大的职位,在计算运行累积方差,如约翰库克的 准确计算运行方差职位和职位从数字探索,计算样本和总体方差、协方差和相关系数的 Python 代码。只是找不到任何适合滚动窗口。

Subluminal Messages 的 运行标准偏差文章对于滚动窗口公式的实现至关重要。Jim 采用的是数值平方差的幂和,而 Welford 的方法是使用平均数的平方差的和。配方如下:

PSA today = PSA(yesterday) + (((x today * x today) - x yesterday)) / n

  • X = 时间序列中的值
  • N = 到目前为止分析的值的数目。

But, to convert the Power Sum Average formula to a windowed variety you need tweak the formula to the following:

今天 PSA = 昨天 PSA + (((x 今天 * x 今天)-(x 昨天 * x 昨天)/n

  • X = 时间序列中的值
  • n = number of values you've analyzed so far.

You'll also need the Rolling Simple Moving Average formula:

今天的 SMA = 昨天的 SMA + ((x today-x today-n)/n

  • X = 时间序列中的值
  • 用于滚动窗口的句点。

从这里你可以计算出滚动人口方差:

人口 Var today = (PSA today * n-n * SMA today * SMA today)/n

或滚动样本方差:

样本 Var today = (PSA today * n-n * SMA today * SMA today)/(n-1)

几年前,我曾在一篇名为 跑步方差的博客文章中讨论过这个话题以及 Python 示例代码。

希望这个能帮上忙。

请注意: 我提供了所有博客文章和数学公式的链接 in Latex (images) for this answer. But, due to my low reputation (< 10) ; 我只有2个超链接和绝对没有图片。对不起 关于这个。希望这不会影响到内容。

实际上,Welfords 算法可以很容易地适用于 AFAICT 计算 加重了方差。 通过将权重设置为 -1,您应该能够有效地抵消元素。虽然我还没有检查数学是否允许负权重,但第一眼看上去它应该!

我用 ELKI做了一个小实验:

void testSlidingWindowVariance() {
MeanVariance mv = new MeanVariance(); // ELKI implementation of weighted Welford!
MeanVariance mc = new MeanVariance(); // Control.


Random r = new Random();
double[] data = new double[1000];
for (int i = 0; i < data.length; i++) {
data[i] = r.nextDouble();
}


// Pre-roll:
for (int i = 0; i < 10; i++) {
mv.put(data[i]);
}
// Compare to window approach
for (int i = 10; i < data.length; i++) {
mv.put(data[i-10], -1.); // Remove
mv.put(data[i]);
mc.reset(); // Reset statistics
for (int j = i - 9; j <= i; j++) {
mc.put(data[j]);
}
assertEquals("Variance does not agree.", mv.getSampleVariance(),
mc.getSampleVariance(), 1e-14);
}
}

与精确的双程算法相比,我得到了大约14位的精度; 这大约是双精度算法所能达到的精度。请注意,由于额外的除法,Welford 是的需要一些计算成本-它大约需要两倍于精确的两步算法的时间。如果您的窗口大小很小,那么实际上重新计算平均值,然后在第二次传递方差 每个时间,可能会更加合理。

我把这个实验作为单元测试添加到 ELKI 中,你可以在这里看到完整的源代码: < a href = “ http://ELKI.dbs.ifi.lmu.de/web/ELKI/trunk/test/de/lmu/ifi/dbs/ELKI/data/TestSlidingVariance.java”rel = “ nofollow noReferrer”> http://ELKI.dbs.ifi.lmu.de/browser/ELKI/trunk/test/de/lmu/ifi/dbs/ELKI/math/testslidingvariance.java 它也比较了精确的两次方差。

然而,在倾斜的数据集上,行为可能会有所不同。这个数据集显然是均匀分布的; 但是我也尝试了一个排序的数组,它起作用了。

更新: 我们发表了一篇论文,详细介绍了(协变量)方差的不同加权方案:

舒伯特,埃里克和迈克尔 · 格茨。第30届国际科学与统计数据库管理会议论文集。ACM 2018年。(荣获 SSDBM 最佳论文奖。)

本文还讨论了如何使用加权来并行化计算,例如,使用 AVX、 GPU 或集群。

如果你更喜欢代码而不是文字(主要基于 DanS 的文章) : Http://calcandstuff.blogspot.se/2014/02/rolling-variance-calculation.html

public IEnumerable RollingSampleVariance(IEnumerable data, int sampleSize)
{
double mean = 0;
double accVar = 0;


int n = 0;
var queue = new Queue(sampleSize);


foreach(var observation in data)
{
queue.Enqueue(observation);
if (n < sampleSize)
{
// Calculating first variance
n++;
double delta = observation - mean;
mean += delta / n;
accVar += delta * (observation - mean);
}
else
{
// Adjusting variance
double then = queue.Dequeue();
double prevMean = mean;
mean += (observation - then) / sampleSize;
accVar += (observation - prevMean) * (observation - mean) - (then - prevMean) * (then - mean);
}


if (n == sampleSize)
yield return accVar / (sampleSize - 1);
}
}

我知道这个问题已经很老了,但是如果有人感兴趣的话,可以按照 python 代码进行操作。它的灵感来自于 JohndCook的博客文章,@Joachim’s,@DanS’s code 和@Jaime comments。下面的代码仍然为小数据窗口大小提供了小的不精确性。好好享受吧。

from __future__ import division
import collections
import math




class RunningStats:
def __init__(self, WIN_SIZE=20):
self.n = 0
self.mean = 0
self.run_var = 0
self.WIN_SIZE = WIN_SIZE


self.windows = collections.deque(maxlen=WIN_SIZE)


def clear(self):
self.n = 0
self.windows.clear()


def push(self, x):


self.windows.append(x)


if self.n <= self.WIN_SIZE:
# Calculating first variance
self.n += 1
delta = x - self.mean
self.mean += delta / self.n
self.run_var += delta * (x - self.mean)
else:
# Adjusting variance
x_removed = self.windows.popleft()
old_m = self.mean
self.mean += (x - x_removed) / self.WIN_SIZE
self.run_var += (x + x_removed - old_m - self.mean) * (x - x_removed)


def get_mean(self):
return self.mean if self.n else 0.0


def get_var(self):
return self.run_var / (self.WIN_SIZE - 1) if self.n > 1 else 0.0


def get_std(self):
return math.sqrt(self.get_var())


def get_all(self):
return list(self.windows)


def __str__(self):
return "Current window values: {}".format(list(self.windows))

这只是对 DanS 提供的出色答案的一个小小的补充。下面的公式用于从窗口中移除最古老的样本并更新均值和方差。例如,如果您想在输入数据流的右边缘附近采用较小的窗口(即只删除最老的窗口样本而不添加新的样本) ,那么这很有用。

window_size -= 1; % decrease window size by 1 sample
new_mean = prev_mean + (prev_mean - x_old) / window_size
varSum = varSum - (prev_mean - x_old) * (new_mean - x_old)

在这里,x _ old 是您希望删除的窗口中最古老的示例。

For those coming here now, 给你's a reference containing the full derivation, with proofs, of DanS's answer and Jaime's related comment.

DanS 和 Jaime 在简明 C 中的回答。

typedef struct {
size_t n, i;
float *samples, mean, var;
} rolling_var_t;


void rolling_var_init(rolling_var_t *c, size_t window_size) {
size_t ss;


memset(c, 0, sizeof(*c));
c->n = window_size;
c->samples = (float *) malloc(ss = sizeof(float)*window_size);
memset(c->samples, 0, ss);
}


void rolling_var_add(rolling_var_t *c, float x) {
float nmean;    // new mean
float xold;     // oldest x
float dx;


c->i = (c->i + 1) % c->n;
xold = c->samples[c->i];
dx = x - xold;
nmean = c->mean + dx / (float) c->n; // walk mean
//c->var += ((x - c->mean)*(x - nmean) - (xold - c->mean) * (xold - nmean)) / (float) c->n;
c->var += ((x + xold - c->mean - nmean) * dx) / (float) c->n;


c->mean = nmean;
c->samples[c->i] = x;
}