1use std::{collections::HashMap, fmt::Debug, sync::Arc};
17
18use ahash::{AHashMap, AHashSet};
19use nautilus_core::{
20 UnixNanos,
21 consts::NAUTILUS_USER_AGENT,
22 string::{parsing::precision_from_str, secret::REDACTED, urlencoding},
23};
24use nautilus_model::instruments::InstrumentAny;
25use nautilus_network::http::HttpClient;
26use ustr::Ustr;
27
28use super::{
29 error::{Error, TardisErrorResponse},
30 instruments::is_available,
31 models::TardisInstrumentInfo,
32 parse::parse_instrument_any,
33 query::InstrumentFilter,
34};
35use crate::{
36 common::{
37 consts::{TARDIS_REST_QUOTA, TARDIS_REST_RATE_KEY},
38 credential::Credential,
39 enums::TardisExchange,
40 parse::{normalize_instrument_id, parse_instrument_id},
41 urls::TARDIS_HTTP_BASE_URL,
42 },
43 machine::types::{TardisInstrumentKey, TardisInstrumentMiniInfo},
44};
45
46pub type Result<T> = std::result::Result<T, Error>;
47
48#[cfg_attr(
51 feature = "python",
52 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.tardis", from_py_object)
53)]
54#[cfg_attr(
55 feature = "python",
56 pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.tardis")
57)]
58#[derive(Clone)]
59pub struct TardisHttpClient {
60 base_url: String,
61 credential: Option<Credential>,
62 client: HttpClient,
63 normalize_symbols: bool,
64}
65
66impl Debug for TardisHttpClient {
67 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
68 f.debug_struct(stringify!(TardisHttpClient))
69 .field("base_url", &self.base_url)
70 .field("credential", &self.credential.as_ref().map(|_| REDACTED))
71 .field("normalize_symbols", &self.normalize_symbols)
72 .finish()
73 }
74}
75
76impl TardisHttpClient {
77 pub fn new(
84 api_key: Option<&str>,
85 base_url: Option<&str>,
86 timeout_secs: Option<u64>,
87 normalize_symbols: bool,
88 proxy_url: Option<String>,
89 ) -> anyhow::Result<Self> {
90 let credential = Credential::resolve(api_key.map(ToString::to_string));
91
92 if credential.is_none() {
93 anyhow::bail!(
94 "API key must be provided or set in the 'TARDIS_API_KEY' environment variable"
95 );
96 }
97
98 let base_url =
99 base_url.map_or_else(|| TARDIS_HTTP_BASE_URL.to_string(), ToString::to_string);
100
101 let mut headers = HashMap::new();
102 headers.insert("User-Agent".to_string(), NAUTILUS_USER_AGENT.to_string());
103
104 if let Some(ref cred) = credential {
105 headers.insert(
106 "Authorization".to_string(),
107 format!("Bearer {}", cred.api_key()),
108 );
109 }
110
111 let keyed_quotas = vec![(TARDIS_REST_RATE_KEY.to_string(), *TARDIS_REST_QUOTA)];
112 let client = HttpClient::new(
113 headers,
114 vec![],
115 keyed_quotas,
116 Some(*TARDIS_REST_QUOTA),
117 timeout_secs.or(Some(60)),
118 proxy_url,
119 )?;
120
121 Ok(Self {
122 base_url,
123 credential,
124 client,
125 normalize_symbols,
126 })
127 }
128
129 #[must_use]
131 pub const fn credential(&self) -> Option<&Credential> {
132 self.credential.as_ref()
133 }
134
135 pub async fn instruments_info(
143 &self,
144 exchange: TardisExchange,
145 symbol: Option<&str>,
146 filter: Option<&InstrumentFilter>,
147 ) -> Result<Vec<TardisInstrumentInfo>> {
148 let mut url = format!("{}/instruments/{exchange}", &self.base_url);
149
150 if let Some(symbol) = symbol {
151 url.push_str(&format!("/{symbol}"));
152 }
153
154 if let Some(filter) = filter
155 && let Ok(filter_json) = serde_json::to_string(filter)
156 {
157 url.push_str(&format!("?filter={}", urlencoding::encode(&filter_json)));
158 }
159 log::debug!("Requesting: {url}");
160
161 let rate_keys = Some(vec![TARDIS_REST_RATE_KEY.to_string()]);
162 let response = self
163 .client
164 .get(url, None, None, None, rate_keys)
165 .await
166 .map_err(|e| Error::Request(e.to_string()))?;
167
168 let status = response.status.as_u16();
169 log::debug!("Response status: {status}");
170
171 if !response.status.is_success() {
172 let body = String::from_utf8_lossy(&response.body).to_string();
173 return if let Ok(error) = serde_json::from_str::<TardisErrorResponse>(&body) {
174 Err(Error::ApiError {
175 status,
176 code: error.code,
177 message: error.message,
178 })
179 } else {
180 Err(Error::ApiError {
181 status,
182 code: 0,
183 message: body,
184 })
185 };
186 }
187
188 let body = String::from_utf8_lossy(&response.body);
189 log::trace!("{body}");
190
191 if let Ok(instrument) = serde_json::from_str::<TardisInstrumentInfo>(&body) {
192 return Ok(vec![instrument]);
193 }
194
195 match serde_json::from_str(&body) {
196 Ok(parsed) => Ok(parsed),
197 Err(e) => {
198 log::error!("Failed to parse response: {e}");
199 log::debug!("Response body was: {body}");
200 Err(Error::ResponseParse(e.to_string()))
201 }
202 }
203 }
204
205 #[expect(clippy::too_many_arguments)]
213 pub async fn instruments(
214 &self,
215 exchange: TardisExchange,
216 symbol: Option<&str>,
217 filter: Option<&InstrumentFilter>,
218 start: Option<UnixNanos>,
219 end: Option<UnixNanos>,
220 available_offset: Option<UnixNanos>,
221 effective: Option<UnixNanos>,
222 ts_init: Option<UnixNanos>,
223 ) -> Result<Vec<InstrumentAny>> {
224 let response = self.instruments_info(exchange, symbol, filter).await?;
225
226 Ok(response
227 .into_iter()
228 .filter(|info| is_available(info, start, end, available_offset, effective))
229 .flat_map(|info| {
230 parse_instrument_any(&info, effective, ts_init, self.normalize_symbols)
231 })
232 .collect())
233 }
234
235 pub async fn bootstrap_instruments(
246 &self,
247 exchanges: &AHashSet<TardisExchange>,
248 ) -> Result<(
249 AHashMap<TardisInstrumentKey, Arc<TardisInstrumentMiniInfo>>,
250 Vec<InstrumentAny>,
251 )> {
252 let mut instrument_map: AHashMap<TardisInstrumentKey, Arc<TardisInstrumentMiniInfo>> =
253 AHashMap::new();
254 let mut nautilus_instruments: Vec<InstrumentAny> = Vec::new();
255
256 for exchange in exchanges {
257 log::info!("Fetching instruments for {exchange}");
258
259 let instruments_info = match self.instruments_info(*exchange, None, None).await {
260 Ok(info) => info,
261 Err(e) => {
262 log::error!("Failed to fetch instruments for {exchange}: {e}");
263 continue;
264 }
265 };
266
267 log::info!(
268 "Received {} instruments for {exchange}",
269 instruments_info.len()
270 );
271
272 for inst in &instruments_info {
273 let instrument_type = inst.instrument_type;
274 let price_precision = precision_from_str(&inst.price_increment.to_string());
275 let size_precision = precision_from_str(&inst.amount_increment.to_string());
276
277 let instrument_id = if self.normalize_symbols {
278 normalize_instrument_id(exchange, inst.id, &instrument_type, inst.inverse)
279 } else {
280 parse_instrument_id(exchange, inst.id)
281 };
282
283 let info = TardisInstrumentMiniInfo::new(
284 instrument_id,
285 Some(Ustr::from(&inst.id)),
286 *exchange,
287 price_precision,
288 size_precision,
289 );
290 let key = info.as_tardis_instrument_key();
291 instrument_map.insert(key, Arc::new(info));
292 }
293
294 for inst in instruments_info {
295 nautilus_instruments.extend(parse_instrument_any(
296 &inst,
297 None,
298 None,
299 self.normalize_symbols,
300 ));
301 }
302 }
303
304 Ok((instrument_map, nautilus_instruments))
305 }
306}