1use std::io::ErrorKind;
10use std::net::SocketAddr;
11use std::path::Path;
12use std::path::PathBuf;
13use std::process::Stdio;
14use std::time::Duration;
15
16use anyhow::Context;
17use anyhow::Result;
18use anyhow::bail;
19use anyhow::ensure;
20use async_trait::async_trait;
21use futures::FutureExt;
22use futures::StreamExt;
23use futures::TryFutureExt;
24use futures::TryStreamExt;
25use futures::try_join;
26use hyperactor as reference;
27use hyperactor::Actor;
28use hyperactor::Bind;
29use hyperactor::Endpoint as _;
30use hyperactor::Handler;
31use hyperactor::Unbind;
32use hyperactor::context;
33use hyperactor_mesh::ActorMesh;
34use hyperactor_mesh::connect::Connect;
35use hyperactor_mesh::connect::accept;
36#[cfg(feature = "packaged_rsync")]
37use lazy_static::lazy_static;
38use nix::sys::signal;
39use nix::sys::signal::Signal;
40use nix::unistd::Pid;
41use serde::Deserialize;
42use serde::Serialize;
43use tempfile::TempDir;
44#[cfg(feature = "packaged_rsync")]
45use tempfile::TempPath;
46use tokio::fs;
47use tokio::net::TcpListener;
48use tokio::net::TcpStream;
49use tokio::process::Child;
50use tokio::process::Command;
51#[cfg(feature = "packaged_rsync")]
52use tokio::sync::OnceCell;
53use tracing::warn;
54use typeuri::Named;
55
56use crate::code_sync::WorkspaceLocation;
57
// Process-wide cell holding the temporary file that the bundled rsync binary is written
// to. It is filled at most once, by `get_rsync_bin_path`, and only exists when the
// `packaged_rsync` feature is enabled.
#[cfg(feature = "packaged_rsync")]
lazy_static! {
    static ref RSYNC_BIN_PATH: OnceCell<TempPath> = OnceCell::new();
}
62
63async fn get_rsync_bin_path() -> Result<&'static Path> {
64 #[cfg(feature = "packaged_rsync")]
65 {
66 use std::io::Write;
67 use std::os::unix::fs::PermissionsExt;
68 Ok(RSYNC_BIN_PATH
69 .get_or_try_init(|| async {
70 tokio::task::spawn_blocking(|| {
71 let mut tmp = tempfile::NamedTempFile::with_prefix("rsync.")?;
72 let rsync_bin = include_bytes!("rsync.bin");
73 tmp.write_all(rsync_bin)?;
74 let bin_path = tmp.into_temp_path();
75 std::fs::set_permissions(&bin_path, std::fs::Permissions::from_mode(0o755))?;
76 anyhow::Ok(bin_path)
77 })
78 .await?
79 })
80 .await?)
81 }
82 #[cfg(not(feature = "packaged_rsync"))]
83 {
84 Ok(Path::new("rsync"))
85 }
86}
87
/// A single entry parsed from rsync's itemized change output.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct Change {
    /// What rsync reported for `path`.
    pub change_type: ChangeType,
    /// The path named on the output line.
    pub path: PathBuf,
}
96
/// A message reported on an output line whose first character is `*`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum ChangeMessage {
    /// The `*deleting` message.
    Deleting,
}
102
/// The update action encoded by the first character of an itemized change line.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum ChangeAction {
    /// Parsed from a leading `>`.
    Received,
    /// Parsed from a leading `c`.
    LocalChange,
    /// Parsed from a leading `.`.
    NotTransferred,
}
111
/// The kind of entry found on one line of rsync's itemized change output.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum ChangeType {
    /// A `*`-prefixed message line.
    Message(ChangeMessage),
    /// An update action together with the type of the file it applies to.
    Action(ChangeAction, FileType),
}
118
/// The file type encoded by the second character of an itemized change line.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum FileType {
    /// Parsed from `f`.
    File,
    /// Parsed from `d`.
    Directory,
    /// Parsed from `L`.
    Symlink,
}
129
/// The parsed outcome of one rsync client run.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Named)]
pub struct RsyncResult {
    /// The reported changes, in the order their lines appeared in rsync's output.
    pub changes: Vec<Change>,
}
wirevalue::register_type!(RsyncResult);
137
138impl RsyncResult {
139 pub fn empty() -> Self {
141 Self {
142 changes: Vec::new(),
143 }
144 }
145
146 fn parse_from_output(stdout: &str) -> Result<Self> {
149 let mut changes = Vec::new();
150
151 for line in stdout.lines() {
158 let line = line.trim();
159 let (raw_changes, path) = line.split_at(11);
160 let raw_changes = raw_changes.trim();
161 let path = &path[1..]; let mut iter = raw_changes.chars();
164 let change_type = match iter.next().context("missing change type")? {
165 '*' => ChangeType::Message(match iter.as_str() {
166 "deleting" => ChangeMessage::Deleting,
167 _ => bail!("unexpected change message: {}", raw_changes),
168 }),
169 c => {
170 let atype = match c {
171 '.' => ChangeAction::NotTransferred,
172 '>' => ChangeAction::Received,
173 'c' => ChangeAction::LocalChange,
174 _ => bail!("unexpected change type: {}", raw_changes),
175 };
176 let file_type = match iter.next().context("missing file type")? {
177 'f' => FileType::File,
178 'd' => FileType::Directory,
179 'L' => FileType::Symlink,
180 _ => bail!("unexpected file type: {}", raw_changes),
181 };
182 ChangeType::Action(atype, file_type)
183 }
184 };
185
186 changes.push(Change {
187 change_type,
188 path: PathBuf::from(path),
189 });
190 }
191
192 Ok(Self { changes })
193 }
194}
195
196pub async fn do_rsync(addr: &SocketAddr, workspace: &Path) -> Result<RsyncResult> {
197 fs::create_dir_all(workspace).await?;
200
201 let rsync_bin_path = get_rsync_bin_path().await?;
202 let output = Command::new(rsync_bin_path)
203 .arg("--archive")
204 .arg("--delete")
205 .arg("--itemize-changes")
207 .arg("--delete-after")
211 .arg("--delay-updates")
212 .arg("--exclude=.rsync-tmp.*")
213 .arg(format!("--partial-dir=.rsync-tmp.{}", addr.port()))
214 .arg(format!("rsync://{}/workspace", addr))
215 .arg(format!("{}/", workspace.display()))
216 .stdout(Stdio::piped())
217 .stderr(Stdio::piped())
218 .output()
219 .await
220 .map_err(|e| {
221 if e.kind() == std::io::ErrorKind::NotFound {
222 anyhow::anyhow!(
223 "rsync binary: '{}' does not exist. Please ensure 'rsync' is installed and available in PATH.",
224 rsync_bin_path.display()
225 )
226 } else {
227 anyhow::anyhow!("failed to execute rsync: {}", e)
228 }
229 })?;
230
231 output
232 .status
233 .exit_ok()
234 .with_context(|| format!("rsync failed: {}", String::from_utf8_lossy(&output.stderr)))?;
235
236 RsyncResult::parse_from_output(&String::from_utf8(output.stdout)?)
237}
238
/// A running rsync daemon process together with the resources it depends on.
#[derive(Debug)]
pub struct RsyncDaemon {
    /// The spawned `rsync --daemon` process.
    child: Child,
    /// Temporary directory holding the daemon's config file and log file.
    state: TempDir,
    /// The address the daemon was told to listen on.
    addr: SocketAddr,
}
245
246impl RsyncDaemon {
247 pub async fn spawn(listener: TcpListener, workspace: &Path) -> Result<Self> {
248 let state = TempDir::with_prefix("rsyncd.")?;
249
250 let content = format!(
254 r#"\
255[workspace]
256 path = {workspace}
257 use chroot = no
258 list = no
259 read only = true
260 write only = false
261 uid = {uid}
262 hosts allow = localhost ip6-localhost
263"#,
264 workspace = workspace.display(),
265 uid = nix::unistd::getuid().as_raw(),
266 );
267 let config = state.path().join("rsync.config");
268 fs::write(&config, content).await?;
269
270 let addr = listener.local_addr()?;
277 std::mem::drop(listener);
278
279 let mut child = Command::new(get_rsync_bin_path().await?)
281 .arg("--daemon")
282 .arg("--no-detach")
283 .arg(format!("--address={}", addr.ip()))
284 .arg(format!("--port={}", addr.port()))
285 .arg(format!("--config={}", config.display()))
286 .arg(format!("--log-file={}/log", state.path().display()))
287 .kill_on_drop(true)
288 .spawn()?;
289
290 tokio::select! {
294 res = child.wait() => bail!("unexpected early exit: {:?}", res),
295 res = async {
296 loop {
297 match TcpStream::connect(addr).await {
298 Err(err) if err.kind() == ErrorKind::ConnectionRefused => {
299 tokio::time::sleep(Duration::from_millis(1)).await
300 }
301 Err(err) => return Err(err.into()),
302 Ok(_) => break,
303 }
304 }
305 anyhow::Ok(())
306 } => res?,
307 }
308
309 Ok(Self { child, state, addr })
310 }
311
312 pub fn addr(&self) -> &SocketAddr {
313 &self.addr
314 }
315
316 pub async fn shutdown(mut self) -> Result<String> {
317 let logs = fs::read_to_string(self.state.path().join("log")).await;
318 let id = self.child.id().context("missing pid")?;
319 let pid = Pid::from_raw(id as i32);
320 signal::kill(pid, Signal::SIGINT)?;
321 let status = self.child.wait().await?;
322 ensure!(status.code() == Some(20));
324 Ok(logs?)
325 }
326}
327
/// Request asking an [`RsyncActor`] to sync a workspace.
#[derive(Debug, Clone, Named, Serialize, Deserialize, Bind, Unbind)]
pub struct RsyncMessage {
    /// Port the actor posts its `Connect` message to when setting up the byte stream.
    pub connect: reference::PortRef<Connect>,
    /// Port the actor posts the outcome to: the parsed result, or the error as text.
    pub result: reference::PortRef<Result<RsyncResult, String>>,
    /// Location that is resolved to the directory rsync writes into.
    pub workspace: WorkspaceLocation,
}
wirevalue::register_type!(RsyncMessage);
338
/// Actor that handles [`RsyncMessage`] requests; it holds no state of its own.
#[derive(Debug, Default)]
#[hyperactor::export(handlers = [RsyncMessage { cast = true }])]
#[hyperactor::spawnable]
pub struct RsyncActor {
}

