rpc通知客户端异步下载聊天图片

前情回顾

前面我们搞定了1,2,3以及5过程

image-20260208131004814

今天主要完成

4,6和7

资源服务器grpc设置

因为资源服务器要通知ChatServer,所以要设置grpc客户端

完善下proto协议,新增消息通知请求

image-20260208133832656

具体代码逻辑

  1. message NotifyChatImgReq{
  2. int32 from_uid = 1;
  3. int32 to_uid = 2;
  4. int32 message_id = 3;
  5. string file_name = 4;
  6. int64 total_size = 5;
  7. int32 thread_id =6;
  8. }
  9. message NotifyChatImgRsp{
  10. int32 error = 1;
  11. int32 from_uid = 2;
  12. int32 to_uid = 3;
  13. int32 message_id = 4;
  14. string file_name = 5;
  15. int64 total_size = 6;
  16. int32 thread_id =7;
  17. }
  18. service ChatService {
  19. rpc NotifyAddFriend(AddFriendReq) returns (AddFriendRsp) {}
  20. rpc RplyAddFriend(RplyFriendReq) returns (RplyFriendRsp) {}
  21. rpc SendChatMsg(SendChatMsgReq) returns (SendChatMsgRsp) {}
  22. rpc NotifyAuthFriend(AuthFriendReq) returns (AuthFriendRsp) {}
  23. rpc NotifyTextChatMsg(TextChatMsgReq) returns (TextChatMsgRsp){}
  24. rpc NotifyKickUser(KickUserReq) returns (KickUserRsp){}
  25. rpc NotifyChatImgMsg(NotifyChatImgReq) returns (NotifyChatImgRsp){}
  26. }

实现grpc客户端逻辑如下:

  1. #pragma once
  2. #include "const.h"
  3. #include "Singleton.h"
  4. #include "ConfigMgr.h"
  5. #include "message.grpc.pb.h"
  6. #include "message.pb.h"
  7. #include <grpcpp/grpcpp.h>
  8. #include <queue>
  9. #include <condition_variable>
  10. using grpc::Channel;
  11. using grpc::Status;
  12. using grpc::ClientContext;
  13. using message::ChatService;
  14. using message::NotifyChatImgReq;
  15. using message::NotifyChatImgRsp;
  16. class ChatServerConPool {
  17. public:
  18. ChatServerConPool(size_t poolSize, std::string host, std::string port)
  19. : poolSize_(poolSize), host_(host), port_(port), b_stop_(false) {
  20. for (size_t i = 0; i < poolSize_; ++i) {
  21. std::shared_ptr<Channel> channel = grpc::CreateChannel(host + ":" + port,
  22. grpc::InsecureChannelCredentials());
  23. connections_.push(ChatService::NewStub(channel));
  24. }
  25. }
  26. ~ChatServerConPool() {
  27. std::lock_guard<std::mutex> lock(mutex_);
  28. Close();
  29. while (!connections_.empty()) {
  30. connections_.pop();
  31. }
  32. }
  33. std::unique_ptr<ChatService::Stub> getConnection() {
  34. std::unique_lock<std::mutex> lock(mutex_);
  35. cond_.wait(lock, [this] {
  36. if (b_stop_) {
  37. return true;
  38. }
  39. return !connections_.empty();
  40. });
  41. //如果停止则直接返回空指针
  42. if (b_stop_) {
  43. return nullptr;
  44. }
  45. auto context = std::move(connections_.front());
  46. connections_.pop();
  47. return context;
  48. }
  49. void returnConnection(std::unique_ptr<ChatService::Stub> context) {
  50. std::lock_guard<std::mutex> lock(mutex_);
  51. if (b_stop_) {
  52. return;
  53. }
  54. connections_.push(std::move(context));
  55. cond_.notify_one();
  56. }
  57. void Close() {
  58. b_stop_ = true;
  59. cond_.notify_all();
  60. }
  61. private:
  62. atomic<bool> b_stop_;
  63. size_t poolSize_;
  64. std::string host_;
  65. std::string port_;
  66. std::queue<std::unique_ptr<ChatService::Stub>> connections_;
  67. std::mutex mutex_;
  68. std::condition_variable cond_;
  69. };
  70. class ChatServerGrpcClient :public Singleton<ChatServerGrpcClient>
  71. {
  72. friend class Singleton<ChatServerGrpcClient>;
  73. public:
  74. ~ChatServerGrpcClient() {
  75. }
  76. NotifyChatImgRsp NotifyChatImgMsg(int message_id, std::string chatserver);
  77. private:
  78. ChatServerGrpcClient();
  79. //sever_ip到连接池的映射, <chatserver1,std::unique_ptr<ChatServerConPool>>
  80. std::unordered_map<std::string, std::unique_ptr<ChatServerConPool>> _hash_pools;
  81. };

