A2A(Agent2Agent)系列专题 (八) 任务生命周期管理:从创建到完成

任务生命周期管理:从创建到完成

摘要:任务(Task)是 A2A(Agent2Agent)协议的核心工作单元,其生命周期管理确保了代理间协作的可靠性和一致性。本文深入剖析 A2A 的任务生命周期,聚焦状态机设计、状态转换逻辑、错误处理和实时更新机制。结合 GitHub 仓库的实现、Mermaid 图表和代码示例,我们将揭示 A2A 如何通过硬核的任务管理支持多代理系统的动态协作,为开发者提供深入的技术洞察。

1. 引言:任务管理的核心地位

在企业 AI 系统中,代理(Agent)通过任务(Task)协作完成复杂工作,例如处理费用报销、生成报表或协调物流。任务不仅承载了输入数据和输出结果,还需要在分布式环境中保持状态一致性和可靠性。Google 的 A2A(Agent2Agent) 协议通过任务生命周期管理,定义了任务从创建到完成(或失败)的完整流程,类似工作流系统(Workflow System)的状态机,但更轻量且针对代理间通信优化。

A2A 的任务生命周期以 JSON Schema 为基础,结合 HTTP 和 WebSocket 通信,确保动态性和实时性。本文将深入解析这一机制,结合 Google A2A GitHub 仓库 的实现,揭示其硬核内核。

2. 任务生命周期概览

A2A 的任务生命周期是一个状态机,定义了任务的合法状态和转换路径。核心状态包括:

  • Created:任务被 Host Agent 创建,等待分派。
  • In Progress:任务被 Remote Agent 接受并开始处理。
  • Completed:任务成功完成,返回结果。
  • Failed:任务失败,返回错误信息。
  • Canceled:任务被主动取消(可选状态)。

以下是任务生命周期的流程图:

flowchart TD
    A[Created] --> B[In Progress]
    B --> C{Outcome}
    C --> D[Completed]
    C --> E[Failed]
    C --> F[Canceled]
    D --> G[Result Returned]
    E --> H[Error Reported]
    F --> I[Task Aborted]

2.1 任务结构

任务以 JSON 格式定义,基于 a2a.json 的 Schema,包含以下字段:

  • taskId(字符串):任务的唯一标识符,例如 “task-001”。
  • type(字符串):任务类型,例如 “expense”。
  • data(对象):输入数据,符合 Remote Agent 的 schema.input
  • status(字符串):当前状态,枚举值包括 createdin_progresscompletedfailedcanceled
  • result(对象):输出结果,仅在 completed 状态存在,符合 schema.output
  • error(对象):错误信息,仅在 failed 状态存在。

示例任务(初始状态):

1
2
3
4
5
6
7
8
9
{
  "taskId": "task-001",
  "type": "expense",
  "data": {
    "amount": 100,
    "currency": "USD"
  },
  "status": "created"
}

完成状态:

1
2
3
4
5
6
7
8
9
{
  "taskId": "task-001",
  "type": "expense",
  "status": "completed",
  "result": {
    "status": "approved",
    "message": "Processed 100 USD"
  }
}

2.2 设计原则

A2A 的任务生命周期遵循以下原则:

  • 一致性:状态机确保所有代理对任务状态的认知一致。
  • 可靠性:通过幂等性和错误处理,防止状态丢失或重复执行。
  • 动态性:支持实时状态更新和动态交互(例如中途请求表单)。
  • 可扩展性:允许自定义 resulterror 结构,适配复杂场景。

3. 状态转换:生命周期的动态逻辑

3.1 状态转换路径

任务状态的转换由以下事件驱动:

  • Created → In Progress:Host Agent 提交任务,Remote Agent 接受并开始处理。
  • In Progress → Completed:Remote Agent 成功完成任务,返回结果。
  • In Progress → Failed:Remote Agent 遇到错误,返回错误信息。
  • In Progress → Canceled:Host Agent 或 Remote Agent 主动取消任务。
  • Created → Canceled:任务未分配前被取消。

