封装服务器发送队列

简介

前文介绍了通过智能指针实现伪闭包的方式延长了session的生命周期,而实际使用的服务器并不是应答式,而是全双工通信方式,服务器一直监听写事件,接收对端数据,可随时发送数据给对端,今天介绍如何封装异步的发送接口,因为多次发送时,异步的发送要保证回调触发后再次发送才能确保数据是有序的,这一点我们已经在前文异步发送函数介绍的时候提到了。

Server和Session分离

将Server修改为CServer并分离到CServer.h中,然后将Session修改为CSession分离到CSession.h中。
CSession.h中类的声明如下,和之前的Session内容一样,就是修改了类名,放在CSession.h中

  1. #include <iostream>
  2. #include <boost/asio.hpp>
  3. #include <map>
  4. #include <boost/uuid/uuid_generators.hpp>
  5. #include <boost/uuid/uuid_io.hpp>
  6. using boost::asio::ip::tcp;
  7. using namespace std;
  8. class CServer;
  9. class CSession :public std::enable_shared_from_this<CSession>
  10. {
  11. public:
  12. CSession(boost::asio::io_context& ioc, CServer* server) :_socket(ioc), _server(server) {
  13. boost::uuids::uuid a_uuid = boost::uuids::random_generator()();
  14. _uuid = boost::uuids::to_string(a_uuid);
  15. }
  16. tcp::socket& Socket() {
  17. return _socket;
  18. }
  19. ~CSession() {
  20. std::cout << "session destruct delete this " << this << endl;
  21. }
  22. void Start();
  23. std::string& GetUuid();
  24. private:
  25. void handle_read(const boost::system::error_code& error,
  26. size_t bytes_transferred, shared_ptr<CSession> _self_shared);
  27. void handle_write(const boost::system::error_code& error, shared_ptr<CSession> _self_shared);
  28. tcp::socket _socket;
  29. enum { max_length = 1024 };
  30. char _data[max_length];
  31. CServer* _server;
  32. std::string _uuid;
  33. };

CServer.h中声明如下,内容前文没变化,就是将Server内容写入CServer.h中

  1. #include <boost/asio.hpp>
  2. #include "CSession.h"
  3. #include <memory.h>
  4. #include <map>
  5. using namespace std;
  6. using boost::asio::ip::tcp;
  7. class CServer
  8. {
  9. public:
  10. CServer(boost::asio::io_context& io_context, short port);
  11. void ClearSession(std::string);
  12. private:
  13. void HandleAccept(shared_ptr<CSession>, const boost::system::error_code & error);
  14. void StartAccept();
  15. boost::asio::io_context &_io_context;
  16. short _port;
  17. tcp::acceptor _acceptor;
  18. std::map<std::string, shared_ptr<CSession>> _sessions;
  19. };

整体目录变为
https://cdn.llfc.club/1682157973797.jpg

数据节点设计

我们设计一个数据节点MsgNode用来存储数据

  1. class MsgNode
  2. {
  3. friend class CSession;
  4. public:
  5. MsgNode(char * msg, int max_len) {
  6. _data = new char[max_len];
  7. memcpy(_data, msg, max_len);
  8. }
  9. ~MsgNode() {
  10. delete[] _data;
  11. }
  12. private:
  13. int _cur_len;
  14. int _max_len;
  15. char* _data;
  16. };

1  _cur_len表示数据当前已处理的长度(已经发送的数据或者已经接收的数据长度),因为一个数据包存在未发送完或者未接收完的情况。
2  _max_len表示数据的总长度。
3  _data表示数据域,已接收或者已发送的数据都放在此空间内。

封装发送接口

首先在CSession类里新增一个队列存储要发送的数据,因为我们不能保证每次调用发送接口的时候上一次数据已经发送完,就要把要发送的数据放入队列中,通过回调函数不断地发送。而且我们不能保证发送的接口和回调函数的接口在一个线程,所以要增加一个锁保证发送队列安全性。
同时我们新增一个发送接口Send

  1. void Send(char* msg, int max_length);
  2. std::queue<shared_ptr<MsgNode> > _send_que;
  3. std::mutex _send_lock;

