1#![allow(non_upper_case_globals)]
3#![allow(non_camel_case_types)]
4#![allow(non_snake_case)]
5#![allow(clippy::all)]
6#![allow(unused_unsafe)]
7#![allow(unused_variables)]
8#![doc = include_str!("../README.md")]
9pub mod bindings {
18 include!(concat!(env!("OUT_DIR"), "/bindings.rs"));
19}
20
21use bindings::*;
22use std::cell::Cell;
23use std::os::raw::c_int;
24use std::time::{Duration, Instant};
25
26pub mod testing;
27
28include!(concat!(env!("OUT_DIR"), "/aeron.rs"));
29include!(concat!(env!("OUT_DIR"), "/aeron_custom.rs"));
30
31pub type SourceLocation = bindings::aeron_archive_source_location_t;
32pub const SOURCE_LOCATION_LOCAL: aeron_archive_source_location_en =
33 SourceLocation::AERON_ARCHIVE_SOURCE_LOCATION_LOCAL;
34pub const SOURCE_LOCATION_REMOTE: aeron_archive_source_location_en =
35 SourceLocation::AERON_ARCHIVE_SOURCE_LOCATION_REMOTE;
36
37pub struct NoOpAeronIdleStrategyFunc;
38
39impl AeronIdleStrategyFuncCallback for NoOpAeronIdleStrategyFunc {
40 fn handle_aeron_idle_strategy_func(&mut self, _work_count: c_int) -> () {}
41}
42
43pub struct RecordingPos;
44impl RecordingPos {
45 pub fn find_counter_id_by_session(
46 counter_reader: &AeronCountersReader,
47 session_id: i32,
48 ) -> i32 {
49 unsafe {
50 aeron_archive_recording_pos_find_counter_id_by_session_id(
51 counter_reader.get_inner(),
52 session_id,
53 )
54 }
55 }
56 pub fn find_counter_id_by_recording_id(
57 counter_reader: &AeronCountersReader,
58 recording_id: i64,
59 ) -> i32 {
60 unsafe {
61 aeron_archive_recording_pos_find_counter_id_by_recording_id(
62 counter_reader.get_inner(),
63 recording_id,
64 )
65 }
66 }
67
68 pub fn get_recording_id_block(
71 counters_reader: &AeronCountersReader,
72 counter_id: i32,
73 wait: Duration,
74 ) -> Result<i64, AeronCError> {
75 let mut result = Self::get_recording_id(counters_reader, counter_id);
76 let instant = Instant::now();
77
78 while result.is_err() && instant.elapsed() < wait {
79 result = Self::get_recording_id(counters_reader, counter_id);
80 #[cfg(debug_assertions)]
81 std::thread::sleep(Duration::from_millis(10));
82 }
83
84 return result;
85 }
86
87 pub fn get_recording_id(
90 counters_reader: &AeronCountersReader,
91 counter_id: i32,
92 ) -> Result<i64, AeronCError> {
93 pub const RECORDING_POSITION_TYPE_ID: i32 = 100;
96
97 pub const RECORD_ALLOCATED: i32 = 1;
99
100 pub const NULL_RECORDING_ID: i64 = -1;
102
103 if counter_id < 0 {
104 return Err(AeronCError::from_code(NULL_RECORDING_ID as i32));
105 }
106
107 let state = counters_reader.counter_state(counter_id)?;
108 if state != RECORD_ALLOCATED {
109 return Err(AeronCError::from_code(NULL_RECORDING_ID as i32));
110 }
111
112 let type_id = counters_reader.counter_type_id(counter_id)?;
113 if type_id != RECORDING_POSITION_TYPE_ID {
114 return Err(AeronCError::from_code(NULL_RECORDING_ID as i32));
115 }
116
117 let recording_id = Cell::new(-1);
123 counters_reader.foreach_counter_once(|value, id, type_id, key, label| {
124 if id == counter_id && type_id == RECORDING_POSITION_TYPE_ID {
125 let mut val = [0u8; 8];
126 val.copy_from_slice(&key[0..8]);
127 let Ok(value) = i64::from_le_bytes(val).try_into();
128 recording_id.set(value);
129 }
130 });
131 let recording_id = recording_id.get();
132 if recording_id < 0 {
133 return Err(AeronCError::from_code(NULL_RECORDING_ID as i32));
134 }
135
136 Ok(recording_id)
137 }
138}
139
140unsafe extern "C" fn default_encoded_credentials(
141 _clientd: *mut std::os::raw::c_void,
142) -> *mut aeron_archive_encoded_credentials_t {
143 let empty_credentials = Box::new(aeron_archive_encoded_credentials_t {
145 data: std::ptr::null(),
146 length: 0,
147 });
148 Box::into_raw(empty_credentials)
149}
150
151impl AeronArchive {
152 pub fn aeron(&self) -> Aeron {
153 self.get_archive_context().get_aeron()
154 }
155}
156
157impl AeronArchiveAsyncConnect {
158 #[inline]
159 pub fn new_with_aeron(ctx: &AeronArchiveContext, aeron: &Aeron) -> Result<Self, AeronCError> {
161 let resource_async = Self::new(ctx)?;
162 resource_async.inner.add_dependency(aeron.clone());
163 Ok(resource_async)
164 }
165}
166
167macro_rules! impl_archive_position_methods {
168 ($pub_type:ty) => {
169 impl $pub_type {
170 pub fn get_archive_position(&self) -> Result<i64, AeronCError> {
173 if let Some(aeron) = self.inner.get_dependency::<Aeron>() {
174 let counter_reader = &aeron.counters_reader();
175 self.get_archive_position_with(counter_reader)
176 } else {
177 Err(AeronCError::from_code(-1))
178 }
179 }
180
181 pub fn get_archive_position_with(
184 &self,
185 counters: &AeronCountersReader,
186 ) -> Result<i64, AeronCError> {
187 let session_id = self.get_constants()?.session_id();
188 let counter_id = RecordingPos::find_counter_id_by_session(counters, session_id);
189 if counter_id < 0 {
190 return Err(AeronCError::from_code(counter_id));
191 }
192 let position = counters.get_counter_value(counter_id);
193 if position < 0 {
194 return Err(AeronCError::from_code(position as i32));
195 }
196 Ok(position)
197 }
198
199 pub fn is_archive_position_with(&self, length_inclusive: usize) -> bool {
202 let archive_position = self.get_archive_position().unwrap_or(-1);
203 if archive_position < 0 {
204 return false;
205 }
206 self.position() - archive_position <= length_inclusive as i64
207 }
208 }
209 };
210}
211
212impl_archive_position_methods!(AeronPublication);
213impl_archive_position_methods!(AeronExclusivePublication);
214
215impl AeronArchiveContext {
216 pub fn set_no_credentials_supplier(&self) -> Result<i32, AeronCError> {
220 self.set_credentials_supplier(
221 Some(default_encoded_credentials),
222 None,
223 None::<&Handler<AeronArchiveCredentialsFreeFuncLogger>>,
224 )
225 }
226
227 pub fn new_with_no_credentials_supplier(
231 aeron: &Aeron,
232 request_control_channel: &str,
233 response_control_channel: &str,
234 recording_events_channel: &str,
235 ) -> Result<AeronArchiveContext, AeronCError> {
236 let context = Self::new()?;
237 context.set_no_credentials_supplier()?;
238 context.set_aeron(aeron)?;
239 context.set_control_request_channel(&request_control_channel.into_c_string())?;
240 context.set_control_response_channel(&response_control_channel.into_c_string())?;
241 context.set_recording_events_channel(&recording_events_channel.into_c_string())?;
242 context.set_idle_strategy(Some(&Handler::leak(NoOpAeronIdleStrategyFunc)))?;
244 Ok(context)
245 }
246}
247
248#[cfg(test)]
249mod tests {
250 use super::*;
251 use log::{error, info};
252
253 use crate::testing::EmbeddedArchiveMediaDriverProcess;
254 use serial_test::serial;
255 use std::cell::Cell;
256 use std::error;
257 use std::error::Error;
258 use std::str::FromStr;
259 use std::sync::atomic::{AtomicBool, Ordering};
260 use std::sync::Arc;
261 use std::thread::{sleep, JoinHandle};
262 use std::time::{Duration, Instant};
263
264 #[derive(Default, Debug)]
265 struct ErrorCount {
266 error_count: usize,
267 }
268
269 impl AeronErrorHandlerCallback for ErrorCount {
270 fn handle_aeron_error_handler(&mut self, error_code: c_int, msg: &str) {
271 error!("Aeron error {}: {}", error_code, msg);
272 self.error_count += 1;
273 }
274 }
275
276 pub const ARCHIVE_CONTROL_REQUEST: &str = "aeron:udp?endpoint=localhost:8010";
277 pub const ARCHIVE_CONTROL_RESPONSE: &str = "aeron:udp?endpoint=localhost:8011";
278 pub const ARCHIVE_RECORDING_EVENTS: &str =
279 "aeron:udp?control-mode=dynamic|control=localhost:8012";
280
281 #[test]
282 fn test_uri_string_builder() -> Result<(), AeronCError> {
283 let builder = AeronUriStringBuilder::default();
284 builder.init_new()?;
285 builder
286 .media(Media::Udp)? .mtu_length(1024 * 64)?
288 .set_initial_position(127424949617280, 1182294755, 65536)?;
289 let uri = builder.build(1024)?;
290 assert_eq!("aeron:udp?term-id=-1168322114|term-length=65536|mtu=65536|init-term-id=1182294755|term-offset=33408", uri);
291
292 builder.init_new()?;
293 let uri = builder
294 .media(Media::Udp)?
295 .control_mode(ControlMode::Dynamic)?
296 .reliable(false)?
297 .ttl(2)?
298 .endpoint("localhost:1235")?
299 .control("localhost:1234")?
300 .build(1024)?;
301 assert_eq!("aeron:udp?ttl=2|control-mode=dynamic|endpoint=localhost:1235|control=localhost:1234|reliable=false", uri);
302
303 let uri = AeronUriStringBuilder::from_str("aeron:udp?endpoint=localhost:8010")?
304 .ttl(5)?
305 .build(1024)?;
306
307 assert_eq!("aeron:udp?ttl=5|endpoint=localhost:8010", uri);
308
309 let uri = uri.parse::<AeronUriStringBuilder>()?.ttl(6)?.build(1024)?;
310
311 assert_eq!("aeron:udp?ttl=6|endpoint=localhost:8010", uri);
312
313 Ok(())
314 }
315
316 pub const STREAM_ID: i32 = 1033;
317 pub const MESSAGE_PREFIX: &str = "Message-Prefix-";
318 pub const CONTROL_ENDPOINT: &str = "localhost:23265";
319 pub const RECORDING_ENDPOINT: &str = "localhost:23266";
320 pub const LIVE_ENDPOINT: &str = "localhost:23267";
321 pub const REPLAY_ENDPOINT: &str = "localhost:0";
322 #[test]
325 #[serial]
326 fn test_simple_replay_merge() -> Result<(), AeronCError> {
327 let _ = env_logger::Builder::new()
328 .is_test(true)
329 .filter_level(log::LevelFilter::Info)
330 .try_init();
331
332 EmbeddedArchiveMediaDriverProcess::kill_all_java_processes()
333 .expect("failed to kill all java processes");
334
335 assert!(is_udp_port_available(23265));
336 assert!(is_udp_port_available(23266));
337 assert!(is_udp_port_available(23267));
338 assert!(is_udp_port_available(23268));
339 let id = Aeron::nano_clock();
340 let aeron_dir = format!("target/aeron/{}/shm", id);
341 let archive_dir = format!("target/aeron/{}/archive", id);
342
343 info!("starting archive media driver");
344 let media_driver = EmbeddedArchiveMediaDriverProcess::build_and_start(
345 &aeron_dir,
346 &format!("{}/archive", aeron_dir),
347 ARCHIVE_CONTROL_REQUEST,
348 ARCHIVE_CONTROL_RESPONSE,
349 ARCHIVE_RECORDING_EVENTS,
350 )
351 .expect("Failed to start embedded media driver");
352
353 info!("connecting to archive");
354 let (archive, aeron) = media_driver
355 .archive_connect()
356 .expect("Could not connect to archive client");
357
358 let running = Arc::new(AtomicBool::new(true));
359
360 info!("connected to archive, adding publication");
361 assert!(!aeron.is_closed());
362
363 let (session_id, publisher_thread) =
364 reply_merge_publisher(&archive, aeron.clone(), running.clone())?;
365
366 {
367 let context = AeronContext::new()?;
368 context.set_dir(&media_driver.aeron_dir)?;
369 let error_handler = Handler::leak(ErrorCount::default());
370 context.set_error_handler(Some(&error_handler))?;
371 let aeron = Aeron::new(&context)?;
372 aeron.start()?;
373 let aeron_archive_context = archive.get_archive_context();
374 let aeron_archive_context = AeronArchiveContext::new_with_no_credentials_supplier(
375 &aeron,
376 aeron_archive_context.get_control_request_channel(),
377 aeron_archive_context.get_control_response_channel(),
378 aeron_archive_context.get_recording_events_channel(),
379 )?;
380 aeron_archive_context.set_error_handler(Some(&error_handler))?;
381 let archive = AeronArchiveAsyncConnect::new_with_aeron(&aeron_archive_context, &aeron)?
382 .poll_blocking(Duration::from_secs(30))
383 .expect("failed to connect to archive");
384 replay_merge_subscription(&archive, aeron.clone(), session_id)?;
385 }
386
387 running.store(false, Ordering::Release);
388 publisher_thread.join().unwrap();
389
390 Ok(())
391 }
392
393 fn reply_merge_publisher(
394 archive: &AeronArchive,
395 aeron: Aeron,
396 running: Arc<AtomicBool>,
397 ) -> Result<(i32, JoinHandle<()>), AeronCError> {
398 let publication = aeron.add_publication(
399 &format!("aeron:udp?control={CONTROL_ENDPOINT}|control-mode=dynamic|term-length=65536")
401 .into_c_string(),
402 STREAM_ID,
403 Duration::from_secs(5),
404 )?;
405
406 info!(
407 "publication {} [status={:?}]",
408 publication.channel(),
409 publication.channel_status()
410 );
411 assert_eq!(1, publication.channel_status());
412
413 let session_id = publication.session_id();
414 let recording_channel = format!(
415 "aeron:udp?endpoint={RECORDING_ENDPOINT}|control={CONTROL_ENDPOINT}|session-id={session_id}"
417 );
418 info!("recording channel {}", recording_channel);
419 archive.start_recording(
420 &recording_channel.into_c_string(),
421 STREAM_ID,
422 SOURCE_LOCATION_REMOTE,
423 true,
424 )?;
425
426 info!("waiting for publisher to be connected");
427 while !publication.is_connected() {
428 thread::sleep(Duration::from_millis(100));
429 }
430 info!("publisher to be connected");
431 let counters_reader = aeron.counters_reader();
432 let mut caught_up_count = 0;
433 let publisher_thread = thread::spawn(move || {
434 let mut message_count = 0;
435
436 while running.load(Ordering::Acquire) {
437 let message = format!("{}{}", MESSAGE_PREFIX, message_count);
438 while publication.offer(
439 message.as_bytes(),
440 Handlers::no_reserved_value_supplier_handler(),
441 ) <= 0
442 {
443 thread::sleep(Duration::from_millis(10));
444 }
445 message_count += 1;
446 if message_count % 10_000 == 0 {
447 info!(
448 "Published {} messages [position={}]",
449 message_count,
450 publication.position()
451 );
452 }
453 if message_count > 10_000 {
455 while !publication.is_archive_position_with(0) {
457 thread::sleep(Duration::from_micros(300));
458 }
459 caught_up_count += 1;
460 }
461 }
462 assert!(caught_up_count > 0);
463 info!("Publisher thread terminated");
464 });
465 Ok((session_id, publisher_thread))
466 }
467
468 fn replay_merge_subscription(
469 archive: &AeronArchive,
470 aeron: Aeron,
471 session_id: i32,
472 ) -> Result<(), AeronCError> {
473 let replay_channel = format!("aeron:udp?session-id={session_id}").into_c_string();
475 info!("replay channel {:?}", replay_channel);
476
477 let replay_destination = format!("aeron:udp?endpoint={REPLAY_ENDPOINT}").into_c_string();
478 info!("replay destination {:?}", replay_destination);
479
480 let live_destination =
481 format!("aeron:udp?endpoint={LIVE_ENDPOINT}|control={CONTROL_ENDPOINT}")
482 .into_c_string();
483 info!("live destination {:?}", live_destination);
484
485 let counters_reader = aeron.counters_reader();
486 let mut counter_id = -1;
487
488 while counter_id < 0 {
489 counter_id = RecordingPos::find_counter_id_by_session(&counters_reader, session_id);
490 }
491 info!(
492 "counter id {} {:?}",
493 counter_id,
494 counters_reader.get_counter_label(counter_id, 1024)
495 );
496 info!(
497 "counter id {} position={:?}",
498 counter_id,
499 counters_reader.get_counter_value(counter_id)
500 );
501
502 let start_position = 0;
524 let recording_id = RecordingPos::get_recording_id_block(
525 &aeron.counters_reader(),
526 counter_id,
527 Duration::from_secs(5),
528 )?;
529
530 let subscribe_channel =
531 format!("aeron:udp?control-mode=manual|session-id={session_id}").into_c_string();
532 info!("subscribe channel {:?}", subscribe_channel);
533 let subscription = aeron.add_subscription(
534 &subscribe_channel,
535 STREAM_ID,
536 Handlers::no_available_image_handler(),
537 Handlers::no_unavailable_image_handler(),
538 Duration::from_secs(5),
539 )?;
540
541 let replay_merge = AeronArchiveReplayMerge::new(
542 &subscription,
543 &archive,
544 &replay_channel,
545 &replay_destination,
546 &live_destination,
547 recording_id,
548 start_position,
549 Aeron::epoch_clock(),
550 10_000,
551 )?;
552
553 info!(
554 "ReplayMerge initialization: recordingId={}, startPosition={}, subscriptionChannel={:?}, replayChannel={:?}, replayDestination={:?}, liveDestination={:?}",
555 recording_id,
556 start_position,
557 subscribe_channel,
558 &replay_channel,
559 &replay_destination,
560 &live_destination
561 );
562
563 info!(
574 "about to start_replay [maxRecordPosition={:?}]",
575 archive.get_max_recorded_position(recording_id)
576 );
577
578 let mut reply_count = 0;
579 while !replay_merge.is_merged() {
580 assert!(!replay_merge.has_failed());
581 if replay_merge.poll_once(
582 |buffer, _header| {
583 reply_count += 1;
584 if reply_count % 10_000 == 0 {
585 info!(
586 "replay-merge [count={}, isMerged={}, isLive={}]",
587 reply_count,
588 replay_merge.is_merged(),
589 replay_merge.is_live_added()
590 );
591 }
592 },
593 100,
594 )? == 0
595 {
596 let err = archive.poll_for_error_response_as_string(4096)?;
597 if !err.is_empty() {
598 panic!("{}", err);
599 }
600 if aeron.errmsg().len() > 0 && "no error" != aeron.errmsg() {
601 panic!("{}", aeron.errmsg());
602 }
603 archive.poll_for_recording_signals()?;
604 thread::sleep(Duration::from_millis(100));
605 }
606 }
607 assert!(!replay_merge.has_failed());
608 assert!(replay_merge.is_live_added());
609 assert!(reply_count > 10_000);
610 Ok(())
611 }
612
613 #[test]
614 fn version_check() {
615 let major = unsafe { crate::aeron_version_major() };
616 let minor = unsafe { crate::aeron_version_minor() };
617 let patch = unsafe { crate::aeron_version_patch() };
618
619 let aeron_version = format!("{}.{}.{}", major, minor, patch);
620
621 let cargo_version = "1.48.4";
622 assert_eq!(aeron_version, cargo_version);
623 }
624
625 use std::thread;
626
627 pub fn start_aeron_archive() -> Result<
628 (
629 Aeron,
630 AeronArchiveContext,
631 EmbeddedArchiveMediaDriverProcess,
632 ),
633 Box<dyn Error>,
634 > {
635 let id = Aeron::nano_clock();
636 let aeron_dir = format!("target/aeron/{}/shm", id);
637 let archive_dir = format!("target/aeron/{}/archive", id);
638
639 let request_port = find_unused_udp_port(8000).expect("Could not find port");
640 let response_port = find_unused_udp_port(request_port + 1).expect("Could not find port");
641 let recording_event_port =
642 find_unused_udp_port(response_port + 1).expect("Could not find port");
643 let request_control_channel = &format!("aeron:udp?endpoint=localhost:{}", request_port);
644 let response_control_channel = &format!("aeron:udp?endpoint=localhost:{}", response_port);
645 let recording_events_channel =
646 &format!("aeron:udp?endpoint=localhost:{}", recording_event_port);
647 assert_ne!(request_control_channel, response_control_channel);
648
649 let archive_media_driver = EmbeddedArchiveMediaDriverProcess::build_and_start(
650 &aeron_dir,
651 &archive_dir,
652 request_control_channel,
653 response_control_channel,
654 recording_events_channel,
655 )
656 .expect("Failed to start Java process");
657
658 let aeron_context = AeronContext::new()?;
659 aeron_context.set_dir(&aeron_dir.into_c_string())?;
660 aeron_context.set_client_name(&"test".into_c_string())?;
661 aeron_context.set_publication_error_frame_handler(Some(&Handler::leak(
662 AeronPublicationErrorFrameHandlerLogger,
663 )))?;
664 let error_handler = Handler::leak(ErrorCount::default());
665 aeron_context.set_error_handler(Some(&error_handler))?;
666 let aeron = Aeron::new(&aeron_context)?;
667 aeron.start()?;
668
669 let archive_context = AeronArchiveContext::new_with_no_credentials_supplier(
670 &aeron,
671 request_control_channel,
672 response_control_channel,
673 recording_events_channel,
674 )?;
675 archive_context.set_error_handler(Some(&error_handler))?;
676 Ok((aeron, archive_context, archive_media_driver))
677 }
678
679 #[test]
680 #[serial]
681 pub fn test_aeron_archive() -> Result<(), Box<dyn error::Error>> {
682 let _ = env_logger::Builder::new()
683 .is_test(true)
684 .filter_level(log::LevelFilter::Info)
685 .try_init();
686 EmbeddedArchiveMediaDriverProcess::kill_all_java_processes()
687 .expect("failed to kill all java processes");
688
689 let (aeron, archive_context, media_driver) = start_aeron_archive()?;
690
691 assert!(!aeron.is_closed());
692
693 info!("connected to aeron");
694
695 let archive_connector =
696 AeronArchiveAsyncConnect::new_with_aeron(&archive_context.clone(), &aeron)?;
697 let archive = archive_connector
698 .poll_blocking(Duration::from_secs(30))
699 .expect("failed to connect to aeron archive media driver");
700
701 assert!(archive.get_archive_id() > 0);
702
703 let channel = AERON_IPC_STREAM;
704 let stream_id = 10;
705
706 let subscription_id =
707 archive.start_recording(channel, stream_id, SOURCE_LOCATION_LOCAL, true)?;
708
709 assert!(subscription_id >= 0);
710 info!("subscription id {}", subscription_id);
711
712 let publication = aeron
713 .async_add_exclusive_publication(channel, stream_id)?
714 .poll_blocking(Duration::from_secs(5))?;
715
716 for i in 0..11 {
717 while publication.offer(
718 "123456".as_bytes(),
719 Handlers::no_reserved_value_supplier_handler(),
720 ) <= 0
721 {
722 sleep(Duration::from_millis(50));
723 archive.poll_for_recording_signals()?;
724 let err = archive.poll_for_error_response_as_string(4096)?;
725 if !err.is_empty() {
726 panic!("{}", err);
727 }
728 archive.idle();
729 }
730 info!("sent message {i} [test_aeron_archive]");
731 }
732
733 archive.idle();
734 let session_id = publication.get_constants()?.session_id;
735 info!("publication session id {}", session_id);
736 let stop_position = publication.position();
738 info!(
739 "publication stop position {} [publication={:?}]",
740 stop_position,
741 publication.get_constants()
742 );
743 let counters_reader = aeron.counters_reader();
744 info!("counters reader ready {:?}", counters_reader);
745
746 let mut counter_id = -1;
747
748 let start = Instant::now();
749 while counter_id <= 0 && start.elapsed() < Duration::from_secs(5) {
750 counter_id = RecordingPos::find_counter_id_by_session(&counters_reader, session_id);
751 info!("counter id {}", counter_id);
752 }
753
754 assert!(counter_id >= 0);
755
756 info!("counter id {counter_id}, session id {session_id}");
757 while counters_reader.get_counter_value(counter_id) < stop_position {
758 info!(
759 "current archive publication stop position {}",
760 counters_reader.get_counter_value(counter_id)
761 );
762 sleep(Duration::from_millis(50));
763 }
764 info!(
765 "found archive publication stop position {}",
766 counters_reader.get_counter_value(counter_id)
767 );
768
769 archive.stop_recording_channel_and_stream(channel, stream_id)?;
770 drop(publication);
771
772 info!("list recordings");
773 let found_recording_id = Cell::new(-1);
774 let start_pos = Cell::new(-1);
775 let end_pos = Cell::new(-1);
776 let start = Instant::now();
777 while start.elapsed() < Duration::from_secs(5) && found_recording_id.get() == -1 {
778 let mut count = 0;
779 archive.list_recordings_for_uri_once(
780 &mut count,
781 0,
782 i32::MAX,
783 channel,
784 stream_id,
785 |d: AeronArchiveRecordingDescriptor| {
786 assert_eq!(d.stream_id, stream_id);
787 info!("found recording {:#?}", d);
788 info!(
789 "strippedChannel={}, originalChannel={}",
790 d.stripped_channel(),
791 d.original_channel()
792 );
793 if d.stop_position > d.start_position && d.stop_position > 0 {
794 found_recording_id.set(d.recording_id);
795 start_pos.set(d.start_position);
796 end_pos.set(d.stop_position);
797 }
798
799 let copy = d.clone_struct();
801 assert_eq!(copy.deref(), d.deref());
802 assert_eq!(copy.recording_id, d.recording_id);
803 assert_eq!(copy.control_session_id, d.control_session_id);
804 assert_eq!(copy.mtu_length, d.mtu_length);
805 assert_eq!(copy.source_identity_length, d.source_identity_length);
806 },
807 )?;
808 archive.poll_for_recording_signals()?;
809 let err = archive.poll_for_error_response_as_string(4096)?;
810 if !err.is_empty() {
811 panic!("{}", err);
812 }
813 }
814 assert!(start.elapsed() < Duration::from_secs(5));
815 info!("start replay");
816 let params = AeronArchiveReplayParams::new(
817 0,
818 i32::MAX,
819 start_pos.get(),
820 end_pos.get() - start_pos.get(),
821 0,
822 0,
823 )?;
824 info!("replay params {:#?}", params);
825 let replay_stream_id = 45;
826 let replay_session_id =
827 archive.start_replay(found_recording_id.get(), channel, replay_stream_id, ¶ms)?;
828 let session_id = replay_session_id as i32;
829
830 info!("replay session id {}", replay_session_id);
831 info!("session id {}", session_id);
832 let channel_replay =
833 format!("{}?session-id={}", channel.to_str().unwrap(), session_id).into_c_string();
834 info!("archive id: {}", archive.get_archive_id());
835
836 info!("add subscription {:?}", channel_replay);
837 let subscription = aeron
838 .async_add_subscription(
839 &channel_replay,
840 replay_stream_id,
841 Some(&Handler::leak(AeronAvailableImageLogger)),
842 Some(&Handler::leak(AeronUnavailableImageLogger)),
843 )?
844 .poll_blocking(Duration::from_secs(10))?;
845
846 #[derive(Default)]
847 struct FragmentHandler {
848 count: Cell<usize>,
849 }
850
851 impl AeronFragmentHandlerCallback for FragmentHandler {
852 fn handle_aeron_fragment_handler(&mut self, buffer: &[u8], _header: AeronHeader) {
853 assert_eq!(buffer, "123456".as_bytes());
854
855 self.count.set(self.count.get() + 1);
857 }
858 }
859
860 let poll = Handler::leak(FragmentHandler::default());
861
862 let start = Instant::now();
863 while start.elapsed() < Duration::from_secs(10) && subscription.poll(Some(&poll), 100)? <= 0
864 {
865 let err = archive.poll_for_error_response_as_string(4096)?;
866 if !err.is_empty() {
867 panic!("{}", err);
868 }
869 }
870 assert!(
871 start.elapsed() < Duration::from_secs(10),
872 "messages not received {:?}",
873 poll.count
874 );
875 info!("aeron {:?}", aeron);
876 info!("ctx {:?}", archive_context);
877 assert_eq!(11, poll.count.get());
878 Ok(())
879 }
880
881 #[test]
882 #[serial]
883 fn test_invalid_recording_channel() -> Result<(), Box<dyn Error>> {
884 let (aeron, archive_context, _media_driver) = start_aeron_archive()?;
885 let archive_connector =
886 AeronArchiveAsyncConnect::new_with_aeron(&archive_context.clone(), &aeron)?;
887 let archive = archive_connector
888 .poll_blocking(Duration::from_secs(30))
889 .expect("failed to connect to archive");
890
891 let invalid_channel = "invalid:channel".into_c_string();
892 let result =
893 archive.start_recording(&invalid_channel, STREAM_ID, SOURCE_LOCATION_LOCAL, true);
894 assert!(
895 result.is_err(),
896 "Expected error when starting recording with an invalid channel"
897 );
898 Ok(())
899 }
900
901 #[test]
902 #[serial]
903 fn test_stop_recording_on_nonexistent_channel() -> Result<(), Box<dyn Error>> {
904 let (aeron, archive_context, _media_driver) = start_aeron_archive()?;
905 let archive_connector =
906 AeronArchiveAsyncConnect::new_with_aeron(&archive_context.clone(), &aeron)?;
907 let archive = archive_connector
908 .poll_blocking(Duration::from_secs(30))
909 .expect("failed to connect to archive");
910
911 let nonexistent_channel = &"aeron:udp?endpoint=localhost:9999".into_c_string();
912 let result = archive.stop_recording_channel_and_stream(nonexistent_channel, STREAM_ID);
913 assert!(
914 result.is_err(),
915 "Expected error when stopping recording on a non-existent channel"
916 );
917 Ok(())
918 }
919
920 #[test]
921 #[serial]
922 fn test_replay_with_invalid_recording_id() -> Result<(), Box<dyn Error>> {
923 let (aeron, archive_context, _media_driver) = start_aeron_archive()?;
924 let archive_connector =
925 AeronArchiveAsyncConnect::new_with_aeron(&archive_context.clone(), &aeron)?;
926 let archive = archive_connector
927 .poll_blocking(Duration::from_secs(30))
928 .expect("failed to connect to archive");
929
930 let invalid_recording_id = -999;
931 let params = AeronArchiveReplayParams::new(0, i32::MAX, 0, 100, 0, 0)?;
932 let result = archive.start_replay(
933 invalid_recording_id,
934 &"aeron:udp?endpoint=localhost:8888".into_c_string(),
935 STREAM_ID,
936 ¶ms,
937 );
938 assert!(
939 result.is_err(),
940 "Expected error when starting replay with an invalid recording id"
941 );
942 Ok(())
943 }
944
945 #[test]
946 #[serial]
947 fn test_archive_reconnect_after_close() -> Result<(), Box<dyn std::error::Error>> {
948 let (aeron, archive_context, media_driver) = start_aeron_archive()?;
949 let archive_connector =
950 AeronArchiveAsyncConnect::new_with_aeron(&archive_context.clone(), &aeron)?;
951 let archive = archive_connector
952 .poll_blocking(Duration::from_secs(30))
953 .expect("failed to connect to archive");
954
955 drop(archive);
956
957 let archive_connector = AeronArchiveAsyncConnect::new_with_aeron(&archive_context, &aeron)?;
958 let new_archive = archive_connector
959 .poll_blocking(Duration::from_secs(30))
960 .expect("failed to reconnect to archive");
961 assert!(
962 new_archive.get_archive_id() > 0,
963 "Reconnected archive should have a valid archive id"
964 );
965
966 drop(media_driver);
967 Ok(())
968 }
969}