多智能体系统(Multi-Agent System)设计与实现

前言

在人工智能领域,单一智能体(Single Agent)的能力虽然强大,但在面对复杂、动态、多维度的真实世界任务时,往往显得力不从心。多智能体系统(Multi-Agent System,MAS)作为一种新兴的计算范式,通过多个具有不同能力、相互协作的智能体共同完成任务,展现出比单一智能体更强大的问题求解能力。

本文将从架构设计、通信机制、协作策略、代码实现等多个维度,深入探讨如何构建一个高效、可靠的多智能体系统。

一、为什么需要多智能体系统?

1.1 单一智能体的局限性

  • 能力边界:单一智能体的能力受限于其训练数据和模型规模,无法在所有领域都表现出色
  • 任务分解:复杂任务难以由单一智能体完整处理,需要分工协作
  • 可扩展性:增加新能力时需要重新训练或大幅调整单一智能体
  • 容错性:单一故障点导致系统整体脆弱

1.2 多智能体系统的优势

  • 专业分工:每个智能体可以专注于特定领域,实现专业化
  • 并行处理:多个智能体可以同时工作,提高效率
  • 分布式知识:不同智能体可以拥有不同领域的知识库
  • 涌现行为:通过简单规则的交互产生复杂的群体行为
  • 容错冗余:部分智能体故障不会导致整个系统崩溃

二、多智能体系统架构设计

2.1 核心架构模式

2.1.1 分层架构(Hierarchical Architecture)

1
2
3
4
5
6
7
8
9
10
11
┌─────────────────────────────────────────┐
│ Supervisor Agent │
│ (任务分解与调度) │
└─────────────────┬───────────────────────┘

┌─────────┼─────────┐
▼ ▼ ▼
┌──────────┐ ┌──────────┐ ┌──────────┐
│ Planner │ │ Research │ │ Executor │
│ Agent │ │ Agent │ │ Agent │
└──────────┘ └──────────┘ └──────────┘

2.1.2 去中心化架构( Decentralized Architecture)

1
2
3
4
5
6
7
8
9
10
┌──────────┐     ┌──────────┐     ┌──────────┐
│ Agent A │◄───►│ Agent B │◄───►│ Agent C │
└──────────┘ └──────────┘ └──────────┘
▲ ▲ ▲
└───────────────┼───────────────┘

┌──────────┐
│ 共享 │
│ 知识库 │
└──────────┘

2.1.3 星型架构(Star Architecture)

1
2
3
4
5
6
7
8
9
10
11
                 ┌──────────┐
│ Hub │
│ Agent │
└────┬─────┘
┌──────────────┼──────────────┐
│ │ │
▼ ▼ ▼
┌──────────┐ ┌──────────┐ ┌──────────┐
│ Specialist│ │ Specialist│ │ Specialist│
│ Agent 1 │ │ Agent 2 │ │ Agent 3 │
└──────────┘ └──────────┘ └──────────┘

2.2 智能体角色定义

在多智能体系统中,每个智能体应该具有明确的角色定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
from enum import Enum
from dataclasses import dataclass
from typing import List, Optional

class AgentRole(Enum):
"""智能体角色枚举"""
SUPERVISOR = "supervisor" # 监督者:负责任务分配和结果整合
PLANNER = "planner" # 规划者:负责任务拆解和执行计划
RESEARCHER = "researcher" # 研究者:负责信息检索和知识获取
EXECUTOR = "executor" # 执行者:负责具体任务执行
CRITIC = "critic" # 批评者:负责质量评估和反馈
COORDINATOR = "coordinator" # 协调者:负责智能体间协调

@dataclass
class AgentCapability:
"""智能体能力定义"""
name: str
description: str
input_types: List[str]
output_types: List[str]
confidence: float = 1.0

@dataclass
class AgentConfig:
"""智能体配置"""
id: str
name: str
role: AgentRole
capabilities: List[AgentCapability]
model_name: str = "gpt-4"
max_retries: int = 3
timeout: int = 60

三、通信机制设计

3.1 消息传递协议

多智能体系统中的核心挑战之一是设计高效的通信机制。我们将实现一个完整的消息传递系统:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
import json
import asyncio
from datetime import datetime
from typing import Dict, Any, Optional, Callable
from dataclasses import dataclass, field, asdict
from enum import Enum
import uuid

class MessageType(Enum):
"""消息类型"""
REQUEST = "request" # 请求消息
RESPONSE = "response" # 响应消息
BROADCAST = "broadcast" # 广播消息
NOTIFICATION = "notification" # 通知消息
HEARTBEAT = "heartbeat" # 心跳消息

class Priority(Enum):
"""消息优先级"""
LOW = 1
NORMAL = 2
HIGH = 3
URGENT = 4

@dataclass
class Message:
"""消息结构"""
id: str = field(default_factory=lambda: str(uuid.uuid4()))
type: MessageType = MessageType.REQUEST
sender: str = ""
receivers: List[str] = field(default_factory=list) # 空列表表示广播
content: Dict[str, Any] = field(default_factory=dict)
metadata: Dict[str, Any] = field(default_factory=dict)
timestamp: str = field(default_factory=lambda: datetime.now().isoformat())
priority: Priority = Priority.NORMAL
reply_to: Optional[str] = None # 关联的消息ID

