聊天信息持久化存储方案

需求分析

我们希望客户端在登录后,从服务器拉取聊天信息,并且展示。常规的设计中,客户端本地也会有一个数据库,缓存上一次获取的最后的聊天信息,如果客户端下线了,再次登录,只需要从服务器拉取未接受的数据即可。

所以综合考虑过后将需求列出

  1. 客户端本地数据库缓存已经接受的消息(以后再做)
  2. 客户端登录后,将本地数据的最大的消息id发送给服务器,服务器根据这个id去数据库查找,找到比这个id大的消息,将消息回传给客户端
  3. 客户端登录后,先加载旧的数据,再差异加载未读取的数据即可。

客户端本地数据库存储放在之后实现,所以我们客户端目前只发送消息id为0即可。

数据模型设计

  1. 消息唯一标识
    • 在服务器端的 MySQL 表里,为每条消息分配一个全局唯一的自增主键(message_id),再配合时间戳(created_at)。
    • 客户端本地用同样的 message_id 做主键,这样就能很方便地做增量同步与去重。
  2. 会话/用户维度的索引
    • 如果支持多对多(群聊),再维护一个会话表(thread_id)和用户—会话关联表。
    • 查询和分页时,都按 (thread_id, message_id)(thread_id, created_at) 建复合索引,加速筛选。

同步流程

  1. 客户端登录时

    1. 从本地 SQLite 加载最近 N 条消息(按 message_id 或时间倒序),渲染到界面。

    2. 读取本地记录的「每个会话已同步到的最大 message_id」,发送给服务器:

      1. {
      2. "action": "fetch_messages",
      3. "thread_id": 123,
      4. "since_id": 3456
      5. }
  2. 服务器端响应

    • 查询 WHERE thread_id=123 AND message_id>3456 ORDER BY message_id ASC LIMIT 1000
    • 返回消息列表(可以分页返回,大量时前端可循环拉取,或返回 has_more 标记)。
  3. 客户端接收并保存

    • 将服务器返回的消息批量插入本地 SQLite,注意用「主键冲突忽略(INSERT OR IGNORE)」防止重复。
    • 更新本地「已同步最大 message_id」。
  4. 后续聊天时

    • 新消息既推到服务器,也实时写入本地 SQLite。
    • 如果走长连接(Asio + 自定义协议或使用 WebSocket),服务器收到新消息后直接广播给在线客户端,并提示客户端写到本地。
    • 如果客户端离线,新消息积累在服务器,下一次登录再按 above 流程拉取。

聊天消息表

下面给出消息聊天表的字段和解释,包含了message_id, thread_id以及常见的其他字段

  1. CREATE TABLE `chat_message` (
  2. `message_id` BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,
  3. `thread_id` BIGINT UNSIGNED NOT NULL,
  4. `sender_id` BIGINT UNSIGNED NOT NULL,
  5. `recv_id` BIGINT UNSIGNED NOT NULL,
  6. `content` TEXT NOT NULL,
  7. `created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
  8. `updated_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  9. `status` TINYINT NOT NULL DEFAULT 0 COMMENT '0=未读 1=已读 2=撤回',
  10. PRIMARY KEY (`message_id`),
  11. KEY `idx_thread_created` (`thread_id`, `created_at`),
  12. KEY `idx_thread_message` (`thread_id`, `message_id`)
  13. ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;

字段说明

  • message_id:全局自增主键,唯一标识一条消息。
  • thread_id:会话(单聊、群聊)ID,同一会话下的所有消息共用一个 thread_id
  • sender_id:发送者用户 ID,指向用户表的主键。
  • recv_id : 接收者用户ID,指向用户表主键
  • content:消息正文,TEXT 类型,适合存储普通文字。
  • created_at:消息创建时间,自动记录插入时刻。
  • updated_at:消息更新时间,可用于标记“撤回”(status 变更)、编辑等操作。
  • status:消息状态,用于标记未读/已读/撤回等(也可扩展更多状态)。

索引设计

  1. 主键索引PRIMARY KEY (message_id) 用于唯一检索消息。
  2. 会话+时间索引KEY (thread_id, created_at) 支持按会话分页、按时间范围查询。
  3. 会话+消息ID 索引KEY (thread_id, message_id) 支持按 message_id 做增量拉取(WHERE thread_id=… AND message_id > since_id)。

可选扩展

  • 群聊用户表:如果支持群聊,需要一个 thread_member 表,记录每个 thread_id 下的成员及其角色。
  • 附件支持:若要存储图片/文件,可额外建 message_attachment 表,字段例如 attachment_idmessage_idfile_urlfile_type
  • 已读回执:单独设计 message_read 表,记录哪些用户在何时已读了该消息,字段如 (message_id, user_id, read_at)

会话消息表

全局聊天线程表

建立chat_thread主表,给它一个全局自增id,记录所有私聊/群聊的线程统一入口

  1. CREATE TABLE chat_thread (
  2. `id` BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,
  3. `type` ENUM('private','group') NOT NULL,
  4. `created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
  5. PRIMARY KEY (id)
  6. );

单聊表设计

对于单聊,只有两个人,所以可以直接在private_chat表中定义两个字段存储user1_id和user2_id,这样能直接确定参与者

  1. CREATE TABLE `private_chat` (
  2. `thread_id` BIGINT UNSIGNED NOT NULL COMMENT '引用chat_thread.id',
  3. `user1_id` BIGINT UNSIGNED NOT NULL,
  4. `user2_id` BIGINT UNSIGNED NOT NULL,
  5. `created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
  6. PRIMARY KEY (`thread_id`),
  7. UNIQUE KEY `uniq_private_thread` (`user1_id`, `user2_id`), -- 保证每对用户只能有一个私聊会话
  8. -- 以下两行就是我们要额外加的复合索引
  9. KEY `idx_private_user1_thread` (`user1_id`, `thread_id`),
  10. KEY `idx_private_user2_thread` (`user2_id`, `thread_id`)
  11. ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
  • 通过 user1_iduser2_id 唯一确定一个单聊会话
  • 询某两个用户的单聊时,直接 SELECT 即可。

群聊表设计

群聊相较于单聊要复杂一些,需要记录每个群聊的多名成员及其角色、权限等信息

先建一个独立的会话(线程)表:

  1. CREATE TABLE `group_chat` (
  2. `thread_id` BIGINT UNSIGNED NOT NULL COMMENT '引用chat_thread.id',
  3. `name` VARCHAR(255) DEFAULT NULL COMMENT '群聊名称',
  4. `created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
  5. PRIMARY KEY (`thread_id`)
  6. ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
  • 群聊会话表只存储群聊本身的信息(如群名称、创建时间等),thread_id 是唯一标识符

群聊成员表设计

  • 群聊成员表用于存储群聊中各成员的信息(包括角色、加入时间、禁言等)。
  1. CREATE TABLE `group_chat_member` (
  2. `thread_id` BIGINT UNSIGNED NOT NULL COMMENT '引用 group_chat_thread.thread_id',
  3. `user_id` BIGINT UNSIGNED NOT NULL COMMENT '引用 user.user_id',
  4. `role` TINYINT NOT NULL DEFAULT 0 COMMENT '0=普通成员,1=管理员,2=创建者',
  5. `joined_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
  6. `muted_until` TIMESTAMP NULL COMMENT '如果被禁言,可存到什么时候',
  7. PRIMARY KEY (`thread_id`, `user_id`),
  8. KEY `idx_user_threads` (`user_id`)
  9. ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

前端聊天框调整

回顾

我们先回顾一下之前设计的聊天框

https://cdn.llfc.club/1718417551126.jpg

对于我们自己发出的信息,我们可以实现这样一个网格布局管理

https://cdn.llfc.club/1718423760358.jpg

NameLabel用来显示用户的名字,Bubble用来显示聊天信息,Spacer是个弹簧,保证将NameLabel``,IconLabelBubble等挤压到右侧。

如果是别人发出的消息,我们设置这样一个网格布局

https://cdn.llfc.club/1718497364660.jpg

增加状态标签

因为自己发送的时候要增加发送状态(发送失败,未读,已读)三种,所以考虑将自己发送的消息改为如下

image-20250601121313179

大体结构如下

  1. 0 1 2 3
  2. ┌───────┬───────────┬────────────┬──────────┐
  3. 0 (空) m_pNameLabel m_pIconLabel
  4. col=1, (右对齐+8px)│ (跨两行、靠上)
  5. 未放置)│
  6. ├───────┼───────────┼────────────┴──────────┤
  7. 1 pSpacer m_pStatusLabel m_pBubble m_pIconLabel
  8. (row=1, (聊天气泡) (继续占位)
  9. col=1)
  10. └───────┴───────────┴──────────────────────┘

代码修改如下

  1. ChatItemBase::ChatItemBase(ChatRole role, QWidget *parent)
  2. : QWidget(parent)
  3. , m_role(role)
  4. {
  5. m_pNameLabel = new QLabel();
  6. m_pNameLabel->setObjectName("chat_user_name");
  7. QFont font("Microsoft YaHei");
  8. font.setPointSize(9);
  9. m_pNameLabel->setFont(font);
  10. m_pNameLabel->setFixedHeight(20);
  11. m_pIconLabel = new QLabel();
  12. m_pIconLabel->setScaledContents(true);
  13. m_pIconLabel->setFixedSize(42, 42);
  14. m_pBubble = new QWidget();
  15. QGridLayout *pGLayout = new QGridLayout();
  16. pGLayout->setVerticalSpacing(3);
  17. pGLayout->setHorizontalSpacing(3);
  18. pGLayout->setMargin(3);
  19. QSpacerItem*pSpacer = new QSpacerItem(40, 20, QSizePolicy::Expanding, QSizePolicy::Minimum);
  20. //添加状态图标控件
  21. m_pStatusLabel = new QLabel();
  22. m_pStatusLabel->setFixedSize(16, 16);
  23. m_pStatusLabel->setScaledContents(true);
  24. if(m_role == ChatRole::Self)
  25. {
  26. m_pNameLabel->setContentsMargins(0,0,8,0);
  27. m_pNameLabel->setAlignment(Qt::AlignRight);
  28. //名字标签
  29. pGLayout->addWidget(m_pNameLabel, 0,2, 1,1);
  30. //icon 头像
  31. pGLayout->addWidget(m_pIconLabel, 0, 3, 2,1, Qt::AlignTop);
  32. //第 0 列:依然是 pSpacer,占用第 1 行,第 0 列
  33. pGLayout->addItem(pSpacer, 1, 0, 1, 1);
  34. //气泡控件
  35. pGLayout->addWidget(m_pBubble, 1,2, 1,1);
  36. //状态图标
  37. pGLayout->addWidget(m_pStatusLabel, 1, 1, 1, 1, Qt::AlignCenter);
  38. pGLayout->setColumnStretch(0, 2);
  39. pGLayout->setColumnStretch(1, 0); // status 图标 (固定大小)
  40. pGLayout->setColumnStretch(2, 3); // 名字 + 气泡 (主要拉伸区域)
  41. pGLayout->setColumnStretch(3, 0); // 头像 (固定大小)
  42. }else{
  43. m_pNameLabel->setContentsMargins(8,0,0,0);
  44. m_pNameLabel->setAlignment(Qt::AlignLeft);
  45. pGLayout->addWidget(m_pIconLabel, 0, 0, 2,1, Qt::AlignTop);
  46. pGLayout->addWidget(m_pNameLabel, 0,1, 1,1);
  47. pGLayout->addWidget(m_pBubble, 1,1, 1,1);
  48. pGLayout->addItem(pSpacer, 2, 2, 1, 1);
  49. pGLayout->setColumnStretch(1, 3);
  50. pGLayout->setColumnStretch(2, 2);
  51. }
  52. this->setLayout(pGLayout);
  53. }

增加接口设置状态

  1. void ChatItemBase::setStatus(int status) {
  2. if (status == MsgStatus::UN_READ) {
  3. m_pStatusLabel->setPixmap(QPixmap(":/res/unread.png"));
  4. return;
  5. }
  6. if (status == MsgStatus::SEND_FAILE) {
  7. m_pStatusLabel->setPixmap(QPixmap(":/res/send_fail.png"));
  8. return;
  9. }
  10. if (status == MsgStatus::READED) {
  11. m_pStatusLabel->setPixmap(QPixmap(":/res/readed.png"));
  12. return;
  13. }
  14. }

客户端同步流程

客户端本地会有sql记录该用户所有聊天记录最后收到的消息信息,包括message_id,thread_id等,每次客户端登录将本地最大messag_idthread_id发送给服务器,服务器按照每个thread_id将信息恢复给客户端,可支持分页返回。

举例

比如第一次请求,客户端携带message_id为1001,thread_id为22,那么服务器就会去chat_message中升序查找,比message_id(1001)大且thread_id为22的消息,返回20条

客户端拿到20条消息后,可根据最后一个消息messag_id继续请求消息。

所以我们得出一个结论要拉取消息就要有thread_id以及message_id

接下来的情形分为两种

情况1

本地有thread_id,但是在该用户A离线的时候B用户给他发消息,因为他们之前没有聊过天,所以此时B会通知服务器在private_chat表中创建新的thread_id,但是A本地数据库没有这个thread_id,所以A需要在登录时拉取.

拉取就传递目前A本地数据库中最大的thead_id以及自己的user_id给服务器,服务器去查找比这个thread_id大的会话列表返回即可,采取分页的方式,每次加载100个,并配合load_more字段通知客户端是否继续拉取

如果load_more字段为true则客户端继续拉取,传递上次服务器给它同步的最大的thread_id,服务器继续返回比thread_id大的会话列表。

直到load_morefalse,客户端不再拉取。

情况2

如果客户端换了新机器,本地没有记录信息,那么就需要在用户登录后向服务器发送user_idthread_id,thread_id 请求从 0 开始,服务器将返回该用户的所有聊天thread_id,必须分页返回,并且携带 load_more 字段,字段和上面类似。

一个服务器返回的数据格式如下

  1. {
  2. "error":0,
  3. "uid" : 1001,
  4. "load_more":true,
  5. "threads":[
  6. {
  7. "thread_id": 1001,
  8. "type": "private",
  9. "user1_id": 1019,
  10. "user2_id": 1020
  11. },
  12. {
  13. "thread_id": 1002,
  14. "type": "group",
  15. "user1_id": 0,
  16. "user2_id": 0,
  17. },
  18. {
  19. "thread_id": 1003,
  20. "type": "private",
  21. "user1_id": 1019,
  22. "user2_id": 1021
  23. },
  24. {
  25. "thread_id": 1004,
  26. "type": "group",
  27. "user1_id": 0,
  28. "user2_id": 0
  29. }
  30. ]
  31. }

可采用如下sql语句查询

  1. -- 1) CTE 把私聊/群聊合并好
  2. WITH all_threads AS (
  3. SELECT
  4. thread_id,
  5. 'private' AS type,
  6. user1_id,
  7. user2_id
  8. FROM private_chat
  9. WHERE (user1_id = :me OR user2_id = :me)
  10. AND thread_id > :last_id
  11. UNION ALL
  12. SELECT
  13. thread_id,
  14. 'group' AS type,
  15. NULL AS user1_id,
  16. NULL AS user2_id
  17. FROM group_chat_member
  18. WHERE user_id = :me
  19. AND thread_id > :last_id
  20. )
  21. -- 2) thread_id 升序,取 page_size+1
  22. SELECT *
  23. FROM all_threads
  24. ORDER BY thread_id
  25. LIMIT :page_size + 1;

然后在服务端(伪代码)处理结果:

  1. def fetch_threads(me, last_id, page_size):
  2. rows = db.query(sql, { "me": me, "last_id": last_id, "page_size": page_size })
  3. # rows 最多有 page_size+1 条
  4. if len(rows) > page_size:
  5. load_more = True
  6. rows = rows[:-1] # 丢掉第 page_size+1 条
  7. else:
  8. load_more = False
  9. # 更新下一次游标:取最后一条的 thread_id
  10. if rows:
  11. next_last_id = rows[-1]["thread_id"]
  12. else:
  13. next_last_id = last_id
  14. return {
  15. "data": rows,
  16. "next_last_id": next_last_id,
  17. "load_more": load_more
  18. }

说明

  1. 为什么要多取 1 条?
    • page_size + 1 条后,如果结果确实多出那 1 条,就说明“在本页之后”还有数据;
    • 如果正好只有 page_size 条或更少,就可以断定已经取尽。
  2. 游标(cursor)模式 vs OFFSET
    • 用游标(thread_id > last_id)可以保证性能,避免大 OFFSET 带来的全表扫描。
    • 每次请求只跑新数据所在的索引范围。
  3. 客户端流程
    • 初次加载:传 last_id = 0
    • 点「加载更多」:传上次接口返回的 next_last_id
    • 收到 load_more = false:表示已到末尾,不要再发更多请求。

当然为了提升效率,可以在用户登录后,选择是否同步消息的勾选框

如果勾选则调用上述sql语句查询该用户所有chat_thread返回。

如果没勾选,就不用加载chat_thread。

重构聊天item

需要重构聊天左侧item列表结构,以支持聊天消息记录持久化存储。

默认情况下,会检索本地客户端是否有聊天记录信息,

如果没有则需要请求所有thread_id列表,然后更新左侧item列表。

如果有,也需要差异化加载 thread_id 列表,比如说 A 下线了,B 和 A 通信,A 之前没有收到过 B 的信息,所以也要拉取所有新建立的会话。

所以当务之急是先把这个聊天列表加载好

因为我们没有为客户端设置本地数据库,所以我们默认每次用户登录都请求一下所有thread_id列表,这样方便测试效果

Server返回聊天列表

