聊天项目(27) 分布式聊天服务设计

简介

本文介绍如何将chatserver设置为分布式服务,并且实现statusserver的负载均衡处理,根据每个chatserver现有的连接数匹配最小的chatserver返回给GateServer并返回给客户端。

为了实现这一系列分布式设计,我们需要先完善chatserver,增加grpc客户端和服务端。这样能实现两个chatserver之间端对端的通信。

visual studio中右键chatserver项目选择添加新文件ChatGrpcClient, 会为我们生成ChatGrpcClient.h和ChatGrpcClient.cpp文件。

连接池客户端

先实现ChatConPool连接池

  1. class ChatConPool {
  2. public:
  3. ChatConPool(size_t poolSize, std::string host, std::string port):
  4. poolSize_(poolSize), host_(host),port_(port),b_stop_(false){
  5. for (size_t i = 0; i < poolSize_; ++i) {
  6. std::shared_ptr<Channel> channel = grpc::CreateChannel(host + ":" + port, grpc::InsecureChannelCredentials());
  7. connections_.push(ChatService::NewStub(channel));
  8. }
  9. }
  10. ~ChatConPool() {
  11. std::lock_guard<std::mutex> lock(mutex_);
  12. Close();
  13. while (!connections_.empty()) {
  14. connections_.pop();
  15. }
  16. }
  17. std::unique_ptr<ChatService::Stub> getConnection() {
  18. std::unique_lock<std::mutex> lock(mutex_);
  19. cond_.wait(lock, [this] {
  20. if (b_stop_) {
  21. return true;
  22. }
  23. return !connections_.empty();
  24. });
  25. //如果停止则直接返回空指针
  26. if (b_stop_) {
  27. return nullptr;
  28. }
  29. auto context = std::move(connections_.front());
  30. connections_.pop();
  31. return context;
  32. }
  33. void returnConnection(std::unique_ptr<ChatService::Stub> context) {
  34. std::lock_guard<std::mutex> lock(mutex_);
  35. if (b_stop_) {
  36. return;
  37. }
  38. connections_.push(std::move(context));
  39. cond_.notify_one();
  40. }
  41. void Close() {
  42. b_stop_ = true;
  43. cond_.notify_all();
  44. }
  45. private:
  46. atomic<bool> b_stop_;
  47. size_t poolSize_;
  48. std::string host_;
  49. std::string port_;
  50. std::queue<std::unique_ptr<ChatService::Stub> > connections_;
  51. std::mutex mutex_;
  52. std::condition_variable cond_;
  53. };

然后利用单例模式实现grpc通信的客户端

  1. class ChatGrpcClient: public Singleton<ChatGrpcClient>
  2. {
  3. friend class Singleton<ChatGrpcClient>;
  4. public:
  5. ~ChatGrpcClient() {
  6. }
  7. AddFriendRsp NotifyAddFriend(std::string server_ip, const AddFriendReq& req);
  8. AuthFriendRsp NotifyAuthFriend(std::string server_ip, const AuthFriendReq& req);
  9. bool GetBaseInfo(std::string base_key, int uid, std::shared_ptr<UserInfo>& userinfo);
  10. TextChatMsgRsp NotifyTextChatMsg(std::string server_ip, const TextChatMsgReq& req, const Json::Value& rtvalue);
  11. private:
  12. ChatGrpcClient();
  13. unordered_map<std::string, std::unique_ptr<ChatConPool>> _pools;
  14. };

