A2A(Agent2Agent)系列专题 (十四) 构建多代理系统:A2A 的协同工作

构建多代理系统:A2A 的协同工作

摘要:A2A(Agent2Agent)协议通过标准化的通信机制支持多个 AI 代理协同完成复杂任务。本文深入探讨如何设计和实现多代理系统,以费用报销和汇率转换为例,展示 Host Agent 和 Remote Agent 的协作逻辑。结合 GitHub 仓库的 google_adk 示例、Mermaid 图表和优化策略,我们将揭示 A2A 在企业级协作场景中的硬核实现细节,为开发者提供实用指导。

1. 引言:多代理系统的价值

在企业 AI 场景中,单一代理往往无法处理复杂任务。例如,费用报销可能需要验证金额、转换货币并生成报告,涉及多个专业模块。Google 的 A2A(Agent2Agent) 协议通过 AgentCard、任务委托和 HTTP/WebSocket 通信,允许 Host Agent 协调多个 Remote Agent 完成工作。

本文基于 GitHub 仓库 https://github.com/google/A2Asamples/python/agents/google_adk 示例,展示如何构建一个多代理系统,包括费用报销代理(ExpenseAgent)和汇率转换代理(ForexAgent)。我们将覆盖系统设计、代码实现、测试和优化,揭示 A2A 的协同能力。

2. 多代理系统设计

2.1 系统架构

多代理系统由以下组件组成:

  • Host Agent:协调任务,分配子任务给 Remote Agent。
  • Remote Agent:处理特定任务(如费用报销或汇率转换)。
  • A2A 协议:通过 AgentCard 和任务通信实现协作。

以下是架构图(参考规划):

graph TD
    A[User] --> B[Host Agent]
    B --> C[Remote Agent: Expense]
    B --> D[Remote Agent: Forex]
    C --> E[A2A Protocol]
    D --> E
    E --> F[Task Results]
    style B fill:#bbf,stroke:#333
    style C fill:#bfb,stroke:#333
    style D fill:#ffb,stroke:#333

2.2 协作流程

  1. 任务接收:用户通过 Host Agent 提交费用报销任务。
  2. 代理发现:Host Agent 获取 ExpenseAgent 和 ForexAgent 的 AgentCard。
  3. 任务分解:Host Agent 将任务分解为报销验证和货币转换。
  4. 任务委托:通过 A2A 协议将子任务分配给 Remote Agent。
  5. 结果整合:Host Agent 收集结果,生成最终响应。

2.3 设计考虑

  • 解耦性:各代理独立运行,支持动态扩展。
  • 可靠性:处理代理失败或网络中断(参考第十二篇)。
  • 性能:优化通信和任务调度(参考第十篇)。

3. 实现多代理系统

3.1 环境准备

  • 依赖:Python 3.10,aiohttpwebsocketsa2a 库。
  • 项目结构
    multi-agent-system/
    ├── host_agent.py     # Host Agent 实现
    ├── expense_agent.py  # 费用报销代理
    ├── forex_agent.py    # 汇率转换代理
    ├── client.py         # 测试客户端
    └── config/           # AgentCard 配置文件
        ├── expense.json
        ├── forex.json
    

3.2 配置 AgentCard

为每个代理创建 AgentCard:

config/expense.json

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
{
  "name": "ExpenseAgent",
  "description": "Processes expense reimbursements",
  "url": "http://localhost:8081/a2a",
  "authentication": {
    "schemes": ["Bearer"],
    "credentials": "expense_token"
  },
  "capabilities": {
    "interactionModes": ["text"],
    "pushNotifications": true
  },
  "schema": {
    "input": {
      "type": "object",
      "properties": {
        "amount": {"type": "number"},
        "currency": {"type": "string"}
      },
      "required": ["amount", "currency"]
    }
  }
}

config/forex.json

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
{
  "name": "ForexAgent",
  "description": "Converts currency amounts",
  "url": "http://localhost:8082/a2a",
  "authentication": {
    "schemes": ["Bearer"],
    "credentials": "forex_token"
  },
  "capabilities": {
    "interactionModes": ["text"],
    "pushNotifications": true
  },
  "schema": {
    "input": {
      "type": "object",
      "properties": {
        "amount": {"type": "number"},
        "from_currency": {"type": "string"},
        "to_currency": {"type": "string"}
      },
      "required": ["amount", "from_currency", "to_currency"]
    }
  }
}

3.3 实现 Remote Agent

expense_agent.py

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import asyncio
import json
from aiohttp import web
from a2a import A2AServer, AgentCard

