.. DO NOT EDIT. .. THIS FILE WAS AUTOMATICALLY GENERATED BY SPHINX-GALLERY. .. TO MAKE CHANGES, EDIT THE SOURCE PYTHON FILE: .. "examples_pipelines/kfp/advanced_pipeline.py" .. LINE NUMBERS ARE GIVEN BELOW. .. only:: html .. note:: :class: sphx-glr-download-link-note Click :ref:`here ` to download the full example code .. rst-class:: sphx-glr-example-title .. _sphx_glr_examples_pipelines_kfp_advanced_pipeline.py: Advanced KubeFlow Pipelines Example =================================== This is an example pipeline using KubeFlow Pipelines built with only TorchX components. KFP adapters can be used transform the TorchX components directly into something that can be used within KFP. .. GENERATED FROM PYTHON SOURCE LINES 20-23 Input Arguments ############### Lets first define some arguments for the pipeline. .. GENERATED FROM PYTHON SOURCE LINES 23-28 .. code-block:: default import argparse parser = argparse.ArgumentParser(description="example kfp pipeline") .. GENERATED FROM PYTHON SOURCE LINES 29-34 TorchX components are built around images. Depending on what scheduler you're using this can vary but for KFP these images are specified as docker containers. We have one container for the example apps and one for the standard built in apps. If you modify the torchx example code you'll need to rebuild the container before launching it on KFP .. GENERATED FROM PYTHON SOURCE LINES 34-50 .. code-block:: default from torchx.version import TORCHX_IMAGE, EXAMPLES_IMAGE parser.add_argument( "--image", type=str, help="docker image to use for the examples apps", default=EXAMPLES_IMAGE, ) parser.add_argument( "--torchx_image", type=str, help="docker image to use for the builtin torchx apps", default=TORCHX_IMAGE, ) .. GENERATED FROM PYTHON SOURCE LINES 51-55 Most TorchX components use `fsspec `_ to abstract away dealing with remote filesystems. This allows the components to take paths like `s3://` to make it easy to use cloud storage providers. .. GENERATED FROM PYTHON SOURCE LINES 55-72 .. code-block:: default parser.add_argument( "--data_path", type=str, help="path to place the data", required=True, ) parser.add_argument("--load_path", type=str, help="checkpoint path to load from") parser.add_argument( "--output_path", type=str, help="path to place checkpoints and model outputs", required=True, ) parser.add_argument( "--log_path", type=str, help="path to place the tensorboard logs", default="/tmp" ) .. GENERATED FROM PYTHON SOURCE LINES 73-80 This example uses the torchserve for inference so we need to specify some options. This assumes you have a TorchServe instance running in the same Kubernetes cluster with with the service name `torchserve` in the default namespace. See https://github.com/pytorch/serve/blob/master/kubernetes/README.md for info on how to setup TorchServe. .. GENERATED FROM PYTHON SOURCE LINES 80-93 .. code-block:: default parser.add_argument( "--management_api", type=str, help="path to the torchserve management API", default="http://torchserve.default.svc.cluster.local:8081", ) parser.add_argument( "--model_name", type=str, help="the name of the inference model", default="tiny_image_net", ) .. GENERATED FROM PYTHON SOURCE LINES 94-96 Finally, set the output path for the exported KFP pipeline package. This can either be .yaml or .zip. .. GENERATED FROM PYTHON SOURCE LINES 96-103 .. code-block:: default parser.add_argument( "--package_path", type=str, help="path to place the compiled pipeline package", default="pipeline.yaml", ) .. GENERATED FROM PYTHON SOURCE LINES 104-105 notebook. .. GENERATED FROM PYTHON SOURCE LINES 105-120 .. code-block:: default import sys if "NOTEBOOK" in globals(): argv = [ "--data_path", "/tmp/data", "--output_path", "/tmp/output", ] else: argv = sys.argv[1:] args: argparse.Namespace = parser.parse_args(argv) .. GENERATED FROM PYTHON SOURCE LINES 121-130 Creating the Components ####################### The first component we're creating is a data preprocessor. TorchX separates the definitions (component) from the implementation (app) so in our pipeline we just need to define a simple component so TorchX knows how to execute the datapreproc app. datapreproc outputs the data to a specified fsspec path. These paths are all specified ahead of time so we have a fully static pipeline. .. GENERATED FROM PYTHON SOURCE LINES 130-144 .. code-block:: default from torchx import specs from torchx.components.base.binary_component import binary_component datapreproc_app: specs.AppDef = binary_component( name="examples-datapreproc", entrypoint="datapreproc/datapreproc.py", args=[ "--output_path", args.data_path, ], image=args.image, ) .. GENERATED FROM PYTHON SOURCE LINES 145-148 Now that we have the TorchX component we need to adapt it so it can run in KFP via our KFP adapter. component_from_app takes in a TorchX component and returns a KFP component. .. GENERATED FROM PYTHON SOURCE LINES 148-152 .. code-block:: default from torchx.pipelines.kfp.adapter import ContainerFactory, component_from_app datapreproc_comp: ContainerFactory = component_from_app(datapreproc_app) .. GENERATED FROM PYTHON SOURCE LINES 153-155 Next we'll create the trainer component that takes in the training data from the previous datapreproc component. .. GENERATED FROM PYTHON SOURCE LINES 155-174 .. code-block:: default trainer_app: specs.AppDef = binary_component( name="examples-lightning_classy_vision-trainer", entrypoint="lightning_classy_vision/train.py", args=[ "--output_path", args.output_path, "--load_path", args.load_path or "", "--log_path", args.log_path, "--data_path", args.data_path, "--epochs", "1", ], image=args.image, ) .. GENERATED FROM PYTHON SOURCE LINES 175-177 To have the tensorboard path show up in KFPs UI we need to some metadata so KFP knows where to consume the metrics from. .. GENERATED FROM PYTHON SOURCE LINES 177-191 .. code-block:: default import os.path from typing import Dict ui_metadata: Dict[str, object] = { "outputs": [ { "type": "tensorboard", "source": os.path.join(args.log_path, "lightning_logs"), } ] } trainer_comp: ContainerFactory = component_from_app(trainer_app, ui_metadata) .. GENERATED FROM PYTHON SOURCE LINES 192-195 For the inference, we're leveraging one of the builtin TorchX components. This component takes in a model and uploads it to the TorchServe management API endpoints. .. GENERATED FROM PYTHON SOURCE LINES 195-211 .. code-block:: default from torchx.components.serve import torchserve serve_app: specs.AppDef = torchserve( model_path=os.path.join(args.output_path, "model.mar"), management_api=args.management_api, image=args.torchx_image, params={ "model_name": args.model_name, # set this to allocate a worker # "initial_workers": 1, }, ) serve_comp: ContainerFactory = component_from_app(serve_app) .. GENERATED FROM PYTHON SOURCE LINES 212-215 For model interpretability we're leveraging a custom component stored in it's own component file. This component takes in the output from datapreproc and train components and produces images with integrated gradient results. .. GENERATED FROM PYTHON SOURCE LINES 215-230 .. code-block:: default # make sure examples is on the path if "__file__" in globals(): sys.path.append(os.path.join(os.path.dirname(__file__), "..", "..", "..")) from examples.apps.lightning_classy_vision.component import interpret interpret_app: specs.AppDef = interpret( load_path=os.path.join(args.output_path, "last.ckpt"), data_path=args.data_path, output_path=args.output_path, image=args.image, ) interpret_comp: ContainerFactory = component_from_app(interpret_app) .. GENERATED FROM PYTHON SOURCE LINES 231-242 Pipeline Definition ################### The last step is to define the actual pipeline using the adapted KFP components and export the pipeline package that can be uploaded to a KFP cluster. The KFP adapter currently doesn't track the input and outputs so the containers need to have their dependencies specified via `.after()`. We call `.set_tty()` to make the logs from the components more responsive for example purposes. .. GENERATED FROM PYTHON SOURCE LINES 242-273 .. code-block:: default import kfp def pipeline() -> None: datapreproc = datapreproc_comp() datapreproc.container.set_tty() trainer = trainer_comp() trainer.container.set_tty() trainer.after(datapreproc) serve = serve_comp() serve.container.set_tty() serve.after(trainer) # Serve and interpret only require the trained model so we can run them # in parallel to each other. interpret = interpret_comp() interpret.container.set_tty() interpret.after(trainer) kfp.compiler.Compiler().compile( pipeline_func=pipeline, package_path=args.package_path, ) with open("pipeline.yaml", 'rt') as f: print(f.read()) .. GENERATED FROM PYTHON SOURCE LINES 274-277 Once this has all run you should have a pipeline file (typically pipeline.yaml) that you can upload to your KFP cluster via the UI or a kfp.Client. .. rst-class:: sphx-glr-timing **Total running time of the script:** ( 0 minutes 0.000 seconds) .. _sphx_glr_download_examples_pipelines_kfp_advanced_pipeline.py: .. only :: html .. container:: sphx-glr-footer :class: sphx-glr-footer-example .. container:: sphx-glr-download sphx-glr-download-python :download:`Download Python source code: advanced_pipeline.py ` .. container:: sphx-glr-download sphx-glr-download-jupyter :download:`Download Jupyter notebook: advanced_pipeline.ipynb ` .. only:: html .. rst-class:: sphx-glr-signature `Gallery generated by Sphinx-Gallery `_