Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions pipe-cli/src/api/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ def launch_pipeline(cls, pipeline_id, version, parameters,
status_notifications=False,
status_notifications_status=None, status_notifications_recipient=None,
status_notifications_subject=None, status_notifications_body=None,
run_as_user=None):
run_as_user=None, pod_assign_policy=None):
api = cls.instance()
params = {}
for parameter in parameters:
Expand Down Expand Up @@ -147,6 +147,8 @@ def launch_pipeline(cls, pipeline_id, version, parameters,
payload['prettyUrl'] = friendly_url
if run_as_user:
payload['runAs'] = run_as_user
if pod_assign_policy is not None:
payload['podAssignPolicy'] = pod_assign_policy
if status_notifications:
if status_notifications_body:
with open(status_notifications_body, 'r') as f:
Expand All @@ -172,7 +174,7 @@ def launch_command(cls, instance_disk, instance_type,
status_notifications=False,
status_notifications_status=None, status_notifications_recipient=None,
status_notifications_subject=None, status_notifications_body=None,
run_as_user=None):
run_as_user=None, pod_assign_policy=None):
api = cls.instance()
payload = {}
if instance_disk is not None:
Expand All @@ -199,6 +201,8 @@ def launch_command(cls, instance_disk, instance_type,
payload['prettyUrl'] = friendly_url
if run_as_user:
payload['runAs'] = run_as_user
if pod_assign_policy is not None:
payload['podAssignPolicy'] = pod_assign_policy
if status_notifications:
if status_notifications_body:
with open(status_notifications_body, 'r') as f:
Expand Down
72 changes: 72 additions & 0 deletions pipe-cli/src/utilities/capacity_block_processor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
# Copyright 2025 EPAM Systems, Inc. (https://www.epam.com/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json
import sys
import click

from src.api.preferenceapi import PreferenceAPI


class CapacityBlockProcessor:
CONFIG_PREFERENCE = 'launch.reservation.parameters'

def __init__(self, instance_type):
self._config = None
if not instance_type:
pass
self._instance_type = instance_type
config = self._find_capacity_block_config()
self._config = config.get(self._instance_type)

def verify(self, parameters):
if not self._config:
# instance type is not capacity block
return
self._validate_parameter('cpu_requests_enabled',
parameters, 'CP_CAP_REQUESTS_CPU')
self._validate_parameter('gpu_requests_enabled',
parameters, 'CP_CAP_REQUESTS_GPU')
self._validate_parameter('ram_requests_enabled',
parameters, 'CP_CAP_REQUESTS_RAM')

def apply_config(self, parameters):
if not self._config:
# instance type is not capacity block
return parameters, None
if parameters is None:
parameters = {}
for param_name, param_value in self._config.get('parameters', {}).items():
if param_name not in parameters:
parameters.update({param_name: param_value})
kube_policy = self._config.get('kube_assign_policy')
return parameters, kube_policy

def _find_capacity_block_config(self):
preference = PreferenceAPI.get_preference(self.CONFIG_PREFERENCE)
if not preference:
return {}
preference_value = preference.value
if not preference_value:
return {}
return json.loads(preference_value)

def _validate_parameter(self, config_marker_name, parameters, parameter_name):
if not self._config.get(config_marker_name):
return
if parameters.get(parameter_name):
return
click.echo('Parameter %s shall be specified for instance type %s.' % (parameter_name, self._instance_type),
err=True)
sys.exit(1)
37 changes: 27 additions & 10 deletions pipe-cli/src/utilities/pipeline_run_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from src.model.pipeline_run_model import PriceType
from src.model.pipeline_run_parameter_model import PipelineRunParameterModel
from src.utilities.api_wait import wait_for_server_enabling_if_needed
from src.utilities.capacity_block_processor import CapacityBlockProcessor
from src.utilities.cluster_manager import ClusterManager
from src.utilities.user_operations_manager import UserOperationsManager
from src.utilities.user_token_operations import UserTokenOperations
Expand Down Expand Up @@ -124,20 +125,21 @@ def run(cls, pipeline, config, parameters, yes, run_params, instance_disk, insta
else:
if not quiet:
click.echo('Evaluating estimated price...', nl=False)
run_price = Pipeline.get_estimated_price(pipeline_model.identifier,
pipeline_run_parameters.version,
instance_type,
instance_disk,
config_name=config,
price_type=price_type,
region_id=region_id)
run_price = Pipeline.get_estimated_price(pipeline_model.identifier,
pipeline_run_parameters.version,
instance_type,
instance_disk,
config_name=config,
price_type=price_type,
region_id=region_id)
instance_type = instance_type or run_price.instance_type
if not quiet:
click.echo('done.', nl=True)
price_table = prettytable.PrettyTable()
price_table.field_names = ["key", "value"]
price_table.align = "l"
price_table.set_style(12)
price_table.header = False
instance_type = instance_type or run_price.instance_type

price_table.add_row(['Price per hour ({}, hdd {})'.format(run_price.instance_type,
run_price.instance_disk),
Expand Down Expand Up @@ -172,6 +174,10 @@ def run(cls, pipeline, config, parameters, yes, run_params, instance_disk, insta
wrong_parameters = True
elif run_params_dict.get(parameter.name) is not None:
parameter.value = run_params_dict.get(parameter.name)

run_params_dict, pod_assign_policy = cls._apply_capacity_block_config_if_required(instance_type,
run_params_dict)

for user_parameter in run_params_dict.keys():
custom_parameter = True
for parameter in pipeline_run_parameters.parameters:
Expand Down Expand Up @@ -207,7 +213,8 @@ def run(cls, pipeline, config, parameters, yes, run_params, instance_disk, insta
status_notifications_recipient=status_notifications_recipient,
status_notifications_subject=status_notifications_subject,
status_notifications_body=status_notifications_body,
run_as_user=run_as_user)
run_as_user=run_as_user,
pod_assign_policy=pod_assign_policy)
pipeline_run_id = pipeline_run_model.identifier
if not quiet:
click.echo('"{}" pipeline run scheduled with RunId: {}'.format(
Expand Down Expand Up @@ -250,6 +257,9 @@ def run(cls, pipeline, config, parameters, yes, run_params, instance_disk, insta
if not quiet:
cls._check_gpu_and_cuda_compatibility(instance_type, docker_image=docker_image)

run_params_dict, pod_assign_policy = cls._apply_capacity_block_config_if_required(instance_type,
run_params_dict)

if not yes:
click.confirm('Are you sure you want to schedule a run?', abort=True)

Expand All @@ -270,7 +280,8 @@ def run(cls, pipeline, config, parameters, yes, run_params, instance_disk, insta
status_notifications_recipient=status_notifications_recipient,
status_notifications_subject=status_notifications_subject,
status_notifications_body=status_notifications_body,
run_as_user=run_as_user)
run_as_user=run_as_user,
pod_assign_policy=pod_assign_policy)
pipeline_run_id = pipeline_run_model.identifier
if not quiet:
click.echo('Pipeline run scheduled with RunId: {}'.format(pipeline_run_id))
Expand Down Expand Up @@ -489,3 +500,9 @@ def _check_gpu_and_cuda_compatibility(cls, instance_type, pipeline_run_parameter
click.echo(GPU_WITHOUT_CUDA_WARN_MSG)
if not gpu_enabled and cuda_available:
click.echo(CPU_WITH_CUDA_WARN_MSG)

@classmethod
def _apply_capacity_block_config_if_required(cls, instance_type, run_params_dict):
capacity_block_processor = CapacityBlockProcessor(instance_type)
capacity_block_processor.verify(run_params_dict)
return capacity_block_processor.apply_config(run_params_dict)