class ExpenseAgent(A2AServer):
    def __init__(self):
        with open("config/expense.json") as f:
            card_data = json.load(f)
        super().__init__(card=AgentCard(**card_data))

    async def verify_auth(self, request):
        auth_header = request.headers.get("Authorization", "")
        if not auth_header.startswith("Bearer "):
            raise web.HTTPUnauthorized(text="Missing token")
        token = auth_header.replace("Bearer ", "")
        if token != self.card.authentication["credentials"]:
            raise web.HTTPForbidden(text="Invalid token")
        return True

    async def handle_task(self, request, task: dict) -> dict:
        await self.verify_auth(request)
        task_id = task["taskId"]
        await self.notify_status(task_id, "in_progress")

        if task["type"] != "expense":
            await self.notify_status(task_id, "failed")
            return {"status": "failed", "error": "Invalid task type"}

        amount = task["data"]["amount"]
        currency = task["data"]["currency"]
        if amount <= 0:
            await self.notify_status(task_id, "failed")
            return {"status": "failed", "error": "Invalid amount"}

        await asyncio.sleep(1)  # 模拟处理
        result = {"status": "approved", "amount": amount, "currency": currency}
        await self.notify_status(task_id, "completed")
        return {"status": "completed", "result": result}

if __name__ == "__main__":
    server = ExpenseAgent()
    server.run(port=8081)

forex_agent.py

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import asyncio
import json
from aiohttp import web
from a2a import A2AServer, AgentCard

class ForexAgent(A2AServer):
    def __init__(self):
        with open("config/forex.json") as f:
            card_data = json.load(f)
        super().__init__(card=AgentCard(**card_data))

    async def verify_auth(self, request):
        auth_header = request.headers.get("Authorization", "")
        if not auth_header.startswith("Bearer "):
            raise web.HTTPUnauthorized(text="Missing token")
        token = auth_header.replace("Bearer ", "")
        if token != self.card.authentication["credentials"]:
            raise web.HTTPForbidden(text="Invalid token")
        return True

    async def handle_task(self, request, task: dict) -> dict:
        await self.verify_auth(request)
        task_id = task["taskId"]
        await self.notify_status(task_id, "in_progress")

        if task["type"] != "forex":
            await self.notify_status(task_id, "failed")
            return {"status": "failed", "error": "Invalid task type"}

        amount = task["data"]["amount"]
        from_currency = task["data"]["from_currency"]
        to_currency = task["data"]["to_currency"]

        # 模拟汇率转换(固定汇率)
        rates = {"USD": {"EUR": 0.85}, "EUR": {"USD": 1.18}}
        if from_currency not in rates or to_currency not in rates[from_currency]:
            await self.notify_status(task_id, "failed")
            return {"status": "failed", "error": "Unsupported currency pair"}

        converted = amount * rates[from_currency][to_currency]
        result = {"converted_amount": round(converted, 2), "currency": to_currency}
        await self.notify_status(task_id, "completed")
        return {"status": "completed", "result": result}

if __name__ == "__main__":
    server = ForexAgent()
    server.run(port=8082)

3.4 实现 Host Agent

host_agent.py

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
import asyncio
import aiohttp
from a2a import A2AClient

class HostAgent:
    def __init__(self):
        self.agents = {
            "expense": {"url": "http://localhost:8081/a2a", "token": "expense_token"},
            "forex": {"url": "http://localhost:8082/a2a", "token": "forex_token"}
        }

    async def get_client(self, agent_name: str):
        headers = {"Authorization": f"Bearer {self.agents[agent_name]['token']}"}
        session = aiohttp.ClientSession(headers=headers)
        return A2AClient(self.agents[agent_name]["url"], session=session), session

    async def process_expense(self, amount: float, currency: str, target_currency: str):
        # 获取代理客户端
        expense_client, expense_session = await self.get_client("expense")
        forex_client, forex_session = await self.get_client("forex")

        try:
            # 步骤 1:提交报销任务
            expense_task = {
                "taskId": "expense-001",
                "type": "expense",
                "data": {"amount": amount, "currency": currency}
            }
            expense_response = await expense_client.submit_task(expense_task)
            print(f"Expense task submitted: {expense_response}")

            # 监控报销状态
            async for update in expense_client.subscribe_task_updates("expense-001"):
                print(f"Expense update: {update}")
                if update["status"] == "completed":
                    break
                elif update["status"] == "failed":
                    return {"status": "failed", "error": "Expense processing failed"}

            # 步骤 2:汇率转换
            forex_task = {
                "taskId": "forex-001",
                "type": "forex",
                "data": {
                    "amount": amount,
                    "from_currency": currency,
                    "to_currency": target_currency
                }
            }
            forex_response = await forex_client.submit_task(forex_task)
            print(f"Forex task submitted: {forex_response}")

            # 监控汇率状态
            async for update in forex_client.subscribe_task_updates("forex-001"):
                print(f"Forex update: {update}")
                if update["status"] == "completed":
                    return {
                        "status": "completed",
                        "result": {
                            "expense": expense_response["result"],
                            "forex": update["result"]
                        }
                    }
                elif update["status"] == "failed":
                    return {"status": "failed", "error": "Forex conversion failed"}

        finally:
            await expense_session.close()
            await forex_session.close()

