
Exporting Metrics and Logs with OpenTelemetry

This example shows how to export Monarch’s built-in metrics and logs to an OpenTelemetry Collector and visualize them with Grafana. We deploy the full observability stack on Kubernetes using the Monarch CRD and operator.

When OTEL_EXPORTER_OTLP_ENDPOINT is set, Monarch’s telemetry layer exports metrics and logs via OTLP/HTTP to the specified collector. Metric export needs no further setup. Log export goes through the unified tracing layer (USE_UNIFIED_LAYER), which is enabled by default and installs the OTLP log sink. As a result, built-in actor system metrics (mailbox posts, messages sent/received, queue sizes, etc.) and log events are exported without any code changes.
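With a spec-conforming OTLP/HTTP exporter, the single base endpoint is expanded into one URL per signal. The sketch below illustrates that rule as the OpenTelemetry specification describes it: a signal-specific variable such as OTEL_EXPORTER_OTLP_METRICS_ENDPOINT is used verbatim if set, otherwise v1/metrics or v1/logs is appended to OTEL_EXPORTER_OTLP_ENDPOINT. It is not Monarch's exporter code; whether Monarch also honours the signal-specific variables is an assumption, and otlp_http_url is a hypothetical name.

```python
import os
from typing import Mapping, Optional

# Per-signal paths that the OpenTelemetry spec defines for OTLP/HTTP.
_SIGNAL_PATHS = {"metrics": "v1/metrics", "logs": "v1/logs", "traces": "v1/traces"}


def otlp_http_url(signal: str, env: Optional[Mapping[str, str]] = None) -> Optional[str]:
    """Return the URL an OTLP/HTTP exporter would post `signal` data to.

    Hypothetical helper following the spec rule: a signal-specific
    variable (e.g. OTEL_EXPORTER_OTLP_METRICS_ENDPOINT) is used as-is;
    otherwise the signal path is appended to OTEL_EXPORTER_OTLP_ENDPOINT.
    Returns None when neither variable is set.
    """
    env = os.environ if env is None else env
    specific = env.get(f"OTEL_EXPORTER_OTLP_{signal.upper()}_ENDPOINT")
    if specific:
        return specific
    base = env.get("OTEL_EXPORTER_OTLP_ENDPOINT")
    if not base:
        return None
    # Avoid a double slash when the base URL already ends with "/".
    return base.rstrip("/") + "/" + _SIGNAL_PATHS[signal]
```

For example, with the endpoint used in this example, otlp_http_url("metrics", {"OTEL_EXPORTER_OTLP_ENDPOINT": "http://otel-collector.monarch-tests.svc.cluster.local:4318"}) yields http://otel-collector.monarch-tests.svc.cluster.local:4318/v1/metrics.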

Architecture

┌──────────────┐
│  Controller  │──OTLP/HTTP──┐
│  (main.py)   │             │
└──────────────┘             ▼
                        ┌────────────────┐     ┌────────────┐
                        │ OTEL Collector │────▶│  stdout    │
                        │  (port 4318)   │     │  (debug)   │
                        └────────────────┘     └────────────┘
┌──────────────┐             ▲           │     ┌────────────┐     ┌────────────┐
│  Worker pods │──OTLP/HTTP──┘           ├────▶│ Prometheus │────▶│ Prometheus │
│  (mesh)      │                         │     │  (scrape   │     │  (server   │
└──────────────┘                         │     │  port 8889)│     │  port 9090)│
                                         │     └────────────┘     └──────┬─────┘
                                         │     ┌────────────┐            │
                                         └────▶│   Loki     │            │
                                               │ (port 3100)│            │
                                               └──────┬─────┘            │
                                                      │                  │
                                         ┌────────────┴──────────────────┘
                                         │       Grafana                 │
                                         │     (port 3000)               │
                                         │  Prometheus + Loki            │
                                         └───────────────────────────────┘

Both the controller and worker pods set OTEL_EXPORTER_OTLP_ENDPOINT pointing at the collector service. The collector receives OTLP metrics and logs and fans them out to:

  • debug exporter – writes metrics and log records to the collector’s stdout (inspect with kubectl logs)

  • prometheus exporter – exposes metrics on a /metrics endpoint on port 8889, which the Prometheus server scrapes

  • loki exporter – forwards log records to Grafana Loki for aggregation and querying

Grafana connects to both Prometheus and Loki as datasources, providing a unified UI for exploring metrics and logs.
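In Collector configuration terms, this fan-out is two pipelines that share one OTLP receiver. The dictionary below is a hypothetical sketch of that shape: the component names debug, prometheus, and loki match the exporters listed above, while the endpoints, the verbosity setting, and the Loki push URL are assumptions. It is not the contents of otel_collector/otel-collector.yaml, which may differ.

```python
# Hypothetical sketch of the collector's fan-out; the real
# otel_collector/otel-collector.yaml may use different settings.
COLLECTOR_CONFIG = {
    "receivers": {
        # OTLP/HTTP on 4318, the port the controller and workers send to.
        "otlp": {"protocols": {"http": {"endpoint": "0.0.0.0:4318"}}},
    },
    "exporters": {
        "debug": {"verbosity": "detailed"},          # prints records to stdout
        "prometheus": {"endpoint": "0.0.0.0:8889"},  # serves /metrics for scraping
        "loki": {"endpoint": "http://loki:3100/loki/api/v1/push"},  # assumed URL
    },
    "service": {
        "pipelines": {
            # Metrics go to stdout and the Prometheus exporter.
            "metrics": {"receivers": ["otlp"], "exporters": ["debug", "prometheus"]},
            # Logs go to stdout and Loki.
            "logs": {"receivers": ["otlp"], "exporters": ["debug", "loki"]},
        },
    },
}


def destinations(config: dict, signal: str) -> list:
    """Return the exporters that a signal ("metrics" or "logs") fans out to."""
    return config["service"]["pipelines"][signal]["exporters"]
```

Keeping metrics and logs in separate pipelines is what lets the prometheus exporter see only metrics and the loki exporter see only logs, while debug receives both.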

Prerequisites

Deploy the Observability Stack

The Kubernetes manifests live alongside this script in the otel_collector/ directory. Deploy them in order:

# Create the namespace
kubectl create namespace monarch-tests

# Deploy Loki (log aggregation backend)
kubectl apply -f otel_collector/loki.yaml
kubectl rollout status deployment/loki -n monarch-tests

# Deploy the OTEL collector
kubectl apply -f otel_collector/otel-collector.yaml
kubectl rollout status deployment/otel-collector -n monarch-tests

# Deploy Prometheus (scrapes metrics from the collector)
kubectl apply -f otel_collector/prometheus.yaml
kubectl rollout status deployment/prometheus -n monarch-tests

# Deploy Grafana (visualization)
kubectl apply -f otel_collector/grafana.yaml
kubectl rollout status deployment/grafana -n monarch-tests

# Deploy the controller pod (includes RBAC)
kubectl apply -f otel_collector/controller.yaml
kubectl wait --for=condition=Ready pod/otel-controller -n monarch-tests --timeout=120s
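If you redeploy often, the same sequence can be scripted. The hypothetical helper below (deploy_commands and deploy are illustrative names, not part of the example) builds the kubectl invocations above as argument lists in the same order, prints them, and only executes them when run with --apply, stopping at the first failure.

```python
import subprocess
import sys
from typing import List

NAMESPACE = "monarch-tests"

# (manifest, Deployment to wait on), in the deployment order given above.
STACK = [
    ("otel_collector/loki.yaml", "loki"),
    ("otel_collector/otel-collector.yaml", "otel-collector"),
    ("otel_collector/prometheus.yaml", "prometheus"),
    ("otel_collector/grafana.yaml", "grafana"),
]


def deploy_commands(namespace: str = NAMESPACE) -> List[List[str]]:
    """Return the kubectl invocations, in order, as argv lists."""
    cmds = [["kubectl", "create", "namespace", namespace]]
    for manifest, deployment in STACK:
        cmds.append(["kubectl", "apply", "-f", manifest])
        # Block until the Deployment's rollout finishes before moving on.
        cmds.append(
            ["kubectl", "rollout", "status", f"deployment/{deployment}", "-n", namespace]
        )
    # The controller is waited on as a Pod (pod/otel-controller), not a Deployment.
    cmds.append(["kubectl", "apply", "-f", "otel_collector/controller.yaml"])
    cmds.append(
        ["kubectl", "wait", "--for=condition=Ready", "pod/otel-controller",
         "-n", namespace, "--timeout=120s"]
    )
    return cmds


def deploy(dry_run: bool = True) -> None:
    """Print each command, and run it as well unless dry_run is set."""
    for cmd in deploy_commands():
        print("+", " ".join(cmd))
        if not dry_run:
            # check=True raises on the first non-zero exit, for example
            # when the namespace already exists.
            subprocess.run(cmd, check=True)


if __name__ == "__main__":
    deploy(dry_run="--apply" not in sys.argv)
```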

Run the Example

Copy this script to the controller pod and run it:

kubectl cp otel_collector.py monarch-tests/otel-controller:/tmp/main.py
kubectl exec -it otel-controller -n monarch-tests -- \
    python /tmp/main.py --num-replicas=2 --iterations=100

The script provisions a MonarchMesh whose worker pods have OTEL_EXPORTER_OTLP_ENDPOINT set, spawns WorkActor actors on the workers, runs --iterations rounds of ping and do_work calls, waits for telemetry to flush, and then cleans up.

Visualize in Grafana

Port-forward and open the Grafana UI:

kubectl port-forward -n monarch-tests svc/grafana 3000:3000

Open http://localhost:3000 (no login required).

Explore Metrics

  1. Go to Explore (compass icon in the left sidebar).

  2. Select Prometheus datasource from the dropdown.

  3. Try these queries:

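    • mailbox_posts_total – cumulative mailbox posts, one series per label set (use sum(mailbox_posts_total) for a total across all actors)

    • actor_messages_sent_total – messages sent by the actor system

    • actor_messages_received_total – messages received by the actor system

    • rate(mailbox_posts_total[1m]) – per-second mailbox post rate, averaged over the last minute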
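The same PromQL queries can be run outside Grafana through the Prometheus HTTP API (GET /api/v1/query). This sketch assumes you have port-forwarded the Prometheus server, for example with kubectl port-forward -n monarch-tests svc/prometheus 9090:9090; the service name svc/prometheus is an assumption based on the deployment name, and the helper names are hypothetical.

```python
import json
import urllib.parse
import urllib.request
from typing import Dict, List, Tuple

# Assumption: a port-forward to the Prometheus server is running on 9090.
PROMETHEUS_URL = "http://localhost:9090"


def query_url(expr: str, base: str = PROMETHEUS_URL) -> str:
    """Build a GET URL for the Prometheus instant-query endpoint."""
    return f"{base}/api/v1/query?" + urllib.parse.urlencode({"query": expr})


def parse_vector(payload: dict) -> List[Tuple[Dict[str, str], float]]:
    """Turn an instant-vector response into (labels, value) pairs."""
    if payload.get("status") != "success":
        raise RuntimeError(f"query failed: {payload.get('error')}")
    data = payload["data"]
    if data["resultType"] != "vector":
        raise ValueError(f"expected a vector result, got {data['resultType']!r}")
    # Each sample's "value" is [unix_timestamp, "value as a string"].
    return [(s["metric"], float(s["value"][1])) for s in data["result"]]


def instant_query(expr: str) -> List[Tuple[Dict[str, str], float]]:
    """Run a PromQL expression and return the current value of each series."""
    with urllib.request.urlopen(query_url(expr), timeout=10) as resp:
        return parse_vector(json.load(resp))
```

For example, instant_query("sum(rate(mailbox_posts_total[1m]))") returns a single (labels, value) pair holding the cluster-wide post rate.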

Explore Logs

  1. Go to Explore (compass icon in the left sidebar).

  2. Select Loki datasource from the dropdown.

  3. Try these queries:

    • {service_name="monarch-worker"} – all worker logs

    • {service_name="monarch-controller"} – all controller logs

    • {service_name=~".+"} – all logs from any service

    • {service_name="monarch-worker"} |= "error" – worker log lines containing the substring "error" (case-sensitive; use |~ "(?i)error" to match any case)
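LogQL queries can also be sent directly to Loki’s HTTP API (GET /loki/api/v1/query_range), which returns matching lines grouped by label set. This sketch assumes a port-forward such as kubectl port-forward -n monarch-tests svc/loki 3100:3100 (the service name is an assumption); query_range_url, parse_streams, and query_logs are hypothetical helpers.

```python
import json
import time
import urllib.parse
import urllib.request
from typing import Dict, List, Optional, Tuple

# Assumption: a port-forward to Loki is running on 3100.
LOKI_URL = "http://localhost:3100"

LogLine = Tuple[Dict[str, str], int, str]  # (stream labels, timestamp in ns, line)


def query_range_url(logql: str, since_s: int = 3600, limit: int = 100,
                    base: str = LOKI_URL, now_ns: Optional[int] = None) -> str:
    """Build a query_range URL covering the last `since_s` seconds."""
    end = time.time_ns() if now_ns is None else now_ns
    start = end - since_s * 1_000_000_000  # Loki accepts nanosecond epochs
    params = {"query": logql, "start": str(start), "end": str(end), "limit": str(limit)}
    return f"{base}/loki/api/v1/query_range?" + urllib.parse.urlencode(params)


def parse_streams(payload: dict) -> List[LogLine]:
    """Flatten a streams response into log lines, oldest first."""
    data = payload["data"]
    if data["resultType"] != "streams":
        raise ValueError(f"expected a streams result, got {data['resultType']!r}")
    lines: List[LogLine] = []
    for stream in data["result"]:
        # Each entry in "values" is [timestamp_ns_as_string, log_line].
        for ts, line in stream["values"]:
            lines.append((stream["stream"], int(ts), line))
    return sorted(lines, key=lambda entry: entry[1])


def query_logs(logql: str, **kwargs) -> List[LogLine]:
    """Run a LogQL query against Loki and return the flattened lines."""
    with urllib.request.urlopen(query_range_url(logql, **kwargs), timeout=10) as resp:
        return parse_streams(json.load(resp))
```

For example, query_logs('{service_name="monarch-worker"} |= "completed work"', since_s=600) would return recent "completed work" lines from the workers, assuming the service_name label is present as in the queries above.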

Verify Metrics and Logs

Check the collector’s debug output to confirm data is being received:

kubectl logs -n monarch-tests deployment/otel-collector --tail=100

You should see metric data points logged with names like mailbox.posts, actor.messages_sent, actor.messages_received, etc., as well as log records from the actor system.

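To inspect the raw metrics exposed by the collector’s Prometheus exporter (the endpoint the Prometheus server scrapes):

# Port-forward the collector's Prometheus exporter port (leave this running)
kubectl port-forward -n monarch-tests svc/otel-collector 8889:8889

# In a second terminal, fetch the first 50 lines of the metrics output
curl -s http://localhost:8889/metrics | head -50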
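To check the scrape output programmatically instead of reading it with head, the sketch below sums the sample values for each metric name in the Prometheus text exposition format. It is a quick check rather than a full parser (it skips HELP/TYPE comment lines and does not unescape label values); metric_totals and scrape are hypothetical names, and the URL assumes the port-forward above is running.

```python
import urllib.request
from typing import Dict


def metric_totals(text: str) -> Dict[str, float]:
    """Sum sample values per metric name across all label sets."""
    totals: Dict[str, float] = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and HELP/TYPE comments
        # The metric name ends at the first "{" (labels) or whitespace.
        name_end = min((i for i in (line.find("{"), line.find(" ")) if i != -1),
                       default=len(line))
        name = line[:name_end]
        # With labels, the value follows the closing "}"; otherwise the name.
        rest = line[line.rfind("}") + 1:] if "{" in line else line[name_end:]
        value = float(rest.split()[0])  # ignore an optional trailing timestamp
        totals[name] = totals.get(name, 0.0) + value
    return totals


def scrape(url: str = "http://localhost:8889/metrics") -> Dict[str, float]:
    """Fetch the exporter's /metrics page and summarize it."""
    with urllib.request.urlopen(url, timeout=10) as resp:
        return metric_totals(resp.read().decode("utf-8"))
```

For instance, scrape().get("mailbox_posts_total") gives the total number of mailbox posts reported so far, or None if the metric has not been exported yet.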

Cleanup

kubectl delete -f otel_collector/controller.yaml
kubectl delete -f otel_collector/grafana.yaml
kubectl delete -f otel_collector/prometheus.yaml
kubectl delete -f otel_collector/otel-collector.yaml
kubectl delete -f otel_collector/loki.yaml
kubectl delete namespace monarch-tests

Configuration

Environment Variable           Default           Description
-----------------------------  ----------------  -----------------------------------------------------------
OTEL_EXPORTER_OTLP_ENDPOINT    (unset)           Collector endpoint. When set, enables OTLP metric and log export.
OTEL_SERVICE_NAME              unknown_service   Service name attached to all exported telemetry.
OTEL_METRIC_EXPORT_INTERVAL    1s                How often the periodic metric reader pushes metrics to the exporter.
USE_UNIFIED_LAYER              true              Enables the unified tracing layer and the OTLP log sink.
ENABLE_OTEL_METRICS            true              Set to false to disable OTel metrics entirely.
OTEL_EXPORTER_OTLP_HEADERS     (unset)           Additional headers for OTLP export requests (e.g., auth tokens), as comma-separated key=value pairs.
OTEL_EXPORTER_OTLP_TIMEOUT     (unset)           Timeout for OTLP export requests.
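The OpenTelemetry specification defines OTEL_EXPORTER_OTLP_HEADERS as a comma-separated list of key=value pairs whose values are percent-encoded, for example authorization=Bearer%20<token>. Assuming Monarch’s exporter reads the variable in that standard format, the sketch below shows how such a value is interpreted; parse_otlp_headers is a hypothetical helper, not a Monarch API.

```python
from typing import Dict
from urllib.parse import unquote


def parse_otlp_headers(raw: str) -> Dict[str, str]:
    """Parse an OTEL_EXPORTER_OTLP_HEADERS-style string into a dict."""
    headers: Dict[str, str] = {}
    for item in raw.split(","):
        item = item.strip()
        if not item:
            continue  # tolerate empty entries such as a trailing comma
        # Split on the first "=" only, so values may contain "=".
        key, sep, value = item.partition("=")
        if not sep or not key.strip():
            raise ValueError(f"malformed header entry: {item!r}")
        # Values are percent-encoded, e.g. "%20" for a space.
        headers[key.strip()] = unquote(value.strip())
    return headers
```

For example, parse_otlp_headers("authorization=Bearer%20abc,x-tenant=team-a") yields {"authorization": "Bearer abc", "x-tenant": "team-a"}.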

Controller Script

The controller script below spawns actors on Kubernetes worker pods and exercises them to generate metrics and log events. Both the controller and worker pods have OTEL_EXPORTER_OTLP_ENDPOINT set, so all telemetry is automatically forwarded to the collector.

import argparse
import logging
import os
import socket
import time

from kubernetes.client import V1Container, V1EnvVar, V1PodSpec
from monarch._src.job.kubernetes import _WORKER_BOOTSTRAP_SCRIPT
from monarch.actor import Actor, endpoint
from monarch.job.kubernetes import KubernetesJob

logger: logging.Logger = logging.getLogger(__name__)

_OTEL_ENDPOINT = "http://otel-collector.monarch-tests.svc.cluster.local:4318"
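_OTEL_ENDPOINT is the fully qualified Kubernetes DNS name of the collector Service, <service>.<namespace>.svc.<cluster-domain>, plus the collector’s OTLP/HTTP port 4318. If you deploy into a different namespace or your cluster uses a non-default domain, a small helper like the hypothetical one below (not part of otel_collector.py) avoids hand-editing the string; cluster.local is the common default cluster domain but is an assumption about your cluster.

```python
def cluster_service_url(service: str, namespace: str, port: int,
                        scheme: str = "http",
                        cluster_domain: str = "cluster.local") -> str:
    """Build the in-cluster URL of a Kubernetes Service (hypothetical helper)."""
    # Service DNS names have the form <service>.<namespace>.svc.<cluster-domain>.
    return f"{scheme}://{service}.{namespace}.svc.{cluster_domain}:{port}"
```

With the values used in this example, cluster_service_url("otel-collector", "monarch-tests", 4318) reproduces _OTEL_ENDPOINT.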

Define an actor that performs work to generate telemetry.

class WorkActor(Actor):
    """Actor that performs work to generate telemetry."""

    @endpoint
    def do_work(self, iterations: int) -> dict:
        """Run a CPU-bound loop and report the result.

        Each call to this endpoint also generates actor message metrics.
        """
        logger.info("starting work with %d iterations", iterations)
        total = 0
        for i in range(iterations):
            total += i * i
        logger.info("completed work: result=%d", total)
        return {
            "hostname": socket.gethostname(),
            "iterations": iterations,
            "result": total,
        }

    @endpoint
    def ping(self) -> str:
        logger.info("received ping")
        return f"pong from {socket.gethostname()}"

Build the worker pod spec with OTEL environment variables.

def build_worker_pod_spec(port: int) -> V1PodSpec:
    """Build a V1PodSpec with OTEL_EXPORTER_OTLP_ENDPOINT configured."""
    return V1PodSpec(
        containers=[
            V1Container(
                name="worker",
                image="ghcr.io/meta-pytorch/monarch:latest",
                image_pull_policy="Always",
                command=["python", "-u", "-c", _WORKER_BOOTSTRAP_SCRIPT],
                env=[
                    V1EnvVar(name="MONARCH_PORT", value=str(port)),
                    V1EnvVar(
                        name="OTEL_EXPORTER_OTLP_ENDPOINT",
                        value=_OTEL_ENDPOINT,
                    ),
                    V1EnvVar(
                        name="OTEL_SERVICE_NAME",
                        value="monarch-worker",
                    ),
                    V1EnvVar(
                        name="MONARCH_FILE_LOG",
                        value="trace",
                    ),
                ],
            )
        ]
    )

The main function provisions workers, spawns actors, and runs several rounds of work to produce a stream of metrics and logs.

def main():
    parser = argparse.ArgumentParser(
        description="Monarch OTEL Collector Kubernetes example"
    )
    parser.add_argument(
        "--num-replicas",
        type=int,
        default=2,
        help="Number of worker replicas (default: 2)",
    )
    parser.add_argument(
        "--iterations",
        type=int,
        default=5,
        help="Number of work rounds to generate metrics (default: 5)",
    )
    args = parser.parse_args()

    otel_endpoint = os.environ.get("OTEL_EXPORTER_OTLP_ENDPOINT", "")
    print(f"Controller OTEL_EXPORTER_OTLP_ENDPOINT: {otel_endpoint}")

    print("Connecting to Kubernetes...")
    job = KubernetesJob(namespace="monarch-tests")
    port = 26600
    job.add_mesh(
        "workers",
        num_replicas=args.num_replicas,
        pod_spec=build_worker_pod_spec(port),
        port=port,
    )

    print(f"Waiting for {args.num_replicas} worker pod(s)...")
    state = job.state()
    host_mesh = state.workers
    procs = host_mesh.spawn_procs()

    print("Spawning actors...")
    actors = procs.spawn("work_actor", WorkActor)

    # Run several rounds of work to generate a stream of metrics.
    for i in range(args.iterations):
        results = actors.ping.call().get()
        print(f"  Round {i + 1}: {list(results)}")
        work_results = actors.do_work.call(1000).get()
        for _, result in work_results.items():
            print(
                f"    {result['hostname']}: computed {result['iterations']} iterations"
            )

    # Wait for the periodic metric reader and log sink to flush.
    print("Waiting for metrics and logs to flush to collector...")
    time.sleep(10)

    print()
    print("Verify metrics and logs in the OTEL collector logs:")
    print("  kubectl logs -n monarch-tests deployment/otel-collector --tail=100")
    print()
    print("Scrape Prometheus endpoint:")
    print("  kubectl port-forward -n monarch-tests svc/otel-collector 8889:8889")
    print("  curl http://localhost:8889/metrics")

    procs.stop().get()
    job.kill()
    print("Done.")


if __name__ == "__main__":
    main()
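The script sleeps a fixed 10 seconds before cleanup so that the periodic metric reader can flush. An alternative is to poll the collector’s Prometheus exporter until the expected metric names appear, giving up after a timeout. The sketch below is a hypothetical helper, not part of the example; it assumes the /metrics endpoint on port 8889 is reachable from wherever it runs, and it only confirms metric delivery, not log delivery to Loki.

```python
import time
import urllib.request
from typing import Callable, Iterable

# Assumption: reachable via a port-forward to svc/otel-collector on 8889.
METRICS_URL = "http://localhost:8889/metrics"


def fetch_metrics(url: str = METRICS_URL) -> str:
    """Return the exporter's /metrics page as text."""
    with urllib.request.urlopen(url, timeout=10) as resp:
        return resp.read().decode("utf-8")


def has_metric(text: str, name: str) -> bool:
    """True if a sample line starts with `name` followed by labels or a space."""
    for line in text.splitlines():
        if line.startswith(name) and line[len(name):len(name) + 1] in ("{", " "):
            return True
    return False


def wait_for_metrics(names: Iterable[str],
                     fetch: Callable[[], str] = fetch_metrics,
                     timeout_s: float = 60.0, interval_s: float = 2.0,
                     clock: Callable[[], float] = time.monotonic,
                     sleep: Callable[[float], None] = time.sleep) -> bool:
    """Poll until every metric in `names` is exposed, or until timeout_s passes."""
    wanted = set(names)
    deadline = clock() + timeout_s
    while True:
        text = fetch()
        if all(has_metric(text, name) for name in wanted):
            return True
        if clock() >= deadline:
            return False
        sleep(interval_s)
```

Inside the controller pod, for example, wait_for_metrics(["mailbox_posts_total"], fetch=lambda: fetch_metrics("http://otel-collector.monarch-tests.svc.cluster.local:8889/metrics")) could replace time.sleep(10), assuming the collector Service exposes port 8889 as the port-forward command above suggests. The clock and sleep parameters exist so the polling logic can be tested without waiting.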

