AgentHarness 课程

第九篇:高级特性

2.4万字·1小时·
子Agent委派、Cron定时任务、浏览器自动化

概述

Hermes Agent 的基础能力(对话、工具调用、记忆管理)之上,还构建了一系列高级特性,使其从简单的对话助手进化为强大的自动化工作平台。子 Agent 委派实现了任务并行和上下文隔离,定时任务让 Agent 可以在无人值守时自主工作,浏览器自动化扩展了 Agent 的信息获取能力,智能模型路由在成本和质量之间找到平衡点。

这些特性并非相互独立——子 Agent 可以为 Cron 任务工作,浏览器自动化可以由子 Agent 调用,智能模型路由影响每个 Agent 实例(包括子 Agent 和 Cron Agent)的模型选择。理解这些特性的实现细节和协作方式,是掌握 Hermes Agent 高级用法的关键。

本篇将深入分析这些高级特性的源码实现,揭示其设计决策和技术细节。


1. 子 Agent 委派

1.1 架构概述

flowchart TB
    subgraph 父Agent
        A[接收用户任务] --> B[拆解为子任务]
        B --> C[delegate_task]
    end
    
    subgraph ThreadPoolExecutor
        C --> D1[子Agent 1: 前端开发]
        C --> D2[子Agent 2: 后端开发]
        C --> D3[子Agent 3: 测试编写]
    end
    
    D1 --> E[汇总结果]
    D2 --> E
    D3 --> E
    
    E --> F[父Agent 继续处理]
    
    style 父Agent fill:#e1f5fe
    style ThreadPoolExecutor fill:#fff3e0
sequenceDiagram
    participant User as 用户
    participant Parent as 父 Agent
    participant Child1 as 子 Agent 1
    participant Child2 as 子 Agent 2
    
    User->>Parent: 帮我开发一个番茄时钟
    Parent->>Parent: 拆解任务
    Parent->>Child1: 目标: 编写前端 UI
    Parent->>Child2: 目标: 编写后端 API
    
    par 并行执行
        Child1->>Child1: 使用 terminal/file 工具
        Child1-->>Parent: 前端代码完成
    and
        Child2->>Child2: 使用 terminal/file 工具
        Child2-->>Parent: 后端代码完成
    end
    
    Parent->>Parent: 合并结果
    Parent->>User: 番茄时钟开发完成

子 Agent 委派是 Hermes Agent 最强大的高级特性之一。它允许主 Agent 创建独立的子 Agent 实例来执行特定任务,每个子 Agent 拥有完全隔离的上下文、独立的终端会话和受限的工具集。父 Agent 的上下文中只看到委派调用的摘要结果,永远不会被子 Agent 的中间工具调用淹没。

核心源码位于 tools/delegate_tool.py,关键设计参数:

DELEGATE_BLOCKED_TOOLS = frozenset([
    "delegate_task",   # 禁止递归委派——子 Agent 不能创建孙 Agent
    "clarify",         # 禁止与用户交互——子 Agent 必须自主完成
    "memory",          # 禁止写入共享记忆——防止子 Agent 污染主 Agent 的记忆
    "send_message",    # 禁止跨平台发送消息——子 Agent 的输出只返回给父 Agent
    "execute_code",    # 禁止代码执行——子 Agent 应逐步推理,而非写脚本
])

_DEFAULT_MAX_CONCURRENT_CHILDREN = 3    # 最大并行子 Agent 数
MAX_DEPTH = 2                           # 委派深度限制(parent=0, child=1, grandchild rejected=2)
DEFAULT_MAX_ITERATIONS = 50             # 子 Agent 最大迭代次数
DEFAULT_TOOLSETS = ["terminal", "file", "web"]  # 默认工具集

每个被阻止的工具都有明确的理由:

  • delegate_task:递归委派会导致上下文爆炸和资源耗尽
  • clarify:子 Agent 在后台线程运行,无法与用户实时交互
  • memory:记忆是会话级的共享资源,子 Agent 的临时观察不应持久化
  • send_message:子 Agent 的结果应通过委派返回值传递,而非直接发送到聊天平台
  • execute_code:鼓励子 Agent 使用 atomic 工具调用,而非编写可能出错的脚本

