实现断点续传

独立网络线程

实现断点续传,需要配合队列缓存客户端要上传的数据,网络线程从队列中一条一条的取出任务上传。

独立前

image-20250803111220495

独立后

image-20250803111620510

槽函数连接方式

image-20250803114247485

  1. 直接连接

    connect(发送者, 信号, [](信号参数){});

​ 这种槽函数在发送者所在线程触发。

  1. 增加接收者

    connect(发送者,信号,接收者,槽函数)

    如果发送者和接收者在同一个线程,则槽函数调用的线程就是发送者所在的线程。

  2. 发送者和接收者不在一个线程,connect默认采用队列连接方式

connect(发送者,信号,接收者,槽函数)

​ 槽函数在接收者所在的线程触发。好处就是解耦合。

元对象系统

  1. 信号和槽
  2. 反射
  3. 动态增加函数和属性

当我们信号和槽连接方式采用队列连接,那么信号的参数会被封装为元对象,投递到队列中。

要想支持元对象有两种方式

  1. 继承于QObject,并且类内填写Q_OBJECT宏
  2. 声明并且注册元对象类

为了支持高并发情况下断点续传,考虑将目前项目中TcpMgr中网络模块独立到独立线程

封装TcpThread

利用RAII思想封装线程启动和回收

  1. class TcpThread:public std::enable_shared_from_this<TcpThread> {
  2. public:
  3. TcpThread();
  4. ~TcpThread();
  5. private:
  6. QThread* _tcp_thread;
  7. };

具体实现

  1. TcpThread::TcpThread()
  2. {
  3. _tcp_thread = new QThread();
  4. TcpMgr::GetInstance()->moveToThread(_tcp_thread);
  5. QObject::connect(_tcp_thread, &QThread::finished, _tcp_thread, &QObject::deleteLater);
  6. _tcp_thread->start();
  7. }
  8. TcpThread::~TcpThread()
  9. {
  10. _tcp_thread->quit();
  11. }

主函数启动时记得提前启动线程,将TcpMgr转移到独立线程中

  1. //启动tcp线程
  2. TcpThread tcpthread;
  3. MainWindow w;
  4. w.show();
  5. return a.exec();

测试发现,登录卡住,检测是信号sig_connect_tcp发送了,槽函数slot_tcp_connect没触发。

  1. //连接tcp连接请求的信号和槽函数
  2. connect(this, &LoginDialog::sig_connect_tcp, TcpMgr::GetInstance().get(), &TcpMgr::slot_tcp_connect);

为了测试

先在TcpMgr中添加测试槽函数

  1. void slot_test() {
  2. qDebug() << "receve thread is " << QThread::currentThread();
  3. qDebug() << "slot test......";
  4. }

LoginDialog中连接信号

  1. connect(this, &LoginDialog::sig_test, TcpMgr::GetInstance().get(), &TcpMgr::slot_test);

在发送sig_connect_tcp处发送sig_test

  1. void LoginDialog::initHttpHandlers()
  2. {
  3. //注册获取登录回包逻辑
  4. _handlers.insert(ReqId::ID_LOGIN_USER, [this](QJsonObject jsonObj){
  5. int error = jsonObj["error"].toInt();
  6. if(error != ErrorCodes::SUCCESS){
  7. showTip(tr("参数错误"),false);
  8. enableBtn(true);
  9. return;
  10. }
  11. auto email = jsonObj["email"].toString();
  12. //发送信号通知tcpMgr发送长链接
  13. ServerInfo si;
  14. si.Uid = jsonObj["uid"].toInt();
  15. si.Host = jsonObj["host"].toString();
  16. si.Port = jsonObj["port"].toString();
  17. si.Token = jsonObj["token"].toString();
  18. _uid = si.Uid;
  19. _token = si.Token;
  20. qDebug()<< "email is " << email << " uid is " << si.Uid <<" host is "
  21. << si.Host << " Port is " << si.Port << " Token is " << si.Token;
  22. emit sig_connect_tcp(si);
  23. emit sig_test();
  24. });
  25. }

测试,是可以看到能触发slot_test函数得,而且线程id显示是子线程中触发得槽函数。

那么sig_connect_tcp信号发出,没有触发槽函数,就是因为信号得参数类型不支持元对象系统。

为了支持元对象系统,我们需要在信号的参数ServerInfo类实现默认构造,同时声明为元对象类型

  1. struct ServerInfo{
  2. public:
  3. ServerInfo() = default;
  4. ServerInfo(const ServerInfo& other):Host(other.Host),Port(other.Port),Token(other.Token),Uid(other.Uid){}
  5. QString Host;
  6. QString Port;
  7. QString Token;
  8. int Uid;
  9. };
  10. Q_DECLARE_METATYPE(ServerInfo)

TcpMgr中注册这个元对象类型

  1. qRegisterMetaType<ServerInfo>("ServerInfo");

再次测试就通过登录了,但是在发送后续得消息时,又遇到了自定义类型作为参数得情况,我们需要和上面一样,依次声明元对象类型并且注册。

如下列举一个,还有很多,不再详细列举

  1. class SearchInfo {
  2. public:
  3. SearchInfo(int uid, QString name, QString nick, QString desc, int sex, QString icon);
  4. SearchInfo() = default;
  5. int _uid;
  6. QString _name;
  7. QString _nick;
  8. QString _desc;
  9. int _sex;
  10. QString _icon;
  11. };
  12. Q_DECLARE_METATYPE(SearchInfo)
  13. Q_DECLARE_METATYPE(std::shared_ptr<SearchInfo>)

TcpMgr封装注册元对象函数

  1. void TcpMgr::registerMetaType() {
  2. // 注册所有自定义类型
  3. qRegisterMetaType<ServerInfo>("ServerInfo");
  4. qRegisterMetaType<SearchInfo>("SearchInfo");
  5. qRegisterMetaType<std::shared_ptr<SearchInfo>>("std::shared_ptr<SearchInfo>");
  6. qRegisterMetaType<AddFriendApply>("AddFriendApply");
  7. qRegisterMetaType<std::shared_ptr<AddFriendApply>>("std::shared_ptr<AddFriendApply>");
  8. qRegisterMetaType<ApplyInfo>("ApplyInfo");
  9. qRegisterMetaType<std::shared_ptr<AuthInfo>>("std::shared_ptr<AuthInfo>");
  10. qRegisterMetaType<AuthRsp>("AuthRsp");
  11. qRegisterMetaType<std::shared_ptr<AuthRsp>>("std::shared_ptr<AuthRsp>");
  12. qRegisterMetaType<UserInfo>("UserInfo");
  13. qRegisterMetaType<std::vector<std::shared_ptr<TextChatData>>>("std::vector<std::shared_ptr<TextChatData>>");
  14. qRegisterMetaType<std::vector<std::shared_ptr<ChatThreadInfo>>>("std::vector<std::shared_ptr<ChatThreadInfo>>");
  15. qRegisterMetaType<std::shared_ptr<ChatThreadData>>("std::shared_ptr<ChatThreadData>");
  16. qRegisterMetaType<ReqId>("ReqId");
  17. }

在构造函数中调用

  1. TcpMgr::TcpMgr():_host(""),_port(0),_b_recv_pending(false),_message_id(0),_message_len(0)
  2. {
  3. registerMetaType();
  4. //...
  5. }

