Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions hatchery/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,14 @@ type NextflowConfig struct {
InstanceMaxVCpus int32 `json:"instance-max-vcpus"`
}

// Configuration for S3 bucket mounting into workspace pods
type S3Config struct {
BucketName string `json:"bucketName"` // e.g. "workspace-software-s3-qa-gen3"
Region string `json:"region"` // e.g. "us-east-1"
// Optional; if empty we’ll default to "<userName>/".
PrefixBase string `json:"prefixBase"` // e.g. "" (we’ll compute from userName)
}

// LicenseInfo contains configuration for Gen3 supplied licenses.
type LicenseInfo struct {
Enabled bool `json:"enabled"`
Expand Down Expand Up @@ -142,6 +150,7 @@ type HatcheryConfig struct {
Sidecar SidecarContainer `json:"sidecar"`
MoreConfigs []AppConfigInfo `json:"more-configs"`
PrismaConfig PrismaConfig `json:"prisma"`
S3Config S3Config `json:"s3-config"`
NextflowGlobalConfig NextflowGlobalConfig `json:"nextflow-global"`
Pricing Pricing `json:"pricing"`
}
Expand Down
221 changes: 205 additions & 16 deletions hatchery/pods.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"log"
"os"
"path/filepath"
"strconv"
"strings"

Expand All @@ -16,6 +17,8 @@ import (
"k8s.io/client-go/kubernetes"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/homedir"

// AWS modules
"github.com/aws/aws-sdk-go/aws"
Expand Down Expand Up @@ -78,27 +81,61 @@ func getPodClient(ctx context.Context, userName string, payModelPtr *PayModel) (
return podClient, true, nil
}
} else {
return getLocalPodClient(), false, nil
podClient, err := getLocalPodClient()
if err != nil {
Config.Logger.Printf("Error fetching local kubeconfig: %v", err)
return nil, true, err
}
return podClient, false, nil
}
}

func getLocalPodClient() corev1.CoreV1Interface {
// creates the in-cluster config
config, err := GetConfig()
// Use in-cluster if available; otherwise fall back to kubeconfig.
// Honors $KUBECONFIG (or ~/.kube/config) and optional $KUBE_CONTEXT.
func getLocalPodClient() (corev1.CoreV1Interface, error) {
// 1) In-cluster (works when running inside a pod)
if cfg, err := rest.InClusterConfig(); err == nil {
cs, err := kubernetes.NewForConfig(cfg)
if err != nil {
return nil, fmt.Errorf("in-cluster clientset: %w", err)
}
return cs.CoreV1(), nil
}

// 2) Kubeconfig (local dev)
var precedence []string
if kc := os.Getenv("KUBECONFIG"); kc != "" {
// Split on OS-specific list separator (":" on *nix, ";" on Windows).
precedence = filepath.SplitList(kc)
} else {
// Default to ~/.kube/config
precedence = []string{filepath.Join(homedir.HomeDir(), ".kube", "config")}
}

kubeContext := os.Getenv("KUBE_CONTEXT") // optional

loading := &clientcmd.ClientConfigLoadingRules{
Precedence: precedence, // <-- multiple files supported & merged
// Note: leave ExplicitPath empty to allow Precedence merging.
}
overrides := &clientcmd.ConfigOverrides{}
if kubeContext != "" {
overrides.CurrentContext = kubeContext
}

cfg, err := clientcmd.
NewNonInteractiveDeferredLoadingClientConfig(loading, overrides).
ClientConfig()
if err != nil {
Config.Logger.Printf("Error creating in-cluster config: %v", err)
return nil
return nil, fmt.Errorf("kubeconfig load (files=%v): %w", precedence, err)
}
// creates the clientset
clientset, err := kubernetes.NewForConfig(config)

cs, err := kubernetes.NewForConfig(cfg)
if err != nil {
Config.Logger.Printf("Error creating in-cluster clientset: %v", err)
return nil
Config.Logger.Printf("Error fetching kubeconfig clientset: %v", err)
return nil, fmt.Errorf("kubeconfig clientset: %w", err)
}
// Access jobs. We can't do it all in one line, since we need to receive the
// errors and manage them appropriately
podClient := clientset.CoreV1()
return podClient
return cs.CoreV1(), nil
}

// Generate EKS kubeconfig using AWS role
Expand Down Expand Up @@ -349,6 +386,137 @@ func userToResourceName(userName string, resourceType string) string {
return fmt.Sprintf("%s-%s", resourceType, safeUserName)
}

func s3NamesForUser(userName string) (pvName, pvcName, volumeHandle string) {
base := userToResourceName(userName, "s3") // e.g. john-doe-s3
return base + "-pv", base + "-pvc", "s3-" + base // unique handle
}

func s3PrefixForUser(userName string) string {
// If you prefer purely the username: return fmt.Sprintf("%s/", userName)
// Using resource-safe version tends to be nicer:
return fmt.Sprintf("%s/", userToResourceName(userName, ""))
}

func ensureS3PVandPVC(
ctx context.Context,
podClient corev1.CoreV1Interface,
namespace string,
userName string,
bucket string,
region string,
) (pvcName string, err error) {

pvName, pvcName, volumeHandle := s3NamesForUser(userName)
prefix := s3PrefixForUser(userName)

// ----- Ensure PV exists (cluster-scoped) -----
// Try GET; if not found, create.
if _, errGet := podClient.PersistentVolumes().Get(ctx, pvName, metav1.GetOptions{}); errGet != nil {
pv := &k8sv1.PersistentVolume{
ObjectMeta: metav1.ObjectMeta{
Name: pvName,
},
Spec: k8sv1.PersistentVolumeSpec{
Capacity: k8sv1.ResourceList{
k8sv1.ResourceStorage: resource.MustParse("1200Gi"), // ignored by S3 CSI, but required
},
AccessModes: []k8sv1.PersistentVolumeAccessMode{k8sv1.ReadWriteMany},
StorageClassName: "", // static provisioning
ClaimRef: &k8sv1.ObjectReference{
Namespace: namespace,
Name: pvcName,
},
MountOptions: []string{
"allow-delete",
fmt.Sprintf("region %s", region),
fmt.Sprintf("prefix %s", prefix),
},
PersistentVolumeSource: k8sv1.PersistentVolumeSource{
CSI: &k8sv1.CSIPersistentVolumeSource{
Driver: "s3.csi.aws.com",
VolumeHandle: volumeHandle, // must be unique
VolumeAttributes: map[string]string{
"bucketName": bucket,
},
},
},
},
}
if _, errCreatePV := podClient.PersistentVolumes().Create(ctx, pv, metav1.CreateOptions{}); errCreatePV != nil {
return "", fmt.Errorf("failed to create S3 PV %s: %w", pvName, errCreatePV)
}
}

// ----- Ensure PVC exists (namespaced) -----
if _, errGet := podClient.PersistentVolumeClaims(namespace).Get(ctx, pvcName, metav1.GetOptions{}); errGet != nil {
empty := "" // PVC.StorageClassName is *string
pvc := &k8sv1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Name: pvcName,
Namespace: namespace,
},
Spec: k8sv1.PersistentVolumeClaimSpec{
AccessModes: []k8sv1.PersistentVolumeAccessMode{k8sv1.ReadWriteMany},
StorageClassName: &empty,
Resources: k8sv1.VolumeResourceRequirements{
Requests: k8sv1.ResourceList{
k8sv1.ResourceStorage: resource.MustParse("1200Gi"),
},
},
VolumeName: pvName,
},
}
if _, errCreatePVC := podClient.PersistentVolumeClaims(namespace).Create(ctx, pvc, metav1.CreateOptions{}); errCreatePVC != nil {
return "", fmt.Errorf("failed to create S3 PVC %s: %w", pvcName, errCreatePVC)
}
}

