
torchx.tracker

Tip

Trackers record artifacts, metadata, and lineage for training runs. Use AppRun inside your job and register custom backends via entry points.

Prerequisites: Basic Concepts (core concepts). For registering custom tracker backends, see Registering Custom Trackers in the Advanced Usage guide.

Overview & Usage

Note

PROTOTYPE, USE AT YOUR OWN RISK, APIs SUBJECT TO CHANGE

Practitioners running ML jobs often need to track information such as:

  • Job inputs:
    • configuration
      • model configuration

      • HPO parameters

    • data
      • version

      • sources

  • Job results:
    • metrics

    • model location

  • Conceptual job groupings

AppRun provides a uniform interface for experiment and artifact tracking. It supports pluggable tracking implementations by wrapping one or more TrackerBase adapter implementations.

Example usage

Sample code using the tracker API is shown in the usage snippets below.

Tracker Setup

Enabling tracking requires:

  1. Defining tracker backends (entry points/modules and configuration) on the launcher side using .torchxconfig

  2. Adding entry points within the user job using entry_points (specification)

1. Launcher side configuration

Users can define any number of tracker backends under the torchx:tracker section in .torchxconfig, where:
  • Key: an arbitrary name for the tracker; the same name is used to configure its properties under [tracker:<TRACKER_NAME>]

  • Value: an entry point or module factory method that must be available within the user job. The value is injected into the user job and used to construct the tracker implementation.

[torchx:tracker]
tracker_name=<entry_point_or_module_factory_method>

Each tracker can be additionally configured (currently limited to the config parameter) under its [tracker:<TRACKER_NAME>] section:

[tracker:<TRACKER_NAME>]
config=configvalue

For example, ~/.torchxconfig may be set up as:

[torchx:tracker]
tracker1=tracker1
tracker2=backend_2_entry_point
tracker3=torchx.tracker.mlflow:create_tracker

[tracker:tracker1]
config=s3://my_bucket/config.json

[tracker:tracker3]
config=my_config.json

2. User job configuration (Advanced)

The entry-point value defined in the previous step must be discoverable under the [torchx.tracker] group and callable within the user job (depending on the packaging/distribution mechanism) so that it can create an instance of TrackerBase.

To accomplish that, define the entry point in the distribution's entry_points.txt as:

[torchx.tracker]
entry_point_name=my_module:create_tracker_fn
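
A minimal sketch of what such a factory module might contain. MyTracker is a hypothetical stand-in here; in real code the factory would return a torchx.tracker.api.TrackerBase implementation.

```python
# my_module.py -- hypothetical factory referenced by the entry point above.
from typing import Optional


class MyTracker:
    """Stand-in for a real TrackerBase implementation (assumption)."""

    def __init__(self, root_dir: str) -> None:
        self.root_dir = root_dir


def create_tracker_fn(config: Optional[str]) -> MyTracker:
    # The factory receives the raw config string declared in
    # .torchxconfig (or None if no config was provided) and uses it
    # to construct the tracker backend.
    root_dir = config if config is not None else "/tmp/tracker"
    return MyTracker(root_dir=root_dir)
```

The factory takes a single optional config string, so any richer configuration (JSON, connection strings) should be parsed inside it.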

Acquiring AppRun instance

Use app_run_from_env():

>>> import os; os.environ["TORCHX_JOB_ID"] = "scheduler://session/job_id" # Simulate running job first
>>> from torchx.tracker import app_run_from_env
>>> app_run = app_run_from_env()

Reference TrackerBase implementation

FsspecTracker provides a reference implementation of a tracker backend. The GitHub example directory shows how to configure and use it in a user application.

Querying data

  • CmdTracker exposes operations available to users at the CLI level:
    • torchx tracker list jobs [--parent-run-id RUN_ID]

    • torchx tracker list metadata RUN_ID

    • torchx tracker list artifacts [--artifact ARTIFACT_NAME] RUN_ID

  • Alternatively, backend implementations may expose UI for user consumption.

Trackers operate at two levels:

  • Backend level (TrackerBase) – the storage implementation. TorchX ships with FsspecTracker (filesystem-based) and MLflowTracker. You can implement your own backend.

  • Job level (AppRun) – the user-facing API that delegates to one or more TrackerBase backends. AppRun is constructed automatically from environment variables set by the TorchX runner (TORCHX_JOB_ID, TORCHX_TRACKERS).

Typical usage inside a training job:

from torchx.tracker.api import AppRun

# Singleton created from TORCHX_JOB_ID and TORCHX_TRACKERS env vars
run = AppRun.run_from_env()

# Store metadata (key-value pairs)
run.add_metadata(lr=0.001, epochs=10, model="resnet50")

# Store an artifact (named path + optional metadata)
run.add_artifact("checkpoint", "s3://bucket/checkpoints/epoch_10.pt")

# Link a parent run for lineage tracking
run.add_source("local_cwd://torchx/parent_job_123")

Implementing a Custom Tracker

Subclass TrackerBase and implement its eight abstract methods. Then provide a factory function and register it as a torchx.tracker entry point. See Registering Custom Trackers for the full walkthrough.
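
As a sketch of the shape such a backend takes, the torchx-free class below approximates the abstract methods with in-memory dicts. It is an illustration only: it stores artifact paths as plain strings for brevity (the real API returns TrackerArtifact objects), returns raw source tuples in place of Lineage, and would subclass TrackerBase in real code.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Mapping, Optional, Tuple


class InMemoryTracker:
    """In-memory sketch of a tracker backend (illustrative only).

    In real code this would subclass torchx.tracker.api.TrackerBase;
    it is kept torchx-free here so it runs standalone.
    """

    def __init__(self) -> None:
        self._metadata: Dict[str, Dict[str, object]] = defaultdict(dict)
        self._artifacts: Dict[str, Dict[str, str]] = defaultdict(dict)
        self._sources: Dict[str, List[Tuple[str, Optional[str]]]] = defaultdict(list)

    def add_metadata(self, run_id: str, **kwargs: object) -> None:
        self._metadata[run_id].update(kwargs)

    def add_artifact(
        self,
        run_id: str,
        name: str,
        path: str,
        metadata: Optional[Mapping[str, object]] = None,
    ) -> None:
        self._artifacts[run_id][name] = path  # path only, for brevity

    def add_source(
        self, run_id: str, source_id: str, artifact_name: Optional[str] = None
    ) -> None:
        self._sources[run_id].append((source_id, artifact_name))

    def metadata(self, run_id: str) -> Mapping[str, object]:
        return dict(self._metadata[run_id])

    def artifacts(self, run_id: str) -> Mapping[str, str]:
        return dict(self._artifacts[run_id])

    def sources(
        self, run_id: str, artifact_name: Optional[str] = None
    ) -> Iterable[Tuple[str, Optional[str]]]:
        return [
            s
            for s in self._sources[run_id]
            if artifact_name is None or s[1] == artifact_name
        ]

    def run_ids(self, **kwargs: str) -> Iterable[str]:
        return list(self._metadata.keys() | self._artifacts.keys())

    def lineage(self, run_id: str) -> Iterable[Tuple[str, Optional[str]]]:
        # Stand-in for the real Lineage return type.
        return self._sources[run_id]
```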

Testing Your Tracker

Study torchx/tracker/test/api_test.py for a complete in-memory example. A minimal test writes metadata and artifacts, then reads them back:

import unittest

class MyTrackerTest(unittest.TestCase):
    def test_round_trip_metadata(self) -> None:
        tracker = MyTracker(connection_str="test://localhost")
        tracker.add_metadata("run-1", lr=0.01, epochs=10)
        md = tracker.metadata("run-1")
        self.assertEqual(md["lr"], 0.01)
        self.assertEqual(md["epochs"], 10)

    def test_round_trip_artifact(self) -> None:
        tracker = MyTracker(connection_str="test://localhost")
        tracker.add_artifact("run-1", "checkpoint", "/path/to/ckpt.pt")
        arts = tracker.artifacts("run-1")
        self.assertIn("checkpoint", arts)
        self.assertEqual(arts["checkpoint"].path, "/path/to/ckpt.pt")

Test factory wiring with patch.dict:

import os
from unittest.mock import patch

class TrackerEnvTest(unittest.TestCase):
    @patch.dict(os.environ, {
        "TORCHX_TRACKERS": "my_tracker",
        "TORCHX_TRACKER_MY_TRACKER_CONFIG": "test://localhost",
        "TORCHX_JOB_ID": "test-run-1",
    })
    def test_tracker_from_env(self) -> None:
        from torchx.tracker.api import trackers_from_environ
        trackers = list(trackers_from_environ())
        self.assertEqual(len(trackers), 1)

Common Pitfalls

  • Entry point targets the class, not the factory: The entry point must reference a factory function (config: str | None) -> TrackerBase, not the class itself.

  • Factory signature mismatch: The factory receives config: str | None, not keyword arguments. Parse connection strings or JSON inside the factory.

  • Forgetting to handle ``None`` config: When no TORCHX_TRACKER_<NAME>_CONFIG env var is set, config is None. Provide a sensible default or raise a clear error.

API Reference

class torchx.tracker.AppRun(id: str, backends: Iterable[TrackerBase])[source]

Job-level tracker API that delegates to one or more TrackerBase backends.

Warning

This API is experimental and may change significantly.

>>> from torchx.tracker.api import AppRun
>>> run = AppRun(id="my_job_123", backends=[])
>>> run.add_metadata(lr=0.01, epochs=10)  # no-op with empty backends
>>> run.job_id()
'my_job_123'
add_artifact(name: str, path: str, metadata: Optional[Mapping[str, object]] = None) None[source]

Store an artifact (name, path, optional metadata) for this run.

add_metadata(**kwargs: object) None[source]

Store key-value metadata for this run.

add_source(source_id: str, artifact_name: str | None = None) None[source]

Link a source (TorchX run or external entity) to this run for lineage tracking.

job_id() str[source]

Return the run ID.

static run_from_env() AppRun[source]

Create a singleton AppRun from environment variables.

Reads TORCHX_JOB_ID and TORCHX_TRACKERS (set by the torchx runner). Returns a cached singleton so all callers share the same tracker backends.

Note

When not launched via torchx, returns an empty AppRun with job_id="<UNDEFINED>" and no backends (write methods become no-ops).

>>> from torchx.tracker.api import AppRun
>>> apprun = AppRun.run_from_env()
>>> apprun.add_metadata(md_1="foo", md_2="bar")
sources() Iterable[AppRunTrackableSource][source]

Return source links for this run (queries the first backend).

class torchx.tracker.api.TrackerBase[source]

Abstract base for tracker backend implementations.

Warning

This API is experimental and may change significantly.

abstract add_artifact(run_id: str, name: str, path: str, metadata: Optional[Mapping[str, object]] = None) None[source]

Add an artifact with the given name, path, and optional metadata.

abstract add_metadata(run_id: str, **kwargs: object) None[source]

Store arbitrary key-value metadata for the given run.

abstract add_source(run_id: str, source_id: str, artifact_name: str | None = None) None[source]

Link a source run (lineage) to the given run.

abstract artifacts(run_id: str) Mapping[str, TrackerArtifact][source]

Return all artifacts for the given run.

abstract lineage(run_id: str) Lineage[source]

Return full lineage (parents and consumers) for the given run.

abstract metadata(run_id: str) Mapping[str, object][source]

Return metadata for the given run.

abstract run_ids(**kwargs: str) Iterable[str][source]

Return run IDs, optionally filtered by keyword arguments.

abstract sources(run_id: str, artifact_name: str | None = None) Iterable[TrackerSource][source]

Return sources for the given run, optionally filtered by artifact_name.

Data Types

class torchx.tracker.api.TrackerArtifact(name: str, path: str, metadata: Optional[Mapping[str, object]])[source]

An artifact stored by a backend tracker (name, path, and optional metadata).

class torchx.tracker.api.TrackerSource(source_run_id: str, artifact_name: str | None)[source]

A source link at the backend tracker level.

source_run_id is a TorchX handle or external entity ID. artifact_name classifies the relationship (used for filtering).

Built-in Backends

class torchx.tracker.backend.fsspec.FsspecTracker(fs: AbstractFileSystem, root_dir: str)[source]

Bases: TrackerBase

Implements TrackerBase using the fsspec abstraction, which allows a variety of storage options for persisting the data.

Important: the torchx.tracker.api API is still experimental, so there are no backwards-compatibility guarantees with future releases yet.

Each run has a directory with subdirectories for metadata, artifact, source, and descendant data.
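
The layout can be pictured with a standard-library sketch; the directory and file names below are illustrative assumptions, not FsspecTracker's exact on-disk format:

```python
import json
import tempfile
from pathlib import Path

# Illustrative per-run layout: one directory per run with subdirs for
# metadata, artifacts, and sources (names are assumptions, not the
# tracker's exact on-disk format).
root = Path(tempfile.mkdtemp())
run_dir = root / "run-1"
for sub in ("metadata", "artifacts", "sources"):
    (run_dir / sub).mkdir(parents=True)

# One file per metadata key and per artifact record.
(run_dir / "metadata" / "lr.json").write_text(json.dumps(0.01))
(run_dir / "artifacts" / "checkpoint.json").write_text(
    json.dumps({"path": "/tmp/ckpt.pt"})
)

# Reading back mirrors the metadata()/artifacts() query methods.
lr = json.loads((run_dir / "metadata" / "lr.json").read_text())
ckpt = json.loads((run_dir / "artifacts" / "checkpoint.json").read_text())
```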

add_artifact(run_id: str, name: str, path: str, metadata: Optional[Mapping[str, object]] = None) None[source]

Add an artifact with the given name, path, and optional metadata.

add_metadata(run_id: str, **metadata: object) None[source]

Store arbitrary key-value metadata for the given run.

add_source(run_id: str, source_id: str, artifact_name: str | None = None) None[source]

Link a source run (lineage) to the given run.

artifacts(run_id: str) Mapping[str, TrackerArtifact][source]

Return all artifacts for the given run.

lineage(run_id: str) Lineage[source]

Return full lineage (parents and consumers) for the given run.

metadata(run_id: str) Mapping[str, object][source]

Return metadata for the given run.

run_ids(parent_run_id: str | None = None) Iterable[str][source]

Return run IDs, optionally filtered by parent_run_id.

sources(run_id: str, artifact_name: str | None = None) Iterable[TrackerSource][source]

Return sources for the given run, optionally filtered by artifact_name.

class torchx.tracker.mlflow.MLflowTracker(experiment_name: str | None = None, tracking_uri: str = 'file:///tmp/torchx/mlruns', artifact_location: str | None = None)[source]

Bases: TrackerBase

An implementation of a Tracker that uses mlflow as the backend. Don't forget to call the close() method for orderly shutdown; this ensures the run state in mlflow is properly marked as FINISHED, otherwise the run remains in UNFINISHED status.

Important

TorchX's run_id is used as mlflow's run_name! The run_id in TorchX is the job name, which TorchX makes unique by appending a short random hash to the user-provided job name prefix. This is done because certain job schedulers supported by TorchX require that the job name on the submitted job definition be globally unique (rather than the scheduler returning a unique job ID as the result of the job submission API).

Warning

APIs on this class may only be called with the same run_name. Typically the user does not have to worry about manually setting the run_name as it is picked up by default from the environment variable TORCHX_JOB_ID.

Warning

Lineage not supported. The following TrackerBase methods raise NotImplementedError: add_source(), sources(), and lineage(). If you need lineage tracking, use FsspecTracker or implement a custom backend.

add_artifact(run_id: str, name: str, path: str, metadata: Optional[Mapping[str, object]] = None) None[source]

Add an artifact with the given name, path, and optional metadata.

add_metadata(run_id: str, **kwargs: object) None[source]

Store arbitrary key-value metadata for the given run.

add_source(run_id: str, source_id: str, artifact_name: str | None = None) None[source]

Link a source run (lineage) to the given run.

artifacts(run_id: str) Mapping[str, TrackerArtifact][source]

Return all artifacts for the given run.

get_run(run_name: str) Run[source]

Gets mlflow's Run object for the given run_name in the current experiment. If no such run exists, this method creates a new run under this experiment and starts it so that subsequent mlflow calls log metadata, metrics, and artifacts to the newly created run.

Warning

This method should only be called with the same run_name! This is because mlflow's APIs work by setting an "active run": subsequent mlflow logging calls are made against the current active run on the stack. If you call mlflow.start_run() directly or pass different run names, you may end up logging into two different mlflow runs from the same job!

Parameters:

run_name – equal to torchx’s run_id

Returns: mlflow’s Run object for the run_name

get_run_id(run_name: str) str[source]

Gets the mlflow run's run_id for the given run_name and additionally sets this run as the active run. Hence this method has a side effect: all subsequent calls to mlflow logging APIs are made against the run for the given run_name.

lineage(run_id: str) Lineage[source]

Return full lineage (parents and consumers) for the given run.

log_params_flat(run_name: str, cfg: Any, key: str = '') None[source]

Designed primarily for use with hydra-style config objects (e.g. dataclasses), logs the given cfg, which is one of: a @dataclass, a Sequence (e.g. list, tuple, set), or a Mapping (e.g. dict). The fields of cfg are flattened recursively and logged as the run's parameters in mlflow.

For example if cfg is:

from dataclasses import dataclass, field
from typing import List

@dataclass
class Config2:
    foo: str = "bar"

@dataclass
class Config:
    i: int = 1
    f: float = 2.1
    s: str = "string"
    l: List[str] = field(default_factory=lambda: ["a", "b", "c"])
    cfg_list: List[Config2] = field(default_factory=lambda: [Config2(foo="hello"), Config2(foo="world")])
    cfg2: Config2 = field(default_factory=Config2)

Then this function logs the following parameters:

i: "1"
f: "2.1"
s: "string"
l: ["a", "b", "c"]
cfg_list._0.foo = "hello"
cfg_list._1.foo = "world"
cfg2.foo = "bar"

As shown above, primitive sequence containers are logged directly (e.g. l: ["a", "b", "c"]) whereas nested sequence containers will be logged per element where the key is suffixed with _{INDEX} (e.g. cfg_list._0.foo = "hello").
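The flattening rule described above can be approximated with a small recursive helper. This is a sketch of the behavior, not MLflowTracker's actual implementation:

```python
from dataclasses import fields, is_dataclass
from typing import Any, Dict


def flatten_params(cfg: Any, key: str = "") -> Dict[str, str]:
    """Recursively flatten a dataclass/sequence/mapping into dotted-key
    parameters, approximating the rules described above."""
    out: Dict[str, str] = {}
    if is_dataclass(cfg):
        for f in fields(cfg):
            sub_key = f"{key}.{f.name}" if key else f.name
            out.update(flatten_params(getattr(cfg, f.name), sub_key))
    elif isinstance(cfg, dict):
        for k, v in cfg.items():
            sub_key = f"{key}.{k}" if key else str(k)
            out.update(flatten_params(v, sub_key))
    elif isinstance(cfg, (list, tuple, set)) and any(
        is_dataclass(e) or isinstance(e, (dict, list, tuple, set)) for e in cfg
    ):
        # Nested containers are logged per element, suffixed with _{INDEX}.
        for i, e in enumerate(cfg):
            out.update(flatten_params(e, f"{key}._{i}"))
    else:
        # Primitives and primitive sequences are logged directly.
        out[key] = str(cfg)
    return out
```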

metadata(run_id: str) Mapping[str, object][source]

Return metadata for the given run.

run_ids(**kwargs: str) Iterable[str][source]

Return run IDs, optionally filtered by keyword arguments.

sources(run_id: str, artifact_name: str | None = None) Iterable[TrackerSource][source]

Return sources for the given run, optionally filtered by artifact_name.

CLI Command

class torchx.cli.cmd_tracker.CmdTracker[source]

Prototype TorchX tracker subcommand that allows querying data by interacting with the tracker implementation.

Important: commands and their arguments may be modified in the future.

Supported commands:
  • tracker list jobs [--parent-run-id RUN_ID]

  • tracker list metadata RUN_ID

  • tracker list artifacts [--artifact ARTIFACT_NAME] RUN_ID

add_arguments(subparser: ArgumentParser) None[source]

Adds the arguments to this sub command

run(args: Namespace) None[source]

Runs the sub command. Parsed arguments are available as args.

Environment Variables

Set automatically by the runner when trackers are configured.

Variable

Purpose

TORCHX_JOB_ID

The AppHandle for the current job. Set by the runner and used by AppRun.run_from_env() to identify the run.

TORCHX_TRACKERS

Comma-separated list of tracker entry-point keys to activate (e.g. fsspec,my_tracker).

TORCHX_TRACKER_<NAME>_CONFIG

Per-tracker configuration string passed to the factory function. <NAME> is the upper-cased entry-point key.

TORCHX_PARENT_RUN_ID

Optional parent run ID for lineage tracking. Set by the runner; read by AppRun.run_from_env() which automatically calls tracker.add_source() on each backend to record the lineage link.
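
For local testing, these variables can be set by hand before calling AppRun.run_from_env(). The sketch below shows how the runner-set variables relate to one another; the values are illustrative, not produced by a real scheduler:

```python
import os

# Simulate the environment the TorchX runner would set up.
os.environ["TORCHX_JOB_ID"] = "local_scheduler://session/app_id"
os.environ["TORCHX_TRACKERS"] = "fsspec,my_tracker"
os.environ["TORCHX_TRACKER_MY_TRACKER_CONFIG"] = "/tmp/tracker_config.json"
os.environ.pop("TORCHX_TRACKER_FSSPEC_CONFIG", None)  # ensure unset

# Each key in TORCHX_TRACKERS maps to an upper-cased per-tracker config var;
# a missing config var yields None for that tracker.
tracker_keys = os.environ["TORCHX_TRACKERS"].split(",")
configs = {
    key: os.environ.get(f"TORCHX_TRACKER_{key.upper()}_CONFIG")
    for key in tracker_keys
}
```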

See also

Advanced Usage

Entry-point registration for custom trackers, schedulers, and components.

Tracking

Runtime tracking utilities for use within applications.
