""" Générateur de Labels — Supervision du modèle ML stratégie. Pour chaque barre, on regarde ce qui se passe dans les N barres suivantes : - LONG (1) : le prix monte de `tp_pct` avant de baisser de `sl_pct` - SHORT (-1) : le prix baisse de `sl_pct` avant de monter de `tp_pct` - NEUTRAL (0) : aucune des deux conditions n'est atteinte Cette approche "forward simulation" reproduit la logique réelle d'un trade : chaque label correspond à un trade qui aurait été rentable (TP atteint avant SL). """ import numpy as np import pandas as pd from typing import Tuple import logging logger = logging.getLogger(__name__) class LabelGenerator: """ Génère des labels de classification pour l'entraînement supervisé. Args: tp_pct: Mouvement haussier minimal pour valider un LONG (ex: 0.003 = 0.3%) sl_pct: Mouvement baissier maximal avant stop-loss (ex: 0.002 = 0.2%) horizon: Nombre de barres max pour atteindre TP ou SL min_ratio: Ratio TP/SL minimum (ignore les configs déséquilibrées) Exemple avec ATR dynamique: gen = LabelGenerator(tp_pct=None, sl_pct=None) labels = gen.generate_atr(df, atr_tp_mult=2.0, atr_sl_mult=1.0) """ def __init__( self, tp_pct: float = 0.003, # 0.3% TP par défaut sl_pct: float = 0.002, # 0.2% SL par défaut → R:R = 1.5 horizon: int = 30, # Fenêtre de 30 barres min_ratio: float = 1.0, # TP doit être >= SL ): self.tp_pct = tp_pct self.sl_pct = sl_pct self.horizon = horizon self.min_ratio = min_ratio def generate(self, df: pd.DataFrame) -> pd.Series: """ Génère les labels à partir de seuils fixes en pourcentage. Args: df: DataFrame OHLCV (colonnes en minuscules) Returns: pd.Series avec valeurs 1 (LONG), -1 (SHORT), 0 (NEUTRAL) """ df = df.copy() df.columns = [c.lower() for c in df.columns] labels = self._forward_simulate(df, self.tp_pct, self.sl_pct) self._log_distribution(labels) return labels def generate_atr_based( self, df: pd.DataFrame, atr_period: int = 14, atr_tp_mult: float = 2.0, atr_sl_mult: float = 1.0, ) -> pd.Series: """ Génère les labels avec des seuils TP/SL dynamiques basés sur l'ATR. Plus adapté aux marchés forex où la volatilité varie beaucoup. Args: df: DataFrame OHLCV atr_period: Période ATR atr_tp_mult: Multiplicateur ATR pour TP (ex: 2.0 = 2×ATR) atr_sl_mult: Multiplicateur ATR pour SL (ex: 1.0 = 1×ATR) Returns: pd.Series labels 1 / -1 / 0 """ df = df.copy() df.columns = [c.lower() for c in df.columns] # Calcul ATR h, l, pc = df['high'], df['low'], df['close'].shift(1) tr = pd.concat([h - l, (h - pc).abs(), (l - pc).abs()], axis=1).max(axis=1) atr = tr.rolling(atr_period).mean() labels = np.zeros(len(df), dtype=int) for i in range(len(df) - self.horizon): close = df['close'].iloc[i] atr_i = atr.iloc[i] if np.isnan(atr_i) or atr_i <= 0: continue tp_long = close + atr_tp_mult * atr_i sl_long = close - atr_sl_mult * atr_i tp_short = close - atr_tp_mult * atr_i sl_short = close + atr_sl_mult * atr_i future = df.iloc[i + 1: i + 1 + self.horizon] label = self._classify_bar(future, tp_long, sl_long, tp_short, sl_short) labels[i] = label result = pd.Series(labels, index=df.index) self._log_distribution(result) return result # ------------------------------------------------------------------------- # Internals # ------------------------------------------------------------------------- def _forward_simulate( self, df: pd.DataFrame, tp_pct: float, sl_pct: float ) -> pd.Series: """Simule chaque barre : TP ou SL atteint en premier dans l'horizon.""" labels = np.zeros(len(df), dtype=int) for i in range(len(df) - self.horizon): close = df['close'].iloc[i] tp_long = close * (1 + tp_pct) sl_long = close * (1 - sl_pct) tp_short = close * (1 - tp_pct) sl_short = close * (1 + sl_pct) future = df.iloc[i + 1: i + 1 + self.horizon] labels[i] = self._classify_bar(future, tp_long, sl_long, tp_short, sl_short) return pd.Series(labels, index=df.index) @staticmethod def _classify_bar( future: pd.DataFrame, tp_long: float, sl_long: float, tp_short: float, sl_short: float, ) -> int: """ Parcourt les barres futures bar par bar et retourne le label. Vérifie HIGH pour TP LONG et LOW pour SL LONG (et inversement pour SHORT). """ for _, bar in future.iterrows(): # LONG : TP atteint ? if bar['high'] >= tp_long and bar['low'] > sl_long: return 1 # LONG : SL atteint en premier ? if bar['low'] <= sl_long: # Vérifie si TP atteint le même bar (candle ambiguë) if bar['high'] >= tp_long: return 0 # Ambigu → neutre return 0 # SL touché → pas de LONG # SHORT : TP atteint ? if bar['low'] <= tp_short and bar['high'] < sl_short: return -1 # SHORT : SL atteint en premier ? if bar['high'] >= sl_short: if bar['low'] <= tp_short: return 0 return 0 return 0 # Ni TP ni SL atteint dans l'horizon @staticmethod def _log_distribution(labels: pd.Series) -> None: total = len(labels) if total == 0: return n_long = (labels == 1).sum() n_short = (labels == -1).sum() n_neutral = (labels == 0).sum() logger.info( f"Distribution labels : LONG={n_long} ({n_long/total:.1%}), " f"SHORT={n_short} ({n_short/total:.1%}), " f"NEUTRAL={n_neutral} ({n_neutral/total:.1%})" )