def to_json(self) -> str:
return json.dumps({
'id': self.id,
'type': self.type.value,
'sender': self.sender,
'receivers': self.receivers,
'content': self.content,
'metadata': self.metadata,
'timestamp': self.timestamp,
'priority': self.priority.value,
'reply_to': self.reply_to
}, ensure_ascii=False)

@classmethod
def from_json(cls, json_str: str) -> 'Message':
data = json.loads(json_str)
return cls(
id=data['id'],
type=MessageType(data['type']),
sender=data['sender'],
receivers=data['receivers'],
content=data['content'],
metadata=data['metadata'],
timestamp=data['timestamp'],
priority=Priority(data['priority']),
reply_to=data.get('reply_to')
)

class MessageBus:
"""消息总线 - 智能体间通信的中介"""

def __init__(self):
self._subscribers: Dict[str, List[Callable]] = {}
self._message_queue: asyncio.Queue = asyncio.Queue()
self._running = False
self._message_history: List[Message] = []

def subscribe(self, agent_id: str, callback: Callable[[Message], None]):
"""订阅特定智能体的消息"""
if agent_id not in self._subscribers:
self._subscribers[agent_id] = []
self._subscribers[agent_id].append(callback)

def unsubscribe(self, agent_id: str, callback: Callable[[Message], None]):
"""取消订阅"""
if agent_id in self._subscribers:
self._subscribers[agent_id].remove(callback)

async def publish(self, message: Message):
"""发布消息"""
self._message_history.append(message)
await self._message_queue.put(message)

async def start_processing(self):
"""启动消息处理循环"""
self._running = True
while self._running:
try:
message = await asyncio.wait_for(
self._message_queue.get(),
timeout=1.0
)
await self._dispatch_message(message)
except asyncio.TimeoutError:
continue
except Exception as e:
print(f"消息处理错误: {e}")

async def _dispatch_message(self, message: Message):
"""分发消息到订阅者"""
if message.receivers:
# 定向消息
for receiver in message.receivers:
if receiver in self._subscribers:
for callback in self._subscribers[receiver]:
await callback(message)
else:
# 广播消息
for agent_id, callbacks in self._subscribers.items():
if agent_id != message.sender: # 不发送给自己
for callback in callbacks:
await callback(message)

def get_history(self, agent_id: Optional[str] = None,
msg_type: Optional[MessageType] = None) -> List[Message]:
"""获取消息历史"""
results = self._message_history
if agent_id:
results = [m for m in results if m.sender == agent_id or agent_id in m.receivers]
if msg_type:
results = [m for m in results if m.type == msg_type]
return results

async def stop(self):
"""停止消息处理"""
self._running = False

3.2 智能体基类实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
import asyncio
from abc import ABC, abstractmethod
from typing import Dict, Any, List, Optional
from datetime import datetime

class BaseAgent(ABC):
"""智能体基类"""

def __init__(self, agent_id: str, name: str, role: AgentRole,
message_bus: MessageBus):
self.agent_id = agent_id
self.name = name
self.role = role
self.message_bus = message_bus
self._running = False
self._state: Dict[str, Any] = {}
self._pending_requests: Dict[str, asyncio.Future] = {}

# 订阅消息
self.message_bus.subscribe(self.agent_id, self._handle_message)

async def _handle_message(self, message: Message):
"""处理接收到的消息"""
if message.type == MessageType.REQUEST:
# 处理请求,发送响应
response = await self.process_request(message)
if message.id:
response.reply_to = message.id
await self.message_bus.publish(response)

elif message.type == MessageType.BROADCAST:
# 处理广播
await self.handle_broadcast(message)

elif message.type == MessageType.NOTIFICATION:
# 处理通知
await self.handle_notification(message)

@abstractmethod
async def process_request(self, message: Message) -> Message:
"""处理请求消息"""
pass

async def handle_broadcast(self, message: Message):
"""处理广播消息"""
pass

async def handle_notification(self, message: Message):
"""处理通知消息"""
pass

async def send_request(self, receiver: str, content: Dict[str, Any],
timeout: float = 30.0) -> Message:
"""发送请求并等待响应"""
request = Message(
type=MessageType.REQUEST,
sender=self.agent_id,
receivers=[receiver],
content=content
)

future = asyncio.Future()
self._pending_requests[request.id] = future

await self.message_bus.publish(request)

try:
response = await asyncio.wait_for(future, timeout=timeout)
return response
except asyncio.TimeoutError:
self._pending_requests.pop(request.id, None)
raise TimeoutError(f"请求超时: {receiver}")
finally:
self._pending_requests.pop(request.id, None)

async def send_broadcast(self, content: Dict[str, Any],
metadata: Optional[Dict[str, Any]] = None):
"""发送广播消息"""
message = Message(
type=MessageType.BROADCAST,
sender=self.agent_id,
receivers=[], # 空列表表示广播
content=content,
metadata=metadata or {}
)
await self.message_bus.publish(message)

