C # 对象池模式实现

有没有人有一个很好的资源来实现一个共享对象池策略,对于一个有限的资源,就像 Sql 连接池一样?(即将完全实现,它是线程安全的)。

要对@Aaronort 请求进行后续澄清,池的使用将用于对外部服务的负载平衡请求。把它放在一个可能更容易立即理解的场景,而不是我的直接情况。我有一个会话对象,它的功能类似于来自 NHibernate 的 ISession对象。每个唯一的会话管理它到数据库的连接。目前,我有一个长时间运行的会话对象,并遇到问题,我的服务提供商是速率限制我的使用这个单独的会话。

由于他们没有期望一个会话会被当作一个长时间运行的服务帐户,所以他们显然把它当作一个正在打击他们的服务的客户端。这就引出了我的问题,我不会只有一个单独的会话,而是创建一个不同的会话池,并将请求分散到多个会话的服务中,而不是像以前那样创建一个单独的协调中心。

希望这些背景知识能够提供一些价值,但也能直接回答你的一些问题:

问: 创建这些对象是否很昂贵?
没有对象是一个有限资源池

问: 他们会经常被收购/释放吗?
A: 是的,它们可以再次被认为是 NHibernate ISessions,通常在每个页面请求期间获取并释放1。

问: 一个简单的先到先得的服务就足够了吗? 还是你需要一些更智能的东西,比如能够防止饥饿的东西?
A: 一个简单的循环类型分布就足够了,关于饥饿,我假设您的意思是如果没有可用的会话,那么调用者就会被阻塞,等待释放。这实际上并不适用,因为会话可以由不同的调用方共享。我的目标是将使用分布在多个会话之间,而不是单个会话。

我相信这可能与对象池的正常使用有所不同,这就是为什么我最初省略了这一部分,并计划仅仅调整模式以允许共享对象,而不是允许出现饥饿情况。

问: 优先级、懒惰与渴望加载等事情怎么样?
答: 没有涉及优先级,为了简单起见,假设我将在创建池本身时创建可用对象池。

83680 次浏览

本文面向 Java,公开了 ConnectionImpl 池模式和抽象的对象池模式,可能是一个很好的第一种方法: Http://www.developer.com/design/article.php/626171/pattern-summaries-object-pool.htm

对象池模式:

pattern

回到过去,微软通过 Microsoft Transaction Server (MTS)和后来的 COM + 提供了一个框架来为 COM 对象进行对象池。该功能被转移到 System。中的企业服务。NET 框架,现在在 Windows 通信基础中。

WCF 中的对象池

这篇文章来自。NET 1.1,但应该仍然适用于当前版本的框架(即使 WCF 是首选方法)。

对象池.NET

这样的东西也许能满足你的需要。

/// <summary>
/// Represents a pool of objects with a size limit.
/// </summary>
/// <typeparam name="T">The type of object in the pool.</typeparam>
public sealed class ObjectPool<T> : IDisposable
where T : new()
{
private readonly int size;
private readonly object locker;
private readonly Queue<T> queue;
private int count;




/// <summary>
/// Initializes a new instance of the ObjectPool class.
/// </summary>
/// <param name="size">The size of the object pool.</param>
public ObjectPool(int size)
{
if (size <= 0)
{
const string message = "The size of the pool must be greater than zero.";
throw new ArgumentOutOfRangeException("size", size, message);
}


this.size = size;
locker = new object();
queue = new Queue<T>();
}




/// <summary>
/// Retrieves an item from the pool.
/// </summary>
/// <returns>The item retrieved from the pool.</returns>
public T Get()
{
lock (locker)
{
if (queue.Count > 0)
{
return queue.Dequeue();
}


count++;
return new T();
}
}


/// <summary>
/// Places an item in the pool.
/// </summary>
/// <param name="item">The item to place to the pool.</param>
public void Put(T item)
{
lock (locker)
{
if (count < size)
{
queue.Enqueue(item);
}
else
{
using (item as IDisposable)
{
count--;
}
}
}
}


/// <summary>
/// Disposes of items in the pool that implement IDisposable.
/// </summary>
public void Dispose()
{
lock (locker)
{
count = 0;
while (queue.Count > 0)
{
using (queue.Dequeue() as IDisposable)
{


}
}
}
}
}

示例用法

public class ThisObject
{
private readonly ObjectPool<That> pool = new ObjectPool<That>(100);


public void ThisMethod()
{
var that = pool.Get();


try
{
// Use that ....
}
finally
{
pool.Put(that);
}
}
}

