C + + 0x 没有信号量? 如何同步线程?

C + + 0x 真的没有信号量吗?关于 Stack Overflow,已经有一些关于信号量使用的问题。我一直使用它们(posx 信号量)让一个线程等待另一个线程中的某个事件:

void thread0(...)
{
doSomething0();


event1.wait();


...
}


void thread1(...)
{
doSomething1();


event1.post();


...
}

如果我使用互斥对象:

void thread0(...)
{
doSomething0();


event1.lock(); event1.unlock();


...
}


void thread1(...)
{
event1.lock();


doSomethingth1();


event1.unlock();


...
}

问题: 它很丑陋,而且不能保证 thread1会首先锁定互斥锁(假设同一个线程应该锁定和解锁互斥锁,那么在 thread0和 thread1启动之前也不能锁定 event1)。

因此,既然 Boost 也没有信号量,那么实现上述目标的最简单方法是什么呢?

100937 次浏览

您可以使用互斥量和条件变量。您可以通过互斥对象获得独占访问权限,检查是否要继续或需要等待另一端。如果你需要等待,你等待的条件。当另一个线程确定您可以继续时,它会发出条件信号。

在升级: : 线程库中有一个简短的 例子,您很可能只需要复制它(C + + 0x 和升级线程库非常相似)。

您可以很容易地从一个互斥量和一个条件变量构建一个:

#include <mutex>
#include <condition_variable>


class semaphore {
std::mutex mutex_;
std::condition_variable condition_;
unsigned long count_ = 0; // Initialized as locked.


public:
void release() {
std::lock_guard<decltype(mutex_)> lock(mutex_);
++count_;
condition_.notify_one();
}


void acquire() {
std::unique_lock<decltype(mutex_)> lock(mutex_);
while(!count_) // Handle spurious wake-ups.
condition_.wait(lock);
--count_;
}


bool try_acquire() {
std::lock_guard<decltype(mutex_)> lock(mutex_);
if(count_) {
--count_;
return true;
}
return false;
}
};

根据 posx 信号量,我要补充

class semaphore
{
...
bool trywait()
{
boost::mutex::scoped_lock lock(mutex_);
if(count_)
{
--count_;
return true;
}
else
{
return false;
}
}
};

而且我更喜欢在一个方便的抽象级别上使用同步机制,而不是总是使用更基本的操作符复制粘贴拼接在一起的版本。

基于 Maxim Yegorushkin 的回答,我尝试用 C + + 11风格编写这个示例。

#include <mutex>
#include <condition_variable>


class Semaphore {
public:
Semaphore (int count_ = 0)
: count(count_) {}


inline void notify()
{
std::unique_lock<std::mutex> lock(mtx);
count++;
cv.notify_one();
}


inline void wait()
{
std::unique_lock<std::mutex> lock(mtx);


while(count == 0){
cv.wait(lock);
}
count--;
}


private:
std::mutex mtx;
std::condition_variable cv;
int count;
};

也可以是线程中有用的 RAII 信号量包装器:

class ScopedSemaphore
{
public:
explicit ScopedSemaphore(Semaphore& sem) : m_Semaphore(sem) { m_Semaphore.Wait(); }
ScopedSemaphore(const ScopedSemaphore&) = delete;
~ScopedSemaphore() { m_Semaphore.Notify(); }


ScopedSemaphore& operator=(const ScopedSemaphore&) = delete;


private:
Semaphore& m_Semaphore;
};

多线程应用程序中的使用示例:

boost::ptr_vector<std::thread> threads;
Semaphore semaphore;


for (...)
{
...
auto t = new std::thread([..., &semaphore]
{
ScopedSemaphore scopedSemaphore(semaphore);
...
}
);
threads.push_back(t);
}


for (auto& t : threads)
t.join();

我决定尽我所能以标准的风格写出最健壮/通用的 C + + 11信号量(注意,通常你只会使用名字 semaphore,就像通常使用 string而不是 basic_string一样) :

template <typename Mutex, typename CondVar>
class basic_semaphore {
public:
using native_handle_type = typename CondVar::native_handle_type;


explicit basic_semaphore(size_t count = 0);
basic_semaphore(const basic_semaphore&) = delete;
basic_semaphore(basic_semaphore&&) = delete;
basic_semaphore& operator=(const basic_semaphore&) = delete;
basic_semaphore& operator=(basic_semaphore&&) = delete;


void notify();
void wait();
bool try_wait();
template<class Rep, class Period>
bool wait_for(const std::chrono::duration<Rep, Period>& d);
template<class Clock, class Duration>
bool wait_until(const std::chrono::time_point<Clock, Duration>& t);


native_handle_type native_handle();


private:
Mutex   mMutex;
CondVar mCv;
size_t  mCount;
};


