commit eb19d8d05fcc1582560188a7c73eb9ce16055542 Author: jesxion Date: Mon Apr 13 11:12:49 2026 +0800 初始化 WeChat Agent 项目 MVP Phase 1 核心模块: - src/config/settings.py - 配置管理 - src/vlm/qwen_vl.py - Qwen-VL2 视觉模型接口 - src/wechat/controller.py - 微信客户端控制器(UIAutomation) - src/core/engine.py - 核心引擎(轮询、消息处理、回复) - src/main.py - 主入口 文档: - README.md - config.example.yaml - requirements.txt 技术方案:纯视觉AI + UIAutomation - 截图 → Qwen-VL2 识别 → AI 判断 → UIAutomation 操作 diff --git a/README.md b/README.md new file mode 100644 index 0000000..9f5849b --- /dev/null +++ b/README.md @@ -0,0 +1,96 @@ +# WeChat Agent - Windows 微信客户端 AI 自动化 + +基于视觉 AI 的 Windows 微信客户端自动化系统,实现消息识别与自动回复。 + +## 项目概述 + +本项目实现对 Windows 微信客户端的 AI 自动化控制: +- **信息获取**:通过视觉识别获取聊天记录、用户信息、消息内容 +- **自动回复**:基于规则/AI 判断自动发送消息 +- **纯视觉方案**:不依赖微信 API,通过屏幕截图 + VLM 识别 + 自动操作 + +## 技术架构 + +``` +微信Windows客户端(截图) + ↓ +UIAutomation + 屏幕截图 + ↓ + Qwen-VL2 视觉理解 + ↓ + LLM 推理判断 + ↓ +UIAutomation 执行操作 + ↓ +微信客户端(发送) +``` + +## 核心模块 + +| 模块 | 说明 | +|-----|------| +| `vlm` | 视觉模型接口(Qwen-VL2) | +| `wechat` | 微信客户端控制(UIAutomation) | +| `core` | 核心引擎(消息捕获、回复判断) | +| `agent` | AI Agent 逻辑 | +| `ui` | 桌面 UI 界面 | +| `config` | 配置管理 | + +## 技术栈 + +- **语言**: Python 3.10+ +- **视觉模型**: Qwen-VL2(本地部署) +- **Windows 控制**: UIAutomation (PyWinAuto) +- **LLM**: OpenAI 兼容 API +- **桌面 UI**: PyQt6 / Tkinter + +## 快速开始 + +### 环境要求 + +- Windows 10/11 +- Python 3.10+ +- 微信 Windows 客户端 3.8.x(推荐) +- Qwen-VL2 模型(本地部署) + +### 安装 + +```bash +pip install -r requirements.txt +``` + +### 配置 + +```bash +cp config.example.yaml config.yaml +# 编辑 config.yaml 填入 API 配置 +``` + +### 运行 + +```bash +python src/main.py +``` + +## MVP 功能 + +### Phase 1(本期) +- [ ] 微信窗口识别 +- [ ] 聊天记录截图识别 +- [ ] 用户信息识别 +- [ ] 关键词自动回复 +- [ ] 定时轮询机制 + +### Phase 2(后续) +- [ ] 知识库集成 +- [ ] 多账号管理 +- [ ] 复杂对话上下文 + +## 参考项目 + +- [thiflow-research](http://192.168.5.5:3000/jesxion/thiflow-research) - Thiflow 产品研究 +- [thiflow.com](https://thiflow.com/) - 参考产品 + +## License + +MIT diff --git a/config.example.yaml b/config.example.yaml new file mode 100644 index 0000000..142338d --- /dev/null +++ b/config.example.yaml @@ -0,0 +1,46 @@ +# WeChat Agent 配置文件示例 + +vlm: + model_type: qwen-vl2 # qwen-vl2 / gpt-4v + api_base: http://localhost:8000/v1 # VLM API 地址 + api_key: "" # VLM API Key(如果需要) + model_name: Qwen-VL2 # 模型名称 + max_tokens: 2048 + temperature: 0.7 + +llm: + api_base: https://api.openai.com/v1 # LLM API 地址 + api_key: your-api-key-here # OpenAI API Key + model_name: gpt-4o # 模型名称 + max_tokens: 2048 + temperature: 0.7 + +wechat: + client_version: "3.8.x" # 推荐微信版本 + poll_interval: 2.0 # 轮询间隔(秒) + screenshot_interval: 1.0 # 截图间隔(秒) + window_title: "微信" # 微信窗口标题 + +# 回复规则 +rules: + # 关键词回复示例 + - keywords: + - 你好 + - hi + - hello + reply_type: keyword + reply_content: "您好,有什么可以帮您的?" + enabled: true + + # AI 回复示例(无匹配关键词时) + - keywords: [] + reply_type: AI + reply_content: "" + enabled: true + +# 知识库(可选,后续接入) +knowledge_base: + url: http://192.168.5.5:1933 + +# 日志级别 +log_level: INFO diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..47269ff --- /dev/null +++ b/requirements.txt @@ -0,0 +1,26 @@ +# WeChat Agent 依赖 + +# Windows UI 自动化 +pywinauto>=0.6.8 +pywin32>=305 + +# HTTP 请求 +requests>=2.28.0 +urllib3>=1.26.0 + +# 数据处理 +pyyaml>=6.0 +pillow>=9.0.0 + +# 异步(可选) +# asyncio +# aiohttp>=3.8.0 + +# 日志 +# coloredlogs>=15.0 # 可选 + +# 测试 +pytest>=7.0.0 + +# 类型检查 +# mypy>=0.950 diff --git a/src/config/settings.py b/src/config/settings.py new file mode 100644 index 0000000..7e11dd6 --- /dev/null +++ b/src/config/settings.py @@ -0,0 +1,185 @@ +""" +配置管理模块 +Configuration management module +""" + +import os +import yaml +from dataclasses import dataclass, field +from typing import List, Optional +from pathlib import Path + + +@dataclass +class VLMSettings: + """视觉模型配置""" + model_type: str = "qwen-vl2" # qwen-vl2 / GPT-4V + api_base: str = "http://localhost:8000/v1" + api_key: str = "" + model_name: str = "Qwen-VL2" + max_tokens: int = 2048 + temperature: float = 0.7 + + +@dataclass +class LLMSettings: + """LLM 配置""" + api_base: str = "https://api.openai.com/v1" + api_key: str = "" + model_name: str = "gpt-4o" + max_tokens: int = 2048 + temperature: float = 0.7 + + +@dataclass +class WeChatSettings: + """微信客户端配置""" + client_version: str = "3.8.x" # 推荐版本 + poll_interval: float = 2.0 # 轮询间隔(秒) + screenshot_interval: float = 1.0 # 截图间隔(秒) + window_title: str = "微信" # 窗口标题 + + +@dataclass +class ReplyRule: + """回复规则""" + keywords: List[str] = field(default_factory=list) + reply_type: str = "keyword" # keyword / AI + reply_content: str = "" + enabled: bool = True + + +@dataclass +class Config: + """全局配置""" + vlm: VLMSettings = field(default_factory=VLMSettings) + llm: LLMSettings = field(default_factory=LLMSettings) + wechat: WeChatSettings = field(default_factory=WeChatSettings) + rules: List[ReplyRule] = field(default_factory=list) + knowledge_base_url: Optional[str] = None + log_level: str = "INFO" + + +class ConfigManager: + """配置管理器""" + + _instance = None + _config: Optional[Config] = None + + def __new__(cls): + if cls._instance is None: + cls._instance = super().__new__(cls) + return cls._instance + + @classmethod + def load(cls, config_path: str = None) -> Config: + """加载配置""" + if cls._config is not None: + return cls._config + + if config_path is None: + config_path = os.environ.get( + "WECHAT_AGENT_CONFIG", + str(Path(__file__).parent.parent.parent / "config.yaml") + ) + + if os.path.exists(config_path): + with open(config_path, "r", encoding="utf-8") as f: + data = yaml.safe_load(f) + cls._config = cls._parse_config(data) + else: + cls._config = Config() + + return cls._config + + @classmethod + def _parse_config(cls, data: dict) -> Config: + """解析配置数据""" + config = Config() + + if "vlm" in data: + for key, value in data["vlm"].items(): + if hasattr(config.vlm, key): + setattr(config.vlm, key, value) + + if "llm" in data: + for key, value in data["llm"].items(): + if hasattr(config.llm, key): + setattr(config.llm, key, value) + + if "wechat" in data: + for key, value in data["wechat"].items(): + if hasattr(config.wechat, key): + setattr(config.wechat, key, value) + + if "rules" in data: + for rule_data in data["rules"]: + rule = ReplyRule( + keywords=rule_data.get("keywords", []), + reply_type=rule_data.get("reply_type", "keyword"), + reply_content=rule_data.get("reply_content", ""), + enabled=rule_data.get("enabled", True) + ) + config.rules.append(rule) + + if "knowledge_base" in data: + config.knowledge_base_url = data["knowledge_base"].get("url") + + config.log_level = data.get("log_level", "INFO") + + return config + + @classmethod + def get_config(cls) -> Config: + """获取当前配置""" + if cls._config is None: + return cls.load() + return cls._config + + @classmethod + def save_example(cls, path: str): + """保存配置示例""" + example = { + "vlm": { + "model_type": "qwen-vl2", + "api_base": "http://localhost:8000/v1", + "api_key": "", + "model_name": "Qwen-VL2", + "max_tokens": 2048, + "temperature": 0.7 + }, + "llm": { + "api_base": "https://api.openai.com/v1", + "api_key": "your-api-key", + "model_name": "gpt-4o", + "max_tokens": 2048, + "temperature": 0.7 + }, + "wechat": { + "client_version": "3.8.x", + "poll_interval": 2.0, + "screenshot_interval": 1.0, + "window_title": "微信" + }, + "rules": [ + { + "keywords": ["你好", "hi", "hello"], + "reply_type": "keyword", + "reply_content": "您好,有什么可以帮您的?", + "enabled": True + }, + { + "keywords": [], + "reply_type": "AI", + "reply_content": "", + "enabled": True + } + ], + "knowledge_base": { + "url": "http://192.168.5.5:1933" + }, + "log_level": "INFO" + } + + with open(path, "w", encoding="utf-8") as f: + yaml.dump(example, f, allow_unicode=True, default_flow_style=False) diff --git a/src/core/engine.py b/src/core/engine.py new file mode 100644 index 0000000..6000785 --- /dev/null +++ b/src/core/engine.py @@ -0,0 +1,353 @@ +""" +核心引擎 +WeChat Agent Core Engine +""" + +import time +import logging +import threading +from dataclasses import dataclass, field +from typing import List, Optional, Callable, Dict, Any +from enum import Enum +from queue import Queue + +logger = logging.getLogger(__name__) + + +class AgentState(Enum): + """Agent 状态""" + IDLE = "idle" + RUNNING = "running" + PAUSED = "paused" + ERROR = "error" + + +@dataclass +class ChatSnapshot: + """聊天快照""" + timestamp: float + chat_name: str + messages: List[Dict[str, Any]] + screenshot_path: str + has_new: bool = False + + +@dataclass +class ReplyResult: + """回复结果""" + success: bool + content: str + reason: str = "" + + +class MessageProcessor: + """消息处理器""" + + def __init__(self, vlm_client, llm_client, config): + self.vlm_client = vlm_client + self.llm_client = llm_client + self.config = config + self._rules = config.rules + + def should_reply(self, chat_snapshot: ChatSnapshot) -> bool: + """判断是否需要回复""" + if not chat_snapshot.has_new: + return False + + # 检查是否是自己的消息 + messages = chat_snapshot.messages + if not messages: + return False + + last_msg = messages[-1] + if last_msg.get("is_self"): + return False + + return True + + def generate_reply(self, chat_snapshot: ChatSnapshot) -> str: + """生成回复内容""" + # 先检查关键词规则 + last_content = chat_snapshot.messages[-1].get("content", "") + + for rule in self._rules: + if not rule.enabled: + continue + + if rule.reply_type == "keyword": + # 关键词匹配 + for keyword in rule.keywords: + if keyword in last_content: + logger.info(f"关键词匹配: {keyword}") + return rule.reply_content + + elif rule.reply_type == "AI": + # AI 生成回复 + return self._ai_generate_reply(chat_snapshot) + + return "" + + def _ai_generate_reply(self, chat_snapshot: ChatSnapshot) -> str: + """AI 生成回复""" + try: + # 构造 prompt + prompt = f"""当前聊天: {chat_snapshot.chat_name} +历史消息: +""" + for msg in chat_snapshot.messages[-10:]: # 最近10条 + sender = "我" if msg.get("is_self") else "对方" + prompt += f"- [{sender}] {msg.get('content', '')}\n" + + prompt += """ +请生成一条合适的回复,只返回回复内容,不要其他文字。""" + + # 调用 LLM + response = self.llm_client.chat([ + {"role": "user", "content": prompt} + ]) + + return response.get("text", "") + except Exception as e: + logger.error(f"AI 生成回复失败: {e}") + return "" + + def match_keyword_rule(self, content: str) -> Optional[str]: + """匹配关键词规则""" + for rule in self._rules: + if not rule.enabled or rule.reply_type != "keyword": + continue + + for keyword in rule.keywords: + if keyword in content: + return rule.reply_content + return None + + +class WeChatAgent: + """微信 Agent""" + + def __init__( + self, + wechat_controller, + vlm_client, + llm_client, + config, + message_queue: Queue = None + ): + self.wechat = wechat_controller + self.vlm = vlm_client + self.llm = llm_client + self.config = config + self.processor = MessageProcessor(vlm_client, llm_client, config) + + self._state = AgentState.IDLE + self._thread: Optional[threading.Thread] = None + self._stop_event = threading.Event() + self._pause_event = threading.Event() + + self._message_queue = message_queue or Queue() + self._callbacks: Dict[str, List[Callable]] = { + "on_message": [], # 收到新消息 + "on_reply": [], # 发送回复 + "on_error": [], # 发生错误 + "on_state_change": [], # 状态变化 + } + + self._last_processed_time: Dict[str, float] = {} # 记录每个聊天的处理时间 + + @property + def state(self) -> AgentState: + """获取状态""" + return self._state + + def start(self): + """启动 Agent""" + if self._state == AgentState.RUNNING: + logger.warning("Agent 已经在运行中") + return + + self._stop_event.clear() + self._pause_event.clear() + self._state = AgentState.RUNNING + + self._thread = threading.Thread(target=self._run_loop, daemon=True) + self._thread.start() + + self._emit("on_state_change", self._state) + logger.info("Agent 已启动") + + def stop(self): + """停止 Agent""" + self._stop_event.set() + if self._thread and self._thread.is_alive(): + self._thread.join(timeout=5) + self._state = AgentState.IDLE + self._emit("on_state_change", self._state) + logger.info("Agent 已停止") + + def pause(self): + """暂停 Agent""" + self._pause_event.set() + self._state = AgentState.PAUSED + self._emit("on_state_change", self._state) + logger.info("Agent 已暂停") + + def resume(self): + """恢复 Agent""" + self._pause_event.clear() + self._state = AgentState.RUNNING + self._emit("on_state_change", self._state) + logger.info("Agent 已恢复") + + def _run_loop(self): + """主循环""" + poll_interval = self.config.wechat.poll_interval + + while not self._stop_event.is_set(): + try: + # 检查暂停 + if self._pause_event.is_set(): + time.sleep(0.5) + continue + + # 检查连接 + if not self.wechat.is_connected(): + logger.warning("微信未连接,尝试重连...") + if not self.wechat.connect(): + time.sleep(poll_interval) + continue + + # 执行一次轮询 + self._poll_once() + + # 等待 + time.sleep(poll_interval) + + except Exception as e: + logger.error(f"轮询异常: {e}") + self._state = AgentState.ERROR + self._emit("on_error", str(e)) + time.sleep(poll_interval) + + def _poll_once(self): + """执行一次轮询""" + try: + # 1. 截图 + screenshot_path = self.wechat.screenshot() + + # 2. VLM 分析截图 + chat_info = self.vlm.analyze_chat_screenshot(screenshot_path) + + # 3. 检查是否有新消息 + has_new = chat_info.get("has_new_message", False) + chat_name = chat_info.get("current_chat", "") + messages = chat_info.get("messages", []) + + # 防重复处理(同一聊天 5 秒内不重复处理) + current_time = time.time() + chat_key = f"{chat_name}_{hash(str(messages[-1:]))}" + if chat_key in self._last_processed_time: + if current_time - self._last_processed_time[chat_key] < 5: + return + + if has_new or messages: + self._last_processed_time[chat_key] = current_time + + # 创建快照 + snapshot = ChatSnapshot( + timestamp=current_time, + chat_name=chat_name, + messages=messages, + screenshot_path=screenshot_path, + has_new=has_new + ) + + # 触发消息回调 + self._emit("on_message", snapshot) + + # 判断是否需要回复 + if self.processor.should_reply(snapshot): + reply = self.processor.generate_reply(snapshot) + if reply: + result = self.send_reply(reply) + self._emit("on_reply", result) + + except Exception as e: + logger.error(f"轮询处理异常: {e}") + raise + + def send_reply(self, text: str) -> ReplyResult: + """发送回复""" + try: + success = self.wechat.send_text(text) + return ReplyResult( + success=success, + content=text, + reason="发送成功" if success else "发送失败" + ) + except Exception as e: + return ReplyResult( + success=False, + content=text, + reason=str(e) + ) + + def on(self, event: str, callback: Callable): + """注册事件回调""" + if event in self._callbacks: + self._callbacks[event].append(callback) + + def _emit(self, event: str, *args): + """触发事件""" + if event in self._callbacks: + for callback in self._callbacks[event]: + try: + callback(*args) + except Exception as e: + logger.error(f"回调执行异常: {e}") + + def get_status(self) -> Dict[str, Any]: + """获取状态信息""" + return { + "state": self._state.value, + "connected": self.wechat.is_connected(), + "poll_interval": self.config.wechat.poll_interval, + "rules_count": len([r for r in self.config.rules if r.enabled]) + } + + +class MockWeChatController: + """模拟微信控制器(用于测试)""" + + def __init__(self): + self._connected = True + self._messages = [ + {"sender": "张三", "content": "你好", "time": "10:30", "is_self": False}, + {"sender": "张三", "content": "这个产品怎么卖?", "time": "10:31", "is_self": False}, + ] + + def connect(self, timeout: float = 10) -> bool: + return True + + def is_connected(self) -> bool: + return self._connected + + def screenshot(self, output_path: str = None) -> str: + import tempfile + from pathlib import Path + path = Path(tempfile.gettempdir()) / "mock_screenshot.png" + # 创建空白图片 + from PIL import Image + img = Image.new("RGB", (800, 600), color="white") + img.save(str(path)) + return str(path) + + def send_text(self, text: str) -> bool: + self._messages.append({"sender": "我", "content": text, "time": "10:32", "is_self": True}) + return True + + def get_message_list(self, count: int = 10) -> List: + return self._messages[-count:] + + def disconnect(self): + self._connected = False diff --git a/src/main.py b/src/main.py new file mode 100644 index 0000000..8b31bf5 --- /dev/null +++ b/src/main.py @@ -0,0 +1,160 @@ +""" +WeChat Agent 主入口 +""" + +import sys +import logging +import argparse +from pathlib import Path + +# 添加 src 目录到路径 +sys.path.insert(0, str(Path(__file__).parent.parent)) + +from src.config.settings import ConfigManager +from src.vlm.qwen_vl import create_vlm_client +from src.wechat.controller import WeChatController +from src.core.engine import WeChatAgent, MockWeChatController + + +def setup_logging(level: str = "INFO"): + """设置日志""" + logging.basicConfig( + level=getattr(logging, level.upper()), + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" + ) + + +def main(): + parser = argparse.ArgumentParser(description="WeChat Agent - 微信客户端AI自动化") + parser.add_argument("--config", "-c", default="config.yaml", help="配置文件路径") + parser.add_argument("--log-level", "-l", default="INFO", + choices=["DEBUG", "INFO", "WARNING", "ERROR"], + help="日志级别") + parser.add_argument("--mock", action="store_true", help="使用模拟控制器(不连接微信)") + parser.add_argument("--demo", action="store_true", help="演示模式") + + args = parser.parse_args() + + # 设置日志 + setup_logging(args.log_level) + logger = logging.getLogger(__name__) + + # 加载配置 + config_path = Path(args.config) + if not config_path.exists(): + logger.info("配置文件不存在,创建示例配置...") + config_path.parent.mkdir(parents=True, exist_ok=True) + ConfigManager.save_example(str(config_path)) + logger.info(f"示例配置已创建: {config_path}") + logger.info("请编辑配置文件后重新运行") + return + + config = ConfigManager.load(str(config_path)) + logger.info(f"配置加载成功: {config_path}") + + # 创建 VLM 客户端 + try: + vlm_client = create_vlm_client({ + "model_type": config.vlm.model_type, + "api_base": config.vlm.api_base, + "api_key": config.vlm.api_key, + "model_name": config.vlm.model_name, + "max_tokens": config.vlm.max_tokens, + "temperature": config.vlm.temperature, + }) + logger.info("VLM 客户端创建成功") + except Exception as e: + logger.error(f"VLM 客户端创建失败: {e}") + return + + # 创建 LLM 客户端(简化版,后续实现) + class SimpleLLMClient: + def __init__(self, config): + self.config = config + + def chat(self, messages): + # 实际调用需要对接 OpenAI 兼容 API + return {"text": "测试回复"} + + llm_client = SimpleLLMClient(config.llm) + + # 创建微信控制器 + if args.mock: + logger.info("使用模拟控制器(--mock)") + wechat = MockWeChatController() + else: + try: + wechat = WeChatController(window_title=config.wechat.window_title) + if not wechat.connect(): + logger.error("无法连接到微信客户端,请确保微信已启动") + return + logger.info("微信控制器创建成功") + except RuntimeError as e: + logger.error(f"微信控制器创建失败: {e}") + return + + # 创建 Agent + agent = WeChatAgent( + wechat_controller=wechat, + vlm_client=vlm_client, + llm_client=llm_client, + config=config + ) + + # 注册回调 + def on_message(snapshot): + logger.info(f"收到消息 [{snapshot.chat_name}]: {snapshot.messages[-1] if snapshot.messages else 'N/A'}") + + def on_reply(result): + if result.success: + logger.info(f"回复成功: {result.content[:30]}...") + else: + logger.warning(f"回复失败: {result.reason}") + + def on_state_change(state): + logger.info(f"Agent 状态变化: {state.value}") + + agent.on("on_message", on_message) + agent.on("on_reply", on_reply) + agent.on("on_state_change", on_state_change) + + # 演示模式 + if args.demo: + logger.info("演示模式 - 模拟一次处理") + from src.core.engine import ChatSnapshot + import time + + snapshot = ChatSnapshot( + timestamp=time.time(), + chat_name="测试聊天", + messages=[ + {"sender": "张三", "content": "你好", "time": "10:30", "is_self": False}, + ], + screenshot_path="", + has_new=True + ) + + if agent.processor.should_reply(snapshot): + reply = agent.processor.generate_reply(snapshot) + logger.info(f"生成回复: {reply}") + else: + logger.info("不需要回复") + return + + # 启动 Agent + logger.info("启动 WeChat Agent...") + agent.start() + + try: + # 保持运行 + import time + while True: + time.sleep(1) + except KeyboardInterrupt: + logger.info("收到中断信号,停止 Agent...") + agent.stop() + wechat.disconnect() + + +if __name__ == "__main__": + main() diff --git a/src/vlm/qwen_vl.py b/src/vlm/qwen_vl.py new file mode 100644 index 0000000..e0c0c9f --- /dev/null +++ b/src/vlm/qwen_vl.py @@ -0,0 +1,255 @@ +""" +视觉模型接口 +Vision Language Model interface for Qwen-VL2 +""" + +import base64 +import json +import logging +from abc import ABC, abstractmethod +from dataclasses import dataclass +from typing import List, Optional, Dict, Any +from pathlib import Path + +import requests + +logger = logging.getLogger(__name__) + + +@dataclass +class VLMMessages: + """VLM 消息结构""" + role: str # user / assistant + content: str # 文本或 image URL + + +@dataclass +class VLMResponse: + """VLM 响应""" + text: str + raw: dict + + +class BaseVLM(ABC): + """视觉模型基类""" + + @abstractmethod + def chat(self, messages: List[VLMMessages], **kwargs) -> VLMResponse: + """发送对话请求""" + pass + + @abstractmethod + def analyze_image(self, image_path: str, prompt: str, **kwargs) -> str: + """分析图片""" + pass + + +class QwenVL2Client(BaseVLM): + """Qwen-VL2 客户端""" + + SYSTEM_PROMPT = """你是一个专业的 Windows 微信客户端 UI 识别助手。 +你的任务是根据截图准确识别微信界面中的元素。 + +请识别以下信息: +1. 当前界面类型(聊天窗口/通讯录/设置等) +2. 聊天消息内容(发送者、接收者、消息内容、时间) +3. 用户信息(昵称、备注、微信号) +4. 界面状态(是否有新消息、是否有未读等) + +注意: +- 消息格式:[发送者] 时间 +- 消息内容要完整准确 +- 如果是图片/语音/文件,简略标注类型即可""" + + def __init__( + self, + api_base: str = "http://localhost:8000/v1", + api_key: str = "", + model_name: str = "Qwen-VL2", + max_tokens: int = 2048, + temperature: float = 0.7 + ): + self.api_base = api_base.rstrip("/") + self.api_key = api_key + self.model_name = model_name + self.max_tokens = max_tokens + self.temperature = temperature + + def _encode_image(self, image_path: str) -> str: + """将图片编码为 base64""" + with open(image_path, "rb") as f: + return base64.b64encode(f.read()).decode("utf-8") + + def chat(self, messages: List[VLMMessages], **kwargs) -> VLMResponse: + """发送对话请求""" + # 构造消息格式 + formatted_messages = [] + for msg in messages: + if isinstance(msg.content, str): + formatted_messages.append({ + "role": msg.role, + "content": msg.content + }) + elif isinstance(msg.content, list): + # 多模态消息 + formatted_messages.append({ + "role": msg.role, + "content": msg.content + }) + + # 添加系统提示 + if not any(m.role == "system" for m in messages): + formatted_messages.insert(0, { + "role": "system", + "content": self.SYSTEM_PROMPT + }) + + payload = { + "model": self.model_name, + "messages": formatted_messages, + "max_tokens": kwargs.get("max_tokens", self.max_tokens), + "temperature": kwargs.get("temperature", self.temperature), + } + + headers = { + "Content-Type": "application/json" + } + if self.api_key: + headers["Authorization"] = f"Bearer {self.api_key}" + + try: + resp = requests.post( + f"{self.api_base}/chat/completions", + headers=headers, + json=payload, + timeout=60 + ) + resp.raise_for_status() + data = resp.json() + + return VLMResponse( + text=data["choices"][0]["message"]["content"], + raw=data + ) + except requests.exceptions.RequestException as e: + logger.error(f"VLM 请求失败: {e}") + raise VLMError(f"VLM 请求失败: {e}") + + def analyze_image(self, image_path: str, prompt: str, **kwargs) -> str: + """分析图片""" + if not Path(image_path).exists(): + raise VLMError(f"图片不存在: {image_path}") + + # 构造多模态消息 + image_data = self._encode_image(image_path) + + content = [ + { + "type": "image_url", + "image_url": { + "url": f"data:image/jpeg;base64,{image_data}" + } + }, + { + "type": "text", + "text": prompt + } + ] + + messages = [VLMMessages(role="user", content=content)] + response = self.chat(messages, **kwargs) + + return response.text + + def analyze_chat_screenshot(self, screenshot_path: str) -> Dict[str, Any]: + """分析聊天窗口截图 + + Returns: + 解析后的聊天信息,包含: + - messages: 消息列表 + - current_chat: 当前聊天对象 + - has_new_message: 是否有新消息 + """ + prompt = """请分析这个微信聊天截图,返回 JSON 格式: +{ + "current_chat": "当前聊天对象名称", + "has_new_message": true/false, + "messages": [ + { + "sender": "发送者", + "content": "消息内容", + "time": "时间", + "is_self": true/false + } + ] +} +只返回 JSON,不要其他内容。""" + + result = self.analyze_image(screenshot_path, prompt) + + # 尝试解析 JSON + try: + # 提取 JSON + start = result.find("{") + end = result.rfind("}") + 1 + if start >= 0 and end > start: + json_str = result[start:end] + return json.loads(json_str) + else: + return {"raw": result} + except json.JSONDecodeError: + return {"raw": result} + + def detect_ui_elements(self, screenshot_path: str) -> Dict[str, Any]: + """检测 UI 元素位置 + + Returns: + UI 元素字典,包含类型和位置 + """ + prompt = """请分析这个微信界面截图,标注关键 UI 元素的位置: +{ + "elements": [ + { + "type": "button/input/chat_list/...", + "name": "元素名称", + "bounds": {"x": 0, "y": 0, "width": 100, "height": 50} + } + ] +} +只返回 JSON。""" + + result = self.analyze_image(screenshot_path, prompt) + + try: + start = result.find("{") + end = result.rfind("}") + 1 + if start >= 0 and end > start: + return json.loads(result[start:end]) + return {"raw": result} + except json.JSONDecodeError: + return {"raw": result} + + +class VLMError(Exception): + """VLM 错误""" + pass + + +# 工厂函数 +def create_vlm_client(config: dict) -> BaseVLM: + """创建 VLM 客户端""" + model_type = config.get("model_type", "qwen-vl2").lower() + + if model_type == "qwen-vl2": + return QwenVL2Client( + api_base=config.get("api_base", "http://localhost:8000/v1"), + api_key=config.get("api_key", ""), + model_name=config.get("model_name", "Qwen-VL2"), + max_tokens=config.get("max_tokens", 2048), + temperature=config.get("temperature", 0.7) + ) + elif model_type == "gpt-4v": + # GPT-4V 客户端(待实现) + raise NotImplementedError("GPT-4V 客户端待实现") + else: + raise ValueError(f"不支持的 VLM 类型: {model_type}") diff --git a/src/wechat/controller.py b/src/wechat/controller.py new file mode 100644 index 0000000..7c3d194 --- /dev/null +++ b/src/wechat/controller.py @@ -0,0 +1,287 @@ +""" +微信客户端控制器 +WeChat Windows Client Controller using UIAutomation +""" + +import time +import logging +from dataclasses import dataclass +from typing import List, Optional, Tuple +from pathlib import Path + +logger = logging.getLogger(__name__) + +# 尝试导入 pywinauto +try: + from pywinauto import Application, WindowSpecification + from pywinauto.win32structures import RECT + from pywinauto.controls.hwndwrapper import HwndWrapper + HAS_PYWINAUTO = True +except ImportError: + HAS_PYWINAUTO = False + logger.warning("pywinauto 未安装,请运行: pip install pywinauto") + + +@dataclass +class ChatMessage: + """聊天消息""" + sender: str + content: str + time: str + is_self: bool + + +@dataclass +class WeChatWindow: + """微信窗口信息""" + hwnd: int + title: str + rect: RECT + isMinimized: bool + + +class WeChatController: + """微信客户端控制器""" + + # 微信窗口类名 + WEIXIN_WINDOW_CLASS = "WeChatLoginWnd" # 登录窗口 + WEIXIN_MAIN_WINDOW_CLASS = "WeChatMainWnd" # 主窗口 + + def __init__(self, window_title: str = "微信"): + if not HAS_PYWINAUTO: + raise RuntimeError("pywinauto 未安装,无法控制微信客户端") + + self.window_title = window_title + self.app: Optional[Application] = None + self.main_window: Optional[WindowSpecification] = None + self._connected = False + + def connect(self, timeout: float = 10) -> bool: + """连接到微信窗口 + + Args: + timeout: 超时时间(秒) + + Returns: + 是否连接成功 + """ + try: + # 尝试连接已运行的微信 + self.app = Application(backend="win32").connect( + title=self.window_title, + timeout=timeout + ) + self.main_window = self.app.window(title=self.window_title) + self._connected = True + logger.info("成功连接到微信窗口") + return True + except Exception as e: + logger.error(f"连接微信窗口失败: {e}") + return False + + def find_wechat_window(self) -> Optional[WeChatWindow]: + """查找微信窗口""" + import ctypes + from ctypes import wintypes + + user32 = ctypes.windll.user32 + + @ctypes.WINFUNCTYPE(wintypes.BOOL, wintypes.HWND, wintypes.LPARAM) + def enum_callback(hwnd, lparam): + length = user32.GetWindowTextLengthW(hwnd) + if length > 0: + buff = ctypes.create_unicode_buffer(length + 1) + user32.GetWindowTextW(hwnd, buff, length + 1) + title = buff.value + if self.window_title in title: + rect = RECT() + user32.GetWindowRect(hwnd, ctypes.byref(rect)) + is_min = user32.IsIconic(hwnd) + wechat_win = WeChatWindow( + hwnd=hwnd, + title=title, + rect=rect, + isMinimized=bool(is_min) + ) + # 存储到列表 + windows.append(wechat_win) + return True + + windows = [] + user32.EnumWindows(enum_callback, 0) + + if windows: + logger.info(f"找到 {len(windows)} 个微信窗口") + return windows[0] + return None + + def screenshot(self, output_path: str = None) -> str: + """截图 + + Args: + output_path: 保存路径,为 None 则保存到临时文件 + + Returns: + 截图路径 + """ + if not self.main_window: + raise RuntimeError("未连接微信窗口") + + if output_path is None: + import tempfile + output_path = Path(tempfile.gettempdir()) / f"wechat_screenshot_{int(time.time())}.png" + else: + output_path = Path(output_path) + + try: + # 激活窗口(如果最小化) + if self.main_window.is_minimized(): + self.main_window.restore() + + # 截图 + self.main_window.capture_as_image().save(str(output_path)) + logger.debug(f"截图已保存: {output_path}") + return str(output_path) + except Exception as e: + logger.error(f"截图失败: {e}") + raise + + def get_chat_list(self) -> List[str]: + """获取聊天列表""" + # 聊天列表在左侧,需要根据具体窗口结构调整 + # 这里需要根据实际 UI 结构调整 + try: + # 查找子窗口 + chat_list = self.main_window.window( + class_name="ChatListBox" # 假设的类名 + ) + items = chat_list.items() + return [item.text() for item in items] + except Exception as e: + logger.warning(f"获取聊天列表失败: {e}") + return [] + + def click_on_chat(self, chat_name: str) -> bool: + """点击聊天""" + try: + # 查找聊天项 + chat_list = self.main_window.window(class_name="ChatListBox") + chat_item = chat_list.window(title=chat_name) + chat_item.click() + logger.info(f"点击聊天: {chat_name}") + return True + except Exception as e: + logger.error(f"点击聊天失败: {e}") + return False + + def get_message_list(self, count: int = 10) -> List[ChatMessage]: + """获取消息列表 + + Args: + count: 最多获取的消息数 + + Returns: + 消息列表 + """ + messages = [] + try: + # 查找消息列表窗口 + msg_list = self.main_window.window(class_name="ChatMessageList") + + # 获取消息项 + items = msg_list.items()[-count:] # 取最后 N 条 + for item in items: + text = item.text() + # 解析消息格式 + # 格式: [发送者] 时间\n内容 + msg = self._parse_message(text) + if msg: + messages.append(msg) + + except Exception as e: + logger.warning(f"获取消息列表失败: {e}") + + return messages + + def _parse_message(self, text: str) -> Optional[ChatMessage]: + """解析消息文本""" + import re + + # 简单解析 + # 格式: 发送者 时间\n内容 + pattern = r"(.+?)\s+(\d{2}:\d{2})\n([\s\S]+)" + match = re.match(pattern, text) + if match: + sender = match.group(1).strip() + time_str = match.group(2).strip() + content = match.group(3).strip() + is_self = sender == "我" + return ChatMessage(sender=sender, content=content, time=time_str, is_self=is_self) + return None + + def send_text(self, text: str) -> bool: + """发送文本消息 + + Args: + text: 要发送的文本 + + Returns: + 是否发送成功 + """ + try: + # 查找输入框 + input_box = self.main_window.window(class_name="Edit") + + # 清空并输入 + input_box.set_edit_text("") + input_box.type_keys("^a") # 全选 + input_box.type_keys("{DELETE}") + input_box.set_edit_text(text) + + # 按回车发送 + input_box.type_keys("{ENTER}") + + logger.info(f"发送消息: {text[:20]}...") + return True + except Exception as e: + logger.error(f"发送消息失败: {e}") + return False + + def find_button(self, name: str) -> Optional[HwndWrapper]: + """查找按钮""" + try: + btn = self.main_window.window(title=name, class_name="Button") + return btn + except Exception: + return None + + def click_button(self, name: str) -> bool: + """点击按钮""" + try: + btn = self.find_button(name) + if btn: + btn.click() + return True + return False + except Exception as e: + logger.error(f"点击按钮失败: {e}") + return False + + def is_connected(self) -> bool: + """检查是否已连接""" + return self._connected and self.app is not None + + def disconnect(self): + """断开连接""" + if self.app: + self.app = None + self.main_window = None + self._connected = False + logger.info("已断开微信连接") + + def __enter__(self): + self.connect() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.disconnect()