NumPy: function for simultaneous max() and min()

numpy.amax() will find the max value in an array, and numpy.amin() does the same for the min value. If I want to find both max and min, I have to call both functions, which requires passing over the (very big) array twice, which seems slow.

Is there a function in the numpy API that finds both max and min with only a single pass through the data?

103039 次浏览

我不认为传递两次数组是个问题。 考虑下面的伪代码:

minval = array[0]
maxval = array[0]
for i in array:
if i < minval:
minval = i
if i > maxval:
maxval = i

虽然这里只有1个循环,但仍然有2个检查。(不要使用2个循环,每个循环检查一次)。实际上,您唯一节省的是1个循环的开销。如果数组真的如您所说的那么大,那么与实际循环的工作负载相比,开销就很小了。(注意,所有这些都是在 C 中实现的,所以这些循环或多或少都是免费的)。


很抱歉你们四个对我投了赞成票并且对我有信心,你们一定可以优化这个。

下面是一些可以通过 f2py编译成 python 模块的 fortran 代码(也许 Cython专家可以将其与优化的 C 版本进行比较... ...) :

subroutine minmax1(a,n,amin,amax)
implicit none
!f2py intent(hidden) :: n
!f2py intent(out) :: amin,amax
!f2py intent(in) :: a
integer n
real a(n),amin,amax
integer i


amin = a(1)
amax = a(1)
do i=2, n
if(a(i) > amax)then
amax = a(i)
elseif(a(i) < amin) then
amin = a(i)
endif
enddo
end subroutine minmax1


subroutine minmax2(a,n,amin,amax)
implicit none
!f2py intent(hidden) :: n
!f2py intent(out) :: amin,amax
!f2py intent(in) :: a
integer n
real a(n),amin,amax
amin = minval(a)
amax = maxval(a)
end subroutine minmax2

通过以下途径编译:

f2py -m untitled -c fortran_code.f90

现在我们到了一个可以测试它的地方:

import timeit


size = 100000
repeat = 10000


print timeit.timeit(
'np.min(a); np.max(a)',
setup='import numpy as np; a = np.arange(%d, dtype=np.float32)' % size,
number=repeat), " # numpy min/max"


print timeit.timeit(
'untitled.minmax1(a)',
setup='import numpy as np; import untitled; a = np.arange(%d, dtype=np.float32)' % size,
number=repeat), '# minmax1'


print timeit.timeit(
'untitled.minmax2(a)',
setup='import numpy as np; import untitled; a = np.arange(%d, dtype=np.float32)' % size,
number=repeat), '# minmax2'

结果让我有点吃惊:

8.61869883537 # numpy min/max
1.60417699814 # minmax1
2.30169081688 # minmax2

我不得不说,我不完全理解。仅仅比较 np.minminmax1minmax2仍然是一场失败的战斗,所以这不仅仅是一个记忆问题..。

Note ——增加一个 10**a因子的大小和减少一个 10**a因子的重复(保持问题大小不变)确实会改变性能,但不是以一种看似一致的方式,这表明在 python 中内存性能和函数调用开销之间存在一些相互作用。甚至比较一个简单的 min实现在 fortran 比 numpy 的一个因素大约2..。

有一个查找(max-min)函数叫做 Numpy.ptp,如果它对你有用的话:

>>> import numpy
>>> x = numpy.array([1,2,3,4,5,6])
>>> x.ptp()
5

但我不认为有办法通过一次遍历同时找到 min 和 max。

编辑: < a href = “ https://github.com/numpy/numpy/blob/cf9f1907b99d06291ab16ad4d2105a871f56f7d9/numpy/ma/core.py # L5515-L5546”rel = “ norefrer”> ptp 只是在引擎盖下调用 min 和 max

这是一个老线索,但无论如何,如果有人再次看到这个..。

当同时查找最小值和最大值时,可以减少比较的次数。如果您比较的是浮点数(我猜是浮点数) ,这可能会节省您一些时间,尽管计算复杂性不会降低。

