移位数组中的元素

这个问题在底部有自己的答案。使用预先分配的数组。

跟踪 这个问题年前,有没有一个规范的“移位”函数在麻木? 我没有看到任何东西从 文件

下面是我要找的东西的一个简单版本:

def shift(xs, n):
if n >= 0:
return np.r_[np.full(n, np.nan), xs[:-n]]
else:
return np.r_[xs[-n:], np.full(-n, np.nan)]

使用这个就像:

In [76]: xs
Out[76]: array([ 0.,  1.,  2.,  3.,  4.,  5.,  6.,  7.,  8.,  9.])


In [77]: shift(xs, 3)
Out[77]: array([ nan,  nan,  nan,   0.,   1.,   2.,   3.,   4.,   5.,   6.])


In [78]: shift(xs, -3)
Out[78]: array([  3.,   4.,   5.,   6.,   7.,   8.,   9.,  nan,  nan,  nan])

这个问题来自我昨天尝试 写一个快速滚动的产品。我需要一种“转移”累积产品的方法,我所能想到的就是在 np.roll()中复制逻辑。


因此,np.concatenate()np.r_[]快得多。这个版本的函数性能要好得多:

def shift(xs, n):
if n >= 0:
return np.concatenate((np.full(n, np.nan), xs[:-n]))
else:
return np.concatenate((xs[-n:], np.full(-n, np.nan)))

更快的版本只是简单地预分配数组:

def shift(xs, n):
e = np.empty_like(xs)
if n >= 0:
e[:n] = np.nan
e[n:] = xs[:-n]
else:
e[n:] = np.nan
e[:n] = xs[-n:]
return e

上面的建议就是答案。使用预分配的数组。

165217 次浏览

没有一个单独的函数可以完成您想要的任务。你对转变的定义与大多数人所做的稍有不同。移动数组的方法通常是循环的:

>>>xs=np.array([1,2,3,4,5])
>>>shift(xs,3)
array([3,4,5,1,2])

但是,您可以使用两个函数来完成任何操作

def shift2(arr,num):
arr=np.roll(arr,num)
if num<0:
np.put(arr,range(len(arr)+num,len(arr)),np.nan)
elif num > 0:
np.put(arr,range(num),np.nan)
return arr
>>>shift2(a,3)
[ nan  nan  nan   0.   1.   2.   3.   4.   5.   6.]
>>>shift2(a,-3)
[  3.   4.   5.   6.   7.   8.   9.  nan  nan  nan]

在给定函数和上面提供的代码上运行 cProfile 之后,我发现您提供的代码进行了42次函数调用,而 shift2在 arr 为正时进行了14次调用,在 arr 为负时进行了16次调用。我将试验计时,看看每个执行与真实数据。

不是麻木,而是 scipy 提供了你想要的移位功能,

import numpy as np
from scipy.ndimage.interpolation import shift


xs = np.array([ 0.,  1.,  2.,  3.,  4.,  5.,  6.,  7.,  8.,  9.])


shift(xs, 3, cval=np.NaN)

其中默认值是从数组外部引入一个值为 cval的常数值,在这里设置为 nan。这就给出了所需的输出,

array([ nan, nan, nan, 0., 1., 2., 3., 4., 5., 6.])

and the negative shift works similarly,

shift(xs, -3, cval=np.NaN)

提供输出

array([  3.,   4.,   5.,   6.,   7.,   8.,   9.,  nan,  nan,  nan])

对于那些只想复制和粘贴 shift 的最快实现的人,有一个基准和结论(参见结尾)。此外,我还引入了 fill _ value 参数并修复了一些 bug。

基准

import numpy as np
import timeit


# enhanced from IronManMark20 version
def shift1(arr, num, fill_value=np.nan):
arr = np.roll(arr,num)
if num < 0:
arr[num:] = fill_value
elif num > 0:
arr[:num] = fill_value
return arr


# use np.roll and np.put by IronManMark20
def shift2(arr,num):
arr=np.roll(arr,num)
if num<0:
np.put(arr,range(len(arr)+num,len(arr)),np.nan)
elif num > 0:
np.put(arr,range(num),np.nan)
return arr


