rusteron_media_driver/
lib.rs

1#![allow(non_upper_case_globals)]
2#![allow(non_camel_case_types)]
3#![allow(non_snake_case)]
4#![allow(clippy::all)]
5#![allow(unused_unsafe)]
6#![allow(unused_variables)]
7#![doc = include_str!("../README.md")]
8//! # Features
9//!
10//! - **`static`**: When enabled, this feature statically links the Aeron C code.
11//!   By default, the library uses dynamic linking to the Aeron C libraries.
12//! - **`backtrace`** - When enabled will log a backtrace for each AeronCError
//! - **`extra-logging`** - When enabled, logs when a resource is created and destroyed. Useful if you're seeing a segfault due to a resource being closed.
//! - **`precompile`** - When enabled, uses precompiled C code instead of requiring cmake and java to be installed.
15
16pub mod bindings {
17    include!(concat!(env!("OUT_DIR"), "/bindings.rs"));
18}
19
20use bindings::*;
21use log::info;
22use std::path::Path;
23use std::sync::atomic::{AtomicBool, Ordering};
24use std::sync::Arc;
25use std::thread::{sleep, JoinHandle};
26use std::time::Duration;
27
28include!(concat!(env!("OUT_DIR"), "/aeron.rs"));
29include!(concat!(env!("OUT_DIR"), "/aeron_custom.rs"));
30
31unsafe impl Sync for AeronDriverContext {}
32unsafe impl Send for AeronDriverContext {}
33unsafe impl Sync for AeronDriver {}
34unsafe impl Send for AeronDriver {}
35
36impl AeronDriver {
37    pub fn launch_embedded(
38        aeron_context: AeronDriverContext,
39        register_sigint: bool,
40    ) -> (Arc<AtomicBool>, JoinHandle<Result<(), AeronCError>>) {
41        AeronDriver::wait_for_previous_media_driver_to_timeout(&aeron_context);
42
43        let stop = Arc::new(AtomicBool::new(false));
44        let stop_copy = stop.clone();
45        // Register signal handler for SIGINT (Ctrl+C)
46        if register_sigint {
47            let stop_copy2 = stop.clone();
48            ctrlc::set_handler(move || {
49                stop_copy2.store(true, Ordering::SeqCst);
50            })
51            .expect("Error setting Ctrl-C handler");
52        }
53
54        let started = Arc::new(AtomicBool::new(false));
55        let started2 = started.clone();
56
57        let dir = aeron_context.get_dir().to_string();
58        info!("Starting media driver [dir={}]", dir);
59        let handle = std::thread::spawn(move || {
60            let aeron_context = aeron_context.clone();
61            let aeron_driver = AeronDriver::new(&aeron_context)?;
62            aeron_driver.start(true)?;
63
64            info!(
65                "Aeron driver started [dir={}]",
66                aeron_driver.context().get_dir()
67            );
68
69            started2.store(true, Ordering::SeqCst);
70
71            // Poll for work until Ctrl+C is pressed
72            while !stop.load(Ordering::Acquire) {
73                while aeron_driver.main_do_work()? > 0 {
74                    // busy spin
75                }
76            }
77
78            info!("stopping media driver");
79
80            Ok::<_, AeronCError>(())
81        });
82
83        while !started.load(Ordering::SeqCst) && !handle.is_finished() {
84            sleep(Duration::from_millis(100));
85        }
86
87        if handle.is_finished() {
88            panic!("failed to start media driver {:?}", handle.join())
89        }
90        info!("started media driver [dir={}]", dir);
91
92        (stop_copy, handle)
93    }
94
95    /// if you have existing shm files and its before the driver timeout it will try to reuse it and fail
96    /// this makes sure that if that is the case it will wait else it proceeds
97    pub fn wait_for_previous_media_driver_to_timeout(aeron_context: &AeronDriverContext) {
98        if !aeron_context.get_dir_delete_on_start() {
99            let cnc_file = Path::new(aeron_context.get_dir()).join("cnc.dat");
100
101            if cnc_file.exists() {
102                let timeout = Duration::from_millis(aeron_context.get_driver_timeout_ms() * 2)
103                    .as_nanos() as i64;
104
105                let mut duration = timeout;
106
107                if let Ok(md) = cnc_file.metadata() {
108                    if let Ok(modified_time) = md.modified() {
109                        if let Ok(took) = modified_time.elapsed() {
110                            duration = took.as_nanos() as i64;
111                        }
112                    }
113                }
114
115                let delay = timeout - duration;
116
117                if delay > 0 {
118                    let sleep_duration = Duration::from_nanos((delay + 1_000_000) as u64);
119                    info!("cnc file already exists, will need to wait {sleep_duration:?} for timeout [file={cnc_file:?}]");
120                    sleep(sleep_duration);
121                }
122            }
123        }
124    }
125}
126
#[cfg(test)]
mod tests {
    use super::*;
    use log::error;
    use std::os::raw::c_int;
    use std::sync::atomic::Ordering;
    use std::time::Duration;

