1use std::{
23 collections::HashMap,
24 num::NonZeroU32,
25 sync::{Arc, LazyLock},
26};
27
28use arc_swap::ArcSwap;
29use chrono::{DateTime, Utc};
30use nautilus_core::{
31 AtomicMap, UnixNanos,
32 consts::NAUTILUS_USER_AGENT,
33 time::{AtomicTime, get_atomic_clock_realtime},
34};
35use nautilus_model::{
36 enums::{OrderSide, OrderType, TimeInForce},
37 events::AccountState,
38 identifiers::{AccountId, ClientOrderId, InstrumentId, Symbol, VenueOrderId},
39 instruments::{Instrument, InstrumentAny},
40 reports::{FillReport, OrderStatusReport, PositionStatusReport},
41 types::{MarginBalance, Price, Quantity},
42};
43use nautilus_network::{
44 http::{HttpClient, HttpClientError, HttpResponse, Method, USER_AGENT},
45 ratelimiter::quota::Quota,
46 retry::{RetryConfig, RetryManager},
47};
48use rust_decimal::Decimal;
49use serde_json::Value;
50use tokio_util::sync::CancellationToken;
51use url::form_urlencoded;
52use ustr::Ustr;
53
54use crate::{
55 common::{
56 consts::{ACCOUNTS_PAGE_LIMIT, ORDER_STATUS_OPEN, REST_API_PATH},
57 credential::CoinbaseCredential,
58 enums::{
59 CoinbaseEnvironment, CoinbaseMarginType, CoinbaseOrderSide, CoinbaseProductType,
60 CoinbaseStopDirection,
61 },
62 parse::format_rfc3339_from_nanos,
63 urls,
64 },
65 http::{
66 error::{Error, Result},
67 models::{
68 Account, AccountsResponse, CancelOrdersResponse, CfmBalanceSummary,
69 CfmBalanceSummaryResponse, CfmPositionResponse, CfmPositionsResponse,
70 CreateOrderResponse, EditOrderResponse, Fill, FillsResponse, Order, OrderResponse,
71 OrdersListResponse, ProductsResponse,
72 },
73 parse::{
74 parse_account_state, parse_cfm_account_state, parse_cfm_margin_balances,
75 parse_cfm_position_status_report, parse_fill_report, parse_instrument,
76 parse_order_status_report,
77 },
78 query::{
79 CancelOrdersRequest, CreateOrderRequest, EditOrderRequest, FillListQuery, LimitFok,
80 LimitFokParams, LimitGtc, LimitGtcParams, LimitGtd, LimitGtdParams, MarketFok,
81 MarketIoc, MarketParams, OrderConfiguration, OrderListQuery, StopLimitGtc,
82 StopLimitGtcParams, StopLimitGtd, StopLimitGtdParams,
83 },
84 },
85};
86
87pub static COINBASE_REST_QUOTA: LazyLock<Quota> = LazyLock::new(|| {
89 Quota::per_second(NonZeroU32::new(30).expect("non-zero")).expect("valid constant")
90});
91
92#[must_use]
94pub fn default_retry_config() -> RetryConfig {
95 RetryConfig {
96 max_retries: 3,
97 initial_delay_ms: 100,
98 max_delay_ms: 5_000,
99 backoff_factor: 2.0,
100 jitter_ms: 250,
101 operation_timeout_ms: Some(60_000),
102 immediate_first: false,
103 max_elapsed_ms: Some(180_000),
104 }
105}
106
107#[must_use]
113pub fn data_client_retry_config() -> RetryConfig {
114 RetryConfig {
115 max_retries: 0,
116 initial_delay_ms: 100,
117 max_delay_ms: 100,
118 backoff_factor: 1.0,
119 jitter_ms: 0,
120 operation_timeout_ms: None,
121 immediate_first: false,
122 max_elapsed_ms: None,
123 }
124}
125
126fn encode_query(params: &[(&str, &str)]) -> String {
131 let mut serializer = form_urlencoded::Serializer::new(String::new());
132 for (k, v) in params {
133 serializer.append_pair(k, v);
134 }
135 serializer.finish()
136}
137
/// Low-level Coinbase REST client: builds URLs, signs requests with a JWT when
/// credentials are configured, and runs every request through the retry manager.
#[derive(Debug)]
pub struct CoinbaseRawHttpClient {
    /// Underlying HTTP client, constructed with the default headers and REST quota.
    client: HttpClient,
    /// API credential used to build the `Authorization` header; `None` for public-only clients.
    credential: Option<CoinbaseCredential>,
    /// Base URL that request URLs are built from; swappable through `set_base_url`.
    base_url: ArcSwap<String>,
    /// Environment this client was constructed for.
    environment: CoinbaseEnvironment,
    /// Retry policy executor wrapping each request.
    retry_manager: RetryManager<Error>,
    /// Token passed to the retry manager alongside every request.
    cancellation_token: CancellationToken,
}
151
152impl CoinbaseRawHttpClient {
153 pub fn new(
159 environment: CoinbaseEnvironment,
160 timeout_secs: u64,
161 proxy_url: Option<String>,
162 retry_config: Option<RetryConfig>,
163 ) -> std::result::Result<Self, HttpClientError> {
164 Ok(Self {
165 client: HttpClient::new(
166 Self::default_headers(),
167 vec![],
168 vec![],
169 Some(*COINBASE_REST_QUOTA),
170 Some(timeout_secs),
171 proxy_url,
172 )?,
173 credential: None,
174 base_url: ArcSwap::from_pointee(urls::rest_url(environment).to_string()),
175 environment,
176 retry_manager: RetryManager::new(retry_config.unwrap_or_else(default_retry_config)),
177 cancellation_token: CancellationToken::new(),
178 })
179 }
180
181 pub fn with_credentials(
187 credential: CoinbaseCredential,
188 environment: CoinbaseEnvironment,
189 timeout_secs: u64,
190 proxy_url: Option<String>,
191 retry_config: Option<RetryConfig>,
192 ) -> std::result::Result<Self, HttpClientError> {
193 Ok(Self {
194 client: HttpClient::new(
195 Self::default_headers(),
196 vec![],
197 vec![],
198 Some(*COINBASE_REST_QUOTA),
199 Some(timeout_secs),
200 proxy_url,
201 )?,
202 credential: Some(credential),
203 base_url: ArcSwap::from_pointee(urls::rest_url(environment).to_string()),
204 environment,
205 retry_manager: RetryManager::new(retry_config.unwrap_or_else(default_retry_config)),
206 cancellation_token: CancellationToken::new(),
207 })
208 }
209
210 pub fn from_env(environment: CoinbaseEnvironment) -> Result<Self> {
216 let credential = CoinbaseCredential::from_env()
217 .map_err(|e| Error::auth(format!("Missing credentials in environment: {e}")))?;
218 Self::with_credentials(credential, environment, 10, None, None)
219 .map_err(|e| Error::auth(format!("Failed to create HTTP client: {e}")))
220 }
221
222 pub fn from_credentials(
228 api_key: &str,
229 api_secret: &str,
230 environment: CoinbaseEnvironment,
231 timeout_secs: u64,
232 proxy_url: Option<String>,
233 retry_config: Option<RetryConfig>,
234 ) -> Result<Self> {
235 let credential = CoinbaseCredential::new(api_key.to_string(), api_secret.to_string());
236 Self::with_credentials(
237 credential,
238 environment,
239 timeout_secs,
240 proxy_url,
241 retry_config,
242 )
243 .map_err(|e| Error::auth(format!("Failed to create HTTP client: {e}")))
244 }
245
246 #[must_use]
248 pub fn cancellation_token(&self) -> &CancellationToken {
249 &self.cancellation_token
250 }
251
252 pub fn set_base_url(&self, url: String) {
256 self.base_url.store(Arc::new(url));
257 }
258
259 #[must_use]
261 pub fn environment(&self) -> CoinbaseEnvironment {
262 self.environment
263 }
264
265 #[must_use]
267 pub fn is_authenticated(&self) -> bool {
268 self.credential.is_some()
269 }
270
271 fn default_headers() -> HashMap<String, String> {
272 HashMap::from([
273 (USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string()),
274 ("Content-Type".to_string(), "application/json".to_string()),
275 ])
276 }
277
278 fn build_url(&self, path: &str) -> String {
279 format!("{}{REST_API_PATH}{path}", self.base_url.load())
280 }
281
282 fn build_jwt_uri(&self, method: &str, path: &str) -> String {
284 let base = self.base_url.load();
285 let host = base
286 .strip_prefix("https://")
287 .or_else(|| base.strip_prefix("http://"))
288 .unwrap_or(base.as_str());
289 format!("{method} {host}{REST_API_PATH}{path}")
290 }
291
292 fn auth_headers(&self, method: &str, path: &str) -> Result<HashMap<String, String>> {
293 let credential = self
294 .credential
295 .as_ref()
296 .ok_or_else(|| Error::auth("No credentials configured"))?;
297
298 let uri = self.build_jwt_uri(method, path);
299 let jwt = credential.build_rest_jwt(&uri)?;
300
301 Ok(HashMap::from([(
302 "Authorization".to_string(),
303 format!("Bearer {jwt}"),
304 )]))
305 }
306
307 fn parse_response(&self, response: &HttpResponse) -> Result<Value> {
308 if !response.status.is_success() {
309 return Err(Error::from_http_status(
310 response.status.as_u16(),
311 &response.body,
312 ));
313 }
314
315 if response.body.is_empty() {
316 return Ok(Value::Null);
317 }
318
319 serde_json::from_slice(&response.body).map_err(Error::Serde)
320 }
321
322 async fn send_request(
327 &self,
328 method: Method,
329 url: String,
330 sign_method: Option<&'static str>,
331 sign_path: Option<&str>,
332 body: Option<Vec<u8>>,
333 ) -> Result<Value> {
334 let sign_path_owned = sign_path.map(ToOwned::to_owned);
335 let operation_name = sign_path_owned
336 .as_deref()
337 .unwrap_or(url.as_str())
338 .to_string();
339
340 let is_idempotent = matches!(method, Method::GET | Method::DELETE);
341
342 let operation = || {
343 let method = method.clone();
344 let url = url.clone();
345 let body = body.clone();
346 let sign_path = sign_path_owned.clone();
347
348 async move {
349 let headers = match (sign_method, sign_path.as_deref()) {
350 (Some(m), Some(p)) => Some(self.auth_headers(m, p)?),
351 _ => None,
352 };
353
354 let response = self
355 .client
356 .request(method, url, None, headers, body, None, None)
357 .await
358 .map_err(Error::from_http_client)?;
359
360 self.parse_response(&response)
361 }
362 };
363
364 let should_retry = move |err: &Error| is_idempotent && err.is_retryable();
365
366 self.retry_manager
367 .execute_with_retry_with_cancel(
368 &operation_name,
369 operation,
370 should_retry,
371 Error::transport,
372 &self.cancellation_token,
373 )
374 .await
375 }
376
377 pub async fn get_public(&self, path: &str) -> Result<Value> {
379 let url = self.build_url(path);
380 self.send_request(Method::GET, url, None, None, None).await
381 }
382
383 pub async fn get_public_with_query(&self, path: &str, query: &str) -> Result<Value> {
385 let full_path = if query.is_empty() {
386 path.to_string()
387 } else {
388 format!("{path}?{query}")
389 };
390 let url = self.build_url(&full_path);
391 self.send_request(Method::GET, url, None, None, None).await
392 }
393
394 pub async fn get(&self, path: &str) -> Result<Value> {
396 let url = self.build_url(path);
397 self.send_request(Method::GET, url, Some("GET"), Some(path), None)
398 .await
399 }
400
401 pub async fn get_with_query(&self, path: &str, query: &str) -> Result<Value> {
407 let full_url_path = if query.is_empty() {
408 path.to_string()
409 } else {
410 format!("{path}?{query}")
411 };
412 let url = self.build_url(&full_url_path);
413 self.send_request(Method::GET, url, Some("GET"), Some(path), None)
415 .await
416 }
417
418 pub async fn post(&self, path: &str, body: &Value) -> Result<Value> {
420 let url = self.build_url(path);
421 let body_bytes = serde_json::to_vec(body).map_err(Error::Serde)?;
422 self.send_request(
423 Method::POST,
424 url,
425 Some("POST"),
426 Some(path),
427 Some(body_bytes),
428 )
429 .await
430 }
431
432 pub async fn delete(&self, path: &str) -> Result<Value> {
434 let url = self.build_url(path);
435 self.send_request(Method::DELETE, url, Some("DELETE"), Some(path), None)
436 .await
437 }
438
439 pub async fn get_products(&self) -> Result<Value> {
441 self.get_public("/market/products").await
442 }
443
444 pub async fn get_product(&self, product_id: &str) -> Result<Value> {
446 self.get_public(&format!("/market/products/{product_id}"))
447 .await
448 }
449
450 pub async fn get_candles(
452 &self,
453 product_id: &str,
454 start: &str,
455 end: &str,
456 granularity: &str,
457 ) -> Result<Value> {
458 let query = format!("start={start}&end={end}&granularity={granularity}");
459 self.get_public_with_query(&format!("/market/products/{product_id}/candles"), &query)
460 .await
461 }
462
463 pub async fn get_market_trades(&self, product_id: &str, limit: u32) -> Result<Value> {
465 let query = format!("limit={limit}");
466 self.get_public_with_query(&format!("/market/products/{product_id}/ticker"), &query)
467 .await
468 }
469
470 pub async fn get_best_bid_ask(&self, product_ids: &[&str]) -> Result<Value> {
475 let query = product_ids
476 .iter()
477 .map(|id| format!("product_ids={id}"))
478 .collect::<Vec<_>>()
479 .join("&");
480 self.get_with_query("/best_bid_ask", &query).await
481 }
482
483 pub async fn get_product_book(&self, product_id: &str, limit: Option<u32>) -> Result<Value> {
485 let mut query = format!("product_id={product_id}");
486
487 if let Some(limit) = limit {
488 query.push_str(&format!("&limit={limit}"));
489 }
490 self.get_public_with_query("/market/product_book", &query)
491 .await
492 }
493
494 pub async fn get_accounts(&self) -> Result<Value> {
496 self.get("/accounts").await
497 }
498
499 pub async fn get_accounts_with_query(&self, query: &str) -> Result<Value> {
501 if query.is_empty() {
502 self.get("/accounts").await
503 } else {
504 self.get_with_query("/accounts", query).await
505 }
506 }
507
508 pub async fn get_account(&self, account_id: &str) -> Result<Value> {
510 self.get(&format!("/accounts/{account_id}")).await
511 }
512
513 pub async fn get_portfolios(&self) -> Result<Value> {
515 self.get("/portfolios").await
516 }
517
518 pub async fn get_orders(&self, query: &str) -> Result<Value> {
520 self.get_with_query("/orders/historical/batch", query).await
521 }
522
523 pub async fn get_order(&self, order_id: &str) -> Result<Value> {
525 self.get(&format!("/orders/historical/{order_id}")).await
526 }
527
528 pub async fn get_fills(&self, query: &str) -> Result<Value> {
530 self.get_with_query("/orders/historical/fills", query).await
531 }
532
533 pub async fn get_transaction_summary(&self) -> Result<Value> {
535 self.get("/transaction_summary").await
536 }
537
538 pub async fn get_cfm_balance_summary(&self) -> Result<CfmBalanceSummaryResponse> {
544 let json = self.get("/cfm/balance_summary").await?;
545 serde_json::from_value(json).map_err(Error::Serde)
546 }
547
548 pub async fn get_cfm_positions(&self) -> Result<CfmPositionsResponse> {
554 let json = self.get("/cfm/positions").await?;
555 serde_json::from_value(json).map_err(Error::Serde)
556 }
557
558 pub async fn get_cfm_position(&self, product_id: &str) -> Result<CfmPositionResponse> {
564 let json = self.get(&format!("/cfm/positions/{product_id}")).await?;
565 serde_json::from_value(json).map_err(Error::Serde)
566 }
567
568 pub async fn fetch_all_accounts(&self) -> Result<Vec<Account>> {
573 let mut all = Vec::new();
574 let mut cursor: Option<String> = None;
575
576 loop {
577 let mut pairs: Vec<(&str, &str)> = vec![("limit", ACCOUNTS_PAGE_LIMIT)];
578 if let Some(c) = cursor.as_deref().filter(|s| !s.is_empty()) {
579 pairs.push(("cursor", c));
580 }
581 let query_str = encode_query(&pairs);
582
583 let json = self.get_accounts_with_query(&query_str).await?;
584 let response: AccountsResponse = serde_json::from_value(json).map_err(Error::Serde)?;
585
586 all.extend(response.accounts);
587
588 if !response.has_next || response.cursor.is_empty() {
589 break;
590 }
591 cursor = Some(response.cursor);
592 }
593
594 Ok(all)
595 }
596
597 pub async fn fetch_all_orders(&self, query: &OrderListQuery) -> Result<Vec<Order>> {
603 let mut collected: Vec<Order> = Vec::new();
604 let mut cursor: Option<String> = None;
605
606 loop {
607 let start_str = query.start.map(|s| s.to_rfc3339());
608 let end_str = query.end.map(|e| e.to_rfc3339());
609 let limit_str = query.limit.map(|l| l.to_string());
610
611 let mut pairs: Vec<(&str, &str)> = Vec::new();
612
613 if let Some(pid) = query.product_id.as_deref() {
616 pairs.push(("product_ids", pid));
617 }
618
619 if query.open_only {
620 pairs.push(("order_status", ORDER_STATUS_OPEN));
621 }
622
623 if let Some(s) = start_str.as_deref() {
624 pairs.push(("start_date", s));
625 }
626
627 if let Some(e) = end_str.as_deref() {
628 pairs.push(("end_date", e));
629 }
630
631 if let Some(l) = limit_str.as_deref() {
632 pairs.push(("limit", l));
633 }
634
635 if let Some(c) = cursor.as_deref().filter(|s| !s.is_empty()) {
636 pairs.push(("cursor", c));
637 }
638
639 let query_str = encode_query(&pairs);
640 let json = self.get_orders(&query_str).await?;
641 let response: OrdersListResponse =
642 serde_json::from_value(json).map_err(Error::Serde)?;
643
644 for order in response.orders {
645 if let Some(cid) = query.client_order_id_filter.as_deref()
646 && order.client_order_id != cid
647 {
648 continue;
649 }
650 collected.push(order);
651 }
652
653 if let Some(limit) = query.limit
654 && collected.len() >= limit as usize
655 {
656 collected.truncate(limit as usize);
657 break;
658 }
659
660 if !response.has_next || response.cursor.is_empty() {
661 break;
662 }
663 cursor = Some(response.cursor);
664 }
665
666 Ok(collected)
667 }
668
669 pub async fn fetch_all_fills(&self, query: &FillListQuery) -> Result<Vec<Fill>> {
671 let mut collected: Vec<Fill> = Vec::new();
672 let mut cursor: Option<String> = None;
673
674 loop {
675 let start_str = query.start.map(|s| s.to_rfc3339());
676 let end_str = query.end.map(|e| e.to_rfc3339());
677 let limit_str = query.limit.map(|l| l.to_string());
678
679 let mut pairs: Vec<(&str, &str)> = Vec::new();
680
681 if let Some(pid) = query.product_id.as_deref() {
685 pairs.push(("product_ids", pid));
686 }
687
688 if let Some(vid) = query.venue_order_id.as_deref() {
689 pairs.push(("order_ids", vid));
690 }
691
692 if let Some(s) = start_str.as_deref() {
693 pairs.push(("start_sequence_timestamp", s));
694 }
695
696 if let Some(e) = end_str.as_deref() {
697 pairs.push(("end_sequence_timestamp", e));
698 }
699
700 if let Some(l) = limit_str.as_deref() {
701 pairs.push(("limit", l));
702 }
703
704 if let Some(c) = cursor.as_deref().filter(|s| !s.is_empty()) {
705 pairs.push(("cursor", c));
706 }
707
708 let query_str = encode_query(&pairs);
709 let json = self.get_fills(&query_str).await?;
710 let response: FillsResponse = serde_json::from_value(json).map_err(Error::Serde)?;
711
712 collected.extend(response.fills);
713
714 if let Some(limit) = query.limit
715 && collected.len() >= limit as usize
716 {
717 collected.truncate(limit as usize);
718 break;
719 }
720
721 if response.cursor.is_empty() {
722 break;
723 }
724 cursor = Some(response.cursor);
725 }
726
727 Ok(collected)
728 }
729
730 pub async fn create_order(&self, request: &CreateOrderRequest) -> Result<CreateOrderResponse> {
736 let body = serde_json::to_value(request).map_err(Error::Serde)?;
737 let json = self.post("/orders", &body).await?;
738 serde_json::from_value(json).map_err(Error::Serde)
739 }
740
741 pub async fn cancel_orders(
747 &self,
748 request: &CancelOrdersRequest,
749 ) -> Result<CancelOrdersResponse> {
750 let body = serde_json::to_value(request).map_err(Error::Serde)?;
751 let json = self.post("/orders/batch_cancel", &body).await?;
752 serde_json::from_value(json).map_err(Error::Serde)
753 }
754
755 pub async fn edit_order(&self, request: &EditOrderRequest) -> Result<EditOrderResponse> {
764 let body = serde_json::to_value(request).map_err(Error::Serde)?;
765 let json = self.post("/orders/edit", &body).await?;
766 serde_json::from_value(json).map_err(Error::Serde)
767 }
768}
769
/// Domain-level Coinbase HTTP client.
///
/// Wraps a shared [`CoinbaseRawHttpClient`] and adds an instrument cache and a
/// product alias map, both held behind `Arc` so clones of this client share them.
#[derive(Debug, Clone)]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.coinbase", from_py_object)
)]
#[cfg_attr(
    feature = "python",
    pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.coinbase")
)]
pub struct CoinbaseHttpClient {
    /// Shared raw client that performs the actual requests.
    pub(crate) inner: Arc<CoinbaseRawHttpClient>,
    /// Clock used by `ts_now` to timestamp parsed reports and instruments.
    clock: &'static AtomicTime,
    /// Instruments cached by instrument ID.
    instruments: Arc<AtomicMap<InstrumentId, InstrumentAny>>,
    /// Map from product ID to its alias, filled for products with a non-empty alias.
    product_aliases: Arc<AtomicMap<Ustr, Ustr>>,
}
794
795impl Default for CoinbaseHttpClient {
796 fn default() -> Self {
797 Self::new(CoinbaseEnvironment::Live, 10, None, None)
798 .expect("Failed to create default Coinbase HTTP client")
799 }
800}
801
802impl CoinbaseHttpClient {
803 pub fn new(
809 environment: CoinbaseEnvironment,
810 timeout_secs: u64,
811 proxy_url: Option<String>,
812 retry_config: Option<RetryConfig>,
813 ) -> std::result::Result<Self, HttpClientError> {
814 let raw = CoinbaseRawHttpClient::new(environment, timeout_secs, proxy_url, retry_config)?;
815 Ok(Self::from_raw(raw))
816 }
817
818 pub fn with_credentials(
824 credential: CoinbaseCredential,
825 environment: CoinbaseEnvironment,
826 timeout_secs: u64,
827 proxy_url: Option<String>,
828 retry_config: Option<RetryConfig>,
829 ) -> std::result::Result<Self, HttpClientError> {
830 let raw = CoinbaseRawHttpClient::with_credentials(
831 credential,
832 environment,
833 timeout_secs,
834 proxy_url,
835 retry_config,
836 )?;
837 Ok(Self::from_raw(raw))
838 }
839
840 pub fn from_env(environment: CoinbaseEnvironment) -> Result<Self> {
846 let raw = CoinbaseRawHttpClient::from_env(environment)?;
847 Ok(Self::from_raw(raw))
848 }
849
850 pub fn from_credentials(
856 api_key: &str,
857 api_secret: &str,
858 environment: CoinbaseEnvironment,
859 timeout_secs: u64,
860 proxy_url: Option<String>,
861 retry_config: Option<RetryConfig>,
862 ) -> Result<Self> {
863 let raw = CoinbaseRawHttpClient::from_credentials(
864 api_key,
865 api_secret,
866 environment,
867 timeout_secs,
868 proxy_url,
869 retry_config,
870 )?;
871 Ok(Self::from_raw(raw))
872 }
873
874 #[must_use]
876 pub fn cancellation_token(&self) -> &CancellationToken {
877 self.inner.cancellation_token()
878 }
879
880 fn from_raw(raw: CoinbaseRawHttpClient) -> Self {
881 Self {
882 inner: Arc::new(raw),
883 clock: get_atomic_clock_realtime(),
884 instruments: Arc::new(AtomicMap::new()),
885 product_aliases: Arc::new(AtomicMap::new()),
886 }
887 }
888
889 pub fn set_base_url(&self, url: String) {
893 self.inner.set_base_url(url);
894 }
895
896 #[must_use]
898 pub fn environment(&self) -> CoinbaseEnvironment {
899 self.inner.environment()
900 }
901
902 #[must_use]
904 pub fn is_authenticated(&self) -> bool {
905 self.inner.is_authenticated()
906 }
907
908 #[must_use]
910 pub fn instruments(&self) -> &Arc<AtomicMap<InstrumentId, InstrumentAny>> {
911 &self.instruments
912 }
913
914 #[must_use]
916 pub fn product_aliases(&self) -> &Arc<AtomicMap<Ustr, Ustr>> {
917 &self.product_aliases
918 }
919
920 #[must_use]
922 pub fn ts_now(&self) -> UnixNanos {
923 self.clock.get_time_ns()
924 }
925
926 pub async fn get_products(&self) -> Result<Value> {
928 self.inner.get_products().await
929 }
930
931 pub async fn get_product(&self, product_id: &str) -> Result<Value> {
933 self.inner.get_product(product_id).await
934 }
935
936 pub async fn get_candles(
938 &self,
939 product_id: &str,
940 start: &str,
941 end: &str,
942 granularity: &str,
943 ) -> Result<Value> {
944 self.inner
945 .get_candles(product_id, start, end, granularity)
946 .await
947 }
948
949 pub async fn get_market_trades(&self, product_id: &str, limit: u32) -> Result<Value> {
951 self.inner.get_market_trades(product_id, limit).await
952 }
953
954 pub async fn get_best_bid_ask(&self, product_ids: &[&str]) -> Result<Value> {
956 self.inner.get_best_bid_ask(product_ids).await
957 }
958
959 pub async fn get_product_book(&self, product_id: &str, limit: Option<u32>) -> Result<Value> {
961 self.inner.get_product_book(product_id, limit).await
962 }
963
964 pub async fn get_accounts(&self) -> Result<Value> {
966 self.inner.get_accounts().await
967 }
968
969 pub async fn get_account(&self, account_id: &str) -> Result<Value> {
971 self.inner.get_account(account_id).await
972 }
973
974 pub async fn get_portfolios(&self) -> Result<Value> {
976 self.inner.get_portfolios().await
977 }
978
979 pub async fn preview_order(&self, body: &Value) -> Result<Value> {
984 self.inner.post("/orders/preview", body).await
985 }
986
987 pub async fn get_orders(&self, query: &str) -> Result<Value> {
989 self.inner.get_orders(query).await
990 }
991
992 pub async fn get_order(&self, order_id: &str) -> Result<Value> {
994 self.inner.get_order(order_id).await
995 }
996
997 pub async fn get_fills(&self, query: &str) -> Result<Value> {
999 self.inner.get_fills(query).await
1000 }
1001
1002 pub async fn get_transaction_summary(&self) -> Result<Value> {
1004 self.inner.get_transaction_summary().await
1005 }
1006
1007 pub async fn request_instruments(
1018 &self,
1019 product_type: Option<CoinbaseProductType>,
1020 ) -> anyhow::Result<Vec<InstrumentAny>> {
1021 let json = self
1022 .inner
1023 .get_products()
1024 .await
1025 .map_err(|e| anyhow::anyhow!("Failed to fetch products: {e}"))?;
1026 let response: ProductsResponse =
1027 serde_json::from_value(json).map_err(|e| anyhow::anyhow!(e))?;
1028
1029 let ts_init = self.ts_now();
1030 let mut instruments = Vec::with_capacity(response.products.len());
1031
1032 for product in &response.products {
1033 if let Some(filter) = product_type
1034 && product.product_type != filter
1035 {
1036 continue;
1037 }
1038
1039 match parse_instrument(product, ts_init) {
1040 Ok(instrument) => instruments.push(instrument),
1041 Err(e) => {
1042 log::debug!(
1043 "Skipping product '{}' during parse: {e}",
1044 product.product_id
1045 );
1046 }
1047 }
1048 }
1049
1050 self.cache_instruments(&instruments);
1051 self.record_product_aliases(&response.products);
1052 Ok(instruments)
1053 }
1054
1055 pub async fn request_instrument(&self, product_id: &str) -> anyhow::Result<InstrumentAny> {
1064 let json = self
1065 .inner
1066 .get_product(product_id)
1067 .await
1068 .map_err(|e| anyhow::anyhow!("Failed to fetch product '{product_id}': {e}"))?;
1069 let product: crate::http::models::Product =
1070 serde_json::from_value(json).map_err(|e| anyhow::anyhow!(e))?;
1071 let ts_init = self.ts_now();
1072 let instrument = parse_instrument(&product, ts_init)?;
1073 self.cache_instrument(&instrument);
1074 self.record_product_aliases(std::slice::from_ref(&product));
1075 Ok(instrument)
1076 }
1077
1078 pub async fn request_raw_product(
1090 &self,
1091 product_id: &str,
1092 ) -> anyhow::Result<crate::http::models::Product> {
1093 let json = self
1094 .inner
1095 .get_product(product_id)
1096 .await
1097 .map_err(|e| anyhow::anyhow!("Failed to fetch product '{product_id}': {e}"))?;
1098 serde_json::from_value(json).map_err(|e| anyhow::anyhow!(e))
1099 }
1100
1101 pub async fn request_account_state(
1113 &self,
1114 account_id: AccountId,
1115 ) -> anyhow::Result<AccountState> {
1116 let accounts = self
1117 .inner
1118 .fetch_all_accounts()
1119 .await
1120 .map_err(|e| anyhow::anyhow!("Failed to fetch accounts: {e}"))?;
1121 let ts_event = self.ts_now();
1122 parse_account_state(&accounts, account_id, true, ts_event, ts_event)
1123 }
1124
1125 pub async fn request_order_status_report(
1136 &self,
1137 account_id: AccountId,
1138 client_order_id: Option<ClientOrderId>,
1139 venue_order_id: Option<VenueOrderId>,
1140 ) -> anyhow::Result<OrderStatusReport> {
1141 let venue_order_id = match (venue_order_id, client_order_id) {
1142 (Some(vid), _) => vid,
1143 (None, Some(cid)) => {
1144 let query = OrderListQuery {
1146 client_order_id_filter: Some(cid.as_str().to_string()),
1147 ..Default::default()
1148 };
1149 let orders = self
1150 .inner
1151 .fetch_all_orders(&query)
1152 .await
1153 .map_err(|e| anyhow::anyhow!("Failed to fetch orders: {e}"))?;
1154 let order = orders
1155 .into_iter()
1156 .next()
1157 .ok_or_else(|| anyhow::anyhow!("No order found for client_order_id={cid}"))?;
1158 let instrument = self.get_or_fetch_instrument(order.product_id).await?;
1159 let ts_init = self.ts_now();
1160 return parse_order_status_report(&order, &instrument, account_id, ts_init);
1161 }
1162 (None, None) => {
1163 anyhow::bail!("Either client_order_id or venue_order_id is required")
1164 }
1165 };
1166
1167 let json = self
1168 .inner
1169 .get_order(venue_order_id.as_str())
1170 .await
1171 .map_err(|e| anyhow::anyhow!("Failed to fetch order: {e}"))?;
1172 let response: OrderResponse =
1173 serde_json::from_value(json).map_err(|e| anyhow::anyhow!(e))?;
1174 let instrument = self
1175 .get_or_fetch_instrument(response.order.product_id)
1176 .await?;
1177 let ts_init = self.ts_now();
1178 parse_order_status_report(&response.order, &instrument, account_id, ts_init)
1179 }
1180
1181 pub async fn request_order_status_reports(
1189 &self,
1190 account_id: AccountId,
1191 instrument_id: Option<InstrumentId>,
1192 open_only: bool,
1193 start: Option<DateTime<Utc>>,
1194 end: Option<DateTime<Utc>>,
1195 limit: Option<u32>,
1196 ) -> anyhow::Result<Vec<OrderStatusReport>> {
1197 let query = OrderListQuery {
1198 product_id: instrument_id.map(|id| id.symbol.as_str().to_string()),
1199 open_only,
1200 start,
1201 end,
1202 limit,
1203 client_order_id_filter: None,
1204 };
1205
1206 let orders = self
1207 .inner
1208 .fetch_all_orders(&query)
1209 .await
1210 .map_err(|e| anyhow::anyhow!("Failed to fetch orders: {e}"))?;
1211
1212 let ts_init = self.ts_now();
1213 let mut reports = Vec::with_capacity(orders.len());
1214
1215 for order in &orders {
1216 let instrument = match self.get_or_fetch_instrument(order.product_id).await {
1217 Ok(inst) => inst,
1218 Err(e) => {
1219 log::debug!("Skipping order {}: {e}", order.order_id);
1220 continue;
1221 }
1222 };
1223
1224 match parse_order_status_report(order, &instrument, account_id, ts_init) {
1225 Ok(report) => reports.push(report),
1226 Err(e) => log::warn!("Failed to parse order {}: {e}", order.order_id),
1227 }
1228 }
1229
1230 Ok(reports)
1231 }
1232
1233 pub async fn request_fill_reports(
1241 &self,
1242 account_id: AccountId,
1243 instrument_id: Option<InstrumentId>,
1244 venue_order_id: Option<VenueOrderId>,
1245 start: Option<DateTime<Utc>>,
1246 end: Option<DateTime<Utc>>,
1247 limit: Option<u32>,
1248 ) -> anyhow::Result<Vec<FillReport>> {
1249 let query = FillListQuery {
1250 product_id: instrument_id.map(|id| id.symbol.as_str().to_string()),
1251 venue_order_id: venue_order_id.map(|id| id.as_str().to_string()),
1252 start,
1253 end,
1254 limit,
1255 };
1256
1257 let fills = self
1258 .inner
1259 .fetch_all_fills(&query)
1260 .await
1261 .map_err(|e| anyhow::anyhow!("Failed to fetch fills: {e}"))?;
1262
1263 let ts_init = self.ts_now();
1264 let mut reports = Vec::with_capacity(fills.len());
1265
1266 for fill in &fills {
1267 let instrument = match self.get_or_fetch_instrument(fill.product_id).await {
1268 Ok(inst) => inst,
1269 Err(e) => {
1270 log::debug!("Skipping fill {}: {e}", fill.trade_id);
1271 continue;
1272 }
1273 };
1274
1275 match parse_fill_report(fill, &instrument, account_id, ts_init) {
1276 Ok(report) => reports.push(report),
1277 Err(e) => log::warn!("Failed to parse fill {}: {e}", fill.trade_id),
1278 }
1279 }
1280
1281 Ok(reports)
1282 }
1283
1284 pub fn cache_instrument(&self, instrument: &InstrumentAny) {
1286 self.instruments.rcu(|m| {
1287 m.insert(instrument.id(), instrument.clone());
1288 });
1289 }
1290
1291 pub fn cache_instruments(&self, instruments: &[InstrumentAny]) {
1293 self.instruments.rcu(|m| {
1294 for instrument in instruments {
1295 m.insert(instrument.id(), instrument.clone());
1296 }
1297 });
1298 }
1299
1300 pub fn record_product_aliases(&self, products: &[crate::http::models::Product]) {
1305 let aliased: Vec<(Ustr, Ustr)> = products
1306 .iter()
1307 .filter(|p| !p.alias.is_empty())
1308 .map(|p| (p.product_id, p.alias))
1309 .collect();
1310
1311 if aliased.is_empty() {
1312 return;
1313 }
1314
1315 self.product_aliases.rcu(|m| {
1316 for (product_id, alias) in &aliased {
1317 m.insert(*product_id, *alias);
1318 }
1319 });
1320 }
1321
1322 async fn get_or_fetch_instrument(&self, product_id: Ustr) -> anyhow::Result<InstrumentAny> {
1328 let instrument_id = InstrumentId::new(
1329 Symbol::new(product_id),
1330 *crate::common::consts::COINBASE_VENUE,
1331 );
1332
1333 if let Some(instrument) = self.instruments.get_cloned(&instrument_id) {
1334 return Ok(instrument);
1335 }
1336 self.request_instrument(product_id.as_str()).await
1340 }
1341
1342 #[allow(clippy::too_many_arguments)]
1355 pub async fn submit_order(
1356 &self,
1357 client_order_id: ClientOrderId,
1358 instrument_id: InstrumentId,
1359 side: OrderSide,
1360 order_type: OrderType,
1361 quantity: Quantity,
1362 time_in_force: TimeInForce,
1363 price: Option<Price>,
1364 trigger_price: Option<Price>,
1365 expire_time: Option<UnixNanos>,
1366 post_only: bool,
1367 is_quote_quantity: bool,
1368 leverage: Option<Decimal>,
1369 margin_type: Option<CoinbaseMarginType>,
1370 reduce_only: bool,
1371 retail_portfolio_id: Option<String>,
1372 ) -> anyhow::Result<CreateOrderResponse> {
1373 let coinbase_side = map_order_side(side)?;
1374 let order_config = build_order_configuration(
1375 order_type,
1376 side,
1377 quantity,
1378 price,
1379 trigger_price,
1380 time_in_force,
1381 expire_time,
1382 post_only,
1383 is_quote_quantity,
1384 reduce_only,
1385 )?;
1386
1387 let request = CreateOrderRequest {
1388 client_order_id: client_order_id.to_string(),
1389 product_id: instrument_id.symbol.inner(),
1390 side: coinbase_side,
1391 order_configuration: order_config,
1392 self_trade_prevention_id: None,
1393 leverage: leverage.map(|d| d.normalize().to_string()),
1394 margin_type,
1395 retail_portfolio_id,
1396 reduce_only,
1397 };
1398
1399 self.inner
1400 .create_order(&request)
1401 .await
1402 .map_err(|e| anyhow::anyhow!("Failed to submit order: {e}"))
1403 }
1404
1405 pub async fn cancel_orders(
1412 &self,
1413 venue_order_ids: &[VenueOrderId],
1414 ) -> anyhow::Result<CancelOrdersResponse> {
1415 let request = CancelOrdersRequest {
1416 order_ids: venue_order_ids
1417 .iter()
1418 .map(|id| id.as_str().to_string())
1419 .collect(),
1420 };
1421 self.inner
1422 .cancel_orders(&request)
1423 .await
1424 .map_err(|e| anyhow::anyhow!("Failed to cancel orders: {e}"))
1425 }
1426
1427 pub async fn request_cfm_balance_summary(&self) -> anyhow::Result<CfmBalanceSummary> {
1434 let response = self
1435 .inner
1436 .get_cfm_balance_summary()
1437 .await
1438 .map_err(|e| anyhow::anyhow!("Failed to fetch CFM balance summary: {e}"))?;
1439 Ok(response.balance_summary)
1440 }
1441
1442 pub async fn request_cfm_margin_balances(&self) -> anyhow::Result<Vec<MarginBalance>> {
1449 let summary = self.request_cfm_balance_summary().await?;
1450 parse_cfm_margin_balances(&summary)
1451 }
1452
1453 pub async fn request_cfm_account_state(
1460 &self,
1461 account_id: AccountId,
1462 ) -> anyhow::Result<AccountState> {
1463 let summary = self.request_cfm_balance_summary().await?;
1464 let ts_event = self.ts_now();
1465 parse_cfm_account_state(&summary, account_id, true, ts_event, ts_event)
1466 }
1467
1468 pub async fn request_position_status_reports(
1475 &self,
1476 account_id: AccountId,
1477 ) -> anyhow::Result<Vec<PositionStatusReport>> {
1478 let response = self
1479 .inner
1480 .get_cfm_positions()
1481 .await
1482 .map_err(|e| anyhow::anyhow!("Failed to fetch CFM positions: {e}"))?;
1483
1484 let ts_init = self.ts_now();
1485 let mut reports = Vec::with_capacity(response.positions.len());
1486
1487 for position in &response.positions {
1488 let instrument = match self.get_or_fetch_instrument(position.product_id).await {
1489 Ok(inst) => inst,
1490 Err(e) => {
1491 log::debug!("Skipping CFM position {}: {e}", position.product_id);
1492 continue;
1493 }
1494 };
1495
1496 match parse_cfm_position_status_report(position, &instrument, account_id, ts_init) {
1497 Ok(report) => reports.push(report),
1498 Err(e) => log::warn!("Failed to parse CFM position {}: {e}", position.product_id),
1499 }
1500 }
1501
1502 Ok(reports)
1503 }
1504
1505 pub async fn request_position_status_report(
1513 &self,
1514 account_id: AccountId,
1515 instrument_id: InstrumentId,
1516 ) -> anyhow::Result<Option<PositionStatusReport>> {
1517 let product_id = instrument_id.symbol.as_str();
1518 let response = self
1519 .inner
1520 .get_cfm_position(product_id)
1521 .await
1522 .map_err(|e| anyhow::anyhow!("Failed to fetch CFM position '{product_id}': {e}"))?;
1523
1524 let instrument = self
1525 .get_or_fetch_instrument(response.position.product_id)
1526 .await?;
1527 let ts_init = self.ts_now();
1528 let report =
1529 parse_cfm_position_status_report(&response.position, &instrument, account_id, ts_init)?;
1530 Ok(Some(report))
1531 }
1532
1533 pub async fn modify_order(
1545 &self,
1546 venue_order_id: VenueOrderId,
1547 price: Option<Price>,
1548 quantity: Option<Quantity>,
1549 trigger_price: Option<Price>,
1550 ) -> anyhow::Result<EditOrderResponse> {
1551 let request = EditOrderRequest {
1552 order_id: venue_order_id.as_str().to_string(),
1553 price: price.map(|p| p.to_string()),
1554 size: quantity.map(|q| q.to_string()),
1555 stop_price: trigger_price.map(|p| p.to_string()),
1556 };
1557 self.inner
1558 .edit_order(&request)
1559 .await
1560 .map_err(|e| anyhow::anyhow!("Failed to edit order: {e}"))
1561 }
1562}
1563
1564pub fn map_order_side(side: OrderSide) -> anyhow::Result<CoinbaseOrderSide> {
1570 match side {
1571 OrderSide::Buy => Ok(CoinbaseOrderSide::Buy),
1572 OrderSide::Sell => Ok(CoinbaseOrderSide::Sell),
1573 OrderSide::NoOrderSide => anyhow::bail!("NoOrderSide is not a valid Coinbase side"),
1574 }
1575}
1576
1577#[allow(clippy::too_many_arguments)]
1590pub fn build_order_configuration(
1591 order_type: OrderType,
1592 side: OrderSide,
1593 quantity: Quantity,
1594 price: Option<Price>,
1595 trigger_price: Option<Price>,
1596 time_in_force: TimeInForce,
1597 expire_time: Option<UnixNanos>,
1598 post_only: bool,
1599 is_quote_quantity: bool,
1600 reduce_only: bool,
1601) -> anyhow::Result<OrderConfiguration> {
1602 let qty = quantity.as_decimal();
1603 let price = price.map(|p| p.as_decimal());
1604 let trigger = trigger_price.map(|p| p.as_decimal());
1605
1606 if reduce_only && matches!(order_type, OrderType::Market) {
1607 log::debug!("Coinbase MARKET orders do not accept reduce_only; ignoring flag");
1608 }
1609
1610 match order_type {
1611 OrderType::Market => {
1612 let params = if is_quote_quantity {
1623 MarketParams {
1624 quote_size: Some(qty),
1625 base_size: None,
1626 }
1627 } else {
1628 MarketParams {
1629 quote_size: None,
1630 base_size: Some(qty),
1631 }
1632 };
1633
1634 match time_in_force {
1635 TimeInForce::Ioc | TimeInForce::Gtc => {
1636 Ok(OrderConfiguration::MarketIoc(MarketIoc {
1637 market_market_ioc: params,
1638 }))
1639 }
1640 TimeInForce::Fok => Ok(OrderConfiguration::MarketFok(MarketFok {
1641 market_market_fok: params,
1642 })),
1643 _ => {
1644 anyhow::bail!(
1645 "Unsupported TIF {time_in_force} for MARKET on Coinbase (use IOC or FOK)"
1646 )
1647 }
1648 }
1649 }
1650 OrderType::Limit => {
1651 let limit_price =
1652 price.ok_or_else(|| anyhow::anyhow!("LIMIT order requires a price"))?;
1653
1654 match time_in_force {
1655 TimeInForce::Gtc => Ok(OrderConfiguration::LimitGtc(LimitGtc {
1656 limit_limit_gtc: LimitGtcParams {
1657 base_size: qty,
1658 limit_price,
1659 post_only,
1660 },
1661 })),
1662 TimeInForce::Gtd => {
1663 let expire = expire_time
1664 .ok_or_else(|| anyhow::anyhow!("GTD LIMIT requires expire_time"))?;
1665 Ok(OrderConfiguration::LimitGtd(LimitGtd {
1666 limit_limit_gtd: LimitGtdParams {
1667 base_size: qty,
1668 limit_price,
1669 end_time: format_rfc3339_from_nanos(expire)?,
1670 post_only,
1671 },
1672 }))
1673 }
1674 TimeInForce::Fok => Ok(OrderConfiguration::LimitFok(LimitFok {
1675 limit_limit_fok: LimitFokParams {
1676 base_size: qty,
1677 limit_price,
1678 },
1679 })),
1680 _ => anyhow::bail!("Unsupported TIF {time_in_force} for LIMIT on Coinbase"),
1681 }
1682 }
1683 OrderType::StopLimit => {
1684 let limit_price =
1685 price.ok_or_else(|| anyhow::anyhow!("STOP_LIMIT order requires a price"))?;
1686 let stop_price = trigger
1687 .ok_or_else(|| anyhow::anyhow!("STOP_LIMIT order requires trigger_price"))?;
1688 let direction = match side {
1689 OrderSide::Buy => CoinbaseStopDirection::StopUp,
1690 OrderSide::Sell => CoinbaseStopDirection::StopDown,
1691 OrderSide::NoOrderSide => {
1692 anyhow::bail!("STOP_LIMIT requires a defined side")
1693 }
1694 };
1695
1696 match time_in_force {
1697 TimeInForce::Gtc => Ok(OrderConfiguration::StopLimitGtc(StopLimitGtc {
1698 stop_limit_stop_limit_gtc: StopLimitGtcParams {
1699 base_size: qty,
1700 limit_price,
1701 stop_price,
1702 stop_direction: direction,
1703 },
1704 })),
1705 TimeInForce::Gtd => {
1706 let expire = expire_time
1707 .ok_or_else(|| anyhow::anyhow!("GTD STOP_LIMIT requires expire_time"))?;
1708 Ok(OrderConfiguration::StopLimitGtd(StopLimitGtd {
1709 stop_limit_stop_limit_gtd: StopLimitGtdParams {
1710 base_size: qty,
1711 limit_price,
1712 stop_price,
1713 stop_direction: direction,
1714 end_time: format_rfc3339_from_nanos(expire)?,
1715 },
1716 }))
1717 }
1718 _ => anyhow::bail!("Unsupported TIF {time_in_force} for STOP_LIMIT on Coinbase"),
1719 }
1720 }
1721 other => anyhow::bail!("Unsupported order type for Coinbase: {other}"),
1722 }
1723}
1724
#[cfg(test)]
mod tests {
    use rstest::rstest;

