Agent记忆与知识管理实战

前言

记忆是人类智能的核心组成部分,对于AI Agent而言同样至关重要。一个真正智能的Agent不仅需要强大的推理能力,还需要完善的记忆系统来存储、检索和利用历史信息。想象一个助手能够记住你三年前的偏好、能够从过往的对话中学习成长、能够准确检索十亿级别的知识库——这些能力都依赖于精心设计的记忆与知识管理系统。

本文将深入探讨Agent记忆系统的架构设计、向量数据库集成、RAG增强以及实战代码实现,帮助你构建一个真正具有“记忆能力”的智能Agent。

一、Agent记忆系统概述

1.1 为什么Agent需要记忆?

传统LLM的局限性在于其“无状态”特性——每次对话都是独立的上下文,模型无法跨会话保留信息。而Agent的记忆系统正是解决这一问题的关键:

  • 个性化服务:记住用户偏好、习惯、历史交互
  • 连续性对话:支持多轮对话中的上下文连贯性
  • 经验积累:从历史任务执行中学习和改进
  • 知识复用:避免重复获取相同信息
  • 长期学习:随时间积累领域知识

1.2 记忆系统的层次结构

Agent的记忆系统通常分为三个层次:

1
2
3
4
5
6
7
8
9
10
┌─────────────────────────────────────────┐
│ 长期记忆 (Long-term) │
│ - 持久化存储 - 知识图谱 - 向量数据库 │
├─────────────────────────────────────────┤
│ 短期记忆 (Short-term) │
│ - 当前会话上下文 - 工作内存 - 堆栈 │
├─────────────────────────────────────────┤
│ 即时记忆 (Working) │
│ - 当前任务状态 - 工具调用 - 临时数据 │
└─────────────────────────────────────────┘

二、记忆系统架构设计

2.1 核心组件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import List, Dict, Any, Optional, Tuple
import uuid
import json

class MemoryType(Enum):
    """Category of a memory, modelled on the kinds of human memory."""

    # Concrete events and experiences, e.g. individual conversation turns.
    EPISODIC = "episodic"
    # Facts and conceptual knowledge.
    SEMANTIC = "semantic"
    # Skills and step-by-step procedures.
    PROCEDURAL = "procedural"
    # Scratch information for the task currently in progress.
    WORKING = "working"

class Importance(Enum):
    """Priority of a memory; a larger value sorts first when memories are ranked."""

    LOW = 1       # incidental detail
    NORMAL = 2    # default for new memories
    HIGH = 3      # worth keeping
    CRITICAL = 4  # highest level

@dataclass
class Memory:
    """One memory record plus the bookkeeping used for ranking and consolidation.

    Timestamps are ISO-8601 strings produced by ``datetime.now()`` (naive local
    time). ``embedding`` stays ``None`` until something assigns a vector.
    """
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    content: str = ""
    memory_type: MemoryType = MemoryType.EPISODIC
    importance: Importance = Importance.NORMAL
    created_at: str = field(default_factory=lambda: datetime.now().isoformat())
    accessed_at: str = field(default_factory=lambda: datetime.now().isoformat())
    access_count: int = 0
    embedding: Optional[List[float]] = None
    metadata: Dict[str, Any] = field(default_factory=dict)
    tags: List[str] = field(default_factory=list)
    source: str = "unknown"  # where the memory came from, e.g. "user_input"

    def access(self):
        """Record a read: refresh ``accessed_at`` and increment ``access_count``."""
        self.accessed_at = datetime.now().isoformat()
        self.access_count += 1

    def to_dict(self) -> Dict[str, Any]:
        """Serialise to plain, JSON-friendly values.

        Enums are replaced by their values and ``embedding`` is omitted.
        ``tags`` and ``metadata`` are shallow copies, so mutating the returned
        dict does not modify this memory.
        """
        return {
            'id': self.id,
            'content': self.content,
            'memory_type': self.memory_type.value,
            'importance': self.importance.value,
            'created_at': self.created_at,
            'accessed_at': self.accessed_at,
            'access_count': self.access_count,
            'tags': list(self.tags),
            'source': self.source,
            'metadata': dict(self.metadata),
        }

    @classmethod
    def from_dict(cls, data: Dict) -> 'Memory':
        """Rebuild a Memory from ``to_dict`` output or a compatible dict.

        Missing keys fall back to defaults (a fresh uuid, "now" for both
        timestamps). An optional ``embedding`` key is restored when present.
        ``tags``/``metadata`` are copied (``None`` becomes empty) so the new
        memory never shares containers with *data*.

        Raises:
            ValueError: if ``memory_type`` or ``importance`` is not a valid enum value.
        """
        now = datetime.now().isoformat()
        embedding = data.get('embedding')
        return cls(
            id=data.get('id', str(uuid.uuid4())),
            content=data.get('content', ''),
            memory_type=MemoryType(data.get('memory_type', 'episodic')),
            importance=Importance(data.get('importance', 2)),
            created_at=data.get('created_at', now),
            accessed_at=data.get('accessed_at', now),
            access_count=data.get('access_count', 0),
            embedding=list(embedding) if embedding is not None else None,
            tags=list(data.get('tags') or []),
            source=data.get('source', 'unknown'),
            metadata=dict(data.get('metadata') or {}),
        )