实现发送接口

  1. void CSession::Send(char* msg, int max_length) {
  2. bool pending = false;
  3. std::lock_guard<std::mutex> lock(_send_lock);
  4. if (_send_que.size() > 0) {
  5. pending = true;
  6. }
  7. _send_que.push(make_shared<MsgNode>(msg, max_length));
  8. if (pending) {
  9. return;
  10. }
  11. boost::asio::async_write(_socket, boost::asio::buffer(msg, max_length),
  12. std::bind(&CSession::HandleWrite, this, std::placeholders::_1, shared_from_this()));
  13. }

发送接口里判断发送队列是否为空,如果不为空说明有数据未发送完,需要将数据放入队列,然后返回。如果发送队列为空,则说明当前没有未发送完的数据,将要发送的数据放入队列并调用async_write函数发送数据。
回调函数实现

  1. void CSession::HandleWrite(const boost::system::error_code& error, shared_ptr<CSession> _self_shared) {
  2. if (!error) {
  3. std::lock_guard<std::mutex> lock(_send_lock);
  4. _send_que.pop();
  5. if (!_send_que.empty()) {
  6. auto &msgnode = _send_que.front();
  7. boost::asio::async_write(_socket, boost::asio::buffer(msgnode->_data, msgnode->_max_len),
  8. std::bind(&CSession::HandleWrite, this, std::placeholders::_1, _self_shared));
  9. }
  10. }
  11. else {
  12. std::cout << "handle write failed, error is " << error.what() << endl;
  13. _server->ClearSession(_uuid);
  14. }
  15. }

判断发送队列是否为空,为空则发送完,否则不断取出队列数据调用async_write发送,直到队列为空。

修改读回调

因为我们要一直监听对端发送的数据,所以要在每次收到数据后继续绑定监听事件

  1. void CSession::HandleRead(const boost::system::error_code& error, size_t bytes_transferred, shared_ptr<CSession> _self_shared){
  2. if (!error) {
  3. cout << "read data is " << _data << endl;
  4. //发送数据
  5. Send(_data, bytes_transferred);
  6. memset(_data, 0, MAX_LENGTH);
  7. _socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH), std::bind(&CSession::HandleRead, this,
  8. std::placeholders::_1, std::placeholders::_2, _self_shared));
  9. }
  10. else {
  11. std::cout << "handle read failed, error is " << error.what() << endl;
  12. _server->ClearSession(_uuid);
  13. }
  14. }

总结

该服务器虽然实现了全双工通信,但是仍存在缺陷,比如粘包问题未处理,下一版本实现粘包处理。
源码链接https://gitee.com/secondtonone1/boostasio-learn

热门评论
  • kapa
    2024-10-19 12:22:07

    和我当时刚学的时候一样,也有这个疑问

  • 搁浅
    2024-09-07 19:51:59

    不应该先取出来发送之后才 pop 吗 ? 怎么提前 pop ?


    1. if (!error) {
    2. std::lock_guard<std::mutex> lock(_send_lock);
    3. _send_que.pop();
    4.   if (!_send_que.empty()) {
    5.      auto &msgnode = _send_que.front();
    6.      boost::asio::async_write(_socket, boost::asio::buffer(msgnode->_data, msgnode->_max_len),
    7.      std::bind(&CSession::HandleWrite, this, std::placeholders::_1, _self_shared));
    8.    }
    9. }

热门文章

  1. vscode搭建windows C++开发环境

    喜欢(596) 浏览(83752)
  2. 聊天项目(28) 分布式服务通知好友申请

    喜欢(507) 浏览(6108)
  3. 使用hexo搭建个人博客

    喜欢(533) 浏览(11929)
  4. Linux环境搭建和编码

    喜欢(594) 浏览(13588)
  5. Qt环境搭建

    喜欢(517) 浏览(24804)

