1use std::{fmt::Debug, sync::Arc};
19
20use ahash::AHashMap;
21use nautilus_core::{
22 AtomicMap, UnixNanos,
23 time::{AtomicTime, get_atomic_clock_realtime},
24};
25use nautilus_model::{
26 data::{Bar, BarType, OrderBookDeltas, QuoteTick, TradeTick},
27 identifiers::{AccountId, InstrumentId},
28 instruments::{Instrument, InstrumentAny},
29 reports::OrderStatusReport,
30};
31use nautilus_network::{RECONNECTED, websocket::WebSocketClient};
32use tokio_tungstenite::tungstenite::Message;
33use ustr::Ustr;
34
35use crate::{
36 common::consts::COINBASE,
37 websocket::{
38 client::COINBASE_WS_SUBSCRIPTION_KEYS,
39 messages::{CoinbaseWsMessage, CoinbaseWsSubscription, WsEventType, WsOrderUpdate},
40 parse::{
41 parse_ws_candle, parse_ws_l2_snapshot, parse_ws_l2_update, parse_ws_ticker,
42 parse_ws_trade, parse_ws_user_event_to_order_status_report,
43 },
44 },
45};
46
47fn instrument_id_from_product(product_id: &Ustr) -> InstrumentId {
48 InstrumentId::from(format!("{product_id}.{COINBASE}").as_str())
49}
50
51fn resolve_instrument_id(aliases: &AtomicMap<Ustr, Ustr>, product_id: &Ustr) -> InstrumentId {
52 let resolved = aliases.get_cloned(product_id).unwrap_or(*product_id);
53 instrument_id_from_product(&resolved)
54}
55
56pub enum HandlerCommand {
58 SetClient(WebSocketClient),
60 Subscribe(CoinbaseWsSubscription),
62 Unsubscribe(CoinbaseWsSubscription),
64 Disconnect,
66 InitializeInstruments(Vec<InstrumentAny>),
68 UpdateInstrument(Box<InstrumentAny>),
70 AddBarType { key: String, bar_type: BarType },
72 RemoveBarType { key: String },
74 SetAccountId(AccountId),
76}
77
78impl Debug for HandlerCommand {
79 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
80 match self {
81 Self::SetClient(_) => f.write_str("SetClient"),
82 Self::Subscribe(s) => write!(f, "Subscribe({:?})", s.channel),
83 Self::Unsubscribe(s) => write!(f, "Unsubscribe({:?})", s.channel),
84 Self::Disconnect => f.write_str("Disconnect"),
85 Self::InitializeInstruments(v) => write!(f, "InitializeInstruments({})", v.len()),
86 Self::UpdateInstrument(i) => write!(f, "UpdateInstrument({})", i.id()),
87 Self::AddBarType { key, .. } => write!(f, "AddBarType({key})"),
88 Self::RemoveBarType { key } => write!(f, "RemoveBarType({key})"),
89 Self::SetAccountId(id) => write!(f, "SetAccountId({id})"),
90 }
91 }
92}
93
94#[derive(Debug, Clone)]
106pub struct UserOrderUpdate {
107 pub report: Box<OrderStatusReport>,
108 pub update: Box<WsOrderUpdate>,
109 pub instrument: InstrumentAny,
110 pub is_snapshot: bool,
111 pub ts_event: UnixNanos,
112 pub ts_init: UnixNanos,
113}
114
115#[derive(Debug, Clone)]
117pub enum NautilusWsMessage {
118 Trade(TradeTick),
120 Quote(QuoteTick),
122 Deltas(OrderBookDeltas),
124 Bar(Bar),
126 UserOrder(Box<UserOrderUpdate>),
128 FuturesBalanceSummary(Box<crate::websocket::messages::WsFcmBalanceSummary>),
131 Reconnected,
133 Error(String),
135}
136
137#[derive(Debug)]
139pub struct FeedHandler {
140 clock: &'static AtomicTime,
141 signal: Arc<std::sync::atomic::AtomicBool>,
142 client: Option<WebSocketClient>,
143 cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
144 raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
145 instruments: AHashMap<InstrumentId, InstrumentAny>,
146 subscription_aliases: Arc<AtomicMap<Ustr, Ustr>>,
150 bar_types: AHashMap<String, BarType>,
151 account_id: Option<AccountId>,
152 buffer: Vec<NautilusWsMessage>,
153}
154
155impl FeedHandler {
156 pub fn new(
158 signal: Arc<std::sync::atomic::AtomicBool>,
159 cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
160 raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
161 subscription_aliases: Arc<AtomicMap<Ustr, Ustr>>,
162 ) -> Self {
163 Self {
164 clock: get_atomic_clock_realtime(),
165 signal,
166 client: None,
167 cmd_rx,
168 raw_rx,
169 instruments: AHashMap::new(),
170 subscription_aliases,
171 bar_types: AHashMap::new(),
172 account_id: None,
173 buffer: Vec::new(),
174 }
175 }
176
177 fn resolve_instrument_id(&self, product_id: &Ustr) -> InstrumentId {
178 resolve_instrument_id(&self.subscription_aliases, product_id)
179 }
180
181 pub fn set_account_id(&mut self, account_id: AccountId) {
183 self.account_id = Some(account_id);
184 }
185
186 pub async fn next(&mut self) -> Option<NautilusWsMessage> {
190 if self.signal.load(std::sync::atomic::Ordering::Acquire) {
193 self.buffer.clear();
194 return None;
195 }
196
197 if let Some(msg) = self.buffer.pop() {
198 return Some(msg);
199 }
200
201 loop {
202 if self.signal.load(std::sync::atomic::Ordering::Acquire) {
203 return None;
204 }
205
206 tokio::select! {
207 Some(cmd) = self.cmd_rx.recv() => {
208 match cmd {
209 HandlerCommand::SetClient(client) => {
210 self.client = Some(client);
211 }
212 HandlerCommand::Subscribe(sub) => {
213 self.send_subscription(&sub).await;
214 }
215 HandlerCommand::Unsubscribe(sub) => {
216 self.send_subscription(&sub).await;
217 }
218 HandlerCommand::Disconnect => {
219 if let Some(client) = self.client.take() {
220 client.notify_closed();
223 }
224 return None;
225 }
226 HandlerCommand::InitializeInstruments(instruments) => {
227 for inst in instruments {
228 self.instruments.insert(inst.id(), inst);
229 }
230 }
231 HandlerCommand::UpdateInstrument(inst) => {
232 self.instruments.insert(inst.id(), *inst);
233 }
234 HandlerCommand::AddBarType { key, bar_type } => {
235 self.bar_types.insert(key, bar_type);
236 }
237 HandlerCommand::RemoveBarType { key } => {
238 self.bar_types.remove(&key);
239 }
240 HandlerCommand::SetAccountId(account_id) => {
241 self.account_id = Some(account_id);
242 }
243 }
244 }
245 Some(raw) = self.raw_rx.recv() => {
246 match raw {
247 Message::Text(text) => {
248 if let Some(msg) = self.handle_text(&text) {
249 return Some(msg);
250 }
251 }
252 Message::Ping(data) => {
253 if let Some(client) = &self.client
254 && let Err(e) = client.send_pong(data.to_vec()).await
255 {
256 log::error!("Failed to send pong: {e}");
257 }
258 }
259 Message::Close(_) => return None,
260 _ => {}
261 }
262 }
263 else => return None,
264 }
265 }
266 }
267
268 async fn send_subscription(&self, sub: &CoinbaseWsSubscription) {
269 let Some(client) = &self.client else {
270 log::warn!("Cannot send subscription, no WebSocket client set");
271 return;
272 };
273
274 match serde_json::to_string(sub) {
275 Ok(json) => {
276 if let Err(e) = client
277 .send_text(json, Some(COINBASE_WS_SUBSCRIPTION_KEYS.as_slice()))
278 .await
279 {
280 log::error!("Failed to send subscription: {e}");
281 }
282 }
283 Err(e) => log::error!("Failed to serialize subscription: {e}"),
284 }
285 }
286
287 fn handle_text(&mut self, text: &str) -> Option<NautilusWsMessage> {
288 if text == RECONNECTED {
289 return Some(NautilusWsMessage::Reconnected);
290 }
291
292 let ts_init = self.clock.get_time_ns();
293
294 let msg: CoinbaseWsMessage = match serde_json::from_str(text) {
295 Ok(m) => m,
296 Err(e) => {
297 log::warn!("Failed to parse WS message: {e}");
298 return None;
299 }
300 };
301
302 match msg {
303 CoinbaseWsMessage::L2Data {
304 timestamp, events, ..
305 } => self.handle_l2_events(&events, ×tamp, ts_init),
306 CoinbaseWsMessage::MarketTrades { events, .. } => {
307 self.handle_market_trades(&events, ts_init)
308 }
309 CoinbaseWsMessage::Ticker {
310 timestamp, events, ..
311 }
312 | CoinbaseWsMessage::TickerBatch {
313 timestamp, events, ..
314 } => self.handle_ticker(&events, ×tamp, ts_init),
315 CoinbaseWsMessage::Candles { events, .. } => self.handle_candles(&events, ts_init),
316 CoinbaseWsMessage::Heartbeats { .. } => None,
317 CoinbaseWsMessage::Subscriptions { events, .. } => {
318 log::debug!("Subscription state: {events:?}");
322 None
323 }
324 CoinbaseWsMessage::User {
325 timestamp, events, ..
326 } => self.handle_user_events(&events, ×tamp, ts_init),
327 CoinbaseWsMessage::FuturesBalanceSummary { events, .. } => {
328 self.handle_futures_balance_summary(events)
329 }
330 CoinbaseWsMessage::Status { events, .. } => {
331 log::debug!(
332 "Ignoring {} status events until venue status handling lands",
333 events.len()
334 );
335 None
336 }
337 }
338 }
339
340 fn handle_l2_events(
341 &mut self,
342 events: &[crate::websocket::messages::WsL2DataEvent],
343 timestamp: &str,
344 ts_init: UnixNanos,
345 ) -> Option<NautilusWsMessage> {
346 let ts_event = match crate::http::parse::parse_rfc3339_timestamp(timestamp) {
347 Ok(ts) => ts,
348 Err(e) => {
349 log::warn!("Failed to parse L2 message timestamp {timestamp}: {e}");
350 ts_init
351 }
352 };
353
354 let mut first: Option<NautilusWsMessage> = None;
355
356 for event in events {
357 let instrument_id = self.resolve_instrument_id(&event.product_id);
358
359 let instrument = match self.instruments.get(&instrument_id) {
360 Some(inst) => inst,
361 None => {
362 log::warn!("No instrument cached for {instrument_id}");
363 continue;
364 }
365 };
366
367 let result = match event.event_type {
368 WsEventType::Snapshot => parse_ws_l2_snapshot(event, instrument, ts_event, ts_init),
369 WsEventType::Update => parse_ws_l2_update(event, instrument, ts_event, ts_init),
370 };
371
372 match result {
373 Ok(deltas) => {
374 let msg = NautilusWsMessage::Deltas(deltas);
375
376 if first.is_none() {
377 first = Some(msg);
378 } else {
379 self.buffer.push(msg);
380 }
381 }
382 Err(e) => log::warn!("Failed to parse L2 event: {e}"),
383 }
384 }
385
386 if first.is_some() {
387 self.buffer.reverse();
388 }
389 first
390 }
391
392 fn handle_market_trades(
393 &mut self,
394 events: &[crate::websocket::messages::WsMarketTradesEvent],
395 ts_init: UnixNanos,
396 ) -> Option<NautilusWsMessage> {
397 for event in events {
398 for trade in &event.trades {
399 let instrument_id = self.resolve_instrument_id(&trade.product_id);
400
401 let instrument = match self.instruments.get(&instrument_id) {
402 Some(inst) => inst,
403 None => {
404 log::warn!("No instrument cached for {instrument_id}");
405 continue;
406 }
407 };
408
409 match parse_ws_trade(trade, instrument, ts_init) {
410 Ok(tick) => {
411 self.buffer_remaining_trades(events, event, trade, ts_init);
412 self.buffer.reverse();
414 return Some(NautilusWsMessage::Trade(tick));
415 }
416 Err(e) => log::warn!("Failed to parse trade: {e}"),
417 }
418 }
419 }
420 None
421 }
422
423 fn buffer_remaining_trades(
424 &mut self,
425 events: &[crate::websocket::messages::WsMarketTradesEvent],
426 current_event: &crate::websocket::messages::WsMarketTradesEvent,
427 current_trade: &crate::websocket::messages::WsTrade,
428 ts_init: UnixNanos,
429 ) {
430 let mut found_current = false;
431
432 for event in events {
433 let is_current_event = std::ptr::eq(event, current_event);
434
435 for trade in &event.trades {
436 if !found_current {
437 if is_current_event && std::ptr::eq(trade, current_trade) {
438 found_current = true;
439 }
440 continue;
441 }
442
443 let instrument_id = self.resolve_instrument_id(&trade.product_id);
444
445 if let Some(instrument) = self.instruments.get(&instrument_id)
446 && let Ok(tick) = parse_ws_trade(trade, instrument, ts_init)
447 {
448 self.buffer.push(NautilusWsMessage::Trade(tick));
449 }
450 }
451 }
452 }
453
454 fn handle_ticker(
455 &mut self,
456 events: &[crate::websocket::messages::WsTickerEvent],
457 timestamp: &str,
458 ts_init: UnixNanos,
459 ) -> Option<NautilusWsMessage> {
460 let ts_event = crate::http::parse::parse_rfc3339_timestamp(timestamp).unwrap_or(ts_init);
461
462 let mut first: Option<NautilusWsMessage> = None;
463
464 for event in events {
465 for ticker in &event.tickers {
466 let instrument_id = self.resolve_instrument_id(&ticker.product_id);
467
468 let instrument = match self.instruments.get(&instrument_id) {
469 Some(inst) => inst,
470 None => {
471 log::warn!("No instrument cached for {instrument_id}");
472 continue;
473 }
474 };
475
476 match parse_ws_ticker(ticker, instrument, ts_event, ts_init) {
477 Ok(quote) => {
478 let msg = NautilusWsMessage::Quote(quote);
479
480 if first.is_none() {
481 first = Some(msg);
482 } else {
483 self.buffer.push(msg);
484 }
485 }
486 Err(e) => log::warn!("Failed to parse ticker: {e}"),
487 }
488 }
489 }
490
491 if first.is_some() {
492 self.buffer.reverse();
493 }
494 first
495 }
496
497 fn handle_user_events(
498 &mut self,
499 events: &[crate::websocket::messages::WsUserEvent],
500 timestamp: &str,
501 ts_init: UnixNanos,
502 ) -> Option<NautilusWsMessage> {
503 let Some(account_id) = self.account_id else {
504 log::debug!(
505 "Dropping user event: account_id not set (call SetAccountId after connect)"
506 );
507 return None;
508 };
509
510 let ts_event = match crate::http::parse::parse_rfc3339_timestamp(timestamp) {
511 Ok(ts) => ts,
512 Err(e) => {
513 log::warn!("Failed to parse user message timestamp {timestamp}: {e}");
514 ts_init
515 }
516 };
517
518 let mut first: Option<NautilusWsMessage> = None;
519
520 for event in events {
521 let is_snapshot = matches!(event.event_type, WsEventType::Snapshot);
522
523 for order in &event.orders {
524 let instrument_id = self.resolve_instrument_id(&order.product_id);
525 let instrument = match self.instruments.get(&instrument_id).cloned() {
526 Some(inst) => inst,
527 None => {
528 log::warn!("No instrument cached for {instrument_id}");
529 continue;
530 }
531 };
532
533 self.emit_user_event_messages(
534 order,
535 &instrument,
536 account_id,
537 is_snapshot,
538 ts_event,
539 ts_init,
540 &mut first,
541 );
542 }
543 }
544
545 if first.is_some() {
546 self.buffer.reverse();
547 }
548 first
549 }
550
551 #[allow(clippy::too_many_arguments)]
552 fn emit_user_event_messages(
553 &mut self,
554 order: &WsOrderUpdate,
555 instrument: &InstrumentAny,
556 account_id: AccountId,
557 is_snapshot: bool,
558 ts_event: UnixNanos,
559 ts_init: UnixNanos,
560 first: &mut Option<NautilusWsMessage>,
561 ) {
562 let report = match parse_ws_user_event_to_order_status_report(
563 order, instrument, account_id, ts_event, ts_init,
564 ) {
565 Ok(r) => r,
566 Err(e) => {
567 log::warn!("Failed to parse user order update: {e}");
568 return;
569 }
570 };
571
572 let msg = NautilusWsMessage::UserOrder(Box::new(UserOrderUpdate {
573 report: Box::new(report),
574 update: Box::new(order.clone()),
575 instrument: instrument.clone(),
576 is_snapshot,
577 ts_event,
578 ts_init,
579 }));
580
581 if first.is_none() {
582 *first = Some(msg);
583 } else {
584 self.buffer.push(msg);
585 }
586 }
587
588 fn handle_futures_balance_summary(
589 &mut self,
590 events: Vec<crate::websocket::messages::WsFuturesBalanceSummaryEvent>,
591 ) -> Option<NautilusWsMessage> {
592 let mut first: Option<NautilusWsMessage> = None;
593
594 for event in events {
595 let msg = NautilusWsMessage::FuturesBalanceSummary(Box::new(event.fcm_balance_summary));
596
597 if first.is_none() {
598 first = Some(msg);
599 } else {
600 self.buffer.push(msg);
601 }
602 }
603
604 if first.is_some() {
605 self.buffer.reverse();
606 }
607 first
608 }
609
610 fn handle_candles(
611 &mut self,
612 events: &[crate::websocket::messages::WsCandlesEvent],
613 ts_init: UnixNanos,
614 ) -> Option<NautilusWsMessage> {
615 let mut first: Option<NautilusWsMessage> = None;
616
617 for event in events {
618 for candle in &event.candles {
619 let key = candle.product_id.as_str();
620
621 let bar_type = match self.bar_types.get(key) {
622 Some(bt) => *bt,
623 None => {
624 log::debug!("No bar type registered for {key}");
625 continue;
626 }
627 };
628
629 let instrument_id = self.resolve_instrument_id(&candle.product_id);
630
631 let instrument = match self.instruments.get(&instrument_id) {
632 Some(inst) => inst,
633 None => {
634 log::warn!("No instrument cached for {instrument_id}");
635 continue;
636 }
637 };
638
639 match parse_ws_candle(candle, bar_type, instrument, ts_init) {
640 Ok(bar) => {
641 let msg = NautilusWsMessage::Bar(bar);
642
643 if first.is_none() {
644 first = Some(msg);
645 } else {
646 self.buffer.push(msg);
647 }
648 }
649 Err(e) => log::warn!("Failed to parse candle: {e}"),
650 }
651 }
652 }
653
654 if first.is_some() {
655 self.buffer.reverse();
656 }
657 first
658 }
659}
660
#[cfg(test)]
mod tests {
    use std::sync::{Arc, atomic::AtomicBool};

