兰 亭 墨 苑
期货 · 量化 · AI · 终身学习
首页
归档
编辑文章
标题 *
URL 别名 *
内容 *
(支持 Markdown 格式)
逐句解释 `src/chatroom_do.js` 文件中的代码。这个文件定义了一个 Cloudflare Durable Object (DO),用于实现持久化的聊天室功能,包括消息存储、用户状态管理和 WebSocket 连接处理。 --- ```javascript // src/chatroom_do.js (Corrected and Final Version) ``` * **`// src/chatroom_do.js (Corrected and Final Version)`**: 注释,表明这是聊天室 Durable Object 的源文件,并且是经过修正的最终版本。 ```javascript // 定义应用层协议的消息类型 const MSG_TYPE_CHAT = 'chat'; const MSG_TYPE_DELETE = 'delete'; const MSG_TYPE_RENAME = 'rename'; const MSG_TYPE_SYSTEM_STATE = 'system_state'; const MSG_TYPE_HISTORY = 'history'; const MSG_TYPE_OFFER = 'offer'; const MSG_TYPE_ANSWER = 'answer'; const MSG_TYPE_CANDIDATE = 'candidate'; const MSG_TYPE_CALL_END = 'call_end'; ``` * **`// 定义应用层协议的消息类型`**: 注释,说明以下常量定义了客户端和服务器之间通过 WebSocket 传输的不同消息类型。 * **`const MSG_TYPE_CHAT = 'chat';`**: 表示普通聊天消息。 * **`const MSG_TYPE_DELETE = 'delete';`**: 表示删除消息的请求。 * **`const MSG_TYPE_RENAME = 'rename';`**: 表示用户改名的请求。 * **`const MSG_TYPE_SYSTEM_STATE = 'system_state';`**: 表示系统状态更新(例如在线用户列表)。 * **`const MSG_TYPE_HISTORY = 'history';`**: 表示请求或发送历史消息。 * **`const MSG_TYPE_OFFER = 'offer';`**: WebRTC 信令消息,表示发起方发送的 SDP Offer。 * **`const MSG_TYPE_ANSWER = 'answer';`**: WebRTC 信令消息,表示接收方发送的 SDP Answer。 * **`const MSG_TYPE_CANDIDATE = 'candidate';`**: WebRTC 信令消息,表示 ICE Candidate(网络连接信息)。 * **`const MSG_TYPE_CALL_END = 'call_end';`**: WebRTC 信令消息,表示通话结束。 ```javascript export class HibernatingChatRoom { constructor(state, env) { this.state = state; this.env = env; // 在实例首次创建时,将内存状态初始化为 null。 // 这是 `loadState` 函数判断是否需要从持久化存储加载数据的关键。 this.messages = null; this.userStats = null; } ``` * **`export class HibernatingChatRoom { ... }`**: 定义并导出了 `HibernatingChatRoom` 类。这是 Cloudflare Durable Object 的核心类,每个聊天室实例都是这个类的一个对象。 * **`constructor(state, env) { ... }`**: 类的构造函数,在 Durable Object 实例被 Cloudflare 平台创建时调用。 * **`this.state = state;`**: `state` 对象是 Durable Object 运行时提供的,它包含了 `state.storage`(用于持久化存储)和 `state.getWebSockets()` 等方法,用于管理 WebSocket 连接。 * **`this.env = env;`**: `env` 对象包含了在 Cloudflare Worker 配置中绑定的环境变量和资源(例如 R2 存储桶)。 * **`this.messages = null;`**: 初始化 `messages` 数组为 `null`。`messages` 将用于存储聊天消息历史。将其初始化为 `null` 是一个重要的标志,表示当前内存中还没有加载持久化数据。 * **`this.userStats = null;`**: 初始化 `userStats` 为 `null`。`userStats` 将用于存储用户的统计数据(如消息数量、在线时长)。同样,`null` 标志着数据尚未从持久化存储加载。 ```javascript /** * 从持久化存储加载状态到内存中。 * 这个函数是幂等的:在 Durable Object 实例的生命周期内,它只会真正执行一次加载操作。 * 后续的调用会因为 this.messages 不再是 null 而直接返回。 */ // 新的、更稳健的 loadState async loadState() { if (this.messages === null) { // 只在实例生命周期内从存储加载一次 console.log("State not in memory. Loading from storage..."); const data = await this.state.storage.get(["messages", "userStats"]); this.messages = data.get("messages") || []; const storedUserStats = data.get("userStats"); console.log('Raw storedUserStats from storage:', storedUserStats); // Add this line // 更稳健地处理:确保 storedUserStats 是一个对象才进行转换 if (storedUserStats && typeof storedUserStats === 'object') { this.userStats = new Map(Object.entries(storedUserStats)); } else { this.userStats = new Map(); // 如果数据损坏或不存在,则重新开始 } console.log(`State loaded. Messages: ${this.messages.length}, Users: ${this.userStats.size}`); console.log('Loaded userStats:', JSON.stringify(Object.fromEntries(this.userStats))); // Add this line } } ``` * **`async loadState() { ... }`**: 异步函数,负责从 Durable Object 的持久化存储中加载聊天室的状态。 * **`if (this.messages === null) { ... }`**: **核心逻辑**:这个条件确保 `loadState` 在 Durable Object 实例的整个生命周期中只执行一次实际的加载操作。一旦 `this.messages` 被赋值(不再是 `null`),后续调用将直接跳过加载逻辑,提高效率。 * **`console.log("State not in memory. Loading from storage...");`**: 打印日志,表示正在从存储加载数据。 * **`const data = await this.state.storage.get(["messages", "userStats"]);`**: 使用 `this.state.storage.get()` 方法从持久化存储中获取名为 "messages" 和 "userStats" 的数据。 * **`this.messages = data.get("messages") || [];`**: 从获取到的数据中取出 "messages",如果不存在则默认为一个空数组。 * **`const storedUserStats = data.get("userStats");`**: 从获取到的数据中取出 "userStats"。 * **`console.log('Raw storedUserStats from storage:', storedUserStats);`**: 调试日志,显示从存储中读取的原始 `userStats` 数据。 * **`if (storedUserStats && typeof storedUserStats === 'object') { this.userStats = new Map(Object.entries(storedUserStats)); } else { this.userStats = new Map(); }`**: **关键修复**:Durable Object 的 `state.storage` 默认会将 `Map` 对象序列化为普通 JavaScript 对象。因此,在加载时,需要将存储的普通对象(如果存在且是对象类型)转换回 `Map` 对象,以便在内存中正确使用。如果数据损坏或不存在,则初始化为空 `Map`。 * **`console.log(`State loaded. Messages: ${this.messages.length}, Users: ${this.userStats.size}`);`**: 打印加载状态的摘要。 * **`console.log('Loaded userStats:', JSON.stringify(Object.fromEntries(this.userStats)));`**: 调试日志,显示加载并转换后的 `userStats` Map 的内容。 ```javascript /** * 将当前内存中的状态写入持久化存储。 * 这是一个“直写(write-through)”策略,确保每次状态变更都持久化,防止数据丢失。 */ // 这是最终修复后的版本 async saveState() { if (this.messages === null) { console.warn("Attempted to save state before loading. Aborting save."); return; } // ***核心改动在这里*** // 在保存前,明确地将 Map 转换为普通的、JSON友好的对象。 // 这保证了存储操作的稳定性和可预测性。 const serializableUserStats = Object.fromEntries(this.userStats); console.log("Saving state to storage..."); console.log('Saving userStats:', JSON.stringify(serializableUserStats)); // Add this line try { await this.state.storage.put({ "messages": this.messages, "userStats": serializableUserStats // 保存转换后的普通对象,而不是 Map }); console.log("State saved successfully."); } catch(e) { console.error("Failed to save state:", e); } } ``` * **`async saveState() { ... }`**: 异步函数,负责将当前内存中的聊天室状态保存到持久化存储。 * **`if (this.messages === null) { ... }`**: 检查是否在状态加载之前尝试保存,如果是则发出警告并中止保存,因为此时内存中的数据可能不完整或不正确。 * **`const serializableUserStats = Object.fromEntries(this.userStats);`**: **核心改动**:在保存 `userStats` 之前,将其从 `Map` 对象转换回普通的 JavaScript 对象。这是因为 `state.storage.put()` 更适合存储普通的 JSON 可序列化对象,而不是 `Map` 实例。`Object.fromEntries()` 是 `Object.entries()` 的逆操作。 * **`console.log("Saving state to storage...");`**: 打印日志,表示正在保存状态。 * **`console.log('Saving userStats:', JSON.stringify(serializableUserStats));`**: 调试日志,显示即将保存的 `userStats` 对象。 * **`try { await this.state.storage.put({ ... }); ... } catch(e) { ... }`**: 使用 `this.state.storage.put()` 方法将 `messages` 数组和转换后的 `userStats` 对象保存到持久化存储。使用 `try...catch` 块捕获并处理保存过程中可能发生的错误。 ```javascript // Main fetch handler - 这是所有外部请求(包括WebSocket升级)的入口 async fetch(request) { // **核心修复**: 在处理任何请求之前,首先确保状态已从存储中加载。 // 这保证了即使DO刚刚从休眠中唤醒,它也能拥有正确的历史数据。 await this.loadState(); const url = new URL(request.url); // --- 新增:处理 /history-messages 路径 --- if (url.pathname === '/history-messages') { return new Response(JSON.stringify(this.messages), { headers: { 'Content-Type': 'application/json', 'Access-Control-Allow-Origin': '*', // 或者更严格地设置为 'https://chats.want.biz' 'Access-Control-Allow-Methods': 'GET,HEAD,POST,OPTIONS', 'Access-Control-Max-Age': '86400', }, }); } const upgradeHeader = request.headers.get("Upgrade"); if (upgradeHeader !== "websocket") { return new Response("Expected Upgrade: websocket", { status: 426 }); } const username = url.searchParams.get("username") || "Anonymous"; const { 0: client, 1: server } = new WebSocketPair(); this.state.acceptWebSocket(server, [username]); return new Response(null, { status: 101, webSocket: client }); } ``` * **`async fetch(request) { ... }`**: 这是 Durable Object 的主要入口点,处理所有发送到该 DO 实例的 HTTP 请求(包括 WebSocket 升级请求)。 * **`await this.loadState();`**: **核心修复**:在处理任何请求之前,强制加载 Durable Object 的状态。这确保了无论 DO 实例是首次创建还是从休眠中唤醒,它都能在处理请求时拥有最新的持久化数据。 * **`const url = new URL(request.url);`**: 解析请求的 URL。 * **`if (url.pathname === '/history-messages') { ... }`**: 处理 `/history-messages` 路径的 GET 请求。这是为了让外部 Worker 或客户端可以通过 HTTP 请求获取聊天历史。 * **`return new Response(JSON.stringify(this.messages), { headers: { ... } });`**: 返回包含所有聊天消息的 JSON 响应,并设置适当的 CORS 头部。 * **`const upgradeHeader = request.headers.get("Upgrade");`**: 获取请求头中的 `Upgrade` 字段。 * **`if (upgradeHeader !== "websocket") { ... }`**: 检查 `Upgrade` 头部是否为 "websocket"。如果不是,则返回 426 状态码(Upgrade Required),表示期望 WebSocket 连接。 * **`const username = url.searchParams.get("username") || "Anonymous";`**: 从 URL 查询参数中获取用户名,如果未提供则默认为 "Anonymous"。 * **`const { 0: client, 1: server } = new WebSocketPair();`**: 创建一个 WebSocketPair 对象。`client` 是一个 `WebSocket` 对象,可以返回给客户端;`server` 是一个 `WebSocket` 对象,用于在 Durable Object 内部与客户端通信。 * **`this.state.acceptWebSocket(server, [username]);`**: 接受 WebSocket 连接。`server` 端 WebSocket 被 Durable Object 接管,并将其与提供的标签(这里是用户名)关联起来。 * **`return new Response(null, { status: 101, webSocket: client });`**: 返回一个 HTTP 101 (Switching Protocols) 响应,将 `client` 端 WebSocket 返回给客户端,完成 WebSocket 升级握手。 ```javascript // --- WebSocket Handlers --- // 新的、带安全检查的 webSocketOpen async webSocketOpen(ws) { await this.loadState(); // 确保状态已加载 const username = this.state.getTags(ws)[0]; console.log(`WebSocket opened for: ${username}`); console.log(`Username from tags: ${username}`); // Added log // *** 核心增加部分:安全检查 *** // 这是一个保险措施,确保 this.userStats 绝对是 Map 类型 if (!(this.userStats instanceof Map)) { console.error("CRITICAL: this.userStats is not a Map after loading! Re-initializing."); this.userStats = new Map(); } let stats = this.userStats.get(username) || { messageCount: 0, totalOnlineDuration: 0 }; console.log(`User ${username} stats before update:`, JSON.stringify(stats)); stats.lastSeen = Date.now(); stats.onlineSessions = (stats.onlineSessions || 0) + 1; if (stats.onlineSessions === 1) { stats.currentSessionStart = Date.now(); } this.userStats.set(username, stats); console.log(`User ${username} stats after update:`, JSON.stringify(this.userStats.get(username))); console.log(`userStats map size after webSocketOpen: ${this.userStats.size}`); // Added log this.broadcastSystemState(); await this.saveState(); // Ensure state is saved after userStats update } ``` * **`async webSocketOpen(ws) { ... }`**: 当一个新的 WebSocket 连接成功建立时,Durable Object 会自动调用此方法。 * **`await this.loadState();`**: 确保在处理 WebSocket 连接打开事件时,Durable Object 的状态是最新的。 * **`const username = this.state.getTags(ws)[0];`**: 从 WebSocket 关联的标签中获取用户名。 * **`console.log(...)`**: 调试日志。 * **`if (!(this.userStats instanceof Map)) { ... }`**: **核心增加部分**:一个防御性编程检查,确保 `this.userStats` 确实是一个 `Map` 实例。如果由于某种原因(例如存储数据损坏或反序列化问题)它不是 `Map`,则重新初始化为一个新的 `Map`,防止后续操作出错。 * **`let stats = this.userStats.get(username) || { messageCount: 0, totalOnlineDuration: 0 };`**: 获取该用户的统计数据,如果不存在则初始化一个新对象。 * **`stats.lastSeen = Date.now();`**: 更新用户最后在线时间。 * **`stats.onlineSessions = (stats.onlineSessions || 0) + 1;`**: 增加在线会话计数。 * **`if (stats.onlineSessions === 1) { stats.currentSessionStart = Date.now(); }`**: 如果这是用户当前唯一的在线会话,记录会话开始时间。 * **`this.userStats.set(username, stats);`**: 更新 `userStats` Map 中该用户的统计数据。 * **`this.broadcastSystemState();`**: 广播系统状态(例如更新在线用户列表)给所有连接的客户端。 * **`await this.saveState();`**: 在用户统计数据更新后,将状态保存到持久化存储。 ```javascript async webSocketMessage(ws, message) { await this.loadState(); const username = this.state.getTags(ws)[0]; const user = { ws, username }; try { const data = JSON.parse(message); switch (data.type) { case MSG_TYPE_CHAT: await this.handleChatMessage(user, data.payload); break; case MSG_TYPE_DELETE: await this.handleDeleteMessage(data.payload); break; case MSG_TYPE_RENAME: await this.handleRename(user, data.payload); break; // WebRTC 信号转发逻辑保持不变 case MSG_TYPE_OFFER: this.handleOffer(user, data.payload); break; case MSG_TYPE_ANSWER: this.handleAnswer(user, data.payload); break; case MSG_TYPE_CANDIDATE: this.handleCandidate(user, data.payload); break; case MSG_TYPE_CALL_END: this.handleCallEnd(user, data.payload); break; default: console.warn('Unknown message type:', data.type); } } catch (err) { console.error('Failed to handle message:', err); this.sendMessage(ws, { type: 'error', payload: { message: '消息处理失败' } }); } } ``` * **`async webSocketMessage(ws, message) { ... }`**: 当 Durable Object 接收到来自客户端的 WebSocket 消息时,自动调用此方法。 * **`await this.loadState();`**: 确保状态已加载。 * **`const username = this.state.getTags(ws)[0];`**: 获取发送消息的用户名。 * **`const user = { ws, username };`**: 创建一个包含 WebSocket 和用户名的对象,方便后续处理。 * **`try { ... } catch (err) { ... }`**: 捕获消息处理过程中的错误。 * **`const data = JSON.parse(message);`**: 将接收到的 JSON 字符串消息解析为 JavaScript 对象。 * **`switch (data.type) { ... }`**: 根据消息的 `type` 字段,将消息分发到不同的处理函数。 * `MSG_TYPE_CHAT`: 调用 `handleChatMessage` 处理聊天消息。 * `MSG_TYPE_DELETE`: 调用 `handleDeleteMessage` 处理删除消息请求。 * `MSG_TYPE_RENAME`: 调用 `handleRename` 处理改名请求。 * WebRTC 信令消息 (`OFFER`, `ANSWER`, `CANDIDATE`, `CALL_END`): 调用相应的 `handle...` 函数进行转发。 * `default`: 如果消息类型未知,则打印警告。 * **`console.error('Failed to handle message:', err);`**: 打印错误信息。 * **`this.sendMessage(ws, { type: 'error', payload: { message: '消息处理失败' } });`**: 向发送方客户端发送一个错误消息。 ```javascript async webSocketClose(ws, code, reason, wasClean) { await this.loadState(); const username = this.state.getTags(ws)[0]; console.log(`WebSocket closed for: ${username}`); let stats = this.userStats.get(username); if (stats) { console.log(`User ${username} stats before close update:`, JSON.stringify(stats)); stats.lastSeen = Date.now(); stats.onlineSessions = (stats.onlineSessions || 1) - 1; if (stats.onlineSessions === 0 && stats.currentSessionStart) { stats.totalOnlineDuration += (Date.now() - stats.currentSessionStart); delete stats.currentSessionStart; } this.userStats.set(username, stats); console.log(`User ${username} stats after close update:`, JSON.stringify(this.userStats.get(username))); console.log(`userStats map size after webSocketClose: ${this.userStats.size}`); // Added log } this.broadcastSystemState(); await this.saveState(); } ``` * **`async webSocketClose(ws, code, reason, wasClean) { ... }`**: 当 WebSocket 连接关闭时,Durable Object 会自动调用此方法。 * **`await this.loadState();`**: 确保状态已加载。 * **`const username = this.state.getTags(ws)[0];`**: 获取关闭连接的用户名。 * **`let stats = this.userStats.get(username);`**: 获取该用户的统计数据。 * **`if (stats) { ... }`**: 如果找到了用户的统计数据: * **`stats.lastSeen = Date.now();`**: 更新最后在线时间。 * **`stats.onlineSessions = (stats.onlineSessions || 1) - 1;`**: 减少在线会话计数。 * **`if (stats.onlineSessions === 0 && stats.currentSessionStart) { ... }`**: 如果所有会话都已关闭,计算并累加本次会话的总在线时长,并清除 `currentSessionStart`。 * **`this.userStats.set(username, stats);`**: 更新 `userStats` Map。 * **`this.broadcastSystemState();`**: 广播系统状态,通知其他客户端该用户可能已离线。 * **`await this.saveState();`**: 将更新后的用户统计数据保存到持久化存储。 ```javascript async webSocketError(ws, error) { // loadState 不是必须的,因为 webSocketClose 会被调用 const username = this.state.getTags(ws)[0]; console.error(`WebSocket error for user ${username}:`, error); } ``` * **`async webSocketError(ws, error) { ... }`**: 当 WebSocket 连接发生错误时,Durable Object 会自动调用此方法。 * **`const username = this.state.getTags(ws)[0];`**: 获取发生错误的用户。 * **`console.error(...)`**: 打印错误信息。这里不需要 `loadState`,因为通常错误发生后会紧接着调用 `webSocketClose`,而 `webSocketClose` 会处理状态加载和保存。 ```javascript // --- Core Logic (所有核心逻辑函数也需要先加载状态) --- async handleChatMessage(user, payload) { await this.loadState(); try { let message; if (payload.type === 'image') { message = await this.#processImageMessage(user, payload); } else if (payload.type === 'audio') { message = await this.#processAudioMessage(user, payload); } else { message = this.#processTextMessage(user, payload); } this.messages.push(message); if (this.messages.length > 100) this.messages.shift(); let stats = this.userStats.get(user.username); if (stats) { stats.messageCount = (stats.messageCount || 0) + 1; this.userStats.set(user.username, stats); console.log(`User ${user.username} messageCount updated to: ${stats.messageCount}`); console.log(`userStats map size after handleChatMessage: ${this.userStats.size}`); // Added log } this.broadcast({ type: MSG_TYPE_CHAT, payload: message }); this.broadcastSystemState(); // Add this line to update active users after a new message await this.saveState(); // Ensure state is saved after userStats update } catch (error) { console.error('处理聊天消息失败:', error); this.sendMessage(user.ws, { type: 'error', payload: { message: `消息发送失败: ${error.message}` } }); } } ``` * **`async handleChatMessage(user, payload) { ... }`**: 处理聊天消息的核心逻辑。 * **`await this.loadState();`**: 确保状态已加载。 * **`try { ... } catch (error) { ... }`**: 捕获消息处理过程中的错误。 * **`if (payload.type === 'image') { ... } else if (payload.type === 'audio') { ... } else { ... }`**: 根据消息类型(文本、图片、音频)调用不同的私有处理方法 (`#process...Message`) 来构建消息对象。 * **`this.messages.push(message);`**: 将新消息添加到 `messages` 数组。 * **`if (this.messages.length > 100) this.messages.shift();`**: 限制消息历史的长度为 100 条,超出则移除最旧的消息。 * **`let stats = this.userStats.get(user.username); if (stats) { ... }`**: 更新发送消息用户的消息计数。 * **`this.broadcast({ type: MSG_TYPE_CHAT, payload: message });`**: 将新消息广播给所有连接的客户端。 * **`this.broadcastSystemState();`**: 广播系统状态,确保在线用户列表等信息及时更新。 * **`await this.saveState();`**: 将更新后的消息历史和用户统计数据保存到持久化存储。 * **`this.sendMessage(user.ws, { type: 'error', payload: { message: `消息发送失败: ${error.message}` } });`**: 如果处理失败,向发送方发送错误消息。 ```javascript // ... (所有 #process... 和 handle... 方法保持不变,因为它们都被 `await this.loadState()` 保护了) ... #processTextMessage(user, payload) { return { id: crypto.randomUUID(), username: user.username, timestamp: Date.now(), text: payload.text, }; } async #processImageMessage(user, payload) { const imageUrl = await this.uploadImageToR2(payload.image, payload.filename); return { id: crypto.randomUUID(), username: user.username, timestamp: Date.now(), type: 'image', imageUrl, filename: payload.filename, size: payload.size, caption: payload.caption || '' }; } async #processAudioMessage(user, payload) { const audioUrl = await this.uploadAudioToR2(payload.audio, payload.filename, payload.mimeType); return { id: crypto.randomUUID(), username: user.username, timestamp: Date.now(), type: 'audio', audioUrl, filename: payload.filename, size: payload.size, }; } ``` * **`#processTextMessage(user, payload) { ... }`**: 私有方法,用于创建文本消息对象。生成唯一 ID、记录用户名、时间戳和文本内容。 * **`async #processImageMessage(user, payload) { ... }`**: 私有异步方法,用于处理图片消息。 * **`const imageUrl = await this.uploadImageToR2(payload.image, payload.filename);`**: 调用 `uploadImageToR2` 方法将图片数据上传到 R2 存储桶,并获取图片 URL。 * 返回包含图片 URL、文件名、大小和可选标题的图片消息对象。 * **`async #processAudioMessage(user, payload) { ... }`**: 私有异步方法,用于处理音频消息。 * **`const audioUrl = await this.uploadAudioToR2(payload.audio, payload.filename, payload.mimeType);`**: 调用 `uploadAudioToR2` 方法将音频数据上传到 R2 存储桶,并获取音频 URL。 * 返回包含音频 URL、文件名、大小和 MIME 类型的音频消息对象。 ```javascript async handleDeleteMessage(payload) { await this.loadState(); this.messages = this.messages.filter(m => m.id !== payload.id); this.broadcast({ type: MSG_TYPE_DELETE, payload: payload }); await this.saveState(); } ``` * **`async handleDeleteMessage(payload) { ... }`**: 处理删除消息的请求。 * **`await this.loadState();`**: 确保状态已加载。 * **`this.messages = this.messages.filter(m => m.id !== payload.id);`**: 从 `messages` 数组中过滤掉指定 ID 的消息。 * **`this.broadcast({ type: MSG_TYPE_DELETE, payload: payload });`**: 广播删除消息事件给所有客户端,通知它们更新 UI。 * **`await this.saveState();`**: 将更新后的消息历史保存到持久化存储。 ```javascript async handleRename(user, payload) { await this.loadState(); const oldUsername = user.username; const newUsername = payload.newUsername; if (oldUsername === newUsername) return; const socketsToUpdate = this.state.getWebSockets(oldUsername); for (const sock of socketsToUpdate) { this.state.setTags(sock, [newUsername]); } if (this.userStats.has(oldUsername)) { const stats = this.userStats.get(oldUsername); const existingNewStats = this.userStats.get(newUsername) || { messageCount: 0, totalOnlineDuration: 0 }; existingNewStats.messageCount += stats.messageCount || 0; existingNewStats.totalOnlineDuration += stats.totalOnlineDuration || 0; if (stats.onlineSessions > 0) { existingNewStats.onlineSessions = (existingNewStats.onlineSessions || 0) + stats.onlineSessions; existingNewStats.currentSessionStart = stats.currentSessionStart; } this.userStats.set(newUsername, existingNewStats); this.userStats.delete(oldUsername); } this.messages.forEach(msg => { if (msg.username === oldUsername) msg.username = newUsername; }); this.broadcastSystemState(); // this.broadcast({ type: MSG_TYPE_HISTORY, payload: this.messages }); // Removed, as history is fetched via HTTP await this.saveState(); } ``` * **`async handleRename(user, payload) { ... }`**: 处理用户改名的请求。 * **`await this.loadState();`**: 确保状态已加载。 * **`const oldUsername = user.username; const newUsername = payload.newUsername;`**: 获取旧用户名和新用户名。 * **`if (oldUsername === newUsername) return;`**: 如果新旧用户名相同,则不做任何操作。 * **`const socketsToUpdate = this.state.getWebSockets(oldUsername); for (const sock of socketsToUpdate) { this.state.setTags(sock, [newUsername]); }`**: 获取所有与旧用户名关联的 WebSocket 连接,并更新它们的标签为新用户名。 * **`if (this.userStats.has(oldUsername)) { ... }`**: 如果旧用户名存在于 `userStats` 中: * 将旧用户的统计数据(消息计数、在线时长、在线会话数)合并到新用户的统计数据中(如果新用户已存在,则累加;否则创建新条目)。 * **`this.userStats.set(newUsername, existingNewStats);`**: 更新新用户的统计数据。 * **`this.userStats.delete(oldUsername);`**: 删除旧用户的统计数据。 * **`this.messages.forEach(msg => { if (msg.username === oldUsername) msg.username = newUsername; });`**: 遍历所有历史消息,将旧用户名替换为新用户名。 * **`this.broadcastSystemState();`**: 广播系统状态,通知所有客户端用户列表已更新。 * **`await this.saveState();`**: 将更新后的用户统计数据和消息历史保存到持久化存储。 ```javascript // --- 辅助方法 --- getWsByUsername(username) { const wss = this.state.getWebSockets(username); return wss.length > 0 ? wss[0] : null; } handleOffer(fromUser, payload) { const targetWs = this.getWsByUsername(payload.target); if (!targetWs) return console.warn(`用户 ${payload.target} 不在线,无法转发 offer`); this.sendMessage(targetWs, { type: MSG_TYPE_OFFER, payload: { from: fromUser.username, sdp: payload.sdp } }); } handleAnswer(fromUser, payload) { const targetWs = this.getWsByUsername(payload.target); if (!targetWs) return console.warn(`用户 ${payload.target} 不在线,无法转发 answer`); this.sendMessage(targetWs, { type: MSG_TYPE_ANSWER, payload: { from: fromUser.username, sdp: payload.sdp } }); } handleCandidate(fromUser, payload) { const targetWs = this.getWsByUsername(payload.target); if (!targetWs) return console.warn(`用户 ${payload.target} 不在线,无法转发 candidate`); this.sendMessage(targetWs, { type: MSG_TYPE_CANDIDATE, payload: { from: fromUser.username, candidate: payload.candidate } }); } handleCallEnd(fromUser, payload) { const targetWs = this.getWsByUsername(payload.target); if (!targetWs) return console.warn(`用户 ${payload.target} 不在线,无法转发 call_end`); this.sendMessage(targetWs, { type: MSG_TYPE_CALL_END, payload: { from: fromUser.username } }); } ``` * **`getWsByUsername(username) { ... }`**: 辅助方法,根据用户名获取其对应的第一个 WebSocket 连接。 * **`handleOffer(fromUser, payload) { ... }`**: 处理 WebRTC SDP Offer 消息。查找目标用户的 WebSocket,并将 Offer 转发给它。 * **`handleAnswer(fromUser, payload) { ... }`**: 处理 WebRTC SDP Answer 消息。查找目标用户的 WebSocket,并将 Answer 转发给它。 * **`handleCandidate(fromUser, payload) { ... }`**: 处理 WebRTC ICE Candidate 消息。查找目标用户的 WebSocket,并将 Candidate 转发给它。 * **`handleCallEnd(fromUser, payload) { ... }`**: 处理 WebRTC 通话结束消息。查找目标用户的 WebSocket,并将结束通知转发给它。 ```javascript async uploadImageToR2(imageData, filename) { try { const base64Data = imageData.split(',')[1]; if (!base64Data) throw new Error('无效的图片数据'); const imageBuffer = Uint8Array.from(atob(base64Data), c => c.charCodeAt(0)); const fileExtension = filename.split('.').pop() || 'jpg'; const key = `chat-images/${Date.now()}-${crypto.randomUUID()}.${fileExtension}`; await this.env.R2_BUCKET.put(key, imageBuffer, { httpMetadata: { contentType: this.getContentType(fileExtension), cacheControl: 'public, max-age=31536000' }, }); return `https://pub-8dfbdda6df204465aae771b4c080140b.r2.dev/${key}`; } catch (error) { console.error('R2 上传失败:', error); throw new Error(`图片上传失败: ${error.message}`); } } getContentType(extension) { const contentTypes = { 'jpg': 'image/jpeg', 'jpeg': 'image/jpeg', 'png': 'image/png', 'gif': 'image/gif', 'webp': 'image/webp' }; return contentTypes[extension.toLowerCase()] || 'image/jpeg'; } async uploadAudioToR2(audioData, filename, mimeType) { try { const base64Data = audioData.split(',')[1]; if (!base64Data) throw new Error('无效的音频数据'); const audioBuffer = Uint8Array.from(atob(base64Data), c => c.charCodeAt(0)); const fileExtension = filename.split('.').pop() || 'bin'; const key = `chat-audio/${Date.now()}-${crypto.randomUUID()}.${fileExtension}`; await this.env.R2_BUCKET.put(key, audioBuffer, { httpMetadata: { contentType: mimeType || 'application/octet-stream', cacheControl: 'public, max-age=31536000' }, }); return `https://pub-8dfbdda6df204465aae771b4c080140b.r2.dev/${key}`; } catch (error) { console.error('R2 音频上传失败:', error); throw new Error(`音频上传失败: ${error.message}`); } } ``` * **`async uploadImageToR2(imageData, filename) { ... }`**: 异步方法,将 Base64 编码的图片数据上传到 Cloudflare R2 存储桶。 * **`const base64Data = imageData.split(',')[1];`**: 从 `data:image/...;base64,...` 格式中提取 Base64 数据部分。 * **`const imageBuffer = Uint8Array.from(atob(base64Data), c => c.charCodeAt(0));`**: 使用 `atob` 解码 Base64 字符串为二进制数据,并转换为 `Uint8Array`。 * **`const key = `chat-images/${Date.now()}-${crypto.randomUUID()}.${fileExtension}`;`**: 生成一个唯一的 R2 对象键(文件名),包含时间戳和 UUID。 * **`await this.env.R2_BUCKET.put(key, imageBuffer, { ... });`**: 使用 `env.R2_BUCKET.put()` 方法将图片数据上传到 R2。`httpMetadata` 设置了 `Content-Type` 和缓存控制。 * **`return `https://pub-8dfbdda6df204465aae771b4c080140b.r2.dev/${key}`;`**: 返回 R2 对象的公共访问 URL。 * **`try...catch`**: 捕获并处理上传错误。 * **`getContentType(extension) { ... }`**: 辅助方法,根据文件扩展名返回对应的 MIME 类型。 * **`async uploadAudioToR2(audioData, filename, mimeType) { ... }`**: 异步方法,将 Base64 编码的音频数据上传到 Cloudflare R2 存储桶。逻辑与 `uploadImageToR2` 类似,只是处理的是音频数据。 ```javascript sendMessage(ws, message) { try { ws.send(JSON.stringify(message)); } catch (err) { console.error('Failed to send message:', err); } } broadcast(message) { for (const ws of this.state.getWebSockets()) { this.sendMessage(ws, message); } } broadcastSystemState() { // 确保状态已加载,以防万一 if (!this.userStats) return; // 1. 获取当前所有在线用户的名字 (基于WebSocket连接) const onlineUsernames = new Set(); for (const ws of this.state.getWebSockets()) { const username = this.state.getTags(ws)[0]; if (username) { onlineUsernames.add(username); } } // 2. 构建完整的用户列表,包含在线状态 // 遍历所有已知用户(包括不活跃的),并标记其在线状态 const userList = Array.from(this.userStats.keys()).map(username => { return { username, isOnline: onlineUsernames.has(username) }; }); // 3. 广播这个列表 this.broadcast({ type: MSG_TYPE_SYSTEM_STATE, payload: { users: userList } // 前端会根据这个列表来更新UI }); } } ``` * **`sendMessage(ws, message) { ... }`**: 辅助方法,用于向单个 WebSocket 连接发送 JSON 消息。包含 `try...catch` 确保发送失败时不会崩溃。 * **`broadcast(message) { ... }`**: 辅助方法,用于向所有当前连接到此 Durable Object 实例的 WebSocket 客户端广播消息。它遍历 `this.state.getWebSockets()` 返回的所有 WebSocket 连接,并调用 `sendMessage`。 * **`broadcastSystemState() { ... }`**: 广播系统状态(主要是在线用户列表)给所有客户端。 * **`if (!this.userStats) return;`**: 防御性检查,确保 `userStats` 已加载。 * **`const onlineUsernames = new Set(); for (const ws of this.state.getWebSockets()) { ... }`**: 遍历所有当前活跃的 WebSocket 连接,从它们的标签中提取用户名,并存储在一个 `Set` 中,以获取唯一的在线用户列表。 * **`const userList = Array.from(this.userStats.keys()).map(username => { ... });`**: 遍历 `this.userStats` 中所有已知用户的用户名(包括在线和离线的),为每个用户创建一个对象,包含其用户名和 `isOnline` 状态(通过检查其是否在 `onlineUsernames` Set 中)。 * **`this.broadcast({ type: MSG_TYPE_SYSTEM_STATE, payload: { users: userList } });`**: 将包含完整用户列表(及其在线状态)的系统状态消息广播给所有客户端。 --- **总结:** `chatroom_do.js` 文件是 Cloudflare Durable Object 的核心实现,它: 1. **管理持久化状态**:通过 `loadState` 和 `saveState` 方法,将聊天消息 (`messages`) 和用户统计数据 (`userStats`) 在内存和 Durable Object 的持久化存储之间同步。特别处理了 `Map` 对象在存储时的序列化和反序列化问题。 2. **处理 WebSocket 连接**:作为 WebSocket 服务器,处理连接的建立 (`fetch` 和 `webSocketOpen`)、消息的接收 (`webSocketMessage`) 和连接的关闭 (`webSocketClose`)。 3. **实现聊天室核心功能**:包括发送/接收文本、图片、音频消息,删除消息,用户改名等。 4. **维护用户统计**:记录用户的消息数量、在线会话数和总在线时长。 5. **支持 WebRTC 信令**:转发 WebRTC 的 SDP Offer/Answer 和 ICE Candidate 消息,实现点对点通信的信令交换。 6. **集成 R2 存储**:提供将图片和音频文件上传到 Cloudflare R2 的功能。 7. **广播机制**:能够向所有连接的客户端广播消息和系统状态更新(如在线用户列表)。 8. **HTTP 接口**:除了 WebSocket,还提供了一个 HTTP GET 接口 `/history-messages` 用于获取历史消息。 这个 Durable Object 实例为每个聊天室提供了独立的、持久化的状态和实时通信能力,并且能够从休眠中快速恢复,确保了聊天室的连续性和数据完整性。
配图 (可多选)
选择新图片文件或拖拽到此处
标签
更新文章
删除文章