兰 亭 墨 苑
期货 · 量化 · AI · 终身学习
首页
归档
编辑文章
标题 *
URL 别名 *
内容 *
(支持 Markdown 格式)
#!/usr/bin/env python3 """ 企业微信 <-> nanobot 桥接服务 (HTTP API 版本) 使用 gateway HTTP API 而非 CLI 调用,大幅提升响应速度 """ import asyncio import json import os import re import sys from pathlib import Path from typing import Any, Optional from datetime import datetime # 工作空间路径 NANOBOT_WORKSPACE = "/home/nanobot/.nanobot" try: import httpx HTTPX_OK = True except ImportError: HTTPX_OK = False print("[!] 错误:需要安装 httpx - pip install httpx") sys.exit(1) # ==================== 配置 ==================== # 从配置文件加载环境变量 CONFIG_FILE = Path(NANOBOT_WORKSPACE) / "config" / "wechat_config.env" if CONFIG_FILE.exists(): print(f"[+] 加载配置文件:{CONFIG_FILE}") with open(CONFIG_FILE, 'r') as f: for line in f: line = line.strip() if line and not line.startswith('#') and '=' in line: key, value = line.split('=', 1) os.environ.setdefault(key.strip(), value.strip()) # 企业微信 API WEIXIN_CORP_ID = os.environ.get("WEIXIN_CORP_ID", "") WEIXIN_AGENT_SECRET = os.environ.get("WEIXIN_AGENT_SECRET", "") WEIXIN_PUSH_URL = os.environ.get("WEIXIN_PUSH_URL", "https://api.yuangs.cc/weixinpush") WEIXIN_UPLOAD_URL = os.environ.get("WEIXIN_UPLOAD_URL", "https://qyapi.weixin.qq.com/cgi-bin/media/upload") # 轮询认证 WECHAT_WEBHOOK_SECRET = os.environ.get("WECHAT_WEBHOOK_SECRET", "") # nanobot Gateway HTTP API NANOBOT_API_URL = os.environ.get("NANOBOT_API_URL", "http://127.0.0.1:18790") # nanobot CLI fallback NANOBOT_PATH = os.environ.get("NANOBOT_PATH", "/home/nanobot/.nanobot/.venv/bin/nanobot") OUTPUT_DIR = Path(NANOBOT_WORKSPACE) / "outputs" # 并发控制 MAX_CONCURRENT_TASKS = 5 # 文件清理 AUTO_CLEANUP_FILES = True CLEANUP_DELAY_SECONDS = 300 # 支持的文件类型 SUPPORTED_FILE_TYPES = { 'docx': 'file', 'pptx': 'file', 'xlsx': 'file', 'pdf': 'file', 'png': 'image', 'gif': 'image', 'jpg': 'image', 'jpeg': 'image', } # ==================== 企业微信客户端 ==================== class WeChatClient: """企业微信 API 客户端""" def __init__(self, corp_id: str = "", agent_secret: str = ""): self.corp_id = corp_id or WEIXIN_CORP_ID self.agent_secret = agent_secret or WEIXIN_AGENT_SECRET self._access_token: Optional[str] = None self._token_expires_at: float = 0 async def get_access_token(self) -> str: """获取访问令牌""" if self._access_token and datetime.now().timestamp() < self._token_expires_at: return self._access_token if not self.corp_id or not self.agent_secret: return "" async with httpx.AsyncClient() as client: try: resp = await client.get( "https://qyapi.weixin.qq.com/cgi-bin/gettoken", params={"corpid": self.corp_id, "corpsecret": self.agent_secret}, timeout=10 ) data = resp.json() if data.get("errcode") == 0: self._access_token = data["access_token"] self._token_expires_at = datetime.now().timestamp() + 7000 return self._access_token except Exception as e: print(f"[!] 获取 token 失败:{e}") return "" async def send_text(self, content: str, to_user: str = "@all") -> bool: """发送文本消息""" async with httpx.AsyncClient() as client: try: resp = await client.post( WEIXIN_PUSH_URL, json={ "msgtype": "text", "content": content, "to_user": to_user }, timeout=10 ) if resp.status_code == 200: data = resp.json() return data.get("status") == "success" return False except Exception as e: print(f"[!] 发送文本失败:{e}") return False async def upload_media(self, file_path: str, media_type: str = "file") -> Optional[str]: """上传文件到企业微信,返回 media_id""" if not self.corp_id or not self.agent_secret: print(f"[!] 未配置企业微信凭证,无法上传文件:{file_path}") return None token = await self.get_access_token() if not token: return None async with httpx.AsyncClient() as client: try: with open(file_path, 'rb') as f: files = {'media': f} resp = await client.post( WEIXIN_UPLOAD_URL, params={"type": media_type, "access_token": token}, files=files, timeout=30 ) data = resp.json() if data.get("errcode") == 0: media_id = data.get("media_id") print(f"[+] 文件上传成功:{media_id}") return media_id else: print(f"[!] 上传失败:{data}") return None except Exception as e: print(f"[!] 上传文件异常:{e}") return None async def send_file(self, file_path: str, to_user: str = "@all") -> bool: """发送文件消息""" ext = Path(file_path).suffix.lower().lstrip('.') media_type = SUPPORTED_FILE_TYPES.get(ext, 'file') media_id = await self.upload_media(file_path, media_type) if not media_id: return False async with httpx.AsyncClient() as client: try: token = await self.get_access_token() if media_type == 'image': msg_type = "image" else: msg_type = "file" resp = await client.post( f"https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token={token}", json={ "touser": to_user, "msgtype": msg_type, "agentid": 1000002, msg_type: {"media_id": media_id}, "safe": 0 }, timeout=10 ) data = resp.json() return data.get("errcode") == 0 except Exception as e: print(f"[!] 发送文件失败:{e}") return False # ==================== nanobot 客户端 (HTTP API) ==================== class NanobotClient: """通过 HTTP API 调用 nanobot gateway""" def __init__(self, workspace: str): self.workspace = Path(workspace) self.nanobot_path = NANOBOT_PATH self.api_url = NANOBOT_API_URL @staticmethod def filter_thought(text: str) -> str: """清理思考标签""" if not text: return "" # 清除 think 标签 text = re.sub(r'<think.*?>.*?</think', '', text, flags=re.DOTALL | re.IGNORECASE) text = re.sub(r'</?think>', '', text, flags=re.IGNORECASE) return text.strip() async def chat(self, message: str, user_id: str, channel: str = "wechat") -> tuple[str, list[str]]: """发送消息并获取回复 - 使用 HTTP API Returns: tuple: (response_text, media_paths) """ # 注入微信环境上下文 enhanced_message = ( "[系统约束:你正运行在企业微信受限环境。" "1. 禁止输出思考标签或任何内心独白。" "2. 直接输出给用户的最终回复内容。" "3. 请尽量使用简洁的回复,适合手机屏幕阅读。" "生成文件时,请在回复中说明文件路径。]" f"用户消息:{message}" ) try: # 使用 HTTP API 调用 gateway async with httpx.AsyncClient() as client: resp = await client.post( f"{self.api_url}/api/message", json={ "message": enhanced_message, "user_id": user_id, "channel": channel, "timeout": 180 }, timeout=200 ) if resp.status_code == 200: data = resp.json() response = data.get("response", "") media = data.get("media", []) response = self.filter_thought(response) return (response if response else "(无回复)", media) else: try: error = resp.json().get("error", "unknown error") except: error = f"HTTP {resp.status_code}" print(f"[!] API 错误: {error}") return (f"[错误] {error}", []) except httpx.TimeoutException: return ("[超时] 消息处理超时,请稍后重试", []) except Exception as e: print(f"[!] API 调用异常:{e}") return (f"[错误] {str(e)}", []) # ==================== 文件处理器 ==================== class FileHandler: """文件处理工具""" @staticmethod def extract_file_paths(text: str) -> list[tuple[str, str]]: """从文本中提取文件路径""" paths = [] # 匹配 OUTPUT_DIR 下的文件路径 patterns = [ rf'{OUTPUT_DIR}/[\w\-\.]+\.\w{{2,4}}', rf'/tmp/[\w\-\.]+\.\w{{2,4}}', ] for pattern in patterns: matches = re.findall(pattern, text) for match in matches: if os.path.exists(match): ext = Path(match).suffix.lower().lstrip('.') paths.append((match, ext)) return paths @staticmethod async def cleanup_file(file_path: str, delay: int = CLEANUP_DELAY_SECONDS): """延迟清理文件""" await asyncio.sleep(delay) try: if os.path.exists(file_path): os.remove(file_path) print(f"[-] 已清理文件:{file_path}") except Exception as e: print(f"[!] 清理文件失败:{e}") # ==================== 消息处理器 ==================== class MessageProcessor: """消息处理器""" def __init__(self, wechat: WeChatClient, nanobot: NanobotClient): self.wechat = wechat self.nanobot = nanobot self.semaphore = asyncio.Semaphore(MAX_CONCURRENT_TASKS) self.last_check_time = 0 async def process(self, content: str, user_id: str): """处理单条消息(带并发控制)""" async with self.semaphore: try: print(f"[>] 处理消息:{user_id}: {content[:50]}...") response, media_paths = await self.nanobot.chat(content, user_id) print(f"[<] 回复:{response[:100]}...") if media_paths: print(f"[<] 媒体:{media_paths}") await self.wechat.send_text(response, user_id) # 发送 API 返回的媒体文件 for file_path in media_paths: if os.path.exists(file_path): print(f"[#] 发送媒体文件:{file_path}") success = await self.wechat.send_file(file_path, user_id) if success: asyncio.create_task(FileHandler.cleanup_file(file_path)) # 提取并发送回复文本中的文件路径(兼容旧模式) files = FileHandler.extract_file_paths(response) for file_path, ext in files: if file_path not in media_paths: # 避免重复发送 print(f"[#] 发送文件:{file_path}") success = await self.wechat.send_file(file_path, user_id) if success: asyncio.create_task(FileHandler.cleanup_file(file_path)) except Exception as e: print(f"[!] 处理消息异常:{e}") await self.wechat.send_text(f"[错误] 消息处理失败:{e}", user_id) # ==================== 轮询服务 ==================== class PollingService: """轮询企业微信消息""" def __init__(self, wechat: WeChatClient, processor: MessageProcessor): self.wechat = wechat self.processor = processor self.last_msg_time = 0 self.processed_msg_ids = set() # 轮询接口地址 self.poll_url = "https://api.yuangs.cc/wechat/poll" # ACK 接口地址 self.ack_url = "https://api.yuangs.cc/wechat/ack" async def ack_message(self, msg_id: str) -> bool: """确认消息已处理,从队列中删除""" if not WECHAT_WEBHOOK_SECRET: return False async with httpx.AsyncClient() as client: try: resp = await client.post( self.ack_url, headers={"X-Webhook-Secret": WECHAT_WEBHOOK_SECRET, "Content-Type": "application/json"}, json={"msg_id": msg_id}, timeout=5 ) if resp.status_code == 200: data = resp.json() return data.get("status") == "ok" except Exception as e: print(f"[!] ACK 失败:{e}") return False async def poll(self): """轮询消息""" print(f"[+] 开始轮询企业微信消息 (HTTP API 模式)...") if not WECHAT_WEBHOOK_SECRET: print(f"[!] 警告:未配置 WECHAT_WEBHOOK_SECRET,轮询接口将无法认证") while True: try: # 调用轮询 API 获取消息 (GET 请求,带认证 Header) headers = {"X-Webhook-Secret": WECHAT_WEBHOOK_SECRET} if WECHAT_WEBHOOK_SECRET else {} async with httpx.AsyncClient() as client: resp = await client.get( self.poll_url, headers=headers, timeout=10 ) if resp.status_code == 200: data = resp.json() messages = data.get("messages", []) for msg in messages: msg_id = msg.get("id", "") user_id = msg.get("user_id", "") content = msg.get("content", "") # 跳过已处理的消息 if msg_id in self.processed_msg_ids: continue self.processed_msg_ids.add(msg_id) # 限制缓存大小 if len(self.processed_msg_ids) > 1000: self.processed_msg_ids = set(list(self.processed_msg_ids)[-500:]) # 确认消息 await self.ack_message(msg_id) print(f"[+] ACK 消息:{msg_id[:8]}...") # 处理消息 await self.processor.process(content, user_id) elif resp.status_code == 401: print(f"[!] 认证失败,请检查 WECHAT_WEBHOOK_SECRET") else: print(f"[!] 轮询失败:HTTP {resp.status_code}") except httpx.TimeoutException: pass # 超时是正常的,继续轮询 except Exception as e: print(f"[!] 轮询异常:{e}") # 轮询间隔 await asyncio.sleep(1) # ==================== 主程序 ==================== async def main(): print("=" * 50) print("企业微信 <-> nanobot 桥接服务 (HTTP API 版本)") print("=" * 50) print(f"[+] Gateway API: {NANOBOT_API_URL}") print(f"[+] 轮询地址: https://api.yuangs.cc/wechat/poll") # 检查 Gateway API 是否可用 async with httpx.AsyncClient() as client: try: resp = await client.get(f"{NANOBOT_API_URL}/api/health", timeout=5) if resp.status_code == 200: print(f"[+] Gateway API 连接成功") else: print(f"[!] Gateway API 返回 {resp.status_code}") except Exception as e: print(f"[!] Gateway API 不可用: {e}") print(f"[!] 请确保 nanobot gateway 正在运行") return # 创建客户端 wechat = WeChatClient() nanobot = NanobotClient(NANOBOT_WORKSPACE) processor = MessageProcessor(wechat, nanobot) polling = PollingService(wechat, processor) # 开始轮询 await polling.poll() if __name__ == "__main__": asyncio.run(main())
配图 (可多选)
选择新图片文件或拖拽到此处
标签
更新文章
删除文章