    use nautilus_model::{
        identifiers::{Symbol, Venue},
        instruments::CurrencyPair,
        types::{Currency, Price, Quantity},
    };
    use rstest::rstest;

    use super::*;
    use crate::common::testing::load_test_fixture;

    /// Handler with an unset stop flag and an empty alias map. Both channel
    /// senders are dropped on return, so tests drive it via `handle_text`.
    fn test_handler() -> FeedHandler {
        let stop = Arc::new(AtomicBool::new(false));
        let aliases = Arc::new(AtomicMap::new());
        let (_cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel();
        let (_raw_tx, raw_rx) = tokio::sync::mpsc::unbounded_channel();
        FeedHandler::new(stop, cmd_rx, raw_rx, aliases)
    }

    /// `BTC-USD.COINBASE` currency pair with price precision 2 and size precision 8.
    fn btc_usd_instrument() -> InstrumentAny {
        let symbol = Symbol::new("BTC-USD");
        let venue = Venue::new(Ustr::from("COINBASE"));
        let pair = CurrencyPair::new(
            InstrumentId::new(symbol, venue),
            Symbol::new("BTC-USD"),
            Currency::get_or_create_crypto("BTC"),
            Currency::get_or_create_crypto("USD"),
            2,
            8,
            Price::from("0.01"),
            Quantity::from("0.00000001"),
            None,
            None,
            None,
            Some(Quantity::from("0.00000001")),
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            UnixNanos::default(),
            UnixNanos::default(),
        );
        InstrumentAny::CurrencyPair(pair)
    }

    /// Handler whose instrument cache already contains the BTC-USD instrument.
    fn handler_with_btc_usd() -> FeedHandler {
        let mut handler = test_handler();
        let instrument = btc_usd_instrument();
        handler.instruments.insert(instrument.id(), instrument);
        handler
    }

    #[rstest]
    fn test_handle_text_drops_user_channel_when_account_id_unset() {
        let payload = load_test_fixture("ws_user.json");
        let mut handler = test_handler();

        let emitted = handler.handle_text(&payload);

        assert!(emitted.is_none());
        assert!(handler.buffer.is_empty());
    }

    #[rstest]
    fn test_handle_user_event_emits_user_order_update() {
        use nautilus_model::{
            enums::{OrderSide, OrderStatus},
            identifiers::AccountId,
            types::Quantity,
        };

        use crate::common::enums::CoinbaseProductType;

        let payload = load_test_fixture("ws_user.json");
        let mut handler = handler_with_btc_usd();
        handler.set_account_id(AccountId::new("COINBASE-001"));

        let emitted = handler
            .handle_text(&payload)
            .expect("handler should emit a user-channel update");

        let carrier = match emitted {
            NautilusWsMessage::UserOrder(carrier) => carrier,
            other => panic!("expected UserOrder, was {other:?}"),
        };

        // Parsed report.
        assert_eq!(carrier.report.account_id.as_str(), "COINBASE-001");
        assert_eq!(carrier.report.instrument_id, btc_usd_instrument().id());
        assert_eq!(
            carrier.report.venue_order_id.as_str(),
            "a1b2c3d4-e5f6-7890-abcd-ef1234567890"
        );
        assert_eq!(
            carrier.report.client_order_id.unwrap().as_str(),
            "11111-000000-000001"
        );
        assert_eq!(carrier.report.order_side, OrderSide::Buy);
        assert_eq!(carrier.report.order_status, OrderStatus::Accepted);
        assert_eq!(carrier.report.filled_qty, Quantity::from("0.00000000"));
        assert_eq!(carrier.report.quantity, Quantity::from("0.00100000"));

        // Raw venue update carried alongside the report.
        assert_eq!(carrier.update.product_id, "BTC-USD");
        assert_eq!(carrier.update.product_type, CoinbaseProductType::Spot);
        assert_eq!(carrier.update.cumulative_quantity, "0");
        assert_eq!(carrier.update.leaves_quantity, "0.001");

        // Instrument and timestamps.
        assert_eq!(carrier.instrument.id(), btc_usd_instrument().id());
        assert!(carrier.ts_event.as_u64() > 0);
    }

    #[rstest]
    fn test_handle_text_ignores_status_channel() {
        let payload = r#"{
            "channel": "status",
            "client_id": "",
            "timestamp": "2023-02-09T20:29:49.753424311Z",
            "sequence_num": 0,
            "events": [
                {
                    "type": "snapshot",
                    "products": [
                        {
                            "product_type": "SPOT",
                            "id": "BTC-USD",
                            "base_currency": "BTC",
                            "quote_currency": "USD",
                            "base_increment": "0.00000001",
                            "quote_increment": "0.01",
                            "display_name": "BTC/USD",
                            "status": "online",
                            "status_message": "",
                            "min_market_funds": "1"
                        }
                    ]
                }
            ]
        }"#;
        let mut handler = test_handler();

        let emitted = handler.handle_text(payload);

        assert!(emitted.is_none());
        assert!(handler.buffer.is_empty());
    }

