Introduction
This article describes how to turn chatserver into a distributed service and how to add load balancing to statusserver: based on each chatserver's current connection count, statusserver picks the chatserver with the fewest connections and returns it to GateServer, which in turn hands it to the client.
To implement this distributed design we first need to extend chatserver with a gRPC client and a gRPC server, so that two chatserver instances can talk to each other peer to peer.
In Visual Studio, right-click the chatserver project and add a new item named ChatGrpcClient; this generates ChatGrpcClient.h and ChatGrpcClient.cpp for us.
Connection pool client
First, implement the ChatConPool connection pool:
class ChatConPool {
public:
    ChatConPool(size_t poolSize, std::string host, std::string port)
        : poolSize_(poolSize), host_(host), port_(port), b_stop_(false) {
        for (size_t i = 0; i < poolSize_; ++i) {
            std::shared_ptr<Channel> channel = grpc::CreateChannel(host + ":" + port,
                grpc::InsecureChannelCredentials());
            connections_.push(ChatService::NewStub(channel));
        }
    }

    ~ChatConPool() {
        std::lock_guard<std::mutex> lock(mutex_);
        Close();
        while (!connections_.empty()) {
            connections_.pop();
        }
    }

    std::unique_ptr<ChatService::Stub> getConnection() {
        std::unique_lock<std::mutex> lock(mutex_);
        cond_.wait(lock, [this] {
            if (b_stop_) {
                return true;
            }
            return !connections_.empty();
        });
        // If the pool has been stopped, return a null pointer directly.
        if (b_stop_) {
            return nullptr;
        }
        auto context = std::move(connections_.front());
        connections_.pop();
        return context;
    }

    void returnConnection(std::unique_ptr<ChatService::Stub> context) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (b_stop_) {
            return;
        }
        connections_.push(std::move(context));
        cond_.notify_one();
    }

    void Close() {
        b_stop_ = true;
        cond_.notify_all();
    }

private:
    std::atomic<bool> b_stop_;
    size_t poolSize_;
    std::string host_;
    std::string port_;
    std::queue<std::unique_ptr<ChatService::Stub>> connections_;
    std::mutex mutex_;
    std::condition_variable cond_;
};
Then use the singleton pattern to implement the gRPC client:
class ChatGrpcClient : public Singleton<ChatGrpcClient>
{
    friend class Singleton<ChatGrpcClient>;
public:
    ~ChatGrpcClient() {}
    AddFriendRsp NotifyAddFriend(std::string server_ip, const AddFriendReq& req);
    AuthFriendRsp NotifyAuthFriend(std::string server_ip, const AuthFriendReq& req);
    bool GetBaseInfo(std::string base_key, int uid, std::shared_ptr<UserInfo>& userinfo);
    TextChatMsgRsp NotifyTextChatMsg(std::string server_ip, const TextChatMsgReq& req, const Json::Value& rtvalue);

private:
    ChatGrpcClient();
    std::unordered_map<std::string, std::unique_ptr<ChatConPool>> _pools;
};
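As a rough orientation for how this client will eventually be invoked: once the login flow shown later in this article has recorded which server each user is on (under the USERIPPREFIX key), a message handler could forward a friend request to the peer server roughly as sketched below. This is a hedged sketch only; touid and the request fields are hypothetical and depend on your proto definition.

// Hedged usage sketch: look up which server the target user is on, then
// forward the request to that peer via the gRPC client singleton.
std::string to_ip_key = USERIPPREFIX + std::to_string(touid);  // touid: the friend's uid (hypothetical)
std::string to_server_name = "";
bool b_found = RedisMgr::GetInstance()->Get(to_ip_key, to_server_name);
if (b_found) {
    AddFriendReq add_req;
    // ... fill in the fields defined in your proto ...
    auto rsp = ChatGrpcClient::GetInstance()->NotifyAddFriend(to_server_name, add_req);
}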
Now implement ChatGrpcClient:
ChatGrpcClient::ChatGrpcClient()
{
    auto& cfg = ConfigMgr::Inst();
    auto server_list = cfg["PeerServer"]["Servers"];

    std::vector<std::string> words;
    std::stringstream ss(server_list);
    std::string word;

    while (std::getline(ss, word, ',')) {
        words.push_back(word);
    }

    for (auto& word : words) {
        if (cfg[word]["Name"].empty()) {
            continue;
        }
        _pools[cfg[word]["Name"]] = std::make_unique<ChatConPool>(5, cfg[word]["Host"], cfg[word]["Port"]);
    }
}

AddFriendRsp ChatGrpcClient::NotifyAddFriend(std::string server_ip, const AddFriendReq& req) {
    AddFriendRsp rsp;
    return rsp;
}

AuthFriendRsp ChatGrpcClient::NotifyAuthFriend(std::string server_ip, const AuthFriendReq& req) {
    AuthFriendRsp rsp;
    return rsp;
}

bool ChatGrpcClient::GetBaseInfo(std::string base_key, int uid, std::shared_ptr<UserInfo>& userinfo) {
    return true;
}

TextChatMsgRsp ChatGrpcClient::NotifyTextChatMsg(std::string server_ip, const TextChatMsgReq& req, const Json::Value& rtvalue) {
    TextChatMsgRsp rsp;
    return rsp;
}
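The three Notify methods are left as empty shells for now; they will be filled in when friend-request and message forwarding are implemented. For orientation, here is a minimal sketch of what NotifyAddFriend might look like once it uses the pool. Treat it as an assumption rather than the final implementation: the response's error field depends on the proto definition and is only hinted at in a comment.

AddFriendRsp ChatGrpcClient::NotifyAddFriend(std::string server_ip, const AddFriendReq& req) {
    AddFriendRsp rsp;
    // server_ip actually carries the peer server's Name (that is what LoginHandler
    // writes into Redis later in this article), and _pools is keyed by Name as well.
    auto find_iter = _pools.find(server_ip);
    if (find_iter == _pools.end()) {
        return rsp;
    }
    auto& pool = find_iter->second;
    auto stub = pool->getConnection();
    if (stub == nullptr) {
        // The pool is shutting down.
        return rsp;
    }
    grpc::ClientContext context;
    grpc::Status status = stub->NotifyAddFriend(&context, req, &rsp);
    pool->returnConnection(std::move(stub));
    if (!status.ok()) {
        // Mark the failure in rsp's error field (field name depends on the proto).
    }
    return rsp;
}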
gRPC server side
Add a ChatServiceImpl class to ChatServer; the header and source files are generated automatically.
class ChatServiceImpl final : public ChatService::Service
{
public:
    ChatServiceImpl();
    Status NotifyAddFriend(ServerContext* context, const AddFriendReq* request,
        AddFriendRsp* reply) override;
    Status NotifyAuthFriend(ServerContext* context,
        const AuthFriendReq* request, AuthFriendRsp* response) override;
    Status NotifyTextChatMsg(::grpc::ServerContext* context,
        const TextChatMsgReq* request, TextChatMsgRsp* response) override;
    bool GetBaseInfo(std::string base_key, int uid, std::shared_ptr<UserInfo>& userinfo);

private:
};
Implement the service logic; for now the handlers simply return without doing any work.
ChatServiceImpl::ChatServiceImpl()
{
}

Status ChatServiceImpl::NotifyAddFriend(ServerContext* context, const AddFriendReq* request,
    AddFriendRsp* reply) {
    return Status::OK;
}

Status ChatServiceImpl::NotifyAuthFriend(ServerContext* context,
    const AuthFriendReq* request, AuthFriendRsp* response) {
    return Status::OK;
}

Status ChatServiceImpl::NotifyTextChatMsg(::grpc::ServerContext* context,
    const TextChatMsgReq* request, TextChatMsgRsp* response) {
    return Status::OK;
}

bool ChatServiceImpl::GetBaseInfo(std::string base_key, int uid, std::shared_ptr<UserInfo>& userinfo) {
    return true;
}
Next, update the chatserver configuration:
[GateServer]
Port = 8080

[VarifyServer]
Host = 127.0.0.1
Port = 50051

[StatusServer]
Host = 127.0.0.1
Port = 50052

[SelfServer]
Name = chatserver1
Host = 0.0.0.0
Port = 8090
RPCPort = 50055

[Mysql]
Host = 81.68.86.146
Port = 3308
User = root
Passwd = 123456.
Schema = llfc

[Redis]
Host = 81.68.86.146
Port = 6380
Passwd = 123456

[PeerServer]
Servers = chatserver2

[chatserver2]
Name = chatserver2
Host = 127.0.0.1
Port = 50056
A PeerServer section has been added to store the list of peer servers. The names are comma separated, so we can split the string on commas to get each peer's name and then look up that peer's section in the config by name.
Make a copy of chatserver, rename it chatserver2, and adjust its config.ini. Its settings must differ from chatserver1's so the two servers can address each other peer to peer; see the server code in the repository for the full details.
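For reference, chatserver2's config.ini might look roughly like the following. The values are inferred from the two configs shown in this article (chatserver2 listens on TCP port 8091 and gRPC port 50056, and its peer is chatserver1 on RPC port 50055); the repository version is authoritative. The remaining sections (GateServer, VarifyServer, StatusServer, Mysql, Redis) stay the same as chatserver1's.

[SelfServer]
Name = chatserver2
Host = 0.0.0.0
Port = 8091
RPCPort = 50056

[PeerServer]
Servers = chatserver1

[chatserver1]
Name = chatserver1
Host = 127.0.0.1
Port = 50055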
Server connection count management
Whenever a chatserver starts, it has to reset its user connection count, and each chatserver now needs both a TCP listener and a gRPC listener:
using namespace std;
bool bstop = false;
std::condition_variable cond_quit;
std::mutex mutex_quit;

int main()
{
    auto& cfg = ConfigMgr::Inst();
    auto server_name = cfg["SelfServer"]["Name"];
    try {
        auto pool = AsioIOServicePool::GetInstance();
        // Reset this server's login count to 0.
        RedisMgr::GetInstance()->HSet(LOGIN_COUNT, server_name, "0");

        // Define a gRPC server.
        std::string server_address(cfg["SelfServer"]["Host"] + ":" + cfg["SelfServer"]["RPCPort"]);
        ChatServiceImpl service;
        grpc::ServerBuilder builder;
        // Listen on the given address and register the service.
        builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
        builder.RegisterService(&service);
        // Build and start the gRPC server.
        std::unique_ptr<grpc::Server> server(builder.BuildAndStart());
        std::cout << "RPC Server listening on " << server_address << std::endl;

        // Run the gRPC server in a dedicated thread.
        std::thread grpc_server_thread([&server]() {
            server->Wait();
        });

        boost::asio::io_context io_context;
        boost::asio::signal_set signals(io_context, SIGINT, SIGTERM);
        signals.async_wait([&io_context, pool, &server](auto, auto) {
            io_context.stop();
            pool->Stop();
            server->Shutdown();
        });

        auto port_str = cfg["SelfServer"]["Port"];
        CServer s(io_context, atoi(port_str.c_str()));
        io_context.run();

        // io_context.run() has returned, i.e. the server is shutting down:
        // remove this server's login-count entry and close Redis.
        RedisMgr::GetInstance()->HDel(LOGIN_COUNT, server_name);
        RedisMgr::GetInstance()->Close();
        grpc_server_thread.join();
    }
    catch (std::exception& e) {
        std::cerr << "Exception: " << e.what() << endl;
    }
}
After the server starts we set this server's login count in Redis to 0. By the same token, when the server shuts down we delete the corresponding key.
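The Redis keys used throughout this article (LOGIN_COUNT, USERIPPREFIX, USERTOKENPREFIX, USER_BASE_INFO) are string constants defined centrally in the project. The exact string values below are placeholders, not the project's real definitions, but they convey the intent:

// Placeholder values - the real project defines these centrally.
#define LOGIN_COUNT     "logincount"    // Redis hash: server name -> current login count
#define USERIPPREFIX    "uip_"          // uip_<uid>       -> name of the chat server the user is on
#define USERTOKENPREFIX "utoken_"       // utoken_<uid>    -> login token issued by the status server
#define USER_BASE_INFO  "ubaseinfo_"    // ubaseinfo_<uid> -> cached user profile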
User connection management
After a user logs in we need to bind the connection (session) to the user's uid, in preparation for kicking off duplicate logins later. So we add a UserMgr management class.
Its declaration is as follows:
class CSession;

class UserMgr : public Singleton<UserMgr>
{
    friend class Singleton<UserMgr>;
public:
    ~UserMgr();
    std::shared_ptr<CSession> GetSession(int uid);
    void SetUserSession(int uid, std::shared_ptr<CSession> session);
    void RmvUserSession(int uid);

private:
    UserMgr();
    std::mutex _session_mtx;
    std::unordered_map<int, std::shared_ptr<CSession>> _uid_to_session;
};
Its implementation is as follows:
UserMgr::~UserMgr() {
    _uid_to_session.clear();
}

std::shared_ptr<CSession> UserMgr::GetSession(int uid)
{
    std::lock_guard<std::mutex> lock(_session_mtx);
    auto iter = _uid_to_session.find(uid);
    if (iter == _uid_to_session.end()) {
        return nullptr;
    }
    return iter->second;
}

void UserMgr::SetUserSession(int uid, std::shared_ptr<CSession> session)
{
    std::lock_guard<std::mutex> lock(_session_mtx);
    _uid_to_session[uid] = session;
}

void UserMgr::RmvUserSession(int uid)
{
    auto uid_str = std::to_string(uid);
    // The user may have re-logged-in on a different server, so deleting the key here
    // could race with the other server registering it: this server's delete would
    // leave the key missing for the other server.
    //RedisMgr::GetInstance()->Del(USERIPPREFIX + uid_str);

    {
        std::lock_guard<std::mutex> lock(_session_mtx);
        _uid_to_session.erase(uid);
    }
}

UserMgr::UserMgr()
{
}
The Redis deletion in RmvUserSession is commented out for now; once duplicate-login kicking is implemented, the user's ip key can be removed in a well-ordered way.
When a connection fails, we can call the interface that removes the user's session:
void CServer::ClearSession(std::string session_id) {
    // Hold the lock for both the lookup and the erase so the session map is not
    // modified concurrently in between.
    lock_guard<mutex> lock(_mutex);
    if (_sessions.find(session_id) != _sessions.end()) {
        // Remove the binding between the user and this session.
        UserMgr::GetInstance()->RmvUserSession(_sessions[session_id]->GetUserId());
    }
    _sessions.erase(session_id);
}
The chat service's login handling is extended as well: when a user logs in, we record which server the uid is on and update that server's connection count.
void LogicSystem::LoginHandler(shared_ptr<CSession> session, const short &msg_id, const string &msg_data) {
    Json::Reader reader;
    Json::Value root;
    reader.parse(msg_data, root);
    auto uid = root["uid"].asInt();
    auto token = root["token"].asString();
    std::cout << "user login uid is " << uid << " user token is " << token << endl;

    Json::Value rtvalue;
    Defer defer([this, &rtvalue, session]() {
        std::string return_str = rtvalue.toStyledString();
        session->Send(return_str, MSG_CHAT_LOGIN_RSP);
    });

    // Check against Redis whether the user's token is correct.
    std::string uid_str = std::to_string(uid);
    std::string token_key = USERTOKENPREFIX + uid_str;
    std::string token_value = "";
    bool success = RedisMgr::GetInstance()->Get(token_key, token_value);
    if (!success) {
        rtvalue["error"] = ErrorCodes::UidInvalid;
        return;
    }

    if (token_value != token) {
        rtvalue["error"] = ErrorCodes::TokenInvalid;
        return;
    }

    rtvalue["error"] = ErrorCodes::Success;

    std::string base_key = USER_BASE_INFO + uid_str;
    auto user_info = std::make_shared<UserInfo>();
    bool b_base = GetBaseInfo(base_key, uid, user_info);
    if (!b_base) {
        rtvalue["error"] = ErrorCodes::UidInvalid;
        return;
    }
    rtvalue["uid"] = uid;
    rtvalue["pwd"] = user_info->pwd;
    rtvalue["name"] = user_info->name;
    rtvalue["email"] = user_info->email;
    rtvalue["nick"] = user_info->nick;
    rtvalue["desc"] = user_info->desc;
    rtvalue["sex"] = user_info->sex;
    rtvalue["icon"] = user_info->icon;

    // TODO: load the friend-apply list and the friend list from the database.

    auto server_name = ConfigMgr::Inst().GetValue("SelfServer", "Name");
    // Increase this server's login count (read the current value, add one, write it back).
    auto rd_res = RedisMgr::GetInstance()->HGet(LOGIN_COUNT, server_name);
    int count = 0;
    if (!rd_res.empty()) {
        count = std::stoi(rd_res);
    }
    count++;
    auto count_str = std::to_string(count);
    RedisMgr::GetInstance()->HSet(LOGIN_COUNT, server_name, count_str);

    // Bind the session to the user's uid.
    session->SetUserId(uid);
    // Record which server the user logged in on (by server name).
    std::string ipkey = USERIPPREFIX + uid_str;
    RedisMgr::GetInstance()->Set(ipkey, server_name);
    // Manage the uid-to-session binding so the user can be kicked later if needed.
    UserMgr::GetInstance()->SetUserSession(uid, session);

    return;
}
Status service
Update the status service's configuration:
[StatusServer]
Port = 50052
Host = 0.0.0.0

[Mysql]
Host = 81.68.86.146
Port = 3308
User = root
Passwd = 123456.
Schema = llfc

[Redis]
Host = 81.68.86.146
Port = 6380
Passwd = 123456

[chatservers]
Name = chatserver1,chatserver2

[chatserver1]
Name = chatserver1
Host = 127.0.0.1
Port = 8090

[chatserver2]
Name = chatserver2
Host = 127.0.0.1
Port = 8091
The config file likewise gains a chatservers list for managing multiple servers. Next we implement the logic that dynamically returns a chatserver's ip based on connection counts:
Status StatusServiceImpl::GetChatServer(ServerContext* context, const GetChatServerReq* request,
    GetChatServerRsp* reply)
{
    std::string prefix("llfc status server has received : ");
    const auto& server = getChatServer();
    reply->set_host(server.host);
    reply->set_port(server.port);
    reply->set_error(ErrorCodes::Success);
    reply->set_token(generate_unique_string());
    insertToken(request->uid(), reply->token());
    return Status::OK;
}
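generate_unique_string and insertToken are not reproduced in this chapter. generate_unique_string presumably produces a random unique string (for example a UUID) to act as the login token, and insertToken most likely just writes that token into Redis under the same USERTOKENPREFIX key that the chat server's LoginHandler later validates. A minimal sketch under that assumption (the exact signature may differ in the repository):

// Sketch: persist the freshly generated token so the chat server can
// validate it later in LoginHandler, which reads USERTOKENPREFIX + uid.
void StatusServiceImpl::insertToken(int uid, std::string token)
{
    std::string uid_str = std::to_string(uid);
    std::string token_key = USERTOKENPREFIX + uid_str;
    RedisMgr::GetInstance()->Set(token_key, token);
}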
getChatServer looks up the chatserver with the fewest current connections:
ChatServer StatusServiceImpl::getChatServer() {
    std::lock_guard<std::mutex> guard(_server_mtx);
    auto minServer = _servers.begin()->second;
    auto count_str = RedisMgr::GetInstance()->HGet(LOGIN_COUNT, minServer.name);
    if (count_str.empty()) {
        // No entry in Redis yet: treat the count as the maximum.
        minServer.con_count = INT_MAX;
    }
    else {
        minServer.con_count = std::stoi(count_str);
    }

    // Walk the remaining servers with a range-based for loop.
    for (auto& server : _servers) {
        if (server.second.name == minServer.name) {
            continue;
        }

        auto count_str = RedisMgr::GetInstance()->HGet(LOGIN_COUNT, server.second.name);
        if (count_str.empty()) {
            server.second.con_count = INT_MAX;
        }
        else {
            server.second.con_count = std::stoi(count_str);
        }

        if (server.second.con_count < minServer.con_count) {
            minServer = server.second;
        }
    }
    return minServer;
}
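The _servers map consulted above is populated from the [chatservers] list in the config. The constructor is not reproduced in this chapter; a rough sketch of how it might parse the config, assuming a ChatServer struct with the name, host, port, and con_count members used above:

StatusServiceImpl::StatusServiceImpl()
{
    auto& cfg = ConfigMgr::Inst();
    auto server_list = cfg["chatservers"]["Name"];

    // Split the comma separated list of chat server names and register each one.
    std::stringstream ss(server_list);
    std::string word;
    while (std::getline(ss, word, ',')) {
        if (cfg[word]["Name"].empty()) {
            continue;
        }
        ChatServer server;
        server.name = cfg[word]["Name"];
        server.host = cfg[word]["Host"];
        server.port = cfg[word]["Port"];
        server.con_count = 0;
        _servers[server.name] = server;
    }
}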
Testing
Start the two chatservers, the gateserver, and the statusserver, then launch two clients and log in.
Checking the login information on each side shows that the two clients were assigned to different chatservers, which confirms that the load-balanced assignment works.

Source code link
https://gitee.com/secondtonone1/llfcchat