2.2 记忆管理器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
class MemoryManager:
    """Unified manager for the three memory tiers.

    * ``working_memory``    - dict id -> Memory for ``MemoryType.WORKING``,
      capped at ``max_working_memory_size`` (oldest evicted first);
    * ``short_term_memory`` - list of EPISODIC memories, consolidated once it
      reaches ``max_short_term_memories``;
    * ``long_term_memory``  - list of all other types plus consolidated memories.

    When a vector store is configured, every added memory is also embedded and
    written to it, and long-term retrieval goes through vector search.
    """

    # Length of the pseudo-embeddings produced by _generate_embedding.
    EMBEDDING_DIM = 384

    def __init__(self, vector_store=None, kg_store=None, summarizer=None):
        """
        Args:
            vector_store: optional backend exposing async ``add`` and ``search``.
            kg_store: optional knowledge-graph store, kept for callers.
            summarizer: optional summariser, kept for callers; this class never
                calls it. It is injected instead of constructing
                ``TextSummarizer()`` here, which raised NameError unless that
                class was defined before the manager was created.
        """
        # Storage back ends
        self.vector_store = vector_store
        self.kg_store = kg_store
        self.summarizer = summarizer

        # Memory tiers
        self.working_memory: Dict[str, Any] = {}
        self.short_term_memory: List[Memory] = []
        self.long_term_memory: List[Memory] = []

        # Configuration
        self.max_working_memory_size = 1000  # cap enforced in add_memory
        self.max_short_term_memories = 100   # short-term size that triggers consolidation
        self.consolidation_threshold = 10    # memories promoted per consolidation

    async def add_memory(self, content: str,
                         memory_type: MemoryType = MemoryType.EPISODIC,
                         importance: Importance = Importance.NORMAL,
                         metadata: Optional[Dict] = None) -> Memory:
        """Create a memory, file it into the tier for its type and index it.

        Args:
            content: memory text.
            memory_type: WORKING goes to working memory, EPISODIC to short-term,
                everything else straight to long-term.
            importance: priority used by ranking and consolidation.
            metadata: optional extra data; ``metadata['source']`` (if present)
                becomes ``Memory.source``, otherwise "user_input".

        Returns:
            The created Memory (with ``embedding`` set when a vector store exists).
        """
        metadata = metadata or {}
        memory = Memory(
            content=content,
            memory_type=memory_type,
            importance=importance,
            metadata=metadata,
            source=metadata.get('source', 'user_input'),
        )

        if memory_type == MemoryType.WORKING:
            self.working_memory[memory.id] = memory
            self._trim_working_memory()
        elif memory_type == MemoryType.EPISODIC:
            await self._add_to_short_term(memory)
        else:
            await self._add_to_long_term(memory)

        if self.vector_store is not None:
            memory.embedding = await self._generate_embedding(content)
            await self.vector_store.add(memory)

        return memory

    def _trim_working_memory(self):
        """Evict the oldest working memories beyond the cap (dicts keep insertion order)."""
        while len(self.working_memory) > self.max_working_memory_size:
            oldest_id = next(iter(self.working_memory))
            del self.working_memory[oldest_id]

    async def _add_to_short_term(self, memory: Memory):
        """Append to short-term memory and consolidate once it is full."""
        self.short_term_memory.append(memory)
        if len(self.short_term_memory) >= self.max_short_term_memories:
            await self.consolidate_memories()

    async def _add_to_long_term(self, memory: Memory):
        """Append to long-term memory."""
        self.long_term_memory.append(memory)

    async def consolidate_memories(self):
        """Promote the most important short-term memories to long-term memory.

        Does nothing while short-term memory holds fewer than
        ``consolidation_threshold`` entries. Otherwise the top
        ``consolidation_threshold`` memories by (importance, access_count) move
        to long-term memory and the short-term list is cleared; the others leave
        the tiers (a vector store, if any, still holds them).
        """
        if len(self.short_term_memory) < self.consolidation_threshold:
            return

        important_memories = sorted(
            self.short_term_memory,
            key=lambda m: (m.importance.value, m.access_count),
            reverse=True,
        )[:self.consolidation_threshold]

        self.long_term_memory.extend(important_memories)
        self.short_term_memory.clear()

        print(f"巩固了 {len(important_memories)} 条重要记忆")

    async def retrieve(self, query: str,
                       memory_type: Optional[MemoryType] = None,
                       top_k: int = 5,
                       use_vector_search: bool = True) -> List[Memory]:
        """Return up to *top_k* memories relevant to *query*, best first.

        Candidates: working memories containing the query (case-insensitive),
        all short-term memories, and long-term memories via vector search (or a
        substring scan of ``long_term_memory`` when vector search is unavailable
        or disabled). Candidates are de-duplicated by id. If any candidate
        contains the query verbatim, only those are kept. Each returned memory
        is marked as accessed.
        """
        query_lower = query.lower()
        # id -> Memory: the same object can sit in a tier AND come back from the
        # vector store, so collect by id to avoid returning duplicates.
        candidates: Dict[str, Memory] = {}

        # 1. Working memory: substring match only
        if memory_type is None or memory_type == MemoryType.WORKING:
            for memory in self.working_memory.values():
                if query_lower in memory.content.lower():
                    candidates.setdefault(memory.id, memory)

        # 2. Short-term memory: every episodic memory is a candidate
        if memory_type is None or memory_type == MemoryType.EPISODIC:
            for memory in self.short_term_memory:
                candidates.setdefault(memory.id, memory)

        # 3. Long-term memory
        if use_vector_search and self.vector_store is not None:
            query_embedding = await self._generate_embedding(query)
            vector_results = await self.vector_store.search(
                query_embedding,
                top_k=top_k,
                filter_type=memory_type.value if memory_type else None,
            )
            for memory in vector_results:
                candidates.setdefault(memory.id, memory)
        else:
            # Without vector search long-term memory is only reachable by substring scan.
            for memory in self.long_term_memory:
                if memory_type is not None and memory.memory_type != memory_type:
                    continue
                if query_lower in memory.content.lower():
                    candidates.setdefault(memory.id, memory)

        results = list(candidates.values())

        # 4. Prefer verbatim keyword matches when there are any
        keyword_results = [m for m in results if query_lower in m.content.lower()]
        if keyword_results:
            results = keyword_results

        ranked = self._rank_memories(results, query)[:top_k]
        for memory in ranked:
            memory.access()
        return ranked

    def _rank_memories(self, memories: List[Memory], query: str) -> List[Memory]:
        """Sort by a weighted score: relevance 0.4, importance 0.3, recency 0.2, access 0.1."""
        query_lower = query.lower()

        def score(memory: Memory) -> float:
            relevance = 1.0 if query_lower in memory.content.lower() else 0.5
            importance_score = memory.importance.value / 4.0
            access_score = min(memory.access_count / 10.0, 1.0)
            time_decay = self._calculate_time_decay(memory.accessed_at)
            return (relevance * 0.4 + importance_score * 0.3 +
                    access_score * 0.1 + time_decay * 0.2)

        return sorted(memories, key=score, reverse=True)

    def _calculate_time_decay(self, timestamp: str) -> float:
        """Return a recency factor for an ISO timestamp.

        Decays linearly from 1.0 by 1/168 per hour since *timestamp* (so a week
        would reach 0), clamped at a floor of 0.1. Unparsable timestamps, or ones
        that cannot be compared with ``datetime.now()``, score 0.5.
        """
        try:
            hours_since = (datetime.now() - datetime.fromisoformat(timestamp)).total_seconds() / 3600
        except (TypeError, ValueError):
            return 0.5
        return max(0.1, 1.0 - hours_since / (24 * 7))

    async def _generate_embedding(self, text: str) -> List[float]:
        """Return a deterministic, L2-normalised character-frequency pseudo-embedding.

        Each character adds 1.0 to bucket ``ord(char) % EMBEDDING_DIM``, so texts
        sharing characters get a positive cosine similarity. Placeholder only: a
        real system would use an embedding model (e.g. OpenAI embeddings or
        sentence-transformers).
        """
        dim = self.EMBEDDING_DIM
        embedding = [0.0] * dim
        for char in text:
            embedding[ord(char) % dim] += 1.0

        norm = sum(x * x for x in embedding) ** 0.5
        if norm > 0:
            embedding = [x / norm for x in embedding]
        return embedding

    async def get_context_window(self, max_tokens: int = 4000) -> str:
        """Render the 10 most important tier memories plus the last 5 working memories.

        The result is truncated to ``max_tokens * 4`` characters (a rough
        4-characters-per-token heuristic) with "..." appended when cut.
        """
        important_memories = sorted(
            self.short_term_memory + self.long_term_memory,
            key=lambda m: (m.importance.value, m.access_count),
            reverse=True,
        )[:10]

        context_parts = [f"[记忆-{m.memory_type.value}] {m.content}" for m in important_memories]
        context_parts.extend(
            f"[工作-{m.memory_type.value}] {m.content}"
            for m in list(self.working_memory.values())[-5:]
        )

        context = "\n\n".join(context_parts)
        max_chars = max_tokens * 4
        if len(context) > max_chars:
            context = context[:max_chars] + "..."
        return context

三、向量数据库集成

3.1 向量存储接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
from typing import List, Optional, Callable
import numpy as np

