Distributed
For distributed training, TorchX relies on the scheduler’s gang scheduling
capabilities to schedule n copies of nodes. Once launched, the application
is expected to be written in a way that leverages this topology, for instance,
with PyTorch’s DDP (DistributedDataParallel).
You can express a variety of node topologies with TorchX by specifying multiple
torchx.specs.Role in your component’s AppDef. Each role maps to
a homogeneous group of nodes that performs a “role” (function) in the overall
training. Scheduling-wise, TorchX launches each role as a sub-gang.
A DDP-style training job has a single role: trainer. A training job that uses parameter servers, by contrast, has two roles: parameter server and trainer. You can specify a different entrypoint (executable), number of replicas, resource requirements, and more for each role, as in the sketch below.
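For illustration, here is a minimal sketch of a two-role AppDef (parameter server plus trainer) built directly with torchx.specs; the image names, entrypoint modules, replica counts, and resource figures are hypothetical placeholders:

from torchx import specs

# Hypothetical two-role topology: a parameter-server gang and a trainer gang.
app = specs.AppDef(
    name="ps_trainer_example",
    roles=[
        specs.Role(
            name="ps",                       # parameter server role
            image="my/image:latest",         # placeholder image
            entrypoint="python",
            args=["-m", "my.param_server"],  # placeholder module
            num_replicas=2,
            resource=specs.Resource(cpu=4, gpu=0, memMB=8192),
        ),
        specs.Role(
            name="trainer",                  # trainer role
            image="my/image:latest",
            entrypoint="python",
            args=["-m", "my.trainer"],       # placeholder module
            num_replicas=4,
            resource=specs.Resource(cpu=8, gpu=1, memMB=16384),
        ),
    ],
)

Each Role is scheduled as its own sub-gang, so the parameter servers and trainers can have different replica counts and resource shapes.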
DDP Builtin
DDP-style trainers are common and easy to templatize since they are homogeneous
single-role AppDefs, so there is a builtin: dist.ddp. Assuming your DDP
training script is called main.py, launch it as:
# locally, 1 node x 4 workers
$ torchx run -s local_cwd dist.ddp -j 1x4 --script main.py
# locally, 2 node x 4 workers (8 total)
$ torchx run -s local_cwd dist.ddp -j 2x4 --script main.py
# remote (optionally pass --rdzv_port to use a different master port than the default 29500)
$ torchx run -s kubernetes -cfg queue=default dist.ddp \
-j 2x4 \
--script main.py
# remote -- elastic/autoscaling with a minimum of 2 and a maximum of 5
# nodes, 8 workers each
$ torchx run -s kubernetes dist.ddp -j 2:5x8 --script main.py
Note that the only difference compared to the local launch is the scheduler (-s).
The dist.ddp builtin uses torchelastic (more specifically torch.distributed.run)
under the hood. Read more about torchelastic in the PyTorch Elastic documentation.
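Because torch.distributed.run exports the standard rendezvous environment variables (RANK, WORLD_SIZE, LOCAL_RANK, MASTER_ADDR, MASTER_PORT) to each worker, the main.py launched by dist.ddp can stay minimal. A sketch, where the model and backend choice are placeholder assumptions:

import os

import torch
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP

def main():
    # torch.distributed.run sets MASTER_ADDR/MASTER_PORT/RANK/WORLD_SIZE,
    # so the default env:// rendezvous needs no extra arguments.
    use_cuda = torch.cuda.is_available()
    dist.init_process_group(backend="nccl" if use_cuda else "gloo")

    local_rank = int(os.environ["LOCAL_RANK"])
    device = torch.device(f"cuda:{local_rank}") if use_cuda else torch.device("cpu")

    model = torch.nn.Linear(10, 10).to(device)  # placeholder model
    model = DDP(model, device_ids=[local_rank] if use_cuda else None)
    # ... training loop ...

    dist.destroy_process_group()

if __name__ == "__main__":
    main()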
Components APIs
- torchx.components.dist.ddp(*script_args: str, script: str | None = None, m: str | None = None, image: str = 'ghcr.io/pytorch/torchx:0.8.0dev0', name: str = '/', h: str | None = None, cpu: int = 2, gpu: int = 0, memMB: int = 1024, j: str = '1x2', env: dict[str, str] | None = None, metadata: dict[str, str] | None = None, max_retries: int = 0, rdzv_port: int = 29500, rdzv_backend: str = 'c10d', rdzv_conf: str | None = None, mounts: list[str] | None = None, debug: bool = False, tee: int = 3) → AppDef
Distributed data parallel style application (one role, multi-replica). Uses torch.distributed.run to launch and coordinate PyTorch worker processes. Defaults to using the c10d rendezvous backend on rendezvous endpoint $rank_0_host:$rdzv_port. Note that the rdzv_port parameter is ignored when running on a single node; port 0 is used instead, which instructs torchelastic to choose a free random port on the host.
- Note: (cpu, gpu, memMB) parameters are mutually exclusive with h (named resource), where h takes precedence if specified for setting resource requirements. See registering named resources.
- Parameters:
script_args – arguments to the main module
script – script or binary to run within the image
m – the python module path to run
image – image (e.g. docker)
name – job name override in the following format: {experimentname}/{runname} or {experimentname}/ or /{runname} or {runname}. Uses the script or module name if {runname} is not specified.
cpu – number of cpus per replica
gpu – number of gpus per replica
memMB – cpu memory in MB per replica
h – a registered named resource (if specified takes precedence over cpu, gpu, memMB)
j – [{min_nnodes}:]{nnodes}x{nproc_per_node}; for GPU hosts, nproc_per_node must not exceed the number of GPUs on the host
env – environment variables to be passed to the run (e.g. ENV1=v1,ENV2=v2,ENV3=v3)
metadata – metadata to be passed to the scheduler (e.g. KEY1=v1,KEY2=v2,KEY3=v3)
max_retries – the number of scheduler retries allowed
rdzv_port – the port on rank0’s host to use for hosting the c10d store used for rendezvous. Only takes effect when running multi-node. When running single node, this parameter is ignored and a random free port is chosen.
rdzv_backend – the rendezvous backend to use. Only takes effect when running multi-node.
rdzv_conf – the additional rendezvous configuration to use (ex. join_timeout=600,close_timeout=600,timeout=600).
mounts – mounts to mount into the worker environment/container (ex. type=<bind/volume>,src=/host,dst=/job[,readonly]). See scheduler documentation for more info.
debug – whether to run with preset debug flags enabled
tee – tees the specified std stream(s) to console + file. 0: none, 1: stdout, 2: stderr, 3: both
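The component can also be invoked programmatically to build the AppDef instead of going through the CLI; a sketch, with a hypothetical script name and forwarded arguments:

from torchx.components.dist import ddp

# Build an AppDef equivalent to the CLI examples above;
# the script name and "--lr 0.01" arguments are placeholders.
app = ddp(
    "--lr", "0.01",    # *script_args forwarded to the training script
    script="main.py",
    j="2x4",           # 2 nodes x 4 workers per node
)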
Note
SPMD stands for Single Program, Multiple Data – a paradigm where every worker runs the same code but on different data partitions.
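In a PyTorch script, the “different data partitions” part typically comes from rank-aware sharding, e.g. with DistributedSampler. A generic sketch (not TorchX-specific, assuming the rendezvous environment variables were set by the launcher):

import torch
import torch.distributed as dist
from torch.utils.data import DataLoader, DistributedSampler, TensorDataset

# Every worker runs this same program; the launcher provides rendezvous env vars.
dist.init_process_group(backend="gloo")

dataset = TensorDataset(torch.arange(100).float())  # placeholder dataset

# DistributedSampler reads rank/world_size from the process group and
# hands each worker a disjoint shard of the dataset.
sampler = DistributedSampler(dataset)
loader = DataLoader(dataset, batch_size=8, sampler=sampler)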
- torchx.components.dist.spmd(*args: str, script: str | None = None, m: str | None = None, image: str = 'ghcr.io/pytorch/torchx:0.8.0dev0', name: str = '/', h: str = 'gpu.small', j: str = '1x1', env: dict[str, str] | None = None, metadata: dict[str, str] | None = None, max_retries: int = 0, mounts: list[str] | None = None, debug: bool = False) → AppDef
Usage (by script): torchx run spmd -j 2x8 -h aws_p4d.24xlarge --name my_experiment/trial_1 --script path/to/my/trainer.py -foo bar
Usage (by module): torchx run spmd -j 2x8 -h aws_p4d.24xlarge --name my_experiment/trial_1 -m path.to.my.trainer -foo bar
Usage (infer GPU count): torchx run spmd -j 2 -h p4d.24xlarge … (same as -j 2x8)
Creates a torchx.specs.AppDef (Job Definition) for a Single Program, Multiple Data (SPMD) style application. See: https://en.wikipedia.org/wiki/Single_program,_multiple_data.
SPMD launches n x m (set via the -j nxm option) copies of the same program, where n is the number of nodes (hosts) and m is the number of processes on each node.
If you have a distributed PyTorch script (DDP, FSDP, RPC), use this component to launch the distributed application. You can also use -j 1x1 to launch a single-process application, which is equivalent to launching with regular python except that your application can safely call torch.distributed.init_process_group(backend).
- Note: For multi-node distributed runs, the hosts MUST have a network route to each other
AND port 29500 should be open on all hosts. Please check your security group settings.
- Parameters:
args – the arguments to the main module or script (e.g. my/trainer.py -foo bar). For docker-based runs, the script path must be relative to the WORKDIR of the image
script – the script or binary to run within the image
m – the main module name (e.g. my.module.trainer). When this option is used, the script_args are passed as the arguments to the main module. Invoking the main module is useful when the relative/absolute path of the main script is unknown w.r.t. the WORKDIR of the image. Use this option when it makes sense to invoke the main script via python -m <MAIN.MODULE>.
image – the base docker image of the workspace, if workspace is disabled, then the image of the job
name – job name override in the following format: {experimentname}/{runname} or {experimentname}/ or /{runname} or {runname}
h – the type of host to run on (e.g. aws_p4d.24xlarge). Must be one of the registered named resources
j – {nnodes}x{nproc_per_node}. For GPU hosts, omitting nproc_per_node will infer it from the GPU count on the host
env – environment variables to be passed to the run (e.g. ENV1=v1,ENV2=v2,ENV3=v3)
metadata – metadata to be passed to the scheduler (e.g. KEY1=v1,KEY2=v2,KEY3=v3)
max_retries – the number of scheduler retries allowed
mounts – (for docker based runs only) mounts to mount into the worker environment/container (ex. type=<bind/volume>,src=/host,dst=/job[,readonly]).
debug – whether to run with preset debug flags enabled
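As with dist.ddp, the spmd component can be called from Python to produce the AppDef; a sketch mirroring the usage lines above, with the trainer path, arguments, and experiment name as placeholders:

from torchx.components.dist import spmd

app = spmd(
    "-foo", "bar",                   # *args forwarded to the trainer
    script="path/to/my/trainer.py",  # placeholder trainer script
    j="2x8",                         # 2 nodes x 8 processes per node
    h="aws_p4d.24xlarge",
    name="my_experiment/trial_1",
)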