聊天项目(16) asio实现tcp服务器

ChatServer

一个TCP服务器必然会有连接的接收,维持,收发数据等逻辑。那我们就要基于asio完成这个服务的搭建。主服务是这个样子的

  1. #include "LogicSystem.h"
  2. #include <csignal>
  3. #include <thread>
  4. #include <mutex>
  5. #include "AsioIOServicePool.h"
  6. #include "CServer.h"
  7. #include "ConfigMgr.h"
  8. using namespace std;
  9. bool bstop = false;
  10. std::condition_variable cond_quit;
  11. std::mutex mutex_quit;
  12. int main()
  13. {
  14. try {
  15. auto &cfg = ConfigMgr::Inst();
  16. auto pool = AsioIOServicePool::GetInstance();
  17. boost::asio::io_context io_context;
  18. boost::asio::signal_set signals(io_context, SIGINT, SIGTERM);
  19. signals.async_wait([&io_context, pool](auto, auto) {
  20. io_context.stop();
  21. pool->Stop();
  22. });
  23. auto port_str = cfg["SelfServer"]["Port"];
  24. CServer s(io_context, atoi(port_str.c_str()));
  25. io_context.run();
  26. }
  27. catch (std::exception& e) {
  28. std::cerr << "Exception: " << e.what() << endl;
  29. }
  30. }

CServer类的声明

  1. #include <boost/asio.hpp>
  2. #include "CSession.h"
  3. #include <memory.h>
  4. #include <map>
  5. #include <mutex>
  6. using namespace std;
  7. using boost::asio::ip::tcp;
  8. class CServer
  9. {
  10. public:
  11. CServer(boost::asio::io_context& io_context, short port);
  12. ~CServer();
  13. void ClearSession(std::string);
  14. private:
  15. void HandleAccept(shared_ptr<CSession>, const boost::system::error_code & error);
  16. void StartAccept();
  17. boost::asio::io_context &_io_context;
  18. short _port;
  19. tcp::acceptor _acceptor;
  20. std::map<std::string, shared_ptr<CSession>> _sessions;
  21. std::mutex _mutex;
  22. };

构造函数中监听对方连接

  1. CServer::CServer(boost::asio::io_context& io_context, short port):_io_context(io_context), _port(port),
  2. _acceptor(io_context, tcp::endpoint(tcp::v4(),port))
  3. {
  4. cout << "Server start success, listen on port : " << _port << endl;
  5. StartAccept();
  6. }

接受连接的函数

  1. void CServer::StartAccept() {
  2. auto &io_context = AsioIOServicePool::GetInstance()->GetIOService();
  3. shared_ptr<CSession> new_session = make_shared<CSession>(io_context, this);
  4. _acceptor.async_accept(new_session->GetSocket(), std::bind(&CServer::HandleAccept, this, new_session, placeholders::_1));
  5. }

AsioIOServicePool

从AsioIOServicePool中返回一个可用的iocontext构造Session,然后将接受的新链接的socket写入这个Session保管。

AsioIOServicePool已经在前面讲解很多次了,它的声明如下

  1. #include <vector>
  2. #include <boost/asio.hpp>
  3. #include "Singleton.h"
  4. class AsioIOServicePool:public Singleton<AsioIOServicePool>
  5. {
  6. friend Singleton<AsioIOServicePool>;
  7. public:
  8. using IOService = boost::asio::io_context;
  9. using Work = boost::asio::io_context::work;
  10. using WorkPtr = std::unique_ptr<Work>;
  11. ~AsioIOServicePool();
  12. AsioIOServicePool(const AsioIOServicePool&) = delete;
  13. AsioIOServicePool& operator=(const AsioIOServicePool&) = delete;
  14. // 使用 round-robin 的方式返回一个 io_service
  15. boost::asio::io_context& GetIOService();
  16. void Stop();
  17. private:
  18. AsioIOServicePool(std::size_t size = std::thread::hardware_concurrency());
  19. std::vector<IOService> _ioServices;
  20. std::vector<WorkPtr> _works;
  21. std::vector<std::thread> _threads;
  22. std::size_t _nextIOService;
  23. };

AsioIOServicePool具体实现

  1. #include "AsioIOServicePool.h"
  2. #include <iostream>
  3. using namespace std;
  4. AsioIOServicePool::AsioIOServicePool(std::size_t size):_ioServices(size),
  5. _works(size), _nextIOService(0){
  6. for (std::size_t i = 0; i < size; ++i) {
  7. _works[i] = std::unique_ptr<Work>(new Work(_ioServices[i]));
  8. }
  9. //遍历多个ioservice,创建多个线程,每个线程内部启动ioservice
  10. for (std::size_t i = 0; i < _ioServices.size(); ++i) {
  11. _threads.emplace_back([this, i]() {
  12. _ioServices[i].run();
  13. });
  14. }
  15. }
  16. AsioIOServicePool::~AsioIOServicePool() {
  17. std::cout << "AsioIOServicePool destruct" << endl;
  18. }
  19. boost::asio::io_context& AsioIOServicePool::GetIOService() {
  20. auto& service = _ioServices[_nextIOService++];
  21. if (_nextIOService == _ioServices.size()) {
  22. _nextIOService = 0;
  23. }
  24. return service;
  25. }
  26. void AsioIOServicePool::Stop(){
  27. //因为仅仅执行work.reset并不能让iocontext从run的状态中退出
  28. //当iocontext已经绑定了读或写的监听事件后,还需要手动stop该服务。
  29. for (auto& work : _works) {
  30. //把服务先停止
  31. work->get_io_context().stop();
  32. work.reset();
  33. }
  34. for (auto& t : _threads) {
  35. t.join();
  36. }
  37. }