Server需要根据用户uid返回他的聊天列表

1 注册消息

  1. _fun_callbacks[ID_LOAD_CHAT_THREAD_REQ] = std::bind(&LogicSystem::GetUserThreadsHandler, this,
  2. placeholders::_1, placeholders::_2, placeholders::_3);

2 实现获取聊天记录逻辑

  1. void LogicSystem::GetUserThreadsHandler(std::shared_ptr<CSession> session,
  2. const short& msg_id, const string& msg_data)
  3. {
  4. //从数据库加chat_threads记录
  5. Json::Reader reader;
  6. Json::Value root;
  7. reader.parse(msg_data, root);
  8. auto uid = root["uid"].asInt();
  9. std::cout << "get uid threads " << uid << std::endl;
  10. Json::Value rtvalue;
  11. rtvalue["error"] = ErrorCodes::Success;
  12. rtvalue["uid"] = uid;
  13. Defer defer([this, &rtvalue, session]() {
  14. std::string return_str = rtvalue.toStyledString();
  15. session->Send(return_str, ID_LOAD_CHAT_THREAD_RSP);
  16. });
  17. std::vector<std::shared_ptr<ChatThreadInfo>> threads;
  18. bool res = GetUserThreads(uid, threads);
  19. if (!res) {
  20. rtvalue["error"] = ErrorCodes::UidInvalid;
  21. return;
  22. }
  23. //整理threads数据写入json返回
  24. for (auto& thread : threads) {
  25. Json::Value thread_value;
  26. thread_value["thread_id"] = int(thread->_thread_id);
  27. thread_value["type"] = thread->_type;
  28. thread_value["user1_id"] = thread->_user1_id;
  29. thread_value["user2_id"] = thread->_user2_id;
  30. rtvalue["threads"].append(thread_value);
  31. }
  32. }
  33. bool LogicSystem::GetUserThreads(int userId,
  34. std::vector<std::shared_ptr<ChatThreadInfo>>& threads)
  35. {
  36. return MysqlMgr::GetInstance()->GetUserThreads(userId, threads);
  37. }

3 数据库加载聊天

  1. // 新增两个输出参数:loadMore, nextLastId
  2. bool MysqlDao::GetUserThreads(
  3. int64_t userId,
  4. int64_t lastId,
  5. int pageSize,
  6. std::vector<std::shared_ptr<ChatThreadInfo>>& threads,
  7. bool& loadMore,
  8. int64_t& nextLastId)
  9. {
  10. // 初始状态
  11. loadMore = false;
  12. nextLastId = lastId;
  13. threads.clear();
  14. auto con = pool_->getConnection();
  15. if (!con) {
  16. return false;
  17. }
  18. Defer defer([this, &con]() {
  19. pool_->returnConnection(std::move(con));
  20. });
  21. auto& conn = con->_con;
  22. try {
  23. // 准备分页查询:CTE + UNION ALL + ORDER + LIMIT N+1
  24. std::string sql =
  25. "WITH all_threads AS ( "
  26. " SELECT thread_id, 'private' AS type, user1_id, user2_id "
  27. " FROM private_chat "
  28. " WHERE (user1_id = ? OR user2_id = ?) "
  29. " AND thread_id > ? "
  30. " UNION ALL "
  31. " SELECT thread_id, 'group' AS type, 0 AS user1_id, 0 AS user2_id "
  32. " FROM group_chat_member "
  33. " WHERE user_id = ? "
  34. " AND thread_id > ? "
  35. ") "
  36. "SELECT thread_id, type, user1_id, user2_id "
  37. " FROM all_threads "
  38. " ORDER BY thread_id "
  39. " LIMIT ?;";
  40. std::unique_ptr<sql::PreparedStatement> pstmt(
  41. conn->prepareStatement(sql));
  42. // 绑定参数:? 对应 (userId, userId, lastId, userId, lastId, pageSize+1)
  43. int idx = 1;
  44. pstmt->setInt64(idx++, userId); // private.user1_id
  45. pstmt->setInt64(idx++, userId); // private.user2_id
  46. pstmt->setInt64(idx++, lastId); // private.thread_id > lastId
  47. pstmt->setInt64(idx++, userId); // group.user_id
  48. pstmt->setInt64(idx++, lastId); // group.thread_id > lastId
  49. pstmt->setInt(idx++, pageSize + 1); // LIMIT pageSize+1
  50. // 执行
  51. std::unique_ptr<sql::ResultSet> res(pstmt->executeQuery());
  52. // 先把所有行读到临时容器
  53. std::vector<std::shared_ptr<ChatThreadInfo>> tmp;
  54. while (res->next()) {
  55. auto cti = std::make_shared<ChatThreadInfo>();
  56. cti->_thread_id = res->getInt64("thread_id");
  57. cti->_type = res->getString("type");
  58. cti->_user1_id = res->getInt64("user1_id");
  59. cti->_user2_id = res->getInt64("user2_id");
  60. tmp.push_back(cti);
  61. }
  62. // 判断是否多取到一条
  63. if ((int)tmp.size() > pageSize) {
  64. loadMore = true;
  65. tmp.pop_back(); // 丢掉第 pageSize+1 条
  66. }
  67. // 如果还有数据,更新 nextLastId 为最后一条的 thread_id
  68. if (!tmp.empty()) {
  69. nextLastId = tmp.back()->_thread_id;
  70. }
  71. // 移入输出向量
  72. threads = std::move(tmp);
  73. }
  74. catch (sql::SQLException& e) {
  75. std::cerr << "SQLException: " << e.what()
  76. << " (MySQL error code: " << e.getErrorCode()
  77. << ", SQLState: " << e.getSQLState() << ")\n";
  78. return false;
  79. }
  80. return true;
  81. }

客户端请求聊天列表

1 完善loading对话框

完善加载对话框,调整下布局,增加一个label和旋转gif的布局

image-20250605180346879

布局界面

image-20250605180401895

接下来调整下代码

  1. #ifndef LOADINGDLG_H
  2. #define LOADINGDLG_H
  3. #include <QDialog>
  4. namespace Ui {
  5. class LoadingDlg;
  6. }
  7. class LoadingDlg : public QDialog
  8. {
  9. Q_OBJECT
  10. public:
  11. explicit LoadingDlg(QWidget *parent = nullptr, QString tip = "Loading...");
  12. ~LoadingDlg();
  13. private:
  14. Ui::LoadingDlg *ui;
  15. };
  16. #endif // LOADINGDLG_H

具体实现

  1. LoadingDlg::LoadingDlg(QWidget *parent, QString tip):
  2. QDialog(parent),
  3. ui(new Ui::LoadingDlg)
  4. {
  5. ui->setupUi(this);
  6. // 1. 让这个 Widget 透明背景、无边框、拦截底部事件
  7. setWindowFlags(Qt::Dialog | Qt::FramelessWindowHint | Qt::WindowSystemMenuHint | Qt::WindowStaysOnTopHint);
  8. setAttribute(Qt::WA_TranslucentBackground);// 设置背景透明
  9. // 2. 让它覆盖父窗口整个面积
  10. if (parent) {
  11. // 获取屏幕尺寸
  12. setFixedSize(parent->size()); // 设置对话框为全屏尺寸
  13. }
  14. if (parent) {
  15. QPoint topLeft = parent->mapToGlobal(QPoint(0, 0));
  16. move(topLeft);
  17. }
  18. // 3. 半透明黑色背景(alpha = 128,大约 50% 透明度)
  19. // setStyleSheet("background-color: rgba(0, 0, 0, 128);");
  20. QMovie *movie = new QMovie(":/res/loading2.gif"); // 加载动画的资源文件
  21. ui->loading_lb->setMovie(movie);
  22. movie->start();
  23. // 3. 告诉 QMovie:将解码后的每一帧缩放到 100×100(固定大小)
  24. movie->setScaledSize(ui->loading_lb->size());
  25. ui->status_lb->setText(tip);
  26. }
  27. LoadingDlg::~LoadingDlg()
  28. {
  29. delete ui;
  30. }

2 加载聊天记录

之前没有从数据库加载聊天记录,只是模拟从本地好友中加载为聊天记录了,现在需要将这部分从ChatDialog构造函数中移除

改为从服务器申请,并且此时展示LoadingDlg对话框,直到获取记录后,将LoadingDlg移除。

因为获取服务器记录是通过网络获取的,所以在客户端的TcpMgr中通过信号发送给ChatDialog界面

所以ChatDialog的构造函数改为如下

  1. ChatDialog::ChatDialog(QWidget* parent) :
  2. QDialog(parent),
  3. ui(new Ui::ChatDialog), _b_loading(false), _mode(ChatUIMode::ChatMode),
  4. _state(ChatUIMode::ChatMode), _last_widget(nullptr), _cur_chat_uid(0), _loading_dlg(nullptr)
  5. {
  6. ui->setupUi(this);
  7. ui->add_btn->SetState("normal", "hover", "press");
  8. ui->add_btn->setProperty("state", "normal");
  9. QAction* searchAction = new QAction(ui->search_edit);
  10. searchAction->setIcon(QIcon(":/res/search.png"));
  11. ui->search_edit->addAction(searchAction, QLineEdit::LeadingPosition);
  12. ui->search_edit->setPlaceholderText(QStringLiteral("搜索"));
  13. // 创建一个清除动作并设置图标
  14. QAction* clearAction = new QAction(ui->search_edit);
  15. clearAction->setIcon(QIcon(":/res/close_transparent.png"));
  16. // 初始时不显示清除图标
  17. // 将清除动作添加到LineEdit的末尾位置
  18. ui->search_edit->addAction(clearAction, QLineEdit::TrailingPosition);
  19. // 当需要显示清除图标时,更改为实际的清除图标
  20. connect(ui->search_edit, &QLineEdit::textChanged, [clearAction](const QString& text) {
  21. if (!text.isEmpty()) {
  22. clearAction->setIcon(QIcon(":/res/close_search.png"));
  23. }
  24. else {
  25. clearAction->setIcon(QIcon(":/res/close_transparent.png")); // 文本为空时,切换回透明图标
  26. }
  27. });
  28. // 连接清除动作的触发信号到槽函数,用于清除文本
  29. connect(clearAction, &QAction::triggered, [this, clearAction]() {
  30. ui->search_edit->clear();
  31. clearAction->setIcon(QIcon(":/res/close_transparent.png")); // 清除文本后,切换回透明图标
  32. ui->search_edit->clearFocus();
  33. //清除按钮被按下则不显示搜索框
  34. ShowSearch(false);
  35. });
  36. ui->search_edit->SetMaxLength(15);
  37. //连接加载信号和槽
  38. connect(ui->chat_user_list, &ChatUserList::sig_loading_chat_user, this, &ChatDialog::slot_loading_chat_user);
  39. //模拟加载自己头像
  40. QString head_icon = UserMgr::GetInstance()->GetIcon();
  41. QPixmap pixmap(head_icon); // 加载图片
  42. QPixmap scaledPixmap = pixmap.scaled(ui->side_head_lb->size(), Qt::KeepAspectRatio, Qt::SmoothTransformation); // 将图片缩放到label的大小
  43. ui->side_head_lb->setPixmap(scaledPixmap); // 将缩放后的图片设置到QLabel上
  44. ui->side_head_lb->setScaledContents(true); // 设置QLabel自动缩放图片内容以适应大小
  45. ui->side_chat_lb->setProperty("state", "normal");
  46. ui->side_chat_lb->SetState("normal", "hover", "pressed", "selected_normal", "selected_hover", "selected_pressed");
  47. ui->side_contact_lb->SetState("normal", "hover", "pressed", "selected_normal", "selected_hover", "selected_pressed");
  48. ui->side_settings_lb->SetState("normal", "hover", "pressed", "selected_normal", "selected_hover", "selected_pressed");
  49. AddLBGroup(ui->side_chat_lb);
  50. AddLBGroup(ui->side_contact_lb);
  51. AddLBGroup(ui->side_settings_lb);
  52. connect(ui->side_chat_lb, &StateWidget::clicked, this, &ChatDialog::slot_side_chat);
  53. connect(ui->side_contact_lb, &StateWidget::clicked, this, &ChatDialog::slot_side_contact);
  54. connect(ui->side_settings_lb, &StateWidget::clicked, this, &ChatDialog::slot_side_setting);
  55. //链接搜索框输入变化
  56. connect(ui->search_edit, &QLineEdit::textChanged, this, &ChatDialog::slot_text_changed);
  57. ShowSearch(false);
  58. //检测鼠标点击位置判断是否要清空搜索框
  59. this->installEventFilter(this); // 安装事件过滤器
  60. //设置聊天label选中状态
  61. ui->side_chat_lb->SetSelected(true);
  62. //设置选中条目
  63. SetSelectChatItem();
  64. //更新聊天界面信息
  65. SetSelectChatPage();
  66. //连接加载联系人的信号和槽函数
  67. connect(ui->con_user_list, &ContactUserList::sig_loading_contact_user,
  68. this, &ChatDialog::slot_loading_contact_user);
  69. //连接联系人页面点击好友申请条目的信号
  70. connect(ui->con_user_list, &ContactUserList::sig_switch_apply_friend_page,
  71. this, &ChatDialog::slot_switch_apply_friend_page);
  72. //连接清除搜索框操作
  73. connect(ui->friend_apply_page, &ApplyFriendPage::sig_show_search, this, &ChatDialog::slot_show_search);
  74. //为searchlist 设置search edit
  75. ui->search_list->SetSearchEdit(ui->search_edit);
  76. //连接申请添加好友信号
  77. connect(TcpMgr::GetInstance().get(), &TcpMgr::sig_friend_apply, this, &ChatDialog::slot_apply_friend);
  78. //连接认证添加好友信号
  79. connect(TcpMgr::GetInstance().get(), &TcpMgr::sig_add_auth_friend, this, &ChatDialog::slot_add_auth_friend);
  80. //链接自己认证回复信号
  81. connect(TcpMgr::GetInstance().get(), &TcpMgr::sig_auth_rsp, this,
  82. &ChatDialog::slot_auth_rsp);
  83. //连接点击联系人item发出的信号和用户信息展示槽函数
  84. connect(ui->con_user_list, &ContactUserList::sig_switch_friend_info_page,
  85. this, &ChatDialog::slot_friend_info_page);
  86. //设置中心部件为chatpage
  87. ui->stackedWidget->setCurrentWidget(ui->chat_page);
  88. //连接searchlist跳转聊天信号
  89. connect(ui->search_list, &SearchList::sig_jump_chat_item, this, &ChatDialog::slot_jump_chat_item);
  90. //连接好友信息界面发送的点击事件
  91. connect(ui->friend_info_page, &FriendInfoPage::sig_jump_chat_item, this,
  92. &ChatDialog::slot_jump_chat_item_from_infopage);
  93. //连接聊天列表点击信号
  94. connect(ui->chat_user_list, &QListWidget::itemClicked, this, &ChatDialog::slot_item_clicked);
  95. //连接对端消息通知
  96. connect(TcpMgr::GetInstance().get(), &TcpMgr::sig_text_chat_msg,
  97. this, &ChatDialog::slot_text_chat_msg);
  98. connect(ui->chat_page, &ChatPage::sig_append_send_chat_msg, this, &ChatDialog::slot_append_send_chat_msg);
  99. _timer = new QTimer(this);
  100. connect(_timer, &QTimer::timeout, this, [this]() {
  101. auto user_info = UserMgr::GetInstance()->GetUserInfo();
  102. QJsonObject textObj;
  103. textObj["fromuid"] = user_info->_uid;
  104. QJsonDocument doc(textObj);
  105. QByteArray jsonData = doc.toJson(QJsonDocument::Compact);
  106. emit TcpMgr::GetInstance()->sig_send_data(ReqId::ID_HEART_BEAT_REQ, jsonData);
  107. });
  108. _timer->start(10000);
  109. //连接tcp返回的加载聊天回复
  110. connect(TcpMgr::GetInstance().get(), &TcpMgr::sig_load_chat_thread,
  111. this, &ChatDialog::slot_load_chat_thread);
  112. }

当用户登录成功后会切换到聊天页面,此时请求聊天列表

  1. void MainWindow::SlotSwitchChat()
  2. {
  3. _chat_dlg = new ChatDialog();
  4. _chat_dlg->setWindowFlags(Qt::CustomizeWindowHint|Qt::FramelessWindowHint);
  5. setCentralWidget(_chat_dlg);
  6. _chat_dlg->show();
  7. _login_dlg->hide();
  8. this->setMinimumSize(QSize(1050,900));
  9. this->setMaximumSize(QWIDGETSIZE_MAX, QWIDGETSIZE_MAX);
  10. _ui_status = CHAT_UI;
  11. _chat_dlg->loadChatList();
  12. }

通过发送请求获取聊天记录

  1. void ChatDialog::loadChatList()
  2. {
  3. showLoadingDlg(true);
  4. //发送请求逻辑
  5. QJsonObject jsonObj;
  6. auto uid = UserMgr::GetInstance()->GetUid();
  7. jsonObj["uid"] = uid;
  8. int last_chat_thread_id = UserMgr::GetInstance()->GetLastChatThreadId();
  9. jsonObj["thread_id"] = last_chat_thread_id;
  10. QJsonDocument doc(jsonObj);
  11. QByteArray jsonData = doc.toJson(QJsonDocument::Compact);
  12. //发送tcp请求给chat server
  13. emit TcpMgr::GetInstance()->sig_send_data(ReqId::ID_LOAD_CHAT_THREAD_REQ, jsonData);
  14. }

