独立网络线程
实现断点续传,需要配合队列缓存客户端要上传的数据,网络线程从队列中一条一条的取出任务上传。
独立前

独立后

槽函数连接方式

直接连接
connect(发送者, 信号, [](信号参数){});
这种槽函数在发送者所在线程触发。
增加接收者
connect(发送者,信号,接收者,槽函数)如果发送者和接收者在同一个线程,则槽函数调用的线程就是发送者所在的线程。
发送者和接收者不在一个线程,connect默认采用队列连接方式
connect(发送者,信号,接收者,槽函数)
槽函数在接收者所在的线程触发。好处就是解耦合。
元对象系统
- 信号和槽
- 反射
- 动态增加函数和属性
当我们信号和槽连接方式采用队列连接,那么信号的参数会被封装为元对象,投递到队列中。
要想支持元对象有两种方式
- 继承于QObject,并且类内填写Q_OBJECT宏
- 声明并且注册元对象类
为了支持高并发情况下断点续传,考虑将目前项目中TcpMgr中网络模块独立到独立线程
封装TcpThread类
利用RAII思想封装线程启动和回收
class TcpThread:public std::enable_shared_from_this<TcpThread> {public:TcpThread();~TcpThread();private:QThread* _tcp_thread;};
具体实现
TcpThread::TcpThread(){_tcp_thread = new QThread();TcpMgr::GetInstance()->moveToThread(_tcp_thread);QObject::connect(_tcp_thread, &QThread::finished, _tcp_thread, &QObject::deleteLater);_tcp_thread->start();}TcpThread::~TcpThread(){_tcp_thread->quit();}
主函数启动时记得提前启动线程,将TcpMgr转移到独立线程中
//启动tcp线程TcpThread tcpthread;MainWindow w;w.show();return a.exec();
测试发现,登录卡住,检测是信号sig_connect_tcp发送了,槽函数slot_tcp_connect没触发。
//连接tcp连接请求的信号和槽函数connect(this, &LoginDialog::sig_connect_tcp, TcpMgr::GetInstance().get(), &TcpMgr::slot_tcp_connect);
为了测试
先在TcpMgr中添加测试槽函数
void slot_test() {qDebug() << "receve thread is " << QThread::currentThread();qDebug() << "slot test......";}
在LoginDialog中连接信号
connect(this, &LoginDialog::sig_test, TcpMgr::GetInstance().get(), &TcpMgr::slot_test);
在发送sig_connect_tcp处发送sig_test
void LoginDialog::initHttpHandlers(){//注册获取登录回包逻辑_handlers.insert(ReqId::ID_LOGIN_USER, [this](QJsonObject jsonObj){int error = jsonObj["error"].toInt();if(error != ErrorCodes::SUCCESS){showTip(tr("参数错误"),false);enableBtn(true);return;}auto email = jsonObj["email"].toString();//发送信号通知tcpMgr发送长链接ServerInfo si;si.Uid = jsonObj["uid"].toInt();si.Host = jsonObj["host"].toString();si.Port = jsonObj["port"].toString();si.Token = jsonObj["token"].toString();_uid = si.Uid;_token = si.Token;qDebug()<< "email is " << email << " uid is " << si.Uid <<" host is "<< si.Host << " Port is " << si.Port << " Token is " << si.Token;emit sig_connect_tcp(si);emit sig_test();});}
测试,是可以看到能触发slot_test函数得,而且线程id显示是子线程中触发得槽函数。
那么sig_connect_tcp信号发出,没有触发槽函数,就是因为信号得参数类型不支持元对象系统。
为了支持元对象系统,我们需要在信号的参数ServerInfo类实现默认构造,同时声明为元对象类型
struct ServerInfo{public:ServerInfo() = default;ServerInfo(const ServerInfo& other):Host(other.Host),Port(other.Port),Token(other.Token),Uid(other.Uid){}QString Host;QString Port;QString Token;int Uid;};Q_DECLARE_METATYPE(ServerInfo)
在TcpMgr中注册这个元对象类型
qRegisterMetaType<ServerInfo>("ServerInfo");
再次测试就通过登录了,但是在发送后续得消息时,又遇到了自定义类型作为参数得情况,我们需要和上面一样,依次声明元对象类型并且注册。
如下列举一个,还有很多,不再详细列举
class SearchInfo {public:SearchInfo(int uid, QString name, QString nick, QString desc, int sex, QString icon);SearchInfo() = default;int _uid;QString _name;QString _nick;QString _desc;int _sex;QString _icon;};Q_DECLARE_METATYPE(SearchInfo)Q_DECLARE_METATYPE(std::shared_ptr<SearchInfo>)
TcpMgr封装注册元对象函数
void TcpMgr::registerMetaType() {// 注册所有自定义类型qRegisterMetaType<ServerInfo>("ServerInfo");qRegisterMetaType<SearchInfo>("SearchInfo");qRegisterMetaType<std::shared_ptr<SearchInfo>>("std::shared_ptr<SearchInfo>");qRegisterMetaType<AddFriendApply>("AddFriendApply");qRegisterMetaType<std::shared_ptr<AddFriendApply>>("std::shared_ptr<AddFriendApply>");qRegisterMetaType<ApplyInfo>("ApplyInfo");qRegisterMetaType<std::shared_ptr<AuthInfo>>("std::shared_ptr<AuthInfo>");qRegisterMetaType<AuthRsp>("AuthRsp");qRegisterMetaType<std::shared_ptr<AuthRsp>>("std::shared_ptr<AuthRsp>");qRegisterMetaType<UserInfo>("UserInfo");qRegisterMetaType<std::vector<std::shared_ptr<TextChatData>>>("std::vector<std::shared_ptr<TextChatData>>");qRegisterMetaType<std::vector<std::shared_ptr<ChatThreadInfo>>>("std::vector<std::shared_ptr<ChatThreadInfo>>");qRegisterMetaType<std::shared_ptr<ChatThreadData>>("std::shared_ptr<ChatThreadData>");qRegisterMetaType<ReqId>("ReqId");}
在构造函数中调用
TcpMgr::TcpMgr():_host(""),_port(0),_b_recv_pending(false),_message_id(0),_message_len(0){registerMetaType();//...}
再次测试就通过了
这里给大家讲讲为什么单线程情况下,信号可以携带自定义类型作为参数,不用设定元对象就可以传输,而跨线程不可以。
在 Qt 的信号/槽机制中,信号参数的传递方式取决于连接(connect)的类型,而连接类型又由发信号对象和接收槽对象所在的线程决定:
同线程(Direct Connection)
- 如果信号和槽都在同一个线程里,默认使用 Direct Connection。
- Direct Connection 本质上就是一个普通的 C++ 函数调用,参数直接按值或按引用传递,编译时就已经知道了类型,不需要任何额外的元类型信息。
- 因此,即使你没有把
SearchInfo注册为QMetaType,编译器也能直接生成函数调用代码,信号里就可以直接传递SearchInfo。
跨线程(Queued Connection)
如果信号发送者和接收者不在同一个线程,Qt 会自动把连接转成 Queued Connection。
Queued Connection 的实现是:当信号发出时,Qt 会把信号参数打包成一个事件(
QEvent),然后把事件放到目标线程的事件队列里;目标线程的事件循环(QCoreApplication::processEvents())再把这个事件取出来,调用槽函数。这里的“打包”与“解包”就需要运行时才能确定参数类型,以及如何拷贝或序列化这个类型——这正是 Qt 元对象系统(
QMetaType)要干的事情。如果没有把
SearchInfo声明成一个元类型,Qt 就不知道如何在内部把它从一个线程“打包”到事件里,又如何在另一线程里还原。因此,跨线程传递自定义类型,必须在类型定义后加上:
Q_DECLARE_METATYPE(SearchInfo)
并在运行时注册(通常在
main()里调用一次):qRegisterMetaType<SearchInfo>("SearchInfo");
小结
- 同线程:Direct Connection,编译时直接调用,不需要
Q_DECLARE_METATYPE。 - 跨线程:Queued Connection,需要运行时打包/解包参数,必须用
Q_DECLARE_METATYPE(以及qRegisterMetaType)来注册你的自定义类型。
添加发送队列
UserMgr线程安全
为了保证多线程情况下访问数据的安全性,对UserMgr类的操作加锁
std::mutex _mtx;
在获取数据和设置数据的地方都进行加锁, 比如
std::shared_ptr<UserInfo> UserMgr::GetUserInfo(){std::lock_guard<std::mutex> lock(_mtx);return _user_info;}
还有很多不再赘述
设置发送队列
默认情况下qt的socket都是非阻塞的。
所以调用socket.write(数据)可能会直接返回-1
返回-1表示网络出错,一般都是EWOULD_BLOCK/EAGAIN造成的。表示发送缓冲区已经满了,无法继续发送。
而我们之前的逻辑,无论在哪个线程,想要发送数据,统一发送信号
void sig_send_data(ReqId reqId, QByteArray data);
会触发TcpMgr的槽函数
void TcpMgr::slot_send_data(ReqId reqId, QByteArray dataBytes){uint16_t id = reqId;// 计算长度(使用网络字节序转换)quint16 len = static_cast<quint16>(dataBytes.length());// 创建一个QByteArray用于存储要发送的所有数据QByteArray block;QDataStream out(&block, QIODevice::WriteOnly);// 设置数据流使用网络字节序out.setByteOrder(QDataStream::BigEndian);// 写入ID和长度out << id << len;// 添加字符串数据block.append(dataBytes);qint64 written = _socket.write(block);qDebug() << "tcp mgr send byte data is" << _current_block<< ", write() returned" << written;}
上述函数在网络情况良好的时候不会产生问题,但是如果网络发送情况频繁的时候,就容易出现written为-1的情况。
也就是发送缓冲区满了,导致发送失败。
对于这种情况,我们可以模仿我们的服务器写法,添加一个发送队列,然后将要发送的数据投递到发送队列
//发送队列QQueue<QByteArray> _send_queue;//正在发送的包QByteArray _current_block;//当前已发送的字节数qint64 _bytes_sent;//是否正在发送bool _pending;
修改发送逻辑
void TcpMgr::slot_send_data(ReqId reqId, QByteArray dataBytes){uint16_t id = reqId;// 计算长度(使用网络字节序转换)quint16 len = static_cast<quint16>(dataBytes.length());// 创建一个QByteArray用于存储要发送的所有数据QByteArray block;QDataStream out(&block, QIODevice::WriteOnly);// 设置数据流使用网络字节序out.setByteOrder(QDataStream::BigEndian);// 写入ID和长度out << id << len;// 添加字符串数据block.append(dataBytes);//判断是否正在发送if (_pending) {//放入队列直接返回,因为目前有数据正在发送_send_queue.enqueue(block);return;}// 没有正在发送,把这包设为“当前块”,重置计数,并写出去_current_block = block; // ← 保存当前正在发送的 block_bytes_sent = 0; // ← 归零_pending = true; // ← 标记正在发送qint64 written = _socket.write(_current_block);qDebug() << "tcp mgr send byte data is" << _current_block<< ", write() returned" << written;}
我们需要监听发送返回的数据,QT也提供了类似于asio的异步回调功能,只是在发送完成后返回一个信号void bytesWritten(qint64 bytes);
我们连接这个信号
QObject::connect(&_socket, &QTcpSocket::bytesWritten, this, [this](qint64 bytes) {//更新发送数据_bytes_sent += bytes;//未发送完整if (_bytes_sent < _current_block.size()) {//继续发送auto data_to_send = _current_block.mid(_bytes_sent);_socket.write(data_to_send);return;}//发送完全,则查看队列是否为空if (_send_queue.isEmpty()) {//队列为空,说明已经将所有数据发送完成,将pending设置为false,这样后续要发送数据时可以继续发送_current_block.clear();_pending = false;_bytes_sent = 0;return;}//队列不为空,则取出队首元素_current_block = _send_queue.dequeue();_bytes_sent = 0;_pending = true;qint64 w2 = _socket.write(_current_block);qDebug() << "[TcpMgr] Dequeued and write() returned" << w2;});
_pending控制发送还是放入队列。
断点续传思路
思路图

修改上传逻辑
原来的传输逻辑,采用的是循环上传,就是将一个文件拆分成多个报文段,循环上传,而不等待服务器每次回复
void MainWindow::on_uploadBtn_clicked(){ui->uploadBtn->setEnabled(false);// 打开文件QFile file(_file_name);if (!file.open(QIODevice::ReadOnly)) {qWarning() << "Could not open file:" << file.errorString();return;}// 保存当前文件指针位置qint64 originalPos = file.pos();QCryptographicHash hash(QCryptographicHash::Md5);if (!hash.addData(&file)) {qWarning() << "Failed to read data from file:" << _file_name;return ;}_file_md5 = hash.result().toHex(); // 返回十六进制字符串// 读取文件内容并发送QByteArray buffer;int seq = 0;QFileInfo fileInfo(_file_name); // 创建 QFileInfo 对象QString fileName = fileInfo.fileName(); // 获取文件名qDebug() << "文件名是:" << fileName; // 输出文件名int total_size = fileInfo.size();int last_seq = 0;if(total_size % MAX_FILE_LEN){last_seq = (total_size/MAX_FILE_LEN)+1;}else{last_seq = total_size/MAX_FILE_LEN;}// 恢复文件指针到原来的位置file.seek(originalPos);while (!file.atEnd()) {//每次读取2048字节发送buffer = file.read(MAX_FILE_LEN);QJsonObject jsonObj;// 将文件内容转换为 Base64 编码(可选)QString base64Data = buffer.toBase64();//qDebug() << "send data is " << base64Data;++seq;jsonObj["md5"] = _file_md5;jsonObj["name"] = fileName;jsonObj["seq"] = seq;jsonObj["trans_size"] = buffer.size() + (seq-1)*MAX_FILE_LEN;jsonObj["total_size"] = total_size;if(buffer.size() + (seq-1)*MAX_FILE_LEN == total_size){jsonObj["last"] = 1;}else{jsonObj["last"] = 0;}jsonObj["data"]= base64Data;jsonObj["last_seq"] = last_seq;QJsonDocument doc(jsonObj);auto send_data = doc.toJson();TcpClient::Inst().sendMsg(ID_UPLOAD_FILE_REQ, send_data);//startDelay(500);}//关闭文件file.close();}
现在需要改为分段上传,每次上传后,等待服务器返回响应后再上传下一个
void MainWindow::on_uploadBtn_clicked(){ui->uploadBtn->setEnabled(false);ui->pauseBtn->setEnabled(true);// 打开文件QFile file(_file_name);if (!file.open(QIODevice::ReadOnly)) {qWarning() << "Could not open file:" << file.errorString();return;}// 保存当前文件指针位置qint64 originalPos = file.pos();QCryptographicHash hash(QCryptographicHash::Md5);if (!hash.addData(&file)) {qWarning() << "Failed to read data from file:" << _file_name;return ;}_file_md5 = hash.result().toHex(); // 返回十六进制字符串// 读取文件内容并发送QByteArray buffer;int seq = 0;// 创建 QFileInfo 对象auto fileInfo = std::make_shared<QFileInfo>(_file_name);QString fileName = fileInfo->fileName(); // 获取文件名qDebug() << "文件名是:" << fileName; // 输出文件名int total_size = fileInfo->size();int last_seq = 0;if(total_size % MAX_FILE_LEN){last_seq = (total_size/MAX_FILE_LEN)+1;}else{last_seq = total_size/MAX_FILE_LEN;}// 恢复文件指针到原来的位置file.seek(originalPos);//改为读取第一块并发送//每次读取2048字节发送buffer = file.read(MAX_FILE_LEN);QJsonObject jsonObj;// 将文件内容转换为 Base64 编码(可选)QString base64Data = buffer.toBase64();//qDebug() << "send data is " << base64Data;++seq;jsonObj["md5"] = _file_md5;jsonObj["name"] = fileName;jsonObj["seq"] = seq;jsonObj["trans_size"] = buffer.size() + (seq-1)*MAX_FILE_LEN;jsonObj["total_size"] = total_size;if(buffer.size() + (seq-1)*MAX_FILE_LEN == total_size){jsonObj["last"] = 1;}else{jsonObj["last"] = 0;}jsonObj["data"]= base64Data;jsonObj["last_seq"] = last_seq;QJsonDocument doc(jsonObj);auto send_data = doc.toJson();TcpClient::Inst().sendMsg(ID_UPLOAD_FILE_REQ, send_data);LogicMgr::Inst()->AddMD5File(_file_md5, fileInfo);//关闭文件file.close();}
收到响应后续传
当客户端收到服务器的回包后,解析后传递给LogicMgr, LogicMgr中需要将后续的报文段发送给服务器。我们封装如下逻辑
void LogicWorker::InitHandlers(){//注册上传消息_handlers[ID_UPLOAD_FILE_RSP] = [this](QJsonObject obj){auto err = obj["error"].toInt();if(err != RSP_SUCCESS){qDebug() << "upload msg rsp err is " << err;return;}auto name = obj["name"].toString();auto total_size = obj["total_size"].toInt();auto trans_size = obj["trans_size"].toInt();auto md5 = obj["md5"].toString();auto seq = obj["seq"].toInt();qDebug() << "recv : " << name << " file trans_size is " << trans_size;emit sig_trans_size(trans_size);//判断trans_size是否和total_size相等if(total_size == trans_size){return;}auto file_info = LogicMgr::Inst()->GetFileInfo(md5);if(!file_info){return;}//再次组织数据发送QFile file(file_info->filePath());if (!file.open(QIODevice::ReadOnly)) {qWarning() << "Could not open file:" << file.errorString();return;}//文件偏移到已经发送的位置,继续读取发送file.seek(trans_size);if(LogicMgr::Inst()->Pause()){return ;}QByteArray buffer;seq++;//每次读取2048字节发送buffer = file.read(MAX_FILE_LEN);QJsonObject jsonObj;// 将文件内容转换为 Base64 编码(可选)QString base64Data = buffer.toBase64();jsonObj["md5"] = md5;jsonObj["name"] = file_info->fileName();jsonObj["seq"] = seq;jsonObj["trans_size"] = buffer.size() + (seq-1)*MAX_FILE_LEN;jsonObj["total_size"] = total_size;if(buffer.size() + (seq-1)*MAX_FILE_LEN == total_size){jsonObj["last"] = 1;}else{jsonObj["last"] = 0;}jsonObj["data"]= base64Data;jsonObj["last_seq"] = obj["last_seq"].toInt();QJsonDocument doc(jsonObj);auto send_data = doc.toJson();TcpClient::Inst().sendMsg(ID_UPLOAD_FILE_REQ, send_data);file.close();};}
其中sig_trans_size为信号,通知主界面显示进度
暂停和续传
客户端需增加暂停和续传按钮,支持传说过程中暂停,点击后再继续上传等功能
void MainWindow::slot_pause_continue(){//续传状态或者初始状态,按下暂停按钮if(_cur_state == INIT || _cur_state == CONTINUE){//设置当前状态为暂停状态_b_pause = true;ui->pauseBtn->setText("继续");_cur_state = PAUSE;LogicMgr::Inst()->SetPause(true);return;}//判断当前为暂停状态,则点击后开启续传if(_cur_state == PAUSE){_b_pause = false;ui->pauseBtn->setText("暂停");_cur_state = CONTINUE ;LogicMgr::Inst()->SetPause(false);//发送请求获取文件信息,继续上传auto file_info = LogicMgr::Inst()->GetFileInfo(_file_md5);QJsonObject jsonObj;jsonObj["md5"] = _file_md5;QJsonDocument doc(jsonObj);auto send_data = doc.toJson();TcpClient::Inst().sendMsg(ID_SYNC_FILE_REQ, send_data);return;}}
这里继续上传需要请求一下服务器,同步之前的上传进度。
我们添加了新的协议ID_SYNC_FILE_REQ, 服务器收到后将状态和进度返回,客户端响应
_handlers[ID_SYNC_FILE_RSP] = [this](QJsonObject obj){auto err = obj["error"].toInt();if(err != RSP_SUCCESS){qDebug() << " msg rsp err is " << err;return;}auto md5 = obj["md5"].toString();auto seq = obj["seq"].toInt();auto total_size = obj["total_size"].toInt();auto file_info = LogicMgr::Inst()->GetFileInfo(md5);if(!file_info){qDebug() << "not found file" ;return;}//再次组织数据发送QFile file(file_info->filePath());if (!file.open(QIODevice::ReadOnly)) {qWarning() << "Could not open file:" << file.errorString();return;}auto trans_size = obj["trans_size"].toInt();//文件偏移到已经发送的位置,继续读取发送file.seek(trans_size);if(LogicMgr::Inst()->Pause()){return ;}QByteArray buffer;seq++;//每次读取2048字节发送buffer = file.read(MAX_FILE_LEN);QJsonObject jsonObj;// 将文件内容转换为 Base64 编码(可选)QString base64Data = buffer.toBase64();jsonObj["md5"] = md5;jsonObj["name"] = file_info->fileName();jsonObj["seq"] = seq;jsonObj["trans_size"] = buffer.size() + (seq-1)*MAX_FILE_LEN;jsonObj["total_size"] = total_size;if(buffer.size() + (seq-1)*MAX_FILE_LEN == total_size){jsonObj["last"] = 1;}else{jsonObj["last"] = 0;}jsonObj["data"]= base64Data;jsonObj["last_seq"] = obj["last_seq"].toInt();QJsonDocument doc(jsonObj);auto send_data = doc.toJson();TcpClient::Inst().sendMsg(ID_UPLOAD_FILE_REQ, send_data);file.close();};
客户端根据返回的进度,按照偏移量读取指定文件,并且继续上报。
如果健壮一点,可以判断服务器返回的错误信息,根据错误,提示主界面做出交互显示等。这里不再赘述。
到此客户端设计完成。
单线程服务器改造
单线程服务器改造不大,只需要增加同步文件进度信息的处理逻辑,以及优化之前的上传处理逻辑即可
_fun_callbacks[ID_UPLOAD_FILE_REQ] = [this](shared_ptr<CSession> session, const short& msg_id,const string& msg_data) {Json::Reader reader;Json::Value root;reader.parse(msg_data, root);auto data = root["data"].asString();//std::cout << "recv file data is " << data << std::endl;Json::Value rtvalue;Defer defer([this, &rtvalue, session]() {std::string return_str = rtvalue.toStyledString();session->Send(return_str, ID_UPLOAD_FILE_RSP);});// 解码std::string decoded = base64_decode(data);auto md5 = root["md5"].asString();auto seq = root["seq"].asInt();auto name = root["name"].asString();auto total_size = root["total_size"].asInt();auto trans_size = root["trans_size"].asInt();auto file_path = ConfigMgr::Inst().GetFileOutPath();auto file_path_str = (file_path / name).string();std::cout << "file_path_str is " << file_path_str << std::endl;if (seq != 1) {auto iter = _map_md5_files.find(md5);if (iter == _map_md5_files.end()) {rtvalue["error"] = ErrorCodes::FileNotExists;return;}}std::ofstream outfile;//第一个包if (seq == 1) {// 打开文件,如果存在则清空,不存在则创建outfile.open(file_path_str, std::ios::binary | std::ios::trunc);//构造数据存储auto file_info = std::make_shared<FileInfo>();file_info->_file_path_str = file_path_str;file_info->_name = name;file_info->_seq = seq;file_info->_total_size = total_size;file_info->_trans_size = trans_size;std::lock_guard<std::mutex> lock(_file_mtx);_map_md5_files[md5] = file_info;}else {// 保存为文件outfile.open(file_path_str, std::ios::binary | std::ios::app);std::lock_guard<std::mutex> lock(_file_mtx);auto file_info = _map_md5_files[md5];file_info->_seq = seq;file_info->_trans_size = trans_size;}if (!outfile) {std::cerr << "无法打开文件进行写入。" << std::endl;return ;}outfile.write(decoded.data(), decoded.size());if (!outfile) {std::cerr << "写入文件失败。" << std::endl;return ;}outfile.close();std::cout << "文件已成功保存为: " << name << std::endl;rtvalue["error"] = ErrorCodes::Success;rtvalue["total_size"] = total_size;rtvalue["seq"] = seq;rtvalue["name"] = name;rtvalue["trans_size"] = trans_size;rtvalue["md5"] = md5;};_fun_callbacks[ID_SYNC_FILE_REQ] = [this](shared_ptr<CSession> session, const short& msg_id,const string& msg_data) {Json::Reader reader;Json::Value root;reader.parse(msg_data, root);Json::Value rtvalue;Defer defer([this, &rtvalue, session]() {std::string return_str = rtvalue.toStyledString();session->Send(return_str, ID_SYNC_FILE_RSP);});auto md5 = root["md5"].asString();auto iter = _map_md5_files.find(md5);if (iter == _map_md5_files.end()) {rtvalue["error"] = ErrorCodes::FileNotExists;return;}rtvalue["error"] = ErrorCodes::Success;rtvalue["total_size"] = iter->second->_total_size;rtvalue["seq"] = iter->second->_seq;rtvalue["name"] = iter->second->_name;rtvalue["trans_size"] = iter->second->_trans_size;rtvalue["md5"] = md5;};
多线程服务器
多线程服务器改造和单线程类似
只不过将处理逻辑放入LogicWorker中
void LogicWorker::RegisterCallBacks(){_fun_callbacks[ID_TEST_MSG_REQ] = [this](shared_ptr<CSession> session, const short& msg_id,const string& msg_data) {Json::Reader reader;Json::Value root;reader.parse(msg_data, root);auto data = root["data"].asString();std::cout << "recv test data is " << data << std::endl;Json::Value rtvalue;Defer defer([this, &rtvalue, session]() {std::string return_str = rtvalue.toStyledString();session->Send(return_str, ID_TEST_MSG_RSP);});rtvalue["error"] = ErrorCodes::Success;rtvalue["data"] = data;};_fun_callbacks[ID_UPLOAD_FILE_REQ] = [this](shared_ptr<CSession> session, const short& msg_id,const string& msg_data) {Json::Reader reader;Json::Value root;reader.parse(msg_data, root);auto md5 = root["md5"].asString();auto seq = root["seq"].asInt();auto name = root["name"].asString();auto total_size = root["total_size"].asInt();auto trans_size = root["trans_size"].asInt();auto last = root["last"].asInt();auto file_data = root["data"].asString();auto file_path = ConfigMgr::Inst().GetFileOutPath();auto file_path_str = (file_path / name).string();Json::Value rtvalue;Defer defer([this, &rtvalue, session]() {std::string return_str = rtvalue.toStyledString();session->Send(return_str, ID_UPLOAD_FILE_RSP);});// 使用 std::hash 对字符串进行哈希std::hash<std::string> hash_fn;size_t hash_value = hash_fn(name); // 生成哈希值int index = hash_value % FILE_WORKER_COUNT;std::cout << "Hash value: " << hash_value << std::endl;//第一个包if (seq == 1) {//构造数据存储auto file_info = std::make_shared<FileInfo>();file_info->_file_path_str = file_path_str;file_info->_name = name;file_info->_seq = seq;file_info->_total_size = total_size;file_info->_trans_size = trans_size;LogicSystem::GetInstance()->AddMD5File(md5, file_info);}else {auto file_info = LogicSystem::GetInstance()->GetFileInfo(md5);if (file_info == nullptr) {rtvalue["error"] = ErrorCodes::FileNotExists;return;}file_info->_seq = seq;file_info->_trans_size = trans_size;}FileSystem::GetInstance()->PostMsgToQue(std::make_shared<FileTask>(session, name, seq, total_size,trans_size, last, file_data),index);rtvalue["error"] = ErrorCodes::Success;rtvalue["total_size"] = total_size;rtvalue["seq"] = seq;rtvalue["name"] = name;rtvalue["trans_size"] = trans_size;rtvalue["last"] = last;rtvalue["md5"] = md5;};_fun_callbacks[ID_SYNC_FILE_REQ] = [this](shared_ptr<CSession> session, const short& msg_id,const string& msg_data) {Json::Reader reader;Json::Value root;reader.parse(msg_data, root);Json::Value rtvalue;Defer defer([this, &rtvalue, session]() {std::string return_str = rtvalue.toStyledString();session->Send(return_str, ID_SYNC_FILE_RSP);});auto md5 = root["md5"].asString();auto file = LogicSystem::GetInstance()->GetFileInfo(md5);if (file == nullptr) {rtvalue["error"] = ErrorCodes::FileNotExists;return;}rtvalue["error"] = ErrorCodes::Success;rtvalue["total_size"] = file->_total_size;rtvalue["seq"] = file->_seq;rtvalue["name"] = file->_name;rtvalue["trans_size"] = file->_trans_size;rtvalue["md5"] = md5;};}
将进度信息存储在LogicSystem中,后续可参考填写入redis,方便后续分布式扩展,注意如果填写了多个资源服务器,还有写入服务器信息,这个不再赘述和进阶,我们只用一个资源服务器做演示,后续读者可自己进阶分布式设计。
void LogicSystem::AddMD5File(std::string md5, std::shared_ptr<FileInfo> fileinfo) {std::lock_guard<std::mutex> lock(_file_mtx);_map_md5_files[md5] = fileinfo;}std::shared_ptr<FileInfo> LogicSystem::GetFileInfo(std::string md5) {std::lock_guard<std::mutex> lock(_file_mtx);auto iter = _map_md5_files.find(md5);if (iter == _map_md5_files.end()) {return nullptr;}return iter->second;}
集成资源服务器
新架构形式
集成资源服务器后的架构为

将上述多线程服务器,整合到项目目录,同时设置资源属性表,复用之前的就可以了。
注意资源服务器配置要稍作修改
[GateServer]Port = 8080[VarifyServer]Host = 127.0.0.1Port = 50051[StatusServer]Host = 127.0.0.1Port = 50052[SelfServer]Name = reserverHost = 0.0.0.0Port = 9090RPCPort = 51055[Mysql]Host = 81.68.86.146Port = 3308User = rootPasswd = 123456.Schema = llfc[Redis]Host = 81.68.86.146Port = 6380Passwd = 123456[Static]Path = static[Output]Path = bin
客户端新增资源网络类
构造函数解析
因为客户端需要长连接资源服务器,采用TCP方式上传文件,所以需要封装一个单例的FileTcpMgr类,用于上传资源。
class FileTcpMgr : public QObject, public Singleton<FileTcpMgr>,public std::enable_shared_from_this<FileTcpMgr>{Q_OBJECTpublic:friend class Singleton<FileTcpMgr>;~FileTcpMgr();void SendData(ReqId reqId, QByteArray data);void CloseConnection();private:void initHandlers();explicit FileTcpMgr(QObject *parent = nullptr);void registerMetaType();void handleMsg(ReqId id, int len, QByteArray data);QTcpSocket _socket;QString _host;uint16_t _port;QByteArray _buffer;bool _b_recv_pending;quint16 _message_id;quint32 _message_len;QMap<ReqId, std::function<void(ReqId id, int len, QByteArray data)>> _handlers;//发送队列QQueue<QByteArray> _send_queue;//正在发送的包QByteArray _current_block;//当前已发送的字节数qint64 _bytes_sent;//是否正在发送bool _pending;signals:void sig_send_data(ReqId reqId, QByteArray data);void sig_con_success(bool bsuccess);void sig_connection_closed();public slots:void slot_send_data(ReqId reqId, QByteArray data);void slot_tcp_connect(std::shared_ptr<ServerInfo> si);};
构造函数具体实现
FileTcpMgr::FileTcpMgr(QObject *parent) : QObject(parent),_host(""), _port(0), _b_recv_pending(false), _message_id(0), _message_len(0), _bytes_sent(0), _pending(false){registerMetaType();QObject::connect(&_socket, &QTcpSocket::connected, this, [&]() {qDebug() << "Connected to server!";emit sig_con_success(true);});QObject::connect(&_socket, &QTcpSocket::readyRead, this, [&]() {// 当有数据可读时,读取所有数据// 读取所有数据并追加到缓冲区_buffer.append(_socket.readAll());QDataStream stream(&_buffer, QIODevice::ReadOnly);stream.setVersion(QDataStream::Qt_5_0);forever {//先解析头部if(!_b_recv_pending){// 检查缓冲区中的数据是否足够解析出一个消息头(消息ID + 消息长度)if (_buffer.size() < FILE_UPLOAD_HEAD_LEN) {return; // 数据不够,等待更多数据}// 预读取消息ID和消息长度,但不从缓冲区中移除stream >> _message_id >> _message_len;//将buffer 中的前六个字节移除_buffer = _buffer.mid(FILE_UPLOAD_HEAD_LEN);// 输出读取的数据qDebug() << "Message ID:" << _message_id << ", Length:" << _message_len;}//buffer剩余长读是否满足消息体长度,不满足则退出继续等待接受if(_buffer.size() < _message_len){_b_recv_pending = true;return;}_b_recv_pending = false;// 读取消息体QByteArray messageBody = _buffer.mid(0, _message_len);qDebug() << "receive body msg is " << messageBody ;_buffer = _buffer.mid(_message_len);handleMsg(ReqId(_message_id),_message_len, messageBody);}});//5.15 之后版本// QObject::connect(&_socket, QOverload<QAbstractSocket::SocketError>::of(&QTcpSocket::errorOccurred), [&](QAbstractSocket::SocketError socketError) {// Q_UNUSED(socketError)// qDebug() << "Error:" << _socket.errorString();// });// 处理错误(适用于Qt 5.15之前的版本)QObject::connect(&_socket, static_cast<void (QTcpSocket::*)(QTcpSocket::SocketError)>(&QTcpSocket::error),this,[&](QTcpSocket::SocketError socketError) {qDebug() << "Error:" << _socket.errorString() ;//todo... 根据错误类型做不同的处理switch (socketError) {case QTcpSocket::ConnectionRefusedError:qDebug() << "Connection Refused!";emit sig_con_success(false);break;case QTcpSocket::RemoteHostClosedError:qDebug() << "Remote Host Closed Connection!";break;case QTcpSocket::HostNotFoundError:qDebug() << "Host Not Found!";emit sig_con_success(false);break;case QTcpSocket::SocketTimeoutError:qDebug() << "Connection Timeout!";emit sig_con_success(false);break;case QTcpSocket::NetworkError://qDebug() << "Network Error!";break;default://qDebug() << "Other Error!";break;}});// 处理连接断开QObject::connect(&_socket, &QTcpSocket::disconnected, this,[&]() {qDebug() << "Disconnected from server.";emit sig_connection_closed();});//连接发送信号用来发送数据QObject::connect(this, &FileTcpMgr::sig_send_data, this, &FileTcpMgr::slot_send_data);//连接发送信号QObject::connect(&_socket, &QTcpSocket::bytesWritten, this, [this](qint64 bytes) {//更新发送数据_bytes_sent += bytes;//未发送完整if (_bytes_sent < _current_block.size()) {//继续发送auto data_to_send = _current_block.mid(_bytes_sent);_socket.write(data_to_send);return;}//发送完全,则查看队列是否为空if (_send_queue.isEmpty()) {//队列为空,说明已经将所有数据发送完成,将pending设置为false,这样后续要发送数据时可以继续发送_current_block.clear();_pending = false;_bytes_sent = 0;return;}//队列不为空,则取出队首元素_current_block = _send_queue.dequeue();_bytes_sent = 0;_pending = true;qint64 w2 = _socket.write(_current_block);qDebug() << "[TcpMgr] Dequeued and write() returned" << w2;});//连接QObject::connect(this, &FileTcpMgr::sig_close, this, &FileTcpMgr::slot_tcp_close);//注册消息initHandlers();}
简单描述下上述构造函数做的事情:
- 成功连接服务器后,会触发
QTcpSocket::connected信号,从而回调lambda表达式,发送sig_con_success信号 - 接收服务器传输的数据,会触发
QTcpSocket::readyRead信号,从而回调lambda表达式,在这里处理头部信息和包体信息。进行TLV协议解析后回调handleMsg。 - 捕获
QTcpSocket::SocketError信号,当出错后回调lambda表达式发送信号通知主界面错误。 - 捕获连接断开信号
QTcpSocket::disconnected,回调lambda表达式,通知主界面连接断开。 - 连接发送信号
sig_send_data,因为socket在独立线程,不能直接调用发送,所以改为异步发送,触发槽函数slot_send_data - 因为异步发送,可能存在未发送完全的情况,所以我们用
QTcpSocket::bytesWritten来检测发送了多少字节,通过lambda表达式回调处理,继续发送数据。 - 因为
socket被独立为单独线程,所以关闭也不能直接调用close(socket), 需要统一在槽函数中处理。
注册元对象系统的逻辑不再赘述。
连接槽函数
我们实现槽函数slot_tcp_connect用来创建客户端到资源服务器的连接
void FileTcpMgr::slot_tcp_connect(std::shared_ptr<ServerInfo> si){qDebug()<< "receive tcp connect signal";// 尝试连接到服务器qDebug() << "Connecting to server...";_host = si->_res_host;_port = static_cast<uint16_t>(si->_res_port.toUInt());_socket.connectToHost(_host, _port);}
注册处理流程
注册上传头像回调逻辑
void FileTcpMgr::initHandlers(){//todo 接收上传用户头像回复_handlers.insert(ID_UPLOAD_HEAD_ICON_RSP, [this](ReqId id, int len, QByteArray data){Q_UNUSED(len);qDebug()<< "handle id is "<< id ;// 将QByteArray转换为QJsonDocumentQJsonDocument jsonDoc = QJsonDocument::fromJson(data);// 检查转换是否成功if(jsonDoc.isNull()){qDebug() << "Failed to create QJsonDocument.";return;}QJsonObject recvObj = jsonDoc.object();qDebug()<< "data jsonobj is " << recvObj ;if(!recvObj.contains("error")){int err = ErrorCodes::ERR_JSON;qDebug() << "icon upload_failed, err is Json Parse Err" << err ;//todo ... 提示上传失败//emit upload_failed();return;}int err = recvObj["error"].toInt();if(err != ErrorCodes::SUCCESS){qDebug() << "Login Failed, err is " << err ;//emit upload_failed();return;}auto md5 = recvObj["md5"].toString();auto seq = recvObj["seq"].toInt();auto trans_size = recvObj["trans_size"].toInt();auto uid = recvObj["uid"].toInt();auto total_size = recvObj["total_size"].toInt();auto name = recvObj["name"].toString();qDebug() << "recv : " << name << "file trans_size is " << trans_size;//判断trans_size和total_size相等if(total_size == trans_size){return;}auto file_info = UserMgr::GetInstance()->GetFileInfoByMD5(md5);if(!file_info){return;}//再次组织数据发送QFile file(file_info->filePath());if(!file.open(QIODevice::ReadOnly)){qWarning() << "Could not open file: " << file.errorString();return;}//文件偏移到已经发送的位置,继续读取发送file.seek(trans_size);QByteArray buffer;seq ++;//每次读取2048字节发送buffer = file.read(MAX_FILE_LEN);QJsonObject sendObj;//将文件内容转换为base64编码QString base64Data = buffer.toBase64();sendObj["md5"] = md5;sendObj["name"] = file_info->fileName();sendObj["seq"] = seq;sendObj["trans_size"] = buffer.size() + (seq-1)*MAX_FILE_LEN;sendObj["total_size"] = total_size;if(buffer.size() + (seq-1)*MAX_FILE_LEN >= total_size){sendObj["last"] = 1;}else{sendObj["last"] = 0;}sendObj["data"] = base64Data;sendObj["last_seq"] = recvObj["last_seq"].toInt();sendObj["uid"] = uid;QJsonDocument doc(sendObj);auto send_data = doc.toJson();SendData(ID_UPLOAD_HEAD_ICON_REQ, send_data);file.close();});}
独立文件线程
对于上传我们独立到文件上报线程中
class FileTcpThread: public std::enable_shared_from_this<FileTcpThread>{public:FileTcpThread();~FileTcpThread();private:QThread * _file_tcp_thread;};
具体实现
FileTcpThread::FileTcpThread(){_file_tcp_thread = new QThread();FileTcpMgr::GetInstance()->moveToThread(_file_tcp_thread);QObject::connect(_file_tcp_thread, &QThread::finished, _file_tcp_thread, &QObject::deleteLater);_file_tcp_thread->start();}FileTcpThread::~FileTcpThread(){_file_tcp_thread->quit();}
主函数调用
#include "mainwindow.h"#include <QApplication>#include <QFile>#include "global.h"#include "tcpmgr.h"#include "filetcpmgr.h"int main(int argc, char *argv[]){QApplication a(argc, argv);QFile qss(":/style/stylesheet.qss");if( qss.open(QFile::ReadOnly)){qDebug("open success");QString style = QLatin1String(qss.readAll());a.setStyleSheet(style);qss.close();}else{qDebug("Open failed");}// 获取当前应用程序的路径QString app_path = QCoreApplication::applicationDirPath();// 拼接文件名QString fileName = "config.ini";QString config_path = QDir::toNativeSeparators(app_path +QDir::separator() + fileName);QSettings settings(config_path, QSettings::IniFormat);QString gate_host = settings.value("GateServer/host").toString();QString gate_port = settings.value("GateServer/port").toString();gate_url_prefix = "http://"+gate_host+":"+gate_port;//启动tcp线程TcpThread tcpthread;//启动资源网络线程FileTcpThread file_tcp_thread;MainWindow w;w.show();return a.exec();}
原来的登录流程稍作修改,连接好ChatServer后,连接ResourceServer, 最后再让用户登录。
服务器逻辑
服务器新增文件上报逻辑处理, 在LogicWorker::RegisterCallBacks中添加
_fun_callbacks[ID_UPLOAD_HEAD_ICON_REQ] = [this](shared_ptr<CSession> session, const short& msg_id,const string& msg_data) {Json::Reader reader;Json::Value root;reader.parse(msg_data, root);auto md5 = root["md5"].asString();auto seq = root["seq"].asInt();auto name = root["name"].asString();auto total_size = root["total_size"].asInt();auto trans_size = root["trans_size"].asInt();auto last = root["last"].asInt();auto file_data = root["data"].asString();auto uid = root["uid"].asInt();auto token = root["token"].asString();auto last_seq = root["last_seq"].asInt();//转化为字符串auto uid_str = std::to_string(uid);auto file_path = ConfigMgr::Inst().GetFileOutPath();auto file_path_str = (file_path / uid_str / name).string();Json::Value rtvalue;Defer defer([this, &rtvalue, session]() {std::string return_str = rtvalue.toStyledString();session->Send(return_str, ID_UPLOAD_HEAD_ICON_RSP);});//第一个包校验一下token是否合理if (seq == 1) {//从redis获取用户token是否正确std::string uid_str = std::to_string(uid);std::string token_key = USERTOKENPREFIX + uid_str;std::string token_value = "";bool success = RedisMgr::GetInstance()->Get(token_key, token_value);if (!success) {rtvalue["error"] = ErrorCodes::UidInvalid;return;}if (token_value != token) {rtvalue["error"] = ErrorCodes::TokenInvalid;return;}}// 使用 std::hash 对字符串进行哈希std::hash<std::string> hash_fn;size_t hash_value = hash_fn(name); // 生成哈希值int index = hash_value % FILE_WORKER_COUNT;std::cout << "Hash value: " << hash_value << std::endl;//第一个包if (seq == 1) {//构造数据存储auto file_info = std::make_shared<FileInfo>();file_info->_file_path_str = file_path_str;file_info->_name = name;file_info->_seq = seq;file_info->_total_size = total_size;file_info->_trans_size = trans_size;LogicSystem::GetInstance()->AddMD5File(md5, file_info);}else {auto file_info = LogicSystem::GetInstance()->GetFileInfo(md5);if (file_info == nullptr) {rtvalue["error"] = ErrorCodes::FileNotExists;return;}file_info->_seq = seq;file_info->_trans_size = trans_size;}FileSystem::GetInstance()->PostMsgToQue(std::make_shared<FileTask>(session, file_path_str, name, seq, total_size,trans_size, last, file_data),index);rtvalue["error"] = ErrorCodes::Success;rtvalue["total_size"] = total_size;rtvalue["seq"] = seq;rtvalue["name"] = name;rtvalue["trans_size"] = trans_size;rtvalue["last"] = last;rtvalue["md5"] = md5;rtvalue["uid"] = uid;rtvalue["last_seq"] = last_seq;};
源码
https://gitee.com/secondtonone1/llfcchat
效果展示:
上传前

上传后

服务器存储成功
