单服务器踢人逻辑实现

1. 为什么要有踢人逻辑

在服务器中经常会设计的同账户异地登陆时,将旧有账号的连接断开,必要时先发送下线消息通知旧账号的客户端,然后关闭这个连接。

服务器设计中尽量不要采用服务器主动关闭连接,那样会造成大量TIME_WAIT问题,这个之后再说。

先用一个图示说明踢人逻辑

旧客户端登录

image-20250412120427206

当有新客户端连接时

image-20250412121139480

上述图形演示的是单服务器踢人逻辑,多服务器踢人逻辑应配合分布式锁,锁住分布式的操作,保证在一个时刻只能一个客户端登录,客户端登陆完再解锁。

分布式登录我们放在下一节,这一节我们模拟两个客户端同账号登录同一个服务器,实现踢人逻辑。

2. 分布式锁和redis封装

为了更方便操作,我们将分布式锁加锁和解锁的操作封装到redis接口中

因为分布式锁也会占用连接,为了防止连接被占用耗尽连接池,所以我们提前扩大连接池的数量为10

  1. RedisMgr::RedisMgr() {
  2. auto& gCfgMgr = ConfigMgr::Inst();
  3. auto host = gCfgMgr["Redis"]["Host"];
  4. auto port = gCfgMgr["Redis"]["Port"];
  5. auto pwd = gCfgMgr["Redis"]["Passwd"];
  6. _con_pool.reset(new RedisConPool(10, host.c_str(), atoi(port.c_str()), pwd.c_str()));
  7. }

封装加锁操作, 内部调用了之前封装的分布式锁DistLock

  1. std::string RedisMgr::acquireLock(const std::string& lockName,
  2. int lockTimeout, int acquireTimeout) {
  3. auto connect = _con_pool->getConnection();
  4. if (connect == nullptr) {
  5. return "";
  6. }
  7. Defer defer([&connect, this]() {
  8. _con_pool->returnConnection(connect);
  9. });
  10. return DistLock::Inst().acquireLock(connect, lockName, lockTimeout, acquireTimeout);
  11. }

解锁操作

  1. bool RedisMgr::releaseLock(const std::string& lockName,
  2. const std::string& identifier) {
  3. if (identifier.empty()) {
  4. return true;
  5. }
  6. auto connect = _con_pool->getConnection();
  7. if (connect == nullptr) {
  8. return false;
  9. }
  10. Defer defer([&connect, this]() {
  11. _con_pool->returnConnection(connect);
  12. });
  13. return DistLock::Inst().releaseLock(connect, lockName, identifier);
  14. }

3. 加锁和解锁调用

对于踢人逻辑,最难的就是思考如何加锁和解锁,进行踢人,以保证将来分布式登录也会安全。

这里我们先考虑几个情形

  1. B新登录,此时A已登录,这种最简单,根据uid找到A的session发送踢人通知。
  2. B新登录,此时A将下线,这种要保证B和A互斥,要么B先登陆完,A再下线,要么A先下线,B再登录。

​ 这么做的好处就是保证互斥

​ 如果B先登录,会将uid对应的session更新为最新的。A下线时会优先查找uid对应的session,发现不是自己,则直接退出即可,同时不需要修改uid对应的session为空。

​ 如果A先退出,A下线时会优先查找uid对应的session, 发现uid对应的session和自己的连接吻合,则会将uid对应的session设置为空,然后B登录,将uid对应的session设置为新连接,这样是安全的。

  1. B登录,A退出,此时C查找uid发送消息,三个操作都会添加分布式锁。谁先竞争到锁谁操作,能保证操作的互斥。

基本就是这三种情况。接下来我们回顾下uid和Session的对应关系

4. 用户和会话关系

添加用户和会话关联

  1. class UserMgr: public Singleton<UserMgr>
  2. {
  3. friend class Singleton<UserMgr>;
  4. public:
  5. ~UserMgr();
  6. std::shared_ptr<CSession> GetSession(int uid);
  7. void SetUserSession(int uid, std::shared_ptr<CSession> session);
  8. void RmvUserSession(int uid, std::string session_id);
  9. private:
  10. UserMgr();
  11. std::mutex _session_mtx;
  12. std::unordered_map<int, std::shared_ptr<CSession>> _uid_to_session;
  13. };

