利用栅栏实现同步

简介

前文我们通过原子操作实战实现了无锁队列,今天完善一下无锁的原子操作剩余的知识,包括Relaese和Acquire内存序在什么情况下是存在危险的,以及我们可以利用栅栏机制实现同步等等。

线程可见顺序

我们提到过除了memory_order_seq_cst顺序,其他的顺序都不能保证原子变量修改的值在其他多线程中看到的顺序是一致的。

但是可以通过同步机制保证一个线程对原子变量的修改对另一个原子变量可见。通过“Syncronizes With” 的方式达到先行的效果。

但是我们说的先行是指 “A Syncronizes With B ”, 如果A 的结果被B读取,则A 先行于B。

有时候我们线程1对A的store操作采用release内存序,而线程2对B的load采用acquire内存序,并不能保证A 一定比 B先执行。因为两个线程并行执行无法确定先后顺序,我们指的先行不过是说如果B读取了A操作的结果,则称A先行于B。

我们看下面的一段案例

  1. #include <iostream>
  2. #include <atomic>
  3. #include <thread>
  4. #include <cassert>
  5. std::atomic<bool> x, y;
  6. std::atomic<int> z;
  7. void write_x()
  8. {
  9. x.store(true, std::memory_order_release); //1
  10. }
  11. void write_y()
  12. {
  13. y.store(true, std::memory_order_release); //2
  14. }
  15. void read_x_then_y()
  16. {
  17. while (!x.load(std::memory_order_acquire));
  18. if (y.load(std::memory_order_acquire)) //3
  19. ++z;
  20. }
  21. void read_y_then_x()
  22. {
  23. while (!y.load(std::memory_order_acquire));
  24. if (x.load(std::memory_order_acquire)) //4
  25. ++z;
  26. }

我们写一个函数测试,函数TestAR中初始化x和y为false, 启动4个线程a,b,c,d,分别执行write_x, write_y, read_x_then_y, read_y_then_x.

  1. void TestAR()
  2. {
  3. x = false;
  4. y = false;
  5. z = 0;
  6. std::thread a(write_x);
  7. std::thread b(write_y);
  8. std::thread c(read_x_then_y);
  9. std::thread d(read_y_then_x);
  10. a.join();
  11. b.join();
  12. c.join();
  13. d.join();
  14. assert(z.load() != 0); //5
  15. std::cout << "z value is " << z.load() << std::endl;
  16. }

有的读者可能会觉5处的断言不会被触发,他们认为c和d肯定会有一个线程对z执行++操作。他们的思路是这样的。
1 如果c线程执行read_x_then_y没有对z执行加加操作,那么说明c线程读取的x值为true, y值为false。
2 之后d线程读取时,如果保证执行到4处说明y为true,等d线程执行4处代码时x必然为true。
3 他们的理解是如果x先被store为true,y后被store为true,c线程看到y为false时x已经为true了,那么d线程y为true时x也早就为true了,所以z一定会执行加加操作。

上述理解是不正确的,我们提到过即便是releas和acquire顺序也不能保证多个线程看到的一个变量的值是一致的,更不能保证看到的多个变量的值是一致的。

变量x和y的载入操作3和4有可能都读取false值(与宽松次序的情况一样),因此有可能令断言触发错误。变量x和y分别由不同线程写出,所以两个释放操作都不会影响到对方线程。

看下图

https://cdn.llfc.club/1699748355232.jpg

无论x和y的store顺序谁先谁后,线程c和线程d读取的x和y顺序都不一定一致。

从CPU的角度我们可以这么理解

https://cdn.llfc.club/1699750613188.jpg

在一个4核CPU结构的主机上,a,b,c,d分别运行在不同的CPU内核上。

a执行x.store(true)先被线程c读取,而此时线程b对y的store还没有被c读取到新的值,所以此时c读取的x为true,y为false。

同样的道理,d可以读取b修改y的最新值,但是没来的及读取x的最新值,那么读取到y为true,x为false。

即使我们采用release和acquire方式也不能保证全局顺序一致。如果一个线程对变量执行release内存序的store操作,另一个线程不一定会马上读取到。这个大家要理解。

栅栏

有时候我们可以通过栅栏保证指令编排顺序。

