服务器逻辑层设计和消息完善

简介

本文概述基于boost::asio实现的服务器逻辑层结构,并且完善之前设计的消息结构。因为为了简化粘包处理,我们简化了发送数据的结构,这次我们给出完整的消息设计,以及服务器架构设计。

服务器架构设计

之前我们设计了Session(会话层),并且给大家讲述了Asio底层的通信过程,如下图

https://cdn.llfc.club/1685620269385.jpg

我们接下来要设计的服务器结构是这样的

https://cdn.llfc.club/1685621283099.jpg

消息头完善

我们之前的消息头仅包含数据域的长度,但是要进行逻辑处理,就需要传递一个id字段表示要处理的消息id,当然可以不在包头传id字段,将id序列化到消息体也是可以的,但是我们为了便于处理也便于回调逻辑层对应的函数,最好是将id写入包头。
之前我们设计的消息结构是这样的

https://cdn.llfc.club/1683368829739.jpg

现在将其完善为如下的样子

https://cdn.llfc.club/1683367901552.jpg

为了减少耦合和歧义,我们重新设计消息节点。
MsgNode表示消息节点的基类,头部的消息用这个结构存储。
RecvNode表示接收消息的节点。
SendNode表示发送消息的节点。
我们将上述结构定义在MsgNode.h中

  1. class MsgNode
  2. {
  3. public:
  4. MsgNode(short max_len) :_total_len(max_len), _cur_len(0) {
  5. _data = new char[_total_len + 1]();
  6. _data[_total_len] = '\0';
  7. }
  8. ~MsgNode() {
  9. std::cout << "destruct MsgNode" << endl;
  10. delete[] _data;
  11. }
  12. void Clear() {
  13. ::memset(_data, 0, _total_len);
  14. _cur_len = 0;
  15. }
  16. short _cur_len;
  17. short _total_len;
  18. char* _data;
  19. };
  20. class RecvNode :public MsgNode {
  21. public:
  22. RecvNode(short max_len, short msg_id);
  23. private:
  24. short _msg_id;
  25. };
  26. class SendNode:public MsgNode {
  27. public:
  28. SendNode(const char* msg,short max_len, short msg_id);
  29. private:
  30. short _msg_id;
  31. };

实现如下

  1. #include "MsgNode.h"
  2. RecvNode::RecvNode(short max_len, short msg_id):MsgNode(max_len),
  3. _msg_id(msg_id){
  4. }
  5. SendNode::SendNode(const char* msg, short max_len, short msg_id):MsgNode(max_len + HEAD_TOTAL_LEN)
  6. , _msg_id(msg_id){
  7. //先发送id, 转为网络字节序
  8. short msg_id_host = boost::asio::detail::socket_ops::host_to_network_short(msg_id);
  9. memcpy(_data, &msg_id_host, HEAD_ID_LEN);
  10. //转为网络字节序
  11. short max_len_host = boost::asio::detail::socket_ops::host_to_network_short(max_len);
  12. memcpy(_data + HEAD_ID_LEN, &max_len_host, HEAD_DATA_LEN);
  13. memcpy(_data + HEAD_ID_LEN + HEAD_DATA_LEN, msg, max_len);
  14. }

SendNode发送节点构造时,先将id转为网络字节序,然后写入_data数据域。
然后将要发送数据的长度转为大端字节序,写入_data数据域,注意要偏移HEAD_ID_LEN长度。
最后将要发送的数据msg写入_data数据域,注意要偏移HEAD_ID_LEN+HEAD_DATA_LEN

Session类改写

因为消息结构改变了,所以我们接收和发送数据的逻辑要做对应的修改,我们先修改Session类中收发消息结构如下

  1. std::queue<shared_ptr<MsgNode> > _send_que;
  2. std::mutex _send_lock;
  3. //收到的消息结构
  4. std::shared_ptr<MsgNode> _recv_msg_node;
  5. bool _b_head_parse;
  6. //收到的头部结构
  7. std::shared_ptr<MsgNode> _recv_head_node;