    use super::*;

    /// Builds a raw client for `environment` with the constructor arguments shared by
    /// all raw-client tests.
    fn raw_client(environment: CoinbaseEnvironment) -> CoinbaseRawHttpClient {
        CoinbaseRawHttpClient::new(environment, 10, None, None).unwrap()
    }

    /// Builds a live domain client with the constructor arguments shared by the
    /// domain-client tests.
    fn domain_client() -> CoinbaseHttpClient {
        CoinbaseHttpClient::new(CoinbaseEnvironment::Live, 10, None, None).unwrap()
    }

    /// Arguments for `build_order_configuration` that vary between tests. Optional
    /// prices start as `None` and both flags as `false`.
    struct OrderSpec {
        order_type: OrderType,
        side: OrderSide,
        quantity: &'static str,
        price: Option<&'static str>,
        trigger_price: Option<&'static str>,
        time_in_force: TimeInForce,
        post_only: bool,
        is_quote_quantity: bool,
    }

    impl OrderSpec {
        fn new(
            order_type: OrderType,
            side: OrderSide,
            quantity: &'static str,
            time_in_force: TimeInForce,
        ) -> Self {
            Self {
                order_type,
                side,
                quantity,
                price: None,
                trigger_price: None,
                time_in_force,
                post_only: false,
                is_quote_quantity: false,
            }
        }

