torchx.runner
Submits AppDef jobs to schedulers.
The runner takes an AppDef (the result of evaluating a component function)
along with a scheduler name and run config, and submits it as a job.
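from torchx.runner import get_runner

# `app` is an AppDef (e.g. the value returned by a component function) and
# `cfg` is the scheduler's run config (a mapping of scheduler options).
with get_runner() as runner:
    app_handle = runner.run(app, scheduler="kubernetes", cfg=cfg)
    status = runner.status(app_handle)
    print(status)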
The Runner submits, monitors, and manages jobs. Use get_runner() to create one with all registered schedulers.
Key methods:
- run() / run_component() – submit a job
- status() – poll current state
- wait() – block until terminal state
- cancel() – request cancellation
- delete() – remove a job definition from the scheduler
- log_lines() – stream log output
- list() – list jobs on a scheduler
- dryrun() – preview what would be submitted without submitting
- schedule() – submit a previously dry-run request (allows request mutation)
Scheduler instances are created lazily on first use. Use the Runner as a context manager for automatic cleanup.
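The lazy-creation and cleanup behavior can be pictured with a small stand-alone sketch. `LazySchedulerCache`, `Closeable`, and the `factories` mapping are hypothetical names used only for illustration; this is not TorchX's implementation, just the pattern the paragraph describes: build a scheduler the first time its name is requested, and close every scheduler that was actually created when the `with` block exits.

```python
from typing import Callable, Dict, Protocol


class Closeable(Protocol):
    def close(self) -> None: ...


class LazySchedulerCache:
    """Hypothetical sketch: build schedulers on first use, close them on exit."""

    def __init__(self, factories: Dict[str, Callable[[], Closeable]]) -> None:
        self._factories = factories
        self._instances: Dict[str, Closeable] = {}

    def get(self, name: str) -> Closeable:
        # Construct the scheduler only the first time its name is requested.
        if name not in self._instances:
            self._instances[name] = self._factories[name]()
        return self._instances[name]

    def __enter__(self) -> "LazySchedulerCache":
        return self

    def __exit__(self, *exc_info: object) -> None:
        # Close only the schedulers that were actually created, then forget them.
        for scheduler in self._instances.values():
            scheduler.close()
        self._instances.clear()
```

In this sketch, registering many backends costs nothing up front; only the backends you request are constructed, and only those are closed.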
See Quick Reference for copy-pasteable recipes.
- torchx.runner.get_runner(name: str | None = None, component_defaults: dict[str, dict[str, str]] | None = None, **scheduler_params: Any) → Runner
Creates a Runner with all registered schedulers.
with get_runner() as runner:
    app_handle = runner.run(app, scheduler="kubernetes", cfg=cfg)
    print(runner.status(app_handle))
- Parameters:
scheduler_params – extra kwargs passed to all scheduler constructors.
- class torchx.runner.Runner(name: str = '', scheduler_factories: dict[str, torchx.schedulers.SchedulerFactory] | None = None, component_defaults: dict[str, dict[str, str]] | None = None, scheduler_params: dict[str, object] | None = None)
Submits, monitors, and manages AppDef jobs. Use get_runner() to create an instance with all registered schedulers.
>>> from torchx.runner import get_runner
>>> runner = get_runner()
>>> runner.scheduler_backends()
['local_cwd', 'local_docker', 'slurm', 'kubernetes', ...]
- cancel(app_handle: str) → None
Requests cancellation. The app transitions to CANCELLED asynchronously.
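Because cancellation is asynchronous, a caller that needs to know the outcome has to poll afterwards. The helper below is a minimal sketch of that pattern; `cancel_and_wait`, `poll_interval`, and `timeout` are hypothetical names. It assumes the object returned by status() has an `is_terminal()` method and a `state` enum member with a `.name`, and it treats a None status as "the app no longer exists". In real code, the Runner's own wait() may cover the polling part.

```python
import time
from typing import Any, Optional


def cancel_and_wait(
    runner: Any, app_handle: str, poll_interval: float = 5.0, timeout: float = 600.0
) -> Optional[str]:
    """Request cancellation, then poll status() until the app is terminal or gone.

    Returns the final state name, or None if the scheduler no longer knows the app.
    """
    runner.cancel(app_handle)  # returns right away; the state change happens later
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        status = runner.status(app_handle)
        if status is None:  # the app no longer exists on the scheduler
            return None
        # Assumption: AppStatus exposes is_terminal() and an enum-valued `state`.
        if status.is_terminal():
            return status.state.name
        time.sleep(poll_interval)
    raise TimeoutError(f"{app_handle} did not reach a terminal state within {timeout}s")
```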
- cfg_from_str(scheduler: str, *cfg_literal: str) → Mapping[str, str | int | float | bool | list[str] | dict[str, str] | None]
Convenience function around the scheduler’s runopts.cfg_from_str() method.
Usage:
from torchx.runner import get_runner

runner = get_runner()
cfg = runner.cfg_from_str("local_cwd", "log_dir=/tmp/foobar", "prepend_cwd=True")
assert cfg == {"log_dir": "/tmp/foobar", "prepend_cwd": True, "auto_set_cuda_visible_devices": False}
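To make the `key=value` convention concrete, here is a stand-alone sketch of how such literals can be coerced into typed values and merged with defaults. `parse_cfg_literals`, `opt_types`, and `defaults` are hypothetical; TorchX's actual parsing lives in runopts.cfg_from_str() and may differ, for example in how it treats lists, dicts, and unknown keys.

```python
from typing import Dict, Iterable, Mapping, Union

CfgValue = Union[str, int, float, bool]


def _coerce(raw: str, typ: type) -> CfgValue:
    if typ is bool:
        # bool("False") is True in Python, so booleans need explicit handling.
        lowered = raw.strip().lower()
        if lowered in ("true", "1", "yes"):
            return True
        if lowered in ("false", "0", "no"):
            return False
        raise ValueError(f"not a boolean: {raw!r}")
    return typ(raw)


def parse_cfg_literals(
    literals: Iterable[str],
    opt_types: Mapping[str, type],
    defaults: Mapping[str, CfgValue],
) -> Dict[str, CfgValue]:
    """Hypothetical sketch: turn "key=value" strings into a typed config dict."""
    cfg: Dict[str, CfgValue] = dict(defaults)  # start from the declared defaults
    for literal in literals:
        key, sep, raw = literal.partition("=")
        if not sep:
            raise ValueError(f"expected key=value, got {literal!r}")
        key = key.strip()
        if key not in opt_types:
            raise KeyError(f"unknown option {key!r}")
        cfg[key] = _coerce(raw, opt_types[key])
    return cfg


cfg = parse_cfg_literals(
    ["log_dir=/tmp/foobar", "prepend_cwd=True"],
    opt_types={"log_dir": str, "prepend_cwd": bool, "auto_set_cuda_visible_devices": bool},
    defaults={"auto_set_cuda_visible_devices": False},
)
```

Declaring a type per option is what lets the string "True" become the boolean True, mirroring the shape of the result in the usage example above.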
- describe(app_handle: str) → torchx.specs.api.AppDef | None
Reconstructs the AppDef from the scheduler. Completeness is scheduler-dependent. Returns None if the app no longer exists.
- dryrun(app: AppDef, scheduler: str, cfg: Optional[Mapping[str, str | int | float | bool | list[str] | dict[str, str] | None]] = None, workspace: torchx.specs.api.Workspace | str | None = None, parent_run_id: str | None = None) → AppDryRunInfo
Returns what would be submitted without actually submitting.
The returned AppDryRunInfo can be print()-ed for inspection or passed to schedule().
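A common use of dryrun() followed by schedule() is a confirmation gate: render the request, let a person or a policy check approve it, and only then submit. The sketch below assumes only the dryrun() and schedule() signatures documented here; `preview_then_submit` and its `confirm` callback are hypothetical names.

```python
from typing import Any, Callable, Mapping, Optional


def preview_then_submit(
    runner: Any,
    app: Any,
    scheduler: str,
    confirm: Callable[[str], bool],
    cfg: Optional[Mapping[str, Any]] = None,
) -> Optional[str]:
    """Dry-run `app`, show the request to `confirm`, and schedule it only if approved.

    Returns the app handle, or None if the submission was declined.
    """
    dryrun_info = runner.dryrun(app, scheduler, cfg=cfg)
    preview = str(dryrun_info)  # the same text print(dryrun_info) would display
    if not confirm(preview):
        return None
    return runner.schedule(dryrun_info)  # submits the request produced by the dry run
```

For interactive use, `confirm` could wrap input(); in CI it could be a function that rejects requests asking for too many resources.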
- dryrun_component(component: str, component_args: list[str] | dict[str, Any], scheduler: str, cfg: Optional[Mapping[str, str | int | float | bool | list[str] | dict[str, str] | None]] = None, workspace: torchx.specs.api.Workspace | str | None = None, parent_run_id: str | None = None) → AppDryRunInfo
Like run_component() but returns the request without submitting.
- list(scheduler: str, cfg: Optional[Mapping[str, str | int | float | bool | list[str] | dict[str, str] | None]] = None) → list[torchx.schedulers.api.ListAppResponse]
Lists jobs on the scheduler.
- Parameters:
cfg – scheduler config, used by some schedulers for backend routing.
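As a small example of consuming list() output, the sketch below tallies jobs by state. It assumes each ListAppResponse exposes a `state` attribute holding an enum member (so `.name` gives a label such as "RUNNING"); `count_jobs_by_state` is a hypothetical helper.

```python
from collections import Counter
from typing import Any, Dict, Mapping, Optional


def count_jobs_by_state(
    runner: Any, scheduler: str, cfg: Optional[Mapping[str, Any]] = None
) -> Dict[str, int]:
    """Tally the jobs returned by runner.list() by their state name."""
    # Assumption: each ListAppResponse has a `state` attribute that is an enum member.
    return dict(Counter(resp.state.name for resp in runner.list(scheduler, cfg=cfg)))
```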
- log_lines(app_handle: str, role_name: str, k: int = 0, regex: str | None = None, since: datetime.datetime | None = None, until: datetime.datetime | None = None, should_tail: bool = False, streams: torchx.schedulers.api.Stream | None = None) → Iterable[str]
Returns an iterator over log lines for the k-th replica of a role.
Important
k is the node (host) id, NOT the worker rank.
Warning
Completeness is scheduler-dependent. Lines may be partial or missing if logs have been purged. Do not use this for programmatic output parsing.
Lines include their trailing newline (\n). Use print(line, end="") to avoid double newlines.
- Parameters:
k – replica (node) index
regex – optional filter pattern
since – start cursor (scheduler-dependent)
until – end cursor (scheduler-dependent)
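The trailing-newline note matters as soon as you print what log_lines() yields. The sketch below uses only the `k` and `regex` parameters from the signature above; `print_matching_logs` and its `out` argument are hypothetical names.

```python
from typing import Any, Optional, TextIO


def print_matching_logs(
    runner: Any,
    app_handle: str,
    role_name: str,
    k: int = 0,
    regex: Optional[str] = None,
    out: Optional[TextIO] = None,
) -> int:
    """Print log lines for replica (node) k of role_name; return how many were printed."""
    count = 0
    for line in runner.log_lines(app_handle, role_name, k=k, regex=regex):
        # Each line already ends in "\n", so suppress print's own newline.
        # file=None makes print() write to sys.stdout.
        print(line, end="", file=out)
        count += 1
    return count
```

Remember that `k` selects a node, so on a multi-GPU host the output of several worker ranks may be interleaved in the same stream.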
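- run(app: AppDef, scheduler: str, cfg: Optional[Mapping[str, str | int | float | bool | list[str] | dict[str, str] | None]] = None, workspace: torchx.specs.api.Workspace | str | None = None, parent_run_id: str | None = None) → str
Submits app as a job to scheduler and returns its app handle, which identifies the job in later calls such as status() and cancel().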
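The returned handle is a plain string. TorchX handles are commonly of the form `{scheduler}://{session_name}/{app_id}`; treat that shape as an assumption and prefer TorchX's own handle-parsing helper in real code if your version provides one. The sketch below (`split_app_handle` and `AppHandleParts` are hypothetical names) only illustrates how the three parts relate.

```python
import re
from typing import NamedTuple


class AppHandleParts(NamedTuple):
    scheduler: str
    session_name: str
    app_id: str


# Assumed handle shape: "{scheduler}://{session_name}/{app_id}" (session may be empty).
_HANDLE_RE = re.compile(r"^(?P<scheduler>[-\w]+)://(?P<session_name>[-\w]*)/(?P<app_id>.+)$")


def split_app_handle(app_handle: str) -> AppHandleParts:
    """Hypothetical helper: split an app handle string into its three parts."""
    m = _HANDLE_RE.match(app_handle)
    if m is None:
        raise ValueError(f"not an app handle: {app_handle!r}")
    return AppHandleParts(m["scheduler"], m["session_name"], m["app_id"])
```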
- run_component(component: str, component_args: list[str] | dict[str, Any], scheduler: str, cfg: Optional[Mapping[str, str | int | float | bool | list[str] | dict[str, str] | None]] = None, workspace: torchx.specs.api.Workspace | str | None = None, parent_run_id: str | None = None) → str
Resolves and runs a named component.
component resolution order (high → low):
1. User-registered torchx.components entry points
2. Builtins relative to torchx.components (e.g. "dist.ddp")
3. File-based path/to/file.py:function_name
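The documented lookup order can be expressed as a chain of checks. This is a hypothetical sketch, not TorchX's resolver: `resolve_component`, `user_registered`, `builtins`, and `load_from_file` are illustrative stand-ins for the entry-point registry, the builtin component modules, and file loading.

```python
from typing import Callable, Dict

Component = Callable[..., object]


def resolve_component(
    name: str,
    user_registered: Dict[str, Component],
    builtins: Dict[str, Component],
    load_from_file: Callable[[str, str], Component],
) -> Component:
    """Hypothetical sketch of the documented lookup order (high -> low priority)."""
    # 1. Components registered by the user via entry points take precedence.
    if name in user_registered:
        return user_registered[name]
    # 2. Builtins, named relative to torchx.components (e.g. "dist.ddp").
    if name in builtins:
        return builtins[name]
    # 3. File-based references of the form "path/to/file.py:function_name".
    path, sep, fn_name = name.rpartition(":")
    if sep and path.endswith(".py"):
        return load_from_file(path, fn_name)
    raise ValueError(f"cannot resolve component {name!r}")
```

One consequence of this ordering is that a user-registered component with the same name as a builtin (say, "dist.ddp") shadows the builtin.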
- schedule(dryrun_info: AppDryRunInfo) → str
Submits a previously dry-run request, allowing request mutation.
dryrun_info = runner.dryrun(app, scheduler="kubernetes", cfg=cfg)
dryrun_info.request.foo = "bar"  # mutate the raw request (illustrative field name)
app_handle = runner.schedule(dryrun_info)
Warning
Use sparingly. Overwriting many raw scheduler fields may cause your usage to diverge from TorchX’s supported API.
- status(app_handle: str) → torchx.specs.api.AppStatus | None
Returns app status, or None if the app no longer exists.
See also
- Quick Reference
Single-page reference with imports, types, and copy-pasteable recipes.
- torchx.schedulers
Scheduler API reference and implementation guide.
- .torchxconfig
Configuring scheduler options via .torchxconfig.