是否限制了活动线程的数量?

根据这个代码:

var arrayStrings = new string[1000];
Parallel.ForEach<string>(arrayStrings, someString =>
{
DoSomething(someString);
});

所有1000个线程会几乎同时产生吗?

61578 次浏览

It works out an optimal number of threads based on the number of processors/cores. They will not all spawn at once.

不,它不会启动1000个线程-是的,它会限制使用多少个线程。并行扩展使用适当数量的内核,这取决于实际有多少个 还有,有多少个已经忙碌。它为每个核心分配工作,然后使用一种称为 工作偷窃的技术,让每个线程有效地处理自己的队列,只需要在真正需要时进行任何代价高昂的跨线程访问。

看一下 PFX 团队博客 for 很多关于如何分配工作和其他各种主题的信息。

请注意,在某些情况下,您也可以指定所需的并行度。

请参阅 对于每个迭代使用一个任务,是否执行并行操作?了解可以使用的“心智模型”的概念。不过,作者确实指出: “在一天结束时,重要的是要记住,实现细节可能随时发生变化。”

在一台单核心机器上... 平行的。对于它在多个线程之间工作的集合的每个分区(块) ,但是这个数字是基于一个算法计算出来的,该算法考虑到并且似乎持续监视它分配给 ForEach 的线程所完成的工作。所以 如果 ForEach 的主体部分调用长时间运行的 IO 绑定/阻塞函数,这将使线程等待,算法将产生更多的线程,并在它们之间重新分区集合。如果线程完成得很快,并且不阻塞 IO 线程,例如简单地计算一些数字,算法将逐渐增加(或确实减少)线程的数量,直到算法认为吞吐量最优(每次迭代的平均完成时间)

基本上,所有不同的并行库函数背后的线程池,将计算出可使用的最佳线程数量。物理处理器核心的数量只是等式的一部分。内核数量和产生的线程数量之间没有简单的一对一关系。

关于同步线程的取消和处理的文档没有什么帮助。希望 MS 可以在 MSDN 中提供更好的示例。

Don't forget, the body code must be written to run on multiple threads, along with all the usual thread safety considerations, the framework does not abstract that factor... yet.

问得好。在您的示例中,即使在四核处理器上,并行级别也相当低,但是如果有一些处理器等待,并行级别可能会相当高。

// Max concurrency: 5
[Test]
public void Memory_Operations()
{
ConcurrentBag<int> monitor = new ConcurrentBag<int>();
ConcurrentBag<int> monitorOut = new ConcurrentBag<int>();
var arrayStrings = new string[1000];
Parallel.ForEach<string>(arrayStrings, someString =>
{
monitor.Add(monitor.Count);
monitor.TryTake(out int result);
monitorOut.Add(result);
});


Console.WriteLine("Max concurrency: " + monitorOut.OrderByDescending(x => x).First());
}

现在看看添加等待操作来模拟 HTTP 请求时会发生什么。

// Max concurrency: 34
[Test]
public void Waiting_Operations()
{
ConcurrentBag<int> monitor = new ConcurrentBag<int>();
ConcurrentBag<int> monitorOut = new ConcurrentBag<int>();
var arrayStrings = new string[1000];
Parallel.ForEach<string>(arrayStrings, someString =>
{
monitor.Add(monitor.Count);


System.Threading.Thread.Sleep(1000);


monitor.TryTake(out int result);
monitorOut.Add(result);
});


Console.WriteLine("Max concurrency: " + monitorOut.OrderByDescending(x => x).First());
}

我还没有做任何更改,并发/并行化的级别已经大大提高了。ParallelOptions.MaxDegreeOfParallelism可以增加并发的限制。

// Max concurrency: 43
[Test]
public void Test()
{
ConcurrentBag<int> monitor = new ConcurrentBag<int>();
ConcurrentBag<int> monitorOut = new ConcurrentBag<int>();
var arrayStrings = new string[1000];
var options = new ParallelOptions {MaxDegreeOfParallelism = int.MaxValue};
Parallel.ForEach<string>(arrayStrings, options, someString =>
{
monitor.Add(monitor.Count);


System.Threading.Thread.Sleep(1000);


monitor.TryTake(out int result);
monitorOut.Add(result);
});


Console.WriteLine("Max concurrency: " + monitorOut.OrderByDescending(x => x).First());
}


// Max concurrency: 391
[Test]
public void Test()
{
ConcurrentBag<int> monitor = new ConcurrentBag<int>();
ConcurrentBag<int> monitorOut = new ConcurrentBag<int>();
var arrayStrings = new string[1000];
var options = new ParallelOptions {MaxDegreeOfParallelism = int.MaxValue};
Parallel.ForEach<string>(arrayStrings, options, someString =>
{
monitor.Add(monitor.Count);


System.Threading.Thread.Sleep(100000);


monitor.TryTake(out int result);
monitorOut.Add(result);
});


Console.WriteLine("Max concurrency: " + monitorOut.OrderByDescending(x => x).First());
}

I reccommend setting ParallelOptions.MaxDegreeOfParallelism. It will not necessarily increase the number of threads in use, but it will ensure you only start a sane number of threads, which seems to be your concern.

最后回答你的问题,不,你不会得到所有线程一次启动。使用并行。如果您希望完美地并行调用,则调用,例如测试竞态条件。

// 636462943623363344
// 636462943623363344
// 636462943623363344
// 636462943623363344
// 636462943623363344
// 636462943623368346
// 636462943623368346
// 636462943623373351
// 636462943623393364
// 636462943623393364
[Test]
public void Test()
{
ConcurrentBag<string> monitor = new ConcurrentBag<string>();
ConcurrentBag<string> monitorOut = new ConcurrentBag<string>();
var arrayStrings = new string[1000];
var options = new ParallelOptions {MaxDegreeOfParallelism = int.MaxValue};
Parallel.ForEach<string>(arrayStrings, options, someString =>
{
monitor.Add(DateTime.UtcNow.Ticks.ToString());
monitor.TryTake(out string result);
monitorOut.Add(result);
});


var startTimes = monitorOut.OrderBy(x => x.ToString()).ToList();
Console.WriteLine(string.Join(Environment.NewLine, startTimes.Take(10)));
}