""" Générateur de Labels — Supervision du modèle ML stratégie. Pour chaque barre, on regarde ce qui se passe dans les N barres suivantes : - LONG (1) : le prix monte de `tp_pct` avant de baisser de `sl_pct` - SHORT (-1) : le prix baisse de `sl_pct` avant de monter de `tp_pct` - NEUTRAL (0) : aucune des deux conditions n'est atteinte Cette approche "forward simulation" reproduit la logique réelle d'un trade : chaque label correspond à un trade qui aurait été rentable (TP atteint avant SL). """ import numpy as np import pandas as pd from typing import Tuple import logging logger = logging.getLogger(__name__) class LabelGenerator: """ Génère des labels de classification pour l'entraînement supervisé. Args: tp_pct: Mouvement haussier minimal pour valider un LONG (ex: 0.003 = 0.3%) sl_pct: Mouvement baissier maximal avant stop-loss (ex: 0.002 = 0.2%) horizon: Nombre de barres max pour atteindre TP ou SL min_ratio: Ratio TP/SL minimum (ignore les configs déséquilibrées) Exemple avec ATR dynamique: gen = LabelGenerator(tp_pct=None, sl_pct=None) labels = gen.generate_atr(df, atr_tp_mult=2.0, atr_sl_mult=1.0) """ def __init__( self, tp_pct: float = 0.003, # 0.3% TP par défaut sl_pct: float = 0.002, # 0.2% SL par défaut → R:R = 1.5 horizon: int = 30, # Fenêtre de 30 barres min_ratio: float = 1.0, # TP doit être >= SL ): self.tp_pct = tp_pct self.sl_pct = sl_pct self.horizon = horizon self.min_ratio = min_ratio def generate(self, df: pd.DataFrame) -> pd.Series: """ Génère les labels à partir de seuils fixes en pourcentage. Args: df: DataFrame OHLCV (colonnes en minuscules) Returns: pd.Series avec valeurs 1 (LONG), -1 (SHORT), 0 (NEUTRAL) """ df = df.copy() df.columns = [c.lower() for c in df.columns] labels = self._forward_simulate(df, self.tp_pct, self.sl_pct) self._log_distribution(labels) return labels def generate_atr_based( self, df: pd.DataFrame, atr_period: int = 14, atr_tp_mult: float = 2.0, atr_sl_mult: float = 1.0, ) -> pd.Series: """ Génère les labels avec des seuils TP/SL dynamiques basés sur l'ATR. Plus adapté aux marchés forex où la volatilité varie beaucoup. Args: df: DataFrame OHLCV atr_period: Période ATR atr_tp_mult: Multiplicateur ATR pour TP (ex: 2.0 = 2×ATR) atr_sl_mult: Multiplicateur ATR pour SL (ex: 1.0 = 1×ATR) Returns: pd.Series labels 1 / -1 / 0 """ df = df.copy() df.columns = [c.lower() for c in df.columns] # Calcul ATR h, l, pc = df['high'], df['low'], df['close'].shift(1) tr = pd.concat([h - l, (h - pc).abs(), (l - pc).abs()], axis=1).max(axis=1) atr = tr.rolling(atr_period).mean() labels = np.zeros(len(df), dtype=int) for i in range(len(df) - self.horizon): close = df['close'].iloc[i] atr_i = atr.iloc[i] if np.isnan(atr_i) or atr_i <= 0: continue tp_long = close + atr_tp_mult * atr_i sl_long = close - atr_sl_mult * atr_i tp_short = close - atr_tp_mult * atr_i sl_short = close + atr_sl_mult * atr_i future = df.iloc[i + 1: i + 1 + self.horizon] label = self._classify_bar(future, tp_long, sl_long, tp_short, sl_short) labels[i] = label result = pd.Series(labels, index=df.index) self._log_distribution(result) return result # ------------------------------------------------------------------------- # Internals # ------------------------------------------------------------------------- def _forward_simulate( self, df: pd.DataFrame, tp_pct: float, sl_pct: float ) -> pd.Series: """Simule chaque barre : TP ou SL atteint en premier dans l'horizon.""" labels = np.zeros(len(df), dtype=int) for i in range(len(df) - self.horizon): close = df['close'].iloc[i] tp_long = close * (1 + tp_pct) sl_long = close * (1 - sl_pct) tp_short = close * (1 - tp_pct) sl_short = close * (1 + sl_pct) future = df.iloc[i + 1: i + 1 + self.horizon] labels[i] = self._classify_bar(future, tp_long, sl_long, tp_short, sl_short) return pd.Series(labels, index=df.index) @staticmethod def _classify_bar( future: pd.DataFrame, tp_long: float, sl_long: float, tp_short: float, sl_short: float, ) -> int: """ Simule LONG et SHORT de façon indépendante sur les barres futures. LONG et SHORT sont deux trades hypothétiques distincts : le SL du LONG (prix baisse) ne signifie pas que le SL du SHORT (prix monte) est touché. Les deux simulations sont donc parcourues séparément pour éviter de manquer les signaux SHORT quand le prix descend. Retourne le label du trade gagnant qui se résout en premier : 1 (LONG), -1 (SHORT) ou 0 (NEUTRAL). """ # --- Simulation LONG indépendante --- long_win_idx = None long_lose_idx = None for idx, (_, bar) in enumerate(future.iterrows()): tp_hit = bar['high'] >= tp_long sl_hit = bar['low'] <= sl_long if tp_hit and sl_hit: long_lose_idx = idx # Barre ambiguë → perte break if tp_hit: long_win_idx = idx break if sl_hit: long_lose_idx = idx break # --- Simulation SHORT indépendante --- short_win_idx = None short_lose_idx = None for idx, (_, bar) in enumerate(future.iterrows()): tp_hit = bar['low'] <= tp_short sl_hit = bar['high'] >= sl_short if tp_hit and sl_hit: short_lose_idx = idx # Barre ambiguë → perte break if tp_hit: short_win_idx = idx break if sl_hit: short_lose_idx = idx break long_won = long_win_idx is not None short_won = short_win_idx is not None if long_won and not short_won: return 1 if short_won and not long_won: return -1 if long_won and short_won: # Les deux trades seraient gagnants : prendre celui qui se résout en premier return 1 if long_win_idx <= short_win_idx else -1 return 0 # Aucun TP atteint dans l'horizon @staticmethod def _log_distribution(labels: pd.Series) -> None: total = len(labels) if total == 0: return n_long = (labels == 1).sum() n_short = (labels == -1).sum() n_neutral = (labels == 0).sum() logger.info( f"Distribution labels : LONG={n_long} ({n_long/total:.1%}), " f"SHORT={n_short} ({n_short/total:.1%}), " f"NEUTRAL={n_neutral} ({n_neutral/total:.1%})" )