diff --git a/pipeline.py b/pipeline.py
index 00d3df24..6c656501 100644
--- a/pipeline.py
+++ b/pipeline.py
@@ -16,6 +16,9 @@
     use_secret_as_volume,
 )
 
+from utils.consts import RHELAI_IMAGE
+
+
 TEACHER_CONFIG_MAP = "teacher-server"
 TEACHER_SECRET = "teacher-server"
 JUDGE_CONFIG_MAP = "judge-server"
@@ -310,6 +313,7 @@ def pipeline(
         save_samples=train_save_samples,
         max_batch_len=train_max_batch_len,
         seed=train_seed,
+        image=RHELAI_IMAGE,
     )
     training_phase_1.after(data_processing_task, model_to_pvc_task)
     training_phase_1.set_caching_options(False)
@@ -330,6 +334,7 @@ def pipeline(
         save_samples=train_save_samples,
         max_batch_len=train_max_batch_len,
         seed=train_seed,
+        image=RHELAI_IMAGE,
     )
     training_phase_2.set_caching_options(False)
 
diff --git a/pipeline.yaml b/pipeline.yaml
index ef5d586e..51c79594 100644
--- a/pipeline.yaml
+++ b/pipeline.yaml
@@ -351,6 +351,8 @@ components:
           defaultValue: 3840.0
           isOptional: true
           parameterType: NUMBER_INTEGER
+        image:
+          parameterType: STRING
         input_pvc_name:
           parameterType: STRING
         learning_rate:
@@ -401,6 +403,8 @@
           defaultValue: 3840.0
           isOptional: true
           parameterType: NUMBER_INTEGER
+        image:
+          parameterType: STRING
         input_pvc_name:
           parameterType: STRING
         learning_rate:
@@ -731,32 +735,32 @@ deploymentSpec:
           '
         - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
           \ *\n\ndef pytorchjob_manifest_op(\n    model_pvc_name: str,\n    input_pvc_name:\
-          \ str,\n    output_pvc_name: str,\n    name_suffix: str,\n    # path_to_model:\
-          \ str,\n    phase_num: int,\n    nproc_per_node: int = 3,\n    nnodes: int\
-          \ = 2,\n    num_epochs: int = 2,\n    effective_batch_size: int = 3840,\n\
-          \    learning_rate: float = 1e-4,\n    num_warmup_steps: int = 800,\n   \
-          \ save_samples: int = 0,\n    max_batch_len: int = 20000,\n    seed: int\
-          \ = 42,\n):\n    import inspect\n    import os\n    import time\n\n    import\
-          \ kubernetes\n    import urllib3\n    import yaml\n\n    def list_phase1_final_model():\n\
-          \        model_dir = \"/output/phase_1/model/hf_format\"\n        models\
-          \ = os.listdir(model_dir)\n        newest_idx = max(\n            (os.path.getmtime(f\"\
-          {model_dir}/{model}\"), i)\n            for i, model in enumerate(models)\n\
-          \        )[-1]\n        newest_model = models[newest_idx]\n        return\
-          \ f\"{model_dir}/{newest_model}\"\n\n    name = f\"train-phase-{phase_num}-{name_suffix.rstrip('-sdg')}\"\
-          \n\n    if phase_num == 1:\n        path_to_model = \"/input_model\"\n       \
-          \ path_to_data = \"/input_data/knowledge/data.jsonl\"\n    elif phase_num\
+          \ str,\n    output_pvc_name: str,\n    name_suffix: str,\n    image: str,\n\
+          \    # path_to_model: str,\n    phase_num: int,\n    nproc_per_node: int\
+          \ = 3,\n    nnodes: int = 2,\n    num_epochs: int = 2,\n    effective_batch_size:\
+          \ int = 3840,\n    learning_rate: float = 1e-4,\n    num_warmup_steps: int\
+          \ = 800,\n    save_samples: int = 0,\n    max_batch_len: int = 20000,\n\
+          \    seed: int = 42,\n):\n    import inspect\n    import os\n    import\
+          \ time\n\n    import kubernetes\n    import urllib3\n    import yaml\n\n\
+          \    def list_phase1_final_model():\n        model_dir = \"/output/phase_1/model/hf_format\"\
+          \n        models = os.listdir(model_dir)\n        newest_idx = max(\n           \
+          \ (os.path.getmtime(f\"{model_dir}/{model}\"), i)\n           \
+          \ for i, model in enumerate(models)\n        )[-1]\n        newest_model\
+          \ = models[newest_idx]\n        return f\"{model_dir}/{newest_model}\"\n\
+          \n    name = f\"train-phase-{phase_num}-{name_suffix.rstrip('-sdg')}\"\n\
+          \n    if phase_num == 1:\n        path_to_model = \"/input_model\"\n       \
+          \ path_to_data = \"/input_data/knowledge/data.jsonl\"\n    elif phase_num\
           \ == 2:\n        path_to_model = list_phase1_final_model()\n        path_to_data\
           \ = \"/input_data/skills/data.jsonl\"\n    else:\n        raise RuntimeError(f\"\
-          Unsupported value of {phase_num=}\")\n\n    image = \"registry.stage.redhat.io/rhelai1/instructlab-nvidia-rhel9:1.3.1\"\
-          \n\n    manifest = inspect.cleandoc(\n        f\"\"\"\n        apiVersion:\
-          \ kubeflow.org/v1\n        kind: PyTorchJob\n        metadata:\n         \
-          \ name: {name}\n        spec:\n          nprocPerNode: \\\"{nproc_per_node}\\\
-          \"\n          pytorchReplicaSpecs:\n            Master:\n              replicas:\
-          \ 1\n              restartPolicy: OnFailure\n              template:\n               \
-          \ metadata:\n                  annotations:\n                   \
-          \ sidecar.istio.io/inject: 'false'\n                spec:\n                 \
-          \ containers:\n                    - args:\n                       \
-          \ - |\n                          echo \"Running phase {phase_num}\"\
+          Unsupported value of {phase_num=}\")\n\n    manifest = inspect.cleandoc(\n\
+          \        f\"\"\"\n        apiVersion: kubeflow.org/v1\n        kind: PyTorchJob\n\
+          \        metadata:\n          name: {name}\n        spec:\n          nprocPerNode:\
+          \ \\\"{nproc_per_node}\\\"\n          pytorchReplicaSpecs:\n           \
+          \ Master:\n              replicas: 1\n              restartPolicy: OnFailure\n\
+          \              template:\n                metadata:\n                  annotations:\n\
+          \                    sidecar.istio.io/inject: 'false'\n                spec:\n\
+          \                  containers:\n                    - args:\n                       \
+          \ - |\n                          echo \"Running phase {phase_num}\"\
           \n                          echo \"Using {path_to_model} model for training\"\
           \n                          echo \"Using {path_to_data} data for training\"\
           \n                          mkdir -p /output/phase_{phase_num}/model;\n
