Local

This contains the TorchX local scheduler which can be used to run TorchX components locally via subprocesses.

class torchx.schedulers.local_scheduler.LocalScheduler(session_name: str, image_provider_class: Callable[[Mapping[str, str | int | float | bool | list[str] | dict[str, str] | None]], ImageProvider], cache_size: int = 100, extra_paths: list[str] | None = None)

Bases: Scheduler[Mapping[str, str | int | float | bool | list[str] | dict[str, str] | None]]

Schedules on localhost. Containers are modeled as processes and certain properties of the container that are either not relevant or that cannot be enforced for localhost runs are ignored. Properties that are ignored:

  1. Resource requirements

  2. Resource limit enforcements

  3. Retry policies

  4. Retry counts (no retries supported)

  5. Deployment preferences

The scheduler supports cleanup of orphaned processes: on receiving SIGTERM or SIGINT, it terminates the processes it spawned.

This scheduler is exposed under the scheduler name local_cwd.

  • local_cwd runs the provided app relative to the current working directory and ignores the role’s image field, for faster iteration and testing.

Note

The orphan cleanup only works if LocalScheduler is instantiated from the main thread.
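
The cleanup behavior and the main-thread restriction can be illustrated with the standard library alone. The following is a minimal sketch, not TorchX's implementation; TerminationSignal, install_handlers and terminate_all are hypothetical names introduced for illustration. It relies on the fact that Python's signal.signal() may only be called from the main thread.

```python
import signal
import subprocess
import sys
import threading


class TerminationSignal(Exception):
    """Hypothetical exception raised when SIGTERM or SIGINT arrives."""


def _on_signal(signum, frame):
    # Turn the signal into an exception so the caller can clean up.
    raise TerminationSignal(f"received signal {signum}")


def install_handlers() -> bool:
    """Registers SIGTERM/SIGINT handlers; returns False off the main thread."""
    if threading.current_thread() is not threading.main_thread():
        # signal.signal() raises ValueError outside the main thread,
        # so no cleanup handler can be installed from here.
        return False
    signal.signal(signal.SIGTERM, _on_signal)
    signal.signal(signal.SIGINT, _on_signal)
    return True


def terminate_all(procs):
    """Terminates every child that is still running, then waits for all of them."""
    for proc in procs:
        if proc.poll() is None:
            proc.terminate()
    for proc in procs:
        proc.wait()
```

A caller would typically call install_handlers() once, start its child processes, and call terminate_all() from an except TerminationSignal or finally block.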

Config Options

    usage:
        [log_dir=LOG_DIR],[prepend_cwd=PREPEND_CWD],[auto_set_cuda_visible_devices=AUTO_SET_CUDA_VISIBLE_DEVICES]

    optional arguments:
        log_dir=LOG_DIR (str, None)
            Directory to write stdout/stderr log files of replicas.
        prepend_cwd=PREPEND_CWD (bool, False)
            If set, prepends CWD to replica's PATH env var making binaries in CWD take precedence.
        auto_set_cuda_visible_devices=AUTO_SET_CUDA_VISIBLE_DEVICES (bool, False)
            Sets CUDA_VISIBLE_DEVICES for roles that request GPU resources.
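
The effect of prepend_cwd on a replica's environment can be sketched as follows. This is an illustration under assumptions, not TorchX code; build_replica_env is a hypothetical helper.

```python
import os


def build_replica_env(base_env, prepend_cwd=False, cwd=None):
    """Returns a copy of base_env, optionally with cwd placed first on PATH."""
    env = dict(base_env)
    if prepend_cwd:
        cwd = cwd or os.getcwd()
        existing = env.get("PATH", "")
        # Putting cwd first makes binaries in cwd win the PATH lookup.
        env["PATH"] = cwd + os.pathsep + existing if existing else cwd
    return env
```

With prepend_cwd left at its default of False, the environment is copied unchanged.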

Compatibility

Note

Jobs that run locally may not work on a different scheduler because of differences in network setup or software dependencies.

    Feature                  Scheduler Support
    Fetch Logs               ✔️
    Distributed Jobs         LocalScheduler supports multiple replicas but all replicas will execute on the local host.
    Cancel Job               ✔️
    Describe Job             ✔️
    Workspaces / Patching    Partial support. LocalScheduler runs the app from a local directory but does not support programmatic workspaces.
    Mounts
    Elasticity

auto_set_CUDA_VISIBLE_DEVICES(role_params: dict[str, list[torchx.schedulers.local_scheduler.ReplicaParam]], app: AppDef, cfg: Opts) -> None

If the run option auto_set_cuda_visible_devices = True, sets the CUDA_VISIBLE_DEVICES env var of each replica (node) according to the number of GPUs specified in its role’s resource specification, overwriting any existing CUDA_VISIBLE_DEVICES in the role’s env field. To set CUDA_VISIBLE_DEVICES manually, run with auto_set_cuda_visible_devices = False in the scheduler runcfg.

Note

If the host’s device count is less than the total number of requested GPUs, then CUDA_VISIBLE_DEVICES is NOT set (even if auto_set_cuda_visible_devices=True).

Note

This method sets CUDA_VISIBLE_DEVICES either on all GPU roles or on none of them.

Examples (all examples assume running on a host with 8 GPUs):

  1. Role(num_replicas=2, resource=Resource(gpus=2))
    1. replica_0’s CUDA_VISIBLE_DEVICES=0,1

    2. replica_1’s CUDA_VISIBLE_DEVICES=2,3

  2. Role(num_replicas=3, resource=Resource(gpus=4))
    1. Error - 3 * 4 = 12 > 8

  3. [Role(num_replicas=1, resource=Resource(gpus=2)), Role(num_replicas=3, resource=Resource(gpus=1))]
    1. role_0, replica_0’s CUDA_VISIBLE_DEVICES=0,1

    2. role_1, replica_0’s CUDA_VISIBLE_DEVICES=2

    3. role_1, replica_1’s CUDA_VISIBLE_DEVICES=3

    4. role_1, replica_2’s CUDA_VISIBLE_DEVICES=4
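
The assignment in the examples above can be reproduced with a short sketch. This is not the TorchX implementation; assign_cuda_visible_devices is a hypothetical function, roles are reduced to (num_replicas, gpus) tuples, and the over-subscribed case is modeled as returning None, in line with the note that the variable is not set when the host has too few devices.

```python
def assign_cuda_visible_devices(roles, device_count):
    """Assigns consecutive device indices to replicas, in role order.

    roles: list of (num_replicas, gpus_per_replica) tuples.
    Returns {(role_idx, replica_idx): "0,1"}, or None when the total request
    exceeds device_count (nothing is assigned for any role).
    """
    total = sum(num_replicas * gpus for num_replicas, gpus in roles)
    if total > device_count:
        return None
    assignments = {}
    next_device = 0
    for role_idx, (num_replicas, gpus) in enumerate(roles):
        if gpus <= 0:
            continue  # roles that request no GPUs are left untouched
        for replica_idx in range(num_replicas):
            devices = range(next_device, next_device + gpus)
            assignments[(role_idx, replica_idx)] = ",".join(map(str, devices))
            next_device += gpus
    return assignments
```

Calling it with [(1, 2), (3, 1)] and device_count=8 yields the same four assignments as example 3.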

close() -> None

Releases local resources. Safe to call multiple times.

Only override for schedulers with local state (e.g. local_scheduler).

describe(app_id: str) -> torchx.schedulers.api.DescribeAppResponse | None

Returns app description, or None if it no longer exists.

list(cfg: Optional[Mapping[str, str | int | float | bool | list[str] | dict[str, str] | None]] = None) -> list[torchx.schedulers.api.ListAppResponse]

Lists jobs on this scheduler.

log_iter(app_id: str, role_name: str, k: int = 0, regex: str | None = None, since: datetime.datetime | None = None, until: datetime.datetime | None = None, should_tail: bool = False, streams: torchx.schedulers.api.Stream | None = None) -> Iterable[str]

Returns an iterator over log lines for the k-th replica of role_name.

Important

Not all schedulers support log iteration, tailing, or time-based cursors. Check the specific scheduler docs.

Lines include trailing whitespace (\n). When should_tail=True, the iterator blocks until the app reaches a terminal state.

Parameters:
  • k – replica (node) index

  • regex – optional filter pattern

  • since – start cursor (scheduler-dependent)

  • until – end cursor (scheduler-dependent)

  • should_tail – if True, follow output like tail -f

  • streams – stdout, stderr, or combined

Raises:

NotImplementedError – if the scheduler does not support log iteration
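
The regex filter and the trailing-newline behavior can be illustrated with a small generator. This is a sketch only; iter_log_lines is a hypothetical helper, and matching with re.search (rather than a full-line match) is an assumption, not something stated above.

```python
import io
import re


def iter_log_lines(stream, regex=None):
    """Yields lines from a text stream, keeping each trailing newline."""
    pattern = re.compile(regex) if regex else None
    for line in stream:
        # Assumption: a line is kept if the pattern matches anywhere in it.
        if pattern is None or pattern.search(line):
            yield line
```

Callers that print the lines would typically pass end="" to print, since each line already ends in a newline.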

schedule(dryrun_info: AppDryRunInfo[PopenRequest]) -> str

Submits a previously dry-run request. Returns the app_id.

Image Providers

class torchx.schedulers.local_scheduler.ImageProvider

Manages downloading and setting up an image on localhost. This is only needed for LocalScheduler, since real schedulers typically do this on behalf of the user.

abstract fetch(image: str) -> str

Pulls the given image and returns the path to the pulled image on the local host, or an empty string if fetching is a no-op.

fetch_role(role: Role) -> str

Identical to fetch(image) in that it fetches the role’s image and returns the path to the image root, except that it allows the role to be updated by this provider. Useful when additional environment variables need to be set on the role to comply with the image provider’s way of fetching and managing images on localhost. By default this method simply delegates to fetch(role.image). Override if necessary.

get_cwd(image: str) -> str | None

Returns the absolute path of the mounted img directory. Used as a working directory for starting child processes.

get_entrypoint(img_root: str, role: Role) -> str

Returns the location of the entrypoint.

get_replica_param(img_root: str, role: Role, stdout: str | None = None, stderr: str | None = None, combined: str | None = None) -> ReplicaParam

Given the role replica’s specs, returns a ReplicaParam holding the arguments that are eventually passed to subprocess.Popen to run each of the role’s replicas. The img_root is expected to be the return value of self.fetch(role.image). Since the role’s image need only be fetched once (not once per replica), the caller is expected to call fetch once per role and call this method once per replica (role.num_replicas times).
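
The call pattern described here (fetch once per role, build parameters once per replica, then hand them to subprocess.Popen) can be sketched as below. ReplicaArgs, build_replica_args and launch are hypothetical stand-ins written for illustration; they are not the TorchX classes or methods.

```python
import os
import subprocess
import sys
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass
class ReplicaArgs:
    """Hypothetical stand-in for ReplicaParam: arguments for subprocess.Popen."""
    args: List[str]
    env: Dict[str, str]
    cwd: Optional[str] = None


def build_replica_args(img_root, entrypoint, role_args, env, num_replicas):
    """Builds one ReplicaArgs per replica from a single, already-fetched img_root."""
    base_env = dict(os.environ if env is None else env)
    return [
        ReplicaArgs(
            args=[entrypoint, *role_args],
            env=dict(base_env),  # each replica gets its own copy
            cwd=img_root or None,  # an empty img_root means "inherit cwd"
        )
        for _ in range(num_replicas)
    ]


def launch(params):
    """Starts one subprocess per replica and returns the Popen handles."""
    return [subprocess.Popen(p.args, env=p.env, cwd=p.cwd) for p in params]
```

The image root is computed once and reused for every replica, which mirrors the expectation that fetch is called once per role.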

class torchx.schedulers.local_scheduler.CWDImageProvider(cfg: Mapping[str, str | int | float | bool | list[str] | dict[str, str] | None])

Similar to LocalDirectoryImageProvider, except that it ignores the image name and uses the current working directory as the image path.

Example:

  1. fetch(Image(name="/tmp/foobar")) returns os.getcwd()

  2. fetch(Image(name="foobar:latest")) returns os.getcwd()

fetch(image: str) -> str

Pulls the given image and returns the path to the pulled image on the local host, or an empty string if fetching is a no-op.

get_cwd(image: str) -> str | None

Returns the absolute path of the mounted img directory. Used as a working directory for starting child processes.

get_entrypoint(img_root: str, role: Role) -> str

Returns the location of the entrypoint.

class torchx.schedulers.local_scheduler.LocalDirectoryImageProvider(cfg: Mapping[str, str | int | float | bool | list[str] | dict[str, str] | None])

Interprets the image name as the path to a directory on local host. Does not “fetch” (e.g. download) anything. Used in conjunction with LocalScheduler to run local binaries.

The image name must be an absolute path and must exist.

Example:

  1. fetch(Image(name="/tmp/foobar")) returns /tmp/foobar

  2. fetch(Image(name="foobar")) raises ValueError

  3. fetch(Image(name="/tmp/dir/that/does/not_exist")) raises ValueError

fetch(image: str) -> str

Raises:

ValueError – if the image name is not an absolute path, does not exist, or is not a directory
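
The validation rules listed for this provider can be expressed in a few lines. This is a minimal sketch, assuming the checks are "absolute path" followed by "existing directory"; fetch_local_dir is a hypothetical function, not the provider's actual method.

```python
import os


def fetch_local_dir(image: str) -> str:
    """Treats the image name as a local directory path and validates it."""
    if not os.path.isabs(image):
        raise ValueError(f"image must be an absolute path: {image!r}")
    if not os.path.isdir(image):
        raise ValueError(f"image is not an existing directory: {image!r}")
    # Nothing is downloaded; the validated path is returned as-is.
    return image
```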

get_cwd(image: str) -> str | None

Returns the absolute working directory. Used as a working directory for the child process.

get_entrypoint(img_root: str, role: Role) -> str

Returns the role entrypoint. When the local scheduler runs with image_type=dir, the child process working directory is set to img_root. If role.entrypoint is a relative path, it is resolved as img_root/role.entrypoint; if it is an absolute path, it is executed as provided.
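
The resolution rule above can be sketched with os.path. The helper name resolve_entrypoint is hypothetical and takes the entrypoint as a plain string rather than a Role.

```python
import os


def resolve_entrypoint(img_root: str, entrypoint: str) -> str:
    """Resolves a relative entrypoint against img_root; absolute paths pass through."""
    if os.path.isabs(entrypoint):
        return entrypoint
    return os.path.join(img_root, entrypoint)
```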

Reference

torchx.schedulers.local_scheduler.create_scheduler(session_name: str, cache_size: int = 100, extra_paths: list[str] | None = None, image_provider_class: Callable[[Mapping[str, str | int | float | bool | list[str] | dict[str, str] | None]], ImageProvider] = CWDImageProvider, **kwargs: Any) -> LocalScheduler

class torchx.schedulers.local_scheduler.LogIterator(app_id: str, log_file: str, scheduler: Scheduler, should_tail: bool = True)

class torchx.schedulers.local_scheduler.PopenRequest(app_id: str, log_dir: str, role_params: dict[str, list[torchx.schedulers.local_scheduler.ReplicaParam]], role_log_dirs: dict[str, list[str]])

Holds parameters to create a subprocess for each replica of each role of an application.

class torchx.schedulers.local_scheduler.ReplicaParam(args: list[str], env: dict[str, str], stdout: str | None = None, stderr: str | None = None, combined: str | None = None, cwd: str | None = None)

Holds LocalScheduler._popen() parameters for each replica of the role.

class torchx.schedulers.local_scheduler.SignalException(msg: str, sigval: Signals)

Raised at runtime when the TorchX local scheduler process receives a termination signal.