以下是状态转换的时序图:

sequenceDiagram
    participant H as Host Agent
    participant R as Remote Agent
    H->>R: POST /task (status: created)
    R-->>H: Task Accepted (status: in_progress)
    R->>R: Process Task
    alt Success
        R-->>H: Task Result (status: completed)
    else Failure
        R-->>H: Task Error (status: failed)
    else Cancellation
        H->>R: Cancel Task
        R-->>H: Task Aborted (status: canceled)
    end

3.2 状态更新的机制

状态更新通过以下方式实现:

  • HTTP 轮询:Host Agent 定期查询任务状态(GET /task/{taskId})。
  • WebSocket 推送:Remote Agent 通过 WebSocket 发送实时更新(task_update 事件)。
  • 幂等性taskId 确保重复请求不会导致状态冲突。

示例 WebSocket 更新:

1
2
3
4
5
6
{
  "event": "task_update",
  "taskId": "task-001",
  "status": "in_progress",
  "progress": 50
}

3.3 动态交互

任务生命周期支持动态调整,例如:

  • 表单请求:Remote Agent 在 in_progress 状态发现数据不足,请求 Host Agent 提供表单输入。
  • 模式切换:任务可能从文本交互切换到音视频(基于 AgentCard 的 interactionModes)。

这种动态性依赖于协商机制(见第七篇)和 Schema 验证。

4. 错误处理与可靠性

4.1 错误类型

任务可能因以下原因进入 failed 状态:

  • 输入错误:任务数据不符合 schema.input(例如缺少 amount)。
  • 逻辑错误:Remote Agent 的处理失败(例如金额为负)。
  • 通信错误:网络中断或 Remote Agent 不可用。
  • 超时:任务未在预期时间内完成。

错误信息通过 error 字段返回:

1
2
3
4
5
6
7
8
9
{
  "taskId": "task-001",
  "type": "expense",
  "status": "failed",
  "error": {
    "code": "INVALID_INPUT",
    "message": "Amount must be positive"
  }
}

4.2 可靠性机制

A2A 通过以下方式确保可靠性:

  • 幂等性:重复提交相同 taskId 的任务不会导致重复执行。
  • 重试机制:Host Agent 可在通信失败时重试(需开发者实现)。
  • 状态同步:WebSocket 推送或 HTTP 轮询保持状态一致。
  • 日志记录:建议代理记录状态转换历史(capabilities.stateTransitionHistory)。

5. 通信协议:支撑生命周期的基石

任务生命周期依赖于 HTTP 和 WebSocket:

  • HTTP

    • 用途:提交任务(POST /task)、查询状态(GET /task/{taskId})。
    • 优势:简单,适合低频交互。
    • 局限:实时性较差,需轮询。
  • WebSocket

    • 用途:推送状态更新(task_update)、支持流式交互。
    • 优势:低延迟,适配动态场景。
    • 局限:连接管理复杂。

通信流程对比图:

graph TD
    A[Host Agent] -->|HTTP| B[Remote Agent]
    A -->|WebSocket| C[Remote Agent]
    B --> D[Task Response]
    C --> E[Status Updates]
    C --> F[Dynamic Requests]
    style B fill:#bbf,stroke:#333
    style C fill:#bfb,stroke:#333

6. 代码示例:实现任务生命周期

以下是一个基于 samples/python/agents/google_adk 的费用报销代理,展示任务生命周期的管理。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# Remote Agent:费用报销服务器
from a2a import A2AServer, AgentCard, Task
import asyncio