class VectorStore(ABC):
    """Abstract base for the vector-database back ends used by the memory system.

    Subclasses implement four async operations (add / search / delete /
    update); cosine similarity is provided as a shared helper.
    """

    @abstractmethod
    async def add(self, memory: Memory) -> str:
        """Store *memory* and return its id."""

    @abstractmethod
    async def search(self, query_embedding: List[float],
                     top_k: int = 5,
                     filter_type: Optional[str] = None) -> List[Memory]:
        """Return up to *top_k* memories most similar to *query_embedding*.

        *filter_type*, when given, is a ``MemoryType`` value string that
        results must match.
        """

    @abstractmethod
    async def delete(self, memory_id: str) -> bool:
        """Remove the memory with *memory_id*; report whether it was removed."""

    @abstractmethod
    async def update(self, memory: Memory) -> bool:
        """Replace the stored copy of *memory*; report success."""

    def cosine_similarity(self, a: List[float], b: List[float]) -> float:
        """Return the cosine of the angle between *a* and *b*.

        Returns 0.0 when either vector has zero length. ``zip`` stops at the
        shorter vector, so vectors of different lengths are not rejected.
        """
        numerator = sum(p * q for p, q in zip(a, b))
        length_a = sum(p**2 for p in a) ** 0.5
        length_b = sum(q**2 for q in b) ** 0.5
        denominator = length_a * length_b
        if denominator == 0:
            return 0.0
        return numerator / denominator

3.2 内存向量存储实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
class InMemoryVectorStore(VectorStore):
    """Brute-force in-process vector store, suited to development and tests.

    Every search scores all stored embeddings with cosine similarity, so cost
    grows linearly with the number of stored memories.
    """

    def __init__(self, dimension: int = 384):
        self.dimension = dimension
        # id -> {'memory': Memory, 'embedding': List[float]}
        self.storage: Dict[str, Dict] = {}
        # Reserved for an acceleration index; nothing populates it yet.
        self.dimensional_index: Dict[int, List[str]] = {}

    async def add(self, memory: Memory) -> str:
        """Store *memory* under its id (replacing any previous entry).

        Raises:
            ValueError: if the memory has no embedding.
        """
        if memory.embedding is None:
            raise ValueError("Memory must have an embedding")

        self.storage[memory.id] = {
            'memory': memory,
            'embedding': memory.embedding,
        }
        return memory.id

    async def search(self, query_embedding: List[float],
                     top_k: int = 5,
                     filter_type: Optional[str] = None) -> List[Memory]:
        """Return the *top_k* stored memories with the highest cosine similarity.

        Memories whose ``memory_type.value`` differs from *filter_type* (when
        given) are skipped. Ties keep insertion order.
        """
        candidates = []
        for data in self.storage.values():
            memory = data['memory']
            if filter_type and memory.memory_type.value != filter_type:
                continue
            similarity = self.cosine_similarity(query_embedding, data['embedding'])
            candidates.append((similarity, memory))

        candidates.sort(key=lambda pair: pair[0], reverse=True)
        return [memory for _, memory in candidates[:top_k]]

    async def delete(self, memory_id: str) -> bool:
        """Remove a memory; True if it was present."""
        if memory_id in self.storage:
            del self.storage[memory_id]
            return True
        return False

    async def update(self, memory: Memory) -> bool:
        """Replace a stored memory; False if its id is unknown.

        When the incoming memory carries no embedding (e.g. one rebuilt by
        ``Memory.from_dict``), the previously stored vector is kept. The old code
        substituted a random vector, which made the memory match unrelated
        queries; a random vector is now only the last resort.
        """
        entry = self.storage.get(memory.id)
        if entry is None:
            return False

        embedding = (memory.embedding
                     or entry['embedding']
                     or await self._generate_dummy_embedding())
        self.storage[memory.id] = {
            'memory': memory,
            'embedding': embedding,
        }
        return True

    async def _generate_dummy_embedding(self) -> List[float]:
        """Return a random vector of length ``dimension`` (placeholder only)."""
        import random
        return [random.random() for _ in range(self.dimension)]

    async def get_stats(self) -> Dict[str, Any]:
        """Return total count, per-type counts and the configured dimension."""
        type_counts: Dict[str, int] = {}
        for data in self.storage.values():
            memory_type = data['memory'].memory_type.value
            type_counts[memory_type] = type_counts.get(memory_type, 0) + 1

        return {
            'total_memories': len(self.storage),
            'type_distribution': type_counts,
            'dimension': self.dimension,
        }

3.3 FAISS向量存储实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
class FAISSVectorStore(VectorStore):
    """Vector store backed by a flat FAISS index, with a NumPy brute-force fallback.

    Row ``i`` of the index holds the vector of memory ``id_list[i]``. Flat FAISS
    indexes cannot cheaply remove rows, so deletion leaves a tombstone: the row
    stays in the index, ``id_list[i]`` becomes ``None`` and search skips it.
    Memory objects are kept in ``memories`` so that search can return them.
    """

    def __init__(self, dimension: int = 384, metric: str = "ip"):
        """Create an empty store.

        Args:
            dimension: length every embedding must have.
            metric: "ip" = inner product on L2-normalised vectors (cosine
                similarity); any other value = Euclidean (L2) distance.
        """
        self.dimension = dimension
        self.metric = metric
        self.index = None
        # memory id -> row position (live rows only)
        self.id_to_idx: Dict[str, int] = {}
        # row position -> memory id, or None for a deleted row
        self.id_list: List[Optional[str]] = []
        # memory id -> Memory, for live rows
        self.memories: Dict[str, Memory] = {}
        # The faiss module when importable. add/search used to reference a bare
        # `faiss` name that only existed inside _initialize_index (NameError).
        self._faiss = None
        # Row storage for the NumPy fallback when FAISS is not installed
        self._vectors: List[np.ndarray] = []
        self._initialize_index()

    def _initialize_index(self):
        """Create the FAISS index, or fall back to NumPy search if FAISS is missing."""
        try:
            import faiss
        except ImportError:
            print("警告: FAISS未安装,使用备用实现")
            self._faiss = None
            self.index = None
            return

        self._faiss = faiss
        if self.metric == "ip":
            self.index = faiss.IndexFlatIP(self.dimension)
        else:
            self.index = faiss.IndexFlatL2(self.dimension)

    def _prepare_vector(self, vector) -> np.ndarray:
        """Copy *vector* into a (1, dimension) float32 row, L2-normalised for "ip".

        Raises:
            ValueError: if the vector length differs from ``dimension``.
        """
        row = np.array(vector, dtype=np.float32).reshape(1, -1)
        if row.shape[1] != self.dimension:
            raise ValueError(
                f"Embedding dimension {row.shape[1]} does not match "
                f"index dimension {self.dimension}"
            )
        if self.metric == "ip":
            norm = float(np.linalg.norm(row))
            if norm > 0:
                row /= norm
        return row

    def _ranked_rows(self, query: np.ndarray, k: int) -> List[int]:
        """Return up to *k* row positions, best match first (FAISS may pad with -1)."""
        if self.index is not None:
            _, indices = self.index.search(query, k)
            return [int(i) for i in indices[0]]

        matrix = np.vstack(self._vectors)
        if self.metric == "ip":
            order = np.argsort(-(matrix @ query[0]), kind="stable")
        else:
            order = np.argsort(((matrix - query[0]) ** 2).sum(axis=1), kind="stable")
        return [int(i) for i in order[:k]]

    def _tombstone(self, memory_id: str):
        """Mark the row of *memory_id* as deleted and drop its Memory object."""
        idx = self.id_to_idx.pop(memory_id)
        self.id_list[idx] = None
        self.memories.pop(memory_id, None)

    async def add(self, memory: Memory) -> str:
        """Index *memory*'s embedding; re-adding an id replaces its previous row.

        Raises:
            ValueError: if the embedding is missing or has the wrong dimension.
        """
        if memory.embedding is None:
            raise ValueError("Memory embedding is required")

        row = self._prepare_vector(memory.embedding)
        if memory.id in self.id_to_idx:
            # Retire the old row so search cannot return this memory twice.
            self._tombstone(memory.id)

        idx = len(self.id_list)
        self.id_to_idx[memory.id] = idx
        self.id_list.append(memory.id)
        self.memories[memory.id] = memory

        if self.index is not None:
            self.index.add(row)
        else:
            self._vectors.append(row[0])

        return memory.id

    async def search(self, query_embedding: List[float],
                     top_k: int = 5,
                     filter_type: Optional[str] = None) -> List[Memory]:
        """Return up to *top_k* live memories nearest to *query_embedding*.

        Returned Memory objects are not modified (the old code cleared their
        ``embedding``, mutating the caller's objects).
        """
        if top_k <= 0 or not self.id_to_idx:
            return []

        query = self._prepare_vector(query_embedding)
        total_rows = len(self.id_list)
        has_tombstones = len(self.id_to_idx) < total_rows
        # Tombstoned and type-filtered rows are dropped after the nearest-neighbour
        # search, so over-fetch in those cases to still fill top_k.
        k = total_rows if (filter_type or has_tombstones) else min(top_k, total_rows)

        results = []
        for idx in self._ranked_rows(query, k):
            if not 0 <= idx < total_rows:
                continue
            memory_id = self.id_list[idx]
            if memory_id is None:
                continue
            memory = self.memories.get(memory_id)
            if memory is None:
                continue
            if filter_type and memory.memory_type.value != filter_type:
                continue
            results.append(memory)
            if len(results) == top_k:
                break

        return results

    async def delete(self, memory_id: str) -> bool:
        """Tombstone a memory; True if it was live.

        The row is kept in the index (flat FAISS indexes have no cheap removal)
        but is never returned again. Unlike the old version, remaining rows keep
        their positions, so ``id_list`` stays aligned with the index.
        """
        if memory_id not in self.id_to_idx:
            return False
        self._tombstone(memory_id)
        return True

    async def update(self, memory: Memory) -> bool:
        """Replace *memory*'s vector and object, adding it if unknown; returns True.

        Raises:
            ValueError: if the memory has no embedding. This is checked before the
                existing entry is touched, so a failed update no longer deletes it.
        """
        if memory.embedding is None:
            raise ValueError("Memory embedding is required")
        await self.delete(memory.id)
        await self.add(memory)
        return True

    def _get_all_memories(self) -> Dict[str, Memory]:
        """Return the id -> Memory map of live memories."""
        return self.memories

    async def save(self, path: str):
        """Write the FAISS index to *path* and the row->id map to ``path + '.ids.json'``.

        Memory objects themselves are not persisted. Does nothing without FAISS.
        """
        if self.index is None:
            return
        self._faiss.write_index(self.index, path)
        with open(path + ".ids.json", "w", encoding="utf-8") as fh:
            json.dump(self.id_list, fh)

    async def load(self, path: str):
        """Read an index written by ``save``; does nothing without FAISS.

        If the ``.ids.json`` sidecar exists, the row->id mapping is restored and
        memories not present in it are dropped. Memory objects must be re-added
        (or placed in ``memories``) before search can return the loaded rows.
        """
        if self.index is None:
            return
        self.index = self._faiss.read_index(path)
        try:
            with open(path + ".ids.json", encoding="utf-8") as fh:
                id_list = json.load(fh)
        except FileNotFoundError:
            # Index saved without a mapping: keep the current mapping unchanged.
            return
        self.id_list = id_list
        self.id_to_idx = {mid: i for i, mid in enumerate(id_list) if mid is not None}
        self.memories = {mid: m for mid, m in self.memories.items() if mid in self.id_to_idx}

