AgentHarness 课程

第四篇:多平台接入

1.8万字·44分钟·
BasePlatformAdapter架构、飞书/微信适配器

概述

Hermes Agent 的核心设计理念之一是"平台无关"——Agent 的推理能力、工具系统、记忆系统都与具体的即时通讯平台解耦。实现这一解耦的关键抽象层就是 Platform Adapter(平台适配器)。每一类即时通讯平台(飞书、微信、Telegram、Discord 等)对应一个 Adapter,它们统一继承自 BasePlatformAdapter,对外暴露一致的消息收发接口,对内处理各平台特有的协议差异。

本篇将深入剖析适配器的架构设计、生命周期管理,并以飞书和微信为实战案例,展示如何为 Hermes 接入一个全新的平台。


1. 适配器架构

1.1 BasePlatformAdapter 抽象基类

BasePlatformAdapter(源码位于 gateway/platforms/base.py,第 726 行)是所有平台适配器的根基类。它定义了平台适配器必须实现的核心接口,同时提供了大量通用的基础设施逻辑。

class BasePlatformAdapter(ABC):
    """Base class for platform adapters.

    Subclasses implement platform-specific logic for:
    - Connecting and authenticating
    - Receiving messages
    - Sending messages/responses
    - Handling media
    """

    def __init__(self, config: PlatformConfig, platform: Platform):
        self.config = config
        self.platform = platform
        self._message_handler: Optional[MessageHandler] = None
        self._running = False
        self._active_sessions: Dict[str, asyncio.Event] = {}
        self._pending_messages: Dict[str, MessageEvent] = {}
        self._background_tasks: set[asyncio.Task] = set()

基类中定义了三个抽象方法,子类必须实现:

抽象方法职责
connect()连接平台、认证、开始接收消息,返回 bool
disconnect()断开连接、释放资源
send(chat_id, content, reply_to, metadata)发送消息到指定聊天,返回 SendResult
get_chat_info(chat_id)获取聊天/频道信息

此外还有多个可选覆盖方法,子类可按需实现:

可选方法默认行为说明
edit_message()返回 success=False编辑已发送的消息
send_typing()空操作发送"正在输入"指示器
send_image()将 URL 作为文本发送发送图片附件
send_voice()将路径作为文本发送发送语音消息
send_video()将路径作为文本发送发送视频
send_document()将路径作为文本发送发送文件
format_message()原样返回平台特定的消息格式化

1.2 MessageEvent 数据结构

所有平台适配器接收到消息后,都会将其归一化为统一的 MessageEvent 数据类(第 603 行):

@dataclass
class MessageEvent:
    """Incoming message from a platform.

    Normalized representation that all adapters produce.
    """
    text: str
    message_type: MessageType = MessageType.TEXT
    source: SessionSource = None
    raw_message: Any = None
    message_id: Optional[str] = None
    media_urls: List[str] = field(default_factory=list)
    media_types: List[str] = field(default_factory=list)
    reply_to_message_id: Optional[str] = None
    reply_to_text: Optional[str] = None
    auto_skill: Optional[str | list[str]] = None
    internal: bool = False
    timestamp: datetime = field(default_factory=datetime.now)

关键设计点:

  • text:消息的文本内容,已经过平台特定的归一化处理
  • message_type:使用 MessageType 枚举统一表示消息类型(TEXT/PHOTO/VIDEO/AUDIO/VOICE/DOCUMENT 等)
  • sourceSessionSource 对象,包含 platform/chat_id/user_id/thread_id 等会话来源信息
  • media_urls:本地文件路径列表(而非远程 URL),便于 vision tool 等工具直接访问
  • auto_skill:自动绑定的技能名称,用于 Telegram Topic / Discord Channel 等场景

MessageEvent 还提供了命令解析的便捷方法:

def is_command(self) -> bool:
    return self.text.startswith("/")

def get_command(self) -> Optional[str]:
    if not self.is_command():
        return None
    parts = self.text.split(maxsplit=1)
    raw = parts[0][1:].lower() if parts else None
    if raw and "@" in raw:
        raw = raw.split("@", 1)[0]
    if raw and "/" in raw:
        return None  # 拒绝文件路径
    return raw

1.3 MessageType 枚举

class MessageType(Enum):
    TEXT = "text"
    LOCATION = "location"
    PHOTO = "photo"
    VIDEO = "video"
    AUDIO = "audio"
    VOICE = "voice"
    DOCUMENT = "document"
    STICKER = "sticker"
    COMMAND = "command"

