MultiProcessingReadingService
- class torchdata.dataloader2.MultiProcessingReadingService(num_workers: int = 0, multiprocessing_context: Optional[str] = None, worker_prefetch_cnt: int = 10, main_prefetch_cnt: int = 10, worker_init_fn: Optional[Callable[[Union[IterDataPipe, MapDataPipe], WorkerInfo], Union[IterDataPipe, MapDataPipe]]] = None, worker_reset_fn: Optional[Callable[[Union[IterDataPipe, MapDataPipe], WorkerInfo, SeedGenerator], Union[IterDataPipe, MapDataPipe]]] = None)
Spawns multiple worker processes to load data from the DataPipe graph. If any non-replicable DataPipe (sharding_round_robin_dispatch) is present in the graph, a separate dispatching process is created to load data from the lowest common ancestor of all non-replicable DataPipes and distribute it to the worker processes in a round-robin manner. The subsequent DataPipe graph in each worker process then processes the data received from the dispatching process and eventually returns the result to the main process.
- Parameters:
num_workers (int, optional) – How many subprocesses to use for data loading. 0 will be replaced by InProcessReadingService in the future.
multiprocessing_context (str, optional) – Multiprocessing start method. If None, the default context is used. Otherwise, it should be either ‘fork’ or ‘spawn’.
worker_prefetch_cnt (int, 10 by default) – Number of data items to prefetch at the end of each worker process.
main_prefetch_cnt (int, 10 by default) – Number of data items to prefetch at the end of the whole pipeline in the main process.
worker_init_fn (Callable, optional) – Function called when each worker process launches, with the DataPipe and WorkerInfo as arguments. It returns the DataPipe that the worker will use.
worker_reset_fn (Callable, optional) – Function called at the beginning of each epoch in each worker process, with the DataPipe, WorkerInfo and SeedGenerator as arguments. It returns the DataPipe to use for that epoch.
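The dispatching behaviour described at the top of this entry can be modelled in plain Python. This is not torchdata code: round_robin_dispatch and round_robin_collect are hypothetical helpers, no real processes are involved, and merging worker outputs back in round-robin order in the main process is an assumption made for illustration.

```python
from itertools import cycle
from typing import Iterable, List, TypeVar

T = TypeVar("T")


def round_robin_dispatch(source: Iterable[T], num_workers: int) -> List[List[T]]:
    """Hand items from a single source to num_workers buckets in turn.

    Hypothetical helper modelling a dispatching process; not torchdata's implementation.
    """
    if num_workers < 1:
        raise ValueError("num_workers must be >= 1")
    buckets: List[List[T]] = [[] for _ in range(num_workers)]
    # Worker 0 gets item 0, worker 1 gets item 1, ..., then wrap around.
    for worker_id, item in zip(cycle(range(num_workers)), source):
        buckets[worker_id].append(item)
    return buckets


def round_robin_collect(per_worker: List[List[T]]) -> List[T]:
    """Merge per-worker results, taking one item from each worker in turn (assumed order)."""
    iterators = [iter(results) for results in per_worker]
    merged: List[T] = []
    while iterators:
        still_active = []
        for it in iterators:
            try:
                merged.append(next(it))
            except StopIteration:
                continue  # this worker is exhausted; drop it
            still_active.append(it)
        iterators = still_active
    return merged


if __name__ == "__main__":
    shards = round_robin_dispatch(range(7), num_workers=3)
    print(shards)  # [[0, 3, 6], [1, 4], [2, 5]]
    # Each "worker" runs its copy of the downstream graph (here: multiply by 10).
    processed = [[x * 10 for x in shard] for shard in shards]
    print(round_robin_collect(processed))  # [0, 10, 20, 30, 40, 50, 60]
```

Dispatching one item at a time keeps the workers evenly loaded even when the source length is not a multiple of num_workers.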
- finalize() → None
MultiProcessingReadingService invalidates its state and properly shuts down all subprocesses.
- initialize(datapipe: Union[IterDataPipe, MapDataPipe]) → Union[IterDataPipe, MapDataPipe]
MultiProcessingReadingService finds information about sharding, separates the graph into multiple pieces, reconnects the pieces using queues, and creates the subprocesses.
- initialize_iteration(seed_generator: SeedGenerator, iter_reset_fn: Optional[Callable[[Union[IterDataPipe, MapDataPipe]], Union[IterDataPipe, MapDataPipe]]] = None) → Optional[Callable[[Union[IterDataPipe, MapDataPipe]], Union[IterDataPipe, MapDataPipe]]]
ReadingService spins up the service for an epoch. Called each time an iterator is obtained from DataLoader2.
- Parameters:
seed_generator – SeedGenerator object created and managed by DataLoader2. As the single source of randomness, it governs the determinism of all random operations within the graph of DataPipes.
iter_reset_fn – Optional reset function from the prior ReadingService when SequentialReadingService chains multiple ReadingServices.
- Returns:
A new iter_reset_fn to be used by the subsequent ReadingService.
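The chaining contract (each initialize_iteration receives the previous service's iter_reset_fn and returns a new one) can be illustrated with a toy model. ToyReadingService and chain_iterations are hypothetical; a list of strings stands in for the DataPipe graph, a plain int stands in for SeedGenerator, and applying the prior reset before the current service's own reset is an assumed ordering, not confirmed torchdata behaviour.

```python
from typing import Callable, List, Optional

# Stand-in for a DataPipe -> DataPipe reset function.
ResetFn = Callable[[List[str]], List[str]]


class ToyReadingService:
    """Hypothetical stand-in for a ReadingService; not a torchdata class."""

    def __init__(self, name: str) -> None:
        self.name = name

    def initialize_iteration(self, seed: int, iter_reset_fn: Optional[ResetFn] = None) -> Optional[ResetFn]:
        def reset(graph: List[str]) -> List[str]:
            # Apply the reset handed over by the previous service first (assumed order) ...
            if iter_reset_fn is not None:
                graph = iter_reset_fn(graph)
            # ... then record this service's own per-epoch reset.
            return graph + [f"{self.name}:seed={seed}"]

        return reset


def chain_iterations(services: List[ToyReadingService], seed: int) -> Optional[ResetFn]:
    """Thread the reset function through services, as a sequential chain would."""
    reset_fn: Optional[ResetFn] = None
    for service in services:
        reset_fn = service.initialize_iteration(seed, reset_fn)
    return reset_fn


if __name__ == "__main__":
    fn = chain_iterations([ToyReadingService("dist"), ToyReadingService("mp")], seed=0)
    print(fn([]))  # ['dist:seed=0', 'mp:seed=0']
```

Returning a closure that wraps the previous function lets each service add its reset without knowing how many services came before it.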
Example
MultiProcessingReadingService sets the seed of each worker process and starts prefetching items from the graph.