        /// Calls `build_order_configuration` with this spec. `expire_time` is always
        /// `None` and `reduce_only` always `false`.
        fn build(self) -> anyhow::Result<OrderConfiguration> {
            build_order_configuration(
                self.order_type,
                self.side,
                Quantity::from(self.quantity),
                self.price.map(Price::from),
                self.trigger_price.map(Price::from),
                self.time_in_force,
                None,
                self.post_only,
                self.is_quote_quantity,
                false,
            )
        }
    }

    #[rstest]
    fn test_raw_client_construction_live() {
        let client = raw_client(CoinbaseEnvironment::Live);

        assert_eq!(client.environment(), CoinbaseEnvironment::Live);
        assert!(!client.is_authenticated());
    }

    #[rstest]
    fn test_raw_client_construction_sandbox() {
        let client = raw_client(CoinbaseEnvironment::Sandbox);

        assert_eq!(client.environment(), CoinbaseEnvironment::Sandbox);
    }

    #[rstest]
    fn test_raw_build_url() {
        let client = raw_client(CoinbaseEnvironment::Live);

        let url = client.build_url("/products");

        assert_eq!(url, "https://api.coinbase.com/api/v3/brokerage/products");
    }

    #[rstest]
    fn test_raw_build_jwt_uri_live() {
        let client = raw_client(CoinbaseEnvironment::Live);

        let uri = client.build_jwt_uri("GET", "/accounts");

        assert_eq!(uri, "GET api.coinbase.com/api/v3/brokerage/accounts");
    }

