Attention
June 2024 Status Update: Removing DataPipes and DataLoader V2
We are re-focusing the torchdata repo to be an iterative enhancement of torch.utils.data.DataLoader. We do not plan on continuing development or maintaining the DataPipes and DataLoaderV2 solutions, and they will be removed from the torchdata repo. We’ll also be revisiting the DataPipes references in pytorch/pytorch. In release torchdata==0.8.0 (July 2024) they will be marked as deprecated, and in 0.9.0 (Oct 2024) they will be deleted. Existing users are advised to pin to torchdata==0.8.0 or an older version until they are able to migrate away. Subsequent releases will not include DataPipes or DataLoaderV2. Please reach out if you have suggestions or comments (please use this issue for feedback).
ThreadPoolMapper
- class torchdata.datapipes.iter.ThreadPoolMapper(source_datapipe: IterDataPipe, fn: Callable, input_col=None, output_col=None, scheduled_tasks: int = 128, max_workers: Optional[int] = None, **threadpool_kwargs)
- Applies a function over each item from the source DataPipe concurrently using ThreadPoolExecutor (functional name: threadpool_map). The function can be any regular Python function or partial object. Lambda functions are not recommended because they are not supported by pickle.
- Parameters:
- source_datapipe – Source IterDataPipe 
- fn – Function being applied over each item 
- input_col – Index or indices of data to which fn is applied, such as:
  - None as default to apply fn to the data directly.
  - Integer(s) is used for list/tuple.
  - Key(s) is used for dict.
 
- output_col – Index of data where the result of fn is placed. output_col can be specified only when input_col is not None.
  - None as default to replace the index that input_col specified. For input_col with multiple indices, the left-most one is used, and the other indices are removed.
  - Integer is used for list/tuple. -1 appends the result at the end.
  - Key is used for dict. A new key is acceptable.
 
- scheduled_tasks – How many tasks will be scheduled at any given time (Default value: 128) 
- max_workers – Maximum number of threads to execute function calls 
- **threadpool_kwargs – Additional arguments to be given to the ThreadPoolExecutor
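
The input_col and output_col rules listed above can be illustrated for a single item with a small pure-Python sketch. The helper apply_with_cols is hypothetical and is not part of torchdata; it only mirrors the documented behavior, and details such as copying the input instead of modifying it in place are assumptions made for the illustration.

```python
from typing import Any, Callable


def apply_with_cols(fn: Callable[..., Any], data: Any,
                    input_col: Any = None, output_col: Any = None) -> Any:
    """Hypothetical helper (not a torchdata API) mirroring the documented
    input_col / output_col rules for one item."""
    if input_col is None:
        if output_col is not None:
            raise ValueError("output_col can be specified only when input_col is not None")
        return fn(data)  # apply fn to the data directly

    # One or several indices/keys; several are passed to fn as positional arguments.
    cols = list(input_col) if isinstance(input_col, (list, tuple)) else [input_col]
    result = fn(*(data[c] for c in cols))

    # Work on a copy (assumption of this sketch); tuples are immutable anyway.
    was_tuple = isinstance(data, tuple)
    out = dict(data) if isinstance(data, dict) else list(data)

    if output_col is None:
        # Replace the left-most input index and remove the other input indices.
        out[cols[0]] = result
        extra = cols[1:]
        if isinstance(out, list):
            extra = sorted(extra, reverse=True)  # delete from the right so indices stay valid
        for c in extra:
            del out[c]
    elif output_col == -1 and isinstance(out, list):
        out.append(result)  # -1 appends the result at the end
    else:
        out[output_col] = result  # existing index, or a (possibly new) dict key

    return tuple(out) if was_tuple else out


def mul_ten(x):
    return x * 10


print(apply_with_cols(mul_ten, (3, 3), input_col=1))                     # (3, 30)
print(apply_with_cols(mul_ten, (3, 3), input_col=1, output_col=-1))      # (3, 3, 30)
print(apply_with_cols(mul_ten, {"a": 2}, input_col="a", output_col="b")) # {'a': 2, 'b': 20}
print(apply_with_cols(lambda a, b: a + b, [1, 2, 3], input_col=[0, 2]))  # [4, 2]
```

The first two calls correspond to the per-item results of the tuple examples further down this page.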
 
Note

For more information about max_workers and additional arguments for the ThreadPoolExecutor, please refer to: https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor

Note

For optimal use of all threads, scheduled_tasks > max_workers is strongly recommended. The higher the variance in the time needed to execute the given fn, the higher the value of scheduled_tasks needs to be to avoid threads sitting idle while waiting for the next result (results are returned in the original order).

However, too high a value of scheduled_tasks might lead to a long wait before the first element is yielded, because next is called scheduled_tasks times on source_datapipe before yielding.

We encourage you to try out different values of max_workers and scheduled_tasks to find the optimal values for your use case.

Example:

```python
import requests
from torchdata.datapipes.iter import IterableWrapper

# fetching html from remote
def fetch_html(url: str, **kwargs):
    r = requests.get(url, **kwargs)
    r.raise_for_status()  # raise on HTTP error status codes
    return r.content

# `urls` is an iterable of URL strings defined elsewhere
dp = IterableWrapper(urls)
dp = dp.threadpool_map(fetch_html, max_workers=16)
```

```python
import time
from torchdata.datapipes.iter import IterableWrapper

def mul_ten(x):
    time.sleep(0.1)  # simulate a slow, blocking call
    return x * 10

# apply mul_ten to index 1 and replace it with the result
dp = IterableWrapper([(i, i) for i in range(50)])
dp = dp.threadpool_map(mul_ten, input_col=1)
print(list(dp))
```

```
[(0, 0), (1, 10), (2, 20), (3, 30), ...]
```

```python
# apply mul_ten to index 1 and append the result at the end
dp = IterableWrapper([(i, i) for i in range(50)])
dp = dp.threadpool_map(mul_ten, input_col=1, output_col=-1)
print(list(dp))
```

```
[(0, 0, 0), (1, 1, 10), (2, 2, 20), (3, 3, 30), ...]
```
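
To make the interplay of scheduled_tasks and max_workers more concrete, here is a minimal standard-library sketch of an ordered map that keeps a bounded window of tasks in flight. The function windowed_threadpool_map is a hypothetical name used for illustration and should not be read as the library's implementation; it only shows why a larger window hides slow calls and why the source is read ahead before the first result appears.

```python
import time
from collections import deque
from concurrent.futures import ThreadPoolExecutor
from itertools import islice
from typing import Callable, Iterable, Iterator, Optional, TypeVar

T = TypeVar("T")
R = TypeVar("R")


def windowed_threadpool_map(fn: Callable[[T], R], source: Iterable[T],
                            scheduled_tasks: int = 128,
                            max_workers: Optional[int] = None) -> Iterator[R]:
    """Hypothetical sketch: ordered map with a bounded number of in-flight tasks."""
    items = iter(source)
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Read ahead: up to scheduled_tasks items are pulled from the source
        # and submitted before any result can be yielded.
        pending = deque(pool.submit(fn, item) for item in islice(items, scheduled_tasks))
        while pending:
            # Block on the oldest task so results keep the input order,
            # even if later tasks have already finished.
            result = pending.popleft().result()
            # Top the window back up with at most one new task.
            for item in islice(items, 1):
                pending.append(pool.submit(fn, item))
            yield result


def slow_mul_ten(x: int) -> int:
    time.sleep(0.01 * (x % 3))  # uneven run times, as in the note above
    return x * 10


print(list(windowed_threadpool_map(slow_mul_ten, range(8), scheduled_tasks=4, max_workers=2)))
# [0, 10, 20, 30, 40, 50, 60, 70]
```

With scheduled_tasks equal to max_workers, a single slow task at the head of the window leaves the other threads with nothing queued once they finish; a larger window gives them work to pick up in the meantime.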