C + + 11中的线程池

相关问题 :

关于 C + + 11:

关于 Boost:


我如何让一个 一堆线把任务发给,而不是创建和删除他们一遍又一遍?这意味着持久线程不需要连接就可以重新同步。


我的代码是这样的:

namespace {
std::vector<std::thread> workers;


int total = 4;
int arr[4] = {0};


void each_thread_does(int i) {
arr[i] += 2;
}
}


int main(int argc, char *argv[]) {
for (int i = 0; i < 8; ++i) { // for 8 iterations,
for (int j = 0; j < 4; ++j) {
workers.push_back(std::thread(each_thread_does, j));
}
for (std::thread &t: workers) {
if (t.joinable()) {
t.join();
}
}
arr[4] = std::min_element(arr, arr+4);
}
return 0;
}

与在每次迭代中创建和连接线程不同,我更愿意在每次迭代中将任务发送给工作线程,并且只创建一次。

297815 次浏览

线程池意味着所有的线程都在运行,所有的时间,换句话说,线程函数永远不会返回。为了给线程一些有意义的事情去做,您必须设计一个线程间通信的系统,这既是为了告诉线程有事情要做,也是为了通信实际的工作数据。

通常这将涉及某种并发数据结构,并且每个线程可能会睡眠在某种条件变量上,当有工作要做时会通知这些条件变量。在接收到通知后,一个或多个线程被唤醒,从并发数据结构中恢复任务,处理它,并以类似的方式存储结果。

然后线程会继续检查是否有更多的工作要做,如果没有回到睡眠状态。

结果是你必须自己设计所有这些,因为没有一个普遍适用的“工作”的自然概念。这是一个相当大的工作量,有一些微妙的问题,你必须得到正确的。(如果你喜欢一个在后台为你进行线程管理的系统,你可以在 Go 中进行编程。)

线程池的核心是一组全部绑定到作为事件循环的函数的线程。这些线程将无休止地等待任务被执行,或者它们自己的终止。

线程池作业是提供一个提交作业的接口,定义(或许还可以修改)运行这些作业的策略(调度规则、线程实例化、池的大小) ,并监视线程和相关资源的状态。

因此,对于一个多功能池来说,首先必须定义一个任务是什么,它是如何启动、中断的,结果是什么(请参阅这个问题的承诺和未来的概念) ,线程将要响应什么类型的事件,它们将如何处理这些事件,这些事件应该如何与任务处理的事件区分开来。如您所见,这可能会变得非常复杂,并且随着解决方案变得越来越复杂,会对线程的工作方式施加限制。

当前用于处理事件的工具相当简单(*) : 互斥锁、条件变量等原语,以及在此基础上的一些抽象(锁、障碍)。但是在某些情况下,这些抽象可能不适合(参见相关的 有个问题) ,必须恢复使用原语。

其它问题也必须得到解决:

  • 信号
  • 输入/输出
  • 硬件(处理器亲和性,异构设置)

这些东西在你的环境下会怎么样?

对于一个类似问题的回答 指向一个用于升级和 stl 的现有实现。

我为另一个问题提供了线程池的 非常粗糙的执行,它没有解决上面列出的许多问题。你可能需要在此基础上进一步发展。您可能还想看看其他语言中的现有框架,以便找到灵感。


(*)我不认为这是个问题,恰恰相反。我认为这正是 C + + 从 C 继承而来的精神。

编辑: 现在需要 C + + 17和概念(截至9/12/16,只有 g + + 6.0 + 就足够了)

由于这个原因,模板演绎要准确得多,所以值得花时间去获得一个更新的编译器。我还没有找到一个需要显式模板参数的函数。

它现在还接受任何适当的可调用对象(仍然是静态类型安全的! ! !)。

它现在还包含一个可选的绿色线程优先级线程池,使用相同的 API。但是,这个类只是 POSIX。它使用 ucontext_t API 进行用户空间任务切换。


