rusteron_media_driver/
lib.rs
1#![allow(non_upper_case_globals)]
2#![allow(non_camel_case_types)]
3#![allow(non_snake_case)]
4#![allow(clippy::all)]
5#![allow(unused_unsafe)]
6#![allow(unused_variables)]
7#![doc = include_str!("../README.md")]
8pub mod bindings {
17 include!(concat!(env!("OUT_DIR"), "/bindings.rs"));
18}
19
20use bindings::*;
21use log::info;
22use std::path::Path;
23use std::sync::atomic::{AtomicBool, Ordering};
24use std::sync::Arc;
25use std::thread::{sleep, JoinHandle};
26use std::time::Duration;
27
28include!(concat!(env!("OUT_DIR"), "/aeron.rs"));
29include!(concat!(env!("OUT_DIR"), "/aeron_custom.rs"));
30
31unsafe impl Sync for AeronDriverContext {}
32unsafe impl Send for AeronDriverContext {}
33unsafe impl Sync for AeronDriver {}
34unsafe impl Send for AeronDriver {}
35
36impl AeronDriver {
37 pub fn launch_embedded(
38 aeron_context: AeronDriverContext,
39 register_sigint: bool,
40 ) -> (Arc<AtomicBool>, JoinHandle<Result<(), AeronCError>>) {
41 AeronDriver::wait_for_previous_media_driver_to_timeout(&aeron_context);
42
43 let stop = Arc::new(AtomicBool::new(false));
44 let stop_copy = stop.clone();
45 if register_sigint {
47 let stop_copy2 = stop.clone();
48 ctrlc::set_handler(move || {
49 stop_copy2.store(true, Ordering::SeqCst);
50 })
51 .expect("Error setting Ctrl-C handler");
52 }
53
54 let started = Arc::new(AtomicBool::new(false));
55 let started2 = started.clone();
56
57 let dir = aeron_context.get_dir().to_string();
58 info!("Starting media driver [dir={}]", dir);
59 let handle = std::thread::spawn(move || {
60 let aeron_context = aeron_context.clone();
61 let aeron_driver = AeronDriver::new(&aeron_context)?;
62 aeron_driver.start(true)?;
63
64 info!(
65 "Aeron driver started [dir={}]",
66 aeron_driver.context().get_dir()
67 );
68
69 started2.store(true, Ordering::SeqCst);
70
71 while !stop.load(Ordering::Acquire) {
73 while aeron_driver.main_do_work()? > 0 {
74 }
76 }
77
78 info!("stopping media driver");
79
80 Ok::<_, AeronCError>(())
81 });
82
83 while !started.load(Ordering::SeqCst) && !handle.is_finished() {
84 sleep(Duration::from_millis(100));
85 }
86
87 if handle.is_finished() {
88 panic!("failed to start media driver {:?}", handle.join())
89 }
90 info!("started media driver [dir={}]", dir);
91
92 (stop_copy, handle)
93 }
94
95 pub fn wait_for_previous_media_driver_to_timeout(aeron_context: &AeronDriverContext) {
98 if !aeron_context.get_dir_delete_on_start() {
99 let cnc_file = Path::new(aeron_context.get_dir()).join("cnc.dat");
100
101 if cnc_file.exists() {
102 let timeout = Duration::from_millis(aeron_context.get_driver_timeout_ms() * 2)
103 .as_nanos() as i64;
104
105 let mut duration = timeout;
106
107 if let Ok(md) = cnc_file.metadata() {
108 if let Ok(modified_time) = md.modified() {
109 if let Ok(took) = modified_time.elapsed() {
110 duration = took.as_nanos() as i64;
111 }
112 }
113 }
114
115 let delay = timeout - duration;
116
117 if delay > 0 {
118 let sleep_duration = Duration::from_nanos((delay + 1_000_000) as u64);
119 info!("cnc file already exists, will need to wait {sleep_duration:?} for timeout [file={cnc_file:?}]");
120 sleep(sleep_duration);
121 }
122 }
123 }
124 }
125}
126
127#[cfg(test)]
128mod tests {
129 use super::*;
130 use log::error;
131 use std::os::raw::c_int;
132 use std::sync::atomic::Ordering;
133 use std::time::Duration;
134
135 #[test]
136 fn version_check() {
137 let major = unsafe { crate::aeron_version_major() };
138 let minor = unsafe { crate::aeron_version_minor() };
139 let patch = unsafe { crate::aeron_version_patch() };
140
141 let aeron_version = format!("{}.{}.{}", major, minor, patch);
142 let cargo_version = "1.47.4";
143 assert_eq!(aeron_version, cargo_version);
144 }
145
146 #[test]
147 fn send_message() -> Result<(), AeronCError> {
148 let _ = env_logger::Builder::new()
149 .is_test(true)
150 .filter_level(log::LevelFilter::Info)
151 .try_init();
152 let topic = AERON_IPC_STREAM;
153 let stream_id = 32;
154
155 let aeron_context = AeronDriverContext::new()?;
156 aeron_context.set_dir_delete_on_shutdown(true)?;
157 aeron_context.set_dir_delete_on_start(true)?;
158
159 let (stop, _driver_handle) = AeronDriver::launch_embedded(aeron_context.clone(), false);
160
161 info!("aeron dir: {:?}", aeron_context.get_dir());
167
168 let dir = aeron_context.get_dir().to_string();
169 let ctx = AeronContext::new()?;
170 ctx.set_dir(&dir.into_c_string())?;
171
172 let client = Aeron::new(&ctx)?;
173
174 #[derive(Default, Debug)]
175 struct ErrorCount {
176 error_count: usize,
177 }
178
179 impl AeronErrorHandlerCallback for ErrorCount {
180 fn handle_aeron_error_handler(&mut self, error_code: c_int, msg: &str) {
181 error!("Aeron error {}: {}", error_code, msg);
182 self.error_count += 1;
183 }
184 }
185
186 let error_handler = Some(Handler::leak(ErrorCount::default()));
187 ctx.set_error_handler(error_handler.as_ref())?;
188
189 struct Test {}
190 impl AeronAvailableCounterCallback for Test {
191 fn handle_aeron_on_available_counter(
192 &mut self,
193 counters_reader: AeronCountersReader,
194 registration_id: i64,
195 counter_id: i32,
196 ) -> () {
197 info!("new counter counters_reader={counters_reader:?} registration_id={registration_id} counter_id={counter_id}");
198 }
199 }
200
201 impl AeronNewPublicationCallback for Test {
202 fn handle_aeron_on_new_publication(
203 &mut self,
204 async_: AeronAsyncAddPublication,
205 channel: &str,
206 stream_id: i32,
207 session_id: i32,
208 correlation_id: i64,
209 ) -> () {
210 info!("on new publication {async_:?} {channel} {stream_id} {session_id} {correlation_id}")
211 }
212 }
213 let handler = Some(Handler::leak(Test {}));
214 ctx.set_on_available_counter(handler.as_ref())?;
215 ctx.set_on_new_publication(handler.as_ref())?;
216
217 client.start()?;
218 info!("aeron driver started");
219 assert!(Aeron::epoch_clock() > 0);
220 assert!(Aeron::nano_clock() > 0);
221
222 let counter_async =
223 AeronAsyncAddCounter::new(&client, 2543543, "12312312".as_bytes(), "abcd")?;
224
225 let counter = counter_async.poll_blocking(Duration::from_secs(15))?;
226 unsafe {
227 *counter.addr() += 1;
228 }
229
230 let result = AeronAsyncAddPublication::new(&client, topic, stream_id)?;
231
232 let publication = result.poll_blocking(std::time::Duration::from_secs(15))?;
233
234 info!("publication channel: {:?}", publication.channel());
235 info!("publication stream_id: {:?}", publication.stream_id());
236 info!("publication status: {:?}", publication.channel_status());
237
238 stop.store(true, Ordering::SeqCst);
243
244 Ok(())
245 }
246
247 #[test]
248 pub fn test_debug() -> Result<(), Box<dyn std::error::Error>> {
249 let ctx = AeronDriverContext::new()?;
250
251 println!("{:#?}", ctx);
252
253 struct AgentStartHandler {
254 ctx: AeronDriverContext,
255 }
256
257 impl AeronAgentStartFuncCallback for AgentStartHandler {
258 fn handle_aeron_agent_on_start_func(&mut self, role: &str) -> () {
259 unsafe {
260 aeron_set_thread_affinity_on_start(
261 self.ctx.get_inner() as *mut _,
262 std::ffi::CString::new(role).unwrap().into_raw(),
263 );
264 }
265 }
266 }
267
268 ctx.set_agent_on_start_function(Some(&Handler::leak(AgentStartHandler {
269 ctx: ctx.clone(),
270 })))?;
271
272 println!("{:#?}", ctx);
273
274 Ok(())
275 }
276}