using semaphore = basic_semaphore<std::mutex, std::condition_variable>;


template <typename Mutex, typename CondVar>
basic_semaphore<Mutex, CondVar>::basic_semaphore(size_t count)
: mCount{count}
{}


template <typename Mutex, typename CondVar>
void basic_semaphore<Mutex, CondVar>::notify() {
std::lock_guard<Mutex> lock{mMutex};
++mCount;
mCv.notify_one();
}


template <typename Mutex, typename CondVar>
void basic_semaphore<Mutex, CondVar>::wait() {
std::unique_lock<Mutex> lock{mMutex};
mCv.wait(lock, [&]{ return mCount > 0; });
--mCount;
}


template <typename Mutex, typename CondVar>
bool basic_semaphore<Mutex, CondVar>::try_wait() {
std::lock_guard<Mutex> lock{mMutex};


if (mCount > 0) {
--mCount;
return true;
}


return false;
}


template <typename Mutex, typename CondVar>
template<class Rep, class Period>
bool basic_semaphore<Mutex, CondVar>::wait_for(const std::chrono::duration<Rep, Period>& d) {
std::unique_lock<Mutex> lock{mMutex};
auto finished = mCv.wait_for(lock, d, [&]{ return mCount > 0; });


if (finished)
--mCount;


return finished;
}


template <typename Mutex, typename CondVar>
template<class Clock, class Duration>
bool basic_semaphore<Mutex, CondVar>::wait_until(const std::chrono::time_point<Clock, Duration>& t) {
std::unique_lock<Mutex> lock{mMutex};
auto finished = mCv.wait_until(lock, t, [&]{ return mCount > 0; });


if (finished)
--mCount;


return finished;
}


template <typename Mutex, typename CondVar>
typename basic_semaphore<Mutex, CondVar>::native_handle_type basic_semaphore<Mutex, CondVar>::native_handle() {
return mCv.native_handle();
}

你也可以查看 Cpp11-on-multicore-它有一个可移植的和最佳的信号量实现。

该存储库还包含补充 c + + 11线程的其他线程优点。

如果有人对原子版本感兴趣,下面是实现。预计性能比互斥和条件变量版本要好。

class semaphore_atomic
{
public:
void notify() {
count_.fetch_add(1, std::memory_order_release);
}


void wait() {
while (true) {
int count = count_.load(std::memory_order_relaxed);
if (count > 0) {
if (count_.compare_exchange_weak(count, count-1, std::memory_order_acq_rel, std::memory_order_relaxed)) {
break;
}
}
}
}


bool try_wait() {
int count = count_.load(std::memory_order_relaxed);
if (count > 0) {
if (count_.compare_exchange_strong(count, count-1, std::memory_order_acq_rel, std::memory_order_relaxed)) {
return true;
}
}
return false;
}
private:
std::atomic_int count_{0};
};

我发现 share _ ptr 和 soft _ ptr 完成了我需要的工作,它们有一个很长的列表。我的问题是,我有几个客户希望与主机的内部数据交互。通常,主机自己更新数据,但是,如果客户机请求更新,主机需要停止更新,直到没有客户机访问主机数据为止。同时,客户端可以请求独占访问,这样其他客户端和主机都不能修改该主机数据。

我是这样做的,我创建了一个 struct:

struct UpdateLock
{
typedef std::shared_ptr< UpdateLock > ptr;
};

每个客户都会有一个这样的成员:

UpdateLock::ptr m_myLock;

然后,主机将拥有一个弱 _ ptr 成员用于排他性,以及一个弱 _ ptrs 列表用于非排他性锁:

std::weak_ptr< UpdateLock > m_exclusiveLock;
std::list< std::weak_ptr< UpdateLock > > m_locks;

有一个函数用于启用锁定,另一个函数用于检查主机是否被锁定:

UpdateLock::ptr LockUpdate( bool exclusive );
bool IsUpdateLocked( bool exclusive ) const;

我在 LockUpdate、 IsUpdateLocked 中测试锁,并定期在主机的 Update 例程中测试锁。对锁进行测试很简单,只需检查孱弱 _ ptr 是否过期,然后从 m _ lock 列表中删除任何过期的内容(我只在主机更新期间进行此操作) ,就可以检查列表是否为空; 同时,当客户端重置挂起的 share _ ptr 时,我会自动解锁,当客户端被自动销毁时也会出现这种情况。

