Skip to content
This repository was archived by the owner on Sep 24, 2025. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
use_secret_as_volume,
)

from utils.consts import RHELAI_IMAGE


TEACHER_CONFIG_MAP = "teacher-server"
TEACHER_SECRET = "teacher-server"
JUDGE_CONFIG_MAP = "judge-server"
Expand Down Expand Up @@ -310,6 +313,7 @@ def pipeline(
save_samples=train_save_samples,
max_batch_len=train_max_batch_len,
seed=train_seed,
image=RHELAI_IMAGE,
)
training_phase_1.after(data_processing_task, model_to_pvc_task)
training_phase_1.set_caching_options(False)
Expand All @@ -330,6 +334,7 @@ def pipeline(
save_samples=train_save_samples,
max_batch_len=train_max_batch_len,
seed=train_seed,
image=RHELAI_IMAGE,
)

training_phase_2.set_caching_options(False)
Expand Down
106 changes: 58 additions & 48 deletions pipeline.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,8 @@ components:
defaultValue: 3840.0
isOptional: true
parameterType: NUMBER_INTEGER
image:
parameterType: STRING
input_pvc_name:
parameterType: STRING
learning_rate:
Expand Down Expand Up @@ -401,6 +403,8 @@ components:
defaultValue: 3840.0
isOptional: true
parameterType: NUMBER_INTEGER
image:
parameterType: STRING
input_pvc_name:
parameterType: STRING
learning_rate:
Expand Down Expand Up @@ -731,32 +735,32 @@ deploymentSpec:
'
- "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
\ *\n\ndef pytorchjob_manifest_op(\n model_pvc_name: str,\n input_pvc_name:\
\ str,\n output_pvc_name: str,\n name_suffix: str,\n # path_to_model:\
\ str,\n phase_num: int,\n nproc_per_node: int = 3,\n nnodes: int\
\ = 2,\n num_epochs: int = 2,\n effective_batch_size: int = 3840,\n\
\ learning_rate: float = 1e-4,\n num_warmup_steps: int = 800,\n \
\ save_samples: int = 0,\n max_batch_len: int = 20000,\n seed: int\
\ = 42,\n):\n import inspect\n import os\n import time\n\n import\
\ kubernetes\n import urllib3\n import yaml\n\n def list_phase1_final_model():\n\
\ model_dir = \"/output/phase_1/model/hf_format\"\n models\
\ = os.listdir(model_dir)\n newest_idx = max(\n (os.path.getmtime(f\"\
{model_dir}/{model}\"), i)\n for i, model in enumerate(models)\n\
\ )[-1]\n newest_model = models[newest_idx]\n return\
\ f\"{model_dir}/{newest_model}\"\n\n name = f\"train-phase-{phase_num}-{name_suffix.rstrip('-sdg')}\"\
\n\n if phase_num == 1:\n path_to_model = \"/input_model\"\n \
\ path_to_data = \"/input_data/knowledge/data.jsonl\"\n elif phase_num\
\ str,\n output_pvc_name: str,\n name_suffix: str,\n image: str,\n\
\ # path_to_model: str,\n phase_num: int,\n nproc_per_node: int\
\ = 3,\n nnodes: int = 2,\n num_epochs: int = 2,\n effective_batch_size:\
\ int = 3840,\n learning_rate: float = 1e-4,\n num_warmup_steps: int\
\ = 800,\n save_samples: int = 0,\n max_batch_len: int = 20000,\n\
\ seed: int = 42,\n):\n import inspect\n import os\n import\
\ time\n\n import kubernetes\n import urllib3\n import yaml\n\n\
\ def list_phase1_final_model():\n model_dir = \"/output/phase_1/model/hf_format\"\
\n models = os.listdir(model_dir)\n newest_idx = max(\n \
\ (os.path.getmtime(f\"{model_dir}/{model}\"), i)\n \
\ for i, model in enumerate(models)\n )[-1]\n newest_model\
\ = models[newest_idx]\n return f\"{model_dir}/{newest_model}\"\n\
\n name = f\"train-phase-{phase_num}-{name_suffix.rstrip('-sdg')}\"\n\
\n if phase_num == 1:\n path_to_model = \"/input_model\"\n \
\ path_to_data = \"/input_data/knowledge/data.jsonl\"\n elif phase_num\
\ == 2:\n path_to_model = list_phase1_final_model()\n path_to_data\
\ = \"/input_data/skills/data.jsonl\"\n else:\n raise RuntimeError(f\"\
Unsupported value of {phase_num=}\")\n\n image = \"registry.stage.redhat.io/rhelai1/instructlab-nvidia-rhel9:1.3.1\"\
\n\n manifest = inspect.cleandoc(\n f\"\"\"\n apiVersion:\
\ kubeflow.org/v1\n kind: PyTorchJob\n metadata:\n \
\ name: {name}\n spec:\n nprocPerNode: \\\"{nproc_per_node}\\\
\"\n pytorchReplicaSpecs:\n Master:\n replicas:\
\ 1\n restartPolicy: OnFailure\n template:\n \
\ metadata:\n annotations:\n \
\ sidecar.istio.io/inject: 'false'\n spec:\n \
\ containers:\n - args:\n \
\ - |\n echo \"Running phase {phase_num}\"\
Unsupported value of {phase_num=}\")\n\n manifest = inspect.cleandoc(\n\
\ f\"\"\"\n apiVersion: kubeflow.org/v1\n kind: PyTorchJob\n\
\ metadata:\n name: {name}\n spec:\n nprocPerNode:\
\ \\\"{nproc_per_node}\\\"\n pytorchReplicaSpecs:\n \
\ Master:\n replicas: 1\n restartPolicy: OnFailure\n\
\ template:\n metadata:\n annotations:\n\
\ sidecar.istio.io/inject: 'false'\n spec:\n\
\ containers:\n - args:\n \
\ - |\n echo \"Running phase {phase_num}\"\
\n echo \"Using {path_to_model} model for training\"\
\n echo \"Using {path_to_data} data for training\"\
\n mkdir -p /output/phase_{phase_num}/model;\n\
Expand Down Expand Up @@ -935,32 +939,32 @@ deploymentSpec:
'
- "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
\ *\n\ndef pytorchjob_manifest_op(\n model_pvc_name: str,\n input_pvc_name:\
\ str,\n output_pvc_name: str,\n name_suffix: str,\n # path_to_model:\
\ str,\n phase_num: int,\n nproc_per_node: int = 3,\n nnodes: int\
\ = 2,\n num_epochs: int = 2,\n effective_batch_size: int = 3840,\n\
\ learning_rate: float = 1e-4,\n num_warmup_steps: int = 800,\n \
\ save_samples: int = 0,\n max_batch_len: int = 20000,\n seed: int\
\ = 42,\n):\n import inspect\n import os\n import time\n\n import\
\ kubernetes\n import urllib3\n import yaml\n\n def list_phase1_final_model():\n\
\ model_dir = \"/output/phase_1/model/hf_format\"\n models\
\ = os.listdir(model_dir)\n newest_idx = max(\n (os.path.getmtime(f\"\
{model_dir}/{model}\"), i)\n for i, model in enumerate(models)\n\
\ )[-1]\n newest_model = models[newest_idx]\n return\
\ f\"{model_dir}/{newest_model}\"\n\n name = f\"train-phase-{phase_num}-{name_suffix.rstrip('-sdg')}\"\
\n\n if phase_num == 1:\n path_to_model = \"/input_model\"\n \
\ path_to_data = \"/input_data/knowledge/data.jsonl\"\n elif phase_num\
\ str,\n output_pvc_name: str,\n name_suffix: str,\n image: str,\n\
\ # path_to_model: str,\n phase_num: int,\n nproc_per_node: int\
\ = 3,\n nnodes: int = 2,\n num_epochs: int = 2,\n effective_batch_size:\
\ int = 3840,\n learning_rate: float = 1e-4,\n num_warmup_steps: int\
\ = 800,\n save_samples: int = 0,\n max_batch_len: int = 20000,\n\
\ seed: int = 42,\n):\n import inspect\n import os\n import\
\ time\n\n import kubernetes\n import urllib3\n import yaml\n\n\
\ def list_phase1_final_model():\n model_dir = \"/output/phase_1/model/hf_format\"\
\n models = os.listdir(model_dir)\n newest_idx = max(\n \
\ (os.path.getmtime(f\"{model_dir}/{model}\"), i)\n \
\ for i, model in enumerate(models)\n )[-1]\n newest_model\
\ = models[newest_idx]\n return f\"{model_dir}/{newest_model}\"\n\
\n name = f\"train-phase-{phase_num}-{name_suffix.rstrip('-sdg')}\"\n\
\n if phase_num == 1:\n path_to_model = \"/input_model\"\n \
\ path_to_data = \"/input_data/knowledge/data.jsonl\"\n elif phase_num\
\ == 2:\n path_to_model = list_phase1_final_model()\n path_to_data\
\ = \"/input_data/skills/data.jsonl\"\n else:\n raise RuntimeError(f\"\
Unsupported value of {phase_num=}\")\n\n image = \"registry.stage.redhat.io/rhelai1/instructlab-nvidia-rhel9:1.3.1\"\
\n\n manifest = inspect.cleandoc(\n f\"\"\"\n apiVersion:\
\ kubeflow.org/v1\n kind: PyTorchJob\n metadata:\n \
\ name: {name}\n spec:\n nprocPerNode: \\\"{nproc_per_node}\\\
\"\n pytorchReplicaSpecs:\n Master:\n replicas:\
\ 1\n restartPolicy: OnFailure\n template:\n \
\ metadata:\n annotations:\n \
\ sidecar.istio.io/inject: 'false'\n spec:\n \
\ containers:\n - args:\n \
\ - |\n echo \"Running phase {phase_num}\"\
Unsupported value of {phase_num=}\")\n\n manifest = inspect.cleandoc(\n\
\ f\"\"\"\n apiVersion: kubeflow.org/v1\n kind: PyTorchJob\n\
\ metadata:\n name: {name}\n spec:\n nprocPerNode:\
\ \\\"{nproc_per_node}\\\"\n pytorchReplicaSpecs:\n \
\ Master:\n replicas: 1\n restartPolicy: OnFailure\n\
\ template:\n metadata:\n annotations:\n\
\ sidecar.istio.io/inject: 'false'\n spec:\n\
\ containers:\n - args:\n \
\ - |\n echo \"Running phase {phase_num}\"\
\n echo \"Using {path_to_model} model for training\"\
\n echo \"Using {path_to_data} data for training\"\
\n mkdir -p /output/phase_{phase_num}/model;\n\
Expand Down Expand Up @@ -1868,6 +1872,9 @@ root:
parameters:
effective_batch_size:
componentInputParameter: train_effective_batch_size_phase_1
image:
runtimeValue:
constant: registry.stage.redhat.io/rhelai1/instructlab-nvidia-rhel9:1.3.1
input_pvc_name:
taskOutputParameter:
outputParameterKey: name
Expand Down Expand Up @@ -1918,6 +1925,9 @@ root:
parameters:
effective_batch_size:
componentInputParameter: train_effective_batch_size_phase_2
image:
runtimeValue:
constant: registry.stage.redhat.io/rhelai1/instructlab-nvidia-rhel9:1.3.1
input_pvc_name:
taskOutputParameter:
outputParameterKey: name
Expand Down
3 changes: 1 addition & 2 deletions training/components.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ def pytorchjob_manifest_op(
input_pvc_name: str,
output_pvc_name: str,
name_suffix: str,
image: str,
# path_to_model: str,
phase_num: int,
nproc_per_node: int = 3,
Expand Down Expand Up @@ -167,8 +168,6 @@ def list_phase1_final_model():
else:
raise RuntimeError(f"Unsupported value of {phase_num=}")

image = "registry.stage.redhat.io/rhelai1/instructlab-nvidia-rhel9:1.3.1"

manifest = inspect.cleandoc(
f"""
apiVersion: kubeflow.org/v1
Expand Down
1 change: 1 addition & 0 deletions training/faked/components.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ def pytorchjob_manifest_op(
input_pvc_name: str,
output_pvc_name: str,
name_suffix: str,
image: str,
) -> NamedTuple("outputs", manifest=str, name=str):
Outputs = NamedTuple("outputs", manifest=str, name=str)
return Outputs("", "")
Expand Down
Loading