rusteron_client/
lib.rs

1#![allow(non_upper_case_globals)]
2#![allow(non_camel_case_types)]
3#![allow(non_snake_case)]
4#![allow(clippy::all)]
5#![allow(unused_unsafe)]
6#![allow(unused_variables)]
7#![doc = include_str!("../README.md")]
8//! # Features
9//!
10//! - **`static`**: When enabled, this feature statically links the Aeron C code.
11//!   By default, the library uses dynamic linking to the Aeron C libraries.
12//! - **`backtrace`** - When enabled will log a backtrace for each AeronCError
13//! - **`extra-logging`** - When enabled will log when resource is created and destroyed. useful if your seeing a segfault due to a resource being closed
14//! - **`precompile`** - When enabled will use precompiled c code instead of requiring cmake and java to me installed
15
16pub mod bindings {
17    include!(concat!(env!("OUT_DIR"), "/bindings.rs"));
18}
19
20use bindings::*;
21
22include!(concat!(env!("OUT_DIR"), "/aeron.rs"));
23include!(concat!(env!("OUT_DIR"), "/aeron_custom.rs"));
24
25#[cfg(test)]
26mod tests {
27    use super::*;
28    use crate::test_alloc::current_allocs;
29    use log::{error, info};
30    use rusteron_media_driver::AeronDriverContext;
31    use serial_test::serial;
32    use std::error;
33    use std::error::Error;
34    use std::io::Write;
35    use std::os::raw::c_int;
36    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
37    use std::sync::Arc;
38    use std::thread::{sleep, JoinHandle};
39    use std::time::{Duration, Instant};
40
41    #[derive(Default, Debug)]
42    struct ErrorCount {
43        error_count: usize,
44    }
45
46    impl AeronErrorHandlerCallback for ErrorCount {
47        fn handle_aeron_error_handler(&mut self, error_code: c_int, msg: &str) {
48            error!("Aeron error {}: {}", error_code, msg);
49            self.error_count += 1;
50        }
51    }
52
53    #[test]
54    #[serial]
55    fn version_check() -> Result<(), Box<dyn error::Error>> {
56        let alloc_count = current_allocs();
57
58        {
59            let major = unsafe { crate::aeron_version_major() };
60            let minor = unsafe { crate::aeron_version_minor() };
61            let patch = unsafe { crate::aeron_version_patch() };
62
63            let cargo_version = "1.48.4";
64            let aeron_version = format!("{}.{}.{}", major, minor, patch);
65            assert_eq!(aeron_version, cargo_version);
66
67            let ctx = AeronContext::new()?;
68            let error_count = 1;
69            let mut handler = Handler::leak(ErrorCount::default());
70            ctx.set_error_handler(Some(&handler))?;
71
72            assert!(Aeron::epoch_clock() > 0);
73            drop(ctx);
74            assert!(handler.should_drop);
75            handler.release();
76            assert!(!handler.should_drop);
77            drop(handler);
78        }
79
80        assert!(
81            current_allocs() <= alloc_count,
82            "allocations {} > {alloc_count}",
83            current_allocs()
84        );
85
86        Ok(())
87    }
88
89    #[test]
90    #[serial]
91    pub fn simple_large_send() -> Result<(), Box<dyn error::Error>> {
92        let _ = env_logger::Builder::new()
93            .is_test(true)
94            .filter_level(log::LevelFilter::Info)
95            .try_init();
96        let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
97        media_driver_ctx.set_dir_delete_on_shutdown(true)?;
98        media_driver_ctx.set_dir_delete_on_start(true)?;
99        media_driver_ctx.set_dir(
100            &format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock()).into_c_string(),
101        )?;
102        let (stop, driver_handle) =
103            rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);
104
105        let ctx = AeronContext::new()?;
106        ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
107        assert_eq!(media_driver_ctx.get_dir(), ctx.get_dir());
108        let error_count = 1;
109        ctx.set_error_handler(Some(&Handler::leak(ErrorCount::default())))?;
110        ctx.set_on_new_publication(Some(&Handler::leak(AeronNewPublicationLogger)))?;
111        ctx.set_on_available_counter(Some(&Handler::leak(AeronAvailableCounterLogger)))?;
112        ctx.set_on_close_client(Some(&Handler::leak(AeronCloseClientLogger)))?;
113        ctx.set_on_new_subscription(Some(&Handler::leak(AeronNewSubscriptionLogger)))?;
114        ctx.set_on_unavailable_counter(Some(&Handler::leak(AeronUnavailableCounterLogger)))?;
115        ctx.set_on_available_counter(Some(&Handler::leak(AeronAvailableCounterLogger)))?;
116        ctx.set_on_new_exclusive_publication(Some(&Handler::leak(AeronNewPublicationLogger)))?;
117
118        info!("creating client [simple_large_send test]");
119        let aeron = Aeron::new(&ctx)?;
120        info!("starting client");
121
122        aeron.start()?;
123        info!("client started");
124        let publisher = aeron.add_publication(AERON_IPC_STREAM, 123, Duration::from_secs(5))?;
125        info!("created publisher");
126
127        assert!(AeronCncMetadata::load_from_file(ctx.get_dir())?.pid > 0);
128        let cstr = std::ffi::CString::new(ctx.get_dir()).unwrap();
129        AeronCncMetadata::read_from_file(&cstr, |cnc| {
130            assert!(cnc.pid > 0);
131        })?;
132        assert!(AeronCnc::new_on_heap(ctx.get_dir())?.get_to_driver_heartbeat_ms()? > 0);
133        let cstr = std::ffi::CString::new(ctx.get_dir()).unwrap();
134        for _ in 0..50 {
135            AeronCnc::read_on_partial_stack(&cstr, |cnc| {
136                assert!(cnc.get_to_driver_heartbeat_ms().unwrap() > 0);
137            })?;
138        }
139
140        let subscription = aeron.add_subscription(
141            AERON_IPC_STREAM,
142            123,
143            Handlers::no_available_image_handler(),
144            Handlers::no_unavailable_image_handler(),
145            Duration::from_secs(5),
146        )?;
147        info!("created subscription");
148
149        subscription
150            .poll_once(|msg, header| println!("foo"), 1024)
151            .unwrap();
152
153        // pick a large enough size to confirm fragement assembler is working
154        let string_len = media_driver_ctx.ipc_mtu_length * 100;
155        info!("string length: {}", string_len);
156
157        let publisher_handler = {
158            let stop = stop.clone();
159            std::thread::spawn(move || {
160                let binding = "1".repeat(string_len);
161                let large_msg = binding.as_bytes();
162                loop {
163                    if stop.load(Ordering::Acquire) || publisher.is_closed() {
164                        break;
165                    }
166                    let result =
167                        publisher.offer(large_msg, Handlers::no_reserved_value_supplier_handler());
168
169                    assert_eq!(123, publisher.get_constants().unwrap().stream_id);
170
171                    if result < large_msg.len() as i64 {
172                        let error = AeronCError::from_code(result as i32);
173                        match error.kind() {
174                            AeronErrorType::PublicationBackPressured
175                            | AeronErrorType::PublicationAdminAction => {
176                                // ignore
177                            }
178                            _ => {
179                                error!(
180                                    "ERROR: failed to send message {:?}",
181                                    AeronCError::from_code(result as i32)
182                                );
183                            }
184                        }
185                        sleep(Duration::from_millis(500));
186                    }
187                }
188                info!("stopping publisher thread");
189            })
190        };
191
192        let mut assembler = AeronFragmentClosureAssembler::new()?;
193
194        struct Context {
195            count: Arc<AtomicUsize>,
196            stop: Arc<AtomicBool>,
197            string_len: usize,
198        }
199
200        let count = Arc::new(AtomicUsize::new(0usize));
201        let mut context = Context {
202            count: count.clone(),
203            stop: stop.clone(),
204            string_len,
205        };
206
207        // Start the timer
208        let start_time = Instant::now();
209
210        loop {
211            if start_time.elapsed() > Duration::from_secs(30) {
212                info!("Failed: exceeded 30-second timeout");
213                return Err(Box::new(std::io::Error::new(
214                    std::io::ErrorKind::TimedOut,
215                    "Timeout exceeded",
216                )));
217            }
218            let c = count.load(Ordering::SeqCst);
219            if c > 100 {
220                break;
221            }
222
223            fn process_msg(ctx: &mut Context, buffer: &[u8], header: AeronHeader) {
224                ctx.count.fetch_add(1, Ordering::SeqCst);
225
226                let values = header.get_values().unwrap();
227                assert_ne!(values.frame.session_id, 0);
228
229                if buffer.len() != ctx.string_len {
230                    ctx.stop.store(true, Ordering::SeqCst);
231                    error!(
232                        "ERROR: message was {} but was expecting {} [header={:?}]",
233                        buffer.len(),
234                        ctx.string_len,
235                        header
236                    );
237                    sleep(Duration::from_secs(1));
238                }
239
240                assert_eq!(buffer.len(), ctx.string_len);
241                assert_eq!(buffer, "1".repeat(ctx.string_len).as_bytes());
242            }
243
244            subscription.poll(assembler.process(&mut context, process_msg), 128)?;
245            assert_eq!(123, subscription.get_constants().unwrap().stream_id);
246        }
247
248        subscription.close(Handlers::no_notification_handler())?;
249
250        info!("stopping client");
251        stop.store(true, Ordering::SeqCst);
252
253        let _ = publisher_handler.join().unwrap();
254        let _ = driver_handle.join().unwrap();
255
256        let cnc = AeronCnc::new_on_heap(ctx.get_dir())?;
257        cnc.counters_reader().foreach_counter_once(
258            |value: i64, id: i32, type_id: i32, key: &[u8], label: &str| {
259                println!("counter reader id={id}, type_id={type_id}, key={key:?}, label={label}, value={value} [type={:?}]",
260                AeronSystemCounterType::try_from(type_id));
261            },
262        );
263        cnc.error_log_read_once(| observation_count: i32,
264                                     first_observation_timestamp: i64,
265                                     last_observation_timestamp: i64,
266                                     error: &str| {
267            println!("error: {error} observationCount={observation_count}, first_observation_timestamp={first_observation_timestamp}, last_observation_timestamp={last_observation_timestamp}");
268        }, 0);
269        cnc.loss_reporter_read_once(|    observation_count: i64,
270                                    total_bytes_lost: i64,
271                                    first_observation_timestamp: i64,
272                                    last_observation_timestamp: i64,
273                                    session_id: i32,
274                                    stream_id: i32,
275                                    channel: &str,
276                                    source: &str,| {
277            println!("loss reporter observationCount={observation_count}, totalBytesLost={total_bytes_lost}, first_observed={first_observation_timestamp}, last_observed={last_observation_timestamp}, session_id={session_id}, stream_id={stream_id}, channel={channel} source={source}");
278        })?;
279
280        Ok(())
281    }
282
283    #[test]
284    #[serial]
285    pub fn try_claim() -> Result<(), Box<dyn error::Error>> {
286        let _ = env_logger::Builder::new()
287            .is_test(true)
288            .filter_level(log::LevelFilter::Info)
289            .try_init();
290        let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
291        media_driver_ctx.set_dir_delete_on_shutdown(true)?;
292        media_driver_ctx.set_dir_delete_on_start(true)?;
293        media_driver_ctx.set_dir(
294            &format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock()).into_c_string(),
295        )?;
296        let (stop, driver_handle) =
297            rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);
298
299        let ctx = AeronContext::new()?;
300        ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
301        assert_eq!(media_driver_ctx.get_dir(), ctx.get_dir());
302        ctx.set_error_handler(Some(&Handler::leak(ErrorCount::default())))?;
303
304        info!("creating client [try_claim test]");
305        let aeron = Aeron::new(&ctx)?;
306        info!("starting client");
307
308        aeron.start()?;
309        info!("client started");
310        let publisher = aeron.add_publication(AERON_IPC_STREAM, 123, Duration::from_secs(5))?;
311        info!("created publisher");
312
313        let subscription = aeron.add_subscription(
314            AERON_IPC_STREAM,
315            123,
316            Handlers::no_available_image_handler(),
317            Handlers::no_unavailable_image_handler(),
318            Duration::from_secs(5),
319        )?;
320        info!("created subscription");
321
322        // pick a large enough size to confirm fragement assembler is working
323        let string_len = 156;
324        info!("string length: {}", string_len);
325
326        let publisher_handler = {
327            let stop = stop.clone();
328            std::thread::spawn(move || {
329                let binding = "1".repeat(string_len);
330                let msg = binding.as_bytes();
331                let buffer = AeronBufferClaim::default();
332                loop {
333                    if stop.load(Ordering::Acquire) || publisher.is_closed() {
334                        break;
335                    }
336
337                    let result = publisher.try_claim(string_len, &buffer);
338
339                    if result < msg.len() as i64 {
340                        error!(
341                            "ERROR: failed to send message {:?}",
342                            AeronCError::from_code(result as i32)
343                        );
344                    } else {
345                        buffer.data().write_all(&msg).unwrap();
346                        buffer.commit().unwrap();
347                    }
348                }
349                info!("stopping publisher thread");
350            })
351        };
352
353        let count = Arc::new(AtomicUsize::new(0usize));
354        let count_copy = Arc::clone(&count);
355        let stop2 = stop.clone();
356
357        struct FragmentHandler {
358            count_copy: Arc<AtomicUsize>,
359            stop2: Arc<AtomicBool>,
360            string_len: usize,
361        }
362
363        impl AeronFragmentHandlerCallback for FragmentHandler {
364            fn handle_aeron_fragment_handler(&mut self, buffer: &[u8], header: AeronHeader) {
365                self.count_copy.fetch_add(1, Ordering::SeqCst);
366
367                if buffer.len() != self.string_len {
368                    self.stop2.store(true, Ordering::SeqCst);
369                    error!(
370                        "ERROR: message was {} but was expecting {} [header={:?}]",
371                        buffer.len(),
372                        self.string_len,
373                        header
374                    );
375                    sleep(Duration::from_secs(1));
376                }
377
378                assert_eq!(buffer.len(), self.string_len);
379                assert_eq!(buffer, "1".repeat(self.string_len).as_bytes());
380            }
381        }
382
383        let (closure, _inner) = Handler::leak_with_fragment_assembler(FragmentHandler {
384            count_copy,
385            stop2,
386            string_len,
387        })?;
388        let start_time = Instant::now();
389
390        loop {
391            if start_time.elapsed() > Duration::from_secs(30) {
392                info!("Failed: exceeded 30-second timeout");
393                return Err(Box::new(std::io::Error::new(
394                    std::io::ErrorKind::TimedOut,
395                    "Timeout exceeded",
396                )));
397            }
398            let c = count.load(Ordering::SeqCst);
399            if c > 100 {
400                break;
401            }
402            subscription.poll(Some(&closure), 128)?;
403        }
404
405        info!("stopping client");
406
407        stop.store(true, Ordering::SeqCst);
408
409        let _ = publisher_handler.join().unwrap();
410        let _ = driver_handle.join().unwrap();
411        Ok(())
412    }
413
414    #[test]
415    #[serial]
416    pub fn counters() -> Result<(), Box<dyn error::Error>> {
417        let _ = env_logger::Builder::new()
418            .is_test(true)
419            .filter_level(log::LevelFilter::Info)
420            .try_init();
421        let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
422        media_driver_ctx.set_dir_delete_on_shutdown(true)?;
423        media_driver_ctx.set_dir_delete_on_start(true)?;
424        media_driver_ctx.set_dir(
425            &format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock()).into_c_string(),
426        )?;
427        let (stop, driver_handle) =
428            rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);
429
430        let ctx = AeronContext::new()?;
431        ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
432        assert_eq!(media_driver_ctx.get_dir(), ctx.get_dir());
433        ctx.set_error_handler(Some(&Handler::leak(ErrorCount::default())))?;
434        ctx.set_on_unavailable_counter(Some(&Handler::leak(AeronUnavailableCounterLogger)))?;
435
436        struct AvailableCounterHandler {
437            found_counter: bool,
438        }
439
440        impl AeronAvailableCounterCallback for AvailableCounterHandler {
441            fn handle_aeron_on_available_counter(
442                &mut self,
443                counters_reader: AeronCountersReader,
444                registration_id: i64,
445                counter_id: i32,
446            ) -> () {
447                info!(
448            "on counter key={:?}, label={:?} registration_id={registration_id}, counter_id={counter_id}, value={}, {counters_reader:?}",
449            String::from_utf8(counters_reader.get_counter_key(counter_id).unwrap()),
450            counters_reader.get_counter_label(counter_id, 1000),
451            counters_reader.addr(counter_id)
452        );
453
454                assert_eq!(
455                    counters_reader.counter_registration_id(counter_id).unwrap(),
456                    registration_id
457                );
458
459                if let Ok(label) = counters_reader.get_counter_label(counter_id, 1000) {
460                    if label == "label_buffer" {
461                        self.found_counter = true;
462                        assert_eq!(
463                            &counters_reader.get_counter_key(counter_id).unwrap(),
464                            "key".as_bytes()
465                        );
466                    }
467                }
468            }
469        }
470
471        let handler = &Handler::leak(AvailableCounterHandler {
472            found_counter: false,
473        });
474        ctx.set_on_available_counter(Some(handler))?;
475
476        info!("creating client");
477        let aeron = Aeron::new(&ctx)?;
478        info!("starting client");
479
480        aeron.start()?;
481        info!("client started [counters test]");
482
483        let counter = aeron.add_counter(
484            123,
485            "key".as_bytes(),
486            "label_buffer",
487            Duration::from_secs(5),
488        )?;
489        let constants = counter.get_constants()?;
490        let counter_id = constants.counter_id;
491
492        let publisher_handler = {
493            let stop = stop.clone();
494            let counter = counter.clone();
495            std::thread::spawn(move || {
496                for _ in 0..150 {
497                    if stop.load(Ordering::Acquire) || counter.is_closed() {
498                        break;
499                    }
500                    counter.addr_atomic().fetch_add(1, Ordering::SeqCst);
501                }
502                info!("stopping publisher thread");
503            })
504        };
505
506        let now = Instant::now();
507        while counter.addr_atomic().load(Ordering::SeqCst) < 100
508            && now.elapsed() < Duration::from_secs(10)
509        {
510            sleep(Duration::from_micros(10));
511        }
512
513        assert!(now.elapsed() < Duration::from_secs(10));
514
515        info!(
516            "counter is {}",
517            counter.addr_atomic().load(Ordering::SeqCst)
518        );
519
520        info!("stopping client");
521
522        #[cfg(not(target_os = "windows"))] // not sure why windows version doesn't fire event
523        assert!(handler.found_counter);
524
525        stop.store(true, Ordering::SeqCst);
526
527        let reader = aeron.counters_reader();
528        assert_eq!(reader.get_counter_label(counter_id, 256)?, "label_buffer");
529        assert_eq!(reader.get_counter_key(counter_id)?, "key".as_bytes());
530        let buffers = AeronCountersReaderBuffers::default();
531        reader.get_buffers(&buffers)?;
532
533        let _ = publisher_handler.join().unwrap();
534        let _ = driver_handle.join().unwrap();
535        Ok(())
536    }
537
538    /// A simple error counter for testing error callback invocation.
539    #[derive(Default, Debug)]
540    struct TestErrorCount {
541        pub error_count: usize,
542    }
543
544    impl Drop for TestErrorCount {
545        fn drop(&mut self) {
546            info!("TestErrorCount dropped with {} errors", self.error_count);
547        }
548    }
549
550    impl AeronErrorHandlerCallback for TestErrorCount {
551        fn handle_aeron_error_handler(&mut self, error_code: c_int, msg: &str) {
552            error!("Aeron error {}: {}", error_code, msg);
553            self.error_count += 1;
554        }
555    }
556
557    #[test]
558    #[serial]
559    pub fn backpressure_recovery_test() -> Result<(), Box<dyn error::Error>> {
560        let _ = env_logger::Builder::new()
561            .is_test(true)
562            .filter_level(log::LevelFilter::Info)
563            .try_init();
564
565        let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
566        media_driver_ctx.set_dir_delete_on_shutdown(true)?;
567        media_driver_ctx.set_dir_delete_on_start(true)?;
568        media_driver_ctx.set_dir(
569            &format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock()).into_c_string(),
570        )?;
571        let (stop, driver_handle) =
572            rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);
573
574        let ctx = AeronContext::new()?;
575        ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
576        ctx.set_error_handler(Some(&Handler::leak(TestErrorCount::default())))?;
577
578        let aeron = Aeron::new(&ctx)?;
579        aeron.start()?;
580
581        let publisher = aeron.add_publication(AERON_IPC_STREAM, 123, Duration::from_secs(5))?;
582        let subscription = aeron.add_subscription(
583            AERON_IPC_STREAM,
584            123,
585            Handlers::no_available_image_handler(),
586            Handlers::no_unavailable_image_handler(),
587            Duration::from_secs(5),
588        )?;
589
590        let count = Arc::new(AtomicUsize::new(0));
591        let start_time = Instant::now();
592
593        // Spawn a publisher thread that repeatedly sends "test" messages.
594        let publisher_thread = {
595            let stop = stop.clone();
596            std::thread::spawn(move || {
597                while !stop.load(Ordering::Acquire) {
598                    let msg = b"test";
599                    let result =
600                        publisher.offer(msg, Handlers::no_reserved_value_supplier_handler());
601                    // If backpressure is encountered, sleep a bit.
602                    if result == AeronErrorType::PublicationBackPressured.code() as i64 {
603                        sleep(Duration::from_millis(50));
604                    }
605                }
606            })
607        };
608
609        // Poll using the inline closure via poll_once until we receive at least 50 messages.
610        while count.load(Ordering::SeqCst) < 50 && start_time.elapsed() < Duration::from_secs(10) {
611            let _ = subscription.poll_once(
612                |_msg, _header| {
613                    count.fetch_add(1, Ordering::SeqCst);
614                },
615                128,
616            )?;
617        }
618
619        stop.store(true, Ordering::SeqCst);
620        publisher_thread.join().unwrap();
621        let _ = driver_handle.join().unwrap();
622
623        assert!(
624            count.load(Ordering::SeqCst) >= 50,
625            "Expected at least 50 messages received"
626        );
627        Ok(())
628    }
629
630    #[test]
631    #[serial]
632    pub fn multi_subscription_test() -> Result<(), Box<dyn error::Error>> {
633        let _ = env_logger::Builder::new()
634            .is_test(true)
635            .filter_level(log::LevelFilter::Info)
636            .try_init();
637
638        let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
639        media_driver_ctx.set_dir_delete_on_shutdown(true)?;
640        media_driver_ctx.set_dir_delete_on_start(true)?;
641        media_driver_ctx.set_dir(
642            &format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock()).into_c_string(),
643        )?;
644        let (_stop, driver_handle) =
645            rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);
646
647        let ctx = AeronContext::new()?;
648        ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
649        ctx.set_error_handler(Some(&Handler::leak(TestErrorCount::default())))?;
650
651        let aeron = Aeron::new(&ctx)?;
652        aeron.start()?;
653        let publisher = aeron.add_publication(AERON_IPC_STREAM, 123, Duration::from_secs(5))?;
654
655        // Create two subscriptions on the same channel.
656        let subscription1 = aeron.add_subscription(
657            AERON_IPC_STREAM,
658            123,
659            Handlers::no_available_image_handler(),
660            Handlers::no_unavailable_image_handler(),
661            Duration::from_secs(5),
662        )?;
663        let subscription2 = aeron.add_subscription(
664            AERON_IPC_STREAM,
665            123,
666            Handlers::no_available_image_handler(),
667            Handlers::no_unavailable_image_handler(),
668            Duration::from_secs(5),
669        )?;
670
671        let count1 = Arc::new(AtomicUsize::new(0));
672        let count2 = Arc::new(AtomicUsize::new(0));
673
674        // Publish a single message.
675        let msg = b"hello multi-subscription";
676        let result = publisher.offer(msg, Handlers::no_reserved_value_supplier_handler());
677        assert!(
678            result >= msg.len() as i64,
679            "Message should be sent successfully"
680        );
681
682        let start_time = Instant::now();
683        // Poll both subscriptions with inline closures until each has received at least one message.
684        while (count1.load(Ordering::SeqCst) < 1 || count2.load(Ordering::SeqCst) < 1)
685            && start_time.elapsed() < Duration::from_secs(5)
686        {
687            let _ = subscription1.poll_once(
688                |_msg, _header| {
689                    count1.fetch_add(1, Ordering::SeqCst);
690                },
691                128,
692            )?;
693            let _ = subscription2.poll_once(
694                |_msg, _header| {
695                    count2.fetch_add(1, Ordering::SeqCst);
696                },
697                128,
698            )?;
699        }
700
701        assert!(
702            count1.load(Ordering::SeqCst) >= 1,
703            "Subscription 1 did not receive the message"
704        );
705        assert!(
706            count2.load(Ordering::SeqCst) >= 1,
707            "Subscription 2 did not receive the message"
708        );
709
710        _stop.store(true, Ordering::SeqCst);
711        let _ = driver_handle.join().unwrap();
712        Ok(())
713    }
714
715    #[test]
716    #[serial]
717    pub fn should_be_able_to_drop_after_close_manually_being_closed(
718    ) -> Result<(), Box<dyn error::Error>> {
719        let _ = env_logger::Builder::new()
720            .is_test(true)
721            .filter_level(log::LevelFilter::Info)
722            .try_init();
723
724        let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
725        media_driver_ctx.set_dir_delete_on_shutdown(true)?;
726        media_driver_ctx.set_dir_delete_on_start(true)?;
727        media_driver_ctx.set_dir(
728            &format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock()).into_c_string(),
729        )?;
730        let (_stop, driver_handle) =
731            rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);
732
733        let ctx = AeronContext::new()?;
734        ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
735        ctx.set_error_handler(Some(&Handler::leak(AeronErrorHandlerLogger)))?;
736
737        let aeron = Aeron::new(&ctx)?;
738        aeron.start()?;
739
740        {
741            let publisher = aeron.add_publication(AERON_IPC_STREAM, 123, Duration::from_secs(5))?;
742            info!("created publication [sessionId={}]", publisher.session_id());
743            publisher.close_with_no_args()?;
744            drop(publisher);
745        }
746
747        {
748            let publisher = aeron.add_publication(AERON_IPC_STREAM, 124, Duration::from_secs(5))?;
749            info!("created publication [sessionId={}]", publisher.session_id());
750            publisher.close(Handlers::no_notification_handler())?;
751            drop(publisher);
752        }
753
754        {
755            let publisher = aeron.add_publication(AERON_IPC_STREAM, 125, Duration::from_secs(5))?;
756            publisher.close_once(|| println!("on close"))?;
757            info!("created publication [sessionId={}]", publisher.session_id());
758            drop(publisher);
759        }
760
761        Ok(())
762    }
763
764    #[test]
765    #[serial]
766    pub fn offer_on_closed_publication_error_test() -> Result<(), Box<dyn error::Error>> {
767        let _ = env_logger::Builder::new()
768            .is_test(true)
769            .filter_level(log::LevelFilter::Info)
770            .try_init();
771
772        let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
773        media_driver_ctx.set_dir_delete_on_shutdown(true)?;
774        media_driver_ctx.set_dir_delete_on_start(true)?;
775        media_driver_ctx.set_dir(
776            &format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock()).into_c_string(),
777        )?;
778        let (_stop, driver_handle) =
779            rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);
780
781        let ctx = AeronContext::new()?;
782        ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
783        ctx.set_error_handler(Some(&Handler::leak(TestErrorCount::default())))?;
784
785        let aeron = Aeron::new(&ctx)?;
786        aeron.start()?;
787        let publisher = aeron.add_publication(AERON_IPC_STREAM, 123, Duration::from_secs(5))?;
788
789        // Close the publication immediately.
790        publisher.close(Handlers::no_notification_handler())?;
791
792        // Attempt to send a message after the publication is closed.
793        let result = publisher.offer(
794            b"should fail",
795            Handlers::no_reserved_value_supplier_handler(),
796        );
797        assert!(
798            result < 0,
799            "Offering on a closed publication should return a negative error code"
800        );
801
802        _stop.store(true, Ordering::SeqCst);
803        let _ = driver_handle.join().unwrap();
804        Ok(())
805    }
806
807    /// Test sending and receiving an empty (zero-length) message using inline closures with poll_once.
808    #[test]
809    #[serial]
810    pub fn empty_message_test() -> Result<(), Box<dyn error::Error>> {
811        let _ = env_logger::Builder::new()
812            .is_test(true)
813            .filter_level(log::LevelFilter::Info)
814            .try_init();
815
816        let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
817        media_driver_ctx.set_dir_delete_on_shutdown(true)?;
818        media_driver_ctx.set_dir_delete_on_start(true)?;
819        media_driver_ctx.set_dir(
820            &format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock()).into_c_string(),
821        )?;
822        let (_stop, driver_handle) =
823            rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);
824
825        let ctx = AeronContext::new()?;
826        ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
827        ctx.set_error_handler(Some(&Handler::leak(TestErrorCount::default())))?;
828
829        let aeron = Aeron::new(&ctx)?;
830        aeron.start()?;
831        let publisher = aeron.add_publication(AERON_IPC_STREAM, 123, Duration::from_secs(5))?;
832        let subscription = aeron.add_subscription(
833            AERON_IPC_STREAM,
834            123,
835            Handlers::no_available_image_handler(),
836            Handlers::no_unavailable_image_handler(),
837            Duration::from_secs(5),
838        )?;
839
840        let empty_received = Arc::new(AtomicBool::new(false));
841        let start_time = Instant::now();
842
843        let result = publisher.offer(b"", Handlers::no_reserved_value_supplier_handler());
844        assert!(result > 0);
845
846        while !empty_received.load(Ordering::SeqCst)
847            && start_time.elapsed() < Duration::from_secs(5)
848        {
849            let _ = subscription.poll_once(
850                |msg, _header| {
851                    if msg.is_empty() {
852                        empty_received.store(true, Ordering::SeqCst);
853                    }
854                },
855                128,
856            )?;
857        }
858
859        assert!(
860            empty_received.load(Ordering::SeqCst),
861            "Empty message was not received"
862        );
863        _stop.store(true, Ordering::SeqCst);
864        let _ = driver_handle.join().unwrap();
865        Ok(())
866    }
867
868    #[test]
869    #[serial]
870    #[ignore] // need to work to get tags working properly, its more of testing issue then tag issue
871    pub fn tags() -> Result<(), Box<dyn error::Error>> {
872        let _ = env_logger::Builder::new()
873            .is_test(true)
874            .filter_level(log::LevelFilter::Debug)
875            .try_init();
876
877        let (md_ctx, stop, md) = start_media_driver(1)?;
878
879        let (_a_ctx2, aeron_sub) = create_client(&md_ctx)?;
880
881        info!("creating suscriber 1");
882        let sub = aeron_sub
883            .add_subscription(
884                &"aeron:udp?tags=100".into_c_string(),
885                123,
886                Handlers::no_available_image_handler(),
887                Handlers::no_unavailable_image_handler(),
888                Duration::from_secs(50),
889            )
890            .map_err(|e| {
891                error!("aeron error={}", aeron_sub.errmsg());
892                e
893            })?;
894
895        let ctx = AeronContext::new()?;
896        ctx.set_dir(&aeron_sub.context().get_dir().into_c_string())?;
897        let aeron = Aeron::new(&ctx)?;
898        aeron.start()?;
899
900        info!("creating suscriber 2");
901        let sub2 = aeron_sub.add_subscription(
902            &"aeron:udp?tags=100".into_c_string(),
903            123,
904            Handlers::no_available_image_handler(),
905            Handlers::no_unavailable_image_handler(),
906            Duration::from_secs(50),
907        )?;
908
909        let (_a_ctx1, aeron_pub) = create_client(&md_ctx)?;
910        info!("creating publisher");
911        assert!(!aeron_pub.is_closed());
912        let publisher = aeron_pub
913            .add_publication(
914                &"aeron:udp?endpoint=localhost:4040|alias=test|tags=100".into_c_string(),
915                123,
916                Duration::from_secs(5),
917            )
918            .map_err(|e| {
919                error!("aeron error={}", aeron_pub.errmsg());
920                e
921            })?;
922
923        info!("publishing msg");
924
925        loop {
926            let result = publisher.offer(
927                "213".as_bytes(),
928                Handlers::no_reserved_value_supplier_handler(),
929            );
930            if result < 0 {
931                error!(
932                    "failed to publish {:?}",
933                    AeronCError::from_code(result as i32)
934                );
935            } else {
936                break;
937            }
938        }
939
940        sub.poll_once(
941            |msg, _header| {
942                println!("Received message: {:?}", msg);
943            },
944            128,
945        )?;
946        sub2.poll_once(
947            |msg, _header| {
948                println!("Received message: {:?}", msg);
949            },
950            128,
951        )?;
952
953        stop.store(true, Ordering::SeqCst);
954
955        Ok(())
956    }
957
958    fn create_client(
959        media_driver_ctx: &AeronDriverContext,
960    ) -> Result<(AeronContext, Aeron), Box<dyn Error>> {
961        let dir = media_driver_ctx.get_dir();
962        info!("creating aeron client [dir={}]", dir);
963        let ctx = AeronContext::new()?;
964        ctx.set_dir(&dir.into_c_string())?;
965        ctx.set_error_handler(Some(&Handler::leak(TestErrorCount::default())))?;
966        let aeron = Aeron::new(&ctx)?;
967        aeron.start()?;
968        Ok((ctx, aeron))
969    }
970
971    fn start_media_driver(
972        instance: u64,
973    ) -> Result<
974        (
975            AeronDriverContext,
976            Arc<AtomicBool>,
977            JoinHandle<Result<(), rusteron_media_driver::AeronCError>>,
978        ),
979        Box<dyn Error>,
980    > {
981        let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
982        media_driver_ctx.set_dir_delete_on_shutdown(true)?;
983        media_driver_ctx.set_dir_delete_on_start(true)?;
984        media_driver_ctx.set_dir(
985            &format!(
986                "{}{}-{}",
987                media_driver_ctx.get_dir(),
988                Aeron::epoch_clock(),
989                instance
990            )
991            .into_c_string(),
992        )?;
993        let (stop, driver_handle) =
994            rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);
995        Ok((media_driver_ctx, stop, driver_handle))
996    }
997
998    #[doc = include_str!("../../README.md")]
999    mod readme_tests {}
1000}