单例模式实现逻辑层设计

优雅退出

服务器优雅退出一直是服务器设计必须考虑的一个方向,意在能通过捕获信号使服务器安全退出。我们可以通过asio提供的信号机制绑定回调函数即可实现优雅退出。在主函数中我们添加

  1. int main()
  2. {
  3. try {
  4. boost::asio::io_context io_context;
  5. boost::asio::signal_set signals(io_context, SIGINT, SIGTERM);
  6. signals.async_wait([&io_context](auto, auto) {
  7. io_context.stop();
  8. });
  9. CServer s(io_context, 10086);
  10. io_context.run();
  11. }
  12. catch (std::exception& e) {
  13. std::cerr << "Exception: " << e.what() << endl;
  14. }
  15. }

利用signal_set 定义了一系列信号合集,并且绑定了一个匿名函数,匿名函数捕获了io_context的引用,并且函数中设置了停止操作,也就是说当捕获到SIGINT,SIGTERM等信号时,会调用io_context.stop

单例模板类

接下来我们实现一个单例模板类,因为服务器的逻辑处理需要单例模式,后期可能还会有一些模块的设计也需要单例模式,所以先实现一个单例模板类,然后其他想实现单例类只需要继承这个模板类即可。

  1. #include <memory>
  2. #include <mutex>
  3. #include <iostream>
  4. using namespace std;
  5. template <typename T>
  6. class Singleton {
  7. protected:
  8. Singleton() = default;
  9. Singleton(const Singleton<T>&) = delete;
  10. Singleton& operator=(const Singleton<T>& st) = delete;
  11. static std::shared_ptr<T> _instance;
  12. public:
  13. static std::shared_ptr<T> GetInstance() {
  14. static std::once_flag s_flag;
  15. std::call_once(s_flag, [&]() {
  16. _instance = shared_ptr<T>(new T);
  17. });
  18. return _instance;
  19. }
  20. void PrintAddress() {
  21. std::cout << _instance.get() << endl;
  22. }
  23. ~Singleton() {
  24. std::cout << "this is singleton destruct" << std::endl;
  25. }
  26. };
  27. template <typename T>
  28. std::shared_ptr<T> Singleton<T>::_instance = nullptr;

单例模式模板类将无参构造,拷贝构造,拷贝赋值都设定为protected属性,其他的类无法访问,其实也可以设置为私有属性。析构函数设置为公有的,其实设置为私有的更合理一点。
Singleton有一个static类型的属性_instance, 它是我们实际要开辟类型的智能指针类型。
s_flag是函数GetInstance内的局部静态变量,该变量在函数GetInstance第一次调用时被初始化。以后无论调用多少次GetInstance s_flag都不会被重复初始化,而且s_flag存在静态区,会随着进程结束而自动释放。
call_once只会调用一次,而且是线程安全的, 其内部的原理就是调用该函数时加锁,然后设置s_flag内部的标记,设置为已经初始化,执行lambda表达式逻辑初始化智能指针,然后解锁。第二次调用GetInstance 内部还会调用call_once, 只是call_once判断s_flag已经被初始化了就不执行初始化智能指针的操作了。

LogicSystem单例类

我们实现逻辑系统的单例类,继承自Singleton<LogicSystem>,这样LogicSystem的构造函数和拷贝构造函数就都变为私有的了,因为基类的构造函数和拷贝构造函数都是私有的。另外LogicSystem也用了基类的成员_instanceGetInstance函数。从而达到单例效果。

  1. typedef function<void(shared_ptr<CSession>, short msg_id, string msg_data)> FunCallBack;
  2. class LogicSystem:public Singleton<LogicSystem>
  3. {
  4. friend class Singleton<LogicSystem>;
  5. public:
  6. ~LogicSystem();
  7. void PostMsgToQue(shared_ptr < LogicNode> msg);
  8. private:
  9. LogicSystem();
  10. void DealMsg();
  11. void RegisterCallBacks();
  12. void HelloWordCallBack(shared_ptr<CSession>, short msg_id, string msg_data);
  13. std::thread _worker_thread;
  14. std::queue<shared_ptr<LogicNode>> _msg_que;
  15. std::mutex _mutex;
  16. std::condition_variable _consume;
  17. bool _b_stop;
  18. std::map<short, FunCallBack> _fun_callbacks;
  19. };