再次测试就通过了

这里给大家讲讲为什么单线程情况下,信号可以携带自定义类型作为参数,不用设定元对象就可以传输,而跨线程不可以。

在 Qt 的信号/槽机制中,信号参数的传递方式取决于连接(connect)的类型,而连接类型又由发信号对象和接收槽对象所在的线程决定:

  1. 同线程(Direct Connection)

    • 如果信号和槽都在同一个线程里,默认使用 Direct Connection
    • Direct Connection 本质上就是一个普通的 C++ 函数调用,参数直接按值或按引用传递,编译时就已经知道了类型,不需要任何额外的元类型信息。
    • 因此,即使你没有把 SearchInfo 注册为 QMetaType,编译器也能直接生成函数调用代码,信号里就可以直接传递 SearchInfo
  2. 跨线程(Queued Connection)

    • 如果信号发送者和接收者不在同一个线程,Qt 会自动把连接转成 Queued Connection

    • Queued Connection 的实现是:当信号发出时,Qt 会把信号参数打包成一个事件(QEvent),然后把事件放到目标线程的事件队列里;目标线程的事件循环(QCoreApplication::processEvents())再把这个事件取出来,调用槽函数。

    • 这里的“打包”与“解包”就需要运行时才能确定参数类型,以及如何拷贝或序列化这个类型——这正是 Qt 元对象系统(QMetaType)要干的事情。

    • 如果没有把 SearchInfo 声明成一个元类型,Qt 就不知道如何在内部把它从一个线程“打包”到事件里,又如何在另一线程里还原。

    • 因此,跨线程传递自定义类型,必须在类型定义后加上:

      1. Q_DECLARE_METATYPE(SearchInfo)

      并在运行时注册(通常在 main() 里调用一次):

      1. qRegisterMetaType<SearchInfo>("SearchInfo");

小结

  • 同线程:Direct Connection,编译时直接调用,不需要 Q_DECLARE_METATYPE
  • 跨线程:Queued Connection,需要运行时打包/解包参数,必须Q_DECLARE_METATYPE(以及 qRegisterMetaType)来注册你的自定义类型。

添加发送队列

UserMgr线程安全

为了保证多线程情况下访问数据的安全性,对UserMgr类的操作加锁

  1. std::mutex _mtx;

在获取数据和设置数据的地方都进行加锁, 比如

  1. std::shared_ptr<UserInfo> UserMgr::GetUserInfo()
  2. {
  3. std::lock_guard<std::mutex> lock(_mtx);
  4. return _user_info;
  5. }

还有很多不再赘述

设置发送队列

默认情况下qtsocket都是非阻塞的。

所以调用socket.write(数据)可能会直接返回-1

返回-1表示网络出错,一般都是EWOULD_BLOCK/EAGAIN造成的。表示发送缓冲区已经满了,无法继续发送。

而我们之前的逻辑,无论在哪个线程,想要发送数据,统一发送信号

  1. void sig_send_data(ReqId reqId, QByteArray data);

会触发TcpMgr的槽函数

  1. void TcpMgr::slot_send_data(ReqId reqId, QByteArray dataBytes)
  2. {
  3. uint16_t id = reqId;
  4. // 计算长度(使用网络字节序转换)
  5. quint16 len = static_cast<quint16>(dataBytes.length());
  6. // 创建一个QByteArray用于存储要发送的所有数据
  7. QByteArray block;
  8. QDataStream out(&block, QIODevice::WriteOnly);
  9. // 设置数据流使用网络字节序
  10. out.setByteOrder(QDataStream::BigEndian);
  11. // 写入ID和长度
  12. out << id << len;
  13. // 添加字符串数据
  14. block.append(dataBytes);
  15. qint64 written = _socket.write(block);
  16. qDebug() << "tcp mgr send byte data is" << _current_block
  17. << ", write() returned" << written;
  18. }

上述函数在网络情况良好的时候不会产生问题,但是如果网络发送情况频繁的时候,就容易出现written为-1的情况。

也就是发送缓冲区满了,导致发送失败。

对于这种情况,我们可以模仿我们的服务器写法,添加一个发送队列,然后将要发送的数据投递到发送队列

  1. //发送队列
  2. QQueue<QByteArray> _send_queue;
  3. //正在发送的包
  4. QByteArray _current_block;
  5. //当前已发送的字节数
  6. qint64 _bytes_sent;
  7. //是否正在发送
  8. bool _pending;

修改发送逻辑

  1. void TcpMgr::slot_send_data(ReqId reqId, QByteArray dataBytes)
  2. {
  3. uint16_t id = reqId;
  4. // 计算长度(使用网络字节序转换)
  5. quint16 len = static_cast<quint16>(dataBytes.length());
  6. // 创建一个QByteArray用于存储要发送的所有数据
  7. QByteArray block;
  8. QDataStream out(&block, QIODevice::WriteOnly);
  9. // 设置数据流使用网络字节序
  10. out.setByteOrder(QDataStream::BigEndian);
  11. // 写入ID和长度
  12. out << id << len;
  13. // 添加字符串数据
  14. block.append(dataBytes);
  15. //判断是否正在发送
  16. if (_pending) {
  17. //放入队列直接返回,因为目前有数据正在发送
  18. _send_queue.enqueue(block);
  19. return;
  20. }
  21. // 没有正在发送,把这包设为“当前块”,重置计数,并写出去
  22. _current_block = block; // ← 保存当前正在发送的 block
  23. _bytes_sent = 0; // ← 归零
  24. _pending = true; // ← 标记正在发送
  25. qint64 written = _socket.write(_current_block);
  26. qDebug() << "tcp mgr send byte data is" << _current_block
  27. << ", write() returned" << written;
  28. }

我们需要监听发送返回的数据,QT也提供了类似于asio的异步回调功能,只是在发送完成后返回一个信号void bytesWritten(qint64 bytes);

我们连接这个信号

  1. QObject::connect(&_socket, &QTcpSocket::bytesWritten, this, [this](qint64 bytes) {
  2. //更新发送数据
  3. _bytes_sent += bytes;
  4. //未发送完整
  5. if (_bytes_sent < _current_block.size()) {
  6. //继续发送
  7. auto data_to_send = _current_block.mid(_bytes_sent);
  8. _socket.write(data_to_send);
  9. return;
  10. }
  11. //发送完全,则查看队列是否为空
  12. if (_send_queue.isEmpty()) {
  13. //队列为空,说明已经将所有数据发送完成,将pending设置为false,这样后续要发送数据时可以继续发送
  14. _current_block.clear();
  15. _pending = false;
  16. _bytes_sent = 0;
  17. return;
  18. }
  19. //队列不为空,则取出队首元素
  20. _current_block = _send_queue.dequeue();
  21. _bytes_sent = 0;
  22. _pending = true;
  23. qint64 w2 = _socket.write(_current_block);
  24. qDebug() << "[TcpMgr] Dequeued and write() returned" << w2;
  25. });

_pending控制发送还是放入队列。

断点续传思路

思路图

image-20250810113255940

修改上传逻辑

