1use nautilus_common::{
33 factories::OrderEventFactory,
34 messages::{ExecutionEvent, ExecutionReport},
35};
36use nautilus_core::{UUID4, UnixNanos, time::AtomicTime};
37use nautilus_model::{
38 enums::{AccountType, LiquiditySide},
39 events::{
40 AccountState, OrderAcceptedBatch, OrderCancelRejected, OrderCanceledBatch, OrderEventAny,
41 OrderModifyRejected, OrderRejected, OrderSubmittedBatch,
42 },
43 identifiers::{
44 AccountId, ClientOrderId, InstrumentId, PositionId, StrategyId, TradeId, TraderId,
45 VenueOrderId,
46 },
47 orders::OrderAny,
48 reports::{FillReport, OrderStatusReport, PositionStatusReport},
49 types::{AccountBalance, Currency, MarginBalance, Money, Price, Quantity},
50};
51
52#[derive(Debug, Clone)]
60pub struct ExecutionEventEmitter {
61 clock: &'static AtomicTime,
62 factory: OrderEventFactory,
63 sender: Option<tokio::sync::mpsc::UnboundedSender<ExecutionEvent>>,
64}
65
66impl ExecutionEventEmitter {
67 #[must_use]
71 pub fn new(
72 clock: &'static AtomicTime,
73 trader_id: TraderId,
74 account_id: AccountId,
75 account_type: AccountType,
76 base_currency: Option<Currency>,
77 ) -> Self {
78 Self {
79 clock,
80 factory: OrderEventFactory::new(trader_id, account_id, account_type, base_currency),
81 sender: None,
82 }
83 }
84
85 fn ts_init(&self) -> UnixNanos {
86 self.clock.get_time_ns()
87 }
88
89 pub fn set_sender(&mut self, sender: tokio::sync::mpsc::UnboundedSender<ExecutionEvent>) {
91 self.sender = Some(sender);
92 }
93
94 #[must_use]
96 pub fn is_initialized(&self) -> bool {
97 self.sender.is_some()
98 }
99
100 #[must_use]
102 pub fn trader_id(&self) -> TraderId {
103 self.factory.trader_id()
104 }
105
106 #[must_use]
108 pub fn account_id(&self) -> AccountId {
109 self.factory.account_id()
110 }
111
112 pub fn emit_account_state(
114 &self,
115 balances: Vec<AccountBalance>,
116 margins: Vec<MarginBalance>,
117 reported: bool,
118 ts_event: UnixNanos,
119 ) {
120 let state = self.factory.generate_account_state(
121 balances,
122 margins,
123 reported,
124 ts_event,
125 self.ts_init(),
126 );
127 self.send_account_state(state);
128 }
129
130 pub fn emit_order_denied(&self, order: &OrderAny, reason: &str) {
132 let event = self
133 .factory
134 .generate_order_denied(order, reason, self.ts_init());
135 self.send_order_event(event);
136 }
137
138 pub fn emit_order_submitted(&self, order: &OrderAny) {
140 let event = self.factory.generate_order_submitted(order, self.ts_init());
141 self.send_order_event(event);
142 }
143
144 pub fn emit_order_rejected(
146 &self,
147 order: &OrderAny,
148 reason: &str,
149 ts_event: UnixNanos,
150 due_post_only: bool,
151 ) {
152 let event = self.factory.generate_order_rejected(
153 order,
154 reason,
155 ts_event,
156 self.ts_init(),
157 due_post_only,
158 );
159 self.send_order_event(event);
160 }
161
162 pub fn emit_order_accepted(
164 &self,
165 order: &OrderAny,
166 venue_order_id: VenueOrderId,
167 ts_event: UnixNanos,
168 ) {
169 let event =
170 self.factory
171 .generate_order_accepted(order, venue_order_id, ts_event, self.ts_init());
172 self.send_order_event(event);
173 }
174
175 pub fn emit_order_modify_rejected(
177 &self,
178 order: &OrderAny,
179 venue_order_id: Option<VenueOrderId>,
180 reason: &str,
181 ts_event: UnixNanos,
182 ) {
183 let event = self.factory.generate_order_modify_rejected(
184 order,
185 venue_order_id,
186 reason,
187 ts_event,
188 self.ts_init(),
189 );
190 self.send_order_event(event);
191 }
192
193 pub fn emit_order_cancel_rejected(
195 &self,
196 order: &OrderAny,
197 venue_order_id: Option<VenueOrderId>,
198 reason: &str,
199 ts_event: UnixNanos,
200 ) {
201 let event = self.factory.generate_order_cancel_rejected(
202 order,
203 venue_order_id,
204 reason,
205 ts_event,
206 self.ts_init(),
207 );
208 self.send_order_event(event);
209 }
210
211 #[expect(clippy::too_many_arguments)]
213 pub fn emit_order_updated(
214 &self,
215 order: &OrderAny,
216 venue_order_id: VenueOrderId,
217 quantity: Quantity,
218 price: Option<Price>,
219 trigger_price: Option<Price>,
220 protection_price: Option<Price>,
221 ts_event: UnixNanos,
222 ) {
223 let event = self.factory.generate_order_updated(
224 order,
225 venue_order_id,
226 quantity,
227 price,
228 trigger_price,
229 protection_price,
230 ts_event,
231 self.ts_init(),
232 );
233 self.send_order_event(event);
234 }
235
236 pub fn emit_order_canceled(
238 &self,
239 order: &OrderAny,
240 venue_order_id: Option<VenueOrderId>,
241 ts_event: UnixNanos,
242 ) {
243 let event =
244 self.factory
245 .generate_order_canceled(order, venue_order_id, ts_event, self.ts_init());
246 self.send_order_event(event);
247 }
248
249 pub fn emit_order_triggered(
251 &self,
252 order: &OrderAny,
253 venue_order_id: Option<VenueOrderId>,
254 ts_event: UnixNanos,
255 ) {
256 let event =
257 self.factory
258 .generate_order_triggered(order, venue_order_id, ts_event, self.ts_init());
259 self.send_order_event(event);
260 }
261
262 pub fn emit_order_expired(
264 &self,
265 order: &OrderAny,
266 venue_order_id: Option<VenueOrderId>,
267 ts_event: UnixNanos,
268 ) {
269 let event =
270 self.factory
271 .generate_order_expired(order, venue_order_id, ts_event, self.ts_init());
272 self.send_order_event(event);
273 }
274
275 #[expect(clippy::too_many_arguments)]
277 pub fn emit_order_filled(
278 &self,
279 order: &OrderAny,
280 venue_order_id: VenueOrderId,
281 venue_position_id: Option<PositionId>,
282 trade_id: TradeId,
283 last_qty: Quantity,
284 last_px: Price,
285 quote_currency: Currency,
286 commission: Option<Money>,
287 liquidity_side: LiquiditySide,
288 ts_event: UnixNanos,
289 ) {
290 let event = self.factory.generate_order_filled(
291 order,
292 venue_order_id,
293 venue_position_id,
294 trade_id,
295 last_qty,
296 last_px,
297 quote_currency,
298 commission,
299 liquidity_side,
300 ts_event,
301 self.ts_init(),
302 );
303 self.send_order_event(event);
304 }
305
306 pub fn emit_order_rejected_event(
308 &self,
309 strategy_id: StrategyId,
310 instrument_id: InstrumentId,
311 client_order_id: ClientOrderId,
312 reason: &str,
313 ts_event: UnixNanos,
314 due_post_only: bool,
315 ) {
316 let event = OrderRejected::new(
317 self.factory.trader_id(),
318 strategy_id,
319 instrument_id,
320 client_order_id,
321 self.factory.account_id(),
322 reason.into(),
323 UUID4::new(),
324 ts_event,
325 self.ts_init(),
326 false,
327 due_post_only,
328 );
329 self.send_order_event(OrderEventAny::Rejected(event));
330 }
331
332 pub fn emit_order_modify_rejected_event(
334 &self,
335 strategy_id: StrategyId,
336 instrument_id: InstrumentId,
337 client_order_id: ClientOrderId,
338 venue_order_id: Option<VenueOrderId>,
339 reason: &str,
340 ts_event: UnixNanos,
341 ) {
342 let event = OrderModifyRejected::new(
343 self.factory.trader_id(),
344 strategy_id,
345 instrument_id,
346 client_order_id,
347 reason.into(),
348 UUID4::new(),
349 ts_event,
350 self.ts_init(),
351 false,
352 venue_order_id,
353 Some(self.factory.account_id()),
354 );
355 self.send_order_event(OrderEventAny::ModifyRejected(event));
356 }
357
358 pub fn emit_order_cancel_rejected_event(
360 &self,
361 strategy_id: StrategyId,
362 instrument_id: InstrumentId,
363 client_order_id: ClientOrderId,
364 venue_order_id: Option<VenueOrderId>,
365 reason: &str,
366 ts_event: UnixNanos,
367 ) {
368 let event = OrderCancelRejected::new(
369 self.factory.trader_id(),
370 strategy_id,
371 instrument_id,
372 client_order_id,
373 reason.into(),
374 UUID4::new(),
375 ts_event,
376 self.ts_init(),
377 false,
378 venue_order_id,
379 Some(self.factory.account_id()),
380 );
381 self.send_order_event(OrderEventAny::CancelRejected(event));
382 }
383
384 pub fn send_order_event(&self, event: OrderEventAny) {
386 if let Some(sender) = &self.sender {
387 if let Err(e) = sender.send(ExecutionEvent::Order(event)) {
388 log::warn!("Failed to send order event: {e}");
389 }
390 } else {
391 log::warn!("Cannot send order event: sender not initialized");
392 }
393 }
394
395 pub fn send_order_submitted_batch(&self, batch: OrderSubmittedBatch) {
397 if let Some(sender) = &self.sender {
398 if let Err(e) = sender.send(ExecutionEvent::OrderSubmittedBatch(batch)) {
399 log::warn!("Failed to send order submitted batch: {e}");
400 }
401 } else {
402 log::warn!("Cannot send order submitted batch: sender not initialized");
403 }
404 }
405
406 pub fn send_order_accepted_batch(&self, batch: OrderAcceptedBatch) {
408 if let Some(sender) = &self.sender {
409 if let Err(e) = sender.send(ExecutionEvent::OrderAcceptedBatch(batch)) {
410 log::warn!("Failed to send order accepted batch: {e}");
411 }
412 } else {
413 log::warn!("Cannot send order accepted batch: sender not initialized");
414 }
415 }
416
417 pub fn send_order_canceled_batch(&self, batch: OrderCanceledBatch) {
419 if let Some(sender) = &self.sender {
420 if let Err(e) = sender.send(ExecutionEvent::OrderCanceledBatch(batch)) {
421 log::warn!("Failed to send order canceled batch: {e}");
422 }
423 } else {
424 log::warn!("Cannot send order canceled batch: sender not initialized");
425 }
426 }
427
428 pub fn send_account_state(&self, state: AccountState) {
430 if let Some(sender) = &self.sender {
431 if let Err(e) = sender.send(ExecutionEvent::Account(state)) {
432 log::warn!("Failed to send account state: {e}");
433 }
434 } else {
435 log::warn!("Cannot send account state: sender not initialized");
436 }
437 }
438
439 pub fn send_execution_report(&self, report: ExecutionReport) {
441 if let Some(sender) = &self.sender {
442 if let Err(e) = sender.send(ExecutionEvent::Report(report)) {
443 log::warn!("Failed to send execution report: {e}");
444 }
445 } else {
446 log::warn!("Cannot send execution report: sender not initialized");
447 }
448 }
449
450 pub fn send_order_status_report(&self, report: OrderStatusReport) {
452 self.send_execution_report(ExecutionReport::Order(Box::new(report)));
453 }
454
455 pub fn send_fill_report(&self, report: FillReport) {
457 self.send_execution_report(ExecutionReport::Fill(Box::new(report)));
458 }
459
460 pub fn send_order_with_fills(&self, report: OrderStatusReport, fills: Vec<FillReport>) {
462 self.send_execution_report(ExecutionReport::OrderWithFills(Box::new(report), fills));
463 }
464
465 pub fn send_position_report(&self, report: PositionStatusReport) {
467 self.send_execution_report(ExecutionReport::Position(Box::new(report)));
468 }
469}