    #[rstest]
    fn test_raw_build_jwt_uri_sandbox() {
        let client = raw_client(CoinbaseEnvironment::Sandbox);

        let uri = client.build_jwt_uri("GET", "/accounts");

        assert_eq!(
            uri,
            "GET api-sandbox.coinbase.com/api/v3/brokerage/accounts"
        );
    }

    #[rstest]
    fn test_raw_build_jwt_uri_custom_base_url() {
        let client = raw_client(CoinbaseEnvironment::Live);
        client.set_base_url(String::from("http://localhost:8080"));

        let uri = client.build_jwt_uri("POST", "/orders");

        assert_eq!(uri, "POST localhost:8080/api/v3/brokerage/orders");
    }

    #[rstest]
    fn test_raw_set_base_url_safe_after_clone_via_arc() {
        let shared = Arc::new(raw_client(CoinbaseEnvironment::Live));
        let observer = Arc::clone(&shared);

        shared.set_base_url(String::from("http://localhost:1234"));

        // The override made through one handle is observed through the other
        assert!(
            observer
                .build_url("/foo")
                .starts_with("http://localhost:1234")
        );
    }

    #[rstest]
    fn test_raw_auth_headers_without_credentials() {
        let client = raw_client(CoinbaseEnvironment::Live);

        let outcome = client.auth_headers("GET", "/accounts");

        assert!(outcome.is_err());
        assert!(outcome.unwrap_err().is_auth_error());
    }

