ThreadPoolMapper¶
- class torchdata.datapipes.iter.ThreadPoolMapper(source_datapipe: IterDataPipe, fn: Callable, input_col=None, output_col=None, scheduled_tasks: int = 128, max_workers: Optional[int] = None, **threadpool_kwargs)¶
Applies a function over each item from the source DataPipe concurrently using
ThreadPoolExecutor (functional name: threadpool_map). The function can be any regular Python function or partial object. Lambda functions are not recommended, as they are not supported by pickle.
- Parameters:
source_datapipe – Source IterDataPipe
fn – Function being applied over each item
input_col –
Index or indices of data to which fn is applied, such as:
- None as default to apply fn to the data directly.
- Integer(s) is/are used for list/tuple.
- Key(s) is/are used for dict.
output_col –
Index of data where the result of fn is placed. output_col can be specified only when input_col is not None.
- None as default to replace the index that input_col specifies; for input_col with multiple indices, the left-most one is used, and the other indices are removed.
- Integer is used for list/tuple. -1 represents appending the result at the end.
- Key is used for dict. A new key is acceptable.
scheduled_tasks – How many tasks will be scheduled at any given time (Default value: 128)
max_workers – Maximum number of threads to execute function calls
**threadpool_kwargs – Additional arguments to be given to the ThreadPoolExecutor
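To make the input_col/output_col semantics for tuples concrete, here is a minimal stdlib-only sketch; the helper name apply_to_col is hypothetical and not part of torchdata, and it only models the integer-index case described above:

```python
from typing import Any, Callable

def apply_to_col(item: tuple, fn: Callable[[Any], Any],
                 input_col: int, output_col: int) -> tuple:
    """Hypothetical helper mimicking input_col/output_col on tuples.

    fn is applied to item[input_col]; the result replaces
    item[output_col], or is appended when output_col is -1.
    """
    result = fn(item[input_col])
    data = list(item)
    if output_col == -1:
        data.append(result)  # -1 appends the result at the end
    else:
        data[output_col] = result
    return tuple(data)

# Apply fn to index 1 and append the result, as in the examples below.
print(apply_to_col((3, 3), lambda x: x * 10, input_col=1, output_col=-1))
# → (3, 3, 30)
```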
Note
For more information about max_workers and additional arguments for the ThreadPoolExecutor, please refer to: https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor
Note
For optimal use of all threads, scheduled_tasks > max_workers is strongly recommended. The higher the variance of the time needed to finish execution of the given fn, the higher the value of scheduled_tasks needs to be to avoid threads sitting idle while waiting for the next result (as results are returned in correct order).
However, too high a value of scheduled_tasks might lead to a long waiting period until the first element is yielded, as next is called scheduled_tasks many times on source_datapipe before yielding.
We encourage you to try out different values of max_workers and scheduled_tasks in search of the optimal values for your use case.
Example:
# fetching html from remote
def fetch_html(url: str, **kwargs):
    r = requests.get(url, **kwargs)
    r.raise_for_status()
    return r.content

dp = IterableWrapper(urls)
dp = dp.threadpool_map(fetch_html, max_workers=16)
def mul_ten(x):
    time.sleep(0.1)
    return x * 10

dp = IterableWrapper([(i, i) for i in range(50)])
dp = dp.threadpool_map(mul_ten, input_col=1)
print(list(dp))
[(0, 0), (1, 10), (2, 20), (3, 30), ...]
dp = IterableWrapper([(i, i) for i in range(50)])
dp = dp.threadpool_map(mul_ten, input_col=1, output_col=-1)
print(list(dp))
[(0, 0, 0), (1, 1, 10), (2, 2, 20), (3, 3, 30), ...]
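The ordered, ahead-of-time scheduling described in the notes above can be sketched with a plain ThreadPoolExecutor. This is an illustrative stdlib-only re-implementation of the idea (keep up to scheduled_tasks calls in flight, yield results in source order), not torchdata's actual code:

```python
import time
from collections import deque
from concurrent.futures import ThreadPoolExecutor

def threadpool_map_sketch(source, fn, scheduled_tasks=8, max_workers=4):
    """Yield fn(x) for each x in source, in source order, with up to
    scheduled_tasks calls scheduled ahead on the thread pool."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = deque()
        it = iter(source)
        # Pre-schedule up to scheduled_tasks items from the source;
        # this is why the first yield can take a while.
        for x in it:
            futures.append(pool.submit(fn, x))
            if len(futures) >= scheduled_tasks:
                break
        while futures:
            # Yield the oldest result first, preserving source order.
            yield futures.popleft().result()
            for x in it:  # schedule one replacement task, if any remain
                futures.append(pool.submit(fn, x))
                break

def slow_double(x):
    time.sleep(0.01)
    return 2 * x

print(list(threadpool_map_sketch(range(10), slow_double)))
# → [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
```

Because results are yielded in submission order, a single slow fn call blocks everything queued behind it; raising scheduled_tasks relative to max_workers keeps the worker threads busy in the meantime, which is the trade-off the notes describe.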