看下面一段代码

  1. #include <atomic>
  2. #include <thread>
  3. #include <assert.h>
  4. std::atomic<bool> x,y;
  5. std::atomic<int> z;
  6. void write_x_then_y()
  7. {
  8. x.store(true,std::memory_order_relaxed); // 1
  9. y.store(true,std::memory_order_relaxed); // 2
  10. }
  11. void read_y_then_x()
  12. {
  13. while(!y.load(std::memory_order_relaxed)); // 3
  14. if(x.load(std::memory_order_relaxed)) // 4
  15. ++z;
  16. }
  17. int main()
  18. {
  19. x=false;
  20. y=false;
  21. z=0;
  22. std::thread a(write_x_then_y);
  23. std::thread b(read_y_then_x);
  24. a.join();
  25. b.join();
  26. assert(z.load()!=0); //5
  27. }

上面的代码我们都采用的是memory_order_relaxed, 所以无法保证a线程将x,y修改后b线程看到的也是先修改x,再修改y的值。b线程可能先看到y被修改为true,x后被修改为true,那么b线程执行到4处时x可能为false导致z不会加加,5处断言会被触发。

那我们之前做法可以解决这个问题

  1. void write_x_then_y3()
  2. {
  3. x.store(true, std::memory_order_relaxed); // 1
  4. y.store(true, std::memory_order_release); // 2
  5. }
  6. void read_y_then_x3()
  7. {
  8. while (!y.load(std::memory_order_acquire)); // 3
  9. if (x.load(std::memory_order_relaxed)) // 4
  10. ++z;
  11. }

可以通过std::memory_order_releasestd::memory_order_acquire形成同步关系。

线程a执行write_x_then_y3,线程b执行read_y_then_x3,如果线程b执行到4处,说明y已经被线程a设置为true。

线程a执行到2,也必然执行了1,因为是memory_order_release的内存顺序,所以线程a能2操作之前的指令在2之前被写入内存。

同样的道理,线程b在3处执行的是memory_order_acquire的内存顺序,所以能保证4不会先于3写入内存,这样我们能知道1一定先行于4.

进而推断出z会加加,所以不会触发assert(z.load() != 0);的断言。

其实我们可以通过栅栏机制保证指令的写入顺序。栅栏的机制和memory_order_release类似。

  1. void write_x_then_y_fence()
  2. {
  3. x.store(true, std::memory_order_relaxed); //1
  4. std::atomic_thread_fence(std::memory_order_release); //2
  5. y.store(true, std::memory_order_relaxed); //3
  6. }
  7. void read_y_then_x_fence()
  8. {
  9. while (!y.load(std::memory_order_relaxed)); //4
  10. std::atomic_thread_fence(std::memory_order_acquire); //5
  11. if (x.load(std::memory_order_relaxed)) //6
  12. ++z;
  13. }

我们写一个函数测试上面的逻辑

  1. void TestFence()
  2. {
  3. x = false;
  4. y = false;
  5. z = 0;
  6. std::thread a(write_x_then_y_fence);
  7. std::thread b(read_y_then_x_fence);
  8. a.join();
  9. b.join();
  10. assert(z.load() != 0); //7
  11. }

7处的断言也不会触发。我们可以分析一下,

线程a运行write_x_then_y_fence,线程b运行read_y_then_x_fence.

当线程b执行到5处时说明4已经结束,此时线程a看到y为true,那么线程a必然已经执行完3.

尽管4和3我们采用的是std::memory_order_relaxed顺序,但是通过逻辑关系保证了3的结果同步给4,进而”3 happens-before 4”

因为我们采用了栅栏std::atomic_fence所以,5处能保证6不会先于5写入内存,(memory_order_acquire保证其后的指令不会先于其写入内存)

2处能保证1处的指令先于2写入内存,进而”1 happens-before 6”, 1的结果会同步给 6

所以”atomic_thread_fence”其实和”release-acquire”相似,都是保证memory_order_release之前的指令不会排到其后,memory_order_acquire之后的指令不会排到其之前。

总结

视频链接

https://space.bilibili.com/271469206/channel/collectiondetail?sid=1623290

源码链接

https://gitee.com/secondtonone1/boostasio-learn/tree/master/concurrent/day13-fence

热门评论
  • Dzher
    2024-12-11 14:48:03

    作者你好!我觉得

    1. std::thread a(write_x);
    2. std::thread b(write_y);
    3. std::thread c(read_x_then_y);
    4. std::thread d(read_y_then_x);
    5. 这个例子中的assert fail并不会发生,原子变量设定了非relaxed内存序后一个线程的原子变量被写入,那么之后的读取一定会被同步的,c和d线程中只可能同时发生一个z++未执行的情况,最终z不是1就是2了,我测试了很多次都没有assert,请问我这个观点有什么错误,谢谢!

