1use std::{
19 collections::HashMap,
20 num::NonZeroU32,
21 sync::{
22 Arc,
23 atomic::{AtomicU64, Ordering},
24 },
25};
26
27use nautilus_core::string::urlencoding;
28use nautilus_network::{
29 http::{HttpClient, Method},
30 ratelimiter::quota::Quota,
31 retry::{RetryConfig, RetryManager},
32};
33use serde::{Deserialize, Serialize, de::DeserializeOwned};
34use tokio_util::sync::CancellationToken;
35
36use super::{
37 error::BetfairHttpError,
38 models::{LoginResponse, LoginStatus},
39};
40use crate::common::{
41 consts::{
42 BETFAIR_ACCOUNTS_URL, BETFAIR_BETTING_URL, BETFAIR_IDENTITY_LOGIN_URL,
43 BETFAIR_KEEP_ALIVE_URL, BETFAIR_NAVIGATION_URL, BETFAIR_RATE_LIMIT_DEFAULT,
44 BETFAIR_RATE_LIMIT_ORDERS, HEADER_X_APPLICATION, HEADER_X_AUTHENTICATION,
45 },
46 credential::BetfairCredential,
47};
48
49#[derive(Debug, Serialize)]
51struct JsonRpcRequest<P: Serialize> {
52 jsonrpc: &'static str,
53 method: String,
54 params: P,
55 id: u64,
56}
57
58#[derive(Debug, Deserialize)]
60struct JsonRpcResponse<T> {
61 result: Option<T>,
62 error: Option<JsonRpcError>,
63}
64
65#[derive(Debug, Deserialize)]
67struct JsonRpcError {
68 code: i64,
69 message: String,
70}
71
72#[derive(Debug)]
77pub struct BetfairHttpClient {
78 client: HttpClient,
79 credential: BetfairCredential,
80 session_token: Arc<tokio::sync::RwLock<Option<String>>>,
81 retry_manager: RetryManager<BetfairHttpError>,
82 cancellation_token: CancellationToken,
83 request_id: AtomicU64,
84 url_identity_login: String,
85 url_keep_alive: String,
86 url_betting: String,
87 url_accounts: String,
88 url_navigation: String,
89}
90
91impl BetfairHttpClient {
92 pub fn new(
98 credential: BetfairCredential,
99 timeout_secs: Option<u64>,
100 max_retries: Option<u32>,
101 retry_delay_ms: Option<u64>,
102 proxy_url: Option<String>,
103 request_rate_per_second: Option<u32>,
104 order_request_rate_per_second: Option<u32>,
105 ) -> Result<Self, BetfairHttpError> {
106 let retry_config = RetryConfig {
107 max_retries: max_retries.unwrap_or(3),
108 initial_delay_ms: retry_delay_ms.unwrap_or(1000),
109 max_delay_ms: 10_000,
110 backoff_factor: 2.0,
111 jitter_ms: 500,
112 operation_timeout_ms: Some(30_000),
113 immediate_first: false,
114 max_elapsed_ms: Some(120_000),
115 };
116
117 Ok(Self {
118 client: HttpClient::new(
119 HashMap::new(),
120 Vec::new(),
121 Self::rate_limiter_quotas(
122 request_rate_per_second.unwrap_or(5),
123 order_request_rate_per_second.unwrap_or(20),
124 )?,
125 Self::default_quota(request_rate_per_second.unwrap_or(5))?,
126 timeout_secs,
127 proxy_url,
128 )
129 .map_err(|e| anyhow::anyhow!("Failed to create HTTP client: {e}"))?,
130 credential,
131 session_token: Arc::new(tokio::sync::RwLock::new(None)),
132 retry_manager: RetryManager::new(retry_config),
133 cancellation_token: CancellationToken::new(),
134 request_id: AtomicU64::new(1),
135 url_identity_login: BETFAIR_IDENTITY_LOGIN_URL.to_string(),
136 url_keep_alive: BETFAIR_KEEP_ALIVE_URL.to_string(),
137 url_betting: BETFAIR_BETTING_URL.to_string(),
138 url_accounts: BETFAIR_ACCOUNTS_URL.to_string(),
139 url_navigation: BETFAIR_NAVIGATION_URL.to_string(),
140 })
141 }
142
143 #[must_use]
148 pub fn with_urls(
149 mut self,
150 identity_login: String,
151 betting: String,
152 accounts: String,
153 navigation: String,
154 ) -> Self {
155 if let Some(base) = identity_login.rfind('/') {
157 self.url_keep_alive = format!("{}/keepAlive", &identity_login[..base]);
158 }
159 self.url_identity_login = identity_login;
160 self.url_betting = betting;
161 self.url_accounts = accounts;
162 self.url_navigation = navigation;
163 self
164 }
165
166 pub fn cancellation_token(&self) -> &CancellationToken {
168 &self.cancellation_token
169 }
170
171 pub async fn session_token(&self) -> Option<String> {
173 self.session_token.read().await.clone()
174 }
175
176 pub async fn is_connected(&self) -> bool {
178 self.session_token.read().await.is_some()
179 }
180
181 #[must_use]
183 pub fn app_key(&self) -> &str {
184 self.credential.app_key()
185 }
186
187 pub async fn connect(&self) -> Result<(), BetfairHttpError> {
197 let form_body = format!(
198 "username={}&password={}",
199 urlencoding::encode(self.credential.username()),
200 urlencoding::encode(self.credential.password()),
201 );
202
203 let resp_bytes = self
204 .send_identity(&self.url_identity_login, form_body.into_bytes())
205 .await?;
206
207 let resp: LoginResponse = serde_json::from_slice(&resp_bytes)?;
208
209 if resp.status == LoginStatus::Success {
210 log::info!("Betfair login successful");
211 *self.session_token.write().await = Some(resp.token);
212 Ok(())
213 } else {
214 Err(BetfairHttpError::LoginFailed {
215 status: resp.error.unwrap_or_else(|| format!("{:?}", resp.status)),
216 })
217 }
218 }
219
220 pub async fn reconnect(&self) -> Result<(), BetfairHttpError> {
226 log::info!("Betfair reconnecting...");
227 *self.session_token.write().await = None;
228 self.connect().await
229 }
230
231 pub async fn disconnect(&self) {
233 log::info!("Betfair disconnecting...");
234 *self.session_token.write().await = None;
235 }
236
237 pub async fn keep_alive(&self) -> Result<(), BetfairHttpError> {
243 let resp_bytes = self.send_identity(&self.url_keep_alive, Vec::new()).await?;
244
245 let resp: LoginResponse = serde_json::from_slice(&resp_bytes)?;
246
247 if resp.status == LoginStatus::Success {
248 *self.session_token.write().await = Some(resp.token);
249 Ok(())
250 } else {
251 Err(BetfairHttpError::LoginFailed {
252 status: resp.error.unwrap_or_else(|| format!("{:?}", resp.status)),
253 })
254 }
255 }
256
257 pub async fn send_betting<T, P>(&self, method: &str, params: P) -> Result<T, BetfairHttpError>
264 where
265 T: DeserializeOwned,
266 P: Serialize,
267 {
268 self.send_jsonrpc(&self.url_betting, method, params, false)
269 .await
270 }
271
272 pub async fn send_betting_order<T, P>(
279 &self,
280 method: &str,
281 params: P,
282 ) -> Result<T, BetfairHttpError>
283 where
284 T: DeserializeOwned,
285 P: Serialize,
286 {
287 self.send_jsonrpc(&self.url_betting, method, params, true)
288 .await
289 }
290
291 pub async fn send_accounts<T, P>(&self, method: &str, params: P) -> Result<T, BetfairHttpError>
298 where
299 T: DeserializeOwned,
300 P: Serialize,
301 {
302 self.send_jsonrpc(&self.url_accounts, method, params, false)
303 .await
304 }
305
306 pub async fn send_navigation<T>(&self) -> Result<T, BetfairHttpError>
312 where
313 T: DeserializeOwned,
314 {
315 let headers = self.build_headers("application/json").await?;
316
317 let resp = self
318 .client
319 .request(
320 Method::GET,
321 self.url_navigation.clone(),
322 None,
323 Some(headers),
324 None,
325 None,
326 Some(vec![BETFAIR_RATE_LIMIT_DEFAULT.to_string()]),
327 )
328 .await
329 .map_err(|e| BetfairHttpError::NetworkError(e.to_string()))?;
330
331 if resp.status.as_u16() != 200 {
332 let body = String::from_utf8_lossy(&resp.body);
333 return Err(BetfairHttpError::UnexpectedStatus {
334 status: resp.status.as_u16(),
335 body: body.to_string(),
336 });
337 }
338
339 serde_json::from_slice(&resp.body).map_err(BetfairHttpError::from)
340 }
341
342 fn make_quota(requests_per_second: u32, label: &str) -> Result<Quota, BetfairHttpError> {
343 let rate = NonZeroU32::new(requests_per_second).ok_or_else(|| {
344 BetfairHttpError::InvalidConfiguration(format!("{label} must be greater than zero"))
345 })?;
346
347 Quota::per_second(rate).ok_or_else(|| {
348 BetfairHttpError::InvalidConfiguration(format!("Invalid {label} quota configuration"))
349 })
350 }
351
352 fn rate_limiter_quotas(
353 request_rate_per_second: u32,
354 order_request_rate_per_second: u32,
355 ) -> Result<Vec<(String, Quota)>, BetfairHttpError> {
356 Ok(vec![
357 (
358 BETFAIR_RATE_LIMIT_DEFAULT.to_string(),
359 Self::make_quota(request_rate_per_second, "request_rate_per_second")?,
360 ),
361 (
362 BETFAIR_RATE_LIMIT_ORDERS.to_string(),
363 Self::make_quota(
364 order_request_rate_per_second,
365 "order_request_rate_per_second",
366 )?,
367 ),
368 ])
369 }
370
371 fn default_quota(request_rate_per_second: u32) -> Result<Option<Quota>, BetfairHttpError> {
372 Ok(Some(Self::make_quota(
373 request_rate_per_second,
374 "request_rate_per_second",
375 )?))
376 }
377
378 async fn build_headers(
379 &self,
380 content_type: &str,
381 ) -> Result<HashMap<String, String>, BetfairHttpError> {
382 let token = self
383 .session_token
384 .read()
385 .await
386 .clone()
387 .ok_or(BetfairHttpError::MissingCredentials)?;
388
389 let mut headers = HashMap::new();
390 headers.insert(HEADER_X_AUTHENTICATION.to_string(), token);
391 headers.insert(
392 HEADER_X_APPLICATION.to_string(),
393 self.credential.app_key().to_string(),
394 );
395 headers.insert("Accept".to_string(), "application/json".to_string());
396 headers.insert("Content-Type".to_string(), content_type.to_string());
397 Ok(headers)
398 }
399
400 async fn send_identity(&self, url: &str, body: Vec<u8>) -> Result<Vec<u8>, BetfairHttpError> {
401 let mut headers = HashMap::new();
402 headers.insert("Accept".to_string(), "application/json".to_string());
403 headers.insert(
404 "Content-Type".to_string(),
405 "application/x-www-form-urlencoded".to_string(),
406 );
407 headers.insert(
408 HEADER_X_APPLICATION.to_string(),
409 self.credential.app_key().to_string(),
410 );
411
412 if let Some(token) = self.session_token.read().await.as_ref() {
414 headers.insert(HEADER_X_AUTHENTICATION.to_string(), token.clone());
415 }
416
417 let resp = self
418 .client
419 .request(
420 Method::POST,
421 url.to_string(),
422 None,
423 Some(headers),
424 Some(body),
425 None,
426 Some(vec![BETFAIR_RATE_LIMIT_DEFAULT.to_string()]),
427 )
428 .await
429 .map_err(|e| BetfairHttpError::NetworkError(e.to_string()))?;
430
431 if resp.status.as_u16() != 200 {
432 let body = String::from_utf8_lossy(&resp.body);
433 return Err(BetfairHttpError::UnexpectedStatus {
434 status: resp.status.as_u16(),
435 body: body.to_string(),
436 });
437 }
438
439 Ok(resp.body.to_vec())
440 }
441
442 async fn send_jsonrpc<T, P>(
443 &self,
444 base_url: &str,
445 method: &str,
446 params: P,
447 is_order: bool,
448 ) -> Result<T, BetfairHttpError>
449 where
450 T: DeserializeOwned,
451 P: Serialize,
452 {
453 let operation_id = format!("{base_url}#{method}");
454 let params_value = serde_json::to_value(¶ms)?;
455
456 let operation = || {
457 let method = method.to_string();
458 let params_value = params_value.clone();
459
460 async move {
461 let id = self.request_id.fetch_add(1, Ordering::SeqCst);
462 let request = JsonRpcRequest {
463 jsonrpc: "2.0",
464 method: method.clone(),
465 params: params_value.clone(),
466 id,
467 };
468
469 let body = serde_json::to_vec(&request)?;
470 let headers = self.build_headers("application/json").await?;
471
472 let rate_keys = if is_order {
473 vec![BETFAIR_RATE_LIMIT_ORDERS.to_string()]
474 } else {
475 vec![BETFAIR_RATE_LIMIT_DEFAULT.to_string()]
476 };
477
478 let resp = self
479 .client
480 .request(
481 Method::POST,
482 base_url.to_string(),
483 None,
484 Some(headers),
485 Some(body),
486 None,
487 Some(rate_keys),
488 )
489 .await
490 .map_err(|e| BetfairHttpError::NetworkError(e.to_string()))?;
491
492 let json_value: serde_json::Value = match serde_json::from_slice(&resp.body) {
493 Ok(json) => json,
494 Err(_) => {
495 let error_body = String::from_utf8_lossy(&resp.body);
496 let preview: String = error_body.chars().take(500).collect();
497 log::error!(
498 "Non-JSON response: method={method}, status={}, body={}",
499 resp.status.as_u16(),
500 preview,
501 );
502 return Err(BetfairHttpError::UnexpectedStatus {
503 status: resp.status.as_u16(),
504 body: error_body.to_string(),
505 });
506 }
507 };
508
509 let rpc_resp: JsonRpcResponse<T> =
510 serde_json::from_value(json_value).map_err(|e| {
511 log::error!(
512 "Failed to deserialize JSON-RPC response: method={method}, error={e}",
513 );
514 BetfairHttpError::JsonError(e.to_string())
515 })?;
516
517 if let Some(result) = rpc_resp.result {
518 Ok(result)
519 } else if let Some(error) = rpc_resp.error {
520 Err(BetfairHttpError::BetfairError {
521 code: error.code,
522 message: error.message,
523 })
524 } else {
525 Err(BetfairHttpError::JsonError(
526 "Response contains neither result nor error".to_string(),
527 ))
528 }
529 }
530 };
531
532 let should_retry = |error: &BetfairHttpError| -> bool { error.is_retryable() };
533
534 let create_error = |msg: String| -> BetfairHttpError {
535 if msg == "canceled" {
536 BetfairHttpError::Canceled("Adapter disconnecting or shutting down".to_string())
537 } else {
538 BetfairHttpError::NetworkError(msg)
539 }
540 };
541
542 self.retry_manager
543 .execute_with_retry_with_cancel(
544 &operation_id,
545 operation,
546 should_retry,
547 create_error,
548 &self.cancellation_token,
549 )
550 .await
551 }
552}
553
#[cfg(test)]
mod tests {
    use rstest::rstest;

