聊天项目(16) asio实现tcp服务器

ChatServer

一个TCP服务器必然会有连接的接收,维持,收发数据等逻辑。那我们就要基于asio完成这个服务的搭建。主服务是这个样子的

  1. #include "LogicSystem.h"
  2. #include <csignal>
  3. #include <thread>
  4. #include <mutex>
  5. #include "AsioIOServicePool.h"
  6. #include "CServer.h"
  7. #include "ConfigMgr.h"
  8. using namespace std;
  9. bool bstop = false;
  10. std::condition_variable cond_quit;
  11. std::mutex mutex_quit;
  12. int main()
  13. {
  14. try {
  15. auto &cfg = ConfigMgr::Inst();
  16. auto pool = AsioIOServicePool::GetInstance();
  17. boost::asio::io_context io_context;
  18. boost::asio::signal_set signals(io_context, SIGINT, SIGTERM);
  19. signals.async_wait([&io_context, pool](auto, auto) {
  20. io_context.stop();
  21. pool->Stop();
  22. });
  23. auto port_str = cfg["SelfServer"]["Port"];
  24. CServer s(io_context, atoi(port_str.c_str()));
  25. io_context.run();
  26. }
  27. catch (std::exception& e) {
  28. std::cerr << "Exception: " << e.what() << endl;
  29. }
  30. }

CServer类的声明

  1. #include <boost/asio.hpp>
  2. #include "CSession.h"
  3. #include <memory.h>
  4. #include <map>
  5. #include <mutex>
  6. using namespace std;
  7. using boost::asio::ip::tcp;
  8. class CServer
  9. {
  10. public:
  11. CServer(boost::asio::io_context& io_context, short port);
  12. ~CServer();
  13. void ClearSession(std::string);
  14. private:
  15. void HandleAccept(shared_ptr<CSession>, const boost::system::error_code & error);
  16. void StartAccept();
  17. boost::asio::io_context &_io_context;
  18. short _port;
  19. tcp::acceptor _acceptor;
  20. std::map<std::string, shared_ptr<CSession>> _sessions;
  21. std::mutex _mutex;
  22. };

构造函数中监听对方连接

  1. CServer::CServer(boost::asio::io_context& io_context, short port):_io_context(io_context), _port(port),
  2. _acceptor(io_context, tcp::endpoint(tcp::v4(),port))
  3. {
  4. cout << "Server start success, listen on port : " << _port << endl;
  5. StartAccept();
  6. }

接受连接的函数

  1. void CServer::StartAccept() {
  2. auto &io_context = AsioIOServicePool::GetInstance()->GetIOService();
  3. shared_ptr<CSession> new_session = make_shared<CSession>(io_context, this);
  4. _acceptor.async_accept(new_session->GetSocket(), std::bind(&CServer::HandleAccept, this, new_session, placeholders::_1));
  5. }

AsioIOServicePool

从AsioIOServicePool中返回一个可用的iocontext构造Session,然后将接受的新链接的socket写入这个Session保管。

AsioIOServicePool已经在前面讲解很多次了,它的声明如下

  1. #include <vector>
  2. #include <boost/asio.hpp>
  3. #include "Singleton.h"
  4. class AsioIOServicePool:public Singleton<AsioIOServicePool>
  5. {
  6. friend Singleton<AsioIOServicePool>;
  7. public:
  8. using IOService = boost::asio::io_context;
  9. using Work = boost::asio::io_context::work;
  10. using WorkPtr = std::unique_ptr<Work>;
  11. ~AsioIOServicePool();
  12. AsioIOServicePool(const AsioIOServicePool&) = delete;
  13. AsioIOServicePool& operator=(const AsioIOServicePool&) = delete;
  14. // 使用 round-robin 的方式返回一个 io_service
  15. boost::asio::io_context& GetIOService();
  16. void Stop();
  17. private:
  18. AsioIOServicePool(std::size_t size = std::thread::hardware_concurrency());
  19. std::vector<IOService> _ioServices;
  20. std::vector<WorkPtr> _works;
  21. std::vector<std::thread> _threads;
  22. std::size_t _nextIOService;
  23. };