UserMgr中可以根据uid查找到对应的CSession。具体实现

  1. #include "UserMgr.h"
  2. #include "CSession.h"
  3. #include "RedisMgr.h"
  4. UserMgr:: ~ UserMgr(){
  5. _uid_to_session.clear();
  6. }
  7. std::shared_ptr<CSession> UserMgr::GetSession(int uid)
  8. {
  9. std::lock_guard<std::mutex> lock(_session_mtx);
  10. auto iter = _uid_to_session.find(uid);
  11. if (iter == _uid_to_session.end()) {
  12. return nullptr;
  13. }
  14. return iter->second;
  15. }
  16. void UserMgr::SetUserSession(int uid, std::shared_ptr<CSession> session)
  17. {
  18. std::lock_guard<std::mutex> lock(_session_mtx);
  19. _uid_to_session[uid] = session;
  20. }
  21. void UserMgr::RmvUserSession(int uid, std::string session_id)
  22. {
  23. {
  24. std::lock_guard<std::mutex> lock(_session_mtx);
  25. auto iter = _uid_to_session.find(uid);
  26. if (iter != _uid_to_session.end()) {
  27. return;
  28. }
  29. auto session_id_ = iter->second->GetSessionId();
  30. //不相等说明是其他地方登录了
  31. if (session_id_ != session_id) {
  32. return;
  33. }
  34. _uid_to_session.erase(uid);
  35. }
  36. }
  37. UserMgr::UserMgr()
  38. {
  39. }

大家有没有注意到,对Session的操作没有加分布式锁,只加了线程锁,因为我的思路是在最外层加分布式锁,而接口内部只加线程锁,保证同一个服务器操作的原子性。

CSession类和之前一样, 里面有user_idsession_id

  1. class CSession: public std::enable_shared_from_this<CSession>
  2. {
  3. public:
  4. CSession(boost::asio::io_context& io_context, CServer* server);
  5. ~CSession();
  6. tcp::socket& GetSocket();
  7. std::string& GetSessionId();
  8. void SetUserId(int uid);
  9. int GetUserId();
  10. void Start();
  11. void Send(char* msg, short max_length, short msgid);
  12. void Send(std::string msg, short msgid);
  13. void Close();
  14. std::shared_ptr<CSession> SharedSelf();
  15. void AsyncReadBody(int length);
  16. void AsyncReadHead(int total_len);
  17. void NotifyOffline(int uid);
  18. private:
  19. void asyncReadFull(std::size_t maxLength, std::function<void(const boost::system::error_code& , std::size_t)> handler);
  20. void asyncReadLen(std::size_t read_len, std::size_t total_len,
  21. std::function<void(const boost::system::error_code&, std::size_t)> handler);
  22. void HandleWrite(const boost::system::error_code& error, std::shared_ptr<CSession> shared_self);
  23. tcp::socket _socket;
  24. std::string _session_id;
  25. char _data[MAX_LENGTH];
  26. CServer* _server;
  27. bool _b_close;
  28. std::queue<shared_ptr<SendNode> > _send_que;
  29. std::mutex _send_lock;
  30. //收到的消息结构
  31. std::shared_ptr<RecvNode> _recv_msg_node;
  32. bool _b_head_parse;
  33. //收到的头部结构
  34. std::shared_ptr<MsgNode> _recv_head_node;
  35. int _user_uid;
  36. };

通过上述结构,我们可以通过UserMgr查找到CSession, 也可以通过CSession查找到userid, 实现了双向关联

5.登录添加分布式锁

我们需要对登录流程添加分布式锁,收到登录请求会做如下事情

  1. 判断tokenuid是否合理
  2. 根据uid构造分布式锁key,然后实现分布式锁加锁操作。比如uid为1001,则分布式锁的key为”lock_1001”
  3. 加锁后通过defer自动析构解锁
  4. 通过uid获取用户之前登录的服务器,如果存在则说明uid对应的用户还在线,此时要做踢人,判断serverip和现在的服务器ip是否相等,如果相等则说明是

​ 本服务器踢人,只需要通过线程锁控制好并发逻辑即可,将uid对应的旧session发送信息通知客户端下线,并且将旧sessionserver中移除。