    use super::*;
    use crate::common::consts::{
        BETFAIR_RATE_LIMIT_DEFAULT, BETFAIR_RATE_LIMIT_ORDERS, METHOD_LIST_MARKET_CATALOGUE,
    };

    /// Both rate-limit keys must be present in the keyed quota list.
    #[rstest]
    fn test_rate_limiter_quotas_has_expected_keys() {
        let quotas = BetfairHttpClient::rate_limiter_quotas(5, 20).unwrap();

        let has_key = |wanted: &str| quotas.iter().any(|(key, _)| key.as_str() == wanted);
        assert!(has_key(BETFAIR_RATE_LIMIT_DEFAULT));
        assert!(has_key(BETFAIR_RATE_LIMIT_ORDERS));
    }

    /// A valid rate always yields a default quota.
    #[rstest]
    fn test_default_quota_is_some() {
        let quota = BetfairHttpClient::default_quota(5).unwrap();
        assert!(quota.is_some());
    }

    /// A zero general rate is rejected and the error names the setting.
    #[rstest]
    fn test_rate_limiter_quotas_reject_zero_rate_limit() {
        let result = BetfairHttpClient::rate_limiter_quotas(0, 20);
        assert!(result.is_err());

        let message = result.err().unwrap().to_string();
        assert!(message.contains("request_rate_per_second"));
    }

