0%

【长文】港大新开源的OpenHarness,到底做了啥?

扒一扒港大最近开源的OpenHarness。

前言

按照项目仓库的介绍,OpenHarness 是一个面向开源社区的 Agent Harness。它提供轻量、可扩展、可检查的 Agent 基础设施,包括:
• Agent loop
• tools / skills / plugins
• memory / session resume
• permissions / hooks
• multi-agent coordination
• provider workflows
• React TUI
• ohmo personal-agent app

截至目前,该仓库已有 9k star。本文将尝试挖掘与解读该项目的技术架构,提取其中对以后的项目有借鉴意义、可复用的部分。在详细分析之前,我们抛出一个问题:一个 harness 框架需要包含哪几点核心内容? 在文章的最后,我们将尝试回答这个问题。

Agent Loop

Agent Loop 是所有 Harness 框架最核心的部分,是驱动 AI 持续思考、调用工具、处理结果的心脏。

OpenHarness 的 Agent Loop 主要由 9 个文件构成,包括:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
engine/
├── query_engine.py ← 外壳:QueryEngine 类,管理会话历史
├── query.py ← 内核:run_query() 主循环 + _execute_tool_call()
├── messages.py ← 数据结构:ConversationMessage / ToolUseBlock / ToolResultBlock
├── stream_events.py ← 事件总线:7 种 StreamEvent 类型
└── cost_tracker.py ← 用量统计:跨 Turn 累加 token

api/
├── client.py ← LLM 接入:流式调用 + 指数退避重试
└── usage.py ← 用量快照:UsageSnapshot

tools/
└── base.py ← 工具抽象:BaseTool / ToolRegistry / ToolExecutionContext

services/compact/
└── __init__.py ← 上下文压缩:auto_compact_if_needed()

其中query.py整个框架唯一真正的循环驱动器,扮演最重要的角色。我们先来分析一下它的实现。

query

run_query 主循环

循环引擎的入口是 run_query() 函数,它接收一个 QueryContext 配置容器和一个 ConversationMessage 对象的列表作为输入。

run_query 入参

其中,QueryContext 是不可变的配置容器,在整个 run_query 生命周期内只读传入,唯一的可变状态是 tool_metadata 这个字典(通过引用共享)。其中包含以下字段:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@dataclass
class QueryContext:
api_client: SupportsStreamingMessages # LLM 接口
tool_registry: ToolRegistry # 工具注册表
permission_checker: PermissionChecker # 权限检查器
cwd: Path # 工作目录
model: str # 模型名
system_prompt: str # 系统提示词
max_tokens: int # 单次最大 token
context_window_tokens: int | None # 上下文窗口大小
auto_compact_threshold_tokens: int|None # 自动压缩阈值
permission_prompt: PermissionPrompt # 用户确认回调
ask_user_prompt: AskUserPrompt # 用户输入回调
max_turns: int | None = 200 # 最大轮次(默认200)
hook_executor: HookExecutor | None # 钩子执行器
tool_metadata: dict[str, object] | None # 跨轮次记忆字典

ConversationMessage 对象定义在 messages.py 中,是对话历史中的一条消息,基于 Pydantic BaseModel。其定义为:

1
2
3
class ConversationMessage(BaseModel):
role: Literal["user", "assistant"] # 消息角色,只能是这两种
content: list[ContentBlock] # 消息内容,是一个 Block 列表

content 里的每个 ContentBlock 是以下四种类型之一(用 type 字段区分):

  • TextBlock:文本块
  • ImageBlock:图片块
  • ToolUseBlock:模型发起的工具调用请求
  • ToolResultBlock:工具执行结果,回传给模型

ConversationMessage 就是 Agent Loop 中流转的最小数据单元。整个 run_query()messages: list[ConversationMessage] 参数,就是一个不断追加新消息的对话历史列表,每一轮循环都会往里 append 新的 assistant 消息和 tool result 消息。

run_query 循环骨架

为了精简代码与逻辑,我提取了几个循环的关键步骤,run_query() 的循环骨架如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
while context.max_turns is None or turn_count < context.max_turns:
turn_count += 1

# 1. 压缩检查(每轮开始前)
async for event, usage in _stream_compaction(trigger="auto"):
yield event, usage

# 2. 调用 LLM
async for event in context.api_client.stream_message(...):
...

# 3. 处理 LLM 响应
messages.append(final_message)
yield AssistantTurnComplete(...)

# 4. 无工具调用 → 退出循环
if not final_message.tool_uses:
return

# 5. 执行工具(单个串行 / 多个并发)
...

# 6. 将工具结果追加回 messages
messages.append(ConversationMessage(role="user", content=tool_results))

其终止条件有三个:

  • LLM 不再请求工具,return(正常结束)
  • turn_count >= max_turns,抛出 MaxTurnsExceeded
  • API 报错且无法恢复,yield ErrorEvent + return

整体来说,run_query() 是一个 异步生成器(AsyncIterator),每一轮循环代表 Agent 的一个"思考-行动"周期。下面是对每一个阶段的详细解释。

阶段 1:Auto-Compact(每轮开始前)

1
2
3
async for event, usage in _stream_compaction(trigger="auto"):
yield event, usage
messages, was_compacted = last_compaction_result
  • 每轮循环开始前都会检查 token 是否超过 auto_compact_threshold_tokens
  • 压缩策略分两步:先做廉价的 microcompact(清空旧 ToolResult 内容),不够再做 LLM 全量摘要
  • 压缩过程中 yield CompactProgressEvent 给上层展示进度

阶段 2:调用 LLM(流式)

