boost::asio协程实现并发服务器

简介

之前介绍了asio服务器并发编程的几种模型,包括单线程,多线程IOServicePool,多线程IOThreadPool等,今天带着大家利用asio协程实现并发服务器。利用协程实现并发程序有两个好处
1   将回调函数改写为顺序调用,提高开发效率。
2   协程调度比线程调度更轻量化,因为协程是运行在用户空间的,线程切换需要在用户空间和内核空间切换。

协程案例

asio官网提供了一个协程并发编程的案例,我们列举一下

  1. #include <boost/asio/co_spawn.hpp>
  2. #include <boost/asio/detached.hpp>
  3. #include <boost/asio/io_context.hpp>
  4. #include <boost/asio/ip/tcp.hpp>
  5. #include <boost/asio/signal_set.hpp>
  6. #include <boost/asio/write.hpp>
  7. #include <cstdio>
  8. using boost::asio::ip::tcp;
  9. using boost::asio::awaitable;
  10. using boost::asio::co_spawn;
  11. using boost::asio::detached;
  12. using boost::asio::use_awaitable;
  13. namespace this_coro = boost::asio::this_coro;
  14. #if defined(BOOST_ASIO_ENABLE_HANDLER_TRACKING)
  15. # define use_awaitable \
  16. boost::asio::use_awaitable_t(__FILE__, __LINE__, __PRETTY_FUNCTION__)
  17. #endif
  18. awaitable<void> echo(tcp::socket socket)
  19. {
  20. try
  21. {
  22. char data[1024];
  23. for (;;)
  24. {
  25. std::size_t n = co_await socket.async_read_some(boost::asio::buffer(data), use_awaitable);
  26. co_await async_write(socket, boost::asio::buffer(data, n), use_awaitable);
  27. }
  28. }
  29. catch (std::exception& e)
  30. {
  31. std::printf("echo Exception: %s\n", e.what());
  32. }
  33. }
  34. awaitable<void> listener()
  35. {
  36. auto executor = co_await this_coro::executor;
  37. tcp::acceptor acceptor(executor, { tcp::v4(), 10086 });
  38. for (;;)
  39. {
  40. tcp::socket socket = co_await acceptor.async_accept(use_awaitable);
  41. co_spawn(executor, echo(std::move(socket)), detached);
  42. }
  43. }
  44. int main()
  45. {
  46. try
  47. {
  48. boost::asio::io_context io_context(1);
  49. boost::asio::signal_set signals(io_context, SIGINT, SIGTERM);
  50. signals.async_wait([&](auto, auto) { io_context.stop(); });
  51. co_spawn(io_context, listener(), detached);
  52. io_context.run();
  53. }
  54. catch (std::exception& e)
  55. {
  56. std::printf("Exception: %s\n", e.what());
  57. }
  58. }

1   我们用awaitable<void>声明了一个函数,那么这个函数就变为可等待的函数了,比如listener被添加awaitable<void>之后,就可以被协程调用和等待了。
2   co_spawn表示启动一个协程,参数分别为调度器,执行的函数,以及启动方式, 比如我们启动了一个协程,deatched表示将协程对象分离出来,这种启动方式可以启动多个协程,他们都是独立的,如何调度取决于调度器,在用户的感知上更像是线程调度的模式,类似于并发运行,其实底层都是串行的。

  1. co_spawn(io_context, listener(), detached);

我们启动了一个协程,执行listener中的逻辑,listener内部co_await 等待 acceptor接收连接,如果没有连接到来则挂起协程。执行之后的io_context.run()逻辑。所以协程实际上是在一个线程中串行调度的,只是感知上像是并发而已。
3   当acceptor接收到连接后,继续调用co_spawn启动一个协程,用来执行echo逻辑。echo逻辑里也是通过co_wait的方式接收和发送数据的,如果对端不发数据,执行echo的协程就会挂起,另一个协程启动,继续接收新的连接。当没有连接到来,接收新连接的协程挂起,如果所有协程都挂起,则等待新的就绪事件(对端发数据,或者新连接)到来唤醒。

改进服务器

