diff --git a/Procfile b/Procfile
new file mode 100644
index 00000000..b943aecd
--- /dev/null
+++ b/Procfile
@@ -0,0 +1,5 @@
+minikube-mount: minikube mount $LOCAL_DIR:/mnt/servicex
+helm-install: sleep 5; cd $CHART_DIR && helm install -f $VALUES_FILE servicex . && tail -f /dev/null
+port-forward-app: sleep 20 && cd $LOCAL_DIR && bash local/port-forward.sh app
+port-forward-minio: sleep 20 && cd $LOCAL_DIR && bash local/port-forward.sh minio
+port-forward-db: sleep 20 && cd $LOCAL_DIR && bash local/port-forward.sh db
diff --git a/helm/servicex/templates/app/configmap.yaml b/helm/servicex/templates/app/configmap.yaml
index ac046585..4c7b4549 100644
--- a/helm/servicex/templates/app/configmap.yaml
+++ b/helm/servicex/templates/app/configmap.yaml
@@ -13,6 +13,10 @@ data:
     CHART = '{{ .Chart.Name }}-{{ .Chart.Version }}'
     APP_IMAGE_TAG = '{{ .Values.app.tag }}'
+
+    # Application environment and local development settings
+    APP_ENVIRONMENT = '{{ .Values.app.environment }}'
+    APP_MOUNT_LOCAL = {{ if .Values.app.mountLocal }}True{{ else }}False{{ end }}
     #SERVER_NAME = '127.0.0.1:5000'

     # this is the session secret, used to protect the Flask session. You should
diff --git a/helm/servicex/templates/app/deployment.yaml b/helm/servicex/templates/app/deployment.yaml
index 7fbd30f8..c87159e2 100644
--- a/helm/servicex/templates/app/deployment.yaml
+++ b/helm/servicex/templates/app/deployment.yaml
@@ -37,6 +37,11 @@ spec:
       containers:
       - name: {{ .Release.Name }}-servicex-app
         image: {{ .Values.app.image }}:{{ .Values.app.tag }}
+        {{- if eq .Values.app.environment "dev" }}
+        {{- if .Values.app.reload }}
+        command: [ "./boot.sh", "--reload" ]
+        {{- end }}
+        {{- end }}
         env:
         - name: APP_CONFIG_FILE
           value: "/opt/servicex/app.conf"
@@ -135,6 +140,12 @@
         {{- end }}
         volumeMounts:
+          {{- if eq .Values.app.environment "dev" }}
+          {{- if .Values.app.mountLocal }}
+          - name: host-volume
+            mountPath: /home/servicex
+          {{- end }}
+          {{- end }}
          - name: app-cfg
            mountPath: /opt/servicex
          - name: sqlite
@@ -149,6 +160,14 @@
         - containerPort: 5000
       volumes:
+        {{- if eq .Values.app.environment "dev" }}
+        {{- if .Values.app.mountLocal }}
+        - name: host-volume
+          hostPath:
+            path: /mnt/servicex/servicex_app
+            type: DirectoryOrCreate
+        {{- end }}
+        {{- end }}
        - name: app-cfg
          configMap:
            name: {{ .Release.Name }}-flask-config
diff --git a/helm/servicex/values.yaml b/helm/servicex/values.yaml
index a4bd5a29..e6b235dd 100644
--- a/helm/servicex/values.yaml
+++ b/helm/servicex/values.yaml
@@ -1,4 +1,7 @@
 app:
+  environment: production
+  mountLocal: false
+  reload: false
   adminEmail: admin@example.com
   auth: false
   authExpires: 21600
diff --git a/local/port-forward.sh b/local/port-forward.sh
new file mode 100755
index 00000000..ad3e3c4b
--- /dev/null
+++ b/local/port-forward.sh
@@ -0,0 +1,209 @@
+#!/usr/bin/env bash
+
+set -euo pipefail
+
+# Constants
+readonly NAMESPACE="${NAMESPACE:-default}"
+readonly PING_INTERVAL=5
+readonly MAX_RETRIES=10
+
+# Parse service configuration
+case "${1:-}" in
+    app)
+        SERVICE_NAME="servicex-servicex-app"
+        CONTAINER_PORT="8000"
+        LOCAL_PORT="5000"
+        ;;
+    minio)
+        SERVICE_NAME="servicex-minio"
+        CONTAINER_PORT="9000"
+        LOCAL_PORT="9000"
+        ;;
+    db)
+        SERVICE_NAME="servicex-postgresql"
+        CONTAINER_PORT="5432"
+        LOCAL_PORT="5432"
+        ;;
+    *)
+        echo "Usage: $0 [app|minio|db]"
+        echo "  app   - Port forward to ServiceX app (5000 -> 8000)"
+        echo "  minio - Port forward to Minio (9000 -> 9000)"
+        echo "  db    - Port forward to PostgreSQL (5432 -> 5432)"
+        exit 1
+        ;;
+esac
+
+readonly SERVICE_TYPE="$1"
+
+# Function to log with timestamp
+log() {
+    echo "[$(date '+%Y-%m-%d %H:%M:%S')] $*" >&2
+}
+
+# Function to check if service exists
+check_service_exists() {
+    kubectl get service "$SERVICE_NAME" --namespace="$NAMESPACE" >/dev/null 2>&1
+}
+
+# Function to wait for pod to be ready
+wait_for_pod_ready() {
+    log "Waiting for pods of service $SERVICE_NAME to be ready..."
+    local retries=0
+
+    while [[ $retries -lt $MAX_RETRIES ]]; do
+        # Get pods for the service using selector
+        local selector=$(kubectl get service "$SERVICE_NAME" --namespace="$NAMESPACE" -o jsonpath='{.spec.selector}' 2>/dev/null || echo "")
+
+        if [[ -n "$selector" ]]; then
+            # Convert JSON selector to kubectl selector format
+            local kubectl_selector=$(echo "$selector" | sed 's/[{}"]//g' | sed 's/:/=/g')
+
+            # Check if any pods are running (a proxy for readiness)
+            local ready_pods=$(kubectl get pods --namespace="$NAMESPACE" --selector="$kubectl_selector" --field-selector=status.phase=Running -o name 2>/dev/null | wc -l)
+
+            if [[ $ready_pods -gt 0 ]]; then
+                log "Found $ready_pods ready pod(s) for service $SERVICE_NAME"
+                return 0
+            fi
+        fi
+
+        retries=$((retries + 1))
+        log "No ready pods found, retrying... ($retries/$MAX_RETRIES)"
+        sleep 5
+    done
+
+    log "No ready pods found for service $SERVICE_NAME after $MAX_RETRIES attempts"
+    return 1
+}
+
+# Function to wait for service availability
+wait_for_service() {
+    log "Waiting for service $SERVICE_NAME to be available..."
+    local retries=0
+
+    while ! check_service_exists && [[ $retries -lt $MAX_RETRIES ]]; do
+        retries=$((retries + 1))
+        log "Service not found, retrying... ($retries/$MAX_RETRIES)"
+        sleep 3
+    done
+
+    if [[ $retries -ge $MAX_RETRIES ]]; then
+        log "Service $SERVICE_NAME not found after $MAX_RETRIES attempts"
+        return 1
+    fi
+
+    log "Service $SERVICE_NAME is available"
+    return 0
+}
+
+# Function to start port forwarding using service
+start_port_forward() {
+    log "Starting port forwarding: localhost:${LOCAL_PORT} -> ${SERVICE_NAME}:${CONTAINER_PORT}"
+
+    # Check if pods are still ready before attempting port forward
+    if ! wait_for_pod_ready; then
+        log "Pods not ready, cannot start port forwarding"
+        return 1
+    fi
+
+    kubectl port-forward --namespace="$NAMESPACE" "service/$SERVICE_NAME" "${LOCAL_PORT}:${CONTAINER_PORT}" &
+    PORT_FORWARD_PID=$!
+
+    # Give it a moment to establish connection
+    sleep 2
+
+    # Check if port forward process is still running
+    if ! kill -0 "$PORT_FORWARD_PID" 2>/dev/null; then
+        log "Port forwarding failed to start"
+        return 1
+    fi
+
+    log "Port forwarding established (PID: $PORT_FORWARD_PID)"
+    return 0
+}
+
+# Function to check if port forwarding is working
+check_port_forward() {
+    # Simply check if the port forward process is still running
+    if ! kill -0 "$PORT_FORWARD_PID" 2>/dev/null; then
+        log "Port forward process is not running"
+        return 1
+    fi
+
+    # Additional check: verify port is actually listening
+    if command -v nc >/dev/null 2>&1; then
+        if ! nc -z localhost "$LOCAL_PORT" 2>/dev/null; then
+            log "Port $LOCAL_PORT is not accessible"
+            return 1
+        fi
+    fi
+
+    return 0
+}
+
+# Function to clean up resources
+cleanup() {
+    log "Cleaning up..."
+    if [[ -n "${PORT_FORWARD_PID:-}" ]]; then
+        log "Terminating port forwarding process (PID: $PORT_FORWARD_PID)"
+        kill "$PORT_FORWARD_PID" 2>/dev/null || true
+        wait "$PORT_FORWARD_PID" 2>/dev/null || true
+    fi
+    log "Cleanup complete"
+    exit 0
+}
+
+# Set trap for cleanup
+trap cleanup SIGINT SIGTERM EXIT
+
+# Main execution
+main() {
+    log "Starting port forwarding for $SERVICE_TYPE: $SERVICE_NAME"
+
+    # Wait for service to be available
+    if ! wait_for_service; then
+        log "Service $SERVICE_NAME is not available"
+        exit 1
+    fi
+
+    # Wait for pods to be ready
+    if ! wait_for_pod_ready; then
+        log "No ready pods found for service $SERVICE_NAME"
+        exit 1
+    fi
+
+    # Main monitoring loop
+    while true; do
+        # Start port forwarding if not already running
+        if [[ -z "${PORT_FORWARD_PID:-}" ]] || ! kill -0 "$PORT_FORWARD_PID" 2>/dev/null; then
+            log "Starting port forwarding..."
+            if ! start_port_forward; then
+                log "Failed to start port forwarding, retrying in $PING_INTERVAL seconds..."
+                # Reset PID if it exists but failed
+                if [[ -n "${PORT_FORWARD_PID:-}" ]]; then
+                    unset PORT_FORWARD_PID
+                fi
+                sleep "$PING_INTERVAL"
+                continue
+            fi
+        fi
+
+        # Check if port forwarding is working
+        if ! check_port_forward; then
+            log "Port forwarding failed, restarting..."
+            if [[ -n "${PORT_FORWARD_PID:-}" ]]; then
+                kill "$PORT_FORWARD_PID" 2>/dev/null || true
+                wait "$PORT_FORWARD_PID" 2>/dev/null || true
+                unset PORT_FORWARD_PID
+            fi
+            sleep 2
+            continue
+        fi
+
+        # Wait before next check
+        sleep "$PING_INTERVAL"
+    done
+}
+
+# Run main function
+main
diff --git a/servicex_app/boot.sh b/servicex_app/boot.sh
index 71801c24..2b5dfab5 100755
--- a/servicex_app/boot.sh
+++ b/servicex_app/boot.sh
@@ -1,4 +1,17 @@
 #!/bin/sh
+
+# Initialize reload flag
+RELOAD=""
+
+# Parse command line arguments
+for arg in "$@"
+do
+    if [ "$arg" = "--reload" ]; then
+        RELOAD="--reload"
+        break
+    fi
+done
+
 mkdir instance
 # SQLite doesn't handle migrations, so rely on SQLAlchemy table creation
 if grep "sqlite://" $APP_CONFIG_FILE; then
@@ -7,5 +20,5 @@ else
   FLASK_APP=servicex_app/app.py flask db upgrade;
 fi
 [ -d "/default_users" ] && python3 servicex/cli/create_default_users.py
-exec gunicorn -b [::]:5000 --workers=5 --threads=1 --timeout 120 --log-level=warning --access-logfile /tmp/gunicorn.log --error-logfile - "servicex_app:create_app()"
+exec gunicorn -b [::]:5000 $RELOAD --workers=5 --threads=1 --timeout 120 --log-level=warning --access-logfile /tmp/gunicorn.log --error-logfile - "servicex_app:create_app()"
 # to log requests to stdout --access-logfile -
\ No newline at end of file
diff --git a/servicex_app/servicex_app/transformer_manager.py b/servicex_app/servicex_app/transformer_manager.py
index 46cbbf35..fd419a41 100644
--- a/servicex_app/servicex_app/transformer_manager.py
+++ b/servicex_app/servicex_app/transformer_manager.py
@@ -41,7 +41,7 @@ class TransformerManager:

     # This is the number of seconds that the transformer has to finish uploading
     # objects to the object store before it is terminated.
-    POD_TERMINATION_GRACE_PERIOD = 5*60
+    POD_TERMINATION_GRACE_PERIOD = 5 * 60

     @classmethod
     def make_api(cls, celery_app):
         return cls
@@ -49,35 +49,39 @@ def make_api(cls, celery_app):
         return cls

     def __init__(self, manager_mode):
-        if manager_mode == 'internal-kubernetes':
+        if manager_mode == "internal-kubernetes":
             kubernetes.config.load_incluster_config()
-        elif manager_mode == 'external-kubernetes':
+        elif manager_mode == "external-kubernetes":
             kubernetes.config.load_kube_config()
         else:
             current_app.logger.error(f"Manager mode {manager_mode} not valid")
-            raise ValueError('Manager mode '+manager_mode+' not valid')
+            raise ValueError("Manager mode " + manager_mode + " not valid")

     def start_transformers(self, config: dict, request_rec: TransformRequest):
         """
         Start the transformers for a given request
         """
-        rabbitmq_uri = config['TRANSFORMER_RABBIT_MQ_URL']
-        namespace = config['TRANSFORMER_NAMESPACE']
-        x509_secret = config['TRANSFORMER_X509_SECRET']
+        rabbitmq_uri = config["TRANSFORMER_RABBIT_MQ_URL"]
+        namespace = config["TRANSFORMER_NAMESPACE"]
+        x509_secret = config["TRANSFORMER_X509_SECRET"]
         generated_code_cm = request_rec.generated_code_cm
         request_rec.workers = min(max(1, request_rec.files), request_rec.workers)

         current_app.logger.info(
             f"Launching {request_rec.workers} transformers.",
-            extra={'requestId': request_rec.request_id})
+            extra={"requestId": request_rec.request_id},
+        )
         self.launch_transformer_jobs(
-            image=request_rec.image, request_id=request_rec.request_id,
+            image=request_rec.image,
+            request_id=request_rec.request_id,
             workers=request_rec.workers,
-            max_workers=(max(1, request_rec.files)
-                         if request_rec.status == TransformStatus.running
-                         else config["TRANSFORMER_MAX_REPLICAS"]),
+            max_workers=(
+                max(1, request_rec.files)
+                if request_rec.status == TransformStatus.running
+                else config["TRANSFORMER_MAX_REPLICAS"]
+            ),
             rabbitmq_uri=rabbitmq_uri,
             namespace=namespace,
             x509_secret=x509_secret,
@@ -85,244 +89,331 @@ def start_transformers(self, config: dict, request_rec: TransformRequest):
             result_destination=request_rec.result_destination,
             result_format=request_rec.result_format,
             transformer_language=request_rec.transformer_language,
-            transformer_command=request_rec.transformer_command
+            transformer_command=request_rec.transformer_command,
         )

     @staticmethod
-    def create_job_object(request_id, image, rabbitmq_uri, workers,
-                          result_destination, result_format, x509_secret,
-                          generated_code_cm, transformer_language, transformer_command):
+    def create_job_object(
+        request_id,
+        image,
+        rabbitmq_uri,
+        workers,
+        result_destination,
+        result_format,
+        x509_secret,
+        generated_code_cm,
+        transformer_language,
+        transformer_command,
+    ):
         volume_mounts = []
         volumes = []

         # append sidecar volume
-        output_path = current_app.config['TRANSFORMER_SIDECAR_VOLUME_PATH']
+        output_path = current_app.config["TRANSFORMER_SIDECAR_VOLUME_PATH"]
         volume_mounts.append(
-            client.V1VolumeMount(
-                name='sidecar-volume',
-                mount_path=output_path)
+            client.V1VolumeMount(name="sidecar-volume", mount_path=output_path)
         )
         volumes.append(
             client.V1Volume(
-                name='sidecar-volume',
-                empty_dir=client.V1EmptyDirVolumeSource())
+                name="sidecar-volume", empty_dir=client.V1EmptyDirVolumeSource()
+            )
         )

+        # Only mount local volumes in development environment with mountLocal enabled
+        if current_app.config.get(
+            "APP_ENVIRONMENT"
+        ) == "dev" and current_app.config.get("APP_MOUNT_LOCAL"):
+
+            volume_mounts.append(
+                client.V1VolumeMount(name="host-volume", mount_path="/servicex")
+            )
+            volumes.append(
+                client.V1Volume(
+                    name="host-volume",
+                    host_path=client.V1HostPathVolumeSource(
+                        path="/mnt/servicex/transformer_sidecar/src",
+                        type="DirectoryOrCreate",
+                    ),
+                )
+            )
+            volume_mounts.append(
+                client.V1VolumeMount(
+                    name="host-scripts-volume", mount_path="/servicex/scripts"
+                )
+            )
+            volumes.append(
+                client.V1Volume(
+                    name="host-scripts-volume",
+                    host_path=client.V1HostPathVolumeSource(
+                        path="/mnt/servicex/transformer_sidecar/scripts",
+                        type="DirectoryOrCreate",
+                    ),
+                )
+            )
+
         if x509_secret:
             volume_mounts.append(
                 client.V1VolumeMount(
-                    name='x509-secret',
-                    mount_path='/etc/grid-security-ro')
+                    name="x509-secret", mount_path="/etc/grid-security-ro"
+                )
+            )
+            volumes.append(
+                client.V1Volume(
+                    name="x509-secret",
+                    secret=client.V1SecretVolumeSource(secret_name=x509_secret),
+                )
             )
-            volumes.append(client.V1Volume(
-                name='x509-secret',
-                secret=client.V1SecretVolumeSource(secret_name=x509_secret)
-            ))

         if generated_code_cm:
-            volumes.append(client.V1Volume(
-                name='generated-code',
-                config_map=client.V1ConfigMapVolumeSource(
-                    name=generated_code_cm)
-            )
+            volumes.append(
+                client.V1Volume(
+                    name="generated-code",
+                    config_map=client.V1ConfigMapVolumeSource(name=generated_code_cm),
+                )
             )
             volume_mounts.append(
-                client.V1VolumeMount(mount_path="/generated", name='generated-code'))
+                client.V1VolumeMount(mount_path="/generated", name="generated-code")
+            )

         if "TRANSFORMER_LOCAL_PATH" in current_app.config:
-            path = current_app.config['TRANSFORMER_LOCAL_PATH']
-            volumes.append(client.V1Volume(
-                name='rootfiles',
-                host_path=client.V1HostPathVolumeSource(path=path)))
+            path = current_app.config["TRANSFORMER_LOCAL_PATH"]
+            volumes.append(
+                client.V1Volume(
+                    name="rootfiles", host_path=client.V1HostPathVolumeSource(path=path)
+                )
+            )
             volume_mounts.append(
-                client.V1VolumeMount(mount_path="/data", name='rootfiles'))
+                client.V1VolumeMount(mount_path="/data", name="rootfiles")
+            )

         # Compute Environment Vars
         env = [client.V1EnvVar(name="BASH_ENV", value="/servicex/.bashrc")]

         # provide pods with level and logging server info
         env += [
-            client.V1EnvVar("LOG_LEVEL", value=os.environ.get('LOG_LEVEL', 'INFO').upper()),
-            client.V1EnvVar("LOGSTASH_HOST", value=os.environ.get('LOGSTASH_HOST')),
-            client.V1EnvVar("LOGSTASH_PORT", value=os.environ.get('LOGSTASH_PORT'))
+            client.V1EnvVar(
+                "LOG_LEVEL", value=os.environ.get("LOG_LEVEL", "INFO").upper()
+            ),
+            client.V1EnvVar("LOGSTASH_HOST", value=os.environ.get("LOGSTASH_HOST")),
+            client.V1EnvVar("LOGSTASH_PORT", value=os.environ.get("LOGSTASH_PORT")),
         ]

         # Provide each pod with an environment var holding that pod's name
         pod_name_value_from = client.V1EnvVarSource(
-            field_ref=client.V1ObjectFieldSelector(
-                field_path="metadata.name"))
+            field_ref=client.V1ObjectFieldSelector(field_path="metadata.name")
+        )
         host_name_value_from = client.V1EnvVarSource(
-            field_ref=client.V1ObjectFieldSelector(
-                field_path="spec.nodeName"))
+            field_ref=client.V1ObjectFieldSelector(field_path="spec.nodeName")
+        )
         env += [
             client.V1EnvVar("POD_NAME", value_from=pod_name_value_from),
-            client.V1EnvVar("HOST_NAME", value_from=host_name_value_from)
+            client.V1EnvVar("HOST_NAME", value_from=host_name_value_from),
         ]

         # Provide each pod with an environment var holding that instance name
         if "INSTANCE_NAME" in current_app.config:
-            instance_name = current_app.config['INSTANCE_NAME']
-            env_var_instance_name = client.V1EnvVar("INSTANCE_NAME",
-                                                    value=instance_name)
+            instance_name = current_app.config["INSTANCE_NAME"]
+            env_var_instance_name = client.V1EnvVar(
+                "INSTANCE_NAME", value=instance_name
+            )
             env = env + [env_var_instance_name]

         # provide each pod with an environment var holding cache prefix path
         if "TRANSFORMER_CACHE_PREFIX" in current_app.config:
-            env += [client.V1EnvVar("CACHE_PREFIX",
-                                    value=current_app.config["TRANSFORMER_CACHE_PREFIX"])]
+            env += [
+                client.V1EnvVar(
+                    "CACHE_PREFIX", value=current_app.config["TRANSFORMER_CACHE_PREFIX"]
+                )
+            ]

-        if result_destination == 'object-store':
+        if result_destination == "object-store":
             env = env + [
-                client.V1EnvVar(name='MINIO_URL',
-                                value=current_app.config['MINIO_URL_TRANSFORMER']),
-                client.V1EnvVar(name='MINIO_ACCESS_KEY',
-                                value=current_app.config['MINIO_ACCESS_KEY']),
-                client.V1EnvVar(name='MINIO_SECRET_KEY',
-                                value=current_app.config['MINIO_SECRET_KEY']),
+                client.V1EnvVar(
+                    name="MINIO_URL", value=current_app.config["MINIO_URL_TRANSFORMER"]
+                ),
+                client.V1EnvVar(
+                    name="MINIO_ACCESS_KEY",
+                    value=current_app.config["MINIO_ACCESS_KEY"],
+                ),
+                client.V1EnvVar(
+                    name="MINIO_SECRET_KEY",
+                    value=current_app.config["MINIO_SECRET_KEY"],
+                ),
             ]
-            if 'MINIO_ENCRYPT' in current_app.config:
-                env += [client.V1EnvVar(name='MINIO_ENCRYPT',
-                                        value=str(current_app.config['MINIO_ENCRYPT']))]
+            if "MINIO_ENCRYPT" in current_app.config:
+                env += [
+                    client.V1EnvVar(
+                        name="MINIO_ENCRYPT",
+                        value=str(current_app.config["MINIO_ENCRYPT"]),
+                    )
+                ]

-        if result_destination == 'volume':
+        if result_destination == "volume":
             TransformerManager.create_posix_volume(volumes, volume_mounts)

         science_command = " "
         if x509_secret:
-            science_command += "until [ -f /servicex/output/scripts/proxy-exporter.sh ];" \
-                               "do sleep 5;done &&" \
-                               " /servicex/output/scripts/proxy-exporter.sh & sleep 5 && "
-
-        sidecar_command = "PYTHONPATH=/servicex/transformer_sidecar:$PYTHONPATH " + \
-                          "python /servicex/transformer_sidecar/transformer.py " + \
-                          " --shared-dir /servicex/output " + \
-                          " --request-id " + request_id + \
-                          " --rabbit-uri " + rabbitmq_uri + \
-                          " --result-destination " + result_destination + \
-                          " --result-format " + result_format
-
-        watch_path = os.path.join(current_app.config['TRANSFORMER_SIDECAR_VOLUME_PATH'],
-                                  request_id)
-        science_command += "cp /generated/transformer_capabilities.json {op} && " \
-                           "PYTHONPATH=/generated:$PYTHONPATH " \
-                           "bash {op}/scripts/watch.sh ".format(op=output_path) + \
-                           "{TL} ".format(TL=transformer_language) + \
-                           "{TC} ".format(TC=transformer_command) + \
-                           watch_path
-
-        if result_destination == 'volume':
+            science_command += (
+                "until [ -f /servicex/output/scripts/proxy-exporter.sh ];"
+                "do sleep 5;done &&"
+                " /servicex/output/scripts/proxy-exporter.sh & sleep 5 && "
+            )
+
+        sidecar_command = (
+            "PYTHONPATH=/servicex/transformer_sidecar:$PYTHONPATH "
+            + "python /servicex/transformer_sidecar/transformer.py "
+            + " --shared-dir /servicex/output "
+            + " --request-id "
+            + request_id
+            + " --rabbit-uri "
+            + rabbitmq_uri
+            + " --result-destination "
+            + result_destination
+            + " --result-format "
+            + result_format
+        )
+
+        watch_path = os.path.join(
+            current_app.config["TRANSFORMER_SIDECAR_VOLUME_PATH"], request_id
+        )
+        science_command += (
+            "cp /generated/transformer_capabilities.json {op} && "
+            "PYTHONPATH=/generated:$PYTHONPATH "
+            "bash {op}/scripts/watch.sh ".format(op=output_path)
+            + "{TL} ".format(TL=transformer_language)
+            + "{TC} ".format(TC=transformer_command)
+            + watch_path
+        )
+
+        if result_destination == "volume":
             sidecar_command += " --output-dir " + os.path.join(
                 TransformerManager.POSIX_VOLUME_MOUNT,
-                current_app.config['TRANSFORMER_PERSISTENCE_SUBDIR'])
+                current_app.config["TRANSFORMER_PERSISTENCE_SUBDIR"],
+            )

         resources = client.V1ResourceRequirements(
-            limits={"cpu": current_app.config['TRANSFORMER_CPU_LIMIT'],
-                    "memory": current_app.config['TRANSFORMER_MEMORY_LIMIT']}
+            limits={
+                "cpu": current_app.config["TRANSFORMER_CPU_LIMIT"],
+                "memory": current_app.config["TRANSFORMER_MEMORY_LIMIT"],
+            }
         )

         # Configure Pod template container
         science_container = client.V1Container(
             name="transformer",
             image=image,
-            image_pull_policy=current_app.config['TRANSFORMER_SCIENCE_IMAGE_PULL_POLICY'],
+            image_pull_policy=current_app.config[
+                "TRANSFORMER_SCIENCE_IMAGE_PULL_POLICY"
+            ],
             volume_mounts=volume_mounts,
             command=["bash", "--login", "-c"],
             env=env,
             args=[science_command],
-            resources=resources
+            resources=resources,
         )

         # Configure Pod template container
         sidecar = client.V1Container(
             name="sidecar",
-            image=current_app.config['TRANSFORMER_SIDECAR_IMAGE'],
-            image_pull_policy=current_app.config['TRANSFORMER_SIDECAR_PULL_POLICY'],
+            image=current_app.config["TRANSFORMER_SIDECAR_IMAGE"],
+            image_pull_policy=current_app.config["TRANSFORMER_SIDECAR_PULL_POLICY"],
             volume_mounts=volume_mounts,
             command=["bash", "-c"],
             env=env,
             args=[sidecar_command],
-            resources=resources
+            resources=resources,
         )

         # Create and Configure a spec section
         template = client.V1PodTemplateSpec(
-            metadata=client.V1ObjectMeta(labels={'app': "transformer-" + request_id}),
+            metadata=client.V1ObjectMeta(labels={"app": "transformer-" + request_id}),
             spec=client.V1PodSpec(
                 restart_policy="Always",
                 termination_grace_period_seconds=TransformerManager.POD_TERMINATION_GRACE_PERIOD,
-                priority_class_name=current_app.config.get('TRANSFORMER_PRIORITY_CLASS', None),
-                containers=[sidecar, science_container],  # Containers are started in this order
-                volumes=volumes))
+                priority_class_name=current_app.config.get(
+                    "TRANSFORMER_PRIORITY_CLASS", None
+                ),
+                containers=[
+                    sidecar,
+                    science_container,
+                ],  # Containers are started in this order
+                volumes=volumes,
+            ),
+        )

         # Create the specification of deployment
         selector = client.V1LabelSelector(
-            match_labels={
-                "app": "transformer-" + request_id
-            })
+            match_labels={"app": "transformer-" + request_id}
+        )

         # If we are using Autoscaler then always start with one replica
-        if current_app.config['TRANSFORMER_AUTOSCALE_ENABLED']:
-            replicas = current_app.config.get('TRANSFORMER_MIN_REPLICAS', 1)
+        if current_app.config["TRANSFORMER_AUTOSCALE_ENABLED"]:
+            replicas = current_app.config.get("TRANSFORMER_MIN_REPLICAS", 1)
         else:
             replicas = workers

         spec = client.V1DeploymentSpec(
-            template=template,
-            selector=selector,
-            replicas=replicas
+            template=template, selector=selector, replicas=replicas
         )

         deployment = client.V1Deployment(
             api_version="apps/v1",
             kind="Deployment",
             metadata=client.V1ObjectMeta(name="transformer-" + request_id),
-            spec=spec
+            spec=spec,
         )

         return deployment

     @staticmethod
     def create_posix_volume(volumes, volume_mounts):
-        if 'TRANSFORMER_PERSISTENCE_PROVIDED_CLAIM' not in current_app.config or \
-                not current_app.config['TRANSFORMER_PERSISTENCE_PROVIDED_CLAIM']:
+        if (
+            "TRANSFORMER_PERSISTENCE_PROVIDED_CLAIM" not in current_app.config
+            or not current_app.config["TRANSFORMER_PERSISTENCE_PROVIDED_CLAIM"]
+        ):
             empty_dir = client.V1Volume(
-                name='posix-volume',
-                empty_dir=client.V1EmptyDirVolumeSource())
+                name="posix-volume", empty_dir=client.V1EmptyDirVolumeSource()
+            )
             volumes.append(empty_dir)
         else:
             volumes.append(
                 client.V1Volume(
-                    name='posix-volume',
+                    name="posix-volume",
                     persistent_volume_claim=client.V1PersistentVolumeClaimVolumeSource(
-                        claim_name=current_app.config['TRANSFORMER_PERSISTENCE_PROVIDED_CLAIM']
-                    )
+                        claim_name=current_app.config[
+                            "TRANSFORMER_PERSISTENCE_PROVIDED_CLAIM"
+                        ]
+                    ),
                 )
             )
         volume_mounts.append(
-            client.V1VolumeMount(mount_path=TransformerManager.POSIX_VOLUME_MOUNT,
-                                 name='posix-volume'))
+            client.V1VolumeMount(
+                mount_path=TransformerManager.POSIX_VOLUME_MOUNT, name="posix-volume"
+            )
+        )

     def persistent_volume_claim_exists(self, claim_name, namespace):
         api = client.CoreV1Api()
-        pvcs = api.list_namespaced_persistent_volume_claim(namespace=namespace, watch=False)
+        pvcs = api.list_namespaced_persistent_volume_claim(
+            namespace=namespace, watch=False
+        )
         for pvc in pvcs.items:
             if pvc.metadata.name == claim_name:
-                if pvc.status.phase == 'Bound':
+                if pvc.status.phase == "Bound":
                     return True
                 else:
-                    current_app.logger.warning(f"Volume Claim '{claim_name} found, "
-                                               f"but it is not bound")
+                    current_app.logger.warning(
+                        f"Volume Claim '{claim_name}' found, but it is not bound"
+                    )
                     return False
         return False

     @staticmethod
     def create_hpa_object(request_id, max_workers):
         target = client.V1CrossVersionObjectReference(
-            api_version="apps/v1",
-            kind='Deployment',
-            name="transformer-" + request_id
+            api_version="apps/v1", kind="Deployment", name="transformer-" + request_id
         )
         cfg = current_app.config
@@ -331,13 +422,13 @@ def create_hpa_object(request_id, max_workers):
             scale_target_ref=target,
             target_cpu_utilization_percentage=cfg["TRANSFORMER_CPU_SCALE_THRESHOLD"],
             min_replicas=min(max_workers, cfg["TRANSFORMER_MIN_REPLICAS"]),
-            max_replicas=min(max_workers, cfg["TRANSFORMER_MAX_REPLICAS"])
+            max_replicas=min(max_workers, cfg["TRANSFORMER_MAX_REPLICAS"]),
         )
         hpa = client.V1HorizontalPodAutoscaler(
             api_version="autoscaling/v1",
-            kind='HorizontalPodAutoscaler',
+            kind="HorizontalPodAutoscaler",
             metadata=client.V1ObjectMeta(name="transformer-" + request_id),
-            spec=spec
+            spec=spec,
         )
         return hpa
@@ -354,26 +445,44 @@ def _create_job(api_instance, job, namespace):
     def _create_hpa(api_instance, hpa, namespace):
         try:
             api_instance.create_namespaced_horizontal_pod_autoscaler(
-                body=hpa,
-                namespace=namespace)
+                body=hpa, namespace=namespace
+            )
             current_app.logger.info("HPA created.")
         except ApiException as e:
             current_app.logger.exception(f"Exception during HPA Creation: {e}")

-    def launch_transformer_jobs(self, image, request_id, workers, max_workers,
-                                rabbitmq_uri, namespace, x509_secret, generated_code_cm,
-                                result_destination, result_format, transformer_language,
-                                transformer_command
-                                ):
+    def launch_transformer_jobs(
+        self,
+        image,
+        request_id,
+        workers,
+        max_workers,
+        rabbitmq_uri,
+        namespace,
+        x509_secret,
+        generated_code_cm,
+        result_destination,
+        result_format,
+        transformer_language,
+        transformer_command,
+    ):
         api_v1 = client.AppsV1Api()
-        job = self.create_job_object(request_id, image, rabbitmq_uri, workers,
-                                     result_destination, result_format,
-                                     x509_secret, generated_code_cm,
-                                     transformer_language, transformer_command)
+        job = self.create_job_object(
+            request_id,
+            image,
+            rabbitmq_uri,
+            workers,
+            result_destination,
+            result_format,
+            x509_secret,
+            generated_code_cm,
+            transformer_language,
+            transformer_command,
+        )
         self._create_job(api_v1, job, namespace)

-        if current_app.config['TRANSFORMER_AUTOSCALE_ENABLED']:
+        if current_app.config["TRANSFORMER_AUTOSCALE_ENABLED"]:
             autoscaler_api = kubernetes.client.AutoscalingV1Api()
             hpa = self.create_hpa_object(request_id, max_workers)
             self._create_hpa(autoscaler_api, hpa, namespace)
@@ -381,48 +490,54 @@ def launch_transformer_jobs(self, image, request_id, workers, max_workers,
     @classmethod
     def shutdown_transformer_job(cls, request_id, namespace):
         try:
-            if current_app.config['TRANSFORMER_AUTOSCALE_ENABLED']:
+            if current_app.config["TRANSFORMER_AUTOSCALE_ENABLED"]:
                 autoscaler_api = kubernetes.client.AutoscalingV1Api()
                 autoscaler_api.delete_namespaced_horizontal_pod_autoscaler(
-                    name="transformer-" + request_id,
-                    namespace=namespace
+                    name="transformer-" + request_id, namespace=namespace
                 )
         except ApiException:
-            current_app.logger.exception("Exception during Job HPA Shut Down", extra={
-                "requestId": request_id})
+            current_app.logger.exception(
+                "Exception during Job HPA Shut Down", extra={"requestId": request_id}
+            )

         try:
             api_core = client.CoreV1Api()
             configmap_name = "{}-generated-source".format(request_id)
-            api_core.delete_namespaced_config_map(name=configmap_name,
-                                                  namespace=namespace)
+            api_core.delete_namespaced_config_map(
+                name=configmap_name, namespace=namespace
+            )
         except ApiException:
-            current_app.logger.exception("Exception during Job ConfigMap cleanup", extra={
-                "requestId": request_id})
+            current_app.logger.exception(
+                "Exception during Job ConfigMap cleanup",
+                extra={"requestId": request_id},
+            )

         try:
             api_v1 = client.AppsV1Api()
             api_v1.delete_namespaced_deployment(
-                name="transformer-" + request_id,
-                namespace=namespace
+                name="transformer-" + request_id, namespace=namespace
             )
         except ApiException:
-            current_app.logger.exception("Exception during Job Deployment Shut Down", extra={
-                "requestId": request_id})
+            current_app.logger.exception(
+                "Exception during Job Deployment Shut Down",
+                extra={"requestId": request_id},
+            )

         # delete RabbitMQ queue
         try:
-            current_app.logger.info(f"Stopping workers connected to transformer-{request_id}")
+            current_app.logger.info(
+                f"Stopping workers connected to transformer-{request_id}"
+            )
             cls.celery_app.control.cancel_consumer(f"transformer-{request_id}")
         except Exception as e:
-            current_app.logger.exception("Exception during Celery queue cancellation", extra={
-                "requestId": request_id,
-                "exception": e
-            })
+            current_app.logger.exception(
+                "Exception during Celery queue cancellation",
+                extra={"requestId": request_id, "exception": e},
+            )

     @staticmethod
     def get_deployment_status(
-        request_id: str
+        request_id: str,
     ) -> Optional[kubernetes.client.models.v1_deployment_status.V1DeploymentStatus]:
         namespace = current_app.config["TRANSFORMER_NAMESPACE"]
         api = client.AppsV1Api()
@@ -438,9 +553,8 @@ def get_deployment_status(
     def create_configmap_from_zip(zipfile, request_id, namespace):
         configmap_name = "{}-generated-source".format(request_id)
         data = {
-            file.filename:
-                base64.b64encode(zipfile.open(file).read()).decode("ascii") for file in
-            zipfile.filelist
+            file.filename: base64.b64encode(zipfile.open(file).read()).decode("ascii")
+            for file in zipfile.filelist
         }

         metadata = client.V1ObjectMeta(
@@ -450,14 +564,9 @@ def create_configmap_from_zip(zipfile, request_id, namespace):

         # Instantiate the configmap object
         configmap = client.V1ConfigMap(
-            api_version="v1",
-            kind="ConfigMap",
-            binary_data=data,
-            metadata=metadata
+            api_version="v1", kind="ConfigMap", binary_data=data, metadata=metadata
        )

         api_instance = client.CoreV1Api()
-        api_instance.create_namespaced_config_map(
-            namespace=namespace,
-            body=configmap)
+        api_instance.create_namespaced_config_map(namespace=namespace, body=configmap)
         return configmap_name
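diff --git a/local/README.md b/local/README.md
new file mode 100644
--- /dev/null
+++ b/local/README.md
@@ -0,0 +1,19 @@
+# Local development workflow (sketch)
+
+A minimal sketch of how the pieces in this change fit together, assuming a
+Procfile runner such as foreman, overmind, or hivemind is installed and
+minikube is already running. The variable names come from the Procfile; the
+dev values file name below is hypothetical.
+
+```bash
+# Hypothetical bring-up; adjust paths to your checkout.
+export LOCAL_DIR="$(pwd)"
+export CHART_DIR="$(pwd)/helm/servicex"
+# A values file that sets app.environment: dev, app.mountLocal: true, app.reload: true
+export VALUES_FILE="$(pwd)/local/values-dev.yaml"
+overmind start  # mounts the source into minikube, installs the chart,
+                # and port-forwards app (5000), minio (9000), and db (5432)
+```
+
+With app.mountLocal enabled, the deployment hostPath-mounts /mnt/servicex/servicex_app
+into the app pod, and --reload makes gunicorn restart workers when code changes.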