原来的传输逻辑,采用的是循环上传,就是将一个文件拆分成多个报文段,循环上传,而不等待服务器每次回复

  1. void MainWindow::on_uploadBtn_clicked()
  2. {
  3. ui->uploadBtn->setEnabled(false);
  4. // 打开文件
  5. QFile file(_file_name);
  6. if (!file.open(QIODevice::ReadOnly)) {
  7. qWarning() << "Could not open file:" << file.errorString();
  8. return;
  9. }
  10. // 保存当前文件指针位置
  11. qint64 originalPos = file.pos();
  12. QCryptographicHash hash(QCryptographicHash::Md5);
  13. if (!hash.addData(&file)) {
  14. qWarning() << "Failed to read data from file:" << _file_name;
  15. return ;
  16. }
  17. _file_md5 = hash.result().toHex(); // 返回十六进制字符串
  18. // 读取文件内容并发送
  19. QByteArray buffer;
  20. int seq = 0;
  21. QFileInfo fileInfo(_file_name); // 创建 QFileInfo 对象
  22. QString fileName = fileInfo.fileName(); // 获取文件名
  23. qDebug() << "文件名是:" << fileName; // 输出文件名
  24. int total_size = fileInfo.size();
  25. int last_seq = 0;
  26. if(total_size % MAX_FILE_LEN){
  27. last_seq = (total_size/MAX_FILE_LEN)+1;
  28. }else{
  29. last_seq = total_size/MAX_FILE_LEN;
  30. }
  31. // 恢复文件指针到原来的位置
  32. file.seek(originalPos);
  33. while (!file.atEnd()) {
  34. //每次读取2048字节发送
  35. buffer = file.read(MAX_FILE_LEN);
  36. QJsonObject jsonObj;
  37. // 将文件内容转换为 Base64 编码(可选)
  38. QString base64Data = buffer.toBase64();
  39. //qDebug() << "send data is " << base64Data;
  40. ++seq;
  41. jsonObj["md5"] = _file_md5;
  42. jsonObj["name"] = fileName;
  43. jsonObj["seq"] = seq;
  44. jsonObj["trans_size"] = buffer.size() + (seq-1)*MAX_FILE_LEN;
  45. jsonObj["total_size"] = total_size;
  46. if(buffer.size() + (seq-1)*MAX_FILE_LEN == total_size){
  47. jsonObj["last"] = 1;
  48. }else{
  49. jsonObj["last"] = 0;
  50. }
  51. jsonObj["data"]= base64Data;
  52. jsonObj["last_seq"] = last_seq;
  53. QJsonDocument doc(jsonObj);
  54. auto send_data = doc.toJson();
  55. TcpClient::Inst().sendMsg(ID_UPLOAD_FILE_REQ, send_data);
  56. //startDelay(500);
  57. }
  58. //关闭文件
  59. file.close();
  60. }

现在需要改为分段上传,每次上传后,等待服务器返回响应后再上传下一个

  1. void MainWindow::on_uploadBtn_clicked()
  2. {
  3. ui->uploadBtn->setEnabled(false);
  4. ui->pauseBtn->setEnabled(true);
  5. // 打开文件
  6. QFile file(_file_name);
  7. if (!file.open(QIODevice::ReadOnly)) {
  8. qWarning() << "Could not open file:" << file.errorString();
  9. return;
  10. }
  11. // 保存当前文件指针位置
  12. qint64 originalPos = file.pos();
  13. QCryptographicHash hash(QCryptographicHash::Md5);
  14. if (!hash.addData(&file)) {
  15. qWarning() << "Failed to read data from file:" << _file_name;
  16. return ;
  17. }
  18. _file_md5 = hash.result().toHex(); // 返回十六进制字符串
  19. // 读取文件内容并发送
  20. QByteArray buffer;
  21. int seq = 0;
  22. // 创建 QFileInfo 对象
  23. auto fileInfo = std::make_shared<QFileInfo>(_file_name);
  24. QString fileName = fileInfo->fileName(); // 获取文件名
  25. qDebug() << "文件名是:" << fileName; // 输出文件名
  26. int total_size = fileInfo->size();
  27. int last_seq = 0;
  28. if(total_size % MAX_FILE_LEN){
  29. last_seq = (total_size/MAX_FILE_LEN)+1;
  30. }else{
  31. last_seq = total_size/MAX_FILE_LEN;
  32. }
  33. // 恢复文件指针到原来的位置
  34. file.seek(originalPos);
  35. //改为读取第一块并发送
  36. //每次读取2048字节发送
  37. buffer = file.read(MAX_FILE_LEN);
  38. QJsonObject jsonObj;
  39. // 将文件内容转换为 Base64 编码(可选)
  40. QString base64Data = buffer.toBase64();
  41. //qDebug() << "send data is " << base64Data;
  42. ++seq;
  43. jsonObj["md5"] = _file_md5;
  44. jsonObj["name"] = fileName;
  45. jsonObj["seq"] = seq;
  46. jsonObj["trans_size"] = buffer.size() + (seq-1)*MAX_FILE_LEN;
  47. jsonObj["total_size"] = total_size;
  48. if(buffer.size() + (seq-1)*MAX_FILE_LEN == total_size){
  49. jsonObj["last"] = 1;
  50. }else{
  51. jsonObj["last"] = 0;
  52. }
  53. jsonObj["data"]= base64Data;
  54. jsonObj["last_seq"] = last_seq;
  55. QJsonDocument doc(jsonObj);
  56. auto send_data = doc.toJson();
  57. TcpClient::Inst().sendMsg(ID_UPLOAD_FILE_REQ, send_data);
  58. LogicMgr::Inst()->AddMD5File(_file_md5, fileInfo);
  59. //关闭文件
  60. file.close();
  61. }

收到响应后续传

当客户端收到服务器的回包后,解析后传递给LogicMgr, LogicMgr中需要将后续的报文段发送给服务器。我们封装如下逻辑

  1. void LogicWorker::InitHandlers()
  2. {
  3. //注册上传消息
  4. _handlers[ID_UPLOAD_FILE_RSP] = [this](QJsonObject obj){
  5. auto err = obj["error"].toInt();
  6. if(err != RSP_SUCCESS){
  7. qDebug() << "upload msg rsp err is " << err;
  8. return;
  9. }
  10. auto name = obj["name"].toString();
  11. auto total_size = obj["total_size"].toInt();
  12. auto trans_size = obj["trans_size"].toInt();
  13. auto md5 = obj["md5"].toString();
  14. auto seq = obj["seq"].toInt();
  15. qDebug() << "recv : " << name << " file trans_size is " << trans_size;
  16. emit sig_trans_size(trans_size);
  17. //判断trans_size是否和total_size相等
  18. if(total_size == trans_size){
  19. return;
  20. }
  21. auto file_info = LogicMgr::Inst()->GetFileInfo(md5);
  22. if(!file_info){
  23. return;
  24. }
  25. //再次组织数据发送
  26. QFile file(file_info->filePath());
  27. if (!file.open(QIODevice::ReadOnly)) {
  28. qWarning() << "Could not open file:" << file.errorString();
  29. return;
  30. }
  31. //文件偏移到已经发送的位置,继续读取发送
  32. file.seek(trans_size);
  33. if(LogicMgr::Inst()->Pause()){
  34. return ;
  35. }
  36. QByteArray buffer;
  37. seq++;
  38. //每次读取2048字节发送
  39. buffer = file.read(MAX_FILE_LEN);
  40. QJsonObject jsonObj;
  41. // 将文件内容转换为 Base64 编码(可选)
  42. QString base64Data = buffer.toBase64();
  43. jsonObj["md5"] = md5;
  44. jsonObj["name"] = file_info->fileName();
  45. jsonObj["seq"] = seq;
  46. jsonObj["trans_size"] = buffer.size() + (seq-1)*MAX_FILE_LEN;
  47. jsonObj["total_size"] = total_size;
  48. if(buffer.size() + (seq-1)*MAX_FILE_LEN == total_size){
  49. jsonObj["last"] = 1;
  50. }else{
  51. jsonObj["last"] = 0;
  52. }
  53. jsonObj["data"]= base64Data;
  54. jsonObj["last_seq"] = obj["last_seq"].toInt();
  55. QJsonDocument doc(jsonObj);
  56. auto send_data = doc.toJson();
  57. TcpClient::Inst().sendMsg(ID_UPLOAD_FILE_REQ, send_data);
  58. file.close();
  59. };
  60. }

