torchx.specs

Tip

API reference for TorchX’s core data types. For a conceptual overview see Basic Concepts. For copy-pasteable recipes see Quick Reference.

Core TorchX types for defining distributed applications.

The main types are AppDef, Role, and Resource. Components are functions that return an AppDef which can then be launched via a Scheduler.

>>> import torchx.specs as specs
>>> app = specs.AppDef(
...     name="echo",
...     roles=[specs.Role(name="worker", image="/tmp", entrypoint="/bin/echo", args=["hello"])],
... )
>>> app.name
'echo'

AppDef

class torchx.specs.AppDef(name: str, roles: list[torchx.specs.api.Role] = <factory>, metadata: dict[str, str] = <factory>)[source]

A distributed application composed of one or more Roles.

>>> from torchx.specs import AppDef, Role
>>> app = AppDef(
...     name="my_train",
...     roles=[Role(name="trainer", image="my_image:latest")],
... )
Parameters:

metadata – scheduler-specific metadata (treatment varies by scheduler)

Role

class torchx.specs.Role(name: str, image: str, min_replicas: int | None = None, entrypoint: str = '<MISSING>', args: list[str] = <factory>, env: dict[str, str] = <factory>, num_replicas: int = 1, max_retries: int = 0, retry_policy: ~torchx.specs.api.RetryPolicy = RetryPolicy.APPLICATION, resource: ~torchx.specs.api.Resource = <factory>, port_map: dict[str, int] = <factory>, metadata: dict[str, typing.Any] = <factory>, mounts: list[torchx.specs.api.BindMount | torchx.specs.api.VolumeMount | torchx.specs.api.DeviceMount] = <factory>, workspace: torchx.specs.api.Workspace | None = None, overrides: dict[str, typing.Any] = <factory>)[source]

A set of nodes that perform a specific duty within an AppDef.

  • DDP app — single role (trainer)

  • Parameter-server app — multiple roles (trainer, ps)

>>> from torchx.specs import Role, Resource
>>> trainer = Role(
...     name="trainer",
...     image="pytorch/torch:latest",
...     entrypoint="train.py",
...     args=["--lr", "0.01"],
...     num_replicas=4,
...     resource=Resource(cpu=4, gpu=1, memMB=8192),
... )
Parameters:
  • name – name of the role

  • image – software bundle installed on the container (docker image, fbpkg, tar-ball, etc.)

  • entrypoint – command to invoke inside the container

  • args – arguments to the entrypoint

  • env – environment variable mappings

  • num_replicas – number of container replicas

  • min_replicas – minimum replicas for elastic scaling. If unset or unsupported by the scheduler, the job runs at num_replicas.

  • max_retries – max number of retries before giving up

  • retry_policy – retry behavior upon failures

  • resource – resource requirements per replica

  • port_map – named port mappings (e.g. {"tensorboard": 8081})

  • metadata – scheduler-specific data. Keys should follow $scheduler.$key.

  • mounts – bind, volume, or device mounts

  • workspace – local project directories to mirror on the remote job. The workspace argument on Runner APIs overrides this on roles[0].

pre_proc(scheduler: str, dryrun_info: AppDryRunInfo) AppDryRunInfo[source]

Hook for role-specific scheduler request modifications.

Called per-role during Scheduler.submit_dryrun, in the order they appear in AppDef.roles.
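To illustrate the hook's shape, here is a minimal pure-Python sketch with stand-in types (FakeDryRunInfo and LabeledRole are hypothetical, not part of torchx); a real implementation would subclass Role and mutate the scheduler-specific request wrapped by AppDryRunInfo:

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class FakeDryRunInfo:
    """Stand-in for AppDryRunInfo: just wraps a raw request dict."""

    request: dict[str, Any]


@dataclass
class LabeledRole:
    """Stand-in role that injects labels into the scheduler request."""

    name: str
    labels: dict[str, str] = field(default_factory=dict)

    def pre_proc(self, scheduler: str, dryrun_info: FakeDryRunInfo) -> FakeDryRunInfo:
        # Role-specific modification of the raw scheduler request.
        if scheduler == "kubernetes":
            dryrun_info.request.setdefault("labels", {}).update(self.labels)
        return dryrun_info


role = LabeledRole(name="trainer", labels={"team": "ml"})
info = role.pre_proc("kubernetes", FakeDryRunInfo(request={}))
print(info.request)  # {'labels': {'team': 'ml'}}
```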

class torchx.specs.RetryPolicy(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Defines the retry policy for the Roles in the AppDef. The policy defines the behavior when the role replica encounters a failure:

  1. unsuccessful (non-zero) exit code

  2. hardware/host crashes

  3. preemption

  4. eviction

Note

Not all retry policies are supported by all schedulers. However all schedulers must support RetryPolicy.APPLICATION. Please refer to the scheduler’s documentation for more information on the retry policies they support and behavior caveats (if any).

  1. REPLICA: Replaces the replica instance. Surviving replicas are untouched.

    Use with dist.ddp component to have torchelastic coordinate restarts and membership changes. Otherwise, it is up to the application to deal with failed replica departures and replacement replica admittance.

  2. APPLICATION: Restarts the entire application.

  3. ROLE: Restarts the role when any error occurs in that role. This does not restart the whole job.

Resource

class torchx.specs.Resource(cpu: int, gpu: int, memMB: int, capabilities: dict[str, typing.Any] = <factory>, devices: dict[str, int] = <factory>, tags: dict[str, object] = <factory>)[source]

Represents resource requirements for a Role.

Important

Prefer resource() with named resources (t-shirt sizes) over specifying raw values directly.

>>> from torchx.specs import Resource
>>> Resource(cpu=4, gpu=1, memMB=8192)
Resource(cpu=4, gpu=1, memMB=8192, capabilities={}, devices={}, tags={})
Parameters:
  • cpu – number of logical cpu cores

  • gpu – number of gpus

  • memMB – MB of ram

  • capabilities – additional hardware specs (interpreted by scheduler)

  • devices – named devices with their quantities (e.g. {"vpc.amazonaws.com/efa": 1})

  • tags – metadata tags (not interpreted by schedulers)

static copy(original: Resource, **capabilities: Any) Resource[source]

Copies a resource, merging in the given capabilities.

get_resource_name() str | None[source]

Return the registered named-resource name, or None.

Set automatically by register.named_resource() on every resource it creates.

is_fractional() bool[source]

Return True if this resource is a fractional slice of a base resource.

Set automatically by register.named_resource() when a fractionals argument is provided.

torchx.specs.resource(cpu: int | None = None, gpu: int | None = None, memMB: int | None = None, h: str | None = None) Resource[source]

Creates a Resource from raw specs or a named resource.

When h is set, it takes precedence (raw specs are ignored). See Registering Named Resources for custom named resources.

>>> from torchx.specs import resource
>>> resource(cpu=4, gpu=1, memMB=8192)
Resource(cpu=4, gpu=1, memMB=8192, capabilities={}, devices={}, tags={})

Workspace

class torchx.specs.Workspace(projects: dict[str, str])[source]

Maps local project directories to remote workspace locations. At submit-time, files are copied/synced so that the remote job mirrors local code changes.

>>> from torchx.specs import Workspace
>>> # copies ~/github/torch/** into $REMOTE_ROOT/torch/**
>>> ws = Workspace(projects={"~/github/torch": "torch"})
>>> # copies ~/github/torch/** into $REMOTE_ROOT/** (no sub-dir)
>>> ws = Workspace(projects={"~/github/torch": ""})

The exact $REMOTE_ROOT is implementation-dependent. See WorkspaceMixin and scheduler docs.

Parameters:

projects – {local_path: remote_subdir} mapping.

is_unmapped_single_project() bool[source]

True if this is a single-project workspace with no target sub-directory.

merge_into(outdir: str | pathlib.Path) None[source]

Copies each project into {outdir}/{target}.
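As a rough sketch of the copy semantics (plain shutil, not the torchx implementation; merge_into below is illustrative):

```python
import shutil
import tempfile
from pathlib import Path


def merge_into(projects: dict[str, str], outdir: Path) -> None:
    """Copy each local project into {outdir}/{target} (illustrative sketch)."""
    for src, target in projects.items():
        dst = outdir / target if target else outdir
        shutil.copytree(src, dst, dirs_exist_ok=True)


with tempfile.TemporaryDirectory() as tmp:
    # Fake local project: proj/pkg/main.py
    src = Path(tmp) / "proj"
    (src / "pkg").mkdir(parents=True)
    (src / "pkg" / "main.py").write_text("print('hi')")

    out = Path(tmp) / "remote"
    merge_into({str(src): "proj"}, out)
    assert (out / "proj" / "pkg" / "main.py").exists()
```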

Macros

class torchx.specs.macros[source]

Template variables substituted at runtime in Role.args, Role.env, and Role.metadata.

Warning

Macros in other Role fields are NOT substituted.

Available macros:

  1. img_root — root directory of the pulled image

  2. app_id — application id as assigned by the scheduler

  3. replica_id — per-role replica index (0, 1, ...). When a replica is replaced after failure, the replacement retains the same replica_id.

>>> from torchx.specs import AppDef, Role, macros
>>> trainer = Role(
...     name="trainer",
...     image="my_image:latest",
...     entrypoint="train.py",
...     args=["--app_id", macros.app_id],
...     env={"IMG_ROOT": macros.img_root},
... )
>>> app = AppDef("train_app", roles=[trainer])
class Values(img_root: str, app_id: str, replica_id: str, rank0_env: str, base_img_root: str = 'DEPRECATED')[source]
apply(role: Role) Role[source]

Returns a deep copy of role with macros substituted.

substitute(arg: str) str[source]

Substitutes macro placeholders in arg.

to_dict() dict[str, Any][source]

Returns the macro values as a plain dict.

Note

In addition to the three macros listed in the class docstring, two more attributes exist:

  • macros.rank0_env – expands to the name of the environment variable that provides the rank-0 (master) host address. Resolve it via shell expansion ($${rank0_env}) or in application code. Not available on all schedulers.

  • macros.base_img_root – deprecated. Do not use in new code.
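The substitution itself can be pictured as simple placeholder replacement. The sketch below assumes "${name}"-style placeholders and hypothetical runtime values (the real logic lives in macros.Values.substitute / apply):

```python
# Hypothetical runtime values a scheduler might supply for one replica.
MACRO_VALUES = {"img_root": "/mnt/img", "app_id": "job-123", "replica_id": "0"}


def substitute(arg: str) -> str:
    """Replace each "${name}" placeholder with its runtime value (sketch)."""
    for name, value in MACRO_VALUES.items():
        arg = arg.replace("${" + name + "}", value)
    return arg


args = ["--app_id", "${app_id}", "--out", "${img_root}/logs"]
print([substitute(a) for a in args])
# ['--app_id', 'job-123', '--out', '/mnt/img/logs']
```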

Run Configs

torchx.specs.CfgVal = str | int | float | bool | list[str] | dict[str, str] | None


Type alias for run config values: str | int | float | bool | list[str] | dict[str, str] | None. Used in cfg dicts passed to run() and scheduler methods.

class torchx.specs.runopts[source]

Schema for scheduler run configuration.

Holds accepted config keys, defaults, and help strings. Constructed by Scheduler.run_opts() and validated at submit time.

>>> from torchx.specs import runopts
>>> opts = runopts()
>>> opts.add("cluster_id", type_=int, help="cluster to submit the job", required=True)
>>> opts.add("priority", type_=float, default=0.5, help="job priority")
>>> opts.add("preemptible", type_=bool, default=False, help="is the job preemptible")

Note

For new schedulers, prefer StructuredOpts which auto-generates runopts from typed dataclass fields.

add(cfg_key: str, type_: Type[str | int | float | bool | list[str] | dict[str, str] | None], help: str, default: str | int | float | bool | list[str] | dict[str, str] | None = None, required: bool = False) None[source]

Registers a config option. Required options must not have a default.

cfg_from_json_repr(json_repr: str) dict[str, str | int | float | bool | list[str] | dict[str, str] | None][source]

Parses the given JSON string into a valid cfg for this runopts object.

cfg_from_str(cfg_str: str) dict[str, str | int | float | bool | list[str] | dict[str, str] | None][source]

Parses scheduler cfg from a string literal and returns a cfg map where the cfg values have been cast into the appropriate types as specified by this runopts object. Unknown keys are ignored and not returned in the resulting map.

Note

Unlike resolve(), this method does NOT fill in default options or check that required options are actually present in cfg_str. It is intended to be called before resolve() when the input is a string-encoded run cfg; that is, to fully resolve the cfg, call opt.resolve(opt.cfg_from_str(cfg_literal)).

If the cfg_str is an empty string, then an empty cfg is returned. Otherwise, at least one kv-pair delimited by "=" (equal) is expected.

Either "," (comma) or ";" (semi-colon) can be used to delimit multiple kv-pairs.

CfgVal allows List of primitives, which can be passed as either "," or ";" (semi-colon) delimited. Since the same delimiters are used to delimit between cfg kv pairs, this method interprets the last (trailing) "," or ";" as the delimiter between kv pairs. See example below.

Examples:

opts = runopts()
opts.add("FOO", type_=List[str], default=["a"], help="an optional list option")
opts.add("BAR", type_=str, required=True, help="a required str option")

# required and default options not checked
# method returns strictly parsed cfg from the cfg literal string
opts.cfg_from_str("") == {}

# however, unknown options are ignored
# since the value type is unknown hence cannot cast to the correct type
opts.cfg_from_str("UNKNOWN=VALUE") == {}

opts.cfg_from_str("FOO=v1") == {"FOO": "v1"}

opts.cfg_from_str("FOO=v1,v2") == {"FOO": ["v1", "v2"]}
opts.cfg_from_str("FOO=v1;v2") == {"FOO": ["v1", "v2"]}

opts.cfg_from_str("FOO=v1,v2,BAR=v3") == {"FOO": ["v1", "v2"], "BAR": "v3"}
opts.cfg_from_str("FOO=v1;v2,BAR=v3") == {"FOO": ["v1", "v2"], "BAR": "v3"}
opts.cfg_from_str("FOO=v1;v2;BAR=v3") == {"FOO": ["v1", "v2"], "BAR": "v3"}
get(name: str) torchx.specs.api.runopt | None[source]

Returns the registered option, or None.

Accepts camelCase names (e.g. "clusterName" resolves to "cluster_name").

static is_type(obj: str | int | float | bool | list[str] | dict[str, str] | None, tp: Type[str | int | float | bool | list[str] | dict[str, str] | None]) bool[source]

Like isinstance() but supports generic types (e.g. list[str]).

resolve(cfg: Mapping[str, str | int | float | bool | list[str] | dict[str, str] | None]) dict[str, str | int | float | bool | list[str] | dict[str, str] | None][source]

Validates cfg against registered options, filling defaults.

Raises InvalidRunConfigException for missing required options or type mismatches. Accepts camelCase keys.
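The resolve semantics can be sketched in plain Python (the registered dict below is an illustrative stand-in for the registered options, not the torchx data model, and a plain ValueError stands in for InvalidRunConfigException):

```python
# Stand-in for options registered via runopts.add() (illustrative only).
registered = {
    "cluster_id": {"type": int, "required": True, "default": None},
    "priority": {"type": float, "required": False, "default": 0.5},
}


def resolve(cfg: dict) -> dict:
    """Validate cfg against registered options, filling defaults (sketch)."""
    out = {}
    for key, opt in registered.items():
        if key in cfg:
            if not isinstance(cfg[key], opt["type"]):
                raise ValueError(f"{key}: expected {opt['type'].__name__}")
            out[key] = cfg[key]
        elif opt["required"]:
            raise ValueError(f"missing required option: {key}")
        else:
            out[key] = opt["default"]
    return out


print(resolve({"cluster_id": 7}))  # {'cluster_id': 7, 'priority': 0.5}
```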

class torchx.specs.runopt(default: str | int | float | bool | list[str] | dict[str, str] | None, opt_type: Type[str | int | float | bool | list[str] | dict[str, str] | None], is_required: bool, help: str)[source]

Metadata for a single scheduler run option.

cast_to_type(value: str) str | int | float | bool | list[str] | dict[str, str] | None[source]

Casts the given value (in its string representation) to the type of this run option. Below are the cast rules for each option type and value literal:

  1. opt_type=str, value="foo" -> "foo"

  2. opt_type=bool, value="True"/"False" -> True/False

  3. opt_type=int, value="1" -> 1

  4. opt_type=float, value="1.1" -> 1.1

  5. opt_type=list[str]/List[str], value="a,b,c" or value="a;b;c" -> ["a", "b", "c"]

  6. opt_type=dict[str,str]/Dict[str,str], value="a:1,b:2" or value="a:1;b:2" -> {"a": "1", "b": "2"}

NOTE: dict parsing uses ":" as the kv separator (rather than the standard "=") because "=" is already used at the top level to parse runopts (notice the plural) from the CLI. Originally torchx only supported primitives and list[str] as CfgVal; dict[str,str] was added in https://github.com/meta-pytorch/torchx/pull/855.
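The cast rules above can be sketched in plain Python (illustrative only, not the torchx implementation; bare list/dict stand in for list[str]/dict[str,str]):

```python
def cast_to_type(value: str, opt_type: type) -> object:
    """Cast a string literal per the runopt cast rules (sketch)."""
    if opt_type is bool:
        return value.lower() == "true"
    if opt_type in (int, float, str):
        return opt_type(value)
    if opt_type is list:  # list[str]: "," or ";" delimited
        return [v for v in value.replace(";", ",").split(",") if v]
    if opt_type is dict:  # dict[str,str]: ":" separates key from value
        return dict(pair.split(":", 1) for pair in value.replace(";", ",").split(","))
    raise TypeError(f"unsupported type: {opt_type}")


assert cast_to_type("1", int) == 1
assert cast_to_type("a,b,c", list) == ["a", "b", "c"]
assert cast_to_type("a:1;b:2", dict) == {"a": "1", "b": "2"}
```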

Structured Opts

class torchx.schedulers.api.StructuredOpts[source]

Base class for typed scheduler configuration options.

Provides a type-safe way to define scheduler run options as dataclass fields instead of manually building runopts. Subclasses should be @dataclass decorated with fields representing config options.

Features:
  • Auto-generates runopts from dataclass fields via as_runopts()

  • Parses raw config dicts into typed instances via from_cfg()

  • Supports snake_case field names with camelCase aliases

  • Extracts help text from field docstrings

  • Supports nested StructuredOpts fields, flattened with dot-prefixed keys (e.g., k8s.context)

Example

>>> from dataclasses import dataclass
>>> from torchx.schedulers.api import StructuredOpts
>>>
>>> @dataclass
... class MyOpts(StructuredOpts):
...     cluster_name: str
...     '''Name of the cluster to submit to.'''
...
...     num_retries: int = 3
...     '''Number of retry attempts.'''
...
>>> # Use in scheduler:
>>> # def _run_opts(self) -> runopts:
>>> #     return MyOpts.as_runopts()
>>> #
>>> # def _submit_dryrun(self, app, cfg):
>>> #     opts = MyOpts.from_cfg(cfg)
>>> #     # opts.cluster_name, opts.num_retries are typed
classmethod as_runopts() runopts[source]

Build runopts from dataclass fields.

Nested StructuredOpts fields are flattened with dot-prefixed keys (e.g., field k8s: K8sOpts with sub-field context becomes k8s.context).

classmethod from_cfg(cfg: Mapping[str, str | int | float | bool | list[str] | dict[str, str] | None]) Self[source]

Create an instance from a raw config dict.

Fields are snake_case but also accept camelCase aliases (e.g., hpc_identity can be set via hpcIdentity). Nested StructuredOpts fields are reconstructed from dot-prefixed keys (e.g., k8s.context).

get(k[, d]) D[k] if k in D, else d (d defaults to None)[source]
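The camelCase-to-snake_case aliasing can be pictured as a simple regex conversion (a sketch, not the torchx implementation):

```python
import re


def to_snake(key: str) -> str:
    """Insert "_" before each interior uppercase letter, then lowercase."""
    return re.sub(r"(?<!^)(?=[A-Z])", "_", key).lower()


print(to_snake("hpcIdentity"))  # hpc_identity
print(to_snake("clusterName"))  # cluster_name
```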

Run Status

class torchx.specs.AppStatus(state: ~torchx.specs.api.AppState, num_restarts: int = 0, msg: str = '', structured_error_msg: str = '<NONE>', ui_url: str | None = None, roles: list[torchx.specs.api.RoleStatus] = <factory>)[source]

Runtime status of an AppDef.

roles contains replica statuses for the most recent retry only.

format(filter_roles: list[str] | None = None) str[source]

Human-readable status string.

raise_for_status() None[source]

Raises AppStatusError if state is not SUCCEEDED.

class torchx.specs.AppState(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)[source]

State of the application. An application starts in an initial UNSUBMITTED state and moves through SUBMITTED, PENDING, and RUNNING before reaching a terminal state: SUCCEEDED, FAILED, or CANCELLED.

If the scheduler supports preemption, the app moves from a RUNNING state to PENDING upon preemption.

If the user stops the application, then the application state moves to STOPPED, then to CANCELLED when the job is actually cancelled by the scheduler.

  1. UNSUBMITTED - app has not been submitted to the scheduler yet

  2. SUBMITTED - app has been successfully submitted to the scheduler

  3. PENDING - app has been submitted to the scheduler pending allocation

  4. RUNNING - app is running

  5. SUCCEEDED - app has successfully completed

  6. FAILED - app has unsuccessfully completed

  7. CANCELLED - app was cancelled before completing

  8. UNKNOWN - app state is unknown
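A common pattern is polling status until a terminal state is reached. The sketch below uses a stand-in enum and an illustrative is_terminal helper (whether torchx exposes an equivalent helper is version dependent, so treat this as an assumption):

```python
from enum import Enum


class State(Enum):
    """Stand-in for torchx.specs.AppState (same member names)."""

    UNSUBMITTED = 0
    SUBMITTED = 1
    PENDING = 2
    RUNNING = 3
    SUCCEEDED = 4
    FAILED = 5
    CANCELLED = 6
    UNKNOWN = 7


# Terminal states per the list above.
TERMINAL = {State.SUCCEEDED, State.FAILED, State.CANCELLED}


def is_terminal(state: State) -> bool:
    return state in TERMINAL


assert is_terminal(State.SUCCEEDED)
assert not is_terminal(State.RUNNING)
```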

torchx.specs.ReplicaState

alias of AppState

class torchx.specs.AppDryRunInfo(request: T, fmt: Callable[[T], str])[source]

Returned by Scheduler.submit_dryrun.

Wraps the scheduler request that would have been submitted. print(info) yields a human-readable representation.

App Handle

torchx.specs.AppHandle = <class 'str'>

Type alias for str. An app handle is a string of the form {scheduler_backend}://{session_name}/{app_id}; see parse_app_handle().

torchx.specs.parse_app_handle(app_handle: str) ParsedAppHandle[source]

Parses {scheduler}://{session_name}/{app_id} into its components.

>>> from torchx.specs import parse_app_handle
>>> parse_app_handle("k8s://default/foo_bar")
ParsedAppHandle(scheduler_backend='k8s', session_name='default', app_id='foo_bar')
>>> parse_app_handle("k8s:///foo_bar")
ParsedAppHandle(scheduler_backend='k8s', session_name='', app_id='foo_bar')
class torchx.specs.ParsedAppHandle(scheduler_backend: str, session_name: str, app_id: str)[source]

Parsed components of an AppHandle.

app_id: str

Alias for field number 2

scheduler_backend: str

Alias for field number 0

session_name: str

Alias for field number 1

Mounts

torchx.specs.parse_mounts(opts: list[str]) list[torchx.specs.api.BindMount | torchx.specs.api.VolumeMount | torchx.specs.api.DeviceMount][source]

parse_mounts parses a list of options into typed mounts, following a format similar to Docker's bind-mount syntax.

Multiple mounts can be specified in the same list. type must be specified first in each.

Ex:

type=bind,src=/host,dst=/container,readonly,[type=bind,src=…,dst=…]

Supported types:

  • BindMount: type=bind,src=<host path>,dst=<container path>[,readonly]

  • VolumeMount: type=volume,src=<name/id>,dst=<container path>[,readonly]

  • DeviceMount: type=device,src=/dev/<dev>[,dst=<container path>][,perm=rwm]
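The grammar above can be pictured with a small hand-rolled parser (illustrative plain Python, not the torchx implementation; it returns dicts rather than the typed mount classes):

```python
def parse_mounts(opts: list[str]) -> list[dict]:
    """Parse mount options: each "type=" starts a new mount, bare tokens
    like "readonly" are boolean flags (sketch only)."""
    mounts: list[dict] = []
    for opt in opts:
        key, sep, val = opt.partition("=")
        if key == "type":
            mounts.append({"type": val})
        elif not mounts:
            raise ValueError("type must be specified first")
        elif sep:
            mounts[-1][key] = val
        else:
            mounts[-1][key] = True  # flag, e.g. readonly
    return mounts


print(parse_mounts(["type=bind", "src=/host", "dst=/container", "readonly"]))
# [{'type': 'bind', 'src': '/host', 'dst': '/container', 'readonly': True}]
```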

class torchx.specs.BindMount(src_path: str, dst_path: str, read_only: bool = False)[source]

Bind-mounts a host path into the worker container.

class torchx.specs.VolumeMount(src: str, dst_path: str, read_only: bool = False)[source]

Mounts a persistent volume into the worker container.

class torchx.specs.DeviceMount(src_path: str, dst_path: str, permissions: str = 'rwm')[source]

Mounts a host device into the container.

Overlays

Overlays patch the scheduler’s submit-job request with fields not representable in AppDef or Role.

Use set_overlay() / get_overlay() to store and retrieve overlays, and apply_overlay() to apply them.

For Users

Use set_overlay() to attach scheduler-specific fields to a Role or AppDef:

>>> from torchx.specs import Role
>>> from torchx.specs.overlays import set_overlay, get_overlay

>>> # Kubernetes: add a node selector to a role
>>> role = Role(name="trainer", image="my-image", entrypoint="train.py")
>>> set_overlay(role, "kubernetes", "V1Pod", {
...     "spec": {"nodeSelector": {"accelerator": "a100"}},
... })

>>> # Multiple set_overlay calls merge (dicts upsert, lists append)
>>> set_overlay(role, "kubernetes", "V1Pod", {
...     "spec": {"tolerations": [{"key": "gpu", "operator": "Exists"}]},
... })
>>> get_overlay(role, "kubernetes", "V1Pod")
{'spec': {'nodeSelector': {'accelerator': 'a100'}, 'tolerations': [{'key': 'gpu', 'operator': 'Exists'}]}}

Operators

By default, set_overlay() merges dicts and appends lists. Use PUT(), JOIN(), and DEL() as dict keys to override per-field behavior:

>>> from torchx.specs import Role
>>> from torchx.specs.overlays import set_overlay, PUT, JOIN, DEL

>>> role = Role(name="trainer", image="my-image", entrypoint="train.py")

>>> # PUT: replace a list instead of appending
>>> set_overlay(role, "kubernetes", "V1Pod", {
...     "spec": {PUT("containers"): [{"name": "only"}]},
... })

>>> # JOIN: strategic merge list items by key field
>>> set_overlay(role, "kubernetes", "V1Pod", {
...     "spec": {JOIN("initContainers", on="name"): [
...         {"name": "setup", "image": "init:v2"},
...     ]},
... })

>>> # DEL: remove a field (server uses its default)
>>> set_overlay(role, "kubernetes", "V1Pod", {DEL("hostNetwork"): None})

Operators are stored in metadata and resolved automatically when the scheduler calls apply_overlay() at submit time — users don’t need to call apply_overlay() directly.

Note

None vs DEL vs missing key

These three states produce different results:

  • Key missing from overlay: field is untouched in the base

  • "field": None: field is explicitly set to None/null. In thrift/protobuf, this means “field present but null” — different from key is missing (never set).

  • DEL("field"): None: field is removed from the base dict. In thrift/protobuf, this means “field not sent in request”.

For Scheduler Implementors

To add overlay support to a scheduler, use get_overlay() to retrieve stored overlays, validate_overlay() to guard against user error, and apply_overlay() to apply the overlay onto the scheduler’s base request dict.

from torchx.specs.overlays import apply_overlay, get_overlay, validate_overlay

class MyScheduler(Scheduler):
    def _submit_dryrun(self, app, cfg):
        # Build base request from Role attributes
        base_request = build_request_from_role(app.roles[0])

        # Retrieve and validate the overlay
        overlay = get_overlay(app.roles[0], "my_scheduler", "JobSpec")
        validate_overlay(
            overlay,
            blocklist=["command", "env"],  # fields set via Role attrs
            overlay_name="JobSpec",
        )

        # Apply overlay onto the base request
        apply_overlay(base_request, overlay)

        # base_request now has user's overlay fields merged in
        return base_request

apply_overlay() handles operator keys (PUT(), JOIN(), DEL()) automatically — the scheduler doesn’t need to know about them.

torchx.specs.overlays.DEL(key: str) str[source]

Remove a key from the base dict.

Use as a dict key in overlays. The value is ignored (convention: None).

This is different from setting a field to None: DEL removes the key entirely (thrift/protobuf: field not sent, server uses default), while "field": None sets it to null (thrift/protobuf: field present but null).

>>> from torchx.specs.overlays import apply_overlay, DEL
>>> base = {"keep": 1, "remove_me": "old"}
>>> apply_overlay(base, {DEL("remove_me"): None})
>>> base
{'keep': 1}
torchx.specs.overlays.JOIN(key: str, *, on: str) str[source]

Strategic merge list items by key field.

Matched items (same value for on field) have their fields merged. Unmatched items are appended. Use as a dict key in overlays.

>>> from torchx.specs.overlays import apply_overlay, JOIN
>>> base = {"containers": [{"name": "main", "image": "v1", "cpu": "1"}]}
>>> apply_overlay(base, {JOIN("containers", on="name"): [
...     {"name": "main", "memory": "1Gi"},
...     {"name": "sidecar", "image": "proxy"},
... ]})
>>> base
{'containers': [{'name': 'main', 'image': 'v1', 'cpu': '1', 'memory': '1Gi'}, {'name': 'sidecar', 'image': 'proxy'}]}
Raises:

TypeError – At apply time, if the base list contains non-dict items.

torchx.specs.overlays.PUT(key: str) str[source]

Replace a value entirely instead of merging/appending.

Use as a dict key in overlays. For lists this replaces instead of appending; for dicts this replaces instead of recursive merge.

>>> from torchx.specs.overlays import apply_overlay, PUT
>>> base = {"containers": [{"name": "old1"}, {"name": "old2"}]}
>>> apply_overlay(base, {PUT("containers"): [{"name": "only"}]})
>>> base
{'containers': [{'name': 'only'}]}
torchx.specs.overlays.apply_overlay(base: dict[str, Any], overlay: dict[str, Any], *, _resolve: bool = True) None[source]

Merge overlay into base in-place.

Default rules:

  1. dict → recursive merge (upsert keys)

  2. list → append overlay items

  3. primitive → overwrite value

Operators (use as dict keys in overlays):

  1. PUT() → replace value entirely (lists, dicts, or primitives)

  2. JOIN() → strategic merge list items by key field

  3. DEL() → remove key from base

During accumulation (multiple set_overlay() calls), operators for the same field replace earlier operations — last call wins.

>>> from torchx.specs.overlays import apply_overlay, PUT, JOIN, DEL

>>> # Dicts merge recursively, lists append, primitives overwrite
>>> base = {"spec": {"cpu": "500m"}, "tags": ["prod"], "replicas": 1}
>>> apply_overlay(base, {"spec": {"memory": "1Gi"}, "tags": ["gpu"], "replicas": 3})
>>> base
{'spec': {'cpu': '500m', 'memory': '1Gi'}, 'tags': ['prod', 'gpu'], 'replicas': 3}

>>> # PUT replaces a list instead of appending
>>> base = {"containers": [{"name": "old1"}, {"name": "old2"}]}
>>> apply_overlay(base, {PUT("containers"): [{"name": "only"}]})
>>> base
{'containers': [{'name': 'only'}]}

>>> # JOIN: match containers by name, merge their fields
>>> base = {"containers": [{"name": "main", "image": "v1"}]}
>>> apply_overlay(base, {JOIN("containers", on="name"): [
...     {"name": "main", "memory": "1Gi"},
... ]})
>>> base
{'containers': [{'name': 'main', 'image': 'v1', 'memory': '1Gi'}]}

>>> # DEL: remove a key from the base
>>> base = {"keep": 1, "remove": "old"}
>>> apply_overlay(base, {DEL("remove"): None})
>>> base
{'keep': 1}
Parameters:

_resolve – Internal only; do not pass. When True (default), operator keys resolve to their operations on plain field names. When False, operator keys are stored as-is (used by set_overlay() for accumulation).

torchx.specs.overlays.get_overlay(target: AppDef | Role, namespace: str, kind: str) _Overlay[source]

Retrieve overlay from target.metadata[namespace][kind].

Returns {} if not found. If metadata[namespace] is a string, it is loaded as a file URI via fsspec (JSON or YAML).

For backwards compatibility, if kind is not a key in metadata[namespace], the entire namespace dict is returned as a flat overlay (with a deprecation warning).

torchx.specs.overlays.set_overlay(target: AppDef | Role, namespace: str, kind: str, overlay: _Overlay) None[source]

Store an overlay in target.metadata[namespace][kind].

Multiple calls for the same (namespace, kind) accumulate via apply_overlay() (dicts merge, lists append). Use PUT(), JOIN(), and DEL() operators in the overlay dict to control per-field behavior.

Parameters:
  • namespace – Scheduler namespace (e.g., "kubernetes", "mast").

  • kind – Scheduler struct type (e.g., "V1Pod", "HpcJobDefinition").

torchx.specs.overlays.validate_overlay(overlay: dict[str, Any], *, blocklist: list[str] | None = None, forbidden_keys: set[str] | None = None, overlay_name: str = 'overlay', suggestion: str = '') None[source]

Validate that overlay doesn’t contain disallowed keys.

Used by scheduler authors to guard against user error. Operator-prefixed keys (e.g., PUT("env")) are resolved to their logical field name before checking against the blocklist.

Parameters:
  • blocklist – Keys that should be set via Role/AppDef attributes.

  • forbidden_keys – Keys that belong to a different overlay type.

  • overlay_name – Overlay type name for error messages.

  • suggestion – Hint appended when forbidden_keys are found.

Raises:

ValueError – If validation fails.

>>> from torchx.specs.overlays import validate_overlay

>>> # "env" is blocklisted — should be set via Role.env
>>> try:
...     validate_overlay(
...         {"env": {"FOO": "bar"}, "nodeSelector": {"gpu": "true"}},
...         blocklist=["env", "command"],
...         overlay_name="PodSpec",
...     )
... except ValueError as e:
...     "env" in str(e)
True

Named Resources

Use resource() with the h parameter to look up a named resource:

from torchx.specs import resource
resource(h="gpu.small")   # generic t-shirt size
resource(h="aws_p3.2xlarge")  # AWS instance type

See Registering Named Resources for defining custom named resources.

Generic Named Resources

Defines generic named resources that are not specific to any cloud provider’s instance types. These generic named resources are meant to be used as default values for components and examples and are NOT meant to be used long term as the specific capabilities (e.g. number of cpu, gpu, memMB) are subject to change.

Note

The named resources in this file DO NOT include device capabilities such as special network interfaces (e.g. EFA devices on AWS).

Warning

Do not use for launching applications that require specific capabilities (e.g. needs exactly 4 x A100 GPUs with 40GB of memory connected with NVLink).

Different cloud providers offer different instance types, so in practice you should register your own named resources that accurately capture the instances at your disposal rather than relying on these defaults long term.

Note

The cpu/gpu/memory ratios in these default resources are based on current HW trends and do not map exactly to a particular instance type!

Warning

The specific capabilities of these default resources are subject to change at any time based on current hardware spec trends. Therefore, the user should NEVER assume that the specific number of cpu, gpu, and memMB will always remain the same. For instance, never assume that gpu.small will always have 8 cpus.


Name       | CPU | GPU | Memory
---------- | --- | --- | -------
gpu.small  | 8   | 1   | 32 GiB
gpu.medium | 16  | 2   | 64 GiB
gpu.large  | 32  | 4   | 128 GiB
gpu.xlarge | 64  | 8   | 256 GiB
cpu.nano   | 1   | 0   | 512 MiB
cpu.micro  | 1   | 0   | 1 GiB
cpu.small  | 1   | 0   | 2 GiB
cpu.medium | 2   | 0   | 4 GiB
cpu.large  | 2   | 0   | 8 GiB
cpu.xlarge | 8   | 0   | 32 GiB

AWS Named Resources

torchx.specs.named_resources_aws contains resource definitions that correspond to AWS instance types taken from https://aws.amazon.com/ec2/instance-types/. The resources are exposed via entrypoints after installing the torchx lib; the mapping is defined in setup.py.

The named resources currently do not encode AWS instance type capabilities; they merely represent the equivalent resource in cpu, gpu, and memMB numbers.

Note

These resource definitions may change in the future. Each user is expected to manage their own resources. See Registering Named Resources to set up named resources.

Usage:

from torchx.specs import named_resources
print(named_resources["aws_t3.medium"])
print(named_resources["aws_m5.2xlarge"])
print(named_resources["aws_p3.2xlarge"])
print(named_resources["aws_p3.8xlarge"])

Component Linter

torchx.specs.file_linter.validate(path: str, component_function: str, validators: list[torchx.specs.file_linter.ComponentFunctionValidator] | None = None) list[torchx.specs.file_linter.LinterMessage][source]

Validates the function to make sure it complies with the component standard.

validate finds component_function in the file at path and validates it according to the following rules:

  1. The function must have a google-style docstring

  2. All function parameters must be annotated

  3. The function must return torchx.specs.api.AppDef

Parameters:
  • path – Path to python source file.

  • component_function – Name of the function to be validated.

Returns:

List of validation errors

Return type:

List[LinterMessage]
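For reference, a component that satisfies all three rules might look like the sketch below (the AppDef here is a stand-in dataclass so the example is self-contained; a real component would return torchx.specs.AppDef):

```python
from dataclasses import dataclass, field


@dataclass
class AppDef:
    """Stand-in for torchx.specs.AppDef."""

    name: str
    roles: list = field(default_factory=list)


def echo(msg: str = "hello", image: str = "/tmp") -> AppDef:
    """Echos a message to stdout.

    Args:
        msg: message to echo
        image: image to run the echo in
    """
    # google-style docstring + annotated params + AppDef return type:
    # the three rules the linter checks.
    return AppDef(name="echo")


assert echo().name == "echo"
```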

torchx.specs.file_linter.get_fn_docstring(fn: Callable[[...], object]) tuple[str, dict[str, str]][source]

Parses the function and argument descriptions from the provided function's docstring, which should be in google-style format.

If the function has no docstring, the function description falls back to the function name (with a tip on how to improve the help message) and each argument's description falls back to the argument name.

Arguments not present in the docstring carry default/required information instead.

Parameters:

fn – Function with or without docstring

Returns:

A tuple of (function description, argument descriptions), where argument descriptions map each argument name to its description.

class torchx.specs.file_linter.LinterMessage(name: str, description: str, line: int, char: int, severity: str = 'error')[source]

See also

Quick Reference

Single-page reference with imports, types, and copy-pasteable recipes.

torchx.runner

The Runner API that submits AppDefs as jobs.

Advanced Usage

Registering named resources, custom components, and other plugins.