1
2
3
4
5
6
async for event in context.api_client.stream_message(ApiMessageRequest(...)):
if isinstance(event, ApiTextDeltaEvent):
yield AssistantTextDelta(text=event.text), None # 实时流式文字
if isinstance(event, ApiMessageCompleteEvent):
final_message = event.message # 完整消息
usage = event.usage
  • 以流式方式调用 API,边生成边 yield 文字 delta 给上层(TUI 实时显示)
  • 最终拿到完整的 final_message(一个 ConversationMessage

阶段 3:异常处理 — Reactive Compact

1
2
3
4
5
6
7
except Exception as exc:
if not reactive_compact_attempted and _is_prompt_too_long_error(exc):
# 强制压缩后 continue 重试本轮
async for event, usage in _stream_compaction(trigger="reactive", force=True):
yield event, usage
if was_compacted:
continue
  • 如果 API 报 context too long 类错误,触发一次被动强制压缩,然后 continue 重试当前轮
  • 只尝试一次(reactive_compact_attempted 标志位防止死循环)

阶段 4:消息追加 & 判断是否继续

1
2
3
4
5
messages.append(final_message)
yield AssistantTurnComplete(message=final_message, usage=usage), usage

if not final_message.tool_uses:
return # 模型没有调用工具 → 对话结束
  • assistant 消息追加到历史
  • 没有工具调用 → 正常结束,return

阶段 5:工具执行

1
2
3
4
5
6
if len(tool_calls) == 1:
# 顺序执行,立即 yield 事件
result = await _execute_tool_call(...)
else:
# 并发执行
results = await asyncio.gather(*[_run(tc) for tc in tool_calls])
  • 如果是单个工具调用,立即执行, 事件实时 yield
  • 如果是多个,用 asyncio.gather 并发执行,全部完成后统一 yield

阶段 6:工具结果回写 → 进入下一轮

1
2
messages.append(ConversationMessage(role="user", content=tool_results))
# 回到 while 循环顶部
  • 把所有 ToolResultBlock 包装成一条 role="user" 的消息追加到历史
  • 模型下一轮会看到工具结果,继续推理

下图为其完整的流程:

工具执行 _execute_tool_call( )

_execute_tool_call()run_query() 主循环中单次工具调用的完整生命周期管理器,其在 阶段 5:工具执行 阶段起作用。它工作在 LLM 输出完成之后、下一轮 LLM 调用之前 的这个窗口期,负责把模型请求的工具调用真正落地执行,并把结果写回 messages,驱动下一轮推理。

其函数签名为:

1
2
3
4
5
6
async def _execute_tool_call(
context: QueryContext,
tool_name: str,
tool_use_id: str,
tool_input: dict[str, object],
) -> ToolResultBlock:
  • 输入:工具名、工具调用 ID、工具参数
  • 输出:一个 ToolResultBlock(无论成功还是失败,永远返回结果,不抛异常)

其内部流程为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
1. Pre-Hook 检查

2. 查找工具(tool_registry.get)

3. 校验输入(input_model.model_validate)

4. 权限检查(permission_checker.evaluate)
├── 需要确认 → 弹出 permission_prompt 询问用户
└── 直接拒绝 → 返回 is_error=True

5. 工具执行 tool.execute(parsed_input, ToolExecutionContext)

6. 状态记忆更新 _record_tool_carryover(更新 tool_metadata 状态记忆)

7. Post-Hook 执行

8. 返回 ToolResultBlock
压缩机制 _stream_compaction()

这是一个嵌套的异步生成器,工作在每轮循环的入口处,设计非常精妙:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
async def _stream_compaction(*, trigger, force=False):
progress_queue = asyncio.Queue()

# 把压缩任务扔到后台 Task
task = asyncio.create_task(auto_compact_if_needed(...))

# 前台不断轮询 progress_queue,把进度事件 yield 出去
while True:
try:
event = await asyncio.wait_for(progress_queue.get(), timeout=0.05)
yield event, None
except asyncio.TimeoutError:
if task.done():
break
# 等待 Task 完成,取结果
last_compaction_result = await task

另外,_stream_compaction() 还有一个被动触发的时机,在 API 调用异常时,会触发压缩机制。

记忆系统 tool_metadata

记忆系统工作在工具执行的前后,它贯穿整个工具调用生命周期,有写入和读取两个方向。写入时机为工具执行完成后,读取时机为工具执行时注入。其结构为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
tool_metadata = {
"task_focus_state": {
"goal": str, # 当前目标
"recent_goals": list, # 最近5个目标
"active_artifacts": list, # 最近8个活跃文件/URL
"verified_state": list, # 已验证的工作成果
"next_step": str, # 下一步计划
},
"read_file_state": list, # 最近6次文件读取记录
"invoked_skills": list, # 最近8个调用的技能
"async_agent_state": list, # 最近8个异步 Agent 活动
"recent_work_log": list, # 最近10条工作日志
"recent_verified_work": list, # 最近10条已验证工作
"permission_mode": str, # "plan" | "default"
}

其通过 _append_capped_unique() 方法来更新:

1
2
3
4
5
6
def _append_capped_unique(bucket, value, *, limit):
if value in bucket:
bucket.remove(value) # 去重(移到末尾)
bucket.append(value)
if len(bucket) > limit:
del bucket[:-limit] # 超限删头部(保留最新)

这是一个有界 LRU 队列,最近使用的排在末尾,超限时删除最旧的。

同时,其通过 _record_tool_carryover() 来调度,每次工具执行成功后,根据 tool_name 分发到不同的记忆桶。这些记忆会通过 ToolExecutionContext.metadata 传入每个工具的 execute() 方法,让工具能感知之前做过什么

query的外壳层QueryEngine

前面我们深入分析了 query.py 中的循环内核,但 run_query() 本身不持有会话历史,它只接收一个 messages 列表并在上面操作。那么,谁来管理跨轮次的会话状态?答案是 query_engine.py 中的 QueryEngine 类。

QueryEnginerun_query()直接调用者,也是外部世界(TUI、API 服务等)与 Agent Loop 交互的唯一入口。它的核心职责可以概括为三件事:管理会话历史累计用量统计组装并调用内核

其工作流程为:

核心状态

QueryEngine__init__ 中持有以下关键状态:

1
2
3
self._messages: list[ConversationMessage] = []   # 完整会话历史
self._cost_tracker = CostTracker() # 跨 Turn 用量累加器
self._tool_metadata = tool_metadata or {} # 跨轮次记忆字典(引用共享给 run_query)

其中,所有历史消息都存在 _messages_tool_metadata 通过引用传递给 QueryContext,因此 run_query() 内部对它的修改会直接反映到 QueryEngine 上。

对外入口 submit_message()

submit_message() 是用户发送消息的入口,其流程为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
async def submit_message(self, prompt: str | ConversationMessage) -> AsyncIterator[StreamEvent]:
# 1. 构造用户消息并追加到历史
user_message = ConversationMessage.from_user_text(prompt) if isinstance(prompt, str) else prompt
remember_user_goal(self._tool_metadata, user_message.text)
self._messages.append(user_message)

# 2. 组装 QueryContext(把自身配置打包成不可变容器)
context = QueryContext(
api_client=self._api_client,
tool_registry=self._tool_registry,
...
tool_metadata=self._tool_metadata,
)

# 3. 复制一份 messages 传给 run_query(避免内核直接操作原始列表)
query_messages = list(self._messages)

# 4. 如果是 Coordinator 角色,注入 Coordinator 上下文消息
coordinator_context = self._build_coordinator_context_message()
if coordinator_context is not None:
query_messages.append(coordinator_context)

# 5. 调用内核循环,转发事件流
async for event, usage in run_query(context, query_messages):
if isinstance(event, AssistantTurnComplete):
self._messages = list(query_messages) # 同步回写历史
if usage is not None:
self._cost_tracker.add(usage) # 累加用量
yield event

这里有几个值得注意的设计:

  • 调用 run_query() 前会先 list(self._messages) 做一次浅拷贝,内核在副本上操作,只有在收到 AssistantTurnComplete 事件时才同步回写,避免中途异常导致历史被污染
  • 在进入循环前调用 remember_user_goal() 将用户意图写入 tool_metadata,让记忆系统在第一轮工具执行时就能感知到当前目标
  • 对于 Coordinator 角色,会通过 _build_coordinator_context_message() 构造一条携带 worker 工具上下文的合成消息,追加到 query_messages 末尾

中断恢复 continue_pending()

除了 submit_message()QueryEngine 还提供了 continue_pending() 方法,用于恢复被中断的工具循环

1
2
3
4
async def continue_pending(self, *, max_turns: int | None = None) -> AsyncIterator[StreamEvent]:
context = QueryContext(...)
async for event, usage in run_query(context, self._messages):
...

submit_message() 的区别在于:它不追加新的用户消息,直接把当前 _messages(末尾是上次未处理完的 ToolResultBlock)传给 run_query(),让模型从中断处继续推理。

配合 has_pending_continuation() 方法使用,该方法检查对话是否在工具结果处中断:

1
2
3
4
def has_pending_continuation(self) -> bool:
# 最后一条消息是 user 且包含 ToolResultBlock
# 且倒数第二条 assistant 消息有 tool_uses
...

QueryEngine 与 run_query 的职责边界

QueryEngine run_query()
定位 对外 API 层 内核循环层
管理 会话历史 _messages、用量累计 CostTracker while 循环、工具调度
感知 知道这是第几次对话、总共花了多少 token 只知道这一次查询
输入 用户的 strConversationMessage 已组装好的 messages 列表
输出 聚合后的 StreamEvent 原始 (StreamEvent, UsageSnapshot) 元组流
可变状态 _messages_cost_tracker_tool_metadata 仅操作传入的 messages 副本

简单来说,QueryEngine有状态的会话管理器run_query()无状态的循环引擎。前者负责记住过去,后者负责执行当下。

Tools 工具体系

在 Agent Loop 章节中,我们看到 _execute_tool_call() 通过 tool_registry.get(tool_name) 查找工具、调用 tool.execute() 执行。但工具本身是如何定义、注册和组织的?这一章来拆解 OpenHarness 的工具体系。

OpenHarness 的工具体系由三层构成:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
tools/
├── base.py ← 抽象层:BaseTool / ToolRegistry / ToolResult
├── bash_tool.py ← 内置工具(37 个)
├── file_edit_tool.py
├── grep_tool.py
├── mcp_tool.py ← MCP 适配器:把外部 MCP 工具包装成 BaseTool
├── agent_tool.py ← 子 Agent 派生工具
├── skill_tool.py ← Skill 查询工具(桥接 skills/ 模块)
├── tool_search_tool.py ← 工具自省工具
└── ...

skills/
├── types.py ← SkillDefinition 数据结构
├── registry.py ← SkillRegistry
├── loader.py ← 三源加载:bundled / user / plugin
└── bundled/content/ ← 7 个内置 Skill(.md 文件)

plugins/
├── schemas.py ← PluginManifest(plugin.json 的 schema)
├── types.py ← LoadedPlugin 数据结构
└── loader.py ← 插件发现、加载、解析

抽象层:BaseTool 与 ToolRegistry

BaseTool

所有工具的基类定义在 tools/base.py 中,非常精简:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
class BaseTool(ABC):
name: str # 工具名,如 "bash"、"read_file"
description: str # 工具描述,会被注入 LLM 的 tool schema
input_model: type[BaseModel] # Pydantic 模型,定义输入参数

@abstractmethod
async def execute(self, arguments: BaseModel, context: ToolExecutionContext) -> ToolResult:
"""Execute the tool."""

def is_read_only(self, arguments: BaseModel) -> bool:
"""Return whether the invocation is read-only."""
return False

def to_api_schema(self) -> dict[str, Any]:
"""Return the tool schema expected by the Anthropic Messages API."""
return {
"name": self.name,
"description": self.description,
"input_schema": self.input_model.model_json_schema(),
}

几个设计要点:

  • input_model 是一个 Pydantic BaseModel 子类,_execute_tool_call() 在调用 execute() 前会先用 input_model.model_validate(tool_input) 做参数校验,校验失败直接返回 is_error=TrueToolResultBlock,不会进入工具执行
  • is_read_only() 用于权限检查。在 default 模式下,只读工具(如 read_filegrep)直接放行,只有写操作才需要用户确认
  • to_api_schema() 把工具转换为 LLM API 需要的 JSON Schema 格式,input_model.model_json_schema() 自动从 Pydantic 模型生成

工具执行的上下文和返回值也定义在同一文件中:

1
2
3
4
5
6
7
8
9
10
@dataclass
class ToolExecutionContext:
cwd: Path # 当前工作目录
metadata: dict[str, Any] = field(...) # 跨轮次记忆 + 运行时信息

@dataclass(frozen=True)
class ToolResult:
output: str # 输出文本
is_error: bool = False # 是否为错误
metadata: dict[str, Any] = field(...) # 附加元数据(如 returncode)

ToolExecutionContext.metadata 就是上一章提到的 tool_metadata 记忆字典,通过引用传入每个工具的 execute() 方法。

ToolRegistry

工具注册表是一个简单的 dict[str, BaseTool] 包装:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class ToolRegistry:
def __init__(self) -> None:
self._tools: dict[str, BaseTool] = {}

def register(self, tool: BaseTool) -> None:
self._tools[tool.name] = tool

def get(self, name: str) -> BaseTool | None:
return self._tools.get(name)

def list_tools(self) -> list[BaseTool]:
return list(self._tools.values())

def to_api_schema(self) -> list[dict[str, Any]]:
return [tool.to_api_schema() for tool in self._tools.values()]

to_api_schema() 会在每次调用 LLM 时被使用,把所有已注册工具的 schema 一次性传给 API,让模型知道自己可以调用哪些工具。

内置工具

OpenHarness 内置了 37 个工具,在 tools/__init__.pycreate_default_tool_registry() 函数中统一注册。这些工具可以按功能分为几类:

类别 工具 说明
Shell bash 执行 shell 命令,捕获 stdout/stderr
文件操作 read_filefile_writeedit_filenotebook_edit 读、写、编辑文件
搜索 grepglobtool_searchweb_search 正则搜索、文件匹配、工具自省、网页搜索
交互 ask_user_question 向用户提问
Agent agentsend_message 派生子 Agent、跨 Agent 通信
Team team_createteam_delete 创建/删除 Agent 团队
Task task_createtask_gettask_listtask_stoptask_outputtask_update 后台任务管理
Cron cron_createcron_listcron_deletecron_toggle 定时任务管理
模式切换 enter_plan_modeexit_plan_modeenter_worktreeexit_worktree 切换权限模式 / Git worktree
MCP mcp_authlist_mcp_resourcesread_mcp_resource MCP 服务认证与资源访问
其他 skillconfigbriefsleeptodo_writeweb_fetchlspremote_trigger 技能查询、配置、LSP 等

下面挑几个典型工具分析其实现模式。

BashTool — 最典型的工具实现

BashTool 是最能体现工具实现模式的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
class BashToolInput(BaseModel):
command: str = Field(description="Shell command to execute")
cwd: str | None = Field(default=None, description="Working directory override")
timeout_seconds: int = Field(default=600, ge=1, le=600)

class BashTool(BaseTool):
name = "bash"
description = "Run a shell command in the local repository."
input_model = BashToolInput

async def execute(self, arguments: BashToolInput, context: ToolExecutionContext) -> ToolResult:
cwd = Path(arguments.cwd).expanduser() if arguments.cwd else context.cwd
process = await create_shell_subprocess(arguments.command, cwd=cwd, ...)

try:
stdout, stderr = await asyncio.wait_for(
process.communicate(), timeout=arguments.timeout_seconds,
)
except asyncio.TimeoutError:
await _terminate_process(process, force=True)
return ToolResult(output=f"Command timed out after {arguments.timeout_seconds} seconds", is_error=True)

# ... merge stdout + stderr, truncate to 12000 chars ...
return ToolResult(output=text, is_error=process.returncode != 0, metadata={"returncode": process.returncode})

几个值得注意的防御性设计:

  • 超时保护:通过 asyncio.wait_for + timeout_seconds 参数(默认 600 秒),超时后强制 kill 进程
  • 输出截断:超过 12000 字符时截断并追加 ...[truncated]...,防止巨量输出撑爆上下文
  • 取消安全:捕获 asyncio.CancelledError,确保进程被正确终止后再 re-raise
  • 错误判定:通过 process.returncode != 0 自动标记 is_error

GrepTool — 双引擎降级策略

GrepTool 展示了一种有趣的双引擎降级模式:

1
2
3
4
5
6
7
8
async def execute(self, arguments, context):
# 优先使用 ripgrep(性能好)
matches = await _rg_grep(root=root, pattern=arguments.pattern, ...)
if matches is not None:
return _format_rg_result(matches, arguments.timeout_seconds)

# ripgrep 不可用时,降级到纯 Python 实现
return ToolResult(output=_python_grep_files(paths=root.glob(arguments.file_glob), ...))

它先尝试调用系统安装的 ripgrep(通过 shutil.which("rg") 检测),如果不可用则降级到纯 Python 的逐文件正则匹配。这保证了工具在任何环境下都能工作,同时在有 ripgrep 的环境下获得最佳性能。

FileEditTool — 沙箱路径校验

FileEditTool 展示了沙箱安全机制:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
async def execute(self, arguments, context):
path = _resolve_path(context.cwd, arguments.path)

# 如果在 Docker 沙箱中运行,校验路径是否在允许范围内
if is_docker_sandbox_active():
allowed, reason = validate_sandbox_path(path, context.cwd)
if not allowed:
return ToolResult(output=f"Sandbox: {reason}", is_error=True)

if arguments.old_str not in original:
return ToolResult(output="old_str was not found in the file", is_error=True)

updated = original.replace(arguments.old_str, arguments.new_str, 1)
path.write_text(updated, encoding="utf-8")
return ToolResult(output=f"Updated {path}")

当 OpenHarness 运行在 Docker 沙箱模式下时,所有文件操作工具都会先通过 validate_sandbox_path() 校验路径是否在允许范围内,防止 LLM 通过工具逃逸到宿主机文件系统。

MCP 工具适配器

OpenHarness 通过 McpToolAdapter 把外部 MCP(Model Context Protocol)服务暴露的工具包装成标准的 BaseTool

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class McpToolAdapter(BaseTool):
def __init__(self, manager: McpClientManager, tool_info: McpToolInfo) -> None:
self._manager = manager
self._tool_info = tool_info
# 命名规则:mcp__{server}__{tool}
self.name = f"mcp__{server_segment}__{tool_segment}"
self.description = tool_info.description or f"MCP tool {tool_info.name}"
# 从 JSON Schema 动态生成 Pydantic 模型
self.input_model = _input_model_from_schema(self.name, tool_info.input_schema)

async def execute(self, arguments, context):
output = await self._manager.call_tool(
self._tool_info.server_name,
self._tool_info.name,
arguments.model_dump(mode="json", exclude_none=True),
)
return ToolResult(output=output)

关键设计:

  • 动态模型生成_input_model_from_schema() 根据 MCP 工具的 JSON Schema 动态创建 Pydantic 模型,使得外部工具也能享受参数校验
  • 命名隔离:使用 mcp__{server}__{tool} 的命名规则,避免与内置工具冲突
  • 注册时机:在 create_default_tool_registry() 中,如果传入了 mcp_manager,会遍历所有 MCP 工具并逐一注册

Skills 技能系统

Skills 和 Tools 是两个不同层次的概念。Tool 是可执行的函数(如 bashread_file),而 Skill 是一段预定义的 Markdown 文本(如 “如何做 code review”、“如何写测试”),通过 skill 工具读取后注入 LLM 上下文,指导模型的行为模式。

SkillDefinition

Skill 的数据结构非常简单:

1
2
3
4
5
6
7
@dataclass(frozen=True)
class SkillDefinition:
name: str # 技能名,如 "review"、"debug"
description: str # 简短描述
content: str # 完整的 Markdown 内容
source: str # 来源:"bundled" / "user" / "plugin"
path: str | None # 文件路径

三源加载

load_skill_registry() 从三个来源加载技能,按优先级依次注册:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def load_skill_registry(cwd, *, extra_skill_dirs, extra_plugin_roots, settings):
registry = SkillRegistry()
# 1. 内置技能(bundled/content/*.md)
for skill in get_bundled_skills():
registry.register(skill)
# 2. 用户自定义技能(~/.openharness/skills/<name>/SKILL.md)
for skill in load_user_skills():
registry.register(skill)
# 3. 插件贡献的技能
for plugin in load_plugins(settings, cwd, extra_roots=extra_plugin_roots):
if plugin.enabled:
for skill in plugin.skills:
registry.register(skill)
return registry

OpenHarness 内置了 7 个技能:commitdebugdiagnoseplanreviewsimplifytest,都是 Markdown 文件,存放在 skills/bundled/content/ 目录下。

用户可以在 ~/.openharness/skills/ 下创建自定义技能,目录结构为 <skill-name>/SKILL.md,支持 YAML frontmatter 定义 namedescription

SkillTool — 桥接工具

SkillTool 是 Tools 和 Skills 之间的桥梁。当 LLM 调用 skill 工具时,它会从 SkillRegistry 中查找对应的技能并返回其 Markdown 内容:

1
2
3
4
5
6
7
8
9
10
class SkillTool(BaseTool):
name = "skill"
description = "Read a bundled, user, or plugin skill by name."

async def execute(self, arguments, context):
registry = load_skill_registry(context.cwd, ...)
skill = registry.get(arguments.name)
if skill is None:
return ToolResult(output=f"Skill not found: {arguments.name}", is_error=True)
return ToolResult(output=skill.content)

这意味着 LLM 可以在需要时主动"查阅手册"——比如在做 code review 前先调用 skill("review") 获取 review 指南,然后按照指南执行。

Plugins 插件系统

Plugins 是 OpenHarness 的最外层扩展机制,一个插件可以同时贡献 Skills、Commands、Agents、Hooks 和 MCP 服务。

插件结构

一个标准的插件目录结构为:

1
2
3
4
5
6
7
8
9
10
11
my-plugin/
├── plugin.json ← 插件清单(PluginManifest)
├── skills/ ← 贡献的技能
│ └── my-skill/
│ └── SKILL.md
├── commands/ ← 贡献的斜杠命令
│ └── my-command.md
├── agents/ ← 贡献的 Agent 定义
│ └── my-agent.md
├── hooks.json ← 贡献的钩子
└── mcp.json ← 贡献的 MCP 服务配置

其中 plugin.json 是插件的入口清单:

1
2
3
4
5
6
7
8
9
10
class PluginManifest(BaseModel):
name: str # 插件名
version: str = "0.0.0" # 版本
description: str = "" # 描述
enabled_by_default: bool = True # 是否默认启用
skills_dir: str = "skills" # 技能目录
hooks_file: str = "hooks.json" # 钩子配置文件
mcp_file: str = "mcp.json" # MCP 配置文件
commands: str | list | dict | None = None # 命令定义
agents: str | list | None = None # Agent 定义

插件发现与加载

load_plugins() 从两个位置发现插件:

1
2
3
4
5
6
7
def discover_plugin_paths(cwd, extra_roots):
roots = [
get_user_plugins_dir(), # ~/.openharness/plugins/
get_project_plugins_dir(cwd), # <project>/.openharness/plugins/
]
# + extra_roots(额外指定的目录)
...

加载过程会解析 plugin.json,然后分别加载插件贡献的各类资源:

1
2
3
4
5
6
7
8
9
10
11
12
13
def load_plugin(path, enabled_plugins):
manifest = PluginManifest.model_validate_json(manifest_path.read_text(...))
enabled = enabled_plugins.get(manifest.name, manifest.enabled_by_default)

skills = _load_plugin_skills(path / manifest.skills_dir)
commands = _load_plugin_commands(path, manifest)
agents = _load_plugin_agents(path, manifest)
hooks = _load_plugin_hooks(path / manifest.hooks_file)
mcp = _load_plugin_mcp(path / manifest.mcp_file)

return LoadedPlugin(manifest=manifest, path=path, enabled=enabled,
skills=skills, commands=commands, agents=agents,
hooks=hooks, mcp_servers=mcp)

最终,LoadedPlugin 是一个聚合了所有贡献物的数据容器:

1
2
3
4
5
6
7
8
9
10
@dataclass(frozen=True)
class LoadedPlugin:
manifest: PluginManifest
path: Path
enabled: bool
skills: list[SkillDefinition] # 贡献的技能
commands: list[PluginCommandDefinition] # 贡献的斜杠命令
agents: list[AgentDefinition] # 贡献的 Agent 定义
hooks: dict[str, list] # 贡献的钩子
mcp_servers: dict[str, McpServerConfig] # 贡献的 MCP 服务

Memory / Session Resume

Agent 的一次对话可能持续数十轮,上下文窗口终究有限;而用户关掉终端后,下次还想从断点继续。OpenHarness 围绕这两个核心矛盾,构建了一套三层记忆体系:运行时记忆(tool_metadata)、上下文压缩(Compact)、会话持久化(Session Storage)。三者分别解决"当前轮次记住什么"、“上下文快满了怎么办”、"关掉再打开怎么恢复"的问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
memory/
├── paths.py ← 持久化记忆目录路径
├── memdir.py ← MEMORY.md 提示词注入
├── manager.py ← 记忆文件 CRUD
├── scan.py ← 记忆文件扫描与元数据解析
├── search.py ← 启发式记忆检索
└── types.py ← MemoryHeader 数据结构

services/
├── compact/__init__.py ← 上下文压缩:microcompact + full LLM summarization
├── session_storage.py ← 会话快照持久化
└── session_backend.py ← 会话后端抽象接口

personalization/
├── extractor.py ← 正则模式提取环境事实
├── rules.py ← 本地规则文件管理
└── session_hook.py ← 会话结束时的记忆提取钩子

第一层:运行时记忆 tool_metadata

在 Agent Loop 章节中我们已经介绍过 tool_metadata 的结构和写入机制。这里从更宏观的视角审视它在记忆体系中的定位。

tool_metadata 是一个纯内存字典,生命周期等于一次 QueryEngine 实例的存活期。它通过引用共享给 QueryContext,在 run_query() 内核中被 _record_tool_carryover() 持续更新,又通过 ToolExecutionContext.metadata 注入每个工具的 execute() 方法。

其核心设计思想是有界 LRU:每个记忆桶都有容量上限(如 read_file_state 最多 6 条、recent_work_log 最多 10 条),超限时删除最旧的条目。这保证了记忆的时效性——Agent 总是记住最近做过什么,而不是被远古历史淹没。

tool_metadata 有一个天然局限:它只存在于内存中,进程退出即消失。为了跨会话保留关键状态,OpenHarness 在会话持久化时会选择性序列化其中的部分字段:

1
2
3
4
5
6
7
8
9
10
11
_PERSISTED_TOOL_METADATA_KEYS = (
"permission_mode",
"read_file_state",
"invoked_skills",
"async_agent_state",
"recent_work_log",
"recent_verified_work",
"task_focus_state",
"compact_checkpoints",
"compact_last",
)

只有这 9 个键会被写入磁盘。像 permission_mode(当前权限模式)、task_focus_state(当前工作焦点)这类对恢复上下文至关重要的字段被保留,而临时性的中间状态则被丢弃。

第二层:上下文压缩 Compact

当对话历史的 token 数逼近上下文窗口时,必须在"丢失信息"和"无法继续"之间做出取舍。OpenHarness 的压缩系统采用渐进式四级降级策略,从最廉价的操作开始,逐级升级到最昂贵的 LLM 摘要:

第 1 级:Microcompact

1
2
3
4
def microcompact_messages(messages, *, keep_recent=5):
# 找到所有可压缩工具(read_file, bash, grep 等)的 ToolResultBlock
# 保留最近 5 个,其余替换为 "[Old tool result content cleared]"
...

Microcompact 是零 LLM 调用的纯规则操作。它只清空旧的 ToolResultBlock 内容,保留最近 5 个工具结果不动。这是因为旧的 bash 输出、grep 结果等在后续推理中价值极低,清掉它们可以廉价地释放大量 token。

可压缩的工具类型被硬编码在 COMPACTABLE_TOOLS 集合中:

1
2
3
4
COMPACTABLE_TOOLS = frozenset({
"read_file", "bash", "grep", "glob",
"web_search", "web_fetch", "edit_file", "write_file",
})

第 2 级:Context Collapse

1
2
3
4
def try_context_collapse(messages, *, preserve_recent):
# 对较旧的消息中的超长文本块做头尾截断
# 保留前 900 字符 + 后 500 字符,中间用 "...[collapsed N chars]..." 替代
...

如果 Microcompact 后 token 仍然超标,进入 Context Collapse。这一步对较旧消息中的超长 TextBlock 做确定性截断:保留头部 900 字符和尾部 500 字符,中间折叠。同样不需要 LLM 调用。

第 3 级:Session Memory

1
2
3
4
def try_session_memory_compaction(messages, *, preserve_recent=12, trigger, metadata):
# 把较旧的消息压缩为一条 "Session memory summary" 消息
# 每条消息只保留 160 字符的摘要
...

Session Memory 是一种确定性摘要:遍历较旧的消息,每条提取 160 字符的文本摘要(或工具调用名称),拼接成一条合成的 "Session memory summary" 消息。总量限制在 48 行 / 4000 字符以内。这一步仍然不调用 LLM,但信息损失比前两级更大。

第 4 级:Full Compact(LLM 摘要)

当前三级都不够时,触发完整的 LLM 摘要。其流程为:

1
2
3
4
5
6
7
8
9
10
11
1. Microcompact(先做一轮廉价清理)

2. 分割:older(待摘要)+ newer(保留最近 6 条)

3. 构造 compact prompt,发送 older + prompt 给 LLM

4. LLM 返回 <analysis> + <summary> 结构化摘要

5. 提取 <summary>,丢弃 <analysis>(思考过程)

6. 重建消息列表:boundary_marker + summary + newer + attachments

Compact prompt 要求 LLM 输出两个 XML 块:<analysis> 是思考过程(会被丢弃),<summary> 是结构化摘要(会被保留)。摘要必须包含 9 个章节:用户请求、技术概念、文件与代码、错误与修复、问题解决、用户原始消息、待办任务、当前工作、建议下一步。

Compact Attachments

Full Compact 的一个精妙设计是 Compact Attachments——在压缩时从 tool_metadata 中提取关键状态,作为独立的"附件消息"注入到压缩后的消息列表中:

1
2
3
4
5
6
7
8
9
10
11
def _build_compact_attachments(messages, *, metadata):
return [
create_task_focus_attachment_if_needed(metadata), # 当前工作焦点
create_recent_verified_work_attachment_if_needed(...), # 已验证的工作
_create_recent_attachments_attachment_if_needed(...), # 本地附件路径
create_recent_files_attachment_if_needed(...), # 最近读取的文件
create_plan_attachment_if_needed(metadata), # Plan 模式状态
create_invoked_skills_attachment_if_needed(...), # 已调用的技能
create_async_agent_attachment_if_needed(...), # 异步 Agent 状态
create_work_log_attachment_if_needed(...), # 工作日志
]

这些 Attachment 确保了压缩后模型不会"失忆"——即使旧消息被摘要替代,关键的工作状态仍然以结构化形式保留在上下文中。

触发时机

压缩有三种触发方式:

触发方式 时机 行为
auto 每轮循环开始前,_stream_compaction(trigger="auto") 检查 token 是否超过阈值,超过则自动触发
reactive API 返回 prompt too long 错误时 强制压缩后 continue 重试当前轮
manual 用户执行 /compact 命令 直接调用 compact_conversation()

Auto-compact 的阈值计算为:

1
threshold = context_window - max_output_tokens(20000) - buffer(13000)

例如对于 200K 上下文窗口的模型,阈值约为 167K token。

容错机制

压缩过程本身也可能失败(比如摘要请求也超出上下文),OpenHarness 设计了多重容错:

  • PTL Retry:如果摘要请求本身 prompt too long,调用 truncate_head_for_ptl_retry() 丢弃最旧的 1/5 对话轮次后重试,最多重试 3 次
  • 流式重试:摘要调用超时(25 秒)或异常时,最多重试 2 次
  • 连续失败上限AutoCompactState.consecutive_failures 达到 3 次后停止尝试,避免死循环
  • Passthrough 降级:如果所有压缩手段都失败,返回原始消息不做任何修改

第三层:会话持久化 Session Storage

会话持久化解决的是跨进程的记忆问题——用户关掉终端后,下次启动时能恢复到上次的对话状态。

存储结构

每个项目的会话存储在 ~/.openharness/data/sessions/<project-name>-<hash>/ 目录下:

1
2
3
def get_project_session_dir(cwd):
digest = sha1(str(Path(cwd).resolve()).encode("utf-8")).hexdigest()[:12]
return get_sessions_dir() / f"{path.name}-{digest}"

目录名由项目名 + 路径哈希前 12 位组成,保证不同项目的会话互不干扰。目录下包含:

  • latest.json:最近一次会话快照
  • session-<id>.json:按 ID 命名的历史快照

快照内容

每次保存的快照包含完整的会话恢复所需信息:

1
2
3
4
5
6
7
8
9
10
11
12
payload = {
"session_id": sid,
"cwd": str(Path(cwd).resolve()),
"model": model,
"system_prompt": system_prompt,
"messages": [message.model_dump(mode="json") for message in messages],
"usage": usage.model_dump(),
"tool_metadata": _persistable_tool_metadata(tool_metadata),
"created_at": now,
"summary": summary, # 第一条用户消息的前 80 字符
"message_count": len(messages),
}

注意 messages 是完整序列化的——每条 ConversationMessage 通过 Pydantic 的 model_dump(mode="json") 转为 JSON,包括所有的 TextBlockToolUseBlockToolResultBlock。这意味着恢复时可以精确还原对话历史,模型看到的上下文与中断前完全一致。

恢复流程

CLI 提供两种恢复方式:

1
2
3
4
5
6
7
8
# 恢复最近一次会话
openharness --continue

# 按 ID 恢复指定会话
openharness --resume <session_id>

# 交互式选择
openharness --resume

恢复的核心逻辑在 CLI 中:

1
2
3
4
5
6
7
8
9
10
11
if continue_session:
session_data = load_session_snapshot(cwd) # 加载 latest.json
elif resume:
session_data = load_session_by_id(cwd, resume) # 加载 session-<id>.json

# 传给 REPL
run_repl(
restore_messages=session_data.get("messages"),
restore_tool_metadata=session_data.get("tool_metadata"),
model=session_data.get("model") or model,
)

REPL 收到 restore_messages 后,通过 QueryEngine.load_messages() 直接替换内存中的会话历史,restore_tool_metadata 则恢复运行时记忆字典。恢复完成后,has_pending_continuation() 会检查对话是否在工具结果处中断——如果是,自动调用 continue_pending() 让模型从断点继续推理。

Session Backend 抽象

会话存储通过 SessionBackend Protocol 做了抽象:

1
2
3
4
5
6
class SessionBackend(Protocol):
def save_snapshot(self, *, cwd, model, system_prompt, messages, usage, ...) -> Path: ...
def load_latest(self, cwd) -> dict | None: ...
def list_snapshots(self, cwd, limit=20) -> list[dict]: ...
def load_by_id(self, cwd, session_id) -> dict | None: ...
def export_markdown(self, *, cwd, messages) -> Path: ...

默认实现 OpenHarnessSessionBackend 基于本地文件系统,但这个 Protocol 设计意味着可以轻松替换为数据库、云存储等后端。

持久化项目记忆 MEMORY.md

除了会话级别的持久化,OpenHarness 还提供了项目级别的持久化记忆——MEMORY.md 系统。

存储结构

每个项目的持久化记忆存储在 ~/.openharness/data/memory/<project-name>-<hash>/ 目录下:

1
2
3
def get_project_memory_dir(cwd):
digest = sha1(str(Path(cwd).resolve()).encode("utf-8")).hexdigest()[:12]
return get_data_dir() / "memory" / f"{path.name}-{digest}"

目录下包含:

  • MEMORY.md:记忆索引文件,列出所有记忆条目
  • <slug>.md:各个记忆文件,支持 YAML frontmatter

记忆管理

memory/manager.py 提供了记忆文件的 CRUD 操作:

1
2
3
4
5
6
7
8
def add_memory_entry(cwd, title, content) -> Path:
# 1. 将 title 转为 slug(如 "SSH Config" → "ssh_config")
# 2. 写入 <slug>.md
# 3. 在 MEMORY.md 中追加索引条目 "- [title](slug.md)"

def remove_memory_entry(cwd, name) -> bool:
# 1. 删除对应的 .md 文件
# 2. 从 MEMORY.md 中移除索引行

记忆注入

在每次对话开始时,load_memory_prompt() 会将 MEMORY.md 的内容注入系统提示词:

1
2
3
4
5
6
7
8
9
10
def load_memory_prompt(cwd, *, max_entrypoint_lines=200):
lines = [
"# Memory",
f"- Persistent memory directory: {memory_dir}",
"- Use this directory to store durable user or project context.",
]
if entrypoint.exists():
content_lines = entrypoint.read_text().splitlines()[:200]
lines.extend(["", "## MEMORY.md", "```md", *content_lines, "```"])
return "\n".join(lines)

这意味着 Agent 在每次对话中都能"看到"项目的持久化记忆,并且可以通过工具主动读写记忆文件。

记忆检索

memory/search.py 提供了基于关键词的启发式检索:

1
2
3
4
5
def find_relevant_memories(query, cwd, *, max_results=5):
for header in scan_memory_files(cwd):
meta_hits = sum(1 for t in tokens if t in meta) # 元数据匹配权重 2x
body_hits = sum(1 for t in tokens if t in body) # 正文匹配权重 1x
score = meta_hits * 2.0 + body_hits

检索时,元数据(标题 + 描述)的匹配权重是正文的 2 倍,确保标注良好的记忆优先浮出。支持 ASCII 词和汉字的混合分词。

个性化:会话结束时的事实提取

OpenHarness 还有一个巧妙的被动学习机制:在每次会话结束时,session_hook.py 会自动从对话历史中提取环境相关的事实。

1
2
3
4
5
6
7
def update_rules_from_session(messages):
combined = "\n".join(all_text_from_messages)
new_facts = extract_facts_from_text(combined) # 正则模式匹配
existing = load_facts()
merged = merge_facts(existing, new_facts)
save_facts(merged) # 持久化到 facts.json
save_local_rules(facts_to_rules_markdown(merged)) # 生成 rules.md

extractor.py 中定义了 10 种正则模式,用于捕获:

类型 示例
SSH 主机 ssh user@192.168.1.1
IP 地址 10.0.0.5
数据路径 /mnt/data/landing/...
Conda 环境 conda activate myenv
Python 版本 Python 3.11.4
API 端点 https://api.example.com/v1/
环境变量 export CUDA_VISIBLE_DEVICES=0,1
Git 仓库 github.com/org/repo
Ray 集群 ray start --address 10.0.0.1:6379
Cron 表达式 0 3 * * * /path/to/script

提取的事实通过 key 去重(如 ssh_host:user@192.168.1.1),新事实的 confidence 更高时覆盖旧值。最终生成的 rules.md 会被注入后续会话的系统提示词,让 Agent 自动感知用户的工作环境。

三层记忆的协作关系

层次 存储介质 生命周期 写入时机 读取时机
tool_metadata 内存字典 单次 QueryEngine 实例 每次工具执行后 每次工具执行时注入
Compact 内存(消息列表重建) 单次对话 token 超阈值时 压缩后的下一轮推理
Session Storage 磁盘 JSON 跨进程 会话结束 / 手动保存 --continue / --resume
MEMORY.md 磁盘 Markdown 跨会话(永久) Agent 主动写入 每次对话注入系统提示词
Personalization 磁盘 JSON + Markdown 跨会话(永久) 会话结束时自动提取 后续会话注入系统提示词

简单来说,tool_metadata工作记忆(Working Memory),Compact 是短期记忆的压缩策略(Short-term Memory Compression),Session Storage 是情景记忆(Episodic Memory),MEMORY.md语义记忆(Semantic Memory),Personalization 是程序性记忆(Procedural Memory)。五者协同,让 Agent 在有限的上下文窗口内尽可能多地保留有用信息,同时支持跨会话的连续性。

Permissions & Hooks

LLM 驱动的 Agent 拥有执行 shell 命令、编辑文件、发起网络请求等能力,这意味着一次错误的工具调用可能造成不可逆的后果。OpenHarness 在 Agent Loop 的工具执行路径上设置了两道关卡:Permission(权限检查)Hook(生命周期钩子)。前者是一套确定性的规则引擎,决定某个工具调用能不能跑;后者是一套可扩展的事件总线,允许外部逻辑在关键节点介入。

1
2
3
4
5
6
7
8
9
10
11
12
13
permissions/
├── __init__.py ← 延迟导入入口
├── modes.py ← PermissionMode 枚举(DEFAULT / PLAN / FULL_AUTO)
└── checker.py ← PermissionChecker 规则引擎

hooks/
├── __init__.py ← 延迟导入入口
├── events.py ← HookEvent 枚举(6 种生命周期事件)
├── schemas.py ← 4 种 Hook 定义(Command / Prompt / Http / Agent)
├── loader.py ← HookRegistry 注册表 + 加载函数
├── executor.py ← HookExecutor 执行引擎
├── types.py ← HookResult / AggregatedHookResult
└── hot_reload.py ← 配置文件变更时热重载

权限模式 PermissionMode

OpenHarness 定义了三种权限模式,通过 PermissionMode 枚举表示:

1
2
3
4
class PermissionMode(str, Enum):
DEFAULT = "default" # 只读工具自动放行,变更类工具需用户确认
PLAN = "plan" # 只读工具自动放行,变更类工具直接拒绝
FULL_AUTO = "full_auto" # 所有工具自动放行

三种模式的设计对应三种使用场景:

模式 适用场景
DEFAULT 日常交互开发
PLAN 纯规划 / 代码审查
FULL_AUTO CI/CD、沙箱环境

模式可以通过 CLI 参数 --permission-mode、配置文件 permission.mode、或运行时 /permissions 命令动态切换。--dangerously-skip-permissions 标志会强制设为 FULL_AUTO,仅建议在沙箱环境中使用。

权限配置 PermissionSettings

权限模式只是最粗粒度的控制。PermissionSettings 提供了更细粒度的规则:

1
2
3
4
5
6
class PermissionSettings(BaseModel):
mode: PermissionMode = PermissionMode.DEFAULT
allowed_tools: list[str] = [] # 白名单:无论模式如何,始终放行
denied_tools: list[str] = [] # 黑名单:无论模式如何,始终拒绝
path_rules: list[PathRuleConfig] = [] # 路径级 glob 规则
denied_commands: list[str] = [] # 命令级 glob 拒绝模式

这四个列表构成了一套优先级递减的规则链:黑名单 > 白名单 > 路径规则 > 命令规则 > 模式默认行为。

PermissionChecker 决策引擎

PermissionChecker.evaluate() 是权限系统的核心,它接收工具名、只读标志、文件路径和命令字符串,返回一个 PermissionDecision

1
2
3
4
5
@dataclass(frozen=True)
class PermissionDecision:
allowed: bool
requires_confirmation: bool = False # 仅 DEFAULT 模式下为 True
reason: str = ""

决策流程按以下顺序逐级判定,命中即返回

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
1. 敏感路径硬编码拦截(SENSITIVE_PATH_PATTERNS)
↓ 未命中
2. denied_tools 黑名单
↓ 未命中
3. allowed_tools 白名单
↓ 未命中
4. path_rules 路径级 glob 匹配
↓ 未命中
5. denied_commands 命令级 glob 匹配
↓ 未命中
6. FULL_AUTO → 放行
↓ 非 FULL_AUTO
7. is_read_only → 放行
↓ 非只读
8. PLAN → 拒绝
↓ 非 PLAN
9. DEFAULT → requires_confirmation=True(需用户确认)

敏感路径硬编码保护

决策链的第一步是不可覆盖的硬编码保护。无论用户如何配置权限模式或白名单,以下路径始终被拒绝:

1
2
3
4
5
6
7
8
9
10
11
SENSITIVE_PATH_PATTERNS: tuple[str, ...] = (
"*/.ssh/*", # SSH 密钥
"*/.aws/credentials", "*/.aws/config", # AWS 凭证
"*/.config/gcloud/*", # GCP 凭证
"*/.azure/*", # Azure 凭证
"*/.gnupg/*", # GPG 密钥
"*/.docker/config.json", # Docker 凭证
"*/.kube/config", # Kubernetes 凭证
"*/.openharness/credentials.json", # OpenHarness 自身凭证
"*/.openharness/copilot_auth.json",
)

这是一道纵深防御(Defence in Depth)措施,即使 LLM 被 prompt injection 诱导去读取 ~/.ssh/id_rsa,权限系统也会在工具执行前将其拦截。使用 fnmatch 做 glob 匹配,同时对路径做 rstrip("/") + 追加 / 的双重匹配,确保目录级工具(如 grep --root /home/user/.ssh)也能被正确拦截。

路径解析的两层探测

在调用 evaluate() 之前,_execute_tool_call() 会通过 _resolve_permission_file_path() 提取文件路径。这个函数采用两层探测策略:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def _resolve_permission_file_path(cwd, raw_input, parsed_input):
# 第一层:从原始 dict 中查找
for key in ("file_path", "path", "root"):
value = raw_input.get(key)
if isinstance(value, str) and value.strip():
return str((cwd / Path(value).expanduser()).resolve())

# 第二层:从 Pydantic model 属性中查找
for attr in ("file_path", "path", "root"):
value = getattr(parsed_input, attr, None)
if isinstance(value, str) and value.strip():
return str((cwd / Path(value).expanduser()).resolve())

return None

先查 raw_input(原始 JSON dict),再查 parsed_input(Pydantic model 实例)。这是因为不同工具的参数命名可能不一致——read_filefile_pathgreprootbash 则没有路径参数。两层探测确保了路径规则能覆盖所有工具类型。

命令提取 _extract_permission_command() 同理,先查 raw_input["command"],再查 parsed_input.command

权限确认交互

PermissionDecision.requires_confirmation == True(DEFAULT 模式下的变更工具),_execute_tool_call() 会调用 context.permission_prompt 回调:

1
2
3
4
if decision.requires_confirmation and context.permission_prompt is not None:
confirmed = await context.permission_prompt(tool_name, decision.reason)
if not confirmed:
return ToolResultBlock(content=f"Permission denied for {tool_name}", is_error=True)

permission_prompt 是一个可注入的异步函数。在 TUI 模式下,它由 ui/permission_dialog.py 提供:

1
2
3
4
5
6
async def ask_permission(tool_name: str, reason: str) -> bool:
session = PromptSession()
response = await session.prompt_async(
f"Allow tool '{tool_name}'? [{reason}] [y/N]: "
)
return response.strip().lower() in {"y", "yes"}

在 API 模式或 FULL_AUTO 模式下,permission_promptNone,此时 requires_confirmation 的决策会直接变为拒绝。这个设计确保了无人值守环境下不会因为等待用户输入而挂起。

Hook 事件体系

Hooks 是 OpenHarness 的可扩展拦截点。与 Permission 的确定性规则不同,Hooks 允许用户注入任意逻辑——shell 脚本、HTTP 回调、甚至 LLM 判断。

6 种生命周期事件

1
2
3
4
5
6
7
class HookEvent(str, Enum):
SESSION_START = "session_start" # 会话启动时
SESSION_END = "session_end" # 会话结束时
PRE_COMPACT = "pre_compact" # 上下文压缩前
POST_COMPACT = "post_compact" # 上下文压缩后
PRE_TOOL_USE = "pre_tool_use" # 工具执行前
POST_TOOL_USE = "post_tool_use" # 工具执行后

这 6 个事件覆盖了 Agent 生命周期的关键节点。其中 PRE_TOOL_USEPOST_TOOL_USE_execute_tool_call() 内部触发,SESSION_STARTSESSION_ENDui/runtime.pystart_runtime() / close_runtime() 中触发,PRE_COMPACTPOST_COMPACTcompact/__init__.py 的压缩流程中触发。

4 种 Hook 类型

每种 Hook 类型对应一种执行方式:

1. CommandHookDefinition — Shell 命令

1
2
3
4
5
6
class CommandHookDefinition(BaseModel):
type: Literal["command"] = "command"
command: str # shell 命令模板
timeout_seconds: int = 30 # 超时(1-600 秒)
matcher: str | None = None # glob 匹配过滤
block_on_failure: bool = False # 失败时是否阻断

执行时,$ARGUMENTS 占位符会被替换为 JSON 序列化的 payload(经 shlex.quote 转义)。环境变量 OPENHARNESS_HOOK_EVENTOPENHARNESS_HOOK_PAYLOAD 也会注入子进程。以 returncode == 0 判定成功。

2. HttpHookDefinition — HTTP 回调

1
2
3
4
5
6
7
class HttpHookDefinition(BaseModel):
type: Literal["http"] = "http"
url: str # POST 目标 URL
headers: dict[str, str] = {} # 自定义请求头
timeout_seconds: int = 30
matcher: str | None = None
block_on_failure: bool = False

向指定 URL 发送 POST {"event": "...", "payload": {...}},以 HTTP 2xx 判定成功。适合与外部审计系统、Slack 通知等集成。

3. PromptHookDefinition — LLM 轻量验证

1
2
3
4
5
6
7
class PromptHookDefinition(BaseModel):
type: Literal["prompt"] = "prompt"
prompt: str # 验证提示词模板
model: str | None = None # 可指定模型,默认用当前模型
timeout_seconds: int = 30
matcher: str | None = None
block_on_failure: bool = True # 默认阻断

将 payload 注入 prompt 模板后发送给 LLM,要求返回 {"ok": true}{"ok": false, "reason": "..."}。系统提示词要求模型做简短判断。

4. AgentHookDefinition — LLM 深度验证

1
2
3
4
5
6
7
class AgentHookDefinition(BaseModel):
type: Literal["agent"] = "agent"
prompt: str
model: str | None = None
timeout_seconds: int = 60 # 更长的超时
matcher: str | None = None
block_on_failure: bool = True

与 Prompt 类型类似,但系统提示词额外要求模型"更彻底地推理 payload 后再决定",且默认超时更长。适合需要深度分析的安全审查场景。

Matcher 过滤

每种 Hook 都支持可选的 matcher 字段,使用 fnmatch 语法对事件 payload 做过滤:

1
2
3
4
5
6
def _matches_hook(hook, payload):
matcher = getattr(hook, "matcher", None)
if not matcher:
return True # 无 matcher 则匹配所有事件
subject = str(payload.get("tool_name") or payload.get("prompt") or payload.get("event") or "")
return fnmatch.fnmatch(subject, matcher)

例如,配置 matcher: "bash" 的 Hook 只会在 bash 工具调用时触发,而不会干扰 read_filegrep

HookRegistry 与加载

HookRegistry 是一个按事件分组的注册表:

1
2
3
4
5
6
7
8
9
class HookRegistry:
def __init__(self):
self._hooks: dict[HookEvent, list[HookDefinition]] = defaultdict(list)

def register(self, event: HookEvent, hook: HookDefinition) -> None:
self._hooks[event].append(hook)

def get(self, event: HookEvent) -> list[HookDefinition]:
return list(self._hooks.get(event, []))

加载函数 load_hook_registry() 从两个来源收集 Hook 定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def load_hook_registry(settings, plugins=None) -> HookRegistry:
registry = HookRegistry()
# 来源 1:settings.hooks(用户配置文件)
for raw_event, hooks in settings.hooks.items():
event = HookEvent(raw_event)
for hook in hooks:
registry.register(event, hook)
# 来源 2:plugins[].hooks(插件贡献)
for plugin in plugins or []:
if plugin.enabled:
for raw_event, hooks in plugin.hooks.items():
event = HookEvent(raw_event)
for hook in hooks:
registry.register(event, hook)
return registry

这意味着 Hook 既可以通过项目配置文件(.openharness/settings.json)定义,也可以通过插件系统注入。

HookExecutor 执行引擎

HookExecutor 是 Hook 的运行时执行器,持有 HookRegistryHookExecutionContext

1
2
3
4
5
@dataclass
class HookExecutionContext:
cwd: Path
api_client: SupportsStreamingMessages # 用于 Prompt/Agent 类型的 LLM 调用
default_model: str

execute() 方法遍历指定事件的所有 Hook,逐个执行并收集结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
async def execute(self, event: HookEvent, payload: dict) -> AggregatedHookResult:
results = []
for hook in self._registry.get(event):
if not _matches_hook(hook, payload):
continue
if isinstance(hook, CommandHookDefinition):
results.append(await self._run_command_hook(hook, event, payload))
elif isinstance(hook, HttpHookDefinition):
results.append(await self._run_http_hook(hook, event, payload))
elif isinstance(hook, PromptHookDefinition):
results.append(await self._run_prompt_like_hook(hook, event, payload, agent_mode=False))
elif isinstance(hook, AgentHookDefinition):
results.append(await self._run_prompt_like_hook(hook, event, payload, agent_mode=True))
return AggregatedHookResult(results=results)

聚合结果与阻断

AggregatedHookResult 聚合所有 Hook 的执行结果,关键属性是 blocked

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@dataclass(frozen=True)
class AggregatedHookResult:
results: list[HookResult]

@property
def blocked(self) -> bool:
return any(result.blocked for result in self.results)

@property
def reason(self) -> str:
for result in self.results:
if result.blocked:
return result.reason or result.output
return ""

只要任意一个 Hook 的 blocked == True,整个事件就被阻断。在 _execute_tool_call() 中,PRE_TOOL_USE 的阻断会直接返回错误的 ToolResultBlock,工具不会被执行:

1
2
3
4
5
6
7
8
9
10
if context.hook_executor is not None:
pre_hooks = await context.hook_executor.execute(
HookEvent.PRE_TOOL_USE,
{"tool_name": tool_name, "tool_input": tool_input, ...},
)
if pre_hooks.blocked:
return ToolResultBlock(
content=pre_hooks.reason or f"pre_tool_use hook blocked {tool_name}",
is_error=True,
)

POST_TOOL_USE 的结果不影响工具执行(工具已经跑完了),仅用于审计和通知。

热重载 HookReloader

HookReloader 提供了配置文件变更时的自动重载能力:

1
2
3
4
5
6
7
8
9
10
11
class HookReloader:
def __init__(self, settings_path: Path):
self._settings_path = settings_path
self._last_mtime_ns = -1

def current_registry(self) -> HookRegistry:
stat = self._settings_path.stat()
if stat.st_mtime_ns != self._last_mtime_ns:
self._last_mtime_ns = stat.st_mtime_ns
self._registry = load_hook_registry(load_settings(self._settings_path))
return self._registry

通过比较文件的 st_mtime_ns(纳秒级修改时间),只在文件实际变更时重新加载。这意味着用户可以在会话进行中修改 Hook 配置,无需重启。

Permission 与 Hook 的协作

_execute_tool_call() 中,Permission 和 Hook 的执行顺序是精心设计的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
_execute_tool_call(tool_name, tool_input)

├─ 1. PRE_TOOL_USE Hook ──→ blocked? → 返回错误

├─ 2. 查找工具 ──→ 未找到? → 返回错误

├─ 3. 解析输入 ──→ 无效? → 返回错误

├─ 4. Permission 检查 ──→ 拒绝? → 返回错误
│ ──→ 需确认? → 弹窗 → 拒绝? → 返回错误

├─ 5. 执行工具

├─ 6. 记录 tool_metadata

└─ 7. POST_TOOL_USE Hook(仅通知,不阻断)

Hook 在 Permission 之前执行。这个顺序意味着:

  • Hook 可以在 Permission 检查之前就拦截请求(例如,一个 CI 环境的 Hook 可以拒绝所有非白名单仓库的操作)
  • Permission 是最后一道确定性防线,不受 Hook 配置影响
  • POST_TOOL_USE Hook 能拿到完整的执行结果(包括输出和错误状态),用于审计日志

这种"Hook 先行、Permission 兜底"的设计,让安全策略既可扩展又有底线保障。

Multi-Agent Coordination

单个 Agent 的能力受限于一条串行的推理链:它一次只能执行一个工具调用,一次只能关注一个子任务。当工程任务的规模扩大——跨多个文件的重构、并行的调研与实现、独立的测试验证——串行执行成为瓶颈。OpenHarness 的 Multi-Agent 系统将单一 Agent 扩展为一个协调者-工作者(Coordinator-Worker)架构,让多个 Agent 实例并发执行,同时共享权限、通信和文件系统。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
coordinator/
├── __init__.py ← 导出 AgentDefinition, TeamRegistry
├── agent_definitions.py ← Agent 定义加载(YAML → Pydantic model)
└── coordinator_mode.py ← Coordinator 模式检测 + 系统提示词 + XML 协议

swarm/
├── __init__.py ← 延迟导入入口
├── types.py ← 核心类型(TeammateExecutor, TeammateSpawnConfig, SpawnResult...)
├── registry.py ← BackendRegistry(后端自动检测与注册)
├── subprocess_backend.py ← SubprocessBackend(子进程执行后端)
├── in_process.py ← InProcessBackend(进程内 asyncio Task 执行后端)
├── mailbox.py ← 文件级异步消息队列(Leader ↔ Worker 通信)
├── permission_sync.py ← 跨进程权限同步协议
├── team_lifecycle.py ← 团队持久化(TeamFile / TeamMember / CRUD)
├── worktree.py ← Git worktree 文件系统隔离
├── lockfile.py ← 跨平台文件锁
└── spawn_utils.py ← 环境变量继承 + CLI 标志传播

Coordinator-Worker 架构

Multi-Agent 系统采用星型拓扑:一个 Coordinator(协调者)负责任务分解、结果综合和用户交互,多个 Worker(工作者)负责具体执行。

Coordinator 本身也是一个 Agent,但它的工具集被限制为三个专用工具:

工具 用途
agent 生成一个新的 Worker
send_message 向已有 Worker 发送后续指令
task_stop 终止一个正在运行的 Worker

Worker 则拥有完整的工具集(bashfile_readfile_editgrepglob 等),但看不到 Coordinator 与用户的对话历史。每个 Worker 的上下文是独立的,所有信息必须通过 Coordinator 的 prompt 显式传递。

Coordinator 模式检测

Coordinator 模式通过环境变量 CLAUDE_CODE_COORDINATOR_MODE 激活:

1
2
3
def is_coordinator_mode() -> bool:
val = os.environ.get("CLAUDE_CODE_COORDINATOR_MODE", "")
return val.lower() in {"1", "true", "yes"}

激活后,系统提示词被替换为一套专门的协调指令(get_coordinator_system_prompt()),包含:

  1. 角色定义:你是协调者,负责分解任务、指挥 Worker、综合结果
  2. 工具说明agent / send_message / task_stop 的使用方式
  3. 任务工作流:Research → Synthesis → Implementation → Verification 四阶段
  4. 并发策略:只读任务并行、写入任务串行、验证可与实现交错
  5. Prompt 编写规范:Worker 看不到对话历史,prompt 必须自包含

Worker 结果通知

Worker 完成任务后,结果以 XML 格式的 <task-notification> 注入 Coordinator 的对话流:

1
2
3
4
5
6
7
@dataclass
class TaskNotification:
task_id: str # Worker 的 agent ID
status: str # "completed" | "failed" | "killed"
summary: str # 人类可读的状态摘要
result: str | None # Worker 的最终文本输出
usage: dict | None # token 用量统计

序列化为 XML 后作为 user 角色消息注入:

1
2
3
4
5
6
7
8
9
10
11
<task-notification>
<task-id>agent-a1b</task-id>
<status>completed</status>
<summary>Agent "Investigate auth bug" completed</summary>
<result>Found null pointer in src/auth/validate.ts:42...</result>
<usage>
<total_tokens>15234</total_tokens>
<tool_uses>8</tool_uses>
<duration_ms>45000</duration_ms>
</usage>
</task-notification>

Coordinator 需要区分真正的用户消息和 <task-notification>——通过检测 <task-notification> 开头标签来判断。

Agent 定义系统

每个 Agent(无论是 Coordinator 还是 Worker)都由 AgentDefinition 描述:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class AgentDefinition(BaseModel):
# --- 必填 ---
name: str # 角色名(如 "researcher", "tester")
description: str # 何时使用此 Agent

# --- 提示词与工具 ---
system_prompt: str | None = None # 自定义系统提示词
tools: list[str] | None = None # 工具白名单(None = 全部)
disallowed_tools: list[str] | None = None # 工具黑名单

# --- 模型与权限 ---
model: str | None = None # 模型覆盖
permission_mode: str | None = None # 权限模式覆盖
max_turns: int | None = None # 最大推理轮次

# --- 扩展 ---
skills: list[str] = [] # 可用 Skill 列表
mcp_servers: list[Any] | None = None # MCP 服务器配置
hooks: dict[str, Any] | None = None # 会话级 Hook

# --- UI ---
color: str | None = None # 显示颜色

# --- 生命周期 ---
background: bool = False # 是否作为后台任务运行
isolation: str | None = None # "worktree" | "remote"

Agent 定义从 YAML 文件加载(支持 frontmatter 格式),存放在 ~/.openharness/agents/ 或项目级 .openharness/agents/ 目录下。get_builtin_agent_definitions() 提供内置的 Agent 模板。

执行后端 TeammateExecutor

TeammateExecutor 是一个 Protocol,定义了 Worker 的生命周期接口:

1
2
3
4
5
6
7
8
@runtime_checkable
class TeammateExecutor(Protocol):
type: BackendType

def is_available(self) -> bool: ...
async def spawn(self, config: TeammateSpawnConfig) -> SpawnResult: ...
async def send_message(self, agent_id: str, message: TeammateMessage) -> None: ...
async def shutdown(self, agent_id: str, *, force: bool = False) -> bool: ...

OpenHarness 提供了三种后端实现:

SubprocessBackend — 子进程模式

每个 Worker 作为独立的 OS 进程运行,通过 BackgroundTaskManager 管理:

1
2
3
4
5
6
7
class SubprocessBackend:
type: BackendType = "subprocess"

async def spawn(self, config: TeammateSpawnConfig) -> SpawnResult:
# 构建 CLI 命令:python -m openharness --task-worker [flags]
# 通过 BackgroundTaskManager.create_agent_task() 启动子进程
# 初始 prompt 通过 stdin 传入

子进程继承父进程的环境变量(API Key、代理设置等),通过 build_inherited_env_vars() 显式转发。关键的是,CLAUDE_CODE_COORDINATOR_MODE 被强制设为 "0",防止 Worker 递归进入协调模式。

InProcessBackend — 进程内模式

每个 Worker 作为当前进程内的 asyncio.Task 运行,通过 contextvars 实现上下文隔离:

1
2
3
4
5
6
7
8
9
class InProcessBackend:
type: BackendType = "in_process"

async def spawn(self, config: TeammateSpawnConfig) -> SpawnResult:
abort_controller = TeammateAbortController()
task = asyncio.create_task(
start_in_process_teammate(config, agent_id, abort_controller),
name=f"teammate-{agent_id}",
)

asyncio.create_task() 会自动复制当前的 Context,因此每个 Task 拥有独立的 ContextVar 状态。TeammateAbortController 提供双信号取消机制:

  • graceful cancel:设置 cancel_event,Agent 完成当前工具调用后退出
  • force cancel:设置 force_cancel,直接取消 asyncio Task

进程内模式的优势是零启动开销和共享内存,劣势是所有 Worker 共享同一个 Python 进程的 GIL。

Pane Backend — 终端面板模式

tmux 和 iTerm2 后端将每个 Worker 运行在独立的终端面板中,提供可视化的并行执行体验。PaneBackend Protocol 定义了面板管理接口(创建、发送命令、设置颜色/标题、隐藏/显示、重新布局等)。

BackendRegistry 自动检测

BackendRegistry 负责检测和选择最佳后端:

1
2
3
4
5
class BackendRegistry:
def detect_backend(self) -> BackendType:
# 优先级 1:in_process(如果之前的 spawn 失败回退过)
# 优先级 2:tmux(在 tmux 会话内且 tmux 二进制可用)
# 优先级 3:subprocess(始终可用的兜底方案)

面板后端的检测更复杂(detect_pane_backend()):

1
2
3
4
5
6
1. 在 tmux 内 → 使用 tmux
2. 在 iTerm2 内 + it2 CLI 可用 → 使用 iTerm2
3. 在 iTerm2 内 + 无 it2 + tmux 可用 → 回退到 tmux
4. 在 iTerm2 内 + 无 it2 + 无 tmux → 报错
5. 不在 tmux/iTerm2 + tmux 可用 → 使用 tmux(外部会话模式)
6. 无任何面板后端 → 报错并给出平台相关的安装指引

mark_in_process_fallback() 方法允许在 spawn 失败时永久切换到进程内模式,避免后续 spawn 反复失败。

文件级邮箱 TeammateMailbox

Leader 与 Worker 之间的通信通过文件级异步消息队列实现:

1
2
3
4
~/.openharness/teams/<team>/agents/<agent_id>/inbox/
├── 1713000000.123456_uuid1.json
├── 1713000001.234567_uuid2.json
└── .write_lock

每条消息是一个独立的 JSON 文件,文件名格式为 <timestamp>_<message_id>.json,按时间戳排序即可保证消息顺序。

1
2
3
4
5
6
7
8
9
@dataclass
class MailboxMessage:
id: str
type: MessageType # "user_message" | "permission_request" | "shutdown" | ...
sender: str
recipient: str
payload: dict[str, Any]
timestamp: float
read: bool = False

写入采用原子操作:先写 .tmp 文件,再 os.replace() 重命名,确保读者永远不会看到半写的消息。并发写入通过 exclusive_file_lock 保护——在 POSIX 系统上使用 fcntl.flock,在 Windows 上使用 msvcrt.locking

支持 7 种消息类型:

类型 方向 用途
user_message Leader → Worker 任务指令 / 后续消息
permission_request Worker → Leader 请求执行变更工具的权限
permission_response Leader → Worker 权限审批结果
sandbox_permission_request Worker → Leader 请求沙箱网络访问权限
sandbox_permission_response Leader → Worker 沙箱权限审批结果
shutdown Leader → Worker 终止请求
idle_notification Worker → Leader Worker 空闲通知

跨进程权限同步

在 Multi-Agent 场景下,Worker 可能需要执行变更工具(如 file_editbash),但权限确认的 UI 在 Leader 进程中。permission_sync.py 实现了一套跨进程权限协商协议

1
2
3
4
5
6
7
8
9
10
Worker                          Leader
│ │
├─ 需要执行 file_edit ──────────→│
│ (SwarmPermissionRequest) │
│ ├─ 弹窗确认 / 自动审批
│ │
│←──────────────────────────────┤
│ (SwarmPermissionResponse) │
│ │
├─ 继续执行 / 放弃 │

请求通过两种通道传递:

  1. 文件目录~/.openharness/teams/<team>/permissions/pending/<id>.json → Leader 读取 → 移动到 resolved/<id>.json
  2. 邮箱消息:通过 TeammateMailboxpermission_request / permission_response 消息类型

SwarmPermissionRequest 携带完整的工具调用上下文:

1
2
3
4
5
6
7
8
9
10
11
12
@dataclass
class SwarmPermissionRequest:
id: str # 请求唯一 ID
worker_id: str # Worker 的 agent ID
worker_name: str # Worker 名称
team_name: str # 团队名称
tool_name: str # 请求执行的工具
tool_use_id: str # 原始 tool_use_id
description: str # 人类可读的操作描述
input: dict[str, Any] # 工具输入参数
permission_suggestions: list # 建议的"始终允许"规则
status: str = "pending" # "pending" | "approved" | "rejected"

Leader 可以批准、拒绝、或修改输入后批准。批准时还可以附带 permission_updates("始终允许"规则),后续相同类型的操作将自动放行。

团队持久化 TeamLifecycleManager

团队的元数据持久化到磁盘:

1
2
3
4
5
6
7
~/.openharness/teams/<name>/
├── team.json ← TeamFile(团队元数据)
└── agents/
├── researcher@team/
│ └── inbox/ ← 邮箱目录
└── tester@team/
└── inbox/

TeamFile 记录团队的完整状态:

1
2
3
4
5
6
7
8
9
@dataclass
class TeamFile:
name: str
created_at: float
lead_agent_id: str # Leader 的 agent ID
lead_session_id: str | None # Leader 的 session UUID
members: dict[str, TeamMember] # agent_id → TeamMember
team_allowed_paths: list[AllowedPath] # 团队级路径白名单
hidden_pane_ids: list[str] # 隐藏的面板 ID

TeamMember 记录每个成员的运行时状态:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@dataclass
class TeamMember:
agent_id: str # "researcher@team"
name: str # "researcher"
backend_type: BackendType # "subprocess" | "in_process" | "tmux"
joined_at: float
status: str = "active" # "active" | "idle" | "stopped"
model: str | None = None
color: str | None = None
cwd: str = ""
worktree_path: str | None = None # Git worktree 路径
session_id: str | None = None # 实际 session UUID
permissions: list[str] = []
subscriptions: list[str] = [] # 事件订阅主题

TeamLifecycleManager 提供完整的 CRUD 操作,所有写入都是原子的(.tmp + rename)。成员的活跃状态(is_active)和权限模式(mode)可以在运行时动态更新。

Git Worktree 文件系统隔离

当多个 Worker 同时修改文件时,文件冲突是最大的风险。OpenHarness 通过 Git worktree 为每个 Worker 提供独立的文件系统视图:

1
2
3
4
5
6
class WorktreeManager:
async def create_worktree(self, repo_path, slug, branch=None, agent_id=None):
# 1. 验证 slug(防止路径遍历)
# 2. 检查是否已存在(快速恢复)
# 3. git worktree add -B worktree-<slug> <path> HEAD
# 4. 符号链接 node_modules / .venv 等大目录(避免重复)

Worktree 存储在 ~/.openharness/worktrees/<slug>/,每个 worktree 拥有独立的分支(worktree-<slug>),从 HEAD 创建。_symlink_common_dirs() 会将 node_modules.venv__pycache__.tox 等大目录符号链接到主仓库,避免磁盘浪费。

Slug 验证(validate_worktree_slug())防止路径遍历攻击:

  • 最大 64 字符
  • 每个 / 分隔的段必须匹配 [a-zA-Z0-9._-]+
  • 禁止 ...
  • 禁止绝对路径

环境继承与隔离

Worker 进程需要继承 Leader 的关键配置,但又不能继承所有状态。spawn_utils.py 精确控制了继承边界:

环境变量继承build_inherited_env_vars()):

