- PROJECT_STATUS.md : réécriture complète — phases 1-4b terminées à 100%, routes API exhaustives, fixes critiques documentés, à-faire priorisé - STRATEGY_GUIDE.md : ajout section ML-Driven Strategy avec features, labels, usage API et paramètres de configuration - AI_FRAMEWORK.md : ajout section ML-Driven + tableau statut implémentation, différenciation HMM/Optuna/MLStrategy - ARCHITECTURE.md : ajout structure réelle du code avec les nouveaux fichiers ml_strategy_model.py, features/, ml_driven/ annotés [NOUVEAU] Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
28 KiB
28 KiB
🏗️ Architecture Détaillée - Trading AI Secure
📋 Table des Matières
- Vue d'ensemble
- Architecture Globale
- Modules Core
- Flux de Données
- Patterns et Principes
- Sécurité
- Scalabilité
🎯 Vue d'ensemble
Principes Architecturaux
- Separation of Concerns : Chaque module a une responsabilité unique
- Dependency Injection : Facilite tests et modularité
- Event-Driven : Communication asynchrone entre composants
- Fail-Safe : Dégradation gracieuse en cas d'erreur
- Observable : Monitoring et logging à tous les niveaux
Stack Technologique
Backend:
Language: Python 3.11+
Framework: FastAPI
Async: asyncio + threading
Data:
Storage: PostgreSQL (positions, trades)
Cache: Redis (market data, signals)
Time-Series: InfluxDB (métriques)
ML/AI:
Core: scikit-learn, XGBoost, LightGBM
Optimization: Optuna
Deep Learning: TensorFlow/PyTorch (optionnel)
Monitoring:
Metrics: Prometheus
Visualization: Grafana
Logging: ELK Stack (Elasticsearch, Logstash, Kibana)
UI:
Dashboard: Streamlit
API Docs: Swagger/OpenAPI
🏛️ Architecture Globale
Diagramme de Haut Niveau
┌─────────────────────────────────────────────────────────────────┐
│ TRADING AI SECURE │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ UI LAYER │ │ API LAYER │ │ MONITORING │ │
│ │ │ │ │ │ │ │
│ │ Streamlit │ │ FastAPI │ │ Prometheus │ │
│ │ Dashboard │ │ REST API │ │ Grafana │ │
│ └──────┬───────┘ └──────┬───────┘ └──────┬───────┘ │
│ │ │ │ │
│ └─────────────────┼─────────────────┘ │
│ │ │
│ ┌────────────────────────▼────────────────────────┐ │
│ │ ORCHESTRATION LAYER │ │
│ │ │ │
│ │ ┌──────────────┐ ┌──────────────┐ │ │
│ │ │ Strategy │ │ Risk │ │ │
│ │ │ Engine │ │ Manager │ │ │
│ │ │ (Singleton) │ │ (Singleton) │ │ │
│ │ └──────┬───────┘ └──────┬───────┘ │ │
│ │ │ │ │ │
│ │ └────────┬────────┘ │ │
│ │ │ │ │
│ └──────────────────┼──────────────────────────────┘ │
│ │ │
│ ┌──────────────────▼──────────────────────────────┐ │
│ │ CORE SERVICES │ │
│ │ │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │ Data │ │ ML │ │ Order │ │ │
│ │ │ Service │ │ Engine │ │ Manager │ │ │
│ │ └────┬─────┘ └────┬─────┘ └────┬─────┘ │ │
│ │ │ │ │ │ │
│ └───────┼─────────────┼─────────────┼─────────────┘ │
│ │ │ │ │
│ ┌───────▼─────────────▼─────────────▼─────────────┐ │
│ │ DATA & INTEGRATION LAYER │ │
│ │ │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │ Market │ │ IG │ │ Cache │ │ │
│ │ │ Data │ │ API │ │ (Redis) │ │ │
│ │ │ Sources │ │Connector │ │ │ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ │ │
│ │ │ │
│ └──────────────────────────────────────────────────┘ │
│ │
│ ┌──────────────────────────────────────────────────┐ │
│ │ PERSISTENCE LAYER │ │
│ │ │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │PostgreSQL│ │ InfluxDB │ │ File │ │ │
│ │ │(Trades, │ │(Metrics, │ │ System │ │ │
│ │ │Positions)│ │TimeSeries│ │ (Logs) │ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ │ │
│ └──────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
🔧 Modules Core
1. Strategy Engine
Responsabilité : Orchestration des stratégies de trading
# src/core/strategy_engine.py
class StrategyEngine:
"""
Moteur central de gestion des stratégies
Responsabilités:
- Charger et initialiser stratégies
- Distribuer données marché
- Collecter signaux
- Coordonner exécution
"""
def __init__(self):
self.strategies: Dict[str, BaseStrategy] = {}
self.active_signals: List[Signal] = []
self.risk_manager = RiskManager()
self.order_manager = OrderManager()
async def run(self):
"""Boucle principale"""
while True:
# 1. Récupérer données marché
market_data = await self.fetch_market_data()
# 2. Analyser avec chaque stratégie
signals = await self.analyze_strategies(market_data)
# 3. Filtrer avec risk manager
valid_signals = self.risk_manager.filter_signals(signals)
# 4. Exécuter signaux valides
await self.execute_signals(valid_signals)
# 5. Mettre à jour positions
await self.update_positions()
# 6. Sleep jusqu'à prochaine itération
await asyncio.sleep(self.interval)
2. Risk Manager (Singleton)
Responsabilité : Gestion centralisée du risque
# src/core/risk_manager.py
class RiskManager:
"""
Singleton Risk Manager
Garantit:
- Une seule instance
- État global cohérent
- Thread-safe
"""
_instance = None
_lock = threading.Lock()
def __new__(cls):
if cls._instance is None:
with cls._lock:
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
def __init__(self):
if not hasattr(self, 'initialized'):
self.initialized = True
self.positions = {}
self.portfolio_value = 0.0
self.peak_value = 0.0
# ... autres attributs
3. Data Service
Responsabilité : Abstraction des sources de données
# src/data/data_service.py
class DataService:
"""
Service unifié d'accès aux données
Features:
- Multi-source avec failover
- Cache intelligent
- Validation données
- Rate limiting
"""
def __init__(self):
self.sources = self._initialize_sources()
self.cache = RedisCache()
self.validator = DataValidator()
async def get_market_data(
self,
symbol: str,
timeframe: str,
start: datetime,
end: datetime
) -> pd.DataFrame:
"""
Récupère données marché avec failover
"""
# 1. Check cache
cached = await self.cache.get(symbol, timeframe, start, end)
if cached:
return cached
# 2. Essayer sources par priorité
for source in self.sources:
try:
data = await source.fetch(symbol, timeframe, start, end)
# 3. Valider
if self.validator.validate(data):
# 4. Cache
await self.cache.set(symbol, timeframe, data)
return data
except Exception as e:
logger.warning(f"Source {source.name} failed: {e}")
continue
raise DataUnavailableError("All sources failed")
4. ML Engine
Responsabilité : Intelligence artificielle adaptative
# src/ml/ml_engine.py
class MLEngine:
"""
Moteur ML adaptatif
Features:
- Ensemble de modèles
- Auto-retraining
- Parameter optimization
- Regime detection
"""
def __init__(self):
self.models = self._initialize_models()
self.optimizer = OptunaOptimizer()
self.regime_detector = RegimeDetector()
async def predict(self, features: pd.DataFrame) -> Dict:
"""
Prédiction avec ensemble
"""
# 1. Détecter régime
regime = self.regime_detector.detect(features)
# 2. Sélectionner modèles selon régime
active_models = self._select_models(regime)
# 3. Prédictions individuelles
predictions = []
for model in active_models:
pred = model.predict(features)
predictions.append(pred)
# 4. Agrégation (stacking)
final_prediction = self._aggregate(predictions)
return {
'prediction': final_prediction,
'confidence': self._calculate_confidence(predictions),
'regime': regime
}
async def optimize_daily(self):
"""
Optimisation quotidienne des paramètres
"""
# 1. Récupérer performance récente
recent_performance = self._get_recent_performance()
# 2. Détecter drift
if self._detect_drift(recent_performance):
# 3. Lancer optimisation Optuna
new_params = await self.optimizer.optimize()
# 4. Valider avec backtesting
if self._validate_params(new_params):
# 5. Appliquer nouveaux paramètres
self._update_parameters(new_params)
🔄 Flux de Données
Flux Principal (Trading Loop)
┌─────────────────────────────────────────────────────────────┐
│ TRADING LOOP (60s) │
├─────────────────────────────────────────────────────────────┤
│ │
│ 1. FETCH MARKET DATA │
│ ├─ Check cache │
│ ├─ Fetch from sources (failover) │
│ └─ Validate & store │
│ │
│ 2. ANALYZE STRATEGIES │
│ ├─ Calculate indicators │
│ ├─ ML predictions │
│ ├─ Generate signals │
│ └─ Calculate confidence │
│ │
│ 3. RISK VALIDATION │
│ ├─ Check portfolio risk │
│ ├─ Validate position size │
│ ├─ Check correlation │
│ ├─ Verify margin │
│ └─ Circuit breakers │
│ │
│ 4. ORDER EXECUTION │
│ ├─ Place orders (IG API) │
│ ├─ Confirm execution │
│ └─ Update positions │
│ │
│ 5. MONITORING │
│ ├─ Update metrics │
│ ├─ Check alerts │
│ └─ Log events │
│ │
│ 6. SLEEP (until next iteration) │
│ │
└─────────────────────────────────────────────────────────────┘
Flux Optimisation (Daily)
┌─────────────────────────────────────────────────────────────┐
│ OPTIMIZATION LOOP (Daily 00:00) │
├─────────────────────────────────────────────────────────────┤
│ │
│ 1. COLLECT PERFORMANCE DATA │
│ ├─ Last 30 days trades │
│ ├─ Calculate metrics │
│ └─ Detect drift │
│ │
│ 2. PARAMETER OPTIMIZATION (if drift detected) │
│ ├─ Define search space │
│ ├─ Run Optuna (Bayesian) │
│ ├─ Backtest candidates │
│ └─ Select best parameters │
│ │
│ 3. VALIDATION │
│ ├─ Walk-forward analysis │
│ ├─ Monte Carlo simulation │
│ └─ Out-of-sample test │
│ │
│ 4. A/B TESTING │
│ ├─ Deploy variant in paper trading │
│ ├─ Monitor 7 days │
│ └─ Compare vs control │
│ │
│ 5. DEPLOYMENT (if validated) │
│ ├─ Update strategy parameters │
│ ├─ Retrain ML models │
│ └─ Notify operators │
│ │
└─────────────────────────────────────────────────────────────┘
🎨 Patterns et Principes
Design Patterns Utilisés
1. Singleton (Risk Manager)
class RiskManager:
_instance = None
_lock = threading.Lock()
def __new__(cls):
if cls._instance is None:
with cls._lock:
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
Pourquoi : Garantir une seule instance pour état global cohérent
2. Strategy Pattern (Stratégies de Trading)
class BaseStrategy(ABC):
@abstractmethod
def analyze(self, data):
pass
class ScalpingStrategy(BaseStrategy):
def analyze(self, data):
# Implémentation scalping
pass
class IntradayStrategy(BaseStrategy):
def analyze(self, data):
# Implémentation intraday
pass
Pourquoi : Facilite ajout de nouvelles stratégies
3. Observer Pattern (Events)
class EventBus:
def __init__(self):
self.subscribers = {}
def subscribe(self, event_type, callback):
if event_type not in self.subscribers:
self.subscribers[event_type] = []
self.subscribers[event_type].append(callback)
def publish(self, event_type, data):
for callback in self.subscribers.get(event_type, []):
callback(data)
# Usage
event_bus.subscribe('trade_executed', log_trade)
event_bus.subscribe('trade_executed', update_metrics)
event_bus.publish('trade_executed', trade_data)
Pourquoi : Découplage entre composants
4. Factory Pattern (Création Stratégies)
class StrategyFactory:
@staticmethod
def create(strategy_type: str, config: Dict) -> BaseStrategy:
if strategy_type == 'scalping':
return ScalpingStrategy(config)
elif strategy_type == 'intraday':
return IntradayStrategy(config)
elif strategy_type == 'swing':
return SwingStrategy(config)
else:
raise ValueError(f"Unknown strategy: {strategy_type}")
Pourquoi : Centralise logique de création
Principes SOLID
- Single Responsibility : Chaque classe une responsabilité
- Open/Closed : Ouvert extension, fermé modification
- Liskov Substitution : Sous-classes substituables
- Interface Segregation : Interfaces spécifiques
- Dependency Inversion : Dépendre d'abstractions
🔒 Sécurité
Niveaux de Sécurité
┌─────────────────────────────────────────────────────────────┐
│ SECURITY LAYERS │
├─────────────────────────────────────────────────────────────┤
│ │
│ LAYER 1: Authentication & Authorization │
│ ├─ API Key management │
│ ├─ OAuth 2.0 (IG Markets) │
│ └─ Role-based access control │
│ │
│ LAYER 2: Data Encryption │
│ ├─ Credentials encrypted at rest │
│ ├─ TLS/SSL for API calls │
│ └─ Database encryption │
│ │
│ LAYER 3: Input Validation │
│ ├─ Pydantic models │
│ ├─ SQL injection prevention │
│ └─ XSS protection │
│ │
│ LAYER 4: Rate Limiting │
│ ├─ API rate limiting │
│ ├─ Brute force protection │
│ └─ DDoS mitigation │
│ │
│ LAYER 5: Audit & Monitoring │
│ ├─ All actions logged │
│ ├─ Anomaly detection │
│ └─ Security alerts │
│ │
└─────────────────────────────────────────────────────────────┘
📈 Scalabilité
Stratégies de Scaling
Horizontal Scaling
┌─────────────────────────────────────────────────────────────┐
│ HORIZONTAL SCALING STRATEGY │
├─────────────────────────────────────────────────────────────┤
│ │
│ Load Balancer │
│ │ │
│ ├─── Instance 1 (Scalping) │
│ ├─── Instance 2 (Intraday) │
│ └─── Instance 3 (Swing) │
│ │
│ Shared: │
│ ├─ Redis (Cache) │
│ ├─ PostgreSQL (Positions) │
│ └─ Message Queue (RabbitMQ) │
│ │
└─────────────────────────────────────────────────────────────┘
Vertical Scaling
- Augmenter RAM pour cache plus large
- Plus de CPU cores pour ML parallèle
- SSD NVMe pour I/O rapide
Structure Réelle du Code (2026-03-08)
src/
├── api/
│ ├── app.py # FastAPI lifespan (init DB + RiskManager)
│ └── routers/
│ ├── health.py # GET /health, /ready
│ └── trading.py # Toutes les routes trading
│
├── core/
│ ├── risk_manager.py # Singleton VaR/CVaR/circuit breakers
│ ├── notifications.py # Telegram + Email
│ └── strategy_engine.py # Orchestration des stratégies
│
├── data/
│ ├── data_service.py # Agrégation des sources
│ ├── yahoo_finance_connector.py
│ ├── alpha_vantage_connector.py
│ ├── data_validator.py
│ └── base_data_source.py
│
├── db/
│ ├── models.py # Trade, OHLCVData, BacktestResult, MLModelMeta
│ └── session.py # SQLAlchemy engine, get_db(), init_db()
│
├── ml/
│ ├── ml_engine.py # MLEngine (intégré à StrategyEngine)
│ ├── regime_detector.py # HMM — 3 régimes (trend/range/volatile)
│ ├── feature_engineering.py # FeatureEngineering (50+ features)
│ ├── parameter_optimizer.py # Optuna TPE Sampler + walk-forward
│ ├── walk_forward.py # WalkForwardAnalyzer
│ ├── position_sizing.py # Kelly Criterion
│ ├── service.py # Microservice ML FastAPI (port 8200)
│ ├── ml_strategy_model.py # [NOUVEAU] XGBoost/LightGBM sur features TA
│ └── features/
│ ├── technical_features.py # [NOUVEAU] TechnicalFeatureBuilder (~50 features)
│ └── label_generator.py # [NOUVEAU] Labels LONG/SHORT/NEUTRAL
│
├── strategies/
│ ├── base_strategy.py # ABC + Signal + StrategyConfig
│ ├── scalping/
│ │ └── scalping_strategy.py # BB + RSI + MACD + ATR
│ ├── intraday/
│ │ └── intraday_strategy.py
│ ├── swing/
│ │ └── swing_strategy.py
│ └── ml_driven/ # [NOUVEAU]
│ └── ml_strategy.py # MLDrivenStrategy — XGBoost pilote les signaux
│
├── backtesting/
│ ├── backtest_engine.py
│ ├── paper_trading.py
│ └── metrics_calculator.py
│
└── ui/
├── dashboard.py # Streamlit — 5 onglets
├── api_client.py # Client httpx vers trading-api
└── pages/
├── live_trading.py
├── ml_monitor.py
└── analytics.py # Monte Carlo
models/
└── ml_strategy/ # [NOUVEAU] Modèles ML-Driven sauvegardés
├── EURUSD_1h_xgboost.joblib
└── EURUSD_1h_xgboost_meta.json
Architecture complète et évolutive !