最新评论

  1. 创建项目和编译 secondtonone1:谢谢支持
  2. 再谈单例模式 secondtonone1:是的,C++11以后返回局部static变量对象能保证线程安全了。
  3. Qt 对话框 Spade2077:QDialog w(); //这里是不是不需要带括号
  4. 构造函数 secondtonone1:构造函数是类的基础知识,要着重掌握
  5. 利用栅栏实现同步 Dzher:作者你好!我觉得 std::thread a(write_x); std::thread b(write_y); std::thread c(read_x_then_y); std::thread d(read_y_then_x); 这个例子中的assert fail并不会发生,原子变量设定了非relaxed内存序后一个线程的原子变量被写入,那么之后的读取一定会被同步的,c和d线程中只可能同时发生一个z++未执行的情况,最终z不是1就是2了,我测试了很多次都没有assert,请问我这个观点有什么错误,谢谢!
  6. 面试题汇总(一) secondtonone1:看到网络上经常提问的go的问题,做了一下汇总,结合自己的经验给出的答案,如有纰漏,望指正批评。
  7. protobuf配置和使用 熊二:你可以把dll放到系统目录,也可以配置环境变量,还能把dll丢到lib里
  8. 聊天项目(15) 客户端实现TCP管理者 lkx:已经在&QTcpSocket::readyRead 回调函数中做了处理了的。
  9. 无锁并发队列 TenThousandOne:_head  和 _tail  替换为原子变量。那里pop的逻辑,val = _data[h] 可以移到循环外面吗
  10. 答疑汇总(thread,async源码分析) Yagus:如果引用计数为0,则会执行 future 的析构进而等待任务执行完成,那么看到的输出将是 这边应该不对吧,std::future析构只在这三种情况都满足的时候才回block: 1.共享状态是std::async 创造的(类型是_Task_async_state) 2.共享状态没有ready 3.这个future是共享状态的最后一个引用 这边共享状态类型是“_Package_state”,引用计数即使为0也不应该block啊
  11. 解决博客回复区被脚本注入的问题 secondtonone1:走到现在我忽然明白一个道理,无论工作也好生活也罢,最重要的是开心,即使一份安稳的工作不能给我带来事业上的积累也要合理的舍弃,所以我还是想去做喜欢的方向。
  12. C++ 并发三剑客future, promise和async Yunfei:大佬您好,如果这个线程池中加入的异步任务的形参如果有右值引用,这个commit中的返回类型推导和bind绑定就会出现问题,请问实际工程中,是不是不会用到这种任务,如果用到了,应该怎么解决?
  13. boost::asio之socket的创建和连接 项空月:发现一些错别字 :每隔vector存储  是不是是每个. asio::mutable_buffers_1 o或者    是不是多打了个o
  14. interface应用 secondtonone1:interface是万能类型,但是使用时要转换为实际类型来使用。interface丰富了go的多态特性,也降低了传统面向对象语言的耦合性。
  15. string类 WangQi888888:确实错了,应该是!isspace(sind[index]). 否则不进入循环,还是原来的字符串“some string”
  16. 堆排序 secondtonone1:堆排序非常实用,定时器就是这个原理制作的。
  17. 处理网络粘包问题 zyouth: //消息的长度小于头部规定的长度,说明数据未收全,则先将部分消息放到接收节点里 if (bytes_transferred < data_len) { memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, bytes_transferred); _recv_msg_node->_cur_len += bytes_transferred; ::memset(_data, 0, MAX_LENGTH); _socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH), std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_self)); //头部处理完成 _b_head_parse = true; return; } 把_b_head_parse = true;放在_socket.async_read_some前面是不是更好
  18. 类和对象 陈宇航:支持!!!!
  19. 聊天项目(9) redis服务搭建 pro_lin:redis线程池的析构函数,除了pop出队列,还要free掉redis连接把
  20. Qt MVC结构之QItemDelegate介绍 胡歌-此生不换:gpt, google
  21. 聊天项目(7) visualstudio配置grpc diablorrr:cmake文件得改一下 find_package(Boost REQUIRED COMPONENTS system filesystem),要加上filesystem。在target_link_libraries中也同样加上
  22. 网络编程学习方法和图书推荐 Corleone:啥程度可以找工作
  23. visual studio配置boost库 一giao里我离giaogiao:请问是修改成这样吗:.\b2.exe toolset=MinGW

个人公众号

个人微信