1.2 隔离上下文设计

每个子 Agent 获得全新的对话上下文,这是委派最核心的设计原则:

child = AIAgent(
    ephemeral_system_prompt=child_prompt,   # 定制系统提示词(非全局 SOUL.md)
    skip_context_files=True,                # 不注入 SOUL.md/AGENTS.md
    skip_memory=True,                       # 不加载记忆
    clarify_callback=None,                  # 禁止用户交互
    quiet_mode=True,                        # 静默执行
    log_prefix=f"[subagent-{task_index}]",  # 独立日志前缀
    session_db=parent._session_db,          # 共享会话存储(只读)
    parent_session_id=parent.session_id,    # 关联父会话(用于追踪)
    iteration_budget=None,                  # 新的迭代预算(不受父 Agent 限制)
)

ephemeral_system_prompt_build_child_system_prompt() 构建,包含任务目标、上下文和工作空间路径:

def _build_child_system_prompt(goal, context=None, workspace_path=None):
    parts = [
        "You are a focused subagent working on a specific delegated task.",
        f"YOUR TASK:\n{goal}",
    ]
    if context:
        parts.append(f"\nCONTEXT:\n{context}")
    if workspace_path:
        parts.append(f"\nWORKSPACE PATH:\n{workspace_path}")
    parts.append(
        "\nComplete this task using the tools available to you. "
        "When finished, provide a clear, concise summary of:\n"
        "- What you did\n- What you found or accomplished\n"
        "- Any files you created or modified\n- Any issues encountered\n\n"
        "Important workspace rule: Never assume a repository lives at "
        "/workspace/... or any other container-style path unless the task "
        "explicitly gives that path."
    )
    return "\n".join(parts)

工作空间路径的注入特别重要——子 Agent 不知道它在哪个目录工作,必须通过 workspace_path 显式告知。这避免了子 Agent 猜测路径(如 /workspace/ 这种容器常见路径),确保操作在正确的位置执行。

1.3 工具集控制

子 Agent 的工具集从父 Agent 继承,但经过严格过滤:

# 计算子 Agent 可用的工具集
if toolsets:
    # 用户指定了工具集 → 与父 Agent 取交集(子 Agent 不能获得父 Agent 没有的工具)
    child_toolsets = _strip_blocked_tools([t for t in toolsets if t in parent_toolsets])
else:
    # 继承父 Agent 的工具集
    child_toolsets = _strip_blocked_tools(parent_enabled)

_strip_blocked_tools() 移除包含完全被阻止工具的工具集:

def _strip_blocked_tools(toolsets):
    blocked_toolset_names = {"delegation", "clarify", "memory", "code_execution"}
    return [t for t in toolsets if t not in blocked_toolset_names]

工具集的交集操作确保子 Agent 不能获得比父 Agent 更多的权限——即使用户在 tasks 参数中指定了父 Agent 没有的工具集,也会被过滤掉。

1.4 委派深度限制

深度限制防止无限递归委派:

MAX_DEPTH = 2  # parent (depth=0) -> child (depth=1) -> grandchild rejected (depth=2)

实现方式:

# 在子 Agent 构建时设置深度
child._delegate_depth = getattr(parent_agent, '_delegate_depth', 0) + 1

# 在 delegate_task() 入口检查
depth = getattr(parent_agent, '_delegate_depth', 0)
if depth >= MAX_DEPTH:
    return json.dumps({"error": "Delegation depth limit reached (2). "
                                "Subagents cannot spawn further subagents."})

这意味着:主 Agent(depth=0)可以委派给子 Agent(depth=1),子 Agent 不能再委派(depth=2 >= MAX_DEPTH=2)。深度限制选择为 2 而非更大的值,是因为每增加一层委派,错误传播和资源消耗的复杂度指数增长。

1.5 并行执行

批量模式下,多个子 Agent 使用 ThreadPoolExecutor 并行执行:

