利用条件变量实现线程安全队列

简介

本文介绍如何使用条件变量控制并发的同步操作,试想有一个线程A一直输出1,另一个线程B一直输出2。我想让两个线程交替输出1,2,1,2…之类的效果,该如何实现?有的同学可能会说不是有互斥量mutex吗?可以用一个全局变量num表示应该哪个线程输出,比如num为1则线程A输出1,num为2则线程B输出2,mutex控制两个线程访问num,如果num和线程不匹配,就让该线程睡一会,这不就实现了吗?比如线程A加锁后发现当前num为2则表示它不能输出1,就解锁,将锁的使用权交给线程A,线程B就sleep一会。

不良实现

上面说的方式可以实现我们需要的功能,代码如下

  1. void PoorImpleman() {
  2. std::thread t1([]() {
  3. for (;;) {
  4. {
  5. std::lock_guard<std::mutex> lock(mtx_num);
  6. if (num == 1) {
  7. std::cout << "thread A print 1....." << std::endl;
  8. num++;
  9. continue;
  10. }
  11. }
  12. std::this_thread::sleep_for(std::chrono::milliseconds(500));
  13. }
  14. });
  15. std::thread t2([]() {
  16. for (;;) {
  17. {
  18. std::lock_guard<std::mutex> lock(mtx_num);
  19. if (num == 2) {
  20. std::cout << "thread B print 2....." << std::endl;
  21. num--;
  22. continue;
  23. }
  24. }
  25. std::this_thread::sleep_for(std::chrono::milliseconds(500));
  26. }
  27. });
  28. t1.join();
  29. t2.join();
  30. }

PoorImpleman虽然能实现我们交替打印的功能,会造成消息处理的不及时,因为线程A要循环检测num值,如果num不为1,则线程A就睡眠了,在线程A睡眠这段时间很可能B已经处理完打印了,此时A还在睡眠,是对资源的浪费,也错过了最佳的处理时机。所以我们提出了用条件变量来通知线程的机制,当线程A发现条件不满足时可以挂起,等待线程B通知,线程B通知A后,A被唤醒继续处理。

条件变量

我们这里用条件变量实现上面的逻辑

  1. void ResonableImplemention() {
  2. std::thread t1([]() {
  3. for (;;) {
  4. std::unique_lock<std::mutex> lock(mtx_num);
  5. cvA.wait(lock, []() {
  6. return num == 1;
  7. });
  8. num++;
  9. std::cout << "thread A print 1....." << std::endl;
  10. cvB.notify_one();
  11. }
  12. });
  13. std::thread t2([]() {
  14. for (;;) {
  15. std::unique_lock<std::mutex> lock(mtx_num);
  16. cvB.wait(lock, []() {
  17. return num == 2;
  18. });
  19. num--;
  20. std::cout << "thread B print 2....." << std::endl;
  21. cvA.notify_one();
  22. }
  23. });
  24. t1.join();
  25. t2.join();
  26. }

当条件不满足时(num 不等于1 时)cvA.wait就会挂起,等待线程B通知通知线程A唤醒,线程B采用的是cvA.notifyone
这么做的好处就是线程交替处理非常及时。比起sleep的方式,我们可以从控制台上看出差异效果,sleep的方式看出日志基本是每隔1秒才打印一次,效率不高。

线程安全队列

