feat: trading bot MVP — ICT Order Block + Liquidity Sweep strategy

Full-stack trading bot with:
- FastAPI backend with ICT strategy (Order Block + Liquidity Sweep detection)
- Backtester engine with rolling window, spread simulation, and performance metrics
- Hybrid market data service (yfinance + TwelveData with rate limiting + SQLite cache)
- Simulated exchange for paper trading
- React/TypeScript frontend with TradingView lightweight-charts v5
- Live dashboard with candlestick chart, OHLC legend, trade markers
- Backtest page with configurable parameters, equity curve, and trade table
- WebSocket support for real-time updates
- Bot runner with asyncio loop for automated trading

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-02-24 23:25:51 +01:00
commit 4df8d53b1a
58 changed files with 7484 additions and 0 deletions

View File

View File

@@ -0,0 +1,66 @@
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Optional
import pandas as pd
@dataclass
class OrderBlockZone:
"""Zone Order Block à surveiller."""
id: str
direction: str # "bullish" | "bearish"
top: float # Haut de la zone
bottom: float # Bas de la zone
origin_time: pd.Timestamp
mitigated: bool = False # True si le prix a traversé la zone
@dataclass
class LiquidityLevel:
"""Niveau de liquidité (Equal H/L)."""
id: str
direction: str # "high" | "low"
price: float
origin_time: pd.Timestamp
swept: bool = False # True si le prix a dépassé ce niveau
@dataclass
class TradeSignal:
"""Signal de trading généré par la stratégie."""
direction: str # "buy" | "sell"
entry_price: float
stop_loss: float
take_profit: float
signal_type: str # description du setup
time: pd.Timestamp
order_block: Optional[OrderBlockZone] = None
liquidity_level: Optional[LiquidityLevel] = None
@dataclass
class AnalysisResult:
"""Résultat complet de l'analyse de la stratégie."""
order_blocks: list[OrderBlockZone] = field(default_factory=list)
liquidity_levels: list[LiquidityLevel] = field(default_factory=list)
signals: list[TradeSignal] = field(default_factory=list)
class AbstractStrategy(ABC):
"""Interface commune pour toutes les stratégies."""
@abstractmethod
def analyze(self, df: pd.DataFrame) -> AnalysisResult:
"""
Analyse un DataFrame de candles et retourne les zones,
niveaux de liquidité et signaux d'entrée.
df doit avoir les colonnes : time, open, high, low, close, volume
"""
...
@abstractmethod
def get_params(self) -> dict:
"""Retourne les paramètres configurables de la stratégie."""
...

View File