AsioIOServicePool具体实现

  1. #include "AsioIOServicePool.h"
  2. #include <iostream>
  3. using namespace std;
  4. AsioIOServicePool::AsioIOServicePool(std::size_t size):_ioServices(size),
  5. _works(size), _nextIOService(0){
  6. for (std::size_t i = 0; i < size; ++i) {
  7. _works[i] = std::unique_ptr<Work>(new Work(_ioServices[i]));
  8. }
  9. //遍历多个ioservice,创建多个线程,每个线程内部启动ioservice
  10. for (std::size_t i = 0; i < _ioServices.size(); ++i) {
  11. _threads.emplace_back([this, i]() {
  12. _ioServices[i].run();
  13. });
  14. }
  15. }
  16. AsioIOServicePool::~AsioIOServicePool() {
  17. std::cout << "AsioIOServicePool destruct" << endl;
  18. }
  19. boost::asio::io_context& AsioIOServicePool::GetIOService() {
  20. auto& service = _ioServices[_nextIOService++];
  21. if (_nextIOService == _ioServices.size()) {
  22. _nextIOService = 0;
  23. }
  24. return service;
  25. }
  26. void AsioIOServicePool::Stop(){
  27. //因为仅仅执行work.reset并不能让iocontext从run的状态中退出
  28. //当iocontext已经绑定了读或写的监听事件后,还需要手动stop该服务。
  29. for (auto& work : _works) {
  30. //把服务先停止
  31. work->get_io_context().stop();
  32. work.reset();
  33. }
  34. for (auto& t : _threads) {
  35. t.join();
  36. }
  37. }

CServer的处理连接逻辑

  1. void CServer::HandleAccept(shared_ptr<CSession> new_session, const boost::system::error_code& error){
  2. if (!error) {
  3. new_session->Start();
  4. lock_guard<mutex> lock(_mutex);
  5. _sessions.insert(make_pair(new_session->GetUuid(), new_session));
  6. }
  7. else {
  8. cout << "session accept failed, error is " << error.what() << endl;
  9. }
  10. StartAccept();
  11. }

Session层

上面的逻辑接受新链接后执行Start函数,新链接接受数据,然后Server继续监听新的连接

  1. void CSession::Start(){
  2. AsyncReadHead(HEAD_TOTAL_LEN);
  3. }

先读取头部数据

  1. void CSession::AsyncReadHead(int total_len)
  2. {
  3. auto self = shared_from_this();
  4. asyncReadFull(HEAD_TOTAL_LEN, [self, this](const boost::system::error_code& ec, std::size_t bytes_transfered) {
  5. try {
  6. if (ec) {
  7. std::cout << "handle read failed, error is " << ec.what() << endl;
  8. Close();
  9. _server->ClearSession(_uuid);
  10. return;
  11. }
  12. if (bytes_transfered < HEAD_TOTAL_LEN) {
  13. std::cout << "read length not match, read [" << bytes_transfered << "] , total ["
  14. << HEAD_TOTAL_LEN << "]" << endl;
  15. Close();
  16. _server->ClearSession(_uuid);
  17. return;
  18. }
  19. _recv_head_node->Clear();
  20. memcpy(_recv_head_node->_data, _data, bytes_transfered);
  21. //获取头部MSGID数据
  22. short msg_id = 0;
  23. memcpy(&msg_id, _recv_head_node->_data, HEAD_ID_LEN);
  24. //网络字节序转化为本地字节序
  25. msg_id = boost::asio::detail::socket_ops::network_to_host_short(msg_id);
  26. std::cout << "msg_id is " << msg_id << endl;
  27. //id非法
  28. if (msg_id > MAX_LENGTH) {
  29. std::cout << "invalid msg_id is " << msg_id << endl;
  30. _server->ClearSession(_uuid);
  31. return;
  32. }
  33. short msg_len = 0;
  34. memcpy(&msg_len, _recv_head_node->_data + HEAD_ID_LEN, HEAD_DATA_LEN);
  35. //网络字节序转化为本地字节序
  36. msg_len = boost::asio::detail::socket_ops::network_to_host_short(msg_len);
  37. std::cout << "msg_len is " << msg_len << endl;
  38. //id非法
  39. if (msg_len > MAX_LENGTH) {
  40. std::cout << "invalid data length is " << msg_len << endl;
  41. _server->ClearSession(_uuid);
  42. return;
  43. }
  44. _recv_msg_node = make_shared<RecvNode>(msg_len, msg_id);
  45. AsyncReadBody(msg_len);
  46. }
  47. catch (std::exception& e) {
  48. std::cout << "Exception code is " << e.what() << endl;
  49. }
  50. });
  51. }

上面的逻辑里调用asyncReadFull读取整个长度,然后解析收到的数据,前两个字节为id,之后两个字节为长度,最后n个长度字节为消息内容。

  1. //读取完整长度
  2. void CSession::asyncReadFull(std::size_t maxLength, std::function<void(const boost::system::error_code&, std::size_t)> handler )
  3. {
  4. ::memset(_data, 0, MAX_LENGTH);
  5. asyncReadLen(0, maxLength, handler);
  6. }

