LSTM在时间序列预测中的实战应用

引言

时间序列预测是机器学习和深度学习的重要应用领域,涵盖了股票价格预测、天气预测、能源消耗预测、交通流量预测等众多场景。LSTM由于其独特的门控机制和长期记忆能力,成为时间序列预测的首选深度学习模型之一。本文将从理论到实践,全面介绍如何使用LSTM进行时间序列预测,包括滑动窗口技术、股价预测实战、多步预测策略以及注意力机制的引入。

1. 时间序列预测基础

1.1 时间序列的特点

时间序列数据具有以下显著特点:

  • 时序依赖性:当前时刻的值与历史时刻的值存在关联
  • 趋势性:数据呈现长期上升或下降的趋势
  • 季节性:数据呈现周期性的波动模式
  • 周期性:数据存在非固定周期的循环波动
  • 噪声:数据中包含随机波动

1.2 监督学习与滑动窗口

时间序列预测的核心挑战在于如何将连续的时间序列转换为监督学习可以处理的格式。**滑动窗口(Sliding Window)**技术是解决这一问题的标准方法。

假设我们有时序数据 [x₁, x₂, x₃, …, xₙ],使用窗口大小为window_size的滑动窗口,可以创建如下训练样本:

1
2
3
4
样本1: 输入 [x₁, x₂, ..., xₘ] → 输出 x_{m+1}
样本2: 输入 [x₂, x₃, ..., x_{m+1}] → 输出 x_{m+2}
样本3: 输入 [x₃, x₄, ..., x_{m+2}] → 输出 x_{m+3}
...

1.3 数据预处理

时间序列预测中的数据预处理至关重要:

  1. 缺失值处理:前向填充或插值
  2. 异常值检测:使用IQR或Z-score
  3. 归一化/标准化
    • Min-Max归一化:x’ = (x - min) / (max - min)
    • Z-score标准化:x’ = (x - μ) / σ
  4. 数据划分:训练集、验证集、测试集

2. PyTorch实现LSTM时间序列预测

2.1 数据准备与滑动窗口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from sklearn.preprocessing import MinMaxScaler
import matplotlib.pyplot as plt

# 设置随机种子
torch.manual_seed(42)
np.random.seed(42)

class TimeSeriesDataset(Dataset):
"""时间序列数据集类"""

def __init__(self, data, window_size, pred_steps=1):
"""
初始化数据集

参数:
data: 原始时间序列数据 (numpy array)
window_size: 滑动窗口大小(输入序列长度)
pred_steps: 预测步数(输出序列长度)
"""
self.data = data
self.window_size = window_size
self.pred_steps = pred_steps

def __len__(self):
return len(self.data) - self.window_size - self.pred_steps + 1

def __getitem__(self, idx):
# 输入序列
x = self.data[idx:idx + self.window_size]
# 目标值
y = self.data[idx + self.window_size:idx + self.window_size + self.pred_steps]
return torch.FloatTensor(x), torch.FloatTensor(y)


class LSTMModel(nn.Module):
"""LSTM时间序列预测模型"""

def __init__(self, input_size, hidden_size, num_layers, output_size, dropout=0.2):
"""
初始化LSTM模型

参数:
input_size: 输入特征维度
hidden_size: 隐藏层维度
num_layers: LSTM层数
output_size: 输出维度
dropout: Dropout比例
"""
super(LSTMModel, self).__init__()

self.hidden_size = hidden_size
self.num_layers = num_layers

# LSTM层
self.lstm = nn.LSTM(
input_size=input_size,
hidden_size=hidden_size,
num_layers=num_layers,
batch_first=True,
dropout=dropout if num_layers > 1 else 0
)

# 全连接输出层
self.fc = nn.Linear(hidden_size, output_size)

def forward(self, x):
"""
前向传播

参数:
x: 输入张量 (batch_size, seq_len, input_size)

返回:
output: 预测值 (batch_size, output_size)
"""
# LSTM前向传播
lstm_out, (h_n, c_n) = self.lstm(x)

# 取最后一个时间步的输出
last_output = lstm_out[:, -1, :]

# 全连接层
output = self.fc(last_output)

return output