我为此创建了一个简单的库。下面给出一个用法示例。(我之所以回答这个问题,是因为这是我在决定有必要自己写之前发现的东西之一。)

bool is_prime(int n){
// Determine if n is prime.
}


int main(){
thread_pool pool(8); // 8 threads


list<future<bool>> results;
for(int n = 2;n < 10000;n++){
// Submit a job to the pool.
results.emplace_back(pool.async(is_prime, n));
}


int n = 2;
for(auto i = results.begin();i != results.end();i++, n++){
// i is an iterator pointing to a future representing the result of is_prime(n)
cout << n << " ";
bool prime = i->get(); // Wait for the task is_prime(n) to finish and get the result.
if(prime)
cout << "is prime";
else
cout << "is not prime";
cout << endl;
}
}

您可以传递 async任何具有任何(或 void)返回值和任何(或无)参数的函数,它将返回相应的 std::future。为了得到结果(或者只是等待任务完成) ,在将来调用 get()

这是 Github: https://github.com/Tyler-Hardin/thread_pool

可以使用 C + + 线程池库 https://github.com/vit-vit/ctpl

然后,您编写的代码可以替换为以下代码

#include <ctpl.h>  // or <ctpl_stl.h> if ou do not have Boost library


int main (int argc, char *argv[]) {
ctpl::thread_pool p(2 /* two threads in the pool */);
int arr[4] = {0};
std::vector<std::future<void>> results(4);
for (int i = 0; i < 8; ++i) { // for 8 iterations,
for (int j = 0; j < 4; ++j) {
results[j] = p.push([&arr, j](int){ arr[j] +=2; });
}
for (int j = 0; j < 4; ++j) {
results[j].get();
}
arr[4] = std::min_element(arr, arr + 4);
}
}

您将获得所需数量的线程,并且不会在迭代中一遍又一遍地创建和删除它们。

这是从 我的回答改编到另一个非常类似的职位。

让我们构建一个 ThreadPool类:

class ThreadPool {
public:
void Start();
void QueueJob(const std::function<void()>& job);
void Stop();
void busy();


private:
void ThreadLoop();


bool should_terminate = false;           // Tells threads to stop looking for jobs
std::mutex queue_mutex;                  // Prevents data races to the job queue
std::condition_variable mutex_condition; // Allows threads to wait on new jobs or termination
std::vector<std::thread> threads;
std::queue<std::function<void()>> jobs;
};
  1. ThreadPool::Start

对于有效的线程池实现,一旦根据 num_threads创建了线程,最好不要这样做 创造新的或破坏旧的(通过加入)。将会有一个性能惩罚,甚至可能使你的 应用程序比串行版慢。因此,我们保留了一个可以在任何时候使用的线程池(如果它们 并不是已经在运行一个作业)。

每个线程都应该运行自己的无限循环,不断地等待新任务的获取和运行。

void ThreadPool::Start() {
const uint32_t num_threads = std::thread::hardware_concurrency(); // Max # of threads the system supports
threads.resize(num_threads);
for (uint32_t i = 0; i < num_threads; i++) {
threads.at(i) = std::thread(ThreadLoop);
}
}
  1. ThreadPool::ThreadLoop

无限循环函数。这是一个等待任务队列打开的 while (true)循环。

void ThreadPool::ThreadLoop() {
while (true) {
std::function<void()> job;
{
std::unique_lock<std::mutex> lock(queue_mutex);
mutex_condition.wait(lock, [this] {
return !jobs.empty() || should_terminate;
});
if (should_terminate) {
return;
}
job = jobs.front();
jobs.pop();
}
job();
}
}
  1. ThreadPool::QueueJob

向池中添加一个新作业; 使用锁以避免数据竞争。

void ThreadPool::QueueJob(const std::function<void()>& job) {
{
std::unique_lock<std::mutex> lock(queue_mutex);
jobs.push(job);
}
mutex_condition.notify_one();
}

使用方法:

thread_pool->QueueJob([] { /* ... */ });
  1. ThreadPool::busy
