nautilus_indicators/volatility/
kp.rs1use std::fmt::{Debug, Display};
17
18use nautilus_model::data::Bar;
19
20use super::kc::KeltnerChannel;
21use crate::{average::MovingAverageType, indicator::Indicator};
22
23#[repr(C)]
24#[derive(Debug)]
25#[cfg_attr(
26 feature = "python",
27 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.indicators", unsendable)
28)]
29#[cfg_attr(
30 feature = "python",
31 pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.indicators")
32)]
33pub struct KeltnerPosition {
34 pub period: usize,
35 pub k_multiplier: f64,
36 pub ma_type: MovingAverageType,
37 pub ma_type_atr: MovingAverageType,
38 pub use_previous: bool,
39 pub atr_floor: f64,
40 pub value: f64,
41 pub initialized: bool,
42 has_inputs: bool,
43 kc: KeltnerChannel,
44}
45
46impl Display for KeltnerPosition {
47 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
48 write!(
49 f,
50 "{}({},{},{},{},{})",
51 self.name(),
52 self.period,
53 self.k_multiplier,
54 self.ma_type,
55 self.ma_type_atr,
56 self.use_previous
57 )
58 }
59}
60
61impl Indicator for KeltnerPosition {
62 fn name(&self) -> String {
63 stringify!(KeltnerPosition).to_string()
64 }
65
66 fn has_inputs(&self) -> bool {
67 self.has_inputs
68 }
69
70 fn initialized(&self) -> bool {
71 self.initialized
72 }
73
74 fn handle_bar(&mut self, bar: &Bar) {
75 self.update_raw((&bar.high).into(), (&bar.low).into(), (&bar.close).into());
76 }
77
78 fn reset(&mut self) {
79 self.kc.reset();
80 self.value = 0.0;
81 self.has_inputs = false;
82 self.initialized = false;
83 }
84}
85
86impl KeltnerPosition {
87 #[must_use]
89 pub fn new(
90 period: usize,
91 k_multiplier: f64,
92 ma_type: Option<MovingAverageType>,
93 ma_type_atr: Option<MovingAverageType>,
94 use_previous: Option<bool>,
95 atr_floor: Option<f64>,
96 ) -> Self {
97 Self {
98 period,
99 k_multiplier,
100 ma_type: ma_type.unwrap_or(MovingAverageType::Simple),
101 ma_type_atr: ma_type_atr.unwrap_or(MovingAverageType::Simple),
102 use_previous: use_previous.unwrap_or(true),
103 atr_floor: atr_floor.unwrap_or(0.0),
104 value: 0.0,
105 has_inputs: false,
106 initialized: false,
107 kc: KeltnerChannel::new(
108 period,
109 k_multiplier,
110 ma_type,
111 ma_type_atr,
112 use_previous,
113 atr_floor,
114 ),
115 }
116 }
117
118 pub fn update_raw(&mut self, high: f64, low: f64, close: f64) {
119 self.kc.update_raw(high, low, close);
120
121 if !self.initialized {
123 self.has_inputs = true;
124
125 if self.kc.initialized() {
126 self.initialized = true;
127 }
128 }
129
130 let k_width = (self.kc.upper - self.kc.lower) / 2.0;
131
132 if k_width > 0.0 {
133 self.value = (close - self.kc.middle) / k_width;
134 } else {
135 self.value = 0.0;
136 }
137 }
138}
139
140#[cfg(test)]
141mod tests {
142 use rstest::rstest;
143
144 use super::*;
145 use crate::stubs::kp_10;
146
147 #[rstest]
148 fn test_name_returns_expected_string(kp_10: KeltnerPosition) {
149 assert_eq!(kp_10.name(), "KeltnerPosition");
150 }
151
152 #[rstest]
153 fn test_str_repr_returns_expected_string(kp_10: KeltnerPosition) {
154 assert_eq!(
155 format!("{kp_10}"),
156 "KeltnerPosition(10,2,SIMPLE,SIMPLE,true)"
157 );
158 }
159
160 #[rstest]
161 fn test_period_returns_expected_value(kp_10: KeltnerPosition) {
162 assert_eq!(kp_10.period, 10);
163 assert_eq!(kp_10.k_multiplier, 2.0);
164 }
165
166 #[rstest]
167 fn test_initialized_without_inputs_returns_false(kp_10: KeltnerPosition) {
168 assert!(!kp_10.initialized());
169 }
170
171 #[rstest]
172 fn test_value_with_all_higher_inputs_returns_expected_value(mut kp_10: KeltnerPosition) {
173 let high_values = [
174 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0, 11.0, 12.0, 13.0, 14.0, 15.0,
175 ];
176 let low_values = [
177 0.9, 1.9, 2.9, 3.9, 4.9, 5.9, 6.9, 7.9, 8.9, 9.9, 10.1, 10.2, 10.3, 11.1, 11.4,
178 ];
179
180 let close_values = [
181 0.95, 1.95, 2.95, 3.95, 4.95, 5.95, 6.95, 7.95, 8.95, 9.95, 10.05, 10.15, 10.25, 11.05,
182 11.45,
183 ];
184
185 for i in 0..15 {
186 kp_10.update_raw(high_values[i], low_values[i], close_values[i]);
187 }
188
189 assert!(kp_10.initialized());
190 assert_eq!(kp_10.value, 0.471_631_205_673_758_94);
191 }
192
193 #[rstest]
194 fn test_reset_successfully_returns_indicator_to_fresh_state(mut kp_10: KeltnerPosition) {
195 kp_10.update_raw(1.00020, 1.00050, 1.00030);
196 kp_10.update_raw(1.00030, 1.00060, 1.00040);
197 kp_10.update_raw(1.00070, 1.00080, 1.00075);
198
199 kp_10.reset();
200
201 assert!(!kp_10.initialized());
202 assert!(!kp_10.has_inputs);
203 assert_eq!(kp_10.value, 0.0);
204 assert_eq!(kp_10.kc.upper, 0.0);
205 assert_eq!(kp_10.kc.middle, 0.0);
206 assert_eq!(kp_10.kc.lower, 0.0);
207 }
208}