1use std::cmp::Ordering;
10use std::collections::BTreeMap;
11use std::collections::BTreeSet;
12use std::collections::HashMap;
13use std::io::ErrorKind;
14use std::os::unix::fs::MetadataExt;
15use std::os::unix::fs::PermissionsExt;
16use std::path::Path;
17use std::path::PathBuf;
18use std::sync::Arc;
19use std::sync::mpsc::channel;
20use std::time::Duration;
21use std::time::SystemTime;
22use std::time::UNIX_EPOCH;
23
24use anyhow::Context;
25use anyhow::Result;
26use anyhow::bail;
27use anyhow::ensure;
28use async_tempfile::TempFile;
29use dashmap::DashMap;
30use dashmap::mapref::entry::Entry;
31use filetime::FileTime;
32use futures::SinkExt;
33use futures::StreamExt;
34use futures::try_join;
35use globset::Glob;
36use globset::GlobSet;
37use globset::GlobSetBuilder;
38use ignore::DirEntry;
39use ignore::WalkBuilder;
40use ignore::WalkState;
41use itertools::Itertools;
42use memmap2::MmapMut;
43use serde::Deserialize;
44use serde::Serialize;
45use tokio::fs;
46use tokio::io::AsyncRead;
47use tokio::io::AsyncReadExt;
48use tokio::io::AsyncWrite;
49use tokio::io::AsyncWriteExt;
50use tokio_util::codec::FramedRead;
51use tokio_util::codec::FramedWrite;
52use tokio_util::codec::LengthDelimitedCodec;
53
54use crate::diff::CondaFingerprint;
55use crate::replace::ReplacerBuilder;
56
/// Which side of the sync a filesystem entry was reported by.
///
/// `Debug` is derived so that failed `ensure!` comparisons can print the
/// offending values; `Clone`/`Copy` are free for a field-less enum.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum Origin {
    /// The entry came from the source tree's file list.
    Src,
    /// The entry came from walking the destination tree.
    Dst,
}
62
/// Kind of filesystem entry recorded for each walked path.
///
/// Serialized as part of the file list, so the variant order is part of the
/// wire format.
#[derive(Debug, Eq, PartialEq, Serialize, Deserialize)]
enum FileTypeInfo {
    /// A directory.
    Directory,
    /// A regular file; the flag is `true` when the owner-execute bit (`0o100`) is set.
    File(bool),
    /// A symbolic link (the link itself, not its target).
    Symlink,
}
69
70impl FileTypeInfo {
71 fn same(&self, other: &FileTypeInfo) -> bool {
72 match (self, other) {
73 (FileTypeInfo::Directory, FileTypeInfo::Directory) => true,
74 (FileTypeInfo::File(_), FileTypeInfo::File(_)) => true,
75 (FileTypeInfo::Symlink, FileTypeInfo::Symlink) => true,
76 _ => false,
77 }
78 }
79}
80
/// The per-entry information exchanged between sender and receiver to decide
/// what has to be transferred.
#[derive(Debug, Eq, PartialEq, Serialize, Deserialize)]
struct Metadata {
    /// Modification time of the entry, built from its seconds and nanoseconds.
    mtime: SystemTime,
    /// Kind of entry (directory, regular file, symlink).
    ftype: FileTypeInfo,
}
86
/// What kind of content the receiver obtains from the sender for a path.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum Receive {
    /// A regular file; `executable` tells the receiver whether to set execute bits.
    File { executable: bool },
    /// A symbolic link, recreated from its transmitted target.
    Symlink,
}
92
/// A single change applied to the destination tree during a sync.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum Action {
    /// Remove a path that exists only in the destination; `directory` is
    /// `true` when the path is a directory.
    Delete { directory: bool },
    /// Create a directory at the path.
    Directory,
    /// Fetch the path from the sender and stamp it with the given
    /// modification time.
    Receive(SystemTime, Receive),
}
102
/// First message the receiver sends back: announces how many [`FileHeader`]
/// requests follow.
#[derive(Debug, Serialize, Deserialize)]
struct FileSectionHeader {
    /// Number of file requests that follow this header.
    num: usize,
}
107
/// A request from the receiver for the contents of one path.
#[derive(Debug, Serialize, Deserialize)]
struct FileHeader {
    /// Requested path, relative to the root of the synced tree.
    path: PathBuf,
    /// `true` when the receiver expects a symlink target rather than file bytes.
    symlink: bool,
}
113
/// Describes the payload the sender provides for a requested path.
#[derive(Debug, Serialize, Deserialize)]
enum FileContents {
    /// The target the symlink points at; no further bytes follow.
    Symlink(PathBuf),
    /// Length in bytes of the raw file content that follows the header.
    File(u64),
}
119
/// Header the sender writes in front of each transferred path.
#[derive(Debug, Serialize, Deserialize)]
struct FileContentsHeader {
    /// Path this payload belongs to, relative to the root of the synced tree.
    path: PathBuf,
    /// Symlink target, or the size of the file body that follows.
    contents: FileContents,
}
125
/// One frame of the sender's file listing.
#[derive(Debug, Serialize, Deserialize)]
enum FileList {
    /// A path in the source tree together with its metadata.
    Entry(PathBuf, Metadata),
    /// Marks the end of the listing; no more entries follow.
    Done,
}
131
/// Accumulates entries reported by the source and destination walks and turns
/// them into the set of [`Action`]s needed to make the destination match.
struct ActionsBuilder {
    /// Optional glob patterns for paths that are never transferred.
    ignores: Option<GlobSet>,
    /// Paths seen on exactly one side so far, with the side that reported them.
    state: DashMap<PathBuf, (Origin, Metadata)>,
    /// Actions already decided for paths seen on both sides.
    actions: DashMap<PathBuf, Action>,
    /// Compares a source mtime with a destination mtime.
    mtime_comparator: Box<dyn Fn(&SystemTime, &SystemTime) -> Ordering + Send + Sync + 'static>,
}
138
139impl ActionsBuilder {
140 fn new_with(
141 ignores: Option<GlobSet>,
142 mtime_comparator: Box<dyn Fn(&SystemTime, &SystemTime) -> Ordering + Send + Sync + 'static>,
143 ) -> Self {
144 Self {
145 ignores,
146 state: DashMap::new(),
147 actions: DashMap::new(),
148 mtime_comparator,
149 }
150 }
151
152 fn process(&self, origin: Origin, path: PathBuf, metadata: Metadata) -> Result<()> {
153 match self.state.entry(path) {
154 Entry::Occupied(val) => {
155 let (path, (existing_origin, existing_metadata)) = val.remove_entry();
156 if let Some(ignores) = &self.ignores
157 && ignores.is_match(path.as_path())
158 {
159 return Ok(());
160 }
161 ensure!(existing_origin != origin);
162 let (src, dst) = match origin {
163 Origin::Dst => (existing_metadata, metadata),
164 Origin::Src => (metadata, existing_metadata),
165 };
166 if src.ftype == FileTypeInfo::Directory && dst.ftype == FileTypeInfo::Directory {
167 } else {
169 match (self.mtime_comparator)(&src.mtime, &dst.mtime) {
170 Ordering::Equal => {
171 ensure!(
172 src.ftype.same(&dst.ftype),
173 "{}: {:?} != {:?}",
174 path.display(),
175 dst,
176 src
177 );
178 }
179 Ordering::Greater | Ordering::Less => {
180 self.actions.insert(
181 path,
182 match src.ftype {
183 FileTypeInfo::File(executable) => {
184 Action::Receive(src.mtime, Receive::File { executable })
185 }
186 FileTypeInfo::Symlink => {
187 Action::Receive(src.mtime, Receive::Symlink)
188 }
189 FileTypeInfo::Directory => Action::Directory,
190 },
191 );
192 }
193 }
194 }
195 }
196 Entry::Vacant(entry) => {
197 entry.insert((origin, metadata));
198 }
199 }
200 Ok(())
201 }
202
203 fn process_src(&self, path: PathBuf, metadata: Metadata) -> Result<()> {
204 self.process(Origin::Src, path, metadata)
205 }
206
207 fn process_dst(&self, path: PathBuf, metadata: Metadata) -> Result<()> {
208 self.process(Origin::Dst, path, metadata)
209 }
210
211 fn into_actions(self) -> HashMap<PathBuf, Action> {
212 let mut actions: HashMap<_, _> = self.actions.into_iter().collect();
213 for (path, (origin, metadata)) in self.state.into_iter() {
214 match origin {
215 Origin::Src => {
216 if let Some(ignores) = &self.ignores
217 && ignores.is_match(path.as_path())
218 {
219 continue;
220 }
221 actions.insert(
222 path,
223 match metadata.ftype {
224 FileTypeInfo::File(executable) => {
225 Action::Receive(metadata.mtime, Receive::File { executable })
226 }
227 FileTypeInfo::Directory => Action::Directory,
228 FileTypeInfo::Symlink => {
229 Action::Receive(metadata.mtime, Receive::Symlink)
230 }
231 },
232 );
233 }
234 Origin::Dst => {
235 actions.insert(
236 path,
237 Action::Delete {
238 directory: matches!(metadata.ftype, FileTypeInfo::Directory),
239 },
240 );
241 }
242 }
243 }
244 actions
245 }
246}
247
248fn walk_dir<
249 E: Into<anyhow::Error>,
250 F: Fn(PathBuf, Metadata) -> Result<(), E> + Sync + Send + 'static,
251>(
252 src: PathBuf,
253 callback: F,
254) -> Result<()> {
255 let (error_tx, error_rx) = channel();
256
257 let src_handle = src.clone();
258 let handle_ent = move |entry: DirEntry| -> Result<()> {
259 let metadata = entry.metadata()?;
260 callback(
261 entry
262 .path()
263 .strip_prefix(src_handle.clone())
264 .context("sub path")?
265 .to_path_buf(),
266 Metadata {
267 mtime: UNIX_EPOCH
268 + Duration::new(
269 metadata.mtime().try_into()?,
270 metadata.mtime_nsec().try_into()?,
271 ),
272 ftype: if metadata.file_type().is_file() {
273 let mode = metadata.permissions().mode();
274 FileTypeInfo::File(mode & 0o100 != 0)
275 } else if metadata.file_type().is_dir() {
276 FileTypeInfo::Directory
277 } else if metadata.file_type().is_symlink() {
278 FileTypeInfo::Symlink
279 } else {
280 bail!("unexpected file type")
281 },
282 },
283 )
284 .map_err(Into::into)?;
285 Ok(())
286 };
287
288 WalkBuilder::new(src)
289 .standard_filters(true)
290 .same_file_system(true)
291 .build_parallel()
292 .run(|| {
293 Box::new(|ent| match ent.map_err(Into::into).and_then(&handle_ent) {
294 Ok(()) => WalkState::Continue,
295 Err(err) => {
296 error_tx.clone().send(err).unwrap();
297 WalkState::Quit
298 }
299 })
300 });
301
302 match error_rx.try_recv() {
303 Ok(err) => Err(err),
304 _ => Ok(()),
305 }
306}
307
308pub async fn sender(
309 src: &Path,
310 from_receiver: impl AsyncRead + Unpin,
311 to_receiver: impl AsyncWrite + Unpin,
312) -> Result<()> {
313 let mut to_receiver = FramedWrite::new(to_receiver, LengthDelimitedCodec::new());
314 let mut from_receiver = FramedRead::new(from_receiver, LengthDelimitedCodec::new());
315
316 let (ent_tx, mut ent_rx) = tokio::sync::mpsc::unbounded_channel();
317 let src_clone = src.to_path_buf();
318 try_join!(
319 async {
320 tokio::task::spawn_blocking(move || {
321 walk_dir(src_clone.clone(), move |path, ent| ent_tx.send((path, ent)))
322 })
323 .await?
324 },
325 async {
326 let src_env = CondaFingerprint::from_env(src).await?;
328 to_receiver
329 .send(bincode::serde::encode_to_vec(&src_env, bincode::config::legacy())?.into())
330 .await
331 .context("sending src conda fingerprint")?;
332 to_receiver.flush().await?;
333
334 while let Some((path, metadata)) = ent_rx.recv().await {
336 to_receiver
337 .send(
338 bincode::serde::encode_to_vec(
339 FileList::Entry(path, metadata),
340 bincode::config::legacy(),
341 )?
342 .into(),
343 )
344 .await
345 .context("sending file ent")?;
346 }
347 to_receiver
348 .send(
349 bincode::serde::encode_to_vec(FileList::Done, bincode::config::legacy())?
350 .into(),
351 )
352 .await
353 .context("sending file list end")?;
354 to_receiver.flush().await?;
355
356 anyhow::Ok(())
357 },
358 )?;
359
360 to_receiver.flush().await?;
362 let mut to_receiver = to_receiver.into_inner();
363
364 let hdr: FileSectionHeader = bincode::serde::decode_from_slice(
365 &from_receiver.next().await.context("header")??,
366 bincode::config::legacy(),
367 )
368 .map(|(v, _)| v)?;
369 for _ in 0..hdr.num {
370 let FileHeader { path, symlink } = bincode::serde::decode_from_slice(
371 &from_receiver.next().await.context("signature")??,
372 bincode::config::legacy(),
373 )
374 .map(|(v, _)| v)?;
375 let fpath = src.join(&path);
376 if symlink {
377 let header = FileContentsHeader {
378 path,
379 contents: FileContents::Symlink(fs::read_link(&fpath).await?),
380 };
381 let header = bincode::serde::encode_to_vec(&header, bincode::config::legacy())?;
382 to_receiver.write_all(&header.len().to_le_bytes()).await?;
383 to_receiver
384 .write_all(&header)
385 .await
386 .context("sending sig header")?;
387 } else {
388 let mut base = fs::File::open(src.join(&path)).await?;
389 let header = FileContentsHeader {
390 path,
391 contents: FileContents::File(base.metadata().await?.len()),
392 };
393 let header = bincode::serde::encode_to_vec(&header, bincode::config::legacy())?;
394 to_receiver.write_all(&header.len().to_le_bytes()).await?;
395 to_receiver
396 .write_all(&header)
397 .await
398 .context("sending sig header")?;
399 tokio::io::copy(&mut base, &mut to_receiver).await?;
400 }
401 }
402 to_receiver.flush().await?;
403
404 Ok(())
405}
406
407async fn persist(tmp: TempFile, path: &Path) -> Result<(), std::io::Error> {
408 match fs::rename(tmp.file_path(), &path).await {
410 Err(err) if err.kind() == ErrorKind::IsADirectory => {
411 async {
412 fs::remove_dir(&path).await?;
413 fs::rename(tmp.file_path(), &path).await
414 }
415 .await
416 }
417 other => other,
418 }?;
419 tmp.drop_async().await;
420 Ok(())
421}
422
423async fn set_mtime(path: &Path, mtime: SystemTime) -> Result<(), std::io::Error> {
425 let mtime = FileTime::from_system_time(mtime);
426 filetime::set_symlink_file_times(path, mtime, mtime)?;
427 Ok(())
428}
429
430async fn make_executable(path: &Path) -> Result<(), std::io::Error> {
431 let metadata = fs::metadata(path).await?;
432 let mut permissions = metadata.permissions();
433 let mode = permissions.mode();
434 permissions.set_mode(mode | 0o111);
435 fs::set_permissions(path, permissions).await?;
436 Ok(())
437}
438
/// Heuristically decides whether `buf` looks like binary data.
///
/// A buffer is binary when it contains a NUL byte, or when more than 30% of
/// its bytes are neither printable ASCII (`0x20..=0x7E`) nor `\n`, `\r`, `\t`.
/// An empty buffer is treated as text.
fn is_binary(buf: &[u8]) -> bool {
    // The slice's own `contains` needs no iterator adaptor trait in scope.
    if buf.contains(&0) {
        return true;
    }

    let non_print = buf
        .iter()
        .filter(|&&b| !matches!(b, b'\n' | b'\r' | b'\t' | 0x20..=0x7E))
        .count();

    // Strictly more than 30% non-printable bytes.
    non_print * 100 > buf.len() * 30
}
454
455pub async fn receiver(
456 dst: &Path,
457 from_sender: impl AsyncRead + Unpin,
458 to_sender: impl AsyncWrite + Unpin,
459 replacement_paths: HashMap<PathBuf, PathBuf>,
460) -> Result<HashMap<PathBuf, Action>> {
461 let mut to_sender = FramedWrite::new(to_sender, LengthDelimitedCodec::new());
462 let mut from_sender = FramedRead::new(from_sender, LengthDelimitedCodec::new());
463
464 let dst_env = CondaFingerprint::from_env(dst).await?;
467 let src_env: CondaFingerprint = bincode::serde::decode_from_slice(
468 &from_sender.next().await.context("fingerprint")??,
469 bincode::config::legacy(),
470 )
471 .map(|(v, _)| v)?;
472 let comparator = CondaFingerprint::mtime_comparator(&src_env, &dst_env)?;
473 let ignores = GlobSetBuilder::new()
474 .add(Glob::new("**/*.pyc")?)
475 .add(Glob::new("**/__pycache__/")?)
476 .add(Glob::new("**/__pycache__/**/*")?)
477 .build()?;
478 let actions_builder = Arc::new(ActionsBuilder::new_with(Some(ignores), comparator));
479
480 try_join!(
482 async {
484 let dst = dst.to_path_buf();
485 let actions_builder = actions_builder.clone();
486 tokio::task::spawn_blocking(move || {
487 walk_dir(dst, move |path, ent| {
488 actions_builder
489 .process_dst(path.clone(), ent)
490 .with_context(|| format!("{}", path.display()))
491 })
492 })
493 .await??;
494 anyhow::Ok(())
495 },
496 async {
498 while let FileList::Entry(path, metadata) = bincode::serde::decode_from_slice(
499 &from_sender.next().await.context("file list")??,
500 bincode::config::legacy(),
501 )
502 .map(|(v, _): (FileList, _)| v)?
503 {
504 actions_builder
505 .process_src(path.clone(), metadata)
506 .with_context(|| format!("{}", path.display()))?;
507 }
508 anyhow::Ok(())
509 }
510 )?;
511 let actions = Arc::into_inner(actions_builder)
512 .expect("should be done")
513 .into_actions();
514
515 let mut dirs = BTreeSet::new();
517 let mut deletions = BTreeMap::new();
518 let mut files = HashMap::new();
519 for (path, action) in actions.iter() {
520 let path = path.clone();
521 match action {
522 Action::Directory => {
523 dirs.insert(path);
524 }
525 Action::Receive(mtime, recv) => {
526 files.insert(path, (*mtime, recv));
527 }
528 Action::Delete { directory } => {
529 deletions.insert(path, *directory);
530 }
531 }
532 }
533
534 try_join!(
535 async {
536 for (path, is_dir) in deletions.into_iter().rev() {
538 let fpath = dst.join(path);
539 if is_dir {
540 fs::remove_dir(&fpath).await
541 } else {
542 fs::remove_file(&fpath).await
543 }
544 .with_context(|| format!("deleting {}", fpath.display()))?;
545 }
546
547 for path in dirs.into_iter() {
549 let fpath = dst.join(path);
550 match fs::remove_file(&fpath).await {
551 Err(err) if err.kind() == ErrorKind::NotFound => Ok(()),
552 other => other,
553 }
554 .with_context(|| format!("clearing path {}", fpath.display()))?;
555 fs::create_dir(&fpath)
556 .await
557 .with_context(|| format!("creating dir {}", fpath.display()))?;
558 }
559
560 let replacer = {
562 let mut builder = ReplacerBuilder::new();
563
564 let src_prefix = src_env.pack_meta.history.last_prefix()?;
566 let dst_prefix = dst_env.pack_meta.history.last_prefix()?;
567 if src_prefix != dst_prefix {
568 builder.add(src_prefix, dst_prefix)?;
569 }
570
571 for (src, dst) in replacement_paths.iter() {
573 if src != dst {
574 builder.add(src, dst)?;
575 }
576 }
577
578 builder.build_if_non_empty()?
579 };
580
581 let mut from_sender = from_sender.into_inner();
583 for _ in 0..files.len() {
584 let len = from_sender.read_u64_le().await?;
586 let mut buf = vec![0u8; len as usize];
587 from_sender.read_exact(&mut buf).await?;
588 let FileContentsHeader { path, contents } =
589 bincode::serde::decode_from_slice(&buf, bincode::config::legacy())
590 .map(|(v, _)| v)
591 .context("delta header")?;
592 let fpath = dst.join(&path);
593 match (contents, files.get(&path).context("missing file")?) {
594 (FileContents::File(len), (mtime, Receive::File { executable })) => {
596 let mut dst_tmp =
597 TempFile::new_in(fpath.parent().context("parent")?).await?;
598 let mut reader = (&mut from_sender).take(len);
599
600 if let Some(ref replacer) = replacer {
602 let mut buf = vec![0; 4096];
604 let len = reader.read(&mut buf[..]).await?;
605 buf.truncate(len);
606 if is_binary(&buf) {
607 dst_tmp.write_all(&buf).await?;
608 tokio::io::copy(&mut reader, &mut dst_tmp).await?;
609
610 let mut mmap = unsafe { MmapMut::map_mut(&*dst_tmp)? };
613 replacer.replace_inplace_padded(&mut mmap)?;
614 } else {
615 reader.read_to_end(&mut buf).await?;
616 replacer.replace_inplace(&mut buf);
617 dst_tmp.write_all(&buf).await?;
618 }
619 } else {
620 tokio::io::copy(&mut reader, &mut dst_tmp).await?;
621 }
622
623 if *executable {
624 make_executable(dst_tmp.file_path()).await?;
625 }
626 persist(dst_tmp, &fpath).await?;
627 set_mtime(&fpath, *mtime).await?;
628 }
629 (FileContents::Symlink(mut target), (mtime, Receive::Symlink)) => {
630 if let Some(ref replacer) = replacer {
631 target = replacer.replace_path(target);
632 }
633 fs::symlink(target, &fpath).await?;
634 set_mtime(&fpath, *mtime).await?;
635 }
636 _ => bail!("unexpected file contents"),
637 }
638 }
639 anyhow::Ok(())
640 },
641 async {
642 to_sender
643 .send(
644 bincode::serde::encode_to_vec(
645 &FileSectionHeader { num: files.len() },
646 bincode::config::legacy(),
647 )?
648 .into(),
649 )
650 .await
651 .context("sending sig section header")?;
652 for (path, (_, recv)) in files.iter() {
653 to_sender
654 .send(
655 bincode::serde::encode_to_vec(
656 &FileHeader {
657 path: path.clone(),
658 symlink: matches!(recv, Receive::Symlink),
659 },
660 bincode::config::legacy(),
661 )?
662 .into(),
663 )
664 .await
665 .context("sending sig header")?;
666 }
667 to_sender.flush().await?;
668 anyhow::Ok(())
669 },
670 )?;
671
672 Ok(actions)
673}
674
675pub async fn sync(src: &Path, dst: &Path) -> Result<HashMap<PathBuf, Action>> {
676 let (recv, send) = tokio::io::duplex(5 * 1024 * 1024);
678 let (from_receiver, to_receiver) = tokio::io::split(recv);
679 let (from_sender, to_sender) = tokio::io::split(send);
680 let (actions, ()) = try_join!(
681 receiver(dst, from_sender, to_sender, HashMap::new()),
682 sender(src, from_receiver, to_receiver),
683 )?;
684 Ok(actions)
685}
686
687#[cfg(test)]
688mod tests {
689 use std::collections::HashMap;
690 use std::os::unix::fs::PermissionsExt;
691 use std::path::Path;
692 use std::path::PathBuf;
693 use std::time::Duration;
694 use std::time::SystemTime;
695
696 use anyhow::Result;
697 use rattler_conda_types::package::FileMode;
698 use tempfile::TempDir;
699 use tokio::fs;
700
701 use super::Action;
702 use super::make_executable;
703 use super::set_mtime;
704 use super::sync;
705 use crate::pack_meta_history::History;
706 use crate::pack_meta_history::HistoryRecord;
707 use crate::pack_meta_history::Offset;
708 use crate::pack_meta_history::OffsetRecord;
709 use crate::pack_meta_history::Offsets;
710 use crate::sync::Receive;
711
712 async fn setup_conda_env<P: AsRef<Path>>(
714 dirpath: P,
715 mtime: SystemTime,
716 prefix: Option<&str>,
717 ) -> Result<P> {
718 let env_path = dirpath.as_ref();
719
720 fs::create_dir_all(&env_path).await?;
722 fs::create_dir(&env_path.join("conda-meta")).await?;
723 fs::create_dir(&env_path.join("pack-meta")).await?;
724
725 add_file(
727 env_path,
728 "conda-meta/history",
729 "==> 2023-01-01 00:00:00 <==\npackage install actions\n",
730 mtime,
731 false,
732 )
733 .await?;
734
735 add_file(
737 env_path,
738 "conda-meta/package-1.0-0.json",
739 r#"{
740 "name": "package",
741 "version": "1.0",
742 "build": "0",
743 "build_number": 0,
744 "paths_data": {
745 "paths": [
746 {
747 "path": "bin/test-file",
748 "path_type": "hardlink",
749 "size_in_bytes": 10,
750 "mode": "text"
751 }
752 ]
753 },
754 "repodata_record": {
755 "package_record": {
756 "timestamp": 1672531200
757 }
758 }
759 }"#,
760 mtime,
761 false,
762 )
763 .await?;
764
765 let offsets = Offsets {
767 entries: vec![OffsetRecord {
768 path: PathBuf::from("bin/test-file"),
769 mode: FileMode::Text,
770 offsets: vec![Offset {
771 start: 0,
772 len: 10,
773 contents: None,
774 }],
775 }],
776 };
777 add_file(
778 env_path,
779 "pack-meta/offsets.jsonl",
780 &offsets.to_str()?,
781 mtime,
782 false,
783 )
784 .await?;
785
786 fs::create_dir(env_path.join("bin")).await?;
788 add_file(env_path, "bin/test-file", "test data\n", mtime, false).await?;
789
790 let window = (
792 mtime + Duration::from_secs(20),
793 mtime + Duration::from_secs(25),
794 );
795 fs::create_dir(env_path.join("lib")).await?;
796 add_file(
797 env_path,
798 "lib/libfoo.so",
799 "libfoo.so contents\n",
800 window.0 + Duration::from_secs(5),
801 false,
802 )
803 .await?;
804
805 let prefix_path = PathBuf::from(prefix.unwrap_or("base"));
807
808 let history = History {
810 entries: vec![
811 HistoryRecord {
812 timestamp: mtime.duration_since(SystemTime::UNIX_EPOCH)?.as_secs(),
813 prefix: PathBuf::from("/conda/prefix"),
814 finished: true,
815 },
816 HistoryRecord {
817 timestamp: window.0.duration_since(SystemTime::UNIX_EPOCH)?.as_secs(),
818 prefix: prefix_path.clone(),
819 finished: false,
820 },
821 HistoryRecord {
822 timestamp: window.1.duration_since(SystemTime::UNIX_EPOCH)?.as_secs(),
823 prefix: prefix_path,
824 finished: true,
825 },
826 ],
827 };
828 add_file(
829 env_path,
830 "pack-meta/history.jsonl",
831 &history.to_str()?,
832 mtime,
833 false,
834 )
835 .await?;
836
837 Ok(dirpath)
838 }
839
840 async fn modify_file(
842 env_path: &Path,
843 file_path: &str,
844 content: &str,
845 mtime: SystemTime,
846 ) -> Result<()> {
847 let full_path = env_path.join(file_path);
848 fs::write(&full_path, content).await?;
849
850 set_mtime(&full_path, mtime).await?;
852
853 Ok(())
854 }
855
856 async fn add_file(
858 env_path: &Path,
859 file_path: &str,
860 content: &str,
861 mtime: SystemTime,
862 executable: bool,
863 ) -> Result<()> {
864 let full_path = env_path.join(file_path);
865 fs::write(&full_path, content).await?;
866
867 if executable {
868 make_executable(&full_path).await?;
869 }
870
871 set_mtime(&full_path, mtime).await?;
873
874 Ok(())
875 }
876
877 async fn verify_file_content(path1: &Path, path2: &Path) -> Result<bool> {
879 let content1 = fs::read_to_string(path1).await?;
880 let content2 = fs::read_to_string(path2).await?;
881 Ok(content1 == content2)
882 }
883
884 async fn verify_file_permissions(path: &Path, expected_executable: bool) -> Result<bool> {
886 let metadata = fs::metadata(path).await?;
887 let mode = metadata.permissions().mode();
888 let is_executable = mode & 0o111 != 0;
889 Ok(is_executable == expected_executable)
890 }
891
892 async fn verify_symlink_target(path: &Path, expected_target: &Path) -> Result<bool> {
894 let target = fs::read_link(path).await?;
895 Ok(target == expected_target)
896 }
897
898 #[tokio::test]
899 async fn test_sync_modified_file() -> Result<()> {
900 let base_time = SystemTime::UNIX_EPOCH + Duration::from_secs(1672531200); let src_env = setup_conda_env(TempDir::new()?, base_time, None).await?;
905 let dst_env = setup_conda_env(TempDir::new()?, base_time, None).await?;
906
907 let modified_content = "modified test data\n";
909 let newer_time = base_time + Duration::from_hours(1); modify_file(
911 src_env.path(),
912 "bin/test-file",
913 modified_content,
914 newer_time,
915 )
916 .await?;
917
918 let actions = sync(src_env.path(), dst_env.path()).await?;
920
921 let expected_actions = HashMap::from([(
923 PathBuf::from("bin/test-file"),
924 Action::Receive(newer_time, Receive::File { executable: false }),
925 )]);
926
927 assert_eq!(actions, expected_actions);
929
930 assert!(
932 verify_file_content(
933 &src_env.path().join("bin/test-file"),
934 &dst_env.path().join("bin/test-file")
935 )
936 .await?
937 );
938
939 Ok(())
940 }
941
942 #[tokio::test]
943 async fn test_sync_new_file() -> Result<()> {
944 let base_time = SystemTime::UNIX_EPOCH + Duration::from_secs(1672531200); let src_env = setup_conda_env(TempDir::new()?, base_time, None).await?;
949 let dst_env = setup_conda_env(TempDir::new()?, base_time, None).await?;
950
951 let new_file_content = "new file content\n";
953 let newer_time = base_time + Duration::from_hours(1); add_file(
955 src_env.path(),
956 "lib/new-file.txt",
957 new_file_content,
958 newer_time,
959 false,
960 )
961 .await?;
962
963 let actions = sync(src_env.path(), dst_env.path()).await?;
965
966 let expected_actions = HashMap::from([
968 (
970 PathBuf::from("lib/new-file.txt"),
971 Action::Receive(newer_time, Receive::File { executable: false }),
972 ),
973 ]);
974
975 assert_eq!(actions, expected_actions);
977
978 assert!(
980 verify_file_content(
981 &src_env.path().join("lib/new-file.txt"),
982 &dst_env.path().join("lib/new-file.txt")
983 )
984 .await?
985 );
986
987 Ok(())
988 }
989
990 #[tokio::test]
991 async fn test_sync_directory_creation() -> Result<()> {
992 let base_time = SystemTime::UNIX_EPOCH + Duration::from_secs(1672531200); let src_env = setup_conda_env(TempDir::new()?, base_time, None).await?;
997 let dst_env = setup_conda_env(TempDir::new()?, base_time, None).await?;
998
999 let newer_time = base_time + Duration::from_hours(1); fs::create_dir(src_env.path().join("new_dir")).await?;
1002 add_file(
1003 src_env.path(),
1004 "new_dir/test.txt",
1005 "test content",
1006 newer_time,
1007 false,
1008 )
1009 .await?;
1010 set_mtime(&src_env.path().join("new_dir"), newer_time).await?;
1011
1012 let actions = sync(src_env.path(), dst_env.path()).await?;
1014
1015 let expected_actions = HashMap::from([
1017 (PathBuf::from("new_dir"), Action::Directory),
1018 (
1019 PathBuf::from("new_dir/test.txt"),
1020 Action::Receive(newer_time, Receive::File { executable: false }),
1021 ),
1022 ]);
1023
1024 assert_eq!(actions, expected_actions);
1026
1027 assert!(dst_env.path().join("new_dir").exists());
1029
1030 assert!(
1032 verify_file_content(
1033 &src_env.path().join("new_dir/test.txt"),
1034 &dst_env.path().join("new_dir/test.txt")
1035 )
1036 .await?
1037 );
1038
1039 Ok(())
1040 }
1041
1042 #[tokio::test]
1043 async fn test_sync_symlink() -> Result<()> {
1044 let base_time = SystemTime::UNIX_EPOCH + Duration::from_secs(1672531200); let src_env = setup_conda_env(TempDir::new()?, base_time, None).await?;
1049 let dst_env = setup_conda_env(TempDir::new()?, base_time, None).await?;
1050
1051 fs::symlink("bin/test-file", src_env.path().join("link-to-test")).await?;
1053
1054 let newer_time = base_time + Duration::from_hours(1); set_mtime(&src_env.path().join("link-to-test"), newer_time).await?;
1057
1058 let actions = sync(src_env.path(), dst_env.path()).await?;
1060
1061 let expected_actions = HashMap::from([(
1063 PathBuf::from("link-to-test"),
1064 Action::Receive(newer_time, Receive::Symlink),
1065 )]);
1066
1067 assert_eq!(actions, expected_actions);
1069
1070 assert!(dst_env.path().join("link-to-test").exists());
1072
1073 assert!(
1075 verify_symlink_target(
1076 &dst_env.path().join("link-to-test"),
1077 &PathBuf::from("bin/test-file")
1078 )
1079 .await?
1080 );
1081
1082 Ok(())
1083 }
1084
1085 #[tokio::test]
1086 async fn test_sync_file_deletion() -> Result<()> {
1087 let base_time = SystemTime::UNIX_EPOCH + Duration::from_secs(1672531200); let src_env = setup_conda_env(TempDir::new()?, base_time, None).await?;
1092 let dst_env = setup_conda_env(TempDir::new()?, base_time, None).await?;
1093
1094 let newer_time = base_time + Duration::from_hours(1); add_file(
1097 dst_env.path(),
1098 "extra-file.txt",
1099 "should be deleted",
1100 newer_time,
1101 false,
1102 )
1103 .await?;
1104
1105 let actions = sync(src_env.path(), dst_env.path()).await?;
1107
1108 let expected_actions = HashMap::from([(
1110 PathBuf::from("extra-file.txt"),
1111 Action::Delete { directory: false },
1112 )]);
1113
1114 assert_eq!(actions, expected_actions);
1116
1117 assert!(!dst_env.path().join("extra-file.txt").exists());
1119
1120 Ok(())
1121 }
1122
1123 #[tokio::test]
1124 async fn test_sync_ignores_pyc_files() -> Result<()> {
1125 let base_time = SystemTime::UNIX_EPOCH + Duration::from_secs(1672531200); let src_env = setup_conda_env(TempDir::new()?, base_time, None).await?;
1130 let dst_env = setup_conda_env(TempDir::new()?, base_time, None).await?;
1131
1132 let newer_time = base_time + Duration::from_hours(1); add_file(
1135 src_env.path(),
1136 "lib/test.pyc",
1137 "compiled python",
1138 newer_time,
1139 false,
1140 )
1141 .await?;
1142
1143 fs::create_dir(dst_env.path().join("lib/__pycache__")).await?;
1145 add_file(
1146 dst_env.path(),
1147 "lib/__pycache__/cached.pyc",
1148 "cached python",
1149 newer_time,
1150 false,
1151 )
1152 .await?;
1153
1154 let actions = sync(src_env.path(), dst_env.path()).await?;
1156
1157 let expected_actions = HashMap::from([
1159 (
1160 PathBuf::from("lib/__pycache__"),
1161 Action::Delete { directory: true },
1162 ),
1163 (
1164 PathBuf::from("lib/__pycache__/cached.pyc"),
1165 Action::Delete { directory: false },
1166 ),
1167 ]);
1168
1169 assert_eq!(actions, expected_actions);
1171
1172 assert!(!dst_env.path().join("lib/test.pyc").exists());
1174 assert!(!dst_env.path().join("lib/__pycache__").exists());
1175 assert!(!dst_env.path().join("lib/__pycache__/cached.pyc").exists());
1176
1177 Ok(())
1178 }
1179
1180 #[tokio::test]
1181 async fn test_sync_executable_permissions() -> Result<()> {
1182 let base_time = SystemTime::UNIX_EPOCH + Duration::from_secs(1672531200); let src_env = setup_conda_env(TempDir::new()?, base_time, None).await?;
1187 let dst_env = setup_conda_env(TempDir::new()?, base_time, None).await?;
1188
1189 let newer_time = base_time + Duration::from_hours(1); add_file(
1192 src_env.path(),
1193 "bin/executable",
1194 "#!/bin/sh\necho hello",
1195 newer_time,
1196 true,
1197 )
1198 .await?;
1199
1200 let actions = sync(src_env.path(), dst_env.path()).await?;
1202
1203 let expected_actions = HashMap::from([(
1205 PathBuf::from("bin/executable"),
1206 Action::Receive(newer_time, Receive::File { executable: true }),
1207 )]);
1208
1209 assert_eq!(actions, expected_actions);
1211
1212 assert!(dst_env.path().join("bin/executable").exists());
1214
1215 assert!(
1217 verify_file_content(
1218 &src_env.path().join("bin/executable"),
1219 &dst_env.path().join("bin/executable")
1220 )
1221 .await?
1222 );
1223
1224 assert!(verify_file_permissions(&dst_env.path().join("bin/executable"), true).await?);
1226
1227 Ok(())
1228 }
1229
1230 #[tokio::test]
1231 async fn test_sync_text_file_prefix_replacement() -> Result<()> {
1232 let base_time = SystemTime::UNIX_EPOCH + Duration::from_secs(1672531200); let src_prefix = "/opt/conda/src";
1237 let dst_prefix = "/opt/conda/dst";
1238 let src_env = setup_conda_env(TempDir::new()?, base_time, Some(src_prefix)).await?;
1239 let dst_env = setup_conda_env(TempDir::new()?, base_time, Some(dst_prefix)).await?;
1240
1241 let newer_time = base_time + Duration::from_hours(1);
1243 let text_content = format!(
1244 "#!/bin/bash\nexport PATH={}/bin:$PATH\necho 'Using prefix: {}'\n",
1245 src_prefix, src_prefix
1246 );
1247 add_file(
1248 src_env.path(),
1249 "bin/script.sh",
1250 &text_content,
1251 newer_time,
1252 true,
1253 )
1254 .await?;
1255
1256 add_file(src_env.path(), "bin/script2.sh", "", newer_time, true).await?;
1258
1259 let actions = sync(src_env.path(), dst_env.path()).await?;
1261
1262 let expected_actions = HashMap::from([
1264 (
1265 PathBuf::from("bin/script.sh"),
1266 Action::Receive(newer_time, Receive::File { executable: true }),
1267 ),
1268 (
1269 PathBuf::from("bin/script2.sh"),
1270 Action::Receive(newer_time, Receive::File { executable: true }),
1271 ),
1272 ]);
1273 assert_eq!(actions, expected_actions);
1274
1275 let dst_content = fs::read_to_string(dst_env.path().join("bin/script.sh")).await?;
1277 let expected_content = format!(
1278 "#!/bin/bash\nexport PATH={}/bin:$PATH\necho 'Using prefix: {}'\n",
1279 dst_prefix, dst_prefix
1280 );
1281 assert_eq!(dst_content, expected_content);
1282
1283 Ok(())
1284 }
1285
1286 #[tokio::test]
1287 async fn test_sync_binary_file_prefix_replacement() -> Result<()> {
1288 let base_time = SystemTime::UNIX_EPOCH + Duration::from_secs(1672531200);
1290
1291 let src_prefix = "/opt/conda/src";
1293 let dst_prefix = "/opt/conda/dst";
1294 let src_env = setup_conda_env(TempDir::new()?, base_time, Some(src_prefix)).await?;
1295 let dst_env = setup_conda_env(TempDir::new()?, base_time, Some(dst_prefix)).await?;
1296
1297 let newer_time = base_time + Duration::from_hours(1);
1299 let mut binary_content = Vec::new();
1300 binary_content.extend_from_slice(b"\x7fELF"); binary_content.extend_from_slice(&[0u8; 10]); binary_content.extend_from_slice(src_prefix.as_bytes());
1303 binary_content.extend_from_slice(&[0u8; 20]); binary_content.extend_from_slice(b"end");
1305
1306 fs::write(src_env.path().join("lib/binary"), &binary_content).await?;
1307 set_mtime(&src_env.path().join("lib/binary"), newer_time).await?;
1308
1309 let actions = sync(src_env.path(), dst_env.path()).await?;
1311
1312 let expected_actions = HashMap::from([(
1314 PathBuf::from("lib/binary"),
1315 Action::Receive(newer_time, Receive::File { executable: false }),
1316 )]);
1317 assert_eq!(actions, expected_actions);
1318
1319 let dst_content = fs::read(dst_env.path().join("lib/binary")).await?;
1321
1322 assert_eq!(dst_content.len(), binary_content.len());
1324
1325 let dst_content_str = String::from_utf8_lossy(&dst_content);
1327 assert!(dst_content_str.contains(dst_prefix));
1328 assert!(!dst_content_str.contains(src_prefix));
1329
1330 assert!(dst_content.starts_with(b"\x7fELF"));
1332 assert!(dst_content.ends_with(b"end"));
1333
1334 Ok(())
1335 }
1336
1337 #[tokio::test]
1338 async fn test_sync_symlink_prefix_replacement() -> Result<()> {
1339 let base_time = SystemTime::UNIX_EPOCH + Duration::from_secs(1672531200);
1341
1342 let src_prefix = "/opt/conda/src";
1344 let dst_prefix = "/opt/conda/dst";
1345 let src_env = setup_conda_env(TempDir::new()?, base_time, Some(src_prefix)).await?;
1346 let dst_env = setup_conda_env(TempDir::new()?, base_time, Some(dst_prefix)).await?;
1347
1348 let newer_time = base_time + Duration::from_hours(1);
1350 let symlink_target = format!("{}/lib/target-file", src_prefix);
1351 fs::symlink(&symlink_target, src_env.path().join("bin/link-to-target")).await?;
1352 set_mtime(&src_env.path().join("bin/link-to-target"), newer_time).await?;
1353
1354 let actions = sync(src_env.path(), dst_env.path()).await?;
1356
1357 let expected_actions = HashMap::from([(
1359 PathBuf::from("bin/link-to-target"),
1360 Action::Receive(newer_time, Receive::Symlink),
1361 )]);
1362 assert_eq!(actions, expected_actions);
1363
1364 let dst_target = fs::read_link(dst_env.path().join("bin/link-to-target")).await?;
1366 let expected_target = PathBuf::from(format!("{}/lib/target-file", dst_prefix));
1367 assert_eq!(dst_target, expected_target);
1368
1369 Ok(())
1370 }
1371
1372 #[tokio::test]
1373 async fn test_sync_symlink_no_prefix_replacement() -> Result<()> {
1374 let base_time = SystemTime::UNIX_EPOCH + Duration::from_secs(1672531200);
1376
1377 let src_prefix = "/opt/conda/src";
1379 let dst_prefix = "/opt/conda/dst";
1380 let src_env = setup_conda_env(TempDir::new()?, base_time, Some(src_prefix)).await?;
1381 let dst_env = setup_conda_env(TempDir::new()?, base_time, Some(dst_prefix)).await?;
1382
1383 let newer_time = base_time + Duration::from_hours(1);
1385 let symlink_target = "relative/path/target";
1386 fs::symlink(&symlink_target, src_env.path().join("bin/relative-link")).await?;
1387 set_mtime(&src_env.path().join("bin/relative-link"), newer_time).await?;
1388
1389 let actions = sync(src_env.path(), dst_env.path()).await?;
1391
1392 let expected_actions = HashMap::from([(
1394 PathBuf::from("bin/relative-link"),
1395 Action::Receive(newer_time, Receive::Symlink),
1396 )]);
1397 assert_eq!(actions, expected_actions);
1398
1399 let dst_target = fs::read_link(dst_env.path().join("bin/relative-link")).await?;
1401 let expected_target = PathBuf::from(symlink_target);
1402 assert_eq!(dst_target, expected_target);
1403
1404 Ok(())
1405 }
1406
1407 #[tokio::test]
1408 async fn test_sync_binary_file_prefix_replacement_fails_when_dst_longer() -> Result<()> {
1409 let base_time = SystemTime::UNIX_EPOCH + Duration::from_secs(1672531200);
1411
1412 let src_prefix = "/opt/src"; let dst_prefix = "/opt/very/long/destination/prefix"; let src_env = setup_conda_env(TempDir::new()?, base_time, Some(src_prefix)).await?;
1416 let dst_env = setup_conda_env(TempDir::new()?, base_time, Some(dst_prefix)).await?;
1417
1418 let newer_time = base_time + Duration::from_hours(1);
1420 let mut binary_content = Vec::new();
1421 binary_content.extend_from_slice(b"\x7fELF"); binary_content.extend_from_slice(&[0u8; 10]); binary_content.extend_from_slice(src_prefix.as_bytes());
1424 binary_content.extend_from_slice(&[0u8; 20]); binary_content.extend_from_slice(b"end");
1426
1427 fs::write(src_env.path().join("lib/binary"), &binary_content).await?;
1428 set_mtime(&src_env.path().join("lib/binary"), newer_time).await?;
1429
1430 let result = sync(src_env.path(), dst_env.path()).await;
1432
1433 assert!(result.is_err());
1435 let error_msg = result.unwrap_err().to_string();
1436 assert!(error_msg.contains("Input is longer than target length"));
1437
1438 Ok(())
1439 }
1440}