由于一些未知因素,这个问题比人们想象的要复杂一些: 被池化的资源的行为、对象的预期/需要的生命周期、需要池的真正原因等等。通常池是特殊用途的-线程池、连接池等等-因为当您确切地知道资源做什么时,更容易优化一个,更重要的是在资源如何实现方面有 控制

因为它不是那么简单,所以我试图提供一个相当灵活的方法,你可以尝试,看看什么工作最好。先为这个长篇文章道歉,但是当涉及到实现一个像样的通用资源库时,还有很多地方需要讨论。我真的只是触及了皮毛。

一个通用池必须有几个主要的“设置”,包括:

  • 资源加载策略-渴望或懒惰;
  • 资源加载 机械装置-如何实际构建一个;
  • 访问策略——您提到的“循环”并不像听起来那么简单; 这个实现可以使用循环缓冲区 相似,但并不完美,因为池无法控制何时实际回收资源。其他的选择是 FIFO 和 LIFO; FIFO 将有更多的随机访问模式,但是 LIFO 使得实现最近使用最少的释放策略变得非常容易(你说过这超出了范围,但是仍然值得一提)。

对于资源加载机制,.NET 已经为我们提供了一个干净的抽象委托。

private Func<Pool<T>, T> factory;

通过池的构造函数传递这个,我们就完成了。使用带有 new()约束的泛型类型也可以工作,但是这样更灵活。


在其他两个参数中,访问策略更为复杂,因此我的方法是使用基于继承(接口)的方法:

public class Pool<T> : IDisposable
{
// Other code - we'll come back to this


interface IItemStore
{
T Fetch();
void Store(T item);
int Count { get; }
}
}

这里的概念很简单——我们将让公共 Pool类处理线程安全等常见问题,但对每种访问模式使用不同的“项存储”。LIFO 很容易用堆栈表示,FIFO 是一个队列,我使用了一个不太优化但可能足够的循环缓冲区实现,使用 List<T>和索引指针来近似一个循环访问模式。

下面的所有类都是 Pool<T>的内部类——这是一个风格选择,但是因为这些类并不意味着要在 Pool之外使用,所以它是最有意义的。

    class QueueStore : Queue<T>, IItemStore
{
public QueueStore(int capacity) : base(capacity)
{
}


public T Fetch()
{
return Dequeue();
}


public void Store(T item)
{
Enqueue(item);
}
}


class StackStore : Stack<T>, IItemStore
{
public StackStore(int capacity) : base(capacity)
{
}


public T Fetch()
{
return Pop();
}


public void Store(T item)
{
Push(item);
}
}

这些是显而易见的——堆栈和队列。我不认为他们真的需要更多的解释。循环缓冲区稍微复杂一些:

    class CircularStore : IItemStore
{
private List<Slot> slots;
private int freeSlotCount;
private int position = -1;


public CircularStore(int capacity)
{
slots = new List<Slot>(capacity);
}


public T Fetch()
{
if (Count == 0)
throw new InvalidOperationException("The buffer is empty.");


int startPosition = position;
do
{
Advance();
Slot slot = slots[position];
if (!slot.IsInUse)
{
slot.IsInUse = true;
--freeSlotCount;
return slot.Item;
}
} while (startPosition != position);
throw new InvalidOperationException("No free slots.");
}


public void Store(T item)
{
Slot slot = slots.Find(s => object.Equals(s.Item, item));
if (slot == null)
{
slot = new Slot(item);
slots.Add(slot);
}
slot.IsInUse = false;
++freeSlotCount;
}


public int Count
{
get { return freeSlotCount; }
}


private void Advance()
{
position = (position + 1) % slots.Count;
}


class Slot
{
public Slot(T item)
{
this.Item = item;
}


public T Item { get; private set; }
public bool IsInUse { get; set; }
}
}

我本来可以选择一些不同的方法,但是底线是资源应该按照它们被创建的顺序被访问,这意味着我们必须维护对它们的引用,但是将它们标记为“正在使用”(或者没有使用)。在绝境求生手册中,只有一个槽可用,并且每次提取都要对缓冲区进行完整的迭代。如果您有数百个资源池,并且每秒多次获取和释放这些资源,那么这种情况就很糟糕; 对于5-10个项目的池来说,这不是什么问题,而且在 典型的的情况下,资源使用很少,它只需要提前一到两个插槽。

请记住,这些类是私有的内部类——这就是为什么它们不需要大量的错误检查,因为池本身限制了对它们的访问。

引入一个枚举和一个工厂方法,我们就完成了这一部分:

// Outside the pool
public enum AccessMode { FIFO, LIFO, Circular };