因为头部数据只为4字节,所以我们在Session的构造函数中创建头部节点时选择HEAD_TOTAL_LEN(4字节)大小。

  1. CSession::CSession(boost::asio::io_context& io_context, CServer* server):
  2. _socket(io_context), _server(server), _b_close(false),_b_head_parse(false){
  3. boost::uuids::uuid a_uuid = boost::uuids::random_generator()();
  4. _uuid = boost::uuids::to_string(a_uuid);
  5. _recv_head_node = make_shared<MsgNode>(HEAD_TOTAL_LEN);
  6. }

发送时我们构造发送节点,放到队列中即可

  1. void CSession::Send(char* msg, short max_length, short msgid) {
  2. std::lock_guard<std::mutex> lock(_send_lock);
  3. int send_que_size = _send_que.size();
  4. if (send_que_size > MAX_SENDQUE) {
  5. std::cout << "session: " << _uuid << " send que fulled, size is " << MAX_SENDQUE << endl;
  6. return;
  7. }
  8. _send_que.push(make_shared<SendNode>(msg, max_length, msgid));
  9. if (send_que_size>0) {
  10. return;
  11. }
  12. auto& msgnode = _send_que.front();
  13. boost::asio::async_write(_socket, boost::asio::buffer(msgnode->_data, msgnode->_total_len),
  14. std::bind(&CSession::HandleWrite, this, std::placeholders::_1, SharedSelf()));
  15. }

当然我们也实现了一个重载版本

  1. void CSession::Send(std::string msg, short msgid) {
  2. std::lock_guard<std::mutex> lock(_send_lock);
  3. int send_que_size = _send_que.size();
  4. if (send_que_size > MAX_SENDQUE) {
  5. std::cout << "session: " << _uuid << " send que fulled, size is " << MAX_SENDQUE << endl;
  6. return;
  7. }
  8. _send_que.push(make_shared<SendNode>(msg.c_str(), msg.length(), msgid));
  9. if (send_que_size > 0) {
  10. return;
  11. }
  12. auto& msgnode = _send_que.front();
  13. boost::asio::async_write(_socket, boost::asio::buffer(msgnode->_data, msgnode->_total_len),
  14. std::bind(&CSession::HandleWrite, this, std::placeholders::_1, SharedSelf()));
  15. }