async def send_notification(self, receivers: List[str],
content: Dict[str, Any],
priority: Priority = Priority.NORMAL):
"""发送通知"""
message = Message(
type=MessageType.NOTIFICATION,
sender=self.agent_id,
receivers=receivers,
content=content,
priority=priority
)
await self.message_bus.publish(message)

def update_state(self, key: str, value: Any):
"""更新内部状态"""
self._state[key] = value
self._state[f"{key}_updated_at"] = datetime.now().isoformat()

def get_state(self, key: str, default: Any = None) -> Any:
"""获取状态值"""
return self._state.get(key, default)

async def start(self):
"""启动智能体"""
self._running = True
await self.on_start()
asyncio.create_task(self._run())

async def stop(self):
"""停止智能体"""
self._running = False
await self.on_stop()

async def _run(self):
"""运行循环"""
while self._running:
try:
await self.on_tick()
await asyncio.sleep(1)
except Exception as e:
await self.on_error(e)

async def on_start(self):
"""启动时的钩子"""
pass

async def on_stop(self):
"""停止时的钩子"""
pass

async def on_tick(self):
"""每个周期执行的钩子"""
pass

async def on_error(self, error: Exception):
"""错误处理钩子"""
print(f"{self.name} 错误: {error}")

四、具体智能体实现

4.1 规划器智能体(Planner Agent)

规划器负责将复杂任务分解为可执行的子任务:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
from typing import List, Dict, Any

@dataclass
class Task:
"""任务定义"""
id: str
description: str
status: str = "pending" # pending, in_progress, completed, failed
dependencies: List[str] = field(default_factory=list)
assigned_to: Optional[str] = None
result: Optional[Dict[str, Any]] = None
priority: int = 2
created_at: str = field(default_factory=lambda: datetime.now().isoformat())

class PlannerAgent(BaseAgent):
"""规划器智能体 - 负责任务分解和规划"""

def __init__(self, agent_id: str, message_bus: MessageBus,
llm_client=None):
super().__init__(agent_id, "Planner", AgentRole.PLANNER, message_bus)
self.llm_client = llm_client
self.tasks: Dict[str, Task] = {}

async def process_request(self, message: Message) -> Message:
"""处理任务规划请求"""
content = message.content

if content.get('action') == 'plan':
# 执行任务规划
task_description = content.get('task', '')
subtasks = await self.create_plan(task_description)

return Message(
type=MessageType.RESPONSE,
sender=self.agent_id,
receivers=[message.sender],
content={
'status': 'success',
'plan_id': str(uuid.uuid4()),
'subtasks': [asdict(t) for t in subtasks]
}
)

elif content.get('action') == 'update_task':
# 更新任务状态
task_id = content.get('task_id')
status = content.get('status')
result = content.get('result')

if task_id in self.tasks:
self.tasks[task_id].status = status
if result:
self.tasks[task_id].result = result

# 检查是否所有依赖任务都已完成
await self._check_task_completion(task_id)

return Message(
type=MessageType.RESPONSE,
sender=self.agent_id,
receivers=[message.sender],
content={'status': 'updated', 'task_id': task_id}
)

return Message(
type=MessageType.RESPONSE,
sender=self.agent_id,
receivers=[message.sender],
content={'status': 'error', 'message': 'Unknown action'}
)

async def create_plan(self, task_description: str) -> List[Task]:
"""使用LLM创建任务计划"""
if self.llm_client:
# 使用LLM进行智能任务分解
prompt = f"""请将以下任务分解为可执行的子任务:
任务:{task_description}

要求:
1. 每个子任务应该是原子性的,可以独立执行
2. 考虑任务之间的依赖关系
3. 为每个子任务指定合理的优先级

返回JSON格式的子任务列表"""
response = await self.llm_client.generate(prompt)
subtasks = self._parse_llm_response(response)
else:
# 基于规则的任务分解
subtasks = self._rule_based_planning(task_description)

# 存储任务
for task in subtasks:
self.tasks[task.id] = task

return subtasks

def _rule_based_planning(self, task_description: str) -> List[Task]:
"""基于规则的任务分解"""
# 简单的关键词匹配分解
tasks = []

if any(keyword in task_description for keyword in ['研究', '调研', '分析']):
tasks.append(Task(
id=str(uuid.uuid4()),
description="执行信息收集和调研",
priority=3
))

if any(keyword in task_description for keyword in ['设计', '规划', '方案']):
tasks.append(Task(
id=str(uuid.uuid4()),
description="制定详细设计方案",
dependencies=[tasks[-1].id] if tasks else [],
priority=2
))

if any(keyword in task_description for keyword in ['实现', '开发', '构建']):
tasks.append(Task(
id=str(uuid.uuid4()),
description="执行开发实现",
dependencies=[tasks[-1].id] if tasks else [],
priority=1
))

if any(keyword in task_description for keyword in ['测试', '验证']):
tasks.append(Task(
id=str(uuid.uuid4()),
description="执行测试验证",
dependencies=[tasks[-1].id] if tasks else [],
priority=1
))

return tasks

def _parse_llm_response(self, response: str) -> List[Task]:
"""解析LLM返回的任务列表"""
# 简化实现,实际应使用JSON解析
import re
tasks = []

