Exporting Metrics and Logs with OpenTelemetry#
This example shows how to export Monarch’s built-in metrics and logs to an OpenTelemetry Collector and visualize them with Grafana. We deploy the full observability stack on Kubernetes using the Monarch CRD and operator.
When OTEL_EXPORTER_OTLP_ENDPOINT is set, Monarch’s telemetry layer exports
metrics and logs via OTLP/HTTP to the specified collector. Metrics are exported
automatically. Log export uses the unified tracing layer, which is enabled
by default and wires up the OTLP log sink.
Built-in actor system metrics (mailbox posts, messages sent/received, queue
sizes, etc.) and log events are exported with no code changes.
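Concretely, export is controlled entirely by environment variables set before the process starts; a minimal sketch (the localhost endpoint here is a placeholder — the Kubernetes deployment below points at the in-cluster collector service instead):

```python
import os

# Placeholder endpoint; in the cluster this is the collector service URL.
os.environ.setdefault("OTEL_EXPORTER_OTLP_ENDPOINT", "http://localhost:4318")
os.environ.setdefault("OTEL_SERVICE_NAME", "monarch-controller")

# Monarch's telemetry layer reads these at startup; no code changes needed.
print(os.environ["OTEL_EXPORTER_OTLP_ENDPOINT"])
```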
Architecture#
┌──────────────┐
│ Controller │──OTLP/HTTP──┐
│ (main.py) │ │
└──────────────┘ ▼
┌────────────────┐ ┌────────────┐
│ OTEL Collector │────▶│ stdout │
│ (port 4318) │ │ (debug) │
└────────────────┘ └────────────┘
┌──────────────┐ ▲ │ ┌────────────┐ ┌────────────┐
│ Worker pods │──OTLP/HTTP──┘ ├────▶│ Prometheus │────▶│ Prometheus │
│ (mesh) │ │ │ (scrape │ │ (server │
└──────────────┘ │ │ port 8889)│ │ port 9090)│
│ └────────────┘ └──────┬─────┘
│ ┌────────────┐ │
└────▶│ Loki │ │
│ (port 3100)│ │
└──────┬─────┘ │
│ │
┌────────────┴──────────────────┘
│ Grafana │
│ (port 3000) │
│ Prometheus + Loki │
└───────────────────────────────┘
Both the controller and worker pods set OTEL_EXPORTER_OTLP_ENDPOINT
pointing at the collector service. The collector receives OTLP metrics and
logs and fans them out to:
debug exporter – logs metrics and log records to stdout (verify with kubectl logs)
prometheus exporter – exposes a /metrics endpoint on port 8889
loki exporter – forwards log records to Grafana Loki for aggregation and querying
Grafana connects to both Prometheus and Loki as datasources, providing a unified UI for exploring metrics and logs.
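The fan-out above amounts to two collector pipelines, one per signal. A rough picture of that wiring as data (the actual configuration lives in otel_collector/otel-collector.yaml; the receiver and exporter names here are inferred from the description above):

```python
# Sketch of the collector's pipelines: both signals arrive via OTLP,
# metrics fan out to debug + prometheus, logs to debug + loki.
pipelines = {
    "metrics": {"receivers": ["otlp"], "exporters": ["debug", "prometheus"]},
    "logs": {"receivers": ["otlp"], "exporters": ["debug", "loki"]},
}

for name, p in pipelines.items():
    print(name, "->", ", ".join(p["exporters"]))
```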
Prerequisites#
A Kubernetes cluster with the Monarch CRD and operator installed
kubectl configured to access the cluster
Deploy the Observability Stack#
The Kubernetes manifests live alongside this script in the otel_collector/
directory. Deploy them in order:
# Create the namespace
kubectl create namespace monarch-tests
# Deploy Loki (log aggregation backend)
kubectl apply -f otel_collector/loki.yaml
kubectl rollout status deployment/loki -n monarch-tests
# Deploy the OTEL collector
kubectl apply -f otel_collector/otel-collector.yaml
kubectl rollout status deployment/otel-collector -n monarch-tests
# Deploy Prometheus (scrapes metrics from the collector)
kubectl apply -f otel_collector/prometheus.yaml
kubectl rollout status deployment/prometheus -n monarch-tests
# Deploy Grafana (visualization)
kubectl apply -f otel_collector/grafana.yaml
kubectl rollout status deployment/grafana -n monarch-tests
# Deploy the controller pod (includes RBAC)
kubectl apply -f otel_collector/controller.yaml
kubectl wait --for=condition=Ready pod/otel-controller -n monarch-tests --timeout=120s
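The ordering above matters: Loki and the collector must exist before anything points at them. If you prefer to script the deployment, a small helper that emits the same commands in the same order (namespace and manifest paths as used above):

```python
def deploy_commands(ns="monarch-tests"):
    """Return the kubectl commands from the steps above, in order."""
    manifests = ["loki", "otel-collector", "prometheus", "grafana", "controller"]
    cmds = [f"kubectl create namespace {ns}"]
    for m in manifests:
        cmds.append(f"kubectl apply -f otel_collector/{m}.yaml")
    return cmds

for cmd in deploy_commands():
    print(cmd)
```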
Run the Example#
Copy this script to the controller pod and run it:
kubectl cp otel_collector.py monarch-tests/otel-controller:/tmp/main.py
kubectl exec -it otel-controller -n monarch-tests -- \
python /tmp/main.py --num-replicas=2 --iterations=100
The script provisions a MonarchMesh with OTEL_EXPORTER_OTLP_ENDPOINT
set on worker pods, spawns actors, runs several rounds of work, then cleans up.
Visualize in Grafana#
Port-forward and open the Grafana UI:
kubectl port-forward -n monarch-tests svc/grafana 3000:3000
Open http://localhost:3000 (no login required).
Explore Metrics
Go to Explore (compass icon in the left sidebar).
Select Prometheus datasource from the dropdown.
Try these queries:
mailbox_posts_total – total mailbox posts across all actors
actor_messages_sent_total – messages sent by the actor system
actor_messages_received_total – messages received
rate(mailbox_posts_total[1m]) – mailbox post rate per second
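The rate() query deserves a note: it estimates the per-second increase of a counter over the window. A toy illustration of the same arithmetic (not PromQL itself, which additionally handles counter resets and extrapolation):

```python
def per_second_rate(samples):
    """Per-second increase between first and last sample of a counter.

    samples: list of (timestamp_seconds, counter_value) pairs, as a
    Prometheus range query would return them.
    """
    (t0, v0), (t1, v1) = samples[0], samples[-1]
    return (v1 - v0) / (t1 - t0)

# 120 mailbox posts over a 60 s window -> 2.0 posts/sec
print(per_second_rate([(0, 0), (60, 120)]))
```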
Explore Logs
Go to Explore (compass icon in the left sidebar).
Select Loki datasource from the dropdown.
Try these queries:
{service_name="monarch-worker"} – all worker logs
{service_name="monarch-controller"} – all controller logs
{service_name=~".+"} – all logs from any service
{service_name="monarch-worker"} |= "error" – filter for error messages
Verify Metrics and Logs#
Check the collector’s debug output to confirm data is being received:
kubectl logs -n monarch-tests deployment/otel-collector --tail=100
You should see metric data points logged with names like mailbox.posts,
actor.messages_sent, actor.messages_received, etc., as well as log
records from the actor system.
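Note the naming difference: the collector's debug output shows the dotted OTLP names (mailbox.posts), while the Prometheus queries above use underscored names with a _total suffix. That is the Prometheus exporter's standard normalization; a sketch of the mapping (the _total suffix behavior is assumed for monotonic counters):

```python
def prom_counter_name(otlp_name: str) -> str:
    # Dots are not legal in Prometheus metric names, so they become
    # underscores; monotonic counters additionally get a _total suffix.
    return otlp_name.replace(".", "_") + "_total"

print(prom_counter_name("mailbox.posts"))
```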
To view metrics via Prometheus:
# Port-forward the Prometheus endpoint
kubectl port-forward -n monarch-tests svc/otel-collector 8889:8889
# Scrape metrics
curl -s http://localhost:8889/metrics | head -50
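The endpoint serves Prometheus text exposition format. To inspect it programmatically rather than with curl, a minimal parser for simple sample lines (it skips HELP/TYPE comments and ignores label-escaping edge cases such as spaces inside label values):

```python
def parse_samples(text):
    """Parse 'name{labels} value' lines from Prometheus text format."""
    samples = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # skip blanks and HELP/TYPE comment lines
        name_and_labels, value = line.rsplit(" ", 1)
        samples[name_and_labels] = float(value)
    return samples

example = """\
# TYPE mailbox_posts_total counter
mailbox_posts_total{service_name="monarch-worker"} 42
"""
print(parse_samples(example))
```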
Cleanup#
kubectl delete -f otel_collector/controller.yaml
kubectl delete -f otel_collector/grafana.yaml
kubectl delete -f otel_collector/prometheus.yaml
kubectl delete -f otel_collector/otel-collector.yaml
kubectl delete -f otel_collector/loki.yaml
kubectl delete namespace monarch-tests
Configuration#
| Environment Variable | Default | Description |
|---|---|---|
| OTEL_EXPORTER_OTLP_ENDPOINT | (unset) | Collector endpoint. When set, enables OTLP metric and log export. |
| OTEL_SERVICE_NAME | | Service name attached to all exported telemetry. |
| | | How often the periodic metric reader pushes to the exporter. |
| | | Enables the unified tracing layer and OTLP log sink. Enabled by default. |
| | (unset) | Set to |
| OTEL_EXPORTER_OTLP_HEADERS | (unset) | Additional headers for the OTLP exporter (e.g., auth tokens). |
| OTEL_EXPORTER_OTLP_TIMEOUT | (unset) | Timeout for OTLP export requests. |
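In application code the essential knobs reduce to a couple of environment lookups; a sketch of the kind of check the controller script below performs (the fallback service name here is an illustration, not Monarch's actual default):

```python
import os

endpoint = os.environ.get("OTEL_EXPORTER_OTLP_ENDPOINT")
service = os.environ.get("OTEL_SERVICE_NAME", "unknown_service")

# Export is effectively a no-op until an endpoint is configured.
export_enabled = endpoint is not None
print(f"export enabled: {export_enabled}, service: {service}")
```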
Controller Script#
The controller script below spawns actors on Kubernetes worker pods and
exercises them to generate metrics and log events. Both the controller
and worker pods have OTEL_EXPORTER_OTLP_ENDPOINT set, so all telemetry
is automatically forwarded to the collector.
import argparse
import logging
import os
import socket
import time
from kubernetes.client import V1Container, V1EnvVar, V1PodSpec
from monarch._src.job.kubernetes import _WORKER_BOOTSTRAP_SCRIPT
from monarch.actor import Actor, endpoint
from monarch.job.kubernetes import KubernetesJob
logger: logging.Logger = logging.getLogger(__name__)
_OTEL_ENDPOINT = "http://otel-collector.monarch-tests.svc.cluster.local:4318"
Define an actor that performs work to generate telemetry.
class WorkActor(Actor):
"""Actor that performs work to generate telemetry."""
@endpoint
def do_work(self, iterations: int) -> dict:
"""Run a loop to generate actor message metrics."""
logger.info("starting work with %d iterations", iterations)
total = 0
for i in range(iterations):
total += i * i
logger.info("completed work: result=%d", total)
return {
"hostname": socket.gethostname(),
"iterations": iterations,
"result": total,
}
@endpoint
def ping(self) -> str:
logger.info("received ping")
return f"pong from {socket.gethostname()}"
Build the worker pod spec with OTEL environment variables.
def build_worker_pod_spec(port: int) -> V1PodSpec:
"""Build a V1PodSpec with OTEL_EXPORTER_OTLP_ENDPOINT configured."""
return V1PodSpec(
containers=[
V1Container(
name="worker",
image="ghcr.io/meta-pytorch/monarch:latest",
image_pull_policy="Always",
command=["python", "-u", "-c", _WORKER_BOOTSTRAP_SCRIPT],
env=[
V1EnvVar(name="MONARCH_PORT", value=str(port)),
V1EnvVar(
name="OTEL_EXPORTER_OTLP_ENDPOINT",
value=_OTEL_ENDPOINT,
),
V1EnvVar(
name="OTEL_SERVICE_NAME",
value="monarch-worker",
),
V1EnvVar(
name="MONARCH_FILE_LOG",
value="trace",
),
],
)
]
)
The main function provisions workers, spawns actors, and runs several rounds of work to produce a stream of metrics and logs.
def main():
parser = argparse.ArgumentParser(
description="Monarch OTEL Collector Kubernetes example"
)
parser.add_argument(
"--num-replicas",
type=int,
default=2,
help="Number of worker replicas (default: 2)",
)
parser.add_argument(
"--iterations",
type=int,
default=5,
help="Number of work rounds to generate metrics (default: 5)",
)
args = parser.parse_args()
otel_endpoint = os.environ.get("OTEL_EXPORTER_OTLP_ENDPOINT", "")
print(f"Controller OTEL_EXPORTER_OTLP_ENDPOINT: {otel_endpoint}")
print("Connecting to Kubernetes...")
job = KubernetesJob(namespace="monarch-tests")
port = 26600
job.add_mesh(
"workers",
num_replicas=args.num_replicas,
pod_spec=build_worker_pod_spec(port),
port=port,
)
print(f"Waiting for {args.num_replicas} worker pod(s)...")
state = job.state()
host_mesh = state.workers
procs = host_mesh.spawn_procs()
print("Spawning actors...")
actors = procs.spawn("work_actor", WorkActor)
# Run several rounds of work to generate a stream of metrics.
for i in range(args.iterations):
results = actors.ping.call().get()
print(f" Round {i + 1}: {list(results)}")
work_results = actors.do_work.call(1000).get()
for _, result in work_results.items():
print(
f" {result['hostname']}: computed {result['iterations']} iterations"
)
# Wait for the periodic metric reader and log sink to flush.
print("Waiting for metrics and logs to flush to collector...")
time.sleep(10)
print()
print("Verify metrics and logs in the OTEL collector logs:")
print(" kubectl logs -n monarch-tests deployment/otel-collector --tail=100")
print()
print("Scrape Prometheus endpoint:")
print(" kubectl port-forward -n monarch-tests svc/otel-collector 8889:8889")
print(" curl http://localhost:8889/metrics")
procs.stop().get()
job.kill()
print("Done.")
if __name__ == "__main__":
main()