def create_sliding_windows(data, window_size, stride=1):
"""
创建滑动窗口数据集

参数:
data: 原始数据
window_size: 窗口大小
stride: 步长

返回:
X: 输入序列数组
y: 目标值数组
"""
X, y = [], []
for i in range(0, len(data) - window_size, stride):
X.append(data[i:i + window_size])
y.append(data[i + window_size])
return np.array(X), np.array(y)


def train_model(model, train_loader, val_loader, epochs, learning_rate, device):
"""
训练LSTM模型

参数:
model: LSTM模型
train_loader: 训练数据加载器
val_loader: 验证数据加载器
epochs: 训练轮数
learning_rate: 学习率
device: 计算设备

返回:
train_losses: 训练损失历史
val_losses: 验证损失历史
"""
model = model.to(device)
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
optimizer, mode='min', factor=0.5, patience=10, verbose=True
)

train_losses = []
val_losses = []
best_val_loss = float('inf')

for epoch in range(epochs):
# 训练阶段
model.train()
train_loss = 0.0
for batch_x, batch_y in train_loader:
batch_x = batch_x.to(device)
batch_y = batch_y.to(device)

optimizer.zero_grad()
outputs = model(batch_x)
loss = criterion(outputs, batch_y)
loss.backward()

# 梯度裁剪,防止梯度爆炸
torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)

optimizer.step()
train_loss += loss.item()

train_loss /= len(train_loader)
train_losses.append(train_loss)

# 验证阶段
model.eval()
val_loss = 0.0
with torch.no_grad():
for batch_x, batch_y in val_loader:
batch_x = batch_x.to(device)
batch_y = batch_y.to(device)

outputs = model(batch_x)
loss = criterion(outputs, batch_y)
val_loss += loss.item()

val_loss /= len(val_loader)
val_losses.append(val_loss)

# 学习率调度
scheduler.step(val_loss)

# 保存最佳模型
if val_loss < best_val_loss:
best_val_loss = val_loss
torch.save(model.state_dict(), 'best_model.pth')

if (epoch + 1) % 10 == 0:
print(f"Epoch [{epoch+1}/{epochs}], "
f"Train Loss: {train_loss:.6f}, "
f"Val Loss: {val_loss:.6f}")

return train_losses, val_losses

2.2 股价预测实战

下面是一个完整的股价预测实战案例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
def generate_stock_price_data(num_days=1000, initial_price=100, seed=42):
"""
生成模拟的股票价格数据

参数:
num_days: 天数
initial_price: 初始价格
seed: 随机种子

返回:
prices: 价格序列
"""
np.random.seed(seed)

prices = [initial_price]
trend = 0.0002 # 趋势
volatility = 0.02 # 波动率

for _ in range(num_days - 1):
# Geometric Brownian Motion
drift = trend * prices[-1]
shock = volatility * prices[-1] * np.random.randn()
new_price = prices[-1] + drift + shock
new_price = max(new_price, 1) # 确保价格不为负
prices.append(new_price)

return np.array(prices)


def add_technical_indicators(prices):
"""
添加技术指标作为特征

参数:
prices: 价格序列

返回:
features: 特征矩阵
"""
df = pd.DataFrame({'price': prices})

# 移动平均线
df['ma_5'] = df['price'].rolling(window=5).mean()
df['ma_10'] = df['price'].rolling(window=10).mean()
df['ma_20'] = df['price'].rolling(window=20).mean()

# 指数移动平均
df['ema_12'] = df['price'].ewm(span=12).mean()
df['ema_26'] = df['price'].ewm(span=26).mean()

# MACD
df['macd'] = df['ema_12'] - df['ema_26']

# RSI
delta = df['price'].diff()
gain = (delta.where(delta > 0, 0)).rolling(window=14).mean()
loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean()
rs = gain / loss
df['rsi'] = 100 - (100 / (1 + rs))

# 布林带
df['bb_mid'] = df['price'].rolling(window=20).mean()
df['bb_std'] = df['price'].rolling(window=20).std()
df['bb_upper'] = df['bb_mid'] + 2 * df['bb_std']
df['bb_lower'] = df['bb_mid'] - 2 * df['bb_std']