# 提取任务列表
pattern = r'- \[\]\s*([^:]+):([^\\n]+)'
matches = re.findall(pattern, response)

for i, (title, desc) in enumerate(matches):
tasks.append(Task(
id=str(uuid.uuid4()),
description=f"{title.strip()}: {desc.strip()}",
priority=max(1, 3 - i)
))

return tasks

async def _check_task_completion(self, task_id: str):
"""检查任务是否完成,触发下游任务"""
task = self.tasks.get(task_id)
if not task or task.status != "completed":
return

# 找到依赖此任务的其他任务
dependent_tasks = [
t for t in self.tasks.values()
if task_id in t.dependencies and t.status == "pending"
]

# 通知下游任务可以开始
for dependent_task in dependent_tasks:
# 检查是否所有依赖都已完成
all_deps_complete = all(
self.tasks.get(dep_id, Task("", "")).status == "completed"
for dep_id in dependent_task.dependencies
)
if all_deps_complete:
await self.send_notification(
receivers=[dependent_task.assigned_to] if dependent_task.assigned_to else [],
content={
'type': 'task_ready',
'task_id': dependent_task.id,
'task_description': dependent_task.description
},
priority=Priority.HIGH
)

4.2 研究者智能体(Researcher Agent)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
class ResearcherAgent(BaseAgent):
"""研究者智能体 - 负责信息检索和知识获取"""

def __init__(self, agent_id: str, message_bus: MessageBus,
search_engine=None, knowledge_base=None):
super().__init__(agent_id, "Researcher", AgentRole.RESEARCHER, message_bus)
self.search_engine = search_engine
self.knowledge_base = knowledge_base
self.search_results_cache: Dict[str, List[Dict]] = {}

async def process_request(self, message: Message) -> Message:
"""处理研究请求"""
content = message.content
action = content.get('action')

if action == 'search':
query = content.get('query', '')
max_results = content.get('max_results', 5)

results = await self.search(query, max_results)

return Message(
type=MessageType.RESPONSE,
sender=self.agent_id,
receivers=[message.sender],
content={
'status': 'success',
'query': query,
'results': results,
'result_count': len(results)
}
)

elif action == 'deep_research':
topic = content.get('topic', '')
aspects = content.get('aspects', [])

research_data = await self.deep_research(topic, aspects)

return Message(
type=MessageType.RESPONSE,
sender=self.agent_id,
receivers=[message.sender],
content={
'status': 'success',
'topic': topic,
'research_data': research_data
}
)

elif action == 'fact_check':
claims = content.get('claims', [])
checked = await self.fact_check(claims)

return Message(
type=MessageType.RESPONSE,
sender=self.agent_id,
receivers=[message.sender],
content={
'status': 'success',
'checked_claims': checked
}
)

return Message(
type=MessageType.RESPONSE,
sender=self.agent_id,
receivers=[message.sender],
content={'status': 'error', 'message': 'Unknown action'}
)

async def search(self, query: str, max_results: int = 5) -> List[Dict[str, Any]]:
"""执行搜索"""
# 检查缓存
cache_key = f"{query}_{max_results}"
if cache_key in self.search_results_cache:
return self.search_results_cache[cache_key]

results = []

if self.search_engine:
# 使用实际搜索引擎
raw_results = await self.search_engine.search(query, max_results)
for item in raw_results:
results.append({
'title': item.get('title', ''),
'url': item.get('url', ''),
'snippet': item.get('snippet', ''),
'relevance_score': item.get('score', 0.0)
})
else:
# 模拟搜索结果
results = self._simulate_search(query, max_results)

# 缓存结果
self.search_results_cache[cache_key] = results

return results

def _simulate_search(self, query: str, max_results: int) -> List[Dict[str, Any]]:
"""生成模拟搜索结果"""
return [
{
'title': f"关于'{query}'的研究报告",
'url': f"https://example.com/research/{hash(query) % 10000}",
'snippet': f"本文档包含关于'{query}'的详细分析和研究资料...",
'relevance_score': 0.95 - (i * 0.1)
}
for i in range(min(max_results, 5))
]

async def deep_research(self, topic: str,
aspects: List[str]) -> Dict[str, Any]:
"""深度研究一个主题"""
research_data = {
'topic': topic,
'overview': '',
'aspects': {},
'sources': [],
'timestamp': datetime.now().isoformat()
}

# 获取主题概述
overview_results = await self.search(f"{topic} 概述", max_results=3)
if overview_results:
research_data['overview'] = overview_results[0].get('snippet', '')

# 研究各个角度
for aspect in aspects:
aspect_results = await self.search(f"{topic} {aspect}", max_results=5)
research_data['aspects'][aspect] = {
'findings': [r.get('snippet', '') for r in aspect_results],
'source_count': len(aspect_results)
}

# 收集所有来源
all_sources = set()
for aspect_data in research_data['aspects'].values():
for finding in aspect_data.get('findings', []):
# 从finding中提取URL(简化实现)
if 'https://' in finding:
start = finding.find('https://')
end = finding.find(' ', start)
if end == -1:
end = len(finding)
all_sources.add(finding[start:end])

research_data['sources'] = list(all_sources)

return research_data

