From 111c8358e742015f66043ac9309505e2a8152acf Mon Sep 17 00:00:00 2001
From: Matt Shirley
Date: Fri, 2 May 2025 13:46:30 -0500
Subject: [PATCH 1/8] improve local dev for server-side development

---
 Procfile                                    |   4 +
 helm/servicex/templates/app/deployment.yaml |  19 ++
 helm/servicex/values.yaml                   |   1 +
 local/port-forward.sh                       | 259 ++++++++++++++++++++
 servicex_app/boot.sh                        |  15 +-
 5 files changed, 297 insertions(+), 1 deletion(-)
 create mode 100644 Procfile
 create mode 100644 local/port-forward.sh

diff --git a/Procfile b/Procfile
new file mode 100644
index 000000000..61d4ca9e2
--- /dev/null
+++ b/Procfile
@@ -0,0 +1,4 @@
+minikube-mount: minikube start; minikube mount $LOCAL_DIR/servicex_app:/mnt/servicex & sleep 5 && cd $CHART_DIR && helm install -f $VALUES_FILE servicex . && sleep infinity
+port-forward-app: cd $LOCAL_DIR && bash local/port-forward.sh app
+port-forward-minio: cd $LOCAL_DIR && bash local/port-forward.sh minio
+port-forward-db: cd $LOCAL_DIR && bash local/port-forward.sh db
\ No newline at end of file
diff --git a/helm/servicex/templates/app/deployment.yaml b/helm/servicex/templates/app/deployment.yaml
index ab0f817b3..02e5010c9 100644
--- a/helm/servicex/templates/app/deployment.yaml
+++ b/helm/servicex/templates/app/deployment.yaml
@@ -37,6 +37,11 @@ spec:
       containers:
       - name: {{ .Release.Name }}-servicex-app
         image: {{ .Values.app.image }}:{{ .Values.app.tag }}
+        {{- if eq .Values.app.environment "dev" }}
+        {{- if .Values.app.reload }}
+        command: [ "./boot.sh", "--reload" ]
+        {{- end }}
+        {{- end }}
         env:
         - name: APP_CONFIG_FILE
           value: "/opt/servicex/app.conf"
@@ -135,6 +140,12 @@ spec:
       {{- end }}

         volumeMounts:
+          {{- if eq .Values.app.environment "dev" }}
+          {{- if .Values.app.mount_local }}
+          - name: host-volume
+            mountPath: /home/servicex
+          {{- end }}
+          {{- end }}
          - name: app-cfg
            mountPath: /opt/servicex
          - name: sqlite
@@ -149,6 +160,14 @@ spec:
        - containerPort: 5000

       volumes:
+        {{- if eq .Values.app.environment "dev" }}
+        {{- if .Values.app.mount_local }}
+        - name: host-volume
+          hostPath:
+          - path: /mnt/servicex
+          - type: DirectoryOrCreate
+        {{- end }}
+        {{- end }}
        - name: app-cfg
          configMap:
            name: {{ .Release.Name }}-flask-config
diff --git a/helm/servicex/values.yaml b/helm/servicex/values.yaml
index 28007947c..a8d40411d 100644
--- a/helm/servicex/values.yaml
+++ b/helm/servicex/values.yaml
@@ -1,4 +1,5 @@
 app:
+  environment: production
   adminEmail: admin@example.com
   auth: false
   authExpires: 21600
diff --git a/local/port-forward.sh b/local/port-forward.sh
new file mode 100644
index 000000000..d1b971a53
--- /dev/null
+++ b/local/port-forward.sh
@@ -0,0 +1,259 @@
+#!/bin/bash
+
+# Constants
+NAMESPACE="default"
+PING_INTERVAL=2  # seconds
+MAX_HELM_RETRIES=12  # Maximum number of times to check for helm installation
+
+# Configuration based on service type
+if [ "$1" = "app" ]; then
+    APP_LABEL="servicex-servicex-app"
+    LOCAL_PORT=5000
+    CONTAINER_PORT=5000
+    PING_URL="http://localhost:${LOCAL_PORT}/servicex"
+elif [ "$1" = "minio" ]; then
+    APP_LABEL="minio"
+    LOCAL_PORT=9000
+    CONTAINER_PORT=9000
+    # Don't use direct HTTP check for Minio as it might require auth
+    PING_URL=""
+elif [ "$1" = "db" ]; then
+    APP_LABEL="postgresql"
+    LOCAL_PORT=5432
+    CONTAINER_PORT=5432
+    PING_URL=""  # No HTTP endpoint to check for DB
+    POD_NAME="servicex-postgresql-0"  # Fixed pod name for DB
+else
+    echo "Usage: $0 [app|minio|db]"
+    echo "  app   - Port forward to ServiceX app (5000:5000)"
+    echo "  minio - Port forward to Minio (9000:9000)"
+    echo "  db    - Port forward to PostgreSQL (5432:5432)"
+    exit 1
+fi
+
+# Function to log with timestamp
+log() {
+    echo "[$(date '+%Y-%m-%d %H:%M:%S')] $1"
+}
+
+# Function to check if servicex helm installation exists
+check_helm_installation() {
+    log "Checking if 'servicex' helm installation exists"
+    if helm list | grep -q servicex; then
+        log "Found 'servicex' helm installation"
+        return 0
+    else
+        log "No 'servicex' helm installation found"
+        return 1
+    fi
+}
+
+# Function to wait for helm installation
+wait_for_helm_installation() {
+    log "Waiting for 'servicex' helm installation"
+    local retries=0
+
+    while ! check_helm_installation && [ $retries -lt $MAX_HELM_RETRIES ]; do
+        retries=$((retries+1))
+        log "Waiting for 'servicex' helm installation... ($retries/$MAX_HELM_RETRIES)"
+        sleep 5
+    done
+
+    if [ $retries -ge $MAX_HELM_RETRIES ]; then
+        log "Timed out waiting for 'servicex' helm installation"
+        return 1
+    fi
+
+    return 0
+}
+
+# Function to get pod name
+get_pod_name() {
+    # Skip for DB since we already know the pod name
+    if [ "$1" = "db" ]; then
+        return 0
+    fi
+
+    log "Getting pod name for $APP_LABEL"
+
+    if [ "$1" = "app" ]; then
+        POD_NAME=$(kubectl get pods --namespace $NAMESPACE -l "app=servicex-servicex-app" -o jsonpath="{.items[0].metadata.name}" 2>/dev/null)
+    elif [ "$1" = "minio" ]; then
+        # Try multiple approaches to find the minio pod without detailed logging
+        POD_NAME=$(kubectl get pods --namespace $NAMESPACE -l "app=minio,release=servicex" -o jsonpath="{.items[0].metadata.name}" 2>/dev/null)
+
+        if [ -z "$POD_NAME" ]; then
+            POD_NAME=$(kubectl get pods --namespace $NAMESPACE -l "app=minio" -o jsonpath="{.items[0].metadata.name}" 2>/dev/null)
+        fi
+
+        if [ -z "$POD_NAME" ]; then
+            POD_NAME=$(kubectl get pods --namespace $NAMESPACE | grep -i "servicex-minio" | awk '{print $1}' | head -1)
+        fi
+
+        if [ -z "$POD_NAME" ]; then
+            POD_NAME=$(kubectl get pods --namespace $NAMESPACE | grep -i "minio" | awk '{print $1}' | head -1)
+        fi
+
+        if [ -z "$POD_NAME" ]; then
+            log "Error: Could not find any minio pods."
+            return 1
+        fi
+    fi
+
+    if [ -z "$POD_NAME" ]; then
+        log "Error: Could not find pod for $APP_LABEL"
+        return 1
+    fi
+
+    log "Found pod: ${POD_NAME}"
+    return 0
+}
+
+# Function to start port forwarding
+start_port_forward() {
+    if [ "$1" = "db" ]; then
+        log "Starting port forwarding from localhost:${LOCAL_PORT} to servicex-postgresql-0:${CONTAINER_PORT}"
+        kubectl port-forward --namespace $NAMESPACE servicex-postgresql-0 ${LOCAL_PORT}:${CONTAINER_PORT} &
+    else
+        log "Starting port forwarding from localhost:${LOCAL_PORT} to ${POD_NAME}:${CONTAINER_PORT}"
+        kubectl port-forward --namespace $NAMESPACE $POD_NAME ${LOCAL_PORT}:${CONTAINER_PORT} &
+    fi
+
+    PORT_FORWARD_PID=$!
+
+    # Give it a moment to establish connection
+    sleep 2
+
+    # Check if port forward is successful
+    if ! ps -p $PORT_FORWARD_PID > /dev/null; then
+        log "Error: Port forwarding failed to start"
+        return 1
+    fi
+
+    log "Port forwarding established with PID: ${PORT_FORWARD_PID}"
+    return 0
+}
+
+# Function to check if the port forwarding is working
+check_port_forward() {
+    # For services without HTTP endpoints or with auth requirements like Minio
+    if [ -z "$PING_URL" ]; then
+        # Check if the port is listening
+        if command -v nc >/dev/null 2>&1; then
+            if nc -z localhost ${LOCAL_PORT} >/dev/null 2>&1; then
+                log "Port forward is working - port ${LOCAL_PORT} is open"
+                return 0
+            else
+                log "Connection failed - port ${LOCAL_PORT} is not open"
+                return 1
+            fi
+        else
+            # If nc is not available, try lsof
+            if command -v lsof >/dev/null 2>&1; then
+                if lsof -i:${LOCAL_PORT} >/dev/null 2>&1; then
+                    log "Port forward is working - port ${LOCAL_PORT} is open"
+                    return 0
+                else
+                    log "Connection failed - port ${LOCAL_PORT} is not open"
+                    return 1
+                fi
+            else
+                # If neither nc nor lsof is available, just check if the process is running
+                if ps -p $PORT_FORWARD_PID > /dev/null; then
+                    log "Port forward process is still running"
+                    return 0
+                else
+                    log "Port forward process is not running"
+                    return 1
+                fi
+            fi
+        fi
+    else
+        # For services with HTTP endpoints
+        local response_code
+        # Use curl with a short timeout to prevent long waits
+        response_code=$(curl -s -o /dev/null -w "%{http_code}" --max-time 3 "${PING_URL}" || echo "failed")
+
+        if [[ "$response_code" =~ ^(200|302|301|303|307|308|403|0|failed)$ ]]; then
+            if [[ "$response_code" == "failed" ]]; then
+                log "Connection failed - no response"
+                return 1
+            elif [[ "$response_code" == "0" ]]; then
+                log "Connection failed - empty response"
+                return 1
+            elif [[ "$response_code" == "403" ]]; then
+                # For 403 responses, this means the server is up but authentication failed
+                # This is actually a successful connection for our purposes
+                log "Port forward is working - received HTTP ${response_code} (authentication required)"
+                return 0
+            else
+                # Any HTTP response code indicates the port forward is working
+                log "Port forward is working - received HTTP ${response_code}"
+                return 0
+            fi
+        else
+            log "Unexpected status code: ${response_code}"
+            # Consider this a success anyway if we got any response
+            log "But got a response, so port forward is working"
+            return 0
+        fi
+    fi
+}
+
+# Function to clean up resources
+cleanup() {
+    log "Cleaning up resources..."
+    if [ ! -z "$PORT_FORWARD_PID" ]; then
+        log "Killing port forwarding process (PID: ${PORT_FORWARD_PID})"
+        kill $PORT_FORWARD_PID 2>/dev/null || true
+    fi
+    log "Cleanup complete, exiting"
+    exit 0
+}
+
+# Set trap for cleanup
+trap cleanup SIGINT SIGTERM
+
+# Main loop
+log "Starting port forwarding monitor script for $1"
+
+# First, wait for the helm installation
+if ! wait_for_helm_installation; then
+    log "Failed to find servicex helm installation after multiple attempts"
+    log "Please check if the servicex helm chart is properly installed"
+    exit 1
+fi
+
+while true; do
+    # Get pod name
+    if ! get_pod_name "$1"; then
+        log "Retrying in 5 seconds..."
+        sleep 5
+        continue
+    fi
+
+    # Start port forwarding
+    if ! start_port_forward "$1"; then
+        log "Retrying in 5 seconds..."
+        sleep 5
+        continue
+    fi
+
+    # Monitor port forwarding
+    while true; do
+        if ! check_port_forward "$1"; then
+            log "Port forward appears to be down, restarting..."
+
+            # Kill existing port forward process if it exists
+            if [ ! -z "$PORT_FORWARD_PID" ]; then
+                kill $PORT_FORWARD_PID 2>/dev/null || true
+                unset PORT_FORWARD_PID
+            fi
+
+            break
+        fi
+
+        # Wait before checking again
+        sleep $PING_INTERVAL
+    done
+done
\ No newline at end of file
diff --git a/servicex_app/boot.sh b/servicex_app/boot.sh
index 71801c24e..2b5dfab5d 100755
--- a/servicex_app/boot.sh
+++ b/servicex_app/boot.sh
@@ -1,4 +1,17 @@
 #!/bin/sh
+
+# Initialize reload flag
+RELOAD=""
+
+# Parse command line arguments
+for arg in "$@"
+do
+    if [ "$arg" = "--reload" ]; then
+        RELOAD="--reload"
+        break
+    fi
+done
+
 mkdir instance
 # SQLite doesn't handle migrations, so rely on SQLAlchmy table creation
 if grep "sqlite://" $APP_CONFIG_FILE; then
@@ -7,5 +20,5 @@ else
   FLASK_APP=servicex_app/app.py flask db upgrade;
 fi
 [ -d "/default_users" ] && python3 servicex/cli/create_default_users.py
-exec gunicorn -b [::]:5000 --workers=5 --threads=1 --timeout 120 --log-level=warning --access-logfile /tmp/gunicorn.log --error-logfile - "servicex_app:create_app()"
+exec gunicorn -b [::]:5000 $RELOAD --workers=5 --threads=1 --timeout 120 --log-level=warning --access-logfile /tmp/gunicorn.log --error-logfile - "servicex_app:create_app()"
 # to log requests to stdout --access-logfile -
\ No newline at end of file

From ef60177b0aebd0adb97a6697acb21121f7e9a95e Mon Sep 17 00:00:00 2001
From: Matt Shirley
Date: Fri, 2 May 2025 19:45:30 -0500
Subject: [PATCH 2/8] correct port forward script

---
 local/port-forward.sh | 35 +++++++++++++++-------------------
 1 file changed, 15 insertions(+), 20 deletions(-)

diff --git a/local/port-forward.sh b/local/port-forward.sh
index d1b971a53..45df369ca 100644
--- a/local/port-forward.sh
+++ b/local/port-forward.sh
@@ -174,28 +174,23 @@ check_port_forward() {
         # Use curl with a short timeout to prevent long waits
         response_code=$(curl -s -o /dev/null -w "%{http_code}" --max-time 3 "${PING_URL}" || echo "failed")

-        if [[ "$response_code" =~ ^(200|302|301|303|307|308|403|0|failed)$ ]]; then
-            if [[ "$response_code" == "failed" ]]; then
-                log "Connection failed - no response"
-                return 1
-            elif [[ "$response_code" == "0" ]]; then
-                log "Connection failed - empty response"
-                return 1
-            elif [[ "$response_code" == "403" ]]; then
-                # For 403 responses, this means the server is up but authentication failed
-                # This is actually a successful connection for our purposes
-                log "Port forward is working - received HTTP ${response_code} (authentication required)"
+        if [[ "$response_code" =~ ^(200|302|301|303|307|308|403)$ ]]; then
+            # Any of these HTTP response codes indicates the port forward is working
+            log "Port forward is working - received HTTP ${response_code}"
+            return 0
+        elif [[ "$response_code" == "failed" || "$response_code" == "0" || "$response_code" == "000" ]]; then
+            log "Connection failed - no response from service. Will retry."
+            return 1
+        else
+            log "Received unexpected status code: ${response_code}"
+            # Check if we should consider this a success
+            if [[ "$response_code" =~ ^[1-5][0-9][0-9]$ ]]; then
+                log "But got a valid HTTP response, so port forward is working"
                 return 0
             else
-                # Any HTTP response code indicates the port forward is working
-                log "Port forward is working - received HTTP ${response_code}"
-                return 0
+                log "Invalid response - port forward may not be working correctly"
+                return 1
             fi
-        else
-            log "Unexpected status code: ${response_code}"
-            # Consider this a success anyway if we got any response
-            log "But got a response, so port forward is working"
-            return 0
         fi
     fi
 }
@@ -256,4 +251,4 @@ while true; do
         # Wait before checking again
         sleep $PING_INTERVAL
     done
-done
\ No newline at end of file
+done

From 829c57524f921f18869dee9a646a93dd5e1e5007 Mon Sep 17 00:00:00 2001
From: Matt Shirley
Date: Mon, 12 May 2025 13:03:09 -0500
Subject: [PATCH 3/8] move minikube mount outside of Procfile, correct yaml
 errors in deployment

---
 Procfile                                    | 3 ++-
 helm/servicex/templates/app/deployment.yaml | 4 ++--
 2 files changed, 4 insertions(+), 3 deletions(-)

diff --git a/Procfile b/Procfile
index 61d4ca9e2..238cd75f4 100644
--- a/Procfile
+++ b/Procfile
@@ -1,4 +1,5 @@
-minikube-mount: minikube start; minikube mount $LOCAL_DIR/servicex_app:/mnt/servicex & sleep 5 && cd $CHART_DIR && helm install -f $VALUES_FILE servicex . && sleep infinity
+minikube-mount: minikube mount $LOCAL_DIR/servicex_app:/mnt/servicex & echo "Mount active on $LOCAL_DIR/servicex_app:/mnt/servicex" && (while true; do sleep 86400; done)
+servicex-install: until kubectl get nodes &>/dev/null; do echo "Waiting for Kubernetes to be available..."; sleep 2; done && echo "Kubernetes is ready!" && cd $CHART_DIR && helm install -f $VALUES_FILE servicex . && echo "Helm installation complete, keeping process alive..." && while true; do sleep 86400; done
 port-forward-app: cd $LOCAL_DIR && bash local/port-forward.sh app
 port-forward-minio: cd $LOCAL_DIR && bash local/port-forward.sh minio
 port-forward-db: cd $LOCAL_DIR && bash local/port-forward.sh db
\ No newline at end of file
diff --git a/helm/servicex/templates/app/deployment.yaml b/helm/servicex/templates/app/deployment.yaml
index 02e5010c9..c6d32b8b1 100644
--- a/helm/servicex/templates/app/deployment.yaml
+++ b/helm/servicex/templates/app/deployment.yaml
@@ -164,8 +164,8 @@ spec:
         {{- if .Values.app.mount_local }}
         - name: host-volume
           hostPath:
-          - path: /mnt/servicex
-          - type: DirectoryOrCreate
+            path: /mnt/servicex
+            type: DirectoryOrCreate
         {{- end }}
         {{- end }}
        - name: app-cfg

From 74da0ae1939482fc6381b7f20d5946cf979082cd Mon Sep 17 00:00:00 2001
From: Matt Shirley
Date: Thu, 5 Jun 2025 14:42:52 -0500
Subject: [PATCH 4/8] update Procfile

---
 Procfile | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/Procfile b/Procfile
index 238cd75f4..04e9e2c10 100644
--- a/Procfile
+++ b/Procfile
@@ -1,5 +1,4 @@
-minikube-mount: minikube mount $LOCAL_DIR/servicex_app:/mnt/servicex & echo "Mount active on $LOCAL_DIR/servicex_app:/mnt/servicex" && (while true; do sleep 86400; done)
-servicex-install: until kubectl get nodes &>/dev/null; do echo "Waiting for Kubernetes to be available..."; sleep 2; done && echo "Kubernetes is ready!" && cd $CHART_DIR && helm install -f $VALUES_FILE servicex . && echo "Helm installation complete, keeping process alive..." && while true; do sleep 86400; done
+minikube-mount: minikube mount $LOCAL_DIR:/mnt/servicex & sleep 5 && cd $CHART_DIR && helm install -f $VALUES_FILE servicex . && while true; do sleep 86400; done
 port-forward-app: cd $LOCAL_DIR && bash local/port-forward.sh app
 port-forward-minio: cd $LOCAL_DIR && bash local/port-forward.sh minio
 port-forward-db: cd $LOCAL_DIR && bash local/port-forward.sh db
\ No newline at end of file

From 9ab70f17eb3e469bfd8096b3d3fa36d2abbc3b4c Mon Sep 17 00:00:00 2001
From: Matt Shirley
Date: Wed, 18 Jun 2025 14:01:06 -0500
Subject: [PATCH 5/8] update local commands

---
 Procfile              |   4 +-
 local/port-forward.sh | 309 +++++++++++++++----------------------------
 2 files changed, 110 insertions(+), 203 deletions(-)
 mode change 100644 => 100755 local/port-forward.sh

diff --git a/Procfile b/Procfile
index 04e9e2c10..baf80995e 100644
--- a/Procfile
+++ b/Procfile
@@ -1,4 +1,4 @@
-minikube-mount: minikube mount $LOCAL_DIR:/mnt/servicex & sleep 5 && cd $CHART_DIR && helm install -f $VALUES_FILE servicex . && while true; do sleep 86400; done
+minikube-mount: minikube mount $LOCAL_DIR:/mnt/servicex & MOUNT_PID=$!; sleep 5; cd $CHART_DIR && helm install -f $VALUES_FILE servicex . && wait $MOUNT_PID
 port-forward-app: cd $LOCAL_DIR && bash local/port-forward.sh app
 port-forward-minio: cd $LOCAL_DIR && bash local/port-forward.sh minio
-port-forward-db: cd $LOCAL_DIR && bash local/port-forward.sh db
\ No newline at end of file
+port-forward-db: cd $LOCAL_DIR && bash local/port-forward.sh db
diff --git a/local/port-forward.sh b/local/port-forward.sh
old mode 100644
new mode 100755
index 45df369ca..7e737fe8a
--- a/local/port-forward.sh
+++ b/local/port-forward.sh
@@ -1,254 +1,161 @@
-#!/bin/bash
+#!/usr/bin/env bash
+
+set -euo pipefail

 # Constants
-NAMESPACE="default"
-PING_INTERVAL=2  # seconds
-MAX_HELM_RETRIES=12  # Maximum number of times to check for helm installation
-
-# Configuration based on service type
-if [ "$1" = "app" ]; then
-    APP_LABEL="servicex-servicex-app"
-    LOCAL_PORT=5000
-    CONTAINER_PORT=5000
-    PING_URL="http://localhost:${LOCAL_PORT}/servicex"
-elif [ "$1" = "minio" ]; then
-    APP_LABEL="minio"
-    LOCAL_PORT=9000
-    CONTAINER_PORT=9000
-    # Don't use direct HTTP check for Minio as it might require auth
-    PING_URL=""
-elif [ "$1" = "db" ]; then
-    APP_LABEL="postgresql"
-    LOCAL_PORT=5432
-    CONTAINER_PORT=5432
-    PING_URL=""  # No HTTP endpoint to check for DB
-    POD_NAME="servicex-postgresql-0"  # Fixed pod name for DB
-else
-    echo "Usage: $0 [app|minio|db]"
-    echo "  app   - Port forward to ServiceX app (5000:5000)"
-    echo "  minio - Port forward to Minio (9000:9000)"
-    echo "  db    - Port forward to PostgreSQL (5432:5432)"
-    exit 1
-fi
+readonly NAMESPACE="${NAMESPACE:-default}"
+readonly PING_INTERVAL=5
+readonly MAX_RETRIES=10
+
+# Parse service configuration
+case "${1:-}" in
+    app)
+        SERVICE_NAME="servicex-servicex-app"
+        CONTAINER_PORT="8000"
+        LOCAL_PORT="5001"
+        ;;
+    minio)
+        SERVICE_NAME="servicex-minio"
+        CONTAINER_PORT="9000"
+        LOCAL_PORT="9000"
+        ;;
+    db)
+        SERVICE_NAME="servicex-postgresql"
+        CONTAINER_PORT="5432"
+        LOCAL_PORT="5432"
+        ;;
+    *)
+        echo "Usage: $0 [app|minio|db]"
+        echo "  app   - Port forward to ServiceX app (5001 -> 8000)"
+        echo "  minio - Port forward to Minio (9000 -> 9000)"
+        echo "  db    - Port forward to PostgreSQL (5432 -> 5432)"
+        exit 1
+        ;;
+esac
+
+readonly SERVICE_TYPE="$1"

 # Function to log with timestamp
 log() {
-    echo "[$(date '+%Y-%m-%d %H:%M:%S')] $1"
+    echo "[$(date '+%Y-%m-%d %H:%M:%S')] $*" >&2
 }

-# Function to check if servicex helm installation exists
-check_helm_installation() {
-    log "Checking if 'servicex' helm installation exists"
-    if helm list | grep -q servicex; then
-        log "Found 'servicex' helm installation"
-        return 0
-    else
-        log "No 'servicex' helm installation found"
-        return 1
-    fi
+# Function to check if service exists
+check_service_exists() {
+    kubectl get service "$SERVICE_NAME" --namespace="$NAMESPACE" >/dev/null 2>&1
 }

-# Function to wait for helm installation
-wait_for_helm_installation() {
-    log "Waiting for 'servicex' helm installation"
+# Function to wait for service availability
+wait_for_service() {
+    log "Waiting for service $SERVICE_NAME to be available..."
     local retries=0

-    while ! check_helm_installation && [ $retries -lt $MAX_HELM_RETRIES ]; do
-        retries=$((retries+1))
-        log "Waiting for 'servicex' helm installation... ($retries/$MAX_HELM_RETRIES)"
-        sleep 5
+    while ! check_service_exists && [[ $retries -lt $MAX_RETRIES ]]; do
+        retries=$((retries + 1))
+        log "Service not found, retrying... ($retries/$MAX_RETRIES)"
+        sleep 3
     done

-    if [ $retries -ge $MAX_HELM_RETRIES ]; then
-        log "Timed out waiting for 'servicex' helm installation"
+    if [[ $retries -ge $MAX_RETRIES ]]; then
+        log "Service $SERVICE_NAME not found after $MAX_RETRIES attempts"
         return 1
     fi

+    log "Service $SERVICE_NAME is available"
     return 0
 }

-# Function to get pod name
-get_pod_name() {
-    # Skip for DB since we already know the pod name
-    if [ "$1" = "db" ]; then
-        return 0
-    fi
-
-    log "Getting pod name for $APP_LABEL"
-
-    if [ "$1" = "app" ]; then
-        POD_NAME=$(kubectl get pods --namespace $NAMESPACE -l "app=servicex-servicex-app" -o jsonpath="{.items[0].metadata.name}" 2>/dev/null)
-    elif [ "$1" = "minio" ]; then
-        # Try multiple approaches to find the minio pod without detailed logging
-        POD_NAME=$(kubectl get pods --namespace $NAMESPACE -l "app=minio,release=servicex" -o jsonpath="{.items[0].metadata.name}" 2>/dev/null)
-
-        if [ -z "$POD_NAME" ]; then
-            POD_NAME=$(kubectl get pods --namespace $NAMESPACE -l "app=minio" -o jsonpath="{.items[0].metadata.name}" 2>/dev/null)
-        fi
-
-        if [ -z "$POD_NAME" ]; then
-            POD_NAME=$(kubectl get pods --namespace $NAMESPACE | grep -i "servicex-minio" | awk '{print $1}' | head -1)
-        fi
-
-        if [ -z "$POD_NAME" ]; then
-            POD_NAME=$(kubectl get pods --namespace $NAMESPACE | grep -i "minio" | awk '{print $1}' | head -1)
-        fi
-
-        if [ -z "$POD_NAME" ]; then
-            log "Error: Could not find any minio pods."
-            return 1
-        fi
-    fi
-
-    if [ -z "$POD_NAME" ]; then
-        log "Error: Could not find pod for $APP_LABEL"
-        return 1
-    fi
-
-    log "Found pod: ${POD_NAME}"
-    return 0
-}
-
-# Function to start port forwarding
+# Function to start port forwarding using service
 start_port_forward() {
-    if [ "$1" = "db" ]; then
-        log "Starting port forwarding from localhost:${LOCAL_PORT} to servicex-postgresql-0:${CONTAINER_PORT}"
-        kubectl port-forward --namespace $NAMESPACE servicex-postgresql-0 ${LOCAL_PORT}:${CONTAINER_PORT} &
-    else
-        log "Starting port forwarding from localhost:${LOCAL_PORT} to ${POD_NAME}:${CONTAINER_PORT}"
-        kubectl port-forward --namespace $NAMESPACE $POD_NAME ${LOCAL_PORT}:${CONTAINER_PORT} &
-    fi
-
+    log "Starting port forwarding: localhost:${LOCAL_PORT} -> ${SERVICE_NAME}:${CONTAINER_PORT}"
+    kubectl port-forward --namespace="$NAMESPACE" "service/$SERVICE_NAME" "${LOCAL_PORT}:${CONTAINER_PORT}" &
     PORT_FORWARD_PID=$!

     # Give it a moment to establish connection
     sleep 2

-    # Check if port forward is successful
-    if ! ps -p $PORT_FORWARD_PID > /dev/null; then
-        log "Error: Port forwarding failed to start"
+    # Check if port forward process is still running
+    if ! kill -0 "$PORT_FORWARD_PID" 2>/dev/null; then
+        log "Port forwarding failed to start"
         return 1
     fi

-    log "Port forwarding established with PID: ${PORT_FORWARD_PID}"
+    log "Port forwarding established (PID: $PORT_FORWARD_PID)"
     return 0
 }

-# Function to check if the port forwarding is working
+# Function to check if port forwarding is working
 check_port_forward() {
-    # For services without HTTP endpoints or with auth requirements like Minio
-    if [ -z "$PING_URL" ]; then
-        # Check if the port is listening
-        if command -v nc >/dev/null 2>&1; then
-            if nc -z localhost ${LOCAL_PORT} >/dev/null 2>&1; then
-                log "Port forward is working - port ${LOCAL_PORT} is open"
-                return 0
-            else
-                log "Connection failed - port ${LOCAL_PORT} is not open"
-                return 1
-            fi
-        else
-            # If nc is not available, try lsof
-            if command -v lsof >/dev/null 2>&1; then
-                if lsof -i:${LOCAL_PORT} >/dev/null 2>&1; then
-                    log "Port forward is working - port ${LOCAL_PORT} is open"
-                    return 0
-                else
-                    log "Connection failed - port ${LOCAL_PORT} is not open"
-                    return 1
-                fi
-            else
-                # If neither nc nor lsof is available, just check if the process is running
-                if ps -p $PORT_FORWARD_PID > /dev/null; then
-                    log "Port forward process is still running"
-                    return 0
-                else
-                    log "Port forward process is not running"
-                    return 1
-                fi
-            fi
-        fi
-    else
-        # For services with HTTP endpoints
-        local response_code
-        # Use curl with a short timeout to prevent long waits
-        response_code=$(curl -s -o /dev/null -w "%{http_code}" --max-time 3 "${PING_URL}" || echo "failed")
-
-        if [[ "$response_code" =~ ^(200|302|301|303|307|308|403)$ ]]; then
-            # Any of these HTTP response codes indicates the port forward is working
-            log "Port forward is working - received HTTP ${response_code}"
-            return 0
-        elif [[ "$response_code" == "failed" || "$response_code" == "0" || "$response_code" == "000" ]]; then
-            log "Connection failed - no response from service. Will retry."
+    # Simply check if the port forward process is still running
+    if ! kill -0 "$PORT_FORWARD_PID" 2>/dev/null; then
+        log "Port forward process is not running"
+        return 1
+    fi
+
+    # Additional check: verify port is actually listening
+    if command -v nc >/dev/null 2>&1; then
+        if ! nc -z localhost "$LOCAL_PORT" 2>/dev/null; then
+            log "Port $LOCAL_PORT is not accessible"
             return 1
-        else
-            log "Received unexpected status code: ${response_code}"
-            # Check if we should consider this a success
-            if [[ "$response_code" =~ ^[1-5][0-9][0-9]$ ]]; then
-                log "But got a valid HTTP response, so port forward is working"
-                return 0
-            else
-                log "Invalid response - port forward may not be working correctly"
-                return 1
-            fi
         fi
     fi
+
+    return 0
 }

 # Function to clean up resources
 cleanup() {
-    log "Cleaning up resources..."
-    if [ ! -z "$PORT_FORWARD_PID" ]; then
-        log "Killing port forwarding process (PID: ${PORT_FORWARD_PID})"
-        kill $PORT_FORWARD_PID 2>/dev/null || true
+    log "Cleaning up..."
+    if [[ -n "${PORT_FORWARD_PID:-}" ]]; then
+        log "Terminating port forwarding process (PID: $PORT_FORWARD_PID)"
+        kill "$PORT_FORWARD_PID" 2>/dev/null || true
+        wait "$PORT_FORWARD_PID" 2>/dev/null || true
     fi
-    log "Cleanup complete, exiting"
+    log "Cleanup complete"
     exit 0
 }

 # Set trap for cleanup
-trap cleanup SIGINT SIGTERM
-
-# Main loop
-log "Starting port forwarding monitor script for $1"
-
-# First, wait for the helm installation
-if ! wait_for_helm_installation; then
-    log "Failed to find servicex helm installation after multiple attempts"
-    log "Please check if the servicex helm chart is properly installed"
-    exit 1
-fi
-
-while true; do
-    # Get pod name
-    if ! get_pod_name "$1"; then
-        log "Retrying in 5 seconds..."
-        sleep 5
-        continue
+trap cleanup SIGINT SIGTERM EXIT
+
+# Main execution
+main() {
+    log "Starting port forwarding for $SERVICE_TYPE: $SERVICE_NAME"
+
+    # Wait for service to be available
+    if ! wait_for_service; then
+        log "Service $SERVICE_NAME is not available"
+        exit 1
     fi

-    # Start port forwarding
-    if ! start_port_forward "$1"; then
-        log "Retrying in 5 seconds..."
-        sleep 5
-        continue
-    fi
-
-    # Monitor port forwarding
+    # Main monitoring loop
     while true; do
-        if ! check_port_forward "$1"; then
-            log "Port forward appears to be down, restarting..."
+        # Start port forwarding if not already running
+        if [[ -z "${PORT_FORWARD_PID:-}" ]] || ! kill -0 "$PORT_FORWARD_PID" 2>/dev/null; then
+            log "Starting port forwarding..."
+            if ! start_port_forward; then
+                log "Failed to start port forwarding, retrying in $PING_INTERVAL seconds..."
+                sleep "$PING_INTERVAL"
+                continue
+            fi
+        fi

-            # Kill existing port forward process if it exists
-            if [ ! -z "$PORT_FORWARD_PID" ]; then
-                kill $PORT_FORWARD_PID 2>/dev/null || true
+        # Check if port forwarding is working
+        if ! check_port_forward; then
+            log "Port forwarding failed, restarting..."
+            if [[ -n "${PORT_FORWARD_PID:-}" ]]; then
+                kill "$PORT_FORWARD_PID" 2>/dev/null || true
+                wait "$PORT_FORWARD_PID" 2>/dev/null || true
                 unset PORT_FORWARD_PID
             fi
-
-            break
+            sleep 2
+            continue
         fi

-        # Wait before checking again
-        sleep $PING_INTERVAL
+        # Wait before next check
+        sleep "$PING_INTERVAL"
     done
-done
+}
+
+# Run main function
+main

From 0695b74e68a639c913dcb1d5fc888ea13dbb055a Mon Sep 17 00:00:00 2001
From: Matt Shirley
Date: Wed, 18 Jun 2025 14:19:10 -0500
Subject: [PATCH 6/8] improve local script

---
 Procfile              |  6 ++---
 local/port-forward.sh | 52 +++++++++++++++++++++++++++++++++++++++++--
 2 files changed, 53 insertions(+), 5 deletions(-)

diff --git a/Procfile b/Procfile
index baf80995e..93e6276f5 100644
--- a/Procfile
+++ b/Procfile
@@ -1,4 +1,4 @@
 minikube-mount: minikube mount $LOCAL_DIR:/mnt/servicex & MOUNT_PID=$!; sleep 5; cd $CHART_DIR && helm install -f $VALUES_FILE servicex . && wait $MOUNT_PID
-port-forward-app: cd $LOCAL_DIR && bash local/port-forward.sh app
-port-forward-minio: cd $LOCAL_DIR && bash local/port-forward.sh minio
-port-forward-db: cd $LOCAL_DIR && bash local/port-forward.sh db
+port-forward-app: sleep 20 && cd $LOCAL_DIR && bash local/port-forward.sh app
+port-forward-minio: sleep 20 && cd $LOCAL_DIR && bash local/port-forward.sh minio
+port-forward-db: sleep 20 && cd $LOCAL_DIR && bash local/port-forward.sh db
diff --git a/local/port-forward.sh b/local/port-forward.sh
index 7e737fe8a..ad3e3c4b3 100755
--- a/local/port-forward.sh
+++ b/local/port-forward.sh
@@ -12,7 +12,7 @@ case "${1:-}" in
     app)
         SERVICE_NAME="servicex-servicex-app"
         CONTAINER_PORT="8000"
-        LOCAL_PORT="5001"
+        LOCAL_PORT="5000"
         ;;
     minio)
         SERVICE_NAME="servicex-minio"
@@ -26,7 +26,7 @@ case "${1:-}" in
         ;;
     *)
         echo "Usage: $0 [app|minio|db]"
-        echo "  app   - Port forward to ServiceX app (5001 -> 8000)"
+        echo "  app   - Port forward to ServiceX app (5000 -> 8000)"
         echo "  minio - Port forward to Minio (9000 -> 9000)"
         echo "  db    - Port forward to PostgreSQL (5432 -> 5432)"
         exit 1
@@ -45,6 +45,37 @@ check_service_exists() {
     kubectl get service "$SERVICE_NAME" --namespace="$NAMESPACE" >/dev/null 2>&1
 }

+# Function to wait for pod to be ready
+wait_for_pod_ready() {
+    log "Waiting for pods of service $SERVICE_NAME to be ready..."
+    local retries=0
+
+    while [[ $retries -lt $MAX_RETRIES ]]; do
+        # Get pods for the service using selector
+        local selector=$(kubectl get service "$SERVICE_NAME" --namespace="$NAMESPACE" -o jsonpath='{.spec.selector}' 2>/dev/null || echo "")
+
+        if [[ -n "$selector" ]]; then
+            # Convert JSON selector to kubectl selector format
+            local kubectl_selector=$(echo "$selector" | sed 's/[{}"]//g' | sed 's/:/=/g')
+
+            # Check if any pods are ready
+            local ready_pods=$(kubectl get pods --namespace="$NAMESPACE" --selector="$kubectl_selector" --field-selector=status.phase=Running -o name 2>/dev/null | wc -l)
+
+            if [[ $ready_pods -gt 0 ]]; then
+                log "Found $ready_pods ready pod(s) for service $SERVICE_NAME"
+                return 0
+            fi
+        fi
+
+        retries=$((retries + 1))
+        log "No ready pods found, retrying... ($retries/$MAX_RETRIES)"
+        sleep 5
+    done
+
+    log "No ready pods found for service $SERVICE_NAME after $MAX_RETRIES attempts"
+    return 1
+}
+
 # Function to wait for service availability
 wait_for_service() {
     log "Waiting for service $SERVICE_NAME to be available..."
@@ -68,6 +99,13 @@ wait_for_service() {
 # Function to start port forwarding using service
 start_port_forward() {
     log "Starting port forwarding: localhost:${LOCAL_PORT} -> ${SERVICE_NAME}:${CONTAINER_PORT}"
+
+    # Check if pods are still ready before attempting port forward
+    if ! wait_for_pod_ready; then
+        log "Pods not ready, cannot start port forwarding"
+        return 1
+    fi
+
     kubectl port-forward --namespace="$NAMESPACE" "service/$SERVICE_NAME" "${LOCAL_PORT}:${CONTAINER_PORT}" &
     PORT_FORWARD_PID=$!

@@ -127,6 +165,12 @@ main() {
         log "Service $SERVICE_NAME is not available"
         exit 1
     fi
+
+    # Wait for pods to be ready
+    if ! wait_for_pod_ready; then
+        log "No ready pods found for service $SERVICE_NAME"
+        exit 1
+    fi

     # Main monitoring loop
     while true; do
         # Start port forwarding if not already running
         if [[ -z "${PORT_FORWARD_PID:-}" ]] || ! kill -0 "$PORT_FORWARD_PID" 2>/dev/null; then
             log "Starting port forwarding..."
             if ! start_port_forward; then
                 log "Failed to start port forwarding, retrying in $PING_INTERVAL seconds..."
+                # Reset PID if it exists but failed
+                if [[ -n "${PORT_FORWARD_PID:-}" ]]; then
+                    unset PORT_FORWARD_PID
+                fi
                 sleep "$PING_INTERVAL"
                 continue
             fi

From 7c2515bb4df975ee2236be19cf2e5140ba5df824 Mon Sep 17 00:00:00 2001
From: Matt Shirley
Date: Wed, 18 Jun 2025 18:56:01 -0500
Subject: [PATCH 7/8] update helm and kubernetes charts

---
 helm/servicex/templates/app/configmap.yaml    |  4 +++
 helm/servicex/templates/app/deployment.yaml   |  2 +-
 helm/servicex/values.yaml                     |  1 +
 .../servicex_app/transformer_manager.py       | 35 +++++++++++++++++++
 4 files changed, 41 insertions(+), 1 deletion(-)

diff --git a/helm/servicex/templates/app/configmap.yaml b/helm/servicex/templates/app/configmap.yaml
index ac0465857..4c7b45499 100644
--- a/helm/servicex/templates/app/configmap.yaml
+++ b/helm/servicex/templates/app/configmap.yaml
@@ -13,6 +13,10 @@ data:
     CHART = '{{ .Chart.Name }}-{{ .Chart.Version }}'
     APP_IMAGE_TAG = '{{ .Values.app.tag }}'

+    # Application environment and local development settings
+    APP_ENVIRONMENT = '{{ .Values.app.environment }}'
+    APP_MOUNT_LOCAL = {{- if .Values.app.mountLocal }}True{{- else }}False{{- end }}
+
    #SERVER_NAME = '127.0.0.1:5000'

    # this is the session secret, used to protect the Flask session. You should
diff --git a/helm/servicex/templates/app/deployment.yaml b/helm/servicex/templates/app/deployment.yaml
index 0278401b4..c87159e21 100644
--- a/helm/servicex/templates/app/deployment.yaml
+++ b/helm/servicex/templates/app/deployment.yaml
@@ -164,7 +164,7 @@ spec:
         {{- if .Values.app.mount_local }}
         - name: host-volume
           hostPath:
-            path: /mnt/servicex
+            path: /mnt/servicex/servicex_app
             type: DirectoryOrCreate
         {{- end }}
         {{- end }}
diff --git a/helm/servicex/values.yaml b/helm/servicex/values.yaml
index e16c8a232..e6b235dde 100644
--- a/helm/servicex/values.yaml
+++ b/helm/servicex/values.yaml
@@ -1,5 +1,6 @@
 app:
   environment: production
+  mountLocal: false
   adminEmail: admin@example.com
   auth: false
   authExpires: 21600
diff --git a/servicex_app/servicex_app/transformer_manager.py b/servicex_app/servicex_app/transformer_manager.py
index 46cbbf357..4808709ed 100644
--- a/servicex_app/servicex_app/transformer_manager.py
+++ b/servicex_app/servicex_app/transformer_manager.py
@@ -109,6 +109,41 @@ def create_job_object(request_id, image, rabbitmq_uri, workers,
                 empty_dir=client.V1EmptyDirVolumeSource())
         )

+        # Only mount local volumes in development environment with mountLocal enabled
+        if (current_app.config.get('APP_ENVIRONMENT') == 'dev' and
+                current_app.config.get('APP_MOUNT_LOCAL')):
+
+            volume_mounts.append(
+                client.V1VolumeMount(
+                    name='host-volume',
+                    mount_path='/servicex'
+                )
+            )
+            volumes.append(
+                client.V1Volume(
+                    name='host-volume',
+                    host_path=client.V1HostPathVolumeSource(
+                        path='/mnt/servicex/transformer_sidecar/src',
+                        type='DirectoryOrCreate'
+                    ),
+                )
+            )
+            volume_mounts.append(
+                client.V1VolumeMount(
+                    name='host-scripts-volume',
+                    mount_path='/servicex/scripts'
+                )
+            )
+            volumes.append(
+                client.V1Volume(
+                    name='host-scripts-volume',
+                    host_path=client.V1HostPathVolumeSource(
+                        path='/mnt/servicex/transformer_sidecar/scripts',
+                        type='DirectoryOrCreate'
+                    ),
+                )
+            )
+
         if x509_secret:
             volume_mounts.append(
                 client.V1VolumeMount(

From 5d92658b4672dc65000f7d39ea0fe22f4dfe96f4 Mon Sep 17 00:00:00 2001
From: Matt Shirley
Date: Thu, 26 Jun 2025 14:39:48 -0500
Subject: [PATCH 8/8] correct Procfile and correct transformer_manager for
 flake8

---
 Procfile                                      |   3 +-
 .../servicex_app/transformer_manager.py       | 434 ++++++++++--------
 2 files changed, 256 insertions(+), 181 deletions(-)
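A note for anyone trying this series locally (this sits in the patch comment
area between the diffstat and the diff, which git am ignores): the Procfile
assumes minikube is already running, a foreman-compatible Procfile runner,
and three environment variables. The runner chosen below (honcho) and the
values file name are illustrative assumptions, not something the series
itself pins down:

    # any Procfile runner should work; honcho is one option
    pip install honcho

    # the Procfile processes reference these variables
    export LOCAL_DIR="$PWD"                    # repo checkout, mounted into minikube
    export CHART_DIR="$PWD/helm/servicex"      # chart used by helm install
    export VALUES_FILE="$PWD/values-dev.yaml"  # dev overrides (sketched after patch 8)

    honcho start  # runs minikube-mount, helm-install, and the three port-forwards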
diff --git a/Procfile b/Procfile
index 93e6276f5..b943aecd1 100644
--- a/Procfile
+++ b/Procfile
@@ -1,4 +1,5 @@
-minikube-mount: minikube mount $LOCAL_DIR:/mnt/servicex & MOUNT_PID=$!; sleep 5; cd $CHART_DIR && helm install -f $VALUES_FILE servicex . && wait $MOUNT_PID
+minikube-mount: minikube mount $LOCAL_DIR:/mnt/servicex
+helm-install: sleep 5; cd $CHART_DIR && helm install -f $VALUES_FILE servicex . && tail -f /dev/null
 port-forward-app: sleep 20 && cd $LOCAL_DIR && bash local/port-forward.sh app
 port-forward-minio: sleep 20 && cd $LOCAL_DIR && bash local/port-forward.sh minio
 port-forward-db: sleep 20 && cd $LOCAL_DIR && bash local/port-forward.sh db
diff --git a/servicex_app/servicex_app/transformer_manager.py b/servicex_app/servicex_app/transformer_manager.py
index 4808709ed..fd419a417 100644
--- a/servicex_app/servicex_app/transformer_manager.py
+++ b/servicex_app/servicex_app/transformer_manager.py
@@ -41,7 +41,7 @@ class TransformerManager:

     # This is the number of seconds tha thte transformer has to finish uploading
     # objects to the object store before it is terminated.
-    POD_TERMINATION_GRACE_PERIOD = 5*60
+    POD_TERMINATION_GRACE_PERIOD = 5 * 60

     @classmethod
     def make_api(cls, celery_app):
@@ -49,35 +49,39 @@ def make_api(cls, celery_app):
         return cls

     def __init__(self, manager_mode):
-        if manager_mode == 'internal-kubernetes':
+        if manager_mode == "internal-kubernetes":
             kubernetes.config.load_incluster_config()
-        elif manager_mode == 'external-kubernetes':
+        elif manager_mode == "external-kubernetes":
             kubernetes.config.load_kube_config()
         else:
             current_app.logger.error(f"Manager mode {manager_mode} not valid")
-            raise ValueError('Manager mode '+manager_mode+' not valid')
+            raise ValueError("Manager mode " + manager_mode + " not valid")

     def start_transformers(self, config: dict, request_rec: TransformRequest):
         """
         Start the transformers for a given request
         """
-        rabbitmq_uri = config['TRANSFORMER_RABBIT_MQ_URL']
-        namespace = config['TRANSFORMER_NAMESPACE']
-        x509_secret = config['TRANSFORMER_X509_SECRET']
+        rabbitmq_uri = config["TRANSFORMER_RABBIT_MQ_URL"]
+        namespace = config["TRANSFORMER_NAMESPACE"]
+        x509_secret = config["TRANSFORMER_X509_SECRET"]
         generated_code_cm = request_rec.generated_code_cm

         request_rec.workers = min(max(1, request_rec.files), request_rec.workers)
         current_app.logger.info(
             f"Launching {request_rec.workers} transformers.",
-            extra={'requestId': request_rec.request_id})
+            extra={"requestId": request_rec.request_id},
+        )

         self.launch_transformer_jobs(
-            image=request_rec.image, request_id=request_rec.request_id,
+            image=request_rec.image,
+            request_id=request_rec.request_id,
             workers=request_rec.workers,
-            max_workers=(max(1, request_rec.files)
-                         if request_rec.status == TransformStatus.running
-                         else config["TRANSFORMER_MAX_REPLICAS"]),
+            max_workers=(
+                max(1, request_rec.files)
+                if request_rec.status == TransformStatus.running
+                else config["TRANSFORMER_MAX_REPLICAS"]
+            ),
             rabbitmq_uri=rabbitmq_uri,
             namespace=namespace,
             x509_secret=x509_secret,
@@ -85,61 +89,65 @@ def start_transformers(self, config: dict, request_rec: TransformRequest):
             result_destination=request_rec.result_destination,
             result_format=request_rec.result_format,
             transformer_language=request_rec.transformer_language,
-            transformer_command=request_rec.transformer_command
+            transformer_command=request_rec.transformer_command,
         )

     @staticmethod
-    def create_job_object(request_id, image, rabbitmq_uri, workers,
-                          result_destination, result_format, x509_secret,
-                          generated_code_cm, transformer_language, transformer_command):
+    def create_job_object(
+        request_id,
+        image,
+        rabbitmq_uri,
+        workers,
+        result_destination,
+        result_format,
+        x509_secret,
+        generated_code_cm,
+        transformer_language,
+        transformer_command,
+    ):
         volume_mounts = []
         volumes = []

         # append sidecar volume
-        output_path = current_app.config['TRANSFORMER_SIDECAR_VOLUME_PATH']
+        output_path = current_app.config["TRANSFORMER_SIDECAR_VOLUME_PATH"]
         volume_mounts.append(
-            client.V1VolumeMount(
-                name='sidecar-volume',
-                mount_path=output_path)
+            client.V1VolumeMount(name="sidecar-volume", mount_path=output_path)
         )
         volumes.append(
             client.V1Volume(
-                name='sidecar-volume',
-                empty_dir=client.V1EmptyDirVolumeSource())
+                name="sidecar-volume", empty_dir=client.V1EmptyDirVolumeSource()
+            )
         )

         # Only mount local volumes in development environment with mountLocal enabled
-        if (current_app.config.get('APP_ENVIRONMENT') == 'dev' and
-                current_app.config.get('APP_MOUNT_LOCAL')):
+        if current_app.config.get(
+            "APP_ENVIRONMENT"
+        ) == "dev" and current_app.config.get("APP_MOUNT_LOCAL"):

             volume_mounts.append(
-                client.V1VolumeMount(
-                    name='host-volume',
-                    mount_path='/servicex'
-                )
+                client.V1VolumeMount(name="host-volume", mount_path="/servicex")
             )
             volumes.append(
                 client.V1Volume(
-                    name='host-volume',
+                    name="host-volume",
                     host_path=client.V1HostPathVolumeSource(
-                        path='/mnt/servicex/transformer_sidecar/src',
-                        type='DirectoryOrCreate'
+                        path="/mnt/servicex/transformer_sidecar/src",
+                        type="DirectoryOrCreate",
                     ),
                 )
             )
             volume_mounts.append(
                 client.V1VolumeMount(
-                    name='host-scripts-volume',
-                    mount_path='/servicex/scripts'
+                    name="host-scripts-volume", mount_path="/servicex/scripts"
                 )
             )
             volumes.append(
                 client.V1Volume(
-                    name='host-scripts-volume',
+                    name="host-scripts-volume",
                     host_path=client.V1HostPathVolumeSource(
-                        path='/mnt/servicex/transformer_sidecar/scripts',
-                        type='DirectoryOrCreate'
+                        path="/mnt/servicex/transformer_sidecar/scripts",
+                        type="DirectoryOrCreate",
                     ),
                 )
             )
@@ -147,217 +155,265 @@ def create_job_object(request_id, image, rabbitmq_uri, workers,
         if x509_secret:
             volume_mounts.append(
                 client.V1VolumeMount(
-                    name='x509-secret',
-                    mount_path='/etc/grid-security-ro')
+                    name="x509-secret", mount_path="/etc/grid-security-ro"
+                )
+            )
+            volumes.append(
+                client.V1Volume(
+                    name="x509-secret",
+                    secret=client.V1SecretVolumeSource(secret_name=x509_secret),
+                )
             )
-            volumes.append(client.V1Volume(
-                name='x509-secret',
-                secret=client.V1SecretVolumeSource(secret_name=x509_secret)
-            ))

         if generated_code_cm:
-            volumes.append(client.V1Volume(
-                name='generated-code',
-                config_map=client.V1ConfigMapVolumeSource(
-                    name=generated_code_cm)
-            )
+            volumes.append(
+                client.V1Volume(
+                    name="generated-code",
+                    config_map=client.V1ConfigMapVolumeSource(name=generated_code_cm),
+                )
             )
             volume_mounts.append(
-                client.V1VolumeMount(mount_path="/generated", name='generated-code'))
+                client.V1VolumeMount(mount_path="/generated", name="generated-code")
+            )

         if "TRANSFORMER_LOCAL_PATH" in current_app.config:
-            path = current_app.config['TRANSFORMER_LOCAL_PATH']
-            volumes.append(client.V1Volume(
-                name='rootfiles',
-                host_path=client.V1HostPathVolumeSource(path=path)))
+            path = current_app.config["TRANSFORMER_LOCAL_PATH"]
+            volumes.append(
+                client.V1Volume(
+                    name="rootfiles", host_path=client.V1HostPathVolumeSource(path=path)
+                )
+            )
             volume_mounts.append(
-                client.V1VolumeMount(mount_path="/data", name='rootfiles'))
+                client.V1VolumeMount(mount_path="/data", name="rootfiles")
+            )

         # Compute Environment Vars
         env = [client.V1EnvVar(name="BASH_ENV", value="/servicex/.bashrc")]

         # provide pods with level and logging server info
         env += [
-            client.V1EnvVar("LOG_LEVEL", value=os.environ.get('LOG_LEVEL', 'INFO').upper()),
-            client.V1EnvVar("LOGSTASH_HOST", value=os.environ.get('LOGSTASH_HOST')),
-            client.V1EnvVar("LOGSTASH_PORT", value=os.environ.get('LOGSTASH_PORT'))
+            client.V1EnvVar(
+                "LOG_LEVEL", value=os.environ.get("LOG_LEVEL", "INFO").upper()
+            ),
+            client.V1EnvVar("LOGSTASH_HOST", value=os.environ.get("LOGSTASH_HOST")),
+            client.V1EnvVar("LOGSTASH_PORT", value=os.environ.get("LOGSTASH_PORT")),
         ]

         # Provide each pod with an environment var holding that pod's name
         pod_name_value_from = client.V1EnvVarSource(
-            field_ref=client.V1ObjectFieldSelector(
-                field_path="metadata.name"))
+            field_ref=client.V1ObjectFieldSelector(field_path="metadata.name")
+        )
         host_name_value_from = client.V1EnvVarSource(
-            field_ref=client.V1ObjectFieldSelector(
-                field_path="spec.nodeName"))
+            field_ref=client.V1ObjectFieldSelector(field_path="spec.nodeName")
+        )
         env += [
             client.V1EnvVar("POD_NAME", value_from=pod_name_value_from),
-            client.V1EnvVar("HOST_NAME", value_from=host_name_value_from)
+            client.V1EnvVar("HOST_NAME", value_from=host_name_value_from),
         ]

         # Provide each pod with an environment var holding that instance name
         if "INSTANCE_NAME" in current_app.config:
-            instance_name = current_app.config['INSTANCE_NAME']
-            env_var_instance_name = client.V1EnvVar("INSTANCE_NAME",
-                                                    value=instance_name)
+            instance_name = current_app.config["INSTANCE_NAME"]
+            env_var_instance_name = client.V1EnvVar(
+                "INSTANCE_NAME", value=instance_name
+            )
             env = env + [env_var_instance_name]

         # provide each pod with an environment var holding cache prefix path
         if "TRANSFORMER_CACHE_PREFIX" in current_app.config:
-            env += [client.V1EnvVar("CACHE_PREFIX",
-                                    value=current_app.config["TRANSFORMER_CACHE_PREFIX"])]
+            env += [
+                client.V1EnvVar(
+                    "CACHE_PREFIX", value=current_app.config["TRANSFORMER_CACHE_PREFIX"]
+                )
+            ]

-        if result_destination == 'object-store':
+        if result_destination == "object-store":
             env = env + [
-                client.V1EnvVar(name='MINIO_URL',
-                                value=current_app.config['MINIO_URL_TRANSFORMER']),
-                client.V1EnvVar(name='MINIO_ACCESS_KEY',
-                                value=current_app.config['MINIO_ACCESS_KEY']),
-                client.V1EnvVar(name='MINIO_SECRET_KEY',
-                                value=current_app.config['MINIO_SECRET_KEY']),
+                client.V1EnvVar(
+                    name="MINIO_URL", value=current_app.config["MINIO_URL_TRANSFORMER"]
+                ),
+                client.V1EnvVar(
+                    name="MINIO_ACCESS_KEY",
+                    value=current_app.config["MINIO_ACCESS_KEY"],
+                ),
+                client.V1EnvVar(
+                    name="MINIO_SECRET_KEY",
+                    value=current_app.config["MINIO_SECRET_KEY"],
+                ),
             ]
-            if 'MINIO_ENCRYPT' in current_app.config:
-                env += [client.V1EnvVar(name='MINIO_ENCRYPT',
-                                        value=str(current_app.config['MINIO_ENCRYPT']))]
+            if "MINIO_ENCRYPT" in current_app.config:
+                env += [
+                    client.V1EnvVar(
+                        name="MINIO_ENCRYPT",
+                        value=str(current_app.config["MINIO_ENCRYPT"]),
+                    )
+                ]

-        if result_destination == 'volume':
+        if result_destination == "volume":
             TransformerManager.create_posix_volume(volumes, volume_mounts)

         science_command = " "
         if x509_secret:
-            science_command += "until [ -f /servicex/output/scripts/proxy-exporter.sh ];" \
-                               "do sleep 5;done &&" \
-                               " /servicex/output/scripts/proxy-exporter.sh & sleep 5 && "
-
-        sidecar_command = "PYTHONPATH=/servicex/transformer_sidecar:$PYTHONPATH " + \
-                          "python /servicex/transformer_sidecar/transformer.py " + \
-                          " --shared-dir /servicex/output " + \
-                          " --request-id " + request_id + \
-                          " --rabbit-uri " + rabbitmq_uri + \
-                          " --result-destination " + result_destination + \
-                          " --result-format " + result_format
-
-        watch_path = os.path.join(current_app.config['TRANSFORMER_SIDECAR_VOLUME_PATH'],
-                                  request_id)
-        science_command += "cp /generated/transformer_capabilities.json {op} && " \
-                           "PYTHONPATH=/generated:$PYTHONPATH " \
-                           "bash {op}/scripts/watch.sh ".format(op=output_path) + \
-                           "{TL} ".format(TL=transformer_language) + \
-                           "{TC} ".format(TC=transformer_command) + \
-                           watch_path
+            science_command += (
+                "until [ -f /servicex/output/scripts/proxy-exporter.sh ];"
+                "do sleep 5;done &&"
+                " /servicex/output/scripts/proxy-exporter.sh & sleep 5 && "
+            )
+
+        sidecar_command = (
+            "PYTHONPATH=/servicex/transformer_sidecar:$PYTHONPATH "
+            + "python /servicex/transformer_sidecar/transformer.py "
+            + " --shared-dir /servicex/output "
+            + " --request-id "
+            + request_id
+            + " --rabbit-uri "
+            + rabbitmq_uri
+            + " --result-destination "
+            + result_destination
+            + " --result-format "
+            + result_format
+        )
+
+        watch_path = os.path.join(
+            current_app.config["TRANSFORMER_SIDECAR_VOLUME_PATH"], request_id
+        )
+        science_command += (
+            "cp /generated/transformer_capabilities.json {op} && "
+            "PYTHONPATH=/generated:$PYTHONPATH "
+            "bash {op}/scripts/watch.sh ".format(op=output_path)
+            + "{TL} ".format(TL=transformer_language)
+            + "{TC} ".format(TC=transformer_command)
+            + watch_path
+        )

-        if result_destination == 'volume':
+        if result_destination == "volume":
             sidecar_command += " --output-dir " + os.path.join(
                 TransformerManager.POSIX_VOLUME_MOUNT,
-                current_app.config['TRANSFORMER_PERSISTENCE_SUBDIR'])
+                current_app.config["TRANSFORMER_PERSISTENCE_SUBDIR"],
+            )

         resources = client.V1ResourceRequirements(
-            limits={"cpu": current_app.config['TRANSFORMER_CPU_LIMIT'],
-                    "memory": current_app.config['TRANSFORMER_MEMORY_LIMIT']}
+            limits={
+                "cpu": current_app.config["TRANSFORMER_CPU_LIMIT"],
+                "memory": current_app.config["TRANSFORMER_MEMORY_LIMIT"],
+            }
         )

         # Configure Pod template container
         science_container = client.V1Container(
             name="transformer",
             image=image,
-            image_pull_policy=current_app.config['TRANSFORMER_SCIENCE_IMAGE_PULL_POLICY'],
+            image_pull_policy=current_app.config[
+                "TRANSFORMER_SCIENCE_IMAGE_PULL_POLICY"
+            ],
             volume_mounts=volume_mounts,
             command=["bash", "--login", "-c"],
             env=env,
             args=[science_command],
-            resources=resources
+            resources=resources,
         )

         # Configure Pod template container
         sidecar = client.V1Container(
             name="sidecar",
-            image=current_app.config['TRANSFORMER_SIDECAR_IMAGE'],
-            image_pull_policy=current_app.config['TRANSFORMER_SIDECAR_PULL_POLICY'],
+            image=current_app.config["TRANSFORMER_SIDECAR_IMAGE"],
+            image_pull_policy=current_app.config["TRANSFORMER_SIDECAR_PULL_POLICY"],
             volume_mounts=volume_mounts,
             command=["bash", "-c"],
             env=env,
             args=[sidecar_command],
-            resources=resources
+            resources=resources,
         )

         # Create and Configure a spec section
         template = client.V1PodTemplateSpec(
-            metadata=client.V1ObjectMeta(labels={'app': "transformer-" + request_id}),
+            metadata=client.V1ObjectMeta(labels={"app": "transformer-" + request_id}),
             spec=client.V1PodSpec(
                 restart_policy="Always",
                 termination_grace_period_seconds=TransformerManager.POD_TERMINATION_GRACE_PERIOD,
-                priority_class_name=current_app.config.get('TRANSFORMER_PRIORITY_CLASS', None),
-                containers=[sidecar, science_container],  # Containers are started in this order
-                volumes=volumes))
+                priority_class_name=current_app.config.get(
+                    "TRANSFORMER_PRIORITY_CLASS", None
+                ),
+                containers=[
+                    sidecar,
+                    science_container,
+                ],  # Containers are started in this order
+                volumes=volumes,
+            ),
+        )

         # Create the specification of deployment
         selector = client.V1LabelSelector(
-            match_labels={
-                "app": "transformer-" + request_id
-            })
+            match_labels={"app": "transformer-" + request_id}
+        )

         # If we are using Autoscaler then always start with one replica
-        if current_app.config['TRANSFORMER_AUTOSCALE_ENABLED']:
-            replicas = current_app.config.get('TRANSFORMER_MIN_REPLICAS', 1)
+        if current_app.config["TRANSFORMER_AUTOSCALE_ENABLED"]:
+            replicas = current_app.config.get("TRANSFORMER_MIN_REPLICAS", 1)
         else:
             replicas = workers

         spec = client.V1DeploymentSpec(
-            template=template,
-            selector=selector,
-            replicas=replicas
+            template=template, selector=selector, replicas=replicas
         )

         deployment = client.V1Deployment(
             api_version="apps/v1",
             kind="Deployment",
             metadata=client.V1ObjectMeta(name="transformer-" + request_id),
-            spec=spec
+            spec=spec,
         )

         return deployment

     @staticmethod
     def create_posix_volume(volumes, volume_mounts):
-        if 'TRANSFORMER_PERSISTENCE_PROVIDED_CLAIM' not in current_app.config or \
-                not current_app.config['TRANSFORMER_PERSISTENCE_PROVIDED_CLAIM']:
+        if (
+            "TRANSFORMER_PERSISTENCE_PROVIDED_CLAIM" not in current_app.config
+            or not current_app.config["TRANSFORMER_PERSISTENCE_PROVIDED_CLAIM"]
+        ):
             empty_dir = client.V1Volume(
-                name='posix-volume',
-                empty_dir=client.V1EmptyDirVolumeSource())
+                name="posix-volume", empty_dir=client.V1EmptyDirVolumeSource()
+            )
             volumes.append(empty_dir)
         else:
             volumes.append(
                 client.V1Volume(
-                    name='posix-volume',
+                    name="posix-volume",
                     persistent_volume_claim=client.V1PersistentVolumeClaimVolumeSource(
-                        claim_name=current_app.config['TRANSFORMER_PERSISTENCE_PROVIDED_CLAIM']
-                    )
+                        claim_name=current_app.config[
+                            "TRANSFORMER_PERSISTENCE_PROVIDED_CLAIM"
+                        ]
+                    ),
                 )
             )

         volume_mounts.append(
-            client.V1VolumeMount(mount_path=TransformerManager.POSIX_VOLUME_MOUNT,
-                                 name='posix-volume'))
+            client.V1VolumeMount(
+                mount_path=TransformerManager.POSIX_VOLUME_MOUNT, name="posix-volume"
+            )
+        )

     def persistent_volume_claim_exists(self, claim_name, namespace):
         api = client.CoreV1Api()
-        pvcs = api.list_namespaced_persistent_volume_claim(namespace=namespace, watch=False)
+        pvcs = api.list_namespaced_persistent_volume_claim(
+            namespace=namespace, watch=False
+        )
         for pvc in pvcs.items:
             if pvc.metadata.name == claim_name:
-                if pvc.status.phase == 'Bound':
+                if pvc.status.phase == "Bound":
                     return True
                 else:
-                    current_app.logger.warning(f"Volume Claim '{claim_name} found, "
-                                               f"but it is not bound")
+                    current_app.logger.warning(
+                        f"Volume Claim '{claim_name}' found, but it is not bound"
+                    )
                     return False
         return False

     @staticmethod
     def create_hpa_object(request_id, max_workers):
         target = client.V1CrossVersionObjectReference(
-            api_version="apps/v1",
-            kind='Deployment',
-            name="transformer-" + request_id
+            api_version="apps/v1", kind="Deployment", name="transformer-" + request_id
         )

         cfg = current_app.config
@@ -366,13 +422,13 @@ def create_hpa_object(request_id, max_workers):
             scale_target_ref=target,
             target_cpu_utilization_percentage=cfg["TRANSFORMER_CPU_SCALE_THRESHOLD"],
             min_replicas=min(max_workers, cfg["TRANSFORMER_MIN_REPLICAS"]),
-            max_replicas=min(max_workers, cfg["TRANSFORMER_MAX_REPLICAS"])
+            max_replicas=min(max_workers, cfg["TRANSFORMER_MAX_REPLICAS"]),
         )

         hpa = client.V1HorizontalPodAutoscaler(
             api_version="autoscaling/v1",
-            kind='HorizontalPodAutoscaler',
+            kind="HorizontalPodAutoscaler",
             metadata=client.V1ObjectMeta(name="transformer-" + request_id),
-            spec=spec
+            spec=spec,
         )
         return hpa
@@ -389,26 +445,44 @@ def _create_job(api_instance, job, namespace):
     @staticmethod
     def _create_hpa(api_instance, hpa, namespace):
         try:
             api_instance.create_namespaced_horizontal_pod_autoscaler(
-                body=hpa,
-                namespace=namespace)
+                body=hpa, namespace=namespace
+            )
             current_app.logger.info("HPA created.")
         except ApiException as e:
             current_app.logger.exception(f"Exception during HPA Creation: {e}")

-    def launch_transformer_jobs(self, image, request_id, workers, max_workers,
-                                rabbitmq_uri, namespace, x509_secret, generated_code_cm,
-                                result_destination, result_format, transformer_language,
-                                transformer_command
-                                ):
+    def launch_transformer_jobs(
+        self,
+        image,
+        request_id,
+        workers,
+        max_workers,
+        rabbitmq_uri,
+        namespace,
+        x509_secret,
+        generated_code_cm,
+        result_destination,
+        result_format,
+        transformer_language,
+        transformer_command,
+    ):
         api_v1 = client.AppsV1Api()
-        job = self.create_job_object(request_id, image, rabbitmq_uri, workers,
-                                     result_destination, result_format,
-                                     x509_secret, generated_code_cm,
-                                     transformer_language, transformer_command)
+        job = self.create_job_object(
+            request_id,
+            image,
+            rabbitmq_uri,
+            workers,
+            result_destination,
+            result_format,
+            x509_secret,
+            generated_code_cm,
+            transformer_language,
+            transformer_command,
+        )
         self._create_job(api_v1, job, namespace)

-        if current_app.config['TRANSFORMER_AUTOSCALE_ENABLED']:
+        if current_app.config["TRANSFORMER_AUTOSCALE_ENABLED"]:
             autoscaler_api = kubernetes.client.AutoscalingV1Api()
             hpa = self.create_hpa_object(request_id, max_workers)
             self._create_hpa(autoscaler_api, hpa, namespace)
@@ -416,48 +490,54 @@ def launch_transformer_jobs(self, image, request_id, workers, max_workers,
     @classmethod
     def shutdown_transformer_job(cls, request_id, namespace):
         try:
-            if current_app.config['TRANSFORMER_AUTOSCALE_ENABLED']:
+            if current_app.config["TRANSFORMER_AUTOSCALE_ENABLED"]:
                 autoscaler_api = kubernetes.client.AutoscalingV1Api()
                 autoscaler_api.delete_namespaced_horizontal_pod_autoscaler(
-                    name="transformer-" + request_id,
-                    namespace=namespace
+                    name="transformer-" + request_id, namespace=namespace
                 )
         except ApiException:
-            current_app.logger.exception("Exception during Job HPA Shut Down", extra={
-                "requestId": request_id})
+            current_app.logger.exception(
+                "Exception during Job HPA Shut Down", extra={"requestId": request_id}
+            )

         try:
             api_core = client.CoreV1Api()
             configmap_name = "{}-generated-source".format(request_id)
-            api_core.delete_namespaced_config_map(name=configmap_name,
-                                                  namespace=namespace)
+            api_core.delete_namespaced_config_map(
+                name=configmap_name, namespace=namespace
+            )
         except ApiException:
-            current_app.logger.exception("Exception during Job ConfigMap cleanup", extra={
-                "requestId": request_id})
+            current_app.logger.exception(
+                "Exception during Job ConfigMap cleanup",
+                extra={"requestId": request_id},
+            )

         try:
             api_v1 = client.AppsV1Api()
             api_v1.delete_namespaced_deployment(
-                name="transformer-" + request_id,
-                namespace=namespace
+                name="transformer-" + request_id, namespace=namespace
             )
         except ApiException:
-            current_app.logger.exception("Exception during Job Deployment Shut Down", extra={
-                "requestId": request_id})
+            current_app.logger.exception(
+                "Exception during Job Deployment Shut Down",
+                extra={"requestId": request_id},
+            )

         # delete RabbitMQ queue
         try:
-            current_app.logger.info(f"Stopping workers connected to transformer-{request_id}")
+            current_app.logger.info(
+                f"Stopping workers connected to transformer-{request_id}"
+            )
             cls.celery_app.control.cancel_consumer(f"transformer-{request_id}")
         except Exception as e:
-            current_app.logger.exception("Exception during Celery queue cancellation", extra={
-                "requestId": request_id,
-                "exception": e
-            })
+            current_app.logger.exception(
+                "Exception during Celery queue cancellation",
+                extra={"requestId": request_id, "exception": e},
+            )

     @staticmethod
     def get_deployment_status(
-        request_id: str
+        request_id: str,
     ) -> Optional[kubernetes.client.models.v1_deployment_status.V1DeploymentStatus]:
         namespace = current_app.config["TRANSFORMER_NAMESPACE"]
         api = client.AppsV1Api()
@@ -473,9 +553,8 @@ def get_deployment_status(
     def create_configmap_from_zip(zipfile, request_id, namespace):
         configmap_name = "{}-generated-source".format(request_id)
         data = {
-            file.filename:
-                base64.b64encode(zipfile.open(file).read()).decode("ascii") for file in
-            zipfile.filelist
+            file.filename: base64.b64encode(zipfile.open(file).read()).decode("ascii")
+            for file in zipfile.filelist
         }

         metadata = client.V1ObjectMeta(
@@ -485,14 +564,9 @@ def create_configmap_from_zip(zipfile, request_id, namespace):
         # Instantiate the configmap object
         configmap = client.V1ConfigMap(
-            api_version="v1",
-            kind="ConfigMap",
-            binary_data=data,
-            metadata=metadata
+            api_version="v1", kind="ConfigMap", binary_data=data, metadata=metadata
         )

         api_instance = client.CoreV1Api()
-        api_instance.create_namespaced_config_map(
-            namespace=namespace,
-            body=configmap)
+        api_instance.create_namespaced_config_map(namespace=namespace, body=configmap)

         return configmap_name
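Values override sketch: the dev-only template blocks added in patches 1 and 7
key off values that the chart's values.yaml never sets to their dev settings,
so a local override file has to supply them. A minimal sketch follows; the
file name is hypothetical, and note that the templates read two spellings of
the mount flag (deployment.yaml gates its hostPath volume on app.mount_local,
while configmap.yaml and values.yaml use app.mountLocal), so the override
sets both:

    # write a minimal override and point $VALUES_FILE at it
    cat > values-dev.yaml <<'EOF'
    app:
      environment: dev   # enables the dev-only blocks in deployment.yaml
      reload: true       # runs ./boot.sh --reload so gunicorn restarts on code changes
      mount_local: true  # deployment.yaml: mounts the minikube hostPath into the app pod
      mountLocal: true   # configmap.yaml: exposed to the app as APP_MOUNT_LOCAL
    EOF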