四、RAG增强系统

4.1 RAG检索器实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
class RAGRetriever:
    """Retriever combining vector search with BM25 keyword search.

    Vector hits come from ``vector_store``. Keyword hits are scored over the
    memory manager's short-term and long-term lists. Hybrid mode merges both
    ranked lists with weighted Reciprocal Rank Fusion (RRF).
    """

    # RRF smoothing constant and per-source weights used by _hybrid_fusion.
    RRF_K = 60
    SOURCE_WEIGHTS = {'vector': 0.6, 'keyword': 0.4}

    def __init__(self, vector_store: 'VectorStore',
                 memory_manager: 'MemoryManager'):
        self.vector_store = vector_store
        self.memory_manager = memory_manager
        self.bm25_scorer = BM25Scorer()

    async def retrieve_relevant(self, query: str,
                                top_k: int = 5,
                                retrieval_mode: str = "hybrid") -> List[Dict]:
        """Retrieve up to *top_k* relevant memories.

        retrieval_mode:
          - "vector":  vector search only (every hit gets the fixed score 0.8);
          - "keyword": BM25 only (score = BM25 score);
          - "hybrid":  both, fused with RRF (score = fused score).

        Returns a list of dicts with keys ``memory``, ``score`` and ``source``
        ('vector' or 'keyword'); hybrid results also carry ``fused_score``.
        Keyword candidates with a score of exactly 0 (no query term matched)
        are ignored.
        """
        results: List[Dict] = []

        if retrieval_mode in ("vector", "hybrid"):
            query_embedding = await self.memory_manager._generate_embedding(query)
            vector_results = await self.vector_store.search(
                query_embedding,
                top_k=top_k * 2 if retrieval_mode == "hybrid" else top_k,
            )
            for memory in vector_results:
                results.append({
                    'memory': memory,
                    'score': 0.8,  # the store returns no similarity; fixed confidence
                    'source': 'vector',
                })

        if retrieval_mode in ("keyword", "hybrid"):
            all_memories = (self.memory_manager.short_term_memory +
                            self.memory_manager.long_term_memory)
            bm25_results = self.bm25_scorer.search(query, all_memories, top_k=top_k)
            for score, memory in bm25_results:
                if not score:
                    continue
                # Memories already found by vector search are kept here on
                # purpose: RRF rewards a memory that both sources agree on.
                results.append({
                    'memory': memory,
                    'score': score,
                    'source': 'keyword',
                })

        if retrieval_mode == "hybrid":
            return self._hybrid_fusion(results, top_k)
        return sorted(results, key=lambda r: r['score'], reverse=True)[:top_k]

    def _hybrid_fusion(self, results: List[Dict], top_k: int) -> List[Dict]:
        """Fuse per-source ranked results with weighted Reciprocal Rank Fusion.

        *results* must list each source's hits in that source's rank order.
        Each memory's fused score is ``sum(weight[source] / (RRF_K + rank))``
        over every source that returned it, where ``rank`` is the 1-based
        position within that source (unknown sources weigh 0.4). Returns at
        most *top_k* distinct memories, best first. The first entry seen for
        each memory is kept, with ``fused_score`` and ``score`` set to the
        fused value.
        """
        fused_scores: Dict[str, float] = {}
        first_entry: Dict[str, Dict] = {}
        rank_in_source: Dict[str, int] = {}

        for result in results:
            source = result['source']
            rank = rank_in_source.get(source, 0) + 1
            rank_in_source[source] = rank

            memory_id = result['memory'].id
            weight = self.SOURCE_WEIGHTS.get(source, 0.4)
            fused_scores[memory_id] = fused_scores.get(memory_id, 0.0) + weight / (self.RRF_K + rank)
            first_entry.setdefault(memory_id, result)

        fused_results = []
        for memory_id, result in first_entry.items():
            result['fused_score'] = fused_scores[memory_id]
            result['score'] = fused_scores[memory_id]
            fused_results.append(result)

        return sorted(fused_results, key=lambda r: r['fused_score'], reverse=True)[:top_k]


