从整数流中找到运行中位数

< p > 可能的重复: < br > 滚动中值算法 < / p >

假设整数是从数据流中读取的。以有效的方式查找到目前为止读取的元素的中位数。

我读过的解决方案:我们可以在左边使用max堆来表示小于有效中位数的元素,在右边使用min堆来表示大于有效中位数的元素。

在处理一个传入元素后,堆中的元素数量最多相差1个元素。当两个堆包含相同数量的元素时,我们发现堆根数据的平均值为有效中位数。当堆不平衡时,我们从包含更多元素的堆根中选择有效中值。

但是我们如何构造最大堆和最小堆也就是说,我们如何知道这里的有效中值?我认为我们应该在max-heap中插入1个元素然后在min-heap中插入下一个元素,如此类推。如果我说错了请指正。

186111 次浏览

从流数据中找到运行中值有许多不同的解决方案,我将在答案的最后简要地讨论它们。

这个问题是关于特定解决方案(最大堆/最小堆解决方案)的细节,以及基于堆的解决方案如何工作的解释如下:

对于前两个元素,将较小的元素添加到左边的maxHeap中,将较大的元素添加到右边的minHeap中。然后逐个处理流数据,

Step 1: Add next item to one of the heaps


if next item is smaller than maxHeap root add it to maxHeap,
else add it to minHeap


Step 2: Balance the heaps (after this step heaps will be either balanced or
one of them will contain 1 more item)


if number of elements in one of the heaps is greater than the other by
more than 1, remove the root element from the one containing more elements and
add to the other one

然后在任何给定的时间,你都可以像这样计算中值:

   If the heaps contain equal amount of elements;
median = (root of maxHeap + root of minHeap)/2
Else
median = root of the heap with more elements

现在我将像在答案开始时承诺的那样,大致地谈谈这个问题。从数据流中找到运行中值是一个棘手的问题,并且有效地找到具有内存限制的精确解在一般情况下可能是不可能的。另一方面,如果数据具有一些我们可以利用的特征,我们就可以开发有效的专门解决方案。例如,如果我们知道数据是整型,那么我们可以使用计数排序,它可以给你一个常量内存常量时间算法。基于堆的解决方案是一种更通用的解决方案,因为它也可以用于其他数据类型(双精度)。最后,如果不需要精确的中位数,一个近似值就足够了,你可以试着估计数据的概率密度函数,然后用它来估计中位数。

高效这个词取决于上下文。这个问题的解决方案取决于执行的查询量与插入量的关系。假设你插入N个数字K次直到最后你对中位数感兴趣。基于堆的算法的复杂度是O(N log N + K)。

考虑下面的替代方案。将数字放入一个数组中,对于每个查询,运行线性选择算法(比如使用快速排序枢轴)。现在你有了一个运行时间为O(K N)的算法。

现在如果K足够小(不频繁查询),后一种算法实际上更有效,反之亦然。

你不能只用一个堆来做这个吗?更新:没有。请看评论。

不变性:读入2*n输入后,最小堆保存其中最大的n

循环:读取2个输入。将它们都添加到堆中,并删除堆的最小值。这将重新建立不变量。

因此,当2n输入被读取时,堆的最小值是第n大的。在中间位置附近取两个元素的平均值,以及在奇数个输入之后处理查询,需要稍微复杂一点。

如果您不能一次将所有项保存在内存中,这个问题就会变得更加困难。堆解决方案要求您一次将所有元素保存在内存中。这在这个问题的大多数实际应用中是不可能的。

相反,当你看到数字时,跟踪你看到每个整数的次数的。假设4个字节整数,即2^32个桶,或最多2^33个整数(每个int的key和count),即2^35字节或32GB。它可能会比这个小得多,因为您不需要存储键或为那些为0的条目计数(例如。就像python中的defaultdict)。插入每个新整数需要常数时间。

