Attention
June 2024 Status Update: Removing DataPipes and DataLoader V2
We are re-focusing the torchdata repo to be an iterative enhancement of torch.utils.data.DataLoader. We do not plan on continuing development or maintaining the [DataPipes] and [DataLoaderV2] solutions, and they will be removed from the torchdata repo. We’ll also be revisiting the DataPipes references in pytorch/pytorch. In release torchdata==0.8.0 (July 2024) they will be marked as deprecated, and in 0.9.0 (Oct 2024) they will be deleted. Existing users are advised to pin to torchdata==0.8.0 or an older version until they are able to migrate away. Subsequent releases will not include DataPipes or DataLoaderV2. Please reach out if you suggestions or comments (please use this issue for feedback)
ThreadPoolMapper¶
- class torchdata.datapipes.iter.ThreadPoolMapper(source_datapipe: IterDataPipe, fn: Callable, input_col=None, output_col=None, scheduled_tasks: int = 128, max_workers: Optional[int] = None, **threadpool_kwargs)¶
Applies a function over each item from the source DataPipe concurrently using
ThreadPoolExecutor(functional name:threadpool_map). The function can be any regular Python function or partial object. Lambda function is not recommended as it is not supported by pickle.- Parameters:
source_datapipe – Source IterDataPipe
fn – Function being applied over each item
input_col –
Index or indices of data which
fnis applied, such as:Noneas default to applyfnto the data directly.Integer(s) is used for list/tuple.
Key(s) is used for dict.
output_col –
Index of data where result of
fnis placed.output_colcan be specified only wheninput_colis notNoneNoneas default to replace the index thatinput_colspecified; Forinput_colwith multiple indices, the left-most one is used, and other indices will be removed.Integer is used for list/tuple.
-1represents to append result at the end.Key is used for dict. New key is acceptable.
scheduled_tasks – How many tasks will be scheduled at any given time (Default value: 128)
max_workers – Maximum number of threads to execute function calls
**threadpool_kwargs – additional arguments to be given to the
ThreadPoolExecutor
Note
For more information about
max_workersand additional arguments for theThreadPoolExecutorplease refer to: https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutorNote
For optimal use of all threads,
scheduled_tasks>max_workersis strongly recommended. The higher the variance of the time needed to finish execution of the givenfnis, the higher the value ofscheduled_tasksneeds to be to avoid threads sitting idle while waiting for the next result (as results are returned in correct order).However, too high value of
scheduled_tasksmight lead to long waiting period until the first element is yielded asnextis calledscheduled_tasksmany times onsource_datapipebefore yielding.We encourage you to try out different values of
max_workersandscheduled_tasksin search for optimal values for your use-case.Example:
# fetching html from remote def fetch_html(url: str, **kwargs): r = requests.get(url, **kwargs) r.raise_for_status() return r.content dp = IterableWrapper(urls) dp = dp.threadpool_map(fetch_html,max_workers=16)
def mul_ten(x): time.sleep(0.1) return x * 10 dp = IterableWrapper([(i, i) for i in range(50)]) dp = dp.threadpool_map(mul_ten, input_col=1) print(list(dp))
[(0, 0), (1, 10), (2, 20), (3, 30), ...]
dp = IterableWrapper([(i, i) for i in range(50)]) dp = dp.threadpool_map(mul_ten, input_col=1, output_col=-1) print(list(dp))
[(0, 0, 0), (1, 1, 10), (2, 2, 20), (3, 3, 30), ...]