class BM25Scorer:
    """Okapi BM25 keyword scorer over an in-memory list of memories.

    IDF uses the non-negative form ``log(1 + (N - n + 0.5) / (n + 0.5))``, so a
    matching term never lowers a score. Each CJK ideograph is its own token
    (Chinese has no spaces); other text splits into runs of word characters.
    """

    # A single CJK ideograph, or a run of other word characters.
    _TOKEN_PATTERN = r'[\u3400-\u4dbf\u4e00-\u9fff]|[^\W\u3400-\u4dbf\u4e00-\u9fff]+'

    def __init__(self, k1: float = 1.5, b: float = 0.75):
        self.k1 = k1
        self.b = b
        self.corpus_size = 0
        self.avgdl = 0
        self.doc_freqs: Dict[str, int] = {}  # token -> number of documents containing it
        self.idf: Dict[str, float] = {}
        self._fitted_corpus: List[str] = []  # documents the statistics were computed from

    def fit(self, documents: List[str]):
        """(Re)compute document frequencies, IDF and average length for *documents*.

        Previous statistics are discarded. An empty corpus is allowed.
        """
        import math

        documents = list(documents)
        tokenized_docs = [self._tokenize(doc) for doc in documents]

        self.corpus_size = len(tokenized_docs)
        self.doc_freqs = {}
        for tokens in tokenized_docs:
            for token in set(tokens):
                self.doc_freqs[token] = self.doc_freqs.get(token, 0) + 1

        n = self.corpus_size
        self.idf = {
            token: math.log(1 + (n - freq + 0.5) / (freq + 0.5))
            for token, freq in self.doc_freqs.items()
        }

        total_length = sum(len(tokens) for tokens in tokenized_docs)
        self.avgdl = total_length / n if n else 0
        self._fitted_corpus = documents

    def _tokenize(self, text: str) -> List[str]:
        """Lower-case *text* and split it into tokens (one per CJK character)."""
        import re
        return re.findall(self._TOKEN_PATTERN, text.lower())

    def search(self, query: str, memories: List['Memory'], top_k: int) -> List[Tuple[float, 'Memory']]:
        """Score every memory against *query*; return the top_k (score, memory) pairs.

        Statistics are refitted whenever the memories' contents differ from the
        fitted corpus, so newly added memories are reflected. The old version
        fitted only once and then kept stale IDF values forever.
        """
        documents = [m.content for m in memories]
        if documents != getattr(self, '_fitted_corpus', None):
            self.fit(documents)

        query_tokens = self._tokenize(query)
        scores = [(self._calc_bm25(memory.content, query_tokens), memory) for memory in memories]
        return sorted(scores, key=lambda pair: pair[0], reverse=True)[:top_k]

    def _calc_bm25(self, document: str, query_tokens: List[str]) -> float:
        """Return the BM25 score of *document* for *query_tokens* (0.0 if nothing matches)."""
        from collections import Counter

        doc_tokens = self._tokenize(document)
        if not doc_tokens:
            return 0.0

        term_counts = Counter(doc_tokens)
        # Guard against avgdl == 0 (corpus of empty documents).
        length_ratio = len(doc_tokens) / self.avgdl if self.avgdl else 1.0
        length_norm = self.k1 * (1 - self.b + self.b * length_ratio)

        score = 0.0
        for token in query_tokens:
            tf = term_counts.get(token, 0)
            if tf == 0 or token not in self.idf:
                continue
            score += self.idf[token] * tf * (self.k1 + 1) / (tf + length_norm)
        return score

4.2 上下文增强生成

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
class RAGPipeline:
    """End-to-end RAG: retrieve memories, build a numbered context, then answer."""

    def __init__(self, retriever: 'RAGRetriever',
                 generator: Any = None):
        """
        Args:
            retriever: object exposing async ``retrieve_relevant``.
            generator: optional LLM wrapper with async ``generate(prompt)``;
                without one, the answer is the raw retrieved context.
        """
        self.retriever = retriever
        self.generator = generator

    async def query(self, question: str,
                    return_sources: bool = True,
                    top_k: int = 5,
                    retrieval_mode: str = "hybrid") -> Dict[str, Any]:
        """Answer *question* from retrieved memories.

        Returns a dict with ``question``, ``answer``, ``sources`` (None when
        *return_sources* is False) and ``context_used``. Each source carries
        the memory id, type, a preview of at most 200 characters ("..." is added
        only when truncated) and its retrieval score.
        """
        retrieved = await self.retriever.retrieve_relevant(
            question,
            top_k=top_k,
            retrieval_mode=retrieval_mode,
        )

        context_parts = []
        sources = []
        for number, item in enumerate(retrieved, start=1):
            memory = item['memory']
            content = memory.content
            # Closing 】 was missing, so the label ran into the content.
            context_parts.append(f"【来源{number}】{content}")
            sources.append({
                'id': memory.id,
                'type': memory.memory_type.value,
                'content': content[:200] + "..." if len(content) > 200 else content,
                'relevance': item['score'],
            })

        context = "\n\n".join(context_parts)

        if self.generator:
            prompt = self._build_prompt(question, context)
            answer = await self.generator.generate(prompt)
        else:
            answer = f"基于{len(sources)}条相关记忆回答:\n{context}"

        return {
            'question': question,
            'answer': answer,
            'sources': sources if return_sources else None,
            'context_used': context,
        }

    def _build_prompt(self, question: str, context: str) -> str:
        """Build the LLM prompt: reference material, then the question."""
        return f"""基于以下参考资料回答问题。如果资料中没有相关信息,请如实说明。

参考资料:
{context}

问题:{question}

回答:"""

五、知识图谱集成

5.1 知识图谱存储

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
class KnowledgeGraphStore:
    """In-memory knowledge graph: nodes by id plus (subject, predicate, object) relations.

    ``predicate_index`` maps each predicate to its (subject_id, object_id) pairs
    so that predicate-filtered queries avoid scanning every relation.
    """

    def __init__(self):
        # node id -> Node
        self.nodes: Dict[str, 'Node'] = {}
        # (subject_id, predicate, object_id) -> Relation; re-adding a key replaces it
        self.relations: Dict[Tuple[str, str, str], 'Relation'] = {}
        # predicate -> [(subject_id, object_id)], one entry per distinct relation key
        self.predicate_index: Dict[str, List[Tuple[str, str]]] = {}

    def add_node(self, node: 'Node') -> str:
        """Insert or replace *node* by id; return its id."""
        self.nodes[node.id] = node
        return node.id

    def add_relation(self, relation: 'Relation') -> str:
        """Insert *relation*, replacing any relation with the same triple; return its id.

        The predicate index is only extended for a new triple, so re-adding one
        no longer makes predicate queries return it twice.
        """
        key = (relation.subject_id, relation.predicate, relation.object_id)
        if key not in self.relations:
            self.predicate_index.setdefault(relation.predicate, []).append(
                (relation.subject_id, relation.object_id)
            )
        self.relations[key] = relation
        return relation.id

    def query(self, subject: Optional[str] = None,
              predicate: Optional[str] = None,
              object_id: Optional[str] = None) -> List['Relation']:
        """Return relations matching every filter that is not None.

        A predicate that was never added yields an empty list. (Previously an
        unknown predicate combined with a subject silently ignored the
        predicate.) With no filters at all, an empty list is returned.
        """
        if predicate is not None:
            results = []
            for subj, obj in self.predicate_index.get(predicate, []):
                if subject is not None and subj != subject:
                    continue
                if object_id is not None and obj != object_id:
                    continue
                relation = self.relations.get((subj, predicate, obj))
                if relation is not None:
                    results.append(relation)
            return results

        if subject is None and object_id is None:
            return []

        return [
            relation
            for (subj, _pred, obj), relation in self.relations.items()
            if (subject is None or subj == subject)
            and (object_id is None or obj == object_id)
        ]

    def get_node(self, node_id: str) -> Optional['Node']:
        """Return the node with *node_id*, or None."""
        return self.nodes.get(node_id)

    def get_neighbors(self, node_id: str,
                      relation_type: Optional[str] = None) -> List[Dict]:
        """List nodes linked to *node_id*, optionally only via *relation_type*.

        Each entry is ``{'node', 'relation', 'direction'}`` with direction
        'outgoing' (node_id is the subject) or 'incoming' (node_id is the
        object). Relations whose other endpoint is not a stored node are skipped.
        This scans every relation.
        """
        neighbors = []
        for subj, pred, obj in self.relations:
            if relation_type is not None and pred != relation_type:
                continue
            if subj == node_id:
                other_id, direction = obj, 'outgoing'
            elif obj == node_id:
                other_id, direction = subj, 'incoming'
            else:
                continue
            other_node = self.nodes.get(other_id)
            if other_node:
                neighbors.append({
                    'node': other_node,
                    'relation': pred,
                    'direction': direction,
                })
        return neighbors