每种平台的原生消息类型在适配器内部被映射到这些统一枚举值。例如飞书的 post(富文本)、image(图片)分别映射为 TEXTPHOTO

1.4 SendResult 与重试机制

@dataclass
class SendResult:
    success: bool
    message_id: Optional[str] = None
    error: Optional[str] = None
    raw_response: Any = None
    retryable: bool = False

基类内置了完整的重试逻辑(_send_with_retry,第 1331 行):

  1. 首先尝试正常发送
  2. 如果是可重试的网络错误(ConnectionError、ConnectionReset 等),执行指数退避重试
  3. 如果是超时错误(可能已送达),直接返回失败(避免重复发送)
  4. 如果是格式化错误,尝试纯文本降级发送
  5. 所有重试耗尽后,向用户发送投递失败通知
_RETRYABLE_ERROR_PATTERNS = (
    "connecterror", "connectionerror", "connectionreset",
    "connectionrefused", "connecttimeout", "network",
    "broken pipe", "remotedisconnected", "eoferror",
)

1.5 网络安全工具函数

base.py 还提供了两个重要的网络安全工具函数:

is_network_accessible(host)(第 24 行)——判断主机地址是否暴露到非回环网络:

def is_network_accessible(host: str) -> bool:
    """Return True if host would expose the server beyond loopback."""
    try:
        addr = ipaddress.ip_address(host)
        if addr.is_loopback:
            return False
        if getattr(addr, "ipv4_mapped", None) and addr.ipv4_mapped.is_loopback:
            return False
        return True
    except ValueError:
        # 主机名需要 DNS 解析
        pass
    # 通过 getaddrinfo 解析,任一地址非回环即为可访问
    ...

resolve_proxy_url(platform_env_var)(第 95 行)——按优先级解析代理 URL:

优先级 0: 平台专用环境变量 (e.g., DISCORD_PROXY)
优先级 1: HTTPS_PROXY / HTTP_PROXY / ALL_PROXY
优先级 2: macOS 系统代理 (scutil --proxy)

同时支持 SOCKS 和 HTTP 代理,SOCKS 代理使用 aiohttp_socks.ProxyConnector 并强制 rdns=True(远程 DNS 解析)。


2. 适配器生命周期

适配器从创建到销毁经历以下阶段:

2.1 初始化

adapter = FeishuAdapter(config)  # 子类构造函数

初始化阶段完成:

  • 调用 super().__init__(config, platform) 设置基类状态
  • config.extra 或环境变量加载平台特定配置
  • 初始化平台特定的缓存、锁、去重等数据结构

2.2 连接

success = await adapter.connect()

连接阶段的核心流程:

  1. 依赖检查:验证平台 SDK 是否已安装
  2. 凭证验证:确认 API Token/AppID 等必要凭证已配置
  3. 平台锁获取:通过 _acquire_platform_lock() 防止同一平台被多个 gateway 进程同时使用
  4. 建立连接:启动 WebSocket/Webhook/Long-Poll 等连接
  5. 标记已连接:调用 _mark_connected() 设置 _running = True 并写入运行时状态
def _mark_connected(self) -> None:
    self._running = True
    self._fatal_error_code = None
    self._fatal_error_message = None
    write_runtime_status(platform=self.platform.value, platform_state="connected")

2.3 消息接收

适配器通过平台特定的机制接收消息,然后调用 self.handle_message(event) 将其投递给基类的处理管线:

平台事件 → normalize → MessageEvent → handle_message() → _process_message_background()

handle_message() 方法(第 1429 行)的关键逻辑:

  1. Session 去重:同一 session 的消息串行处理(通过 _active_sessions 追踪)
  2. 命令旁路/approve/deny/stop 等关键命令绕过活跃 session 守卫
  3. 照片合并:连续到达的照片消息合并处理(照片连拍场景)
  4. 中断机制:非照片消息到达时设置 interrupt event,通知正在运行的 Agent
  5. 后台任务:通过 asyncio.create_task() 在后台处理消息

2.4 消息发送

基类的 _process_message_background() 方法(第 1540 行)处理完整的发送管线:

Agent响应 → extract_media() → extract_images() → extract_local_files()
         → auto-TTS → send_text → send_images → send_media_files → send_local_files

每一步之间可以插入 human delay(通过 HERMES_HUMAN_DELAY_MODE 环境变量控制),让消息发送更像真人。

2.5 断线重连