# 价格变化率
df['returns'] = df['price'].pct_change()
df['log_returns'] = np.log(df['price'] / df['price'].shift(1))

# 成交量模拟(这里用价格波动模拟)
df['volume'] = df['returns'].abs().rolling(window=10).mean() * 1000000

# 波动率
df['volatility'] = df['returns'].rolling(window=20).std() * np.sqrt(252)

return df.fillna(method='bfill').values


def main_stock_prediction():
"""股价预测主函数"""

# 生成数据
print("正在生成股票价格数据...")
prices = generate_stock_price_data(num_days=2000)

# 添加技术指标
print("正在计算技术指标...")
features = add_technical_indicators(prices)
feature_names = ['price', 'ma_5', 'ma_10', 'ma_20', 'ema_12', 'ema_26',
'macd', 'rsi', 'bb_mid', 'bb_std', 'bb_upper', 'bb_lower',
'returns', 'log_returns', 'volume', 'volatility']

print(f"特征数量: {features.shape[1]}")
print(f"数据点数量: {features.shape[0]}")

# 数据归一化
scaler = MinMaxScaler(feature_range=(0, 1))
scaled_data = scaler.fit_transform(features)

# 只对价格进行反归一化(用于评估)
price_scaler = MinMaxScaler(feature_range=(0, 1))
price_scaler.fit_transform(prices.reshape(-1, 1))

# 创建数据集
window_size = 30
X, y = create_sliding_windows(scaled_data, window_size)

# 调整形状以适应LSTM (batch_size, seq_len, features)
X = X.reshape(X.shape[0], X.shape[1], -1)
y = y[:, 0] # 只预测价格

# 划分训练集、验证集、测试集
train_size = int(len(X) * 0.7)
val_size = int(len(X) * 0.15)

X_train, X_val, X_test = X[:train_size], X[train_size:train_size+val_size], X[train_size+val_size:]
y_train, y_val, y_test = y[:train_size], y[train_size:train_size+val_size], y[train_size+val_size:]

print(f"\n数据集划分:")
print(f"训练集: {X_train.shape[0]} 样本")
print(f"验证集: {X_val.shape[0]} 样本")
print(f"测试集: {X_test.shape[0]} 样本")

# 创建数据加载器
batch_size = 64
train_dataset = TimeSeriesDataset(None, window_size)
train_loader = DataLoader(
torch.FloatTensor(X_train),
batch_size=batch_size,
shuffle=True
)
val_loader = DataLoader(
torch.FloatTensor(X_val),
batch_size=batch_size,
shuffle=False
)
test_loader = DataLoader(
torch.FloatTensor(X_test),
batch_size=batch_size,
shuffle=False
)

# 设置设备
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"\n使用设备: {device}")

# 创建模型
input_size = X_train.shape[2] # 特征数量
hidden_size = 64
num_layers = 2
output_size = 1

model = LSTMModel(
input_size=input_size,
hidden_size=hidden_size,
num_layers=num_layers,
output_size=output_size,
dropout=0.2
)

print(f"\n模型结构:")
print(model)

# 计算模型参数量
total_params = sum(p.numel() for p in model.parameters())
trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
print(f"总参数量: {total_params:,}")
print(f"可训练参数量: {trainable_params:,}")

# 训练模型
print("\n开始训练...")
epochs = 100
learning_rate = 0.001

train_losses, val_losses = train_model(
model, train_loader, val_loader, epochs, learning_rate, device
)

# 加载最佳模型
model.load_state_dict(torch.load('best_model.pth'))

# 测试集评估
model.eval()
predictions = []
actuals = []

with torch.no_grad():
for batch_x in test_loader:
batch_x = batch_x.to(device)
outputs = model(batch_x)
predictions.extend(outputs.cpu().numpy())
actuals.extend(batch_x[:, -1, 0].numpy()) # 取窗口最后一个价格

predictions = np.array(predictions).flatten()
actuals = np.array(actuals).flatten()

# 计算评估指标
mse = np.mean((predictions - actuals) ** 2)
rmse = np.sqrt(mse)
mae = np.mean(np.abs(predictions - actuals))

