I tried Windows 11 with TensorFlow-GPU 2.7.0 + CUDA 11.5 + cuDNN 8.3 + Python 3.9.6,
and also TensorFlow 2.10.0 + CUDA 11.2 + cuDNN 8.6 + Python 3.10, but I still get the same problem.
Training stops at the first epoch and the output shows: Epoch 1/100
2024-07-22 15:23:41.993258: F tensorflow/compiler/xla/service/gpu/nvptx_compiler.cc:480] ptxas returned an error during compilation of ptx to sass: 'INTERNAL: ptxas exited with non-zero error code -1, output: ' If the error message indicates that a file could not be written, please verify that sufficient filesystem space is provided.
My original code is:
import os
import time
import warnings
from concurrent.futures import ThreadPoolExecutor

import h5py
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import ta
import tensorflow as tf
import yfinance as yf
from joblib import Parallel, delayed
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
from sklearn.preprocessing import MinMaxScaler
from statsmodels.tsa.arima.model import ARIMA
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.layers import (
    LSTM,
    Dense,
    Dropout,
    GlobalAveragePooling1D,
    Input,
    LayerNormalization,
    MultiHeadAttention,
)
from tensorflow.keras.models import Model, model_from_json
from tensorflow.keras.optimizers import Adam
# Silence Python-level warnings and select the matplotlib theme for all figures.
warnings.filterwarnings("ignore")
plt.style.use('fivethirtyeight')
# XLA flags must start with two ASCII hyphens; the pasted en-dash made the flag
# name invalid.
os.environ['TF_XLA_FLAGS'] = '--tf_xla_cpu_global_jit'
# Level 2 hides TensorFlow's INFO and WARNING log lines.
# NOTE(review): these variables are set after `import tensorflow`; presumably
# they should be set before the import to be sure they take effect -- confirm.
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
def set_seed(seed=42):
    """Seed NumPy's and TensorFlow's global random generators with ``seed``."""
    for seed_fn in (np.random.seed, tf.random.set_seed):
        seed_fn(seed)
# Seed NumPy and TensorFlow once at import time (default seed 42).
set_seed()
# Limit TensorFlow's intra-op and inter-op thread pools to 8 threads each.
tf.config.threading.set_intra_op_parallelism_threads(8)
tf.config.threading.set_inter_op_parallelism_threads(8)
# Enable XLA JIT compilation for TensorFlow graphs.
# NOTE(review): the fatal error quoted at the top of this post is raised by
# XLA's GPU compiler (nvptx_compiler.cc, ptxas). Presumably it goes away when
# this is False or when the CUDA toolkit's ptxas is reachable -- confirm on
# the target machine.
tf.config.optimizer.set_jit(True)
def get_stock_data(ticker, start_date, end_date=None):
    """Download price history for ``ticker`` via ``yf.download``.

    ``start_date`` and ``end_date`` are forwarded as the ``start`` and ``end``
    keyword arguments; the downloaded object is returned unchanged.
    """
    return yf.download(ticker, start=start_date, end=end_date)
def engineer_features(data):
    """Return a new frame with OHLCV columns plus technical indicators.

    Adds SMA_20, EMA_20, RSI, MACD, Bollinger bands (high/mid/low), ATR, ADX
    and OBV computed from the 'Close', 'High', 'Low' and 'Volume' columns,
    keeps only the feature columns, and fills gaps forward then backward.

    The input frame is copied first, so the caller's object (often a slice of
    a larger frame) is never modified.
    """
    data = data.copy()
    close = data['Close']
    high = data['High']
    low = data['Low']
    volume = data['Volume']

    data['SMA_20'] = close.rolling(window=20).mean()
    data['EMA_20'] = close.ewm(span=20, adjust=False).mean()
    data['RSI'] = ta.momentum.rsi(close, window=14)
    data['MACD'] = ta.trend.macd_diff(close)
    data['BB_High'] = ta.volatility.bollinger_hband(close)
    data['BB_Mid'] = ta.volatility.bollinger_mavg(close)
    data['BB_Low'] = ta.volatility.bollinger_lband(close)
    data['ATR'] = ta.volatility.average_true_range(high, low, close)
    data['ADX'] = ta.trend.adx(high, low, close)
    data['OBV'] = ta.volume.on_balance_volume(close, volume)

    features = ['Open', 'High', 'Low', 'Close', 'Volume', 'SMA_20', 'EMA_20', 'RSI', 'MACD',
                'BB_High', 'BB_Mid', 'BB_Low', 'ATR', 'ADX', 'OBV']
    # ffill()/bfill() replace the deprecated fillna(method=...) spelling.
    return data[features].ffill().bfill()
