rusteron_archive/
lib.rs

1/**/
2#![allow(non_upper_case_globals)]
3#![allow(non_camel_case_types)]
4#![allow(non_snake_case)]
5#![allow(clippy::all)]
6#![allow(unused_unsafe)]
7#![allow(unused_variables)]
8#![doc = include_str!("../README.md")]
9//! # Features
10//!
11//! - **`static`**: When enabled, this feature statically links the Aeron C code.
12//!   By default, the library uses dynamic linking to the Aeron C libraries.
13//! - **`backtrace`** - When enabled will log a backtrace for each AeronCError
14//! - **`extra-logging`** - When enabled, logs when a resource is created and destroyed. Useful if you're seeing a segfault due to a resource being closed
15//! - **`precompile`** - When enabled, uses precompiled C code instead of requiring cmake and java to be installed
16
/// Bindings pulled in from the `bindings.rs` file that the build places in `OUT_DIR`.
17pub mod bindings {
// The file is included textually, so its items become members of this module.
18    include!(concat!(env!("OUT_DIR"), "/bindings.rs"));
19}
20
21use bindings::*;
22use std::cell::Cell;
23use std::os::raw::c_int;
24use std::time::{Duration, Instant};
25
26pub mod testing;
27
28include!(concat!(env!("OUT_DIR"), "/aeron.rs"));
29include!(concat!(env!("OUT_DIR"), "/aeron_custom.rs"));
30
/// Alias for the archive source-location type exposed by the bindings.
31pub type SourceLocation = bindings::aeron_archive_source_location_t;
/// Shorthand for `AERON_ARCHIVE_SOURCE_LOCATION_LOCAL`.
32pub const SOURCE_LOCATION_LOCAL: aeron_archive_source_location_en =
33    SourceLocation::AERON_ARCHIVE_SOURCE_LOCATION_LOCAL;
/// Shorthand for `AERON_ARCHIVE_SOURCE_LOCATION_REMOTE`.
34pub const SOURCE_LOCATION_REMOTE: aeron_archive_source_location_en =
35    SourceLocation::AERON_ARCHIVE_SOURCE_LOCATION_REMOTE;
36
37pub struct NoOpAeronIdleStrategyFunc;
38
39impl AeronIdleStrategyFuncCallback for NoOpAeronIdleStrategyFunc {
40    fn handle_aeron_idle_strategy_func(&mut self, _work_count: c_int) -> () {}
41}
42
43pub struct RecordingPos;
44impl RecordingPos {
45    pub fn find_counter_id_by_session(
46        counter_reader: &AeronCountersReader,
47        session_id: i32,
48    ) -> i32 {
49        unsafe {
50            aeron_archive_recording_pos_find_counter_id_by_session_id(
51                counter_reader.get_inner(),
52                session_id,
53            )
54        }
55    }
56    pub fn find_counter_id_by_recording_id(
57        counter_reader: &AeronCountersReader,
58        recording_id: i64,
59    ) -> i32 {
60        unsafe {
61            aeron_archive_recording_pos_find_counter_id_by_recording_id(
62                counter_reader.get_inner(),
63                recording_id,
64            )
65        }
66    }
67
68    /// Return the recordingId embedded in the key of the given counter
69    /// if it is indeed a "recording position" counter. Otherwise return -1.
70    pub fn get_recording_id_block(
71        counters_reader: &AeronCountersReader,
72        counter_id: i32,
73        wait: Duration,
74    ) -> Result<i64, AeronCError> {
75        let mut result = Self::get_recording_id(counters_reader, counter_id);
76        let instant = Instant::now();
77
78        while result.is_err() && instant.elapsed() < wait {
79            result = Self::get_recording_id(counters_reader, counter_id);
80            #[cfg(debug_assertions)]
81            std::thread::sleep(Duration::from_millis(10));
82        }
83
84        return result;
85    }
86
87    /// Return the recordingId embedded in the key of the given counter
88    /// if it is indeed a "recording position" counter. Otherwise return -1.
89    pub fn get_recording_id(
90        counters_reader: &AeronCountersReader,
91        counter_id: i32,
92    ) -> Result<i64, AeronCError> {
93        /// The type id for an Aeron Archive recording position counter.
94        /// In Aeron Java, this is AeronCounters.ARCHIVE_RECORDING_POSITION_TYPE_ID (which is typically 100).
95        pub const RECORDING_POSITION_TYPE_ID: i32 = 100;
96
97        /// from Aeron Java code
98        pub const RECORD_ALLOCATED: i32 = 1;
99
100        /// A constant to mean "no valid recording ID".
101        pub const NULL_RECORDING_ID: i64 = -1;
102
103        if counter_id < 0 {
104            return Err(AeronCError::from_code(NULL_RECORDING_ID as i32));
105        }
106
107        let state = counters_reader.counter_state(counter_id)?;
108        if state != RECORD_ALLOCATED {
109            return Err(AeronCError::from_code(NULL_RECORDING_ID as i32));
110        }
111
112        let type_id = counters_reader.counter_type_id(counter_id)?;
113        if type_id != RECORDING_POSITION_TYPE_ID {
114            return Err(AeronCError::from_code(NULL_RECORDING_ID as i32));
115        }
116
117        // Read the key area. For a RECORDING_POSITION_TYPE_ID counter:
118        //    - offset 0..8 => the i64 recording_id
119        //    - offset 8..12 => the session_id (int)
120        //    etc...
121        // only need the first 8 bytes to get the recordingId.
122        let recording_id = Cell::new(-1);
123        counters_reader.foreach_counter_once(|value, id, type_id, key, label| {
124            if id == counter_id && type_id == RECORDING_POSITION_TYPE_ID {
125                let mut val = [0u8; 8];
126                val.copy_from_slice(&key[0..8]);
127                let Ok(value) = i64::from_le_bytes(val).try_into();
128                recording_id.set(value);
129            }
130        });
131        let recording_id = recording_id.get();
132        if recording_id < 0 {
133            return Err(AeronCError::from_code(NULL_RECORDING_ID as i32));
134        }
135
136        Ok(recording_id)
137    }
138}
139
140unsafe extern "C" fn default_encoded_credentials(
141    _clientd: *mut std::os::raw::c_void,
142) -> *mut aeron_archive_encoded_credentials_t {
143    // Allocate a zeroed instance of `aeron_archive_encoded_credentials_t`
144    let empty_credentials = Box::new(aeron_archive_encoded_credentials_t {
145        data: std::ptr::null(),
146        length: 0,
147    });
148    Box::into_raw(empty_credentials)
149}
150
151impl AeronArchive {
152    pub fn aeron(&self) -> Aeron {
153        self.get_archive_context().get_aeron()
154    }
155}
156
157impl AeronArchiveAsyncConnect {
158    #[inline]
159    /// recommend using this method instead of standard `new` as it will link the archive to aeron so if a drop occurs archive is dropped before aeron
160    pub fn new_with_aeron(ctx: &AeronArchiveContext, aeron: &Aeron) -> Result<Self, AeronCError> {
161        let resource_async = Self::new(ctx)?;
162        resource_async
163            .owned_inner
164            .clone()
165            .unwrap()
166            .add_dependency(aeron.clone());
167        Ok(resource_async)
168    }
169}
170
171macro_rules! impl_archive_position_methods {
172    ($pub_type:ty) => {
173        impl $pub_type {
174            /// Retrieves the current active live archive position using the Aeron counters.
175            /// Returns an error if not found.
176            pub fn get_archive_position(&self) -> Result<i64, AeronCError> {
177                if let Some(aeron) = self.owned_inner.clone().unwrap().get_dependency::<Aeron>() {
178                    let counter_reader = &aeron.counters_reader();
179                    self.get_archive_position_with(counter_reader)
180                } else {
181                    Err(AeronCError::from_code(-1))
182                }
183            }
184
185            /// Retrieves the current active live archive position using the provided counter reader.
186            /// Returns an error if not found.
187            pub fn get_archive_position_with(
188                &self,
189                counters: &AeronCountersReader,
190            ) -> Result<i64, AeronCError> {
191                let session_id = self.get_constants()?.session_id();
192                let counter_id = RecordingPos::find_counter_id_by_session(counters, session_id);
193                if counter_id < 0 {
194                    return Err(AeronCError::from_code(counter_id));
195                }
196                let position = counters.get_counter_value(counter_id);
197                if position < 0 {
198                    return Err(AeronCError::from_code(position as i32));
199                }
200                Ok(position)
201            }
202
203            /// Checks if the publication's current position is within a specified inclusive length
204            /// of the archive position.
205            pub fn is_archive_position_with(&self, length_inclusive: usize) -> bool {
206                let archive_position = self.get_archive_position().unwrap_or(-1);
207                if archive_position < 0 {
208                    return false;
209                }
210                self.position() - archive_position <= length_inclusive as i64
211            }
212        }
213    };
214}
215
216impl_archive_position_methods!(AeronPublication);
217impl_archive_position_methods!(AeronExclusivePublication);
218
219impl AeronArchiveContext {
220    // The method below sets no credentials supplier, which is essential for the operation
221    // of the Aeron Archive Context. The `set_credentials_supplier` must be set to prevent
222    // segmentation faults in the C bindings.
223    pub fn set_no_credentials_supplier(&self) -> Result<i32, AeronCError> {
224        self.set_credentials_supplier(
225            Some(default_encoded_credentials),
226            None,
227            None::<&Handler<AeronArchiveCredentialsFreeFuncLogger>>,
228        )
229    }
230
231    /// This method creates a new `AeronArchiveContext` with a no-op credentials supplier.
232    /// If you do not set a credentials supplier, it will segfault.
233    /// This method ensures that a non-functional credentials supplier is set to avoid the segfault.
234    pub fn new_with_no_credentials_supplier(
235        aeron: &Aeron,
236        request_control_channel: &str,
237        response_control_channel: &str,
238        recording_events_channel: &str,
239    ) -> Result<AeronArchiveContext, AeronCError> {
240        let context = Self::new()?;
241        context.set_no_credentials_supplier()?;
242        context.set_aeron(aeron)?;
243        context.set_control_request_channel(request_control_channel)?;
244        context.set_control_response_channel(response_control_channel)?;
245        context.set_recording_events_channel(recording_events_channel)?;
246        // see https://github.com/mimran1980/rusteron/issues/18
247        context.set_idle_strategy(Some(&Handler::leak(NoOpAeronIdleStrategyFunc)))?;
248        Ok(context)
249    }
250}
251
252#[cfg(test)]
253mod tests {
254    use super::*;
255    use log::{error, info};
256
257    use crate::testing::EmbeddedArchiveMediaDriverProcess;
258    use serial_test::serial;
259    use std::cell::Cell;
260    use std::error;
261    use std::error::Error;
262    use std::str::FromStr;
263    use std::sync::atomic::{AtomicBool, Ordering};
264    use std::sync::Arc;
265    use std::thread::{sleep, JoinHandle};
266    use std::time::{Duration, Instant};
267
268    #[derive(Default, Debug)]
269    struct ErrorCount {
270        error_count: usize,
271    }
272
273    impl AeronErrorHandlerCallback for ErrorCount {
274        fn handle_aeron_error_handler(&mut self, error_code: c_int, msg: &str) {
275            error!("Aeron error {}: {}", error_code, msg);
276            self.error_count += 1;
277        }
278    }
279
/// Control request channel handed to the embedded archive media driver in `test_simple_replay_merge`.
280    pub const ARCHIVE_CONTROL_REQUEST: &str = "aeron:udp?endpoint=localhost:8010";
/// Control response channel handed to the embedded archive media driver in `test_simple_replay_merge`.
281    pub const ARCHIVE_CONTROL_RESPONSE: &str = "aeron:udp?endpoint=localhost:8011";
/// Recording events channel handed to the embedded archive media driver in `test_simple_replay_merge`.
282    pub const ARCHIVE_RECORDING_EVENTS: &str =
283        "aeron:udp?control-mode=dynamic|control=localhost:8012";
284
/// Exercises `AeronUriStringBuilder`: building from scratch, re-initialising,
/// and parsing an existing URI through both `from_str` and `str::parse`.
#[test]
fn test_uri_string_builder() -> Result<(), AeronCError> {
    let builder = AeronUriStringBuilder::default();

    // Case 1: initial position parameters on a UDP channel.
    builder.init_new()?;
    builder
        .media(Media::Udp)? // very important to set media else set_initial_position will give an error of -1
        .mtu_length(64 * 1024)?
        .set_initial_position(127424949617280, 1182294755, 65536)?;
    let initial_position_uri = builder.build(1024)?;
    assert_eq!("aeron:udp?term-id=-1168322114|term-length=65536|mtu=65536|init-term-id=1182294755|term-offset=33408", initial_position_uri);

    // Case 2: the same builder, re-initialised, with control and endpoint settings.
    builder.init_new()?;
    let dynamic_control_uri = builder
        .media(Media::Udp)?
        .control_mode(ControlMode::Dynamic)?
        .reliable(false)?
        .ttl(2)?
        .endpoint("localhost:1235")?
        .control("localhost:1234")?
        .build(1024)?;
    assert_eq!("aeron:udp?ttl=2|control-mode=dynamic|endpoint=localhost:1235|control=localhost:1234|reliable=false", dynamic_control_uri);

    // Case 3: start from an existing URI via `FromStr::from_str`.
    let from_str_uri = AeronUriStringBuilder::from_str("aeron:udp?endpoint=localhost:8010")?
        .ttl(5)?
        .build(1024)?;
    assert_eq!("aeron:udp?ttl=5|endpoint=localhost:8010", from_str_uri);

    // Case 4: feed the previous result back in through `str::parse`.
    let reparsed_uri = from_str_uri
        .parse::<AeronUriStringBuilder>()?
        .ttl(6)?
        .build(1024)?;
    assert_eq!("aeron:udp?ttl=6|endpoint=localhost:8010", reparsed_uri);

    Ok(())
}
319
/// Stream id used for the publication, the recording and the subscription in the replay-merge test.
320    pub const STREAM_ID: i32 = 1033;
/// Prefix of every message the replay-merge publisher thread sends; a running counter is appended.
321    pub const MESSAGE_PREFIX: &str = "Message-Prefix-";
/// Control endpoint used in the publication, recording and live-destination channels.
322    pub const CONTROL_ENDPOINT: &str = "localhost:23265";
/// Endpoint used in the recording channel.
323    pub const RECORDING_ENDPOINT: &str = "localhost:23266";
/// Endpoint used in the live destination of the replay merge.
324    pub const LIVE_ENDPOINT: &str = "localhost:23267";
/// Endpoint used in the replay destination.
// NOTE(review): port 0 presumably lets the OS pick a free port — confirm.
325    pub const REPLAY_ENDPOINT: &str = "localhost:0";
326    // pub const REPLAY_ENDPOINT: &str = "localhost:23268";
327
/// End-to-end replay-merge test.
///
/// Starts an embedded archive media driver, records a live publication on a
/// background thread, then connects a second Aeron client and archive client
/// and runs a replay merge against that recording until it has merged.
#[test]
#[serial]
fn test_simple_replay_merge() -> Result<(), AeronCError> {
    let _ = env_logger::Builder::new()
        .is_test(true)
        .filter_level(log::LevelFilter::Info)
        .try_init();

    EmbeddedArchiveMediaDriverProcess::kill_all_java_processes()
        .expect("failed to kill all java processes");

    assert!(is_udp_port_available(23265));
    assert!(is_udp_port_available(23266));
    assert!(is_udp_port_available(23267));
    assert!(is_udp_port_available(23268));

    let id = Aeron::nano_clock();
    let aeron_dir = format!("target/aeron/{}/shm", id);
    // The archive directory handed to the driver lives underneath the aeron directory.
    let archive_dir = format!("{}/archive", aeron_dir);

    info!("starting archive media driver");
    let media_driver = EmbeddedArchiveMediaDriverProcess::build_and_start(
        &aeron_dir,
        &archive_dir,
        ARCHIVE_CONTROL_REQUEST,
        ARCHIVE_CONTROL_RESPONSE,
        ARCHIVE_RECORDING_EVENTS,
    )
    .expect("Failed to start embedded media driver");

    info!("connecting to archive");
    let (archive, aeron) = media_driver
        .archive_connect()
        .expect("Could not connect to archive client");

    // Flag the publisher thread polls to know when to stop.
    let running = Arc::new(AtomicBool::new(true));

    info!("connected to archive, adding publication");
    assert!(!aeron.is_closed());

    let (session_id, publisher_thread) =
        reply_merge_publisher(&archive, aeron.clone(), running.clone())?;

    {
        // Separate Aeron client and archive connection for the subscribing side.
        let context = AeronContext::new()?;
        context.set_dir(&media_driver.aeron_dir)?;
        let error_handler = Handler::leak(ErrorCount::default());
        context.set_error_handler(Some(&error_handler))?;
        let aeron = Aeron::new(&context)?;
        aeron.start()?;

        // Reuse the channels of the already connected archive client.
        let source_context = archive.get_archive_context();
        let subscriber_context = AeronArchiveContext::new_with_no_credentials_supplier(
            &aeron,
            source_context.get_control_request_channel(),
            source_context.get_control_response_channel(),
            source_context.get_recording_events_channel(),
        )?;
        subscriber_context.set_error_handler(Some(&error_handler))?;

        let archive = AeronArchiveAsyncConnect::new_with_aeron(&subscriber_context, &aeron)?
            .poll_blocking(Duration::from_secs(30))
            .expect("failed to connect to archive");
        replay_merge_subscription(&archive, aeron.clone(), session_id)?;
    }

    running.store(false, Ordering::Release);
    publisher_thread.join().unwrap();

    Ok(())
}
396
397    fn reply_merge_publisher(
398        archive: &AeronArchive,
399        aeron: Aeron,
400        running: Arc<AtomicBool>,
401    ) -> Result<(i32, JoinHandle<()>), AeronCError> {
402        let publication = aeron.add_publication(
403            // &format!("aeron:udp?control={CONTROL_ENDPOINT}|control-mode=dynamic|term-length=65536|fc=tagged,g:99901/1,t:5s"),
404            &format!("aeron:udp?control={CONTROL_ENDPOINT}|control-mode=dynamic|term-length=65536"),
405            STREAM_ID,
406            Duration::from_secs(5),
407        )?;
408
409        info!(
410            "publication {} [status={:?}]",
411            publication.channel(),
412            publication.channel_status()
413        );
414        assert_eq!(1, publication.channel_status());
415
416        let session_id = publication.session_id();
417        let recording_channel = format!(
418            // "aeron:udp?endpoint={RECORDING_ENDPOINT}|control={CONTROL_ENDPOINT}|session-id={session_id}|gtag=99901"
419            "aeron:udp?endpoint={RECORDING_ENDPOINT}|control={CONTROL_ENDPOINT}|session-id={session_id}"
420        );
421        info!("recording channel {}", recording_channel);
422        archive.start_recording(&recording_channel, STREAM_ID, SOURCE_LOCATION_REMOTE, true)?;
423
424        info!("waiting for publisher to be connected");
425        while !publication.is_connected() {
426            thread::sleep(Duration::from_millis(100));
427        }
428        info!("publisher to be connected");
429        let counters_reader = aeron.counters_reader();
430        let mut caught_up_count = 0;
431        let publisher_thread = thread::spawn(move || {
432            let mut message_count = 0;
433
434            while running.load(Ordering::Acquire) {
435                let message = format!("{}{}", MESSAGE_PREFIX, message_count);
436                while publication.offer(
437                    message.as_bytes(),
438                    Handlers::no_reserved_value_supplier_handler(),
439                ) <= 0
440                {
441                    thread::sleep(Duration::from_millis(10));
442                }
443                message_count += 1;
444                if message_count % 10_000 == 0 {
445                    info!(
446                        "Published {} messages [position={}]",
447                        message_count,
448                        publication.position()
449                    );
450                }
451                // slow down publishing so can catch up
452                if message_count > 10_000 {
453                    // ensure archiver is caught up
454                    while !publication.is_archive_position_with(0) {
455                        thread::sleep(Duration::from_micros(300));
456                    }
457                    caught_up_count += 1;
458                }
459            }
460            assert!(caught_up_count > 0);
461            info!("Publisher thread terminated");
462        });
463        Ok((session_id, publisher_thread))
464    }
465
466    fn replay_merge_subscription(
467        archive: &AeronArchive,
468        aeron: Aeron,
469        session_id: i32,
470    ) -> Result<(), AeronCError> {
471        // let replay_channel = format!("aeron:udp?control-mode=manual|session-id={session_id}");
472        let replay_channel = format!("aeron:udp?session-id={session_id}");
473        info!("replay channel {}", replay_channel);
474
475        let replay_destination = format!("aeron:udp?endpoint={REPLAY_ENDPOINT}");
476        info!("replay destination {}", replay_destination);
477
478        let live_destination =
479            format!("aeron:udp?endpoint={LIVE_ENDPOINT}|control={CONTROL_ENDPOINT}");
480        info!("live destination {}", live_destination);
481
482        let counters_reader = aeron.counters_reader();
483        let mut counter_id = -1;
484
485        while counter_id < 0 {
486            counter_id = RecordingPos::find_counter_id_by_session(&counters_reader, session_id);
487        }
488        info!(
489            "counter id {} {:?}",
490            counter_id,
491            counters_reader.get_counter_label(counter_id, 1024)
492        );
493        info!(
494            "counter id {} position={:?}",
495            counter_id,
496            counters_reader.get_counter_value(counter_id)
497        );
498
499        // let recording_id = Cell::new(-1);
500        // let start_position = Cell::new(-1);
501
502        // let mut count = 0;
503        // assert!(
504        //     archive.list_recordings_once(&mut count, 0, 1000, |descriptor| {
505        //         info!("Recording descriptor: {:?}", descriptor);
506        //         recording_id.set(descriptor.recording_id);
507        //         start_position.set(descriptor.start_position);
508        //         assert_eq!(descriptor.session_id, session_id);
509        //         assert_eq!(descriptor.stream_id, STREAM_ID);
510        //     })? >= 0
511        // );
512        // assert!(count > 0);
513        // assert!(recording_id.get() >= 0);
514
515        // let record_id = RecordingPos::get_recording_id(&aeron.counters_reader(), counter_id)?;
516        // assert_eq!(recording_id.get(), record_id);
517        //
518        // let recording_id = recording_id.get();
519        // let start_position = start_position.get();
520        let start_position = 0;
521        let recording_id = RecordingPos::get_recording_id_block(
522            &aeron.counters_reader(),
523            counter_id,
524            Duration::from_secs(5),
525        )?;
526
527        let subscribe_channel = format!("aeron:udp?control-mode=manual|session-id={session_id}");
528        info!("subscribe channel {}", subscribe_channel);
529        let subscription = aeron.add_subscription(
530            &subscribe_channel,
531            STREAM_ID,
532            Handlers::no_available_image_handler(),
533            Handlers::no_unavailable_image_handler(),
534            Duration::from_secs(5),
535        )?;
536
537        let replay_merge = AeronArchiveReplayMerge::new(
538            &subscription,
539            &archive,
540            &replay_channel,
541            &replay_destination,
542            &live_destination,
543            recording_id,
544            start_position,
545            Aeron::epoch_clock(),
546            10_000,
547        )?;
548
549        info!(
550            "ReplayMerge initialization: recordingId={}, startPosition={}, subscriptionChannel={}, replayChannel={}, replayDestination={}, liveDestination={}",
551            recording_id,
552            start_position,
553            subscribe_channel,
554            replay_channel,
555            replay_destination,
556            live_destination
557        );
558
559        // media_driver
560        //     .run_aeron_stats()
561        //     .expect("Failed to run aeron stats");
562
563        // info!("Waiting for subscription to connect...");
564        // while !subscription.is_connected() {
565        //     thread::sleep(Duration::from_millis(100));
566        // }
567        // info!("Subscription connected");
568
569        info!(
570            "about to start_replay [maxRecordPosition={:?}]",
571            archive.get_max_recorded_position(recording_id)
572        );
573
574        let mut reply_count = 0;
575        while !replay_merge.is_merged() {
576            assert!(!replay_merge.has_failed());
577            if replay_merge.poll_once(
578                |buffer, _header| {
579                    reply_count += 1;
580                    if reply_count % 10_000 == 0 {
581                        info!(
582                            "replay-merge [count={}, isMerged={}, isLive={}]",
583                            reply_count,
584                            replay_merge.is_merged(),
585                            replay_merge.is_live_added()
586                        );
587                    }
588                },
589                100,
590            )? == 0
591            {
592                let err = archive.poll_for_error_response_as_string(4096)?;
593                if !err.is_empty() {
594                    panic!("{}", err);
595                }
596                if aeron.errmsg().len() > 0 && "no error" != aeron.errmsg() {
597                    panic!("{}", aeron.errmsg());
598                }
599                archive.poll_for_recording_signals()?;
600                thread::sleep(Duration::from_millis(100));
601            }
602        }
603        assert!(!replay_merge.has_failed());
604        assert!(replay_merge.is_live_added());
605        assert!(reply_count > 10_000);
606        Ok(())
607    }
608
/// Checks that the Aeron version reported by the bindings equals the expected `1.47.4`.
#[test]
fn version_check() {
    let (major, minor, patch) = unsafe {
        (
            crate::aeron_version_major(),
            crate::aeron_version_minor(),
            crate::aeron_version_patch(),
        )
    };

    let cargo_version = "1.47.4";
    assert_eq!(format!("{}.{}.{}", major, minor, patch), cargo_version);
}
620
621    use std::thread;
622
623    pub fn start_aeron_archive() -> Result<
624        (
625            Aeron,
626            AeronArchiveContext,
627            EmbeddedArchiveMediaDriverProcess,
628        ),
629        Box<dyn Error>,
630    > {
631        let id = Aeron::nano_clock();
632        let aeron_dir = format!("target/aeron/{}/shm", id);
633        let archive_dir = format!("target/aeron/{}/archive", id);
634
635        let request_port = find_unused_udp_port(8000).expect("Could not find port");
636        let response_port = find_unused_udp_port(request_port + 1).expect("Could not find port");
637        let recording_event_port =
638            find_unused_udp_port(response_port + 1).expect("Could not find port");
639        let request_control_channel = &format!("aeron:udp?endpoint=localhost:{}", request_port);
640        let response_control_channel = &format!("aeron:udp?endpoint=localhost:{}", response_port);
641        let recording_events_channel =
642            &format!("aeron:udp?endpoint=localhost:{}", recording_event_port);
643        assert_ne!(request_control_channel, response_control_channel);
644
645        let archive_media_driver = EmbeddedArchiveMediaDriverProcess::build_and_start(
646            &aeron_dir,
647            &archive_dir,
648            request_control_channel,
649            response_control_channel,
650            recording_events_channel,
651        )
652        .expect("Failed to start Java process");
653
654        let aeron_context = AeronContext::new()?;
655        aeron_context.set_dir(&aeron_dir)?;
656        aeron_context.set_client_name("test")?;
657        aeron_context.set_publication_error_frame_handler(Some(&Handler::leak(
658            AeronPublicationErrorFrameHandlerLogger,
659        )))?;
660        let error_handler = Handler::leak(ErrorCount::default());
661        aeron_context.set_error_handler(Some(&error_handler))?;
662        let aeron = Aeron::new(&aeron_context)?;
663        aeron.start()?;
664
665        let archive_context = AeronArchiveContext::new_with_no_credentials_supplier(
666            &aeron,
667            request_control_channel,
668            response_control_channel,
669            recording_events_channel,
670        )?;
671        archive_context.set_error_handler(Some(&error_handler))?;
672        Ok((aeron, archive_context, archive_media_driver))
673    }
674
675    #[test]
676    #[serial]
677    pub fn test_aeron_archive() -> Result<(), Box<dyn error::Error>> {
678        let _ = env_logger::Builder::new()
679            .is_test(true)
680            .filter_level(log::LevelFilter::Info)
681            .try_init();
682        EmbeddedArchiveMediaDriverProcess::kill_all_java_processes()
683            .expect("failed to kill all java processes");
684
685        let (aeron, archive_context, media_driver) = start_aeron_archive()?;
686
687        assert!(!aeron.is_closed());
688
689        info!("connected to aeron");
690
691        let archive_connector =
692            AeronArchiveAsyncConnect::new_with_aeron(&archive_context.clone(), &aeron)?;
693        let archive = archive_connector
694            .poll_blocking(Duration::from_secs(30))
695            .expect("failed to connect to aeron archive media driver");
696
697        assert!(archive.get_archive_id() > 0);
698
699        let channel = AERON_IPC_STREAM;
700        let stream_id = 10;
701
702        let subscription_id =
703            archive.start_recording(channel, stream_id, SOURCE_LOCATION_LOCAL, true)?;
704
705        assert!(subscription_id >= 0);
706        info!("subscription id {}", subscription_id);
707
708        let publication = aeron
709            .async_add_exclusive_publication(channel, stream_id)?
710            .poll_blocking(Duration::from_secs(5))?;
711
712        for i in 0..11 {
713            while publication.offer(
714                "123456".as_bytes(),
715                Handlers::no_reserved_value_supplier_handler(),
716            ) <= 0
717            {
718                sleep(Duration::from_millis(50));
719                archive.poll_for_recording_signals()?;
720                let err = archive.poll_for_error_response_as_string(4096)?;
721                if !err.is_empty() {
722                    panic!("{}", err);
723                }
724                archive.idle();
725            }
726            info!("sent message {i} [test_aeron_archive]");
727        }
728
729        archive.idle();
730        let session_id = publication.get_constants()?.session_id;
731        info!("publication session id {}", session_id);
732        // since this is single threaded need to make sure it did write to archiver, usually not required in multi-proccess app
733        let stop_position = publication.position();
734        info!(
735            "publication stop position {} [publication={:?}]",
736            stop_position,
737            publication.get_constants()
738        );
739        let counters_reader = aeron.counters_reader();
740        info!("counters reader ready {:?}", counters_reader);
741
742        let mut counter_id = -1;
743
744        let start = Instant::now();
745        while counter_id <= 0 && start.elapsed() < Duration::from_secs(5) {
746            counter_id = RecordingPos::find_counter_id_by_session(&counters_reader, session_id);
747            info!("counter id {}", counter_id);
748        }
749
750        assert!(counter_id >= 0);
751
752        info!("counter id {counter_id}, session id {session_id}");
753        while counters_reader.get_counter_value(counter_id) < stop_position {
754            info!(
755                "current archive publication stop position {}",
756                counters_reader.get_counter_value(counter_id)
757            );
758            sleep(Duration::from_millis(50));
759        }
760        info!(
761            "found archive publication stop position {}",
762            counters_reader.get_counter_value(counter_id)
763        );
764
765        archive.stop_recording_channel_and_stream(channel, stream_id)?;
766        drop(publication);
767
768        info!("list recordings");
769        let found_recording_id = Cell::new(-1);
770        let start_pos = Cell::new(-1);
771        let end_pos = Cell::new(-1);
772        let start = Instant::now();
773        while start.elapsed() < Duration::from_secs(5) && found_recording_id.get() == -1 {
774            let mut count = 0;
775            archive.list_recordings_for_uri_once(
776                &mut count,
777                0,
778                i32::MAX,
779                channel,
780                stream_id,
781                |d: AeronArchiveRecordingDescriptor| {
782                    assert_eq!(d.stream_id, stream_id);
783                    info!("found recording {:#?}", d);
784                    info!(
785                        "strippedChannel={}, originalChannel={}",
786                        d.stripped_channel(),
787                        d.original_channel()
788                    );
789                    if d.stop_position > d.start_position && d.stop_position > 0 {
790                        found_recording_id.set(d.recording_id);
791                        start_pos.set(d.start_position);
792                        end_pos.set(d.stop_position);
793                    }
794
795                    // verify clone_struct works
796                    let copy = d.clone_struct();
797                    assert_eq!(copy.deref(), d.deref());
798                    assert_eq!(copy.recording_id, d.recording_id);
799                    assert_eq!(copy.control_session_id, d.control_session_id);
800                    assert_eq!(copy.mtu_length, d.mtu_length);
801                    assert_eq!(copy.source_identity_length, d.source_identity_length);
802                },
803            )?;
804            archive.poll_for_recording_signals()?;
805            let err = archive.poll_for_error_response_as_string(4096)?;
806            if !err.is_empty() {
807                panic!("{}", err);
808            }
809        }
810        assert!(start.elapsed() < Duration::from_secs(5));
811        info!("start replay");
812        let params = AeronArchiveReplayParams::new(
813            0,
814            i32::MAX,
815            start_pos.get(),
816            end_pos.get() - start_pos.get(),
817            0,
818            0,
819        )?;
820        info!("replay params {:#?}", params);
821        let replay_stream_id = 45;
822        let replay_session_id =
823            archive.start_replay(found_recording_id.get(), channel, replay_stream_id, &params)?;
824        let session_id = replay_session_id as i32;
825
826        info!("replay session id {}", replay_session_id);
827        info!("session id {}", session_id);
828        let channel_replay = format!("{}?session-id={}", channel, session_id);
829        info!("archive id: {}", archive.get_archive_id());
830
831        info!("add subscription {}", channel_replay);
832        let subscription = aeron
833            .async_add_subscription(
834                &channel_replay,
835                replay_stream_id,
836                Some(&Handler::leak(AeronAvailableImageLogger)),
837                Some(&Handler::leak(AeronUnavailableImageLogger)),
838            )?
839            .poll_blocking(Duration::from_secs(10))?;
840
841        #[derive(Default)]
842        struct FragmentHandler {
843            count: Cell<usize>,
844        }
845
846        impl AeronFragmentHandlerCallback for FragmentHandler {
847            fn handle_aeron_fragment_handler(&mut self, buffer: &[u8], _header: AeronHeader) {
848                assert_eq!(buffer, "123456".as_bytes());
849
850                // Update count (using Cell for interior mutability)
851                self.count.set(self.count.get() + 1);
852            }
853        }
854
855        let poll = Handler::leak(FragmentHandler::default());
856
857        let start = Instant::now();
858        while start.elapsed() < Duration::from_secs(10) && subscription.poll(Some(&poll), 100)? <= 0
859        {
860            let err = archive.poll_for_error_response_as_string(4096)?;
861            if !err.is_empty() {
862                panic!("{}", err);
863            }
864        }
865        assert!(
866            start.elapsed() < Duration::from_secs(10),
867            "messages not received {:?}",
868            poll.count
869        );
870        info!("aeron {:?}", aeron);
871        info!("ctx {:?}", archive_context);
872        assert_eq!(11, poll.count.get());
873        Ok(())
874    }
875
876    #[test]
877    #[serial]
878    fn test_invalid_recording_channel() -> Result<(), Box<dyn Error>> {
879        let (aeron, archive_context, _media_driver) = start_aeron_archive()?;
880        let archive_connector =
881            AeronArchiveAsyncConnect::new_with_aeron(&archive_context.clone(), &aeron)?;
882        let archive = archive_connector
883            .poll_blocking(Duration::from_secs(30))
884            .expect("failed to connect to archive");
885
886        let invalid_channel = "invalid:channel";
887        let result =
888            archive.start_recording(invalid_channel, STREAM_ID, SOURCE_LOCATION_LOCAL, true);
889        assert!(
890            result.is_err(),
891            "Expected error when starting recording with an invalid channel"
892        );
893        Ok(())
894    }
895
896    #[test]
897    #[serial]
898    fn test_stop_recording_on_nonexistent_channel() -> Result<(), Box<dyn Error>> {
899        let (aeron, archive_context, _media_driver) = start_aeron_archive()?;
900        let archive_connector =
901            AeronArchiveAsyncConnect::new_with_aeron(&archive_context.clone(), &aeron)?;
902        let archive = archive_connector
903            .poll_blocking(Duration::from_secs(30))
904            .expect("failed to connect to archive");
905
906        let nonexistent_channel = "aeron:udp?endpoint=localhost:9999";
907        let result = archive.stop_recording_channel_and_stream(nonexistent_channel, STREAM_ID);
908        assert!(
909            result.is_err(),
910            "Expected error when stopping recording on a non-existent channel"
911        );
912        Ok(())
913    }
914
915    #[test]
916    #[serial]
917    fn test_replay_with_invalid_recording_id() -> Result<(), Box<dyn Error>> {
918        let (aeron, archive_context, _media_driver) = start_aeron_archive()?;
919        let archive_connector =
920            AeronArchiveAsyncConnect::new_with_aeron(&archive_context.clone(), &aeron)?;
921        let archive = archive_connector
922            .poll_blocking(Duration::from_secs(30))
923            .expect("failed to connect to archive");
924
925        let invalid_recording_id = -999;
926        let params = AeronArchiveReplayParams::new(0, i32::MAX, 0, 100, 0, 0)?;
927        let result = archive.start_replay(
928            invalid_recording_id,
929            "aeron:udp?endpoint=localhost:8888",
930            STREAM_ID,
931            &params,
932        );
933        assert!(
934            result.is_err(),
935            "Expected error when starting replay with an invalid recording id"
936        );
937        Ok(())
938    }
939
940    #[test]
941    #[serial]
942    fn test_archive_reconnect_after_close() -> Result<(), Box<dyn std::error::Error>> {
943        let (aeron, archive_context, media_driver) = start_aeron_archive()?;
944        let archive_connector =
945            AeronArchiveAsyncConnect::new_with_aeron(&archive_context.clone(), &aeron)?;
946        let archive = archive_connector
947            .poll_blocking(Duration::from_secs(30))
948            .expect("failed to connect to archive");
949
950        drop(archive);
951
952        let archive_connector = AeronArchiveAsyncConnect::new_with_aeron(&archive_context, &aeron)?;
953        let new_archive = archive_connector
954            .poll_blocking(Duration::from_secs(30))
955            .expect("failed to reconnect to archive");
956        assert!(
957            new_archive.get_archive_id() > 0,
958            "Reconnected archive should have a valid archive id"
959        );
960
961        drop(media_driver);
962        Ok(())
963    }
964}