1use ahash::AHashMap;
17use nautilus_common::signal::Signal;
18use nautilus_model::{
19 accounts::{Account, AccountAny},
20 data::{Bar, CustomData, DataType, HasTsInit, QuoteTick, TradeTick},
21 events::{
22 AccountState, OrderEvent, OrderEventAny, OrderSnapshot,
23 position::snapshot::PositionSnapshot,
24 },
25 identifiers::{AccountId, ClientId, ClientOrderId, InstrumentId, PositionId},
26 instruments::{Instrument, InstrumentAny},
27 orders::{Order, OrderAny},
28 types::{AccountBalance, Currency, MarginBalance},
29};
30use sqlx::{PgPool, Row};
31
32use super::models::{
33 orders::OrderSnapshotModel, positions::PositionSnapshotModel, types::SignalModel,
34};
35use crate::sql::models::{
36 accounts::AccountEventModel,
37 data::{BarModel, QuoteTickModel, TradeTickModel},
38 enums::{
39 AggregationSourceModel, AggressorSideModel, AssetClassModel, BarAggregationModel,
40 CurrencyTypeModel, PriceTypeModel, TrailingOffsetTypeModel,
41 },
42 general::{GeneralRow, OrderEventOrderClientIdCombination},
43 instruments::InstrumentAnyModel,
44 orders::OrderEventAnyModel,
45 types::CurrencyModel,
46};
47
/// Stateless namespace type for the async PostgreSQL query helpers defined in its `impl` block.
///
/// The associated functions visible in this file each take the `PgPool` to run against as
/// their first argument; the struct itself carries no data.
#[derive(Debug)]
pub struct DatabaseQueries;
50
51impl DatabaseQueries {
52 pub async fn truncate(pool: &PgPool) -> anyhow::Result<()> {
58 sqlx::query("SELECT truncate_all_tables()")
59 .execute(pool)
60 .await
61 .map(|_| ())
62 .map_err(|e| anyhow::anyhow!("Failed to truncate tables: {e}"))
63 }
64
65 pub async fn add(pool: &PgPool, key: String, value: Vec<u8>) -> anyhow::Result<()> {
71 sqlx::query("INSERT INTO general (id, value) VALUES ($1, $2)")
72 .bind(key)
73 .bind(value)
74 .execute(pool)
75 .await
76 .map(|_| ())
77 .map_err(|e| anyhow::anyhow!("Failed to insert into general table: {e}"))
78 }
79
80 pub async fn load(pool: &PgPool) -> anyhow::Result<AHashMap<String, Vec<u8>>> {
86 sqlx::query_as::<_, GeneralRow>("SELECT * FROM general")
87 .fetch_all(pool)
88 .await
89 .map(|rows| {
90 let mut cache: AHashMap<String, Vec<u8>> = AHashMap::new();
91 for row in rows {
92 cache.insert(row.id, row.value);
93 }
94 cache
95 })
96 .map_err(|e| anyhow::anyhow!("Failed to load general table: {e}"))
97 }
98
99 pub async fn add_currency(pool: &PgPool, currency: Currency) -> anyhow::Result<()> {
105 sqlx::query(
106 "INSERT INTO currency (id, precision, iso4217, name, currency_type) VALUES ($1, $2, $3, $4, $5::currency_type) ON CONFLICT (id) DO NOTHING"
107 )
108 .bind(currency.code.as_str())
109 .bind(i32::from(currency.precision))
110 .bind(i32::from(currency.iso4217))
111 .bind(currency.name.as_str())
112 .bind(CurrencyTypeModel(currency.currency_type))
113 .execute(pool)
114 .await
115 .map(|_| ())
116 .map_err(|e| anyhow::anyhow!("Failed to insert into currency table: {e}"))
117 }
118
119 pub async fn load_currencies(pool: &PgPool) -> anyhow::Result<Vec<Currency>> {
125 sqlx::query_as::<_, CurrencyModel>("SELECT * FROM currency ORDER BY id ASC")
126 .fetch_all(pool)
127 .await
128 .map(|rows| rows.into_iter().map(|row| row.0).collect())
129 .map_err(|e| anyhow::anyhow!("Failed to load currencies: {e}"))
130 }
131
132 pub async fn load_currency(pool: &PgPool, code: &str) -> anyhow::Result<Option<Currency>> {
138 sqlx::query_as::<_, CurrencyModel>("SELECT * FROM currency WHERE id = $1")
139 .bind(code)
140 .fetch_optional(pool)
141 .await
142 .map(|currency| currency.map(|row| row.0))
143 .map_err(|e| anyhow::anyhow!("Failed to load currency: {e}"))
144 }
145
146 pub async fn add_instrument(
152 pool: &PgPool,
153 kind: &str,
154 instrument: Box<dyn Instrument>,
155 ) -> anyhow::Result<()> {
156 sqlx::query(r#"
157 INSERT INTO "instrument" (
158 id, kind, raw_symbol, base_currency, underlying, quote_currency, settlement_currency, isin, asset_class, exchange,
159 multiplier, option_kind, is_inverse, strike_price, activation_ns, expiration_ns, price_precision, size_precision,
160 price_increment, size_increment, maker_fee, taker_fee, margin_init, margin_maint, lot_size, max_quantity, min_quantity, max_notional,
161 min_notional, max_price, min_price, ts_init, ts_event, created_at, updated_at
162 ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9::asset_class, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24, $25, $26, $27, $28, $29, $30, $31, $32, $33, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
163 ON CONFLICT (id)
164 DO UPDATE
165 SET
166 kind = $2, raw_symbol = $3, base_currency= $4, underlying = $5, quote_currency = $6, settlement_currency = $7, isin = $8, asset_class = $9, exchange = $10,
167 multiplier = $11, option_kind = $12, is_inverse = $13, strike_price = $14, activation_ns = $15, expiration_ns = $16 , price_precision = $17, size_precision = $18,
168 price_increment = $19, size_increment = $20, maker_fee = $21, taker_fee = $22, margin_init = $23, margin_maint = $24, lot_size = $25, max_quantity = $26,
169 min_quantity = $27, max_notional = $28, min_notional = $29, max_price = $30, min_price = $31, ts_init = $32, ts_event = $33, updated_at = CURRENT_TIMESTAMP
170 "#)
171 .bind(instrument.id().to_string())
172 .bind(kind)
173 .bind(instrument.raw_symbol().to_string())
174 .bind(instrument.base_currency().map(|x| x.code.as_str()))
175 .bind(instrument.underlying().map(|x| x.to_string()))
176 .bind(instrument.quote_currency().code.as_str())
177 .bind(instrument.settlement_currency().code.as_str())
178 .bind(instrument.isin().map(|x| x.to_string()))
179 .bind(AssetClassModel(instrument.asset_class()))
180 .bind(instrument.exchange().map(|x| x.to_string()))
181 .bind(instrument.multiplier().to_string())
182 .bind(instrument.option_kind().map(|x| x.to_string()))
183 .bind(instrument.is_inverse())
184 .bind(instrument.strike_price().map(|x| x.to_string()))
185 .bind(instrument.activation_ns().map(|x| x.to_string()))
186 .bind(instrument.expiration_ns().map(|x| x.to_string()))
187 .bind(i32::from(instrument.price_precision()))
188 .bind(i32::from(instrument.size_precision()))
189 .bind(instrument.price_increment().to_string())
190 .bind(instrument.size_increment().to_string())
191 .bind(instrument.maker_fee().to_string())
192 .bind(instrument.taker_fee().to_string())
193 .bind(instrument.margin_init().to_string())
194 .bind(instrument.margin_maint().to_string())
195 .bind(instrument.lot_size().map(|x| x.to_string()))
196 .bind(instrument.max_quantity().map(|x| x.to_string()))
197 .bind(instrument.min_quantity().map(|x| x.to_string()))
198 .bind(instrument.max_notional().map(|x| x.to_string()))
199 .bind(instrument.min_notional().map(|x| x.to_string()))
200 .bind(instrument.max_price().map(|x| x.to_string()))
201 .bind(instrument.min_price().map(|x| x.to_string()))
202 .bind(instrument.ts_init().to_string())
203 .bind(instrument.ts_event().to_string())
204 .execute(pool)
205 .await
206 .map(|_| ())
207 .map_err(|e| anyhow::anyhow!("Failed to insert item {} into instrument table: {:?}", instrument.id(), e))
208 }
209
210 pub async fn load_instrument(
216 pool: &PgPool,
217 instrument_id: &InstrumentId,
218 ) -> anyhow::Result<Option<InstrumentAny>> {
219 sqlx::query_as::<_, InstrumentAnyModel>("SELECT * FROM instrument WHERE id = $1")
220 .bind(instrument_id.to_string())
221 .fetch_optional(pool)
222 .await
223 .map(|instrument| instrument.map(|row| row.0))
224 .map_err(|e| {
225 anyhow::anyhow!("Failed to load instrument with id {instrument_id},error is: {e}")
226 })
227 }
228
229 pub async fn load_instruments(pool: &PgPool) -> anyhow::Result<Vec<InstrumentAny>> {
235 sqlx::query_as::<_, InstrumentAnyModel>("SELECT * FROM instrument")
236 .fetch_all(pool)
237 .await
238 .map(|rows| rows.into_iter().map(|row| row.0).collect())
239 .map_err(|e| anyhow::anyhow!("Failed to load instruments: {e}"))
240 }
241
242 pub async fn add_order(
252 pool: &PgPool,
253 _kind: &str,
254 updated: bool,
255 order: Box<dyn Order>,
256 client_id: Option<ClientId>,
257 ) -> anyhow::Result<()> {
258 if updated {
259 let exists = Self::check_if_order_initialized_exists(pool, order.client_order_id())
260 .await
261 .unwrap();
262 assert!(
263 exists,
264 "OrderInitialized event does not exist for order: {}",
265 order.client_order_id()
266 );
267 }
268
269 match order.last_event().clone() {
270 OrderEventAny::Accepted(event) => {
271 Self::add_order_event(pool, Box::new(event), client_id).await
272 }
273 OrderEventAny::CancelRejected(event) => {
274 Self::add_order_event(pool, Box::new(event), client_id).await
275 }
276 OrderEventAny::Canceled(event) => {
277 Self::add_order_event(pool, Box::new(event), client_id).await
278 }
279 OrderEventAny::Denied(event) => {
280 Self::add_order_event(pool, Box::new(event), client_id).await
281 }
282 OrderEventAny::Emulated(event) => {
283 Self::add_order_event(pool, Box::new(event), client_id).await
284 }
285 OrderEventAny::Expired(event) => {
286 Self::add_order_event(pool, Box::new(event), client_id).await
287 }
288 OrderEventAny::Filled(event) => {
289 Self::add_order_event(pool, Box::new(event), client_id).await
290 }
291 OrderEventAny::Initialized(event) => {
292 Self::add_order_event(pool, Box::new(event), client_id).await
293 }
294 OrderEventAny::ModifyRejected(event) => {
295 Self::add_order_event(pool, Box::new(event), client_id).await
296 }
297 OrderEventAny::PendingCancel(event) => {
298 Self::add_order_event(pool, Box::new(event), client_id).await
299 }
300 OrderEventAny::PendingUpdate(event) => {
301 Self::add_order_event(pool, Box::new(event), client_id).await
302 }
303 OrderEventAny::Rejected(event) => {
304 Self::add_order_event(pool, Box::new(event), client_id).await
305 }
306 OrderEventAny::Released(event) => {
307 Self::add_order_event(pool, Box::new(event), client_id).await
308 }
309 OrderEventAny::Submitted(event) => {
310 Self::add_order_event(pool, Box::new(event), client_id).await
311 }
312 OrderEventAny::Updated(event) => {
313 Self::add_order_event(pool, Box::new(event), client_id).await
314 }
315 OrderEventAny::Triggered(event) => {
316 Self::add_order_event(pool, Box::new(event), client_id).await
317 }
318 }
319 }
320
321 pub async fn add_order_snapshot(pool: &PgPool, snapshot: OrderSnapshot) -> anyhow::Result<()> {
331 let mut transaction = pool.begin().await?;
332
333 sqlx::query(
336 r#"
337 INSERT INTO "trader" (id) VALUES ($1) ON CONFLICT (id) DO NOTHING
338 "#,
339 )
340 .bind(snapshot.trader_id.to_string())
341 .execute(&mut *transaction)
342 .await
343 .map(|_| ())
344 .map_err(|e| anyhow::anyhow!("Failed to insert into trader table: {e}"))?;
345
346 sqlx::query(
347 r#"
348 INSERT INTO "order" (
349 id, trader_id, strategy_id, instrument_id, client_order_id, venue_order_id, position_id,
350 account_id, last_trade_id, order_type, order_side, quantity, price, trigger_price,
351 trigger_type, limit_offset, trailing_offset, trailing_offset_type, time_in_force,
352 expire_time, filled_qty, liquidity_side, avg_px, slippage, commissions, status,
353 is_post_only, is_reduce_only, is_quote_quantity, display_qty, emulation_trigger,
354 trigger_instrument_id, contingency_type, order_list_id, linked_order_ids,
355 parent_order_id, exec_algorithm_id, exec_algorithm_params, exec_spawn_id, tags, init_id, ts_init, ts_last,
356 created_at, updated_at
357 ) VALUES (
358 $1, $2, $3, $4, $1, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16,
359 $17::TRAILING_OFFSET_TYPE, $18, $19, $20, $21, $22, $23, $24, $25, $26, $27, $28,
360 $29, $30, $31, $32, $33, $34, $35, $36, $37, $38, $39, $40, $41, $42,
361 CURRENT_TIMESTAMP, CURRENT_TIMESTAMP
362 )
363 ON CONFLICT (id)
364 DO UPDATE SET
365 trader_id = $2,
366 strategy_id = $3,
367 instrument_id = $4,
368 venue_order_id = $5,
369 position_id = $6,
370 account_id = $7,
371 last_trade_id = $8,
372 order_type = $9,
373 order_side = $10,
374 quantity = $11,
375 price = $12,
376 trigger_price = $13,
377 trigger_type = $14,
378 limit_offset = $15,
379 trailing_offset = $16,
380 trailing_offset_type = $17::TRAILING_OFFSET_TYPE,
381 time_in_force = $18,
382 expire_time = $19,
383 filled_qty = $20,
384 liquidity_side = $21,
385 avg_px = $22,
386 slippage = $23,
387 commissions = $24,
388 status = $25,
389 is_post_only = $26,
390 is_reduce_only = $27,
391 is_quote_quantity = $28,
392 display_qty = $29,
393 emulation_trigger = $30,
394 trigger_instrument_id = $31,
395 contingency_type = $32,
396 order_list_id = $33,
397 linked_order_ids = $34,
398 parent_order_id = $35,
399 exec_algorithm_id = $36,
400 exec_algorithm_params = $37,
401 exec_spawn_id = $38,
402 tags = $39,
403 init_id = $40,
404 ts_init = $41,
405 ts_last = $42,
406 updated_at = CURRENT_TIMESTAMP
407 "#)
408 .bind(snapshot.client_order_id.to_string()) .bind(snapshot.trader_id.to_string())
410 .bind(snapshot.strategy_id.to_string())
411 .bind(snapshot.instrument_id.to_string())
412 .bind(snapshot.venue_order_id.map(|x| x.to_string()))
413 .bind(snapshot.position_id.map(|x| x.to_string()))
414 .bind(snapshot.account_id.map(|x| x.to_string()))
415 .bind(snapshot.last_trade_id.map(|x| x.to_string()))
416 .bind(snapshot.order_type.to_string())
417 .bind(snapshot.order_side.to_string())
418 .bind(snapshot.quantity.to_string())
419 .bind(snapshot.price.map(|x| x.to_string()))
420 .bind(snapshot.trigger_price.map(|x| x.to_string()))
421 .bind(snapshot.trigger_type.map(|x| x.to_string()))
422 .bind(snapshot.limit_offset.map(|x| x.to_string()))
423 .bind(snapshot.trailing_offset.map(|x| x.to_string()))
424 .bind(snapshot.trailing_offset_type.map(|x| x.to_string()))
425 .bind(snapshot.time_in_force.to_string())
426 .bind(snapshot.expire_time.map(|x| x.to_string()))
427 .bind(snapshot.filled_qty.to_string())
428 .bind(snapshot.liquidity_side.map(|x| x.to_string()))
429 .bind(snapshot.avg_px)
430 .bind(snapshot.slippage)
431 .bind(snapshot.commissions.iter().map(ToString::to_string).collect::<Vec<String>>())
432 .bind(snapshot.status.to_string())
433 .bind(snapshot.is_post_only)
434 .bind(snapshot.is_reduce_only)
435 .bind(snapshot.is_quote_quantity)
436 .bind(snapshot.display_qty.map(|x| x.to_string()))
437 .bind(snapshot.emulation_trigger.map(|x| x.to_string()))
438 .bind(snapshot.trigger_instrument_id.map(|x| x.to_string()))
439 .bind(snapshot.contingency_type.map(|x| x.to_string()))
440 .bind(snapshot.order_list_id.map(|x| x.to_string()))
441 .bind(snapshot.linked_order_ids.map(|x| x.iter().map(ToString::to_string).collect::<Vec<String>>()))
442 .bind(snapshot.parent_order_id.map(|x| x.to_string()))
443 .bind(snapshot.exec_algorithm_id.map(|x| x.to_string()))
444 .bind(snapshot.exec_algorithm_params.map(|x| serde_json::to_value(x).unwrap()))
445 .bind(snapshot.exec_spawn_id.map(|x| x.to_string()))
446 .bind(snapshot.tags.map(|x| x.iter().map(ToString::to_string).collect::<Vec<String>>()))
447 .bind(snapshot.init_id.to_string())
448 .bind(snapshot.ts_init.to_string())
449 .bind(snapshot.ts_last.to_string())
450 .execute(&mut *transaction)
451 .await
452 .map(|_| ())
453 .map_err(|e| anyhow::anyhow!("Failed to insert into order table: {e}"))?;
454
455 transaction
456 .commit()
457 .await
458 .map_err(|e| anyhow::anyhow!("Failed to commit transaction: {e}"))
459 }
460
461 pub async fn load_order_snapshot(
467 pool: &PgPool,
468 client_order_id: &ClientOrderId,
469 ) -> anyhow::Result<Option<OrderSnapshot>> {
470 sqlx::query_as::<_, OrderSnapshotModel>(
471 r#"SELECT * FROM "order" WHERE client_order_id = $1"#,
472 )
473 .bind(client_order_id.to_string())
474 .fetch_optional(pool)
475 .await
476 .map(|model| model.map(|m| m.0))
477 .map_err(|e| anyhow::anyhow!("Failed to load order snapshot: {e}"))
478 }
479
480 pub async fn add_position_snapshot(
486 pool: &PgPool,
487 snapshot: PositionSnapshot,
488 ) -> anyhow::Result<()> {
489 let mut transaction = pool.begin().await?;
490
491 sqlx::query(
494 r#"
495 INSERT INTO "trader" (id) VALUES ($1) ON CONFLICT (id) DO NOTHING
496 "#,
497 )
498 .bind(snapshot.trader_id.to_string())
499 .execute(&mut *transaction)
500 .await
501 .map(|_| ())
502 .map_err(|e| anyhow::anyhow!("Failed to insert into trader table: {e}"))?;
503
504 sqlx::query(r#"
505 INSERT INTO "position" (
506 id, trader_id, strategy_id, instrument_id, account_id, opening_order_id, closing_order_id, entry, side, signed_qty, quantity, peak_qty,
507 quote_currency, base_currency, settlement_currency, avg_px_open, avg_px_close, realized_return, realized_pnl, unrealized_pnl, commissions,
508 duration_ns, ts_opened, ts_closed, ts_init, ts_last, created_at, updated_at
509 ) VALUES (
510 $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20,
511 $21, $22, $23, $24, $25, $26, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP
512 )
513 ON CONFLICT (id)
514 DO UPDATE
515 SET
516 trader_id = $2, strategy_id = $3, instrument_id = $4, account_id = $5, opening_order_id = $6, closing_order_id = $7, entry = $8, side = $9, signed_qty = $10, quantity = $11,
517 peak_qty = $12, quote_currency = $13, base_currency = $14, settlement_currency = $15, avg_px_open = $16, avg_px_close = $17, realized_return = $18, realized_pnl = $19, unrealized_pnl = $20,
518 commissions = $21, duration_ns = $22, ts_opened = $23, ts_closed = $24, ts_init = $25, ts_last = $26, updated_at = CURRENT_TIMESTAMP
519 "#)
520 .bind(snapshot.position_id.to_string())
521 .bind(snapshot.trader_id.to_string())
522 .bind(snapshot.strategy_id.to_string())
523 .bind(snapshot.instrument_id.to_string())
524 .bind(snapshot.account_id.to_string())
525 .bind(snapshot.opening_order_id.to_string())
526 .bind(snapshot.closing_order_id.map(|x| x.to_string()))
527 .bind(snapshot.entry.to_string())
528 .bind(snapshot.side.to_string())
529 .bind(snapshot.signed_qty)
530 .bind(snapshot.quantity.to_string())
531 .bind(snapshot.peak_qty.to_string())
532 .bind(snapshot.quote_currency.to_string())
533 .bind(snapshot.base_currency.map(|x| x.to_string()))
534 .bind(snapshot.settlement_currency.to_string())
535 .bind(snapshot.avg_px_open)
536 .bind(snapshot.avg_px_close)
537 .bind(snapshot.realized_return)
538 .bind(snapshot.realized_pnl.map(|x| x.to_string()))
539 .bind(snapshot.unrealized_pnl.map(|x| x.to_string()))
540 .bind(snapshot.commissions.iter().map(ToString::to_string).collect::<Vec<String>>())
541 .bind(snapshot.duration_ns.map(|x| x.to_string()))
542 .bind(snapshot.ts_opened.to_string())
543 .bind(snapshot.ts_closed.map(|x| x.to_string()))
544 .bind(snapshot.ts_init.to_string())
545 .bind(snapshot.ts_last.to_string())
546 .execute(&mut *transaction)
547 .await
548 .map(|_| ())
549 .map_err(|e| anyhow::anyhow!("Failed to insert into position table: {e}"))?;
550 transaction
551 .commit()
552 .await
553 .map_err(|e| anyhow::anyhow!("Failed to commit transaction: {e}"))
554 }
555
556 pub async fn load_position_snapshot(
562 pool: &PgPool,
563 position_id: &PositionId,
564 ) -> anyhow::Result<Option<PositionSnapshot>> {
565 sqlx::query_as::<_, PositionSnapshotModel>(r#"SELECT * FROM "position" WHERE id = $1"#)
566 .bind(position_id.to_string())
567 .fetch_optional(pool)
568 .await
569 .map(|model| model.map(|m| m.0))
570 .map_err(|e| anyhow::anyhow!("Failed to load position snapshot: {e}"))
571 }
572
573 pub async fn check_if_order_initialized_exists(
579 pool: &PgPool,
580 client_order_id: ClientOrderId,
581 ) -> anyhow::Result<bool> {
582 sqlx::query(r#"
583 SELECT EXISTS(SELECT 1 FROM "order_event" WHERE client_order_id = $1 AND kind = 'OrderInitialized')
584 "#)
585 .bind(client_order_id.to_string())
586 .fetch_one(pool)
587 .await
588 .map(|row| row.get(0))
589 .map_err(|e| anyhow::anyhow!("Failed to check if order initialized exists: {e}"))
590 }
591
592 pub async fn check_if_account_event_exists(
598 pool: &PgPool,
599 account_id: AccountId,
600 ) -> anyhow::Result<bool> {
601 sqlx::query(
602 r#"
603 SELECT EXISTS(SELECT 1 FROM "account_event" WHERE account_id = $1)
604 "#,
605 )
606 .bind(account_id.to_string())
607 .fetch_one(pool)
608 .await
609 .map(|row| row.get(0))
610 .map_err(|e| anyhow::anyhow!("Failed to check if account event exists: {e}"))
611 }
612
613 pub async fn add_order_event(
619 pool: &PgPool,
620 order_event: Box<dyn OrderEvent>,
621 client_id: Option<ClientId>,
622 ) -> anyhow::Result<()> {
623 let mut transaction = pool.begin().await?;
624
625 sqlx::query(
628 r#"
629 INSERT INTO "trader" (id) VALUES ($1) ON CONFLICT (id) DO NOTHING
630 "#,
631 )
632 .bind(order_event.trader_id().to_string())
633 .execute(&mut *transaction)
634 .await
635 .map(|_| ())
636 .map_err(|e| anyhow::anyhow!("Failed to insert into trader table: {e}"))?;
637
638 if let Some(client_id) = client_id {
641 sqlx::query(
642 r#"
643 INSERT INTO "client" (id) VALUES ($1) ON CONFLICT (id) DO NOTHING
644 "#,
645 )
646 .bind(client_id.to_string())
647 .execute(&mut *transaction)
648 .await
649 .map(|_| ())
650 .map_err(|e| anyhow::anyhow!("Failed to insert into client table: {e}"))?;
651 }
652
653 sqlx::query(r#"
654 INSERT INTO "order_event" (
655 id, kind, client_order_id, order_type, order_side, trader_id, client_id, reason, strategy_id, instrument_id, trade_id, currency, quantity, time_in_force, liquidity_side,
656 post_only, reduce_only, quote_quantity, reconciliation, price, last_px, last_qty, trigger_price, trigger_type, limit_offset, trailing_offset,
657 trailing_offset_type, expire_time, display_qty, emulation_trigger, trigger_instrument_id, contingency_type,
658 order_list_id, linked_order_ids, parent_order_id,
659 exec_algorithm_id, exec_spawn_id, venue_order_id, account_id, position_id, commission, ts_event, ts_init, created_at, updated_at
660 ) VALUES (
661 $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20,
662 $21, $22, $23, $24, $25, $26::trailing_offset_type, $27, $28, $29, $30, $31, $32, $33, $34,
663 $35, $36, $37, $38, $39, $40, $41, $42, $43, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP
664 )
665 ON CONFLICT (id)
666 DO UPDATE
667 SET
668 kind = $2, client_order_id = $3, order_type = $4, order_side=$5, trader_id = $6, client_id = $7, reason = $8, strategy_id = $9, instrument_id = $10, trade_id = $11, currency = $12,
669 quantity = $13, time_in_force = $14, liquidity_side = $15, post_only = $16, reduce_only = $17, quote_quantity = $18, reconciliation = $19, price = $20, last_px = $21,
670 last_qty = $22, trigger_price = $23, trigger_type = $24, limit_offset = $25, trailing_offset = $26, trailing_offset_type = $27, expire_time = $28, display_qty = $29,
671 emulation_trigger = $30, trigger_instrument_id = $31, contingency_type = $32, order_list_id = $33, linked_order_ids = $34, parent_order_id = $35, exec_algorithm_id = $36,
672 exec_spawn_id = $37, venue_order_id = $38, account_id = $39, position_id = $40, commission = $41, ts_event = $42, ts_init = $43, updated_at = CURRENT_TIMESTAMP
673
674 "#)
675 .bind(order_event.id().to_string())
676 .bind(order_event.type_name())
677 .bind(order_event.client_order_id().to_string())
678 .bind(order_event.order_type().map(|x| x.to_string()))
679 .bind(order_event.order_side().map(|x| x.to_string()))
680 .bind(order_event.trader_id().to_string())
681 .bind(client_id.map(|x| x.to_string()))
682 .bind(order_event.reason().map(|x| x.to_string()))
683 .bind(order_event.strategy_id().to_string())
684 .bind(order_event.instrument_id().to_string())
685 .bind(order_event.trade_id().map(|x| x.to_string()))
686 .bind(order_event.currency().map(|x| x.code.as_str()))
687 .bind(order_event.quantity().map(|x| x.to_string()))
688 .bind(order_event.time_in_force().map(|x| x.to_string()))
689 .bind(order_event.liquidity_side().map(|x| x.to_string()))
690 .bind(order_event.post_only())
691 .bind(order_event.reduce_only())
692 .bind(order_event.quote_quantity())
693 .bind(order_event.reconciliation())
694 .bind(order_event.price().map(|x| x.to_string()))
695 .bind(order_event.last_px().map(|x| x.to_string()))
696 .bind(order_event.last_qty().map(|x| x.to_string()))
697 .bind(order_event.trigger_price().map(|x| x.to_string()))
698 .bind(order_event.trigger_type().map(|x| x.to_string()))
699 .bind(order_event.limit_offset().map(|x| x.to_string()))
700 .bind(order_event.trailing_offset().map(|x| x.to_string()))
701 .bind(order_event.trailing_offset_type().map(TrailingOffsetTypeModel))
702 .bind(order_event.expire_time().map(|x| x.to_string()))
703 .bind(order_event.display_qty().map(|x| x.to_string()))
704 .bind(order_event.emulation_trigger().map(|x| x.to_string()))
705 .bind(order_event.trigger_instrument_id().map(|x| x.to_string()))
706 .bind(order_event.contingency_type().map(|x| x.to_string()))
707 .bind(order_event.order_list_id().map(|x| x.to_string()))
708 .bind(order_event.linked_order_ids().map(|x| x.iter().map(ToString::to_string).collect::<Vec<String>>()))
709 .bind(order_event.parent_order_id().map(|x| x.to_string()))
710 .bind(order_event.exec_algorithm_id().map(|x| x.to_string()))
711 .bind(order_event.exec_spawn_id().map(|x| x.to_string()))
712 .bind(order_event.venue_order_id().map(|x| x.to_string()))
713 .bind(order_event.account_id().map(|x| x.to_string()))
714 .bind(order_event.position_id().map(|x| x.to_string()))
715 .bind(order_event.commission().map(|x| x.to_string()))
716 .bind(order_event.ts_event().to_string())
717 .bind(order_event.ts_init().to_string())
718 .execute(&mut *transaction)
719 .await
720 .map(|_| ())
721 .map_err(|e| anyhow::anyhow!("Failed to insert into order_event table: {e}"))?;
722 transaction
723 .commit()
724 .await
725 .map_err(|e| anyhow::anyhow!("Failed to commit transaction: {e}"))
726 }
727
728 pub async fn load_order_events(
734 pool: &PgPool,
735 client_order_id: &ClientOrderId,
736 ) -> anyhow::Result<Vec<OrderEventAny>> {
737 sqlx::query_as::<_, OrderEventAnyModel>(r#"SELECT * FROM "order_event" event WHERE event.client_order_id = $1 ORDER BY created_at ASC"#)
738 .bind(client_order_id.to_string())
739 .fetch_all(pool)
740 .await
741 .map(|rows| rows.into_iter().map(|row| row.0).collect())
742 .map_err(|e| anyhow::anyhow!("Failed to load order events: {e}"))
743 }
744
745 pub async fn load_order(
755 pool: &PgPool,
756 client_order_id: &ClientOrderId,
757 ) -> anyhow::Result<Option<OrderAny>> {
758 let order_events = Self::load_order_events(pool, client_order_id).await;
759
760 match order_events {
761 Ok(order_events) => {
762 if order_events.is_empty() {
763 return Ok(None);
764 }
765 let order = OrderAny::from_events(order_events).unwrap();
766 Ok(Some(order))
767 }
768 Err(e) => anyhow::bail!("Failed to load order events: {e}"),
769 }
770 }
771
772 pub async fn load_orders(pool: &PgPool) -> anyhow::Result<Vec<OrderAny>> {
782 let mut orders: Vec<OrderAny> = Vec::new();
783 let client_order_ids: Vec<ClientOrderId> = sqlx::query(
784 r#"
785 SELECT DISTINCT client_order_id FROM "order_event"
786 "#,
787 )
788 .fetch_all(pool)
789 .await
790 .map(|rows| {
791 rows.into_iter()
792 .map(|row| ClientOrderId::from(row.get::<&str, _>(0)))
793 .collect()
794 })
795 .map_err(|e| anyhow::anyhow!("Failed to load order ids: {e}"))?;
796 for id in client_order_ids {
797 let order = Self::load_order(pool, &id).await.unwrap();
798 if let Some(order) = order {
799 orders.push(order);
800 }
801 }
802 Ok(orders)
803 }
804
805 pub async fn add_account(
815 pool: &PgPool,
816 kind: &str,
817 updated: bool,
818 account: Box<dyn Account>,
819 ) -> anyhow::Result<()> {
820 if updated {
821 let exists = Self::check_if_account_event_exists(pool, account.id())
822 .await
823 .unwrap();
824 assert!(
825 exists,
826 "Account event does not exist for account: {}",
827 account.id()
828 );
829 }
830
831 let mut transaction = pool.begin().await?;
832
833 sqlx::query(
834 r#"
835 INSERT INTO "account" (id) VALUES ($1) ON CONFLICT (id) DO NOTHING
836 "#,
837 )
838 .bind(account.id().to_string())
839 .execute(&mut *transaction)
840 .await
841 .map(|_| ())
842 .map_err(|e| anyhow::anyhow!("Failed to insert into account table: {e}"))?;
843
844 let account_event = account.last_event().unwrap();
845 sqlx::query(r#"
846 INSERT INTO "account_event" (
847 id, kind, account_id, base_currency, balances, margins, is_reported, ts_event, ts_init, created_at, updated_at
848 ) VALUES (
849 $1, $2, $3, $4, $5, $6, $7, $8, $9, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP
850 )
851 ON CONFLICT (id)
852 DO UPDATE
853 SET
854 kind = $2, account_id = $3, base_currency = $4, balances = $5, margins = $6, is_reported = $7,
855 ts_event = $8, ts_init = $9, updated_at = CURRENT_TIMESTAMP
856 "#)
857 .bind(account_event.event_id.to_string())
858 .bind(kind.to_string())
859 .bind(account_event.account_id.to_string())
860 .bind(account_event.base_currency.map(|x| x.code.as_str()))
861 .bind(serde_json::to_value::<Vec<AccountBalance>>(account_event.balances).unwrap())
862 .bind(serde_json::to_value::<Vec<MarginBalance>>(account_event.margins).unwrap())
863 .bind(account_event.is_reported)
864 .bind(account_event.ts_event.to_string())
865 .bind(account_event.ts_init.to_string())
866 .execute(&mut *transaction)
867 .await
868 .map(|_| ())
869 .map_err(|e| anyhow::anyhow!("Failed to insert into account_event table: {e}"))?;
870 transaction
871 .commit()
872 .await
873 .map_err(|e| anyhow::anyhow!("Failed to commit add_account transaction: {e}"))
874 }
875
876 pub async fn load_account_events(
882 pool: &PgPool,
883 account_id: &AccountId,
884 ) -> anyhow::Result<Vec<AccountState>> {
885 sqlx::query_as::<_, AccountEventModel>(
886 r#"SELECT * FROM "account_event" WHERE account_id = $1 ORDER BY created_at ASC"#,
887 )
888 .bind(account_id.to_string())
889 .fetch_all(pool)
890 .await
891 .map(|rows| rows.into_iter().map(|row| row.0).collect())
892 .map_err(|e| anyhow::anyhow!("Failed to load account events: {e}"))
893 }
894
895 pub async fn load_account(
905 pool: &PgPool,
906 account_id: &AccountId,
907 ) -> anyhow::Result<Option<AccountAny>> {
908 let account_events = Self::load_account_events(pool, account_id).await;
909 match account_events {
910 Ok(account_events) => {
911 if account_events.is_empty() {
912 return Ok(None);
913 }
914 let account = AccountAny::from_events(&account_events).unwrap();
915 Ok(Some(account))
916 }
917 Err(e) => anyhow::bail!("Failed to load account events: {e}"),
918 }
919 }
920
921 pub async fn load_accounts(pool: &PgPool) -> anyhow::Result<Vec<AccountAny>> {
931 let mut accounts: Vec<AccountAny> = Vec::new();
932 let account_ids: Vec<AccountId> = sqlx::query(
933 r#"
934 SELECT DISTINCT account_id FROM "account_event"
935 "#,
936 )
937 .fetch_all(pool)
938 .await
939 .map(|rows| {
940 rows.into_iter()
941 .map(|row| AccountId::from(row.get::<&str, _>(0)))
942 .collect()
943 })
944 .map_err(|e| anyhow::anyhow!("Failed to load account ids: {e}"))?;
945 for id in account_ids {
946 let account = Self::load_account(pool, &id).await.unwrap();
947 if let Some(account) = account {
948 accounts.push(account);
949 }
950 }
951 Ok(accounts)
952 }
953
954 pub async fn add_trade(pool: &PgPool, trade: &TradeTick) -> anyhow::Result<()> {
960 sqlx::query(r#"
961 INSERT INTO "trade" (
962 instrument_id, price, quantity, aggressor_side, venue_trade_id,
963 ts_event, ts_init, created_at, updated_at
964 ) VALUES (
965 $1, $2, $3, $4::aggressor_side, $5, $6, $7, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP
966 )
967 ON CONFLICT (id)
968 DO UPDATE
969 SET
970 instrument_id = $1, price = $2, quantity = $3, aggressor_side = $4, venue_trade_id = $5,
971 ts_event = $6, ts_init = $7, updated_at = CURRENT_TIMESTAMP
972 "#)
973 .bind(trade.instrument_id.to_string())
974 .bind(trade.price.to_string())
975 .bind(trade.size.to_string())
976 .bind(AggressorSideModel(trade.aggressor_side))
977 .bind(trade.trade_id.to_string())
978 .bind(trade.ts_event.to_string())
979 .bind(trade.ts_init.to_string())
980 .execute(pool)
981 .await
982 .map(|_| ())
983 .map_err(|e| anyhow::anyhow!("Failed to insert into trade table: {e}"))
984 }
985
986 pub async fn load_trades(
992 pool: &PgPool,
993 instrument_id: &InstrumentId,
994 ) -> anyhow::Result<Vec<TradeTick>> {
995 sqlx::query_as::<_, TradeTickModel>(
996 r#"SELECT * FROM "trade" WHERE instrument_id = $1 ORDER BY ts_event ASC"#,
997 )
998 .bind(instrument_id.to_string())
999 .fetch_all(pool)
1000 .await
1001 .map(|rows| rows.into_iter().map(|row| row.0).collect())
1002 .map_err(|e| anyhow::anyhow!("Failed to load trades: {e}"))
1003 }
1004
1005 pub async fn add_quote(pool: &PgPool, quote: &QuoteTick) -> anyhow::Result<()> {
1011 sqlx::query(r#"
1012 INSERT INTO "quote" (
1013 instrument_id, bid_price, ask_price, bid_size, ask_size, ts_event, ts_init, created_at, updated_at
1014 ) VALUES (
1015 $1, $2, $3, $4, $5, $6, $7, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP
1016 )
1017 ON CONFLICT (id)
1018 DO UPDATE
1019 SET
1020 instrument_id = $1, bid_price = $2, ask_price = $3, bid_size = $4, ask_size = $5,
1021 ts_event = $6, ts_init = $7, updated_at = CURRENT_TIMESTAMP
1022 "#)
1023 .bind(quote.instrument_id.to_string())
1024 .bind(quote.bid_price.to_string())
1025 .bind(quote.ask_price.to_string())
1026 .bind(quote.bid_size.to_string())
1027 .bind(quote.ask_size.to_string())
1028 .bind(quote.ts_event.to_string())
1029 .bind(quote.ts_init.to_string())
1030 .execute(pool)
1031 .await
1032 .map(|_| ())
1033 .map_err(|e| anyhow::anyhow!("Failed to insert into quote table: {e}"))
1034 }
1035
1036 pub async fn load_quotes(
1042 pool: &PgPool,
1043 instrument_id: &InstrumentId,
1044 ) -> anyhow::Result<Vec<QuoteTick>> {
1045 sqlx::query_as::<_, QuoteTickModel>(
1046 r#"SELECT * FROM "quote" WHERE instrument_id = $1 ORDER BY ts_event ASC"#,
1047 )
1048 .bind(instrument_id.to_string())
1049 .fetch_all(pool)
1050 .await
1051 .map(|rows| rows.into_iter().map(|row| row.0).collect())
1052 .map_err(|e| anyhow::anyhow!("Failed to load quotes: {e}"))
1053 }
1054
1055 pub async fn add_bar(pool: &PgPool, bar: &Bar) -> anyhow::Result<()> {
1061 println!("Adding bar: {bar:?}");
1062 sqlx::query(r#"
1063 INSERT INTO "bar" (
1064 instrument_id, step, bar_aggregation, price_type, aggregation_source, open, high, low, close, volume, ts_event, ts_init, created_at, updated_at
1065 ) VALUES (
1066 $1, $2, $3::bar_aggregation, $4::price_type, $5::aggregation_source, $6, $7, $8, $9, $10, $11, $12, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP
1067 )
1068 ON CONFLICT (id)
1069 DO UPDATE
1070 SET
1071 instrument_id = $1, step = $2, bar_aggregation = $3::bar_aggregation, price_type = $4::price_type, aggregation_source = $5::aggregation_source,
1072 open = $6, high = $7, low = $8, close = $9, volume = $10, ts_event = $11, ts_init = $12, updated_at = CURRENT_TIMESTAMP
1073 "#)
1074 .bind(bar.bar_type.instrument_id().to_string())
1075 .bind(bar.bar_type.spec().step.get() as i32)
1076 .bind(BarAggregationModel(bar.bar_type.spec().aggregation))
1077 .bind(PriceTypeModel(bar.bar_type.spec().price_type))
1078 .bind(AggregationSourceModel(bar.bar_type.aggregation_source()))
1079 .bind(bar.open.to_string())
1080 .bind(bar.high.to_string())
1081 .bind(bar.low.to_string())
1082 .bind(bar.close.to_string())
1083 .bind(bar.volume.to_string())
1084 .bind(bar.ts_event.to_string())
1085 .bind(bar.ts_init.to_string())
1086 .execute(pool)
1087 .await
1088 .map(|_| ())
1089 .map_err(|e| anyhow::anyhow!("Failed to insert into bar table: {e}"))
1090 }
1091
1092 pub async fn load_bars(
1098 pool: &PgPool,
1099 instrument_id: &InstrumentId,
1100 ) -> anyhow::Result<Vec<Bar>> {
1101 sqlx::query_as::<_, BarModel>(
1102 r#"SELECT * FROM "bar" WHERE instrument_id = $1 ORDER BY ts_event ASC"#,
1103 )
1104 .bind(instrument_id.to_string())
1105 .fetch_all(pool)
1106 .await
1107 .map(|rows| rows.into_iter().map(|row| row.0).collect())
1108 .map_err(|e| anyhow::anyhow!("Failed to load bars: {e}"))
1109 }
1110
1111 pub async fn load_distinct_order_event_client_ids(
1117 pool: &PgPool,
1118 ) -> anyhow::Result<AHashMap<ClientOrderId, ClientId>> {
1119 let mut map: AHashMap<ClientOrderId, ClientId> = AHashMap::new();
1120 let result = sqlx::query_as::<_, OrderEventOrderClientIdCombination>(
1121 r#"
1122 SELECT DISTINCT
1123 client_order_id AS "client_order_id",
1124 client_id AS "client_id"
1125 FROM "order_event"
1126 "#,
1127 )
1128 .fetch_all(pool)
1129 .await
1130 .map_err(|e| anyhow::anyhow!("Failed to load account ids: {e}"))?;
1131
1132 for id in result {
1133 map.insert(id.client_order_id, id.client_id);
1134 }
1135 Ok(map)
1136 }
1137
1138 pub async fn add_signal(pool: &PgPool, signal: &Signal) -> anyhow::Result<()> {
1144 sqlx::query(
1145 r#"
1146 INSERT INTO "signal" (
1147 name, value, ts_event, ts_init, created_at, updated_at
1148 ) VALUES (
1149 $1, $2, $3, $4, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP
1150 )
1151 ON CONFLICT (id)
1152 DO UPDATE
1153 SET
1154 name = $1, value = $2, ts_event = $3, ts_init = $4,
1155 updated_at = CURRENT_TIMESTAMP
1156 "#,
1157 )
1158 .bind(signal.name.to_string())
1159 .bind(signal.value.clone())
1160 .bind(signal.ts_event.to_string())
1161 .bind(signal.ts_init.to_string())
1162 .execute(pool)
1163 .await
1164 .map(|_| ())
1165 .map_err(|e| anyhow::anyhow!("Failed to insert into signal table: {e}"))
1166 }
1167
1168 pub async fn load_signals(pool: &PgPool, name: &str) -> anyhow::Result<Vec<Signal>> {
1174 sqlx::query_as::<_, SignalModel>(
1175 r#"SELECT * FROM "signal" WHERE name = $1 ORDER BY ts_init ASC"#,
1176 )
1177 .bind(name)
1178 .fetch_all(pool)
1179 .await
1180 .map(|rows| rows.into_iter().map(|row| row.0).collect())
1181 .map_err(|e| anyhow::anyhow!("Failed to load signals: {e}"))
1182 }
1183
1184 pub async fn add_custom_data(pool: &PgPool, data: &CustomData) -> anyhow::Result<()> {
1192 let json_bytes = serde_json::to_vec(data)
1193 .map_err(|e| anyhow::anyhow!("CustomData must be valid JSON: {e}"))?;
1194 let value_json: serde_json::Value = serde_json::from_slice(&json_bytes)
1195 .map_err(|e| anyhow::anyhow!("CustomData value must be valid JSON: {e}"))?;
1196 let data_type_obj = value_json
1197 .get("data_type")
1198 .and_then(|v| v.as_object())
1199 .ok_or_else(|| anyhow::anyhow!("CustomData JSON missing data_type"))?;
1200 let data_type_name = data_type_obj
1201 .get("type_name")
1202 .and_then(|v| v.as_str())
1203 .unwrap_or("");
1204 let metadata_json = data_type_obj
1205 .get("metadata")
1206 .cloned()
1207 .unwrap_or_else(|| serde_json::Value::Object(serde_json::Map::new()));
1208 let identifier = data_type_obj
1209 .get("identifier")
1210 .and_then(|v| v.as_str())
1211 .unwrap_or("");
1212 sqlx::query(
1213 r#"
1214 INSERT INTO "custom" (data_type, metadata, identifier, value, ts_event, ts_init, created_at, updated_at)
1215 VALUES ($1, $2, $3, $4, $5, $6, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
1216 ON CONFLICT (id)
1217 DO UPDATE SET
1218 data_type = EXCLUDED.data_type,
1219 metadata = EXCLUDED.metadata,
1220 identifier = EXCLUDED.identifier,
1221 value = EXCLUDED.value,
1222 ts_event = EXCLUDED.ts_event,
1223 ts_init = EXCLUDED.ts_init,
1224 updated_at = CURRENT_TIMESTAMP
1225 "#,
1226 )
1227 .bind(data_type_name)
1228 .bind(&metadata_json)
1229 .bind(identifier)
1230 .bind(&value_json)
1231 .bind(
1232 value_json
1233 .get("ts_event")
1234 .and_then(|v| v.as_u64())
1235 .unwrap_or_else(|| data.ts_init().as_u64())
1236 .to_string(),
1237 )
1238 .bind(data.ts_init().to_string())
1239 .execute(pool)
1240 .await
1241 .map(|_| ())
1242 .map_err(|e| anyhow::anyhow!("Failed to insert into custom table: {e}"))
1243 }
1244
1245 pub async fn load_custom_data(
1253 pool: &PgPool,
1254 data_type: &DataType,
1255 ) -> anyhow::Result<Vec<CustomData>> {
1256 let metadata_json = data_type.metadata().as_ref().map_or(
1257 Ok(serde_json::Value::Object(serde_json::Map::new())),
1258 serde_json::to_value,
1259 )?;
1260
1261 let type_name = data_type.type_name();
1262 let short_type = type_name.rsplit([':', '.']).next().unwrap_or(type_name);
1263
1264 let rows = match data_type.identifier() {
1265 Some(identifier) => {
1266 sqlx::query(
1267 r#"SELECT value, ts_event, ts_init FROM "custom"
1268 WHERE (data_type = $1 OR data_type = $2)
1269 AND metadata = $3
1270 AND identifier = $4
1271 ORDER BY ts_init ASC"#,
1272 )
1273 .bind(type_name)
1274 .bind(short_type)
1275 .bind(&metadata_json)
1276 .bind(identifier)
1277 .fetch_all(pool)
1278 .await
1279 }
1280 None => {
1281 sqlx::query(
1282 r#"SELECT value, ts_event, ts_init FROM "custom"
1283 WHERE (data_type = $1 OR data_type = $2)
1284 AND metadata = $3
1285 AND identifier = ''
1286 ORDER BY ts_init ASC"#,
1287 )
1288 .bind(type_name)
1289 .bind(short_type)
1290 .bind(&metadata_json)
1291 .fetch_all(pool)
1292 .await
1293 }
1294 }
1295 .map_err(|e| anyhow::anyhow!("Failed to load custom data: {e}"))?;
1296
1297 let mut results = Vec::with_capacity(rows.len());
1298 for row in rows {
1299 let value_json: serde_json::Value = row.try_get("value")?;
1300 let json_bytes = serde_json::to_vec(&value_json)
1301 .map_err(|e| anyhow::anyhow!("Failed to serialize JSON: {e}"))?;
1302 let custom =
1303 CustomData::from_json_bytes(&json_bytes).map_err(|e| anyhow::anyhow!("{e}"))?;
1304 results.push(custom);
1305 }
1306 Ok(results)
1307 }
1308}