参考来源:苍何《万字保姆级教程:Hermes+Kimi K2.6 打造7x24h Agent军团》(CSDN,2026-04-21)
这篇文章让我们看到了AI Agent从"玩具"走向"工具"的可能性。苍何的"总管-总监"架构设计直观易懂,飞书消息网关的集成也极具创新性。
但我们发现,在生产环境中仍有进一步演进的空间:
- kimi-k2.6
- kimi-for-coding
- kimi-k2-thinking
- kimi-k2-turbo-preview
fallback_providers:
- provider: xfyun-coding; model: astron-code-latest
- provider: kimi-coding; model: kimi-k2.6
三、智能Provider选择策略
这是Session Daemon v3.1的核心逻辑,根据任务类型自动选择最优Provider:
def select_provider(task: dict) -> tuple:
    """Pick the optimal (provider, model, context-window %) for a task.

    NOTE(review): the original ``def`` line was lost in the article
    extraction; reconstructed as a plain function — confirm the real name
    (it may have been a ``SessionDaemon`` method) against the source.

    Args:
        task: Task record with ``type``, ``current_step`` and ``context``
            (``context`` is a JSON string; ``None`` values are tolerated).

    Returns:
        tuple: (provider_name, model_name, context_window_pct)
    """
    task_type = task.get('type', 'create')
    current_step = task.get('current_step') or 0
    # context is stored as a JSON string in SQLite; default to empty object
    context = json.loads(task.get('context') or '{}')
    is_complex = context.get('complex', False)

    # Requirements analysis & architecture design: Kimi (strong reasoning)
    if current_step == 1 and task_type == 'create':
        return ('kimi-coding', 'kimi-k2.6', 30)   # requirements analysis
    elif current_step == 2 and task_type == 'create':
        return ('kimi-coding', 'kimi-k2.6', 40)   # architecture design
    elif is_complex or task_type == 'analyze':
        return ('kimi-coding', 'kimi-k2.6', 50)   # complex analysis
    # Implementation / test / deploy: MiniMax (better cost/performance)
    elif task_type in ('create', 'refactor') and current_step >= 3:
        return ('minimax-cn', 'MiniMax-M2.7-highspeed', 85)  # implementation
    elif task_type == 'test':
        return ('minimax-cn', 'MiniMax-M2.7-highspeed', 60)  # testing
    elif task_type == 'deploy':
        return ('minimax-cn', 'MiniMax-M2.7-highspeed', 50)  # deployment
    # Default: MiniMax
    return ('minimax-cn', 'MiniMax-M2.7-highspeed', 85)
