monarch.actor#
The monarch.actor module provides the actor-based programming model for distributed computation. See Getting Started for an overview.
Creating Actors#
Actors are created on multidimensional meshes of processes that are launched across hosts. HostMesh represents a mesh of hosts; ProcMesh represents a mesh of processes.
- class monarch.actor.HostMesh(hy_host_mesh, region, stream_logs, is_fake_in_process, _initialized_hy_host_mesh, _code_sync_proc_mesh)[source]#
Bases:
MeshTrait
HostMesh represents a collection of compute hosts that can be used to spawn processes and actors.
- classmethod allocate_nonblocking(name, extent, allocator, alloc_constraints=None, bootstrap_cmd=None)[source]#
- property region: Region#
- shutdown()[source]#
Shut down the host mesh and all of its processes. This will raise an exception if the host mesh is a reference rather than owned, which can happen if this HostMesh object was received from a remote actor or produced by slicing.
- Returns:
A future that completes when the host mesh has been shut down.
- Return type:
Future[None]
- async sync_workspace(workspace, conda=False, auto_reload=False)[source]#
Sync local code changes to the remote hosts.
- property initialized: Future[Literal[True]]#
Future that completes with True when the HostMesh has initialized. Because HostMesh objects are remote, there is no guarantee that the HostMesh is still usable after this completes, only that at some point in the past it was usable.
- flatten(name)#
Returns a new device mesh with all dimensions flattened into a single dimension with the given name.
Currently this supports only dense meshes: that is, all ranks must be contiguous in the mesh.
- rename(**kwargs)#
Returns a new device mesh with some of its dimensions renamed. Dimensions not mentioned are retained:
new_mesh = mesh.rename(host='dp', gpu='tp')
- size(dim=None)#
Returns the total number of elements in the requested subset of the mesh. If dim is None, returns the total number of devices in the mesh.
- slice(**kwargs)#
Select along named dimensions. Integer values remove dimensions, slice objects keep dimensions but restrict them.
Examples: mesh.slice(batch=3, gpu=slice(2, 6))
- split(**kwargs)#
Returns a new device mesh with some dimensions of this mesh split. For instance, this call splits the host dimension into dp and pp dimensions, and the gpu dimension into tp and cp. The sizes of 'pp' and 'cp' are specified, and the remaining dimension sizes are derived from them:
new_mesh = mesh.split(host=('dp', 'pp'), gpu=('tp', 'cp'), pp=16, cp=2)
Dimensions not specified will remain unchanged.
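The slice and split semantics above can be sketched in plain Python (a toy model over a dict of named dimensions; the helpers are illustrative only, not the library's implementation):

```python
# Toy model: a mesh's named dimensions as {name: list_of_coordinates}.
def toy_slice(dims, **kwargs):
    # Integer values remove a dimension; slice objects keep it but
    # restrict its range, mirroring the documented slice() behavior.
    out = {}
    for name, coords in dims.items():
        if name not in kwargs:
            out[name] = coords                 # untouched: kept as-is
        elif isinstance(kwargs[name], slice):
            out[name] = coords[kwargs[name]]   # kept, but restricted
        # an int selects one coordinate and drops the dimension
    return out

def derived_split_size(total, specified):
    # When a dimension of size `total` is split in two and only one
    # part's size is given, the other part's size is total // specified.
    assert total % specified == 0, "split sizes must divide evenly"
    return total // specified

dims = {"batch": list(range(8)), "gpu": list(range(8))}
print(toy_slice(dims, batch=3, gpu=slice(2, 6)))  # {'gpu': [2, 3, 4, 5]}
# mesh.split(host=('dp', 'pp'), pp=16) on a host dimension of size 64
# derives dp = 64 // 16 == 4:
print(derived_split_size(64, 16))  # 4
```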
- class monarch.actor.ProcMesh(hy_proc_mesh, host_mesh, region, root_region, _initialized_hy_proc_mesh, _device_mesh=None)[source]#
Bases:
MeshTrait
A distributed mesh of processes for actor computation.
ProcMesh represents a collection of processes that can spawn and manage actors. It provides the foundation for distributed actor systems by managing process allocation, lifecycle, and communication across multiple hosts and devices.
The ProcMesh supports spawning actors, monitoring process health, logging configuration, and code synchronization across distributed processes.
- property initialized: Future[Literal[True]]#
Future that completes with True when the ProcMesh has initialized. Because ProcMesh objects are remote, there is no guarantee that the ProcMesh is still usable after this completes, only that at some point in the past it was usable.
- spawn(name, Class, *args, **kwargs)[source]#
Spawn a T-typed actor mesh on the process mesh.
- Parameters:
name – The name of the actor.
Class – The class of the actor to spawn.
*args – Positional arguments to pass to the actor's constructor.
**kwargs – Keyword arguments to pass to the actor's constructor.
- Returns:
The actor instance.
- classmethod from_host_mesh(host_mesh, hy_proc_mesh, region, setup=None, _attach_controller_controller=True)[source]#
- activate()[source]#
Activate the device mesh. Operations performed inside this context manager will be distributed tensor operations; each operation will be executed on every device in the mesh.
See https://meta-pytorch.org/monarch/generated/examples/distributed_tensors.html for more information
- with mesh.activate():
t = torch.rand(3, 4, device="cuda")
- stop()[source]#
This will stop all processes (and actors) in the mesh and release any resources associated with the mesh.
- flatten(name)#
Returns a new device mesh with all dimensions flattened into a single dimension with the given name.
Currently this supports only dense meshes: that is, all ranks must be contiguous in the mesh.
- rename(**kwargs)#
Returns a new device mesh with some of its dimensions renamed. Dimensions not mentioned are retained:
new_mesh = mesh.rename(host='dp', gpu='tp')
- size(dim=None)#
Returns the total number of elements in the requested subset of the mesh. If dim is None, returns the total number of devices in the mesh.
- slice(**kwargs)#
Select along named dimensions. Integer values remove dimensions, slice objects keep dimensions but restrict them.
Examples: mesh.slice(batch=3, gpu=slice(2, 6))
- split(**kwargs)#
Returns a new device mesh with some dimensions of this mesh split. For instance, this call splits the host dimension into dp and pp dimensions, and the gpu dimension into tp and cp. The sizes of 'pp' and 'cp' are specified, and the remaining dimension sizes are derived from them:
new_mesh = mesh.split(host=('dp', 'pp'), gpu=('tp', 'cp'), pp=16, cp=2)
Dimensions not specified will remain unchanged.
- monarch.actor.get_or_spawn_controller(name, Class, *args, **kwargs)[source]#
Creates a singleton actor (controller) indexed by name, or if it already exists, returns the existing actor.
- Parameters:
name (str) – The unique name of the actor, used as a key for retrieval.
Class (Type) – The class of the actor to spawn. Must be a subclass of Actor.
*args (Any) – Positional arguments to pass to the actor constructor.
**kwargs (Any) – Keyword arguments to pass to the actor constructor.
- Returns:
A Future that resolves to a reference to the actor.
- Return type:
Future[TActor]
Defining Actors#
All actor classes subclass the Actor base object, which provides the mesh slicing API. Each publicly exposed method of the actor is annotated with @endpoint:
- class monarch.actor.Actor[source]#
Bases:
MeshTrait
- flatten(name)#
Returns a new device mesh with all dimensions flattened into a single dimension with the given name.
Currently this supports only dense meshes: that is, all ranks must be contiguous in the mesh.
- rename(**kwargs)#
Returns a new device mesh with some of its dimensions renamed. Dimensions not mentioned are retained:
new_mesh = mesh.rename(host='dp', gpu='tp')
- size(dim=None)#
Returns the total number of elements in the requested subset of the mesh. If dim is None, returns the total number of devices in the mesh.
- slice(**kwargs)#
Select along named dimensions. Integer values remove dimensions, slice objects keep dimensions but restrict them.
Examples: mesh.slice(batch=3, gpu=slice(2, 6))
- split(**kwargs)#
Returns a new device mesh with some dimensions of this mesh split. For instance, this call splits the host dimension into dp and pp dimensions, and the gpu dimension into tp and cp. The sizes of 'pp' and 'cp' are specified, and the remaining dimension sizes are derived from them:
new_mesh = mesh.split(host=('dp', 'pp'), gpu=('tp', 'cp'), pp=16, cp=2)
Dimensions not specified will remain unchanged.
- monarch.actor.endpoint(method: Callable[Concatenate[Any, P], Awaitable[R]], *, propagate: Propagator = None, explicit_response_port: Literal[False] = False, instrument: bool = False) EndpointProperty[P, R][source]#
- monarch.actor.endpoint(method: Callable[Concatenate[Any, P], R], *, propagate: Propagator = None, explicit_response_port: Literal[False] = False, instrument: bool = False) EndpointProperty[P, R]
- monarch.actor.endpoint(*, propagate: Propagator = None, explicit_response_port: Literal[False] = False, instrument: bool = False) EndpointIfy
- monarch.actor.endpoint(method: Callable[Concatenate[Any, 'Port[R]', P], Awaitable[None]], *, propagate: Propagator = None, explicit_response_port: Literal[True], instrument: bool = False) EndpointProperty[P, R]
- monarch.actor.endpoint(method: Callable[Concatenate[Any, 'Port[R]', P], None], *, propagate: Propagator = None, explicit_response_port: Literal[True], instrument: bool = False) EndpointProperty[P, R]
- monarch.actor.endpoint(*, propagate: Propagator = None, explicit_response_port: Literal[True], instrument: bool = False) PortedEndpointIfy
Messaging Actors#
Messaging is done through the "adverbs" defined for each endpoint.
- class monarch.actor.Endpoint(propagator)[source]#
- stream(*args, **kwargs)[source]#
Broadcasts to all actors and yields their responses as a stream / generator.
This enables processing results from multiple actors incrementally as they become available. Returns an async generator of response values.
- class monarch.actor.Future(*, coro)[source]#
Bases:
Generic[R]
The Future class wraps a PythonTask, a handle to a coroutine running on the Tokio event loop. These coroutines do not use asyncio or asyncio.Future; instead, they are executed directly on the Tokio runtime. The Future class provides both a synchronous API (.get()) and an asynchronous API (await) for interacting with these tasks.
- Parameters:
coro (Coroutine[Any, Any, R] | PythonTask[R]) – The coroutine or PythonTask representing the asynchronous computation.
- class monarch.actor.ValueMesh(shape, values)[source]#
Bases:
MeshTrait, Generic[R]
A mesh that holds the result of an endpoint invocation.
- values()[source]#
Generator that iterates over just the values in the mesh.
- Returns:
Values at all coordinates.
- Return type:
Iterable[R]
- flatten(name)#
Returns a new device mesh with all dimensions flattened into a single dimension with the given name.
Currently this supports only dense meshes: that is, all ranks must be contiguous in the mesh.
- rename(**kwargs)#
Returns a new device mesh with some of its dimensions renamed. Dimensions not mentioned are retained:
new_mesh = mesh.rename(host='dp', gpu='tp')
- size(dim=None)#
Returns the total number of elements in the requested subset of the mesh. If dim is None, returns the total number of devices in the mesh.
- slice(**kwargs)#
Select along named dimensions. Integer values remove dimensions, slice objects keep dimensions but restrict them.
Examples: mesh.slice(batch=3, gpu=slice(2, 6))
- split(**kwargs)#
Returns a new device mesh with some dimensions of this mesh split. For instance, this call splits the host dimension into dp and pp dimensions, and the gpu dimension into tp and cp. The sizes of 'pp' and 'cp' are specified, and the remaining dimension sizes are derived from them:
new_mesh = mesh.split(host=('dp', 'pp'), gpu=('tp', 'cp'), pp=16, cp=2)
Dimensions not specified will remain unchanged.
- class monarch.actor.ActorError(exception, message='A remote actor call has failed.')[source]#
Bases:
Exception
A deterministic problem with the user's code: for example, an OOM resulting from trying to allocate too much GPU memory, or violating an invariant enforced by the various APIs.
- add_note(object, /)#
Exception.add_note(note) – add a note to the exception
- args#
- with_traceback(object, /)#
Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.
- class monarch.actor.Accumulator(endpoint, identity, combine)[source]#
Bases:
Generic[P, R, A]
Accumulate the result of a broadcast invocation of an endpoint across a sliced mesh.
- Usage:
>>> counter = Accumulator(Actor.increment, 0, lambda x, y: x + y)
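The identity/combine semantics can be sketched as a left fold in plain Python (a toy model of the accumulation step, not the library's implementation):

```python
from functools import reduce

def toy_accumulate(responses, identity, combine):
    # Fold every actor's response into one value, starting from identity.
    return reduce(combine, responses, identity)

# Mirroring the usage above: summing increment results from four
# hypothetical actors that each returned 1.
print(toy_accumulate([1, 1, 1, 1], 0, lambda x, y: x + y))  # 4
```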
- monarch.actor.send(endpoint, args, kwargs, port=None, selection='all')[source]#
Fire-and-forget broadcast invocation of the endpoint across a given selection of the mesh.
This sends the message to all actors but does not wait for any result. If a port is provided, actors can use it to send responses back to the caller.
- class monarch.actor.Channel[source]#
Bases:
Generic[R]
An advanced, low-level API for a communication channel used for message passing between actors.
Provides static methods to create communication channels with port pairs for sending and receiving messages of type R.
- class monarch.actor.Port(port_ref, instance, rank)[source]#
Bases:
Generic[R]
Handle used to send reliable, in-order messages through a channel to a PortReceiver.
- class monarch.actor.PortReceiver(mailbox, receiver, monitor=None)[source]#
Bases:
Generic[R]
Receiver for messages sent through a communication channel.
Handles receiving R-typed objects sent from a corresponding Port. Supports asynchronous message reception, with optional supervision monitoring for error handling.
- monarch.actor.as_endpoint(not_an_endpoint: Callable[P, R], *, propagate: Propagator = None, explicit_response_port: Literal[False] = False) Endpoint[P, R][source]#
- monarch.actor.as_endpoint(not_an_endpoint: Callable[Concatenate['PortProtocol[R]', P], None], *, propagate: Propagator = None, explicit_response_port: Literal[True]) Endpoint[P, R]
Context API#
Use these functions to look up what actor is running the currently executing code.
- class monarch.actor.Point(rank, extent)[source]#
Bases:
Point, Mapping
- extent#
- get(k[, d]) → D[k] if k in D, else d. d defaults to None.#
- items() → a set-like object providing a view on D's items#
- keys() → a set-like object providing a view on D's keys#
- rank#
- size(label)#
- values() → an object providing a view on D's values#
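The relationship between a Point's linear rank and its per-dimension coordinates can be sketched with row-major arithmetic (an illustration of the usual convention; the names and helper are hypothetical, and the real Point may differ in details):

```python
def toy_coords(rank, extent):
    # Decompose a linear rank into row-major coordinates over named
    # dimension sizes, e.g. extent = {"host": 2, "gpu": 4}.
    coords = {}
    for name, size in reversed(list(extent.items())):
        rank, coords[name] = divmod(rank, size)
    return {name: coords[name] for name in extent}

# Rank 5 in a 2-host x 4-gpu mesh sits on the second host, second gpu.
print(toy_coords(5, {"host": 2, "gpu": 4}))  # {'host': 1, 'gpu': 1}
```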