""" MarketDataService — source de données hybride avec cache DB. Stratégie de fetch pour une période [start, end] demandée : 1. DB d'abord → on récupère ce qu'on a déjà, on ne refetch jamais ce qui existe 2. Gaps récents → yfinance (dans ses limites temporelles) 3. Gaps historiques → TwelveData (pour tout ce que yfinance ne peut pas couvrir) 4. Tout est stocké → les prochaines requêtes seront servies depuis la DB Exemple (M1, 10 derniers jours demandés) : - DB : déjà ce qu'on a en cache - yfinance : J-7 → maintenant (limite M1 = 7 jours) - TwelveData : J-10 → J-7 (historique au-delà de yfinance) """ import logging from datetime import datetime, timedelta from typing import Optional import pandas as pd from sqlalchemy import and_, select, text from sqlalchemy.ext.asyncio import AsyncSession from app.models.candle import Candle from app.services.data_providers.constants import GRANULARITY_MINUTES from app.services.data_providers.twelvedata_provider import TwelveDataProvider from app.services.data_providers.yfinance_provider import YFinanceProvider logger = logging.getLogger(__name__) # Facteur pour compenser weekends + jours fériés dans le calcul de la fenêtre TRADING_DAYS_FACTOR = 1.5 class MarketDataService: def __init__(self, db: AsyncSession) -> None: self._db = db self._yf = YFinanceProvider() self._td = TwelveDataProvider() # ── API publique ────────────────────────────────────────────────────────── async def get_candles( self, instrument: str, granularity: str, count: int = 200, start: Optional[datetime] = None, end: Optional[datetime] = None, ) -> pd.DataFrame: """ Retourne jusqu'à `count` bougies pour instrument/granularity. Si start/end fournis, ils définissent la plage exacte. Processus : 1. Calcul de la fenêtre temporelle nécessaire 2. Détection et comblement des gaps (yfinance + TwelveData) 3. Lecture depuis DB et retour """ if end is None: end = datetime.utcnow() if start is None: minutes = GRANULARITY_MINUTES.get(granularity, 60) start = end - timedelta(minutes=int(minutes * count * TRADING_DAYS_FACTOR)) await self._fill_gaps(instrument, granularity, start, end) return await self._db_fetch(instrument, granularity, start, end, limit=count) async def get_latest_price(self, instrument: str) -> Optional[float]: """Retourne le dernier close connu (DB ou yfinance M1).""" stmt = ( select(Candle.close) .where(Candle.instrument == instrument) .order_by(Candle.time.desc()) .limit(1) ) result = await self._db.execute(stmt) price = result.scalar_one_or_none() if price: return float(price) df = await self.get_candles(instrument, "M1", count=2) return float(df.iloc[-1]["close"]) if not df.empty else None # ── Logique de détection et comblement des gaps ─────────────────────────── async def _fill_gaps( self, instrument: str, granularity: str, start: datetime, end: datetime, ) -> None: gaps = await self._find_gaps(instrument, granularity, start, end) for gap_start, gap_end in gaps: await self._fetch_and_store_gap(instrument, granularity, gap_start, gap_end) async def _find_gaps( self, instrument: str, granularity: str, start: datetime, end: datetime, ) -> list[tuple[datetime, datetime]]: """ Retourne la liste des (gap_start, gap_end) manquants en DB. Logique : - Si rien en DB pour la plage → un seul gap = (start, end) - Sinon → combler avant le plus ancien et/ou après le plus récent """ stmt = ( select(Candle.time) .where( and_( Candle.instrument == instrument, Candle.granularity == granularity, Candle.time >= start, Candle.time <= end, ) ) .order_by(Candle.time) ) result = await self._db.execute(stmt) times = [r[0] for r in result.fetchall()] if not times: return [(start, end)] gaps: list[tuple[datetime, datetime]] = [] interval = timedelta(minutes=GRANULARITY_MINUTES.get(granularity, 60)) oldest, newest = times[0], times[-1] # Gap avant : demande de données antérieures à ce qu'on a if start < oldest - interval: gaps.append((start, oldest)) # Gap après : demande de données plus récentes que ce qu'on a freshness_threshold = interval * 2 if end > newest + freshness_threshold: gaps.append((newest, end)) return gaps async def _fetch_and_store_gap( self, instrument: str, granularity: str, gap_start: datetime, gap_end: datetime, ) -> None: """ Fetche un gap : 1. yfinance pour la partie récente (dans ses limites) 2. TwelveData en fallback si yfinance échoue, ou pour la partie historique """ yf_cutoff = self._yf.yf_cutoff(granularity) yf_covered = False # ── yfinance : partie récente du gap ───────────────────────────────── if yf_cutoff is not None: yf_start = max(gap_start, yf_cutoff) if yf_start < gap_end: df_yf = await self._yf.fetch(instrument, granularity, yf_start, gap_end) if not df_yf.empty: await self._store(df_yf, instrument, granularity) yf_covered = True # ── TwelveData : historique + fallback si yfinance indisponible ─────── if self._td.is_configured(): # Partie historique (avant la limite yfinance) td_end = yf_cutoff if (yf_cutoff and gap_start < yf_cutoff) else None if td_end and gap_start < td_end: df_td = await self._td.fetch(instrument, granularity, gap_start, td_end) if not df_td.empty: await self._store(df_td, instrument, granularity) # Fallback pour la partie récente si yfinance n'a rien retourné if not yf_covered: yf_start = max(gap_start, yf_cutoff) if yf_cutoff else gap_start if yf_start < gap_end: logger.info( "yfinance indisponible — fallback TwelveData pour %s %s [%s → %s]", instrument, granularity, yf_start.strftime("%Y-%m-%d"), gap_end.strftime("%Y-%m-%d"), ) df_td2 = await self._td.fetch(instrument, granularity, yf_start, gap_end) if not df_td2.empty: await self._store(df_td2, instrument, granularity) elif not yf_covered: logger.warning( "Gap [%s → %s] pour %s %s — " "TWELVEDATA_API_KEY manquante et yfinance indisponible.", gap_start.strftime("%Y-%m-%d"), gap_end.strftime("%Y-%m-%d"), instrument, granularity, ) # ── DB helpers ──────────────────────────────────────────────────────────── async def _db_fetch( self, instrument: str, granularity: str, start: datetime, end: datetime, limit: int = 5000, ) -> pd.DataFrame: stmt = ( select(Candle) .where( and_( Candle.instrument == instrument, Candle.granularity == granularity, Candle.time >= start, Candle.time <= end, ) ) .order_by(Candle.time.desc()) .limit(limit) ) result = await self._db.execute(stmt) rows = result.scalars().all() if not rows: return pd.DataFrame(columns=["time", "open", "high", "low", "close", "volume"]) df = pd.DataFrame( [{"time": r.time, "open": r.open, "high": r.high, "low": r.low, "close": r.close, "volume": r.volume} for r in rows] ) return df.sort_values("time").reset_index(drop=True) async def _store(self, df: pd.DataFrame, instrument: str, granularity: str) -> None: """ Insère les bougies en DB avec INSERT OR IGNORE. Les bougies déjà présentes (même instrument+granularity+time) ne sont jamais modifiées. """ if df.empty: return for _, row in df.iterrows(): await self._db.execute( text( "INSERT OR IGNORE INTO candles " "(instrument, granularity, time, open, high, low, close, volume, complete) " "VALUES (:instrument, :granularity, :time, :open, :high, :low, :close, :volume, 1)" ), { "instrument": instrument, "granularity": granularity, "time": pd.Timestamp(row["time"]).to_pydatetime().replace(tzinfo=None), "open": float(row["open"]), "high": float(row["high"]), "low": float(row["low"]), "close": float(row["close"]), "volume": int(row.get("volume", 0)), }, ) await self._db.commit()