def __init__(self, config_path: str = "~/.hermes/config.yaml"): self.config_path = Path(config_path).expanduser() self.state_file = Path("~/.hermes/daemon_state.json").expanduser() self.max_sessions = 5 self.poll_interval = 60 # 秒 self.health_check_interval = 45 * 60 # 45分钟超时 self.running = True # 注册信号处理 signal.signal(signal.SIGINT, self._signal_handler) signal.signal(signal.SIGTERM, self._signal_handler) def _signal_handler(self, signum, frame): """优雅退出""" print(f" [Daemon] 收到信号 {signum},正在优雅退出...") self.running = False def _check_existing_lock(self) -> bool: """检查是否已有实例运行""" lock_file = Path("/tmp/hermes_daemon.lock") if lock_file.exists(): pid = int(lock_file.read_text().strip()) try: os.kill(pid, 0) # 检查进程是否存在 print(f"[Error] Daemon已在运行 (PID: {pid})") return True except ProcessLookupError: lock_file.unlink() # 删除孤儿锁文件 return False def _acquire_lock(self) -> bool: """获取单实例锁""" lock_file = Path("/tmp/hermes_daemon.lock") try: lock_file.write_text(str(os.getpid())) return True except Exception as e: print(f"[Error] 无法获取锁: {e}") return False def _get_active_tmux_sessions(self) -> List[str]: """获取当前tmux会话列表""" try: result = subprocess.run( ["tmux", "list-windows", "-t", "hermes", "-F", "#{window_name}"], capture_output=True, text=True, timeout=10 ) if result.returncode == 0: return [line.strip() for line in result.stdout.strip().split(' ') if line.strip()] return [] except Exception as e: print(f"[Error] 获取tmux会话失败: {e}") return [] def _check_session_health(self, session_name: str, last_update: datetime) -> bool: """ 检查Session健康状态 Args: session_name: tmux窗口名称 last_update: 上次更新时间 Returns: bool: True表示健康,False表示需要重启 """ # 计算未更新时间 elapsed = datetime.now() - last_update if elapsed.total_seconds() > self.health_check_interval: print(f"[Health] Session '{session_name}' 已超时 ({elapsed}),标记为不健康") return False return True def _restart_session(self, session_name: str) -> bool: """ 重启不健康的Session Args: session_name: tmux窗口名称 Returns: bool: True表示重启成功 """ try: # 杀死旧窗口 
subprocess.run(["tmux", "kill-window", "-t", f"hermes:{session_name}"], check=False) time.sleep(2) # 重新创建会话(需要根据任务上下文恢复) # 实际实现中需要从SQLite读取任务上下文 new_session = self._create_hermes_session(session_name, {}) return new_session is not None except Exception as e: print(f"[Error] 重启Session '{session_name}' 失败: {e}") return False def _create_hermes_session(self, task_id: str, task_context: Dict) -> Optional[str]: """ 创建新的Hermes Session Args: task_id: 任务ID task_context: 任务上下文 Returns: str: tmux窗口名称,失败返回None """ try: session_name = f"task_{task_id[:8]}" # 启动Hermes Session cmd = [ "tmux", "new-window", "-t", "hermes", "-n", session_name, "hermes", "chat", "--task-id", task_id ] result = subprocess.run(cmd, capture_output=True, timeout=30) if result.returncode == 0: print(f"[Session] 创建成功: {session_name}") return session_name else: print(f"[Error] 创建Session失败: {result.stderr}") return None except Exception as e: print(f"[Error] 创建Session异常: {e}") return None def _cleanup_orphan_windows(self): """清理孤儿tmux窗口""" try: # 获取所有hermes会话 result = subprocess.run( ["tmux", "list-windows", "-t", "hermes", "-F", "#{window_name} #{window_active}"], capture_output=True, text=True ) if result.returncode != 0: return active_sessions = set() for line in result.stdout.strip().split(' '): if not line: continue parts = line.split() if len(parts) >= 2: window_name = parts[0] is_active = parts[1] == '1' # 清理非活跃且非task开头的窗口 if not is_active and window_name.startswith('task_'): # 检查对应的任务是否还在运行 task_id = window_name.replace('task_', '') # 如果任务已完成或失败,清理窗口 # 实际实现需要查询SQLite pass except Exception as e: print(f"[Error] 清理孤儿窗口失败: {e}") def _save_state(self): """保存Daemon状态到文件""" state = self.state_file.parent.mkdir(parents=True, exist_ok=True) self.state_file.write_text(json.dumps(state, indent=2)) def run(self): """主循环""" # 单实例检查 if self._check_existing_lock(): sys.exit(1) if not self._acquire_lock(): sys.exit(1) print(f"[Daemon] Session Daemon v3.1 启动 (PID: )") print(f"[Daemon] 轮询间隔: {self.poll_interval}秒") 
print(f"[Daemon] 最大并发Session: {self.max_sessions}") while self.running: try: # 1. 健康检查 active = self._get_active_tmux_sessions() print(f"[Poll] 当前活跃Session: {len(active)}") # 2. 清理孤儿窗口 self._cleanup_orphan_windows() # 3. 保存状态 self._save_state() # 4. 休眠 for _ in range(self.poll_interval): if not self.running: break time.sleep(1) except KeyboardInterrupt: print(" [Daemon] 收到键盘中断") break except Exception as e: print(f"[Error] 主循环异常: {e}") time.sleep(5) print("[Daemon] Daemon已停止")
# BUGFIX: markdown extraction stripped the dunder underscores
# ("name"/"main" → "__name__"/"__main__"); restore the standard entry guard.
if __name__ == "__main__":
    main()
