asio多线程模型IOServicePool

简介

前面的设计,我们对asio的使用都是单线程模式,为了提升网络io并发处理的效率,这一次我们设计多线程模式下asio的使用方式。总体来说asio有两个多线程模型,第一个是启动多个线程,每个线程管理一个iocontext。第二种是只启动一个iocontext,被多个线程共享,后面的文章会对比两个模式的区别,这里先介绍第一种模式,多个线程,每个线程管理独立的iocontext服务。

单线程和多线程对比

之前的单线程模式图如下

https://cdn.llfc.club/1685620269385.jpg

我们设计的IOServicePool类型的多线程模型如下:

https://cdn.llfc.club/%E5%BE%AE%E4%BF%A1%E5%9B%BE%E7%89%87_20230604151126.png

IOServicePool多线程模式特点

1   每一个io_context跑在不同的线程里,所以同一个socket会被注册在同一个io_context里,它的回调函数也会被单独的一个线程回调,那么对于同一个socket,他的回调函数每次触发都是在同一个线程里,就不会有线程安全问题,网络io层面上的并发是线程安全的。

2   但是对于不同的socket,回调函数的触发可能是同一个线程(两个socket被分配到同一个io_context),也可能不是同一个线程(两个socket被分配到不同的io_context里)。所以如果两个socket对应的上层逻辑处理,如果有交互或者访问共享区,会存在线程安全问题。比如socket1代表玩家1,socket2代表玩家2,玩家1和玩家2在逻辑层存在交互,比如两个玩家都在做工会任务,他们属于同一个工会,工会积分的增加就是共享区的数据,需要保证线程安全。可以通过加锁或者逻辑队列的方式解决安全问题,我们目前采取了后者。

3   多线程相比单线程,极大的提高了并发能力,因为单线程仅有一个io_context服务用来监听读写事件,就绪后回调函数在一个线程里串行调用, 如果一个回调函数的调用时间较长肯定会影响后续的函数调用,毕竟是穿行调用。而采用多线程方式,可以在一定程度上减少前一个逻辑调用影响下一个调用的情况,比如两个socket被部署到不同的iocontext上,但是当两个socket部署到同一个iocontext上时仍然存在调用时间影响的问题。不过我们已经通过逻辑队列的方式将网络线程和逻辑线程解耦合了,不会出现前一个调用时间影响下一个回调触发的问题。

IOServicePool实现

IOServicePool本质上是一个线程池,基本功能就是根据构造函数传入的数量创建n个线程和iocontext,然后每个线程跑一个iocontext,这样就可以并发处理不同iocontext读写事件了。

IOServicePool的声明

  1. class AsioIOServicePool:public Singleton<AsioIOServicePool>
  2. {
  3. friend Singleton<AsioIOServicePool>;
  4. public:
  5. using IOService = boost::asio::io_context;
  6. using Work = boost::asio::io_context::work;
  7. using WorkPtr = std::unique_ptr<Work>;
  8. ~AsioIOServicePool();
  9. AsioIOServicePool(const AsioIOServicePool&) = delete;
  10. AsioIOServicePool& operator=(const AsioIOServicePool&) = delete;
  11. // 使用 round-robin 的方式返回一个 io_service
  12. boost::asio::io_context& GetIOService();
  13. void Stop();
  14. private:
  15. AsioIOServicePool(std::size_t size = std::thread::hardware_concurrency());
  16. std::vector<IOService> _ioServices;
  17. std::vector<WorkPtr> _works;
  18. std::vector<std::thread> _threads;
  19. std::size_t _nextIOService;
  20. };

1   _ioServices是一个IOService的vector变量,用来存储初始化的多个IOService。

2   WorkPtrboost::asio::io_context::work类型的unique指针。
在实际使用中,我们通常会将一些异步操作提交给io_context进行处理,然后该操作会被异步执行,而不会立即返回结果。如果没有其他任务需要执行,那么io_context就会停止工作,导致所有正在进行的异步操作都被取消。这时,我们需要使用boost::asio::io_context::work对象来防止io_context停止工作。

boost::asio::io_context::work的作用是持有一个指向io_context的引用,并通过创建一个“工作”项来保证io_context不会停止工作,直到work对象被销毁或者调用reset()方法为止。当所有异步操作完成后,程序可以使用work.reset()方法来释放io_context,从而让其正常退出。

3   _threads是一个线程vector,管理我们开辟的所有线程。

4   _nextIOService是一个轮询索引,我们用最简单的轮询算法为每个新创建的连接分配io_context.

5   因为IOServicePool不允许被copy构造,所以我们将其拷贝构造和拷贝复制函数置为delete