CServer的处理连接逻辑

  1. void CServer::HandleAccept(shared_ptr<CSession> new_session, const boost::system::error_code& error){
  2. if (!error) {
  3. new_session->Start();
  4. lock_guard<mutex> lock(_mutex);
  5. _sessions.insert(make_pair(new_session->GetUuid(), new_session));
  6. }
  7. else {
  8. cout << "session accept failed, error is " << error.what() << endl;
  9. }
  10. StartAccept();
  11. }

Session层

上面的逻辑接受新链接后执行Start函数,新链接接受数据,然后Server继续监听新的连接

  1. void CSession::Start(){
  2. AsyncReadHead(HEAD_TOTAL_LEN);
  3. }

先读取头部数据

  1. void CSession::AsyncReadHead(int total_len)
  2. {
  3. auto self = shared_from_this();
  4. asyncReadFull(HEAD_TOTAL_LEN, [self, this](const boost::system::error_code& ec, std::size_t bytes_transfered) {
  5. try {
  6. if (ec) {
  7. std::cout << "handle read failed, error is " << ec.what() << endl;
  8. Close();
  9. _server->ClearSession(_uuid);
  10. return;
  11. }
  12. if (bytes_transfered < HEAD_TOTAL_LEN) {
  13. std::cout << "read length not match, read [" << bytes_transfered << "] , total ["
  14. << HEAD_TOTAL_LEN << "]" << endl;
  15. Close();
  16. _server->ClearSession(_uuid);
  17. return;
  18. }
  19. _recv_head_node->Clear();
  20. memcpy(_recv_head_node->_data, _data, bytes_transfered);
  21. //获取头部MSGID数据
  22. short msg_id = 0;
  23. memcpy(&msg_id, _recv_head_node->_data, HEAD_ID_LEN);
  24. //网络字节序转化为本地字节序
  25. msg_id = boost::asio::detail::socket_ops::network_to_host_short(msg_id);
  26. std::cout << "msg_id is " << msg_id << endl;
  27. //id非法
  28. if (msg_id > MAX_LENGTH) {
  29. std::cout << "invalid msg_id is " << msg_id << endl;
  30. _server->ClearSession(_uuid);
  31. return;
  32. }
  33. short msg_len = 0;
  34. memcpy(&msg_len, _recv_head_node->_data + HEAD_ID_LEN, HEAD_DATA_LEN);
  35. //网络字节序转化为本地字节序
  36. msg_len = boost::asio::detail::socket_ops::network_to_host_short(msg_len);
  37. std::cout << "msg_len is " << msg_len << endl;
  38. //id非法
  39. if (msg_len > MAX_LENGTH) {
  40. std::cout << "invalid data length is " << msg_len << endl;
  41. _server->ClearSession(_uuid);
  42. return;
  43. }
  44. _recv_msg_node = make_shared<RecvNode>(msg_len, msg_id);
  45. AsyncReadBody(msg_len);
  46. }
  47. catch (std::exception& e) {
  48. std::cout << "Exception code is " << e.what() << endl;
  49. }
  50. });
  51. }

上面的逻辑里调用asyncReadFull读取整个长度,然后解析收到的数据,前两个字节为id,之后两个字节为长度,最后n个长度字节为消息内容。

  1. //读取完整长度
  2. void CSession::asyncReadFull(std::size_t maxLength, std::function<void(const boost::system::error_code&, std::size_t)> handler )
  3. {
  4. ::memset(_data, 0, MAX_LENGTH);
  5. asyncReadLen(0, maxLength, handler);
  6. }

读取指定长度

  1. //读取指定字节数
  2. void CSession::asyncReadLen(std::size_t read_len, std::size_t total_len,
  3. std::function<void(const boost::system::error_code&, std::size_t)> handler)
  4. {
  5. auto self = shared_from_this();
  6. _socket.async_read_some(boost::asio::buffer(_data + read_len, total_len-read_len),
  7. [read_len, total_len, handler, self](const boost::system::error_code& ec, std::size_t bytesTransfered) {
  8. if (ec) {
  9. // 出现错误,调用回调函数
  10. handler(ec, read_len + bytesTransfered);
  11. return;
  12. }
  13. if (read_len + bytesTransfered >= total_len) {
  14. //长度够了就调用回调函数
  15. handler(ec, read_len + bytesTransfered);
  16. return;
  17. }
  18. // 没有错误,且长度不足则继续读取
  19. self->asyncReadLen(read_len + bytesTransfered, total_len, handler);
  20. });
  21. }

