Rate this Page

monarch.job#

The monarch.job module provides a declarative interface for managing distributed job resources. Jobs abstract away the details of different schedulers (SLURM, local execution, etc.) and provide a unified way to allocate hosts and create HostMesh objects.

Job Model#

A job object comprises a declarative specification and optionally the job’s state. The apply() operation applies the job’s specification to the scheduler, creating or updating the job as required. Once applied, you can query the job’s state() to get the allocated HostMesh objects.

Example:

from monarch.job import SlurmJob

# Create a job specification
job = SlurmJob(
    meshes={"trainers": 4, "dataloaders": 2},
    partition="gpu",
    time_limit="01:00:00",
)

# Get the state (applies the job if needed)
state = job.state()

# Access host meshes by name
trainer_hosts = state.trainers
dataloader_hosts = state.dataloaders

Job State#

class monarch.job.JobState(hosts)[source]#

Bases: object

Container for the current state of a job.

Provides access to the HostMesh objects for each mesh requested in the job specification. Each mesh is accessible as an attribute.

Example:

state = job.state()
state.trainers    # HostMesh for the "trainers" mesh
state.dataloaders # HostMesh for the "dataloaders" mesh
__init__(hosts)[source]#

Job Base Class#

All job implementations inherit from JobTrait, which defines the core interface for job lifecycle management.

class monarch.job.JobTrait[source]#

Bases: ABC

A job object represents a specification and set of machines that can be used to create monarch HostMeshes and run actors on them.

A job object comprises a declarative specification for the job and optionally the job’s state. The apply() operation applies the job’s specification to the scheduler, creating or updating the job as required. If the job exists and there are no changes in its specification, apply() is a no-op. Once applied, we can query the job’s state. The state of the job contains the set of hosts currently allocated, arranged into the requested host meshes. Conceptually, the state can be retrieved directly from the scheduler, but we may also cache snapshots of the state locally.

The state is the interface to the job consumed by Monarch: Monarch bootstraps host meshes from the state alone, and is not concerned with any other aspect of the job.

Conceptually, dynamic jobs (e.g., to enable consistently fast restarts, elasticity, etc.) can simply poll the state for changes. In practice, notification mechanisms would be developed so that polling isn’t required. The model allows for late resolution of some parts of the job’s specification. For example, a job that does not specify a name may instead resolve the name on the first apply(). In this way, jobs can also be “templates”. But the model also supports having the job refer to a specific instance by including the resolved job name in the specification itself.

Note

Subclasses must NOT set _status directly. The state() method manages status transitions and pickle caching. If a subclass pre-emptively sets _status = "running", the state() method will skip the cache dump, breaking job persistence. Instead, let apply() set the status after _create() returns.

apply(client_script=None)[source]#

Request the job as specified is brought into existence or modified to the current specification/ The worker machines launched in the job should call run_worker_forever to join the job.

Calling apply when the job as specified has already been applied is a no-op.

If client_script is not None, then creating the job arranges for the job to run train.py as the client.

Implementation note: To batch launch the job, we will first write .monarch/job_state.pkl with a Job that instructs the client to connect to the job that it is running in. Then we will schedule the job including that .monarch/job_state.pkl. When the client calls .state(), it will find the .monarch/job_state.pkl and connect to it.

property active: bool#
state(cached_path='.monarch/job_state.pkl')[source]#
Get the current state of this job, containing the host mesh objects of its requires that were requested

host_meshes = self.state() # properties of state hold the requested host meshes:

host_meshes.trainers host_meshes.dataloaders This is a dictionary so that meshes can hold different machine types.

cached_path: if cached_path is not None and the job has yet to be applied, we will first check cached_path for an existing created job state. If it exists and saved_job.can_run(self), we will connect to the cached job. Otherwise, we will apply this job and connect to it, saving the job in cached_path if it is not None.

Raises: JobExpiredException - when the job has finished and this connection cannot be made.

dump(filename)[source]#

Save job to a file. Helper to make it more apparent