TCPMgr注册从服务器获取回复的消息处理

  1. _handlers.insert(ID_LOAD_CHAT_THREAD_RSP, [this](ReqId id, int len, QByteArray data) {
  2. Q_UNUSED(len);
  3. qDebug() << "handle id is " << id << " data is " << data;
  4. // 将QByteArray转换为QJsonDocument
  5. QJsonDocument jsonDoc = QJsonDocument::fromJson(data);
  6. // 检查转换是否成功
  7. if (jsonDoc.isNull()) {
  8. qDebug() << "Failed to create QJsonDocument.";
  9. return;
  10. }
  11. QJsonObject jsonObj = jsonDoc.object();
  12. if (!jsonObj.contains("error")) {
  13. int err = ErrorCodes::ERR_JSON;
  14. qDebug() << "chat thread json parse failed " << err;
  15. return;
  16. }
  17. int err = jsonObj["error"].toInt();
  18. if (err != ErrorCodes::SUCCESS) {
  19. qDebug() << "get chat thread rsp failed, error is " << err;
  20. return;
  21. }
  22. qDebug() << "Receive chat thread rsp Success";
  23. auto thread_array = jsonObj["threads"].toArray();
  24. std::vector<std::shared_ptr<ChatThreadInfo>> chat_threads;
  25. for (const QJsonValue& value : thread_array) {
  26. auto cti = std::make_shared<ChatThreadInfo>();
  27. cti->_thread_id = value["thread_id"].toInt();
  28. cti->_type = value["type"].toString();
  29. cti->_user1_id = value["user1_id"].toInt();
  30. cti->_user2_id = value["user2_id"].toInt();
  31. chat_threads.push_back(cti);
  32. }
  33. bool load_more = jsonObj["load_more"].toBool();
  34. int next_last_id = jsonObj["next_last_id"].toInt();
  35. //发送信号通知界面
  36. emit sig_load_chat_thread(load_more, next_last_id, chat_threads);
  37. });

ChatDialog接收TcpMgr发送的sig_load_chat_thread消息,然后触发如下函数,该函数主要加载聊天列表并且消除加载动画

  1. void ChatDialog::slot_load_chat_thread(bool load_more, int last_thread_id,
  2. std::vector<std::shared_ptr<ChatThreadInfo>> chat_threads)
  3. {
  4. for (auto& cti : chat_threads) {
  5. //先处理单聊,群聊跳过,以后添加
  6. if (cti->_type == "group") {
  7. continue;
  8. }
  9. auto uid = UserMgr::GetInstance()->GetUid();
  10. auto other_uid = 0;
  11. if (uid == cti->_user1_id) {
  12. other_uid = cti->_user2_id;
  13. }else {
  14. other_uid = cti->_user1_id;
  15. }
  16. auto friend_info = UserMgr::GetInstance()->GetFriendById(other_uid);
  17. if (!friend_info) {
  18. continue;
  19. }
  20. auto* chat_user_wid = new ChatUserWid();
  21. auto user_info = std::make_shared<UserInfo>(friend_info);
  22. chat_user_wid->SetInfo(user_info);
  23. QListWidgetItem* item = new QListWidgetItem;
  24. //qDebug()<<"chat_user_wid sizeHint is " << chat_user_wid->sizeHint();
  25. item->setSizeHint(chat_user_wid->sizeHint());
  26. ui->chat_user_list->addItem(item);
  27. ui->chat_user_list->setItemWidget(item, chat_user_wid);
  28. _chat_items_added.insert(user_info->_uid, item);
  29. auto chat_thread_data = std::make_shared<ChatThreadData>();
  30. chat_thread_data->_user1_id = uid;
  31. chat_thread_data->_user2_id = other_uid;
  32. chat_thread_data->_last_msg_id = 0;
  33. chat_thread_data->_thread_id = cti->_thread_id;
  34. UserMgr::GetInstance()->AddChatThreadData(chat_thread_data);
  35. }
  36. UserMgr::GetInstance()->SetLastChatThreadId(last_thread_id);
  37. if (load_more) {
  38. //发送请求逻辑
  39. QJsonObject jsonObj;
  40. auto uid = UserMgr::GetInstance()->GetUid();
  41. jsonObj["uid"] = uid;
  42. jsonObj["thread_id"] = last_thread_id;
  43. QJsonDocument doc(jsonObj);
  44. QByteArray jsonData = doc.toJson(QJsonDocument::Compact);
  45. //发送tcp请求给chat server
  46. emit TcpMgr::GetInstance()->sig_send_data(ReqId::ID_LOAD_CHAT_THREAD_REQ, jsonData);
  47. return;
  48. }
  49. //更新聊天界面信息
  50. SetSelectChatItem();
  51. SetSelectChatPage();
  52. showLoadingDlg(false);
  53. }

数据库构建

navicat中执行上面数据模型设计中提到的几个sql语句

1 创建chat_thread

2 创建group_chat

image-20250613154003601

成功后显示

image-20250605181826063

3 创建group_member

image-20250613154030973

成功后显示表

image-20250605182001600

4 创建私聊表

image-20250613154048586

成功后显示

image-20250605182147160

注意: 创建后没有数据,数据是我自己添加的,为了方便测试

开启服务器,客户端登陆后加载数据

效果如下

image-20250605183518757

首次单聊

A和B是好友,首次单聊,A发送给服务器创建聊天的请求。

服务器根据A的创建请求创建私聊,然后返回给客户端A。

注意

因为聊天服务是异步的,而且是分布式的,所以有可能对方B就在此时发送消息给A,服务器已经创建好了,或者服务器正在调用sql创建。

所以对于创建请求,sql需要先查询是否已经被其他人创建了thread_id, 我们可以制定一个规则,任何一方创建thread_id,在写入私聊表private_chat时都需要保证最小的uiduid1_id, 大的在uid2_id, 这样查询的时候也方便。

这个查询要加行级锁,避免分布式造成数据混乱。

总结

所以创建单聊时,要先去private_chat表根据uid查询,如果查到了则返回这个thread_id, 这个查询要加行级锁。

如果没查到,则在chat_thread表创建thread_id并且插入private_chat

思路

我们整理下思路

1) 查询是否已存在私聊会话,如果存在则加锁行并返回 thread_id

  1. SELECT thread_id
  2. FROM private_chat
  3. WHERE (user1_id = LEAST(:user1_id, :user2_id) AND user2_id = GREATEST(:user1_id, :user2_id))
  4. FOR UPDATE; -- 使用行级锁,避免并发冲突

查询时使用 LEASTGREATEST 来保证无论是 user1_id 还是 user2_id,都将较小的 ID 存放在 user1_id,较大的存放在 user2_id。这样可以避免不同的用户顺序导致查找不到匹配的记录。

FOR UPDATE 关键字会锁定这些查询行,确保在事务结束之前不会有其他并发的操作修改数据。

  1. 如果未找到数据(查询返回空),则插入新记录:
  1. -- 1. chat_thread 表中创建新记录
  2. INSERT INTO chat_thread (type, created_at)
  3. VALUES ('private', NOW());
  4. -- 2. 获取新插入的 thread_id(假设你可以通过 LAST_INSERT_ID 获取)
  5. SELECT LAST_INSERT_ID();
  1. 将新生成的 thread_id 插入 private_chat 表
  1. INSERT INTO private_chat (thread_id, user1_id, user2_id, created_at)
  2. VALUES (:new_thread_id, LEAST(:user1_id, :user2_id), GREATEST(:user1_id, :user2_id), NOW());

使用 INSERT INTO chat_thread 创建新的聊天记录,并使用 LAST_INSERT_ID() 获取新生成的 thread_id

将新 thread_id 插入到 private_chat 表中,同时使用 LEASTGREATEST 确保较小的 ID 存入 user1_id,较大的存入 user2_id

问题分析

  • 行级锁的生命周期:
    行级锁(通过 FOR UPDATE 获得的锁)只在当前事务中有效。当查询结束后,锁会被释放。也就是说,如果我们查询了是否存在 private_chat 的记录并加了锁,但在查询完成后进行插入 chat_threadprivate_chat 的操作时,其他并发请求可能会先插入新的私聊记录,从而造成数据冲突。
  • 可能的并发问题:
    例如:
    1. 线程 A 执行查询,锁定了 private_chat 表的行;
    2. 线程 B 也执行了相同的查询,发现没有记录,于是开始插入 chat_thread
    3. 线程 A 完成插入 chat_threadprivate_chat,但线程 B 也在此时完成了它的插入,导致 private_chat 表中出现两个重复的记录。

解决方案

为了确保并发操作的安全性,我们可以使用 事务 来保证在查询、插入 chat_threadprivate_chat 表的过程中,数据的一致性和原子性。具体步骤如下:

方案:使用事务(Atomic Transaction)

我们可以使用 事务 来确保操作的一致性,整个操作从查询到插入都在一个事务中进行。这样即使存在多个并发请求,也能保证同一时间只有一个请求可以成功创建 chat_threadprivate_chat

关键改动:

  1. 在查询时加行级锁。
  2. 确保所有的数据库操作(查询和插入)都在一个事务中进行,这样可以防止并发插入的问题。
  3. 使用事务提交(commit)和回滚(rollback)确保数据一致性。

关键代码

  1. bool MysqlDao::CreatePrivateChat(int user1_id, int user2_id, int& thread_id)
  2. {
  3. auto con = pool_->getConnection();
  4. if (!con) {
  5. return false;
  6. }
  7. Defer defer([this, &con]() {
  8. pool_->returnConnection(std::move(con));
  9. });
  10. auto& conn = con->_con;
  11. try {
  12. // 开启事务
  13. conn->setAutoCommit(false);
  14. // 1. 查询是否已存在私聊并加行级锁
  15. int uid1 = std::min(user1_id, user2_id);
  16. int uid2 = std::max(user1_id, user2_id);
  17. std::string check_sql =
  18. "SELECT thread_id FROM private_chat "
  19. "WHERE (user1_id = ? AND user2_id = ?) "
  20. "FOR UPDATE;";
  21. std::unique_ptr<sql::PreparedStatement> pstmt(conn->prepareStatement(check_sql));
  22. pstmt->setInt64(1, uid1);
  23. pstmt->setInt64(2, uid2);
  24. std::unique_ptr<sql::ResultSet> res(pstmt->executeQuery());
  25. if (res->next()) {
  26. // 如果已存在,返回该 thread_id
  27. thread_id = res->getInt("thread_id");
  28. conn->commit(); // 提交事务
  29. return true;
  30. }
  31. // 2. 如果未找到,创建新的 chat_thread 和 private_chat 记录
  32. // 在 chat_thread 表插入新记录
  33. std::string insert_chat_thread_sql =
  34. "INSERT INTO chat_thread (type, created_at) VALUES ('private', NOW());";
  35. std::unique_ptr<sql::PreparedStatement> pstmt_insert_thread(conn->prepareStatement(insert_chat_thread_sql));
  36. pstmt_insert_thread->executeUpdate();
  37. // 获取新插入的 thread_id
  38. std::string get_last_insert_id_sql = "SELECT LAST_INSERT_ID();";
  39. std::unique_ptr<sql::PreparedStatement> pstmt_last_insert_id(conn->prepareStatement(get_last_insert_id_sql));
  40. std::unique_ptr<sql::ResultSet> res_last_id(pstmt_last_insert_id->executeQuery());
  41. res_last_id->next();
  42. thread_id = res_last_id->getInt(1);
  43. // 3. 在 private_chat 表插入新记录
  44. std::string insert_private_chat_sql =
  45. "INSERT INTO private_chat (thread_id, user1_id, user2_id, created_at) "
  46. "VALUES (?, ?, ?, NOW());";
  47. std::unique_ptr<sql::PreparedStatement> pstmt_insert_private(conn->prepareStatement(insert_private_chat_sql));
  48. pstmt_insert_private->setInt64(1, thread_id);
  49. pstmt_insert_private->setInt64(2, uid1);
  50. pstmt_insert_private->setInt64(3, uid2);
  51. pstmt_insert_private->executeUpdate();
  52. // 提交事务
  53. conn->commit();
  54. return true;
  55. }
  56. catch (sql::SQLException& e) {
  57. std::cerr << "SQLException: " << e.what() << std::endl;
  58. conn->rollback();
  59. return false;
  60. }
  61. return false;
  62. }
  63. bool MysqlMgr::CreatePrivateChat(int user1_id, int user2_id, int& thread_id)
  64. {
  65. return _dao.CreatePrivateChat(user1_id, user2_id, thread_id);
  66. }

LogicSystem添加创建聊天的回调函数,并且注册

  1. void LogicSystem::CreatePrivateChat(std::shared_ptr<CSession> session, const short& msg_id, const string& msg_data)
  2. {
  3. Json::Reader reader;
  4. Json::Value root;
  5. reader.parse(msg_data, root);
  6. auto uid = root["uid"].asInt();
  7. auto other_id = root["other_id"].asInt();
  8. Json::Value rtvalue;
  9. rtvalue["error"] = ErrorCodes::Success;
  10. rtvalue["uid"] = uid;
  11. rtvalue["other_id"] = other_id;
  12. Defer defer([this, &rtvalue, session]() {
  13. std::string return_str = rtvalue.toStyledString();
  14. session->Send(return_str, ID_LOAD_CHAT_THREAD_RSP);
  15. });
  16. int thread_id = 0;
  17. bool res = MysqlMgr::GetInstance()->CreatePrivateChat(uid, other_id, thread_id);
  18. if (!res) {
  19. rtvalue["error"] = ErrorCodes::CREATE_CHAT_FAILED;
  20. return;
  21. }
  22. rtvalue["thread_id"] = thread_id;
  23. }
  24. _fun_callbacks[ID_CREATE_PRIVATE_CHAT_REQ] = std::bind(&LogicSystem::CreatePrivateChat, this,
  25. placeholders::_1, placeholders::_2, placeholders::_3);

客户端完善

在好友信息界面

  1. void FriendInfoPage::on_msg_chat_clicked()
  2. {
  3. qDebug() << "msg chat btn clicked";
  4. emit sig_jump_chat_item(_user_info);
  5. }

追踪这个信号,我们完善槽函数

  1. void ChatDialog::slot_jump_chat_item_from_infopage(std::shared_ptr<UserInfo> user_info)
  2. {
  3. qDebug() << "slot jump chat item " << endl;
  4. auto thread_id = UserMgr::GetInstance()->GetThreadIdByUid(user_info->_uid);
  5. if (thread_id != -1) {
  6. auto find_iter = _chat_thread_items.find(thread_id);
  7. if (find_iter != _chat_thread_items.end()) {
  8. qDebug() << "jump to chat item , uid is " << user_info->_uid;
  9. ui->chat_user_list->scrollToItem(find_iter.value());
  10. ui->side_chat_lb->SetSelected(true);
  11. SetSelectChatItem(user_info->_uid);
  12. //更新聊天界面信息
  13. SetSelectChatPage(user_info->_uid);
  14. slot_side_chat();
  15. return;
  16. } //说明之前有缓存过聊天列表,只是被删除了,那么重新加进来即可
  17. else {
  18. auto* chat_user_wid = new ChatUserWid();
  19. chat_user_wid->SetInfo(user_info);
  20. QListWidgetItem* item = new QListWidgetItem;
  21. qDebug() << "chat_user_wid sizeHint is " << chat_user_wid->sizeHint();
  22. ui->chat_user_list->insertItem(0, item);
  23. ui->chat_user_list->setItemWidget(item, chat_user_wid);
  24. _chat_thread_items.insert(thread_id, item);
  25. ui->side_chat_lb->SetSelected(true);
  26. SetSelectChatItem(user_info->_uid);
  27. //更新聊天界面信息
  28. SetSelectChatPage(user_info->_uid);
  29. slot_side_chat();
  30. return;
  31. }
  32. }
  33. //如果没找到,则发送创建请求
  34. auto uid = UserMgr::GetInstance()->GetUid();
  35. QJsonObject jsonObj;
  36. jsonObj["uid"] = uid;
  37. jsonObj["other_id"] = user_info->_uid;
  38. QJsonDocument doc(jsonObj);
  39. QByteArray jsonData = doc.toJson(QJsonDocument::Compact);
  40. //发送tcp请求给chat server
  41. emit TcpMgr::GetInstance()->sig_send_data(ReqId::ID_CREATE_PRIVATE_CHAT_REQ, jsonData);
  42. }

客户端注册服务器返回的消息ID_CREATE_PRIVATE_CHAT_RSP,进行处理

  1. _handlers.insert(ID_CREATE_PRIVATE_CHAT_RSP, [this](ReqId id, int len, QByteArray data) {
  2. Q_UNUSED(len);
  3. qDebug() << "handle id is " << id << " data is " << data;
  4. // 将QByteArray转换为QJsonDocument
  5. QJsonDocument jsonDoc = QJsonDocument::fromJson(data);
  6. // 检查转换是否成功
  7. if (jsonDoc.isNull()) {
  8. qDebug() << "Failed to create QJsonDocument.";
  9. return;
  10. }
  11. QJsonObject jsonObj = jsonDoc.object();
  12. if (!jsonObj.contains("error")) {
  13. int err = ErrorCodes::ERR_JSON;
  14. qDebug() << "parse create private chat json parse failed " << err;
  15. return;
  16. }
  17. int err = jsonObj["error"].toInt();
  18. if (err != ErrorCodes::SUCCESS) {
  19. qDebug() << "get create private chat failed, error is " << err;
  20. return;
  21. }
  22. qDebug() << "Receive create private chat rsp Success";
  23. int uid = jsonObj["uid"].toInt();
  24. int other_id = jsonObj["other_id"].toInt();
  25. int thread_id = jsonObj["thread_id"].toInt();
  26. //发送信号通知界面
  27. emit sig_create_private_chat(uid, other_id, thread_id);
  28. });