总体效果是,由于客户机很少需要独占性(通常只为添加和删除保留) ,大多数情况下,对 LockUpdate 的请求(false) ,也就是说非独占性,只要(!锁)。一个 LockUpdate (true) ,一个独占请求,只有当两个(!和(m _ lock)。空()。

可以添加一个队列来减轻排他锁和非排他锁之间的冲突,但是,到目前为止我还没有遇到冲突,所以我打算等到那个时候再添加解决方案(主要是因为我有一个实际的测试条件)。

到目前为止,这个工作很好地满足了我的需要; 我可以想象扩展它的需要,以及扩展使用可能出现的一些问题,但是,这个工作很快就实现了,并且只需要很少的自定义代码。

C + + 20终于有了信号量 -std::counting_semaphore<max_count>

这些方法(至少)有以下几种:

  • acquire()(阻塞)
  • try_acquire()(非阻塞,立即返回)
  • try_acquire_for()(非阻塞,持续时间)
  • try_acquire_until()(非阻塞,需要一段时间来停止尝试)
  • release()

你可以阅读 这些 CppCon 2019演示幻灯片,或者观看 视频。还有官方的提案 P0514R4,但它可能不是实际的 C + + 20的最新版本。

与其他答案不同的是,我提出了一个新的版本:

  1. 在删除之前解除所有等待线程的块。在这种情况下,删除信号量将唤醒所有等待的线程,只有在所有人都醒来之后,信号量析构函数才会退出。
  2. wait()调用有一个参数,用于在超时(以毫秒为单位)过后自动解锁调用线程。
  3. 在构造函数上有一个选项,可以将可用资源计数限制到信号量初始化时的计数。这样,多次调用 notify()将不会增加信号量所拥有的资源数量。
#include <stdio.h>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <iostream>


std::recursive_mutex g_sync_mutex;
#define sync(x) do { \
std::unique_lock<std::recursive_mutex> lock(g_sync_mutex); \
x; \
} while (false);


class Semaphore {
int _count;
bool _limit;
int _all_resources;
int _wakedup;
std::mutex _mutex;
std::condition_variable_any _condition_variable;


public:
/**
* count - how many resources this semaphore holds
* limit - limit notify() calls only up to the count value (available resources)
*/
Semaphore (int count, bool limit)
: _count(count),
_limit(limit),
_all_resources(count),
_wakedup(count)
{
}


/**
* Unlock all waiting threads before destructing the semaphore (to avoid their segfalt later)
*/
virtual ~Semaphore () {
std::unique_lock<std::mutex> lock(_mutex);
_wakeup(lock);
}


void _wakeup(std::unique_lock<std::mutex>& lock) {
int lastwakeup = 0;


while( _wakedup < _all_resources ) {
lock.unlock();
notify();
lock.lock();
// avoids 100% CPU usage if someone is not waking up properly
if (lastwakeup == _wakedup) {
std::this_thread::sleep_for( std::chrono::milliseconds(10) );
}
lastwakeup = _wakedup;
}
}


// Mutex and condition variables are not movable and there is no need for smart pointers yet
Semaphore(const Semaphore&) = delete;
Semaphore& operator =(const Semaphore&) = delete;
Semaphore(const Semaphore&&) = delete;
Semaphore& operator =(const Semaphore&&) = delete;


/**
* Release one acquired resource.
*/
void notify()
{
std::unique_lock<std::mutex> lock(_mutex);
// sync(std::cerr << getTime() << "Calling notify(" << _count << ", " << _limit << ", " << _all_resources << ")" << std::endl);
_count++;
if (_limit && _count > _all_resources) {
_count = _all_resources;
}
_condition_variable.notify_one();
}


/**
* This function never blocks!
* Return false if it would block when acquiring the lock. Otherwise acquires the lock and return true.
*/
bool try_acquire() {
std::unique_lock<std::mutex> lock(_mutex);
// sync(std::cerr << getTime() << "Calling try_acquire(" << _count << ", " << _limit << ", " << _all_resources << ")" << std::endl);
if(_count <= 0) {
return false;
}
_count--;
return true;
}


/**
* Return true if the timeout expired, otherwise return false.
* timeout - how many milliseconds to wait before automatically unlocking the wait() call.
*/
bool wait(int timeout = 0) {
std::unique_lock<std::mutex> lock(_mutex);
// sync(std::cerr << getTime() << "Calling wait(" << _count << ", " << _limit << ", " << _all_resources << ")" << std::endl);
_count--;
_wakedup--;
try {
std::chrono::time_point<std::chrono::system_clock> timenow = std::chrono::system_clock::now();


while(_count < 0) {
if (timeout < 1) {
_condition_variable.wait(lock);
}
else {
std::cv_status status = _condition_variable.wait_until(lock, timenow + std::chrono::milliseconds(timeout));


if ( std::cv_status::timeout == status) {
_count++;
_wakedup++;
return true;
}
}
}
}
catch (...) {
_count++;
_wakedup++;
throw;
}
_wakedup++;
return false;
}


/**
* Return true if calling wait() will block the calling thread
*/
bool locked() {
std::unique_lock<std::mutex> lock(_mutex);
return _count <= 0;
}


/**
* Return true the semaphore has at least all resources available (since when it was created)
*/
bool freed() {
std::unique_lock<std::mutex> lock(_mutex);
return _count >= _all_resources;
}


/**
* Return how many resources are available:
* - 0 means not free resources and calling wait() will block te calling thread
* - a negative value means there are several threads being blocked
* - a positive value means there are no threads waiting
*/
int count() {
std::unique_lock<std::mutex> lock(_mutex);
return _count;
}


/**
* Wake everybody who is waiting and reset the semaphore to its initial value.
*/
void reset() {
std::unique_lock<std::mutex> lock(_mutex);
if(_count < 0) {
_wakeup(lock);
}
_count = _all_resources;
}
};