适配器通过 _set_fatal_error()_mark_connected() 管理连接状态:

def _set_fatal_error(self, code: str, message: str, *, retryable: bool) -> None:
    self._running = False
    self._fatal_error_code = code
    self._fatal_error_message = message
    self._fatal_error_retryable = retryable
    write_runtime_status(platform=self.platform.value, platform_state="fatal",
                         error_code=code, error_message=message)

Gateway 的主循环会定期检查 fatal_error_retryable,如果为 True 则尝试自动重启适配器。


3. 飞书适配器实战

飞书适配器(gateway/platforms/feishu.pyFeishuAdapter 类在第 1017 行)是 Hermes 中最复杂、功能最完整的适配器之一,涵盖 WebSocket 长连接、Webhook、富文本处理、文件上传、交互卡片等能力。

3.1 WebSocket 长连接

飞书适配器支持两种连接模式:WebSocketWebhook。WebSocket 模式不需要公网 IP,适合本地开发和内网部署。

连接建立过程(connect() 方法,第 1201 行):

async def connect(self) -> bool:
    if not FEISHU_AVAILABLE:
        logger.error("[Feishu] lark-oapi not installed")
        return False
    if not self._app_id or not self._app_secret:
        logger.error("[Feishu] FEISHU_APP_ID or FEISHU_APP_SECRET not set")
        return False

    # 获取平台锁(防止同一 app_id 被多个 gateway 使用)
    acquired, existing = acquire_scoped_lock(
        _FEISHU_APP_LOCK_SCOPE, self._app_lock_identity, ...
    )
    if not acquired:
        self._set_fatal_error("feishu_app_lock", message, retryable=False)
        return False

    await self._connect_with_retry()
    self._mark_connected()
    return True

WebSocket 连接通过 lark-oapi SDK 的 FeishuWSClient 实现。SDK 运行在独立线程中:

def _run_official_feishu_ws_client(ws_client, adapter):
    """Run the official Lark WS client in its own thread-local event loop."""
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    ws_client_module.loop = loop
    adapter._ws_thread_loop = loop
    # ... monkey-patch ping/pong 和重连参数
    ws_client.start()

3.2 消息收发

接收消息的事件处理器通过 lark-oapi 的 EventDispatcherHandler 构建:

def _build_event_handler(self) -> Any:
    return (
        EventDispatcherHandler.builder(
            self._encrypt_key, self._verification_token,
        )
        .register_p2_im_message_receive_v1(self._on_message_event)
        .register_p2_im_message_reaction_created_v1(self._on_reaction_event)
        .register_p2_card_action_trigger(self._on_card_action_trigger)
        .register_p2_im_chat_member_bot_added_v1(self._on_bot_added_to_chat)
        .register_p2_im_chat_member_bot_deleted_v1(self._on_bot_removed_from_chat)
        .build()
    )

接收到的消息通过 normalize_feishu_message() 归一化(第 612 行),支持以下消息类型:

飞书消息类型归一化处理
text直接提取 text 字段
post(富文本)递归解析 JSON payload,还原为 Markdown
image提取 image_key,下载并缓存到本地
file/audio/media提取 file_key,下载附件
merge_forward解析合并转发消息的摘要
interactive/card提取卡片标题和文本内容

发送消息使用飞书 IM API 的 CreateMessageRequest / ReplyMessageRequest。飞书适配器支持两种消息格式:

  • 纯文本:使用 msg_type="text"
  • 富文本 Post:使用 msg_type="post",支持 Markdown 渲染

3.3 富文本处理

飞书的 Post 富文本消息是一个嵌套的 JSON 结构。适配器提供了完整的解析链:

# 解析 Post payload
def parse_feishu_post_payload(payload) -> FeishuPostParseResult:
    resolved = _resolve_post_payload(payload)
    for row in resolved.get("content", []):
        row_text = "".join(_render_post_element(item, ...) for item in row)

_render_post_element() 函数(第 500 行)递归处理各种富文本元素:

def _render_post_element(element, image_keys, media_refs, mentioned_ids) -> str:
    tag = element.get("tag", "").lower()
    if tag == "text":
        return _render_text_element(element)    # 处理粗体、斜体、下划线、删除线、行内代码
    if tag == "a":
        return f"[{label}]({href})"             # 链接转 Markdown
    if tag == "at":
        mentioned_ids.append(user_id)           # 收集 @mention
        return f"@{display_name}"
    if tag in {"img", "image"}:
        image_keys.append(image_key)            # 收集图片 key
        return f"[Image: {alt}]"
    if tag == "code_block":
        return f"```{language}\n{code}\n```"    # 代码块直接映射

