Skip to content

Agent之:后台任务

后台任务就是将一个待执行任务放在后台去执行,而主流程可以继续做剩下的事情。适合耗时且不依赖其他任务的操作,实现并行处理。

1. 后台任务的架构

核心要点:

  • 子进程运行:任务以 subprocess.run() 在后台运行,不干扰主任务
  • 结果捕获:主线程捕获完成的子进程,将结果带入下一轮对话
主线程 (Agent Loop)              后台线程 (Worker)
┌─────────────────────┐         ┌─────────────────────┐
│ 1. drain_notifications()        │ subprocess.run()    │
│ 2. 注入 <background-results>    │ 捕获 stdout/stderr  │
│ 3. LLM 调用                     │ 判断状态            │
│ 4. 解析 tool_use                │ _persist_task()    │
│ 5. 执行工具, 返回结果            │ notification_queue │
└─────────────────────┘         └─────────────────────┘

2. 后台任务具体设计

2.1 工具系统

python
TOOLS = [
    {
        "name": "background_run",
        "description": "Run command in background thread. Returns task_id immediately.",
        "input_schema": {
            "type": "object",
            "properties": {"command": {"type": "string"}},
            "required": ["command"]
        }
    },
    {
        "name": "check_background",
        "description": "Check background task status. Omit task_id to list all.",
        "input_schema": {
            "type": "object",
            "properties": {"task_id": {"type": "string"}}
        }
    }
]

2.2 BackgroundManager 类

python
class BackgroundManager:
    def __init__(self):
        self.tasks = {}  # task_id -> {status, result, command, started_at}
        self._notification_queue = []  # 已完成任务的结果通知
        self._lock = threading.Lock()

    def run(self, command: str) -> str:
        """启动后台线程,立即返回 task_id。"""
        task_id = str(uuid.uuid4())[:8]
        # 启动后台线程
        thread = threading.Thread(target=self._execute, args=(task_id, command), daemon=True)
        thread.start()
        return f"Background task {task_id} started: {command[:80]}"

    def check(self, task_id: str = None) -> str:
        """查看单个任务状态,或列出所有任务。"""

    def drain_notifications(self) -> list:
        """返回所有已完成的任务,并清空所有待处理完成通知。"""

2.3 关键设计点

  • 结果持久化:后台任务结果以 JSON 存储在本地磁盘,会话关闭不丢失
  • 通知队列:完成任务结果推入队列,主循环通过 drain_notifications() 取出

3. 主循环

python
def agent_loop(messages: list):
    while True:
        # 排空后台通知,注入到新一轮输入
        notifs = BG.drain_notifications()
        if notifs and messages:
            notif_text = "\n".join(
                f"[bg:{n['task_id']}]{n['status']}:{n['preview']}"
                f"(output_file={n['output_file']})"
                for n in notifs
            )
            messages.append({"role": "user", "content": f"<background-results>\n{notif_text}\n</background-results>"})

        # 大模型调用
        response = client.messages.create(model=MODEL, system=SYSTEM, messages=messages, tools=TOOLS)

        # 处理工具调用...

相关章节

Released under the MIT License.