Skip to main content

nautilus_infrastructure/sql/
queries.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use ahash::AHashMap;
17use nautilus_common::signal::Signal;
18use nautilus_model::{
19    accounts::{Account, AccountAny},
20    data::{Bar, CustomData, DataType, HasTsInit, QuoteTick, TradeTick},
21    events::{
22        AccountState, OrderEvent, OrderEventAny, OrderSnapshot,
23        position::snapshot::PositionSnapshot,
24    },
25    identifiers::{AccountId, ClientId, ClientOrderId, InstrumentId, PositionId},
26    instruments::{Instrument, InstrumentAny},
27    orders::{Order, OrderAny},
28    types::{AccountBalance, Currency, MarginBalance},
29};
30use sqlx::{PgPool, Row};
31
32use super::models::{
33    orders::OrderSnapshotModel, positions::PositionSnapshotModel, types::SignalModel,
34};
35use crate::sql::models::{
36    accounts::AccountEventModel,
37    data::{BarModel, QuoteTickModel, TradeTickModel},
38    enums::{
39        AggregationSourceModel, AggressorSideModel, AssetClassModel, BarAggregationModel,
40        CurrencyTypeModel, PriceTypeModel, TrailingOffsetTypeModel,
41    },
42    general::{GeneralRow, OrderEventOrderClientIdCombination},
43    instruments::InstrumentAnyModel,
44    orders::OrderEventAnyModel,
45    types::CurrencyModel,
46};
47
48#[derive(Debug)]
49pub struct DatabaseQueries;
50
51impl DatabaseQueries {
52    /// Truncates all tables in the cache database via the provided Postgres `pool`.
53    ///
54    /// # Errors
55    ///
56    /// Returns an error if the TRUNCATE operation fails.
57    pub async fn truncate(pool: &PgPool) -> anyhow::Result<()> {
58        sqlx::query("SELECT truncate_all_tables()")
59            .execute(pool)
60            .await
61            .map(|_| ())
62            .map_err(|e| anyhow::anyhow!("Failed to truncate tables: {e}"))
63    }
64
65    /// Inserts a raw key-value entry into the `general` table via the provided `pool`.
66    ///
67    /// # Errors
68    ///
69    /// Returns an error if the INSERT operation fails.
70    pub async fn add(pool: &PgPool, key: String, value: Vec<u8>) -> anyhow::Result<()> {
71        sqlx::query("INSERT INTO general (id, value) VALUES ($1, $2)")
72            .bind(key)
73            .bind(value)
74            .execute(pool)
75            .await
76            .map(|_| ())
77            .map_err(|e| anyhow::anyhow!("Failed to insert into general table: {e}"))
78    }
79
80    /// Loads all entries from the `general` table via the provided `pool`.
81    ///
82    /// # Errors
83    ///
84    /// Returns an error if the SELECT operation fails.
85    pub async fn load(pool: &PgPool) -> anyhow::Result<AHashMap<String, Vec<u8>>> {
86        sqlx::query_as::<_, GeneralRow>("SELECT * FROM general")
87            .fetch_all(pool)
88            .await
89            .map(|rows| {
90                let mut cache: AHashMap<String, Vec<u8>> = AHashMap::new();
91                for row in rows {
92                    cache.insert(row.id, row.value);
93                }
94                cache
95            })
96            .map_err(|e| anyhow::anyhow!("Failed to load general table: {e}"))
97    }
98
99    /// Inserts or ignores a `Currency` row via the provided `pool`.
100    ///
101    /// # Errors
102    ///
103    /// Returns an error if the INSERT operation fails.
104    pub async fn add_currency(pool: &PgPool, currency: Currency) -> anyhow::Result<()> {
105        sqlx::query(
106            "INSERT INTO currency (id, precision, iso4217, name, currency_type) VALUES ($1, $2, $3, $4, $5::currency_type) ON CONFLICT (id) DO NOTHING"
107        )
108            .bind(currency.code.as_str())
109            .bind(i32::from(currency.precision))
110            .bind(i32::from(currency.iso4217))
111            .bind(currency.name.as_str())
112            .bind(CurrencyTypeModel(currency.currency_type))
113            .execute(pool)
114            .await
115            .map(|_| ())
116            .map_err(|e| anyhow::anyhow!("Failed to insert into currency table: {e}"))
117    }
118
119    /// Loads all `Currency` entries via the provided `pool`.
120    ///
121    /// # Errors
122    ///
123    /// Returns an error if the SELECT operation fails.
124    pub async fn load_currencies(pool: &PgPool) -> anyhow::Result<Vec<Currency>> {
125        sqlx::query_as::<_, CurrencyModel>("SELECT * FROM currency ORDER BY id ASC")
126            .fetch_all(pool)
127            .await
128            .map(|rows| rows.into_iter().map(|row| row.0).collect())
129            .map_err(|e| anyhow::anyhow!("Failed to load currencies: {e}"))
130    }
131
132    /// Loads a single `Currency` entry by `code` via the provided `pool`.
133    ///
134    /// # Errors
135    ///
136    /// Returns an error if the SELECT operation fails.
137    pub async fn load_currency(pool: &PgPool, code: &str) -> anyhow::Result<Option<Currency>> {
138        sqlx::query_as::<_, CurrencyModel>("SELECT * FROM currency WHERE id = $1")
139            .bind(code)
140            .fetch_optional(pool)
141            .await
142            .map(|currency| currency.map(|row| row.0))
143            .map_err(|e| anyhow::anyhow!("Failed to load currency: {e}"))
144    }
145
146    /// Inserts or updates an `InstrumentAny` entry via the provided `pool`.
147    ///
148    /// # Errors
149    ///
150    /// Returns an error if the INSERT or UPDATE operation fails.
151    pub async fn add_instrument(
152        pool: &PgPool,
153        kind: &str,
154        instrument: Box<dyn Instrument>,
155    ) -> anyhow::Result<()> {
156        sqlx::query(r#"
157            INSERT INTO "instrument" (
158                id, kind, raw_symbol, base_currency, underlying, quote_currency, settlement_currency, isin, asset_class, exchange,
159                multiplier, option_kind, is_inverse, strike_price, activation_ns, expiration_ns, price_precision, size_precision,
160                price_increment, size_increment, maker_fee, taker_fee, margin_init, margin_maint, lot_size, max_quantity, min_quantity, max_notional,
161                min_notional, max_price, min_price, ts_init, ts_event, created_at, updated_at
162            ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9::asset_class, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24, $25, $26, $27, $28, $29, $30, $31, $32, $33, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
163            ON CONFLICT (id)
164            DO UPDATE
165            SET
166                kind = $2, raw_symbol = $3, base_currency= $4, underlying = $5, quote_currency = $6, settlement_currency = $7, isin = $8, asset_class = $9, exchange = $10,
167                 multiplier = $11, option_kind = $12, is_inverse = $13, strike_price = $14, activation_ns = $15, expiration_ns = $16 , price_precision = $17, size_precision = $18,
168                 price_increment = $19, size_increment = $20, maker_fee = $21, taker_fee = $22, margin_init = $23, margin_maint = $24, lot_size = $25, max_quantity = $26,
169                 min_quantity = $27, max_notional = $28, min_notional = $29, max_price = $30, min_price = $31, ts_init = $32,  ts_event = $33, updated_at = CURRENT_TIMESTAMP
170            "#)
171            .bind(instrument.id().to_string())
172            .bind(kind)
173            .bind(instrument.raw_symbol().to_string())
174            .bind(instrument.base_currency().map(|x| x.code.as_str()))
175            .bind(instrument.underlying().map(|x| x.to_string()))
176            .bind(instrument.quote_currency().code.as_str())
177            .bind(instrument.settlement_currency().code.as_str())
178            .bind(instrument.isin().map(|x| x.to_string()))
179            .bind(AssetClassModel(instrument.asset_class()))
180            .bind(instrument.exchange().map(|x| x.to_string()))
181            .bind(instrument.multiplier().to_string())
182            .bind(instrument.option_kind().map(|x| x.to_string()))
183            .bind(instrument.is_inverse())
184            .bind(instrument.strike_price().map(|x| x.to_string()))
185            .bind(instrument.activation_ns().map(|x| x.to_string()))
186            .bind(instrument.expiration_ns().map(|x| x.to_string()))
187            .bind(i32::from(instrument.price_precision()))
188            .bind(i32::from(instrument.size_precision()))
189            .bind(instrument.price_increment().to_string())
190            .bind(instrument.size_increment().to_string())
191            .bind(instrument.maker_fee().to_string())
192            .bind(instrument.taker_fee().to_string())
193            .bind(instrument.margin_init().to_string())
194            .bind(instrument.margin_maint().to_string())
195            .bind(instrument.lot_size().map(|x| x.to_string()))
196            .bind(instrument.max_quantity().map(|x| x.to_string()))
197            .bind(instrument.min_quantity().map(|x| x.to_string()))
198            .bind(instrument.max_notional().map(|x| x.to_string()))
199            .bind(instrument.min_notional().map(|x| x.to_string()))
200            .bind(instrument.max_price().map(|x| x.to_string()))
201            .bind(instrument.min_price().map(|x| x.to_string()))
202            .bind(instrument.ts_init().to_string())
203            .bind(instrument.ts_event().to_string())
204            .execute(pool)
205            .await
206            .map(|_| ())
207            .map_err(|e| anyhow::anyhow!("Failed to insert item {} into instrument table: {:?}", instrument.id(), e))
208    }
209
210    /// Loads a single `InstrumentAny` entry by `instrument_id` via the provided `pool`.
211    ///
212    /// # Errors
213    ///
214    /// Returns an error if the SELECT operation fails.
215    pub async fn load_instrument(
216        pool: &PgPool,
217        instrument_id: &InstrumentId,
218    ) -> anyhow::Result<Option<InstrumentAny>> {
219        sqlx::query_as::<_, InstrumentAnyModel>("SELECT * FROM instrument WHERE id = $1")
220            .bind(instrument_id.to_string())
221            .fetch_optional(pool)
222            .await
223            .map(|instrument| instrument.map(|row| row.0))
224            .map_err(|e| {
225                anyhow::anyhow!("Failed to load instrument with id {instrument_id},error is: {e}")
226            })
227    }
228
229    /// Loads all `InstrumentAny` entries via the provided `pool`.
230    ///
231    /// # Errors
232    ///
233    /// Returns an error if the SELECT operation fails.
234    pub async fn load_instruments(pool: &PgPool) -> anyhow::Result<Vec<InstrumentAny>> {
235        sqlx::query_as::<_, InstrumentAnyModel>("SELECT * FROM instrument")
236            .fetch_all(pool)
237            .await
238            .map(|rows| rows.into_iter().map(|row| row.0).collect())
239            .map_err(|e| anyhow::anyhow!("Failed to load instruments: {e}"))
240    }
241
242    /// Inserts or updates an `OrderAny` entry via the provided `pool`.
243    ///
244    /// # Errors
245    ///
246    /// Returns an error if the SQL INSERT or UPDATE operation fails.
247    ///
248    /// # Panics
249    ///
250    /// Panics if the order initialization existence check unwraps `None` after awaiting.
251    pub async fn add_order(
252        pool: &PgPool,
253        _kind: &str,
254        updated: bool,
255        order: Box<dyn Order>,
256        client_id: Option<ClientId>,
257    ) -> anyhow::Result<()> {
258        if updated {
259            let exists = Self::check_if_order_initialized_exists(pool, order.client_order_id())
260                .await
261                .unwrap();
262            assert!(
263                exists,
264                "OrderInitialized event does not exist for order: {}",
265                order.client_order_id()
266            );
267        }
268
269        match order.last_event().clone() {
270            OrderEventAny::Accepted(event) => {
271                Self::add_order_event(pool, Box::new(event), client_id).await
272            }
273            OrderEventAny::CancelRejected(event) => {
274                Self::add_order_event(pool, Box::new(event), client_id).await
275            }
276            OrderEventAny::Canceled(event) => {
277                Self::add_order_event(pool, Box::new(event), client_id).await
278            }
279            OrderEventAny::Denied(event) => {
280                Self::add_order_event(pool, Box::new(event), client_id).await
281            }
282            OrderEventAny::Emulated(event) => {
283                Self::add_order_event(pool, Box::new(event), client_id).await
284            }
285            OrderEventAny::Expired(event) => {
286                Self::add_order_event(pool, Box::new(event), client_id).await
287            }
288            OrderEventAny::Filled(event) => {
289                Self::add_order_event(pool, Box::new(event), client_id).await
290            }
291            OrderEventAny::Initialized(event) => {
292                Self::add_order_event(pool, Box::new(event), client_id).await
293            }
294            OrderEventAny::ModifyRejected(event) => {
295                Self::add_order_event(pool, Box::new(event), client_id).await
296            }
297            OrderEventAny::PendingCancel(event) => {
298                Self::add_order_event(pool, Box::new(event), client_id).await
299            }
300            OrderEventAny::PendingUpdate(event) => {
301                Self::add_order_event(pool, Box::new(event), client_id).await
302            }
303            OrderEventAny::Rejected(event) => {
304                Self::add_order_event(pool, Box::new(event), client_id).await
305            }
306            OrderEventAny::Released(event) => {
307                Self::add_order_event(pool, Box::new(event), client_id).await
308            }
309            OrderEventAny::Submitted(event) => {
310                Self::add_order_event(pool, Box::new(event), client_id).await
311            }
312            OrderEventAny::Updated(event) => {
313                Self::add_order_event(pool, Box::new(event), client_id).await
314            }
315            OrderEventAny::Triggered(event) => {
316                Self::add_order_event(pool, Box::new(event), client_id).await
317            }
318        }
319    }
320
321    /// Inserts an `OrderSnapshot` entry via the provided `pool`.
322    ///
323    /// # Errors
324    ///
325    /// Returns an error if the SQL INSERT operation fails.
326    ///
327    /// # Panics
328    ///
329    /// Panics if serialization of `snapshot.exec_algorithm_params` fails.
330    pub async fn add_order_snapshot(pool: &PgPool, snapshot: OrderSnapshot) -> anyhow::Result<()> {
331        let mut transaction = pool.begin().await?;
332
333        // Insert trader if it does not exist
334        // TODO remove this when node and trader initialization is implemented
335        sqlx::query(
336            r#"
337            INSERT INTO "trader" (id) VALUES ($1) ON CONFLICT (id) DO NOTHING
338            "#,
339        )
340        .bind(snapshot.trader_id.to_string())
341        .execute(&mut *transaction)
342        .await
343        .map(|_| ())
344        .map_err(|e| anyhow::anyhow!("Failed to insert into trader table: {e}"))?;
345
346        sqlx::query(
347            r#"
348            INSERT INTO "order" (
349                id, trader_id, strategy_id, instrument_id, client_order_id, venue_order_id, position_id,
350                account_id, last_trade_id, order_type, order_side, quantity, price, trigger_price,
351                trigger_type, limit_offset, trailing_offset, trailing_offset_type, time_in_force,
352                expire_time, filled_qty, liquidity_side, avg_px, slippage, commissions, status,
353                is_post_only, is_reduce_only, is_quote_quantity, display_qty, emulation_trigger,
354                trigger_instrument_id, contingency_type, order_list_id, linked_order_ids,
355                parent_order_id, exec_algorithm_id, exec_algorithm_params, exec_spawn_id, tags, init_id, ts_init, ts_last,
356                created_at, updated_at
357            ) VALUES (
358                $1, $2, $3, $4, $1, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16,
359                $17::TRAILING_OFFSET_TYPE, $18, $19, $20, $21, $22, $23, $24, $25, $26, $27, $28,
360                $29, $30, $31, $32, $33, $34, $35, $36, $37, $38, $39, $40, $41, $42,
361                CURRENT_TIMESTAMP, CURRENT_TIMESTAMP
362            )
363            ON CONFLICT (id)
364            DO UPDATE SET
365                trader_id = $2,
366                strategy_id = $3,
367                instrument_id = $4,
368                venue_order_id = $5,
369                position_id = $6,
370                account_id = $7,
371                last_trade_id = $8,
372                order_type = $9,
373                order_side = $10,
374                quantity = $11,
375                price = $12,
376                trigger_price = $13,
377                trigger_type = $14,
378                limit_offset = $15,
379                trailing_offset = $16,
380                trailing_offset_type = $17::TRAILING_OFFSET_TYPE,
381                time_in_force = $18,
382                expire_time = $19,
383                filled_qty = $20,
384                liquidity_side = $21,
385                avg_px = $22,
386                slippage = $23,
387                commissions = $24,
388                status = $25,
389                is_post_only = $26,
390                is_reduce_only = $27,
391                is_quote_quantity = $28,
392                display_qty = $29,
393                emulation_trigger = $30,
394                trigger_instrument_id = $31,
395                contingency_type = $32,
396                order_list_id = $33,
397                linked_order_ids = $34,
398                parent_order_id = $35,
399                exec_algorithm_id = $36,
400                exec_algorithm_params = $37,
401                exec_spawn_id = $38,
402                tags = $39,
403                init_id = $40,
404                ts_init = $41,
405                ts_last = $42,
406                updated_at = CURRENT_TIMESTAMP
407        "#)
408            .bind(snapshot.client_order_id.to_string())  // Used for both id and client_order_id
409            .bind(snapshot.trader_id.to_string())
410            .bind(snapshot.strategy_id.to_string())
411            .bind(snapshot.instrument_id.to_string())
412            .bind(snapshot.venue_order_id.map(|x| x.to_string()))
413            .bind(snapshot.position_id.map(|x| x.to_string()))
414            .bind(snapshot.account_id.map(|x| x.to_string()))
415            .bind(snapshot.last_trade_id.map(|x| x.to_string()))
416            .bind(snapshot.order_type.to_string())
417            .bind(snapshot.order_side.to_string())
418            .bind(snapshot.quantity.to_string())
419            .bind(snapshot.price.map(|x| x.to_string()))
420            .bind(snapshot.trigger_price.map(|x| x.to_string()))
421            .bind(snapshot.trigger_type.map(|x| x.to_string()))
422            .bind(snapshot.limit_offset.map(|x| x.to_string()))
423            .bind(snapshot.trailing_offset.map(|x| x.to_string()))
424            .bind(snapshot.trailing_offset_type.map(|x| x.to_string()))
425            .bind(snapshot.time_in_force.to_string())
426            .bind(snapshot.expire_time.map(|x| x.to_string()))
427            .bind(snapshot.filled_qty.to_string())
428            .bind(snapshot.liquidity_side.map(|x| x.to_string()))
429            .bind(snapshot.avg_px)
430            .bind(snapshot.slippage)
431            .bind(snapshot.commissions.iter().map(ToString::to_string).collect::<Vec<String>>())
432            .bind(snapshot.status.to_string())
433            .bind(snapshot.is_post_only)
434            .bind(snapshot.is_reduce_only)
435            .bind(snapshot.is_quote_quantity)
436            .bind(snapshot.display_qty.map(|x| x.to_string()))
437            .bind(snapshot.emulation_trigger.map(|x| x.to_string()))
438            .bind(snapshot.trigger_instrument_id.map(|x| x.to_string()))
439            .bind(snapshot.contingency_type.map(|x| x.to_string()))
440            .bind(snapshot.order_list_id.map(|x| x.to_string()))
441            .bind(snapshot.linked_order_ids.map(|x| x.iter().map(ToString::to_string).collect::<Vec<String>>()))
442            .bind(snapshot.parent_order_id.map(|x| x.to_string()))
443            .bind(snapshot.exec_algorithm_id.map(|x| x.to_string()))
444            .bind(snapshot.exec_algorithm_params.map(|x| serde_json::to_value(x).unwrap()))
445            .bind(snapshot.exec_spawn_id.map(|x| x.to_string()))
446            .bind(snapshot.tags.map(|x| x.iter().map(ToString::to_string).collect::<Vec<String>>()))
447            .bind(snapshot.init_id.to_string())
448            .bind(snapshot.ts_init.to_string())
449            .bind(snapshot.ts_last.to_string())
450            .execute(&mut *transaction)
451            .await
452            .map(|_| ())
453            .map_err(|e| anyhow::anyhow!("Failed to insert into order table: {e}"))?;
454
455        transaction
456            .commit()
457            .await
458            .map_err(|e| anyhow::anyhow!("Failed to commit transaction: {e}"))
459    }
460
461    /// Loads an `OrderSnapshot` entry by client order ID via the provided `pool`.
462    ///
463    /// # Errors
464    ///
465    /// Returns an error if the SQL SELECT or deserialization fails.
466    pub async fn load_order_snapshot(
467        pool: &PgPool,
468        client_order_id: &ClientOrderId,
469    ) -> anyhow::Result<Option<OrderSnapshot>> {
470        sqlx::query_as::<_, OrderSnapshotModel>(
471            r#"SELECT * FROM "order" WHERE client_order_id = $1"#,
472        )
473        .bind(client_order_id.to_string())
474        .fetch_optional(pool)
475        .await
476        .map(|model| model.map(|m| m.0))
477        .map_err(|e| anyhow::anyhow!("Failed to load order snapshot: {e}"))
478    }
479
480    /// Inserts or updates a `PositionSnapshot` entry via the provided `pool`.
481    ///
482    /// # Errors
483    ///
484    /// Returns an error if the SQL INSERT or UPDATE operation fails, or if beginning the transaction fails.
485    pub async fn add_position_snapshot(
486        pool: &PgPool,
487        snapshot: PositionSnapshot,
488    ) -> anyhow::Result<()> {
489        let mut transaction = pool.begin().await?;
490
491        // Insert trader if it does not exist
492        // TODO remove this when node and trader initialization is implemented
493        sqlx::query(
494            r#"
495            INSERT INTO "trader" (id) VALUES ($1) ON CONFLICT (id) DO NOTHING
496        "#,
497        )
498        .bind(snapshot.trader_id.to_string())
499        .execute(&mut *transaction)
500        .await
501        .map(|_| ())
502        .map_err(|e| anyhow::anyhow!("Failed to insert into trader table: {e}"))?;
503
504        sqlx::query(r#"
505            INSERT INTO "position" (
506                id, trader_id, strategy_id, instrument_id, account_id, opening_order_id, closing_order_id, entry, side, signed_qty, quantity, peak_qty,
507                quote_currency, base_currency, settlement_currency, avg_px_open, avg_px_close, realized_return, realized_pnl, unrealized_pnl, commissions,
508                duration_ns, ts_opened, ts_closed, ts_init, ts_last, created_at, updated_at
509            ) VALUES (
510                $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20,
511                $21, $22, $23, $24, $25, $26, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP
512            )
513            ON CONFLICT (id)
514            DO UPDATE
515            SET
516                trader_id = $2, strategy_id = $3, instrument_id = $4, account_id = $5, opening_order_id = $6, closing_order_id = $7, entry = $8, side = $9, signed_qty = $10, quantity = $11,
517                peak_qty = $12, quote_currency = $13, base_currency = $14, settlement_currency = $15, avg_px_open = $16, avg_px_close = $17, realized_return = $18, realized_pnl = $19, unrealized_pnl = $20,
518                commissions = $21, duration_ns = $22, ts_opened = $23, ts_closed = $24, ts_init = $25, ts_last = $26, updated_at = CURRENT_TIMESTAMP
519        "#)
520            .bind(snapshot.position_id.to_string())
521            .bind(snapshot.trader_id.to_string())
522            .bind(snapshot.strategy_id.to_string())
523            .bind(snapshot.instrument_id.to_string())
524            .bind(snapshot.account_id.to_string())
525            .bind(snapshot.opening_order_id.to_string())
526            .bind(snapshot.closing_order_id.map(|x| x.to_string()))
527            .bind(snapshot.entry.to_string())
528            .bind(snapshot.side.to_string())
529            .bind(snapshot.signed_qty)
530            .bind(snapshot.quantity.to_string())
531            .bind(snapshot.peak_qty.to_string())
532            .bind(snapshot.quote_currency.to_string())
533            .bind(snapshot.base_currency.map(|x| x.to_string()))
534            .bind(snapshot.settlement_currency.to_string())
535            .bind(snapshot.avg_px_open)
536            .bind(snapshot.avg_px_close)
537            .bind(snapshot.realized_return)
538            .bind(snapshot.realized_pnl.map(|x| x.to_string()))
539            .bind(snapshot.unrealized_pnl.map(|x| x.to_string()))
540            .bind(snapshot.commissions.iter().map(ToString::to_string).collect::<Vec<String>>())
541            .bind(snapshot.duration_ns.map(|x| x.to_string()))
542            .bind(snapshot.ts_opened.to_string())
543            .bind(snapshot.ts_closed.map(|x| x.to_string()))
544            .bind(snapshot.ts_init.to_string())
545            .bind(snapshot.ts_last.to_string())
546            .execute(&mut *transaction)
547            .await
548            .map(|_| ())
549            .map_err(|e| anyhow::anyhow!("Failed to insert into position table: {e}"))?;
550        transaction
551            .commit()
552            .await
553            .map_err(|e| anyhow::anyhow!("Failed to commit transaction: {e}"))
554    }
555
556    /// Loads a `PositionSnapshot` entry by `position_id` via the provided `pool`.
557    ///
558    /// # Errors
559    ///
560    /// Returns an error if the SQL SELECT or deserialization fails.
561    pub async fn load_position_snapshot(
562        pool: &PgPool,
563        position_id: &PositionId,
564    ) -> anyhow::Result<Option<PositionSnapshot>> {
565        sqlx::query_as::<_, PositionSnapshotModel>(r#"SELECT * FROM "position" WHERE id = $1"#)
566            .bind(position_id.to_string())
567            .fetch_optional(pool)
568            .await
569            .map(|model| model.map(|m| m.0))
570            .map_err(|e| anyhow::anyhow!("Failed to load position snapshot: {e}"))
571    }
572
573    /// Checks if an `OrderInitialized` event exists for the given `client_order_id` via the provided `pool`.
574    ///
575    /// # Errors
576    ///
577    /// Returns an error if the SQL SELECT operation fails.
578    pub async fn check_if_order_initialized_exists(
579        pool: &PgPool,
580        client_order_id: ClientOrderId,
581    ) -> anyhow::Result<bool> {
582        sqlx::query(r#"
583            SELECT EXISTS(SELECT 1 FROM "order_event" WHERE client_order_id = $1 AND kind = 'OrderInitialized')
584        "#)
585            .bind(client_order_id.to_string())
586            .fetch_one(pool)
587            .await
588            .map(|row| row.get(0))
589            .map_err(|e| anyhow::anyhow!("Failed to check if order initialized exists: {e}"))
590    }
591
592    /// Checks if any account event exists for the given `account_id` via the provided `pool`.
593    ///
594    /// # Errors
595    ///
596    /// Returns an error if the SQL SELECT operation fails.
597    pub async fn check_if_account_event_exists(
598        pool: &PgPool,
599        account_id: AccountId,
600    ) -> anyhow::Result<bool> {
601        sqlx::query(
602            r#"
603            SELECT EXISTS(SELECT 1 FROM "account_event" WHERE account_id = $1)
604        "#,
605        )
606        .bind(account_id.to_string())
607        .fetch_one(pool)
608        .await
609        .map(|row| row.get(0))
610        .map_err(|e| anyhow::anyhow!("Failed to check if account event exists: {e}"))
611    }
612
613    /// Inserts or updates an order event entry via the provided `pool`.
614    ///
615    /// # Errors
616    ///
617    /// Returns an error if the SQL INSERT or UPDATE operation fails.
618    pub async fn add_order_event(
619        pool: &PgPool,
620        order_event: Box<dyn OrderEvent>,
621        client_id: Option<ClientId>,
622    ) -> anyhow::Result<()> {
623        let mut transaction = pool.begin().await?;
624
625        // Insert trader if it does not exist
626        // TODO remove this when node and trader initialization is implemented
627        sqlx::query(
628            r#"
629            INSERT INTO "trader" (id) VALUES ($1) ON CONFLICT (id) DO NOTHING
630        "#,
631        )
632        .bind(order_event.trader_id().to_string())
633        .execute(&mut *transaction)
634        .await
635        .map(|_| ())
636        .map_err(|e| anyhow::anyhow!("Failed to insert into trader table: {e}"))?;
637
638        // Insert client if it does not exist
639        // TODO remove this when client initialization is implemented
640        if let Some(client_id) = client_id {
641            sqlx::query(
642                r#"
643                INSERT INTO "client" (id) VALUES ($1) ON CONFLICT (id) DO NOTHING
644            "#,
645            )
646            .bind(client_id.to_string())
647            .execute(&mut *transaction)
648            .await
649            .map(|_| ())
650            .map_err(|e| anyhow::anyhow!("Failed to insert into client table: {e}"))?;
651        }
652
653        sqlx::query(r#"
654            INSERT INTO "order_event" (
655                id, kind, client_order_id, order_type, order_side, trader_id, client_id, reason, strategy_id, instrument_id, trade_id, currency, quantity, time_in_force, liquidity_side,
656                post_only, reduce_only, quote_quantity, reconciliation, price, last_px, last_qty, trigger_price, trigger_type, limit_offset, trailing_offset,
657                trailing_offset_type, expire_time, display_qty, emulation_trigger, trigger_instrument_id, contingency_type,
658                order_list_id, linked_order_ids, parent_order_id,
659                exec_algorithm_id, exec_spawn_id, venue_order_id, account_id, position_id, commission, ts_event, ts_init, created_at, updated_at
660            ) VALUES (
661                $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20,
662                $21, $22, $23, $24, $25, $26::trailing_offset_type, $27, $28, $29, $30, $31, $32, $33, $34,
663                $35, $36, $37, $38, $39, $40, $41, $42, $43, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP
664            )
665            ON CONFLICT (id)
666            DO UPDATE
667            SET
668                kind = $2, client_order_id = $3, order_type = $4, order_side=$5, trader_id = $6, client_id = $7, reason = $8, strategy_id = $9, instrument_id = $10, trade_id = $11, currency = $12,
669                quantity = $13, time_in_force = $14, liquidity_side = $15, post_only = $16, reduce_only = $17, quote_quantity = $18, reconciliation = $19, price = $20, last_px = $21,
670                last_qty = $22, trigger_price = $23, trigger_type = $24, limit_offset = $25, trailing_offset = $26, trailing_offset_type = $27, expire_time = $28, display_qty = $29,
671                emulation_trigger = $30, trigger_instrument_id = $31, contingency_type = $32, order_list_id = $33, linked_order_ids = $34, parent_order_id = $35, exec_algorithm_id = $36,
672                exec_spawn_id = $37, venue_order_id = $38, account_id = $39, position_id = $40, commission = $41, ts_event = $42, ts_init = $43, updated_at = CURRENT_TIMESTAMP
673
674        "#)
675            .bind(order_event.id().to_string())
676            .bind(order_event.type_name())
677            .bind(order_event.client_order_id().to_string())
678            .bind(order_event.order_type().map(|x| x.to_string()))
679            .bind(order_event.order_side().map(|x| x.to_string()))
680            .bind(order_event.trader_id().to_string())
681            .bind(client_id.map(|x| x.to_string()))
682            .bind(order_event.reason().map(|x| x.to_string()))
683            .bind(order_event.strategy_id().to_string())
684            .bind(order_event.instrument_id().to_string())
685            .bind(order_event.trade_id().map(|x| x.to_string()))
686            .bind(order_event.currency().map(|x| x.code.as_str()))
687            .bind(order_event.quantity().map(|x| x.to_string()))
688            .bind(order_event.time_in_force().map(|x| x.to_string()))
689            .bind(order_event.liquidity_side().map(|x| x.to_string()))
690            .bind(order_event.post_only())
691            .bind(order_event.reduce_only())
692            .bind(order_event.quote_quantity())
693            .bind(order_event.reconciliation())
694            .bind(order_event.price().map(|x| x.to_string()))
695            .bind(order_event.last_px().map(|x| x.to_string()))
696            .bind(order_event.last_qty().map(|x| x.to_string()))
697            .bind(order_event.trigger_price().map(|x| x.to_string()))
698            .bind(order_event.trigger_type().map(|x| x.to_string()))
699            .bind(order_event.limit_offset().map(|x| x.to_string()))
700            .bind(order_event.trailing_offset().map(|x| x.to_string()))
701            .bind(order_event.trailing_offset_type().map(TrailingOffsetTypeModel))
702            .bind(order_event.expire_time().map(|x| x.to_string()))
703            .bind(order_event.display_qty().map(|x| x.to_string()))
704            .bind(order_event.emulation_trigger().map(|x| x.to_string()))
705            .bind(order_event.trigger_instrument_id().map(|x| x.to_string()))
706            .bind(order_event.contingency_type().map(|x| x.to_string()))
707            .bind(order_event.order_list_id().map(|x| x.to_string()))
708            .bind(order_event.linked_order_ids().map(|x| x.iter().map(ToString::to_string).collect::<Vec<String>>()))
709            .bind(order_event.parent_order_id().map(|x| x.to_string()))
710            .bind(order_event.exec_algorithm_id().map(|x| x.to_string()))
711            .bind(order_event.exec_spawn_id().map(|x| x.to_string()))
712            .bind(order_event.venue_order_id().map(|x| x.to_string()))
713            .bind(order_event.account_id().map(|x| x.to_string()))
714            .bind(order_event.position_id().map(|x| x.to_string()))
715            .bind(order_event.commission().map(|x| x.to_string()))
716            .bind(order_event.ts_event().to_string())
717            .bind(order_event.ts_init().to_string())
718            .execute(&mut *transaction)
719            .await
720            .map(|_| ())
721            .map_err(|e| anyhow::anyhow!("Failed to insert into order_event table: {e}"))?;
722        transaction
723            .commit()
724            .await
725            .map_err(|e| anyhow::anyhow!("Failed to commit transaction: {e}"))
726    }
727
728    /// Loads all order events for a `client_order_id` via the provided `pool`.
729    ///
730    /// # Errors
731    ///
732    /// Returns an error if the SQL SELECT or deserialization fails.
733    pub async fn load_order_events(
734        pool: &PgPool,
735        client_order_id: &ClientOrderId,
736    ) -> anyhow::Result<Vec<OrderEventAny>> {
737        sqlx::query_as::<_, OrderEventAnyModel>(r#"SELECT * FROM "order_event" event WHERE event.client_order_id = $1 ORDER BY created_at ASC"#)
738        .bind(client_order_id.to_string())
739        .fetch_all(pool)
740        .await
741        .map(|rows| rows.into_iter().map(|row| row.0).collect())
742        .map_err(|e| anyhow::anyhow!("Failed to load order events: {e}"))
743    }
744
745    /// Loads and assembles a complete `OrderAny` for a `client_order_id` via the provided `pool`.
746    ///
747    /// # Errors
748    ///
749    /// Returns an error if assembling events or SQL operations fail.
750    ///
751    /// # Panics
752    ///
753    /// Panics if assembling the order from events fails.
754    pub async fn load_order(
755        pool: &PgPool,
756        client_order_id: &ClientOrderId,
757    ) -> anyhow::Result<Option<OrderAny>> {
758        let order_events = Self::load_order_events(pool, client_order_id).await;
759
760        match order_events {
761            Ok(order_events) => {
762                if order_events.is_empty() {
763                    return Ok(None);
764                }
765                let order = OrderAny::from_events(order_events).unwrap();
766                Ok(Some(order))
767            }
768            Err(e) => anyhow::bail!("Failed to load order events: {e}"),
769        }
770    }
771
772    /// Loads and assembles all `OrderAny` entries via the provided `pool`.
773    ///
774    /// # Errors
775    ///
776    /// Returns an error if loading events or SQL operations fail.
777    ///
778    /// # Panics
779    ///
780    /// Panics if loading or assembling any individual order fails.
781    pub async fn load_orders(pool: &PgPool) -> anyhow::Result<Vec<OrderAny>> {
782        let mut orders: Vec<OrderAny> = Vec::new();
783        let client_order_ids: Vec<ClientOrderId> = sqlx::query(
784            r#"
785            SELECT DISTINCT client_order_id FROM "order_event"
786        "#,
787        )
788        .fetch_all(pool)
789        .await
790        .map(|rows| {
791            rows.into_iter()
792                .map(|row| ClientOrderId::from(row.get::<&str, _>(0)))
793                .collect()
794        })
795        .map_err(|e| anyhow::anyhow!("Failed to load order ids: {e}"))?;
796        for id in client_order_ids {
797            let order = Self::load_order(pool, &id).await.unwrap();
798            if let Some(order) = order {
799                orders.push(order);
800            }
801        }
802        Ok(orders)
803    }
804
805    /// Inserts or updates an `AccountAny` entry via the provided `pool`.
806    ///
807    /// # Errors
808    ///
809    /// Returns an error if the SQL INSERT or UPDATE operation fails.
810    ///
811    /// # Panics
812    ///
813    /// Panics if checking for existing account event unwrap fails.
814    pub async fn add_account(
815        pool: &PgPool,
816        kind: &str,
817        updated: bool,
818        account: Box<dyn Account>,
819    ) -> anyhow::Result<()> {
820        if updated {
821            let exists = Self::check_if_account_event_exists(pool, account.id())
822                .await
823                .unwrap();
824            assert!(
825                exists,
826                "Account event does not exist for account: {}",
827                account.id()
828            );
829        }
830
831        let mut transaction = pool.begin().await?;
832
833        sqlx::query(
834            r#"
835            INSERT INTO "account" (id) VALUES ($1) ON CONFLICT (id) DO NOTHING
836        "#,
837        )
838        .bind(account.id().to_string())
839        .execute(&mut *transaction)
840        .await
841        .map(|_| ())
842        .map_err(|e| anyhow::anyhow!("Failed to insert into account table: {e}"))?;
843
844        let account_event = account.last_event().unwrap();
845        sqlx::query(r#"
846            INSERT INTO "account_event" (
847                id, kind, account_id, base_currency, balances, margins, is_reported, ts_event, ts_init, created_at, updated_at
848            ) VALUES (
849                $1, $2, $3, $4, $5, $6, $7, $8, $9, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP
850            )
851            ON CONFLICT (id)
852            DO UPDATE
853            SET
854                kind = $2, account_id = $3, base_currency = $4, balances = $5, margins = $6, is_reported = $7,
855                ts_event = $8, ts_init = $9, updated_at = CURRENT_TIMESTAMP
856        "#)
857            .bind(account_event.event_id.to_string())
858            .bind(kind.to_string())
859            .bind(account_event.account_id.to_string())
860            .bind(account_event.base_currency.map(|x| x.code.as_str()))
861            .bind(serde_json::to_value::<Vec<AccountBalance>>(account_event.balances).unwrap())
862            .bind(serde_json::to_value::<Vec<MarginBalance>>(account_event.margins).unwrap())
863            .bind(account_event.is_reported)
864            .bind(account_event.ts_event.to_string())
865            .bind(account_event.ts_init.to_string())
866            .execute(&mut *transaction)
867            .await
868            .map(|_| ())
869            .map_err(|e| anyhow::anyhow!("Failed to insert into account_event table: {e}"))?;
870        transaction
871            .commit()
872            .await
873            .map_err(|e| anyhow::anyhow!("Failed to commit add_account transaction: {e}"))
874    }
875
876    /// Loads all account events for `account_id` via the provided `pool`.
877    ///
878    /// # Errors
879    ///
880    /// Returns an error if the SQL SELECT or deserialization fails.
881    pub async fn load_account_events(
882        pool: &PgPool,
883        account_id: &AccountId,
884    ) -> anyhow::Result<Vec<AccountState>> {
885        sqlx::query_as::<_, AccountEventModel>(
886            r#"SELECT * FROM "account_event" WHERE account_id = $1 ORDER BY created_at ASC"#,
887        )
888        .bind(account_id.to_string())
889        .fetch_all(pool)
890        .await
891        .map(|rows| rows.into_iter().map(|row| row.0).collect())
892        .map_err(|e| anyhow::anyhow!("Failed to load account events: {e}"))
893    }
894
895    /// Loads and assembles a complete `AccountAny` for `account_id` via the provided `pool`.
896    ///
897    /// # Errors
898    ///
899    /// Returns an error if assembling events or SQL operations fail.
900    ///
901    /// # Panics
902    ///
903    /// Panics if assembling the account from events fails.
904    pub async fn load_account(
905        pool: &PgPool,
906        account_id: &AccountId,
907    ) -> anyhow::Result<Option<AccountAny>> {
908        let account_events = Self::load_account_events(pool, account_id).await;
909        match account_events {
910            Ok(account_events) => {
911                if account_events.is_empty() {
912                    return Ok(None);
913                }
914                let account = AccountAny::from_events(&account_events).unwrap();
915                Ok(Some(account))
916            }
917            Err(e) => anyhow::bail!("Failed to load account events: {e}"),
918        }
919    }
920
921    /// Loads and assembles all `AccountAny` entries via the provided `pool`.
922    ///
923    /// # Errors
924    ///
925    /// Returns an error if loading events or SQL operations fail.
926    ///
927    /// # Panics
928    ///
929    /// Panics if loading or assembling any individual account fails.
930    pub async fn load_accounts(pool: &PgPool) -> anyhow::Result<Vec<AccountAny>> {
931        let mut accounts: Vec<AccountAny> = Vec::new();
932        let account_ids: Vec<AccountId> = sqlx::query(
933            r#"
934            SELECT DISTINCT account_id FROM "account_event"
935        "#,
936        )
937        .fetch_all(pool)
938        .await
939        .map(|rows| {
940            rows.into_iter()
941                .map(|row| AccountId::from(row.get::<&str, _>(0)))
942                .collect()
943        })
944        .map_err(|e| anyhow::anyhow!("Failed to load account ids: {e}"))?;
945        for id in account_ids {
946            let account = Self::load_account(pool, &id).await.unwrap();
947            if let Some(account) = account {
948                accounts.push(account);
949            }
950        }
951        Ok(accounts)
952    }
953
954    /// Inserts a `TradeTick` entry via the provided `pool`.
955    ///
956    /// # Errors
957    ///
958    /// Returns an error if the SQL INSERT operation fails.
959    pub async fn add_trade(pool: &PgPool, trade: &TradeTick) -> anyhow::Result<()> {
960        sqlx::query(r#"
961            INSERT INTO "trade" (
962                instrument_id, price, quantity, aggressor_side, venue_trade_id,
963                ts_event, ts_init, created_at, updated_at
964            ) VALUES (
965                $1, $2, $3, $4::aggressor_side, $5, $6, $7, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP
966            )
967            ON CONFLICT (id)
968            DO UPDATE
969            SET
970                instrument_id = $1, price = $2, quantity = $3, aggressor_side = $4, venue_trade_id = $5,
971                ts_event = $6, ts_init = $7, updated_at = CURRENT_TIMESTAMP
972        "#)
973            .bind(trade.instrument_id.to_string())
974            .bind(trade.price.to_string())
975            .bind(trade.size.to_string())
976            .bind(AggressorSideModel(trade.aggressor_side))
977            .bind(trade.trade_id.to_string())
978            .bind(trade.ts_event.to_string())
979            .bind(trade.ts_init.to_string())
980            .execute(pool)
981            .await
982            .map(|_| ())
983            .map_err(|e| anyhow::anyhow!("Failed to insert into trade table: {e}"))
984    }
985
986    /// Loads all `TradeTick` entries for `instrument_id` via the provided `pool`.
987    ///
988    /// # Errors
989    ///
990    /// Returns an error if the SQL SELECT or deserialization fails.
991    pub async fn load_trades(
992        pool: &PgPool,
993        instrument_id: &InstrumentId,
994    ) -> anyhow::Result<Vec<TradeTick>> {
995        sqlx::query_as::<_, TradeTickModel>(
996            r#"SELECT * FROM "trade" WHERE instrument_id = $1 ORDER BY ts_event ASC"#,
997        )
998        .bind(instrument_id.to_string())
999        .fetch_all(pool)
1000        .await
1001        .map(|rows| rows.into_iter().map(|row| row.0).collect())
1002        .map_err(|e| anyhow::anyhow!("Failed to load trades: {e}"))
1003    }
1004
1005    /// Inserts a `QuoteTick` entry via the provided `pool`.
1006    ///
1007    /// # Errors
1008    ///
1009    /// Returns an error if the SQL INSERT operation fails.
1010    pub async fn add_quote(pool: &PgPool, quote: &QuoteTick) -> anyhow::Result<()> {
1011        sqlx::query(r#"
1012            INSERT INTO "quote" (
1013                instrument_id, bid_price, ask_price, bid_size, ask_size, ts_event, ts_init, created_at, updated_at
1014            ) VALUES (
1015                $1, $2, $3, $4, $5, $6, $7, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP
1016            )
1017            ON CONFLICT (id)
1018            DO UPDATE
1019            SET
1020                instrument_id = $1, bid_price = $2, ask_price = $3, bid_size = $4, ask_size = $5,
1021                ts_event = $6, ts_init = $7, updated_at = CURRENT_TIMESTAMP
1022        "#)
1023            .bind(quote.instrument_id.to_string())
1024            .bind(quote.bid_price.to_string())
1025            .bind(quote.ask_price.to_string())
1026            .bind(quote.bid_size.to_string())
1027            .bind(quote.ask_size.to_string())
1028            .bind(quote.ts_event.to_string())
1029            .bind(quote.ts_init.to_string())
1030            .execute(pool)
1031            .await
1032            .map(|_| ())
1033            .map_err(|e| anyhow::anyhow!("Failed to insert into quote table: {e}"))
1034    }
1035
1036    /// Loads all `QuoteTick` entries for `instrument_id` via the provided `pool`.
1037    ///
1038    /// # Errors
1039    ///
1040    /// Returns an error if the SQL SELECT or deserialization fails.
1041    pub async fn load_quotes(
1042        pool: &PgPool,
1043        instrument_id: &InstrumentId,
1044    ) -> anyhow::Result<Vec<QuoteTick>> {
1045        sqlx::query_as::<_, QuoteTickModel>(
1046            r#"SELECT * FROM "quote" WHERE instrument_id = $1 ORDER BY ts_event ASC"#,
1047        )
1048        .bind(instrument_id.to_string())
1049        .fetch_all(pool)
1050        .await
1051        .map(|rows| rows.into_iter().map(|row| row.0).collect())
1052        .map_err(|e| anyhow::anyhow!("Failed to load quotes: {e}"))
1053    }
1054
1055    /// Inserts a `Bar` entry via the provided `pool`.
1056    ///
1057    /// # Errors
1058    ///
1059    /// Returns an error if the SQL INSERT operation fails.
1060    pub async fn add_bar(pool: &PgPool, bar: &Bar) -> anyhow::Result<()> {
1061        println!("Adding bar: {bar:?}");
1062        sqlx::query(r#"
1063            INSERT INTO "bar" (
1064                instrument_id, step, bar_aggregation, price_type, aggregation_source, open, high, low, close, volume, ts_event, ts_init, created_at, updated_at
1065            ) VALUES (
1066                $1, $2, $3::bar_aggregation, $4::price_type, $5::aggregation_source, $6, $7, $8, $9, $10, $11, $12, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP
1067            )
1068            ON CONFLICT (id)
1069            DO UPDATE
1070            SET
1071                instrument_id = $1, step = $2, bar_aggregation = $3::bar_aggregation, price_type = $4::price_type, aggregation_source = $5::aggregation_source,
1072                open = $6, high = $7, low = $8, close = $9, volume = $10, ts_event = $11, ts_init = $12, updated_at = CURRENT_TIMESTAMP
1073        "#)
1074            .bind(bar.bar_type.instrument_id().to_string())
1075            .bind(bar.bar_type.spec().step.get() as i32)
1076            .bind(BarAggregationModel(bar.bar_type.spec().aggregation))
1077            .bind(PriceTypeModel(bar.bar_type.spec().price_type))
1078            .bind(AggregationSourceModel(bar.bar_type.aggregation_source()))
1079            .bind(bar.open.to_string())
1080            .bind(bar.high.to_string())
1081            .bind(bar.low.to_string())
1082            .bind(bar.close.to_string())
1083            .bind(bar.volume.to_string())
1084            .bind(bar.ts_event.to_string())
1085            .bind(bar.ts_init.to_string())
1086            .execute(pool)
1087            .await
1088            .map(|_| ())
1089            .map_err(|e| anyhow::anyhow!("Failed to insert into bar table: {e}"))
1090    }
1091
1092    /// Loads all `Bar` entries for `instrument_id` via the provided `pool`.
1093    ///
1094    /// # Errors
1095    ///
1096    /// Returns an error if the SQL SELECT or deserialization fails.
1097    pub async fn load_bars(
1098        pool: &PgPool,
1099        instrument_id: &InstrumentId,
1100    ) -> anyhow::Result<Vec<Bar>> {
1101        sqlx::query_as::<_, BarModel>(
1102            r#"SELECT * FROM "bar" WHERE instrument_id = $1 ORDER BY ts_event ASC"#,
1103        )
1104        .bind(instrument_id.to_string())
1105        .fetch_all(pool)
1106        .await
1107        .map(|rows| rows.into_iter().map(|row| row.0).collect())
1108        .map_err(|e| anyhow::anyhow!("Failed to load bars: {e}"))
1109    }
1110
1111    /// Loads all distinct client order IDs from order events via the provided `pool`.
1112    ///
1113    /// # Errors
1114    ///
1115    /// Returns an error if the SQL SELECT or iteration fails.
1116    pub async fn load_distinct_order_event_client_ids(
1117        pool: &PgPool,
1118    ) -> anyhow::Result<AHashMap<ClientOrderId, ClientId>> {
1119        let mut map: AHashMap<ClientOrderId, ClientId> = AHashMap::new();
1120        let result = sqlx::query_as::<_, OrderEventOrderClientIdCombination>(
1121            r#"
1122            SELECT DISTINCT
1123                client_order_id AS "client_order_id",
1124                client_id AS "client_id"
1125            FROM "order_event"
1126        "#,
1127        )
1128        .fetch_all(pool)
1129        .await
1130        .map_err(|e| anyhow::anyhow!("Failed to load account ids: {e}"))?;
1131
1132        for id in result {
1133            map.insert(id.client_order_id, id.client_id);
1134        }
1135        Ok(map)
1136    }
1137
1138    /// Inserts a `Signal` entry via the provided `pool`.
1139    ///
1140    /// # Errors
1141    ///
1142    /// Returns an error if the SQL INSERT operation fails.
1143    pub async fn add_signal(pool: &PgPool, signal: &Signal) -> anyhow::Result<()> {
1144        sqlx::query(
1145            r#"
1146            INSERT INTO "signal" (
1147                name, value, ts_event, ts_init, created_at, updated_at
1148            ) VALUES (
1149                $1, $2, $3, $4, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP
1150            )
1151            ON CONFLICT (id)
1152            DO UPDATE
1153            SET
1154                name = $1, value = $2, ts_event = $3, ts_init = $4,
1155                updated_at = CURRENT_TIMESTAMP
1156        "#,
1157        )
1158        .bind(signal.name.to_string())
1159        .bind(signal.value.clone())
1160        .bind(signal.ts_event.to_string())
1161        .bind(signal.ts_init.to_string())
1162        .execute(pool)
1163        .await
1164        .map(|_| ())
1165        .map_err(|e| anyhow::anyhow!("Failed to insert into signal table: {e}"))
1166    }
1167
1168    /// Loads all `Signal` entries by `name` via the provided `pool`.
1169    ///
1170    /// # Errors
1171    ///
1172    /// Returns an error if the SQL SELECT or deserialization fails.
1173    pub async fn load_signals(pool: &PgPool, name: &str) -> anyhow::Result<Vec<Signal>> {
1174        sqlx::query_as::<_, SignalModel>(
1175            r#"SELECT * FROM "signal" WHERE name = $1 ORDER BY ts_init ASC"#,
1176        )
1177        .bind(name)
1178        .fetch_all(pool)
1179        .await
1180        .map(|rows| rows.into_iter().map(|row| row.0).collect())
1181        .map_err(|e| anyhow::anyhow!("Failed to load signals: {e}"))
1182    }
1183
1184    /// Inserts a `CustomData` entry via the provided `pool`.
1185    ///
1186    /// Serializes the model `CustomData` to full JSON and stores it in the JSONB `value` column.
1187    ///
1188    /// # Errors
1189    ///
1190    /// Returns an error if the SQL INSERT operation fails.
1191    pub async fn add_custom_data(pool: &PgPool, data: &CustomData) -> anyhow::Result<()> {
1192        let json_bytes = serde_json::to_vec(data)
1193            .map_err(|e| anyhow::anyhow!("CustomData must be valid JSON: {e}"))?;
1194        let value_json: serde_json::Value = serde_json::from_slice(&json_bytes)
1195            .map_err(|e| anyhow::anyhow!("CustomData value must be valid JSON: {e}"))?;
1196        let data_type_obj = value_json
1197            .get("data_type")
1198            .and_then(|v| v.as_object())
1199            .ok_or_else(|| anyhow::anyhow!("CustomData JSON missing data_type"))?;
1200        let data_type_name = data_type_obj
1201            .get("type_name")
1202            .and_then(|v| v.as_str())
1203            .unwrap_or("");
1204        let metadata_json = data_type_obj
1205            .get("metadata")
1206            .cloned()
1207            .unwrap_or_else(|| serde_json::Value::Object(serde_json::Map::new()));
1208        let identifier = data_type_obj
1209            .get("identifier")
1210            .and_then(|v| v.as_str())
1211            .unwrap_or("");
1212        sqlx::query(
1213            r#"
1214            INSERT INTO "custom" (data_type, metadata, identifier, value, ts_event, ts_init, created_at, updated_at)
1215            VALUES ($1, $2, $3, $4, $5, $6, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
1216            ON CONFLICT (id)
1217            DO UPDATE SET
1218                data_type = EXCLUDED.data_type,
1219                metadata = EXCLUDED.metadata,
1220                identifier = EXCLUDED.identifier,
1221                value = EXCLUDED.value,
1222                ts_event = EXCLUDED.ts_event,
1223                ts_init = EXCLUDED.ts_init,
1224                updated_at = CURRENT_TIMESTAMP
1225        "#,
1226        )
1227        .bind(data_type_name)
1228        .bind(&metadata_json)
1229        .bind(identifier)
1230        .bind(&value_json)
1231        .bind(
1232            value_json
1233                .get("ts_event")
1234                .and_then(|v| v.as_u64())
1235                .unwrap_or_else(|| data.ts_init().as_u64())
1236                .to_string(),
1237        )
1238        .bind(data.ts_init().to_string())
1239        .execute(pool)
1240        .await
1241        .map(|_| ())
1242        .map_err(|e| anyhow::anyhow!("Failed to insert into custom table: {e}"))
1243    }
1244
1245    /// Loads all `CustomData` entries of `data_type` via the provided `pool`.
1246    ///
1247    /// Filters by `data_type`, `metadata`, and `identifier` columns to match the requested data type.
1248    ///
1249    /// # Errors
1250    ///
1251    /// Returns an error if the SQL SELECT or deserialization fails.
1252    pub async fn load_custom_data(
1253        pool: &PgPool,
1254        data_type: &DataType,
1255    ) -> anyhow::Result<Vec<CustomData>> {
1256        let metadata_json = data_type.metadata().as_ref().map_or(
1257            Ok(serde_json::Value::Object(serde_json::Map::new())),
1258            serde_json::to_value,
1259        )?;
1260
1261        let type_name = data_type.type_name();
1262        let short_type = type_name.rsplit([':', '.']).next().unwrap_or(type_name);
1263
1264        let rows = match data_type.identifier() {
1265            Some(identifier) => {
1266                sqlx::query(
1267                    r#"SELECT value, ts_event, ts_init FROM "custom"
1268                   WHERE (data_type = $1 OR data_type = $2)
1269                     AND metadata = $3
1270                     AND identifier = $4
1271                   ORDER BY ts_init ASC"#,
1272                )
1273                .bind(type_name)
1274                .bind(short_type)
1275                .bind(&metadata_json)
1276                .bind(identifier)
1277                .fetch_all(pool)
1278                .await
1279            }
1280            None => {
1281                sqlx::query(
1282                    r#"SELECT value, ts_event, ts_init FROM "custom"
1283                   WHERE (data_type = $1 OR data_type = $2)
1284                     AND metadata = $3
1285                     AND identifier = ''
1286                   ORDER BY ts_init ASC"#,
1287                )
1288                .bind(type_name)
1289                .bind(short_type)
1290                .bind(&metadata_json)
1291                .fetch_all(pool)
1292                .await
1293            }
1294        }
1295        .map_err(|e| anyhow::anyhow!("Failed to load custom data: {e}"))?;
1296
1297        let mut results = Vec::with_capacity(rows.len());
1298        for row in rows {
1299            let value_json: serde_json::Value = row.try_get("value")?;
1300            let json_bytes = serde_json::to_vec(&value_json)
1301                .map_err(|e| anyhow::anyhow!("Failed to serialize JSON: {e}"))?;
1302            let custom =
1303                CustomData::from_json_bytes(&json_bytes).map_err(|e| anyhow::anyhow!("{e}"))?;
1304            results.push(custom);
1305        }
1306        Ok(results)
1307    }
1308}