void ThreadPool::busy() {
bool poolbusy;
{
std::unique_lock<std::mutex> lock(queue_mutex);
poolbusy = jobs.empty();
}
return poolbusy;
}

忙()函数可以在 while 循环中使用,这样主线程可以等待线程池完成所有任务,然后再调用线程池析构函数。

  1. ThreadPool::Stop

停止游泳。

void ThreadPool::Stop() {
{
std::unique_lock<std::mutex> lock(queue_mutex);
should_terminate = true;
}
mutex_condition.notify_all();
for (std::thread& active_thread : threads) {
active_thread.join();
}
threads.clear();
}

一旦你集成了这些元素,你就有了自己的动态线程池 工作。

如果有一些语法错误,我道歉,我输入了这个代码,我有一个糟糕的记忆。对不起,我不能提供 你完整的线程池代码; 这将违反我的工作完整性。

备注:

  • 使用匿名代码块是为了当它们退出时,在它们内部创建 std::unique_lock变量 超出范围,解锁互斥锁。
  • ThreadPool::Stop不会终止任何当前正在运行的作业,它只是等待它们通过 active_thread.join()完成。

这样的东西可能会有帮助(从一个工作的应用程序)。

#include <memory>
#include <boost/asio.hpp>
#include <boost/thread.hpp>


struct thread_pool {
typedef std::unique_ptr<boost::asio::io_service::work> asio_worker;


thread_pool(int threads) :service(), service_worker(new asio_worker::element_type(service)) {
for (int i = 0; i < threads; ++i) {
auto worker = [this] { return service.run(); };
grp.add_thread(new boost::thread(worker));
}
}


template<class F>
void enqueue(F f) {
service.post(f);
}


~thread_pool() {
service_worker.reset();
grp.join_all();
service.stop();
}


private:
boost::asio::io_service service;
asio_worker service_worker;
boost::thread_group grp;
};

你可以这样使用它:

thread_pool pool(2);


pool.enqueue([] {
std::cout << "Hello from Task 1\n";
});


pool.enqueue([] {
std::cout << "Hello from Task 2\n";
});

请记住,重新发明 有效率异步排队机制并非易事。

Io _ service 是一个非常有效的实现,或者实际上是一个特定于平台的包装器集合(例如,它在 Windows 上包装 I/O 完成端口)。