实现具体的ChatGrpcClient

  1. ChatGrpcClient::ChatGrpcClient()
  2. {
  3. auto& cfg = ConfigMgr::Inst();
  4. auto server_list = cfg["PeerServer"]["Servers"];
  5. std::vector<std::string> words;
  6. std::stringstream ss(server_list);
  7. std::string word;
  8. while (std::getline(ss, word, ',')) {
  9. words.push_back(word);
  10. }
  11. for (auto& word : words) {
  12. if (cfg[word]["Name"].empty()) {
  13. continue;
  14. }
  15. _pools[cfg[word]["Name"]] = std::make_unique<ChatConPool>(5, cfg[word]["Host"], cfg[word]["Port"]);
  16. }
  17. }
  18. AddFriendRsp ChatGrpcClient::NotifyAddFriend(std::string server_ip, const AddFriendReq& req) {
  19. AddFriendRsp rsp;
  20. return rsp;
  21. }
  22. AuthFriendRsp ChatGrpcClient::NotifyAuthFriend(std::string server_ip, const AuthFriendReq& req) {
  23. AuthFriendRsp rsp;
  24. return rsp;
  25. }
  26. bool ChatGrpcClient::GetBaseInfo(std::string base_key, int uid, std::shared_ptr<UserInfo>& userinfo) {
  27. return true;
  28. }
  29. TextChatMsgRsp ChatGrpcClient::NotifyTextChatMsg(std::string server_ip,
  30. const TextChatMsgReq& req, const Json::Value& rtvalue) {
  31. TextChatMsgRsp rsp;
  32. return rsp;
  33. }

连接池服务端

向ChatServer中添加ChatServiceImpl类,自动生成头文件和源文件

  1. class ChatServiceImpl final : public ChatService::Service
  2. {
  3. public:
  4. ChatServiceImpl();
  5. Status NotifyAddFriend(ServerContext* context, const AddFriendReq* request,
  6. AddFriendRsp* reply) override;
  7. Status NotifyAuthFriend(ServerContext* context,
  8. const AuthFriendReq* request, AuthFriendRsp* response) override;
  9. Status NotifyTextChatMsg(::grpc::ServerContext* context,
  10. const TextChatMsgReq* request, TextChatMsgRsp* response) override;
  11. bool GetBaseInfo(std::string base_key, int uid, std::shared_ptr<UserInfo>& userinfo);
  12. private:
  13. };

实现服务逻辑,先简单写成不处理直接返回。

  1. ChatServiceImpl::ChatServiceImpl()
  2. {
  3. }
  4. Status ChatServiceImpl::NotifyAddFriend(ServerContext* context, const AddFriendReq* request,
  5. AddFriendRsp* reply) {
  6. return Status::OK;
  7. }
  8. Status ChatServiceImpl::NotifyAuthFriend(ServerContext* context,
  9. const AuthFriendReq* request, AuthFriendRsp* response) {
  10. return Status::OK;
  11. }
  12. Status ChatServiceImpl::NotifyTextChatMsg(::grpc::ServerContext* context,
  13. const TextChatMsgReq* request, TextChatMsgRsp* response) {
  14. return Status::OK;
  15. }
  16. bool ChatServiceImpl::GetBaseInfo(std::string base_key, int uid, std::shared_ptr<UserInfo>& userinfo) {
  17. return true;
  18. }

并且完善chatserver配置

  1. [GateServer]
  2. Port = 8080
  3. [VarifyServer]
  4. Host = 127.0.0.1
  5. Port = 50051
  6. [StatusServer]
  7. Host = 127.0.0.1
  8. Port = 50052
  9. [SelfServer]
  10. Name = chatserver1
  11. Host = 0.0.0.0
  12. Port = 8090
  13. RPCPort = 50055
  14. [Mysql]
  15. Host = 81.68.86.146
  16. Port = 3308
  17. User = root
  18. Passwd = 123456.
  19. Schema = llfc
  20. [Redis]
  21. Host = 81.68.86.146
  22. Port = 6380
  23. Passwd = 123456
  24. [PeerServer]
  25. Servers = chatserver2
  26. [chatserver2]
  27. Name = chatserver2
  28. Host = 127.0.0.1
  29. Port = 50056

增加了PeerServer字段,存储对端server列表,通过逗号分隔,可以通过逗号切割对端服务器名字,再根据名字去配置里查找对应字段。

对应的chatserver复制一份,改名为chatserver2,然后修改config.ini配置。要和server1配置不同,实现端对端的配置。具体详见服务器代码。

服务器连接数管理

