实时时间序列数据中的峰值信号检测


更新:性能最好的算法。


这个问题探讨了在实时时间序列数据中检测突然峰值的稳健算法。

考虑以下示例数据:

Plot of data

这个数据的例子是Matlab格式的(但这个问题不是关于语言,而是关于算法):

p = [1 1 1.1 1 0.9 1 1 1.1 1 0.9 1 1.1 1 1 0.9 1 1 1.1 1 1 1 1 1.1 0.9 1 1.1 1 1 0.9, ...
1 1.1 1 1 1.1 1 0.8 0.9 1 1.2 0.9 1 1 1.1 1.2 1 1.5 1 3 2 5 3 2 1 1 1 0.9 1 1, ...
3 2.6 4 3 3.2 2 1 1 0.8 4 4 2 2.5 1 1 1];

你可以清楚地看到有三个大峰和一些小峰。这个数据集是问题所涉及的时间序列数据集类的一个特定示例。这类数据集有两个一般特征:

  1. 有一种具有一般平均值的基本噪声
  2. 有很大的'山峰'或'更高的数据点'明显偏离噪声。

让我们假设以下情况:

  • 峰的宽度不能事先确定
  • 峰的高度明显偏离其他值
  • 算法实时更新(因此每个新数据点都会更新)

对于这种情况,需要构造一个触发信号的边值。但是,边界值不能是静态的,必须通过算法实时确定。


有针对这种情况的特定算法吗?最著名的算法是什么?


健壮的算法或有用的见解都受到高度赞赏。(可以用任何语言回答:这是关于算法的)

292740 次浏览

一种方法是根据以下观察来检测峰:

  • 时间t是峰值,如果(y(t) > y(t-1)) &&y(t) > y(t+1)

通过等待上升趋势结束来避免误报。它并不完全是“实时”的,因为它会比峰值差一个dt。灵敏度可以通过要求比较的裕度来控制。在噪声检测和时延检测之间存在一种折衷。 您可以通过添加更多参数来丰富模型:

  • 峰值为(y(t) - y(t-dt) > m) &&(y(t) - y(t+dt) > m)

其中dt是控制灵敏度和延时的参数

这是你用上述算法得到的结果: enter image description here

下面是在python中重现图的代码:

import numpy as np
import matplotlib.pyplot as plt
input = np.array([ 1. ,  1. ,  1. ,  1. ,  1. ,  1. ,  1. ,  1.1,  1. ,  0.8,  0.9,
1. ,  1.2,  0.9,  1. ,  1. ,  1.1,  1.2,  1. ,  1.5,  1. ,  3. ,
2. ,  5. ,  3. ,  2. ,  1. ,  1. ,  1. ,  0.9,  1. ,  1. ,  3. ,
2.6,  4. ,  3. ,  3.2,  2. ,  1. ,  1. ,  1. ,  1. ,  1. ])
signal = (input > np.roll(input,1)) & (input > np.roll(input,-1))
plt.plot(input)
plt.plot(signal.nonzero()[0], input[signal], 'ro')
plt.show()
通过设置m = 0.5,你可以得到一个只有一个假阳性的更清晰的信号: enter image description here

如果边界值或其他标准取决于未来值,那么唯一的解决方案(没有时间机器,或其他关于未来值的知识)是推迟任何决定,直到有足够的未来值。如果你想要一个高于均值的水平,例如,20点,那么你必须等到你至少有19点才能做出任何峰值决策,否则下一个新点可能会完全超过你19点之前的阈值。

补充:如果峰高的统计分布可能是重尾分布,而不是均匀分布或高斯分布,那么你可能需要等待,直到你看到几千个峰,然后隐藏的帕累托分布就不太可能产生比你目前看到的或当前图中的任何峰值大许多倍的峰值。除非你事先知道下一个点不可能是1e20,否则它可能会出现,在重新缩放你的图的Y维后,它会在那一点之前是平坦的。

在Palshikar(2009)中发现了另一个算法:

Palshikar, G.(2009)。时间序列中峰值检测的简单算法。在Proc. 1st Int。高级数据分析,商业分析和智能(卷122)。

论文可以下载从这里

算法是这样的:

algorithm peak1 // one peak detection algorithms that uses peak function S1


input T = x1, x2, …, xN, N // input time-series of N points
input k // window size around the peak
input h // typically 1 <= h <= 3
output O // set of peaks detected in T


begin
O = empty set // initially empty


for (i = 1; i < n; i++) do
// compute peak function value for each of the N points in T
a[i] = S1(k,i,xi,T);
end for


Compute the mean m' and standard deviation s' of all positive values in array a;


for (i = 1; i < n; i++) do // remove local peaks which are “small” in global context
if (a[i] > 0 && (a[i] – m') >( h * s')) then O = O + {xi};
end if
end for


Order peaks in O in terms of increasing index in T


// retain only one peak out of any set of peaks within distance k of each other


for every adjacent pair of peaks xi and xj in O do
if |j – i| <= k then remove the smaller value of {xi, xj} from O
end if
end for
end

优势

  • 本文给出了5种不同的峰值检测算法
  • 算法在原始时间序列数据上工作(不需要平滑)

缺点

  • 很难预先确定kh
  • 峰值不能是平的(像我的测试数据中的第三个峰值)

例子:

enter image description here

鲁棒峰值检测算法(使用z-scores)

我提出了一种算法,适用于这些类型的数据集。它基于分散的原理:如果一个新的数据点距离某个移动平均值有给定的x个标准偏差,则算法发出信号(也称为z分数)。该算法非常健壮,因为它构造了单独的移动平均值和偏差,这样信号就不会破坏阈值。因此,不管之前的信号有多少,未来的信号都能以大致相同的精度被识别出来。该算法有3个输入:lag = the lag of the moving windowthreshold = the z-score at which the algorithm signalsinfluence = the influence (between 0 and 1) of new signals on the mean and standard deviation。例如,lag为5将使用最后5个观测值来平滑数据。如果一个数据点距离移动平均值有3.5个标准偏差,则threshold为3.5将发出信号。而0.5的influence给出了正常数据点影响的信号threshold = the z-score at which the algorithm signals0。类似地,influence为0会完全忽略信号以重新计算新的阈值。因此,0的影响是最健壮的选项(但假设threshold = the z-score at which the algorithm signals1);将影响选项设为1是最不稳健的。因此,对于非平稳数据,影响选项应该放在0到1之间。

其工作原理如下:

< >强伪代码< / >强

# Let y be a vector of timeseries data of at least length lag+2
# Let mean() be a function that calculates the mean
# Let std() be a function that calculates the standard deviaton
# Let absolute() be the absolute value function


# Settings (these are examples: choose what is best for your data!)
set lag to 5;          # average and std. are based on past 5 observations
set threshold to 3.5;  # signal when data point is 3.5 std. away from average
set influence to 0.5;  # between 0 (no influence) and 1 (full influence)


# Initialize variables
set signals to vector 0,...,0 of length of y;   # Initialize signal results
set filteredY to y(1),...,y(lag)                # Initialize filtered series
set avgFilter to null;                          # Initialize average filter
set stdFilter to null;                          # Initialize std. filter
set avgFilter(lag) to mean(y(1),...,y(lag));    # Initialize first value average
set stdFilter(lag) to std(y(1),...,y(lag));     # Initialize first value std.


for i=lag+1,...,t do
if absolute(y(i) - avgFilter(i-1)) > threshold*stdFilter(i-1) then
if y(i) > avgFilter(i-1) then
set signals(i) to +1;                     # Positive signal
else
set signals(i) to -1;                     # Negative signal
end
set filteredY(i) to influence*y(i) + (1-influence)*filteredY(i-1);
else
set signals(i) to 0;                        # No signal
set filteredY(i) to y(i);
end
set avgFilter(i) to mean(filteredY(i-lag+1),...,filteredY(i));
set stdFilter(i) to std(filteredY(i-lag+1),...,filteredY(i));
end

下面是为数据选择良好参数的经验法则。


演示

鲁棒阈值算法演示

这个演示的Matlab代码可以找到here。要使用演示,只需运行它并单击上面的图表自己创建一个时间序列。算法在绘制lag个观测值后开始工作。


结果

对于最初的问题,当使用以下设置时,该算法将给出以下输出:lag = 30, threshold = 5, influence = 0:

阈值算法示例


在不同编程语言中的实现:


配置算法的经验法则

< >强lag < / >强:滞后参数决定了你的数据将被平滑的程度,以及算法对数据长期平均变化的自适应程度。你的数据静止的越多,你就应该包含越多的滞后(这应该提高算法的鲁棒性)。如果你的数据包含随时间变化的趋势,你应该考虑你希望算法多快适应这些趋势。例如,如果你把lag放在10,在算法的阈值调整到长期平均值的任何系统变化之前,它需要10个“周期”。因此,根据数据的趋势行为和您希望算法的自适应程度选择lag参数。

< >强influence < / >强:该参数确定信号对算法检测阈值的影响。如果设为0,则信号对阈值没有影响,因此根据不受过去信号影响的平均值和标准差计算的阈值检测未来信号。如果置为0.5,信号具有正常数据点的一半影响。另一种思考方法是,如果你把影响设为0,你就隐含地假设了平稳性(即无论有多少信号,你总是期望时间序列在长期内回到相同的平均值)。如果不是这样,则应将影响参数设置在0到1之间,这取决于信号能够系统地影响数据的时变趋势的程度。例如,如果信号导致时间序列的长期平均值结构破坏,则影响参数应该设置高(接近1),以便阈值可以快速对结构断裂做出反应。

< >强threshold < / >强:阈值参数是从移动平均值的标准偏差数,在此之上,算法将新数据点分类为信号。例如,如果一个新的数据点高于移动平均值4.0个标准偏差,并且阈值参数设置为3.5,算法将识别该数据点为信号。该参数应该根据您期望的信号数量来设置。例如,如果您的数据是正态分布的,3.5的阈值(或:z-score)对应的信号概率为0.00047(来自这个表),这意味着您期望每2128个数据点(1/0.00047)有一个信号。因此,阈值直接影响算法的灵敏度,从而也决定了算法信号的频率。检查您自己的数据,并选择一个合理的阈值,使算法在需要时发出信号(这里可能需要进行一些试错,以获得适合您的目的的良好阈值)。


警告:上面的代码每次运行时都会遍历所有的数据点。在实现这段代码时,请确保将信号的计算拆分为一个单独的函数(没有循环)。然后当一个新的数据点到达时,更新filteredYavgFilterstdFilter一次。不要每次有新的数据点时都重新计算所有数据的信号(就像上面的例子一样),这在实时应用程序中是非常低效和缓慢的。

其他修改算法的方法(为了潜在的改进)有:

  1. 使用中位数而不是平均值
  2. 使用稳健尺度测量,例如中位数绝对偏差(MAD),而不是标准偏差
  3. 使用信号裕度,这样信号就不会频繁切换
  4. 更改影响参数的工作方式
  5. 区别对待向上下来信号(不对称处理)
  6. 为mean和std创建一个单独的influence参数(就像这个快速翻译)

(已知)学术引用此StackOverflow的答案:

其他工作使用的算法从这个答案

算法的其他应用从这个答案

链接到其他峰值检测算法


如何引用该算法:

Brakel, J.P.G. van(2014)。使用z分数的鲁棒峰值检测算法。堆栈溢出。可用于:https://stackoverflow.com/questions/22583391/peak-signal-detection-in-realtime-timeseries-data/22640362#22640362(版本:2020-11-08)。