# use np.pad and slice by me.
def shift3(arr, num, fill_value=np.nan):
l = len(arr)
if num < 0:
arr = np.pad(arr, (0, abs(num)), mode='constant', constant_values=(fill_value,))[:-num]
elif num > 0:
arr = np.pad(arr, (num, 0), mode='constant', constant_values=(fill_value,))[:-num]


return arr


# use np.concatenate and np.full by chrisaycock
def shift4(arr, num, fill_value=np.nan):
if num >= 0:
return np.concatenate((np.full(num, fill_value), arr[:-num]))
else:
return np.concatenate((arr[-num:], np.full(-num, fill_value)))


# preallocate empty array and assign slice by chrisaycock
def shift5(arr, num, fill_value=np.nan):
result = np.empty_like(arr)
if num > 0:
result[:num] = fill_value
result[num:] = arr[:-num]
elif num < 0:
result[num:] = fill_value
result[:num] = arr[-num:]
else:
result[:] = arr
return result


arr = np.arange(2000).astype(float)


def benchmark_shift1():
shift1(arr, 3)


def benchmark_shift2():
shift2(arr, 3)


def benchmark_shift3():
shift3(arr, 3)


def benchmark_shift4():
shift4(arr, 3)


def benchmark_shift5():
shift5(arr, 3)


benchmark_set = ['benchmark_shift1', 'benchmark_shift2', 'benchmark_shift3', 'benchmark_shift4', 'benchmark_shift5']


for x in benchmark_set:
number = 10000
t = timeit.timeit('%s()' % x, 'from __main__ import %s' % x, number=number)
print '%s time: %f' % (x, t)

基准结果:

benchmark_shift1 time: 0.265238
benchmark_shift2 time: 0.285175
benchmark_shift3 time: 0.473890
benchmark_shift4 time: 0.099049
benchmark_shift5 time: 0.052836

结论

第五班是赢家! 这是观察所的第三个解决方案。

你可以先用 pandasndarray转换成 SeriesDataFrame,然后你可以随意使用 shift方法。

例如:

In [1]: from pandas import Series


In [2]: data = np.arange(10)


In [3]: data
Out[3]: array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])


In [4]: data = Series(data)


In [5]: data
Out[5]:
0    0
1    1
2    2
3    3
4    4
5    5
6    6
7    7
8    8
9    9
dtype: int64


In [6]: data = data.shift(3)


In [7]: data
Out[7]:
0    NaN
1    NaN
2    NaN
3    0.0
4    1.0
5    2.0
6    3.0
7    4.0
8    5.0
9    6.0
dtype: float64


In [8]: data = data.values


In [9]: data
Out[9]: array([ nan,  nan,  nan,   0.,   1.,   2.,   3.,   4.,   5.,   6.])

你也可以这样对待熊猫:

使用2356长的数组:

import numpy as np


xs = np.array([...])

使用 scypy:

from scipy.ndimage.interpolation import shift


%timeit shift(xs, 1, cval=np.nan)
# 956 µs ± 77.9 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)

使用熊猫:

import pandas as pd


%timeit pd.Series(xs).shift(1).values
# 377 µs ± 9.42 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)

In this example, using Pandas was about ~8 times faster than Scipy

如果你想从 numpy 那里得到一句俏皮话,并且不太关心性能,那么试试:

np.sum(np.diag(the_array,1),0)[:-1]

说明: np.diag(the_array,1)创建一个矩阵,数组一次性沿对角线运行,np.sum(...,0)对矩阵列进行求和,...[:-1]获取与原始数组大小相对应的元素。把 1:-1当作参数来玩可以给你不同方向的转变。

一种不把代码分解成案件的方法

使用数组:

def shift(arr, dx, default_value):
result = np.empty_like(arr)
get_neg_or_none = lambda s: s if s < 0 else None
get_pos_or_none = lambda s: s if s > 0 else None
result[get_neg_or_none(dx): get_pos_or_none(dx)] = default_value
result[get_pos_or_none(dx): get_neg_or_none(dx)] = arr[get_pos_or_none(-dx): get_neg_or_none(-dx)]
return result

使用矩阵可以这样做:

def shift(image, dx, dy, default_value):
res = np.full_like(image, default_value)


get_neg_or_none = lambda s: s if s < 0 else None
get_pos_or_none = lambda s : s if s > 0 else None


