DDP on Kubernetes#
This tutorial extends the DDP using SPMDActor tutorial to run PyTorch’s Distributed
Data Parallel (DDP) on Kubernetes using Monarch’s SPMDActor and KubernetesJob.
This example shows:
- How to provision GPU workers on Kubernetes using KubernetesJob with Python-native provisioning
- How to connect to Kubernetes pods using KubernetesJob
- How to run multi-node DDP training using SPMDActor
Prerequisites#
Before running this example, you need:
- A Kubernetes cluster with GPU nodes (nvidia.com/gpu resources)
- The MonarchMesh CRD and operator installed
- The NVIDIA device plugin deployed
- kubectl configured to access the cluster
Python-Native Provisioning#
With --provision, the script creates MonarchMesh CRDs directly from Python.
No YAML manifests for worker pods are needed – only a controller pod with
RBAC permissions to create CRDs and watch pods:
# Deploy the controller with CRD permissions
kubectl apply -f manifests/ddp_provision.yaml
# Copy scripts and run
kubectl cp kubernetes_ddp.py monarch-tests/ddp-controller:/tmp/kubernetes_ddp.py
kubectl cp train.py monarch-tests/ddp-controller:/tmp/train.py
kubectl exec -it ddp-controller -n monarch-tests -- \
    python /tmp/kubernetes_ddp.py --provision --num_hosts 2 --gpus_per_host 4
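The --provision flag maps onto the same KubernetesJob calls used later in main(). A minimal sketch of that path, condensed from the full script below (build_gpu_pod_spec() is defined later in this example):

# Condensed from main() below: create the MonarchMesh CRD from Python,
# then read the resulting host mesh back from the job state.
from monarch.job.kubernetes import KubernetesJob

k8s_job = KubernetesJob(namespace="monarch-tests")
k8s_job.add_mesh("ddpmesh", num_replicas=2, pod_spec=build_gpu_pod_spec(4))
host_mesh = getattr(k8s_job.state(), "ddpmesh")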
YAML Manifest Provisioning#
Alternatively, you can pre-provision workers with YAML manifests:
apiVersion: monarch.pytorch.org/v1alpha1
kind: MonarchMesh
metadata:
  name: ddpmesh            # Name of the MonarchMesh
  namespace: monarch-tests
spec:
  replicas: 2              # Number of worker pods (hosts)
  port: 26600
  podTemplate:
    containers:
      - name: worker
        image: ghcr.io/meta-pytorch/monarch:latest
        resources:
          limits:
            nvidia.com/gpu: 4
          requests:
            nvidia.com/gpu: 4
        command:
          - python
          - -u
          - -c
          - |
            from monarch.actor import run_worker_loop_forever
            import socket

            address = f"tcp://{socket.getfqdn()}:26600"
            run_worker_loop_forever(address=address, ca="trust_all_connections")
Deploy with:
kubectl apply -f manifests/ddp_mesh.yaml
See the complete manifest on GitHub, which also includes the RBAC configuration and the controller pod.
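Attaching to a pre-provisioned mesh from Python only requires the mesh name and replica count; the non --provision branch of main() below reduces to:

# Condensed from main() below: attach to the workers created by the
# MonarchMesh manifest instead of provisioning them from Python.
from monarch.job.kubernetes import KubernetesJob

k8s_job = KubernetesJob(namespace="monarch-tests")
k8s_job.add_mesh("ddpmesh", num_replicas=2)  # must match the manifest's replicas
host_mesh = getattr(k8s_job.state(), "ddpmesh")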
Training Script#
The training script (train.py) is a standard PyTorch DDP script:
import os

import torch
import torch.distributed as dist
import torch.nn as nn
import torch.optim as optim
from torch.nn.parallel import DistributedDataParallel as DDP


def main():
    dist.init_process_group("nccl")
    rank = dist.get_rank()
    local_rank = int(os.environ["LOCAL_RANK"])
    torch.cuda.set_device(local_rank)

    model = nn.Linear(10, 1).cuda()
    ddp_model = DDP(model)
    optimizer = optim.SGD(ddp_model.parameters(), lr=0.01)

    for step in range(5):
        inputs = torch.randn(4, 10).cuda()
        outputs = ddp_model(inputs)
        loss = outputs.sum()
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        print(f"[Rank {rank}] Step {step} loss={loss.item()}")

    dist.destroy_process_group()


if __name__ == "__main__":
    main()
This script:

- Initializes the NCCL process group (environment variables set by SPMDActor)
- Creates a simple linear model wrapped in DDP
- Runs 5 training steps
- Cleans up the process group
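Because SPMDActor supplies the rendezvous configuration (RANK, LOCAL_RANK, WORLD_SIZE, MASTER_ADDR, MASTER_PORT), the script contains no cluster-specific values. If you want to confirm what each rank receives, a small hypothetical helper could be dropped into train.py before init_process_group():

# Hypothetical debugging helper (not part of the example): print the torch
# elastic environment variables that SPMDActor sets for each rank.
import os

def show_elastic_env() -> None:
    for var in ("RANK", "LOCAL_RANK", "WORLD_SIZE", "MASTER_ADDR", "MASTER_PORT"):
        print(f"{var}={os.environ.get(var, '<unset>')}")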
Imports#
We import Monarch’s Kubernetes job support and SPMDActor.
import argparse
import asyncio
import textwrap

from kubernetes.client import (
    V1Container,
    V1EmptyDirVolumeSource,
    V1EnvVar,
    V1PodSpec,
    V1ResourceRequirements,
    V1Volume,
    V1VolumeMount,
)

from monarch._src.job.kubernetes import _WORKER_BOOTSTRAP_SCRIPT
from monarch.config import configure
from monarch.job.kubernetes import KubernetesJob
from monarch.spmd import SPMDActor
from monarch.tools.network import AddrType

configure(enable_log_forwarding=True)
# Path to train.py on worker pods
TRAIN_SCRIPT = "/tmp/train.py"

# Training script content — written to worker pods at startup when provisioning
_TRAIN_SCRIPT_CONTENT = textwrap.dedent("""\
    import os

    import torch
    import torch.distributed as dist
    import torch.nn as nn
    import torch.optim as optim
    from torch.nn.parallel import DistributedDataParallel as DDP


    def main():
        dist.init_process_group("nccl")
        rank = dist.get_rank()
        local_rank = int(os.environ["LOCAL_RANK"])
        torch.cuda.set_device(local_rank)

        model = nn.Linear(10, 1).cuda()
        ddp_model = DDP(model)
        optimizer = optim.SGD(ddp_model.parameters(), lr=0.01)

        for step in range(5):
            inputs = torch.randn(4, 10).cuda()
            outputs = ddp_model(inputs)
            loss = outputs.sum()
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            print(f"[Rank {rank}] Step {step} loss={loss.item()}")

        dist.destroy_process_group()


    if __name__ == "__main__":
        main()
    """)
def build_gpu_pod_spec(gpus_per_host: int) -> V1PodSpec:
    """Build a V1PodSpec with GPU resources and shared memory for NCCL.

    The bootstrap command writes train.py to the worker filesystem
    before starting the Monarch worker loop, so the SPMDActor can
    find and execute it.
    """
    # Write train.py, then start the worker loop
    bootstrap = (
        "import pathlib\n"
        f"pathlib.Path({TRAIN_SCRIPT!r}).write_text({_TRAIN_SCRIPT_CONTENT!r})\n"
        + _WORKER_BOOTSTRAP_SCRIPT
    )

    gpu_resources = {"nvidia.com/gpu": str(gpus_per_host)}

    return V1PodSpec(
        containers=[
            V1Container(
                name="worker",
                image="ghcr.io/meta-pytorch/monarch:latest",
                command=["python", "-u", "-c", bootstrap],
                env=[V1EnvVar(name="MONARCH_PORT", value="26600")],
                resources=V1ResourceRequirements(
                    limits=gpu_resources,
                    requests=gpu_resources,
                ),
                volume_mounts=[
                    V1VolumeMount(name="dshm", mount_path="/dev/shm"),
                ],
            )
        ],
        volumes=[
            V1Volume(
                name="dshm",
                empty_dir=V1EmptyDirVolumeSource(medium="Memory", size_limit="16Gi"),
            )
        ],
    )
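The returned value is an ordinary kubernetes V1PodSpec, so it can be adjusted before being passed to add_mesh. As one sketch, assuming a hypothetical node label (substitute whatever your cluster uses for GPU nodes):

# Hypothetical variant: same GPU pod spec, but constrained to labeled nodes.
# The "nvidia.com/gpu.present" label is an assumption, not part of this example.
def build_gpu_pod_spec_on_labeled_nodes(gpus_per_host: int) -> V1PodSpec:
    spec = build_gpu_pod_spec(gpus_per_host)
    spec.node_selector = {"nvidia.com/gpu.present": "true"}
    return spec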
Main Function#
The main function connects to Kubernetes pods and runs DDP training
using SPMDActor to execute the training script.
async def main(
    num_hosts: int = 2,
    gpus_per_host: int = 4,
    mesh_name: str = "ddpmesh",
    provision: bool = False,
) -> None:
    """Run DDP training on Kubernetes.

    Args:
        num_hosts: Number of worker pods (must match MonarchMesh replicas)
        gpus_per_host: GPUs per pod (must match nvidia.com/gpu in MonarchMesh)
        mesh_name: Name of the MonarchMesh resource
        provision: If True, create MonarchMesh CRDs from Python
    """
    print("=" * 60)
    print("Kubernetes DDP Example")
    print(f"Configuration: {num_hosts} hosts, {gpus_per_host} GPUs/host")
    print("=" * 60)

    # %%
    # Connect to Kubernetes
    # ~~~~~~~~~~~~~~~~~~~~~
    # Create a ``KubernetesJob`` in the ``monarch-tests`` namespace.
    # With ``--provision``, the job creates MonarchMesh CRDs via the K8s API
    # using ``pod_spec`` for full control over the pod template (needed for
    # the shared memory volume that NCCL requires). Without ``--provision``,
    # it attaches to pre-provisioned pods.
    k8s_job = KubernetesJob(namespace="monarch-tests")
    if provision:
        k8s_job.add_mesh(
            mesh_name,
            num_replicas=num_hosts,
            pod_spec=build_gpu_pod_spec(gpus_per_host),
        )
    else:
        k8s_job.add_mesh(mesh_name, num_replicas=num_hosts)

    # %%
    # Create Process Mesh
    # ~~~~~~~~~~~~~~~~~~~
    # Get the job state and spawn processes on the workers. Each host gets
    # ``gpus_per_host`` processes, one per GPU.
    job_state = k8s_job.state()
    host_mesh = getattr(job_state, mesh_name)
    proc_mesh = host_mesh.spawn_procs({"gpus": gpus_per_host})

    # Stream logs from all processes to the client
    await proc_mesh.logging_option(stream_to_client=True)

    # %%
    # Run DDP Training with SPMDActor
    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    # Spawn ``SPMDActor`` on the process mesh. The actor configures torch elastic
    # environment variables (RANK, LOCAL_RANK, WORLD_SIZE, MASTER_ADDR, MASTER_PORT)
    # and executes the training script.
    spmd_actors = proc_mesh.spawn("_SPMDActor", SPMDActor)

    # Get master address/port from first actor (all coordinates = 0).
    # We use IPv4 addresses since short hostnames may not resolve across pods.
    first_values = dict.fromkeys(proc_mesh._labels, 0)
    master_addr, master_port = await spmd_actors.slice(
        **first_values
    ).get_host_port.call_one(AddrType.IPv4)

    # Execute the training script across the mesh
    await spmd_actors.main.call(master_addr, master_port, [TRAIN_SCRIPT])

    print("=" * 60)
    print("DDP example completed successfully!")
    print("=" * 60)

    # Clean up
    proc_mesh.stop().get()
    if provision:
        k8s_job.kill()
Running the Example#
Option 1: Python-native provisioning (recommended)
With --provision, train.py is embedded in the worker pod spec and
written to the filesystem at startup. No manual file copying to workers
is needed.
Deploy the controller with CRD permissions:
kubectl apply -f manifests/ddp_provision.yaml
Run from the controller:
kubectl cp kubernetes_ddp.py monarch-tests/ddp-controller:/tmp/kubernetes_ddp.py
kubectl exec -it ddp-controller -n monarch-tests -- \
    python /tmp/kubernetes_ddp.py --provision --num_hosts 2 --gpus_per_host 4
Clean up:
kubectl delete -f manifests/ddp_provision.yaml
Option 2: YAML manifest provisioning
Deploy the MonarchMesh:
kubectl apply -f manifests/ddp_mesh.yaml
Wait for pods to be ready:
kubectl get pods -n monarch-tests -l app.kubernetes.io/name=monarch-worker
kubectl get pods -n monarch-tests ddp-controller
Copy train.py to each worker pod:
for pod in $(kubectl get pods -n monarch-tests -l app.kubernetes.io/name=monarch-worker -o name); do
    kubectl cp train.py monarch-tests/${pod#pod/}:/tmp/train.py
done

Run from the controller pod:
kubectl cp kubernetes_ddp.py monarch-tests/ddp-controller:/tmp/kubernetes_ddp.py
kubectl exec -it ddp-controller -n monarch-tests -- \
    python /tmp/kubernetes_ddp.py --num_hosts 2 --gpus_per_host 4
Clean up:
kubectl delete -f manifests/ddp_mesh.yaml
Command-line Arguments#
- --provision: Create MonarchMesh CRDs from Python (no worker YAML needed)
- --num_hosts: Number of worker pods
- --gpus_per_host: GPUs per pod
- --mesh_name: Name of the MonarchMesh resource
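These flags map one-to-one onto the parameters of main(), so from a notebook or another script you could skip the CLI block below and call it directly:

# Equivalent to: python kubernetes_ddp.py --provision --num_hosts 2 --gpus_per_host 4
asyncio.run(main(num_hosts=2, gpus_per_host=4, mesh_name="ddpmesh", provision=True))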
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Run DDP training on Kubernetes")
    parser.add_argument(
        "--num_hosts",
        type=int,
        default=2,
        help="Number of worker pods",
    )
    parser.add_argument(
        "--gpus_per_host",
        type=int,
        default=4,
        help="GPUs per pod",
    )
    parser.add_argument(
        "--mesh_name",
        type=str,
        default="ddpmesh",
        help="Name of the MonarchMesh resource",
    )
    parser.add_argument(
        "--provision",
        action="store_true",
        help="Provision MonarchMesh CRDs from Python (no YAML manifests needed)",
    )
    args = parser.parse_args()

    asyncio.run(
        main(args.num_hosts, args.gpus_per_host, args.mesh_name, args.provision)
    )