# 计算MAPE(避开零值)
mask = actuals > 0.01
mape = np.mean(np.abs((actuals[mask] - predictions[mask]) / actuals[mask])) * 100

print(f"\n测试集评估指标:")
print(f"MSE: {mse:.6f}")
print(f"RMSE: {rmse:.6f}")
print(f"MAE: {mae:.6f}")
print(f"MAPE: {mape:.2f}%")

# 绘制结果图
plt.figure(figsize=(14, 5))

plt.subplot(1, 2, 1)
plt.plot(train_losses, label='Train Loss')
plt.plot(val_losses, label='Val Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Training and Validation Loss')
plt.legend()
plt.grid(True)

plt.subplot(1, 2, 2)
plt.plot(actuals[:100], label='Actual', alpha=0.8)
plt.plot(predictions[:100], label='Predicted', alpha=0.8)
plt.xlabel('Time Step')
plt.ylabel('Normalized Price')
plt.title('Actual vs Predicted Prices')
plt.legend()
plt.grid(True)

plt.tight_layout()
plt.savefig('stock_prediction_results.png', dpi=150)
print("\n结果图已保存到 stock_prediction_results.png")

return model, scaler, price_scaler


if __name__ == "__main__":
model, scaler, price_scaler = main_stock_prediction()

3. 多步预测策略

在实际应用中,我们经常需要预测未来多个时间步的值。常见的多步预测策略包括:

3.1 直接多步预测

训练一个模型直接输出多个未来时间步:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
class MultiStepLSTM(nn.Module):
"""多步预测LSTM模型"""

def __init__(self, input_size, hidden_size, num_layers, output_size):
super(MultiStepLSTM, self).__init__()

self.lstm = nn.LSTM(
input_size=input_size,
hidden_size=hidden_size,
num_layers=num_layers,
batch_first=True
)

# 多步输出层
self.fc = nn.Linear(hidden_size, output_size)

def forward(self, x):
lstm_out, _ = self.lstm(x)
# 取最后一个时间步
last_output = lstm_out[:, -1, :]
output = self.fc(last_output)
return output # (batch_size, output_size)

3.2 递归多步预测(迭代预测)

使用单一模型进行迭代预测,将预测值作为下一步的输入:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
def recursive_predict(model, initial_sequence, pred_steps, scaler, device):
"""
递归多步预测

参数:
model: 训练好的模型
initial_sequence: 初始输入序列
pred_steps: 预测步数
scaler: 数据归一化器
device: 计算设备

返回:
predictions: 预测值序列
"""
model.eval()
predictions = []
current_seq = initial_sequence.copy()

with torch.no_grad():
for _ in range(pred_steps):
# 准备输入
x = torch.FloatTensor(current_seq[-window_size:]).unsqueeze(0).unsqueeze(-1).to(device)

# 预测下一步
pred = model(x).cpu().numpy()[0, 0]
predictions.append(pred)

# 将预测值添加到序列中
new_row = current_seq[-1].copy()
new_row[0] = pred # 只更新价格
current_seq = np.vstack([current_seq, new_row])

return np.array(predictions)

3.3 直接递归混合策略

将多步预测分解为多个单步预测模型:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
class DirectMultiStepLSTM(nn.Module):
"""直接多步预测模型 - 输出多个未来时间步"""

def __init__(self, input_size, hidden_size, num_layers, output_steps):
super(DirectMultiStepLSTM, self).__init__()

self.output_steps = output_steps

# 共享LSTM层
self.lstm = nn.LSTM(
input_size=input_size,
hidden_size=hidden_size,
num_layers=num_layers,
batch_first=True
)

# 每个输出步独立的全连接层
self.fcs = nn.ModuleList([
nn.Linear(hidden_size, 1) for _ in range(output_steps)
])

def forward(self, x):
lstm_out, _ = self.lstm(x)
last_output = lstm_out[:, -1, :]

# 多个输出
outputs = [fc(last_output) for fc in self.fcs]
return torch.cat(outputs, dim=1)

4. 注意力机制与LSTM

注意力机制允许模型在预测时关注输入序列的不同部分,这对于时间序列预测尤其有效。

