摘要:A2A(Agent2Agent)协议通过标准化的通信机制支持多个 AI 代理协同完成复杂任务。本文深入探讨如何设计和实现多代理系统,以费用报销和汇率转换为例,展示 Host Agent 和 Remote Agent 的协作逻辑。结合 GitHub 仓库的 google_adk
示例、Mermaid 图表和优化策略,我们将揭示 A2A 在企业级协作场景中的硬核实现细节,为开发者提供实用指导。
在企业 AI 场景中,单一代理往往无法处理复杂任务。例如,费用报销可能需要验证金额、转换货币并生成报告,涉及多个专业模块。Google 的 A2A(Agent2Agent) 协议通过 AgentCard、任务委托和 HTTP/WebSocket 通信,允许 Host Agent 协调多个 Remote Agent 完成工作。
本文基于 GitHub 仓库 https://github.com/google/A2A 的 samples/python/agents/google_adk
示例,展示如何构建一个多代理系统,包括费用报销代理(ExpenseAgent)和汇率转换代理(ForexAgent)。我们将覆盖系统设计、代码实现、测试和优化,揭示 A2A 的协同能力。
多代理系统由以下组件组成:
- Host Agent:协调任务,分配子任务给 Remote Agent。
- Remote Agent:处理特定任务(如费用报销或汇率转换)。
- A2A 协议:通过 AgentCard 和任务通信实现协作。
以下是架构图(参考规划):
graph TD
A[User] --> B[Host Agent]
B --> C[Remote Agent: Expense]
B --> D[Remote Agent: Forex]
C --> E[A2A Protocol]
D --> E
E --> F[Task Results]
style B fill:#bbf,stroke:#333
style C fill:#bfb,stroke:#333
style D fill:#ffb,stroke:#333
- 任务接收:用户通过 Host Agent 提交费用报销任务。
- 代理发现:Host Agent 获取 ExpenseAgent 和 ForexAgent 的 AgentCard。
- 任务分解:Host Agent 将任务分解为报销验证和货币转换。
- 任务委托:通过 A2A 协议将子任务分配给 Remote Agent。
- 结果整合:Host Agent 收集结果,生成最终响应。
- 解耦性:各代理独立运行,支持动态扩展。
- 可靠性:处理代理失败或网络中断(参考第十二篇)。
- 性能:优化通信和任务调度(参考第十篇)。
为每个代理创建 AgentCard:
config/expense.json:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
| {
"name": "ExpenseAgent",
"description": "Processes expense reimbursements",
"url": "http://localhost:8081/a2a",
"authentication": {
"schemes": ["Bearer"],
"credentials": "expense_token"
},
"capabilities": {
"interactionModes": ["text"],
"pushNotifications": true
},
"schema": {
"input": {
"type": "object",
"properties": {
"amount": {"type": "number"},
"currency": {"type": "string"}
},
"required": ["amount", "currency"]
}
}
}
|
config/forex.json:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
| {
"name": "ForexAgent",
"description": "Converts currency amounts",
"url": "http://localhost:8082/a2a",
"authentication": {
"schemes": ["Bearer"],
"credentials": "forex_token"
},
"capabilities": {
"interactionModes": ["text"],
"pushNotifications": true
},
"schema": {
"input": {
"type": "object",
"properties": {
"amount": {"type": "number"},
"from_currency": {"type": "string"},
"to_currency": {"type": "string"}
},
"required": ["amount", "from_currency", "to_currency"]
}
}
}
|
expense_agent.py:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
| import asyncio
import json
from aiohttp import web
from a2a import A2AServer, AgentCard
class ExpenseAgent(A2AServer):
def __init__(self):
with open("config/expense.json") as f:
card_data = json.load(f)
super().__init__(card=AgentCard(**card_data))
async def verify_auth(self, request):
auth_header = request.headers.get("Authorization", "")
if not auth_header.startswith("Bearer "):
raise web.HTTPUnauthorized(text="Missing token")
token = auth_header.replace("Bearer ", "")
if token != self.card.authentication["credentials"]:
raise web.HTTPForbidden(text="Invalid token")
return True
async def handle_task(self, request, task: dict) -> dict:
await self.verify_auth(request)
task_id = task["taskId"]
await self.notify_status(task_id, "in_progress")
if task["type"] != "expense":
await self.notify_status(task_id, "failed")
return {"status": "failed", "error": "Invalid task type"}
amount = task["data"]["amount"]
currency = task["data"]["currency"]
if amount <= 0:
await self.notify_status(task_id, "failed")
return {"status": "failed", "error": "Invalid amount"}
await asyncio.sleep(1) # 模拟处理
result = {"status": "approved", "amount": amount, "currency": currency}
await self.notify_status(task_id, "completed")
return {"status": "completed", "result": result}
if __name__ == "__main__":
server = ExpenseAgent()
server.run(port=8081)
|
forex_agent.py:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
| import asyncio
import json
from aiohttp import web
from a2a import A2AServer, AgentCard
class ForexAgent(A2AServer):
def __init__(self):
with open("config/forex.json") as f:
card_data = json.load(f)
super().__init__(card=AgentCard(**card_data))
async def verify_auth(self, request):
auth_header = request.headers.get("Authorization", "")
if not auth_header.startswith("Bearer "):
raise web.HTTPUnauthorized(text="Missing token")
token = auth_header.replace("Bearer ", "")
if token != self.card.authentication["credentials"]:
raise web.HTTPForbidden(text="Invalid token")
return True
async def handle_task(self, request, task: dict) -> dict:
await self.verify_auth(request)
task_id = task["taskId"]
await self.notify_status(task_id, "in_progress")
if task["type"] != "forex":
await self.notify_status(task_id, "failed")
return {"status": "failed", "error": "Invalid task type"}
amount = task["data"]["amount"]
from_currency = task["data"]["from_currency"]
to_currency = task["data"]["to_currency"]
# 模拟汇率转换(固定汇率)
rates = {"USD": {"EUR": 0.85}, "EUR": {"USD": 1.18}}
if from_currency not in rates or to_currency not in rates[from_currency]:
await self.notify_status(task_id, "failed")
return {"status": "failed", "error": "Unsupported currency pair"}
converted = amount * rates[from_currency][to_currency]
result = {"converted_amount": round(converted, 2), "currency": to_currency}
await self.notify_status(task_id, "completed")
return {"status": "completed", "result": result}
if __name__ == "__main__":
server = ForexAgent()
server.run(port=8082)
|
host_agent.py:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
| import asyncio
import aiohttp
from a2a import A2AClient
class HostAgent:
def __init__(self):
self.agents = {
"expense": {"url": "http://localhost:8081/a2a", "token": "expense_token"},
"forex": {"url": "http://localhost:8082/a2a", "token": "forex_token"}
}
async def get_client(self, agent_name: str):
headers = {"Authorization": f"Bearer {self.agents[agent_name]['token']}"}
session = aiohttp.ClientSession(headers=headers)
return A2AClient(self.agents[agent_name]["url"], session=session), session
async def process_expense(self, amount: float, currency: str, target_currency: str):
# 获取代理客户端
expense_client, expense_session = await self.get_client("expense")
forex_client, forex_session = await self.get_client("forex")
try:
# 步骤 1:提交报销任务
expense_task = {
"taskId": "expense-001",
"type": "expense",
"data": {"amount": amount, "currency": currency}
}
expense_response = await expense_client.submit_task(expense_task)
print(f"Expense task submitted: {expense_response}")
# 监控报销状态
async for update in expense_client.subscribe_task_updates("expense-001"):
print(f"Expense update: {update}")
if update["status"] == "completed":
break
elif update["status"] == "failed":
return {"status": "failed", "error": "Expense processing failed"}
# 步骤 2:汇率转换
forex_task = {
"taskId": "forex-001",
"type": "forex",
"data": {
"amount": amount,
"from_currency": currency,
"to_currency": target_currency
}
}
forex_response = await forex_client.submit_task(forex_task)
print(f"Forex task submitted: {forex_response}")
# 监控汇率状态
async for update in forex_client.subscribe_task_updates("forex-001"):
print(f"Forex update: {update}")
if update["status"] == "completed":
return {
"status": "completed",
"result": {
"expense": expense_response["result"],
"forex": update["result"]
}
}
elif update["status"] == "failed":
return {"status": "failed", "error": "Forex conversion failed"}
finally:
await expense_session.close()
await forex_session.close()
if __name__ == "__main__":
host = HostAgent()
asyncio.run(host.process_expense(100, "USD", "EUR"))
|
client.py:
1
2
3
4
5
6
7
8
9
10
| import asyncio
from host_agent import HostAgent
async def test_multi_agent():
host = HostAgent()
result = await host.process_expense(100, "USD", "EUR")
print(f"Final result: {result}")
if __name__ == "__main__":
asyncio.run(test_multi_agent())
|
- 启动代理:
1
2
3
| python expense_agent.py
python forex_agent.py
python host_agent.py
|
- 运行客户端:
- 预期输出:
Expense task submitted: {'taskId': 'expense-001', 'status': 'accepted'}
Expense update: {'taskId': 'expense-001', 'status': 'in_progress'}
Expense update: {'taskId': 'expense-001', 'status': 'completed'}
Forex task submitted: {'taskId': 'forex-001', 'status': 'accepted'}
Forex update: {'taskId': 'forex-001', 'status': 'in_progress'}
Forex update: {'taskId': 'forex-001', 'status': 'completed'}
Final result: {
'status': 'completed',
'result': {
'expense': {'status': 'approved', 'amount': 100, 'currency': 'USD'},
'forex': {'converted_amount': 85.0, 'currency': 'EUR'}
}
}
- 异步并发:已使用
asyncio.gather
并行提交任务(可扩展到更多代理)。 - 缓存 AgentCard:使用 Redis 缓存 AgentCard,减少重复请求(参考第十篇)。
- 负载均衡:为高并发场景引入 Nginx(参考第十篇)。
- 重试机制:为失败任务添加指数退避重试(参考第十二篇)。
- 状态持久化:将任务状态存储到 Redis(参考第十二篇)。
- 错误处理:捕获网络或代理异常,确保 Host Agent 优雅降级。
- 多模态交互:为 ForexAgent 添加表单模式(参考第十一篇)。
- 服务发现:使用 Consul 动态发现代理(参考第十篇)。
- 日志审计:记录任务交互历史,便于调试(参考第十篇)。
- 代理不可达:检查
url
配置和网络连接。 - 认证失败:确保 Host Agent 的 token 与 Remote Agent 匹配。
- 状态丢失:启用
stateTransitionHistory
记录历史状态。
- 日志:在 Host Agent 和 Remote Agent 中添加
logging
。 - Postman:测试各代理的 HTTP 端点。
- Redis:监控任务状态和通知。
- 优势:独立代理便于扩展和维护。
- 挑战:任务分解和协调增加逻辑复杂性。
- 优化:使用标准化的 AgentCard 和任务格式。
- 优势:异步通信和并行处理提升性能。
- 挑战:高并发下需管理连接和状态同步。
- 优化:参考第十二篇的 ACK 和断线重连机制。
- 优势:本地测试便于快速迭代。
- challenge:分布式部署需考虑一致性和服务发现。
- 优化:下一篇文章将介绍 Web 应用集成(第十五篇)。
多代理系统适用于:
- 财务自动化:报销、汇率转换和审计的协同处理。
- 客服系统:结合多模态代理(参考第十一篇)。
- 供应链管理:多代理协调库存和物流。
Future enhancements:
- 动态编排:使用 AI 优化任务分配。
- 跨云协作:支持 Google Cloud 和 AWS 代理(第十八篇)。
- 标准化扩展:定义更多交互模式。
通过本教程,你已构建并测试了一个多代理系统,掌握了 A2A 的任务分解、代理协调和结果整合。A2A 的协同能力为企业 AI 系统提供了无限可能。下一篇文章将探讨如何构建 A2A Web 应用,连接前端和代理。
欢迎访问 A2A GitHub 仓库,加入社区,分享你的多代理开发经验!