diff --git a/torchx/cli/cmd_delete.py b/torchx/cli/cmd_delete.py new file mode 100644 index 000000000..cefaa9cdc --- /dev/null +++ b/torchx/cli/cmd_delete.py @@ -0,0 +1,30 @@ +#!/usr/bin/env python3 +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +# +# This source code is licensed under the BSD-style license found in the +# LICENSE file in the root directory of this source tree. + +# pyre-strict + +import argparse +import logging + +from torchx.cli.cmd_base import SubCommand +from torchx.runner import get_runner + +logger: logging.Logger = logging.getLogger(__name__) + + +class CmdDelete(SubCommand): + def add_arguments(self, subparser: argparse.ArgumentParser) -> None: + subparser.add_argument( + "app_handle", + type=str, + help="torchx app handle (e.g. local://session-name/app-id)", + ) + + def run(self, args: argparse.Namespace) -> None: + app_handle = args.app_handle + runner = get_runner() + runner.delete(app_handle) diff --git a/torchx/cli/main.py b/torchx/cli/main.py index f8e07038e..0d9117d0c 100644 --- a/torchx/cli/main.py +++ b/torchx/cli/main.py @@ -16,6 +16,7 @@ from torchx.cli.cmd_base import SubCommand from torchx.cli.cmd_cancel import CmdCancel from torchx.cli.cmd_configure import CmdConfigure +from torchx.cli.cmd_delete import CmdDelete from torchx.cli.cmd_describe import CmdDescribe from torchx.cli.cmd_list import CmdList from torchx.cli.cmd_log import CmdLog @@ -37,6 +38,7 @@ def get_default_sub_cmds() -> Dict[str, SubCommand]: "builtins": CmdBuiltins(), "cancel": CmdCancel(), "configure": CmdConfigure(), + "delete": CmdDelete(), "describe": CmdDescribe(), "list": CmdList(), "log": CmdLog(), diff --git a/torchx/cli/test/cmd_delete_test.py b/torchx/cli/test/cmd_delete_test.py new file mode 100644 index 000000000..7335ff24c --- /dev/null +++ b/torchx/cli/test/cmd_delete_test.py @@ -0,0 +1,28 @@ +#!/usr/bin/env python3 +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +# +# This source code is licensed under the BSD-style license found in the +# LICENSE file in the root directory of this source tree. + +# pyre-strict + +import argparse +import unittest +from unittest.mock import MagicMock, patch + +from torchx.cli.cmd_delete import CmdDelete + + +class CmdDeleteTest(unittest.TestCase): + @patch("torchx.runner.api.Runner.delete") + def test_run(self, delete: MagicMock) -> None: + parser = argparse.ArgumentParser() + cmd_delete = CmdDelete() + cmd_delete.add_arguments(parser) + + args = parser.parse_args(["foo://session/id"]) + cmd_delete.run(args) + + self.assertEqual(delete.call_count, 1) + delete.assert_called_with("foo://session/id") diff --git a/torchx/runner/api.py b/torchx/runner/api.py index baaff0d67..34e375c1b 100644 --- a/torchx/runner/api.py +++ b/torchx/runner/api.py @@ -579,6 +579,16 @@ def cancel(self, app_handle: AppHandle) -> None: if status is not None and not status.is_terminal(): scheduler.cancel(app_id) + def delete(self, app_handle: AppHandle) -> None: + """ + Deletes the application from the scheduler. + """ + scheduler, scheduler_backend, app_id = self._scheduler_app_id(app_handle) + with log_event("delete", scheduler_backend, app_id): + status = self.status(app_handle) + if status is not None: + scheduler.delete(app_id) + def stop(self, app_handle: AppHandle) -> None: """ See method ``cancel``. diff --git a/torchx/runner/test/api_test.py b/torchx/runner/test/api_test.py index ec0db6daa..118057eb8 100644 --- a/torchx/runner/test/api_test.py +++ b/torchx/runner/test/api_test.py @@ -590,6 +590,10 @@ def test_cancel(self, _) -> None: with self.get_runner() as runner: self.assertIsNone(runner.cancel("local_dir://test_session/unknown_app_id")) + def test_delete(self, _) -> None: + with self.get_runner() as runner: + self.assertIsNone(runner.delete("local_dir://test_session/unknown_app_id")) + def test_stop(self, _) -> None: with self.get_runner() as runner: self.assertIsNone(runner.stop("local_dir://test_session/unknown_app_id")) diff --git a/torchx/schedulers/api.py b/torchx/schedulers/api.py index 34d661357..70a501ba4 100644 --- a/torchx/schedulers/api.py +++ b/torchx/schedulers/api.py @@ -264,6 +264,28 @@ def cancel(self, app_id: str) -> None: # do nothing if the app does not exist return + def delete(self, app_id: str) -> None: + """ + Deletes the job information for the specified ``app_id`` from the + scheduler's data-plane. Basically "deep-purging" the job from the + scheduler's data-plane. Calling this API on a "live" job (e.g in a + non-terminal status such as PENDING or RUNNING) cancels the job. + + Note that this API is only relevant for schedulers for which its + data-plane persistently stores the "JobDefinition" (which is often + versioned). AWS Batch and Kubernetes are examples of such schedulers. + On these schedulers, a finished job may fall out of the data-plane + (e.g. really old finished jobs get deleted) but the JobDefinition is + typically permanently stored. In this case, calling + :py:meth:`~cancel` would not delete the job definition. + + In schedulers with no such feature (e.g. SLURM) + :py:meth:`~delete` is the same as :py:meth:`~cancel`, which is the + default implementation. Hence implementors of such schedulers need not + override this method. + """ + self.cancel(app_id) + def log_iter( self, app_id: str, diff --git a/torchx/schedulers/kubernetes_scheduler.py b/torchx/schedulers/kubernetes_scheduler.py index 252417dd7..a2e04bd8a 100644 --- a/torchx/schedulers/kubernetes_scheduler.py +++ b/torchx/schedulers/kubernetes_scheduler.py @@ -622,6 +622,16 @@ class KubernetesScheduler( $ torchx status kubernetes://torchx_user/1234 ... + **Cancellation** + + Canceling a job aborts it while preserving the job spec for inspection + and cloning via kubectl apply. Use the delete command to remove the job entirely: + + .. code-block:: bash + + $ torchx cancel kubernetes://namespace/jobname # abort, preserves spec + $ torchx delete kubernetes://namespace/jobname # delete completely + **Config Options** .. runopts:: @@ -818,6 +828,33 @@ def _validate(self, app: AppDef, scheduler: str, cfg: KubernetesOpts) -> None: pass def _cancel_existing(self, app_id: str) -> None: + """ + Abort a Volcano job while preserving the spec for inspection. + """ + namespace, name = app_id.split(":") + vcjob = self._custom_objects_api().get_namespaced_custom_object( + group="batch.volcano.sh", + version="v1alpha1", + namespace=namespace, + plural="jobs", + name=name, + ) + vcjob["status"]["state"]["phase"] = "Aborted" + self._custom_objects_api().replace_namespaced_custom_object_status( + group="batch.volcano.sh", + version="v1alpha1", + namespace=namespace, + plural="jobs", + name=name, + body=vcjob, + ) + + def delete(self, app_id: str) -> None: + """ + Delete a Volcano job completely from the cluster. + """ + if not self.exists(app_id): + return namespace, name = app_id.split(":") self._custom_objects_api().delete_namespaced_custom_object( group="batch.volcano.sh", diff --git a/torchx/schedulers/test/kubernetes_scheduler_test.py b/torchx/schedulers/test/kubernetes_scheduler_test.py index b34d25e04..edb643364 100644 --- a/torchx/schedulers/test/kubernetes_scheduler_test.py +++ b/torchx/schedulers/test/kubernetes_scheduler_test.py @@ -800,11 +800,26 @@ def test_runopts(self) -> None: }, ) - @patch("kubernetes.client.CustomObjectsApi.delete_namespaced_custom_object") - def test_cancel_existing(self, delete_namespaced_custom_object: MagicMock) -> None: + @patch("kubernetes.client.CustomObjectsApi.get_namespaced_custom_object") + @patch("kubernetes.client.CustomObjectsApi.replace_namespaced_custom_object_status") + def test_cancel_existing( + self, + replace_namespaced_custom_object_status: MagicMock, + get_namespaced_custom_object: MagicMock, + ) -> None: scheduler = create_scheduler("test") + get_namespaced_custom_object.return_value = { + "status": {"state": {"phase": "Running"}} + } scheduler._cancel_existing("testnamespace:testjob") - call = delete_namespaced_custom_object.call_args + get_namespaced_custom_object.assert_called_once_with( + group="batch.volcano.sh", + version="v1alpha1", + namespace="testnamespace", + plural="jobs", + name="testjob", + ) + call = replace_namespaced_custom_object_status.call_args args, kwargs = call self.assertEqual( kwargs, @@ -814,9 +829,26 @@ def test_cancel_existing(self, delete_namespaced_custom_object: MagicMock) -> No "namespace": "testnamespace", "plural": "jobs", "name": "testjob", + "body": {"status": {"state": {"phase": "Aborted"}}}, }, ) + @patch("kubernetes.client.CustomObjectsApi.delete_namespaced_custom_object") + @patch("torchx.schedulers.kubernetes_scheduler.KubernetesScheduler.exists") + def test_delete( + self, exists: MagicMock, delete_namespaced_custom_object: MagicMock + ) -> None: + scheduler = create_scheduler("test") + exists.return_value = True + scheduler.delete("testnamespace:testjob") + delete_namespaced_custom_object.assert_called_once_with( + group="batch.volcano.sh", + version="v1alpha1", + namespace="testnamespace", + plural="jobs", + name="testjob", + ) + @patch("kubernetes.client.CustomObjectsApi.list_namespaced_custom_object") def test_list(self, list_namespaced_custom_object: MagicMock) -> None: with patch(