class ExpenseAgent(A2AServer):
    def __init__(self):
        card = AgentCard(
            name="ExpenseAgent",
            description="Processes expense reimbursements",
            url="http://localhost:8080/a2a",
            capabilities={
                "streaming": False,
                "pushNotifications": True,
                "interactionModes": ["text", "form"]
            },
            schema={
                "input": {
                    "type": "object",
                    "properties": {
                        "amount": {"type": "number"},
                        "currency": {"type": "string"}
                    },
                    "required": ["amount", "currency"]
                }
            }
        )
        super().__init__(card=card)

    async def handle_task(self, task: Task) -> dict:
        # 状态:Created → In Progress
        await self.notify_status(task["taskId"], "in_progress")

        if task["type"] != "expense":
            await self.notify_status(task["taskId"], "failed")
            return {"status": "failed", "error": "Invalid task type"}

        amount = task["data"]["amount"]
        currency = task["data"]["currency"]
        if amount <= 0:
            await self.notify_status(task["taskId"], "failed")
            return {
                "status": "failed",
                "error": {"code": "INVALID_INPUT", "message": "Amount must be positive"}
            }

        # 模拟处理
        await asyncio.sleep(1)  # 模拟耗时操作
        result = {"status": "approved", "message": f"Processed {amount} {currency}"}
        await self.notify_status(task["taskId"], "completed")
        return {"status": "completed", "result": result}

# Host Agent:提交并监控任务
from a2a import A2AClient

async def expense_client(remote_url: str):
    client = A2AClient(remote_url)
    task = {
        "taskId": "task-001",
        "type": "expense",
        "data": {"amount": 100, "currency": "USD"}
    }

    # 提交任务
    response = await client.submit_task(task)
    print(f"Task submitted: {response}")

    # 监控状态(WebSocket)
    async for update in client.subscribe_task_updates(task["taskId"]):
        print(f"Status update: {update['status']}")
        if update["status"] in ["completed", "failed"]:
            print(f"Final result: {update}")
            break

if __name__ == "__main__":
    server = ExpenseAgent()
    asyncio.run(expense_client("http://localhost:8080/a2a"))
    server.run(port=8080)

代码解析

  1. 服务器:实现任务状态转换(createdin_progresscompleted/failed),通过 notify_status 推送更新。
  2. 客户端:提交任务并通过 WebSocket 订阅状态更新,展示实时监控。
  3. 错误处理:验证输入并返回标准化的错误信息。
  4. 异步支持:使用 asyncio 确保高并发性能。

7. 硬核设计:任务管理的权衡

7.1 状态机的优势

  • 清晰性:明确的状态和转换路径,便于调试和维护。
  • 一致性:状态机确保 Host 和 Remote Agent 的认知同步。
  • 灵活性:支持动态交互(如表单请求)和状态扩展。

7.2 性能与复杂性

  • 挑战:实时状态更新(WebSocket)可能增加服务器负载。
  • 优化:GitHub Issues 提到批量更新和压缩状态消息的方案。
  • 复杂性:多任务并发需要高效的调度逻辑,开发者需处理竞争条件。

7.3 分布式场景

在分布式系统中,任务管理面临以下问题:

  • 状态同步:多代理协作时,如何保证任务状态的一致性?
  • 超时处理:分布式网络延迟可能导致状态更新丢失。
  • 可扩展性:高负载下,任务管理的性能瓶颈需优化。

8. 应用场景与展望

A2A 的任务生命周期管理适用于以下场景:

  • 企业自动化:协调财务、HR 代理,跟踪多步骤任务。
  • 实时交互:支持客服场景中的动态任务更新。
  • 分布式系统:管理跨云平台的代理协作。

未来,A2A 可能引入以下改进:

  • 嵌套任务:支持子任务,适配复杂工作流。
  • 智能调度:优化任务分配和状态更新。
  • 分布式一致性:集成 Paxos 或 Raft 算法,确保状态同步。

9. 结语:任务管理的未来

任务生命周期管理是 A2A 协议的支柱,通过状态机、实时更新和错误处理,实现了代理间协作的可靠性和动态性。A2A 的设计为企业 AI 系统提供了坚实的基础,未来将在性能和分布式支持上进一步突破。

在下一篇文章中,我们将探讨 A2A 的通信机制,深入分析 HTTP 与 WebSocket 的实现细节。欢迎访问 A2A GitHub 仓库,加入社区,探索 AI 协作的未来!

updatedupdated2025-04-172025-04-17