Quick Reference¶
Tip
Imports, core types, Runner lifecycle, and copy-pasteable recipes on one page. For detailed API docs see torchx.specs and torchx.runner.
Imports¶
# Core types — job definitions
import torchx.specs as specs
from torchx.specs import AppDef, Role, Resource, AppState, macros
# Named resources — t-shirt-sized hardware presets
from torchx.specs import resource
# Runner — submit, monitor, and manage jobs
from torchx.runner import get_runner
# Mounts — bind, volume, and device mounts
from torchx.specs import BindMount, VolumeMount, DeviceMount
# Plugins — registration, discovery, and diagnostics
from torchx import plugins
from torchx.plugins import register, find, PluginType
Core Types¶
AppDef¶
A job definition containing one or more Roles.
app = AppDef(
name="my_job", # str — job name
roles=[...], # list[Role] — one or more roles
metadata={}, # dict[str, str] — scheduler-specific metadata
)
Role¶
A set of identical replicas within an AppDef.
role = Role(
name="trainer", # str — role name
image="my_image:latest", # str — Docker image, fbpkg, or path
entrypoint="python", # str — command to run
args=["-m", "my_app"], # list[str] — arguments to entrypoint
env={"KEY": "value"}, # dict[str, str] — environment variables
num_replicas=1, # int — number of container replicas
resource=Resource( # Resource — hardware requirements per replica
cpu=4, gpu=1, memMB=8192,
),
# Optional fields:
# min_replicas=1, # int | None — minimum for elastic scaling
# max_retries=3, # int — retries before giving up
# retry_policy=RetryPolicy.APPLICATION,
# port_map={"tb": 6006}, # dict[str, int] — named port mappings
# mounts=[...], # list[BindMount | VolumeMount | DeviceMount]
# workspace=Workspace(...),
)
Resource¶
Hardware requirements per replica. Prefer named resources over raw values.
# Option 1: named resource (preferred)
from torchx.specs import resource
res = resource(h="gpu.small") # 8 CPU, 1 GPU, 32 GiB
# Option 2: explicit values
res = Resource(cpu=4, gpu=1, memMB=8192)
# Option 3: AWS instance type
res = resource(h="aws_p3.2xlarge")
See Named Resources for the full list.
Runner Lifecycle¶
Create a Runner¶
from torchx.runner import get_runner
# As a context manager (recommended — auto-closes scheduler connections)
with get_runner() as runner:
...
# Or manually
runner = get_runner()
# ... use runner ...
runner.close()
Submit a Job¶
run_component resolves a component by name; run takes a direct AppDef.
with get_runner() as runner:
# Method 1: run_component — resolves a component by name
# Same resolution as `torchx run` CLI
app_handle = runner.run_component(
"dist.ddp", # component name
["--script", "train.py", "-j", "2x2"], # args (list[str])
scheduler="kubernetes", # scheduler backend
cfg={"namespace": "default"}, # scheduler config (optional)
)
# Method 2: run — submit an AppDef directly
app = AppDef(name="my_job", roles=[...])
app_handle = runner.run(
app,
scheduler="kubernetes",
cfg={"namespace": "default"},
)
Monitor and Wait¶
# Poll status
status = runner.status(app_handle)
print(status.state) # AppState enum (see table below)
print(status.msg) # human-readable message
print(status.ui_url) # scheduler UI link (if available)
# Block until terminal state
final_status = runner.wait(app_handle, wait_interval=10)
# Check if terminal
if final_status and final_status.is_terminal():
print("Done:", final_status.state)
# Raise an exception if the job did not succeed
final_status.raise_for_status() # raises AppStatusError on non-SUCCEEDED
AppState values:
State |
Terminal? |
Description |
|---|---|---|
|
No |
Not yet submitted. |
|
No |
Submitted to the scheduler. |
|
No |
Waiting for resource allocation. |
|
No |
Running. |
|
Yes |
Completed successfully. |
|
Yes |
Completed unsuccessfully. |
|
Yes |
Cancelled before completing. |
|
No |
State cannot be determined. |
The app_handle is a URI: {scheduler}://{session_name}/{app_id} (e.g.
kubernetes://torchx/my_job_123). Pass it to all Runner methods.
Cancel and Delete¶
runner.cancel(app_handle) # request cancellation (async)
runner.delete(app_handle) # remove from scheduler
Fetch Logs¶
# Get log lines for replica 0 of the "trainer" role
for line in runner.log_lines(app_handle, role_name="trainer", k=0):
print(line, end="") # lines include trailing \n
Common Recipes¶
Single-Node Training¶
import torchx.specs as specs
from torchx.runner import get_runner
from torchx.specs import resource
app = specs.AppDef(
name="train",
roles=[
specs.Role(
name="trainer",
image="my_image:latest",
entrypoint="python",
args=["-m", "my_train", "--epochs", "10"],
resource=resource(h="gpu.small"),
env={"CUDA_VISIBLE_DEVICES": "0"},
)
],
)
with get_runner() as runner:
app_handle = runner.run(app, scheduler="local_cwd")
status = runner.wait(app_handle, wait_interval=1)
print(status)
Distributed Training (DDP)¶
Use the built-in dist.ddp component:
from torchx.runner import get_runner
with get_runner() as runner:
app_handle = runner.run_component(
"dist.ddp",
[
"--script", "train.py",
"-j", "2x2", # 2 nodes x 2 workers per node
"--gpu", "2",
"--memMB", "8192",
],
scheduler="kubernetes",
)
status = runner.wait(app_handle)
Or build the AppDef directly for full control:
import torchx.specs as specs
from torchx.runner import get_runner
app = specs.AppDef(
name="ddp_train",
roles=[
specs.Role(
name="trainer",
image="my_image:latest",
entrypoint="python",
args=[
"-m", "torch.distributed.run",
"--nnodes", "2",
"--nproc_per_node", "2",
"train.py",
],
num_replicas=2,
resource=specs.Resource(cpu=8, gpu=2, memMB=16384),
)
],
)
with get_runner() as runner:
app_handle = runner.run(app, scheduler="kubernetes")
Custom Component¶
# my_component.py
import torchx.specs as specs
from torchx.specs import resource
def trainer(
script: str,
image: str = "my_image:latest",
resource_name: str = "gpu.small",
) -> specs.AppDef:
"""Launch single-node training."""
return specs.AppDef(
name="trainer",
roles=[
specs.Role(
name="trainer",
image=image,
entrypoint="python",
args=["-m", script],
resource=resource(h=resource_name),
)
],
)
# Launch it:
from torchx.runner import get_runner
with get_runner() as runner:
# By name (same as CLI: torchx run my_component.py:trainer ...)
app_handle = runner.run_component(
"my_component.py:trainer",
["--script", "my_train", "--resource_name", "gpu.medium"],
scheduler="local_cwd",
)
# Or call the function directly
app = trainer(script="my_train", resource_name="gpu.medium")
app_handle = runner.run(app, scheduler="local_cwd")
Environment Variables¶
role = specs.Role(
name="trainer",
image="my_image:latest",
entrypoint="python",
args=["-m", "my_train"],
env={
"NCCL_DEBUG": "INFO",
"CUDA_VISIBLE_DEVICES": "0,1",
"MY_CONFIG": "/data/config.yaml",
},
)
Mounts¶
from torchx.specs import BindMount, VolumeMount
role = specs.Role(
name="trainer",
image="my_image:latest",
entrypoint="python",
args=["-m", "my_train"],
mounts=[
# Bind-mount a host directory
BindMount(src_path="/data/datasets", dst_path="/mnt/data", read_only=True),
# Mount a persistent volume (Kubernetes PVC = Persistent Volume Claim, etc.)
VolumeMount(src="my-pvc", dst_path="/mnt/checkpoints"),
],
)
Runtime Macros¶
Substitute scheduler-assigned values into args, env, and metadata
at runtime:
Macro |
Value |
|---|---|
|
Scheduler-assigned job ID |
|
Per-role replica index ( |
|
Root directory of the pulled image |
|
Name of the env var holding the rank-0 host address (resolve via shell expansion or application code; not available on all schedulers) |
from torchx.specs import macros
role = specs.Role(
name="trainer",
image="my_image:latest",
entrypoint="python",
args=[
"-m", "my_train",
"--job_id", macros.app_id, # scheduler-assigned job ID
"--replica", macros.replica_id, # 0, 1, 2, ...
],
env={
"IMG_ROOT": macros.img_root, # root dir of pulled image
},
)
Scheduler Config¶
with get_runner() as runner:
app_handle = runner.run(
app,
scheduler="kubernetes",
cfg={
"namespace": "my-namespace",
"image_repo": "my-registry.example.com/images",
"queue": "default",
},
)
# Or via .torchxconfig file:
# [kubernetes]
# namespace=my-namespace
# image_repo=my-registry.example.com/images
Scheduler Reference¶
Name |
Backend |
Notes |
|---|---|---|
|
Current working directory |
No container; runs as subprocesses. Good for development. |
|
Local Docker daemon |
Builds patched image from local workspace. |
|
Kubernetes |
Creates |
|
Slurm HPC |
Generates |
|
AWS Batch |
Creates job definitions and submits jobs. |
|
AWS SageMaker |
Creates training jobs. |
Named Resources Reference¶
Common cloud-agnostic sizes. For the complete list see Named Resources.
Name |
CPU |
GPU |
Memory |
|---|---|---|---|
|
8 |
1 |
32 GiB |
|
16 |
2 |
64 GiB |
|
32 |
4 |
128 GiB |
|
64 |
8 |
256 GiB |
|
1 |
0 |
2 GiB |
|
2 |
0 |
4 GiB |
|
2 |
0 |
8 GiB |
|
8 |
0 |
32 GiB |
See torchx.specs.named_resources_aws for AWS instance-type
resources (e.g. aws_p3.2xlarge, aws_m5.2xlarge).
Anti-Patterns¶
Avoid |
Prefer |
|---|---|
Hard-coding |
|
|
|
|
Separate |
Constructing |
Accept |
Using |
|
Parsing |
Use scheduler-native log APIs — log completeness is not guaranteed |
Plugin Implementor Quick Reference¶
Note
The rest of this page is for platform engineers extending TorchX. Job authors can stop here. For full extension guides see Advanced Usage and torchx.schedulers.
Condensed skeletons linking to full guides.
Scheduler Plugin¶
from dataclasses import dataclass
from typing import Any, Mapping
from torchx.plugins import register
from torchx.schedulers.api import (
DescribeAppResponse,
ListAppResponse,
Scheduler,
StructuredOpts,
)
from torchx.specs import AppDef, AppDryRunInfo, CfgVal, runopts
@dataclass
class MyRequest:
"""Native request type — passed from _submit_dryrun to schedule."""
job_name: str
cmd: list[str]
@dataclass
class MyOpts(StructuredOpts):
cluster: str = "default"
"""Cluster to submit to."""
class MyScheduler(Scheduler[Mapping[str, CfgVal]]):
def __init__(self, session_name: str, **kwargs: object) -> None:
super().__init__("my_backend", session_name)
def _run_opts(self) -> runopts:
return MyOpts.as_runopts()
def _submit_dryrun(self, app: AppDef, cfg: Mapping[str, CfgVal]) -> AppDryRunInfo[MyRequest]:
opts = MyOpts.from_cfg(cfg)
role = app.roles[0]
return AppDryRunInfo(MyRequest(app.name, [role.entrypoint, *role.args]), repr)
def schedule(self, dryrun_info: AppDryRunInfo[MyRequest]) -> str:
request: MyRequest = dryrun_info.request
return "job-id-123" # submit to backend, return ID
def describe(self, app_id: str) -> DescribeAppResponse | None:
return DescribeAppResponse(app_id=app_id)
def list(self, cfg: Mapping[str, CfgVal] | None = None) -> list[ListAppResponse]:
return []
def _cancel_existing(self, app_id: str) -> None:
pass # cancel the job
# 4. Register the scheduler
@register.scheduler()
def create_scheduler(session_name: str, **kwargs: Any) -> MyScheduler:
return MyScheduler(session_name)
Abstract methods: _submit_dryrun, schedule, describe, list,
_cancel_existing. Optional: _run_opts, log_iter, close,
_validate, _pre_build_validate, _delete_existing.
See Implementing a Custom Scheduler for the full guide.
Workspace Plugin¶
from typing import Mapping
from torchx.specs import CfgVal, Role, runopts
from torchx.workspace import WorkspaceMixin
class MyWorkspaceMixin(WorkspaceMixin[None]):
def workspace_opts(self) -> runopts:
opts = runopts()
opts.add("artifact_store", type_=str, required=True, help="Remote artifact store URL")
return opts
def caching_build_workspace_and_update_role(
self,
role: Role,
cfg: Mapping[str, CfgVal],
build_cache: dict[object, object],
) -> None:
if not role.workspace:
return
# ... build and upload workspace, then update role ...
role.env["WORKSPACE_URL"] = f"{cfg.get('artifact_store')}/{role.name}"
Mix into a scheduler: class MyScheduler(MyWorkspaceMixin, Scheduler[...]): ...
See torchx.workspace for the full API.
Tracker Plugin¶
from torchx.tracker.api import TrackerBase, TrackerArtifact, TrackerSource, Lineage
from typing import Iterable, Mapping
class MyTracker(TrackerBase):
def __init__(self, connection_str: str) -> None:
self._conn = connection_str
# --- write methods ---
def add_artifact(
self, run_id: str, name: str, path: str,
metadata: Mapping[str, object] | None = None,
) -> None: ...
def add_metadata(self, run_id: str, **kwargs: object) -> None: ...
def add_source(
self, run_id: str, source_id: str,
artifact_name: str | None = None,
) -> None: ...
# --- read methods ---
def artifacts(self, run_id: str) -> Mapping[str, TrackerArtifact]: ...
def metadata(self, run_id: str) -> Mapping[str, object]: ...
def sources(
self, run_id: str, artifact_name: str | None = None,
) -> Iterable[TrackerSource]: ...
def lineage(self, run_id: str) -> Lineage: ...
def run_ids(self, **kwargs: str) -> Iterable[str]: ...
def create(config: str | None) -> TrackerBase:
return MyTracker(connection_str=config or "default://localhost")
Abstract methods: add_artifact, artifacts, add_metadata,
metadata, add_source, sources, lineage, run_ids.
See Registering Custom Trackers for the full guide.
CLI Command Plugin¶
import argparse
from torchx.cli.cmd_base import SubCommand
class CmdMyTool(SubCommand):
def add_arguments(self, subparser: argparse.ArgumentParser) -> None:
subparser.add_argument("--config", type=str, help="Path to config file")
subparser.add_argument("app_id", type=str, help="Application handle")
def run(self, args: argparse.Namespace) -> None:
print(f"Running my_tool on {args.app_id} with config={args.config}")
Abstract methods: add_arguments, run.
See Registering Custom CLI Commands for the full guide.
Plugin Registration¶
Recommended: ``@register`` decorator — place your plugin in a
torchx_plugins/<group>/ namespace package and decorate it:
# torchx_plugins/schedulers/my_scheduler.py
from torchx.plugins import register
@register.scheduler()
def my_sched(session_name: str, **kwargs) -> Scheduler:
return MyScheduler(session_name, **kwargs)
# torchx_plugins/named_resources/my_resources.py
@register.named_resource()
def gpu_x4() -> Resource:
return Resource(cpu=64, gpu=4, memMB=244_000)
# torchx_plugins/tracker/my_tracker.py
@register.tracker()
def my_tracker(config: str | None) -> TrackerBase:
return MyTracker(config)
See torchx.plugins for the full API reference and Advanced Usage for packaging and diagnostics.
Legacy: entry points (deprecated)
[project.entry-points."torchx.schedulers"]
my_sched = "my_pkg:create_scheduler"
[project.entry-points."torchx.named_resources"]
gpu_x4 = "my_pkg.resources:gpu_x4"
[project.entry-points."torchx.components"]
myco = "my_pkg.components"
[project.entry-points."torchx.tracker"]
my_tracker = "my_pkg.tracking:create"
[project.entry-points."torchx.cli.cmds"]
my_tool = "my_pkg.cli:CmdMyTool"
See Advanced Usage for the full registration guide.
See also
- torchx.specs
Full API documentation for AppDef, Role, Resource, and related types.
- torchx.runner
Full API documentation for Runner and get_runner.
- torchx.plugins
Plugin API reference (
@register,find(),PluginRegistry).- torchx.schedulers
Scheduler API reference and implementation guide.
- torchx.workspace
Workspace API reference and custom workspace mixin guide.
- torchx.tracker
Tracker API reference and backend implementations.
- CLI
CLI module API reference and custom command guide.
- Custom Components
Step-by-step guide for writing and launching custom components.
- Component Best Practices
Best practices for authoring reusable components.