diff --git a/hatchery/config.go b/hatchery/config.go
index 29e5eb5..d4ef752 100644
--- a/hatchery/config.go
+++ b/hatchery/config.go
@@ -35,6 +35,14 @@ type NextflowConfig struct {
 	InstanceMaxVCpus int32 `json:"instance-max-vcpus"`
 }
 
+// S3Config contains configuration for S3 bucket mounting into workspace pods.
+type S3Config struct {
+	BucketName string `json:"bucketName"` // e.g. "workspace-software-s3-qa-gen3"
+	Region     string `json:"region"`     // e.g. "us-east-1"
+	// Optional; if empty we'll default to "/".
+	PrefixBase string `json:"prefixBase"` // e.g. "" (we'll compute from userName)
+}
+
 // LicenseInfo contains configuration for Gen3 supplied licenses.
 type LicenseInfo struct {
 	Enabled bool `json:"enabled"`
@@ -142,6 +150,7 @@ type HatcheryConfig struct {
 	Sidecar              SidecarContainer     `json:"sidecar"`
 	MoreConfigs          []AppConfigInfo      `json:"more-configs"`
 	PrismaConfig         PrismaConfig         `json:"prisma"`
+	S3Config             S3Config             `json:"s3-config"`
 	NextflowGlobalConfig NextflowGlobalConfig `json:"nextflow-global"`
 	Pricing              Pricing              `json:"pricing"`
 }
diff --git a/hatchery/pods.go b/hatchery/pods.go
index 394b0ee..a0593df 100644
--- a/hatchery/pods.go
+++ b/hatchery/pods.go
@@ -6,6 +6,7 @@ import (
 	"fmt"
 	"log"
 	"os"
+	"path/filepath"
 	"strconv"
 	"strings"
 
@@ -16,6 +17,8 @@ import (
 	"k8s.io/client-go/kubernetes"
 	corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
 	"k8s.io/client-go/rest"
+	"k8s.io/client-go/tools/clientcmd"
+	"k8s.io/client-go/util/homedir"
 
 	// AWS modules
 	"github.com/aws/aws-sdk-go/aws"
@@ -78,27 +81,61 @@ func getPodClient(ctx context.Context, userName string, payModelPtr *PayModel) (
 			return podClient, true, nil
 		}
 	} else {
-		return getLocalPodClient(), false, nil
+		podClient, err := getLocalPodClient()
+		if err != nil {
+			Config.Logger.Printf("Error fetching local pod client: %v", err)
+			return nil, true, err
+		}
+		return podClient, false, nil
 	}
 }
 
-func getLocalPodClient() corev1.CoreV1Interface {
-	// creates the in-cluster config
-	config, err := GetConfig()
+// getLocalPodClient uses the in-cluster config if available; otherwise it falls back
+// to kubeconfig. It honors $KUBECONFIG (or ~/.kube/config) and an optional $KUBE_CONTEXT.
+func getLocalPodClient() (corev1.CoreV1Interface, error) {
+	// 1) In-cluster (works when running inside a pod)
+	if cfg, err := rest.InClusterConfig(); err == nil {
+		cs, err := kubernetes.NewForConfig(cfg)
+		if err != nil {
+			return nil, fmt.Errorf("in-cluster clientset: %w", err)
+		}
+		return cs.CoreV1(), nil
+	}
+
+	// 2) Kubeconfig (local dev)
+	var precedence []string
+	if kc := os.Getenv("KUBECONFIG"); kc != "" {
+		// Split on the OS-specific list separator (":" on *nix, ";" on Windows).
+		precedence = filepath.SplitList(kc)
+	} else {
+		// Default to ~/.kube/config
+		precedence = []string{filepath.Join(homedir.HomeDir(), ".kube", "config")}
+	}
+
+	kubeContext := os.Getenv("KUBE_CONTEXT") // optional
+
+	loading := &clientcmd.ClientConfigLoadingRules{
+		Precedence: precedence, // multiple files are supported and merged
+		// Note: leave ExplicitPath empty to allow Precedence merging.
+	}
+	overrides := &clientcmd.ConfigOverrides{}
+	if kubeContext != "" {
+		overrides.CurrentContext = kubeContext
+	}
+
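+	// For example (hypothetical paths and context name), KUBECONFIG=$HOME/.kube/config:/tmp/dev.yaml
+	// together with KUBE_CONTEXT=dev merges both files and pins the current context to "dev".
+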
+	cfg, err := clientcmd.
+		NewNonInteractiveDeferredLoadingClientConfig(loading, overrides).
+		ClientConfig()
 	if err != nil {
-		Config.Logger.Printf("Error creating in-cluster config: %v", err)
-		return nil
+		return nil, fmt.Errorf("kubeconfig load (files=%v): %w", precedence, err)
 	}
-	// creates the clientset
-	clientset, err := kubernetes.NewForConfig(config)
+
+	cs, err := kubernetes.NewForConfig(cfg)
 	if err != nil {
-		Config.Logger.Printf("Error creating in-cluster clientset: %v", err)
-		return nil
+		Config.Logger.Printf("Error fetching kubeconfig clientset: %v", err)
+		return nil, fmt.Errorf("kubeconfig clientset: %w", err)
 	}
-	// Access jobs. We can't do it all in one line, since we need to receive the
-	// errors and manage them appropriately
-	podClient := clientset.CoreV1()
-	return podClient
+	return cs.CoreV1(), nil
 }
 
 // Generate EKS kubeconfig using AWS role
@@ -349,6 +386,137 @@ func userToResourceName(userName string, resourceType string) string {
 	return fmt.Sprintf("%s-%s", resourceType, safeUserName)
 }
 
+func s3NamesForUser(userName string) (pvName, pvcName, volumeHandle string) {
+	base := userToResourceName(userName, "s3") // e.g. "s3-john-doe"
+	return base + "-pv", base + "-pvc", "s3-" + base // unique handle
+}
+
+func s3PrefixForUser(userName string) string {
+	// If you prefer purely the username: return fmt.Sprintf("%s/", userName)
+	// Resource-safe version; trim the leading "-" left when resourceType is empty:
+	return fmt.Sprintf("%s/", strings.TrimPrefix(userToResourceName(userName, ""), "-"))
+}
+
+func ensureS3PVandPVC(
+	ctx context.Context,
+	podClient corev1.CoreV1Interface,
+	namespace string,
+	userName string,
+	bucket string,
+	region string,
+) (pvcName string, err error) {
+
+	pvName, pvcName, volumeHandle := s3NamesForUser(userName)
+	prefix := s3PrefixForUser(userName)
+
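+	// Worked example (hypothetical user whose resource-safe name is "john-doe"):
+	//   pvName "s3-john-doe-pv", pvcName "s3-john-doe-pvc",
+	//   volumeHandle "s3-s3-john-doe", prefix "john-doe/".
+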
+	// ----- Ensure PV exists (cluster-scoped) -----
+	// Try GET; if not found, create.
+	if _, errGet := podClient.PersistentVolumes().Get(ctx, pvName, metav1.GetOptions{}); errGet != nil {
+		pv := &k8sv1.PersistentVolume{
+			ObjectMeta: metav1.ObjectMeta{
+				Name: pvName,
+			},
+			Spec: k8sv1.PersistentVolumeSpec{
+				Capacity: k8sv1.ResourceList{
+					k8sv1.ResourceStorage: resource.MustParse("1200Gi"), // ignored by the S3 CSI driver, but required
+				},
+				AccessModes:      []k8sv1.PersistentVolumeAccessMode{k8sv1.ReadWriteMany},
+				StorageClassName: "", // static provisioning
+				ClaimRef: &k8sv1.ObjectReference{
+					Namespace: namespace,
+					Name:      pvcName,
+				},
+				MountOptions: []string{
+					"allow-delete",
+					fmt.Sprintf("region %s", region),
+					fmt.Sprintf("prefix %s", prefix),
+				},
+				PersistentVolumeSource: k8sv1.PersistentVolumeSource{
+					CSI: &k8sv1.CSIPersistentVolumeSource{
+						Driver:       "s3.csi.aws.com",
+						VolumeHandle: volumeHandle, // must be unique
+						VolumeAttributes: map[string]string{
+							"bucketName": bucket,
+						},
+					},
+				},
+			},
+		}
+		if _, errCreatePV := podClient.PersistentVolumes().Create(ctx, pv, metav1.CreateOptions{}); errCreatePV != nil {
+			return "", fmt.Errorf("failed to create S3 PV %s: %w", pvName, errCreatePV)
+		}
+	}
+
+	// ----- Ensure PVC exists (namespaced) -----
+	if _, errGet := podClient.PersistentVolumeClaims(namespace).Get(ctx, pvcName, metav1.GetOptions{}); errGet != nil {
+		empty := "" // PVC.StorageClassName is *string
+		pvc := &k8sv1.PersistentVolumeClaim{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:      pvcName,
+				Namespace: namespace,
+			},
+			Spec: k8sv1.PersistentVolumeClaimSpec{
+				AccessModes:      []k8sv1.PersistentVolumeAccessMode{k8sv1.ReadWriteMany},
+				StorageClassName: &empty,
+				Resources: k8sv1.VolumeResourceRequirements{
+					Requests: k8sv1.ResourceList{
+						k8sv1.ResourceStorage: resource.MustParse("1200Gi"),
+					},
+				},
+				VolumeName: pvName,
+			},
+		}
+		if _, errCreatePVC := podClient.PersistentVolumeClaims(namespace).Create(ctx, pvc, metav1.CreateOptions{}); errCreatePVC != nil {
+			return "", fmt.Errorf("failed to create S3 PVC %s: %w", pvcName, errCreatePVC)
+		}
+	}
+
+	return pvcName, nil
+}
+
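+// A sketch of the intended call sequence (createLocalK8sPod below wires this
+// up for real):
+//
+//	pvcName, err := ensureS3PVandPVC(ctx, podClient, Config.Config.UserNamespace,
+//		userName, Config.Config.S3Config.BucketName, Config.Config.S3Config.Region)
+//	if err != nil {
+//		return err
+//	}
+//	addS3VolumeToPod(pod, pvcName)
+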
+// addS3VolumeToPod adds the S3 volume + mount to the pod in-place.
+// Call this after buildPod(), before creating the pod.
+func addS3VolumeToPod(pod *k8sv1.Pod, pvcName string) {
+	// 1) Add the Volume if it is not already present
+	hasVolume := false
+	for _, v := range pod.Spec.Volumes {
+		if v.Name == "s3-volume" {
+			hasVolume = true
+			break
+		}
+	}
+	if !hasVolume {
+		pod.Spec.Volumes = append(pod.Spec.Volumes, k8sv1.Volume{
+			Name: "s3-volume",
+			VolumeSource: k8sv1.VolumeSource{
+				PersistentVolumeClaim: &k8sv1.PersistentVolumeClaimVolumeSource{
+					ClaimName: pvcName,
+					// ReadOnly: false, // optional
+				},
+			},
+		})
+	}
+
+	// 2) Add the VolumeMount (/apps) to every container (or just the main one)
+	for ci := range pod.Spec.Containers {
+		c := &pod.Spec.Containers[ci]
+		alreadyMounted := false
+		for _, m := range c.VolumeMounts {
+			if m.Name == "s3-volume" {
+				alreadyMounted = true
+				break
+			}
+		}
+		if !alreadyMounted {
+			c.VolumeMounts = append(c.VolumeMounts, k8sv1.VolumeMount{
+				Name: "s3-volume",
+				// TODO: Read this path from config
+				MountPath: "/apps",
+			})
+		}
+	}
+}
+
 // buildPod returns a pod ready to pass to the k8s API given
 // a hatchery Container instance, and the name of the user
 // launching the app
@@ -674,6 +842,24 @@ var createLocalK8sPod = func(ctx context.Context, hash string, userName string,
 		Config.Logger.Panicf("Error in createLocalK8sPod: %v", err)
 		return err
 	}
+	// Ensure the S3 PV/PVC (dynamic per user) and wire them into the pod.
+	if Config.Config.S3Config.BucketName != "" && Config.Config.S3Config.Region != "" {
+		Config.Logger.Print("Mounting S3 bucket as well...")
+		s3PVCName, err := ensureS3PVandPVC(
+			ctx,
+			podClient,
+			Config.Config.UserNamespace,
+			userName,
+			Config.Config.S3Config.BucketName,
+			Config.Config.S3Config.Region,
+		)
+		if err != nil {
+			Config.Logger.Printf("Failed ensuring S3 PV/PVC for user %s: %v", userName, err)
+			return err
+		}
+		addS3VolumeToPod(pod, s3PVCName)
+	}
+
 	// a null image indicates a dockstore app - always mount user volume
 	mountUserVolume := hatchApp.UserVolumeLocation != ""
 	if mountUserVolume {
@@ -974,8 +1160,11 @@ tls: %s
 `
 	annotationsService := make(map[string]string)
 	annotationsService["getambassador.io/config"] = fmt.Sprintf(localAmbassadorYaml, userToResourceName(userName, "mapping"), userName, serviceURL, NodePort, hatchApp.PathRewrite, hatchApp.UseTLS)
 
-	localPodClient := getLocalPodClient()
-	_, err := localPodClient.Services(Config.Config.UserNamespace).Get(ctx, serviceName, metav1.GetOptions{})
+	localPodClient, err := getLocalPodClient()
+	if err != nil {
+		return fmt.Errorf("error fetching local pod client: %w", err)
+	}
+	_, err = localPodClient.Services(Config.Config.UserNamespace).Get(ctx, serviceName, metav1.GetOptions{})
 	if err == nil {
 		// This probably happened as the result of some error... there was no pod but was a service
 		// Lets just clean it up and proceed
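
An example of the new hatchery config stanza (a sketch, not part of the diff; the
bucket and region are the example values from the S3Config comments, and
"prefixBase" may be omitted since the per-user prefix is derived from userName):

    "s3-config": {
        "bucketName": "workspace-software-s3-qa-gen3",
        "region": "us-east-1"
    }

When both bucketName and region are non-empty, createLocalK8sPod provisions the
per-user PV/PVC pair and mounts the bucket at /apps in every workspace container;
leaving either field empty skips the S3 mount entirely.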