rusteron_archive/
lib.rs

1/**/
2#![allow(non_upper_case_globals)]
3#![allow(non_camel_case_types)]
4#![allow(non_snake_case)]
5#![allow(clippy::all)]
6#![allow(unused_unsafe)]
7#![allow(unused_variables)]
8#![doc = include_str!("../README.md")]
9//! # Features
10//!
11//! - **`static`**: When enabled, this feature statically links the Aeron C code.
12//!   By default, the library uses dynamic linking to the Aeron C libraries.
13//! - **`backtrace`** - When enabled will log a backtrace for each AeronCError
//! - **`extra-logging`** - When enabled, logs when a resource is created and destroyed. Useful if you're seeing a segfault due to a resource being closed.
//! - **`precompile`** - When enabled, uses precompiled C code instead of requiring cmake and Java to be installed.
16
/// Generated bindings, textually included from the `bindings.rs` file found
/// in the build's `OUT_DIR`.
pub mod bindings {
    include!(concat!(env!("OUT_DIR"), "/bindings.rs"));
}
20
21use bindings::*;
22use std::cell::Cell;
23use std::os::raw::c_int;
24use std::time::{Duration, Instant};
25
26pub mod testing;
27
28include!(concat!(env!("OUT_DIR"), "/aeron.rs"));
29include!(concat!(env!("OUT_DIR"), "/aeron_custom.rs"));
30
/// Short alias for the archive source-location type from the generated bindings.
pub type SourceLocation = bindings::aeron_archive_source_location_t;
/// Re-export of `AERON_ARCHIVE_SOURCE_LOCATION_LOCAL` under a shorter name.
pub const SOURCE_LOCATION_LOCAL: aeron_archive_source_location_en =
    SourceLocation::AERON_ARCHIVE_SOURCE_LOCATION_LOCAL;
/// Re-export of `AERON_ARCHIVE_SOURCE_LOCATION_REMOTE` under a shorter name.
pub const SOURCE_LOCATION_REMOTE: aeron_archive_source_location_en =
    SourceLocation::AERON_ARCHIVE_SOURCE_LOCATION_REMOTE;
