Fixed size queue which automatically dequeues old values upon new enqueues

I am using a ConcurrentQueue for a shared data structure whose purpose is to hold the last N objects passed to it (a kind of history).

Suppose we have a browser and we want to keep the last 100 browsed URLs. I need a queue that, once capacity is reached (100 addresses in the history), automatically removes (dequeues) the oldest (first) entry when a new entry is inserted (enqueued).

How can I accomplish that using System.Collections?


I would write a wrapper class that on Enqueue checks the Count and then Dequeues when the count exceeds the limit.

public class FixedSizedQueue<T>
{
    ConcurrentQueue<T> q = new ConcurrentQueue<T>();
    private object lockObject = new object();

    public int Limit { get; set; }

    public void Enqueue(T obj)
    {
        q.Enqueue(obj);
        lock (lockObject)
        {
            T overflow;
            while (q.Count > Limit && q.TryDequeue(out overflow)) ;
        }
    }
}
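
A minimal usage sketch for the browser-history scenario from the question, assuming the wrapper above (the URLs are just placeholders):

var history = new FixedSizedQueue<string> { Limit = 100 };

history.Enqueue("https://example.com/page1");
history.Enqueue("https://example.com/page2");
// Once 100 URLs are in the queue, each further Enqueue evicts the oldest one.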

Anyway, here is a lightweight circular buffer with some methods marked for safe and unsafe use.

public class CircularBuffer<T> : IEnumerable<T>
{
    readonly int size;
    readonly object locker;

    int count;
    int head;
    int rear;
    T[] values;

    public CircularBuffer(int max)
    {
        this.size = max;
        locker = new object();
        count = 0;
        head = 0;
        rear = 0;
        values = new T[size];
    }

    static int Incr(int index, int size)
    {
        return (index + 1) % size;
    }

    private void UnsafeEnsureQueueNotEmpty()
    {
        if (count == 0)
            throw new Exception("Empty queue");
    }

    public int Size { get { return size; } }
    public object SyncRoot { get { return locker; } }

    #region Count

    public int Count { get { return UnsafeCount; } }
    public int SafeCount { get { lock (locker) { return UnsafeCount; } } }
    public int UnsafeCount { get { return count; } }

    #endregion

    #region Enqueue

    public void Enqueue(T obj)
    {
        UnsafeEnqueue(obj);
    }

    public void SafeEnqueue(T obj)
    {
        lock (locker) { UnsafeEnqueue(obj); }
    }

    public void UnsafeEnqueue(T obj)
    {
        values[rear] = obj;

        if (Count == Size)
            head = Incr(head, Size);
        rear = Incr(rear, Size);
        count = Math.Min(count + 1, Size);
    }

    #endregion

    #region Dequeue

    public T Dequeue()
    {
        return UnsafeDequeue();
    }

    public T SafeDequeue()
    {
        lock (locker) { return UnsafeDequeue(); }
    }

    public T UnsafeDequeue()
    {
        UnsafeEnsureQueueNotEmpty();

        T res = values[head];
        values[head] = default(T);
        head = Incr(head, Size);
        count--;

        return res;
    }

    #endregion

    #region Peek

    public T Peek()
    {
        return UnsafePeek();
    }

    public T SafePeek()
    {
        lock (locker) { return UnsafePeek(); }
    }

    public T UnsafePeek()
    {
        UnsafeEnsureQueueNotEmpty();

        return values[head];
    }

    #endregion

    #region GetEnumerator

    public IEnumerator<T> GetEnumerator()
    {
        return UnsafeGetEnumerator();
    }

    public IEnumerator<T> SafeGetEnumerator()
    {
        lock (locker)
        {
            List<T> res = new List<T>(count);
            var enumerator = UnsafeGetEnumerator();
            while (enumerator.MoveNext())
                res.Add(enumerator.Current);
            return res.GetEnumerator();
        }
    }

    public IEnumerator<T> UnsafeGetEnumerator()
    {
        int index = head;
        for (int i = 0; i < count; i++)
        {
            yield return values[index];
            index = Incr(index, size);
        }
    }

    System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator()
    {
        return this.GetEnumerator();
    }

    #endregion
}

I like the Foo()/SafeFoo()/UnsafeFoo() convention:

  • Foo methods call UnsafeFoo as a default.
  • UnsafeFoo methods modify state freely without the lock; they should only call other unsafe methods.
  • SafeFoo methods call UnsafeFoo methods inside the lock.

It is a little verbose, but it makes obvious mistakes, like calling an unsafe method outside the lock in a method that is supposed to be thread-safe, stand out.
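
As an illustration of that convention, a short usage sketch (my own, assuming the CircularBuffer<T> above):

var buffer = new CircularBuffer<int>(100);

// Single-threaded (or externally synchronized) code can use the plain members,
// which forward to the Unsafe* ones:
buffer.Enqueue(1);

// Concurrent callers use the Safe* members, or take SyncRoot for a compound operation:
buffer.SafeEnqueue(2);
lock (buffer.SyncRoot)
{
    if (buffer.UnsafeCount > 0)
        Console.WriteLine(buffer.UnsafePeek());
}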

For anyone who finds it useful, here is some working code based on Richard Schneider's answer above:

public class FixedSizedQueue<T>
{
    readonly ConcurrentQueue<T> queue = new ConcurrentQueue<T>();

    public int Size { get; private set; }

    public FixedSizedQueue(int size)
    {
        Size = size;
    }

    public void Enqueue(T obj)
    {
        queue.Enqueue(obj);

        while (queue.Count > Size)
        {
            T outObj;
            queue.TryDequeue(out outObj);
        }
    }
}

My take is a slight variation... I extend ConcurrentQueue so that the LINQ extensions can be used on the FixedSizedQueue.

public class FixedSizedQueue<T> : ConcurrentQueue<T>
{
    private readonly object syncObject = new object();

    public int Size { get; private set; }

    public FixedSizedQueue(int size)
    {
        Size = size;
    }

    public new void Enqueue(T obj)
    {
        base.Enqueue(obj);
        lock (syncObject)
        {
            while (base.Count > Size)
            {
                T outObj;
                base.TryDequeue(out outObj);
            }
        }
    }
}
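
A brief sketch of the LINQ usage this enables (my example, not from the answer; requires using System.Linq):

var recent = new FixedSizedQueue<string>(100);
recent.Enqueue("https://example.com/a");
recent.Enqueue("https://example.org/b");

// Because the class derives from ConcurrentQueue<T> (which is IEnumerable<T>),
// the System.Linq extension methods work directly on it:
var fromExample = recent.Where(url => url.StartsWith("https://example.com")).ToList();
int total = recent.Count();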

For your coding pleasure, I submit to you the 'ConcurrentDeck':

public class ConcurrentDeck<T>
{
    private readonly int _size;
    private readonly T[] _buffer;
    private int _position = 0;

    public ConcurrentDeck(int size)
    {
        _size = size;
        _buffer = new T[size];
    }

    public void Push(T item)
    {
        lock (this)
        {
            _buffer[_position] = item;
            _position++;
            if (_position == _size) _position = 0;
        }
    }

    public T[] ReadDeck()
    {
        lock (this)
        {
            return _buffer.Skip(_position).Union(_buffer.Take(_position)).ToArray();
        }
    }
}

Example usage:

void Main()
{
    var deck = new ConcurrentDeck<Tuple<string,DateTime>>(25);
    var handle = new ManualResetEventSlim();
    var task1 = Task.Factory.StartNew(()=>{
        var timer = new System.Timers.Timer();
        timer.Elapsed += (s,a) => {deck.Push(new Tuple<string,DateTime>("task1",DateTime.Now));};
        timer.Interval = System.TimeSpan.FromSeconds(1).TotalMilliseconds;
        timer.Enabled = true;
        handle.Wait();
    });
    var task2 = Task.Factory.StartNew(()=>{
        var timer = new System.Timers.Timer();
        timer.Elapsed += (s,a) => {deck.Push(new Tuple<string,DateTime>("task2",DateTime.Now));};
        timer.Interval = System.TimeSpan.FromSeconds(.5).TotalMilliseconds;
        timer.Enabled = true;
        handle.Wait();
    });
    var task3 = Task.Factory.StartNew(()=>{
        var timer = new System.Timers.Timer();
        timer.Elapsed += (s,a) => {deck.Push(new Tuple<string,DateTime>("task3",DateTime.Now));};
        timer.Interval = System.TimeSpan.FromSeconds(.25).TotalMilliseconds;
        timer.Enabled = true;
        handle.Wait();
    });
    System.Threading.Thread.Sleep(TimeSpan.FromSeconds(10));
    handle.Set();
    var outputtime = DateTime.Now;
    deck.ReadDeck().Select(d => new {Message = d.Item1, MilliDiff = (outputtime - d.Item2).TotalMilliseconds}).Dump(true);
}

Just for fun, here is another implementation that I believe addresses most of the commenters' concerns. In particular, thread safety is achieved without locking and the implementation is hidden by the wrapping class.

public class FixedSizeQueue<T> : IReadOnlyCollection<T>
{
    private ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();
    private int _count;

    public int Limit { get; private set; }

    public FixedSizeQueue(int limit)
    {
        this.Limit = limit;
    }

    public void Enqueue(T obj)
    {
        _queue.Enqueue(obj);
        Interlocked.Increment(ref _count);

        // Calculate the number of items to be removed by this thread in a thread safe manner
        int currentCount;
        int finalCount;
        do
        {
            currentCount = _count;
            finalCount = Math.Min(currentCount, this.Limit);
        } while (currentCount !=
            Interlocked.CompareExchange(ref _count, finalCount, currentCount));

        T overflow;
        while (currentCount > finalCount && _queue.TryDequeue(out overflow))
            currentCount--;
    }

    public int Count
    {
        get { return _count; }
    }

    public IEnumerator<T> GetEnumerator()
    {
        return _queue.GetEnumerator();
    }

    System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator()
    {
        return _queue.GetEnumerator();
    }
}

Here is my version of the queue:

public class FixedSizedQueue<T> {
    private object LOCK = new object();
    ConcurrentQueue<T> queue;

    public int MaxSize { get; set; }

    public FixedSizedQueue(int maxSize, IEnumerable<T> items = null) {
        this.MaxSize = maxSize;
        if (items == null) {
            queue = new ConcurrentQueue<T>();
        }
        else {
            queue = new ConcurrentQueue<T>(items);
            EnsureLimitConstraint();
        }
    }

    public void Enqueue(T obj) {
        queue.Enqueue(obj);
        EnsureLimitConstraint();
    }

    private void EnsureLimitConstraint() {
        if (queue.Count > MaxSize) {
            lock (LOCK) {
                T overflow;
                while (queue.Count > MaxSize) {
                    queue.TryDequeue(out overflow);
                }
            }
        }
    }

    /// <summary>
    /// Returns the current snapshot of the queue.
    /// </summary>
    /// <returns></returns>
    public T[] GetSnapshot() {
        return queue.ToArray();
    }
}

I find it useful to have a constructor that builds on an IEnumerable, and to have a GetSnapshot that returns a thread-safe list (an array in this case) of the members at the moment of the call, which doesn't produce errors if the underlying collection changes.

The double Count check is there to avoid taking the lock in some circumstances.
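
A short usage sketch of the IEnumerable constructor and GetSnapshot described above (the values are placeholders):

// Seeding from an existing sequence; anything beyond maxSize is trimmed immediately.
var recent = new FixedSizedQueue<int>(3, new[] { 1, 2, 3, 4, 5 });

recent.Enqueue(6);

// GetSnapshot returns an array copy, so enumerating it cannot throw even if
// other threads keep enqueueing while we iterate.
foreach (var value in recent.GetSnapshot())
    Console.WriteLine(value);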

Let me add one more answer. Why this one over the others?

1) Simplicity. Trying to guarantee the size is all well and good, but it leads to unneeded complexity that can exhibit problems of its own.

2) It implements IReadOnlyCollection, meaning you can use LINQ on it and pass it into anything that expects an IEnumerable.

3) No locking. Many of the solutions above use locks, which is incorrect on a lockless collection.

4) It implements the same set of methods, properties, and interfaces as ConcurrentQueue, including IProducerConsumerCollection, which matters if you want to use the collection with a BlockingCollection (see the usage sketch after the class below).

This implementation could end up with more entries than expected if TryDequeue fails, but the frequency of that occurring doesn't seem worth specialized code that would inevitably hamper performance and cause its own unexpected problems.

If you absolutely want to guarantee the size, implementing a Prune() or similar method seems like the best idea. You could use a ReaderWriterLockSlim read lock in the other methods (including TryDequeue) and take a write lock only when pruning.
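
A rough sketch of that Prune() idea, written against the fields of the ConcurrentFixedSizeQueue class below (the ReaderWriterLockSlim usage and method body are my assumptions, not part of the posted class):

private readonly ReaderWriterLockSlim _pruneLock = new ReaderWriterLockSlim();

public bool TryDequeue(out T item)
{
    // Ordinary operations only need the shared (read) lock.
    _pruneLock.EnterReadLock();
    try { return m_concurrentQueue.TryDequeue(out item); }
    finally { _pruneLock.ExitReadLock(); }
}

public void Prune()
{
    // Pruning takes the exclusive (write) lock so the size guarantee holds.
    _pruneLock.EnterWriteLock();
    try
    {
        while (m_concurrentQueue.Count > m_maxSize)
            m_concurrentQueue.TryDequeue(out _);
    }
    finally { _pruneLock.ExitWriteLock(); }
}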

class ConcurrentFixedSizeQueue<T> : IProducerConsumerCollection<T>, IReadOnlyCollection<T>, ICollection {
    readonly ConcurrentQueue<T> m_concurrentQueue;
    readonly int m_maxSize;

    public int Count => m_concurrentQueue.Count;
    public bool IsEmpty => m_concurrentQueue.IsEmpty;

    public ConcurrentFixedSizeQueue (int maxSize) : this(Array.Empty<T>(), maxSize) { }

    public ConcurrentFixedSizeQueue (IEnumerable<T> initialCollection, int maxSize) {
        if (initialCollection == null) {
            throw new ArgumentNullException(nameof(initialCollection));
        }

        m_concurrentQueue = new ConcurrentQueue<T>(initialCollection);
        m_maxSize = maxSize;
    }

    public void Enqueue (T item) {
        m_concurrentQueue.Enqueue(item);

        if (m_concurrentQueue.Count > m_maxSize) {
            T result;
            m_concurrentQueue.TryDequeue(out result);
        }
    }

    public void TryPeek (out T result) => m_concurrentQueue.TryPeek(out result);
    public bool TryDequeue (out T result) => m_concurrentQueue.TryDequeue(out result);

    public void CopyTo (T[] array, int index) => m_concurrentQueue.CopyTo(array, index);
    public T[] ToArray () => m_concurrentQueue.ToArray();

    public IEnumerator<T> GetEnumerator () => m_concurrentQueue.GetEnumerator();
    IEnumerator IEnumerable.GetEnumerator () => GetEnumerator();

    // Explicit ICollection implementations.
    void ICollection.CopyTo (Array array, int index) => ((ICollection)m_concurrentQueue).CopyTo(array, index);
    object ICollection.SyncRoot => ((ICollection) m_concurrentQueue).SyncRoot;
    bool ICollection.IsSynchronized => ((ICollection) m_concurrentQueue).IsSynchronized;

    // Explicit IProducerConsumerCollection<T> implementations.
    bool IProducerConsumerCollection<T>.TryAdd (T item) => ((IProducerConsumerCollection<T>) m_concurrentQueue).TryAdd(item);
    bool IProducerConsumerCollection<T>.TryTake (out T item) => ((IProducerConsumerCollection<T>) m_concurrentQueue).TryTake(out item);

    public override int GetHashCode () => m_concurrentQueue.GetHashCode();
    public override bool Equals (object obj) => m_concurrentQueue.Equals(obj);
    public override string ToString () => m_concurrentQueue.ToString();
}
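
Since the class implements IProducerConsumerCollection<T>, it can also serve as the backing store of a BlockingCollection<T>; a minimal usage sketch of mine:

// BlockingCollection<T> accepts any IProducerConsumerCollection<T> as its underlying store.
var store = new ConcurrentFixedSizeQueue<string>(100);
using (var blocking = new BlockingCollection<string>(store))
{
    blocking.Add("https://example.com");   // goes through IProducerConsumerCollection<T>.TryAdd
    string first = blocking.Take();        // goes through IProducerConsumerCollection<T>.TryTake
}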

My version is just a subclass of a normal Queue.. nothing special, but since everyone is participating and it still fits the topic title, I might as well put it here. It also returns the dequeued item, just in case.

public sealed class SizedQueue<T> : Queue<T>
{
    public int FixedCapacity { get; }

    public SizedQueue(int fixedCapacity)
    {
        this.FixedCapacity = fixedCapacity;
    }

    /// <summary>
    /// If the total number of items exceeds the capacity, the oldest one is automatically dequeued.
    /// </summary>
    /// <returns>The dequeued value, if any.</returns>
    public new T Enqueue(T item)
    {
        base.Enqueue(item);
        if (base.Count > FixedCapacity)
        {
            return base.Dequeue();
        }
        return default;
    }
}

Well, it depends on the usage. I have noticed that some of the solutions above may exceed the size when used in a multi-threaded environment. Anyway, my use case was to display the last 5 events, with multiple threads writing events to the queue and one other thread reading from it and displaying them in a Winform control. So this was my solution.

EDIT: Since we already use a lock within our implementation, we don't really need ConcurrentQueue; dropping it may improve performance.

class FixedSizedConcurrentQueue<T>
{
    readonly Queue<T> queue = new Queue<T>();
    readonly object syncObject = new object();

    public int MaxSize { get; private set; }

    public FixedSizedConcurrentQueue(int maxSize)
    {
        MaxSize = maxSize;
    }

    public void Enqueue(T obj)
    {
        lock (syncObject)
        {
            queue.Enqueue(obj);
            while (queue.Count > MaxSize)
            {
                queue.Dequeue();
            }
        }
    }

    public T[] ToArray()
    {
        T[] result = null;
        lock (syncObject)
        {
            result = queue.ToArray();
        }

        return result;
    }

    public void Clear()
    {
        lock (syncObject)
        {
            queue.Clear();
        }
    }
}

EDIT: We don't really need syncObject in the above example; we could use the queue object instead, since we never re-initialize queue in any method and it is marked readonly anyway.
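
For illustration, here is Enqueue rewritten per that edit, locking on the readonly queue field itself (a sketch of the suggestion, not separately tested):

public void Enqueue(T obj)
{
    // The readonly 'queue' field never changes identity, so it can serve as the lock object.
    lock (queue)
    {
        queue.Enqueue(obj);
        while (queue.Count > MaxSize)
        {
            queue.Dequeue();
        }
    }
}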

Here is my take on a fixed size queue.

It uses a regular Queue to avoid the synchronization overhead of the Count property on ConcurrentQueue. It also implements IReadOnlyCollection so that LINQ methods can be used. The rest is very similar to the other answers here.

[Serializable]
[DebuggerDisplay("Count = {" + nameof(Count) + "}, Limit = {" + nameof(Limit) + "}")]
public class FixedSizedQueue<T> : IReadOnlyCollection<T>
{
    private readonly Queue<T> _queue = new Queue<T>();
    private readonly object _lock = new object();

    public int Count { get { lock (_lock) { return _queue.Count; } } }
    public int Limit { get; }

    public FixedSizedQueue(int limit)
    {
        if (limit < 1)
            throw new ArgumentOutOfRangeException(nameof(limit));

        Limit = limit;
    }

    public FixedSizedQueue(IEnumerable<T> collection)
    {
        if (collection is null || !collection.Any())
            throw new ArgumentException("Can not initialize the Queue with a null or empty collection", nameof(collection));

        _queue = new Queue<T>(collection);
        Limit = _queue.Count;
    }

    public void Enqueue(T obj)
    {
        lock (_lock)
        {
            _queue.Enqueue(obj);

            while (_queue.Count > Limit)
                _queue.Dequeue();
        }
    }

    public void Clear()
    {
        lock (_lock)
            _queue.Clear();
    }

    public IEnumerator<T> GetEnumerator()
    {
        lock (_lock)
            return new List<T>(_queue).GetEnumerator();
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return GetEnumerator();
    }
}

The accepted answer is going to have avoidable side effects.

Fine-Grained Locking and Lock-Free Mechanisms

The links below are the references I used when writing the example below.

While the documentation from Microsoft is a bit misleading, as they do use a lock, they only lock the segment classes. The segment classes themselves use Interlocked.

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

namespace Lib.Core
{
    // Sources:
    // https://learn.microsoft.com/en-us/dotnet/standard/collections/thread-safe/
    // https://learn.microsoft.com/en-us/dotnet/api/system.threading.interlocked?view=netcore-3.1
    // https://github.com/dotnet/runtime/blob/master/src/libraries/System.Private.CoreLib/src/System/Collections/Concurrent/ConcurrentQueue.cs
    // https://github.com/dotnet/runtime/blob/master/src/libraries/System.Private.CoreLib/src/System/Collections/Concurrent/ConcurrentQueueSegment.cs

    /// <summary>
    /// Concurrency-safe circular buffer that uses the fixed capacity specified and reuses slots as it goes.
    /// </summary>
    /// <typeparam name="TObject">The object that you want to go into the slots.</typeparam>
    public class ConcurrentCircularBuffer<TObject>
    {
        private readonly ConcurrentQueue<TObject> _queue;

        public int Capacity { get; private set; }

        public ConcurrentCircularBuffer(int capacity)
        {
            if (capacity <= 0)
            {
                throw new ArgumentException($"The capacity specified '{capacity}' is not valid.", nameof(capacity));
            }

            // Setup the queue to the initial capacity using List's underlying implementation.
            _queue = new ConcurrentQueue<TObject>(new List<TObject>(capacity));

            Capacity = capacity;
        }

        public void Enqueue(TObject @object)
        {
            // Enforce the capacity first so the head can be used instead of the entire segment (slow).
            while (_queue.Count + 1 > Capacity)
            {
                if (!_queue.TryDequeue(out _))
                {
                    // Handle the error condition however you want, i.e. throw, return a validation object, etc.
                    var ex = new Exception("Concurrent Dequeue operation failed.");
                    ex.Data.Add("EnqueueObject", @object);
                    throw ex;
                }
            }

            // Place the item into the queue.
            _queue.Enqueue(@object);
        }

        public TObject Dequeue()
        {
            if (_queue.TryDequeue(out var result))
            {
                return result;
            }

            return default;
        }
    }
}

Here is yet another implementation that uses the underlying ConcurrentQueue as much as possible while providing the same interfaces made available through ConcurrentQueue.

/// <summary>
/// This is a FIFO concurrent queue that will remove the oldest added items when a given limit is reached.
/// </summary>
/// <typeparam name="TValue"></typeparam>
public class FixedSizedConcurrentQueue<TValue> : IProducerConsumerCollection<TValue>, IReadOnlyCollection<TValue>
{
    private readonly ConcurrentQueue<TValue> _queue;

    private readonly object _syncObject = new object();

    public int LimitSize { get; }

    public FixedSizedConcurrentQueue(int limit)
    {
        _queue = new ConcurrentQueue<TValue>();
        LimitSize = limit;
    }

    public FixedSizedConcurrentQueue(int limit, System.Collections.Generic.IEnumerable<TValue> collection)
    {
        _queue = new ConcurrentQueue<TValue>(collection);
        LimitSize = limit;
    }

    public int Count => _queue.Count;

    bool ICollection.IsSynchronized => ((ICollection)_queue).IsSynchronized;

    object ICollection.SyncRoot => ((ICollection)_queue).SyncRoot;

    public bool IsEmpty => _queue.IsEmpty;

    // Not supported until .NET Standard 2.1
    //public void Clear() => _queue.Clear();

    public void CopyTo(TValue[] array, int index) => _queue.CopyTo(array, index);

    void ICollection.CopyTo(Array array, int index) => ((ICollection)_queue).CopyTo(array, index);

    public void Enqueue(TValue obj)
    {
        _queue.Enqueue(obj);
        lock (_syncObject)
        {
            while (_queue.Count > LimitSize)
            {
                _queue.TryDequeue(out _);
            }
        }
    }

    public IEnumerator<TValue> GetEnumerator() => _queue.GetEnumerator();

    IEnumerator IEnumerable.GetEnumerator() => ((IEnumerable<TValue>)this).GetEnumerator();

    public TValue[] ToArray() => _queue.ToArray();

    public bool TryAdd(TValue item)
    {
        Enqueue(item);
        return true;
    }

    bool IProducerConsumerCollection<TValue>.TryTake(out TValue item) => TryDequeue(out item);

    public bool TryDequeue(out TValue result) => _queue.TryDequeue(out result);

    public bool TryPeek(out TValue result) => _queue.TryPeek(out result);
}

Just because no one has said it yet.. you can use a LinkedList<T> and add the thread safety yourself:

public class Buffer<T> : LinkedList<T>
{
    private int capacity;

    public Buffer(int capacity)
    {
        this.capacity = capacity;
    }

    public void Enqueue(T item)
    {
        // todo: add synchronization mechanism
        if (Count == capacity) RemoveLast();
        AddFirst(item);
    }

    public T Dequeue()
    {
        // todo: add synchronization mechanism
        var last = Last.Value;
        RemoveLast();
        return last;
    }
}

One thing to note is that the default enumeration order is LIFO in this example. That can be changed if necessary, as sketched below.
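
For example, one way to get oldest-first (FIFO) enumeration is a helper that walks the list from the tail (the method name is mine; it assumes the Buffer<T> class above):

public IEnumerable<T> InInsertionOrder()
{
    // Enqueue adds at the head, so walking Last -> First yields items oldest-first.
    for (var node = Last; node != null; node = node.Previous)
        yield return node.Value;
}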

using System.Collections.Concurrent;

public class FixedSizeQueue<T>
{
    ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();

    private void Enque(T obj)
    {
        T temp;

        if (_queue.Count > 99)
        {
            // Remove one of the oldest added items.
            _queue.TryDequeue(out temp);
        }

        _queue.Enqueue(obj);
    }

    private bool Dequeue(out T obj)
    {
        return _queue.TryDequeue(out obj);
    }

    private void Clear()
    {
        T obj;

        // This does not fall into an infinite loop; it clears the contents present at the time of the call.
        int cnt = _queue.Count;
        for (; cnt > 0; cnt--)
        {
            _queue.TryDequeue(out obj);
        }
    }
}
}