res[get_pos_or_none(-dy): get_neg_or_none(-dy), get_pos_or_none(-dx): get_neg_or_none(-dx)] = \
image[get_pos_or_none(dy): get_neg_or_none(dy), get_pos_or_none(dx): get_neg_or_none(dx)]
return res

基准及介绍 Numba

1. 摘要

  • The accepted answer (scipy.ndimage.interpolation.shift) is the slowest solution listed in this page.
  • 当数组大小小于25.000时,Numba (@Numba. njit)可以提高性能
  • 当数组大小较大(> 250.000)时,“ Any method”同样有效。
  • 最快的选择实际上取决于
        (1)  Length of your arrays
    (2)你需要轮班的次数。
  • 下面是这个页面(2020-07-11)中列出的所有不同方法的计时图片,使用常数 shift = 10。正如我们所看到的,对于较小的数组大小,有些方法比最佳方法使用的时间多出 + 2000% 。

Relative timings, constant shift (10), all methods

2. Detailed benchmarks with the best options

  • 选择 shift4_numba(定义如下) ,如果你想要好的全方位

Relative timings, best methods (Benchmarks)

3. 密码

3.1 shift4_numba

  • 良好的多面性; 最大20% 的书面最好的方法与任何数组的大小
  • 中等阵列大小的最佳方法: ~ 500 < N < 20.000。
  • 注意: Numba jit (即时编译器)只有在多次调用修饰函数时才能提高性能。第一次调用的时间通常是后续调用的3-4倍。使用 ahead of time compiled numba可以获得更大的性能提升。
import numba


@numba.njit
def shift4_numba(arr, num, fill_value=np.nan):
if num >= 0:
return np.concatenate((np.full(num, fill_value), arr[:-num]))
else:
return np.concatenate((arr[-num:], np.full(-num, fill_value)))

3.2. shift5_numba

  • Best option with small (N <= 300.. 1500) array sizes. Treshold depends on needed amount of shift.
  • 在任何数组大小上都有良好的性能; 与最快的解决方案相比,最大值增加50% 。
  • 注意: Numba jit (即时编译器)只有在多次调用修饰函数时才能提高性能。第一次调用的时间通常是后续调用的3-4倍。使用 提前编译了 numba可以获得更大的性能提升。
import numba


@numba.njit
def shift5_numba(arr, num, fill_value=np.nan):
result = np.empty_like(arr)
if num > 0:
result[:num] = fill_value
result[num:] = arr[:-num]
elif num < 0:
result[num:] = fill_value
result[:num] = arr[-num:]
else:
result[:] = arr
return result

3.3 shift5

  • 阵列大小 ~ 20.000 < N < 250.000的最佳方法
  • shift5_numba相同,只需删除@numba. njit 装饰器。

4附录

4.1 Details about used methods

  • shift_scipy: scipy.ndimage.interpolation.shift (scipy 1.4.1) - The option from accepted answer, which is 显然是最慢的选择.
  • shift1: np.rollout[:num] xnp.nan by 铁人三项20 & gzc
  • shift2: np.rollnp.put by 铁人三项20
  • shift3: np.padslice by GZC
  • shift4: np.concatenatenp.full by Chrisaycock
  • shift5: 由 Chrisaycock使用两次 result[slice] = x
  • shift#_numba:@笨蛋.njit 修饰版本的以前。

The shift2 and shift3 contained functions that were not supported by the current numba (0.50.1).

4.2 Other test results

4.2.1所有方法的相对计时

4.2.2所有方法的原始计时

4.2.3原始时间,几乎没有最好的方法

Maybe np.roll is what you need

arr = np.arange(10)
shift = 2  # shift length
arr_1 = np.roll(arr, shift=shift)
arr_1[:shift] = np.nan

My solution involves np.roll and masked arrays:

import numpy as np
import numpy.ma as ma # this is for masked array


def shift(arr, shift):
r_arr = np.roll(arr, shift=shift)
m_arr = ma.masked_array(r_arr,dtype=float)
if shift > 0: m_arr[:shift] = ma.masked
else: m_arr[shift:] = ma.masked
return m_arr.filled(np.nan)

Basically, I just use np.roll to shift the array, then use ma.masked_array to mark the unwanted elements as invalid, and fill those invalid positions with np.nan. I set the dtype to float so that filling with np.nan wouldn't cause any problems.