读取指定长度

  1. //读取指定字节数
  2. void CSession::asyncReadLen(std::size_t read_len, std::size_t total_len,
  3. std::function<void(const boost::system::error_code&, std::size_t)> handler)
  4. {
  5. auto self = shared_from_this();
  6. _socket.async_read_some(boost::asio::buffer(_data + read_len, total_len-read_len),
  7. [read_len, total_len, handler, self](const boost::system::error_code& ec, std::size_t bytesTransfered) {
  8. if (ec) {
  9. // 出现错误,调用回调函数
  10. handler(ec, read_len + bytesTransfered);
  11. return;
  12. }
  13. if (read_len + bytesTransfered >= total_len) {
  14. //长度够了就调用回调函数
  15. handler(ec, read_len + bytesTransfered);
  16. return;
  17. }
  18. // 没有错误,且长度不足则继续读取
  19. self->asyncReadLen(read_len + bytesTransfered, total_len, handler);
  20. });
  21. }

读取头部成功后,其回调函数内部调用了读包体的逻辑

  1. void CSession::AsyncReadBody(int total_len)
  2. {
  3. auto self = shared_from_this();
  4. asyncReadFull(total_len, [self, this, total_len](const boost::system::error_code& ec, std::size_t bytes_transfered) {
  5. try {
  6. if (ec) {
  7. std::cout << "handle read failed, error is " << ec.what() << endl;
  8. Close();
  9. _server->ClearSession(_uuid);
  10. return;
  11. }
  12. if (bytes_transfered < total_len) {
  13. std::cout << "read length not match, read [" << bytes_transfered << "] , total ["
  14. << total_len<<"]" << endl;
  15. Close();
  16. _server->ClearSession(_uuid);
  17. return;
  18. }
  19. memcpy(_recv_msg_node->_data , _data , bytes_transfered);
  20. _recv_msg_node->_cur_len += bytes_transfered;
  21. _recv_msg_node->_data[_recv_msg_node->_total_len] = '\0';
  22. cout << "receive data is " << _recv_msg_node->_data << endl;
  23. //此处将消息投递到逻辑队列中
  24. LogicSystem::GetInstance()->PostMsgToQue(make_shared<LogicNode>(shared_from_this(), _recv_msg_node));
  25. //继续监听头部接受事件
  26. AsyncReadHead(HEAD_TOTAL_LEN);
  27. }
  28. catch (std::exception& e) {
  29. std::cout << "Exception code is " << e.what() << endl;
  30. }
  31. });
  32. }

读取包体完成后,在回调中继续读包头。以此循环往复直到读完所有数据。如果对方不发送数据,则回调函数就不会触发。不影响程序执行其他工作,因为我们采用的是asio异步的读写操作。

当然我们解析完包体后会调用LogicSystem单例将解析好的消息封装为逻辑节点传递给逻辑层进行处理。

LogicSystem

我们在逻辑层处理

  1. void LogicSystem::RegisterCallBacks() {
  2. _fun_callbacks[MSG_CHAT_LOGIN] = std::bind(&LogicSystem::LoginHandler, this,
  3. placeholders::_1, placeholders::_2, placeholders::_3);
  4. }
  5. void LogicSystem::LoginHandler(shared_ptr<CSession> session, const short &msg_id, const string &msg_data) {
  6. Json::Reader reader;
  7. Json::Value root;
  8. reader.parse(msg_data, root);
  9. std::cout << "user login uid is " << root["uid"].asInt() << " user token is "
  10. << root["token"].asString() << endl;
  11. std::string return_str = root.toStyledString();
  12. session->Send(return_str, msg_id);
  13. }

并在构造函数中注册这些处理流程

  1. LogicSystem::LogicSystem():_b_stop(false){
  2. RegisterCallBacks();
  3. _worker_thread = std::thread (&LogicSystem::DealMsg, this);
  4. }

总结

到此,完成了ChatServer收到QT客户端发送过来的长链接请求,并解析读取的数据,将收到的数据通过tcp发送给对端。接下来还要做ChatServer到GateServer的token验证,判断是否合理,这个教给之后的文章处理。

热门评论
  • Tsutamachi
    2025-03-18 09:51:50

    这个服务器程序好像就是 之前BoostAsio网络编程课程的服务器?
    区别好像只有  之前是用的boost::asio::async_read,这次用的是socket.async_read_some?

热门文章

  1. Qt环境搭建

    喜欢(517) 浏览(23610)
  2. vscode搭建windows C++开发环境

    喜欢(596) 浏览(79654)
  3. 聊天项目(28) 分布式服务通知好友申请

    喜欢(507) 浏览(5769)
  4. 使用hexo搭建个人博客

    喜欢(533) 浏览(11361)
  5. Linux环境搭建和编码

    喜欢(594) 浏览(13008)