    #[rstest]
    fn test_domain_client_construction() {
        let client = domain_client();

        assert_eq!(client.environment(), CoinbaseEnvironment::Live);
        assert!(!client.is_authenticated());
    }

    #[rstest]
    fn test_domain_client_default() {
        let client = CoinbaseHttpClient::default();

        assert_eq!(client.environment(), CoinbaseEnvironment::Live);
    }

    #[rstest]
    fn test_domain_client_instruments_cache_empty() {
        let client = CoinbaseHttpClient::default();

        assert!(client.instruments().is_empty());
    }

    #[rstest]
    fn test_domain_client_set_base_url() {
        let client = domain_client();
        let cloned = client.clone();

        client.set_base_url(String::from("http://localhost:9090"));

        // The clone taken before the override must observe it as well
        let url = cloned.inner.build_url("/test");
        assert!(url.starts_with("http://localhost:9090"));
    }

    #[rstest]
    fn test_encode_query_escapes_rfc3339_timestamps() {
        let encoded = encode_query(&[("start_date", "2024-01-15T10:00:00+00:00")]);

        assert_eq!(encoded, "start_date=2024-01-15T10%3A00%3A00%2B00%3A00");
    }

    #[rstest]
    fn test_encode_query_escapes_opaque_cursor() {
        let raw_cursor = "a/b+c=?&x";

        let encoded = encode_query(&[("cursor", raw_cursor)]);

        // The raw cursor must not survive verbatim in the encoded query
        assert!(!encoded.contains(raw_cursor));
        assert!(encoded.starts_with("cursor="));
    }