@@ -935,32 +939,32 @@ deploymentSpec:
           '
         - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
           \ *\n\ndef pytorchjob_manifest_op(\n    model_pvc_name: str,\n    input_pvc_name:\
-          \ str,\n    output_pvc_name: str,\n    name_suffix: str,\n    # path_to_model:\
-          \ str,\n    phase_num: int,\n    nproc_per_node: int = 3,\n    nnodes: int\
-          \ = 2,\n    num_epochs: int = 2,\n    effective_batch_size: int = 3840,\n\
-          \    learning_rate: float = 1e-4,\n    num_warmup_steps: int = 800,\n   \
-          \ save_samples: int = 0,\n    max_batch_len: int = 20000,\n    seed: int\
-          \ = 42,\n):\n    import inspect\n    import os\n    import time\n\n    import\
-          \ kubernetes\n    import urllib3\n    import yaml\n\n    def list_phase1_final_model():\n\
-          \        model_dir = \"/output/phase_1/model/hf_format\"\n        models\
-          \ = os.listdir(model_dir)\n        newest_idx = max(\n            (os.path.getmtime(f\"\
-          {model_dir}/{model}\"), i)\n            for i, model in enumerate(models)\n\
-          \        )[-1]\n        newest_model = models[newest_idx]\n        return\
-          \ f\"{model_dir}/{newest_model}\"\n\n    name = f\"train-phase-{phase_num}-{name_suffix.rstrip('-sdg')}\"\
-          \n\n    if phase_num == 1:\n        path_to_model = \"/input_model\"\n       \
-          \ path_to_data = \"/input_data/knowledge/data.jsonl\"\n    elif phase_num\
+          \ str,\n    output_pvc_name: str,\n    name_suffix: str,\n    image: str,\n\
+          \    # path_to_model: str,\n    phase_num: int,\n    nproc_per_node: int\
+          \ = 3,\n    nnodes: int = 2,\n    num_epochs: int = 2,\n    effective_batch_size:\
+          \ int = 3840,\n    learning_rate: float = 1e-4,\n    num_warmup_steps: int\
+          \ = 800,\n    save_samples: int = 0,\n    max_batch_len: int = 20000,\n\
+          \    seed: int = 42,\n):\n    import inspect\n    import os\n    import\
+          \ time\n\n    import kubernetes\n    import urllib3\n    import yaml\n\n\
+          \    def list_phase1_final_model():\n        model_dir = \"/output/phase_1/model/hf_format\"\
+          \n        models = os.listdir(model_dir)\n        newest_idx = max(\n           \
+          \ (os.path.getmtime(f\"{model_dir}/{model}\"), i)\n           \
+          \ for i, model in enumerate(models)\n        )[-1]\n        newest_model\
+          \ = models[newest_idx]\n        return f\"{model_dir}/{newest_model}\"\n\
+          \n    name = f\"train-phase-{phase_num}-{name_suffix.rstrip('-sdg')}\"\n\
+          \n    if phase_num == 1:\n        path_to_model = \"/input_model\"\n       \
+          \ path_to_data = \"/input_data/knowledge/data.jsonl\"\n    elif phase_num\
           \ == 2:\n        path_to_model = list_phase1_final_model()\n        path_to_data\
           \ = \"/input_data/skills/data.jsonl\"\n    else:\n        raise RuntimeError(f\"\
-          Unsupported value of {phase_num=}\")\n\n    image = \"registry.stage.redhat.io/rhelai1/instructlab-nvidia-rhel9:1.3.1\"\
-          \n\n    manifest = inspect.cleandoc(\n        f\"\"\"\n        apiVersion:\
-          \ kubeflow.org/v1\n        kind: PyTorchJob\n        metadata:\n         \
-          \ name: {name}\n        spec:\n          nprocPerNode: \\\"{nproc_per_node}\\\
-          \"\n          pytorchReplicaSpecs:\n            Master:\n              replicas:\
-          \ 1\n              restartPolicy: OnFailure\n              template:\n               \
-          \ metadata:\n                  annotations:\n                   \
-          \ sidecar.istio.io/inject: 'false'\n                spec:\n                 \
-          \ containers:\n                    - args:\n                       \
-          \ - |\n                          echo \"Running phase {phase_num}\"\
+          Unsupported value of {phase_num=}\")\n\n    manifest = inspect.cleandoc(\n\
+          \        f\"\"\"\n        apiVersion: kubeflow.org/v1\n        kind: PyTorchJob\n\
+          \        metadata:\n          name: {name}\n        spec:\n          nprocPerNode:\
+          \ \\\"{nproc_per_node}\\\"\n          pytorchReplicaSpecs:\n           \
+          \ Master:\n              replicas: 1\n              restartPolicy: OnFailure\n\
+          \              template:\n                metadata:\n                  annotations:\n\
+          \                    sidecar.istio.io/inject: 'false'\n                spec:\n\
+          \                  containers:\n                    - args:\n                       \
+          \ - |\n                          echo \"Running phase {phase_num}\"\
           \n                          echo \"Using {path_to_model} model for training\"\
           \n                          echo \"Using {path_to_data} data for training\"\
           \n                          mkdir -p /output/phase_{phase_num}/model;\n
