Skip to main content

nautilus_betfair/http/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Betfair HTTP client implementation.
17
18use std::{
19    collections::HashMap,
20    num::NonZeroU32,
21    sync::{
22        Arc,
23        atomic::{AtomicU64, Ordering},
24    },
25};
26
27use nautilus_core::string::urlencoding;
28use nautilus_network::{
29    http::{HttpClient, Method},
30    ratelimiter::quota::Quota,
31    retry::{RetryConfig, RetryManager},
32};
33use serde::{Deserialize, Serialize, de::DeserializeOwned};
34use tokio_util::sync::CancellationToken;
35
36use super::{
37    error::BetfairHttpError,
38    models::{LoginResponse, LoginStatus},
39};
40use crate::common::{
41    consts::{
42        BETFAIR_ACCOUNTS_URL, BETFAIR_BETTING_URL, BETFAIR_IDENTITY_LOGIN_URL,
43        BETFAIR_KEEP_ALIVE_URL, BETFAIR_NAVIGATION_URL, BETFAIR_RATE_LIMIT_DEFAULT,
44        BETFAIR_RATE_LIMIT_ORDERS, HEADER_X_APPLICATION, HEADER_X_AUTHENTICATION,
45    },
46    credential::BetfairCredential,
47};
48
49/// Betfair JSON-RPC request envelope.
50#[derive(Debug, Serialize)]
51struct JsonRpcRequest<P: Serialize> {
52    jsonrpc: &'static str,
53    method: String,
54    params: P,
55    id: u64,
56}
57
58/// Betfair JSON-RPC response envelope.
59#[derive(Debug, Deserialize)]
60struct JsonRpcResponse<T> {
61    result: Option<T>,
62    error: Option<JsonRpcError>,
63}
64
65/// JSON-RPC error object.
66#[derive(Debug, Deserialize)]
67struct JsonRpcError {
68    code: i64,
69    message: String,
70}
71
72/// Betfair HTTP client for raw API operations.
73///
74/// Handles session-token authentication, JSON-RPC protocol, form-encoded
75/// identity requests, REST navigation, rate limiting, and retry logic.
76#[derive(Debug)]
77pub struct BetfairHttpClient {
78    client: HttpClient,
79    credential: BetfairCredential,
80    session_token: Arc<tokio::sync::RwLock<Option<String>>>,
81    retry_manager: RetryManager<BetfairHttpError>,
82    cancellation_token: CancellationToken,
83    request_id: AtomicU64,
84    url_identity_login: String,
85    url_keep_alive: String,
86    url_betting: String,
87    url_accounts: String,
88    url_navigation: String,
89}
90
91impl BetfairHttpClient {
92    /// Creates a new [`BetfairHttpClient`].
93    ///
94    /// # Errors
95    ///
96    /// Returns an error if the HTTP client cannot be created.
97    pub fn new(
98        credential: BetfairCredential,
99        timeout_secs: Option<u64>,
100        max_retries: Option<u32>,
101        retry_delay_ms: Option<u64>,
102        proxy_url: Option<String>,
103        request_rate_per_second: Option<u32>,
104        order_request_rate_per_second: Option<u32>,
105    ) -> Result<Self, BetfairHttpError> {
106        let retry_config = RetryConfig {
107            max_retries: max_retries.unwrap_or(3),
108            initial_delay_ms: retry_delay_ms.unwrap_or(1000),
109            max_delay_ms: 10_000,
110            backoff_factor: 2.0,
111            jitter_ms: 500,
112            operation_timeout_ms: Some(30_000),
113            immediate_first: false,
114            max_elapsed_ms: Some(120_000),
115        };
116
117        Ok(Self {
118            client: HttpClient::new(
119                HashMap::new(),
120                Vec::new(),
121                Self::rate_limiter_quotas(
122                    request_rate_per_second.unwrap_or(5),
123                    order_request_rate_per_second.unwrap_or(20),
124                )?,
125                Self::default_quota(request_rate_per_second.unwrap_or(5))?,
126                timeout_secs,
127                proxy_url,
128            )
129            .map_err(|e| anyhow::anyhow!("Failed to create HTTP client: {e}"))?,
130            credential,
131            session_token: Arc::new(tokio::sync::RwLock::new(None)),
132            retry_manager: RetryManager::new(retry_config),
133            cancellation_token: CancellationToken::new(),
134            request_id: AtomicU64::new(1),
135            url_identity_login: BETFAIR_IDENTITY_LOGIN_URL.to_string(),
136            url_keep_alive: BETFAIR_KEEP_ALIVE_URL.to_string(),
137            url_betting: BETFAIR_BETTING_URL.to_string(),
138            url_accounts: BETFAIR_ACCOUNTS_URL.to_string(),
139            url_navigation: BETFAIR_NAVIGATION_URL.to_string(),
140        })
141    }
142
143    /// Overrides the API base URLs (for testing with mock servers).
144    ///
145    /// The keep-alive URL is derived from `identity_login` by replacing the
146    /// path with `/keepAlive`.
147    #[must_use]
148    pub fn with_urls(
149        mut self,
150        identity_login: String,
151        betting: String,
152        accounts: String,
153        navigation: String,
154    ) -> Self {
155        // Derive keep-alive from same host as login
156        if let Some(base) = identity_login.rfind('/') {
157            self.url_keep_alive = format!("{}/keepAlive", &identity_login[..base]);
158        }
159        self.url_identity_login = identity_login;
160        self.url_betting = betting;
161        self.url_accounts = accounts;
162        self.url_navigation = navigation;
163        self
164    }
165
166    /// Returns the cancellation token for this client.
167    pub fn cancellation_token(&self) -> &CancellationToken {
168        &self.cancellation_token
169    }
170
171    /// Returns the current session token, if authenticated.
172    pub async fn session_token(&self) -> Option<String> {
173        self.session_token.read().await.clone()
174    }
175
176    /// Returns whether the client has an active session.
177    pub async fn is_connected(&self) -> bool {
178        self.session_token.read().await.is_some()
179    }
180
181    /// Returns the application key.
182    #[must_use]
183    pub fn app_key(&self) -> &str {
184        self.credential.app_key()
185    }
186
187    /// Authenticates with Betfair using interactive (non-cert) login.
188    ///
189    /// Sends credentials to the Identity API and stores the returned
190    /// session token for subsequent requests.
191    ///
192    /// # Errors
193    ///
194    /// Returns an error if the login request fails or authentication
195    /// is rejected.
196    pub async fn connect(&self) -> Result<(), BetfairHttpError> {
197        let form_body = format!(
198            "username={}&password={}",
199            urlencoding::encode(self.credential.username()),
200            urlencoding::encode(self.credential.password()),
201        );
202
203        let resp_bytes = self
204            .send_identity(&self.url_identity_login, form_body.into_bytes())
205            .await?;
206
207        let resp: LoginResponse = serde_json::from_slice(&resp_bytes)?;
208
209        if resp.status == LoginStatus::Success {
210            log::info!("Betfair login successful");
211            *self.session_token.write().await = Some(resp.token);
212            Ok(())
213        } else {
214            Err(BetfairHttpError::LoginFailed {
215                status: resp.error.unwrap_or_else(|| format!("{:?}", resp.status)),
216            })
217        }
218    }
219
220    /// Resets the session and re-authenticates.
221    ///
222    /// # Errors
223    ///
224    /// Returns an error if re-authentication fails.
225    pub async fn reconnect(&self) -> Result<(), BetfairHttpError> {
226        log::info!("Betfair reconnecting...");
227        *self.session_token.write().await = None;
228        self.connect().await
229    }
230
231    /// Clears the session token.
232    pub async fn disconnect(&self) {
233        log::info!("Betfair disconnecting...");
234        *self.session_token.write().await = None;
235    }
236
237    /// Sends a keep-alive request to renew the session.
238    ///
239    /// # Errors
240    ///
241    /// Returns an error if the keep-alive request fails.
242    pub async fn keep_alive(&self) -> Result<(), BetfairHttpError> {
243        let resp_bytes = self.send_identity(&self.url_keep_alive, Vec::new()).await?;
244
245        let resp: LoginResponse = serde_json::from_slice(&resp_bytes)?;
246
247        if resp.status == LoginStatus::Success {
248            *self.session_token.write().await = Some(resp.token);
249            Ok(())
250        } else {
251            Err(BetfairHttpError::LoginFailed {
252                status: resp.error.unwrap_or_else(|| format!("{:?}", resp.status)),
253            })
254        }
255    }
256
257    /// Sends a JSON-RPC request to the Betting API.
258    ///
259    /// # Errors
260    ///
261    /// Returns an error if the request fails, authentication is missing,
262    /// or the response contains a JSON-RPC error.
263    pub async fn send_betting<T, P>(&self, method: &str, params: P) -> Result<T, BetfairHttpError>
264    where
265        T: DeserializeOwned,
266        P: Serialize,
267    {
268        self.send_jsonrpc(&self.url_betting, method, params, false)
269            .await
270    }
271
272    /// Sends a JSON-RPC request to the Betting API with order rate limiting.
273    ///
274    /// # Errors
275    ///
276    /// Returns an error if the request fails, authentication is missing,
277    /// or the response contains a JSON-RPC error.
278    pub async fn send_betting_order<T, P>(
279        &self,
280        method: &str,
281        params: P,
282    ) -> Result<T, BetfairHttpError>
283    where
284        T: DeserializeOwned,
285        P: Serialize,
286    {
287        self.send_jsonrpc(&self.url_betting, method, params, true)
288            .await
289    }
290
291    /// Sends a JSON-RPC request to the Accounts API.
292    ///
293    /// # Errors
294    ///
295    /// Returns an error if the request fails, authentication is missing,
296    /// or the response contains a JSON-RPC error.
297    pub async fn send_accounts<T, P>(&self, method: &str, params: P) -> Result<T, BetfairHttpError>
298    where
299        T: DeserializeOwned,
300        P: Serialize,
301    {
302        self.send_jsonrpc(&self.url_accounts, method, params, false)
303            .await
304    }
305
306    /// Sends a GET request to the Navigation API.
307    ///
308    /// # Errors
309    ///
310    /// Returns an error if the request fails or the response cannot be parsed.
311    pub async fn send_navigation<T>(&self) -> Result<T, BetfairHttpError>
312    where
313        T: DeserializeOwned,
314    {
315        let headers = self.build_headers("application/json").await?;
316
317        let resp = self
318            .client
319            .request(
320                Method::GET,
321                self.url_navigation.clone(),
322                None,
323                Some(headers),
324                None,
325                None,
326                Some(vec![BETFAIR_RATE_LIMIT_DEFAULT.to_string()]),
327            )
328            .await
329            .map_err(|e| BetfairHttpError::NetworkError(e.to_string()))?;
330
331        if resp.status.as_u16() != 200 {
332            let body = String::from_utf8_lossy(&resp.body);
333            return Err(BetfairHttpError::UnexpectedStatus {
334                status: resp.status.as_u16(),
335                body: body.to_string(),
336            });
337        }
338
339        serde_json::from_slice(&resp.body).map_err(BetfairHttpError::from)
340    }
341
342    fn make_quota(requests_per_second: u32, label: &str) -> Result<Quota, BetfairHttpError> {
343        let rate = NonZeroU32::new(requests_per_second).ok_or_else(|| {
344            BetfairHttpError::InvalidConfiguration(format!("{label} must be greater than zero"))
345        })?;
346
347        Quota::per_second(rate).ok_or_else(|| {
348            BetfairHttpError::InvalidConfiguration(format!("Invalid {label} quota configuration"))
349        })
350    }
351
352    fn rate_limiter_quotas(
353        request_rate_per_second: u32,
354        order_request_rate_per_second: u32,
355    ) -> Result<Vec<(String, Quota)>, BetfairHttpError> {
356        Ok(vec![
357            (
358                BETFAIR_RATE_LIMIT_DEFAULT.to_string(),
359                Self::make_quota(request_rate_per_second, "request_rate_per_second")?,
360            ),
361            (
362                BETFAIR_RATE_LIMIT_ORDERS.to_string(),
363                Self::make_quota(
364                    order_request_rate_per_second,
365                    "order_request_rate_per_second",
366                )?,
367            ),
368        ])
369    }
370
371    fn default_quota(request_rate_per_second: u32) -> Result<Option<Quota>, BetfairHttpError> {
372        Ok(Some(Self::make_quota(
373            request_rate_per_second,
374            "request_rate_per_second",
375        )?))
376    }
377
378    async fn build_headers(
379        &self,
380        content_type: &str,
381    ) -> Result<HashMap<String, String>, BetfairHttpError> {
382        let token = self
383            .session_token
384            .read()
385            .await
386            .clone()
387            .ok_or(BetfairHttpError::MissingCredentials)?;
388
389        let mut headers = HashMap::new();
390        headers.insert(HEADER_X_AUTHENTICATION.to_string(), token);
391        headers.insert(
392            HEADER_X_APPLICATION.to_string(),
393            self.credential.app_key().to_string(),
394        );
395        headers.insert("Accept".to_string(), "application/json".to_string());
396        headers.insert("Content-Type".to_string(), content_type.to_string());
397        Ok(headers)
398    }
399
400    async fn send_identity(&self, url: &str, body: Vec<u8>) -> Result<Vec<u8>, BetfairHttpError> {
401        let mut headers = HashMap::new();
402        headers.insert("Accept".to_string(), "application/json".to_string());
403        headers.insert(
404            "Content-Type".to_string(),
405            "application/x-www-form-urlencoded".to_string(),
406        );
407        headers.insert(
408            HEADER_X_APPLICATION.to_string(),
409            self.credential.app_key().to_string(),
410        );
411
412        // Add session token if we have one (for keep-alive)
413        if let Some(token) = self.session_token.read().await.as_ref() {
414            headers.insert(HEADER_X_AUTHENTICATION.to_string(), token.clone());
415        }
416
417        let resp = self
418            .client
419            .request(
420                Method::POST,
421                url.to_string(),
422                None,
423                Some(headers),
424                Some(body),
425                None,
426                Some(vec![BETFAIR_RATE_LIMIT_DEFAULT.to_string()]),
427            )
428            .await
429            .map_err(|e| BetfairHttpError::NetworkError(e.to_string()))?;
430
431        if resp.status.as_u16() != 200 {
432            let body = String::from_utf8_lossy(&resp.body);
433            return Err(BetfairHttpError::UnexpectedStatus {
434                status: resp.status.as_u16(),
435                body: body.to_string(),
436            });
437        }
438
439        Ok(resp.body.to_vec())
440    }
441
442    async fn send_jsonrpc<T, P>(
443        &self,
444        base_url: &str,
445        method: &str,
446        params: P,
447        is_order: bool,
448    ) -> Result<T, BetfairHttpError>
449    where
450        T: DeserializeOwned,
451        P: Serialize,
452    {
453        let operation_id = format!("{base_url}#{method}");
454        let params_value = serde_json::to_value(&params)?;
455
456        let operation = || {
457            let method = method.to_string();
458            let params_value = params_value.clone();
459
460            async move {
461                let id = self.request_id.fetch_add(1, Ordering::SeqCst);
462                let request = JsonRpcRequest {
463                    jsonrpc: "2.0",
464                    method: method.clone(),
465                    params: params_value.clone(),
466                    id,
467                };
468
469                let body = serde_json::to_vec(&request)?;
470                let headers = self.build_headers("application/json").await?;
471
472                let rate_keys = if is_order {
473                    vec![BETFAIR_RATE_LIMIT_ORDERS.to_string()]
474                } else {
475                    vec![BETFAIR_RATE_LIMIT_DEFAULT.to_string()]
476                };
477
478                let resp = self
479                    .client
480                    .request(
481                        Method::POST,
482                        base_url.to_string(),
483                        None,
484                        Some(headers),
485                        Some(body),
486                        None,
487                        Some(rate_keys),
488                    )
489                    .await
490                    .map_err(|e| BetfairHttpError::NetworkError(e.to_string()))?;
491
492                let json_value: serde_json::Value = match serde_json::from_slice(&resp.body) {
493                    Ok(json) => json,
494                    Err(_) => {
495                        let error_body = String::from_utf8_lossy(&resp.body);
496                        let preview: String = error_body.chars().take(500).collect();
497                        log::error!(
498                            "Non-JSON response: method={method}, status={}, body={}",
499                            resp.status.as_u16(),
500                            preview,
501                        );
502                        return Err(BetfairHttpError::UnexpectedStatus {
503                            status: resp.status.as_u16(),
504                            body: error_body.to_string(),
505                        });
506                    }
507                };
508
509                let rpc_resp: JsonRpcResponse<T> =
510                    serde_json::from_value(json_value).map_err(|e| {
511                        log::error!(
512                            "Failed to deserialize JSON-RPC response: method={method}, error={e}",
513                        );
514                        BetfairHttpError::JsonError(e.to_string())
515                    })?;
516
517                if let Some(result) = rpc_resp.result {
518                    Ok(result)
519                } else if let Some(error) = rpc_resp.error {
520                    Err(BetfairHttpError::BetfairError {
521                        code: error.code,
522                        message: error.message,
523                    })
524                } else {
525                    Err(BetfairHttpError::JsonError(
526                        "Response contains neither result nor error".to_string(),
527                    ))
528                }
529            }
530        };
531
532        let should_retry = |error: &BetfairHttpError| -> bool { error.is_retryable() };
533
534        let create_error = |msg: String| -> BetfairHttpError {
535            if msg == "canceled" {
536                BetfairHttpError::Canceled("Adapter disconnecting or shutting down".to_string())
537            } else {
538                BetfairHttpError::NetworkError(msg)
539            }
540        };
541
542        self.retry_manager
543            .execute_with_retry_with_cancel(
544                &operation_id,
545                operation,
546                should_retry,
547                create_error,
548                &self.cancellation_token,
549            )
550            .await
551    }
552}
553
554#[cfg(test)]
555mod tests {
556    use rstest::rstest;
557
558    use super::*;
559    use crate::common::consts::{
560        BETFAIR_RATE_LIMIT_DEFAULT, BETFAIR_RATE_LIMIT_ORDERS, METHOD_LIST_MARKET_CATALOGUE,
561    };
562
563    #[rstest]
564    fn test_rate_limiter_quotas_has_expected_keys() {
565        let quotas = BetfairHttpClient::rate_limiter_quotas(5, 20).unwrap();
566        let keys: Vec<&str> = quotas.iter().map(|(k, _)| k.as_str()).collect();
567        assert!(keys.contains(&BETFAIR_RATE_LIMIT_DEFAULT));
568        assert!(keys.contains(&BETFAIR_RATE_LIMIT_ORDERS));
569    }
570
571    #[rstest]
572    fn test_default_quota_is_some() {
573        assert!(BetfairHttpClient::default_quota(5).unwrap().is_some());
574    }
575
576    #[rstest]
577    fn test_rate_limiter_quotas_reject_zero_rate_limit() {
578        let result = BetfairHttpClient::rate_limiter_quotas(0, 20);
579
580        assert!(result.is_err());
581        assert!(
582            result
583                .err()
584                .unwrap()
585                .to_string()
586                .contains("request_rate_per_second")
587        );
588    }
589
590    #[rstest]
591    fn test_json_rpc_request_serialization() {
592        let request = JsonRpcRequest {
593            jsonrpc: "2.0",
594            method: METHOD_LIST_MARKET_CATALOGUE.to_string(),
595            params: serde_json::json!({"filter": {}, "maxResults": 100}),
596            id: 1,
597        };
598
599        let json = serde_json::to_value(&request).unwrap();
600        assert_eq!(json["jsonrpc"], "2.0");
601        assert_eq!(json["method"], "SportsAPING/v1.0/listMarketCatalogue");
602        assert_eq!(json["params"]["maxResults"], 100);
603        assert_eq!(json["id"], 1);
604    }
605
606    #[rstest]
607    fn test_json_rpc_response_success() {
608        let json = r#"{"result": [1, 2, 3], "error": null}"#;
609        let resp: JsonRpcResponse<Vec<i32>> = serde_json::from_str(json).unwrap();
610        assert_eq!(resp.result, Some(vec![1, 2, 3]));
611        assert!(resp.error.is_none());
612    }
613
614    #[rstest]
615    fn test_json_rpc_response_error() {
616        let json = r#"{"result": null, "error": {"code": -32600, "message": "Invalid request"}}"#;
617        let resp: JsonRpcResponse<serde_json::Value> = serde_json::from_str(json).unwrap();
618        assert!(resp.result.is_none());
619        let error = resp.error.unwrap();
620        assert_eq!(error.code, -32600);
621        assert_eq!(error.message, "Invalid request");
622    }
623}