@dataclass
class Node:
    """A vertex of the knowledge graph.

    ``label`` is the node's category and ``name`` its human-readable name.
    ``embedding`` is optional and is not included in ``to_dict``.
    """
    id: str
    label: str  # node category / type
    name: str
    properties: Dict[str, Any] = field(default_factory=dict)
    embedding: Optional[List[float]] = None

    def to_dict(self) -> Dict:
        """Serialise every field except the embedding (``properties`` by reference)."""
        return dict(
            id=self.id,
            label=self.label,
            name=self.name,
            properties=self.properties,
        )


@dataclass
class Relation:
    """A directed, weighted edge: subject --predicate--> object."""
    id: str
    subject_id: str   # id of the source node
    predicate: str    # relation name, e.g. "是一种"
    object_id: str    # id of the target node
    properties: Dict[str, Any] = field(default_factory=dict)
    weight: float = 1.0  # defaults to 1.0

5.2 Agent与知识图谱交互

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
class KGEnhancedAgent:
    """Bridges the knowledge graph and the memory manager.

    Facts are stored as relations between nodes looked up by name (created on
    demand with label "实体") and mirrored into semantic memory as plain text.
    """

    def __init__(self, memory_manager: MemoryManager,
                 kg_store: KnowledgeGraphStore):
        self.memory_manager = memory_manager
        self.kg_store = kg_store

    async def store_fact(self, subject: str, predicate: str,
                         object_value: str, properties: Optional[Dict] = None):
        """Store the fact ``subject predicate object_value`` and return its Relation.

        A new fact is added to the graph and mirrored as a HIGH-importance
        SEMANTIC memory whose metadata links back to the graph. Storing an
        existing triple again merges *properties* into the existing relation
        instead of adding a second relation and a duplicate memory.
        """
        subject_node = await self._get_or_create_node(subject, "实体")
        object_node = await self._get_or_create_node(object_value, "实体")

        existing = self.kg_store.relations.get((subject_node.id, predicate, object_node.id))
        if existing is not None:
            if properties:
                existing.properties.update(properties)
            return existing

        relation = Relation(
            id=str(uuid.uuid4()),
            subject_id=subject_node.id,
            predicate=predicate,
            object_id=object_node.id,
            properties=dict(properties or {}),
        )
        self.kg_store.add_relation(relation)

        await self.memory_manager.add_memory(
            content=f"{subject} {predicate} {object_value}",
            memory_type=MemoryType.SEMANTIC,
            importance=Importance.HIGH,
            metadata={
                'kg_node_id': object_node.id,
                'kg_subject_id': subject_node.id,
                'kg_relation_id': relation.id,
            },
        )
        return relation

    async def query_kg(self, subject: str,
                       predicate: Optional[str] = None) -> List[Dict]:
        """Return outgoing facts of the node named *subject* as dicts.

        Each dict has ``subject``, ``predicate``, ``object`` (the object node's
        name) and ``properties``. Unknown subjects yield an empty list.
        """
        subject_node = self._find_node(subject)
        if subject_node is None:
            return []

        results = []
        for rel in self.kg_store.query(subject_node.id, predicate):
            obj_node = self.kg_store.get_node(rel.object_id)
            if obj_node:
                results.append({
                    'subject': subject,
                    'predicate': rel.predicate,
                    'object': obj_node.name,
                    'properties': rel.properties,
                })
        return results

    def _find_node(self, name: str) -> Optional[Node]:
        """Return the first node whose name equals *name*, or None (linear scan)."""
        return next((node for node in self.kg_store.nodes.values() if node.name == name), None)

    async def _get_or_create_node(self, name: str, label: str) -> Node:
        """Return the node named *name*, creating it with *label* if absent."""
        node = self._find_node(name)
        if node is not None:
            return node

        node = Node(id=str(uuid.uuid4()), label=label, name=name)
        self.kg_store.add_node(node)
        return node

    async def KG_to_text(self, subject: str) -> str:
        """Render the outgoing facts about *subject* as a bullet list."""
        facts = await self.query_kg(subject)
        if not facts:
            return f"未找到关于'{subject}'的知识"

        text_parts = [f"关于'{subject}'的知识:"]
        for fact in facts:
            text_parts.append(f"- {fact['subject']} {fact['predicate']} {fact['object']}")
        return "\n".join(text_parts)

六、实战示例:完整记忆Agent

