Monarch Actors: Complete Guide#

Table of Contents#

  1. Introduction

  2. What is an Actor?

  3. Actor Lifecycle

  4. Endpoints and Messaging

  5. Actor Context

  6. ActorMesh

  7. Advanced Patterns

  8. Best Practices

  9. Summary

  10. Quick Reference Card


Introduction#

Actors are the fundamental building blocks of Monarch applications. They are isolated, concurrent state machines that communicate through asynchronous message passing. This document provides a comprehensive guide to understanding and using actors in Monarch.

For deeper technical details on the underlying actor runtime and system architecture, see the Hyperactor Book.


What is an Actor?#

Definition#

An Actor in Monarch is:

  • An isolated unit of computation with private state

  • A message handler that processes messages sequentially

  • A participant in the supervision tree

  • A typed entity with endpoints for remote method invocation

Core Characteristics#

graph LR
    A[Actor] --> B[Private State]
    A --> C[Message Mailbox]
    A --> D[Endpoints]
    A --> E[Lifecycle Hooks]

    style A fill:#855b9d,stroke:#333,stroke-width:2px
    style B fill:#007c88
    style C fill:#007c88
    style D fill:#007c88
    style E fill:#007c88

1. Isolation

  • Each actor has its own private state

  • State is never directly accessed by other actors

  • All interaction happens through messages

2. Sequential Processing

  • Messages are processed one at a time

  • Next message waits until current message completes

  • Guarantees consistent state within actor

3. Asynchronous Communication

  • Messages are sent asynchronously

  • Sender doesn’t block waiting for response

  • Results returned as Futures (see the sketch below)

4. Location Transparency

  • Actors can be local or remote

  • Same API regardless of location

  • Framework handles serialization and routing
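
The asynchronous and sequential guarantees are easiest to see in code. A minimal sketch, assuming a hypothetical Greeter actor:

from monarch.actor import Actor, endpoint, this_proc

class Greeter(Actor):
    def __init__(self):
        self.count = 0  # private state; callers can never touch it directly

    @endpoint
    def greet(self, name: str) -> str:
        self.count += 1  # safe without locks: one message at a time
        return f"hello {name} (#{self.count})"

greeter = this_proc().spawn("greeter", Greeter)

fut = greeter.greet.call_one("alice")  # returns a Future immediately
print(fut.get())                       # block only when the result is needed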


Actor Lifecycle#

Lifecycle Stages#

stateDiagram-v2
    [*] --> Creating: spawn()
    Creating --> Constructing: allocate resources
    Constructing --> Initializing: new()
    Initializing --> Running: init()
    Running --> Running: handle messages
    Running --> Terminating: stop/error
    Terminating --> [*]: cleanup

    Initializing --> Failed: init error
    Running --> Failed: unhandled error
    Failed --> [*]: propagate to parent

1. Creation Phase#

Spawning an Actor:

from monarch.actor import Actor, endpoint, this_proc

class MyActor(Actor):
    def __init__(self, param1: int, param2: str):
        self.param1 = param1
        self.param2 = param2
        self.state = {}

# Spawn on current process
actor = this_proc().spawn("my_actor", MyActor, param1=42, param2="hello")

What Happens:

  1. Runtime allocates resources for the actor

  2. Actor is registered in the runtime

  3. Mailbox is created for message delivery

  4. Actor ID is assigned

2. Construction Phase#

The __init__ Method:

class DataProcessor(Actor):
    def __init__(self, buffer_size: int):
        # Initialize state
        self.buffer_size = buffer_size
        self.buffer = []
        self.processed_count = 0

        # NO MESSAGING YET - actor not fully registered

Important Notes:

  • __init__ is called during actor construction

  • Actor cannot send/receive messages yet

  • No access to runtime services

  • Should only initialize state

3. Initialization Phase#

The init Hook (Optional):

In Rust-based actors or custom Python actors, you can override the init method:

class WorkerActor(Actor):
    def __init__(self, config):
        self.config = config
        self.resources = None

    async def init(self, this):
        # Now we have access to runtime
        # Can spawn child actors, send messages, etc.
        self.resources = await self.acquire_resources()  # user-defined helper
        self.worker_id = this.actor_id

4. Running Phase#

Once initialized, the actor enters its main lifecycle where it processes messages.

sequenceDiagram
    participant Mailbox
    participant ActorInstance
    participant Handler

    loop Message Processing
        Mailbox->>ActorInstance: Deliver next message
        ActorInstance->>Handler: Invoke endpoint
        Handler->>Handler: Execute logic
        Handler-->>ActorInstance: Return result
        ActorInstance->>ActorInstance: Process result
    end

Message Processing:

  • Actor waits for messages in mailbox

  • Processes one message at a time

  • Invokes corresponding endpoint handler

  • Returns result or sends to port

5. Termination Phase#

Normal Termination:

  • All child actors terminated

  • Mailbox drained

  • Resources cleaned up

  • Parent notified

Error Termination:

  • Unhandled exception in handler

  • Propagated to supervisor

  • Supervision tree handles recovery (see the sketch below)
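
From the caller's side, a failure inside a handler surfaces as an error on the returned Future. A minimal sketch, assuming a hypothetical Flaky actor:

from monarch.actor import Actor, endpoint, this_proc

class Flaky(Actor):
    @endpoint
    def boom(self):
        raise RuntimeError("handler failed")

flaky = this_proc().spawn("flaky", Flaky)

try:
    flaky.boom.call_one().get()  # the handler's exception surfaces here
except Exception as e:
    print(f"caught: {e}")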


Endpoints and Messaging#

What are Endpoints?#

Endpoints are methods decorated with @endpoint that define an actor’s public API.

from monarch.actor import Actor, endpoint

class Calculator(Actor):
    def __init__(self):
        self.history = []

    @endpoint
    def add(self, a: int, b: int) -> int:
        result = a + b
        self.history.append(("add", a, b, result))
        return result

    @endpoint
    def multiply(self, a: int, b: int) -> int:
        result = a * b
        self.history.append(("multiply", a, b, result))
        return result

    @endpoint
    def get_history(self) -> list:
        return self.history

Messaging Adverbs#

Endpoints support different invocation patterns called adverbs:

1. call_one() - Single Actor Call#

Call a single actor and get response.

calc = this_proc().spawn("calc", Calculator)

# Synchronous wait for result
result = calc.add.call_one(5, 3).get()
print(result)  # 8

# Async await
result = await calc.add.call_one(5, 3)

Use When:

  • Calling exactly one actor

  • Need the result

  • Point-to-point communication

Flow Diagram:

sequenceDiagram
    participant Client
    participant ActorInstance

    Client->>ActorInstance: call_one(args)
    Note over ActorInstance: Process message
    ActorInstance-->>Client: Future[Result]
    Client->>Client: .get() waits
    Note over Client: Result available

2. call() - Broadcast and Collect#

Broadcast to all actors in mesh and collect responses.

calcs = procs.spawn("calcs", Calculator)  # Multiple actors

# Call all and collect results
results = calcs.add.call(10, 5).get()
print(results)  # [15, 15, 15, 15, 15, 15, 15, 15]

Use When:

  • Broadcasting to multiple actors

  • Need all responses

  • Parallel computation

Flow Diagram:

sequenceDiagram
    participant Client
    participant Mesh
    participant A1 as Actor 1
    participant A2 as Actor 2
    participant An as Actor N

    Client->>Mesh: call(args)
    par Broadcast
        Mesh->>A1: message
        Mesh->>A2: message
        Mesh->>An: message
    end

    par Responses
        A1-->>Mesh: result
        A2-->>Mesh: result
        An-->>Mesh: result
    end

    Mesh-->>Client: Future[List[Result]]

3. broadcast() - Fire and Forget#

Send to all actors without waiting for responses.

trainers = procs.spawn("trainers", Trainer)

# Fire and forget
trainers.update_learning_rate.broadcast(0.001)

# Continues immediately, no waiting
next_operation()

Use When:

  • Don’t need responses

  • Side effects only

  • Maximum throughput needed

Flow Diagram:

sequenceDiagram
    participant Client
    participant Mesh
    participant Actors

    Client->>Mesh: broadcast(args)
    Note over Client: Returns immediately
    Mesh->>Actors: messages
    Note over Actors: Process async

4. rref() - Distributed Tensor Reference#

Return distributed tensor from actor endpoint.

import torch

class LinearActor(Actor):
    def __init__(self):
        self.weights = torch.rand(100, 100)

    @endpoint(propagate=lambda x: x)
    def forward(self, input):
        return input @ self.weights

linear = procs.spawn("linear", LinearActor)

# Returns distributed tensor reference
with procs.activate():
    x = torch.rand(10, 100)
    output = linear.forward.rref(x)  # Distributed tensor
    result = torch.relu(output)

Use When:

  • Working with distributed tensors

  • Need tensor operations across actors

  • Building neural network layers

5. stream() - Streaming Responses#

Stream responses as they arrive.

# Process responses as they come (workers is an ActorMesh whose
# actors define a compute endpoint; process_result is user-defined)
async for result in workers.compute.stream(data):
    print(f"Got result: {result}")
    process_result(result)

Use When:

  • Want to process results as they arrive

  • Don’t want to wait for all responses

  • Pipeline processing (see the fuller sketch below)
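
A fuller sketch, assuming a hypothetical Worker actor and an existing process mesh procs:

import asyncio
from monarch.actor import Actor, endpoint

class Worker(Actor):
    @endpoint
    def compute(self, x: int) -> int:
        return x * x

async def main():
    workers = procs.spawn("workers", Worker)

    # Each result is handled as its actor responds, not after all finish
    async for result in workers.compute.stream(7):
        print(f"Got result: {result}")

asyncio.run(main())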

Message Ordering#

Monarch guarantees FIFO (First-In-First-Out) message ordering:

actor.method1.call_one(arg1)  # Message M1
actor.method2.call_one(arg2)  # Message M2
actor.method3.call_one(arg3)  # Message M3

# Actor processes: M1, then M2, then M3 (in order)

Ordering Diagram:

sequenceDiagram
    participant Sender
    participant ActorMailbox
    participant ActorInstance

    Sender->>ActorMailbox: M1
    Sender->>ActorMailbox: M2
    Sender->>ActorMailbox: M3

    Note over ActorMailbox: FIFO Queue: [M1, M2, M3]

    ActorMailbox->>ActorInstance: Deliver M1
    Note over ActorInstance: Process M1
    ActorMailbox->>ActorInstance: Deliver M2
    Note over ActorInstance: Process M2
    ActorMailbox->>ActorInstance: Deliver M3
    Note over ActorInstance: Process M3

Actor Context#

What is Context?#

The Context provides runtime information about the current execution environment.

from monarch.actor import context, Actor, endpoint

class ContextAwareActor(Actor):
    @endpoint
    def get_info(self):
        ctx = context()

        # Get current rank
        rank = ctx.message_rank

        # Get actor instance
        actor_inst = ctx.actor_instance

        # Get process reference
        proc = ctx.proc

        return {
            "rank": rank,
            "actor_id": actor_inst.actor_id,
            "proc_id": actor_inst.proc_id
        }

Context Information#

1. Message Rank#

The position in the mesh for the current message.

@endpoint
def process(self):
    rank = context().message_rank
    # rank is a dict: {"hosts": 0, "gpus": 3}

    if rank["gpus"] == 0:
        print("I'm the first GPU!")

Important: Message rank may differ from actor rank when slicing:

# Actor mesh: {"hosts": 4, "gpus": 8}
actors = procs.spawn("actors", MyActor)

# Slice to first 2 GPUs
subset = actors.slice(gpus=slice(0, 2))

# Inside actor:
# - actor_instance.rank = {"hosts": X, "gpus": Y}  (original position)
# - message_rank = {"hosts": X, "gpus": 0 or 1}     (position in slice)
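
A concrete sketch of reading both values from inside an endpoint, assuming a hypothetical RankReporter actor and an existing process mesh procs:

from monarch.actor import Actor, context, endpoint

class RankReporter(Actor):
    @endpoint
    def report(self):
        ctx = context()
        return {
            "actor_rank": ctx.actor_instance.rank,  # position in the full mesh
            "message_rank": ctx.message_rank,       # position within the slice
        }

reporters = procs.spawn("reporters", RankReporter)

# Calling through a slice shows where the two ranks diverge
print(reporters.slice(gpus=slice(0, 2)).report.call().get())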

2. Actor Instance#

Information about the actor itself.

@endpoint
def get_details(self):
    inst = context().actor_instance

    return {
        "actor_id": inst.actor_id,  # Unique actor ID
        "rank": inst.rank,          # Position in mesh
        "proc": inst.proc,          # Process reference
    }

3. Process Reference#

Reference to the process hosting this actor.

@endpoint
def spawn_sibling(self):
    # Get our process
    proc = context().proc

    # Spawn sibling actor on same process
    sibling = proc.spawn("sibling", SiblingActor)
    return sibling

Context Usage Diagram#

graph TD
    A[context] --> B[message_rank]
    A --> C[actor_instance]
    A --> D[proc]

    C --> C1[actor_id]
    C --> C2[rank]
    C --> C3[proc_id]

    D --> D1[spawn]
    D --> D2[host_mesh]

    style A fill:#007c88,stroke:#333,stroke-width:2px

ActorMesh#

What is ActorMesh?#

An ActorMesh is a collection of actor instances organized in a multidimensional structure. It provides:

  • Structured organization of actors

  • Slicing and indexing operations

  • Broadcast messaging

  • Point-to-point references

Creating ActorMesh#

from monarch.actor import this_host

# Create process mesh
procs = this_host().spawn_procs(per_host={"gpus": 8})

# Spawn actors (creates ActorMesh)
actors = procs.spawn("my_actors", MyActor, init_param=value)

print(type(actors))  # ActorMesh
print(actors.extent)  # {"gpus": 8}

Mesh Structure#

graph TD
    subgraph ActorMesh
        subgraph Row1[GPU 0]
            A0[Actor Instance]
        end
        subgraph Row2[GPU 1]
            A1[Actor Instance]
        end
        subgraph Row3[GPU 2]
            A2[Actor Instance]
        end
        subgraph Row4[GPU 3]
            A3[Actor Instance]
        end
        subgraph Row5[GPU 4]
            A4[Actor Instance]
        end
        subgraph Row6[GPU 5]
            A5[Actor Instance]
        end
        subgraph Row7[GPU 6]
            A6[Actor Instance]
        end
        subgraph Row8[GPU 7]
            A7[Actor Instance]
        end
    end

    style A0 fill:#007c88
    style A1 fill:#007c88
    style A2 fill:#007c88
    style A3 fill:#007c88
    style A4 fill:#007c88
    style A5 fill:#007c88
    style A6 fill:#007c88
    style A7 fill:#007c88

Slicing Operations#

Basic Slicing#

# Get single actor by index
actor_0 = actors.slice(gpus=0)

# Get range of actors
first_four = actors.slice(gpus=slice(0, 4))

# Multidimensional slicing
subset = actors.slice(hosts=slice(0, 2), gpus=slice(0, 4))

Slicing Patterns#

# All actors in mesh
actors.method.call()

# First GPU on each host
actors.slice(gpus=0).method.call()

# GPUs 0-3 on all hosts
actors.slice(gpus=slice(0, 4)).method.call()

# Specific point
actors.slice(hosts=1, gpus=3).method.call_one()

Slicing Visualization:

graph TB
    subgraph FullMesh["Full Mesh: 2 hosts x 4 GPUs"]
        direction LR
        subgraph R0[Host 0]
            H0G0[GPU 0]
            H0G1[GPU 1]
            H0G2[GPU 2]
            H0G3[GPU 3]
        end
        subgraph R1[Host 1]
            H1G0[GPU 0]
            H1G1[GPU 1]
            H1G2[GPU 2]
            H1G3[GPU 3]
        end
    end

    subgraph SliceExample["Slice: hosts=0, gpus=0:2"]
        S0[Host 0, GPU 0]
        S1[Host 0, GPU 1]
    end

    H0G0 -.->|selected| S0
    H0G1 -.->|selected| S1

    style H0G0 fill:#007c88
    style H0G1 fill:#007c88
    style S0 fill:#855b9d
    style S1 fill:#855b9d

Passing Actor References#

Actors can be passed as arguments to other actors:

class Server(Actor):
    @endpoint
    def serve(self):
        return "data"

class Client(Actor):
    def __init__(self, server: Server):
        # Select corresponding server
        rank = context().actor_instance.rank
        self.server = server.slice(**rank)

    @endpoint
    def fetch(self):
        return self.server.serve.call_one().get()

# Create both meshes (server_procs and client_procs are process
# meshes with matching extents)
servers = server_procs.spawn("servers", Server)
clients = client_procs.spawn("clients", Client, servers)

# Clients can now communicate with servers
data = clients.fetch.call().get()

Advanced Patterns#

1. Explicit Response Ports#

For out-of-order responses or background processing:

from monarch.actor import Port, endpoint
import threading
from queue import Queue

class AsyncProcessor(Actor):
    def __init__(self):
        self.queue = Queue()
        # Daemon thread so the loop doesn't keep the process alive
        self.worker = threading.Thread(target=self._process_loop, daemon=True)
        self.worker.start()

    @endpoint(explicit_response_port=True)
    def process(self, port: Port[str], data: str):
        # Add to queue with response port
        self.queue.put((port, data))

    def _process_loop(self):
        while True:
            port, data = self.queue.get()
            # Process data (_heavy_computation is a user-defined helper)
            result = self._heavy_computation(data)
            # Send response when ready
            port.send(result)
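
Callers are unaffected by the explicit port: the framework supplies it, and the Future completes whenever the background thread eventually calls port.send(). A usage sketch:

processor = this_proc().spawn("processor", AsyncProcessor)

# Invoked like any other endpoint; the port argument is not passed
# by the caller
fut = processor.process.call_one("payload")
print(fut.get())  # completes once the worker thread sends to the port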

2. Actor Supervision#

Custom supervision for fine-grained error handling:

class SupervisorActor(Actor):
    def __init__(self):
        self.children = []

    def __supervise__(self, event):
        print(f"Supervision event: {event}")

        if event.is_recoverable():
            # Restart failed actor
            self.restart_child(event.actor_id)
            return True  # Handled
        else:
            # Propagate to parent
            return False

    @endpoint
    def spawn_worker(self):
        # Spawn supervised child
        worker = context().proc.spawn("worker", WorkerActor)
        self.children.append(worker)
        return worker

3. Channels for Direct Communication#

Low-level messaging with channels:

from monarch.actor import Channel, Port, PortReceiver

class Producer(Actor):
    def __init__(self):
        self.consumers = []

    @endpoint
    def register_consumer(self, port: Port):
        self.consumers.append(port)

    @endpoint
    def produce(self, data):
        # Send to all registered consumers
        for port in self.consumers:
            port.send(data)

class Consumer(Actor):
    def __init__(self, producer: Producer):
        # Create channel
        self.port, self.receiver = Channel.open()

        # Register with producer
        producer = producer.slice(gpus=0)
        producer.register_consumer.call_one(self.port)

    @endpoint
    async def consume(self):
        # Receive from channel
        data = await self.receiver.recv()
        return data
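
Wiring the two sides together might look like this (a sketch; producer_procs and consumer_procs are assumed to be existing process meshes with matching extents):

producers = producer_procs.spawn("producers", Producer)
consumers = consumer_procs.spawn("consumers", Consumer, producers)

# Push data through the registered ports, then read it back
producers.produce.broadcast("tick")
results = consumers.consume.call().get()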

4. Value Mesh for State Sharing#

Share readonly state across actor mesh:

from monarch.actor import ValueMesh

class ConfigActor(Actor):
    def __init__(self, config_mesh: ValueMesh[dict]):
        # Each actor gets corresponding config
        rank = context().actor_instance.rank
        self.config = config_mesh[rank]

    @endpoint
    def get_config(self):
        return self.config

# Create value mesh
configs = [{"id": i, "param": i * 10} for i in range(8)]
config_mesh = ValueMesh.from_list(configs, extent={"gpus": 8})

# Spawn actors with value mesh
actors = procs.spawn("actors", ConfigActor, config_mesh)
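
Reading the per-actor values back (a short usage sketch):

# Each actor returns the config entry matching its own rank
print(actors.get_config.call().get())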

Best Practices#

1. Actor Design#

DO:

  • Keep actors focused on single responsibility

  • Use immutable messages when possible

  • Handle errors gracefully

  • Document endpoint contracts

DON’T:

  • Share mutable state between actors

  • Block in endpoints (use async; see the sketch after this list)

  • Ignore supervision events

  • Create circular dependencies
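
As an illustration of the "don't block" rule, an endpoint that needs to wait should yield to the event loop instead of stalling it. A sketch, assuming a hypothetical Poller actor:

import asyncio
from monarch.actor import Actor, endpoint

class Poller(Actor):
    @endpoint
    async def wait_for_data(self, delay_s: float) -> str:
        # ✅ await yields the event loop, so other actors sharing
        #    this process can keep running
        # ❌ time.sleep(delay_s) here would stall them all
        await asyncio.sleep(delay_s)
        return "ready"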

2. State Management#

class GoodActor(Actor):
    def __init__(self):
        # All state in __init__
        self.counter = 0
        self.data = []

    @endpoint
    def update(self, value):
        # Modify state atomically
        self.counter += 1
        self.data.append(value)
        return self.counter

# Bad: External state
global_state = {}  # ❌ Don't do this

class BadActor(Actor):
    @endpoint
    def update(self, key, value):
        global_state[key] = value  # ❌ Race conditions!

3. Endpoint Design#

import time

class WellDesignedActor(Actor):
    # ✅ Clear, typed endpoints
    @endpoint
    def process_batch(self, batch_id: int, data: list[float]) -> dict:
        """
        Process a batch of data.

        Args:
            batch_id: Unique batch identifier
            data: List of data points to process

        Returns:
            Dictionary with processing results
        """
        results = self._process(data)  # _process is a user-defined helper
        return {
            "batch_id": batch_id,
            "results": results,
            "processed_at": time.time()
        }

4. Error Handling#

import logging

logger = logging.getLogger(__name__)

class ResilientActor(Actor):
    @endpoint
    def risky_operation(self, data):
        try:
            result = self._process(data)
            return {"success": True, "result": result}
        except ValueError as e:
            # Log and return error
            logger.error(f"Invalid data: {e}")
            return {"success": False, "error": str(e)}
        except Exception as e:
            # Unexpected error - let supervision handle it
            logger.error(f"Unexpected error: {e}")
            raise

5. Performance Tips#

import torch

class OptimizedActor(Actor):
    def __init__(self):
        # ✅ Pre-allocate resources
        self.buffer = torch.zeros(1000, 1000)
        self.cache = {}

    @endpoint
    def compute(self, input_data):
        # ✅ Cache expensive computations (check before doing the work)
        if input_data.id in self.cache:
            return self.cache[input_data.id]

        # ✅ Reuse buffers
        self.buffer.copy_(input_data)
        result = self._compute(self.buffer)  # _compute is a user-defined helper

        self.cache[input_data.id] = result
        return result

6. Testing Actors#

import pytest
from monarch.actor import this_proc

@pytest.mark.asyncio
async def test_calculator():
    # Spawn actor
    calc = this_proc().spawn("test_calc", Calculator)

    # Test endpoint
    result = await calc.add.call_one(5, 3)
    assert result == 8

    # Test history
    history = await calc.get_history.call_one()
    assert len(history) == 1
    assert history[0] == ("add", 5, 3, 8)

Summary#

Key Takeaways#

  1. Actors are Isolated: Private state, message-based communication

  2. Sequential Processing: One message at a time, FIFO ordering

  3. Typed Endpoints: Clear API with type hints

  4. Location Transparent: Same code for local/remote actors

  5. Supervision Tree: Automatic error propagation and handling

  6. ActorMesh: Multidimensional organization with slicing

  7. Context API: Runtime information and services

Actor Lifecycle Recap#

graph LR
    A[Spawn] --> B[__init__]
    B --> C[init hook]
    C --> D[Running]
    D --> E[Handle Messages]
    E --> D
    D --> F[Terminate]

    style A fill:#855b9d
    style B fill:#007c88
    style C fill:#007c88
    style D fill:#007c88
    style E fill:#13a3a4
    style F fill:#0072c7

Quick Reference Card#

Basic Actor#

from monarch.actor import Actor, endpoint, this_proc

class MyActor(Actor):
    def __init__(self, param):
        self.param = param

    @endpoint
    def method(self, arg):
        return self.param + arg

actor = this_proc().spawn("name", MyActor, param=10)
result = actor.method.call_one(5).get()  # 15

Actor Mesh#

# Spawn mesh
actors = procs.spawn("actors", MyActor, param=10)

# Broadcast and collect
results = actors.method.call(5).get()

# Fire and forget
actors.method.broadcast(5)

# Slice
subset = actors.slice(gpus=slice(0, 4))

Context#

from monarch.actor import context

@endpoint
def my_endpoint(self):
    ctx = context()
    rank = ctx.message_rank
    proc = ctx.proc

Supervision#

class MySupervisor(Actor):
    def __supervise__(self, event):
        # Handle or propagate
        return True  # Handled