类别 变量 原因
API 提供商 ANTHROPIC_API_KEY, ANTHROPIC_BASE_URL, CLAUDE_CODE_USE_BEDROCK Worker 需要访问同一个 API
代理设置 HTTPS_PROXY, HTTP_PROXY, NO_PROXY 网络路由一致性
CA 证书 SSL_CERT_FILE, NODE_EXTRA_CA_CERTS TLS 验证一致性
配置目录 CLAUDE_CONFIG_DIR 共享 operator 级配置

强制覆盖

1
2
3
4
env = {
"OPENHARNESS_AGENT_TEAMS": "1", # 标记为团队成员
"CLAUDE_CODE_COORDINATOR_MODE": "0", # 禁止递归协调
}

CLI 标志继承build_inherited_cli_flags()):

  • --permission-mode:权限模式传播
  • --model:模型覆盖
  • --settings:配置文件路径
  • --plugin-dir:插件目录
  • --teammate-mode:执行后端模式

plan_mode_required 标志会抑制 --dangerously-skip-permissions,确保安全策略不被绕过。

会话清理

团队的生命周期与 Leader 的会话绑定。cleanup_session_teams() 在会话结束时执行清理:

1
2
3
4
async def cleanup_session_teams():
# 1. 杀死所有面板后端的 Worker 进程
# 2. 删除 Git worktree
# 3. 删除团队目录(team.json + 邮箱)