6.1 MemoryAgent实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
class MemoryAgent:
    """Conversational agent wired to vector memory, RAG and a knowledge graph.

    Components (all in-process): an InMemoryVectorStore (dimension 384), a
    KnowledgeGraphStore, a MemoryManager over both, a RAGRetriever/RAGPipeline
    with no LLM generator, and a KGEnhancedAgent.
    """

    def __init__(self, agent_id: str, name: str):
        self.agent_id = agent_id
        self.name = name

        # Storage
        self.vector_store = InMemoryVectorStore(dimension=384)
        self.kg_store = KnowledgeGraphStore()

        # Memory manager
        self.memory_manager = MemoryManager(
            vector_store=self.vector_store,
            kg_store=self.kg_store,
        )

        # RAG
        self.rag_retriever = RAGRetriever(
            vector_store=self.vector_store,
            memory_manager=self.memory_manager,
        )
        self.rag_pipeline = RAGPipeline(
            retriever=self.rag_retriever,
            generator=None,  # plug an LLM in here
        )

        # Knowledge-graph helper
        self.kg_agent = KGEnhancedAgent(
            memory_manager=self.memory_manager,
            kg_store=self.kg_store,
        )

        # One dict per processed turn
        self.conversation_history: List[Dict] = []

    async def process_message(self, user_message: str) -> Dict[str, Any]:
        """Handle one user turn; return the reply, recalled memories and RAG sources.

        Retrieval runs BEFORE the user message is stored. Otherwise the new
        message is the only verbatim match for its own query, so the keyword
        filter in ``retrieve`` reduces the results to the message itself.
        """
        # 1. Recall related memories and run RAG on what was known before this turn
        relevant_memories = await self.memory_manager.retrieve(user_message, top_k=5)
        rag_result = await self.rag_pipeline.query(user_message)

        # 2. Remember the user message
        await self.memory_manager.add_memory(
            content=f"用户: {user_message}",
            memory_type=MemoryType.EPISODIC,
            importance=Importance.NORMAL,
            metadata={'source': 'user'},
        )

        # 3. Reply and remember the reply
        reply = await self._generate_reply(user_message, relevant_memories, rag_result)
        await self.memory_manager.add_memory(
            content=f"助手: {reply}",
            memory_type=MemoryType.EPISODIC,
            importance=Importance.NORMAL,
            metadata={'source': 'assistant'},
        )

        # 4. Conversation log
        recalled = [m.content for m in relevant_memories]
        self.conversation_history.append({
            'user': user_message,
            'assistant': reply,
            'relevant_memories': recalled,
        })

        return {
            'reply': reply,
            'relevant_memories': list(recalled),
            'sources': rag_result.get('sources'),
        }

    async def _generate_reply(self, user_message: str,
                              memories: List[Memory],
                              rag_result: Dict) -> str:
        """Build a template reply quoting up to two short (<100 chars) memories.

        No LLM is involved. *user_message* and *rag_result* are accepted so that
        an LLM-backed implementation can use them; this version does not.
        """
        reply = "我理解您的问题。"

        recalled_lines = []
        for m in memories[:2]:
            content = m.content.replace('用户: ', '').replace('助手: ', '')
            if len(content) < 100:
                recalled_lines.append(f"\n- {content}")

        # Only announce recalled memories when at least one is actually shown.
        if recalled_lines:
            reply += "\n\n根据我的记忆:" + "".join(recalled_lines)

        return reply

    async def learn_from_interaction(self, user_message: str,
                                     assistant_response: str,
                                     feedback: Optional[str] = None):
        """Adjust the importance of the top 3 memories for *user_message*.

        "helpful" sets them to HIGH, "not_helpful" to LOW; any other feedback
        (or none) changes nothing. *assistant_response* is currently unused.
        """
        if not feedback:
            return

        relevant = await self.memory_manager.retrieve(user_message, top_k=3)
        for memory in relevant:
            if feedback == "helpful":
                memory.importance = Importance.HIGH
            elif feedback == "not_helpful":
                memory.importance = Importance.LOW

    async def get_memory_summary(self) -> Dict[str, Any]:
        """Return vector-store statistics plus conversation and graph sizes."""
        stats = await self.vector_store.get_stats()
        return {
            'total_memories': stats['total_memories'],
            'memory_distribution': stats['type_distribution'],
            'conversation_count': len(self.conversation_history),
            'kg_nodes': len(self.kg_store.nodes),
            'kg_relations': len(self.kg_store.relations),
        }

    async def forget(self, memory_id: str) -> bool:
        """Remove a memory from the vector store and every memory tier.

        Returns True if it was found anywhere. (Previously long-term and working
        memory were untouched and True was returned unconditionally.)
        """
        manager = self.memory_manager
        removed = bool(await self.vector_store.delete(memory_id))

        kept_short = [m for m in manager.short_term_memory if m.id != memory_id]
        if len(kept_short) != len(manager.short_term_memory):
            removed = True
        manager.short_term_memory = kept_short

        kept_long = [m for m in manager.long_term_memory if m.id != memory_id]
        if len(kept_long) != len(manager.long_term_memory):
            removed = True
        manager.long_term_memory = kept_long

        if manager.working_memory.pop(memory_id, None) is not None:
            removed = True

        return removed

6.2 使用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
import asyncio
import math

async def demo_memory_agent():
    """Walk through MemoryAgent features end to end, printing each step."""

    print("=" * 50)
    print("Agent记忆系统演示")
    print("=" * 50)

    agent = MemoryAgent("agent_001", "记忆助手")

    # 1. Basic conversation
    print("\n📝 对话1: 用户自我介绍")
    response = await agent.process_message("我叫张三,是一名软件工程师")
    print(f"Agent: {response['reply']}")

    print("\n📝 对话2: 询问偏好")
    response = await agent.process_message("我喜欢使用Python编程,最近在学习机器学习")
    print(f"Agent: {response['reply']}")

    print("\n📝 对话3: 检索测试")
    response = await agent.process_message("我叫什麼名字?")
    print(f"Agent: {response['reply']}")
    print(f"相关记忆: {response.get('relevant_memories', [])}")

    # 2. Knowledge graph
    print("\n📚 知识图谱测试")
    await agent.kg_agent.store_fact("Python", "是一种", "编程语言")
    await agent.kg_agent.store_fact("Python", "特点", "易于学习")
    await agent.kg_agent.store_fact("机器学习", "属于", "人工智能")
    await agent.kg_agent.store_fact("Python", "常用于", "机器学习")

    kg_info = await agent.kg_agent.KG_to_text("Python")
    print(kg_info)

    # 3. Statistics
    print("\n📊 记忆统计")
    summary = await agent.get_memory_summary()
    for key, value in summary.items():
        print(f" {key}: {value}")

    # 4. Consolidation
    print("\n🔄 添加更多记忆以触发巩固...")
    # Each turn stores two episodic memories (user + assistant), so the 15 turns
    # below add 30, far short of the default threshold of 100 short-term
    # memories. Lower the threshold so consolidation actually runs.
    agent.memory_manager.max_short_term_memories = 20
    for i in range(15):
        await agent.process_message(f"这是第{i+1}条测试记忆,编号{i+1}")

    summary = await agent.get_memory_summary()
    print(f" 巩固后记忆数: {summary['total_memories']}")
    # The vector store keeps every memory, so show the tier sizes consolidation changes.
    print(f" 短期记忆: {len(agent.memory_manager.short_term_memory)}, "
          f"长期记忆: {len(agent.memory_manager.long_term_memory)}")

    # 5. Context window
    print("\n🪟 获取上下文窗口")
    context = await agent.memory_manager.get_context_window(max_tokens=500)
    print(f"上下文内容预览: {context[:200]}...")

    print("\n" + "=" * 50)
    print("演示完成!")
    print("=" * 50)


if __name__ == "__main__":
    # Run the async demo on a fresh event loop when executed as a script.
    demo_coroutine = demo_memory_agent()
    asyncio.run(demo_coroutine)

七、高级特性

7.1 记忆优先级动态调整

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
class AdaptiveMemoryManager(MemoryManager):
    """MemoryManager that adapts importance to retrieval frequency.

    Every retrieval increments a per-memory counter. Past 3 retrievals, each
    further retrieval raises importance one level (up to CRITICAL).
    ``apply_decay`` shrinks the counters and demotes above-NORMAL memories that
    have not been accessed for more than 7 days.
    """

    def __init__(self, *args, decay_rate: float = 0.95, **kwargs):
        super().__init__(*args, **kwargs)
        self.decay_rate = decay_rate
        # memory id -> retrieval count (truncated to int on every decay)
        self.access_patterns: Dict[str, int] = {}

    async def retrieve(self, query: str, memory_type: Optional[MemoryType] = None,
                       top_k: int = 5, use_vector_search: bool = True) -> List[Memory]:
        """Retrieve via the base class, then record hits and promote frequent ones."""
        hits = await super().retrieve(query, memory_type, top_k, use_vector_search)

        ceiling = Importance.CRITICAL.value
        for hit in hits:
            count = self.access_patterns.get(hit.id, 0) + 1
            self.access_patterns[hit.id] = count
            if count > 3:
                hit.importance = Importance(min(ceiling, hit.importance.value + 1))

        return hits

    def apply_decay(self):
        """Decay retrieval counters and demote stale, above-NORMAL memories by one level."""
        for mem in self.long_term_memory + self.short_term_memory:
            decayed_count = self.access_patterns.get(mem.id, 0) * self.decay_rate
            self.access_patterns[mem.id] = int(decayed_count)

            idle_for = datetime.now() - datetime.fromisoformat(mem.accessed_at)
            is_stale = idle_for.days > 7
            if is_stale and mem.importance.value > Importance.NORMAL.value:
                mem.importance = Importance(mem.importance.value - 1)