编写槽函数和sig_create_private_chat连接,并且增加聊天条目

  1. //连接tcp返回的创建私聊的回复
  2. connect(TcpMgr::GetInstance().get(), &TcpMgr::sig_create_private_chat,
  3. this, &ChatDialog::slot_create_private_chat);

具体处理的槽函数

  1. void ChatDialog::slot_create_private_chat(int uid, int other_id, int thread_id)
  2. {
  3. auto* chat_user_wid = new ChatUserWid();
  4. auto user_info = UserMgr::GetInstance()->GetFriendById(other_id);
  5. chat_user_wid->SetInfo(user_info);
  6. QListWidgetItem* item = new QListWidgetItem;
  7. item->setSizeHint(chat_user_wid->sizeHint());
  8. qDebug() << "chat_user_wid sizeHint is " << chat_user_wid->sizeHint();
  9. ui->chat_user_list->insertItem(0, item);
  10. ui->chat_user_list->setItemWidget(item, chat_user_wid);
  11. _chat_thread_items.insert(thread_id, item);
  12. auto chat_thread_data = std::make_shared<ChatThreadData>();
  13. chat_thread_data->_user1_id = uid;
  14. chat_thread_data->_user2_id = other_id;
  15. chat_thread_data->_last_msg_id = 0;
  16. chat_thread_data->_thread_id = thread_id;
  17. UserMgr::GetInstance()->AddChatThreadData(chat_thread_data, other_id);
  18. ui->side_chat_lb->SetSelected(true);
  19. SetSelectChatItem(user_info->_uid);
  20. //更新聊天界面信息
  21. SetSelectChatPage(user_info->_uid);
  22. slot_side_chat();
  23. return;
  24. }

聊天消息重构

ChaUserWid重构

之前我们的会话列表由一个一个的ChatUserWid构成

image-20250622085350930

原来的ChatUserWid内部存储的是UserInfo结构,目前我们已经增加了ChatThread数据库内容,所以要将会话列表的每个ChatUserWid中存储ChatThreadData结构。

接下来我们定义这几个结构

  1. class ChatUserWid : public ListItemBase
  2. {
  3. Q_OBJECT
  4. public:
  5. explicit ChatUserWid(QWidget *parent = nullptr);
  6. ~ChatUserWid();
  7. QSize sizeHint() const override;
  8. void SetChatData(std::shared_ptr<ChatThreadData> chat_data);
  9. std::shared_ptr<ChatThreadData> GetChatData();
  10. void ShowRedPoint(bool bshow);
  11. void updateLastMsg(std::vector<std::shared_ptr<TextChatData>> msgs);
  12. private:
  13. Ui::ChatUserWid *ui;
  14. std::shared_ptr<ChatThreadData> _chat_data;
  15. };

具体定义

  1. void ChatUserWid::SetChatData(std::shared_ptr<ChatThreadData> chat_data) {
  2. _chat_data = chat_data;
  3. auto other_id = _chat_data->GetOtherId();
  4. auto other_info = UserMgr::GetInstance()->GetFriendById(other_id);
  5. // 加载图片
  6. QPixmap pixmap(other_info->_icon);
  7. // 设置图片自动缩放
  8. ui->icon_lb->setPixmap(pixmap.scaled(ui->icon_lb->size(), Qt::KeepAspectRatio, Qt::SmoothTransformation));
  9. ui->icon_lb->setScaledContents(true);
  10. ui->user_name_lb->setText(other_info->_name);
  11. ui->user_chat_lb->setText(chat_data->GetLastMsg());
  12. }
  13. std::shared_ptr<ChatThreadData> ChatUserWid::GetChatData()
  14. {
  15. return _chat_data;
  16. }

这样我们就将聊天会话的信息写入到了ChatUserWid这样一个个小条目了。

消息类抽象

因为我们将来要存储文本,文件以及图片不同类型的消息,那么就将原来的消息抽象出一个基类

  1. class ChatDataBase {
  2. public:
  3. ChatDataBase(int msg_id, int thread_id, ChatFormType form_type, ChatMsgType msg_type,
  4. QString content,int _send_uid);
  5. ChatDataBase(QString unique_id, int thread_id, ChatFormType form_type, ChatMsgType msg_type,
  6. QString content, int send_uid);
  7. int GetMsgId() { return _msg_id; }
  8. int GetThreadId() { return _thread_id; }
  9. ChatFormType GetFormType() { return _form_type; }
  10. ChatMsgType GetMsgType() { return _msg_type; }
  11. QString GetContent() { return _content; }
  12. int GetSendUid() { return _send_uid; }
  13. QString GetMsgContent(){return _content;}
  14. void SetUniqueId(int unique_id);
  15. QString GetUniqueId();
  16. private:
  17. //客户端本地唯一标识
  18. QString _unique_id;
  19. //消息id
  20. int _msg_id;
  21. //会话id
  22. int _thread_id;
  23. //群聊还是私聊
  24. ChatFormType _form_type;
  25. //文本信息为0,图片为1,文件为2
  26. ChatMsgType _msg_type;
  27. QString _content;
  28. //发送者id
  29. int _send_uid;
  30. };

然后基于上面的基类,我们可以定义不同类型的消息,如文本消息

  1. class TextChatData : public ChatDataBase {
  2. public:
  3. TextChatData(int msg_id, int thread_id, ChatFormType form_type, ChatMsgType msg_type, QString content,
  4. int send_uid):
  5. ChatDataBase(msg_id, thread_id, form_type, msg_type, content, send_uid)
  6. {
  7. }
  8. TextChatData(QString unique_id, int thread_id, ChatFormType form_type, ChatMsgType msg_type, QString content,
  9. int send_uid):
  10. ChatDataBase(unique_id, thread_id, form_type, msg_type, content, send_uid)
  11. {
  12. }
  13. };

有了这个文本消息后,我们可以将基类指针ChatDataBase存储起来,将来通过实现虚函数,进行多态调用.

ChatThreadData聊天线程

聊天线程数据,重构和完善

  1. //客户端本地存储的聊天线程数据结构
  2. class ChatThreadData {
  3. public:
  4. ChatThreadData(int other_id, int thread_id, int last_msg_id):
  5. _other_id(other_id), _thread_id(thread_id), _last_msg_id(last_msg_id){}
  6. void AddMsg(std::shared_ptr<ChatDataBase> msg);
  7. void SetLastMsgId(int msg_id);
  8. void SetOtherId(int other_id);
  9. int GetOtherId();
  10. QString GetGroupName();
  11. QMap<int, std::shared_ptr<ChatDataBase>> GetMsgMap();
  12. int GetThreadId();
  13. QMap<int, std::shared_ptr<ChatDataBase>>& GetMsgMapRef();
  14. void AppendMsg(int msg_id, std::shared_ptr<ChatDataBase> base_msg);
  15. QString GetLastMsg();
  16. private:
  17. //如果是私聊,则为对方的id;如果是群聊,则为0
  18. int _other_id;
  19. int _last_msg_id;
  20. int _thread_id;
  21. QString _last_msg;
  22. //群聊信息,成员列表
  23. std::vector<int> _group_members;
  24. //群聊名称
  25. QString _group_name;
  26. //缓存消息map,抽象为基类,因为会有图片等其他类型消息
  27. QMap<int, std::shared_ptr<ChatDataBase>> _msg_map;
  28. };

具体实现

  1. void ChatThreadData::AddMsg(std::shared_ptr<ChatDataBase> msg)
  2. {
  3. _msg_map.insert(msg->GetMsgId(), msg);
  4. }
  5. void ChatThreadData::SetLastMsgId(int msg_id)
  6. {
  7. _last_msg_id = msg_id;
  8. }
  9. void ChatThreadData::SetOtherId(int other_id)
  10. {
  11. _other_id = other_id;
  12. }
  13. int ChatThreadData::GetOtherId() {
  14. return _other_id;
  15. }
  16. QString ChatThreadData::GetGroupName()
  17. {
  18. return _group_name;
  19. }
  20. QMap<int, std::shared_ptr<ChatDataBase>> ChatThreadData::GetMsgMap() {
  21. return _msg_map;
  22. }
  23. int ChatThreadData::GetThreadId()
  24. {
  25. return _thread_id;
  26. }
  27. QMap<int, std::shared_ptr<ChatDataBase>>& ChatThreadData::GetMsgMapRef()
  28. {
  29. return _msg_map;
  30. }
  31. void ChatThreadData::AppendMsg(int msg_id, std::shared_ptr<ChatDataBase> base_msg) {
  32. _msg_map.insert(msg_id, base_msg);
  33. _last_msg = base_msg->GetMsgContent();
  34. _last_msg_id = msg_id;
  35. }
  36. QString ChatThreadData::GetLastMsg()
  37. {
  38. return _last_msg;
  39. }

好友认证

对于好友认证时,如果双方通过,也要默认建立聊天消息,并且产生会话列表.

我们先从这块接入聊天消息列表,完善整体流程

proto协议修改

因为认证添加好友后,会生成两条聊天信息(比如,我们已经是好友了等),同时通知给对方,协议格式增加和修改如下

  1. message AddFriendMsg{
  2. int32 sender_id = 1;
  3. string unique_id = 2;
  4. int32 msg_id = 3;
  5. int32 thread_id = 4;
  6. string msgcontent = 5;
  7. }
  8. message AuthFriendReq{
  9. int32 fromuid = 1;
  10. int32 touid = 2;
  11. repeated AddFriendMsg textmsgs = 3;
  12. }
  13. message AuthFriendRsp{
  14. int32 error = 1;
  15. int32 fromuid = 2;
  16. int32 touid = 3;
  17. }

服务器接收好友申请

服务器收到A向B添加好友的请求,会更新数据库申请记录,同时转发给B

  1. void LogicSystem::AddFriendApply(std::shared_ptr<CSession> session, const short& msg_id, const string& msg_data)
  2. {
  3. Json::Reader reader;
  4. Json::Value root;
  5. reader.parse(msg_data, root);
  6. auto uid = root["uid"].asInt();
  7. auto desc = root["applyname"].asString();
  8. auto bakname = root["bakname"].asString();
  9. auto touid = root["touid"].asInt();
  10. std::cout << "user login uid is " << uid << " applydesc is "
  11. << desc << " bakname is " << bakname << " touid is " << touid << endl;
  12. Json::Value rtvalue;
  13. rtvalue["error"] = ErrorCodes::Success;
  14. Defer defer([this, &rtvalue, session]() {
  15. std::string return_str = rtvalue.toStyledString();
  16. session->Send(return_str, ID_ADD_FRIEND_RSP);
  17. });
  18. //先更新数据库
  19. MysqlMgr::GetInstance()->AddFriendApply(uid, touid, desc, bakname);
  20. //查询redis 查找touid对应的server ip
  21. auto to_str = std::to_string(touid);
  22. auto to_ip_key = USERIPPREFIX + to_str;
  23. std::string to_ip_value = "";
  24. bool b_ip = RedisMgr::GetInstance()->Get(to_ip_key, to_ip_value);
  25. if (!b_ip) {
  26. return;
  27. }
  28. auto& cfg = ConfigMgr::Inst();
  29. auto self_name = cfg["SelfServer"]["Name"];
  30. std::string base_key = USER_BASE_INFO + std::to_string(uid);
  31. auto apply_info = std::make_shared<UserInfo>();
  32. bool b_info = GetBaseInfo(base_key, uid, apply_info);
  33. //直接通知对方有申请消息
  34. if (to_ip_value == self_name) {
  35. auto session = UserMgr::GetInstance()->GetSession(touid);
  36. if (session) {
  37. //在内存中则直接发送通知对方
  38. Json::Value notify;
  39. notify["error"] = ErrorCodes::Success;
  40. notify["applyuid"] = uid;
  41. notify["name"] = apply_info->name;
  42. notify["desc"] = desc;
  43. if (b_info) {
  44. notify["icon"] = apply_info->icon;
  45. notify["sex"] = apply_info->sex;
  46. notify["nick"] = apply_info->nick;
  47. }
  48. std::string return_str = notify.toStyledString();
  49. session->Send(return_str, ID_NOTIFY_ADD_FRIEND_REQ);
  50. }
  51. return ;
  52. }
  53. AddFriendReq add_req;
  54. add_req.set_applyuid(uid);
  55. add_req.set_touid(touid);
  56. add_req.set_name(apply_info->name);
  57. add_req.set_desc(desc);
  58. if (b_info) {
  59. add_req.set_icon(apply_info->icon);
  60. add_req.set_sex(apply_info->sex);
  61. add_req.set_nick(apply_info->nick);
  62. }
  63. //发送通知
  64. ChatGrpcClient::GetInstance()->NotifyAddFriend(to_ip_value,add_req);
  65. }

如果不在一个服务器,则通过grpc通知对端所在服务器, 对端服务器收到后,组织消息转发

  1. Status ChatServiceImpl::NotifyAddFriend(ServerContext* context, const AddFriendReq* request, AddFriendRsp* reply)
  2. {
  3. //查找用户是否在本服务器
  4. auto touid = request->touid();
  5. auto session = UserMgr::GetInstance()->GetSession(touid);
  6. Defer defer([request, reply]() {
  7. reply->set_error(ErrorCodes::Success);
  8. reply->set_applyuid(request->applyuid());
  9. reply->set_touid(request->touid());
  10. });
  11. //用户不在内存中则直接返回
  12. if (session == nullptr) {
  13. return Status::OK;
  14. }
  15. //在内存中则直接发送通知对方
  16. Json::Value rtvalue;
  17. rtvalue["error"] = ErrorCodes::Success;
  18. rtvalue["applyuid"] = request->applyuid();
  19. rtvalue["name"] = request->name();
  20. rtvalue["desc"] = request->desc();
  21. rtvalue["icon"] = request->icon();
  22. rtvalue["sex"] = request->sex();
  23. rtvalue["nick"] = request->nick();
  24. std::string return_str = rtvalue.toStyledString();
  25. session->Send(return_str, ID_NOTIFY_ADD_FRIEND_REQ);
  26. return Status::OK;
  27. }

服务器收到同意申请

当B客户同意添加好友,会将请求发送给服务器

服务器收到后会执行

  1. void LogicSystem::AuthFriendApply(std::shared_ptr<CSession> session, const short& msg_id, const string& msg_data) {
  2. Json::Reader reader;
  3. Json::Value root;
  4. reader.parse(msg_data, root);
  5. auto uid = root["fromuid"].asInt();
  6. auto touid = root["touid"].asInt();
  7. auto back_name = root["back"].asString();
  8. std::cout << "from " << uid << " auth friend to " << touid << std::endl;
  9. Json::Value rtvalue;
  10. rtvalue["error"] = ErrorCodes::Success;
  11. auto user_info = std::make_shared<UserInfo>();
  12. std::string base_key = USER_BASE_INFO + std::to_string(touid);
  13. bool b_info = GetBaseInfo(base_key, touid, user_info);
  14. if (b_info) {
  15. rtvalue["name"] = user_info->name;
  16. rtvalue["nick"] = user_info->nick;
  17. rtvalue["icon"] = user_info->icon;
  18. rtvalue["sex"] = user_info->sex;
  19. rtvalue["uid"] = touid;
  20. }
  21. else {
  22. rtvalue["error"] = ErrorCodes::UidInvalid;
  23. }
  24. Defer defer([this, &rtvalue, session]() {
  25. std::string return_str = rtvalue.toStyledString();
  26. session->Send(return_str, ID_AUTH_FRIEND_RSP);
  27. });
  28. //先更新数据库, 放到事务中,此处不再处理
  29. //MysqlMgr::GetInstance()->AuthFriendApply(uid, touid);
  30. std::vector<std::shared_ptr<AddFriendMsg>> chat_datas;
  31. //更新数据库添加好友
  32. MysqlMgr::GetInstance()->AddFriend(uid, touid,back_name, chat_datas);
  33. //查询redis 查找touid对应的server ip
  34. auto to_str = std::to_string(touid);
  35. auto to_ip_key = USERIPPREFIX + to_str;
  36. std::string to_ip_value = "";
  37. bool b_ip = RedisMgr::GetInstance()->Get(to_ip_key, to_ip_value);
  38. if (!b_ip) {
  39. return;
  40. }
  41. auto& cfg = ConfigMgr::Inst();
  42. auto self_name = cfg["SelfServer"]["Name"];
  43. //直接通知对方有认证通过消息
  44. if (to_ip_value == self_name) {
  45. auto session = UserMgr::GetInstance()->GetSession(touid);
  46. if (session) {
  47. //在内存中则直接发送通知对方
  48. Json::Value notify;
  49. notify["error"] = ErrorCodes::Success;
  50. notify["fromuid"] = uid;
  51. notify["touid"] = touid;
  52. std::string base_key = USER_BASE_INFO + std::to_string(uid);
  53. auto user_info = std::make_shared<UserInfo>();
  54. bool b_info = GetBaseInfo(base_key, uid, user_info);
  55. if (b_info) {
  56. notify["name"] = user_info->name;
  57. notify["nick"] = user_info->nick;
  58. notify["icon"] = user_info->icon;
  59. notify["sex"] = user_info->sex;
  60. }
  61. else {
  62. notify["error"] = ErrorCodes::UidInvalid;
  63. }
  64. for(auto & chat_data : chat_datas)
  65. {
  66. Json::Value chat;
  67. chat["sender"] = chat_data->sender_id();
  68. chat["msg_id"] = chat_data->msg_id();
  69. chat["thread_id"] = chat_data->thread_id();
  70. chat["unique_id"] = chat_data->unique_id();
  71. chat["msg_content"] = chat_data->msgcontent();
  72. notify["chat_datas"].append(chat);
  73. rtvalue["chat_datas"].append(chat);
  74. }
  75. std::string return_str = notify.toStyledString();
  76. session->Send(return_str, ID_NOTIFY_AUTH_FRIEND_REQ);
  77. }
  78. return ;
  79. }
  80. AuthFriendReq auth_req;
  81. auth_req.set_fromuid(uid);
  82. auth_req.set_touid(touid);
  83. for(auto& chat_data : chat_datas)
  84. {
  85. auto text_msg = auth_req.add_textmsgs();
  86. text_msg->CopyFrom(*chat_data);
  87. Json::Value chat;
  88. chat["sender"] = chat_data->sender_id();
  89. chat["msg_id"] = chat_data->msg_id();
  90. chat["thread_id"] = chat_data->thread_id();
  91. chat["unique_id"] = chat_data->unique_id();
  92. chat["msg_content"] = chat_data->msgcontent();
  93. rtvalue["chat_datas"].append(chat);
  94. }
  95. //发送通知
  96. ChatGrpcClient::GetInstance()->NotifyAuthFriend(to_ip_value, auth_req);
  97. }