接下来我们实现构造函数

  1. AsioIOServicePool::AsioIOServicePool(std::size_t size):_ioServices(size),
  2. _works(size), _nextIOService(0){
  3. for (std::size_t i = 0; i < size; ++i) {
  4. _works[i] = std::unique_ptr<Work>(new Work(_ioServices[i]));
  5. }
  6. //遍历多个ioservice,创建多个线程,每个线程内部启动ioservice
  7. for (std::size_t i = 0; i < _ioServices.size(); ++i) {
  8. _threads.emplace_back([this, i]() {
  9. _ioServices[i].run();
  10. });
  11. }
  12. }

_works是unique_ptr的vector类型,所以初始化时要么放在构造函数初始化列表里初始化,要么通过一个临时的std::unique_ptr右值初始化,我们采取的是第二种。

实现获取io_context&的函数

  1. boost::asio::io_context& AsioIOServicePool::GetIOService() {
  2. auto& service = _ioServices[_nextIOService++];
  3. if (_nextIOService == _ioServices.size()) {
  4. _nextIOService = 0;
  5. }
  6. return service;
  7. }

我们根据_nextIOService作为索引,轮询获取io_context&

同样我们要实现Stop函数,控制AsioIOServicePool停止的行为。因为我们要保证每个线程安全退出后再让AsioIOServicePool停止。

  1. void AsioIOServicePool::Stop(){
  2. for (auto& work : _works) {
  3. work.reset();
  4. }
  5. for (auto& t : _threads) {
  6. t.join();
  7. }
  8. }

其中work.reset()是让unique指针置空并释放,那么work的析构函数就会被调用,work被析构,其管理的io_service在没有事件监听时就会被释放。

优雅退出

IOServicePool多线程服务器退出时,需要捕获退出信号如SIGINT,SIGTERM等,将退出信号和一个iocontext绑定,当收到退出信号时,我们将IOServicePool停止,并且停止iocontext即可。

  1. int main()
  2. {
  3. try {
  4. auto pool = AsioIOServicePool::GetInstance();
  5. boost::asio::io_context io_context;
  6. boost::asio::signal_set signals(io_context, SIGINT, SIGTERM);
  7. signals.async_wait([&io_context,pool](auto, auto) {
  8. io_context.stop();
  9. pool->Stop();
  10. });
  11. CServer s(io_context, 10086);
  12. io_context.run();
  13. }
  14. catch (std::exception& e) {
  15. std::cerr << "Exception: " << e.what() << endl;
  16. }
  17. }

上例中,我们将一个iocontext绑定给Server,当收到退出消息后停止这个iocontext,并且停止IOServicePool。

总结

本文总结了如何使用IOServicePool模式构造多线程模型的asio服务器。

视频连接https://space.bilibili.com/271469206/channel/collectiondetail?sid=313101

源码链接https://gitee.com/secondtonone1/boostasio-learn

热门评论
  • 我和小徐不掉头发
    2025-02-20 15:13:30
      boost::asio::io_context& AsioIOServicePool::GetIOService() { auto& service = _ioServices[_nextIOService++]; if (_nextIOService == _ioServices.size()) { _nextIOService = 0; } return service;}

      这个是不是要加个锁会更好?

  • Lion
    2024-06-26 11:01:01

    线程池一定要继承单例模式吗

热门文章

  1. 解密定时器的实现细节

    喜欢(566) 浏览(3504)
  2. slice介绍和使用

    喜欢(521) 浏览(2497)
  3. C++ 类的继承封装和多态

    喜欢(588) 浏览(5023)
  4. windows环境搭建和vscode配置

    喜欢(587) 浏览(2848)
  5. Linux环境搭建和编码

    喜欢(594) 浏览(12369)

最新评论

  1. C++ 并发三剑客future, promise和async Yunfei:大佬您好,如果这个线程池中加入的异步任务的形参如果有右值引用,这个commit中的返回类型推导和bind绑定就会出现问题,请问实际工程中,是不是不会用到这种任务,如果用到了,应该怎么解决?
  2. Qt MVC结构之QItemDelegate介绍 胡歌-此生不换:gpt, google
  3. 聊天项目(9) redis服务搭建 pro_lin:redis线程池的析构函数,除了pop出队列,还要free掉redis连接把
  4. 答疑汇总(thread,async源码分析) Yagus:如果引用计数为0,则会执行 future 的析构进而等待任务执行完成,那么看到的输出将是 这边应该不对吧,std::future析构只在这三种情况都满足的时候才回block: 1.共享状态是std::async 创造的(类型是_Task_async_state) 2.共享状态没有ready 3.这个future是共享状态的最后一个引用 这边共享状态类型是“_Package_state”,引用计数即使为0也不应该block啊

个人公众号

个人微信