每当服务器chatserver启动后,都要重新设置一下用户连接数管理,并且我们每个chatserver既要有tcp服务监听也要有grpc服务监听

  1. using namespace std;
  2. bool bstop = false;
  3. std::condition_variable cond_quit;
  4. std::mutex mutex_quit;
  5. int main()
  6. {
  7. auto& cfg = ConfigMgr::Inst();
  8. auto server_name = cfg["SelfServer"]["Name"];
  9. try {
  10. auto pool = AsioIOServicePool::GetInstance();
  11. //将登录数设置为0
  12. RedisMgr::GetInstance()->HSet(LOGIN_COUNT, server_name, "0");
  13. //定义一个GrpcServer
  14. std::string server_address(cfg["SelfServer"]["Host"] + ":" + cfg["SelfServer"]["RPCPort"]);
  15. ChatServiceImpl service;
  16. grpc::ServerBuilder builder;
  17. // 监听端口和添加服务
  18. builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
  19. builder.RegisterService(&service);
  20. // 构建并启动gRPC服务器
  21. std::unique_ptr<grpc::Server> server(builder.BuildAndStart());
  22. std::cout << "RPC Server listening on " << server_address << std::endl;
  23. //单独启动一个线程处理grpc服务
  24. std::thread grpc_server_thread([&server]() {
  25. server->Wait();
  26. });
  27. boost::asio::io_context io_context;
  28. boost::asio::signal_set signals(io_context, SIGINT, SIGTERM);
  29. signals.async_wait([&io_context, pool, &server](auto, auto) {
  30. io_context.stop();
  31. pool->Stop();
  32. server->Shutdown();
  33. });
  34. auto port_str = cfg["SelfServer"]["Port"];
  35. CServer s(io_context, atoi(port_str.c_str()));
  36. io_context.run();
  37. RedisMgr::GetInstance()->HDel(LOGIN_COUNT, server_name);
  38. RedisMgr::GetInstance()->Close();
  39. grpc_server_thread.join();
  40. }
  41. catch (std::exception& e) {
  42. std::cerr << "Exception: " << e.what() << endl;
  43. }
  44. }

我们在服务器启动后将本服务器的登录数量设置为0.

同样的道理,我们将服务器关闭后,也要删除对应key。

用户连接管理

因为我们用户登录后,要将连接(session)和用户uid绑定。为以后登陆踢人做准备。所以新增UserMgr管理类.

其声明如下

  1. class CSession;
  2. class UserMgr : public Singleton<UserMgr>
  3. {
  4. friend class Singleton<UserMgr>;
  5. public:
  6. ~UserMgr();
  7. std::shared_ptr<CSession> GetSession(int uid);
  8. void SetUserSession(int uid, std::shared_ptr<CSession> session);
  9. void RmvUserSession(int uid);
  10. private:
  11. UserMgr();
  12. std::mutex _session_mtx;
  13. std::unordered_map<int, std::shared_ptr<CSession>> _uid_to_session;
  14. };

其实现如下

  1. UserMgr:: ~UserMgr() {
  2. _uid_to_session.clear();
  3. }
  4. std::shared_ptr<CSession> UserMgr::GetSession(int uid)
  5. {
  6. std::lock_guard<std::mutex> lock(_session_mtx);
  7. auto iter = _uid_to_session.find(uid);
  8. if (iter == _uid_to_session.end()) {
  9. return nullptr;
  10. }
  11. return iter->second;
  12. }
  13. void UserMgr::SetUserSession(int uid, std::shared_ptr<CSession> session)
  14. {
  15. std::lock_guard<std::mutex> lock(_session_mtx);
  16. _uid_to_session[uid] = session;
  17. }
  18. void UserMgr::RmvUserSession(int uid)
  19. {
  20. auto uid_str = std::to_string(uid);
  21. //因为再次登录可能是其他服务器,所以会造成本服务器删除key,其他服务器注册key的情况
  22. // 有可能其他服务登录,本服删除key造成找不到key的情况
  23. //RedisMgr::GetInstance()->Del(USERIPPREFIX + uid_str);
  24. {
  25. std::lock_guard<std::mutex> lock(_session_mtx);
  26. _uid_to_session.erase(uid);
  27. }
  28. }
  29. UserMgr::UserMgr()
  30. {
  31. }

