Monarch Actors: Complete Guide#
Introduction#
Actors are the fundamental building blocks of Monarch applications. They are isolated, concurrent state machines that communicate through asynchronous message passing. This document provides a comprehensive guide to understanding and using actors in Monarch.
For deeper technical details on the Hyperactor actor system implementation, see the Hyperactor Book, which covers the underlying actor runtime and system architecture.
What is an Actor?#
Definition#
An Actor in Monarch is:
An isolated unit of computation with private state
A message handler that processes messages sequentially
A participant in the supervision tree
A typed entity with endpoints for remote method invocation
Core Characteristics#
graph LR
A[Actor] --> B[Private State]
A --> C[Message Mailbox]
A --> D[Endpoints]
A --> E[Lifecycle Hooks]
style A fill:#855b9d,stroke:#333,stroke-width:2px
style B fill:#007c88
style C fill:#007c88
style D fill:#007c88
style E fill:#007c88
1. Isolation
Each actor has its own private state
State is never directly accessed by other actors
All interaction happens through messages
2. Sequential Processing
Messages are processed one at a time
Next message waits until current message completes
Guarantees consistent state within actor
3. Asynchronous Communication
Messages are sent asynchronously
Sender doesn’t block waiting for response
Results returned as Futures
4. Location Transparency
Actors can be local or remote
Same API regardless of location
Framework handles serialization and routing
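The first three characteristics can be illustrated without Monarch at all. The following is a minimal sketch using only the standard library's asyncio; `ToyActor`, `send`, and `demo` are hypothetical names invented for this illustration and are not part of the Monarch API.

```python
import asyncio


class ToyActor:
    """Hypothetical stand-in for an actor: private state plus a mailbox."""

    def __init__(self) -> None:
        self._count = 0  # private state, only ever touched by _run
        self._mailbox: asyncio.Queue = asyncio.Queue()
        self._task = asyncio.create_task(self._run())

    async def _run(self) -> None:
        # Sequential processing: one message is fully handled before the next is read.
        while True:
            amount, reply = await self._mailbox.get()
            self._count += amount
            reply.set_result(self._count)

    def send(self, amount: int) -> asyncio.Future:
        # Asynchronous communication: enqueue the message and return a future at once.
        reply = asyncio.get_running_loop().create_future()
        self._mailbox.put_nowait((amount, reply))
        return reply

    def stop(self) -> None:
        self._task.cancel()


async def demo() -> list:
    actor = ToyActor()
    futures = [actor.send(n) for n in (1, 2, 3)]  # none of these calls block
    results = [await f for f in futures]
    actor.stop()
    return results


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The caller never reads `_count` directly; it only sees the values carried back by the futures, which is the sense in which the state is isolated.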
Actor Lifecycle#
Lifecycle Stages#
stateDiagram-v2
[*] --> Creating: spawn()
Creating --> Constructing: allocate resources
Constructing --> Initializing: new()
Initializing --> Running: init()
Running --> Running: handle messages
Running --> Terminating: stop/error
Terminating --> [*]: cleanup
Initializing --> Failed: init error
Running --> Failed: unhandled error
Failed --> [*]: propagate to parent
1. Creation Phase#
Spawning an Actor:
from monarch.actor import Actor, endpoint, this_proc

class MyActor(Actor):
    def __init__(self, param1: int, param2: str):
        self.param1 = param1
        self.param2 = param2
        self.state = {}

# Spawn on current process
actor = this_proc().spawn("my_actor", MyActor, param1=42, param2="hello")
What Happens:
Runtime allocates resources for the actor
Actor is registered in the runtime
Mailbox is created for message delivery
Actor ID is assigned
2. Construction Phase#
The __init__ Method:
class DataProcessor(Actor):
    def __init__(self, buffer_size: int):
        # Initialize state
        self.buffer_size = buffer_size
        self.buffer = []
        self.processed_count = 0
        # NO MESSAGING YET - actor not fully registered
Important Notes:
__init__ is called during actor construction
Actor cannot send/receive messages yet
No access to runtime services
Should only initialize state
3. Initialization Phase#
The init Hook (Optional):
In Rust-based actors or custom Python actors, you can override the init method:
class WorkerActor(Actor):
    def __init__(self, config):
        self.config = config
        self.resources = None

    async def init(self, this):
        # Now we have access to runtime
        # Can spawn child actors, send messages, etc.
        self.resources = await self.acquire_resources()
        self.worker_id = this.actor_id
4. Running Phase#
Once initialized, the actor enters its main lifecycle where it processes messages.
sequenceDiagram
participant Mailbox
participant ActorInstance
participant Handler
loop Message Processing
Mailbox->>ActorInstance: Deliver next message
ActorInstance->>Handler: Invoke endpoint
Handler->>Handler: Execute logic
Handler-->>ActorInstance: Return result
ActorInstance->>ActorInstance: Process result
end
Message Processing:
Actor waits for messages in mailbox
Processes one message at a time
Invokes corresponding endpoint handler
Returns result or sends to port
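The four steps above can be sketched as a plain dispatch loop. This is an illustration only: `Counter` and `drain` are hypothetical, and a real runtime would deliver messages asynchronously rather than from a pre-filled deque.

```python
from collections import deque


class Counter:
    """Hypothetical actor body with two handlers."""

    def __init__(self) -> None:
        self.value = 0

    def add(self, n: int) -> int:
        self.value += n
        return self.value

    def reset(self) -> int:
        self.value = 0
        return self.value


def drain(actor, mailbox: deque) -> list:
    """Process queued (endpoint_name, args) messages one at a time, in arrival order."""
    results = []
    while mailbox:
        name, args = mailbox.popleft()   # take the next message
        handler = getattr(actor, name)   # look up the matching handler
        results.append(handler(*args))   # run it to completion, keep the result
    return results


mailbox = deque([("add", (2,)), ("add", (3,)), ("reset", ()), ("add", (1,))])
print(drain(Counter(), mailbox))
```

Because each handler finishes before the next message is dequeued, the handlers never observe a half-updated `value`.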
5. Termination Phase#
Normal Termination:
All child actors terminated
Mailbox drained
Resources cleaned up
Parent notified
Error Termination:
Unhandled exception in handler
Propagated to supervisor
Supervision tree handles recovery
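The lifecycle stages can also be read as a small transition table. The sketch below encodes the transitions from the lifecycle state diagram; the event names (`"allocate"`, `"new"`, and so on) and the `DONE` stage are labels chosen for this illustration, not identifiers from Monarch.

```python
from enum import Enum, auto


class Stage(Enum):
    CREATING = auto()
    CONSTRUCTING = auto()
    INITIALIZING = auto()
    RUNNING = auto()
    TERMINATING = auto()
    FAILED = auto()
    DONE = auto()  # stands for the diagram's final [*] state


# Allowed transitions, mirroring the lifecycle state diagram.
TRANSITIONS = {
    (Stage.CREATING, "allocate"): Stage.CONSTRUCTING,
    (Stage.CONSTRUCTING, "new"): Stage.INITIALIZING,
    (Stage.INITIALIZING, "init"): Stage.RUNNING,
    (Stage.INITIALIZING, "init_error"): Stage.FAILED,
    (Stage.RUNNING, "message"): Stage.RUNNING,
    (Stage.RUNNING, "stop"): Stage.TERMINATING,
    (Stage.RUNNING, "unhandled_error"): Stage.FAILED,
    (Stage.TERMINATING, "cleanup"): Stage.DONE,
    (Stage.FAILED, "propagate"): Stage.DONE,
}


def advance(stage: Stage, event: str) -> Stage:
    """Return the next stage, or raise if the event is not valid in this stage."""
    try:
        return TRANSITIONS[(stage, event)]
    except KeyError:
        raise ValueError(f"{event!r} is not valid in stage {stage.name}") from None


def run(events, start: Stage = Stage.CREATING) -> Stage:
    stage = start
    for event in events:
        stage = advance(stage, event)
    return stage


print(run(["allocate", "new", "init", "message", "stop", "cleanup"]).name)
```

Writing the table out makes one property explicit: there is no path from `CONSTRUCTING` to message handling that skips initialization.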
Endpoints and Messaging#
What are Endpoints?#
Endpoints are methods decorated with @endpoint that define an actor’s public API.
from monarch.actor import Actor, endpoint

class Calculator(Actor):
    def __init__(self):
        self.history = []

    @endpoint
    def add(self, a: int, b: int) -> int:
        result = a + b
        self.history.append(("add", a, b, result))
        return result

    @endpoint
    def multiply(self, a: int, b: int) -> int:
        result = a * b
        self.history.append(("multiply", a, b, result))
        return result

    @endpoint
    def get_history(self) -> list:
        return self.history
Messaging Adverbs#
Endpoints support different invocation patterns called adverbs:
1. call_one() - Single Actor Call#
Call a single actor and get response.
calc = this_proc().spawn("calc", Calculator)
# Synchronous wait for result
result = calc.add.call_one(5, 3).get()
print(result) # 8
# Async await
result = await calc.add.call_one(5, 3)
Use When:
Calling exactly one actor
Need the result
Point-to-point communication
Flow Diagram:
sequenceDiagram
participant Client
participant ActorInstance
Client->>ActorInstance: call_one(args)
Note over ActorInstance: Process message
ActorInstance-->>Client: Future[Result]
Client->>Client: .get() waits
Note over Client: Result available
2. call() - Broadcast and Collect#
Broadcast to all actors in mesh and collect responses.
calcs = procs.spawn("calcs", Calculator) # Multiple actors
# Call all and collect results
results = calcs.add.call(10, 5).get()
print(results) # [15, 15, 15, 15, 15, 15, 15, 15]
Use When:
Broadcasting to multiple actors
Need all responses
Parallel computation
Flow Diagram:
sequenceDiagram
participant Client
participant Mesh
participant A1 as Actor 1
participant A2 as Actor 2
participant An as Actor N
Client->>Mesh: call(args)
par Broadcast
Mesh->>A1: message
Mesh->>A2: message
Mesh->>An: message
end
par Responses
A1-->>Mesh: result
A2-->>Mesh: result
An-->>Mesh: result
end
Mesh-->>Client: Future[List[Result]]
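The broadcast-and-collect flow resembles a gather over concurrent tasks. The sketch below uses asyncio to mimic that shape; `add` and `call_all` are hypothetical helpers, and the sleep only exists so that the actors finish in reverse rank order.

```python
import asyncio


async def add(rank: int, size: int, a: int, b: int) -> tuple:
    """Hypothetical endpoint body; lower ranks deliberately take longer."""
    await asyncio.sleep(0.01 * (size - rank))
    return rank, a + b


async def call_all(size: int, a: int, b: int) -> list:
    # Broadcast: start one invocation per rank.
    # Collect: gather returns results in rank order, not in completion order.
    return await asyncio.gather(*(add(rank, size, a, b) for rank in range(size)))


if __name__ == "__main__":
    print(asyncio.run(call_all(4, 10, 5)))
```

Even though rank 3 finishes first here, its result still lands in the last position, which is what lets the caller match each response to the actor that produced it.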
3. broadcast() - Fire and Forget#
Send to all actors without waiting for responses.
trainers = procs.spawn("trainers", Trainer)
# Fire and forget
trainers.update_learning_rate.broadcast(0.001)
# Continues immediately, no waiting
next_operation()
Use When:
Don’t need responses
Side effects only
Maximum throughput needed
Flow Diagram:
sequenceDiagram
participant Client
participant Mesh
participant Actors
Client->>Mesh: broadcast(args)
Note over Client: Returns immediately
Mesh->>Actors: messages
Note over Actors: Process async
4. rref() - Distributed Tensor Reference#
Return distributed tensor from actor endpoint.
import torch
from monarch.actor import Actor, endpoint

class LinearActor(Actor):
    def __init__(self):
        self.weights = torch.rand(100, 100)

    @endpoint(propagate=lambda x: x)
    def forward(self, input):
        return input @ self.weights

linear = procs.spawn("linear", LinearActor)
# Returns distributed tensor reference
with procs.activate():
    x = torch.rand(10, 100)
    output = linear.forward.rref(x)  # Distributed tensor
    result = torch.relu(output)
Use When:
Working with distributed tensors
Need tensor operations across actors
Building neural network layers
5. stream() - Streaming Responses#
Stream responses as they arrive.
# Process responses as they come
async for result in workers.compute.stream(data):
    print(f"Got result: {result}")
    process_result(result)
Use When:
Want to process results as they arrive
Don’t want to wait for all responses
Pipeline processing
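The difference from collecting all responses is that results become available in completion order. A rough analogue in plain asyncio, assuming hypothetical `compute` and `stream` helpers that are not Monarch's implementation, looks like this:

```python
import asyncio


async def compute(rank: int, delay: float) -> int:
    """Hypothetical worker: sleeps for `delay` seconds, then reports its rank."""
    await asyncio.sleep(delay)
    return rank


async def stream(delays):
    """Async generator that yields each result as soon as it is ready."""
    tasks = [asyncio.ensure_future(compute(rank, d)) for rank, d in enumerate(delays)]
    for finished in asyncio.as_completed(tasks):
        yield await finished


async def demo() -> list:
    # Rank 1 is fastest, then rank 2, then rank 0.
    return [rank async for rank in stream([0.15, 0.05, 0.10])]


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The consumer can start working on the fastest worker's result while the slower ones are still running, which is the pipeline benefit listed above.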
Message Ordering#
Monarch guarantees FIFO (First-In-First-Out) ordering for messages sent from one sender to a given actor:
actor.method1.call_one(arg1) # Message M1
actor.method2.call_one(arg2) # Message M2
actor.method3.call_one(arg3) # Message M3
# Actor processes: M1, then M2, then M3 (in order)
Ordering Diagram:
sequenceDiagram
participant Sender
participant ActorMailbox
participant ActorInstance
Sender->>ActorMailbox: M1
Sender->>ActorMailbox: M2
Sender->>ActorMailbox: M3
Note over ActorMailbox: FIFO Queue: [M1, M2, M3]
ActorMailbox->>ActorInstance: Deliver M1
Note over ActorInstance: Process M1
ActorMailbox->>ActorInstance: Deliver M2
Note over ActorInstance: Process M2
ActorMailbox->>ActorInstance: Deliver M3
Note over ActorInstance: Process M3
Actor Context#
What is Context?#
The Context provides runtime information about the current execution environment.
from monarch.actor import context, Actor, endpoint

class ContextAwareActor(Actor):
    @endpoint
    def get_info(self):
        ctx = context()
        # Get current rank
        rank = ctx.message_rank
        # Get actor instance
        actor_inst = ctx.actor_instance
        # Get process reference
        proc = ctx.proc
        return {
            "rank": rank,
            "actor_id": actor_inst.actor_id,
            "proc_id": actor_inst.proc_id
        }
Context Information#
1. Message Rank#
The position in the mesh for the current message.
@endpoint
def process(self):
    rank = context().message_rank
    # rank is a dict: {"hosts": 0, "gpus": 3}
    if rank["gpus"] == 0:
        print("I'm the first GPU!")
Important: Message rank may differ from actor rank when slicing:
# Actor mesh: {"hosts": 4, "gpus": 8}
actors = procs.spawn("actors", MyActor)
# Slice to first 2 GPUs
subset = actors.slice(gpus=slice(0, 2))
# Inside actor:
# - actor_instance.rank = {"hosts": X, "gpus": Y} (original position)
# - message_rank = {"hosts": X, "gpus": 0 or 1} (position in slice)
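To make the distinction concrete, the sketch below computes a position inside a slice from a position in the full mesh. It is a toy model under the assumption that the message rank is simply the actor's index within the kept range of each sliced dimension; `message_rank` here is a hypothetical helper, not the Monarch attribute of the same name.

```python
def message_rank(actor_rank: dict, sliced: dict) -> dict:
    """Map an actor's position in the full mesh to its position inside a slice.

    `sliced` maps a dimension name to the range of indices the slice keeps.
    Dimensions that are not sliced keep their original index.
    """
    result = {}
    for dim, index in actor_rank.items():
        kept = sliced.get(dim)
        if kept is None:
            result[dim] = index
        else:
            # Position of the original index within the kept range.
            # Raises ValueError if the actor is not part of the slice.
            result[dim] = kept.index(index)
    return result


# An actor at GPU 5, addressed through a slice that keeps GPUs 2 through 5.
print(message_rank({"hosts": 1, "gpus": 5}, {"gpus": range(2, 6)}))
```

Code that indexes per-actor data should therefore decide deliberately which of the two ranks it means.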
2. Actor Instance#
Information about the actor itself.
@endpoint
def get_details(self):
    inst = context().actor_instance
    return {
        "actor_id": inst.actor_id,  # Unique actor ID
        "rank": inst.rank,          # Position in mesh
        "proc": inst.proc,          # Process reference
    }
3. Process Reference#
Reference to the process hosting this actor.
@endpoint
def spawn_sibling(self):
    # Get our process
    proc = context().proc
    # Spawn sibling actor on same process
    sibling = proc.spawn("sibling", SiblingActor)
    return sibling
Context Usage Diagram#
graph TD
A[context] --> B[message_rank]
A --> C[actor_instance]
A --> D[proc]
C --> C1[actor_id]
C --> C2[rank]
C --> C3[proc_id]
D --> D1[spawn]
D --> D2[host_mesh]
style A fill:#007c88,stroke:#333,stroke-width:2px
ActorMesh#
What is ActorMesh?#
An ActorMesh is a collection of actor instances organized in a multidimensional structure. It provides:
Structured organization of actors
Slicing and indexing operations
Broadcast messaging
Point-to-point references
Creating ActorMesh#
# Create process mesh
procs = this_host().spawn_procs(per_host={"gpus": 8})
# Spawn actors (creates ActorMesh)
actors = procs.spawn("my_actors", MyActor, init_param=value)
print(type(actors)) # ActorMesh
print(actors.extent) # {"gpus": 8}
Mesh Structure#
graph TD
subgraph ActorMesh
subgraph Row1[GPU 0]
A0[Actor Instance]
end
subgraph Row2[GPU 1]
A1[Actor Instance]
end
subgraph Row3[GPU 2]
A2[Actor Instance]
end
subgraph Row4[GPU 3]
A3[Actor Instance]
end
subgraph Row5[GPU 4]
A4[Actor Instance]
end
subgraph Row6[GPU 5]
A5[Actor Instance]
end
subgraph Row7[GPU 6]
A6[Actor Instance]
end
subgraph Row8[GPU 7]
A7[Actor Instance]
end
end
style A0 fill:#007c88
style A1 fill:#007c88
style A2 fill:#007c88
style A3 fill:#007c88
style A4 fill:#007c88
style A5 fill:#007c88
style A6 fill:#007c88
style A7 fill:#007c88
Slicing Operations#
Basic Slicing#
# Get single actor by index
actor_0 = actors.slice(gpus=0)
# Get range of actors
first_four = actors.slice(gpus=slice(0, 4))
# Multidimensional slicing
subset = actors.slice(hosts=slice(0, 2), gpus=slice(0, 4))
Slicing Patterns#
# All actors in mesh
actors.method.call()
# First GPU on each host
actors.slice(gpus=0).method.call()
# GPUs 0-3 on all hosts
actors.slice(gpus=slice(0, 4)).method.call()
# Specific point
actors.slice(hosts=1, gpus=3).method.call_one()
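Which actors each pattern reaches can be worked out with a toy model of a mesh as a set of named coordinates. The `points` and `select` helpers below are hypothetical and only mimic the selection semantics described here; they do not touch the Monarch API.

```python
from itertools import product


def points(extent: dict) -> list:
    """Enumerate every coordinate of a mesh such as {"hosts": 2, "gpus": 4}."""
    names = list(extent)
    ranges = (range(extent[name]) for name in names)
    return [dict(zip(names, index)) for index in product(*ranges)]


def select(extent: dict, **dims) -> list:
    """Return the coordinates matched by an int or slice selector per dimension."""

    def keep(point: dict) -> bool:
        for name, selector in dims.items():
            if isinstance(selector, slice):
                allowed = range(extent[name])[selector]
            else:
                allowed = [selector]
            if point[name] not in allowed:
                return False
        return True

    return [p for p in points(extent) if keep(p)]


extent = {"hosts": 2, "gpus": 4}
print(len(select(extent)))                   # all actors in the mesh
print(select(extent, gpus=0))                # first GPU on each host
print(select(extent, hosts=1, gpus=3))       # one specific point
```

A selector that pins every dimension to a single index leaves exactly one coordinate, which is why the last pattern in the list can use call_one().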
Slicing Visualization:
graph TB
subgraph Full Mesh [2x4]
direction LR
subgraph R0[Host 0]
H0G0[GPU 0]
H0G1[GPU 1]
H0G2[GPU 2]
H0G3[GPU 3]
end
subgraph R1[Host 1]
H1G0[GPU 0]
H1G1[GPU 1]
H1G2[GPU 2]
H1G3[GPU 3]
end
end
subgraph Slice Example [hosts=0, gpus=0:2]
S0[Host 0, GPU 0]
S1[Host 0, GPU 1]
end
H0G0 -.->|selected| S0
H0G1 -.->|selected| S1
style H0G0 fill:#007c88
style H0G1 fill:#007c88
style S0 fill:#855b9d
style S1 fill:#855b9d
Passing Actor References#
Actors can be passed as arguments to other actors:
class Server(Actor):
    @endpoint
    def serve(self):
        return "data"

class Client(Actor):
    def __init__(self, server: Server):
        # Select corresponding server
        rank = context().actor_instance.rank
        self.server = server.slice(**rank)

    @endpoint
    def fetch(self):
        return self.server.serve.call_one().get()

# Create both meshes
servers = server_procs.spawn("servers", Server)
clients = client_procs.spawn("clients", Client, servers)
# Clients can now communicate with servers
data = clients.fetch.call().get()
Advanced Patterns#
1. Explicit Response Ports#
For out-of-order responses or background processing:
import threading
from queue import Queue

from monarch.actor import Actor, Port, endpoint

class AsyncProcessor(Actor):
    def __init__(self):
        self.queue = Queue()
        # Daemon thread so the background loop does not keep the process alive
        self.worker = threading.Thread(target=self._process_loop, daemon=True)
        self.worker.start()

    @endpoint(explicit_response_port=True)
    def process(self, port: Port[str], data: str):
        # Add to queue with response port
        self.queue.put((port, data))

    def _process_loop(self):
        while True:
            port, data = self.queue.get()
            # Process data
            result = self._heavy_computation(data)
            # Send response when ready
            port.send(result)
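The idea of handing the reply handle to a background worker can be tried out with the standard library alone. In this sketch a `concurrent.futures.Future` plays the role of the response port; `start_worker` and `process` are hypothetical helpers, and uppercasing the string stands in for the heavy computation.

```python
import queue
import threading
from concurrent.futures import Future


def start_worker(requests: queue.Queue) -> threading.Thread:
    """Start a background thread that answers requests whenever it gets to them."""

    def loop() -> None:
        while True:
            item = requests.get()
            if item is None:  # sentinel: shut the worker down
                return
            port, data = item
            port.set_result(data.upper())  # reply once the work is done

    worker = threading.Thread(target=loop, daemon=True)
    worker.start()
    return worker


def process(requests: queue.Queue, data: str) -> Future:
    """Enqueue the request together with its reply handle and return immediately."""
    port: Future = Future()
    requests.put((port, data))
    return port


requests: queue.Queue = queue.Queue()
worker = start_worker(requests)
reply = process(requests, "abc")
print(reply.result(timeout=5))
requests.put(None)
worker.join(timeout=5)
```

The handler returns before any result exists; the reply is produced later by whichever code holds the port, which is what allows out-of-order responses.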
2. Actor Supervision#
Custom supervision for fine-grained error handling:
class SupervisorActor(Actor):
    def __init__(self):
        self.children = []

    def __supervise__(self, event):
        print(f"Supervision event: {event}")
        if event.is_recoverable():
            # Restart failed actor
            self.restart_child(event.actor_id)
            return True  # Handled
        else:
            # Propagate to parent
            return False

    @endpoint
    def spawn_worker(self):
        # Spawn supervised child
        worker = context().proc.spawn("worker", WorkerActor)
        self.children.append(worker)
        return worker
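The handled-or-propagate contract can be modelled as a walk up a tree. The sketch below is a toy: `Node`, `supervise`, and `propagate` are hypothetical, and events are plain strings rather than Monarch supervision events.

```python
class Node:
    """Hypothetical supervisor node that handles a fixed set of event kinds."""

    def __init__(self, name: str, parent: "Node | None" = None, handles=()):
        self.name = name
        self.parent = parent
        self.handles = set(handles)

    def supervise(self, event: str) -> bool:
        # True means "handled here"; False means "pass it to my parent".
        return event in self.handles


def propagate(failed: Node, event: str):
    """Walk up from the failed actor's parent until some supervisor handles the event."""
    node = failed.parent
    while node is not None:
        if node.supervise(event):
            return node.name
        node = node.parent
    return None  # nobody handled it


root = Node("root", handles={"oom"})
mid = Node("mid", parent=root, handles={"timeout"})
leaf = Node("leaf", parent=mid)
print(propagate(leaf, "timeout"), propagate(leaf, "oom"), propagate(leaf, "segfault"))
```

An event that no ancestor handles reaches the top of the tree, which is why the return value of the supervision hook matters even for supervisors that only log.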
3. Channels for Direct Communication#
Low-level messaging with channels:
from monarch.actor import Actor, Channel, Port, PortReceiver, endpoint

class Producer(Actor):
    def __init__(self):
        self.consumers = []

    @endpoint
    def register_consumer(self, port: Port):
        self.consumers.append(port)

    @endpoint
    def produce(self, data):
        # Send to all registered consumers
        for port in self.consumers:
            port.send(data)

class Consumer(Actor):
    def __init__(self, producer: Producer):
        # Create channel
        self.port, self.receiver = Channel.open()
        # Register with producer
        producer = producer.slice(gpus=0)
        producer.register_consumer.call_one(self.port)

    @endpoint
    async def consume(self):
        # Receive from channel
        data = await self.receiver.recv()
        return data
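The port and receiver pair behaves like the two ends of a queue. The following sketch mirrors the producer and consumer flow with asyncio; `ToyPort`, `ToyReceiver`, and `open_channel` are hypothetical stand-ins and say nothing about how Monarch implements channels.

```python
import asyncio


class ToyPort:
    """Sending half of a toy channel."""

    def __init__(self, q: asyncio.Queue) -> None:
        self._q = q

    def send(self, value) -> None:
        self._q.put_nowait(value)


class ToyReceiver:
    """Receiving half of a toy channel."""

    def __init__(self, q: asyncio.Queue) -> None:
        self._q = q

    async def recv(self):
        return await self._q.get()


def open_channel():
    q: asyncio.Queue = asyncio.Queue()
    return ToyPort(q), ToyReceiver(q)


async def demo() -> list:
    consumers, receivers = [], []
    for _ in range(2):
        port, receiver = open_channel()
        consumers.append(port)      # what register_consumer would store
        receivers.append(receiver)  # kept by the consumer side
    for port in consumers:          # what produce would do
        port.send("tick")
    return [await r.recv() for r in receivers]


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Only the port travels to the producer; the receiver stays with the consumer, so each consumer reads from its own queue.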
4. Value Mesh for State Sharing#
Share read-only state across an actor mesh:
from monarch.actor import Actor, ValueMesh, context, endpoint

class ConfigActor(Actor):
    def __init__(self, config_mesh: ValueMesh[dict]):
        # Each actor gets corresponding config
        rank = context().actor_instance.rank
        self.config = config_mesh[rank]

    @endpoint
    def get_config(self):
        return self.config

# Create value mesh
configs = [{"id": i, "param": i * 10} for i in range(8)]
config_mesh = ValueMesh.from_list(configs, extent={"gpus": 8})
# Spawn actors with value mesh
actors = procs.spawn("actors", ConfigActor, config_mesh)
Best Practices#
1. Actor Design#
✅ DO:
Keep actors focused on single responsibility
Use immutable messages when possible
Handle errors gracefully
Document endpoint contracts
❌ DON’T:
Share mutable state between actors
Block in endpoints (use async)
Ignore supervision events
Create circular dependencies
2. State Management#
class GoodActor(Actor):
    def __init__(self):
        # All state in __init__
        self.counter = 0
        self.data = []

    @endpoint
    def update(self, value):
        # Modify state atomically
        self.counter += 1
        self.data.append(value)
        return self.counter

# Bad: External state
global_state = {}  # ❌ Don't do this

class BadActor(Actor):
    @endpoint
    def update(self, key, value):
        global_state[key] = value  # ❌ Race conditions!
3. Endpoint Design#
import time

class WellDesignedActor(Actor):
    # ✅ Clear, typed endpoints
    @endpoint
    def process_batch(self, batch_id: int, data: list[float]) -> dict:
        """
        Process a batch of data.

        Args:
            batch_id: Unique batch identifier
            data: List of data points to process

        Returns:
            Dictionary with processing results
        """
        results = self._process(data)
        return {
            "batch_id": batch_id,
            "results": results,
            "processed_at": time.time()
        }
4. Error Handling#
import logging

logger = logging.getLogger(__name__)

class ResilientActor(Actor):
    @endpoint
    def risky_operation(self, data):
        try:
            result = self._process(data)
            return {"success": True, "result": result}
        except ValueError as e:
            # Log and return error
            logger.error(f"Invalid data: {e}")
            return {"success": False, "error": str(e)}
        except Exception as e:
            # Unexpected error - let supervision handle it
            logger.error(f"Unexpected error: {e}")
            raise
5. Performance Tips#
class OptimizedActor(Actor):
    def __init__(self):
        # ✅ Pre-allocate resources
        self.buffer = torch.zeros(1000, 1000)
        self.cache = {}

    @endpoint
    def compute(self, input_data):
        # ✅ Check the cache first so repeated inputs skip the computation
        if input_data.id in self.cache:
            return self.cache[input_data.id]
        # ✅ Reuse buffers
        self.buffer.copy_(input_data)
        result = self._compute(self.buffer)
        # ✅ Cache expensive computations
        self.cache[input_data.id] = result
        return result
6. Testing Actors#
import pytest
from monarch.actor import this_proc

@pytest.mark.asyncio
async def test_calculator():
    # Spawn actor
    calc = this_proc().spawn("test_calc", Calculator)
    # Test endpoint
    result = await calc.add.call_one(5, 3)
    assert result == 8
    # Test history
    history = await calc.get_history.call_one()
    assert len(history) == 1
    assert history[0] == ("add", 5, 3, 8)
Summary#
Key Takeaways#
Actors are Isolated: Private state, message-based communication
Sequential Processing: One message at a time, FIFO ordering
Typed Endpoints: Clear API with type hints
Location Transparent: Same code for local/remote actors
Supervision Tree: Automatic error propagation and handling
ActorMesh: Multidimensional organization with slicing
Context API: Runtime information and services
Actor Lifecycle Recap#
graph LR
A[Spawn] --> B[__init__]
B --> C[init hook]
C --> D[Running]
D --> E[Handle Messages]
E --> D
D --> F[Terminate]
style A fill:#855b9d
style B fill:#007c88
style C fill:#007c88
style D fill:#007c88
style E fill:#13a3a4
style F fill:#0072c7
Quick Reference Card#
Basic Actor#
from monarch.actor import Actor, endpoint, this_proc

class MyActor(Actor):
    def __init__(self, param):
        self.param = param

    @endpoint
    def method(self, arg):
        return self.param + arg

actor = this_proc().spawn("name", MyActor, param=10)
result = actor.method.call_one(5).get()  # 15
Actor Mesh#
# Spawn mesh
actors = procs.spawn("actors", MyActor, param=10)
# Broadcast and collect
results = actors.method.call(5).get()
# Fire and forget
actors.method.broadcast(5)
# Slice
subset = actors.slice(gpus=slice(0, 4))
Context#
from monarch.actor import context

@endpoint
def my_endpoint(self):
    ctx = context()
    rank = ctx.message_rank
    proc = ctx.proc
Supervision#
class MySupervisor(Actor):
    def __supervise__(self, event):
        # Handle or propagate
        return True  # Handled