1use std::io::ErrorKind;
10use std::net::SocketAddr;
11use std::path::Path;
12use std::path::PathBuf;
13use std::process::Stdio;
14use std::time::Duration;
15
16use anyhow::Context;
17use anyhow::Result;
18use anyhow::bail;
19use anyhow::ensure;
20use async_trait::async_trait;
21use futures::FutureExt;
22use futures::StreamExt;
23use futures::TryFutureExt;
24use futures::TryStreamExt;
25use futures::try_join;
26use hyperactor::Actor;
27use hyperactor::Bind;
28use hyperactor::Handler;
29use hyperactor::Unbind;
30use hyperactor::context;
31use hyperactor::reference;
32use hyperactor_mesh::ActorMesh;
33use hyperactor_mesh::connect::Connect;
34use hyperactor_mesh::connect::accept;
35#[cfg(feature = "packaged_rsync")]
36use lazy_static::lazy_static;
37use nix::sys::signal;
38use nix::sys::signal::Signal;
39use nix::unistd::Pid;
40use serde::Deserialize;
41use serde::Serialize;
42use tempfile::TempDir;
43#[cfg(feature = "packaged_rsync")]
44use tempfile::TempPath;
45use tokio::fs;
46use tokio::net::TcpListener;
47use tokio::net::TcpStream;
48use tokio::process::Child;
49use tokio::process::Command;
50#[cfg(feature = "packaged_rsync")]
51use tokio::sync::OnceCell;
52use tracing::warn;
53use typeuri::Named;
54
55use crate::code_sync::WorkspaceLocation;
56
#[cfg(feature = "packaged_rsync")]
lazy_static! {
    /// Cache for the temporary file holding the rsync binary embedded in this crate.
    ///
    /// It is populated on first use by `get_rsync_bin_path`. NOTE(review): statics are never
    /// dropped, so the `TempPath` destructor does not run and the extracted file is not removed
    /// at process exit — confirm this is intended.
    static ref RSYNC_BIN_PATH: OnceCell<TempPath> = OnceCell::new();
}
61
62async fn get_rsync_bin_path() -> Result<&'static Path> {
63 #[cfg(feature = "packaged_rsync")]
64 {
65 use std::io::Write;
66 use std::os::unix::fs::PermissionsExt;
67 Ok(RSYNC_BIN_PATH
68 .get_or_try_init(|| async {
69 tokio::task::spawn_blocking(|| {
70 let mut tmp = tempfile::NamedTempFile::with_prefix("rsync.")?;
71 let rsync_bin = include_bytes!("rsync.bin");
72 tmp.write_all(rsync_bin)?;
73 let bin_path = tmp.into_temp_path();
74 std::fs::set_permissions(&bin_path, std::fs::Permissions::from_mode(0o755))?;
75 anyhow::Ok(bin_path)
76 })
77 .await?
78 })
79 .await?)
80 }
81 #[cfg(not(feature = "packaged_rsync"))]
82 {
83 Ok(Path::new("rsync"))
84 }
85}
86
/// One item parsed from rsync's `--itemize-changes` output.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct Change {
    /// The kind of change rsync reported for `path`.
    pub change_type: ChangeType,
    /// The path exactly as printed by rsync (e.g. `subdir/test3.txt`).
    pub path: PathBuf,
}
95
/// An informational item, printed by rsync with a leading `*` in place of an item code.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum ChangeMessage {
    /// Parsed from `*deleting`: rsync reported deleting the path.
    Deleting,
}
101
/// The update type: the first character of an rsync item code.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum ChangeAction {
    /// `>`: the item is being received.
    Received,
    /// `c`: a local change/creation (for example, creating a directory).
    LocalChange,
    /// `.`: the item's contents are not being transferred (attributes may still change).
    NotTransferred,
}
110
/// The classification of one `--itemize-changes` line.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum ChangeType {
    /// A `*`-prefixed message line.
    Message(ChangeMessage),
    /// An update-type character followed by a file-type character.
    Action(ChangeAction, FileType),
}
117
/// The file type: the second character of an rsync action item code.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum FileType {
    /// `f`: a regular file.
    File,
    /// `d`: a directory.
    Directory,
    /// `L`: a symbolic link.
    Symlink,
}
128
/// All changes reported by a single rsync run, in the order rsync printed them.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Named)]
pub struct RsyncResult {
    /// One entry per parsed `--itemize-changes` line.
    pub changes: Vec<Change>,
}
// Register the type with `wirevalue` so it can be sent in actor messages.
wirevalue::register_type!(RsyncResult);
136
137impl RsyncResult {
138 pub fn empty() -> Self {
140 Self {
141 changes: Vec::new(),
142 }
143 }
144
145 fn parse_from_output(stdout: &str) -> Result<Self> {
148 let mut changes = Vec::new();
149
150 for line in stdout.lines() {
157 let line = line.trim();
158 let (raw_changes, path) = line.split_at(11);
159 let raw_changes = raw_changes.trim();
160 let path = &path[1..]; let mut iter = raw_changes.chars();
163 let change_type = match iter.next().context("missing change type")? {
164 '*' => ChangeType::Message(match iter.as_str() {
165 "deleting" => ChangeMessage::Deleting,
166 _ => bail!("unexpected change message: {}", raw_changes),
167 }),
168 c => {
169 let atype = match c {
170 '.' => ChangeAction::NotTransferred,
171 '>' => ChangeAction::Received,
172 'c' => ChangeAction::LocalChange,
173 _ => bail!("unexpected change type: {}", raw_changes),
174 };
175 let file_type = match iter.next().context("missing file type")? {
176 'f' => FileType::File,
177 'd' => FileType::Directory,
178 'L' => FileType::Symlink,
179 _ => bail!("unexpected file type: {}", raw_changes),
180 };
181 ChangeType::Action(atype, file_type)
182 }
183 };
184
185 changes.push(Change {
186 change_type,
187 path: PathBuf::from(path),
188 });
189 }
190
191 Ok(Self { changes })
192 }
193}
194
195pub async fn do_rsync(addr: &SocketAddr, workspace: &Path) -> Result<RsyncResult> {
196 fs::create_dir_all(workspace).await?;
199
200 let rsync_bin_path = get_rsync_bin_path().await?;
201 let output = Command::new(rsync_bin_path)
202 .arg("--archive")
203 .arg("--delete")
204 .arg("--itemize-changes")
206 .arg("--delete-after")
210 .arg("--delay-updates")
211 .arg("--exclude=.rsync-tmp.*")
212 .arg(format!("--partial-dir=.rsync-tmp.{}", addr.port()))
213 .arg(format!("rsync://{}/workspace", addr))
214 .arg(format!("{}/", workspace.display()))
215 .stdout(Stdio::piped())
216 .stderr(Stdio::piped())
217 .output()
218 .await
219 .map_err(|e| {
220 if e.kind() == std::io::ErrorKind::NotFound {
221 anyhow::anyhow!(
222 "rsync binary: '{}' does not exist. Please ensure 'rsync' is installed and available in PATH.",
223 rsync_bin_path.display()
224 )
225 } else {
226 anyhow::anyhow!("failed to execute rsync: {}", e)
227 }
228 })?;
229
230 output
231 .status
232 .exit_ok()
233 .with_context(|| format!("rsync failed: {}", String::from_utf8_lossy(&output.stderr)))?;
234
235 RsyncResult::parse_from_output(&String::from_utf8(output.stdout)?)
236}
237
/// A running `rsync --daemon` child process serving a single workspace.
#[derive(Debug)]
pub struct RsyncDaemon {
    /// The daemon process (spawned with `kill_on_drop(true)`).
    child: Child,
    /// Temporary directory holding the daemon's config file and log file.
    state: TempDir,
    /// The address the daemon was told to listen on.
    addr: SocketAddr,
}
244
245impl RsyncDaemon {
246 pub async fn spawn(listener: TcpListener, workspace: &Path) -> Result<Self> {
247 let state = TempDir::with_prefix("rsyncd.")?;
248
249 let content = format!(
253 r#"\
254[workspace]
255 path = {workspace}
256 use chroot = no
257 list = no
258 read only = true
259 write only = false
260 uid = {uid}
261 hosts allow = localhost
262"#,
263 workspace = workspace.display(),
264 uid = nix::unistd::getuid().as_raw(),
265 );
266 let config = state.path().join("rsync.config");
267 fs::write(&config, content).await?;
268
269 let addr = listener.local_addr()?;
276 std::mem::drop(listener);
277
278 let mut child = Command::new(get_rsync_bin_path().await?)
280 .arg("--daemon")
281 .arg("--no-detach")
282 .arg(format!("--address={}", addr.ip()))
283 .arg(format!("--port={}", addr.port()))
284 .arg(format!("--config={}", config.display()))
285 .arg(format!("--log-file={}/log", state.path().display()))
286 .kill_on_drop(true)
287 .spawn()?;
288
289 tokio::select! {
293 res = child.wait() => bail!("unexpected early exit: {:?}", res),
294 res = async {
295 loop {
296 match TcpStream::connect(addr).await {
297 Err(err) if err.kind() == ErrorKind::ConnectionRefused => {
298 tokio::time::sleep(Duration::from_millis(1)).await
299 }
300 Err(err) => return Err(err.into()),
301 Ok(_) => break,
302 }
303 }
304 anyhow::Ok(())
305 } => res?,
306 }
307
308 Ok(Self { child, state, addr })
309 }
310
311 pub fn addr(&self) -> &SocketAddr {
312 &self.addr
313 }
314
315 pub async fn shutdown(mut self) -> Result<String> {
316 let logs = fs::read_to_string(self.state.path().join("log")).await;
317 let id = self.child.id().context("missing pid")?;
318 let pid = Pid::from_raw(id as i32);
319 signal::kill(pid, Signal::SIGINT)?;
320 let status = self.child.wait().await?;
321 ensure!(status.code() == Some(20));
323 Ok(logs?)
324 }
325}
326
/// Request asking an [`RsyncActor`] to pull a workspace from a remote rsync daemon.
#[derive(Debug, Clone, Named, Serialize, Deserialize, Bind, Unbind)]
pub struct RsyncMessage {
    /// Port to which the actor sends a `Connect` request to obtain a stream to the daemon.
    pub connect: reference::PortRef<Connect>,
    /// Port on which the actor reports the outcome; errors are sent as formatted strings.
    pub result: reference::PortRef<Result<RsyncResult, String>>,
    /// Where, on the actor's side, the workspace should be synced to.
    pub workspace: WorkspaceLocation,
}
// Register the message type with `wirevalue` so it can be sent between actors.
wirevalue::register_type!(RsyncMessage);
337
/// Stateless actor that handles [`RsyncMessage`]s by running rsync against a tunnelled daemon.
#[derive(Debug, Default)]
#[hyperactor::export(spawn = true, handlers = [RsyncMessage { cast = true }])]
pub struct RsyncActor {
}