@@ -1868,6 +1872,9 @@ root:
           parameters:
             effective_batch_size:
               componentInputParameter: train_effective_batch_size_phase_1
+            image:
+              runtimeValue:
+                constant: registry.stage.redhat.io/rhelai1/instructlab-nvidia-rhel9:1.3.1
             input_pvc_name:
               taskOutputParameter:
                 outputParameterKey: name
@@ -1918,6 +1925,9 @@ root:
           parameters:
             effective_batch_size:
               componentInputParameter: train_effective_batch_size_phase_2
+            image:
+              runtimeValue:
+                constant: registry.stage.redhat.io/rhelai1/instructlab-nvidia-rhel9:1.3.1
             input_pvc_name:
               taskOutputParameter:
                 outputParameterKey: name
diff --git a/training/components.py b/training/components.py
index 20bf353f..a88250c0 100644
--- a/training/components.py
+++ b/training/components.py
@@ -126,6 +126,7 @@ def pytorchjob_manifest_op(
     input_pvc_name: str,
     output_pvc_name: str,
     name_suffix: str,
+    image: str,
     # path_to_model: str,
     phase_num: int,
     nproc_per_node: int = 3,
@@ -167,8 +168,6 @@ def list_phase1_final_model():
     else:
         raise RuntimeError(f"Unsupported value of {phase_num=}")
 
-    image = "registry.stage.redhat.io/rhelai1/instructlab-nvidia-rhel9:1.3.1"
-
     manifest = inspect.cleandoc(
         f"""
         apiVersion: kubeflow.org/v1
diff --git a/training/faked/components.py b/training/faked/components.py
index a5fcff3b..4dddb6d1 100644
--- a/training/faked/components.py
+++ b/training/faked/components.py
@@ -14,6 +14,7 @@ def pytorchjob_manifest_op(
     input_pvc_name: str,
     output_pvc_name: str,
     name_suffix: str,
+    image: str,
 ) -> NamedTuple("outputs", manifest=str, name=str):
     Outputs = NamedTuple("outputs", manifest=str, name=str)
     return Outputs("", "")