其中sig_trans_size为信号,通知主界面显示进度

暂停和续传

客户端需增加暂停和续传按钮,支持传说过程中暂停,点击后再继续上传等功能

  1. void MainWindow::slot_pause_continue()
  2. {
  3. //续传状态或者初始状态,按下暂停按钮
  4. if(_cur_state == INIT || _cur_state == CONTINUE){
  5. //设置当前状态为暂停状态
  6. _b_pause = true;
  7. ui->pauseBtn->setText("继续");
  8. _cur_state = PAUSE;
  9. LogicMgr::Inst()->SetPause(true);
  10. return;
  11. }
  12. //判断当前为暂停状态,则点击后开启续传
  13. if(_cur_state == PAUSE){
  14. _b_pause = false;
  15. ui->pauseBtn->setText("暂停");
  16. _cur_state = CONTINUE ;
  17. LogicMgr::Inst()->SetPause(false);
  18. //发送请求获取文件信息,继续上传
  19. auto file_info = LogicMgr::Inst()->GetFileInfo(_file_md5);
  20. QJsonObject jsonObj;
  21. jsonObj["md5"] = _file_md5;
  22. QJsonDocument doc(jsonObj);
  23. auto send_data = doc.toJson();
  24. TcpClient::Inst().sendMsg(ID_SYNC_FILE_REQ, send_data);
  25. return;
  26. }
  27. }

这里继续上传需要请求一下服务器,同步之前的上传进度。

我们添加了新的协议ID_SYNC_FILE_REQ, 服务器收到后将状态和进度返回,客户端响应

  1. _handlers[ID_SYNC_FILE_RSP] = [this](QJsonObject obj){
  2. auto err = obj["error"].toInt();
  3. if(err != RSP_SUCCESS){
  4. qDebug() << " msg rsp err is " << err;
  5. return;
  6. }
  7. auto md5 = obj["md5"].toString();
  8. auto seq = obj["seq"].toInt();
  9. auto total_size = obj["total_size"].toInt();
  10. auto file_info = LogicMgr::Inst()->GetFileInfo(md5);
  11. if(!file_info){
  12. qDebug() << "not found file" ;
  13. return;
  14. }
  15. //再次组织数据发送
  16. QFile file(file_info->filePath());
  17. if (!file.open(QIODevice::ReadOnly)) {
  18. qWarning() << "Could not open file:" << file.errorString();
  19. return;
  20. }
  21. auto trans_size = obj["trans_size"].toInt();
  22. //文件偏移到已经发送的位置,继续读取发送
  23. file.seek(trans_size);
  24. if(LogicMgr::Inst()->Pause()){
  25. return ;
  26. }
  27. QByteArray buffer;
  28. seq++;
  29. //每次读取2048字节发送
  30. buffer = file.read(MAX_FILE_LEN);
  31. QJsonObject jsonObj;
  32. // 将文件内容转换为 Base64 编码(可选)
  33. QString base64Data = buffer.toBase64();
  34. jsonObj["md5"] = md5;
  35. jsonObj["name"] = file_info->fileName();
  36. jsonObj["seq"] = seq;
  37. jsonObj["trans_size"] = buffer.size() + (seq-1)*MAX_FILE_LEN;
  38. jsonObj["total_size"] = total_size;
  39. if(buffer.size() + (seq-1)*MAX_FILE_LEN == total_size){
  40. jsonObj["last"] = 1;
  41. }else{
  42. jsonObj["last"] = 0;
  43. }
  44. jsonObj["data"]= base64Data;
  45. jsonObj["last_seq"] = obj["last_seq"].toInt();
  46. QJsonDocument doc(jsonObj);
  47. auto send_data = doc.toJson();
  48. TcpClient::Inst().sendMsg(ID_UPLOAD_FILE_REQ, send_data);
  49. file.close();
  50. };

客户端根据返回的进度,按照偏移量读取指定文件,并且继续上报。

如果健壮一点,可以判断服务器返回的错误信息,根据错误,提示主界面做出交互显示等。这里不再赘述。

到此客户端设计完成。

单线程服务器改造

单线程服务器改造不大,只需要增加同步文件进度信息的处理逻辑,以及优化之前的上传处理逻辑即可

  1. _fun_callbacks[ID_UPLOAD_FILE_REQ] = [this](shared_ptr<CSession> session, const short& msg_id,
  2. const string& msg_data) {
  3. Json::Reader reader;
  4. Json::Value root;
  5. reader.parse(msg_data, root);
  6. auto data = root["data"].asString();
  7. //std::cout << "recv file data is " << data << std::endl;
  8. Json::Value rtvalue;
  9. Defer defer([this, &rtvalue, session]() {
  10. std::string return_str = rtvalue.toStyledString();
  11. session->Send(return_str, ID_UPLOAD_FILE_RSP);
  12. });
  13. // 解码
  14. std::string decoded = base64_decode(data);
  15. auto md5 = root["md5"].asString();
  16. auto seq = root["seq"].asInt();
  17. auto name = root["name"].asString();
  18. auto total_size = root["total_size"].asInt();
  19. auto trans_size = root["trans_size"].asInt();
  20. auto file_path = ConfigMgr::Inst().GetFileOutPath();
  21. auto file_path_str = (file_path / name).string();
  22. std::cout << "file_path_str is " << file_path_str << std::endl;
  23. if (seq != 1) {
  24. auto iter = _map_md5_files.find(md5);
  25. if (iter == _map_md5_files.end()) {
  26. rtvalue["error"] = ErrorCodes::FileNotExists;
  27. return;
  28. }
  29. }
  30. std::ofstream outfile;
  31. //第一个包
  32. if (seq == 1) {
  33. // 打开文件,如果存在则清空,不存在则创建
  34. outfile.open(file_path_str, std::ios::binary | std::ios::trunc);
  35. //构造数据存储
  36. auto file_info = std::make_shared<FileInfo>();
  37. file_info->_file_path_str = file_path_str;
  38. file_info->_name = name;
  39. file_info->_seq = seq;
  40. file_info->_total_size = total_size;
  41. file_info->_trans_size = trans_size;
  42. std::lock_guard<std::mutex> lock(_file_mtx);
  43. _map_md5_files[md5] = file_info;
  44. }
  45. else {
  46. // 保存为文件
  47. outfile.open(file_path_str, std::ios::binary | std::ios::app);
  48. std::lock_guard<std::mutex> lock(_file_mtx);
  49. auto file_info = _map_md5_files[md5];
  50. file_info->_seq = seq;
  51. file_info->_trans_size = trans_size;
  52. }
  53. if (!outfile) {
  54. std::cerr << "无法打开文件进行写入。" << std::endl;
  55. return ;
  56. }
  57. outfile.write(decoded.data(), decoded.size());
  58. if (!outfile) {
  59. std::cerr << "写入文件失败。" << std::endl;
  60. return ;
  61. }
  62. outfile.close();
  63. std::cout << "文件已成功保存为: " << name << std::endl;
  64. rtvalue["error"] = ErrorCodes::Success;
  65. rtvalue["total_size"] = total_size;
  66. rtvalue["seq"] = seq;
  67. rtvalue["name"] = name;
  68. rtvalue["trans_size"] = trans_size;
  69. rtvalue["md5"] = md5;
  70. };
  71. _fun_callbacks[ID_SYNC_FILE_REQ] = [this](shared_ptr<CSession> session, const short& msg_id,
  72. const string& msg_data) {
  73. Json::Reader reader;
  74. Json::Value root;
  75. reader.parse(msg_data, root);
  76. Json::Value rtvalue;
  77. Defer defer([this, &rtvalue, session]() {
  78. std::string return_str = rtvalue.toStyledString();
  79. session->Send(return_str, ID_SYNC_FILE_RSP);
  80. });
  81. auto md5 = root["md5"].asString();
  82. auto iter = _map_md5_files.find(md5);
  83. if (iter == _map_md5_files.end()) {
  84. rtvalue["error"] = ErrorCodes::FileNotExists;
  85. return;
  86. }
  87. rtvalue["error"] = ErrorCodes::Success;
  88. rtvalue["total_size"] = iter->second->_total_size;
  89. rtvalue["seq"] = iter->second->_seq;
  90. rtvalue["name"] = iter->second->_name;
  91. rtvalue["trans_size"] = iter->second->_trans_size;
  92. rtvalue["md5"] = md5;
  93. };