在接收数据时我们解析头部也要解析id字段

  1. void CSession::HandleRead(const boost::system::error_code& error, size_t bytes_transferred, std::shared_ptr<CSession> shared_self){
  2. try {
  3. if (!error) {
  4. //已经移动的字符数
  5. int copy_len = 0;
  6. while (bytes_transferred > 0) {
  7. if (!_b_head_parse) {
  8. //收到的数据不足头部大小
  9. if (bytes_transferred + _recv_head_node->_cur_len < HEAD_TOTAL_LEN) {
  10. memcpy(_recv_head_node->_data + _recv_head_node->_cur_len, _data + copy_len, bytes_transferred);
  11. _recv_head_node->_cur_len += bytes_transferred;
  12. ::memset(_data, 0, MAX_LENGTH);
  13. _socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),
  14. std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_self));
  15. return;
  16. }
  17. //收到的数据比头部多
  18. //头部剩余未复制的长度
  19. int head_remain = HEAD_TOTAL_LEN - _recv_head_node->_cur_len;
  20. memcpy(_recv_head_node->_data + _recv_head_node->_cur_len, _data + copy_len, head_remain);
  21. //更新已处理的data长度和剩余未处理的长度
  22. copy_len += head_remain;
  23. bytes_transferred -= head_remain;
  24. //获取头部MSGID数据
  25. short msg_id = 0;
  26. memcpy(&msg_id, _recv_head_node->_data, HEAD_ID_LEN);
  27. //网络字节序转化为本地字节序
  28. msg_id = boost::asio::detail::socket_ops::network_to_host_short(msg_id);
  29. std::cout << "msg_id is " << msg_id << endl;
  30. //id非法
  31. if (msg_id > MAX_LENGTH) {
  32. std::cout << "invalid msg_id is " << msg_id << endl;
  33. _server->ClearSession(_uuid);
  34. return;
  35. }
  36. short msg_len = 0;
  37. memcpy(&msg_len, _recv_head_node->_data+HEAD_ID_LEN, HEAD_DATA_LEN);
  38. //网络字节序转化为本地字节序
  39. msg_len = boost::asio::detail::socket_ops::network_to_host_short(msg_len);
  40. std::cout << "msg_len is " << msg_len << endl;
  41. //id非法
  42. if (msg_len > MAX_LENGTH) {
  43. std::cout << "invalid data length is " << msg_len << endl;
  44. _server->ClearSession(_uuid);
  45. return;
  46. }
  47. _recv_msg_node = make_shared<RecvNode>(msg_len, msg_id);
  48. //消息的长度小于头部规定的长度,说明数据未收全,则先将部分消息放到接收节点里
  49. if (bytes_transferred < msg_len) {
  50. memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, bytes_transferred);
  51. _recv_msg_node->_cur_len += bytes_transferred;
  52. ::memset(_data, 0, MAX_LENGTH);
  53. _socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),
  54. std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_self));
  55. //头部处理完成
  56. _b_head_parse = true;
  57. return;
  58. }
  59. memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, msg_len);
  60. _recv_msg_node->_cur_len += msg_len;
  61. copy_len += msg_len;
  62. bytes_transferred -= msg_len;
  63. _recv_msg_node->_data[_recv_msg_node->_total_len] = '\0';
  64. //cout << "receive data is " << _recv_msg_node->_data << endl;
  65. //此处可以调用Send发送测试
  66. Json::Reader reader;
  67. Json::Value root;
  68. reader.parse(std::string(_recv_msg_node->_data, _recv_msg_node->_total_len), root);
  69. std::cout << "recevie msg id is " << root["id"].asInt() << " msg data is "
  70. << root["data"].asString() << endl;
  71. root["data"] = "server has received msg, msg data is " + root["data"].asString();
  72. std::string return_str = root.toStyledString();
  73. Send(return_str, root["id"].asInt());
  74. //继续轮询剩余未处理数据
  75. _b_head_parse = false;
  76. _recv_head_node->Clear();
  77. if (bytes_transferred <= 0) {
  78. ::memset(_data, 0, MAX_LENGTH);
  79. _socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),
  80. std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_self));
  81. return;
  82. }
  83. continue;
  84. }
  85. //已经处理完头部,处理上次未接受完的消息数据
  86. //接收的数据仍不足剩余未处理的
  87. int remain_msg = _recv_msg_node->_total_len - _recv_msg_node->_cur_len;
  88. if (bytes_transferred < remain_msg) {
  89. memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, bytes_transferred);
  90. _recv_msg_node->_cur_len += bytes_transferred;
  91. ::memset(_data, 0, MAX_LENGTH);
  92. _socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),
  93. std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_self));
  94. return;
  95. }
  96. memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, remain_msg);
  97. _recv_msg_node->_cur_len += remain_msg;
  98. bytes_transferred -= remain_msg;
  99. copy_len += remain_msg;
  100. _recv_msg_node->_data[_recv_msg_node->_total_len] = '\0';
  101. //cout << "receive data is " << _recv_msg_node->_data << endl;
  102. //此处可以调用Send发送测试
  103. Json::Reader reader;
  104. Json::Value root;
  105. reader.parse(std::string(_recv_msg_node->_data, _recv_msg_node->_total_len), root);
  106. std::cout << "recevie msg id is " << root["id"].asInt() << " msg data is "
  107. << root["data"].asString() << endl;
  108. root["data"] = "server has received msg, msg data is " + root["data"].asString();
  109. std::string return_str = root.toStyledString();
  110. Send(return_str, root["id"].asInt());
  111. //继续轮询剩余未处理数据
  112. _b_head_parse = false;
  113. _recv_head_node->Clear();
  114. if (bytes_transferred <= 0) {
  115. ::memset(_data, 0, MAX_LENGTH);
  116. _socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),
  117. std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_self));
  118. return;
  119. }
  120. continue;
  121. }
  122. }
  123. else {
  124. std::cout << "handle read failed, error is " << error.what() << endl;
  125. Close();
  126. _server->ClearSession(_uuid);
  127. }
  128. }
  129. catch (std::exception& e) {
  130. std::cout << "Exception code is " << e.what() << endl;
  131. }
  132. }