    #[rstest]
    fn test_encode_query_joins_pairs_with_ampersand() {
        let encoded = encode_query(&[("product_id", "BTC-USD"), ("limit", "50")]);

        assert_eq!(encoded, "product_id=BTC-USD&limit=50");
    }

    #[rstest]
    fn test_map_order_side_rejects_no_side() {
        assert!(matches!(
            map_order_side(OrderSide::Buy),
            Ok(CoinbaseOrderSide::Buy)
        ));
        assert!(matches!(
            map_order_side(OrderSide::Sell),
            Ok(CoinbaseOrderSide::Sell)
        ));
        assert!(map_order_side(OrderSide::NoOrderSide).is_err());
    }

    #[rstest]
    fn test_build_order_configuration_market_base_size() {
        let cfg = OrderSpec::new(
            OrderType::Market,
            OrderSide::Buy,
            "1.5",
            TimeInForce::Ioc,
        )
        .build()
        .unwrap();

        match cfg {
            OrderConfiguration::MarketIoc(m) => {
                let params = &m.market_market_ioc;
                assert!(params.base_size.is_some());
                assert!(params.quote_size.is_none());
            }
            other => panic!("expected MarketIoc, was {other:?}"),
        }
    }

    #[rstest]
    fn test_build_order_configuration_market_quote_size() {
        let cfg = OrderSpec {
            is_quote_quantity: true,
            ..OrderSpec::new(
                OrderType::Market,
                OrderSide::Buy,
                "100",
                TimeInForce::Ioc,
            )
        }
        .build()
        .unwrap();

        match cfg {
            OrderConfiguration::MarketIoc(m) => {
                let params = &m.market_market_ioc;
                assert!(params.quote_size.is_some());
                assert!(params.base_size.is_none());
            }
            other => panic!("expected MarketIoc, was {other:?}"),
        }
    }