    /// The request envelope serializes every field under its own name.
    #[rstest]
    fn test_json_rpc_request_serialization() {
        let params = serde_json::json!({"filter": {}, "maxResults": 100});
        let request = JsonRpcRequest {
            jsonrpc: "2.0",
            method: METHOD_LIST_MARKET_CATALOGUE.to_string(),
            params,
            id: 1,
        };

        let json = serde_json::to_value(&request).unwrap();

        assert_eq!(json["jsonrpc"], "2.0");
        assert_eq!(json["method"], "SportsAPING/v1.0/listMarketCatalogue");
        assert_eq!(json["params"]["maxResults"], 100);
        assert_eq!(json["id"], 1);
    }

    /// A reply with a result and a null error deserializes to `Some` / `None`.
    #[rstest]
    fn test_json_rpc_response_success() {
        let raw = r#"{"result": [1, 2, 3], "error": null}"#;

        let parsed: JsonRpcResponse<Vec<i32>> = serde_json::from_str(raw).unwrap();

        assert_eq!(parsed.result, Some(vec![1, 2, 3]));
        assert!(parsed.error.is_none());
    }

    /// A reply with a null result and an error object exposes code and message.
    #[rstest]
    fn test_json_rpc_response_error() {
        let raw = r#"{"result": null, "error": {"code": -32600, "message": "Invalid request"}}"#;

        let parsed: JsonRpcResponse<serde_json::Value> = serde_json::from_str(raw).unwrap();

        assert!(parsed.result.is_none());
        let rpc_error = parsed.error.unwrap();
        assert_eq!(rpc_error.code, -32600);
        assert_eq!(rpc_error.message, "Invalid request");
    }
}