Attention
June 2024 Status Update: Removing DataPipes and DataLoader V2
We are re-focusing the torchdata repo to be an iterative enhancement of torch.utils.data.DataLoader. We do not plan on continuing development or maintaining the [DataPipes] and [DataLoaderV2] solutions, and they will be removed from the torchdata repo. We’ll also be revisiting the DataPipes references in pytorch/pytorch. In release torchdata==0.8.0 (July 2024) they will be marked as deprecated, and in 0.9.0 (Oct 2024) they will be deleted. Existing users are advised to pin to torchdata==0.8.0 or an older version until they are able to migrate away. Subsequent releases will not include DataPipes or DataLoaderV2. Please reach out if you suggestions or comments (please use this issue for feedback)
FullSync¶
- class torchdata.datapipes.iter.FullSync(datapipe: IterDataPipe, timeout=1800)¶
- Synchronizes data across distributed processes to prevent hanging during training, which is caused by uneven sharded data (functional name: - fullsync). It stops when the shortest distributed shard is exhausted. It would be appended at the end of the graph of- DataPipeby- DistributedReadingServiceautomatically.- Parameters:
- datapipe – IterDataPipe that needs to be synchronized 
- timeout – Timeout for prefetching data in seconds. Default value equals to 30 minutes 
 
 - Example - >>> from torchdata.datapipes.iter import IterableWrapper >>> # Distributed training with world size 2 >>> world_size = 2 >>> dp = IterableWrapper(list(range(23))).sharding_filter() >>> torch.utils.data.graph_settings.apply_sharding(dp, world_size, rank) >>> # Rank 0 has 12 elements; Rank 1 has 11 elements >>> for d in dp: ... model(d) # Hanging at the end of epoch due to uneven sharding >>> dp = dp.fullsync() >>> # Both ranks have 11 elements >>> for d in dp: ... model(d) # Not hanging anymore