多线程服务器

多线程服务器改造和单线程类似

只不过将处理逻辑放入LogicWorker

  1. void LogicWorker::RegisterCallBacks()
  2. {
  3. _fun_callbacks[ID_TEST_MSG_REQ] = [this](shared_ptr<CSession> session, const short& msg_id,
  4. const string& msg_data) {
  5. Json::Reader reader;
  6. Json::Value root;
  7. reader.parse(msg_data, root);
  8. auto data = root["data"].asString();
  9. std::cout << "recv test data is " << data << std::endl;
  10. Json::Value rtvalue;
  11. Defer defer([this, &rtvalue, session]() {
  12. std::string return_str = rtvalue.toStyledString();
  13. session->Send(return_str, ID_TEST_MSG_RSP);
  14. });
  15. rtvalue["error"] = ErrorCodes::Success;
  16. rtvalue["data"] = data;
  17. };
  18. _fun_callbacks[ID_UPLOAD_FILE_REQ] = [this](shared_ptr<CSession> session, const short& msg_id,
  19. const string& msg_data) {
  20. Json::Reader reader;
  21. Json::Value root;
  22. reader.parse(msg_data, root);
  23. auto md5 = root["md5"].asString();
  24. auto seq = root["seq"].asInt();
  25. auto name = root["name"].asString();
  26. auto total_size = root["total_size"].asInt();
  27. auto trans_size = root["trans_size"].asInt();
  28. auto last = root["last"].asInt();
  29. auto file_data = root["data"].asString();
  30. auto file_path = ConfigMgr::Inst().GetFileOutPath();
  31. auto file_path_str = (file_path / name).string();
  32. Json::Value rtvalue;
  33. Defer defer([this, &rtvalue, session]() {
  34. std::string return_str = rtvalue.toStyledString();
  35. session->Send(return_str, ID_UPLOAD_FILE_RSP);
  36. });
  37. // 使用 std::hash 对字符串进行哈希
  38. std::hash<std::string> hash_fn;
  39. size_t hash_value = hash_fn(name); // 生成哈希值
  40. int index = hash_value % FILE_WORKER_COUNT;
  41. std::cout << "Hash value: " << hash_value << std::endl;
  42. //第一个包
  43. if (seq == 1) {
  44. //构造数据存储
  45. auto file_info = std::make_shared<FileInfo>();
  46. file_info->_file_path_str = file_path_str;
  47. file_info->_name = name;
  48. file_info->_seq = seq;
  49. file_info->_total_size = total_size;
  50. file_info->_trans_size = trans_size;
  51. LogicSystem::GetInstance()->AddMD5File(md5, file_info);
  52. }
  53. else {
  54. auto file_info = LogicSystem::GetInstance()->GetFileInfo(md5);
  55. if (file_info == nullptr) {
  56. rtvalue["error"] = ErrorCodes::FileNotExists;
  57. return;
  58. }
  59. file_info->_seq = seq;
  60. file_info->_trans_size = trans_size;
  61. }
  62. FileSystem::GetInstance()->PostMsgToQue(
  63. std::make_shared<FileTask>(session, name, seq, total_size,
  64. trans_size, last, file_data),
  65. index
  66. );
  67. rtvalue["error"] = ErrorCodes::Success;
  68. rtvalue["total_size"] = total_size;
  69. rtvalue["seq"] = seq;
  70. rtvalue["name"] = name;
  71. rtvalue["trans_size"] = trans_size;
  72. rtvalue["last"] = last;
  73. rtvalue["md5"] = md5;
  74. };
  75. _fun_callbacks[ID_SYNC_FILE_REQ] = [this](shared_ptr<CSession> session, const short& msg_id,
  76. const string& msg_data) {
  77. Json::Reader reader;
  78. Json::Value root;
  79. reader.parse(msg_data, root);
  80. Json::Value rtvalue;
  81. Defer defer([this, &rtvalue, session]() {
  82. std::string return_str = rtvalue.toStyledString();
  83. session->Send(return_str, ID_SYNC_FILE_RSP);
  84. });
  85. auto md5 = root["md5"].asString();
  86. auto file = LogicSystem::GetInstance()->GetFileInfo(md5);
  87. if (file == nullptr) {
  88. rtvalue["error"] = ErrorCodes::FileNotExists;
  89. return;
  90. }
  91. rtvalue["error"] = ErrorCodes::Success;
  92. rtvalue["total_size"] = file->_total_size;
  93. rtvalue["seq"] = file->_seq;
  94. rtvalue["name"] = file->_name;
  95. rtvalue["trans_size"] = file->_trans_size;
  96. rtvalue["md5"] = md5;
  97. };
  98. }

将进度信息存储在LogicSystem中,后续可参考填写入redis,方便后续分布式扩展,注意如果填写了多个资源服务器,还有写入服务器信息,这个不再赘述和进阶,我们只用一个资源服务器做演示,后续读者可自己进阶分布式设计。

  1. void LogicSystem::AddMD5File(std::string md5, std::shared_ptr<FileInfo> fileinfo) {
  2. std::lock_guard<std::mutex> lock(_file_mtx);
  3. _map_md5_files[md5] = fileinfo;
  4. }
  5. std::shared_ptr<FileInfo> LogicSystem::GetFileInfo(std::string md5) {
  6. std::lock_guard<std::mutex> lock(_file_mtx);
  7. auto iter = _map_md5_files.find(md5);
  8. if (iter == _map_md5_files.end()) {
  9. return nullptr;
  10. }
  11. return iter->second;
  12. }