RmvUserSession 暂时屏蔽,以后做登录踢人后能保证有序移除用户ip操作。

当有连接异常时,可以调用移除用户Session的接口

  1. void CServer::ClearSession(std::string session_id) {
  2. if (_sessions.find(session_id) != _sessions.end()) {
  3. //移除用户和session的关联
  4. UserMgr::GetInstance()->RmvUserSession(_sessions[session_id]->GetUserId());
  5. }
  6. {
  7. lock_guard<mutex> lock(_mutex);
  8. _sessions.erase(session_id);
  9. }
  10. }

聊天服务完善用户登录,当用户登录后, 设置其uid对应的serverip。以及更新其所在服务器的连接数。

  1. void LogicSystem::LoginHandler(shared_ptr<CSession> session, const short &msg_id, const string &msg_data) {
  2. Json::Reader reader;
  3. Json::Value root;
  4. reader.parse(msg_data, root);
  5. auto uid = root["uid"].asInt();
  6. auto token = root["token"].asString();
  7. std::cout << "user login uid is " << uid << " user token is "
  8. << token << endl;
  9. Json::Value rtvalue;
  10. Defer defer([this, &rtvalue, session]() {
  11. std::string return_str = rtvalue.toStyledString();
  12. session->Send(return_str, MSG_CHAT_LOGIN_RSP);
  13. });
  14. //从redis获取用户token是否正确
  15. std::string uid_str = std::to_string(uid);
  16. std::string token_key = USERTOKENPREFIX + uid_str;
  17. std::string token_value = "";
  18. bool success = RedisMgr::GetInstance()->Get(token_key, token_value);
  19. if (!success) {
  20. rtvalue["error"] = ErrorCodes::UidInvalid;
  21. return;
  22. }
  23. if (token_value != token) {
  24. rtvalue["error"] = ErrorCodes::TokenInvalid;
  25. return;
  26. }
  27. rtvalue["error"] = ErrorCodes::Success;
  28. std::string base_key = USER_BASE_INFO + uid_str;
  29. auto user_info = std::make_shared<UserInfo>();
  30. bool b_base = GetBaseInfo(base_key, uid, user_info);
  31. if (!b_base) {
  32. rtvalue["error"] = ErrorCodes::UidInvalid;
  33. return;
  34. }
  35. rtvalue["uid"] = uid;
  36. rtvalue["pwd"] = user_info->pwd;
  37. rtvalue["name"] = user_info->name;
  38. rtvalue["email"] = user_info->email;
  39. rtvalue["nick"] = user_info->nick;
  40. rtvalue["desc"] = user_info->desc;
  41. rtvalue["sex"] = user_info->sex;
  42. rtvalue["icon"] = user_info->icon;
  43. //从数据库获取申请列表
  44. //获取好友列表
  45. auto server_name = ConfigMgr::Inst().GetValue("SelfServer", "Name");
  46. //将登录数量增加
  47. auto rd_res = RedisMgr::GetInstance()->HGet(LOGIN_COUNT, server_name);
  48. int count = 0;
  49. if (!rd_res.empty()) {
  50. count = std::stoi(rd_res);
  51. }
  52. count++;
  53. auto count_str = std::to_string(count);
  54. RedisMgr::GetInstance()->HSet(LOGIN_COUNT, server_name, count_str);
  55. //session绑定用户uid
  56. session->SetUserId(uid);
  57. //为用户设置登录ip server的名字
  58. std::string ipkey = USERIPPREFIX + uid_str;
  59. RedisMgr::GetInstance()->Set(ipkey, server_name);
  60. //uid和session绑定管理,方便以后踢人操作
  61. UserMgr::GetInstance()->SetUserSession(uid, session);
  62. return;
  63. }

状态服务