我们可以利用协程改进服务器编码流程,用一个iocontext管理绑定acceptor用来接收新的连接,再用一个iocontext或以IOServicePool的方式管理连接的收发操作,在每个连接的接收数据时改为启动一个协程,通过顺序的方式读取收到的数据

  1. void CSession::Start() {
  2. auto shared_this = shared_from_this();
  3. //开启接收协程
  4. co_spawn(_io_context, [=]()->awaitable<void> {
  5. try {
  6. for (;!_b_close;) {
  7. _recv_head_node->Clear();
  8. std::size_t n = co_await boost::asio::async_read(_socket,
  9. boost::asio::buffer(_recv_head_node->_data, HEAD_TOTAL_LEN),
  10. use_awaitable);
  11. if (n == 0) {
  12. std::cout << "receive peer closed" << endl;
  13. Close();
  14. _server->ClearSession(_uuid);
  15. co_return;
  16. }
  17. //获取头部MSGID数据
  18. short msg_id = 0;
  19. memcpy(&msg_id, _recv_head_node->_data, HEAD_ID_LEN);
  20. //网络字节序转化为本地字节序
  21. msg_id = boost::asio::detail::socket_ops::network_to_host_short(msg_id);
  22. std::cout << "msg_id is " << msg_id << endl;
  23. //id非法
  24. if (msg_id > MAX_LENGTH) {
  25. std::cout << "invalid msg_id is " << msg_id << endl;
  26. _server->ClearSession(_uuid);
  27. co_return;
  28. }
  29. short msg_len = 0;
  30. memcpy(&msg_len, _recv_head_node->_data + HEAD_ID_LEN, HEAD_DATA_LEN);
  31. //网络字节序转化为本地字节序
  32. msg_len = boost::asio::detail::socket_ops::network_to_host_short(msg_len);
  33. std::cout << "msg_len is " << msg_len << endl;
  34. //长度非法
  35. if (msg_len > MAX_LENGTH) {
  36. std::cout << "invalid data length is " << msg_len << endl;
  37. _server->ClearSession(_uuid);
  38. co_return;
  39. }
  40. _recv_msg_node = make_shared<RecvNode>(msg_len, msg_id);
  41. //读出包体
  42. n = co_await boost::asio::async_read(_socket,
  43. boost::asio::buffer(_recv_msg_node->_data, _recv_msg_node->_total_len), use_awaitable);
  44. if (n == 0) {
  45. std::cout << "receive peer closed" << endl;
  46. Close();
  47. _server->ClearSession(_uuid);
  48. co_return;
  49. }
  50. _recv_msg_node->_data[_recv_msg_node->_total_len] = '\0';
  51. cout << "receive data is " << _recv_msg_node->_data << endl;
  52. //投递给逻辑线程
  53. LogicSystem::GetInstance().PostMsgToQue(make_shared<LogicNode>(shared_from_this(), _recv_msg_node));
  54. }
  55. }
  56. catch (std::exception& e) {
  57. std::cout << "exception is " << e.what() << endl;
  58. Close();
  59. _server->ClearSession(_uuid);
  60. }
  61. }, detached);
  62. }

其余的逻辑和之前大体相同,测试了一下在一个iocontext负责接收新连接,一个iocontext负责接收数据和发送数据的情况下,客户端创建100个连接,收发500次,总用时为55s
https://cdn.llfc.club/20230616141235.png

热门评论

热门文章

  1. windows环境搭建和vscode配置

    喜欢(587) 浏览(2800)
  2. Linux环境搭建和编码

    喜欢(594) 浏览(12130)
  3. 解密定时器的实现细节

    喜欢(566) 浏览(3464)
  4. slice介绍和使用

    喜欢(521) 浏览(2478)
  5. C++ 类的继承封装和多态

    喜欢(588) 浏览(4944)

最新评论

  1. 聊天项目(9) redis服务搭建 pro_lin:redis线程池的析构函数,除了pop出队列,还要free掉redis连接把
  2. 答疑汇总(thread,async源码分析) Yagus:如果引用计数为0,则会执行 future 的析构进而等待任务执行完成,那么看到的输出将是 这边应该不对吧,std::future析构只在这三种情况都满足的时候才回block: 1.共享状态是std::async 创造的(类型是_Task_async_state) 2.共享状态没有ready 3.这个future是共享状态的最后一个引用 这边共享状态类型是“_Package_state”,引用计数即使为0也不应该block啊
  3. C++ 并发三剑客future, promise和async Yunfei:大佬您好,如果这个线程池中加入的异步任务的形参如果有右值引用,这个commit中的返回类型推导和bind绑定就会出现问题,请问实际工程中,是不是不会用到这种任务,如果用到了,应该怎么解决?
  4. Qt MVC结构之QItemDelegate介绍 胡歌-此生不换:gpt, google

个人公众号

个人微信