asio异步读写操作及注意事项

简介

本文介绍boost asio的异步读写操作及注意事项,为保证知识便于读者吸收,仅介绍api使用的代码片段,下一节再编写完整的客户端和服务器程序。
所以我们定义一个session类,这个session类表示服务器处理客户端连接的管理类

  1. class Session {
  2. public:
  3. Session(std::shared_ptr<asio::ip::tcp::socket> socket);
  4. void Connect(const asio::ip::tcp::endpoint& ep);
  5. private:
  6. std::shared_ptr<asio::ip::tcp::socket> _socket;
  7. };

session类定义了一个socket成员变量,负责处理对端的连接读写,封装了Connect函数

  1. void Session::Connect(const asio::ip::tcp::endpoint &ep) {
  2. _socket->connect(ep);
  3. }

这里只是简单意思一下,下面核心介绍异步读写api的使用

异步写操作

在写操作前,我们先封装一个Node结构,用来管理要发送和接收的数据,该结构包含数据域首地址,数据的总长度,以及已经处理的长度(已读的长度或者已写的长度)

  1. //最大报文接收大小
  2. const int RECVSIZE = 1024;
  3. class MsgNode {
  4. public :
  5. MsgNode(const char* msg, int total_len): _total_len(total_len), _cur_len(0){
  6. _msg = new char[total_len];
  7. memcpy(_msg, msg, total_len);
  8. }
  9. MsgNode(int total_len) :_total_len(total_len), _cur_len(0) {
  10. _msg = new char[total_len];
  11. }
  12. ~MsgNode(){
  13. delete[]_msg;
  14. }
  15. //消息首地址
  16. char* _msg;
  17. //总长度
  18. int _total_len;
  19. //当前长度
  20. int _cur_len;
  21. };

写了两个构造函数,两个参数的负责构造写节点,一个参数的负责构造读节点。
接下来为Session添加异步写操作和负责发送写数据的节点

  1. class Session{
  2. public:
  3. void WriteCallBackErr(const boost::system::error_code & ec, std::size_t bytes_transferred,
  4. std::shared_ptr<MsgNode>);
  5. void WriteToSocketErr(const std::string& buf);
  6. private:
  7. std::shared_ptr<MsgNode> _send_node;
  8. };

WriteToSocketErr函数为我们封装的写操作,WriteCallBackErr为异步写操作回调的函数,为什么会有三个参数呢,
我们可以看一下asio源码

  1. BOOST_ASIO_COMPLETION_TOKEN_FOR(void (boost::system::error_code,
  2. std::size_t)) WriteToken
  3. BOOST_ASIO_DEFAULT_COMPLETION_TOKEN_TYPE(executor_type)>
  4. BOOST_ASIO_INITFN_AUTO_RESULT_TYPE_PREFIX(WriteToken,
  5. void (boost::system::error_code, std::size_t))
  6. async_write_some(const ConstBufferSequence& buffers,
  7. BOOST_ASIO_MOVE_ARG(WriteToken)token
  8. BOOST_ASIO_DEFAULT_COMPLETION_TOKEN(executor_type))

async_write_some是异步写的函数,这个异步写函数有两个参数,第一个参数为ConstBufferSequence常引用类型的buffers,
第二个参数为WriteToken类型,而WriteToken在上面定义了,是一个函数对象类型,返回值为void,参数为error_code和size_t,
所以我们为了调用async_write_some函数也要传入一个符合WriteToken定义的函数,就是我们声明的WriteCallBackErr函数,
前两个参数为WriteToken规定的参数,第三个参数为MsgNode的智能指针,这样通过智能指针保证我们发送的Node生命周期延长。
我们看一下WriteToSocketErr函数的具体实现

  1. void Session::WriteToSocketErr(const std::string& buf) {
  2. _send_node = make_shared<MsgNode>(buf.c_str(), buf.length());
  3. //异步发送数据,因为异步所以不会一下发送完
  4. this->_socket->async_write_some(asio::buffer(_send_node->_msg,
  5. _send_node->_total_len),
  6. std::bind(&Session::WriteCallBackErr,
  7. this, std::placeholders::_1, std::placeholders::_2, _send_node));
  8. }