​ 如果不是本服务器,则要做跨服踢人,调用grpc踢人即可,留作之后做。

  1. 登录成功后,要将uid和对应的ip信息写入redis,方便以后跨服查找。另外uid对应的session信息也要写入redis, 同时将uidsession关联,这样可以通过uid快速找到session
  1. void LogicSystem::LoginHandler(shared_ptr<CSession> session, const short &msg_id, const string &msg_data) {
  2. Json::Reader reader;
  3. Json::Value root;
  4. reader.parse(msg_data, root);
  5. auto uid = root["uid"].asInt();
  6. auto token = root["token"].asString();
  7. std::cout << "user login uid is " << uid << " user token is "
  8. << token << endl;
  9. Json::Value rtvalue;
  10. Defer defer([this, &rtvalue, session]() {
  11. std::string return_str = rtvalue.toStyledString();
  12. session->Send(return_str, MSG_CHAT_LOGIN_RSP);
  13. });
  14. //从redis获取用户token是否正确
  15. std::string uid_str = std::to_string(uid);
  16. std::string token_key = USERTOKENPREFIX + uid_str;
  17. std::string token_value = "";
  18. bool success = RedisMgr::GetInstance()->Get(token_key, token_value);
  19. if (!success) {
  20. rtvalue["error"] = ErrorCodes::UidInvalid;
  21. return ;
  22. }
  23. if (token_value != token) {
  24. rtvalue["error"] = ErrorCodes::TokenInvalid;
  25. return ;
  26. }
  27. rtvalue["error"] = ErrorCodes::Success;
  28. //此处添加分布式锁,让该线程独占登录
  29. //拼接用户ip对应的key
  30. auto lock_key = LOCK_PREFIX + uid_str;
  31. auto identifier = RedisMgr::GetInstance()->acquireLock(lock_key, LOCK_TIME_OUT, ACQUIRE_TIME_OUT);
  32. //利用defer解锁
  33. Defer defer2([this, identifier, lock_key]() {
  34. RedisMgr::GetInstance()->releaseLock(lock_key, identifier);
  35. });
  36. //此处判断该用户是否在别处或者本服务器登录
  37. std::string uid_ip_value = "";
  38. auto uid_ip_key = USERIPPREFIX + uid_str;
  39. bool b_ip = RedisMgr::GetInstance()->Get(uid_ip_key, uid_ip_value);
  40. //说明用户已经登录了,此处应该踢掉之前的用户登录状态
  41. if (b_ip) {
  42. //获取当前服务器ip信息
  43. auto& cfg = ConfigMgr::Inst();
  44. auto self_name = cfg["SelfServer"]["Name"];
  45. //如果之前登录的服务器和当前相同,则直接在本服务器踢掉
  46. if (uid_ip_value == self_name) {
  47. //查找旧有的连接
  48. auto old_session = UserMgr::GetInstance()->GetSession(uid);
  49. //此处应该发送踢人消息
  50. if (old_session) {
  51. old_session->NotifyOffline(uid);
  52. //清除旧的连接
  53. _p_server->ClearSession(old_session->GetSessionId());
  54. }
  55. }
  56. else {
  57. //如果不是本服务器,则通知grpc通知其他服务器踢掉
  58. }
  59. }
  60. std::string base_key = USER_BASE_INFO + uid_str;
  61. auto user_info = std::make_shared<UserInfo>();
  62. bool b_base = GetBaseInfo(base_key, uid, user_info);
  63. if (!b_base) {
  64. rtvalue["error"] = ErrorCodes::UidInvalid;
  65. return;
  66. }
  67. rtvalue["uid"] = uid;
  68. rtvalue["pwd"] = user_info->pwd;
  69. rtvalue["name"] = user_info->name;
  70. rtvalue["email"] = user_info->email;
  71. rtvalue["nick"] = user_info->nick;
  72. rtvalue["desc"] = user_info->desc;
  73. rtvalue["sex"] = user_info->sex;
  74. rtvalue["icon"] = user_info->icon;
  75. //从数据库获取申请列表
  76. std::vector<std::shared_ptr<ApplyInfo>> apply_list;
  77. auto b_apply = GetFriendApplyInfo(uid,apply_list);
  78. if (b_apply) {
  79. for (auto & apply : apply_list) {
  80. Json::Value obj;
  81. obj["name"] = apply->_name;
  82. obj["uid"] = apply->_uid;
  83. obj["icon"] = apply->_icon;
  84. obj["nick"] = apply->_nick;
  85. obj["sex"] = apply->_sex;
  86. obj["desc"] = apply->_desc;
  87. obj["status"] = apply->_status;
  88. rtvalue["apply_list"].append(obj);
  89. }
  90. }
  91. //获取好友列表
  92. std::vector<std::shared_ptr<UserInfo>> friend_list;
  93. bool b_friend_list = GetFriendList(uid, friend_list);
  94. for (auto& friend_ele : friend_list) {
  95. Json::Value obj;
  96. obj["name"] = friend_ele->name;
  97. obj["uid"] = friend_ele->uid;
  98. obj["icon"] = friend_ele->icon;
  99. obj["nick"] = friend_ele->nick;
  100. obj["sex"] = friend_ele->sex;
  101. obj["desc"] = friend_ele->desc;
  102. obj["back"] = friend_ele->back;
  103. rtvalue["friend_list"].append(obj);
  104. }
  105. auto server_name = ConfigMgr::Inst().GetValue("SelfServer", "Name");
  106. //将登录数量增加
  107. auto rd_res = RedisMgr::GetInstance()->HGet(LOGIN_COUNT, server_name);
  108. int count = 0;
  109. if (!rd_res.empty()) {
  110. count = std::stoi(rd_res);
  111. }
  112. count++;
  113. auto count_str = std::to_string(count);
  114. RedisMgr::GetInstance()->HSet(LOGIN_COUNT, server_name, count_str);
  115. //session绑定用户uid
  116. session->SetUserId(uid);
  117. //为用户设置登录ip server的名字
  118. std::string ipkey = USERIPPREFIX + uid_str;
  119. RedisMgr::GetInstance()->Set(ipkey, server_name);
  120. //uid和session绑定管理,方便以后踢人操作
  121. UserMgr::GetInstance()->SetUserSession(uid, session);
  122. std::string uid_session_key = USER_SESSION_PREFIX + uid_str;
  123. RedisMgr::GetInstance()->Set(uid_session_key, session->GetSessionId());
  124. return;
  125. }