def handle_outliers(data, n_sigmas=3):
    """Clip every column of ``data`` to mean +/- ``n_sigmas`` standard deviations.

    The frame is modified in place and the same object is returned.
    """
    for name in data.columns:
        series = data[name]
        mean = series.mean()
        std = series.std()
        floor = mean - n_sigmas * std
        ceiling = mean + n_sigmas * std
        data[name] = series.clip(lower=floor, upper=ceiling)
    return data
def preprocess_data(data, look_back=60):
    """Scale ``data`` to [0, 1] and cut it into sliding windows.

    Returns ``(X, y, scaler, close_index)``: ``X`` stacks the ``look_back``
    scaled rows preceding each target row, ``y`` holds the scaled 'Close'
    value of that target row, ``scaler`` is the fitted ``MinMaxScaler`` and
    ``close_index`` is the column position of 'Close'.
    """
    scaler = MinMaxScaler(feature_range=(0, 1))
    scaled = scaler.fit_transform(data)
    close_index = data.columns.get_loc('Close')

    target_rows = range(look_back, len(scaled))
    windows = [scaled[row - look_back:row] for row in target_rows]
    targets = [scaled[row, close_index] for row in target_rows]
    return np.array(windows), np.array(targets), scaler, close_index
def transformer_encoder(inputs, head_size, num_heads, ff_dim, dropout=0):
    """Apply one self-attention encoder block to ``inputs``.

    Multi-head self-attention and dropout are added back to ``inputs`` and
    layer-normalised; the result goes through two Dense layers (``ff_dim``
    units with ReLU, then back to the input's last dimension), dropout,
    a second residual add and a final layer normalisation.
    """
    x = MultiHeadAttention(key_dim=head_size, num_heads=num_heads, dropout=dropout)(inputs, inputs)
    x = Dropout(dropout)(x)
    x = LayerNormalization(epsilon=1e-6)(x + inputs)

    ff = Dense(ff_dim, activation="relu")(x)
    # Project back to the input width so the residual add is shape-compatible.
    ff = Dense(inputs.shape[-1])(ff)
    ff = Dropout(dropout)(ff)
    return LayerNormalization(epsilon=1e-6)(x + ff)
# Build the LSTM-Transformer model
def create_lstm_transformer_model(input_shape, head_size=256, num_heads=4, ff_dim=4, num_transformer_blocks=3,
                                  mlp_units=(128, 64), dropout=0.1, mlp_dropout=0.2):
    """Build and compile the LSTM + transformer-encoder regression model.

    Two stacked 64-unit LSTM layers (each followed by dropout) feed
    ``num_transformer_blocks`` encoder blocks; the sequence is then averaged
    over time, passed through one Dense(ReLU)+Dropout pair per entry of
    ``mlp_units`` and reduced to a single output unit. The model is compiled
    with Adam (learning rate 1e-4) and MSE loss.

    ``mlp_units`` defaults to a tuple rather than a list so the default is not
    a shared mutable object; any iterable of ints is accepted.
    """
    inputs = Input(shape=input_shape)
    x = LSTM(64, return_sequences=True)(inputs)
    x = Dropout(dropout)(x)
    x = LSTM(64, return_sequences=True)(x)
    x = Dropout(dropout)(x)

    for _ in range(num_transformer_blocks):
        x = transformer_encoder(x, head_size, num_heads, ff_dim, dropout)

    x = GlobalAveragePooling1D()(x)
    for dim in mlp_units:
        x = Dense(dim, activation="relu")(x)
        x = Dropout(mlp_dropout)(x)
    outputs = Dense(1)(x)

    model = Model(inputs, outputs)
    model.compile(optimizer=Adam(learning_rate=1e-4), loss='mse')
    return model