先解析头部id,再解析长度,然后根据id和长度构造消息节点,copy剩下的消息体, 把上面代码中处理消息头的逻辑截取如下

  1. //获取头部MSGID数据
  2. short msg_id = 0;
  3. memcpy(&msg_id, _recv_head_node->_data, HEAD_ID_LEN);
  4. //网络字节序转化为本地字节序
  5. msg_id = boost::asio::detail::socket_ops::network_to_host_short(msg_id);
  6. std::cout << "msg_id is " << msg_id << endl;
  7. //id非法
  8. if (msg_id > MAX_LENGTH) {
  9. std::cout << "invalid msg_id is " << msg_id << endl;
  10. _server->ClearSession(_uuid);
  11. return;
  12. }
  13. short msg_len = 0;
  14. memcpy(&msg_len, _recv_head_node->_data+HEAD_ID_LEN, HEAD_DATA_LEN);
  15. //网络字节序转化为本地字节序
  16. msg_len = boost::asio::detail::socket_ops::network_to_host_short(msg_len);
  17. std::cout << "msg_len is " << msg_len << endl;
  18. //id非法
  19. if (msg_len > MAX_LENGTH) {
  20. std::cout << "invalid data length is " << msg_len << endl;
  21. _server->ClearSession(_uuid);
  22. return;
  23. }
  24. _recv_msg_node = make_shared<RecvNode>(msg_len, msg_id);

其余的没什么变动。

总结

本文介绍了服务器逻辑和网络层的设计,并且基于这个架构,完善了消息发送结构,下一篇带着大家设计逻辑类和逻辑队列。

源码链接https://gitee.com/secondtonone1/boostasio-learn

热门评论

热门文章

  1. Linux环境搭建和编码

    喜欢(594) 浏览(5454)
  2. windows环境搭建和vscode配置

    喜欢(587) 浏览(1708)
  3. 解密定时器的实现细节

    喜欢(566) 浏览(1829)
  4. C++ 类的继承封装和多态

    喜欢(588) 浏览(2544)
  5. slice介绍和使用

    喜欢(521) 浏览(1704)

最新评论

  1. 泛型算法的定制操作 secondtonone1:lambda和bind是C11新增的利器,善于利用这两个机制可以极大地提升编程安全性和效率。
  2. 基于锁实现线程安全队列和栈容器 secondtonone1:我是博主,你认真学习的样子的很可爱,哈哈,我画的是链表由空变成1个的情况。其余情况和你思考的类似,只不过我用了一个无效节点表示tail的指向,最初head和tail指向的都是这个节点。
  3. 解决博客回复区被脚本注入的问题 secondtonone1:走到现在我忽然明白一个道理,无论工作也好生活也罢,最重要的是开心,即使一份安稳的工作不能给我带来事业上的积累也要合理的舍弃,所以我还是想去做喜欢的方向。
  4. 利用内存模型优化无锁栈 卡西莫多的礼物:感谢博主指点,好人一生平安o(* ̄▽ ̄*)ブ
  5. 类和对象 陈宇航:支持!!!!

个人公众号

个人微信