private IItemStore itemStore;


// Inside the Pool
private IItemStore CreateItemStore(AccessMode mode, int capacity)
{
switch (mode)
{
case AccessMode.FIFO:
return new QueueStore(capacity);
case AccessMode.LIFO:
return new StackStore(capacity);
default:
Debug.Assert(mode == AccessMode.Circular,
"Invalid AccessMode in CreateItemStore");
return new CircularStore(capacity);
}
}

下一个需要解决的问题是加载策略,我已经定义了三种类型:

public enum LoadingMode { Eager, Lazy, LazyExpanding };

前两个应该是不言自明的; 第三个是混合型的,它延迟加载资源,但是在池满之前不会重用任何资源。如果您希望池是满的(听起来确实如此) ,但又想将实际创建它们的费用推迟到第一次访问(即提高启动时间) ,那么这将是一个很好的权衡。

加载方法实际上并不太复杂,现在我们已经有了商品存储抽象:

    private int size;
private int count;


private T AcquireEager()
{
lock (itemStore)
{
return itemStore.Fetch();
}
}


private T AcquireLazy()
{
lock (itemStore)
{
if (itemStore.Count > 0)
{
return itemStore.Fetch();
}
}
Interlocked.Increment(ref count);
return factory(this);
}


private T AcquireLazyExpanding()
{
bool shouldExpand = false;
if (count < size)
{
int newCount = Interlocked.Increment(ref count);
if (newCount <= size)
{
shouldExpand = true;
}
else
{
// Another thread took the last spot - use the store instead
Interlocked.Decrement(ref count);
}
}
if (shouldExpand)
{
return factory(this);
}
else
{
lock (itemStore)
{
return itemStore.Fetch();
}
}
}


private void PreloadItems()
{
for (int i = 0; i < size; i++)
{
T item = factory(this);
itemStore.Store(item);
}
count = size;
}

上面的 sizecount字段分别表示池的最大大小和池所拥有的资源总数(但不一定是 有空)。AcquireEager是最简单的,它假设一个项目已经在存储中-这些项目将在构造时预加载,即在最后显示的 PreloadItems方法中。

AcquireLazy检查池中是否有空闲项,如果没有,则创建一个新项。只要池还没有达到目标大小,AcquireLazyExpanding就会创建一个新资源。我已经尝试优化它以最小化锁定,并且我希望我没有犯任何错误(我 在多线程条件下测试了它,但显然没有详尽地测试)。

您可能想知道为什么这些方法都不检查存储是否已经达到最大大小。我一会儿再说这个。


现在轮到游泳池了。下面是全套私人数据,其中一些已经显示出来:

    private bool isDisposed;
private Func<Pool<T>, T> factory;
private LoadingMode loadingMode;
private IItemStore itemStore;
private int size;
private int count;
private Semaphore sync;

回答我在上一段中忽略的问题——如何确保我们限制创建的资源总数——结果是。NET 已经有了一个非常好的工具来实现这一点,它被称为 信号灯,它被专门设计来允许固定数量的线程访问一个资源(在这种情况下,“资源”是内部项存储)。因为我们没有实现一个完整的生产者/消费者队列,所以这完全足以满足我们的需求。

构造函数如下:

    public Pool(int size, Func<Pool<T>, T> factory,
LoadingMode loadingMode, AccessMode accessMode)
{
if (size <= 0)
throw new ArgumentOutOfRangeException("size", size,
"Argument 'size' must be greater than zero.");
if (factory == null)
throw new ArgumentNullException("factory");


this.size = size;
this.factory = factory;
sync = new Semaphore(size, size);
this.loadingMode = loadingMode;
this.itemStore = CreateItemStore(accessMode, size);
if (loadingMode == LoadingMode.Eager)
{
PreloadItems();
}
}

应该不会有什么意外。唯一需要注意的是,使用前面已经显示的 PreloadItems方法进行即时加载的特殊大小写。

因为到目前为止,几乎所有的东西都已经被清楚地抽象出来了,所以实际的 AcquireRelease方法真的非常简单:

    public T Acquire()
{
sync.WaitOne();
switch (loadingMode)
{
case LoadingMode.Eager:
return AcquireEager();
case LoadingMode.Lazy:
return AcquireLazy();
default:
Debug.Assert(loadingMode == LoadingMode.LazyExpanding,
"Unknown LoadingMode encountered in Acquire method.");
return AcquireLazyExpanding();
}
}


public void Release(T item)
{
lock (itemStore)
{
itemStore.Store(item);
}
sync.Release();
}

