Skip to main content

nautilus_live/
emitter.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Live execution event emitter for async event dispatch.
17//!
18//! This module provides [`ExecutionEventEmitter`], which combines event generation (via
19//! [`OrderEventFactory`]) with async dispatch. Adapters use the `emit_*` convenience
20//! methods to generate and send events in a single call.
21//!
22//! # Architecture
23//!
24//! ```text
25//! Adapter
26//! ├── core: ExecutionClientCore    (identity + connection state)
27//! └── emitter: ExecutionEventEmitter   (event generation + async dispatch)
28//!     ├── factory: OrderEventFactory
29//!     └── sender: Option<Sender>   (set in start())
30//! ```
31
32use nautilus_common::{
33    factories::OrderEventFactory,
34    messages::{ExecutionEvent, ExecutionReport},
35};
36use nautilus_core::{UUID4, UnixNanos, time::AtomicTime};
37use nautilus_model::{
38    enums::{AccountType, LiquiditySide},
39    events::{
40        AccountState, OrderAcceptedBatch, OrderCancelRejected, OrderCanceledBatch, OrderEventAny,
41        OrderModifyRejected, OrderRejected, OrderSubmittedBatch,
42    },
43    identifiers::{
44        AccountId, ClientOrderId, InstrumentId, PositionId, StrategyId, TradeId, TraderId,
45        VenueOrderId,
46    },
47    orders::OrderAny,
48    reports::{FillReport, OrderStatusReport, PositionStatusReport},
49    types::{AccountBalance, Currency, MarginBalance, Money, Price, Quantity},
50};
51
52/// Event emitter for live trading - combines event generation with async dispatch.
53///
54/// This struct wraps an [`OrderEventFactory`] for event construction and an unbounded
55/// channel sender for async dispatch. It provides `emit_*` convenience methods that
56/// generate and send events in a single call.
57///
58/// The sender is set during the adapter's `start()` phase via [`set_sender`](Self::set_sender).
59#[derive(Debug, Clone)]
60pub struct ExecutionEventEmitter {
61    clock: &'static AtomicTime,
62    factory: OrderEventFactory,
63    sender: Option<tokio::sync::mpsc::UnboundedSender<ExecutionEvent>>,
64}
65
66impl ExecutionEventEmitter {
67    /// Creates a new [`ExecutionEventEmitter`] with no sender.
68    ///
69    /// Call [`set_sender`](Self::set_sender) in the adapter's `start()` method.
70    #[must_use]
71    pub fn new(
72        clock: &'static AtomicTime,
73        trader_id: TraderId,
74        account_id: AccountId,
75        account_type: AccountType,
76        base_currency: Option<Currency>,
77    ) -> Self {
78        Self {
79            clock,
80            factory: OrderEventFactory::new(trader_id, account_id, account_type, base_currency),
81            sender: None,
82        }
83    }
84
85    fn ts_init(&self) -> UnixNanos {
86        self.clock.get_time_ns()
87    }
88
89    /// Sets the sender. Call in adapter's `start()`.
90    pub fn set_sender(&mut self, sender: tokio::sync::mpsc::UnboundedSender<ExecutionEvent>) {
91        self.sender = Some(sender);
92    }
93
94    /// Returns true if the sender is initialized.
95    #[must_use]
96    pub fn is_initialized(&self) -> bool {
97        self.sender.is_some()
98    }
99
100    /// Returns the trader ID.
101    #[must_use]
102    pub fn trader_id(&self) -> TraderId {
103        self.factory.trader_id()
104    }
105
106    /// Returns the account ID.
107    #[must_use]
108    pub fn account_id(&self) -> AccountId {
109        self.factory.account_id()
110    }
111
112    /// Generates and emits an account state event.
113    pub fn emit_account_state(
114        &self,
115        balances: Vec<AccountBalance>,
116        margins: Vec<MarginBalance>,
117        reported: bool,
118        ts_event: UnixNanos,
119    ) {
120        let state = self.factory.generate_account_state(
121            balances,
122            margins,
123            reported,
124            ts_event,
125            self.ts_init(),
126        );
127        self.send_account_state(state);
128    }
129
130    /// Generates and emits an order denied event.
131    pub fn emit_order_denied(&self, order: &OrderAny, reason: &str) {
132        let event = self
133            .factory
134            .generate_order_denied(order, reason, self.ts_init());
135        self.send_order_event(event);
136    }
137
138    /// Generates and emits an order submitted event.
139    pub fn emit_order_submitted(&self, order: &OrderAny) {
140        let event = self.factory.generate_order_submitted(order, self.ts_init());
141        self.send_order_event(event);
142    }
143
144    /// Generates and emits an order rejected event.
145    pub fn emit_order_rejected(
146        &self,
147        order: &OrderAny,
148        reason: &str,
149        ts_event: UnixNanos,
150        due_post_only: bool,
151    ) {
152        let event = self.factory.generate_order_rejected(
153            order,
154            reason,
155            ts_event,
156            self.ts_init(),
157            due_post_only,
158        );
159        self.send_order_event(event);
160    }
161
162    /// Generates and emits an order accepted event.
163    pub fn emit_order_accepted(
164        &self,
165        order: &OrderAny,
166        venue_order_id: VenueOrderId,
167        ts_event: UnixNanos,
168    ) {
169        let event =
170            self.factory
171                .generate_order_accepted(order, venue_order_id, ts_event, self.ts_init());
172        self.send_order_event(event);
173    }
174
175    /// Generates and emits an order modify rejected event.
176    pub fn emit_order_modify_rejected(
177        &self,
178        order: &OrderAny,
179        venue_order_id: Option<VenueOrderId>,
180        reason: &str,
181        ts_event: UnixNanos,
182    ) {
183        let event = self.factory.generate_order_modify_rejected(
184            order,
185            venue_order_id,
186            reason,
187            ts_event,
188            self.ts_init(),
189        );
190        self.send_order_event(event);
191    }
192
193    /// Generates and emits an order cancel rejected event.
194    pub fn emit_order_cancel_rejected(
195        &self,
196        order: &OrderAny,
197        venue_order_id: Option<VenueOrderId>,
198        reason: &str,
199        ts_event: UnixNanos,
200    ) {
201        let event = self.factory.generate_order_cancel_rejected(
202            order,
203            venue_order_id,
204            reason,
205            ts_event,
206            self.ts_init(),
207        );
208        self.send_order_event(event);
209    }
210
211    /// Generates and emits an order updated event.
212    #[expect(clippy::too_many_arguments)]
213    pub fn emit_order_updated(
214        &self,
215        order: &OrderAny,
216        venue_order_id: VenueOrderId,
217        quantity: Quantity,
218        price: Option<Price>,
219        trigger_price: Option<Price>,
220        protection_price: Option<Price>,
221        ts_event: UnixNanos,
222    ) {
223        let event = self.factory.generate_order_updated(
224            order,
225            venue_order_id,
226            quantity,
227            price,
228            trigger_price,
229            protection_price,
230            ts_event,
231            self.ts_init(),
232        );
233        self.send_order_event(event);
234    }
235
236    /// Generates and emits an order canceled event.
237    pub fn emit_order_canceled(
238        &self,
239        order: &OrderAny,
240        venue_order_id: Option<VenueOrderId>,
241        ts_event: UnixNanos,
242    ) {
243        let event =
244            self.factory
245                .generate_order_canceled(order, venue_order_id, ts_event, self.ts_init());
246        self.send_order_event(event);
247    }
248
249    /// Generates and emits an order triggered event.
250    pub fn emit_order_triggered(
251        &self,
252        order: &OrderAny,
253        venue_order_id: Option<VenueOrderId>,
254        ts_event: UnixNanos,
255    ) {
256        let event =
257            self.factory
258                .generate_order_triggered(order, venue_order_id, ts_event, self.ts_init());
259        self.send_order_event(event);
260    }
261
262    /// Generates and emits an order expired event.
263    pub fn emit_order_expired(
264        &self,
265        order: &OrderAny,
266        venue_order_id: Option<VenueOrderId>,
267        ts_event: UnixNanos,
268    ) {
269        let event =
270            self.factory
271                .generate_order_expired(order, venue_order_id, ts_event, self.ts_init());
272        self.send_order_event(event);
273    }
274
275    /// Generates and emits an order filled event.
276    #[expect(clippy::too_many_arguments)]
277    pub fn emit_order_filled(
278        &self,
279        order: &OrderAny,
280        venue_order_id: VenueOrderId,
281        venue_position_id: Option<PositionId>,
282        trade_id: TradeId,
283        last_qty: Quantity,
284        last_px: Price,
285        quote_currency: Currency,
286        commission: Option<Money>,
287        liquidity_side: LiquiditySide,
288        ts_event: UnixNanos,
289    ) {
290        let event = self.factory.generate_order_filled(
291            order,
292            venue_order_id,
293            venue_position_id,
294            trade_id,
295            last_qty,
296            last_px,
297            quote_currency,
298            commission,
299            liquidity_side,
300            ts_event,
301            self.ts_init(),
302        );
303        self.send_order_event(event);
304    }
305
306    /// Constructs and emits an order rejected event from raw fields.
307    pub fn emit_order_rejected_event(
308        &self,
309        strategy_id: StrategyId,
310        instrument_id: InstrumentId,
311        client_order_id: ClientOrderId,
312        reason: &str,
313        ts_event: UnixNanos,
314        due_post_only: bool,
315    ) {
316        let event = OrderRejected::new(
317            self.factory.trader_id(),
318            strategy_id,
319            instrument_id,
320            client_order_id,
321            self.factory.account_id(),
322            reason.into(),
323            UUID4::new(),
324            ts_event,
325            self.ts_init(),
326            false,
327            due_post_only,
328        );
329        self.send_order_event(OrderEventAny::Rejected(event));
330    }
331
332    /// Constructs and emits an order modify rejected event from raw fields.
333    pub fn emit_order_modify_rejected_event(
334        &self,
335        strategy_id: StrategyId,
336        instrument_id: InstrumentId,
337        client_order_id: ClientOrderId,
338        venue_order_id: Option<VenueOrderId>,
339        reason: &str,
340        ts_event: UnixNanos,
341    ) {
342        let event = OrderModifyRejected::new(
343            self.factory.trader_id(),
344            strategy_id,
345            instrument_id,
346            client_order_id,
347            reason.into(),
348            UUID4::new(),
349            ts_event,
350            self.ts_init(),
351            false,
352            venue_order_id,
353            Some(self.factory.account_id()),
354        );
355        self.send_order_event(OrderEventAny::ModifyRejected(event));
356    }
357
358    /// Constructs and emits an order cancel rejected event from raw fields.
359    pub fn emit_order_cancel_rejected_event(
360        &self,
361        strategy_id: StrategyId,
362        instrument_id: InstrumentId,
363        client_order_id: ClientOrderId,
364        venue_order_id: Option<VenueOrderId>,
365        reason: &str,
366        ts_event: UnixNanos,
367    ) {
368        let event = OrderCancelRejected::new(
369            self.factory.trader_id(),
370            strategy_id,
371            instrument_id,
372            client_order_id,
373            reason.into(),
374            UUID4::new(),
375            ts_event,
376            self.ts_init(),
377            false,
378            venue_order_id,
379            Some(self.factory.account_id()),
380        );
381        self.send_order_event(OrderEventAny::CancelRejected(event));
382    }
383
384    /// Emits an order event.
385    pub fn send_order_event(&self, event: OrderEventAny) {
386        if let Some(sender) = &self.sender {
387            if let Err(e) = sender.send(ExecutionEvent::Order(event)) {
388                log::warn!("Failed to send order event: {e}");
389            }
390        } else {
391            log::warn!("Cannot send order event: sender not initialized");
392        }
393    }
394
395    /// Emits a batch of order submitted events as a single channel message.
396    pub fn send_order_submitted_batch(&self, batch: OrderSubmittedBatch) {
397        if let Some(sender) = &self.sender {
398            if let Err(e) = sender.send(ExecutionEvent::OrderSubmittedBatch(batch)) {
399                log::warn!("Failed to send order submitted batch: {e}");
400            }
401        } else {
402            log::warn!("Cannot send order submitted batch: sender not initialized");
403        }
404    }
405
406    /// Emits a batch of order accepted events as a single channel message.
407    pub fn send_order_accepted_batch(&self, batch: OrderAcceptedBatch) {
408        if let Some(sender) = &self.sender {
409            if let Err(e) = sender.send(ExecutionEvent::OrderAcceptedBatch(batch)) {
410                log::warn!("Failed to send order accepted batch: {e}");
411            }
412        } else {
413            log::warn!("Cannot send order accepted batch: sender not initialized");
414        }
415    }
416
417    /// Emits a batch of order canceled events as a single channel message.
418    pub fn send_order_canceled_batch(&self, batch: OrderCanceledBatch) {
419        if let Some(sender) = &self.sender {
420            if let Err(e) = sender.send(ExecutionEvent::OrderCanceledBatch(batch)) {
421                log::warn!("Failed to send order canceled batch: {e}");
422            }
423        } else {
424            log::warn!("Cannot send order canceled batch: sender not initialized");
425        }
426    }
427
428    /// Emits an account state event.
429    pub fn send_account_state(&self, state: AccountState) {
430        if let Some(sender) = &self.sender {
431            if let Err(e) = sender.send(ExecutionEvent::Account(state)) {
432                log::warn!("Failed to send account state: {e}");
433            }
434        } else {
435            log::warn!("Cannot send account state: sender not initialized");
436        }
437    }
438
439    /// Emits an execution report.
440    pub fn send_execution_report(&self, report: ExecutionReport) {
441        if let Some(sender) = &self.sender {
442            if let Err(e) = sender.send(ExecutionEvent::Report(report)) {
443                log::warn!("Failed to send execution report: {e}");
444            }
445        } else {
446            log::warn!("Cannot send execution report: sender not initialized");
447        }
448    }
449
450    /// Emits an order status report.
451    pub fn send_order_status_report(&self, report: OrderStatusReport) {
452        self.send_execution_report(ExecutionReport::Order(Box::new(report)));
453    }
454
455    /// Emits a fill report.
456    pub fn send_fill_report(&self, report: FillReport) {
457        self.send_execution_report(ExecutionReport::Fill(Box::new(report)));
458    }
459
460    /// Emits an order status report bundled with the fills that produced it.
461    pub fn send_order_with_fills(&self, report: OrderStatusReport, fills: Vec<FillReport>) {
462        self.send_execution_report(ExecutionReport::OrderWithFills(Box::new(report), fills));
463    }
464
465    /// Emits a position status report.
466    pub fn send_position_report(&self, report: PositionStatusReport) {
467        self.send_execution_report(ExecutionReport::Position(Box::new(report)));
468    }
469}