3.4 文件上传

飞书适配器覆盖了 send_image()send_voice()send_video()send_document() 等方法,实现本地文件到飞书 CDN 的上传:

本地文件 → 读取字节 → 调用飞书 Upload API → 获取 image_key/file_key → 构建消息体发送

3.5 @机器人触发

飞书群聊中,用户必须 @机器人 才会触发响应。适配器在消息事件处理中检查 mentioned_ids

  • 如果消息中包含 @_user_XXX 占位符,解析出被 @ 的 user_id
  • 与机器人的 bot_open_id / bot_user_id 比对
  • 只有 @ 了机器人的消息才投递到 handle_message()

3.6 批量发送优化

飞书适配器实现了智能的批量发送机制(FeishuBatchState):

@dataclass
class FeishuBatchState:
    events: Dict[str, MessageEvent] = field(default_factory=dict)
    tasks: Dict[str, asyncio.Task] = field(default_factory=dict)
    counts: Dict[str, int] = field(default_factory=dict)

当多条消息短时间内到达时,适配器会延迟处理以合并为一次批量请求,减少 API 调用次数。


4. 微信适配器

微信适配器(gateway/platforms/weixin.pyWeixinAdapter 类在第 1040 行)连接到腾讯 iLink Bot API,支持微信个人号的消息收发。

4.1 架构设计

微信适配器采用 Long-Polling 模式(与飞书的 WebSocket 模式不同):

async def _poll_loop(self) -> None:
    sync_buf = _load_sync_buf(self._hermes_home, self._account_id)
    while self._running:
        response = await _get_updates(
            self._session, base_url=self._base_url,
            token=self._token, sync_buf=sync_buf,
            timeout_ms=LONG_POLL_TIMEOUT_MS,    # 35 秒
        )

核心设计特点:

  • Context Token 机制:每次回复必须携带最新的 context_token,通过 ContextTokenStore 持久化到磁盘
  • AES-128-ECB 加密:CDN 文件传输使用 AES 加密,通过 cryptography 库实现
  • 消息去重:使用 MessageDeduplicator 防止重复处理
  • QR 登录:支持通过 QR 码完成微信登录认证
class WeixinAdapter(BasePlatformAdapter):
    MAX_MESSAGE_LENGTH = 4000

    def __init__(self, config: PlatformConfig):
        super().__init__(config, Platform.WEIXIN)
        self._token_store = ContextTokenStore(hermes_home)
        self._typing_cache = TypingTicketCache()
        self._dedup = MessageDeduplicator(ttl_seconds=300)

4.2 连接流程

async def connect(self) -> bool:
    if not check_weixin_requirements():
        self._set_fatal_error("weixin_missing_dependency", ..., retryable=False)
        return False
    if not self._token:
        self._set_fatal_error("weixin_missing_token", ..., retryable=False)
        return False

    self._acquire_platform_lock('weixin-bot-token', self._token, 'Weixin bot token')
    self._session = aiohttp.ClientSession()
    self._token_store.restore(self._account_id)
    self._poll_task = asyncio.create_task(self._poll_loop())
    self._mark_connected()

4.3 API 端点

微信适配器使用以下 iLink API 端点:

端点用途
ilink/bot/getupdatesLong-Poll 获取新消息
ilink/bot/sendmessage发送消息
ilink/bot/sendtyping发送输入指示
ilink/bot/getconfig获取配置(含 typing ticket)
ilink/bot/getuploadurl获取 CDN 上传地址
ilink/bot/get_bot_qrcode获取登录 QR 码

5. API Server 适配器

gateway/platforms/api_server.py 提供了 OpenAI 兼容的 HTTP API 接口,使任何 OpenAI 兼容的前端(如 Open WebUI、LobeChat、LibreChat)都能连接到 Hermes Agent:

POST /v1/chat/completions    — Chat Completions 格式(无状态,可通过 Header 维持会话)
POST /v1/responses           — Responses API 格式(通过 previous_response_id 维持状态)
GET  /v1/models              — 列出可用模型
POST /v1/runs                — 启动异步运行,立即返回 run_id
GET  /v1/runs/{run_id}/events — SSE 事件流
GET  /health                 — 健康检查

ResponseStore 使用 SQLite 持久化存储会话状态,支持跨 gateway 重启保持对话。


6. 接入新平台模式

