Skip to main content

nautilus_tardis/machine/
message.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use chrono::{DateTime, Utc};
17use serde::{Deserialize, Deserializer, Serialize, de::Error};
18use ustr::Ustr;
19
20use crate::common::{enums::TardisExchange, parse::deserialize_uppercase};
21
22/// Represents a single level in the order book (bid or ask).
23#[derive(Debug, Clone, Deserialize, Serialize)]
24pub struct BookLevel {
25    /// The price at this level.
26    pub price: f64,
27    /// The amount at this level.
28    pub amount: f64,
29}
30
31/// Represents a Tardis WebSocket message for book changes.
32#[derive(Debug, Clone, Deserialize, Serialize)]
33#[serde(rename_all = "camelCase")]
34pub struct BookChangeMsg {
35    /// The symbol as provided by the exchange.
36    #[serde(deserialize_with = "deserialize_uppercase")]
37    pub symbol: Ustr,
38    /// The exchange ID.
39    pub exchange: TardisExchange,
40    /// Indicates whether this is an initial order book snapshot.
41    pub is_snapshot: bool,
42    /// Updated bids, with price and amount levels.
43    #[serde(deserialize_with = "deserialize_book_levels")]
44    pub bids: Vec<BookLevel>,
45    /// Updated asks, with price and amount levels.
46    #[serde(deserialize_with = "deserialize_book_levels")]
47    pub asks: Vec<BookLevel>,
48    /// The order book update timestamp provided by the exchange (ISO 8601 format).
49    pub timestamp: DateTime<Utc>,
50    /// The local timestamp when the message was received.
51    pub local_timestamp: DateTime<Utc>,
52}
53
54/// Represents a Tardis WebSocket message for book snapshots.
55#[derive(Debug, Clone, Deserialize, Serialize)]
56#[serde(rename_all = "camelCase")]
57pub struct BookSnapshotMsg {
58    /// The symbol as provided by the exchange.
59    #[serde(deserialize_with = "deserialize_uppercase")]
60    pub symbol: Ustr,
61    /// The exchange ID.
62    pub exchange: TardisExchange,
63    /// The name of the snapshot, e.g., `book_snapshot_{depth}_{interval}{time_unit}`.
64    pub name: String,
65    /// The requested number of levels (top bids/asks).
66    pub depth: u32,
67    /// The requested snapshot interval in milliseconds.
68    pub interval: u32,
69    /// The top bids price-amount levels.
70    #[serde(deserialize_with = "deserialize_book_levels")]
71    pub bids: Vec<BookLevel>,
72    /// The top asks price-amount levels.
73    #[serde(deserialize_with = "deserialize_book_levels")]
74    pub asks: Vec<BookLevel>,
75    /// The snapshot timestamp based on the last book change message processed timestamp.
76    pub timestamp: DateTime<Utc>,
77    /// The local timestamp when the message was received.
78    pub local_timestamp: DateTime<Utc>,
79}
80
81/// Represents a Tardis WebSocket message for trades.
82#[derive(Debug, Clone, Deserialize, Serialize)]
83#[serde(tag = "type")]
84#[serde(rename_all = "camelCase")]
85pub struct TradeMsg {
86    /// The symbol as provided by the exchange.
87    #[serde(deserialize_with = "deserialize_uppercase")]
88    pub symbol: Ustr,
89    /// The exchange ID.
90    pub exchange: TardisExchange,
91    /// The trade ID provided by the exchange (optional).
92    pub id: Option<String>,
93    /// The trade price as provided by the exchange.
94    pub price: f64,
95    /// The trade amount as provided by the exchange.
96    pub amount: f64,
97    /// The liquidity taker side (aggressor) for the trade.
98    pub side: String,
99    /// The trade timestamp provided by the exchange.
100    pub timestamp: DateTime<Utc>,
101    /// The local timestamp when the message was received.
102    pub local_timestamp: DateTime<Utc>,
103}
104
105/// Derivative instrument ticker info sourced from real-time ticker & instrument channels.
106#[derive(Debug, Clone, Serialize, Deserialize)]
107#[serde(rename_all = "camelCase")]
108pub struct DerivativeTickerMsg {
109    /// The symbol as provided by the exchange.
110    #[serde(deserialize_with = "deserialize_uppercase")]
111    pub symbol: Ustr,
112    /// The exchange ID.
113    pub exchange: TardisExchange,
114    /// The last instrument price if provided by exchange.
115    pub last_price: Option<f64>,
116    /// The last open interest if provided by exchange.
117    pub open_interest: Option<f64>,
118    /// The last funding rate if provided by exchange.
119    pub funding_rate: Option<f64>,
120    /// The last index price if provided by exchange.
121    pub index_price: Option<f64>,
122    /// The last mark price if provided by exchange.
123    pub mark_price: Option<f64>,
124    /// The message timestamp provided by exchange.
125    pub timestamp: DateTime<Utc>,
126    /// The local timestamp when the message was received.
127    pub local_timestamp: DateTime<Utc>,
128}
129
130/// Trades data in aggregated form, known as OHLC, candlesticks, klines etc. Not only most common
131/// time based aggregation is supported, but volume and tick count based as well. Bars are computed
132/// from tick-by-tick raw trade data, if in given interval no trades happened, there is no bar produced.
133#[derive(Debug, Clone, Serialize, Deserialize)]
134#[serde(rename_all = "camelCase")]
135pub struct BarMsg {
136    /// The symbol as provided by the exchange.
137    #[serde(deserialize_with = "deserialize_uppercase")]
138    pub symbol: Ustr,
139    /// The exchange ID.
140    pub exchange: TardisExchange,
141    /// name with format `trade_bar`_{interval}
142    pub name: String,
143    /// The requested trade bar interval.
144    pub interval: u64,
145    /// The open price.
146    pub open: f64,
147    /// The high price.
148    pub high: f64,
149    /// The low price.
150    pub low: f64,
151    /// The close price.
152    pub close: f64,
153    /// The total volume traded in given interval.
154    pub volume: f64,
155    /// The buy volume traded in given interval.
156    pub buy_volume: f64,
157    /// The sell volume traded in given interval.
158    pub sell_volume: f64,
159    /// The trades count in given interval.
160    pub trades: u64,
161    /// The volume weighted average price.
162    pub vwap: f64,
163    /// The timestamp of first trade for given bar.
164    pub open_timestamp: DateTime<Utc>,
165    /// The timestamp of last trade for given bar.
166    pub close_timestamp: DateTime<Utc>,
167    /// The end of interval period timestamp.
168    pub timestamp: DateTime<Utc>,
169    /// The message arrival timestamp that triggered given bar computation.
170    pub local_timestamp: DateTime<Utc>,
171}
172
173/// Message that marks events when real-time WebSocket connection that was used to collect the
174/// historical data got disconnected.
175#[derive(Debug, Clone, Serialize, Deserialize)]
176#[serde(rename_all = "camelCase")]
177pub struct DisconnectMsg {
178    /// The exchange ID.
179    pub exchange: TardisExchange,
180    /// The message arrival timestamp that triggered given bar computation (ISO 8601 format).
181    pub local_timestamp: DateTime<Utc>,
182}
183
184/// A Tardis Machine Server message type.
185#[allow(missing_docs)]
186#[derive(Debug, Clone, Serialize, Deserialize)]
187#[serde(rename_all = "snake_case", tag = "type")]
188pub enum WsMessage {
189    BookChange(BookChangeMsg),
190    BookSnapshot(BookSnapshotMsg),
191    Trade(TradeMsg),
192    TradeBar(BarMsg),
193    DerivativeTicker(DerivativeTickerMsg),
194    Disconnect(DisconnectMsg),
195}
196
197#[derive(Debug, Deserialize)]
198struct RawBookLevel {
199    price: Option<f64>,
200    amount: Option<f64>,
201}
202
203fn deserialize_book_levels<'de, D>(deserializer: D) -> Result<Vec<BookLevel>, D::Error>
204where
205    D: Deserializer<'de>,
206{
207    Vec::<RawBookLevel>::deserialize(deserializer)?
208        .into_iter()
209        .filter_map(|level| match (level.price, level.amount) {
210            (Some(price), Some(amount)) => Some(Ok(BookLevel { price, amount })),
211            (None, None) => None,
212            (None, Some(_)) => Some(Err(D::Error::custom("book level missing price"))),
213            (Some(_), None) => Some(Err(D::Error::custom("book level missing amount"))),
214        })
215        .collect()
216}
217
218#[cfg(test)]
219mod tests {
220    use rstest::rstest;
221
222    use super::*;
223    use crate::common::testing::load_test_json;
224
225    #[rstest]
226    fn test_parse_book_change_message() {
227        let json_data = load_test_json("book_change.json");
228        let message: BookChangeMsg = serde_json::from_str(&json_data).unwrap();
229
230        assert_eq!(message.symbol, "XBTUSD");
231        assert_eq!(message.exchange, TardisExchange::Bitmex);
232        assert!(!message.is_snapshot);
233        assert!(message.bids.is_empty());
234        assert_eq!(message.asks.len(), 1);
235        assert_eq!(message.asks[0].price, 7_985.0);
236        assert_eq!(message.asks[0].amount, 283_318.0);
237        assert_eq!(
238            message.timestamp,
239            DateTime::parse_from_rfc3339("2019-10-23T11:29:53.469Z").unwrap()
240        );
241        assert_eq!(
242            message.local_timestamp,
243            DateTime::parse_from_rfc3339("2019-10-23T11:29:53.469Z").unwrap()
244        );
245    }
246
247    #[rstest]
248    fn test_parse_book_change_message_skips_empty_book_levels() {
249        let json_data = r#"{
250            "type": "book_change",
251            "symbol": "XBTUSD",
252            "exchange": "bitmex",
253            "isSnapshot": false,
254            "bids": [{"price": 7984, "amount": 100}, {}],
255            "asks": [{}],
256            "timestamp": "2019-10-23T11:29:53.469Z",
257            "localTimestamp": "2019-10-23T11:29:53.469Z"
258        }"#;
259
260        let message: BookChangeMsg = serde_json::from_str(json_data).unwrap();
261
262        assert_eq!(message.bids.len(), 1);
263        assert!(message.asks.is_empty());
264        assert_eq!(message.bids[0].price, 7_984.0);
265        assert_eq!(message.bids[0].amount, 100.0);
266    }
267
268    #[rstest]
269    #[case(r#"[{"price": 7984}]"#, "book level missing amount")]
270    #[case(r#"[{"amount": 100}]"#, "book level missing price")]
271    fn test_parse_book_change_message_rejects_partial_book_level(
272        #[case] bids: &str,
273        #[case] error_message: &str,
274    ) {
275        let json_data = format!(
276            r#"{{
277                "type": "book_change",
278                "symbol": "XBTUSD",
279                "exchange": "bitmex",
280                "isSnapshot": false,
281                "bids": {bids},
282                "asks": [],
283                "timestamp": "2019-10-23T11:29:53.469Z",
284                "localTimestamp": "2019-10-23T11:29:53.469Z"
285            }}"#
286        );
287
288        let error = serde_json::from_str::<BookChangeMsg>(&json_data).unwrap_err();
289
290        assert!(error.to_string().contains(error_message));
291    }
292
293    #[rstest]
294    fn test_parse_book_snapshot_message() {
295        let json_data = load_test_json("book_snapshot.json");
296        let message: BookSnapshotMsg = serde_json::from_str(&json_data).unwrap();
297
298        assert_eq!(message.symbol, "XBTUSD");
299        assert_eq!(message.exchange, TardisExchange::Bitmex);
300        assert_eq!(message.name, "book_snapshot_2_50ms");
301        assert_eq!(message.depth, 2);
302        assert_eq!(message.interval, 50);
303        assert_eq!(message.bids.len(), 2);
304        assert_eq!(message.asks.len(), 2);
305        assert_eq!(message.bids[0].price, 7_633.5);
306        assert_eq!(message.bids[0].amount, 1_906_067.0);
307        assert_eq!(message.asks[0].price, 7_634.0);
308        assert_eq!(message.asks[0].amount, 1_467_849.0);
309        assert_eq!(
310            message.timestamp,
311            DateTime::parse_from_rfc3339("2019-10-25T13:39:46.950Z").unwrap(),
312        );
313        assert_eq!(
314            message.local_timestamp,
315            DateTime::parse_from_rfc3339("2019-10-25T13:39:46.961Z").unwrap()
316        );
317    }
318
319    #[rstest]
320    fn test_parse_book_snapshot_message_skips_empty_book_levels() {
321        let json_data = r#"{
322            "type": "book_snapshot",
323            "symbol": "ETC",
324            "exchange": "hyperliquid",
325            "name": "book_snapshot_20_10s",
326            "depth": 20,
327            "interval": 10000,
328            "bids": [{"price": 20.002, "amount": 5.81}],
329            "asks": [{"price": 20.003, "amount": 162.45}, {}],
330            "timestamp": "2025-03-03T10:48:10.000Z",
331            "localTimestamp": "2025-03-03T10:48:10.596818Z"
332        }"#;
333
334        let message: BookSnapshotMsg = serde_json::from_str(json_data).unwrap();
335
336        assert_eq!(message.symbol, "ETC");
337        assert_eq!(message.exchange, TardisExchange::Hyperliquid);
338        assert_eq!(message.bids.len(), 1);
339        assert_eq!(message.asks.len(), 1);
340        assert_eq!(message.asks[0].price, 20.003);
341        assert_eq!(message.asks[0].amount, 162.45);
342    }
343
344    #[rstest]
345    fn test_parse_book_snapshot_message_rejects_partial_book_level() {
346        let json_data = r#"{
347            "type": "book_snapshot",
348            "symbol": "ETC",
349            "exchange": "hyperliquid",
350            "name": "book_snapshot_20_10s",
351            "depth": 20,
352            "interval": 10000,
353            "bids": [{"price": 20.002}],
354            "asks": [],
355            "timestamp": "2025-03-03T10:48:10.000Z",
356            "localTimestamp": "2025-03-03T10:48:10.596818Z"
357        }"#;
358
359        let error = serde_json::from_str::<BookSnapshotMsg>(json_data).unwrap_err();
360
361        assert!(error.to_string().contains("book level missing amount"));
362    }
363
364    #[rstest]
365    fn test_parse_trade_message() {
366        let json_data = load_test_json("trade.json");
367        let message: TradeMsg = serde_json::from_str(&json_data).unwrap();
368
369        assert_eq!(message.symbol, "XBTUSD");
370        assert_eq!(message.exchange, TardisExchange::Bitmex);
371        assert_eq!(
372            message.id,
373            Some("282a0445-0e3a-abeb-f403-11003204ea1b".to_string())
374        );
375        assert_eq!(message.price, 7_996.0);
376        assert_eq!(message.amount, 50.0);
377        assert_eq!(message.side, "sell");
378        assert_eq!(
379            message.timestamp,
380            DateTime::parse_from_rfc3339("2019-10-23T10:32:49.669Z").unwrap()
381        );
382        assert_eq!(
383            message.local_timestamp,
384            DateTime::parse_from_rfc3339("2019-10-23T10:32:49.740Z").unwrap()
385        );
386    }
387
388    #[rstest]
389    fn test_parse_derivative_ticker_message() {
390        let json_data = load_test_json("derivative_ticker.json");
391        let message: DerivativeTickerMsg = serde_json::from_str(&json_data).unwrap();
392
393        assert_eq!(message.symbol, "BTC-PERPETUAL");
394        assert_eq!(message.exchange, TardisExchange::Deribit);
395        assert_eq!(message.last_price, Some(7_987.5));
396        assert_eq!(message.open_interest, Some(84_129_491.0));
397        assert_eq!(message.funding_rate, Some(-0.00001568));
398        assert_eq!(message.index_price, Some(7_989.28));
399        assert_eq!(message.mark_price, Some(7_987.56));
400        assert_eq!(
401            message.timestamp,
402            DateTime::parse_from_rfc3339("2019-10-23T11:34:29.302Z").unwrap()
403        );
404        assert_eq!(
405            message.local_timestamp,
406            DateTime::parse_from_rfc3339("2019-10-23T11:34:29.416Z").unwrap()
407        );
408    }
409
410    #[rstest]
411    fn test_parse_bar_message() {
412        let json_data = load_test_json("bar.json");
413        let message: BarMsg = serde_json::from_str(&json_data).unwrap();
414
415        assert_eq!(message.symbol, "XBTUSD");
416        assert_eq!(message.exchange, TardisExchange::Bitmex);
417        assert_eq!(message.name, "trade_bar_10000ms");
418        assert_eq!(message.interval, 10_000);
419        assert_eq!(message.open, 7_623.5);
420        assert_eq!(message.high, 7_623.5);
421        assert_eq!(message.low, 7_623.0);
422        assert_eq!(message.close, 7_623.5);
423        assert_eq!(message.volume, 37_034.0);
424        assert_eq!(message.buy_volume, 24_244.0);
425        assert_eq!(message.sell_volume, 12_790.0);
426        assert_eq!(message.trades, 9);
427        assert_eq!(message.vwap, 7_623.327320840309);
428        assert_eq!(
429            message.open_timestamp,
430            DateTime::parse_from_rfc3339("2019-10-25T13:11:31.574Z").unwrap()
431        );
432        assert_eq!(
433            message.close_timestamp,
434            DateTime::parse_from_rfc3339("2019-10-25T13:11:39.212Z").unwrap()
435        );
436        assert_eq!(
437            message.local_timestamp,
438            DateTime::parse_from_rfc3339("2019-10-25T13:11:40.369Z").unwrap()
439        );
440        assert_eq!(
441            message.timestamp,
442            DateTime::parse_from_rfc3339("2019-10-25T13:11:40.000Z").unwrap()
443        );
444    }
445
446    #[rstest]
447    fn test_parse_disconnect_message() {
448        let json_data = load_test_json("disconnect.json");
449        let message: DisconnectMsg = serde_json::from_str(&json_data).unwrap();
450
451        assert_eq!(message.exchange, TardisExchange::Deribit);
452        assert_eq!(
453            message.local_timestamp,
454            DateTime::parse_from_rfc3339("2019-10-23T11:34:29.416Z").unwrap()
455        );
456    }
457}