Files
trader-ml/src/ml/walk_forward.py
Tika da30ef19ed Initial commit — Trading AI Secure project complete
Architecture Docker (8 services), FastAPI, TimescaleDB, Redis, Streamlit.
Stratégies : scalping, intraday, swing. MLEngine + RegimeDetector (HMM).
BacktestEngine + WalkForwardAnalyzer + Optuna optimizer.
Routes API complètes dont /optimize async.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-08 17:38:09 +00:00

359 lines
12 KiB
Python

"""
Walk-Forward Analysis - Validation Robuste des Stratégies.
Implémente walk-forward analysis pour éviter l'overfitting:
- Rolling window optimization
- Out-of-sample testing
- Anchored vs rolling windows
- Performance tracking
"""
from typing import Dict, List, Optional, Tuple
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import logging
logger = logging.getLogger(__name__)
class WalkForwardAnalyzer:
    """
    Walk-forward analyzer for robust strategy validation.

    Splits the data into successive chronological train/test periods:
    - optimize parameters on the train window,
    - evaluate on the following test window (out-of-sample),
    - slide the window forward and repeat.

    Testing only on unseen data guards against overfitting.

    Usage:
        wfa = WalkForwardAnalyzer(strategy_class, data, optimizer)
        results = wfa.run(n_splits=10, train_ratio=0.7)
    """

    def __init__(
        self,
        strategy_class,
        data: pd.DataFrame,
        optimizer,
        initial_capital: float = 10000.0
    ):
        """
        Initialize the walk-forward analyzer.

        Args:
            strategy_class: Strategy class; instances must expose
                ``analyze(historical_data)`` returning a signal (with
                entry_price/stop_loss/take_profit/direction/quantity) or None.
            data: Full price dataset, assumed chronologically ordered.
            optimizer: Parameter optimizer; must expose a writable ``data``
                attribute and an ``optimize(n_trials=...)`` method returning
                a dict with 'best_params' and 'best_value'.
            initial_capital: Starting capital for each out-of-sample backtest.
        """
        self.strategy_class = strategy_class
        self.data = data
        self.optimizer = optimizer
        self.initial_capital = initial_capital
        self.results = []
        logger.info("WalkForwardAnalyzer initialized")

    def run(
        self,
        n_splits: int = 10,
        train_ratio: float = 0.7,
        window_type: str = 'rolling',
        n_trials_per_split: int = 50
    ) -> Dict:
        """
        Run the walk-forward analysis.

        Args:
            n_splits: Number of train/test splits.
            train_ratio: Fraction of each window used for training.
            window_type: 'rolling' (sliding window) or 'anchored' (fixed start).
            n_trials_per_split: Optimization trials per split.

        Returns:
            Dict with per-split 'results' and an aggregate 'summary'
            (empty summary when no split could be built).
        """
        logger.info("=" * 60)
        logger.info("WALK-FORWARD ANALYSIS")
        logger.info("=" * 60)
        logger.info(f"Splits: {n_splits}")
        logger.info(f"Train ratio: {train_ratio:.0%}")
        logger.info(f"Window type: {window_type}")

        # Reset accumulated results so repeated run() calls don't mix splits.
        self.results = []

        splits = self._create_splits(n_splits, train_ratio, window_type)

        for i, (train_data, test_data) in enumerate(splits):
            logger.info(f"\n--- Split {i+1}/{n_splits} ---")
            logger.info(f"Train: {len(train_data)} bars")
            logger.info(f"Test: {len(test_data)} bars")

            # In-sample optimization on the training window.
            logger.info("Optimizing on train data...")
            self.optimizer.data = train_data
            opt_results = self.optimizer.optimize(n_trials=n_trials_per_split)
            best_params = opt_results['best_params']
            train_sharpe = opt_results['best_value']
            logger.info(f"Train Sharpe: {train_sharpe:.2f}")

            # Out-of-sample evaluation with the optimized parameters.
            logger.info("Testing on out-of-sample data...")
            test_metrics = self._backtest_on_data(best_params, test_data)
            test_sharpe = test_metrics.get('sharpe_ratio', 0)
            logger.info(f"Test Sharpe: {test_sharpe:.2f}")

            self.results.append({
                'split': i + 1,
                'train_size': len(train_data),
                'test_size': len(test_data),
                'best_params': best_params,
                'train_sharpe': train_sharpe,
                'test_sharpe': test_sharpe,
                'test_metrics': test_metrics,
                # Positive degradation = worse out-of-sample than in-sample.
                'degradation': train_sharpe - test_sharpe,
            })

        summary = self._analyze_results()

        logger.info("\n" + "=" * 60)
        logger.info("WALK-FORWARD RESULTS")
        logger.info("=" * 60)
        if summary:
            logger.info(f"Avg Train Sharpe: {summary['avg_train_sharpe']:.2f}")
            logger.info(f"Avg Test Sharpe: {summary['avg_test_sharpe']:.2f}")
            logger.info(f"Avg Degradation: {summary['avg_degradation']:.2f}")
            logger.info(f"Consistency: {summary['consistency']:.2%}")
            logger.info(f"Overfitting Score: {summary['overfitting_score']:.2f}")
        else:
            # Previously this raised KeyError on an empty summary.
            logger.warning("No walk-forward splits were produced")

        return {
            'results': self.results,
            'summary': summary
        }

    def _create_splits(
        self,
        n_splits: int,
        train_ratio: float,
        window_type: str
    ) -> List[Tuple[pd.DataFrame, pd.DataFrame]]:
        """
        Build the chronological train/test splits.

        Args:
            n_splits: Requested number of splits.
            train_ratio: Fraction of each window used for training
                (rolling mode only).
            window_type: 'rolling' (fixed-size sliding window) or
                'anchored' (train always starts at the first bar).

        Returns:
            List of (train_data, test_data) DataFrame pairs; may contain
            fewer than n_splits pairs when the data runs out. Unknown
            window types yield an empty list.
        """
        total_size = len(self.data)
        splits = []

        if window_type == 'rolling':
            # Sliding window: each split gets an independent window of
            # total_size // n_splits bars, divided by train_ratio.
            window_size = total_size // n_splits
            train_size = int(window_size * train_ratio)
            test_size = window_size - train_size

            for i in range(n_splits):
                start_idx = i * window_size
                train_end_idx = start_idx + train_size
                test_end_idx = min(train_end_idx + test_size, total_size)
                # Stop when no out-of-sample data remains for this split.
                # (The old `test_end_idx > total_size` check was dead code:
                # test_end_idx had just been clamped with min().)
                if train_end_idx >= test_end_idx:
                    break
                train_data = self.data.iloc[start_idx:train_end_idx]
                test_data = self.data.iloc[train_end_idx:test_end_idx]
                splits.append((train_data, test_data))

        elif window_type == 'anchored':
            # Anchored window: training always starts at bar 0 and grows;
            # each test window covers the next test_size bars.
            test_size = total_size // (n_splits + 1)

            for i in range(n_splits):
                train_end_idx = (i + 1) * test_size
                test_end_idx = min(train_end_idx + test_size, total_size)
                # Same dead-check fix as in the rolling branch above.
                if train_end_idx >= test_end_idx:
                    break
                train_data = self.data.iloc[:train_end_idx]
                test_data = self.data.iloc[train_end_idx:test_end_idx]
                splits.append((train_data, test_data))

        return splits

    def _backtest_on_data(
        self,
        params: Dict,
        data: pd.DataFrame
    ) -> Dict:
        """
        Backtest a parameter set on out-of-sample data.

        Uses a simplified one-bar execution model: each signal is entered at
        its entry price adjusted for slippage + spread, then closed on the
        same bar at take-profit, stop-loss, or the bar close.

        Args:
            params: Strategy parameters.
            data: Out-of-sample price data.

        Returns:
            Performance metrics computed by MetricsCalculator (zeroed
            metrics when no trade was generated).
        """
        from src.backtesting.metrics_calculator import MetricsCalculator

        strategy = self.strategy_class(params)
        metrics_calculator = MetricsCalculator()

        equity = self.initial_capital
        equity_curve = [equity]
        trades = []

        # Transaction costs (conservative assumptions).
        commission_pct = 0.0001
        slippage_pct = 0.0005
        spread_pct = 0.0002

        # Start at bar 50 so the strategy has warm-up history to analyze.
        for i in range(50, len(data)):
            historical_data = data.iloc[:i + 1]
            try:
                signal = strategy.analyze(historical_data)
                if signal is None:
                    equity_curve.append(equity)
                    continue

                current_bar = data.iloc[i]
                close_price = float(current_bar.get("close", signal.entry_price))

                # Execution price adjusted for slippage + spread
                # (adverse in both directions).
                if signal.direction == "LONG":
                    exec_price = signal.entry_price * (1 + slippage_pct + spread_pct)
                else:
                    exec_price = signal.entry_price * (1 - slippage_pct - spread_pct)

                qty = signal.quantity if signal.quantity else 1000.0

                # Same-bar exit (walk-forward simplification): take-profit if
                # the close reached it, otherwise bar close / stop-loss.
                if signal.direction == "LONG":
                    exit_price = min(close_price, signal.take_profit) if close_price >= signal.take_profit else \
                        max(close_price, signal.stop_loss)
                else:
                    exit_price = max(close_price, signal.take_profit) if close_price <= signal.take_profit else \
                        min(close_price, signal.stop_loss)

                pnl = (exit_price - exec_price) * (qty if signal.direction == "LONG" else -qty)
                commission = abs(exec_price * qty) * commission_pct * 2  # round trip
                pnl -= commission
                equity += pnl
                equity_curve.append(equity)

                trades.append({
                    "pnl": pnl,
                    "pnl_pct": pnl / (exec_price * qty) if qty else 0,
                    "entry_price": exec_price,
                    "exit_price": exit_price,
                    "direction": signal.direction,
                    "commission": commission,
                    "risk": abs(exec_price - signal.stop_loss) * qty,
                })
            except Exception as exc:
                # Best effort: one failing bar must not abort the whole
                # out-of-sample run, but log it instead of swallowing silently.
                logger.debug("Bar %d skipped during backtest: %s", i, exc)
                equity_curve.append(equity)
                continue

        if not trades:
            return {
                "sharpe_ratio": 0.0,
                "total_return": 0.0,
                "max_drawdown": 0.0,
                "win_rate": 0.0,
                "total_trades": 0,
            }

        equity_series = pd.Series(equity_curve)
        return metrics_calculator.calculate_all(
            equity_curve=equity_series,
            trades=trades,
            initial_capital=self.initial_capital,
        )

    def _analyze_results(self) -> Dict:
        """
        Aggregate per-split results into global metrics.

        Returns:
            Dict with average Sharpe ratios, degradation, consistency,
            overfitting score and stability; empty dict when no results.
        """
        if not self.results:
            return {}

        train_sharpes = [r['train_sharpe'] for r in self.results]
        test_sharpes = [r['test_sharpe'] for r in self.results]
        degradations = [r['degradation'] for r in self.results]

        avg_train_sharpe = np.mean(train_sharpes)
        avg_test_sharpe = np.mean(test_sharpes)
        avg_degradation = np.mean(degradations)

        # Consistency: fraction of splits with a positive out-of-sample Sharpe.
        positive_tests = len([s for s in test_sharpes if s > 0])
        consistency = positive_tests / len(test_sharpes)

        # Overfitting score: share of the in-sample edge lost out-of-sample
        # (defaults to 1.0 when train performance is non-positive).
        overfitting_score = avg_degradation / avg_train_sharpe if avg_train_sharpe > 0 else 1.0

        # Stability: 1 - coefficient of variation of test Sharpe, clamped >= 0.
        stability = 1 - (np.std(test_sharpes) / avg_test_sharpe) if avg_test_sharpe > 0 else 0

        return {
            'avg_train_sharpe': avg_train_sharpe,
            'avg_test_sharpe': avg_test_sharpe,
            'avg_degradation': avg_degradation,
            'consistency': consistency,
            'overfitting_score': overfitting_score,
            'stability': max(0, stability),
            'n_splits': len(self.results),
        }

    def plot_results(self):
        """Plot train vs. test Sharpe per split and save to a PNG file."""
        try:
            import matplotlib.pyplot as plt

            splits = [r['split'] for r in self.results]
            train_sharpes = [r['train_sharpe'] for r in self.results]
            test_sharpes = [r['test_sharpe'] for r in self.results]

            plt.figure(figsize=(12, 6))
            plt.plot(splits, train_sharpes, 'o-', label='Train Sharpe', linewidth=2)
            plt.plot(splits, test_sharpes, 's-', label='Test Sharpe', linewidth=2)
            plt.xlabel('Split')
            plt.ylabel('Sharpe Ratio')
            plt.title('Walk-Forward Analysis Results')
            plt.legend()
            plt.grid(True, alpha=0.3)
            plt.tight_layout()
            plt.savefig('walk_forward_results.png')
            # Release the figure; otherwise repeated calls leak open figures.
            plt.close()
            logger.info("Plot saved to walk_forward_results.png")
        except ImportError:
            logger.warning("matplotlib not available for plotting")