with ThreadPoolExecutor(max_workers=max_children) as executor:
    futures = {}
    for i, t, child in children:
        future = executor.submit(
            _run_single_child,
            task_index=i, goal=t["goal"],
            child=child, parent_agent=parent_agent,
        )
        futures[future] = i

    for future in as_completed(futures):
        entry = future.result()
        results.append(entry)
        # 显示完成进度
        completion_line = f"{icon} [{idx+1}/{n_tasks}] {label}  ({dur}s)"

并行限制通过配置控制:

# config.yaml
delegation:
  max_concurrent_children: 3    # 默认 3,可通过环境变量覆盖
  max_iterations: 50            # 子 Agent 最大迭代次数
  model: ""                     # 子 Agent 使用的模型(可选)
  provider: ""                  # 子 Agent 使用的 provider(可选)
  base_url: ""                  # 自定义 API 端点(可选)
  reasoning_effort: ""          # 推理强度(如 "high"、"medium")

1.6 凭证池轮转

子 Agent 可以共享父 Agent 的凭证池(credential pool),实现 rate limit 时的密钥轮转:

def _resolve_child_credential_pool(effective_provider, parent_agent):
    # 同一 provider → 共享父 Agent 的池(同步 cooldown 状态和轮转信息)
    if effective_provider == parent_provider:
        return parent_pool
    # 不同 provider → 加载该 provider 自己的池
    pool = load_pool(effective_provider)
    return pool

子 Agent 在线程中执行前,从池中租用凭证:

leased_cred_id = child_pool.acquire_lease()
if leased_cred_id:
    leased_entry = child_pool.current()
    child._swap_credential(leased_entry)

执行完毕后释放租约(在 finally 块中):

finally:
    if child_pool and leased_cred_id:
        child_pool.release_lease(leased_cred_id)

租约机制确保凭证在使用期间不会被其他 Agent 实例抢占,同时在完成后及时归还给池中。

1.7 心跳机制

长时间运行的子 Agent 需要定期向父 Agent 报告活跃状态,防止 Gateway 的不活跃超时误判。如果没有心跳,父 Agent 在 delegate_task 阻塞期间没有任何活动,Gateway 可能在 10 分钟后将其标记为不活跃并终止。

_HEARTBEAT_INTERVAL = 30  # 秒

def _heartbeat_loop():
    while not _heartbeat_stop.wait(_HEARTBEAT_INTERVAL):
        child_summary = child.get_activity_summary()
        child_tool = child_summary.get("current_tool")
        if child_tool:
            desc = f"delegate_task: subagent running {child_tool}"
        else:
            desc = f"delegate_task: subagent {child_summary.get('last_activity_desc', 'working')}"
        parent_agent._touch_activity(desc)  # 更新父 Agent 的活跃时间戳

心跳线程在子 Agent 完成或失败后被清理:

finally:
    _heartbeat_stop.set()
    _heartbeat_thread.join(timeout=5)

1.8 进度回调

子 Agent 的工具调用可以实时显示在父 Agent 的界面中,提供执行透明度:

def _build_child_progress_callback(task_index, parent_agent, task_count=1):
    spinner = getattr(parent_agent, '_delegate_spinner', None)
    parent_cb = getattr(parent_agent, 'tool_progress_callback', None)

    def _callback(event_type, tool_name=None, preview=None, **kwargs):
        # 思考事件
        if event_type in ("_thinking", "reasoning.available"):
            if spinner:
                short = (text[:55] + "...") if len(text) > 55 else text
                spinner.print_above(f" {prefix}├─ 💭 \"{short}\"")
            return

        # 工具调用事件
        if spinner:
            emoji = get_tool_emoji(tool_name or "")
            line = f" {prefix}├─ {emoji} {tool_name}"
            if short:
                line += f"  \"{short}\""
            spinner.print_above(line)

        # Gateway 批量上报(每 5 个工具调用)
        if parent_cb:
            _batch.append(tool_name)
            if len(_batch) >= _BATCH_SIZE:
                parent_cb("subagent_progress", f"🔀 {prefix}{summary}")
                _batch.clear()

CLI 中显示为树状结构:

  ├─ 🔧 terminal  "ls -la /app/src"
  ├─ 📄 read_file  "main.py"
  ├─ 💭 "Analyzing the code structure for potential issues..."
  ├─ ✏️ write_file  "main.py"
  ✓ [1/3] Fix authentication bug  (12.3s)

