"""TwelveData provider — unlimited historical OHLCV data.

Free plan limits: 800 requests/day, 8 requests/min.
Docs: https://twelvedata.com/docs
"""
import asyncio
import logging
import time
from datetime import datetime, timedelta, timezone
from typing import Optional

import httpx
import pandas as pd

from app.core.config import settings
from app.services.data_providers.constants import GRANULARITY_TO_TD, INSTRUMENT_TO_TD

logger = logging.getLogger(__name__)

TWELVEDATA_BASE_URL = "https://api.twelvedata.com"

# Max data points per request on the TwelveData free plan.
MAX_OUTPUTSIZE = 5000

# Free-plan rate limit: 8 requests per minute.
_RATE_LIMIT = 8
_RATE_WINDOW = 61  # seconds (slightly above 60 for safety margin)

# Shared limiter state: lock serializes access; list holds monotonic
# timestamps of recent requests inside the sliding window.
_rate_lock = asyncio.Lock()
_request_times: list[float] = []


async def _rate_limited_get(client: httpx.AsyncClient, url: str, params: dict) -> httpx.Response:
    """GET wrapper that enforces TwelveData's 8 req/min limit.

    Sleeps just long enough for the oldest in-window request to age out
    before issuing the call. The lock makes the check-sleep-record sequence
    atomic across concurrent coroutines.
    """
    global _request_times
    async with _rate_lock:
        now = time.monotonic()
        # Drop timestamps that have aged out of the window.
        _request_times = [t for t in _request_times if now - t < _RATE_WINDOW]
        if len(_request_times) >= _RATE_LIMIT:
            wait = _RATE_WINDOW - (now - _request_times[0])
            if wait > 0:
                logger.info("TwelveData rate limit : attente %.1f s", wait)
                await asyncio.sleep(wait)
            # Re-purge after sleeping: more entries may have expired.
            _request_times = [t for t in _request_times if time.monotonic() - t < _RATE_WINDOW]
        _request_times.append(time.monotonic())
        return await client.get(url, params=params)


class TwelveDataProvider:
    """Fetches OHLCV candles from the TwelveData REST API."""

    def __init__(self) -> None:
        self._api_key = settings.twelvedata_api_key

    def is_configured(self) -> bool:
        """Return True when an API key is available."""
        return bool(self._api_key)

    async def fetch(
        self,
        instrument: str,
        granularity: str,
        start: datetime,
        end: Optional[datetime] = None,
    ) -> pd.DataFrame:
        """Fetch candles for [start, end], paging backwards through time.

        TwelveData caps each response at MAX_OUTPUTSIZE points, so long
        periods are covered by repeated requests, each ending just before
        the oldest candle of the previous chunk.

        Returns an empty DataFrame when the provider is not configured,
        the instrument/granularity is unsupported, or no data came back.
        """
        if not self.is_configured():
            logger.warning("TwelveData : TWELVEDATA_API_KEY non configurée")
            return pd.DataFrame()

        td_symbol = INSTRUMENT_TO_TD.get(instrument)
        td_interval = GRANULARITY_TO_TD.get(granularity)
        if not td_symbol or not td_interval:
            logger.warning(
                "TwelveData : instrument/granularité non supporté — %s %s",
                instrument,
                granularity,
            )
            return pd.DataFrame()

        if end is None:
            # Naive UTC "now" (utcnow() is deprecated; this is equivalent).
            end = datetime.now(timezone.utc).replace(tzinfo=None)

        logger.info(
            "TwelveData fetch : %s (%s) %s → %s",
            instrument,
            granularity,
            start.strftime("%Y-%m-%d"),
            end.strftime("%Y-%m-%d"),
        )

        # Page backwards: each chunk covers up to MAX_OUTPUTSIZE candles
        # ending at current_end; stop once we reach `start` or run dry.
        all_frames: list[pd.DataFrame] = []
        current_end = end
        while current_end > start:
            df_chunk = await self._fetch_chunk(td_symbol, td_interval, start, current_end)
            if df_chunk.empty:
                break
            all_frames.append(df_chunk)
            oldest = df_chunk["time"].min()
            if oldest <= start:
                break
            # Step back one second so the next request excludes `oldest`.
            current_end = oldest - timedelta(seconds=1)

        if not all_frames:
            return pd.DataFrame()

        df = pd.concat(all_frames, ignore_index=True)
        df = df.drop_duplicates(subset=["time"])
        df = df.sort_values("time").reset_index(drop=True)
        # Chunk boundaries may overshoot; clamp to the requested window.
        df = df[(df["time"] >= start) & (df["time"] <= end)]
        logger.info(
            "TwelveData : %d bougies récupérées pour %s %s",
            len(df),
            instrument,
            granularity,
        )
        return df

    async def _fetch_chunk(
        self,
        td_symbol: str,
        td_interval: str,
        start: datetime,
        end: datetime,
    ) -> pd.DataFrame:
        """Fetch one page of candles (at most MAX_OUTPUTSIZE points).

        Returns an empty DataFrame on any HTTP or API-level error; errors
        are logged, never raised, so `fetch` can stop paging gracefully.
        """
        params = {
            "symbol": td_symbol,
            "interval": td_interval,
            "start_date": start.strftime("%Y-%m-%d %H:%M:%S"),
            "end_date": end.strftime("%Y-%m-%d %H:%M:%S"),
            "outputsize": MAX_OUTPUTSIZE,
            "format": "JSON",
            "apikey": self._api_key,
        }
        try:
            async with httpx.AsyncClient(timeout=30) as client:
                resp = await _rate_limited_get(
                    client, f"{TWELVEDATA_BASE_URL}/time_series", params=params
                )
                resp.raise_for_status()
                data = resp.json()
        except Exception as e:
            logger.error("TwelveData erreur HTTP : %s", e)
            return pd.DataFrame()

        # TwelveData reports errors in-band with HTTP 200.
        if data.get("status") == "error":
            logger.error("TwelveData API erreur : %s", data.get("message"))
            return pd.DataFrame()

        values = data.get("values", [])
        if not values:
            return pd.DataFrame()

        rows = []
        for v in values:
            rows.append({
                "time": pd.to_datetime(v["datetime"]),
                "open": float(v["open"]),
                "high": float(v["high"]),
                "low": float(v["low"]),
                "close": float(v["close"]),
                # Volume may be missing, null, or a fractional string;
                # go through float() so int() never chokes on "123.45".
                "volume": int(float(v.get("volume") or 0)),
            })
        df = pd.DataFrame(rows)
        df = df.sort_values("time").reset_index(drop=True)
        return df