async def fact_check(self, claims: List[str]) -> List[Dict[str, Any]]:
"""核实声明的真实性"""
checked_claims = []

for claim in claims:
# 搜索相关来源
search_results = await self.search(claim, max_results=3)

# 分析证据
supporting = 0
contradicting = 0

for result in search_results:
snippet = result.get('snippet', '').lower()
# 简化的情感分析
if any(word in snippet for word in ['证实', '确认', '证明', '支持']):
supporting += 1
if any(word in snippet for word in ['反驳', '否认', '错误', '虚假']):
contradicting += 1

if supporting > contradicting:
verdict = "支持"
elif contradicting > supporting:
verdict = "质疑"
else:
verdict = "不确定"

checked_claims.append({
'claim': claim,
'verdict': verdict,
'supporting_evidence': supporting,
'contradicting_evidence': contradicting,
'sources': search_results
})

return checked_claims

4.3 执行器智能体(Executor Agent)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
import subprocess
from typing import Callable, Dict, Any

class ExecutorAgent(BaseAgent):
"""执行器智能体 - 负责具体任务的执行"""

def __init__(self, agent_id: str, message_bus: MessageBus,
tools: Dict[str, Callable] = None):
super().__init__(agent_id, "Executor", AgentRole.EXECUTOR, message_bus)
self.tools = tools or {}
self.execution_history: List[Dict] = []

async def process_request(self, message: Message) -> Message:
"""处理执行请求"""
content = message.content
action = content.get('action')

if action == 'execute':
task_id = content.get('task_id')
command = content.get('command')
params = content.get('params', {})

result = await self.execute_task(task_id, command, params)

return Message(
type=MessageType.RESPONSE,
sender=self.agent_id,
receivers=[message.sender],
content=result
)

elif action == 'execute_tool':
tool_name = content.get('tool_name')
tool_params = content.get('params', {})

result = await self.execute_tool(tool_name, tool_params)

return Message(
type=MessageType.RESPONSE,
sender=self.agent_id,
receivers=[message.sender],
content=result
)

elif action == 'execute_parallel':
tasks = content.get('tasks', [])

results = await self.execute_parallel(tasks)

return Message(
type=MessageType.RESPONSE,
sender=self.agent_id,
receivers=[message.sender],
content={
'status': 'success',
'results': results
}
)

return Message(
type=MessageType.RESPONSE,
sender=self.agent_id,
receivers=[message.sender],
content={'status': 'error', 'message': 'Unknown action'}
)

async def execute_task(self, task_id: str, command: str,
params: Dict[str, Any]) -> Dict[str, Any]:
"""执行单个任务"""
self.update_state(f"task_{task_id}_status", "running")

try:
# 模拟任务执行
await asyncio.sleep(0.1) # 模拟处理时间

result = {
'status': 'success',
'task_id': task_id,
'output': f"执行命令: {command}",
'params': params,
'execution_time': 0.1
}

# 记录执行历史
self.execution_history.append({
'task_id': task_id,
'command': command,
'params': params,
'result': result,
'timestamp': datetime.now().isoformat()
})

self.update_state(f"task_{task_id}_status", "completed")
self.update_state(f"task_{task_id}_result", result)

return result

except Exception as e:
self.update_state(f"task_{task_id}_status", "failed")
return {
'status': 'error',
'task_id': task_id,
'error': str(e)
}

async def execute_tool(self, tool_name: str,
params: Dict[str, Any]) -> Dict[str, Any]:
"""执行工具"""
if tool_name not in self.tools:
return {
'status': 'error',
'error': f"Unknown tool: {tool_name}",
'available_tools': list(self.tools.keys())
}

tool_func = self.tools[tool_name]

try:
if asyncio.iscoroutinefunction(tool_func):
result = await tool_func(**params)
else:
result = tool_func(**params)

return {
'status': 'success',
'tool': tool_name,
'result': result
}

except Exception as e:
return {
'status': 'error',
'tool': tool_name,
'error': str(e)
}

async def execute_parallel(self, tasks: List[Dict]) -> List[Dict]:
"""并行执行多个任务"""
async def run_task(task):
task_id = task.get('id', str(uuid.uuid4()))
command = task.get('command', '')
params = task.get('params', {})
return await self.execute_task(task_id, command, params)

# 并发执行所有任务
results = await asyncio.gather(
*[run_task(t) for t in tasks],
return_exceptions=True
)

# 处理异常结果
processed_results = []
for i, result in enumerate(results):
if isinstance(result, Exception):
processed_results.append({
'status': 'error',
'task_id': tasks[i].get('id', f"task_{i}"),
'error': str(result)
})
else:
processed_results.append(result)

return processed_results

def register_tool(self, name: str, func: Callable):
"""注册新工具"""
self.tools[name] = func

async def handle_notification(self, message: Message):
"""处理通知消息"""
content = message.content
if content.get('type') == 'cancel_task':
task_id = content.get('task_id')
# 取消任务
self.update_state(f"task_{task_id}_status", "cancelled")

五、协作策略与调度

5.1 任务调度器实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
from typing import List, Optional
import heapq

class TaskScheduler:
"""任务调度器 - 负责任务分配和负载均衡"""