if __name__ == "__main__":
    host = HostAgent()
    asyncio.run(host.process_expense(100, "USD", "EUR"))

3.5 测试客户端

client.py

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
import asyncio
from host_agent import HostAgent

async def test_multi_agent():
    host = HostAgent()
    result = await host.process_expense(100, "USD", "EUR")
    print(f"Final result: {result}")

if __name__ == "__main__":
    asyncio.run(test_multi_agent())

3.6 运行与测试

  1. 启动代理:
    1
    2
    3
    
    python expense_agent.py
    python forex_agent.py
    python host_agent.py
    
  2. 运行客户端:
    1
    
    python client.py
    
  3. 预期输出:
    Expense task submitted: {'taskId': 'expense-001', 'status': 'accepted'}
    Expense update: {'taskId': 'expense-001', 'status': 'in_progress'}
    Expense update: {'taskId': 'expense-001', 'status': 'completed'}
    Forex task submitted: {'taskId': 'forex-001', 'status': 'accepted'}
    Forex update: {'taskId': 'forex-001', 'status': 'in_progress'}
    Forex update: {'taskId': 'forex-001', 'status': 'completed'}
    Final result: {
        'status': 'completed',
        'result': {
            'expense': {'status': 'approved', 'amount': 100, 'currency': 'USD'},
            'forex': {'converted_amount': 85.0, 'currency': 'EUR'}
        }
    }
    

4. 优化与扩展

4.1 性能优化

  • 异步并发:已使用 asyncio.gather 并行提交任务(可扩展到更多代理)。
  • 缓存 AgentCard:使用 Redis 缓存 AgentCard,减少重复请求(参考第十篇)。
  • 负载均衡:为高并发场景引入 Nginx(参考第十篇)。

4.2 可靠性

  • 重试机制:为失败任务添加指数退避重试(参考第十二篇)。
  • 状态持久化:将任务状态存储到 Redis(参考第十二篇)。
  • 错误处理:捕获网络或代理异常,确保 Host Agent 优雅降级。

4.3 扩展功能

  • 多模态交互:为 ForexAgent 添加表单模式(参考第十一篇)。
  • 服务发现:使用 Consul 动态发现代理(参考第十篇)。
  • 日志审计:记录任务交互历史,便于调试(参考第十篇)。

5. 调试技巧

5.1 常见问题

  • 代理不可达:检查 url 配置和网络连接。
  • 认证失败:确保 Host Agent 的 token 与 Remote Agent 匹配。
  • 状态丢失:启用 stateTransitionHistory 记录历史状态。

5.2 调试工具

  • 日志:在 Host Agent 和 Remote Agent 中添加 logging
  • Postman:测试各代理的 HTTP 端点。
  • Redis:监控任务状态和通知。

6. 硬核设计:多代理协作的权衡

6.1 解耦性 vs 复杂性

  • 优势:独立代理便于扩展和维护。
  • 挑战:任务分解和协调增加逻辑复杂性。
  • 优化:使用标准化的 AgentCard 和任务格式。

6.2 性能 vs 可靠性

  • 优势:异步通信和并行处理提升性能。
  • 挑战:高并发下需管理连接和状态同步。
  • 优化:参考第十二篇的 ACK 和断线重连机制。

6.3 本地 vs 分布式

  • 优势:本地测试便于快速迭代。
  • challenge:分布式部署需考虑一致性和服务发现。
  • 优化:下一篇文章将介绍 Web 应用集成(第十五篇)。

7. 应用场景与展望

多代理系统适用于:

  • 财务自动化:报销、汇率转换和审计的协同处理。
  • 客服系统:结合多模态代理(参考第十一篇)。
  • 供应链管理:多代理协调库存和物流。

Future enhancements:

  • 动态编排:使用 AI 优化任务分配。
  • 跨云协作:支持 Google Cloud 和 AWS 代理(第十八篇)。
  • 标准化扩展:定义更多交互模式。

8. 结语:协作的未来

通过本教程,你已构建并测试了一个多代理系统,掌握了 A2A 的任务分解、代理协调和结果整合。A2A 的协同能力为企业 AI 系统提供了无限可能。下一篇文章将探讨如何构建 A2A Web 应用,连接前端和代理。

欢迎访问 A2A GitHub 仓库,加入社区,分享你的多代理开发经验!

参考资料

updatedupdated2025-05-082025-05-08