然后在任意点,要找到中位数,只需使用计数来确定哪个整数是中间元素。这需要常数时间(虽然是一个很大的常数,但仍然是常数)。

如果输入的方差是统计分布的(如正态分布、对数正态分布等),那么从任意长的数据流中估计百分位数/中位数是一种合理的方法。

int n = 0;  // Running count of elements observed so far
#define SIZE 10000
int reservoir[SIZE];


while(streamHasData())
{
int x = readNumberFromStream();


if (n < SIZE)
{
reservoir[n++] = x;
}
else
{
int p = random(++n); // Choose a random number 0 >= p < n
if (p < SIZE)
{
reservoir[p] = x;
}
}
}

“水库”则是一个运行的,均匀的(公平的),所有输入的样本-无论大小。然后找到中位数(或任何百分位数)是一个简单的问题,即对存储库进行排序并轮询感兴趣的点。

由于存储库的大小是固定的,因此排序可以被认为是有效的O(1) -并且该方法运行的时间和内存消耗都是常数。

我发现的计算流百分位数最有效的方法是P²算法:Raj Jain, Imrich Chlamtac:不存储观测值的动态计算分位数和直方图的P²算法。Commun。Acm 28(10): 1076-1085 (1985)

该算法易于实现,工作效果非常好。然而,这只是一个估计,所以要记住这一点。来自摘要:

提出了一种动态计算中位数和其他分位数的启发式算法。随着观测值的生成,估算值是动态生成的。观察结果不被存储;因此,无论观测数据的数量如何,该算法的存储需求都非常小且固定。这使得它非常适合用于工业控制器和记录仪的分位数芯片中实现。该算法进一步推广到直方图绘制。分析了算法的精度。

如果我们想要找到n最近出现的元素的中位数,这个问题有一个精确的解决方案,只需要将n最近出现的元素保存在内存中。它速度快,规模大。

可转位skiplist支持对任意元素进行O(ln n)次插入、删除和索引搜索,同时保持排序顺序。当与跟踪第n个最古老条目的先进先出队列相结合时,解决方案很简单:

class RunningMedian:
'Fast running median with O(lg n) updates where n is the window size'


def __init__(self, n, iterable):
self.it = iter(iterable)
self.queue = deque(islice(self.it, n))
self.skiplist = IndexableSkiplist(n)
for elem in self.queue:
self.skiplist.insert(elem)


def __iter__(self):
queue = self.queue
skiplist = self.skiplist
midpoint = len(queue) // 2
yield skiplist[midpoint]
for newelem in self.it:
oldelem = queue.popleft()
skiplist.remove(oldelem)
queue.append(newelem)
skiplist.insert(newelem)
yield skiplist[midpoint]

以下是完整工作代码的链接(一个易于理解的类版本和一个内联可索引的skiplist代码的优化生成器版本):

一种直观的思考方法是,如果你有一个完全平衡的二叉搜索树,那么根就是中位数元素,因为这里有相同数量的较大和较小的元素。 现在,如果树没有满,情况就不一样了,因为上一层中会有元素缺失。< / p >

所以我们可以用中值和两棵平衡二叉树,一棵表示小于中值的元素,另一棵表示大于中值的元素。这两棵树必须保持相同的大小。

当我们从数据流中获得一个新整数时,我们将其与中位数进行比较。如果它大于中值,我们就把它加到右边的树上。如果两个树的大小相差超过1,我们删除右边树的最小元素,使其成为新的中值,并将旧的中值放在左边树中。更小的也一样。

下面是我简单但有效的算法(c++),用于从整数流中计算运行中值:

#include<algorithm>
#include<fstream>
#include<vector>
#include<list>


using namespace std;