7.2 记忆压缩与摘要

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
class MemoryCompressor:
    """Compress groups of memories into a single summary memory to save space."""

    def __init__(self, summarizer=None, max_summary_chars: int = 200):
        """Create a compressor.

        Args:
            summarizer: optional object whose ``summarize(text)`` coroutine
                returns the summary text. When it is falsy, the combined text
                is truncated instead.
            max_summary_chars: truncation length used when there is no
                summarizer. The default of 200 matches the previous
                hard-coded value.
        """
        self.summarizer = summarizer
        self.max_summary_chars = max_summary_chars

    async def compress(self, memories: List[Memory]) -> Memory:
        """Merge ``memories`` into one SEMANTIC summary memory.

        The memory contents are joined with newlines. The joined text is then
        either passed to the summarizer, or cut to ``max_summary_chars``
        characters with "..." appended when it is longer than that.

        The result:
        - carries the highest importance found among the inputs;
        - records the source count and ids in its metadata;
        - has ``embedding`` left at its default (None).

        Raises:
            ValueError: if ``memories`` is empty.
        """
        if not memories:
            raise ValueError("No memories to compress")

        combined_content = "\n".join(m.content for m in memories)

        if self.summarizer:
            summary = await self.summarizer.summarize(combined_content)
        else:
            limit = self.max_summary_chars
            if len(combined_content) > limit:
                summary = combined_content[:limit] + "..."
            else:
                summary = combined_content

        return Memory(
            content=f"[摘要] {summary}",
            memory_type=MemoryType.SEMANTIC,
            importance=self._calculate_summary_importance(memories),
            metadata={
                'original_count': len(memories),
                'original_ids': [m.id for m in memories],
                'is_summary': True,
            },
        )

    def _calculate_summary_importance(self, memories: List[Memory]) -> Importance:
        """Return the highest importance among ``memories`` (which must be non-empty)."""
        return Importance(max(m.importance.value for m in memories))

    async def compress_old_memories(self, memory_manager: MemoryManager,
                                    max_age_days: int = 30, min_count: int = 3):
        """Replace old long-term memories with a single summary memory.

        A memory counts as old when its ``created_at`` is more than
        ``max_age_days`` days in the past. Compression only happens when at
        least ``min_count`` old memories exist (and always at least one).
        The originals are then removed from
        ``memory_manager.long_term_memory`` and the summary is appended.

        The summary's ``created_at`` is the current time, so it will not be
        re-compressed until it ages past ``max_age_days`` itself.

        Returns:
            The new summary Memory, or None if nothing was compressed.
        """
        now = datetime.now()
        old_memories = [
            mem for mem in memory_manager.long_term_memory
            if (now - datetime.fromisoformat(mem.created_at)).days > max_age_days
        ]

        # The emptiness check guards compress() against min_count <= 0.
        if not old_memories or len(old_memories) < min_count:
            return None

        compressed = await self.compress(old_memories)

        for mem in old_memories:
            memory_manager.long_term_memory.remove(mem)
        memory_manager.long_term_memory.append(compressed)

        return compressed

八、性能优化建议

8.1 索引优化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 1. 使用HNSW索引加速向量搜索
class HNSWVectorStore(VectorStore):
    """Vector store backed by a FAISS HNSW (Hierarchical Navigable Small World) index."""

    def __init__(self, dimension: int = 384, m: int = 16, ef_construction: int = 200):
        """Configure the store. The index itself is created by ``build_index``.

        Args:
            dimension: number of components in every embedding vector.
            m: graph connectivity, passed as the ``M`` argument of
                ``faiss.IndexHNSWFlat``.
            ef_construction: value assigned to ``index.hnsw.efConstruction``
                before vectors are added.
        """
        self.dimension = dimension
        self.m = m
        self.ef_construction = ef_construction
        # FAISS index; None until build_index() has run.
        self.index = None
        # memory_ids[i] is the id of the Memory stored at index position i,
        # so search results can be mapped back to memories.
        self.memory_ids: List[str] = []

    async def build_index(self, memories: List[Memory]):
        """(Re)build the HNSW index from the embeddings of ``memories``.

        Memories whose embedding is None or empty are skipped. The previous
        index is always replaced, even when no memory has an embedding, so
        stale vectors from an earlier build never remain.

        Raises:
            ValueError: if the embeddings do not form a 2-D matrix with
                ``self.dimension`` columns.
        """
        import faiss
        import numpy as np

        # Explicit checks instead of truthiness, so numpy-array embeddings also work.
        embedded = [
            mem for mem in memories
            if mem.embedding is not None and len(mem.embedding) > 0
        ]

        index = faiss.IndexHNSWFlat(self.dimension, self.m)
        # Must be set before add(): efConstruction is used while inserting vectors.
        index.hnsw.efConstruction = self.ef_construction

        if embedded:
            matrix = np.asarray([mem.embedding for mem in embedded], dtype=np.float32)
            if matrix.ndim != 2 or matrix.shape[1] != self.dimension:
                raise ValueError(
                    f"expected embeddings of dimension {self.dimension}, "
                    f"got matrix of shape {matrix.shape}"
                )
            index.add(matrix)

        self.index = index
        self.memory_ids = [mem.id for mem in embedded]

8.2 缓存策略

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
from functools import lru_cache
import hashlib

class CachedEmbeddingGenerator:
    """Embedding generator wrapper with a bounded least-recently-used cache."""

    def __init__(self, underlying_generator, cache_size: int = 1000):
        """Wrap a generator.

        Args:
            underlying_generator: object with an async ``generate(text)``
                method that returns the embedding.
            cache_size: maximum number of cached embeddings. A value of 0 or
                less disables caching.
        """
        self.generator = underlying_generator
        # text hash -> embedding. Dict insertion order doubles as recency
        # order: the first key is the least recently used.
        self.cache: Dict[str, List[float]] = {}
        self.cache_size = cache_size

    async def generate(self, text: str) -> List[float]:
        """Return the embedding of ``text``, computing it only on a cache miss.

        Cached embeddings are returned as the same list object that was
        stored, so callers should not mutate the result.
        """
        cache_key = self._hash_text(text)

        if cache_key in self.cache:
            # Re-insert the entry to mark it as most recently used.
            embedding = self.cache.pop(cache_key)
            self.cache[cache_key] = embedding
            return embedding

        embedding = await self.generator.generate(text)

        if self.cache_size <= 0:
            # Caching disabled. Without this check the eviction loop below
            # would call next() on an empty iterator.
            return embedding

        # Evict least recently used entries until there is room for one more.
        while len(self.cache) >= self.cache_size:
            del self.cache[next(iter(self.cache))]

        self.cache[cache_key] = embedding
        return embedding

    def _hash_text(self, text: str) -> str:
        """Return a compact cache key for ``text``. The hash is not used for security."""
        return hashlib.md5(text.encode("utf-8"), usedforsecurity=False).hexdigest()

九、总结

本文详细介绍了Agent记忆与知识管理的完整技术方案:

  1. 三层记忆架构:工作记忆、短期记忆、长期记忆,满足不同时间尺度的记忆需求
  2. 向量数据库集成:支持FAISS等高性能向量存储,实现语义相似度检索
  3. RAG增强系统:混合搜索策略,结合向量检索和BM25关键词搜索
  4. 知识图谱集成:结构化知识存储,支持复杂推理查询
  5. 自适应机制:动态重要性调整、记忆衰减、压缩摘要

通过这些技术的综合运用,可以构建出真正具有“记忆能力”的智能Agent,为用户提供连续、个性化、智能的服务体验。


本文代码基于Python 3.10+,需要安装numpy、faiss等依赖。