@@ -0,0 +1,381 @@
"""
Stratégie ICT — Order Block + Liquidity Sweep
Logique :
1. Détecter les swing highs/lows (N bougies de chaque côté)
2. Identifier les niveaux de liquidité : Equal Highs (EQH) / Equal Lows (EQL)
- Deux swing H/L proches dans une tolérance de X pips = pool de liquidité
3. Détecter un Liquidity Sweep :
- Le wick d'une bougie dépasse un EQH/EQL, mais la bougie close de l'autre côté
→ Confirmation que la liquidité a été absorbée (stop hunt)
4. Identifier l'Order Block dans la direction du renversement :
- Bullish OB : dernière bougie bearish (close < open) avant le mouvement impulsif haussier
- Bearish OB : dernière bougie bullish (close > open) avant le mouvement impulsif bearish
5. Signal d'entrée : le prix revient dans la zone OB après le sweep
6. SL : en dessous du bas de l'OB (buy) ou au-dessus du haut (sell)
7. TP : niveau de liquidité opposé ou R:R fixe
"""
from dataclasses import dataclass
from typing import Optional
import pandas as pd
from app.core.strategy.base import (
AbstractStrategy,
AnalysisResult,
LiquidityLevel,
OrderBlockZone,
TradeSignal,
)
@dataclass
class OrderBlockSweepParams:
swing_strength: int = 5 # N bougies de chaque côté pour valider un swing
liquidity_tolerance_pips: float = 2.0 # Tolérance en pips pour Equal H/L
pip_value: float = 0.0001 # Valeur d'un pip (0.0001 pour Forex, 0.01 pour JPY)
min_impulse_candles: int = 3 # Nombre min de bougies dans l'impulsion
min_impulse_factor: float = 1.5 # Taille impulsion vs bougie moyenne
rr_ratio: float = 2.0 # Ratio Risk/Reward pour le TP
class OrderBlockSweepStrategy(AbstractStrategy):
def __init__(self, params: Optional[OrderBlockSweepParams] = None) -> None:
self.params = params or OrderBlockSweepParams()
def get_params(self) -> dict:
return {
"swing_strength": self.params.swing_strength,
"liquidity_tolerance_pips": self.params.liquidity_tolerance_pips,
"pip_value": self.params.pip_value,
"min_impulse_candles": self.params.min_impulse_candles,
"min_impulse_factor": self.params.min_impulse_factor,
"rr_ratio": self.params.rr_ratio,
}
def analyze(self, df: pd.DataFrame) -> AnalysisResult:
if len(df) < self.params.swing_strength * 2 + 5:
return AnalysisResult()
df = df.copy().reset_index(drop=True)
swings = self._detect_swings(df)
liquidity_levels = self._detect_liquidity(df, swings)
order_blocks = self._detect_order_blocks(df)
signals = self._detect_signals(df, liquidity_levels, order_blocks)
return AnalysisResult(
order_blocks=order_blocks,
liquidity_levels=liquidity_levels,
signals=signals,
)
# ─── Swing Detection ──────────────────────────────────────────────────────
def _detect_swings(self, df: pd.DataFrame) -> pd.DataFrame:
"""Retourne un DataFrame avec colonnes swing_high et swing_low (bool)."""
n = self.params.swing_strength
highs = df["high"].values
lows = df["low"].values
swing_high = [False] * len(df)
swing_low = [False] * len(df)
for i in range(n, len(df) - n):
# Swing High : plus haut que les N bougies avant et après
if all(highs[i] > highs[i - j] for j in range(1, n + 1)) and \
all(highs[i] > highs[i + j] for j in range(1, n + 1)):
swing_high[i] = True
# Swing Low : plus bas que les N bougies avant et après
if all(lows[i] < lows[i - j] for j in range(1, n + 1)) and \
all(lows[i] < lows[i + j] for j in range(1, n + 1)):
swing_low[i] = True
df = df.copy()
df["swing_high"] = swing_high
df["swing_low"] = swing_low
return df
# ─── Liquidity Detection ──────────────────────────────────────────────────
def _detect_liquidity(
self, df: pd.DataFrame, swings: pd.DataFrame
) -> list[LiquidityLevel]:
"""
Identifie les Equal Highs (EQH) et Equal Lows (EQL).
Deux swing H/L sont "égaux" si leur différence est < tolerance_pips.
"""
tol = self.params.liquidity_tolerance_pips * self.params.pip_value
levels: list[LiquidityLevel] = []
swing_highs = swings[swings["swing_high"]].copy()
swing_lows = swings[swings["swing_low"]].copy()
# Equal Highs
sh_prices = swing_highs["high"].values
sh_times = swing_highs["time"].values
for i in range(len(sh_prices)):
for j in range(i + 1, len(sh_prices)):
if abs(sh_prices[i] - sh_prices[j]) <= tol:
level_price = (sh_prices[i] + sh_prices[j]) / 2
# Vérifier si déjà sweepé dans les données actuelles
swept = self._is_level_swept(df, level_price, "high", sh_times[j])
levels.append(LiquidityLevel(
id=f"EQH_{i}_{j}",
direction="high",
price=level_price,
origin_time=pd.Timestamp(sh_times[j]),
swept=swept,
))
# Equal Lows
sl_prices = swing_lows["low"].values
sl_times = swing_lows["time"].values
for i in range(len(sl_prices)):
for j in range(i + 1, len(sl_prices)):
if abs(sl_prices[i] - sl_prices[j]) <= tol:
level_price = (sl_prices[i] + sl_prices[j]) / 2
swept = self._is_level_swept(df, level_price, "low", sl_times[j])
levels.append(LiquidityLevel(
id=f"EQL_{i}_{j}",
direction="low",
price=level_price,
origin_time=pd.Timestamp(sl_times[j]),
swept=swept,
))
return levels
def _is_level_swept(
self,
df: pd.DataFrame,
price: float,
direction: str,
after_time,
) -> bool:
"""Vérifie si un niveau a été sweepé après sa formation."""
mask = df["time"] > pd.Timestamp(after_time)
future = df[mask]
if future.empty:
return False
if direction == "high":
return bool((future["high"] > price).any())
return bool((future["low"] < price).any())
# ─── Order Block Detection ────────────────────────────────────────────────
def _detect_order_blocks(self, df: pd.DataFrame) -> list[OrderBlockZone]:
"""
Détecte les Order Blocks :
- Bullish OB : dernière bougie bearish avant une impulsion haussière significative
- Bearish OB : dernière bougie bullish avant une impulsion bearish significative
"""
blocks: list[OrderBlockZone] = []
min_imp = self.params.min_impulse_candles
factor = self.params.min_impulse_factor
avg_body = (df["close"] - df["open"]).abs().mean()
for i in range(1, len(df) - min_imp):
# Chercher une impulsion haussière après la bougie i
impulse_up = self._is_impulse(df, i, "up", min_imp, factor, avg_body)
if impulse_up:
# Chercher la dernière bougie bearish avant i (inclus)
for k in range(i, max(0, i - 10) - 1, -1):
if df.loc[k, "close"] < df.loc[k, "open"]: # bearish
mitigated = self._is_ob_mitigated(df, k, "bullish", i + min_imp)
blocks.append(OrderBlockZone(
id=f"BullishOB_{k}",
direction="bullish",
top=df.loc[k, "open"], # top = open de la bougie bearish
bottom=df.loc[k, "low"],
origin_time=df.loc[k, "time"],
mitigated=mitigated,
))
break
# Chercher une impulsion bearish après la bougie i
impulse_down = self._is_impulse(df, i, "down", min_imp, factor, avg_body)
if impulse_down:
for k in range(i, max(0, i - 10) - 1, -1):
if df.loc[k, "close"] > df.loc[k, "open"]: # bullish
mitigated = self._is_ob_mitigated(df, k, "bearish", i + min_imp)
blocks.append(OrderBlockZone(
id=f"BearishOB_{k}",
direction="bearish",
top=df.loc[k, "high"],
bottom=df.loc[k, "open"], # bottom = open de la bougie bullish
origin_time=df.loc[k, "time"],
mitigated=mitigated,
))
break
# Dédupliquer (garder le plus récent par zone)
return self._deduplicate_obs(blocks)
def _is_impulse(
self,
df: pd.DataFrame,
start: int,
direction: str,
min_candles: int,
factor: float,
avg_body: float,
) -> bool:
"""Vérifie si une impulsion directionnelle commence à l'index start."""
end = min(start + min_candles, len(df))
segment = df.iloc[start:end]
if len(segment) < min_candles:
return False
total_move = abs(segment.iloc[-1]["close"] - segment.iloc[0]["open"])
if total_move < avg_body * factor:
return False
if direction == "up":
return segment.iloc[-1]["close"] > segment.iloc[0]["open"]
return segment.iloc[-1]["close"] < segment.iloc[0]["open"]
def _is_ob_mitigated(
self,
df: pd.DataFrame,
ob_idx: int,
direction: str,
after_idx: int,
) -> bool:
"""Vérifie si un OB a été mitié (prix est revenu dans la zone)."""
top = df.loc[ob_idx, "open"] if direction == "bullish" else df.loc[ob_idx, "high"]
bottom = df.loc[ob_idx, "low"] if direction == "bullish" else df.loc[ob_idx, "open"]
future = df.iloc[after_idx:]
if direction == "bullish":
return bool(((future["low"] <= top) & (future["low"] >= bottom)).any())
return bool(((future["high"] >= bottom) & (future["high"] <= top)).any())
def _deduplicate_obs(self, blocks: list[OrderBlockZone]) -> list[OrderBlockZone]:
"""Supprime les OB en double (même direction et zone proche)."""
seen_ids: set[str] = set()
unique: list[OrderBlockZone] = []
for b in blocks:
if b.id not in seen_ids:
seen_ids.add(b.id)
unique.append(b)
return unique
# ─── Signal Detection ─────────────────────────────────────────────────────
def _detect_signals(
self,
df: pd.DataFrame,
liquidity_levels: list[LiquidityLevel],
order_blocks: list[OrderBlockZone],
) -> list[TradeSignal]:
"""
Génère des signaux quand :
1. Un niveau de liquidité est sweepé (wick dépasse + close de l'autre côté)
2. Le prix revient dans un OB de direction opposée au sweep
"""
signals: list[TradeSignal] = []
# Travailler sur les 50 dernières bougies pour les signaux récents
lookback = df.tail(50).copy()
if len(lookback) < 2:
return signals
for level in liquidity_levels:
if level.swept:
continue
for i in range(1, len(lookback)):
candle = lookback.iloc[i]
# Sweep d'un Equal High → signal SELL potentiel
if level.direction == "high":
sweep = (
candle["high"] > level.price and # wick dépasse
candle["close"] < level.price # close en dessous
)
if sweep:
signal = self._find_entry_signal(
df=lookback,
sweep_idx=i,
direction="sell",
swept_level=level,
order_blocks=order_blocks,
)
if signal:
signals.append(signal)
break # Un seul signal par niveau
# Sweep d'un Equal Low → signal BUY potentiel
elif level.direction == "low":
sweep = (
candle["low"] < level.price and # wick dépasse
candle["close"] > level.price # close au-dessus
)
if sweep:
signal = self._find_entry_signal(
df=lookback,
sweep_idx=i,
direction="buy",
swept_level=level,
order_blocks=order_blocks,
)
if signal:
signals.append(signal)
break # Un seul signal par niveau
return signals
def _find_entry_signal(
self,
df: pd.DataFrame,
sweep_idx: int,
direction: str,
swept_level: LiquidityLevel,
order_blocks: list[OrderBlockZone],
) -> Optional[TradeSignal]:
"""
Cherche un OB valide dans la direction du signal après le sweep.
"""
sweep_time = df.iloc[sweep_idx]["time"]
sweep_price = df.iloc[sweep_idx]["close"]
# Chercher un OB non mitié dans la bonne direction
ob_direction = "bullish" if direction == "buy" else "bearish"
candidate_obs = [
ob for ob in order_blocks
if ob.direction == ob_direction
and not ob.mitigated
and ob.origin_time <= sweep_time
]
if not candidate_obs:
return None
# Prendre l'OB le plus récent
ob = max(candidate_obs, key=lambda x: x.origin_time)
# Vérifier que le prix actuel est proche de l'OB ou dans la zone
if direction == "buy":
# Le prix doit être au-dessus ou proche du bas de l'OB
if sweep_price < ob.bottom * 0.998: # trop loin en dessous
return None
entry = ob.top
sl = ob.bottom - 2 * self.params.pip_value
tp = entry + (entry - sl) * self.params.rr_ratio
else:
if sweep_price > ob.top * 1.002:
return None
entry = ob.bottom
sl = ob.top + 2 * self.params.pip_value
tp = entry - (sl - entry) * self.params.rr_ratio
return TradeSignal(
direction=direction,
entry_price=entry,
stop_loss=sl,
take_profit=tp,
signal_type=f"LiquiditySweep_{swept_level.direction.upper()}+OrderBlock",
time=sweep_time,
order_block=ob,
liquidity_level=swept_level,
)