torchx.tracker¶
Tip
Trackers record artifacts, metadata, and lineage for training runs. Use
AppRun inside your job and register custom
backends via entry points.
Prerequisites: Basic Concepts (core concepts). For registering custom tracker backends, see Registering Custom Trackers in the Advanced Usage guide.
Overview & Usage¶
Note
PROTOTYPE, USE AT YOUR OWN RISK, APIs SUBJECT TO CHANGE
Practitioners running ML jobs often need to track information such as:
- Job inputs:
  - configuration
    - model configuration
    - HPO parameters
  - data
    - version
    - sources
- Job results:
  - metrics
  - model location
- Conceptual job groupings
AppRun provides a uniform experiment- and artifact-tracking interface that
wraps pluggable tracking implementations through the TrackerBase adapter.
Example usage¶
Sample code using the tracker API.
Tracker Setup¶
Enabling tracking requires:
- Defining tracker backends (entry points/modules and their configuration) on the launcher side using .torchxconfig
- Adding entry points within the user job using entry_points (specification)
1. Launcher side configuration¶
- Users can define any number of tracker backends under the [torchx:tracker] section in .torchxconfig, where:
- Key: an arbitrary name for the tracker; the name is used to configure its properties
under [tracker:<TRACKER_NAME>]
- Value: an entry point or module factory method that must be available within the user job. The value will be injected into the
user job and used to construct the tracker implementation.
[torchx:tracker]
tracker_name=<entry_point_or_module_factory_method>
Each tracker can be additionally configured (currently limited to the config parameter) under its [tracker:<TRACKER_NAME>] section:
[tracker:<TRACKER_NAME>]
config=configvalue
For example, ~/.torchxconfig may be set up as:
[torchx:tracker]
tracker1=tracker1
tracker2=backend_2_entry_point
tracker3=torchx.tracker.mlflow:create_tracker
[tracker:tracker1]
config=s3://my_bucket/config.json
[tracker:tracker3]
config=my_config.json
2. User job configuration (Advanced)¶
The entry point value defined in the previous step must be discoverable under the [torchx.tracker] group and callable within the user job
(depending on the packaging/distribution mechanism) to create an instance of TrackerBase.
To accomplish this, define the entry point in the distribution's entry_points.txt as:
[torchx.tracker]
entry_point_name=my_module:create_tracker_fn
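The factory referenced by the entry point can be sketched as follows. All names here (my_module, create_tracker_fn, InMemoryTracker) are illustrative, and a stand-in class is used so the sketch runs without torchx installed; real code would return a subclass of torchx.tracker.api.TrackerBase.

```python
# Sketch of my_module:create_tracker_fn (hypothetical names). The factory
# receives the tracker's config value as a single string, or None when no
# config is set, and returns the tracker backend instance.
from typing import Optional


class InMemoryTracker:
    """Stand-in for a TrackerBase subclass."""

    def __init__(self, root: str) -> None:
        self.root = root


def create_tracker_fn(config: Optional[str] = None) -> InMemoryTracker:
    # `config` carries the string from `config=` under [tracker:<NAME>];
    # fall back to a default when it is absent.
    root = config if config is not None else "/tmp/tracker"
    return InMemoryTracker(root)
```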
Acquiring AppRun instance¶
Use app_run_from_env():
>>> import os; os.environ["TORCHX_JOB_ID"] = "scheduler://session/job_id" # Simulate running job first
>>> from torchx.tracker import app_run_from_env
>>> app_run = app_run_from_env()
Reference TrackerBase implementation¶
FsspecTracker provides a reference implementation of a tracker backend.
The GitHub example directory shows how to
configure and use it in a user application.
Querying data¶
CmdTracker exposes operations available to users at the CLI level:
- torchx tracker list jobs [--parent-run-id RUN_ID]
- torchx tracker list metadata RUN_ID
- torchx tracker list artifacts [--artifact ARTIFACT_NAME] RUN_ID
Alternatively, backend implementations may expose UI for user consumption.
Trackers operate at two levels:
- Backend level (TrackerBase) – the storage implementation. TorchX ships with FsspecTracker (filesystem-based) and MLflowTracker. You can implement your own backend.
- Job level (AppRun) – the user-facing API that delegates to one or more TrackerBase backends. AppRun is constructed automatically from environment variables set by the TorchX runner (TORCHX_JOB_ID, TORCHX_TRACKERS).
Typical usage inside a training job:
from torchx.tracker.api import AppRun
# Singleton created from TORCHX_JOB_ID and TORCHX_TRACKERS env vars
run = AppRun.run_from_env()
# Store metadata (key-value pairs)
run.add_metadata(lr=0.001, epochs=10, model="resnet50")
# Store an artifact (named path + optional metadata)
run.add_artifact("checkpoint", "s3://bucket/checkpoints/epoch_10.pt")
# Link a parent run for lineage tracking
run.add_source("local_cwd://torchx/parent_job_123")
Implementing a Custom Tracker¶
Subclass TrackerBase and implement its eight
abstract methods. Then provide a factory function and register it as a
torchx.tracker entry point. See
Registering Custom Trackers for the full walkthrough.
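A minimal in-memory sketch of such a backend is shown below. MyTracker and Artifact are hypothetical stand-ins so the sketch is self-contained; a real backend subclasses torchx.tracker.api.TrackerBase, returns TrackerArtifact objects, and implements the remaining abstract methods (add_source, lineage, etc.), which are elided here.

```python
# In-memory tracker backend sketch (hypothetical names, illustration only).
from collections import defaultdict
from dataclasses import dataclass
from typing import Dict, Mapping, Optional


@dataclass
class Artifact:
    """Stand-in for torchx.tracker.api.TrackerArtifact."""

    name: str
    path: str
    metadata: Optional[Mapping[str, object]] = None


class MyTracker:
    def __init__(self, connection_str: Optional[str] = None) -> None:
        self.connection_str = connection_str
        self._metadata: Dict[str, Dict[str, object]] = defaultdict(dict)
        self._artifacts: Dict[str, Dict[str, Artifact]] = defaultdict(dict)

    def add_metadata(self, run_id: str, **kwargs: object) -> None:
        # Merge key-value pairs into the run's metadata.
        self._metadata[run_id].update(kwargs)

    def metadata(self, run_id: str) -> Mapping[str, object]:
        return dict(self._metadata[run_id])

    def add_artifact(
        self,
        run_id: str,
        name: str,
        path: str,
        metadata: Optional[Mapping[str, object]] = None,
    ) -> None:
        self._artifacts[run_id][name] = Artifact(name, path, metadata)

    def artifacts(self, run_id: str) -> Mapping[str, Artifact]:
        return dict(self._artifacts[run_id])
```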
Testing Your Tracker¶
Study torchx/tracker/test/api_test.py for a complete in-memory example. A
minimal test writes metadata and artifacts, then reads them back:
import unittest

class MyTrackerTest(unittest.TestCase):
    def test_round_trip_metadata(self) -> None:
        tracker = MyTracker(connection_str="test://localhost")
        tracker.add_metadata("run-1", lr=0.01, epochs=10)
        md = tracker.metadata("run-1")
        self.assertEqual(md["lr"], 0.01)
        self.assertEqual(md["epochs"], 10)

    def test_round_trip_artifact(self) -> None:
        tracker = MyTracker(connection_str="test://localhost")
        tracker.add_artifact("run-1", "checkpoint", "/path/to/ckpt.pt")
        arts = tracker.artifacts("run-1")
        self.assertIn("checkpoint", arts)
        self.assertEqual(arts["checkpoint"].path, "/path/to/ckpt.pt")
Test factory wiring with patch.dict:
import os
import unittest
from unittest.mock import patch

class MyTrackerEnvTest(unittest.TestCase):
    @patch.dict(os.environ, {
        "TORCHX_TRACKERS": "my_tracker",
        "TORCHX_TRACKER_MY_TRACKER_CONFIG": "test://localhost",
        "TORCHX_JOB_ID": "test-run-1",
    })
    def test_tracker_from_env(self) -> None:
        from torchx.tracker.api import trackers_from_environ
        trackers = list(trackers_from_environ())
        self.assertEqual(len(trackers), 1)
Common Pitfalls¶
- Entry point targets the class, not the factory: The entry point must reference a factory function (config: str | None) -> TrackerBase, not the class itself.
- Factory signature mismatch: The factory receives config: str | None, not keyword arguments. Parse connection strings or JSON inside the factory.
- Forgetting to handle None config: When no TORCHX_TRACKER_<NAME>_CONFIG env var is set, config is None. Provide a sensible default or raise a clear error.
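The last two pitfalls can be illustrated with a factory sketch; the function name and the choice of a JSON config format here are illustrative, not part of the torchx API.

```python
# Factory sketch: one string (or None) in, tracker out. Parsing the
# connection details happens inside the factory, and a missing config
# fails loudly instead of crashing later with an obscure error.
import json
from typing import Any, Dict, Optional


def create_tracker(config: Optional[str] = None) -> Dict[str, Any]:
    if config is None:
        raise ValueError(
            "my_tracker requires config=<json> under [tracker:<NAME>] "
            "in .torchxconfig"
        )
    opts = json.loads(config)  # e.g. '{"host": "localhost"}'
    # Illustration only: a real factory would return MyTracker(**opts).
    return opts
```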
API Reference¶
- class torchx.tracker.AppRun(id: str, backends: Iterable[TrackerBase])[source]¶
Job-level tracker API that delegates to one or more TrackerBase backends.
Warning
This API is experimental and may change significantly.
>>> from torchx.tracker.api import AppRun
>>> run = AppRun(id="my_job_123", backends=[])
>>> run.add_metadata(lr=0.01, epochs=10)  # no-op with empty backends
>>> run.job_id()
'my_job_123'
- add_artifact(name: str, path: str, metadata: Optional[Mapping[str, object]] = None) None[source]¶
Store an artifact (name, path, optional metadata) for this run.
- add_source(source_id: str, artifact_name: str | None = None) None[source]¶
Link a source (TorchX run or external entity) to this run for lineage tracking.
- static run_from_env() AppRun[source]¶
Create a singleton AppRun from environment variables.
Reads TORCHX_JOB_ID and TORCHX_TRACKERS (set by the torchx runner). Returns a cached singleton so all callers share the same tracker backends.
Note
When not launched via torchx, returns an empty AppRun with job_id="<UNDEFINED>" and no backends (write methods become no-ops).
>>> from torchx.tracker.api import AppRun
>>> apprun = AppRun.run_from_env()
>>> apprun.add_metadata(md_1="foo", md_2="bar")
- class torchx.tracker.api.TrackerBase[source]¶
Abstract base for tracker backend implementations.
Warning
This API is experimental and may change significantly.
- abstract add_artifact(run_id: str, name: str, path: str, metadata: Optional[Mapping[str, object]] = None) None[source]¶
Add an artifact with the given name, path, and optional metadata.
- abstract add_metadata(run_id: str, **kwargs: object) None[source]¶
Store arbitrary key-value metadata for the given run.
- abstract add_source(run_id: str, source_id: str, artifact_name: str | None = None) None[source]¶
Link a source run (lineage) to the given run.
- abstract artifacts(run_id: str) Mapping[str, TrackerArtifact][source]¶
Return all artifacts for the given run.
- abstract lineage(run_id: str) Lineage[source]¶
Return full lineage (parents and consumers) for the given run.
Data Types¶
Built-in Backends¶
- class torchx.tracker.backend.fsspec.FsspecTracker(fs: AbstractFileSystem, root_dir: str)[source]¶
Bases: TrackerBase
Implements TrackerBase using the fsspec abstraction, which has the advantage of supporting various storage options for persisting the data.
Important: the torchx.tracker.api API is still experimental, hence there are no backwards compatibility guarantees with future releases yet.
Each run will have a directory with subdirectories for metadata, artifact, source, and descendant data.
- add_artifact(run_id: str, name: str, path: str, metadata: Optional[Mapping[str, object]] = None) None[source]¶
Add an artifact with the given name, path, and optional metadata.
- add_metadata(run_id: str, **metadata: object) None[source]¶
Store arbitrary key-value metadata for the given run.
- add_source(run_id: str, source_id: str, artifact_name: str | None = None) None[source]¶
Link a source run (lineage) to the given run.
- artifacts(run_id: str) Mapping[str, TrackerArtifact][source]¶
Return all artifacts for the given run.
- lineage(run_id: str) Lineage[source]¶
Return full lineage (parents and consumers) for the given run.
- class torchx.tracker.mlflow.MLflowTracker(experiment_name: str | None = None, tracking_uri: str = 'file:///tmp/torchx/mlruns', artifact_location: str | None = None)[source]¶
Bases: TrackerBase
An implementation of a Tracker that uses mlflow as the backend. Don’t forget to call the close() method for an orderly shutdown. This ensures that the run state in mlflow is properly marked as FINISHED; otherwise the run will remain in UNFINISHED status.
Important
TorchX’s run_id is used as mlflow’s run_name! The run_id in TorchX is the job name. The job name in TorchX is made unique by adding a short random hash to the user-provided job name prefix. This is done because certain job schedulers supported by TorchX require that the job name on the submitted job definition is globally unique (rather than the scheduler returning a unique job id as the result of the job submission API).
Warning
APIs on this class may only be called with the same run_name. Typically the user does not have to worry about manually setting the run_name, as it is picked up by default from the environment variable TORCHX_JOB_ID.
Warning
Lineage is not supported. The following TrackerBase methods raise NotImplementedError: add_source(), sources(), and lineage(). If you need lineage tracking, use FsspecTracker or implement a custom backend.
- add_artifact(run_id: str, name: str, path: str, metadata: Optional[Mapping[str, object]] = None) None[source]¶
Add an artifact with the given name, path, and optional metadata.
- add_metadata(run_id: str, **kwargs: object) None[source]¶
Store arbitrary key-value metadata for the given run.
- add_source(run_id: str, source_id: str, artifact_name: str | None = None) None[source]¶
Link a source run (lineage) to the given run.
- artifacts(run_id: str) Mapping[str, TrackerArtifact][source]¶
Return all artifacts for the given run.
- get_run(run_name: str) Run[source]¶
Gets mlflow’s Run object for the given run_name in the current experiment. If no such run exists, this method creates a new run under this experiment and starts it, so that subsequent calls to mlflow log metadata, metrics, and artifacts to the newly created run.
Warning
This method should only be called with the same run_name! This is because mlflow APIs work by setting an “active run”, and subsequent mlflow logging APIs are made against the current active run in the stack. If you call mlflow.start_run() directly or pass different run names, then you may be logging into two different mlflow runs from the same job!
- Parameters:
run_name – equal to torchx’s run_id
- Returns:
mlflow’s Run object for the run_name
- get_run_id(run_name: str) str[source]¶
Gets the mlflow run’s run_id for the given run_name and additionally sets this run as the active run. Hence this method has a side effect: all subsequent calls to mlflow log APIs are made against the run for the given run_name.
- lineage(run_id: str) Lineage[source]¶
Return full lineage (parents and consumers) for the given run.
- log_params_flat(run_name: str, cfg: Any, key: str = '') None[source]¶
Designed to be primarily used with hydra-style config objects (e.g. dataclasses), logs the given cfg, which is one of: @dataclass, Sequence (e.g. list, tuple, set), or Mapping (e.g. dict). The fields of cfg are flattened recursively and logged as the run’s Parameters in mlflow.
For example, if cfg is:
@dataclass
class Config2:
    foo: str = "bar"

@dataclass
class Config:
    i: int = 1
    f: float = 2.1
    s: str = "string"
    l: List[str] = field(default_factory=lambda: ["a", "b", "c"])
    cfg_list: List[Config2] = field(default_factory=lambda: [Config2(foo="hello"), Config2(foo="world")])
    cfg2: Config2 = Config2()
then this function logs the following parameters:
i: "1"
f: "2.1"
s: "string"
l: ["a", "b", "c"]
cfg_list._0.foo = "hello"
cfg_list._1.foo = "world"
cfg2.foo = "bar"
As shown above, primitive sequence containers are logged directly (e.g. l: ["a", "b", "c"]) whereas nested sequence containers are logged per element, with the key suffixed with _{INDEX} (e.g. cfg_list._0.foo = "hello").
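The flattening behavior described above can be sketched as follows. This is a simplified approximation for illustration, not MLflowTracker’s actual implementation.

```python
# Recursively flatten a dataclass/sequence/mapping config into a flat
# {dotted_key: value} dict, mirroring the rules described above.
from dataclasses import dataclass, field, fields, is_dataclass
from typing import Any, Dict, List


def flatten_params(cfg: Any, key: str = "") -> Dict[str, Any]:
    params: Dict[str, Any] = {}
    if is_dataclass(cfg) and not isinstance(cfg, type):
        for f in fields(cfg):
            sub_key = f"{key}.{f.name}" if key else f.name
            params.update(flatten_params(getattr(cfg, f.name), sub_key))
    elif isinstance(cfg, (list, tuple, set)):
        items = list(cfg)
        if all(
            not is_dataclass(i) and not isinstance(i, (list, tuple, set, dict))
            for i in items
        ):
            params[key] = items  # primitive sequences are logged directly
        else:
            # nested containers are logged per element with an _{INDEX} suffix
            for idx, item in enumerate(items):
                params.update(flatten_params(item, f"{key}._{idx}"))
    elif isinstance(cfg, dict):
        for k, v in cfg.items():
            sub_key = f"{key}.{k}" if key else str(k)
            params.update(flatten_params(v, sub_key))
    else:
        params[key] = cfg
    return params


@dataclass
class Config2:
    foo: str = "bar"


@dataclass
class Config:
    i: int = 1
    cfg_list: List[Config2] = field(
        default_factory=lambda: [Config2(foo="hello"), Config2(foo="world")]
    )
```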
CLI Command¶
- class torchx.cli.cmd_tracker.CmdTracker[source]¶
Prototype TorchX tracker subcommand that allows querying data by interacting with tracker implementations.
Important: commands and the arguments may be modified in the future.
- Supported commands:
tracker list jobs [--parent-run-id RUN_ID]
tracker list metadata RUN_ID
tracker list artifacts [--artifact ARTIFACT_NAME] RUN_ID
- add_arguments(subparser: ArgumentParser) None[source]¶
Adds the arguments to this sub command
Environment Variables¶
Set automatically by the runner when trackers are configured.
| Variable | Purpose |
|---|---|
| TORCHX_JOB_ID | The run ID of the current job (e.g. scheduler://session/job_id). |
| TORCHX_TRACKERS | Comma-separated list of tracker entry-point keys to activate (e.g. tracker1,tracker3). |
| TORCHX_TRACKER_<NAME>_CONFIG | Per-tracker configuration string passed to the factory function. |
| TORCHX_PARENT_RUN_ID | Optional parent run ID for lineage tracking. Set by the runner; read by AppRun. |
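For local testing, these variables can be simulated in-process. The naming helper below assumes the per-tracker config variable upper-cases the tracker name (matching the TORCHX_TRACKER_MY_TRACKER_CONFIG pattern shown earlier); the values are illustrative.

```python
import os


def config_env_var(tracker_name: str) -> str:
    # Per-tracker config variable: TORCHX_TRACKER_<NAME>_CONFIG with the
    # tracker name upper-cased (assumption based on the pattern above).
    return f"TORCHX_TRACKER_{tracker_name.upper()}_CONFIG"


# Simulate what the runner would set for a single tracker named "tracker1"
# (names taken from the example .torchxconfig earlier in this page).
os.environ["TORCHX_JOB_ID"] = "local_cwd://torchx/my_job_123"
os.environ["TORCHX_TRACKERS"] = "tracker1"
os.environ[config_env_var("tracker1")] = "s3://my_bucket/config.json"
```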
See also
- Advanced Usage
Entry-point registration for custom trackers, schedulers, and components.
- Tracking
Runtime tracking utilities for use within applications.