<子> < >强助理 @misc{brakel2014,作者= {Brakel, J.P.G van},标题={使用z-scores的鲁棒峰值检测算法},url = {https://stackoverflow.com/questions/22583391/peak-signal-detection-in-realtime-timeseries-data/22640362#22640362},语言= {en},年份= {2014},urldate ={2022-04-12},期刊= {Stack Overflow}, howpublished = {https://stackoverflow.com/questions/22583391/peak-signal-detection-in-realtime-timeseries-data/22640362#22640362}}


如果你在某个地方使用这个功能,请使用上面的参考资料来感谢我。如果你对算法有任何问题,请在下面的评论中发表,或者在LinkedIn上联系我。


这个问题看起来类似于我在混合/嵌入式系统课程中遇到的问题,但这与检测传感器输入有噪声时的故障有关。我们使用卡尔曼滤波器来估计/预测系统的隐藏状态,然后使用确定故障发生可能性的统计分析。我们研究的是线性系统,但是存在非线性变量。我记得这种方法具有令人惊讶的适应性,但它需要一个系统动态模型。

在信号处理中,峰值检测通常采用小波变换。基本上就是对时间序列数据进行离散小波变换。返回的细节系数中的过零将对应于时间序列信号中的峰值。你会在不同的细节系数水平上检测到不同的峰值振幅,这给了你多层次的分辨率。

下面是在Golang中实现的Smoothed z-score算法(上图)。它假设[]int16 (PCM 16位样本)的一个切片。你可以找到一个这里的要点

/*
Settings (the ones below are examples: choose what is best for your data)
set lag to 5;          # lag 5 for the smoothing functions
set threshold to 3.5;  # 3.5 standard deviations for signal
set influence to 0.5;  # between 0 and 1, where 1 is normal influence, 0.5 is half
*/


// ZScore on 16bit WAV samples
func ZScore(samples []int16, lag int, threshold float64, influence float64) (signals []int16) {
//lag := 20
//threshold := 3.5
//influence := 0.5


signals = make([]int16, len(samples))
filteredY := make([]int16, len(samples))
for i, sample := range samples[0:lag] {
filteredY[i] = sample
}
avgFilter := make([]int16, len(samples))
stdFilter := make([]int16, len(samples))


avgFilter[lag] = Average(samples[0:lag])
stdFilter[lag] = Std(samples[0:lag])


for i := lag + 1; i < len(samples); i++ {


f := float64(samples[i])


if float64(Abs(samples[i]-avgFilter[i-1])) > threshold*float64(stdFilter[i-1]) {
if samples[i] > avgFilter[i-1] {
signals[i] = 1
} else {
signals[i] = -1
}
filteredY[i] = int16(influence*f + (1-influence)*float64(filteredY[i-1]))
avgFilter[i] = Average(filteredY[(i - lag):i])
stdFilter[i] = Std(filteredY[(i - lag):i])
} else {
signals[i] = 0
filteredY[i] = samples[i]
avgFilter[i] = Average(filteredY[(i - lag):i])
stdFilter[i] = Std(filteredY[(i - lag):i])
}
}


return
}


// Average a chunk of values
func Average(chunk []int16) (avg int16) {
var sum int64
for _, sample := range chunk {
if sample < 0 {
sample *= -1
}
sum += int64(sample)
}
return int16(sum / int64(len(chunk)))
}
与其将极大值与平均值进行比较,还可以将极大值与相邻的最小值进行比较,其中最小值仅定义在噪声阈值之上。 如果局部最大值是>的3倍(或其他置信因子)相邻的最小值,那么这个最大值就是一个峰值。 移动窗口越宽,峰值的确定越准确。 上面使用了以窗口中间为中心的计算, 顺便说一下,而不是在窗口结束时计算(== lag).

注意,最大值必须被视为信号之前的增加

下面是平滑z-score算法的Python / numpy实现(参见以上回答)。你可以找到这里的要点

#!/usr/bin/env python
# Implementation of algorithm from https://stackoverflow.com/a/22640362/6029703
import numpy as np
import pylab


def thresholding_algo(y, lag, threshold, influence):
signals = np.zeros(len(y))
filteredY = np.array(y)
avgFilter = [0]*len(y)
stdFilter = [0]*len(y)
avgFilter[lag - 1] = np.mean(y[0:lag])
stdFilter[lag - 1] = np.std(y[0:lag])
for i in range(lag, len(y)):
if abs(y[i] - avgFilter[i-1]) > threshold * stdFilter [i-1]:
if y[i] > avgFilter[i-1]:
signals[i] = 1
else:
signals[i] = -1


filteredY[i] = influence * y[i] + (1 - influence) * filteredY[i-1]
avgFilter[i] = np.mean(filteredY[(i-lag+1):i+1])
stdFilter[i] = np.std(filteredY[(i-lag+1):i+1])
else:
signals[i] = 0
filteredY[i] = y[i]
avgFilter[i] = np.mean(filteredY[(i-lag+1):i+1])
stdFilter[i] = np.std(filteredY[(i-lag+1):i+1])


return dict(signals = np.asarray(signals),
avgFilter = np.asarray(avgFilter),
stdFilter = np.asarray(stdFilter))

下面是在相同数据集上的测试,它产生与R/Matlab的原始答案相同的图

# Data
y = np.array([1,1,1.1,1,0.9,1,1,1.1,1,0.9,1,1.1,1,1,0.9,1,1,1.1,1,1,1,1,1.1,0.9,1,1.1,1,1,0.9,
1,1.1,1,1,1.1,1,0.8,0.9,1,1.2,0.9,1,1,1.1,1.2,1,1.5,1,3,2,5,3,2,1,1,1,0.9,1,1,3,
2.6,4,3,3.2,2,1,1,0.8,4,4,2,2.5,1,1,1])


# Settings: lag = 30, threshold = 5, influence = 0
lag = 30
threshold = 5
influence = 0


# Run algo with settings from above
result = thresholding_algo(y, lag=lag, threshold=threshold, influence=influence)


# Plot result
pylab.subplot(211)
pylab.plot(np.arange(1, len(y)+1), y)


pylab.plot(np.arange(1, len(y)+1),
result["avgFilter"], color="cyan", lw=2)


pylab.plot(np.arange(1, len(y)+1),
result["avgFilter"] + threshold * result["stdFilter"], color="green", lw=2)


pylab.plot(np.arange(1, len(y)+1),
result["avgFilter"] - threshold * result["stdFilter"], color="green", lw=2)


pylab.subplot(212)
pylab.step(np.arange(1, len(y)+1), result["signals"], color="red", lw=2)
pylab.ylim(-1.5, 1.5)
pylab.show()

下面是平滑z-score算法(见上面的答案)的Groovy (Java)实现。

/**
* "Smoothed zero-score alogrithm" shamelessly copied from https://stackoverflow.com/a/22640362/6029703
*  Uses a rolling mean and a rolling deviation (separate) to identify peaks in a vector
*
* @param y - The input vector to analyze
* @param lag - The lag of the moving window (i.e. how big the window is)
* @param threshold - The z-score at which the algorithm signals (i.e. how many standard deviations away from the moving mean a peak (or signal) is)
* @param influence - The influence (between 0 and 1) of new signals on the mean and standard deviation (how much a peak (or signal) should affect other values near it)
* @return - The calculated averages (avgFilter) and deviations (stdFilter), and the signals (signals)
*/


public HashMap<String, List<Object>> thresholdingAlgo(List<Double> y, Long lag, Double threshold, Double influence) {
//init stats instance
SummaryStatistics stats = new SummaryStatistics()


//the results (peaks, 1 or -1) of our algorithm
List<Integer> signals = new ArrayList<Integer>(Collections.nCopies(y.size(), 0))
//filter out the signals (peaks) from our original list (using influence arg)
List<Double> filteredY = new ArrayList<Double>(y)
//the current average of the rolling window
List<Double> avgFilter = new ArrayList<Double>(Collections.nCopies(y.size(), 0.0d))
//the current standard deviation of the rolling window
List<Double> stdFilter = new ArrayList<Double>(Collections.nCopies(y.size(), 0.0d))
//init avgFilter and stdFilter
(0..lag-1).each { stats.addValue(y[it as int]) }
avgFilter[lag - 1 as int] = stats.getMean()
stdFilter[lag - 1 as int] = Math.sqrt(stats.getPopulationVariance()) //getStandardDeviation() uses sample variance (not what we want)
stats.clear()
//loop input starting at end of rolling window
(lag..y.size()-1).each { i ->
//if the distance between the current value and average is enough standard deviations (threshold) away
if (Math.abs((y[i as int] - avgFilter[i - 1 as int]) as Double) > threshold * stdFilter[i - 1 as int]) {
//this is a signal (i.e. peak), determine if it is a positive or negative signal
signals[i as int] = (y[i as int] > avgFilter[i - 1 as int]) ? 1 : -1
//filter this signal out using influence
filteredY[i as int] = (influence * y[i as int]) + ((1-influence) * filteredY[i - 1 as int])
} else {
//ensure this signal remains a zero
signals[i as int] = 0
//ensure this value is not filtered
filteredY[i as int] = y[i as int]
}
//update rolling average and deviation
(i - lag..i-1).each { stats.addValue(filteredY[it as int] as Double) }
avgFilter[i as int] = stats.getMean()
stdFilter[i as int] = Math.sqrt(stats.getPopulationVariance()) //getStandardDeviation() uses sample variance (not what we want)
stats.clear()
}


return [
signals  : signals,
avgFilter: avgFilter,
stdFilter: stdFilter
]
}

下面是在同一数据集上的测试,产生与以上Python / numpy实现相同的结果。

    // Data
def y = [1d, 1d, 1.1d, 1d, 0.9d, 1d, 1d, 1.1d, 1d, 0.9d, 1d, 1.1d, 1d, 1d, 0.9d, 1d, 1d, 1.1d, 1d, 1d,
1d, 1d, 1.1d, 0.9d, 1d, 1.1d, 1d, 1d, 0.9d, 1d, 1.1d, 1d, 1d, 1.1d, 1d, 0.8d, 0.9d, 1d, 1.2d, 0.9d, 1d,
1d, 1.1d, 1.2d, 1d, 1.5d, 1d, 3d, 2d, 5d, 3d, 2d, 1d, 1d, 1d, 0.9d, 1d,
1d, 3d, 2.6d, 4d, 3d, 3.2d, 2d, 1d, 1d, 0.8d, 4d, 4d, 2d, 2.5d, 1d, 1d, 1d]


// Settings
def lag = 30
def threshold = 5
def influence = 0




def thresholdingResults = thresholdingAlgo((List<Double>) y, (Long) lag, (Double) threshold, (Double) influence)


println y.size()
println thresholdingResults.signals.size()
println thresholdingResults.signals


thresholdingResults.signals.eachWithIndex { x, idx ->
if (x) {
println y[idx]
}
}
在计算拓扑学中,持久同调的思想导致一个有效的 -快如排序数字-解决方案。它不仅能检测峰,还能以自然的方式量化峰的“重要性”,让你选择对你有意义的峰

< >强算法总结。 在一维设置(时间序列,实值信号)中,算法可以简单地用下图描述:

最持久的峰值

把函数图(或它的子层次集)想象成一个景观,并考虑从无穷级(或图中的1.8级)开始递减的水位。当水位下降时,在局部极大值时,岛屿就会出现。在局部极小时,这些岛屿合并在一起。这个想法的一个细节是,后来出现的岛屿被合并到更古老的岛屿中。岛屿的“持久性”是它的诞生时间减去它的死亡时间。蓝色条的长度描述了持久性,这就是上面提到的峰值的“意义”。

< >强效率。 在对函数值进行排序之后,找到一个在线性时间内运行的实现并不难——实际上它是一个单一的、简单的循环。所以这个实现在实践中应该是快速的,也很容易实现

< >强引用。 一篇关于整个故事的文章和对持久同调(计算代数拓扑中的一个领域)动机的引用可以在这里找到: https://www.sthu.org/blog/13-perstopology-peakdetection/index.html < / p >

下面是平滑z-score算法从这个答案的c++实现

std::vector<int> smoothedZScore(std::vector<float> input)
{
//lag 5 for the smoothing functions
int lag = 5;
//3.5 standard deviations for signal
float threshold = 3.5;
//between 0 and 1, where 1 is normal influence, 0.5 is half
float influence = .5;


if (input.size() <= lag + 2)
{
std::vector<int> emptyVec;
return emptyVec;
}


//Initialise variables
std::vector<int> signals(input.size(), 0.0);
std::vector<float> filteredY(input.size(), 0.0);
std::vector<float> avgFilter(input.size(), 0.0);
std::vector<float> stdFilter(input.size(), 0.0);
std::vector<float> subVecStart(input.begin(), input.begin() + lag);
avgFilter[lag] = mean(subVecStart);
stdFilter[lag] = stdDev(subVecStart);


for (size_t i = lag + 1; i < input.size(); i++)
{
if (std::abs(input[i] - avgFilter[i - 1]) > threshold * stdFilter[i - 1])
{
if (input[i] > avgFilter[i - 1])
{
signals[i] = 1; //# Positive signal
}
else
{
signals[i] = -1; //# Negative signal
}
//Make influence lower
filteredY[i] = influence* input[i] + (1 - influence) * filteredY[i - 1];
}
else
{
signals[i] = 0; //# No signal
filteredY[i] = input[i];
}
//Adjust the filters
std::vector<float> subVec(filteredY.begin() + i - lag, filteredY.begin() + i);
avgFilter[i] = mean(subVec);
stdFilter[i] = stdDev(subVec);
}
return signals;
}

c++实现

#include <iostream>
#include <vector>
#include <algorithm>
#include <unordered_map>
#include <cmath>
#include <iterator>
#include <numeric>


using namespace std;


typedef long double ld;
typedef unsigned int uint;
typedef std::vector<ld>::iterator vec_iter_ld;


/**
* Overriding the ostream operator for pretty printing vectors.
*/
template<typename T>
std::ostream &operator<<(std::ostream &os, std::vector<T> vec) {
os << "[";
if (vec.size() != 0) {
std::copy(vec.begin(), vec.end() - 1, std::ostream_iterator<T>(os, " "));
os << vec.back();
}
os << "]";
return os;
}


/**
* This class calculates mean and standard deviation of a subvector.
* This is basically stats computation of a subvector of a window size qual to "lag".
*/
class VectorStats {
public:
/**
* Constructor for VectorStats class.
*
* @param start - This is the iterator position of the start of the window,
* @param end   - This is the iterator position of the end of the window,
*/
VectorStats(vec_iter_ld start, vec_iter_ld end) {
this->start = start;
this->end = end;
this->compute();
}


/**
* This method calculates the mean and standard deviation using STL function.
* This is the Two-Pass implementation of the Mean & Variance calculation.
*/
void compute() {
ld sum = std::accumulate(start, end, 0.0);
uint slice_size = std::distance(start, end);
ld mean = sum / slice_size;
std::vector<ld> diff(slice_size);
std::transform(start, end, diff.begin(), [mean](ld x) { return x - mean; });
ld sq_sum = std::inner_product(diff.begin(), diff.end(), diff.begin(), 0.0);
ld std_dev = std::sqrt(sq_sum / slice_size);


this->m1 = mean;
this->m2 = std_dev;
}


ld mean() {
return m1;
}


ld standard_deviation() {
return m2;
}


private:
vec_iter_ld start;
vec_iter_ld end;
ld m1;
ld m2;
};


/**
* This is the implementation of the Smoothed Z-Score Algorithm.
* This is direction translation of https://stackoverflow.com/a/22640362/1461896.
*
* @param input - input signal
* @param lag - the lag of the moving window
* @param threshold - the z-score at which the algorithm signals
* @param influence - the influence (between 0 and 1) of new signals on the mean and standard deviation
* @return a hashmap containing the filtered signal and corresponding mean and standard deviation.
*/
unordered_map<string, vector<ld>> z_score_thresholding(vector<ld> input, int lag, ld threshold, ld influence) {
unordered_map<string, vector<ld>> output;


uint n = (uint) input.size();
vector<ld> signals(input.size());
vector<ld> filtered_input(input.begin(), input.end());
vector<ld> filtered_mean(input.size());
vector<ld> filtered_stddev(input.size());


VectorStats lag_subvector_stats(input.begin(), input.begin() + lag);
filtered_mean[lag - 1] = lag_subvector_stats.mean();
filtered_stddev[lag - 1] = lag_subvector_stats.standard_deviation();


for (int i = lag; i < n; i++) {
if (abs(input[i] - filtered_mean[i - 1]) > threshold * filtered_stddev[i - 1]) {
signals[i] = (input[i] > filtered_mean[i - 1]) ? 1.0 : -1.0;
filtered_input[i] = influence * input[i] + (1 - influence) * filtered_input[i - 1];
} else {
signals[i] = 0.0;
filtered_input[i] = input[i];
}
VectorStats lag_subvector_stats(filtered_input.begin() + (i - lag), filtered_input.begin() + i);
filtered_mean[i] = lag_subvector_stats.mean();
filtered_stddev[i] = lag_subvector_stats.standard_deviation();
}


output["signals"] = signals;
output["filtered_mean"] = filtered_mean;
output["filtered_stddev"] = filtered_stddev;


return output;
};


int main() {
vector<ld> input = {1.0, 1.0, 1.1, 1.0, 0.9, 1.0, 1.0, 1.1, 1.0, 0.9, 1.0, 1.1, 1.0, 1.0, 0.9, 1.0, 1.0, 1.1, 1.0,
1.0, 1.0, 1.0, 1.1, 0.9, 1.0, 1.1, 1.0, 1.0, 0.9, 1.0, 1.1, 1.0, 1.0, 1.1, 1.0, 0.8, 0.9, 1.0,
1.2, 0.9, 1.0, 1.0, 1.1, 1.2, 1.0, 1.5, 1.0, 3.0, 2.0, 5.0, 3.0, 2.0, 1.0, 1.0, 1.0, 0.9, 1.0,
1.0, 3.0, 2.6, 4.0, 3.0, 3.2, 2.0, 1.0, 1.0, 0.8, 4.0, 4.0, 2.0, 2.5, 1.0, 1.0, 1.0};


int lag = 30;
ld threshold = 5.0;
ld influence = 0.0;
unordered_map<string, vector<ld>> output = z_score_thresholding(input, lag, threshold, influence);
cout << output["signals"] << endl;
}

下面是平滑z-score算法的(非惯用的)Scala版本:

/**
* Smoothed zero-score alogrithm shamelessly copied from https://stackoverflow.com/a/22640362/6029703
* Uses a rolling mean and a rolling deviation (separate) to identify peaks in a vector
*
* @param y - The input vector to analyze
* @param lag - The lag of the moving window (i.e. how big the window is)
* @param threshold - The z-score at which the algorithm signals (i.e. how many standard deviations away from the moving mean a peak (or signal) is)
* @param influence - The influence (between 0 and 1) of new signals on the mean and standard deviation (how much a peak (or signal) should affect other values near it)
* @return - The calculated averages (avgFilter) and deviations (stdFilter), and the signals (signals)
*/
private def smoothedZScore(y: Seq[Double], lag: Int, threshold: Double, influence: Double): Seq[Int] = {
val stats = new SummaryStatistics()


// the results (peaks, 1 or -1) of our algorithm
val signals = mutable.ArrayBuffer.fill(y.length)(0)


// filter out the signals (peaks) from our original list (using influence arg)
val filteredY = y.to[mutable.ArrayBuffer]


// the current average of the rolling window
val avgFilter = mutable.ArrayBuffer.fill(y.length)(0d)


// the current standard deviation of the rolling window
val stdFilter = mutable.ArrayBuffer.fill(y.length)(0d)


// init avgFilter and stdFilter
y.take(lag).foreach(s => stats.addValue(s))


avgFilter(lag - 1) = stats.getMean
stdFilter(lag - 1) = Math.sqrt(stats.getPopulationVariance) // getStandardDeviation() uses sample variance (not what we want)


// loop input starting at end of rolling window
y.zipWithIndex.slice(lag, y.length - 1).foreach {
case (s: Double, i: Int) =>
// if the distance between the current value and average is enough standard deviations (threshold) away
if (Math.abs(s - avgFilter(i - 1)) > threshold * stdFilter(i - 1)) {
// this is a signal (i.e. peak), determine if it is a positive or negative signal
signals(i) = if (s > avgFilter(i - 1)) 1 else -1
// filter this signal out using influence
filteredY(i) = (influence * s) + ((1 - influence) * filteredY(i - 1))
} else {
// ensure this signal remains a zero
signals(i) = 0
// ensure this value is not filtered
filteredY(i) = s
}


// update rolling average and deviation
stats.clear()
filteredY.slice(i - lag, i).foreach(s => stats.addValue(s))
avgFilter(i) = stats.getMean
stdFilter(i) = Math.sqrt(stats.getPopulationVariance) // getStandardDeviation() uses sample variance (not what we want)
}


println(y.length)
println(signals.length)
println(signals)


signals.zipWithIndex.foreach {
case(x: Int, idx: Int) =>
if (x == 1) {
println(idx + " " + y(idx))
}
}


val data =
y.zipWithIndex.map { case (s: Double, i: Int) => Map("x" -> i, "y" -> s, "name" -> "y", "row" -> "data") } ++
avgFilter.zipWithIndex.map { case (s: Double, i: Int) => Map("x" -> i, "y" -> s, "name" -> "avgFilter", "row" -> "data") } ++
avgFilter.zipWithIndex.map { case (s: Double, i: Int) => Map("x" -> i, "y" -> (s - threshold * stdFilter(i)), "name" -> "lower", "row" -> "data") } ++
avgFilter.zipWithIndex.map { case (s: Double, i: Int) => Map("x" -> i, "y" -> (s + threshold * stdFilter(i)), "name" -> "upper", "row" -> "data") } ++
signals.zipWithIndex.map { case (s: Int, i: Int) => Map("x" -> i, "y" -> s, "name" -> "signal", "row" -> "signal") }


Vegas("Smoothed Z")
.withData(data)
.mark(Line)
.encodeX("x", Quant)
.encodeY("y", Quant)
.encodeColor(
field="name",
dataType=Nominal
)
.encodeRow("row", Ordinal)
.show


return signals
}

下面是一个测试,返回与Python和Groovy版本相同的结果:

val y = List(1d, 1d, 1.1d, 1d, 0.9d, 1d, 1d, 1.1d, 1d, 0.9d, 1d, 1.1d, 1d, 1d, 0.9d, 1d, 1d, 1.1d, 1d, 1d,
1d, 1d, 1.1d, 0.9d, 1d, 1.1d, 1d, 1d, 0.9d, 1d, 1.1d, 1d, 1d, 1.1d, 1d, 0.8d, 0.9d, 1d, 1.2d, 0.9d, 1d,
1d, 1.1d, 1.2d, 1d, 1.5d, 1d, 3d, 2d, 5d, 3d, 2d, 1d, 1d, 1d, 0.9d, 1d,
1d, 3d, 2.6d, 4d, 3d, 3.2d, 2d, 1d, 1d, 0.8d, 4d, 4d, 2d, 2.5d, 1d, 1d, 1d)


val lag = 30
val threshold = 5d
val influence = 0d


smoothedZScore(y, lag, threshold, influence)

vegas chart of result

这里的要点

我在我的机器人项目中需要这样的东西。我想我可能会返回芬兰湾的科特林实现。

/**
* Smoothed zero-score alogrithm shamelessly copied from https://stackoverflow.com/a/22640362/6029703
* Uses a rolling mean and a rolling deviation (separate) to identify peaks in a vector
*
* @param y - The input vector to analyze
* @param lag - The lag of the moving window (i.e. how big the window is)
* @param threshold - The z-score at which the algorithm signals (i.e. how many standard deviations away from the moving mean a peak (or signal) is)
* @param influence - The influence (between 0 and 1) of new signals on the mean and standard deviation (how much a peak (or signal) should affect other values near it)
* @return - The calculated averages (avgFilter) and deviations (stdFilter), and the signals (signals)
*/
fun smoothedZScore(y: List<Double>, lag: Int, threshold: Double, influence: Double): Triple<List<Int>, List<Double>, List<Double>> {
val stats = SummaryStatistics()
// the results (peaks, 1 or -1) of our algorithm
val signals = MutableList<Int>(y.size, { 0 })
// filter out the signals (peaks) from our original list (using influence arg)
val filteredY = ArrayList<Double>(y)
// the current average of the rolling window
val avgFilter = MutableList<Double>(y.size, { 0.0 })
// the current standard deviation of the rolling window
val stdFilter = MutableList<Double>(y.size, { 0.0 })
// init avgFilter and stdFilter
y.take(lag).forEach { s -> stats.addValue(s) }
avgFilter[lag - 1] = stats.mean
stdFilter[lag - 1] = Math.sqrt(stats.populationVariance) // getStandardDeviation() uses sample variance (not what we want)
stats.clear()
//loop input starting at end of rolling window
(lag..y.size - 1).forEach { i ->
//if the distance between the current value and average is enough standard deviations (threshold) away
if (Math.abs(y[i] - avgFilter[i - 1]) > threshold * stdFilter[i - 1]) {
//this is a signal (i.e. peak), determine if it is a positive or negative signal
signals[i] = if (y[i] > avgFilter[i - 1]) 1 else -1
//filter this signal out using influence
filteredY[i] = (influence * y[i]) + ((1 - influence) * filteredY[i - 1])
} else {
//ensure this signal remains a zero
signals[i] = 0
//ensure this value is not filtered
filteredY[i] = y[i]
}
//update rolling average and deviation
(i - lag..i - 1).forEach { stats.addValue(filteredY[it]) }
avgFilter[i] = stats.getMean()
stdFilter[i] = Math.sqrt(stats.getPopulationVariance()) //getStandardDeviation() uses sample variance (not what we want)
stats.clear()
}
return Triple(signals, avgFilter, stdFilter)
}

带有验证图的示例项目可以在github找到。

enter image description here

下面是我尝试为“Smoothed z-score算法”创建一个Ruby解决方案:

module ThresholdingAlgoMixin
def mean(array)
array.reduce(&:+) / array.size.to_f
end


def stddev(array)
array_mean = mean(array)
Math.sqrt(array.reduce(0.0) { |a, b| a.to_f + ((b.to_f - array_mean) ** 2) } / array.size.to_f)
end


def thresholding_algo(lag: 5, threshold: 3.5, influence: 0.5)
return nil if size < lag * 2
Array.new(size, 0).tap do |signals|
filtered = Array.new(self)


initial_slice = take(lag)
avg_filter = Array.new(lag - 1, 0.0) + [mean(initial_slice)]
std_filter = Array.new(lag - 1, 0.0) + [stddev(initial_slice)]
(lag..size-1).each do |idx|
prev = idx - 1
if (fetch(idx) - avg_filter[prev]).abs > threshold * std_filter[prev]
signals[idx] = fetch(idx) > avg_filter[prev] ? 1 : -1
filtered[idx] = (influence * fetch(idx)) + ((1-influence) * filtered[prev])
end


filtered_slice = filtered[idx-lag..prev]
avg_filter[idx] = mean(filtered_slice)
std_filter[idx] = stddev(filtered_slice)
end
end
end
end

以及示例用法:

test_data = [
1, 1, 1.1, 1, 0.9, 1, 1, 1.1, 1, 0.9, 1, 1.1, 1, 1, 0.9, 1,
1, 1.1, 1, 1, 1, 1, 1.1, 0.9, 1, 1.1, 1, 1, 0.9, 1, 1.1, 1,
1, 1.1, 1, 0.8, 0.9, 1, 1.2, 0.9, 1, 1, 1.1, 1.2, 1, 1.5,
1, 3, 2, 5, 3, 2, 1, 1, 1, 0.9, 1, 1, 3, 2.6, 4, 3, 3.2, 2,
1, 1, 0.8, 4, 4, 2, 2.5, 1, 1, 1
].extend(ThresholdingAlgoMixin)


puts test_data.thresholding_algo.inspect


# Output: [
#   0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
#   0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, -1, 0, 0, 0,
#   0, 0, 0, 0, 0, 0, 1, 0, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1,
#   1, 1, 0, 0, 0, -1, -1, 0, 0, 0, 0, 0, 0, 0, 0
# ]
这是一个修改的Fortran版本z-score算法。 它是专门针对频率空间中传递函数的峰值(共振)检测进行修改的(每个更改在代码中都有一个小注释)。< / p >

如果在输入向量的下界附近存在共振,则第一个修改会向用户发出警告,该共振由高于某个阈值的标准偏差表示(在本例中为10%)。这仅仅意味着信号不够平坦,不足以使检测正确地初始化滤波器。

第二种修改是只将峰值的最大值添加到已找到的峰值中。这是通过将每个发现的峰值与其(滞后)前辈及其(滞后)后继者的大小进行比较来达到的。

第三个变化是尊重共振峰通常在共振频率周围表现出某种形式的对称性。因此,围绕当前数据点对称地计算平均值和std是很自然的(而不仅仅是针对之前的数据点)。这将导致更好的峰值检测行为。

这些修改的效果是,整个信号必须事先被函数知道,这是共振检测的通常情况(类似于Jean-Paul的Matlab示例,其中数据点是动态生成的,这是行不通的)。

function PeakDetect(y,lag,threshold, influence)
implicit none
! Declaring part
real, dimension(:), intent(in) :: y
integer, dimension(size(y)) :: PeakDetect
real, dimension(size(y)) :: filteredY, avgFilter, stdFilter
integer :: lag, ii
real :: threshold, influence


! Executing part
PeakDetect = 0
filteredY = 0.0
filteredY(1:lag+1) = y(1:lag+1)
avgFilter = 0.0
avgFilter(lag+1) = mean(y(1:2*lag+1))
stdFilter = 0.0
stdFilter(lag+1) = std(y(1:2*lag+1))


if (stdFilter(lag+1)/avgFilter(lag+1)>0.1) then ! If the coefficient of variation exceeds 10%, the signal is too uneven at the start, possibly because of a peak.
write(unit=*,fmt=1001)
1001        format(1X,'Warning: Peak detection might have failed, as there may be a peak at the edge of the frequency range.',/)
end if
do ii = lag+2, size(y)
if (abs(y(ii) - avgFilter(ii-1)) > threshold * stdFilter(ii-1)) then
! Find only the largest outstanding value which is only the one greater than its predecessor and its successor
if (y(ii) > avgFilter(ii-1) .AND. y(ii) > y(ii-1) .AND. y(ii) > y(ii+1)) then
PeakDetect(ii) = 1
end if
filteredY(ii) = influence * y(ii) + (1 - influence) * filteredY(ii-1)
else
filteredY(ii) = y(ii)
end if
! Modified with respect to the original code. Mean and standard deviation are calculted symmetrically around the current point
avgFilter(ii) = mean(filteredY(ii-lag:ii+lag))
stdFilter(ii) = std(filteredY(ii-lag:ii+lag))
end do
end function PeakDetect


real function mean(y)
!> @brief Calculates the mean of vector y
implicit none
! Declaring part
real, dimension(:), intent(in) :: y
integer :: N
! Executing part
N = max(1,size(y))
mean = sum(y)/N
end function mean


real function std(y)
!> @brief Calculates the standard deviation of vector y
implicit none
! Declaring part
real, dimension(:), intent(in) :: y
integer :: N
! Executing part
N = max(1,size(y))
std = sqrt((N*dot_product(y,y) - sum(y)**2) / (N*(N-1)))
end function std
对于我的应用程序,算法的工作就像一个魅力! enter image description here < / p >

这里是python/numpy中https://stackoverflow.com/a/22640362/6029703答案的迭代版本。对于大数据(100000+),此代码比计算平均和标准偏差的速度更快。

def peak_detection_smoothed_zscore_v2(x, lag, threshold, influence):
'''
iterative smoothed z-score algorithm
Implementation of algorithm from https://stackoverflow.com/a/22640362/6029703
'''
import numpy as np
labels = np.zeros(len(x))
filtered_y = np.array(x)
avg_filter = np.zeros(len(x))
std_filter = np.zeros(len(x))
var_filter = np.zeros(len(x))


avg_filter[lag - 1] = np.mean(x[0:lag])
std_filter[lag - 1] = np.std(x[0:lag])
var_filter[lag - 1] = np.var(x[0:lag])
for i in range(lag, len(x)):
if abs(x[i] - avg_filter[i - 1]) > threshold * std_filter[i - 1]:
if x[i] > avg_filter[i - 1]:
labels[i] = 1
else:
labels[i] = -1
filtered_y[i] = influence * x[i] + (1 - influence) * filtered_y[i - 1]
else:
labels[i] = 0
filtered_y[i] = x[i]
# update avg, var, std
avg_filter[i] = avg_filter[i - 1] + 1. / lag * (filtered_y[i] - filtered_y[i - lag])
var_filter[i] = var_filter[i - 1] + 1. / lag * ((filtered_y[i] - avg_filter[i - 1]) ** 2 - (
filtered_y[i - lag] - avg_filter[i - 1]) ** 2 - (filtered_y[i] - filtered_y[i - lag]) ** 2 / lag)
std_filter[i] = np.sqrt(var_filter[i])


return dict(signals=labels,
avgFilter=avg_filter,
stdFilter=std_filter)

我想把我的Julia算法实现提供给其他人。要点可以找到在这里

using Statistics
using Plots
function SmoothedZscoreAlgo(y, lag, threshold, influence)
# Julia implimentation of http://stackoverflow.com/a/22640362/6029703
n = length(y)
signals = zeros(n) # init signal results
filteredY = copy(y) # init filtered series
avgFilter = zeros(n) # init average filter
stdFilter = zeros(n) # init std filter
avgFilter[lag - 1] = mean(y[1:lag]) # init first value
stdFilter[lag - 1] = std(y[1:lag]) # init first value


for i in range(lag, stop=n-1)
if abs(y[i] - avgFilter[i-1]) > threshold*stdFilter[i-1]
if y[i] > avgFilter[i-1]
signals[i] += 1 # postive signal
else
signals[i] += -1 # negative signal
end
# Make influence lower
filteredY[i] = influence*y[i] + (1-influence)*filteredY[i-1]
else
signals[i] = 0
filteredY[i] = y[i]
end
avgFilter[i] = mean(filteredY[i-lag+1:i])
stdFilter[i] = std(filteredY[i-lag+1:i])
end
return (signals = signals, avgFilter = avgFilter, stdFilter = stdFilter)
end




# Data
y = [1,1,1.1,1,0.9,1,1,1.1,1,0.9,1,1.1,1,1,0.9,1,1,1.1,1,1,1,1,1.1,0.9,1,1.1,1,1,0.9,
1,1.1,1,1,1.1,1,0.8,0.9,1,1.2,0.9,1,1,1.1,1.2,1,1.5,1,3,2,5,3,2,1,1,1,0.9,1,1,3,
2.6,4,3,3.2,2,1,1,0.8,4,4,2,2.5,1,1,1]


# Settings: lag = 30, threshold = 5, influence = 0
lag = 30
threshold = 5
influence = 0


results = SmoothedZscoreAlgo(y, lag, threshold, influence)
upper_bound = results[:avgFilter] + threshold * results[:stdFilter]
lower_bound = results[:avgFilter] - threshold * results[:stdFilter]
x = 1:length(y)


yplot = plot(x,y,color="blue", label="Y",legend=:topleft)
yplot = plot!(x,upper_bound, color="green", label="Upper Bound",legend=:topleft)
yplot = plot!(x,results[:avgFilter], color="cyan", label="Average Filter",legend=:topleft)
yplot = plot!(x,lower_bound, color="green", label="Lower Bound",legend=:topleft)
signalplot = plot(x,results[:signals],color="red",label="Signals",legend=:topleft)
plot(yplot,signalplot,layout=(2,1),legend=:topleft)

Results

如果你的数据在一个数据库表中,这里是一个简单的z-score算法的SQL版本:

with data_with_zscore as (
select
date_time,
value,
value / (avg(value) over ()) as pct_of_mean,
(value - avg(value) over ()) / (stdev(value) over ()) as z_score
from \{\{tablename}}  where datetime > '2018-11-26' and datetime < '2018-12-03'
)




-- select all
select * from data_with_zscore


-- select only points greater than a certain threshold
select * from data_with_zscore where z_score > abs(2)

根据@Jean-Paul提出的解决方案,我用c#实现了他的算法

public class ZScoreOutput
{
public List<double> input;
public List<int> signals;
public List<double> avgFilter;
public List<double> filtered_stddev;
}


public static class ZScore
{
public static ZScoreOutput StartAlgo(List<double> input, int lag, double threshold, double influence)
{
// init variables!
int[] signals = new int[input.Count];
double[] filteredY = new List<double>(input).ToArray();
double[] avgFilter = new double[input.Count];
double[] stdFilter = new double[input.Count];


var initialWindow = new List<double>(filteredY).Skip(0).Take(lag).ToList();


avgFilter[lag - 1] = Mean(initialWindow);
stdFilter[lag - 1] = StdDev(initialWindow);


for (int i = lag; i < input.Count; i++)
{
if (Math.Abs(input[i] - avgFilter[i - 1]) > threshold * stdFilter[i - 1])
{
signals[i] = (input[i] > avgFilter[i - 1]) ? 1 : -1;
filteredY[i] = influence * input[i] + (1 - influence) * filteredY[i - 1];
}
else
{
signals[i] = 0;
filteredY[i] = input[i];
}


// Update rolling average and deviation
var slidingWindow = new List<double>(filteredY).Skip(i - lag).Take(lag+1).ToList();


var tmpMean = Mean(slidingWindow);
var tmpStdDev = StdDev(slidingWindow);


avgFilter[i] = Mean(slidingWindow);
stdFilter[i] = StdDev(slidingWindow);
}


// Copy to convenience class
var result = new ZScoreOutput();
result.input = input;
result.avgFilter       = new List<double>(avgFilter);
result.signals         = new List<int>(signals);
result.filtered_stddev = new List<double>(stdFilter);


return result;
}


private static double Mean(List<double> list)
{
// Simple helper function!
return list.Average();
}


private static double StdDev(List<double> values)
{
double ret = 0;
if (values.Count() > 0)
{
double avg = values.Average();
double sum = values.Sum(d => Math.Pow(d - avg, 2));
ret = Math.Sqrt((sum) / (values.Count() - 1));
}
return ret;
}
}

使用示例:

var input = new List<double> {1.0, 1.0, 1.1, 1.0, 0.9, 1.0, 1.0, 1.1, 1.0, 0.9, 1.0,
1.1, 1.0, 1.0, 0.9, 1.0, 1.0, 1.1, 1.0, 1.0, 1.0, 1.0, 1.1, 0.9, 1.0, 1.1, 1.0, 1.0, 0.9,
1.0, 1.1, 1.0, 1.0, 1.1, 1.0, 0.8, 0.9, 1.0, 1.2, 0.9, 1.0, 1.0, 1.1, 1.2, 1.0, 1.5, 1.0,
3.0, 2.0, 5.0, 3.0, 2.0, 1.0, 1.0, 1.0, 0.9, 1.0, 1.0, 3.0, 2.6, 4.0, 3.0, 3.2, 2.0, 1.0,
1.0, 0.8, 4.0, 4.0, 2.0, 2.5, 1.0, 1.0, 1.0};


int lag = 30;
double threshold = 5.0;
double influence = 0.0;


var output = ZScore.StartAlgo(input, lag, threshold, influence);

我们尝试在我们的数据集上使用平滑的z-score算法,这导致了过度敏感或不敏感(取决于参数如何调整),几乎没有中间地带。在我们站点的交通信号中,我们观察到一个低频基线,它代表了每天的周期,即使有最好的可能参数(如下所示),它仍然在第4天下降,特别是因为大多数数据点被认为是异常的。

在原始z-score算法的基础上,我们提出了一种通过反向滤波来解决这个问题的方法。改进算法的具体内容及其在电视商业流量归因中的应用发表在我们的团队博客上。

enter image description here

函数scipy.signal.find_peaks,顾名思义,在这方面很有用。但重要的是要很好地理解其参数widththresholddistance 最重要的是prominence,以获得良好的峰值提取。

根据我的测试和文档,突出的概念是“有用的概念”,以保持良好的峰值,并丢弃噪声峰值。

(地形)突出是什么?它是从山顶到更高地形所需的最低下降高度。,可以在这里看到:

这个想法是:

突出位置越高,山峰就越“重要”。

下面是Arduino微控制器的@Jean-Paul的 Smoothed Z-score的C实现,用于获取加速度计读数,并确定冲击的方向是来自左边还是右边。这表现得非常好,因为这个设备返回一个反弹信号。这是设备对峰值检测算法的输入-显示了来自右边的冲击,然后是来自左边的冲击。你可以看到最初的峰值然后传感器的振荡。

enter image description here

#include <stdio.h>
#include <math.h>
#include <string.h>




#define SAMPLE_LENGTH 1000


float stddev(float data[], int len);
float mean(float data[], int len);
void thresholding(float y[], int signals[], int lag, float threshold, float influence);




void thresholding(float y[], int signals[], int lag, float threshold, float influence) {
memset(signals, 0, sizeof(int) * SAMPLE_LENGTH);
float filteredY[SAMPLE_LENGTH];
memcpy(filteredY, y, sizeof(float) * SAMPLE_LENGTH);
float avgFilter[SAMPLE_LENGTH];
float stdFilter[SAMPLE_LENGTH];


avgFilter[lag - 1] = mean(y, lag);
stdFilter[lag - 1] = stddev(y, lag);


for (int i = lag; i < SAMPLE_LENGTH; i++) {
if (fabsf(y[i] - avgFilter[i-1]) > threshold * stdFilter[i-1]) {
if (y[i] > avgFilter[i-1]) {
signals[i] = 1;
} else {
signals[i] = -1;
}
filteredY[i] = influence * y[i] + (1 - influence) * filteredY[i-1];
} else {
signals[i] = 0;
}
avgFilter[i] = mean(filteredY + i-lag, lag);
stdFilter[i] = stddev(filteredY + i-lag, lag);
}
}


float mean(float data[], int len) {
float sum = 0.0, mean = 0.0;


int i;
for(i=0; i<len; ++i) {
sum += data[i];
}


mean = sum/len;
return mean;




}


float stddev(float data[], int len) {
float the_mean = mean(data, len);
float standardDeviation = 0.0;


int i;
for(i=0; i<len; ++i) {
standardDeviation += pow(data[i] - the_mean, 2);
}


return sqrt(standardDeviation/len);
}


int main() {
printf("Hello, World!\n");
int lag = 100;
float threshold = 5;
float influence = 0;
float y[]=  {1,1,1.1,1,0.9,1,1,1.1,1,0.9,1,1.1,1,1,0.9,1,1,1.1,1,1,1,1,1.1,0.9,1,1.1,1,1,0.9,
....
1,1.1,1,1,1.1,1,0.8,0.9,1,1.2,0.9,1,1,1.1,1.2,1,1.5,1,3,2,5,3,2,1,1,1,0.9,1,1,3,       2.6,4,3,3.2,2,1,1,0.8,4,4,2,2.5,1,1,1,1.2,1,1.5,1,3,2,5,3,2,1,1,1,0.9,1,1,3,
2.6,4,3,3.2,2,1,1,0.8,4,4,2,2.5,1,1,1}


int signal[SAMPLE_LENGTH];


thresholding(y, signal,  lag, threshold, influence);


return 0;
}

她的结果是影响= 0

enter image description here

不是很好,但这里的影响力= 1

enter image description here

这很好。

原答案的附录1:MatlabR翻译

< >强Matlab代码< / >强

function [signals,avgFilter,stdFilter] = ThresholdingAlgo(y,lag,threshold,influence)
% Initialise signal results
signals = zeros(length(y),1);
% Initialise filtered series
filteredY = y(1:lag+1);
% Initialise filters
avgFilter(lag+1,1) = mean(y(1:lag+1));
stdFilter(lag+1,1) = std(y(1:lag+1));
% Loop over all datapoints y(lag+2),...,y(t)
for i=lag+2:length(y)
% If new value is a specified number of deviations away
if abs(y(i)-avgFilter(i-1)) > threshold*stdFilter(i-1)
if y(i) > avgFilter(i-1)
% Positive signal
signals(i) = 1;
else
% Negative signal
signals(i) = -1;
end
% Make influence lower
filteredY(i) = influence*y(i)+(1-influence)*filteredY(i-1);
else
% No signal
signals(i) = 0;
filteredY(i) = y(i);
end
% Adjust the filters
avgFilter(i) = mean(filteredY(i-lag:i));
stdFilter(i) = std(filteredY(i-lag:i));
end
% Done, now return results
end

例子:

% Data
y = [1 1 1.1 1 0.9 1 1 1.1 1 0.9 1 1.1 1 1 0.9 1 1 1.1 1 1,...
1 1 1.1 0.9 1 1.1 1 1 0.9 1 1.1 1 1 1.1 1 0.8 0.9 1 1.2 0.9 1,...
1 1.1 1.2 1 1.5 1 3 2 5 3 2 1 1 1 0.9 1,...
1 3 2.6 4 3 3.2 2 1 1 0.8 4 4 2 2.5 1 1 1];


% Settings
lag = 30;
threshold = 5;
influence = 0;


% Get results
[signals,avg,dev] = ThresholdingAlgo(y,lag,threshold,influence);


figure; subplot(2,1,1); hold on;
x = 1:length(y); ix = lag+1:length(y);
area(x(ix),avg(ix)+threshold*dev(ix),'FaceColor',[0.9 0.9 0.9],'EdgeColor','none');
area(x(ix),avg(ix)-threshold*dev(ix),'FaceColor',[1 1 1],'EdgeColor','none');
plot(x(ix),avg(ix),'LineWidth',1,'Color','cyan','LineWidth',1.5);
plot(x(ix),avg(ix)+threshold*dev(ix),'LineWidth',1,'Color','green','LineWidth',1.5);
plot(x(ix),avg(ix)-threshold*dev(ix),'LineWidth',1,'Color','green','LineWidth',1.5);
plot(1:length(y),y,'b');
subplot(2,1,2);
stairs(signals,'r','LineWidth',1.5); ylim([-1.5 1.5]);

<强> R代码< / >强

ThresholdingAlgo <- function(y,lag,threshold,influence) {
signals <- rep(0,length(y))
filteredY <- y[0:lag]
avgFilter <- NULL
stdFilter <- NULL
avgFilter[lag] <- mean(y[0:lag], na.rm=TRUE)
stdFilter[lag] <- sd(y[0:lag], na.rm=TRUE)
for (i in (lag+1):length(y)){
if (abs(y[i]-avgFilter[i-1]) > threshold*stdFilter[i-1]) {
if (y[i] > avgFilter[i-1]) {
signals[i] <- 1;
} else {
signals[i] <- -1;
}
filteredY[i] <- influence*y[i]+(1-influence)*filteredY[i-1]
} else {
signals[i] <- 0
filteredY[i] <- y[i]
}
avgFilter[i] <- mean(filteredY[(i-lag):i], na.rm=TRUE)
stdFilter[i] <- sd(filteredY[(i-lag):i], na.rm=TRUE)
}
return(list("signals"=signals,"avgFilter"=avgFilter,"stdFilter"=stdFilter))
}

例子:

# Data
y <- c(1,1,1.1,1,0.9,1,1,1.1,1,0.9,1,1.1,1,1,0.9,1,1,1.1,1,1,1,1,1.1,0.9,1,1.1,1,1,0.9,
1,1.1,1,1,1.1,1,0.8,0.9,1,1.2,0.9,1,1,1.1,1.2,1,1.5,1,3,2,5,3,2,1,1,1,0.9,1,1,3,
2.6,4,3,3.2,2,1,1,0.8,4,4,2,2.5,1,1,1)


lag       <- 30
threshold <- 5
influence <- 0


# Run algo with lag = 30, threshold = 5, influence = 0
result <- ThresholdingAlgo(y,lag,threshold,influence)


# Plot result
par(mfrow = c(2,1),oma = c(2,2,0,0) + 0.1,mar = c(0,0,2,1) + 0.2)
plot(1:length(y),y,type="l",ylab="",xlab="")
lines(1:length(y),result$avgFilter,type="l",col="cyan",lwd=2)
lines(1:length(y),result$avgFilter+threshold*result$stdFilter,type="l",col="green",lwd=2)
lines(1:length(y),result$avgFilter-threshold*result$stdFilter,type="l",col="green",lwd=2)
plot(result$signals,type="S",col="red",ylab="",xlab="",ylim=c(-1.5,1.5),lwd=2)

这段代码(两种语言)将为原始问题的数据产生以下结果:

 threshold - olding example from Matlab code


附录2原始答案:Matlab演示代码

(点击创建数据)

Matlab demo

function [] = RobustThresholdingDemo()


%% SPECIFICATIONS
lag         = 5;       % lag for the smoothing
threshold   = 3.5;     % number of st.dev. away from the mean to signal
influence   = 0.3;     % when signal: how much influence for new data? (between 0 and 1)
% 1 is normal influence, 0.5 is half
%% START DEMO
DemoScreen(30,lag,threshold,influence);


end


function [signals,avgFilter,stdFilter] = ThresholdingAlgo(y,lag,threshold,influence)
signals = zeros(length(y),1);
filteredY = y(1:lag+1);
avgFilter(lag+1,1) = mean(y(1:lag+1));
stdFilter(lag+1,1) = std(y(1:lag+1));
for i=lag+2:length(y)
if abs(y(i)-avgFilter(i-1)) > threshold*stdFilter(i-1)
if y(i) > avgFilter(i-1)
signals(i) = 1;
else
signals(i) = -1;
end
filteredY(i) = influence*y(i)+(1-influence)*filteredY(i-1);
else
signals(i) = 0;
filteredY(i) = y(i);
end
avgFilter(i) = mean(filteredY(i-lag:i));
stdFilter(i) = std(filteredY(i-lag:i));
end
end


% Demo screen function
function [] = DemoScreen(n,lag,threshold,influence)
figure('Position',[200 100,1000,500]);
subplot(2,1,1);
title(sprintf(['Draw data points (%.0f max)      [settings: lag = %.0f, '...
'threshold = %.2f, influence = %.2f]'],n,lag,threshold,influence));
ylim([0 5]); xlim([0 50]);
H = gca; subplot(2,1,1);
set(H, 'YLimMode', 'manual'); set(H, 'XLimMode', 'manual');
set(H, 'YLim', get(H,'YLim')); set(H, 'XLim', get(H,'XLim'));
xg = []; yg = [];
for i=1:n
try
[xi,yi] = ginput(1);
catch
return;
end
xg = [xg xi]; yg = [yg yi];
if i == 1
subplot(2,1,1); hold on;
plot(H, xg(i),yg(i),'r.');
text(xg(i),yg(i),num2str(i),'FontSize',7);
end
if length(xg) > lag
[signals,avg,dev] = ...
ThresholdingAlgo(yg,lag,threshold,influence);
area(xg(lag+1:end),avg(lag+1:end)+threshold*dev(lag+1:end),...
'FaceColor',[0.9 0.9 0.9],'EdgeColor','none');
area(xg(lag+1:end),avg(lag+1:end)-threshold*dev(lag+1:end),...
'FaceColor',[1 1 1],'EdgeColor','none');
plot(xg(lag+1:end),avg(lag+1:end),'LineWidth',1,'Color','cyan');
plot(xg(lag+1:end),avg(lag+1:end)+threshold*dev(lag+1:end),...
'LineWidth',1,'Color','green');
plot(xg(lag+1:end),avg(lag+1:end)-threshold*dev(lag+1:end),...
'LineWidth',1,'Color','green');
subplot(2,1,2); hold on; title('Signal output');
stairs(xg(lag+1:end),signals(lag+1:end),'LineWidth',2,'Color','blue');
ylim([-2 2]); xlim([0 50]); hold off;
end
subplot(2,1,1); hold on;
for j=2:i
plot(xg([j-1:j]),yg([j-1:j]),'r'); plot(H,xg(j),yg(j),'r.');
text(xg(j),yg(j),num2str(j),'FontSize',7);
end
end
end

下面是一个基于之前发布的Groovy的答案的实际Java实现。(我知道已经发布了Groovy和Kotlin实现,但对于像我这样只做Java的人来说,弄清楚如何在其他语言和Java之间转换真的很麻烦)。

(结果与他人图表相匹配)

算法实现

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;


import org.apache.commons.math3.stat.descriptive.SummaryStatistics;


public class SignalDetector {


public HashMap<String, List> analyzeDataForSignals(List<Double> data, int lag, Double threshold, Double influence) {


// init stats instance
SummaryStatistics stats = new SummaryStatistics();


// the results (peaks, 1 or -1) of our algorithm
List<Integer> signals = new ArrayList<Integer>(Collections.nCopies(data.size(), 0));


// filter out the signals (peaks) from our original list (using influence arg)
List<Double> filteredData = new ArrayList<Double>(data);


// the current average of the rolling window
List<Double> avgFilter = new ArrayList<Double>(Collections.nCopies(data.size(), 0.0d));


// the current standard deviation of the rolling window
List<Double> stdFilter = new ArrayList<Double>(Collections.nCopies(data.size(), 0.0d));


// init avgFilter and stdFilter
for (int i = 0; i < lag; i++) {
stats.addValue(data.get(i));
}
avgFilter.set(lag - 1, stats.getMean());
stdFilter.set(lag - 1, Math.sqrt(stats.getPopulationVariance())); // getStandardDeviation() uses sample variance
stats.clear();


// loop input starting at end of rolling window
for (int i = lag; i < data.size(); i++) {


// if the distance between the current value and average is enough standard deviations (threshold) away
if (Math.abs((data.get(i) - avgFilter.get(i - 1))) > threshold * stdFilter.get(i - 1)) {


// this is a signal (i.e. peak), determine if it is a positive or negative signal
if (data.get(i) > avgFilter.get(i - 1)) {
signals.set(i, 1);
} else {
signals.set(i, -1);
}


// filter this signal out using influence
filteredData.set(i, (influence * data.get(i)) + ((1 - influence) * filteredData.get(i - 1)));
} else {
// ensure this signal remains a zero
signals.set(i, 0);
// ensure this value is not filtered
filteredData.set(i, data.get(i));
}


// update rolling average and deviation
for (int j = i - lag; j < i; j++) {
stats.addValue(filteredData.get(j));
}
avgFilter.set(i, stats.getMean());
stdFilter.set(i, Math.sqrt(stats.getPopulationVariance()));
stats.clear();
}


HashMap<String, List> returnMap = new HashMap<String, List>();
returnMap.put("signals", signals);
returnMap.put("filteredData", filteredData);
returnMap.put("avgFilter", avgFilter);
returnMap.put("stdFilter", stdFilter);


return returnMap;


} // end
}

主要方法

import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;


public class Main {


public static void main(String[] args) throws Exception {
DecimalFormat df = new DecimalFormat("#0.000");


ArrayList<Double> data = new ArrayList<Double>(Arrays.asList(1d, 1d, 1.1d, 1d, 0.9d, 1d, 1d, 1.1d, 1d, 0.9d, 1d,
1.1d, 1d, 1d, 0.9d, 1d, 1d, 1.1d, 1d, 1d, 1d, 1d, 1.1d, 0.9d, 1d, 1.1d, 1d, 1d, 0.9d, 1d, 1.1d, 1d, 1d,
1.1d, 1d, 0.8d, 0.9d, 1d, 1.2d, 0.9d, 1d, 1d, 1.1d, 1.2d, 1d, 1.5d, 1d, 3d, 2d, 5d, 3d, 2d, 1d, 1d, 1d,
0.9d, 1d, 1d, 3d, 2.6d, 4d, 3d, 3.2d, 2d, 1d, 1d, 0.8d, 4d, 4d, 2d, 2.5d, 1d, 1d, 1d));


SignalDetector signalDetector = new SignalDetector();
int lag = 30;
double threshold = 5;
double influence = 0;


HashMap<String, List> resultsMap = signalDetector.analyzeDataForSignals(data, lag, threshold, influence);
// print algorithm params
System.out.println("lag: " + lag + "\t\tthreshold: " + threshold + "\t\tinfluence: " + influence);


System.out.println("Data size: " + data.size());
System.out.println("Signals size: " + resultsMap.get("signals").size());


// print data
System.out.print("Data:\t\t");
for (double d : data) {
System.out.print(df.format(d) + "\t");
}
System.out.println();


// print signals
System.out.print("Signals:\t");
List<Integer> signalsList = resultsMap.get("signals");
for (int i : signalsList) {
System.out.print(df.format(i) + "\t");
}
System.out.println();


// print filtered data
System.out.print("Filtered Data:\t");
List<Double> filteredDataList = resultsMap.get("filteredData");
for (double d : filteredDataList) {
System.out.print(df.format(d) + "\t");
}
System.out.println();


// print running average
System.out.print("Avg Filter:\t");
List<Double> avgFilterList = resultsMap.get("avgFilter");
for (double d : avgFilterList) {
System.out.print(df.format(d) + "\t");
}
System.out.println();


// print running std
System.out.print("Std filter:\t");
List<Double> stdFilterList = resultsMap.get("stdFilter");
for (double d : stdFilterList) {
System.out.print(df.format(d) + "\t");
}
System.out.println();


System.out.println();
for (int i = 0; i < signalsList.size(); i++) {
if (signalsList.get(i) != 0) {
System.out.println("Point " + i + " gave signal " + signalsList.get(i));
}
}
}
}

结果

lag: 30     threshold: 5.0      influence: 0.0
Data size: 74
Signals size: 74
Data:           1.000   1.000   1.100   1.000   0.900   1.000   1.000   1.100   1.000   0.900   1.000   1.100   1.000   1.000   0.900   1.000   1.000   1.100   1.000   1.000   1.000   1.000   1.100   0.900   1.000   1.100   1.000   1.000   0.900   1.000   1.100   1.000   1.000   1.100   1.000   0.800   0.900   1.000   1.200   0.900   1.000   1.000   1.100   1.200   1.000   1.500   1.000   3.000   2.000   5.000   3.000   2.000   1.000   1.000   1.000   0.900   1.000   1.000   3.000   2.600   4.000   3.000   3.200   2.000   1.000   1.000   0.800   4.000   4.000   2.000   2.500   1.000   1.000   1.000
Signals:        0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   1.000   0.000   1.000   1.000   1.000   1.000   1.000   0.000   0.000   0.000   0.000   0.000   0.000   1.000   1.000   1.000   1.000   1.000   1.000   0.000   0.000   0.000   1.000   1.000   1.000   1.000   0.000   0.000   0.000
Filtered Data:  1.000   1.000   1.100   1.000   0.900   1.000   1.000   1.100   1.000   0.900   1.000   1.100   1.000   1.000   0.900   1.000   1.000   1.100   1.000   1.000   1.000   1.000   1.100   0.900   1.000   1.100   1.000   1.000   0.900   1.000   1.100   1.000   1.000   1.100   1.000   0.800   0.900   1.000   1.200   0.900   1.000   1.000   1.100   1.200   1.000   1.000   1.000   1.000   1.000   1.000   1.000   1.000   1.000   1.000   1.000   0.900   1.000   1.000   1.000   1.000   1.000   1.000   1.000   1.000   1.000   1.000   0.800   0.800   0.800   0.800   0.800   1.000   1.000   1.000
Avg Filter:     0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   1.003   1.003   1.007   1.007   1.003   1.007   1.010   1.003   1.000   0.997   1.003   1.003   1.003   1.000   1.003   1.010   1.013   1.013   1.013   1.010   1.010   1.010   1.010   1.010   1.007   1.010   1.010   1.003   1.003   1.003   1.007   1.007   1.003   1.003   1.003   1.000   1.000   1.007   1.003   0.997   0.983   0.980   0.973   0.973   0.970
Std filter:     0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.060   0.060   0.063   0.063   0.060   0.063   0.060   0.071   0.073   0.071   0.080   0.080   0.080   0.077   0.080   0.087   0.085   0.085   0.085   0.083   0.083   0.083   0.083   0.083   0.081   0.079   0.079   0.080   0.080   0.080   0.077   0.077   0.075   0.075   0.075   0.073   0.073   0.063   0.071   0.080   0.078   0.083   0.089   0.089   0.086


Point 45 gave signal 1
Point 47 gave signal 1
Point 48 gave signal 1
Point 49 gave signal 1
Point 50 gave signal 1
Point 51 gave signal 1
Point 58 gave signal 1
Point 59 gave signal 1
Point 60 gave signal 1
Point 61 gave signal 1
Point 62 gave signal 1
Point 63 gave signal 1
Point 67 gave signal 1
Point 68 gave signal 1
Point 69 gave signal 1
Point 70 gave signal 1

 java执行的数据和结果的图表

使用实时流的Python版本(不会在每个新数据点到达时重新计算所有数据点)。您可能想要调整类函数返回的内容—对于我的目的,我只需要信号。

import numpy as np




class real_time_peak_detection():
def __init__(self, array, lag, threshold, influence):
self.y = list(array)
self.length = len(self.y)
self.lag = lag
self.threshold = threshold
self.influence = influence
self.signals = [0] * len(self.y)
self.filteredY = np.array(self.y).tolist()
self.avgFilter = [0] * len(self.y)
self.stdFilter = [0] * len(self.y)
self.avgFilter[self.lag - 1] = np.mean(self.y[0:self.lag]).tolist()
self.stdFilter[self.lag - 1] = np.std(self.y[0:self.lag]).tolist()


def thresholding_algo(self, new_value):
self.y.append(new_value)
i = len(self.y) - 1
self.length = len(self.y)
if i < self.lag:
return 0
elif i == self.lag:
self.signals = [0] * len(self.y)
self.filteredY = np.array(self.y).tolist()
self.avgFilter = [0] * len(self.y)
self.stdFilter = [0] * len(self.y)
self.avgFilter[self.lag] = np.mean(self.y[0:self.lag]).tolist()
self.stdFilter[self.lag] = np.std(self.y[0:self.lag]).tolist()
return 0


self.signals += [0]
self.filteredY += [0]
self.avgFilter += [0]
self.stdFilter += [0]


if abs(self.y[i] - self.avgFilter[i - 1]) > (self.threshold * self.stdFilter[i - 1]):


if self.y[i] > self.avgFilter[i - 1]:
self.signals[i] = 1
else:
self.signals[i] = -1


self.filteredY[i] = self.influence * self.y[i] + \
(1 - self.influence) * self.filteredY[i - 1]
self.avgFilter[i] = np.mean(self.filteredY[(i - self.lag):i])
self.stdFilter[i] = np.std(self.filteredY[(i - self.lag):i])
else:
self.signals[i] = 0
self.filteredY[i] = self.y[i]
self.avgFilter[i] = np.mean(self.filteredY[(i - self.lag):i])
self.stdFilter[i] = np.std(self.filteredY[(i - self.lag):i])


return self.signals[i]

用现代c++实现的面向对象版z-score算法

template<typename T>
class FindPeaks{
private:
std::vector<T> m_input_signal;                      // stores input vector
std::vector<T> m_array_peak_positive;
std::vector<T> m_array_peak_negative;


public:
FindPeaks(const std::vector<T>& t_input_signal): m_input_signal{t_input_signal}{ }


void estimate(){
int lag{5};
T threshold{ 5 };                                                                                       // set a threshold
T influence{ 0.5 };                                                                                    // value between 0 to 1, 1 is normal influence and 0.5 is half the influence


std::vector<T> filtered_signal(m_input_signal.size(), 0.0);                                             // placeholdered for smooth signal, initialie with all zeros
std::vector<int> signal(m_input_signal.size(), 0);                                                          // vector that stores where the negative and positive located
std::vector<T> avg_filtered(m_input_signal.size(), 0.0);                                                // moving averages
std::vector<T> std_filtered(m_input_signal.size(), 0.0);                                                // moving standard deviation


avg_filtered[lag] = findMean(m_input_signal.begin(), m_input_signal.begin() + lag);                         // pass the iteartor to vector
std_filtered[lag] = findStandardDeviation(m_input_signal.begin(), m_input_signal.begin() + lag);


for (size_t iLag = lag + 1; iLag < m_input_signal.size(); ++iLag) {                                         // start index frm
if (std::abs(m_input_signal[iLag] - avg_filtered[iLag - 1]) > threshold * std_filtered[iLag - 1]) {     // check if value is above threhold
if ((m_input_signal[iLag]) > avg_filtered[iLag - 1]) {
signal[iLag] = 1;                                                                               // assign positive signal
}
else {
signal[iLag] = -1;                                                                                  // assign negative signal
}
filtered_signal[iLag] = influence * m_input_signal[iLag] + (1 - influence) * filtered_signal[iLag - 1];        // exponential smoothing
}
else {
signal[iLag] = 0;                                                                                         // no signal
filtered_signal[iLag] = m_input_signal[iLag];
}


avg_filtered[iLag] = findMean(filtered_signal.begin() + (iLag - lag), filtered_signal.begin() + iLag);
std_filtered[iLag] = findStandardDeviation(filtered_signal.begin() + (iLag - lag), filtered_signal.begin() + iLag);


}


for (size_t iSignal = 0; iSignal < m_input_signal.size(); ++iSignal) {
if (signal[iSignal] == 1) {
m_array_peak_positive.emplace_back(m_input_signal[iSignal]);                                        // store the positive peaks
}
else if (signal[iSignal] == -1) {
m_array_peak_negative.emplace_back(m_input_signal[iSignal]);                                         // store the negative peaks
}
}
printVoltagePeaks(signal, m_input_signal);


}


std::pair< std::vector<T>, std::vector<T> > get_peaks()
{
return std::make_pair(m_array_peak_negative, m_array_peak_negative);
}


};




template<typename T1, typename T2 >
void printVoltagePeaks(std::vector<T1>& m_signal, std::vector<T2>& m_input_signal) {
std::ofstream output_file("./voltage_peak.csv");
std::ostream_iterator<T2> output_iterator_voltage(output_file, ",");
std::ostream_iterator<T1> output_iterator_signal(output_file, ",");
std::copy(m_input_signal.begin(), m_input_signal.end(), output_iterator_voltage);
output_file << "\n";
std::copy(m_signal.begin(), m_signal.end(), output_iterator_signal);
}


template<typename iterator_type>
typename std::iterator_traits<iterator_type>::value_type findMean(iterator_type it, iterator_type end)
{
/* function that receives iterator to*/
typename std::iterator_traits<iterator_type>::value_type sum{ 0.0 };
int counter = 0;
while (it != end) {
sum += *(it++);
counter++;
}
return sum / counter;
}


template<typename iterator_type>
typename std::iterator_traits<iterator_type>::value_type findStandardDeviation(iterator_type it, iterator_type end)
{
auto mean = findMean(it, end);
typename std::iterator_traits<iterator_type>::value_type sum_squared_error{ 0.0 };
int counter{ 0 };
while (it != end) {
sum_squared_error += std::pow((*(it++) - mean), 2);
counter++;
}
auto standard_deviation = std::sqrt(sum_squared_error / (counter - 1));
return standard_deviation;
}

我允许自己创建一个javascript版本。也许会有帮助。javascript应该是上面给出的伪代码的直接转录。可用的npm包和github repo:

Javascript的翻译:

// javascript port of: https://stackoverflow.com/questions/22583391/peak-signal-detection-in-realtime-timeseries-data/48895639#48895639


function sum(a) {
return a.reduce((acc, val) => acc + val)
}


function mean(a) {
return sum(a) / a.length
}


function stddev(arr) {
const arr_mean = mean(arr)
const r = function(acc, val) {
return acc + ((val - arr_mean) * (val - arr_mean))
}
return Math.sqrt(arr.reduce(r, 0.0) / arr.length)
}


function smoothed_z_score(y, params) {
var p = params || {}
// init cooefficients
const lag = p.lag || 5
const threshold = p.threshold || 3.5
const influence = p.influece || 0.5


if (y === undefined || y.length < lag + 2) {
throw ` ## y data array to short(${y.length}) for given lag of ${lag}`
}
//console.log(`lag, threshold, influence: ${lag}, ${threshold}, ${influence}`)


// init variables
var signals = Array(y.length).fill(0)
var filteredY = y.slice(0)
const lead_in = y.slice(0, lag)
//console.log("1: " + lead_in.toString())


var avgFilter = []
avgFilter[lag - 1] = mean(lead_in)
var stdFilter = []
stdFilter[lag - 1] = stddev(lead_in)
//console.log("2: " + stdFilter.toString())


for (var i = lag; i < y.length; i++) {
//console.log(`${y[i]}, ${avgFilter[i-1]}, ${threshold}, ${stdFilter[i-1]}`)
if (Math.abs(y[i] - avgFilter[i - 1]) > (threshold * stdFilter[i - 1])) {
if (y[i] > avgFilter[i - 1]) {
signals[i] = +1 // positive signal
} else {
signals[i] = -1 // negative signal
}
// make influence lower
filteredY[i] = influence * y[i] + (1 - influence) * filteredY[i - 1]
} else {
signals[i] = 0 // no signal
filteredY[i] = y[i]
}


// adjust the filters
const y_lag = filteredY.slice(i - lag, i)
avgFilter[i] = mean(y_lag)
stdFilter[i] = stddev(y_lag)
}


return signals
}


module.exports = smoothed_z_score

@Jean-Paul算法的Perl实现。

#!/usr/bin/perl


use strict;
use Data::Dumper;


sub mean {
my $data = shift;
my $sum = 0;
my $mean_val = 0;
for my $item (@$data) {
$sum += $item;
}
$mean_val = $sum / (scalar @$data) if @$data;
return $mean_val;
}


sub variance {
my $data = shift;
my $variance_val = 0;
my $mean_val = mean($data);
my $sum = 0;
for my $item (@$data) {
$sum += ($item - $mean_val)**2;
}
$variance_val = $sum / (scalar @$data) if @$data;
return $variance_val;
}


sub std {
my $data = shift;
my $variance_val = variance($data);
return sqrt($variance_val);
}


# @param y - The input vector to analyze
# @parameter lag - The lag of the moving window
# @parameter threshold - The z-score at which the algorithm signals
# @parameter influence - The influence (between 0 and 1) of new signals on the mean and standard deviation
sub thresholding_algo {
my ($y, $lag, $threshold, $influence) = @_;


my @signals = (0) x @$y;
my @filteredY = @$y;
my @avgFilter = (0) x @$y;
my @stdFilter = (0) x @$y;


$avgFilter[$lag - 1] = mean([@$y[0..$lag-1]]);
$stdFilter[$lag - 1] = std([@$y[0..$lag-1]]);


for (my $i=$lag; $i <= @$y - 1; $i++) {
if (abs($y->[$i] - $avgFilter[$i-1]) > $threshold * $stdFilter[$i-1]) {
if ($y->[$i] > $avgFilter[$i-1]) {
$signals[$i] = 1;
} else {
$signals[$i] = -1;
}


$filteredY[$i] = $influence * $y->[$i] + (1 - $influence) * $filteredY[$i-1];
$avgFilter[$i] = mean([@filteredY[($i-$lag)..($i-1)]]);
$stdFilter[$i] = std([@filteredY[($i-$lag)..($i-1)]]);
}
else {
$signals[$i] = 0;
$filteredY[$i] = $y->[$i];
$avgFilter[$i] = mean([@filteredY[($i-$lag)..($i-1)]]);
$stdFilter[$i] = std([@filteredY[($i-$lag)..($i-1)]]);
}
}


return {
signals => \@signals,
avgFilter => \@avgFilter,
stdFilter => \@stdFilter
};
}


my $y = [1,1,1.1,1,0.9,1,1,1.1,1,0.9,1,1.1,1,1,0.9,1,1,1.1,1,1,1,1,1.1,0.9,1,1.1,1,1,0.9,
1,1.1,1,1,1.1,1,0.8,0.9,1,1.2,0.9,1,1,1.1,1.2,1,1.5,1,3,2,5,3,2,1,1,1,0.9,1,1,3,
2.6,4,3,3.2,2,1,1,0.8,4,4,2,2.5,1,1,1];


my $lag = 30;
my $threshold = 5;
my $influence = 0;


my $result = thresholding_algo($y, $lag, $threshold, $influence);


print Dumper $result;

下面是ZSCORE算法的PHP实现:

<?php
$y = array(1,7,1.1,1,0.9,1,1,1.1,1,0.9,1,1.1,1,1,0.9,1,1,1.1,1,1,1,1,1.1,0.9,1,1.1,1,1,0.9,
1,1.1,1,1,1.1,1,0.8,0.9,1,1.2,0.9,1,1,1.1,1.2,1,1.5,10,3,2,5,3,2,1,1,1,0.9,1,1,3,
2.6,4,3,3.2,2,1,1,0.8,4,4,2,2.5,1,1,1);


function mean($data, $start, $len) {
$avg = 0;
for ($i = $start; $i < $start+ $len; $i ++)
$avg += $data[$i];
return $avg / $len;
}
    

function stddev($data, $start,$len) {
$mean = mean($data,$start,$len);
$dev = 0;
for ($i = $start; $i < $start+$len; $i++)
$dev += (($data[$i] - $mean) * ($data[$i] - $mean));
return sqrt($dev / $len);
}


function zscore($data, $len, $lag= 20, $threshold = 1, $influence = 1) {


$signals = array();
$avgFilter = array();
$stdFilter = array();
$filteredY = array();
$avgFilter[$lag - 1] = mean($data, 0, $lag);
$stdFilter[$lag - 1] = stddev($data, 0, $lag);
    

for ($i = 0; $i < $len; $i++) {
$filteredY[$i] = $data[$i];
$signals[$i] = 0;
}




for ($i=$lag; $i < $len; $i++) {
if (abs($data[$i] - $avgFilter[$i-1]) > $threshold * $stdFilter[$lag - 1]) {
if ($data[$i] > $avgFilter[$i-1]) {
$signals[$i] = 1;
}
else {
$signals[$i] = -1;
}
$filteredY[$i] = $influence * $data[$i] + (1 - $influence) * $filteredY[$i-1];
}
else {
$signals[$i] = 0;
$filteredY[$i] = $data[$i];
}
        

$avgFilter[$i] = mean($filteredY, $i - $lag, $lag);
$stdFilter[$i] = stddev($filteredY, $i - $lag, $lag);
}
return $signals;
}


$sig = zscore($y, count($y));


print_r($y); echo "<br><br>";
print_r($sig); echo "<br><br>";


for ($i = 0; $i < count($y); $i++) echo $i. " " . $y[$i]. " ". $sig[$i]."<br>";

这是鲁棒峰值检测算法算法的Python实现。

初始化和计算部分被分开,只有filtered_y数组被保留,它的最大大小等于lag,所以内存没有增加。(结果与上述答案相同)。 为了绘制图形,还保留labels数组

我做了一个github要点

import numpy as np
import pylab


def init(x, lag, threshold, influence):
'''
Smoothed z-score algorithm
Implementation of algorithm from https://stackoverflow.com/a/22640362/6029703
'''


labels = np.zeros(lag)
filtered_y = np.array(x[0:lag])
avg_filter = np.zeros(lag)
std_filter = np.zeros(lag)
var_filter = np.zeros(lag)


avg_filter[lag - 1] = np.mean(x[0:lag])
std_filter[lag - 1] = np.std(x[0:lag])
var_filter[lag - 1] = np.var(x[0:lag])


return dict(avg=avg_filter[lag - 1], var=var_filter[lag - 1],
std=std_filter[lag - 1], filtered_y=filtered_y,
labels=labels)




def add(result, single_value, lag, threshold, influence):
previous_avg = result['avg']
previous_var = result['var']
previous_std = result['std']
filtered_y = result['filtered_y']
labels = result['labels']


if abs(single_value - previous_avg) > threshold * previous_std:
if single_value > previous_avg:
labels = np.append(labels, 1)
else:
labels = np.append(labels, -1)


# calculate the new filtered element using the influence factor
filtered_y = np.append(filtered_y, influence * single_value
+ (1 - influence) * filtered_y[-1])
else:
labels = np.append(labels, 0)
filtered_y = np.append(filtered_y, single_value)


# update avg as sum of the previuos avg + the lag * (the new calculated item - calculated item at position (i - lag))
current_avg_filter = previous_avg + 1. / lag * (filtered_y[-1]
- filtered_y[len(filtered_y) - lag - 1])


# update variance as the previuos element variance + 1 / lag * new recalculated item - the previous avg -
current_var_filter = previous_var + 1. / lag * ((filtered_y[-1]
- previous_avg) ** 2 - (filtered_y[len(filtered_y) - 1
- lag] - previous_avg) ** 2 - (filtered_y[-1]
- filtered_y[len(filtered_y) - 1 - lag]) ** 2 / lag)  # the recalculated element at pos (lag) - avg of the previuos - new recalculated element - recalculated element at lag pos ....


# calculate standard deviation for current element as sqrt (current variance)
current_std_filter = np.sqrt(current_var_filter)


return dict(avg=current_avg_filter, var=current_var_filter,
std=current_std_filter, filtered_y=filtered_y[1:],
labels=labels)


lag = 30
threshold = 5
influence = 0


y = np.array([1,1,1.1,1,0.9,1,1,1.1,1,0.9,1,1.1,1,1,0.9,1,1,1.1,1,1,1,1,1.1,0.9,1,1.1,1,1,0.9,
1,1.1,1,1,1.1,1,0.8,0.9,1,1.2,0.9,1,1,1.1,1.2,1,1.5,1,3,2,5,3,2,1,1,1,0.9,1,1,3,
2.6,4,3,3.2,2,1,1,0.8,4,4,2,2.5,1,1,1])


# Run algo with settings from above
result = init(y[:lag], lag=lag, threshold=threshold, influence=influence)


i = open('quartz2', 'r')
for i in y[lag:]:
result = add(result, i, lag, threshold, influence)


# Plot result
pylab.subplot(211)
pylab.plot(np.arange(1, len(y) + 1), y)
pylab.subplot(212)
pylab.step(np.arange(1, len(y) + 1), result['labels'], color='red',
lw=2)
pylab.ylim(-1.5, 1.5)
pylab.show()

这种z-scores方法在峰值检测方面非常有效,也有助于异常值的去除。异常值对话经常讨论每个点的统计价值和变化数据的伦理。

但是,在来自易出错的串行通信或易出错的传感器的重复错误传感器值的情况下,错误或虚假读数中没有统计值。它们需要被识别并移除。

从视觉上看,错误是显而易见的。下图中的直线显示了需要删除的内容。但是用算法识别和消除错误是相当具有挑战性的。z分数效果很好。

下图是通过串行通信从传感器获得的值。偶尔的串行通信错误,传感器错误或两者都导致重复的,明显错误的数据点。

z-score峰值检测器能够在虚假数据点上发出信号,并生成一个干净的结果数据集,同时保留正确数据的特征:

enter image description here

@Jean-Paul Smoothed Z Score算法的Dart版本:

class SmoothedZScore {
int lag = 5;
num threshold = 10;
num influence = 0.5;


num sum(List<num> a) {
num s = 0;
for (int i = 0; i < a.length; i++) s += a[i];
return s;
}


num mean(List<num> a) {
return sum(a) / a.length;
}


num stddev(List<num> arr) {
num arrMean = mean(arr);
num dev = 0;
for (int i = 0; i < arr.length; i++) dev += (arr[i] - arrMean) * (arr[i] - arrMean);
return sqrt(dev / arr.length);
}


List<int> smoothedZScore(List<num> y) {
if (y.length < lag + 2) {
throw 'y data array too short($y.length) for given lag of $lag';
}


// init variables
List<int> signals = List.filled(y.length, 0);
List<num> filteredY = List<num>.from(y);
List<num> leadIn = y.sublist(0, lag);


var avgFilter = List<num>.filled(y.length, 0);
var stdFilter = List<num>.filled(y.length, 0);
avgFilter[lag - 1] = mean(leadIn);
stdFilter[lag - 1] = stddev(leadIn);


for (var i = lag; i < y.length; i++) {
if ((y[i] - avgFilter[i - 1]).abs() > (threshold * stdFilter[i - 1])) {
signals[i] = y[i] > avgFilter[i - 1] ? 1 : -1;
// make influence lower
filteredY[i] = influence * y[i] + (1 - influence) * filteredY[i - 1];
} else {
signals[i] = 0; // no signal
filteredY[i] = y[i];
}


// adjust the filters
List<num> yLag = filteredY.sublist(i - lag, i);
avgFilter[i] = mean(yLag);
stdFilter[i] = stddev(yLag);
}


return signals;
}
}

我认为delica的Python回答器有一个bug。我不能评论他的帖子,因为我没有代表来做这件事,编辑队列已经满了,所以我可能不是第一个注意到它的人。

avgFilter[lag - 1]和stdFilter[lag - 1]在init中设置,然后在lag == i时再次设置,而不是改变[lag]值。这个结果使得第一个信号总是1。

以下是带有轻微修正的代码:

import numpy as np


class real_time_peak_detection():
def __init__(self, array, lag, threshold, influence):
self.y = list(array)
self.length = len(self.y)
self.lag = lag
self.threshold = threshold
self.influence = influence
self.signals = [0] * len(self.y)
self.filteredY = np.array(self.y).tolist()
self.avgFilter = [0] * len(self.y)
self.stdFilter = [0] * len(self.y)
self.avgFilter[self.lag - 1] = np.mean(self.y[0:self.lag]).tolist()
self.stdFilter[self.lag - 1] = np.std(self.y[0:self.lag]).tolist()


def thresholding_algo(self, new_value):
self.y.append(new_value)
i = len(self.y) - 1
self.length = len(self.y)
if i < self.lag:
return 0
elif i == self.lag:
self.signals = [0] * len(self.y)
self.filteredY = np.array(self.y).tolist()
self.avgFilter = [0] * len(self.y)
self.stdFilter = [0] * len(self.y)
self.avgFilter[self.lag] = np.mean(self.y[0:self.lag]).tolist()
self.stdFilter[self.lag] = np.std(self.y[0:self.lag]).tolist()
return 0


self.signals += [0]
self.filteredY += [0]
self.avgFilter += [0]
self.stdFilter += [0]


if abs(self.y[i] - self.avgFilter[i - 1]) > self.threshold * self.stdFilter[i - 1]:
if self.y[i] > self.avgFilter[i - 1]:
self.signals[i] = 1
else:
self.signals[i] = -1


self.filteredY[i] = self.influence * self.y[i] + (1 - self.influence) * self.filteredY[i - 1]
self.avgFilter[i] = np.mean(self.filteredY[(i - self.lag):i])
self.stdFilter[i] = np.std(self.filteredY[(i - self.lag):i])
else:
self.signals[i] = 0
self.filteredY[i] = self.y[i]
self.avgFilter[i] = np.mean(self.filteredY[(i - self.lag):i])
self.stdFilter[i] = np.std(self.filteredY[(i - self.lag):i])


return self.signals[i]

我从Jean-Paul那里写了一个最流行的答案的Go包。它假设y值的类型为float64

github.com/MicahParks/peakdetect

下面的示例使用了这个包,并基于上面提到的流行答案中的R示例。它在编译时没有任何依赖关系,试图保持较低的内存占用,并且在有新数据点进入时不重新处理过去的点。该项目有100%的测试覆盖率,主要来自上述R示例的输入和输出。但是,如果有人发现任何错误,请打开一个GitHub问题。

编辑:我已经使用v0.0.5进行了性能改进,似乎快了10倍!它使用Welford的方法进行初始化,并使用类似的方法计算滞后期(滑动窗口)的平均值和总体标准偏差。特别感谢另一个帖子的回答:https://stackoverflow.com/a/14638138/14797322

下面是基于R例子的Golang例子:

package main


import (
"fmt"
"log"


"github.com/MicahParks/peakdetect"
)


// This example is the equivalent of the R example from the algorithm's author.
// https://stackoverflow.com/a/54507329/14797322
func main() {
data := []float64{1, 1, 1.1, 1, 0.9, 1, 1, 1.1, 1, 0.9, 1, 1.1, 1, 1, 0.9, 1, 1, 1.1, 1, 1, 1, 1, 1.1, 0.9, 1, 1.1, 1, 1, 0.9, 1, 1.1, 1, 1, 1.1, 1, 0.8, 0.9, 1, 1.2, 0.9, 1, 1, 1.1, 1.2, 1, 1.5, 1, 3, 2, 5, 3, 2, 1, 1, 1, 0.9, 1, 1, 3, 2.6, 4, 3, 3.2, 2, 1, 1, 0.8, 4, 4, 2, 2.5, 1, 1, 1}


// Algorithm configuration from example.
const (
lag       = 30
threshold = 5
influence = 0
)


// Create then initialize the peak detector.
detector := peakdetect.NewPeakDetector()
err := detector.Initialize(influence, threshold, data[:lag]) // The length of the initial values is the lag.
if err != nil {
log.Fatalf("Failed to initialize peak detector.\nError: %s", err)
}


// Start processing new data points and determine what signal, if any they produce.
//
// This method, .Next(), is best for when data is being processed in a stream, but this simply iterates over a slice.
nextDataPoints := data[lag:]
for i, newPoint := range nextDataPoints {
signal := detector.Next(newPoint)
var signalType string
switch signal {
case peakdetect.SignalNegative:
signalType = "negative"
case peakdetect.SignalNeutral:
signalType = "neutral"
case peakdetect.SignalPositive:
signalType = "positive"
}


println(fmt.Sprintf("Data point at index %d has the signal: %s", i+lag, signalType))
}


// This method, .NextBatch(), is a helper function for processing many data points at once. It's returned slice
// should produce the same signal outputs as the loop above.
signals := detector.NextBatch(nextDataPoints)
println(fmt.Sprintf("1:1 ratio of batch inputs to signal outputs: %t", len(signals) == len(nextDataPoints)))
}

c++ (Qt)演示端口,交互式参数

我已经将这个算法的演示应用程序移植到c++ (Qt)。

代码可以在GitHub上找到这里,最终我将添加一些文档和发布版本。

您不能绘制点,但可以从文本文件中导入它们(用空格分隔点——换行也算作空格)。您还可以调整算法参数,实时查看效果。这对于针对特定数据集调整算法以及探索参数如何影响结果非常有用。

enter image description here