Moving average or running mean

Is there a SciPy function or NumPy function or module for Python that calculates the running mean of a 1D array given a specific window?

For a ready-to-use solution, see https://scipy-cookbook.readthedocs.io/items/SignalSmooth.html. It provides a running mean with the flat window type. Note that this is a bit more sophisticated than the simple do-it-yourself convolution approach, because it tries to handle the problems at the beginning and the end of the data by reflecting it (which may or may not work in your case...).

For a start, you could try:

import numpy as np
import matplotlib.pyplot as plt
# smooth() is the function defined in the SciPy cookbook recipe linked above

a = np.random.random(100)
plt.plot(a)
b = smooth(a, window='flat')
plt.plot(b)

You can calculate a running mean with:

import numpy as np


def runningMean(x, N):
    y = np.zeros((len(x),))
    for ctr in range(len(x)):
        y[ctr] = np.sum(x[ctr:(ctr+N)])
    return y/N

But it is slow.

Luckily, numpy includes a convolve function that we can use to speed things up. The running mean is equivalent to convolving x with a vector that is N long, with all members equal to 1/N. The numpy implementation of convolve includes the starting transient, so you have to remove the first N-1 points:

def runningMeanFast(x, N):
    return np.convolve(x, np.ones((N,))/N)[(N-1):]

On my machine, the fast version is 20-30 times faster, depending on the length of the input vector and the size of the averaging window.

Note that convolve does include a 'same' mode, which seems like it should address the starting-transient issue, but it splits it between the beginning and the end.

Update: more efficient solutions have been proposed; uniform_filter1d from scipy is probably the best among the "standard" third-party libraries, and some newer or specialized libraries are available too.


You can use np.convolve:

np.convolve(x, np.ones(N)/N, mode='valid')

Explanation

The running mean is a case of the mathematical operation of convolution. For the running mean, you slide a window along the input and compute the mean of the window's contents. For discrete 1D signals, convolution is the same thing, except instead of the mean you compute an arbitrary linear combination, i.e., you multiply each element by a corresponding coefficient and add up the results. Those coefficients, one for each position in the window, are sometimes called the convolution kernel. The arithmetic mean of N values is (x_1 + x_2 + ... + x_N) / N, so the corresponding kernel is (1/N, 1/N, ..., 1/N), and that's exactly what we get by using np.ones(N)/N.
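For instance, a minimal sanity check of this equivalence on a toy array (the names here are just for illustration):

import numpy as np

x = np.array([1.0, 2.0, 3.0, 4.0, 5.0])
N = 3
kernel = np.ones(N) / N  # (1/3, 1/3, 1/3)
print(np.convolve(x, kernel, mode='valid'))
# [2. 3. 4.] -- each entry is the mean of one length-3 window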

Edges

The mode argument of np.convolve specifies how to handle the edges. I chose the valid mode here because I think that's how most people expect the running mean to work, but you may have other priorities. Here is a plot that illustrates the difference between the modes:

import numpy as np
import matplotlib.pyplot as plt
modes = ['full', 'same', 'valid']
for m in modes:
    plt.plot(np.convolve(np.ones(200), np.ones(50)/50, mode=m));
plt.axis([-10, 251, -.1, 1.1]);
plt.legend(modes, loc='lower center');
plt.show()

[Figure: running-mean results for the 'full', 'same' and 'valid' convolve modes]

If you choose to roll your own rather than use an existing library, please be aware of floating-point error and try to minimize its effects:

class SumAccumulator:
    def __init__(self):
        self.values = [0]
        self.count = 0

    def add(self, val):
        self.values.append(val)
        self.count = self.count + 1
        i = self.count
        while i & 0x01:
            i = i >> 1
            v0 = self.values.pop()
            v1 = self.values.pop()
            self.values.append(v0 + v1)

    def get_total(self):
        return sum(reversed(self.values))

    def get_size(self):
        return self.count

If the values are all of roughly the same order of magnitude, this helps preserve precision by always adding values of roughly similar magnitudes.

Efficient solution

Convolution is much better than the straightforward approach, but (I guess) it uses FFT and is thus quite slow. However, the following approach works especially well for computing the running mean:

import numpy


def running_mean(x, N):
    cumsum = numpy.cumsum(numpy.insert(x, 0, 0))
    return (cumsum[N:] - cumsum[:-N]) / float(N)

Code to check:

In[3]: x = numpy.random.random(100000)
In[4]: N = 1000
In[5]: %timeit result1 = numpy.convolve(x, numpy.ones((N,))/N, mode='valid')
10 loops, best of 3: 41.4 ms per loop
In[6]: %timeit result2 = running_mean(x, N)
1000 loops, best of 3: 1.04 ms per loop
Note that numpy.allclose(result1, result2) is True, i.e. the two methods are equivalent. The bigger N, the bigger the difference in time.

Caveat: although cumsum is faster, there will be increased floating-point error, which may make your results invalid/incorrect/unacceptable.

The comments here pointed out this floating-point error issue, but I am making it more obvious in the answer.

# demonstrate loss of precision with only 100,000 points
# (running_mean_convolve and running_mean_cumsum are defined further below)
np.random.seed(42)
x = np.random.randn(100000)+1e6
y1 = running_mean_convolve(x, 10)
y2 = running_mean_cumsum(x, 10)
assert np.allclose(y1, y2, rtol=1e-12, atol=0)  # this assert fails: the cumsum result has drifted
  • the more points you accumulate, the greater the floating-point error (so it is noticeable at 1e5 points, more significant at 1e6 points, and beyond 1e6 points you may want to reset the accumulator)
  • you can cheat by using np.longdouble, but your floating-point error will still grow relatively large for largish point counts (around >1e5, though it depends on your data)
  • you can plot the error and watch it grow comparatively fast (see the sketch after this list)
  • the convolve solution is slower but does not have this floating-point loss of precision
  • the uniform_filter1d solution is faster than this cumsum solution AND does not have this floating-point loss of precision
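A minimal sketch of that error plot, assuming the comparison functions running_mean_convolve and running_mean_cumsum defined further below are in scope:

import numpy as np
import matplotlib.pyplot as plt

np.random.seed(42)
x = np.random.randn(100000) + 1e6
y1 = running_mean_convolve(x, 10)  # reference: no accumulated error
y2 = running_mean_cumsum(x, 10)    # accumulates floating-point error
plt.plot(np.abs(y1 - y2))          # the error grows with the number of accumulated points
plt.xlabel('index')
plt.ylabel('|convolve - cumsum|')
plt.show()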

I haven't yet checked how fast this is, but you could try:

from collections import deque


cache = deque()  # keep track of seen values
n = 10           # window size
A = range(100)   # some dummy iterable (xrange in the original Python 2 code)
cum_sum = 0      # initialize cumulative sum


for t, val in enumerate(A, 1):
    cache.append(val)
    cum_sum += val
    if t < n:
        avg = cum_sum / float(t)
    else:                           # if window is saturated,
        cum_sum -= cache.popleft()  # subtract oldest value
        avg = cum_sum / float(n)

The example below shows the old pandas.rolling_mean function, which has been removed in recent pandas versions. The modern equivalent of that call uses pandas.Series.rolling:

In [8]: pd.Series(x).rolling(window=N).mean().iloc[N-1:].values
Out[8]:
array([ 0.49815397,  0.49844183,  0.49840518, ...,  0.49488191,
0.49456679,  0.49427121])

pandas is better suited for this than NumPy or SciPy. Its function rolling_mean does the job conveniently. It also returns a NumPy array when the input is an array.

It is difficult to beat rolling_mean in performance with any custom pure-Python implementation. Here is an example performance comparison against two of the proposed solutions:

In [1]: import numpy as np


In [2]: import pandas as pd


In [3]: def running_mean(x, N):
...:     cumsum = np.cumsum(np.insert(x, 0, 0))
...:     return (cumsum[N:] - cumsum[:-N]) / N
...:


In [4]: x = np.random.random(100000)


In [5]: N = 1000


In [6]: %timeit np.convolve(x, np.ones((N,))/N, mode='valid')
10 loops, best of 3: 172 ms per loop


In [7]: %timeit running_mean(x, N)
100 loops, best of 3: 6.72 ms per loop


In [8]: %timeit pd.rolling_mean(x, N)[N-1:]
100 loops, best of 3: 4.74 ms per loop


In [9]: np.allclose(pd.rolling_mean(x, N)[N-1:], running_mean(x, N))
Out[9]: True

There are also nice options as to how to deal with the edge values.
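For example, min_periods (a documented parameter of pandas rolling windows) relaxes the requirement at the left edge; a small sketch:

import numpy as np
import pandas as pd

x = np.arange(5, dtype=float)
print(pd.Series(x).rolling(window=3).mean().values)
# [nan nan  1.  2.  3.]
print(pd.Series(x).rolling(window=3, min_periods=1).mean().values)
# [0.  0.5 1.  2.  3. ]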


In my tests at Tradewave.net, TA-lib always wins:

import talib as ta
import numpy as np
import pandas as pd
import scipy
from scipy import signal
import time as t

# info, storage, data, plot and log are globals provided by the Tradewave platform

PAIR = info.primary_pair
PERIOD = 30


def initialize():
    storage.reset()
    storage.elapsed = storage.get('elapsed', [0,0,0,0,0,0])


def cumsum_sma(array, period):
    ret = np.cumsum(array, dtype=float)
    ret[period:] = ret[period:] - ret[:-period]
    return ret[period - 1:] / period


def pandas_sma(array, period):
    return pd.rolling_mean(array, period)


def api_sma(array, period):
    # this method is native to Tradewave and does NOT return an array
    return (data[PAIR].ma(PERIOD))


def talib_sma(array, period):
    return ta.MA(array, period)


def convolve_sma(array, period):
    return np.convolve(array, np.ones((period,))/period, mode='valid')


def fftconvolve_sma(array, period):
    return scipy.signal.fftconvolve(
        array, np.ones((period,))/period, mode='valid')


def tick():

    close = data[PAIR].warmup_period('close')

    t1 = t.time()
    sma_api = api_sma(close, PERIOD)
    t2 = t.time()
    sma_cumsum = cumsum_sma(close, PERIOD)
    t3 = t.time()
    sma_pandas = pandas_sma(close, PERIOD)
    t4 = t.time()
    sma_talib = talib_sma(close, PERIOD)
    t5 = t.time()
    sma_convolve = convolve_sma(close, PERIOD)
    t6 = t.time()
    sma_fftconvolve = fftconvolve_sma(close, PERIOD)
    t7 = t.time()

    storage.elapsed[-1] = storage.elapsed[-1] + t2-t1
    storage.elapsed[-2] = storage.elapsed[-2] + t3-t2
    storage.elapsed[-3] = storage.elapsed[-3] + t4-t3
    storage.elapsed[-4] = storage.elapsed[-4] + t5-t4
    storage.elapsed[-5] = storage.elapsed[-5] + t6-t5
    storage.elapsed[-6] = storage.elapsed[-6] + t7-t6

    plot('sma_api', sma_api)
    plot('sma_cumsum', sma_cumsum[-5])
    plot('sma_pandas', sma_pandas[-10])
    plot('sma_talib', sma_talib[-15])
    plot('sma_convolve', sma_convolve[-20])
    plot('sma_fftconvolve', sma_fftconvolve[-25])


def stop():

    log('ticks....: %s' % info.max_ticks)

    log('api......: %.5f' % storage.elapsed[-1])
    log('cumsum...: %.5f' % storage.elapsed[-2])
    log('pandas...: %.5f' % storage.elapsed[-3])
    log('talib....: %.5f' % storage.elapsed[-4])
    log('convolve.: %.5f' % storage.elapsed[-5])
    log('fft......: %.5f' % storage.elapsed[-6])

Results:

[2015-01-31 23:00:00] ticks....: 744
[2015-01-31 23:00:00] api......: 0.16445
[2015-01-31 23:00:00] cumsum...: 0.03189
[2015-01-31 23:00:00] pandas...: 0.03677
[2015-01-31 23:00:00] talib....: 0.00700  # <<< Winner!
[2015-01-31 23:00:00] convolve.: 0.04871
[2015-01-31 23:00:00] fft......: 0.22306


Find the moving average without using numpy or pandas:

import itertools
sample = [2, 6, 10, 8, 11, 10]
list(itertools.starmap(
    lambda a, b: b/a,
    enumerate(itertools.accumulate(sample), 1)
))

will print [2.0, 4.0, 6.0, 6.5, 7.4, 7.833333333333333]

  • 2.0 = (2)/1
  • 4.0 = (2 + 6) / 2
  • 6.0 = (2 + 6 + 10) / 3
  • ...
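If numpy happens to be available after all, the same cumulative (expanding) mean is a one-liner; a quick equivalent sketch:

import numpy as np

sample = [2, 6, 10, 8, 11, 10]
print(np.cumsum(sample) / np.arange(1, len(sample) + 1))
# [2.         4.         6.         6.5        7.4        7.83333333]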

A bit late to the party, but I've made my own little function that does NOT wrap around the ends or pad with zeros, which would then be used to find the average. As a further treat, it also re-samples the signal at linearly spaced points. Customize the code at will to get other features.

The method is a simple matrix multiplication with a normalized Gaussian kernel.

def running_mean(y_in, x_in, N_out=101, sigma=1):
    '''
    Returns running mean as a Bell-curve weighted average at evenly spaced
    points. Does NOT wrap signal around, or pad with zeros.

    Arguments:
    y_in -- y values, the values to be smoothed and re-sampled
    x_in -- x values for array

    Keyword arguments:
    N_out -- NoOf elements in resampled array.
    sigma -- 'Width' of Bell-curve in units of param x .
    '''
    import numpy as np
    N_in = len(y_in)

    # Gaussian kernel
    x_out = np.linspace(np.min(x_in), np.max(x_in), N_out)
    x_in_mesh, x_out_mesh = np.meshgrid(x_in, x_out)
    gauss_kernel = np.exp(-np.square(x_in_mesh - x_out_mesh) / (2 * sigma**2))
    # Normalize kernel, such that the sum is one along axis 1
    normalization = np.tile(np.reshape(np.sum(gauss_kernel, axis=1), (N_out, 1)), (1, N_in))
    gauss_kernel_normalized = gauss_kernel / normalization
    # Perform running average as a linear operation
    y_out = gauss_kernel_normalized @ y_in

    return y_out, x_out
A simple use case: a sine signal with added normally distributed noise. [Figure: noisy sine signal and its Gaussian-weighted running mean]

I know this is an old question, but here is a solution that doesn't use any extra data structures or libraries. It is linear in the number of elements of the input list and I cannot think of any other way to make it more efficient (actually, if anyone knows of a better way to allocate the result, please let me know).

NOTE: this would be much faster using a numpy array instead of a list, but I wanted to eliminate all dependencies. Performance could also be improved by multi-threaded execution.

The function assumes that the input list is one-dimensional, so be careful.

### Running mean/Moving average
def running_mean(l, N):
    sum = 0
    result = list(0 for x in l)

    for i in range(0, N):
        sum = sum + l[i]
        result[i] = sum / N  # divide by the full window N (zero-padded edge, see the walkthrough below)

    for i in range(N, len(l)):
        sum = sum - l[i-N] + l[i]
        result[i] = sum / N

    return result

Example

Let's say we have a list data = [ 1, 2, 3, 4, 5, 6 ] on which we want to compute a rolling mean with period 3, and that you also want an output list of the same size as the input one (that is most often the case).

The first element has index 0, so the rolling mean should be computed on elements of index -2, -1 and 0. Obviously we don't have data[-2] and data[-1] (unless you want to use special boundary conditions), so we assume that those elements are 0. This is equivalent to zero-padding the list, except we don't actually pad it; we just keep track of the indices that would require padding (from 0 to N-1).

So, for the first N elements we simply keep adding up the elements in an accumulator.

result[0] = (0 + 0 + 1) / 3  = 0.333    ==   (sum + 1) / 3
result[1] = (0 + 1 + 2) / 3  = 1        ==   (sum + 2) / 3
result[2] = (1 + 2 + 3) / 3  = 2        ==   (sum + 3) / 3

From element N+1 onwards, simple accumulation no longer works. We expect result[3] = (2 + 3 + 4) / 3 = 3, but this differs from (sum + 4) / 3 = 3.333.

The way to compute the correct value is to subtract data[0] = 1 from sum + 4, thus giving sum + 4 - 1 = 9; then result[3] = 9 / 3 = 3.

This works because currently sum = data[0] + data[1] + data[2], and the same holds for every i >= N, since before the subtraction sum is data[i-N] + ... + data[i-2] + data[i-1].
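A quick usage check of the function above against this walkthrough:

data = [1, 2, 3, 4, 5, 6]
print(running_mean(data, 3))
# [0.3333333333333333, 1.0, 2.0, 3.0, 4.0, 5.0]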

This question is now even older than when NeXuS wrote about it last month, but I like how his code handles edge cases. However, because it is a "simple moving average," its results lag behind the data they apply to. I thought that dealing with edge cases in a more satisfying way than NumPy's modes valid, same, and full could be achieved by applying a similar approach to a convolution()-based method.

My contribution uses a central running mean to align its results with their data. When too few points are available for the full-sized window to be used, running means are computed from successively smaller windows at the edges of the array. [Actually, from successively larger windows, but that's an implementation detail.]

import numpy as np


def running_mean(l, N):
    # Also works for the (strictly invalid) cases when N is even.
    if (N//2)*2 == N:
        N = N - 1
    front = np.zeros(N//2)
    back = np.zeros(N//2)

    for i in range(1, (N//2)*2, 2):
        front[i//2] = np.convolve(l[:i], np.ones((i,))/i, mode='valid')
    for i in range(1, (N//2)*2, 2):
        back[i//2] = np.convolve(l[-i:], np.ones((i,))/i, mode='valid')
    return np.concatenate([front, np.convolve(l, np.ones((N,))/N, mode='valid'), back[::-1]])

It is relatively slow because it uses convolve(), and could likely be spruced up quite a lot by a true Pythonista; however, I believe the idea stands.

You can use scipy.ndimage.uniform_filter1d:

import numpy as np
from scipy.ndimage import uniform_filter1d
N = 1000
x = np.random.random(100000)
y = uniform_filter1d(x, size=N)

uniform_filter1d:

  • gives the output with the same numpy shape (i.e. number of points)
  • allows multiple ways to handle the border, where 'reflect' is the default, but in my case I rather wanted 'nearest' (see the sketch after this list)
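A small sketch of how the boundary mode changes the edge values (modes as documented for scipy.ndimage; the printed numbers are for this toy input):

import numpy as np
from scipy.ndimage import uniform_filter1d

x = np.array([1.0, 2.0, 3.0, 4.0, 5.0])
print(uniform_filter1d(x, size=5, mode='reflect'))  # [1.8 2.2 3.  3.8 4.2]
print(uniform_filter1d(x, size=5, mode='nearest'))  # [1.6 2.2 3.  3.8 4.4]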

It is also rather quick (nearly 50 times faster than np.convolve and 2-5 times faster than the cumsum approach given above):

%timeit y1 = np.convolve(x, np.ones((N,))/N, mode='same')
100 loops, best of 3: 9.28 ms per loop


%timeit y2 = uniform_filter1d(x, size=N)
10000 loops, best of 3: 191 µs per loop

Here are 3 functions that let you compare the error/speed of different implementations:

from __future__ import division
import numpy as np
import scipy.ndimage as ndi
def running_mean_convolve(x, N):
    return np.convolve(x, np.ones(N) / float(N), 'valid')
def running_mean_cumsum(x, N):
    cumsum = np.cumsum(np.insert(x, 0, 0))
    return (cumsum[N:] - cumsum[:-N]) / float(N)
def running_mean_uniform_filter1d(x, N):
    return ndi.uniform_filter1d(x, N, mode='constant', origin=-(N//2))[:-(N-1)]
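A quick way to use them for an error comparison (a sketch; all three functions above return the same window positions, so their outputs align element-wise):

x = np.random.randn(100000) + 1e6
N = 10
y1 = running_mean_convolve(x, N)
y2 = running_mean_cumsum(x, N)
y3 = running_mean_uniform_filter1d(x, N)
print(np.max(np.abs(y1 - y2)), np.max(np.abs(y1 - y3)))
# the cumsum error is typically orders of magnitude larger than the filter error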

For a short, fast solution that does the whole thing in one loop, without dependencies, the code below works great.

mylist = [1, 2, 3, 4, 5, 6, 7]
N = 3
cumsum, moving_aves = [0], []


for i, x in enumerate(mylist, 1):
    cumsum.append(cumsum[i-1] + x)
    if i >= N:
        moving_ave = (cumsum[i] - cumsum[i-N])/N
        # can do stuff with moving_ave here
        moving_aves.append(moving_ave)

Although there are solutions for this question here, please take a look at my solution. It is very simple and works well.

import numpy as np
dataset = np.asarray([1, 2, 3, 4, 5, 6, 7])
ma = list()
window = 3
for t in range(0, len(dataset)):
    if t+window <= len(dataset):
        indices = range(t, t+window)
        ma.append(np.average(np.take(dataset, indices)))
    else:
        ma = np.asarray(ma)

Using only the Python standard library (memory efficient)

Just to give another version using only the standard library deque. It is quite a surprise to me that most of the answers use pandas or numpy.

from collections import deque


def moving_average(iterable, n=3):
    d = deque(maxlen=n)
    for i in iterable:
        d.append(i)
        if len(d) == n:
            yield sum(d)/n


r = moving_average([40, 30, 50, 46, 39, 44])
assert list(r) == [40.0, 42.0, 45.0, 43.0]

Actually I found another implementation in the python docs:

from collections import deque
from itertools import islice


def moving_average(iterable, n=3):
    # moving_average([40, 30, 50, 46, 39, 44]) --> 40.0 42.0 45.0 43.0
    # http://en.wikipedia.org/wiki/Moving_average
    it = iter(iterable)
    d = deque(islice(it, n-1))
    d.appendleft(0)
    s = sum(d)
    for elem in it:
        s += elem - d.popleft()
        d.append(elem)
        yield s / n

However, the implementation seems to me a bit more complex than it should be. But it must be in the standard python docs for a reason; could someone comment on my implementation versus the one from the docs?

Reading the other answers, I don't think this is what the question asked for, but I got here needing to keep a running average of a list of values that was growing in size.

So if you want to keep a list of values that you are acquiring from somewhere (a site, a measuring device, etc.) and an updated average of the last n values, you can use the code below, which minimizes the work of adding new elements:

class Running_Average(object):
    def __init__(self, buffer_size=10):
        """
        Create a new Running_Average object.

        This object allows the efficient calculation of the average of the last
        `buffer_size` numbers added to it.

        Examples
        --------
        >>> a = Running_Average(2)
        >>> a.add(1)
        >>> a.get()
        1.0
        >>> a.add(1)  # there are two 1 in buffer
        >>> a.get()
        1.0
        >>> a.add(2)  # there's a 1 and a 2 in the buffer
        >>> a.get()
        1.5
        >>> a.add(2)
        >>> a.get()  # now there's only two 2 in the buffer
        2.0
        """
        self._buffer_size = int(buffer_size)  # make sure it's an int
        self.reset()

    def add(self, new):
        """
        Add a new number to the buffer, or replace the oldest one there.
        """
        new = float(new)  # make sure it's a float
        n = len(self._buffer)
        if n < self.buffer_size:  # still have to add numbers to the buffer.
            self._buffer.append(new)
            if self._average != self._average:  # ~ if isNaN().
                self._average = new  # no previous numbers, so it's new.
            else:
                self._average *= n  # so it's only the sum of numbers.
                self._average += new  # add new number.
                self._average /= (n+1)  # divide by new number of numbers.
        else:  # buffer full, replace oldest value.
            old = self._buffer[self._index]  # the previous oldest number.
            self._buffer[self._index] = new  # replace with new one.
            self._index += 1  # update the index and make sure it's...
            self._index %= self.buffer_size  # ... smaller than buffer_size.
            self._average -= old/self.buffer_size  # remove old one...
            self._average += new/self.buffer_size  # ...and add new one...
            # ... weighted by the number of elements.

    def __call__(self):
        """
        Return the moving average value, for the lazy ones who don't want
        to write .get .
        """
        return self._average

    def get(self):
        """
        Return the moving average value.
        """
        return self()

    def reset(self):
        """
        Reset the moving average.

        If for some reason you don't want to just create a new one.
        """
        self._buffer = []  # could use np.empty(self.buffer_size)...
        self._index = 0  # and use this to keep track of how many numbers.
        self._average = float('nan')  # could use np.NaN .

    def get_buffer_size(self):
        """
        Return current buffer_size.
        """
        return self._buffer_size

    def set_buffer_size(self, buffer_size):
        """
        >>> a = Running_Average(10)
        >>> for i in range(15):
        ...     a.add(i)
        ...
        >>> a()
        9.5
        >>> a._buffer  # should not access this!!
        [10.0, 11.0, 12.0, 13.0, 14.0, 5.0, 6.0, 7.0, 8.0, 9.0]

        Decreasing buffer size:
        >>> a.buffer_size = 6
        >>> a._buffer  # should not access this!!
        [9.0, 10.0, 11.0, 12.0, 13.0, 14.0]
        >>> a.buffer_size = 2
        >>> a._buffer
        [13.0, 14.0]

        Increasing buffer size:
        >>> a.buffer_size = 5
        Warning: no older data available!
        >>> a._buffer
        [13.0, 14.0]

        Keeping buffer size:
        >>> a = Running_Average(10)
        >>> for i in range(15):
        ...     a.add(i)
        ...
        >>> a()
        9.5
        >>> a._buffer  # should not access this!!
        [10.0, 11.0, 12.0, 13.0, 14.0, 5.0, 6.0, 7.0, 8.0, 9.0]
        >>> a.buffer_size = 10  # reorders buffer!
        >>> a._buffer
        [5.0, 6.0, 7.0, 8.0, 9.0, 10.0, 11.0, 12.0, 13.0, 14.0]
        """
        buffer_size = int(buffer_size)
        # order the buffer so index is zero again:
        new_buffer = self._buffer[self._index:]
        new_buffer.extend(self._buffer[:self._index])
        self._index = 0
        if self._buffer_size < buffer_size:
            print('Warning: no older data available!')  # should use Warnings!
        else:
            diff = self._buffer_size - buffer_size
            print(diff)
            new_buffer = new_buffer[diff:]
        self._buffer_size = buffer_size
        self._buffer = new_buffer

    buffer_size = property(get_buffer_size, set_buffer_size)

You can test it with, for example:

def graph_test(N=200):
    import matplotlib.pyplot as plt
    values = list(range(N))
    values_average_calculator = Running_Average(N/2)
    values_averages = []
    for value in values:
        values_average_calculator.add(value)
        values_averages.append(values_average_calculator())
    fig, ax = plt.subplots(1, 1)
    ax.plot(values, label='values')
    ax.plot(values_averages, label='averages')
    ax.grid()
    ax.set_xlim(0, N)
    ax.set_ylim(0, N)
    fig.show()

Which gives:

[Figure: values and their running average as a function of the number of values]

Instead of NumPy or SciPy, I would recommend pandas to do this more swiftly:

df['data'].rolling(3).mean()

This takes the moving average (MA) over 3 periods of the column "data". You can also compute shifted versions; for example, the one that excludes the current cell (shifted one back) can easily be computed as:

df['data'].shift(periods=1).rolling(3).mean()

What about a moving average filter? It is also a one-liner and has the advantage that you can easily manipulate the window type if you need something other than a rectangle, e.g., an N-long simple moving average of an array a:

import numpy as np
import scipy.signal
from scipy.signal import lfilter

lfilter(np.ones(N)/N, [1], a)[N:]

And with the triangular window applied:

lfilter(np.ones(N)*scipy.signal.triang(N)/N, [1], a)[N:]

Note: I usually discard the first N samples as bogus, hence [N:] at the end, but this is not necessary and is a matter of personal choice.

There are many answers above about calculating a running mean. My answer adds two extra features:

  1. ignores nan values
  2. calculates the mean of the N neighboring values, NOT including the value of interest itself

This second feature is particularly useful for determining which values differ from the general trend by a certain amount.

I use numpy.cumsum since it is the most time-efficient method (see Alleo's answer above).

import numpy as np

N = 10  # number of points to test on each side of point of interest, best if even
padded_x = np.insert(np.insert(np.insert(x, len(x), np.empty(int(N/2))*np.nan), 0, np.empty(int(N/2))*np.nan), 0, 0)
n_nan = np.cumsum(np.isnan(padded_x))
cumsum = np.nancumsum(padded_x)
window_sum = cumsum[N+1:] - cumsum[:-(N+1)] - x  # subtract value of interest from sum of all values within window
window_n_nan = n_nan[N+1:] - n_nan[:-(N+1)] - np.isnan(x)
window_n_values = (N - window_n_nan)
movavg = (window_sum) / (window_n_values)

This code works for even N only. It can be adjusted for odd numbers by changing the np.insert calls for padded_x and n_nan.

Example output (raw in black, movavg in blue): [Figure: raw data (black) and the moving average of 10 points around each value (blue), excluding the value itself; nan values are ignored]

This code can easily be adapted to remove all moving-average values computed from fewer than cutoff = 3 non-nan values.

window_n_values = (N - window_n_nan).astype(float) # dtype must be float to set some values to nan
cutoff = 3
window_n_values[window_n_values<cutoff] = np.nan
movavg = (window_sum) / (window_n_values)

[Figure: raw data (black) and moving average (blue), ignoring any window with fewer than 3 non-nan values]

Buried in one of the answers above is a comment by mab that mentions this method: bottleneck has move_mean, which is a simple moving average:

import numpy as np
import bottleneck as bn


a = np.arange(10) + np.random.random(10)


mva = bn.move_mean(a, window=2, min_count=1)

min_count is a handy parameter that will effectively take the moving average up to the current point in your array. If you don't set min_count, it will equal window, and everything up to window points will be nan.

Another solution using just the standard library and deque:

from collections import deque
import itertools


def moving_average(iterable, n=3):
    # http://en.wikipedia.org/wiki/Moving_average
    it = iter(iterable)
    # create an iterable object from input argument
    d = deque(itertools.islice(it, n-1))
    # create deque object by slicing iterable
    d.appendleft(0)
    s = sum(d)
    for elem in it:
        s += elem - d.popleft()
        d.append(elem)
        yield s / n


# example on how to use it
for i in moving_average([40, 30, 50, 46, 39, 44]):
    print(i)


# 40.0
# 42.0
# 45.0
# 43.0

For educational purposes, let me add two more Numpy solutions (which are slower than the cumsum solution):

import numpy as np
from numpy.lib.stride_tricks import as_strided


def ra_strides(arr, window):
    ''' Running average using as_strided'''
    n = arr.shape[0] - window + 1
    arr_strided = as_strided(arr, shape=[n, window], strides=2*arr.strides)
    return arr_strided.mean(axis=1)


def ra_add(arr, window):
    ''' Running average using add.reduceat'''
    n = arr.shape[0] - window + 1
    indices = np.array([0, window]*n) + np.repeat(np.arange(n), 2)
    arr = np.append(arr, 0)
    return np.add.reduceat(arr, indices)[::2]/window

Functions used: as_strided, add.reduceat.

Python standard library solution

This generator function takes an iterable and a window size N and yields the average over the current values inside the window. It uses a deque, which is a data structure similar to a list, but optimized for fast modifications (pop, append) at both ends.

from collections import deque
from itertools import islice


def sliding_avg(iterable, N):
    it = iter(iterable)
    window = deque(islice(it, N))
    num_vals = len(window)

    if num_vals < N:
        msg = 'window size {} exceeds total number of values {}'
        raise ValueError(msg.format(N, num_vals))

    N = float(N)  # force floating point division if using Python 2
    s = sum(window)

    while True:
        yield s/N
        try:
            nxt = next(it)
        except StopIteration:
            break
        s = s - window.popleft() + nxt
        window.append(nxt)
        

Here is the function in action:

>>> values = range(100)
>>> N = 5
>>> window_avg = sliding_avg(values, N)
>>>
>>> next(window_avg)  # (0 + 1 + 2 + 3 + 4)/5
2.0
>>> next(window_avg)  # (1 + 2 + 3 + 4 + 5)/5
3.0
>>> next(window_avg)  # (2 + 3 + 4 + 5 + 6)/5
4.0

I feel this can be elegantly solved using bottleneck.

See the basic sample below:

import numpy as np
import bottleneck as bn


a = np.random.randint(4, 1000, size=100)
mm = bn.move_mean(a, window=5, min_count=1)
  • "mm" is the moving mean of "a".

  • "window" is the maximum number of entries to consider for the moving mean.

  • "min_count" is the minimum number of entries to consider for the moving mean (e.g., for the first few elements or if the array has nan values).

The good part is that bottleneck helps to deal with nan values and it is also very efficient.
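A small sketch of that nan handling (assuming bottleneck's documented behavior that its moving-window functions skip nans whenever min_count still allows a result):

import numpy as np
import bottleneck as bn

a = np.array([1.0, 2.0, np.nan, 4.0])
print(bn.move_mean(a, window=2, min_count=1))
# [1.  1.5 2.  4. ] -- the nan is skipped wherever at least one value remains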

Using @Aikude's variables, I wrote a one-liner.

import numpy as np


mylist = [1, 2, 3, 4, 5, 6, 7]
N = 3


mean = [np.mean(mylist[x:x+N]) for x in range(len(mylist)-N+1)]
print(mean)


>>> [2.0, 3.0, 4.0, 5.0, 6.0]

All the aforementioned solutions are poor because they lack

  • speed, due to a native python rather than a numpy vectorized implementation,
  • numerical stability, due to poor use of numpy.cumsum, or
  • speed, due to O(len(x) * w) implementations as convolutions.

Given

import numpy
m = 10000
x = numpy.random.rand(m)
w = 1000

Note that x_[:w].sum() equals x[:w-1].sum(). So for the first average, numpy.cumsum(...) adds x[w] / w (via x_[w+1] / w) and subtracts 0 (from x_[0] / w). This results in x[0:w].mean().

Via cumsum, you update the second average by additionally adding x[w+1] / w and subtracting x[0] / w, resulting in x[1:w+1].mean().

This goes on until x[-w:].mean() is reached.

x_ = numpy.insert(x, 0, 0)
sliding_average = x_[:w].sum() / w + numpy.cumsum(x_[w:] - x_[:-w]) / w

This solution is vectorized, O(m), readable, and numerically stable.
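A quick sanity check against the convolution result (a sketch, reusing m, x and w from above):

assert numpy.allclose(
    sliding_average,
    numpy.convolve(x, numpy.ones(w) / w, mode='valid')
)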

A new convolve recipe was merged into Python 3.10.

Given


import collections, operator
from itertools import chain, repeat


size = 3 + 1
kernel = [1/size] * size

Code

def convolve(signal, kernel):
    # See:  https://betterexplained.com/articles/intuitive-convolution/
    # convolve(data, [0.25, 0.25, 0.25, 0.25]) --> Moving average (blur)
    # convolve(data, [1, -1]) --> 1st finite difference (1st derivative)
    # convolve(data, [1, -2, 1]) --> 2nd finite difference (2nd derivative)
    kernel = list(reversed(kernel))
    n = len(kernel)
    window = collections.deque([0] * n, maxlen=n)
    for x in chain(signal, repeat(0, n-1)):
        window.append(x)
        yield sum(map(operator.mul, kernel, window))

Demo

list(convolve(range(1, 6), kernel))
# [0.25, 0.75, 1.5, 2.5, 3.5, 3.0, 2.25, 1.25]

Details

Convolution is a general mathematical operation that can be applied to moving averages. The idea is that, given some data, you slide a subset of the data (a window) as a "mask" or "kernel" across the data, carrying out a particular mathematical operation over each window. In the case of moving averages, the kernel is the average:


You can now use this implementation via more_itertools.convolve. more_itertools is a popular third-party package; install via: pip install more_itertools.
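A short usage sketch with the third-party package (assuming a recent more_itertools; the data and kernel are the same as in the demo above):

from more_itertools import convolve

data = [1, 2, 3, 4, 5]
kernel = [0.25, 0.25, 0.25, 0.25]
print(list(convolve(data, kernel)))
# [0.25, 0.75, 1.5, 2.5, 3.5, 3.0, 2.25, 1.25]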
If you have to do this repeatedly for very small arrays (fewer than about 200 elements), I found the fastest results by just using linear algebra. The slowest part is setting up the multiplication matrix y, which you only have to do once, but after that this approach may well be faster.
import numpy as np
import random


N = 100     # window size
size = 200  # array length


x = np.random.random(size)
y = np.eye(size, dtype=float)


# prepare matrix
for i in range(size):
    y[i, i:i+N] = 1./N


# calculate running mean
z = np.inner(x, y.T)[N-1:]


My solution is based on the "simple moving average" from Wikipedia.

import numpy as np
from numba import jit


@jit
def sma(x, N):
    s = np.zeros_like(x)
    k = 1 / N
    s[0] = x[0] * k
    for i in range(1, N + 1):
        s[i] = s[i - 1] + x[i] * k
    for i in range(N, x.shape[0]):
        s[i] = s[i - 1] + (x[i] - x[i - N]) * k
    s = s[N - 1:]
    return s

Compared to the previously suggested solutions, it is twice as fast as the fastest scipy solution, uniform_filter1d, with the same order of error. Speed tests:

import numpy as np
x = np.random.random(10000000)
N = 1000


from scipy.ndimage.filters import uniform_filter1d
%timeit uniform_filter1d(x, size=N)
95.7 ms ± 9.34 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
%timeit sma(x, N)
47.3 ms ± 3.42 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

Error comparison:

np.max(np.abs(np.convolve(x, np.ones((N,))/N, mode='valid') - uniform_filter1d(x, size=N, mode='constant', origin=-(N//2))[:-(N-1)]))
8.604228440844963e-14
np.max(np.abs(np.convolve(x, np.ones((N,))/N, mode='valid') - sma(x, N)))
1.41886502547095e-13