当你需要为 Hermes 接入一个全新的即时通讯平台时,遵循以下模式:

6.1 继承 BasePlatformAdapter

# gateway/platforms/my_platform.py

from gateway.platforms.base import (
    BasePlatformAdapter, MessageEvent, MessageType, SendResult
)
from gateway.config import Platform, PlatformConfig


class MyPlatformAdapter(BasePlatformAdapter):
    MAX_MESSAGE_LENGTH = 4096

    def __init__(self, config: PlatformConfig):
        super().__init__(config, Platform.MY_PLATFORM)
        # 从 config.extra 或环境变量加载配置
        self._api_token = config.token or os.getenv("MY_PLATFORM_TOKEN", "")

    async def connect(self) -> bool:
        """建立与平台的连接"""
        # 1. 验证依赖和凭证
        # 2. 获取平台锁
        # 3. 建立 SDK 连接
        # 4. 启动消息接收循环
        self._mark_connected()
        return True

    async def disconnect(self) -> None:
        """断开连接"""
        self._running = False
        # 清理资源
        self._release_platform_lock()
        self._mark_disconnected()

    async def send(self, chat_id, content, reply_to=None, metadata=None) -> SendResult:
        """发送消息"""
        try:
            # 调用平台 API 发送
            result = await self._api_send(chat_id, content)
            return SendResult(success=True, message_id=result["id"])
        except Exception as e:
            return SendResult(success=False, error=str(e))

    async def get_chat_info(self, chat_id) -> Dict[str, Any]:
        """获取聊天信息"""
        return {"name": "...", "type": "dm"}

6.2 实现消息归一化

将平台特定的消息格式转换为 MessageEvent

def _normalize_message(self, raw_msg) -> MessageEvent:
    text = raw_msg.get("text", "")
    msg_type = self._map_message_type(raw_msg.get("type"))

    source = self.build_source(
        chat_id=raw_msg["chat_id"],
        user_id=raw_msg["user_id"],
        chat_type="group" if raw_msg.get("is_group") else "dm",
    )

    return MessageEvent(
        text=text,
        message_type=msg_type,
        source=source,
        message_id=raw_msg["id"],
        raw_message=raw_msg,
    )

6.3 注册配置

gateway/config.pyPlatform 枚举中添加新平台标识:

class Platform(str, Enum):
    FEISHU = "feishu"
    WEIXIN = "weixin"
    TELEGRAM = "telegram"
    MY_PLATFORM = "my_platform"  # 新增

6.4 特性矩阵

不同平台支持的功能存在差异,以下是 Hermes 已有适配器的特性矩阵:

特性FeishuWeixinAPI ServerTelegramDiscord
连接方式WebSocket/WebhookLong-PollHTTP ServerLong-Poll/WebhookWebSocket
文本消息YesYesYesYesYes
富文本Post (JSON)纯文本MarkdownMarkdownV2Discord MD
图片收发YesYes-YesYes
文件上传YesYes (CDN)-YesYes
语音消息YesYes-Yes-
编辑消息Yes--YesYes
输入指示-Yes-YesYes
@触发Yes--YesYes
交互卡片Yes---Buttons
消息去重Yes (持久化)Yes (TTL)-YesYes
群聊权限规则引擎策略配置API Key--
代理支持---YesYes

思考题

  1. 设计决策:为什么 BasePlatformAdapter 选择用抽象基类(ABC)而非 Protocol(结构化子类型)?如果改用 Protocol,会有什么影响?

  2. 消息归一化:飞书的 Post 富文本消息是一个深度嵌套的 JSON 结构。normalize_feishu_message() 通过递归解析将其转换为 Markdown 文本。考虑一下,这种"先转为 Markdown 再让 Agent 处理"的策略有什么优势和潜在的信息损失?

  3. 重试策略:基类的 _send_with_retry() 对"超时错误"和"连接错误"采取不同策略——前者不重试(因为消息可能已经送达),后者重试。请思考:在什么场景下这个策略会导致问题?你能想到更好的方案吗?

  4. 平台锁:飞书适配器使用 acquire_scoped_lock() 防止同一个 app_id 被多个 gateway 进程同时使用。如果不加这个锁,在实际部署中会发生什么问题?

  5. 架构扩展:如果你要为 Hermes 接入 Slack 平台,Slack 的 Socket Mode 使用 WebSocket 连接、消息格式是 JSON block kit。请设计 SlackAdapter 的消息归一化方案,思考哪些 Slack 消息类型需要特殊处理。