Jobs are serializable across processes.

dumps()[source]#
kill()[source]#
abstract can_run(spec)[source]#

Is this job capable of running the job spec? This is used to check if a cached job can be used to run spec instead of creating a new reserveration.

It is also used by the batch run infrastructure to indicate that the batch job can certainly run itself.

Job Implementations#

LocalJob#

class monarch.job.LocalJob(hosts=('hosts',))[source]#

Bases: JobTrait

Job that runs on the local host.

This job calls this_host() for each host mesh requested. It serves as a stand-in in configuration so a job can be switched between remote and local execution by changing the job configuration.

can_run(spec)[source]#

Local jobs are the same regardless of what was saved, so just use the spec, which has the correct ‘hosts’ sequence.

property process#

SlurmJob#

class monarch.job.SlurmJob(meshes, python_exe='python', slurm_args=(), monarch_port=22222, job_name='monarch_job', ntasks_per_node=1, time_limit=None, partition=None, log_dir=None, exclusive=True, gpus_per_node=None, cpus_per_task=None, mem=None, job_start_timeout=None)[source]#

Bases: JobTrait

A job scheduler that uses SLURM command line tools to schedule jobs.

This implementation: 1. Uses sbatch to submit SLURM jobs that start monarch workers 2. Queries job status with squeue to get allocated hostnames 3. Uses the hostnames to connect to the started workers

add_mesh(name, num_nodes)[source]#
can_run(spec)[source]#

Check if this job can run the given spec.

share_node(tasks_per_node, gpus_per_task, partition)[source]#

Share a node with other jobs.

KubernetesJob#

class monarch.job.kubernetes.KubernetesJob(namespace, timeout=None)[source]#

Bases: JobTrait

Job implementation for Kubernetes that discovers and connects to pods.

Supports two modes:

Pre-provisioned – connect to pre-provisioned pods discovered via label selectors. Compatible with the MonarchMesh operator, third-party schedulers, or manually created pods. Used when image_spec or pod_spec is not specified in add_mesh.

Provisioning – create MonarchMesh CRDs via the K8s API so the pre-installed operator provisions StatefulSets and Services automatically. Pass image_spec or pod_spec (a V1PodSpec) to add_mesh to enable provisioning for that mesh. If the MonarchMesh CRD already exists, it is patched instead of created.

add_mesh(name, num_replicas, label_selector=None, pod_rank_label='apps.kubernetes.io/pod-index', image_spec=None, port=26600, pod_spec=None, labels=None)[source]#

Add a mesh specification.

In attach-only mode (default), meshes are discovered by label selector. In provisioning mode (image_spec or pod_spec supplied), a MonarchMesh CRD is created so the operator can provision the pods.

Parameters:
  • name (str) – Name of the mesh. Must follow RFC 1123 DNS label standard and Monarch hostname restriction: * At most 63 characters * only lowercase alphanumeric characters * must start with an alphabetic character, * and end with an alphanumeric character.

  • num_replicas (int) – Number of pod replicas (expects all ranks 0 to num_replicas-1)

  • label_selector (str | None) – Custom label selector for pod discovery. Cannot be set when provisioning.

  • pod_rank_label (str) – Label key containing the pod rank. Cannot be customized when provisioning.

  • image_spec (ImageSpec | None) – ImageSpec with container image and optional resources for simple provisioning. Mutually exclusive with pod_spec.

  • port (int) – Monarch worker port (default: 26600).

  • pod_spec (V1PodSpec | None) – V1PodSpec for advanced provisioning (e.g. custom volumes, sidecars). Mutually exclusive with image_spec.

  • labels (dict[str, str] | None) – Optional labels to apply to the MonarchMesh CRD metadata. Only used when provisioning (image_spec or pod_spec supplied).

Raises:

ValueError – On invalid name or conflicting parameters.

can_run(spec)[source]#

Check if this job can run the given spec.

Verifies that: 1. The spec is a KubernetesJob with matching configuration 2. The required pods are available and ready