def arima_forecast(data, order=(1, 1, 1)):
    """Return a one-step-ahead ARIMA forecast for the series ``data``.

    If anything in the fit/forecast path raises, the error is printed and the
    last observation of ``data`` is returned instead.
    """
    try:
        data = data.reset_index(drop=True)
        fitted = ARIMA(data, order=order).fit()
        next_step = fitted.forecast(steps=1)
        prediction = next_step.values[0]
    except Exception as e:
        print(f"ARIMA预测失败: {e}")
        prediction = data.iloc[-1]
    return prediction
def rf_forecast(X, y):
    """Fit a 50-tree random forest on ``(X, y)`` and predict for the last row of ``X``.

    NOTE(review): the prediction input ``X[-1]`` is itself a training row, so
    the result is an in-sample fit rather than an out-of-sample forecast --
    confirm this is intended.
    """
    forest = RandomForestRegressor(n_estimators=50, random_state=42)
    forest.fit(X, y)
    last_row = X[-1].reshape(1, -1)
    predicted = forest.predict(last_row)
    return predicted[0]
def ensemble_predict(lstm_transformer_pred, arima_pred, rf_pred):
    """Blend the three forecasts with fixed weights 0.6 / 0.2 / 0.2."""
    lstm_part = 0.6 * lstm_transformer_pred
    arima_part = 0.2 * arima_pred
    rf_part = 0.2 * rf_pred
    return lstm_part + arima_part + rf_part
def save_weights_h5(model, filepath):
    """Write every layer's weights of ``model`` to an HDF5 file at ``filepath``.

    One group is created per layer (named after the layer); inside it each
    weight array is stored as a dataset named by its position ("0", "1", ...).
    An existing file is overwritten.
    """
    with h5py.File(filepath, 'w') as f:
        for layer in model.layers:
            group = f.create_group(layer.name)
            for i, weight in enumerate(layer.get_weights()):
                group.create_dataset(str(i), data=weight)
def load_weights_h5(model, filepath):
    """Load per-layer weights from the HDF5 file at ``filepath`` into ``model``.

    Expects one group per layer name, holding datasets named "0", "1", ... in
    weight order. Raises ``KeyError`` if a layer of ``model`` has no group in
    the file.
    """
    with h5py.File(filepath, 'r') as f:
        for layer in model.layers:
            group = f[layer.name]
            # [()] reads the whole dataset and, unlike [:], also works for
            # scalar (0-d) datasets.
            weights = [group[str(i)][()] for i in range(len(group))]
            layer.set_weights(weights)
def trading_strategy(actual_prices, predicted_prices, positions, cash, shares,
                     buy_threshold=0.005, sell_threshold=0.005, stop_loss=0.05, take_profit=0.1, cooldown_period=5):
    """Walk the price series and simulate a long-only threshold strategy.

    ``positions`` is extended in place with one state per step from index 1:
    0 = flat, 1 = holding, -1 = just sold / cooling down.

    * Flat: buy as many whole shares as ``cash`` allows when the prediction
      exceeds the current price by more than ``buy_threshold``.
    * Holding: sell everything when the prediction is below the current price
      by more than ``sell_threshold``, or when the price has moved
      ``stop_loss`` below / ``take_profit`` above the entry price of the
      *current* trade.
    * Cooling down: return to flat once more than ``cooldown_period`` steps
      have passed since the *most recent* sale.

    Returns ``(positions, cash, shares)``.
    """

    def _run_start(state):
        """Return the index in ``positions`` where the trailing run of ``state`` began."""
        idx = len(positions) - 1
        while idx > 0 and positions[idx - 1] == state:
            idx -= 1
        return idx

    # Recover trade state when the caller passes a pre-populated history.
    # positions.index(...) would always find the FIRST buy/sell ever made,
    # which is wrong from the second trade onwards.
    entry_price = actual_prices[_run_start(1)] if positions[-1] == 1 else None
    exit_index = _run_start(-1) if positions[-1] == -1 else None

    for i in range(1, len(actual_prices)):
        current_price = actual_prices[i]
        predicted_price = predicted_prices[i]
        state = positions[-1]

        if state == 0:  # currently flat
            if predicted_price > current_price * (1 + buy_threshold):
                shares_to_buy = cash // current_price
                cash -= shares_to_buy * current_price
                shares += shares_to_buy
                entry_price = current_price
                positions.append(1)
            else:
                positions.append(0)
        elif state == 1:  # currently holding
            should_sell = (
                predicted_price < current_price * (1 - sell_threshold)
                or current_price <= entry_price * (1 - stop_loss)
                or current_price >= entry_price * (1 + take_profit)
            )
            if should_sell:
                cash += shares * current_price
                shares = 0
                exit_index = len(positions)  # index the -1 below will occupy
                positions.append(-1)
            else:
                positions.append(1)
        else:  # cooldown after a sale
            if len(positions) - exit_index > cooldown_period:
                positions.append(0)
            else:
                positions.append(-1)
    return positions, cash, shares