    #[rstest]
    fn test_handle_l2_update_uses_batch_timestamp_for_all_deltas() {
        let payload = load_test_fixture("ws_l2_data_update.json");
        let mut handler = handler_with_btc_usd();

        let emitted = handler
            .handle_text(&payload)
            .expect("handler should emit deltas for a valid L2 update");

        let batch = match emitted {
            NautilusWsMessage::Deltas(batch) => batch,
            other => panic!("expected Deltas, was {other:?}"),
        };

        assert!(!batch.deltas.is_empty());
        let expected_ts = batch.deltas[0].ts_event;
        for delta in &batch.deltas {
            assert_eq!(
                delta.ts_event, expected_ts,
                "all deltas in a batch must share ts_event"
            );
        }
    }

    #[rstest]
    fn test_handle_l2_update_malformed_timestamp_falls_back_to_ts_init() {
        let payload = load_test_fixture("ws_l2_data_update.json")
            .replace("2026-04-07T14:30:01.456789Z", "not-a-valid-timestamp");
        let mut handler = handler_with_btc_usd();

        let emitted = handler
            .handle_text(&payload)
            .expect("handler should still emit deltas when timestamp is malformed");

        let batch = match emitted {
            NautilusWsMessage::Deltas(batch) => batch,
            other => panic!("expected Deltas, was {other:?}"),
        };

        assert!(!batch.deltas.is_empty());
        for delta in &batch.deltas {
            assert_eq!(
                delta.ts_event, delta.ts_init,
                "malformed timestamp must fall back to ts_init"
            );
        }
    }