之前我们实现过线程安全的栈,对于pop操作,我们如果在线程中调用empty判断是否为空,如果不为空,则pop,因为empty和pop内部分别加锁,是两个原子操作,导致pop时可能会因为其他线程提前pop导致队列为空,从而引发崩溃。我们当时的处理方式是实现了两个版本的pop,一种是返回智能指针类型,一种通过参数为引用的方式返回。对于智能指针版本我们发现队列为空则返回空指针,对于引用版本,
发现队列为空则抛出异常,这么做并不是很友好,所以我们可以通过条件变量完善之前的程序,不过这次我们重新实现一个线程安全队列。

  1. #include <queue>
  2. #include <mutex>
  3. #include <condition_variable>
  4. template<typename T>
  5. class threadsafe_queue
  6. {
  7. private:
  8. std::mutex mut;
  9. std::queue<T> data_queue;
  10. std::condition_variable data_cond;
  11. public:
  12. void push(T new_value)
  13. {
  14. std::lock_guard<std::mutex> lk(mut);
  15. data_queue.push(new_value);
  16. data_cond.notify_one();
  17. }
  18. void wait_and_pop(T& value)
  19. {
  20. std::unique_lock<std::mutex> lk(mut);
  21. data_cond.wait(lk,[this]{return !data_queue.empty();});
  22. value=data_queue.front();
  23. data_queue.pop();
  24. }
  25. };
  26. threadsafe_queue<data_chunk> data_queue;
  27. void data_preparation_thread()
  28. {
  29. while(more_data_to_prepare())
  30. {
  31. data_chunk const data=prepare_data();
  32. data_queue.push(data); ⇽---
  33. }
  34. }
  35. void data_processing_thread()
  36. {
  37. while(true)
  38. {
  39. data_chunk data;
  40. data_queue.wait_and_pop(data);
  41. process(data);
  42. if(is_last_chunk(data))
  43. break;
  44. }
  45. }

我们可以启动三个线程,一个producer线程用来向队列中放入数据。一个consumer1线程用来阻塞等待pop队列中的元素。

另一个consumer2尝试从队列中pop元素,如果队列为空则直接返回,如果非空则pop元素。

打印时为了保证线程输出在屏幕上不会乱掉,所以加了锁保证互斥输出

测试代码如下

  1. void test_safe_que() {
  2. threadsafe_queue<int> safe_que;
  3. std::mutex mtx_print;
  4. std::thread producer(
  5. [&]() {
  6. for (int i = 0; ;i++) {
  7. safe_que.push(i);
  8. {
  9. std::lock_guard<std::mutex> printlk(mtx_print);
  10. std::cout << "producer push data is " << i << std::endl;
  11. }
  12. std::this_thread::sleep_for(std::chrono::milliseconds(200));
  13. }
  14. }
  15. );
  16. std::thread consumer1(
  17. [&]() {
  18. for (;;) {
  19. auto data = safe_que.wait_and_pop();
  20. {
  21. std::lock_guard<std::mutex> printlk(mtx_print);
  22. std::cout << "consumer1 wait and pop data is " << *data << std::endl;
  23. }
  24. std::this_thread::sleep_for(std::chrono::milliseconds(500));
  25. }
  26. }
  27. );
  28. std::thread consumer2(
  29. [&]() {
  30. for (;;) {
  31. auto data = safe_que.try_pop();
  32. if (data != nullptr) {
  33. {
  34. std::lock_guard<std::mutex> printlk(mtx_print);
  35. std::cout << "consumer2 try_pop data is " << *data << std::endl;
  36. }
  37. }
  38. std::this_thread::sleep_for(std::chrono::milliseconds(500));
  39. }
  40. }
  41. );
  42. producer.join();
  43. consumer1.join();
  44. consumer2.join();
  45. }

测试效果如下

  1. producer push data is 0
  2. consumer1 wait and pop data is 0
  3. producer push data is 1
  4. producer push data is 2
  5. consumer2 try_pop data is 1
  6. consumer1 wait and pop data is 2
  7. producer push data is 3
  8. producer push data is 4
  9. consumer2 try_pop data is 3
  10. consumer1 wait and pop data is 4
  11. producer push data is 5
  12. producer push data is 6
  13. producer push data is 7
  14. consumer2 try_pop data is 5
  15. consumer1 wait and pop data is 6

我们能看到consumer1和consumer2是并发消费的

总结

本文介绍了如何通过条件变量实现并发线程的同步处理。

视频链接

https://space.bilibili.com/271469206/channel/collectiondetail?sid=1623290

源码链接

https://gitee.com/secondtonone1/boostasio-learn

热门评论

热门文章

  1. Linux环境搭建和编码

    喜欢(594) 浏览(13540)
  2. Qt环境搭建

    喜欢(517) 浏览(24684)
  3. vscode搭建windows C++开发环境

    喜欢(596) 浏览(83344)
  4. 聊天项目(28) 分布式服务通知好友申请

    喜欢(507) 浏览(6093)
  5. 使用hexo搭建个人博客

    喜欢(533) 浏览(11888)