Gateway 中通过 tool_progress_callback 批量上报,避免每条进度都触发平台消息(太频繁会导致速率限制)。

1.9 结果格式

子 Agent 的执行结果被结构化为 JSON,包含完整的执行统计:

entry = {
    "task_index": 0,
    "status": "completed",         # completed | failed | interrupted | error
    "summary": "Fixed the authentication bug...",    # 子 Agent 的最终回复
    "api_calls": 42,                # LLM API 调用次数
    "duration_seconds": 15.3,       # 执行时长
    "model": "gpt-4o",             # 使用的模型
    "exit_reason": "completed",     # completed | max_iterations | interrupted
    "tokens": {"input": 12345, "output": 6789},  # Token 消耗
    "tool_trace": [                 # 工具调用追踪(用于调试)
        {"tool": "terminal", "args_bytes": 120, "result_bytes": 4500, "status": "ok"},
        {"tool": "read_file", "args_bytes": 80, "result_bytes": 8200, "status": "ok"},
    ],
}

工具追踪使用 tool_call_id 正确配对并行工具调用与结果:

for msg in messages:
    if msg.get("role") == "assistant":
        for tc in msg.get("tool_calls") or []:
            entry = {"tool": fn.get("name"), "args_bytes": len(fn.get("arguments", ""))}
            tool_trace.append(entry)
            trace_by_id[tc.get("id")] = entry
    elif msg.get("role") == "tool":
        tc_id = msg.get("tool_call_id")
        target = trace_by_id.get(tc_id)
        if target:
            target.update({"result_bytes": len(content), "status": "ok" or "error"})

1.10 全局工具名恢复

子 Agent 构建时会修改全局变量 model_tools._last_resolved_tool_names(因为 AIAgent 构造函数会调用 get_tool_definitions())。为了不影响父 Agent 后续的工具调用,必须在子 Agent 构建前后保存和恢复这个全局变量:

# 保存父 Agent 的工具名
_parent_tool_names = list(_model_tools._last_resolved_tool_names)

try:
    for i, t in enumerate(task_list):
        child = _build_child_agent(...)  # 这会修改全局变量
        child._delegate_saved_tool_names = _parent_tool_names
        children.append((i, t, child))
finally:
    # 无论构建是否成功,恢复父 Agent 的工具名
    _model_tools._last_resolved_tool_names = _parent_tool_names

_run_single_child() 的 finally 块中也会恢复:

finally:
    saved_tool_names = getattr(child, "_delegate_saved_tool_names", None)
    if isinstance(saved_tool_names, list):
        model_tools._last_resolved_tool_names = list(saved_tool_names)

1.11 委派凭证路由

子 Agent 可以使用与父 Agent 不同的 provider 和模型,通过 delegation 配置项控制:

def _resolve_delegation_credentials(cfg, parent_agent):
    # 优先级:base_url > provider > 继承父 Agent
    if configured_base_url:
        return {"model": model, "provider": "custom", "base_url": base_url, ...}
    if configured_provider:
        runtime = resolve_runtime_provider(requested=provider)
        return {"model": model, "provider": runtime["provider"], ...}
    # 无覆盖 → 子 Agent 继承父 Agent 的所有凭证
    return {"model": model, "provider": None, "base_url": None, ...}

这使得用户可以在主 Agent 使用高质量模型(如 Claude Opus)的同时,将子 Agent 路由到廉价模型(如 DeepSeek),在成本和质量之间取得平衡。


2. 定时任务

2.1 Cron 调度器

cron/scheduler.py 实现了基于 tick 的调度器。Gateway 每 60 秒调用一次 tick(),检查是否有到期的任务需要执行:

def tick(verbose=True, adapters=None, loop=None) -> int:
    # 1. 获取文件锁(防止并发 tick)
    # 2. 查询到期任务:get_due_jobs()
    # 3. 逐个执行
    # 4. 投递结果到目标平台
    # 5. 更新任务状态
    return executed  # 返回执行的任务数

2.2 文件锁防并发

