1use std::collections::HashMap;
31use std::collections::HashSet;
32use std::sync::Arc;
33use std::time::Duration;
34use std::time::Instant;
35
36use async_trait::async_trait;
37use futures::lock::Mutex;
38use hyperactor::Actor;
39use hyperactor::ActorId;
40use hyperactor::ActorRef;
41use hyperactor::Context;
42use hyperactor::HandleClient;
43use hyperactor::Handler;
44use hyperactor::Instance;
45use hyperactor::OncePortRef;
46use hyperactor::RefClient;
47use hyperactor::RemoteSpawn;
48use hyperactor::clock::Clock;
49use hyperactor::supervision::ActorSupervisionEvent;
50use serde::Deserialize;
51use serde::Serialize;
52use typeuri::Named;
53
54use crate::ibverbs_primitives::IbverbsConfig;
55use crate::ibverbs_primitives::RdmaMemoryRegionView;
56use crate::ibverbs_primitives::RdmaQpInfo;
57use crate::ibverbs_primitives::ibverbs_supported;
58use crate::ibverbs_primitives::mlx5dv_supported;
59use crate::ibverbs_primitives::resolve_qp_type;
60use crate::rdma_components::RdmaBuffer;
61use crate::rdma_components::RdmaDomain;
62use crate::rdma_components::RdmaQueuePair;
63use crate::rdma_components::get_registered_cuda_segments;
64use crate::validate_execution_context;
65
66pub fn get_rdmaxcel_error_message(error_code: i32) -> String {
68 unsafe {
69 let c_str = rdmaxcel_sys::rdmaxcel_error_string(error_code);
70 std::ffi::CStr::from_ptr(c_str)
71 .to_string_lossy()
72 .into_owned()
73 }
74}
75
/// Messages handled by [`RdmaManagerActor`].
///
/// Each variant corresponds to one handler method on the actor (see the
/// `RdmaManagerMessageHandler` implementation in this file). Fields marked with
/// `#[reply]` carry the port on which the handler's result is returned.
#[derive(Handler, HandleClient, RefClient, Debug, Serialize, Deserialize, Named)]
pub enum RdmaManagerMessage {
    /// Register the memory range `[addr, addr + size)` and reply with an
    /// [`RdmaBuffer`] describing the registration.
    RequestBuffer {
        /// Start address of the memory to register.
        addr: usize,
        /// Length of the memory range in bytes.
        size: usize,
        #[reply]
        /// Receives the buffer handle built from the registration.
        reply: OncePortRef<RdmaBuffer>,
    },
    /// Deregister the memory region identified by `buffer.mr_id`.
    ReleaseBuffer {
        /// Buffer previously obtained through `RequestBuffer`.
        buffer: RdmaBuffer,
    },
    /// Return the queue pair between `self_device` on this actor and `other_device`
    /// on `other`, establishing the connection first if none is cached.
    RequestQueuePair {
        /// The peer manager actor.
        other: ActorRef<RdmaManagerActor>,
        /// Name of the local RDMA device.
        self_device: String,
        /// Name of the peer's RDMA device.
        other_device: String,
        #[reply]
        /// Receives the (cached or newly connected) queue pair.
        reply: OncePortRef<RdmaQueuePair>,
    },
    /// Connect the local queue pair keyed by (`other`, `other_device`) on
    /// `self_device` to the given remote `endpoint`.
    Connect {
        /// The peer manager actor.
        other: ActorRef<RdmaManagerActor>,
        /// Name of the local RDMA device.
        self_device: String,
        /// Name of the peer's RDMA device.
        other_device: String,
        /// Connection information of the queue pair to connect to.
        endpoint: RdmaQpInfo,
    },
    /// Create the local queue pair for (`other`, `other_device`) on `self_device`
    /// if it does not exist yet. The handler in this file always replies `true` on
    /// success.
    InitializeQP {
        /// The peer manager actor.
        other: ActorRef<RdmaManagerActor>,
        /// Name of the local RDMA device.
        self_device: String,
        /// Name of the peer's RDMA device.
        other_device: String,
        #[reply]
        /// Receives `true` once the queue pair exists.
        reply: OncePortRef<bool>,
    },
    /// Reply with the connection information of the local queue pair keyed by
    /// (`other`, `other_device`) on `self_device`.
    ConnectionInfo {
        /// The peer manager actor.
        other: ActorRef<RdmaManagerActor>,
        /// Name of the local RDMA device.
        self_device: String,
        /// Name of the peer's RDMA device.
        other_device: String,
        #[reply]
        /// Receives the local queue pair's connection information.
        reply: OncePortRef<RdmaQpInfo>,
    },
    /// Release a queue pair. The handler in this file is currently a no-op.
    ReleaseQueuePair {
        /// The peer manager actor.
        other: ActorRef<RdmaManagerActor>,
        /// Name of the local RDMA device.
        self_device: String,
        /// Name of the peer's RDMA device.
        other_device: String,
        /// The queue pair being released.
        qp: RdmaQueuePair,
    },
    /// Reply with the state (as a `u32`) of the local queue pair keyed by
    /// (`other`, `other_device`) on `self_device`.
    GetQpState {
        /// The peer manager actor.
        other: ActorRef<RdmaManagerActor>,
        /// Name of the local RDMA device.
        self_device: String,
        /// Name of the peer's RDMA device.
        other_device: String,
        #[reply]
        /// Receives the queue pair state value.
        reply: OncePortRef<u32>,
    },
}
wirevalue::register_type!(RdmaManagerMessage);
142
/// Actor that owns the RDMA domains, queue pairs and memory registrations created
/// through [`RdmaManagerMessage`]. Everything it still holds is released in its
/// `Drop` implementation.
#[derive(Debug)]
#[hyperactor::export(
    spawn = true,
    handlers = [
        RdmaManagerMessage,
    ],
)]
pub struct RdmaManagerActor {
    /// Queue pairs, keyed first by local device name and then by
    /// `(remote actor id, remote device name)`.
    device_qps: HashMap<String, HashMap<(ActorId, String), RdmaQueuePair>>,