6. 检测离线处理

服务器也会检测到离线也会清理连接,但是要注意,连接可以不按照分布式锁加锁清理,但是连接的信息要加分布式锁后再更新。

比如是否将uid对应的session更新到redis中,因为很可能用户在别的新服务器登录,新服务器给旧的客户端通知离线,旧的客户端不按理连接,导致旧的服务器检测连接断开,此时不能将uid对应的session清空,因为uid对应的session已经被新服务器更新了。

image-20250412144455898

在发送和接收的时候都可能检测到对方离线而报错,所以在AsyncReadBodyAsyncReadHead以及AsyncWrite等错误处理的时候记得加上连接清理操作

我们以读取body为例

  1. void CSession::AsyncReadBody(int total_len)
  2. {
  3. auto self = shared_from_this();
  4. asyncReadFull(total_len, [self, this, total_len](const boost::system::error_code& ec, std::size_t bytes_transfered) {
  5. try {
  6. if (ec) {
  7. std::cout << "handle read failed, error is " << ec.what() << endl;
  8. Close();
  9. //加锁清除session
  10. auto uid_str = std::to_string(_user_uid);
  11. auto lock_key = LOCK_PREFIX + uid_str;
  12. auto identifier = RedisMgr::GetInstance()->acquireLock(lock_key, LOCK_TIME_OUT, ACQUIRE_TIME_OUT);
  13. Defer defer([identifier, lock_key,self,this]() {
  14. _server->ClearSession(_session_id);
  15. RedisMgr::GetInstance()->releaseLock(lock_key, identifier);
  16. });
  17. if (identifier.empty()) {
  18. return;
  19. }
  20. std::string redis_session_id = "";
  21. auto bsuccess = RedisMgr::GetInstance()->Get(USER_SESSION_PREFIX + uid_str, redis_session_id);
  22. if (!bsuccess) {
  23. return;
  24. }
  25. if (redis_session_id != _session_id) {
  26. //说明有客户在其他服务器异地登录了
  27. return;
  28. }
  29. RedisMgr::GetInstance()->Del(USER_SESSION_PREFIX + uid_str);
  30. //清除用户登录信息
  31. RedisMgr::GetInstance()->Del(USERIPPREFIX + uid_str);
  32. return;
  33. }
  34. if (bytes_transfered < total_len) {
  35. std::cout << "read length not match, read [" << bytes_transfered << "] , total ["
  36. << total_len<<"]" << endl;
  37. Close();
  38. _server->ClearSession(_session_id);
  39. return;
  40. }
  41. memcpy(_recv_msg_node->_data , _data , bytes_transfered);
  42. _recv_msg_node->_cur_len += bytes_transfered;
  43. _recv_msg_node->_data[_recv_msg_node->_total_len] = '\0';
  44. cout << "receive data is " << _recv_msg_node->_data << endl;
  45. //此处将消息投递到逻辑队列中
  46. LogicSystem::GetInstance()->PostMsgToQue(make_shared<LogicNode>(shared_from_this(), _recv_msg_node));
  47. //继续监听头部接受事件
  48. AsyncReadHead(HEAD_TOTAL_LEN);
  49. }
  50. catch (std::exception& e) {
  51. std::cout << "Exception code is " << e.what() << endl;
  52. }
  53. });
  54. }