impl Actor for RsyncActor {}
345
346#[async_trait]
347impl Handler<RsyncMessage> for RsyncActor {
348 async fn handle(
349 &mut self,
350 cx: &hyperactor::Context<Self>,
351 RsyncMessage {
352 workspace,
353 connect,
354 result,
355 }: RsyncMessage,
356 ) -> Result<(), anyhow::Error> {
357 let res = async {
358 let workspace = workspace
359 .resolve()
360 .context("resolving workspace location")?;
361 let (connect_msg, completer) = Connect::allocate(cx.self_id().clone(), cx);
362 connect.send(cx, connect_msg)?;
363
364 let ipv6_lo: SocketAddr = "[::1]:0".parse()?;
366 let ipv4_lo: SocketAddr = "127.0.0.1:0".parse()?;
367 let addrs: [SocketAddr; 2] = [ipv6_lo, ipv4_lo];
368
369 let (listener, mut stream) = try_join!(
370 TcpListener::bind(&addrs[..]).err_into(),
371 completer.complete(),
372 )?;
373 let addr = listener.local_addr()?;
374 let (rsync_result, _) = try_join!(do_rsync(&addr, &workspace), async move {
375 let (mut local, _) = listener.accept().await?;
376 tokio::io::copy_bidirectional(&mut stream, &mut local).await?;
377 anyhow::Ok(())
378 },)?;
379 anyhow::Ok(rsync_result)
380 }
381 .await;
382 result.send(cx, res.map_err(|e| format!("{:#?}", e)))?;
383 Ok(())
384 }
385}
386
387pub async fn rsync_mesh<C: context::Actor + Copy + Unpin>(
388 cx: C,
389 actor_mesh: &ActorMesh<RsyncActor>,
390 local_workspace: PathBuf,
391 remote_workspace: WorkspaceLocation,
392) -> Result<Vec<RsyncResult>> {
393 use ndslice::View;
394
395 let daemon = RsyncDaemon::spawn(TcpListener::bind(("::1", 0)).await?, &local_workspace).await?;
397 let daemon_addr = daemon.addr();
398
399 let (rsync_conns_tx, rsync_conns_rx) = cx.mailbox().open_port::<Connect>();
400 let num_actors = actor_mesh.region().num_ranks();
401
402 let res = try_join!(
403 rsync_conns_rx
404 .take(num_actors)
405 .err_into::<anyhow::Error>()
406 .try_for_each_concurrent(None, |connect| async move {
407 let (mut local, mut stream) = try_join!(
408 TcpStream::connect(daemon_addr.clone()).err_into(),
409 accept(cx, cx.instance().self_id().clone(), connect),
410 )?;
411 tokio::io::copy_bidirectional(&mut local, &mut stream).await?;
412 anyhow::Ok(())
413 })
414 .boxed(),
415 async move {
416 let (result_tx, result_rx) = cx.mailbox().open_port::<Result<RsyncResult, String>>();
417 actor_mesh.cast(
418 &cx,
419 RsyncMessage {
420 connect: rsync_conns_tx.bind(),
421 result: result_tx.bind(),
422 workspace: remote_workspace,
423 },
424 )?;
425 let res: Vec<RsyncResult> = result_rx
426 .take(num_actors)
427 .map(|res| res?.map_err(anyhow::Error::msg))
428 .try_collect()
429 .await?;
430 anyhow::Ok(res)
431 },
432 );
433
434 let logs = daemon.shutdown().await;
436
437 match res {
439 Ok(((), results)) => {
440 let _ = logs?;
441 Ok(results)
442 }
443 Err(err) => match logs {
444 Ok(logs) => Err(err).with_context(|| format!("rsync server logs: {}", logs)),
445 Err(shutdown_err) => {
446 warn!("failed to read logs from rsync daemon: {:?}", shutdown_err);
447 Err(err)
448 }
449 },
450 }
451}
452
#[cfg(test)]
mod tests {
    use anyhow::Result;
    use anyhow::anyhow;
    use hyperactor_mesh::ActorMesh;
    use hyperactor_mesh::context;
    use hyperactor_mesh::test_utils;
    use tempfile::TempDir;
    use tokio::fs;
    use tokio::net::TcpListener;