数据库处理

  1. bool MysqlDao::AddFriend(const int& from, const int& to, std::string back_name,
  2. std::vector<std::shared_ptr<AddFriendMsg>>& chat_datas) {
  3. auto con = pool_->getConnection();
  4. if (con == nullptr) {
  5. return false;
  6. }
  7. Defer defer([this, &con]() {
  8. pool_->returnConnection(std::move(con));
  9. });
  10. try {
  11. //开始事务
  12. con->_con->setAutoCommit(false);
  13. std::string reverse_back;
  14. std::string apply_desc;
  15. {
  16. // 1. 锁定并读取
  17. std::unique_ptr<sql::PreparedStatement> selStmt(con->_con->prepareStatement(
  18. "SELECT back_name, descs "
  19. "FROM friend_apply "
  20. "WHERE from_uid = ? AND to_uid = ? "
  21. "FOR UPDATE"
  22. ));
  23. selStmt->setInt(1, to);
  24. selStmt->setInt(2, from);
  25. std::unique_ptr<sql::ResultSet> rsSel(selStmt->executeQuery());
  26. if (rsSel->next()) {
  27. reverse_back = rsSel->getString("back_name");
  28. apply_desc = rsSel->getString("descs");
  29. }
  30. else {
  31. // 没有对应的申请记录,直接 rollback 并返回失败
  32. con->_con->rollback();
  33. return false;
  34. }
  35. }
  36. {
  37. // 2. 执行真正的更新
  38. std::unique_ptr<sql::PreparedStatement> updStmt(con->_con->prepareStatement(
  39. "UPDATE friend_apply "
  40. "SET status = 1 "
  41. "WHERE from_uid = ? AND to_uid = ?"
  42. ));
  43. updStmt->setInt(1, to);
  44. updStmt->setInt(2, from);
  45. if (updStmt->executeUpdate() != 1) {
  46. // 更新行数不对,回滚
  47. con->_con->rollback();
  48. return false;
  49. }
  50. }
  51. {
  52. // 3. 准备第一个SQL语句, 插入认证方好友数据
  53. std::unique_ptr<sql::PreparedStatement> pstmt(con->_con->prepareStatement("INSERT IGNORE INTO friend(self_id, friend_id, back) "
  54. "VALUES (?, ?, ?) "
  55. ));
  56. //反过来的申请时from,验证时to
  57. pstmt->setInt(1, from); // from id
  58. pstmt->setInt(2, to);
  59. pstmt->setString(3, back_name);
  60. // 执行更新
  61. int rowAffected = pstmt->executeUpdate();
  62. if (rowAffected < 0) {
  63. con->_con->rollback();
  64. return false;
  65. }
  66. //准备第二个SQL语句,插入申请方好友数据
  67. std::unique_ptr<sql::PreparedStatement> pstmt2(con->_con->prepareStatement("INSERT IGNORE INTO friend(self_id, friend_id, back) "
  68. "VALUES (?, ?, ?) "
  69. ));
  70. //反过来的申请时from,验证时to
  71. pstmt2->setInt(1, to); // from id
  72. pstmt2->setInt(2, from);
  73. pstmt2->setString(3, reverse_back);
  74. // 执行更新
  75. int rowAffected2 = pstmt2->executeUpdate();
  76. if (rowAffected2 < 0) {
  77. con->_con->rollback();
  78. return false;
  79. }
  80. }
  81. // 4. 创建 chat_thread
  82. long long threadId = 0;
  83. {
  84. std::unique_ptr<sql::PreparedStatement> threadStmt(con->_con->prepareStatement(
  85. "INSERT INTO chat_thread (type, created_at) VALUES ('private', NOW());"
  86. ));
  87. threadStmt->executeUpdate();
  88. std::unique_ptr<sql::Statement> stmt(con->_con->createStatement());
  89. std::unique_ptr<sql::ResultSet> rs(
  90. stmt->executeQuery("SELECT LAST_INSERT_ID()")
  91. );
  92. if (rs->next()) {
  93. threadId = rs->getInt64(1);
  94. }
  95. else {
  96. return false;
  97. }
  98. }
  99. // 5. 插入 private_chat
  100. {
  101. std::unique_ptr<sql::PreparedStatement> pcStmt(con->_con->prepareStatement(
  102. "INSERT INTO private_chat(thread_id, user1_id, user2_id) VALUES (?, ?, ?)"
  103. ));
  104. pcStmt->setInt64(1, threadId);
  105. pcStmt->setInt(2, from);
  106. pcStmt->setInt(3, to);
  107. if (pcStmt->executeUpdate() < 0) return false;
  108. }
  109. // 6. 可选:插入初始消息到 chat_message
  110. if (apply_desc.empty() == false)
  111. {
  112. std::unique_ptr<sql::PreparedStatement> msgStmt(con->_con->prepareStatement(
  113. "INSERT INTO chat_message(thread_id, sender_id, recv_id, content,created_at, updated_at, status) VALUES (?, ?, ?, ?,NOW(),NOW(),?)"
  114. ));
  115. msgStmt->setInt64(1, threadId);
  116. msgStmt->setInt(2, to);
  117. msgStmt->setInt(3, from);
  118. msgStmt->setString(4, apply_desc);
  119. msgStmt->setInt(5, 0);
  120. if (msgStmt->executeUpdate() < 0) { return false; }
  121. std::unique_ptr<sql::Statement> stmt(con->_con->createStatement());
  122. std::unique_ptr<sql::ResultSet> rs(
  123. stmt->executeQuery("SELECT LAST_INSERT_ID()")
  124. );
  125. if (rs->next()) {
  126. auto messageId = rs->getInt64(1);
  127. auto tx_data = std::make_shared<AddFriendMsg>();
  128. tx_data->set_sender_id(to);
  129. tx_data->set_msg_id(messageId);
  130. tx_data->set_msgcontent(apply_desc);
  131. tx_data->set_thread_id(threadId);
  132. tx_data->set_unique_id("");
  133. std::cout << "addfriend insert message success" << std::endl;
  134. chat_datas.push_back(tx_data);
  135. }
  136. else {
  137. return false;
  138. }
  139. }
  140. {
  141. std::unique_ptr<sql::PreparedStatement> msgStmt(con->_con->prepareStatement(
  142. "INSERT INTO chat_message(thread_id, sender_id, recv_id, content, created_at, updated_at, status) VALUES (?, ?, ?, ?,NOW(),NOW(),?)"
  143. ));
  144. msgStmt->setInt64(1, threadId);
  145. msgStmt->setInt(2, from);
  146. msgStmt->setInt(3, to);
  147. msgStmt->setString(4, "We are friends now!");
  148. msgStmt->setInt(5, 0);
  149. if (msgStmt->executeUpdate() < 0) { return false; }
  150. std::unique_ptr<sql::Statement> stmt(con->_con->createStatement());
  151. std::unique_ptr<sql::ResultSet> rs(
  152. stmt->executeQuery("SELECT LAST_INSERT_ID()")
  153. );
  154. if (rs->next()) {
  155. auto messageId = rs->getInt64(1);
  156. auto tx_data = std::make_shared<AddFriendMsg>();
  157. tx_data->set_sender_id(from);
  158. tx_data->set_msg_id(messageId);
  159. tx_data->set_msgcontent("We are friends now!");
  160. tx_data->set_thread_id(threadId);
  161. tx_data->set_unique_id("");
  162. chat_datas.push_back(tx_data);
  163. }
  164. else {
  165. return false;
  166. }
  167. }
  168. // 提交事务
  169. con->_con->commit();
  170. std::cout << "addfriend insert friends success" << std::endl;
  171. return true;
  172. }
  173. catch (sql::SQLException& e) {
  174. // 如果发生错误,回滚事务
  175. if (con) {
  176. con->_con->rollback();
  177. }
  178. std::cerr << "SQLException: " << e.what();
  179. std::cerr << " (MySQL error code: " << e.getErrorCode();
  180. std::cerr << ", SQLState: " << e.getSQLState() << " )" << std::endl;
  181. return false;
  182. }
  183. return true;
  184. }

服务器收到同意通知

B同意A的申请,此时B所在的服务器会将同意的通知发送到A所在的服务器

下面是A所在的服务器收到请求后,发送通知给A的逻辑

  1. Status ChatServiceImpl::NotifyAuthFriend(ServerContext* context, const AuthFriendReq* request,
  2. AuthFriendRsp* reply) {
  3. //查找用户是否在本服务器
  4. auto touid = request->touid();
  5. auto fromuid = request->fromuid();
  6. auto session = UserMgr::GetInstance()->GetSession(touid);
  7. Defer defer([request, reply]() {
  8. reply->set_error(ErrorCodes::Success);
  9. reply->set_fromuid(request->fromuid());
  10. reply->set_touid(request->touid());
  11. });
  12. //用户不在内存中则直接返回
  13. if (session == nullptr) {
  14. return Status::OK;
  15. }
  16. //在内存中则直接发送通知对方
  17. Json::Value rtvalue;
  18. rtvalue["error"] = ErrorCodes::Success;
  19. rtvalue["fromuid"] = request->fromuid();
  20. rtvalue["touid"] = request->touid();
  21. std::string base_key = USER_BASE_INFO + std::to_string(fromuid);
  22. auto user_info = std::make_shared<UserInfo>();
  23. bool b_info = GetBaseInfo(base_key, fromuid, user_info);
  24. if (b_info) {
  25. rtvalue["name"] = user_info->name;
  26. rtvalue["nick"] = user_info->nick;
  27. rtvalue["icon"] = user_info->icon;
  28. rtvalue["sex"] = user_info->sex;
  29. }
  30. else {
  31. rtvalue["error"] = ErrorCodes::UidInvalid;
  32. }
  33. for(auto& msg : request->textmsgs()) {
  34. Json::Value chat;
  35. chat["sender"] = msg.sender_id();
  36. chat["msg_id"] = msg.msg_id();
  37. chat["thread_id"] = msg.thread_id();
  38. chat["unique_id"] = msg.unique_id();
  39. chat["msg_content"] = msg.msgcontent();
  40. rtvalue["chat_datas"].append(chat);
  41. }
  42. std::string return_str = rtvalue.toStyledString();
  43. session->Send(return_str, ID_NOTIFY_AUTH_FRIEND_REQ);
  44. return Status::OK;
  45. }

客户端收到好友同意回复

当A申请B加好友,B同意后,服务器会回复给B消息,这样B的客户端要处理同意的回包

  1. _handlers.insert(ID_AUTH_FRIEND_RSP, [this](ReqId id, int len, QByteArray data) {
  2. Q_UNUSED(len);
  3. qDebug() << "handle id is " << id << " data is " << data;
  4. // 将QByteArray转换为QJsonDocument
  5. QJsonDocument jsonDoc = QJsonDocument::fromJson(data);
  6. // 检查转换是否成功
  7. if (jsonDoc.isNull()) {
  8. qDebug() << "Failed to create QJsonDocument.";
  9. return;
  10. }
  11. QJsonObject jsonObj = jsonDoc.object();
  12. if (!jsonObj.contains("error")) {
  13. int err = ErrorCodes::ERR_JSON;
  14. qDebug() << "Auth Friend Failed, err is Json Parse Err" << err;
  15. return;
  16. }
  17. int err = jsonObj["error"].toInt();
  18. if (err != ErrorCodes::SUCCESS) {
  19. qDebug() << "Auth Friend Failed, err is " << err;
  20. return;
  21. }
  22. auto name = jsonObj["name"].toString();
  23. auto nick = jsonObj["nick"].toString();
  24. auto icon = jsonObj["icon"].toString();
  25. auto sex = jsonObj["sex"].toInt();
  26. auto uid = jsonObj["uid"].toInt();
  27. std::vector<std::shared_ptr<TextChatData>> chat_datas;
  28. for (const QJsonValue& data : jsonObj["chat_datas"].toArray()) {
  29. auto send_uid = data["sender"].toInt();
  30. auto msg_id = data["msg_id"].toInt();
  31. auto thread_id = data["thread_id"].toInt();
  32. auto unique_id = data["unique_id"].toInt();
  33. auto msg_content = data["msg_content"].toString();
  34. auto chat_data = std::make_shared<TextChatData>(msg_id, thread_id, ChatFormType::PRIVATE,
  35. ChatMsgType::TEXT, msg_content, send_uid);
  36. chat_datas.push_back(chat_data);
  37. }
  38. auto rsp = std::make_shared<AuthRsp>(uid, name, nick, icon, sex);
  39. rsp->SetChatDatas(chat_datas);
  40. emit sig_auth_rsp(rsp);
  41. qDebug() << "Auth Friend Success " ;
  42. });

界面和好友状态更新

  1. void ChatDialog::slot_auth_rsp(std::shared_ptr<AuthRsp> auth_rsp)
  2. {
  3. qDebug() << "receive slot_auth_rsp uid is " << auth_rsp->_uid
  4. << " name is " << auth_rsp->_name << " nick is " << auth_rsp->_nick;
  5. //判断如果已经是好友则跳过
  6. auto bfriend = UserMgr::GetInstance()->CheckFriendById(auth_rsp->_uid);
  7. if (bfriend) {
  8. return;
  9. }
  10. UserMgr::GetInstance()->AddFriend(auth_rsp);
  11. int randomValue = QRandomGenerator::global()->bounded(100); // 生成0到99之间的随机整数
  12. int str_i = randomValue % strs.size();
  13. int head_i = randomValue % heads.size();
  14. int name_i = randomValue % names.size();
  15. auto* chat_user_wid = new ChatUserWid();
  16. auto chat_thread_data = std::make_shared<ChatThreadData>(auth_rsp->_uid, auth_rsp->_thread_id, 0);
  17. UserMgr::GetInstance()->AddChatThreadData(chat_thread_data, auth_rsp->_uid);
  18. for (auto& chat_msg : auth_rsp->_chat_datas) {
  19. chat_thread_data->AppendMsg(chat_msg->GetMsgId(), chat_msg);
  20. }
  21. chat_user_wid->SetChatData(chat_thread_data);
  22. QListWidgetItem* item = new QListWidgetItem;
  23. //qDebug()<<"chat_user_wid sizeHint is " << chat_user_wid->sizeHint();
  24. item->setSizeHint(chat_user_wid->sizeHint());
  25. ui->chat_user_list->insertItem(0, item);
  26. ui->chat_user_list->setItemWidget(item, chat_user_wid);
  27. _chat_thread_items.insert(auth_rsp->_thread_id, item);
  28. }

客户端收到好友同意通知

A加B为好友,B同意后,服务器通知A,以下为A收到通知后的处理

  1. _handlers.insert(ID_NOTIFY_AUTH_FRIEND_REQ, [this](ReqId id, int len, QByteArray data) {
  2. Q_UNUSED(len);
  3. qDebug() << "handle id is " << id << " data is " << data;
  4. // 将QByteArray转换为QJsonDocument
  5. QJsonDocument jsonDoc = QJsonDocument::fromJson(data);
  6. // 检查转换是否成功
  7. if (jsonDoc.isNull()) {
  8. qDebug() << "Failed to create QJsonDocument.";
  9. return;
  10. }
  11. QJsonObject jsonObj = jsonDoc.object();
  12. if (!jsonObj.contains("error")) {
  13. int err = ErrorCodes::ERR_JSON;
  14. qDebug() << "Auth Friend Failed, err is " << err;
  15. return;
  16. }
  17. int err = jsonObj["error"].toInt();
  18. if (err != ErrorCodes::SUCCESS) {
  19. qDebug() << "Auth Friend Failed, err is " << err;
  20. return;
  21. }
  22. int from_uid = jsonObj["fromuid"].toInt();
  23. QString name = jsonObj["name"].toString();
  24. QString nick = jsonObj["nick"].toString();
  25. QString icon = jsonObj["icon"].toString();
  26. int sex = jsonObj["sex"].toInt();
  27. std::vector<std::shared_ptr<TextChatData>> chat_datas;
  28. for (const QJsonValue& data : jsonObj["chat_datas"].toArray()) {
  29. auto send_uid = data["sender"].toInt();
  30. auto msg_id = data["msg_id"].toInt();
  31. auto thread_id = data["thread_id"].toInt();
  32. auto unique_id = data["unique_id"].toInt();
  33. auto msg_content = data["msg_content"].toString();
  34. auto chat_data = std::make_shared<TextChatData>(msg_id, thread_id, ChatFormType::PRIVATE,
  35. ChatMsgType::TEXT, msg_content, send_uid);
  36. chat_datas.push_back(chat_data);
  37. }
  38. auto auth_info = std::make_shared<AuthInfo>(from_uid,name,
  39. nick, icon, sex);
  40. auth_info->SetChatDatas(chat_datas);
  41. emit sig_add_auth_friend(auth_info);
  42. });

