简介
本文介绍如何使用互斥量保证共享数据的安全,并讲述死锁的相关处理方案。
锁的使用
我们可以通过mutex对共享数据进行夹所,防止多线程访问共享区造成数据不一致问题。如下,我们初始化一个共享变量shared_data
,然后定义了一个互斥量std::mutex
,接下来启动了两个线程,分别执行use_lock
增加数据,和一个lambda表达式减少数据。
结果可以看到两个线程对于共享数据的访问是独占的,单位时间片只有一个线程访问并输出日志。
std::mutex mtx1;
int shared_data = 100;
void use_lock() {
while (true) {
mtx1.lock();
shared_data++;
std::cout << "current thread is " << std::this_thread::get_id() << std::endl;
std::cout << "sharad data is " << shared_data << std::endl;
mtx1.unlock();
std::this_thread::sleep_for(std::chrono::microseconds(10));
}
}
void test_lock() {
std::thread t1(use_lock);
std::thread t2([]() {
while (true) {
mtx1.lock();
shared_data--;
std::cout << "current thread is " << std::this_thread::get_id() << std::endl;
std::cout << "sharad data is " << shared_data << std::endl;
mtx1.unlock();
std::this_thread::sleep_for(std::chrono::microseconds(10));
}
});
t1.join();
t2.join();
}
lock_guard的使用
当然我们可以用lock_guard
自动加锁和解锁,比如上面的函数可以等价简化为
void use_lock() {
while (true) {
std::lock_guard<std::mutex> lock(mtx1);
shared_data++;
std::cout << "current thread is " << std::this_thread::get_id() << std::endl;
std::cout << "sharad data is " << shared_data << std::endl;
std::this_thread::sleep_for(std::chrono::microseconds(10));
}
}
lock_guard
在作用域结束时自动调用其析构函数解锁,这么做的一个好处是简化了一些特殊情况从函数中返回的写法,比如异常或者条件不满足时,函数内部直接return,锁也会自动解开。
如何保证数据安全
有时候我们可以将对共享数据的访问和修改聚合到一个函数,在函数内加锁保证数据的安全性。但是对于读取类型的操作,即使读取函数是线程安全的,但是返回值抛给外边使用,存在不安全性。比如一个栈对象,我们要保证其在多线程访问的时候是安全的,可以在判断栈是否为空,判断操作内部我们可以加锁,但是判断结束后返回值就不在加锁了,就会存在线程安全问题。
比如我定义了如下栈, 对于多线程访问时判断栈是否为空,此后两个线程同时出栈,可能会造成崩溃。
template<typename T>
class threadsafe_stack1
{
private:
std::stack<T> data;
mutable std::mutex m;
public:
threadsafe_stack1() {}
threadsafe_stack1(const threadsafe_stack1& other)
{
std::lock_guard<std::mutex> lock(other.m);
//①在构造函数的函数体(constructor body)内进行复制操作
data = other.data;
}
threadsafe_stack1& operator=(const threadsafe_stack1&) = delete;
void push(T new_value)
{
std::lock_guard<std::mutex> lock(m);
data.push(std::move(new_value));
}
//问题代码
T pop()
{
std::lock_guard<std::mutex> lock(m);
auto element = data.top();
data.pop();
return element;
}
bool empty() const
{
std::lock_guard<std::mutex> lock(m);
return data.empty();
}
};
如下, 线程1和线程2先后判断栈都不为空,之后执行出战操作,会造成崩溃。
void test_threadsafe_stack1() {
threadsafe_stack1<int> safe_stack;
safe_stack.push(1);
std::thread t1([&safe_stack]() {
if (!safe_stack.empty()) {
std::this_thread::sleep_for(std::chrono::seconds(1));
safe_stack.pop();
}
});
std::thread t2([&safe_stack]() {
if (!safe_stack.empty()) {
std::this_thread::sleep_for(std::chrono::seconds(1));
safe_stack.pop();
}
});
t1.join();
t2.join();
}
解决这个问题我们可以用抛出异常的方式,比如定义一个空栈的异常
struct empty_stack : std::exception
{
const char* what() const throw();
};
然后实现我们的出栈函数
T pop()
{
std::lock_guard<std::mutex> lock(m);
if (data.empty()) throw empty_stack();
auto element = data.top();
data.pop();
return element;
}
这么做就需要在外层使用的时候捕获异常。这是C++ 并发编程中提及的建议。但是我觉得可以在函数pop内部再次判断栈是否为空,若为空则返回一个非法数据,这样比抛出异常好一些。但是如果T是一个复杂类型,我们很难定义一个非法值给外界知晓,这一点可以通过智能指针进行优化。之后我们再介绍更优化的方案,因为现在这个pop
函数仍存在问题,比如T
是一个vector<int>
类型,那么在pop
函数内部element
就是vector<int>
类型,开始element
存储了一些int值,程序没问题,函数执行pop操作, 假设此时程序内存暴增,导致当程序使用的内存足够大时,可用的有效空间不够, 函数返回element
时,就会就会存在vector
做拷贝赋值时造成失败。即使我们捕获异常,释放部分空间但也会导致栈元素已经出栈,数据丢失了。这其实是内存管理不当造成的,但是C++ 并发编程一书中给出了优化方案。
struct empty_stack : std::exception
{
const char* what() const throw();
};
template<typename T>
class threadsafe_stack
{
private:
std::stack<T> data;
mutable std::mutex m;
public:
threadsafe_stack() {}
threadsafe_stack(const threadsafe_stack& other)
{
std::lock_guard<std::mutex> lock(other.m);
//①在构造函数的函数体(constructor body)内进行复制操作
data = other.data;
}
threadsafe_stack& operator=(const threadsafe_stack&) = delete;
void push(T new_value)
{
std::lock_guard<std::mutex> lock(m);
data.push(std::move(new_value));
}
std::shared_ptr<T> pop()
{
std::lock_guard<std::mutex> lock(m);
//②试图弹出前检查是否为空栈
if (data.empty()) throw empty_stack();
//③改动栈容器前设置返回值
std::shared_ptr<T> const res(std::make_shared<T>(data.top()));
data.pop();
return res;
}
void pop(T& value)
{
std::lock_guard<std::mutex> lock(m);
if (data.empty()) throw empty_stack();
value = data.top();
data.pop();
}
bool empty() const
{
std::lock_guard<std::mutex> lock(m);
return data.empty();
}
};
我们提供了两个版本的pop
操作,一个是带引用类型的参数的,一个是直接pop
出智能指针类型,这样在pop
函数内部减少了数据的拷贝,防止内存溢出,其实这两种做法确实是相比之前直接pop
固定类型的值更节省内存,运行效率也好很多。我们也完全可以基于之前的思想,在pop
时如果队列为空则返回空指针,这样比抛出异常更有好一些
std::shared_ptr<T> pop()
{
std::lock_guard<std::mutex> lock(m);
//②试图弹出前检查是否为空栈
if (data.empty()) return nullptr;
//③改动栈容器前设置返回值
std::shared_ptr<T> const res(std::make_shared<T>(data.top()));
data.pop();
return res;
}
死锁怎么造成的
死锁一般是由于调运顺序不一致导致的,比如两个线程循环调用。当线程1先加锁A,再加锁B,而线程2先加锁B,再加锁A。那么在某一时刻就可能造成死锁。比如线程1对A已经加锁,线程2对B已经加锁,那么他们都希望彼此占有对方的锁,又不释放自己占有的锁导致了死锁。
举个例子
std::mutex t_lock1;
std::mutex t_lock2;
int m_1 = 0;
int m_2 = 1;
void dead_lock1() {
while (true) {
std::cout << "dead_lock1 begin " << std::endl;
t_lock1.lock();
m_1 = 1024;
t_lock2.lock();
m_2 = 2048;
t_lock2.unlock();
t_lock1.unlock();
std::this_thread::sleep_for(std::chrono::milliseconds(5));
std::cout << "dead_lock2 end " << std::endl;
}
}
void dead_lock2() {
while (true) {
std::cout << "dead_lock2 begin " << std::endl;
t_lock2.lock();
m_2 = 2048;
t_lock1.lock();
m_1 = 1024;
t_lock1.unlock();
t_lock2.unlock();
std::this_thread::sleep_for(std::chrono::milliseconds(5));
std::cout << "dead_lock2 end " << std::endl;
}
}
然后我们启动两个线程
void test_dead_lock() {
std::thread t1(dead_lock1);
std::thread t2(dead_lock2);
t1.join();
t2.join();
}
这样运行之后在某一个时刻一定会导致死锁。
实际工作中避免死锁的一个方式就是将加锁和解锁的功能封装为独立的函数,
这样能保证在独立的函数里执行完操作后就解锁,不会导致一个函数里使用多个锁的情况
//加锁和解锁作为原子操作解耦合,各自只管理自己的功能
void atomic_lock1() {
std::cout << "lock1 begin lock" << std::endl;
t_lock1.lock();
m_1 = 1024;
t_lock1.unlock();
std::cout << "lock1 end lock" << std::endl;
}
void atomic_lock2() {
std::cout << "lock2 begin lock" << std::endl;
t_lock2.lock();
m_2 = 2048;
t_lock2.unlock();
std::cout << "lock2 end lock" << std::endl;
}
void safe_lock1() {
while (true) {
atomic_lock1();
atomic_lock2();
std::this_thread::sleep_for(std::chrono::milliseconds(5));
}
}
void safe_lock2() {
while (true) {
atomic_lock2();
atomic_lock1();
std::this_thread::sleep_for(std::chrono::milliseconds(5));
}
}
void test_safe_lock() {
std::thread t1(safe_lock1);
std::thread t2(safe_lock2);
t1.join();
t2.join();
}
同时加锁
当我们无法避免在一个函数内部使用两个互斥量,并且都要解锁的情况,那我们可以采取同时加锁的方式。我们先定义一个类,假设这个类不推荐拷贝构造,但我们也提供了这个类的拷贝构造和移动构造
class som_big_object {
public:
som_big_object(int data) :_data(data) {}
//拷贝构造
som_big_object(const som_big_object& b2) :_data(b2._data) {
_data = b2._data;
}
//移动构造
som_big_object(som_big_object&& b2) :_data(std::move(b2._data)) {
}
//重载输出运算符
friend std::ostream& operator << (std::ostream& os, const som_big_object& big_obj) {
os << big_obj._data;
return os;
}
//重载赋值运算符
som_big_object& operator = (const som_big_object& b2) {
if (this == &b2) {
return *this;
}
_data = b2._data;
return *this;
}
//交换数据
friend void swap(som_big_object& b1, som_big_object& b2) {
som_big_object temp = std::move(b1);
b1 = std::move(b2);
b2 = std::move(temp);
}
private:
int _data;
};
接下来我们定义一个类对上面的类做管理,为防止多线程情况下数据混乱, 包含了一个互斥量。
class big_object_mgr {
public:
big_object_mgr(int data = 0) :_obj(data) {}
void printinfo() {
std::cout << "current obj data is " << _obj << std::endl;
}
friend void danger_swap(big_object_mgr& objm1, big_object_mgr& objm2);
friend void safe_swap(big_object_mgr& objm1, big_object_mgr& objm2);
friend void safe_swap_scope(big_object_mgr& objm1, big_object_mgr& objm2);
private:
std::mutex _mtx;
som_big_object _obj;
};
为了方便演示哪些交换是安全的,哪些是危险的,所以写了三个函数。
void danger_swap(big_object_mgr& objm1, big_object_mgr& objm2) {
std::cout << "thread [ " << std::this_thread::get_id() << " ] begin" << std::endl;
if (&objm1 == &objm2) {
return;
}
std::lock_guard <std::mutex> gurad1(objm1._mtx);
//此处为了故意制造死锁,我们让线程小睡一会
std::this_thread::sleep_for(std::chrono::seconds(1));
std::lock_guard<std::mutex> guard2(objm2._mtx);
swap(objm1._obj, objm2._obj);
std::cout << "thread [ " << std::this_thread::get_id() << " ] end" << std::endl;
}
danger_swap
是危险的交换方式。比如如下调用
void test_danger_swap() {
big_object_mgr objm1(5);
big_object_mgr objm2(100);
std::thread t1(danger_swap, std::ref(objm1), std::ref(objm2));
std::thread t2(danger_swap, std::ref(objm2), std::ref(objm1));
t1.join();
t2.join();
objm1.printinfo();
objm2.printinfo();
}
这种调用方式存在隐患,因为danger_swap
函数在两个线程中使用会造成互相竞争加锁的情况。
那就需要用锁同时锁住两个锁。
void safe_swap(big_object_mgr& objm1, big_object_mgr& objm2) {
std::cout << "thread [ " << std::this_thread::get_id() << " ] begin" << std::endl;
if (&objm1 == &objm2) {
return;
}
std::lock(objm1._mtx, objm2._mtx);
//领养锁管理它自动释放
std::lock_guard <std::mutex> gurad1(objm1._mtx, std::adopt_lock);
//此处为了故意制造死锁,我们让线程小睡一会
std::this_thread::sleep_for(std::chrono::seconds(1));
std::lock_guard <std::mutex> gurad2(objm2._mtx, std::adopt_lock);
swap(objm1._obj, objm2._obj);
std::cout << "thread [ " << std::this_thread::get_id() << " ] end" << std::endl;
}
比如下面的调用就是合理的
void test_safe_swap() {
big_object_mgr objm1(5);
big_object_mgr objm2(100);
std::thread t1(safe_swap, std::ref(objm1), std::ref(objm2));
std::thread t2(safe_swap, std::ref(objm2), std::ref(objm1));
t1.join();
t2.join();
objm1.printinfo();
objm2.printinfo();
}
当然上面加锁的方式可以简化,C++17 scope_lock
可以对多个互斥量同时加锁,并且自动释放
//上述代码可以简化为以下方式
void safe_swap_scope(big_object_mgr& objm1, big_object_mgr& objm2) {
std::cout << "thread [ " << std::this_thread::get_id() << " ] begin" << std::endl;
if (&objm1 == &objm2) {
return;
}
std::scoped_lock guard(objm1._mtx, objm2._mtx);
//等价于
//std::scoped_lock<std::mutex, std::mutex> guard(objm1._mtx, objm2._mtx);
swap(objm1._obj, objm2._obj);
std::cout << "thread [ " << std::this_thread::get_id() << " ] end" << std::endl;
}
层级锁
现实开发中常常很难规避同一个函数内部加多个锁的情况,我们要尽可能避免循环加锁,所以可以自定义一个层级锁,保证实际项目中对多个互斥量加锁时是有序的。
//层级锁
class hierarchical_mutex {
public:
explicit hierarchical_mutex(unsigned long value) :_hierarchy_value(value),
_previous_hierarchy_value(0) {}
hierarchical_mutex(const hierarchical_mutex&) = delete;
hierarchical_mutex& operator=(const hierarchical_mutex&) = delete;
void lock() {
check_for_hierarchy_violation();
_internal_mutex.lock();
update_hierarchy_value();
}
void unlock() {
if (_this_thread_hierarchy_value != _hierarchy_value) {
throw std::logic_error("mutex hierarchy violated");
}
_this_thread_hierarchy_value = _previous_hierarchy_value;
_internal_mutex.unlock();
}
bool try_lock() {
check_for_hierarchy_violation();
if (!_internal_mutex.try_lock()) {
return false;
}
update_hierarchy_value();
return true;
}
private:
std::mutex _internal_mutex;
//当前层级值
unsigned long const _hierarchy_value;
//上一次层级值
unsigned long _previous_hierarchy_value;
//本线程记录的层级值
static thread_local unsigned long _this_thread_hierarchy_value;
void check_for_hierarchy_violation() {
if (_this_thread_hierarchy_value <= _hierarchy_value) {
throw std::logic_error("mutex hierarchy violated");
}
}
void update_hierarchy_value() {
_previous_hierarchy_value = _this_thread_hierarchy_value;
_this_thread_hierarchy_value = _hierarchy_value;
}
};
thread_local unsigned long hierarchical_mutex::_this_thread_hierarchy_value(ULONG_MAX);
void test_hierarchy_lock() {
hierarchical_mutex hmtx1(1000);
hierarchical_mutex hmtx2(500);
std::thread t1([&hmtx1, &hmtx2]() {
hmtx1.lock();
hmtx2.lock();
hmtx2.unlock();
hmtx1.unlock();
});
std::thread t2([&hmtx1, &hmtx2]() {
hmtx2.lock();
hmtx1.lock();
hmtx1.unlock();
hmtx2.unlock();
});
t1.join();
t2.join();
}
层级锁能保证我们每个线程加锁时,一定是先加权重高的锁。
并且释放时也保证了顺序。
主要原理就是将当前锁的权重保存在线程变量中,这样该线程再次加锁时判断线程变量的权重和锁的权重是否大于,如果满足条件则继续加锁。
总结
本文介绍了线程互斥的常见问题和基本处理方案,在实际开发中,根据具体情境具体分析。
视频链接https://space.bilibili.com/271469206/channel/collectiondetail?sid=1623290
//仅提供一份参考代码给同样初学者的我们[脱单doge]
/*
定义了如下栈, 对于多线程访问时判断栈是否为空,
此后两个线程同时出栈,可能会造成崩溃
*/
#include <iostream>
#include <string>
#include <thread>
#include <vector>
#include <mutex>
#include <stack>
#include <exception>
using namespace std;
struct empty_stack : public std::exception
{
public:
const char* what()const throw()
//函数后面必须跟throw(),括号里面不能有任务参数,表示不抛出任务异常
//因为这个已经是一个异常处理信息了,不能再抛异常。
{
return "empty_stack";
}
};
//struct empty_stack : std::exception
//{
// const char* what() const throw();
//};
template<typename T>
class threadsafe_stack
{
private:
std::stack<T> data;
mutable std::mutex m;
public:
threadsafe_stack() {}
threadsafe_stack(const threadsafe_stack& other)
{
std::lock_guard<std::mutex> lock(other.m);
//①在构造函数的函数体(constructor body)内进行复制操作
data = other.data;
}
threadsafe_stack& operator=(const threadsafe_stack&) = delete;
void push(T new_value)
{
std::lock_guard<std::mutex> lock(m);
data.push(std::move(new_value));
}
T pop()
{
std::lock_guard<std::mutex> lock(m);
if (data.empty()) throw empty_stack();
auto element = data.top();
data.pop();
return element;
}
bool empty() const
{
std::lock_guard<std::mutex> lock(m);
return data.empty();
}
};
void test_threadsafe_stack() {
threadsafe_stack<int> safe_stack;
safe_stack.push(1);
std::thread t1([&safe_stack]() {
if (!safe_stack.empty()) {
std::this_thread::sleep_for(std::chrono::seconds(1));
try {
safe_stack.pop();
} catch (empty_stack &e) {
cout << e.what() << endl;
}
}
});
std::thread t2([&safe_stack]() {
if (!safe_stack.empty()) {
std::this_thread::sleep_for(std::chrono::seconds(1));
try {
safe_stack.pop();
} catch (empty_stack &e) {
cout << e.what() << endl;
}
}
});
t1.join();
t2.join();
}
int main()
{
test_threadsafe_stack();
return 0;
}
/*
empty_stack
*/