"""Backtest API routes: run a backtest, list strategies/filters, read history."""

from typing import Optional

from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel, Field
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.core.backtester import Backtester
from app.core.database import get_db
from app.core.strategy.order_block_sweep import OrderBlockSweepParams, OrderBlockSweepStrategy
from app.core.strategy.htf_trend_filter import HTFTrendFilter, HTFTrendFilterParams, HTF_MAP
from app.models.backtest_result import BacktestResult
from app.services.market_data import MarketDataService

router = APIRouter(prefix="/backtest", tags=["backtest"])

# Minimum number of candles required before a backtest is attempted.
_MIN_CANDLES = 150
# Number of higher-timeframe candles fetched for the trend filter.
_HTF_CANDLE_COUNT = 200
# Maximum number of rows returned by the history endpoint.
_HISTORY_LIMIT = 20


class BacktestRequest(BaseModel):
    """Request body for ``POST /backtest``."""

    instrument: str = Field(default="EUR_USD")
    granularity: str = Field(default="H1")
    # NOTE(review): validation accepts candle_count from 100, but run_backtest
    # rejects any dataset shorter than 150 candles — confirm whether the lower
    # bound here should be raised to match.
    candle_count: int = Field(default=500, ge=100, le=5000)
    initial_balance: float = Field(default=10_000.0, gt=0)
    risk_percent: float = Field(default=1.0, gt=0, le=10)
    rr_ratio: float = Field(default=2.0, ge=1.0, le=10.0)
    swing_strength: int = Field(default=5, ge=2, le=20)
    liquidity_tolerance_pips: float = Field(default=2.0, ge=0.5)
    spread_pips: float = Field(default=1.5, ge=0)
    # Strategy and filter selection
    strategies: list[str] = Field(default=["order_block_sweep"])
    filters: list[str] = Field(default=[])
    htf_ema_fast: int = Field(default=20, ge=5, le=100)
    htf_ema_slow: int = Field(default=50, ge=10, le=200)


def _build_strategy(req: BacktestRequest) -> OrderBlockSweepStrategy:
    """Instantiate the strategy selected in *req*.

    Only ``"order_block_sweep"`` is recognised, and a single strategy is
    supported for now.

    Raises:
        HTTPException: 400 when no recognised strategy is selected.
    """
    if "order_block_sweep" not in req.strategies:
        raise HTTPException(400, "Au moins une stratégie doit être sélectionnée")

    params = OrderBlockSweepParams(
        swing_strength=req.swing_strength,
        liquidity_tolerance_pips=req.liquidity_tolerance_pips,
        rr_ratio=req.rr_ratio,
    )
    return OrderBlockSweepStrategy(params)


async def _build_htf_filter(
    service: MarketDataService,
    req: BacktestRequest,
) -> tuple[HTFTrendFilter, dict]:
    """Fetch higher-timeframe candles and build the HTF trend filter.

    The higher timeframe is looked up in ``HTF_MAP``; when the request's
    granularity has no entry, the granularity itself is used.

    Returns:
        The filter instance and a summary dict (granularity, trend, EMA
        periods) that is stored with the result and echoed in the response.

    Raises:
        HTTPException: 502 when fetching the higher-timeframe candles fails.
    """
    htf_gran = HTF_MAP.get(req.granularity, req.granularity)
    try:
        htf_df = await service.get_candles(req.instrument, htf_gran, _HTF_CANDLE_COUNT)
    except Exception as e:
        raise HTTPException(502, f"Erreur fetch données HTF ({htf_gran}): {e}") from e

    htf_filter = HTFTrendFilter(
        HTFTrendFilterParams(
            ema_fast=req.htf_ema_fast,
            ema_slow=req.htf_ema_slow,
        )
    )
    trend = htf_filter.determine_trend(htf_df)
    info = {
        "htf_granularity": htf_gran,
        "trend": trend,
        "ema_fast": req.htf_ema_fast,
        "ema_slow": req.htf_ema_slow,
    }
    return htf_filter, info


def _serialize_trade(t) -> dict:
    """Convert one trade from the backtest metrics into a response dict."""
    return {
        "direction": t.direction,
        "entry_price": t.entry_price,
        "exit_price": t.exit_price,
        "stop_loss": t.stop_loss,
        "take_profit": t.take_profit,
        "entry_time": str(t.entry_time),
        "exit_time": str(t.exit_time) if t.exit_time else None,
        "pnl_pips": t.pnl_pips,
        "status": t.status,
        "signal_type": t.signal_type,
        "reason": t.reason,
        "order_block": t.order_block,
        "liquidity_level": t.liquidity_level,
    }


def _format_metrics(metrics) -> dict:
    """Build the rounded metrics section of the backtest response."""
    return {
        "initial_balance": metrics.initial_balance,
        "final_balance": round(metrics.final_balance, 2),
        "total_pnl": round(metrics.total_pnl, 2),
        "total_pnl_pct": round(metrics.total_pnl_pct, 2),
        "total_trades": metrics.total_trades,
        "winning_trades": metrics.winning_trades,
        "losing_trades": metrics.losing_trades,
        "win_rate": round(metrics.win_rate, 1),
        "avg_win_pips": round(metrics.avg_win_pips, 1),
        "avg_loss_pips": round(metrics.avg_loss_pips, 1),
        "expectancy": round(metrics.expectancy, 2),
        "max_drawdown": round(metrics.max_drawdown, 2),
        "max_drawdown_pct": round(metrics.max_drawdown_pct, 2),
        "sharpe_ratio": round(metrics.sharpe_ratio, 3),
        "profit_factor": round(metrics.profit_factor, 2),
    }