// No actor behaviour is customised; the trait's provided defaults are used as-is.
impl Actor for RsyncActor {}
347
348#[async_trait]
349impl Handler<RsyncMessage> for RsyncActor {
350 async fn handle(
351 &mut self,
352 cx: &hyperactor::Context<Self>,
353 RsyncMessage {
354 workspace,
355 connect,
356 result,
357 }: RsyncMessage,
358 ) -> Result<(), anyhow::Error> {
359 let res = async {
360 let workspace = workspace
361 .resolve()
362 .context("resolving workspace location")?;
363 let (connect_msg, completer) = Connect::allocate(cx.self_addr().clone(), cx);
364 connect.post(cx, connect_msg);
365
366 let ipv6_lo: SocketAddr = "[::1]:0".parse()?;
368 let ipv4_lo: SocketAddr = "127.0.0.1:0".parse()?;
369 let addrs: [SocketAddr; 2] = [ipv6_lo, ipv4_lo];
370
371 let (listener, mut stream) = try_join!(
372 TcpListener::bind(&addrs[..]).err_into(),
373 completer.complete(),
374 )?;
375 let addr = listener.local_addr()?;
376 let (rsync_result, _) = try_join!(do_rsync(&addr, &workspace), async move {
377 let (mut local, _) = listener.accept().await?;
378 tokio::io::copy_bidirectional(&mut stream, &mut local).await?;
379 anyhow::Ok(())
380 },)?;
381 anyhow::Ok(rsync_result)
382 }
383 .await;
384 result.post(cx, res.map_err(|e| format!("{:#?}", e)));
385 Ok(())
386 }
387}
388
389pub async fn rsync_mesh<C: context::Actor + Copy + Unpin>(
390 cx: C,
391 actor_mesh: &ActorMesh<RsyncActor>,
392 local_workspace: PathBuf,
393 remote_workspace: WorkspaceLocation,
394) -> Result<Vec<RsyncResult>> {
395 use ndslice::View;
396
397 let daemon = RsyncDaemon::spawn(TcpListener::bind(("::1", 0)).await?, &local_workspace).await?;
399 let daemon_addr = daemon.addr();
400
401 let (rsync_conns_tx, rsync_conns_rx) = cx.mailbox().open_port::<Connect>();
402 let num_actors = actor_mesh.region().num_ranks();
403
404 let res = try_join!(
405 rsync_conns_rx
406 .take(num_actors)
407 .err_into::<anyhow::Error>()
408 .try_for_each_concurrent(None, |connect| async move {
409 let (mut local, mut stream) = try_join!(
410 TcpStream::connect(*daemon_addr).err_into(),
411 accept(cx, cx.instance().self_addr().clone(), connect),
412 )?;
413 tokio::io::copy_bidirectional(&mut local, &mut stream).await?;
414 anyhow::Ok(())
415 })
416 .boxed(),
417 async move {
418 let (result_tx, result_rx) = cx.mailbox().open_port::<Result<RsyncResult, String>>();
419 actor_mesh.cast(
420 &cx,
421 RsyncMessage {
422 connect: rsync_conns_tx.bind(),
423 result: result_tx.bind(),
424 workspace: remote_workspace,
425 },
426 )?;
427 let res: Vec<RsyncResult> = result_rx
428 .take(num_actors)
429 .map(|res| res?.map_err(anyhow::Error::msg))
430 .try_collect()
431 .await?;
432 anyhow::Ok(res)
433 },
434 );
435
436 let logs = daemon.shutdown().await;
438
439 match res {
441 Ok(((), results)) => {
442 let _ = logs?;
443 Ok(results)
444 }
445 Err(err) => match logs {
446 Ok(logs) => Err(err).with_context(|| format!("rsync server logs: {}", logs)),
447 Err(shutdown_err) => {
448 warn!("failed to read logs from rsync daemon: {:?}", shutdown_err);
449 Err(err)
450 }
451 },
452 }
453}
454
#[cfg(test)]
mod tests {
    use anyhow::Result;
    use anyhow::anyhow;
    use hyperactor_mesh::ActorMesh;
    use hyperactor_mesh::context;
    use hyperactor_mesh::test_utils;
    use tempfile::TempDir;
    use tokio::fs;
    use tokio::net::TcpListener;