界面添加好友会话状态更新

  1. void ChatDialog::slot_add_auth_friend(std::shared_ptr<AuthInfo> auth_info) {
  2. qDebug() << "receive slot_add_auth__friend uid is " << auth_info->_uid
  3. << " name is " << auth_info->_name << " nick is " << auth_info->_nick;
  4. //判断如果已经是好友则跳过
  5. auto bfriend = UserMgr::GetInstance()->CheckFriendById(auth_info->_uid);
  6. if (bfriend) {
  7. return;
  8. }
  9. UserMgr::GetInstance()->AddFriend(auth_info);
  10. auto* chat_user_wid = new ChatUserWid();
  11. auto chat_thread_data = std::make_shared<ChatThreadData>(auth_info->_uid, auth_info->_thread_id, 0);
  12. UserMgr::GetInstance()->AddChatThreadData(chat_thread_data, auth_info->_uid);
  13. for (auto& chat_msg : auth_info->_chat_datas) {
  14. chat_thread_data->AppendMsg(chat_msg->GetMsgId(), chat_msg);
  15. }
  16. chat_user_wid->SetChatData(chat_thread_data);
  17. QListWidgetItem* item = new QListWidgetItem;
  18. //qDebug()<<"chat_user_wid sizeHint is " << chat_user_wid->sizeHint();
  19. item->setSizeHint(chat_user_wid->sizeHint());
  20. ui->chat_user_list->insertItem(0, item);
  21. ui->chat_user_list->setItemWidget(item, chat_user_wid);
  22. _chat_thread_items.insert(auth_info->_thread_id, item);
  23. }

效果展示

image-20250622131525274

GRPC同步认证消息认证

分布式认证就是让两个客户端分别登录不同的服务器,注意因为我们修改了连接检测和记录的方式,改为通过心跳定时更新,为了避免两个客户端同时登录到一个服务器的情况,可以在一个客户端登录服务器后,另一个客户端延迟一分钟登录。

同时要注意StatusServer要将getChatServer这个函数打开

  1. ChatServer StatusServiceImpl::getChatServer() {
  2. std::lock_guard<std::mutex> guard(_server_mtx);
  3. auto minServer = _servers.begin()->second;
  4. auto count_str = RedisMgr::GetInstance()->HGet(LOGIN_COUNT, minServer.name);
  5. if (count_str.empty()) {
  6. //不存在则默认设置为最大
  7. minServer.con_count = INT_MAX;
  8. }
  9. else {
  10. minServer.con_count = std::stoi(count_str);
  11. }
  12. // 使用范围基于for循环
  13. for ( auto& server : _servers) {
  14. if (server.second.name == minServer.name) {
  15. continue;
  16. }
  17. auto count_str = RedisMgr::GetInstance()->HGet(LOGIN_COUNT, server.second.name);
  18. if (count_str.empty()) {
  19. server.second.con_count = INT_MAX;
  20. }
  21. else {
  22. server.second.con_count = std::stoi(count_str);
  23. }
  24. if (server.second.con_count < minServer.con_count) {
  25. minServer = server.second;
  26. }
  27. }
  28. return minServer;
  29. }

两个客户端登录后,确保后台看到两个用户登录不同的Server

1019用户登录Server2

image-20250625212522598

1002用户登录Server1

image-20250625212740526

这样二者都登陆成功了,然后任意一方向对方发送添加好友请求,另一方同意,看到的效果如下

image-20250625212823862

聊天记录增量加载

客户端逻辑

聊天记录增量加载,可以在加载完聊天会话列表后,继续分页加载聊天信息。

因为qt支持信号和槽函数机制,所以我们可以加载完会话列表后发送, 在UserMgr中设置一个当前加载的_cur_load_chat_index用来记录将要加载的会话消息。

我们对外暴露两个接口,分别是获取当前要加载会话信息,和下次加载的会话信息

  1. std::shared_ptr<ChatThreadData> UserMgr::GetCurLoadData()
  2. {
  3. if (_cur_load_chat_index >= _chat_thread_ids.size()) {
  4. return nullptr;
  5. }
  6. auto iter = _chat_map.find(_chat_thread_ids[_cur_load_chat_index]);
  7. if (iter == _chat_map.end()) {
  8. return nullptr;
  9. }
  10. return iter.value();
  11. }

然后封装加载消息的函数

  1. void ChatDialog::loadChatMsg() {
  2. //发送聊天记录请求
  3. _cur_load_chat = UserMgr::GetInstance()->GetCurLoadData();
  4. if (_cur_load_chat == nullptr) {
  5. return;
  6. }
  7. showLoadingDlg(true);
  8. //发送请求给服务器
  9. //发送请求逻辑
  10. QJsonObject jsonObj;
  11. jsonObj["thread_id"] = _cur_load_chat->GetThreadId();
  12. jsonObj["message_id"] = _cur_load_chat->GetLastMsgId();
  13. QJsonDocument doc(jsonObj);
  14. QByteArray jsonData = doc.toJson(QJsonDocument::Compact);
  15. //发送tcp请求给chat server
  16. emit TcpMgr::GetInstance()->sig_send_data(ReqId::ID_LOAD_CHAT_MSG_REQ, jsonData);
  17. }

接下来我们在加载完会话列表后调用这个函数

  1. void ChatDialog::slot_load_chat_thread(bool load_more, int last_thread_id,
  2. std::vector<std::shared_ptr<ChatThreadInfo>> chat_threads)
  3. {
  4. for (auto& cti : chat_threads) {
  5. //先处理单聊,群聊跳过,以后添加
  6. if (cti->_type == "group") {
  7. continue;
  8. }
  9. auto uid = UserMgr::GetInstance()->GetUid();
  10. auto other_uid = 0;
  11. if (uid == cti->_user1_id) {
  12. other_uid = cti->_user2_id;
  13. }
  14. else {
  15. other_uid = cti->_user1_id;
  16. }
  17. auto chat_thread_data = std::make_shared<ChatThreadData>(other_uid, cti->_thread_id, 0);
  18. UserMgr::GetInstance()->AddChatThreadData(chat_thread_data, other_uid);
  19. auto* chat_user_wid = new ChatUserWid();
  20. chat_user_wid->SetChatData(chat_thread_data);
  21. QListWidgetItem* item = new QListWidgetItem;
  22. //qDebug()<<"chat_user_wid sizeHint is " << chat_user_wid->sizeHint();
  23. item->setSizeHint(chat_user_wid->sizeHint());
  24. ui->chat_user_list->addItem(item);
  25. ui->chat_user_list->setItemWidget(item, chat_user_wid);
  26. _chat_thread_items.insert(cti->_thread_id, item);
  27. }
  28. UserMgr::GetInstance()->SetLastChatThreadId(last_thread_id);
  29. if (load_more) {
  30. //发送请求逻辑
  31. QJsonObject jsonObj;
  32. auto uid = UserMgr::GetInstance()->GetUid();
  33. jsonObj["uid"] = uid;
  34. jsonObj["thread_id"] = last_thread_id;
  35. QJsonDocument doc(jsonObj);
  36. QByteArray jsonData = doc.toJson(QJsonDocument::Compact);
  37. //发送tcp请求给chat server
  38. emit TcpMgr::GetInstance()->sig_send_data(ReqId::ID_LOAD_CHAT_THREAD_REQ, jsonData);
  39. return;
  40. }
  41. showLoadingDlg(false);
  42. //继续加载聊天数据
  43. loadChatMsg();
  44. }

在收到服务器回复时处理消息

  1. _handlers.insert(ID_LOAD_CHAT_MSG_RSP, [this](ReqId id, int len, QByteArray data) {
  2. Q_UNUSED(len);
  3. qDebug() << "handle id is " << id << " data is " << data;
  4. // 将QByteArray转换为QJsonDocument
  5. QJsonDocument jsonDoc = QJsonDocument::fromJson(data);
  6. // 检查转换是否成功
  7. if (jsonDoc.isNull()) {
  8. qDebug() << "Failed to create QJsonDocument.";
  9. return;
  10. }
  11. QJsonObject jsonObj = jsonDoc.object();
  12. if (!jsonObj.contains("error")) {
  13. int err = ErrorCodes::ERR_JSON;
  14. qDebug() << "parse create private chat json parse failed " << err;
  15. return;
  16. }
  17. int err = jsonObj["error"].toInt();
  18. if (err != ErrorCodes::SUCCESS) {
  19. qDebug() << "get create private chat failed, error is " << err;
  20. return;
  21. }
  22. qDebug() << "Receive create private chat rsp Success";
  23. int thread_id = jsonObj["thread_id"].toInt();
  24. int last_msg_id = jsonObj["last_message_id"].toInt();
  25. bool load_more = jsonObj["load_more"].toBool();
  26. std::vector<std::shared_ptr<TextChatData>> chat_datas;
  27. for (const QJsonValue& data : jsonObj["chat_datas"].toArray()) {
  28. auto send_uid = data["sender"].toInt();
  29. auto msg_id = data["msg_id"].toInt();
  30. auto thread_id = data["thread_id"].toInt();
  31. auto unique_id = data["unique_id"].toInt();
  32. auto msg_content = data["msg_content"].toString();
  33. QString chat_time = data["chat_time"].toString();
  34. auto chat_data = std::make_shared<TextChatData>(msg_id, thread_id, ChatFormType::PRIVATE,
  35. ChatMsgType::TEXT, msg_content, send_uid, chat_time);
  36. chat_datas.push_back(chat_data);
  37. }
  38. //发送信号通知界面
  39. emit sig_load_chat_msg(thread_id, last_msg_id, load_more, chat_datas);
  40. });

界面收到sig_load_chat_msg后添加消息,并且判断是否还有剩余消息加载

  1. void ChatDialog::slot_load_chat_msg(int thread_id, int msg_id, bool load_more, std::vector<std::shared_ptr<TextChatData>> msglists)
  2. {
  3. _cur_load_chat->SetLastMsgId(msg_id);
  4. //加载聊天信息
  5. for (auto& chat_msg : msglists) {
  6. _cur_load_chat->AppendMsg(chat_msg->GetMsgId(), chat_msg);
  7. }
  8. //还有未加载完的消息,就继续加载
  9. if (load_more) {
  10. //发送请求给服务器
  11. //发送请求逻辑
  12. QJsonObject jsonObj;
  13. jsonObj["thread_id"] = _cur_load_chat->GetThreadId();
  14. jsonObj["message_id"] = _cur_load_chat->GetLastMsgId();
  15. QJsonDocument doc(jsonObj);
  16. QByteArray jsonData = doc.toJson(QJsonDocument::Compact);
  17. //发送tcp请求给chat server
  18. emit TcpMgr::GetInstance()->sig_send_data(ReqId::ID_LOAD_CHAT_MSG_REQ, jsonData);
  19. return;
  20. }
  21. //获取下一个chat_thread
  22. _cur_load_chat = UserMgr::GetInstance()->GetNextLoadData();
  23. //都加载完了
  24. if(!_cur_load_chat){
  25. //更新聊天界面信息
  26. SetSelectChatItem();
  27. SetSelectChatPage();
  28. showLoadingDlg(false);
  29. return;
  30. }
  31. //继续加载下一个聊天
  32. //发送请求给服务器
  33. //发送请求逻辑
  34. QJsonObject jsonObj;
  35. jsonObj["thread_id"] = _cur_load_chat->GetThreadId();
  36. jsonObj["message_id"] = _cur_load_chat->GetLastMsgId();
  37. QJsonDocument doc(jsonObj);
  38. QByteArray jsonData = doc.toJson(QJsonDocument::Compact);
  39. //发送tcp请求给chat server
  40. emit TcpMgr::GetInstance()->sig_send_data(ReqId::ID_LOAD_CHAT_MSG_REQ, jsonData);
  41. }

服务器逻辑

注册消息

  1. _fun_callbacks[ID_LOAD_CHAT_MSG_REQ] = std::bind(&LogicSystem::LoadChatMsg, this,
  2. placeholders::_1, placeholders::_2, placeholders::_3);

具体逻辑处理

  1. void LogicSystem::LoadChatMsg(std::shared_ptr<CSession> session,
  2. const short& msg_id, const string& msg_data) {
  3. Json::Reader reader;
  4. Json::Value root;
  5. reader.parse(msg_data, root);
  6. auto thread_id = root["thread_id"].asInt();
  7. auto message_id = root["message_id"].asInt();
  8. Json::Value rtvalue;
  9. rtvalue["error"] = ErrorCodes::Success;
  10. rtvalue["thread_id"] = thread_id;
  11. Defer defer([this, &rtvalue, session]() {
  12. std::string return_str = rtvalue.toStyledString();
  13. session->Send(return_str, ID_LOAD_CHAT_MSG_RSP);
  14. });
  15. int page_size = 10;
  16. std::shared_ptr<PageResult> res = MysqlMgr::GetInstance()->LoadChatMsg(thread_id, message_id, page_size);
  17. if (!res) {
  18. rtvalue["error"] = ErrorCodes::LOAD_CHAT_FAILED;
  19. return;
  20. }
  21. rtvalue["last_message_id"] = res->next_cursor;
  22. rtvalue["load_more"] = res->load_more;
  23. for (auto& chat : res->messages) {
  24. Json::Value chat_data;
  25. chat_data["sender"] = chat.sender_id;
  26. chat_data["msg_id"] = chat.message_id;
  27. chat_data["thread_id"] = chat.thread_id;
  28. chat_data["unique_id"] = 0;
  29. chat_data["msg_content"] = chat.content;
  30. chat_data["chat_time"] = chat.chat_time;
  31. rtvalue["chat_datas"].append(chat_data);
  32. }
  33. }

数据库新增根据thread_idmessage_id返回分页数据

  1. std::shared_ptr<PageResult> MysqlMgr::LoadChatMsg(int threadId, int lastId, int pageSize)
  2. {
  3. return _dao.LoadChatMsg(threadId, lastId, pageSize);
  4. }

具体在MysqlDao层面实现分页加载

  1. std::shared_ptr<PageResult> MysqlDao::LoadChatMsg(int thread_id, int last_message_id, int page_size)
  2. {
  3. auto con = pool_->getConnection();
  4. if (!con) {
  5. return nullptr;
  6. }
  7. Defer defer([this, &con]() {
  8. pool_->returnConnection(std::move(con));
  9. });
  10. auto& conn = con->_con;
  11. try {
  12. auto page_res = std::make_shared<PageResult>();
  13. page_res->load_more = false;
  14. page_res->next_cursor = last_message_id;
  15. // SQL:多取一条,用于判断是否还有更多
  16. const std::string sql = R"(
  17. SELECT message_id, thread_id, sender_id, recv_id, content,
  18. created_at, updated_at, status
  19. FROM chat_message
  20. WHERE thread_id = ?
  21. AND message_id > ?
  22. ORDER BY message_id ASC
  23. LIMIT ?
  24. )";
  25. uint32_t fetch_limit = page_size + 1;
  26. auto pstmt = std::unique_ptr<sql::PreparedStatement>(
  27. conn->prepareStatement(sql)
  28. );
  29. pstmt->setInt(1, thread_id);
  30. pstmt->setInt(2, last_message_id);
  31. pstmt->setInt(3, fetch_limit);
  32. auto rs = std::unique_ptr<sql::ResultSet>(pstmt->executeQuery());
  33. // 读取 fetch_limit 条记录
  34. while (rs->next()) {
  35. ChatMessage msg;
  36. msg.message_id = rs->getUInt64("message_id");
  37. msg.thread_id = rs->getUInt64("thread_id");
  38. msg.sender_id = rs->getUInt64("sender_id");
  39. msg.recv_id = rs->getUInt64("recv_id");
  40. msg.content = rs->getString("content");
  41. msg.chat_time = rs->getString("created_at");
  42. msg.status = rs->getInt("status");
  43. page_res->messages.push_back(std::move(msg));
  44. }
  45. return page_res;
  46. }
  47. catch (sql::SQLException& e) {
  48. std::cerr << "SQLException: " << e.what() << std::endl;
  49. conn->rollback();
  50. return nullptr;
  51. }
  52. return nullptr;
  53. }

效果展示

image-20250702125912799

发送和接收消息同步

客户端缓存发送消息

我们需要在客户端缓存一下发送的消息,等到服务器回复后再将收到的消息放入ChatThreadData中。

为了标识消息的唯一性,我们需要在客户端生成唯一unique_id,构造成ChatTextData先放到ChatThreadData中存起来。

  1. //已发送的消息,还未收到回应的。
  2. QMap<QString, std::shared_ptr<TextChatData>> _msg_unrsp_map;