多进程场景(Gateway + systemd timer + 手动触发)需要互斥执行,避免同一任务被多次执行:

_LOCK_FILE = _LOCK_DIR / ".tick.lock"

lock_fd = open(_LOCK_FILE, "w")
fcntl.flock(lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)  # 非阻塞排他锁
# OSError → 另一个进程正在执行 tick,直接返回 0

跨平台支持 Windows(msvcrt)和 Unix(fcntl)。非阻塞模式(LOCK_NB)确保不会等待——如果锁已被持有,立即返回。

2.3 Cron 表达式

定时任务使用标准 Cron 表达式,通过 hermes_time 库解析。每个任务存储以下字段:

{
    "id": "job_abc123",
    "name": "每日代码审查",
    "schedule": "0 9 * * 1-5",          // 工作日早 9 点
    "schedule_display": "Every weekday at 09:00",
    "prompt": "检查 /app 项目的最新 commit...",
    "skills": ["code-review"],           // 可选:加载技能
    "script": "collect_metrics.py",      // 可选:预运行数据收集脚本
    "deliver": "origin",                 // 投递目标
    "model": "",                         // 可选:指定模型
    "max_iterations": 50,                // 可选:最大迭代次数
    "origin": {                          // 创建来源(用于投递)
        "platform": "telegram",
        "chat_id": 123456,
        "chat_name": "Dev Team"
    }
}

2.4 预运行脚本

任务可以在 Agent 执行前运行数据收集脚本,将输出注入为上下文。这使得 Agent 可以基于最新数据做出决策:

def _run_job_script(script_path: str) -> tuple[bool, str]:
    # 安全验证:脚本必须在 HERMES_HOME/scripts/ 内
    scripts_dir = get_hermes_home() / "scripts"
    path = (scripts_dir / raw).resolve()
    path.relative_to(scripts_dir_resolved)  # 防止路径遍历

    # 执行并捕获输出
    result = subprocess.run([sys.executable, str(path)],
                            capture_output=True, timeout=script_timeout,
                            cwd=str(path.parent))  # 在脚本目录中执行

    # 输出脱敏(防止 API Key 泄露到 Agent 上下文)
    stdout = redact_sensitive_text(stdout)

脚本超时通过多种方式配置:

def _get_script_timeout():
    # 优先级:模块级覆盖 > 环境变量 > config.yaml > 默认值 120s
    env_value = os.getenv("HERMES_CRON_SCRIPT_TIMEOUT")
    cron_cfg = load_config().get("cron", {})
    return cron_cfg.get("script_timeout_seconds", 120)

脚本输出被注入到 Agent 提示词中:

if success and script_output:
    prompt = (
        "## Script Output\n"
        "The following data was collected by a pre-run script.\n\n"
        f"```\n{script_output}\n```\n\n"
        f"{original_prompt}"
    )

2.5 任务执行前推进

循环任务在执行前就推进 next_run_at,这是一个关键的正确性保证:

# 在执行前推进——确保崩溃后不会重复触发
advance_next_run(job["id"])

success, output, final_response, error = run_job(job)

如果进程在任务执行过程中崩溃,任务不会在重启后再次触发(因为 next_run_at 已经推进到下一次)。一次性任务不推进,允许崩溃后重试。

2.6 SILENT_MARKER:安静模式

当 Cron Agent 判断没有新信息需要报告时,可以返回 [SILENT] 标记来抑制投递:

SILENT_MARKER = "[SILENT]"

if SILENT_MARKER in deliver_content.strip().upper():
    logger.info("Job '%s': agent returned [SILENT] — skipping delivery")
    should_deliver = False

系统提示词中明确指导 Agent 使用此机制(通过 _build_job_prompt() 注入):

"SILENT: If there is genuinely nothing new to report, respond with
exactly \"[SILENT]\" (nothing else) to suppress delivery. Never combine
[SILENT] with content — either report your findings normally, or say
[SILENT] and nothing more."

任务输出始终保存到本地(save_job_output()),即使投递被抑制。这确保了审计日志的完整性。

2.7 投递目标解析

投递目标是灵活的,支持多种格式:

格式示例含义
locallocal仅本地保存,不投递到任何平台
originorigin投递到创建任务的聊天(需要 origin 信息)
platformtelegram投递到该平台的默认频道(HOME_CHANNEL 环境变量)
platform:chat_idtelegram:12345投递到指定聊天的 ID
platform:labeldiscord:Alice (dm)投递到频道目录中的人性化标签

投递时支持原生媒体文件发送——通过 BasePlatformAdapter.extract_media() 从 Agent 回复中提取 MEDIA: 标签,将文件作为原生附件发送(图片显示为图片,音频显示为语音消息等)。

投递还支持 live adapter(当 Gateway 运行时)和 standalone fallback(当 Gateway 未运行时)两条路径,确保在各种部署模式下都能投递。

2.8 不活跃超时

Cron 任务使用不活跃超时(而非总超时)来检测挂起,这是一个重要的设计选择:

_cron_timeout = float(os.getenv("HERMES_CRON_TIMEOUT", 600))  # 默认 10 分钟不活跃

while True:
    done, _ = concurrent.futures.wait({_cron_future}, timeout=_POLL_INTERVAL)
    if done:
        break
    _idle_secs = agent.get_activity_summary().get("seconds_since_activity", 0)
    if _idle_secs >= _cron_inactivity_limit:
        _inactivity_timeout = True
        break

这意味着只要 Agent 在活跃工作(调用工具、接收流式 token),就不会被超时。一个需要执行 2 小时的任务,只要每 10 分钟内有至少一次活动,就不会被终止。

超时触发后,Agent 收到中断信号并生成诊断信息:

if _inactivity_timeout:
    _activity = agent.get_activity_summary()
    _last_desc = _activity.get("last_activity_desc", "unknown")
    _secs_ago = _activity.get("seconds_since_activity", 0)
    logger.error(
        "Job '%s' idle for %.0fs (limit %.0fs) | last_activity=%s | tool=%s",
        job_name, _secs_ago, _cron_inactivity_limit,
        _last_desc, _cur_tool or "none",
    )
    agent.interrupt("Cron job timed out (inactivity)")

2.9 Cron Agent 配置

Cron 任务创建的 Agent 有特殊的配置:

agent = AIAgent(
    platform="cron",                     # 触发 cron 平台提示
    disabled_toolsets=["cronjob", "messaging", "clarify"],  # 禁用特定工具集
    skip_context_files=True,             # 不注入项目上下文(scheduler 的 cwd 不相关)
    skip_memory=True,                    # 不加载记忆(cron 提示词不应影响用户画像)
    quiet_mode=True,                     # 静默执行
)

skip_memory=True 特别重要——Cron 任务的提示词通常是技术性的(如"检查服务器负载"),如果被注入到用户画像(USER.md),会污染 Agent 对用户的理解。

2.10 环境变量清理

每个 Cron 任务执行前注入的环境变量,在完成后必须清理,防止泄露到其他任务:

finally:
    for key in (
        "HERMES_SESSION_PLATFORM",
        "HERMES_SESSION_CHAT_ID",
        "HERMES_SESSION_CHAT_NAME",
        "HERMES_CRON_AUTO_DELIVER_PLATFORM",
        "HERMES_CRON_AUTO_DELIVER_CHAT_ID",
        "HERMES_CRON_AUTO_DELIVER_THREAD_ID",
    ):
        os.environ.pop(key, None)

3. 浏览器自动化

3.1 多后端支持

tools/browser_tool.py 支持三种浏览器后端,提供从零成本到企业级的完整方案:

后端类型适用场景配置方式
Local Chromium本地零成本,适合大多数场景agent-browser install
Browserbase云端大规模并发、反检测BROWSERBASE_API_KEY
Browser Use云端Nous 订阅用户BROWSER_USE_API_KEY

后端自动检测顺序:首先检查 Browser Use API key(Nous 订阅),然后检查 Browserbase API key,最后回退到本地 Chromium。

3.2 Accessibility Tree 交互