因为WriteCallBackErr函数为三个参数且为成员函数,而async_write_some需要的回调函数为两个参数,所以我们通过bind将三个参数转换为两个参数的普通函数。
我们看看回调函数的实现

  1. void Session::WriteCallBackErr(const boost::system::error_code& ec,
  2. std::size_t bytes_transferred, std::shared_ptr<MsgNode> msg_node) {
  3. if (bytes_transferred + msg_node->_cur_len
  4. < msg_node->_total_len) {
  5. _send_node->_cur_len += bytes_transferred;
  6. this->_socket->async_write_some(asio::buffer(_send_node->_msg+_send_node->_cur_len,
  7. _send_node->_total_len-_send_node->_cur_len),
  8. std::bind(&Session::WriteCallBackErr,
  9. this, std::placeholders::_1, std::placeholders::_2, _send_node));
  10. }
  11. }

在WriteCallBackErr函数里判断如果已经发送的字节数没有达到要发送的总字节数,那么久更新节点已经发送的长度,然后计算剩余要发送的长度,如果有数据未发送完,再次调用async_write_some函数异步发送。
但是这个函数并不能投入实际应用,因为async_write_some回调函数返回已发送的字节数可能并不是全部长度。比如TCP发送缓存区总大小为8字节,但是有3字节未发送(上一次未发送完),这样剩余空间为5字节
https://cdn.llfc.club/1680692232796.jpg
此时我们调用async_write_some发送hello world!实际发送的长度就是为5,也就是只发送了hello,剩余world!通过我们的回调继续发送。
而实际开发的场景用户是不清楚底层tcp的多路复用调用情况的,用户想发送数据的时候就调用WriteToSocketErr,或者循环调用WriteToSocketErr,很可能在一次没发送完数据还未调用回调函数时再次调用WriteToSocketErr,因为boost::asio封装的时epoll和iocp等多路复用模型,当写事件就绪后就发数据,发送的数据按照async_write_some调用的顺序发送,所以回调函数内调用的async_write_some可能并没有被及时调用。
比如我们如下代码

  1. //用户发送数据
  2. WriteToSocketErr("Hello World!");
  3. //用户无感知下层调用情况又一次发送了数据
  4. WriteToSocketErr("Hello World!");

那么很可能第一次只发送了Hello,后面的数据没发完,第二次发送了Hello World!之后又发送了World!
所以对端收到的数据很可能是”HelloHello World! World!”
那怎么解决这个问题呢,我们可以通过队列保证应用层的发送顺序。我们在Session中定义一个发送队列,然后重新定义正确的异步发送函数和回调处理

  1. class Session{
  2. public:
  3. void WriteCallBack(const boost::system::error_code& ec, std::size_t bytes_transferred);
  4. void WriteToSocket(const std::string &buf);
  5. private:
  6. std::queue<std::shared_ptr<MsgNode>> _send_queue;
  7. std::shared_ptr<asio::ip::tcp::socket> _socket;
  8. bool _send_pending;
  9. };

定义了bool变量_send_pending,该变量为true表示一个节点还未发送完。
_send_queue用来缓存要发送的消息节点,是一个队列。
我们实现异步发送功能

  1. void Session::WriteToSocket(const std::string& buf){
  2. //插入发送队列
  3. _send_queue.emplace(new MsgNode(buf.c_str(), buf.length()));
  4. //pending状态说明上一次有未发送完的数据
  5. if (_send_pending) {
  6. return;
  7. }
  8. //异步发送数据,因为异步所以不会一下发送完
  9. this->_socket->async_write_some(asio::buffer(buf), std::bind(&Session::WriteCallBack, this, std::placeholders::_1, std::placeholders::_2));
  10. _send_pending = true;
  11. }
  12. void Session::WriteCallBack(const boost::system::error_code & ec, std::size_t bytes_transferred){
  13. if (ec.value() != 0) {
  14. std::cout << "Error , code is " << ec.value() << " . Message is " << ec.message();
  15. return;
  16. }
  17. //取出队首元素即当前未发送完数据
  18. auto & send_data = _send_queue.front();
  19. send_data->_cur_len += bytes_transferred;
  20. //数据未发送完, 则继续发送
  21. if (send_data->_cur_len < send_data->_total_len) {
  22. this->_socket->async_write_some(asio::buffer(send_data->_msg + send_data->_cur_len, send_data->_total_len-send_data->_cur_len),
  23. std::bind(&Session::WriteCallBack,
  24. this, std::placeholders::_1, std::placeholders::_2));
  25. return;
  26. }
  27. //如果发送完,则pop出队首元素
  28. _send_queue.pop();
  29. //如果队列为空,则说明所有数据都发送完,将pending设置为false
  30. if (_send_queue.empty()) {
  31. _send_pending = false;
  32. }
  33. //如果队列不是空,则继续将队首元素发送
  34. if (!_send_queue.empty()) {
  35. auto& send_data = _send_queue.front();
  36. this->_socket->async_write_some(asio::buffer(send_data->_msg + send_data->_cur_len, send_data->_total_len - send_data->_cur_len),
  37. std::bind(&Session::WriteCallBack,
  38. this, std::placeholders::_1, std::placeholders::_2));
  39. }
  40. }