    #[rstest]
    fn test_build_order_configuration_market_fok() {
        let cfg = OrderSpec::new(
            OrderType::Market,
            OrderSide::Buy,
            "0.5",
            TimeInForce::Fok,
        )
        .build()
        .unwrap();

        match cfg {
            OrderConfiguration::MarketFok(m) => {
                let params = &m.market_market_fok;
                assert!(params.base_size.is_some());
                assert!(params.quote_size.is_none());
            }
            other => panic!("expected MarketFok, was {other:?}"),
        }
    }

    #[rstest]
    #[case(TimeInForce::Day)]
    #[case(TimeInForce::Gtd)]
    fn test_build_order_configuration_market_rejects_unsupported_tif(#[case] tif: TimeInForce) {
        let outcome = OrderSpec::new(OrderType::Market, OrderSide::Buy, "1", tif).build();

        assert!(outcome.is_err());
    }

    #[rstest]
    fn test_build_order_configuration_limit_gtc_post_only() {
        let cfg = OrderSpec {
            price: Some("50000.00"),
            post_only: true,
            ..OrderSpec::new(
                OrderType::Limit,
                OrderSide::Sell,
                "0.5",
                TimeInForce::Gtc,
            )
        }
        .build()
        .unwrap();

        match cfg {
            OrderConfiguration::LimitGtc(l) => assert!(l.limit_limit_gtc.post_only),
            other => panic!("expected LimitGtc, was {other:?}"),
        }
    }