打印当前时间戳的实用工具:

std::string getTime() {
char buffer[20];
#if defined( WIN32 )
SYSTEMTIME wlocaltime;
GetLocalTime(&wlocaltime);
::snprintf(buffer, sizeof buffer, "%02d:%02d:%02d.%03d ", wlocaltime.wHour, wlocaltime.wMinute, wlocaltime.wSecond, wlocaltime.wMilliseconds);
#else
std::chrono::time_point< std::chrono::system_clock > now = std::chrono::system_clock::now();
auto duration = now.time_since_epoch();
auto hours = std::chrono::duration_cast< std::chrono::hours >( duration );
duration -= hours;
auto minutes = std::chrono::duration_cast< std::chrono::minutes >( duration );
duration -= minutes;
auto seconds = std::chrono::duration_cast< std::chrono::seconds >( duration );
duration -= seconds;
auto milliseconds = std::chrono::duration_cast< std::chrono::milliseconds >( duration );
duration -= milliseconds;
time_t theTime = time( NULL );
struct tm* aTime = localtime( &theTime );
::snprintf(buffer, sizeof buffer, "%02d:%02d:%02d.%03ld ", aTime->tm_hour, aTime->tm_min, aTime->tm_sec, milliseconds.count());
#endif
return buffer;
}

使用此信号量的示例程序:

// g++ -o test -Wall -Wextra -ggdb -g3 -pthread test.cpp && gdb --args ./test
// valgrind --leak-check=full --show-leak-kinds=all --track-origins=yes --verbose ./test
// procdump -accepteula -ma -e -f "" -x c:\ myexe.exe
int main(int argc, char* argv[]) {
std::cerr << getTime() << "Creating Semaphore" << std::endl;
Semaphore* semaphore = new Semaphore(1, false);
semaphore->wait(1000);
semaphore->wait(1000);
std::cerr << getTime() << "Auto Unlocking Semaphore wait" << std::endl;


std::this_thread::sleep_for( std::chrono::milliseconds(5000) );
delete semaphore;


std::cerr << getTime() << "Exiting after 10 seconds..." << std::endl;
return 0;
}

输出示例:

11:03:01.012 Creating Semaphore
11:03:02.012 Auto Unlocking Semaphore wait
11:03:07.012 Exiting after 10 seconds...

使用 EventLoop在一段时间后解锁信号量的额外函数:

std::shared_ptr<std::atomic<bool>> autowait(Semaphore* semaphore, int timeout, EventLoop<std::function<void()>>& eventloop, const char* source) {
std::shared_ptr<std::atomic<bool>> waiting(std::make_shared<std::atomic<bool>>(true));
sync(std::cerr << getTime() << "autowait '" << source << "'..." << std::endl);


if (semaphore->try_acquire()) {
eventloop.enqueue( timeout, [waiting, source, semaphore]{
if ( (*waiting).load() ) {
sync(std::cerr << getTime() << "Timeout '" << source << "'..." << std::endl);
semaphore->notify();
}
} );
}
else {
semaphore->wait(timeout);
}
return waiting;
}