@router.post("")
async def run_backtest(
    req: BacktestRequest,
    db: AsyncSession = Depends(get_db),
):
    """Run a backtest, persist its result, and return the full report.

    Steps: fetch candles, build the selected strategy and optional filters,
    run the backtester, store a ``BacktestResult`` row, then return metrics,
    trades, and the candles used (for the frontend chart).

    Raises:
        HTTPException: 502 when fetching candles fails; 400 when fewer than
            150 candles are available or no recognised strategy is selected.
    """
    service = MarketDataService(db)

    # Fetch historical data (yfinance + TwelveData + DB cache)
    try:
        df = await service.get_candles(req.instrument, req.granularity, req.candle_count)
    except Exception as e:
        raise HTTPException(502, f"Erreur fetch données: {e}") from e

    if len(df) < _MIN_CANDLES:
        raise HTTPException(400, f"Pas assez de données (min {_MIN_CANDLES} bougies)")

    # Build the strategy from the selection
    strategy = _build_strategy(req)

    # Build the filters
    active_filters = []
    htf_trend_info: Optional[dict] = None
    if "htf_trend_filter" in req.filters:
        htf_filter, htf_trend_info = await _build_htf_filter(service, req)
        active_filters.append(htf_filter)

    # Run the backtest
    backtester = Backtester(
        strategy=strategy,
        initial_balance=req.initial_balance,
        risk_percent=req.risk_percent,
        spread_pips=req.spread_pips,
        filters=active_filters,
    )
    metrics = backtester.run(df)

    # Persist to the database
    strategy_params = strategy.get_params()
    if htf_trend_info:
        # Build a new dict rather than writing into whatever get_params()
        # returned, so the strategy's own parameters are never mutated.
        strategy_params = {**strategy_params, "htf_trend_filter": htf_trend_info}

    start_time = df.iloc[0]["time"]
    end_time = df.iloc[-1]["time"]

    result = BacktestResult(
        instrument=req.instrument,
        granularity=req.granularity,
        start_date=start_time,
        end_date=end_time,
        initial_balance=metrics.initial_balance,
        final_balance=metrics.final_balance,
        total_pnl=metrics.total_pnl,
        total_trades=metrics.total_trades,
        winning_trades=metrics.winning_trades,
        losing_trades=metrics.losing_trades,
        win_rate=metrics.win_rate,
        max_drawdown=metrics.max_drawdown,
        sharpe_ratio=metrics.sharpe_ratio,
        expectancy=metrics.expectancy,
        equity_curve=metrics.equity_curve,
        strategy_params=strategy_params,
    )
    db.add(result)
    await db.commit()
    # Refresh so that result.id is populated for the response.
    await db.refresh(result)

    # Format the trades for the response
    trades_out = [_serialize_trade(t) for t in metrics.trades]

    # Candles used for the backtest (for the frontend chart)
    candles_out = df[["time", "open", "high", "low", "close", "volume"]].copy()
    candles_out["time"] = candles_out["time"].astype(str)
    candles_list = candles_out.to_dict(orient="records")

    return {
        "backtest_id": result.id,
        "instrument": req.instrument,
        "granularity": req.granularity,
        "period": {"start": str(start_time), "end": str(end_time)},
        "metrics": _format_metrics(metrics),
        "equity_curve": metrics.equity_curve,
        "trades": trades_out,
        "candles": candles_list,
        "order_blocks": metrics.all_order_blocks,
        "liquidity_levels": metrics.all_liquidity_levels,
        "htf_trend": htf_trend_info,
    }


@router.get("/strategies")
async def list_strategies():
    """Return the available strategies and filters."""
    return {
        "strategies": [
            {
                "name": "order_block_sweep",
                "label": "Order Block + Liquidity Sweep",
                "description": "Stratégie ICT : détection Equal H/L, sweep et entrée sur OB",
            },
        ],
        "filters": [
            {
                "name": "htf_trend_filter",
                "label": "Filtre tendance HTF",
                "description": "Ne prendre que les trades dans le sens de la tendance du timeframe supérieur (EMA crossover)",
            },
        ],
    }


@router.get("/history")
async def get_backtest_history(db: AsyncSession = Depends(get_db)):
    """Return a summary of the most recent backtests, newest first (20 at most)."""
    stmt = (
        select(BacktestResult)
        .order_by(BacktestResult.created_at.desc())
        .limit(_HISTORY_LIMIT)
    )
    result = await db.execute(stmt)
    rows = result.scalars().all()
    return [
        {
            "id": r.id,
            "instrument": r.instrument,
            "granularity": r.granularity,
            "total_pnl": r.total_pnl,
            "win_rate": r.win_rate,
            "total_trades": r.total_trades,
            "sharpe_ratio": r.sharpe_ratio,
            "created_at": str(r.created_at),
        }
        for r in rows
    ]