Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,5 @@ go.work.sum
.run/

.DS_Store

gateway-config-dev*.yaml
2 changes: 1 addition & 1 deletion config/gateway-config-dev.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ sparkManagerPort: "8081"

gateway:
gatewayApiVersion: v1
gatewayPort: "8080"
gatewayPort: "8082"

middleware:
- type: RegexBasicAuthAllowMiddleware
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ require (
)

require (
github.com/a-h/templ v0.3.943
github.com/a-h/templ/examples/integration-gin v0.0.0-20250818063052-abb427c0fb8d
github.com/jackc/pgx/v5 v5.7.4
github.com/knadh/koanf/providers/confmap v1.0.0
github.com/prometheus/client_golang v1.22.0
Expand Down
12 changes: 8 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
github.com/KyleBanks/depth v1.2.1 h1:5h8fQADFrWtarTdtDudMmGsC7GPbOAu6RVB3ffsVFHc=
github.com/KyleBanks/depth v1.2.1/go.mod h1:jzSb9d0L43HxTQfT+oSA1EEp2q+ne2uh6XgeJcm8brE=
github.com/a-h/templ v0.3.943 h1:o+mT/4yqhZ33F3ootBiHwaY4HM5EVaOJfIshvd5UNTY=
github.com/a-h/templ v0.3.943/go.mod h1:oCZcnKRf5jjsGpf2yELzQfodLphd2mwecwG4Crk5HBo=
github.com/a-h/templ/examples/integration-gin v0.0.0-20250818063052-abb427c0fb8d h1:L8EvgXLCOLvgMjyWloBGrxWJGjsAAHyR5df79sC6IOc=
github.com/a-h/templ/examples/integration-gin v0.0.0-20250818063052-abb427c0fb8d/go.mod h1:1t59aGzdEXDnMYwmfWTKqDSASvUPtBe4iH00XPvh54U=
github.com/aws/aws-sdk-go v1.55.6 h1:cSg4pvZ3m8dgYcgqB97MrcdjUmZ1BeMYKUxMMB89IPk=
github.com/aws/aws-sdk-go v1.55.6/go.mod h1:eRwEWoyTWFMVYVQzKMNHWP5/RV4xIUGMQfXQHfHkpNU=
github.com/aws/aws-sdk-go-v2 v1.36.3 h1:mJoei2CxPutQVxaATCzDUjcZEjVRdpsiiXi2o38yqWM=
Expand Down Expand Up @@ -70,8 +74,8 @@ github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeN
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/pprof v0.0.0-20241029153458-d1b30febd7db h1:097atOisP2aRj7vFgYQBbFN4U4JNXUNYpxael3UzMyo=
github.com/google/pprof v0.0.0-20241029153458-d1b30febd7db/go.mod h1:vavhavw2zAxS5dIdcRluK6cSGGPlZynqzFM8NdvU144=
github.com/google/pprof v0.0.0-20241210010833-40e02aabc2ad h1:a6HEuzUHeKH6hwfN/ZoQgRgVIWFJljSWa/zetS2WTvg=
github.com/google/pprof v0.0.0-20241210010833-40e02aabc2ad/go.mod h1:vavhavw2zAxS5dIdcRluK6cSGGPlZynqzFM8NdvU144=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
Expand Down Expand Up @@ -133,8 +137,8 @@ github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9G
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
github.com/onsi/ginkgo/v2 v2.22.0 h1:Yed107/8DjTr0lKCNt7Dn8yQ6ybuDRQoMGrNFKzMfHg=
github.com/onsi/ginkgo/v2 v2.22.0/go.mod h1:7Du3c42kxCUegi0IImZ1wUQzMBVecgIHjR1C+NkhLQo=
github.com/onsi/ginkgo/v2 v2.22.2 h1:/3X8Panh8/WwhU/3Ssa6rCKqPLuAkVY2I0RoyDLySlU=
github.com/onsi/ginkgo/v2 v2.22.2/go.mod h1:oeMosUL+8LtarXBHu/c0bx2D/K9zyQ6uX3cTyztHwsk=
github.com/onsi/gomega v1.36.1 h1:bJDPBO7ibjxcbHMgSCoo4Yj18UWbKDlLwX1x9sybDcw=
github.com/onsi/gomega v1.36.1/go.mod h1:PvZbdDc8J6XJEpDK4HCuRBm8a6Fzp9/DmhC9C7yFlog=
github.com/pelletier/go-toml/v2 v2.2.4 h1:mye9XuhQ6gvn5h28+VilKrrPoQVanw5PMw/TB0t5Ec4=
Expand Down
26 changes: 21 additions & 5 deletions internal/gateway/application/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,18 @@ import (
"context"
"errors"
"fmt"
swaggerDocs "github.com/slackhq/spark-gateway/docs/swagger"
swaggerFiles "github.com/swaggo/files"
ginSwagger "github.com/swaggo/gin-swagger"
"net/http"
"strconv"
"strings"

swaggerFiles "github.com/swaggo/files"
ginSwagger "github.com/swaggo/gin-swagger"

swaggerDocs "github.com/slackhq/spark-gateway/docs/swagger"

"github.com/gin-gonic/gin"
"github.com/kubeflow/spark-operator/v2/api/v1beta2"

"github.com/slackhq/spark-gateway/pkg/gatewayerrors"
pkgHttp "github.com/slackhq/spark-gateway/pkg/http"
"github.com/slackhq/spark-gateway/pkg/model"
Expand All @@ -47,7 +51,7 @@ const sparkApplicationPathName = "applications"

type GatewayApplicationService interface {
Get(ctx context.Context, gatewayId string) (*model.GatewayApplication, error)
List(ctx context.Context, cluster string, namespace string) ([]*model.GatewayApplicationMeta, error)
List(ctx context.Context, cluster string, namespace string, appState *v1beta2.ApplicationStateType) ([]*model.GatewayApplicationMeta, error)
Create(ctx context.Context, application *v1beta2.SparkApplication, user string) (*model.GatewayApplication, error)
Status(ctx context.Context, gatewayId string) (*v1beta2.SparkApplicationStatus, error)
Logs(ctx context.Context, gatewayId string, tailLines int) (*string, error)
Expand Down Expand Up @@ -90,6 +94,7 @@ func (h *ApplicationHandler) RegisterRoutes(rg *gin.RouterGroup) {
// @Security BasicAuth
// @Param cluster query string true "Cluster name"
// @Param namespace query string false "Namespace (optional)"
// @Param appState query v1beta2.ApplicationStateType false "Filter by Spark Application state"
// @Success 200 {array} model.GatewayApplicationMeta "List of SparkApplication metadata"
// @Router / [get]
func (h *ApplicationHandler) List(c *gin.Context) {
Expand All @@ -102,7 +107,18 @@ func (h *ApplicationHandler) List(c *gin.Context) {

namespace := c.Query("namespace")

appMetaList, err := h.service.List(c, cluster, namespace)
_appState := strings.ToUpper(c.Query("appState"))
var appState *v1beta2.ApplicationStateType = nil
if _appState != "" {
state := v1beta2.ApplicationStateType(_appState)
if !model.ValidSparkApplicationStatesMap[state] {
c.Error(fmt.Errorf("invalid application state: %s", state))
return
}
appState = &state
}

appMetaList, err := h.service.List(c, cluster, namespace, appState)

if err != nil {
c.Error(err)
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 8 additions & 3 deletions internal/gateway/application/repository/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,16 @@ func (r *SparkManagerRepository) Get(ctx context.Context, cluster model.KubeClus
return kube.Sanitize(&app), nil
}

func (r *SparkManagerRepository) List(ctx context.Context, cluster model.KubeCluster, namespace string) ([]*model.SparkManagerApplicationMeta, error) {
func (r *SparkManagerRepository) List(ctx context.Context, cluster model.KubeCluster, namespace string, appState *v1beta2.ApplicationStateType) ([]*model.SparkManagerApplicationMeta, error) {

clusterEndpoint := r.ClusterEndpoints[cluster.Name]
// Url: http://host:port/namespace
url := fmt.Sprintf("%s/%s", clusterEndpoint, namespace)
// Url: http://host:port/namespace?appState=appState
var url string
if appState == nil {
url = fmt.Sprintf("%s/%s", clusterEndpoint, namespace)
} else {
url = fmt.Sprintf("%s/%s?appState=%s", clusterEndpoint, namespace, string(*appState))
}

request, err := http.NewRequest(http.MethodGet, url, nil)
if err != nil {
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 13 additions & 10 deletions internal/gateway/application/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package service

import (
"context"
"errors"
"fmt"
"strings"

Expand All @@ -39,7 +38,7 @@ import (

type SparkApplicationRepository interface {
Get(ctx context.Context, cluster model.KubeCluster, namespace string, name string) (*v1beta2.SparkApplication, error)
List(ctx context.Context, cluster model.KubeCluster, namespace string) ([]*model.SparkManagerApplicationMeta, error)
List(ctx context.Context, cluster model.KubeCluster, namespace string, appState *v1beta2.ApplicationStateType) ([]*model.SparkManagerApplicationMeta, error)
Status(ctx context.Context, cluster model.KubeCluster, namespace string, name string) (*v1beta2.SparkApplicationStatus, error)
Logs(ctx context.Context, cluster model.KubeCluster, namespace string, name string, tailLines int) (*string, error)
Create(ctx context.Context, cluster model.KubeCluster, application *v1beta2.SparkApplication) (*v1beta2.SparkApplication, error)
Expand Down Expand Up @@ -109,24 +108,24 @@ func (s *service) Get(ctx context.Context, gatewayId string) (*model.GatewayAppl
return nil, gatewayerrors.NewFrom(fmt.Errorf("error getting SparkApplication '%s': %w", gatewayId, err))
}

user, ok := sparkApp.Labels[model.GATEWAY_USER_LABEL]
if !ok {
return nil, gatewayerrors.NewFrom(errors.New("no gateway user associated with this application, possibly not created through spark-gateway?"))
user, err := model.GetUser(sparkApp.Labels)
if err != nil {
return nil, gatewayerrors.NewFrom(err)
}

gatewayApp := &model.GatewayApplication{
SparkApplication: sparkApp,
GatewayId: sparkApp.Name,
Cluster: cluster.Name,
User: user,
User: *user,
SparkLogURLs: GetRenderedURLs(s.config.StatusUrlTemplates, sparkApp),
}

return gatewayApp, nil
}

// List retrieves `num` number of GatewayApplications from specified namespace `namespace` in cluster `cluster`
func (s *service) List(ctx context.Context, cluster string, namespace string) ([]*model.GatewayApplicationMeta, error) {
// List retrieves a list of GatewayApplicationMeta from specified namespace `namespace` in cluster `cluster` with appState state
func (s *service) List(ctx context.Context, cluster string, namespace string, appState *v1beta2.ApplicationStateType) ([]*model.GatewayApplicationMeta, error) {

kubeCluster, err := s.clusterRepository.GetByName(cluster)

Expand All @@ -149,13 +148,17 @@ func (s *service) List(ctx context.Context, cluster string, namespace string) ([

var appMetaList []*model.GatewayApplicationMeta
for _, ns := range namespaces {
nsAppMetas, err := s.sparkAppRepo.List(ctx, *kubeCluster, ns)
nsAppMetas, err := s.sparkAppRepo.List(ctx, *kubeCluster, ns, appState)
if err != nil {
return nil, gatewayerrors.NewFrom(fmt.Errorf("error getting applications: %w", err))
}

for _, appMeta := range nsAppMetas {
appMetaList = append(appMetaList, model.NewGatewayApplicationMeta(appMeta, cluster))
user, err := model.GetUser(appMeta.ObjectMeta.Labels)
if user == nil || err != nil {
continue
}
appMetaList = append(appMetaList, model.NewGatewayApplicationMeta(appMeta, cluster, *user))
}

}
Expand Down
2 changes: 1 addition & 1 deletion internal/gateway/cluster/local_repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func (r *LocalClusterRepo) GetAll() ([]model.KubeCluster, error) {
}

if len(clusters) == 0 {
klog.Warningf("GetAll: no clusters found in clusters config")
return nil, fmt.Errorf("GetAll: no clusters found in clusters config")
}

return clusters, nil
Expand Down
5 changes: 5 additions & 0 deletions internal/gateway/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/slackhq/spark-gateway/internal/gateway/application/repository"
"github.com/slackhq/spark-gateway/internal/gateway/application/service"
"github.com/slackhq/spark-gateway/internal/gateway/cluster"
"github.com/slackhq/spark-gateway/internal/gateway/web"

"time"

Expand Down Expand Up @@ -128,6 +129,10 @@ func NewGateway(ctx context.Context, sgConfig *cfg.SparkGatewayConfig, sparkMana
handler.RegisterSwaggerDocs(rootGroup, sgConfig.GatewayConfig.GatewayApiVersion)
}

// Register UI
webHandler := web.NewWebHandler(localClusterRepo, appService, ginRouter, rootGroup)
webHandler.RegisterRoutes()

/// Register versioned handlers
versionGroup := ginRouter.Group(fmt.Sprintf("/%s", sgConfig.GatewayConfig.GatewayApiVersion), mwHandlerChain...)
appHandler.RegisterRoutes(versionGroup)
Expand Down
Loading