Semaphore semaphore(1, false);
EventLoop<std::function<void()>>* eventloop = new EventLoop<std::function<void()>>(true);
std::shared_ptr<std::atomic<bool>> waiting_something = autowait(&semaphore, 45000, eventloop, "waiting_something");

这是个老问题,但我想提供另一个解决方案。 似乎您需要的不是一个信号量,而是一个类似 Windows 事件的事件。 非常有效的活动可以这样做:

#ifdef _MSC_VER
#include <concrt.h>
#else
// pthread implementation
#include <cstddef>
#include <cstdint>
#include <shared_mutex>


namespace Concurrency
{
const unsigned int COOPERATIVE_TIMEOUT_INFINITE = (unsigned int)-1;
const size_t COOPERATIVE_WAIT_TIMEOUT = SIZE_MAX;


class event
{
public:
event();
~event();


size_t wait(unsigned int timeout = COOPERATIVE_TIMEOUT_INFINITE);
void set();
void reset();
static size_t wait_for_multiple(event** _PPEvents, size_t _Count, bool _FWaitAll, unsigned int _Timeout = COOPERATIVE_TIMEOUT_INFINITE);


static const unsigned int timeout_infinite = COOPERATIVE_TIMEOUT_INFINITE;
    

private:
int d;
std::shared_mutex guard;
};


};


namespace concurrency = Concurrency;


#include <unistd.h>
#include <errno.h>
#include <sys/eventfd.h>
#include <sys/epoll.h>


#include <chrono>


#include "../HandleHolder.h"


typedef CommonHolder<int, close> fd_holder;


namespace Concurrency
{
int watch(int ep_fd, int fd)
{
epoll_event ep_event;
ep_event.events = EPOLLIN;
ep_event.data.fd = fd;


return epoll_ctl(ep_fd, EPOLL_CTL_ADD, fd, &ep_event);
}


event::event()
: d(eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK))
{
}


event::~event()
{
std::unique_lock<std::shared_mutex> lock(guard);
close(d);
d = -1;
}


size_t event::wait(unsigned int timeout)
{
fd_holder ep_fd(epoll_create1(EPOLL_CLOEXEC));
{
std::shared_lock<std::shared_mutex> lock(guard);
if (d == -1 || watch(ep_fd.GetHandle(), d) < 0)
return COOPERATIVE_WAIT_TIMEOUT;
}


epoll_event ep_event;
return epoll_wait(ep_fd.GetHandle(), &ep_event, 1, timeout) == 1 && (ep_event.events & EPOLLIN) ? 0 : COOPERATIVE_WAIT_TIMEOUT;
}


void event::set()
{
uint64_t count = 1;
write(d, &count, sizeof(count));
}


void event::reset()
{
uint64_t count;
read(d, &count, sizeof(count));
}


size_t event::wait_for_multiple(event** _PPEvents, size_t _Count, bool _FWaitAll, unsigned int _Timeout)
{
if (_FWaitAll) // not implemented
std::abort();


const auto deadline = _Timeout != COOPERATIVE_TIMEOUT_INFINITE ? std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count() + _Timeout : COOPERATIVE_TIMEOUT_INFINITE;


fd_holder ep_fd(epoll_create1(EPOLL_CLOEXEC));
int fds[_Count];
for (int i = 0; i < _Count; ++i)
{
std::shared_lock<std::shared_mutex> lock(_PPEvents[i]->guard);
fds[i] = _PPEvents[i]->d;
if (fds[i] != -1 && watch(ep_fd.GetHandle(), fds[i]) < 0)
fds[i] = -1;
}


epoll_event ep_events[_Count];


// Вызов epoll_wait может быть прерван сигналом. Ждём весь тайм-аут, так же, как в Windows
int res = 0;
while (true)
{
res = epoll_wait(ep_fd.GetHandle(), &ep_events[0], _Count, _Timeout);
if (res == -1 && errno == EINTR && std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count() < deadline)
continue;
break;
}


for (int i = 0; i < _Count; ++i)
{
if (fds[i] == -1)
continue;


for (int j = 0; j < res; ++j)
if (ep_events[j].data.fd == fds[i] && (ep_events[j].events & EPOLLIN))
return i;
}


return COOPERATIVE_WAIT_TIMEOUT;
}
};
#endif

然后用 并发: : 事件