diff --git a/config.tasks.yaml b/config.tasks.yaml new file mode 100644 index 0000000..eb19f99 --- /dev/null +++ b/config.tasks.yaml @@ -0,0 +1,51 @@ +# ============================================ +# 自动化任务配置 +# ============================================ +# 支持定时任务,主动向指定联系人/群发送消息 +# +# 字段说明: +# name: 任务名称(唯一标识) +# target: 目标联系人/群名称(必须精确匹配聊天列表中的名称) +# schedule: Cron 表达式(分 时 日 月 周) +# message: 发送的消息内容,支持变量替换 +# enabled: 是否启用 +# once: 是否一次性任务(执行后自动禁用) +# +# 变量替换: +# {{date}} - 当前日期 (2024-01-15) +# {{time}} - 当前时间 (14:30) +# {{datetime}} - 当前日期时间 +# {{weekday}} - 星期几 +# {{weather}} - 天气(需要接入天气 API) +# ============================================ + +tasks: + # 示例:每天早安提醒 + # - name: "早安提醒" + # target: "尾巴~" + # schedule: "0 9 * * *" # 每天9:00 + # message: "早上好!今天{{date}},{{weekday}},天气晴朗~" + # enabled: false + # once: false + + # 示例:每周五提醒提交周报 + # - name: "周报提醒" + # target: "工作群" + # schedule: "0 18 * * 5" # 每周五18:00 + # message: "📋 提醒:今天记得提交周报~" + # enabled: false + + # 示例:每小时心跳检测 + # - name: "心跳检测" + # target: "文件传输助手" + # schedule: "0 */1 * * *" # 每小时整点 + # message: "[心跳检测] {{datetime}}" + # enabled: false + + # 示例:一次性任务 + # - name: "生日祝福" + # target: "张三" + # schedule: "0 9 15 6 *" # 6月15日9:00(仅执行一次) + # message: "生日快乐!🎂" + # enabled: false + # once: true diff --git a/requirements.txt b/requirements.txt index c189091..d28b25d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -13,6 +13,9 @@ openai>=1.0.0 # 自动化控制(备选) pyautogui>=0.9.54 + +# 任务调度 +APScheduler>=3.10.0 pyyaml>=6.0 pillow>=9.0.0 diff --git a/src/core/__pycache__/engine.cpython-311.pyc b/src/core/__pycache__/engine.cpython-311.pyc index 62456ea..fd1fabc 100644 Binary files a/src/core/__pycache__/engine.cpython-311.pyc and b/src/core/__pycache__/engine.cpython-311.pyc differ diff --git a/src/core/engine.py b/src/core/engine.py index c9c0653..7ddb675 100644 --- a/src/core/engine.py +++ b/src/core/engine.py @@ -12,6 +12,7 @@ from enum import Enum from queue import Queue from src.core.contact_manager import ContactManager +from src.core.scheduler import TaskScheduler, TaskResult logger = logging.getLogger(__name__) @@ -178,6 +179,9 @@ class WeChatAgent: self.processor = MessageProcessor(vlm_client, llm_client, config) self.contact_manager = ContactManager(config.contacts) + # 任务调度器 + self.scheduler = TaskScheduler() + self._state = AgentState.IDLE self._thread: Optional[threading.Thread] = None self._stop_event = threading.Event() @@ -189,6 +193,7 @@ class WeChatAgent: "on_reply": [], # 发送回复 "on_error": [], # 发生错误 "on_state_change": [], # 状态变化 + "on_task_result": [], # 任务执行结果 } self._last_processed_time: Dict[str, float] = {} # 记录每个聊天的处理时间 @@ -208,9 +213,13 @@ class WeChatAgent: self._pause_event.clear() self._state = AgentState.RUNNING + # 启动消息轮询线程 self._thread = threading.Thread(target=self._run_loop, daemon=True) self._thread.start() + # 启动任务调度器 + self._start_scheduler() + self._emit("on_state_change", self._state) logger.info("Agent 已启动") @@ -219,6 +228,10 @@ class WeChatAgent: self._stop_event.set() if self._thread and self._thread.is_alive(): self._thread.join(timeout=5) + + # 停止任务调度器 + self._stop_scheduler() + self._state = AgentState.IDLE self._emit("on_state_change", self._state) logger.info("Agent 已停止") @@ -237,6 +250,94 @@ class WeChatAgent: self._emit("on_state_change", self._state) logger.info("Agent 已恢复") + # ==================== 任务调度器 ==================== + + def _start_scheduler(self): + """启动任务调度器""" + try: + # 加载任务配置 + self.scheduler.load_tasks() + + # 注册任务结果回调 + self.scheduler.on_task_result(self._on_task_result) + + # 启动调度器 + self.scheduler.start() + + # 启动所有已启用的任务 + def send_callback(target: str, message: str) -> bool: + """发送消息的回调""" + # 切换到目标聊天 + if not self.wechat.switch_chat(target): + logger.error(f"切换聊天失败: {target}") + return False + time.sleep(0.5) # 等待切换完成 + # 发送消息 + return self.wechat.send_text(message) + + self.scheduler.start_tasks(send_callback) + logger.info("任务调度器已启动") + except Exception as e: + logger.error(f"启动任务调度器失败: {e}") + + def _stop_scheduler(self): + """停止任务调度器""" + try: + self.scheduler.stop() + logger.info("任务调度器已停止") + except Exception as e: + logger.error(f"停止任务调度器失败: {e}") + + def _on_task_result(self, result: TaskResult): + """任务执行结果回调""" + logger.info(f"任务完成: {result.task_name} -> {result.target}, 成功: {result.success}") + self._emit("on_task_result", { + "task_name": result.task_name, + "target": result.target, + "message": result.message, + "success": result.success, + "error": result.error, + }) + + def get_tasks(self) -> List[Dict]: + """获取所有任务""" + return self.scheduler.list_tasks() + + def enable_task(self, name: str) -> bool: + """启用任务""" + return self.scheduler.enable_task(name) + + def disable_task(self, name: str) -> bool: + """禁用任务""" + return self.scheduler.disable_task(name) + + def run_task_now(self, name: str) -> bool: + """立即执行任务""" + def send_callback(target: str, message: str) -> bool: + if not self.wechat.switch_chat(target): + return False + time.sleep(0.5) + return self.wechat.send_text(message) + + return self.scheduler.run_task_now(name, send_callback) + + def reload_tasks(self) -> int: + """重新加载任务""" + self.scheduler.stop() + self.scheduler.load_tasks() + + def send_callback(target: str, message: str) -> bool: + if not self.wechat.switch_chat(target): + return False + time.sleep(0.5) + return self.wechat.send_text(message) + + self.scheduler.start() + self.scheduler.start_tasks(send_callback) + return len(self.scheduler.tasks) + + # ==================== 消息处理 ==================== + def _run_loop(self): """主循环""" poll_interval = self.config.wechat.poll_interval diff --git a/src/core/scheduler.py b/src/core/scheduler.py new file mode 100644 index 0000000..fa5742a --- /dev/null +++ b/src/core/scheduler.py @@ -0,0 +1,266 @@ +""" +任务调度器 +支持 cron 表达式定时任务 +""" + +import logging +import threading +import time +from dataclasses import dataclass, field +from datetime import datetime +from typing import List, Optional, Callable, Dict + +import yaml +from apscheduler.schedulers.background import BackgroundScheduler +from apscheduler.triggers.cron import CronTrigger + +logger = logging.getLogger(__name__) + + +@dataclass +class Task: + """任务定义""" + name: str + target: str # 目标联系人/群 + schedule: str # Cron 表达式 + message: str + enabled: bool = False + once: bool = False # 是否一次性任务 + + def __post_init__(self): + self._last_run: Optional[datetime] = None + + def should_run(self) -> bool: + """检查任务是否应该执行""" + if not self.enabled: + return False + + # 一次性任务检查 + if self.once and self._last_run is not None: + return False + + return True + + +@dataclass +class TaskResult: + """任务执行结果""" + task_name: str + target: str + success: bool + message: str + error: Optional[str] = None + + +class TaskScheduler: + """任务调度器""" + + def __init__(self, config_path: str = "config.tasks.yaml"): + self.config_path = config_path + self.tasks: Dict[str, Task] = {} + self._scheduler = BackgroundScheduler(timezone="Asia/Shanghai") + self._lock = threading.Lock() + self._callbacks: List[Callable] = [] + + # 变量替换函数 + self._var_funcs = { + "date": lambda: datetime.now().strftime("%Y-%m-%d"), + "time": lambda: datetime.now().strftime("%H:%M"), + "datetime": lambda: datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + "weekday": lambda: ["周一", "周二", "周三", "周四", "周五", "周六", "周日"][datetime.now().weekday()], + } + + def load_tasks(self) -> int: + """加载任务配置""" + try: + with open(self.config_path, "r", encoding="utf-8") as f: + config = yaml.safe_load(f) + + task_list = config.get("tasks", []) + + with self._lock: + self.tasks.clear() + for task_data in task_list: + task = Task( + name=task_data.get("name", ""), + target=task_data.get("target", ""), + schedule=task_data.get("schedule", ""), + message=task_data.get("message", ""), + enabled=task_data.get("enabled", False), + once=task_data.get("once", False), + ) + if task.name: + self.tasks[task.name] = task + + logger.info(f"加载了 {len(self.tasks)} 个任务") + return len(self.tasks) + + except FileNotFoundError: + logger.warning(f"任务配置文件不存在: {self.config_path}") + return 0 + except Exception as e: + logger.error(f"加载任务配置失败: {e}") + return 0 + + def start(self): + """启动调度器""" + self._scheduler.start() + logger.info("任务调度器已启动") + + def stop(self): + """停止调度器""" + self._scheduler.shutdown(wait=False) + logger.info("任务调度器已停止") + + def start_tasks(self, send_callback: Callable[[str, str], bool]): + """启动所有已启用的任务 + + Args: + send_callback: 发送消息回调,签名: (target, message) -> bool + """ + with self._lock: + for task_name, task in self.tasks.items(): + if not task.enabled: + continue + + try: + self._add_job(task, send_callback) + logger.info(f"启动任务: {task_name} ({task.schedule}) -> {task.target}") + except Exception as e: + logger.error(f"启动任务失败 {task_name}: {e}") + + def _add_job(self, task: Task, send_callback: Callable): + """添加一个调度任务""" + def job(): + self._execute_task(task, send_callback) + + # 解析 cron 表达式 + parts = task.schedule.split() + if len(parts) == 5: + minute, hour, day, month, day_of_week = parts + else: + logger.error(f"Cron 表达式格式错误: {task.schedule}") + return + + trigger = CronTrigger( + minute=minute, + hour=hour, + day=day, + month=month, + day_of_week=day_of_week, + timezone="Asia/Shanghai" + ) + + self._scheduler.add_job( + func=job, + trigger=trigger, + name=task.name, + id=task.name, + replace_existing=True, + ) + + def _execute_task(self, task: Task, send_callback: Callable[[str, str], bool]): + """执行任务""" + logger.info(f"执行任务: {task.name} -> {task.target}") + + try: + # 变量替换 + message = self._replace_vars(task.message) + + # 发送消息 + success = send_callback(task.target, message) + + # 更新执行时间 + task._last_run = datetime.now() + + # 一次性任务执行后禁用 + if task.once: + task.enabled = False + logger.info(f"一次性任务执行完毕,已禁用: {task.name}") + + # 触发回调 + result = TaskResult( + task_name=task.name, + target=task.target, + success=success, + message=message, + ) + self._emit_result(result) + + except Exception as e: + logger.error(f"执行任务失败 {task.name}: {e}") + result = TaskResult( + task_name=task.name, + target=task.target, + success=False, + message=task.message, + error=str(e), + ) + self._emit_result(result) + + def _replace_vars(self, text: str) -> str: + """替换变量""" + for var_name, var_func in self._var_funcs.items(): + placeholder = f"{{{{{var_name}}}}}" + if placeholder in text: + text = text.replace(placeholder, var_func()) + return text + + def _emit_result(self, result: TaskResult): + """触发结果回调""" + for callback in self._callbacks: + try: + callback(result) + except Exception as e: + logger.error(f"任务结果回调失败: {e}") + + def on_task_result(self, callback: Callable[[TaskResult], None]): + """注册任务结果回调""" + self._callbacks.append(callback) + + def get_task(self, name: str) -> Optional[Task]: + """获取任务""" + return self.tasks.get(name) + + def enable_task(self, name: str) -> bool: + """启用任务""" + task = self.tasks.get(name) + if task: + task.enabled = True + logger.info(f"任务已启用: {name}") + return True + return False + + def disable_task(self, name: str) -> bool: + """禁用任务""" + task = self.tasks.get(name) + if task: + task.enabled = False + # 移除调度任务 + self._scheduler.remove_job(name) + logger.info(f"任务已禁用: {name}") + return True + return False + + def run_task_now(self, name: str, send_callback: Callable[[str, str], bool]) -> bool: + """立即执行任务""" + task = self.tasks.get(name) + if task: + self._execute_task(task, send_callback) + return True + return False + + def list_tasks(self) -> List[Dict]: + """列出所有任务""" + return [ + { + "name": t.name, + "target": t.target, + "schedule": t.schedule, + "message": t.message, + "enabled": t.enabled, + "once": t.once, + "last_run": t._last_run.strftime("%Y-%m-%d %H:%M:%S") if t._last_run else None, + } + for t in self.tasks.values() + ] diff --git a/src/wechat/__pycache__/controller.cpython-311.pyc b/src/wechat/__pycache__/controller.cpython-311.pyc index 7aab630..d3da0eb 100644 Binary files a/src/wechat/__pycache__/controller.cpython-311.pyc and b/src/wechat/__pycache__/controller.cpython-311.pyc differ diff --git a/src/wechat/controller.py b/src/wechat/controller.py index 6625d4a..08140d6 100644 --- a/src/wechat/controller.py +++ b/src/wechat/controller.py @@ -295,6 +295,64 @@ class WeChatController: logger.error(f"点击按钮失败: {e}") return False + def switch_chat(self, chat_name: str) -> bool: + """切换到指定聊天 + + Args: + chat_name: 聊天名称(联系人/群名称) + + Returns: + 是否切换成功 + """ + try: + # 确保窗口有焦点 + self.main_window.set_focus() + time.sleep(0.1) + + # 方式1: 在搜索框搜索 + search_box = self.main_window.window(class_name="Edit", title_re=".*搜索.*") + if search_box: + search_box.set_edit_text("") + search_box.set_edit_text(chat_name) + time.sleep(0.3) + + # 尝试点击第一个搜索结果 + try: + # 查找联系人列表中的项 + contact_list = self.main_window.window(class_name="ContactList") + items = contact_list.items() + for item in items[:5]: # 只看前5个 + if chat_name in item.text(): + item.click() + time.sleep(0.3) + logger.info(f"切换聊天成功: {chat_name}") + return True + except Exception: + pass + + # 清空搜索框 + search_box.set_edit_text("") + + # 方式2: 直接在聊天列表查找(如果滚动可见) + try: + chat_list = self.main_window.window(class_name="ChatList") + items = chat_list.items() + for item in items: + if chat_name in item.text(): + item.click() + time.sleep(0.3) + logger.info(f"切换聊天成功: {chat_name}") + return True + except Exception: + pass + + logger.warning(f"未找到聊天: {chat_name}") + return False + + except Exception as e: + logger.error(f"切换聊天失败: {e}") + return False + def is_connected(self) -> bool: """检查是否已连接""" return self._connected and self.app is not None