    use super::*;

    /// A daemon serving `input` is pulled into an empty `output` directory.
    #[tokio::test]
    #[cfg_attr(not(fbcode_build), ignore)]
    async fn test_simple() -> Result<()> {
        let input = TempDir::new()?;
        fs::write(input.path().join("foo.txt"), "hello world").await?;

        let output = TempDir::new()?;

        // The daemon must serve `input` and the client must pull into `output`. With the
        // roles reversed, `--delete` empties `input` and the comparison passes trivially.
        let server = TcpListener::bind(("::", 0)).await?;
        let daemon = RsyncDaemon::spawn(server, input.path()).await?;
        do_rsync(daemon.addr(), output.path()).await?;
        daemon.shutdown().await?;

        assert_eq!(
            fs::read_to_string(output.path().join("foo.txt")).await?,
            "hello world"
        );
        assert!(!dir_diff::is_different(&input, &output).map_err(|e| anyhow!("{:?}", e))?);

        Ok(())
    }

    /// End-to-end sync through a one-actor mesh, replacing pre-existing target contents.
    #[tokio::test]
    #[cfg_attr(not(fbcode_build), ignore)]
    async fn test_rsync_actor_and_mesh() -> Result<()> {
        let source_workspace = TempDir::new()?;
        fs::write(source_workspace.path().join("test1.txt"), "content1").await?;
        fs::write(source_workspace.path().join("test2.txt"), "content2").await?;
        fs::create_dir(source_workspace.path().join("subdir")).await?;
        fs::write(source_workspace.path().join("subdir/test3.txt"), "content3").await?;

        // Pre-populate the target with content that the sync must delete.
        let target_workspace = TempDir::new()?;
        fs::create_dir(target_workspace.path().join("subdir5")).await?;
        fs::write(target_workspace.path().join("foo.txt"), "something").await?;

        let cx = context().await;
        let instance = cx.actor_instance;
        let mut host_mesh = test_utils::local_host_mesh(1).await;
        let proc_mesh = host_mesh
            .spawn(instance, "rsync_test", ndslice::Extent::unity())
            .await
            .unwrap();
        let actor_mesh: ActorMesh<RsyncActor> =
            proc_mesh.spawn(instance, "rsync_test", &()).await?;

        let results = rsync_mesh(
            instance,
            &actor_mesh,
            source_workspace.path().to_path_buf(),
            WorkspaceLocation::Constant(target_workspace.path().to_path_buf()),
        )
        .await?;

        assert_eq!(results.len(), 1);
        let rsync_result = &results[0];

        println!("Rsync result: {:#?}", rsync_result);

        assert!(
            !dir_diff::is_different(&source_workspace, &target_workspace)
                .map_err(|e| anyhow!("{:?}", e))?
        );

        let _ = host_mesh.shutdown(instance).await;
        Ok(())
    }

