初始化 WeChat Agent 项目

MVP Phase 1 核心模块:
- src/config/settings.py     - 配置管理
- src/vlm/qwen_vl.py        - Qwen-VL2 视觉模型接口
- src/wechat/controller.py  - 微信客户端控制器(UIAutomation)
- src/core/engine.py        - 核心引擎(轮询、消息处理、回复)
- src/main.py               - 主入口

文档:
- README.md
- config.example.yaml
- requirements.txt

技术方案:纯视觉AI + UIAutomation
- 截图 → Qwen-VL2 识别 → AI 判断 → UIAutomation 操作
This commit is contained in:
2026-04-13 11:12:49 +08:00
commit eb19d8d05f
8 changed files with 1408 additions and 0 deletions

96
README.md Normal file
View File

@@ -0,0 +1,96 @@
# WeChat Agent - Windows 微信客户端 AI 自动化
基于视觉 AI 的 Windows 微信客户端自动化系统,实现消息识别与自动回复。
## 项目概述
本项目实现对 Windows 微信客户端的 AI 自动化控制:
- **信息获取**:通过视觉识别获取聊天记录、用户信息、消息内容
- **自动回复**:基于规则/AI 判断自动发送消息
- **纯视觉方案**:不依赖微信 API通过屏幕截图 + VLM 识别 + 自动操作
## 技术架构
```
微信Windows客户端截图
UIAutomation + 屏幕截图
Qwen-VL2 视觉理解
LLM 推理判断
UIAutomation 执行操作
微信客户端(发送)
```
## 核心模块
| 模块 | 说明 |
|-----|------|
| `vlm` | 视觉模型接口Qwen-VL2 |
| `wechat` | 微信客户端控制UIAutomation |
| `core` | 核心引擎(消息捕获、回复判断) |
| `agent` | AI Agent 逻辑 |
| `ui` | 桌面 UI 界面 |
| `config` | 配置管理 |
## 技术栈
- **语言**: Python 3.10+
- **视觉模型**: Qwen-VL2本地部署
- **Windows 控制**: UIAutomation (PyWinAuto)
- **LLM**: OpenAI 兼容 API
- **桌面 UI**: PyQt6 / Tkinter
## 快速开始
### 环境要求
- Windows 10/11
- Python 3.10+
- 微信 Windows 客户端 3.8.x推荐
- Qwen-VL2 模型(本地部署)
### 安装
```bash
pip install -r requirements.txt
```
### 配置
```bash
cp config.example.yaml config.yaml
# 编辑 config.yaml 填入 API 配置
```
### 运行
```bash
python src/main.py
```
## MVP 功能
### Phase 1本期
- [ ] 微信窗口识别
- [ ] 聊天记录截图识别
- [ ] 用户信息识别
- [ ] 关键词自动回复
- [ ] 定时轮询机制
### Phase 2后续
- [ ] 知识库集成
- [ ] 多账号管理
- [ ] 复杂对话上下文
## 参考项目
- [thiflow-research](http://192.168.5.5:3000/jesxion/thiflow-research) - Thiflow 产品研究
- [thiflow.com](https://thiflow.com/) - 参考产品
## License
MIT

46
config.example.yaml Normal file
View File

@@ -0,0 +1,46 @@
# WeChat Agent 配置文件示例
vlm:
model_type: qwen-vl2 # qwen-vl2 / gpt-4v
api_base: http://localhost:8000/v1 # VLM API 地址
api_key: "" # VLM API Key如果需要
model_name: Qwen-VL2 # 模型名称
max_tokens: 2048
temperature: 0.7
llm:
api_base: https://api.openai.com/v1 # LLM API 地址
api_key: your-api-key-here # OpenAI API Key
model_name: gpt-4o # 模型名称
max_tokens: 2048
temperature: 0.7
wechat:
client_version: "3.8.x" # 推荐微信版本
poll_interval: 2.0 # 轮询间隔(秒)
screenshot_interval: 1.0 # 截图间隔(秒)
window_title: "微信" # 微信窗口标题
# 回复规则
rules:
# 关键词回复示例
- keywords:
- 你好
- hi
- hello
reply_type: keyword
reply_content: "您好,有什么可以帮您的?"
enabled: true
# AI 回复示例(无匹配关键词时)
- keywords: []
reply_type: AI
reply_content: ""
enabled: true
# 知识库(可选,后续接入)
knowledge_base:
url: http://192.168.5.5:1933
# 日志级别
log_level: INFO

26
requirements.txt Normal file
View File

@@ -0,0 +1,26 @@
# WeChat Agent 依赖
# Windows UI 自动化
pywinauto>=0.6.8
pywin32>=305
# HTTP 请求
requests>=2.28.0
urllib3>=1.26.0
# 数据处理
pyyaml>=6.0
pillow>=9.0.0
# 异步(可选)
# asyncio
# aiohttp>=3.8.0
# 日志
# coloredlogs>=15.0 # 可选
# 测试
pytest>=7.0.0
# 类型检查
# mypy>=0.950

185
src/config/settings.py Normal file
View File

@@ -0,0 +1,185 @@
"""
配置管理模块
Configuration management module
"""
import os
import yaml
from dataclasses import dataclass, field
from typing import List, Optional
from pathlib import Path
@dataclass
class VLMSettings:
"""视觉模型配置"""
model_type: str = "qwen-vl2" # qwen-vl2 / GPT-4V
api_base: str = "http://localhost:8000/v1"
api_key: str = ""
model_name: str = "Qwen-VL2"
max_tokens: int = 2048
temperature: float = 0.7
@dataclass
class LLMSettings:
"""LLM 配置"""
api_base: str = "https://api.openai.com/v1"
api_key: str = ""
model_name: str = "gpt-4o"
max_tokens: int = 2048
temperature: float = 0.7
@dataclass
class WeChatSettings:
"""微信客户端配置"""
client_version: str = "3.8.x" # 推荐版本
poll_interval: float = 2.0 # 轮询间隔(秒)
screenshot_interval: float = 1.0 # 截图间隔(秒)
window_title: str = "微信" # 窗口标题
@dataclass
class ReplyRule:
"""回复规则"""
keywords: List[str] = field(default_factory=list)
reply_type: str = "keyword" # keyword / AI
reply_content: str = ""
enabled: bool = True
@dataclass
class Config:
"""全局配置"""
vlm: VLMSettings = field(default_factory=VLMSettings)
llm: LLMSettings = field(default_factory=LLMSettings)
wechat: WeChatSettings = field(default_factory=WeChatSettings)
rules: List[ReplyRule] = field(default_factory=list)
knowledge_base_url: Optional[str] = None
log_level: str = "INFO"
class ConfigManager:
"""配置管理器"""
_instance = None
_config: Optional[Config] = None
def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
@classmethod
def load(cls, config_path: str = None) -> Config:
"""加载配置"""
if cls._config is not None:
return cls._config
if config_path is None:
config_path = os.environ.get(
"WECHAT_AGENT_CONFIG",
str(Path(__file__).parent.parent.parent / "config.yaml")
)
if os.path.exists(config_path):
with open(config_path, "r", encoding="utf-8") as f:
data = yaml.safe_load(f)
cls._config = cls._parse_config(data)
else:
cls._config = Config()
return cls._config
@classmethod
def _parse_config(cls, data: dict) -> Config:
"""解析配置数据"""
config = Config()
if "vlm" in data:
for key, value in data["vlm"].items():
if hasattr(config.vlm, key):
setattr(config.vlm, key, value)
if "llm" in data:
for key, value in data["llm"].items():
if hasattr(config.llm, key):
setattr(config.llm, key, value)
if "wechat" in data:
for key, value in data["wechat"].items():
if hasattr(config.wechat, key):
setattr(config.wechat, key, value)
if "rules" in data:
for rule_data in data["rules"]:
rule = ReplyRule(
keywords=rule_data.get("keywords", []),
reply_type=rule_data.get("reply_type", "keyword"),
reply_content=rule_data.get("reply_content", ""),
enabled=rule_data.get("enabled", True)
)
config.rules.append(rule)
if "knowledge_base" in data:
config.knowledge_base_url = data["knowledge_base"].get("url")
config.log_level = data.get("log_level", "INFO")
return config
@classmethod
def get_config(cls) -> Config:
"""获取当前配置"""
if cls._config is None:
return cls.load()
return cls._config
@classmethod
def save_example(cls, path: str):
"""保存配置示例"""
example = {
"vlm": {
"model_type": "qwen-vl2",
"api_base": "http://localhost:8000/v1",
"api_key": "",
"model_name": "Qwen-VL2",
"max_tokens": 2048,
"temperature": 0.7
},
"llm": {
"api_base": "https://api.openai.com/v1",
"api_key": "your-api-key",
"model_name": "gpt-4o",
"max_tokens": 2048,
"temperature": 0.7
},
"wechat": {
"client_version": "3.8.x",
"poll_interval": 2.0,
"screenshot_interval": 1.0,
"window_title": "微信"
},
"rules": [
{
"keywords": ["你好", "hi", "hello"],
"reply_type": "keyword",
"reply_content": "您好,有什么可以帮您的?",
"enabled": True
},
{
"keywords": [],
"reply_type": "AI",
"reply_content": "",
"enabled": True
}
],
"knowledge_base": {
"url": "http://192.168.5.5:1933"
},
"log_level": "INFO"
}
with open(path, "w", encoding="utf-8") as f:
yaml.dump(example, f, allow_unicode=True, default_flow_style=False)

353
src/core/engine.py Normal file
View File

@@ -0,0 +1,353 @@
"""
核心引擎
WeChat Agent Core Engine
"""
import time
import logging
import threading
from dataclasses import dataclass, field
from typing import List, Optional, Callable, Dict, Any
from enum import Enum
from queue import Queue
logger = logging.getLogger(__name__)
class AgentState(Enum):
"""Agent 状态"""
IDLE = "idle"
RUNNING = "running"
PAUSED = "paused"
ERROR = "error"
@dataclass
class ChatSnapshot:
"""聊天快照"""
timestamp: float
chat_name: str
messages: List[Dict[str, Any]]
screenshot_path: str
has_new: bool = False
@dataclass
class ReplyResult:
"""回复结果"""
success: bool
content: str
reason: str = ""
class MessageProcessor:
"""消息处理器"""
def __init__(self, vlm_client, llm_client, config):
self.vlm_client = vlm_client
self.llm_client = llm_client
self.config = config
self._rules = config.rules
def should_reply(self, chat_snapshot: ChatSnapshot) -> bool:
"""判断是否需要回复"""
if not chat_snapshot.has_new:
return False
# 检查是否是自己的消息
messages = chat_snapshot.messages
if not messages:
return False
last_msg = messages[-1]
if last_msg.get("is_self"):
return False
return True
def generate_reply(self, chat_snapshot: ChatSnapshot) -> str:
"""生成回复内容"""
# 先检查关键词规则
last_content = chat_snapshot.messages[-1].get("content", "")
for rule in self._rules:
if not rule.enabled:
continue
if rule.reply_type == "keyword":
# 关键词匹配
for keyword in rule.keywords:
if keyword in last_content:
logger.info(f"关键词匹配: {keyword}")
return rule.reply_content
elif rule.reply_type == "AI":
# AI 生成回复
return self._ai_generate_reply(chat_snapshot)
return ""
def _ai_generate_reply(self, chat_snapshot: ChatSnapshot) -> str:
"""AI 生成回复"""
try:
# 构造 prompt
prompt = f"""当前聊天: {chat_snapshot.chat_name}
历史消息:
"""
for msg in chat_snapshot.messages[-10:]: # 最近10条
sender = "" if msg.get("is_self") else "对方"
prompt += f"- [{sender}] {msg.get('content', '')}\n"
prompt += """
请生成一条合适的回复,只返回回复内容,不要其他文字。"""
# 调用 LLM
response = self.llm_client.chat([
{"role": "user", "content": prompt}
])
return response.get("text", "")
except Exception as e:
logger.error(f"AI 生成回复失败: {e}")
return ""
def match_keyword_rule(self, content: str) -> Optional[str]:
"""匹配关键词规则"""
for rule in self._rules:
if not rule.enabled or rule.reply_type != "keyword":
continue
for keyword in rule.keywords:
if keyword in content:
return rule.reply_content
return None
class WeChatAgent:
"""微信 Agent"""
def __init__(
self,
wechat_controller,
vlm_client,
llm_client,
config,
message_queue: Queue = None
):
self.wechat = wechat_controller
self.vlm = vlm_client
self.llm = llm_client
self.config = config
self.processor = MessageProcessor(vlm_client, llm_client, config)
self._state = AgentState.IDLE
self._thread: Optional[threading.Thread] = None
self._stop_event = threading.Event()
self._pause_event = threading.Event()
self._message_queue = message_queue or Queue()
self._callbacks: Dict[str, List[Callable]] = {
"on_message": [], # 收到新消息
"on_reply": [], # 发送回复
"on_error": [], # 发生错误
"on_state_change": [], # 状态变化
}
self._last_processed_time: Dict[str, float] = {} # 记录每个聊天的处理时间
@property
def state(self) -> AgentState:
"""获取状态"""
return self._state
def start(self):
"""启动 Agent"""
if self._state == AgentState.RUNNING:
logger.warning("Agent 已经在运行中")
return
self._stop_event.clear()
self._pause_event.clear()
self._state = AgentState.RUNNING
self._thread = threading.Thread(target=self._run_loop, daemon=True)
self._thread.start()
self._emit("on_state_change", self._state)
logger.info("Agent 已启动")
def stop(self):
"""停止 Agent"""
self._stop_event.set()
if self._thread and self._thread.is_alive():
self._thread.join(timeout=5)
self._state = AgentState.IDLE
self._emit("on_state_change", self._state)
logger.info("Agent 已停止")
def pause(self):
"""暂停 Agent"""
self._pause_event.set()
self._state = AgentState.PAUSED
self._emit("on_state_change", self._state)
logger.info("Agent 已暂停")
def resume(self):
"""恢复 Agent"""
self._pause_event.clear()
self._state = AgentState.RUNNING
self._emit("on_state_change", self._state)
logger.info("Agent 已恢复")
def _run_loop(self):
"""主循环"""
poll_interval = self.config.wechat.poll_interval
while not self._stop_event.is_set():
try:
# 检查暂停
if self._pause_event.is_set():
time.sleep(0.5)
continue
# 检查连接
if not self.wechat.is_connected():
logger.warning("微信未连接,尝试重连...")
if not self.wechat.connect():
time.sleep(poll_interval)
continue
# 执行一次轮询
self._poll_once()
# 等待
time.sleep(poll_interval)
except Exception as e:
logger.error(f"轮询异常: {e}")
self._state = AgentState.ERROR
self._emit("on_error", str(e))
time.sleep(poll_interval)
def _poll_once(self):
"""执行一次轮询"""
try:
# 1. 截图
screenshot_path = self.wechat.screenshot()
# 2. VLM 分析截图
chat_info = self.vlm.analyze_chat_screenshot(screenshot_path)
# 3. 检查是否有新消息
has_new = chat_info.get("has_new_message", False)
chat_name = chat_info.get("current_chat", "")
messages = chat_info.get("messages", [])
# 防重复处理(同一聊天 5 秒内不重复处理)
current_time = time.time()
chat_key = f"{chat_name}_{hash(str(messages[-1:]))}"
if chat_key in self._last_processed_time:
if current_time - self._last_processed_time[chat_key] < 5:
return
if has_new or messages:
self._last_processed_time[chat_key] = current_time
# 创建快照
snapshot = ChatSnapshot(
timestamp=current_time,
chat_name=chat_name,
messages=messages,
screenshot_path=screenshot_path,
has_new=has_new
)
# 触发消息回调
self._emit("on_message", snapshot)
# 判断是否需要回复
if self.processor.should_reply(snapshot):
reply = self.processor.generate_reply(snapshot)
if reply:
result = self.send_reply(reply)
self._emit("on_reply", result)
except Exception as e:
logger.error(f"轮询处理异常: {e}")
raise
def send_reply(self, text: str) -> ReplyResult:
"""发送回复"""
try:
success = self.wechat.send_text(text)
return ReplyResult(
success=success,
content=text,
reason="发送成功" if success else "发送失败"
)
except Exception as e:
return ReplyResult(
success=False,
content=text,
reason=str(e)
)
def on(self, event: str, callback: Callable):
"""注册事件回调"""
if event in self._callbacks:
self._callbacks[event].append(callback)
def _emit(self, event: str, *args):
"""触发事件"""
if event in self._callbacks:
for callback in self._callbacks[event]:
try:
callback(*args)
except Exception as e:
logger.error(f"回调执行异常: {e}")
def get_status(self) -> Dict[str, Any]:
"""获取状态信息"""
return {
"state": self._state.value,
"connected": self.wechat.is_connected(),
"poll_interval": self.config.wechat.poll_interval,
"rules_count": len([r for r in self.config.rules if r.enabled])
}
class MockWeChatController:
"""模拟微信控制器(用于测试)"""
def __init__(self):
self._connected = True
self._messages = [
{"sender": "张三", "content": "你好", "time": "10:30", "is_self": False},
{"sender": "张三", "content": "这个产品怎么卖?", "time": "10:31", "is_self": False},
]
def connect(self, timeout: float = 10) -> bool:
return True
def is_connected(self) -> bool:
return self._connected
def screenshot(self, output_path: str = None) -> str:
import tempfile
from pathlib import Path
path = Path(tempfile.gettempdir()) / "mock_screenshot.png"
# 创建空白图片
from PIL import Image
img = Image.new("RGB", (800, 600), color="white")
img.save(str(path))
return str(path)
def send_text(self, text: str) -> bool:
self._messages.append({"sender": "", "content": text, "time": "10:32", "is_self": True})
return True
def get_message_list(self, count: int = 10) -> List:
return self._messages[-count:]
def disconnect(self):
self._connected = False

160
src/main.py Normal file
View File

@@ -0,0 +1,160 @@
"""
WeChat Agent 主入口
"""
import sys
import logging
import argparse
from pathlib import Path
# 添加 src 目录到路径
sys.path.insert(0, str(Path(__file__).parent.parent))
from src.config.settings import ConfigManager
from src.vlm.qwen_vl import create_vlm_client
from src.wechat.controller import WeChatController
from src.core.engine import WeChatAgent, MockWeChatController
def setup_logging(level: str = "INFO"):
"""设置日志"""
logging.basicConfig(
level=getattr(logging, level.upper()),
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
def main():
parser = argparse.ArgumentParser(description="WeChat Agent - 微信客户端AI自动化")
parser.add_argument("--config", "-c", default="config.yaml", help="配置文件路径")
parser.add_argument("--log-level", "-l", default="INFO",
choices=["DEBUG", "INFO", "WARNING", "ERROR"],
help="日志级别")
parser.add_argument("--mock", action="store_true", help="使用模拟控制器(不连接微信)")
parser.add_argument("--demo", action="store_true", help="演示模式")
args = parser.parse_args()
# 设置日志
setup_logging(args.log_level)
logger = logging.getLogger(__name__)
# 加载配置
config_path = Path(args.config)
if not config_path.exists():
logger.info("配置文件不存在,创建示例配置...")
config_path.parent.mkdir(parents=True, exist_ok=True)
ConfigManager.save_example(str(config_path))
logger.info(f"示例配置已创建: {config_path}")
logger.info("请编辑配置文件后重新运行")
return
config = ConfigManager.load(str(config_path))
logger.info(f"配置加载成功: {config_path}")
# 创建 VLM 客户端
try:
vlm_client = create_vlm_client({
"model_type": config.vlm.model_type,
"api_base": config.vlm.api_base,
"api_key": config.vlm.api_key,
"model_name": config.vlm.model_name,
"max_tokens": config.vlm.max_tokens,
"temperature": config.vlm.temperature,
})
logger.info("VLM 客户端创建成功")
except Exception as e:
logger.error(f"VLM 客户端创建失败: {e}")
return
# 创建 LLM 客户端(简化版,后续实现)
class SimpleLLMClient:
def __init__(self, config):
self.config = config
def chat(self, messages):
# 实际调用需要对接 OpenAI 兼容 API
return {"text": "测试回复"}
llm_client = SimpleLLMClient(config.llm)
# 创建微信控制器
if args.mock:
logger.info("使用模拟控制器(--mock")
wechat = MockWeChatController()
else:
try:
wechat = WeChatController(window_title=config.wechat.window_title)
if not wechat.connect():
logger.error("无法连接到微信客户端,请确保微信已启动")
return
logger.info("微信控制器创建成功")
except RuntimeError as e:
logger.error(f"微信控制器创建失败: {e}")
return
# 创建 Agent
agent = WeChatAgent(
wechat_controller=wechat,
vlm_client=vlm_client,
llm_client=llm_client,
config=config
)
# 注册回调
def on_message(snapshot):
logger.info(f"收到消息 [{snapshot.chat_name}]: {snapshot.messages[-1] if snapshot.messages else 'N/A'}")
def on_reply(result):
if result.success:
logger.info(f"回复成功: {result.content[:30]}...")
else:
logger.warning(f"回复失败: {result.reason}")
def on_state_change(state):
logger.info(f"Agent 状态变化: {state.value}")
agent.on("on_message", on_message)
agent.on("on_reply", on_reply)
agent.on("on_state_change", on_state_change)
# 演示模式
if args.demo:
logger.info("演示模式 - 模拟一次处理")
from src.core.engine import ChatSnapshot
import time
snapshot = ChatSnapshot(
timestamp=time.time(),
chat_name="测试聊天",
messages=[
{"sender": "张三", "content": "你好", "time": "10:30", "is_self": False},
],
screenshot_path="",
has_new=True
)
if agent.processor.should_reply(snapshot):
reply = agent.processor.generate_reply(snapshot)
logger.info(f"生成回复: {reply}")
else:
logger.info("不需要回复")
return
# 启动 Agent
logger.info("启动 WeChat Agent...")
agent.start()
try:
# 保持运行
import time
while True:
time.sleep(1)
except KeyboardInterrupt:
logger.info("收到中断信号,停止 Agent...")
agent.stop()
wechat.disconnect()
if __name__ == "__main__":
main()

255
src/vlm/qwen_vl.py Normal file
View File

@@ -0,0 +1,255 @@
"""
视觉模型接口
Vision Language Model interface for Qwen-VL2
"""
import base64
import json
import logging
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import List, Optional, Dict, Any
from pathlib import Path
import requests
logger = logging.getLogger(__name__)
@dataclass
class VLMMessages:
"""VLM 消息结构"""
role: str # user / assistant
content: str # 文本或 image URL
@dataclass
class VLMResponse:
"""VLM 响应"""
text: str
raw: dict
class BaseVLM(ABC):
"""视觉模型基类"""
@abstractmethod
def chat(self, messages: List[VLMMessages], **kwargs) -> VLMResponse:
"""发送对话请求"""
pass
@abstractmethod
def analyze_image(self, image_path: str, prompt: str, **kwargs) -> str:
"""分析图片"""
pass
class QwenVL2Client(BaseVLM):
"""Qwen-VL2 客户端"""
SYSTEM_PROMPT = """你是一个专业的 Windows 微信客户端 UI 识别助手。
你的任务是根据截图准确识别微信界面中的元素。
请识别以下信息:
1. 当前界面类型(聊天窗口/通讯录/设置等)
2. 聊天消息内容(发送者、接收者、消息内容、时间)
3. 用户信息(昵称、备注、微信号)
4. 界面状态(是否有新消息、是否有未读等)
注意:
- 消息格式:[发送者] 时间
- 消息内容要完整准确
- 如果是图片/语音/文件,简略标注类型即可"""
def __init__(
self,
api_base: str = "http://localhost:8000/v1",
api_key: str = "",
model_name: str = "Qwen-VL2",
max_tokens: int = 2048,
temperature: float = 0.7
):
self.api_base = api_base.rstrip("/")
self.api_key = api_key
self.model_name = model_name
self.max_tokens = max_tokens
self.temperature = temperature
def _encode_image(self, image_path: str) -> str:
"""将图片编码为 base64"""
with open(image_path, "rb") as f:
return base64.b64encode(f.read()).decode("utf-8")
def chat(self, messages: List[VLMMessages], **kwargs) -> VLMResponse:
"""发送对话请求"""
# 构造消息格式
formatted_messages = []
for msg in messages:
if isinstance(msg.content, str):
formatted_messages.append({
"role": msg.role,
"content": msg.content
})
elif isinstance(msg.content, list):
# 多模态消息
formatted_messages.append({
"role": msg.role,
"content": msg.content
})
# 添加系统提示
if not any(m.role == "system" for m in messages):
formatted_messages.insert(0, {
"role": "system",
"content": self.SYSTEM_PROMPT
})
payload = {
"model": self.model_name,
"messages": formatted_messages,
"max_tokens": kwargs.get("max_tokens", self.max_tokens),
"temperature": kwargs.get("temperature", self.temperature),
}
headers = {
"Content-Type": "application/json"
}
if self.api_key:
headers["Authorization"] = f"Bearer {self.api_key}"
try:
resp = requests.post(
f"{self.api_base}/chat/completions",
headers=headers,
json=payload,
timeout=60
)
resp.raise_for_status()
data = resp.json()
return VLMResponse(
text=data["choices"][0]["message"]["content"],
raw=data
)
except requests.exceptions.RequestException as e:
logger.error(f"VLM 请求失败: {e}")
raise VLMError(f"VLM 请求失败: {e}")
def analyze_image(self, image_path: str, prompt: str, **kwargs) -> str:
"""分析图片"""
if not Path(image_path).exists():
raise VLMError(f"图片不存在: {image_path}")
# 构造多模态消息
image_data = self._encode_image(image_path)
content = [
{
"type": "image_url",
"image_url": {
"url": f"data:image/jpeg;base64,{image_data}"
}
},
{
"type": "text",
"text": prompt
}
]
messages = [VLMMessages(role="user", content=content)]
response = self.chat(messages, **kwargs)
return response.text
def analyze_chat_screenshot(self, screenshot_path: str) -> Dict[str, Any]:
"""分析聊天窗口截图
Returns:
解析后的聊天信息,包含:
- messages: 消息列表
- current_chat: 当前聊天对象
- has_new_message: 是否有新消息
"""
prompt = """请分析这个微信聊天截图,返回 JSON 格式:
{
"current_chat": "当前聊天对象名称",
"has_new_message": true/false,
"messages": [
{
"sender": "发送者",
"content": "消息内容",
"time": "时间",
"is_self": true/false
}
]
}
只返回 JSON不要其他内容。"""
result = self.analyze_image(screenshot_path, prompt)
# 尝试解析 JSON
try:
# 提取 JSON
start = result.find("{")
end = result.rfind("}") + 1
if start >= 0 and end > start:
json_str = result[start:end]
return json.loads(json_str)
else:
return {"raw": result}
except json.JSONDecodeError:
return {"raw": result}
def detect_ui_elements(self, screenshot_path: str) -> Dict[str, Any]:
"""检测 UI 元素位置
Returns:
UI 元素字典,包含类型和位置
"""
prompt = """请分析这个微信界面截图,标注关键 UI 元素的位置:
{
"elements": [
{
"type": "button/input/chat_list/...",
"name": "元素名称",
"bounds": {"x": 0, "y": 0, "width": 100, "height": 50}
}
]
}
只返回 JSON。"""
result = self.analyze_image(screenshot_path, prompt)
try:
start = result.find("{")
end = result.rfind("}") + 1
if start >= 0 and end > start:
return json.loads(result[start:end])
return {"raw": result}
except json.JSONDecodeError:
return {"raw": result}
class VLMError(Exception):
"""VLM 错误"""
pass
# 工厂函数
def create_vlm_client(config: dict) -> BaseVLM:
"""创建 VLM 客户端"""
model_type = config.get("model_type", "qwen-vl2").lower()
if model_type == "qwen-vl2":
return QwenVL2Client(
api_base=config.get("api_base", "http://localhost:8000/v1"),
api_key=config.get("api_key", ""),
model_name=config.get("model_name", "Qwen-VL2"),
max_tokens=config.get("max_tokens", 2048),
temperature=config.get("temperature", 0.7)
)
elif model_type == "gpt-4v":
# GPT-4V 客户端(待实现)
raise NotImplementedError("GPT-4V 客户端待实现")
else:
raise ValueError(f"不支持的 VLM 类型: {model_type}")

287
src/wechat/controller.py Normal file
View File

@@ -0,0 +1,287 @@
"""
微信客户端控制器
WeChat Windows Client Controller using UIAutomation
"""
import time
import logging
from dataclasses import dataclass
from typing import List, Optional, Tuple
from pathlib import Path
logger = logging.getLogger(__name__)
# 尝试导入 pywinauto
try:
from pywinauto import Application, WindowSpecification
from pywinauto.win32structures import RECT
from pywinauto.controls.hwndwrapper import HwndWrapper
HAS_PYWINAUTO = True
except ImportError:
HAS_PYWINAUTO = False
logger.warning("pywinauto 未安装,请运行: pip install pywinauto")
@dataclass
class ChatMessage:
"""聊天消息"""
sender: str
content: str
time: str
is_self: bool
@dataclass
class WeChatWindow:
"""微信窗口信息"""
hwnd: int
title: str
rect: RECT
isMinimized: bool
class WeChatController:
"""微信客户端控制器"""
# 微信窗口类名
WEIXIN_WINDOW_CLASS = "WeChatLoginWnd" # 登录窗口
WEIXIN_MAIN_WINDOW_CLASS = "WeChatMainWnd" # 主窗口
def __init__(self, window_title: str = "微信"):
if not HAS_PYWINAUTO:
raise RuntimeError("pywinauto 未安装,无法控制微信客户端")
self.window_title = window_title
self.app: Optional[Application] = None
self.main_window: Optional[WindowSpecification] = None
self._connected = False
def connect(self, timeout: float = 10) -> bool:
"""连接到微信窗口
Args:
timeout: 超时时间(秒)
Returns:
是否连接成功
"""
try:
# 尝试连接已运行的微信
self.app = Application(backend="win32").connect(
title=self.window_title,
timeout=timeout
)
self.main_window = self.app.window(title=self.window_title)
self._connected = True
logger.info("成功连接到微信窗口")
return True
except Exception as e:
logger.error(f"连接微信窗口失败: {e}")
return False
def find_wechat_window(self) -> Optional[WeChatWindow]:
"""查找微信窗口"""
import ctypes
from ctypes import wintypes
user32 = ctypes.windll.user32
@ctypes.WINFUNCTYPE(wintypes.BOOL, wintypes.HWND, wintypes.LPARAM)
def enum_callback(hwnd, lparam):
length = user32.GetWindowTextLengthW(hwnd)
if length > 0:
buff = ctypes.create_unicode_buffer(length + 1)
user32.GetWindowTextW(hwnd, buff, length + 1)
title = buff.value
if self.window_title in title:
rect = RECT()
user32.GetWindowRect(hwnd, ctypes.byref(rect))
is_min = user32.IsIconic(hwnd)
wechat_win = WeChatWindow(
hwnd=hwnd,
title=title,
rect=rect,
isMinimized=bool(is_min)
)
# 存储到列表
windows.append(wechat_win)
return True
windows = []
user32.EnumWindows(enum_callback, 0)
if windows:
logger.info(f"找到 {len(windows)} 个微信窗口")
return windows[0]
return None
def screenshot(self, output_path: str = None) -> str:
"""截图
Args:
output_path: 保存路径,为 None 则保存到临时文件
Returns:
截图路径
"""
if not self.main_window:
raise RuntimeError("未连接微信窗口")
if output_path is None:
import tempfile
output_path = Path(tempfile.gettempdir()) / f"wechat_screenshot_{int(time.time())}.png"
else:
output_path = Path(output_path)
try:
# 激活窗口(如果最小化)
if self.main_window.is_minimized():
self.main_window.restore()
# 截图
self.main_window.capture_as_image().save(str(output_path))
logger.debug(f"截图已保存: {output_path}")
return str(output_path)
except Exception as e:
logger.error(f"截图失败: {e}")
raise
def get_chat_list(self) -> List[str]:
"""获取聊天列表"""
# 聊天列表在左侧,需要根据具体窗口结构调整
# 这里需要根据实际 UI 结构调整
try:
# 查找子窗口
chat_list = self.main_window.window(
class_name="ChatListBox" # 假设的类名
)
items = chat_list.items()
return [item.text() for item in items]
except Exception as e:
logger.warning(f"获取聊天列表失败: {e}")
return []
def click_on_chat(self, chat_name: str) -> bool:
"""点击聊天"""
try:
# 查找聊天项
chat_list = self.main_window.window(class_name="ChatListBox")
chat_item = chat_list.window(title=chat_name)
chat_item.click()
logger.info(f"点击聊天: {chat_name}")
return True
except Exception as e:
logger.error(f"点击聊天失败: {e}")
return False
def get_message_list(self, count: int = 10) -> List[ChatMessage]:
"""获取消息列表
Args:
count: 最多获取的消息数
Returns:
消息列表
"""
messages = []
try:
# 查找消息列表窗口
msg_list = self.main_window.window(class_name="ChatMessageList")
# 获取消息项
items = msg_list.items()[-count:] # 取最后 N 条
for item in items:
text = item.text()
# 解析消息格式
# 格式: [发送者] 时间\n内容
msg = self._parse_message(text)
if msg:
messages.append(msg)
except Exception as e:
logger.warning(f"获取消息列表失败: {e}")
return messages
def _parse_message(self, text: str) -> Optional[ChatMessage]:
"""解析消息文本"""
import re
# 简单解析
# 格式: 发送者 时间\n内容
pattern = r"(.+?)\s+(\d{2}:\d{2})\n([\s\S]+)"
match = re.match(pattern, text)
if match:
sender = match.group(1).strip()
time_str = match.group(2).strip()
content = match.group(3).strip()
is_self = sender == ""
return ChatMessage(sender=sender, content=content, time=time_str, is_self=is_self)
return None
def send_text(self, text: str) -> bool:
"""发送文本消息
Args:
text: 要发送的文本
Returns:
是否发送成功
"""
try:
# 查找输入框
input_box = self.main_window.window(class_name="Edit")
# 清空并输入
input_box.set_edit_text("")
input_box.type_keys("^a") # 全选
input_box.type_keys("{DELETE}")
input_box.set_edit_text(text)
# 按回车发送
input_box.type_keys("{ENTER}")
logger.info(f"发送消息: {text[:20]}...")
return True
except Exception as e:
logger.error(f"发送消息失败: {e}")
return False
def find_button(self, name: str) -> Optional[HwndWrapper]:
"""查找按钮"""
try:
btn = self.main_window.window(title=name, class_name="Button")
return btn
except Exception:
return None
def click_button(self, name: str) -> bool:
"""点击按钮"""
try:
btn = self.find_button(name)
if btn:
btn.click()
return True
return False
except Exception as e:
logger.error(f"点击按钮失败: {e}")
return False
def is_connected(self) -> bool:
"""检查是否已连接"""
return self._connected and self.app is not None
def disconnect(self):
"""断开连接"""
if self.app:
self.app = None
self.main_window = None
self._connected = False
logger.info("已断开微信连接")
def __enter__(self):
self.connect()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.disconnect()