具体实现

  1. #include "ChatServerGrpcClient.h"
  2. #include "MysqlMgr.h"
  3. NotifyChatImgRsp ChatServerGrpcClient::NotifyChatImgMsg(int message_id,std::string chatserver)
  4. {
  5. ClientContext context;
  6. NotifyChatImgRsp reply;
  7. NotifyChatImgReq request;
  8. request.set_message_id(message_id);
  9. if (_hash_pools.find(chatserver) == _hash_pools.end()) {
  10. reply.set_error(ErrorCodes::ServerIpErr);
  11. return reply;
  12. }
  13. auto chat_msg = MysqlMgr::GetInstance()->GetChatMsgById(message_id);
  14. request.set_file_name(chat_msg->content);
  15. request.set_from_uid(chat_msg->sender_id);
  16. request.set_to_uid(chat_msg->recv_id);
  17. request.set_thread_id(chat_msg->thread_id);
  18. // 资源文件路径
  19. auto file_dir = ConfigMgr::Inst().GetFileOutPath();
  20. //该消息是接收方客户端发送过来的,服务器将资源存储在发送方的文件夹中
  21. auto uid_str = std::to_string(chat_msg->sender_id);
  22. auto file_path = (file_dir / uid_str / chat_msg->content);
  23. boost::uintmax_t file_size = boost::filesystem::file_size(file_path);
  24. request.set_total_size(file_size);
  25. auto &pool_ = _hash_pools[chatserver];
  26. auto stub = pool_->getConnection();
  27. Status status = stub->NotifyChatImgMsg(&context, request, &reply);
  28. Defer defer([&stub, &pool_, this]() {
  29. pool_->returnConnection(std::move(stub));
  30. });
  31. if (status.ok()) {
  32. return reply;
  33. }
  34. else {
  35. reply.set_error(ErrorCodes::RPCFailed);
  36. return reply;
  37. }
  38. }
  39. ChatServerGrpcClient::ChatServerGrpcClient()
  40. {
  41. auto& gCfgMgr = ConfigMgr::Inst();
  42. std::string host1 = gCfgMgr["chatserver1"]["Host"];
  43. std::string port1 = gCfgMgr["chatserver1"]["Port"];
  44. _hash_pools["chatserver1"] = std::make_unique<ChatServerConPool>(5, host1, port1);
  45. std::string host2 = gCfgMgr["chatserver2"]["Host"];
  46. std::string port2 = gCfgMgr["chatserver2"]["Port"];
  47. _hash_pools["chatserver2"] = std::make_unique<ChatServerConPool>(5, host2, port2);
  48. }

实现了通知接口,用来通知ChatServer图片消息上传完成,让ChatServer再通知其他客户端.

ChatServer响应资源服务器通知

ChatServer的proto也需要进行同样配置,这里略去

具体在ChatServiceImpl中添加响应消息通知的逻辑

  1. Status ChatServiceImpl::NotifyChatImgMsg(::grpc::ServerContext* context, const ::message::NotifyChatImgReq* request, ::message::NotifyChatImgRsp* response)
  2. {
  3. //查找用户是否在本服务器
  4. auto uid = request->to_uid();
  5. auto session = UserMgr::GetInstance()->GetSession(uid);
  6. Defer defer([request, response]() {
  7. //设置具体的回包信息
  8. response->set_error(ErrorCodes::Success);
  9. response->set_message_id(request->message_id());
  10. });
  11. //用户不在内存中则直接返回
  12. if (session == nullptr) {
  13. //这里只是返回1个状态
  14. return Status::OK;
  15. }
  16. //在内存中则直接发送通知对方
  17. session->NotifyChatImgRecv(request);
  18. //这里只是返回1个状态
  19. return Status::OK;
  20. }

通过Session通知客户端

  1. void CSession::NotifyChatImgRecv(const ::message::NotifyChatImgReq* request) {
  2. Json::Value rtvalue;
  3. rtvalue["error"] = ErrorCodes::Success;
  4. rtvalue["message_id"] = request->message_id();
  5. rtvalue["sender_id"] = request->from_uid();
  6. rtvalue["receiver_id"] = request->to_uid();
  7. rtvalue["img_name"] = request->file_name();
  8. rtvalue["total_size"] = std::to_string(request->total_size());
  9. rtvalue["thread_id"] = request->thread_id();
  10. std::string return_str = rtvalue.toStyledString();
  11. //通知图片聊天信息
  12. Send(return_str, ID_NOTIFY_IMG_CHAT_MSG_REQ);
  13. return;
  14. }

客户端获取通知

客户端收到服务器通知后,会优先查看本地资源是否存在,如果存在则直接加载图片,添加聊天记录到页面。