清理顺序很重要:先杀进程,再删目录。如果先删目录,面板中的 Worker 进程会变成孤儿进程。_kill_orphaned_teammate_panes() 遍历 TeamFile.members,对每个面板后端的成员调用 executor.kill_pane()

register_team_for_session_cleanup() / unregister_team_for_session_cleanup() 跟踪哪些团队是当前会话创建的,避免清理其他会话的团队。

结语

最后,我们终于可以来尝试回答文章开头的那个问题:一个 harness 框架需要包含哪几点核心内容? 我的个人观点是:

1
2
3
一个收敛的、可中断的 Agent Loop
使用架构级约束,而非补丁保障安全
记忆

1. 一个收敛的、可中断的 Agent Loop

所有 Harness 系统的本质都是一个 while 循环。既然是 while 循环,那就必须定义循环的终止条件,譬如 OpenHarness 的 run_query() 使用stop_reason == "end_turn"MAX_CONTINUATIONS 硬上限、token 耗尽触发 compact,SWE-agent 用 max_steps 计数器,AutoGPT 用 cycle_budget

循环的设计质量直接决定了系统的可靠性,一个好的 Agent Loop 至少需要做到三件事:

  • 单一驱动器:整个系统只有一个地方在 while True,所有其他模块(工具、记忆、权限)都是被它调用的。
  • 流式事件总线:用 AsyncIterator[StreamEvent] 把 LLM 输出、工具执行、状态变更统一成一条事件流。上层 UI 只需要订阅,不需要轮询。
  • 优雅降级:token 快满了就 compact(四级渐进压缩),API 挂了就指数退避重试,工具超时就返回错误文本让 LLM 自己判断。永远不要让循环因为一个异常而崩溃。