async_write_some函数不能保证每次回调函数触发时发送的长度为要总长度,这样我们每次都要在回调函数判断发送数据是否完成,asio提供了一个更简单的发送函数async_send,这个函数在发送的长度未达到我们要求的长度时就不会触发回调,所以触发回调函数时要么时发送出错了要么是发送完成了,其内部的实现原理就是帮我们不断的调用async_write_some直到完成发送,所以async_send不能和async_write_some混合使用,我们基于async_send封装另外一个发送函数

  1. //不能与async_write_some混合使用
  2. void Session::WriteAllToSocket(const std::string& buf) {
  3. //插入发送队列
  4. _send_queue.emplace(new MsgNode(buf.c_str(), buf.length()));
  5. //pending状态说明上一次有未发送完的数据
  6. if (_send_pending) {
  7. return;
  8. }
  9. //异步发送数据,因为异步所以不会一下发送完
  10. this->_socket->async_send(asio::buffer(buf),
  11. std::bind(&Session::WriteAllCallBack, this,
  12. std::placeholders::_1, std::placeholders::_2));
  13. _send_pending = true;
  14. }
  15. void Session::WriteAllCallBack(const boost::system::error_code& ec, std::size_t bytes_transferred){
  16. if (ec.value() != 0) {
  17. std::cout << "Error occured! Error code = "
  18. << ec.value()
  19. << ". Message: " << ec.message();
  20. return;
  21. }
  22. //如果发送完,则pop出队首元素
  23. _send_queue.pop();
  24. //如果队列为空,则说明所有数据都发送完,将pending设置为false
  25. if (_send_queue.empty()) {
  26. _send_pending = false;
  27. }
  28. //如果队列不是空,则继续将队首元素发送
  29. if (!_send_queue.empty()) {
  30. auto& send_data = _send_queue.front();
  31. this->_socket->async_send(asio::buffer(send_data->_msg + send_data->_cur_len, send_data->_total_len - send_data->_cur_len),
  32. std::bind(&Session::WriteAllCallBack,
  33. this, std::placeholders::_1, std::placeholders::_2));
  34. }
  35. }

异步读操作

接下来介绍异步读操作,异步读操作和异步的写操作类似同样又async_read_some和async_receive函数,前者触发的回调函数获取的读数据的长度可能会小于要求读取的总长度,后者触发的回调函数读取的数据长度等于读取的总长度。
先基于async_read_some封装一个读取的函数ReadFromSocket,同样在Session类的声明中添加一些变量

  1. class Session {
  2. public:
  3. void ReadFromSocket();
  4. void ReadCallBack(const boost::system::error_code& ec, std::size_t bytes_transferred);
  5. private:
  6. std::shared_ptr<asio::ip::tcp::socket> _socket;
  7. std::shared_ptr<MsgNode> _recv_node;
  8. bool _recv_pending;
  9. };

_recv_node用来缓存接收的数据,_recv_pending为true表示节点正在接收数据,还未接受完。

  1. //不考虑粘包情况, 先用固定的字节接收
  2. void Session::ReadFromSocket() {
  3. if (_recv_pending) {
  4. return;
  5. }
  6. //可以调用构造函数直接构造,但不可用已经构造好的智能指针赋值
  7. /*auto _recv_nodez = std::make_unique<MsgNode>(RECVSIZE);
  8. _recv_node = _recv_nodez;*/
  9. _recv_node = std::make_shared<MsgNode>(RECVSIZE);
  10. _socket->async_read_some(asio::buffer(_recv_node->_msg, _recv_node->_total_len), std::bind(&Session::ReadCallBack, this,
  11. std::placeholders::_1, std::placeholders::_2));
  12. _recv_pending = true;
  13. }
  14. void Session::ReadCallBack(const boost::system::error_code& ec, std::size_t bytes_transferred){
  15. _recv_node->_cur_len += bytes_transferred;
  16. //没读完继续读
  17. if (_recv_node->_cur_len < _recv_node->_total_len) {
  18. _socket->async_read_some(asio::buffer(_recv_node->_msg+_recv_node->_cur_len,
  19. _recv_node->_total_len - _recv_node->_cur_len), std::bind(&Session::ReadCallBack, this,
  20. std::placeholders::_1, std::placeholders::_2));
  21. return;
  22. }
  23. //将数据投递到队列里交给逻辑线程处理,此处略去
  24. //如果读完了则将标记置为false
  25. _recv_pending = false;
  26. //指针置空
  27. _recv_node = nullptr;
  28. }

