monarch.actor#

The monarch.actor module provides the actor-based programming model for distributed computation. See Getting Started for an overview.

Creating Actors#

Actors are created on multidimensional meshes of processes that are launched across hosts. HostMesh represents a mesh of hosts. ProcMesh is a mesh of processes.

class monarch.actor.HostMesh(shape, allocator, alloc_constraints=None)[source]#

Bases: MeshTrait

HostMesh represents a collection of compute hosts that can be used to spawn processes and actors. The class requires you to provide your AllocateMixin that interfaces with the underlying resource allocator of your choice.

spawn_procs(per_host=None, bootstrap=None)[source]#

Start new processes on this host mesh. By default, this starts one proc on each host in the mesh. Additional procs can be started using per_host to specify the local shape, e.g.

per_host = {'gpus': 8}

will create a proc mesh with an additional 'gpus' dimension.
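As a rough illustration of how per_host extends the mesh shape, the sketch below models dimension sizes with plain dictionaries. It does not call Monarch: the helper name proc_mesh_sizes, the two-host starting shape, and the ordering of dimensions are all assumptions made for this example.

```python
from math import prod


def proc_mesh_sizes(host_sizes, per_host=None):
    """Model the dimension sizes of a proc mesh spawned on a host mesh.

    Hypothetical helper for illustration only; not part of monarch.actor.
    """
    # Assumption: host dimensions are kept and per-host dimensions are appended.
    return {**host_sizes, **(per_host or {})}


hosts = {"hosts": 2}                              # assumed host mesh shape
default_procs = proc_mesh_sizes(hosts)            # one proc per host
gpu_procs = proc_mesh_sizes(hosts, {"gpus": 8})   # adds a 'gpus' dimension
total_procs = prod(gpu_procs.values())            # procs across the whole mesh
```

In this model, two hosts with per_host = {'gpus': 8} give a two-dimensional proc mesh holding 16 processes.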

bootstrap is a function that will be run at startup on each proc and can be used to e.g. configure CUDA or NCCL. We guarantee that CUDA has not been initialized before bootstrap is called.

TODO: For now, a new allocator is created for every new ProcMesh.

property extent: Extent#
flatten(name)#

Returns a new device mesh with all dimensions flattened into a single dimension with the given name.

Currently this supports only dense meshes: that is, all ranks must be contiguous in the mesh.
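A minimal sketch of what flattening does to the dimension sizes, using a plain dictionary in place of a real mesh. The function flatten_sizes is a hypothetical stand-in, not a Monarch API, and it ignores the density requirement described above.

```python
from math import prod


def flatten_sizes(sizes, name):
    """Collapse all named dimensions into one dimension called `name`.

    Illustrative model only; a real mesh must also be dense to be flattened.
    """
    return {name: prod(sizes.values())}


flat = flatten_sizes({"hosts": 2, "gpus": 8}, "rank")
```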

rename(**kwargs)#

Returns a new device mesh with some of its dimensions renamed. Dimensions not mentioned are retained:

new_mesh = mesh.rename(host='dp', gpu='tp')

size(dim=None)#

Returns the total number of elements in the requested subset of the mesh. If dim is None, returns the total number of devices in the mesh.

property sizes: dict[str, int]#
slice(**kwargs)#

Select along named dimensions. Integer values remove dimensions; slice objects keep dimensions but restrict them.

Examples: mesh.slice(batch=3, gpu=slice(2, 6))
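The effect of integer versus slice selectors on the mesh shape can be sketched with a dictionary of dimension sizes. The function slice_sizes and the starting sizes are assumptions for illustration; the sketch tracks only sizes, not which ranks are selected.

```python
def slice_sizes(sizes, **kwargs):
    """Model how slice() changes named dimension sizes (illustrative only)."""
    result = {}
    for label, size in sizes.items():
        if label not in kwargs:
            result[label] = size                 # untouched dimension
            continue
        selector = kwargs[label]
        if isinstance(selector, int):
            continue                             # integer index removes the dimension
        # A slice object keeps the dimension but restricts its extent.
        result[label] = len(range(size)[selector])
    return result


sliced = slice_sizes({"batch": 4, "host": 2, "gpu": 8}, batch=3, gpu=slice(2, 6))
```

Here the batch dimension disappears because it was indexed with an integer, while gpu survives with four entries.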

split(**kwargs)#

Returns a new device mesh with some dimensions of this mesh split. For instance, this call splits the host dimension into dp and pp dimensions. The size of 'pp' is specified and the size of 'dp' is derived from it:

new_mesh = mesh.split(host=('dp', 'pp'), gpu=('tp', 'cp'), pp=16, cp=2)

Dimensions not specified will remain unchanged.
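The size derivation in a split can be sketched as integer division over a dictionary of sizes. The helper split_sizes, the starting sizes (64 hosts, 8 GPUs), and the rule that exactly one new dimension per split is left unspecified are assumptions of this sketch, not documented behavior.

```python
from math import prod


def split_sizes(sizes, **kwargs):
    """Model the dimension sizes produced by split() (illustrative only)."""
    splits = {k: v for k, v in kwargs.items() if isinstance(v, tuple)}
    known = {k: v for k, v in kwargs.items() if isinstance(v, int)}
    result = {}
    for label, size in sizes.items():
        if label not in splits:
            result[label] = size                 # unspecified dimensions are unchanged
            continue
        new_labels = splits[label]
        missing = [n for n in new_labels if n not in known]
        if len(missing) != 1:
            raise ValueError("leave exactly one new dimension size to be derived")
        specified = prod(known[n] for n in new_labels if n in known)
        if size % specified != 0:
            raise ValueError(f"{label} of size {size} is not divisible by {specified}")
        for n in new_labels:
            # The one unspecified size is derived from the original size.
            result[n] = known.get(n, size // specified)
    return result


new_sizes = split_sizes(
    {"host": 64, "gpu": 8},
    host=("dp", "pp"), gpu=("tp", "cp"), pp=16, cp=2,
)
```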

class monarch.actor.ProcMesh(hy_proc_mesh, shape, _device_mesh=None)[source]#

Bases: MeshTrait

A distributed mesh of processes for actor computation.

ProcMesh represents a collection of processes that can spawn and manage actors. It provides the foundation for distributed actor systems by managing process allocation, lifecycle, and communication across multiple hosts and devices.

The ProcMesh supports spawning actors, monitoring process health, logging configuration, and code synchronization across distributed processes.

property initialized: Future[Literal[True]]#

Future completes with True when the ProcMesh has initialized. Because ProcMesh instances are remote objects, there is no guarantee that the ProcMesh is still usable after this completes, only that it was usable at some point in the past.

property host_mesh: HostMesh#
spawn(name, Class, *args, **kwargs)[source]#

Spawn a T-typed actor mesh on the process mesh.

Parameters:
  • name – The name of the actor.

  • Class – The class of the actor to spawn.

  • args – Positional arguments to pass to the actor’s constructor.

  • kwargs – Keyword arguments to pass to the actor’s constructor.

Returns:

The actor instance.

Usage:
>>> procs: ProcMesh = host_mesh.spawn_procs(per_host={"gpus": 8})
>>> counters: Counter = procs.spawn("counters", Counter, 0)
to_table()[source]#
activate()[source]#
rank_tensor(dim)[source]#
rank_tensors()[source]#
stop()[source]#

This will stop all processes (and actors) in the mesh and release any resources associated with the mesh.

property extent: Extent#
flatten(name)#

Returns a new device mesh with all dimensions flattened into a single dimension with the given name.

Currently this supports only dense meshes: that is, all ranks must be contiguous in the mesh.

rename(**kwargs)#

Returns a new device mesh with some of its dimensions renamed. Dimensions not mentioned are retained:

new_mesh = mesh.rename(host='dp', gpu='tp')

size(dim=None)#

Returns the total number of elements in the requested subset of the mesh. If dim is None, returns the total number of devices in the mesh.

property sizes: dict[str, int]#
slice(**kwargs)#

Select along named dimensions. Integer values remove dimensions; slice objects keep dimensions but restrict them.

Examples: mesh.slice(batch=3, gpu=slice(2, 6))

split(**kwargs)#

Returns a new device mesh with some dimensions of this mesh split. For instance, this call splits the host dimension into dp and pp dimensions. The size of 'pp' is specified and the size of 'dp' is derived from it:

new_mesh = mesh.split(host=('dp', 'pp'), gpu=('tp', 'cp'), pp=16, cp=2)

Dimensions not specified will remain unchanged.

monarch.actor.get_or_spawn_controller(name, Class, *args, **kwargs)[source]#

Creates a singleton actor (controller) indexed by name, or if it already exists, returns the existing actor.

Parameters:
  • name (str) – The unique name of the actor, used as a key for retrieval.

  • Class (Type) – The class of the actor to spawn. Must be a subclass of Actor.

  • *args (Any) – Positional arguments to pass to the actor constructor.

  • **kwargs (Any) – Keyword arguments to pass to the actor constructor.

Returns:

A Future that resolves to a reference to the actor.

Return type:

Future[_ActorType]
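The get-or-create behavior described above can be sketched locally with a dictionary keyed by name. ControllerRegistry and Tracker are hypothetical names for this example. The real function spawns a remote actor and returns a Future, and whether constructor arguments are ignored on later calls is an assumption of this model.

```python
class ControllerRegistry:
    """Local model of name-indexed singleton lookup (not a Monarch API)."""

    def __init__(self):
        self._controllers = {}

    def get_or_spawn(self, name, cls, *args, **kwargs):
        # Construct only on the first request for this name.
        if name not in self._controllers:
            self._controllers[name] = cls(*args, **kwargs)
        return self._controllers[name]


class Tracker:
    def __init__(self, start):
        self.value = start


registry = ControllerRegistry()
first = registry.get_or_spawn("tracker", Tracker, 10)
second = registry.get_or_spawn("tracker", Tracker, 99)   # returns the existing object
other = registry.get_or_spawn("other", Tracker, 5)       # a different name, a new object
```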

Defining Actors#

All actor classes subclass the Actor base object, which provides them with the mesh slicing API. Each publicly exposed function of the actor is annotated with @endpoint:

class monarch.actor.Actor[source]#

Bases: MeshTrait

property logger: Logger[source]#
property initialized#
property extent: Extent#
flatten(name)#

Returns a new device mesh with all dimensions flattened into a single dimension with the given name.

Currently this supports only dense meshes: that is, all ranks must be contiguous in the mesh.

rename(**kwargs)#

Returns a new device mesh with some of its dimensions renamed. Dimensions not mentioned are retained:

new_mesh = mesh.rename(host='dp', gpu='tp')

size(dim=None)#

Returns the total number of elements in the requested subset of the mesh. If dim is None, returns the total number of devices in the mesh.

property sizes: dict[str, int]#
slice(**kwargs)#

Select along named dimensions. Integer values remove dimensions; slice objects keep dimensions but restrict them.

Examples: mesh.slice(batch=3, gpu=slice(2, 6))

split(**kwargs)#

Returns a new device mesh with some dimensions of this mesh split. For instance, this call splits the host dimension into dp and pp dimensions. The size of 'pp' is specified and the size of 'dp' is derived from it:

new_mesh = mesh.split(host=('dp', 'pp'), gpu=('tp', 'cp'), pp=16, cp=2)

Dimensions not specified will remain unchanged.

monarch.actor.endpoint(method: Callable[Concatenate[Any, P], Awaitable[R]], *, propagate: Propagator = None, explicit_response_port: Literal[False] = False) → EndpointProperty[P, R][source]#
monarch.actor.endpoint(method: Callable[Concatenate[Any, P], R], *, propagate: Propagator = None, explicit_response_port: Literal[False] = False) → EndpointProperty[P, R]
monarch.actor.endpoint(*, propagate: Propagator = None, explicit_response_port: Literal[False] = False) → EndpointIfy
monarch.actor.endpoint(method: Callable[Concatenate[Any, 'Port[R]', P], Awaitable[None]], *, propagate: Propagator = None, explicit_response_port: Literal[True]) → EndpointProperty[P, R]
monarch.actor.endpoint(method: Callable[Concatenate[Any, 'Port[R]', P], None], *, propagate: Propagator = None, explicit_response_port: Literal[True]) → EndpointProperty[P, R]
monarch.actor.endpoint(*, propagate: Propagator = None, explicit_response_port: Literal[True]) → PortedEndpointIfy

Messaging Actor#

Messaging is done through the “adverbs” defined for each endpoint.

class monarch.actor.Endpoint(propagator)[source]#

Bases: ABC, Generic[P, R]

call_one(*args, **kwargs)[source]#
call(*args, **kwargs)[source]#
stream(*args, **kwargs)[source]#

Broadcasts to all actors and yields their responses as a stream / generator.

This enables processing results from multiple actors incrementally as they become available. Returns an async generator of response values.

broadcast(*args, **kwargs)[source]#

Fire-and-forget broadcast to all actors without waiting for actors to acknowledge receipt.

In other words, the return of this method does not guarantee the delivery of the message.
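To illustrate the idea behind stream, where responses are consumed incrementally as they become available, here is a local asyncio model. It does not use Monarch: fake_actor and the delays are invented for the example, and real endpoints communicate with remote processes rather than local tasks.

```python
import asyncio


async def fake_actor(rank, delay):
    """Stand-in for one actor handling a message (hypothetical)."""
    await asyncio.sleep(delay)
    return rank


async def stream(delays):
    """Send to every fake actor, then yield responses as each one finishes."""
    tasks = [
        asyncio.create_task(fake_actor(rank, delay))
        for rank, delay in enumerate(delays)
    ]
    for finished in asyncio.as_completed(tasks):
        yield await finished


async def main():
    # Responses are collected in completion order, which need not be rank order.
    return [value async for value in stream([0.2, 0.0, 0.1])]


arrival_order = asyncio.run(main())
```

A fire-and-forget broadcast would correspond to creating the tasks and returning immediately, without awaiting any of them.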

rref(*args, **kwargs)[source]#
class monarch.actor.Future(*, coro)[source]#

Bases: Generic[R]

__init__(*, coro)[source]#
get(timeout=None)[source]#
result(timeout=None)[source]#
exception(timeout=None)[source]#
class monarch.actor.ValueMesh(shape, values)[source]#

Bases: MeshTrait, Generic[R]

A mesh that holds the result of an endpoint invocation.

item(**kwargs)[source]#

Get the value at the given coordinates.

Parameters:

kwargs – Coordinates to get the value at.

Returns:

Value at the given coordinate.

Raises:

KeyError – If invalid coordinates are provided.

Return type:

R

items()[source]#

Generator that yields a (point, value) pair for every coordinate in the mesh.

Returns:

Values at all coordinates.

Return type:

Iterable[Tuple[Point, R]]
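The lookup semantics of item and items can be sketched with a small stand-in class. ToyValueMesh is hypothetical, it uses plain dictionaries where the real API uses Point objects, and the row-major ordering of values is an assumption of this sketch.

```python
from itertools import product


class ToyValueMesh:
    """Local model of a mesh of values addressed by named coordinates."""

    def __init__(self, sizes, values):
        self._labels = list(sizes)
        # Assumption: values are laid out in row-major order over the dimensions.
        coords = product(*(range(n) for n in sizes.values()))
        self._values = dict(zip(coords, values))

    def item(self, **kwargs):
        if set(kwargs) != set(self._labels):
            raise KeyError(f"expected coordinates for {self._labels}")
        key = tuple(kwargs[label] for label in self._labels)
        return self._values[key]      # KeyError if a coordinate is out of range

    def items(self):
        for key, value in self._values.items():
            yield dict(zip(self._labels, key)), value


mesh = ToyValueMesh({"hosts": 2, "gpus": 2}, ["a", "b", "c", "d"])
picked = mesh.item(hosts=1, gpus=0)
pairs = list(mesh.items())
```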

property extent: Extent#
flatten(name)#

Returns a new device mesh with all dimensions flattened into a single dimension with the given name.

Currently this supports only dense meshes: that is, all ranks must be contiguous in the mesh.

rename(**kwargs)#

Returns a new device mesh with some of its dimensions renamed. Dimensions not mentioned are retained:

new_mesh = mesh.rename(host='dp', gpu='tp')

size(dim=None)#

Returns the total number of elements in the requested subset of the mesh. If dim is None, returns the total number of devices in the mesh.

property sizes: dict[str, int]#
slice(**kwargs)#

Select along named dimensions. Integer values remove dimensions; slice objects keep dimensions but restrict them.

Examples: mesh.slice(batch=3, gpu=slice(2, 6))

split(**kwargs)#

Returns a new device mesh with some dimensions of this mesh split. For instance, this call splits the host dimension into dp and pp dimensions. The size of 'pp' is specified and the size of 'dp' is derived from it:

new_mesh = mesh.split(host=('dp', 'pp'), gpu=('tp', 'cp'), pp=16, cp=2)

Dimensions not specified will remain unchanged.

class monarch.actor.ActorError(exception, message='A remote actor call has failed.')[source]#

Bases: Exception

Deterministic problem with the user’s code. For example, an OOM resulting from trying to allocate too much GPU memory, or violating some invariant enforced by the various APIs.

__init__(exception, message='A remote actor call has failed.')[source]#
add_note(object, /)#

Exception.add_note(note) – add a note to the exception

args#
with_traceback(object, /)#

Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.

class monarch.actor.Accumulator(endpoint, identity, combine)[source]#

Bases: Generic[P, R, A]

Accumulate the result of a broadcast invocation of an endpoint across a sliced mesh.

Usage:
>>> counter = Accumulator(Actor.increment, 0, lambda x, y: x + y)
__init__(endpoint, identity, combine)[source]#
Parameters:
  • endpoint (Endpoint[~P, R]) – Endpoint to accumulate the result of.

  • identity (A) – Initial value of the accumulated value before the first combine invocation.

  • combine (Callable[[A, R], A]) – Lambda invoked for combining the result of the endpoint with the accumulated value.

accumulate(*args, **kwargs)[source]#

Accumulate the result of the endpoint invocation.

Parameters:
  • args (~P) – Arguments to pass to the endpoint.

  • kwargs (~P) – Keyword arguments to pass to the endpoint.

Returns:

Future that resolves to the accumulated value.

Return type:

Future[A]
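The roles of identity and combine amount to a left fold over the responses, which can be sketched with functools.reduce. The accumulate function below is a local model operating on an already-collected list. The real Accumulator invokes an endpoint and returns a Future, and the order in which responses are combined is not specified here.

```python
from functools import reduce


def accumulate(responses, identity, combine):
    """Fold responses into one value, starting from `identity` (model only)."""
    return reduce(combine, responses, identity)


# Pretend four actors each answered 1 to an "increment" call.
total = accumulate([1, 1, 1, 1], 0, lambda acc, r: acc + r)

# Any associative combine works, e.g. tracking the maximum response.
largest = accumulate([3, 9, 4], float("-inf"), max)
```

With no responses, the result is simply the identity value.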

monarch.actor.send(endpoint, args, kwargs, port=None, selection='all')[source]#

Fire-and-forget broadcast invocation of the endpoint across a given selection of the mesh.

This sends the message to all actors but does not wait for any result. Use the port provided to send the response back to the caller.

Parameters:
  • endpoint (Endpoint[~P, R]) – Endpoint to invoke.

  • args (Tuple[Any, ...]) – Arguments to pass to the endpoint.

  • kwargs (Dict[str, Any]) – Keyword arguments to pass to the endpoint.

  • port (Port | None) – Handle to send the response to.

  • selection (Literal['all', 'choose']) – Selection query representing a subset of the mesh.

class monarch.actor.Channel[source]#

Bases: Generic[R]

An advanced low-level API for a communication channel used for message passing between actors.

Provides static methods to create communication channels with port pairs for sending and receiving messages of type R.

static open(once=False)[source]#
static open_ranked(once=False)[source]#
class monarch.actor.Port(port_ref, instance, rank)[source]#

Bases: Generic[R]

Handle used to send reliable in-order messages through a channel to a PortReceiver.

send(obj)[source]#

Fire-and-forget send R-typed objects in order through a channel to its corresponding PortReceiver.

Parameters:

obj (R) – R-typed object to send.

exception(obj)[source]#
class monarch.actor.PortReceiver(mailbox, receiver, monitor=None)[source]#

Bases: Generic[R]

Receiver for messages sent through a communication channel.

Handles receiving R-typed objects sent from a corresponding Port. Supports asynchronous message reception with optional supervision monitoring for error handling.

recv()[source]#
ranked()[source]#
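The relationship between a Port and its PortReceiver can be modeled locally with an asyncio.Queue: sends return immediately and messages are received in the order they were sent. ToyPort, ToyReceiver, and open_channel are hypothetical names for this sketch. The real classes cross process boundaries, and the real recv() may return a Future rather than a coroutine.

```python
import asyncio


class ToyPort:
    """Sending half of the model channel."""

    def __init__(self, queue):
        self._queue = queue

    def send(self, obj):
        self._queue.put_nowait(obj)   # fire-and-forget: does not wait for a reader


class ToyReceiver:
    """Receiving half of the model channel."""

    def __init__(self, queue):
        self._queue = queue

    async def recv(self):
        return await self._queue.get()


def open_channel():
    queue = asyncio.Queue()           # FIFO, so ordering is preserved
    return ToyPort(queue), ToyReceiver(queue)


async def main():
    port, receiver = open_channel()
    for message in ("a", "b", "c"):
        port.send(message)
    return [await receiver.recv() for _ in range(3)]


received = asyncio.run(main())
```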
monarch.actor.as_endpoint(not_an_endpoint: Callable[P, R], *, propagate: Propagator = None, explicit_response_port: Literal[False] = False) → Endpoint[P, R][source]#
monarch.actor.as_endpoint(not_an_endpoint: Callable[Concatenate['PortProtocol[R]', P], None], *, propagate: Propagator = None, explicit_response_port: Literal[True]) → Endpoint[P, R]

Context API#

Use these functions to look up what actor is running the currently executing code.

monarch.actor.current_actor_name()[source]#
monarch.actor.current_rank()[source]#
monarch.actor.current_size()[source]#
monarch.actor.context()[source]#
class monarch.actor.Point(rank, extent)[source]#

Bases: Point, Mapping

extent#
get(k[, d]) → D[k] if k in D, else d. d defaults to None.#
items() → a set-like object providing a view on D's items#
keys() → a set-like object providing a view on D's keys#
rank#
size(label)#
values() → an object providing a view on D's values#
class monarch.actor.Extent(labels, sizes)#

Bases: object

keys()#
nelements#
region#