@@ -18,37 +18,46 @@ import (
1818 "context"
1919 "errors"
2020 "fmt"
21- "github.com/koupleless/module_controller/common/zaplogger"
22- "github.com/koupleless/module_controller/report_server"
23- "github.com/koupleless/virtual-kubelet/vnode_controller"
2421 "os"
2522 "os/signal"
26- ctrl "sigs.k8s.io/controller-runtime"
27- "sigs.k8s.io/controller-runtime/pkg/healthz"
2823 "strconv"
2924 "syscall"
3025
3126 "github.com/google/uuid"
3227 "github.com/koupleless/module_controller/common/model"
28+ "github.com/koupleless/module_controller/common/zaplogger"
3329 "github.com/koupleless/module_controller/controller/module_deployment_controller"
3430 "github.com/koupleless/module_controller/module_tunnels/koupleless_http_tunnel"
3531 "github.com/koupleless/module_controller/module_tunnels/koupleless_mqtt_tunnel"
32+ "github.com/koupleless/module_controller/report_server"
33+ "github.com/koupleless/module_controller/staging/kubelet_proxy"
3634 "github.com/koupleless/virtual-kubelet/common/tracker"
3735 "github.com/koupleless/virtual-kubelet/common/utils"
3836 vkModel "github.com/koupleless/virtual-kubelet/model"
3937 "github.com/koupleless/virtual-kubelet/tunnel"
38+ "github.com/koupleless/virtual-kubelet/vnode_controller"
4039 "github.com/sirupsen/logrus"
4140 "github.com/virtual-kubelet/virtual-kubelet/log"
4241 logruslogger "github.com/virtual-kubelet/virtual-kubelet/log/logrus"
4342 "github.com/virtual-kubelet/virtual-kubelet/trace"
4443 "github.com/virtual-kubelet/virtual-kubelet/trace/opencensus"
44+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
45+ "k8s.io/client-go/kubernetes"
46+ ctrl "sigs.k8s.io/controller-runtime"
4547 "sigs.k8s.io/controller-runtime/pkg/cache"
4648 "sigs.k8s.io/controller-runtime/pkg/client/config"
49+ "sigs.k8s.io/controller-runtime/pkg/healthz"
4750 "sigs.k8s.io/controller-runtime/pkg/manager"
4851 "sigs.k8s.io/controller-runtime/pkg/manager/signals"
4952 "sigs.k8s.io/controller-runtime/pkg/metrics/server"
5053)
5154
55+ const (
56+ certFilePath = "/etc/virtual-kubelet/tls/tls.crt"
57+ keyFilePath = "/etc/virtual-kubelet/tls/tls.key"
58+ DefaultKubeletHttpListenAddr = "10250" // Default listen address for the HTTP server
59+ )
60+
5261// Main function for the module controller
5362// Responsibilities:
5463// 1. Sets up signal handling for graceful shutdown
@@ -59,6 +68,7 @@ import (
5968// 6. Creates and configures the VNode controller
6069// 7. Optionally creates module deployment controller
6170// 8. Starts all tunnels and the manager
71+ // 9. Starts the kubelet proxy server(if enabled)
6272
6373func main () {
6474 ctx , cancel := context .WithCancel (context .Background ())
@@ -75,27 +85,29 @@ func main() {
7585 trace .T = opencensus.Adapter {}
7686
7787 // Get configuration from environment variables
78- clientID := utils .GetEnv ("CLIENT_ID" , uuid .New ().String ())
88+ clientID := utils .GetEnv (model . EnvKeyOfClientID , uuid .New ().String ())
7989 env := utils .GetEnv ("ENV" , "dev" )
8090
8191 zlogger := zaplogger .GetLogger ()
8292 ctx = zaplogger .WithLogger (ctx , zlogger )
8393
8494 // Parse configuration with defaults
85- isCluster := utils .GetEnv ("IS_CLUSTER" , "" ) == "true"
86- workloadMaxLevel , err := strconv .Atoi (utils .GetEnv ("WORKLOAD_MAX_LEVEL" , "3" ))
95+ isCluster := utils .GetEnv (model . EnvKeyOfClusterModeEnabled , "" ) == "true"
96+ workloadMaxLevel , err := strconv .Atoi (utils .GetEnv (model . EnvKeyOfWorkloadMaxLevel , "3" ))
8797
8898 if err != nil {
8999 zlogger .Error (err , "failed to parse WORKLOAD_MAX_LEVEL, will be set to 3 default" )
90100 workloadMaxLevel = 3
91101 }
92102
93- vnodeWorkerNum , err := strconv .Atoi (utils .GetEnv ("VNODE_WORKER_NUM" , "8" ))
103+ vnodeWorkerNum , err := strconv .Atoi (utils .GetEnv (model . EnvKeyOfVNodeWorkerNum , "8" ))
94104 if err != nil {
95105 zlogger .Error (err , "failed to parse VNODE_WORKER_NUM, will be set to 8 default" )
96106 vnodeWorkerNum = 8
97107 }
98108
109+ deployNamespace := utils .GetEnv (model .EnvKeyOfNamespace , "default" )
110+
99111 // Initialize controller manager
100112 kubeConfig := config .GetConfigOrDie ()
101113 // TODO: should support to set from parameter
@@ -111,14 +123,25 @@ func main() {
111123 BindAddress : ":9090" ,
112124 },
113125 })
114-
115126 if err != nil {
116127 zlogger .Error (err , "unable to set up overall controller manager" )
117128 os .Exit (1 )
118129 }
119130
120131 tracker .SetTracker (& tracker.DefaultTracker {})
121132
133+ k8sClientSet := kubernetes .NewForConfigOrDie (kubeConfig )
134+
135+ var moduleControllerServiceIP string
136+ var kubeletProxyEnabled bool
137+ if os .Getenv (model .EnvKeyOfKubeletProxyEnabled ) == "true" {
138+ moduleControllerServiceIP , err = lookupProxyServiceIP (ctx , deployNamespace , k8sClientSet )
139+ if err != nil {
140+ log .G (ctx ).Fatalf ("Failed to lookup kubelet proxy service IP: %v" , err )
141+ }
142+ kubeletProxyEnabled = true
143+ }
144+
122145 // Configure and create VNode controller
123146 vNodeControllerConfig := vkModel.BuildVNodeControllerConfig {
124147 ClientID : clientID ,
@@ -127,6 +150,8 @@ func main() {
127150 IsCluster : isCluster ,
128151 WorkloadMaxLevel : workloadMaxLevel ,
129152 VNodeWorkerNum : vnodeWorkerNum ,
153+ // vnode ip will fall back to the ip of the base pod if not set
154+ PseudoNodeIP : moduleControllerServiceIP ,
130155 }
131156
132157 moduleDeploymentController , err := module_deployment_controller .NewModuleDeploymentController (env )
@@ -164,6 +189,21 @@ func main() {
164189 os .Exit (1 )
165190 }
166191
192+ if kubeletProxyEnabled {
193+ zlogger .Info ("starting kubelet proxy server" )
194+ err := kubelet_proxy .StartKubeletProxy (
195+ ctx ,
196+ certFilePath ,
197+ keyFilePath ,
198+ ":" + utils .GetEnv (model .EnvKeyOfKubeletProxyPort , DefaultKubeletHttpListenAddr ),
199+ k8sClientSet ,
200+ )
201+ if err != nil {
202+ zlogger .Error (err , "failed to start kubelet proxy server" )
203+ os .Exit (1 )
204+ }
205+ }
206+
167207 zlogger .Info ("Module controller running" )
168208 err = k8sControllerManager .Start (signals .SetupSignalHandler ())
169209 if err != nil {
@@ -218,3 +258,25 @@ func startTunnels(ctx context.Context, clientId string, env string, mgr manager.
218258 // we only using one tunnel for now
219259 return tunnels [0 ]
220260}
261+
262+ // lookupProxyServiceIP retrieves the ClusterIP of the kubelet proxy service in the specified namespace.
263+ func lookupProxyServiceIP (ctx context.Context , namespace string , clientSet kubernetes.Interface ) (string , error ) {
264+ svcList , err := clientSet .CoreV1 ().Services (namespace ).List (ctx , metav1.ListOptions {
265+ LabelSelector : fmt .Sprintf ("%s=%s" , model .LabelKeyOfKubeletProxyService , "true" ),
266+ })
267+ if err != nil {
268+ return "" , fmt .Errorf ("failed to list services in namespace %s: %w" , namespace , err )
269+ }
270+ if len (svcList .Items ) == 0 {
271+ return "" , fmt .Errorf ("no kubelet proxy service found in namespace %s" , namespace )
272+ }
273+ if len (svcList .Items ) > 1 {
274+ return "" , fmt .Errorf ("multiple kubelet proxy services deteched in namespace %s, expected only one" , namespace )
275+ }
276+
277+ firstSvc := svcList .Items [0 ]
278+ if firstSvc .Spec .ClusterIP == "" || firstSvc .Spec .ClusterIP == "None" {
279+ return "" , fmt .Errorf ("kubelet proxy service %s in namespace %s has no valid ClusterIP" , firstSvc .Name , namespace )
280+ }
281+ return firstSvc .Spec .ClusterIP , nil
282+ }
0 commit comments