def backtest(actual_prices, predicted_prices, initial_cash=10000):
    """Run ``trading_strategy`` and return ``(portfolio_values, positions)``.

    ``portfolio_values[0]`` is ``initial_cash``; each later entry is the cash
    plus the market value of the shares held at that step.

    The strategy only returns its final cash and share count, so the holdings
    are replayed from the position states (0 -> 1 is a buy of as many whole
    shares as cash allows, 1 -> -1 sells everything). Valuing every step with
    the final cash/shares would misstate the whole equity curve.
    """
    positions, _, _ = trading_strategy(actual_prices, predicted_prices, [0], initial_cash, 0)

    cash = initial_cash
    shares = 0
    portfolio_values = [cash]
    for i in range(1, len(actual_prices)):
        price = actual_prices[i]
        previous_state, state = positions[i - 1], positions[i]
        if previous_state == 0 and state == 1:
            bought = cash // price
            cash -= bought * price
            shares += bought
        elif previous_state == 1 and state == -1:
            cash += shares * price
            shares = 0
        portfolio_values.append(cash + shares * price)
    return portfolio_values, positions
def single_step_forecast(i, featured_data, train_data, scaler, close_index, model_json, weights_path):
    """Forecast the close that follows the first ``len(train_data) + i`` rows.

    The LSTM-Transformer model is rebuilt from ``model_json`` and
    ``weights_path`` on every call, so the function can run in a separate
    worker. Returns
    ``(lstm_transformer_pred, arima_pred, rf_pred, ensemble_pred)``.

    The model input is scaled with the same fitted ``scaler`` that is later
    used for the inverse transform. Refitting a scaler on the growing history
    would feed the network values on a different scale from the one it was
    trained on.
    """
    current_data = featured_data.iloc[:len(train_data) + i]

    lstm_transformer_model = model_from_json(model_json)
    load_weights_h5(lstm_transformer_model, weights_path)

    # Use the most recent `look_back` rows so the network forecasts the step
    # after the available history, like the ARIMA forecast below.
    look_back = lstm_transformer_model.input_shape[1]
    scaled = scaler.transform(current_data)
    window = scaled[-look_back:][np.newaxis, ...]
    scaled_pred = lstm_transformer_model.predict(window, verbose=0)

    # inverse_transform needs a full feature row; only the close column matters.
    full_pred = np.zeros((1, scaled.shape[1]))
    full_pred[0, close_index] = scaled_pred[0, 0]
    lstm_transformer_pred = scaler.inverse_transform(full_pred)[0, close_index]

    arima_pred = arima_forecast(current_data['Close'])

    # NOTE(review): rf_forecast predicts from the last row it was trained on,
    # so this value is an in-sample fit of the latest known close -- confirm.
    X_rf = current_data.values
    y_rf = current_data['Close'].values
    rf_pred = rf_forecast(X_rf[:-1], y_rf[1:])

    ensemble_pred = ensemble_predict(lstm_transformer_pred, arima_pred, rf_pred)
    return lstm_transformer_pred, arima_pred, rf_pred, ensemble_pred
# ---- Configuration ----
ticker = '603283.SS'
start_date = '2017-09-15'
end_date = None
train_start_date = '2020-08-26'