1   FunCallBack为要注册的回调函数类型,其参数为绘画类智能指针,消息id,以及消息内容。

2   _msg_que为逻辑队列

3   _mutex 为保证逻辑队列安全的互斥量

4   _consume表示消费者条件变量,用来控制当逻辑队列为空时保证线程暂时挂起等待,不要干扰其他线程。

5   _fun_callbacks表示回调函数的map,根据id查找对应的逻辑处理函数。

6   _worker_thread表示工作线程,用来从逻辑队列中取数据并执行回调函数。

7   _b_stop表示收到外部的停止信号,逻辑类要中止工作线程并优雅退出。

LogicNode定义在CSession.h中

  1. class LogicNode {
  2. friend class LogicSystem;
  3. public:
  4. LogicNode(shared_ptr<CSession>, shared_ptr<RecvNode>);
  5. private:
  6. shared_ptr<CSession> _session;
  7. shared_ptr<RecvNode> _recvnode;
  8. };

其包含算了会话类的智能指针,主要是为了实现伪闭包,防止session被释放。
其次包含了接收消息的节点类的智能指针。
实现如下

  1. LogicNode::LogicNode(shared_ptr<CSession> session,
  2. shared_ptr<RecvNode> recvnode):_session(session),_recvnode(recvnode) {
  3. }

LogicSystem的构造函数如下

  1. LogicSystem::LogicSystem():_b_stop(false){
  2. RegisterCallBacks();
  3. _worker_thread = std::thread (&LogicSystem::DealMsg, this);
  4. }

构造函数中将停止信息初始化为false,注册消息处理函数并且启动了一个工作线程,工作线程执行DealMsg逻辑。
注册消息处理函数的逻辑如下

  1. void LogicSystem::RegisterCallBacks() {
  2. _fun_callbacks[MSG_HELLO_WORD] = std::bind(&LogicSystem::HelloWordCallBack, this,
  3. placeholders::_1, placeholders::_2, placeholders::_3);
  4. }

MSG_HELLO_WORD定义在const.h中

  1. enum MSG_IDS {
  2. MSG_HELLO_WORD = 1001
  3. };

MSG_HELLO_WORD表示消息id,HelloWordCallBack为对应的回调处理函数

  1. void LogicSystem::HelloWordCallBack(shared_ptr<CSession> session, short msg_id, string msg_data) {
  2. Json::Reader reader;
  3. Json::Value root;
  4. reader.parse(msg_data, root);
  5. std::cout << "recevie msg id is " << root["id"].asInt() << " msg data is "
  6. << root["data"].asString() << endl;
  7. root["data"] = "server has received msg, msg data is " + root["data"].asString();
  8. std::string return_str = root.toStyledString();
  9. session->Send(return_str, root["id"].asInt());
  10. }

HelloWordCallBack里我们根据消息id和收到的消息,做了相应的处理并且回应给客户端。

工作线程的处理函数DealMsg逻辑

  1. void LogicSystem::DealMsg() {
  2. for (;;) {
  3. std::unique_lock<std::mutex> unique_lk(_mutex);
  4. //判断队列为空则用条件变量阻塞等待,并释放锁
  5. while (_msg_que.empty() && !_b_stop) {
  6. _consume.wait(unique_lk);
  7. }
  8. //判断是否为关闭状态,把所有逻辑执行完后则退出循环
  9. if (_b_stop ) {
  10. while (!_msg_que.empty()) {
  11. auto msg_node = _msg_que.front();
  12. cout << "recv_msg id is " << msg_node->_recvnode->_msg_id << endl;
  13. auto call_back_iter = _fun_callbacks.find(msg_node->_recvnode->_msg_id);
  14. if (call_back_iter == _fun_callbacks.end()) {
  15. _msg_que.pop();
  16. continue;
  17. }
  18. call_back_iter->second(msg_node->_session, msg_node->_recvnode->_msg_id,
  19. std::string(msg_node->_recvnode->_data, msg_node->_recvnode->_cur_len));
  20. _msg_que.pop();
  21. }
  22. break;
  23. }
  24. //如果没有停服,且说明队列中有数据
  25. auto msg_node = _msg_que.front();
  26. cout << "recv_msg id is " << msg_node->_recvnode->_msg_id << endl;
  27. auto call_back_iter = _fun_callbacks.find(msg_node->_recvnode->_msg_id);
  28. if (call_back_iter == _fun_callbacks.end()) {
  29. _msg_que.pop();
  30. continue;
  31. }
  32. call_back_iter->second(msg_node->_session, msg_node->_recvnode->_msg_id,
  33. std::string(msg_node->_recvnode->_data, msg_node->_recvnode->_cur_len));
  34. _msg_que.pop();
  35. }
  36. }