return pvcName, nil
}

// Adds the S3 volume + mount to the pod in-place.
// Call this after buildPod(), before creating it.
func addS3VolumeToPod(pod *k8sv1.Pod, pvcName string) {
// 1) Add Volume
hasVolume := false
for _, v := range pod.Spec.Volumes {
if v.Name == "s3-volume" {
hasVolume = true
break
}
}
if !hasVolume {
pod.Spec.Volumes = append(pod.Spec.Volumes, k8sv1.Volume{
Name: "s3-volume",
VolumeSource: k8sv1.VolumeSource{
PersistentVolumeClaim: &k8sv1.PersistentVolumeClaimVolumeSource{
ClaimName: pvcName,
// ReadOnly: false, // optional
},
},
})
}

// 2) Add VolumeMount (/apps) to every container (or just the main one)
for ci := range pod.Spec.Containers {
c := &pod.Spec.Containers[ci]
alreadyMounted := false
for _, m := range c.VolumeMounts {
if m.Name == "s3-volume" {
alreadyMounted = true
break
}
}
if !alreadyMounted {
c.VolumeMounts = append(c.VolumeMounts, k8sv1.VolumeMount{
Name: "s3-volume",
// TODO: Read this path from config
MountPath: "/apps",
})
}
}
}

// buildPod returns a pod ready to pass to the k8s API given
// a hatchery Container instance, and the name of the user
// launching the app
Expand Down Expand Up @@ -674,6 +842,24 @@ var createLocalK8sPod = func(ctx context.Context, hash string, userName string,
Config.Logger.Panicf("Error in createLocalK8sPod: %v", err)
return err
}
// ensure S3 PV/PVC (dynamic per user) and wire into pod
if Config.Config.S3Config.BucketName != "" && Config.Config.S3Config.Region != "" {
Config.Logger.Print("Mounting S3 bucket as well..")
s3PVCName, err := ensureS3PVandPVC(
ctx,
podClient,
Config.Config.UserNamespace,
userName,
Config.Config.S3Config.BucketName,
Config.Config.S3Config.Region,
)
if err != nil {
Config.Logger.Printf("Failed ensuring S3 PV/PVC for user %s: %v", userName, err)
return err
}
addS3VolumeToPod(pod, s3PVCName)
}

// a null image indicates a dockstore app - always mount user volume
mountUserVolume := hatchApp.UserVolumeLocation != ""
if mountUserVolume {
Expand Down Expand Up @@ -974,8 +1160,11 @@ tls: %s
annotationsService := make(map[string]string)
annotationsService["getambassador.io/config"] = fmt.Sprintf(localAmbassadorYaml, userToResourceName(userName, "mapping"), userName, serviceURL, NodePort, hatchApp.PathRewrite, hatchApp.UseTLS)

localPodClient := getLocalPodClient()
_, err := localPodClient.Services(Config.Config.UserNamespace).Get(ctx, serviceName, metav1.GetOptions{})
localPodClient, err := getLocalPodClient()
if err == nil {
Config.Logger.Printf("Error fetching local kubeconfig: %v", err)
}
_, err = localPodClient.Services(Config.Config.UserNamespace).Get(ctx, serviceName, metav1.GetOptions{})
if err == nil {
// This probably happened as the result of some error... there was no pod but was a service
// Lets just clean it up and proceed
Expand Down
Loading