# ---- Download and split ----
stock_data = get_stock_data(ticker, start_date, end_date)
test_end_date = stock_data.index[-1]
test_start_date = test_end_date - pd.DateOffset(months=3)
# Train strictly before the test window so the model never sees test prices.
train_data = stock_data[(stock_data.index >= train_start_date) & (stock_data.index < test_start_date)]
test_data = stock_data[stock_data.index >= test_start_date]
# single_step_forecast slices featured_data (same rows as stock_data) by
# len(<this argument>) + i, so it must count ALL rows before the test window.
history_before_test = stock_data[stock_data.index < test_start_date]
print("训练数据收盘价范围:", train_data['Close'].min(), train_data['Close'].max())
print("测试数据收盘价范围:", test_data['Close'].min(), test_data['Close'].max())

# ---- Feature engineering (three frames in parallel threads) ----
with ThreadPoolExecutor() as executor:
    train_features_future = executor.submit(engineer_features, train_data)
    test_features_future = executor.submit(engineer_features, test_data)
    featured_data_future = executor.submit(engineer_features, stock_data)
    train_features = handle_outliers(train_features_future.result())
    test_features = handle_outliers(test_features_future.result())
    featured_data = handle_outliers(featured_data_future.result())

# Preprocess the training data
X_train, y_train, scaler, close_index = preprocess_data(train_features)

# ---- Build and train the model ----
strategy = tf.distribute.MirroredStrategy()
with strategy.scope():
    lstm_transformer_model = create_lstm_transformer_model((X_train.shape[1], X_train.shape[2]))
early_stopping = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)
# summary() prints by itself; wrapping it in print() only adds a stray "None".
lstm_transformer_model.summary()
start_time = time.time()
history = lstm_transformer_model.fit(X_train, y_train, epochs=100, batch_size=32, validation_split=0.2,
                                     callbacks=[early_stopping], verbose=1)
training_time = time.time() - start_time

plt.figure(figsize=(10, 5))
plt.plot(history.history['loss'], label='Training Loss')
plt.plot(history.history['val_loss'], label='Validation Loss')
plt.title('Model Loss During Training')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()
plt.show()

# ---- Walk-forward forecasts over the test window ----
# The model is passed as JSON + weight file so each worker can rebuild it.
model_json = lstm_transformer_model.to_json()
weights_path = "lstm_transformer_weights.h5"
save_weights_h5(lstm_transformer_model, weights_path)
results = Parallel(n_jobs=-1)(delayed(single_step_forecast)(
    i, featured_data, history_before_test, scaler, close_index, model_json, weights_path)
    for i in range(len(test_data)))
lstm_transformer_predictions, arima_predictions, rf_predictions, ensemble_predictions = zip(*results)
lstm_transformer_predictions = list(lstm_transformer_predictions)
arima_predictions = list(arima_predictions)
rf_predictions = list(rf_predictions)
ensemble_predictions = list(ensemble_predictions)
min_length = min(len(test_data), len(ensemble_predictions))
actual_prices = test_data['Close'].values[:min_length]
ensemble_predictions = ensemble_predictions[:min_length]

# ---- Backtest ----
portfolio_values, positions = backtest(actual_prices, ensemble_predictions)
equity = np.asarray(portfolio_values, dtype=float)
total_return = (equity[-1] - equity[0]) / equity[0]
max_drawdown = np.min(equity / np.maximum.accumulate(equity)) - 1
print(f"最终投资组合价值: {portfolio_values[-1]:.2f}")
print(f"总回报率: {total_return:.2%}")
print(f"最大回撤: {max_drawdown:.2%}")

# ---- Prediction quality ----
print("预测结果分析:")
print(f"实际价格平均值: {np.mean(actual_prices)}")
print(f"预测价格平均值: {np.mean(ensemble_predictions)}")
print(f"实际价格范围: {np.min(actual_prices)} to {np.max(actual_prices)}")
print(f"预测价格范围: {np.min(ensemble_predictions)} to {np.max(ensemble_predictions)}")
mse = mean_squared_error(actual_prices, ensemble_predictions)
mae = mean_absolute_error(actual_prices, ensemble_predictions)
rmse = np.sqrt(mse)
r2 = r2_score(actual_prices, ensemble_predictions)
print(f"均方误差 (MSE): {mse:.2f}")
print(f"平均绝对误差 (MAE): {mae:.2f}")
print(f"均方根误差 (RMSE): {rmse:.2f}")
print(f"R2 分数: {r2:.2f}")