五、Task CLI与SQLite持久化
Task CLI提供完整的任务生命周期管理:
def __init__(self, db_path: str = "~/.hermes/tasks.db"): self.db_path = Path(db_path).expanduser() self.db_path.parent.mkdir(parents=True, exist_ok=True) self._init_db() def _init_db(self): """初始化数据库""" with sqlite3.connect(self.db_path) as conn: conn.execute(""" CREATE TABLE IF NOT EXISTS tasks ( id TEXT PRIMARY KEY, name TEXT NOT NULL, type TEXT NOT NULL, status TEXT DEFAULT 'pending', current_step INTEGER DEFAULT 0, total_steps INTEGER DEFAULT 0, context TEXT DEFAULT '{}', state TEXT DEFAULT '{}', error_count INTEGER DEFAULT 0, max_retries INTEGER DEFAULT 3, priority INTEGER DEFAULT 5, owner TEXT DEFAULT 'system', session_id TEXT, checkpoint_path TEXT, compacted_context TEXT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) """) conn.execute(""" CREATE TABLE IF NOT EXISTS task_steps ( id INTEGER PRIMARY KEY AUTOINCREMENT, task_id TEXT NOT NULL, step_order INTEGER NOT NULL, step_name TEXT NOT NULL, step_type TEXT NOT NULL, status TEXT DEFAULT 'pending', result TEXT, error TEXT, started_at TIMESTAMP, completed_at TIMESTAMP, FOREIGN KEY (task_id) REFERENCES tasks(id) ) """) conn.execute(""" CREATE TABLE IF NOT EXISTS task_events ( id INTEGER PRIMARY KEY AUTOINCREMENT, task_id TEXT NOT NULL, event_type TEXT NOT NULL, event_data TEXT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (task_id) REFERENCES tasks(id) ) """) conn.commit() def create(self, name: str, task_type: str, total_steps: int = 5, context: Optional[Dict] = None) -> str: """ 创建新任务 Args: name: 任务名称 task_type: 任务类型 (create/refactor/test/deploy/analyze) total_steps: 总步骤数 context: 任务上下文 Returns: str: 任务ID """ import uuid task_id = str(uuid.uuid4())[:12] with sqlite3.connect(self.db_path) as conn: conn.execute(""" INSERT INTO tasks (id, name, type, total_steps, context) VALUES (?, ?, ?, ?, ?) 
""", (task_id, name, task_type, total_steps, json.dumps(context or {}))) # 初始化步骤 for i in range(1, total_steps + 1): conn.execute(""" INSERT INTO task_steps (task_id, step_order, step_name, step_type) VALUES (?, ?, ?, ?) """, (task_id, i, f"Step {i}", task_type)) # 记录事件 conn.execute(""" INSERT INTO task_events (task_id, event_type, event_data) VALUES (?, 'created', ?) """, (task_id, json.dumps({"name": name, "type": task_type}))) conn.commit() print(f"[Task] 创建成功: {task_id} ({name})") return task_id def get(self, task_id: str) -> Optional[Task]: """获取任务详情""" with sqlite3.connect(self.db_path) as conn: conn.row_factory = sqlite3.Row row = conn.execute( "SELECT * FROM tasks WHERE id = ?", (task_id,) ).fetchone() if row: return Task( id=row['id'], name=row['name'], type=row['type'], status=row['status'], current_step=row['current_step'], total_steps=row['total_steps'], context=json.loads(row['context']), error_count=row['error_count'], created_at=row['created_at'] ) return None def list(self, status: Optional[str] = None, limit: int = 50) -> List[Task]: """列出任务""" with sqlite3.connect(self.db_path) as conn: conn.row_factory = sqlite3.Row if status: rows = conn.execute( "SELECT * FROM tasks WHERE status = ? ORDER BY created_at DESC LIMIT ?", (status, limit) ).fetchall() else: rows = conn.execute( "SELECT * FROM tasks ORDER BY created_at DESC LIMIT ?", (limit,) ).fetchall() return [ Task( id=row['id'], name=row['name'], type=row['type'], status=row['status'], current_step=row['current_step'], total_steps=row['total_steps'], context=json.loads(row['context']), error_count=row['error_count'], created_at=row['created_at'] ) for row in rows ] def update_status(self, task_id: str, status: str) -> bool: """更新任务状态""" with sqlite3.connect(self.db_path) as conn: cursor = conn.execute(""" UPDATE tasks SET status = ?, updated_at = CURRENT_TIMESTAMP WHERE id = ? 
""", (status, task_id)) if cursor.rowcount > 0: conn.execute(""" INSERT INTO task_events (task_id, event_type, event_data) VALUES (?, 'status_changed', ?) """, (task_id, json.dumps({"status": status}))) conn.commit() return True return False def advance_step(self, task_id: str) -> bool: """推进到下一步""" with sqlite3.connect(self.db_path) as conn: # 获取当前任务 row = conn.execute( "SELECT current_step, total_steps FROM tasks WHERE id = ?", (task_id,) ).fetchone() if not row: return False current = row[0] total = row[1] if current >= total: # 任务完成 conn.execute(""" UPDATE tasks SET status = 'completed', updated_at = CURRENT_TIMESTAMP WHERE id = ? """, (task_id,)) else: # 推进步骤 new_step = current + 1 conn.execute(""" UPDATE tasks SET current_step = ?, status = 'running', updated_at = CURRENT_TIMESTAMP WHERE id = ? """, (new_step, task_id)) # 更新步骤状态 conn.execute(""" UPDATE task_steps SET status = 'completed', completed_at = CURRENT_TIMESTAMP WHERE task_id = ? AND step_order = ? """, (task_id, current)) conn.execute(""" UPDATE task_steps SET status = 'running', started_at = CURRENT_TIMESTAMP WHERE task_id = ? AND step_order = ? """, (task_id, new_step)) conn.execute(""" INSERT INTO task_events (task_id, event_type, event_data) VALUES (?, 'step_advanced', ?) """, (task_id, json.dumps())) conn.commit() return True def handle_failure(self, task_id: str, error: str) -> bool: """处理任务失败""" with sqlite3.connect(self.db_path) as conn: row = conn.execute( "SELECT error_count, max_retries FROM tasks WHERE id = ?", (task_id,) ).fetchone() if not row: return False error_count = row[0] + 1 max_retries = row[1] if error_count >= max_retries: # 超过最大重试次数,标记为失败 conn.execute(""" UPDATE tasks SET status = 'failed', error_count = ?, updated_at = CURRENT_TIMESTAMP WHERE id = ? """, (error_count, task_id)) status = 'failed' else: # 可以重试,标记为pending conn.execute(""" UPDATE tasks SET status = 'pending', error_count = ?, updated_at = CURRENT_TIMESTAMP WHERE id = ? 
""", (error_count, task_id)) status = 'pending_retry' conn.execute(""" INSERT INTO task_events (task_id, event_type, event_data) VALUES (?, 'failed', ?) """, (task_id, json.dumps({"error": error, "error_count": error_count, "status": status}))) conn.commit() return True
def main():
    """Command-line entry point: create / list / get tasks.

    NOTE(review): the ``def main`` line and the argparse parser/subparser
    setup were lost in the article extraction; reconstructed here — confirm
    flag names and descriptions against the real source.
    """
    import argparse

    parser = argparse.ArgumentParser(description="Hermes Task CLI")
    subparsers = parser.add_subparsers(dest="command")

    # "create" sub-command
    create_parser = subparsers.add_parser("create")
    create_parser.add_argument("--name", required=True)
    create_parser.add_argument("--type", default="create")
    create_parser.add_argument("--steps", type=int, default=5)

    # "list" sub-command
    list_parser = subparsers.add_parser("list")
    list_parser.add_argument("--status")
    list_parser.add_argument("--limit", type=int, default=50)

    # "get" sub-command
    get_parser = subparsers.add_parser("get")
    get_parser.add_argument("task_id")

    args = parser.parse_args()
    cli = TaskCLI()

    if args.command == "create":
        task_id = cli.create(args.name, args.type, args.steps)
        print(f"任务ID: {task_id}")
    elif args.command == "list":
        tasks = cli.list(args.status, args.limit)
        for t in tasks:
            print(f"[{t.id}] {t.name} ({t.type}) - {t.status} - Step {t.current_step}/{t.total_steps}")
    elif args.command == "get":
        task = cli.get(args.task_id)
        if task:
            print(f"任务: {task.name}")
            print(f"类型: {task.type}")
            print(f"状态: {task.status}")
            print(f"进度: Step {task.current_step}/{task.total_steps}")
            print(f"错误次数: {task.error_count}")
            print(f"创建时间: {task.created_at}")
        else:
            print("任务不存在")
    else:
        parser.print_help()
# BUGFIX: markdown extraction stripped the dunder underscores
# ("name"/"main" → "__name__"/"__main__"); restore the standard entry guard.
if __name__ == "__main__":
    main()
我的思考
技术选型权衡
在搭建全自治引擎的过程中,我们做了几个关键的技术选型决策:
- 为什么选择SQLite而不是PostgreSQL?
最初我们考虑使用PostgreSQL来支持多进程并发访问。但实际测试发现:
- 为什么选择tmux而不是screen或supervisor?
- 为什么选择MiniMax作为主力模型?
Kimi的API费用较高(尤其是k2.6),而MiniMax的性价比突出。经过对比测试:
如果重来:我会一开始就在架构层面支持多Provider,而不是后期改造。多Provider的支持需要在任务调度层面就做设计,后期改造的成本很高。
set -e

echo ""
echo " Hermes 全自治引擎 v3.1 启动脚本"
echo ""

# Create the hermes tmux session if it does not already exist
tmux new-session -d -s hermes 2>/dev/null || true

# Seed a system-init task (best effort; ignore failures)
python3 ~/.hermes/scripts/task_cli.py create --name "System Init" --type "system" --steps 1 2>/dev/null || true

# BUGFIX: the original article tested $DAEMON_PID without ever starting the
# daemon or assigning the variable (the launch line was lost in extraction).
# NOTE(review): confirm the daemon script path against the real repository.
mkdir -p ~/.hermes/logs
nohup python3 ~/.hermes/scripts/session_daemon.py > ~/.hermes/logs/daemon.log 2>&1 &
DAEMON_PID=$!

sleep 3

if ps -p "$DAEMON_PID" > /dev/null; then
    echo "[成功] Daemon已启动 (PID: $DAEMON_PID)"
    echo "[成功] Session Daemon: $DAEMON_PID"
    echo "[成功] 日志文件: ~/.hermes/logs/daemon.log"
    echo ""
    echo "使用以下命令查看状态:"
    echo "  tmux attach -t hermes"
    echo "  tail -f ~/.hermes/logs/daemon.log"
else
    echo "[失败] Daemon启动失败,请检查日志"
    cat ~/.hermes/logs/daemon.log
    exit 1
fi
运行结果
启动后,我们进行了72小时连续运行测试:
目前我们的方案是每个任务一个独立Session,优点是隔离性好,缺点是无法共享中间结果。如果任务之间有依赖关系(如Step 2的输出是Step 3的输入),需要通过文件系统传递数据,增加了复杂度。
各位园友,你们在类似场景下是如何解决这个问题的?
问题2:对于长时间运行的任务(如超过1小时的编译/测试),如何设计更细粒度的超时控制?
目前我们的健康检查是45分钟固定超时,但不同任务的合理超时时间差异很大。一个大型编译可能需要2小时,而一个简单的代码审查只需要5分钟。
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容,请联系我们,一经查实,本站将立刻删除。
如需转载请保留出处:https://51itzy.com/kjqy/279250.html