1use std::{borrow::Cow, collections::HashMap, str::FromStr, sync::Arc, time::Duration};
19
20use nautilus_core::collections::into_ustr_vec;
21use nautilus_cryptography::providers::install_cryptographic_provider;
22use reqwest::{
23 Method, Response, Url,
24 header::{HeaderMap, HeaderName, HeaderValue},
25};
26use ustr::Ustr;
27
28use super::{HttpClientError, HttpResponse, HttpStatus};
29use crate::ratelimiter::{RateLimiter, clock::MonotonicClock, quota::Quota};
30
31const DEFAULT_POOL_MAX_IDLE_PER_HOST: usize = 32;
33
34const DEFAULT_POOL_IDLE_TIMEOUT_SECS: u64 = 60;
36
37const DEFAULT_HTTP2_KEEP_ALIVE_SECS: u64 = 30;
39
40#[derive(Clone, Debug)]
50#[cfg_attr(
51 feature = "python",
52 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.network", from_py_object)
53)]
54#[cfg_attr(
55 feature = "python",
56 pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.network")
57)]
58pub struct HttpClient {
59 pub(crate) client: InnerHttpClient,
61 pub(crate) rate_limiter: Arc<RateLimiter<Ustr, MonotonicClock>>,
63}
64
65impl HttpClient {
66 pub fn new(
73 headers: HashMap<String, String>,
74 header_keys: Vec<String>,
75 keyed_quotas: Vec<(String, Quota)>,
76 default_quota: Option<Quota>,
77 timeout_secs: Option<u64>,
78 proxy_url: Option<String>,
79 ) -> Result<Self, HttpClientError> {
80 install_cryptographic_provider();
81
82 let mut header_map = HeaderMap::new();
84
85 for (key, value) in headers {
86 let header_name = HeaderName::from_str(&key)
87 .map_err(|e| HttpClientError::Error(format!("Invalid header name '{key}': {e}")))?;
88 let header_value = HeaderValue::from_str(&value).map_err(|e| {
89 HttpClientError::Error(format!("Invalid header value '{value}': {e}"))
90 })?;
91 header_map.insert(header_name, header_value);
92 }
93
94 let mut client_builder = reqwest::Client::builder()
95 .default_headers(header_map)
96 .tcp_nodelay(true)
97 .pool_max_idle_per_host(DEFAULT_POOL_MAX_IDLE_PER_HOST)
98 .pool_idle_timeout(Duration::from_secs(DEFAULT_POOL_IDLE_TIMEOUT_SECS))
99 .http2_keep_alive_interval(Duration::from_secs(DEFAULT_HTTP2_KEEP_ALIVE_SECS))
100 .http2_keep_alive_while_idle(true)
101 .http2_adaptive_window(true);
102
103 if let Some(timeout_secs) = timeout_secs {
104 client_builder = client_builder.timeout(Duration::from_secs(timeout_secs));
105 }
106
107 if let Some(proxy_url) = proxy_url {
109 let proxy = reqwest::Proxy::all(&proxy_url)
110 .map_err(|e| HttpClientError::InvalidProxy(format!("{proxy_url}: {e}")))?;
111 client_builder = client_builder.proxy(proxy);
112 }
113
114 let client = client_builder
115 .build()
116 .map_err(|e| HttpClientError::ClientBuildError(e.to_string()))?;
117
118 let (valid_keys, header_names): (Vec<String>, Vec<HeaderName>) = header_keys
120 .into_iter()
121 .filter_map(|k| HeaderName::from_str(&k).ok().map(|name| (k, name)))
122 .unzip();
123
124 let client = InnerHttpClient {
125 client,
126 header_keys: Arc::new(valid_keys),
127 header_names: Arc::new(header_names),
128 };
129
130 let keyed_quotas = keyed_quotas
131 .into_iter()
132 .map(|(key, quota)| (Ustr::from(&key), quota))
133 .collect();
134
135 let rate_limiter = Arc::new(RateLimiter::new_with_quota(default_quota, keyed_quotas));
136
137 Ok(Self {
138 client,
139 rate_limiter,
140 })
141 }
142
143 #[expect(clippy::too_many_arguments)]
153 pub async fn request(
154 &self,
155 method: Method,
156 url: String,
157 params: Option<&HashMap<String, Vec<String>>>,
158 headers: Option<HashMap<String, String>>,
159 body: Option<Vec<u8>>,
160 timeout_secs: Option<u64>,
161 keys: Option<Vec<String>>,
162 ) -> Result<HttpResponse, HttpClientError> {
163 let keys = keys.map(into_ustr_vec);
164
165 self.request_with_ustr_keys(method, url, params, headers, body, timeout_secs, keys)
166 .await
167 }
168
169 #[expect(clippy::too_many_arguments)]
179 pub async fn request_with_params<P: serde::Serialize>(
180 &self,
181 method: Method,
182 url: String,
183 params: Option<&P>,
184 headers: Option<HashMap<String, String>>,
185 body: Option<Vec<u8>>,
186 timeout_secs: Option<u64>,
187 keys: Option<Vec<String>>,
188 ) -> Result<HttpResponse, HttpClientError> {
189 let keys = keys.map(into_ustr_vec);
190 let rate_limiter = self.rate_limiter.clone();
191 rate_limiter.await_keys_ready(keys.as_deref()).await;
192
193 self.client
194 .send_request_with_query(method, url, params, headers, body, timeout_secs)
195 .await
196 }
197
198 #[expect(clippy::too_many_arguments)]
204 pub async fn request_with_ustr_keys(
205 &self,
206 method: Method,
207 url: String,
208 params: Option<&HashMap<String, Vec<String>>>,
209 headers: Option<HashMap<String, String>>,
210 body: Option<Vec<u8>>,
211 timeout_secs: Option<u64>,
212 keys: Option<Vec<Ustr>>,
213 ) -> Result<HttpResponse, HttpClientError> {
214 let rate_limiter = self.rate_limiter.clone();
215 rate_limiter.await_keys_ready(keys.as_deref()).await;
216
217 self.client
218 .send_request(method, url, params, headers, body, timeout_secs)
219 .await
220 }
221
222 pub async fn get(
228 &self,
229 url: String,
230 params: Option<&HashMap<String, Vec<String>>>,
231 headers: Option<HashMap<String, String>>,
232 timeout_secs: Option<u64>,
233 keys: Option<Vec<String>>,
234 ) -> Result<HttpResponse, HttpClientError> {
235 self.request(Method::GET, url, params, headers, None, timeout_secs, keys)
236 .await
237 }
238
239 pub async fn post(
245 &self,
246 url: String,
247 params: Option<&HashMap<String, Vec<String>>>,
248 headers: Option<HashMap<String, String>>,
249 body: Option<Vec<u8>>,
250 timeout_secs: Option<u64>,
251 keys: Option<Vec<String>>,
252 ) -> Result<HttpResponse, HttpClientError> {
253 self.request(Method::POST, url, params, headers, body, timeout_secs, keys)
254 .await
255 }
256
257 pub async fn patch(
263 &self,
264 url: String,
265 params: Option<&HashMap<String, Vec<String>>>,
266 headers: Option<HashMap<String, String>>,
267 body: Option<Vec<u8>>,
268 timeout_secs: Option<u64>,
269 keys: Option<Vec<String>>,
270 ) -> Result<HttpResponse, HttpClientError> {
271 self.request(
272 Method::PATCH,
273 url,
274 params,
275 headers,
276 body,
277 timeout_secs,
278 keys,
279 )
280 .await
281 }
282
283 pub async fn delete(
289 &self,
290 url: String,
291 params: Option<&HashMap<String, Vec<String>>>,
292 headers: Option<HashMap<String, String>>,
293 timeout_secs: Option<u64>,
294 keys: Option<Vec<String>>,
295 ) -> Result<HttpResponse, HttpClientError> {
296 self.request(
297 Method::DELETE,
298 url,
299 params,
300 headers,
301 None,
302 timeout_secs,
303 keys,
304 )
305 .await
306 }
307}
308
309#[derive(Clone, Debug)]
318pub struct InnerHttpClient {
319 pub(crate) client: reqwest::Client,
320 pub(crate) header_keys: Arc<Vec<String>>,
321 pub(crate) header_names: Arc<Vec<HeaderName>>,
322}
323
324impl InnerHttpClient {
325 pub async fn send_request(
331 &self,
332 method: Method,
333 url: String,
334 params: Option<&HashMap<String, Vec<String>>>,
335 headers: Option<HashMap<String, String>>,
336 body: Option<Vec<u8>>,
337 timeout_secs: Option<u64>,
338 ) -> Result<HttpResponse, HttpClientError> {
339 let full_url = encode_url_params(&url, params)?;
340 self.send_request_internal(
341 method,
342 full_url.as_ref(),
343 None::<&()>,
344 headers,
345 body,
346 timeout_secs,
347 )
348 .await
349 }
350
351 pub async fn send_request_with_query<Q: serde::Serialize>(
360 &self,
361 method: Method,
362 url: String,
363 query: Option<&Q>,
364 headers: Option<HashMap<String, String>>,
365 body: Option<Vec<u8>>,
366 timeout_secs: Option<u64>,
367 ) -> Result<HttpResponse, HttpClientError> {
368 self.send_request_internal(method, &url, query, headers, body, timeout_secs)
369 .await
370 }
371
372 async fn send_request_internal<Q: serde::Serialize>(
378 &self,
379 method: Method,
380 url: &str,
381 query: Option<&Q>,
382 headers: Option<HashMap<String, String>>,
383 body: Option<Vec<u8>>,
384 timeout_secs: Option<u64>,
385 ) -> Result<HttpResponse, HttpClientError> {
386 let reqwest_url =
387 Url::parse(url).map_err(|e| HttpClientError::from(format!("URL parse error: {e}")))?;
388
389 let mut request_builder = self.client.request(method, reqwest_url);
390
391 if let Some(headers) = headers {
392 let mut header_map = HeaderMap::with_capacity(headers.len());
393 for (header_key, header_value) in &headers {
394 let key = HeaderName::from_bytes(header_key.as_bytes())
395 .map_err(|e| HttpClientError::from(format!("Invalid header name: {e}")))?;
396
397 if let Some(old_value) = header_map.insert(
398 key.clone(),
399 header_value
400 .parse()
401 .map_err(|e| HttpClientError::from(format!("Invalid header value: {e}")))?,
402 ) {
403 log::trace!("Replaced header '{key}': old={old_value:?}, new={header_value}");
404 }
405 }
406 request_builder = request_builder.headers(header_map);
407 }
408
409 if let Some(q) = query {
410 request_builder = request_builder.query(q);
411 }
412
413 if let Some(timeout_secs) = timeout_secs {
414 request_builder = request_builder.timeout(Duration::new(timeout_secs, 0));
415 }
416
417 let request = match body {
418 Some(b) => request_builder
419 .body(b)
420 .build()
421 .map_err(HttpClientError::from)?,
422 None => request_builder.build().map_err(HttpClientError::from)?,
423 };
424
425 log::trace!("{} {}", request.method(), request.url());
426
427 let response = self
428 .client
429 .execute(request)
430 .await
431 .map_err(HttpClientError::from)?;
432
433 self.to_response(response).await
434 }
435
436 pub async fn to_response(&self, response: Response) -> Result<HttpResponse, HttpClientError> {
444 log::trace!("{response:?}");
445
446 let resp_headers = response.headers();
447 let mut headers =
448 HashMap::with_capacity(std::cmp::min(self.header_names.len(), resp_headers.len()));
449
450 for (name, key_str) in self.header_names.iter().zip(self.header_keys.iter()) {
451 if let Some(val) = resp_headers.get(name)
452 && let Ok(v) = val.to_str()
453 {
454 headers.insert(key_str.clone(), v.to_owned());
455 }
456 }
457
458 let status = HttpStatus::new(response.status());
459 let body = response.bytes().await.map_err(HttpClientError::from)?;
460
461 Ok(HttpResponse {
462 status,
463 headers,
464 body,
465 })
466 }
467}
468
469impl Default for InnerHttpClient {
470 fn default() -> Self {
474 install_cryptographic_provider();
475 let client = reqwest::Client::new();
476 Self {
477 client,
478 header_keys: Arc::default(),
479 header_names: Arc::default(),
480 }
481 }
482}
483
484fn encode_url_params<'a>(
490 url: &'a str,
491 params: Option<&HashMap<String, Vec<String>>>,
492) -> Result<Cow<'a, str>, HttpClientError> {
493 let Some(params) = params else {
494 return Ok(Cow::Borrowed(url));
495 };
496
497 let pairs: Vec<(&str, &str)> = params
498 .iter()
499 .flat_map(|(key, values)| {
500 values
501 .iter()
502 .map(move |value| (key.as_str(), value.as_str()))
503 })
504 .collect();
505
506 if pairs.is_empty() {
507 return Ok(Cow::Borrowed(url));
508 }
509
510 let query_string = serde_urlencoded::to_string(pairs)
511 .map_err(|e| HttpClientError::Error(format!("Failed to encode params: {e}")))?;
512
513 let separator = if url.contains('?') { '&' } else { '?' };
514 Ok(Cow::Owned(format!("{url}{separator}{query_string}")))
515}
516
517#[cfg(test)]
518#[cfg(target_os = "linux")] mod tests {
520 use std::net::SocketAddr;
521
522 use axum::{
523 Router,
524 routing::{delete, get, patch, post},
525 serve,
526 };
527 use http::status::StatusCode;
528 use rstest::rstest;
529
530 use super::*;
531
532 fn create_router() -> Router {
533 Router::new()
534 .route("/get", get(|| async { "hello-world!" }))
535 .route("/post", post(|| async { StatusCode::OK }))
536 .route("/patch", patch(|| async { StatusCode::OK }))
537 .route("/delete", delete(|| async { StatusCode::OK }))
538 .route("/notfound", get(|| async { StatusCode::NOT_FOUND }))
539 .route(
540 "/slow",
541 get(|| async {
542 tokio::time::sleep(Duration::from_secs(2)).await;
543 "Eventually responded"
544 }),
545 )
546 }
547
548 async fn start_test_server() -> Result<SocketAddr, Box<dyn std::error::Error + Send + Sync>> {
549 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
550 let addr = listener.local_addr().unwrap();
551
552 tokio::spawn(async move {
553 serve(listener, create_router()).await.unwrap();
554 });
555
556 Ok(addr)
557 }
558
559 #[tokio::test]
560 async fn test_get() {
561 let addr = start_test_server().await.unwrap();
562 let url = format!("http://{addr}");
563
564 let client = InnerHttpClient::default();
565 let response = client
566 .send_request(
567 reqwest::Method::GET,
568 format!("{url}/get"),
569 None,
570 None,
571 None,
572 None,
573 )
574 .await
575 .unwrap();
576
577 assert!(response.status.is_success());
578 assert_eq!(String::from_utf8_lossy(&response.body), "hello-world!");
579 }
580
581 #[tokio::test]
582 async fn test_post() {
583 let addr = start_test_server().await.unwrap();
584 let url = format!("http://{addr}");
585
586 let client = InnerHttpClient::default();
587 let response = client
588 .send_request(
589 reqwest::Method::POST,
590 format!("{url}/post"),
591 None,
592 None,
593 None,
594 None,
595 )
596 .await
597 .unwrap();
598
599 assert!(response.status.is_success());
600 }
601
602 #[tokio::test]
603 async fn test_post_with_body() {
604 let addr = start_test_server().await.unwrap();
605 let url = format!("http://{addr}");
606
607 let client = InnerHttpClient::default();
608
609 let mut body = HashMap::new();
610 body.insert(
611 "key1".to_string(),
612 serde_json::Value::String("value1".to_string()),
613 );
614 body.insert(
615 "key2".to_string(),
616 serde_json::Value::String("value2".to_string()),
617 );
618
619 let body_string = serde_json::to_string(&body).unwrap();
620 let body_bytes = body_string.into_bytes();
621
622 let response = client
623 .send_request(
624 reqwest::Method::POST,
625 format!("{url}/post"),
626 None,
627 None,
628 Some(body_bytes),
629 None,
630 )
631 .await
632 .unwrap();
633
634 assert!(response.status.is_success());
635 }
636
637 #[tokio::test]
638 async fn test_patch() {
639 let addr = start_test_server().await.unwrap();
640 let url = format!("http://{addr}");
641
642 let client = InnerHttpClient::default();
643 let response = client
644 .send_request(
645 reqwest::Method::PATCH,
646 format!("{url}/patch"),
647 None,
648 None,
649 None,
650 None,
651 )
652 .await
653 .unwrap();
654
655 assert!(response.status.is_success());
656 }
657
658 #[tokio::test]
659 async fn test_delete() {
660 let addr = start_test_server().await.unwrap();
661 let url = format!("http://{addr}");
662
663 let client = InnerHttpClient::default();
664 let response = client
665 .send_request(
666 reqwest::Method::DELETE,
667 format!("{url}/delete"),
668 None,
669 None,
670 None,
671 None,
672 )
673 .await
674 .unwrap();
675
676 assert!(response.status.is_success());
677 }
678
679 #[tokio::test]
680 async fn test_not_found() {
681 let addr = start_test_server().await.unwrap();
682 let url = format!("http://{addr}/notfound");
683 let client = InnerHttpClient::default();
684
685 let response = client
686 .send_request(reqwest::Method::GET, url, None, None, None, None)
687 .await
688 .unwrap();
689
690 assert!(response.status.is_client_error());
691 assert_eq!(response.status.as_u16(), 404);
692 }
693
694 #[tokio::test]
695 async fn test_timeout() {
696 let addr = start_test_server().await.unwrap();
697 let url = format!("http://{addr}/slow");
698 let client = InnerHttpClient::default();
699
700 let result = client
702 .send_request(reqwest::Method::GET, url, None, None, None, Some(1))
703 .await;
704
705 match result {
706 Err(HttpClientError::TimeoutError(msg)) => {
707 println!("Got expected timeout error: {msg}");
708 }
709 Err(e) => panic!("Expected a timeout error, was: {e:?}"),
710 Ok(resp) => panic!("Expected a timeout error, but was a successful response: {resp:?}"),
711 }
712 }
713
714 #[rstest]
715 fn test_http_client_without_proxy() {
716 let result = HttpClient::new(
718 HashMap::new(),
719 vec![],
720 vec![],
721 None,
722 None,
723 None, );
725
726 assert!(result.is_ok());
727 }
728
729 #[rstest]
730 fn test_http_client_with_valid_proxy() {
731 let result = HttpClient::new(
733 HashMap::new(),
734 vec![],
735 vec![],
736 None,
737 None,
738 Some("http://proxy.example.com:8080".to_string()),
739 );
740
741 assert!(result.is_ok());
742 }
743
744 #[rstest]
745 fn test_http_client_with_socks5_proxy() {
746 let result = HttpClient::new(
748 HashMap::new(),
749 vec![],
750 vec![],
751 None,
752 None,
753 Some("socks5://127.0.0.1:1080".to_string()),
754 );
755
756 assert!(result.is_ok());
757 }
758
759 #[rstest]
760 fn test_http_client_with_malformed_proxy() {
761 let result = HttpClient::new(
765 HashMap::new(),
766 vec![],
767 vec![],
768 None,
769 None,
770 Some("://invalid".to_string()),
771 );
772
773 assert!(result.is_err());
774 assert!(matches!(result, Err(HttpClientError::InvalidProxy(_))));
775 }
776
777 #[rstest]
778 fn test_http_client_with_empty_proxy_string() {
779 let result = HttpClient::new(
781 HashMap::new(),
782 vec![],
783 vec![],
784 None,
785 None,
786 Some(String::new()),
787 );
788
789 assert!(result.is_err());
790 assert!(matches!(result, Err(HttpClientError::InvalidProxy(_))));
791 }
792
793 #[tokio::test]
794 async fn test_http_client_get() {
795 let addr = start_test_server().await.unwrap();
796 let url = format!("http://{addr}/get");
797
798 let client = HttpClient::new(HashMap::new(), vec![], vec![], None, None, None).unwrap();
799 let response = client.get(url, None, None, None, None).await.unwrap();
800
801 assert!(response.status.is_success());
802 assert_eq!(String::from_utf8_lossy(&response.body), "hello-world!");
803 }
804
805 #[tokio::test]
806 async fn test_http_client_post() {
807 let addr = start_test_server().await.unwrap();
808 let url = format!("http://{addr}/post");
809
810 let client = HttpClient::new(HashMap::new(), vec![], vec![], None, None, None).unwrap();
811 let response = client
812 .post(url, None, None, None, None, None)
813 .await
814 .unwrap();
815
816 assert!(response.status.is_success());
817 }
818
819 #[tokio::test]
820 async fn test_http_client_patch() {
821 let addr = start_test_server().await.unwrap();
822 let url = format!("http://{addr}/patch");
823
824 let client = HttpClient::new(HashMap::new(), vec![], vec![], None, None, None).unwrap();
825 let response = client
826 .patch(url, None, None, None, None, None)
827 .await
828 .unwrap();
829
830 assert!(response.status.is_success());
831 }
832
833 #[tokio::test]
834 async fn test_http_client_delete() {
835 let addr = start_test_server().await.unwrap();
836 let url = format!("http://{addr}/delete");
837
838 let client = HttpClient::new(HashMap::new(), vec![], vec![], None, None, None).unwrap();
839 let response = client.delete(url, None, None, None, None).await.unwrap();
840
841 assert!(response.status.is_success());
842 }
843}