4.1 注意力机制原理

注意力机制的核心思想是:对输入序列的不同时间步赋予不同的权重,让模型能够”关注”最相关的历史信息。

1
2
3
attention_score = v^T * tanh(W_a * h)
attention_weights = softmax(attention_score)
context_vector = Σ(attention_weights_i * h_i)

4.2 带注意力机制的LSTM

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
class AttentionLSTM(nn.Module):
"""带注意力机制的LSTM模型"""

def __init__(self, input_size, hidden_size, num_layers, output_size):
super(AttentionLSTM, self).__init__()

self.hidden_size = hidden_size
self.num_layers = num_layers

# LSTM层
self.lstm = nn.LSTM(
input_size=input_size,
hidden_size=hidden_size,
num_layers=num_layers,
batch_first=True
)

# 注意力层
self.attention_W = nn.Linear(hidden_size, hidden_size)
self.attention_v = nn.Linear(hidden_size, 1, bias=False)

# 输出层
self.fc = nn.Linear(hidden_size * 2, output_size)

def forward(self, x):
"""
前向传播

参数:
x: 输入 (batch_size, seq_len, input_size)

返回:
output: 预测值 (batch_size, output_size)
"""
batch_size, seq_len, _ = x.shape

# LSTM前向传播
lstm_out, (h_n, c_n) = self.lstm(x)
# lstm_out: (batch_size, seq_len, hidden_size)

# 计算注意力分数
# 使用tanh激活
u = torch.tanh(self.attention_W(lstm_out))
# u: (batch_size, seq_len, hidden_size)

# 计算注意力权重
attn_scores = self.attention_v(u).squeeze(-1)
# attn_scores: (batch_size, seq_len)

attn_weights = F.softmax(attn_scores, dim=1)
# attn_weights: (batch_size, seq_len)

# 计算上下文向量
context = torch.bmm(attn_weights.unsqueeze(1), lstm_out).squeeze(1)
# context: (batch_size, hidden_size)

# 合并上下文向量和最终隐藏状态
h_final = h_n[-1] # (batch_size, hidden_size)
combined = torch.cat([context, h_final], dim=1)
# combined: (batch_size, hidden_size * 2)

# 输出层
output = self.fc(combined)

return output, attn_weights


class Seq2SeqLSTM(nn.Module):
"""序列到序列LSTM模型(用于多步预测)"""

def __init__(self, input_size, hidden_size, num_layers, output_size):
super(Seq2SeqLSTM, self).__init__()

# 编码器
self.encoder = nn.LSTM(
input_size=input_size,
hidden_size=hidden_size,
num_layers=num_layers,
batch_first=True
)

# 解码器
self.decoder = nn.LSTM(
input_size=output_size, # 预测值作为输入
hidden_size=hidden_size,
num_layers=num_layers,
batch_first=True
)

# 注意力机制
self.attention_W = nn.Linear(hidden_size * 2, hidden_size)
self.attention_v = nn.Linear(hidden_size, 1, bias=False)

# 输出层
self.fc = nn.Linear(hidden_size, output_size)

def forward(self, x, target_len):
"""
前向传播

参数:
x: 输入序列 (batch_size, input_seq_len, input_size)
target_len: 目标序列长度

返回:
outputs: 预测序列 (batch_size, target_len, output_size)
"""
batch_size = x.shape[0]

# 编码
encoder_out, (h_n, c_n) = self.encoder(x)

# 初始化解码器输入
decoder_input = torch.zeros(batch_size, 1, 1).to(x.device)
decoder_h = h_n
decoder_c = c_n

outputs = []

for t in range(target_len):
# 注意力计算
encoder_hidden = encoder_out # (batch, seq_len, hidden)

# 解码器当前隐藏状态
decoder_hidden = decoder_h[-1].unsqueeze(1).repeat(1, encoder_out.shape[1], 1)

# 计算注意力分数
combined = torch.cat([encoder_hidden, decoder_hidden], dim=2)
u = torch.tanh(self.attention_W(combined))
attn_scores = self.attention_v(u).squeeze(-1)
attn_weights = F.softmax(attn_scores, dim=1)

