Shortcuts

Distributed

For distributed training, TorchX relies on the scheduler’s gang scheduling capabilities to schedule n copies of nodes. Once launched, the application is expected to be written in a way that leverages this topology, for instance, with PyTorch’s DDP. You can express a variety of node topologies with TorchX by specifying multiple torchx.specs.Role in your component’s AppDef. Each role maps to a homogeneous group of nodes that performs a “role” (function) in the overall training. Scheduling-wise, TorchX launches each role as a sub-gang.

A DDP-style training job has a single role: trainers. Whereas a training job that uses parameter servers will have two roles: parameter server, trainer. You can specify different entrypoint (executable), num replicas, resource requirements, and more for each role.

DDP Builtin

DDP-style trainers are common and easy to templetize since they are homogeneous single role AppDefs, so there is a builtin: dist.ddp. Assuming your DDP training script is called main.py, launch it as:

# locally, 1 node x 4 workers
$ torchx run -s local_cwd dist.ddp -j 1x4 --script main.py

# locally, 2 node x 4 workers (8 total)
$ torchx run -s local_cwd dist.ddp -j 2x4 --script main.py

# remote (optionally pass --rdzv_port to use a different master port than the default 29500)
$ torchx run -s kubernetes -cfg queue=default dist.ddp \
    -j 2x4 \
    --script main.py

# remote -- elastic/autoscaling with 2 minimum and max 5 nodes with 8
# workers each
$ torchx run -s kubernetes dist.ddp -j 2:5x8 --script main.py

Note that the only difference compared to the local launch is the scheduler (-s). The dist.ddp builtin uses torchelastic (more specifically torch.distributed.run) under the hood. Read more about torchelastic here.

Components APIs

torchx.components.dist.ddp(*script_args: str, script: str | None = None, m: str | None = None, image: str = 'ghcr.io/pytorch/torchx:0.8.0dev0', name: str = '/', h: str | None = None, cpu: int = 2, gpu: int = 0, memMB: int = 1024, j: str = '1x2', env: dict[str, str] | None = None, metadata: dict[str, str] | None = None, max_retries: int = 0, rdzv_port: int = 29500, rdzv_backend: str = 'c10d', rdzv_conf: str | None = None, mounts: list[str] | None = None, debug: bool = False, tee: int = 3) AppDef[source]

Distributed data parallel style application (one role, multi-replica). Uses torch.distributed.run to launch and coordinate PyTorch worker processes. Defaults to using c10d rendezvous backend on rendezvous_endpoint $rank_0_host:$rdzv_port. Note that rdzv_port parameter is ignored when running on single node, and instead we use port 0 which instructs torchelastic to chose a free random port on the host.

Note: (cpu, gpu, memMB) parameters are mutually exclusive with h (named resource) where

h takes precedence if specified for setting resource requirements. See registering named resources.

Parameters:
  • script_args – arguments to the main module

  • script – script or binary to run within the image

  • m – the python module path to run

  • image – image (e.g. docker)

  • name – job name override in the following format: {experimentname}/{runname} or {experimentname}/ or /{runname} or {runname}. Uses the script or module name if {runname} not specified.

  • cpu – number of cpus per replica

  • gpu – number of gpus per replica

  • memMB – cpu memory in MB per replica

  • h – a registered named resource (if specified takes precedence over cpu, gpu, memMB)

  • j – [{min_nnodes}:]{nnodes}x{nproc_per_node}, for gpu hosts, nproc_per_node must not exceed num gpus

  • env – environment varibles to be passed to the run (e.g. ENV1=v1,ENV2=v2,ENV3=v3)

  • metadata – metadata to be passed to the scheduler (e.g. KEY1=v1,KEY2=v2,KEY3=v3)

  • max_retries – the number of scheduler retries allowed

  • rdzv_port – the port on rank0’s host to use for hosting the c10d store used for rendezvous. Only takes effect when running multi-node. When running single node, this parameter is ignored and a random free port is chosen.

  • rdzv_backend – the rendezvous backend to use. Only takes effect when running multi-node.

  • rdzv_conf – the additional rendezvous configuration to use (ex. join_timeout=600,close_timeout=600,timeout=600).

  • mounts – mounts to mount into the worker environment/container (ex. type=<bind/volume>,src=/host,dst=/job[,readonly]). See scheduler documentation for more info.

  • debug – whether to run with preset debug flags enabled

  • tee – tees the specified std stream(s) to console + file. 0: none, 1: stdout, 2: stderr, 3: both

