多媒体信息传输

前言

前面实现了聊天信息的存储和加载,以及实现了一些资源的上传和下载。

接下来实现聊天中传输多媒体消息的逻辑,基本包括图片传输,视频传输和播放,文件传输等。

为了更顺利的实现功能,我打算先以图片聊天传输为切入点,然后再实现视频传输,文件传输等。

图片传输思路

在聊天中传输图片

  • 一方面是要把聊天消息传输到ChatServer
  • 另一方面在收到ChatServer的回复后,将资源断点续传方式传输给ResourceServer
  • `ResourceServer需要采用断点上传方式回复给客户端。
  • ResourceServer将接收完整资源后,需要通过grpc将消息发送给ChatServer更新消息为已经上传完成的状态。
  • ChatServer收到消息后更新数据,并且做消息推送,推送给消息关联的双方。推送给发送方资源已经上传完成。推送给接收方资源已经上传完成
  • 发送方将图片设置为已上传状态,接收方则展示预览图,并显示进度百分比。
  • 后期还要优化,发送方在上传资源的时候显示圆圈百分比。已经响应点击事件,暂停或者继续。

image-20251114211632712

MsgInfo完善

我修改了MsgInfo的结构,以支持图片视频等多媒体资源在聊天中传输

  1. struct MsgInfo{
  2. MsgInfo(MsgType msgtype, QString text_or_url, QPixmap pixmap, QString unique_name, qint64 total_size, QString md5)
  3. :_msg_type(msgtype), _text_or_url(text_or_url), _preview_pix(pixmap),_unique_name(unique_name),_total_size(total_size),
  4. _current_size(0),_seq(1),_md5(md5)
  5. {}
  6. MsgType _msg_type;
  7. QString _text_or_url;//表示文件和图像的url,文本信息
  8. QPixmap _preview_pix;//文件和图片的缩略图
  9. QString _unique_name; //文件唯一名字
  10. qint64 _total_size; //文件总大小
  11. qint64 _current_size; //传输大小
  12. qint64 _seq; //传输序号
  13. QString _md5; //文件md5
  14. };
  • 将内容字段改为_text_or_url,表示文件和图像的url,或者纯文本信息
  • _preview_pix为文件或者图片的缩略图,如果为视频则需要抽取第一帧作为缩略图,如果文件则设置指定格式即可
  • _unique_name为文件唯一名字,生成唯一名字有一个好处,同一个文件可以多次传输,每个文件按不同副本保存。当然也可以保存为同一份,采用md5做区分,总之这里先按照不同的副本存储在服务器。
  • _total_size用来统计文件的总大小
  • _current_size用来表示当前已经传输的大小
  • _seq表示传输的序号,将来做断点续传使用
  • _md5文件传输用的md5

修改插入消息的逻辑

  1. void MessageTextEdit::insertMsgList(QVector<std::shared_ptr<MsgInfo>> &list, MsgType msgtype,
  2. QString text_or_url, QPixmap preview_pix,
  3. QString unique_name, uint64_t total_size, QString md5) {
  4. auto msg_info = std::make_shared<MsgInfo>(msgtype, text_or_url, preview_pix, unique_name, total_size, md5);
  5. list.append(msg_info);
  6. }

将消息插入到消息列表,第一个参数是可选择的,有时我们需要将消息插入到全局消息列表。有时需要将消息插入到资源消息列表。

比如当我们拖动一个多媒体资源到富文本编辑框的时候,就是将这个资源的信息插入到资源消息列表。

当我们汇总所有的消息,用来发送的时候,就需要将消息添加到全局消息列表。

图片气泡框

image-20251115152713630

声明

  1. class PictureBubble : public BubbleFrame
  2. {
  3. Q_OBJECT
  4. public:
  5. PictureBubble(const QPixmap &picture, ChatRole role, QWidget *parent = nullptr);
  6. };

具体实现

  1. #include "PictureBubble.h"
  2. #include <QLabel>
  3. #define PIC_MAX_WIDTH 160
  4. #define PIC_MAX_HEIGHT 90
  5. PictureBubble::PictureBubble(const QPixmap &picture, ChatRole role, QWidget *parent)
  6. :BubbleFrame(role, parent)
  7. {
  8. QLabel *lb = new QLabel();
  9. lb->setScaledContents(true);
  10. QPixmap pix = picture.scaled(QSize(PIC_MAX_WIDTH, PIC_MAX_HEIGHT),
  11. Qt::KeepAspectRatio, Qt::SmoothTransformation);
  12. lb->setPixmap(pix);
  13. this->setWidget(lb);
  14. int left_margin = this->layout()->contentsMargins().left();
  15. int right_margin = this->layout()->contentsMargins().right();
  16. int v_margin = this->layout()->contentsMargins().bottom();
  17. setFixedSize(pix.width()+left_margin + right_margin, pix.height() + v_margin *2);
  18. }

图片聊天消息

实现

  1. class ImgChatData : public ChatDataBase {
  2. public:
  3. ImgChatData(std::shared_ptr<MsgInfo> msg_info, QString unique_id,
  4. int thread_id, ChatFormType form_type, ChatMsgType msg_type,
  5. int send_uid, int status, QString chat_time = ""):
  6. ChatDataBase(unique_id,thread_id, form_type, msg_type, msg_info->_text_or_url,
  7. send_uid, status, chat_time), _msg_info(msg_info){
  8. }
  9. std::shared_ptr<MsgInfo> _msg_info;
  10. };
  11. Q_DECLARE_METATYPE(std::shared_ptr<ImgChatData>)

发送消息逻辑

image-20251115154431242

修改发送消息的逻辑,发送图片时,需要将之前的文本消息发送出去,再发送图片

  1. void ChatPage::on_send_btn_clicked() {
  2. if (_chat_data == nullptr) {
  3. qDebug() << "friend_info is empty";
  4. return;
  5. }
  6. auto user_info = UserMgr::GetInstance()->GetUserInfo();
  7. auto pTextEdit = ui->chatEdit;
  8. ChatRole role = ChatRole::Self;
  9. QString userName = user_info->_name;
  10. QString userIcon = user_info->_icon;
  11. const QVector<std::shared_ptr<MsgInfo>>& msgList = pTextEdit->getMsgList();
  12. QJsonObject textObj;
  13. QJsonArray textArray;
  14. int txt_size = 0;
  15. auto thread_id = _chat_data->GetThreadId();
  16. for (int i = 0; i < msgList.size(); ++i)
  17. {
  18. //消息内容长度不合规就跳过
  19. if (msgList[i]->_text_or_url.length() > 1024) {
  20. continue;
  21. }
  22. MsgType type = msgList[i]->_msg_type;
  23. ChatItemBase* pChatItem = new ChatItemBase(role);
  24. pChatItem->setUserName(userName);
  25. SetSelfIcon(pChatItem, user_info->_icon);
  26. QWidget* pBubble = nullptr;
  27. //生成唯一id
  28. QUuid uuid = QUuid::createUuid();
  29. //转为字符串
  30. QString uuidString = uuid.toString();
  31. if (type == MsgType::TEXT_MSG)
  32. {
  33. pBubble = new TextBubble(role, msgList[i]->_text_or_url);
  34. if (txt_size + msgList[i]->_text_or_url.length() > 1024) {
  35. textObj["fromuid"] = user_info->_uid;
  36. textObj["touid"] = _chat_data->GetOtherId();
  37. textObj["thread_id"] = thread_id;
  38. textObj["text_array"] = textArray;
  39. QJsonDocument doc(textObj);
  40. QByteArray jsonData = doc.toJson(QJsonDocument::Compact);
  41. //发送并清空之前累计的文本列表
  42. txt_size = 0;
  43. textArray = QJsonArray();
  44. textObj = QJsonObject();
  45. //发送tcp请求给chat server
  46. emit TcpMgr::GetInstance()->sig_send_data(ReqId::ID_TEXT_CHAT_MSG_REQ, jsonData);
  47. }
  48. //将bubble和uid绑定,以后可以等网络返回消息后设置是否送达
  49. //_bubble_map[uuidString] = pBubble;
  50. txt_size += msgList[i]->_text_or_url.length();
  51. QJsonObject obj;
  52. QByteArray utf8Message = msgList[i]->_text_or_url.toUtf8();
  53. auto content = QString::fromUtf8(utf8Message);
  54. obj["content"] = content;
  55. obj["unique_id"] = uuidString;
  56. textArray.append(obj);
  57. //todo... 注意,此处先按私聊处理
  58. auto txt_msg = std::make_shared<TextChatData>(uuidString, thread_id, ChatFormType::PRIVATE,
  59. ChatMsgType::TEXT, content, user_info->_uid, 0);
  60. //将未回复的消息加入到未回复列表中,以便后续处理
  61. _chat_data->AppendUnRspMsg(uuidString, txt_msg);
  62. }
  63. else if (type == MsgType::IMG_MSG)
  64. {
  65. //将之前缓存的文本发送过去
  66. if (txt_size) {
  67. textObj["fromuid"] = user_info->_uid;
  68. textObj["touid"] = _chat_data->GetOtherId();
  69. textObj["thread_id"] = thread_id;
  70. textObj["text_array"] = textArray;
  71. QJsonDocument doc(textObj);
  72. QByteArray jsonData = doc.toJson(QJsonDocument::Compact);
  73. //发送并清空之前累计的文本列表
  74. txt_size = 0;
  75. textArray = QJsonArray();
  76. textObj = QJsonObject();
  77. //发送tcp请求给chat server
  78. emit TcpMgr::GetInstance()->sig_send_data(ReqId::ID_TEXT_CHAT_MSG_REQ, jsonData);
  79. }
  80. pBubble = new PictureBubble(QPixmap(msgList[i]->_text_or_url), role);
  81. //需要组织成文件发送,具体参考头像上传
  82. auto img_msg = std::make_shared<ImgChatData>(msgList[i],uuidString, thread_id, ChatFormType::PRIVATE,
  83. ChatMsgType::TEXT, user_info->_uid, 0);
  84. //将未回复的消息加入到未回复列表中,以便后续处理
  85. _chat_data->AppendUnRspMsg(uuidString, img_msg);
  86. textObj["fromuid"] = user_info->_uid;
  87. textObj["touid"] = _chat_data->GetOtherId();
  88. textObj["thread_id"] = thread_id;
  89. textObj["md5"] = msgList[i]->_md5;
  90. textObj["name"] = msgList[i]->_unique_name;
  91. textObj["token"] = UserMgr::GetInstance()->GetToken();
  92. textObj["unique_id"] = uuidString;
  93. //文件信息加入管理
  94. UserMgr::GetInstance()->AddTransFile(msgList[i]->_unique_name, msgList[i]);
  95. QJsonDocument doc(textObj);
  96. QByteArray jsonData = doc.toJson(QJsonDocument::Compact);
  97. //发送tcp请求给chat server
  98. emit TcpMgr::GetInstance()->sig_send_data(ReqId::ID_IMG_CHAT_MSG_REQ, jsonData);
  99. }
  100. else if (type == MsgType::FILE_MSG)
  101. {
  102. }
  103. //发送消息
  104. if (pBubble != nullptr)
  105. {
  106. pChatItem->setWidget(pBubble);
  107. pChatItem->setStatus(0);
  108. ui->chat_data_list->appendChatItem(pChatItem);
  109. _unrsp_item_map[uuidString] = pChatItem;
  110. }
  111. }
  112. qDebug() << "textArray is " << textArray;
  113. //发送给服务器
  114. textObj["text_array"] = textArray;
  115. textObj["fromuid"] = user_info->_uid;
  116. textObj["touid"] = _chat_data->GetOtherId();
  117. textObj["thread_id"] = thread_id;
  118. QJsonDocument doc(textObj);
  119. QByteArray jsonData = doc.toJson(QJsonDocument::Compact);
  120. //发送并清空之前累计的文本列表
  121. txt_size = 0;
  122. textArray = QJsonArray();
  123. textObj = QJsonObject();
  124. //发送tcp请求给chat server
  125. emit TcpMgr::GetInstance()->sig_send_data(ReqId::ID_TEXT_CHAT_MSG_REQ, jsonData);
  126. }

接收服务器返回

先注册消息,用于跨线程调用

  1. void TcpMgr::registerMetaType() {
  2. // 注册所有自定义类型
  3. qRegisterMetaType<ServerInfo>("ServerInfo");
  4. qRegisterMetaType<SearchInfo>("SearchInfo");
  5. qRegisterMetaType<std::shared_ptr<SearchInfo>>("std::shared_ptr<SearchInfo>");
  6. qRegisterMetaType<AddFriendApply>("AddFriendApply");
  7. qRegisterMetaType<std::shared_ptr<AddFriendApply>>("std::shared_ptr<AddFriendApply>");
  8. qRegisterMetaType<ApplyInfo>("ApplyInfo");
  9. qRegisterMetaType<std::shared_ptr<AuthInfo>>("std::shared_ptr<AuthInfo>");
  10. qRegisterMetaType<AuthRsp>("AuthRsp");
  11. qRegisterMetaType<std::shared_ptr<AuthRsp>>("std::shared_ptr<AuthRsp>");
  12. qRegisterMetaType<UserInfo>("UserInfo");
  13. qRegisterMetaType<std::vector<std::shared_ptr<TextChatData>>>("std::vector<std::shared_ptr<TextChatData>>");
  14. qRegisterMetaType<std::vector<std::shared_ptr<ChatThreadInfo>>>("std::vector<std::shared_ptr<ChatThreadInfo>>");
  15. qRegisterMetaType<std::shared_ptr<ChatThreadData>>("std::shared_ptr<ChatThreadData>");
  16. qRegisterMetaType<ReqId>("ReqId");
  17. qRegisterMetaType<std::shared_ptr<ImgChatData>>("std::shared_ptr<ImgChatData>");
  18. }

注册消息

  1. void TcpMgr::initHandlers()
  2. {
  3. _handlers.insert(ID_IMG_CHAT_MSG_RSP, [this](ReqId id, int len, QByteArray data) {
  4. Q_UNUSED(len);
  5. qDebug() << "handle id is " << id << " data is " << data;
  6. // 将QByteArray转换为QJsonDocument
  7. QJsonDocument jsonDoc = QJsonDocument::fromJson(data);
  8. // 检查转换是否成功
  9. if (jsonDoc.isNull()) {
  10. qDebug() << "Failed to create QJsonDocument.";
  11. return;
  12. }
  13. QJsonObject jsonObj = jsonDoc.object();
  14. if (!jsonObj.contains("error")) {
  15. int err = ErrorCodes::ERR_JSON;
  16. qDebug() << "parse create private chat json parse failed " << err;
  17. return;
  18. }
  19. int err = jsonObj["error"].toInt();
  20. if (err != ErrorCodes::SUCCESS) {
  21. qDebug() << "get create private chat failed, error is " << err;
  22. return;
  23. }
  24. qDebug() << "Receive create private chat rsp Success";
  25. //收到消息后转发给页面
  26. auto thread_id = jsonObj["thread_id"].toInt();
  27. auto unique_id = jsonObj["unique_id"].toString();
  28. auto unique_name = jsonObj["unique_name"].toString();
  29. auto sender = jsonObj["fromuid"].toInt();
  30. auto msg_id = jsonObj["message_id"].toInt();
  31. QString chat_time = jsonObj["chat_time"].toString();
  32. int status = jsonObj["status"].toInt();
  33. auto file_info = UserMgr::GetInstance()->GetTransFileByName(unique_name);
  34. auto chat_data = std::make_shared<ImgChatData>(file_info, unique_id, thread_id, ChatFormType::PRIVATE,
  35. ChatMsgType::TEXT, sender, status, chat_time);
  36. //发送信号通知界面
  37. emit sig_chat_img_rsp(thread_id, chat_data);
  38. });
  39. }

服务器接收图片消息

先注册消息

  1. void LogicSystem::RegisterCallBacks() {
  2. _fun_callbacks[ID_LOAD_CHAT_MSG_REQ] = std::bind(&LogicSystem::LoadChatMsg, this,
  3. placeholders::_1, placeholders::_2, placeholders::_3);
  4. _fun_callbacks[ID_IMG_CHAT_MSG_REQ] = std::bind(&LogicSystem::DealChatImgMsg, this,
  5. placeholders::_1, placeholders::_2, placeholders::_3);
  6. }

处理聊天中的图片消息

  1. void LogicSystem::DealChatImgMsg(std::shared_ptr<CSession> session,
  2. const short& msg_id, const string& msg_data) {
  3. Json::Reader reader;
  4. Json::Value root;
  5. reader.parse(msg_data, root);
  6. auto uid = root["fromuid"].asInt();
  7. auto touid = root["touid"].asInt();
  8. auto md5 = root["md5"].asString();
  9. auto unique_name = root["name"].asString();
  10. auto token = root["token"].asString();
  11. auto unique_id = root["unique_id"].asString();
  12. auto chat_time = root["chat_time"].asString();
  13. auto status = root["status"].asInt();
  14. Json::Value rtvalue;
  15. rtvalue["error"] = ErrorCodes::Success;
  16. rtvalue["fromuid"] = uid;
  17. rtvalue["touid"] = touid;
  18. auto thread_id = root["thread_id"].asInt();
  19. rtvalue["thread_id"] = thread_id;
  20. rtvalue["md5"] = md5;
  21. rtvalue["unique_name"] = unique_name;
  22. rtvalue["unique_id"] = unique_id;
  23. rtvalue["chat_time"] = chat_time;
  24. rtvalue["status"] = status;
  25. auto timestamp = getCurrentTimestamp();
  26. auto chat_msg = std::make_shared<ChatMessage>();
  27. chat_msg->chat_time = timestamp;
  28. chat_msg->sender_id = uid;
  29. chat_msg->recv_id = touid;
  30. chat_msg->unique_id = unique_id;
  31. chat_msg->thread_id = thread_id;
  32. chat_msg->content = unique_name;
  33. chat_msg->status = MsgStatus::UN_UPLOAD;
  34. //插入数据库
  35. MysqlMgr::GetInstance()->AddChatMsg(chat_msg);
  36. Defer defer([this, &rtvalue, session]() {
  37. std::string return_str = rtvalue.toStyledString();
  38. session->Send(return_str, ID_IMG_CHAT_MSG_RSP);
  39. });
  40. //发送通知 todo... 以后等文件上传完成再通知
  41. }

聊天资源断点续传

image-20251114211632712

之前我们实现了1和2,接下来在客户端Client收到ChatServer的回复消息2后,需要执行步骤3

这期间要在客户端和服务器之间实现断点续传。

发送资源

  1. _handlers.insert(ID_IMG_CHAT_MSG_RSP, [this](ReqId id, int len, QByteArray data) {
  2. Q_UNUSED(len);
  3. qDebug() << "handle id is " << id << " data is " << data;
  4. // 将QByteArray转换为QJsonDocument
  5. QJsonDocument jsonDoc = QJsonDocument::fromJson(data);
  6. // 检查转换是否成功
  7. if (jsonDoc.isNull()) {
  8. qDebug() << "Failed to create QJsonDocument.";
  9. return;
  10. }
  11. QJsonObject jsonObj = jsonDoc.object();
  12. if (!jsonObj.contains("error")) {
  13. int err = ErrorCodes::ERR_JSON;
  14. qDebug() << "parse create private chat json parse failed " << err;
  15. return;
  16. }
  17. int err = jsonObj["error"].toInt();
  18. if (err != ErrorCodes::SUCCESS) {
  19. qDebug() << "get create private chat failed, error is " << err;
  20. return;
  21. }
  22. qDebug() << "Receive create private chat rsp Success";
  23. //收到消息后转发给页面
  24. auto thread_id = jsonObj["thread_id"].toInt();
  25. auto unique_id = jsonObj["unique_id"].toString();
  26. auto unique_name = jsonObj["unique_name"].toString();
  27. auto sender = jsonObj["fromuid"].toInt();
  28. auto msg_id = jsonObj["message_id"].toInt();
  29. QString chat_time = jsonObj["chat_time"].toString();
  30. int status = jsonObj["status"].toInt();
  31. auto text_or_url = jsonObj["text_or_url"].toString();
  32. auto file_info = UserMgr::GetInstance()->GetTransFileByName(unique_name);
  33. auto chat_data = std::make_shared<ImgChatData>(file_info, unique_id, thread_id, ChatFormType::PRIVATE,
  34. ChatMsgType::TEXT, sender, status, chat_time);
  35. //发送信号通知界面
  36. emit sig_chat_img_rsp(thread_id, chat_data);
  37. //创建QFileInfo 对象 todo 留作以后收到服务器返回消息后再发送
  38. QFile file(file_info->_text_or_url);
  39. if (!file.open(QIODevice::ReadOnly)) {
  40. qWarning() << "Could not open file:" << file.errorString();
  41. return;
  42. }
  43. file.seek(file_info->_current_size);
  44. auto buffer = file.read(MAX_FILE_LEN);
  45. qDebug() << "buffer is " << buffer;
  46. //将文件内容转换为base64编码
  47. QString base64Data = buffer.toBase64();
  48. QJsonObject file_obj;
  49. file_obj["name"] = file_info->_unique_name;
  50. file_obj["unique_id"] = unique_id;
  51. file_obj["seq"] = file_info->_seq;
  52. file_info->_current_size = buffer.size() + (file_info->_seq - 1) * MAX_FILE_LEN;
  53. file_obj["trans_size"] = file_info->_current_size;
  54. file_obj["total_size"] = file_info->_total_size;
  55. file_obj["token"] = UserMgr::GetInstance()->GetToken();
  56. file_obj["md5"] = file_info->_md5;
  57. file_obj["uid"] = UserMgr::GetInstance()->GetUid();
  58. file_obj["data"] = base64Data;
  59. if (buffer.size() + (file_info->_seq - 1) * MAX_FILE_LEN >= file_info->_total_size) {
  60. file_obj["last"] = 1;
  61. }
  62. else {
  63. file_obj["last"] = 0;
  64. }
  65. //发送文件 todo 留作以后收到服务器返回消息后再发送
  66. QJsonDocument doc_file(file_obj);
  67. QByteArray fileData = doc_file.toJson(QJsonDocument::Compact);
  68. //发送消息给ResourceServer
  69. FileTcpMgr::GetInstance()->SendData(ReqId::ID_IMG_CHAT_UPLOAD_REQ, fileData);
  70. });

我们的客户端在收到服务器的回复(步骤2)之后,立刻读取文件,发送第一个包,这里通过FileTcpMgr发送资源给ResourceServer

资源服务器存储

我们实现断点续传,在LogicWorker中注册处理逻辑

  1. void LogicWorker::RegisterCallBacks()
  2. {
  3. _fun_callbacks[ID_IMG_CHAT_UPLOAD_REQ] = [this](shared_ptr<CSession> session, const short& msg_id,
  4. const string& msg_data) {
  5. Json::Reader reader;
  6. Json::Value root;
  7. reader.parse(msg_data, root);
  8. auto md5 = root["md5"].asString();
  9. auto seq = root["seq"].asInt();
  10. auto name = root["name"].asString();
  11. auto total_size = root["total_size"].asInt();
  12. auto trans_size = root["trans_size"].asInt();
  13. auto last = root["last"].asInt();
  14. auto file_data = root["data"].asString();
  15. auto file_path = ConfigMgr::Inst().GetFileOutPath();
  16. auto uid = root["uid"].asInt();
  17. //转化为字符串
  18. auto uid_str = std::to_string(uid);
  19. auto file_path_str = (file_path / uid_str / name).string();
  20. Json::Value rtvalue;
  21. auto callback = [=](const Json::Value& result) {
  22. // 在异步任务完成后调用
  23. Json::Value rtvalue = result;
  24. rtvalue["error"] = ErrorCodes::Success;
  25. rtvalue["total_size"] = total_size;
  26. rtvalue["seq"] = seq;
  27. rtvalue["name"] = name;
  28. rtvalue["trans_size"] = trans_size;
  29. rtvalue["last"] = last;
  30. rtvalue["md5"] = md5;
  31. rtvalue["uid"] = uid;
  32. std::string return_str = rtvalue.toStyledString();
  33. session->Send(return_str, ID_IMG_CHAT_UPLOAD_RSP);
  34. };
  35. // 使用 std::hash 对字符串进行哈希
  36. std::hash<std::string> hash_fn;
  37. size_t hash_value = hash_fn(name); // 生成哈希值
  38. int index = hash_value % FILE_WORKER_COUNT;
  39. std::cout << "Hash value: " << hash_value << std::endl;
  40. //第一个包
  41. if (seq == 1) {
  42. //构造数据存储
  43. auto file_info = std::make_shared<FileInfo>();
  44. file_info->_file_path_str = file_path_str;
  45. file_info->_name = name;
  46. file_info->_seq = seq;
  47. file_info->_total_size = total_size;
  48. file_info->_trans_size = trans_size;
  49. bool success = RedisMgr::GetInstance()->SetFileInfo(name, file_info);
  50. if (!success) {
  51. rtvalue["error"] = ErrorCodes::FileSaveRedisFailed;
  52. std::string return_str = rtvalue.toStyledString();
  53. session->Send(return_str, ID_UPLOAD_HEAD_ICON_RSP);
  54. return;
  55. }
  56. }
  57. else {
  58. auto file_info = RedisMgr::GetInstance()->GetFileInfo(name);
  59. if (file_info == nullptr) {
  60. rtvalue["error"] = ErrorCodes::FileNotExists;
  61. std::string return_str = rtvalue.toStyledString();
  62. session->Send(return_str, ID_UPLOAD_HEAD_ICON_RSP);
  63. return;
  64. }
  65. file_info->_seq = seq;
  66. file_info->_trans_size = trans_size;
  67. bool success = RedisMgr::GetInstance()->SetFileInfo(name, file_info);
  68. if (!success) {
  69. rtvalue["error"] = ErrorCodes::FileSaveRedisFailed;
  70. std::string return_str = rtvalue.toStyledString();
  71. session->Send(return_str, ID_UPLOAD_HEAD_ICON_RSP);
  72. return;
  73. }
  74. }
  75. FileSystem::GetInstance()->PostMsgToQue(
  76. std::make_shared<FileTask>(session, ID_IMG_CHAT_UPLOAD_REQ, uid, file_path_str, name, seq, total_size,
  77. trans_size, last, file_data, callback),
  78. index
  79. );
  80. };
  81. }
  1. 通过callback存储回调函数,一同包裹进FileInfo, 回调函数通过=捕获所有局部变量,这样构造了一个伪闭包。
  2. 我们将包裹好的FileInfo投递到FileSystem中,交给其中的线程池,由多个FileWorker线程处理

我们跟进FileSystem的投递逻辑

  1. void FileSystem::PostMsgToQue(shared_ptr<FileTask> msg, int index)
  2. {
  3. _file_workers[index]->PostTask(msg);
  4. }

FileWorker投递逻辑

  1. void FileWorker::PostTask(std::shared_ptr<FileTask> task)
  2. {
  3. {
  4. std::lock_guard<std::mutex> lock(_mtx);
  5. //借鉴python万物皆对象思想,构造伪闭包将函数对象扔到队列中
  6. _task_que.push([task, this]() {
  7. task_callback(task);
  8. });
  9. }
  10. _cv.notify_one();
  11. }

我们将任务直接包裹到一个lambda表达式中,利用python万物皆对象的思想,结合C++语法,将这个可调用对象投递给队列。

将来可调用对象从队列取出后就会调用这个lambda表达式, 进而调用task_callback函数

  1. FileWorker::FileWorker() :_b_stop(false)
  2. {
  3. RegisterHandlers();
  4. _work_thread = std::thread([this]() {
  5. while (!_b_stop) {
  6. std::unique_lock<std::mutex> lock(_mtx);
  7. _cv.wait(lock, [this]() {
  8. if (_b_stop) {
  9. return true;
  10. }
  11. if (_task_que.empty()) {
  12. return false;
  13. }
  14. return true;
  15. });
  16. if (_b_stop) {
  17. break;
  18. }
  19. auto task_call = _task_que.front();
  20. _task_que.pop();
  21. task_call();
  22. }
  23. });
  24. }

调用逻辑

  1. void FileWorker::task_callback(std::shared_ptr<FileTask> task)
  2. {
  3. auto iter = _handlers.find(task->_msg_id);
  4. if (iter == _handlers.end()) {
  5. return;
  6. }
  7. iter->second(task);
  8. }

_handlers中根据消息id检索,取出回调函数,传入task参数调用

_handlers的注册逻辑

  1. void FileWorker::RegisterHandlers()
  2. {
  3. //处理头像上传
  4. _handlers[ID_UPLOAD_HEAD_ICON_REQ] = [this](std::shared_ptr<FileTask> task) {
  5. // 解码
  6. std::string decoded = base64_decode(task->_file_data);
  7. auto file_path_str = task->_path;
  8. auto last = task->_last;
  9. //std::cout << "file_path_str is " << file_path_str << std::endl;
  10. boost::filesystem::path file_path(file_path_str);
  11. boost::filesystem::path dir_path = file_path.parent_path();
  12. // 获取完整文件名(包含扩展名)
  13. std::string filename = file_path.filename().string();
  14. Json::Value result;
  15. result["error"] = ErrorCodes::Success;
  16. // Check if directory exists, if not, create it
  17. if (!boost::filesystem::exists(dir_path)) {
  18. if (!boost::filesystem::create_directories(dir_path)) {
  19. std::cerr << "Failed to create directory: " << dir_path.string() << std::endl;
  20. result["error"] = ErrorCodes::FileNotExists;
  21. task->_callback(result);
  22. return;
  23. }
  24. }
  25. std::ofstream outfile;
  26. //第一个包
  27. if (task->_seq == 1) {
  28. // 打开文件,如果存在则清空,不存在则创建
  29. outfile.open(file_path_str, std::ios::binary | std::ios::trunc);
  30. }
  31. else {
  32. // 保存为文件
  33. outfile.open(file_path_str, std::ios::binary | std::ios::app);
  34. }
  35. if (!outfile) {
  36. std::cerr << "无法打开文件进行写入。" << std::endl;
  37. result["error"] = ErrorCodes::FileWritePermissionFailed;
  38. task->_callback(result);
  39. return;
  40. }
  41. outfile.write(decoded.data(), decoded.size());
  42. if (!outfile) {
  43. std::cerr << "写入文件失败。" << std::endl;
  44. result["error"] = ErrorCodes::FileWritePermissionFailed;
  45. task->_callback(result);
  46. return;
  47. }
  48. outfile.close();
  49. if (last) {
  50. std::cout << "文件已成功保存为: " << task->_name << std::endl;
  51. //更新头像
  52. MysqlMgr::GetInstance()->UpdateUserIcon(task->_uid, filename);
  53. //获取用户信息
  54. auto user_info = MysqlMgr::GetInstance()->GetUser(task->_uid);
  55. if (user_info == nullptr) {
  56. return;
  57. }
  58. //将数据库内容写入redis缓存
  59. Json::Value redis_root;
  60. redis_root["uid"] = task->_uid;
  61. redis_root["pwd"] = user_info->pwd;
  62. redis_root["name"] = user_info->name;
  63. redis_root["email"] = user_info->email;
  64. redis_root["nick"] = user_info->nick;
  65. redis_root["desc"] = user_info->desc;
  66. redis_root["sex"] = user_info->sex;
  67. redis_root["icon"] = user_info->icon;
  68. std::string base_key = USER_BASE_INFO + std::to_string(task->_uid);
  69. RedisMgr::GetInstance()->Set(base_key, redis_root.toStyledString());
  70. }
  71. if (task->_callback) {
  72. task->_callback(result);
  73. }
  74. };
  75. //处理聊天图片上传
  76. _handlers[ID_IMG_CHAT_UPLOAD_REQ] = [this](std::shared_ptr<FileTask> task) {
  77. // 解码
  78. std::string decoded = base64_decode(task->_file_data);
  79. auto file_path_str = task->_path;
  80. auto last = task->_last;
  81. //std::cout << "file_path_str is " << file_path_str << std::endl;
  82. boost::filesystem::path file_path(file_path_str);
  83. boost::filesystem::path dir_path = file_path.parent_path();
  84. // 获取完整文件名(包含扩展名)
  85. std::string filename = file_path.filename().string();
  86. Json::Value result;
  87. result["error"] = ErrorCodes::Success;
  88. // Check if directory exists, if not, create it
  89. if (!boost::filesystem::exists(dir_path)) {
  90. if (!boost::filesystem::create_directories(dir_path)) {
  91. std::cerr << "Failed to create directory: " << dir_path.string() << std::endl;
  92. result["error"] = ErrorCodes::FileNotExists;
  93. task->_callback(result);
  94. return;
  95. }
  96. }
  97. std::ofstream outfile;
  98. //第一个包
  99. if (task->_seq == 1) {
  100. // 打开文件,如果存在则清空,不存在则创建
  101. outfile.open(file_path_str, std::ios::binary | std::ios::trunc);
  102. }
  103. else {
  104. // 保存为文件
  105. outfile.open(file_path_str, std::ios::binary | std::ios::app);
  106. }
  107. if (!outfile) {
  108. std::cerr << "无法打开文件进行写入。" << std::endl;
  109. result["error"] = ErrorCodes::FileWritePermissionFailed;
  110. task->_callback(result);
  111. return;
  112. }
  113. outfile.write(decoded.data(), decoded.size());
  114. if (!outfile) {
  115. std::cerr << "写入文件失败。" << std::endl;
  116. result["error"] = ErrorCodes::FileWritePermissionFailed;
  117. task->_callback(result);
  118. return;
  119. }
  120. outfile.close();
  121. if (last) {
  122. std::cout << "文件已成功保存为: " << task->_name << std::endl;
  123. //todo...更新数据库聊天图像上传状态
  124. //通过grpc通知ChatServer
  125. }
  126. if (task->_callback) {
  127. task->_callback(result);
  128. }
  129. };
  130. }

比如是聊天图片上传的请求,就调用如下

  1. _handlers[ID_IMG_CHAT_UPLOAD_REQ] = [this](std::shared_ptr<FileTask> task) {
  2. // 解码
  3. std::string decoded = base64_decode(task->_file_data);
  4. auto file_path_str = task->_path;
  5. auto last = task->_last;
  6. //std::cout << "file_path_str is " << file_path_str << std::endl;
  7. boost::filesystem::path file_path(file_path_str);
  8. boost::filesystem::path dir_path = file_path.parent_path();
  9. // 获取完整文件名(包含扩展名)
  10. std::string filename = file_path.filename().string();
  11. Json::Value result;
  12. result["error"] = ErrorCodes::Success;
  13. // Check if directory exists, if not, create it
  14. if (!boost::filesystem::exists(dir_path)) {
  15. if (!boost::filesystem::create_directories(dir_path)) {
  16. std::cerr << "Failed to create directory: " << dir_path.string() << std::endl;
  17. result["error"] = ErrorCodes::FileNotExists;
  18. task->_callback(result);
  19. return;
  20. }
  21. }
  22. std::ofstream outfile;
  23. //第一个包
  24. if (task->_seq == 1) {
  25. // 打开文件,如果存在则清空,不存在则创建
  26. outfile.open(file_path_str, std::ios::binary | std::ios::trunc);
  27. }
  28. else {
  29. // 保存为文件
  30. outfile.open(file_path_str, std::ios::binary | std::ios::app);
  31. }
  32. if (!outfile) {
  33. std::cerr << "无法打开文件进行写入。" << std::endl;
  34. result["error"] = ErrorCodes::FileWritePermissionFailed;
  35. task->_callback(result);
  36. return;
  37. }
  38. outfile.write(decoded.data(), decoded.size());
  39. if (!outfile) {
  40. std::cerr << "写入文件失败。" << std::endl;
  41. result["error"] = ErrorCodes::FileWritePermissionFailed;
  42. task->_callback(result);
  43. return;
  44. }
  45. outfile.close();
  46. if (last) {
  47. std::cout << "文件已成功保存为: " << task->_name << std::endl;
  48. //todo...更新数据库聊天图像上传状态
  49. //通过grpc通知ChatServer
  50. }
  51. if (task->_callback) {
  52. task->_callback(result);
  53. }
  54. };

在这个逻辑里我们打开文件,并采取追加的方式将数据写入服务器所在的磁盘目录保存

测试效果

image-20251123112409697

image-20251123112343094

结论

  1. 经过测试,可以实现断点续传上传聊天资源的功能
  2. 但是对于大文件,采用串行方式断点续传效率很慢
  3. 考虑搞一个拥塞窗口多序列传输,本质上还是通过网络线程串行上传,但不是等待服务器回复后才上传,而是通过一个拥塞窗口控制发送频率。

设计拥塞窗口提高发送效率

思路分析

客户端发送端单线程还是多线程

本质上客户端如果采用切片的方式将一个文件切割为多个小文件,可以不考虑顺序,将来汇总服务器的回包统计是否传输完成即可。

但是对于同一个socket多线程调用send会产生数据错乱,对于asio这种网络库,我们采用的是发送队列控制顺序,保证互斥性,一个包发送完成再发送下一个。

对于QT其底层封装了发送队列,支持多线程并发调用send,但是本质上底层的发送还是很串行化。

所以对于现有的结构,我们通过跨线程的方式,将要发送的数据投递给FileMgr所在的线程的消息队列,统一发送。

这个结构不用改。

客户端发送逻辑修改

客户端不再等待服务器回包后再发送,而是将切割好的包一次性添加到发送队列。

但是如果文件过大,要几百个包,一次性会堆满队列,另外循环发送几百个包会造成网络拥塞,导致服务器一段时间只为这一个客户端服务,这是不可取的。

拥塞窗口设计

为了解决这个问题,我们可以优先将要发送的数据放入拥塞窗口,处于拥塞窗口的数据优先发送

其余的数据投递到队列中。

如果文件数据过多,可以优先将一部分数据放入队列,等到队列队列大小缩小后继续放入数据。

当客户端收到服务器回包后,做错误判断,如果无误则从队列取出数据放到拥塞窗口中继续发送。

队列减小到一定阈值后,将文件剩余未发送的包继续填充到队列中。

image-20251123115425162

这么做还要考虑如果发送失败,就要清除队列中该次未发送的数据包。

如果发送两个文件,队列中的数据将会是交叉的。所以对于错误处理,要考虑剔除发送失败的包。

数据结构设计:

  1. struct SendTask {
  2. int file_id; // 文件唯一标识
  3. int chunk_id; // 分片序号
  4. int total_chunks; // 总分片数
  5. vector<char> data; // 数据内容
  6. int retry_count; // 重试次数
  7. };

队列管理:

  • 使用map<file_id, queue<SendTask>>区分不同文件的数据包
  • 发送失败时,只清除对应file_id的所有待发送包
  • 维护已发送但未确认的包列表,便于超时重传

服务器逻辑

服务器是多线程还是单线程

服务器可以采用多线程方式处理收到的文件包,可以采用多线程的方式写如文件,但是对于同一个文件要加锁。

本质上同一个时刻只有一个线程可以对文件进行读写。所以干脆就用一个线程负责一个文件的写,可以根据session_id区分不同的连接,对于同一个连接采用同一个FileWorker执行写就可以了。

这样不用加锁还保证线程安全了。

image-20251123123519059

服务器乱序存储

服务器不再用原来的线性方式将内容追加到磁盘上。

而是优先接收客户端的第一个包,获取文件信息,然后按照seq个数创建文件大小,在最后一个字节写入空,这样整个空文件就构造好了。

image-20251123122259919

然后服务器每次接收到客户端的乱序序列后,将内容写入对应的偏移位置。并且回复客户端,将序列号和文件基本信息回复给客户端。

image-20251123122429328

客户端实现拥塞窗口

窗口大小成员

FileTcpMgr中添加成员变量

  1. class FileTcpMgr : public QObject, public Singleton<FileTcpMgr>,
  2. public std::enable_shared_from_this<FileTcpMgr>{
  3. //发送的拥塞窗口,控制发送数量
  4. int _cwnd_size;
  5. }

封装发送逻辑

  1. class FileTcpMgr : public QObject, public Singleton<FileTcpMgr>,
  2. public std::enable_shared_from_this<FileTcpMgr>
  3. {
  4. Q_OBJECT
  5. public:
  6. void BatchSend(std::shared_ptr<MsgInfo> msg_info);
  7. }

具体实现

  1. void FileTcpMgr::BatchSend(std::shared_ptr<MsgInfo> msg_info) {
  2. if ((msg_info->_seq) * MAX_FILE_LEN >= msg_info->_total_size) {
  3. qDebug() << "file has sent finished";
  4. return;
  5. }
  6. if (MAX_CWND_SIZE - _cwnd_size == 0) {
  7. return;
  8. }
  9. //打开
  10. QFile file(msg_info->_text_or_url);
  11. if (!file.open(QIODevice::ReadOnly)) {
  12. qWarning() << "Could not open file: " << file.errorString();
  13. return;
  14. }
  15. //文件偏移到已经发送的位置,继续读取发送
  16. file.seek(msg_info->_seq * MAX_FILE_LEN);
  17. bool b_last = false;
  18. //再次组织数据发送
  19. for (; MAX_CWND_SIZE - _cwnd_size > 0; ) {
  20. QByteArray buffer;
  21. msg_info->_seq++;
  22. //放入发送未回包集合
  23. msg_info->_flighting_seqs.insert(msg_info->_seq);
  24. //每次读取MAX_FILE_LEN字节发送
  25. buffer = file.read(MAX_FILE_LEN);
  26. QJsonObject sendObj;
  27. //将文件内容转换为base64编码
  28. QString base64Data = buffer.toBase64();
  29. sendObj["md5"] = msg_info->_md5;
  30. sendObj["name"] = msg_info->_unique_name;
  31. sendObj["seq"] = msg_info->_seq;
  32. sendObj["trans_size"] = buffer.size() + (msg_info->_seq - 1) * MAX_FILE_LEN;
  33. sendObj["total_size"] = msg_info->_total_size;
  34. b_last = false;
  35. if (buffer.size() + (msg_info->_seq - 1) * MAX_FILE_LEN >= msg_info->_total_size) {
  36. sendObj["last"] = 1;
  37. b_last = true;
  38. }
  39. else {
  40. sendObj["last"] = 0;
  41. }
  42. sendObj["data"] = base64Data;
  43. sendObj["last_seq"] = msg_info->_max_seq;
  44. sendObj["uid"] = UserMgr::GetInstance()->GetUid();
  45. QJsonDocument doc(sendObj);
  46. auto send_data = doc.toJson();
  47. //直接发送,其实是放入tcpmgr发送队列
  48. SendData(ID_IMG_CHAT_UPLOAD_REQ, send_data);
  49. _cwnd_size++;
  50. //如果
  51. if (b_last) {
  52. break;
  53. }
  54. }
  55. file.close();
  56. }

同步发送信息

考虑以后很多场景都会将发送信息同步给服务器,所以单独抽象了一个发送协议

TcpMgr收到聊天消息回复后,可以考虑先将图片信息同步给资源服务器

  1. _handlers.insert(ID_IMG_CHAT_MSG_RSP, [this](ReqId id, int len, QByteArray data) {
  2. Q_UNUSED(len);
  3. qDebug() << "handle id is " << id << " data is " << data;
  4. // 将QByteArray转换为QJsonDocument
  5. QJsonDocument jsonDoc = QJsonDocument::fromJson(data);
  6. // 检查转换是否成功
  7. if (jsonDoc.isNull()) {
  8. qDebug() << "Failed to create QJsonDocument.";
  9. return;
  10. }
  11. QJsonObject jsonObj = jsonDoc.object();
  12. if (!jsonObj.contains("error")) {
  13. int err = ErrorCodes::ERR_JSON;
  14. qDebug() << "parse create private chat json parse failed " << err;
  15. return;
  16. }
  17. int err = jsonObj["error"].toInt();
  18. if (err != ErrorCodes::SUCCESS) {
  19. qDebug() << "get create private chat failed, error is " << err;
  20. return;
  21. }
  22. qDebug() << "Receive create private chat rsp Success";
  23. //收到消息后转发给页面
  24. auto thread_id = jsonObj["thread_id"].toInt();
  25. auto unique_id = jsonObj["unique_id"].toString();
  26. auto unique_name = jsonObj["unique_name"].toString();
  27. auto sender = jsonObj["fromuid"].toInt();
  28. auto msg_id = jsonObj["message_id"].toInt();
  29. QString chat_time = jsonObj["chat_time"].toString();
  30. int status = jsonObj["status"].toInt();
  31. auto text_or_url = jsonObj["text_or_url"].toString();
  32. auto file_info = UserMgr::GetInstance()->GetTransFileByName(unique_name);
  33. auto chat_data = std::make_shared<ImgChatData>(file_info, unique_id, thread_id, ChatFormType::PRIVATE,
  34. ChatMsgType::TEXT, sender, status, chat_time);
  35. //发送信号通知界面
  36. emit sig_chat_img_rsp(thread_id, chat_data);
  37. //管理消息,添加序列号到正在发送集合
  38. file_info->_flighting_seqs.insert(file_info->_seq);
  39. //发送消息
  40. QFile file(file_info->_text_or_url);
  41. if (!file.open(QIODevice::ReadOnly)) {
  42. qWarning() << "Could not open file:" << file.errorString();
  43. return;
  44. }
  45. file.seek(file_info->_current_size);
  46. auto buffer = file.read(MAX_FILE_LEN);
  47. qDebug() << "buffer is " << buffer;
  48. //将文件内容转换为base64编码
  49. QString base64Data = buffer.toBase64();
  50. QJsonObject file_obj;
  51. file_obj["name"] = file_info->_unique_name;
  52. file_obj["unique_id"] = unique_id;
  53. file_obj["seq"] = file_info->_seq;
  54. file_info->_current_size = buffer.size() + (file_info->_seq - 1) * MAX_FILE_LEN;
  55. file_obj["trans_size"] = file_info->_current_size;
  56. file_obj["total_size"] = file_info->_total_size;
  57. file_obj["token"] = UserMgr::GetInstance()->GetToken();
  58. file_obj["md5"] = file_info->_md5;
  59. file_obj["uid"] = UserMgr::GetInstance()->GetUid();
  60. file_obj["data"] = base64Data;
  61. if (buffer.size() + (file_info->_seq - 1) * MAX_FILE_LEN >= file_info->_total_size) {
  62. file_obj["last"] = 1;
  63. }
  64. else {
  65. file_obj["last"] = 0;
  66. }
  67. //发送文件 todo 留作以后收到服务器返回消息后再发送
  68. QJsonDocument doc_file(file_obj);
  69. QByteArray fileData = doc_file.toJson(QJsonDocument::Compact);
  70. //发送消息给ResourceServer
  71. FileTcpMgr::GetInstance()->SendData(ReqId::ID_FILE_INFO_SYNC_REQ, fileData);
  72. });

处理同步信息回包

  1. _handlers.insert(ID_FILE_INFO_SYNC_RSP, [this](ReqId id, int len, QByteArray data) {
  2. Q_UNUSED(len);
  3. qDebug() << "handle id is " << id;
  4. // 将QByteArray转换为QJsonDocument
  5. QJsonDocument jsonDoc = QJsonDocument::fromJson(data);
  6. // 检查转换是否成功
  7. if (jsonDoc.isNull()) {
  8. qDebug() << "Failed to create QJsonDocument.";
  9. return;
  10. }
  11. QJsonObject recvObj = jsonDoc.object();
  12. qDebug() << "data jsonobj is " << recvObj;
  13. if (!recvObj.contains("error")) {
  14. int err = ErrorCodes::ERR_JSON;
  15. qDebug() << "icon upload_failed, err is Json Parse Err" << err;
  16. //todo ... 提示上传失败,将来可能断点重传等
  17. //emit upload_failed();
  18. return;
  19. }
  20. int err = recvObj["error"].toInt();
  21. if (err != ErrorCodes::SUCCESS) {
  22. qDebug() << "Login Failed, err is " << err;
  23. //emit upload_failed();
  24. return;
  25. }
  26. //为了简单起见,先处理网络正常情况
  27. auto seq = recvObj["seq"].toInt();
  28. auto name = recvObj["name"].toString();
  29. auto file_info = UserMgr::GetInstance()->GetTransFileByName(name);
  30. if (!file_info) {
  31. return;
  32. }
  33. //根据seq从未接收集合移动到已接收集合中
  34. file_info->_flighting_seqs.erase(seq);
  35. //将seq放入已收到集合中
  36. file_info->_rsp_seqs.insert(seq);
  37. //计算当前最后确认的序列号
  38. while (file_info->_rsp_seqs.count(file_info->_last_confirmed_seq + 1)) {
  39. ++file_info->_last_confirmed_seq;
  40. }
  41. qDebug() << "recv : " << name << "file seq is " << seq;
  42. //判断最大序列和最后确认序列号相等,说明收全了
  43. if (file_info->_last_confirmed_seq == file_info->_max_seq) {
  44. UserMgr::GetInstance()->RmvTransFileByName(name);
  45. //todo 此处添加发送其他待发送的文件
  46. auto free_file = UserMgr::GetInstance()->GetFreeTransFile();
  47. return;
  48. }
  49. BatchSend(file_info);
  50. });

之后的处理逻辑就和聊天图片上传一样,只是这个是一次上传多个。

有个更好的改进点就是不用等到服务器写完,服务器就回复给客户端,但是逻辑控制更复杂,如果后续写失败,还要回滚之类的,更麻烦。这里还是保留原逻辑,服务器写完就回复,只不过客户端不是等待回复后一个一个发送了,开始的时候是一起发送,用拥塞窗口控制。后续还是会收到限制,因为受限于服务器写,这次就先这样了,以后在考虑做优化。

响应资源回复

  1. _handlers.insert(ID_IMG_CHAT_UPLOAD_RSP, [this](ReqId id, int len, QByteArray data) {
  2. Q_UNUSED(len);
  3. qDebug() << "handle id is " << id;
  4. // 将QByteArray转换为QJsonDocument
  5. QJsonDocument jsonDoc = QJsonDocument::fromJson(data);
  6. _cwnd_size--;
  7. // 检查转换是否成功
  8. if (jsonDoc.isNull()) {
  9. qDebug() << "Failed to create QJsonDocument.";
  10. return;
  11. }
  12. QJsonObject recvObj = jsonDoc.object();
  13. qDebug() << "data jsonobj is " << recvObj;
  14. if (!recvObj.contains("error")) {
  15. int err = ErrorCodes::ERR_JSON;
  16. qDebug() << "icon upload_failed, err is Json Parse Err" << err;
  17. //todo ... 提示上传失败
  18. //emit upload_failed();
  19. return;
  20. }
  21. int err = recvObj["error"].toInt();
  22. if (err != ErrorCodes::SUCCESS) {
  23. qDebug() << "Login Failed, err is " << err;
  24. //emit upload_failed();
  25. return;
  26. }
  27. auto name = recvObj["name"].toString();
  28. auto file_info = UserMgr::GetInstance()->GetTransFileByName(name);
  29. if (!file_info) {
  30. return;
  31. }
  32. auto md5 = file_info->_md5;
  33. auto seq = recvObj["seq"].toInt();
  34. //根据seq从未接收集合移动到已接收集合中
  35. file_info->_flighting_seqs.erase(seq);
  36. //将seq放入已收到集合中
  37. file_info->_rsp_seqs.insert(seq);
  38. //计算当前最后确认的序列号
  39. while (file_info->_rsp_seqs.count(file_info->_last_confirmed_seq + 1)) {
  40. ++file_info->_last_confirmed_seq;
  41. }
  42. qDebug() << "recv : " << name << "file seq is " << seq;
  43. //判断最大序列和最后确认序列号相等,说明收全了
  44. if (file_info->_last_confirmed_seq == file_info->_max_seq) {
  45. UserMgr::GetInstance()->RmvTransFileByName(name);
  46. //todo 此处添加发送其他待发送的文件
  47. auto free_file = UserMgr::GetInstance()->GetFreeTransFile();
  48. BatchSend(free_file);
  49. return;
  50. }
  51. BatchSend(file_info); });

服务器响应同步信息

  1. _fun_callbacks[ID_FILE_INFO_SYNC_REQ] = [this](shared_ptr<CSession> session, const short& msg_id,
  2. const string& msg_data) {
  3. Json::Reader reader;
  4. Json::Value root;
  5. reader.parse(msg_data, root);
  6. auto md5 = root["md5"].asString();
  7. auto seq = root["seq"].asInt();
  8. auto name = root["name"].asString();
  9. auto total_size = root["total_size"].asInt();
  10. auto trans_size = root["trans_size"].asInt();
  11. auto last = root["last"].asInt();
  12. auto file_data = root["data"].asString();
  13. auto file_path = ConfigMgr::Inst().GetFileOutPath();
  14. auto uid = root["uid"].asInt();
  15. //转化为字符串
  16. auto uid_str = std::to_string(uid);
  17. auto file_path_str = (file_path / uid_str / name).string();
  18. Json::Value rtvalue;
  19. auto callback = [=](const Json::Value& result) {
  20. // 在异步任务完成后调用
  21. Json::Value rtvalue = result;
  22. rtvalue["error"] = ErrorCodes::Success;
  23. rtvalue["total_size"] = total_size;
  24. rtvalue["seq"] = seq;
  25. rtvalue["name"] = name;
  26. rtvalue["trans_size"] = trans_size;
  27. rtvalue["last"] = last;
  28. rtvalue["md5"] = md5;
  29. rtvalue["uid"] = uid;
  30. std::string return_str = rtvalue.toStyledString();
  31. session->Send(return_str, ID_FILE_INFO_SYNC_RSP);
  32. };
  33. // 使用 std::hash 对字符串进行哈希
  34. std::hash<std::string> hash_fn;
  35. size_t hash_value = hash_fn(name); // 生成哈希值
  36. int index = hash_value % FILE_WORKER_COUNT;
  37. std::cout << "Hash value: " << hash_value << std::endl;
  38. //第一个包
  39. if (seq == 1) {
  40. //构造数据存储
  41. auto file_info = std::make_shared<FileInfo>();
  42. file_info->_file_path_str = file_path_str;
  43. file_info->_name = name;
  44. file_info->_seq = seq;
  45. file_info->_total_size = total_size;
  46. file_info->_trans_size = trans_size;
  47. bool success = RedisMgr::GetInstance()->SetFileInfo(name, file_info);
  48. if (!success) {
  49. rtvalue["error"] = ErrorCodes::FileSaveRedisFailed;
  50. std::string return_str = rtvalue.toStyledString();
  51. session->Send(return_str, ID_FILE_INFO_SYNC_RSP);
  52. return;
  53. }
  54. }
  55. else {
  56. auto file_info = RedisMgr::GetInstance()->GetFileInfo(name);
  57. if (file_info == nullptr) {
  58. rtvalue["error"] = ErrorCodes::FileNotExists;
  59. std::string return_str = rtvalue.toStyledString();
  60. session->Send(return_str, ID_FILE_INFO_SYNC_RSP);
  61. return;
  62. }
  63. file_info->_seq = seq;
  64. file_info->_trans_size = trans_size;
  65. bool success = RedisMgr::GetInstance()->SetFileInfo(name, file_info);
  66. if (!success) {
  67. rtvalue["error"] = ErrorCodes::FileSaveRedisFailed;
  68. std::string return_str = rtvalue.toStyledString();
  69. session->Send(return_str, ID_FILE_INFO_SYNC_RSP);
  70. return;
  71. }
  72. }
  73. FileSystem::GetInstance()->PostMsgToQue(
  74. std::make_shared<FileTask>(session, ID_FILE_INFO_SYNC_REQ, uid, file_path_str, name, seq, total_size,
  75. trans_size, last, file_data, callback),
  76. index
  77. );
  78. };

其余逻辑不变。

测试效果

image-20251213174219570

热门评论

热门文章

  1. vscode搭建windows C++开发环境

    喜欢(596) 浏览(100577)
  2. 使用hexo搭建个人博客

    喜欢(533) 浏览(14372)
  3. Linux环境搭建和编码

    喜欢(594) 浏览(15945)
  4. MarkDown在线编辑器

    喜欢(514) 浏览(16295)
  5. 聊天项目(28) 分布式服务通知好友申请

    喜欢(507) 浏览(7367)

最新评论

  1. 解决博客回复区被脚本注入的问题 secondtonone1:走到现在我忽然明白一个道理,无论工作也好生活也罢,最重要的是开心,即使一份安稳的工作不能给我带来事业上的积累也要合理的舍弃,所以我还是想去做喜欢的方向。
  2. 处理网络粘包问题 zyouth: //消息的长度小于头部规定的长度,说明数据未收全,则先将部分消息放到接收节点里 if (bytes_transferred < data_len) { memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, bytes_transferred); _recv_msg_node->_cur_len += bytes_transferred; ::memset(_data, 0, MAX_LENGTH); _socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH), std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_self)); //头部处理完成 _b_head_parse = true; return; } 把_b_head_parse = true;放在_socket.async_read_some前面是不是更好
  3. C++ 线程池原理和实现 mzx2023:两种方法解决,一种是改排序算法,就是当线程耗尽的时候,使用普通递归,另一种是当在线程池commit的时候,判断线程是否耗尽,耗尽的话就直接当前线程执行task
  4. 利用指针和容器实现文本查询 越今朝:应该添加一个过滤功能以解决部分单词无法被查询的问题: eg: "I am a teacher."中的teacher无法被查询,因为在示例代码中teacher.被解释为一个单词从而忽略了teacher本身。
  5. 无锁并发队列 TenThousandOne:_head  和 _tail  替换为原子变量。那里pop的逻辑,val = _data[h] 可以移到循环外面吗

个人公众号

个人微信