    /// `(local device, remote actor id, remote device)` triples whose connection
    /// setup is in progress; `request_queue_pair` inserts a key before starting the
    /// setup and removes it when the setup finishes (successfully or not).
    pending_qp_creation: Arc<Mutex<HashSet<(String, ActorId, String)>>>,

    /// Per-device domain together with its optional loopback queue pair, created
    /// lazily by `get_or_create_device_domain` and keyed by device name.
    device_domains: HashMap<String, (RdmaDomain, Option<RdmaQueuePair>)>,

    /// Configuration used for queue-pair creation and as the fallback device choice.
    config: IbverbsConfig,

    /// `true` when the QP type resolved from `config` is `RDMA_QP_TYPE_MLX5DV`.
    mlx5dv_enabled: bool,

    /// Maps a memory-region view id to the raw `ibv_mr` pointer (stored as `usize`).
    /// The value is `0` for views served from an already registered CUDA segment,
    /// for which no individual `ibv_mr` is created.
    mr_map: HashMap<usize, usize>,
    /// Next id to hand out for a memory-region view; incremented on every view.
    mrv_id: usize,

    /// Maps a PCI address string to an RDMA device; built by
    /// `device_selection::create_cuda_to_rdma_mapping` when the actor is created.
    pci_to_device: HashMap<String, crate::ibverbs_primitives::RdmaDevice>,
}
176
177impl Drop for RdmaManagerActor {
178 fn drop(&mut self) {
179 fn destroy_queue_pair(qp: &RdmaQueuePair, _context: &str) {
184 unsafe {
185 if qp.qp != 0 {
186 let rdmaxcel_qp = qp.qp as *mut rdmaxcel_sys::rdmaxcel_qp;
187 rdmaxcel_sys::rdmaxcel_qp_destroy(rdmaxcel_qp);
188 }
189 }
190 }
191
192 for (_device_name, device_map) in self.device_qps.drain() {
194 for ((actor_id, _remote_device), qp) in device_map {
195 destroy_queue_pair(&qp, &format!("actor {:?}", actor_id));
196 }
197 }
198
199 for (device_name, (domain, qp)) in self.device_domains.drain() {
201 if let Some(qp) = qp {
202 destroy_queue_pair(&qp, &format!("loopback QP on device {}", device_name));
203 }
204 drop(domain);
205 }
206
207 let _mr_count = self.mr_map.len();
209 for (id, mr_ptr) in self.mr_map.drain() {
210 if mr_ptr != 0 {
211 unsafe {
212 let result = rdmaxcel_sys::ibv_dereg_mr(mr_ptr as *mut rdmaxcel_sys::ibv_mr);
213 if result != 0 {
214 tracing::error!(
215 "Failed to deregister MR with id {}: error code {}",
216 id,
217 result
218 );
219 }
220 }
221 }
222 }
223
224 if self.mlx5dv_enabled {
227 unsafe {
228 let result = rdmaxcel_sys::deregister_segments();
229 if result != 0 {
230 let error_msg = get_rdmaxcel_error_message(result);
231 tracing::error!(
232 "Failed to deregister CUDA segments: {} (error code: {})",
233 error_msg,
234 result
235 );
236 }
237 }
238 }
239 }
240}
241
242impl RdmaManagerActor {
243 fn get_or_create_device_domain(
245 &mut self,
246 device_name: &str,
247 rdma_device: &crate::ibverbs_primitives::RdmaDevice,
248 ) -> Result<(RdmaDomain, Option<RdmaQueuePair>), anyhow::Error> {
249 if let Some((domain, qp)) = self.device_domains.get(device_name) {
251 return Ok((domain.clone(), qp.clone()));
252 }
253
254 let domain = RdmaDomain::new(rdma_device.clone()).map_err(|e| {
256 anyhow::anyhow!("could not create domain for device {}: {}", device_name, e)
257 })?;
258
259 crate::print_device_info_if_debug_enabled(domain.context);
261
262 let qp = if mlx5dv_supported() {
264 let mut qp = RdmaQueuePair::new(domain.context, domain.pd, self.config.clone())
265 .map_err(|e| {
266 anyhow::anyhow!(
267 "could not create loopback QP for device {}: {}",
268 device_name,
269 e
270 )
271 })?;
272
273 let endpoint = qp.get_qp_info().map_err(|e| {
275 anyhow::anyhow!("could not get QP info for device {}: {}", device_name, e)
276 })?;
277
278 qp.connect(&endpoint).map_err(|e| {
279 anyhow::anyhow!(
280 "could not connect loopback QP for device {}: {}",
281 device_name,
282 e
283 )
284 })?;
285
286 Some(qp)
287 } else {
288 None
289 };
290
291 self.device_domains
292 .insert(device_name.to_string(), (domain.clone(), qp.clone()));
293 Ok((domain, qp))
294 }
295
296 fn find_cuda_segment_for_address(
297 &mut self,
298 addr: usize,
299 size: usize,
300 ) -> Option<RdmaMemoryRegionView> {
301 let registered_segments = get_registered_cuda_segments();
302 for segment in registered_segments {
303 let start_addr = segment.phys_address;
304 let end_addr = start_addr + segment.phys_size;
305 if start_addr <= addr && addr + size <= end_addr {
306 let offset = addr - start_addr;
307 let rdma_addr = segment.mr_addr + offset;
308
309 let mrv = RdmaMemoryRegionView {
310 id: self.mrv_id,
311 virtual_addr: addr,
312 rdma_addr,
313 size,
314 lkey: segment.lkey,
315 rkey: segment.rkey,
316 };
317 self.mrv_id += 1;
318 return Some(mrv);
319 }
320 }
321 None
322 }
323
324 fn register_mr(
325 &mut self,
326 addr: usize,
327 size: usize,
328 ) -> Result<(RdmaMemoryRegionView, String), anyhow::Error> {
329 unsafe {
330 let mut mem_type: i32 = 0;
331 let ptr = addr as rdmaxcel_sys::CUdeviceptr;
332 let err = rdmaxcel_sys::rdmaxcel_cuPointerGetAttribute(
333 &mut mem_type as *mut _ as *mut std::ffi::c_void,
334 rdmaxcel_sys::CU_POINTER_ATTRIBUTE_MEMORY_TYPE,
335 ptr,
336 );
337 let is_cuda = err == rdmaxcel_sys::CUDA_SUCCESS;
338
339 let mut selected_rdma_device = None;
340
341 if is_cuda {
342 let mut pci_addr_buf: [std::os::raw::c_char; 16] = [0; 16]; let err = rdmaxcel_sys::get_cuda_pci_address_from_ptr(
345 addr as u64,
346 pci_addr_buf.as_mut_ptr(),
347 pci_addr_buf.len(),
348 );
349 if err != 0 {
350 let error_msg = get_rdmaxcel_error_message(err);
351 return Err(anyhow::anyhow!(
352 "RdmaXcel get_cuda_pci_address_from_ptr failed (addr: 0x{:x}, size: {}): {}",
353 addr,
354 size,
355 error_msg
356 ));
357 }
358
359 let pci_addr = std::ffi::CStr::from_ptr(pci_addr_buf.as_ptr())
361 .to_str()
362 .unwrap();
363 selected_rdma_device = self.pci_to_device.get(pci_addr).cloned();
364 }
365
366 let rdma_device = if let Some(device) = selected_rdma_device {
368 device
369 } else {
370 self.config.device.clone()
372 };
373
374 let device_name = rdma_device.name().clone();
375 tracing::debug!(
376 "Using RDMA device: {} for memory at 0x{:x}",
377 device_name,
378 addr
379 );
380
381 let (domain, qp) = self.get_or_create_device_domain(&device_name, &rdma_device)?;
383
384 let access = rdmaxcel_sys::ibv_access_flags::IBV_ACCESS_LOCAL_WRITE
385 | rdmaxcel_sys::ibv_access_flags::IBV_ACCESS_REMOTE_WRITE
386 | rdmaxcel_sys::ibv_access_flags::IBV_ACCESS_REMOTE_READ
387 | rdmaxcel_sys::ibv_access_flags::IBV_ACCESS_REMOTE_ATOMIC;
388
389 let mut mr: *mut rdmaxcel_sys::ibv_mr = std::ptr::null_mut();
390 let mrv;
391
392 if is_cuda {
393 let mut segment_mrv = None;
395 if self.mlx5dv_enabled {
396 segment_mrv = self.find_cuda_segment_for_address(addr, size);
398
399 if segment_mrv.is_none() {
401 let err = rdmaxcel_sys::register_segments(
402 domain.pd,
403 qp.unwrap().qp as *mut rdmaxcel_sys::rdmaxcel_qp_t,
404 );
405 if err == 0 {
408 segment_mrv = self.find_cuda_segment_for_address(addr, size);
409 }
410 }
411 }
412
413 if let Some(mrv_from_segment) = segment_mrv {
415 mrv = mrv_from_segment;
416 } else {
417 let mut fd: i32 = -1;
419 rdmaxcel_sys::rdmaxcel_cuMemGetHandleForAddressRange(
420 &mut fd,
421 addr as rdmaxcel_sys::CUdeviceptr,
422 size,
423 rdmaxcel_sys::CU_MEM_RANGE_HANDLE_TYPE_DMA_BUF_FD,
424 0,
425 );
426 mr =
427 rdmaxcel_sys::ibv_reg_dmabuf_mr(domain.pd, 0, size, 0, fd, access.0 as i32);
428 if mr.is_null() {
429 return Err(anyhow::anyhow!("Failed to register dmabuf MR"));
430 }
431 mrv = RdmaMemoryRegionView {
432 id: self.mrv_id,
433 virtual_addr: addr,
434 rdma_addr: (*mr).addr as usize,
435 size,
436 lkey: (*mr).lkey,
437 rkey: (*mr).rkey,
438 };
439 self.mrv_id += 1;
440 }
441 } else {
442 mr = rdmaxcel_sys::ibv_reg_mr(
444 domain.pd,
445 addr as *mut std::ffi::c_void,
446 size,
447 access.0 as i32,
448 );
449
450 if mr.is_null() {
451 return Err(anyhow::anyhow!("failed to register standard MR"));
452 }
453
454 mrv = RdmaMemoryRegionView {
455 id: self.mrv_id,
456 virtual_addr: addr,
457 rdma_addr: (*mr).addr as usize,
458 size,
459 lkey: (*mr).lkey,
460 rkey: (*mr).rkey,
461 };
462 self.mrv_id += 1;
463 }
464 self.mr_map.insert(mrv.id, mr as usize);
465 Ok((mrv, device_name))
466 }
467 }
468
469 fn deregister_mr(&mut self, id: usize) -> Result<(), anyhow::Error> {
470 if let Some(mr_ptr) = self.mr_map.remove(&id) {
471 if mr_ptr != 0 {
472 unsafe {
473 rdmaxcel_sys::ibv_dereg_mr(mr_ptr as *mut rdmaxcel_sys::ibv_mr);
474 }
475 }
476 }
477 Ok(())
478 }
479}
480
481#[async_trait]
482impl RemoteSpawn for RdmaManagerActor {
483 type Params = Option<IbverbsConfig>;
484
485 async fn new(params: Self::Params) -> Result<Self, anyhow::Error> {
486 if !ibverbs_supported() {
487 return Err(anyhow::anyhow!(
488 "Cannot create RdmaManagerActor because RDMA is not supported on this machine"
489 ));
490 }
491
492 let mut config = params.unwrap_or_default();
494 tracing::debug!("rdma is enabled, config device hint: {}", config.device);
495
496 let mlx5dv_enabled = resolve_qp_type(config.qp_type) == rdmaxcel_sys::RDMA_QP_TYPE_MLX5DV;
497
498 if config.use_gpu_direct {
500 match validate_execution_context().await {
501 Ok(_) => {
502 tracing::info!("GPU Direct RDMA execution context validated successfully");
503 }
504 Err(e) => {
505 tracing::warn!(
506 "GPU Direct RDMA execution context validation failed: {}. Downgrading to standard ibverbs mode.",
507 e
508 );
509 config.use_gpu_direct = false;
510 }
511 }
512 }
513
514 let pci_to_device = crate::device_selection::create_cuda_to_rdma_mapping();
516 tracing::debug!(
517 "Built CUDA to RDMA device mapping with {} entries",
518 pci_to_device.len()
519 );
520
521 Ok(Self {
522 device_qps: HashMap::new(),
523 pending_qp_creation: Arc::new(Mutex::new(HashSet::new())),
524 device_domains: HashMap::new(),
525 config,
526 mlx5dv_enabled,
527 mr_map: HashMap::new(),
528 mrv_id: 0,
529 pci_to_device,
530 })
531 }
532}
533
534#[async_trait]
535impl Actor for RdmaManagerActor {
536 async fn init(&mut self, _this: &Instance<Self>) -> Result<(), anyhow::Error> {
537 tracing::debug!("RdmaManagerActor initialized with lazy domain/QP creation");
538 Ok(())
539 }
540
541 async fn handle_supervision_event(
542 &mut self,
543 _cx: &Instance<Self>,
544 _event: &ActorSupervisionEvent,
545 ) -> Result<bool, anyhow::Error> {
546 tracing::error!("rdmaManagerActor supervision event: {:?}", _event);
547 tracing::error!("rdmaManagerActor error occurred, stop the worker process, exit code: 1");
548 std::process::exit(1);
549 }
550}
551
552#[async_trait]
553#[hyperactor::forward(RdmaManagerMessage)]
554impl RdmaManagerMessageHandler for RdmaManagerActor {
555 async fn request_buffer(
571 &mut self,
572 cx: &Context<Self>,
573 addr: usize,
574 size: usize,
575 ) -> Result<RdmaBuffer, anyhow::Error> {
576 let (mrv, device_name) = self.register_mr(addr, size)?;
577
578 Ok(RdmaBuffer {
579 owner: cx.bind().clone(),
580 mr_id: mrv.id,
581 addr: mrv.rdma_addr,
582 size: mrv.size,
583 rkey: mrv.rkey,
584 lkey: mrv.lkey,
585 device_name,
586 })
587 }
588
589 async fn release_buffer(
603 &mut self,
604 _cx: &Context<Self>,
605 buffer: RdmaBuffer,
606 ) -> Result<(), anyhow::Error> {
607 self.deregister_mr(buffer.mr_id)
608 .map_err(|e| anyhow::anyhow!("could not deregister buffer: {}", e))?;
609 Ok(())
610 }
611
    /// Returns the queue pair between `self_device` on this actor and
    /// `other_device` on `other`, setting the connection up if none is cached.
    ///
    /// Flow:
    /// 1. If `device_qps` already holds a queue pair for this key, a clone of it is
    ///    returned immediately.
    /// 2. If the key is marked as pending in `pending_qp_creation`, the method polls
    ///    `device_qps` every 200 microseconds for up to one second and then fails
    ///    with a timeout error.
    /// 3. Otherwise the key is marked pending and the connection is established:
    ///    * loopback (same actor and same device): initialize, fetch own connection
    ///      info, connect to it;
    ///    * remote: initialize both sides, exchange connection info, connect both
    ///      sides, then require the peer's queue pair to report `IBV_QPS_RTS`.
    /// 4. The pending mark is removed again whether or not setup succeeded.
    ///
    /// # Errors
    /// Returns the timeout error from step 2, any error from the setup calls in
    /// step 3, an error when the peer is not in RTS state, or an error when the
    /// queue pair is missing from `device_qps` after setup.
    async fn request_queue_pair(
        &mut self,
        cx: &Context<Self>,
        other: ActorRef<RdmaManagerActor>,
        self_device: String,
        other_device: String,
    ) -> Result<RdmaQueuePair, anyhow::Error> {
        let other_id = other.actor_id().clone();

        // Key of the peer inside the per-device map.
        let inner_key = (other_id.clone(), other_device.clone());

        // Fast path: a queue pair for this peer/device pair already exists.
        if let Some(device_map) = self.device_qps.get(&self_device) {
            if let Some(qp) = device_map.get(&inner_key) {
                return Ok(qp.clone());
            }
        }

        let pending_key = (self_device.clone(), other_id.clone(), other_device.clone());
        let mut pending = self.pending_qp_creation.lock().await;

        if pending.contains(&pending_key) {
            // Setup for this key is already marked as in progress: release the lock
            // and wait for the queue pair to show up instead of creating a second one.
            drop(pending);

            let start = Instant::now();
            let timeout = Duration::from_secs(1);

            loop {
                cx.clock().sleep(Duration::from_micros(200)).await;

                if let Some(device_map) = self.device_qps.get(&self_device) {
                    if let Some(qp) = device_map.get(&inner_key) {
                        return Ok(qp.clone());
                    }
                }

                if start.elapsed() >= timeout {
                    return Err(anyhow::anyhow!(
                        "Timeout waiting for QP creation (device {} -> actor {} device {}). \
                         Another task is creating it but hasn't completed in 1 second",
                        self_device,
                        other_id,
                        other_device
                    ));
                }
            }
        } else {
            // Claim the key so that concurrent requests take the waiting path above.
            pending.insert(pending_key.clone());
            drop(pending);
        }

        // The setup runs inside an async block so that every early `?` exit still
        // reaches the pending-key cleanup below.
        let result = async {
            let is_loopback = other_id == cx.bind::<RdmaManagerActor>().actor_id().clone()
                && self_device == other_device;

            if is_loopback {
                self.initialize_qp(cx, other.clone(), self_device.clone(), other_device.clone())
                    .await?;
                // The device arguments are passed in swapped order here; in the
                // loopback case both names are equal, so the lookup key is the same.
                let endpoint = self
                    .connection_info(cx, other.clone(), other_device.clone(), self_device.clone())
                    .await?;
                self.connect(
                    cx,
                    other.clone(),
                    self_device.clone(),
                    other_device.clone(),
                    endpoint,
                )
                .await?;
            } else {
                // Create the queue pair on both sides. Calls on `other` go through
                // the actor reference, with the device roles mirrored.
                self.initialize_qp(cx, other.clone(), self_device.clone(), other_device.clone())
                    .await?;
                other
                    .initialize_qp(
                        cx,
                        cx.bind().clone(),
                        other_device.clone(),
                        self_device.clone(),
                    )
                    .await?;
                // Connect the local side to the peer's endpoint ...
                let other_endpoint: RdmaQpInfo = other
                    .connection_info(
                        cx,
                        cx.bind().clone(),
                        other_device.clone(),
                        self_device.clone(),
                    )
                    .await?;
                self.connect(
                    cx,
                    other.clone(),
                    self_device.clone(),
                    other_device.clone(),
                    other_endpoint,
                )
                .await?;
                // ... and the peer to the local endpoint.
                let local_endpoint = self
                    .connection_info(cx, other.clone(), self_device.clone(), other_device.clone())
                    .await?;
                other
                    .connect(
                        cx,
                        cx.bind().clone(),
                        other_device.clone(),
                        self_device.clone(),
                        local_endpoint,
                    )
                    .await?;

                // Confirm that the peer's queue pair reached the RTS state.
                let remote_state = other
                    .get_qp_state(
                        cx,
                        cx.bind().clone(),
                        other_device.clone(),
                        self_device.clone(),
                    )
                    .await?;

                if remote_state != rdmaxcel_sys::ibv_qp_state::IBV_QPS_RTS {
                    return Err(anyhow::anyhow!(
                        "Remote QP not in RTS state after connection setup. \
                         Local is ready but remote is in state {}. \
                         This indicates a synchronization issue in connection setup.",
                        remote_state
                    ));
                }
            }

            // Hand back a clone of the queue pair that `initialize_qp` stored.
            if let Some(device_map) = self.device_qps.get(&self_device) {
                if let Some(qp) = device_map.get(&inner_key) {
                    Ok(qp.clone())
                } else {
                    Err(anyhow::anyhow!(
                        "Failed to create connection for actor {} on device {}",
                        other_id,
                        other_device
                    ))
                }
            } else {
                Err(anyhow::anyhow!(
                    "Failed to create connection for actor {} on device {} - no device map",
                    other_id,
                    other_device
                ))
            }
        }
        .await;

        // Always clear the pending mark, on success and on failure.
        let mut pending = self.pending_qp_creation.lock().await;
        pending.remove(&pending_key);
        drop(pending);

        result
    }
794
795 async fn initialize_qp(
796 &mut self,
797 _cx: &Context<Self>,
798 other: ActorRef<RdmaManagerActor>,
799 self_device: String,
800 other_device: String,
801 ) -> Result<bool, anyhow::Error> {
802 let other_id = other.actor_id().clone();
803 let inner_key = (other_id.clone(), other_device.clone());
804
805 if let Some(device_map) = self.device_qps.get(&self_device) {
807 if device_map.contains_key(&inner_key) {
808 return Ok(true);
809 }
810 }
811
812 let rdma_device = self
814 .pci_to_device
815 .iter()
816 .find(|(_, device)| device.name() == &self_device)
817 .map(|(_, device)| device.clone())
818 .unwrap_or_else(|| {
819 crate::device_selection::resolve_rdma_device(&self.config.device)
821 .unwrap_or_else(|| self.config.device.clone())
822 });
823
824 let (domain_context, domain_pd) = {
826 let (domain, _) = self.get_or_create_device_domain(&self_device, &rdma_device)?;
828 (domain.context, domain.pd)
829 };
830
831 let qp = RdmaQueuePair::new(domain_context, domain_pd, self.config.clone())
832 .map_err(|e| anyhow::anyhow!("could not create RdmaQueuePair: {}", e))?;
833
834 self.device_qps
836 .entry(self_device.clone())
837 .or_insert_with(HashMap::new)
838 .insert(inner_key, qp);
839
840 tracing::debug!(
841 "successfully created a connection with {:?} for local device {} -> remote device {}",
842 other,
843 self_device,
844 other_device
845 );
846
847 Ok(true)
848 }
849
850 async fn connect(
856 &mut self,
857 _cx: &Context<Self>,
858 other: ActorRef<RdmaManagerActor>,
859 self_device: String,
860 other_device: String,
861 endpoint: RdmaQpInfo,
862 ) -> Result<(), anyhow::Error> {
863 tracing::debug!("connecting with {:?}", other);
864 let other_id = other.actor_id().clone();
865
866 let inner_key = (other_id.clone(), other_device.clone());
867
868 if let Some(device_map) = self.device_qps.get_mut(&self_device) {
869 match device_map.get_mut(&inner_key) {
870 Some(qp) => {
871 qp.connect(&endpoint).map_err(|e| {
872 anyhow::anyhow!("could not connect to RDMA endpoint: {}", e)
873 })?;
874 Ok(())
875 }
876 None => Err(anyhow::anyhow!(
877 "No connection found for actor {}",
878 other_id
879 )),
880 }
881 } else {
882 Err(anyhow::anyhow!(
883 "No device map found for device {}",
884 self_device
885 ))
886 }
887 }
888
889 async fn connection_info(
897 &mut self,
898 _cx: &Context<Self>,
899 other: ActorRef<RdmaManagerActor>,
900 self_device: String,
901 other_device: String,
902 ) -> Result<RdmaQpInfo, anyhow::Error> {
903 tracing::debug!("getting connection info with {:?}", other);
904 let other_id = other.actor_id().clone();
905
906 let inner_key = (other_id.clone(), other_device.clone());
907
908 if let Some(device_map) = self.device_qps.get_mut(&self_device) {
909 match device_map.get_mut(&inner_key) {
910 Some(qp) => {
911 let connection_info = qp.get_qp_info()?;
912 Ok(connection_info)
913 }
914 None => Err(anyhow::anyhow!(
915 "No connection found for actor {}",
916 other_id
917 )),
918 }
919 } else {
920 Err(anyhow::anyhow!(
921 "No device map found for self device {}",
922 self_device
923 ))
924 }
925 }
926
927 async fn release_queue_pair(
937 &mut self,
938 _cx: &Context<Self>,
939 _other: ActorRef<RdmaManagerActor>,
940 _self_device: String,
941 _other_device: String,
942 _qp: RdmaQueuePair,
943 ) -> Result<(), anyhow::Error> {
944 Ok(())
947 }
948
949 async fn get_qp_state(
959 &mut self,
960 _cx: &Context<Self>,
961 other: ActorRef<RdmaManagerActor>,
962 self_device: String,
963 other_device: String,
964 ) -> Result<u32, anyhow::Error> {
965 let other_id = other.actor_id().clone();
966 let inner_key = (other_id.clone(), other_device.clone());
967
968 if let Some(device_map) = self.device_qps.get_mut(&self_device) {
969 match device_map.get_mut(&inner_key) {
970 Some(qp) => qp.state(),
971 None => Err(anyhow::anyhow!(
972 "No connection found for actor {} on device {}",
973 other_id,
974 other_device
975 )),
976 }
977 } else {
978 Err(anyhow::anyhow!(
979 "No device map found for self device {}",
980 self_device
981 ))
982 }
983 }
984}