monarch_rdma/
rdma_manager_actor.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9//! # RDMA Manager Actor
10//!
11//! Manages RDMA connections and operations using `hyperactor` for asynchronous messaging.
12//!
13//! ## Architecture
14//!
15//! `RdmaManagerActor` is a per-host entity that:
16//! - Manages connections to multiple remote RdmaManagerActors (i.e. across the hosts in a Monarch cluster)
17//! - Handles memory registration, connection setup, and data transfer
18//! - Manages all RdmaBuffers in its associated host
19//!
20//! ## Core Operations
21//!
22//! - Connection establishment with partner actors
23//! - RDMA operations (put/write, get/read)
24//! - Completion polling
25//! - Memory region management
26//!
27//! ## Usage
28//!
29//! See test examples: `test_rdma_write_loopback` and `test_rdma_read_loopback`.
30use std::collections::HashMap;
31use std::collections::HashSet;
32use std::sync::Arc;
33use std::time::Duration;
34use std::time::Instant;
35
36use async_trait::async_trait;
37use futures::lock::Mutex;
38use hyperactor::Actor;
39use hyperactor::ActorId;
40use hyperactor::ActorRef;
41use hyperactor::Context;
42use hyperactor::HandleClient;
43use hyperactor::Handler;
44use hyperactor::Instance;
45use hyperactor::OncePortRef;
46use hyperactor::RefClient;
47use hyperactor::RemoteSpawn;
48use hyperactor::clock::Clock;
49use hyperactor::supervision::ActorSupervisionEvent;
50use serde::Deserialize;
51use serde::Serialize;
52use typeuri::Named;
53
54use crate::ibverbs_primitives::IbverbsConfig;
55use crate::ibverbs_primitives::RdmaMemoryRegionView;
56use crate::ibverbs_primitives::RdmaQpInfo;
57use crate::ibverbs_primitives::ibverbs_supported;
58use crate::ibverbs_primitives::mlx5dv_supported;
59use crate::ibverbs_primitives::resolve_qp_type;
60use crate::rdma_components::RdmaBuffer;
61use crate::rdma_components::RdmaDomain;
62use crate::rdma_components::RdmaQueuePair;
63use crate::rdma_components::get_registered_cuda_segments;
64use crate::validate_execution_context;
65
66/// Helper function to get detailed error messages from RDMAXCEL error codes
67pub fn get_rdmaxcel_error_message(error_code: i32) -> String {
68    unsafe {
69        let c_str = rdmaxcel_sys::rdmaxcel_error_string(error_code);
70        std::ffi::CStr::from_ptr(c_str)
71            .to_string_lossy()
72            .into_owned()
73    }
74}
75
/// Messages handled by [`RdmaManagerActor`].
///
/// Each variant is served by the handler method of the same name in snake case (for example
/// `RequestBuffer` is served by `request_buffer`). Fields marked `#[reply]` are the ports on
/// which the handler's return value is delivered back to the caller.
#[derive(Handler, HandleClient, RefClient, Debug, Serialize, Deserialize, Named)]
pub enum RdmaManagerMessage {
    /// Registers the memory region starting at `addr` and spanning `size` bytes, and replies
    /// with the `RdmaBuffer` describing the registration.
    RequestBuffer {
        addr: usize,
        size: usize,
        #[reply]
        /// `reply` - Reply channel to return the RDMA buffer handle
        reply: OncePortRef<RdmaBuffer>,
    },
    /// Deregisters the memory region behind `buffer` (looked up by the buffer's `mr_id`).
    ReleaseBuffer {
        buffer: RdmaBuffer,
    },
    /// Requests the queue pair linking `self_device` on this actor to `other_device` on
    /// `other`; the connection is set up first when no queue pair exists yet.
    RequestQueuePair {
        other: ActorRef<RdmaManagerActor>,
        self_device: String,
        other_device: String,
        #[reply]
        /// `reply` - Reply channel to return the queue pair for communication
        reply: OncePortRef<RdmaQueuePair>,
    },
    /// Supplies the peer's connection `endpoint` for the queue pair identified by
    /// (`self_device`, `other`, `other_device`); sent during queue pair setup.
    Connect {
        /// `other` - Reference to the actor to connect to
        other: ActorRef<RdmaManagerActor>,
        self_device: String,
        other_device: String,
        /// `endpoint` - Connection information needed to establish the RDMA connection
        endpoint: RdmaQpInfo,
    },
    /// Creates the local queue pair for (`self_device`, `other`, `other_device`) unless it
    /// already exists.
    InitializeQP {
        other: ActorRef<RdmaManagerActor>,
        self_device: String,
        other_device: String,
        #[reply]
        /// `reply` - Reply channel carrying `true` once the queue pair exists
        reply: OncePortRef<bool>,
    },
    /// Requests the `RdmaQpInfo` of the local queue pair for the given peer and devices; the
    /// reply is what the other side passes as `endpoint` in `Connect`.
    ConnectionInfo {
        /// `other` - Reference to the actor to get connection info for
        other: ActorRef<RdmaManagerActor>,
        self_device: String,
        other_device: String,
        #[reply]
        /// `reply` - Reply channel to return the connection info
        reply: OncePortRef<RdmaQpInfo>,
    },
    /// Returns a previously requested queue pair to the manager.
    ReleaseQueuePair {
        /// `other` - Reference to the actor to release the queue pair for
        other: ActorRef<RdmaManagerActor>,
        self_device: String,
        other_device: String,
        /// `qp` - The queue pair to return (ownership transferred back)
        qp: RdmaQueuePair,
    },
    /// Requests the state of the queue pair for the given peer and devices as a raw `u32`;
    /// `request_queue_pair` compares the value against `IBV_QPS_RTS`.
    GetQpState {
        other: ActorRef<RdmaManagerActor>,
        self_device: String,
        other_device: String,
        #[reply]
        /// `reply` - Reply channel to return the QP state
        reply: OncePortRef<u32>,
    },
}
wirevalue::register_type!(RdmaManagerMessage);
142
/// Actor owning the RDMA state of a host: per-device domains, queue pairs to peers, and the
/// memory regions registered through it. All resources are released in its `Drop` impl.
#[derive(Debug)]
#[hyperactor::export(
    spawn = true,
    handlers = [
        RdmaManagerMessage,
    ],
)]
pub struct RdmaManagerActor {
    // Nested map: local_device -> (ActorId, remote_device) -> RdmaQueuePair
    device_qps: HashMap<String, HashMap<(ActorId, String), RdmaQueuePair>>,