如前所述,我们使用 Semaphore来控制并发性,而不是严格地检查项目存储的状态。只要获得的物品被正确释放,就没有什么可担心的。

最后但并非最不重要的是,还有清理工作:

    public void Dispose()
{
if (isDisposed)
{
return;
}
isDisposed = true;
if (typeof(IDisposable).IsAssignableFrom(typeof(T)))
{
lock (itemStore)
{
while (itemStore.Count > 0)
{
IDisposable disposable = (IDisposable)itemStore.Fetch();
disposable.Dispose();
}
}
}
sync.Close();
}


public bool IsDisposed
{
get { return isDisposed; }
}

IsDisposed属性的用途很快就会变得清晰起来。如果实现了 IDisposable,那么主 Dispose方法实际上所做的就是处置实际的池项。


现在基本上可以使用 try-finally块的原样使用它,但是我不喜欢这种语法,因为如果你开始在类和方法之间传递池资源,那么它会变得非常混乱。有可能使用资源的主类甚至没有 作为对池的引用。它实际上变得非常混乱,因此更好的方法是创建一个“智能”池对象。

假设我们从以下简单的接口/类开始:

public interface IFoo : IDisposable
{
void Test();
}


public class Foo : IFoo
{
private static int count = 0;


private int num;


public Foo()
{
num = Interlocked.Increment(ref count);
}


public void Dispose()
{
Console.WriteLine("Goodbye from Foo #{0}", num);
}


public void Test()
{
Console.WriteLine("Hello from Foo #{0}", num);
}
}

下面是我们假想的一次性 Foo资源,它实现了 IFoo,并且有一些用于生成唯一标识的样板代码。我们要做的是创建另一个特殊的池对象:

public class PooledFoo : IFoo
{
private Foo internalFoo;
private Pool<IFoo> pool;


public PooledFoo(Pool<IFoo> pool)
{
if (pool == null)
throw new ArgumentNullException("pool");


this.pool = pool;
this.internalFoo = new Foo();
}


public void Dispose()
{
if (pool.IsDisposed)
{
internalFoo.Dispose();
}
else
{
pool.Release(this);
}
}


public void Test()
{
internalFoo.Test();
}
}

这只是将所有“真正的”方法代理到它的内部 IFoo中(我们可以使用像 Castle 这样的动态代理库,但是我不想讨论这个问题)。它还维护对创建它的 Pool的引用,这样当我们使用 Dispose这个对象时,它会自动将自己释放回池中。当池已经被释放时,就是 除了-这意味着我们处于“清理”模式,在这种情况下实际上是 清理内部资源


使用上面的方法,我们可以这样编写代码:

// Create the pool early
Pool<IFoo> pool = new Pool<IFoo>(PoolSize, p => new PooledFoo(p),
LoadingMode.Lazy, AccessMode.Circular);


// Sometime later on...
using (IFoo foo = pool.Acquire())
{
foo.Test();
}

这是一个 非常能做的好事。这意味着 用途IFoo的代码(相对于创建它的代码)实际上并不需要知道池。您甚至可以使用您最喜欢的 DI 库和 Pool<T>作为提供者/工厂来创建 注射 IFoo对象。


我已经把 在 PasteBin 上完成代码的复制和粘贴享受。还有一个简短的 测试程序,您可以使用它来处理不同的加载/访问模式和多线程条件,以确保它是线程安全的,不存在 bug。

如果你对此有任何问题或担忧,请告诉我。

我非常喜欢 Aronaught 的实现——特别是因为他处理通过使用信号量变得可用的资源等待。我想补充几点:

  1. sync.WaitOne()更改为 sync.WaitOne(timeout),并在 Acquire(int timeout)方法上将超时作为参数公开。这还需要在线程超时等待对象变为可用时处理该条件。
  2. 添加 Recycle(T item)方法以处理在发生故障时需要回收对象的情况,例如。

这是另一个实现,池中的对象数量有限。

public class ObjectPool<T>
where T : class
{
private readonly int maxSize;
private Func<T> constructor;
private int currentSize;
private Queue<T> pool;
private AutoResetEvent poolReleasedEvent;


public ObjectPool(int maxSize, Func<T> constructor)
{
this.maxSize = maxSize;
this.constructor = constructor;
this.currentSize = 0;
this.pool = new Queue<T>();
this.poolReleasedEvent = new AutoResetEvent(false);
}


public T GetFromPool()
{
T item = null;
do
{
lock (this)
{
if (this.pool.Count == 0)
{
if (this.currentSize < this.maxSize)
{
item = this.constructor();
this.currentSize++;
}
}
else
{
item = this.pool.Dequeue();
}
}


if (null == item)
{
this.poolReleasedEvent.WaitOne();
}
}
while (null == item);
return item;
}


public void ReturnToPool(T item)
{
lock (this)
{
this.pool.Enqueue(item);
this.poolReleasedEvent.Set();
}
}
}

