Skip to main content

nautilus_infrastructure/sql/
cache.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{collections::VecDeque, ops::ControlFlow, pin::Pin, time::Duration};
17
18use ahash::AHashMap;
19use bytes::Bytes;
20use nautilus_common::{
21    cache::database::{CacheDatabaseAdapter, CacheMap},
22    live::get_runtime,
23    logging::{log_task_awaiting, log_task_started, log_task_stopped},
24    signal::Signal,
25};
26use nautilus_core::UnixNanos;
27use nautilus_model::{
28    accounts::AccountAny,
29    data::{Bar, CustomData, DataType, FundingRateUpdate, QuoteTick, TradeTick},
30    events::{OrderEventAny, OrderSnapshot, position::snapshot::PositionSnapshot},
31    identifiers::{
32        AccountId, ClientId, ClientOrderId, ComponentId, InstrumentId, PositionId, StrategyId,
33        VenueOrderId,
34    },
35    instruments::{Instrument, InstrumentAny, SyntheticInstrument},
36    orderbook::OrderBook,
37    orders::{Order, OrderAny},
38    position::Position,
39    types::Currency,
40};
41use sqlx::{PgPool, postgres::PgConnectOptions};
42use tokio::{time::Instant, try_join};
43use ustr::Ustr;
44
45use crate::sql::{
46    pg::{connect_pg, get_postgres_connect_options},
47    queries::DatabaseQueries,
48};
49
// Task and connection names