如果不存在则组织下载,但是也需要将消息添加到聊天界面。

  1. _handlers.insert(ID_NOTIFY_IMG_CHAT_MSG_REQ, [this](ReqId id, int len, QByteArray data) {
  2. Q_UNUSED(len);
  3. qDebug() << "handle id is " << id << " data is " << data;
  4. // 将QByteArray转换为QJsonDocument
  5. QJsonDocument jsonDoc = QJsonDocument::fromJson(data);
  6. // 检查转换是否成功
  7. if (jsonDoc.isNull()) {
  8. qDebug() << "Failed to create QJsonDocument.";
  9. return;
  10. }
  11. QJsonObject jsonObj = jsonDoc.object();
  12. qDebug() << "receive notify img chat msg req success" ;
  13. //收到消息后转发给页面
  14. auto thread_id = jsonObj["thread_id"].toInt();
  15. auto sender_id = jsonObj["sender_id"].toInt();
  16. auto message_id = jsonObj["message_id"].toInt();
  17. auto receiver_id = jsonObj["receiver_id"].toInt();
  18. auto img_name = jsonObj["img_name"].toString();
  19. auto total_size_str = jsonObj["total_size"].toString();
  20. auto total_size = total_size_str.toLongLong();
  21. auto uid = UserMgr::GetInstance()->GetUid();
  22. //客户端存储聊天记录,按照如下格式存储C:\Users\secon\AppData\Roaming\llfcchat\chatimg\uid, uid为对方uid
  23. QString storageDir = QStandardPaths::writableLocation(QStandardPaths::AppDataLocation);
  24. QString img_path_str = storageDir +"/user/"+ QString::number(uid)+ "/chatimg/" + QString::number(sender_id);
  25. auto file_info = UserMgr::GetInstance()->GetTransFileByName(img_name);
  26. //正常情况是找不到的,所以这里初始化一个文件信息放入UserMgr中管理
  27. if (!file_info) {
  28. //预览图先默认空白,md5为空
  29. file_info = std::make_shared<MsgInfo>(MsgType::IMG_MSG, img_path_str, CreateLoadingPlaceholder(200, 200), img_name, total_size, "");
  30. UserMgr::GetInstance()->AddTransFile(img_name, file_info);
  31. }
  32. file_info->_msg_id = message_id;
  33. file_info->_sender = sender_id;
  34. file_info->_receiver = receiver_id;
  35. file_info->_thread_id = thread_id;
  36. //设置文件传输的类型
  37. file_info->_transfer_type = TransferType::Download;
  38. //设置文件传输状态
  39. file_info->_transfer_state = TransferState::Uploading;
  40. auto img_chat_data_ptr = std::make_shared<ImgChatData>(file_info, "",
  41. thread_id, ChatFormType::PRIVATE, ChatMsgType::PIC,
  42. sender_id, MsgStatus::READED);
  43. emit sig_img_chat_msg(img_chat_data_ptr);
  44. //组织请求,准备下载
  45. QJsonObject jsonObj_send;
  46. jsonObj_send["name"] = img_name;
  47. jsonObj_send["seq"] = file_info->_seq;
  48. jsonObj_send["trans_size"] = "0";
  49. jsonObj_send["total_size"] = QString::number(file_info->_total_size);
  50. jsonObj_send["token"] = UserMgr::GetInstance()->GetToken();
  51. jsonObj_send["sender_id"] = sender_id;
  52. jsonObj_send["receiver_id"] = receiver_id;
  53. jsonObj_send["message_id"] = message_id;
  54. jsonObj_send["uid"] = uid;
  55. //客户端存储聊天记录,按照如下格式存储C:\Users\secon\AppData\Roaming\llfcchat\chatimg\uid, uid为对方uid
  56. QDir chatimgDir(img_path_str);
  57. jsonObj["client_path"] = img_path_str;
  58. if (!chatimgDir.exists()) {
  59. chatimgDir.mkpath("."); // 创建当前路径
  60. }
  61. QJsonDocument doc(jsonObj_send);
  62. auto send_data = doc.toJson();
  63. FileTcpMgr::GetInstance()->SendData(ID_IMG_CHAT_DOWN_REQ, send_data);
  64. });

收到服务器通知后,开始构造json数据,发送ID_IMG_CHAT_DOWN_REQ请求

聊天记录添加

客户端在请求服务器资源的时候,因为本地没有资源,可以先在聊天界面生成一个预览的空白图片,同时显示进度条