# NOTE(review): these count the number of steps spent in each state, not the
# number of buy/sell events -- confirm the labels match the intent.
print("\n交易信号统计:")
print(f"买入信号数量: {positions.count(1)}")
print(f"卖出信号数量: {positions.count(-1)}")
print(f"持有不动数量: {positions.count(0)}")

errors = np.array(actual_prices) - np.array(ensemble_predictions)
plt.figure(figsize=(10, 5))
plt.hist(errors, bins=50)
plt.title('预测误差分布')
plt.xlabel('误差')
plt.ylabel('频率')
plt.show()

plt.figure(figsize=(15, 5))
plt.plot(test_data.index[:min_length], errors)
plt.title('预测误差随时间的变化')
plt.xlabel('日期')
plt.ylabel('误差')
plt.show()

correlation = np.corrcoef(actual_prices, ensemble_predictions)[0, 1]
print(f"\n预测价格与实际价格的相关系数: {correlation:.2f}")

# ---- Error by price range ----
price_ranges = pd.cut(actual_prices, bins=5)
performance_by_range = pd.DataFrame({
    'Actual': actual_prices,
    'Predicted': ensemble_predictions,
    'Range': price_ranges
})
# observed=True skips empty bins, on which mean_absolute_error would raise.
for name, group in performance_by_range.groupby('Range', observed=True):
    group_mae = mean_absolute_error(group['Actual'], group['Predicted'])
    print(f"\n价格区间 {name}:")
    print(f" 样本数量: {len(group)}")
    print(f" 平均绝对误差 (MAE): {group_mae:.2f}")

# ---- Trade statistics ----
# A trade is one completed buy -> sell round trip; it is profitable when the
# sell price is above the entry price of that same trade.
print("\n交易策略详细信息:")
total_trades = 0
profitable_trades = 0
entry_price = None
for i in range(1, len(positions)):
    previous_state, state = positions[i - 1], positions[i]
    if previous_state == 0 and state == 1:
        entry_price = actual_prices[i]
    elif previous_state == 1 and state == -1:
        total_trades += 1
        if entry_price is not None and actual_prices[i] > entry_price:
            profitable_trades += 1
win_rate = profitable_trades / total_trades if total_trades > 0 else 0
print(f"总交易次数: {total_trades}")
print(f"盈利交易次数: {profitable_trades}")
print(f"胜率: {win_rate:.2%}")

# Compute the Sharpe ratio
risk_free_rate = 0.02  # assumed annual risk-free rate of 2%
daily_returns = np.diff(equity) / equity[:-1]
excess_returns = daily_returns - risk_free_rate / 252  # assumes 252 trading days per year
excess_std = np.std(excess_returns)
sharpe_ratio = np.sqrt(252) * np.mean(excess_returns) / excess_std if excess_std != 0 else 0
print(f"夏普比率: {sharpe_ratio:.2f}")

# ---- Timing and model size ----
start_time = time.time()
_ = lstm_transformer_model.predict(X_train[-1:])
prediction_time = time.time() - start_time
print(f"\n模型训练时间: {training_time:.2f} 秒")
print(f"单次预测时间: {prediction_time:.2f} 秒")
total_params = lstm_transformer_model.count_params()
print(f"模型参数数量: {total_params}")

# ---- Price / prediction plot with position markers ----
held_steps = [i for i, state in enumerate(positions) if state == 1]
sold_steps = [i for i, state in enumerate(positions) if state == -1]
plt.figure(figsize=(15, 10))
plt.plot(test_data.index[:min_length], actual_prices, label='实际价格')
plt.plot(test_data.index[:min_length], ensemble_predictions, label='预测价格')
plt.scatter(test_data.index[held_steps],
            [actual_prices[i] for i in held_steps],
            color='green', label='买入信号', marker='^')
plt.scatter(test_data.index[sold_steps],
            [actual_prices[i] for i in sold_steps],
            color='red', label='卖出信号', marker='v')
plt.title('股票价格预测和交易信号')
plt.xlabel('日期')
plt.ylabel('价格')
plt.legend()
plt.show()