简介
本文概述基于boost::asio实现的服务器逻辑层结构,并且完善之前设计的消息结构。因为为了简化粘包处理,我们简化了发送数据的结构,这次我们给出完整的消息设计,以及服务器架构设计。
服务器架构设计
之前我们设计了Session(会话层),并且给大家讲述了Asio底层的通信过程,如下图

我们接下来要设计的服务器结构是这样的

消息头完善
我们之前的消息头仅包含数据域的长度,但是要进行逻辑处理,就需要传递一个id字段表示要处理的消息id,当然可以不在包头传id字段,将id序列化到消息体也是可以的,但是我们为了便于处理也便于回调逻辑层对应的函数,最好是将id写入包头。
之前我们设计的消息结构是这样的

现在将其完善为如下的样子

为了减少耦合和歧义,我们重新设计消息节点。MsgNode表示消息节点的基类,头部的消息用这个结构存储。RecvNode表示接收消息的节点。SendNode表示发送消息的节点。
我们将上述结构定义在MsgNode.h中
class MsgNode{public:MsgNode(short max_len) :_total_len(max_len), _cur_len(0) {_data = new char[_total_len + 1]();_data[_total_len] = '\0';}~MsgNode() {std::cout << "destruct MsgNode" << endl;delete[] _data;}void Clear() {::memset(_data, 0, _total_len);_cur_len = 0;}short _cur_len;short _total_len;char* _data;};class RecvNode :public MsgNode {public:RecvNode(short max_len, short msg_id);private:short _msg_id;};class SendNode:public MsgNode {public:SendNode(const char* msg,short max_len, short msg_id);private:short _msg_id;};
实现如下
#include "MsgNode.h"RecvNode::RecvNode(short max_len, short msg_id):MsgNode(max_len),_msg_id(msg_id){}SendNode::SendNode(const char* msg, short max_len, short msg_id):MsgNode(max_len + HEAD_TOTAL_LEN), _msg_id(msg_id){//先发送id, 转为网络字节序short msg_id_host = boost::asio::detail::socket_ops::host_to_network_short(msg_id);memcpy(_data, &msg_id_host, HEAD_ID_LEN);//转为网络字节序short max_len_host = boost::asio::detail::socket_ops::host_to_network_short(max_len);memcpy(_data + HEAD_ID_LEN, &max_len_host, HEAD_DATA_LEN);memcpy(_data + HEAD_ID_LEN + HEAD_DATA_LEN, msg, max_len);}
SendNode发送节点构造时,先将id转为网络字节序,然后写入_data数据域。
然后将要发送数据的长度转为大端字节序,写入_data数据域,注意要偏移HEAD_ID_LEN长度。
最后将要发送的数据msg写入_data数据域,注意要偏移HEAD_ID_LEN+HEAD_DATA_LEN
Session类改写
因为消息结构改变了,所以我们接收和发送数据的逻辑要做对应的修改,我们先修改Session类中收发消息结构如下
std::queue<shared_ptr<MsgNode> > _send_que;std::mutex _send_lock;//收到的消息结构std::shared_ptr<MsgNode> _recv_msg_node;bool _b_head_parse;//收到的头部结构std::shared_ptr<MsgNode> _recv_head_node;
因为头部数据只为4字节,所以我们在Session的构造函数中创建头部节点时选择HEAD_TOTAL_LEN(4字节)大小。
CSession::CSession(boost::asio::io_context& io_context, CServer* server):_socket(io_context), _server(server), _b_close(false),_b_head_parse(false){boost::uuids::uuid a_uuid = boost::uuids::random_generator()();_uuid = boost::uuids::to_string(a_uuid);_recv_head_node = make_shared<MsgNode>(HEAD_TOTAL_LEN);}
发送时我们构造发送节点,放到队列中即可
void CSession::Send(char* msg, short max_length, short msgid) {std::lock_guard<std::mutex> lock(_send_lock);int send_que_size = _send_que.size();if (send_que_size > MAX_SENDQUE) {std::cout << "session: " << _uuid << " send que fulled, size is " << MAX_SENDQUE << endl;return;}_send_que.push(make_shared<SendNode>(msg, max_length, msgid));if (send_que_size>0) {return;}auto& msgnode = _send_que.front();boost::asio::async_write(_socket, boost::asio::buffer(msgnode->_data, msgnode->_total_len),std::bind(&CSession::HandleWrite, this, std::placeholders::_1, SharedSelf()));}
当然我们也实现了一个重载版本
void CSession::Send(std::string msg, short msgid) {std::lock_guard<std::mutex> lock(_send_lock);int send_que_size = _send_que.size();if (send_que_size > MAX_SENDQUE) {std::cout << "session: " << _uuid << " send que fulled, size is " << MAX_SENDQUE << endl;return;}_send_que.push(make_shared<SendNode>(msg.c_str(), msg.length(), msgid));if (send_que_size > 0) {return;}auto& msgnode = _send_que.front();boost::asio::async_write(_socket, boost::asio::buffer(msgnode->_data, msgnode->_total_len),std::bind(&CSession::HandleWrite, this, std::placeholders::_1, SharedSelf()));}
在接收数据时我们解析头部也要解析id字段
void CSession::HandleRead(const boost::system::error_code& error, size_t bytes_transferred, std::shared_ptr<CSession> shared_self){try {if (!error) {//已经移动的字符数int copy_len = 0;while (bytes_transferred > 0) {if (!_b_head_parse) {//收到的数据不足头部大小if (bytes_transferred + _recv_head_node->_cur_len < HEAD_TOTAL_LEN) {memcpy(_recv_head_node->_data + _recv_head_node->_cur_len, _data + copy_len, bytes_transferred);_recv_head_node->_cur_len += bytes_transferred;::memset(_data, 0, MAX_LENGTH);_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_self));return;}//收到的数据比头部多//头部剩余未复制的长度int head_remain = HEAD_TOTAL_LEN - _recv_head_node->_cur_len;memcpy(_recv_head_node->_data + _recv_head_node->_cur_len, _data + copy_len, head_remain);//更新已处理的data长度和剩余未处理的长度copy_len += head_remain;bytes_transferred -= head_remain;//获取头部MSGID数据short msg_id = 0;memcpy(&msg_id, _recv_head_node->_data, HEAD_ID_LEN);//网络字节序转化为本地字节序msg_id = boost::asio::detail::socket_ops::network_to_host_short(msg_id);std::cout << "msg_id is " << msg_id << endl;//id非法if (msg_id > MAX_LENGTH) {std::cout << "invalid msg_id is " << msg_id << endl;_server->ClearSession(_uuid);return;}short msg_len = 0;memcpy(&msg_len, _recv_head_node->_data+HEAD_ID_LEN, HEAD_DATA_LEN);//网络字节序转化为本地字节序msg_len = boost::asio::detail::socket_ops::network_to_host_short(msg_len);std::cout << "msg_len is " << msg_len << endl;//id非法if (msg_len > MAX_LENGTH) {std::cout << "invalid data length is " << msg_len << endl;_server->ClearSession(_uuid);return;}_recv_msg_node = make_shared<RecvNode>(msg_len, msg_id);//消息的长度小于头部规定的长度,说明数据未收全,则先将部分消息放到接收节点里if (bytes_transferred < msg_len) {memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, bytes_transferred);_recv_msg_node->_cur_len += bytes_transferred;::memset(_data, 0, MAX_LENGTH);_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_self));//头部处理完成_b_head_parse = true;return;}memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, msg_len);_recv_msg_node->_cur_len += msg_len;copy_len += msg_len;bytes_transferred -= msg_len;_recv_msg_node->_data[_recv_msg_node->_total_len] = '\0';//cout << "receive data is " << _recv_msg_node->_data << endl;//此处可以调用Send发送测试Json::Reader reader;Json::Value root;reader.parse(std::string(_recv_msg_node->_data, _recv_msg_node->_total_len), root);std::cout << "recevie msg id is " << root["id"].asInt() << " msg data is "<< root["data"].asString() << endl;root["data"] = "server has received msg, msg data is " + root["data"].asString();std::string return_str = root.toStyledString();Send(return_str, root["id"].asInt());//继续轮询剩余未处理数据_b_head_parse = false;_recv_head_node->Clear();if (bytes_transferred <= 0) {::memset(_data, 0, MAX_LENGTH);_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_self));return;}continue;}//已经处理完头部,处理上次未接受完的消息数据//接收的数据仍不足剩余未处理的int remain_msg = _recv_msg_node->_total_len - _recv_msg_node->_cur_len;if (bytes_transferred < remain_msg) {memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, bytes_transferred);_recv_msg_node->_cur_len += bytes_transferred;::memset(_data, 0, MAX_LENGTH);_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_self));return;}memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, remain_msg);_recv_msg_node->_cur_len += remain_msg;bytes_transferred -= remain_msg;copy_len += remain_msg;_recv_msg_node->_data[_recv_msg_node->_total_len] = '\0';//cout << "receive data is " << _recv_msg_node->_data << endl;//此处可以调用Send发送测试Json::Reader reader;Json::Value root;reader.parse(std::string(_recv_msg_node->_data, _recv_msg_node->_total_len), root);std::cout << "recevie msg id is " << root["id"].asInt() << " msg data is "<< root["data"].asString() << endl;root["data"] = "server has received msg, msg data is " + root["data"].asString();std::string return_str = root.toStyledString();Send(return_str, root["id"].asInt());//继续轮询剩余未处理数据_b_head_parse = false;_recv_head_node->Clear();if (bytes_transferred <= 0) {::memset(_data, 0, MAX_LENGTH);_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_self));return;}continue;}}else {std::cout << "handle read failed, error is " << error.what() << endl;Close();_server->ClearSession(_uuid);}}catch (std::exception& e) {std::cout << "Exception code is " << e.what() << endl;}}
先解析头部id,再解析长度,然后根据id和长度构造消息节点,copy剩下的消息体, 把上面代码中处理消息头的逻辑截取如下
//获取头部MSGID数据short msg_id = 0;memcpy(&msg_id, _recv_head_node->_data, HEAD_ID_LEN);//网络字节序转化为本地字节序msg_id = boost::asio::detail::socket_ops::network_to_host_short(msg_id);std::cout << "msg_id is " << msg_id << endl;//id非法if (msg_id > MAX_LENGTH) {std::cout << "invalid msg_id is " << msg_id << endl;_server->ClearSession(_uuid);return;}short msg_len = 0;memcpy(&msg_len, _recv_head_node->_data+HEAD_ID_LEN, HEAD_DATA_LEN);//网络字节序转化为本地字节序msg_len = boost::asio::detail::socket_ops::network_to_host_short(msg_len);std::cout << "msg_len is " << msg_len << endl;//id非法if (msg_len > MAX_LENGTH) {std::cout << "invalid data length is " << msg_len << endl;_server->ClearSession(_uuid);return;}_recv_msg_node = make_shared<RecvNode>(msg_len, msg_id);
其余的没什么变动。
总结
本文介绍了服务器逻辑和网络层的设计,并且基于这个架构,完善了消息发送结构,下一篇带着大家设计逻辑类和逻辑队列。