这部分逻辑是在客户端的tcpmgr中处理服务器通知聊天消息的逻辑里

  1. _handlers.insert(ID_NOTIFY_IMG_CHAT_MSG_REQ, [this](ReqId id, int len, QByteArray data) {
  2. //...
  3. //发送给界面显示
  4. emit sig_img_chat_msg(img_chat_data_ptr);
  5. }

客户端将图片消息发送给界面显示

在ChatDialog的构造函数中添加信号槽链接

  1. connect(TcpMgr::GetInstance().get(), &TcpMgr::sig_img_chat_msg,
  2. this, &ChatDialog::slot_img_chat_msg);

ChatDialog收到该信号后,会触发添加消息的逻辑

  1. void ChatDialog::slot_img_chat_msg(std::shared_ptr<ImgChatData> imgchat) {
  2. //更新数据
  3. auto thread_id = imgchat->GetThreadId();
  4. auto thread_data = UserMgr::GetInstance()->GetChatThreadByThreadId(thread_id);
  5. thread_data->AddMsg(imgchat);
  6. if (_cur_chat_thread_id != thread_id) {
  7. return;
  8. }
  9. ui->chat_page->AppendOtherMsg(imgchat);
  10. }

添加其他消息的逻辑, 此处都是将其他人发送的图片消息添加到聊天界面显示

  1. void ChatPage::AppendOtherMsg(std::shared_ptr<ChatDataBase> msg) {
  2. auto self_info = UserMgr::GetInstance()->GetUserInfo();
  3. ChatRole role;
  4. if (msg->GetSendUid() == self_info->_uid) {
  5. role = ChatRole::Self;
  6. ChatItemBase* pChatItem = new ChatItemBase(role);
  7. pChatItem->setUserName(self_info->_name);
  8. SetSelfIcon(pChatItem, self_info->_icon);
  9. QWidget* pBubble = nullptr;
  10. if (msg->GetMsgType() == ChatMsgType::TEXT) {
  11. pBubble = new TextBubble(role, msg->GetMsgContent());
  12. }
  13. else if (msg->GetMsgType() == ChatMsgType::PIC) {
  14. auto img_msg = dynamic_pointer_cast<ImgChatData>(msg);
  15. auto pic_bubble = new PictureBubble(img_msg->_msg_info->_preview_pix, role, img_msg->_msg_info->_total_size);
  16. pic_bubble->setMsgInfo(img_msg->_msg_info);
  17. pBubble = pic_bubble;
  18. //连接暂停和恢复信号
  19. connect(dynamic_cast<PictureBubble*>(pBubble), &PictureBubble::pauseRequested,
  20. this, &ChatPage::on_clicked_paused);
  21. connect(dynamic_cast<PictureBubble*>(pBubble), &PictureBubble::resumeRequested,
  22. this, &ChatPage::on_clicked_resume);
  23. }
  24. pChatItem->setWidget(pBubble);
  25. auto status = msg->GetStatus();
  26. pChatItem->setStatus(status);
  27. ui->chat_data_list->appendChatItem(pChatItem);
  28. _base_item_map[msg->GetMsgId()] = pChatItem;
  29. }
  30. else {
  31. role = ChatRole::Other;
  32. ChatItemBase* pChatItem = new ChatItemBase(role);
  33. auto friend_info = UserMgr::GetInstance()->GetFriendById(msg->GetSendUid());
  34. if (friend_info == nullptr) {
  35. return;
  36. }
  37. pChatItem->setUserName(friend_info->_name);
  38. // 使用正则表达式检查是否是默认头像
  39. QRegularExpression regex("^:/res/head_(\\d+)\\.jpg$");
  40. QRegularExpressionMatch match = regex.match(friend_info->_icon);
  41. if (match.hasMatch()) {
  42. pChatItem->setUserIcon(QPixmap(friend_info->_icon));
  43. }
  44. else {
  45. // 如果是用户上传的头像,获取存储目录
  46. QString storageDir = QStandardPaths::writableLocation(QStandardPaths::AppDataLocation);
  47. auto uid = UserMgr::GetInstance()->GetUid();
  48. QDir avatarsDir(storageDir + "/user/" + QString::number(uid) + "/avatars");
  49. auto file_name = QFileInfo(self_info->_icon).fileName();
  50. // 确保目录存在
  51. if (avatarsDir.exists()) {
  52. QString avatarPath = avatarsDir.filePath(file_name); // 获取上传头像的完整路径
  53. QPixmap pixmap(avatarPath); // 加载上传的头像图片
  54. if (!pixmap.isNull()) {
  55. pChatItem->setUserIcon(pixmap);
  56. }
  57. else {
  58. qWarning() << "无法加载上传的头像:" << avatarPath;
  59. auto icon_label = pChatItem->getIconLabel();
  60. LoadHeadIcon(avatarPath, icon_label, file_name, "self_icon");
  61. }
  62. }
  63. else {
  64. qWarning() << "头像存储目录不存在:" << avatarsDir.path();
  65. //创建目录
  66. avatarsDir.mkpath(".");
  67. auto icon_label = pChatItem->getIconLabel();
  68. QString avatarPath = avatarsDir.filePath(file_name);
  69. LoadHeadIcon(avatarPath, icon_label, file_name, "self_icon");
  70. }
  71. }
  72. QWidget* pBubble = nullptr;
  73. if (msg->GetMsgType() == ChatMsgType::TEXT) {
  74. pBubble = new TextBubble(role, msg->GetMsgContent());
  75. }
  76. else if (msg->GetMsgType() == ChatMsgType::PIC) {
  77. auto img_msg = dynamic_pointer_cast<ImgChatData>(msg);
  78. auto pic_bubble = new PictureBubble(img_msg->_msg_info->_preview_pix, role, img_msg->_msg_info->_total_size);
  79. pic_bubble->setMsgInfo(img_msg->_msg_info);
  80. pBubble = pic_bubble;
  81. //连接暂停和恢复信号
  82. connect(dynamic_cast<PictureBubble*>(pBubble), &PictureBubble::pauseRequested,
  83. this, &ChatPage::on_clicked_paused);
  84. connect(dynamic_cast<PictureBubble*>(pBubble), &PictureBubble::resumeRequested,
  85. this, &ChatPage::on_clicked_resume);
  86. }
  87. pChatItem->setWidget(pBubble);
  88. auto status = msg->GetStatus();
  89. pChatItem->setStatus(status);
  90. ui->chat_data_list->appendChatItem(pChatItem);
  91. _base_item_map[msg->GetMsgId()] = pChatItem;
  92. }
  93. }

资源服务器响应下载请求

LogicWorker中增加请求的处理

  1. _fun_callbacks[ID_IMG_CHAT_DOWN_REQ] = [this](std::shared_ptr<CSession> session, const short& msg_req_id,
  2. const string& msg_data) {
  3. Json::Reader reader;
  4. Json::Value root;
  5. reader.parse(msg_data, root);
  6. auto seq = root["seq"].asInt();
  7. auto name = root["name"].asString();
  8. auto total_size_str = root["total_size"].asString();
  9. auto trans_size_str = root["trans_size"].asString();
  10. auto file_path = ConfigMgr::Inst().GetFileOutPath();
  11. auto message_id = root["message_id"].asInt();
  12. auto sender = root["sender_id"].asInt();
  13. auto receiver = root["receiver_id"].asInt();
  14. auto token = root["token"].asString();
  15. auto uid = root["uid"].asInt();
  16. auto callback = [=](const Json::Value& result) {
  17. // 在异步任务完成后调用
  18. Json::Value rtvalue = result;
  19. rtvalue["error"] = ErrorCodes::Success;
  20. rtvalue["name"] = name;
  21. rtvalue["sender_id"] = sender;
  22. rtvalue["receiver_id"] = receiver;
  23. std::string return_str = rtvalue.toStyledString();
  24. session->Send(return_str, ID_IMG_CHAT_DOWN_RSP);
  25. };
  26. // 使用 std::hash 对字符串进行哈希
  27. std::hash<std::string> hash_fn;
  28. size_t hash_value = hash_fn(name); // 生成哈希值
  29. int index = hash_value % DOWN_LOAD_WORKER_COUNT;
  30. std::cout << "Hash value: " << hash_value << std::endl;
  31. //第一个包校验一下token是否合理
  32. if (seq == 1) {
  33. //从redis获取用户token是否正确
  34. std::string uid_str = std::to_string(uid);
  35. std::string token_key = USERTOKENPREFIX + uid_str;
  36. std::string token_value = "";
  37. bool success = RedisMgr::GetInstance()->Get(token_key, token_value);
  38. Json::Value rtvalue;
  39. if (!success) {
  40. rtvalue["error"] = ErrorCodes::UidInvalid;
  41. std::string return_str = rtvalue.toStyledString();
  42. session->Send(return_str, ID_IMG_CHAT_DOWN_RSP);
  43. return;
  44. }
  45. if (token_value != token) {
  46. rtvalue["error"] = ErrorCodes::TokenInvalid;
  47. std::string return_str = rtvalue.toStyledString();
  48. session->Send(return_str, ID_IMG_CHAT_DOWN_RSP);
  49. return;
  50. }
  51. }
  52. auto sender_str = std::to_string(sender);
  53. //转化为字符串
  54. auto uid_str = std::to_string(uid);
  55. auto file_path_str = (file_path / sender_str / name).string();
  56. auto down_load_task = std::make_shared<DownloadTask>(session, uid, name, seq, file_path_str, callback);
  57. FileSystem::GetInstance()->PostDownloadTaskToQue(down_load_task,index);
  58. };

LogicWorker将请求投递给FileSystem队列,FileSystem队列排队处理消息,被DownloaderWorker处理

  1. void DownloadWorker::task_callback(std::shared_ptr<DownloadTask> task)
  2. {
  3. // 解码
  4. auto file_path_str = task->_file_path;
  5. //std::cout << "file_path_str is " << file_path_str << std::endl;
  6. boost::filesystem::path file_path(file_path_str);
  7. Json::Value result;
  8. result["error"] = ErrorCodes::Success;
  9. if (!boost::filesystem::exists(file_path)) {
  10. std::cerr << "文件不存在: " << file_path_str << std::endl;
  11. result["error"] = ErrorCodes::FileNotExists;
  12. task->_callback(result);
  13. return;
  14. }
  15. std::ifstream infile(file_path_str, std::ios::binary);
  16. if (!infile) {
  17. std::cerr << "无法打开文件进行读取。" << std::endl;
  18. result["error"] = ErrorCodes::FileReadPermissionFailed;
  19. task->_callback(result);
  20. return;
  21. }
  22. std::shared_ptr<FileInfo> file_info = nullptr;
  23. if (task->_seq == 1) {
  24. // 获取文件大小
  25. infile.seekg(0, std::ios::end);
  26. std::streamsize file_size = infile.tellg();
  27. infile.seekg(0, std::ios::beg);
  28. //如果为空,则创建FileInfo 构造数据存储
  29. file_info = std::make_shared<FileInfo>();
  30. file_info->_file_path_str = file_path_str;
  31. file_info->_name = task->_name;
  32. file_info->_seq = 1;
  33. file_info->_total_size = file_size;
  34. file_info->_trans_size = 0;
  35. // 立即保存到 Redis,覆盖旧数据,设置过期时间
  36. RedisMgr::GetInstance()->SetDownLoadInfo(task->_name, file_info);
  37. std::cout << "[新下载] 文件: " << task->_name
  38. << ", 大小: " << file_size << " 字节" << std::endl;
  39. }
  40. else {
  41. //断点续传,从 Redis 获取历史信息
  42. file_info = RedisMgr::GetInstance()->GetDownloadInfo(task->_name);
  43. if (file_info == nullptr) {
  44. // Redis 中没有信息(可能过期了)
  45. std::cerr << "断点续传失败,Redis 中无下载信息: " << task->_name << std::endl;
  46. result["error"] = ErrorCodes::RedisReadErr;
  47. task->_callback(result);
  48. infile.close();
  49. return;
  50. }
  51. // 验证序列号是否匹配
  52. if (task->_seq != file_info->_seq) {
  53. std::cerr << "序列号不匹配,期望: " << file_info->_seq
  54. << ", 实际: " << task->_seq << std::endl;
  55. result["error"] = ErrorCodes::FileSeqInvalid;
  56. task->_callback(result);
  57. infile.close();
  58. return;
  59. }
  60. std::cout << "[续传] 文件: " << task->_name
  61. << ", seq: " << task->_seq
  62. << ", 进度: " << file_info->_trans_size
  63. << "/" << file_info->_total_size << std::endl;
  64. }
  65. // 计算当前偏移量
  66. std::streamsize offset = ((std::streamsize)task->_seq - 1) * MAX_FILE_LEN;
  67. if (offset >= file_info->_total_size) {
  68. std::cerr << "偏移量超出文件大小。" << std::endl;
  69. result["error"] = ErrorCodes::FileOffsetInvalid;
  70. task->_callback(result);
  71. infile.close();
  72. return;
  73. }
  74. // 定位到指定偏移量
  75. infile.seekg(offset);
  76. // 读取最多MAX_FILE_LEN字节
  77. char buffer[MAX_FILE_LEN];
  78. infile.read(buffer, MAX_FILE_LEN);
  79. //获取read实际读取多少字节
  80. std::streamsize bytes_read = infile.gcount();
  81. if (bytes_read <= 0) {
  82. std::cerr << "读取文件失败。" << std::endl;
  83. result["error"] = ErrorCodes::FileReadFailed;
  84. task->_callback(result);
  85. infile.close();
  86. return;
  87. }
  88. // 将读取的数据进行base64编码
  89. std::string data_to_encode(buffer, bytes_read);
  90. std::string encoded_data = base64_encode(data_to_encode);
  91. // 检查是否是最后一个包
  92. std::streamsize current_pos = offset + bytes_read;
  93. bool is_last = (current_pos >= file_info->_total_size);
  94. // 设置返回结果
  95. result["data"] = encoded_data;
  96. result["seq"] = task->_seq;
  97. result["total_size"] = std::to_string(file_info->_total_size);
  98. result["current_size"] = std::to_string(current_pos);
  99. result["is_last"] = is_last;
  100. infile.close();
  101. if (is_last) {
  102. std::cout << "文件读取完成: " << file_path_str << std::endl;
  103. RedisMgr::GetInstance()->DelDownLoadInfo(task->_name);
  104. }
  105. else {
  106. //更新信息
  107. file_info->_seq++;
  108. file_info->_trans_size = offset + bytes_read;
  109. //更新redis
  110. RedisMgr::GetInstance()->SetDownLoadInfo(task->_name, file_info);
  111. }
  112. if (task->_callback) {
  113. task->_callback(result);
  114. }
  115. }

资源服务器每次收到请求后,由DownloadWorker从队列中获取请求,查询服务器资源,将资源按照seq计算偏移量最后读取数据发送给客户端。

客户端存储下载的资源

客户端需要存储服务器传输的资源

  1. _handlers.insert(ID_IMG_CHAT_DOWN_RSP, [this](ReqId id, int len, QByteArray data) {
  2. Q_UNUSED(len);
  3. qDebug() << "handle id is " << id << " data is " << data;
  4. // 将QByteArray转换为QJsonDocument
  5. QJsonDocument jsonDoc = QJsonDocument::fromJson(data);
  6. // 检查转换是否成功
  7. if (jsonDoc.isNull()) {
  8. qDebug() << "Failed to create QJsonDocument.";
  9. return;
  10. }
  11. QJsonObject jsonObj = jsonDoc.object();
  12. if (!jsonObj.contains("error")) {
  13. int err = ErrorCodes::ERR_JSON;
  14. qDebug() << "parse create private chat json parse failed " << err;
  15. return;
  16. }
  17. int err = jsonObj["error"].toInt();
  18. if (err != ErrorCodes::SUCCESS) {
  19. qDebug() << "get create private chat failed, error is " << err;
  20. return;
  21. }
  22. qDebug() << "Receive download file info rsp success";
  23. QString base64Data = jsonObj["data"].toString();
  24. int seq = jsonObj["seq"].toInt();
  25. bool is_last = jsonObj["is_last"].toBool();
  26. QString total_size_str = jsonObj["total_size"].toString();
  27. qint64 total_size = total_size_str.toLongLong(nullptr);
  28. QString current_size_str = jsonObj["current_size"].toString();
  29. qint64 current_size = current_size_str.toLongLong(nullptr);
  30. QString name = jsonObj["name"].toString();
  31. auto file_info = UserMgr::GetInstance()->GetTransFileByName(name);
  32. if (file_info == nullptr) {
  33. qDebug() << "file: " << name << " not found";
  34. return;
  35. }
  36. file_info->_current_size = current_size;
  37. file_info->_rsp_size = current_size;
  38. file_info->_total_size = total_size;
  39. auto clientPath = file_info->_text_or_url;
  40. //Base64解码
  41. QByteArray decodedData = QByteArray::fromBase64(base64Data.toUtf8());
  42. auto file_path = clientPath + "/" + name;
  43. QFile file(file_path);
  44. // 根据 seq 决定打开模式
  45. QIODevice::OpenMode mode;
  46. if (seq == 1) {
  47. // 第一个包,覆盖写入
  48. mode = QIODevice::WriteOnly;
  49. }
  50. else {
  51. // 后续包,追加写入
  52. mode = QIODevice::WriteOnly | QIODevice::Append;
  53. }
  54. if (!file.open(mode)) {
  55. qDebug() << "Failed to open file for writing:" << clientPath;
  56. qDebug() << "Error:" << file.errorString();
  57. return;
  58. }
  59. qint64 bytesWritten = file.write(decodedData);
  60. if (bytesWritten != decodedData.size()) {
  61. qDebug() << "Failed to write all data. Written:" << bytesWritten
  62. << "Expected:" << decodedData.size();
  63. }
  64. file.close();
  65. qDebug() << "Successfully wrote" << bytesWritten << "bytes to file";
  66. qDebug() << "Progress:" << current_size << "/" << total_size
  67. << "(" << (current_size * 100 / total_size) << "%)";
  68. if (is_last) {
  69. qDebug() << "File download completed:" << clientPath;
  70. UserMgr::GetInstance()->RmvTransFileByName(name);
  71. //通知界面下载完成
  72. emit sig_download_finish(file_info, file_path);
  73. }
  74. else {
  75. //继续请求
  76. file_info->_seq = seq + 1;
  77. file_info->_last_confirmed_seq = seq;
  78. if (file_info->_transfer_state == TransferState::Paused) {
  79. //暂停状态,则直接返回
  80. return;
  81. }
  82. //组织请求,准备下载
  83. QJsonObject jsonObj_send;
  84. jsonObj_send["name"] = name;
  85. jsonObj_send["seq"] = file_info->_seq;
  86. jsonObj_send["trans_size"] = QString::number(file_info->_current_size);
  87. jsonObj_send["total_size"] = QString::number(file_info->_total_size);
  88. jsonObj_send["token"] = UserMgr::GetInstance()->GetToken();
  89. jsonObj_send["sender_id"] = file_info->_sender;
  90. jsonObj_send["receiver_id"] = file_info->_receiver;
  91. jsonObj_send["message_id"] = file_info->_msg_id;
  92. auto uid = UserMgr::GetInstance()->GetUid();
  93. jsonObj_send["uid"] = uid;
  94. QJsonDocument doc(jsonObj_send);
  95. auto send_data = doc.toJson();
  96. FileTcpMgr::GetInstance()->SendData(ID_IMG_CHAT_DOWN_REQ, send_data);
  97. //todo...通知界面更新进度
  98. emit sig_update_download_progress(file_info);
  99. }
  100. });

通过QFile类实现文件写入。

客户端进度显示

为了让客户端更为直观的显示下载进度,可以在收到服务器消息后,将文件下载进度同步给界面,同时显示支持暂停和继续

进度通知在上述逻辑中

  1. _handlers.insert(ID_IMG_CHAT_DOWN_RSP, [this](ReqId id, int len, QByteArray data) {
  2. //...
  3. emit sig_update_download_progress(file_info);
  4. });

同样是在ChatDialog构造函数中添加消息链接

  1. //接收tcp返回的下载进度信息
  2. connect(FileTcpMgr::GetInstance().get(), &FileTcpMgr::sig_update_download_progress,
  3. this, &ChatDialog::slot_update_download_progress);

进度处理槽函数

  1. void ChatDialog::slot_update_download_progress(std::shared_ptr<MsgInfo> msg_info) {
  2. auto chat_data = UserMgr::GetInstance()->GetChatThreadByThreadId(msg_info->_thread_id);
  3. if (chat_data == nullptr) {
  4. return;
  5. }
  6. //更新消息,其实不用更新,都是共享msg_info的一块内存,这里为了安全还是再次更新下
  7. chat_data->UpdateProgress(msg_info);
  8. if (_cur_chat_thread_id != msg_info->_thread_id) {
  9. return;
  10. }
  11. //更新聊天界面信息
  12. ui->chat_page->UpdateFileProgress(msg_info);
  13. }

在ChatPage中详细处理更新

  1. void ChatPage::UpdateFileProgress(std::shared_ptr<MsgInfo> msg_info) {
  2. auto iter = _base_item_map.find(msg_info->_msg_id);
  3. if (iter == _base_item_map.end()) {
  4. return;
  5. }
  6. if (msg_info->_msg_type == MsgType::IMG_MSG) {
  7. auto bubble = iter.value()->getBubble();
  8. PictureBubble* pic_bubble = dynamic_cast<PictureBubble*>(bubble);
  9. pic_bubble->setProgress(msg_info->_rsp_size, msg_info->_total_size);
  10. }
  11. }

PicBubble中完成状态显示

  1. void PictureBubble::setProgress(int value, int total_value)
  2. {
  3. if (m_total_size != total_value) {
  4. m_total_size = total_value;
  5. }
  6. float percent = (value / (m_total_size*1.0))*100;
  7. m_progressBar->setValue(percent);
  8. if (percent >= 100) {
  9. setState(TransferState::Completed);
  10. }
  11. }

断点续传

因为在客户端收到服务器通知的图片聊天信息的时候,已经通过sig_img_chat_msg将消息发送给ChatDialog添加到页面上了。同时传输了图片的状态为下载中。

点击继续和暂停的逻辑可以复用PicBubble的逻辑

  1. void PictureBubble::onPictureClicked()
  2. {
  3. switch (m_state) {
  4. case TransferState::Downloading:
  5. case TransferState::Uploading:
  6. // 暂停
  7. setState(TransferState::Paused);
  8. emit pauseRequested(_msg_info->_unique_name, _msg_info->_transfer_type);
  9. break;
  10. case TransferState::Paused:
  11. // 继续
  12. resumeState(); //
  13. emit resumeRequested(_msg_info->_unique_name, _msg_info->_transfer_type);
  14. break;
  15. case TransferState::Failed:
  16. // 重试
  17. emit resumeRequested(_msg_info->_unique_name, _msg_info->_transfer_type);
  18. break;
  19. default:
  20. // 其他状态可以实现查看大图等功能
  21. break;
  22. }
  23. }

接下来我们响应暂停和继续,这部分逻辑也已经复用之前的逻辑即可

暂停逻辑

  1. void ChatPage::on_clicked_paused(QString unique_name, TransferType transfer_type)
  2. {
  3. UserMgr::GetInstance()->PauseTransFileByName(unique_name);
  4. }
  5. void UserMgr::PauseTransFileByName(QString name) {
  6. std::lock_guard<std::mutex> mtx(_trans_mtx);
  7. auto iter = _name_to_msg_info.find(name);
  8. if (iter == _name_to_msg_info.end()) {
  9. return;
  10. }
  11. iter.value()->_transfer_state = TransferState::Paused;
  12. }

恢复逻辑

  1. void ChatPage::on_clicked_resume(QString unique_name, TransferType transfer_type)
  2. {
  3. UserMgr::GetInstance()->ResumeTransFileByName(unique_name);
  4. //继续发送或者下载
  5. if (transfer_type == TransferType::Upload) {
  6. FileTcpMgr::GetInstance()->ContinueUploadFile(unique_name);
  7. return;
  8. }
  9. if (transfer_type == TransferType::Download) {
  10. FileTcpMgr::GetInstance()->ContinueDownloadFile(unique_name);
  11. return;
  12. }
  13. }
  14. void UserMgr::ResumeTransFileByName(QString name)
  15. {
  16. std::lock_guard<std::mutex> mtx(_trans_mtx);
  17. auto iter = _name_to_msg_info.find(name);
  18. if (iter == _name_to_msg_info.end()) {
  19. return;
  20. }
  21. if (iter.value()->_transfer_type == TransferType::Download) {
  22. iter.value()->_transfer_state = TransferState::Downloading;
  23. return;
  24. }
  25. if (iter.value()->_transfer_type == TransferType::Upload) {
  26. iter.value()->_transfer_state = TransferState::Uploading;
  27. return;
  28. }
  29. }

发送继续下载信号通知FileTcpMgr继续下载

  1. void FileTcpMgr::ContinueDownloadFile(QString unique_name) {
  2. emit sig_continue_download_file(unique_name);
  3. }

FileTcpMgr响应下载逻辑

  1. void FileTcpMgr::slot_continue_download_file(QString unique_name) {
  2. auto file_info = UserMgr::GetInstance()->GetTransFileByName(unique_name);
  3. if (file_info == nullptr) {
  4. return;
  5. }
  6. if (file_info->_current_size >= file_info->_total_size) {
  7. qDebug() << "file has received finished";
  8. return;
  9. }
  10. //组织请求,准备下载
  11. QJsonObject jsonObj_send;
  12. jsonObj_send["name"] = unique_name;
  13. jsonObj_send["seq"] = file_info->_seq;
  14. jsonObj_send["trans_size"] = QString::number(file_info->_current_size);
  15. jsonObj_send["total_size"] = QString::number(file_info->_total_size);
  16. jsonObj_send["token"] = UserMgr::GetInstance()->GetToken();
  17. jsonObj_send["sender_id"] = file_info->_sender;
  18. jsonObj_send["receiver_id"] = file_info->_receiver;
  19. jsonObj_send["message_id"] = file_info->_msg_id;
  20. auto uid = UserMgr::GetInstance()->GetUid();
  21. jsonObj_send["uid"] = uid;
  22. QJsonDocument doc(jsonObj_send);
  23. auto send_data = doc.toJson();
  24. FileTcpMgr::GetInstance()->SendData(ID_IMG_CHAT_DOWN_REQ, send_data);
  25. }

通过上述逻辑可以实现客户端的断点下载和暂停。

效果演示

image-20260208162803229

源码链接

https://gitee.com/secondtonone1/llfcchat

注意第二季分支为Season_2

热门评论

热门文章

  1. vscode搭建windows C++开发环境

    喜欢(596) 浏览(102638)
  2. 使用hexo搭建个人博客

    喜欢(533) 浏览(14716)
  3. Linux环境搭建和编码

    喜欢(594) 浏览(16370)
  4. MarkDown在线编辑器

    喜欢(514) 浏览(16776)
  5. 聊天项目(28) 分布式服务通知好友申请

    喜欢(507) 浏览(7564)

最新评论

  1. 利用指针和容器实现文本查询 越今朝:应该添加一个过滤功能以解决部分单词无法被查询的问题: eg: "I am a teacher."中的teacher无法被查询,因为在示例代码中teacher.被解释为一个单词从而忽略了teacher本身。
  2. 无锁并发队列 TenThousandOne:_head  和 _tail  替换为原子变量。那里pop的逻辑,val = _data[h] 可以移到循环外面吗
  3. 解决博客回复区被脚本注入的问题 secondtonone1:走到现在我忽然明白一个道理,无论工作也好生活也罢,最重要的是开心,即使一份安稳的工作不能给我带来事业上的积累也要合理的舍弃,所以我还是想去做喜欢的方向。
  4. 处理网络粘包问题 zyouth: //消息的长度小于头部规定的长度,说明数据未收全,则先将部分消息放到接收节点里 if (bytes_transferred < data_len) { memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, bytes_transferred); _recv_msg_node->_cur_len += bytes_transferred; ::memset(_data, 0, MAX_LENGTH); _socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH), std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_self)); //头部处理完成 _b_head_parse = true; return; } 把_b_head_parse = true;放在_socket.async_read_some前面是不是更好
  5. C++ 线程池原理和实现 mzx2023:两种方法解决,一种是改排序算法,就是当线程耗尽的时候,使用普通递归,另一种是当在线程池commit的时候,判断线程是否耗尽,耗尽的话就直接当前线程执行task

个人公众号

个人微信