# 加权上下文
context = torch.bmm(attn_weights.unsqueeze(1), encoder_out).squeeze(1)

# 解码器输入 + 上下文
decoder_input_with_context = torch.cat([
decoder_input,
context.unsqueeze(1)
], dim=2)

# 解码一步
decoder_out, (decoder_h, decoder_c) = self.decoder(
decoder_input_with_context,
(decoder_h, decoder_c)
)

# 输出
output = self.fc(decoder_out.squeeze(1))
outputs.append(output)

# 更新解码器输入
decoder_input = output.unsqueeze(1)

return torch.stack(outputs, dim=1)

5. 完整的股价预测实战案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
def complete_stock_prediction_example():
"""
完整的股价预测案例,包含所有组件
"""
import yfinance as yf # 如无此库可使用模拟数据

# 1. 数据获取
print("=" * 60)
print("步骤1: 数据获取")
print("=" * 60)

# 尝试获取真实股票数据
try:
ticker = "AAPL"
stock_data = yf.download(ticker, start="2015-01-01", end="2024-12-31")
prices = stock_data['Close'].values.flatten()
print(f"成功获取 {ticker} 股票数据")
print(f"数据范围: {len(prices)} 天")
except:
# 使用模拟数据
prices = generate_stock_price_data(num_days=2000)
print("使用模拟股票数据")

# 2. 特征工程
print("\n" + "=" * 60)
print("步骤2: 特征工程")
print("=" * 60)

features = add_technical_indicators(prices)
feature_names = ['price', 'ma_5', 'ma_10', 'ma_20', 'ema_12', 'ema_26',
'macd', 'rsi', 'bb_mid', 'bb_std', 'bb_upper', 'bb_lower',
'returns', 'log_returns', 'volume', 'volatility']

print(f"特征数量: {len(feature_names)}")
print(f"特征列表: {feature_names}")

# 3. 数据预处理
print("\n" + "=" * 60)
print("步骤3: 数据预处理")
print("=" * 60)

scaler = MinMaxScaler()
scaled_features = scaler.fit_transform(features)

window_size = 60 # 使用60天历史数据
pred_steps = 5 # 预测未来5天

# 创建滑动窗口数据
X, y = [], []
for i in range(window_size, len(scaled_features) - pred_steps):
X.append(scaled_features[i-window_size:i])
y.append(scaled_features[i:i+pred_steps, 0]) # 预测价格

X = np.array(X)
y = np.array(y)

print(f"输入形状: {X.shape}")
print(f"输出形状: {y.shape}")

# 划分数据集
train_ratio = 0.7
val_ratio = 0.15

train_idx = int(len(X) * train_ratio)
val_idx = int(len(X) * (train_ratio + val_ratio))

X_train, X_val, X_test = X[:train_idx], X[train_idx:val_idx], X[val_idx:]
y_train, y_val, y_test = y[:train_idx], y[train_idx:val_idx], y[val_idx:]

print(f"训练集: {X_train.shape[0]} 样本")
print(f"验证集: {X_val.shape[0]} 样本")
print(f"测试集: {X_test.shape[0]} 样本")

# 转换为PyTorch张量
X_train_t = torch.FloatTensor(X_train)
y_train_t = torch.FloatTensor(y_train)
X_val_t = torch.FloatTensor(X_val)
y_val_t = torch.FloatTensor(y_val)
X_test_t = torch.FloatTensor(X_test)
y_test_t = torch.FloatTensor(y_test)

# 数据加载器
train_dataset = torch.utils.data.TensorDataset(X_train_t, y_train_t)
val_dataset = torch.utils.data.TensorDataset(X_val_t, y_val_t)
test_dataset = torch.utils.data.TensorDataset(X_test_t, y_test_t)

train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=64, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)

# 4. 模型构建
print("\n" + "=" * 60)
print("步骤4: 模型构建")
print("=" * 60)

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"使用设备: {device}")

input_size = X_train.shape[2]
hidden_size = 128
num_layers = 2
output_size = pred_steps

model = LSTMModel(
input_size=input_size,
hidden_size=hidden_size,
num_layers=num_layers,
output_size=output_size,
dropout=0.3
).to(device)

