1use std::cmp::Ordering;
10use std::collections::BTreeMap;
11use std::collections::BTreeSet;
12use std::collections::HashMap;
13use std::io::ErrorKind;
14use std::os::unix::fs::MetadataExt;
15use std::os::unix::fs::PermissionsExt;
16use std::path::Path;
17use std::path::PathBuf;
18use std::sync::Arc;
19use std::sync::mpsc::channel;
20use std::time::Duration;
21use std::time::SystemTime;
22use std::time::UNIX_EPOCH;
23
24use anyhow::Context;
25use anyhow::Result;
26use anyhow::bail;
27use anyhow::ensure;
28use async_tempfile::TempFile;
29use dashmap::DashMap;
30use dashmap::mapref::entry::Entry;
31use filetime::FileTime;
32use futures::SinkExt;
33use futures::StreamExt;
34use futures::try_join;
35use globset::Glob;
36use globset::GlobSet;
37use globset::GlobSetBuilder;
38use ignore::DirEntry;
39use ignore::WalkBuilder;
40use ignore::WalkState;
41use itertools::Itertools;
42use memmap2::MmapMut;
43use serde::Deserialize;
44use serde::Serialize;
45use tokio::fs;
46use tokio::io::AsyncRead;
47use tokio::io::AsyncReadExt;
48use tokio::io::AsyncWrite;
49use tokio::io::AsyncWriteExt;
50use tokio_util::codec::FramedRead;
51use tokio_util::codec::FramedWrite;
52use tokio_util::codec::LengthDelimitedCodec;
53
54use crate::diff::CondaFingerprint;
55use crate::replace::ReplacerBuilder;
56
/// Which side of the sync a scanned filesystem entry was reported by.
#[derive(Eq, PartialEq)]
enum Origin {
    /// The entry was reported for the source tree.
    Src,
    /// The entry was reported for the destination tree.
    Dst,
}
62
/// Kind of filesystem entry recorded for a scanned path.
#[derive(Debug, Eq, PartialEq, Serialize, Deserialize)]
enum FileTypeInfo {
    /// A directory.
    Directory,
    /// A regular file; the flag is `true` when the owner-execute bit (`0o100`) is set.
    File(bool),
    /// A symbolic link.
    Symlink,
}
69
70impl FileTypeInfo {
71 fn same(&self, other: &FileTypeInfo) -> bool {
72 match (self, other) {
73 (FileTypeInfo::Directory, FileTypeInfo::Directory) => true,
74 (FileTypeInfo::File(_), FileTypeInfo::File(_)) => true,
75 (FileTypeInfo::Symlink, FileTypeInfo::Symlink) => true,
76 _ => false,
77 }
78 }
79}
80
/// Per-path information gathered while walking a tree and used to decide
/// which action, if any, a path needs.
#[derive(Debug, Eq, PartialEq, Serialize, Deserialize)]
struct Metadata {
    /// Modification time of the entry.
    mtime: SystemTime,
    /// Kind of entry (directory, regular file, or symlink).
    ftype: FileTypeInfo,
}
86
/// The kind of entry the receiver fetches from the sender.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum Receive {
    /// A regular file; when `executable` is set the received file is made executable.
    File { executable: bool },
    /// A symbolic link.
    Symlink,
}
92
/// A change applied to a single path of the destination tree.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum Action {
    /// Remove the destination entry; `directory` is `true` when it is a directory.
    Delete { directory: bool },
    /// Create a directory at the path.
    Directory,
    /// Fetch the entry from the source and stamp it with the given modification time.
    Receive(SystemTime, Receive),
}
102
/// First message of the file-request section: announces how many
/// `FileHeader` requests follow.
#[derive(Debug, Serialize, Deserialize)]
struct FileSectionHeader {
    /// Number of `FileHeader` messages that follow.
    num: usize,
}
107
/// A request, sent by the receiver, for the contents of one path.
#[derive(Debug, Serialize, Deserialize)]
struct FileHeader {
    /// Path relative to the root of the synced tree.
    path: PathBuf,
    /// `true` when the entry is requested as a symlink rather than a regular file.
    symlink: bool,
}
113
/// Describes the payload that accompanies a `FileContentsHeader`.
#[derive(Debug, Serialize, Deserialize)]
enum FileContents {
    /// A symlink, carried inline as its link target.
    Symlink(PathBuf),
    /// A regular file; the value is the number of raw bytes that follow the header.
    File(u64),
}
119
/// Header sent by the sender ahead of each requested entry.
#[derive(Debug, Serialize, Deserialize)]
struct FileContentsHeader {
    /// Path relative to the root of the synced tree.
    path: PathBuf,
    /// What is being delivered for `path`.
    contents: FileContents,
}
125
/// One message of the source file listing streamed from sender to receiver.
#[derive(Debug, Serialize, Deserialize)]
enum FileList {
    /// A scanned source path together with its metadata.
    Entry(PathBuf, Metadata),
    /// Marks the end of the listing.
    Done,
}
131
/// Accumulates the scan results of both trees and derives the per-path
/// actions needed on the destination.
struct ActionsBuilder {
    /// Optional glob set of paths that are skipped when deciding actions.
    ignores: Option<GlobSet>,
    /// Paths reported by exactly one side so far, with the side that reported them.
    state: DashMap<PathBuf, (Origin, Metadata)>,
    /// Actions already decided for paths that were reported by both sides.
    actions: DashMap<PathBuf, Action>,
    /// Comparison used to decide whether source and destination mtimes are equal.
    mtime_comparator: Box<dyn Fn(&SystemTime, &SystemTime) -> Ordering + Send + Sync + 'static>,
}
138
139impl ActionsBuilder {
140 fn new_with(
141 ignores: Option<GlobSet>,
142 mtime_comparator: Box<dyn Fn(&SystemTime, &SystemTime) -> Ordering + Send + Sync + 'static>,
143 ) -> Self {
144 Self {
145 ignores,
146 state: DashMap::new(),
147 actions: DashMap::new(),
148 mtime_comparator,
149 }
150 }
151
152 fn process(&self, origin: Origin, path: PathBuf, metadata: Metadata) -> Result<()> {
153 match self.state.entry(path) {
154 Entry::Occupied(val) => {
155 let (path, (existing_origin, existing_metadata)) = val.remove_entry();
156 if let Some(ignores) = &self.ignores {
157 if ignores.is_match(path.as_path()) {
158 return Ok(());
159 }
160 }
161 ensure!(existing_origin != origin);
162 let (src, dst) = match origin {
163 Origin::Dst => (existing_metadata, metadata),
164 Origin::Src => (metadata, existing_metadata),
165 };
166 if src.ftype == FileTypeInfo::Directory && dst.ftype == FileTypeInfo::Directory {
167 } else {
169 match (self.mtime_comparator)(&src.mtime, &dst.mtime) {
170 Ordering::Equal => {
171 ensure!(
172 src.ftype.same(&dst.ftype),
173 "{}: {:?} != {:?}",
174 path.display(),
175 dst,
176 src
177 );
178 }
179 Ordering::Greater | Ordering::Less => {
180 self.actions.insert(
181 path,
182 match src.ftype {
183 FileTypeInfo::File(executable) => {
184 Action::Receive(src.mtime, Receive::File { executable })
185 }
186 FileTypeInfo::Symlink => {
187 Action::Receive(src.mtime, Receive::Symlink)
188 }
189 FileTypeInfo::Directory => Action::Directory,
190 },
191 );
192 }
193 }
194 }
195 }
196 Entry::Vacant(entry) => {
197 entry.insert((origin, metadata));
198 }
199 }
200 Ok(())
201 }
202
203 fn process_src(&self, path: PathBuf, metadata: Metadata) -> Result<()> {
204 self.process(Origin::Src, path, metadata)
205 }
206
207 fn process_dst(&self, path: PathBuf, metadata: Metadata) -> Result<()> {
208 self.process(Origin::Dst, path, metadata)
209 }
210
211 fn into_actions(self) -> HashMap<PathBuf, Action> {
212 let mut actions: HashMap<_, _> = self.actions.into_iter().collect();
213 for (path, (origin, metadata)) in self.state.into_iter() {
214 match origin {
215 Origin::Src => {
216 if let Some(ignores) = &self.ignores {
217 if ignores.is_match(path.as_path()) {
218 continue;
219 }
220 }
221 actions.insert(
222 path,
223 match metadata.ftype {
224 FileTypeInfo::File(executable) => {
225 Action::Receive(metadata.mtime, Receive::File { executable })
226 }
227 FileTypeInfo::Directory => Action::Directory,
228 FileTypeInfo::Symlink => {
229 Action::Receive(metadata.mtime, Receive::Symlink)
230 }
231 },
232 );
233 }
234 Origin::Dst => {
235 actions.insert(
236 path,
237 Action::Delete {
238 directory: matches!(metadata.ftype, FileTypeInfo::Directory),
239 },
240 );
241 }
242 }
243 }
244 actions
245 }
246}
247
248fn walk_dir<
249 E: Into<anyhow::Error>,
250 F: Fn(PathBuf, Metadata) -> Result<(), E> + Sync + Send + 'static,
251>(
252 src: PathBuf,
253 callback: F,
254) -> Result<()> {
255 let (error_tx, error_rx) = channel();
256
257 let src_handle = src.clone();
258 let handle_ent = move |entry: DirEntry| -> Result<()> {
259 let metadata = entry.metadata()?;
260 callback(
261 entry
262 .path()
263 .strip_prefix(src_handle.clone())
264 .context("sub path")?
265 .to_path_buf(),
266 Metadata {
267 mtime: UNIX_EPOCH
268 + Duration::new(
269 metadata.mtime().try_into()?,
270 metadata.mtime_nsec().try_into()?,
271 ),
272 ftype: if metadata.file_type().is_file() {
273 let mode = metadata.permissions().mode();
274 FileTypeInfo::File(mode & 0o100 != 0)
275 } else if metadata.file_type().is_dir() {
276 FileTypeInfo::Directory
277 } else if metadata.file_type().is_symlink() {
278 FileTypeInfo::Symlink
279 } else {
280 bail!("unexpected file type")
281 },
282 },
283 )
284 .map_err(Into::into)?;
285 Ok(())
286 };
287
288 WalkBuilder::new(src)
289 .standard_filters(true)
290 .same_file_system(true)
291 .build_parallel()
292 .run(|| {
293 Box::new(|ent| match ent.map_err(Into::into).and_then(&handle_ent) {
294 Ok(()) => WalkState::Continue,
295 Err(err) => {
296 error_tx.clone().send(err).unwrap();
297 WalkState::Quit
298 }
299 })
300 });
301
302 match error_rx.try_recv() {
303 Ok(err) => Err(err),
304 _ => Ok(()),
305 }
306}
307
308pub async fn sender(
309 src: &Path,
310 from_receiver: impl AsyncRead + Unpin,
311 to_receiver: impl AsyncWrite + Unpin,
312) -> Result<()> {
313 let mut to_receiver = FramedWrite::new(to_receiver, LengthDelimitedCodec::new());
314 let mut from_receiver = FramedRead::new(from_receiver, LengthDelimitedCodec::new());
315
316 let (ent_tx, mut ent_rx) = tokio::sync::mpsc::unbounded_channel();
317 let src_clone = src.to_path_buf();
318 try_join!(
319 async {
320 tokio::task::spawn_blocking(move || {
321 walk_dir(src_clone.clone(), move |path, ent| ent_tx.send((path, ent)))
322 })
323 .await?
324 },
325 async {
326 let src_env = CondaFingerprint::from_env(src).await?;
328 to_receiver
329 .send(bincode::serialize(&src_env)?.into())
330 .await
331 .context("sending src conda fingerprint")?;
332 to_receiver.flush().await?;
333
334 while let Some((path, metadata)) = ent_rx.recv().await {
336 to_receiver
337 .send(bincode::serialize(&FileList::Entry(path, metadata))?.into())
338 .await
339 .context("sending file ent")?;
340 }
341 to_receiver
342 .send(bincode::serialize(&FileList::Done)?.into())
343 .await
344 .context("sending file list end")?;
345 to_receiver.flush().await?;
346
347 anyhow::Ok(())
348 },
349 )?;
350
351 to_receiver.flush().await?;
353 let mut to_receiver = to_receiver.into_inner();
354
355 let hdr: FileSectionHeader =
356 bincode::deserialize(&from_receiver.next().await.context("header")??)?;
357 for _ in 0..hdr.num {
358 let FileHeader { path, symlink } =
359 bincode::deserialize(&from_receiver.next().await.context("signature")??)?;
360 let fpath = src.join(&path);
361 if symlink {
362 let header = FileContentsHeader {
363 path,
364 contents: FileContents::Symlink(fs::read_link(&fpath).await?),
365 };
366 let header = bincode::serialize(&header)?;
367 to_receiver.write_all(&header.len().to_le_bytes()).await?;
368 to_receiver
369 .write_all(&header)
370 .await
371 .context("sending sig header")?;
372 } else {
373 let mut base = fs::File::open(src.join(&path)).await?;
374 let header = FileContentsHeader {
375 path,
376 contents: FileContents::File(base.metadata().await?.len()),
377 };
378 let header = bincode::serialize(&header)?;
379 to_receiver.write_all(&header.len().to_le_bytes()).await?;
380 to_receiver
381 .write_all(&header)
382 .await
383 .context("sending sig header")?;
384 tokio::io::copy(&mut base, &mut to_receiver).await?;
385 }
386 }
387 to_receiver.flush().await?;
388
389 Ok(())
390}
391
392async fn persist(tmp: TempFile, path: &Path) -> Result<(), std::io::Error> {
393 match fs::rename(tmp.file_path(), &path).await {
395 Err(err) if err.kind() == ErrorKind::IsADirectory => {
396 async {
397 fs::remove_dir(&path).await?;
398 fs::rename(tmp.file_path(), &path).await
399 }
400 .await
401 }
402 other => other,
403 }?;
404 tmp.drop_async().await;
405 Ok(())
406}
407
408async fn set_mtime(path: &Path, mtime: SystemTime) -> Result<(), std::io::Error> {
410 let mtime = FileTime::from_system_time(mtime);
411 filetime::set_symlink_file_times(path, mtime.clone(), mtime)?;
412 Ok(())
413}
414
415async fn make_executable(path: &Path) -> Result<(), std::io::Error> {
416 let metadata = fs::metadata(path).await?;
417 let mut permissions = metadata.permissions();
418 let mode = permissions.mode();
419 permissions.set_mode(mode | 0o111);
420 fs::set_permissions(path, permissions).await?;
421 Ok(())
422}
423
/// Heuristically decides whether `buf` holds binary rather than text data.
///
/// A buffer is binary when it contains a NUL byte, or when more than 30% of
/// its bytes are neither printable ASCII (`0x20..=0x7E`) nor one of `\n`,
/// `\r`, `\t`. An empty buffer is treated as text.
fn is_binary(buf: &[u8]) -> bool {
    // A NUL byte is treated as conclusive evidence of binary content.
    if buf.contains(&0) {
        return true;
    }

    let non_print = buf
        .iter()
        .filter(|&&b| !matches!(b, b'\n' | b'\r' | b'\t' | 0x20..=0x7E))
        .count();

    // Integer form of `non_print / len > 0.30`.
    non_print * 100 > buf.len() * 30
}
439
440pub async fn receiver(
441 dst: &Path,
442 from_sender: impl AsyncRead + Unpin,
443 to_sender: impl AsyncWrite + Unpin,
444 replacement_paths: HashMap<PathBuf, PathBuf>,
445) -> Result<HashMap<PathBuf, Action>> {
446 let mut to_sender = FramedWrite::new(to_sender, LengthDelimitedCodec::new());
447 let mut from_sender = FramedRead::new(from_sender, LengthDelimitedCodec::new());
448
449 let dst_env = CondaFingerprint::from_env(dst).await?;
452 let src_env: CondaFingerprint =
453 bincode::deserialize(&from_sender.next().await.context("fingerprint")??)?;
454 let comparator = CondaFingerprint::mtime_comparator(&src_env, &dst_env)?;
455 let ignores = GlobSetBuilder::new()
456 .add(Glob::new("**/*.pyc")?)
457 .add(Glob::new("**/__pycache__/")?)
458 .add(Glob::new("**/__pycache__/**/*")?)
459 .build()?;
460 let actions_builder = Arc::new(ActionsBuilder::new_with(Some(ignores), comparator));
461
462 try_join!(
464 async {
466 let dst = dst.to_path_buf();
467 let actions_builder = actions_builder.clone();
468 tokio::task::spawn_blocking(move || {
469 walk_dir(dst, move |path, ent| {
470 actions_builder
471 .process_dst(path.clone(), ent)
472 .with_context(|| format!("{}", path.display()))
473 })
474 })
475 .await??;
476 anyhow::Ok(())
477 },
478 async {
480 while let FileList::Entry(path, metadata) =
481 bincode::deserialize(&from_sender.next().await.context("file list")??)?
482 {
483 actions_builder
484 .process_src(path.clone(), metadata)
485 .with_context(|| format!("{}", path.display()))?;
486 }
487 anyhow::Ok(())
488 }
489 )?;
490 let actions = Arc::into_inner(actions_builder)
491 .expect("should be done")
492 .into_actions();
493
494 let mut dirs = BTreeSet::new();
496 let mut deletions = BTreeMap::new();
497 let mut files = HashMap::new();
498 for (path, action) in actions.iter() {
499 let path = path.clone();
500 match action {
501 Action::Directory => {
502 dirs.insert(path);
503 }
504 Action::Receive(mtime, recv) => {
505 files.insert(path, (*mtime, recv));
506 }
507 Action::Delete { directory } => {
508 deletions.insert(path, *directory);
509 }
510 }
511 }
512
513 try_join!(
514 async {
515 for (path, is_dir) in deletions.into_iter().rev() {
517 let fpath = dst.join(path);
518 if is_dir {
519 fs::remove_dir(&fpath).await
520 } else {
521 fs::remove_file(&fpath).await
522 }
523 .with_context(|| format!("deleting {}", fpath.display()))?;
524 }
525
526 for path in dirs.into_iter() {
528 let fpath = dst.join(path);
529 match fs::remove_file(&fpath).await {
530 Err(err) if err.kind() == ErrorKind::NotFound => Ok(()),
531 other => other,
532 }
533 .with_context(|| format!("clearing path {}", fpath.display()))?;
534 fs::create_dir(&fpath)
535 .await
536 .with_context(|| format!("creating dir {}", fpath.display()))?;
537 }
538
539 let replacer = {
541 let mut builder = ReplacerBuilder::new();
542
543 let src_prefix = src_env.pack_meta.history.last_prefix()?;
545 let dst_prefix = dst_env.pack_meta.history.last_prefix()?;
546 if src_prefix != dst_prefix {
547 builder.add(src_prefix, dst_prefix)?;
548 }
549
550 for (src, dst) in replacement_paths.iter() {
552 if src != dst {
553 builder.add(src, dst)?;
554 }
555 }
556
557 builder.build_if_non_empty()?
558 };
559
560 let mut from_sender = from_sender.into_inner();
562 for _ in 0..files.len() {
563 let len = from_sender.read_u64_le().await?;
565 let mut buf = vec![0u8; len as usize];
566 from_sender.read_exact(&mut buf).await?;
567 let FileContentsHeader { path, contents } =
568 bincode::deserialize(&buf).context("delta header")?;
569 let fpath = dst.join(&path);
570 match (contents, files.get(&path).context("missing file")?) {
571 (FileContents::File(len), (mtime, Receive::File { executable })) => {
573 let mut dst_tmp =
574 TempFile::new_in(fpath.parent().context("parent")?).await?;
575 let mut reader = (&mut from_sender).take(len);
576
577 if let Some(ref replacer) = replacer {
579 let mut buf = vec![0; 4096];
581 let len = reader.read(&mut buf[..]).await?;
582 buf.truncate(len);
583 if is_binary(&buf) {
584 dst_tmp.write_all(&buf).await?;
585 tokio::io::copy(&mut reader, &mut dst_tmp).await?;
586
587 let mut mmap = unsafe { MmapMut::map_mut(&*dst_tmp)? };
590 replacer.replace_inplace_padded(&mut mmap)?;
591 } else {
592 reader.read_to_end(&mut buf).await?;
593 replacer.replace_inplace(&mut buf);
594 dst_tmp.write_all(&buf).await?;
595 }
596 } else {
597 tokio::io::copy(&mut reader, &mut dst_tmp).await?;
598 }
599
600 if *executable {
601 make_executable(dst_tmp.file_path()).await?;
602 }
603 persist(dst_tmp, &fpath).await?;
604 set_mtime(&fpath, *mtime).await?;
605 }
606 (FileContents::Symlink(mut target), (mtime, Receive::Symlink)) => {
607 if let Some(ref replacer) = replacer {
608 target = replacer.replace_path(target);
609 }
610 fs::symlink(target, &fpath).await?;
611 set_mtime(&fpath, *mtime).await?;
612 }
613 _ => bail!("unexpected file contents"),
614 }
615 }
616 anyhow::Ok(())
617 },
618 async {
619 to_sender
620 .send(bincode::serialize(&FileSectionHeader { num: files.len() })?.into())
621 .await
622 .context("sending sig section header")?;
623 for (path, (_, recv)) in files.iter() {
624 to_sender
625 .send(
626 bincode::serialize(&FileHeader {
627 path: path.clone(),
628 symlink: matches!(recv, Receive::Symlink),
629 })?
630 .into(),
631 )
632 .await
633 .context("sending sig header")?;
634 }
635 to_sender.flush().await?;
636 anyhow::Ok(())
637 },
638 )?;
639
640 Ok(actions)
641}
642
643pub async fn sync(src: &Path, dst: &Path) -> Result<HashMap<PathBuf, Action>> {
644 let (recv, send) = tokio::io::duplex(5 * 1024 * 1024);
646 let (from_receiver, to_receiver) = tokio::io::split(recv);
647 let (from_sender, to_sender) = tokio::io::split(send);
648 let (actions, ()) = try_join!(
649 receiver(dst, from_sender, to_sender, HashMap::new()),
650 sender(src, from_receiver, to_receiver),
651 )?;
652 Ok(actions)
653}
654
655#[cfg(test)]
656mod tests {
657 use std::collections::HashMap;
658 use std::os::unix::fs::PermissionsExt;
659 use std::path::Path;
660 use std::path::PathBuf;
661 use std::time::Duration;
662 use std::time::SystemTime;
663
664 use anyhow::Result;
665 use rattler_conda_types::package::FileMode;
666 use tempfile::TempDir;
667 use tokio::fs;
668
669 use super::Action;
670 use super::make_executable;
671 use super::set_mtime;
672 use super::sync;
673 use crate::pack_meta_history::History;
674 use crate::pack_meta_history::HistoryRecord;
675 use crate::pack_meta_history::Offset;
676 use crate::pack_meta_history::OffsetRecord;
677 use crate::pack_meta_history::Offsets;
678 use crate::sync::Receive;
679
680 async fn setup_conda_env<P: AsRef<Path>>(
682 dirpath: P,
683 mtime: SystemTime,
684 prefix: Option<&str>,
685 ) -> Result<P> {
686 let env_path = dirpath.as_ref();
687
688 fs::create_dir_all(&env_path).await?;
690 fs::create_dir(&env_path.join("conda-meta")).await?;
691 fs::create_dir(&env_path.join("pack-meta")).await?;
692
693 add_file(
695 env_path,
696 "conda-meta/history",
697 "==> 2023-01-01 00:00:00 <==\npackage install actions\n",
698 mtime,
699 false,
700 )
701 .await?;
702
703 add_file(
705 env_path,
706 "conda-meta/package-1.0-0.json",
707 r#"{
708 "name": "package",
709 "version": "1.0",
710 "build": "0",
711 "build_number": 0,
712 "paths_data": {
713 "paths": [
714 {
715 "path": "bin/test-file",
716 "path_type": "hardlink",
717 "size_in_bytes": 10,
718 "mode": "text"
719 }
720 ]
721 },
722 "repodata_record": {
723 "package_record": {
724 "timestamp": 1672531200
725 }
726 }
727 }"#,
728 mtime,
729 false,
730 )
731 .await?;
732
733 let offsets = Offsets {
735 entries: vec![OffsetRecord {
736 path: PathBuf::from("bin/test-file"),
737 mode: FileMode::Text,
738 offsets: vec![Offset {
739 start: 0,
740 len: 10,
741 contents: None,
742 }],
743 }],
744 };
745 add_file(
746 env_path,
747 "pack-meta/offsets.jsonl",
748 &offsets.to_str()?,
749 mtime,
750 false,
751 )
752 .await?;
753
754 fs::create_dir(env_path.join("bin")).await?;
756 add_file(env_path, "bin/test-file", "test data\n", mtime, false).await?;
757
758 let window = (
760 mtime + Duration::from_secs(20),
761 mtime + Duration::from_secs(25),
762 );
763 fs::create_dir(env_path.join("lib")).await?;
764 add_file(
765 env_path,
766 "lib/libfoo.so",
767 "libfoo.so contents\n",
768 window.0 + Duration::from_secs(5),
769 false,
770 )
771 .await?;
772
773 let prefix_path = PathBuf::from(prefix.unwrap_or("base"));
775
776 let history = History {
778 entries: vec![
779 HistoryRecord {
780 timestamp: mtime.duration_since(SystemTime::UNIX_EPOCH)?.as_secs(),
781 prefix: PathBuf::from("/conda/prefix"),
782 finished: true,
783 },
784 HistoryRecord {
785 timestamp: window.0.duration_since(SystemTime::UNIX_EPOCH)?.as_secs(),
786 prefix: prefix_path.clone(),
787 finished: false,
788 },
789 HistoryRecord {
790 timestamp: window.1.duration_since(SystemTime::UNIX_EPOCH)?.as_secs(),
791 prefix: prefix_path,
792 finished: true,
793 },
794 ],
795 };
796 add_file(
797 env_path,
798 "pack-meta/history.jsonl",
799 &history.to_str()?,
800 mtime,
801 false,
802 )
803 .await?;
804
805 Ok(dirpath)
806 }
807
808 async fn modify_file(
810 env_path: &Path,
811 file_path: &str,
812 content: &str,
813 mtime: SystemTime,
814 ) -> Result<()> {
815 let full_path = env_path.join(file_path);
816 fs::write(&full_path, content).await?;
817
818 set_mtime(&full_path, mtime).await?;
820
821 Ok(())
822 }
823
824 async fn add_file(
826 env_path: &Path,
827 file_path: &str,
828 content: &str,
829 mtime: SystemTime,
830 executable: bool,
831 ) -> Result<()> {
832 let full_path = env_path.join(file_path);
833 fs::write(&full_path, content).await?;
834
835 if executable {
836 make_executable(&full_path).await?;
837 }
838
839 set_mtime(&full_path, mtime).await?;
841
842 Ok(())
843 }
844
845 async fn verify_file_content(path1: &Path, path2: &Path) -> Result<bool> {
847 let content1 = fs::read_to_string(path1).await?;
848 let content2 = fs::read_to_string(path2).await?;
849 Ok(content1 == content2)
850 }
851
852 async fn verify_file_permissions(path: &Path, expected_executable: bool) -> Result<bool> {
854 let metadata = fs::metadata(path).await?;
855 let mode = metadata.permissions().mode();
856 let is_executable = mode & 0o111 != 0;
857 Ok(is_executable == expected_executable)
858 }
859
860 async fn verify_symlink_target(path: &Path, expected_target: &Path) -> Result<bool> {
862 let target = fs::read_link(path).await?;
863 Ok(target == expected_target)
864 }
865
866 #[tokio::test]
867 async fn test_sync_modified_file() -> Result<()> {
868 let base_time = SystemTime::UNIX_EPOCH + Duration::from_secs(1672531200); let src_env = setup_conda_env(TempDir::new()?, base_time, None).await?;
873 let dst_env = setup_conda_env(TempDir::new()?, base_time, None).await?;
874
875 let modified_content = "modified test data\n";
877 let newer_time = base_time + Duration::from_hours(1); modify_file(
879 src_env.path(),
880 "bin/test-file",
881 modified_content,
882 newer_time,
883 )
884 .await?;
885
886 let actions = sync(src_env.path(), dst_env.path()).await?;
888
889 let expected_actions = HashMap::from([(
891 PathBuf::from("bin/test-file"),
892 Action::Receive(newer_time, Receive::File { executable: false }),
893 )]);
894
895 assert_eq!(actions, expected_actions);
897
898 assert!(
900 verify_file_content(
901 &src_env.path().join("bin/test-file"),
902 &dst_env.path().join("bin/test-file")
903 )
904 .await?
905 );
906
907 Ok(())
908 }
909
910 #[tokio::test]
911 async fn test_sync_new_file() -> Result<()> {
912 let base_time = SystemTime::UNIX_EPOCH + Duration::from_secs(1672531200); let src_env = setup_conda_env(TempDir::new()?, base_time, None).await?;
917 let dst_env = setup_conda_env(TempDir::new()?, base_time, None).await?;
918
919 let new_file_content = "new file content\n";
921 let newer_time = base_time + Duration::from_hours(1); add_file(
923 src_env.path(),
924 "lib/new-file.txt",
925 new_file_content,
926 newer_time,
927 false,
928 )
929 .await?;
930
931 let actions = sync(src_env.path(), dst_env.path()).await?;
933
934 let expected_actions = HashMap::from([
936 (
938 PathBuf::from("lib/new-file.txt"),
939 Action::Receive(newer_time, Receive::File { executable: false }),
940 ),
941 ]);
942
943 assert_eq!(actions, expected_actions);
945
946 assert!(
948 verify_file_content(
949 &src_env.path().join("lib/new-file.txt"),
950 &dst_env.path().join("lib/new-file.txt")
951 )
952 .await?
953 );
954
955 Ok(())
956 }
957
958 #[tokio::test]
959 async fn test_sync_directory_creation() -> Result<()> {
960 let base_time = SystemTime::UNIX_EPOCH + Duration::from_secs(1672531200); let src_env = setup_conda_env(TempDir::new()?, base_time, None).await?;
965 let dst_env = setup_conda_env(TempDir::new()?, base_time, None).await?;
966
967 let newer_time = base_time + Duration::from_hours(1); fs::create_dir(src_env.path().join("new_dir")).await?;
970 add_file(
971 src_env.path(),
972 "new_dir/test.txt",
973 "test content",
974 newer_time,
975 false,
976 )
977 .await?;
978 set_mtime(&src_env.path().join("new_dir"), newer_time).await?;
979
980 let actions = sync(src_env.path(), dst_env.path()).await?;
982
983 let expected_actions = HashMap::from([
985 (PathBuf::from("new_dir"), Action::Directory),
986 (
987 PathBuf::from("new_dir/test.txt"),
988 Action::Receive(newer_time, Receive::File { executable: false }),
989 ),
990 ]);
991
992 assert_eq!(actions, expected_actions);
994
995 assert!(dst_env.path().join("new_dir").exists());
997
998 assert!(
1000 verify_file_content(
1001 &src_env.path().join("new_dir/test.txt"),
1002 &dst_env.path().join("new_dir/test.txt")
1003 )
1004 .await?
1005 );
1006
1007 Ok(())
1008 }
1009
1010 #[tokio::test]
1011 async fn test_sync_symlink() -> Result<()> {
1012 let base_time = SystemTime::UNIX_EPOCH + Duration::from_secs(1672531200); let src_env = setup_conda_env(TempDir::new()?, base_time, None).await?;
1017 let dst_env = setup_conda_env(TempDir::new()?, base_time, None).await?;
1018
1019 fs::symlink("bin/test-file", src_env.path().join("link-to-test")).await?;
1021
1022 let newer_time = base_time + Duration::from_hours(1); set_mtime(&src_env.path().join("link-to-test"), newer_time).await?;
1025
1026 let actions = sync(src_env.path(), dst_env.path()).await?;
1028
1029 let expected_actions = HashMap::from([(
1031 PathBuf::from("link-to-test"),
1032 Action::Receive(newer_time, Receive::Symlink),
1033 )]);
1034
1035 assert_eq!(actions, expected_actions);
1037
1038 assert!(dst_env.path().join("link-to-test").exists());
1040
1041 assert!(
1043 verify_symlink_target(
1044 &dst_env.path().join("link-to-test"),
1045 &PathBuf::from("bin/test-file")
1046 )
1047 .await?
1048 );
1049
1050 Ok(())
1051 }
1052
1053 #[tokio::test]
1054 async fn test_sync_file_deletion() -> Result<()> {
1055 let base_time = SystemTime::UNIX_EPOCH + Duration::from_secs(1672531200); let src_env = setup_conda_env(TempDir::new()?, base_time, None).await?;
1060 let dst_env = setup_conda_env(TempDir::new()?, base_time, None).await?;
1061
1062 let newer_time = base_time + Duration::from_hours(1); add_file(
1065 dst_env.path(),
1066 "extra-file.txt",
1067 "should be deleted",
1068 newer_time,
1069 false,
1070 )
1071 .await?;
1072
1073 let actions = sync(src_env.path(), dst_env.path()).await?;
1075
1076 let expected_actions = HashMap::from([(
1078 PathBuf::from("extra-file.txt"),
1079 Action::Delete { directory: false },
1080 )]);
1081
1082 assert_eq!(actions, expected_actions);
1084
1085 assert!(!dst_env.path().join("extra-file.txt").exists());
1087
1088 Ok(())
1089 }
1090
1091 #[tokio::test]
1092 async fn test_sync_ignores_pyc_files() -> Result<()> {
1093 let base_time = SystemTime::UNIX_EPOCH + Duration::from_secs(1672531200); let src_env = setup_conda_env(TempDir::new()?, base_time, None).await?;
1098 let dst_env = setup_conda_env(TempDir::new()?, base_time, None).await?;
1099
1100 let newer_time = base_time + Duration::from_hours(1); add_file(
1103 src_env.path(),
1104 "lib/test.pyc",
1105 "compiled python",
1106 newer_time,
1107 false,
1108 )
1109 .await?;
1110
1111 fs::create_dir(dst_env.path().join("lib/__pycache__")).await?;
1113 add_file(
1114 dst_env.path(),
1115 "lib/__pycache__/cached.pyc",
1116 "cached python",
1117 newer_time,
1118 false,
1119 )
1120 .await?;
1121
1122 let actions = sync(src_env.path(), dst_env.path()).await?;
1124
1125 let expected_actions = HashMap::from([
1127 (
1128 PathBuf::from("lib/__pycache__"),
1129 Action::Delete { directory: true },
1130 ),
1131 (
1132 PathBuf::from("lib/__pycache__/cached.pyc"),
1133 Action::Delete { directory: false },
1134 ),
1135 ]);
1136
1137 assert_eq!(actions, expected_actions);
1139
1140 assert!(!dst_env.path().join("lib/test.pyc").exists());
1142 assert!(!dst_env.path().join("lib/__pycache__").exists());
1143 assert!(!dst_env.path().join("lib/__pycache__/cached.pyc").exists());
1144
1145 Ok(())
1146 }
1147
1148 #[tokio::test]
1149 async fn test_sync_executable_permissions() -> Result<()> {
1150 let base_time = SystemTime::UNIX_EPOCH + Duration::from_secs(1672531200); let src_env = setup_conda_env(TempDir::new()?, base_time, None).await?;
1155 let dst_env = setup_conda_env(TempDir::new()?, base_time, None).await?;
1156
1157 let newer_time = base_time + Duration::from_hours(1); add_file(
1160 src_env.path(),
1161 "bin/executable",
1162 "#!/bin/sh\necho hello",
1163 newer_time,
1164 true,
1165 )
1166 .await?;
1167
1168 let actions = sync(src_env.path(), dst_env.path()).await?;
1170
1171 let expected_actions = HashMap::from([(
1173 PathBuf::from("bin/executable"),
1174 Action::Receive(newer_time, Receive::File { executable: true }),
1175 )]);
1176
1177 assert_eq!(actions, expected_actions);
1179
1180 assert!(dst_env.path().join("bin/executable").exists());
1182
1183 assert!(
1185 verify_file_content(
1186 &src_env.path().join("bin/executable"),
1187 &dst_env.path().join("bin/executable")
1188 )
1189 .await?
1190 );
1191
1192 assert!(verify_file_permissions(&dst_env.path().join("bin/executable"), true).await?);
1194
1195 Ok(())
1196 }
1197
1198 #[tokio::test]
1199 async fn test_sync_text_file_prefix_replacement() -> Result<()> {
1200 let base_time = SystemTime::UNIX_EPOCH + Duration::from_secs(1672531200); let src_prefix = "/opt/conda/src";
1205 let dst_prefix = "/opt/conda/dst";
1206 let src_env = setup_conda_env(TempDir::new()?, base_time, Some(src_prefix)).await?;
1207 let dst_env = setup_conda_env(TempDir::new()?, base_time, Some(dst_prefix)).await?;
1208
1209 let newer_time = base_time + Duration::from_hours(1);
1211 let text_content = format!(
1212 "#!/bin/bash\nexport PATH={}/bin:$PATH\necho 'Using prefix: {}'\n",
1213 src_prefix, src_prefix
1214 );
1215 add_file(
1216 src_env.path(),
1217 "bin/script.sh",
1218 &text_content,
1219 newer_time,
1220 true,
1221 )
1222 .await?;
1223
1224 add_file(src_env.path(), "bin/script2.sh", "", newer_time, true).await?;
1226
1227 let actions = sync(src_env.path(), dst_env.path()).await?;
1229
1230 let expected_actions = HashMap::from([
1232 (
1233 PathBuf::from("bin/script.sh"),
1234 Action::Receive(newer_time, Receive::File { executable: true }),
1235 ),
1236 (
1237 PathBuf::from("bin/script2.sh"),
1238 Action::Receive(newer_time, Receive::File { executable: true }),
1239 ),
1240 ]);
1241 assert_eq!(actions, expected_actions);
1242
1243 let dst_content = fs::read_to_string(dst_env.path().join("bin/script.sh")).await?;
1245 let expected_content = format!(
1246 "#!/bin/bash\nexport PATH={}/bin:$PATH\necho 'Using prefix: {}'\n",
1247 dst_prefix, dst_prefix
1248 );
1249 assert_eq!(dst_content, expected_content);
1250
1251 Ok(())
1252 }
1253
1254 #[tokio::test]
1255 async fn test_sync_binary_file_prefix_replacement() -> Result<()> {
1256 let base_time = SystemTime::UNIX_EPOCH + Duration::from_secs(1672531200);
1258
1259 let src_prefix = "/opt/conda/src";
1261 let dst_prefix = "/opt/conda/dst";
1262 let src_env = setup_conda_env(TempDir::new()?, base_time, Some(src_prefix)).await?;
1263 let dst_env = setup_conda_env(TempDir::new()?, base_time, Some(dst_prefix)).await?;
1264
1265 let newer_time = base_time + Duration::from_hours(1);
1267 let mut binary_content = Vec::new();
1268 binary_content.extend_from_slice(b"\x7fELF"); binary_content.extend_from_slice(&[0u8; 10]); binary_content.extend_from_slice(src_prefix.as_bytes());
1271 binary_content.extend_from_slice(&[0u8; 20]); binary_content.extend_from_slice(b"end");
1273
1274 fs::write(src_env.path().join("lib/binary"), &binary_content).await?;
1275 set_mtime(&src_env.path().join("lib/binary"), newer_time).await?;
1276
1277 let actions = sync(src_env.path(), dst_env.path()).await?;
1279
1280 let expected_actions = HashMap::from([(
1282 PathBuf::from("lib/binary"),
1283 Action::Receive(newer_time, Receive::File { executable: false }),
1284 )]);
1285 assert_eq!(actions, expected_actions);
1286
1287 let dst_content = fs::read(dst_env.path().join("lib/binary")).await?;
1289
1290 assert_eq!(dst_content.len(), binary_content.len());
1292
1293 let dst_content_str = String::from_utf8_lossy(&dst_content);
1295 assert!(dst_content_str.contains(dst_prefix));
1296 assert!(!dst_content_str.contains(src_prefix));
1297
1298 assert!(dst_content.starts_with(b"\x7fELF"));
1300 assert!(dst_content.ends_with(b"end"));
1301
1302 Ok(())
1303 }
1304
1305 #[tokio::test]
1306 async fn test_sync_symlink_prefix_replacement() -> Result<()> {
1307 let base_time = SystemTime::UNIX_EPOCH + Duration::from_secs(1672531200);
1309
1310 let src_prefix = "/opt/conda/src";
1312 let dst_prefix = "/opt/conda/dst";
1313 let src_env = setup_conda_env(TempDir::new()?, base_time, Some(src_prefix)).await?;
1314 let dst_env = setup_conda_env(TempDir::new()?, base_time, Some(dst_prefix)).await?;
1315
1316 let newer_time = base_time + Duration::from_hours(1);
1318 let symlink_target = format!("{}/lib/target-file", src_prefix);
1319 fs::symlink(&symlink_target, src_env.path().join("bin/link-to-target")).await?;
1320 set_mtime(&src_env.path().join("bin/link-to-target"), newer_time).await?;
1321
1322 let actions = sync(src_env.path(), dst_env.path()).await?;
1324
1325 let expected_actions = HashMap::from([(
1327 PathBuf::from("bin/link-to-target"),
1328 Action::Receive(newer_time, Receive::Symlink),
1329 )]);
1330 assert_eq!(actions, expected_actions);
1331
1332 let dst_target = fs::read_link(dst_env.path().join("bin/link-to-target")).await?;
1334 let expected_target = PathBuf::from(format!("{}/lib/target-file", dst_prefix));
1335 assert_eq!(dst_target, expected_target);
1336
1337 Ok(())
1338 }
1339
1340 #[tokio::test]
1341 async fn test_sync_symlink_no_prefix_replacement() -> Result<()> {
1342 let base_time = SystemTime::UNIX_EPOCH + Duration::from_secs(1672531200);
1344
1345 let src_prefix = "/opt/conda/src";
1347 let dst_prefix = "/opt/conda/dst";
1348 let src_env = setup_conda_env(TempDir::new()?, base_time, Some(src_prefix)).await?;
1349 let dst_env = setup_conda_env(TempDir::new()?, base_time, Some(dst_prefix)).await?;
1350
1351 let newer_time = base_time + Duration::from_hours(1);
1353 let symlink_target = "relative/path/target";
1354 fs::symlink(&symlink_target, src_env.path().join("bin/relative-link")).await?;
1355 set_mtime(&src_env.path().join("bin/relative-link"), newer_time).await?;
1356
1357 let actions = sync(src_env.path(), dst_env.path()).await?;
1359
1360 let expected_actions = HashMap::from([(
1362 PathBuf::from("bin/relative-link"),
1363 Action::Receive(newer_time, Receive::Symlink),
1364 )]);
1365 assert_eq!(actions, expected_actions);
1366
1367 let dst_target = fs::read_link(dst_env.path().join("bin/relative-link")).await?;
1369 let expected_target = PathBuf::from(symlink_target);
1370 assert_eq!(dst_target, expected_target);
1371
1372 Ok(())
1373 }
1374
1375 #[tokio::test]
1376 async fn test_sync_binary_file_prefix_replacement_fails_when_dst_longer() -> Result<()> {
1377 let base_time = SystemTime::UNIX_EPOCH + Duration::from_secs(1672531200);
1379
1380 let src_prefix = "/opt/src"; let dst_prefix = "/opt/very/long/destination/prefix"; let src_env = setup_conda_env(TempDir::new()?, base_time, Some(src_prefix)).await?;
1384 let dst_env = setup_conda_env(TempDir::new()?, base_time, Some(dst_prefix)).await?;
1385
1386 let newer_time = base_time + Duration::from_hours(1);
1388 let mut binary_content = Vec::new();
1389 binary_content.extend_from_slice(b"\x7fELF"); binary_content.extend_from_slice(&[0u8; 10]); binary_content.extend_from_slice(src_prefix.as_bytes());
1392 binary_content.extend_from_slice(&[0u8; 20]); binary_content.extend_from_slice(b"end");
1394
1395 fs::write(src_env.path().join("lib/binary"), &binary_content).await?;
1396 set_mtime(&src_env.path().join("lib/binary"), newer_time).await?;
1397
1398 let result = sync(src_env.path(), dst_env.path()).await;
1400
1401 assert!(result.is_err());
1403 let error_msg = result.unwrap_err().to_string();
1404 assert!(error_msg.contains("Input is longer than target length"));
1405
1406 Ok(())
1407 }
1408}