How should I deal with mutexes in movable types in C++?

By design, std::mutex is neither movable nor copyable. This means that a class A holding a mutex won't receive a default move constructor.

How would I make this type A movable in a thread-safe way?


First of all, there must be something wrong with your design if you want to move an object containing a mutex.

But if you decide to do it anyway, you have to create a new mutex in the move constructor, for example:

// movable
struct B{};

class A {
    B b;
    std::mutex m;
public:
    A(A&& a)
        : b(std::move(a.b))
        // m is default-initialized.
    {
    }
};

This is thread-safe, because the move constructor can safely assume that its argument isn't used anywhere else, so the locking of the argument isn't required.

Given that there doesn't seem to be a nice, clean, easy way to answer this (Anton's solution is, I think, correct, but it's definitely debatable), unless a better answer comes up I would recommend putting such a class on the heap and looking after it via a std::unique_ptr:

auto a = std::make_unique<A>();

It's now a fully movable type, and anyone who holds a lock on the internal mutex while a move happens is still safe, even if it's debatable whether this is a good thing to do.

If you need copy semantics (where every copy of the handle shares the same object), just use

auto a2 = std::make_shared<A>();
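A minimal sketch of the unique_ptr approach, assuming a hypothetical Counter class (not from the question) that owns a std::mutex. It is meant to show that moving the std::unique_ptr transfers ownership only: the object and its mutex stay at the same address, so a thread holding a lock on that mutex is not disturbed.

```cpp
#include <memory>
#include <mutex>
#include <utility>

// Hypothetical class for illustration: it owns a mutex, so it is
// neither copyable nor movable itself.
class Counter {
    std::mutex m_;
    int value_ = 0;
public:
    void increment() {
        std::lock_guard<std::mutex> lk(m_);
        ++value_;
    }
    int get() {
        std::lock_guard<std::mutex> lk(m_);
        return value_;
    }
};

// Moves the handle, not the object. Returns true if the Counter still
// lives at the same address after the move and kept its state.
bool address_is_stable_after_move() {
    auto a = std::make_unique<Counter>();
    a->increment();
    Counter* before = a.get();

    std::unique_ptr<Counter> b = std::move(a);  // only the pointer moves
    b->increment();

    return a == nullptr && b.get() == before && b->get() == 2;
}
```

The cost is one heap allocation and an extra indirection per object; whether that is acceptable depends on how the type is used.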

Using mutexes and C++ move semantics is an excellent way to safely and efficiently transfer data between threads.

Imagine a 'producer' thread that makes batches of strings and provides them to (one or more) consumers. Those batches could be represented by an object containing (potentially large) std::vector<std::string> objects. We absolutely want to 'move' the internal state of those vectors into their consumers without unnecessary duplication.

You simply recognize the mutex as part of the object, not part of the object's state. That is, you don't want to move the mutex.

What locking you need depends on your algorithm or how generalized your objects are and what range of uses you permit.

If you only ever move from a shared state 'producer' object to a thread-local 'consuming' object you might be OK to only lock the moved from object.
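A rough sketch of that simpler case, assuming a hypothetical BatchQueue type (the name and its members are illustrative, not from the answer). Only the shared, moved-from object is locked; the destination is a local vector that no other thread can see.

```cpp
#include <cstddef>
#include <mutex>
#include <string>
#include <utility>
#include <vector>

// Hypothetical shared producer-side buffer.
class BatchQueue {
    std::mutex m_;                     // part of the object, never moved
    std::vector<std::string> items_;   // the state that gets moved out
public:
    void push(std::string s) {
        std::lock_guard<std::mutex> lk(m_);
        items_.push_back(std::move(s));
    }

    // Moves the whole batch out while holding the lock on the source.
    std::vector<std::string> drain() {
        std::lock_guard<std::mutex> lk(m_);
        std::vector<std::string> out = std::move(items_);
        items_.clear();  // make the moved-from state explicitly empty
        return out;
    }

    std::size_t pending() {
        std::lock_guard<std::mutex> lk(m_);
        return items_.size();
    }
};
```

A consumer would call drain() and then work on the returned vector without any further locking.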

If it's a more general design you will need to lock both. In such a case you need to then consider dead-locking.

If that is a potential issue then use std::lock() to acquire locks on both mutexes in a deadlock free way.

http://en.cppreference.com/w/cpp/thread/lock
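For the more general case, a sketch along these lines may help. It assumes a hypothetical SharedBatch type in which both the source and the destination can be shared between threads, so both mutexes are acquired through std::lock.

```cpp
#include <cstddef>
#include <mutex>
#include <string>
#include <utility>
#include <vector>

// Hypothetical type for illustration.
class SharedBatch {
    std::mutex m_;
    std::vector<std::string> items_;
public:
    void push(std::string s) {
        std::lock_guard<std::mutex> lk(m_);
        items_.push_back(std::move(s));
    }

    // Moves other's items onto the end of this batch.
    void take_from(SharedBatch& other) {
        if (this == &other) return;  // would lock the same mutex twice
        std::unique_lock<std::mutex> mine(m_, std::defer_lock);
        std::unique_lock<std::mutex> theirs(other.m_, std::defer_lock);
        std::lock(mine, theirs);     // deadlock-avoiding acquisition
        for (auto& s : other.items_) items_.push_back(std::move(s));
        other.items_.clear();
    }

    std::size_t size() {
        std::lock_guard<std::mutex> lk(m_);
        return items_.size();
    }
};
```

Because std::lock does not depend on argument order, a.take_from(b) in one thread and b.take_from(a) in another should not deadlock.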

As a final note, make sure you understand move semantics. Recall that the moved-from object is left in a valid but unspecified state. It's entirely possible that a thread not performing the move has a valid reason to access the moved-from object, and it may find it in that valid but unspecified state.

Again my producer is just banging out strings and the consumer is taking away the whole load. In that case every time the producer tries to add to the vector it may find the vector non-empty or empty.

In short if the potential concurrent access to the moved from object amounts to a write it's likely to be OK. If it amounts to a read then think about why it's OK to read an arbitrary state.

Let's start with a bit of code:

class A
{
    using MutexType = std::mutex;
    using ReadLock = std::unique_lock<MutexType>;
    using WriteLock = std::unique_lock<MutexType>;

    mutable MutexType mut_;

    std::string field1_;
    std::string field2_;

public:
    ...

I've put some rather suggestive type aliases in there that we won't really take advantage of in C++11, but become much more useful in C++14. Be patient, we'll get there.

Your question boils down to:

How do I write the move constructor and move assignment operator for this class?

We'll start with the move constructor.

Move Constructor

Note that the member mutex has been made mutable. Strictly speaking this isn't necessary for the move members, but I'm assuming you also want copy members. If that is not the case, there is no need to make the mutex mutable.

When constructing A, you do not need to lock this->mut_. But you do need to lock the mut_ of the object you're constructing from (move or copy). This can be done like so:

    A(A&& a)
    {
        WriteLock rhs_lk(a.mut_);
        field1_ = std::move(a.field1_);
        field2_ = std::move(a.field2_);
    }

Note that we had to default construct the members of this first, and then assign them values only after a.mut_ is locked.

Move Assignment

The move assignment operator is substantially more complicated because you do not know if some other thread is accessing either the lhs or rhs of the assignment expression. And in general, you need to guard against the following scenario:

// Thread 1
x = std::move(y);


// Thread 2
y = std::move(x);

Here is the move assignment operator that correctly guards the above scenario:

    A& operator=(A&& a)
    {
        if (this != &a)
        {
            WriteLock lhs_lk(mut_, std::defer_lock);
            WriteLock rhs_lk(a.mut_, std::defer_lock);
            std::lock(lhs_lk, rhs_lk);
            field1_ = std::move(a.field1_);
            field2_ = std::move(a.field2_);
        }
        return *this;
    }

Note that one must use std::lock(m1, m2) to lock the two mutexes, instead of just locking them one after the other. If you lock them one after the other, then when two threads assign two objects in opposite order as shown above, you can get a deadlock. The point of std::lock is to avoid that deadlock.
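To make the scenario concrete, here is a cut-down, self-contained sketch of the class with a single field. The constructor taking a string and the field1() getter are hypothetical additions for the demo, not part of the class above. Two threads repeatedly assign in opposite directions, and the function returns once both have finished.

```cpp
#include <mutex>
#include <string>
#include <thread>
#include <utility>

class A {
    using MutexType = std::mutex;
    using WriteLock = std::unique_lock<MutexType>;

    mutable MutexType mut_;
    std::string field1_;
public:
    // Hypothetical constructor, added only so the sketch can build objects.
    explicit A(std::string s) : field1_(std::move(s)) {}

    A& operator=(A&& a) {
        if (this != &a) {
            WriteLock lhs_lk(mut_, std::defer_lock);
            WriteLock rhs_lk(a.mut_, std::defer_lock);
            std::lock(lhs_lk, rhs_lk);  // order-independent acquisition
            field1_ = std::move(a.field1_);
        }
        return *this;
    }

    // Hypothetical getter, also protected by the mutex.
    std::string field1() const {
        WriteLock lk(mut_);
        return field1_;
    }
};

// Runs the two-thread scenario many times and returns once both threads
// have joined. With locks taken one after the other this could hang.
bool opposite_order_assignments_finish() {
    A x("x"), y("y");
    std::thread t1([&] { for (int i = 0; i < 10000; ++i) x = std::move(y); });
    std::thread t2([&] { for (int i = 0; i < 10000; ++i) y = std::move(x); });
    t1.join();
    t2.join();
    return true;
}
```

In C++17, std::scoped_lock performs the same deadlock-avoiding acquisition of several mutexes in a single declaration.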

Copy Constructor

You didn't ask about the copy members, but we might as well talk about them now (if not you, somebody will need them).

    A(const A& a)
    {
        ReadLock  rhs_lk(a.mut_);
        field1_ = a.field1_;
        field2_ = a.field2_;
    }

The copy constructor looks much like the move constructor except the ReadLock alias is used instead of the WriteLock. Currently these both alias std::unique_lock<std::mutex> and so it doesn't really make any difference.

But in C++14, you will have the option of saying this:

    using MutexType = std::shared_timed_mutex;
    using ReadLock  = std::shared_lock<MutexType>;
    using WriteLock = std::unique_lock<MutexType>;

This may be an optimization, but not definitely. You will have to measure to determine if it is. But with this change, one can copy construct from the same rhs in multiple threads simultaneously. The C++11 solution forces you to make such threads sequential, even though the rhs isn't being modified.
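The difference between the two lock types can be seen in a small standalone sketch. The function name is made up for the demo; the aliases are the C++14 ones above. While one thread holds a ReadLock, a second thread can still take its own ReadLock, but an attempt to take a WriteLock fails.

```cpp
#include <mutex>
#include <shared_mutex>
#include <thread>

// Returns true if a second thread can take a shared (read) lock while
// this thread already holds one, and cannot take an exclusive lock.
bool readers_share_writers_exclude() {
    using MutexType = std::shared_timed_mutex;
    using ReadLock  = std::shared_lock<MutexType>;
    using WriteLock = std::unique_lock<MutexType>;

    MutexType mut;
    ReadLock first_reader(mut);  // this thread is reading

    bool second_reader_ok = false;
    bool writer_ok = true;
    std::thread other([&] {
        {
            // Would block forever if readers excluded each other.
            ReadLock second_reader(mut);
            second_reader_ok = second_reader.owns_lock();
        }
        // Non-blocking attempt; fails while first_reader is held.
        WriteLock writer(mut, std::try_to_lock);
        writer_ok = writer.owns_lock();
    });
    other.join();
    return second_reader_ok && !writer_ok;
}
```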

Copy Assignment

For completeness, here is the copy assignment operator, which should be fairly self explanatory after reading about everything else:

    A& operator=(const A& a)
    {
        if (this != &a)
        {
            WriteLock lhs_lk(mut_, std::defer_lock);
            ReadLock  rhs_lk(a.mut_, std::defer_lock);
            std::lock(lhs_lk, rhs_lk);
            field1_ = a.field1_;
            field2_ = a.field2_;
        }
        return *this;
    }

Et cetera

Any other members or free functions that access A's state will also need to be protected if you expect multiple threads to be able to call them at once. For example, here's swap:

    friend void swap(A& x, A& y)
    {
        if (&x != &y)
        {
            WriteLock lhs_lk(x.mut_, std::defer_lock);
            WriteLock rhs_lk(y.mut_, std::defer_lock);
            std::lock(lhs_lk, rhs_lk);
            using std::swap;
            swap(x.field1_, y.field1_);
            swap(x.field2_, y.field2_);
        }
    }

Note that if you just depend on std::swap doing the job, the locking will be at the wrong granularity, locking and unlocking between the three moves that std::swap would internally perform.

Indeed, thinking about swap can give you insight into the API you might need to provide for a "thread-safe" A, which in general will be different from a "non-thread-safe" API, because of the "locking granularity" issue.

Also note the need to protect against "self-swap". "self-swap" should be a no-op. Without the self-check one would recursively lock the same mutex. This could also be solved without the self-check by using std::recursive_mutex for MutexType.

Update

In the comments below Yakk is pretty unhappy about having to default construct things in the copy and move constructors (and he has a point). Should you feel strongly enough about this issue, so much so that you are willing to spend memory on it, you can avoid it like so:

  • Add whatever lock types you need as data members. These members must come before the data that is being protected:

    mutable MutexType mut_;
    ReadLock  read_lock_;
    WriteLock write_lock_;
    // ... other data members ...
    
  • And then in the constructors (e.g. the copy constructor) do this:

    A(const A& a)
        : read_lock_(a.mut_)
        , field1_(a.field1_)
        , field2_(a.field2_)
    {
        read_lock_.unlock();
    }
    

Oops, Yakk erased his comment before I had the chance to complete this update. But he deserves credit for pushing this issue, and getting a solution into this answer.

Update 2

And dyp came up with this good suggestion:

    A(const A& a)
        : A(a, ReadLock(a.mut_))
    {}
private:
    A(const A& a, ReadLock rhs_lk)
        : field1_(a.field1_)
        , field2_(a.field2_)
    {}
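The same delegating-constructor trick should carry over to the move constructor. The sketch below is an extrapolation, not something dyp posted: it assumes the C++11 aliases from earlier, and adds a hypothetical two-string constructor and a field1() getter so that it compiles on its own.

```cpp
#include <mutex>
#include <string>
#include <utility>

class A {
    using MutexType = std::mutex;
    using WriteLock = std::unique_lock<MutexType>;

    mutable MutexType mut_;
    std::string field1_;
    std::string field2_;

    // Private target constructor: the lock parameter keeps a.mut_ held
    // while the members are move-constructed directly.
    A(A&& a, WriteLock /*rhs_lk*/)
        : field1_(std::move(a.field1_))
        , field2_(std::move(a.field2_))
    {}

public:
    // Hypothetical convenience constructor, added only for the sketch.
    A(std::string f1, std::string f2)
        : field1_(std::move(f1)), field2_(std::move(f2)) {}

    // std::move(a) is only a cast, so the order in which the two
    // arguments are evaluated does not matter here.
    A(A&& a) : A(std::move(a), WriteLock(a.mut_)) {}

    // Hypothetical getter, protected by the mutex.
    std::string field1() const {
        WriteLock lk(mut_);
        return field1_;
    }
};
```

As with the copy version, no member is default constructed first, and the lock on the source is released when the delegated-to constructor returns.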

This is an upside-down answer. Instead of embedding "this object needs to be synchronized" as a base of the type, inject it under any type.

You deal with a synchronized object very differently. One big issue is you have to worry about deadlocks (locking multiple objects). It also should basically never be your "default version of an object": synchronized objects are for objects that will be in contention, and your goal should be to minimize contention between threads, not sweep it under the rug.

But synchronizing objects is still useful. Instead of inheriting from a synchronizer, we can write a class that wraps an arbitrary type in synchronization. Users have to jump through a few hoops to do operations on the object now that it is synchronized, but they are not limited to some hand-coded limited set of operations on the object. They can compose multiple operations on the object into one, or have an operation on multiple objects.

Here is a synchronized wrapper around an arbitrary type T:

template<class T>
struct synchronized {
    template<class F>
    auto read(F&& f) const&->std::result_of_t<F(T const&)> {
        return access(std::forward<F>(f), *this);
    }
    template<class F>
    auto read(F&& f) &&->std::result_of_t<F(T&&)> {
        return access(std::forward<F>(f), std::move(*this));
    }
    template<class F>
    auto write(F&& f)->std::result_of_t<F(T&)> {
        return access(std::forward<F>(f), *this);
    }
    // uses `const` ness of Syncs to determine access:
    template<class F, class... Syncs>
    friend auto access( F&& f, Syncs&&... syncs )->
    std::result_of_t< F(decltype(std::forward<Syncs>(syncs).t)...) >
    {
        return access2( std::index_sequence_for<Syncs...>{}, std::forward<F>(f), std::forward<Syncs>(syncs)... );
    };
    synchronized(synchronized const& o):t(o.read([](T const&o){return o;})){}
    synchronized(synchronized && o):t(std::move(o).read([](T&&o){return std::move(o);})){}
    // special member functions:
    synchronized( T & o ):t(o) {}
    synchronized( T const& o ):t(o) {}
    synchronized( T && o ):t(std::move(o)) {}
    synchronized( T const&& o ):t(std::move(o)) {}
    synchronized& operator=(T const& o) {
        write([&](T& t){
            t=o;
        });
        return *this;
    }
    synchronized& operator=(T && o) {
        write([&](T& t){
            t=std::move(o);
        });
        return *this;
    }
private:
    template<class X, class S>
    static auto smart_lock(S const& s) {
        return std::shared_lock< std::shared_timed_mutex >(s.m, X{});
    }
    template<class X, class S>
    static auto smart_lock(S& s) {
        return std::unique_lock< std::shared_timed_mutex >(s.m, X{});
    }
    template<class L>
    static void lock(L& lockable) {
        lockable.lock();
    }
    template<class...Ls>
    static void lock(Ls&... lockable) {
        std::lock( lockable... );
    }
    template<size_t...Is, class F, class...Syncs>
    friend auto access2( std::index_sequence<Is...>, F&&f, Syncs&&...syncs)->
    std::result_of_t< F(decltype(std::forward<Syncs>(syncs).t)...) >
    {
        auto locks = std::make_tuple( smart_lock<std::defer_lock_t>(syncs)... );
        lock( std::get<Is>(locks)... );
        return std::forward<F>(f)(std::forward<Syncs>(syncs).t ...);
    }

    mutable std::shared_timed_mutex m;
    T t;
};
template<class T>
synchronized< T > sync( T&& t ) {
    return {std::forward<T>(t)};
}

C++14 and C++1z features included.

This assumes that const operations are safe for multiple concurrent readers (which is what the std containers assume).

Use looks like:

synchronized<int> x = 7;
x.read([&](auto&& v){
std::cout << v << '\n';
});

for an int with synchronized access.
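For readers who only need the single-object case, the idea can be reduced to a much smaller sketch. The names simple_synchronized and add_and_get are hypothetical, and this version deliberately leaves out the multi-object access function and the copy/move support of the wrapper above.

```cpp
#include <mutex>
#include <shared_mutex>
#include <utility>

// Simplified, single-object variant of the wrapper idea.
template <class T>
class simple_synchronized {
    mutable std::shared_timed_mutex m_;
    T t_;
public:
    explicit simple_synchronized(T t) : t_(std::move(t)) {}

    // Shared lock: f only sees a const T.
    template <class F>
    decltype(auto) read(F&& f) const {
        std::shared_lock<std::shared_timed_mutex> lk(m_);
        return std::forward<F>(f)(t_);
    }

    // Exclusive lock: f may modify the T.
    template <class F>
    decltype(auto) write(F&& f) {
        std::unique_lock<std::shared_timed_mutex> lk(m_);
        return std::forward<F>(f)(t_);
    }
};

// A read-modify-write composed into one locked operation.
int add_and_get(simple_synchronized<int>& x, int delta) {
    return x.write([&](int& v) { v += delta; return v; });
}
```

Because callers pass in the operation, they can combine several steps under one lock, which is the main point of the wrapper design.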

I'd advise against having synchronized(synchronized const&). It is rarely needed.

If you need synchronized(synchronized const&), I'd be tempted to replace T t; with std::aligned_storage, allowing manual placement construction, and do manual destruction. That allows proper lifetime management.

Barring that, we could copy the source T, then read from it:

synchronized(synchronized const& o):
    t(o.read(
        [](T const&o){return o;})
    )
{}
synchronized(synchronized && o):
    t(std::move(o).read(
        [](T&&o){return std::move(o);})
    )
{}

for assignment:

synchronized& operator=(synchronized const& o) {
    access([](T& lhs, T const& rhs){
        lhs = rhs;
    }, *this, o);
    return *this;
}
synchronized& operator=(synchronized && o) {
    access([](T& lhs, T&& rhs){
        lhs = std::move(rhs);
    }, *this, std::move(o));
    return *this;
}
friend void swap(synchronized& lhs, synchronized& rhs) {
    // A friend function has no `this`; pass both arguments to access.
    access([](T& a, T& b){
        using std::swap;
        swap(a, b);
    }, lhs, rhs);
}

The placement and aligned storage versions are a bit messier. Most access to t would be replaced by member functions T& t() and T const& t() const, except at construction, where you'd have to jump through some hoops.

By making synchronized a wrapper instead of part of the class, all we have to ensure is that the class internally respects const as being multiple-reader, and write it in a single-threaded manner.

In the rare cases we need a synchronized instance, we jump through hoops like the above.

Apologies for any typos in the above. There are probably some.

A side benefit of the above is that arbitrary n-ary operations on synchronized objects (of the same type) work together, without having to be hard-coded beforehand. Add in a friend declaration and n-ary synchronized objects of multiple types might work together. I might have to move access out of being an inline friend to deal with overload conflicts in that case.