而不是(Python 代码) :

_max = ar[0]
_min=  ar[0]
for ii in xrange(len(ar)):
if _max > ar[ii]: _max = ar[ii]
if _min < ar[ii]: _min = ar[ii]

您可以首先比较数组中的两个相邻值,然后只比较较小的值与当前最小值,以及较大的值与当前最大值:

## for an even-sized array
_max = ar[0]
_min = ar[0]
for ii in xrange(0, len(ar), 2)):  ## iterate over every other value in the array
f1 = ar[ii]
f2 = ar[ii+1]
if (f1 < f2):
if f1 < _min: _min = f1
if f2 > _max: _max = f2
else:
if f2 < _min: _min = f2
if f1 > _max: _max = f1

这里的代码是用 Python 编写的,显然为了提高速度,您可以使用 C、 Fortran 或 Cython,但是这样做的话,每次迭代要进行3次比较,使用 len (ar)/2次迭代,给出3/2 * len (ar)比较。与此相反,进行比较的“显而易见的方式”是每次迭代进行两次比较,导致2 * len (ar)比较。节省你25% 的对比时间。

也许有一天会有人发现这个有用。

第一眼看上去,numpy.histogram 出现了就能做到这一点:

count, (amin, amax) = numpy.histogram(a, bins=1)

