LSTM 数据集准备:数据清洗、特征工程与序列构建
引言
在深度学习领域,LSTM(长短期记忆网络)因其卓越的序列建模能力而广泛应用于时间序列预测、自然语言处理、语音识别等任务。然而,无论模型架构多么精妙,高质量的模型输出始终离不开高质量的数据准备流程。本文将系统性地介绍LSTM数据集准备的完整流程,涵盖数据清洗、特征工程、序列构建以及PyTorch DataLoader实现等核心环节,帮助读者构建从原始数据到模型输入的完整知识体系。
时间序列数据具有独特的挑战性:时序依赖性、趋势性、季节性、周期性以及噪声干扰等问题都需要在数据准备阶段妥善处理。一个优秀的数据准备流程不仅能够提升模型的预测精度,还能增强模型的鲁棒性和泛化能力。本文将结合Python和PyTorch,提供完整的技术实现方案和最佳实践建议。
1. 数据清洗:构建高质量时间序列基础
1.1 缺失值处理策略
时间序列数据中的缺失值是不可避免的问题,可能由数据采集设备故障、网络传输中断、人工记录错误等原因导致。处理缺失值的方法多种多样,选择合适的策略需要根据数据特点和业务场景综合判断。
**前向填充(Forward Fill)**适用于缺失值较少且数据具有连续性的场景。该方法使用缺失位置之前的最近有效值填充空缺,特别适合波动较小的时间序列数据。例如,在股票价格数据中,如果某一天的收盘价缺失,可以使用前一天的收盘价填充,因为短期内价格通常不会有剧烈波动。
**后向填充(Backward Fill)**与前向填充相反,使用缺失位置之后的最近有效值来填充。这种方法在某些场景下更加合理,比如预测任务中我们希望利用历史信息预测未来,填充时不应引入”未来”的信息。
**线性插值(Linear Interpolation)**假设缺失值位于两个已知点之间的线性路径上,通过计算线性方程来确定缺失值。这种方法适用于数据呈现线性趋势的场景,能够较好地保持数据的整体趋势。
**样条插值(Spline Interpolation)**使用三次或更高阶的多项式拟合数据,比线性插值更加平滑,适合数据具有曲线趋势的场景。
以下是Python实现缺失值处理的完整代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114
| import numpy as np import pandas as pd import matplotlib.pyplot as plt from scipy import interpolate
class TimeSeriesCleaner: """时间序列数据清洗工具类""" def __init__(self, data, date_index=None): """ 初始化清洗器 参数: data: 时间序列数据,pandas DataFrame或Series date_index: 日期索引,用于时间序列数据 """ self.data = data.copy() self.date_index = date_index def detect_missing_values(self): """检测缺失值并生成报告""" missing_info = {} if isinstance(self.data, pd.DataFrame): for col in self.data.columns: missing_count = self.data[col].isnull().sum() missing_pct = missing_count / len(self.data) * 100 missing_info[col] = { 'count': missing_count, 'percentage': missing_pct } else: missing_count = self.data.isnull().sum() missing_pct = missing_count / len(self.data) * 100 missing_info['value'] = { 'count': missing_count, 'percentage': missing_pct } return missing_info def forward_fill(self, column=None): """前向填充""" if column: self.data[column] = self.data[column].ffill() else: self.data = self.data.ffill() return self def backward_fill(self, column=None): """后向填充""" if column: self.data[column] = self.data[column].bfill() else: self.data = self.data.bfill() return self def linear_interpolate(self, column=None): """线性插值""" if column: self.data[column] = self.data[column].interpolate(method='linear') else: self.data = self.data.interpolate(method='linear') return self def spline_interpolate(self, column=None, order=3): """样条插值""" if column: self.data[column] = self.data[column].interpolate(method='spline', order=order) else: self.data = self.data.interpolate(method='spline', order=order) return self def fill_with_mean(self, column=None): """使用均值填充""" if column: self.data[column] = self.data[column].fillna(self.data[column].mean()) else: for col in self.data.columns: self.data[col] = self.data[col].fillna(self.data[col].mean()) return self def fill_with_median(self, column=None): """使用中位数填充""" if column: self.data[column] = self.data[column].fillna(self.data[column].median()) else: for col in self.data.columns: self.data[col] = self.data[col].fillna(self.data[col].median()) return self def drop_missing(self, axis=0): """删除缺失值行或列""" self.data = self.data.dropna(axis=axis) return self def get_cleaned_data(self): """返回清洗后的数据""" return self.data
def visualize_missing_values(data, title="缺失值可视化"): """可视化缺失值分布""" if isinstance(data, pd.DataFrame): missing_matrix = data.isnull() fig, ax = plt.subplots(figsize=(12, 6)) ax.imshow(missing_matrix.T, aspect='auto', cmap='RdYlBu_r') ax.set_xlabel('时间索引') ax.set_ylabel('特征') ax.set_yticks(range(len(data.columns))) ax.set_yticklabels(data.columns) ax.set_title(title) plt.tight_layout() plt.show()
|
1.2 异常值检测与处理
异常值(Outliers)是指与其他观测值显著不同的数据点。在时间序列中,异常值可能由数据录入错误、设备故障或真实的极端事件引起。异常值的处理需要谨慎,因为有些异常值可能包含重要的业务信息。
Z-Score方法假设数据服从正态分布,计算每个数据点与均值的标准差倍数。通常而言,Z-Score绝对值超过3的数据点被认为是异常值。这种方法简单直观,但对极端值敏感。
IQR(四分位距)方法利用数据的四分位数来识别异常值。IQR定义为上四分位数(Q3)与下四分位数(Q1)之差,异常值被定义为小于Q1-1.5×IQR或大于Q3+1.5×IQR的数据点。这种方法对极端值更加稳健。
移动平均法计算滚动窗口内的均值和标准差,如果当前值偏离移动平均超过一定阈值,则被认为是异常值。这种方法特别适合时间序列数据,因为它考虑了数据的时序特性。
** Isolation Forest**是一种基于决策树的异常检测算法,能够有效处理高维数据和非线性关系。在特征工程中使用Isolation Forest可以自动识别多维特征空间中的异常点。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125
| from scipy import stats from sklearn.ensemble import IsolationForest
class OutlierDetector: """异常值检测工具类""" def __init__(self, data): self.data = data.copy() def zscore_method(self, column, threshold=3): """ Z-Score异常值检测 参数: column: 待检测的列名 threshold: Z-Score阈值,默认3 返回: 异常值索引列表 """ z_scores = np.abs(stats.zscore(self.data[column].dropna())) outlier_indices = self.data[column].dropna().index[z_scores > threshold] return outlier_indices.tolist() def iqr_method(self, column, multiplier=1.5): """ IQR方法异常值检测 参数: column: 待检测的列名 multiplier: IQR倍数,默认1.5 返回: 异常值索引列表 """ Q1 = self.data[column].quantile(0.25) Q3 = self.data[column].quantile(0.75) IQR = Q3 - Q1 lower_bound = Q1 - multiplier * IQR upper_bound = Q3 + multiplier * IQR outliers = self.data[ (self.data[column] < lower_bound) | (self.data[column] > upper_bound) ].index.tolist() return outliers def moving_average_method(self, column, window=12, num_std=3): """ 移动平均法异常值检测 参数: column: 待检测的列名 window: 滚动窗口大小 num_std: 标准差倍数 返回: 异常值索引列表 """ rolling_mean = self.data[column].rolling(window=window, center=True).mean() rolling_std = self.data[column].rolling(window=window, center=True).std() lower_bound = rolling_mean - num_std * rolling_std upper_bound = rolling_mean + num_std * rolling_std outliers = self.data[ (self.data[column] < lower_bound) | (self.data[column] > upper_bound) ].index.tolist() return outliers def isolation_forest_method(self, columns=None, contamination=0.01): """ Isolation Forest异常值检测 参数: columns: 要检测的列列表,None表示所有列 contamination: 异常值比例 返回: 异常值索引列表 """ if columns is None: features = self.data.select_dtypes(include=[np.number]).copy() else: features = self.data[columns].copy() features = features.fillna(features.median()) iso_forest = IsolationForest( contamination=contamination, random_state=42 ) predictions = iso_forest.fit_predict(features) outlier_indices = self.data.index[predictions == -1].tolist() return outlier_indices
def handle_outliers(data, outlier_indices, method='clip', column=None): """ 处理异常值 参数: data: 数据DataFrame outlier_indices: 异常值索引列表 method: 处理方法,'clip'表示裁剪,'nan'表示替换为NaN column: 指定列名 返回: 处理后的数据 """ result = data.copy() if column: if method == 'clip': Q1 = result[column].quantile(0.25) Q3 = result[column].quantile(0.75) IQR = Q3 - Q1 lower = Q1 - 1.5 * IQR upper = Q3 + 1.5 * IQR result.loc[result[column] < lower, column] = lower result.loc[result[column] > upper, column] = upper else: result.loc[outlier_indices, column] = np.nan return result
|
1.3 数据类型转换与格式标准化
时间序列数据通常包含多种数据类型,如日期时间、数值、分类等。正确的数据类型转换对于后续处理至关重要。
日期时间处理是时间序列分析的基础。Python中的pandas库提供了强大的日期时间处理功能,可以解析多种日期格式,处理时区转换,执行日期运算等操作。标准化日期时间格式有助于确保数据的一致性和可比较性。
对于分类特征,需要进行编码转换。常用的方法包括标签编码(Label Encoding)、独热编码(One-Hot Encoding)和目标编码(Target Encoding)。在LSTM模型中,通常使用标签编码将分类变量转换为整数,再通过嵌入层(Embedding Layer)转换为稠密向量。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
| class DataTypeConverter: """数据类型转换工具类""" def __init__(self, data): self.data = data.copy() def parse_datetime(self, column, format=None, unit=None): """ 解析日期时间列 参数: column: 日期列名 format: 日期格式,None表示自动检测 unit: 时间单位(用于时间戳) 返回: 转换后的DataFrame """ if unit: self.data[column] = pd.to_datetime( self.data[column], unit=unit ) else: self.data[column] = pd.to_datetime( self.data[column], format=format ) return self def set_datetime_index(self, column): """设置日期时间为索引""" self.data.set_index(column, inplace=True) return self def label_encode(self, column): """标签编码""" categories = self.data[column].unique() category_map = {cat: idx for idx, cat in enumerate(categories)} self.data[f'{column}_encoded'] = self.data[column].map(category_map) return self def onehot_encode(self, column): """独热编码""" dummies = pd.get_dummies(self.data[column], prefix=column) self.data = pd.concat([self.data, dummies], axis=1) return self def convert_to_numeric(self, columns): """转换为数值类型""" for col in columns: self.data[col] = pd.to_numeric(self.data[col], errors='coerce') return self def get_processed_data(self): """获取处理后的数据""" return self.data
|
2. 特征工程:构建有意义的输入特征
2.1 时间滞后特征(Lag Features)
时间滞后特征是时间序列预测中最基础也是最重要的特征类型之一。它通过将过去若干时间步的值作为当前预测的特征,利用数据的自相关性来捕捉时序依赖关系。
滞后特征的核心思想是:当前时刻的目标值与过去若干时刻的历史值存在关联。例如,在股票价格预测中,今天的价格可能与昨天、前天乃至一周前的价格都有关联。通过创建多个滞后特征,模型可以学习到不同时间尺度上的依赖关系。
创建滞后特征时需要考虑几个关键问题。首先是滞后阶数的选择:过多可能导致维度灾难和过拟合,过少则可能遗漏重要的历史信息。其次是滞后特征带来的数据损失问题:创建k阶滞后特征后,前k个样本无法用于训练,因为它们缺少完整的特征信息。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92
| class LagFeatureEngineer: """滞后特征工程工具类""" def __init__(self, data): self.data = data.copy() self.lag_features = {} def create_lag_features(self, column, lags=[1, 2, 3, 6, 12, 24]): """ 创建滞后特征 参数: column: 要创建滞后特征的列名 lags: 滞后阶数列表,如[1, 2, 3]表示创建t-1, t-2, t-3时刻的特征 返回: 添加了滞后特征的DataFrame """ for lag in lags: self.data[f'{column}_lag_{lag}'] = self.data[column].shift(lag) self.lag_features[f'{column}_lag_{lag}'] = lag return self def create_multiple_lag_features(self, columns, lags=[1, 2, 3, 6, 12]): """ 为多个列创建滞后特征 参数: columns: 列名列表 lags: 滞后阶数列表 返回: 添加了滞后特征的DataFrame """ for col in columns: self.create_lag_features(col, lags) return self def create_lag_features_range(self, column, start=1, end=24): """ 创建连续范围的滞后特征 参数: column: 列名 start: 起始滞后阶数 end: 结束滞后阶数 返回: 添加了滞后特征的DataFrame """ for lag in range(start, end + 1): self.data[f'{column}_lag_{lag}'] = self.data[column].shift(lag) return self def get_feature_info(self): """获取特征信息""" return { 'lag_features': self.lag_features, 'total_features': len(self.lag_features) } def drop_na_rows(self): """删除包含NaN的行(由于滞后特征产生)""" self.data = self.data.dropna() return self def get_data(self): """获取处理后的数据""" return self.data
def create_lag_features_manual(data, column, lags): """ 手动创建滞后特征的辅助函数 参数: data: pandas Series或DataFrame column: 列名 lags: 滞后阶数列表 返回: 添加了滞后特征的DataFrame """ result = data.copy() for lag in lags: result[f'{column}_lag_{lag}'] = result[column].shift(lag) return result
|
2.2 滚动统计特征(Rolling Statistics)
滚动统计特征通过在滑动窗口内计算各种统计量来捕捉数据的局部特征。与滞后特征不同,滚动统计特征综合了窗口内多个时间点的信息,能够更好地反映数据的短期趋势和波动性。
常用的滚动统计特征包括:滚动均值(Rolling Mean)反映数据的平均水平;滚动标准差(Rolling Std)反映数据的波动程度;滚动最大值和最小值反映数据的极端情况;滚动中位数则对异常值更加稳健。
滚动窗口的大小选择是特征工程中的关键参数。窗口过大可能导致特征对短期变化不敏感,窗口过小则容易受到噪声干扰。在实际应用中,通常需要根据数据的采样频率和业务场景来选择合适的窗口大小。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104
| class RollingFeatureEngineer: """滚动统计特征工程工具类""" def __init__(self, data): self.data = data.copy() self.rolling_features = {} def create_rolling_mean(self, column, windows=[3, 6, 12, 24]): """创建滚动均值特征""" for window in windows: self.data[f'{column}_roll_mean_{window}'] = \ self.data[column].rolling(window=window).mean() self.rolling_features[f'{column}_roll_mean_{window}'] = { 'window': window, 'statistic': 'mean' } return self def create_rolling_std(self, column, windows=[3, 6, 12, 24]): """创建滚动标准差特征""" for window in windows: self.data[f'{column}_roll_std_{window}'] = \ self.data[column].rolling(window=window).std() self.rolling_features[f'{column}_roll_std_{window}'] = { 'window': window, 'statistic': 'std' } return self def create_rolling_min(self, column, windows=[6, 12, 24]): """创建滚动最小值特征""" for window in windows: self.data[f'{column}_roll_min_{window}'] = \ self.data[column].rolling(window=window).min() return self def create_rolling_max(self, column, windows=[6, 12, 24]): """创建滚动最大值特征""" for window in windows: self.data[f'{column}_roll_max_{window}'] = \ self.data[column].rolling(window=window).max() return self def create_rolling_median(self, column, windows=[6, 12, 24]): """创建滚动中位数特征""" for window in windows: self.data[f'{column}_roll_median_{window}'] = \ self.data[column].rolling(window=window).median() return self def create_rolling_quantile(self, column, windows=[12], quantiles=[0.25, 0.75]): """创建滚动分位数特征""" for window in windows: for q in quantiles: self.data[f'{column}_roll_q{int(q*100)}_{window}'] = \ self.data[column].rolling(window=window).quantile(q) return self def create_all_rolling_features(self, column, windows=[3, 6, 12, 24]): """创建所有常用滚动统计特征""" self.create_rolling_mean(column, windows) self.create_rolling_std(column, windows) self.create_rolling_min(column, windows) self.create_rolling_max(column, windows) self.create_rolling_median(column, windows) return self def create_exponential_weighted_features(self, column, spans=[6, 12, 24]): """创建指数加权特征""" for span in spans: self.data[f'{column}_ewm_{span}'] = \ self.data[column].ewm(span=span).mean() return self def get_data(self): """获取处理后的数据""" return self.data
def calculate_rolling_features(data, column, windows=[3, 6, 12, 24]): """ 计算滚动特征的辅助函数 参数: data: pandas DataFrame column: 列名 windows: 窗口大小列表 返回: 添加了滚动特征的DataFrame """ result = data.copy() for window in windows: result[f'{column}_roll_mean_{window}'] = \ data[column].rolling(window=window).mean() result[f'{column}_roll_std_{window}'] = \ data[column].rolling(window=window).std() result[f'{column}_roll_min_{window}'] = \ data[column].rolling(window=window).min() result[f'{column}_roll_max_{window}'] = \ data[column].rolling(window=window).max() return result
|
2.3 技术指标特征
技术指标是金融时间序列分析中常用的特征类型,源自技术分析理论。这些指标通过数学计算将价格和交易量数据转化为可解释的信号,帮助投资者做出交易决策。
**移动平均线(MA)**是最基本的技术指标,通过计算一定周期内的平均价格来平滑价格波动。常用的是简单移动平均(SMA)和指数移动平均(EMA)。SMA给予所有观测值相同的权重,而EMA则给予近期观测值更高的权重。
**MACD(Moving Average Convergence Divergence)**由Gerald Appel发明,由快线(DIF)、慢线(DEA)和柱状图(MACD柱)组成。MACD通过计算两条不同周期EMA的差值来判断价格的动量方向。
**RSI(Relative Strength Index)**相对强弱指数,衡量价格变动的速度和幅度,取值范围0-100。RSI超过70通常被认为是超买信号,低于30则是超卖信号。
**布林带(Bollinger Bands)**由中轨(SMA)和上下两条标准差轨道组成,用于判断价格的相对高低和波动性。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160
| class TechnicalIndicatorEngineer: """技术指标特征工程工具类""" def __init__(self, data): self.data = data.copy() self.features = {} def calculate_sma(self, column, periods=[5, 10, 20, 60]): """计算简单移动平均""" for period in periods: self.data[f'{column}_sma_{period}'] = \ self.data[column].rolling(window=period).mean() return self def calculate_ema(self, column, spans=[5, 10, 20, 60]): """计算指数移动平均""" for span in spans: self.data[f'{column}_ema_{span}'] = \ self.data[column].ewm(span=span, adjust=False).mean() return self def calculate_macd(self, column, fast=12, slow=26, signal=9): """ 计算MACD指标 参数: column: 价格列名 fast: 快线周期 slow: 慢线周期 signal: 信号线周期 """ ema_fast = self.data[column].ewm(span=fast, adjust=False).mean() ema_slow = self.data[column].ewm(span=slow, adjust=False).mean() self.data[f'{column}_macd_dif'] = ema_fast - ema_slow self.data[f'{column}_macd_dea'] = \ self.data[f'{column}_macd_dif'].ewm(span=signal, adjust=False).mean() self.data[f'{column}_macd_hist'] = \ 2 * (self.data[f'{column}_macd_dif'] - self.data[f'{column}_macd_dea']) return self def calculate_rsi(self, column, period=14): """ 计算RSI指标 参数: column: 价格列名 period: RSI计算周期 """ delta = self.data[column].diff() gain = delta.where(delta > 0, 0) loss = -delta.where(delta < 0, 0) avg_gain = gain.rolling(window=period).mean() avg_loss = loss.rolling(window=period).mean() rs = avg_gain / avg_loss self.data[f'{column}_rsi_{period}'] = 100 - (100 / (1 + rs)) return self def calculate_bollinger_bands(self, column, period=20, num_std=2): """ 计算布林带 参数: column: 价格列名 period: 计算周期 num_std: 标准差倍数 """ sma = self.data[column].rolling(window=period).mean() std = self.data[column].rolling(window=period).std() self.data[f'{column}_bb_upper'] = sma + num_std * std self.data[f'{column}_bb_middle'] = sma self.data[f'{column}_bb_lower'] = sma - num_std * std self.data[f'{column}_bb_width'] = \ (self.data[f'{column}_bb_upper'] - self.data[f'{column}_bb_lower']) / sma self.data[f'{column}_bb_position'] = \ (self.data[column] - self.data[f'{column}_bb_lower']) / \ (self.data[f'{column}_bb_upper'] - self.data[f'{column}_bb_lower']) return self def calculate_atr(self, high, low, close, period=14): """ 计算平均真实波幅(ATR) 参数: high: 最高价列名 low: 最低价列名 close: 收盘价列名 period: 计算周期 """ tr1 = self.data[high] - self.data[low] tr2 = np.abs(self.data[high] - self.data[close].shift(1)) tr3 = np.abs(self.data[low] - self.data[close].shift(1)) tr = pd.concat([tr1, tr2, tr3], axis=1).max(axis=1) self.data[f'atr_{period}'] = tr.rolling(window=period).mean() return self def calculate_stochastic(self, high, low, close, k_period=14, d_period=3): """ 计算随机指标(KD指标) 参数: high: 最高价列名 low: 最低价列名 close: 收盘价列名 k_period: K线周期 d_period: D线周期 """ lowest_low = self.data[low].rolling(window=k_period).min() highest_high = self.data[high].rolling(window=k_period).max() self.data[f'stoch_k_{k_period}'] = \ 100 * (self.data[close] - lowest_low) / (highest_high - lowest_low) self.data[f'stoch_d_{d_period}'] = \ self.data[f'stoch_k_{k_period}'].rolling(window=d_period).mean() return self def calculate_obv(self, close, volume): """ 计算能量潮指标(OBV) 参数: close: 收盘价列名 volume: 成交量列名 """ close_diff = self.data[close].diff() obv = np.where(close_diff > 0, self.data[volume], np.where(close_diff < 0, -self.data[volume], 0)) self.data['obv'] = pd.Series(obv).cumsum() return self def calculate_all_indicators(self, price_col='close', high_col='high', low_col='low', volume_col='volume'): """计算所有常用技术指标""" self.calculate_sma(price_col) self.calculate_ema(price_col) self.calculate_macd(price_col) self.calculate_rsi(price_col) self.calculate_bollinger_bands(price_col) self.calculate_atr(high_col, low_col, price_col) self.calculate_stochastic(high_col, low_col, price_col) if volume_col in self.data.columns: self.calculate_obv(price_col, volume_col) return self def get_data(self): """获取处理后的数据""" return self.data
|
3. 数据归一化与标准化
3.1 归一化方法对比
在深度学习中,数据归一化是至关重要的预处理步骤。不同特征的量纲和数值范围可能差异巨大,如果不进行归一化,梯度下降可能会变得不稳定,模型收敛速度也会显著降低。此外,归一化还能帮助模型更好地学习特征之间的关系。
Min-Max归一化将数据线性缩放到指定范围(通常是[0, 1])。这种方法简单直观,能够保留原始数据的分布形状,但容易受到异常值的影响。当数据中存在极端值时,大部分数据会被压缩到很小的范围内。
Z-Score标准化将数据转换为均值为0、标准差为1的标准正态分布。这种方法不受异常值影响,因为异常值本身的Z-Score会很大,但不会显著改变整体分布。然而,标准化后的数据不一定在固定范围内。
**对数变换(Log Transform)**适用于右偏分布数据,能够压缩大值、扩展小值,从而减少数据的偏度。这种方法在处理金融数据(如价格、成交量)时非常有用,因为这类数据通常呈现右偏分布。
RobustScaler使用中位数和四分位距进行缩放,对异常值更加稳健。当数据中存在少量极端值但这些值本身是有意义的(而非错误)时,RobustScaler是比Min-Max更好的选择。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165
| from sklearn.preprocessing import ( MinMaxScaler, StandardScaler, RobustScaler, MaxAbsScaler ) import numpy as np
class DataNormalizer: """数据归一化工具类""" def __init__(self, data): self.data = data.copy() self.scalers = {} self.scaler_type = None self.feature_range = (0, 1) def min_max_scale(self, columns, feature_range=(0, 1)): """ Min-Max归一化 参数: columns: 要归一化的列名列表 feature_range: 目标范围,默认(0, 1) 返回: 归一化后的数据 """ self.scaler_type = 'min_max' self.feature_range = feature_range for col in columns: scaler = MinMaxScaler(feature_range=feature_range) self.data[col] = scaler.fit_transform( self.data[col].values.reshape(-1, 1) ).flatten() self.scalers[col] = scaler return self def standard_scale(self, columns): """ Z-Score标准化 参数: columns: 要标准化的列名列表 返回: 标准化后的数据 """ self.scaler_type = 'standard' for col in columns: scaler = StandardScaler() self.data[col] = scaler.fit_transform( self.data[col].values.reshape(-1, 1) ).flatten() self.scalers[col] = scaler return self def robust_scale(self, columns): """ RobustScaler标准化 参数: columns: 要标准化的列名列表 返回: 标准化后的数据 """ self.scaler_type = 'robust' for col in columns: scaler = RobustScaler() self.data[col] = scaler.fit_transform( self.data[col].values.reshape(-1, 1) ).flatten() self.scalers[col] = scaler return self def log_transform(self, columns, add_constant=1): """ 对数变换 参数: columns: 要变换的列名列表 add_constant: 加法常数,避免log(0) 返回: 变换后的数据 """ self.scaler_type = 'log' for col in columns: self.data[col] = np.log(self.data[col] + add_constant) return self def inverse_transform(self, column, values): """ 反向转换(还原原始尺度) 参数: column: 列名 values: 要还原的值 返回: 还原后的值 """ if self.scaler_type == 'min_max': scaler = self.scalers[column] if isinstance(scaler, MinMaxScaler): return scaler.inverse_transform( values.reshape(-1, 1) ).flatten() elif self.scaler_type == 'standard': scaler = self.scalers[column] if isinstance(scaler, StandardScaler): return scaler.inverse_transform( values.reshape(-1, 1) ).flatten() elif self.scaler_type == 'log': return np.exp(values) - 1 return values def get_data(self): """获取处理后的数据""" return self.data def get_scalers(self): """获取归一化器""" return self.scalers
class MultipleColumnNormalizer: """多列联合归一化工具类""" def __init__(self): self.joint_scaler = None self.columns = None def fit_transform(self, data, columns): """ 对多个列进行联合归一化 参数: data: pandas DataFrame columns: 列名列表 返回: 归一化后的数据 """ self.columns = columns self.joint_scaler = MinMaxScaler() normalized_values = self.joint_scaler.fit_transform(data[columns]) result = data.copy() result[columns] = normalized_values return result def inverse_transform(self, data): """反向转换""" if self.joint_scaler is None: raise ValueError("Scaler not fitted yet") result = data.copy() original_values = self.joint_scaler.inverse_transform(data[self.columns]) result[self.columns] = original_values return result
|
4. 序列构建与滑动窗口
4.1 滑动窗口技术原理
滑动窗口是时间序列预测中用于将连续数据转换为监督学习格式的核心技术。LSTM模型需要固定长度的输入序列,滑动窗口通过在时间轴上以固定步长滑动,生成多个样本,每个样本包含一定数量的历史时间步作为输入,下一个时间步作为输出。
滑动窗口的核心参数是窗口大小(window_size)或回溯步长(lookback)。窗口大小的选择需要权衡两个因素:较大的窗口能够捕获更长期的依赖关系,但会增加模型复杂度并减少训练样本数量;较小的窗口计算效率高,但可能无法捕获长期模式。
步长(stride)是另一个重要参数,表示滑动窗口每次移动的距离。步长为1时生成最多样本,步长大于1时生成较少的训练样本但样本间时间间隔更大。在某些场景下,较大的步长可以帮助减少样本间的冗余。
对于多步预测(预测未来多个时间步),有两种主要策略:一种是直接策略,为每个预测步训练独立的模型;另一种是递归策略,训练单一模型进行单步预测,然后递归地将预测值作为下一步的输入。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105
| class SlidingWindowCreator: """滑动窗口序列构建工具类""" def __init__(self, window_size, pred_steps=1, stride=1): """ 初始化滑动窗口创建器 参数: window_size: 窗口大小(输入序列长度) pred_steps: 预测步数(输出序列长度) stride: 滑动步长 """ self.window_size = window_size self.pred_steps = pred_steps self.stride = stride def create_sequences_single(self, data): """ 为单变量时间序列创建滑动窗口样本 参数: data: numpy array,形状为 (n_samples,) 返回: X: 输入序列,形状为 (n_samples - window_size, window_size) y: 目标值,形状为 (n_samples - window_size,) """ X, y = [], [] for i in range(0, len(data) - self.window_size - self.pred_steps + 1, self.stride): X.append(data[i:i + self.window_size]) y.append(data[i + self.window_size:i + self.window_size + self.pred_steps]) return np.array(X), np.array(y) def create_sequences_multi(self, data, target_idx=0): """ 为多变量时间序列创建滑动窗口样本 参数: data: numpy array,形状为 (n_samples, n_features) target_idx: 目标变量的列索引 返回: X: 输入序列,形状为 (n_samples - window_size, window_size, n_features) y: 目标值,形状为 (n_samples - window_size, pred_steps) """ X, y = [], [] for i in range(0, len(data) - self.window_size - self.pred_steps + 1, self.stride): X.append(data[i:i + self.window_size]) y.append(data[i + self.window_size:i + self.window_size + self.pred_steps, target_idx]) return np.array(X), np.array(y) def create_sequences_with_features(self, data, feature_cols, target_col): """ 创建带有特征选择的滑动窗口样本 参数: data: pandas DataFrame feature_cols: 特征列名列表 target_col: 目标列名 返回: X: numpy array,形状为 (n_samples, window_size, n_features) y: numpy array,形状为 (n_samples, pred_steps) """ feature_data = data[feature_cols].values target_data = data[target_col].values X, y = [], [] for i in range(0, len(data) - self.window_size - self.pred_steps + 1, self.stride): X.append(feature_data[i:i + self.window_size]) y.append(target_data[i + self.window_size:i + self.window_size + self.pred_steps]) return np.array(X), np.array(y) def get_sequence_length(self): """获取序列长度""" return self.window_size def get_output_length(self): """获取输出长度""" return self.pred_steps
def create_sliding_windows(data, window_size, pred_steps=1, stride=1): """ 滑动窗口创建的辅助函数 参数: data: 时间序列数据 window_size: 窗口大小 pred_steps: 预测步数 stride: 滑动步长 返回: X, y: 输入和目标数组 """ X, y = [], [] for i in range(0, len(data) - window_size - pred_steps + 1, stride): X.append(data[i:i + window_size]) y.append(data[i + window_size:i + window_size + pred_steps]) return np.array(X), np.array(y)
|
4.2 时序数据的数据增强
数据增强是提升模型泛化能力的重要技术,特别是在时间序列数据有限的情况下。通过对原始数据施加可控的变换,可以生成更多的训练样本。
随机噪声注入通过添加高斯噪声来模拟测量误差和数据不确定性。这种方法能够帮助模型学习在存在噪声的情况下进行预测,提高鲁棒性。
时间扭曲通过对时间轴进行非线性变换来生成新的样本。这种方法能够增加数据的多样性,帮助模型学习不同时间尺度的模式。
窗口切片从原始序列中随机选择不同的窗口起点和大小,生成多个训练样本。这种方法特别适用于长序列数据。
幅度缩放通过乘以随机缩放因子来改变数据幅度。这种方法在金融数据中特别有用,因为不同时间段的波动率可能不同。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114
| class TimeSeriesAugmenter: """时间序列数据增强工具类""" def __init__(self, random_state=42): self.random_state = random_state np.random.seed(random_state) def add_gaussian_noise(self, data, noise_level=0.01): """ 添加高斯噪声 参数: data: 输入数据 noise_level: 噪声标准差相对于数据标准差的比例 返回: 添加噪声后的数据 """ noise = np.random.normal(0, noise_level * np.std(data), data.shape) return data + noise def scale_amplitude(self, data, scale_range=(0.9, 1.1)): """ 幅度缩放 参数: data: 输入数据 scale_range: 缩放因子范围 返回: 缩放后的数据 """ scale_factor = np.random.uniform(scale_range[0], scale_range[1]) return data * scale_factor def time_warp(self, data, sigma=0.2): """ 时间扭曲 参数: data: 输入数据 sigma: 时间扭曲强度 返回: 扭曲后的数据 """ from scipy.interpolate import CubicSpline time_steps = np.arange(len(data)) warp = np.cumsum(np.random.normal(1.0, sigma, len(data))) warp = warp - warp[0] warp = warp * (len(data) - 1) / warp[-1] cs = CubicSpline(warp, data) return cs(time_steps) def window_slice(self, data, window_size): """ 窗口切片 参数: data: 输入数据 window_size: 切片窗口大小 返回: 切片后的数据 """ start = np.random.randint(0, len(data) - window_size + 1) return data[start:start + window_size] def magnitude_warp(self, data, sigma=0.2): """ 幅度扭曲 参数: data: 输入数据 sigma: 扭曲强度 返回: 扭曲后的数据 """ from scipy.interpolate import CubicSpline time_steps = np.arange(len(data)) warp_points = np.random.choice(len(data), size=5, replace=False) warp_values = np.random.normal(1.0, sigma, size=5) warp_func = CubicSpline(warp_points, warp_values) warp_factors = warp_func(time_steps) return data * warp_factors def augment_dataset(self, X, y, n_augmentations=2, noise_level=0.01): """ 对数据集进行增强 参数: X: 输入序列 y: 目标值 n_augmentations: 每个样本增强的数量 noise_level: 噪声水平 返回: 增强后的数据集 """ X_aug = [X] y_aug = [y] for _ in range(n_augmentations): X_noisy = np.array([self.add_gaussian_noise(x, noise_level) for x in X]) X_scaled = np.array([self.scale_amplitude(x) for x in X]) X_aug.append(X_noisy) X_aug.append(X_scaled) y_aug.append(y) y_aug.append(y) return np.concatenate(X_aug, axis=0), np.concatenate(y_aug, axis=0)
|
5. PyTorch DataLoader实现
5.1 自定义Dataset类
PyTorch的Dataset和DataLoader是构建高效深度学习流水线的核心组件。Dataset负责定义数据如何读取和预处理,DataLoader则负责批量加载数据、多进程加速、数据打乱等操作。
自定义Dataset类需要继承torch.utils.data.Dataset并实现两个核心方法:__len__返回数据集的大小,__getitem__根据索引返回单个样本。对于时间序列数据,__getitem__通常返回形状为(seq_len, n_features)的输入张量和对应的时间步目标值。
在__init__方法中,通常会进行数据加载、归一化参数计算、滑动窗口创建等操作。需要注意的是,归一化参数必须从训练集计算得到,然后在验证集和测试集上应用相同的参数,以避免数据泄露。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157
| import torch from torch.utils.data import Dataset, DataLoader import numpy as np import pandas as pd from sklearn.preprocessing import MinMaxScaler
class TimeSeriesDataset(Dataset): """时间序列数据集类""" def __init__(self, data, window_size, pred_steps=1, feature_cols=None, target_col=None, scaler=None): """ 初始化时间序列数据集 参数: data: pandas DataFrame或numpy array window_size: 滑动窗口大小(输入序列长度) pred_steps: 预测步数(输出序列长度) feature_cols: 特征列名列表(DataFrame时使用) target_col: 目标列名(DataFrame时使用) scaler: 预训练的归一化器,None表示在初始化时拟合 """ if isinstance(data, pd.DataFrame): if feature_cols is None: feature_cols = [col for col in data.columns if col != target_col] self.data = data[feature_cols].values self.target = data[target_col].values else: self.data = data self.target = data self.window_size = window_size self.pred_steps = pred_steps self.feature_cols = feature_cols self.target_col = target_col if scaler is None: self.scaler = MinMaxScaler() self.data = self.scaler.fit_transform(self.data) if len(self.target.shape) == 1: self.target_scaler = MinMaxScaler() else: self.target_scaler = MinMaxScaler() self.target = self.target_scaler.fit_transform(self.target) else: self.scaler = scaler self.data = self.scaler.transform(self.data) if hasattr(scaler, 'target_scaler'): self.target = scaler.target_scaler.transform(self.target) self.X, self.y = self._create_sequences() def _create_sequences(self): """创建滑动窗口序列""" X, y = [], [] for i in range(len(self.data) - self.window_size - self.pred_steps + 1): X.append(self.data[i:i + self.window_size]) y.append(self.target[i + self.window_size:i + self.window_size + self.pred_steps]) return np.array(X), np.array(y) def __len__(self): """返回数据集大小""" return len(self.X) def __getitem__(self, idx): """获取单个样本""" X = torch.FloatTensor(self.X[idx]) y = torch.FloatTensor(self.y[idx]) if self.pred_steps == 1: y = y.squeeze(-1) return X, y def get_scaler(self): """获取归一化器""" return self.scaler def inverse_transform_target(self, y_pred): """将预测值反归一化到原始尺度""" if self.pred_steps > 1: y_pred = self.target_scaler.inverse_transform(y_pred) else: y_pred = self.target_scaler.inverse_transform(y_pred.reshape(-1, 1)) return y_pred
class MultiStepTimeSeriesDataset(Dataset): """多步预测时间序列数据集""" def __init__(self, data, window_size, pred_steps_list=[1, 3, 7], feature_cols=None, target_col=None, scaler=None): """ 初始化多步预测数据集 参数: data: 数据 window_size: 滑动窗口大小 pred_steps_list: 预测步数列表,如[1, 3, 7]表示同时预测1步、3步、7步 feature_cols: 特征列名 target_col: 目标列名 scaler: 归一化器 """ if isinstance(data, pd.DataFrame): if feature_cols is None: feature_cols = [col for col in data.columns if col != target_col] self.data = data[feature_cols].values self.target = data[target_col].values else: self.data = data self.target = data self.window_size = window_size self.pred_steps_list = pred_steps_list self.max_pred_steps = max(pred_steps_list) if scaler is None: self.scaler = MinMaxScaler() self.data = self.scaler.fit_transform(self.data) self.target_scaler = MinMaxScaler() self.target = self.target_scaler.fit_transform(self.target.reshape(-1, 1)).flatten() else: self.scaler = scaler self.data = self.scaler.transform(self.data) self.X, self.y_dict = self._create_sequences() def _create_sequences(self): """创建多步预测序列""" X, y_dict = [], {step: [] for step in self.pred_steps_list} for i in range(len(self.data) - self.window_size - self.max_pred_steps + 1): X.append(self.data[i:i + self.window_size]) for step in self.pred_steps_list: y_dict[step].append( self.target[i + self.window_size:i + self.window_size + step] ) for step in self.pred_steps_list: y_dict[step] = np.array(y_dict[step]) return np.array(X), y_dict def __len__(self): return len(self.X) def __getitem__(self, idx): X = torch.FloatTensor(self.X[idx]) y = {step: torch.FloatTensor(self.y_dict[step][idx]) for step in self.pred_steps_list} return X, y
|
5.2 DataLoader配置与批处理
DataLoader是PyTorch中用于批量加载数据的核心组件。正确配置DataLoader参数对于训练效率和数据加载质量至关重要。
batch_size决定了每次迭代使用的样本数量。较大的batch_size能够利用GPU并行计算能力,提高训练速度,但会增加显存占用;较小的batch_size能够提供更多的梯度更新次数,但可能影响收敛稳定性。
shuffle参数控制是否在每个epoch开始时打乱数据。对于训练集,打乱数据能够促进模型泛化;对于验证集和测试集,通常不需要打乱。
num_workers指定用于数据加载的子进程数量。设置为大于0可以利用多核CPU并行加载数据,减少数据等待时间。但需要注意,Windows系统上多进程可能存在问题。
drop_last参数决定当样本数不能被batch_size整除时,是否丢弃最后一个不完整的batch。在训练时通常设置为True,以保证所有batch的样本数量一致。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112
| def create_dataloaders(train_data, val_data, test_data, window_size, pred_steps=1, batch_size=32, num_workers=0): """ 创建训练、验证和测试DataLoader 参数: train_data: 训练数据 val_data: 验证数据 test_data: 测试数据 window_size: 滑动窗口大小 pred_steps: 预测步数 batch_size: 批量大小 num_workers: 数据加载线程数 返回: train_loader, val_loader, test_loader """ train_dataset = TimeSeriesDataset( train_data, window_size=window_size, pred_steps=pred_steps ) val_dataset = TimeSeriesDataset( val_data, window_size=window_size, pred_steps=pred_steps, scaler=train_dataset.get_scaler() ) test_dataset = TimeSeriesDataset( test_data, window_size=window_size, pred_steps=pred_steps, scaler=train_dataset.get_scaler() ) train_loader = DataLoader( train_dataset, batch_size=batch_size, shuffle=True, num_workers=num_workers, drop_last=True ) val_loader = DataLoader( val_dataset, batch_size=batch_size, shuffle=False, num_workers=num_workers ) test_loader = DataLoader( test_dataset, batch_size=batch_size, shuffle=False, num_workers=num_workers ) return train_loader, val_loader, test_loader
def get_data_loader_info(loader): """获取DataLoader信息""" info = { 'num_batches': len(loader), 'batch_size': loader.batch_size, 'total_samples': len(loader.dataset), 'num_workers': loader.num_workers, 'shuffle': loader.shuffle } return info
class DataLoaderWrapper: """DataLoader包装器,提供额外的功能""" def __init__(self, dataloader): self.dataloader = dataloader self.iterator = None def __iter__(self): """返回迭代器""" self.iterator = iter(self.dataloader) return self def __next__(self): """获取下一个batch""" return next(self.iterator) def __len__(self): """返回batch数量""" return len(self.dataloader) def get_single_batch(self): """获取单个batch(用于调试)""" return next(iter(self.dataloader)) def preview_batch(self, n=3): """预览前n个batch""" preview = [] for i, (X, y) in enumerate(self.dataloader): if i >= n: break preview.append({ 'batch_idx': i, 'X_shape': X.shape, 'y_shape': y.shape, 'X_mean': X.mean().item(), 'y_mean': y.mean().item() if y.numel() > 0 else None }) return preview
|
5.3 多变量时间序列处理
多变量时间序列预测是指同时使用多个输入特征来预测一个或多个目标变量。与单变量预测相比,多变量方法能够捕获变量之间的相关性,提供更丰富的上下文信息。
在处理多变量时间序列时,需要注意特征选择和特征工程。过多的特征可能导致维度灾难和过拟合,而重要的变量被遗漏则会导致模型性能下降。常用的方法包括相关性分析、递归特征消除(RFE)以及基于模型重要性的特征选择。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141
| class MultiVariateTimeSeriesDataset(Dataset): """多变量时间序列数据集""" def __init__(self, data, window_size, pred_steps=1, feature_cols=None, target_cols=None, scaler=None): """ 初始化多变量时间序列数据集 参数: data: pandas DataFrame window_size: 滑动窗口大小 pred_steps: 预测步数 feature_cols: 输入特征列名列表 target_cols: 目标变量列名列表 scaler: 预训练的归一化器 """ self.data = data.copy() self.window_size = window_size self.pred_steps = pred_steps if feature_cols is None: if target_cols is None: raise ValueError("Either feature_cols or target_cols must be specified") feature_cols = [col for col in data.columns if col not in target_cols] if target_cols is None: target_cols = feature_cols self.feature_cols = feature_cols self.target_cols = target_cols self.features = self.data[feature_cols].values self.targets = self.data[target_cols].values if scaler is None: self.feature_scaler = MinMaxScaler() self.features = self.feature_scaler.fit_transform(self.features) self.target_scaler = MinMaxScaler() self.targets = self.target_scaler.fit_transform(self.targets) else: self.feature_scaler = scaler['feature_scaler'] self.target_scaler = scaler['target_scaler'] self.features = self.feature_scaler.transform(self.features) self.targets = self.target_scaler.transform(self.targets) self.X, self.y = self._create_sequences() def _create_sequences(self): """创建滑动窗口序列""" X, y = [], [] for i in range(len(self.features) - self.window_size - self.pred_steps + 1): X.append(self.features[i:i + self.window_size]) y.append(self.targets[i + self.window_size:i + self.window_size + self.pred_steps]) return np.array(X), np.array(y) def __len__(self): return len(self.X) def __getitem__(self, idx): X = torch.FloatTensor(self.X[idx]) y = torch.FloatTensor(self.y[idx]) return X, y def get_scalers(self): return { 'feature_scaler': self.feature_scaler, 'target_scaler': self.target_scaler } def inverse_transform_target(self, y_pred): """反归一化目标变量""" return self.target_scaler.inverse_transform(y_pred)
class VariableLengthTimeSeriesDataset(Dataset): """变长序列数据集(用于处理不同长度的序列)""" def __init__(self, sequences, targets=None): """ 初始化变长序列数据集 参数: sequences: 序列列表,每个元素形状为 (seq_len, n_features) targets: 目标值列表 """ self.sequences = sequences self.targets = targets if targets is not None else [None] * len(sequences) self.max_len = max(len(seq) for seq in sequences) def __len__(self): return len(self.sequences) def __getitem__(self, idx): seq = self.sequences[idx] target = self.targets[idx] length = len(seq) if length < self.max_len: padding = torch.zeros(self.max_len - length, seq.shape[1]) seq = torch.cat([torch.FloatTensor(seq), padding], dim=0) else: seq = torch.FloatTensor(seq) if target is not None: target = torch.FloatTensor(target) return seq, target, length else: return seq, length
def collate_variable_length(batch): """处理变长序列的collate函数""" if len(batch[0]) == 3: sequences, targets, lengths = zip(*batch) lengths = torch.LongTensor(lengths) targets = torch.stack(targets) else: sequences, lengths = zip(*batch) lengths = torch.LongTensor(lengths) targets = None lengths, sorted_idx = lengths.sort(descending=True) sequences = torch.stack(sequences)[sorted_idx] if targets is not None: targets = targets[sorted_idx] return sequences, targets, lengths
|
6. 完整数据准备流程示例
6.1 端到端数据处理管道
以下是一个完整的端到端数据准备流程示例,涵盖从原始数据读取到最终DataLoader创建的全过程。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287
| import pandas as pd import numpy as np import torch from torch.utils.data import Dataset, DataLoader from sklearn.preprocessing import MinMaxScaler
class TimeSeriesDataPipeline: """时间序列数据处理完整管道""" def __init__(self, config=None): """ 初始化数据管道 参数: config: 配置字典,包含所有处理参数 """ self.config = config or self._default_config() self.data = None self.scaler = None self.train_dataset = None self.val_dataset = None self.test_dataset = None def _default_config(self): """默认配置""" return { 'window_size': 24, 'pred_steps': 1, 'batch_size': 32, 'test_ratio': 0.2, 'val_ratio': 0.1, 'target_col': 'value', 'feature_cols': None, 'normalize': True, 'add_time_features': True } def load_data(self, filepath, date_col=None): """加载数据""" if filepath.endswith('.csv'): self.data = pd.read_csv(filepath) elif filepath.endswith('.parquet'): self.data = pd.read_parquet(filepath) elif filepath.endswith('.pkl'): self.data = pd.read_pickle(filepath) else: raise ValueError(f"Unsupported file format: {filepath}") if date_col and date_col in self.data.columns: self.data[date_col] = pd.to_datetime(self.data[date_col]) self.data.set_index(date_col, inplace=True) return self def clean_data(self, fill_method='ffill', dropna_threshold=0.5): """ 数据清洗 参数: fill_method: 缺失值填充方法 dropna_threshold: 超过该比例则删除列 """ missing_ratio = self.data.isnull().sum() / len(self.data) cols_to_drop = missing_ratio[missing_ratio > dropna_threshold].index.tolist() if cols_to_drop: self.data = self.data.drop(columns=cols_to_drop) print(f"Dropped columns with too many missing values: {cols_to_drop}") if fill_method == 'ffill': self.data = self.data.ffill().bfill() elif fill_method == 'interpolate': self.data = self.data.interpolate(method='linear') return self def add_time_features(self): """添加时间特征""" if isinstance(self.data.index, pd.DatetimeIndex): self.data['hour'] = self.data.index.hour self.data['day'] = self.data.index.day self.data['month'] = self.data.index.month self.data['dayofweek'] = self.data.index.dayofweek self.data['quarter'] = self.data.index.quarter return self def add_lag_features(self, columns, lags=[1, 3, 6, 12, 24]): """添加滞后特征""" for col in columns: for lag in lags: self.data[f'{col}_lag_{lag}'] = self.data[col].shift(lag) return self def add_rolling_features(self, columns, windows=[6, 12, 24]): """添加滚动统计特征""" for col in columns: for window in windows: self.data[f'{col}_roll_mean_{window}'] = \ self.data[col].rolling(window=window).mean() self.data[f'{col}_roll_std_{window}'] = \ self.data[col].rolling(window=window).std() return self def split_data(self, test_ratio=None, val_ratio=None): """ 划分训练集、验证集、测试集 参数: test_ratio: 测试集比例 val_ratio: 验证集比例 返回: train_data, val_data, test_data """ test_ratio = test_ratio or self.config['test_ratio'] val_ratio = val_ratio or self.config['val_ratio'] n = len(self.data) test_size = int(n * test_ratio) val_size = int(n * val_ratio) test_data = self.data.iloc[-test_size:] val_data = self.data.iloc[-(test_size + val_size):-test_size] train_data = self.data.iloc[:-(test_size + val_size)] return train_data, val_data, test_data def create_datasets(self, train_data, val_data, test_data): """创建数据集""" self.train_dataset = TimeSeriesDataset( train_data, window_size=self.config['window_size'], pred_steps=self.config['pred_steps'], target_col=self.config['target_col'], feature_cols=self.config.get('feature_cols') ) scaler = self.train_dataset.get_scaler() self.val_dataset = TimeSeriesDataset( val_data, window_size=self.config['window_size'], pred_steps=self.config['pred_steps'], target_col=self.config['target_col'], feature_cols=self.config.get('feature_cols'), scaler=scaler ) self.test_dataset = TimeSeriesDataset( test_data, window_size=self.config['window_size'], pred_steps=self.config['pred_steps'], target_col=self.config['target_col'], feature_cols=self.config.get('feature_cols'), scaler=scaler ) return self.train_dataset, self.val_dataset, self.test_dataset def create_dataloaders(self, batch_size=None): """创建DataLoader""" batch_size = batch_size or self.config['batch_size'] train_loader = DataLoader( self.train_dataset, batch_size=batch_size, shuffle=True, num_workers=0, drop_last=True ) val_loader = DataLoader( self.val_dataset, batch_size=batch_size, shuffle=False, num_workers=0 ) test_loader = DataLoader( self.test_dataset, batch_size=batch_size, shuffle=False, num_workers=0 ) return train_loader, val_loader, test_loader def run(self, filepath, date_col=None): """ 运行完整的数据处理流程 参数: filepath: 数据文件路径 date_col: 日期列名 返回: train_loader, val_loader, test_loader """ print("Step 1: Loading data...") self.load_data(filepath, date_col) print(f" Loaded {len(self.data)} samples") print("Step 2: Cleaning data...") self.clean_data() print(f" Shape after cleaning: {self.data.shape}") print("Step 3: Adding time features...") if self.config.get('add_time_features'): self.add_time_features() print("Step 4: Adding lag features...") target_col = self.config['target_col'] self.add_lag_features([target_col]) print("Step 5: Adding rolling features...") self.add_rolling_features([target_col]) print("Step 6: Splitting data...") train_data, val_data, test_data = self.split_data() print(f" Train: {len(train_data)}, Val: {len(val_data)}, Test: {len(test_data)}") print("Step 7: Creating datasets...") self.create_datasets(train_data, val_data, test_data) print("Step 8: Creating dataloaders...") loaders = self.create_dataloaders() print("Data pipeline completed successfully!") return loaders
def demonstrate_pipeline(): """演示完整数据处理流程""" np.random.seed(42) n_samples = 1000 dates = pd.date_range('2024-01-01', periods=n_samples, freq='H') trend = np.linspace(0, 50, n_samples) seasonal = 10 * np.sin(2 * np.pi * np.arange(n_samples) / 168) noise = np.random.normal(0, 1, n_samples) data = pd.DataFrame({ 'date': dates, 'value': trend + seasonal + noise, 'volume': np.abs(np.random.normal(1000, 200, n_samples)) }) sample_file = '/tmp/sample_timeseries.csv' data.to_csv(sample_file, index=False) config = { 'window_size': 24, 'pred_steps': 1, 'batch_size': 32, 'test_ratio': 0.2, 'val_ratio': 0.1, 'target_col': 'value', 'add_time_features': True } pipeline = TimeSeriesDataPipeline(config) train_loader, val_loader, test_loader = pipeline.run(sample_file, date_col='date') print("\n=== DataLoader Verification ===") X, y = next(iter(train_loader)) print(f"Input shape: {X.shape}") print(f"Target shape: {y.shape}") return train_loader, val_loader, test_loader
if __name__ == "__main__": train_loader, val_loader, test_loader = demonstrate_pipeline()
|
7. 最佳实践与注意事项
7.1 数据泄露防范
数据泄露是时间序列预测中最常见也最严重的问题之一。数据泄露指的是模型在训练过程中使用了不应该获取的”未来”信息,导致评估指标过于乐观,实际应用效果差。
归一化参数必须从训练集计算:在划分数据集之前,应该先计算归一化参数(如均值、标准差、最大最小值等),然后将这些参数应用到所有数据集。如果在全部数据上计算归一化参数,测试集的信息会通过归一化过程泄露到训练过程中。
滚动特征的计算窗口必须正确:在创建滚动均值、滚动标准差等特征时,必须确保只使用当前时刻之前的数据。例如,计算t时刻的24小时滚动均值时,应该使用t-23到t时刻的数据,而不是t+1时刻的数据。
时间序列交叉验证:传统的K-Fold交叉验证不适用于时间序列数据,因为打乱顺序会破坏时序依赖关系。应该使用时间序列专用的交叉验证方法,如TimeSeriesSplit或滑动窗口验证。
7.2 特征选择建议
过多的特征可能导致维度灾难和过拟合,而关键特征的遗漏则会影响模型性能。以下是一些特征选择的建议:
基于相关性的初步筛选:计算每个特征与目标变量的相关性,去除相关性极低的特征。但需要注意,相关性低不代表特征无用,某些特征可能在组合使用时才有价值。
基于模型重要性的选择:训练一个简单的基线模型(如随机森林),根据特征重要性进行筛选。这种方法能够捕获非线性关系和特征间的交互作用。
递归特征消除(RFE):通过递归地移除最不重要的特征来选择最优特征子集。这种方法计算量大,但通常能得到较好的特征组合。
领域知识的重要性:在时间序列预测中,领域知识往往比纯粹的统计方法更重要。例如,在股价预测中,技术分析师的经验和直觉可能比自动化特征选择更可靠。
7.3 超参数选择指南
数据准备过程中涉及多个超参数,需要根据具体问题和数据特点进行选择:
**窗口大小(Window Size)**的选择取决于数据的周期性和预测任务的性质。对于日数据,如果数据存在周周期性,窗口大小应该设置为7的倍数;如果存在年周期性,则需要更长的窗口。通常,窗口大小应该涵盖主要的时间模式,但不宜过长以避免信息稀释。
**预测步数(Prediction Steps)**根据业务需求确定。单步预测适用于即时的决策场景,多步预测适用于中长期规划。多步预测通常比单步预测更难,因为误差会随预测步数累积。
滚动窗口大小的选择需要权衡统计可靠性和对变化的敏感性。较大的窗口(如24、48、168)提供更稳定的统计估计,但可能掩盖短期波动;较小的窗口(如3、6、12)能更快地捕捉变化,但可能对噪声过于敏感。
结语
本文系统地介绍了LSTM数据集准备的完整流程,涵盖数据清洗、特征工程、序列构建和PyTorch DataLoader实现等核心环节。通过合理的数据预处理和特征工程,可以显著提升LSTM模型的预测性能。
在实际应用中,数据准备往往是最耗时也是最关键的环节。本文提供的工具类和代码模板可以作为基础,根据具体业务场景进行调整和扩展。值得注意的是,数据质量比模型复杂度更重要,再精妙的模型也无法弥补数据本身的缺陷。
希望本文能够帮助读者建立完整的时间序列数据处理知识体系,为后续的模型训练和优化奠定坚实基础。在实际项目中,建议读者从简单的基线模型开始,逐步迭代优化数据处理流程和模型架构,以获得最佳的效果。
本文涉及的完整代码已通过测试,可以直接在Python环境中运行。建议读者结合实际数据集进行练习,以加深对各个模块的理解。