monarch.job#
The monarch.job module provides a declarative interface for managing
distributed job resources. Jobs abstract away the details of different
schedulers (SLURM, local execution, etc.) and provide a unified way to
allocate hosts and create HostMesh objects.
Job Model#
A job object comprises a declarative specification and optionally the job’s
state. The apply() operation applies the job’s specification to the
scheduler, creating or updating the job as required. Once applied, you can
query the job’s state() to get the allocated HostMesh objects.
Example:
from monarch.job import SlurmJob
# Create a job specification
job = SlurmJob(
    meshes={"trainers": 4, "dataloaders": 2},
    partition="gpu",
    time_limit="01:00:00",
)
# Get the state (applies the job if needed)
state = job.state()
# Access host meshes by name
trainer_hosts = state.trainers
dataloader_hosts = state.dataloaders
Job State#
- class monarch.job.JobState(hosts)[source]#
Bases: object
Container for the current state of a job.
Provides access to the HostMesh objects for each mesh requested in the job specification. Each mesh is accessible as an attribute.
Example:
state = job.state()
state.trainers     # HostMesh for the "trainers" mesh
state.dataloaders  # HostMesh for the "dataloaders" mesh
Job Base Class#
All job implementations inherit from JobTrait, which defines the core
interface for job lifecycle management.
- class monarch.job.JobTrait[source]#
Bases: ABC
A job object represents a specification and set of machines that can be used to create monarch HostMeshes and run actors on them.
A job object comprises a declarative specification for the job and optionally the job’s state. The apply() operation applies the job’s specification to the scheduler, creating or updating the job as required. If the job exists and there are no changes in its specification, apply() is a no-op. Once applied, we can query the job’s state. The state of the job contains the set of hosts currently allocated, arranged into the requested host meshes. Conceptually, the state can be retrieved directly from the scheduler, but we may also cache snapshots of the state locally. The state is the interface to the job consumed by Monarch: Monarch bootstraps host meshes from the state alone, and is not concerned with any other aspect of the job.
Conceptually, dynamic jobs (e.g., to enable consistently fast restarts, elasticity, etc.) can simply poll the state for changes. In practice, notification mechanisms would be developed so that polling isn’t required. The model allows for late resolution of some parts of the job’s specification. For example, a job that does not specify a name may instead resolve the name on the first apply(). In this way, jobs can also be “templates”. But the model also supports having the job refer to a specific instance by including the resolved job name in the specification itself.
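A minimal sketch of the polling pattern described above, assuming a SlurmJob with a single "trainers" mesh; the change check and poll interval are illustrative:
import time

from monarch.job import SlurmJob

job = SlurmJob(meshes={"trainers": 4})

previous = None
while True:
    state = job.state()      # applies the job on first call
    hosts = state.trainers   # HostMesh for the "trainers" mesh
    if previous is not None and hosts is not previous:
        # Hosts changed (e.g., after an elastic resize or restart);
        # re-bootstrap actors on the new mesh here.
        pass
    previous = hosts
    time.sleep(30)           # illustrative poll interval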
Note
Subclasses must NOT set _status directly. The state() method manages status transitions and pickle caching. If a subclass pre-emptively sets _status = "running", the state() method will skip the cache dump, breaking job persistence. Instead, let apply() set the status after _create() returns.
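A hypothetical subclass illustrating the rule above; the _create() signature and body are assumptions for illustration only:
from monarch.job import JobTrait

class MySchedulerJob(JobTrait):
    """Hypothetical JobTrait subclass (illustrative sketch)."""

    def _create(self, client_script=None):  # signature assumed
        # Submit to the scheduler and record whatever is needed to
        # reconnect later (job id, hostnames, ...).
        ...
        # Do NOT set self._status = "running" here: apply() updates
        # the status after _create() returns, so that state() can
        # dump the pickle cache correctly.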
- apply(client_script=None)[source]#
Request that the job as specified is brought into existence or modified to match the current specification. The worker machines launched in the job should call run_worker_forever to join the job.
Calling apply when the job as specified has already been applied is a no-op.
If client_script is not None, creating the job arranges for that script to be run as the client.
Implementation note: To batch-launch the job, we first write .monarch/job_state.pkl with a Job that instructs the client to connect to the job it is running in. We then schedule the job, including that .monarch/job_state.pkl. When the client calls .state(), it finds .monarch/job_state.pkl and connects to it.
- state(cached_path='.monarch/job_state.pkl')[source]#
Get the current state of this job, containing the host mesh objects that were requested.
host_meshes = self.state()
# properties of state hold the requested host meshes:
host_meshes.trainers
host_meshes.dataloaders
This is a dictionary so that meshes can hold different machine types.
cached_path: if cached_path is not None and the job has yet to be applied, we will first check cached_path for an existing created job state. If it exists and saved_job.can_run(self), we will connect to the cached job. Otherwise, we will apply this job and connect to it, saving the job in cached_path if it is not None.
- Raises:
JobExpiredException – when the job has finished and this connection cannot be made.
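A sketch of typical usage with the default cache path; the fallback is illustrative and assumes that bypassing the cache is the desired recovery after a JobExpiredException:
from monarch.job import SlurmJob

job = SlurmJob(meshes={"trainers": 2})

# First call checks .monarch/job_state.pkl; if a compatible job was
# cached there, it reconnects, otherwise it applies this job and
# caches it.
state = job.state()

# Bypass the cache entirely, e.g. after a JobExpiredException from a
# finished cached job:
fresh_state = job.state(cached_path=None)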
Job Implementations#
LocalJob#
- class monarch.job.LocalJob(hosts=('hosts',))[source]#
Bases: JobTrait
Job that runs on the local host.
This job calls this_host() for each host mesh requested. It serves as a stand-in in configuration so a job can be switched between remote and local execution by changing the job configuration.
- can_run(spec)[source]#
Local jobs are the same regardless of what was saved, so just use the spec, which has the correct ‘hosts’ sequence.
- property process#
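A sketch of swapping in local execution; the mesh names are illustrative and chosen to match a remote configuration so downstream code is unchanged:
from monarch.job import LocalJob

# Same mesh names as a remote SlurmJob would use, so code that reads
# state.trainers works either way.
job = LocalJob(hosts=("trainers", "dataloaders"))
state = job.state()
trainer_hosts = state.trainers  # HostMesh created via this_host()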
SlurmJob#
- class monarch.job.SlurmJob(meshes, python_exe='python', slurm_args=(), monarch_port=22222, job_name='monarch_job', ntasks_per_node=1, time_limit=None, partition=None, log_dir=None, exclusive=True, gpus_per_node=None, cpus_per_task=None, mem=None, job_start_timeout=None)[source]#
Bases: JobTrait
A job scheduler that uses SLURM command line tools to schedule jobs.
This implementation:
1. Uses sbatch to submit SLURM jobs that start monarch workers
2. Queries job status with squeue to get allocated hostnames
3. Uses the hostnames to connect to the started workers
Pass exclusive=False to share a node with other jobs.
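A sketch of a fuller specification using parameters from the signature above; the partition name and log path are illustrative:
from monarch.job import SlurmJob

job = SlurmJob(
    meshes={"trainers": 4, "dataloaders": 2},
    partition="gpu",             # illustrative partition name
    time_limit="04:00:00",
    gpus_per_node=8,
    exclusive=True,              # request whole nodes
    log_dir="/checkpoint/logs",  # illustrative path
)

# First call submits via sbatch, resolves hostnames via squeue, and
# connects to the started workers.
state = job.state()
trainer_hosts = state.trainers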
KubernetesJob#
- class monarch.job.kubernetes.KubernetesJob(namespace, timeout=None)[source]#
Bases: JobTrait
Job implementation for Kubernetes that discovers and connects to pods.
Supports two modes:
Pre-provisioned – connect to pre-provisioned pods discovered via label selectors. Compatible with the MonarchMesh operator, third-party schedulers, or manually created pods. Used when image_spec or pod_spec is not specified in add_mesh.
Provisioning – create MonarchMesh CRDs via the K8s API so the pre-installed operator provisions StatefulSets and Services automatically. Pass image_spec or pod_spec (a V1PodSpec) to add_mesh to enable provisioning for that mesh. If the MonarchMesh CRD already exists, it is patched instead of created.
- add_mesh(name, num_replicas, label_selector=None, pod_rank_label='apps.kubernetes.io/pod-index', image_spec=None, port=26600, pod_spec=None, labels=None)[source]#
Add a mesh specification.
In attach-only mode (default), meshes are discovered by label selector. In provisioning mode (image_spec or pod_spec supplied), a MonarchMesh CRD is created so the operator can provision the pods.
- Parameters:
name (str) – Name of the mesh. Must follow the RFC 1123 DNS label standard and Monarch hostname restrictions:
* at most 63 characters
* only lowercase alphanumeric characters
* must start with an alphabetic character
* must end with an alphanumeric character
num_replicas (int) – Number of pod replicas (expects all ranks 0 to num_replicas-1)
label_selector (str | None) – Custom label selector for pod discovery. Cannot be set when provisioning.
pod_rank_label (str) – Label key containing the pod rank. Cannot be customized when provisioning.
image_spec (ImageSpec | None) – ImageSpec with container image and optional resources for simple provisioning. Mutually exclusive with pod_spec.
port (int) – Monarch worker port (default: 26600).
pod_spec (V1PodSpec | None) – V1PodSpec for advanced provisioning (e.g. custom volumes, sidecars). Mutually exclusive with image_spec.
labels (dict[str, str] | None) – Optional labels to apply to the MonarchMesh CRD metadata. Only used when provisioning (image_spec or pod_spec supplied).
- Raises:
ValueError – On invalid name or conflicting parameters.
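A sketch of attach-only usage; the namespace and label selector are illustrative:
from monarch.job.kubernetes import KubernetesJob

job = KubernetesJob(namespace="ml-team")  # illustrative namespace

# Attach-only mode: discover pre-provisioned pods by label selector.
job.add_mesh(
    "trainers",
    num_replicas=4,
    label_selector="app=trainer",  # illustrative selector
)

state = job.state()
trainer_hosts = state.trainers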
Serialization#
Jobs can be serialized and deserialized for persistence and caching.
- monarch.job.job_load(filename)[source]#
Load a job from a file.
- Parameters:
filename (str) – Path to the pickled job file, typically from JobTrait.dump().
- Returns:
The deserialized job object.
- Return type:
JobTrait
- monarch.job.job_loads(data)[source]#
Deserialize a job from bytes.
- Parameters:
data (bytes) – Pickled job bytes, typically from JobTrait.dumps().
- Returns:
The deserialized job object.
- Return type:
JobTrait
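A sketch of both round trips; this assumes JobTrait.dump() takes a destination path, mirroring job_load(filename):
from monarch.job import SlurmJob, job_load, job_loads

job = SlurmJob(meshes={"trainers": 2})

# Bytes round trip via dumps()/job_loads().
data = job.dumps()
restored = job_loads(data)

# File round trip; dump() is assumed to take a path, mirroring
# job_load(filename).
job.dump("my_job.pkl")
restored = job_load("my_job.pkl")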
SPMD Jobs#
The monarch.job.spmd submodule provides job primitives for launching
torchrun-style SPMD training over Monarch. It parses torchrun arguments from
an AppDef and executes the training script across the mesh.
- monarch.job.spmd.serve(appdef, scheduler='mast_conda', scheduler_cfg=None)[source]#
Launch SPMD job using an AppDef or a single-node torchrun command.
This function launches monarch workers, then allows running SPMD training via run_spmd().
- Assumptions:
When using an AppDef, the role’s entrypoint is a script (e.g., "workspace/entrypoint.sh") that sets up the environment (activates conda, sets WORKSPACE_DIR, etc.) and runs its arguments.
The role’s args contains a torchrun command with the training script, e.g., ["torchrun", "--nnodes=1", "-m", "train", "--lr", "0.001"].
The role’s workspace defines which files to upload to workers.
When using a command list, it should be a torchrun command, e.g., ["torchrun", "--nproc-per-node=4", "--standalone", "train.py"].
Note
When passing a command list, only single-node torchrun is supported (--standalone or --nnodes=1). For multi-node training, use an AppDef with a scheduler that manages node allocation.
- Parameters:
appdef (AppDef | List[str]) – Either a torchx AppDef instance, or a torchrun command as a list of strings (e.g., ["torchrun", "--nproc-per-node=4", "train.py"]). When a list is provided, the first element is the entrypoint and the rest are arguments.
scheduler (str) – Scheduler name (e.g., 'mast_conda', 'local_cwd')
scheduler_cfg (Dict[str, Any] | None) – Scheduler configuration dict
- Returns:
SPMDJob instance
- Raises:
ValueError – If the command list specifies multi-node (--nnodes > 1).
- Return type:
SPMDJob
Example
Using a torchrun command list (single-node only):
from monarch.job.spmd import serve

job = serve(
    ["torchrun", "--nproc-per-node=4", "--standalone", "train.py"],
    scheduler="local_cwd",
)
job.run_spmd()
Using an AppDef (supports multi-node):
from monarch.job.spmd import serve
from torchx import specs

app = specs.AppDef(
    name="my-training",
    roles=[
        specs.Role(
            name="trainer",
            image="my_workspace:latest",
            entrypoint="workspace/entrypoint.sh",
            args=["torchrun", "--nnodes=2", "--nproc-per-node=8", "-m", "train"],
            num_replicas=2,
            resource=specs.resource(h="gtt_any"),
        ),
    ],
)
job = serve(
    app,
    scheduler="mast_conda",
    scheduler_cfg={
        "hpcClusterUuid": "MastGenAICluster",
        "hpcIdentity": "my_identity",
        "localityConstraints": ["region", "pci"],
    },
)
job.run_spmd()
- class monarch.job.spmd.SPMDJob(handle, scheduler, workspace=None, original_roles=None)[source]#
Bases: JobTrait
SPMD (Single Program Multiple Data) job that uses torchx directly.
This job type wraps a torchx Runner and job handle, providing monarch job tracking.