1#![allow(non_upper_case_globals)]
3#![allow(non_camel_case_types)]
4#![allow(non_snake_case)]
5#![allow(clippy::all)]
6#![allow(unused_unsafe)]
7#![allow(unused_variables)]
8#![doc = include_str!("../README.md")]
/// Contents of the `bindings.rs` file found in `OUT_DIR` at compile time.
///
/// NOTE(review): presumably the raw C bindings emitted by the build script — confirm
/// against `build.rs`, which is not visible from this file.
pub mod bindings {
    include!(concat!(env!("OUT_DIR"), "/bindings.rs"));
}
20
21use bindings::*;
22use std::cell::Cell;
23use std::os::raw::c_int;
24use std::time::{Duration, Instant};
25
26pub mod testing;
27
// The two files below are read from `OUT_DIR` and spliced directly into the crate root,
// so every item they define lives at the top level of this crate.
include!(concat!(env!("OUT_DIR"), "/aeron.rs"));
include!(concat!(env!("OUT_DIR"), "/aeron_custom.rs"));

/// Shorter name for the archive source-location type declared in [`bindings`].
pub type SourceLocation = bindings::aeron_archive_source_location_t;
/// Alias for `AERON_ARCHIVE_SOURCE_LOCATION_LOCAL`.
pub const SOURCE_LOCATION_LOCAL: aeron_archive_source_location_en =
    SourceLocation::AERON_ARCHIVE_SOURCE_LOCATION_LOCAL;
/// Alias for `AERON_ARCHIVE_SOURCE_LOCATION_REMOTE`.
pub const SOURCE_LOCATION_REMOTE: aeron_archive_source_location_en =
    SourceLocation::AERON_ARCHIVE_SOURCE_LOCATION_REMOTE;