Hermes Agent 不使用截图+视觉模型的方式与网页交互,而是基于 Accessibility Tree(无障碍树)。这种设计有几个关键优势:

  1. 无视觉依赖:不需要多模态模型,任何文本模型都能使用
  2. 高效:Accessibility Tree 是文本表示,比截图小几个数量级
  3. 精确交互:通过引用 ID(@e1, @e2)精确定位元素,不受视觉位置变化影响
  4. 可复现:相同的页面状态产生相同的 Tree 表示
# 获取页面快照 → 返回文本化的无障碍树
snapshot = browser_snapshot(task_id="task_123")

# 输出示例:
# [navigation] 'Main Menu'
#   [link] 'Home' @e1
#   [link] 'Products' @e2
# [main]
#   [heading] 'Welcome' @e3
#   [textbox] 'Search...' @e4
#   [button] 'Submit' @e5

元素通过引用标识符进行交互:

browser_click("@e5", task_id="task_123")          # 点击按钮
browser_type("@e4", "search query", task_id=...)   # 输入文本
browser_scroll("down", task_id=...)                 # 滚动页面
browser_navigate("https://example.com", task_id=...)  # 导航

3.3 会话隔离

每个任务使用独立的浏览器会话,通过 task_id 隔离:

result = browser_navigate("https://example.com", task_id="task_123")

不同 task_id 的操作在独立的浏览器上下文中执行,Cookie、localStorage 等状态互不影响。任务完成后自动清理浏览器会话,防止资源泄露。


4. 智能模型路由

4.1 复杂度检测

agent/smart_model_routing.py 实现了基于启发式的复杂度检测。设计哲学是保守——只有在明确简单的情况下才路由到廉价模型,任何疑似复杂的信号都保留主模型。

_COMPLEX_KEYWORDS = {
    "debug", "debugging", "implement", "implementation",
    "refactor", "patch", "traceback", "stacktrace", "exception", "error",
    "analyze", "investigate", "architecture", "design",
    "compare", "benchmark", "optimize", "review",
    "terminal", "shell", "tool", "tools",
    "pytest", "test", "tests",
    "plan", "delegate", "subagent", "cron",
    "docker", "kubernetes",
}

关键词选择覆盖了几乎所有需要工具调用或深度推理的场景。

4.2 路由决策流程

choose_cheap_model_route() 的判断逻辑是一系列否定条件——只有通过所有过滤的消息才会被路由:

def choose_cheap_model_route(user_message, routing_config):
    text = user_message.strip()

    # 硬性排除条件(任一满足就保留主模型)
    if len(text) > max_chars:          return None  # 超过 160 字符
    if len(text.split()) > max_words:  return None  # 超过 28 个词
    if text.count("\n") > 1:           return None  # 多行消息
    if "```" in text:                  return None  # 包含代码块
    if "`" in text:                    return None  # 包含行内代码
    if _URL_RE.search(text):           return None  # 包含 URL

    # 关键词检测
    words = {token.strip(".,:;!?()[]{}\"'`") for token in text.lower().split()}
    if words & _COMPLEX_KEYWORDS:      return None  # 包含复杂关键词

    # 通过所有过滤 → 路由到廉价模型
    return dict(cheap_model, routing_reason="simple_turn")

典型的简单消息(会被路由到廉价模型):

  • "你好"
  • "今天天气怎么样?"
  • "谢谢"
  • "帮我总结一下"

典型的复杂消息(保留主模型):

  • "帮我 debug 这个 error"(包含 debugerror 关键词)
  • "请分析这段代码:def foo(): ..."(包含代码块)
  • "帮我实现一个 HTTP 服务器"(包含 implement 关键词)

4.3 配置示例

# config.yaml
smart_model_routing:
  enabled: true
  max_simple_chars: 160
  max_simple_words: 28
  cheap_model:
    provider: openrouter
    model: deepseek-chat
    api_key_env: OPENROUTER_API_KEY

4.4 运行时解析

resolve_turn_route() 在检测到简单消息后,解析廉价模型的运行时配置。如果解析失败(provider 不可用、API Key 缺失等),自动回退到主模型:

def resolve_turn_route(user_message, routing_config, primary):
    route = choose_cheap_model_route(user_message, routing_config)
    if not route:
        return {"model": primary["model"], ...}  # 使用主模型

    try:
        runtime = resolve_runtime_provider(requested=route["provider"])
    except Exception:
        return {"model": primary["model"], ...}  # 回退到主模型

    return {
        "model": route["model"],
        "runtime": runtime,
        "label": f"smart route → {route['model']} ({runtime['provider']})",
    }

label 字段用于日志和调试,让用户知道当前 turn 使用了哪个模型。


5. STT/TTS 语音

5.1 语音识别(STT)

Hermes Agent 支持多种语音识别后端,覆盖从隐私优先到延迟优先的不同需求:

后端类型特点
faster-whisper本地CTranslate2 优化,CPU 实时转写,隐私优先,无 API 费用
Groq Whisper云端极低延迟(通常 < 1 秒),适合实时场景
OpenAI Whisper云端高准确率,多语言支持

faster-whisper 使用 CTranslate2 推理引擎,相比 OpenAI 的原始实现速度快 4 倍,内存占用少 2 倍。在 CPU 上也能实现实时转写(音频时长 ≤ 转写时长)。

5.2 语音合成(TTS)

语音合成同样支持多后端:

后端类型特点
Edge TTS云端(免费)微软 Edge 在线 TTS,支持多种语言和音色,无需 API Key
ElevenLabs云端(付费)高质量、可克隆音色,适合个性化场景
OpenAI TTS云端(付费)通过 Nous 订阅可用,集成简单

语音输出通过 Gateway 的平台适配器投递。例如在 Telegram 上作为 voice message 发送(.ogg 格式),在 WhatsApp 上作为音频附件发送,在 Discord 上作为文件附件发送。

5.3 语音交互流程

完整的语音交互流程:

用户语音消息
    ↓
平台适配器接收音频文件(格式因平台而异:.ogg, .mp4, .opus 等)
    ↓
STT 后端转写为文本(支持多种音频格式自动转换)
    ↓
Agent 处理文本(与文本消息完全相同的处理流程)
    ↓
Agent 回复文本
    ↓
TTS 后端合成音频(可选,根据平台配置和用户偏好决定)
    ↓
平台适配器发送音频回复(自动选择最佳发送方式)

思考题

  1. 委派深度与递归:为什么将 MAX_DEPTH 设为 2(parent -> child,不允许 grandchild)?如果允许更深层的委派,会带来哪些技术挑战?考虑上下文传递质量(每层摘要都会丢失信息)、错误传播(子 Agent 失败的级联效应)和资源消耗(每层一个 AIAgent 实例)。

  2. 并行子 Agent 的工具集冲突:多个子 Agent 并行运行时,共享 model_tools._last_resolved_tool_names 全局变量。_delegate_saved_tool_names 如何解决这个竞态?如果移除这个机制,在什么场景下会出问题?考虑并行子 Agent 使用不同工具集的情况。

  3. Cron 任务的不活跃超时:使用不活跃超时(而非总超时)设计 Cron 任务超时有什么优势?如果一个 Cron 任务需要执行 2 小时的长时间操作(如大规模代码重构),应该如何配置?考虑 HERMES_CRON_TIMEOUT=0(无限)的风险。

  4. Smart Model Routing 的准确性:当前的路由决策基于启发式关键词和长度阈值。这种方法的局限性是什么?考虑多语言输入(中文消息不包含英文关键词但仍然复杂)、隐含复杂度的简单消息(如 "fix it"——只有两个词但需要完整的项目理解)等场景。

  5. 浏览器 Accessibility Tree vs 截图:基于 Accessibility Tree 的交互方式相比截图+视觉模型有什么优势和局限?在什么场景下截图方式更合适?考虑动态内容(Canvas、WebGL)、视觉布局理解和交互精度。


本篇涉及的核心源文件:

  • tools/delegate_tool.py — 子 Agent 委派、并行执行、凭证池、进度回调
  • cron/scheduler.py — tick 调度、文件锁、投递目标解析、不活跃超时
  • tools/browser_tool.py — 浏览器自动化、Accessibility Tree、多后端支持
  • agent/smart_model_routing.py — 复杂度检测、模型路由、回退机制