def __init__(self, message_bus: MessageBus):
self.message_bus = message_bus
self.agents: Dict[str, BaseAgent] = {}
self.agent_loads: Dict[str, int] = {} # 当前任务数
self.task_queue: List[Tuple[int, Task]] = [] # 优先级队列
self.task_assignments: Dict[str, str] = {} # task_id -> agent_id

def register_agent(self, agent: BaseAgent):
"""注册智能体"""
self.agents[agent.agent_id] = agent
self.agent_loads[agent.agent_id] = 0

def unregister_agent(self, agent_id: str):
"""注销智能体"""
if agent_id in self.agents:
del self.agents[agent_id]
del self.agent_loads[agent_id]

async def submit_task(self, task: Task, preferred_agent: Optional[str] = None):
"""提交任务"""
# 如果指定了偏好智能体且可用,直接分配
if preferred_agent and preferred_agent in self.agents:
await self._assign_task_to_agent(task, preferred_agent)
else:
# 加入任务队列,等待调度
heapq.heappush(self.task_queue, (5 - task.priority, task))
await self._try_schedule()

async def _try_schedule(self):
"""尝试调度任务"""
while self.task_queue and self._has_available_agent():
priority, task = heapq.heappop(self.task_queue)
best_agent = self._select_best_agent(task)
if best_agent:
await self._assign_task_to_agent(task, best_agent)

def _has_available_agent(self) -> bool:
"""检查是否有可用智能体"""
return any(load < 3 for load in self.agent_loads.values())

def _select_best_agent(self, task: Task) -> Optional[str]:
"""选择最佳智能体"""
# 基于负载的选择
available_agents = [
(agent_id, load)
for agent_id, load in self.agent_loads.items()
if load < 3
]

if not available_agents:
return None

# 选择负载最低的智能体
available_agents.sort(key=lambda x: x[1])
return available_agents[0][0]

async def _assign_task_to_agent(self, task: Task, agent_id: str):
"""分配任务给智能体"""
agent = self.agents[agent_id]
self.task_assignments[task.id] = agent_id
self.agent_loads[agent_id] += 1

# 发送任务消息
await self.message_bus.publish(Message(
type=MessageType.REQUEST,
sender="scheduler",
receivers=[agent_id],
content={
'action': 'execute',
'task_id': task.id,
'task_description': task.description,
'params': {}
}
))

def get_task_status(self, task_id: str) -> Optional[str]:
"""获取任务状态"""
agent_id = self.task_assignments.get(task_id)
if agent_id and agent_id in self.agents:
agent = self.agents[agent_id]
return agent.get_state(f"task_{task_id}_status")
return None

async def cancel_task(self, task_id: str) -> bool:
"""取消任务"""
agent_id = self.task_assignments.get(task_id)
if agent_id and agent_id in self.agents:
agent = self.agents[agent_id]
await agent.send_notification(
receivers=[agent_id],
content={'type': 'cancel_task', 'task_id': task_id}
)
self.agent_loads[agent_id] = max(0, self.agent_loads[agent_id] - 1)
del self.task_assignments[task_id]
return True
return False

5.2 Supervisor协调器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
class SupervisorAgent(BaseAgent):
"""监督者智能体 - 协调整个多智能体系统"""

def __init__(self, agent_id: str, message_bus: MessageBus,
planner: PlannerAgent,
researcher: ResearcherAgent,
executor: ExecutorAgent):
super().__init__(agent_id, "Supervisor", AgentRole.SUPERVISOR, message_bus)
self.planner = planner
self.researcher = researcher
self.executor = executor
self.scheduler = TaskScheduler(message_bus)

# 注册子智能体
self.scheduler.register_agent(planner)
self.scheduler.register_agent(researcher)
self.scheduler.register_agent(executor)

self.current_tasks: Dict[str, Dict] = {}

async def process_request(self, message: Message) -> Message:
"""处理来自用户的请求"""
content = message.content
user_request = content.get('request', '')

# 创建唯一请求ID
request_id = str(uuid.uuid4())

# 记录任务
self.current_tasks[request_id] = {
'request': user_request,
'status': 'planning',
'start_time': datetime.now().isoformat()
}

# 步骤1: 任务规划
plan_message = await self.send_request(
self.planner.agent_id,
{'action': 'plan', 'task': user_request}
)

if plan_message.content.get('status') != 'success':
return Message(
type=MessageType.RESPONSE,
sender=self.agent_id,
receivers=[message.sender],
content={
'status': 'error',
'message': '任务规划失败',
'request_id': request_id
}
)

subtasks = plan_message.content.get('subtasks', [])
self.current_tasks[request_id]['status'] = 'executing'
self.current_tasks[request_id]['subtasks'] = subtasks

# 步骤2: 执行子任务
results = []
for subtask in subtasks:
result = await self._execute_subtask(subtask, request_id)
results.append(result)

# 如果执行失败,考虑是否需要重试或调整
if result.get('status') == 'error':
# 可以实现重试逻辑
pass

# 步骤3: 整合结果
final_result = await self._aggregate_results(request_id, results)