    /// The linked Aeron C library must report the version this crate was written against.
    #[test]
    fn version_check() {
        let major = unsafe { crate::aeron_version_major() };
        let minor = unsafe { crate::aeron_version_minor() };
        let patch = unsafe { crate::aeron_version_patch() };

        let aeron_version = format!("{}.{}.{}", major, minor, patch);
        let cargo_version = "1.47.4";
        assert_eq!(aeron_version, cargo_version);
    }

    /// Starts an embedded driver, connects a client to it, registers callbacks, and adds a
    /// counter and an IPC publication before asking the driver to stop.
    #[test]
    fn send_message() -> Result<(), AeronCError> {
        let _ = env_logger::Builder::new()
            .is_test(true)
            .filter_level(log::LevelFilter::Info)
            .try_init();
        let topic = AERON_IPC_STREAM;
        let stream_id = 32;

        let aeron_context = AeronDriverContext::new()?;
        aeron_context.set_dir_delete_on_shutdown(true)?;
        aeron_context.set_dir_delete_on_start(true)?;

        let (stop, _driver_handle) = AeronDriver::launch_embedded(aeron_context.clone(), false);

        info!("aeron dir: {:?}", aeron_context.get_dir());

        // Point the client at the directory used by the embedded driver.
        let dir = aeron_context.get_dir().to_string();
        let ctx = AeronContext::new()?;
        ctx.set_dir(&dir.into_c_string())?;

        let client = Aeron::new(&ctx)?;

        /// Error handler that logs each reported error and counts it.
        #[derive(Default, Debug)]
        struct ErrorCount {
            error_count: usize,
        }

        impl AeronErrorHandlerCallback for ErrorCount {
            fn handle_aeron_error_handler(&mut self, error_code: c_int, msg: &str) {
                error!("Aeron error {}: {}", error_code, msg);
                self.error_count += 1;
            }
        }

        let error_handler = Some(Handler::leak(ErrorCount::default()));
        ctx.set_error_handler(error_handler.as_ref())?;

        /// Callback sink that only logs the events it receives.
        struct Test {}

        impl AeronAvailableCounterCallback for Test {
            fn handle_aeron_on_available_counter(
                &mut self,
                counters_reader: AeronCountersReader,
                registration_id: i64,
                counter_id: i32,
            ) {
                info!("new counter counters_reader={counters_reader:?} registration_id={registration_id} counter_id={counter_id}");
            }
        }

        impl AeronNewPublicationCallback for Test {
            fn handle_aeron_on_new_publication(
                &mut self,
                async_: AeronAsyncAddPublication,
                channel: &str,
                stream_id: i32,
                session_id: i32,
                correlation_id: i64,
            ) {
                info!("on new publication {async_:?} {channel} {stream_id} {session_id} {correlation_id}")
            }
        }

        let handler = Some(Handler::leak(Test {}));
        ctx.set_on_available_counter(handler.as_ref())?;
        ctx.set_on_new_publication(handler.as_ref())?;

        client.start()?;
        info!("aeron driver started");
        assert!(Aeron::epoch_clock() > 0);
        assert!(Aeron::nano_clock() > 0);

        let counter_async =
            AeronAsyncAddCounter::new(&client, 2543543, "12312312".as_bytes(), "abcd")?;

        let counter = counter_async.poll_blocking(Duration::from_secs(15))?;
        unsafe {
            *counter.addr() += 1;
        }

        let result = AeronAsyncAddPublication::new(&client, topic, stream_id)?;

        let publication = result.poll_blocking(Duration::from_secs(15))?;

        info!("publication channel: {:?}", publication.channel());
        info!("publication stream_id: {:?}", publication.stream_id());
        info!("publication status: {:?}", publication.channel_status());

        // Ask the embedded driver thread to leave its work loop.
        stop.store(true, Ordering::SeqCst);

        Ok(())
    }

    /// Prints the driver context before and after installing an agent-start callback.
    #[test]
    pub fn test_debug() -> Result<(), Box<dyn std::error::Error>> {
        let ctx = AeronDriverContext::new()?;

        println!("{:#?}", ctx);

        /// Forwards agent-start notifications to `aeron_set_thread_affinity_on_start`.
        struct AgentStartHandler {
            ctx: AeronDriverContext,
        }

        impl AeronAgentStartFuncCallback for AgentStartHandler {
            fn handle_aeron_agent_on_start_func(&mut self, role: &str) {
                // Keep the C string owned by this frame so it is freed when the callback
                // returns; `into_raw()` would leak one allocation per invocation.
                let role = std::ffi::CString::new(role).unwrap();
                // SAFETY: `role` stays alive for the whole call.
                // NOTE(review): assumes the C function only reads the role name during
                // the call and does not retain the pointer — confirm against the Aeron
                // C sources.
                unsafe {
                    aeron_set_thread_affinity_on_start(
                        self.ctx.get_inner() as *mut _,
                        role.as_ptr(),
                    );
                }
            }
        }

        ctx.set_agent_on_start_function(Some(&Handler::leak(AgentStartHandler {
            ctx: ctx.clone(),
        })))?;

        println!("{:#?}", ctx);

        Ok(())
    }
}
276}