与之配套的是统一的工具抽象层。我们可以笼统的说:

1
一个 Harness 系统的价值 = Agent Loop 的稳定性 × 工具生态的丰富度 × 安全性

OpenHarness 的 ToolRegistry 把 37 个内置工具、动态 MCP 适配器、插件 Skill 统一为一个 name → BaseTool 字典,Agent Loop 只需要 registry.get(name).execute(context, params) 一行代码即可调用。

2. 安全是架构级约束

一个能执行 bash 命令、能 write_file 的 Agent,如果没有权限控制,将会是一场灾难,而使用 prompt 向 agent 约束安全行为规范同样十分愚蠢,也因此现在越来越多的设计将 openclaw 封闭在安全的沙箱中。

在这一方面,OpenHarness 确实做了相对充分的工作,譬如使用 DEFAULT(变更需确认)→ PLAN(变更直接拒绝)→ FULL_AUTO(全部放行)的三个层次的权限模式,并设计了不可覆盖的硬编码保护策略。同时,通过 Hook,用户可以注入安全拦截逻辑。而在 Multi-Agent 场景下,Worker 不能自己批准自己的危险操作,权限请求通过文件级邮箱同步回 Coordinator,由人类统一审批。

3. 记忆系统是持续工作的关键

这一点很好理解,没有记忆的 Agent,每次对话都是从零开始。这在工程场景中是不可接受的。
OpenHarness 构建了一套五层记忆体系,每层解决不同时间尺度的问题:

  • tool_metadata,通过有界LRU记录当前轮次
  • Compact,四级渐进压缩解决上下文过长的问题
  • Session Storage,解决关闭-恢复问题
  • MEMORY.md,项目级知识沉淀
  • Personalization,基于环境事实自动提取

这五层分别对应认知科学中的工作记忆、短期记忆压缩、情景记忆、语义记忆、程序性记忆。它们共同服务于一个目标:让 Agent 在任何时间点都能获得足够的上下文来做出正确决策。