简介
前文已经介绍了异步操作的api,今天写一个简单的异步echo服务器,以应答为主
Session类
Session类主要是处理客户端消息收发的会话类,为了简单起见,我们不考虑粘包问题,也不考虑支持手动调用发送的接口,只以应答的方式发送和接收固定长度(1024字节长度)的数据。
<!--more-->
class Session{public:Session(boost::asio::io_context& ioc):_socket(ioc){}tcp::socket& Socket() {return _socket;}void Start();private:void handle_read(const boost::system::error_code & error, size_t bytes_transfered);void handle_write(const boost::system::error_code& error);tcp::socket _socket;enum {max_length = 1024};char _data[max_length];};
1 _data用来接收客户端传递的数据
2 _socket为单独处理客户端读写的socket。
3 handle_read和handle_write分别为读回调函数和写回调函数。
接下来我们实现Session类
void Session::Start(){memset(_data, 0, max_length);_socket.async_read_some(boost::asio::buffer(_data, max_length),std::bind(&Session::handle_read, this, placeholders::_1,placeholders::_2));}
在Start方法中我们调用异步读操作,监听对端发送的消息。当对端发送数据后,触发handle_read函数
void Session::handle_read(const boost::system::error_code& error, size_t bytes_transfered) {if (!error) {cout << "server receive data is " << _data << endl;boost::asio::async_write(_socket, boost::asio::buffer(_data, bytes_transfered),std::bind(&Session::handle_write, this, placeholders::_1));}else {delete this;}}
handle_read函数内将收到的数据发送给对端,当发送完成后触发handle_write回调函数。
void Session::handle_write(const boost::system::error_code& error) {if (!error) {memset(_data, 0, max_length);_socket.async_read_some(boost::asio::buffer(_data, max_length), std::bind(&Session::handle_read,this, placeholders::_1, placeholders::_2));}else {delete this;}}
handle_write函数内又一次监听了读事件,如果对端有数据发送过来则触发handle_read,我们再将收到的数据发回去。从而达到应答式服务的效果。
Server类
Server类为服务器接收连接的管理类
class Server {public:Server(boost::asio::io_context& ioc, short port);private:void start_accept();void handle_accept(Session* new_session, const boost::system::error_code& error);boost::asio::io_context& _ioc;tcp::acceptor _acceptor;};
start_accept将要接收连接的acceptor绑定到服务上,其内部就是将accpeptor对应的socket描述符绑定到epoll或iocp模型上,实现事件驱动。
handle_accept为新连接到来后触发的回调函数。
下面是具体实现
Server::Server(boost::asio::io_context& ioc, short port) :_ioc(ioc),_acceptor(ioc, tcp::endpoint(tcp::v4(), port)) {start_accept();}void Server::start_accept() {Session* new_session = new Session(_ioc);_acceptor.async_accept(new_session->Socket(),std::bind(&Server::handle_accept, this, new_session, placeholders::_1));}void Server::handle_accept(Session* new_session, const boost::system::error_code& error) {if (!error) {new_session->Start();}else {delete new_session;}start_accept();}
客户端
客户端的设计用之前的同步模式即可,客户端不需要异步的方式,因为客户端并不是以并发为主,当然写成异步收发更好一些。
#include <iostream>#include <boost/asio.hpp>using namespace std;using namespace boost::asio::ip;const int MAX_LENGTH = 1024;int main(){try {//创建上下文服务boost::asio::io_context ioc;//构造endpointtcp::endpoint remote_ep(address::from_string("127.0.0.1"), 10086);tcp::socket sock(ioc);boost::system::error_code error = boost::asio::error::host_not_found; ;sock.connect(remote_ep, error);if (error) {cout << "connect failed, code is " << error.value() << " error msg is " << error.message();return 0;}std::cout << "Enter message: ";char request[MAX_LENGTH];std::cin.getline(request, MAX_LENGTH);size_t request_length = strlen(request);boost::asio::write(sock, boost::asio::buffer(request, request_length));char reply[MAX_LENGTH];size_t reply_length = boost::asio::read(sock,boost::asio::buffer(reply, request_length));std::cout << "Reply is: ";std::cout.write(reply, reply_length);std::cout << "\n";}catch (std::exception& e) {std::cerr << "Exception: " << e.what() << endl;}return 0;}
运行服务器之后再运行客户端,输入字符串后,就可以收到服务器应答的字符串了。
隐患
该demo示例为仿照asio官网编写的,其中存在隐患,就是当服务器即将发送数据前(调用async_write前),此刻客户端中断,服务器此时调用async_write会触发发送回调函数,判断ec为非0进而执行delete this逻辑回收session。但要注意的是客户端关闭后,在tcp层面会触发读就绪事件,服务器会触发读事件回调函数。在读事件回调函数中判断错误码ec为非0,进而再次执行delete操作,从而造成二次析构,这是极度危险的。
总结
本文介绍了异步的应答服务器设计,但是这种服务器并不会在实际生产中使用,主要有两个原因:
1 因为该服务器的发送和接收以应答的方式交互,而并不能做到应用层想随意发送的目的,也就是未做到完全的收发分离(全双工逻辑)。
2 该服务器未处理粘包,序列化,以及逻辑和收发线程解耦等问题。
3 该服务器存在二次析构的风险。
这些问题我们会在接下来的文章中不断完善
源码链接
https://gitee.com/secondtonone1/boostasio-learn
对,因为这里服务器收发数据是个同步的过程;但是当收发数据是个异步过程时就可能会出错。
async_write出错后,由于走到了delete this分支,不会再次调用
_socket.async_read_some(boost::asio::buffer(_data, max_length), std::bind(&Session::handle_read,
this, placeholders::_1, placeholders::_2));注册读事件回调了吧?为什么还会进入读事件回调函数中判断错误码ec为非0呢?