foreach(IEnumerable<User> batch in users.Batch(1000))
// use batch
If simple usage of library is not an option, you can reuse implementation:
public static IEnumerable<IEnumerable<T>> Batch<T>(
this IEnumerable<T> source, int size)
{
T[] bucket = null;
var count = 0;
foreach (var item in source)
{
if (bucket == null)
bucket = new T[size];
bucket[count++] = item;
if (count != size)
continue;
yield return bucket.Select(x => x);
bucket = null;
count = 0;
}
// Return the last bucket with all remaining elements
if (bucket != null && count > 0)
{
Array.Resize(ref bucket, count);
yield return bucket.Select(x => x);
}
}
BTW for performance you can simply return bucket without calling Select(x => x). Select is optimized for arrays, but selector delegate still would be invoked on each item. So, in your case it's better to use
public static IEnumerable<IEnumerable<TSource>> Batch<TSource>(
this IEnumerable<TSource> source,
int batchSize)
{
var batch = new List<TSource>();
foreach (var item in source)
{
batch.Add(item);
if (batch.Count == batchSize)
{
yield return batch;
batch = new List<TSource>();
}
}
if (batch.Any()) yield return batch;
}
List<MyClass> batch = new List<MyClass>();
foreach (MyClass item in items)
{
batch.Add(item);
if (batch.Count == 1000)
{
// Perform operation on batch
batch.Clear();
}
}
// Process last batch
if (batch.Any())
{
// Perform operation on batch
}
And you could generalize this into a generic method, like this:
static void PerformBatchedOperation<T>(IEnumerable<T> items,
Action<IEnumerable<T>> operation,
int batchSize)
{
List<T> batch = new List<T>();
foreach (T item in items)
{
batch.Add(item);
if (batch.Count == batchSize)
{
operation(batch);
batch.Clear();
}
}
// Process last batch
if (batch.Any())
{
operation(batch);
}
}
In a streaming context, where the enumerator might get blocked in the middle of the batch, simply because the value is not yet produced (yield) it is useful to have a timeout method so that the last batch is produced after a given time. I used this for example for tailing a cursor in MongoDB. It's a little bit complicated, because the enumeration has to be done in another thread.
public static IEnumerable<List<T>> TimedBatch<T>(this IEnumerable<T> collection, double timeoutMilliseconds, long maxItems)
{
object _lock = new object();
List<T> batch = new List<T>();
AutoResetEvent yieldEventTriggered = new AutoResetEvent(false);
AutoResetEvent yieldEventFinished = new AutoResetEvent(false);
bool yieldEventTriggering = false;
var task = Task.Run(delegate
{
foreach (T item in collection)
{
lock (_lock)
{
batch.Add(item);
if (batch.Count == maxItems)
{
yieldEventTriggering = true;
yieldEventTriggered.Set();
}
}
if (yieldEventTriggering)
{
yieldEventFinished.WaitOne(); //wait for the yield to finish, and batch to be cleaned
yieldEventTriggering = false;
}
}
});
while (!task.IsCompleted)
{
//Wait for the event to be triggered, or the timeout to finish
yieldEventTriggered.WaitOne(TimeSpan.FromMilliseconds(timeoutMilliseconds));
lock (_lock)
{
if (batch.Count > 0) //yield return only if the batch accumulated something
{
yield return batch;
batch.Clear();
yieldEventFinished.Set();
}
}
}
task.Wait();
}