1#![allow(non_upper_case_globals)]
2#![allow(non_camel_case_types)]
3#![allow(non_snake_case)]
4#![allow(clippy::all)]
5#![allow(unused_unsafe)]
6#![allow(unused_variables)]
7#![doc = include_str!("../README.md")]
8pub mod bindings {
17 include!(concat!(env!("OUT_DIR"), "/bindings.rs"));
18}
19
20use bindings::*;
21
22include!(concat!(env!("OUT_DIR"), "/aeron.rs"));
23include!(concat!(env!("OUT_DIR"), "/aeron_custom.rs"));
24
25#[cfg(test)]
26mod tests {
27 use super::*;
28 use crate::test_alloc::current_allocs;
29 use log::{error, info};
30 use rusteron_media_driver::AeronDriverContext;
31 use serial_test::serial;
32 use std::error;
33 use std::error::Error;
34 use std::io::Write;
35 use std::os::raw::c_int;
36 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
37 use std::sync::Arc;
38 use std::thread::{sleep, JoinHandle};
39 use std::time::{Duration, Instant};
40
41 #[derive(Default, Debug)]
42 struct ErrorCount {
43 error_count: usize,
44 }
45
46 impl AeronErrorHandlerCallback for ErrorCount {
47 fn handle_aeron_error_handler(&mut self, error_code: c_int, msg: &str) {
48 error!("Aeron error {}: {}", error_code, msg);
49 self.error_count += 1;
50 }
51 }
52
53 #[test]
54 #[serial]
55 fn version_check() -> Result<(), Box<dyn error::Error>> {
56 let alloc_count = current_allocs();
57
58 {
59 let major = unsafe { crate::aeron_version_major() };
60 let minor = unsafe { crate::aeron_version_minor() };
61 let patch = unsafe { crate::aeron_version_patch() };
62
63 let cargo_version = "1.48.4";
64 let aeron_version = format!("{}.{}.{}", major, minor, patch);
65 assert_eq!(aeron_version, cargo_version);
66
67 let ctx = AeronContext::new()?;
68 let error_count = 1;
69 let mut handler = Handler::leak(ErrorCount::default());
70 ctx.set_error_handler(Some(&handler))?;
71
72 assert!(Aeron::epoch_clock() > 0);
73 drop(ctx);
74 assert!(handler.should_drop);
75 handler.release();
76 assert!(!handler.should_drop);
77 drop(handler);
78 }
79
80 assert!(
81 current_allocs() <= alloc_count,
82 "allocations {} > {alloc_count}",
83 current_allocs()
84 );
85
86 Ok(())
87 }
88
89 #[test]
90 #[serial]
91 pub fn simple_large_send() -> Result<(), Box<dyn error::Error>> {
92 let _ = env_logger::Builder::new()
93 .is_test(true)
94 .filter_level(log::LevelFilter::Info)
95 .try_init();
96 let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
97 media_driver_ctx.set_dir_delete_on_shutdown(true)?;
98 media_driver_ctx.set_dir_delete_on_start(true)?;
99 media_driver_ctx.set_dir(
100 &format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock()).into_c_string(),
101 )?;
102 let (stop, driver_handle) =
103 rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);
104
105 let ctx = AeronContext::new()?;
106 ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
107 assert_eq!(media_driver_ctx.get_dir(), ctx.get_dir());
108 let error_count = 1;
109 ctx.set_error_handler(Some(&Handler::leak(ErrorCount::default())))?;
110 ctx.set_on_new_publication(Some(&Handler::leak(AeronNewPublicationLogger)))?;
111 ctx.set_on_available_counter(Some(&Handler::leak(AeronAvailableCounterLogger)))?;
112 ctx.set_on_close_client(Some(&Handler::leak(AeronCloseClientLogger)))?;
113 ctx.set_on_new_subscription(Some(&Handler::leak(AeronNewSubscriptionLogger)))?;
114 ctx.set_on_unavailable_counter(Some(&Handler::leak(AeronUnavailableCounterLogger)))?;
115 ctx.set_on_available_counter(Some(&Handler::leak(AeronAvailableCounterLogger)))?;
116 ctx.set_on_new_exclusive_publication(Some(&Handler::leak(AeronNewPublicationLogger)))?;
117
118 info!("creating client [simple_large_send test]");
119 let aeron = Aeron::new(&ctx)?;
120 info!("starting client");
121
122 aeron.start()?;
123 info!("client started");
124 let publisher = aeron.add_publication(AERON_IPC_STREAM, 123, Duration::from_secs(5))?;
125 info!("created publisher");
126
127 assert!(AeronCncMetadata::load_from_file(ctx.get_dir())?.pid > 0);
128 let cstr = std::ffi::CString::new(ctx.get_dir()).unwrap();
129 AeronCncMetadata::read_from_file(&cstr, |cnc| {
130 assert!(cnc.pid > 0);
131 })?;
132 assert!(AeronCnc::new_on_heap(ctx.get_dir())?.get_to_driver_heartbeat_ms()? > 0);
133 let cstr = std::ffi::CString::new(ctx.get_dir()).unwrap();
134 for _ in 0..50 {
135 AeronCnc::read_on_partial_stack(&cstr, |cnc| {
136 assert!(cnc.get_to_driver_heartbeat_ms().unwrap() > 0);
137 })?;
138 }
139
140 let subscription = aeron.add_subscription(
141 AERON_IPC_STREAM,
142 123,
143 Handlers::no_available_image_handler(),
144 Handlers::no_unavailable_image_handler(),
145 Duration::from_secs(5),
146 )?;
147 info!("created subscription");
148
149 subscription
150 .poll_once(|msg, header| println!("foo"), 1024)
151 .unwrap();
152
153 let string_len = media_driver_ctx.ipc_mtu_length * 100;
155 info!("string length: {}", string_len);
156
157 let publisher_handler = {
158 let stop = stop.clone();
159 std::thread::spawn(move || {
160 let binding = "1".repeat(string_len);
161 let large_msg = binding.as_bytes();
162 loop {
163 if stop.load(Ordering::Acquire) || publisher.is_closed() {
164 break;
165 }
166 let result =
167 publisher.offer(large_msg, Handlers::no_reserved_value_supplier_handler());
168
169 assert_eq!(123, publisher.get_constants().unwrap().stream_id);
170
171 if result < large_msg.len() as i64 {
172 let error = AeronCError::from_code(result as i32);
173 match error.kind() {
174 AeronErrorType::PublicationBackPressured
175 | AeronErrorType::PublicationAdminAction => {
176 }
178 _ => {
179 error!(
180 "ERROR: failed to send message {:?}",
181 AeronCError::from_code(result as i32)
182 );
183 }
184 }
185 sleep(Duration::from_millis(500));
186 }
187 }
188 info!("stopping publisher thread");
189 })
190 };
191
192 let mut assembler = AeronFragmentClosureAssembler::new()?;
193
194 struct Context {
195 count: Arc<AtomicUsize>,
196 stop: Arc<AtomicBool>,
197 string_len: usize,
198 }
199
200 let count = Arc::new(AtomicUsize::new(0usize));
201 let mut context = Context {
202 count: count.clone(),
203 stop: stop.clone(),
204 string_len,
205 };
206
207 let start_time = Instant::now();
209
210 loop {
211 if start_time.elapsed() > Duration::from_secs(30) {
212 info!("Failed: exceeded 30-second timeout");
213 return Err(Box::new(std::io::Error::new(
214 std::io::ErrorKind::TimedOut,
215 "Timeout exceeded",
216 )));
217 }
218 let c = count.load(Ordering::SeqCst);
219 if c > 100 {
220 break;
221 }
222
223 fn process_msg(ctx: &mut Context, buffer: &[u8], header: AeronHeader) {
224 ctx.count.fetch_add(1, Ordering::SeqCst);
225
226 let values = header.get_values().unwrap();
227 assert_ne!(values.frame.session_id, 0);
228
229 if buffer.len() != ctx.string_len {
230 ctx.stop.store(true, Ordering::SeqCst);
231 error!(
232 "ERROR: message was {} but was expecting {} [header={:?}]",
233 buffer.len(),
234 ctx.string_len,
235 header
236 );
237 sleep(Duration::from_secs(1));
238 }
239
240 assert_eq!(buffer.len(), ctx.string_len);
241 assert_eq!(buffer, "1".repeat(ctx.string_len).as_bytes());
242 }
243
244 subscription.poll(assembler.process(&mut context, process_msg), 128)?;
245 assert_eq!(123, subscription.get_constants().unwrap().stream_id);
246 }
247
248 subscription.close(Handlers::no_notification_handler())?;
249
250 info!("stopping client");
251 stop.store(true, Ordering::SeqCst);
252
253 let _ = publisher_handler.join().unwrap();
254 let _ = driver_handle.join().unwrap();
255
256 let cnc = AeronCnc::new_on_heap(ctx.get_dir())?;
257 cnc.counters_reader().foreach_counter_once(
258 |value: i64, id: i32, type_id: i32, key: &[u8], label: &str| {
259 println!("counter reader id={id}, type_id={type_id}, key={key:?}, label={label}, value={value} [type={:?}]",
260 AeronSystemCounterType::try_from(type_id));
261 },
262 );
263 cnc.error_log_read_once(| observation_count: i32,
264 first_observation_timestamp: i64,
265 last_observation_timestamp: i64,
266 error: &str| {
267 println!("error: {error} observationCount={observation_count}, first_observation_timestamp={first_observation_timestamp}, last_observation_timestamp={last_observation_timestamp}");
268 }, 0);
269 cnc.loss_reporter_read_once(| observation_count: i64,
270 total_bytes_lost: i64,
271 first_observation_timestamp: i64,
272 last_observation_timestamp: i64,
273 session_id: i32,
274 stream_id: i32,
275 channel: &str,
276 source: &str,| {
277 println!("loss reporter observationCount={observation_count}, totalBytesLost={total_bytes_lost}, first_observed={first_observation_timestamp}, last_observed={last_observation_timestamp}, session_id={session_id}, stream_id={stream_id}, channel={channel} source={source}");
278 })?;
279
280 Ok(())
281 }
282
283 #[test]
284 #[serial]
285 pub fn try_claim() -> Result<(), Box<dyn error::Error>> {
286 let _ = env_logger::Builder::new()
287 .is_test(true)
288 .filter_level(log::LevelFilter::Info)
289 .try_init();
290 let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
291 media_driver_ctx.set_dir_delete_on_shutdown(true)?;
292 media_driver_ctx.set_dir_delete_on_start(true)?;
293 media_driver_ctx.set_dir(
294 &format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock()).into_c_string(),
295 )?;
296 let (stop, driver_handle) =
297 rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);
298
299 let ctx = AeronContext::new()?;
300 ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
301 assert_eq!(media_driver_ctx.get_dir(), ctx.get_dir());
302 ctx.set_error_handler(Some(&Handler::leak(ErrorCount::default())))?;
303
304 info!("creating client [try_claim test]");
305 let aeron = Aeron::new(&ctx)?;
306 info!("starting client");
307
308 aeron.start()?;
309 info!("client started");
310 let publisher = aeron.add_publication(AERON_IPC_STREAM, 123, Duration::from_secs(5))?;
311 info!("created publisher");
312
313 let subscription = aeron.add_subscription(
314 AERON_IPC_STREAM,
315 123,
316 Handlers::no_available_image_handler(),
317 Handlers::no_unavailable_image_handler(),
318 Duration::from_secs(5),
319 )?;
320 info!("created subscription");
321
322 let string_len = 156;
324 info!("string length: {}", string_len);
325
326 let publisher_handler = {
327 let stop = stop.clone();
328 std::thread::spawn(move || {
329 let binding = "1".repeat(string_len);
330 let msg = binding.as_bytes();
331 let buffer = AeronBufferClaim::default();
332 loop {
333 if stop.load(Ordering::Acquire) || publisher.is_closed() {
334 break;
335 }
336
337 let result = publisher.try_claim(string_len, &buffer);
338
339 if result < msg.len() as i64 {
340 error!(
341 "ERROR: failed to send message {:?}",
342 AeronCError::from_code(result as i32)
343 );
344 } else {
345 buffer.data().write_all(&msg).unwrap();
346 buffer.commit().unwrap();
347 }
348 }
349 info!("stopping publisher thread");
350 })
351 };
352
353 let count = Arc::new(AtomicUsize::new(0usize));
354 let count_copy = Arc::clone(&count);
355 let stop2 = stop.clone();
356
357 struct FragmentHandler {
358 count_copy: Arc<AtomicUsize>,
359 stop2: Arc<AtomicBool>,
360 string_len: usize,
361 }
362
363 impl AeronFragmentHandlerCallback for FragmentHandler {
364 fn handle_aeron_fragment_handler(&mut self, buffer: &[u8], header: AeronHeader) {
365 self.count_copy.fetch_add(1, Ordering::SeqCst);
366
367 if buffer.len() != self.string_len {
368 self.stop2.store(true, Ordering::SeqCst);
369 error!(
370 "ERROR: message was {} but was expecting {} [header={:?}]",
371 buffer.len(),
372 self.string_len,
373 header
374 );
375 sleep(Duration::from_secs(1));
376 }
377
378 assert_eq!(buffer.len(), self.string_len);
379 assert_eq!(buffer, "1".repeat(self.string_len).as_bytes());
380 }
381 }
382
383 let (closure, _inner) = Handler::leak_with_fragment_assembler(FragmentHandler {
384 count_copy,
385 stop2,
386 string_len,
387 })?;
388 let start_time = Instant::now();
389
390 loop {
391 if start_time.elapsed() > Duration::from_secs(30) {
392 info!("Failed: exceeded 30-second timeout");
393 return Err(Box::new(std::io::Error::new(
394 std::io::ErrorKind::TimedOut,
395 "Timeout exceeded",
396 )));
397 }
398 let c = count.load(Ordering::SeqCst);
399 if c > 100 {
400 break;
401 }
402 subscription.poll(Some(&closure), 128)?;
403 }
404
405 info!("stopping client");
406
407 stop.store(true, Ordering::SeqCst);
408
409 let _ = publisher_handler.join().unwrap();
410 let _ = driver_handle.join().unwrap();
411 Ok(())
412 }
413
414 #[test]
415 #[serial]
416 pub fn counters() -> Result<(), Box<dyn error::Error>> {
417 let _ = env_logger::Builder::new()
418 .is_test(true)
419 .filter_level(log::LevelFilter::Info)
420 .try_init();
421 let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
422 media_driver_ctx.set_dir_delete_on_shutdown(true)?;
423 media_driver_ctx.set_dir_delete_on_start(true)?;
424 media_driver_ctx.set_dir(
425 &format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock()).into_c_string(),
426 )?;
427 let (stop, driver_handle) =
428 rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);
429
430 let ctx = AeronContext::new()?;
431 ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
432 assert_eq!(media_driver_ctx.get_dir(), ctx.get_dir());
433 ctx.set_error_handler(Some(&Handler::leak(ErrorCount::default())))?;
434 ctx.set_on_unavailable_counter(Some(&Handler::leak(AeronUnavailableCounterLogger)))?;
435
436 struct AvailableCounterHandler {
437 found_counter: bool,
438 }
439
440 impl AeronAvailableCounterCallback for AvailableCounterHandler {
441 fn handle_aeron_on_available_counter(
442 &mut self,
443 counters_reader: AeronCountersReader,
444 registration_id: i64,
445 counter_id: i32,
446 ) -> () {
447 info!(
448 "on counter key={:?}, label={:?} registration_id={registration_id}, counter_id={counter_id}, value={}, {counters_reader:?}",
449 String::from_utf8(counters_reader.get_counter_key(counter_id).unwrap()),
450 counters_reader.get_counter_label(counter_id, 1000),
451 counters_reader.addr(counter_id)
452 );
453
454 assert_eq!(
455 counters_reader.counter_registration_id(counter_id).unwrap(),
456 registration_id
457 );
458
459 if let Ok(label) = counters_reader.get_counter_label(counter_id, 1000) {
460 if label == "label_buffer" {
461 self.found_counter = true;
462 assert_eq!(
463 &counters_reader.get_counter_key(counter_id).unwrap(),
464 "key".as_bytes()
465 );
466 }
467 }
468 }
469 }
470
471 let handler = &Handler::leak(AvailableCounterHandler {
472 found_counter: false,
473 });
474 ctx.set_on_available_counter(Some(handler))?;
475
476 info!("creating client");
477 let aeron = Aeron::new(&ctx)?;
478 info!("starting client");
479
480 aeron.start()?;
481 info!("client started [counters test]");
482
483 let counter = aeron.add_counter(
484 123,
485 "key".as_bytes(),
486 "label_buffer",
487 Duration::from_secs(5),
488 )?;
489 let constants = counter.get_constants()?;
490 let counter_id = constants.counter_id;
491
492 let publisher_handler = {
493 let stop = stop.clone();
494 let counter = counter.clone();
495 std::thread::spawn(move || {
496 for _ in 0..150 {
497 if stop.load(Ordering::Acquire) || counter.is_closed() {
498 break;
499 }
500 counter.addr_atomic().fetch_add(1, Ordering::SeqCst);
501 }
502 info!("stopping publisher thread");
503 })
504 };
505
506 let now = Instant::now();
507 while counter.addr_atomic().load(Ordering::SeqCst) < 100
508 && now.elapsed() < Duration::from_secs(10)
509 {
510 sleep(Duration::from_micros(10));
511 }
512
513 assert!(now.elapsed() < Duration::from_secs(10));
514
515 info!(
516 "counter is {}",
517 counter.addr_atomic().load(Ordering::SeqCst)
518 );
519
520 info!("stopping client");
521
522 #[cfg(not(target_os = "windows"))] assert!(handler.found_counter);
524
525 stop.store(true, Ordering::SeqCst);
526
527 let reader = aeron.counters_reader();
528 assert_eq!(reader.get_counter_label(counter_id, 256)?, "label_buffer");
529 assert_eq!(reader.get_counter_key(counter_id)?, "key".as_bytes());
530 let buffers = AeronCountersReaderBuffers::default();
531 reader.get_buffers(&buffers)?;
532
533 let _ = publisher_handler.join().unwrap();
534 let _ = driver_handle.join().unwrap();
535 Ok(())
536 }
537
538 #[derive(Default, Debug)]
540 struct TestErrorCount {
541 pub error_count: usize,
542 }
543
544 impl Drop for TestErrorCount {
545 fn drop(&mut self) {
546 info!("TestErrorCount dropped with {} errors", self.error_count);
547 }
548 }
549
550 impl AeronErrorHandlerCallback for TestErrorCount {
551 fn handle_aeron_error_handler(&mut self, error_code: c_int, msg: &str) {
552 error!("Aeron error {}: {}", error_code, msg);
553 self.error_count += 1;
554 }
555 }
556
557 #[test]
558 #[serial]
559 pub fn backpressure_recovery_test() -> Result<(), Box<dyn error::Error>> {
560 let _ = env_logger::Builder::new()
561 .is_test(true)
562 .filter_level(log::LevelFilter::Info)
563 .try_init();
564
565 let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
566 media_driver_ctx.set_dir_delete_on_shutdown(true)?;
567 media_driver_ctx.set_dir_delete_on_start(true)?;
568 media_driver_ctx.set_dir(
569 &format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock()).into_c_string(),
570 )?;
571 let (stop, driver_handle) =
572 rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);
573
574 let ctx = AeronContext::new()?;
575 ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
576 ctx.set_error_handler(Some(&Handler::leak(TestErrorCount::default())))?;
577
578 let aeron = Aeron::new(&ctx)?;
579 aeron.start()?;
580
581 let publisher = aeron.add_publication(AERON_IPC_STREAM, 123, Duration::from_secs(5))?;
582 let subscription = aeron.add_subscription(
583 AERON_IPC_STREAM,
584 123,
585 Handlers::no_available_image_handler(),
586 Handlers::no_unavailable_image_handler(),
587 Duration::from_secs(5),
588 )?;
589
590 let count = Arc::new(AtomicUsize::new(0));
591 let start_time = Instant::now();
592
593 let publisher_thread = {
595 let stop = stop.clone();
596 std::thread::spawn(move || {
597 while !stop.load(Ordering::Acquire) {
598 let msg = b"test";
599 let result =
600 publisher.offer(msg, Handlers::no_reserved_value_supplier_handler());
601 if result == AeronErrorType::PublicationBackPressured.code() as i64 {
603 sleep(Duration::from_millis(50));
604 }
605 }
606 })
607 };
608
609 while count.load(Ordering::SeqCst) < 50 && start_time.elapsed() < Duration::from_secs(10) {
611 let _ = subscription.poll_once(
612 |_msg, _header| {
613 count.fetch_add(1, Ordering::SeqCst);
614 },
615 128,
616 )?;
617 }
618
619 stop.store(true, Ordering::SeqCst);
620 publisher_thread.join().unwrap();
621 let _ = driver_handle.join().unwrap();
622
623 assert!(
624 count.load(Ordering::SeqCst) >= 50,
625 "Expected at least 50 messages received"
626 );
627 Ok(())
628 }
629
630 #[test]
631 #[serial]
632 pub fn multi_subscription_test() -> Result<(), Box<dyn error::Error>> {
633 let _ = env_logger::Builder::new()
634 .is_test(true)
635 .filter_level(log::LevelFilter::Info)
636 .try_init();
637
638 let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
639 media_driver_ctx.set_dir_delete_on_shutdown(true)?;
640 media_driver_ctx.set_dir_delete_on_start(true)?;
641 media_driver_ctx.set_dir(
642 &format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock()).into_c_string(),
643 )?;
644 let (_stop, driver_handle) =
645 rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);
646
647 let ctx = AeronContext::new()?;
648 ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
649 ctx.set_error_handler(Some(&Handler::leak(TestErrorCount::default())))?;
650
651 let aeron = Aeron::new(&ctx)?;
652 aeron.start()?;
653 let publisher = aeron.add_publication(AERON_IPC_STREAM, 123, Duration::from_secs(5))?;
654
655 let subscription1 = aeron.add_subscription(
657 AERON_IPC_STREAM,
658 123,
659 Handlers::no_available_image_handler(),
660 Handlers::no_unavailable_image_handler(),
661 Duration::from_secs(5),
662 )?;
663 let subscription2 = aeron.add_subscription(
664 AERON_IPC_STREAM,
665 123,
666 Handlers::no_available_image_handler(),
667 Handlers::no_unavailable_image_handler(),
668 Duration::from_secs(5),
669 )?;
670
671 let count1 = Arc::new(AtomicUsize::new(0));
672 let count2 = Arc::new(AtomicUsize::new(0));
673
674 let msg = b"hello multi-subscription";
676 let result = publisher.offer(msg, Handlers::no_reserved_value_supplier_handler());
677 assert!(
678 result >= msg.len() as i64,
679 "Message should be sent successfully"
680 );
681
682 let start_time = Instant::now();
683 while (count1.load(Ordering::SeqCst) < 1 || count2.load(Ordering::SeqCst) < 1)
685 && start_time.elapsed() < Duration::from_secs(5)
686 {
687 let _ = subscription1.poll_once(
688 |_msg, _header| {
689 count1.fetch_add(1, Ordering::SeqCst);
690 },
691 128,
692 )?;
693 let _ = subscription2.poll_once(
694 |_msg, _header| {
695 count2.fetch_add(1, Ordering::SeqCst);
696 },
697 128,
698 )?;
699 }
700
701 assert!(
702 count1.load(Ordering::SeqCst) >= 1,
703 "Subscription 1 did not receive the message"
704 );
705 assert!(
706 count2.load(Ordering::SeqCst) >= 1,
707 "Subscription 2 did not receive the message"
708 );
709
710 _stop.store(true, Ordering::SeqCst);
711 let _ = driver_handle.join().unwrap();
712 Ok(())
713 }
714
715 #[test]
716 #[serial]
717 pub fn should_be_able_to_drop_after_close_manually_being_closed(
718 ) -> Result<(), Box<dyn error::Error>> {
719 let _ = env_logger::Builder::new()
720 .is_test(true)
721 .filter_level(log::LevelFilter::Info)
722 .try_init();
723
724 let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
725 media_driver_ctx.set_dir_delete_on_shutdown(true)?;
726 media_driver_ctx.set_dir_delete_on_start(true)?;
727 media_driver_ctx.set_dir(
728 &format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock()).into_c_string(),
729 )?;
730 let (_stop, driver_handle) =
731 rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);
732
733 let ctx = AeronContext::new()?;
734 ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
735 ctx.set_error_handler(Some(&Handler::leak(AeronErrorHandlerLogger)))?;
736
737 let aeron = Aeron::new(&ctx)?;
738 aeron.start()?;
739
740 {
741 let publisher = aeron.add_publication(AERON_IPC_STREAM, 123, Duration::from_secs(5))?;
742 info!("created publication [sessionId={}]", publisher.session_id());
743 publisher.close_with_no_args()?;
744 drop(publisher);
745 }
746
747 {
748 let publisher = aeron.add_publication(AERON_IPC_STREAM, 124, Duration::from_secs(5))?;
749 info!("created publication [sessionId={}]", publisher.session_id());
750 publisher.close(Handlers::no_notification_handler())?;
751 drop(publisher);
752 }
753
754 {
755 let publisher = aeron.add_publication(AERON_IPC_STREAM, 125, Duration::from_secs(5))?;
756 publisher.close_once(|| println!("on close"))?;
757 info!("created publication [sessionId={}]", publisher.session_id());
758 drop(publisher);
759 }
760
761 Ok(())
762 }
763
764 #[test]
765 #[serial]
766 pub fn offer_on_closed_publication_error_test() -> Result<(), Box<dyn error::Error>> {
767 let _ = env_logger::Builder::new()
768 .is_test(true)
769 .filter_level(log::LevelFilter::Info)
770 .try_init();
771
772 let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
773 media_driver_ctx.set_dir_delete_on_shutdown(true)?;
774 media_driver_ctx.set_dir_delete_on_start(true)?;
775 media_driver_ctx.set_dir(
776 &format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock()).into_c_string(),
777 )?;
778 let (_stop, driver_handle) =
779 rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);
780
781 let ctx = AeronContext::new()?;
782 ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
783 ctx.set_error_handler(Some(&Handler::leak(TestErrorCount::default())))?;
784
785 let aeron = Aeron::new(&ctx)?;
786 aeron.start()?;
787 let publisher = aeron.add_publication(AERON_IPC_STREAM, 123, Duration::from_secs(5))?;
788
789 publisher.close(Handlers::no_notification_handler())?;
791
792 let result = publisher.offer(
794 b"should fail",
795 Handlers::no_reserved_value_supplier_handler(),
796 );
797 assert!(
798 result < 0,
799 "Offering on a closed publication should return a negative error code"
800 );
801
802 _stop.store(true, Ordering::SeqCst);
803 let _ = driver_handle.join().unwrap();
804 Ok(())
805 }
806
807 #[test]
809 #[serial]
810 pub fn empty_message_test() -> Result<(), Box<dyn error::Error>> {
811 let _ = env_logger::Builder::new()
812 .is_test(true)
813 .filter_level(log::LevelFilter::Info)
814 .try_init();
815
816 let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
817 media_driver_ctx.set_dir_delete_on_shutdown(true)?;
818 media_driver_ctx.set_dir_delete_on_start(true)?;
819 media_driver_ctx.set_dir(
820 &format!("{}{}", media_driver_ctx.get_dir(), Aeron::epoch_clock()).into_c_string(),
821 )?;
822 let (_stop, driver_handle) =
823 rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);
824
825 let ctx = AeronContext::new()?;
826 ctx.set_dir(&media_driver_ctx.get_dir().into_c_string())?;
827 ctx.set_error_handler(Some(&Handler::leak(TestErrorCount::default())))?;
828
829 let aeron = Aeron::new(&ctx)?;
830 aeron.start()?;
831 let publisher = aeron.add_publication(AERON_IPC_STREAM, 123, Duration::from_secs(5))?;
832 let subscription = aeron.add_subscription(
833 AERON_IPC_STREAM,
834 123,
835 Handlers::no_available_image_handler(),
836 Handlers::no_unavailable_image_handler(),
837 Duration::from_secs(5),
838 )?;
839
840 let empty_received = Arc::new(AtomicBool::new(false));
841 let start_time = Instant::now();
842
843 let result = publisher.offer(b"", Handlers::no_reserved_value_supplier_handler());
844 assert!(result > 0);
845
846 while !empty_received.load(Ordering::SeqCst)
847 && start_time.elapsed() < Duration::from_secs(5)
848 {
849 let _ = subscription.poll_once(
850 |msg, _header| {
851 if msg.is_empty() {
852 empty_received.store(true, Ordering::SeqCst);
853 }
854 },
855 128,
856 )?;
857 }
858
859 assert!(
860 empty_received.load(Ordering::SeqCst),
861 "Empty message was not received"
862 );
863 _stop.store(true, Ordering::SeqCst);
864 let _ = driver_handle.join().unwrap();
865 Ok(())
866 }
867
868 #[test]
869 #[serial]
870 #[ignore] pub fn tags() -> Result<(), Box<dyn error::Error>> {
872 let _ = env_logger::Builder::new()
873 .is_test(true)
874 .filter_level(log::LevelFilter::Debug)
875 .try_init();
876
877 let (md_ctx, stop, md) = start_media_driver(1)?;
878
879 let (_a_ctx2, aeron_sub) = create_client(&md_ctx)?;
880
881 info!("creating suscriber 1");
882 let sub = aeron_sub
883 .add_subscription(
884 &"aeron:udp?tags=100".into_c_string(),
885 123,
886 Handlers::no_available_image_handler(),
887 Handlers::no_unavailable_image_handler(),
888 Duration::from_secs(50),
889 )
890 .map_err(|e| {
891 error!("aeron error={}", aeron_sub.errmsg());
892 e
893 })?;
894
895 let ctx = AeronContext::new()?;
896 ctx.set_dir(&aeron_sub.context().get_dir().into_c_string())?;
897 let aeron = Aeron::new(&ctx)?;
898 aeron.start()?;
899
900 info!("creating suscriber 2");
901 let sub2 = aeron_sub.add_subscription(
902 &"aeron:udp?tags=100".into_c_string(),
903 123,
904 Handlers::no_available_image_handler(),
905 Handlers::no_unavailable_image_handler(),
906 Duration::from_secs(50),
907 )?;
908
909 let (_a_ctx1, aeron_pub) = create_client(&md_ctx)?;
910 info!("creating publisher");
911 assert!(!aeron_pub.is_closed());
912 let publisher = aeron_pub
913 .add_publication(
914 &"aeron:udp?endpoint=localhost:4040|alias=test|tags=100".into_c_string(),
915 123,
916 Duration::from_secs(5),
917 )
918 .map_err(|e| {
919 error!("aeron error={}", aeron_pub.errmsg());
920 e
921 })?;
922
923 info!("publishing msg");
924
925 loop {
926 let result = publisher.offer(
927 "213".as_bytes(),
928 Handlers::no_reserved_value_supplier_handler(),
929 );
930 if result < 0 {
931 error!(
932 "failed to publish {:?}",
933 AeronCError::from_code(result as i32)
934 );
935 } else {
936 break;
937 }
938 }
939
940 sub.poll_once(
941 |msg, _header| {
942 println!("Received message: {:?}", msg);
943 },
944 128,
945 )?;
946 sub2.poll_once(
947 |msg, _header| {
948 println!("Received message: {:?}", msg);
949 },
950 128,
951 )?;
952
953 stop.store(true, Ordering::SeqCst);
954
955 Ok(())
956 }
957
958 fn create_client(
959 media_driver_ctx: &AeronDriverContext,
960 ) -> Result<(AeronContext, Aeron), Box<dyn Error>> {
961 let dir = media_driver_ctx.get_dir();
962 info!("creating aeron client [dir={}]", dir);
963 let ctx = AeronContext::new()?;
964 ctx.set_dir(&dir.into_c_string())?;
965 ctx.set_error_handler(Some(&Handler::leak(TestErrorCount::default())))?;
966 let aeron = Aeron::new(&ctx)?;
967 aeron.start()?;
968 Ok((ctx, aeron))
969 }
970
971 fn start_media_driver(
972 instance: u64,
973 ) -> Result<
974 (
975 AeronDriverContext,
976 Arc<AtomicBool>,
977 JoinHandle<Result<(), rusteron_media_driver::AeronCError>>,
978 ),
979 Box<dyn Error>,
980 > {
981 let media_driver_ctx = rusteron_media_driver::AeronDriverContext::new()?;
982 media_driver_ctx.set_dir_delete_on_shutdown(true)?;
983 media_driver_ctx.set_dir_delete_on_start(true)?;
984 media_driver_ctx.set_dir(
985 &format!(
986 "{}{}-{}",
987 media_driver_ctx.get_dir(),
988 Aeron::epoch_clock(),
989 instance
990 )
991 .into_c_string(),
992 )?;
993 let (stop, driver_handle) =
994 rusteron_media_driver::AeronDriver::launch_embedded(media_driver_ctx.clone(), false);
995 Ok((media_driver_ctx, stop, driver_handle))
996 }
997
998 #[doc = include_str!("../../README.md")]
999 mod readme_tests {}
1000}