/// Label passed to the task logging helpers for the background command-processing task.
const CACHE_PROCESS: &str = "cache-process";
52
53#[derive(Debug)]
54#[cfg_attr(
55    feature = "python",
56    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.infrastructure")
57)]
58pub struct PostgresCacheDatabase {
59    pub pool: PgPool,
60    tx: tokio::sync::mpsc::UnboundedSender<DatabaseQuery>,
61    handle: tokio::task::JoinHandle<()>,
62}
63
64#[allow(
65    clippy::large_enum_variant,
66    reason = "variant sizes vary with feature unification; allow stays silent when the lint does not fire"
67)]
68#[derive(Debug, Clone)]
69pub enum DatabaseQuery {
70    Close,
71    Add(String, Vec<u8>),
72    AddCurrency(Currency),
73    AddInstrument(InstrumentAny),
74    AddOrder(OrderAny, Option<ClientId>, bool),
75    AddOrderSnapshot(OrderSnapshot),
76    AddPositionSnapshot(PositionSnapshot),
77    AddAccount(AccountAny, bool),
78    AddSignal(Signal),
79    AddCustom(CustomData),
80    AddQuote(QuoteTick),
81    AddTrade(TradeTick),
82    AddBar(Bar),
83    UpdateOrder(OrderEventAny),
84}
85
86impl PostgresCacheDatabase {
87    /// Connects to the Postgres cache database using the provided connection parameters.
88    ///
89    /// # Errors
90    ///
91    /// Returns an error if establishing the database connection fails.
92    ///
93    /// # Panics
94    ///
95    /// Panics if the internal Postgres pool connection attempt (`connect_pg`) unwraps on error.
96    pub async fn connect(
97        host: Option<String>,
98        port: Option<u16>,
99        username: Option<String>,
100        password: Option<String>,
101        database: Option<String>,
102    ) -> Result<Self, sqlx::Error> {
103        let pg_connect_options =
104            get_postgres_connect_options(host, port, username, password, database);
105        let pool = connect_pg(pg_connect_options.clone().into()).await.unwrap();
106        let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<DatabaseQuery>();
107
108        // Spawn a task to handle messages
109        let handle = tokio::spawn(async move {
110            Self::process_commands(rx, pg_connect_options.clone().into()).await;
111        });
112        Ok(Self { pool, tx, handle })
113    }
114
115    async fn process_commands(
116        mut rx: tokio::sync::mpsc::UnboundedReceiver<DatabaseQuery>,
117        pg_connect_options: PgConnectOptions,
118    ) {
119        log_task_started(CACHE_PROCESS);
120
121        let pool = connect_pg(pg_connect_options).await.unwrap();
122
123        // Buffering
124        let mut buffer: VecDeque<DatabaseQuery> = VecDeque::new();
125
126        // TODO: expose this via configuration once tests are fixed
127        let buffer_interval = Duration::from_millis(0);
128
129        // A sleep used to trigger periodic flushing of the buffer.
130        // When `buffer_interval` is zero we skip using the timer and flush immediately
131        // after every message.
132        let flush_timer = tokio::time::sleep(buffer_interval);
133        tokio::pin!(flush_timer);
134
135        // Continue to receive and handle messages until channel is hung up
136        loop {
137            tokio::select! {
138                maybe_msg = rx.recv() => {
139                    let result = handle_query(
140                        maybe_msg,
141                        &mut buffer,
142                        buffer_interval,
143                        &pool,
144                    ).await;
145
146                    if result.is_break() {
147                        break;
148                    }
149                }
150                () = &mut flush_timer, if !buffer_interval.is_zero() => {
151                    flush_buffer(&mut buffer, &pool, &mut flush_timer, buffer_interval).await;
152                }
153            }
154        }
155
156        if !buffer.is_empty() {
157            drain_buffer(&pool, &mut buffer).await;
158        }
159
160        log_task_stopped(CACHE_PROCESS);
161    }
162}
163
164async fn handle_query(
165    maybe_msg: Option<DatabaseQuery>,
166    buffer: &mut VecDeque<DatabaseQuery>,
167    buffer_interval: Duration,
168    pool: &PgPool,
169) -> ControlFlow<()> {
170    let Some(msg) = maybe_msg else {
171        log::debug!("Command channel closed");
172        return ControlFlow::Break(());
173    };
174
175    log::debug!("Received {msg:?}");
176
177    if matches!(msg, DatabaseQuery::Close) {
178        if !buffer.is_empty() {
179            drain_buffer(pool, buffer).await;
180        }
181        return ControlFlow::Break(());
182    }
183
184    buffer.push_back(msg);
185
186    if buffer_interval.is_zero() {
187        drain_buffer(pool, buffer).await;
188    }
189
190    ControlFlow::Continue(())
191}
192
193async fn flush_buffer(
194    buffer: &mut VecDeque<DatabaseQuery>,
195    pool: &PgPool,
196    flush_timer: &mut Pin<&mut tokio::time::Sleep>,
197    buffer_interval: Duration,
198) {
199    if !buffer.is_empty() {
200        drain_buffer(pool, buffer).await;
201    }
202    flush_timer.as_mut().reset(Instant::now() + buffer_interval);
203}
204
205/// Retrieves a `PostgresCacheDatabase` using default connection options.
206///
207/// # Errors
208///
209/// Returns an error if connecting to the database or initializing the cache adapter fails.
210pub async fn get_pg_cache_database() -> anyhow::Result<PostgresCacheDatabase> {
211    let connect_options = get_postgres_connect_options(None, None, None, None, None);
212    Ok(PostgresCacheDatabase::connect(
213        Some(connect_options.host),
214        Some(connect_options.port),
215        Some(connect_options.username),
216        Some(connect_options.password),
217        Some(connect_options.database),
218    )
219    .await?)
220}
221
222#[allow(dead_code)]
223#[allow(unused)]
224#[async_trait::async_trait]
225impl CacheDatabaseAdapter for PostgresCacheDatabase {
226    fn close(&mut self) -> anyhow::Result<()> {
227        let pool = self.pool.clone();
228        let (tx, rx) = std::sync::mpsc::channel();
229
230        log::debug!("Closing connection pool");
231
232        tokio::task::block_in_place(|| {
233            get_runtime().block_on(async {
234                pool.close().await;
235
236                if let Err(e) = tx.send(()) {
237                    log::error!("Error closing pool: {e:?}");
238                }
239            });
240        });
241
242        // Cancel message handling task
243        if let Err(e) = self.tx.send(DatabaseQuery::Close) {
244            log::error!("Error sending close: {e:?}");
245        }
246
247        log_task_awaiting("cache-write");
248
249        tokio::task::block_in_place(|| {
250            if let Err(e) = get_runtime().block_on(&mut self.handle) {
251                log::error!("Error awaiting task 'cache-write': {e:?}");
252            }
253        });
254
255        log::debug!("Closed");
256
257        Ok(rx.recv()?)
258    }
259
260    fn flush(&mut self) -> anyhow::Result<()> {
261        let pool = self.pool.clone();
262        let (tx, rx) = std::sync::mpsc::channel();
263
264        tokio::task::block_in_place(|| {
265            get_runtime().block_on(async {
266                if let Err(e) = DatabaseQueries::truncate(&pool).await {
267                    log::error!("Error flushing pool: {e:?}");
268                }
269
270                if let Err(e) = tx.send(()) {
271                    log::error!("Error sending flush result: {e:?}");
272                }
273            });
274        });
275
276        Ok(rx.recv()?)
277    }
278
279    async fn load_all(&self) -> anyhow::Result<CacheMap> {
280        let (currencies, instruments, synthetics, accounts, orders, positions) = try_join!(
281            self.load_currencies(),
282            self.load_instruments(),
283            self.load_synthetics(),
284            self.load_accounts(),
285            self.load_orders(),
286            self.load_positions()
287        )
288        .map_err(|e| anyhow::anyhow!("Error loading cache data: {e}"))?;
289
290        // For now, we don't load greeks and yield curves from the database
291        // This will be implemented in the future
292        let greeks = AHashMap::new();
293        let yield_curves = AHashMap::new();
294
295        Ok(CacheMap {
296            currencies,
297            instruments,
298            synthetics,
299            accounts,
300            orders,
301            positions,
302            greeks,
303            yield_curves,
304        })
305    }
306
307    fn load(&self) -> anyhow::Result<AHashMap<String, Bytes>> {
308        let pool = self.pool.clone();
309        let (tx, rx) = std::sync::mpsc::channel();
310
311        tokio::spawn(async move {
312            let result = DatabaseQueries::load(&pool).await;
313            match result {
314                Ok(items) => {
315                    let mapping = items
316                        .into_iter()
317                        .map(|(k, v)| (k, Bytes::from(v)))
318                        .collect();
319
320                    if let Err(e) = tx.send(mapping) {
321                        log::error!("Failed to send general items: {e:?}");
322                    }
323                }
324                Err(e) => {
325                    log::error!("Failed to load general items: {e:?}");
326                    if let Err(e) = tx.send(AHashMap::new()) {
327                        log::error!("Failed to send empty general items: {e:?}");
328                    }
329                }
330            }
331        });
332        Ok(rx.recv()?)
333    }
334
335    async fn load_currencies(&self) -> anyhow::Result<AHashMap<Ustr, Currency>> {
336        let pool = self.pool.clone();
337        let (tx, rx) = std::sync::mpsc::channel();
338
339        tokio::spawn(async move {
340            let result = DatabaseQueries::load_currencies(&pool).await;
341            match result {
342                Ok(currencies) => {
343                    let mapping = currencies
344                        .into_iter()
345                        .map(|currency| (currency.code, currency))
346                        .collect();
347
348                    if let Err(e) = tx.send(mapping) {
349                        log::error!("Failed to send currencies: {e:?}");
350                    }
351                }
352                Err(e) => {
353                    log::error!("Failed to load currencies: {e:?}");
354                    if let Err(e) = tx.send(AHashMap::new()) {
355                        log::error!("Failed to send empty currencies: {e:?}");
356                    }
357                }
358            }
359        });
360        Ok(rx.recv()?)
361    }
362
363    async fn load_instruments(&self) -> anyhow::Result<AHashMap<InstrumentId, InstrumentAny>> {
364        let pool = self.pool.clone();
365        let (tx, rx) = std::sync::mpsc::channel();
366
367        tokio::spawn(async move {
368            let result = DatabaseQueries::load_instruments(&pool).await;
369            match result {
370                Ok(instruments) => {
371                    let mapping = instruments
372                        .into_iter()
373                        .map(|instrument| (instrument.id(), instrument))
374                        .collect();
375
376                    if let Err(e) = tx.send(mapping) {
377                        log::error!("Failed to send instruments: {e:?}");
378                    }
379                }
380                Err(e) => {
381                    log::error!("Failed to load instruments: {e:?}");
382                    if let Err(e) = tx.send(AHashMap::new()) {
383                        log::error!("Failed to send empty instruments: {e:?}");
384                    }
385                }
386            }
387        });
388        Ok(rx.recv()?)
389    }
390
391    async fn load_synthetics(&self) -> anyhow::Result<AHashMap<InstrumentId, SyntheticInstrument>> {
392        todo!()
393    }
394
395    async fn load_accounts(&self) -> anyhow::Result<AHashMap<AccountId, AccountAny>> {
396        let pool = self.pool.clone();
397        let (tx, rx) = std::sync::mpsc::channel();
398
399        tokio::spawn(async move {
400            let result = DatabaseQueries::load_accounts(&pool).await;
401            match result {
402                Ok(accounts) => {
403                    let mapping = accounts
404                        .into_iter()
405                        .map(|account| (account.id(), account))
406                        .collect();
407
408                    if let Err(e) = tx.send(mapping) {
409                        log::error!("Failed to send accounts: {e:?}");
410                    }
411                }
412                Err(e) => {
413                    log::error!("Failed to load accounts: {e:?}");
414                    if let Err(e) = tx.send(AHashMap::new()) {
415                        log::error!("Failed to send empty accounts: {e:?}");
416                    }
417                }
418            }
419        });
420        Ok(rx.recv()?)
421    }
422
423    async fn load_orders(&self) -> anyhow::Result<AHashMap<ClientOrderId, OrderAny>> {
424        let pool = self.pool.clone();
425        let (tx, rx) = std::sync::mpsc::channel();
426
427        tokio::spawn(async move {
428            let result = DatabaseQueries::load_orders(&pool).await;
429            match result {
430                Ok(orders) => {
431                    let mapping = orders
432                        .into_iter()
433                        .map(|order| (order.client_order_id(), order))
434                        .collect();
435
436                    if let Err(e) = tx.send(mapping) {
437                        log::error!("Failed to send orders: {e:?}");
438                    }
439                }
440                Err(e) => {
441                    log::error!("Failed to load orders: {e:?}");
442                    if let Err(e) = tx.send(AHashMap::new()) {
443                        log::error!("Failed to send empty orders: {e:?}");
444                    }
445                }
446            }
447        });
448        Ok(rx.recv()?)
449    }
450
451    async fn load_positions(&self) -> anyhow::Result<AHashMap<PositionId, Position>> {
452        todo!()
453    }
454
455    fn load_index_order_position(&self) -> anyhow::Result<AHashMap<ClientOrderId, Position>> {
456        todo!()
457    }
458
459    fn load_index_order_client(&self) -> anyhow::Result<AHashMap<ClientOrderId, ClientId>> {
460        let pool = self.pool.clone();
461        let (tx, rx) = std::sync::mpsc::channel();
462
463        tokio::spawn(async move {
464            let result = DatabaseQueries::load_distinct_order_event_client_ids(&pool).await;
465            match result {
466                Ok(currency) => {
467                    if let Err(e) = tx.send(currency) {
468                        log::error!("Failed to send load_index_order_client result: {e:?}");
469                    }
470                }
471                Err(e) => {
472                    log::error!("Failed to run query load_distinct_order_event_client_ids: {e:?}");
473                    if let Err(e) = tx.send(AHashMap::new()) {
474                        log::error!("Failed to send empty load_index_order_client result: {e:?}");
475                    }
476                }
477            }
478        });
479        Ok(rx.recv()?)
480    }
481
482    async fn load_currency(&self, code: &Ustr) -> anyhow::Result<Option<Currency>> {
483        let pool = self.pool.clone();
484        let code = code.to_owned(); // Clone the code
485        let (tx, rx) = std::sync::mpsc::channel();
486
487        tokio::spawn(async move {
488            let result = DatabaseQueries::load_currency(&pool, &code).await;
489            match result {
490                Ok(currency) => {
491                    if let Err(e) = tx.send(currency) {
492                        log::error!("Failed to send currency {code}: {e:?}");
493                    }
494                }
495                Err(e) => {
496                    log::error!("Failed to load currency {code}: {e:?}");
497                    if let Err(e) = tx.send(None) {
498                        log::error!("Failed to send None for currency {code}: {e:?}");
499                    }
500                }
501            }
502        });
503        Ok(rx.recv()?)
504    }
505
506    async fn load_instrument(
507        &self,
508        instrument_id: &InstrumentId,
509    ) -> anyhow::Result<Option<InstrumentAny>> {
510        let pool = self.pool.clone();
511        let instrument_id = instrument_id.to_owned(); // Clone the instrument_id
512        let (tx, rx) = std::sync::mpsc::channel();
513
514        tokio::spawn(async move {
515            let result = DatabaseQueries::load_instrument(&pool, &instrument_id).await;
516            match result {
517                Ok(instrument) => {
518                    if let Err(e) = tx.send(instrument) {
519                        log::error!("Failed to send instrument {instrument_id}: {e:?}");
520                    }
521                }
522                Err(e) => {
523                    log::error!("Failed to load instrument {instrument_id}: {e:?}");
524                    if let Err(e) = tx.send(None) {
525                        log::error!("Failed to send None for instrument {instrument_id}: {e:?}");
526                    }
527                }
528            }
529        });
530        Ok(rx.recv()?)
531    }
532
533    async fn load_synthetic(
534        &self,
535        instrument_id: &InstrumentId,
536    ) -> anyhow::Result<Option<SyntheticInstrument>> {
537        todo!()
538    }
539
540    async fn load_account(&self, account_id: &AccountId) -> anyhow::Result<Option<AccountAny>> {
541        let pool = self.pool.clone();
542        let account_id = account_id.to_owned();
543        let (tx, rx) = std::sync::mpsc::channel();
544
545        tokio::spawn(async move {
546            let result = DatabaseQueries::load_account(&pool, &account_id).await;
547            match result {
548                Ok(account) => {
549                    if let Err(e) = tx.send(account) {
550                        log::error!("Failed to send account {account_id}: {e:?}");
551                    }
552                }
553                Err(e) => {
554                    log::error!("Failed to load account {account_id}: {e:?}");
555                    if let Err(e) = tx.send(None) {
556                        log::error!("Failed to send None for account {account_id}: {e:?}");
557                    }
558                }
559            }
560        });
561        Ok(rx.recv()?)
562    }
563
564    async fn load_order(
565        &self,
566        client_order_id: &ClientOrderId,
567    ) -> anyhow::Result<Option<OrderAny>> {
568        let pool = self.pool.clone();
569        let client_order_id = client_order_id.to_owned();
570        let (tx, rx) = std::sync::mpsc::channel();
571
572        tokio::spawn(async move {
573            let result = DatabaseQueries::load_order(&pool, &client_order_id).await;
574            match result {
575                Ok(order) => {
576                    if let Err(e) = tx.send(order) {
577                        log::error!("Failed to send order {client_order_id}: {e:?}");
578                    }
579                }
580                Err(e) => {
581                    log::error!("Failed to load order {client_order_id}: {e:?}");
582                    let _ = tx.send(None);
583                }
584            }
585        });
586        Ok(rx.recv()?)
587    }
588
589    async fn load_position(&self, position_id: &PositionId) -> anyhow::Result<Option<Position>> {
590        todo!()
591    }
592
593    fn load_actor(&self, component_id: &ComponentId) -> anyhow::Result<AHashMap<String, Bytes>> {
594        todo!()
595    }
596
597    fn delete_actor(&self, component_id: &ComponentId) -> anyhow::Result<()> {
598        todo!()
599    }
600
601    fn load_strategy(&self, strategy_id: &StrategyId) -> anyhow::Result<AHashMap<String, Bytes>> {
602        todo!()
603    }
604
605    fn delete_strategy(&self, component_id: &StrategyId) -> anyhow::Result<()> {
606        todo!()
607    }
608
609    fn delete_order(&self, client_order_id: &ClientOrderId) -> anyhow::Result<()> {
610        anyhow::bail!(
611            "delete_order not implemented for PostgreSQL cache adapter: {client_order_id}"
612        )
613    }
614
615    fn delete_position(&self, position_id: &PositionId) -> anyhow::Result<()> {
616        anyhow::bail!("delete_position not implemented for PostgreSQL cache adapter: {position_id}")
617    }
618
619    fn delete_account_event(&self, account_id: &AccountId, event_id: &str) -> anyhow::Result<()> {
620        anyhow::bail!(
621            "delete_account_event not implemented for PostgreSQL cache adapter: {account_id}, {event_id}"
622        )
623    }
624
625    fn add(&self, key: String, value: Bytes) -> anyhow::Result<()> {
626        let query = DatabaseQuery::Add(key, value.into());
627        self.tx
628            .send(query)
629            .map_err(|e| anyhow::anyhow!("Failed to send query to database message handler: {e}"))
630    }
631
632    fn add_currency(&self, currency: &Currency) -> anyhow::Result<()> {
633        let query = DatabaseQuery::AddCurrency(*currency);
634        self.tx.send(query).map_err(|e| {
635            anyhow::anyhow!("Failed to query add_currency to database message handler: {e}")
636        })
637    }
638
639    fn add_instrument(&self, instrument: &InstrumentAny) -> anyhow::Result<()> {
640        let query = DatabaseQuery::AddInstrument(instrument.clone());
641        self.tx.send(query).map_err(|e| {
642            anyhow::anyhow!("Failed to send query add_instrument to database message handler: {e}")
643        })
644    }
645
646    fn add_synthetic(&self, synthetic: &SyntheticInstrument) -> anyhow::Result<()> {
647        todo!()
648    }
649
650    fn add_account(&self, account: &AccountAny) -> anyhow::Result<()> {
651        let query = DatabaseQuery::AddAccount(account.clone(), false);
652        self.tx.send(query).map_err(|e| {
653            anyhow::anyhow!("Failed to send query add_account to database message handler: {e}")
654        })
655    }
656
657    fn add_order(&self, order: &OrderAny, client_id: Option<ClientId>) -> anyhow::Result<()> {
658        let query = DatabaseQuery::AddOrder(order.clone(), client_id, false);
659        self.tx.send(query).map_err(|e| {
660            anyhow::anyhow!("Failed to send query add_order to database message handler: {e}")
661        })
662    }
663
664    fn add_order_snapshot(&self, snapshot: &OrderSnapshot) -> anyhow::Result<()> {
665        let query = DatabaseQuery::AddOrderSnapshot(snapshot.to_owned());
666        self.tx.send(query).map_err(|e| {
667            anyhow::anyhow!(
668                "Failed to send query add_order_snapshot to database message handler: {e}"
669            )
670        })
671    }
672
673    fn add_position(&self, position: &Position) -> anyhow::Result<()> {
674        todo!()
675    }
676
677    fn add_position_snapshot(&self, snapshot: &PositionSnapshot) -> anyhow::Result<()> {
678        let query = DatabaseQuery::AddPositionSnapshot(snapshot.to_owned());
679        self.tx.send(query).map_err(|e| {
680            anyhow::anyhow!(
681                "Failed to send query add_position_snapshot to database message handler: {e}"
682            )
683        })
684    }
685
686    fn add_order_book(&self, order_book: &OrderBook) -> anyhow::Result<()> {
687        todo!()
688    }
689
690    fn add_quote(&self, quote: &QuoteTick) -> anyhow::Result<()> {
691        let query = DatabaseQuery::AddQuote(quote.to_owned());
692        self.tx.send(query).map_err(|e| {
693            anyhow::anyhow!("Failed to send query add_quote to database message handler: {e}")
694        })
695    }
696
697    fn load_quotes(&self, instrument_id: &InstrumentId) -> anyhow::Result<Vec<QuoteTick>> {
698        let pool = self.pool.clone();
699        let instrument_id = instrument_id.to_owned();
700        let (tx, rx) = std::sync::mpsc::channel();
701
702        tokio::spawn(async move {
703            let result = DatabaseQueries::load_quotes(&pool, &instrument_id).await;
704            match result {
705                Ok(quotes) => {
706                    if let Err(e) = tx.send(quotes) {
707                        log::error!("Failed to send quotes for instrument {instrument_id}: {e:?}");
708                    }
709                }
710                Err(e) => {
711                    log::error!("Failed to load quotes for instrument {instrument_id}: {e:?}");
712                    if let Err(e) = tx.send(Vec::new()) {
713                        log::error!(
714                            "Failed to send empty quotes for instrument {instrument_id}: {e:?}"
715                        );
716                    }
717                }
718            }
719        });
720        Ok(rx.recv()?)
721    }
722
723    fn add_trade(&self, trade: &TradeTick) -> anyhow::Result<()> {
724        let query = DatabaseQuery::AddTrade(trade.to_owned());
725        self.tx.send(query).map_err(|e| {
726            anyhow::anyhow!("Failed to send query add_trade to database message handler: {e}")
727        })
728    }
729
730    fn load_trades(&self, instrument_id: &InstrumentId) -> anyhow::Result<Vec<TradeTick>> {
731        let pool = self.pool.clone();
732        let instrument_id = instrument_id.to_owned();
733        let (tx, rx) = std::sync::mpsc::channel();
734
735        tokio::spawn(async move {
736            let result = DatabaseQueries::load_trades(&pool, &instrument_id).await;
737            match result {
738                Ok(trades) => {
739                    if let Err(e) = tx.send(trades) {
740                        log::error!("Failed to send trades for instrument {instrument_id}: {e:?}");
741                    }
742                }
743                Err(e) => {
744                    log::error!("Failed to load trades for instrument {instrument_id}: {e:?}");
745                    if let Err(e) = tx.send(Vec::new()) {
746                        log::error!(
747                            "Failed to send empty trades for instrument {instrument_id}: {e:?}"
748                        );
749                    }
750                }
751            }
752        });
753        Ok(rx.recv()?)
754    }
755
756    fn add_funding_rate(&self, _funding_rate: &FundingRateUpdate) -> anyhow::Result<()> {
757        anyhow::bail!("add_funding_rate not implemented for PostgreSQL cache adapter")
758    }
759
760    fn load_funding_rates(
761        &self,
762        _instrument_id: &InstrumentId,
763    ) -> anyhow::Result<Vec<FundingRateUpdate>> {
764        anyhow::bail!("load_funding_rates not implemented for PostgreSQL cache adapter")
765    }
766
767    fn add_bar(&self, bar: &Bar) -> anyhow::Result<()> {
768        let query = DatabaseQuery::AddBar(bar.to_owned());
769        self.tx.send(query).map_err(|e| {
770            anyhow::anyhow!("Failed to send query add_bar to database message handler: {e}")
771        })
772    }
773
774    fn load_bars(&self, instrument_id: &InstrumentId) -> anyhow::Result<Vec<Bar>> {
775        let pool = self.pool.clone();
776        let instrument_id = instrument_id.to_owned();
777        let (tx, rx) = std::sync::mpsc::channel();
778
779        tokio::spawn(async move {
780            let result = DatabaseQueries::load_bars(&pool, &instrument_id).await;
781            match result {
782                Ok(bars) => {
783                    if let Err(e) = tx.send(bars) {
784                        log::error!("Failed to send bars for instrument {instrument_id}: {e:?}");
785                    }
786                }
787                Err(e) => {
788                    log::error!("Failed to load bars for instrument {instrument_id}: {e:?}");
789                    if let Err(e) = tx.send(Vec::new()) {
790                        log::error!(
791                            "Failed to send empty bars for instrument {instrument_id}: {e:?}"
792                        );
793                    }
794                }
795            }
796        });
797        Ok(rx.recv()?)
798    }
799
800    fn add_signal(&self, signal: &Signal) -> anyhow::Result<()> {
801        let query = DatabaseQuery::AddSignal(signal.to_owned());
802        self.tx.send(query).map_err(|e| {
803            anyhow::anyhow!("Failed to send query add_signal to database message handler: {e}")
804        })
805    }
806
807    fn load_signals(&self, name: &str) -> anyhow::Result<Vec<Signal>> {
808        let pool = self.pool.clone();
809        let name = name.to_owned();
810        let (tx, rx) = std::sync::mpsc::channel();
811
812        tokio::spawn(async move {
813            let result = DatabaseQueries::load_signals(&pool, &name).await;
814            match result {
815                Ok(signals) => {
816                    if let Err(e) = tx.send(signals) {
817                        log::error!("Failed to send signals for '{name}': {e:?}");
818                    }
819                }
820                Err(e) => {
821                    log::error!("Failed to load signals for '{name}': {e:?}");
822                    if let Err(e) = tx.send(Vec::new()) {
823                        log::error!("Failed to send empty signals for '{name}': {e:?}");
824                    }
825                }
826            }
827        });
828        Ok(rx.recv()?)
829    }
830
831    fn add_custom_data(&self, data: &CustomData) -> anyhow::Result<()> {
832        let query = DatabaseQuery::AddCustom(data.to_owned());
833        self.tx.send(query).map_err(|e| {
834            anyhow::anyhow!("Failed to send query add_signal to database message handler: {e}")
835        })
836    }
837
838    fn load_custom_data(&self, data_type: &DataType) -> anyhow::Result<Vec<CustomData>> {
839        let pool = self.pool.clone();
840        let data_type = data_type.to_owned();
841        let (tx, rx) = std::sync::mpsc::channel();
842
843        tokio::spawn(async move {
844            let result = DatabaseQueries::load_custom_data(&pool, &data_type).await;
845            match result {
846                Ok(signals) => {
847                    if let Err(e) = tx.send(signals) {
848                        log::error!("Failed to send custom data for '{data_type}': {e:?}");
849                    }
850                }
851                Err(e) => {
852                    log::error!("Failed to load custom data for '{data_type}': {e:?}");
853                    if let Err(e) = tx.send(Vec::new()) {
854                        log::error!("Failed to send empty custom data for '{data_type}': {e:?}");
855                    }
856                }
857            }
858        });
859        Ok(rx.recv()?)
860    }
861
862    fn load_order_snapshot(
863        &self,
864        client_order_id: &ClientOrderId,
865    ) -> anyhow::Result<Option<OrderSnapshot>> {
866        let pool = self.pool.clone();
867        let client_order_id = client_order_id.to_owned();
868        let (tx, rx) = std::sync::mpsc::channel();
869
870        tokio::spawn(async move {
871            let result = DatabaseQueries::load_order_snapshot(&pool, &client_order_id).await;
872            match result {
873                Ok(snapshot) => {
874                    if let Err(e) = tx.send(snapshot) {
875                        log::error!("Failed to send order snapshot {client_order_id}: {e:?}");
876                    }
877                }
878                Err(e) => {
879                    log::error!("Failed to load order snapshot {client_order_id}: {e:?}");
880                    if let Err(e) = tx.send(None) {
881                        log::error!(
882                            "Failed to send None for order snapshot {client_order_id}: {e:?}"
883                        );
884                    }
885                }
886            }
887        });
888        Ok(rx.recv()?)
889    }
890
891    fn load_position_snapshot(
892        &self,
893        position_id: &PositionId,
894    ) -> anyhow::Result<Option<PositionSnapshot>> {
895        let pool = self.pool.clone();
896        let position_id = position_id.to_owned();
897        let (tx, rx) = std::sync::mpsc::channel();
898
899        tokio::spawn(async move {
900            let result = DatabaseQueries::load_position_snapshot(&pool, &position_id).await;
901            match result {
902                Ok(snapshot) => {
903                    if let Err(e) = tx.send(snapshot) {
904                        log::error!("Failed to send position snapshot {position_id}: {e:?}");
905                    }
906                }
907                Err(e) => {
908                    log::error!("Failed to load position snapshot {position_id}: {e:?}");
909                    if let Err(e) = tx.send(None) {
910                        log::error!(
911                            "Failed to send None for position snapshot {position_id}: {e:?}"
912                        );
913                    }
914                }
915            }
916        });
917        Ok(rx.recv()?)
918    }
919
    /// Not yet implemented.
    ///
    /// # Panics
    ///
    /// Always panics, because the body is `todo!()`; neither argument is used.
    fn index_venue_order_id(
        &self,
        client_order_id: ClientOrderId,
        venue_order_id: VenueOrderId,
    ) -> anyhow::Result<()> {
        todo!()
    }
927
    /// Not yet implemented.
    ///
    /// # Panics
    ///
    /// Always panics, because the body is `todo!()`; neither argument is used.
    fn index_order_position(
        &self,
        client_order_id: ClientOrderId,
        position_id: PositionId,
    ) -> anyhow::Result<()> {
        todo!()
    }
935
    /// Not yet implemented.
    ///
    /// # Panics
    ///
    /// Always panics, because the body is `todo!()`.
    fn update_actor(&self) -> anyhow::Result<()> {
        todo!()
    }
939
    /// Not yet implemented.
    ///
    /// # Panics
    ///
    /// Always panics, because the body is `todo!()`.
    fn update_strategy(&self) -> anyhow::Result<()> {
        todo!()
    }
943
944    fn update_account(&self, account: &AccountAny) -> anyhow::Result<()> {
945        let query = DatabaseQuery::AddAccount(account.clone(), true);
946        self.tx.send(query).map_err(|e| {
947            anyhow::anyhow!("Failed to send query add_account to database message handler: {e}")
948        })
949    }
950
951    fn update_order(&self, event: &OrderEventAny) -> anyhow::Result<()> {
952        let query = DatabaseQuery::UpdateOrder(event.clone());
953        self.tx.send(query).map_err(|e| {
954            anyhow::anyhow!("Failed to send query update_order to database message handler: {e}")
955        })
956    }
957
    /// Not yet implemented.
    ///
    /// # Panics
    ///
    /// Always panics, because the body is `todo!()`; `position` is not used.
    fn update_position(&self, position: &Position) -> anyhow::Result<()> {
        todo!()
    }
961
    /// Not yet implemented.
    ///
    /// # Panics
    ///
    /// Always panics, because the body is `todo!()`; `order` is not used.
    fn snapshot_order_state(&self, order: &OrderAny) -> anyhow::Result<()> {
        todo!()
    }
965
    /// Not yet implemented.
    ///
    /// # Panics
    ///
    /// Always panics, because the body is `todo!()`; `position` is not used.
    fn snapshot_position_state(&self, position: &Position) -> anyhow::Result<()> {
        todo!()
    }
969
    /// Not yet implemented.
    ///
    /// # Panics
    ///
    /// Always panics, because the body is `todo!()`; `timestamp` is not used.
    fn heartbeat(&self, timestamp: UnixNanos) -> anyhow::Result<()> {
        todo!()
    }
973}
974
975async fn drain_buffer(pool: &PgPool, buffer: &mut VecDeque<DatabaseQuery>) {
976    for cmd in buffer.drain(..) {
977        let result: anyhow::Result<()> = match cmd {
978            DatabaseQuery::Close => Ok(()),
979            DatabaseQuery::Add(key, value) => DatabaseQueries::add(pool, key, value).await,
980            DatabaseQuery::AddCurrency(currency) => {
981                DatabaseQueries::add_currency(pool, currency).await
982            }
983            DatabaseQuery::AddInstrument(instrument_any) => match instrument_any {
984                InstrumentAny::Betting(instrument) => {
985                    DatabaseQueries::add_instrument(pool, "BETTING", Box::new(instrument)).await
986                }
987                InstrumentAny::BinaryOption(instrument) => {
988                    DatabaseQueries::add_instrument(pool, "BINARY_OPTION", Box::new(instrument))
989                        .await
990                }
991                InstrumentAny::CryptoFuture(instrument) => {
992                    DatabaseQueries::add_instrument(pool, "CRYPTO_FUTURE", Box::new(instrument))
993                        .await
994                }
995                InstrumentAny::CryptoOption(instrument) => {
996                    DatabaseQueries::add_instrument(pool, "CRYPTO_OPTION", Box::new(instrument))
997                        .await
998                }
999                InstrumentAny::CryptoPerpetual(instrument) => {
1000                    DatabaseQueries::add_instrument(pool, "CRYPTO_PERPETUAL", Box::new(instrument))
1001                        .await
1002                }
1003                InstrumentAny::CurrencyPair(instrument) => {
1004                    DatabaseQueries::add_instrument(pool, "CURRENCY_PAIR", Box::new(instrument))
1005                        .await
1006                }
1007                InstrumentAny::Equity(equity) => {
1008                    DatabaseQueries::add_instrument(pool, "EQUITY", Box::new(equity)).await
1009                }
1010                InstrumentAny::FuturesContract(instrument) => {
1011                    DatabaseQueries::add_instrument(pool, "FUTURES_CONTRACT", Box::new(instrument))
1012                        .await
1013                }
1014                InstrumentAny::FuturesSpread(instrument) => {
1015                    DatabaseQueries::add_instrument(pool, "FUTURES_SPREAD", Box::new(instrument))
1016                        .await
1017                }
1018                InstrumentAny::OptionContract(instrument) => {
1019                    DatabaseQueries::add_instrument(pool, "OPTION_CONTRACT", Box::new(instrument))
1020                        .await
1021                }
1022                InstrumentAny::Commodity(instrument) => {
1023                    DatabaseQueries::add_instrument(pool, "COMMODITY", Box::new(instrument)).await
1024                }
1025                InstrumentAny::IndexInstrument(instrument) => {
1026                    DatabaseQueries::add_instrument(pool, "INDEX_INSTRUMENT", Box::new(instrument))
1027                        .await
1028                }
1029                InstrumentAny::Cfd(instrument) => {
1030                    DatabaseQueries::add_instrument(pool, "CFD", Box::new(instrument)).await
1031                }
1032                InstrumentAny::OptionSpread(instrument) => {
1033                    DatabaseQueries::add_instrument(pool, "OPTION_SPREAD", Box::new(instrument))
1034                        .await
1035                }
1036                InstrumentAny::PerpetualContract(instrument) => {
1037                    DatabaseQueries::add_instrument(
1038                        pool,
1039                        "PERPETUAL_CONTRACT",
1040                        Box::new(instrument),
1041                    )
1042                    .await
1043                }
1044                InstrumentAny::TokenizedAsset(instrument) => {
1045                    DatabaseQueries::add_instrument(pool, "TOKENIZED_ASSET", Box::new(instrument))
1046                        .await
1047                }
1048            },
1049            DatabaseQuery::AddOrder(order_any, client_id, updated) => match order_any {
1050                OrderAny::Limit(order) => {
1051                    DatabaseQueries::add_order(pool, "LIMIT", updated, Box::new(order), client_id)
1052                        .await
1053                }
1054                OrderAny::LimitIfTouched(order) => {
1055                    DatabaseQueries::add_order(
1056                        pool,
1057                        "LIMIT_IF_TOUCHED",
1058                        updated,
1059                        Box::new(order),
1060                        client_id,
1061                    )
1062                    .await
1063                }
1064                OrderAny::Market(order) => {
1065                    DatabaseQueries::add_order(pool, "MARKET", updated, Box::new(order), client_id)
1066                        .await
1067                }
1068                OrderAny::MarketIfTouched(order) => {
1069                    DatabaseQueries::add_order(
1070                        pool,
1071                        "MARKET_IF_TOUCHED",
1072                        updated,
1073                        Box::new(order),
1074                        client_id,
1075                    )
1076                    .await
1077                }
1078                OrderAny::MarketToLimit(order) => {
1079                    DatabaseQueries::add_order(
1080                        pool,
1081                        "MARKET_TO_LIMIT",
1082                        updated,
1083                        Box::new(order),
1084                        client_id,
1085                    )
1086                    .await
1087                }
1088                OrderAny::StopLimit(order) => {
1089                    DatabaseQueries::add_order(
1090                        pool,
1091                        "STOP_LIMIT",
1092                        updated,
1093                        Box::new(order),
1094                        client_id,
1095                    )
1096                    .await
1097                }
1098                OrderAny::StopMarket(order) => {
1099                    DatabaseQueries::add_order(
1100                        pool,
1101                        "STOP_MARKET",
1102                        updated,
1103                        Box::new(order),
1104                        client_id,
1105                    )
1106                    .await
1107                }
1108                OrderAny::TrailingStopLimit(order) => {
1109                    DatabaseQueries::add_order(
1110                        pool,
1111                        "TRAILING_STOP_LIMIT",
1112                        updated,
1113                        Box::new(order),
1114                        client_id,
1115                    )
1116                    .await
1117                }
1118                OrderAny::TrailingStopMarket(order) => {
1119                    DatabaseQueries::add_order(
1120                        pool,
1121                        "TRAILING_STOP_MARKET",
1122                        updated,
1123                        Box::new(order),
1124                        client_id,
1125                    )
1126                    .await
1127                }
1128            },
1129            DatabaseQuery::AddOrderSnapshot(snapshot) => {
1130                DatabaseQueries::add_order_snapshot(pool, snapshot).await
1131            }
1132            DatabaseQuery::AddPositionSnapshot(snapshot) => {
1133                DatabaseQueries::add_position_snapshot(pool, snapshot).await
1134            }
1135            DatabaseQuery::AddAccount(account_any, updated) => match account_any {
1136                AccountAny::Margin(account) => {
1137                    DatabaseQueries::add_account(pool, "MARGIN", updated, Box::new(account)).await
1138                }
1139                AccountAny::Cash(account) => {
1140                    DatabaseQueries::add_account(pool, "CASH", updated, Box::new(account)).await
1141                }
1142                AccountAny::Betting(account) => {
1143                    DatabaseQueries::add_account(pool, "BETTING", updated, Box::new(account)).await
1144                }
1145            },
1146            DatabaseQuery::AddSignal(signal) => DatabaseQueries::add_signal(pool, &signal).await,
1147            DatabaseQuery::AddCustom(data) => DatabaseQueries::add_custom_data(pool, &data).await,
1148            DatabaseQuery::AddQuote(quote) => DatabaseQueries::add_quote(pool, &quote).await,
1149            DatabaseQuery::AddTrade(trade) => DatabaseQueries::add_trade(pool, &trade).await,
1150            DatabaseQuery::AddBar(bar) => DatabaseQueries::add_bar(pool, &bar).await,
1151            DatabaseQuery::UpdateOrder(event) => {
1152                DatabaseQueries::add_order_event(pool, event.into_boxed(), None).await
1153            }
1154        };
1155
1156        if let Err(e) = result {
1157            log::error!("Error on query: {e:?}");
1158        }
1159    }
1160}