diff --git a/pipeline.py b/pipeline.py index 170e6333..aa2f7b79 100644 --- a/pipeline.py +++ b/pipeline.py @@ -26,6 +26,8 @@ skills_processed_data_to_artifact_op, ) from utils import ( + extract_sdg_to_pvc_op, + get_pvc_name_op, ilab_importer_op, model_to_pvc_op, pvc_to_mmlu_branch_op, @@ -73,6 +75,7 @@ def ilab_pipeline( sdg_sample_size: float = 1.0, # FIXME: Not present in default config. Not configurable upstream at this point, capability added via https://github.com/instructlab/sdg/pull/432 sdg_batch_size: int = 32, sdg_num_workers: int = 2, + sdg_pregenerated_uri: str = "", # Training phase train_tolerations: Optional[list] = None, train_node_selectors: Optional[dict] = None, @@ -130,6 +133,7 @@ def ilab_pipeline( sdg_sample_size: SDG parameter. Represents the sdg skills recipe sampling size as percentage in decimal form. sdg_batch_size: SDG parameter. The number of completions per request to the teacher model. Must be a value between 1-4096. This can be increased to improve SDG performance based on the hardware of the teacher model or reduced if SDG fails due to connection errors with the teacher model. sdg_num_workers: SDG parameter. The number of concurrent workers sending completion requests to the teacher model. Must be a value between 2-10. This can be increased to improve SDG performance based on the hardware of the teacher model or reduced if SDG fails due to connection errors with the teacher model. + sdg_pregenerated_uri: SDG parameter. If specified, the SDG phase is skipped and the URI is used to download the SDG output. train_tolerations: Training parameter. List of tolerations applied to training pods. train_node_selectors: Training parameter. A JSON containing node selectors applied to training pods. @@ -165,18 +169,6 @@ def ilab_pipeline( k8s_storage_size: The storage size of the persistent volume used for data passing within the pipeline. 
""" # Pre-requisites check stage - prerequisites_check_task = prerequisites_check_op( - sdg_repo_url=sdg_repo_url, - output_oci_registry_secret=output_oci_registry_secret, - eval_judge_secret=eval_judge_secret, - sdg_teacher_secret=sdg_teacher_secret, - sdg_batch_size=sdg_batch_size, - sdg_num_workers=sdg_num_workers, - output_oci_model_uri=output_oci_model_uri, - output_model_registry_api_url=output_model_registry_api_url, - output_model_name=output_model_name, - output_model_version=output_model_version, - ) # SDG stage sdg_input_pvc_task = CreatePVC( @@ -185,52 +177,69 @@ def ilab_pipeline( size=k8s_storage_size, storage_class_name=k8s_storage_class_name, ) - sdg_input_pvc_task.after(prerequisites_check_task) - - model_tokenizer_source_task = dsl.importer( - artifact_uri=f"oci://{RUNTIME_GENERIC_IMAGE}", artifact_class=dsl.Model - ) - model_tokenizer_source_task.after(prerequisites_check_task) - - sdg_task = sdg_op( - num_instructions_to_generate=sdg_scale_factor, - pipeline=sdg_pipeline, - repo_branch=sdg_repo_branch, - repo_pr=sdg_repo_pr, - sdg_sampling_size=sdg_sample_size, - sdg_secret_name=sdg_teacher_secret, - sdg_batch_size=sdg_batch_size, - sdg_num_cpus=sdg_num_workers, - repo_url=sdg_repo_url, - taxonomy_repo_secret=sdg_repo_secret, - tokenizer_model=model_tokenizer_source_task.output, - ) - sdg_task.set_env_variable("HOME", "/tmp") - sdg_task.set_env_variable("HF_HOME", "/tmp") - - mount_pvc( - task=sdg_task, - pvc_name=sdg_input_pvc_task.output, - mount_path="/data", - ) - sdg_task.set_caching_options(False) - sdg_task.after(prerequisites_check_task) - - # Upload "sdg" and "taxonomy" artifacts to S3 without blocking the rest of the workflow - taxonomy_to_artifact_task = taxonomy_to_artifact_op() - taxonomy_to_artifact_task.after(sdg_task) - mount_pvc( - task=taxonomy_to_artifact_task, - pvc_name=sdg_input_pvc_task.output, - mount_path="/data", - ) - sdg_to_artifact_task = sdg_to_artifact_op() - sdg_to_artifact_task.after(sdg_task) - mount_pvc( - task=sdg_to_artifact_task, - pvc_name=sdg_input_pvc_task.output, - mount_path="/data", - ) + # sdg_input_pvc_task.after(prerequisites_check_task) + + with dsl.If(sdg_pregenerated_uri == "", "run-sdg"): + model_tokenizer_source_task = dsl.importer( + artifact_uri=f"oci://{RUNTIME_GENERIC_IMAGE}", artifact_class=dsl.Model + ) + # model_tokenizer_source_task.after(prerequisites_check_task) + get_pvc_name_task = get_pvc_name_op(pvc_name=sdg_input_pvc_task.output) + get_pvc_name_task.after(model_tokenizer_source_task) + sdg_task = sdg_op( + num_instructions_to_generate=sdg_scale_factor, + pipeline=sdg_pipeline, + repo_branch=sdg_repo_branch, + repo_pr=sdg_repo_pr, + sdg_sampling_size=sdg_sample_size, + sdg_secret_name=sdg_teacher_secret, + sdg_batch_size=sdg_batch_size, + sdg_num_cpus=sdg_num_workers, + repo_url=sdg_repo_url, + taxonomy_repo_secret=sdg_repo_secret, + tokenizer_model=model_tokenizer_source_task.output, + ) + sdg_task.set_caching_options(False) + # sdg_task.after(prerequisites_check_task) + sdg_task.set_env_variable("HOME", "/tmp") + sdg_task.set_env_variable("HF_HOME", "/tmp") + mount_pvc( + task=sdg_task, + pvc_name=get_pvc_name_task.output, + mount_path="/data", + ) + # Upload "sdg" and "taxonomy" artifacts to S3 without blocking the rest of the workflow + taxonomy_to_artifact_task = taxonomy_to_artifact_op() + taxonomy_to_artifact_task.after(sdg_task) + mount_pvc( + task=taxonomy_to_artifact_task, + pvc_name=get_pvc_name_task.output, + mount_path="/data", + ) + sdg_to_artifact_task = sdg_to_artifact_op() + 
sdg_to_artifact_task.after(sdg_task) + mount_pvc( + task=sdg_to_artifact_task, + pvc_name=get_pvc_name_task.output, + mount_path="/data", + ) + with dsl.Else("preload-sdg"): + sdg_source_s3_task = dsl.importer( + artifact_uri=sdg_pregenerated_uri, + artifact_class=dsl.Dataset, + reimport=True, + ) + sdg_source_s3_task.set_caching_options(False) + get_pvc_name_task = get_pvc_name_op(pvc_name=sdg_input_pvc_task.output) + get_pvc_name_task.after(sdg_source_s3_task) + sdg_task = extract_sdg_to_pvc_op(sdg=sdg_source_s3_task.output) + sdg_task.after(sdg_source_s3_task) + # sdg_task.after(prerequisites_check_task) + mount_pvc( + task=sdg_task, + pvc_name=get_pvc_name_task.output, + mount_path="/data", + ) # uncomment if updating image with same tag # set_image_pull_policy(sdg_task, "Always") @@ -239,7 +248,7 @@ def ilab_pipeline( model_source_task = dsl.importer( artifact_uri=sdg_base_model, artifact_class=dsl.Model ) - model_source_task.after(prerequisites_check_task) + # model_source_task.after(prerequisites_check_task) model_pvc_task = CreatePVC( pvc_name_suffix="-model-cache", @@ -247,7 +256,7 @@ def ilab_pipeline( size=k8s_storage_size, storage_class_name=k8s_storage_class_name, ) - model_pvc_task.after(prerequisites_check_task) + # model_pvc_task.after(prerequisites_check_task) model_to_pvc_task = model_to_pvc_op(model=model_source_task.output) model_to_pvc_task.set_caching_options(False) @@ -297,7 +306,7 @@ def ilab_pipeline( size=k8s_storage_size, storage_class_name=k8s_storage_class_name, ) - output_pvc_task.after(prerequisites_check_task) + # output_pvc_task.after(prerequisites_check_task) # Training 1 # Using pvc_create_task.output as PyTorchJob name since dsl.PIPELINE_* global variables do not template/work in KFP v2 diff --git a/pipeline.yaml b/pipeline.yaml index fcfb3fc4..bf538e0f 100644 --- a/pipeline.yaml +++ b/pipeline.yaml @@ -24,6 +24,7 @@ # sdg_max_batch_len: int [Default: 5000.0] # sdg_num_workers: int [Default: 2.0] # sdg_pipeline: str [Default: '/usr/share/instructlab/sdg/pipelines/agentic'] +# sdg_pregenerated_uri: str [Default: ''] # sdg_repo_branch: str [Default: 'main'] # sdg_repo_pr: int [Default: 0.0] # sdg_repo_secret: str [Default: 'taxonomy-repo-secret'] @@ -50,6 +51,241 @@ # train_seed: int [Default: 42.0] # train_tolerations: list components: + comp-condition-2: + dag: + tasks: + get-pvc-name-op: + cachingOptions: + enableCache: true + componentRef: + name: comp-get-pvc-name-op + dependentTasks: + - importer + inputs: + parameters: + pvc_name: + componentInputParameter: pipelinechannel--createpvc-name + taskInfo: + name: get-pvc-name-op + importer: + cachingOptions: + enableCache: true + componentRef: + name: comp-importer + inputs: + parameters: + uri: + runtimeValue: + constant: oci://quay.io/opendatahub/ds-pipelines-runtime-generic@sha256:f53e53a39b1a88c3a530e87ded473ba2648b8d8586ec9e31a4484e9bafb3059d + taskInfo: + name: importer + sdg-op: + cachingOptions: {} + componentRef: + name: comp-sdg-op + dependentTasks: + - get-pvc-name-op + - importer + inputs: + artifacts: + tokenizer_model: + taskOutputArtifact: + outputArtifactKey: artifact + producerTask: importer + parameters: + num_instructions_to_generate: + componentInputParameter: pipelinechannel--sdg_scale_factor + pipeline: + componentInputParameter: pipelinechannel--sdg_pipeline + repo_branch: + componentInputParameter: pipelinechannel--sdg_repo_branch + repo_pr: + componentInputParameter: pipelinechannel--sdg_repo_pr + repo_url: + componentInputParameter: pipelinechannel--sdg_repo_url + 
sdg_batch_size: + componentInputParameter: pipelinechannel--sdg_batch_size + sdg_num_cpus: + componentInputParameter: pipelinechannel--sdg_num_workers + sdg_sampling_size: + componentInputParameter: pipelinechannel--sdg_sample_size + sdg_secret_name: + componentInputParameter: pipelinechannel--sdg_teacher_secret + taxonomy_repo_secret: + componentInputParameter: pipelinechannel--sdg_repo_secret + taskInfo: + name: sdg-op + sdg-to-artifact-op: + cachingOptions: + enableCache: true + componentRef: + name: comp-sdg-to-artifact-op + dependentTasks: + - get-pvc-name-op + - sdg-op + taskInfo: + name: sdg-to-artifact-op + taxonomy-to-artifact-op: + cachingOptions: + enableCache: true + componentRef: + name: comp-taxonomy-to-artifact-op + dependentTasks: + - get-pvc-name-op + - sdg-op + taskInfo: + name: taxonomy-to-artifact-op + inputDefinitions: + parameters: + pipelinechannel--createpvc-name: + parameterType: STRING + pipelinechannel--sdg_batch_size: + parameterType: NUMBER_INTEGER + pipelinechannel--sdg_num_workers: + parameterType: NUMBER_INTEGER + pipelinechannel--sdg_pipeline: + parameterType: STRING + pipelinechannel--sdg_pregenerated_uri: + parameterType: STRING + pipelinechannel--sdg_repo_branch: + parameterType: STRING + pipelinechannel--sdg_repo_pr: + parameterType: NUMBER_INTEGER + pipelinechannel--sdg_repo_secret: + parameterType: STRING + pipelinechannel--sdg_repo_url: + parameterType: STRING + pipelinechannel--sdg_sample_size: + parameterType: NUMBER_DOUBLE + pipelinechannel--sdg_scale_factor: + parameterType: NUMBER_INTEGER + pipelinechannel--sdg_teacher_secret: + parameterType: STRING + comp-condition-3: + dag: + tasks: + extract-sdg-to-pvc-op: + cachingOptions: + enableCache: true + componentRef: + name: comp-extract-sdg-to-pvc-op + dependentTasks: + - get-pvc-name-op-2 + - importer-2 + inputs: + artifacts: + sdg: + taskOutputArtifact: + outputArtifactKey: artifact + producerTask: importer-2 + taskInfo: + name: extract-sdg-to-pvc-op + get-pvc-name-op-2: + cachingOptions: + enableCache: true + componentRef: + name: comp-get-pvc-name-op-2 + dependentTasks: + - importer-2 + inputs: + parameters: + pvc_name: + componentInputParameter: pipelinechannel--createpvc-name + taskInfo: + name: get-pvc-name-op-2 + importer-2: + cachingOptions: {} + componentRef: + name: comp-importer-2 + inputs: + parameters: + uri: + componentInputParameter: pipelinechannel--sdg_pregenerated_uri + taskInfo: + name: importer-2 + inputDefinitions: + parameters: + pipelinechannel--createpvc-name: + parameterType: STRING + pipelinechannel--sdg_pregenerated_uri: + parameterType: STRING + comp-condition-branches-1: + dag: + tasks: + condition-2: + componentRef: + name: comp-condition-2 + inputs: + parameters: + pipelinechannel--createpvc-name: + componentInputParameter: pipelinechannel--createpvc-name + pipelinechannel--sdg_batch_size: + componentInputParameter: pipelinechannel--sdg_batch_size + pipelinechannel--sdg_num_workers: + componentInputParameter: pipelinechannel--sdg_num_workers + pipelinechannel--sdg_pipeline: + componentInputParameter: pipelinechannel--sdg_pipeline + pipelinechannel--sdg_pregenerated_uri: + componentInputParameter: pipelinechannel--sdg_pregenerated_uri + pipelinechannel--sdg_repo_branch: + componentInputParameter: pipelinechannel--sdg_repo_branch + pipelinechannel--sdg_repo_pr: + componentInputParameter: pipelinechannel--sdg_repo_pr + pipelinechannel--sdg_repo_secret: + componentInputParameter: pipelinechannel--sdg_repo_secret + pipelinechannel--sdg_repo_url: + componentInputParameter: 
pipelinechannel--sdg_repo_url + pipelinechannel--sdg_sample_size: + componentInputParameter: pipelinechannel--sdg_sample_size + pipelinechannel--sdg_scale_factor: + componentInputParameter: pipelinechannel--sdg_scale_factor + pipelinechannel--sdg_teacher_secret: + componentInputParameter: pipelinechannel--sdg_teacher_secret + taskInfo: + name: run-sdg + triggerPolicy: + condition: inputs.parameter_values['pipelinechannel--sdg_pregenerated_uri'] + == '' + condition-3: + componentRef: + name: comp-condition-3 + inputs: + parameters: + pipelinechannel--createpvc-name: + componentInputParameter: pipelinechannel--createpvc-name + pipelinechannel--sdg_pregenerated_uri: + componentInputParameter: pipelinechannel--sdg_pregenerated_uri + taskInfo: + name: preload-sdg + triggerPolicy: + condition: '!(inputs.parameter_values[''pipelinechannel--sdg_pregenerated_uri''] + == '''')' + inputDefinitions: + parameters: + pipelinechannel--createpvc-name: + parameterType: STRING + pipelinechannel--sdg_batch_size: + parameterType: NUMBER_INTEGER + pipelinechannel--sdg_num_workers: + parameterType: NUMBER_INTEGER + pipelinechannel--sdg_pipeline: + parameterType: STRING + pipelinechannel--sdg_pregenerated_uri: + parameterType: STRING + pipelinechannel--sdg_repo_branch: + parameterType: STRING + pipelinechannel--sdg_repo_pr: + parameterType: NUMBER_INTEGER + pipelinechannel--sdg_repo_secret: + parameterType: STRING + pipelinechannel--sdg_repo_url: + parameterType: STRING + pipelinechannel--sdg_sample_size: + parameterType: NUMBER_DOUBLE + pipelinechannel--sdg_scale_factor: + parameterType: NUMBER_INTEGER + pipelinechannel--sdg_teacher_secret: + parameterType: STRING comp-createpvc: executorLabel: exec-createpvc inputDefinitions: @@ -285,6 +521,19 @@ components: description: Name of the PVC to delete. Supports passing a runtime-generated name, such as a name provided by ``kubernetes.CreatePvcOp().outputs['name']``. 
parameterType: STRING + comp-extract-sdg-to-pvc-op: + executorLabel: exec-extract-sdg-to-pvc-op + inputDefinitions: + artifacts: + sdg: + artifactType: + schemaTitle: system.Dataset + schemaVersion: 0.0.1 + parameters: + pvc_path: + defaultValue: /data + isOptional: true + parameterType: STRING comp-generate-metrics-report-op: executorLabel: exec-generate-metrics-report-op outputDefinitions: @@ -293,6 +542,26 @@ components: artifactType: schemaTitle: system.Metrics schemaVersion: 0.0.1 + comp-get-pvc-name-op: + executorLabel: exec-get-pvc-name-op + inputDefinitions: + parameters: + pvc_name: + parameterType: STRING + outputDefinitions: + parameters: + Output: + parameterType: STRING + comp-get-pvc-name-op-2: + executorLabel: exec-get-pvc-name-op-2 + inputDefinitions: + parameters: + pvc_name: + parameterType: STRING + outputDefinitions: + parameters: + Output: + parameterType: STRING comp-importer: executorLabel: exec-importer inputDefinitions: @@ -307,6 +576,18 @@ components: schemaVersion: 0.0.1 comp-importer-2: executorLabel: exec-importer-2 + inputDefinitions: + parameters: + uri: + parameterType: STRING + outputDefinitions: + artifacts: + artifact: + artifactType: + schemaTitle: system.Dataset + schemaVersion: 0.0.1 + comp-importer-3: + executorLabel: exec-importer-3 inputDefinitions: parameters: uri: @@ -344,95 +625,6 @@ components: defaultValue: /model isOptional: true parameterType: STRING - comp-prerequisites-check-op: - dag: - tasks: - test-model-connection: - cachingOptions: {} - componentRef: - name: comp-test-model-connection - inputs: - parameters: - secret_name: - componentInputParameter: eval_judge_secret - taskInfo: - name: test-model-connection - test-model-connection-2: - cachingOptions: {} - componentRef: - name: comp-test-model-connection-2 - inputs: - parameters: - secret_name: - componentInputParameter: sdg_teacher_secret - taskInfo: - name: test-model-connection-2 - test-model-registry: - cachingOptions: {} - componentRef: - name: comp-test-model-registry - inputs: - parameters: - model_name: - componentInputParameter: output_model_name - model_registry_endpoint: - componentInputParameter: output_model_registry_api_url - model_version: - componentInputParameter: output_model_version - taskInfo: - name: test-model-registry - test-oci-model: - cachingOptions: {} - componentRef: - name: comp-test-oci-model - inputs: - parameters: - output_oci_model_uri: - componentInputParameter: output_oci_model_uri - output_oci_registry_secret: - componentInputParameter: output_oci_registry_secret - taskInfo: - name: test-oci-model - test-sdg-params: - cachingOptions: {} - componentRef: - name: comp-test-sdg-params - inputs: - parameters: - sdg_batch_size: - componentInputParameter: sdg_batch_size - sdg_num_workers: - componentInputParameter: sdg_num_workers - taskInfo: - name: test-sdg-params - test-training-operator: - cachingOptions: {} - componentRef: - name: comp-test-training-operator - taskInfo: - name: test-training-operator - inputDefinitions: - parameters: - eval_judge_secret: - parameterType: STRING - output_model_name: - parameterType: STRING - output_model_registry_api_url: - parameterType: STRING - output_model_version: - parameterType: STRING - output_oci_model_uri: - parameterType: STRING - output_oci_registry_secret: - parameterType: STRING - sdg_batch_size: - parameterType: NUMBER_INTEGER - sdg_num_workers: - parameterType: NUMBER_INTEGER - sdg_repo_url: - parameterType: STRING - sdg_teacher_secret: - parameterType: STRING comp-pvc-to-mmlu-branch-op: executorLabel: 
exec-pvc-to-mmlu-branch-op inputDefinitions: @@ -770,46 +962,6 @@ components: artifactType: schemaTitle: system.Dataset schemaVersion: 0.0.1 - comp-test-model-connection: - executorLabel: exec-test-model-connection - inputDefinitions: - parameters: - secret_name: - parameterType: STRING - comp-test-model-connection-2: - executorLabel: exec-test-model-connection-2 - inputDefinitions: - parameters: - secret_name: - parameterType: STRING - comp-test-model-registry: - executorLabel: exec-test-model-registry - inputDefinitions: - parameters: - model_name: - parameterType: STRING - model_registry_endpoint: - parameterType: STRING - model_version: - parameterType: STRING - comp-test-oci-model: - executorLabel: exec-test-oci-model - inputDefinitions: - parameters: - output_oci_model_uri: - parameterType: STRING - output_oci_registry_secret: - parameterType: STRING - comp-test-sdg-params: - executorLabel: exec-test-sdg-params - inputDefinitions: - parameters: - sdg_batch_size: - parameterType: NUMBER_INTEGER - sdg_num_workers: - parameterType: NUMBER_INTEGER - comp-test-training-operator: - executorLabel: exec-test-training-operator comp-upload-model-op: executorLabel: exec-upload-model-op inputDefinitions: @@ -942,6 +1094,39 @@ deploymentSpec: exec-deletepvc-3: container: image: argostub/deletepvc + exec-extract-sdg-to-pvc-op: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - extract_sdg_to_pvc_op + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.12.1'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ + $0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef extract_sdg_to_pvc_op(sdg: dsl.Input[dsl.Dataset], pvc_path:\ + \ str = \"/data\"):\n import os\n import os.path\n import tarfile\n\ + \n sdg_dir = os.path.join(pvc_path, \"sdg\")\n\n os.makedirs(sdg_dir,\ + \ exist_ok=True)\n\n print(f\"Extracting {sdg.path} to {sdg_dir}\")\n\ + \ with tarfile.open(sdg.path, \"r:gz\") as tar:\n tar.extractall(path=sdg_dir)\n\ + \n" + image: python:3.9 exec-generate-metrics-report-op: container: args: @@ -975,6 +1160,64 @@ deploymentSpec: \ f\"{report}_base_model_score\", report_data[\"base_model_score\"\ ]\n )\n\n" image: quay.io/modh/odh-generic-data-science-notebook@sha256:72c1d095adbda216a1f1b4b6935e3e2c717cbc58964009464ccd36c0b98312b2 + exec-get-pvc-name-op: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - get_pvc_name_op + command: + - sh + - -c + - "\nif ! 
[ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.12.1'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ + $0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef get_pvc_name_op(pvc_name: str) -> str:\n return pvc_name\n\ + \n" + image: python:3.9 + exec-get-pvc-name-op-2: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - get_pvc_name_op + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.12.1'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ + $0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef get_pvc_name_op(pvc_name: str) -> str:\n return pvc_name\n\ + \n" + image: python:3.9 exec-importer: importer: artifactUri: @@ -983,6 +1226,14 @@ deploymentSpec: schemaTitle: system.Model schemaVersion: 0.0.1 exec-importer-2: + importer: + artifactUri: + runtimeParameter: uri + reimport: true + typeSchema: + schemaTitle: system.Dataset + schemaVersion: 0.0.1 + exec-importer-3: importer: artifactUri: runtimeParameter: uri @@ -2057,320 +2308,6 @@ deploymentSpec: - /bin/sh - -c image: registry.redhat.io/ubi9/toolbox@sha256:da31dee8904a535d12689346e65e5b00d11a6179abf1fa69b548dbd755fa2770 - exec-test-model-connection: - container: - args: - - --executor_input - - '{{$}}' - - --function_to_execute - - test_model_connection - command: - - sh - - -ec - - 'program_path=$(mktemp -d) - - - printf "%s" "$0" > "$program_path/ephemeral_component.py" - - _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" - - ' - - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ - \ *\n\ndef test_model_connection(secret_name: str):\n import base64\n\ - \ import ssl\n import sys\n import textwrap\n import time\n\n\ - \ import httpx\n from kubernetes import client, config\n from kubernetes.client.rest\ - \ import ApiException\n\n config.load_incluster_config()\n\n model_endpoint\ - \ = \"\"\n model_name = \"\"\n model_api_key = \"\"\n with open(\n\ - \ \"/var/run/secrets/kubernetes.io/serviceaccount/namespace\", \"\ - r\"\n ) as namespace_path:\n namespace = namespace_path.readline()\n\ - \n with client.ApiClient() as api_client:\n core_api = client.CoreV1Api(api_client)\n\ - \n try:\n secret = core_api.read_namespaced_secret(secret_name,\ - \ namespace)\n print(f\"Reading secret {secret_name} data...\"\ - )\n model_api_key = base64.b64decode(secret.data[\"api_token\"\ - ]).decode(\"utf-8\")\n model_name = base64.b64decode(secret.data[\"\ - 
model_name\"]).decode(\"utf-8\")\n model_endpoint = base64.b64decode(secret.data[\"\ - endpoint\"]).decode(\"utf-8\")\n except (ApiException, KeyError)\ - \ as e:\n print(f\"\"\"\n ############################################\ - \ ERROR #####################################################\n \ - \ # Error reading {secret_name}. Ensure you created a secret with this\ - \ name in namespace {namespace} and #\n # has 'api_key', 'model_name',\ - \ and 'endpoint' present \ - \ #\n ########################################################################################################\n\ - \ \"\"\")\n sys.exit(1)\n\n request_auth = {\"\ - Authorization\": f\"Bearer {model_api_key}\"}\n request_body = {\n \ - \ \"model\": model_name,\n \"messages\": [{\"role\": \"user\"\ - , \"content\": \"tell me a funny joke.\"}],\n }\n\n # Use the default\ - \ SSL context since it leverages OpenSSL to use the correct CA bundle.\n\ - \ http_client = httpx.Client(verify=ssl.create_default_context())\n\n\ - \ # Make 3 attempts\n for i in range(1, 3):\n resp = http_client.post(\n\ - \ f\"{model_endpoint}/chat/completions\",\n headers=request_auth,\n\ - \ json=request_body,\n )\n if resp.status_code\ - \ != 200:\n print(f\"Model Server {model_name} is not available.\ - \ Attempt {i}/3...\")\n time.sleep(5)\n else:\n \ - \ print(\n textwrap.dedent(f\"\"\"\\\n ###################\ - \ INFO #######################\n # Model Server {model_name}\ - \ is up and running. #\n ################################################\\\ - \n \"\"\")\n )\n return\n print(\n \ - \ textwrap.dedent(f\"\"\"\\\n ############################################\ - \ ERROR ####################################################\n # Model\ - \ Server {model_name} is unavailable. Ensure the model is up and it is ready\ - \ to serve requests. 
#\n #######################################################################################################\\\ - \n \"\"\")\n )\n sys.exit(1)\n\n" - image: quay.io/opendatahub/ds-pipelines-runtime-generic@sha256:f53e53a39b1a88c3a530e87ded473ba2648b8d8586ec9e31a4484e9bafb3059d - exec-test-model-connection-2: - container: - args: - - --executor_input - - '{{$}}' - - --function_to_execute - - test_model_connection - command: - - sh - - -ec - - 'program_path=$(mktemp -d) - - - printf "%s" "$0" > "$program_path/ephemeral_component.py" - - _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" - - ' - - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ - \ *\n\ndef test_model_connection(secret_name: str):\n import base64\n\ - \ import ssl\n import sys\n import textwrap\n import time\n\n\ - \ import httpx\n from kubernetes import client, config\n from kubernetes.client.rest\ - \ import ApiException\n\n config.load_incluster_config()\n\n model_endpoint\ - \ = \"\"\n model_name = \"\"\n model_api_key = \"\"\n with open(\n\ - \ \"/var/run/secrets/kubernetes.io/serviceaccount/namespace\", \"\ - r\"\n ) as namespace_path:\n namespace = namespace_path.readline()\n\ - \n with client.ApiClient() as api_client:\n core_api = client.CoreV1Api(api_client)\n\ - \n try:\n secret = core_api.read_namespaced_secret(secret_name,\ - \ namespace)\n print(f\"Reading secret {secret_name} data...\"\ - )\n model_api_key = base64.b64decode(secret.data[\"api_token\"\ - ]).decode(\"utf-8\")\n model_name = base64.b64decode(secret.data[\"\ - model_name\"]).decode(\"utf-8\")\n model_endpoint = base64.b64decode(secret.data[\"\ - endpoint\"]).decode(\"utf-8\")\n except (ApiException, KeyError)\ - \ as e:\n print(f\"\"\"\n ############################################\ - \ ERROR #####################################################\n \ - \ # Error reading {secret_name}. Ensure you created a secret with this\ - \ name in namespace {namespace} and #\n # has 'api_key', 'model_name',\ - \ and 'endpoint' present \ - \ #\n ########################################################################################################\n\ - \ \"\"\")\n sys.exit(1)\n\n request_auth = {\"\ - Authorization\": f\"Bearer {model_api_key}\"}\n request_body = {\n \ - \ \"model\": model_name,\n \"messages\": [{\"role\": \"user\"\ - , \"content\": \"tell me a funny joke.\"}],\n }\n\n # Use the default\ - \ SSL context since it leverages OpenSSL to use the correct CA bundle.\n\ - \ http_client = httpx.Client(verify=ssl.create_default_context())\n\n\ - \ # Make 3 attempts\n for i in range(1, 3):\n resp = http_client.post(\n\ - \ f\"{model_endpoint}/chat/completions\",\n headers=request_auth,\n\ - \ json=request_body,\n )\n if resp.status_code\ - \ != 200:\n print(f\"Model Server {model_name} is not available.\ - \ Attempt {i}/3...\")\n time.sleep(5)\n else:\n \ - \ print(\n textwrap.dedent(f\"\"\"\\\n ###################\ - \ INFO #######################\n # Model Server {model_name}\ - \ is up and running. #\n ################################################\\\ - \n \"\"\")\n )\n return\n print(\n \ - \ textwrap.dedent(f\"\"\"\\\n ############################################\ - \ ERROR ####################################################\n # Model\ - \ Server {model_name} is unavailable. Ensure the model is up and it is ready\ - \ to serve requests. 
#\n #######################################################################################################\\\ - \n \"\"\")\n )\n sys.exit(1)\n\n" - image: quay.io/opendatahub/ds-pipelines-runtime-generic@sha256:f53e53a39b1a88c3a530e87ded473ba2648b8d8586ec9e31a4484e9bafb3059d - exec-test-model-registry: - container: - args: - - --executor_input - - '{{$}}' - - --function_to_execute - - test_model_registry - command: - - sh - - -ec - - 'program_path=$(mktemp -d) - - - printf "%s" "$0" > "$program_path/ephemeral_component.py" - - _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" - - ' - - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ - \ *\n\ndef test_model_registry(\n model_registry_endpoint: Optional[str],\n\ - \ model_name: Optional[str],\n model_version: Optional[str],\n):\n\ - \ import sys\n import textwrap\n import urllib\n\n from model_registry\ - \ import ModelRegistry\n from model_registry.exceptions import StoreError\n\ - \n if not model_registry_endpoint:\n print(\n textwrap.dedent(f\"\ - \"\"\\\n ########################### INFO ##############################\n\ - \ # Model Registry endpoint is not provided. Skipping this step #\n\ - \ ###############################################################\\\ - \n \"\"\")\n )\n return\n\n try:\n # Extract\ - \ the port out of the URL because the ModelRegistry client expects those\ - \ as separate arguments.\n model_registry_endpoint = model_registry_endpoint.rstrip(\"\ - /\")\n model_registry_api_url_parsed = urllib.parse.urlparse(model_registry_endpoint)\n\ - \ model_registry_api_url_port = model_registry_api_url_parsed.port\n\ - \n if model_registry_api_url_port:\n model_registry_api_server_address\ - \ = model_registry_endpoint.replace(\n model_registry_api_url_parsed.netloc,\n\ - \ model_registry_api_url_parsed.hostname,\n )\n\ - \ else:\n if model_registry_api_url_parsed.scheme == \"\ - http\":\n model_registry_api_url_port = 80\n else:\n\ - \ model_registry_api_url_port = 443\n\n model_registry_api_server_address\ - \ = model_registry_endpoint\n\n if not model_registry_api_url_parsed.scheme:\n\ - \ model_registry_api_server_address = (\n \"https://\"\ - \ + model_registry_api_server_address\n )\n\n with open(\n\ - \ \"/var/run/secrets/kubernetes.io/serviceaccount/token\", \"\ - r\"\n ) as token_file:\n token = token_file.readline()\n\ - \n registry = ModelRegistry(\n server_address=model_registry_api_server_address,\n\ - \ port=model_registry_api_url_port,\n author=\"InstructLab\ - \ Pipeline\",\n user_token=token,\n )\n if registry.get_model_version(model_name,\ - \ model_version) is not None:\n print(f\"\"\"\n ######################################################\ - \ ERROR ######################################################\n \ - \ # The version {model_version} for model {model_name} is already registered.\ - \ You cannot overwrite a model version. 
#\n ###################################################################################################################\n\ - \ \"\"\")\n sys.exit(1)\n except StoreError as\ - \ store:\n # The model has no versions registered.\n # Do\ - \ nothing, just to avoid this exception to return failure\n print(\n\ - \ textwrap.dedent(f\"\"\"\\\n ########### INFO ##############\n\ - \ # Model Registry is available #\n ###############################\\\ - \n \"\"\")\n )\n sys.exit(0)\n except Exception\ - \ as e:\n print(\n textwrap.dedent(f\"\"\"\\\n \ - \ ############# ERROR ###############\n # Model Registry is not available\ - \ #\n ###################################\\\n \"\"\")\n \ - \ )\n raise\n\n" - image: quay.io/opendatahub/ds-pipelines-runtime-generic@sha256:f53e53a39b1a88c3a530e87ded473ba2648b8d8586ec9e31a4484e9bafb3059d - exec-test-oci-model: - container: - args: - - --executor_input - - '{{$}}' - - --function_to_execute - - test_oci_model - command: - - sh - - -ec - - 'program_path=$(mktemp -d) - - - printf "%s" "$0" > "$program_path/ephemeral_component.py" - - _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" - - ' - - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ - \ *\n\ndef test_oci_model(output_oci_model_uri: str, output_oci_registry_secret:\ - \ str):\n import base64\n import json\n import sys\n import\ - \ textwrap\n\n from kubernetes import client, config\n from kubernetes.client.rest\ - \ import ApiException\n\n with open(\n \"/var/run/secrets/kubernetes.io/serviceaccount/namespace\"\ - , \"r\"\n ) as namespace_path:\n namespace = namespace_path.readline()\n\ - \ config.load_incluster_config()\n\n if output_oci_model_uri is None:\n\ - \ print(\n textwrap.dedent(f\"\"\"\\\n ##############################\ - \ INFO ##################################\n # Parameter output_oci_model_uri\ - \ not provided. Skipping this step... 
#\n ######################################################################\\\ - \n \"\"\")\n )\n sys.exit(0)\n\n # Extract from\ - \ sdg_base_model parameter the registry name\n registry_name = output_oci_model_uri.replace(\"\ - oci://\", \"\").split(\"/\")[0]\n\n with client.ApiClient() as api_client:\n\ - \ core_api = client.CoreV1Api(api_client)\n try:\n \ - \ secret = core_api.read_namespaced_secret(\n output_oci_registry_secret,\ - \ namespace\n )\n print(f\"Reading secret {output_oci_registry_secret}\ - \ data...\")\n if secret.type == \"kubernetes.io/dockerconfigjson\"\ - :\n # handle authentication if secret provided is kubernetes.io/dockerconfigjson\n\ - \ docker_config_json = json.loads(\n base64.b64decode(secret.data[\"\ - .dockerconfigjson\"]).decode(\"utf-8\")\n )\n \ - \ print(\n textwrap.dedent(f\"\"\"\\\n \ - \ ############## INFO #################\n # OCI Secret\ - \ has auth token present #\n #####################################\\\ - \n \"\"\")\n )\n elif secret.type\ - \ == \"kubernetes.io/dockercfg\":\n # handle authentication\ - \ if secret provided is kubernetes.io/dockercfg\n dockercfg_json\ - \ = json.loads(\n base64.b64decode(secret.data[\".dockercfg\"\ - ]).decode(\"utf-8\")\n )\n print(\n \ - \ textwrap.dedent(f\"\"\"\\\n ##############\ - \ INFO #################\n # OCI Secret has auth token present\ - \ #\n #####################################\\\n \ - \ \"\"\")\n )\n except ApiException as e:\n\ - \ print(\n textwrap.dedent(f\"\"\"\\\n \ - \ ################################################# ERROR ###################################################################\n\ - \ # Secret {output_oci_registry_secret} does not exist. Ensure\ - \ you created a secret with this name in namespace {namespace} #\n \ - \ ###########################################################################################################################\\\ - \n \"\"\")\n )\n sys.exit(1)\n except\ - \ Exception as e:\n print(\n textwrap.dedent(f\"\ - \"\"\\\n ################## ERROR ##################\n \ - \ # Failed to check oci model and/or secret #\n ###########################################\\\ - \n \"\"\")\n )\n raise\n\n" - image: quay.io/opendatahub/ds-pipelines-runtime-generic@sha256:f53e53a39b1a88c3a530e87ded473ba2648b8d8586ec9e31a4484e9bafb3059d - exec-test-sdg-params: - container: - args: - - --executor_input - - '{{$}}' - - --function_to_execute - - test_sdg_params - command: - - sh - - -ec - - 'program_path=$(mktemp -d) - - - printf "%s" "$0" > "$program_path/ephemeral_component.py" - - _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" - - ' - - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ - \ *\n\ndef test_sdg_params(sdg_batch_size: int, sdg_num_workers: int):\n\ - \ import sys\n import textwrap\n\n if (\n sdg_batch_size\ - \ < 1\n or sdg_batch_size > 4096\n or sdg_num_workers < 2\n\ - \ or sdg_num_workers > 10\n ):\n print(\n textwrap.dedent(\n\ - \ f\"\"\"\\\n ##############################################\ - \ ERROR ##############################################\n \ - \ # sdg_batch_size must be a value between 1-4096 and sdg_num_workers must\ - \ be a value between 2-10 #\n ###################################################################################################\\\ - \n \"\"\"\n )\n )\n sys.exit(1)\n\ - \n print(\n textwrap.dedent(\n f\"\"\"\\\n \ - \ ############# INFO #############\n # The SDG parameters\ - \ seem okay #\n 
################################\\\n \ - \ \"\"\"\n )\n )\n\n" - image: quay.io/opendatahub/ds-pipelines-runtime-generic@sha256:f53e53a39b1a88c3a530e87ded473ba2648b8d8586ec9e31a4484e9bafb3059d - exec-test-training-operator: - container: - args: - - --executor_input - - '{{$}}' - - --function_to_execute - - test_training_operator - command: - - sh - - -ec - - 'program_path=$(mktemp -d) - - - printf "%s" "$0" > "$program_path/ephemeral_component.py" - - _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" - - ' - - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ - \ *\n\ndef test_training_operator():\n import sys\n import textwrap\n\ - \n from kubernetes import client, config\n from kubernetes.client.rest\ - \ import ApiException\n\n with open(\n \"/var/run/secrets/kubernetes.io/serviceaccount/namespace\"\ - , \"r\"\n ) as namespace_path:\n namespace = namespace_path.readline()\n\ - \ config.load_incluster_config()\n\n with client.ApiClient() as api_client:\n\ - \ api_instance = client.CustomObjectsApi(api_client)\n\n group\ - \ = \"kubeflow.org\"\n version = \"v1\"\n plural = \"pytorchjobs\"\ - \n\n try:\n api_response = api_instance.list_namespaced_custom_object(\n\ - \ group, version, namespace, plural\n )\n \ - \ print(\n textwrap.dedent(\"\"\"\\\n #########################\ - \ INFO ###########################\n # Kubeflow Training Operator\ - \ PyTorchJob CRD is available #\n ##########################################################\\\ - \n \"\"\")\n )\n except ApiException as e:\n\ - \ print(\n textwrap.dedent(\"\"\"\\\n \ - \ #################################################### ERROR ######################################################################\n\ - \ # Kubeflow Training Operator PyTorchJob CRD is unavailable.\ - \ Ensure your OpenShift AI installation has Training Operator enabled #\n\ - \ #################################################################################################################################\\\ - \n \"\"\")\n )\n sys.exit(1)\n\n" - image: quay.io/opendatahub/ds-pipelines-runtime-generic@sha256:f53e53a39b1a88c3a530e87ded473ba2648b8d8586ec9e31a4484e9bafb3059d exec-upload-model-op: container: args: @@ -2511,13 +2448,46 @@ pipelineInfo: root: dag: tasks: + condition-branches-1: + componentRef: + name: comp-condition-branches-1 + dependentTasks: + - createpvc + inputs: + parameters: + pipelinechannel--createpvc-name: + taskOutputParameter: + outputParameterKey: name + producerTask: createpvc + pipelinechannel--sdg_batch_size: + componentInputParameter: sdg_batch_size + pipelinechannel--sdg_num_workers: + componentInputParameter: sdg_num_workers + pipelinechannel--sdg_pipeline: + componentInputParameter: sdg_pipeline + pipelinechannel--sdg_pregenerated_uri: + componentInputParameter: sdg_pregenerated_uri + pipelinechannel--sdg_repo_branch: + componentInputParameter: sdg_repo_branch + pipelinechannel--sdg_repo_pr: + componentInputParameter: sdg_repo_pr + pipelinechannel--sdg_repo_secret: + componentInputParameter: sdg_repo_secret + pipelinechannel--sdg_repo_url: + componentInputParameter: sdg_repo_url + pipelinechannel--sdg_sample_size: + componentInputParameter: sdg_sample_size + pipelinechannel--sdg_scale_factor: + componentInputParameter: sdg_scale_factor + pipelinechannel--sdg_teacher_secret: + componentInputParameter: sdg_teacher_secret + taskInfo: + name: condition-branches-1 createpvc: cachingOptions: enableCache: true componentRef: name: 
comp-createpvc - dependentTasks: - - prerequisites-check-op inputs: parameters: access_modes: @@ -2538,8 +2508,6 @@ root: enableCache: true componentRef: name: comp-createpvc-2 - dependentTasks: - - prerequisites-check-op inputs: parameters: access_modes: @@ -2560,8 +2528,6 @@ root: enableCache: true componentRef: name: comp-createpvc-3 - dependentTasks: - - prerequisites-check-op inputs: parameters: access_modes: @@ -2582,10 +2548,10 @@ root: componentRef: name: comp-data-processing-op dependentTasks: + - condition-branches-1 - createpvc - createpvc-2 - model-to-pvc-op - - sdg-op inputs: parameters: max_batch_len: @@ -2653,33 +2619,17 @@ root: - run-final-eval-op taskInfo: name: generate-metrics-report-op - importer: - cachingOptions: - enableCache: true - componentRef: - name: comp-importer - dependentTasks: - - prerequisites-check-op - inputs: - parameters: - uri: - runtimeValue: - constant: oci://quay.io/opendatahub/ds-pipelines-runtime-generic@sha256:f53e53a39b1a88c3a530e87ded473ba2648b8d8586ec9e31a4484e9bafb3059d - taskInfo: - name: importer - importer-2: + importer-3: cachingOptions: enableCache: true componentRef: - name: comp-importer-2 - dependentTasks: - - prerequisites-check-op + name: comp-importer-3 inputs: parameters: uri: componentInputParameter: sdg_base_model taskInfo: - name: importer-2 + name: importer-3 knowledge-processed-data-to-artifact-op: cachingOptions: {} componentRef: @@ -2695,44 +2645,15 @@ root: name: comp-model-to-pvc-op dependentTasks: - createpvc-2 - - importer-2 + - importer-3 inputs: artifacts: model: taskOutputArtifact: outputArtifactKey: artifact - producerTask: importer-2 + producerTask: importer-3 taskInfo: name: model-to-pvc-op - prerequisites-check-op: - cachingOptions: - enableCache: true - componentRef: - name: comp-prerequisites-check-op - inputs: - parameters: - eval_judge_secret: - componentInputParameter: eval_judge_secret - output_model_name: - componentInputParameter: output_model_name - output_model_registry_api_url: - componentInputParameter: output_model_registry_api_url - output_model_version: - componentInputParameter: output_model_version - output_oci_model_uri: - componentInputParameter: output_oci_model_uri - output_oci_registry_secret: - componentInputParameter: output_oci_registry_secret - sdg_batch_size: - componentInputParameter: sdg_batch_size - sdg_num_workers: - componentInputParameter: sdg_num_workers - sdg_repo_url: - componentInputParameter: sdg_repo_url - sdg_teacher_secret: - componentInputParameter: sdg_teacher_secret - taskInfo: - name: prerequisites-check-op pvc-to-mmlu-branch-op: cachingOptions: {} componentRef: @@ -2965,53 +2886,6 @@ root: componentInputParameter: eval_gpu_identifier taskInfo: name: run-mt-bench-op - sdg-op: - cachingOptions: {} - componentRef: - name: comp-sdg-op - dependentTasks: - - createpvc - - importer - - prerequisites-check-op - inputs: - artifacts: - tokenizer_model: - taskOutputArtifact: - outputArtifactKey: artifact - producerTask: importer - parameters: - num_instructions_to_generate: - componentInputParameter: sdg_scale_factor - pipeline: - componentInputParameter: sdg_pipeline - repo_branch: - componentInputParameter: sdg_repo_branch - repo_pr: - componentInputParameter: sdg_repo_pr - repo_url: - componentInputParameter: sdg_repo_url - sdg_batch_size: - componentInputParameter: sdg_batch_size - sdg_num_cpus: - componentInputParameter: sdg_num_workers - sdg_sampling_size: - componentInputParameter: sdg_sample_size - sdg_secret_name: - componentInputParameter: sdg_teacher_secret - 
taxonomy_repo_secret: - componentInputParameter: sdg_repo_secret - taskInfo: - name: sdg-op - sdg-to-artifact-op: - cachingOptions: - enableCache: true - componentRef: - name: comp-sdg-to-artifact-op - dependentTasks: - - createpvc - - sdg-op - taskInfo: - name: sdg-to-artifact-op skills-processed-data-to-artifact-op: cachingOptions: {} componentRef: @@ -3021,16 +2895,6 @@ root: - data-processing-op taskInfo: name: skills-processed-data-to-artifact-op - taxonomy-to-artifact-op: - cachingOptions: - enableCache: true - componentRef: - name: comp-taxonomy-to-artifact-op - dependentTasks: - - createpvc - - sdg-op - taskInfo: - name: taxonomy-to-artifact-op upload-model-op: cachingOptions: {} componentRef: @@ -3204,6 +3068,12 @@ root: Note that ''full'' requires a larger teacher model, Mixtral-8x7b.' isOptional: true parameterType: STRING + sdg_pregenerated_uri: + defaultValue: '' + description: SDG parameter. If specified, the SDG phase is skipped and the + URI is used to download the SDG output. + isOptional: true + parameterType: STRING sdg_repo_branch: defaultValue: main description: SDG parameter. Points to a branch within the taxonomy git repository. @@ -3366,6 +3236,12 @@ platforms: taskOutputParameter: outputParameterKey: name producerTask: createpvc + exec-extract-sdg-to-pvc-op: + pvcMount: + - mountPath: /data + taskOutputParameter: + outputParameterKey: Output + producerTask: get-pvc-name-op-2 exec-generate-metrics-report-op: pvcMount: - mountPath: /output @@ -3432,14 +3308,14 @@ platforms: pvcMount: - mountPath: /data taskOutputParameter: - outputParameterKey: name - producerTask: createpvc + outputParameterKey: Output + producerTask: get-pvc-name-op exec-sdg-to-artifact-op: pvcMount: - mountPath: /data taskOutputParameter: - outputParameterKey: name - producerTask: createpvc + outputParameterKey: Output + producerTask: get-pvc-name-op exec-skills-processed-data-to-artifact-op: pvcMount: - mountPath: /data @@ -3450,8 +3326,8 @@ platforms: pvcMount: - mountPath: /data taskOutputParameter: - outputParameterKey: name - producerTask: createpvc + outputParameterKey: Output + producerTask: get-pvc-name-op exec-upload-model-op: pvcMount: - mountPath: /output diff --git a/training/components.py b/training/components.py index 1a0a6522..d6ce76de 100644 --- a/training/components.py +++ b/training/components.py @@ -152,6 +152,7 @@ def pytorch_job_launcher_op( ): import logging import os + import json from kubeflow.training import TrainingClient, models from kubeflow.training.constants.constants import ISTIO_SIDECAR_INJECTION @@ -266,6 +267,17 @@ def list_phase1_final_model(): claim_name=output_pvc_name ), ), + models.V1Volume( + name="shm-volume", + empty_dir=models.V1EmptyDirVolumeSource( + medium="Memory", + size_limit="20Gi" + ), + ), + models.V1Volume( + name="shared-volume", + empty_dir=models.V1EmptyDirVolumeSource(), + ), ] # Set volume mounts @@ -275,6 +287,8 @@ def list_phase1_final_model(): ), models.V1VolumeMount(mount_path="/input_model", name="model", read_only=True), models.V1VolumeMount(mount_path="/output", name="output"), + models.V1VolumeMount(mount_path="/dev/shm", name="shm-volume"), + models.V1VolumeMount(mount_path="/mnt/shared", name="shared-volume"), ] volume_mounts_worker = [ @@ -283,6 +297,8 @@ def list_phase1_final_model(): ), models.V1VolumeMount(mount_path="/input_model", name="model", read_only=True), models.V1VolumeMount(mount_path="/output", name="output", read_only=True), + models.V1VolumeMount(mount_path="/dev/shm", name="shm-volume"), + 
models.V1VolumeMount(mount_path="/mnt/shared", name="shared-volume"), ] # Set env variables @@ -291,10 +307,23 @@ def list_phase1_final_model(): models.V1EnvVar(name="NPROC_PER_NODE", value=f"{nproc_per_node}"), models.V1EnvVar(name="XDG_CACHE_HOME", value="/tmp"), models.V1EnvVar(name="TRITON_CACHE_DIR", value="/tmp"), + models.V1EnvVar(name="TRITON_HOME", value="/tmp"), + models.V1EnvVar(name="TRITON_DUMP_DIR", value="/tmp"), + models.V1EnvVar(name="TRITON_OVERRIDE_DIR", value="/tmp"), models.V1EnvVar(name="HF_HOME", value="/tmp"), models.V1EnvVar(name="TRANSFORMERS_CACHE", value="/tmp"), ] + # Set up secondary network interfaces + _namespace = "ilab-training-dsp" + network_annotation_value = [{"name": f"network-port-{i}", "namespace": _namespace} for i in range(15,21)] + network_annotation_str = json.dumps(network_annotation_value) + container_security_context = models.V1SecurityContext( + capabilities=models.V1Capabilities( + add=["IPC_LOCK", "SYS_RESOURCE", "NET_ADMIN", "NET_RAW"] + ) + ) + # Get master and worker container specs master_container_spec = kfto_utils.get_container_spec( base_image=base_image, @@ -305,9 +334,9 @@ def list_phase1_final_model(): # In the next release of kubeflow-training, the command # and the args will be a part of kfto_utils.get_container_spec function + master_container_spec.security_context = container_security_context master_container_spec.command = command master_container_spec.args = master_args - master_container_spec.env = env_vars worker_container_spec = kfto_utils.get_container_spec( @@ -316,25 +345,37 @@ def list_phase1_final_model(): resources=resources_per_worker, volume_mounts=volume_mounts_worker, ) + + worker_container_spec.security_context = container_security_context worker_container_spec.command = command worker_container_spec.args = worker_args worker_container_spec.env = env_vars # create master pod spec master_pod_template_spec = models.V1PodTemplateSpec( - metadata=models.V1ObjectMeta(annotations={ISTIO_SIDECAR_INJECTION: "false"}), - spec=models.V1PodSpec( - init_containers=None, - containers=[master_container_spec], - volumes=volumes, - tolerations=tolerations, - node_selector=node_selectors, - ), + metadata=models.V1ObjectMeta( + annotations={ + ISTIO_SIDECAR_INJECTION: "false", + "k8s.v1.cni.cncf.io/networks": network_annotation_str, + } + ), + spec=models.V1PodSpec( + init_containers=None, + containers=[master_container_spec], + volumes=volumes, + tolerations=tolerations, + node_selector=node_selectors, + ), ) # create worker pod spec worker_pod_template_spec = models.V1PodTemplateSpec( - metadata=models.V1ObjectMeta(annotations={ISTIO_SIDECAR_INJECTION: "false"}), + metadata=models.V1ObjectMeta( + annotations={ + ISTIO_SIDECAR_INJECTION: "false", + "k8s.v1.cni.cncf.io/networks": network_annotation_str, + } + ), spec=models.V1PodSpec( init_containers=None, containers=[worker_container_spec], diff --git a/utils/__init__.py b/utils/__init__.py index 12870b47..ead1c89f 100644 --- a/utils/__init__.py +++ b/utils/__init__.py @@ -1,4 +1,6 @@ from .components import ( + extract_sdg_to_pvc_op, + get_pvc_name_op, ilab_importer_op, model_to_pvc_op, pvc_to_mmlu_branch_op, @@ -8,6 +10,8 @@ ) __all__ = [ + "get_pvc_name_op", + "extract_sdg_to_pvc_op", "model_to_pvc_op", "pvc_to_mt_bench_op", "pvc_to_mt_bench_branch_op", diff --git a/utils/components.py b/utils/components.py index 4552ba9c..10f494c6 100644 --- a/utils/components.py +++ b/utils/components.py @@ -641,6 +641,7 @@ def prerequisites_check_op( output_model_registry_api_url: str, 
output_model_name: str, output_model_version: str, + sdg_pregenerated_uri: str = "", ): """ Pre-validation checks for the InstructLab pipeline. @@ -652,8 +653,9 @@ def prerequisites_check_op( test_judge_model_op.set_caching_options(False) ## Validate teacher information - test_teacher_model_op = test_model_connection(secret_name=sdg_teacher_secret) - test_teacher_model_op.set_caching_options(False) + with dsl.If(sdg_pregenerated_uri == "", "sdg-prerequisites"): + test_teacher_model_op = test_model_connection(secret_name=sdg_teacher_secret) + test_teacher_model_op.set_caching_options(False) # Validate Model Registry configuration test_model_registry_op = test_model_registry( @@ -679,3 +681,24 @@ def prerequisites_check_op( sdg_batch_size=sdg_batch_size, sdg_num_workers=sdg_num_workers ) test_sdg_params_op.set_caching_options(False) + + +@dsl.component +def extract_sdg_to_pvc_op(sdg: dsl.Input[dsl.Dataset], pvc_path: str = "/data"): + import os + import os.path + import tarfile + + sdg_dir = os.path.join(pvc_path, "sdg") + + os.makedirs(sdg_dir, exist_ok=True) + + print(f"Extracting {sdg.path} to {sdg_dir}") + with tarfile.open(sdg.path, "r:gz") as tar: + tar.extractall(path=sdg_dir) + + +# This is a hack to get the PVC name available to mount in a sub-DAG. +@dsl.component +def get_pvc_name_op(pvc_name: str) -> str: + return pvc_name
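
For reference, the conditional SDG branch introduced in pipeline.py above hinges on two KFP patterns: branching on the sdg_pregenerated_uri pipeline parameter with dsl.If / dsl.Else, and the get_pvc_name_op pass-through, which re-emits the CreatePVC output as an ordinary task output so tasks inside the sub-DAGs can mount the volume. The sketch below is not part of the patch; it is a minimal, self-contained illustration of that pattern, assuming the kfp and kfp-kubernetes packages already used in this repository. demo_pipeline, consume_data_op, and the PVC settings are hypothetical placeholders standing in for sdg_op / extract_sdg_to_pvc_op and the real storage configuration.

    from kfp import dsl
    from kfp.kubernetes import CreatePVC, mount_pvc


    # Pass-through component: CreatePVC's "name" output is produced in the
    # parent DAG, so it is surfaced here as a regular task output that tasks
    # inside the dsl.If / dsl.Else sub-DAGs can reference when mounting the PVC.
    @dsl.component
    def get_pvc_name_op(pvc_name: str) -> str:
        return pvc_name


    # Hypothetical stand-in for sdg_op / extract_sdg_to_pvc_op.
    @dsl.component
    def consume_data_op(msg: str):
        print(msg)


    @dsl.pipeline()
    def demo_pipeline(sdg_pregenerated_uri: str = ""):
        pvc = CreatePVC(
            pvc_name_suffix="-sdg",
            access_modes=["ReadWriteOnce"],
            size="10Gi",
            storage_class_name="standard",
        )
        with dsl.If(sdg_pregenerated_uri == "", "run-sdg"):
            pvc_name = get_pvc_name_op(pvc_name=pvc.output)
            task = consume_data_op(msg="generate SDG output onto the PVC")
            mount_pvc(task, pvc_name=pvc_name.output, mount_path="/data")
        with dsl.Else("preload-sdg"):
            pvc_name = get_pvc_name_op(pvc_name=pvc.output)
            task = consume_data_op(msg="extract pre-generated SDG output onto the PVC")
            mount_pvc(task, pvc_name=pvc_name.output, mount_path="/data")

This is also why the compiled pipeline.yaml above now mounts the SDG PVC from producerTask: get-pvc-name-op (output key "Output") instead of producerTask: createpvc (output key "name") for the sdg, sdg-to-artifact, and taxonomy-to-artifact executors.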