diff --git a/pipeline.py b/pipeline.py
index 1e946de5..67328d94 100644
--- a/pipeline.py
+++ b/pipeline.py
@@ -26,6 +26,8 @@
     skills_processed_data_to_artifact_op,
 )
 from utils import (
+    extract_sdg_to_pvc_op,
+    get_pvc_name_op,
     ilab_importer_op,
     model_to_pvc_op,
     pvc_to_mmlu_branch_op,
@@ -73,6 +75,7 @@ def ilab_pipeline(
     sdg_sample_size: float = 1.0,  # FIXME: Not present in default config. Not configurable upstream at this point, capability added via https://github.com/instructlab/sdg/pull/432
     sdg_batch_size: int = 32,
     sdg_num_workers: int = 2,
+    sdg_pregenerated_uri: str = "",
     # Training phase
     train_tolerations: Optional[list] = None,
     train_node_selectors: Optional[dict] = None,
@@ -130,6 +133,7 @@ def ilab_pipeline(
         sdg_sample_size: SDG parameter. Represents the sdg skills recipe sampling size as percentage in decimal form.
         sdg_batch_size: SDG parameter. The number of completions per request to the teacher model. Must be a value between 1-4096. This can be increased to improve SDG performance based on the hardware of the teacher model or reduced if SDG fails due to connection errors with the teacher model.
         sdg_num_workers: SDG parameter. The number of concurrent workers sending completion requests to the teacher model. Must be a value between 2-10. This can be increased to improve SDG performance based on the hardware of the teacher model or reduced if SDG fails due to connection errors with the teacher model.
+        sdg_pregenerated_uri: SDG parameter. If specified, the SDG phase is skipped and the URI is used to download the SDG output.

         train_tolerations: Training parameter. List of tolerations applied to training pods.
         train_node_selectors: Training parameter. A JSON containing node selectors applied to training pods.
@@ -176,6 +180,8 @@ def ilab_pipeline(
         output_model_registry_api_url=output_model_registry_api_url,
         output_model_name=output_model_name,
         output_model_version=output_model_version,
+        # Must use a default of empty string for `dsl.If` to work.
+        sdg_pregenerated_uri=sdg_pregenerated_uri,
     )

     # SDG stage
@@ -187,50 +193,67 @@
     )
     sdg_input_pvc_task.after(prerequisites_check_task)

-    model_tokenizer_source_task = dsl.importer(
-        artifact_uri=f"oci://{RUNTIME_GENERIC_IMAGE}", artifact_class=dsl.Model
-    )
-    model_tokenizer_source_task.after(prerequisites_check_task)
-
-    sdg_task = sdg_op(
-        num_instructions_to_generate=sdg_scale_factor,
-        pipeline=sdg_pipeline,
-        repo_branch=sdg_repo_branch,
-        repo_pr=sdg_repo_pr,
-        sdg_sampling_size=sdg_sample_size,
-        sdg_secret_name=sdg_teacher_secret,
-        sdg_batch_size=sdg_batch_size,
-        sdg_num_cpus=sdg_num_workers,
-        repo_url=sdg_repo_url,
-        taxonomy_repo_secret=sdg_repo_secret,
-        tokenizer_model=model_tokenizer_source_task.output,
-    )
-    sdg_task.set_env_variable("HOME", "/tmp")
-    sdg_task.set_env_variable("HF_HOME", "/tmp")
-
-    mount_pvc(
-        task=sdg_task,
-        pvc_name=sdg_input_pvc_task.output,
-        mount_path="/data",
-    )
-    sdg_task.set_caching_options(False)
-    sdg_task.after(prerequisites_check_task)
-
-    # Upload "sdg" and "taxonomy" artifacts to S3 without blocking the rest of the workflow
-    taxonomy_to_artifact_task = taxonomy_to_artifact_op()
-    taxonomy_to_artifact_task.after(sdg_task)
-    mount_pvc(
-        task=taxonomy_to_artifact_task,
-        pvc_name=sdg_input_pvc_task.output,
-        mount_path="/data",
-    )
-    sdg_to_artifact_task = sdg_to_artifact_op()
-    sdg_to_artifact_task.after(sdg_task)
-    mount_pvc(
-        task=sdg_to_artifact_task,
-        pvc_name=sdg_input_pvc_task.output,
-        mount_path="/data",
-    )
+    with dsl.If(sdg_pregenerated_uri == "", "run-sdg"):
+        model_tokenizer_source_task = dsl.importer(
+            artifact_uri=f"oci://{RUNTIME_GENERIC_IMAGE}", artifact_class=dsl.Model
+        )
+        model_tokenizer_source_task.after(prerequisites_check_task)
+        get_pvc_name_task = get_pvc_name_op(pvc_name=sdg_input_pvc_task.output)
+        get_pvc_name_task.after(model_tokenizer_source_task)
+        sdg_task = sdg_op(
+            num_instructions_to_generate=sdg_scale_factor,
+            pipeline=sdg_pipeline,
+            repo_branch=sdg_repo_branch,
+            repo_pr=sdg_repo_pr,
+            sdg_sampling_size=sdg_sample_size,
+            sdg_secret_name=sdg_teacher_secret,
+            sdg_batch_size=sdg_batch_size,
+            sdg_num_cpus=sdg_num_workers,
+            repo_url=sdg_repo_url,
+            taxonomy_repo_secret=sdg_repo_secret,
+            tokenizer_model=model_tokenizer_source_task.output,
+        )
+        sdg_task.set_caching_options(False)
+        sdg_task.after(prerequisites_check_task)
+        sdg_task.set_env_variable("HOME", "/tmp")
+        sdg_task.set_env_variable("HF_HOME", "/tmp")
+        mount_pvc(
+            task=sdg_task,
+            pvc_name=get_pvc_name_task.output,
+            mount_path="/data",
+        )
+        # Upload "sdg" and "taxonomy" artifacts to S3 without blocking the rest of the workflow
+        taxonomy_to_artifact_task = taxonomy_to_artifact_op()
+        taxonomy_to_artifact_task.after(sdg_task)
+        mount_pvc(
+            task=taxonomy_to_artifact_task,
+            pvc_name=get_pvc_name_task.output,
+            mount_path="/data",
+        )
+        sdg_to_artifact_task = sdg_to_artifact_op()
+        sdg_to_artifact_task.after(sdg_task)
+        mount_pvc(
+            task=sdg_to_artifact_task,
+            pvc_name=get_pvc_name_task.output,
+            mount_path="/data",
+        )
+    with dsl.Else("preload-sdg"):
+        sdg_source_s3_task = dsl.importer(
+            artifact_uri=sdg_pregenerated_uri,
+            artifact_class=dsl.Dataset,
+            reimport=True,
+        )
+        sdg_source_s3_task.set_caching_options(False)
+        get_pvc_name_task = get_pvc_name_op(pvc_name=sdg_input_pvc_task.output)
+        get_pvc_name_task.after(sdg_source_s3_task)
+        sdg_task = extract_sdg_to_pvc_op(sdg=sdg_source_s3_task.output)
+        sdg_task.after(sdg_source_s3_task)
+        sdg_task.after(prerequisites_check_task)
+        mount_pvc(
+            task=sdg_task,
+            pvc_name=get_pvc_name_task.output,
+            mount_path="/data",
+        )

     # uncomment if updating image with same tag
     # set_image_pull_policy(sdg_task, "Always")
diff --git a/pipeline.yaml b/pipeline.yaml
index c779ae56..3a725d82 100644
--- a/pipeline.yaml
+++ b/pipeline.yaml
@@ -24,6 +24,7 @@
 # sdg_max_batch_len: int [Default: 5000.0]
 # sdg_num_workers: int [Default: 2.0]
 # sdg_pipeline: str [Default: '/usr/share/instructlab/sdg/pipelines/agentic']
+# sdg_pregenerated_uri: str [Default: '']
 # sdg_repo_branch: str
 # sdg_repo_pr: int
 # sdg_repo_secret: str [Default: 'taxonomy-repo-secret']
@@ -50,6 +51,260 @@
 # train_seed: int [Default: 42.0]
 # train_tolerations: list
 components:
+  comp-condition-1:
+    dag:
+      tasks:
+        test-model-connection-2:
+          cachingOptions: {}
+          componentRef:
+            name: comp-test-model-connection-2
+          inputs:
+            parameters:
+              secret_name:
+                componentInputParameter: pipelinechannel--sdg_teacher_secret
+          taskInfo:
+            name: test-model-connection-2
+    inputDefinitions:
+      parameters:
+        pipelinechannel--sdg_pregenerated_uri:
+          parameterType: STRING
+        pipelinechannel--sdg_teacher_secret:
+          parameterType: STRING
+  comp-condition-2:
+    dag:
+      tasks:
+        get-pvc-name-op:
+          cachingOptions:
+            enableCache: true
+          componentRef:
+            name: comp-get-pvc-name-op
+          dependentTasks:
+          - importer
+          inputs:
+            parameters:
+              pvc_name:
+                componentInputParameter: pipelinechannel--createpvc-name
+          taskInfo:
+            name: get-pvc-name-op
+        importer:
+          cachingOptions:
+            enableCache: true
+          componentRef:
+            name: comp-importer
+          inputs:
+            parameters:
+              uri:
+                runtimeValue:
+                  constant: oci://quay.io/opendatahub/ds-pipelines-runtime-generic@sha256:f53e53a39b1a88c3a530e87ded473ba2648b8d8586ec9e31a4484e9bafb3059d
+          taskInfo:
+            name: importer
+        sdg-op:
+          cachingOptions: {}
+          componentRef:
+            name: comp-sdg-op
+          dependentTasks:
+          - get-pvc-name-op
+          - importer
+          inputs:
+            artifacts:
+              tokenizer_model:
+                taskOutputArtifact:
+                  outputArtifactKey: artifact
+                  producerTask: importer
+            parameters:
+              num_instructions_to_generate:
+                componentInputParameter: pipelinechannel--sdg_scale_factor
+              pipeline:
+                componentInputParameter: pipelinechannel--sdg_pipeline
+              repo_branch:
+                componentInputParameter: pipelinechannel--sdg_repo_branch
+              repo_pr:
+                componentInputParameter: pipelinechannel--sdg_repo_pr
+              repo_url:
+                componentInputParameter: pipelinechannel--sdg_repo_url
+              sdg_batch_size:
+                componentInputParameter: pipelinechannel--sdg_batch_size
+              sdg_num_cpus:
+                componentInputParameter: pipelinechannel--sdg_num_workers
+              sdg_sampling_size:
+                componentInputParameter: pipelinechannel--sdg_sample_size
+              sdg_secret_name:
+                componentInputParameter: pipelinechannel--sdg_teacher_secret
+              taxonomy_repo_secret:
+                componentInputParameter: pipelinechannel--sdg_repo_secret
+          taskInfo:
+            name: sdg-op
+        sdg-to-artifact-op:
+          cachingOptions:
+            enableCache: true
+          componentRef:
+            name: comp-sdg-to-artifact-op
+          dependentTasks:
+          - get-pvc-name-op
+          - sdg-op
+          taskInfo:
+            name: sdg-to-artifact-op
+        taxonomy-to-artifact-op:
+          cachingOptions:
+            enableCache: true
+          componentRef:
+            name: comp-taxonomy-to-artifact-op
+          dependentTasks:
+          - get-pvc-name-op
+          - sdg-op
+          taskInfo:
+            name: taxonomy-to-artifact-op
+    inputDefinitions:
+      parameters:
+        pipelinechannel--createpvc-name:
+          parameterType: STRING
+        pipelinechannel--sdg_batch_size:
+          parameterType: NUMBER_INTEGER
+        pipelinechannel--sdg_num_workers:
+          parameterType: NUMBER_INTEGER
+        pipelinechannel--sdg_pipeline:
+          parameterType: STRING
+        pipelinechannel--sdg_pregenerated_uri:
+          parameterType: STRING
+        pipelinechannel--sdg_repo_branch:
+          parameterType: STRING
+        pipelinechannel--sdg_repo_pr:
+          parameterType: NUMBER_INTEGER
+        pipelinechannel--sdg_repo_secret:
+          parameterType: STRING
+        pipelinechannel--sdg_repo_url:
+          parameterType: STRING
+        pipelinechannel--sdg_sample_size:
+          parameterType: NUMBER_DOUBLE
+        pipelinechannel--sdg_scale_factor:
+          parameterType: NUMBER_INTEGER
+        pipelinechannel--sdg_teacher_secret:
+          parameterType: STRING
+  comp-condition-3:
+    dag:
+      tasks:
+        extract-sdg-to-pvc-op:
+          cachingOptions:
+            enableCache: true
+          componentRef:
+            name: comp-extract-sdg-to-pvc-op
+          dependentTasks:
+          - get-pvc-name-op-2
+          - importer-2
+          inputs:
+            artifacts:
+              sdg:
+                taskOutputArtifact:
+                  outputArtifactKey: artifact
+                  producerTask: importer-2
+          taskInfo:
+            name: extract-sdg-to-pvc-op
+        get-pvc-name-op-2:
+          cachingOptions:
+            enableCache: true
+          componentRef:
+            name: comp-get-pvc-name-op-2
+          dependentTasks:
+          - importer-2
+          inputs:
+            parameters:
+              pvc_name:
+                componentInputParameter: pipelinechannel--createpvc-name
+          taskInfo:
+            name: get-pvc-name-op-2
+        importer-2:
+          cachingOptions: {}
+          componentRef:
+            name: comp-importer-2
+          inputs:
+            parameters:
+              uri:
+                componentInputParameter: pipelinechannel--sdg_pregenerated_uri
+          taskInfo:
+            name: importer-2
+    inputDefinitions:
+      parameters:
+        pipelinechannel--createpvc-name:
+          parameterType: STRING
+        pipelinechannel--sdg_pregenerated_uri:
+          parameterType: STRING
+  comp-condition-branches-1:
+    dag:
+      tasks:
+        condition-2:
+          componentRef:
+            name: comp-condition-2
+          inputs:
+            parameters:
+              pipelinechannel--createpvc-name:
+                componentInputParameter: pipelinechannel--createpvc-name
+              pipelinechannel--sdg_batch_size:
+                componentInputParameter: pipelinechannel--sdg_batch_size
+              pipelinechannel--sdg_num_workers:
+                componentInputParameter: pipelinechannel--sdg_num_workers
+              pipelinechannel--sdg_pipeline:
+                componentInputParameter: pipelinechannel--sdg_pipeline
+              pipelinechannel--sdg_pregenerated_uri:
+                componentInputParameter: pipelinechannel--sdg_pregenerated_uri
+              pipelinechannel--sdg_repo_branch:
+                componentInputParameter: pipelinechannel--sdg_repo_branch
+              pipelinechannel--sdg_repo_pr:
+                componentInputParameter: pipelinechannel--sdg_repo_pr
+              pipelinechannel--sdg_repo_secret:
+                componentInputParameter: pipelinechannel--sdg_repo_secret
+              pipelinechannel--sdg_repo_url:
+                componentInputParameter: pipelinechannel--sdg_repo_url
+              pipelinechannel--sdg_sample_size:
+                componentInputParameter: pipelinechannel--sdg_sample_size
+              pipelinechannel--sdg_scale_factor:
+                componentInputParameter: pipelinechannel--sdg_scale_factor
+              pipelinechannel--sdg_teacher_secret:
+                componentInputParameter: pipelinechannel--sdg_teacher_secret
+          taskInfo:
+            name: run-sdg
+          triggerPolicy:
+            condition: inputs.parameter_values['pipelinechannel--sdg_pregenerated_uri']
+              == ''
+        condition-3:
+          componentRef:
+            name: comp-condition-3
+          inputs:
+            parameters:
+              pipelinechannel--createpvc-name:
+                componentInputParameter: pipelinechannel--createpvc-name
+              pipelinechannel--sdg_pregenerated_uri:
+                componentInputParameter: pipelinechannel--sdg_pregenerated_uri
+          taskInfo:
+            name: preload-sdg
+          triggerPolicy:
+            condition: '!(inputs.parameter_values[''pipelinechannel--sdg_pregenerated_uri'']
+              == '''')'
+    inputDefinitions:
+      parameters:
+        pipelinechannel--createpvc-name:
+          parameterType: STRING
+        pipelinechannel--sdg_batch_size:
+          parameterType: NUMBER_INTEGER
+        pipelinechannel--sdg_num_workers:
+          parameterType: NUMBER_INTEGER
+        pipelinechannel--sdg_pipeline:
+          parameterType: STRING
+        pipelinechannel--sdg_pregenerated_uri:
+          parameterType: STRING
+        pipelinechannel--sdg_repo_branch:
+          parameterType: STRING
+        pipelinechannel--sdg_repo_pr:
+          parameterType: NUMBER_INTEGER
+        pipelinechannel--sdg_repo_secret:
+          parameterType: STRING
+        pipelinechannel--sdg_repo_url:
+          parameterType: STRING
+        pipelinechannel--sdg_sample_size:
+          parameterType: NUMBER_DOUBLE
+        pipelinechannel--sdg_scale_factor:
+          parameterType: NUMBER_INTEGER
+        pipelinechannel--sdg_teacher_secret:
+          parameterType: STRING
   comp-createpvc:
     executorLabel: exec-createpvc
     inputDefinitions:
@@ -285,6 +540,19 @@ components:
         description: Name of the PVC to delete. Supports passing a runtime-generated
           name, such as a name provided by ``kubernetes.CreatePvcOp().outputs['name']``.
         parameterType: STRING
+  comp-extract-sdg-to-pvc-op:
+    executorLabel: exec-extract-sdg-to-pvc-op
+    inputDefinitions:
+      artifacts:
+        sdg:
+          artifactType:
+            schemaTitle: system.Dataset
+            schemaVersion: 0.0.1
+      parameters:
+        pvc_path:
+          defaultValue: /data
+          isOptional: true
+          parameterType: STRING
   comp-generate-metrics-report-op:
     executorLabel: exec-generate-metrics-report-op
     outputDefinitions:
@@ -293,6 +561,26 @@ components:
         artifactType:
           schemaTitle: system.Metrics
          schemaVersion: 0.0.1
+  comp-get-pvc-name-op:
+    executorLabel: exec-get-pvc-name-op
+    inputDefinitions:
+      parameters:
+        pvc_name:
+          parameterType: STRING
+    outputDefinitions:
+      parameters:
+        Output:
+          parameterType: STRING
+  comp-get-pvc-name-op-2:
+    executorLabel: exec-get-pvc-name-op-2
+    inputDefinitions:
+      parameters:
+        pvc_name:
+          parameterType: STRING
+    outputDefinitions:
+      parameters:
+        Output:
+          parameterType: STRING
   comp-importer:
     executorLabel: exec-importer
     inputDefinitions:
@@ -307,6 +595,18 @@ components:
       schemaVersion: 0.0.1
   comp-importer-2:
     executorLabel: exec-importer-2
+    inputDefinitions:
+      parameters:
+        uri:
+          parameterType: STRING
+    outputDefinitions:
+      artifacts:
+        artifact:
+          artifactType:
+            schemaTitle: system.Dataset
+            schemaVersion: 0.0.1
+  comp-importer-3:
+    executorLabel: exec-importer-3
     inputDefinitions:
       parameters:
         uri:
@@ -347,26 +647,30 @@ components:
   comp-prerequisites-check-op:
     dag:
       tasks:
-        test-model-connection:
-          cachingOptions: {}
+        condition-1:
           componentRef:
-            name: comp-test-model-connection
+            name: comp-condition-1
           inputs:
             parameters:
-              secret_name:
-                componentInputParameter: eval_judge_secret
+              pipelinechannel--sdg_pregenerated_uri:
+                componentInputParameter: sdg_pregenerated_uri
+              pipelinechannel--sdg_teacher_secret:
+                componentInputParameter: sdg_teacher_secret
           taskInfo:
-            name: test-model-connection
-        test-model-connection-2:
+            name: sdg-prerequisites
+          triggerPolicy:
+            condition: inputs.parameter_values['pipelinechannel--sdg_pregenerated_uri']
+              == ''
+        test-model-connection:
           cachingOptions: {}
           componentRef:
-            name: comp-test-model-connection-2
+            name: comp-test-model-connection
           inputs:
             parameters:
               secret_name:
-                componentInputParameter: sdg_teacher_secret
+                componentInputParameter: eval_judge_secret
           taskInfo:
-            name: test-model-connection-2
+            name: test-model-connection
         test-model-registry:
           cachingOptions: {}
           componentRef:
@@ -429,6 +733,10 @@ components:
         parameterType: NUMBER_INTEGER
       sdg_num_workers:
         parameterType: NUMBER_INTEGER
+      sdg_pregenerated_uri:
+        defaultValue: ''
+        isOptional: true
+        parameterType: STRING
       sdg_repo_url:
         parameterType: STRING
       sdg_teacher_secret:
@@ -942,6 +1250,39 @@ deploymentSpec:
     exec-deletepvc-3:
       container:
         image: argostub/deletepvc
+    exec-extract-sdg-to-pvc-op:
+      container:
+        args:
+        - --executor_input
+        - '{{$}}'
+        - --function_to_execute
+        - extract_sdg_to_pvc_op
+        command:
+        - sh
+        - -c
+        - "\nif ! [ -x \"$(command -v pip)\" ]; then\n    python3 -m ensurepip ||\
+          \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
+          \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.12.1'\
+          \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
+          $0\" \"$@\"\n"
+        - sh
+        - -ec
+        - 'program_path=$(mktemp -d)
+
+
+          printf "%s" "$0" > "$program_path/ephemeral_component.py"
+
+          _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
+
+          '
+        - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
+          \ *\n\ndef extract_sdg_to_pvc_op(sdg: dsl.Input[dsl.Dataset], pvc_path:\
+          \ str = \"/data\"):\n    import os\n    import os.path\n    import tarfile\n\
+          \n    sdg_dir = os.path.join(pvc_path, \"sdg\")\n\n    os.makedirs(sdg_dir,\
+          \ exist_ok=True)\n\n    print(f\"Extracting {sdg.path} to {sdg_dir}\")\n\
+          \    with tarfile.open(sdg.path, \"r:gz\") as tar:\n        tar.extractall(path=sdg_dir)\n\
+          \n"
+        image: python:3.9
     exec-generate-metrics-report-op:
       container:
         args:
@@ -975,6 +1316,64 @@ deploymentSpec:
          \ f\"{report}_base_model_score\", report_data[\"base_model_score\"\
          ]\n    )\n\n"
        image: quay.io/modh/odh-generic-data-science-notebook@sha256:72c1d095adbda216a1f1b4b6935e3e2c717cbc58964009464ccd36c0b98312b2
+    exec-get-pvc-name-op:
+      container:
+        args:
+        - --executor_input
+        - '{{$}}'
+        - --function_to_execute
+        - get_pvc_name_op
+        command:
+        - sh
+        - -c
+        - "\nif ! [ -x \"$(command -v pip)\" ]; then\n    python3 -m ensurepip ||\
+          \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
+          \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.12.1'\
+          \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
+          $0\" \"$@\"\n"
+        - sh
+        - -ec
+        - 'program_path=$(mktemp -d)
+
+
+          printf "%s" "$0" > "$program_path/ephemeral_component.py"
+
+          _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
+
+          '
+        - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
+          \ *\n\ndef get_pvc_name_op(pvc_name: str) -> str:\n    return pvc_name\n\
+          \n"
+        image: python:3.9
+    exec-get-pvc-name-op-2:
+      container:
+        args:
+        - --executor_input
+        - '{{$}}'
+        - --function_to_execute
+        - get_pvc_name_op
+        command:
+        - sh
+        - -c
+        - "\nif ! [ -x \"$(command -v pip)\" ]; then\n    python3 -m ensurepip ||\
+          \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
+          \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.12.1'\
+          \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
+          $0\" \"$@\"\n"
+        - sh
+        - -ec
+        - 'program_path=$(mktemp -d)
+
+
+          printf "%s" "$0" > "$program_path/ephemeral_component.py"
+
+          _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
+
+          '
+        - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
+          \ *\n\ndef get_pvc_name_op(pvc_name: str) -> str:\n    return pvc_name\n\
+          \n"
+        image: python:3.9
     exec-importer:
       importer:
         artifactUri:
@@ -983,6 +1382,14 @@ deploymentSpec:
          runtimeParameter: uri
        typeSchema:
          schemaTitle: system.Model
          schemaVersion: 0.0.1
    exec-importer-2:
+      importer:
+        artifactUri:
+          runtimeParameter: uri
+        reimport: true
+        typeSchema:
+          schemaTitle: system.Dataset
+          schemaVersion: 0.0.1
+    exec-importer-3:
      importer:
        artifactUri:
          runtimeParameter: uri
@@ -2508,6 +2915,42 @@ pipelineInfo:
 root:
   dag:
     tasks:
+      condition-branches-1:
+        componentRef:
+          name: comp-condition-branches-1
+        dependentTasks:
+        - createpvc
+        - prerequisites-check-op
+        inputs:
+          parameters:
+            pipelinechannel--createpvc-name:
+              taskOutputParameter:
+                outputParameterKey: name
+                producerTask: createpvc
+            pipelinechannel--sdg_batch_size:
+              componentInputParameter: sdg_batch_size
+            pipelinechannel--sdg_num_workers:
+              componentInputParameter: sdg_num_workers
+            pipelinechannel--sdg_pipeline:
+              componentInputParameter: sdg_pipeline
+            pipelinechannel--sdg_pregenerated_uri:
+              componentInputParameter: sdg_pregenerated_uri
+            pipelinechannel--sdg_repo_branch:
+              componentInputParameter: sdg_repo_branch
+            pipelinechannel--sdg_repo_pr:
+              componentInputParameter: sdg_repo_pr
+            pipelinechannel--sdg_repo_secret:
+              componentInputParameter: sdg_repo_secret
+            pipelinechannel--sdg_repo_url:
+              componentInputParameter: sdg_repo_url
+            pipelinechannel--sdg_sample_size:
+              componentInputParameter: sdg_sample_size
+            pipelinechannel--sdg_scale_factor:
+              componentInputParameter: sdg_scale_factor
+            pipelinechannel--sdg_teacher_secret:
+              componentInputParameter: sdg_teacher_secret
+        taskInfo:
+          name: condition-branches-1
       createpvc:
         cachingOptions:
           enableCache: true
@@ -2579,10 +3022,10 @@ root:
         componentRef:
           name: comp-data-processing-op
         dependentTasks:
+        - condition-branches-1
        - createpvc
        - createpvc-2
        - model-to-pvc-op
-        - sdg-op
        inputs:
          parameters:
            max_batch_len:
@@ -2650,25 +3093,11 @@ root:
        - run-final-eval-op
        taskInfo:
          name: generate-metrics-report-op
-      importer:
-        cachingOptions:
-          enableCache: true
-        componentRef:
-          name: comp-importer
-        dependentTasks:
-        - prerequisites-check-op
-        inputs:
-          parameters:
-            uri:
-              runtimeValue:
-                constant: oci://quay.io/opendatahub/ds-pipelines-runtime-generic@sha256:f53e53a39b1a88c3a530e87ded473ba2648b8d8586ec9e31a4484e9bafb3059d
-        taskInfo:
-          name: importer
-      importer-2:
+      importer-3:
        cachingOptions:
          enableCache: true
        componentRef:
-          name: comp-importer-2
+          name: comp-importer-3
        dependentTasks:
        - prerequisites-check-op
        inputs:
@@ -2676,7 +3105,7 @@ root:
            uri:
              componentInputParameter: sdg_base_model
        taskInfo:
-          name: importer-2
+          name: importer-3
      knowledge-processed-data-to-artifact-op:
        cachingOptions: {}
        componentRef:
@@ -2692,13 +3121,13 @@ root:
          name: comp-model-to-pvc-op
        dependentTasks:
        - createpvc-2
-        - importer-2
+        - importer-3
        inputs:
          artifacts:
            model:
              taskOutputArtifact:
                outputArtifactKey: artifact
-                producerTask: importer-2
+                producerTask: importer-3
        taskInfo:
          name: model-to-pvc-op
      prerequisites-check-op:
@@ -2724,6 +3153,8 @@ root:
            componentInputParameter: sdg_batch_size
          sdg_num_workers:
            componentInputParameter: sdg_num_workers
+          sdg_pregenerated_uri:
+            componentInputParameter: sdg_pregenerated_uri
          sdg_repo_url:
            componentInputParameter: sdg_repo_url
          sdg_teacher_secret:
@@ -2962,53 +3393,6 @@ root:
            componentInputParameter: eval_gpu_identifier
        taskInfo:
          name: run-mt-bench-op
-      sdg-op:
-        cachingOptions: {}
-        componentRef:
-          name: comp-sdg-op
-        dependentTasks:
-        - createpvc
-        - importer
-        - prerequisites-check-op
-        inputs:
-          artifacts:
-            tokenizer_model:
-              taskOutputArtifact:
-                outputArtifactKey: artifact
-                producerTask: importer
-          parameters:
-            num_instructions_to_generate:
-              componentInputParameter: sdg_scale_factor
-            pipeline:
-              componentInputParameter: sdg_pipeline
-            repo_branch:
-              componentInputParameter: sdg_repo_branch
-            repo_pr:
-              componentInputParameter: sdg_repo_pr
-            repo_url:
-              componentInputParameter: sdg_repo_url
-            sdg_batch_size:
-              componentInputParameter: sdg_batch_size
-            sdg_num_cpus:
-              componentInputParameter: sdg_num_workers
-            sdg_sampling_size:
-              componentInputParameter: sdg_sample_size
-            sdg_secret_name:
-              componentInputParameter: sdg_teacher_secret
-            taxonomy_repo_secret:
-              componentInputParameter: sdg_repo_secret
-        taskInfo:
-          name: sdg-op
-      sdg-to-artifact-op:
-        cachingOptions:
-          enableCache: true
-        componentRef:
-          name: comp-sdg-to-artifact-op
-        dependentTasks:
-        - createpvc
-        - sdg-op
-        taskInfo:
-          name: sdg-to-artifact-op
      skills-processed-data-to-artifact-op:
        cachingOptions: {}
        componentRef:
@@ -3018,16 +3402,6 @@ root:
        dependentTasks:
        - data-processing-op
        taskInfo:
          name: skills-processed-data-to-artifact-op
-      taxonomy-to-artifact-op:
-        cachingOptions:
-          enableCache: true
-        componentRef:
-          name: comp-taxonomy-to-artifact-op
-        dependentTasks:
-        - createpvc
-        - sdg-op
-        taskInfo:
-          name: taxonomy-to-artifact-op
      upload-model-op:
        cachingOptions: {}
        componentRef:
@@ -3201,6 +3575,12 @@ root:
          Note that ''full'' requires a larger teacher model, Mixtral-8x7b.'
        isOptional: true
        parameterType: STRING
+      sdg_pregenerated_uri:
+        defaultValue: ''
+        description: SDG parameter. If specified, the SDG phase is skipped and the
+          URI is used to download the SDG output.
+        isOptional: true
+        parameterType: STRING
      sdg_repo_branch:
        description: SDG parameter. Points to a branch within the taxonomy git repository.
          If set, has priority over sdg_repo_pr
@@ -3362,6 +3742,12 @@ platforms:
          taskOutputParameter:
            outputParameterKey: name
            producerTask: createpvc
+        exec-extract-sdg-to-pvc-op:
+          pvcMount:
+          - mountPath: /data
+            taskOutputParameter:
+              outputParameterKey: Output
+              producerTask: get-pvc-name-op-2
        exec-generate-metrics-report-op:
          pvcMount:
          - mountPath: /output
@@ -3428,14 +3814,14 @@ platforms:
          pvcMount:
          - mountPath: /data
            taskOutputParameter:
-              outputParameterKey: name
-              producerTask: createpvc
+              outputParameterKey: Output
+              producerTask: get-pvc-name-op
        exec-sdg-to-artifact-op:
          pvcMount:
          - mountPath: /data
            taskOutputParameter:
-              outputParameterKey: name
-              producerTask: createpvc
+              outputParameterKey: Output
+              producerTask: get-pvc-name-op
        exec-skills-processed-data-to-artifact-op:
          pvcMount:
          - mountPath: /data
@@ -3446,8 +3832,8 @@ platforms:
          pvcMount:
          - mountPath: /data
            taskOutputParameter:
-              outputParameterKey: name
-              producerTask: createpvc
+              outputParameterKey: Output
+              producerTask: get-pvc-name-op
        exec-upload-model-op:
          pvcMount:
          - mountPath: /output
diff --git a/utils/__init__.py b/utils/__init__.py
index 12870b47..ead1c89f 100644
--- a/utils/__init__.py
+++ b/utils/__init__.py
@@ -1,4 +1,6 @@
 from .components import (
+    extract_sdg_to_pvc_op,
+    get_pvc_name_op,
     ilab_importer_op,
     model_to_pvc_op,
     pvc_to_mmlu_branch_op,
@@ -8,6 +10,8 @@
 )

 __all__ = [
+    "get_pvc_name_op",
+    "extract_sdg_to_pvc_op",
     "model_to_pvc_op",
     "pvc_to_mt_bench_op",
     "pvc_to_mt_bench_branch_op",
diff --git a/utils/components.py b/utils/components.py
index 4552ba9c..10f494c6 100644
--- a/utils/components.py
+++ b/utils/components.py
@@ -641,6 +641,7 @@ def prerequisites_check_op(
     output_model_registry_api_url: str,
     output_model_name: str,
     output_model_version: str,
+    sdg_pregenerated_uri: str = "",
 ):
     """
     Pre-validation checks for the InstructLab pipeline.
@@ -652,8 +653,9 @@
     test_judge_model_op.set_caching_options(False)

     ## Validate teacher information
-    test_teacher_model_op = test_model_connection(secret_name=sdg_teacher_secret)
-    test_teacher_model_op.set_caching_options(False)
+    with dsl.If(sdg_pregenerated_uri == "", "sdg-prerequisites"):
+        test_teacher_model_op = test_model_connection(secret_name=sdg_teacher_secret)
+        test_teacher_model_op.set_caching_options(False)

     # Validate Model Registry configuration
     test_model_registry_op = test_model_registry(
@@ -679,3 +681,24 @@ def prerequisites_check_op(
         sdg_batch_size=sdg_batch_size, sdg_num_workers=sdg_num_workers
     )
     test_sdg_params_op.set_caching_options(False)
+
+
+@dsl.component
+def extract_sdg_to_pvc_op(sdg: dsl.Input[dsl.Dataset], pvc_path: str = "/data"):
+    import os
+    import os.path
+    import tarfile
+
+    sdg_dir = os.path.join(pvc_path, "sdg")
+
+    os.makedirs(sdg_dir, exist_ok=True)
+
+    print(f"Extracting {sdg.path} to {sdg_dir}")
+    with tarfile.open(sdg.path, "r:gz") as tar:
+        tar.extractall(path=sdg_dir)
+
+
+# This is a hack to get the PVC name available to mount in a sub-DAG.
+@dsl.component
+def get_pvc_name_op(pvc_name: str) -> str:
+    return pvc_name
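
The get_pvc_name_op "hack" exists because, as the compiled platform spec above shows, a PVC mount inside a dsl.If/dsl.Else sub-DAG must reference a producer task within that sub-DAG; the createpvc task lives outside it, so its name is re-emitted by a component inside each branch. The pattern can be exercised in isolation with a minimal sketch, assuming kfp==2.12.1 and kfp-kubernetes are installed; get_pvc_name_op is taken verbatim from this diff, while the pipeline name and the work_op component are illustrative stand-ins:

from kfp import dsl, kubernetes


# Verbatim from utils/components.py above: re-emits the PVC name so the
# mount below can reference a producer task local to the sub-DAG.
@dsl.component
def get_pvc_name_op(pvc_name: str) -> str:
    return pvc_name


@dsl.component
def work_op(label: str):
    # Stand-in for sdg_op / extract_sdg_to_pvc_op; reads or writes /data.
    print(f"running {label} against /data")


@dsl.pipeline(name="conditional-pvc-demo")
def demo(pregenerated_uri: str = ""):
    pvc = kubernetes.CreatePVC(
        pvc_name_suffix="-demo",
        access_modes=["ReadWriteOnce"],
        size="1Gi",
        storage_class_name="standard",
    )
    with dsl.If(pregenerated_uri == "", "run-sdg"):
        # createpvc is outside this sub-DAG, so its output cannot back the
        # mount directly; route the name through get_pvc_name_op instead.
        name_task = get_pvc_name_op(pvc_name=pvc.outputs["name"])
        generate = work_op(label="generate")
        kubernetes.mount_pvc(generate, pvc_name=name_task.output, mount_path="/data")
    with dsl.Else("preload-sdg"):
        name_task = get_pvc_name_op(pvc_name=pvc.outputs["name"])
        preload = work_op(label="preload")
        kubernetes.mount_pvc(preload, pvc_name=name_task.output, mount_path="/data")


if __name__ == "__main__":
    from kfp import compiler

    compiler.Compiler().compile(demo, "demo.yaml")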
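On the preload side, extract_sdg_to_pvc_op opens the imported artifact with mode "r:gz" and extracts it into {pvc_path}/sdg, so the artifact behind sdg_pregenerated_uri must be a gzipped tarball. A minimal sketch of producing a compatible archive; the local paths and the s3:// URI are illustrative, and the assumption that the tarball should contain exactly what sdg_op would have written under /data/sdg is inferred from the extraction target, not stated in this diff:

import tarfile


def pack_sdg_output(sdg_dir: str, archive_path: str) -> None:
    # arcname="." keeps member paths relative, so extractall() in
    # extract_sdg_to_pvc_op recreates the files directly under
    # {pvc_path}/sdg instead of nesting them one directory deeper.
    with tarfile.open(archive_path, "w:gz") as tar:
        tar.add(sdg_dir, arcname=".")


# e.g. pack a local copy of a previous run's /data/sdg directory:
pack_sdg_output("sdg-output/", "sdg.tar.gz")
# After uploading sdg.tar.gz to object storage, a run can skip the SDG
# phase with, for example, sdg_pregenerated_uri="s3://my-bucket/sdg.tar.gz".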