状态服务更新配置

  1. [StatusServer]
  2. Port = 50052
  3. Host = 0.0.0.0
  4. [Mysql]
  5. Host = 81.68.86.146
  6. Port = 3308
  7. User = root
  8. Passwd = 123456.
  9. Schema = llfc
  10. [Redis]
  11. Host = 81.68.86.146
  12. Port = 6380
  13. Passwd = 123456
  14. [chatservers]
  15. Name = chatserver1,chatserver2
  16. [chatserver1]
  17. Name = chatserver1
  18. Host = 127.0.0.1
  19. Port = 8090
  20. [chatserver2]
  21. Name = chatserver2
  22. Host = 127.0.0.1
  23. Port = 8091

配置文件同样增加了chatservers列表,用来管理多个服务,接下来实现根据连接数动态返回chatserverip的功能

  1. Status StatusServiceImpl::GetChatServer(ServerContext* context,
  2. const GetChatServerReq* request, GetChatServerRsp* reply)
  3. {
  4. std::string prefix("llfc status server has received : ");
  5. const auto& server = getChatServer();
  6. reply->set_host(server.host);
  7. reply->set_port(server.port);
  8. reply->set_error(ErrorCodes::Success);
  9. reply->set_token(generate_unique_string());
  10. insertToken(request->uid(), reply->token());
  11. return Status::OK;
  12. }

getChatServer用来获取最小连接数的chatserver 名字

  1. ChatServer StatusServiceImpl::getChatServer() {
  2. std::lock_guard<std::mutex> guard(_server_mtx);
  3. auto minServer = _servers.begin()->second;
  4. auto count_str = RedisMgr::GetInstance()->HGet(LOGIN_COUNT, minServer.name);
  5. if (count_str.empty()) {
  6. //不存在则默认设置为最大
  7. minServer.con_count = INT_MAX;
  8. }
  9. else {
  10. minServer.con_count = std::stoi(count_str);
  11. }
  12. // 使用范围基于for循环
  13. for (auto& server : _servers) {
  14. if (server.second.name == minServer.name) {
  15. continue;
  16. }
  17. auto count_str = RedisMgr::GetInstance()->HGet(LOGIN_COUNT, server.second.name);
  18. if (count_str.empty()) {
  19. server.second.con_count = INT_MAX;
  20. }
  21. else {
  22. server.second.con_count = std::stoi(count_str);
  23. }
  24. if (server.second.con_count < minServer.con_count) {
  25. minServer = server.second;
  26. }
  27. }
  28. return minServer;
  29. }

测试

分别启动两个chatserver,gateserver,以及statusserver,并且启动两个客户端登录,

分别查看登录信息,发现两个客户端被分配到不同的chatserver了,说明我们实现了负载均衡的分配方式。

https://cdn.llfc.club/1722314087856.jpg

源码连接

https://gitee.com/secondtonone1/llfcchat

视频连接

https://www.bilibili.com/video/BV17r421K7Px/?spm_id_from=333.999.0.0&vd_source=8be9e83424c2ed2c9b2a3ed1d01385e9

热门评论

热门文章

  1. 解密定时器的实现细节

    喜欢(566) 浏览(2812)
  2. C++ 类的继承封装和多态

    喜欢(588) 浏览(3910)
  3. slice介绍和使用

    喜欢(521) 浏览(2103)
  4. Linux环境搭建和编码

    喜欢(594) 浏览(8788)
  5. windows环境搭建和vscode配置

    喜欢(587) 浏览(2073)

最新评论

  1. asio多线程模型IOServicePool Lion:线程池一定要继承单例模式吗
  2. 泛型算法的定制操作 secondtonone1:lambda和bind是C11新增的利器,善于利用这两个机制可以极大地提升编程安全性和效率。
  3. 类和对象 陈宇航:支持!!!!
  4. C++ 虚函数表原理和类成员内存分布 WangQi888888:class Test{ int m; int b; }中b成员是int,为什么在内存中只占了1个字节。不应该是4个字节吗?是不是int应该改为char。这样的话就会符合图上说明的情况
  5. 解决博客回复区被脚本注入的问题 secondtonone1:走到现在我忽然明白一个道理,无论工作也好生活也罢,最重要的是开心,即使一份安稳的工作不能给我带来事业上的积累也要合理的舍弃,所以我还是想去做喜欢的方向。

个人公众号

个人微信