热门文章

  1. Qt环境搭建

    喜欢(517) 浏览(24686)
  2. vscode搭建windows C++开发环境

    喜欢(596) 浏览(83351)
  3. 聊天项目(28) 分布式服务通知好友申请

    喜欢(507) 浏览(6093)
  4. 使用hexo搭建个人博客

    喜欢(533) 浏览(11891)
  5. Linux环境搭建和编码

    喜欢(594) 浏览(13541)

最新评论

  1. C++ 并发三剑客future, promise和async Yunfei:大佬您好,如果这个线程池中加入的异步任务的形参如果有右值引用,这个commit中的返回类型推导和bind绑定就会出现问题,请问实际工程中,是不是不会用到这种任务,如果用到了,应该怎么解决?
  2. 无锁并发队列 TenThousandOne:_head  和 _tail  替换为原子变量。那里pop的逻辑,val = _data[h] 可以移到循环外面吗
  3. 堆排序 secondtonone1:堆排序非常实用,定时器就是这个原理制作的。
  4. 网络编程学习方法和图书推荐 Corleone:啥程度可以找工作
  5. 答疑汇总(thread,async源码分析) Yagus:如果引用计数为0,则会执行 future 的析构进而等待任务执行完成,那么看到的输出将是 这边应该不对吧,std::future析构只在这三种情况都满足的时候才回block: 1.共享状态是std::async 创造的(类型是_Task_async_state) 2.共享状态没有ready 3.这个future是共享状态的最后一个引用 这边共享状态类型是“_Package_state”,引用计数即使为0也不应该block啊
  6. boost::asio之socket的创建和连接 项空月:发现一些错别字 :每隔vector存储  是不是是每个. asio::mutable_buffers_1 o或者    是不是多打了个o
  7. 再谈单例模式 secondtonone1:是的,C++11以后返回局部static变量对象能保证线程安全了。
  8. 类和对象 陈宇航:支持!!!!
  9. 聊天项目(9) redis服务搭建 pro_lin:redis线程池的析构函数,除了pop出队列,还要free掉redis连接把
  10. protobuf配置和使用 熊二:你可以把dll放到系统目录,也可以配置环境变量,还能把dll丢到lib里
  11. interface应用 secondtonone1:interface是万能类型,但是使用时要转换为实际类型来使用。interface丰富了go的多态特性,也降低了传统面向对象语言的耦合性。
  12. 聊天项目(15) 客户端实现TCP管理者 lkx:已经在&QTcpSocket::readyRead 回调函数中做了处理了的。
  13. 解决博客回复区被脚本注入的问题 secondtonone1:走到现在我忽然明白一个道理,无论工作也好生活也罢,最重要的是开心,即使一份安稳的工作不能给我带来事业上的积累也要合理的舍弃,所以我还是想去做喜欢的方向。
  14. string类 WangQi888888:确实错了,应该是!isspace(sind[index]). 否则不进入循环,还是原来的字符串“some string”
  15. visual studio配置boost库 一giao里我离giaogiao:请问是修改成这样吗:.\b2.exe toolset=MinGW
  16. Qt MVC结构之QItemDelegate介绍 胡歌-此生不换:gpt, google
  17. 利用栅栏实现同步 Dzher:作者你好!我觉得 std::thread a(write_x); std::thread b(write_y); std::thread c(read_x_then_y); std::thread d(read_y_then_x); 这个例子中的assert fail并不会发生,原子变量设定了非relaxed内存序后一个线程的原子变量被写入,那么之后的读取一定会被同步的,c和d线程中只可能同时发生一个z++未执行的情况,最终z不是1就是2了,我测试了很多次都没有assert,请问我这个观点有什么错误,谢谢!
  18. 面试题汇总(一) secondtonone1:看到网络上经常提问的go的问题,做了一下汇总,结合自己的经验给出的答案,如有纰漏,望指正批评。
  19. 创建项目和编译 secondtonone1:谢谢支持
  20. Qt 对话框 Spade2077:QDialog w(); //这里是不是不需要带括号
  21. 处理网络粘包问题 zyouth: //消息的长度小于头部规定的长度,说明数据未收全,则先将部分消息放到接收节点里 if (bytes_transferred < data_len) { memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, bytes_transferred); _recv_msg_node->_cur_len += bytes_transferred; ::memset(_data, 0, MAX_LENGTH); _socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH), std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_self)); //头部处理完成 _b_head_parse = true; return; } 把_b_head_parse = true;放在_socket.async_read_some前面是不是更好
  22. 构造函数 secondtonone1:构造函数是类的基础知识,要着重掌握
  23. 聊天项目(7) visualstudio配置grpc diablorrr:cmake文件得改一下 find_package(Boost REQUIRED COMPONENTS system filesystem),要加上filesystem。在target_link_libraries中也同样加上

个人公众号

个人微信