实现发送逻辑

  1. void ChatPage::on_send_btn_clicked()
  2. {
  3. if (_chat_data == nullptr) {
  4. qDebug() << "friend_info is empty";
  5. return;
  6. }
  7. auto user_info = UserMgr::GetInstance()->GetUserInfo();
  8. auto pTextEdit = ui->chatEdit;
  9. ChatRole role = ChatRole::Self;
  10. QString userName = user_info->_name;
  11. QString userIcon = user_info->_icon;
  12. const QVector<MsgInfo>& msgList = pTextEdit->getMsgList();
  13. QJsonObject textObj;
  14. QJsonArray textArray;
  15. int txt_size = 0;
  16. auto thread_id = _chat_data->GetThreadId();
  17. for(int i=0; i<msgList.size(); ++i)
  18. {
  19. //消息内容长度不合规就跳过
  20. if(msgList[i].content.length() > 1024){
  21. continue;
  22. }
  23. QString type = msgList[i].msgFlag;
  24. ChatItemBase *pChatItem = new ChatItemBase(role);
  25. pChatItem->setUserName(userName);
  26. pChatItem->setUserIcon(QPixmap(userIcon));
  27. QWidget *pBubble = nullptr;
  28. //生成唯一id
  29. QUuid uuid = QUuid::createUuid();
  30. //转为字符串
  31. QString uuidString = uuid.toString();
  32. if(type == "text")
  33. {
  34. pBubble = new TextBubble(role, msgList[i].content);
  35. if(txt_size + msgList[i].content.length()> 1024){
  36. textObj["fromuid"] = user_info->_uid;
  37. textObj["touid"] = _chat_data->GetOtherId();
  38. textObj["thread_id"] = thread_id;
  39. textObj["text_array"] = textArray;
  40. QJsonDocument doc(textObj);
  41. QByteArray jsonData = doc.toJson(QJsonDocument::Compact);
  42. //发送并清空之前累计的文本列表
  43. txt_size = 0;
  44. textArray = QJsonArray();
  45. textObj = QJsonObject();
  46. //发送tcp请求给chat server
  47. emit TcpMgr::GetInstance()->sig_send_data(ReqId::ID_TEXT_CHAT_MSG_REQ, jsonData);
  48. }
  49. //将bubble和uid绑定,以后可以等网络返回消息后设置是否送达
  50. //_bubble_map[uuidString] = pBubble;
  51. txt_size += msgList[i].content.length();
  52. QJsonObject obj;
  53. QByteArray utf8Message = msgList[i].content.toUtf8();
  54. auto content = QString::fromUtf8(utf8Message);
  55. obj["content"] = content;
  56. obj["unique_id"] = uuidString;
  57. textArray.append(obj);
  58. //todo... 注意,此处先按私聊处理
  59. auto txt_msg = std::make_shared<TextChatData>(uuidString, thread_id, ChatFormType::PRIVATE,
  60. ChatMsgType::TEXT, content, user_info->_uid, 0);
  61. //将未回复的消息加入到未回复列表中,以便后续处理
  62. _chat_data->AppendUnRspMsg(uuidString,txt_msg);
  63. }
  64. else if(type == "image")
  65. {
  66. pBubble = new PictureBubble(QPixmap(msgList[i].content) , role);
  67. }
  68. else if(type == "file")
  69. {
  70. }
  71. //发送消息
  72. if(pBubble != nullptr)
  73. {
  74. pChatItem->setWidget(pBubble);
  75. pChatItem->setStatus(0);
  76. ui->chat_data_list->appendChatItem(pChatItem);
  77. _unrsp_item_map[uuidString] = pChatItem;
  78. }
  79. }
  80. qDebug() << "textArray is " << textArray ;
  81. //发送给服务器
  82. textObj["text_array"] = textArray;
  83. textObj["fromuid"] = user_info->_uid;
  84. textObj["touid"] = _chat_data->GetOtherId();
  85. textObj["thread_id"] = thread_id;
  86. QJsonDocument doc(textObj);
  87. QByteArray jsonData = doc.toJson(QJsonDocument::Compact);
  88. //发送并清空之前累计的文本列表
  89. txt_size = 0;
  90. textArray = QJsonArray();
  91. textObj = QJsonObject();
  92. //发送tcp请求给chat server
  93. emit TcpMgr::GetInstance()->sig_send_data(ReqId::ID_TEXT_CHAT_MSG_REQ, jsonData);
  94. }

相比于之前,我们在json中增加了unique_idthread_id字段,服务器收到后根据thread_id生成消息放入到数据库,并携带unique_id回传给客户端。

客户端缓存消息放入UserMgr

  1. //将未回复的消息加入到未回复列表中,以便后续处理
  2. _chat_data->AppendUnRspMsg(uuidString,txt_msg);

此外,客户端需要设置聊天文本状态为未回复

  1. pChatItem->setStatus(0);

切换聊天不丢失状态

如果此时切换页面,再切回来,也要保证之前服务器未回复的消息能重新加载

切换的逻辑在

  1. void ChatDialog::SetSelectChatPage(int thread_id)
  2. {
  3. if (ui->chat_user_list->count() <= 0) {
  4. return;
  5. }
  6. if (thread_id == 0) {
  7. auto item = ui->chat_user_list->item(0);
  8. //转为widget
  9. QWidget* widget = ui->chat_user_list->itemWidget(item);
  10. if (!widget) {
  11. return;
  12. }
  13. auto con_item = qobject_cast<ChatUserWid*>(widget);
  14. if (!con_item) {
  15. return;
  16. }
  17. //设置信息
  18. auto chat_data = con_item->GetChatData();
  19. ui->chat_page->SetChatData(chat_data);
  20. return;
  21. }
  22. auto find_iter = _chat_thread_items.find(thread_id);
  23. if (find_iter == _chat_thread_items.end()) {
  24. return;
  25. }
  26. //转为widget
  27. QWidget* widget = ui->chat_user_list->itemWidget(find_iter.value());
  28. if (!widget) {
  29. return;
  30. }
  31. //判断转化为自定义的widget
  32. // 对自定义widget进行操作, 将item 转化为基类ListItemBase
  33. ListItemBase* customItem = qobject_cast<ListItemBase*>(widget);
  34. if (!customItem) {
  35. qDebug() << "qobject_cast<ListItemBase*>(widget) is nullptr";
  36. return;
  37. }
  38. auto itemType = customItem->GetItemType();
  39. if (itemType == CHAT_USER_ITEM) {
  40. auto con_item = qobject_cast<ChatUserWid*>(customItem);
  41. if (!con_item) {
  42. return;
  43. }
  44. //设置信息
  45. auto chat_data = con_item->GetChatData();
  46. ui->chat_page->SetChatData(chat_data);
  47. return;
  48. }
  49. }

其中SetChatData是设置页面聊天信息列表

  1. void ChatPage::SetChatData(std::shared_ptr<ChatThreadData> chat_data) {
  2. _chat_data = chat_data;
  3. auto other_id = _chat_data->GetOtherId();
  4. if(other_id == 0) {
  5. //说明是群聊
  6. ui->title_lb->setText(_chat_data->GetGroupName());
  7. //todo...加载群聊信息和成员信息
  8. return;
  9. }
  10. //私聊
  11. auto friend_info = UserMgr::GetInstance()->GetFriendById(other_id);
  12. if (friend_info == nullptr) {
  13. return;
  14. }
  15. ui->title_lb->setText(friend_info->_name);
  16. ui->chat_data_list->removeAllItem();
  17. _unrsp_item_map.clear();
  18. for(auto & msg : chat_data->GetMsgMapRef()){
  19. AppendChatMsg(msg);
  20. }
  21. for (auto& msg : chat_data->GetMsgUnRspRef()) {
  22. AppendChatMsg(msg);
  23. }
  24. }

这样我们可以加载服务器已经回复的和服务器未回复的。保证完全,具体添加逻辑

  1. void ChatPage::AppendChatMsg(std::shared_ptr<ChatDataBase> msg)
  2. {
  3. auto self_info = UserMgr::GetInstance()->GetUserInfo();
  4. ChatRole role;
  5. if (msg->GetSendUid() == self_info->_uid) {
  6. role = ChatRole::Self;
  7. ChatItemBase* pChatItem = new ChatItemBase(role);
  8. pChatItem->setUserName(self_info->_name);
  9. pChatItem->setUserIcon(QPixmap(self_info->_icon));
  10. QWidget* pBubble = nullptr;
  11. if (msg->GetMsgType() == ChatMsgType::TEXT) {
  12. pBubble = new TextBubble(role, msg->GetMsgContent());
  13. }
  14. pChatItem->setWidget(pBubble);
  15. auto status = msg->GetStatus();
  16. pChatItem->setStatus(status);
  17. ui->chat_data_list->appendChatItem(pChatItem);
  18. if (status == 0) {
  19. _unrsp_item_map[msg->GetUniqueId()] = pChatItem;
  20. }
  21. }
  22. else {
  23. role = ChatRole::Other;
  24. ChatItemBase* pChatItem = new ChatItemBase(role);
  25. auto friend_info = UserMgr::GetInstance()->GetFriendById(msg->GetSendUid());
  26. if (friend_info == nullptr) {
  27. return;
  28. }
  29. pChatItem->setUserName(friend_info->_name);
  30. pChatItem->setUserIcon(QPixmap(friend_info->_icon));
  31. QWidget* pBubble = nullptr;
  32. if (msg->GetMsgType() == ChatMsgType::TEXT) {
  33. pBubble = new TextBubble(role, msg->GetMsgContent());
  34. }
  35. pChatItem->setWidget(pBubble);
  36. auto status = msg->GetStatus();
  37. pChatItem->setStatus(status);
  38. ui->chat_data_list->appendChatItem(pChatItem);
  39. if (status == 0) {
  40. _unrsp_item_map[msg->GetUniqueId()] = pChatItem;
  41. }
  42. }
  43. }

其中_unrsp_item_map是聊天页面上的服务器未回复的聊天记录的,每次切换页面清掉,再重新创建加载。

这么做效率不高,后期给大家介绍Module View Delegate模式去优化聊天数据加载和管理。

这里先把持久化存储功能先实现再说。

客户端收到服务器回复

收到服务器回复后,需要组织数据发送给ChatDialog界面,将未回复的消息更新为已回复。

  1. _handlers.insert(ID_TEXT_CHAT_MSG_RSP, [this](ReqId id, int len, QByteArray data) {
  2. Q_UNUSED(len);
  3. qDebug() << "handle id is " << id << " data is " << data;
  4. // 将QByteArray转换为QJsonDocument
  5. QJsonDocument jsonDoc = QJsonDocument::fromJson(data);
  6. // 检查转换是否成功
  7. if (jsonDoc.isNull()) {
  8. qDebug() << "Failed to create QJsonDocument.";
  9. return;
  10. }
  11. QJsonObject jsonObj = jsonDoc.object();
  12. if (!jsonObj.contains("error")) {
  13. int err = ErrorCodes::ERR_JSON;
  14. qDebug() << "Chat Msg Rsp Failed, err is Json Parse Err" << err;
  15. return;
  16. }
  17. int err = jsonObj["error"].toInt();
  18. if (err != ErrorCodes::SUCCESS) {
  19. qDebug() << "Chat Msg Rsp Failed, err is " << err;
  20. return;
  21. }
  22. qDebug() << "Receive Text Chat Rsp Success " ;
  23. //收到消息后转发给页面
  24. auto thread_id = jsonObj["thread_id"].toInt();
  25. auto sender = jsonObj["fromuid"].toInt();
  26. std::vector<std::shared_ptr<TextChatData>> chat_datas;
  27. for (const QJsonValue& data : jsonObj["chat_datas"].toArray()) {
  28. auto msg_id = data["message_id"].toInt();
  29. auto unique_id = data["unique_id"].toString();
  30. auto msg_content = data["content"].toString();
  31. QString chat_time = data["chat_time"].toString();
  32. int status = data["status"].toInt();
  33. auto chat_data = std::make_shared<TextChatData>(msg_id,unique_id, thread_id, ChatFormType::PRIVATE,
  34. ChatMsgType::TEXT, msg_content, sender, status, chat_time);
  35. chat_datas.push_back(chat_data);
  36. }
  37. //发送信号通知界面
  38. emit sig_chat_msg_rsp(thread_id, chat_datas);
  39. });

将信号和槽函数连接

  1. connect(TcpMgr::GetInstance().get(), &TcpMgr::sig_chat_msg_rsp, this, &ChatDialog::slot_add_chat_msg);

会触发槽函数, 槽函数内部检测消息,将消息存储到已经回复列表中。

  1. void ChatDialog::slot_add_chat_msg(int thread_id, std::vector<std::shared_ptr<TextChatData>> msglists) {
  2. auto chat_data = UserMgr::GetInstance()->GetChatThreadByThreadId(thread_id);
  3. if (chat_data == nullptr) {
  4. return;
  5. }
  6. //将消息放入数据中管理
  7. for (auto& msg : msglists) {
  8. chat_data->MoveMsg(msg);
  9. if (_cur_chat_thread_id != thread_id) {
  10. continue;
  11. }
  12. //更新聊天界面信息
  13. ui->chat_page->UpdateChatStatus(msg->GetUniqueId(),msg->GetStatus());
  14. }
  15. }

转移逻辑, 其实就是去未回复中查找对应消息,如果有就移动到已回复列表,如果没有就直接将回复的消息插入已回复列表中

  1. void ChatThreadData::MoveMsg(std::shared_ptr<ChatDataBase> msg) {
  2. auto iter = _msg_unrsp_map.find(msg->GetUniqueId());
  3. if (iter == _msg_unrsp_map.end()) {
  4. AddMsg(msg);
  5. return;
  6. }
  7. iter.value()->SetStatus(2);
  8. AddMsg(iter.value());
  9. _msg_unrsp_map.erase(iter);
  10. }
  11. void ChatThreadData::AddMsg(std::shared_ptr<ChatDataBase> msg)
  12. {
  13. _msg_map.insert(msg->GetMsgId(), msg);
  14. _last_msg = msg->GetMsgContent();
  15. _last_msg_id = msg->GetMsgId();
  16. }

对端收到消息通知

客户端对端收到通知消息

  1. _handlers.insert(ID_NOTIFY_TEXT_CHAT_MSG_REQ, [this](ReqId id, int len, QByteArray data) {
  2. Q_UNUSED(len);
  3. qDebug() << "handle id is " << id << " data is " << data;
  4. // 将QByteArray转换为QJsonDocument
  5. QJsonDocument jsonDoc = QJsonDocument::fromJson(data);
  6. // 检查转换是否成功
  7. if (jsonDoc.isNull()) {
  8. qDebug() << "Failed to create QJsonDocument.";
  9. return;
  10. }
  11. QJsonObject jsonObj = jsonDoc.object();
  12. if (!jsonObj.contains("error")) {
  13. int err = ErrorCodes::ERR_JSON;
  14. qDebug() << "Notify Chat Msg Failed, err is Json Parse Err" << err;
  15. return;
  16. }
  17. int err = jsonObj["error"].toInt();
  18. if (err != ErrorCodes::SUCCESS) {
  19. qDebug() << "Notify Chat Msg Failed, err is " << err;
  20. return;
  21. }
  22. qDebug() << "Receive Text Chat Notify Success " ;
  23. //收到消息后转发给页面
  24. auto thread_id = jsonObj["thread_id"].toInt();
  25. auto sender = jsonObj["fromuid"].toInt();
  26. std::vector<std::shared_ptr<TextChatData>> chat_datas;
  27. for (const QJsonValue& data : jsonObj["chat_datas"].toArray()) {
  28. auto msg_id = data["message_id"].toInt();
  29. auto unique_id = data["unique_id"].toString();
  30. auto msg_content = data["content"].toString();
  31. QString chat_time = data["chat_time"].toString();
  32. int status = data["status"].toInt();
  33. auto chat_data = std::make_shared<TextChatData>(msg_id, unique_id, thread_id, ChatFormType::PRIVATE,
  34. ChatMsgType::TEXT, msg_content, sender, status, chat_time);
  35. chat_datas.push_back(chat_data);
  36. }
  37. emit sig_text_chat_msg(chat_datas);
  38. });

这个消息连接槽函数

  1. //连接对端消息通知
  2. connect(TcpMgr::GetInstance().get(), &TcpMgr::sig_text_chat_msg,
  3. this, &ChatDialog::slot_text_chat_msg);

因为被通知,可能此时不在对应的会话中

  1. void ChatDialog::slot_text_chat_msg(std::vector<std::shared_ptr<TextChatData>> msglists)
  2. {
  3. for (auto& msg : msglists) {
  4. //更新数据
  5. auto thread_id = msg->GetThreadId();
  6. auto thread_data = UserMgr::GetInstance()->GetChatThreadByThreadId(thread_id);
  7. thread_data->AddMsg(msg);
  8. if (_cur_chat_thread_id != thread_id) {
  9. continue;
  10. }
  11. ui->chat_page->AppendChatMsg(msg);
  12. }
  13. }

服务器逻辑

服务器在收到聊天消息后要将消息入库,并且判断对方是否通服,如果不是一个服务器,则用grpc通知对方所在的服务器,再通过对方服务器的Session通知对方。