集成资源服务器

新架构形式

集成资源服务器后的架构为

image-20250905105415106

将上述多线程服务器,整合到项目目录,同时设置资源属性表,复用之前的就可以了。

注意资源服务器配置要稍作修改

  1. [GateServer]
  2. Port = 8080
  3. [VarifyServer]
  4. Host = 127.0.0.1
  5. Port = 50051
  6. [StatusServer]
  7. Host = 127.0.0.1
  8. Port = 50052
  9. [SelfServer]
  10. Name = reserver
  11. Host = 0.0.0.0
  12. Port = 9090
  13. RPCPort = 51055
  14. [Mysql]
  15. Host = 81.68.86.146
  16. Port = 3308
  17. User = root
  18. Passwd = 123456.
  19. Schema = llfc
  20. [Redis]
  21. Host = 81.68.86.146
  22. Port = 6380
  23. Passwd = 123456
  24. [Static]
  25. Path = static
  26. [Output]
  27. Path = bin

客户端新增资源网络类

构造函数解析

因为客户端需要长连接资源服务器,采用TCP方式上传文件,所以需要封装一个单例的FileTcpMgr类,用于上传资源。

  1. class FileTcpMgr : public QObject, public Singleton<FileTcpMgr>,
  2. public std::enable_shared_from_this<FileTcpMgr>
  3. {
  4. Q_OBJECT
  5. public:
  6. friend class Singleton<FileTcpMgr>;
  7. ~FileTcpMgr();
  8. void SendData(ReqId reqId, QByteArray data);
  9. void CloseConnection();
  10. private:
  11. void initHandlers();
  12. explicit FileTcpMgr(QObject *parent = nullptr);
  13. void registerMetaType();
  14. void handleMsg(ReqId id, int len, QByteArray data);
  15. QTcpSocket _socket;
  16. QString _host;
  17. uint16_t _port;
  18. QByteArray _buffer;
  19. bool _b_recv_pending;
  20. quint16 _message_id;
  21. quint32 _message_len;
  22. QMap<ReqId, std::function<void(ReqId id, int len, QByteArray data)>> _handlers;
  23. //发送队列
  24. QQueue<QByteArray> _send_queue;
  25. //正在发送的包
  26. QByteArray _current_block;
  27. //当前已发送的字节数
  28. qint64 _bytes_sent;
  29. //是否正在发送
  30. bool _pending;
  31. signals:
  32. void sig_send_data(ReqId reqId, QByteArray data);
  33. void sig_con_success(bool bsuccess);
  34. void sig_connection_closed();
  35. public slots:
  36. void slot_send_data(ReqId reqId, QByteArray data);
  37. void slot_tcp_connect(std::shared_ptr<ServerInfo> si);
  38. };

构造函数具体实现

  1. FileTcpMgr::FileTcpMgr(QObject *parent) : QObject(parent),
  2. _host(""), _port(0), _b_recv_pending(false), _message_id(0), _message_len(0), _bytes_sent(0), _pending(false)
  3. {
  4. registerMetaType();
  5. QObject::connect(&_socket, &QTcpSocket::connected, this, [&]() {
  6. qDebug() << "Connected to server!";
  7. emit sig_con_success(true);
  8. });
  9. QObject::connect(&_socket, &QTcpSocket::readyRead, this, [&]() {
  10. // 当有数据可读时,读取所有数据
  11. // 读取所有数据并追加到缓冲区
  12. _buffer.append(_socket.readAll());
  13. QDataStream stream(&_buffer, QIODevice::ReadOnly);
  14. stream.setVersion(QDataStream::Qt_5_0);
  15. forever {
  16. //先解析头部
  17. if(!_b_recv_pending){
  18. // 检查缓冲区中的数据是否足够解析出一个消息头(消息ID + 消息长度)
  19. if (_buffer.size() < FILE_UPLOAD_HEAD_LEN) {
  20. return; // 数据不够,等待更多数据
  21. }
  22. // 预读取消息ID和消息长度,但不从缓冲区中移除
  23. stream >> _message_id >> _message_len;
  24. //将buffer 中的前六个字节移除
  25. _buffer = _buffer.mid(FILE_UPLOAD_HEAD_LEN);
  26. // 输出读取的数据
  27. qDebug() << "Message ID:" << _message_id << ", Length:" << _message_len;
  28. }
  29. //buffer剩余长读是否满足消息体长度,不满足则退出继续等待接受
  30. if(_buffer.size() < _message_len){
  31. _b_recv_pending = true;
  32. return;
  33. }
  34. _b_recv_pending = false;
  35. // 读取消息体
  36. QByteArray messageBody = _buffer.mid(0, _message_len);
  37. qDebug() << "receive body msg is " << messageBody ;
  38. _buffer = _buffer.mid(_message_len);
  39. handleMsg(ReqId(_message_id),_message_len, messageBody);
  40. }
  41. });
  42. //5.15 之后版本
  43. // QObject::connect(&_socket, QOverload<QAbstractSocket::SocketError>::of(&QTcpSocket::errorOccurred), [&](QAbstractSocket::SocketError socketError) {
  44. // Q_UNUSED(socketError)
  45. // qDebug() << "Error:" << _socket.errorString();
  46. // });
  47. // 处理错误(适用于Qt 5.15之前的版本)
  48. QObject::connect(&_socket, static_cast<void (QTcpSocket::*)(QTcpSocket::SocketError)>(&QTcpSocket::error),
  49. this,
  50. [&](QTcpSocket::SocketError socketError) {
  51. qDebug() << "Error:" << _socket.errorString() ;
  52. //todo... 根据错误类型做不同的处理
  53. switch (socketError) {
  54. case QTcpSocket::ConnectionRefusedError:
  55. qDebug() << "Connection Refused!";
  56. emit sig_con_success(false);
  57. break;
  58. case QTcpSocket::RemoteHostClosedError:
  59. qDebug() << "Remote Host Closed Connection!";
  60. break;
  61. case QTcpSocket::HostNotFoundError:
  62. qDebug() << "Host Not Found!";
  63. emit sig_con_success(false);
  64. break;
  65. case QTcpSocket::SocketTimeoutError:
  66. qDebug() << "Connection Timeout!";
  67. emit sig_con_success(false);
  68. break;
  69. case QTcpSocket::NetworkError:
  70. //qDebug() << "Network Error!";
  71. break;
  72. default:
  73. //qDebug() << "Other Error!";
  74. break;
  75. }
  76. });
  77. // 处理连接断开
  78. QObject::connect(&_socket, &QTcpSocket::disconnected, this,[&]() {
  79. qDebug() << "Disconnected from server.";
  80. emit sig_connection_closed();
  81. });
  82. //连接发送信号用来发送数据
  83. QObject::connect(this, &FileTcpMgr::sig_send_data, this, &FileTcpMgr::slot_send_data);
  84. //连接发送信号
  85. QObject::connect(&_socket, &QTcpSocket::bytesWritten, this, [this](qint64 bytes) {
  86. //更新发送数据
  87. _bytes_sent += bytes;
  88. //未发送完整
  89. if (_bytes_sent < _current_block.size()) {
  90. //继续发送
  91. auto data_to_send = _current_block.mid(_bytes_sent);
  92. _socket.write(data_to_send);
  93. return;
  94. }
  95. //发送完全,则查看队列是否为空
  96. if (_send_queue.isEmpty()) {
  97. //队列为空,说明已经将所有数据发送完成,将pending设置为false,这样后续要发送数据时可以继续发送
  98. _current_block.clear();
  99. _pending = false;
  100. _bytes_sent = 0;
  101. return;
  102. }
  103. //队列不为空,则取出队首元素
  104. _current_block = _send_queue.dequeue();
  105. _bytes_sent = 0;
  106. _pending = true;
  107. qint64 w2 = _socket.write(_current_block);
  108. qDebug() << "[TcpMgr] Dequeued and write() returned" << w2;
  109. });
  110. //连接
  111. QObject::connect(this, &FileTcpMgr::sig_close, this, &FileTcpMgr::slot_tcp_close);
  112. //注册消息
  113. initHandlers();
  114. }

