第九篇:高级特性
概述
Hermes Agent 的基础能力(对话、工具调用、记忆管理)之上,还构建了一系列高级特性,使其从简单的对话助手进化为强大的自动化工作平台。子 Agent 委派实现了任务并行和上下文隔离,定时任务让 Agent 可以在无人值守时自主工作,浏览器自动化扩展了 Agent 的信息获取能力,智能模型路由在成本和质量之间找到平衡点。
这些特性并非相互独立——子 Agent 可以为 Cron 任务工作,浏览器自动化可以由子 Agent 调用,智能模型路由影响每个 Agent 实例(包括子 Agent 和 Cron Agent)的模型选择。理解这些特性的实现细节和协作方式,是掌握 Hermes Agent 高级用法的关键。
本篇将深入分析这些高级特性的源码实现,揭示其设计决策和技术细节。
1. 子 Agent 委派
1.1 架构概述
flowchart TB
subgraph 父Agent
A[接收用户任务] --> B[拆解为子任务]
B --> C[delegate_task]
end
subgraph ThreadPoolExecutor
C --> D1[子Agent 1: 前端开发]
C --> D2[子Agent 2: 后端开发]
C --> D3[子Agent 3: 测试编写]
end
D1 --> E[汇总结果]
D2 --> E
D3 --> E
E --> F[父Agent 继续处理]
style 父Agent fill:#e1f5fe
style ThreadPoolExecutor fill:#fff3e0
sequenceDiagram
participant User as 用户
participant Parent as 父 Agent
participant Child1 as 子 Agent 1
participant Child2 as 子 Agent 2
User->>Parent: 帮我开发一个番茄时钟
Parent->>Parent: 拆解任务
Parent->>Child1: 目标: 编写前端 UI
Parent->>Child2: 目标: 编写后端 API
par 并行执行
Child1->>Child1: 使用 terminal/file 工具
Child1-->>Parent: 前端代码完成
and
Child2->>Child2: 使用 terminal/file 工具
Child2-->>Parent: 后端代码完成
end
Parent->>Parent: 合并结果
Parent->>User: 番茄时钟开发完成
子 Agent 委派是 Hermes Agent 最强大的高级特性之一。它允许主 Agent 创建独立的子 Agent 实例来执行特定任务,每个子 Agent 拥有完全隔离的上下文、独立的终端会话和受限的工具集。父 Agent 的上下文中只看到委派调用的摘要结果,永远不会被子 Agent 的中间工具调用淹没。
核心源码位于 tools/delegate_tool.py,关键设计参数:
DELEGATE_BLOCKED_TOOLS = frozenset([
"delegate_task", # 禁止递归委派——子 Agent 不能创建孙 Agent
"clarify", # 禁止与用户交互——子 Agent 必须自主完成
"memory", # 禁止写入共享记忆——防止子 Agent 污染主 Agent 的记忆
"send_message", # 禁止跨平台发送消息——子 Agent 的输出只返回给父 Agent
"execute_code", # 禁止代码执行——子 Agent 应逐步推理,而非写脚本
])
_DEFAULT_MAX_CONCURRENT_CHILDREN = 3 # 最大并行子 Agent 数
MAX_DEPTH = 2 # 委派深度限制(parent=0, child=1, grandchild rejected=2)
DEFAULT_MAX_ITERATIONS = 50 # 子 Agent 最大迭代次数
DEFAULT_TOOLSETS = ["terminal", "file", "web"] # 默认工具集
每个被阻止的工具都有明确的理由:
delegate_task:递归委派会导致上下文爆炸和资源耗尽clarify:子 Agent 在后台线程运行,无法与用户实时交互memory:记忆是会话级的共享资源,子 Agent 的临时观察不应持久化send_message:子 Agent 的结果应通过委派返回值传递,而非直接发送到聊天平台execute_code:鼓励子 Agent 使用 atomic 工具调用,而非编写可能出错的脚本
1.2 隔离上下文设计
每个子 Agent 获得全新的对话上下文,这是委派最核心的设计原则:
child = AIAgent(
ephemeral_system_prompt=child_prompt, # 定制系统提示词(非全局 SOUL.md)
skip_context_files=True, # 不注入 SOUL.md/AGENTS.md
skip_memory=True, # 不加载记忆
clarify_callback=None, # 禁止用户交互
quiet_mode=True, # 静默执行
log_prefix=f"[subagent-{task_index}]", # 独立日志前缀
session_db=parent._session_db, # 共享会话存储(只读)
parent_session_id=parent.session_id, # 关联父会话(用于追踪)
iteration_budget=None, # 新的迭代预算(不受父 Agent 限制)
)
ephemeral_system_prompt 由 _build_child_system_prompt() 构建,包含任务目标、上下文和工作空间路径:
def _build_child_system_prompt(goal, context=None, workspace_path=None):
parts = [
"You are a focused subagent working on a specific delegated task.",
f"YOUR TASK:\n{goal}",
]
if context:
parts.append(f"\nCONTEXT:\n{context}")
if workspace_path:
parts.append(f"\nWORKSPACE PATH:\n{workspace_path}")
parts.append(
"\nComplete this task using the tools available to you. "
"When finished, provide a clear, concise summary of:\n"
"- What you did\n- What you found or accomplished\n"
"- Any files you created or modified\n- Any issues encountered\n\n"
"Important workspace rule: Never assume a repository lives at "
"/workspace/... or any other container-style path unless the task "
"explicitly gives that path."
)
return "\n".join(parts)
工作空间路径的注入特别重要——子 Agent 不知道它在哪个目录工作,必须通过 workspace_path 显式告知。这避免了子 Agent 猜测路径(如 /workspace/ 这种容器常见路径),确保操作在正确的位置执行。
1.3 工具集控制
子 Agent 的工具集从父 Agent 继承,但经过严格过滤:
# 计算子 Agent 可用的工具集
if toolsets:
# 用户指定了工具集 → 与父 Agent 取交集(子 Agent 不能获得父 Agent 没有的工具)
child_toolsets = _strip_blocked_tools([t for t in toolsets if t in parent_toolsets])
else:
# 继承父 Agent 的工具集
child_toolsets = _strip_blocked_tools(parent_enabled)
_strip_blocked_tools() 移除包含完全被阻止工具的工具集:
def _strip_blocked_tools(toolsets):
blocked_toolset_names = {"delegation", "clarify", "memory", "code_execution"}
return [t for t in toolsets if t not in blocked_toolset_names]
工具集的交集操作确保子 Agent 不能获得比父 Agent 更多的权限——即使用户在 tasks 参数中指定了父 Agent 没有的工具集,也会被过滤掉。
1.4 委派深度限制
深度限制防止无限递归委派:
MAX_DEPTH = 2 # parent (depth=0) -> child (depth=1) -> grandchild rejected (depth=2)
实现方式:
# 在子 Agent 构建时设置深度
child._delegate_depth = getattr(parent_agent, '_delegate_depth', 0) + 1
# 在 delegate_task() 入口检查
depth = getattr(parent_agent, '_delegate_depth', 0)
if depth >= MAX_DEPTH:
return json.dumps({"error": "Delegation depth limit reached (2). "
"Subagents cannot spawn further subagents."})
这意味着:主 Agent(depth=0)可以委派给子 Agent(depth=1),子 Agent 不能再委派(depth=2 >= MAX_DEPTH=2)。深度限制选择为 2 而非更大的值,是因为每增加一层委派,错误传播和资源消耗的复杂度指数增长。
1.5 并行执行
批量模式下,多个子 Agent 使用 ThreadPoolExecutor 并行执行:
with ThreadPoolExecutor(max_workers=max_children) as executor:
futures = {}
for i, t, child in children:
future = executor.submit(
_run_single_child,
task_index=i, goal=t["goal"],
child=child, parent_agent=parent_agent,
)
futures[future] = i
for future in as_completed(futures):
entry = future.result()
results.append(entry)
# 显示完成进度
completion_line = f"{icon} [{idx+1}/{n_tasks}] {label} ({dur}s)"
并行限制通过配置控制:
# config.yaml
delegation:
max_concurrent_children: 3 # 默认 3,可通过环境变量覆盖
max_iterations: 50 # 子 Agent 最大迭代次数
model: "" # 子 Agent 使用的模型(可选)
provider: "" # 子 Agent 使用的 provider(可选)
base_url: "" # 自定义 API 端点(可选)
reasoning_effort: "" # 推理强度(如 "high"、"medium")
1.6 凭证池轮转
子 Agent 可以共享父 Agent 的凭证池(credential pool),实现 rate limit 时的密钥轮转:
def _resolve_child_credential_pool(effective_provider, parent_agent):
# 同一 provider → 共享父 Agent 的池(同步 cooldown 状态和轮转信息)
if effective_provider == parent_provider:
return parent_pool
# 不同 provider → 加载该 provider 自己的池
pool = load_pool(effective_provider)
return pool
子 Agent 在线程中执行前,从池中租用凭证:
leased_cred_id = child_pool.acquire_lease()
if leased_cred_id:
leased_entry = child_pool.current()
child._swap_credential(leased_entry)
执行完毕后释放租约(在 finally 块中):
finally:
if child_pool and leased_cred_id:
child_pool.release_lease(leased_cred_id)
租约机制确保凭证在使用期间不会被其他 Agent 实例抢占,同时在完成后及时归还给池中。
1.7 心跳机制
长时间运行的子 Agent 需要定期向父 Agent 报告活跃状态,防止 Gateway 的不活跃超时误判。如果没有心跳,父 Agent 在 delegate_task 阻塞期间没有任何活动,Gateway 可能在 10 分钟后将其标记为不活跃并终止。
_HEARTBEAT_INTERVAL = 30 # 秒
def _heartbeat_loop():
while not _heartbeat_stop.wait(_HEARTBEAT_INTERVAL):
child_summary = child.get_activity_summary()
child_tool = child_summary.get("current_tool")
if child_tool:
desc = f"delegate_task: subagent running {child_tool}"
else:
desc = f"delegate_task: subagent {child_summary.get('last_activity_desc', 'working')}"
parent_agent._touch_activity(desc) # 更新父 Agent 的活跃时间戳
心跳线程在子 Agent 完成或失败后被清理:
finally:
_heartbeat_stop.set()
_heartbeat_thread.join(timeout=5)
1.8 进度回调
子 Agent 的工具调用可以实时显示在父 Agent 的界面中,提供执行透明度:
def _build_child_progress_callback(task_index, parent_agent, task_count=1):
spinner = getattr(parent_agent, '_delegate_spinner', None)
parent_cb = getattr(parent_agent, 'tool_progress_callback', None)
def _callback(event_type, tool_name=None, preview=None, **kwargs):
# 思考事件
if event_type in ("_thinking", "reasoning.available"):
if spinner:
short = (text[:55] + "...") if len(text) > 55 else text
spinner.print_above(f" {prefix}├─ 💭 \"{short}\"")
return
# 工具调用事件
if spinner:
emoji = get_tool_emoji(tool_name or "")
line = f" {prefix}├─ {emoji} {tool_name}"
if short:
line += f" \"{short}\""
spinner.print_above(line)
# Gateway 批量上报(每 5 个工具调用)
if parent_cb:
_batch.append(tool_name)
if len(_batch) >= _BATCH_SIZE:
parent_cb("subagent_progress", f"🔀 {prefix}{summary}")
_batch.clear()
CLI 中显示为树状结构:
├─ 🔧 terminal "ls -la /app/src"
├─ 📄 read_file "main.py"
├─ 💭 "Analyzing the code structure for potential issues..."
├─ ✏️ write_file "main.py"
✓ [1/3] Fix authentication bug (12.3s)
Gateway 中通过 tool_progress_callback 批量上报,避免每条进度都触发平台消息(太频繁会导致速率限制)。
1.9 结果格式
子 Agent 的执行结果被结构化为 JSON,包含完整的执行统计:
entry = {
"task_index": 0,
"status": "completed", # completed | failed | interrupted | error
"summary": "Fixed the authentication bug...", # 子 Agent 的最终回复
"api_calls": 42, # LLM API 调用次数
"duration_seconds": 15.3, # 执行时长
"model": "gpt-4o", # 使用的模型
"exit_reason": "completed", # completed | max_iterations | interrupted
"tokens": {"input": 12345, "output": 6789}, # Token 消耗
"tool_trace": [ # 工具调用追踪(用于调试)
{"tool": "terminal", "args_bytes": 120, "result_bytes": 4500, "status": "ok"},
{"tool": "read_file", "args_bytes": 80, "result_bytes": 8200, "status": "ok"},
],
}
工具追踪使用 tool_call_id 正确配对并行工具调用与结果:
for msg in messages:
if msg.get("role") == "assistant":
for tc in msg.get("tool_calls") or []:
entry = {"tool": fn.get("name"), "args_bytes": len(fn.get("arguments", ""))}
tool_trace.append(entry)
trace_by_id[tc.get("id")] = entry
elif msg.get("role") == "tool":
tc_id = msg.get("tool_call_id")
target = trace_by_id.get(tc_id)
if target:
target.update({"result_bytes": len(content), "status": "ok" or "error"})
1.10 全局工具名恢复
子 Agent 构建时会修改全局变量 model_tools._last_resolved_tool_names(因为 AIAgent 构造函数会调用 get_tool_definitions())。为了不影响父 Agent 后续的工具调用,必须在子 Agent 构建前后保存和恢复这个全局变量:
# 保存父 Agent 的工具名
_parent_tool_names = list(_model_tools._last_resolved_tool_names)
try:
for i, t in enumerate(task_list):
child = _build_child_agent(...) # 这会修改全局变量
child._delegate_saved_tool_names = _parent_tool_names
children.append((i, t, child))
finally:
# 无论构建是否成功,恢复父 Agent 的工具名
_model_tools._last_resolved_tool_names = _parent_tool_names
在 _run_single_child() 的 finally 块中也会恢复:
finally:
saved_tool_names = getattr(child, "_delegate_saved_tool_names", None)
if isinstance(saved_tool_names, list):
model_tools._last_resolved_tool_names = list(saved_tool_names)
1.11 委派凭证路由
子 Agent 可以使用与父 Agent 不同的 provider 和模型,通过 delegation 配置项控制:
def _resolve_delegation_credentials(cfg, parent_agent):
# 优先级:base_url > provider > 继承父 Agent
if configured_base_url:
return {"model": model, "provider": "custom", "base_url": base_url, ...}
if configured_provider:
runtime = resolve_runtime_provider(requested=provider)
return {"model": model, "provider": runtime["provider"], ...}
# 无覆盖 → 子 Agent 继承父 Agent 的所有凭证
return {"model": model, "provider": None, "base_url": None, ...}
这使得用户可以在主 Agent 使用高质量模型(如 Claude Opus)的同时,将子 Agent 路由到廉价模型(如 DeepSeek),在成本和质量之间取得平衡。
2. 定时任务
2.1 Cron 调度器
cron/scheduler.py 实现了基于 tick 的调度器。Gateway 每 60 秒调用一次 tick(),检查是否有到期的任务需要执行:
def tick(verbose=True, adapters=None, loop=None) -> int:
# 1. 获取文件锁(防止并发 tick)
# 2. 查询到期任务:get_due_jobs()
# 3. 逐个执行
# 4. 投递结果到目标平台
# 5. 更新任务状态
return executed # 返回执行的任务数
2.2 文件锁防并发
多进程场景(Gateway + systemd timer + 手动触发)需要互斥执行,避免同一任务被多次执行:
_LOCK_FILE = _LOCK_DIR / ".tick.lock"
lock_fd = open(_LOCK_FILE, "w")
fcntl.flock(lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB) # 非阻塞排他锁
# OSError → 另一个进程正在执行 tick,直接返回 0
跨平台支持 Windows(msvcrt)和 Unix(fcntl)。非阻塞模式(LOCK_NB)确保不会等待——如果锁已被持有,立即返回。
2.3 Cron 表达式
定时任务使用标准 Cron 表达式,通过 hermes_time 库解析。每个任务存储以下字段:
{
"id": "job_abc123",
"name": "每日代码审查",
"schedule": "0 9 * * 1-5", // 工作日早 9 点
"schedule_display": "Every weekday at 09:00",
"prompt": "检查 /app 项目的最新 commit...",
"skills": ["code-review"], // 可选:加载技能
"script": "collect_metrics.py", // 可选:预运行数据收集脚本
"deliver": "origin", // 投递目标
"model": "", // 可选:指定模型
"max_iterations": 50, // 可选:最大迭代次数
"origin": { // 创建来源(用于投递)
"platform": "telegram",
"chat_id": 123456,
"chat_name": "Dev Team"
}
}
2.4 预运行脚本
任务可以在 Agent 执行前运行数据收集脚本,将输出注入为上下文。这使得 Agent 可以基于最新数据做出决策:
def _run_job_script(script_path: str) -> tuple[bool, str]:
# 安全验证:脚本必须在 HERMES_HOME/scripts/ 内
scripts_dir = get_hermes_home() / "scripts"
path = (scripts_dir / raw).resolve()
path.relative_to(scripts_dir_resolved) # 防止路径遍历
# 执行并捕获输出
result = subprocess.run([sys.executable, str(path)],
capture_output=True, timeout=script_timeout,
cwd=str(path.parent)) # 在脚本目录中执行
# 输出脱敏(防止 API Key 泄露到 Agent 上下文)
stdout = redact_sensitive_text(stdout)
脚本超时通过多种方式配置:
def _get_script_timeout():
# 优先级:模块级覆盖 > 环境变量 > config.yaml > 默认值 120s
env_value = os.getenv("HERMES_CRON_SCRIPT_TIMEOUT")
cron_cfg = load_config().get("cron", {})
return cron_cfg.get("script_timeout_seconds", 120)
脚本输出被注入到 Agent 提示词中:
if success and script_output:
prompt = (
"## Script Output\n"
"The following data was collected by a pre-run script.\n\n"
f"```\n{script_output}\n```\n\n"
f"{original_prompt}"
)
2.5 任务执行前推进
循环任务在执行前就推进 next_run_at,这是一个关键的正确性保证:
# 在执行前推进——确保崩溃后不会重复触发
advance_next_run(job["id"])
success, output, final_response, error = run_job(job)
如果进程在任务执行过程中崩溃,任务不会在重启后再次触发(因为 next_run_at 已经推进到下一次)。一次性任务不推进,允许崩溃后重试。
2.6 SILENT_MARKER:安静模式
当 Cron Agent 判断没有新信息需要报告时,可以返回 [SILENT] 标记来抑制投递:
SILENT_MARKER = "[SILENT]"
if SILENT_MARKER in deliver_content.strip().upper():
logger.info("Job '%s': agent returned [SILENT] — skipping delivery")
should_deliver = False
系统提示词中明确指导 Agent 使用此机制(通过 _build_job_prompt() 注入):
"SILENT: If there is genuinely nothing new to report, respond with
exactly \"[SILENT]\" (nothing else) to suppress delivery. Never combine
[SILENT] with content — either report your findings normally, or say
[SILENT] and nothing more."
任务输出始终保存到本地(save_job_output()),即使投递被抑制。这确保了审计日志的完整性。
2.7 投递目标解析
投递目标是灵活的,支持多种格式:
| 格式 | 示例 | 含义 |
|---|---|---|
local | local | 仅本地保存,不投递到任何平台 |
origin | origin | 投递到创建任务的聊天(需要 origin 信息) |
platform | telegram | 投递到该平台的默认频道(HOME_CHANNEL 环境变量) |
platform:chat_id | telegram:12345 | 投递到指定聊天的 ID |
platform:label | discord:Alice (dm) | 投递到频道目录中的人性化标签 |
投递时支持原生媒体文件发送——通过 BasePlatformAdapter.extract_media() 从 Agent 回复中提取 MEDIA: 标签,将文件作为原生附件发送(图片显示为图片,音频显示为语音消息等)。
投递还支持 live adapter(当 Gateway 运行时)和 standalone fallback(当 Gateway 未运行时)两条路径,确保在各种部署模式下都能投递。
2.8 不活跃超时
Cron 任务使用不活跃超时(而非总超时)来检测挂起,这是一个重要的设计选择:
_cron_timeout = float(os.getenv("HERMES_CRON_TIMEOUT", 600)) # 默认 10 分钟不活跃
while True:
done, _ = concurrent.futures.wait({_cron_future}, timeout=_POLL_INTERVAL)
if done:
break
_idle_secs = agent.get_activity_summary().get("seconds_since_activity", 0)
if _idle_secs >= _cron_inactivity_limit:
_inactivity_timeout = True
break
这意味着只要 Agent 在活跃工作(调用工具、接收流式 token),就不会被超时。一个需要执行 2 小时的任务,只要每 10 分钟内有至少一次活动,就不会被终止。
超时触发后,Agent 收到中断信号并生成诊断信息:
if _inactivity_timeout:
_activity = agent.get_activity_summary()
_last_desc = _activity.get("last_activity_desc", "unknown")
_secs_ago = _activity.get("seconds_since_activity", 0)
logger.error(
"Job '%s' idle for %.0fs (limit %.0fs) | last_activity=%s | tool=%s",
job_name, _secs_ago, _cron_inactivity_limit,
_last_desc, _cur_tool or "none",
)
agent.interrupt("Cron job timed out (inactivity)")
2.9 Cron Agent 配置
Cron 任务创建的 Agent 有特殊的配置:
agent = AIAgent(
platform="cron", # 触发 cron 平台提示
disabled_toolsets=["cronjob", "messaging", "clarify"], # 禁用特定工具集
skip_context_files=True, # 不注入项目上下文(scheduler 的 cwd 不相关)
skip_memory=True, # 不加载记忆(cron 提示词不应影响用户画像)
quiet_mode=True, # 静默执行
)
skip_memory=True 特别重要——Cron 任务的提示词通常是技术性的(如"检查服务器负载"),如果被注入到用户画像(USER.md),会污染 Agent 对用户的理解。
2.10 环境变量清理
每个 Cron 任务执行前注入的环境变量,在完成后必须清理,防止泄露到其他任务:
finally:
for key in (
"HERMES_SESSION_PLATFORM",
"HERMES_SESSION_CHAT_ID",
"HERMES_SESSION_CHAT_NAME",
"HERMES_CRON_AUTO_DELIVER_PLATFORM",
"HERMES_CRON_AUTO_DELIVER_CHAT_ID",
"HERMES_CRON_AUTO_DELIVER_THREAD_ID",
):
os.environ.pop(key, None)
3. 浏览器自动化
3.1 多后端支持
tools/browser_tool.py 支持三种浏览器后端,提供从零成本到企业级的完整方案:
| 后端 | 类型 | 适用场景 | 配置方式 |
|---|---|---|---|
| Local Chromium | 本地 | 零成本,适合大多数场景 | agent-browser install |
| Browserbase | 云端 | 大规模并发、反检测 | BROWSERBASE_API_KEY |
| Browser Use | 云端 | Nous 订阅用户 | BROWSER_USE_API_KEY |
后端自动检测顺序:首先检查 Browser Use API key(Nous 订阅),然后检查 Browserbase API key,最后回退到本地 Chromium。
3.2 Accessibility Tree 交互
Hermes Agent 不使用截图+视觉模型的方式与网页交互,而是基于 Accessibility Tree(无障碍树)。这种设计有几个关键优势:
- 无视觉依赖:不需要多模态模型,任何文本模型都能使用
- 高效:Accessibility Tree 是文本表示,比截图小几个数量级
- 精确交互:通过引用 ID(
@e1,@e2)精确定位元素,不受视觉位置变化影响 - 可复现:相同的页面状态产生相同的 Tree 表示
# 获取页面快照 → 返回文本化的无障碍树
snapshot = browser_snapshot(task_id="task_123")
# 输出示例:
# [navigation] 'Main Menu'
# [link] 'Home' @e1
# [link] 'Products' @e2
# [main]
# [heading] 'Welcome' @e3
# [textbox] 'Search...' @e4
# [button] 'Submit' @e5
元素通过引用标识符进行交互:
browser_click("@e5", task_id="task_123") # 点击按钮
browser_type("@e4", "search query", task_id=...) # 输入文本
browser_scroll("down", task_id=...) # 滚动页面
browser_navigate("https://example.com", task_id=...) # 导航
3.3 会话隔离
每个任务使用独立的浏览器会话,通过 task_id 隔离:
result = browser_navigate("https://example.com", task_id="task_123")
不同 task_id 的操作在独立的浏览器上下文中执行,Cookie、localStorage 等状态互不影响。任务完成后自动清理浏览器会话,防止资源泄露。
4. 智能模型路由
4.1 复杂度检测
agent/smart_model_routing.py 实现了基于启发式的复杂度检测。设计哲学是保守——只有在明确简单的情况下才路由到廉价模型,任何疑似复杂的信号都保留主模型。
_COMPLEX_KEYWORDS = {
"debug", "debugging", "implement", "implementation",
"refactor", "patch", "traceback", "stacktrace", "exception", "error",
"analyze", "investigate", "architecture", "design",
"compare", "benchmark", "optimize", "review",
"terminal", "shell", "tool", "tools",
"pytest", "test", "tests",
"plan", "delegate", "subagent", "cron",
"docker", "kubernetes",
}
关键词选择覆盖了几乎所有需要工具调用或深度推理的场景。
4.2 路由决策流程
choose_cheap_model_route() 的判断逻辑是一系列否定条件——只有通过所有过滤的消息才会被路由:
def choose_cheap_model_route(user_message, routing_config):
text = user_message.strip()
# 硬性排除条件(任一满足就保留主模型)
if len(text) > max_chars: return None # 超过 160 字符
if len(text.split()) > max_words: return None # 超过 28 个词
if text.count("\n") > 1: return None # 多行消息
if "```" in text: return None # 包含代码块
if "`" in text: return None # 包含行内代码
if _URL_RE.search(text): return None # 包含 URL
# 关键词检测
words = {token.strip(".,:;!?()[]{}\"'`") for token in text.lower().split()}
if words & _COMPLEX_KEYWORDS: return None # 包含复杂关键词
# 通过所有过滤 → 路由到廉价模型
return dict(cheap_model, routing_reason="simple_turn")
典型的简单消息(会被路由到廉价模型):
- "你好"
- "今天天气怎么样?"
- "谢谢"
- "帮我总结一下"
典型的复杂消息(保留主模型):
- "帮我 debug 这个 error"(包含
debug、error关键词) - "请分析这段代码:
def foo(): ..."(包含代码块) - "帮我实现一个 HTTP 服务器"(包含
implement关键词)
4.3 配置示例
# config.yaml
smart_model_routing:
enabled: true
max_simple_chars: 160
max_simple_words: 28
cheap_model:
provider: openrouter
model: deepseek-chat
api_key_env: OPENROUTER_API_KEY
4.4 运行时解析
resolve_turn_route() 在检测到简单消息后,解析廉价模型的运行时配置。如果解析失败(provider 不可用、API Key 缺失等),自动回退到主模型:
def resolve_turn_route(user_message, routing_config, primary):
route = choose_cheap_model_route(user_message, routing_config)
if not route:
return {"model": primary["model"], ...} # 使用主模型
try:
runtime = resolve_runtime_provider(requested=route["provider"])
except Exception:
return {"model": primary["model"], ...} # 回退到主模型
return {
"model": route["model"],
"runtime": runtime,
"label": f"smart route → {route['model']} ({runtime['provider']})",
}
label 字段用于日志和调试,让用户知道当前 turn 使用了哪个模型。
5. STT/TTS 语音
5.1 语音识别(STT)
Hermes Agent 支持多种语音识别后端,覆盖从隐私优先到延迟优先的不同需求:
| 后端 | 类型 | 特点 |
|---|---|---|
| faster-whisper | 本地 | CTranslate2 优化,CPU 实时转写,隐私优先,无 API 费用 |
| Groq Whisper | 云端 | 极低延迟(通常 < 1 秒),适合实时场景 |
| OpenAI Whisper | 云端 | 高准确率,多语言支持 |
faster-whisper 使用 CTranslate2 推理引擎,相比 OpenAI 的原始实现速度快 4 倍,内存占用少 2 倍。在 CPU 上也能实现实时转写(音频时长 ≤ 转写时长)。
5.2 语音合成(TTS)
语音合成同样支持多后端:
| 后端 | 类型 | 特点 |
|---|---|---|
| Edge TTS | 云端(免费) | 微软 Edge 在线 TTS,支持多种语言和音色,无需 API Key |
| ElevenLabs | 云端(付费) | 高质量、可克隆音色,适合个性化场景 |
| OpenAI TTS | 云端(付费) | 通过 Nous 订阅可用,集成简单 |
语音输出通过 Gateway 的平台适配器投递。例如在 Telegram 上作为 voice message 发送(.ogg 格式),在 WhatsApp 上作为音频附件发送,在 Discord 上作为文件附件发送。
5.3 语音交互流程
完整的语音交互流程:
用户语音消息
↓
平台适配器接收音频文件(格式因平台而异:.ogg, .mp4, .opus 等)
↓
STT 后端转写为文本(支持多种音频格式自动转换)
↓
Agent 处理文本(与文本消息完全相同的处理流程)
↓
Agent 回复文本
↓
TTS 后端合成音频(可选,根据平台配置和用户偏好决定)
↓
平台适配器发送音频回复(自动选择最佳发送方式)
思考题
-
委派深度与递归:为什么将 MAX_DEPTH 设为 2(parent -> child,不允许 grandchild)?如果允许更深层的委派,会带来哪些技术挑战?考虑上下文传递质量(每层摘要都会丢失信息)、错误传播(子 Agent 失败的级联效应)和资源消耗(每层一个 AIAgent 实例)。
-
并行子 Agent 的工具集冲突:多个子 Agent 并行运行时,共享
model_tools._last_resolved_tool_names全局变量。_delegate_saved_tool_names如何解决这个竞态?如果移除这个机制,在什么场景下会出问题?考虑并行子 Agent 使用不同工具集的情况。 -
Cron 任务的不活跃超时:使用不活跃超时(而非总超时)设计 Cron 任务超时有什么优势?如果一个 Cron 任务需要执行 2 小时的长时间操作(如大规模代码重构),应该如何配置?考虑 HERMES_CRON_TIMEOUT=0(无限)的风险。
-
Smart Model Routing 的准确性:当前的路由决策基于启发式关键词和长度阈值。这种方法的局限性是什么?考虑多语言输入(中文消息不包含英文关键词但仍然复杂)、隐含复杂度的简单消息(如 "fix it"——只有两个词但需要完整的项目理解)等场景。
-
浏览器 Accessibility Tree vs 截图:基于 Accessibility Tree 的交互方式相比截图+视觉模型有什么优势和局限?在什么场景下截图方式更合适?考虑动态内容(Canvas、WebGL)、视觉布局理解和交互精度。
本篇涉及的核心源文件:
tools/delegate_tool.py— 子 Agent 委派、并行执行、凭证池、进度回调cron/scheduler.py— tick 调度、文件锁、投递目标解析、不活跃超时tools/browser_tool.py— 浏览器自动化、Accessibility Tree、多后端支持agent/smart_model_routing.py— 复杂度检测、模型路由、回退机制