最新评论

  1. boost::asio之socket的创建和连接 项空月:发现一些错别字 :每隔vector存储  是不是是每个. asio::mutable_buffers_1 o或者    是不是多打了个o
  2. 聊天项目(15) 客户端实现TCP管理者 lkx:已经在&QTcpSocket::readyRead 回调函数中做了处理了的。
  3. C++ 并发三剑客future, promise和async Yunfei:大佬您好,如果这个线程池中加入的异步任务的形参如果有右值引用,这个commit中的返回类型推导和bind绑定就会出现问题,请问实际工程中,是不是不会用到这种任务,如果用到了,应该怎么解决?
  4. 无锁并发队列 TenThousandOne:_head  和 _tail  替换为原子变量。那里pop的逻辑,val = _data[h] 可以移到循环外面吗
  5. Qt 对话框 Spade2077:QDialog w(); //这里是不是不需要带括号
  6. 类和对象 陈宇航:支持!!!!
  7. 利用栅栏实现同步 Dzher:作者你好!我觉得 std::thread a(write_x); std::thread b(write_y); std::thread c(read_x_then_y); std::thread d(read_y_then_x); 这个例子中的assert fail并不会发生,原子变量设定了非relaxed内存序后一个线程的原子变量被写入,那么之后的读取一定会被同步的,c和d线程中只可能同时发生一个z++未执行的情况,最终z不是1就是2了,我测试了很多次都没有assert,请问我这个观点有什么错误,谢谢!
  8. protobuf配置和使用 熊二:你可以把dll放到系统目录,也可以配置环境变量,还能把dll丢到lib里
  9. 面试题汇总(一) secondtonone1:看到网络上经常提问的go的问题,做了一下汇总,结合自己的经验给出的答案,如有纰漏,望指正批评。
  10. 解决博客回复区被脚本注入的问题 secondtonone1:走到现在我忽然明白一个道理,无论工作也好生活也罢,最重要的是开心,即使一份安稳的工作不能给我带来事业上的积累也要合理的舍弃,所以我还是想去做喜欢的方向。
  11. 网络编程学习方法和图书推荐 Corleone:啥程度可以找工作
  12. 再谈单例模式 secondtonone1:是的,C++11以后返回局部static变量对象能保证线程安全了。
  13. visual studio配置boost库 一giao里我离giaogiao:请问是修改成这样吗:.\b2.exe toolset=MinGW
  14. 处理网络粘包问题 zyouth: //消息的长度小于头部规定的长度,说明数据未收全,则先将部分消息放到接收节点里 if (bytes_transferred < data_len) { memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, bytes_transferred); _recv_msg_node->_cur_len += bytes_transferred; ::memset(_data, 0, MAX_LENGTH); _socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH), std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_self)); //头部处理完成 _b_head_parse = true; return; } 把_b_head_parse = true;放在_socket.async_read_some前面是不是更好
  15. 聊天项目(9) redis服务搭建 pro_lin:redis线程池的析构函数,除了pop出队列,还要free掉redis连接把
  16. interface应用 secondtonone1:interface是万能类型,但是使用时要转换为实际类型来使用。interface丰富了go的多态特性,也降低了传统面向对象语言的耦合性。
  17. Qt MVC结构之QItemDelegate介绍 胡歌-此生不换:gpt, google
  18. 聊天项目(7) visualstudio配置grpc diablorrr:cmake文件得改一下 find_package(Boost REQUIRED COMPONENTS system filesystem),要加上filesystem。在target_link_libraries中也同样加上
  19. string类 WangQi888888:确实错了,应该是!isspace(sind[index]). 否则不进入循环,还是原来的字符串“some string”
  20. 堆排序 secondtonone1:堆排序非常实用,定时器就是这个原理制作的。
  21. 答疑汇总(thread,async源码分析) Yagus:如果引用计数为0,则会执行 future 的析构进而等待任务执行完成,那么看到的输出将是 这边应该不对吧,std::future析构只在这三种情况都满足的时候才回block: 1.共享状态是std::async 创造的(类型是_Task_async_state) 2.共享状态没有ready 3.这个future是共享状态的最后一个引用 这边共享状态类型是“_Package_state”,引用计数即使为0也不应该block啊
  22. 创建项目和编译 secondtonone1:谢谢支持
  23. 构造函数 secondtonone1:构造函数是类的基础知识,要着重掌握

个人公众号

个人微信