读取头部成功后,其回调函数内部调用了读包体的逻辑

  1. void CSession::AsyncReadBody(int total_len)
  2. {
  3. auto self = shared_from_this();
  4. asyncReadFull(total_len, [self, this, total_len](const boost::system::error_code& ec, std::size_t bytes_transfered) {
  5. try {
  6. if (ec) {
  7. std::cout << "handle read failed, error is " << ec.what() << endl;
  8. Close();
  9. _server->ClearSession(_uuid);
  10. return;
  11. }
  12. if (bytes_transfered < total_len) {
  13. std::cout << "read length not match, read [" << bytes_transfered << "] , total ["
  14. << total_len<<"]" << endl;
  15. Close();
  16. _server->ClearSession(_uuid);
  17. return;
  18. }
  19. memcpy(_recv_msg_node->_data , _data , bytes_transfered);
  20. _recv_msg_node->_cur_len += bytes_transfered;
  21. _recv_msg_node->_data[_recv_msg_node->_total_len] = '\0';
  22. cout << "receive data is " << _recv_msg_node->_data << endl;
  23. //此处将消息投递到逻辑队列中
  24. LogicSystem::GetInstance()->PostMsgToQue(make_shared<LogicNode>(shared_from_this(), _recv_msg_node));
  25. //继续监听头部接受事件
  26. AsyncReadHead(HEAD_TOTAL_LEN);
  27. }
  28. catch (std::exception& e) {
  29. std::cout << "Exception code is " << e.what() << endl;
  30. }
  31. });
  32. }

读取包体完成后,在回调中继续读包头。以此循环往复直到读完所有数据。如果对方不发送数据,则回调函数就不会触发。不影响程序执行其他工作,因为我们采用的是asio异步的读写操作。

当然我们解析完包体后会调用LogicSystem单例将解析好的消息封装为逻辑节点传递给逻辑层进行处理。

LogicSystem

我们在逻辑层处理

  1. void LogicSystem::RegisterCallBacks() {
  2. _fun_callbacks[MSG_CHAT_LOGIN] = std::bind(&LogicSystem::LoginHandler, this,
  3. placeholders::_1, placeholders::_2, placeholders::_3);
  4. }
  5. void LogicSystem::LoginHandler(shared_ptr<CSession> session, const short &msg_id, const string &msg_data) {
  6. Json::Reader reader;
  7. Json::Value root;
  8. reader.parse(msg_data, root);
  9. std::cout << "user login uid is " << root["uid"].asInt() << " user token is "
  10. << root["token"].asString() << endl;
  11. std::string return_str = root.toStyledString();
  12. session->Send(return_str, msg_id);
  13. }

并在构造函数中注册这些处理流程

  1. LogicSystem::LogicSystem():_b_stop(false){
  2. RegisterCallBacks();
  3. _worker_thread = std::thread (&LogicSystem::DealMsg, this);
  4. }

总结

到此,完成了ChatServer收到QT客户端发送过来的长链接请求,并解析读取的数据,将收到的数据通过tcp发送给对端。接下来还要做ChatServer到GateServer的token验证,判断是否合理,这个教给之后的文章处理。

热门评论

热门文章

  1. Linux环境搭建和编码

    喜欢(594) 浏览(6419)
  2. C++ 类的继承封装和多态

    喜欢(588) 浏览(2960)
  3. 解密定时器的实现细节

    喜欢(566) 浏览(2125)
  4. windows环境搭建和vscode配置

    喜欢(587) 浏览(1801)
  5. slice介绍和使用

    喜欢(521) 浏览(1880)

最新评论

  1. 泛型算法的定制操作 secondtonone1:lambda和bind是C11新增的利器,善于利用这两个机制可以极大地提升编程安全性和效率。
  2. C++ 虚函数表原理和类成员内存分布 WangQi888888:class Test{ int m; int b; }中b成员是int,为什么在内存中只占了1个字节。不应该是4个字节吗?是不是int应该改为char。这样的话就会符合图上说明的情况
  3. 类和对象 陈宇航:支持!!!!
  4. 解决博客回复区被脚本注入的问题 secondtonone1:走到现在我忽然明白一个道理,无论工作也好生活也罢,最重要的是开心,即使一份安稳的工作不能给我带来事业上的积累也要合理的舍弃,所以我还是想去做喜欢的方向。
  5. slice介绍和使用 恋恋风辰:切片作为引用类型极大的提高了数据传递的效率和性能,但也要注意切片的浅拷贝隐患,算是一把双刃剑,这世间的常态就是在两极之间寻求一种稳定。

个人公众号

个人微信