    #[rstest]
    fn test_handle_text_emits_futures_balance_summary_snapshot() {
        use rust_decimal::Decimal;

        let payload = r#"{
            "channel": "futures_balance_summary",
            "client_id": "",
            "timestamp": "2023-02-09T20:33:57.609931463Z",
            "sequence_num": 0,
            "events": [
                {
                    "type": "snapshot",
                    "fcm_balance_summary": {
                        "futures_buying_power": "100.00",
                        "total_usd_balance": "200.00",
                        "cbi_usd_balance": "300.00",
                        "cfm_usd_balance": "400.00",
                        "total_open_orders_hold_amount": "500.00",
                        "unrealized_pnl": "600.00",
                        "daily_realized_pnl": "0",
                        "initial_margin": "700.00",
                        "available_margin": "800.00",
                        "liquidation_threshold": "900.00",
                        "liquidation_buffer_amount": "1000.00",
                        "liquidation_buffer_percentage": "1000",
                        "intraday_margin_window_measure": {
                            "margin_window_type": "FCM_MARGIN_WINDOW_TYPE_INTRADAY",
                            "margin_level": "MARGIN_LEVEL_TYPE_BASE",
                            "initial_margin": "100.00",
                            "maintenance_margin": "200.00",
                            "liquidation_buffer_percentage": "1000",
                            "total_hold": "100.00",
                            "futures_buying_power": "400.00"
                        },
                        "overnight_margin_window_measure": {
                            "margin_window_type": "FCM_MARGIN_WINDOW_TYPE_OVERNIGHT",
                            "margin_level": "MARGIN_LEVEL_TYPE_BASE",
                            "initial_margin": "300.00",
                            "maintenance_margin": "200.00",
                            "liquidation_buffer_percentage": "1000",
                            "total_hold": "-30.00",
                            "futures_buying_power": "2000.00"
                        }
                    }
                }
            ]
        }"#;
        let mut handler = test_handler();