self.current_tasks[request_id]['status'] = 'completed'
self.current_tasks[request_id]['end_time'] = datetime.now().isoformat()
self.current_tasks[request_id]['result'] = final_result

return Message(
type=MessageType.RESPONSE,
sender=self.agent_id,
receivers=[message.sender],
content={
'status': 'success',
'request_id': request_id,
'result': final_result,
'task_summary': {
'total': len(subtasks),
'succeeded': sum(1 for r in results if r.get('status') == 'success'),
'failed': sum(1 for r in results if r.get('status') == 'error')
}
}
)

async def _execute_subtask(self, subtask: Dict, request_id: str) -> Dict:
"""执行单个子任务"""
task_id = subtask.get('id')
description = subtask.get('description')

# 根据任务类型选择执行方式
if any(keyword in description for keyword in ['搜索', '查询', '研究']):
# 研究类任务
response = await self.send_request(
self.researcher.agent_id,
{'action': 'search', 'query': description, 'max_results': 5}
)
return response.content

elif any(keyword in description for keyword in ['执行', '运行', '计算']):
# 执行类任务
response = await self.send_request(
self.executor.agent_id,
{
'action': 'execute',
'task_id': task_id,
'command': description,
'params': {}
}
)
return response.content

else:
# 默认使用执行器
response = await self.send_request(
self.executor.agent_id,
{
'action': 'execute',
'task_id': task_id,
'command': description,
'params': {}
}
)
return response.content

async def _aggregate_results(self, request_id: str,
results: List[Dict]) -> Dict[str, Any]:
"""聚合子任务结果"""
aggregated = {
'summary': '',
'details': results,
'timestamp': datetime.now().isoformat()
}

# 生成摘要
successful_results = [r for r in results if r.get('status') == 'success']
failed_results = [r for r in results if r.get('status') == 'error']

if successful_results:
aggregated['summary'] = f"成功完成 {len(successful_results)}/{len(results)} 个子任务"

if failed_results:
aggregated['summary'] += f",{len(failed_results)} 个任务失败"
aggregated['errors'] = [r.get('error', 'Unknown error') for r in failed_results]

return aggregated

六、多智能体系统完整示例

6.1 系统初始化与运行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
async def create_multi_agent_system() -> SupervisorAgent:
"""创建多智能体系统"""

# 1. 创建消息总线
message_bus = MessageBus()

# 2. 创建各个智能体
planner = PlannerAgent(
agent_id="planner_001",
message_bus=message_bus
)

researcher = ResearcherAgent(
agent_id="researcher_001",
message_bus=message_bus
)

executor = ExecutorAgent(
agent_id="executor_001",
message_bus=message_bus,
tools={
'calculate': lambda x, y: x + y,
'format_text': lambda text, style: f"[{style}] {text} [/{style}]",
'convert_unit': lambda value, from_unit, to_unit: f"{value} {from_unit} = {value * 1.61} {to_unit}"
}
)

# 3. 创建监督者
supervisor = SupervisorAgent(
agent_id="supervisor_001",
message_bus=message_bus,
planner=planner,
researcher=researcher,
executor=executor
)

# 4. 启动消息处理
asyncio.create_task(message_bus.start_processing())

# 5. 启动各智能体
await planner.start()
await researcher.start()
await executor.start()
await supervisor.start()

return supervisor


async def main():
"""主函数 - 演示多智能体系统"""

print("🚀 初始化多智能体系统...")

# 创建系统
supervisor = await create_multi_agent_system()

# 等待系统就绪
await asyncio.sleep(1)

print("📋 系统就绪,开始处理请求...")

# 发送任务请求
user_request = "研究人工智能在医疗领域的应用,并生成一份分析报告"

response = await supervisor.send_request(
supervisor.agent_id,
{'request': user_request}
)

print(f"\n📊 任务完成!")
print(f"状态: {response.content.get('status')}")
print(f"请求ID: {response.content.get('request_id')}")

if 'result' in response.content:
result = response.content['result']
print(f"\n📝 结果摘要: {result.get('summary', 'N/A')}")

if 'task_summary' in response.content:
summary = response.content['task_summary']
print(f"📈 任务统计: {summary}")

# 关闭系统
await supervisor.stop()


if __name__ == "__main__":
asyncio.run(main())

6.2 实际应用案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
"""
案例:使用多智能体系统进行代码审查
"""

class CodeReviewMultiAgentSystem:
"""代码审查多智能体系统"""

def __init__(self, message_bus: MessageBus):
# 规划器 - 分析代码审查需求
self.planner = PlannerAgent("code_planner", message_bus)

# 研究者 - 查找相关编码规范
self.researcher = ResearcherAgent("code_researcher", message_bus)

# 执行器 - 运行静态分析工具
self.executor = ExecutorAgent("code_executor", message_bus, tools={
'run_linter': self._run_linter,
'check_security': self._check_security,
'analyze_complexity': self._analyze_complexity
})

# 批评者 - 评估代码质量
self.critic = CriticAgent("code_critic", message_bus)

# 协调者
self.coordinator = SupervisorAgent(
"code_review_supervisor",
message_bus,
self.planner,
self.researcher,
self.executor
)