.NET 内核中的对象池

Dotnet 核心具有添加到基类库(BCL)中的对象池的实现。您可以阅读原始的 GitHub 问题 给你并查看 系统,缓冲器的代码。目前,ArrayPool是唯一可用的类型,用于池数组。有一个不错的博客文章 给你

namespace System.Buffers
{
public abstract class ArrayPool<T>
{
public static ArrayPool<T> Shared { get; internal set; }


public static ArrayPool<T> Create(int maxBufferSize = <number>, int numberOfBuffers = <number>);


public T[] Rent(int size);


public T[] Enlarge(T[] buffer, int newSize, bool clearBuffer = false);


public void Return(T[] buffer, bool clearBuffer = false);
}
}

在 ASP.NET Core 中可以看到它的使用示例。因为 ASP.NET Core 位于 dotnet 核心 BCL 中,所以它可以与其他对象(如 Newtonsoft)共享它的对象池。JSON 的 JSON 序列化程序。你可以阅读 这个的博客文章了解更多关于牛顿软件的信息。是杰森干的。

Microsoft Roslyn C # 编译器中的对象池

新的 Microsoft Roslyn C # 编译器包含 目标池类型,它用于汇集经常使用的对象,这些对象通常会被更新,并且经常被垃圾收集。这减少了必须执行的垃圾收集操作的数量和大小。有几个不同的子实现都使用 ObjectPool (参见: 为什么罗斯林有这么多对象池的实现?)。

1-共享池-存储一个包含20个对象的池,如果使用 BigDefault,则存储100个对象。

// Example 1 - In a using statement, so the object gets freed at the end.
using (PooledObject<Foo> pooledObject = SharedPools.Default<List<Foo>>().GetPooledObject())
{
// Do something with pooledObject.Object
}


// Example 2 - No using statement so you need to be sure no exceptions are not thrown.
List<Foo> list = SharedPools.Default<List<Foo>>().AllocateAndClear();
// Do something with list
SharedPools.Default<List<Foo>>().Free(list);


// Example 3 - I have also seen this variation of the above pattern, which ends up the same as Example 1, except Example 1 seems to create a new instance of the IDisposable [PooledObject<T>][4] object. This is probably the preferred option if you want fewer GC's.
List<Foo> list = SharedPools.Default<List<Foo>>().AllocateAndClear();
try
{
// Do something with list
}
finally
{
SharedPools.Default<List<Foo>>().Free(list);
}

2-列表池StringBuilderPool-不是严格分离的实现,而是上面所示的针对 List 和 StringBuilder 的 SharedPools 实现的包装器。因此,这将重用存储在 SharedPools 中的对象池。

// Example 1 - No using statement so you need to be sure no exceptions are thrown.
StringBuilder stringBuilder= StringBuilderPool.Allocate();
// Do something with stringBuilder
StringBuilderPool.Free(stringBuilder);


// Example 2 - Safer version of Example 1.
StringBuilder stringBuilder= StringBuilderPool.Allocate();
try
{
// Do something with stringBuilder
}
finally
{
StringBuilderPool.Free(stringBuilder);
}

3-PooledDictionaryPooledHashSet-它们直接使用 ObjectPool,并有一个完全独立的对象池。存储128个对象的池。

// Example 1
PooledHashSet<Foo> hashSet = PooledHashSet<Foo>.GetInstance()
// Do something with hashSet.
hashSet.Free();


// Example 2 - Safer version of Example 1.
PooledHashSet<Foo> hashSet = PooledHashSet<Foo>.GetInstance()
try
{
// Do something with hashSet.
}
finally
{
hashSet.Free();
}

微软 IO. 可回收记忆流

这个库为 MemoryStream对象提供池。这是 System.IO.MemoryStream的临时替代品。它有完全相同的语义。它是由必应的工程师设计的。阅读博客文章 给你或者查看 GitHub上的代码。

var sourceBuffer = new byte[]{0,1,2,3,4,5,6,7};
var manager = new RecyclableMemoryStreamManager();
using (var stream = manager.GetStream())
{
stream.Write(sourceBuffer, 0, sourceBuffer.Length);
}

请注意,RecyclableMemoryStreamManager应该声明一次,并且它将在整个流程中存在——这就是池。如果您愿意,使用多个池是完全可以的。