torchx.specs¶
Tip
API reference for TorchX’s core data types. For a conceptual overview see Basic Concepts. For copy-pasteable recipes see Quick Reference.
Core TorchX types for defining distributed applications.
The main types are AppDef, Role, and Resource.
Components are functions that return an AppDef which can then be launched
via a Scheduler.
>>> import torchx.specs as specs
>>> app = specs.AppDef(
... name="echo",
... roles=[specs.Role(name="worker", image="/tmp", entrypoint="/bin/echo", args=["hello"])],
... )
>>> app.name
'echo'
AppDef¶
- class torchx.specs.AppDef(name: str, roles: list[torchx.specs.api.Role] = <factory>, metadata: dict[str, str] = <factory>)[source]¶
A distributed application composed of one or more Roles.

>>> from torchx.specs import AppDef, Role
>>> app = AppDef(
...     name="my_train",
...     roles=[Role(name="trainer", image="my_image:latest")],
... )
- Parameters:
name – name of the application
roles – list of Roles that make up the application
metadata – scheduler-specific metadata (treatment varies by scheduler)
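A hedged sketch of attaching scheduler-specific metadata (the kubernetes.service_account key below is illustrative, not a documented key):

>>> from torchx.specs import AppDef, Role
>>> app = AppDef(
...     name="my_train",
...     roles=[Role(name="trainer", image="my_image:latest")],
...     metadata={"kubernetes.service_account": "training-sa"},
... )
>>> app.metadata
{'kubernetes.service_account': 'training-sa'}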
Role¶
- class torchx.specs.Role(name: str, image: str, min_replicas: int | None = None, entrypoint: str = '<MISSING>', args: list[str] = <factory>, env: dict[str, str] = <factory>, num_replicas: int = 1, max_retries: int = 0, retry_policy: ~torchx.specs.api.RetryPolicy = RetryPolicy.APPLICATION, resource: ~torchx.specs.api.Resource = <factory>, port_map: dict[str, int] = <factory>, metadata: dict[str, typing.Any] = <factory>, mounts: list[torchx.specs.api.BindMount | torchx.specs.api.VolumeMount | torchx.specs.api.DeviceMount] = <factory>, workspace: torchx.specs.api.Workspace | None = None, overrides: dict[str, typing.Any] = <factory>)[source]¶
A set of nodes that perform a specific duty within an AppDef.

- DDP app — single role (trainer)
- Parameter-server app — multiple roles (trainer, ps)

>>> from torchx.specs import Role, Resource
>>> trainer = Role(
...     name="trainer",
...     image="pytorch/torch:latest",
...     entrypoint="train.py",
...     args=["--lr", "0.01"],
...     num_replicas=4,
...     resource=Resource(cpu=4, gpu=1, memMB=8192),
... )
- Parameters:
name – name of the role
image – software bundle installed on the container (docker image, fbpkg, tar-ball, etc.)
entrypoint – command to invoke inside the container
args – arguments to the entrypoint
env – environment variable mappings
num_replicas – number of container replicas
min_replicas – minimum replicas for elastic scaling. If unset or unsupported by the scheduler, the job runs at num_replicas.
max_retries – max number of retries before giving up
retry_policy – retry behavior upon failures
resource – resource requirements per replica
port_map – named port mappings (e.g. {"tensorboard": 8081})
metadata – scheduler-specific data. Keys should follow $scheduler.$key.
mounts – bind, volume, or device mounts
workspace – local project directories to mirror on the remote job. The workspace argument on Runner APIs overrides this on roles[0].
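Pulling the parameters together, a hedged sketch of an elastic role with a port mapping and a bind mount (the paths and values are illustrative):

>>> from torchx.specs import BindMount, Resource, Role
>>> worker = Role(
...     name="worker",
...     image="pytorch/torch:latest",
...     entrypoint="train.py",
...     num_replicas=8,
...     min_replicas=4,  # scheduler may elastically run between 4 and 8 replicas
...     port_map={"tensorboard": 8081},
...     mounts=[BindMount(src_path="/mnt/data", dst_path="/data", read_only=True)],
...     resource=Resource(cpu=8, gpu=1, memMB=16384),
... )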
- pre_proc(scheduler: str, dryrun_info: AppDryRunInfo) AppDryRunInfo[source]¶
Hook for role-specific scheduler request modifications.
Called per-role during Scheduler.submit_dryrun, in the order the roles appear in AppDef.roles.
- class torchx.specs.RetryPolicy(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)[source]¶
Defines the retry policy for the Roles in the AppDef. The policy defines the behavior when a role replica encounters a failure:

- unsuccessful (non-zero) exit code
- hardware/host crashes
- preemption
- eviction

Note

Not all retry policies are supported by all schedulers. However, all schedulers must support RetryPolicy.APPLICATION. Please refer to the scheduler's documentation for more information on the retry policies they support and behavior caveats (if any).

- REPLICA: Replaces the replica instance. Surviving replicas are untouched. Use with the dist.ddp component to have torchelastic coordinate restarts and membership changes. Otherwise, it is up to the application to deal with failed replica departures and replacement replica admittance.
- APPLICATION: Restarts the entire application.
- ROLE: Restarts the role when any error occurs in that role. This does not
restart the whole job.
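A short sketch of opting a role into replica-level restarts:

>>> from torchx.specs import RetryPolicy, Role
>>> trainer = Role(
...     name="trainer",
...     image="my_image:latest",
...     entrypoint="train.py",
...     max_retries=3,
...     retry_policy=RetryPolicy.REPLICA,  # replace failed replicas, keep survivors
... )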
Resource¶
- class torchx.specs.Resource(cpu: int, gpu: int, memMB: int, capabilities: dict[str, typing.Any] = <factory>, devices: dict[str, int] = <factory>, tags: dict[str, object] = <factory>)[source]¶
Represents resource requirements for a Role.

Important

Prefer resource() with named resources (t-shirt sizes) over specifying raw values directly.

>>> from torchx.specs import Resource
>>> Resource(cpu=4, gpu=1, memMB=8192)
Resource(cpu=4, gpu=1, memMB=8192, capabilities={}, devices={}, tags={})
- Parameters:
cpu – number of logical cpu cores
gpu – number of gpus
memMB – MB of ram
capabilities – additional hardware specs (interpreted by scheduler)
devices – named devices with their quantities (e.g. {"vpc.amazonaws.com/efa": 1})
tags – metadata tags (not interpreted by schedulers)
- static copy(original: Resource, **capabilities: Any) Resource[source]¶
Copies a resource, merging in the given
capabilities.
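For illustration, a sketch of copy() merging in an extra capability (the instance_type key is hypothetical):

>>> from torchx.specs import Resource
>>> base = Resource(cpu=4, gpu=1, memMB=8192)
>>> Resource.copy(base, instance_type="p3.2xlarge").capabilities
{'instance_type': 'p3.2xlarge'}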
- torchx.specs.resource(cpu: int | None = None, gpu: int | None = None, memMB: int | None = None, h: str | None = None) Resource[source]¶
Creates a Resource from raw specs or a named resource.

When h is set, it takes precedence (raw specs are ignored). See Registering Named Resources for custom named resources.

>>> from torchx.specs import resource
>>> resource(cpu=4, gpu=1, memMB=8192)
Resource(cpu=4, gpu=1, memMB=8192, capabilities={}, devices={}, tags={})
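And with a named resource — a hedged sketch assuming the generic gpu.small t-shirt size (see Generic Named Resources below) is registered:

>>> r = resource(cpu=1, memMB=512, h="gpu.small")  # h wins; raw specs are ignored
>>> r.gpu
1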
Workspace¶
- class torchx.specs.Workspace(projects: dict[str, str])[source]¶
Maps local project directories to remote workspace locations. At submit-time, files are copied/synced so that the remote job mirrors local code changes.
>>> from torchx.specs import Workspace
>>> # copies ~/github/torch/** into $REMOTE_ROOT/torch/**
>>> ws = Workspace(projects={"~/github/torch": "torch"})
>>> # copies ~/github/torch/** into $REMOTE_ROOT/** (no sub-dir)
>>> ws = Workspace(projects={"~/github/torch": ""})
The exact $REMOTE_ROOT is implementation-dependent. See WorkspaceMixin and scheduler docs.

- Parameters:
projects – {local_path: remote_subdir} mapping.
- is_unmapped_single_project() bool[source]¶
Returns True if this is a single-project workspace with no target sub-directory.
- merge_into(outdir: str | pathlib.Path) None[source]¶
Copies each project into
{outdir}/{target}.
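A minimal sketch of merge_into(), assuming ~/github/torch exists locally:

>>> import tempfile
>>> from torchx.specs import Workspace
>>> ws = Workspace(projects={"~/github/torch": "torch"})
>>> outdir = tempfile.mkdtemp()
>>> ws.merge_into(outdir)  # copies ~/github/torch/** into {outdir}/torch/**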
Macros¶
- class torchx.specs.macros[source]¶
Template variables substituted at runtime in Role.args, Role.env, and Role.metadata.

Warning

Macros in other Role fields are NOT substituted.

Available macros:

- img_root — root directory of the pulled image
- app_id — application id as assigned by the scheduler
- replica_id — per-role replica index (0, 1, ...). When a replica is replaced after failure, the replacement retains the same replica_id.
>>> from torchx.specs import AppDef, Role, macros
>>> trainer = Role(
...     name="trainer",
...     image="my_image:latest",
...     entrypoint="train.py",
...     args=["--app_id", macros.app_id],
...     env={"IMG_ROOT": macros.img_root},
... )
>>> app = AppDef("train_app", roles=[trainer])
Note
In addition to the three macros listed in the class docstring, two more attributes exist:
- macros.rank0_env – expands to the name of the environment variable that provides the rank-0 (master) host address. Resolve it via shell expansion ($${rank0_env}) or in application code. Not available on all schedulers.
- macros.base_img_root – deprecated. Do not use in new code.
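A hedged sketch of rank0_env: $$ defers expansion to the job's shell, and the substituted variable name (e.g. TORCHX_RANK0_HOST on some schedulers) is scheduler-specific:

>>> from torchx.specs import Role, macros
>>> trainer = Role(
...     name="trainer",
...     image="my_image:latest",
...     entrypoint="bash",
...     args=["-c", f"python train.py --master_addr=$${{{macros.rank0_env}}}"],
... )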
Run Configs¶
- torchx.specs.CfgVal = str | int | float | bool | list[str] | dict[str, str] | None¶
Type alias for run config values: str | int | float | bool | list[str] | dict[str, str] | None. Used in cfg dicts passed to run() and scheduler methods.
- class torchx.specs.runopts[source]¶
Schema for scheduler run configuration.
Holds accepted config keys, defaults, and help strings. Constructed by Scheduler.run_opts() and validated at submit time.

>>> from torchx.specs import runopts
>>> opts = runopts()
>>> opts.add("cluster_id", type_=int, help="cluster to submit the job", required=True)
>>> opts.add("priority", type_=float, default=0.5, help="job priority")
>>> opts.add("preemptible", type_=bool, default=False, help="is the job preemptible")
Note
For new schedulers, prefer StructuredOpts, which auto-generates runopts from typed dataclass fields.

- add(cfg_key: str, type_: Type[str | int | float | bool | list[str] | dict[str, str] | None], help: str, default: str | int | float | bool | list[str] | dict[str, str] | None = None, required: bool = False) None[source]¶
Registers a config option. Required options must not have a default.
- cfg_from_json_repr(json_repr: str) dict[str, str | int | float | bool | list[str] | dict[str, str] | None][source]¶
Converts the given JSON representation to a valid cfg for this runopts object.
- cfg_from_str(cfg_str: str) dict[str, str | int | float | bool | list[str] | dict[str, str] | None][source]¶
Parses scheduler cfg from a string literal and returns a cfg map where the cfg values have been cast into the appropriate types as specified by this runopts object. Unknown keys are ignored and not returned in the resulting map.

Note

Unlike the method resolve, this method does NOT resolve default options or check that the required options are actually present in the given cfg_str. This method is intended to be called before calling resolve() when the input is a string-encoded run cfg. That is, to fully resolve the cfg, call opt.resolve(opt.cfg_from_str(cfg_literal)).

If cfg_str is an empty string, then an empty cfg is returned. Otherwise, at least one kv-pair delimited by "=" (equals) is expected.

Either "," (comma) or ";" (semi-colon) can be used to delimit multiple kv-pairs. CfgVal allows List of primitives, which can be passed as either "," or ";" delimited. Since the same delimiters are used to delimit between cfg kv-pairs, this method interprets the last (trailing) "," or ";" as the delimiter between kv-pairs. See the example below.

Examples:
opts = runopts()
opts.add("FOO", type_=List[str], default=["a"], help="an optional list option")
opts.add("BAR", type_=str, required=True, help="a required str option")

# required and default options not checked
# method returns strictly parsed cfg from the cfg literal string
opts.cfg_from_str("") == {}

# however, unknown options are ignored
# since the value type is unknown hence cannot cast to the correct type
opts.cfg_from_str("UNKNOWN=VALUE") == {}

opts.cfg_from_str("FOO=v1") == {"FOO": "v1"}
opts.cfg_from_str("FOO=v1,v2") == {"FOO": ["v1", "v2"]}
opts.cfg_from_str("FOO=v1;v2") == {"FOO": ["v1", "v2"]}
opts.cfg_from_str("FOO=v1,v2,BAR=v3") == {"FOO": ["v1", "v2"], "BAR": "v3"}
opts.cfg_from_str("FOO=v1;v2,BAR=v3") == {"FOO": ["v1", "v2"], "BAR": "v3"}
opts.cfg_from_str("FOO=v1;v2;BAR=v3") == {"FOO": ["v1", "v2"], "BAR": "v3"}
- get(name: str) torchx.specs.api.runopt | None[source]¶
Returns the registered option, or None.

Accepts camelCase names (e.g. "clusterName" resolves to "cluster_name").
- static is_type(obj: str | int | float | bool | list[str] | dict[str, str] | None, tp: Type[str | int | float | bool | list[str] | dict[str, str] | None]) bool[source]¶
Like isinstance() but supports generic types (e.g. list[str]).
- resolve(cfg: Mapping[str, str | int | float | bool | list[str] | dict[str, str] | None]) dict[str, str | int | float | bool | list[str] | dict[str, str] | None][source]¶
Validates cfg against registered options, filling in defaults.

Raises InvalidRunConfigException for missing required options or type mismatches. Accepts camelCase keys.
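Putting cfg_from_str and resolve together — a sketch of fully resolving a string-encoded cfg:

>>> from torchx.specs import runopts
>>> opts = runopts()
>>> opts.add("cluster_id", type_=int, help="cluster to submit the job", required=True)
>>> opts.add("priority", type_=float, default=0.5, help="job priority")
>>> cfg = opts.resolve(opts.cfg_from_str("cluster_id=7"))
>>> cfg["cluster_id"], cfg["priority"]
(7, 0.5)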
- class torchx.specs.runopt(default: str | int | float | bool | list[str] | dict[str, str] | None, opt_type: Type[str | int | float | bool | list[str] | dict[str, str] | None], is_required: bool, help: str)[source]¶
Metadata for a single scheduler run option.
- cast_to_type(value: str) str | int | float | bool | list[str] | dict[str, str] | None[source]¶
Casts the given value (in its string representation) to the type of this run option. Below are the cast rules for each option type and value literal:
1. opt_type=str, value="foo" -> "foo"
2. opt_type=bool, value="True"/"False" -> True/False
3. opt_type=int, value="1" -> 1
4. opt_type=float, value="1.1" -> 1.1
5. opt_type=list[str]/List[str], value="a,b,c" or value="a;b;c" -> ["a", "b", "c"]
6. opt_type=dict[str,str]/Dict[str,str], value="a:1,b:2" or value="a:1;b:2" -> {"a": "1", "b": "2"}
NOTE: dict parsing uses “:” as the kv separator (rather than the standard “=”) because “=” is used at the top-level cfg to parse runopts (notice the plural) from the CLI. Originally torchx only supported primitives and list[str] as CfgVal but dict[str,str] was added in https://github.com/meta-pytorch/torchx/pull/855
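A hedged doctest of the cast rules above, constructing runopt instances directly:

>>> from typing import Dict, List
>>> from torchx.specs import runopt
>>> runopt(default=None, opt_type=List[str], is_required=False, help="").cast_to_type("a;b;c")
['a', 'b', 'c']
>>> runopt(default=None, opt_type=Dict[str, str], is_required=False, help="").cast_to_type("a:1,b:2")
{'a': '1', 'b': '2'}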
Structured Opts¶
- class torchx.schedulers.api.StructuredOpts[source]
Base class for typed scheduler configuration options.
Provides a type-safe way to define scheduler run options as dataclass fields instead of manually building runopts. Subclasses should be @dataclass decorated, with fields representing config options.

- Features:
- Auto-generates runopts from dataclass fields via as_runopts()
- Parses raw config dicts into typed instances via from_cfg()
- Supports snake_case field names with camelCase aliases
- Extracts help text from field docstrings
- Supports nested StructuredOpts fields, flattened with dot-prefixed keys (e.g., k8s.context)
Example
>>> from dataclasses import dataclass
>>> from torchx.schedulers.api import StructuredOpts
>>>
>>> @dataclass
... class MyOpts(StructuredOpts):
...     cluster_name: str
...     '''Name of the cluster to submit to.'''
...
...     num_retries: int = 3
...     '''Number of retry attempts.'''
...
>>> # Use in scheduler:
>>> # def _run_opts(self) -> runopts:
>>> #     return MyOpts.as_runopts()
>>> #
>>> # def _submit_dryrun(self, app, cfg):
>>> #     opts = MyOpts.from_cfg(cfg)
>>> #     # opts.cluster_name, opts.num_retries are typed
- classmethod as_runopts() runopts[source]
Builds runopts from dataclass fields.

Nested StructuredOpts fields are flattened with dot-prefixed keys (e.g., field k8s: K8sOpts with sub-field context becomes k8s.context).
- classmethod from_cfg(cfg: Mapping[str, str | int | float | bool | list[str] | dict[str, str] | None]) Self[source]
Create an instance from a raw config dict.
Fields are snake_case but also accept camelCase aliases (e.g., hpc_identity can be set via hpcIdentity). Nested StructuredOpts fields are reconstructed from dot-prefixed keys (e.g., k8s.context).
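A hedged sketch of the alias and nesting behavior (K8sOpts, MyOpts, and their fields are hypothetical):

>>> from dataclasses import dataclass, field
>>> from torchx.schedulers.api import StructuredOpts
>>>
>>> @dataclass
... class K8sOpts(StructuredOpts):
...     context: str = "default"
...     '''kubeconfig context to use.'''
...
>>> @dataclass
... class MyOpts(StructuredOpts):
...     cluster_name: str = "main"
...     '''Name of the cluster to submit to.'''
...     k8s: K8sOpts = field(default_factory=K8sOpts)
...     '''Nested Kubernetes options.'''
...
>>> opts = MyOpts.from_cfg({"clusterName": "gpu-cluster", "k8s.context": "prod"})
>>> (opts.cluster_name, opts.k8s.context)
('gpu-cluster', 'prod')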
- get(k[, d])[source]
Mapping-style accessor: returns the value for key k if present, else d (which defaults to None).
Run Status¶
- class torchx.specs.AppStatus(state: ~torchx.specs.api.AppState, num_restarts: int = 0, msg: str = '', structured_error_msg: str = '<NONE>', ui_url: str | None = None, roles: list[torchx.specs.api.RoleStatus] = <factory>)[source]¶
Runtime status of an AppDef.

roles contains replica statuses for the most recent retry only.
- class torchx.specs.AppState(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)[source]¶
State of the application. An application starts from an initial UNSUBMITTED state and moves through SUBMITTED, PENDING, and RUNNING states, finally reaching a terminal state: SUCCEEDED, FAILED, or CANCELLED.

If the scheduler supports preemption, the app moves from a RUNNING state to PENDING upon preemption.

If the user stops the application, the application state moves to STOPPED, then to CANCELLED when the job is actually cancelled by the scheduler.

UNSUBMITTED - app has not been submitted to the scheduler yet
SUBMITTED - app has been successfully submitted to the scheduler
PENDING - app has been submitted to the scheduler pending allocation
RUNNING - app is running
SUCCEEDED - app has successfully completed
FAILED - app has unsuccessfully completed
CANCELLED - app was cancelled before completing
UNKNOWN - app state is unknown
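To branch on terminal states, torchx.specs.api provides an is_terminal helper — a sketch, assuming that helper:

>>> from torchx.specs.api import AppState, is_terminal
>>> is_terminal(AppState.RUNNING)
False
>>> is_terminal(AppState.SUCCEEDED)
True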
- class torchx.specs.AppDryRunInfo(request: T, fmt: Callable[[T], str])[source]¶
Returned by Scheduler.submit_dryrun.

Wraps the scheduler request that would have been submitted. print(info) yields a human-readable representation.
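A hedged sketch of obtaining one via the Runner (app is an AppDef as built above; local_cwd is one of the built-in schedulers):

>>> from torchx.runner import get_runner
>>> runner = get_runner()
>>> info = runner.dryrun(app, "local_cwd")  # nothing is submitted
>>> print(info)  # human-readable form of the scheduler request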
App Handle¶
- torchx.specs.AppHandle = <class 'str'>¶
Type alias for a string of the form {scheduler_backend}://{session_name}/{app_id} that uniquely identifies a submitted application. Returned by Runner.run() and consumed by the Runner status/stop/wait APIs. Parse with parse_app_handle().
- torchx.specs.parse_app_handle(app_handle: str) ParsedAppHandle[source]¶
Parses {scheduler}://{session_name}/{app_id} into its components.

>>> from torchx.specs import parse_app_handle
>>> parse_app_handle("k8s://default/foo_bar")
ParsedAppHandle(scheduler_backend='k8s', session_name='default', app_id='foo_bar')
>>> parse_app_handle("k8s:///foo_bar")
ParsedAppHandle(scheduler_backend='k8s', session_name='', app_id='foo_bar')
Mounts¶
- torchx.specs.parse_mounts(opts: list[str]) list[torchx.specs.api.BindMount | torchx.specs.api.VolumeMount | torchx.specs.api.DeviceMount][source]¶
parse_mounts parses a list of options into typed mounts, following a format similar to Docker's bind mounts.

Multiple mounts can be specified in the same list. type must be specified first in each.

- Ex:
type=bind,src=/host,dst=/container,readonly,[type=bind,src=…,dst=…]

- Supported types:
BindMount: type=bind,src=<host path>,dst=<container path>[,readonly]
VolumeMount: type=volume,src=<name/id>,dst=<container path>[,readonly]
DeviceMount: type=device,src=/dev/<dev>[,dst=<container path>][,perm=rwm]
- class torchx.specs.BindMount(src_path: str, dst_path: str, read_only: bool = False)[source]¶
Bind-mounts a host path into the worker container.
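A hedged sketch of parse_mounts(); options are assumed to be passed as a flat list of key=value tokens with type first:

>>> from torchx.specs import parse_mounts
>>> parse_mounts(["type=bind", "src=/host/data", "dst=/data", "readonly"])
[BindMount(src_path='/host/data', dst_path='/data', read_only=True)]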
Overlays¶
Overlays patch the scheduler’s submit-job request with fields not representable
in AppDef or Role.
Use set_overlay() / get_overlay() to store and retrieve overlays,
and apply_overlay() to apply them.
For Users¶
Use set_overlay() to attach scheduler-specific fields to a
Role or AppDef:
>>> from torchx.specs import Role
>>> from torchx.specs.overlays import set_overlay, get_overlay
>>> # Kubernetes: add a node selector to a role
>>> role = Role(name="trainer", image="my-image", entrypoint="train.py")
>>> set_overlay(role, "kubernetes", "V1Pod", {
... "spec": {"nodeSelector": {"accelerator": "a100"}},
... })
>>> # Multiple set_overlay calls merge (dicts upsert, lists append)
>>> set_overlay(role, "kubernetes", "V1Pod", {
... "spec": {"tolerations": [{"key": "gpu", "operator": "Exists"}]},
... })
>>> get_overlay(role, "kubernetes", "V1Pod")
{'spec': {'nodeSelector': {'accelerator': 'a100'}, 'tolerations': [{'key': 'gpu', 'operator': 'Exists'}]}}
Operators¶
By default, set_overlay() merges dicts and appends lists. Use
PUT(), JOIN(), and DEL() as dict keys to override
per-field behavior:
>>> from torchx.specs import Role
>>> from torchx.specs.overlays import set_overlay, PUT, JOIN, DEL
>>> role = Role(name="trainer", image="my-image", entrypoint="train.py")
>>> # PUT: replace a list instead of appending
>>> set_overlay(role, "kubernetes", "V1Pod", {
... "spec": {PUT("containers"): [{"name": "only"}]},
... })
>>> # JOIN: strategic merge list items by key field
>>> set_overlay(role, "kubernetes", "V1Pod", {
... "spec": {JOIN("initContainers", on="name"): [
... {"name": "setup", "image": "init:v2"},
... ]},
... })
>>> # DEL: remove a field (server uses its default)
>>> set_overlay(role, "kubernetes", "V1Pod", {DEL("hostNetwork"): None})
Operators are stored in metadata and resolved automatically when the scheduler
calls apply_overlay() at submit time — users don’t need to call
apply_overlay() directly.
Note
None vs DEL vs missing key
These three states produce different results:
- Key missing from overlay: field is untouched in the base
- "field": None: field is explicitly set to None/null. In thrift/protobuf, this means "field present but null" — different from the key being missing (never set).
- DEL("field"): None: field is removed from the base dict. In thrift/protobuf, this means "field not sent in request".
For Scheduler Implementors¶
To add overlay support to a scheduler, use get_overlay() to retrieve
stored overlays, validate_overlay() to guard against user error, and
apply_overlay() to apply the overlay onto the scheduler’s base request dict.
from torchx.specs.overlays import apply_overlay, get_overlay, validate_overlay
class MyScheduler(Scheduler):
    def _submit_dryrun(self, app, cfg):
        # Build base request from Role attributes
        base_request = build_request_from_role(app.roles[0])

        # Retrieve and validate the overlay
        overlay = get_overlay(app.roles[0], "my_scheduler", "JobSpec")
        validate_overlay(
            overlay,
            blocklist=["command", "env"],  # fields set via Role attrs
            overlay_name="JobSpec",
        )

        # Apply overlay onto the base request
        apply_overlay(base_request, overlay)

        # base_request now has user's overlay fields merged in
        return base_request
apply_overlay() handles operator keys (PUT(), JOIN(),
DEL()) automatically — the scheduler doesn’t need to know about them.
- torchx.specs.overlays.DEL(key: str) str[source]¶
Remove a key from the base dict.
Use as a dict key in overlays. The value is ignored (convention: None).

This is different from setting a field to None — DEL removes the key entirely (thrift/protobuf: field not sent, server uses default), while "field": None sets it to null (thrift/protobuf: field present but null).

>>> from torchx.specs.overlays import apply_overlay, DEL
>>> base = {"keep": 1, "remove_me": "old"}
>>> apply_overlay(base, {DEL("remove_me"): None})
>>> base
{'keep': 1}
- torchx.specs.overlays.JOIN(key: str, *, on: str) str[source]¶
Strategic merge list items by key field.
Matched items (same value for the on field) have their fields merged. Unmatched items are appended. Use as a dict key in overlays.

>>> from torchx.specs.overlays import apply_overlay, JOIN
>>> base = {"containers": [{"name": "main", "image": "v1", "cpu": "1"}]}
>>> apply_overlay(base, {JOIN("containers", on="name"): [
...     {"name": "main", "memory": "1Gi"},
...     {"name": "sidecar", "image": "proxy"},
... ]})
>>> base
{'containers': [{'name': 'main', 'image': 'v1', 'cpu': '1', 'memory': '1Gi'}, {'name': 'sidecar', 'image': 'proxy'}]}
- Raises:
TypeError – At apply time, if the base list contains non-dict items.
- torchx.specs.overlays.PUT(key: str) str[source]¶
Replace a value entirely instead of merging/appending.
Use as a dict key in overlays. For lists this replaces instead of appending; for dicts this replaces instead of recursive merge.
>>> from torchx.specs.overlays import apply_overlay, PUT
>>> base = {"containers": [{"name": "old1"}, {"name": "old2"}]}
>>> apply_overlay(base, {PUT("containers"): [{"name": "only"}]})
>>> base
{'containers': [{'name': 'only'}]}
- torchx.specs.overlays.apply_overlay(base: dict[str, Any], overlay: dict[str, Any], *, _resolve: bool = True) None[source]¶
Merges overlay into base in-place.

Default rules:
dict → recursive merge (upsert keys)
list → append overlay items
primitive → overwrite value
Operators (use as dict keys in overlays):
PUT() → replace value entirely (lists, dicts, or primitives)
JOIN() → strategic merge list items by key field
DEL() → remove key from base
During accumulation (multiple set_overlay() calls), operators for the same field replace earlier operations — last call wins.

>>> from torchx.specs.overlays import apply_overlay, PUT, JOIN, DEL
>>> # Dicts merge recursively, lists append, primitives overwrite
>>> base = {"spec": {"cpu": "500m"}, "tags": ["prod"], "replicas": 1}
>>> apply_overlay(base, {"spec": {"memory": "1Gi"}, "tags": ["gpu"], "replicas": 3})
>>> base
{'spec': {'cpu': '500m', 'memory': '1Gi'}, 'tags': ['prod', 'gpu'], 'replicas': 3}
>>> # PUT replaces a list instead of appending
>>> base = {"containers": [{"name": "old1"}, {"name": "old2"}]}
>>> apply_overlay(base, {PUT("containers"): [{"name": "only"}]})
>>> base
{'containers': [{'name': 'only'}]}
>>> # JOIN: match containers by name, merge their fields
>>> base = {"containers": [{"name": "main", "image": "v1"}]}
>>> apply_overlay(base, {JOIN("containers", on="name"): [
...     {"name": "main", "memory": "1Gi"},
... ]})
>>> base
{'containers': [{'name': 'main', 'image': 'v1', 'memory': '1Gi'}]}
>>> # DEL: remove a key from the base
>>> base = {"keep": 1, "remove": "old"}
>>> apply_overlay(base, {DEL("remove"): None})
>>> base
{'keep': 1}
- Parameters:
_resolve – Internal only — do not pass. When True (default), operator keys resolve to their operations on plain field names. When False, operator keys are stored as-is (used by set_overlay() for accumulation).
- torchx.specs.overlays.get_overlay(target: AppDef | Role, namespace: str, kind: str) _Overlay[source]¶
Retrieves the overlay from target.metadata[namespace][kind].

Returns {} if not found. If metadata[namespace] is a string, it is loaded as a file URI via fsspec (JSON or YAML).

For backwards compatibility, if kind is not a key in metadata[namespace], the entire namespace dict is returned as a flat overlay (with a deprecation warning).
- torchx.specs.overlays.set_overlay(target: AppDef | Role, namespace: str, kind: str, overlay: _Overlay) None[source]¶
Stores an overlay in target.metadata[namespace][kind].

Multiple calls for the same (namespace, kind) accumulate via apply_overlay() (dicts merge, lists append). Use the PUT(), JOIN(), and DEL() operators in the overlay dict to control per-field behavior.

- Parameters:
namespace – Scheduler namespace (e.g., "kubernetes", "mast").
kind – Scheduler struct type (e.g., "V1Pod", "HpcJobDefinition").
- torchx.specs.overlays.validate_overlay(overlay: dict[str, Any], *, blocklist: list[str] | None = None, forbidden_keys: set[str] | None = None, overlay_name: str = 'overlay', suggestion: str = '') None[source]¶
Validate that overlay doesn’t contain disallowed keys.
Used by scheduler authors to guard against user error. Operator-prefixed keys (e.g., PUT("env")) are resolved to their logical field name before checking against the blocklist.

- Parameters:
blocklist – Keys that should be set via Role/AppDef attributes.
forbidden_keys – Keys that belong to a different overlay type.
overlay_name – Overlay type name for error messages.
suggestion – Hint appended when forbidden_keys are found.
- Raises:
ValueError – If validation fails.
>>> from torchx.specs.overlays import validate_overlay
>>> # "env" is blocklisted — should be set via Role.env
>>> try:
...     validate_overlay(
...         {"env": {"FOO": "bar"}, "nodeSelector": {"gpu": "true"}},
...         blocklist=["env", "command"],
...         overlay_name="PodSpec",
...     )
... except ValueError as e:
...     "env" in str(e)
True
Named Resources¶
Use resource() with the h parameter to look up a named resource:
from torchx.specs import resource
resource(h="gpu.small") # generic t-shirt size
resource(h="aws_p3.2xlarge") # AWS instance type
See Registering Named Resources for defining custom named resources.
Generic Named Resources¶
Defines generic named resources that are not specific to any cloud provider's instance types. These generic named resources are meant to be used as default values for components and examples, and are NOT meant to be used long term, as the specific capabilities (e.g. number of cpu, gpu, memMB) are subject to change.
Note
The named resources in this file DO NOT map device capabilities such as special network interfaces (e.g. EFA devices on AWS).
Warning
Do not use for launching applications that require specific capabilities (e.g. needs exactly 4 x A100 GPUs with 40GB of memory connected with NVLink).
Different cloud providers offer different instance types, so practically speaking one should register their own named resources that accurately capture the instances at their disposal rather than using these defaults long term.
Note
The cpu/gpu/memory ratios in these default resources are based on current HW trends and do not map exactly to a particular instance type!
Warning
The specific capabilities of these default resources are subject to change
at any time based on current hardware spec trends.
Therefore, the user should NEVER assume that the specific number of cpu, gpu, and memMB
will always remain the same. For instance, never assume that gpu.small will always
have 8 cpus.
These are cloud-agnostic, t-shirt-sized defaults. The exact cpu/gpu/memory values may change between releases – define your own named resources for production workloads.
| Name | CPU | GPU | Memory |
|---|---|---|---|
| gpu.small | 8 | 1 | 32 GiB |
| gpu.medium | 16 | 2 | 64 GiB |
| gpu.large | 32 | 4 | 128 GiB |
| gpu.xlarge | 64 | 8 | 256 GiB |
| cpu.nano | 1 | 0 | 512 MiB |
| cpu.micro | 1 | 0 | 1 GiB |
| cpu.small | 1 | 0 | 2 GiB |
| cpu.medium | 2 | 0 | 4 GiB |
| cpu.large | 2 | 0 | 8 GiB |
| cpu.xlarge | 8 | 0 | 32 GiB |
AWS Named Resources¶
torchx.specs.named_resources_aws contains resource definitions that represent corresponding AWS instance types, taken from https://aws.amazon.com/ec2/instance-types/. The resources are exposed via entrypoints after installing the torchx lib. The mapping is stored in the setup.py file.
The named resources currently do not specify AWS instance type capabilities but merely represent the equivalent resource in memory, cpu, and gpu numbers.
Note
These resource definitions may change in the future. Each user is expected to manage their own resources. See Registering Named Resources to set up named resources.
Usage:
from torchx.specs import named_resources

print(named_resources["aws_t3.medium"])
print(named_resources["aws_m5.2xlarge"])
print(named_resources["aws_p3.2xlarge"])
print(named_resources["aws_p3.8xlarge"])
Component Linter¶
- torchx.specs.file_linter.validate(path: str, component_function: str, validators: list[torchx.specs.file_linter.ComponentFunctionValidator] | None = None) list[torchx.specs.file_linter.LinterMessage][source]¶
Validates the function to make sure it complies with the component standard. validate finds the component_function and validates it according to the following rules:

The function must have a google-style docstring
All function parameters must be annotated
The function must return
torchx.specs.api.AppDef
- Parameters:
path – Path to python source file.
component_function – Name of the function to be validated.
- Returns:
List of validation errors
- Return type:
List[LinterMessage]
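A hedged usage sketch (my_component.py and trainer are hypothetical):

>>> from torchx.specs.file_linter import validate
>>> errors = validate("my_component.py", component_function="trainer")
>>> for e in errors:
...     print(f"{e.line}:{e.char} {e.severity}: {e.description}")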
- torchx.specs.file_linter.get_fn_docstring(fn: Callable[[...], object]) tuple[str, dict[str, str]][source]¶
Parses the function description and argument descriptions from the provided function's docstring. The docstring should be in google-style format.

If the function has no docstring, the function description will be the name of the function plus a TIP on how to improve the help message, and the argument descriptions will be the argument names.

Arguments that are not present in the docstring will contain default/required information.
- Parameters:
fn – Function with or without docstring
- Returns:
Function description, and a dict of argument descriptions where the key is the name of the argument and the value is the description.
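A sketch with a toy component; expected values are shown as comments rather than asserted doctest output:

>>> from torchx.specs.file_linter import get_fn_docstring
>>> def trainer(lr: float = 0.01):
...     """Trains a model.
...
...     Args:
...         lr: learning rate
...     """
...
>>> desc, arg_descs = get_fn_docstring(trainer)
>>> # desc == "Trains a model.", arg_descs["lr"] == "learning rate"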
- class torchx.specs.file_linter.LinterMessage(name: str, description: str, line: int, char: int, severity: str = 'error')[source]¶
See also
- Quick Reference
Single-page reference with imports, types, and copy-pasteable recipes.
- torchx.runner
The Runner API that submits AppDefs as jobs.
- Advanced Usage
Registering named resources, custom components, and other plugins.