1use std::{collections::VecDeque, ops::ControlFlow, pin::Pin, time::Duration};
17
18use ahash::AHashMap;
19use bytes::Bytes;
20use nautilus_common::{
21 cache::database::{CacheDatabaseAdapter, CacheMap},
22 live::get_runtime,
23 logging::{log_task_awaiting, log_task_started, log_task_stopped},
24 signal::Signal,
25};
26use nautilus_core::UnixNanos;
27use nautilus_model::{
28 accounts::AccountAny,
29 data::{Bar, CustomData, DataType, FundingRateUpdate, QuoteTick, TradeTick},
30 events::{OrderEventAny, OrderSnapshot, position::snapshot::PositionSnapshot},
31 identifiers::{
32 AccountId, ClientId, ClientOrderId, ComponentId, InstrumentId, PositionId, StrategyId,
33 VenueOrderId,
34 },
35 instruments::{Instrument, InstrumentAny, SyntheticInstrument},
36 orderbook::OrderBook,
37 orders::{Order, OrderAny},
38 position::Position,
39 types::Currency,
40};
41use sqlx::{PgPool, postgres::PgConnectOptions};
42use tokio::{time::Instant, try_join};
43use ustr::Ustr;
44
45use crate::sql::{
46 pg::{connect_pg, get_postgres_connect_options},
47 queries::DatabaseQueries,
48};
49
/// Task name passed to the task lifecycle log helpers (`log_task_started` /
/// `log_task_stopped`) by the background command-processing loop.
const CACHE_PROCESS: &str = "cache-process";
52
/// Cache database handle backed by PostgreSQL.
///
/// Holds a connection pool that the load/flush/close methods query directly, plus the
/// sending half of an unbounded channel over which write commands ([`DatabaseQuery`])
/// are handed to a background task spawned by `connect`.
#[derive(Debug)]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.infrastructure")
)]
pub struct PostgresCacheDatabase {
    /// Connection pool used by the load, flush and close methods.
    pub pool: PgPool,
    /// Sender used to queue write commands for the background processing task.
    tx: tokio::sync::mpsc::UnboundedSender<DatabaseQuery>,
    /// Join handle of the background processing task; awaited when closing.
    handle: tokio::task::JoinHandle<()>,
}
63
/// Commands sent over the channel to the background processing task.
///
/// Every variant except `Close` is pushed onto a buffer and later executed against the
/// database by `drain_buffer`.
#[allow(
    clippy::large_enum_variant,
    reason = "variant sizes vary with feature unification; allow stays silent when the lint does not fire"
)]
#[derive(Debug, Clone)]
pub enum DatabaseQuery {
    /// Drains any buffered commands and then stops the processing loop.
    Close,
    /// Stores a general key/value item.
    Add(String, Vec<u8>),
    /// Stores a currency.
    AddCurrency(Currency),
    /// Stores an instrument.
    AddInstrument(InstrumentAny),
    /// Stores an order with an optional client ID; the flag is forwarded as the
    /// `updated` argument of the order insert query.
    AddOrder(OrderAny, Option<ClientId>, bool),
    /// Stores an order snapshot.
    AddOrderSnapshot(OrderSnapshot),
    /// Stores a position snapshot.
    AddPositionSnapshot(PositionSnapshot),
    /// Stores an account; the flag is `false` when sent by `add_account` and `true`
    /// when sent by `update_account`.
    AddAccount(AccountAny, bool),
    /// Stores a signal.
    AddSignal(Signal),
    /// Stores a custom data item.
    AddCustom(CustomData),
    /// Stores a quote tick.
    AddQuote(QuoteTick),
    /// Stores a trade tick.
    AddTrade(TradeTick),
    /// Stores a bar.
    AddBar(Bar),
    /// Stores an order event (sent by `update_order`).
    UpdateOrder(OrderEventAny),
}
85
86impl PostgresCacheDatabase {
87 pub async fn connect(
97 host: Option<String>,
98 port: Option<u16>,
99 username: Option<String>,
100 password: Option<String>,
101 database: Option<String>,
102 ) -> Result<Self, sqlx::Error> {
103 let pg_connect_options =
104 get_postgres_connect_options(host, port, username, password, database);
105 let pool = connect_pg(pg_connect_options.clone().into()).await.unwrap();
106 let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<DatabaseQuery>();
107
108 let handle = tokio::spawn(async move {
110 Self::process_commands(rx, pg_connect_options.clone().into()).await;
111 });
112 Ok(Self { pool, tx, handle })
113 }
114
115 async fn process_commands(
116 mut rx: tokio::sync::mpsc::UnboundedReceiver<DatabaseQuery>,
117 pg_connect_options: PgConnectOptions,
118 ) {
119 log_task_started(CACHE_PROCESS);
120
121 let pool = connect_pg(pg_connect_options).await.unwrap();
122
123 let mut buffer: VecDeque<DatabaseQuery> = VecDeque::new();
125
126 let buffer_interval = Duration::from_millis(0);
128
129 let flush_timer = tokio::time::sleep(buffer_interval);
133 tokio::pin!(flush_timer);
134
135 loop {
137 tokio::select! {
138 maybe_msg = rx.recv() => {
139 let result = handle_query(
140 maybe_msg,
141 &mut buffer,
142 buffer_interval,
143 &pool,
144 ).await;
145
146 if result.is_break() {
147 break;
148 }
149 }
150 () = &mut flush_timer, if !buffer_interval.is_zero() => {
151 flush_buffer(&mut buffer, &pool, &mut flush_timer, buffer_interval).await;
152 }
153 }
154 }
155
156 if !buffer.is_empty() {
157 drain_buffer(&pool, &mut buffer).await;
158 }
159
160 log_task_stopped(CACHE_PROCESS);
161 }
162}
163
164async fn handle_query(
165 maybe_msg: Option<DatabaseQuery>,
166 buffer: &mut VecDeque<DatabaseQuery>,
167 buffer_interval: Duration,
168 pool: &PgPool,
169) -> ControlFlow<()> {
170 let Some(msg) = maybe_msg else {
171 log::debug!("Command channel closed");
172 return ControlFlow::Break(());
173 };
174
175 log::debug!("Received {msg:?}");
176
177 if matches!(msg, DatabaseQuery::Close) {
178 if !buffer.is_empty() {
179 drain_buffer(pool, buffer).await;
180 }
181 return ControlFlow::Break(());
182 }
183
184 buffer.push_back(msg);
185
186 if buffer_interval.is_zero() {
187 drain_buffer(pool, buffer).await;
188 }
189
190 ControlFlow::Continue(())
191}
192
193async fn flush_buffer(
194 buffer: &mut VecDeque<DatabaseQuery>,
195 pool: &PgPool,
196 flush_timer: &mut Pin<&mut tokio::time::Sleep>,
197 buffer_interval: Duration,
198) {
199 if !buffer.is_empty() {
200 drain_buffer(pool, buffer).await;
201 }
202 flush_timer.as_mut().reset(Instant::now() + buffer_interval);
203}
204
205pub async fn get_pg_cache_database() -> anyhow::Result<PostgresCacheDatabase> {
211 let connect_options = get_postgres_connect_options(None, None, None, None, None);
212 Ok(PostgresCacheDatabase::connect(
213 Some(connect_options.host),
214 Some(connect_options.port),
215 Some(connect_options.username),
216 Some(connect_options.password),
217 Some(connect_options.database),
218 )
219 .await?)
220}
221
222#[allow(dead_code)]
223#[allow(unused)]
224#[async_trait::async_trait]
225impl CacheDatabaseAdapter for PostgresCacheDatabase {
226 fn close(&mut self) -> anyhow::Result<()> {
227 let pool = self.pool.clone();
228 let (tx, rx) = std::sync::mpsc::channel();
229
230 log::debug!("Closing connection pool");
231
232 tokio::task::block_in_place(|| {
233 get_runtime().block_on(async {
234 pool.close().await;
235
236 if let Err(e) = tx.send(()) {
237 log::error!("Error closing pool: {e:?}");
238 }
239 });
240 });
241
242 if let Err(e) = self.tx.send(DatabaseQuery::Close) {
244 log::error!("Error sending close: {e:?}");
245 }
246
247 log_task_awaiting("cache-write");
248
249 tokio::task::block_in_place(|| {
250 if let Err(e) = get_runtime().block_on(&mut self.handle) {
251 log::error!("Error awaiting task 'cache-write': {e:?}");
252 }
253 });
254
255 log::debug!("Closed");
256
257 Ok(rx.recv()?)
258 }
259
260 fn flush(&mut self) -> anyhow::Result<()> {
261 let pool = self.pool.clone();
262 let (tx, rx) = std::sync::mpsc::channel();
263
264 tokio::task::block_in_place(|| {
265 get_runtime().block_on(async {
266 if let Err(e) = DatabaseQueries::truncate(&pool).await {
267 log::error!("Error flushing pool: {e:?}");
268 }
269
270 if let Err(e) = tx.send(()) {
271 log::error!("Error sending flush result: {e:?}");
272 }
273 });
274 });
275
276 Ok(rx.recv()?)
277 }
278
279 async fn load_all(&self) -> anyhow::Result<CacheMap> {
280 let (currencies, instruments, synthetics, accounts, orders, positions) = try_join!(
281 self.load_currencies(),
282 self.load_instruments(),
283 self.load_synthetics(),
284 self.load_accounts(),
285 self.load_orders(),
286 self.load_positions()
287 )
288 .map_err(|e| anyhow::anyhow!("Error loading cache data: {e}"))?;
289
290 let greeks = AHashMap::new();
293 let yield_curves = AHashMap::new();
294
295 Ok(CacheMap {
296 currencies,
297 instruments,
298 synthetics,
299 accounts,
300 orders,
301 positions,
302 greeks,
303 yield_curves,
304 })
305 }
306
307 fn load(&self) -> anyhow::Result<AHashMap<String, Bytes>> {
308 let pool = self.pool.clone();
309 let (tx, rx) = std::sync::mpsc::channel();
310
311 tokio::spawn(async move {
312 let result = DatabaseQueries::load(&pool).await;
313 match result {
314 Ok(items) => {
315 let mapping = items
316 .into_iter()
317 .map(|(k, v)| (k, Bytes::from(v)))
318 .collect();
319
320 if let Err(e) = tx.send(mapping) {
321 log::error!("Failed to send general items: {e:?}");
322 }
323 }
324 Err(e) => {
325 log::error!("Failed to load general items: {e:?}");
326 if let Err(e) = tx.send(AHashMap::new()) {
327 log::error!("Failed to send empty general items: {e:?}");
328 }
329 }
330 }
331 });
332 Ok(rx.recv()?)
333 }
334
335 async fn load_currencies(&self) -> anyhow::Result<AHashMap<Ustr, Currency>> {
336 let pool = self.pool.clone();
337 let (tx, rx) = std::sync::mpsc::channel();
338
339 tokio::spawn(async move {
340 let result = DatabaseQueries::load_currencies(&pool).await;
341 match result {
342 Ok(currencies) => {
343 let mapping = currencies
344 .into_iter()
345 .map(|currency| (currency.code, currency))
346 .collect();
347
348 if let Err(e) = tx.send(mapping) {
349 log::error!("Failed to send currencies: {e:?}");
350 }
351 }
352 Err(e) => {
353 log::error!("Failed to load currencies: {e:?}");
354 if let Err(e) = tx.send(AHashMap::new()) {
355 log::error!("Failed to send empty currencies: {e:?}");
356 }
357 }
358 }
359 });
360 Ok(rx.recv()?)
361 }
362
363 async fn load_instruments(&self) -> anyhow::Result<AHashMap<InstrumentId, InstrumentAny>> {
364 let pool = self.pool.clone();
365 let (tx, rx) = std::sync::mpsc::channel();
366
367 tokio::spawn(async move {
368 let result = DatabaseQueries::load_instruments(&pool).await;
369 match result {
370 Ok(instruments) => {
371 let mapping = instruments
372 .into_iter()
373 .map(|instrument| (instrument.id(), instrument))
374 .collect();
375
376 if let Err(e) = tx.send(mapping) {
377 log::error!("Failed to send instruments: {e:?}");
378 }
379 }
380 Err(e) => {
381 log::error!("Failed to load instruments: {e:?}");
382 if let Err(e) = tx.send(AHashMap::new()) {
383 log::error!("Failed to send empty instruments: {e:?}");
384 }
385 }
386 }
387 });
388 Ok(rx.recv()?)
389 }
390
391 async fn load_synthetics(&self) -> anyhow::Result<AHashMap<InstrumentId, SyntheticInstrument>> {
392 todo!()
393 }
394
395 async fn load_accounts(&self) -> anyhow::Result<AHashMap<AccountId, AccountAny>> {
396 let pool = self.pool.clone();
397 let (tx, rx) = std::sync::mpsc::channel();
398
399 tokio::spawn(async move {
400 let result = DatabaseQueries::load_accounts(&pool).await;
401 match result {
402 Ok(accounts) => {
403 let mapping = accounts
404 .into_iter()
405 .map(|account| (account.id(), account))
406 .collect();
407
408 if let Err(e) = tx.send(mapping) {
409 log::error!("Failed to send accounts: {e:?}");
410 }
411 }
412 Err(e) => {
413 log::error!("Failed to load accounts: {e:?}");
414 if let Err(e) = tx.send(AHashMap::new()) {
415 log::error!("Failed to send empty accounts: {e:?}");
416 }
417 }
418 }
419 });
420 Ok(rx.recv()?)
421 }
422
423 async fn load_orders(&self) -> anyhow::Result<AHashMap<ClientOrderId, OrderAny>> {
424 let pool = self.pool.clone();
425 let (tx, rx) = std::sync::mpsc::channel();
426
427 tokio::spawn(async move {
428 let result = DatabaseQueries::load_orders(&pool).await;
429 match result {
430 Ok(orders) => {
431 let mapping = orders
432 .into_iter()
433 .map(|order| (order.client_order_id(), order))
434 .collect();
435
436 if let Err(e) = tx.send(mapping) {
437 log::error!("Failed to send orders: {e:?}");
438 }
439 }
440 Err(e) => {
441 log::error!("Failed to load orders: {e:?}");
442 if let Err(e) = tx.send(AHashMap::new()) {
443 log::error!("Failed to send empty orders: {e:?}");
444 }
445 }
446 }
447 });
448 Ok(rx.recv()?)
449 }
450
451 async fn load_positions(&self) -> anyhow::Result<AHashMap<PositionId, Position>> {
452 todo!()
453 }
454
455 fn load_index_order_position(&self) -> anyhow::Result<AHashMap<ClientOrderId, Position>> {
456 todo!()
457 }
458
459 fn load_index_order_client(&self) -> anyhow::Result<AHashMap<ClientOrderId, ClientId>> {
460 let pool = self.pool.clone();
461 let (tx, rx) = std::sync::mpsc::channel();
462
463 tokio::spawn(async move {
464 let result = DatabaseQueries::load_distinct_order_event_client_ids(&pool).await;
465 match result {
466 Ok(currency) => {
467 if let Err(e) = tx.send(currency) {
468 log::error!("Failed to send load_index_order_client result: {e:?}");
469 }
470 }
471 Err(e) => {
472 log::error!("Failed to run query load_distinct_order_event_client_ids: {e:?}");
473 if let Err(e) = tx.send(AHashMap::new()) {
474 log::error!("Failed to send empty load_index_order_client result: {e:?}");
475 }
476 }
477 }
478 });
479 Ok(rx.recv()?)
480 }
481
482 async fn load_currency(&self, code: &Ustr) -> anyhow::Result<Option<Currency>> {
483 let pool = self.pool.clone();
484 let code = code.to_owned(); let (tx, rx) = std::sync::mpsc::channel();
486
487 tokio::spawn(async move {
488 let result = DatabaseQueries::load_currency(&pool, &code).await;
489 match result {
490 Ok(currency) => {
491 if let Err(e) = tx.send(currency) {
492 log::error!("Failed to send currency {code}: {e:?}");
493 }
494 }
495 Err(e) => {
496 log::error!("Failed to load currency {code}: {e:?}");
497 if let Err(e) = tx.send(None) {
498 log::error!("Failed to send None for currency {code}: {e:?}");
499 }
500 }
501 }
502 });
503 Ok(rx.recv()?)
504 }
505
506 async fn load_instrument(
507 &self,
508 instrument_id: &InstrumentId,
509 ) -> anyhow::Result<Option<InstrumentAny>> {
510 let pool = self.pool.clone();
511 let instrument_id = instrument_id.to_owned(); let (tx, rx) = std::sync::mpsc::channel();
513
514 tokio::spawn(async move {
515 let result = DatabaseQueries::load_instrument(&pool, &instrument_id).await;
516 match result {
517 Ok(instrument) => {
518 if let Err(e) = tx.send(instrument) {
519 log::error!("Failed to send instrument {instrument_id}: {e:?}");
520 }
521 }
522 Err(e) => {
523 log::error!("Failed to load instrument {instrument_id}: {e:?}");
524 if let Err(e) = tx.send(None) {
525 log::error!("Failed to send None for instrument {instrument_id}: {e:?}");
526 }
527 }
528 }
529 });
530 Ok(rx.recv()?)
531 }
532
533 async fn load_synthetic(
534 &self,
535 instrument_id: &InstrumentId,
536 ) -> anyhow::Result<Option<SyntheticInstrument>> {
537 todo!()
538 }
539
540 async fn load_account(&self, account_id: &AccountId) -> anyhow::Result<Option<AccountAny>> {
541 let pool = self.pool.clone();
542 let account_id = account_id.to_owned();
543 let (tx, rx) = std::sync::mpsc::channel();
544
545 tokio::spawn(async move {
546 let result = DatabaseQueries::load_account(&pool, &account_id).await;
547 match result {
548 Ok(account) => {
549 if let Err(e) = tx.send(account) {
550 log::error!("Failed to send account {account_id}: {e:?}");
551 }
552 }
553 Err(e) => {
554 log::error!("Failed to load account {account_id}: {e:?}");
555 if let Err(e) = tx.send(None) {
556 log::error!("Failed to send None for account {account_id}: {e:?}");
557 }
558 }
559 }
560 });
561 Ok(rx.recv()?)
562 }
563
564 async fn load_order(
565 &self,
566 client_order_id: &ClientOrderId,
567 ) -> anyhow::Result<Option<OrderAny>> {
568 let pool = self.pool.clone();
569 let client_order_id = client_order_id.to_owned();
570 let (tx, rx) = std::sync::mpsc::channel();
571
572 tokio::spawn(async move {
573 let result = DatabaseQueries::load_order(&pool, &client_order_id).await;
574 match result {
575 Ok(order) => {
576 if let Err(e) = tx.send(order) {
577 log::error!("Failed to send order {client_order_id}: {e:?}");
578 }
579 }
580 Err(e) => {
581 log::error!("Failed to load order {client_order_id}: {e:?}");
582 let _ = tx.send(None);
583 }
584 }
585 });
586 Ok(rx.recv()?)
587 }
588
589 async fn load_position(&self, position_id: &PositionId) -> anyhow::Result<Option<Position>> {
590 todo!()
591 }
592
593 fn load_actor(&self, component_id: &ComponentId) -> anyhow::Result<AHashMap<String, Bytes>> {
594 todo!()
595 }
596
597 fn delete_actor(&self, component_id: &ComponentId) -> anyhow::Result<()> {
598 todo!()
599 }
600
601 fn load_strategy(&self, strategy_id: &StrategyId) -> anyhow::Result<AHashMap<String, Bytes>> {
602 todo!()
603 }
604
605 fn delete_strategy(&self, component_id: &StrategyId) -> anyhow::Result<()> {
606 todo!()
607 }
608
609 fn delete_order(&self, client_order_id: &ClientOrderId) -> anyhow::Result<()> {
610 anyhow::bail!(
611 "delete_order not implemented for PostgreSQL cache adapter: {client_order_id}"
612 )
613 }
614
615 fn delete_position(&self, position_id: &PositionId) -> anyhow::Result<()> {
616 anyhow::bail!("delete_position not implemented for PostgreSQL cache adapter: {position_id}")
617 }
618
619 fn delete_account_event(&self, account_id: &AccountId, event_id: &str) -> anyhow::Result<()> {
620 anyhow::bail!(
621 "delete_account_event not implemented for PostgreSQL cache adapter: {account_id}, {event_id}"
622 )
623 }
624
625 fn add(&self, key: String, value: Bytes) -> anyhow::Result<()> {
626 let query = DatabaseQuery::Add(key, value.into());
627 self.tx
628 .send(query)
629 .map_err(|e| anyhow::anyhow!("Failed to send query to database message handler: {e}"))
630 }
631
632 fn add_currency(&self, currency: &Currency) -> anyhow::Result<()> {
633 let query = DatabaseQuery::AddCurrency(*currency);
634 self.tx.send(query).map_err(|e| {
635 anyhow::anyhow!("Failed to query add_currency to database message handler: {e}")
636 })
637 }
638
639 fn add_instrument(&self, instrument: &InstrumentAny) -> anyhow::Result<()> {
640 let query = DatabaseQuery::AddInstrument(instrument.clone());
641 self.tx.send(query).map_err(|e| {
642 anyhow::anyhow!("Failed to send query add_instrument to database message handler: {e}")
643 })
644 }
645
646 fn add_synthetic(&self, synthetic: &SyntheticInstrument) -> anyhow::Result<()> {
647 todo!()
648 }
649
650 fn add_account(&self, account: &AccountAny) -> anyhow::Result<()> {
651 let query = DatabaseQuery::AddAccount(account.clone(), false);
652 self.tx.send(query).map_err(|e| {
653 anyhow::anyhow!("Failed to send query add_account to database message handler: {e}")
654 })
655 }
656
657 fn add_order(&self, order: &OrderAny, client_id: Option<ClientId>) -> anyhow::Result<()> {
658 let query = DatabaseQuery::AddOrder(order.clone(), client_id, false);
659 self.tx.send(query).map_err(|e| {
660 anyhow::anyhow!("Failed to send query add_order to database message handler: {e}")
661 })
662 }
663
664 fn add_order_snapshot(&self, snapshot: &OrderSnapshot) -> anyhow::Result<()> {
665 let query = DatabaseQuery::AddOrderSnapshot(snapshot.to_owned());
666 self.tx.send(query).map_err(|e| {
667 anyhow::anyhow!(
668 "Failed to send query add_order_snapshot to database message handler: {e}"
669 )
670 })
671 }
672
673 fn add_position(&self, position: &Position) -> anyhow::Result<()> {
674 todo!()
675 }
676
677 fn add_position_snapshot(&self, snapshot: &PositionSnapshot) -> anyhow::Result<()> {
678 let query = DatabaseQuery::AddPositionSnapshot(snapshot.to_owned());
679 self.tx.send(query).map_err(|e| {
680 anyhow::anyhow!(
681 "Failed to send query add_position_snapshot to database message handler: {e}"
682 )
683 })
684 }
685
686 fn add_order_book(&self, order_book: &OrderBook) -> anyhow::Result<()> {
687 todo!()
688 }
689
690 fn add_quote(&self, quote: &QuoteTick) -> anyhow::Result<()> {
691 let query = DatabaseQuery::AddQuote(quote.to_owned());
692 self.tx.send(query).map_err(|e| {
693 anyhow::anyhow!("Failed to send query add_quote to database message handler: {e}")
694 })
695 }
696
697 fn load_quotes(&self, instrument_id: &InstrumentId) -> anyhow::Result<Vec<QuoteTick>> {
698 let pool = self.pool.clone();
699 let instrument_id = instrument_id.to_owned();
700 let (tx, rx) = std::sync::mpsc::channel();
701
702 tokio::spawn(async move {
703 let result = DatabaseQueries::load_quotes(&pool, &instrument_id).await;
704 match result {
705 Ok(quotes) => {
706 if let Err(e) = tx.send(quotes) {
707 log::error!("Failed to send quotes for instrument {instrument_id}: {e:?}");
708 }
709 }
710 Err(e) => {
711 log::error!("Failed to load quotes for instrument {instrument_id}: {e:?}");
712 if let Err(e) = tx.send(Vec::new()) {
713 log::error!(
714 "Failed to send empty quotes for instrument {instrument_id}: {e:?}"
715 );
716 }
717 }
718 }
719 });
720 Ok(rx.recv()?)
721 }
722
723 fn add_trade(&self, trade: &TradeTick) -> anyhow::Result<()> {
724 let query = DatabaseQuery::AddTrade(trade.to_owned());
725 self.tx.send(query).map_err(|e| {
726 anyhow::anyhow!("Failed to send query add_trade to database message handler: {e}")
727 })
728 }
729
730 fn load_trades(&self, instrument_id: &InstrumentId) -> anyhow::Result<Vec<TradeTick>> {
731 let pool = self.pool.clone();
732 let instrument_id = instrument_id.to_owned();
733 let (tx, rx) = std::sync::mpsc::channel();
734
735 tokio::spawn(async move {
736 let result = DatabaseQueries::load_trades(&pool, &instrument_id).await;
737 match result {
738 Ok(trades) => {
739 if let Err(e) = tx.send(trades) {
740 log::error!("Failed to send trades for instrument {instrument_id}: {e:?}");
741 }
742 }
743 Err(e) => {
744 log::error!("Failed to load trades for instrument {instrument_id}: {e:?}");
745 if let Err(e) = tx.send(Vec::new()) {
746 log::error!(
747 "Failed to send empty trades for instrument {instrument_id}: {e:?}"
748 );
749 }
750 }
751 }
752 });
753 Ok(rx.recv()?)
754 }
755
756 fn add_funding_rate(&self, _funding_rate: &FundingRateUpdate) -> anyhow::Result<()> {
757 anyhow::bail!("add_funding_rate not implemented for PostgreSQL cache adapter")
758 }
759
760 fn load_funding_rates(
761 &self,
762 _instrument_id: &InstrumentId,
763 ) -> anyhow::Result<Vec<FundingRateUpdate>> {
764 anyhow::bail!("load_funding_rates not implemented for PostgreSQL cache adapter")
765 }
766
767 fn add_bar(&self, bar: &Bar) -> anyhow::Result<()> {
768 let query = DatabaseQuery::AddBar(bar.to_owned());
769 self.tx.send(query).map_err(|e| {
770 anyhow::anyhow!("Failed to send query add_bar to database message handler: {e}")
771 })
772 }
773
774 fn load_bars(&self, instrument_id: &InstrumentId) -> anyhow::Result<Vec<Bar>> {
775 let pool = self.pool.clone();
776 let instrument_id = instrument_id.to_owned();
777 let (tx, rx) = std::sync::mpsc::channel();
778
779 tokio::spawn(async move {
780 let result = DatabaseQueries::load_bars(&pool, &instrument_id).await;
781 match result {
782 Ok(bars) => {
783 if let Err(e) = tx.send(bars) {
784 log::error!("Failed to send bars for instrument {instrument_id}: {e:?}");
785 }
786 }
787 Err(e) => {
788 log::error!("Failed to load bars for instrument {instrument_id}: {e:?}");
789 if let Err(e) = tx.send(Vec::new()) {
790 log::error!(
791 "Failed to send empty bars for instrument {instrument_id}: {e:?}"
792 );
793 }
794 }
795 }
796 });
797 Ok(rx.recv()?)
798 }
799
800 fn add_signal(&self, signal: &Signal) -> anyhow::Result<()> {
801 let query = DatabaseQuery::AddSignal(signal.to_owned());
802 self.tx.send(query).map_err(|e| {
803 anyhow::anyhow!("Failed to send query add_signal to database message handler: {e}")
804 })
805 }
806
807 fn load_signals(&self, name: &str) -> anyhow::Result<Vec<Signal>> {
808 let pool = self.pool.clone();
809 let name = name.to_owned();
810 let (tx, rx) = std::sync::mpsc::channel();
811
812 tokio::spawn(async move {
813 let result = DatabaseQueries::load_signals(&pool, &name).await;
814 match result {
815 Ok(signals) => {
816 if let Err(e) = tx.send(signals) {
817 log::error!("Failed to send signals for '{name}': {e:?}");
818 }
819 }
820 Err(e) => {
821 log::error!("Failed to load signals for '{name}': {e:?}");
822 if let Err(e) = tx.send(Vec::new()) {
823 log::error!("Failed to send empty signals for '{name}': {e:?}");
824 }
825 }
826 }
827 });
828 Ok(rx.recv()?)
829 }
830
831 fn add_custom_data(&self, data: &CustomData) -> anyhow::Result<()> {
832 let query = DatabaseQuery::AddCustom(data.to_owned());
833 self.tx.send(query).map_err(|e| {
834 anyhow::anyhow!("Failed to send query add_signal to database message handler: {e}")
835 })
836 }
837
838 fn load_custom_data(&self, data_type: &DataType) -> anyhow::Result<Vec<CustomData>> {
839 let pool = self.pool.clone();
840 let data_type = data_type.to_owned();
841 let (tx, rx) = std::sync::mpsc::channel();
842
843 tokio::spawn(async move {
844 let result = DatabaseQueries::load_custom_data(&pool, &data_type).await;
845 match result {
846 Ok(signals) => {
847 if let Err(e) = tx.send(signals) {
848 log::error!("Failed to send custom data for '{data_type}': {e:?}");
849 }
850 }
851 Err(e) => {
852 log::error!("Failed to load custom data for '{data_type}': {e:?}");
853 if let Err(e) = tx.send(Vec::new()) {
854 log::error!("Failed to send empty custom data for '{data_type}': {e:?}");
855 }
856 }
857 }
858 });
859 Ok(rx.recv()?)
860 }
861
862 fn load_order_snapshot(
863 &self,
864 client_order_id: &ClientOrderId,
865 ) -> anyhow::Result<Option<OrderSnapshot>> {
866 let pool = self.pool.clone();
867 let client_order_id = client_order_id.to_owned();
868 let (tx, rx) = std::sync::mpsc::channel();
869
870 tokio::spawn(async move {
871 let result = DatabaseQueries::load_order_snapshot(&pool, &client_order_id).await;
872 match result {
873 Ok(snapshot) => {
874 if let Err(e) = tx.send(snapshot) {
875 log::error!("Failed to send order snapshot {client_order_id}: {e:?}");
876 }
877 }
878 Err(e) => {
879 log::error!("Failed to load order snapshot {client_order_id}: {e:?}");
880 if let Err(e) = tx.send(None) {
881 log::error!(
882 "Failed to send None for order snapshot {client_order_id}: {e:?}"
883 );
884 }
885 }
886 }
887 });
888 Ok(rx.recv()?)
889 }
890
891 fn load_position_snapshot(
892 &self,
893 position_id: &PositionId,
894 ) -> anyhow::Result<Option<PositionSnapshot>> {
895 let pool = self.pool.clone();
896 let position_id = position_id.to_owned();
897 let (tx, rx) = std::sync::mpsc::channel();
898
899 tokio::spawn(async move {
900 let result = DatabaseQueries::load_position_snapshot(&pool, &position_id).await;
901 match result {
902 Ok(snapshot) => {
903 if let Err(e) = tx.send(snapshot) {
904 log::error!("Failed to send position snapshot {position_id}: {e:?}");
905 }
906 }
907 Err(e) => {
908 log::error!("Failed to load position snapshot {position_id}: {e:?}");
909 if let Err(e) = tx.send(None) {
910 log::error!(
911 "Failed to send None for position snapshot {position_id}: {e:?}"
912 );
913 }
914 }
915 }
916 });
917 Ok(rx.recv()?)
918 }
919
920 fn index_venue_order_id(
921 &self,
922 client_order_id: ClientOrderId,
923 venue_order_id: VenueOrderId,
924 ) -> anyhow::Result<()> {
925 todo!()
926 }
927
928 fn index_order_position(
929 &self,
930 client_order_id: ClientOrderId,
931 position_id: PositionId,
932 ) -> anyhow::Result<()> {
933 todo!()
934 }
935
936 fn update_actor(&self) -> anyhow::Result<()> {
937 todo!()
938 }
939
940 fn update_strategy(&self) -> anyhow::Result<()> {
941 todo!()
942 }
943
944 fn update_account(&self, account: &AccountAny) -> anyhow::Result<()> {
945 let query = DatabaseQuery::AddAccount(account.clone(), true);
946 self.tx.send(query).map_err(|e| {
947 anyhow::anyhow!("Failed to send query add_account to database message handler: {e}")
948 })
949 }
950
951 fn update_order(&self, event: &OrderEventAny) -> anyhow::Result<()> {
952 let query = DatabaseQuery::UpdateOrder(event.clone());
953 self.tx.send(query).map_err(|e| {
954 anyhow::anyhow!("Failed to send query update_order to database message handler: {e}")
955 })
956 }
957
958 fn update_position(&self, position: &Position) -> anyhow::Result<()> {
959 todo!()
960 }
961
962 fn snapshot_order_state(&self, order: &OrderAny) -> anyhow::Result<()> {
963 todo!()
964 }
965
966 fn snapshot_position_state(&self, position: &Position) -> anyhow::Result<()> {
967 todo!()
968 }
969
970 fn heartbeat(&self, timestamp: UnixNanos) -> anyhow::Result<()> {
971 todo!()
972 }
973}
974
975async fn drain_buffer(pool: &PgPool, buffer: &mut VecDeque<DatabaseQuery>) {
976 for cmd in buffer.drain(..) {
977 let result: anyhow::Result<()> = match cmd {
978 DatabaseQuery::Close => Ok(()),
979 DatabaseQuery::Add(key, value) => DatabaseQueries::add(pool, key, value).await,
980 DatabaseQuery::AddCurrency(currency) => {
981 DatabaseQueries::add_currency(pool, currency).await
982 }
983 DatabaseQuery::AddInstrument(instrument_any) => match instrument_any {
984 InstrumentAny::Betting(instrument) => {
985 DatabaseQueries::add_instrument(pool, "BETTING", Box::new(instrument)).await
986 }
987 InstrumentAny::BinaryOption(instrument) => {
988 DatabaseQueries::add_instrument(pool, "BINARY_OPTION", Box::new(instrument))
989 .await
990 }
991 InstrumentAny::CryptoFuture(instrument) => {
992 DatabaseQueries::add_instrument(pool, "CRYPTO_FUTURE", Box::new(instrument))
993 .await
994 }
995 InstrumentAny::CryptoOption(instrument) => {
996 DatabaseQueries::add_instrument(pool, "CRYPTO_OPTION", Box::new(instrument))
997 .await
998 }
999 InstrumentAny::CryptoPerpetual(instrument) => {
1000 DatabaseQueries::add_instrument(pool, "CRYPTO_PERPETUAL", Box::new(instrument))
1001 .await
1002 }
1003 InstrumentAny::CurrencyPair(instrument) => {
1004 DatabaseQueries::add_instrument(pool, "CURRENCY_PAIR", Box::new(instrument))
1005 .await
1006 }
1007 InstrumentAny::Equity(equity) => {
1008 DatabaseQueries::add_instrument(pool, "EQUITY", Box::new(equity)).await
1009 }
1010 InstrumentAny::FuturesContract(instrument) => {
1011 DatabaseQueries::add_instrument(pool, "FUTURES_CONTRACT", Box::new(instrument))
1012 .await
1013 }
1014 InstrumentAny::FuturesSpread(instrument) => {
1015 DatabaseQueries::add_instrument(pool, "FUTURES_SPREAD", Box::new(instrument))
1016 .await
1017 }
1018 InstrumentAny::OptionContract(instrument) => {
1019 DatabaseQueries::add_instrument(pool, "OPTION_CONTRACT", Box::new(instrument))
1020 .await
1021 }
1022 InstrumentAny::Commodity(instrument) => {
1023 DatabaseQueries::add_instrument(pool, "COMMODITY", Box::new(instrument)).await
1024 }
1025 InstrumentAny::IndexInstrument(instrument) => {
1026 DatabaseQueries::add_instrument(pool, "INDEX_INSTRUMENT", Box::new(instrument))
1027 .await
1028 }
1029 InstrumentAny::Cfd(instrument) => {
1030 DatabaseQueries::add_instrument(pool, "CFD", Box::new(instrument)).await
1031 }
1032 InstrumentAny::OptionSpread(instrument) => {
1033 DatabaseQueries::add_instrument(pool, "OPTION_SPREAD", Box::new(instrument))
1034 .await
1035 }
1036 InstrumentAny::PerpetualContract(instrument) => {
1037 DatabaseQueries::add_instrument(
1038 pool,
1039 "PERPETUAL_CONTRACT",
1040 Box::new(instrument),
1041 )
1042 .await
1043 }
1044 InstrumentAny::TokenizedAsset(instrument) => {
1045 DatabaseQueries::add_instrument(pool, "TOKENIZED_ASSET", Box::new(instrument))
1046 .await
1047 }
1048 },
1049 DatabaseQuery::AddOrder(order_any, client_id, updated) => match order_any {
1050 OrderAny::Limit(order) => {
1051 DatabaseQueries::add_order(pool, "LIMIT", updated, Box::new(order), client_id)
1052 .await
1053 }
1054 OrderAny::LimitIfTouched(order) => {
1055 DatabaseQueries::add_order(
1056 pool,
1057 "LIMIT_IF_TOUCHED",
1058 updated,
1059 Box::new(order),
1060 client_id,
1061 )
1062 .await
1063 }
1064 OrderAny::Market(order) => {
1065 DatabaseQueries::add_order(pool, "MARKET", updated, Box::new(order), client_id)
1066 .await
1067 }
1068 OrderAny::MarketIfTouched(order) => {
1069 DatabaseQueries::add_order(
1070 pool,
1071 "MARKET_IF_TOUCHED",
1072 updated,
1073 Box::new(order),
1074 client_id,
1075 )
1076 .await
1077 }
1078 OrderAny::MarketToLimit(order) => {
1079 DatabaseQueries::add_order(
1080 pool,
1081 "MARKET_TO_LIMIT",
1082 updated,
1083 Box::new(order),
1084 client_id,
1085 )
1086 .await
1087 }
1088 OrderAny::StopLimit(order) => {
1089 DatabaseQueries::add_order(
1090 pool,
1091 "STOP_LIMIT",
1092 updated,
1093 Box::new(order),
1094 client_id,
1095 )
1096 .await
1097 }
1098 OrderAny::StopMarket(order) => {
1099 DatabaseQueries::add_order(
1100 pool,
1101 "STOP_MARKET",
1102 updated,
1103 Box::new(order),
1104 client_id,
1105 )
1106 .await
1107 }
1108 OrderAny::TrailingStopLimit(order) => {
1109 DatabaseQueries::add_order(
1110 pool,
1111 "TRAILING_STOP_LIMIT",
1112 updated,
1113 Box::new(order),
1114 client_id,
1115 )
1116 .await
1117 }
1118 OrderAny::TrailingStopMarket(order) => {
1119 DatabaseQueries::add_order(
1120 pool,
1121 "TRAILING_STOP_MARKET",
1122 updated,
1123 Box::new(order),
1124 client_id,
1125 )
1126 .await
1127 }
1128 },
1129 DatabaseQuery::AddOrderSnapshot(snapshot) => {
1130 DatabaseQueries::add_order_snapshot(pool, snapshot).await
1131 }
1132 DatabaseQuery::AddPositionSnapshot(snapshot) => {
1133 DatabaseQueries::add_position_snapshot(pool, snapshot).await
1134 }
1135 DatabaseQuery::AddAccount(account_any, updated) => match account_any {
1136 AccountAny::Margin(account) => {
1137 DatabaseQueries::add_account(pool, "MARGIN", updated, Box::new(account)).await
1138 }
1139 AccountAny::Cash(account) => {
1140 DatabaseQueries::add_account(pool, "CASH", updated, Box::new(account)).await
1141 }
1142 AccountAny::Betting(account) => {
1143 DatabaseQueries::add_account(pool, "BETTING", updated, Box::new(account)).await
1144 }
1145 },
1146 DatabaseQuery::AddSignal(signal) => DatabaseQueries::add_signal(pool, &signal).await,
1147 DatabaseQuery::AddCustom(data) => DatabaseQueries::add_custom_data(pool, &data).await,
1148 DatabaseQuery::AddQuote(quote) => DatabaseQueries::add_quote(pool, "e).await,
1149 DatabaseQuery::AddTrade(trade) => DatabaseQueries::add_trade(pool, &trade).await,
1150 DatabaseQuery::AddBar(bar) => DatabaseQueries::add_bar(pool, &bar).await,
1151 DatabaseQuery::UpdateOrder(event) => {
1152 DatabaseQueries::add_order_event(pool, event.into_boxed(), None).await
1153 }
1154 };
1155
1156 if let Err(e) = result {
1157 log::error!("Error on query: {e:?}");
1158 }
1159 }
1160}