最新评论

  1. 无锁并发队列 TenThousandOne:_head  和 _tail  替换为原子变量。那里pop的逻辑,val = _data[h] 可以移到循环外面吗
  2. 网络编程学习方法和图书推荐 Corleone:啥程度可以找工作
  3. Qt 对话框 Spade2077:QDialog w(); //这里是不是不需要带括号
  4. 构造函数 secondtonone1:构造函数是类的基础知识,要着重掌握
  5. protobuf配置和使用 熊二:你可以把dll放到系统目录,也可以配置环境变量,还能把dll丢到lib里
  6. 创建项目和编译 secondtonone1:谢谢支持
  7. 再谈单例模式 secondtonone1:是的,C++11以后返回局部static变量对象能保证线程安全了。
  8. 类和对象 陈宇航:支持!!!!
  9. 聊天项目(9) redis服务搭建 pro_lin:redis线程池的析构函数,除了pop出队列,还要free掉redis连接把
  10. string类 WangQi888888:确实错了,应该是!isspace(sind[index]). 否则不进入循环,还是原来的字符串“some string”
  11. 堆排序 secondtonone1:堆排序非常实用,定时器就是这个原理制作的。
  12. 答疑汇总(thread,async源码分析) Yagus:如果引用计数为0,则会执行 future 的析构进而等待任务执行完成,那么看到的输出将是 这边应该不对吧,std::future析构只在这三种情况都满足的时候才回block: 1.共享状态是std::async 创造的(类型是_Task_async_state) 2.共享状态没有ready 3.这个future是共享状态的最后一个引用 这边共享状态类型是“_Package_state”,引用计数即使为0也不应该block啊
  13. 处理网络粘包问题 zyouth: //消息的长度小于头部规定的长度,说明数据未收全,则先将部分消息放到接收节点里 if (bytes_transferred < data_len) { memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, bytes_transferred); _recv_msg_node->_cur_len += bytes_transferred; ::memset(_data, 0, MAX_LENGTH); _socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH), std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_self)); //头部处理完成 _b_head_parse = true; return; } 把_b_head_parse = true;放在_socket.async_read_some前面是不是更好
  14. Qt MVC结构之QItemDelegate介绍 胡歌-此生不换:gpt, google
  15. 解决博客回复区被脚本注入的问题 secondtonone1:走到现在我忽然明白一个道理,无论工作也好生活也罢,最重要的是开心,即使一份安稳的工作不能给我带来事业上的积累也要合理的舍弃,所以我还是想去做喜欢的方向。
  16. C++ 并发三剑客future, promise和async Yunfei:大佬您好,如果这个线程池中加入的异步任务的形参如果有右值引用,这个commit中的返回类型推导和bind绑定就会出现问题,请问实际工程中,是不是不会用到这种任务,如果用到了,应该怎么解决?
  17. interface应用 secondtonone1:interface是万能类型,但是使用时要转换为实际类型来使用。interface丰富了go的多态特性,也降低了传统面向对象语言的耦合性。
  18. visual studio配置boost库 一giao里我离giaogiao:请问是修改成这样吗:.\b2.exe toolset=MinGW
  19. 聊天项目(7) visualstudio配置grpc diablorrr:cmake文件得改一下 find_package(Boost REQUIRED COMPONENTS system filesystem),要加上filesystem。在target_link_libraries中也同样加上
  20. 利用栅栏实现同步 Dzher:作者你好!我觉得 std::thread a(write_x); std::thread b(write_y); std::thread c(read_x_then_y); std::thread d(read_y_then_x); 这个例子中的assert fail并不会发生,原子变量设定了非relaxed内存序后一个线程的原子变量被写入,那么之后的读取一定会被同步的,c和d线程中只可能同时发生一个z++未执行的情况,最终z不是1就是2了,我测试了很多次都没有assert,请问我这个观点有什么错误,谢谢!
  21. 面试题汇总(一) secondtonone1:看到网络上经常提问的go的问题,做了一下汇总,结合自己的经验给出的答案,如有纰漏,望指正批评。
  22. boost::asio之socket的创建和连接 项空月:发现一些错别字 :每隔vector存储  是不是是每个. asio::mutable_buffers_1 o或者    是不是多打了个o
  23. 聊天项目(15) 客户端实现TCP管理者 lkx:已经在&QTcpSocket::readyRead 回调函数中做了处理了的。

个人公众号

个人微信