Implementing Cross-Server Kick Logic

Recap

In the previous article we implemented single-server kick logic: a distributed lock guards the login process, and while it is held we update the user's records, mainly the serverip and sessionid stored under the user's id.

We also handled the user-offline path with the same idea: a distributed lock guards the logout process, and we check whether the sessionid currently stored for the user id matches the one recorded by this server. If they differ, another login has taken over and we simply exit; otherwise we clear the sessionid and serverip records for that id.
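As a quick refresher, that logout path can be sketched roughly like this. The shape is assumed from the description above rather than copied from the previous article, and RedisMgr::Del is an assumed helper; the other names match the code shown later in this article.

    // Rough sketch of the logout path: lock the uid, then only clear the
    // Redis records if the stored session id still belongs to this session.
    // (Assumed shape; RedisMgr::Del is an assumed helper.)
    void LogicSystem::HandleQuit(std::shared_ptr<CSession> session, int uid) {
        auto uid_str = std::to_string(uid);
        auto lock_key = LOCK_PREFIX + uid_str;
        auto identifier = RedisMgr::GetInstance()->acquireLock(lock_key, LOCK_TIME_OUT, ACQUIRE_TIME_OUT);
        Defer defer([identifier, lock_key]() {
            RedisMgr::GetInstance()->releaseLock(lock_key, identifier);
        });

        std::string redis_session_id = "";
        RedisMgr::GetInstance()->Get(USER_SESSION_PREFIX + uid_str, redis_session_id);
        // A newer login elsewhere owns the records now: just drop our connection
        if (redis_session_id != session->GetSessionId()) {
            return;
        }

        // The records are still ours: clean them up on the way out
        RedisMgr::GetInstance()->Del(USER_SESSION_PREFIX + uid_str);
        RedisMgr::GetInstance()->Del(USERIPPREFIX + uid_str);
    }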

Next we implement the cross-server kick logic.

RPC Wrapper

Because the kick crosses servers, it has to go through gRPC, so we add kick messages to message.proto:

    message KickUserReq{
        int32 uid = 1;
    }

    message KickUserRsp{
        int32 error = 1;
        int32 uid = 2;
    }

And add the RPC to the service definition:

    service ChatService {
        //... other RPCs omitted
        rpc NotifyKickUser(KickUserReq) returns (KickUserRsp) {}
    }

Write a bat script to automate code generation. start.bat looks like this:

    @echo off
    set PROTOC_PATH=D:\cppsoft\grpc\visualpro\third_party\protobuf\Debug\protoc.exe
    set GRPC_PLUGIN_PATH=D:\cppsoft\grpc\visualpro\Debug\grpc_cpp_plugin.exe
    set PROTO_FILE=message.proto
    echo Generating gRPC code...
    %PROTOC_PATH% -I="." --grpc_out="." --plugin=protoc-gen-grpc="%GRPC_PLUGIN_PATH%" "%PROTO_FILE%"
    echo Generating C++ code...
    %PROTOC_PATH% --cpp_out=. "%PROTO_FILE%"
    echo Done.

Double-click start.bat, or run it from cmd; either works.

After it runs you will find four generated files: message.pb.h, message.pb.cc, message.grpc.pb.h, and message.grpc.pb.cc.

Cross-Server Kick Diagram

(figure: cross-server kick flow diagram)
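The flow, as implemented below, is: the user first logs in on one ChatServer, which records the uid's server under the USERIPPREFIX key in Redis. When the same uid later logs in on a different ChatServer, the new server sees from that record that the old login lives elsewhere, sends a NotifyKickUser RPC to the old server, and that server looks up the old session in its UserMgr, notifies the old client it is offline, and clears the session.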

Writing the Logic

StatusServer Dynamic Allocation

Modify the server-allocation logic in StatusServer:

    ChatServer StatusServiceImpl::getChatServer() {
        std::lock_guard<std::mutex> guard(_server_mtx);
        auto minServer = _servers.begin()->second;

        auto lock_key = LOCK_COUNT;
        auto identifier = RedisMgr::GetInstance()->acquireLock(lock_key, LOCK_TIME_OUT, ACQUIRE_TIME_OUT);
        // Release the lock via Defer
        Defer defer2([this, identifier, lock_key]() {
            RedisMgr::GetInstance()->releaseLock(lock_key, identifier);
        });

        auto count_str = RedisMgr::GetInstance()->HGet(LOGIN_COUNT, minServer.name);
        if (count_str.empty()) {
            // No record yet: treat the count as maximal
            minServer.con_count = INT_MAX;
        }
        else {
            minServer.con_count = std::stoi(count_str);
        }

        // Scan the remaining servers with a range-based for loop
        for (auto& server : _servers) {
            if (server.second.name == minServer.name) {
                continue;
            }

            auto count_str = RedisMgr::GetInstance()->HGet(LOGIN_COUNT, server.second.name);
            if (count_str.empty()) {
                server.second.con_count = INT_MAX;
            }
            else {
                server.second.con_count = std::stoi(count_str);
            }

            if (server.second.con_count < minServer.con_count) {
                minServer = server.second;
            }
        }

        return minServer;
    }

Note that a second distributed lock is used here, guarding the per-server login-count records:

    auto identifier = RedisMgr::GetInstance()->acquireLock(lock_key, LOCK_TIME_OUT, ACQUIRE_TIME_OUT);
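For reference, a lock pair like acquireLock/releaseLock can be built on Redis SET NX EX plus an atomic compare-and-delete. Below is a minimal hiredis-based sketch of that pattern; the real RedisMgr internals may differ.

    // Sketch of a Redis distributed lock (assumed implementation, not the
    // project's actual RedisMgr code).
    #include <hiredis/hiredis.h>
    #include <chrono>
    #include <random>
    #include <string>
    #include <thread>

    // Try to acquire `key` for lockTimeout seconds, retrying for up to
    // acquireTimeout seconds. Returns the owner identifier, or "" on failure.
    std::string acquireLock(redisContext* ctx, const std::string& key,
                            int lockTimeout, int acquireTimeout) {
        std::string identifier = std::to_string(std::random_device{}());
        auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(acquireTimeout);
        while (std::chrono::steady_clock::now() < deadline) {
            // SET key value NX EX timeout: succeeds only if the key is absent
            auto* reply = static_cast<redisReply*>(redisCommand(
                ctx, "SET %s %s NX EX %d", key.c_str(), identifier.c_str(), lockTimeout));
            bool ok = reply && reply->type == REDIS_REPLY_STATUS;
            if (reply) freeReplyObject(reply);
            if (ok) return identifier;
            std::this_thread::sleep_for(std::chrono::milliseconds(10));
        }
        return "";
    }

    // Release the lock only if we still own it; a Lua script keeps the
    // compare-and-delete atomic so we never delete someone else's lock.
    bool releaseLock(redisContext* ctx, const std::string& key,
                     const std::string& identifier) {
        const char* script =
            "if redis.call('get', KEYS[1]) == ARGV[1] then "
            "return redis.call('del', KEYS[1]) else return 0 end";
        auto* reply = static_cast<redisReply*>(redisCommand(
            ctx, "EVAL %s 1 %s %s", script, key.c_str(), identifier.c_str()));
        bool ok = reply && reply->type == REDIS_REPLY_INTEGER && reply->integer == 1;
        if (reply) freeReplyObject(reply);
        return ok;
    }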

ChatServer Kick Logic

Add the cross-server kick call to the login logic in ChatServer:

    void LogicSystem::LoginHandler(shared_ptr<CSession> session, const short &msg_id, const string &msg_data) {
        Json::Reader reader;
        Json::Value root;
        reader.parse(msg_data, root);
        auto uid = root["uid"].asInt();
        auto token = root["token"].asString();
        std::cout << "user login uid is " << uid << " user token is "
            << token << endl;

        Json::Value rtvalue;
        Defer defer([this, &rtvalue, session]() {
            std::string return_str = rtvalue.toStyledString();
            session->Send(return_str, MSG_CHAT_LOGIN_RSP);
        });

        // Check against Redis whether the user's token is correct
        std::string uid_str = std::to_string(uid);
        std::string token_key = USERTOKENPREFIX + uid_str;
        std::string token_value = "";
        bool success = RedisMgr::GetInstance()->Get(token_key, token_value);
        if (!success) {
            rtvalue["error"] = ErrorCodes::UidInvalid;
            return;
        }

        if (token_value != token) {
            rtvalue["error"] = ErrorCodes::TokenInvalid;
            return;
        }

        rtvalue["error"] = ErrorCodes::Success;

        std::string base_key = USER_BASE_INFO + uid_str;
        auto user_info = std::make_shared<UserInfo>();
        bool b_base = GetBaseInfo(base_key, uid, user_info);
        if (!b_base) {
            rtvalue["error"] = ErrorCodes::UidInvalid;
            return;
        }

        rtvalue["uid"] = uid;
        rtvalue["pwd"] = user_info->pwd;
        rtvalue["name"] = user_info->name;
        rtvalue["email"] = user_info->email;
        rtvalue["nick"] = user_info->nick;
        rtvalue["desc"] = user_info->desc;
        rtvalue["sex"] = user_info->sex;
        rtvalue["icon"] = user_info->icon;

        // Load the friend-request list from the database
        std::vector<std::shared_ptr<ApplyInfo>> apply_list;
        auto b_apply = GetFriendApplyInfo(uid, apply_list);
        if (b_apply) {
            for (auto& apply : apply_list) {
                Json::Value obj;
                obj["name"] = apply->_name;
                obj["uid"] = apply->_uid;
                obj["icon"] = apply->_icon;
                obj["nick"] = apply->_nick;
                obj["sex"] = apply->_sex;
                obj["desc"] = apply->_desc;
                obj["status"] = apply->_status;
                rtvalue["apply_list"].append(obj);
            }
        }

        // Load the friend list
        std::vector<std::shared_ptr<UserInfo>> friend_list;
        bool b_friend_list = GetFriendList(uid, friend_list);
        for (auto& friend_ele : friend_list) {
            Json::Value obj;
            obj["name"] = friend_ele->name;
            obj["uid"] = friend_ele->uid;
            obj["icon"] = friend_ele->icon;
            obj["nick"] = friend_ele->nick;
            obj["sex"] = friend_ele->sex;
            obj["desc"] = friend_ele->desc;
            obj["back"] = friend_ele->back;
            rtvalue["friend_list"].append(obj);
        }

        auto server_name = ConfigMgr::Inst().GetValue("SelfServer", "Name");
        {
            // Take a distributed lock so this thread owns the login exclusively
            // Build the lock key for this uid
            auto lock_key = LOCK_PREFIX + uid_str;
            auto identifier = RedisMgr::GetInstance()->acquireLock(lock_key, LOCK_TIME_OUT, ACQUIRE_TIME_OUT);
            // Release the lock via Defer
            Defer defer2([this, identifier, lock_key]() {
                RedisMgr::GetInstance()->releaseLock(lock_key, identifier);
            });

            // Check whether this user is already logged in, here or on another server
            std::string uid_ip_value = "";
            auto uid_ip_key = USERIPPREFIX + uid_str;
            bool b_ip = RedisMgr::GetInstance()->Get(uid_ip_key, uid_ip_value);
            // The user is already logged in somewhere: kick the previous login
            if (b_ip) {
                // Get this server's name from the config
                auto& cfg = ConfigMgr::Inst();
                auto self_name = cfg["SelfServer"]["Name"];
                // If the previous login was on this server, kick it locally
                if (uid_ip_value == self_name) {
                    // Find the old connection
                    auto old_session = UserMgr::GetInstance()->GetSession(uid);
                    // Send the kick notification
                    if (old_session) {
                        old_session->NotifyOffline(uid);
                        // Clear the old connection
                        _p_server->ClearSession(old_session->GetSessionId());
                    }
                }
                else {
                    // Not this server: tell the other server via gRPC to kick the user
                    KickUserReq kick_req;
                    kick_req.set_uid(uid);
                    ChatGrpcClient::GetInstance()->NotifyKickUser(uid_ip_value, kick_req);
                }
            }

            // Bind the uid to this session
            session->SetUserId(uid);
            // Record which server this user is logged in on
            std::string ipkey = USERIPPREFIX + uid_str;
            RedisMgr::GetInstance()->Set(ipkey, server_name);
            // Track uid -> session so we can kick this user later
            UserMgr::GetInstance()->SetUserSession(uid, session);
            std::string uid_session_key = USER_SESSION_PREFIX + uid_str;
            RedisMgr::GetInstance()->Set(uid_session_key, session->GetSessionId());
        }

        RedisMgr::GetInstance()->IncreaseCount(server_name);
        return;
    }

Note this part of the code above; it is the cross-server kick itself:

    else {
        // Not this server: tell the other server via gRPC to kick the user
        KickUserReq kick_req;
        kick_req.set_uid(uid);
        ChatGrpcClient::GetInstance()->NotifyKickUser(uid_ip_value, kick_req);
    }

KickUserReq is generated for us in message.pb.h, but it lives in the message:: namespace, so to use it in our own files we add these declarations to GrpcClient.h:

    using message::KickUserReq;
    using message::KickUserRsp;

From now on, any file that includes GrpcClient.h can use these types directly.
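For orientation, the surrounding header might look roughly like this. This is an assumed excerpt: the Singleton base and the ChatConPool pool type are named after the project's existing conventions and may differ in the real code.

    // GrpcClient.h (assumed excerpt)
    #include "message.grpc.pb.h"
    #include <memory>
    #include <string>
    #include <unordered_map>

    using message::KickUserReq;
    using message::KickUserRsp;

    class ChatGrpcClient : public Singleton<ChatGrpcClient> {
        friend class Singleton<ChatGrpcClient>;
    public:
        // Ask the chat server identified by server_ip to kick uid offline
        KickUserRsp NotifyKickUser(std::string server_ip, const KickUserReq& req);
    private:
        ChatGrpcClient();
        // One gRPC connection pool per peer chat server, keyed by server name
        std::unordered_map<std::string, std::unique_ptr<ChatConPool>> _pools;
    };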

Wrapping the Kick RPC

Next we wrap the RPC interfaces that implement the kick.

RPC client interface

    KickUserRsp ChatGrpcClient::NotifyKickUser(std::string server_ip, const KickUserReq& req)
    {
        KickUserRsp rsp;
        // Fill in the defaults up front. Setting them in a Defer would run
        // after the return statement and, with NRVO, could overwrite an
        // RPCFailed error code with Success.
        rsp.set_error(ErrorCodes::Success);
        rsp.set_uid(req.uid());

        auto find_iter = _pools.find(server_ip);
        if (find_iter == _pools.end()) {
            return rsp;
        }

        auto& pool = find_iter->second;
        ClientContext context;
        auto stub = pool->getConnection();
        // Hand the stub back to the pool when this scope exits
        Defer defercon([&stub, &pool]() {
            pool->returnConnection(std::move(stub));
        });

        Status status = stub->NotifyKickUser(&context, req, &rsp);
        if (!status.ok()) {
            rsp.set_error(ErrorCodes::RPCFailed);
            return rsp;
        }

        return rsp;
    }

RPC server-side implementation

    Status ChatServiceImpl::NotifyKickUser(::grpc::ServerContext* context,
        const KickUserReq* request, KickUserRsp* reply)
    {
        // Check whether the user is on this server
        auto uid = request->uid();
        auto session = UserMgr::GetInstance()->GetSession(uid);
        // reply is owned by gRPC, so filling it in at scope exit is safe here
        Defer defer([request, reply]() {
            reply->set_error(ErrorCodes::Success);
            reply->set_uid(request->uid());
        });

        // The user is not in memory: nothing to do
        if (session == nullptr) {
            return Status::OK;
        }

        // The user is in memory: notify them they are offline
        session->NotifyOffline(uid);
        // Clear the old connection
        _p_server->ClearSession(session->GetSessionId());
        return Status::OK;
    }

To give ChatServiceImpl access to the CServer, we provide a registration function:

    void ChatServiceImpl::RegisterServer(std::shared_ptr<CServer> pServer)
    {
        _p_server = pServer;
    }

Call it in main before starting the gRPC service.

Login Count Tracking

The StatusServer reads the login counts under a distributed lock and uses them to assign a server to each client, so the ChatServer must also reset its login count at startup and clear it on exit.

Adjust the ChatServer startup logic accordingly:

    using namespace std;
    bool bstop = false;
    std::condition_variable cond_quit;
    std::mutex mutex_quit;

    int main()
    {
        auto& cfg = ConfigMgr::Inst();
        auto server_name = cfg["SelfServer"]["Name"];
        try {
            auto pool = AsioIOServicePool::GetInstance();
            // Reset this server's login count to 0
            RedisMgr::GetInstance()->InitCount(server_name);
            Defer defer([server_name]() {
                RedisMgr::GetInstance()->HDel(LOGIN_COUNT, server_name);
                RedisMgr::GetInstance()->Close();
            });

            boost::asio::io_context io_context;
            auto port_str = cfg["SelfServer"]["Port"];
            // Create the CServer smart pointer
            auto pointer_server = std::make_shared<CServer>(io_context, atoi(port_str.c_str()));

            // Define a gRPC server
            std::string server_address(cfg["SelfServer"]["Host"] + ":" + cfg["SelfServer"]["RPCPort"]);
            ChatServiceImpl service;
            grpc::ServerBuilder builder;
            // Listen on the port and register the service
            builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
            builder.RegisterService(&service);
            service.RegisterServer(pointer_server);
            // Build and start the gRPC server
            std::unique_ptr<grpc::Server> server(builder.BuildAndStart());
            std::cout << "RPC Server listening on " << server_address << std::endl;

            // Run the gRPC server on its own thread
            std::thread grpc_server_thread([&server]() {
                server->Wait();
            });

            boost::asio::signal_set signals(io_context, SIGINT, SIGTERM);
            signals.async_wait([&io_context, pool, &server](auto, auto) {
                io_context.stop();
                pool->Stop();
                server->Shutdown();
            });

            // Hand the CServer to the logic layer so it can clear connections later
            LogicSystem::GetInstance()->SetServer(pointer_server);
            io_context.run();

            grpc_server_thread.join();
        }
        catch (std::exception& e) {
            std::cerr << "Exception: " << e.what() << endl;
        }
    }

Pay particular attention to this part of the code above:

    // Reset this server's login count to 0
    RedisMgr::GetInstance()->InitCount(server_name);
    Defer defer([server_name]() {
        RedisMgr::GetInstance()->HDel(LOGIN_COUNT, server_name);
        RedisMgr::GetInstance()->Close();
    });

At startup this writes a zero connection count for this server into Redis; on shutdown the Defer deletes the count record from Redis and finally closes the Redis connection pool.
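As a rough sketch of what those counting helpers might look like inside RedisMgr, assuming LOGIN_COUNT is a Redis hash mapping server name to connection count and HSet is the matching write helper (the real implementation may differ):

    // Assumed sketch of RedisMgr's counting helpers, guarded by the same
    // LOCK_COUNT distributed lock that the StatusServer reader uses.
    void RedisMgr::InitCount(const std::string& server_name) {
        auto identifier = acquireLock(LOCK_COUNT, LOCK_TIME_OUT, ACQUIRE_TIME_OUT);
        Defer defer([this, identifier]() {
            releaseLock(LOCK_COUNT, identifier);
        });
        // HSET LOGIN_COUNT <server> 0 : every run starts from a clean count
        HSet(LOGIN_COUNT, server_name, "0");
    }

    void RedisMgr::IncreaseCount(const std::string& server_name) {
        auto identifier = acquireLock(LOCK_COUNT, LOCK_TIME_OUT, ACQUIRE_TIME_OUT);
        Defer defer([this, identifier]() {
            releaseLock(LOCK_COUNT, identifier);
        });
        // Read-modify-write under the lock so concurrent logins don't race
        auto count_str = HGet(LOGIN_COUNT, server_name);
        int count = count_str.empty() ? 0 : std::stoi(count_str);
        HSet(LOGIN_COUNT, server_name, std::to_string(count + 1));
    }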

Source Code

Repository address:

https://gitee.com/secondtonone1/llfcchat
