""" TradingEnv — Environnement Gymnasium pour l'agent RL de trading. Conforme à l'API gymnasium (reset/step) et adapté aux marchés forex. Espace d'observation : 20 features normalisées (fenêtre glissante de 20 barres) Espace d'action : Discrete(3) → {0=HOLD, 1=LONG, 2=SHORT} Récompense : PnL réalisé + pénalité drawdown + bonus Sharpe L'environnement gère : - Les transitions de position (flat → long, flat → short, inversions) - Le stop-loss / take-profit automatiques basés sur l'ATR - La normalisation des observations par l'ATR - La fenêtre glissante de 20 barres (lookback) """ import logging from typing import Optional, Tuple import numpy as np import pandas as pd logger = logging.getLogger(__name__) # Imports conditionnels gymnasium / gym try: import gymnasium as gym from gymnasium import spaces GYM_AVAILABLE = True GYM_MODULE = 'gymnasium' except ImportError: try: import gym from gym import spaces GYM_AVAILABLE = True GYM_MODULE = 'gym' except ImportError: gym = None spaces = None GYM_AVAILABLE = False GYM_MODULE = None # ────────────────────────────────────────────────────────────────────────────── # Actions # ────────────────────────────────────────────────────────────────────────────── ACTION_HOLD = 0 ACTION_LONG = 1 ACTION_SHORT = 2 # Nombre de features dans le vecteur d'observation N_FEATURES = 20 # Fenêtre lookback (nombre de barres passées incluses dans l'observation) LOOKBACK = 20 class TradingEnv: """ Environnement de trading conforme à l'API gymnasium pour l'agent PPO. Chaque step() représente la décision de trading à la fermeture d'une bougie. L'agent reçoit 20 features normalisées décrivant le contexte de marché et sa position courante, puis choisit HOLD / LONG / SHORT. Gestion des positions : - Une seule position à la fois (pas de pyramiding) - Inversion directe possible (SHORT → LONG sans passer par FLAT) - SL/TP ATR-based (sl_atr_mult×ATR et tp_atr_mult×ATR) Récompense : - PnL réalisé à la clôture de position (en multiple d'ATR) - Pénalité proportionnelle au drawdown courant - Pas de bonus pour maintenir une position (évite le surtrading) Args: df: DataFrame OHLCV (colonnes minuscules : open/high/low/close/volume) sl_atr_mult: Multiplicateur ATR pour le stop-loss (défaut: 1.0) tp_atr_mult: Multiplicateur ATR pour le take-profit (défaut: 2.0) atr_period: Période de calcul de l'ATR (défaut: 14) initial_capital: Capital initial (pour calcul drawdown) (défaut: 10000) drawdown_penalty: Coefficient de pénalité drawdown (défaut: 0.1) """ metadata = {'render_modes': []} def __init__( self, df: pd.DataFrame, sl_atr_mult: float = 1.0, tp_atr_mult: float = 2.0, atr_period: int = 14, initial_capital: float = 10_000.0, drawdown_penalty: float = 0.1, ): if not GYM_AVAILABLE: raise RuntimeError( "gymnasium ou gym requis — installer gymnasium>=0.26 ou gym>=0.21" ) self.df = df.copy().reset_index(drop=True) self.df.columns = [c.lower() for c in self.df.columns] self.sl_atr_mult = sl_atr_mult self.tp_atr_mult = tp_atr_mult self.atr_period = atr_period self.initial_capital = initial_capital self.drawdown_penalty = drawdown_penalty # Calcul de l'ATR sur toute la série (optimisation : évite le recalcul dans step) self._atr_series = self._compute_atr_series() # Calcul des EMAs sur toute la série self._ema9 = self.df['close'].ewm(span=9, adjust=False).mean() self._ema21 = self.df['close'].ewm(span=21, adjust=False).mean() self._ema50 = self.df['close'].ewm(span=50, adjust=False).mean() self._ema200 = self.df['close'].ewm(span=200, adjust=False).mean() # Calcul des Bandes de Bollinger (20, 2σ) bb_ma = self.df['close'].rolling(20).mean() bb_std = self.df['close'].rolling(20).std() self._bb_upper = bb_ma + 2 * bb_std self._bb_lower = bb_ma - 2 * bb_std # Calcul du RSI(14) self._rsi = self._compute_rsi(period=14) # Calcul du MACD (12, 26, 9) ema12 = self.df['close'].ewm(span=12, adjust=False).mean() ema26 = self.df['close'].ewm(span=26, adjust=False).mean() self._macd = ema12 - ema26 self._macd_sig = self._macd.ewm(span=9, adjust=False).mean() # Volume MA(20) pour normalisation self._vol_ma20 = self.df['volume'].rolling(20).mean().fillna( self.df['volume'].mean() ) # Espaces gymnasium self.observation_space = spaces.Box( low = -10.0, high = 10.0, shape = (N_FEATURES,), dtype = np.float32, ) self.action_space = spaces.Discrete(3) # État interne (initialisé dans reset()) self._current_step = LOOKBACK self._position = 0 # -1=short, 0=flat, 1=long self._entry_price = 0.0 self._entry_atr = 0.0 self._bars_in_trade = 0 self._capital = initial_capital self._peak_capital = initial_capital self._total_pnl = 0.0 self._pnl_history = [] # Pour calcul Sharpe en fin d'épisode self._done = False # ────────────────────────────────────────────────────────────────────────── # Interface gymnasium # ────────────────────────────────────────────────────────────────────────── def reset(self, seed: Optional[int] = None, options: Optional[dict] = None): """ Réinitialise l'environnement au début d'un épisode. Args: seed: Graine aléatoire (ignorée ici, données déterministes) options: Options additionnelles (non utilisées) Returns: Tuple (observation, info) conforme gymnasium """ if seed is not None: np.random.seed(seed) self._current_step = LOOKBACK self._position = 0 self._entry_price = 0.0 self._entry_atr = 0.0 self._bars_in_trade = 0 self._capital = self.initial_capital self._peak_capital = self.initial_capital self._total_pnl = 0.0 self._pnl_history = [] self._done = False obs = self._get_observation() info = {} return obs, info def step(self, action: int) -> Tuple[np.ndarray, float, bool, bool, dict]: """ Exécute une action et retourne la transition. Args: action: 0=HOLD, 1=LONG, 2=SHORT Returns: Tuple (observation, reward, terminated, truncated, info) conforme gymnasium """ if self._done: obs, info = self.reset() return obs, 0.0, True, False, info reward = 0.0 info = {} current_close = float(self.df['close'].iloc[self._current_step]) current_atr = float(self._atr_series.iloc[self._current_step]) if current_atr <= 0: current_atr = float(self.df['close'].iloc[self._current_step]) * 0.001 # ── Vérification SL/TP avant d'appliquer la nouvelle action ────────── if self._position != 0: reward += self._check_sl_tp(current_close, current_atr) # ── Application de la nouvelle action ───────────────────────────────── desired_position = self._action_to_position(action) if desired_position != self._position: # Fermeture de la position courante (si ouverte) if self._position != 0: close_reward = self._close_position(current_close, current_atr) reward += close_reward # Ouverture d'une nouvelle position (si pas HOLD) if desired_position != 0: self._open_position(desired_position, current_close, current_atr) else: # Même position : incrémenter le compteur de barres if self._position != 0: self._bars_in_trade += 1 # ── Pénalité drawdown ───────────────────────────────────────────────── if self._capital > self._peak_capital: self._peak_capital = self._capital drawdown = (self._peak_capital - self._capital) / max(self._peak_capital, 1.0) if drawdown > 0: reward -= self.drawdown_penalty * drawdown # ── Avance d'une barre ──────────────────────────────────────────────── self._current_step += 1 terminated = self._current_step >= len(self.df) - 1 truncated = False if terminated: # Fermeture forcée de la position à la fin de l'épisode if self._position != 0: final_close = float(self.df['close'].iloc[-1]) final_atr = float(self._atr_series.iloc[-1]) reward += self._close_position(final_close, final_atr) self._done = True obs = self._get_observation() # Sauvegarde de la récompense pour calcul Sharpe self._pnl_history.append(reward) info = { 'position': self._position, 'capital': self._capital, 'drawdown': drawdown, 'bars_in_trade': self._bars_in_trade, 'step': self._current_step, } return obs, float(reward), terminated, truncated, info def render(self): """Affichage minimal de l'état courant.""" pos_str = {0: 'FLAT', 1: 'LONG', -1: 'SHORT'}.get(self._position, '?') logger.debug( f"Step={self._current_step} | Pos={pos_str} | " f"Capital={self._capital:.2f} | PnL={self._total_pnl:.4f}" ) # ────────────────────────────────────────────────────────────────────────── # Observation # ────────────────────────────────────────────────────────────────────────── def _get_observation(self) -> np.ndarray: """ Construit le vecteur d'observation de 20 features normalisées. Toutes les features de prix sont normalisées par l'ATR courant pour rendre l'observation invariante à l'échelle du prix. Returns: np.ndarray de forme (20,) avec dtype float32 """ i = self._current_step # Sécurité : ne pas dépasser les bornes i = max(LOOKBACK, min(i, len(self.df) - 1)) close = float(self.df['close'].iloc[i]) open_ = float(self.df['open'].iloc[i]) high = float(self.df['high'].iloc[i]) low = float(self.df['low'].iloc[i]) vol = float(self.df['volume'].iloc[i]) atr = float(self._atr_series.iloc[i]) if atr <= 0: atr = close * 0.001 # Close i-1 et i-5 pour le momentum close_1 = float(self.df['close'].iloc[max(0, i - 1)]) close_5 = float(self.df['close'].iloc[max(0, i - 5)]) rsi = float(self._rsi.iloc[i]) if not np.isnan(self._rsi.iloc[i]) else 50.0 bb_upper = float(self._bb_upper.iloc[i]) if not np.isnan(self._bb_upper.iloc[i]) else close bb_lower = float(self._bb_lower.iloc[i]) if not np.isnan(self._bb_lower.iloc[i]) else close macd = float(self._macd.iloc[i]) if not np.isnan(self._macd.iloc[i]) else 0.0 macd_signal = float(self._macd_sig.iloc[i]) if not np.isnan(self._macd_sig.iloc[i]) else 0.0 vol_ma20 = float(self._vol_ma20.iloc[i]) ema9 = float(self._ema9.iloc[i]) ema21 = float(self._ema21.iloc[i]) ema50 = float(self._ema50.iloc[i]) ema200 = float(self._ema200.iloc[i]) vol_ratio = vol / max(vol_ma20, 1.0) # PnL non-réalisé courant if self._position != 0 and self._entry_price > 0: unrealized = self._position * (close - self._entry_price) else: unrealized = 0.0 features = np.array([ # Prix relatifs normalisés par ATR (close - open_) / atr, # 0 : corps de la bougie (high - low) / atr, # 1 : amplitude bougie (volatilité relative) (close - close_1) / atr, # 2 : momentum 1 barre (close - close_5) / atr, # 3 : momentum 5 barres # Indicateurs techniques normalisés rsi / 100.0, # 4 : RSI [0..1] (close - bb_upper) / atr, # 5 : position vs BB haute (bb_upper - bb_lower) / atr, # 6 : largeur des bandes de Bollinger macd / atr, # 7 : MACD normalisé macd_signal / atr, # 8 : signal MACD normalisé # Volume np.clip(vol_ratio, 0.0, 5.0), # 9 : ratio volume / MA20 (plafonné à 5) # Position et état du trade float(self._position), # 10: position courante (-1, 0, 1) unrealized / atr, # 11: PnL non-réalisé en ATR min(self._bars_in_trade / 50.0, 1.0), # 12: durée du trade (normalisée) # Temporel (si index datetime) self._get_hour(i), # 13: heure normalisée [0..1] self._get_dow(i), # 14: jour de semaine [0..1] # Moyennes mobiles (distance close - EMA, normalisée par ATR) (close - ema9) / atr, # 15: distance EMA9 (close - ema21) / atr, # 16: distance EMA21 (ema9 - ema21) / atr, # 17: croisement EMA9/21 (close - ema50) / atr, # 18: distance EMA50 (close - ema200) / atr, # 19: distance EMA200 ], dtype=np.float32) # Clip pour éviter les valeurs extrêmes (protection robustesse) features = np.clip(features, -10.0, 10.0) # Remplacement des NaN résiduels features = np.nan_to_num(features, nan=0.0, posinf=10.0, neginf=-10.0) return features # ────────────────────────────────────────────────────────────────────────── # Gestion des positions # ────────────────────────────────────────────────────────────────────────── def _open_position(self, direction: int, price: float, atr: float) -> None: """ Ouvre une position dans la direction donnée. Args: direction: 1=LONG, -1=SHORT price: Prix d'entrée (close de la barre courante) atr: ATR courant pour le calcul SL/TP """ self._position = direction self._entry_price = price self._entry_atr = atr self._bars_in_trade = 0 def _close_position(self, price: float, atr: float) -> float: """ Ferme la position courante et calcule la récompense. La récompense est le PnL en multiple d'ATR (normalisé et sans unité). Cette normalisation rend la récompense comparable entre différents symboles et périodes de volatilité. Args: price: Prix de sortie atr: ATR courant (pour normalisation) Returns: Récompense (PnL normalisé par ATR) """ if self._position == 0 or self._entry_price <= 0: self._position = 0 self._bars_in_trade = 0 return 0.0 raw_pnl = self._position * (price - self._entry_price) # Normalisation par l'ATR d'entrée pour une récompense sans unité entry_atr = max(self._entry_atr, price * 0.0001) reward = raw_pnl / entry_atr # Mise à jour du capital (simulation simplifiée avec 1 lot fixe) self._capital += raw_pnl self._total_pnl += raw_pnl self._position = 0 self._entry_price = 0.0 self._bars_in_trade = 0 return float(reward) def _check_sl_tp(self, current_close: float, current_atr: float) -> float: """ Vérifie si le SL ou TP est atteint pour la position courante. Utilise l'ATR d'entrée pour les niveaux SL/TP (stabilité). Si le SL ou TP est touché, la position est fermée. Args: current_close: Prix de clôture de la barre courante current_atr: ATR courant Returns: Récompense si fermeture forcée, 0.0 sinon """ if self._position == 0 or self._entry_price <= 0: return 0.0 entry_atr = max(self._entry_atr, self._entry_price * 0.0001) sl_dist = self.sl_atr_mult * entry_atr tp_dist = self.tp_atr_mult * entry_atr if self._position == 1: # LONG sl_level = self._entry_price - sl_dist tp_level = self._entry_price + tp_dist if current_close <= sl_level or current_close >= tp_level: return self._close_position(current_close, current_atr) elif self._position == -1: # SHORT sl_level = self._entry_price + sl_dist tp_level = self._entry_price - tp_dist if current_close >= sl_level or current_close <= tp_level: return self._close_position(current_close, current_atr) return 0.0 # ────────────────────────────────────────────────────────────────────────── # Helpers # ────────────────────────────────────────────────────────────────────────── @staticmethod def _action_to_position(action: int) -> int: """Convertit l'action discrète en direction de position.""" return {ACTION_HOLD: 0, ACTION_LONG: 1, ACTION_SHORT: -1}.get(action, 0) def _get_hour(self, i: int) -> float: """Retourne l'heure normalisée [0..1] depuis l'index, ou 0.5 si non datetime.""" try: idx = self.df.index[i] if hasattr(idx, 'hour'): return float(idx.hour) / 24.0 except Exception: pass return 0.5 def _get_dow(self, i: int) -> float: """Retourne le jour de semaine normalisé [0..1] depuis l'index, ou 0.5 si non datetime.""" try: idx = self.df.index[i] if hasattr(idx, 'dayofweek'): return float(idx.dayofweek) / 5.0 except Exception: pass return 0.5 def _compute_atr_series(self) -> pd.Series: """Calcule l'ATR(14) sur toute la série OHLCV.""" h = self.df['high'] l = self.df['low'] pc = self.df['close'].shift(1) tr = pd.concat([h - l, (h - pc).abs(), (l - pc).abs()], axis=1).max(axis=1) atr = tr.ewm(span=self.atr_period, adjust=False).mean() # Remplir les premières valeurs NaN par la plage high-low atr = atr.fillna(h - l) return atr def _compute_rsi(self, period: int = 14) -> pd.Series: """Calcule le RSI sur toute la série de closes.""" delta = self.df['close'].diff() gain = delta.clip(lower=0).ewm(span=period, adjust=False).mean() loss = (-delta.clip(upper=0)).ewm(span=period, adjust=False).mean() rs = gain / loss.replace(0, np.nan) rsi = 100.0 - (100.0 / (1.0 + rs)) return rsi.fillna(50.0) def get_episode_stats(self) -> dict: """ Retourne les statistiques de l'épisode courant. Utile pour le logging et l'évaluation du modèle PPO. Returns: Dict avec total_pnl, n_steps, capital, Sharpe approximatif """ rewards = np.array(self._pnl_history) if len(rewards) > 1 and rewards.std() > 0: sharpe = float(rewards.mean() / rewards.std() * np.sqrt(252)) else: sharpe = 0.0 return { 'total_pnl': self._total_pnl, 'capital': self._capital, 'return_pct': (self._capital - self.initial_capital) / self.initial_capital, 'n_steps': self._current_step, 'sharpe_approx': sharpe, }