async def review_code(self, code: str, language: str = "python") -> Dict:
"""审查代码"""
request = Message(
type=MessageType.REQUEST,
sender="user",
receivers=[self.coordinator.agent_id],
content={
'request': f"对以下{language}代码进行全面审查:\n{code}",
'language': language
}
)

response = await self.coordinator.send_request(
self.coordinator.agent_id,
request.content
)

return response.content


class CriticAgent(BaseAgent):
"""批评者智能体 - 评估和反馈"""

def __init__(self, agent_id: str, message_bus: MessageBus):
super().__init__(agent_id, "Critic", AgentRole.CRITIC, message_bus)
self.reviews: List[Dict] = []

async def process_request(self, message: Message) -> Message:
"""处理审查请求"""
content = message.content

code_findings = content.get('findings', [])
lint_results = content.get('lint_results', [])

# 综合评估
review = self._comprehensive_review(code_findings, lint_results)

return Message(
type=MessageType.RESPONSE,
sender=self.agent_id,
receivers=[message.sender],
content={
'status': 'success',
'review': review
}
)

def _comprehensive_review(self, findings: List[Dict],
lint_results: List[Dict]) -> Dict:
"""综合审查"""
issues = []
suggestions = []

# 分析发现
for finding in findings:
if finding.get('severity') == 'high':
issues.append({
'type': 'high_priority',
'description': finding.get('description'),
'location': finding.get('location')
})

# 分析lint结果
for result in lint_results:
if result.get('level') == 'error':
issues.append({
'type': 'lint_error',
'description': result.get('message'),
'line': result.get('line')
})
elif result.get('level') == 'warning':
suggestions.append(result.get('message'))

# 计算质量分数
quality_score = max(0, 100 - len(issues) * 10 - len(suggestions) * 2)

return {
'quality_score': quality_score,
'issues': issues,
'suggestions': suggestions,
'summary': self._generate_summary(quality_score, issues, suggestions)
}

def _generate_summary(self, score: int, issues: List[Dict],
suggestions: List[str]) -> str:
"""生成审查摘要"""
if score >= 90:
level = "优秀"
elif score >= 70:
level = "良好"
elif score >= 50:
level = "一般"
else:
level = "需要改进"

summary = f"代码质量评级: {level} (分数: {score}/100)\n"
summary += f"发现 {len(issues)} 个问题,{len(suggestions)} 个改进建议。"

return summary

七、挑战与最佳实践

7.1 主要挑战

  1. 通信开销:智能体间频繁通信可能导致性能瓶颈
  2. 一致性问题:分布式决策可能导致状态不一致
  3. 死锁与活锁:复杂的依赖关系可能导致死锁
  4. 调试困难:多智能体行为难以追踪和调试
  5. 资源竞争:多个智能体可能竞争相同资源

7.2 最佳实践建议

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# 最佳实践1: 实现超时和重试机制
async def robust_request(agent, receiver, content, max_retries=3):
"""带重试的请求"""
for attempt in range(max_retries):
try:
response = await asyncio.wait_for(
agent.send_request(receiver, content),
timeout=30.0
)
return response
except (TimeoutError, ConnectionError) as e:
if attempt == max_retries - 1:
raise
await asyncio.sleep(2 ** attempt) # 指数退避
return None

# 最佳实践2: 实现优雅降级
async def graceful_degradation(agents: List[BaseAgent],
critical_agent_id: str,
task_func):
"""当关键智能体不可用时优雅降级"""
try:
return await task_func()
except Exception as e:
# 记录错误
logging.error(f"关键路径失败: {e}")
# 尝试备用方案
for agent in agents:
if agent.agent_id != critical_agent_id:
try:
return await task_func(alternative_agent=agent)
except:
continue
return {"status": "degraded", "error": str(e)}

# 最佳实践3: 实现健康检查
class HealthCheck:
"""健康检查机制"""
def __init__(self, agents: List[BaseAgent]):
self.agents = {a.agent_id: a for a in agents}

async def check_all(self) -> Dict[str, bool]:
"""检查所有智能体状态"""
results = {}
for agent_id, agent in self.agents.items():
try:
# 检查最近的心跳
last_heartbeat = agent.get_state('last_heartbeat')
if last_heartbeat:
from datetime import datetime
last_time = datetime.fromisoformat(last_heartbeat)
is_alive = (datetime.now() - last_time).seconds < 60
else:
is_alive = False

results[agent_id] = is_alive
except:
results[agent_id] = False

return results

八、总结

多智能体系统代表了人工智能应用的新范式,通过多个专业化智能体的协作,可以处理更加复杂、动态的任务。本文详细介绍了:

  1. 架构设计:包括分层、去中心化、星型等多种架构模式
  2. 通信机制:基于消息总线的异步通信协议
  3. 核心组件:规划器、研究者、执行器、批评者等角色
  4. 协作策略:任务调度、负载均衡、Supervisor协调
  5. 实战代码:完整的Python实现示例

随着LLM技术的不断发展,多智能体系统将在更多领域展现出其强大的能力。希望本文能为读者提供有价值的参考,帮助大家构建更加智能、高效的多智能体应用。


本文代码基于Python 3.10+,使用asyncio实现异步并发。