print(f"模型结构:\n{model}")

# 5. 训练
print("\n" + "=" * 60)
print("步骤5: 模型训练")
print("=" * 60)

criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
optimizer, mode='min', factor=0.5, patience=5
)

epochs = 100
best_val_loss = float('inf')
patience_counter = 0
early_stopping_patience = 15

for epoch in range(epochs):
# 训练
model.train()
train_loss = 0
for batch_x, batch_y in train_loader:
batch_x, batch_y = batch_x.to(device), batch_y.to(device)

optimizer.zero_grad()
outputs = model(batch_x)
loss = criterion(outputs, batch_y)
loss.backward()

torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
optimizer.step()

train_loss += loss.item()

train_loss /= len(train_loader)

# 验证
model.eval()
val_loss = 0
with torch.no_grad():
for batch_x, batch_y in val_loader:
batch_x, batch_y = batch_x.to(device), batch_y.to(device)
outputs = model(batch_x)
loss = criterion(outputs, batch_y)
val_loss += loss.item()

val_loss /= len(val_loader)
scheduler.step(val_loss)

# 早停
if val_loss < best_val_loss:
best_val_loss = val_loss
patience_counter = 0
torch.save(model.state_dict(), 'best_stock_model.pth')
else:
patience_counter += 1

if patience_counter >= early_stopping_patience:
print(f"早停触发于 epoch {epoch+1}")
break

if (epoch + 1) % 10 == 0:
print(f"Epoch {epoch+1:3d}/{epochs} | "
f"Train Loss: {train_loss:.6f} | "
f"Val Loss: {val_loss:.6f}")

# 6. 测试评估
print("\n" + "=" * 60)
print("步骤6: 测试评估")
print("=" * 60)

model.load_state_dict(torch.load('best_stock_model.pth'))
model.eval()

all_preds = []
all_targets = []

with torch.no_grad():
for batch_x, batch_y in test_loader:
batch_x = batch_x.to(device)
outputs = model(batch_x)
all_preds.append(outputs.cpu().numpy())
all_targets.append(batch_y.numpy())

predictions = np.vstack(all_preds)
targets = np.vstack(all_targets)

# 计算各项指标
mse = np.mean((predictions - targets) ** 2)
rmse = np.sqrt(mse)
mae = np.mean(np.abs(predictions - targets))

# 计算每个预测步的MAPE
print("\n各预测步的评估指标:")
print("-" * 50)
print(f"{'预测步':<10} {'MSE':<15} {'RMSE':<15} {'MAE':<15}")
print("-" * 50)

for step in range(pred_steps):
step_mse = np.mean((predictions[:, step] - targets[:, step]) ** 2)
step_rmse = np.sqrt(step_mse)
step_mae = np.mean(np.abs(predictions[:, step] - targets[:, step]))
print(f"Step {step+1:<6} {step_mse:<15.6f} {step_rmse:<15.6f} {step_mae:<15.6f}")

print("-" * 50)
print(f"{'总体':<10} {mse:<15.6f} {rmse:<15.6f} {mae:<15.6f}")

# 7. 可视化
print("\n" + "=" * 60)
print("步骤7: 结果可视化")
print("=" * 60)

# 选择测试集的一小部分进行可视化
vis_len = 50
vis_preds = predictions[:vis_len, 0] # 第一步预测
vis_targets = targets[:vis_len, 0]

plt.figure(figsize=(14, 10))

# 预测对比图
plt.subplot(2, 2, 1)
plt.plot(vis_targets, label='实际价格', linewidth=2)
plt.plot(vis_preds, label='预测价格', linewidth=2, alpha=0.8)
plt.title('股价预测对比(第一步预测)')
plt.xlabel('时间步')
plt.ylabel('归一化价格')
plt.legend()
plt.grid(True, alpha=0.3)

# 多步预测对比
plt.subplot(2, 2, 2)
plt.plot(vis_targets, label='实际', linewidth=2, marker='o', markersize=3)
for step in range(pred_steps):
plt.plot(predictions[:vis_len, step], label=f'预测+{step+1}步',
alpha=0.7, linestyle='--')
plt.title('多步预测对比')
plt.xlabel('时间步')
plt.ylabel('归一化价格')
plt.legend()
plt.grid(True, alpha=0.3)