36
37pub struct NoOpAeronIdleStrategyFunc;
38
39impl AeronIdleStrategyFuncCallback for NoOpAeronIdleStrategyFunc {
40 fn handle_aeron_idle_strategy_func(&mut self, _work_count: c_int) -> () {}
41}
42
43pub struct RecordingPos;
44impl RecordingPos {
45 pub fn find_counter_id_by_session(
46 counter_reader: &AeronCountersReader,
47 session_id: i32,
48 ) -> i32 {
49 unsafe {
50 aeron_archive_recording_pos_find_counter_id_by_session_id(
51 counter_reader.get_inner(),
52 session_id,
53 )
54 }
55 }
56 pub fn find_counter_id_by_recording_id(
57 counter_reader: &AeronCountersReader,
58 recording_id: i64,
59 ) -> i32 {
60 unsafe {
61 aeron_archive_recording_pos_find_counter_id_by_recording_id(
62 counter_reader.get_inner(),
63 recording_id,
64 )
65 }
66 }
67
68 pub fn get_recording_id_block(
71 counters_reader: &AeronCountersReader,
72 counter_id: i32,
73 wait: Duration,
74 ) -> Result<i64, AeronCError> {
75 let mut result = Self::get_recording_id(counters_reader, counter_id);
76 let instant = Instant::now();
77
78 while result.is_err() && instant.elapsed() < wait {
79 result = Self::get_recording_id(counters_reader, counter_id);
80 #[cfg(debug_assertions)]
81 std::thread::sleep(Duration::from_millis(10));
82 }
83
84 return result;
85 }
86
87 pub fn get_recording_id(
90 counters_reader: &AeronCountersReader,
91 counter_id: i32,
92 ) -> Result<i64, AeronCError> {
93 pub const RECORDING_POSITION_TYPE_ID: i32 = 100;
96
97 pub const RECORD_ALLOCATED: i32 = 1;
99
100 pub const NULL_RECORDING_ID: i64 = -1;
102
103 if counter_id < 0 {
104 return Err(AeronCError::from_code(NULL_RECORDING_ID as i32));
105 }
106
107 let state = counters_reader.counter_state(counter_id)?;
108 if state != RECORD_ALLOCATED {
109 return Err(AeronCError::from_code(NULL_RECORDING_ID as i32));
110 }
111
112 let type_id = counters_reader.counter_type_id(counter_id)?;
113 if type_id != RECORDING_POSITION_TYPE_ID {
114 return Err(AeronCError::from_code(NULL_RECORDING_ID as i32));
115 }
116
117 let recording_id = Cell::new(-1);
123 counters_reader.foreach_counter_once(|value, id, type_id, key, label| {
124 if id == counter_id && type_id == RECORDING_POSITION_TYPE_ID {
125 let mut val = [0u8; 8];
126 val.copy_from_slice(&key[0..8]);
127 let Ok(value) = i64::from_le_bytes(val).try_into();
128 recording_id.set(value);
129 }
130 });
131 let recording_id = recording_id.get();
132 if recording_id < 0 {
133 return Err(AeronCError::from_code(NULL_RECORDING_ID as i32));
134 }
135
136 Ok(recording_id)
137 }
138}
139
140unsafe extern "C" fn default_encoded_credentials(
141 _clientd: *mut std::os::raw::c_void,
142) -> *mut aeron_archive_encoded_credentials_t {
143 let empty_credentials = Box::new(aeron_archive_encoded_credentials_t {
145 data: std::ptr::null(),
146 length: 0,
147 });
148 Box::into_raw(empty_credentials)
149}
150
151impl AeronArchive {
152 pub fn aeron(&self) -> Aeron {
153 self.get_archive_context().get_aeron()
154 }
155}
156
157impl AeronArchiveAsyncConnect {
158 #[inline]
159 pub fn new_with_aeron(ctx: &AeronArchiveContext, aeron: &Aeron) -> Result<Self, AeronCError> {
161 let resource_async = Self::new(ctx)?;
162 resource_async
163 .owned_inner
164 .clone()
165 .unwrap()
166 .add_dependency(aeron.clone());
167 Ok(resource_async)
168 }
169}
170
171macro_rules! impl_archive_position_methods {
172 ($pub_type:ty) => {
173 impl $pub_type {
174 pub fn get_archive_position(&self) -> Result<i64, AeronCError> {
177 if let Some(aeron) = self.owned_inner.clone().unwrap().get_dependency::<Aeron>() {
178 let counter_reader = &aeron.counters_reader();
179 self.get_archive_position_with(counter_reader)
180 } else {
181 Err(AeronCError::from_code(-1))
182 }
183 }
184
185 pub fn get_archive_position_with(
188 &self,
189 counters: &AeronCountersReader,
190 ) -> Result<i64, AeronCError> {
191 let session_id = self.get_constants()?.session_id();
192 let counter_id = RecordingPos::find_counter_id_by_session(counters, session_id);
193 if counter_id < 0 {
194 return Err(AeronCError::from_code(counter_id));
195 }
196 let position = counters.get_counter_value(counter_id);
197 if position < 0 {
198 return Err(AeronCError::from_code(position as i32));
199 }
200 Ok(position)
201 }
202
203 pub fn is_archive_position_with(&self, length_inclusive: usize) -> bool {
206 let archive_position = self.get_archive_position().unwrap_or(-1);
207 if archive_position < 0 {
208 return false;
209 }
210 self.position() - archive_position <= length_inclusive as i64
211 }
212 }
213 };
214}
215
216impl_archive_position_methods!(AeronPublication);
217impl_archive_position_methods!(AeronExclusivePublication);
218
219impl AeronArchiveContext {
220 pub fn set_no_credentials_supplier(&self) -> Result<i32, AeronCError> {
224 self.set_credentials_supplier(
225 Some(default_encoded_credentials),
226 None,
227 None::<&Handler<AeronArchiveCredentialsFreeFuncLogger>>,
228 )
229 }
230
231 pub fn new_with_no_credentials_supplier(
235 aeron: &Aeron,
236 request_control_channel: &str,
237 response_control_channel: &str,
238 recording_events_channel: &str,
239 ) -> Result<AeronArchiveContext, AeronCError> {
240 let context = Self::new()?;
241 context.set_no_credentials_supplier()?;
242 context.set_aeron(aeron)?;
243 context.set_control_request_channel(request_control_channel)?;
244 context.set_control_response_channel(response_control_channel)?;
245 context.set_recording_events_channel(recording_events_channel)?;
246 context.set_idle_strategy(Some(&Handler::leak(NoOpAeronIdleStrategyFunc)))?;
248 Ok(context)
249 }
250}
251
252#[cfg(test)]
253mod tests {
254 use super::*;
255 use log::{error, info};
256
257 use crate::testing::EmbeddedArchiveMediaDriverProcess;
258 use serial_test::serial;
259 use std::cell::Cell;
260 use std::error;
261 use std::error::Error;
262 use std::str::FromStr;
263 use std::sync::atomic::{AtomicBool, Ordering};
264 use std::sync::Arc;
265 use std::thread::{sleep, JoinHandle};
266 use std::time::{Duration, Instant};
267
268 #[derive(Default, Debug)]
269 struct ErrorCount {
270 error_count: usize,
271 }
272
273 impl AeronErrorHandlerCallback for ErrorCount {
274 fn handle_aeron_error_handler(&mut self, error_code: c_int, msg: &str) {
275 error!("Aeron error {}: {}", error_code, msg);
276 self.error_count += 1;
277 }
278 }
279
    // Fixed archive channels handed to the embedded media driver in
    // `test_simple_replay_merge`.
    pub const ARCHIVE_CONTROL_REQUEST: &str = "aeron:udp?endpoint=localhost:8010";
    pub const ARCHIVE_CONTROL_RESPONSE: &str = "aeron:udp?endpoint=localhost:8011";
    pub const ARCHIVE_RECORDING_EVENTS: &str =
        "aeron:udp?control-mode=dynamic|control=localhost:8012";
284
285 #[test]
286 fn test_uri_string_builder() -> Result<(), AeronCError> {
287 let builder = AeronUriStringBuilder::default();
288 builder.init_new()?;
289 builder
290 .media(Media::Udp)? .mtu_length(1024 * 64)?
292 .set_initial_position(127424949617280, 1182294755, 65536)?;
293 let uri = builder.build(1024)?;
294 assert_eq!("aeron:udp?term-id=-1168322114|term-length=65536|mtu=65536|init-term-id=1182294755|term-offset=33408", uri);
295
296 builder.init_new()?;
297 let uri = builder
298 .media(Media::Udp)?
299 .control_mode(ControlMode::Dynamic)?
300 .reliable(false)?
301 .ttl(2)?
302 .endpoint("localhost:1235")?
303 .control("localhost:1234")?
304 .build(1024)?;
305 assert_eq!("aeron:udp?ttl=2|control-mode=dynamic|endpoint=localhost:1235|control=localhost:1234|reliable=false", uri);
306
307 let uri = AeronUriStringBuilder::from_str("aeron:udp?endpoint=localhost:8010")?
308 .ttl(5)?
309 .build(1024)?;
310
311 assert_eq!("aeron:udp?ttl=5|endpoint=localhost:8010", uri);
312
313 let uri = uri.parse::<AeronUriStringBuilder>()?.ttl(6)?.build(1024)?;
314
315 assert_eq!("aeron:udp?ttl=6|endpoint=localhost:8010", uri);
316
317 Ok(())
318 }
319
    // Stream id, message prefix and endpoints shared by the replay-merge test and its
    // helpers; `STREAM_ID` is also reused by the error-path tests further down.
    pub const STREAM_ID: i32 = 1033;
    pub const MESSAGE_PREFIX: &str = "Message-Prefix-";
    pub const CONTROL_ENDPOINT: &str = "localhost:23265";
    pub const RECORDING_ENDPOINT: &str = "localhost:23266";
    pub const LIVE_ENDPOINT: &str = "localhost:23267";
    pub const REPLAY_ENDPOINT: &str = "localhost:0";
    /// End-to-end replay-merge scenario: starts an embedded archive media driver, runs
    /// a publisher thread whose stream is recorded, then performs a replay merge from a
    /// second Aeron client and finally stops and joins the publisher.
    #[test]
    #[serial]
    fn test_simple_replay_merge() -> Result<(), AeronCError> {
        // Logger initialisation is best-effort; the result of `try_init` is discarded.
        let _ = env_logger::Builder::new()
            .is_test(true)
            .filter_level(log::LevelFilter::Info)
            .try_init();

        EmbeddedArchiveMediaDriverProcess::kill_all_java_processes()
            .expect("failed to kill all java processes");

        // The test endpoints are hard-coded, so fail fast if any port is taken.
        assert!(is_udp_port_available(23265));
        assert!(is_udp_port_available(23266));
        assert!(is_udp_port_available(23267));
        assert!(is_udp_port_available(23268));
        let id = Aeron::nano_clock();
        let aeron_dir = format!("target/aeron/{}/shm", id);
        // NOTE(review): `archive_dir` is computed but never used; the driver below is
        // started with `{aeron_dir}/archive` instead — confirm which one is intended.
        let archive_dir = format!("target/aeron/{}/archive", id);

        info!("starting archive media driver");
        let media_driver = EmbeddedArchiveMediaDriverProcess::build_and_start(
            &aeron_dir,
            &format!("{}/archive", aeron_dir),
            ARCHIVE_CONTROL_REQUEST,
            ARCHIVE_CONTROL_RESPONSE,
            ARCHIVE_RECORDING_EVENTS,
        )
        .expect("Failed to start embedded media driver");

        info!("connecting to archive");
        let (archive, aeron) = media_driver
            .archive_connect()
            .expect("Could not connect to archive client");

        // Flag used to tell the publisher thread to stop.
        let running = Arc::new(AtomicBool::new(true));

        info!("connected to archive, adding publication");
        assert!(!aeron.is_closed());

        let (session_id, publisher_thread) =
            reply_merge_publisher(&archive, aeron.clone(), running.clone())?;

        {
            // The subscriber side uses its own Aeron client and archive connection,
            // scoped to this block so they are dropped before the publisher is stopped.
            let context = AeronContext::new()?;
            context.set_dir(&media_driver.aeron_dir)?;
            let error_handler = Handler::leak(ErrorCount::default());
            context.set_error_handler(Some(&error_handler))?;
            let aeron = Aeron::new(&context)?;
            aeron.start()?;
            // Reuse the channels of the first archive connection for the second one.
            let aeron_archive_context = archive.get_archive_context();
            let aeron_archive_context = AeronArchiveContext::new_with_no_credentials_supplier(
                &aeron,
                aeron_archive_context.get_control_request_channel(),
                aeron_archive_context.get_control_response_channel(),
                aeron_archive_context.get_recording_events_channel(),
            )?;
            aeron_archive_context.set_error_handler(Some(&error_handler))?;
            let archive = AeronArchiveAsyncConnect::new_with_aeron(&aeron_archive_context, &aeron)?
                .poll_blocking(Duration::from_secs(30))
                .expect("failed to connect to archive");
            replay_merge_subscription(&archive, aeron.clone(), session_id)?;
        }

        // Signal the publisher loop to exit, then wait for the thread to finish.
        running.store(false, Ordering::Release);
        publisher_thread.join().unwrap();

        Ok(())
    }
396
    /// Adds a publication on `STREAM_ID`, starts recording it through `archive`, and
    /// spawns a thread that keeps publishing `MESSAGE_PREFIX`-numbered messages while
    /// `running` is `true`.
    ///
    /// Returns the publication's session id together with the thread handle.
    ///
    /// The spawned thread panics on exit if it never observed the archive caught up
    /// with the publication (see the final `assert!` in the closure).
    fn reply_merge_publisher(
        archive: &AeronArchive,
        aeron: Aeron,
        running: Arc<AtomicBool>,
    ) -> Result<(i32, JoinHandle<()>), AeronCError> {
        let publication = aeron.add_publication(
            &format!("aeron:udp?control={CONTROL_ENDPOINT}|control-mode=dynamic|term-length=65536"),
            STREAM_ID,
            Duration::from_secs(5),
        )?;

        info!(
            "publication {} [status={:?}]",
            publication.channel(),
            publication.channel_status()
        );
        assert_eq!(1, publication.channel_status());

        // The recording channel pins the session id so only this publication is recorded.
        let session_id = publication.session_id();
        let recording_channel = format!(
            "aeron:udp?endpoint={RECORDING_ENDPOINT}|control={CONTROL_ENDPOINT}|session-id={session_id}"
        );
        info!("recording channel {}", recording_channel);
        archive.start_recording(&recording_channel, STREAM_ID, SOURCE_LOCATION_REMOTE, true)?;

        info!("waiting for publisher to be connected");
        // No timeout here: this loops until the publication reports it is connected.
        while !publication.is_connected() {
            thread::sleep(Duration::from_millis(100));
        }
        info!("publisher to be connected");
        // NOTE(review): `counters_reader` is not used anywhere below.
        let counters_reader = aeron.counters_reader();
        let mut caught_up_count = 0;
        let publisher_thread = thread::spawn(move || {
            let mut message_count = 0;

            while running.load(Ordering::Acquire) {
                let message = format!("{}{}", MESSAGE_PREFIX, message_count);
                // Retry the offer every 10 ms until it returns a positive value.
                // `running` is not checked inside this inner loop.
                while publication.offer(
                    message.as_bytes(),
                    Handlers::no_reserved_value_supplier_handler(),
                ) <= 0
                {
                    thread::sleep(Duration::from_millis(10));
                }
                message_count += 1;
                if message_count % 10_000 == 0 {
                    info!(
                        "Published {} messages [position={}]",
                        message_count,
                        publication.position()
                    );
                }
                // After the first 10_000 messages, wait after every message until the
                // archive position has reached the publication position.
                if message_count > 10_000 {
                    while !publication.is_archive_position_with(0) {
                        thread::sleep(Duration::from_micros(300));
                    }
                    caught_up_count += 1;
                }
            }
            assert!(caught_up_count > 0);
            info!("Publisher thread terminated");
        });
        Ok((session_id, publisher_thread))
    }
465
    /// Runs a replay merge for the recording that belongs to `session_id` and polls it
    /// until it reports merged.
    ///
    /// Panics if the merge fails, if the archive or the Aeron client reports an error
    /// while polling, or if no more than 10_000 fragments were seen by the time the
    /// merge completed.
    fn replay_merge_subscription(
        archive: &AeronArchive,
        aeron: Aeron,
        session_id: i32,
    ) -> Result<(), AeronCError> {
        let replay_channel = format!("aeron:udp?session-id={session_id}");
        info!("replay channel {}", replay_channel);

        let replay_destination = format!("aeron:udp?endpoint={REPLAY_ENDPOINT}");
        info!("replay destination {}", replay_destination);

        let live_destination =
            format!("aeron:udp?endpoint={LIVE_ENDPOINT}|control={CONTROL_ENDPOINT}");
        info!("live destination {}", live_destination);

        let counters_reader = aeron.counters_reader();
        let mut counter_id = -1;

        // Spin (no sleep, no timeout) until the recording-position counter for the
        // session shows up.
        while counter_id < 0 {
            counter_id = RecordingPos::find_counter_id_by_session(&counters_reader, session_id);
        }
        info!(
            "counter id {} {:?}",
            counter_id,
            counters_reader.get_counter_label(counter_id, 1024)
        );
        info!(
            "counter id {} position={:?}",
            counter_id,
            counters_reader.get_counter_value(counter_id)
        );

        let start_position = 0;
        // Resolve the recording id from the counter key, retrying for up to 5 seconds.
        let recording_id = RecordingPos::get_recording_id_block(
            &aeron.counters_reader(),
            counter_id,
            Duration::from_secs(5),
        )?;

        let subscribe_channel = format!("aeron:udp?control-mode=manual|session-id={session_id}");
        info!("subscribe channel {}", subscribe_channel);
        let subscription = aeron.add_subscription(
            &subscribe_channel,
            STREAM_ID,
            Handlers::no_available_image_handler(),
            Handlers::no_unavailable_image_handler(),
            Duration::from_secs(5),
        )?;

        let replay_merge = AeronArchiveReplayMerge::new(
            &subscription,
            &archive,
            &replay_channel,
            &replay_destination,
            &live_destination,
            recording_id,
            start_position,
            Aeron::epoch_clock(),
            10_000,
        )?;

        info!(
            "ReplayMerge initialization: recordingId={}, startPosition={}, subscriptionChannel={}, replayChannel={}, replayDestination={}, liveDestination={}",
            recording_id,
            start_position,
            subscribe_channel,
            replay_channel,
            replay_destination,
            live_destination
        );

        info!(
            "about to start_replay [maxRecordPosition={:?}]",
            archive.get_max_recorded_position(recording_id)
        );

        // Counts fragments delivered to the poll handler.
        let mut reply_count = 0;
        while !replay_merge.is_merged() {
            assert!(!replay_merge.has_failed());
            if replay_merge.poll_once(
                |buffer, _header| {
                    reply_count += 1;
                    if reply_count % 10_000 == 0 {
                        info!(
                            "replay-merge [count={}, isMerged={}, isLive={}]",
                            reply_count,
                            replay_merge.is_merged(),
                            replay_merge.is_live_added()
                        );
                    }
                },
                100,
            )? == 0
            {
                // The poll returned 0: check both error channels, service recording
                // signals, then back off for 100 ms.
                let err = archive.poll_for_error_response_as_string(4096)?;
                if !err.is_empty() {
                    panic!("{}", err);
                }
                if aeron.errmsg().len() > 0 && "no error" != aeron.errmsg() {
                    panic!("{}", aeron.errmsg());
                }
                archive.poll_for_recording_signals()?;
                thread::sleep(Duration::from_millis(100));
            }
        }
        assert!(!replay_merge.has_failed());
        assert!(replay_merge.is_live_added());
        assert!(reply_count > 10_000);
        Ok(())
    }
608
609 #[test]
610 fn version_check() {
611 let major = unsafe { crate::aeron_version_major() };
612 let minor = unsafe { crate::aeron_version_minor() };
613 let patch = unsafe { crate::aeron_version_patch() };
614
615 let aeron_version = format!("{}.{}.{}", major, minor, patch);
616
617 let cargo_version = "1.47.4";
618 assert_eq!(aeron_version, cargo_version);
619 }
620
621 use std::thread;
622
623 pub fn start_aeron_archive() -> Result<
624 (
625 Aeron,
626 AeronArchiveContext,
627 EmbeddedArchiveMediaDriverProcess,
628 ),
629 Box<dyn Error>,
630 > {
631 let id = Aeron::nano_clock();
632 let aeron_dir = format!("target/aeron/{}/shm", id);
633 let archive_dir = format!("target/aeron/{}/archive", id);
634
635 let request_port = find_unused_udp_port(8000).expect("Could not find port");
636 let response_port = find_unused_udp_port(request_port + 1).expect("Could not find port");
637 let recording_event_port =
638 find_unused_udp_port(response_port + 1).expect("Could not find port");
639 let request_control_channel = &format!("aeron:udp?endpoint=localhost:{}", request_port);
640 let response_control_channel = &format!("aeron:udp?endpoint=localhost:{}", response_port);
641 let recording_events_channel =
642 &format!("aeron:udp?endpoint=localhost:{}", recording_event_port);
643 assert_ne!(request_control_channel, response_control_channel);
644
645 let archive_media_driver = EmbeddedArchiveMediaDriverProcess::build_and_start(
646 &aeron_dir,
647 &archive_dir,
648 request_control_channel,
649 response_control_channel,
650 recording_events_channel,
651 )
652 .expect("Failed to start Java process");
653
654 let aeron_context = AeronContext::new()?;
655 aeron_context.set_dir(&aeron_dir)?;
656 aeron_context.set_client_name("test")?;
657 aeron_context.set_publication_error_frame_handler(Some(&Handler::leak(
658 AeronPublicationErrorFrameHandlerLogger,
659 )))?;
660 let error_handler = Handler::leak(ErrorCount::default());
661 aeron_context.set_error_handler(Some(&error_handler))?;
662 let aeron = Aeron::new(&aeron_context)?;
663 aeron.start()?;
664
665 let archive_context = AeronArchiveContext::new_with_no_credentials_supplier(
666 &aeron,
667 request_control_channel,
668 response_control_channel,
669 recording_events_channel,
670 )?;
671 archive_context.set_error_handler(Some(&error_handler))?;
672 Ok((aeron, archive_context, archive_media_driver))
673 }
674
675 #[test]
676 #[serial]
677 pub fn test_aeron_archive() -> Result<(), Box<dyn error::Error>> {
678 let _ = env_logger::Builder::new()
679 .is_test(true)
680 .filter_level(log::LevelFilter::Info)
681 .try_init();
682 EmbeddedArchiveMediaDriverProcess::kill_all_java_processes()
683 .expect("failed to kill all java processes");
684
685 let (aeron, archive_context, media_driver) = start_aeron_archive()?;
686
687 assert!(!aeron.is_closed());
688
689 info!("connected to aeron");
690
691 let archive_connector =
692 AeronArchiveAsyncConnect::new_with_aeron(&archive_context.clone(), &aeron)?;
693 let archive = archive_connector
694 .poll_blocking(Duration::from_secs(30))
695 .expect("failed to connect to aeron archive media driver");
696
697 assert!(archive.get_archive_id() > 0);
698
699 let channel = AERON_IPC_STREAM;
700 let stream_id = 10;
701
702 let subscription_id =
703 archive.start_recording(channel, stream_id, SOURCE_LOCATION_LOCAL, true)?;
704
705 assert!(subscription_id >= 0);
706 info!("subscription id {}", subscription_id);
707
708 let publication = aeron
709 .async_add_exclusive_publication(channel, stream_id)?
710 .poll_blocking(Duration::from_secs(5))?;
711
712 for i in 0..11 {
713 while publication.offer(
714 "123456".as_bytes(),
715 Handlers::no_reserved_value_supplier_handler(),
716 ) <= 0
717 {
718 sleep(Duration::from_millis(50));
719 archive.poll_for_recording_signals()?;
720 let err = archive.poll_for_error_response_as_string(4096)?;
721 if !err.is_empty() {
722 panic!("{}", err);
723 }
724 archive.idle();
725 }
726 info!("sent message {i} [test_aeron_archive]");
727 }
728
729 archive.idle();
730 let session_id = publication.get_constants()?.session_id;
731 info!("publication session id {}", session_id);
732 let stop_position = publication.position();
734 info!(
735 "publication stop position {} [publication={:?}]",
736 stop_position,
737 publication.get_constants()
738 );
739 let counters_reader = aeron.counters_reader();
740 info!("counters reader ready {:?}", counters_reader);
741
742 let mut counter_id = -1;
743
744 let start = Instant::now();
745 while counter_id <= 0 && start.elapsed() < Duration::from_secs(5) {
746 counter_id = RecordingPos::find_counter_id_by_session(&counters_reader, session_id);
747 info!("counter id {}", counter_id);
748 }
749
750 assert!(counter_id >= 0);
751
752 info!("counter id {counter_id}, session id {session_id}");
753 while counters_reader.get_counter_value(counter_id) < stop_position {
754 info!(
755 "current archive publication stop position {}",
756 counters_reader.get_counter_value(counter_id)
757 );
758 sleep(Duration::from_millis(50));
759 }
760 info!(
761 "found archive publication stop position {}",
762 counters_reader.get_counter_value(counter_id)
763 );
764
765 archive.stop_recording_channel_and_stream(channel, stream_id)?;
766 drop(publication);
767
768 info!("list recordings");
769 let found_recording_id = Cell::new(-1);
770 let start_pos = Cell::new(-1);
771 let end_pos = Cell::new(-1);
772 let start = Instant::now();
773 while start.elapsed() < Duration::from_secs(5) && found_recording_id.get() == -1 {
774 let mut count = 0;
775 archive.list_recordings_for_uri_once(
776 &mut count,
777 0,
778 i32::MAX,
779 channel,
780 stream_id,
781 |d: AeronArchiveRecordingDescriptor| {
782 assert_eq!(d.stream_id, stream_id);
783 info!("found recording {:#?}", d);
784 info!(
785 "strippedChannel={}, originalChannel={}",
786 d.stripped_channel(),
787 d.original_channel()
788 );
789 if d.stop_position > d.start_position && d.stop_position > 0 {
790 found_recording_id.set(d.recording_id);
791 start_pos.set(d.start_position);
792 end_pos.set(d.stop_position);
793 }
794
795 let copy = d.clone_struct();
797 assert_eq!(copy.deref(), d.deref());
798 assert_eq!(copy.recording_id, d.recording_id);
799 assert_eq!(copy.control_session_id, d.control_session_id);
800 assert_eq!(copy.mtu_length, d.mtu_length);
801 assert_eq!(copy.source_identity_length, d.source_identity_length);
802 },
803 )?;
804 archive.poll_for_recording_signals()?;
805 let err = archive.poll_for_error_response_as_string(4096)?;
806 if !err.is_empty() {
807 panic!("{}", err);
808 }
809 }
810 assert!(start.elapsed() < Duration::from_secs(5));
811 info!("start replay");
812 let params = AeronArchiveReplayParams::new(
813 0,
814 i32::MAX,
815 start_pos.get(),
816 end_pos.get() - start_pos.get(),
817 0,
818 0,
819 )?;
820 info!("replay params {:#?}", params);
821 let replay_stream_id = 45;
822 let replay_session_id =
823 archive.start_replay(found_recording_id.get(), channel, replay_stream_id, ¶ms)?;
824 let session_id = replay_session_id as i32;
825
826 info!("replay session id {}", replay_session_id);
827 info!("session id {}", session_id);
828 let channel_replay = format!("{}?session-id={}", channel, session_id);
829 info!("archive id: {}", archive.get_archive_id());
830
831 info!("add subscription {}", channel_replay);
832 let subscription = aeron
833 .async_add_subscription(
834 &channel_replay,
835 replay_stream_id,
836 Some(&Handler::leak(AeronAvailableImageLogger)),
837 Some(&Handler::leak(AeronUnavailableImageLogger)),
838 )?
839 .poll_blocking(Duration::from_secs(10))?;
840
841 #[derive(Default)]
842 struct FragmentHandler {
843 count: Cell<usize>,
844 }
845
846 impl AeronFragmentHandlerCallback for FragmentHandler {
847 fn handle_aeron_fragment_handler(&mut self, buffer: &[u8], _header: AeronHeader) {
848 assert_eq!(buffer, "123456".as_bytes());
849
850 self.count.set(self.count.get() + 1);
852 }
853 }
854
855 let poll = Handler::leak(FragmentHandler::default());
856
857 let start = Instant::now();
858 while start.elapsed() < Duration::from_secs(10) && subscription.poll(Some(&poll), 100)? <= 0
859 {
860 let err = archive.poll_for_error_response_as_string(4096)?;
861 if !err.is_empty() {
862 panic!("{}", err);
863 }
864 }
865 assert!(
866 start.elapsed() < Duration::from_secs(10),
867 "messages not received {:?}",
868 poll.count
869 );
870 info!("aeron {:?}", aeron);
871 info!("ctx {:?}", archive_context);
872 assert_eq!(11, poll.count.get());
873 Ok(())
874 }
875
876 #[test]
877 #[serial]
878 fn test_invalid_recording_channel() -> Result<(), Box<dyn Error>> {
879 let (aeron, archive_context, _media_driver) = start_aeron_archive()?;
880 let archive_connector =
881 AeronArchiveAsyncConnect::new_with_aeron(&archive_context.clone(), &aeron)?;
882 let archive = archive_connector
883 .poll_blocking(Duration::from_secs(30))
884 .expect("failed to connect to archive");
885
886 let invalid_channel = "invalid:channel";
887 let result =
888 archive.start_recording(invalid_channel, STREAM_ID, SOURCE_LOCATION_LOCAL, true);
889 assert!(
890 result.is_err(),
891 "Expected error when starting recording with an invalid channel"
892 );
893 Ok(())
894 }
895
896 #[test]
897 #[serial]
898 fn test_stop_recording_on_nonexistent_channel() -> Result<(), Box<dyn Error>> {
899 let (aeron, archive_context, _media_driver) = start_aeron_archive()?;
900 let archive_connector =
901 AeronArchiveAsyncConnect::new_with_aeron(&archive_context.clone(), &aeron)?;
902 let archive = archive_connector
903 .poll_blocking(Duration::from_secs(30))
904 .expect("failed to connect to archive");
905
906 let nonexistent_channel = "aeron:udp?endpoint=localhost:9999";
907 let result = archive.stop_recording_channel_and_stream(nonexistent_channel, STREAM_ID);
908 assert!(
909 result.is_err(),
910 "Expected error when stopping recording on a non-existent channel"
911 );
912 Ok(())
913 }
914
915 #[test]
916 #[serial]
917 fn test_replay_with_invalid_recording_id() -> Result<(), Box<dyn Error>> {
918 let (aeron, archive_context, _media_driver) = start_aeron_archive()?;
919 let archive_connector =
920 AeronArchiveAsyncConnect::new_with_aeron(&archive_context.clone(), &aeron)?;
921 let archive = archive_connector
922 .poll_blocking(Duration::from_secs(30))
923 .expect("failed to connect to archive");
924
925 let invalid_recording_id = -999;
926 let params = AeronArchiveReplayParams::new(0, i32::MAX, 0, 100, 0, 0)?;
927 let result = archive.start_replay(
928 invalid_recording_id,
929 "aeron:udp?endpoint=localhost:8888",
930 STREAM_ID,
931 ¶ms,
932 );
933 assert!(
934 result.is_err(),
935 "Expected error when starting replay with an invalid recording id"
936 );
937 Ok(())
938 }
939
940 #[test]
941 #[serial]
942 fn test_archive_reconnect_after_close() -> Result<(), Box<dyn std::error::Error>> {
943 let (aeron, archive_context, media_driver) = start_aeron_archive()?;
944 let archive_connector =
945 AeronArchiveAsyncConnect::new_with_aeron(&archive_context.clone(), &aeron)?;
946 let archive = archive_connector
947 .poll_blocking(Duration::from_secs(30))
948 .expect("failed to connect to archive");
949
950 drop(archive);
951
952 let archive_connector = AeronArchiveAsyncConnect::new_with_aeron(&archive_context, &aeron)?;
953 let new_archive = archive_connector
954 .poll_blocking(Duration::from_secs(30))
955 .expect("failed to reconnect to archive");
956 assert!(
957 new_archive.get_archive_id() > 0,
958 "Reconnected archive should have a valid archive id"
959 );
960
961 drop(media_driver);
962 Ok(())
963 }
964}