36
37pub struct NoOpAeronIdleStrategyFunc;
38
39impl AeronIdleStrategyFuncCallback for NoOpAeronIdleStrategyFunc {
40    fn handle_aeron_idle_strategy_func(&mut self, _work_count: c_int) -> () {}
41}
42
43pub struct RecordingPos;
44impl RecordingPos {
45    pub fn find_counter_id_by_session(
46        counter_reader: &AeronCountersReader,
47        session_id: i32,
48    ) -> i32 {
49        unsafe {
50            aeron_archive_recording_pos_find_counter_id_by_session_id(
51                counter_reader.get_inner(),
52                session_id,
53            )
54        }
55    }
56    pub fn find_counter_id_by_recording_id(
57        counter_reader: &AeronCountersReader,
58        recording_id: i64,
59    ) -> i32 {
60        unsafe {
61            aeron_archive_recording_pos_find_counter_id_by_recording_id(
62                counter_reader.get_inner(),
63                recording_id,
64            )
65        }
66    }
67
68    /// Return the recordingId embedded in the key of the given counter
69    /// if it is indeed a "recording position" counter. Otherwise return -1.
70    pub fn get_recording_id_block(
71        counters_reader: &AeronCountersReader,
72        counter_id: i32,
73        wait: Duration,
74    ) -> Result<i64, AeronCError> {
75        let mut result = Self::get_recording_id(counters_reader, counter_id);
76        let instant = Instant::now();
77
78        while result.is_err() && instant.elapsed() < wait {
79            result = Self::get_recording_id(counters_reader, counter_id);
80            #[cfg(debug_assertions)]
81            std::thread::sleep(Duration::from_millis(10));
82        }
83
84        return result;
85    }
86
87    /// Return the recordingId embedded in the key of the given counter
88    /// if it is indeed a "recording position" counter. Otherwise return -1.
89    pub fn get_recording_id(
90        counters_reader: &AeronCountersReader,
91        counter_id: i32,
92    ) -> Result<i64, AeronCError> {
93        /// The type id for an Aeron Archive recording position counter.
94        /// In Aeron Java, this is AeronCounters.ARCHIVE_RECORDING_POSITION_TYPE_ID (which is typically 100).
95        pub const RECORDING_POSITION_TYPE_ID: i32 = 100;
96
97        /// from Aeron Java code
98        pub const RECORD_ALLOCATED: i32 = 1;
99
100        /// A constant to mean "no valid recording ID".
101        pub const NULL_RECORDING_ID: i64 = -1;
102
103        if counter_id < 0 {
104            return Err(AeronCError::from_code(NULL_RECORDING_ID as i32));
105        }
106
107        let state = counters_reader.counter_state(counter_id)?;
108        if state != RECORD_ALLOCATED {
109            return Err(AeronCError::from_code(NULL_RECORDING_ID as i32));
110        }
111
112        let type_id = counters_reader.counter_type_id(counter_id)?;
113        if type_id != RECORDING_POSITION_TYPE_ID {
114            return Err(AeronCError::from_code(NULL_RECORDING_ID as i32));
115        }
116
117        // Read the key area. For a RECORDING_POSITION_TYPE_ID counter:
118        //    - offset 0..8 => the i64 recording_id
119        //    - offset 8..12 => the session_id (int)
120        //    etc...
121        // only need the first 8 bytes to get the recordingId.
122        let recording_id = Cell::new(-1);
123        counters_reader.foreach_counter_once(|value, id, type_id, key, label| {
124            if id == counter_id && type_id == RECORDING_POSITION_TYPE_ID {
125                let mut val = [0u8; 8];
126                val.copy_from_slice(&key[0..8]);
127                let Ok(value) = i64::from_le_bytes(val).try_into();
128                recording_id.set(value);
129            }
130        });
131        let recording_id = recording_id.get();
132        if recording_id < 0 {
133            return Err(AeronCError::from_code(NULL_RECORDING_ID as i32));
134        }
135
136        Ok(recording_id)
137    }
138}
139
140unsafe extern "C" fn default_encoded_credentials(
141    _clientd: *mut std::os::raw::c_void,
142) -> *mut aeron_archive_encoded_credentials_t {
143    // Allocate a zeroed instance of `aeron_archive_encoded_credentials_t`
144    let empty_credentials = Box::new(aeron_archive_encoded_credentials_t {
145        data: std::ptr::null(),
146        length: 0,
147    });
148    Box::into_raw(empty_credentials)
149}
150
151impl AeronArchive {
152    pub fn aeron(&self) -> Aeron {
153        self.get_archive_context().get_aeron()
154    }
155}
156
157impl AeronArchiveAsyncConnect {
158    #[inline]
159    /// recommend using this method instead of standard `new` as it will link the archive to aeron so if a drop occurs archive is dropped before aeron
160    pub fn new_with_aeron(ctx: &AeronArchiveContext, aeron: &Aeron) -> Result<Self, AeronCError> {
161        let resource_async = Self::new(ctx)?;
162        resource_async.inner.add_dependency(aeron.clone());
163        Ok(resource_async)
164    }
165}
166
167macro_rules! impl_archive_position_methods {
168    ($pub_type:ty) => {
169        impl $pub_type {
170            /// Retrieves the current active live archive position using the Aeron counters.
171            /// Returns an error if not found.
172            pub fn get_archive_position(&self) -> Result<i64, AeronCError> {
173                if let Some(aeron) = self.inner.get_dependency::<Aeron>() {
174                    let counter_reader = &aeron.counters_reader();
175                    self.get_archive_position_with(counter_reader)
176                } else {
177                    Err(AeronCError::from_code(-1))
178                }
179            }
180
181            /// Retrieves the current active live archive position using the provided counter reader.
182            /// Returns an error if not found.
183            pub fn get_archive_position_with(
184                &self,
185                counters: &AeronCountersReader,
186            ) -> Result<i64, AeronCError> {
187                let session_id = self.get_constants()?.session_id();
188                let counter_id = RecordingPos::find_counter_id_by_session(counters, session_id);
189                if counter_id < 0 {
190                    return Err(AeronCError::from_code(counter_id));
191                }
192                let position = counters.get_counter_value(counter_id);
193                if position < 0 {
194                    return Err(AeronCError::from_code(position as i32));
195                }
196                Ok(position)
197            }
198
199            /// Checks if the publication's current position is within a specified inclusive length
200            /// of the archive position.
201            pub fn is_archive_position_with(&self, length_inclusive: usize) -> bool {
202                let archive_position = self.get_archive_position().unwrap_or(-1);
203                if archive_position < 0 {
204                    return false;
205                }
206                self.position() - archive_position <= length_inclusive as i64
207            }
208        }
209    };
210}
211
212impl_archive_position_methods!(AeronPublication);
213impl_archive_position_methods!(AeronExclusivePublication);
214
215impl AeronArchiveContext {
216    // The method below sets no credentials supplier, which is essential for the operation
217    // of the Aeron Archive Context. The `set_credentials_supplier` must be set to prevent
218    // segmentation faults in the C bindings.
219    pub fn set_no_credentials_supplier(&self) -> Result<i32, AeronCError> {
220        self.set_credentials_supplier(
221            Some(default_encoded_credentials),
222            None,
223            None::<&Handler<AeronArchiveCredentialsFreeFuncLogger>>,
224        )
225    }
226
227    /// This method creates a new `AeronArchiveContext` with a no-op credentials supplier.
228    /// If you do not set a credentials supplier, it will segfault.
229    /// This method ensures that a non-functional credentials supplier is set to avoid the segfault.
230    pub fn new_with_no_credentials_supplier(
231        aeron: &Aeron,
232        request_control_channel: &str,
233        response_control_channel: &str,
234        recording_events_channel: &str,
235    ) -> Result<AeronArchiveContext, AeronCError> {
236        let context = Self::new()?;
237        context.set_no_credentials_supplier()?;
238        context.set_aeron(aeron)?;
239        context.set_control_request_channel(&request_control_channel.into_c_string())?;
240        context.set_control_response_channel(&response_control_channel.into_c_string())?;
241        context.set_recording_events_channel(&recording_events_channel.into_c_string())?;
242        // see https://github.com/mimran1980/rusteron/issues/18
243        context.set_idle_strategy(Some(&Handler::leak(NoOpAeronIdleStrategyFunc)))?;
244        Ok(context)
245    }
246}
247
248#[cfg(test)]
249mod tests {
250    use super::*;
251    use log::{error, info};
252
253    use crate::testing::EmbeddedArchiveMediaDriverProcess;
254    use serial_test::serial;
255    use std::cell::Cell;
256    use std::error;
257    use std::error::Error;
258    use std::str::FromStr;
259    use std::sync::atomic::{AtomicBool, Ordering};
260    use std::sync::Arc;
261    use std::thread::{sleep, JoinHandle};
262    use std::time::{Duration, Instant};
263
264    #[derive(Default, Debug)]
265    struct ErrorCount {
266        error_count: usize,
267    }
268
269    impl AeronErrorHandlerCallback for ErrorCount {
270        fn handle_aeron_error_handler(&mut self, error_code: c_int, msg: &str) {
271            error!("Aeron error {}: {}", error_code, msg);
272            self.error_count += 1;
273        }
274    }
275
    /// Control request channel handed to the embedded archive media driver in
    /// `test_simple_replay_merge`.
    pub const ARCHIVE_CONTROL_REQUEST: &str = "aeron:udp?endpoint=localhost:8010";
    /// Control response channel handed to the embedded archive media driver in
    /// `test_simple_replay_merge`.
    pub const ARCHIVE_CONTROL_RESPONSE: &str = "aeron:udp?endpoint=localhost:8011";
    /// Recording events channel handed to the embedded archive media driver in
    /// `test_simple_replay_merge`.
    pub const ARCHIVE_RECORDING_EVENTS: &str =
        "aeron:udp?control-mode=dynamic|control=localhost:8012";
280
    /// Exercises `AeronUriStringBuilder`: building a URI from scratch,
    /// re-initialising the same builder, and starting from an existing URI via
    /// both `from_str` and `str::parse`.
    #[test]
    fn test_uri_string_builder() -> Result<(), AeronCError> {
        let builder = AeronUriStringBuilder::default();
        builder.init_new()?;
        builder
            .media(Media::Udp)? // very important to set media else set_initial_position will give an error of -1
            .mtu_length(1024 * 64)?
            .set_initial_position(127424949617280, 1182294755, 65536)?;
        let uri = builder.build(1024)?;
        assert_eq!("aeron:udp?term-id=-1168322114|term-length=65536|mtu=65536|init-term-id=1182294755|term-offset=33408", uri);

        // Re-initialise the same builder and assemble a second, unrelated URI.
        builder.init_new()?;
        let uri = builder
            .media(Media::Udp)?
            .control_mode(ControlMode::Dynamic)?
            .reliable(false)?
            .ttl(2)?
            .endpoint("localhost:1235")?
            .control("localhost:1234")?
            .build(1024)?;
        assert_eq!("aeron:udp?ttl=2|control-mode=dynamic|endpoint=localhost:1235|control=localhost:1234|reliable=false", uri);

        // Start from an existing URI string and add a parameter to it.
        let uri = AeronUriStringBuilder::from_str("aeron:udp?endpoint=localhost:8010")?
            .ttl(5)?
            .build(1024)?;

        assert_eq!("aeron:udp?ttl=5|endpoint=localhost:8010", uri);

        // Same round trip through `str::parse`, overriding the existing ttl.
        let uri = uri.parse::<AeronUriStringBuilder>()?.ttl(6)?.build(1024)?;

        assert_eq!("aeron:udp?ttl=6|endpoint=localhost:8010", uri);

        Ok(())
    }
315
    /// Stream id shared by the replay-merge publisher, recording and subscription.
    pub const STREAM_ID: i32 = 1033;
    /// Prefix of every message payload sent by the replay-merge publisher thread.
    pub const MESSAGE_PREFIX: &str = "Message-Prefix-";
    /// Control endpoint used by the publication, recording and live destination channels.
    pub const CONTROL_ENDPOINT: &str = "localhost:23265";
    /// Endpoint of the channel the archive is asked to record.
    pub const RECORDING_ENDPOINT: &str = "localhost:23266";
    /// Endpoint of the live destination used by the replay merge.
    pub const LIVE_ENDPOINT: &str = "localhost:23267";
    /// Endpoint of the replay destination used by the replay merge.
    pub const REPLAY_ENDPOINT: &str = "localhost:0";
    // pub const REPLAY_ENDPOINT: &str = "localhost:23268";
323
324    #[test]
325    #[serial]
326    fn test_simple_replay_merge() -> Result<(), AeronCError> {
327        let _ = env_logger::Builder::new()
328            .is_test(true)
329            .filter_level(log::LevelFilter::Info)
330            .try_init();
331
332        EmbeddedArchiveMediaDriverProcess::kill_all_java_processes()
333            .expect("failed to kill all java processes");
334
335        assert!(is_udp_port_available(23265));
336        assert!(is_udp_port_available(23266));
337        assert!(is_udp_port_available(23267));
338        assert!(is_udp_port_available(23268));
339        let id = Aeron::nano_clock();
340        let aeron_dir = format!("target/aeron/{}/shm", id);
341        let archive_dir = format!("target/aeron/{}/archive", id);
342
343        info!("starting archive media driver");
344        let media_driver = EmbeddedArchiveMediaDriverProcess::build_and_start(
345            &aeron_dir,
346            &format!("{}/archive", aeron_dir),
347            ARCHIVE_CONTROL_REQUEST,
348            ARCHIVE_CONTROL_RESPONSE,
349            ARCHIVE_RECORDING_EVENTS,
350        )
351        .expect("Failed to start embedded media driver");
352
353        info!("connecting to archive");
354        let (archive, aeron) = media_driver
355            .archive_connect()
356            .expect("Could not connect to archive client");
357
358        let running = Arc::new(AtomicBool::new(true));
359
360        info!("connected to archive, adding publication");
361        assert!(!aeron.is_closed());
362
363        let (session_id, publisher_thread) =
364            reply_merge_publisher(&archive, aeron.clone(), running.clone())?;
365
366        {
367            let context = AeronContext::new()?;
368            context.set_dir(&media_driver.aeron_dir)?;
369            let error_handler = Handler::leak(ErrorCount::default());
370            context.set_error_handler(Some(&error_handler))?;
371            let aeron = Aeron::new(&context)?;
372            aeron.start()?;
373            let aeron_archive_context = archive.get_archive_context();
374            let aeron_archive_context = AeronArchiveContext::new_with_no_credentials_supplier(
375                &aeron,
376                aeron_archive_context.get_control_request_channel(),
377                aeron_archive_context.get_control_response_channel(),
378                aeron_archive_context.get_recording_events_channel(),
379            )?;
380            aeron_archive_context.set_error_handler(Some(&error_handler))?;
381            let archive = AeronArchiveAsyncConnect::new_with_aeron(&aeron_archive_context, &aeron)?
382                .poll_blocking(Duration::from_secs(30))
383                .expect("failed to connect to archive");
384            replay_merge_subscription(&archive, aeron.clone(), session_id)?;
385        }
386
387        running.store(false, Ordering::Release);
388        publisher_thread.join().unwrap();
389
390        Ok(())
391    }
392
    /// Publisher half of the replay-merge test.
    ///
    /// Adds a publication on `STREAM_ID`, asks `archive` to start recording
    /// that session, waits for the publication to connect, and then spawns a
    /// thread that keeps offering messages until `running` becomes `false`.
    ///
    /// Returns the publication's session id together with the join handle of
    /// the publisher thread.
    fn reply_merge_publisher(
        archive: &AeronArchive,
        aeron: Aeron,
        running: Arc<AtomicBool>,
    ) -> Result<(i32, JoinHandle<()>), AeronCError> {
        let publication = aeron.add_publication(
            // &format!("aeron:udp?control={CONTROL_ENDPOINT}|control-mode=dynamic|term-length=65536|fc=tagged,g:99901/1,t:5s"),
            &format!("aeron:udp?control={CONTROL_ENDPOINT}|control-mode=dynamic|term-length=65536")
                .into_c_string(),
            STREAM_ID,
            Duration::from_secs(5),
        )?;

        info!(
            "publication {} [status={:?}]",
            publication.channel(),
            publication.channel_status()
        );
        assert_eq!(1, publication.channel_status());

        let session_id = publication.session_id();
        let recording_channel = format!(
            // "aeron:udp?endpoint={RECORDING_ENDPOINT}|control={CONTROL_ENDPOINT}|session-id={session_id}|gtag=99901"
            "aeron:udp?endpoint={RECORDING_ENDPOINT}|control={CONTROL_ENDPOINT}|session-id={session_id}"
        );
        info!("recording channel {}", recording_channel);
        archive.start_recording(
            &recording_channel.into_c_string(),
            STREAM_ID,
            SOURCE_LOCATION_REMOTE,
            true,
        )?;

        info!("waiting for publisher to be connected");
        // No timeout here: this loop only ends once the publication reports connected.
        while !publication.is_connected() {
            thread::sleep(Duration::from_millis(100));
        }
        info!("publisher to be connected");
        // NOTE(review): `counters_reader` is not used after this point.
        let counters_reader = aeron.counters_reader();
        let mut caught_up_count = 0;
        let publisher_thread = thread::spawn(move || {
            let mut message_count = 0;

            while running.load(Ordering::Acquire) {
                let message = format!("{}{}", MESSAGE_PREFIX, message_count);
                // Retry the offer until it returns a positive result.
                while publication.offer(
                    message.as_bytes(),
                    Handlers::no_reserved_value_supplier_handler(),
                ) <= 0
                {
                    thread::sleep(Duration::from_millis(10));
                }
                message_count += 1;
                if message_count % 10_000 == 0 {
                    info!(
                        "Published {} messages [position={}]",
                        message_count,
                        publication.position()
                    );
                }
                // slow down publishing so can catch up
                if message_count > 10_000 {
                    // ensure archiver is caught up
                    while !publication.is_archive_position_with(0) {
                        thread::sleep(Duration::from_micros(300));
                    }
                    caught_up_count += 1;
                }
            }
            // Fails unless more than 10_000 messages were published before `running` was cleared.
            assert!(caught_up_count > 0);
            info!("Publisher thread terminated");
        });
        Ok((session_id, publisher_thread))
    }
467
    /// Subscriber half of the replay-merge test.
    ///
    /// Finds the recording position counter for `session_id`, resolves the
    /// recording id from it, creates a manual-control subscription and drives
    /// an `AeronArchiveReplayMerge` until it reports merged.
    ///
    /// Panics if the archive or the Aeron client reports an error while
    /// polling, or if any of the final assertions (not failed, live added,
    /// more than 10_000 fragments seen) does not hold.
    fn replay_merge_subscription(
        archive: &AeronArchive,
        aeron: Aeron,
        session_id: i32,
    ) -> Result<(), AeronCError> {
        // let replay_channel = format!("aeron:udp?control-mode=manual|session-id={session_id}");
        let replay_channel = format!("aeron:udp?session-id={session_id}").into_c_string();
        info!("replay channel {:?}", replay_channel);

        let replay_destination = format!("aeron:udp?endpoint={REPLAY_ENDPOINT}").into_c_string();
        info!("replay destination {:?}", replay_destination);

        let live_destination =
            format!("aeron:udp?endpoint={LIVE_ENDPOINT}|control={CONTROL_ENDPOINT}")
                .into_c_string();
        info!("live destination {:?}", live_destination);

        let counters_reader = aeron.counters_reader();
        let mut counter_id = -1;

        // Spins (no sleep, no timeout) until a recording position counter exists for the session.
        while counter_id < 0 {
            counter_id = RecordingPos::find_counter_id_by_session(&counters_reader, session_id);
        }
        info!(
            "counter id {} {:?}",
            counter_id,
            counters_reader.get_counter_label(counter_id, 1024)
        );
        info!(
            "counter id {} position={:?}",
            counter_id,
            counters_reader.get_counter_value(counter_id)
        );

        // let recording_id = Cell::new(-1);
        // let start_position = Cell::new(-1);

        // let mut count = 0;
        // assert!(
        //     archive.list_recordings_once(&mut count, 0, 1000, |descriptor| {
        //         info!("Recording descriptor: {:?}", descriptor);
        //         recording_id.set(descriptor.recording_id);
        //         start_position.set(descriptor.start_position);
        //         assert_eq!(descriptor.session_id, session_id);
        //         assert_eq!(descriptor.stream_id, STREAM_ID);
        //     })? >= 0
        // );
        // assert!(count > 0);
        // assert!(recording_id.get() >= 0);

        // let record_id = RecordingPos::get_recording_id(&aeron.counters_reader(), counter_id)?;
        // assert_eq!(recording_id.get(), record_id);
        //
        // let recording_id = recording_id.get();
        // let start_position = start_position.get();
        let start_position = 0;
        // Resolve the recording id from the counter key, retrying for up to 5 seconds.
        let recording_id = RecordingPos::get_recording_id_block(
            &aeron.counters_reader(),
            counter_id,
            Duration::from_secs(5),
        )?;

        let subscribe_channel =
            format!("aeron:udp?control-mode=manual|session-id={session_id}").into_c_string();
        info!("subscribe channel {:?}", subscribe_channel);
        let subscription = aeron.add_subscription(
            &subscribe_channel,
            STREAM_ID,
            Handlers::no_available_image_handler(),
            Handlers::no_unavailable_image_handler(),
            Duration::from_secs(5),
        )?;

        let replay_merge = AeronArchiveReplayMerge::new(
            &subscription,
            &archive,
            &replay_channel,
            &replay_destination,
            &live_destination,
            recording_id,
            start_position,
            Aeron::epoch_clock(),
            10_000,
        )?;

        info!(
            "ReplayMerge initialization: recordingId={}, startPosition={}, subscriptionChannel={:?}, replayChannel={:?}, replayDestination={:?}, liveDestination={:?}",
            recording_id,
            start_position,
            subscribe_channel,
            &replay_channel,
            &replay_destination,
            &live_destination
        );

        // media_driver
        //     .run_aeron_stats()
        //     .expect("Failed to run aeron stats");

        // info!("Waiting for subscription to connect...");
        // while !subscription.is_connected() {
        //     thread::sleep(Duration::from_millis(100));
        // }
        // info!("Subscription connected");

        info!(
            "about to start_replay [maxRecordPosition={:?}]",
            archive.get_max_recorded_position(recording_id)
        );

        // Counts every fragment delivered to the poll handler below.
        let mut reply_count = 0;
        while !replay_merge.is_merged() {
            assert!(!replay_merge.has_failed());
            if replay_merge.poll_once(
                |buffer, _header| {
                    reply_count += 1;
                    if reply_count % 10_000 == 0 {
                        info!(
                            "replay-merge [count={}, isMerged={}, isLive={}]",
                            reply_count,
                            replay_merge.is_merged(),
                            replay_merge.is_live_added()
                        );
                    }
                },
                100,
            )? == 0
            {
                // The poll returned 0: check both clients for errors, service
                // recording signals, then back off briefly.
                let err = archive.poll_for_error_response_as_string(4096)?;
                if !err.is_empty() {
                    panic!("{}", err);
                }
                if aeron.errmsg().len() > 0 && "no error" != aeron.errmsg() {
                    panic!("{}", aeron.errmsg());
                }
                archive.poll_for_recording_signals()?;
                thread::sleep(Duration::from_millis(100));
            }
        }
        assert!(!replay_merge.has_failed());
        assert!(replay_merge.is_live_added());
        assert!(reply_count > 10_000);
        Ok(())
    }
612
613    #[test]
614    fn version_check() {
615        let major = unsafe { crate::aeron_version_major() };
616        let minor = unsafe { crate::aeron_version_minor() };
617        let patch = unsafe { crate::aeron_version_patch() };
618
619        let aeron_version = format!("{}.{}.{}", major, minor, patch);
620
621        let cargo_version = "1.48.4";
622        assert_eq!(aeron_version, cargo_version);
623    }
624
625    use std::thread;
626
    /// Starts an embedded archive media driver on freshly chosen UDP ports and
    /// returns a started `Aeron` client, an archive context configured for
    /// that driver, and the driver process handle.
    ///
    /// Panics if no free port can be found or the driver process fails to
    /// start; other failures are returned as errors.
    pub fn start_aeron_archive() -> Result<
        (
            Aeron,
            AeronArchiveContext,
            EmbeddedArchiveMediaDriverProcess,
        ),
        Box<dyn Error>,
    > {
        // The nano clock value keeps the directories unique per invocation.
        let id = Aeron::nano_clock();
        let aeron_dir = format!("target/aeron/{}/shm", id);
        let archive_dir = format!("target/aeron/{}/archive", id);

        // Pick three distinct ports, each search starting just above the previous result.
        let request_port = find_unused_udp_port(8000).expect("Could not find port");
        let response_port = find_unused_udp_port(request_port + 1).expect("Could not find port");
        let recording_event_port =
            find_unused_udp_port(response_port + 1).expect("Could not find port");
        let request_control_channel = &format!("aeron:udp?endpoint=localhost:{}", request_port);
        let response_control_channel = &format!("aeron:udp?endpoint=localhost:{}", response_port);
        let recording_events_channel =
            &format!("aeron:udp?endpoint=localhost:{}", recording_event_port);
        assert_ne!(request_control_channel, response_control_channel);

        let archive_media_driver = EmbeddedArchiveMediaDriverProcess::build_and_start(
            &aeron_dir,
            &archive_dir,
            request_control_channel,
            response_control_channel,
            recording_events_channel,
        )
        .expect("Failed to start Java process");

        let aeron_context = AeronContext::new()?;
        aeron_context.set_dir(&aeron_dir.into_c_string())?;
        aeron_context.set_client_name(&"test".into_c_string())?;
        aeron_context.set_publication_error_frame_handler(Some(&Handler::leak(
            AeronPublicationErrorFrameHandlerLogger,
        )))?;
        // One leaked error handler is shared by the Aeron context and the archive context.
        let error_handler = Handler::leak(ErrorCount::default());
        aeron_context.set_error_handler(Some(&error_handler))?;
        let aeron = Aeron::new(&aeron_context)?;
        aeron.start()?;

        let archive_context = AeronArchiveContext::new_with_no_credentials_supplier(
            &aeron,
            request_control_channel,
            response_control_channel,
            recording_events_channel,
        )?;
        archive_context.set_error_handler(Some(&error_handler))?;
        Ok((aeron, archive_context, archive_media_driver))
    }
678
679    #[test]
680    #[serial]
681    pub fn test_aeron_archive() -> Result<(), Box<dyn error::Error>> {
682        let _ = env_logger::Builder::new()
683            .is_test(true)
684            .filter_level(log::LevelFilter::Info)
685            .try_init();
686        EmbeddedArchiveMediaDriverProcess::kill_all_java_processes()
687            .expect("failed to kill all java processes");
688
689        let (aeron, archive_context, media_driver) = start_aeron_archive()?;
690
691        assert!(!aeron.is_closed());
692
693        info!("connected to aeron");
694
695        let archive_connector =
696            AeronArchiveAsyncConnect::new_with_aeron(&archive_context.clone(), &aeron)?;
697        let archive = archive_connector
698            .poll_blocking(Duration::from_secs(30))
699            .expect("failed to connect to aeron archive media driver");
700
701        assert!(archive.get_archive_id() > 0);
702
703        let channel = AERON_IPC_STREAM;
704        let stream_id = 10;
705
706        let subscription_id =
707            archive.start_recording(channel, stream_id, SOURCE_LOCATION_LOCAL, true)?;
708
709        assert!(subscription_id >= 0);
710        info!("subscription id {}", subscription_id);
711
712        let publication = aeron
713            .async_add_exclusive_publication(channel, stream_id)?
714            .poll_blocking(Duration::from_secs(5))?;
715
716        for i in 0..11 {
717            while publication.offer(
718                "123456".as_bytes(),
719                Handlers::no_reserved_value_supplier_handler(),
720            ) <= 0
721            {
722                sleep(Duration::from_millis(50));
723                archive.poll_for_recording_signals()?;
724                let err = archive.poll_for_error_response_as_string(4096)?;
725                if !err.is_empty() {
726                    panic!("{}", err);
727                }
728                archive.idle();
729            }
730            info!("sent message {i} [test_aeron_archive]");
731        }
732
733        archive.idle();
734        let session_id = publication.get_constants()?.session_id;
735        info!("publication session id {}", session_id);
736        // since this is single threaded need to make sure it did write to archiver, usually not required in multi-proccess app
737        let stop_position = publication.position();
738        info!(
739            "publication stop position {} [publication={:?}]",
740            stop_position,
741            publication.get_constants()
742        );
743        let counters_reader = aeron.counters_reader();
744        info!("counters reader ready {:?}", counters_reader);
745
746        let mut counter_id = -1;
747
748        let start = Instant::now();
749        while counter_id <= 0 && start.elapsed() < Duration::from_secs(5) {
750            counter_id = RecordingPos::find_counter_id_by_session(&counters_reader, session_id);
751            info!("counter id {}", counter_id);
752        }
753
754        assert!(counter_id >= 0);
755
756        info!("counter id {counter_id}, session id {session_id}");
757        while counters_reader.get_counter_value(counter_id) < stop_position {
758            info!(
759                "current archive publication stop position {}",
760                counters_reader.get_counter_value(counter_id)
761            );
762            sleep(Duration::from_millis(50));
763        }
764        info!(
765            "found archive publication stop position {}",
766            counters_reader.get_counter_value(counter_id)
767        );
768
769        archive.stop_recording_channel_and_stream(channel, stream_id)?;
770        drop(publication);
771
772        info!("list recordings");
773        let found_recording_id = Cell::new(-1);
774        let start_pos = Cell::new(-1);
775        let end_pos = Cell::new(-1);
776        let start = Instant::now();
777        while start.elapsed() < Duration::from_secs(5) && found_recording_id.get() == -1 {
778            let mut count = 0;
779            archive.list_recordings_for_uri_once(
780                &mut count,
781                0,
782                i32::MAX,
783                channel,
784                stream_id,
785                |d: AeronArchiveRecordingDescriptor| {
786                    assert_eq!(d.stream_id, stream_id);
787                    info!("found recording {:#?}", d);
788                    info!(
789                        "strippedChannel={}, originalChannel={}",
790                        d.stripped_channel(),
791                        d.original_channel()
792                    );
793                    if d.stop_position > d.start_position && d.stop_position > 0 {
794                        found_recording_id.set(d.recording_id);
795                        start_pos.set(d.start_position);
796                        end_pos.set(d.stop_position);
797                    }
798
799                    // verify clone_struct works
800                    let copy = d.clone_struct();
801                    assert_eq!(copy.deref(), d.deref());
802                    assert_eq!(copy.recording_id, d.recording_id);
803                    assert_eq!(copy.control_session_id, d.control_session_id);
804                    assert_eq!(copy.mtu_length, d.mtu_length);
805                    assert_eq!(copy.source_identity_length, d.source_identity_length);
806                },
807            )?;
808            archive.poll_for_recording_signals()?;
809            let err = archive.poll_for_error_response_as_string(4096)?;
810            if !err.is_empty() {
811                panic!("{}", err);
812            }
813        }
814        assert!(start.elapsed() < Duration::from_secs(5));
815        info!("start replay");
816        let params = AeronArchiveReplayParams::new(
817            0,
818            i32::MAX,
819            start_pos.get(),
820            end_pos.get() - start_pos.get(),
821            0,
822            0,
823        )?;
824        info!("replay params {:#?}", params);
825        let replay_stream_id = 45;
826        let replay_session_id =
827            archive.start_replay(found_recording_id.get(), channel, replay_stream_id, &params)?;
828        let session_id = replay_session_id as i32;
829
830        info!("replay session id {}", replay_session_id);
831        info!("session id {}", session_id);
832        let channel_replay =
833            format!("{}?session-id={}", channel.to_str().unwrap(), session_id).into_c_string();
834        info!("archive id: {}", archive.get_archive_id());
835
836        info!("add subscription {:?}", channel_replay);
837        let subscription = aeron
838            .async_add_subscription(
839                &channel_replay,
840                replay_stream_id,
841                Some(&Handler::leak(AeronAvailableImageLogger)),
842                Some(&Handler::leak(AeronUnavailableImageLogger)),
843            )?
844            .poll_blocking(Duration::from_secs(10))?;
845
846        #[derive(Default)]
847        struct FragmentHandler {
848            count: Cell<usize>,
849        }
850
851        impl AeronFragmentHandlerCallback for FragmentHandler {
852            fn handle_aeron_fragment_handler(&mut self, buffer: &[u8], _header: AeronHeader) {
853                assert_eq!(buffer, "123456".as_bytes());
854
855                // Update count (using Cell for interior mutability)
856                self.count.set(self.count.get() + 1);
857            }
858        }
859
860        let poll = Handler::leak(FragmentHandler::default());
861
862        let start = Instant::now();
863        while start.elapsed() < Duration::from_secs(10) && subscription.poll(Some(&poll), 100)? <= 0
864        {
865            let err = archive.poll_for_error_response_as_string(4096)?;
866            if !err.is_empty() {
867                panic!("{}", err);
868            }
869        }
870        assert!(
871            start.elapsed() < Duration::from_secs(10),
872            "messages not received {:?}",
873            poll.count
874        );
875        info!("aeron {:?}", aeron);
876        info!("ctx {:?}", archive_context);
877        assert_eq!(11, poll.count.get());
878        Ok(())
879    }
880
881    #[test]
882    #[serial]
883    fn test_invalid_recording_channel() -> Result<(), Box<dyn Error>> {
884        let (aeron, archive_context, _media_driver) = start_aeron_archive()?;
885        let archive_connector =
886            AeronArchiveAsyncConnect::new_with_aeron(&archive_context.clone(), &aeron)?;
887        let archive = archive_connector
888            .poll_blocking(Duration::from_secs(30))
889            .expect("failed to connect to archive");
890
891        let invalid_channel = "invalid:channel".into_c_string();
892        let result =
893            archive.start_recording(&invalid_channel, STREAM_ID, SOURCE_LOCATION_LOCAL, true);
894        assert!(
895            result.is_err(),
896            "Expected error when starting recording with an invalid channel"
897        );
898        Ok(())
899    }
900
901    #[test]
902    #[serial]
903    fn test_stop_recording_on_nonexistent_channel() -> Result<(), Box<dyn Error>> {
904        let (aeron, archive_context, _media_driver) = start_aeron_archive()?;
905        let archive_connector =
906            AeronArchiveAsyncConnect::new_with_aeron(&archive_context.clone(), &aeron)?;
907        let archive = archive_connector
908            .poll_blocking(Duration::from_secs(30))
909            .expect("failed to connect to archive");
910
911        let nonexistent_channel = &"aeron:udp?endpoint=localhost:9999".into_c_string();
912        let result = archive.stop_recording_channel_and_stream(nonexistent_channel, STREAM_ID);
913        assert!(
914            result.is_err(),
915            "Expected error when stopping recording on a non-existent channel"
916        );
917        Ok(())
918    }
919
920    #[test]
921    #[serial]
922    fn test_replay_with_invalid_recording_id() -> Result<(), Box<dyn Error>> {
923        let (aeron, archive_context, _media_driver) = start_aeron_archive()?;
924        let archive_connector =
925            AeronArchiveAsyncConnect::new_with_aeron(&archive_context.clone(), &aeron)?;
926        let archive = archive_connector
927            .poll_blocking(Duration::from_secs(30))
928            .expect("failed to connect to archive");
929
930        let invalid_recording_id = -999;
931        let params = AeronArchiveReplayParams::new(0, i32::MAX, 0, 100, 0, 0)?;
932        let result = archive.start_replay(
933            invalid_recording_id,
934            &"aeron:udp?endpoint=localhost:8888".into_c_string(),
935            STREAM_ID,
936            &params,
937        );
938        assert!(
939            result.is_err(),
940            "Expected error when starting replay with an invalid recording id"
941        );
942        Ok(())
943    }
944
945    #[test]
946    #[serial]
947    fn test_archive_reconnect_after_close() -> Result<(), Box<dyn std::error::Error>> {
948        let (aeron, archive_context, media_driver) = start_aeron_archive()?;
949        let archive_connector =
950            AeronArchiveAsyncConnect::new_with_aeron(&archive_context.clone(), &aeron)?;
951        let archive = archive_connector
952            .poll_blocking(Duration::from_secs(30))
953            .expect("failed to connect to archive");
954
955        drop(archive);
956
957        let archive_connector = AeronArchiveAsyncConnect::new_with_aeron(&archive_context, &aeron)?;
958        let new_archive = archive_connector
959            .poll_blocking(Duration::from_secs(30))
960            .expect("failed to reconnect to archive");
961        assert!(
962            new_archive.get_archive_id() > 0,
963            "Reconnected archive should have a valid archive id"
964        );
965
966        drop(media_driver);
967        Ok(())
968    }
969}