# 预测误差分布
plt.subplot(2, 2, 3)
errors = predictions[:, 0] - targets[:, 0]
plt.hist(errors, bins=50, edgecolor='black', alpha=0.7)
plt.axvline(x=0, color='r', linestyle='--', label='零误差')
plt.title('预测误差分布')
plt.xlabel('预测误差')
plt.ylabel('频数')
plt.legend()
plt.grid(True, alpha=0.3)

# 散点图
plt.subplot(2, 2, 4)
plt.scatter(targets[:, 0], predictions[:, 0], alpha=0.5, s=10)
plt.plot([0, 1], [0, 1], 'r--', linewidth=2, label='理想预测线')
plt.title('实际值 vs 预测值')
plt.xlabel('实际值')
plt.ylabel('预测值')
plt.legend()
plt.grid(True, alpha=0.3)

plt.tight_layout()
plt.savefig('stock_forecast_results.png', dpi=150)
print("可视化结果已保存到 stock_forecast_results.png")

return model, scaler


if __name__ == "__main__":
model, scaler = complete_stock_prediction_example()

6. 模型优化技巧

6.1 超参数调优

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
from itertools import product

def hyperparameter_search(X_train, y_train, X_val, y_val):
"""
超参数网格搜索
"""
param_grid = {
'hidden_size': [32, 64, 128, 256],
'num_layers': [1, 2, 3],
'window_size': [30, 60, 90],
'learning_rate': [0.001, 0.0005, 0.0001],
'batch_size': [32, 64, 128]
}

best_val_loss = float('inf')
best_params = None
best_model = None

# 简化搜索空间
for hidden_size, num_layers in product([64, 128], [1, 2]):
for batch_size in [64, 128]:
# 训练模型
model = LSTMModel(
input_size=X_train.shape[2],
hidden_size=hidden_size,
num_layers=num_layers,
output_size=1
)

# 训练和评估
# ... (省略训练代码)

if val_loss < best_val_loss:
best_val_loss = val_loss
best_params = {
'hidden_size': hidden_size,
'num_layers': num_layers,
'batch_size': batch_size
}

print(f"最佳参数: {best_params}")
print(f"最佳验证损失: {best_val_loss:.6f}")

return best_params

6.2 集成预测

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
class EnsembleLSTM:
"""LSTM集成模型"""

def __init__(self, input_size, hidden_size, num_layers, output_size, n_models=5):
self.models = nn.ModuleList([
LSTMModel(input_size, hidden_size, num_layers, output_size)
for _ in range(n_models)
])
self.n_models = n_models

def forward(self, x):
"""集成前向传播"""
outputs = []
for model in self.models:
outputs.append(model(x))

# 平均集成
ensemble_output = torch.mean(torch.stack(outputs), dim=0)
return ensemble_output

def train_models(self, train_loader, val_loader, device):
"""分别训练每个模型"""
for i, model in enumerate(self.models):
print(f"训练模型 {i+1}/{self.n_models}")
# 训练代码...

7. 总结

本文详细介绍了LSTM在时间序列预测中的实战应用,包括:

  1. 数据准备:滑动窗口技术、特征工程、技术指标计算
  2. 模型构建:标准LSTM、多步预测LSTM、Seq2Seq模型
  3. 注意力机制:注意力权重计算、上下文向量、Seq2Seq with Attention
  4. 训练技巧:梯度裁剪、早停、学习率调度、Dropout
  5. 评估指标:MSE、RMSE、MAE、MAPE
  6. 优化策略:超参数搜索、模型集成

LSTM在时间序列预测中的优势在于:

  • 能够捕捉长期依赖关系
  • 门控机制有效控制信息流
  • 支持多步预测
  • 可以结合注意力机制提升性能

在下一篇文章中,我们将对比LSTM与Transformer架构,分析RNN与Self-Attention机制的优劣,以及在BERT时代LSTM是否仍然具有应用价值。


相关标签:LSTM, 时间序列, PyTorch, 深度学习, 预测, 实战