如果是同一个服务器,则直接通过Session通知对方

  1. void LogicSystem::DealChatTextMsg(std::shared_ptr<CSession> session, const short& msg_id, const string& msg_data) {
  2. Json::Reader reader;
  3. Json::Value root;
  4. reader.parse(msg_data, root);
  5. auto uid = root["fromuid"].asInt();
  6. auto touid = root["touid"].asInt();
  7. const Json::Value arrays = root["text_array"];
  8. Json::Value rtvalue;
  9. rtvalue["error"] = ErrorCodes::Success;
  10. rtvalue["fromuid"] = uid;
  11. rtvalue["touid"] = touid;
  12. auto thread_id = root["thread_id"].asInt();
  13. rtvalue["thread_id"] = thread_id;
  14. std::vector<std::shared_ptr<ChatMessage>> chat_datas;
  15. auto timestamp = getCurrentTimestamp();
  16. for (const auto& txt_obj : arrays) {
  17. auto content = txt_obj["content"].asString();
  18. auto unique_id = txt_obj["unique_id"].asString();
  19. std::cout << "content is " << content << std::endl;
  20. std::cout << "unique_id is " << unique_id << std::endl;
  21. auto chat_msg = std::make_shared<ChatMessage>();
  22. chat_msg->chat_time = timestamp;
  23. chat_msg->sender_id = uid;
  24. chat_msg->recv_id = touid;
  25. chat_msg->unique_id = unique_id;
  26. chat_msg->thread_id = thread_id;
  27. chat_msg->content = content;
  28. chat_msg->status = 2;
  29. chat_datas.push_back(chat_msg);
  30. }
  31. //插入数据库
  32. MysqlMgr::GetInstance()->AddChatMsg(chat_datas);
  33. for (const auto& chat_data : chat_datas) {
  34. Json::Value chat_msg;
  35. chat_msg["message_id"] = chat_data->message_id;
  36. chat_msg["unique_id"] = chat_data->unique_id;
  37. chat_msg["content"] = chat_data->content;
  38. chat_msg["status"] = chat_data->status;
  39. chat_msg["chat_time"] = chat_data->chat_time;
  40. rtvalue["chat_datas"].append(chat_msg);
  41. }
  42. Defer defer([this, &rtvalue, session]() {
  43. std::string return_str = rtvalue.toStyledString();
  44. session->Send(return_str, ID_TEXT_CHAT_MSG_RSP);
  45. });
  46. //查询redis 查找touid对应的server ip
  47. auto to_str = std::to_string(touid);
  48. auto to_ip_key = USERIPPREFIX + to_str;
  49. std::string to_ip_value = "";
  50. bool b_ip = RedisMgr::GetInstance()->Get(to_ip_key, to_ip_value);
  51. if (!b_ip) {
  52. return;
  53. }
  54. auto& cfg = ConfigMgr::Inst();
  55. auto self_name = cfg["SelfServer"]["Name"];
  56. //直接通知对方有认证通过消息
  57. if (to_ip_value == self_name) {
  58. auto session = UserMgr::GetInstance()->GetSession(touid);
  59. if (session) {
  60. //在内存中则直接发送通知对方
  61. std::string return_str = rtvalue.toStyledString();
  62. session->Send(return_str, ID_NOTIFY_TEXT_CHAT_MSG_REQ);
  63. }
  64. return ;
  65. }
  66. TextChatMsgReq text_msg_req;
  67. text_msg_req.set_fromuid(uid);
  68. text_msg_req.set_touid(touid);
  69. text_msg_req.set_thread_id(thread_id);
  70. for (const auto& chat_data : chat_datas) {
  71. auto *text_msg = text_msg_req.add_textmsgs();
  72. text_msg->set_unique_id(chat_data->unique_id);
  73. text_msg->set_msgcontent(chat_data->content);
  74. text_msg->set_msg_id(chat_data->message_id);
  75. text_msg->set_chat_time(chat_data->chat_time);
  76. }
  77. //发送通知 todo...
  78. ChatGrpcClient::GetInstance()->NotifyTextChatMsg(to_ip_value, text_msg_req, rtvalue);
  79. }

数据库处理

  1. bool MysqlMgr::AddChatMsg(std::vector<std::shared_ptr<ChatMessage>>& chat_datas) {
  2. return _dao.AddChatMsg(chat_datas);
  3. }

Dao层做了详细的数据库操作

  1. bool MysqlDao::AddChatMsg(std::vector<std::shared_ptr<ChatMessage>>& chat_datas) {
  2. auto con = pool_->getConnection();
  3. if (!con) {
  4. return false;
  5. }
  6. Defer defer([this, &con]() {
  7. pool_->returnConnection(std::move(con));
  8. });
  9. auto& conn = con->_con;
  10. try {
  11. //关闭自动提交,以手动管理事务
  12. conn->setAutoCommit(false);
  13. auto pstmt = std::unique_ptr<sql::PreparedStatement>(
  14. conn->prepareStatement(
  15. "INSERT INTO chat_message "
  16. "(thread_id, sender_id, recv_id, content, created_at, updated_at, status) "
  17. "VALUES (?, ?, ?, ?, ?, ?, ?)"
  18. )
  19. );
  20. for (auto& msg : chat_datas) {
  21. // 普通字段
  22. pstmt->setUInt64(1, msg->thread_id);
  23. pstmt->setUInt64(2, msg->sender_id);
  24. pstmt->setUInt64(3, msg->recv_id);
  25. pstmt->setString(4, msg->content);
  26. pstmt->setString(5, msg->chat_time); // created_at
  27. pstmt->setString(6, msg->chat_time); // updated_at
  28. pstmt->setInt(7, msg->status);
  29. pstmt->executeUpdate();
  30. // 2. 取 LAST_INSERT_ID()
  31. std::unique_ptr<sql::Statement> keyStmt(
  32. conn->createStatement()
  33. );
  34. std::unique_ptr<sql::ResultSet> rs(
  35. keyStmt->executeQuery("SELECT LAST_INSERT_ID()")
  36. );
  37. if (rs->next()) {
  38. msg->message_id = rs->getUInt64(1);
  39. }
  40. else {
  41. continue;
  42. }
  43. }
  44. conn->commit();
  45. return true;
  46. }
  47. catch (sql::SQLException& e) {
  48. std::cerr << "SQLException: " << e.what() << std::endl;
  49. conn->rollback();
  50. return false;
  51. }
  52. return true;
  53. }

grpc协议完善

  1. message TextChatMsgReq {
  2. int32 fromuid = 1;
  3. int32 touid = 2;
  4. int32 thread_id = 3;
  5. repeated TextChatData textmsgs = 4;
  6. }
  7. message TextChatData{
  8. string unique_id = 1;
  9. int32 msg_id = 2;
  10. string msgcontent = 3;
  11. string chat_time = 4;
  12. }
  13. message TextChatMsgRsp {
  14. int32 error = 1;
  15. int32 fromuid = 2;
  16. int32 touid = 3;
  17. int32 thread_id = 4;
  18. repeated TextChatData textmsgs = 5;
  19. }

对端服务器处理

如果客户不在本服,则通知对端服务处理

  1. Status ChatServiceImpl::NotifyTextChatMsg(::grpc::ServerContext* context,
  2. const TextChatMsgReq* request, TextChatMsgRsp* reply) {
  3. //查找用户是否在本服务器
  4. auto touid = request->touid();
  5. auto session = UserMgr::GetInstance()->GetSession(touid);
  6. reply->set_error(ErrorCodes::Success);
  7. //用户不在内存中则直接返回
  8. if (session == nullptr) {
  9. return Status::OK;
  10. }
  11. //在内存中则直接发送通知对方
  12. Json::Value rtvalue;
  13. rtvalue["error"] = ErrorCodes::Success;
  14. rtvalue["fromuid"] = request->fromuid();
  15. rtvalue["touid"] = request->touid();
  16. rtvalue["thread_id"] = request->thread_id();
  17. //将聊天数据组织为数组
  18. Json::Value text_array;
  19. for (auto& msg : request->textmsgs()) {
  20. Json::Value element;
  21. element["content"] = msg.msgcontent();
  22. element["unique_id"] = msg.unique_id();
  23. element["message_id"] = msg.msg_id();
  24. element["chat_time"] = msg.chat_time();
  25. text_array.append(element);
  26. }
  27. rtvalue["chat_datas"] = text_array;
  28. std::string return_str = rtvalue.toStyledString();
  29. session->Send(return_str, ID_NOTIFY_TEXT_CHAT_MSG_REQ);
  30. return Status::OK;
  31. }

验证效果

image-20250725233631386

待完善部分

目前切换页面会将之前的记录删掉,这样每次重新加载会影响效率。

考虑以后采用多页缓存机制。

以后用Model View Delegate改造数据存储模式。

使用 Model/View 架构(QListView + QAbstractListModel + Delegate)

  • 思路:不要手动往布局里插 widget,而是把 “一条聊天消息” 抽象成一个数据结构,存到自定义的 QAbstractListModel
  • 在右侧放一个 QListView,并为它写一个 QStyledItemDelegate,统一负责绘制消息气泡、头像、时间等。
  • 优点:Qt 的视图会自动做 行缓存(view recycling)、懒加载 等优化,数据量大也能保持流畅。
  • 切换用户:只需 model->setMessages(userMessages)(内部发 beginResetModel()/endResetModel()),视图自动刷新。

方案一:在同一个 Model 里 reset 数据

  1. 维护一个消息列表

    1. class ChatModel : public QAbstractListModel {
    2. QVector<Message> m_msgs;
    3. public:
    4. // 必要的 override:rowCount(), data(), roleNames()...
    5. void setMessages(const QVector<Message>& msgs) {
    6. beginResetModel();
    7. m_msgs = msgs;
    8. endResetModel();
    9. }
    10. };
  2. 切换用户时

    1. // 假设你有一个 ChatModel* model 和 QListView* listView
    2. // listView->setModel(model); // 已经在初始化时做过一次
    3. void onUserClicked(const User& u) {
    4. QVector<Message> msgs = loadMessagesFromDb(u.id);
    5. model->setMessages(msgs);
    6. // 可选:滚到最底部
    7. listView->scrollToBottom();
    8. }
  3. 优点

    • 结构简单,一处 model,view 自动刷新。
    • 不需要销毁或创建 widget,性能最佳。

方案二:每个用户一个 Model,切换指针

如果你希望把每个用户的数据和 model 分开管理,也可以为每个用户维护独立的 ChatModel

  1. QMap<UserId, ChatModel*> modelPool;
  2. void onUserClicked(const User& u) {
  3. if (!modelPool.contains(u.id)) {
  4. // 第一次点击,创建并加载
  5. ChatModel* m = new ChatModel(this);
  6. m->setMessages(loadMessagesFromDb(u.id));
  7. modelPool[u.id] = m;
  8. }
  9. listView->setModel(modelPool[u.id]);
  10. listView->scrollToBottom();
  11. }
  • 优点:切换立刻就有缓存好的数据,不用每次都从数据库/网络加载。
  • 缺点:如果用户特别多,内存开销会比较大。

更细粒度的更新

如果你不想一次 beginResetModel()/endResetModel() 重刷全表,还可以在 model 里实现增删改接口:

  1. void ChatModel::appendMessage(const Message& m) {
  2. beginInsertRows(QModelIndex(), m_msgs.size(), m_msgs.size());
  3. m_msgs.append(m);
  4. endInsertRows();
  5. }
  6. void ChatModel::clearMessages() {
  7. beginRemoveRows(QModelIndex(), 0, m_msgs.size()-1);
  8. m_msgs.clear();
  9. endRemoveRows();
  10. }
  • 切换用户时先 clearMessages(),然后循环 appendMessage()
  • 这样 view 能做更细粒度的动画或局部刷新。

总结

  • 最简单:一个 model,内部维护 QVector<Message>,切换时调用 setMessages()
  • 缓存多用户:给每个用户分配一个 model,切换时调用 listView->setModel(...)
  • 增量更新:用 beginInsertRows/beginRemoveRows 实现局部刷新。

选哪种方案,取决于你的聊天数据量和内存/加载开销:

  • 少量用户、消息量大 → 方案一(reset)+ 分页加载
  • 用户量多、切换频繁 → 方案二(model 池)
  • 想要炫酷的动画或更精细性能 → 增量更新。
热门评论

热门文章

  1. 聊天项目(28) 分布式服务通知好友申请

    喜欢(507) 浏览(6245)
  2. 使用hexo搭建个人博客

    喜欢(533) 浏览(12247)
  3. Linux环境搭建和编码

    喜欢(594) 浏览(14001)
  4. Qt环境搭建

    喜欢(517) 浏览(25531)
  5. vscode搭建windows C++开发环境

    喜欢(596) 浏览(86192)

最新评论

  1. visual studio配置boost库 一giao里我离giaogiao:请问是修改成这样吗:.\b2.exe toolset=MinGW
  2. 聊天项目(13) 重置密码功能 Doraemon:万一一个用户多个邮箱呢 有可能的
  3. 堆排序 secondtonone1:堆排序非常实用,定时器就是这个原理制作的。
  4. 再谈单例模式 secondtonone1:是的,C++11以后返回局部static变量对象能保证线程安全了。
  5. 解决博客回复区被脚本注入的问题 secondtonone1:走到现在我忽然明白一个道理,无论工作也好生活也罢,最重要的是开心,即使一份安稳的工作不能给我带来事业上的积累也要合理的舍弃,所以我还是想去做喜欢的方向。
  6. 无锁并发队列 TenThousandOne:_head  和 _tail  替换为原子变量。那里pop的逻辑,val = _data[h] 可以移到循环外面吗
  7. 聊天项目(7) visualstudio配置grpc diablorrr:cmake文件得改一下 find_package(Boost REQUIRED COMPONENTS system filesystem),要加上filesystem。在target_link_libraries中也同样加上
  8. 答疑汇总(thread,async源码分析) Yagus:如果引用计数为0,则会执行 future 的析构进而等待任务执行完成,那么看到的输出将是 这边应该不对吧,std::future析构只在这三种情况都满足的时候才回block: 1.共享状态是std::async 创造的(类型是_Task_async_state) 2.共享状态没有ready 3.这个future是共享状态的最后一个引用 这边共享状态类型是“_Package_state”,引用计数即使为0也不应该block啊
  9. Qt 对话框 Spade2077:QDialog w(); //这里是不是不需要带括号
  10. string类 WangQi888888:确实错了,应该是!isspace(sind[index]). 否则不进入循环,还是原来的字符串“some string”
  11. 构造函数 secondtonone1:构造函数是类的基础知识,要着重掌握
  12. Qt MVC结构之QItemDelegate介绍 胡歌-此生不换:gpt, google
  13. 面试题汇总(一) secondtonone1:看到网络上经常提问的go的问题,做了一下汇总,结合自己的经验给出的答案,如有纰漏,望指正批评。
  14. 基于锁实现线程安全队列和栈容器 secondtonone1:我是博主,你认真学习的样子的很可爱,哈哈,我画的是链表由空变成1个的情况。其余情况和你思考的类似,只不过我用了一个无效节点表示tail的指向,最初head和tail指向的都是这个节点。
  15. boost::asio之socket的创建和连接 项空月:发现一些错别字 :每隔vector存储  是不是是每个. asio::mutable_buffers_1 o或者    是不是多打了个o
  16. C++ 线程安全的单例模式演变 183******95:单例模式的析构函数何时运行呢? 实际测试里:无论单例模式的析构函数为私有或公有,使用智能指针和辅助回收类,两种方法都无法在main()结束前调用单例的析构函数。
  17. protobuf配置和使用 熊二:你可以把dll放到系统目录,也可以配置环境变量,还能把dll丢到lib里
  18. 聊天项目(15) 客户端实现TCP管理者 lkx:已经在&QTcpSocket::readyRead 回调函数中做了处理了的。
  19. 利用C11模拟伪闭包实现连接的安全回收 搁浅:看chatgpt说 直接传递 shared_from_this() 更安全 提问: socket_.async_read_some(boost::asio::buffer(data_, BUFFSIZE), // 接收客户端发生来的数据 std::bind(&Session::handle_read, this, std::placeholders::_1, std::placeholders::_2, shared_from_this())); socket_.async_read_some(boost::asio::buffer(data_, BUFFSIZE), std::bind(&Session::handle_read, shared_from_this(), std::placeholders::_1, std::placeholders::_2)); 这两种方式有区别吗? 回答 : 第一种方式:this 是裸指针,可能会导致生命周期问题,虽然 shared_from_this() 提供了一定的保护,但 this 依然存在风险。 第二种方式:完全使用 shared_ptr 来管理生命周期,更加安全。 通常,第二种方式更推荐使用,因为它可以确保在异步操作完成之前,Session 对象的生命周期得到完全管理,避免使用裸指针的潜在风险。
  20. 类和对象 陈宇航:支持!!!!
  21. 聊天项目(9) redis服务搭建 pro_lin:redis线程池的析构函数,除了pop出队列,还要free掉redis连接把
  22. slice介绍和使用 恋恋风辰:切片作为引用类型极大的提高了数据传递的效率和性能,但也要注意切片的浅拷贝隐患,算是一把双刃剑,这世间的常态就是在两极之间寻求一种稳定。
  23. C++ 并发三剑客future, promise和async Yunfei:大佬您好,如果这个线程池中加入的异步任务的形参如果有右值引用,这个commit中的返回类型推导和bind绑定就会出现问题,请问实际工程中,是不是不会用到这种任务,如果用到了,应该怎么解决?
  24. 处理网络粘包问题 zyouth: //消息的长度小于头部规定的长度,说明数据未收全,则先将部分消息放到接收节点里 if (bytes_transferred < data_len) { memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, bytes_transferred); _recv_msg_node->_cur_len += bytes_transferred; ::memset(_data, 0, MAX_LENGTH); _socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH), std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_self)); //头部处理完成 _b_head_parse = true; return; } 把_b_head_parse = true;放在_socket.async_read_some前面是不是更好
  25. 创建项目和编译 secondtonone1:谢谢支持
  26. 利用栅栏实现同步 Dzher:作者你好!我觉得 std::thread a(write_x); std::thread b(write_y); std::thread c(read_x_then_y); std::thread d(read_y_then_x); 这个例子中的assert fail并不会发生,原子变量设定了非relaxed内存序后一个线程的原子变量被写入,那么之后的读取一定会被同步的,c和d线程中只可能同时发生一个z++未执行的情况,最终z不是1就是2了,我测试了很多次都没有assert,请问我这个观点有什么错误,谢谢!
  27. interface应用 secondtonone1:interface是万能类型,但是使用时要转换为实际类型来使用。interface丰富了go的多态特性,也降低了传统面向对象语言的耦合性。
  28. 网络编程学习方法和图书推荐 Corleone:啥程度可以找工作

个人公众号

个人微信