Note

SPMD stands for Single Program, Multiple Data – a paradigm where every worker runs the same code but on different data partitions.

torchx.components.dist.spmd(*args: str, script: str | None = None, m: str | None = None, image: str = 'ghcr.io/pytorch/torchx:0.8.0dev0', name: str = '/', h: str = 'gpu.small', j: str = '1x1', env: dict[str, str] | None = None, metadata: dict[str, str] | None = None, max_retries: int = 0, mounts: list[str] | None = None, debug: bool = False) AppDef[source]

Usage (by script): torchx run spmd -j 2x8 -h aws_p4d.24xlarge –name my_experiment/trial_1 –script path/to/my/trainer.py -foo bar

Usage (by module): torchx run spmd -j 2x8 -h aws_p4d.24xlarge –name my_experiment/trial_1 -m path.to.my.trainer -foo bar

Usage (infer GPU count): torchx run spmd -j 2 -h p4d.24xlarge … (same as -j 2x8)

Creates a torchx.specs.AppDef (Job Definition) for a Single-Process-Multiple-Data (SPMD) style application. See: https://en.wikipedia.org/wiki/Single_program,_multiple_data.

SPMD launches n x m (set via the -j nxm option) copies of the same program, where n is the number of nodes (hosts) and m is the number of processes on each node.

If you have a distributed PyTorch script (DDP, FSDP, RPC) use this component to launch the distributed application. You can also use -j 1x1 to launch a single process application which would be equivalent to launching with regular python except that your application can safely call torch.distributed.init_process_group(backend).

Note: For multi-node distributed runs, the hosts MUST have a network route to each other

AND port 29500 should be open on all hosts. Please check your security group settings.

Parameters:
  • args – the arguments to the main module or script (e.g. my/trainer.py -foo bar) (for docker based runs) the script path must be relative to the WORKDIR of the image

  • script

  • m – the main module name (e.g. my.module.trainer). When this option is used, the script_args are passed as the arguments to the main module). Invoking my module is useful when the relative/absolute path of the main script is unknown w.r.t the WORKDIR of the image. Use this option when it makes sense to invoke the main script via python -m <MAIN.MODULE>.

  • image – the base docker image of the workspace, if workspace is disabled, then the image of the job

  • name{experimentname}/{runname} or {experimentname}/ or /{runname} or {runname}

  • h – the type of host to run on (e.g. aws_p4d.24xlarge). Must be one of the registered named resources

  • j – {nnodes}x{nproc_per_node}. For GPU hosts omitting nproc_per_node will infer it from the GPU count on the host

  • env – environment variables to be passed to the run (e.g. ENV1=v1,ENV2=v2,ENV3=v3)

  • metadata – metadata to be passed to the scheduler (e.g. KEY1=v1,KEY2=v2,KEY3=v3)

  • max_retries – the number of scheduler retries allowed

  • mounts – (for docker based runs only) mounts to mount into the worker environment/container (ex. type=<bind/volume>,src=/host,dst=/job[,readonly]).

  • debug – whether to run with preset debug flags enabled

Docs

Access comprehensive developer documentation for PyTorch

View Docs

Tutorials

Get in-depth tutorials for beginners and advanced developers

View Tutorials

Resources

Find development resources and get your questions answered

View Resources