In [11]: shift(arr, 3)
Out[11]: array([nan, nan, nan,  0.,  1.,  2.,  3.,  4.,  5.,  6.])


In [12]: shift(arr, -3)
Out[12]: array([ 3.,  4.,  5.,  6.,  7.,  8.,  9., nan, nan, nan])

下面是支持任意多维数组的快速答案(shift5)的一个推广:

def shift(array, offset, constant_values=0):
"""Returns copy of array shifted by offset, with fill using constant."""
array = np.asarray(array)
offset = np.atleast_1d(offset)
assert len(offset) == array.ndim
new_array = np.empty_like(array)


def slice1(o):
return slice(o, None) if o >= 0 else slice(0, o)


new_array[tuple(slice1(o) for o in offset)] = (
array[tuple(slice1(-o) for o in offset)])


for axis, o in enumerate(offset):
new_array[(slice(None),) * axis +
(slice(0, o) if o >= 0 else slice(o, None),)] = constant_values


return new_array

我想我有一个更快的解决方案: 为什么不直接使用 deque? 我从@gzc 向基准解决方案添加了2个基准:

def shift6(arr, num, fill_value=np.nan):
for _ in range(num):
darr.appendleft(fill_value)


def shift7(arr, num, fill_value=np.nan):
darr = deque(arr)
for _ in range(num):
darr.appendleft(fill_value)


darr = deque(arr)


def benchmark_shift6():
shift6(arr, 3)
    

def benchmark_shift7():
shift6(arr, 3)


benchmark_set = ['benchmark_shift1', 'benchmark_shift2', 'benchmark_shift3', 'benchmark_shift4', 'benchmark_shift5', 'benchmark_shift6', 'benchmark_shift7']

在我的笔记本电脑上,输出结果比其他任何提议的解决方案都要好得多:

%s time: ('benchmark_shift1', 0.08232757700170623)
%s time: ('benchmark_shift2', 0.0934765400015749)
%s time: ('benchmark_shift3', 0.14349375600431813)
%s time: ('benchmark_shift4', 0.03575193700089585)
%s time: ('benchmark_shift5', 0.01389261399890529)
%s time: ('benchmark_shift6', 0.0025887360025080852)
%s time: ('benchmark_shift7', 0.0024806019937386736)

一种简单有效的支持 笨蛋和负移值的方法,如 熊猫库。它使用参数中的原始数组 防止 堕落,也使用 整数数组:

import numpy as np
from numba import njit


@njit
def numba_shift(arr_: np.ndarray, shift: np.int32 = 1) -> np.ndarray:
arr = arr_.copy().astype(np.float64)
if shift > 0:
arr[shift:] = arr[:-shift]
arr[:shift] = np.nan
else:
arr[:shift] = arr[-shift:]
arr[shift:] = np.nan
return arr

例子 :

ar = np.array([1,2,3,4,5,6])
numba_shift(ar,-1)


array([ 2.,  3.,  4.,  5.,  6., nan])

时间 :

%timeit numba_shift(ar,-1)


1.02 µs ± 9.42 ns per loop (mean ± std. dev. of 7 runs, 1000000 loops each)

注意 : 如果不需要 numba,只需要 numpy,那么删除行 @njit和 numba 导入。

这里有一个解决方案,它不使用 numpy 的特殊内置函数,因此与 numba 兼容。

def shift(array, dy, dx):
n, m = array.shape[:2]
e = np.zeros((n, m))
if dy > 0 and dx > 0:
e[dy:, dx:] = array[:-dy, :-dx]
return e
elif dy > 0 and dx < 0:
e[dy:, :dx] = array[:-dy, -dx:]
return e
elif dy < 0 and dx > 0:
e[:dy, dx:] = array[-dy:, :-dx]
return e
elif dy < 0 and dx < 0:
e[:dy, :dx] = array[-dy:, -dx:]
return e
elif dy < 0 and dx == 0:
e[:dy, :] = array[-dy:, :]
return e
elif dy > 0 and dx == 0:
e[dy:, :] = array[:-dy, :]
return e
elif dy == 0 and dx < 0:
e[:, :dx] = array[:, -dx:]
return e
elif dy == 0 and dx > 0:
e[:, dx:] = array[:, :-dx]
return e