Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions workspaces/controller/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
// to ensure that exec-entrypoint and run can make use of them.
_ "k8s.io/client-go/plugin/pkg/client/auth"

istiov1 "istio.io/client-go/pkg/apis/networking/v1"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
Expand All @@ -50,6 +51,8 @@ var (
func init() {
utilruntime.Must(clientgoscheme.AddToScheme(scheme))

utilruntime.Must(istiov1.AddToScheme(scheme))

utilruntime.Must(kubefloworgv1beta1.AddToScheme(scheme))
// +kubebuilder:scaffold:scheme
}
Expand Down
6 changes: 6 additions & 0 deletions workspaces/controller/config/manager/kustomization.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@ apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
resources:
- manager.yaml
configMapGenerator:
- envs:
- params.env
name: config
generatorOptions:
disableNameSuffixHash: true
images:
- name: controller
newName: ghcr.io/kubeflow/notebooks/workspace-controller
Expand Down
16 changes: 16 additions & 0 deletions workspaces/controller/config/manager/manager.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,22 @@ spec:
- --leader-elect
- --health-probe-bind-address=:8081
- --metrics-bind-address=0
env:
- name: USE_ISTIO
valueFrom:
configMapKeyRef:
name: config
key: USE_ISTIO
- name: ISTIO_GATEWAY
valueFrom:
configMapKeyRef:
name: config
key: ISTIO_GATEWAY
- name: ISTIO_HOST
valueFrom:
configMapKeyRef:
name: config
key: ISTIO_HOST
image: controller:latest
imagePullPolicy: IfNotPresent
name: manager
Expand Down
4 changes: 4 additions & 0 deletions workspaces/controller/config/manager/params.env
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
USE_ISTIO=true
ISTIO_GATEWAY=kubeflow/kubeflow-gateway
ISTIO_HOST=*
CLUSTER_DOMAIN=cluster.local
5 changes: 4 additions & 1 deletion workspaces/controller/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ require (
github.com/go-logr/logr v1.4.2
github.com/onsi/ginkgo/v2 v2.19.0
github.com/onsi/gomega v1.33.1
golang.org/x/time v0.3.0
istio.io/api v1.22.8
istio.io/client-go v1.22.8
k8s.io/api v0.31.0
k8s.io/apimachinery v0.31.0
k8s.io/client-go v0.31.0
Expand Down Expand Up @@ -56,9 +59,9 @@ require (
golang.org/x/sys v0.21.0 // indirect
golang.org/x/term v0.21.0 // indirect
golang.org/x/text v0.16.0 // indirect
golang.org/x/time v0.3.0 // indirect
golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d // indirect
gomodules.xyz/jsonpatch/v2 v2.4.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240528184218-531527333157 // indirect
google.golang.org/protobuf v1.34.2 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
Expand Down
10 changes: 8 additions & 2 deletions workspaces/controller/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/emicklei/go-restful/v3 v3.11.0 h1:rAQeMHw1c7zTmncogyy8VvRZwtkmkZ4FxERmMY4rD+g=
github.com/emicklei/go-restful/v3 v3.11.0/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc=
github.com/evanphx/json-patch v0.5.2 h1:xVCHIVMUu1wtM/VkR9jVZ45N3FhZfYMMYGorLCR8P3k=
github.com/evanphx/json-patch v0.5.2/go.mod h1:ZWS5hhDbVDyob71nXKNL0+PWn6ToqBHMikGIFbs31qQ=
github.com/evanphx/json-patch v5.6.0+incompatible h1:jBYDEEiFBPxA0v50tFdvOzQQTCvpL6mnFh5mB2/l16U=
github.com/evanphx/json-patch v5.6.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
github.com/evanphx/json-patch/v5 v5.9.0 h1:kcBlZQbplgElYIlo/n1hJbls2z/1awpXxpRi0/FOJfg=
github.com/evanphx/json-patch/v5 v5.9.0/go.mod h1:VNkHZ/282BpEyt/tObQO8s5CMPmYYq14uClGH4abBuQ=
github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA=
Expand Down Expand Up @@ -155,6 +155,8 @@ golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8T
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gomodules.xyz/jsonpatch/v2 v2.4.0 h1:Ci3iUJyx9UeRx7CeFN8ARgGbkESwJK+KB9lLcWxY/Zw=
gomodules.xyz/jsonpatch/v2 v2.4.0/go.mod h1:AH3dM2RI6uoBZxn3LVrfvJ3E0/9dG4cSrbuBJT4moAY=
google.golang.org/genproto/googleapis/api v0.0.0-20240528184218-531527333157 h1:7whR9kGa5LUwFtpLm2ArCEejtnxlGeLbAyjFY8sGNFw=
google.golang.org/genproto/googleapis/api v0.0.0-20240528184218-531527333157/go.mod h1:99sLkeliLXfdj2J75X3Ho+rrVCaJze0uwN7zDDkjPVU=
google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg=
google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand All @@ -170,6 +172,10 @@ gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
istio.io/api v1.22.8 h1:mhkaeFJ13WZ2d6pvL9+exNeQ9UB6HX7e6m+XwO9XoYY=
istio.io/api v1.22.8/go.mod h1:S3l8LWqNYS9yT+d4bH+jqzH2lMencPkW7SKM1Cu9EyM=
istio.io/client-go v1.22.8 h1:wojmt220jSbfhpRDsPiflj2nSFTBuYtZNiW9hqKeaWE=
istio.io/client-go v1.22.8/go.mod h1:noO8SoyMxLwni3w+yGK67aydi2klExjmiqnXyeRS/00=
k8s.io/api v0.31.0 h1:b9LiSjR2ym/SzTOlfMHm1tr7/21aD7fSkqgD/CVJBCo=
k8s.io/api v0.31.0/go.mod h1:0YiFF+JfFxMM6+1hQei8FY8M7s1Mth+z/q7eF1aJkTE=
k8s.io/apiextensions-apiserver v0.31.0 h1:fZgCVhGwsclj3qCw1buVXCV6khjRzKC5eCFt24kyLSk=
Expand Down
4 changes: 4 additions & 0 deletions workspaces/controller/internal/controller/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ import (
"sigs.k8s.io/controller-runtime/pkg/log/zap"

kubefloworgv1beta1 "github.com/kubeflow/notebooks/workspaces/controller/api/v1beta1"

"github.com/kubeflow/notebooks/workspaces/controller/internal/helper"
istiov1 "istio.io/client-go/pkg/apis/networking/v1"
// +kubebuilder:scaffold:imports
)

Expand Down Expand Up @@ -88,6 +90,8 @@ var _ = BeforeSuite(func() {
By("setting up the scheme")
err = kubefloworgv1beta1.AddToScheme(scheme.Scheme)
Expect(err).NotTo(HaveOccurred())
err = istiov1.AddToScheme(scheme.Scheme)
Expect(err).NotTo(HaveOccurred())

// +kubebuilder:scaffold:scheme

Expand Down
166 changes: 164 additions & 2 deletions workspaces/controller/internal/controller/workspace_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@ package controller
import (
"context"
"fmt"
"os"
"strings"

"github.com/go-logr/logr"
networkingv1 "istio.io/api/networking/v1"
istiov1 "istio.io/client-go/pkg/apis/networking/v1"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
Expand Down Expand Up @@ -68,6 +71,7 @@ const (
stateMsgErrorGenFailureService = "Workspace failed to generate Service with error: %s"
stateMsgErrorMultipleStatefulSets = "Workspace owns multiple StatefulSets: %s"
stateMsgErrorMultipleServices = "Workspace owns multiple Services: %s"
stateMsgErrorMultipleVirtualServices = "Workspace owns multiple VirtualServices: %s"
stateMsgErrorStatefulSetWarningEvent = "Workspace StatefulSet has warning event: %s"
stateMsgErrorPodUnschedulable = "Workspace Pod is unschedulable: %s"
stateMsgErrorPodSchedulingGate = "Workspace Pod is waiting for scheduling gate: %s"
Expand Down Expand Up @@ -359,6 +363,71 @@ func (r *WorkspaceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
// and implement the `spec.podTemplate.httpProxy` options
//

log.V(2).Info("reconciling VirtualService for Workspace")
if os.Getenv("USE_ISTIO") == "true" {
// generateVirtualService
virtualsvc, err := generateVirtualService(workspace, serviceName, currentImageConfig.Spec)
if err != nil {
log.V(0).Info("failed to generate VirtualService for Workspace", "error", err.Error())
return r.updateWorkspaceState(ctx, log, workspace,
kubefloworgv1beta1.WorkspaceStateError,
fmt.Sprintf("failed to generate VirtualService for Workspace: %s", err.Error()),
)
}
if err := ctrl.SetControllerReference(workspace, virtualsvc, r.Scheme); err != nil {
log.Error(err, "unable to set controller reference on VirtualService")
return ctrl.Result{}, err
}

// fetch VirtualServices
// NOTE: we filter by VirtualServices that are owned by the Workspace, not by name
// this allows us to generate a random name for the VirtualService with `metadata.generateName`
var VirtualServiceName string
ownedVirtualServices := &istiov1.VirtualServiceList{}
listOpts = &client.ListOptions{
FieldSelector: fields.OneTermEqualSelector(helper.IndexWorkspaceOwnerField, workspace.Name),
Namespace: req.Namespace,
}
if err := r.List(ctx, ownedVirtualServices, listOpts); err != nil {
log.Error(err, "unable to list VirtualServices")
return ctrl.Result{}, err
}
switch numVirtualServices := len(ownedVirtualServices.Items); {
case numVirtualServices > 1:
virtualServiceList := make([]string, len(ownedVirtualServices.Items))
for i, vs := range ownedVirtualServices.Items {
virtualServiceList[i] = vs.Name
}
virtualServiceListString := strings.Join(virtualServiceList, ", ")
log.Error(nil, "Workspace owns multiple VirtualServices", "virtualServices", virtualServiceListString)
return r.updateWorkspaceState(ctx, log, workspace,
kubefloworgv1beta1.WorkspaceStateError,
fmt.Sprintf(stateMsgErrorMultipleVirtualServices, virtualServiceListString),
)
case numVirtualServices == 0:
if err := r.Create(ctx, virtualsvc); err != nil {
log.Error(err, "unable to create VirtualService")
return ctrl.Result{}, err
}
VirtualServiceName = virtualsvc.ObjectMeta.Name
log.V(2).Info("VirtualService created", "virtualService", VirtualServiceName)
default:
foundVirtualService := ownedVirtualServices.Items[0]
VirtualServiceName = foundVirtualService.ObjectMeta.Name
if helper.CopyVirtualServiceFields(virtualsvc, foundVirtualService) {
if err := r.Update(ctx, foundVirtualService); err != nil {
if apierrors.IsConflict(err) {
log.V(2).Info("update conflict while updating VirtualService, will requeue")
return ctrl.Result{Requeue: true}, nil
}
log.Error(err, "unable to update VirtualService")
return ctrl.Result{}, err
}
log.V(2).Info("VirtualService updated", "virtualService", VirtualServiceName)
}
}
}

// fetch Pod
// NOTE: the first StatefulSet Pod is always called "{statefulSetName}-0"
podName := fmt.Sprintf("%s-0", statefulSetName)
Expand Down Expand Up @@ -418,11 +487,20 @@ func (r *WorkspaceReconciler) SetupWithManager(mgr ctrl.Manager, opts controller
return labelExists
})

return ctrl.NewControllerManagedBy(mgr).
// Build the controller with core resources
controllerBuilder := ctrl.NewControllerManagedBy(mgr).
WithOptions(opts).
For(&kubefloworgv1beta1.Workspace{}).
Owns(&appsv1.StatefulSet{}).
Owns(&corev1.Service{}).
Owns(&corev1.Service{})

// Conditionally add VirtualService ownership if the CRD is available
// This prevents test failures when Istio CRDs are not installed
if _, err := mgr.GetRESTMapper().RESTMapping(istiov1.SchemeGroupVersion.WithKind("VirtualService").GroupKind()); err == nil {
controllerBuilder = controllerBuilder.Owns(&istiov1.VirtualService{})
}

return controllerBuilder.
Watches(
&kubefloworgv1beta1.WorkspaceKind{},
handler.EnqueueRequestsFromMapFunc(r.mapWorkspaceKindToRequest),
Expand Down Expand Up @@ -881,6 +959,90 @@ func generateService(workspace *kubefloworgv1beta1.Workspace, imageConfigSpec ku
return service, nil
}

// generateVirtualService generates a VirtualService for a Workspace
func generateVirtualService(workspace *kubefloworgv1beta1.Workspace, serviceName string, imageConfigSpec kubefloworgv1beta1.ImageConfigSpec) (*istiov1.VirtualService, error) {
// NOTE: the name prefix is used to generate a unique name for the VirtualService
namePrefix := generateNamePrefix(workspace.Name, maxServiceNameLength)

// TODO: Change this to reference podtemplate ports.[].portID
portID := imageConfigSpec.Ports[0].Port
if portID < 0 { // check is needed due to go lint rules G115
return nil, fmt.Errorf("port ID must be a non-negative integer, got %d", portID)
}

matchUriPrefix := fmt.Sprintf("/workspace/%s/%s/", workspace.Namespace, workspace.Name)

// TODO: Change this to reference podtemplate ports.[].httpProxy.removePathPrefix
rewriteUri := fmt.Sprintf("/workspace/%s/%s/", workspace.Namespace, workspace.Name)

clusterDomain := "cluster.local"
if clusterDomainEnv, ok := os.LookupEnv("CLUSTER_DOMAIN"); ok {
clusterDomain = clusterDomainEnv
}
serviceHost := fmt.Sprintf("%s.%s.svc.%s", serviceName, workspace.Namespace, clusterDomain)

// TODO: Add a possible default for istioGateway
istioGateway := os.Getenv("ISTIO_GATEWAY")
if istioGateway == "" {
return nil, fmt.Errorf("ISTIO_GATEWAY environment variable is not set")
}

istioHosts := "*"
if istioHostsEnv, ok := os.LookupEnv("ISTIO_HOSTS"); ok {
istioHosts = istioHostsEnv
}

// generate VirtualService
virtualService := &istiov1.VirtualService{
ObjectMeta: metav1.ObjectMeta{
GenerateName: namePrefix,
Namespace: workspace.Namespace,
Labels: map[string]string{
workspaceNameLabel: workspace.Name,
},
},
Spec: networkingv1.VirtualService{
Gateways: []string{istioGateway},
Hosts: []string{istioHosts},
Http: []*networkingv1.HTTPRoute{
{
Headers: &networkingv1.Headers{
Request: &networkingv1.Headers_HeaderOperations{},
},
Match: []*networkingv1.HTTPMatchRequest{
{
Uri: &networkingv1.StringMatch{
MatchType: &networkingv1.StringMatch_Prefix{
Prefix: matchUriPrefix,
},
},
},
},
Route: []*networkingv1.HTTPRouteDestination{
{
Destination: &networkingv1.Destination{
Host: serviceHost,
Port: &networkingv1.PortSelector{
Number: uint32(portID), // use the first port as the destination port
},
},
},
},
},
},
},
}

// set the rewrite URI if it is not empty
if rewriteUri != "" {
virtualService.Spec.Http[0].Rewrite = &networkingv1.HTTPRewrite{
Uri: rewriteUri,
}
}

return virtualService, nil
}

// generateWorkspaceStatus generates a WorkspaceStatus for a Workspace
func (r *WorkspaceReconciler) generateWorkspaceStatus(ctx context.Context, log logr.Logger, workspace *kubefloworgv1beta1.Workspace, pod *corev1.Pod, statefulSet *appsv1.StatefulSet) (kubefloworgv1beta1.WorkspaceStatus, error) {
// NOTE: some fields are populated before this function is called,
Expand Down
39 changes: 39 additions & 0 deletions workspaces/controller/internal/helper/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package helper
import (
"reflect"

istiov1 "istio.io/client-go/pkg/apis/networking/v1"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
Expand Down Expand Up @@ -118,6 +119,44 @@ func CopyServiceFields(desired *corev1.Service, target *corev1.Service) bool {
return requireUpdate
}

// CopyVirtualServiceFields updates a target VirtualService with the fields from a desired VirtualService, returning true if an update is required.
func CopyVirtualServiceFields(desired *istiov1.VirtualService, target *istiov1.VirtualService) bool {
requireUpdate := false

// Using the Spec definition https://pkg.go.dev/istio.io/api/networking/v1alpha3alpha3#VirtualService
if !reflect.DeepEqual(target.Spec.Gateways, desired.Spec.Gateways) {
target.Spec.Gateways = desired.Spec.Gateways
requireUpdate = true
}

if !reflect.DeepEqual(target.Spec.Hosts, desired.Spec.Hosts) {
target.Spec.Hosts = desired.Spec.Hosts
requireUpdate = true
}

if !reflect.DeepEqual(target.Spec.Http, desired.Spec.Http) {
target.Spec.Http = desired.Spec.Http
requireUpdate = true
}

if !reflect.DeepEqual(target.Spec.Tls, desired.Spec.Tls) {
target.Spec.Tls = desired.Spec.Tls
requireUpdate = true
}

if !reflect.DeepEqual(target.Spec.Tcp, desired.Spec.Tcp) {
target.Spec.Tcp = desired.Spec.Tcp
requireUpdate = true
}

if !reflect.DeepEqual(target.Spec.ExportTo, desired.Spec.ExportTo) {
target.Spec.ExportTo = desired.Spec.ExportTo
requireUpdate = true
}

return requireUpdate
}

// NormalizePodConfigSpec normalizes a PodConfigSpec so that it can be compared with reflect.DeepEqual
func NormalizePodConfigSpec(spec kubefloworgv1beta1.PodConfigSpec) error {

Expand Down
Loading
Loading