    use super::*;

    /// Serves a directory containing one file and checks that syncing it into an empty
    /// directory makes the two directories identical.
    #[tokio::test]
    #[cfg_attr(not(fbcode_build), ignore)]
    async fn test_simple() -> Result<()> {
        let input = TempDir::new()?;
        fs::write(input.path().join("foo.txt"), "hello world").await?;

        let output = TempDir::new()?;

        // The daemon must serve `input` and the client must write into `output`. The
        // other way round would sync the empty directory over the fixture (`--delete`
        // removes `foo.txt`) and the comparison below would pass without testing anything.
        let server = TcpListener::bind(("::", 0)).await?;
        let daemon = RsyncDaemon::spawn(server, input.path()).await?;
        do_rsync(daemon.addr(), output.path()).await?;
        daemon.shutdown().await?;

        assert!(!dir_diff::is_different(&input, &output).map_err(|e| anyhow!("{:?}", e))?);

        Ok(())
    }

    /// Runs `rsync_mesh` against a single-actor mesh and checks that the target
    /// directory ends up identical to the source, including removal of stale entries.
    #[tokio::test]
    #[cfg_attr(not(fbcode_build), ignore)]
    async fn test_rsync_actor_and_mesh() -> Result<()> {
        // Source workspace with a few files and a subdirectory.
        let source_workspace = TempDir::new()?;
        fs::write(source_workspace.path().join("test1.txt"), "content1").await?;
        fs::write(source_workspace.path().join("test2.txt"), "content2").await?;
        fs::create_dir(source_workspace.path().join("subdir")).await?;
        fs::write(source_workspace.path().join("subdir/test3.txt"), "content3").await?;

        // Target workspace pre-populated with entries that do not exist in the source.
        let target_workspace = TempDir::new()?;
        fs::create_dir(target_workspace.path().join("subdir5")).await?;
        fs::write(target_workspace.path().join("foo.txt"), "something").await?;

        let cx = context().await;
        let instance = cx.actor_instance;
        let mut host_mesh = test_utils::local_host_mesh(1).await;
        let proc_mesh = host_mesh
            .spawn(instance, "rsync_test", ndslice::Extent::unity(), None, None)
            .await
            .unwrap();
        let actor_mesh: ActorMesh<RsyncActor> =
            proc_mesh.spawn(instance, "rsync_test", &()).await?;

        let results = rsync_mesh(
            instance,
            &actor_mesh,
            source_workspace.path().to_path_buf(),
            WorkspaceLocation::Constant(target_workspace.path().to_path_buf()),
        )
        .await?;

        // One actor in the mesh, so exactly one result.
        assert_eq!(results.len(), 1);
        let rsync_result = &results[0];

        println!("Rsync result: {:#?}", rsync_result);

        assert!(
            !dir_diff::is_different(&source_workspace, &target_workspace)
                .map_err(|e| anyhow!("{:?}", e))?
        );

        let _ = host_mesh.shutdown(instance).await;
        Ok(())
    }