简单描述下上述构造函数做的事情:

  1. 成功连接服务器后,会触发QTcpSocket::connected信号,从而回调lambda表达式,发送sig_con_success信号
  2. 接收服务器传输的数据,会触发QTcpSocket::readyRead信号,从而回调lambda表达式,在这里处理头部信息和包体信息。进行TLV协议解析后回调handleMsg
  3. 捕获QTcpSocket::SocketError信号,当出错后回调lambda表达式发送信号通知主界面错误。
  4. 捕获连接断开信号QTcpSocket::disconnected,回调lambda表达式,通知主界面连接断开。
  5. 连接发送信号sig_send_data,因为socket在独立线程,不能直接调用发送,所以改为异步发送,触发槽函数slot_send_data
  6. 因为异步发送,可能存在未发送完全的情况,所以我们用QTcpSocket::bytesWritten来检测发送了多少字节,通过lambda表达式回调处理,继续发送数据。
  7. 因为socket被独立为单独线程,所以关闭也不能直接调用close(socket), 需要统一在槽函数中处理。

注册元对象系统的逻辑不再赘述。

连接槽函数

我们实现槽函数slot_tcp_connect用来创建客户端到资源服务器的连接

  1. void FileTcpMgr::slot_tcp_connect(std::shared_ptr<ServerInfo> si)
  2. {
  3. qDebug()<< "receive tcp connect signal";
  4. // 尝试连接到服务器
  5. qDebug() << "Connecting to server...";
  6. _host = si->_res_host;
  7. _port = static_cast<uint16_t>(si->_res_port.toUInt());
  8. _socket.connectToHost(_host, _port);
  9. }

注册处理流程

注册上传头像回调逻辑

  1. void FileTcpMgr::initHandlers()
  2. {
  3. //todo 接收上传用户头像回复
  4. _handlers.insert(ID_UPLOAD_HEAD_ICON_RSP, [this](ReqId id, int len, QByteArray data){
  5. Q_UNUSED(len);
  6. qDebug()<< "handle id is "<< id ;
  7. // 将QByteArray转换为QJsonDocument
  8. QJsonDocument jsonDoc = QJsonDocument::fromJson(data);
  9. // 检查转换是否成功
  10. if(jsonDoc.isNull()){
  11. qDebug() << "Failed to create QJsonDocument.";
  12. return;
  13. }
  14. QJsonObject recvObj = jsonDoc.object();
  15. qDebug()<< "data jsonobj is " << recvObj ;
  16. if(!recvObj.contains("error")){
  17. int err = ErrorCodes::ERR_JSON;
  18. qDebug() << "icon upload_failed, err is Json Parse Err" << err ;
  19. //todo ... 提示上传失败
  20. //emit upload_failed();
  21. return;
  22. }
  23. int err = recvObj["error"].toInt();
  24. if(err != ErrorCodes::SUCCESS){
  25. qDebug() << "Login Failed, err is " << err ;
  26. //emit upload_failed();
  27. return;
  28. }
  29. auto md5 = recvObj["md5"].toString();
  30. auto seq = recvObj["seq"].toInt();
  31. auto trans_size = recvObj["trans_size"].toInt();
  32. auto uid = recvObj["uid"].toInt();
  33. auto total_size = recvObj["total_size"].toInt();
  34. auto name = recvObj["name"].toString();
  35. qDebug() << "recv : " << name << "file trans_size is " << trans_size;
  36. //判断trans_size和total_size相等
  37. if(total_size == trans_size){
  38. return;
  39. }
  40. auto file_info = UserMgr::GetInstance()->GetFileInfoByMD5(md5);
  41. if(!file_info){
  42. return;
  43. }
  44. //再次组织数据发送
  45. QFile file(file_info->filePath());
  46. if(!file.open(QIODevice::ReadOnly)){
  47. qWarning() << "Could not open file: " << file.errorString();
  48. return;
  49. }
  50. //文件偏移到已经发送的位置,继续读取发送
  51. file.seek(trans_size);
  52. QByteArray buffer;
  53. seq ++;
  54. //每次读取2048字节发送
  55. buffer = file.read(MAX_FILE_LEN);
  56. QJsonObject sendObj;
  57. //将文件内容转换为base64编码
  58. QString base64Data = buffer.toBase64();
  59. sendObj["md5"] = md5;
  60. sendObj["name"] = file_info->fileName();
  61. sendObj["seq"] = seq;
  62. sendObj["trans_size"] = buffer.size() + (seq-1)*MAX_FILE_LEN;
  63. sendObj["total_size"] = total_size;
  64. if(buffer.size() + (seq-1)*MAX_FILE_LEN >= total_size){
  65. sendObj["last"] = 1;
  66. }else{
  67. sendObj["last"] = 0;
  68. }
  69. sendObj["data"] = base64Data;
  70. sendObj["last_seq"] = recvObj["last_seq"].toInt();
  71. sendObj["uid"] = uid;
  72. QJsonDocument doc(sendObj);
  73. auto send_data = doc.toJson();
  74. SendData(ID_UPLOAD_HEAD_ICON_REQ, send_data);
  75. file.close();
  76. });
  77. }

独立文件线程

对于上传我们独立到文件上报线程中

  1. class FileTcpThread: public std::enable_shared_from_this<FileTcpThread>{
  2. public:
  3. FileTcpThread();
  4. ~FileTcpThread();
  5. private:
  6. QThread * _file_tcp_thread;
  7. };

具体实现

  1. FileTcpThread::FileTcpThread()
  2. {
  3. _file_tcp_thread = new QThread();
  4. FileTcpMgr::GetInstance()->moveToThread(_file_tcp_thread);
  5. QObject::connect(_file_tcp_thread, &QThread::finished, _file_tcp_thread, &QObject::deleteLater);
  6. _file_tcp_thread->start();
  7. }
  8. FileTcpThread::~FileTcpThread()
  9. {
  10. _file_tcp_thread->quit();
  11. }

