1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187
| class ResearcherAgent(BaseAgent): """研究者智能体 - 负责信息检索和知识获取"""
def __init__(self, agent_id: str, message_bus: MessageBus, search_engine=None, knowledge_base=None): super().__init__(agent_id, "Researcher", AgentRole.RESEARCHER, message_bus) self.search_engine = search_engine self.knowledge_base = knowledge_base self.search_results_cache: Dict[str, List[Dict]] = {}
async def process_request(self, message: Message) -> Message: """处理研究请求""" content = message.content action = content.get('action')
if action == 'search': query = content.get('query', '') max_results = content.get('max_results', 5)
results = await self.search(query, max_results)
return Message( type=MessageType.RESPONSE, sender=self.agent_id, receivers=[message.sender], content={ 'status': 'success', 'query': query, 'results': results, 'result_count': len(results) } )
elif action == 'deep_research': topic = content.get('topic', '') aspects = content.get('aspects', [])
research_data = await self.deep_research(topic, aspects)
return Message( type=MessageType.RESPONSE, sender=self.agent_id, receivers=[message.sender], content={ 'status': 'success', 'topic': topic, 'research_data': research_data } )
elif action == 'fact_check': claims = content.get('claims', []) checked = await self.fact_check(claims)
return Message( type=MessageType.RESPONSE, sender=self.agent_id, receivers=[message.sender], content={ 'status': 'success', 'checked_claims': checked } )
return Message( type=MessageType.RESPONSE, sender=self.agent_id, receivers=[message.sender], content={'status': 'error', 'message': 'Unknown action'} )
async def search(self, query: str, max_results: int = 5) -> List[Dict[str, Any]]: """执行搜索""" cache_key = f"{query}_{max_results}" if cache_key in self.search_results_cache: return self.search_results_cache[cache_key]
results = []
if self.search_engine: raw_results = await self.search_engine.search(query, max_results) for item in raw_results: results.append({ 'title': item.get('title', ''), 'url': item.get('url', ''), 'snippet': item.get('snippet', ''), 'relevance_score': item.get('score', 0.0) }) else: results = self._simulate_search(query, max_results)
self.search_results_cache[cache_key] = results
return results
def _simulate_search(self, query: str, max_results: int) -> List[Dict[str, Any]]: """生成模拟搜索结果""" return [ { 'title': f"关于'{query}'的研究报告", 'url': f"https://example.com/research/{hash(query) % 10000}", 'snippet': f"本文档包含关于'{query}'的详细分析和研究资料...", 'relevance_score': 0.95 - (i * 0.1) } for i in range(min(max_results, 5)) ]
async def deep_research(self, topic: str, aspects: List[str]) -> Dict[str, Any]: """深度研究一个主题""" research_data = { 'topic': topic, 'overview': '', 'aspects': {}, 'sources': [], 'timestamp': datetime.now().isoformat() }
overview_results = await self.search(f"{topic} 概述", max_results=3) if overview_results: research_data['overview'] = overview_results[0].get('snippet', '')
for aspect in aspects: aspect_results = await self.search(f"{topic} {aspect}", max_results=5) research_data['aspects'][aspect] = { 'findings': [r.get('snippet', '') for r in aspect_results], 'source_count': len(aspect_results) }
all_sources = set() for aspect_data in research_data['aspects'].values(): for finding in aspect_data.get('findings', []): if 'https://' in finding: start = finding.find('https://') end = finding.find(' ', start) if end == -1: end = len(finding) all_sources.add(finding[start:end])
research_data['sources'] = list(all_sources)
return research_data
async def fact_check(self, claims: List[str]) -> List[Dict[str, Any]]: """核实声明的真实性""" checked_claims = []
for claim in claims: search_results = await self.search(claim, max_results=3)
supporting = 0 contradicting = 0
for result in search_results: snippet = result.get('snippet', '').lower() if any(word in snippet for word in ['证实', '确认', '证明', '支持']): supporting += 1 if any(word in snippet for word in ['反驳', '否认', '错误', '虚假']): contradicting += 1
if supporting > contradicting: verdict = "支持" elif contradicting > supporting: verdict = "质疑" else: verdict = "不确定"
checked_claims.append({ 'claim': claim, 'verdict': verdict, 'supporting_evidence': supporting, 'contradicting_evidence': contradicting, 'sources': search_results })
return checked_claims
|