Parameters:

spec (JobTrait) – JobTrait specification to check

Returns:

True if this job matches the spec and all required pods are available

Return type:

bool

Serialization#

Jobs can be serialized and deserialized for persistence and caching.

monarch.job.job_load(filename)[source]#

Load a job from a file.

Parameters:

filename (str) – Path to the pickled job file, typically from JobTrait.dump().

Returns:

The deserialized job object.

Return type:

JobTrait

monarch.job.job_loads(data)[source]#

Deserialize a job from bytes.

Parameters:

data (bytes) – Pickled job bytes, typically from JobTrait.dumps().

Returns:

The deserialized job object.

Return type:

JobTrait

SPMD Jobs#

The monarch.job.spmd submodule provides job primitives for launching torchrun-style SPMD training over Monarch. It parses torchrun arguments from an AppDef and executes the training script across the mesh.

monarch.job.spmd.serve(appdef, scheduler='mast_conda', scheduler_cfg=None)[source]#

Launch SPMD job using an AppDef or a single-node torchrun command.

This function launches monarch workers, then allows running SPMD training via run_spmd().

Assumptions:
  • When using an AppDef, the role’s entrypoint is a script (e.g., “workspace/entrypoint.sh”) that sets up the environment (activates conda, sets WORKSPACE_DIR, etc.) and runs its arguments.

  • The role’s args contains a torchrun command with the training script, e.g., [“torchrun”, “–nnodes=1”, “-m”, “train”, “–lr”, “0.001”].

  • The role’s workspace defines which files to upload to workers.

  • When using a command list, it should be a torchrun command, e.g., [“torchrun”, “–nproc-per-node=4”, “–standalone”, “train.py”].

Note

When passing a command list, only single-node torchrun is supported (--standalone or --nnodes=1). For multi-node training, use an AppDef with a scheduler that manages node allocation.

Parameters:
  • appdef (AppDef | List[str]) – Either a torchx AppDef instance, or a torchrun command as a list of strings (e.g., ["torchrun", "--nproc-per-node=4", "train.py"]). When a list is provided, the first element is the entrypoint and the rest are arguments.

  • scheduler (str) – Scheduler name (e.g., ‘mast_conda’, ‘local_cwd’)

  • scheduler_cfg (Dict[str, Any] | None) – Scheduler configuration dict

Returns:

SPMDJob instance

Raises:

ValueError – If command list specifies multi-node (–nnodes > 1).

Return type:

SPMDJob

Example

Using a torchrun command list (single-node only):

from monarch.job.spmd import serve

job = serve(
    ["torchrun", "--nproc-per-node=4", "--standalone", "train.py"],
    scheduler="local_cwd",
)
job.run_spmd()

Using an AppDef (supports multi-node):

from monarch.job.spmd import serve
from torchx import specs

app = specs.AppDef(
    name="my-training",
    roles=[
        specs.Role(
            name="trainer",
            image="my_workspace:latest",
            entrypoint="workspace/entrypoint.sh",
            args=["torchrun", "--nnodes=2", "--nproc-per-node=8",
                  "-m", "train"],
            num_replicas=2,
            resource=specs.resource(h="gtt_any"),
        ),
    ],
)
job = serve(
    app,
    scheduler="mast_conda",
    scheduler_cfg={
        "hpcClusterUuid": "MastGenAICluster",
        "hpcIdentity": "my_identity",
        "localityConstraints": ["region", "pci"],
    },
)
job.run_spmd()
class monarch.job.spmd.SPMDJob(handle, scheduler, workspace=None, original_roles=None)[source]#

Bases: JobTrait

SPMD (Single Program Multiple Data) job that uses torchx directly.

This job type wraps a torchx Runner and job handle, providing monarch job tracking.

can_run(spec)[source]#

Is this job capable of running the job spec? This is used to check if a cached job can be used to run spec instead of creating a new reserveration.

It is also used by the batch run infrastructure to indicate that the batch job can certainly run itself.

run_spmd()[source]#