    /// Checks that itemized rsync output is parsed into the expected list of changes.
    #[tokio::test]
    async fn test_rsync_result_parsing() -> Result<()> {
        // Every line has an 11-column change summary, one space, then the path; the
        // `*deleting` message is therefore padded with spaces up to column 11.
        let stdout = r#">f+++++++++ test1.txt
>f+++++++++ test2.txt
cd+++++++++ subdir/
>f+++++++++ subdir/test3.txt
*deleting   old_file.txt
"#;

        let result = RsyncResult::parse_from_output(stdout)?;

        let expected_changes = vec![
            Change {
                change_type: ChangeType::Action(ChangeAction::Received, FileType::File),
                path: PathBuf::from("test1.txt"),
            },
            Change {
                change_type: ChangeType::Action(ChangeAction::Received, FileType::File),
                path: PathBuf::from("test2.txt"),
            },
            Change {
                change_type: ChangeType::Action(ChangeAction::LocalChange, FileType::Directory),
                path: PathBuf::from("subdir/"),
            },
            Change {
                change_type: ChangeType::Action(ChangeAction::Received, FileType::File),
                path: PathBuf::from("subdir/test3.txt"),
            },
            Change {
                change_type: ChangeType::Message(ChangeMessage::Deleting),
                path: PathBuf::from("old_file.txt"),
            },
        ];

        assert_eq!(result.changes.len(), expected_changes.len());

        // Compare element by element first so a mismatch names the offending entry.
        for (actual, expected) in result.changes.iter().zip(expected_changes.iter()) {
            assert_eq!(
                actual, expected,
                "Change mismatch: actual={:?}, expected={:?}",
                actual, expected
            );
        }

        assert_eq!(result.changes, expected_changes);

        Ok(())
    }
}