|
| 1 | +# Copyright 2024 Google LLC |
| 2 | +# |
| 3 | +# Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 | +# you may not use this file except in compliance with the License. |
| 5 | +# You may obtain a copy of the License at |
| 6 | +# |
| 7 | +# https://www.apache.org/licenses/LICENSE-2.0 |
| 8 | +# |
| 9 | +# Unless required by applicable law or agreed to in writing, software |
| 10 | +# distributed under the License is distributed on an "AS IS" BASIS, |
| 11 | +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 | +# See the License for the specific language governing permissions and |
| 13 | +# limitations under the License. |
| 14 | + |
#
# Deploys a Vertex AI endpoint
# and captures endpoint performance metrics to BigQuery.
#
| 19 | +# Authors: ajitsonawane@,suddhasatwa@ |
| 20 | +# Team: Google Cloud Consulting |
| 21 | +# Date: 25.01.2024 |
| 22 | + |
| 23 | +# Imports |
| 24 | +import sys |
| 25 | +import logging |
| 26 | +import traceback |
| 27 | +import uuid |
| 28 | +import time |
| 29 | +import json |
| 30 | +from google.cloud import aiplatform |
| 31 | + |
| 32 | +from utils import utils |
| 33 | +# from utils import config_parser as cfp |
| 34 | +# from utils.utils import register_latency |
| 35 | +# from utils.utils import log_latencies_to_bq |
| 36 | +# from utils.utils import write_results_to_bq |
| 37 | + |
| 38 | +# function to process requests to endpoint. |
| 39 | +def process(machine_type: str, latencies: list, log_level: str): |
| 40 | + """ |
| 41 | + Deploys machine based on user input, creates endpoint and measure latencies. |
| 42 | + Takes the latencies List as input. |
| 43 | + Calls the Vegata utility to update latencies for each machine type. |
| 44 | + Passes it to another utility to generate full Results. |
| 45 | + Returns the Results back. |
| 46 | +
|
| 47 | + Inputs: |
| 48 | + machine_type: each type of machine to be tested. |
| 49 | + latencies: list (usually empty) to get results from Vegata |
| 50 | + log_level: level of logging. |
| 51 | +
|
| 52 | + Outputs: |
| 53 | + results: Combined results for each machine type. |
| 54 | + """ |
| 55 | + |
| 56 | + # set logging setup |
| 57 | + logging.basicConfig(level=log_level, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s") |
| 58 | + |
| 59 | + # start logging. |
| 60 | + logging.info("Reading configuration.") |
| 61 | + |
| 62 | + # read config. |
| 63 | + config_data = utils.read_config("config/config.ini") |
| 64 | + MODEL_ID = config_data["config"]["model_id"] # model ID |
| 65 | + RATE = json.loads(config_data["config"]["rate"]) # the QPS rates to try |
| 66 | + DURATION = str(config_data["config"]["duration"]) # duration for which tests will be ran |
| 67 | + PROJECT = config_data["config"]["project"] # project ID |
| 68 | + LOCATION = config_data["config"]["location"] # region |
| 69 | + TIMEOUT = config_data["config"]["timeout"] # endpoint timeout |
| 70 | + MIN_NODES = int(config_data["config"]["min_nodes"]) # min nodes for scaling |
| 71 | + MAX_NODES = int(config_data["config"]["max_nodes"]) #max nodes for scaling |
| 72 | + REQUEST_FILE = str(config_data["config"]["request_file"]) |
| 73 | + |
| 74 | + # deploy model on endpoint. |
| 75 | + logging.info( |
| 76 | + "Deploying endpoint on machine: %s for model: %s", machine_type, MODEL_ID) |
| 77 | + try: |
| 78 | + # create client for Vertex AI. |
| 79 | + logging.info("Creating AI Platform object.") |
| 80 | + aiplatform.init(project=PROJECT, location=LOCATION) |
| 81 | + |
| 82 | + # load the model from registry. |
| 83 | + logging.info("Loading {} from Model registry.".format(MODEL_ID)) |
| 84 | + model = aiplatform.Model(model_name=MODEL_ID) |
| 85 | + |
| 86 | + # generate random UUID |
| 87 | + logging.info("Generating random UUID for endpoint creation.") |
| 88 | + ep_uuid = uuid.uuid4().hex |
| 89 | + display_name = f"ep_{machine_type}_{ep_uuid}" |
| 90 | + |
| 91 | + # create endpoint instance |
| 92 | + logging.info("Creating endpoint instance.") |
| 93 | + endpoint = aiplatform.Endpoint.create(display_name=display_name) |
| 94 | + |
| 95 | + # deploy endpoint on specific machine type |
| 96 | + logging.info("Deploying model {} on endpoint {}".format(model, display_name)) |
| 97 | + endpoint.deploy(model, min_replica_count=MIN_NODES, |
| 98 | + max_replica_count=MAX_NODES, machine_type=machine_type) |
| 99 | + |
| 100 | + # Sleep for 5 minutes |
| 101 | + # general best practice with Vertex AI Endpoints |
| 102 | + logging.info("Sleeping for 5 minutes, for the endpoint to be ready!") |
| 103 | + time.sleep(TIMEOUT) |
| 104 | + |
| 105 | + # Register latencies for predictions |
| 106 | + logging.info("Calling utility to register the latencies.") |
| 107 | + ret_code, latencies = utils.register_latencies(RATE, DURATION, endpoint, machine_type, endpoint.display_name, latencies, REQUEST_FILE, log_level) |
| 108 | + if ret_code == 1: |
| 109 | + logging.info("Latencies recorded for {}".format(machine_type)) |
| 110 | + else: |
| 111 | + logging.error("Error in recording latencies for {}".format(machine_type)) |
| 112 | + sys.exit(1) |
| 113 | + |
| 114 | + # preprocess registered latencies |
| 115 | + logging.info("Calling utility to prepare latencies for BigQuery.") |
| 116 | + results = utils.log_latencies_to_bq(MODEL_ID, latencies, log_level) |
| 117 | + if results: |
| 118 | + logging.info("Latencies information processed successfully.") |
| 119 | + else: |
| 120 | + logging.error("Error in recording all latencies. Exiting.") |
| 121 | + sys.exit(1) |
| 122 | + |
| 123 | + # Un-deploy endpoint |
| 124 | + logging.info("Un-deploying endpoint: %s", endpoint.resource_name) |
| 125 | + endpoint.undeploy_all() |
| 126 | + |
| 127 | + # Deleting endpoint |
| 128 | + logging.info("Deleting endpoint: %s", endpoint.resource_name) |
| 129 | + endpoint.delete() |
| 130 | + |
| 131 | + logging.info("Processing completed for machine: %s", machine_type) |
| 132 | + |
| 133 | + except Exception as ex: |
| 134 | + logging.error(''.join(traceback.format_exception(etype=type(ex), |
| 135 | + value=ex, tb=ex.__traceback__))) |
| 136 | + |
| 137 | + # return results. |
| 138 | + return (results) |
| 139 | + |
| 140 | +# entrypoint function. |
| 141 | +def main(): |
| 142 | + """ Entrypoint """ |
| 143 | + |
| 144 | + # Read config. |
| 145 | + config_data = utils.read_config("config/config.ini") |
| 146 | + MACHINE_TYPES_LST = config_data["config"]["machine_types_lst"].split(',') # List of machine types |
| 147 | + LOG_LEVEL = config_data["config"]["log_level"] # level of logging. |
| 148 | + OUTPUT_BQ_TBL_ID = config_data["config"]["output_bq_tbl_id"] # BigQuery table to store results |
| 149 | + PROJECT = config_data["config"]["project"] # project ID |
| 150 | + |
| 151 | + # log setup. |
| 152 | + logging.basicConfig(level=LOG_LEVEL, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s") |
| 153 | + |
| 154 | + # start logging. |
| 155 | + logging.info("Vertex Endpoint Stress Tester Utility.") |
| 156 | + |
| 157 | + # variables |
| 158 | + logging.info("Prepping local variables.") |
| 159 | + LATENCIES = [] |
| 160 | + RESULTS = [] |
| 161 | + |
| 162 | + # record start time. |
| 163 | + start = time.time() |
| 164 | + |
| 165 | + # loop through each machine type |
| 166 | + # and process the records. |
| 167 | + try: |
| 168 | + for machine_type in MACHINE_TYPES_LST: |
| 169 | + # log calling the utility |
| 170 | + logging.info("Calling data processing utility.") |
| 171 | + |
| 172 | + # append the results from utility |
| 173 | + RESULTS.extend(process(machine_type, LATENCIES, LOG_LEVEL)) |
| 174 | + |
| 175 | + # log end. |
| 176 | + logging.info("Results utility completed.") |
| 177 | + |
| 178 | + # reset the latencies variable |
| 179 | + LATENCIES = [] |
| 180 | + except Exception as e: |
| 181 | + # log error |
| 182 | + logging.error("Got error while running load tests.") |
| 183 | + logging.error(e) |
| 184 | + # exit |
| 185 | + sys.exit(1) |
| 186 | + |
| 187 | + # REMOVE |
| 188 | + logging.info(len(LATENCIES)) |
| 189 | + logging.info(len(RESULTS)) |
| 190 | + |
| 191 | + # write collected results to BigQuery |
| 192 | + logging.info(" Writing data of load testing on machine type %s", machine_type) |
| 193 | + bq_write_ret_code = utils.write_results_to_bq(RESULTS, OUTPUT_BQ_TBL_ID, PROJECT, LOG_LEVEL) |
| 194 | + if bq_write_ret_code == 1: |
| 195 | + # log success |
| 196 | + logging.info("Successfully written data into BQ in {} table.".format(OUTPUT_BQ_TBL_ID)) |
| 197 | + else: |
| 198 | + # log error |
| 199 | + logging.error("Errors in writing data into BigQuery. Exiting.") |
| 200 | + # exit |
| 201 | + sys.exit(1) |
| 202 | + |
| 203 | + # print the total time taken. |
| 204 | + # this is for all machines. |
| 205 | + logging.info(f"Total time taken for execution {time.time()-start}") |
| 206 | + |
| 207 | +# Call entrypoint |
| 208 | +if __name__ == "__main__": |
| 209 | + main() |
| 210 | + |
| 211 | +# End. |
0 commit comments