Local¶
This module contains the TorchX local scheduler, which runs TorchX components locally via subprocesses.
- class torchx.schedulers.local_scheduler.LocalScheduler(session_name: str, image_provider_class: Callable[[Mapping[str, str | int | float | bool | list[str] | dict[str, str] | None]], ImageProvider], cache_size: int = 100, extra_paths: list[str] | None = None)[source]¶
Bases: Scheduler[Mapping[str, str | int | float | bool | list[str] | dict[str, str] | None]]

Schedules on localhost. Containers are modeled as processes, and container properties that are either not relevant or cannot be enforced for localhost runs are ignored. Ignored properties:
Resource requirements
Resource limit enforcements
Retry policies
Retry counts (no retries supported)
Deployment preferences
The scheduler supports orphan process cleanup on receiving SIGTERM or SIGINT: it will terminate the spawned processes.
This scheduler is exposed as local_cwd, which runs the provided app relative to the current working directory and ignores the images field, for faster iteration and testing.
Note
The orphan cleanup only works if LocalScheduler is instantiated from the main thread.
Config Options
usage:
    [log_dir=LOG_DIR],[prepend_cwd=PREPEND_CWD],[auto_set_cuda_visible_devices=AUTO_SET_CUDA_VISIBLE_DEVICES]

optional arguments:
    log_dir=LOG_DIR (str, None)
        Directory to write stdout/stderr log files of replicas.
    prepend_cwd=PREPEND_CWD (bool, False)
        If set, prepends CWD to the replica's PATH env var, making binaries in CWD take precedence.
    auto_set_cuda_visible_devices=AUTO_SET_CUDA_VISIBLE_DEVICES (bool, False)
        Sets CUDA_VISIBLE_DEVICES for roles that request GPU resources.
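For illustration, these options can be passed as a plain mapping when invoking the scheduler programmatically. The values below are hypothetical, not defaults:

```python
# Hypothetical run cfg for the local scheduler; the keys mirror the
# config options listed above. Values shown are illustrative only.
cfg = {
    # Directory where each replica's stdout/stderr log files are written.
    "log_dir": "/tmp/torchx_logs",
    # Make binaries in the current working directory take PATH precedence.
    "prepend_cwd": True,
    # Let the scheduler assign CUDA_VISIBLE_DEVICES per replica.
    "auto_set_cuda_visible_devices": True,
}
```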
Compatibility
Note
Due to scheduler differences, jobs that run locally may not work on other schedulers because of network or software dependencies.
| Feature | Scheduler Support |
| --- | --- |
| Fetch Logs | ✔️ |
| Distributed Jobs | LocalScheduler supports multiple replicas, but all replicas execute on the local host. |
| Cancel Job | ✔️ |
| Describe Job | ✔️ |
| Workspaces / Patching | Partial support. LocalScheduler runs the app from a local directory but does not support programmatic workspaces. |
| Mounts | ❌ |
| Elasticity | ❌ |
- auto_set_CUDA_VISIBLE_DEVICES(role_params: dict[str, list[torchx.schedulers.local_scheduler.ReplicaParam]], app: AppDef, cfg: Opts) None[source]¶
If the run option auto_set_cuda_visible_devices = True, sets the CUDA_VISIBLE_DEVICES env var on each replica (node) according to the number of GPUs specified in each role's resource specifications, overwriting any existing CUDA_VISIBLE_DEVICES in the role's env field. To manually set CUDA_VISIBLE_DEVICES, run with auto_set_cuda_visible_devices = False in the scheduler runcfg.

Note

If the host's device count is less than the total number of requested GPUs, then CUDA_VISIBLE_DEVICES is NOT set (even if auto_set_cuda_visible_devices=True).

Note

This method either sets CUDA_VISIBLE_DEVICES on all GPU roles or on none.

Examples (all examples assume running on a host with 8 GPUs):

- Role(num_replicas=2, resource=Resource(gpus=2))
  - replica_0's CUDA_VISIBLE_DEVICES=0,1
  - replica_1's CUDA_VISIBLE_DEVICES=2,3
- Role(num_replicas=3, resource=Resource(gpus=4))
  - Error: 3 * 4 = 12 GPUs requested, but only 8 available
- [Role(num_replicas=1, resource=Resource(gpus=2)), Role(num_replicas=3, resource=Resource(gpus=1))]
  - role_0, replica_0's CUDA_VISIBLE_DEVICES=0,1
  - role_1, replica_0's CUDA_VISIBLE_DEVICES=2
  - role_1, replica_1's CUDA_VISIBLE_DEVICES=3
  - role_1, replica_2's CUDA_VISIBLE_DEVICES=4
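The assignment rule in the examples above can be sketched in plain Python. This is a simplified illustration of the documented behavior, not TorchX's actual implementation; the function and parameter names are hypothetical:

```python
def assign_cuda_visible_devices(roles, host_gpus):
    """Sketch of per-replica GPU assignment (hypothetical helper).

    roles: list of (num_replicas, gpus_per_replica) tuples.
    Returns {(role_idx, replica_idx): "comma,separated,device,ids"},
    or None when the host has fewer GPUs than requested -- mirroring
    the documented behavior of not setting CUDA_VISIBLE_DEVICES at all.
    """
    total_requested = sum(n * g for n, g in roles)
    if total_requested > host_gpus:
        return None  # host device count too small: env var is NOT set
    assignments = {}
    device = 0  # next unassigned device id
    for role_idx, (num_replicas, gpus) in enumerate(roles):
        for replica_idx in range(num_replicas):
            ids = range(device, device + gpus)
            assignments[(role_idx, replica_idx)] = ",".join(map(str, ids))
            device += gpus
    return assignments

# Mirrors the first example above on an 8-GPU host:
print(assign_cuda_visible_devices([(2, 2)], 8))
# {(0, 0): '0,1', (0, 1): '2,3'}
```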
- close() None[source]¶
Releases local resources. Safe to call multiple times.
Only override for schedulers with local state (e.g. local_scheduler).
- describe(app_id: str) torchx.schedulers.api.DescribeAppResponse | None[source]¶
Returns the app description, or None if it no longer exists.
- list(cfg: Optional[Mapping[str, str | int | float | bool | list[str] | dict[str, str] | None]] = None) list[torchx.schedulers.api.ListAppResponse][source]¶
Lists jobs on this scheduler.
- log_iter(app_id: str, role_name: str, k: int = 0, regex: str | None = None, since: datetime.datetime | None = None, until: datetime.datetime | None = None, should_tail: bool = False, streams: torchx.schedulers.api.Stream | None = None) Iterable[str][source]¶
Returns an iterator over log lines for the k-th replica of role_name.

Important

Not all schedulers support log iteration, tailing, or time-based cursors. Check the specific scheduler docs.

Lines include trailing whitespace (\n). When should_tail=True, the iterator blocks until the app reaches a terminal state.

- Parameters:
  - k – replica (node) index
  - regex – optional filter pattern
  - since – start cursor (scheduler-dependent)
  - until – end cursor (scheduler-dependent)
  - should_tail – if True, follow output like tail -f
  - streams – stdout, stderr, or combined
- Raises:
NotImplementedError – if the scheduler does not support log iteration
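For the local scheduler, where logs are plain files on disk, the non-tailing core of this behavior can be approximated with a small file-based iterator. This is a simplified sketch; iter_log_lines is a hypothetical name, not part of the TorchX API:

```python
import re
from typing import Iterator, Optional


def iter_log_lines(log_file: str, regex: Optional[str] = None) -> Iterator[str]:
    """Yield log lines with trailing newline preserved, optionally
    filtered by a regex -- approximating log_iter's non-tailing path.
    The real method can also block and follow the file (should_tail=True).
    """
    pattern = re.compile(regex) if regex else None
    with open(log_file) as f:
        for line in f:
            if pattern is None or pattern.search(line):
                yield line  # trailing "\n" kept, as documented above
```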
- schedule(dryrun_info: AppDryRunInfo[PopenRequest]) str[source]¶
Submits a previously dry-run request. Returns the app_id.
Image Providers¶
- class torchx.schedulers.local_scheduler.ImageProvider[source]¶
Manages downloading and setting up an image on localhost. This is only needed for LocalScheduler since real schedulers typically do this on behalf of the user.

- abstract fetch(image: str) str[source]¶
Pulls the given image and returns a path to the pulled image on the local host, or an empty string if this is a no-op.
- fetch_role(role: Role) str[source]¶
Identical to fetch(image) in that it fetches the role's image and returns the path to the image root, except that it allows the role to be updated by this provider. Useful when additional environment variables need to be set on the role to comply with the image provider's way of fetching and managing images on localhost. By default this method simply delegates to fetch(role.image). Override if necessary.
- get_cwd(image: str) str | None[source]¶
Returns the absolute path of the mounted image directory. Used as the working directory for starting child processes.
- get_replica_param(img_root: str, role: Role, stdout: str | None = None, stderr: str | None = None, combined: str | None = None) ReplicaParam[source]¶
Given the role replica's specs, returns a ReplicaParam holder containing the arguments to eventually pass to subprocess.Popen to actually invoke and run each role's replica. The img_root is expected to be the return value of self.fetch(role.image). Since the role's image need only be fetched once (not once per replica), the caller is expected to call the fetch method once per role and call this method once for each of role.num_replicas.
- class torchx.schedulers.local_scheduler.CWDImageProvider(cfg: Mapping[str, str | int | float | bool | list[str] | dict[str, str] | None])[source]¶
Similar to LocalDirectoryImageProvider, but it ignores the image name and uses the current working directory as the image path.
Example:
- fetch(Image(name="/tmp/foobar")) returns os.getcwd()
- fetch(Image(name="foobar:latest")) returns os.getcwd()
- fetch(image: str) str[source]¶
Pulls the given image and returns a path to the pulled image on the local host, or an empty string if this is a no-op.
- class torchx.schedulers.local_scheduler.LocalDirectoryImageProvider(cfg: Mapping[str, str | int | float | bool | list[str] | dict[str, str] | None])[source]¶
Interprets the image name as the path to a directory on the local host. Does not "fetch" (e.g. download) anything. Used in conjunction with LocalScheduler to run local binaries. The image name must be an absolute path and must exist.
Example:
- fetch(Image(name="/tmp/foobar")) returns /tmp/foobar
- fetch(Image(name="foobar")) raises ValueError
- fetch(Image(name="/tmp/dir/that/does/not_exist")) raises ValueError
- fetch(image: str) str[source]¶
- Raises:
ValueError – if the image name is not an absolute path, or if it does not exist or is not a directory
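The validation contract described above amounts to roughly the following. This is a sketch of the documented behavior under stated assumptions, not the actual TorchX source; fetch_local_dir is a hypothetical name:

```python
import os


def fetch_local_dir(image: str) -> str:
    """Sketch of LocalDirectoryImageProvider.fetch's documented contract:
    the image name must be an absolute path to an existing directory."""
    if not os.path.isabs(image):
        raise ValueError(f"image name must be an absolute path, got: {image}")
    if not os.path.isdir(image):
        raise ValueError(f"image directory does not exist: {image}")
    return image
```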
- get_cwd(image: str) str | None[source]¶
Returns the absolute working directory. Used as a working directory for the child process.
- get_entrypoint(img_root: str, role: Role) str[source]¶
Returns the role entrypoint. When the local scheduler is executed with image_type=dir, the child process's working directory is set to img_root. If role.entrypoint is a relative path, it is resolved as img_root/role.entrypoint; if it is an absolute path, it is executed as provided.
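The resolution rule can be sketched as follows (a simplified illustration of the documented behavior; resolve_entrypoint is a hypothetical name, not part of the TorchX API):

```python
import os


def resolve_entrypoint(img_root: str, entrypoint: str) -> str:
    """Relative entrypoints resolve under img_root; absolute entrypoints
    are used as provided, mirroring get_entrypoint's documented behavior."""
    if os.path.isabs(entrypoint):
        return entrypoint
    return os.path.join(img_root, entrypoint)
```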
Reference¶
- torchx.schedulers.local_scheduler.create_scheduler(session_name: str, cache_size: int = 100, extra_paths: list[str] | None = None, image_provider_class: ~typing.Callable[[~typing.Mapping[str, str | int | float | bool | list[str] | dict[str, str] | None]], ~torchx.schedulers.local_scheduler.ImageProvider] = <class 'torchx.schedulers.local_scheduler.CWDImageProvider'>, **kwargs: ~typing.Any) LocalScheduler[source]¶
- class torchx.schedulers.local_scheduler.LogIterator(app_id: str, log_file: str, scheduler: Scheduler, should_tail: bool = True)[source]¶
- class torchx.schedulers.local_scheduler.PopenRequest(app_id: str, log_dir: str, role_params: dict[str, list[torchx.schedulers.local_scheduler.ReplicaParam]], role_log_dirs: dict[str, list[str]])[source]¶
Holds parameters to create a subprocess for each replica of each role of an application.