1   DealMsg逻辑中初始化了一个unique_lock,主要是用来控制队列安全,并且配合条件变量可以随时解锁。lock_guard不具备解锁功能,所以此处用unique_lock。
2   我们判断队列为空,并且不是停止状态,就挂起线程。否则继续执行之后的逻辑,如果_b_stop为true,说明处于停服状态,则将队列中未处理的消息全部处理完然后退出循环。如果_b_stop未false,则说明没有停服,是consumer发送的激活信号激活了线程,则继续取队列中的数据处理。

LogicSystem的析构函数需要等待工作线程处理完再退出,但是工作线程可能处于挂起状态,所以要发送一个激活信号唤醒工作线程。并且将_b_stop标记设置为true。

  1. LogicSystem::~LogicSystem(){
  2. _b_stop = true;
  3. _consume.notify_one();
  4. _worker_thread.join();
  5. }

因为网络层收到消息后我们需要将消息投递给逻辑队列进行处理,那么LogicSystem就要封装一个投递函数

  1. void LogicSystem::PostMsgToQue(shared_ptr < LogicNode> msg) {
  2. std::unique_lock<std::mutex> unique_lk(_mutex);
  3. _msg_que.push(msg);
  4. //由0变为1则发送通知信号
  5. if (_msg_que.size() == 1) {
  6. _consume.notify_one();
  7. }
  8. }

在Session收到数据时这样调用

  1. LogicSystem::GetInstance()->PostMsgToQue(make_shared<LogicNode>(shared_from_this(), _recv_msg_node));

再次启动服务器,编译启动,和之前一样可以看到数据收发正常。如下图:

https://cdn.llfc.club/1685849713914.jpg

总结

本文实现了服务器的逻辑类,包括并发控制等手段。

视频连接https://space.bilibili.com/271469206/channel/collectiondetail?sid=313101

源码链接https://gitee.com/secondtonone1/boostasio-learn

热门评论

热门文章

  1. 解密定时器的实现细节

    喜欢(566) 浏览(1841)
  2. C++ 类的继承封装和多态

    喜欢(588) 浏览(2562)
  3. slice介绍和使用

    喜欢(521) 浏览(1715)
  4. Linux环境搭建和编码

    喜欢(594) 浏览(5484)
  5. windows环境搭建和vscode配置

    喜欢(587) 浏览(1710)

最新评论

  1. 泛型算法的定制操作 secondtonone1:lambda和bind是C11新增的利器,善于利用这两个机制可以极大地提升编程安全性和效率。
  2. 基于锁实现线程安全队列和栈容器 secondtonone1:我是博主,你认真学习的样子的很可爱,哈哈,我画的是链表由空变成1个的情况。其余情况和你思考的类似,只不过我用了一个无效节点表示tail的指向,最初head和tail指向的都是这个节点。
  3. 解决博客回复区被脚本注入的问题 secondtonone1:走到现在我忽然明白一个道理,无论工作也好生活也罢,最重要的是开心,即使一份安稳的工作不能给我带来事业上的积累也要合理的舍弃,所以我还是想去做喜欢的方向。
  4. 利用内存模型优化无锁栈 卡西莫多的礼物:感谢博主指点,好人一生平安o(* ̄▽ ̄*)ブ
  5. 类和对象 陈宇航:支持!!!!

个人公众号

个人微信