... 但是如果你看看 来源的那个函数,它只是单独调用 a.min()a.max(),因此不能避免在这个问题中提到的性能问题。:-(

类似地,scipy.ndimage.measurements.extrema看起来像是一种可能性,但它也只是单独调用 a.min()a.max()

Numpy API 中是否有一个函数只需要一次传递数据就可以同时找到 max 和 min?

没有。在写这篇文章的时候,还没有这样的功能。(是的,如果有 曾经是这样一个函数,那么它的性能将优于在大型数组上连续调用 numpy.amin()numpy.amax()。)

您可以使用 Numba,它是一个使用 LLVM 的支持 NumPy 的动态 Python 编译器。由此产生的实现非常简单明了:

import numpy
import numba




@numba.jit
def minmax(x):
maximum = x[0]
minimum = x[0]
for i in x[1:]:
if i > maximum:
maximum = i
elif i < minimum:
minimum = i
return (minimum, maximum)




numpy.random.seed(1)
x = numpy.random.rand(1000000)
print(minmax(x) == (x.min(), x.max()))

它也应该比 Numpy 的 min() & max()实现更快。而且不需要编写一行 C/Fortran 代码。

进行您自己的性能测试,因为它总是依赖于您的体系结构、您的数据、您的软件包版本..。

通常,可以通过一次处理两个元素来减少 minmax 算法的比较次数,并且只将较小的元素与临时最小值进行比较,将较大的元素与临时最大值进行比较。平均而言,人们只需要3/4的比较,而不是一个幼稚的方法。

这可以用 c 或 fortran (或任何其他低级语言)实现,并且在性能方面几乎是无与伦比的。我使用 来演示这个原理,并得到一个非常快速的、与 dtype 无关的实现:

import numba as nb
import numpy as np


@nb.njit
def minmax(array):
# Ravel the array and return early if it's empty
array = array.ravel()
length = array.size
if not length:
return


# We want to process two elements at once so we need
# an even sized array, but we preprocess the first and
# start with the second element, so we want it "odd"
odd = length % 2
if not odd:
length -= 1


# Initialize min and max with the first item
minimum = maximum = array[0]


i = 1
while i < length:
# Get the next two items and swap them if necessary
x = array[i]
y = array[i+1]
if x > y:
x, y = y, x
# Compare the min with the smaller one and the max
# with the bigger one
minimum = min(x, minimum)
maximum = max(y, maximum)
i += 2


# If we had an even sized array we need to compare the
# one remaining item too.
if not odd:
x = array[length]
minimum = min(x, minimum)
maximum = max(x, maximum)


return minimum, maximum

这肯定比 佩克提出的幼稚方法要快:

arr = np.random.random(3000000)
assert minmax(arr) == minmax_peque(arr)  # warmup and making sure they are identical
%timeit minmax(arr)            # 100 loops, best of 3: 2.1 ms per loop
%timeit minmax_peque(arr)      # 100 loops, best of 3: 2.75 ms per loop

正如预期的那样,新的 minmax 实现只需要大约原始实现时间的3/4(2.1 / 2.75 = 0.7636363636363637)

没有人提到 笨蛋,百分比,所以我想我会。如果您要求 [0, 100]百分位数,它将给出一个由两个元素组成的数组,min (第0个百分位数)和 max (第100个百分位数)。

但是,它不能满足 OP 的目的: 它不比分别使用 min 和 max 快。这可能是由于一些机制,将允许非极端百分位数(一个更难的问题,其中 应该需要更长的时间)。

In [1]: import numpy


In [2]: a = numpy.random.normal(0, 1, 1000000)


In [3]: %%timeit
...: lo, hi = numpy.amin(a), numpy.amax(a)
...:
100 loops, best of 3: 4.08 ms per loop


In [4]: %%timeit
...: lo, hi = numpy.percentile(a, [0, 100])
...:
100 loops, best of 3: 17.2 ms per loop


In [5]: numpy.__version__
Out[5]: '1.14.4'

如果只请求 [0, 100],Numpy 的未来版本可以在特殊情况下跳过正常的百分比计算。在不向接口添加任何内容的情况下,有一种方法可以在一次调用中要求 Numpy 提供 min 和 max (与公认的答案相反) ,但是库的标准实现并没有利用这种情况来使其值得。

无论如何,这对我来说是值得的,所以我将为感兴趣的人提出最困难和最不优雅的解决方案。我的解决方案是在 C + + 中一次性实现一个多线程 min-max 算法,并使用它创建一个 Python 扩展模块。这项工作需要一些开销来学习如何使用 Python 和 NumPy C/C + + API,在这里我将展示代码,并为希望沿着这条道路前进的人提供一些小的解释和参考。

多线程最小/最大

这里没什么有趣的东西。数组被分成大小为 length / workers的块。计算 future中每个块的 min/max,然后扫描全局 min/max。

    // mt_np.cc
//
// multi-threaded min/max algorithm


#include <algorithm>
#include <future>
#include <vector>


namespace mt_np {


/*
* Get {min,max} in interval [begin,end)
*/
template <typename T> std::pair<T, T> min_max(T *begin, T *end) {
T min{*begin};
T max{*begin};
while (++begin < end) {
if (*begin < min) {
min = *begin;
continue;
} else if (*begin > max) {
max = *begin;
}
}
return {min, max};
}


/*
* get {min,max} in interval [begin,end) using #workers for concurrency
*/
template <typename T>
std::pair<T, T> min_max_mt(T *begin, T *end, int workers) {
const long int chunk_size = std::max((end - begin) / workers, 1l);
std::vector<std::future<std::pair<T, T>>> min_maxes;
// fire up the workers
while (begin < end) {
T *next = std::min(end, begin + chunk_size);
min_maxes.push_back(std::async(min_max<T>, begin, next));
begin = next;
}
// retrieve the results
auto min_max_it = min_maxes.begin();
auto v{min_max_it->get()};
T min{v.first};
T max{v.second};
while (++min_max_it != min_maxes.end()) {
v = min_max_it->get();
min = std::min(min, v.first);
max = std::max(max, v.second);
}
return {min, max};
}
}; // namespace mt_np

Python 扩展模块

这就是事情开始变得丑陋的地方... 在 Python 中使用 C + + 代码的一种方法是实现一个扩展模块。可以使用 distutils.core标准模块构建和安装此模块。Python 文档 https://docs.python.org/3/extending/extending.html对此有完整的描述。引用 https://docs.python.org/3/extending/index.html#extending-index的话,当然还有其他方法可以得到类似的结果:

本指南仅介绍创建作为此版本 CPython 的一部分提供的扩展的基本工具。Cython、 cffi、 SWIG 和 Numba 等第三方工具提供了更简单、更复杂的方法来为 Python 创建 C 和 C + + 扩展。

从本质上讲,这条路线可能更多的是学术性的,而不是实用性的。既然如此,我接下来要做的就是,紧跟教程,创建一个模块文件。这实际上是 distutils 的样板,它可以让 distutils 知道如何处理代码,并用代码创建一个 Python 模块。在执行这些操作之前,创建一个 Python虚拟环境可能是明智的,这样您就不会污染您的系统包(参见 https://docs.python.org/3/library/venv.html#module-venv)。

下面是模块文件:

// mt_np_forpy.cc
//
// C++ module implementation for multi-threaded min/max for np


#define NPY_NO_DEPRECATED_API NPY_1_7_API_VERSION


#include <python3.6/numpy/arrayobject.h>


#include "mt_np.h"


#include <cstdint>
#include <iostream>


using namespace std;


/*
* check:
*  shape
*  stride
*  data_type
*  byteorder
*  alignment
*/
static bool check_array(PyArrayObject *arr) {
if (PyArray_NDIM(arr) != 1) {
PyErr_SetString(PyExc_RuntimeError, "Wrong shape, require (1,n)");
return false;
}
if (PyArray_STRIDES(arr)[0] != 8) {
PyErr_SetString(PyExc_RuntimeError, "Expected stride of 8");
return false;
}
PyArray_Descr *descr = PyArray_DESCR(arr);
if (descr->type != NPY_LONGLTR && descr->type != NPY_DOUBLELTR) {
PyErr_SetString(PyExc_RuntimeError, "Wrong type, require l or d");
return false;
}
if (descr->byteorder != '=') {
PyErr_SetString(PyExc_RuntimeError, "Expected native byteorder");
return false;
}
if (descr->alignment != 8) {
cerr << "alignment: " << descr->alignment << endl;
PyErr_SetString(PyExc_RuntimeError, "Require proper alignement");
return false;
}
return true;
}


template <typename T>
static PyObject *mt_np_minmax_dispatch(PyArrayObject *arr) {
npy_intp size = PyArray_SHAPE(arr)[0];
T *begin = (T *)PyArray_DATA(arr);
auto minmax =
mt_np::min_max_mt(begin, begin + size, thread::hardware_concurrency());
return Py_BuildValue("(L,L)", minmax.first, minmax.second);
}


static PyObject *mt_np_minmax(PyObject *self, PyObject *args) {
PyArrayObject *arr;
if (!PyArg_ParseTuple(args, "O", &arr))
return NULL;
if (!check_array(arr))
return NULL;
switch (PyArray_DESCR(arr)->type) {
case NPY_LONGLTR: {
return mt_np_minmax_dispatch<int64_t>(arr);
} break;
case NPY_DOUBLELTR: {
return mt_np_minmax_dispatch<double>(arr);
} break;
default: {
PyErr_SetString(PyExc_RuntimeError, "Unknown error");
return NULL;
}
}
}


static PyObject *get_concurrency(PyObject *self, PyObject *args) {
return Py_BuildValue("I", thread::hardware_concurrency());
}


static PyMethodDef mt_np_Methods[] = {
{"mt_np_minmax", mt_np_minmax, METH_VARARGS, "multi-threaded np min/max"},
{"get_concurrency", get_concurrency, METH_VARARGS,
"retrieve thread::hardware_concurrency()"},
{NULL, NULL, 0, NULL} /* sentinel */
};


static struct PyModuleDef mt_np_module = {PyModuleDef_HEAD_INIT, "mt_np", NULL,
-1, mt_np_Methods};


PyMODINIT_FUNC PyInit_mt_np() { return PyModule_Create(&mt_np_module); }

在这个文件中,Python 和 NumPy API 都得到了大量使用,更多信息请参考: https://docs.python.org/3/c-api/arg.html#c.PyArg_ParseTuple和 NumPy: https://docs.scipy.org/doc/numpy/reference/c-api.array.html

安装模组

接下来要做的事情是利用 distutils 来安装模块:

# setup.py


from distutils.core import setup,Extension


module = Extension('mt_np', sources = ['mt_np_module.cc'])


setup (name = 'mt_np',
version = '1.0',
description = 'multi-threaded min/max for np arrays',
ext_modules = [module])

要最终安装模块,请从您的虚拟环境中执行 python3 setup.py install

测试模组

最后,我们可以测试 C + + 实现是否实际上优于对 NumPy 的简单使用。为此,下面是一个简单的测试脚本:

# timing.py
# compare numpy min/max vs multi-threaded min/max


import numpy as np
import mt_np
import timeit


def normal_min_max(X):
return (np.min(X),np.max(X))


print(mt_np.get_concurrency())


for ssize in np.logspace(3,8,6):
size = int(ssize)
print('********************')
print('sample size:', size)
print('********************')
samples = np.random.normal(0,50,(2,size))
for sample in samples:
print('np:', timeit.timeit('normal_min_max(sample)',
globals=globals(),number=10))
print('mt:', timeit.timeit('mt_np.mt_np_minmax(sample)',
globals=globals(),number=10))

以下是我做这一切得到的结果:

8
********************
sample size: 1000
********************
np: 0.00012079699808964506
mt: 0.002468645994667895
np: 0.00011947099847020581
mt: 0.0020772050047526136
********************
sample size: 10000
********************
np: 0.00024697799381101504
mt: 0.002037393998762127
np: 0.0002713389985729009
mt: 0.0020942929986631498
********************
sample size: 100000
********************
np: 0.0007130410012905486
mt: 0.0019842900001094677
np: 0.0007540129954577424
mt: 0.0029724110063398257
********************
sample size: 1000000
********************
np: 0.0094779249993735
mt: 0.007134920000680722
np: 0.009129883001151029
mt: 0.012836456997320056
********************
sample size: 10000000
********************
np: 0.09471094200125663
mt: 0.0453535050037317
np: 0.09436299200024223
mt: 0.04188535599678289
********************
sample size: 100000000
********************
np: 0.9537652180006262
mt: 0.3957935369980987
np: 0.9624398809974082
mt: 0.4019058070043684

这些结果远不如前面在线程中显示的那样令人鼓舞,后者显示的速度提高了大约3.5倍,而且没有包含多线程。我得到的结果在一定程度上是合理的,我希望线程的开销和主导时间,直到数组变得非常大,这时性能的提高将开始接近 std::thread::hardware_concurrency x 的提高。

结论

对于某些 NumPy 代码,特定于应用程序的优化当然是有空间的,尤其是在多线程方面。我不清楚它是否值得这样做,但它确实看起来像是一个很好的练习(或什么东西)。我认为学习一些像 Cython 这样的“第三方工具”可能会更好地利用时间,但谁知道呢。

我想到的最简单的方法是:

mn, mx = np.sort(ar)[[0, -1]]

但是因为它对数组进行了排序,所以它不是最有效的。

另一条捷径是:

mn, mx = np.percentile(ar, [0, 100])

这样应该更有效率,但是计算结果并返回一个 float。

只是为了得到一些数字的想法,人们可以期待,给出以下方法:

import numpy as np




def extrema_np(arr):
return np.max(arr), np.min(arr)
import numba as nb




@nb.jit(nopython=True)
def extrema_loop_nb(arr):
n = arr.size
max_val = min_val = arr[0]
for i in range(1, n):
item = arr[i]
if item > max_val:
max_val = item
elif item < min_val:
min_val = item
return max_val, min_val
import numba as nb




@nb.jit(nopython=True)
def extrema_while_nb(arr):
n = arr.size
odd = n % 2
if not odd:
n -= 1
max_val = min_val = arr[0]
i = 1
while i < n:
x = arr[i]
y = arr[i + 1]
if x > y:
x, y = y, x
min_val = min(x, min_val)
max_val = max(y, max_val)
i += 2
if not odd:
x = arr[n]
min_val = min(x, min_val)
max_val = max(x, max_val)
return max_val, min_val
%%cython -c-O3 -c-march=native -a
#cython: language_level=3, boundscheck=False, wraparound=False, initializedcheck=False, cdivision=True, infer_types=True




import numpy as np




cdef void _extrema_loop_cy(
long[:] arr,
size_t n,
long[:] result):
cdef size_t i
cdef long item, max_val, min_val
max_val = arr[0]
min_val = arr[0]
for i in range(1, n):
item = arr[i]
if item > max_val:
max_val = item
elif item < min_val:
min_val = item
result[0] = max_val
result[1] = min_val




def extrema_loop_cy(arr):
result = np.zeros(2, dtype=arr.dtype)
_extrema_loop_cy(arr, arr.size, result)
return result[0], result[1]
%%cython -c-O3 -c-march=native -a
#cython: language_level=3, boundscheck=False, wraparound=False, initializedcheck=False, cdivision=True, infer_types=True




import numpy as np




cdef void _extrema_while_cy(
long[:] arr,
size_t n,
long[:] result):
cdef size_t i, odd
cdef long x, y, max_val, min_val
max_val = arr[0]
min_val = arr[0]
odd = n % 2
if not odd:
n -= 1
max_val = min_val = arr[0]
i = 1
while i < n:
x = arr[i]
y = arr[i + 1]
if x > y:
x, y = y, x
min_val = min(x, min_val)
max_val = max(y, max_val)
i += 2
if not odd:
x = arr[n]
min_val = min(x, min_val)
max_val = max(x, max_val)
result[0] = max_val
result[1] = min_val




def extrema_while_cy(arr):
result = np.zeros(2, dtype=arr.dtype)
_extrema_while_cy(arr, arr.size, result)
return result[0], result[1]

(extrema_loop_*()方法类似于提出的 给你方法,而 extrema_while_*()方法是基于来自 给你的代码)

下列时间:

bm

表明 extrema_while_*()是最快的,extrema_while_nb()是最快的。在任何情况下,extrema_loop_nb()extrema_loop_cy()解决方案也确实优于只使用 NumPy 的方法(分别使用 np.max()np.min())。

最后,请注意,它们都不如 np.min()/np.max()灵活(在 n-dim 支持、 axis参数等方面)。

(完整代码可用 给你)

受到 上一个答案的启发,我编写了 numba 实现,从2-D 数组返回 minmax for ax = 0。它比调用 numpy min/max 快5倍。 也许有人会觉得有用。

from numba import jit


@jit
def minmax(x):
"""Return minimum and maximum from 2D array for axis=0."""
m, n = len(x), len(x[0])
mi, ma = np.empty(n), np.empty(n)
mi[:] = ma[:] = x[0]
for i in range(1, m):
for j in range(n):
if x[i, j]>ma[j]: ma[j] = x[i, j]
elif x[i, j]<mi[j]: mi[j] = x[i, j]
return mi, ma


x = np.random.normal(size=(256, 11))
mi, ma = minmax(x)


np.all(mi == x.min(axis=0)), np.all(ma == x.max(axis=0))
# (True, True)




%timeit x.min(axis=0), x.max(axis=0)
# 15.9 µs ± 9.4 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)
%timeit minmax(x)
# 2.62 µs ± 31.3 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)

也许使用 numpy.unique? 像这样:

min_, max_ = numpy.unique(arr)[[0, -1]]

只是在这里添加了多样性:)它就像排序一样慢。