简介
本文介绍如何使用条件变量控制并发的同步操作,试想有一个线程A一直输出1,另一个线程B一直输出2。我想让两个线程交替输出1,2,1,2…之类的效果,该如何实现?有的同学可能会说不是有互斥量mutex吗?可以用一个全局变量num表示应该哪个线程输出,比如num为1则线程A输出1,num为2则线程B输出2,mutex控制两个线程访问num,如果num和线程不匹配,就让该线程睡一会,这不就实现了吗?比如线程A加锁后发现当前num为2则表示它不能输出1,就解锁,将锁的使用权交给线程A,线程B就sleep一会。
不良实现
上面说的方式可以实现我们需要的功能,代码如下
void PoorImpleman() {
std::thread t1([]() {
for (;;) {
{
std::lock_guard<std::mutex> lock(mtx_num);
if (num == 1) {
std::cout << "thread A print 1....." << std::endl;
num++;
continue;
}
}
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
});
std::thread t2([]() {
for (;;) {
{
std::lock_guard<std::mutex> lock(mtx_num);
if (num == 2) {
std::cout << "thread B print 2....." << std::endl;
num--;
continue;
}
}
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
});
t1.join();
t2.join();
}
PoorImpleman
虽然能实现我们交替打印的功能,会造成消息处理的不及时,因为线程A要循环检测num
值,如果num
不为1,则线程A就睡眠了,在线程A睡眠这段时间很可能B已经处理完打印了,此时A还在睡眠,是对资源的浪费,也错过了最佳的处理时机。所以我们提出了用条件变量来通知线程的机制,当线程A发现条件不满足时可以挂起,等待线程B通知,线程B通知A后,A被唤醒继续处理。
条件变量
我们这里用条件变量实现上面的逻辑
void ResonableImplemention() {
std::thread t1([]() {
for (;;) {
std::unique_lock<std::mutex> lock(mtx_num);
cvA.wait(lock, []() {
return num == 1;
});
num++;
std::cout << "thread A print 1....." << std::endl;
cvB.notify_one();
}
});
std::thread t2([]() {
for (;;) {
std::unique_lock<std::mutex> lock(mtx_num);
cvB.wait(lock, []() {
return num == 2;
});
num--;
std::cout << "thread B print 2....." << std::endl;
cvA.notify_one();
}
});
t1.join();
t2.join();
}
当条件不满足时(num 不等于1 时)cvA.wait
就会挂起,等待线程B通知通知线程A唤醒,线程B采用的是cvA.notifyone
。
这么做的好处就是线程交替处理非常及时。比起sleep
的方式,我们可以从控制台上看出差异效果,sleep
的方式看出日志基本是每隔1秒才打印一次,效率不高。
线程安全队列
之前我们实现过线程安全的栈,对于pop操作,我们如果在线程中调用empty判断是否为空,如果不为空,则pop,因为empty和pop内部分别加锁,是两个原子操作,导致pop时可能会因为其他线程提前pop导致队列为空,从而引发崩溃。我们当时的处理方式是实现了两个版本的pop,一种是返回智能指针类型,一种通过参数为引用的方式返回。对于智能指针版本我们发现队列为空则返回空指针,对于引用版本,
发现队列为空则抛出异常,这么做并不是很友好,所以我们可以通过条件变量完善之前的程序,不过这次我们重新实现一个线程安全队列。
#include <queue>
#include <mutex>
#include <condition_variable>
template<typename T>
class threadsafe_queue
{
private:
std::mutex mut;
std::queue<T> data_queue;
std::condition_variable data_cond;
public:
void push(T new_value)
{
std::lock_guard<std::mutex> lk(mut);
data_queue.push(new_value);
data_cond.notify_one();
}
void wait_and_pop(T& value)
{
std::unique_lock<std::mutex> lk(mut);
data_cond.wait(lk,[this]{return !data_queue.empty();});
value=data_queue.front();
data_queue.pop();
}
};
threadsafe_queue<data_chunk> data_queue;
void data_preparation_thread()
{
while(more_data_to_prepare())
{
data_chunk const data=prepare_data();
data_queue.push(data); ⇽--- ②
}
}
void data_processing_thread()
{
while(true)
{
data_chunk data;
data_queue.wait_and_pop(data);
process(data);
if(is_last_chunk(data))
break;
}
}
我们可以启动三个线程,一个producer
线程用来向队列中放入数据。一个consumer1
线程用来阻塞等待pop队列中的元素。
另一个consumer2
尝试从队列中pop元素,如果队列为空则直接返回,如果非空则pop元素。
打印时为了保证线程输出在屏幕上不会乱掉,所以加了锁保证互斥输出
测试代码如下
void test_safe_que() {
threadsafe_queue<int> safe_que;
std::mutex mtx_print;
std::thread producer(
[&]() {
for (int i = 0; ;i++) {
safe_que.push(i);
{
std::lock_guard<std::mutex> printlk(mtx_print);
std::cout << "producer push data is " << i << std::endl;
}
std::this_thread::sleep_for(std::chrono::milliseconds(200));
}
}
);
std::thread consumer1(
[&]() {
for (;;) {
auto data = safe_que.wait_and_pop();
{
std::lock_guard<std::mutex> printlk(mtx_print);
std::cout << "consumer1 wait and pop data is " << *data << std::endl;
}
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
}
);
std::thread consumer2(
[&]() {
for (;;) {
auto data = safe_que.try_pop();
if (data != nullptr) {
{
std::lock_guard<std::mutex> printlk(mtx_print);
std::cout << "consumer2 try_pop data is " << *data << std::endl;
}
}
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
}
);
producer.join();
consumer1.join();
consumer2.join();
}
测试效果如下
producer push data is 0
consumer1 wait and pop data is 0
producer push data is 1
producer push data is 2
consumer2 try_pop data is 1
consumer1 wait and pop data is 2
producer push data is 3
producer push data is 4
consumer2 try_pop data is 3
consumer1 wait and pop data is 4
producer push data is 5
producer push data is 6
producer push data is 7
consumer2 try_pop data is 5
consumer1 wait and pop data is 6
我们能看到consumer1和consumer2是并发消费的
总结
本文介绍了如何通过条件变量实现并发线程的同步处理。
视频链接
https://space.bilibili.com/271469206/channel/collectiondetail?sid=1623290
源码链接