7. 测试效果

本节先测试单服务器同账号不同客户端登录情况,为了将同账号客户端派发到同一个服务器,暂时修改StatusServer的派发逻辑为同一个服务器

  1. ChatServer StatusServiceImpl::getChatServer() {
  2. std::lock_guard<std::mutex> guard(_server_mtx);
  3. auto minServer = _servers.begin()->second;
  4. //暂时注释,测试单服务器模式
  5. //auto count_str = RedisMgr::GetInstance()->HGet(LOGIN_COUNT, minServer.name);
  6. //if (count_str.empty()) {
  7. // //不存在则默认设置为最大
  8. // minServer.con_count = INT_MAX;
  9. //}
  10. //else {
  11. // minServer.con_count = std::stoi(count_str);
  12. //}
  13. //// 使用范围基于for循环
  14. //for ( auto& server : _servers) {
  15. //
  16. // if (server.second.name == minServer.name) {
  17. // continue;
  18. // }
  19. // auto count_str = RedisMgr::GetInstance()->HGet(LOGIN_COUNT, server.second.name);
  20. // if (count_str.empty()) {
  21. // server.second.con_count = INT_MAX;
  22. // }
  23. // else {
  24. // server.second.con_count = std::stoi(count_str);
  25. // }
  26. // if (server.second.con_count < minServer.con_count) {
  27. // minServer = server.second;
  28. // }
  29. //}
  30. return minServer;
  31. }

image-20250412150004397

image-20250412150038381

8. 待做事项

  1. 跨服踢人留作下一节处理
  2. 心跳检测未作,留作以后处理
  3. 心跳检测发现僵尸连接,需要踢人,留作以后处理。

9. 源码

https://gitee.com/secondtonone1/llfcchat

热门评论

热门文章

  1. C++ 类的继承封装和多态

    喜欢(588) 浏览(4934)
  2. windows环境搭建和vscode配置

    喜欢(587) 浏览(2791)
  3. Linux环境搭建和编码

    喜欢(594) 浏览(12097)
  4. 解密定时器的实现细节

    喜欢(566) 浏览(3457)
  5. slice介绍和使用

    喜欢(521) 浏览(2478)

最新评论

  1. C++ 并发三剑客future, promise和async Yunfei:大佬您好,如果这个线程池中加入的异步任务的形参如果有右值引用,这个commit中的返回类型推导和bind绑定就会出现问题,请问实际工程中,是不是不会用到这种任务,如果用到了,应该怎么解决?
  2. Qt MVC结构之QItemDelegate介绍 胡歌-此生不换:gpt, google
  3. 聊天项目(9) redis服务搭建 pro_lin:redis线程池的析构函数,除了pop出队列,还要free掉redis连接把
  4. 答疑汇总(thread,async源码分析) Yagus:如果引用计数为0,则会执行 future 的析构进而等待任务执行完成,那么看到的输出将是 这边应该不对吧,std::future析构只在这三种情况都满足的时候才回block: 1.共享状态是std::async 创造的(类型是_Task_async_state) 2.共享状态没有ready 3.这个future是共享状态的最后一个引用 这边共享状态类型是“_Package_state”,引用计数即使为0也不应该block啊

个人公众号

个人微信