131 changes: 70 additions & 61 deletions pipeline.py
@@ -26,6 +26,8 @@
skills_processed_data_to_artifact_op,
)
from utils import (
extract_sdg_to_pvc_op,
get_pvc_name_op,
ilab_importer_op,
model_to_pvc_op,
pvc_to_mmlu_branch_op,
@@ -73,6 +75,7 @@ def ilab_pipeline(
sdg_sample_size: float = 1.0, # FIXME: Not present in default config. Not configurable upstream at this point, capability added via https://github.com/instructlab/sdg/pull/432
sdg_batch_size: int = 32,
sdg_num_workers: int = 2,
sdg_pregenerated_uri: str = "",
# Training phase
train_tolerations: Optional[list] = None,
train_node_selectors: Optional[dict] = None,
@@ -130,6 +133,7 @@ def ilab_pipeline(
sdg_sample_size: SDG parameter. Represents the SDG skills recipe sampling size as a percentage in decimal form.
sdg_batch_size: SDG parameter. The number of completions per request to the teacher model. Must be a value between 1 and 4096. This can be increased to improve SDG performance based on the hardware of the teacher model, or reduced if SDG fails due to connection errors with the teacher model.
sdg_num_workers: SDG parameter. The number of concurrent workers sending completion requests to the teacher model. Must be a value between 2 and 10. This can be increased to improve SDG performance based on the hardware of the teacher model, or reduced if SDG fails due to connection errors with the teacher model.
sdg_pregenerated_uri: SDG parameter. If set, the SDG phase is skipped and pregenerated SDG output is downloaded from this URI instead.
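See the preload-sdg branch below: the URI is imported as a dsl.Dataset and its contents are extracted onto the shared data PVC.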

train_tolerations: Training parameter. List of tolerations applied to training pods.
train_node_selectors: Training parameter. A JSON containing node selectors applied to training pods.
@@ -165,18 +169,6 @@ def ilab_pipeline(
k8s_storage_size: The storage size of the persistent volume used for data passing within the pipeline.
"""
# Pre-requisites check stage
prerequisites_check_task = prerequisites_check_op(
sdg_repo_url=sdg_repo_url,
output_oci_registry_secret=output_oci_registry_secret,
eval_judge_secret=eval_judge_secret,
sdg_teacher_secret=sdg_teacher_secret,
sdg_batch_size=sdg_batch_size,
sdg_num_workers=sdg_num_workers,
output_oci_model_uri=output_oci_model_uri,
output_model_registry_api_url=output_model_registry_api_url,
output_model_name=output_model_name,
output_model_version=output_model_version,
)

# SDG stage
sdg_input_pvc_task = CreatePVC(
@@ -185,52 +177,69 @@
size=k8s_storage_size,
storage_class_name=k8s_storage_class_name,
)
sdg_input_pvc_task.after(prerequisites_check_task)

model_tokenizer_source_task = dsl.importer(
artifact_uri=f"oci://{RUNTIME_GENERIC_IMAGE}", artifact_class=dsl.Model
)
model_tokenizer_source_task.after(prerequisites_check_task)

sdg_task = sdg_op(
num_instructions_to_generate=sdg_scale_factor,
pipeline=sdg_pipeline,
repo_branch=sdg_repo_branch,
repo_pr=sdg_repo_pr,
sdg_sampling_size=sdg_sample_size,
sdg_secret_name=sdg_teacher_secret,
sdg_batch_size=sdg_batch_size,
sdg_num_cpus=sdg_num_workers,
repo_url=sdg_repo_url,
taxonomy_repo_secret=sdg_repo_secret,
tokenizer_model=model_tokenizer_source_task.output,
)
sdg_task.set_env_variable("HOME", "/tmp")
sdg_task.set_env_variable("HF_HOME", "/tmp")

mount_pvc(
task=sdg_task,
pvc_name=sdg_input_pvc_task.output,
mount_path="/data",
)
sdg_task.set_caching_options(False)
sdg_task.after(prerequisites_check_task)

# Upload "sdg" and "taxonomy" artifacts to S3 without blocking the rest of the workflow
taxonomy_to_artifact_task = taxonomy_to_artifact_op()
taxonomy_to_artifact_task.after(sdg_task)
mount_pvc(
task=taxonomy_to_artifact_task,
pvc_name=sdg_input_pvc_task.output,
mount_path="/data",
)
sdg_to_artifact_task = sdg_to_artifact_op()
sdg_to_artifact_task.after(sdg_task)
mount_pvc(
task=sdg_to_artifact_task,
pvc_name=sdg_input_pvc_task.output,
mount_path="/data",
)
# sdg_input_pvc_task.after(prerequisites_check_task)

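# Branch on sdg_pregenerated_uri: when it is empty, run the full SDG stage;
# otherwise take the Else branch below, which downloads the pregenerated SDG
# output and extracts it onto the shared PVC instead of generating data.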
with dsl.If(sdg_pregenerated_uri == "", "run-sdg"):
model_tokenizer_source_task = dsl.importer(
artifact_uri=f"oci://{RUNTIME_GENERIC_IMAGE}", artifact_class=dsl.Model
)
# model_tokenizer_source_task.after(prerequisites_check_task)
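# get_pvc_name_op appears to act as a pass-through that re-exposes the PVC
# name as a task output inside this conditional branch so mount_pvc can
# consume it here.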
get_pvc_name_task = get_pvc_name_op(pvc_name=sdg_input_pvc_task.output)
get_pvc_name_task.after(model_tokenizer_source_task)
sdg_task = sdg_op(
num_instructions_to_generate=sdg_scale_factor,
pipeline=sdg_pipeline,
repo_branch=sdg_repo_branch,
repo_pr=sdg_repo_pr,
sdg_sampling_size=sdg_sample_size,
sdg_secret_name=sdg_teacher_secret,
sdg_batch_size=sdg_batch_size,
sdg_num_cpus=sdg_num_workers,
repo_url=sdg_repo_url,
taxonomy_repo_secret=sdg_repo_secret,
tokenizer_model=model_tokenizer_source_task.output,
)
sdg_task.set_caching_options(False)
# sdg_task.after(prerequisites_check_task)
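# Point HOME and the Hugging Face cache dir (HF_HOME) at /tmp, likely so the
# task can write caches when running as a restricted, non-root container user.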
sdg_task.set_env_variable("HOME", "/tmp")
sdg_task.set_env_variable("HF_HOME", "/tmp")
mount_pvc(
task=sdg_task,
pvc_name=get_pvc_name_task.output,
mount_path="/data",
)
# Upload "sdg" and "taxonomy" artifacts to S3 without blocking the rest of the workflow
taxonomy_to_artifact_task = taxonomy_to_artifact_op()
taxonomy_to_artifact_task.after(sdg_task)
mount_pvc(
task=taxonomy_to_artifact_task,
pvc_name=get_pvc_name_task.output,
mount_path="/data",
)
sdg_to_artifact_task = sdg_to_artifact_op()
sdg_to_artifact_task.after(sdg_task)
mount_pvc(
task=sdg_to_artifact_task,
pvc_name=get_pvc_name_task.output,
mount_path="/data",
)
with dsl.Else("preload-sdg"):
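# Import the pregenerated SDG output (e.g., from S3); reimport=True plus
# disabled caching forces a fresh download of the artifact on every run.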
sdg_source_s3_task = dsl.importer(
artifact_uri=sdg_pregenerated_uri,
artifact_class=dsl.Dataset,
reimport=True,
)
sdg_source_s3_task.set_caching_options(False)
get_pvc_name_task = get_pvc_name_op(pvc_name=sdg_input_pvc_task.output)
get_pvc_name_task.after(sdg_source_s3_task)
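# Unpack the downloaded SDG dataset onto the PVC so downstream tasks find the
# same /data layout that a live SDG run would have produced.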
sdg_task = extract_sdg_to_pvc_op(sdg=sdg_source_s3_task.output)
sdg_task.after(sdg_source_s3_task)
# sdg_task.after(prerequisites_check_task)
mount_pvc(
task=sdg_task,
pvc_name=get_pvc_name_task.output,
mount_path="/data",
)
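# Either way, the branch ends with a task named sdg_task and the SDG output
# on the PVC mounted at /data, so the rest of the pipeline can proceed
# identically for both paths.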

# uncomment if updating image with same tag
# set_image_pull_policy(sdg_task, "Always")
Expand All @@ -239,15 +248,15 @@ def ilab_pipeline(
model_source_task = dsl.importer(
artifact_uri=sdg_base_model, artifact_class=dsl.Model
)
model_source_task.after(prerequisites_check_task)
# model_source_task.after(prerequisites_check_task)

model_pvc_task = CreatePVC(
pvc_name_suffix="-model-cache",
access_modes=["ReadWriteMany"],
size=k8s_storage_size,
storage_class_name=k8s_storage_class_name,
)
model_pvc_task.after(prerequisites_check_task)
# model_pvc_task.after(prerequisites_check_task)

model_to_pvc_task = model_to_pvc_op(model=model_source_task.output)
model_to_pvc_task.set_caching_options(False)
Expand Down Expand Up @@ -297,7 +306,7 @@ def ilab_pipeline(
size=k8s_storage_size,
storage_class_name=k8s_storage_class_name,
)
output_pvc_task.after(prerequisites_check_task)
# output_pvc_task.after(prerequisites_check_task)

# Training 1
# Using pvc_create_task.output as PyTorchJob name since dsl.PIPELINE_* global variables do not template/work in KFP v2