    #[rstest]
    fn test_build_order_configuration_limit_gtd_requires_expire_time() {
        let outcome = OrderSpec {
            price: Some("100.00"),
            ..OrderSpec::new(OrderType::Limit, OrderSide::Buy, "1", TimeInForce::Gtd)
        }
        .build();

        assert!(outcome.is_err());
    }

    #[rstest]
    fn test_build_order_configuration_stop_limit_uses_correct_direction() {
        let expectations = [
            (OrderSide::Buy, CoinbaseStopDirection::StopUp),
            (OrderSide::Sell, CoinbaseStopDirection::StopDown),
        ];

        for (side, expected_direction) in expectations {
            let cfg = OrderSpec {
                price: Some("100.00"),
                trigger_price: Some("99.00"),
                ..OrderSpec::new(OrderType::StopLimit, side, "1", TimeInForce::Gtc)
            }
            .build()
            .unwrap();

            match cfg {
                OrderConfiguration::StopLimitGtc(s) => assert_eq!(
                    s.stop_limit_stop_limit_gtc.stop_direction,
                    expected_direction
                ),
                other => panic!("expected StopLimitGtc, was {other:?}"),
            }
        }
    }

    #[rstest]
    fn test_build_order_configuration_market_accepts_default_gtc() {
        // A MARKET order carrying GTC must still yield the IOC configuration
        let cfg = OrderSpec::new(
            OrderType::Market,
            OrderSide::Buy,
            "1",
            TimeInForce::Gtc,
        )
        .build()
        .unwrap();

        assert!(matches!(cfg, OrderConfiguration::MarketIoc(_)));
    }

    #[rstest]
    fn test_build_order_configuration_rejects_stop_market() {
        let outcome = OrderSpec {
            trigger_price: Some("100.00"),
            ..OrderSpec::new(
                OrderType::StopMarket,
                OrderSide::Buy,
                "1",
                TimeInForce::Gtc,
            )
        }
        .build();

        assert!(outcome.is_err());
    }

    #[rstest]
    fn test_rest_quota_matches_documented_limit() {
        let burst = COINBASE_REST_QUOTA.burst_size().get();

        assert_eq!(burst, 30);
    }

    #[rstest]
    fn test_default_retry_config_values() {
        let retry = default_retry_config();

        assert_eq!(retry.max_retries, 3);
        assert_eq!(retry.initial_delay_ms, 100);
        assert_eq!(retry.max_delay_ms, 5_000);
        assert_eq!(retry.max_elapsed_ms, Some(180_000));
    }
}