我们基于async_receive再封装一个接收数据的函数

  1. void Session::ReadAllFromSocket(const std::string& buf) {
  2. if (_recv_pending) {
  3. return;
  4. }
  5. //可以调用构造函数直接构造,但不可用已经构造好的智能指针赋值
  6. /*auto _recv_nodez = std::make_unique<MsgNode>(RECVSIZE);
  7. _recv_node = _recv_nodez;*/
  8. _recv_node = std::make_shared<MsgNode>(RECVSIZE);
  9. _socket->async_receive(asio::buffer(_recv_node->_msg, _recv_node->_total_len), std::bind(&Session::ReadAllCallBack, this,
  10. std::placeholders::_1, std::placeholders::_2));
  11. _recv_pending = true;
  12. }
  13. void Session::ReadAllCallBack(const boost::system::error_code& ec, std::size_t bytes_transferred) {
  14. _recv_node->_cur_len += bytes_transferred;
  15. //将数据投递到队列里交给逻辑线程处理,此处略去
  16. //如果读完了则将标记置为false
  17. _recv_pending = false;
  18. //指针置空
  19. _recv_node = nullptr;
  20. }

同样async_read_some和async_receive不能混合使用,否则会出现逻辑问题。

源码连接

本文介绍了boost asio异步读写的操作,仅仅是代码片段和api的封装便于大家理解,下一篇利用这些异步api写一个异步的服务器展示收发效果。
源码连接
https://gitee.com/secondtonone1/boostasio-learn

热门评论
  • Zeheng Feng
    2024-04-11 12:07:39
    //如果队列为空,则说明所有数据都发送完,将pending设置为false
        if (_send_queue.empty()) {
            _send_pending = false;
        }
        //如果队列不是空,则继续将队首元素发送
        if (!_send_queue.empty()) {
            auto& send_data = _send_queue.front();
            this->_socket->async_send(asio::buffer(send_data->_msg + send_data->_cur_len, send_data->_total_len - send_data->_cur_len),
                std::bind(&Session::WriteAllCallBack,
                    this, std::placeholders::_1, std::placeholders::_2));
        }

    想问下Session::WriteAllCallBack里两个if的逻辑是否会引起多线程同时写一个socket的情况,比如1号线程执行到第二个if前被时间片切出,此时另一线程调用Session::WriteAllToSocket发送数据,1号线程此时唤醒执行后续逻辑。这样是否会存在多线程写socket的情况。🤔

热门文章

  1. Linux环境搭建和编码

    喜欢(594) 浏览(6008)
  2. windows环境搭建和vscode配置

    喜欢(587) 浏览(1759)
  3. 解密定时器的实现细节

    喜欢(566) 浏览(2023)
  4. C++ 类的继承封装和多态

    喜欢(588) 浏览(2795)
  5. slice介绍和使用

    喜欢(521) 浏览(1838)

最新评论

  1. C++ 类的拷贝构造、赋值运算、单例模式 secondtonone1:好的,已修复。
  2. 双链表实现LRU算法 secondtonone1:双链表插入和删除节点是本篇的难点,多多练习即可。
  3. 线程安全的无锁栈 secondtonone1:谢谢支持,如果pop的次数大于push的次数是会让线程处于重试的,这个是测试用例,必须满足push和pop的次数相同,实际情况不会这么使用。栈的设计没有问题。
  4. 再谈单例模式 secondtonone1:是的,C++11以后返回局部static变量对象能保证线程安全了。
  5. Linux环境搭建和编码 恋恋风辰:Linux环境下go的安装比较简单,可以不用设置GOPATH环境变量,后期我们学习go mod 之后就拜托了go文件目录的限制了。

个人公众号

个人微信