    /// Parses representative `--itemize-changes` output.
    #[tokio::test]
    async fn test_rsync_result_parsing() -> Result<()> {
        // rsync pads `*deleting` to the 11-character item-code column, then a space.
        let stdout = r#">f+++++++++ test1.txt
>f+++++++++ test2.txt
cd+++++++++ subdir/
>f+++++++++ subdir/test3.txt
*deleting   old_file.txt
"#;

        let result = RsyncResult::parse_from_output(stdout)?;

        let expected_changes = vec![
            Change {
                change_type: ChangeType::Action(ChangeAction::Received, FileType::File),
                path: PathBuf::from("test1.txt"),
            },
            Change {
                change_type: ChangeType::Action(ChangeAction::Received, FileType::File),
                path: PathBuf::from("test2.txt"),
            },
            Change {
                change_type: ChangeType::Action(ChangeAction::LocalChange, FileType::Directory),
                path: PathBuf::from("subdir/"),
            },
            Change {
                change_type: ChangeType::Action(ChangeAction::Received, FileType::File),
                path: PathBuf::from("subdir/test3.txt"),
            },
            Change {
                change_type: ChangeType::Message(ChangeMessage::Deleting),
                path: PathBuf::from("old_file.txt"),
            },
        ];

        assert_eq!(result.changes.len(), expected_changes.len());

        // Compare element-wise first for a more targeted failure message.
        for (actual, expected) in result.changes.iter().zip(expected_changes.iter()) {
            assert_eq!(
                actual, expected,
                "Change mismatch: actual={:?}, expected={:?}",
                actual, expected
            );
        }

        assert_eq!(result.changes, expected_changes);

        Ok(())
    }
}