第四篇:多平台接入
概述
Hermes Agent 的核心设计理念之一是"平台无关"——Agent 的推理能力、工具系统、记忆系统都与具体的即时通讯平台解耦。实现这一解耦的关键抽象层就是 Platform Adapter(平台适配器)。每一类即时通讯平台(飞书、微信、Telegram、Discord 等)对应一个 Adapter,它们统一继承自 BasePlatformAdapter,对外暴露一致的消息收发接口,对内处理各平台特有的协议差异。
本篇将深入剖析适配器的架构设计、生命周期管理,并以飞书和微信为实战案例,展示如何为 Hermes 接入一个全新的平台。
1. 适配器架构
1.1 BasePlatformAdapter 抽象基类
BasePlatformAdapter(源码位于 gateway/platforms/base.py,第 726 行)是所有平台适配器的根基类。它定义了平台适配器必须实现的核心接口,同时提供了大量通用的基础设施逻辑。
class BasePlatformAdapter(ABC):
"""Base class for platform adapters.
Subclasses implement platform-specific logic for:
- Connecting and authenticating
- Receiving messages
- Sending messages/responses
- Handling media
"""
def __init__(self, config: PlatformConfig, platform: Platform):
self.config = config
self.platform = platform
self._message_handler: Optional[MessageHandler] = None
self._running = False
self._active_sessions: Dict[str, asyncio.Event] = {}
self._pending_messages: Dict[str, MessageEvent] = {}
self._background_tasks: set[asyncio.Task] = set()
基类中定义了三个抽象方法,子类必须实现:
| 抽象方法 | 职责 |
|---|---|
connect() | 连接平台、认证、开始接收消息,返回 bool |
disconnect() | 断开连接、释放资源 |
send(chat_id, content, reply_to, metadata) | 发送消息到指定聊天,返回 SendResult |
get_chat_info(chat_id) | 获取聊天/频道信息 |
此外还有多个可选覆盖方法,子类可按需实现:
| 可选方法 | 默认行为 | 说明 |
|---|---|---|
edit_message() | 返回 success=False | 编辑已发送的消息 |
send_typing() | 空操作 | 发送"正在输入"指示器 |
send_image() | 将 URL 作为文本发送 | 发送图片附件 |
send_voice() | 将路径作为文本发送 | 发送语音消息 |
send_video() | 将路径作为文本发送 | 发送视频 |
send_document() | 将路径作为文本发送 | 发送文件 |
format_message() | 原样返回 | 平台特定的消息格式化 |
1.2 MessageEvent 数据结构
所有平台适配器接收到消息后,都会将其归一化为统一的 MessageEvent 数据类(第 603 行):
@dataclass
class MessageEvent:
"""Incoming message from a platform.
Normalized representation that all adapters produce.
"""
text: str
message_type: MessageType = MessageType.TEXT
source: SessionSource = None
raw_message: Any = None
message_id: Optional[str] = None
media_urls: List[str] = field(default_factory=list)
media_types: List[str] = field(default_factory=list)
reply_to_message_id: Optional[str] = None
reply_to_text: Optional[str] = None
auto_skill: Optional[str | list[str]] = None
internal: bool = False
timestamp: datetime = field(default_factory=datetime.now)
关键设计点:
text:消息的文本内容,已经过平台特定的归一化处理message_type:使用MessageType枚举统一表示消息类型(TEXT/PHOTO/VIDEO/AUDIO/VOICE/DOCUMENT 等)source:SessionSource对象,包含 platform/chat_id/user_id/thread_id 等会话来源信息media_urls:本地文件路径列表(而非远程 URL),便于 vision tool 等工具直接访问auto_skill:自动绑定的技能名称,用于 Telegram Topic / Discord Channel 等场景
MessageEvent 还提供了命令解析的便捷方法:
def is_command(self) -> bool:
return self.text.startswith("/")
def get_command(self) -> Optional[str]:
if not self.is_command():
return None
parts = self.text.split(maxsplit=1)
raw = parts[0][1:].lower() if parts else None
if raw and "@" in raw:
raw = raw.split("@", 1)[0]
if raw and "/" in raw:
return None # 拒绝文件路径
return raw
1.3 MessageType 枚举
class MessageType(Enum):
TEXT = "text"
LOCATION = "location"
PHOTO = "photo"
VIDEO = "video"
AUDIO = "audio"
VOICE = "voice"
DOCUMENT = "document"
STICKER = "sticker"
COMMAND = "command"
每种平台的原生消息类型在适配器内部被映射到这些统一枚举值。例如飞书的 post(富文本)、image(图片)分别映射为 TEXT 和 PHOTO。
1.4 SendResult 与重试机制
@dataclass
class SendResult:
success: bool
message_id: Optional[str] = None
error: Optional[str] = None
raw_response: Any = None
retryable: bool = False
基类内置了完整的重试逻辑(_send_with_retry,第 1331 行):
- 首先尝试正常发送
- 如果是可重试的网络错误(ConnectionError、ConnectionReset 等),执行指数退避重试
- 如果是超时错误(可能已送达),直接返回失败(避免重复发送)
- 如果是格式化错误,尝试纯文本降级发送
- 所有重试耗尽后,向用户发送投递失败通知
_RETRYABLE_ERROR_PATTERNS = (
"connecterror", "connectionerror", "connectionreset",
"connectionrefused", "connecttimeout", "network",
"broken pipe", "remotedisconnected", "eoferror",
)
1.5 网络安全工具函数
base.py 还提供了两个重要的网络安全工具函数:
is_network_accessible(host)(第 24 行)——判断主机地址是否暴露到非回环网络:
def is_network_accessible(host: str) -> bool:
"""Return True if host would expose the server beyond loopback."""
try:
addr = ipaddress.ip_address(host)
if addr.is_loopback:
return False
if getattr(addr, "ipv4_mapped", None) and addr.ipv4_mapped.is_loopback:
return False
return True
except ValueError:
# 主机名需要 DNS 解析
pass
# 通过 getaddrinfo 解析,任一地址非回环即为可访问
...
resolve_proxy_url(platform_env_var)(第 95 行)——按优先级解析代理 URL:
优先级 0: 平台专用环境变量 (e.g., DISCORD_PROXY)
优先级 1: HTTPS_PROXY / HTTP_PROXY / ALL_PROXY
优先级 2: macOS 系统代理 (scutil --proxy)
同时支持 SOCKS 和 HTTP 代理,SOCKS 代理使用 aiohttp_socks.ProxyConnector 并强制 rdns=True(远程 DNS 解析)。
2. 适配器生命周期
适配器从创建到销毁经历以下阶段:
2.1 初始化
adapter = FeishuAdapter(config) # 子类构造函数
初始化阶段完成:
- 调用
super().__init__(config, platform)设置基类状态 - 从
config.extra或环境变量加载平台特定配置 - 初始化平台特定的缓存、锁、去重等数据结构
2.2 连接
success = await adapter.connect()
连接阶段的核心流程:
- 依赖检查:验证平台 SDK 是否已安装
- 凭证验证:确认 API Token/AppID 等必要凭证已配置
- 平台锁获取:通过
_acquire_platform_lock()防止同一平台被多个 gateway 进程同时使用 - 建立连接:启动 WebSocket/Webhook/Long-Poll 等连接
- 标记已连接:调用
_mark_connected()设置_running = True并写入运行时状态
def _mark_connected(self) -> None:
self._running = True
self._fatal_error_code = None
self._fatal_error_message = None
write_runtime_status(platform=self.platform.value, platform_state="connected")
2.3 消息接收
适配器通过平台特定的机制接收消息,然后调用 self.handle_message(event) 将其投递给基类的处理管线:
平台事件 → normalize → MessageEvent → handle_message() → _process_message_background()
handle_message() 方法(第 1429 行)的关键逻辑:
- Session 去重:同一 session 的消息串行处理(通过
_active_sessions追踪) - 命令旁路:
/approve、/deny、/stop等关键命令绕过活跃 session 守卫 - 照片合并:连续到达的照片消息合并处理(照片连拍场景)
- 中断机制:非照片消息到达时设置 interrupt event,通知正在运行的 Agent
- 后台任务:通过
asyncio.create_task()在后台处理消息
2.4 消息发送
基类的 _process_message_background() 方法(第 1540 行)处理完整的发送管线:
Agent响应 → extract_media() → extract_images() → extract_local_files()
→ auto-TTS → send_text → send_images → send_media_files → send_local_files
每一步之间可以插入 human delay(通过 HERMES_HUMAN_DELAY_MODE 环境变量控制),让消息发送更像真人。
2.5 断线重连
适配器通过 _set_fatal_error() 和 _mark_connected() 管理连接状态:
def _set_fatal_error(self, code: str, message: str, *, retryable: bool) -> None:
self._running = False
self._fatal_error_code = code
self._fatal_error_message = message
self._fatal_error_retryable = retryable
write_runtime_status(platform=self.platform.value, platform_state="fatal",
error_code=code, error_message=message)
Gateway 的主循环会定期检查 fatal_error_retryable,如果为 True 则尝试自动重启适配器。
3. 飞书适配器实战
飞书适配器(gateway/platforms/feishu.py,FeishuAdapter 类在第 1017 行)是 Hermes 中最复杂、功能最完整的适配器之一,涵盖 WebSocket 长连接、Webhook、富文本处理、文件上传、交互卡片等能力。
3.1 WebSocket 长连接
飞书适配器支持两种连接模式:WebSocket 和 Webhook。WebSocket 模式不需要公网 IP,适合本地开发和内网部署。
连接建立过程(connect() 方法,第 1201 行):
async def connect(self) -> bool:
if not FEISHU_AVAILABLE:
logger.error("[Feishu] lark-oapi not installed")
return False
if not self._app_id or not self._app_secret:
logger.error("[Feishu] FEISHU_APP_ID or FEISHU_APP_SECRET not set")
return False
# 获取平台锁(防止同一 app_id 被多个 gateway 使用)
acquired, existing = acquire_scoped_lock(
_FEISHU_APP_LOCK_SCOPE, self._app_lock_identity, ...
)
if not acquired:
self._set_fatal_error("feishu_app_lock", message, retryable=False)
return False
await self._connect_with_retry()
self._mark_connected()
return True
WebSocket 连接通过 lark-oapi SDK 的 FeishuWSClient 实现。SDK 运行在独立线程中:
def _run_official_feishu_ws_client(ws_client, adapter):
"""Run the official Lark WS client in its own thread-local event loop."""
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
ws_client_module.loop = loop
adapter._ws_thread_loop = loop
# ... monkey-patch ping/pong 和重连参数
ws_client.start()
3.2 消息收发
接收消息的事件处理器通过 lark-oapi 的 EventDispatcherHandler 构建:
def _build_event_handler(self) -> Any:
return (
EventDispatcherHandler.builder(
self._encrypt_key, self._verification_token,
)
.register_p2_im_message_receive_v1(self._on_message_event)
.register_p2_im_message_reaction_created_v1(self._on_reaction_event)
.register_p2_card_action_trigger(self._on_card_action_trigger)
.register_p2_im_chat_member_bot_added_v1(self._on_bot_added_to_chat)
.register_p2_im_chat_member_bot_deleted_v1(self._on_bot_removed_from_chat)
.build()
)
接收到的消息通过 normalize_feishu_message() 归一化(第 612 行),支持以下消息类型:
| 飞书消息类型 | 归一化处理 |
|---|---|
text | 直接提取 text 字段 |
post(富文本) | 递归解析 JSON payload,还原为 Markdown |
image | 提取 image_key,下载并缓存到本地 |
file/audio/media | 提取 file_key,下载附件 |
merge_forward | 解析合并转发消息的摘要 |
interactive/card | 提取卡片标题和文本内容 |
发送消息使用飞书 IM API 的 CreateMessageRequest / ReplyMessageRequest。飞书适配器支持两种消息格式:
- 纯文本:使用
msg_type="text" - 富文本 Post:使用
msg_type="post",支持 Markdown 渲染
3.3 富文本处理
飞书的 Post 富文本消息是一个嵌套的 JSON 结构。适配器提供了完整的解析链:
# 解析 Post payload
def parse_feishu_post_payload(payload) -> FeishuPostParseResult:
resolved = _resolve_post_payload(payload)
for row in resolved.get("content", []):
row_text = "".join(_render_post_element(item, ...) for item in row)
_render_post_element() 函数(第 500 行)递归处理各种富文本元素:
def _render_post_element(element, image_keys, media_refs, mentioned_ids) -> str:
tag = element.get("tag", "").lower()
if tag == "text":
return _render_text_element(element) # 处理粗体、斜体、下划线、删除线、行内代码
if tag == "a":
return f"[{label}]({href})" # 链接转 Markdown
if tag == "at":
mentioned_ids.append(user_id) # 收集 @mention
return f"@{display_name}"
if tag in {"img", "image"}:
image_keys.append(image_key) # 收集图片 key
return f"[Image: {alt}]"
if tag == "code_block":
return f"```{language}\n{code}\n```" # 代码块直接映射
3.4 文件上传
飞书适配器覆盖了 send_image()、send_voice()、send_video()、send_document() 等方法,实现本地文件到飞书 CDN 的上传:
本地文件 → 读取字节 → 调用飞书 Upload API → 获取 image_key/file_key → 构建消息体发送
3.5 @机器人触发
飞书群聊中,用户必须 @机器人 才会触发响应。适配器在消息事件处理中检查 mentioned_ids:
- 如果消息中包含
@_user_XXX占位符,解析出被 @ 的 user_id - 与机器人的
bot_open_id/bot_user_id比对 - 只有 @ 了机器人的消息才投递到
handle_message()
3.6 批量发送优化
飞书适配器实现了智能的批量发送机制(FeishuBatchState):
@dataclass
class FeishuBatchState:
events: Dict[str, MessageEvent] = field(default_factory=dict)
tasks: Dict[str, asyncio.Task] = field(default_factory=dict)
counts: Dict[str, int] = field(default_factory=dict)
当多条消息短时间内到达时,适配器会延迟处理以合并为一次批量请求,减少 API 调用次数。
4. 微信适配器
微信适配器(gateway/platforms/weixin.py,WeixinAdapter 类在第 1040 行)连接到腾讯 iLink Bot API,支持微信个人号的消息收发。
4.1 架构设计
微信适配器采用 Long-Polling 模式(与飞书的 WebSocket 模式不同):
async def _poll_loop(self) -> None:
sync_buf = _load_sync_buf(self._hermes_home, self._account_id)
while self._running:
response = await _get_updates(
self._session, base_url=self._base_url,
token=self._token, sync_buf=sync_buf,
timeout_ms=LONG_POLL_TIMEOUT_MS, # 35 秒
)
核心设计特点:
- Context Token 机制:每次回复必须携带最新的
context_token,通过ContextTokenStore持久化到磁盘 - AES-128-ECB 加密:CDN 文件传输使用 AES 加密,通过
cryptography库实现 - 消息去重:使用
MessageDeduplicator防止重复处理 - QR 登录:支持通过 QR 码完成微信登录认证
class WeixinAdapter(BasePlatformAdapter):
MAX_MESSAGE_LENGTH = 4000
def __init__(self, config: PlatformConfig):
super().__init__(config, Platform.WEIXIN)
self._token_store = ContextTokenStore(hermes_home)
self._typing_cache = TypingTicketCache()
self._dedup = MessageDeduplicator(ttl_seconds=300)
4.2 连接流程
async def connect(self) -> bool:
if not check_weixin_requirements():
self._set_fatal_error("weixin_missing_dependency", ..., retryable=False)
return False
if not self._token:
self._set_fatal_error("weixin_missing_token", ..., retryable=False)
return False
self._acquire_platform_lock('weixin-bot-token', self._token, 'Weixin bot token')
self._session = aiohttp.ClientSession()
self._token_store.restore(self._account_id)
self._poll_task = asyncio.create_task(self._poll_loop())
self._mark_connected()
4.3 API 端点
微信适配器使用以下 iLink API 端点:
| 端点 | 用途 |
|---|---|
ilink/bot/getupdates | Long-Poll 获取新消息 |
ilink/bot/sendmessage | 发送消息 |
ilink/bot/sendtyping | 发送输入指示 |
ilink/bot/getconfig | 获取配置(含 typing ticket) |
ilink/bot/getuploadurl | 获取 CDN 上传地址 |
ilink/bot/get_bot_qrcode | 获取登录 QR 码 |
5. API Server 适配器
gateway/platforms/api_server.py 提供了 OpenAI 兼容的 HTTP API 接口,使任何 OpenAI 兼容的前端(如 Open WebUI、LobeChat、LibreChat)都能连接到 Hermes Agent:
POST /v1/chat/completions — Chat Completions 格式(无状态,可通过 Header 维持会话)
POST /v1/responses — Responses API 格式(通过 previous_response_id 维持状态)
GET /v1/models — 列出可用模型
POST /v1/runs — 启动异步运行,立即返回 run_id
GET /v1/runs/{run_id}/events — SSE 事件流
GET /health — 健康检查
ResponseStore 使用 SQLite 持久化存储会话状态,支持跨 gateway 重启保持对话。
6. 接入新平台模式
当你需要为 Hermes 接入一个全新的即时通讯平台时,遵循以下模式:
6.1 继承 BasePlatformAdapter
# gateway/platforms/my_platform.py
from gateway.platforms.base import (
BasePlatformAdapter, MessageEvent, MessageType, SendResult
)
from gateway.config import Platform, PlatformConfig
class MyPlatformAdapter(BasePlatformAdapter):
MAX_MESSAGE_LENGTH = 4096
def __init__(self, config: PlatformConfig):
super().__init__(config, Platform.MY_PLATFORM)
# 从 config.extra 或环境变量加载配置
self._api_token = config.token or os.getenv("MY_PLATFORM_TOKEN", "")
async def connect(self) -> bool:
"""建立与平台的连接"""
# 1. 验证依赖和凭证
# 2. 获取平台锁
# 3. 建立 SDK 连接
# 4. 启动消息接收循环
self._mark_connected()
return True
async def disconnect(self) -> None:
"""断开连接"""
self._running = False
# 清理资源
self._release_platform_lock()
self._mark_disconnected()
async def send(self, chat_id, content, reply_to=None, metadata=None) -> SendResult:
"""发送消息"""
try:
# 调用平台 API 发送
result = await self._api_send(chat_id, content)
return SendResult(success=True, message_id=result["id"])
except Exception as e:
return SendResult(success=False, error=str(e))
async def get_chat_info(self, chat_id) -> Dict[str, Any]:
"""获取聊天信息"""
return {"name": "...", "type": "dm"}
6.2 实现消息归一化
将平台特定的消息格式转换为 MessageEvent:
def _normalize_message(self, raw_msg) -> MessageEvent:
text = raw_msg.get("text", "")
msg_type = self._map_message_type(raw_msg.get("type"))
source = self.build_source(
chat_id=raw_msg["chat_id"],
user_id=raw_msg["user_id"],
chat_type="group" if raw_msg.get("is_group") else "dm",
)
return MessageEvent(
text=text,
message_type=msg_type,
source=source,
message_id=raw_msg["id"],
raw_message=raw_msg,
)
6.3 注册配置
在 gateway/config.py 的 Platform 枚举中添加新平台标识:
class Platform(str, Enum):
FEISHU = "feishu"
WEIXIN = "weixin"
TELEGRAM = "telegram"
MY_PLATFORM = "my_platform" # 新增
6.4 特性矩阵
不同平台支持的功能存在差异,以下是 Hermes 已有适配器的特性矩阵:
| 特性 | Feishu | Weixin | API Server | Telegram | Discord |
|---|---|---|---|---|---|
| 连接方式 | WebSocket/Webhook | Long-Poll | HTTP Server | Long-Poll/Webhook | WebSocket |
| 文本消息 | Yes | Yes | Yes | Yes | Yes |
| 富文本 | Post (JSON) | 纯文本 | Markdown | MarkdownV2 | Discord MD |
| 图片收发 | Yes | Yes | - | Yes | Yes |
| 文件上传 | Yes | Yes (CDN) | - | Yes | Yes |
| 语音消息 | Yes | Yes | - | Yes | - |
| 编辑消息 | Yes | - | - | Yes | Yes |
| 输入指示 | - | Yes | - | Yes | Yes |
| @触发 | Yes | - | - | Yes | Yes |
| 交互卡片 | Yes | - | - | - | Buttons |
| 消息去重 | Yes (持久化) | Yes (TTL) | - | Yes | Yes |
| 群聊权限 | 规则引擎 | 策略配置 | API Key | - | - |
| 代理支持 | - | - | - | Yes | Yes |
思考题
-
设计决策:为什么
BasePlatformAdapter选择用抽象基类(ABC)而非 Protocol(结构化子类型)?如果改用 Protocol,会有什么影响? -
消息归一化:飞书的 Post 富文本消息是一个深度嵌套的 JSON 结构。
normalize_feishu_message()通过递归解析将其转换为 Markdown 文本。考虑一下,这种"先转为 Markdown 再让 Agent 处理"的策略有什么优势和潜在的信息损失? -
重试策略:基类的
_send_with_retry()对"超时错误"和"连接错误"采取不同策略——前者不重试(因为消息可能已经送达),后者重试。请思考:在什么场景下这个策略会导致问题?你能想到更好的方案吗? -
平台锁:飞书适配器使用
acquire_scoped_lock()防止同一个app_id被多个 gateway 进程同时使用。如果不加这个锁,在实际部署中会发生什么问题? -
架构扩展:如果你要为 Hermes 接入 Slack 平台,Slack 的 Socket Mode 使用 WebSocket 连接、消息格式是 JSON block kit。请设计
SlackAdapter的消息归一化方案,思考哪些 Slack 消息类型需要特殊处理。