Follwoing [PhD EcE](https://stackoverflow.com/users/3818417/phd-ece) suggestion, I implemented the thread pool:

Function _ pool. h

#pragma once
#include <queue>
#include <functional>
#include <mutex>
#include <condition_variable>
#include <atomic>
#include <cassert>


class Function_pool
{


private:
std::queue<std::function<void()>> m_function_queue;
std::mutex m_lock;
std::condition_variable m_data_condition;
std::atomic<bool> m_accept_functions;


public:


Function_pool();
~Function_pool();
void push(std::function<void()> func);
void done();
void infinite_loop_func();
};

Function _ pool. cpp 函数

#include "function_pool.h"


Function_pool::Function_pool() : m_function_queue(), m_lock(), m_data_condition(), m_accept_functions(true)
{
}


Function_pool::~Function_pool()
{
}


void Function_pool::push(std::function<void()> func)
{
std::unique_lock<std::mutex> lock(m_lock);
m_function_queue.push(func);
// when we send the notification immediately, the consumer will try to get the lock , so unlock asap
lock.unlock();
m_data_condition.notify_one();
}


void Function_pool::done()
{
std::unique_lock<std::mutex> lock(m_lock);
m_accept_functions = false;
lock.unlock();
// when we send the notification immediately, the consumer will try to get the lock , so unlock asap
m_data_condition.notify_all();
//notify all waiting threads.
}


void Function_pool::infinite_loop_func()
{
std::function<void()> func;
while (true)
{
{
std::unique_lock<std::mutex> lock(m_lock);
m_data_condition.wait(lock, [this]() {return !m_function_queue.empty() || !m_accept_functions; });
if (!m_accept_functions && m_function_queue.empty())
{
//lock will be release automatically.
//finish the thread loop and let it join in the main thread.
return;
}
func = m_function_queue.front();
m_function_queue.pop();
//release the lock
}
func();
}
}

Main.cpp

#include "function_pool.h"
#include <string>
#include <iostream>
#include <mutex>
#include <functional>
#include <thread>
#include <vector>


Function_pool func_pool;


class quit_worker_exception : public std::exception {};


void example_function()
{
std::cout << "bla" << std::endl;
}


int main()
{
std::cout << "stating operation" << std::endl;
int num_threads = std::thread::hardware_concurrency();
std::cout << "number of threads = " << num_threads << std::endl;
std::vector<std::thread> thread_pool;
for (int i = 0; i < num_threads; i++)
{
thread_pool.push_back(std::thread(&Function_pool::infinite_loop_func, &func_pool));
}


//here we should send our functions
for (int i = 0; i < 50; i++)
{
func_pool.push(example_function);
}
func_pool.done();
for (unsigned int i = 0; i < thread_pool.size(); i++)
{
thread_pool.at(i).join();
}
}

你可以使用来自升级库的 Thread _ pool:

void my_task(){...}


int main(){
int threadNumbers = thread::hardware_concurrency();
boost::asio::thread_pool pool(threadNumbers);


// Submit a function to the pool.
boost::asio::post(pool, my_task);


// Submit a lambda object to the pool.
boost::asio::post(pool, []() {
...
});
}

你也可以从开源社区使用 线程池:

void first_task() {...}
void second_task() {...}


int main(){
int threadNumbers = thread::hardware_concurrency();
pool tp(threadNumbers);


// Add some tasks to the pool.
tp.schedule(&first_task);
tp.schedule(&second_task);
}

看起来线程池是一个非常流行的问题/练习: -)

我最近用现代 C + + 编写了一个; 它属于我,在这里可以公开使用—— https://github.com/yurir-dev/threadpool

它支持模板化的返回值、核心固定和某些任务的排序。 所有实现在两个.h 文件中。

所以,最初的问题是这样的:

#include "tp/threadpool.h"


int arr[5] = { 0 };


concurency::threadPool<void> tp;
tp.start(std::thread::hardware_concurrency());


std::vector<std::future<void>> futures;
for (int i = 0; i < 8; ++i) { // for 8 iterations,
for (int j = 0; j < 4; ++j) {
futures.push_back(tp.push([&arr, j]() {
arr[j] += 2;
}));
}
}


// wait until all pushed tasks are finished.
for (auto& f : futures)
f.get();
// or just tp.end(); // will kill all the threads


arr[4] = *std::min_element(arr, arr + 4);

我发现,如果线程池终止并在任务队列中留下一些任务,那么挂起的任务的 future. get ()调用挂起在调用方。如何在线程池中仅使用包装器 std: : 函数设置将来的异常?

template <class F, class... Args>
std::future<std::result_of_t<F(Args...)>> enqueue(F &&f, Args &&...args) {
auto task = std::make_shared<std::packaged_task<std::result_of_t<F(Args...)>()>>(
std::bind(std::forward<F>(f), std::forward<Args>(args)...));
std::future<return_type> res = task->get_future();
{
std::unique_lock<std::mutex> lock(_mutex);
_tasks.push([task]() -> void { (*task)(); });
}
return res;
}


class StdThreadPool {
std::vector<std::thread> _workers;
std::priority_queue<TASK> _tasks;
...
}


struct TASK {
//int _func_return_value;
std::function<void()> _func;
int priority;
...
}

Stroika 图书馆有一个线程池实现。

Stroika ThreadPool.h

ThreadPool p;
p.AddTask ([] () {doIt ();});

Stroika 的线程库也支持取消(协作)——这样当上面的 ThreadPool 超出范围时,它就会取消任何正在运行的任务(类似于 c + + 20的 jthread)。