Butler
概述
Butler 是 EchoCenter 的核心代理,负责协调其他代理和处理用户请求。它是一个 AI 驱动的代理,能够理解用户意图并执行复杂的任务。
设计目标
- 为人类用户提供统一的对话入口。
- 将多 Agent 协调细节收敛在 Butler 背后。
- 对高风险操作统一走显式授权。
- 保持长对话连续性,同时避免上下文无限膨胀。
- 让观测能力可插拔、低耦合,线上可按配置开启或关闭。
设计总览
Butler 并不只是一个聊天机器人。在 EchoCenter 里,它是一个位于以下对象之间的编排层:
- 人类用户的 WebSocket / HTTP 入口
- Butler runtime service
- 基于 Eino 的推理大脑
- 持久化与授权状态
- 下游 Agent 和外部集成
当前实现上,Butler 主要拆成两层:
ButlerService负责偏传输层和应用层的事情,例如接收用户消息、流式返回、持久化、授权衔接和监控广播。EinoBrain负责偏模型运行时的事情,例如构造 prompt、维护 session 历史、运行时上下文压缩和模型调用。
这样拆分的原因是:
- 传输逻辑可以独立演进,不必频繁改模型编排
- 模型配置和上下文策略变更时,不会牵动 WebSocket 语义
- 观测系统可以挂在少量关键点上,而不必侵入所有业务代码
Design note: Butler is intentionally split into a service layer and a brain layer so transport concerns and model-runtime concerns can evolve independently.
运行时架构
ButlerService
ButlerService 负责用户侧的主流程:
- 接收用户输入
- 构建当前系统状态,包括在线 Agent 信息
- 启动一次带 trace 的推理会话
- 调用 brain 的流式推理
- 持久化最终回复
- 广播
CHAT_STREAM和最终CHAT - 按需要把回复转发到外部集成,例如飞书
授权和 Agent 监控也放在这一层,因为这些更接近应用工作流,而不是纯模型 prompt 逻辑。
EinoBrain
EinoBrain 是 Butler 的模型运行时。它主要负责:
- 构建 Butler system prompt
- 组装会话上下文
- 在上下文被压缩后注入滚动摘要
- 调用配置好的 OpenAI 兼容模型接口
- 把 assistant 回复追加回会话历史
这个 brain 是按 session 管理的。每个用户会话都会有一份独立的 runtime context。
会话状态
现在的会话状态不再是简单的“原始消息数组”,而是显式拆成:
SummaryRecentMessagesLastCompactedAt
这样设计的意义是:
Summary保存对旧消息的压缩记忆RecentMessages保留最近一段未压缩原始对话LastCompactedAt便于观测和排查运行时行为
Design note: the summary is hidden runtime memory, while recent messages preserve short-term conversational fidelity.
上下文压缩设计
Butler 的一个核心设计目标,是让长对话继续可用,但不要把所有历史原文都重复塞进 prompt。
当运行时上下文超过配置阈值时:
- Butler 会调用一个仅供 runtime 使用的内部 compactor
- 较旧消息会被总结进
Summary - 只保留最近窗口的原始消息
- 下一次模型调用会看到:
- system prompt
- 由
Summary构造的隐藏 system memory - 最近消息窗口
这个设计有几个关键点:
- compactor 不是用户可聊天的 Agent
- 压缩失败不会中断主对话
- compactor 可以复用 Butler 模型,也可以单独走一个便宜模型
- 压缩结果会作为隐藏的运行时记忆参与后续推理
所以 Butler 获得的是一种“滚动记忆”能力,而不是每次都完整重放所有历史。
Design note: compaction is best-effort and must never block the main user reply path.
请求生命周期
用户消息路径
- 用户向 Butler 发送消息。
ButlerService生成 stream id 和 session id。- 从 repository 构建当前系统状态。
- 启动一段 Butler runtime trace。
EinoBrain准备会话:- 追加当前用户输入
- 若超阈值则执行上下文压缩
- 生成最终 prompt 消息列表
- 以流式方式调用模型。
- 中间结果通过
CHAT_STREAM返回。 - 最终回复落库,并以
CHAT广播。 CHAT_STREAM_END用来结束本次流。
授权路径
敏感或具有副作用的操作,与普通聊天是明确分开的:
- Butler 判断当前操作需要授权。
- 向目标管理员发送
AUTH_REQUEST。 - 动作进入 pending 状态,等待批准或拒绝。
- 批准后继续执行;拒绝后以显式取消结果结束。
这个设计的目的是避免让 LLM 成为高风险操作的最终裁决者。
Agent 协调路径
当 Butler 需要别的 Agent 时:
- Butler 判断需要哪类下游能力。
- 通过现有 hub / message 通道把任务路由到目标 Agent。
- 下游 Agent 执行任务并返回结果。
- Butler 将返回结果整合成面向用户的最终回答。
换句话说,Butler 是控制平面,而专用 Agent 是执行平面。
观测设计
Butler 的观测能力被设计成可选、低耦合。
当前有两层:
- 官方 Eino callback 接入
github.com/cloudwego/eino-ext/callbacks/cozeloop - 本地很薄的一层 Butler span 用来补充通用模型 callback 不容易表达的业务事件,例如:
butler.user_messagebutler.context_compaction
这意味着:
- 模型 / tool / agent 执行可以自动进入 CozeLoop
- Butler 自身的重要生命周期也能被观测到
- 应用其余部分只依赖一层很薄的 observability 接口
- 关闭 CozeLoop 后会自然降级成 no-op
Design note: observability is optional by design; when disabled, Butler should keep the same business behavior.
设计边界
Butler 负责:
- 面向用户的总编排
- 高层推理与回答整合
- 敏感操作的授权门控
- 下游 Agent 协调
- 运行时记忆管理与上下文压缩
Butler 不负责:
- 充当通用 Coze Bot 适配器
- 替代专门的执行型 Agent
- 在 prompt 中无限保留原始长历史
- 绕过授权直接做不可逆操作
为什么这样设计
这个设计牺牲了一点实现简单性,换来了更稳定的运行时行为:
- 显式 session state 比简单历史数组更容易维护
- 上下文压缩能降低长对话的 token 压力
- 可选的 CozeLoop trace 让监控能力强但不侵入
- service 和 brain 分层,让后续替换模型或调整 runtime 策略更安全
工作流程
用户请求流程
1. 用户发送消息给 Butler
↓
2. Butler 接收消息
↓
3. AI 大脑分析消息
↓
4. 决定响应方式
↓
5. 发送响应给用户命令执行流程
1. Butler 检测到需要执行命令
↓
2. 发送授权请求给管理员
↓
3. 等待管理员批准/拒绝
↓
4. 如果批准,执行命令
↓
5. 流式返回结果给管理员代理协调流程
1. Butler 需要代理执行任务
↓
2. 发送指令给代理
↓
3. 代理执行任务
↓
4. 代理返回结果
↓
5. Butler 处理结果
↓
6. 发送最终响应给用户配置
环境变量
# Butler AI 配置
BUTLER_BASE_URL=https://api.siliconflow.cn/v1
BUTLER_API_TOKEN=your_api_token_here
BUTLER_MODEL=Qwen/Qwen3-8B
# 可选:给运行时上下文压缩单独指定模型
BUTLER_CONTEXT_COMPACTION_ENABLED=true
BUTLER_CONTEXT_COMPACTION_BASE_URL=
BUTLER_CONTEXT_COMPACTION_API_TOKEN=
BUTLER_CONTEXT_COMPACTION_MODEL=
# 可选:CozeLoop 观测
OBSERVABILITY_COZELOOP_ENABLED=false
OBSERVABILITY_SERVICE_NAME=echocenter-backend
COZELOOP_WORKSPACE_ID=
COZELOOP_API_TOKEN=配置说明
BUTLER_BASE_URL- Butler 模型 API 地址BUTLER_API_TOKEN- Butler 模型 API 令牌BUTLER_MODEL- Butler 模型名称BUTLER_CONTEXT_COMPACTION_*- 可选的运行时上下文压缩模型配置COZELOOP_WORKSPACE_ID/COZELOOP_API_TOKEN- 仅用于 CozeLoop 观测,不参与 Butler 调模型
Coze 接入说明
- 如果你要接入 CozeLoop 观测,请在
backend/.env中填写COZELOOP_WORKSPACE_ID和COZELOOP_API_TOKEN。 - 如果你要让 Butler 调模型,请填写
BUTLER_BASE_URL、BUTLER_API_TOKEN和BUTLER_MODEL。 - 如果你说的“扣子”是普通 Coze Bot / Runtime 接口,那么当前项目还没有单独的 Coze Bot 适配器;Butler 目前仍要求一套 OpenAI 兼容模型接口。
消息类型
CHAT
普通聊天消息:
{
"type": "CHAT",
"sender_id": 2,
"sender_name": "Butler",
"sender_role": "BUTLER",
"target_id": 1,
"payload": "Hello, how can I help you?",
"timestamp": "2024-01-01T00:00:00Z"
}CHAT_STREAM
流式聊天消息:
{
"type": "CHAT_STREAM",
"sender_id": 2,
"sender_name": "Butler",
"sender_role": "BUTLER",
"target_id": 1,
"payload": "Processing your request...",
"stream_id": "abc123"
}CHAT_STREAM_END
流结束消息:
{
"type": "CHAT_STREAM_END",
"sender_id": 2,
"sender_name": "Butler",
"sender_role": "BUTLER",
"target_id": 1,
"payload": "",
"stream_id": "abc123"
}AUTH_REQUEST
授权请求:
{
"type": "AUTH_REQUEST",
"action_id": "cmd_123",
"sender_id": 2,
"sender_name": "Butler",
"sender_role": "BUTLER",
"target_id": 1,
"command": "get_status 7",
"reasoning": "User requested to check agent status",
"timestamp": "2024-01-01T00:00:02Z"
}AI 大脑
EinoBrain
Butler 使用 Eino 作为 AI 大脑:
type EinoBrain struct {
baseURL string
apiToken string
model string
}功能:
- 调用 AI API
- 分析消息
- 生成响应
- 执行命令
ChatStream
流式聊天:
func (b *EinoBrain) ChatStream(ctx context.Context, prompt string) (string, error) {
// 调用 AI API
// 流式返回响应
}ExecuteCommand
执行命令:
func (b *EinoBrain) ExecuteCommand(ctx context.Context, command string, callback func(string) error) error {
// 解析命令
// 执行命令
// 流式返回结果
}工具函数
ExecuteCommandDirect
直接执行命令:
func ExecuteCommandDirect(ctx context.Context, command string) (string, error) {
// 执行命令
// 返回结果
}RegisterAgentResponse
注册代理响应:
func RegisterAgentResponse(agentID int, response string) error {
// 注册响应
// 通知等待的命令
}最佳实践
1. 错误处理
func (s *ButlerService) HandleUserMessage(ctx context.Context, senderID int, payload string) {
response, err := s.brain.ChatStream(ctx, payload)
if err != nil {
log.Printf("Error processing message: %v", err)
s.hub.BroadcastGeneric(map[string]interface{}{
"type": "CHAT",
"sender_id": s.butlerID,
"sender_name": s.butlerName,
"sender_role": "BUTLER",
"target_id": senderID,
"payload": "Sorry, I encountered an error processing your request.",
})
return
}
s.hub.BroadcastGeneric(map[string]interface{}{
"type": "CHAT",
"sender_id": s.butlerID,
"sender_name": s.butlerName,
"sender_role": "BUTLER",
"target_id": senderID,
"payload": response,
})
}2. 超时处理
func (s *ButlerService) ExecutePendingCommand(ctx context.Context, streamID string, senderID int, approved bool) {
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
// 执行命令
_, err := s.brain.ExecuteCommand(ctx, result, func(chunk string) error {
// ...
})
if err != nil {
log.Printf("Command execution timeout: %v", err)
}
}3. 日志记录
func (s *ButlerService) RequestAuthorization(actionID string, targetID int, command, reasoning string) {
log.Printf("[Butler] Requesting authorization for action: %s", actionID)
log.Printf("[Butler] Command: %s", command)
log.Printf("[Butler] Reasoning: %s", reasoning)
s.hub.BroadcastGeneric(map[string]interface{}{
"type": "AUTH_REQUEST",
"action_id": actionID,
"sender_id": s.butlerID,
"sender_name": s.butlerName,
"sender_role": "BUTLER",
"target_id": targetID,
"command": command,
"reasoning": reasoning,
})
}示例
处理用户消息
// 用户发送消息
{
"type": "CHAT",
"sender_id": 1,
"sender_name": "Admin",
"sender_role": "ADMIN",
"target_id": 2,
"payload": "Check the status of agent 7",
"timestamp": "2024-01-01T00:00:00Z"
}
// Butler 处理消息
{
"type": "CHAT",
"sender_id": 2,
"sender_name": "Butler",
"sender_role": "BUTLER",
"target_id": 1,
"payload": "I'll check the status of agent 7. Let me send an authorization request.",
"timestamp": "2024-01-01T00:00:01Z"
}
// Butler 发送授权请求
{
"type": "AUTH_REQUEST",
"action_id": "cmd_123",
"sender_id": 2,
"sender_name": "Butler",
"sender_role": "BUTLER",
"target_id": 1,
"command": "get_status 7",
"reasoning": "User requested to check agent status",
"timestamp": "2024-01-01T00:00:02Z"
}
// 管理员批准
{
"type": "AUTH_RESPONSE",
"action_id": "cmd_123",
"approved": true,
"sender_id": 1,
"sender_name": "Admin",
"sender_role": "ADMIN",
"target_id": 2,
"timestamp": "2024-01-01T00:00:03Z"
}
// Butler 执行命令
{
"type": "CHAT_STREAM",
"sender_id": 2,
"sender_name": "Butler",
"sender_role": "BUTLER",
"target_id": 1,
"payload": "Checking status...",
"stream_id": "cmd_123"
}
{
"type": "CHAT_STREAM",
"sender_id": 2,
"sender_name": "Butler",
"sender_role": "BUTLER",
"target_id": 1,
"payload": "Agent 7: Online",
"stream_id": "cmd_123"
}
{
"type": "CHAT_STREAM_END",
"sender_id": 2,
"sender_name": "Butler",
"sender_role": "BUTLER",
"target_id": 1,
"payload": "",
"stream_id": "cmd_123"
}扩展性
添加新命令
- 在 EinoBrain 中添加命令解析
- 在 tools.go 中添加命令执行
- 测试命令
添加新消息类型
- 定义消息类型
- 在处理逻辑中添加消息处理
- 测试消息处理
性能优化
- 异步处理
- 连接池
- 缓存
- 并发处理
安全性
- 授权请求
- 命令验证
- 输入过滤
- 错误处理