        let emitted = handler
            .handle_text(payload)
            .expect("handler should emit a futures balance summary");

        let summary = match emitted {
            NautilusWsMessage::FuturesBalanceSummary(summary) => summary,
            other => panic!("expected FuturesBalanceSummary, was {other:?}"),
        };

        assert_eq!(summary.futures_buying_power, Decimal::from(100));
        assert_eq!(summary.total_usd_balance, Decimal::from(200));
        assert_eq!(summary.total_open_orders_hold_amount, Decimal::from(500));
        assert_eq!(summary.available_margin, Decimal::from(800));

        let intraday = &summary.intraday_margin_window_measure;
        assert_eq!(intraday.initial_margin, Decimal::from(100));
        assert_eq!(intraday.maintenance_margin, Decimal::from(200));

        let overnight = &summary.overnight_margin_window_measure;
        assert_eq!(overnight.initial_margin, Decimal::from(300));
        assert_eq!(overnight.maintenance_margin, Decimal::from(200));
        assert_eq!(overnight.total_hold, "-30".parse::<Decimal>().unwrap());
    }

    #[rstest]
    fn test_handle_text_routes_reconnected_sentinel() {
        let mut handler = test_handler();

        let emitted = handler.handle_text(RECONNECTED);

        assert!(matches!(emitted, Some(NautilusWsMessage::Reconnected)));
    }

    #[rstest]
    fn test_signal_release_acquire_exits_handler_loop() {
        use std::sync::atomic::Ordering;

        let stop = Arc::new(AtomicBool::new(false));
        // Senders stay alive for the whole test so both channels remain open.
        let (_cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel();
        let (_raw_tx, raw_rx) = tokio::sync::mpsc::unbounded_channel();
        let mut handler =
            FeedHandler::new(stop.clone(), cmd_rx, raw_rx, Arc::new(AtomicMap::new()));

        stop.store(true, Ordering::Release);

        let runtime = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap();
        let result = runtime.block_on(async { handler.next().await });
        assert!(result.is_none(), "{result:?}");
    }
}