void runningMedian(std::ifstream& ifs, std::ofstream& ofs, const unsigned bufSize) {
if (bufSize < 1)
throw exception("Wrong buffer size.");
bool evenSize = bufSize % 2 == 0 ? true : false;
list<int> q;
vector<int> nums;
int n;
unsigned count = 0;
while (ifs.good()) {
ifs >> n;
q.push_back(n);
auto ub = std::upper_bound(nums.begin(), nums.end(), n);
nums.insert(ub, n);
count++;
if (nums.size() >= bufSize) {
auto it = std::find(nums.begin(), nums.end(), q.front());
nums.erase(it);
q.pop_front();
if (evenSize)
ofs << count << ": " << (static_cast<double>(nums[nums.size() / 2 - 1] +
static_cast<double>(nums[nums.size() / 2]))) / 2.0 << '\n';
else
ofs << count << ": " << static_cast<double>(nums[nums.size() / 2]);
}
}
}
bufferSize指定数字序列的大小,必须在此基础上计算运行中位数。当从输入流ifs中读取数字时,大小为bufferSize的向量保持有序。中位数的计算方法是,如果bufferSize是奇数,则取已排序向量的中间值;如果bufferSize是偶数,则取中间两个元素的和除以2。另外,我维护了从输入中读取的最后bufferSize元素的列表。当添加一个新元素时,我将它放在排序向量中的正确位置,并从向量中删除前几步添加bufferSize的元素(保留在列表前面的元素的值)。与此同时,我从列表中删除旧元素:每个新元素都放在列表的后面,每个旧元素都从前面删除。在到达bufferSize之后,列表和向量都停止增长,并且每插入一个新元素都会被删除一个旧元素,该旧元素位于之前的bufferSize步骤列表中。注意,我并不关心从向量中移除的元素是在前面放置bufferSize的元素,还是只是一个具有相同值的元素。对于中值,这无关紧要。 所有计算的中值都输出到输出流中

我可以确认@schmil-the-cat的回答是正确的

下面是一个JS的实现。我不是算法专家,但我认为它可能对其他人有用。


class Heap {
constructor(isMin) {
this.heap = [];
this.isMin = isMin;
}


heapify() {
if (this.heap.length === 1) {
return;
}


let currentIndex = this.heap.length - 1;


while (true) {
if (currentIndex === 0) {
break;
}


const parentIndex = Math.floor((currentIndex - 1) / 2);
const parentValue = this.heap[parentIndex];
const currentValue = this.heap[currentIndex];


if (
(this.isMin && parentValue < currentValue) ||
(!this.isMin && parentValue > currentValue)
) {
break;
}


this.heap[parentIndex] = currentValue;
this.heap[currentIndex] = parentValue;


currentIndex = parentIndex;
}
}


insert(val) {
this.heap.push(val);


this.heapify();
}


pop() {
const val = this.heap.shift();
this.heapify();
return val;
}


top() {
return this.heap[0];
}


length() {
return this.heap.length;
}
}


function findMedian(arr) {
const topHeap = new Heap(true);
const bottomHeap = new Heap(false);


const output = [];


if (arr.length === 1) {
return arr[0];
}


topHeap.insert(Math.max(arr[0], arr[1]));
bottomHeap.insert(Math.min(arr[0], arr[1]));


for (let i = 0; i < arr.length; i++) {
const currentVal = arr[i];


if (i === 0) {
output.push(currentVal);
continue;
}


if (i > 1) {
if (currentVal < bottomHeap.top()) {
bottomHeap.insert(currentVal);
} else {
topHeap.insert(currentVal);
}
}


if (bottomHeap.length() - topHeap.length() > 1) {
const bottomVal = bottomHeap.pop();
topHeap.insert(bottomVal);
}


if (topHeap.length() - bottomHeap.length() > 1) {
const topVal = topHeap.pop();
bottomHeap.insert(topVal);
}


if (bottomHeap.length() === topHeap.length()) {
output.push(Math.floor((bottomHeap.top() + topHeap.top()) / 2));
continue;
}


if (bottomHeap.length() > topHeap.length()) {
output.push(bottomHeap.top());
} else {
output.push(topHeap.top());
}
}


return output;
}