    // Keys (local_device, ActorId, remote_device) of QPs whose creation is in progress, used
    // to prevent duplicate creation.
    // Wrapped in Arc<Mutex> to allow safe concurrent access
    pending_qp_creation: Arc<Mutex<HashSet<(String, ActorId, String)>>>,

    // Map of RDMA device names to their domains and loopback QPs.
    // Entries are created lazily the first time a domain is needed for a device; the loopback
    // QP is `None` when mlx5dv is not supported.
    device_domains: HashMap<String, (RdmaDomain, Option<RdmaQueuePair>)>,

    // Configuration used for every QP this actor creates; its `device` is the fallback when no
    // device can be selected for a buffer.
    config: IbverbsConfig,

    // True when the configured QP type resolves to mlx5dv; enables the CUDA segment path.
    mlx5dv_enabled: bool,

    // Map of RdmaMemoryRegionView id to the `ibv_mr*` (stored as usize) backing it. The value
    // is 0 (null) for views carved out of CUDA segments, which are registered and released
    // separately. Only used for registration/deregistration purposes.
    mr_map: HashMap<usize, usize>,
    // Id for next mrv created
    mrv_id: usize,

    // Map of PCI addresses to their optimal RDMA devices
    // This is populated during actor initialization using the device selection algorithm
    pci_to_device: HashMap<String, crate::ibverbs_primitives::RdmaDevice>,
}
176
177impl Drop for RdmaManagerActor {
178    fn drop(&mut self) {
179        // Helper function to destroy QP resources
180        // We can't use Drop on RdmaQueuePair because it derives Clone
181        // Note: rdmaxcel_qp_destroy handles destroying both the QP and its CQs internally,
182        // so we must NOT call ibv_destroy_cq separately (would cause double-free/SIGSEGV)
183        fn destroy_queue_pair(qp: &RdmaQueuePair, _context: &str) {
184            unsafe {
185                if qp.qp != 0 {
186                    let rdmaxcel_qp = qp.qp as *mut rdmaxcel_sys::rdmaxcel_qp;
187                    rdmaxcel_sys::rdmaxcel_qp_destroy(rdmaxcel_qp);
188                }
189            }
190        }
191
192        // 1. Clean up all queue pairs (both regular and loopback)
193        for (_device_name, device_map) in self.device_qps.drain() {
194            for ((actor_id, _remote_device), qp) in device_map {
195                destroy_queue_pair(&qp, &format!("actor {:?}", actor_id));
196            }
197        }
198
199        // 2. Clean up device domains (which contain PDs and loopback QPs)
200        for (device_name, (domain, qp)) in self.device_domains.drain() {
201            if let Some(qp) = qp {
202                destroy_queue_pair(&qp, &format!("loopback QP on device {}", device_name));
203            }
204            drop(domain);
205        }
206
207        // 3. Clean up memory regions
208        let _mr_count = self.mr_map.len();
209        for (id, mr_ptr) in self.mr_map.drain() {
210            if mr_ptr != 0 {
211                unsafe {
212                    let result = rdmaxcel_sys::ibv_dereg_mr(mr_ptr as *mut rdmaxcel_sys::ibv_mr);
213                    if result != 0 {
214                        tracing::error!(
215                            "Failed to deregister MR with id {}: error code {}",
216                            id,
217                            result
218                        );
219                    }
220                }
221            }
222        }
223
224        // 4. Deregister all CUDA segments (if using mlx5dv)
225        // The segment scanner in Python handles compatibility checks
226        if self.mlx5dv_enabled {
227            unsafe {
228                let result = rdmaxcel_sys::deregister_segments();
229                if result != 0 {
230                    let error_msg = get_rdmaxcel_error_message(result);
231                    tracing::error!(
232                        "Failed to deregister CUDA segments: {} (error code: {})",
233                        error_msg,
234                        result
235                    );
236                }
237            }
238        }
239    }
240}
241
242impl RdmaManagerActor {
243    /// Get or create a domain and loopback QP for the specified RDMA device
244    fn get_or_create_device_domain(
245        &mut self,
246        device_name: &str,
247        rdma_device: &crate::ibverbs_primitives::RdmaDevice,
248    ) -> Result<(RdmaDomain, Option<RdmaQueuePair>), anyhow::Error> {
249        // Check if we already have a domain for this device
250        if let Some((domain, qp)) = self.device_domains.get(device_name) {
251            return Ok((domain.clone(), qp.clone()));
252        }
253
254        // Create new domain for this device
255        let domain = RdmaDomain::new(rdma_device.clone()).map_err(|e| {
256            anyhow::anyhow!("could not create domain for device {}: {}", device_name, e)
257        })?;
258
259        // Print device info if MONARCH_DEBUG_RDMA=1 is set (before initial QP creation)
260        crate::print_device_info_if_debug_enabled(domain.context);
261
262        // Create loopback QP for this domain if mlx5dv is supported
263        let qp = if mlx5dv_supported() {
264            let mut qp = RdmaQueuePair::new(domain.context, domain.pd, self.config.clone())
265                .map_err(|e| {
266                    anyhow::anyhow!(
267                        "could not create loopback QP for device {}: {}",
268                        device_name,
269                        e
270                    )
271                })?;
272
273            // Get connection info and connect to itself
274            let endpoint = qp.get_qp_info().map_err(|e| {
275                anyhow::anyhow!("could not get QP info for device {}: {}", device_name, e)
276            })?;
277
278            qp.connect(&endpoint).map_err(|e| {
279                anyhow::anyhow!(
280                    "could not connect loopback QP for device {}: {}",
281                    device_name,
282                    e
283                )
284            })?;
285
286            Some(qp)
287        } else {
288            None
289        };
290
291        self.device_domains
292            .insert(device_name.to_string(), (domain.clone(), qp.clone()));
293        Ok((domain, qp))
294    }
295
296    fn find_cuda_segment_for_address(
297        &mut self,
298        addr: usize,
299        size: usize,
300    ) -> Option<RdmaMemoryRegionView> {
301        let registered_segments = get_registered_cuda_segments();
302        for segment in registered_segments {
303            let start_addr = segment.phys_address;
304            let end_addr = start_addr + segment.phys_size;
305            if start_addr <= addr && addr + size <= end_addr {
306                let offset = addr - start_addr;
307                let rdma_addr = segment.mr_addr + offset;
308
309                let mrv = RdmaMemoryRegionView {
310                    id: self.mrv_id,
311                    virtual_addr: addr,
312                    rdma_addr,
313                    size,
314                    lkey: segment.lkey,
315                    rkey: segment.rkey,
316                };
317                self.mrv_id += 1;
318                return Some(mrv);
319            }
320        }
321        None
322    }
323
324    fn register_mr(
325        &mut self,
326        addr: usize,
327        size: usize,
328    ) -> Result<(RdmaMemoryRegionView, String), anyhow::Error> {
329        unsafe {
330            let mut mem_type: i32 = 0;
331            let ptr = addr as rdmaxcel_sys::CUdeviceptr;
332            let err = rdmaxcel_sys::rdmaxcel_cuPointerGetAttribute(
333                &mut mem_type as *mut _ as *mut std::ffi::c_void,
334                rdmaxcel_sys::CU_POINTER_ATTRIBUTE_MEMORY_TYPE,
335                ptr,
336            );
337            let is_cuda = err == rdmaxcel_sys::CUDA_SUCCESS;
338
339            let mut selected_rdma_device = None;
340
341            if is_cuda {
342                // Use rdmaxcel utility to get PCI address from CUDA pointer
343                let mut pci_addr_buf: [std::os::raw::c_char; 16] = [0; 16]; // Enough space for "ffff:ff:ff.0\0"
344                let err = rdmaxcel_sys::get_cuda_pci_address_from_ptr(
345                    addr as u64,
346                    pci_addr_buf.as_mut_ptr(),
347                    pci_addr_buf.len(),
348                );
349                if err != 0 {
350                    let error_msg = get_rdmaxcel_error_message(err);
351                    return Err(anyhow::anyhow!(
352                        "RdmaXcel get_cuda_pci_address_from_ptr failed (addr: 0x{:x}, size: {}): {}",
353                        addr,
354                        size,
355                        error_msg
356                    ));
357                }
358
359                // Convert C string to Rust string
360                let pci_addr = std::ffi::CStr::from_ptr(pci_addr_buf.as_ptr())
361                    .to_str()
362                    .unwrap();
363                selected_rdma_device = self.pci_to_device.get(pci_addr).cloned();
364            }
365
366            // Determine the RDMA device to use
367            let rdma_device = if let Some(device) = selected_rdma_device {
368                device
369            } else {
370                // Fallback to default device from config
371                self.config.device.clone()
372            };
373
374            let device_name = rdma_device.name().clone();
375            tracing::debug!(
376                "Using RDMA device: {} for memory at 0x{:x}",
377                device_name,
378                addr
379            );
380
381            // Get or create domain and loopback QP for this device
382            let (domain, qp) = self.get_or_create_device_domain(&device_name, &rdma_device)?;
383
384            let access = rdmaxcel_sys::ibv_access_flags::IBV_ACCESS_LOCAL_WRITE
385                | rdmaxcel_sys::ibv_access_flags::IBV_ACCESS_REMOTE_WRITE
386                | rdmaxcel_sys::ibv_access_flags::IBV_ACCESS_REMOTE_READ
387                | rdmaxcel_sys::ibv_access_flags::IBV_ACCESS_REMOTE_ATOMIC;
388
389            let mut mr: *mut rdmaxcel_sys::ibv_mr = std::ptr::null_mut();
390            let mrv;
391
392            if is_cuda {
393                // First, try to use segment scanning if mlx5dv is enabled
394                let mut segment_mrv = None;
395                if self.mlx5dv_enabled {
396                    // Try to find in already registered segments
397                    segment_mrv = self.find_cuda_segment_for_address(addr, size);
398
399                    // If not found, trigger a re-sync with the allocator and retry
400                    if segment_mrv.is_none() {
401                        let err = rdmaxcel_sys::register_segments(
402                            domain.pd,
403                            qp.unwrap().qp as *mut rdmaxcel_sys::rdmaxcel_qp_t,
404                        );
405                        // Only retry if register_segments succeeded
406                        // If it fails (e.g., scanner returns 0 segments), we'll fall back to dmabuf
407                        if err == 0 {
408                            segment_mrv = self.find_cuda_segment_for_address(addr, size);
409                        }
410                    }
411                }
412
413                // Use segment if found, otherwise fall back to direct dmabuf registration
414                if let Some(mrv_from_segment) = segment_mrv {
415                    mrv = mrv_from_segment;
416                } else {
417                    // Dmabuf path: used when mlx5dv is disabled OR scanner returns no segments
418                    let mut fd: i32 = -1;
419                    rdmaxcel_sys::rdmaxcel_cuMemGetHandleForAddressRange(
420                        &mut fd,
421                        addr as rdmaxcel_sys::CUdeviceptr,
422                        size,
423                        rdmaxcel_sys::CU_MEM_RANGE_HANDLE_TYPE_DMA_BUF_FD,
424                        0,
425                    );
426                    mr =
427                        rdmaxcel_sys::ibv_reg_dmabuf_mr(domain.pd, 0, size, 0, fd, access.0 as i32);
428                    if mr.is_null() {
429                        return Err(anyhow::anyhow!("Failed to register dmabuf MR"));
430                    }
431                    mrv = RdmaMemoryRegionView {
432                        id: self.mrv_id,
433                        virtual_addr: addr,
434                        rdma_addr: (*mr).addr as usize,
435                        size,
436                        lkey: (*mr).lkey,
437                        rkey: (*mr).rkey,
438                    };
439                    self.mrv_id += 1;
440                }
441            } else {
442                // CPU memory path
443                mr = rdmaxcel_sys::ibv_reg_mr(
444                    domain.pd,
445                    addr as *mut std::ffi::c_void,
446                    size,
447                    access.0 as i32,
448                );
449
450                if mr.is_null() {
451                    return Err(anyhow::anyhow!("failed to register standard MR"));
452                }
453
454                mrv = RdmaMemoryRegionView {
455                    id: self.mrv_id,
456                    virtual_addr: addr,
457                    rdma_addr: (*mr).addr as usize,
458                    size,
459                    lkey: (*mr).lkey,
460                    rkey: (*mr).rkey,
461                };
462                self.mrv_id += 1;
463            }
464            self.mr_map.insert(mrv.id, mr as usize);
465            Ok((mrv, device_name))
466        }
467    }
468
469    fn deregister_mr(&mut self, id: usize) -> Result<(), anyhow::Error> {
470        if let Some(mr_ptr) = self.mr_map.remove(&id) {
471            if mr_ptr != 0 {
472                unsafe {
473                    rdmaxcel_sys::ibv_dereg_mr(mr_ptr as *mut rdmaxcel_sys::ibv_mr);
474                }
475            }
476        }
477        Ok(())
478    }
479}
480
481#[async_trait]
482impl RemoteSpawn for RdmaManagerActor {
483    type Params = Option<IbverbsConfig>;
484
485    async fn new(params: Self::Params) -> Result<Self, anyhow::Error> {
486        if !ibverbs_supported() {
487            return Err(anyhow::anyhow!(
488                "Cannot create RdmaManagerActor because RDMA is not supported on this machine"
489            ));
490        }
491
492        // Use provided config or default if none provided
493        let mut config = params.unwrap_or_default();
494        tracing::debug!("rdma is enabled, config device hint: {}", config.device);
495
496        let mlx5dv_enabled = resolve_qp_type(config.qp_type) == rdmaxcel_sys::RDMA_QP_TYPE_MLX5DV;
497
498        // check config and hardware support align
499        if config.use_gpu_direct {
500            match validate_execution_context().await {
501                Ok(_) => {
502                    tracing::info!("GPU Direct RDMA execution context validated successfully");
503                }
504                Err(e) => {
505                    tracing::warn!(
506                        "GPU Direct RDMA execution context validation failed: {}. Downgrading to standard ibverbs mode.",
507                        e
508                    );
509                    config.use_gpu_direct = false;
510                }
511            }
512        }
513
514        // Build the CUDA to RDMA device mapping using device selection algorithm
515        let pci_to_device = crate::device_selection::create_cuda_to_rdma_mapping();
516        tracing::debug!(
517            "Built CUDA to RDMA device mapping with {} entries",
518            pci_to_device.len()
519        );
520
521        Ok(Self {
522            device_qps: HashMap::new(),
523            pending_qp_creation: Arc::new(Mutex::new(HashSet::new())),
524            device_domains: HashMap::new(),
525            config,
526            mlx5dv_enabled,
527            mr_map: HashMap::new(),
528            mrv_id: 0,
529            pci_to_device,
530        })
531    }
532}
533
534#[async_trait]
535impl Actor for RdmaManagerActor {
536    async fn init(&mut self, _this: &Instance<Self>) -> Result<(), anyhow::Error> {
537        tracing::debug!("RdmaManagerActor initialized with lazy domain/QP creation");
538        Ok(())
539    }
540
541    async fn handle_supervision_event(
542        &mut self,
543        _cx: &Instance<Self>,
544        _event: &ActorSupervisionEvent,
545    ) -> Result<bool, anyhow::Error> {
546        tracing::error!("rdmaManagerActor supervision event: {:?}", _event);
547        tracing::error!("rdmaManagerActor error occurred, stop the worker process, exit code: 1");
548        std::process::exit(1);
549    }
550}
551
552#[async_trait]
553#[hyperactor::forward(RdmaManagerMessage)]
554impl RdmaManagerMessageHandler for RdmaManagerActor {
555    /// Requests a buffer to be registered with the RDMA domain.
556    ///
557    /// This function registers a memory region with the RDMA domain and returns an `RdmaBuffer`
558    /// that encapsulates the necessary information for RDMA operations.
559    ///
560    /// # Arguments
561    ///
562    /// * `this` - The context of the actor requesting the buffer.
563    /// * `addr` - The starting address of the memory region to be registered.
564    /// * `size` - The size of the memory region to be registered.
565    ///
566    /// # Returns
567    ///
568    /// * `Result<RdmaBuffer, anyhow::Error>` - On success, returns an `RdmaBuffer` containing
569    ///   the registered memory region's details. On failure, returns an error.
570    async fn request_buffer(
571        &mut self,
572        cx: &Context<Self>,
573        addr: usize,
574        size: usize,
575    ) -> Result<RdmaBuffer, anyhow::Error> {
576        let (mrv, device_name) = self.register_mr(addr, size)?;
577
578        Ok(RdmaBuffer {
579            owner: cx.bind().clone(),
580            mr_id: mrv.id,
581            addr: mrv.rdma_addr,
582            size: mrv.size,
583            rkey: mrv.rkey,
584            lkey: mrv.lkey,
585            device_name,
586        })
587    }
588
589    /// Deregisters a buffer from the RDMA domain.
590    ///
591    /// This function removes the specified `RdmaBuffer` from the RDMA domain,
592    /// effectively releasing the resources associated with it.
593    ///
594    /// # Arguments
595    ///
596    /// * `_this` - The context of the actor releasing the buffer.
597    /// * `buffer` - The `RdmaBuffer` to be deregistered.
598    ///
599    /// # Returns
600    ///
601    /// * `Result<(), anyhow::Error>` - On success, returns `Ok(())`. On failure, returns an error.
602    async fn release_buffer(
603        &mut self,
604        _cx: &Context<Self>,
605        buffer: RdmaBuffer,
606    ) -> Result<(), anyhow::Error> {
607        self.deregister_mr(buffer.mr_id)
608            .map_err(|e| anyhow::anyhow!("could not deregister buffer: {}", e))?;
609        Ok(())
610    }
611
612    /// Requests a queue pair for communication with a remote RDMA manager actor.
613    ///
614    /// Basic logic: if queue pair exists in map, return it; if None, create connection first.
615    ///
616    /// # Arguments
617    ///
618    /// * `cx` - The context of the actor requesting the queue pair.
619    /// * `remote` - The ActorRef of the remote RDMA manager actor to communicate with.
620    ///
621    /// # Returns
622    ///
623    /// * `Result<RdmaQueuePair, anyhow::Error>` - On success, returns the queue pair for communication.
624    ///   On failure, returns an error.
625    async fn request_queue_pair(
626        &mut self,
627        cx: &Context<Self>,
628        other: ActorRef<RdmaManagerActor>,
629        self_device: String,
630        other_device: String,
631    ) -> Result<RdmaQueuePair, anyhow::Error> {
632        let other_id = other.actor_id().clone();
633
634        // Use the nested map structure: local_device -> (actor_id, remote_device) -> RdmaQueuePair
635        let inner_key = (other_id.clone(), other_device.clone());
636
637        // Check if queue pair exists in map
638        if let Some(device_map) = self.device_qps.get(&self_device) {
639            if let Some(qp) = device_map.get(&inner_key) {
640                return Ok(qp.clone());
641            }
642        }
643
644        // Try to acquire lock and mark as pending (hold lock only once!)
645        let pending_key = (self_device.clone(), other_id.clone(), other_device.clone());
646        let mut pending = self.pending_qp_creation.lock().await;
647
648        if pending.contains(&pending_key) {
649            // Another task is creating this QP, release lock and wait
650            drop(pending);
651
652            // Loop checking device_qps until QP is created (no more locks needed)
653            // Timeout after 1 second
654            let start = Instant::now();
655            let timeout = Duration::from_secs(1);
656
657            loop {
658                cx.clock().sleep(Duration::from_micros(200)).await;
659
660                // Check if QP was created while we waited
661                if let Some(device_map) = self.device_qps.get(&self_device) {
662                    if let Some(qp) = device_map.get(&inner_key) {
663                        return Ok(qp.clone());
664                    }
665                }
666
667                // Check for timeout
668                if start.elapsed() >= timeout {
669                    return Err(anyhow::anyhow!(
670                        "Timeout waiting for QP creation (device {} -> actor {} device {}). \
671                         Another task is creating it but hasn't completed in 1 second",
672                        self_device,
673                        other_id,
674                        other_device
675                    ));
676                }
677            }
678        } else {
679            // Not pending, add to set and proceed with creation
680            pending.insert(pending_key.clone());
681            drop(pending);
682            // Fall through to create QP
683        }
684
685        // Queue pair doesn't exist - need to create connection
686        let result = async {
687            let is_loopback = other_id == cx.bind::<RdmaManagerActor>().actor_id().clone()
688                && self_device == other_device;
689
690            if is_loopback {
691                // Loopback connection setup
692                self.initialize_qp(cx, other.clone(), self_device.clone(), other_device.clone())
693                    .await?;
694                let endpoint = self
695                    .connection_info(cx, other.clone(), other_device.clone(), self_device.clone())
696                    .await?;
697                self.connect(
698                    cx,
699                    other.clone(),
700                    self_device.clone(),
701                    other_device.clone(),
702                    endpoint,
703                )
704                .await?;
705            } else {
706                // Remote connection setup
707                self.initialize_qp(cx, other.clone(), self_device.clone(), other_device.clone())
708                    .await?;
709                other
710                    .initialize_qp(
711                        cx,
712                        cx.bind().clone(),
713                        other_device.clone(),
714                        self_device.clone(),
715                    )
716                    .await?;
717                let other_endpoint: RdmaQpInfo = other
718                    .connection_info(
719                        cx,
720                        cx.bind().clone(),
721                        other_device.clone(),
722                        self_device.clone(),
723                    )
724                    .await?;
725                self.connect(
726                    cx,
727                    other.clone(),
728                    self_device.clone(),
729                    other_device.clone(),
730                    other_endpoint,
731                )
732                .await?;
733                let local_endpoint = self
734                    .connection_info(cx, other.clone(), self_device.clone(), other_device.clone())
735                    .await?;
736                other
737                    .connect(
738                        cx,
739                        cx.bind().clone(),
740                        other_device.clone(),
741                        self_device.clone(),
742                        local_endpoint,
743                    )
744                    .await?;
745
746                // BARRIER: Ensure remote side has completed its connection and is ready
747                let remote_state = other
748                    .get_qp_state(
749                        cx,
750                        cx.bind().clone(),
751                        other_device.clone(),
752                        self_device.clone(),
753                    )
754                    .await?;
755
756                if remote_state != rdmaxcel_sys::ibv_qp_state::IBV_QPS_RTS {
757                    return Err(anyhow::anyhow!(
758                        "Remote QP not in RTS state after connection setup. \
759                         Local is ready but remote is in state {}. \
760                         This indicates a synchronization issue in connection setup.",
761                        remote_state
762                    ));
763                }
764            }
765
766            // Now that connection is established, get and clone the queue pair
767            if let Some(device_map) = self.device_qps.get(&self_device) {
768                if let Some(qp) = device_map.get(&inner_key) {
769                    Ok(qp.clone())
770                } else {
771                    Err(anyhow::anyhow!(
772                        "Failed to create connection for actor {} on device {}",
773                        other_id,
774                        other_device
775                    ))
776                }
777            } else {
778                Err(anyhow::anyhow!(
779                    "Failed to create connection for actor {} on device {} - no device map",
780                    other_id,
781                    other_device
782                ))
783            }
784        }
785        .await;
786
787        // Always remove from pending set when done (success or failure)
788        let mut pending = self.pending_qp_creation.lock().await;
789        pending.remove(&pending_key);
790        drop(pending);
791
792        result
793    }
794
795    async fn initialize_qp(
796        &mut self,
797        _cx: &Context<Self>,
798        other: ActorRef<RdmaManagerActor>,
799        self_device: String,
800        other_device: String,
801    ) -> Result<bool, anyhow::Error> {
802        let other_id = other.actor_id().clone();
803        let inner_key = (other_id.clone(), other_device.clone());
804
805        // Check if QP already exists in nested structure
806        if let Some(device_map) = self.device_qps.get(&self_device) {
807            if device_map.contains_key(&inner_key) {
808                return Ok(true);
809            }
810        }
811
812        // Resolve the RDMA device for the local device
813        let rdma_device = self
814            .pci_to_device
815            .iter()
816            .find(|(_, device)| device.name() == &self_device)
817            .map(|(_, device)| device.clone())
818            .unwrap_or_else(|| {
819                // Fallback to default device from config
820                crate::device_selection::resolve_rdma_device(&self.config.device)
821                    .unwrap_or_else(|| self.config.device.clone())
822            });
823
824        // Get or create domain and extract pointers to avoid borrowing issues
825        let (domain_context, domain_pd) = {
826            // Check if we already have a domain for the device
827            let (domain, _) = self.get_or_create_device_domain(&self_device, &rdma_device)?;
828            (domain.context, domain.pd)
829        };
830
831        let qp = RdmaQueuePair::new(domain_context, domain_pd, self.config.clone())
832            .map_err(|e| anyhow::anyhow!("could not create RdmaQueuePair: {}", e))?;
833
834        // Insert the QP into the nested map structure
835        self.device_qps
836            .entry(self_device.clone())
837            .or_insert_with(HashMap::new)
838            .insert(inner_key, qp);
839
840        tracing::debug!(
841            "successfully created a connection with {:?} for local device {} -> remote device {}",
842            other,
843            self_device,
844            other_device
845        );
846
847        Ok(true)
848    }
849
850    /// Establishes a connection with another actor
851    ///
852    /// # Arguments
853    /// * `other` - The ActorRef of the actor to connect to
854    /// * `endpoint` - Connection information needed to establish the RDMA connection
855    async fn connect(
856        &mut self,
857        _cx: &Context<Self>,
858        other: ActorRef<RdmaManagerActor>,
859        self_device: String,
860        other_device: String,
861        endpoint: RdmaQpInfo,
862    ) -> Result<(), anyhow::Error> {
863        tracing::debug!("connecting with {:?}", other);
864        let other_id = other.actor_id().clone();
865
866        let inner_key = (other_id.clone(), other_device.clone());
867
868        if let Some(device_map) = self.device_qps.get_mut(&self_device) {
869            match device_map.get_mut(&inner_key) {
870                Some(qp) => {
871                    qp.connect(&endpoint).map_err(|e| {
872                        anyhow::anyhow!("could not connect to RDMA endpoint: {}", e)
873                    })?;
874                    Ok(())
875                }
876                None => Err(anyhow::anyhow!(
877                    "No connection found for actor {}",
878                    other_id
879                )),
880            }
881        } else {
882            Err(anyhow::anyhow!(
883                "No device map found for device {}",
884                self_device
885            ))
886        }
887    }
888
889    /// Gets connection information for establishing an RDMA connection
890    ///
891    /// # Arguments
892    /// * `other` - The ActorRef to get connection info for
893    ///
894    /// # Returns
895    /// * `RdmaQpInfo` - Connection information needed for the RDMA connection
896    async fn connection_info(
897        &mut self,
898        _cx: &Context<Self>,
899        other: ActorRef<RdmaManagerActor>,
900        self_device: String,
901        other_device: String,
902    ) -> Result<RdmaQpInfo, anyhow::Error> {
903        tracing::debug!("getting connection info with {:?}", other);
904        let other_id = other.actor_id().clone();
905
906        let inner_key = (other_id.clone(), other_device.clone());
907
908        if let Some(device_map) = self.device_qps.get_mut(&self_device) {
909            match device_map.get_mut(&inner_key) {
910                Some(qp) => {
911                    let connection_info = qp.get_qp_info()?;
912                    Ok(connection_info)
913                }
914                None => Err(anyhow::anyhow!(
915                    "No connection found for actor {}",
916                    other_id
917                )),
918            }
919        } else {
920            Err(anyhow::anyhow!(
921                "No device map found for self device {}",
922                self_device
923            ))
924        }
925    }
926
927    /// Releases a queue pair back to the HashMap
928    ///
929    /// This method is now a no-op since RdmaQueuePair is Clone and can be safely shared.
930    /// The queue pair is not actually checked out, so there's nothing to release.
931    /// This method is kept for API compatibility.
932    ///
933    /// # Arguments
934    /// * `remote` - The ActorRef of the remote actor to return the queue pair for
935    /// * `qp` - The queue pair to release (ignored)
936    async fn release_queue_pair(
937        &mut self,
938        _cx: &Context<Self>,
939        _other: ActorRef<RdmaManagerActor>,
940        _self_device: String,
941        _other_device: String,
942        _qp: RdmaQueuePair,
943    ) -> Result<(), anyhow::Error> {
944        // No-op: Queue pairs are now cloned and shared via atomic counters
945        // Nothing needs to be released
946        Ok(())
947    }
948
949    /// Gets the state of a queue pair
950    ///
951    /// # Arguments
952    /// * `other` - The ActorRef to get the QP state for
953    /// * `self_device` - Local device name
954    /// * `other_device` - Remote device name
955    ///
956    /// # Returns
957    /// * `u32` - The QP state (e.g., IBV_QPS_RTS = Ready To Send)
958    async fn get_qp_state(
959        &mut self,
960        _cx: &Context<Self>,
961        other: ActorRef<RdmaManagerActor>,
962        self_device: String,
963        other_device: String,
964    ) -> Result<u32, anyhow::Error> {
965        let other_id = other.actor_id().clone();
966        let inner_key = (other_id.clone(), other_device.clone());
967
968        if let Some(device_map) = self.device_qps.get_mut(&self_device) {
969            match device_map.get_mut(&inner_key) {
970                Some(qp) => qp.state(),
971                None => Err(anyhow::anyhow!(
972                    "No connection found for actor {} on device {}",
973                    other_id,
974                    other_device
975                )),
976            }
977        } else {
978            Err(anyhow::anyhow!(
979                "No device map found for self device {}",
980                self_device
981            ))
982        }
983    }
984}