主函数调用

  1. #include "mainwindow.h"
  2. #include <QApplication>
  3. #include <QFile>
  4. #include "global.h"
  5. #include "tcpmgr.h"
  6. #include "filetcpmgr.h"
  7. int main(int argc, char *argv[])
  8. {
  9. QApplication a(argc, argv);
  10. QFile qss(":/style/stylesheet.qss");
  11. if( qss.open(QFile::ReadOnly))
  12. {
  13. qDebug("open success");
  14. QString style = QLatin1String(qss.readAll());
  15. a.setStyleSheet(style);
  16. qss.close();
  17. }else{
  18. qDebug("Open failed");
  19. }
  20. // 获取当前应用程序的路径
  21. QString app_path = QCoreApplication::applicationDirPath();
  22. // 拼接文件名
  23. QString fileName = "config.ini";
  24. QString config_path = QDir::toNativeSeparators(app_path +
  25. QDir::separator() + fileName);
  26. QSettings settings(config_path, QSettings::IniFormat);
  27. QString gate_host = settings.value("GateServer/host").toString();
  28. QString gate_port = settings.value("GateServer/port").toString();
  29. gate_url_prefix = "http://"+gate_host+":"+gate_port;
  30. //启动tcp线程
  31. TcpThread tcpthread;
  32. //启动资源网络线程
  33. FileTcpThread file_tcp_thread;
  34. MainWindow w;
  35. w.show();
  36. return a.exec();
  37. }

原来的登录流程稍作修改,连接好ChatServer后,连接ResourceServer, 最后再让用户登录。

服务器逻辑

服务器新增文件上报逻辑处理, 在LogicWorker::RegisterCallBacks中添加

  1. _fun_callbacks[ID_UPLOAD_HEAD_ICON_REQ] = [this](shared_ptr<CSession> session, const short& msg_id,
  2. const string& msg_data) {
  3. Json::Reader reader;
  4. Json::Value root;
  5. reader.parse(msg_data, root);
  6. auto md5 = root["md5"].asString();
  7. auto seq = root["seq"].asInt();
  8. auto name = root["name"].asString();
  9. auto total_size = root["total_size"].asInt();
  10. auto trans_size = root["trans_size"].asInt();
  11. auto last = root["last"].asInt();
  12. auto file_data = root["data"].asString();
  13. auto uid = root["uid"].asInt();
  14. auto token = root["token"].asString();
  15. auto last_seq = root["last_seq"].asInt();
  16. //转化为字符串
  17. auto uid_str = std::to_string(uid);
  18. auto file_path = ConfigMgr::Inst().GetFileOutPath();
  19. auto file_path_str = (file_path / uid_str / name).string();
  20. Json::Value rtvalue;
  21. Defer defer([this, &rtvalue, session]() {
  22. std::string return_str = rtvalue.toStyledString();
  23. session->Send(return_str, ID_UPLOAD_HEAD_ICON_RSP);
  24. });
  25. //第一个包校验一下token是否合理
  26. if (seq == 1) {
  27. //从redis获取用户token是否正确
  28. std::string uid_str = std::to_string(uid);
  29. std::string token_key = USERTOKENPREFIX + uid_str;
  30. std::string token_value = "";
  31. bool success = RedisMgr::GetInstance()->Get(token_key, token_value);
  32. if (!success) {
  33. rtvalue["error"] = ErrorCodes::UidInvalid;
  34. return;
  35. }
  36. if (token_value != token) {
  37. rtvalue["error"] = ErrorCodes::TokenInvalid;
  38. return;
  39. }
  40. }
  41. // 使用 std::hash 对字符串进行哈希
  42. std::hash<std::string> hash_fn;
  43. size_t hash_value = hash_fn(name); // 生成哈希值
  44. int index = hash_value % FILE_WORKER_COUNT;
  45. std::cout << "Hash value: " << hash_value << std::endl;
  46. //第一个包
  47. if (seq == 1) {
  48. //构造数据存储
  49. auto file_info = std::make_shared<FileInfo>();
  50. file_info->_file_path_str = file_path_str;
  51. file_info->_name = name;
  52. file_info->_seq = seq;
  53. file_info->_total_size = total_size;
  54. file_info->_trans_size = trans_size;
  55. LogicSystem::GetInstance()->AddMD5File(md5, file_info);
  56. }
  57. else {
  58. auto file_info = LogicSystem::GetInstance()->GetFileInfo(md5);
  59. if (file_info == nullptr) {
  60. rtvalue["error"] = ErrorCodes::FileNotExists;
  61. return;
  62. }
  63. file_info->_seq = seq;
  64. file_info->_trans_size = trans_size;
  65. }
  66. FileSystem::GetInstance()->PostMsgToQue(
  67. std::make_shared<FileTask>(session, file_path_str, name, seq, total_size,
  68. trans_size, last, file_data),
  69. index
  70. );
  71. rtvalue["error"] = ErrorCodes::Success;
  72. rtvalue["total_size"] = total_size;
  73. rtvalue["seq"] = seq;
  74. rtvalue["name"] = name;
  75. rtvalue["trans_size"] = trans_size;
  76. rtvalue["last"] = last;
  77. rtvalue["md5"] = md5;
  78. rtvalue["uid"] = uid;
  79. rtvalue["last_seq"] = last_seq;
  80. };

源码

https://gitee.com/secondtonone1/llfcchat

效果展示:

上传前

image-20250905145321372

上传后

image-20250905145408921

服务器存储成功

image-20250905145440949

热门评论

热门文章

  1. vscode搭建windows C++开发环境

    喜欢(596) 浏览(96518)
  2. 使用hexo搭建个人博客

    喜欢(533) 浏览(13711)
  3. Linux环境搭建和编码

    喜欢(594) 浏览(15350)
  4. MarkDown在线编辑器

    喜欢(514) 浏览(15557)
  5. 聊天项目(28) 分布式服务通知好友申请

    喜欢(507) 浏览(7021)

最新评论

  1. 无锁并发队列 TenThousandOne:_head  和 _tail  替换为原子变量。那里pop的逻辑,val = _data[h] 可以移到循环外面吗
  2. 解决博客回复区被脚本注入的问题 secondtonone1:走到现在我忽然明白一个道理,无论工作也好生活也罢,最重要的是开心,即使一份安稳的工作不能给我带来事业上的积累也要合理的舍弃,所以我还是想去做喜欢的方向。
  3. 处理网络粘包问题 zyouth: //消息的长度小于头部规定的长度,说明数据未收全,则先将部分消息放到接收节点里 if (bytes_transferred < data_len) { memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, bytes_transferred); _recv_msg_node->_cur_len += bytes_transferred; ::memset(_data, 0, MAX_LENGTH); _socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH), std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_self)); //头部处理完成 _b_head_parse = true; return; } 把_b_head_parse = true;放在_socket.async_read_some前面是不是更好
  4. C++ 线程池原理和实现 mzx2023:两种方法解决,一种是改排序算法,就是当线程耗尽的时候,使用普通递归,另一种是当在线程池commit的时候,判断线程是否耗尽,耗尽的话就直接当前线程执行task
  5. 利用指针和容器实现文本查询 越今朝:应该添加一个过滤功能以解决部分单词无法被查询的问题: eg: "I am a teacher."中的teacher无法被查询,因为在示例代码中teacher.被解释为一个单词从而忽略了teacher本身。

个人公众号

个人微信