diff --git a/Makefile b/Makefile
index 9d658d99e..9511f3f15 100644
--- a/Makefile
+++ b/Makefile
@@ -225,6 +225,24 @@ $(BIN_PATH)/%: .static.%.$(STATIC)
 .PRECIOUS: $(foreach dir,$(BUILD_DIRS),.static.$(dir).1 .static.$(dir).)
 
+#
+# Eskimo words for snow...
+#
+
+balloons: $(BIN_PATH)/nri-resource-policy-balloons
+balloons-img: image.nri-resource-policy-balloons
+t-a topology-aware: $(BIN_PATH)/nri-resource-policy-topology-aware
+ta-img topology-aware-img: image.nri-resource-policy-topology-aware
+template: $(BIN_PATH)/nri-resource-policy-template
+template-img: image.nri-resource-policy-template
+memqos memory-qos: $(BIN_PATH)/nri-memory-qos
+memqos-img memory-qos-img: image.nri-memory-qos
+memtierd: $(BIN_PATH)/nri-memtierd
+memtierd-img: image.nri-memtierd
+sgx-epc sgx: $(BIN_PATH)/nri-sgx-epc
+sgx-epc-img: image.nri-sgx-epc
+config-manager: $(BIN_PATH)/config-manager
+
 #
 # Image building test deployment generation targets
 #
diff --git a/README-DRA-driver-proto.md b/README-DRA-driver-proto.md
new file mode 100644
index 000000000..c8a3bf640
--- /dev/null
+++ b/README-DRA-driver-proto.md
@@ -0,0 +1,254 @@
+# Prototyping CPU DRA device abstraction / DRA-based CPU allocation
+
+## Background
+
+This prototype patch set bolts a DRA allocation frontend on top of the existing
+topology aware resource policy plugin. The main intention of this patch set
+is to
+
+- provide something practical to play around with for the [feasibility study]( https://docs.google.com/document/d/1Tb_dC60YVCBr7cNYWuVLddUUTMcNoIt3zjd5-8rgug0/edit?tab=t.0#heading=h.iutbebngx80e) of enabling DRA-based CPU allocation,
+- allow (relatively) easy experimentation with how to expose CPUs as DRA
+devices (IOW, test various CPU DRA attributes), and
+- allow testing how DRA-based CPU allocation (using non-trivial CEL expressions)
+would scale with cluster and cluster node size.
+
+## Notes
+
+This patched NRI plugin, especially in its current state and form, is
+*not a proposal* for a first real DRA-based CPU driver.
+
+## Prerequisites for Testing
+
+To test this out in a cluster, make sure you have
+
+1. DRA enabled in your cluster
+One way to ensure this is to bootstrap your cluster using a kubeadm init configuration
+with the following bits set:
+
+```yaml
+apiVersion: kubeadm.k8s.io/v1beta4
+kind: InitConfiguration
+...
+---
+apiServer:
+  extraArgs:
+  - name: feature-gates
+    value: DynamicResourceAllocation=true,DRADeviceTaints=true,DRAAdminAccess=true,DRAPrioritizedList=true,DRAPartitionableDevices=true,DRAResourceClaimDeviceStatus=true
+  - name: runtime-config
+    value: resource.k8s.io/v1beta2=true,resource.k8s.io/v1beta1=true,resource.k8s.io/v1alpha3=true
+apiVersion: kubeadm.k8s.io/v1beta4
+...
+controllerManager:
+  extraArgs:
+  - name: feature-gates
+    value: DynamicResourceAllocation=true,DRADeviceTaints=true
+...
+scheduler:
+  extraArgs:
+  - name: feature-gates
+    value: DynamicResourceAllocation=true,DRADeviceTaints=true,DRAAdminAccess=true,DRAPrioritizedList=true,DRAPartitionableDevices=true
+---
+apiVersion: kubelet.config.k8s.io/v1beta1
+kind: KubeletConfiguration
+featureGates:
+  DynamicResourceAllocation: true
+```
+
+2. CDI enabled in your runtime configuration
+
+## Installation and Testing
+
+Once you have your cluster properly set up, you can pull this into
+your cluster for testing with something like this:
+
+```bash
+helm install --devel -n kube-system test oci://ghcr.io/klihub/nri-plugins/helm-charts/nri-resource-policy-topology-aware --version v0.9-dra-driver-unstable --set image.pullPolicy=Always --set extraEnv.OVERRIDE_SYS_ATOM_CPUS='2-5' --set extraEnv.OVERRIDE_SYS_CORE_CPUS='0\,1\,6-15'
+```
+
+Once the NRI plugin+DRA driver is up and running, you should see some CPUs
+exposed as DRA devices. You can check the resource slices with the following
+command:
+
+```bash
+[kli@n4c16-fedora-40-cloud-base-containerd ~]# kubectl get resourceslices
+NAME                                                      NODE                                    DRIVER       POOL    AGE
+n4c16-fedora-40-cloud-base-containerd-native.cpu-jxfkj    n4c16-fedora-40-cloud-base-containerd   native.cpu   pool0   4d2h
+```
+
+And the exposed devices like this:
+
+```bash
+[kli@n4c16-fedora-40-cloud-base-containerd ~]# kubectl get resourceslices -oyaml | less
+apiVersion: v1
+items:
+- apiVersion: resource.k8s.io/v1beta2
+  kind: ResourceSlice
+  metadata:
+    creationTimestamp: "2025-06-10T06:01:54Z"
+    generateName: n4c16-fedora-40-cloud-base-containerd-native.cpu-
+    generation: 1
+    name: n4c16-fedora-40-cloud-base-containerd-native.cpu-jxfkj
+    ownerReferences:
+    - apiVersion: v1
+      controller: true
+      kind: Node
+      name: n4c16-fedora-40-cloud-base-containerd
+      uid: 90a99f1f-c1ca-4bea-8dbd-3cc821f744b1
+    resourceVersion: "871388"
+    uid: 4639d31f-e508-4b0a-8378-867f6c1c7cb1
+  spec:
+    devices:
+    - attributes:
+        cache0ID:
+          int: 0
+        cache1ID:
+          int: 8
+        cache2ID:
+          int: 16
+        cache3ID:
+          int: 24
+        cluster:
+          int: 0
+        core:
+          int: 0
+        coreType:
+          string: P-core
+        die:
+          int: 0
+        isolated:
+          bool: false
+        localMemory:
+          int: 0
+        package:
+          int: 0
+      name: cpu1
+    - attributes:
+        cache0ID:
+          int: 1
+        cache1ID:
+          int: 9
+        cache2ID:
+          int: 17
+        cache3ID:
+          int: 24
+        cluster:
+          int: 2
+        core:
+          int: 1
+        coreType:
+          string: E-core
+        die:
+          int: 0
+        isolated:
+          bool: false
+        localMemory:
+          int: 0
+        package:
+          int: 0
+      name: cpu2
+    - attributes:
+        cache0ID:
+          int: 1
+        cache1ID:
+          int: 9
+        cache2ID:
+          int: 17
+        cache3ID:
+          int: 24
+        cluster:
+          int: 2
+        core:
+...
+```
+
+If everything looks fine and you do have CPUs available as DRA devices, you
+can test DRA-based CPU allocation with something like the manifests below.
+These define a few claim templates and a pod which allocates a single P-core
+for its container.
+
+```yaml
+apiVersion: resource.k8s.io/v1beta1
+kind: ResourceClaimTemplate
+metadata:
+  name: any-cores
+spec:
+  spec:
+    devices:
+      requests:
+      - name: cpu
+        deviceClassName: native.cpu
+---
+apiVersion: resource.k8s.io/v1beta1
+kind: ResourceClaimTemplate
+metadata:
+  name: p-cores
+spec:
+  spec:
+    devices:
+      requests:
+      - name: cpu
+        deviceClassName: native.cpu
+        selectors:
+        - cel:
+            expression: device.attributes["native.cpu"].coreType == "P-core"
+        count: 1
+---
+apiVersion: resource.k8s.io/v1beta1
+kind: ResourceClaimTemplate
+metadata:
+  name: e-cores
+spec:
+  spec:
+    devices:
+      requests:
+      - name: cpu
+        deviceClassName: native.cpu
+        selectors:
+        - cel:
+            expression: device.attributes["native.cpu"].coreType == "E-core"
+        count: 1
+---
+apiVersion: v1
+kind: Pod
+metadata:
+  name: pcore-test
+  labels:
+    app: pod
+spec:
+  containers:
+  - name: ctr0
+    image: busybox
+    imagePullPolicy: IfNotPresent
+    args:
+    - /bin/sh
+    - -c
+    - trap 'exit 0' TERM; sleep 3600 & wait
+    resources:
+      requests:
+        cpu: 1
+        memory: 100M
+      limits:
+        cpu: 1
+        memory: 100M
+      claims:
+      - name: claim-pcores
+  resourceClaims:
+  - name: claim-pcores
+    resourceClaimTemplateName: p-cores
+  terminationGracePeriodSeconds: 1
+```
+
+If you want to try a mixed native CPU + DRA-based allocation, try
+increasing the CPU request and limit in the pod's spec to 1500m CPUs
+or more and see what happens.
+
+## Playing Around with CPU Abstractions
+
+If you want to play around with this (for instance, modify the exposed CPU abstraction), the easiest way is to
+1. [fork](https://github.com/containers/nri-plugins/fork) the [main NRI Reference Plugins](https://github.com/containers/nri-plugins) repo
+2. enable GitHub Actions in your personal fork
+3. make any changes you want (for instance, to alter the CPU abstraction, take a look at [cpu.DRA()](https://github.com/klihub/nri-plugins/blob/test/build/dra-driver/pkg/sysfs/dra.go))
+4. push your changes to ssh://git@github.com/$YOUR_FORK/nri-plugins/refs/heads/test/build/dra-driver
+5. wait for the image and Helm chart publishing actions to succeed
+6. once done, pull the result into your cluster with something like `helm install --devel -n kube-system test oci://ghcr.io/$YOUR_GITHUB_USERID/nri-plugins/helm-charts/nri-resource-policy-topology-aware --version v0.9-dra-driver-unstable`
diff --git a/README.md b/README.md
index d152ef7cb..7c63ba263 100644
--- a/README.md
+++ b/README.md
@@ -19,3 +19,6 @@ Currently following plugins are available:
 [5]: https://containers.github.io/nri-plugins/stable/docs/memory/sgx-epc.html
 
 See the [NRI plugins documentation](https://containers.github.io/nri-plugins/) for more information.
+
+See the [DRA CPU driver prototype notes](README-DRA-driver-proto.md) for more information
+about using the Topology Aware policy as a DRA CPU driver.
diff --git a/cmd/plugins/balloons/policy/balloons-policy.go b/cmd/plugins/balloons/policy/balloons-policy.go
index 682e3a651..0daa20d64 100644
--- a/cmd/plugins/balloons/policy/balloons-policy.go
+++ b/cmd/plugins/balloons/policy/balloons-policy.go
@@ -351,6 +351,18 @@ func (p *balloons) UpdateResources(c cache.Container) error {
 	return nil
 }
 
+// AllocateClaim allocates CPUs for the claim.
+func (p *balloons) AllocateClaim(claim policyapi.Claim) error {
+	log.Debug("allocating claim %s for pods %v...", claim.String(), claim.GetPods())
+	return nil
+}
+
+// ReleaseClaim releases CPUs of the claim.
+func (p *balloons) ReleaseClaim(claim policyapi.Claim) error {
+	log.Debug("releasing claim %s for pods %v...", claim.String(), claim.GetPods())
+	return nil
+}
+
 // HandleEvent handles policy-specific events.
 func (p *balloons) HandleEvent(*events.Policy) (bool, error) {
 	log.Debug("(not) handling event...")
diff --git a/cmd/plugins/template/policy/template-policy.go b/cmd/plugins/template/policy/template-policy.go
index 9333a594b..038bb5801 100644
--- a/cmd/plugins/template/policy/template-policy.go
+++ b/cmd/plugins/template/policy/template-policy.go
@@ -110,6 +110,18 @@ func (p *policy) UpdateResources(c cache.Container) error {
 	return nil
 }
 
+// AllocateClaim allocates CPUs for the claim.
+func (p *policy) AllocateClaim(claim policyapi.Claim) error {
+	log.Debug("allocating claim %s for pods %v...", claim.String(), claim.GetPods())
+	return nil
+}
+
+// ReleaseClaim releases CPUs of the claim.
+func (p *policy) ReleaseClaim(claim policyapi.Claim) error {
+	log.Debug("releasing claim %s for pods %v...", claim.String(), claim.GetPods())
+	return nil
+}
+
 // HandleEvent handles policy-specific events.
 func (p *policy) HandleEvent(e *events.Policy) (bool, error) {
 	log.Info("received policy event %s.%s with data %v...", e.Source, e.Type, e.Data)
diff --git a/cmd/plugins/topology-aware/policy/cache.go b/cmd/plugins/topology-aware/policy/cache.go
index 454cb8bd6..82cf5d3e2 100644
--- a/cmd/plugins/topology-aware/policy/cache.go
+++ b/cmd/plugins/topology-aware/policy/cache.go
@@ -126,6 +126,7 @@ func (p *policy) reinstateGrants(grants map[string]Grant) error {
 
 type cachedGrant struct {
 	PrettyName string
+	Claimed    string
 	Exclusive  string
 	Part       int
 	CPUType    cpuClass
@@ -140,6 +141,7 @@ type cachedGrant struct {
 func newCachedGrant(cg Grant) *cachedGrant {
 	ccg := &cachedGrant{}
 	ccg.PrettyName = cg.GetContainer().PrettyName()
+	ccg.Claimed = cg.ClaimedCPUs().String()
 	ccg.Exclusive = cg.ExclusiveCPUs().String()
 	ccg.Part = cg.CPUPortion()
 	ccg.CPUType = cg.CPUType()
@@ -168,6 +170,7 @@ func (ccg *cachedGrant) ToGrant(policy *policy) (Grant, error) {
 		container,
 		ccg.CPUType,
 		cpuset.MustParse(ccg.Exclusive),
+		cpuset.MustParse(ccg.Claimed),
 		ccg.Part,
 		ccg.MemType,
 		ccg.ColdStart,
diff --git a/cmd/plugins/topology-aware/policy/mocks_test.go b/cmd/plugins/topology-aware/policy/mocks_test.go
index 26cec65a7..b31df585a 100644
--- a/cmd/plugins/topology-aware/policy/mocks_test.go
+++ b/cmd/plugins/topology-aware/policy/mocks_test.go
@@ -31,6 +31,7 @@ import (
 	"github.com/intel/goresctrl/pkg/sst"
 	idset "github.com/intel/goresctrl/pkg/utils"
 	v1 "k8s.io/api/core/v1"
+	resapi "k8s.io/api/resource/v1beta2"
 )
 
 type mockSystemNode struct {
@@ -208,6 +209,10 @@ func (c *mockCPU) CoreKind() sysfs.CoreKind {
 	return sysfs.PerformanceCore
 }
 
+func (c *mockCPU) DRA(extras ...map[sysfs.QualifiedName]sysfs.Attribute) *resapi.Device {
+	panic("unimplemented")
+}
+
 type mockSystem struct {
 	isolatedCPU int
 	nodes       []system.Node
@@ -349,6 +354,9 @@ func (fake *mockSystem) NodeDistance(idset.ID, idset.ID) int {
 func (fake *mockSystem) NodeHintToCPUs(string) string {
 	return ""
 }
+func (fake *mockSystem) CPUsAsDRADevices(ids []idset.ID) []resapi.Device {
+	panic("unimplemented")
+}
 
 type mockContainer struct {
 	name         string
@@ -409,6 +417,9 @@ func (m *mockContainer) GetAnnotation(string, interface{}) (string, bool) {
 func (m *mockContainer) GetEnv(string) (string, bool) {
 	panic("unimplemented")
 }
+func (m *mockContainer) GetEnvList() []string {
+	panic("unimplemented")
+}
 func (m *mockContainer) GetAnnotations() map[string]string {
panic("unimplemented") } @@ -726,6 +737,12 @@ func (m *mockCache) SetPolicyEntry(string, interface{}) { func (m *mockCache) GetPolicyEntry(string, interface{}) bool { return m.returnValueForGetPolicyEntry } +func (m *mockCache) SetEntry(string, interface{}) { + panic("unimplemented") +} +func (m *mockCache) GetEntry(string, interface{}) (interface{}, error) { + panic("unimplemented") +} func (m *mockCache) Save() error { return nil } diff --git a/cmd/plugins/topology-aware/policy/pools.go b/cmd/plugins/topology-aware/policy/pools.go index 5aaa6db35..5aad72930 100644 --- a/cmd/plugins/topology-aware/policy/pools.go +++ b/cmd/plugins/topology-aware/policy/pools.go @@ -18,11 +18,15 @@ import ( "fmt" "math" "sort" + "strconv" + "strings" "github.com/containers/nri-plugins/pkg/utils/cpuset" + corev1 "k8s.io/api/core/v1" "github.com/containers/nri-plugins/pkg/resmgr/cache" libmem "github.com/containers/nri-plugins/pkg/resmgr/lib/memory" + policyapi "github.com/containers/nri-plugins/pkg/resmgr/policy" system "github.com/containers/nri-plugins/pkg/sysfs" idset "github.com/intel/goresctrl/pkg/utils" ) @@ -347,7 +351,14 @@ func (p *policy) allocatePool(container cache.Container, poolHint string) (Grant offer *libmem.Offer ) - request := newRequest(container, p.memAllocator.Masks().AvailableTypes()) + claimed, unclaimed, err := p.getClaimedCPUs(container) + if err != nil { + return nil, err + } + + log.Info("**** claimed CPU(s): %s, unclaimed CPU: %d", claimed, unclaimed) + + request := newRequest(container, claimed, p.memAllocator.Masks().AvailableTypes()) if p.root.FreeSupply().ReservedCPUs().IsEmpty() && request.CPUType() == cpuReserved { // Fallback to allocating reserved CPUs from the shared pool @@ -438,6 +449,151 @@ func (p *policy) allocatePool(container cache.Container, poolHint string) (Grant return grant, nil } +func (p *policy) allocateClaim(claim policyapi.Claim) error { + hints := map[string]string{} + + // collect grants we need to free dues to conflciting exclusive allocation + realloc := []Grant{} + cpus := cpuset.New(claim.GetDevices()...) + for _, g := range p.allocations.grants { + if !cpus.Intersection(g.ExclusiveCPUs().Union(g.IsolatedCPUs())).IsEmpty() { + log.Info("***** releasing conflicting grant %s", g) + hints[g.GetContainer().GetID()] = g.GetCPUNode().Name() + realloc = append(realloc, g) + p.releasePool(g.GetContainer()) + p.updateSharedAllocations(&g) + } + } + + pool := p.getPoolForCPUs(cpus) + if pool == nil { + return fmt.Errorf("failed to find pool for claimed CPUs %s", cpus.String()) + } + + log.Info("*** target pool is %s", pool.Name()) + + // free more grants if we need to, for slicing off claimed CPUs + s := pool.FreeSupply() + needShared := s.SharableCPUs().Intersection(cpus).Size() + if available, needed := s.AllocatableSharedCPU(), 1000*needShared; available < needed { + log.Info("***** need to free %d mCPU shared capacity", needed-available) + grants := p.getLargestSharedUsers(pool) + log.Info("grants: %v", grants) + for _, g := range grants { + hints[g.GetContainer().GetID()] = g.GetCPUNode().Name() + realloc = append(realloc, g) + p.releasePool(g.GetContainer()) + p.updateSharedAllocations(&g) + if available, needed = s.AllocatableSharedCPU(), 1000*needShared; available >= needed { + break + } + } + } + s.ClaimCPUs(cpus) + + // TODO: sort old grants by QoS class or size and pool distance from root... 
+	for _, oldg := range realloc {
+		c := oldg.GetContainer()
+		log.Info("***** reallocating %s with pool hint %s", c.PrettyName(), hints[c.GetID()])
+		if err := p.allocateResources(c, hints[c.GetID()]); err != nil {
+			log.Error("failed to reallocate container %s (%s), retrying with no pool hint", c.PrettyName(), c.GetID())
+			if err := p.allocateResources(c, ""); err != nil {
+				log.Error("failed to reallocate container %s (%s) without a pool hint", c.PrettyName(), c.GetID())
+			}
+		}
+	}
+
+	return nil
+}
+
+// getPoolForCPUs finds the tightest-fitting (deepest) pool that contains all the given CPUs.
+func (p *policy) getPoolForCPUs(cpus cpuset.CPUSet) Node {
+	var pool Node
+	for _, n := range p.pools {
+		s := n.GetSupply()
+		poolCPUs := s.SharableCPUs().Union(s.IsolatedCPUs()).Union(s.ReservedCPUs())
+		if poolCPUs.Intersection(cpus).Equals(cpus) {
+			if pool == nil || n.RootDistance() > pool.RootDistance() {
+				pool = n
+			}
+		}
+	}
+	return pool
+}
+
+// getLargestSharedUsers returns the grants using shared CPUs in a pool or its
+// parents, largest users first.
+func (p *policy) getLargestSharedUsers(pool Node) []Grant {
+	pools := map[string]struct{}{}
+	grants := []Grant{}
+	for n := pool; !n.IsNil(); n = n.Parent() {
+		pools[n.Name()] = struct{}{}
+	}
+
+	for _, g := range p.allocations.grants {
+		if _, ok := pools[g.GetCPUNode().Name()]; !ok {
+			continue
+		}
+		if g.SharedPortion() > 0 {
+			grants = append(grants, g)
+		}
+	}
+
+	sort.Slice(grants, func(i, j int) bool {
+		gi, gj := grants[i], grants[j]
+		di, dj := gi.GetCPUNode().RootDistance(), gj.GetCPUNode().RootDistance()
+		si, sj := gi.SharedPortion(), gj.SharedPortion()
+		if di != dj {
+			return di < dj
+		}
+		return si > sj
+	})
+
+	return grants
+}
+
+// releaseClaim releases the CPUs of the claim back to their supply.
+func (p *policy) releaseClaim(claim policyapi.Claim) error {
+	cpus := cpuset.New(claim.GetDevices()...)
+	p.root.FreeSupply().UnclaimCPUs(cpus)
+	p.updateSharedAllocations(nil)
+
+	return nil
+}
+
+// getClaimedCPUs returns the CPUs injected as DRA_CPU* environment variables into
+// a container, together with the remaining (unclaimed) native milli-CPU request.
+func (p *policy) getClaimedCPUs(c cache.Container) (cpuset.CPUSet, int, error) {
+	var ids []int
+	for _, env := range c.GetEnvList() {
+		if !strings.HasPrefix(env, "DRA_CPU") {
+			continue
+		}
+		val := strings.TrimSuffix(strings.TrimPrefix(env, "DRA_CPU"), "=1")
+		i, err := strconv.ParseInt(val, 10, 32)
+		if err != nil {
+			return cpuset.New(), 0, fmt.Errorf("invalid DRA CPU env var %q: %v", env, err)
+		}
+		ids = append(ids, int(i))
+	}
+
+	claimed := cpuset.New(ids...)
+	reqs, ok := c.GetResourceUpdates()
+	if !ok {
+		reqs = c.GetResourceRequirements()
+	}
+	request := reqs.Requests[corev1.ResourceCPU]
+	unclaimed := int(request.MilliValue()) - 1000*claimed.Size()
+
+	if unclaimed < 0 {
+		return claimed, unclaimed,
+			fmt.Errorf("invalid claimed (%s CPU(s)) vs. requested/native CPU (%dm)",
+				claimed, request.MilliValue())
+	}
+
+	return claimed, unclaimed, nil
+}
+
 // setPreferredCpusetCpus pins container's CPUs according to what has been
 // allocated for it, taking into account if the container should run
 // with hyperthreads hidden.
@@ -463,6 +619,7 @@ func (p *policy) applyGrant(grant Grant) { container := grant.GetContainer() cpuType := grant.CPUType() + claimed := grant.ClaimedCPUs() exclusive := grant.ExclusiveCPUs() reserved := grant.ReservedCPUs() shared := grant.SharedCPUs() @@ -472,16 +629,25 @@ func (p *policy) applyGrant(grant Grant) { kind := "" switch cpuType { case cpuNormal: - if exclusive.IsEmpty() { + if exclusive.IsEmpty() && claimed.IsEmpty() { cpus = shared kind = "shared" } else { - kind = "exclusive" + if !exclusive.IsEmpty() { + kind = "exclusive" + } + if !claimed.IsEmpty() { + if kind == "" { + kind = "claimed" + } else { + kind += "+claimed" + } + } if cpuPortion > 0 { kind += "+shared" - cpus = exclusive.Union(shared) + cpus = exclusive.Union(claimed).Union(shared) } else { - cpus = exclusive + cpus = exclusive.Union(claimed) } } case cpuReserved: @@ -608,9 +774,14 @@ func (p *policy) updateSharedAllocations(grant *Grant) { continue } + if other.SharedPortion() == 0 && !other.ClaimedCPUs().IsEmpty() { + log.Info(" => %s not affected (only claimed CPUs)...", other) + continue + } + if opt.PinCPU { shared := other.GetCPUNode().FreeSupply().SharableCPUs() - exclusive := other.ExclusiveCPUs() + exclusive := other.ExclusiveCPUs().Union(other.ClaimedCPUs()) if exclusive.IsEmpty() { p.setPreferredCpusetCpus(other.GetContainer(), shared, fmt.Sprintf(" => updating %s with shared CPUs of %s: %s...", @@ -652,6 +823,8 @@ func (p *policy) compareScores(request Request, pools []Node, scores map[int]Sco depth1, depth2 := node1.RootDistance(), node2.RootDistance() id1, id2 := node1.NodeID(), node2.NodeID() score1, score2 := scores[id1], scores[id2] + claim1, claim2 := node1.FreeSupply().ClaimedCPUs(), node2.FreeSupply().ClaimedCPUs() + claim := request.ClaimedCPUs() cpuType := request.CPUType() isolated1, reserved1, shared1 := score1.IsolatedCapacity(), score1.ReservedCapacity(), score1.SharedCapacity() isolated2, reserved2, shared2 := score2.IsolatedCapacity(), score2.ReservedCapacity(), score2.SharedCapacity() @@ -668,7 +841,7 @@ func (p *policy) compareScores(request Request, pools []Node, scores map[int]Sco // // Our scoring/score sorting algorithm is: // - // - insufficient isolated, reserved or shared capacity loses + // - insufficient claimed, isolated, reserved or shared capacity loses // - if we have affinity, the higher affinity score wins // - if we have topology hints // * better hint score wins @@ -693,8 +866,12 @@ func (p *policy) compareScores(request Request, pools []Node, scores map[int]Sco // Before this comparison is reached, nodes with insufficient uncompressible resources // (memory) have been filtered out. 
- // a node with insufficient isolated or shared capacity loses + // a node with insufficient claimed, isolated or shared capacity loses switch { + case claim.Difference(claim1).Size() == 0 && claim.Difference(claim2).Size() > 0: + return true + case claim.Difference(claim1).Size() > 0 && claim.Difference(claim2).Size() == 0: + return false case cpuType == cpuNormal && ((isolated2 < 0 && isolated1 >= 0) || (shared2 <= 0 && shared1 > 0)): log.Debug(" => %s loses, insufficent isolated or shared", node2.Name()) return true diff --git a/cmd/plugins/topology-aware/policy/resources.go b/cmd/plugins/topology-aware/policy/resources.go index 055f0c8dc..2f7af3d04 100644 --- a/cmd/plugins/topology-aware/policy/resources.go +++ b/cmd/plugins/topology-aware/policy/resources.go @@ -64,6 +64,8 @@ type Supply interface { ReservedCPUs() cpuset.CPUSet // SharableCPUs returns the sharable cpuset in this supply. SharableCPUs() cpuset.CPUSet + // ClaimedCPUs returns the claimed CPUs for this supply. + ClaimedCPUs() cpuset.CPUSet // GrantedReserved returns the locally granted reserved CPU capacity in this supply. GrantedReserved() int // GrantedShared returns the locally granted shared CPU capacity in this supply. @@ -89,6 +91,12 @@ type Supply interface { DumpCapacity() string // DumpAllocatable returns a printable representation of the supply's alloctable resources. DumpAllocatable() string + + ClaimCPUs(cpuset.CPUSet) + claimCPUs(cpuset.CPUSet) + + UnclaimCPUs(cpuset.CPUSet) + unclaimCPUs(cpuset.CPUSet) } // Request represents CPU and memory resources requested by a container. @@ -103,6 +111,8 @@ type Request interface { CPUPrio() cpuPrio // SetCPUType sets the type of requested CPU. SetCPUType(cpuType cpuClass) + // ClaimedCPUs returns the CPUs claimed for this request. + ClaimedCPUs() cpuset.CPUSet // FullCPUs return the number of full CPUs requested. FullCPUs() int // CPUFraction returns the amount of fractional milli-CPU requested. @@ -142,6 +152,8 @@ type Grant interface { // CPUPortion returns granted milli-CPUs of non-full CPUs of CPUType(). // CPUPortion() == ReservedPortion() + SharedPortion(). CPUPortion() int + // ClaimedCPUs returns the claimed granted cpuset. + ClaimedCPUs() cpuset.CPUSet // ExclusiveCPUs returns the exclusively granted non-isolated cpuset. ExclusiveCPUs() cpuset.CPUSet // ReservedCPUs returns the reserved granted cpuset. @@ -212,6 +224,7 @@ type supply struct { isolated cpuset.CPUSet // isolated CPUs at this node reserved cpuset.CPUSet // reserved CPUs at this node sharable cpuset.CPUSet // sharable CPUs at this node + claimed cpuset.CPUSet // CPUs allocated through DRA grantedReserved int // amount of reserved CPUs allocated grantedShared int // amount of shareable CPUs allocated } @@ -221,6 +234,7 @@ var _ Supply = &supply{} // request implements our Request interface. type request struct { container cache.Container // container for this request + claimed cpuset.CPUSet // CPUs DRA-claimed for this container full int // number of full CPUs requested fraction int // amount of fractional CPU requested isolate bool // prefer isolated exclusive CPUs @@ -246,6 +260,7 @@ type grant struct { container cache.Container // container CPU is granted to node Node // node CPU is supplied from exclusive cpuset.CPUSet // exclusive CPUs + claimed cpuset.CPUSet // claimed CPUs cpuType cpuClass // type of CPUs (normal, reserved, ...) 
cpuPortion int // milliCPUs granted from CPUs of cpuType memType memoryType // requested types of memory @@ -310,6 +325,11 @@ func (cs *supply) SharableCPUs() cpuset.CPUSet { return cs.sharable.Clone() } +// ClaimedCPUs returns the claimed CPUs in this supply. +func (cs *supply) ClaimedCPUs() cpuset.CPUSet { + return cs.claimed.Clone() +} + // GrantedReserved returns the locally granted reserved CPU capacity. func (cs *supply) GrantedReserved() int { return cs.grantedReserved @@ -392,6 +412,7 @@ func (cs *supply) AllocateCPU(r Request) (Grant, error) { cr := r.(*request) + claimed := cr.claimed full := cr.full fraction := cr.fraction @@ -450,7 +471,7 @@ func (cs *supply) AllocateCPU(r Request) (Grant, error) { cs.node.Name(), full, cs.sharable, cs.AllocatableSharedCPU()) } - grant := newGrant(cs.node, cr.GetContainer(), cpuType, exclusive, 0, 0, 0) + grant := newGrant(cs.node, cr.GetContainer(), cpuType, exclusive, claimed, 0, 0, 0) grant.AccountAllocateCPU() if fraction > 0 { @@ -694,6 +715,44 @@ func (cs *supply) DumpAllocatable() string { return allocatable } +func (cs *supply) ClaimCPUs(cpus cpuset.CPUSet) { + cs.claimCPUs(cpus) + + cs.GetNode().DepthFirst(func(n Node) { + n.FreeSupply().claimCPUs(cpus) + }) + for n := cs.GetNode(); !n.IsNil(); n = n.Parent() { + n.FreeSupply().claimCPUs(cpus) + } +} + +func (cs *supply) claimCPUs(cpus cpuset.CPUSet) { + cs.claimed = cs.claimed.Union(cpus) + cs.isolated = cs.isolated.Difference(cpus) + cs.sharable = cs.sharable.Difference(cpus) + cs.reserved = cs.reserved.Difference(cpus) +} + +func (cs *supply) UnclaimCPUs(cpus cpuset.CPUSet) { + cs.unclaimCPUs(cpus) + + cs.GetNode().DepthFirst(func(n Node) { + n.FreeSupply().unclaimCPUs(cpus) + }) + for n := cs.GetNode(); !n.IsNil(); n = n.Parent() { + n.FreeSupply().unclaimCPUs(cpus) + } +} + +func (cs *supply) unclaimCPUs(cpus cpuset.CPUSet) { + all := cs.GetNode().GetSupply() + + cs.isolated = cs.isolated.Union(all.IsolatedCPUs().Intersection(cpus)) + cs.sharable = cs.sharable.Union(all.SharableCPUs().Intersection(cpus)) + cs.reserved = cs.reserved.Union(all.ReservedCPUs().Intersection(cpus)) + cs.claimed = cs.claimed.Difference(cpus) +} + // prettyMem formats the given amount as k, M, G, or T units. func prettyMem(value int64) string { units := []string{"k", "M", "G", "T"} @@ -712,14 +771,22 @@ func prettyMem(value int64) string { } // newRequest creates a new request for the given container. 
-func newRequest(container cache.Container, types libmem.TypeMask) Request { +func newRequest(container cache.Container, claimed cpuset.CPUSet, types libmem.TypeMask) Request { pod, _ := container.GetPod() full, fraction, isolate, cpuType, prio := cpuAllocationPreferences(pod, container) req, lim, mtype := memoryAllocationPreference(pod, container) coldStart := time.Duration(0) - log.Debug("%s: CPU preferences: cpuType=%s, full=%v, fraction=%v, isolate=%v, prio=%v", - container.PrettyName(), cpuType, full, fraction, isolate, prio) + if full >= claimed.Size() { + full -= claimed.Size() + } else { + if fraction >= 1000*claimed.Size() { + fraction -= 1000 * claimed.Size() + } + } + + log.Debug("%s: CPU preferences: cpuType=%s, claim=%s, full=%v, fraction=%v, isolate=%v, prio=%v", + container.PrettyName(), cpuType, claimed, full, fraction, isolate, prio) if mtype == memoryUnspec { mtype = defaultMemoryType &^ memoryHBM @@ -749,6 +816,7 @@ func newRequest(container cache.Container, types libmem.TypeMask) Request { return &request{ container: container, + claimed: claimed, full: full, fraction: fraction, isolate: isolate, @@ -804,6 +872,11 @@ func (cr *request) SetCPUType(cpuType cpuClass) { cr.cpuType = cpuType } +// ClaimedPUs return the claimed CPUs for this request. +func (cr *request) ClaimedCPUs() cpuset.CPUSet { + return cr.claimed +} + // FullCPUs return the number of full CPUs requested. func (cr *request) FullCPUs() int { return cr.full @@ -1040,12 +1113,13 @@ func (score *score) String() string { } // newGrant creates a CPU grant from the given node for the container. -func newGrant(n Node, c cache.Container, cpuType cpuClass, exclusive cpuset.CPUSet, cpuPortion int, mt memoryType, coldstart time.Duration) Grant { +func newGrant(n Node, c cache.Container, cpuType cpuClass, exclusive, claimed cpuset.CPUSet, cpuPortion int, mt memoryType, coldstart time.Duration) Grant { grant := &grant{ node: n, container: c, cpuType: cpuType, exclusive: exclusive, + claimed: claimed, cpuPortion: cpuPortion, memType: mt, coldStart: coldstart, @@ -1084,6 +1158,7 @@ func (cg *grant) Clone() Grant { node: cg.GetCPUNode(), container: cg.GetContainer(), exclusive: cg.ExclusiveCPUs(), + claimed: cg.ClaimedCPUs(), cpuType: cg.CPUType(), cpuPortion: cg.SharedPortion(), memType: cg.MemoryType(), @@ -1138,6 +1213,11 @@ func (cg *grant) ExclusiveCPUs() cpuset.CPUSet { return cg.exclusive } +// ClaimedCPUs returns the claimed CPUSet in this grant. +func (cg *grant) ClaimedCPUs() cpuset.CPUSet { + return cg.claimed +} + // ReservedCPUs returns the reserved CPUSet in the supply of this grant. func (cg *grant) ReservedCPUs() cpuset.CPUSet { return cg.node.GetSupply().ReservedCPUs() diff --git a/cmd/plugins/topology-aware/policy/topology-aware-policy.go b/cmd/plugins/topology-aware/policy/topology-aware-policy.go index 99269d5fe..f9e2176b5 100644 --- a/cmd/plugins/topology-aware/policy/topology-aware-policy.go +++ b/cmd/plugins/topology-aware/policy/topology-aware-policy.go @@ -145,6 +145,10 @@ func (p *policy) Start() error { p.root.Dump("") p.checkAllocations(" ") + if err := p.options.PublishCPUs(p.allowed.Difference(p.reserved).List()); err != nil { + log.Errorf("failed to publish CPU DRA resources: %v", err) + } + return nil } @@ -280,6 +284,18 @@ func (p *policy) UpdateResources(container cache.Container) error { return nil } +// AllocateClaim alloctes CPUs for the claim. 
+func (p *policy) AllocateClaim(claim policyapi.Claim) error { + log.Info("allocating claim %s for pods %v...", claim.String(), claim.GetPods()) + return p.allocateClaim(claim) +} + +// ReleaseClaim releases CPUs of the claim. +func (p *policy) ReleaseClaim(claim policyapi.Claim) error { + log.Info("releasing claim %s for pods %v...", claim.String(), claim.GetPods()) + return p.releaseClaim(claim) +} + // HandleEvent handles policy-specific events. func (p *policy) HandleEvent(e *events.Policy) (bool, error) { log.Debug("received policy event %s.%s with data %v...", e.Source, e.Type, e.Data) @@ -393,8 +409,9 @@ func (p *policy) ExportResourceData(c cache.Container) map[string]string { data := map[string]string{} shared := grant.SharedCPUs().String() - isolated := grant.ExclusiveCPUs().Intersection(grant.GetCPUNode().GetSupply().IsolatedCPUs()) + isolated := grant.ExclusiveCPUs().Union(grant.ClaimedCPUs()).Intersection(grant.GetCPUNode().GetSupply().IsolatedCPUs()) exclusive := grant.ExclusiveCPUs().Difference(isolated).String() + claimed := grant.ClaimedCPUs().String() if grant.SharedPortion() > 0 && shared != "" { data[policyapi.ExportSharedCPUs] = shared @@ -405,6 +422,9 @@ func (p *policy) ExportResourceData(c cache.Container) map[string]string { if exclusive != "" { data[policyapi.ExportExclusiveCPUs] = exclusive } + if claimed != "" { + data[policyapi.ExportClaimedCPUs] = claimed + } mems := grant.GetMemoryZone() dram := mems.And(p.memAllocator.Masks().NodesByTypes(libmem.TypeMaskDRAM)) @@ -498,6 +518,10 @@ func (p *policy) Reconfigure(newCfg interface{}) error { p.root.Dump("") p.checkAllocations(" ") + if err := p.options.PublishCPUs(p.allowed.Difference(p.reserved).List()); err != nil { + log.Errorf("failed to publish CPU DRA resources: %v", err) + } + return nil } @@ -630,7 +654,11 @@ func (p *policy) newAllocations() allocations { // clone creates a copy of the allocation. 
func (a *allocations) clone() allocations { - o := allocations{policy: a.policy, grants: make(map[string]Grant)} + o := allocations{ + policy: a.policy, + grants: make(map[string]Grant), + //claims: make(map[string][]policyapi.Claim), + } for id, grant := range a.grants { o.grants[id] = grant.Clone() } diff --git a/deployment/helm/topology-aware/templates/clusterrole.yaml b/deployment/helm/topology-aware/templates/clusterrole.yaml index d5b76ffbb..2a58dd7e7 100644 --- a/deployment/helm/topology-aware/templates/clusterrole.yaml +++ b/deployment/helm/topology-aware/templates/clusterrole.yaml @@ -22,3 +22,27 @@ rules: - list - update - delete +- apiGroups: + - "resource.k8s.io" + resources: + - resourceslices + verbs: + - list + - watch + - create + - update + - delete +- apiGroups: + - "resource.k8s.io" + resources: + - resourceclaims + - deviceclasses + verbs: + - get +- apiGroups: + - "resource.k8s.io" + resources: + - resourceclaims/status + verbs: + - patch + - update diff --git a/deployment/helm/topology-aware/templates/daemonset.yaml b/deployment/helm/topology-aware/templates/daemonset.yaml index 56685b868..03066d2a9 100644 --- a/deployment/helm/topology-aware/templates/daemonset.yaml +++ b/deployment/helm/topology-aware/templates/daemonset.yaml @@ -126,6 +126,12 @@ spec: - name: pod-resources-socket mountPath: /var/lib/kubelet/pod-resources readOnly: true + - name: host-cdi-dir + mountPath: /host/var/run/cdi + - name: kubelet-plugins + mountPath: /var/lib/kubelet/plugins + - name: kubelet-plugins-registry + mountPath: /var/lib/kubelet/plugins_registry {{- if .Values.podPriorityClassNodeCritical }} priorityClassName: system-node-critical {{- end }} @@ -149,6 +155,18 @@ spec: hostPath: path: /var/lib/kubelet/pod-resources type: DirectoryOrCreate + - name: host-cdi-dir + hostPath: + path: /var/run/cdi + type: DirectoryOrCreate + - name: kubelet-plugins + hostPath: + path: /var/lib/kubelet/plugins + type: DirectoryOrCreate + - name: kubelet-plugins-registry + hostPath: + path: /var/lib/kubelet/plugins_registry + type: DirectoryOrCreate {{- if .Values.nri.runtime.patchConfig }} - name: containerd-config hostPath: diff --git a/deployment/helm/topology-aware/templates/native-cpu-device-class.yaml b/deployment/helm/topology-aware/templates/native-cpu-device-class.yaml new file mode 100644 index 000000000..ad8c4346e --- /dev/null +++ b/deployment/helm/topology-aware/templates/native-cpu-device-class.yaml @@ -0,0 +1,8 @@ +apiVersion: resource.k8s.io/v1beta1 +kind: DeviceClass +metadata: + name: native.cpu +spec: + selectors: + - cel: + expression: device.driver == "native.cpu" diff --git a/docs/conf.py b/docs/conf.py index f93341bd0..ba73c1e09 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -127,7 +127,7 @@ def gomod_versions(modules): # List of patterns, relative to source directory, that match files and # directories to ignore when looking for source files. # This pattern also affects html_static_path and html_extra_path. 
-exclude_patterns = ['build', 'build-aux', '_build', '.github', '_work', 'generate', 'README.md', 'TODO.md', 'SECURITY.md', 'CODE-OF-CONDUCT.md', 'docs/releases', 'test/e2e/README.md', 'docs/resource-policy/releases', 'docs/resource-policy/README.md','test/statistics-analysis/README.md', 'deployment/helm/*/*.md', '**/testdata'] +exclude_patterns = ['build', 'build-aux', '_build', '.github', '_work', 'generate', 'README.md', 'TODO.md', 'SECURITY.md', 'CODE-OF-CONDUCT.md', 'docs/releases', 'test/e2e/README.md', 'docs/resource-policy/releases', 'docs/resource-policy/README.md','test/statistics-analysis/README.md', 'deployment/helm/*/*.md', '**/testdata', 'README-DRA-driver-proto.md'] # -- Options for HTML output ------------------------------------------------- diff --git a/go.mod b/go.mod index d8b6f0960..1f577568c 100644 --- a/go.mod +++ b/go.mod @@ -15,36 +15,37 @@ require ( github.com/onsi/ginkgo/v2 v2.21.0 github.com/onsi/gomega v1.35.1 github.com/pelletier/go-toml/v2 v2.1.0 - github.com/prometheus/client_golang v1.19.1 + github.com/prometheus/client_golang v1.22.0 github.com/prometheus/client_model v0.6.1 github.com/sirupsen/logrus v1.9.3 github.com/stretchr/testify v1.10.0 - go.opentelemetry.io/otel v1.19.0 - go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.19.0 - go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.19.0 - go.opentelemetry.io/otel/sdk v1.19.0 - go.opentelemetry.io/otel/trace v1.19.0 + go.opentelemetry.io/otel v1.35.0 + go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.33.0 + go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.33.0 + go.opentelemetry.io/otel/sdk v1.34.0 + go.opentelemetry.io/otel/trace v1.35.0 golang.org/x/sys v0.31.0 golang.org/x/time v0.9.0 - google.golang.org/grpc v1.65.0 - k8s.io/api v0.31.2 + google.golang.org/grpc v1.72.1 + k8s.io/api v0.33.1 k8s.io/apimachinery v0.33.1 - k8s.io/client-go v0.31.2 + k8s.io/client-go v0.33.1 + k8s.io/dynamic-resource-allocation v0.26.0-beta.0.0.20250531103609-72b57f383cd6 k8s.io/klog/v2 v2.130.1 - k8s.io/kubelet v0.31.2 - k8s.io/utils v0.0.0-20241104100929-3ea5e8cea738 + k8s.io/kubelet v0.33.1 + k8s.io/utils v0.0.0-20250502105355-0f33e8f1c979 sigs.k8s.io/controller-runtime v0.16.2 sigs.k8s.io/yaml v1.4.0 ) require ( github.com/beorn7/perks v1.0.1 // indirect - github.com/cenkalti/backoff/v4 v4.2.1 // indirect + github.com/cenkalti/backoff/v4 v4.3.0 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/containerd/log v0.1.0 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/emicklei/go-restful/v3 v3.11.0 // indirect - github.com/fxamacker/cbor/v2 v2.7.0 // indirect + github.com/fxamacker/cbor/v2 v2.8.0 // indirect github.com/go-logr/logr v1.4.2 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/go-openapi/jsonpointer v0.21.0 // indirect @@ -53,13 +54,11 @@ require ( github.com/go-task/slim-sprig/v3 v3.0.0 // indirect github.com/godbus/dbus/v5 v5.0.4 // indirect github.com/gogo/protobuf v1.3.2 // indirect - github.com/golang/protobuf v1.5.4 // indirect github.com/google/gnostic-models v0.6.9 // indirect github.com/google/go-cmp v0.7.0 // indirect github.com/google/pprof v0.0.0-20241029153458-d1b30febd7db // indirect github.com/google/uuid v1.6.0 // indirect - github.com/grpc-ecosystem/grpc-gateway/v2 v2.18.0 // indirect - github.com/imdario/mergo v0.3.6 // indirect + github.com/grpc-ecosystem/grpc-gateway/v2 v2.24.0 // indirect github.com/josharian/intern v1.0.0 // indirect 
github.com/json-iterator/go v1.1.12 // indirect github.com/knqyf263/go-plugin v0.8.1-0.20240827022226-114c6257e441 // indirect @@ -70,14 +69,18 @@ require ( github.com/opencontainers/runtime-spec v1.1.0 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect - github.com/prometheus/common v0.55.0 // indirect + github.com/prometheus/common v0.62.0 // indirect github.com/prometheus/procfs v0.15.1 // indirect - github.com/spf13/pflag v1.0.5 // indirect + github.com/spf13/pflag v1.0.6 // indirect github.com/tetratelabs/wazero v1.9.0 // indirect github.com/x448/float16 v0.8.4 // indirect - go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.19.0 // indirect - go.opentelemetry.io/otel/metric v1.19.0 // indirect - go.opentelemetry.io/proto/otlp v1.0.0 // indirect + go.etcd.io/etcd/client/pkg/v3 v3.6.0 // indirect + go.opentelemetry.io/auto/sdk v1.1.0 // indirect + go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.33.0 // indirect + go.opentelemetry.io/otel/metric v1.35.0 // indirect + go.opentelemetry.io/proto/otlp v1.4.0 // indirect + go.uber.org/multierr v1.11.0 // indirect + go.uber.org/zap v1.27.0 // indirect golang.org/x/mod v0.21.0 // indirect golang.org/x/net v0.38.0 // indirect golang.org/x/oauth2 v0.27.0 // indirect @@ -85,8 +88,8 @@ require ( golang.org/x/term v0.30.0 // indirect golang.org/x/text v0.23.0 // indirect golang.org/x/tools v0.26.0 // indirect - google.golang.org/genproto/googleapis/api v0.0.0-20240528184218-531527333157 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20240701130421-f6361c86f094 // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20250303144028-a0af3efb3deb // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20250303144028-a0af3efb3deb // indirect google.golang.org/protobuf v1.36.5 // indirect gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect @@ -96,7 +99,7 @@ require ( k8s.io/kube-openapi v0.0.0-20250318190949-c8a335a9a2ff // indirect sigs.k8s.io/json v0.0.0-20241010143419-9aa6b5e7a4b3 // indirect sigs.k8s.io/randfill v1.0.0 // indirect - sigs.k8s.io/structured-merge-diff/v4 v4.6.0 // indirect + sigs.k8s.io/structured-merge-diff/v4 v4.7.0 // indirect ) replace ( diff --git a/go.sum b/go.sum index 275f46059..92f6228d4 100644 --- a/go.sum +++ b/go.sum @@ -614,8 +614,8 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/boombuler/barcode v1.0.0/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl3JlRe0mD8= github.com/boombuler/barcode v1.0.1/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl3JlRe0mD8= -github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM= -github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= +github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= +github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/census-instrumentation/opencensus-proto v0.3.0/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/census-instrumentation/opencensus-proto v0.4.1/go.mod h1:4T9NM4+4Vw91VeyqjLS6ao50K5bOcLKN6Q42XnYaRYw= @@ -680,8 +680,8 @@ github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/ 
github.com/fogleman/gg v1.3.0/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k= github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY= github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw= -github.com/fxamacker/cbor/v2 v2.7.0 h1:iM5WgngdRBanHcxugY4JySA0nk1wZorNOpTgCMedv5E= -github.com/fxamacker/cbor/v2 v2.7.0/go.mod h1:pxXPTn3joSm21Gbwsv0w9OSA2y1HFR9qXEeXQVeNoDQ= +github.com/fxamacker/cbor/v2 v2.8.0 h1:fFtUGXUzXPHTIUdne5+zzMPTfffl3RD5qYnkY40vtxU= +github.com/fxamacker/cbor/v2 v2.8.0/go.mod h1:vM4b+DJCtHn+zz7h3FFp/hDAI9WNWCsZj23V5ytsSxQ= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/go-fonts/dejavu v0.1.0/go.mod h1:4Wt4I4OU2Nq9asgDCteaAaWZOV24E+0/Pwo0gppep4g= github.com/go-fonts/latin-modern v0.2.0/go.mod h1:rQVLdDMK+mK1xscDwsqM5J8U2jrRa3T0ecnM9pNujks= @@ -720,8 +720,6 @@ github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0/go.mod h1:E/TSTwGw github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/glog v1.0.0/go.mod h1:EWib/APOK0SL3dFbYqvxE3UYd8E6s1ouQ7iEp/0LWV4= github.com/golang/glog v1.1.0/go.mod h1:pfYeQZ3JWZoXTV5sFc986z3HTpwQs9At6P4ImfuP3NQ= -github.com/golang/glog v1.2.1 h1:OptwRhECazUx5ix5TTWC3EZhsZEHWcYWY4FQHTIubm4= -github.com/golang/glog v1.2.1/go.mod h1:6AhwSGph0fcJtXVM/PEHPqZlFeoLxhs7/t5UDAwmO+w= github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= @@ -779,8 +777,6 @@ github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeN github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= -github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0= -github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs= github.com/google/martian/v3 v3.0.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0= github.com/google/martian/v3 v3.1.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0= @@ -829,15 +825,13 @@ github.com/googleapis/google-cloud-go-testing v0.0.0-20200911160855-bcd43fbb19e8 github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= github.com/grpc-ecosystem/grpc-gateway/v2 v2.7.0/go.mod h1:hgWBS7lorOAVIJEQMi4ZsPv9hVvWI6+ch50m39Pf2Ks= github.com/grpc-ecosystem/grpc-gateway/v2 v2.11.3/go.mod h1:o//XUCC/F+yRGJoPO/VU0GSB0f8Nhgmxx0VIRUvaC0w= -github.com/grpc-ecosystem/grpc-gateway/v2 v2.18.0 h1:RtRsiaGvWxcwd8y3BiRZxsylPT8hLWZ5SPcfI+3IDNk= -github.com/grpc-ecosystem/grpc-gateway/v2 v2.18.0/go.mod h1:TzP6duP4Py2pHLVPPQp42aoYI92+PCrVotyR5e8Vqlk= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.24.0 h1:TmHmbvxPmaegwhDubVz0lICL0J5Ka2vwTzhoePEXsGE= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.24.0/go.mod h1:qztMSjm835F2bXf+5HKAPIS5qsmQDqZna/PgVt4rWtI= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru 
v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/iancoleman/strcase v0.2.0/go.mod h1:iwCmte+B7n89clKwxIoIXy/HfoL7AsD47ZCWhYzw7ho= github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= -github.com/imdario/mergo v0.3.6 h1:xTNEAn+kxVO7dTZGu0CegyqKZmoWFI0rF8UxjlB2d28= -github.com/imdario/mergo v0.3.6/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA= github.com/intel/goresctrl v0.9.0 h1:IKI4ZrPTazLyFgdnWEkR9LS+DDATapOgoBtGxVMHePs= github.com/intel/goresctrl v0.9.0/go.mod h1:1S8GDqL46GuKb525bxNhIEEkhf4rhVcbSf9DuKhp7mw= github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= @@ -855,6 +849,8 @@ github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/asmfmt v1.3.2/go.mod h1:AG8TuvYojzulgDAMCnYn50l/5QV3Bs/tp6j0HLHbNSE= github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= +github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= +github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/knqyf263/go-plugin v0.8.1-0.20240827022226-114c6257e441 h1:Q/sZeuWkXprbKJSs7AwXryuZKSEL/a8ltC7e7xSspN0= github.com/knqyf263/go-plugin v0.8.1-0.20240827022226-114c6257e441/go.mod h1:CvCrNDMiKFlAlLFLmcoEfsTROEfNKbEZAMMrwQnLXCM= @@ -868,6 +864,8 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= +github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= github.com/lyft/protoc-gen-star v0.6.0/go.mod h1:TGAoBVkt8w7MPG72TrKIu85MIdXwDuzJYeZuUPFPNwA= github.com/lyft/protoc-gen-star v0.6.1/go.mod h1:TGAoBVkt8w7MPG72TrKIu85MIdXwDuzJYeZuUPFPNwA= github.com/lyft/protoc-gen-star/v2 v2.0.1/go.mod h1:RcCdONR2ScXaYnQC5tUzxzlpA3WVYF7/opLeUgcQs/o= @@ -906,15 +904,15 @@ github.com/pkg/sftp v1.13.1/go.mod h1:3HaPG6Dq1ILlpPZRO0HVMrsydcdLt6HRDccSgb87qR github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/prometheus/client_golang v1.19.1 h1:wZWJDwK+NameRJuPGDhlnFgx8e8HN3XHQeLaYJFJBOE= -github.com/prometheus/client_golang v1.19.1/go.mod h1:mP78NwGzrVks5S2H6ab8+ZZGJLZUq1hoULYBAYBw1Ho= +github.com/prometheus/client_golang v1.22.0 h1:rb93p9lokFEsctTys46VnV1kLCDpVZ0a/Y92Vm0Zc6Q= +github.com/prometheus/client_golang v1.22.0/go.mod h1:R7ljNsLXhuQXYZYtw6GAE9AZg8Y7vEW5scdCXrWRXC0= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/prometheus/client_model v0.2.0/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= 
github.com/prometheus/client_model v0.3.0/go.mod h1:LDGWKZIo7rky3hgvBe+caln+Dr3dPggB5dvjtD7w9+w= github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p8ais2e9E= github.com/prometheus/client_model v0.6.1/go.mod h1:OrxVMOVHjw3lKMa8+x6HeMGkHMQyHDk9E3jmP2AmGiY= -github.com/prometheus/common v0.55.0 h1:KEi6DK7lXW/m7Ig5i47x0vRzuBsHuvJdi5ee6Y3G1dc= -github.com/prometheus/common v0.55.0/go.mod h1:2SECS4xJG1kd8XF9IcM1gMX6510RAEL65zxzNImwdc8= +github.com/prometheus/common v0.62.0 h1:xasJaQlnWAeyHdUBeGjXmutelfJHWMRr+Fg4QszZ2Io= +github.com/prometheus/common v0.62.0/go.mod h1:vyBcEuLSvWos9B1+CyL7JZ2up+uFzXhkqml0W5zIY1I= github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc= github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk= @@ -934,11 +932,13 @@ github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasO github.com/spf13/afero v1.3.3/go.mod h1:5KUK8ByomD5Ti5Artl0RtHeI5pTF7MIDuXL3yY520V4= github.com/spf13/afero v1.6.0/go.mod h1:Ai8FlHk4v/PARR026UzYexafAt9roJ7LcLMAmO6Z93I= github.com/spf13/afero v1.9.2/go.mod h1:iUV7ddyEEZPO5gA3zD4fJt6iStLlL+Lg4m2cihcDf8Y= -github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= -github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= +github.com/spf13/pflag v1.0.6 h1:jFzHGLGAlb3ruxLB8MhbI6A8+AQX/2eW4qeyNZXNp2o= +github.com/spf13/pflag v1.0.6/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= +github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= @@ -965,6 +965,8 @@ github.com/yuin/goldmark v1.4.1/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1 github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= github.com/zeebo/assert v1.3.0/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0= github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA= +go.etcd.io/etcd/client/pkg/v3 v3.6.0 h1:nchnPqpuxvv3UuGGHaz0DQKYi5EIW5wOYsgUNRc365k= +go.etcd.io/etcd/client/pkg/v3 v3.6.0/go.mod h1:Jv5SFWMnGvIBn8o3OaBq/PnT0jjsX8iNokAUessNjoA= go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8= go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= @@ -973,27 +975,39 @@ go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk= go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E= go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo= -go.opentelemetry.io/otel v1.19.0 h1:MuS/TNf4/j4IXsZuJegVzI1cwut7Qc00344rgH7p8bs= +go.opentelemetry.io/auto/sdk v1.1.0 
h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA= +go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A= go.opentelemetry.io/otel v1.19.0/go.mod h1:i0QyjOq3UPoTzff0PJB2N66fb4S0+rSbSB15/oyH9fY= -go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.19.0 h1:Mne5On7VWdx7omSrSSZvM4Kw7cS7NQkOOmLcgscI51U= -go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.19.0/go.mod h1:IPtUMKL4O3tH5y+iXVyAXqpAwMuzC1IrxVS81rummfE= -go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.19.0 h1:3d+S281UTjM+AbF31XSOYn1qXn3BgIdWl8HNEpx08Jk= -go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.19.0/go.mod h1:0+KuTDyKL4gjKCF75pHOX4wuzYDUZYfAQdSu43o+Z2I= -go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.19.0 h1:IeMeyr1aBvBiPVYihXIaeIZba6b8E1bYp7lbdxK8CQg= -go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.19.0/go.mod h1:oVdCUtjq9MK9BlS7TtucsQwUcXcymNiEDjgDD2jMtZU= -go.opentelemetry.io/otel/metric v1.19.0 h1:aTzpGtV0ar9wlV4Sna9sdJyII5jTVJEvKETPiOKwvpE= +go.opentelemetry.io/otel v1.35.0 h1:xKWKPxrxB6OtMCbmMY021CqC45J+3Onta9MqjhnusiQ= +go.opentelemetry.io/otel v1.35.0/go.mod h1:UEqy8Zp11hpkUrL73gSlELM0DupHoiq72dR+Zqel/+Y= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.33.0 h1:Vh5HayB/0HHfOQA7Ctx69E/Y/DcQSMPpKANYVMQ7fBA= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.33.0/go.mod h1:cpgtDBaqD/6ok/UG0jT15/uKjAY8mRA53diogHBg3UI= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.33.0 h1:5pojmb1U1AogINhN3SurB+zm/nIcusopeBNp42f45QM= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.33.0/go.mod h1:57gTHJSE5S1tqg+EKsLPlTWhpHMsWlVmer+LA926XiA= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.33.0 h1:wpMfgF8E1rkrT1Z6meFh1NDtownE9Ii3n3X2GJYjsaU= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.33.0/go.mod h1:wAy0T/dUbs468uOlkT31xjvqQgEVXv58BRFWEgn5v/0= go.opentelemetry.io/otel/metric v1.19.0/go.mod h1:L5rUsV9kM1IxCj1MmSdS+JQAcVm319EUrDVLrt7jqt8= -go.opentelemetry.io/otel/sdk v1.19.0 h1:6USY6zH+L8uMH8L3t1enZPR3WFEmSTADlqldyHtJi3o= +go.opentelemetry.io/otel/metric v1.35.0 h1:0znxYu2SNyuMSQT4Y9WDWej0VpcsxkuklLa4/siN90M= +go.opentelemetry.io/otel/metric v1.35.0/go.mod h1:nKVFgxBZ2fReX6IlyW28MgZojkoAkJGaE8CpgeAU3oE= go.opentelemetry.io/otel/sdk v1.19.0/go.mod h1:NedEbbS4w3C6zElbLdPJKOpJQOrGUJ+GfzpjUvI0v1A= -go.opentelemetry.io/otel/trace v1.19.0 h1:DFVQmlVbfVeOuBRrwdtaehRrWiL1JoVs9CPIQ1Dzxpg= +go.opentelemetry.io/otel/sdk v1.34.0 h1:95zS4k/2GOy069d321O8jWgYsW3MzVV+KuSPKp7Wr1A= +go.opentelemetry.io/otel/sdk v1.34.0/go.mod h1:0e/pNiaMAqaykJGKbi+tSjWfNNHMTxoC9qANsCzbyxU= +go.opentelemetry.io/otel/sdk/metric v1.34.0 h1:5CeK9ujjbFVL5c1PhLuStg1wxA7vQv7ce1EK0Gyvahk= +go.opentelemetry.io/otel/sdk/metric v1.34.0/go.mod h1:jQ/r8Ze28zRKoNRdkjCZxfs6YvBTG1+YIqyFVFYec5w= go.opentelemetry.io/otel/trace v1.19.0/go.mod h1:mfaSyvGyEJEI0nyV2I4qhNQnbBOUUmYZpYojqMnX2vo= +go.opentelemetry.io/otel/trace v1.35.0 h1:dPpEfJu1sDIqruz7BHFG3c7528f6ddfSWfFDVt/xgMs= +go.opentelemetry.io/otel/trace v1.35.0/go.mod h1:WUk7DtFp1Aw2MkvqGdwiXYDZZNvA/1J8o6xRXLrIkyc= go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= go.opentelemetry.io/proto/otlp v0.15.0/go.mod h1:H7XAot3MsfNsj7EXtrA2q5xSNQ10UqI405h3+duxN4U= go.opentelemetry.io/proto/otlp v0.19.0/go.mod h1:H7XAot3MsfNsj7EXtrA2q5xSNQ10UqI405h3+duxN4U= -go.opentelemetry.io/proto/otlp v1.0.0 h1:T0TX0tmXU8a3CbNXzEKGeU5mIVOdf0oykP+u2lIVU/I= -go.opentelemetry.io/proto/otlp v1.0.0/go.mod 
h1:Sy6pihPLfYHkr3NkUbEhGHFhINUSI/v80hjKIs5JXpM= +go.opentelemetry.io/proto/otlp v1.4.0 h1:TA9WRvW6zMwP+Ssb6fLoUIuirti1gGbP28GcKG1jgeg= +go.opentelemetry.io/proto/otlp v1.4.0/go.mod h1:PPBWZIP98o2ElSqI35IHfu7hIhSwvc5N38Jw8pXuGFY= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= +go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= +go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= +go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= @@ -1018,8 +1032,8 @@ golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u0 golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM= golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU= golang.org/x/exp v0.0.0-20220827204233-334a2380cb91/go.mod h1:cyybsKvd6eL0RnXn6p/Grxp8F5bW7iYuBgsNCOHpMYE= -golang.org/x/exp v0.0.0-20231214170342-aacd6d4b4611 h1:qCEDpW1G+vcj3Y7Fy52pEM1AWm3abj8WimGYejI3SC4= -golang.org/x/exp v0.0.0-20231214170342-aacd6d4b4611/go.mod h1:iRJReGqOEeBhDZGkGbynYwcHlctCvnjTYIamk7uXpHI= +golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 h1:2dVuKD2vS7b0QIHQbpyTISPd0LeHDbnYEryqj5Q1ug8= +golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56/go.mod h1:M4RDyNAINzryxdtnbRXRL/OHtkFuWGRjvuhBJpk2IlY= golang.org/x/image v0.0.0-20180708004352-c73c2afc3b81/go.mod h1:ux5Hcp/YLpHSI86hEcLt0YII63i6oz57MZXIpbrjZUs= golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js= golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0= @@ -1573,12 +1587,12 @@ google.golang.org/genproto v0.0.0-20230525234025-438c736192d0/go.mod h1:9ExIQyXL google.golang.org/genproto v0.0.0-20230526161137-0005af68ea54/go.mod h1:zqTuNwFlFRsw5zIts5VnzLQxSRqh+CGOTVMlYbY0Eyk= google.golang.org/genproto/googleapis/api v0.0.0-20230525234020-1aefcd67740a/go.mod h1:ts19tUU+Z0ZShN1y3aPyq2+O3d5FUNNgT6FtOzmrNn8= google.golang.org/genproto/googleapis/api v0.0.0-20230525234035-dd9d682886f9/go.mod h1:vHYtlOoi6TsQ3Uk2yxR7NI5z8uoV+3pZtR4jmHIkRig= -google.golang.org/genproto/googleapis/api v0.0.0-20240528184218-531527333157 h1:7whR9kGa5LUwFtpLm2ArCEejtnxlGeLbAyjFY8sGNFw= -google.golang.org/genproto/googleapis/api v0.0.0-20240528184218-531527333157/go.mod h1:99sLkeliLXfdj2J75X3Ho+rrVCaJze0uwN7zDDkjPVU= +google.golang.org/genproto/googleapis/api v0.0.0-20250303144028-a0af3efb3deb h1:p31xT4yrYrSM/G4Sn2+TNUkVhFCbG9y8itM2S6Th950= +google.golang.org/genproto/googleapis/api v0.0.0-20250303144028-a0af3efb3deb/go.mod h1:jbe3Bkdp+Dh2IrslsFCklNhweNTBgSYanP1UXhJDhKg= google.golang.org/genproto/googleapis/rpc v0.0.0-20230525234015-3fc162c6f38a/go.mod h1:xURIpW9ES5+/GZhnV6beoEtxQrnkRGIfP5VQG2tCBLc= google.golang.org/genproto/googleapis/rpc v0.0.0-20230525234030-28d5490b6b19/go.mod h1:66JfowdXAEgad5O9NnYcsNPLCPZJD++2L9X0PCMODrA= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240701130421-f6361c86f094 
h1:BwIjyKYGsK9dMCBOorzRri8MQwmi7mT9rGHsCEinZkA= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240701130421-f6361c86f094/go.mod h1:Ue6ibwXGpU+dqIcODieyLOcgj7z8+IcskoNIgZxtrFY= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250303144028-a0af3efb3deb h1:TLPQVbx1GJ8VKZxz52VAxl1EBgKXXbTiU9Fc5fZeLn4= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250303144028-a0af3efb3deb/go.mod h1:LuRYeWDFV6WOn90g357N17oMCaxpgCnbi/44qJvDn2I= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM= @@ -1620,8 +1634,8 @@ google.golang.org/grpc v1.52.3/go.mod h1:pu6fVzoFb+NBYNAvQL08ic+lvB2IojljRYuun5v google.golang.org/grpc v1.53.0/go.mod h1:OnIrk0ipVdj4N5d9IUoFUx72/VlD7+jUsHwZgwSMQpw= google.golang.org/grpc v1.54.0/go.mod h1:PUSEXI6iWghWaB6lXM4knEgpJNu2qUcKfDtNci3EC2g= google.golang.org/grpc v1.57.0/go.mod h1:Sd+9RMTACXwmub0zcNY2c4arhtrbBYD1AUHI/dt16Mo= -google.golang.org/grpc v1.65.0 h1:bs/cUb4lp1G5iImFFd3u5ixQzweKizoZJAwBNLR42lc= -google.golang.org/grpc v1.65.0/go.mod h1:WgYC2ypjlB0EiQi6wdKixMqukr6lBc0Vo+oOgjrM5ZQ= +google.golang.org/grpc v1.72.1 h1:HR03wO6eyZ7lknl75XlxABNVLLFc2PAb6mHlYh756mA= +google.golang.org/grpc v1.72.1/go.mod h1:wH5Aktxcg25y1I3w7H69nHfXdOG3UiadoBtjh3izSDM= google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.1.0/go.mod h1:6Kw0yEErY5E/yWrBtf03jp27GLLJujG4z/JK95pnjjw= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= @@ -1653,8 +1667,6 @@ gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc= gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= -gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= -gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= @@ -1666,24 +1678,26 @@ honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k= honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k= honnef.co/go/tools v0.1.3/go.mod h1:NgwopIslSNH47DimFoV78dnkksY2EFtX0ajyb3K/las= -k8s.io/api v0.31.2 h1:3wLBbL5Uom/8Zy98GRPXpJ254nEFpl+hwndmk9RwmL0= -k8s.io/api v0.31.2/go.mod h1:bWmGvrGPssSK1ljmLzd3pwCQ9MgoTsRCuK35u6SygUk= +k8s.io/api v0.33.1 h1:tA6Cf3bHnLIrUK4IqEgb2v++/GYUtqiu9sRVk3iBXyw= +k8s.io/api v0.33.1/go.mod h1:87esjTn9DRSRTD4fWMXamiXxJhpOIREjWOSjsW1kEHw= k8s.io/apimachinery v0.33.1 h1:mzqXWV8tW9Rw4VeW9rEkqvnxj59k1ezDUl20tFK/oM4= k8s.io/apimachinery v0.33.1/go.mod h1:BHW0YOu7n22fFv/JkYOEfkUYNRN0fj0BlvMFWA7b+SM= -k8s.io/client-go v0.31.2 h1:Y2F4dxU5d3AQj+ybwSMqQnpZH9F30//1ObxOKlTI9yc= -k8s.io/client-go v0.31.2/go.mod h1:NPa74jSVR/+eez2dFsEIHNa+3o09vtNaWwWwb1qSxSs= +k8s.io/client-go v0.33.1 h1:ZZV/Ks2g92cyxWkRRnfUDsnhNn28eFpt26aGc8KbXF4= +k8s.io/client-go v0.33.1/go.mod 
h1:JAsUrl1ArO7uRVFWfcj6kOomSlCv+JpvIsp6usAGefA= k8s.io/code-generator v0.33.1 h1:ZLzIRdMsh3Myfnx9BaooX6iQry29UJjVfVG+BuS+UMw= k8s.io/code-generator v0.33.1/go.mod h1:HUKT7Ubp6bOgIbbaPIs9lpd2Q02uqkMCMx9/GjDrWpY= +k8s.io/dynamic-resource-allocation v0.26.0-beta.0.0.20250531103609-72b57f383cd6 h1:zWaMknWqO5wnZlXmTVoXiiDh5ea2WigyedfXLT0tLNA= +k8s.io/dynamic-resource-allocation v0.26.0-beta.0.0.20250531103609-72b57f383cd6/go.mod h1:MeeByhu5WaKLJnbU0nCGFHOCstWG+mUFKwP5IME+6lQ= k8s.io/gengo/v2 v2.0.0-20250207200755-1244d31929d7 h1:2OX19X59HxDprNCVrWi6jb7LW1PoqTlYqEq5H2oetog= k8s.io/gengo/v2 v2.0.0-20250207200755-1244d31929d7/go.mod h1:EJykeLsmFC60UQbYJezXkEsG2FLrt0GPNkU5iK5GWxU= k8s.io/klog/v2 v2.130.1 h1:n9Xl7H1Xvksem4KFG4PYbdQCQxqc/tTUyrgXaOhHSzk= k8s.io/klog/v2 v2.130.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE= k8s.io/kube-openapi v0.0.0-20250318190949-c8a335a9a2ff h1:/usPimJzUKKu+m+TE36gUyGcf03XZEP0ZIKgKj35LS4= k8s.io/kube-openapi v0.0.0-20250318190949-c8a335a9a2ff/go.mod h1:5jIi+8yX4RIb8wk3XwBo5Pq2ccx4FP10ohkbSKCZoK8= -k8s.io/kubelet v0.31.2 h1:6Hytyw4LqWqhgzoi7sPfpDGClu2UfxmPmaiXPC4FRgI= -k8s.io/kubelet v0.31.2/go.mod h1:0E4++3cMWi2cJxOwuaQP3eMBa7PSOvAFgkTPlVc/2FA= -k8s.io/utils v0.0.0-20241104100929-3ea5e8cea738 h1:M3sRQVHv7vB20Xc2ybTt7ODCeFj6JSWYFzOFnYeS6Ro= -k8s.io/utils v0.0.0-20241104100929-3ea5e8cea738/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= +k8s.io/kubelet v0.33.1 h1:x4LCw1/iZVWOKA4RoITnuB8gMHnw31HPB3S0EF0EexE= +k8s.io/kubelet v0.33.1/go.mod h1:8WpdC9M95VmsqIdGSQrajXooTfT5otEj8pGWOm+KKfQ= +k8s.io/utils v0.0.0-20250502105355-0f33e8f1c979 h1:jgJW5IePPXLGB8e/1wvd0Ich9QE97RvvF3a8J3fP/Lg= +k8s.io/utils v0.0.0-20250502105355-0f33e8f1c979/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= lukechampine.com/uint128 v1.1.1/go.mod h1:c4eWIwlEGaxC/+H1VguhU4PHXNWDCDMUlWdIWl2j1gk= lukechampine.com/uint128 v1.2.0/go.mod h1:c4eWIwlEGaxC/+H1VguhU4PHXNWDCDMUlWdIWl2j1gk= modernc.org/cc/v3 v3.36.0/go.mod h1:NFUHyPn4ekoC/JHeZFfZurN6ixxawE1BnVonP/oahEI= @@ -1729,7 +1743,7 @@ sigs.k8s.io/json v0.0.0-20241010143419-9aa6b5e7a4b3/go.mod h1:18nIHnGi6636UCz6m8 sigs.k8s.io/randfill v0.0.0-20250304075658-069ef1bbf016/go.mod h1:XeLlZ/jmk4i1HRopwe7/aU3H5n1zNUcX6TM94b3QxOY= sigs.k8s.io/randfill v1.0.0 h1:JfjMILfT8A6RbawdsK2JXGBR5AQVfd+9TbzrlneTyrU= sigs.k8s.io/randfill v1.0.0/go.mod h1:XeLlZ/jmk4i1HRopwe7/aU3H5n1zNUcX6TM94b3QxOY= -sigs.k8s.io/structured-merge-diff/v4 v4.6.0 h1:IUA9nvMmnKWcj5jl84xn+T5MnlZKThmUW1TdblaLVAc= -sigs.k8s.io/structured-merge-diff/v4 v4.6.0/go.mod h1:dDy58f92j70zLsuZVuUX5Wp9vtxXpaZnkPGWeqDfCps= +sigs.k8s.io/structured-merge-diff/v4 v4.7.0 h1:qPeWmscJcXP0snki5IYF79Z8xrl8ETFxgMd7wez1XkI= +sigs.k8s.io/structured-merge-diff/v4 v4.7.0/go.mod h1:dDy58f92j70zLsuZVuUX5Wp9vtxXpaZnkPGWeqDfCps= sigs.k8s.io/yaml v1.4.0 h1:Mk1wCc2gy/F0THH0TAp1QYyJNzRm2KCLy3o5ASXVI5E= sigs.k8s.io/yaml v1.4.0/go.mod h1:Ejl7/uTz7PSA4eKMyQCUTnhZYNmLIl+5c2lQPGR2BPY= diff --git a/pkg/agent/agent.go b/pkg/agent/agent.go index 9f63cbae9..f97766f94 100644 --- a/pkg/agent/agent.go +++ b/pkg/agent/agent.go @@ -22,6 +22,7 @@ import ( "os" "sync" + "github.com/containers/nri-plugins/pkg/kubernetes/client" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -33,7 +34,6 @@ import ( "github.com/containers/nri-plugins/pkg/agent/podresapi" "github.com/containers/nri-plugins/pkg/agent/watch" cfgapi "github.com/containers/nri-plugins/pkg/apis/config/v1alpha1" - k8sclient "k8s.io/client-go/kubernetes" logger "github.com/containers/nri-plugins/pkg/log" 
) @@ -127,12 +127,11 @@ type Agent struct { kubeConfig string // kubeconfig path configFile string // configuration file to use instead of custom resource - cfgIf ConfigInterface // custom resource access interface - httpCli *http.Client // shared HTTP client - k8sCli *k8sclient.Clientset // kubernetes client - nrtCli *nrtapi.Client // NRT custom resources client - nrtLock sync.Mutex // serialize NRT custom resource updates - podResCli *podresapi.Client // pod resources API client + cfgIf ConfigInterface // custom resource access interface + k8sCli *client.Client // kubernetes client + nrtCli *nrtapi.Client // NRT custom resources client + nrtLock sync.Mutex // serialize NRT custom resource updates + podResCli *podresapi.Client // pod resources API client notifyFn NotifyFn // config resource change notification callback nodeWatch watch.Interface // kubernetes node watch @@ -264,6 +263,18 @@ func (a *Agent) Stop() { } } +func (a *Agent) NodeName() string { + return a.nodeName +} + +func (a *Agent) KubeClient() *client.Client { + return a.k8sCli +} + +func (a *Agent) KubeConfig() string { + return a.kubeConfig +} + var ( defaultConfig = &cfgapi.AgentConfig{ NodeResourceTopology: true, @@ -295,7 +306,8 @@ func (a *Agent) configure(newConfig metav1.Object) { log.Error("failed to setup NRT client: %w", err) break } - cli, err := nrtapi.NewForConfigAndClient(cfg, a.httpCli) + + cli, err := nrtapi.NewForConfigAndClient(cfg, a.k8sCli.HttpClient()) if err != nil { log.Error("failed to setup NRT client: %w", err) break @@ -331,37 +343,28 @@ func (a *Agent) hasLocalConfig() bool { } func (a *Agent) setupClients() error { + var err error + if a.hasLocalConfig() { log.Warn("running with local configuration, skipping cluster access client setup...") return nil } - // Create HTTP/REST client and K8s client on initial startup. Any failure - // to create these is a failure start up. 
- if a.httpCli == nil { - log.Info("setting up HTTP/REST client...") - restCfg, err := a.getRESTConfig() - if err != nil { - return err - } - - a.httpCli, err = rest.HTTPClientFor(restCfg) - if err != nil { - return fmt.Errorf("failed to setup kubernetes HTTP client: %w", err) - } + a.k8sCli, err = client.New(client.WithKubeOrInClusterConfig(a.kubeConfig)) + if err != nil { + return err + } - log.Info("setting up K8s client...") - a.k8sCli, err = k8sclient.NewForConfigAndClient(restCfg, a.httpCli) - if err != nil { - a.cleanupClients() - return fmt.Errorf("failed to setup kubernetes client: %w", err) - } + a.nrtCli, err = nrtapi.NewForConfigAndClient(a.k8sCli.RestConfig(), a.k8sCli.HttpClient()) + if err != nil { + a.cleanupClients() + return fmt.Errorf("failed to setup NRT client: %w", err) + } - kubeCfg := *restCfg - err = a.cfgIf.SetKubeClient(a.httpCli, &kubeCfg) - if err != nil { - return fmt.Errorf("failed to setup kubernetes config resource client: %w", err) - } + err = a.cfgIf.SetKubeClient(a.k8sCli.HttpClient(), a.k8sCli.RestConfig()) + if err != nil { + a.cleanupClients() + return fmt.Errorf("failed to setup kubernetes config resource client: %w", err) } a.configure(a.currentCfg) @@ -370,10 +373,9 @@ func (a *Agent) setupClients() error { } func (a *Agent) cleanupClients() { - if a.httpCli != nil { - a.httpCli.CloseIdleConnections() + if a.k8sCli != nil { + a.k8sCli.Close() } - a.httpCli = nil a.k8sCli = nil a.nrtCli = nil } diff --git a/pkg/kubernetes/client/client.go b/pkg/kubernetes/client/client.go new file mode 100644 index 000000000..c60243514 --- /dev/null +++ b/pkg/kubernetes/client/client.go @@ -0,0 +1,194 @@ +// Copyright The NRI Plugins Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package client + +import ( + "errors" + "net/http" + "strings" + + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/clientcmd" +) + +// Option is an option that can be applied to a Client. +type Option func(*Client) error + +// Client encapsulates our Kubernetes client. +type Client struct { + cfg *rest.Config + http *http.Client + *kubernetes.Clientset +} + +// GetConfigForFile returns a REST configuration for the given file. +func GetConfigForFile(kubeConfig string) (*rest.Config, error) { + return clientcmd.BuildConfigFromFlags("", kubeConfig) +} + +// InClusterConfig returns the in-cluster REST configuration. +func InClusterConfig() (*rest.Config, error) { + return rest.InClusterConfig() +} + +// WithKubeConfig returns a Client Option for using the given kubeconfig file. +func WithKubeConfig(file string) Option { + return func(c *Client) error { + cfg, err := GetConfigForFile(file) + if err != nil { + return err + } + return WithRestConfig(cfg)(c) + } +} + +// WithInClusterConfig returns a Client Option for using the in-cluster configuration. 
+func WithInClusterConfig() Option { + return func(c *Client) error { + cfg, err := rest.InClusterConfig() + if err != nil { + return err + } + return WithRestConfig(cfg)(c) + } +} + +// WithKubeOrInClusterConfig returns a Client Option for using in-cluster configuration +// if a configuration file is not given. +func WithKubeOrInClusterConfig(file string) Option { + if file == "" { + return WithInClusterConfig() + } + return WithKubeConfig(file) +} + +// WithRestConfig returns a Client Option for using the given REST configuration. +func WithRestConfig(cfg *rest.Config) Option { + return func(c *Client) error { + c.cfg = rest.CopyConfig(cfg) + return nil + } +} + +// WithHttpClient returns a Client Option for using/sharing the given HTTP client. +func WithHttpClient(hc *http.Client) Option { + return func(c *Client) error { + c.http = hc + return nil + } +} + +// WithAcceptContentTypes returns a Client Option for setting the accepted content types. +func WithAcceptContentTypes(contentTypes ...string) Option { + return func(c *Client) error { + if c.cfg == nil { + return errRetryWhenConfigSet + } + c.cfg.AcceptContentTypes = strings.Join(contentTypes, ",") + return nil + } +} + +// WithContentType returns a Client Option for setting the wire format content type. +func WithContentType(contentType string) Option { + return func(c *Client) error { + if c.cfg == nil { + return errRetryWhenConfigSet + } + c.cfg.ContentType = contentType + return nil + } +} + +const ( + ContentTypeJSON = "application/json" + ContentTypeProtobuf = "application/vnd.kubernetes.protobuf" +) + +var ( + // returned by options if applied too early, before a configuration is set + errRetryWhenConfigSet = errors.New("retry when client config is set") +) + +// New creates a new Client with the given options. +func New(options ...Option) (*Client, error) { + c := &Client{} + + var retry []Option + for _, o := range options { + if err := o(c); err != nil { + if err == errRetryWhenConfigSet { + retry = append(retry, o) + } else { + return nil, err + } + } + } + + if c.cfg == nil { + if err := WithInClusterConfig()(c); err != nil { + return nil, err + } + } + + for _, o := range retry { + if err := o(c); err != nil { + return nil, err + } + } + + if c.http == nil { + hc, err := rest.HTTPClientFor(c.cfg) + if err != nil { + return nil, err + } + c.http = hc + } + + client, err := kubernetes.NewForConfigAndClient(c.cfg, c.http) + if err != nil { + return nil, err + } + c.Clientset = client + + return c, nil +} + +// RestConfig returns a shallow copy of the REST configuration of the Client. +func (c *Client) RestConfig() *rest.Config { + cfg := *c.cfg + return &cfg +} + +// HttpClient returns the HTTP client of the Client. +func (c *Client) HttpClient() *http.Client { + return c.http +} + +// K8sClient returns the K8s Clientset of the Client. +func (c *Client) K8sClient() *kubernetes.Clientset { + return c.Clientset +} + +// Close closes the Client. +func (c *Client) Close() { + if c.http != nil { + c.http.CloseIdleConnections() + } + c.cfg = nil + c.http = nil + c.Clientset = nil +} diff --git a/pkg/kubernetes/watch/file.go b/pkg/kubernetes/watch/file.go new file mode 100644 index 000000000..bd28c9689 --- /dev/null +++ b/pkg/kubernetes/watch/file.go @@ -0,0 +1,178 @@ +// Copyright The NRI Plugins Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package watch + +import ( + "errors" + "io/fs" + "os" + "path" + "path/filepath" + "sync" + + "github.com/fsnotify/fsnotify" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + k8swatch "k8s.io/apimachinery/pkg/watch" +) + +// FileClient takes care of unmarshaling file content to a target object. +type FileClient interface { + Unmarshal([]byte, string) (runtime.Object, error) +} + +type fileWatch struct { + dir string + file string + client FileClient + fsw *fsnotify.Watcher + resultC chan Event + stopOnce sync.Once + stopC chan struct{} + doneC chan struct{} +} + +// File creates a k8s-compatible watch for the given file. Contents of the +// file are unmarshalled into the object of choice by the client. The file +// is monitored for changes and corresponding watch events are generated. +func File(client FileClient, file string) (Interface, error) { + absPath, err := filepath.Abs(file) + if err != nil { + return nil, err + } + + fsw, err := fsnotify.NewWatcher() + if err != nil { + return nil, err + } + + if err = fsw.Add(filepath.Dir(absPath)); err != nil { + return nil, err + } + + fw := &fileWatch{ + dir: filepath.Dir(absPath), + file: filepath.Base(absPath), + client: client, + fsw: fsw, + resultC: make(chan Event, k8swatch.DefaultChanSize), + stopC: make(chan struct{}), + doneC: make(chan struct{}), + } + + obj, err := fw.readObject() + if err != nil && !errors.Is(err, fs.ErrNotExist) { + return nil, err + } + + fw.sendEvent(Added, obj) + + go fw.run() + + return fw, nil +} + +// Stop stops the watch. +func (w *fileWatch) Stop() { + w.stopOnce.Do(func() { + close(w.stopC) + <-w.doneC + w.stopC = nil + }) +} + +// ResultChan returns the channel for receiving events from the watch. 
+func (w *fileWatch) ResultChan() <-chan Event { + return w.resultC +} + +func (w *fileWatch) run() { + for { + select { + case <-w.stopC: + w.fsw.Close() + close(w.resultC) + close(w.doneC) + return + + case e, ok := <-w.fsw.Events: + log.Debug("%s got event %+v", w.name(), e) + + if !ok { + w.sendEvent( + Error, + &metav1.Status{ + Status: metav1.StatusFailure, + Message: "failed to receive fsnotify event", + }, + ) + close(w.resultC) + close(w.doneC) + return + } + + if path.Base(e.Name) != w.file { + continue + } + + switch { + case (e.Op & (fsnotify.Create | fsnotify.Write)) != 0: + obj, err := w.readObject() + if err != nil { + log.Debug("%s failed to read/unmarshal: %v", w.name(), err) + continue + } + + if (e.Op & fsnotify.Create) != 0 { + w.sendEvent(Added, obj) + } else { + w.sendEvent(Modified, obj) + } + + case (e.Op & (fsnotify.Remove | fsnotify.Rename)) != 0: + w.sendEvent(Deleted, nil) + } + } + } +} + +func (w *fileWatch) sendEvent(t EventType, obj runtime.Object) { + select { + case w.resultC <- Event{Type: t, Object: obj}: + default: + log.Warn("failed to deliver fileWatch %v event", t) + } +} + +func (w *fileWatch) readObject() (runtime.Object, error) { + file := path.Join(w.dir, w.file) + data, err := os.ReadFile(file) + if err != nil { + return nil, err + } + + obj, err := w.client.Unmarshal(data, file) + if err != nil { + return nil, err + } + + log.Debug("%s read object %+v", w.name(), obj) + + return obj, nil +} + +func (w *fileWatch) name() string { + return path.Join("filewatch/path:", path.Join(w.dir, w.file)) +} diff --git a/pkg/kubernetes/watch/object.go b/pkg/kubernetes/watch/object.go new file mode 100644 index 000000000..eefe3fed4 --- /dev/null +++ b/pkg/kubernetes/watch/object.go @@ -0,0 +1,160 @@ +// Copyright The NRI Plugins Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package watch + +import ( + "path" + "sync" + "time" + + k8swatch "k8s.io/apimachinery/pkg/watch" +) + +const ( + createDelay = 1 * time.Second +) + +type watch struct { + sync.Mutex + subject string + client ObjectClient + w Interface + resultC chan Event + createC <-chan time.Time + + stopOnce sync.Once + stopC chan struct{} + doneC chan struct{} +} + +// ObjectClient takes care of low-level details of creating a wrapped watch. +type ObjectClient interface { + CreateWatch() (Interface, error) +} + +// Object creates a wrapped watch using the given client. The watch +// is transparently recreated upon expiration and any errors. +func Object(client ObjectClient, subject string) (Interface, error) { + w := &watch{ + subject: subject, + client: client, + resultC: make(chan Event, k8swatch.DefaultChanSize), + stopC: make(chan struct{}), + doneC: make(chan struct{}), + } + + if err := w.create(); err != nil { + return nil, err + } + + go w.run() + + return w, nil +} + +// Stop stops the watch. 
+func (w *watch) Stop() { + w.stopOnce.Do(func() { + close(w.stopC) + <-w.doneC + w.stop() + w.stopC = nil + }) +} + +// ResultChan returns the channel for receiving events from the watch. +func (w *watch) ResultChan() <-chan Event { + return w.resultC +} + +func (w *watch) eventChan() <-chan Event { + if w.w == nil { + return nil + } + + return w.w.ResultChan() +} + +func (w *watch) run() { + for { + select { + case <-w.stopC: + close(w.resultC) + close(w.doneC) + return + + case e, ok := <-w.eventChan(): + if !ok { + log.Debug("%s failed...", w.name()) + w.stop() + w.scheduleCreate() + continue + } + + if e.Type == Error { + log.Debug("%s failed with an error...", w.name()) + w.stop() + w.scheduleCreate() + continue + } + + select { + case w.resultC <- e: + default: + log.Debug("%s failed to deliver %v event", w.name(), e.Type) + w.stop() + w.scheduleCreate() + } + + case <-w.createC: + w.createC = nil + log.Debug("reopening %s...", w.name()) + if err := w.create(); err != nil { + w.scheduleCreate() + } + } + } +} + +func (w *watch) create() error { + w.stop() + + wif, err := w.client.CreateWatch() + if err != nil { + return err + } + w.w = wif + + return nil +} + +func (w *watch) scheduleCreate() { + if w.createC == nil { + w.createC = time.After(createDelay) + } +} + +func (w *watch) stop() { + if w.w == nil { + return + } + + w.w.Stop() + w.w = nil +} + +func (w *watch) name() string { + return path.Join("watch", w.subject) +} diff --git a/pkg/kubernetes/watch/watch.go b/pkg/kubernetes/watch/watch.go new file mode 100644 index 000000000..095215211 --- /dev/null +++ b/pkg/kubernetes/watch/watch.go @@ -0,0 +1,38 @@ +// Copyright The NRI Plugins Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package watch + +import ( + logger "github.com/containers/nri-plugins/pkg/log" + k8swatch "k8s.io/apimachinery/pkg/watch" +) + +type ( + Interface = k8swatch.Interface + EventType = k8swatch.EventType + Event = k8swatch.Event +) + +const ( + Added = k8swatch.Added + Modified = k8swatch.Modified + Deleted = k8swatch.Deleted + Bookmark = k8swatch.Bookmark + Error = k8swatch.Error +) + +var ( + log = logger.Get("watch") +) diff --git a/pkg/log/yaml-formatter.go b/pkg/log/yaml-formatter.go new file mode 100644 index 000000000..ca6b581fb --- /dev/null +++ b/pkg/log/yaml-formatter.go @@ -0,0 +1,41 @@ +// Copyright 2019-2020 Intel Corporation. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package log + +import ( + "sigs.k8s.io/yaml" +) + +func AsYaml(o any) *YamlFormatter { + return &YamlFormatter{o: o} +} + +type YamlFormatter struct { + o any +} + +func (f *YamlFormatter) String() string { + if f == nil || f.o == nil { + return "" + } + + // Use a YAML marshaller to format the object. + data, err := yaml.Marshal(f.o) + if err != nil { + return "" + } + + return string(data) +} diff --git a/pkg/resmgr/cache/cache.go b/pkg/resmgr/cache/cache.go index e89b12cd4..d59738417 100644 --- a/pkg/resmgr/cache/cache.go +++ b/pkg/resmgr/cache/cache.go @@ -197,6 +197,8 @@ type Container interface { GetAnnotation(key string, objPtr interface{}) (string, bool) // GetEnv returns the value of a container environment variable. GetEnv(string) (string, bool) + // GetEnvList returns the list of environment variables for the container. + GetEnvList() []string // GetMounts returns all the mounts of the container. GetMounts() []*Mount // GetDevices returns all the linux devices of the container. @@ -420,6 +422,11 @@ type Cache interface { // GetPolicyEntry gets the policy entry for a key. GetPolicyEntry(string, interface{}) bool + // SetEntry sets the entry for a key. + SetEntry(string, interface{}) + // GetEntry gets the entry for a key. + GetEntry(string, interface{}) (interface{}, error) + // Save requests a cache save. Save() error @@ -472,6 +479,8 @@ type cache struct { PolicyName string // name of the active policy policyData map[string]interface{} // opaque policy data PolicyJSON map[string]string // ditto in raw, unmarshaled form + entryData map[string]interface{} // opaque cached data + entryJSON map[string]string // ditto in raw, unmarshaled form pending map[string]struct{} // cache IDs of containers with pending changes @@ -497,6 +506,8 @@ func NewCache(options Options) (Cache, error) { NextID: 1, policyData: make(map[string]interface{}), PolicyJSON: make(map[string]string), + entryData: make(map[string]interface{}), + entryJSON: make(map[string]string), implicit: make(map[string]ImplicitAffinity), } @@ -801,6 +812,30 @@ func (cch *cache) GetContainers() []Container { return containers } +// Set the entry for a key. +func (cch *cache) SetEntry(key string, obj interface{}) { + cch.entryData[key] = obj +} + +var ErrNoEntry = errors.New("no cached entry found") + +// Get the entry for a key. +func (cch *cache) GetEntry(key string, ptr interface{}) (interface{}, error) { + if obj, ok := cch.entryData[key]; ok { + return obj, nil + } + if data, ok := cch.entryJSON[key]; ok { + err := json.Unmarshal([]byte(data), ptr) + if err != nil { + return nil, cacheError("failed to unmarshal entry for key '%s': %w", key, err) + } + cch.entryData[key] = ptr + return ptr, nil + } + + return nil, ErrNoEntry +} + // Set the policy entry for a key. 
func (cch *cache) SetPolicyEntry(key string, obj interface{}) { cch.policyData[key] = obj @@ -1064,6 +1099,7 @@ type snapshot struct { Pods map[string]*pod Containers map[string]*container NextID uint64 + Entries map[string]string PolicyName string PolicyJSON map[string]string } @@ -1077,6 +1113,7 @@ func (cch *cache) Snapshot() ([]byte, error) { NextID: cch.NextID, PolicyName: cch.PolicyName, PolicyJSON: cch.PolicyJSON, + Entries: make(map[string]string), } for id, p := range cch.Pods { @@ -1096,6 +1133,14 @@ func (cch *cache) Snapshot() ([]byte, error) { s.PolicyJSON[key] = string(data) } + for key, obj := range cch.entryData { + data, err := json.Marshal(obj) + if err != nil { + return nil, cacheError("failed to marshal entry '%s': %v", key, err) + } + s.Entries[key] = string(data) + } + data, err := json.Marshal(s) if err != nil { return nil, cacheError("failed to marshal cache: %v", err) @@ -1110,6 +1155,7 @@ func (cch *cache) Restore(data []byte) error { Pods: make(map[string]*pod), Containers: make(map[string]*container), PolicyJSON: make(map[string]string), + Entries: make(map[string]string), } if err := json.Unmarshal(data, &s); err != nil { @@ -1127,6 +1173,8 @@ func (cch *cache) Restore(data []byte) error { cch.PolicyJSON = s.PolicyJSON cch.PolicyName = s.PolicyName cch.policyData = make(map[string]interface{}) + cch.entryData = make(map[string]interface{}) + cch.entryJSON = s.Entries for _, p := range cch.Pods { p.cache = cch diff --git a/pkg/resmgr/cache/container.go b/pkg/resmgr/cache/container.go index b7f87c9b4..0d90112fc 100644 --- a/pkg/resmgr/cache/container.go +++ b/pkg/resmgr/cache/container.go @@ -427,6 +427,10 @@ func (c *container) GetEnv(key string) (string, bool) { return "", false } +func (c *container) GetEnvList() []string { + return slices.Clone(c.Ctr.GetEnv()) +} + func (c *container) GetMounts() []*Mount { var mounts []*Mount diff --git a/pkg/resmgr/dra.go b/pkg/resmgr/dra.go new file mode 100644 index 000000000..cddd552e8 --- /dev/null +++ b/pkg/resmgr/dra.go @@ -0,0 +1,478 @@ +// Copyright The NRI Plugins Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package resmgr + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "os" + "path/filepath" + "strconv" + "strings" + + "github.com/containers/nri-plugins/pkg/kubernetes/client" + logger "github.com/containers/nri-plugins/pkg/log" + "github.com/containers/nri-plugins/pkg/resmgr/cache" + system "github.com/containers/nri-plugins/pkg/sysfs" + "google.golang.org/grpc" + resapi "k8s.io/api/resource/v1beta2" + "k8s.io/apimachinery/pkg/types" + "k8s.io/dynamic-resource-allocation/kubeletplugin" + "k8s.io/dynamic-resource-allocation/resourceslice" +) + +type draPlugin struct { + driverName string + nodeName string + resmgr *resmgr + cancel context.CancelFunc + plugin *kubeletplugin.Helper + publishCh chan<- *resourceslice.DriverResources + claims savedClaims +} + +type UID = types.UID + +var ( + dra = logger.NewLogger("dra-driver") +) + +const ( + driverName = "native.cpu" + driverKind = driverName + "/device" + cdiVersion = "0.7.0" + cdiEnvVar = "DRA_CPU" +) + +func (resmgr *resmgr) publishCPUs(cpuIDs []system.ID) error { + if resmgr.dra == nil { + return fmt.Errorf("can't publish CPUs as DRA devices, no DRA plugin") + } + + if err := resmgr.dra.writeCDISpecFile(opt.HostRoot, cpuIDs); err != nil { + log.Errorf("failed to write CDI Spec file: %v", err) + return err + } + + cpuDevices := resmgr.system.CPUsAsDRADevices(cpuIDs) + if err := resmgr.dra.PublishResources(context.Background(), cpuDevices); err != nil { + log.Errorf("failed to publish DRA resources: %v", err) + return err + } + + return nil +} + +func newDRAPlugin(resmgr *resmgr) (*draPlugin, error) { + driverPath := filepath.Join(kubeletplugin.KubeletPluginsDir, driverName) + if err := os.MkdirAll(driverPath, 0750); err != nil { + return nil, fmt.Errorf("failed to create driver directory %s: %w", driverPath, err) + } + + p := &draPlugin{ + driverName: driverName, + nodeName: resmgr.agent.NodeName(), + resmgr: resmgr, + claims: make(map[UID]*draClaim), + } + + p.restoreClaims() + + return p, nil +} + +func (p *draPlugin) start() error { + p.run() + return nil +} + +func (p *draPlugin) run() { + var ( + ctx, cancel = context.WithCancel(context.Background()) + publishCh = make(chan *resourceslice.DriverResources, 1) + ) + + go func() { + for { + var resources *resourceslice.DriverResources + + select { + case <-ctx.Done(): + return + case r, ok := <-publishCh: + if !ok { + return + } + resources = r + } + + if p.plugin == nil { + if err := p.connect(); err != nil { + log.Errorf("failed to start DRA plugin: %v", err) + continue + } + } + + if p.plugin != nil { + if err := p.plugin.PublishResources(ctx, *resources); err != nil { + log.Errorf("failed to publish DRA resources: %v", err) + } else { + log.Infof("published DRA resources, using %d pool(s)...", len(resources.Pools)) + } + } + + resources = nil + } + }() + + p.cancel = cancel + p.publishCh = publishCh +} + +func (p *draPlugin) connect() error { + kubeClient, err := client.New( + client.WithKubeOrInClusterConfig(p.resmgr.agent.KubeConfig()), + client.WithAcceptContentTypes(client.ContentTypeProtobuf, client.ContentTypeJSON), + client.WithContentType(client.ContentTypeProtobuf), + ) + if err != nil { + return fmt.Errorf("can't create kube client for DRA plugin: %w", err) + } + + options := []kubeletplugin.Option{ + kubeletplugin.DriverName(p.driverName), + kubeletplugin.NodeName(p.nodeName), + kubeletplugin.KubeClient(kubeClient.Clientset), + kubeletplugin.GRPCInterceptor(p.unaryInterceptor), + } + + log.Infof("using DRA driverName=%s nodeName=%s", p.driverName, 
p.nodeName) + + plugin, err := kubeletplugin.Start(context.Background(), p, options...) + if err != nil { + return fmt.Errorf("failed to start DRA plugin: %w", err) + } + + p.plugin = plugin + return nil +} + +func (p *draPlugin) stop() { + if p == nil { + return + } + + if p.plugin != nil { + p.plugin.Stop() + } + if p.cancel != nil { + p.cancel() + } + + p.plugin = nil + p.cancel = nil +} + +func (p *draPlugin) unaryInterceptor(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp any, err error) { + dra.Info("=> gRPC call %v, handler %v\n", *info, handler) + rpl, err := handler(ctx, req) + dra.Info("<= gRPC reply: %+v, %v\n", rpl, err) + return rpl, err +} + +func (p *draPlugin) IsRegistered() (bool, error) { + if p == nil || p.plugin == nil { + return false, errors.New("DRA plugin is not initialized") + } + + status := p.plugin.RegistrationStatus() + if status == nil { + return false, nil + } + + var err error + if status.Error != "" { + err = errors.New(status.Error) + } + + return status.PluginRegistered, err +} + +func (p *draPlugin) PublishResources(ctx context.Context, devices []resapi.Device) error { + resources := resourceslice.DriverResources{ + Pools: make(map[string]resourceslice.Pool), + } + + maxPerPool := resapi.ResourceSliceMaxDevices + for n := len(devices); n > 0; n = len(devices) { + if n > maxPerPool { + n = maxPerPool + } + resources.Pools["pool"+strconv.Itoa(len(resources.Pools))] = resourceslice.Pool{ + Slices: []resourceslice.Slice{ + { + Devices: devices[:n], + }, + }, + } + devices = devices[n:] + } + + log.Infof("publishing DRA resources, using %d pool(s)...", len(resources.Pools)) + + select { + case p.publishCh <- &resources: + return nil + default: + } + + return fmt.Errorf("failed to publish resources, failed to send on channel") +} + +func (p *draPlugin) writeCDISpecFile(hostRoot string, cpuIDs []system.ID) error { + spec := bytes.NewBuffer(nil) + fmt.Fprintf(spec, "cdiVersion: %s\nkind: %s\ndevices:\n", cdiVersion, driverKind) + for _, id := range cpuIDs { + fmt.Fprintf(spec, "  - name: cpu%d\n", id) + fmt.Fprintf(spec, "    containerEdits:\n") + fmt.Fprintf(spec, "      env:\n") + fmt.Fprintf(spec, "        - %s%d=1\n", cdiEnvVar, id) + } + + dir := filepath.Join(hostRoot, "/var/run/cdi") + if err := os.MkdirAll(dir, 0755); err != nil { + return fmt.Errorf("failed to create CDI Spec directory: %w", err) + } + + if err := os.WriteFile(filepath.Join(dir, driverName+".yaml"), spec.Bytes(), 0644); err != nil { + return fmt.Errorf("failed to write CDI Spec file: %w", err) + } + + return nil +} + +func (p *draPlugin) PrepareResourceClaims(ctx context.Context, claims []*resapi.ResourceClaim) (map[UID]kubeletplugin.PrepareResult, error) { + log.Infof("should prepare %d claims:", len(claims)) + + undoAndErrOut := func(err error, release []*resapi.ResourceClaim) error { + for _, c := range release { + if relc := p.claims.del(c.UID); relc != nil { + if undoErr := p.resmgr.policy.ReleaseClaim(relc); undoErr != nil { + log.Error("rollback error, failed to release claim %s: %v", c.UID, undoErr) + } + } + } + return err + } + + defer func() { + if err := p.saveClaims(); err != nil { + log.Error("failed to save claims: %v", err) + } + }() + p.resmgr.Lock() + defer p.resmgr.Unlock() + + result := make(map[UID]kubeletplugin.PrepareResult) + + for i, c := range claims { + if c == nil { + continue + } + + dra.Debug(" - claim #%d:", i) + specHdr := fmt.Sprintf("    <claim #%d spec>", i) + statusHdr := fmt.Sprintf("    <claim #%d status>", i) + dra.DebugBlock(specHdr, "%s", 
logger.AsYaml(c.Spec)) + dra.DebugBlock(statusHdr, "%s", logger.AsYaml(c.Status)) + + if old, ok := p.claims.get(c.UID); ok { + log.Infof("claim %q already prepared, reusing it", c.UID) + result[c.UID] = *(old.GetResult()) + continue + } + + claim := &draClaim{ResourceClaim: c} + + if err := p.resmgr.policy.AllocateClaim(claim); err != nil { + log.Error("failed to prepare claim %q: %v", c.UID, err) + return nil, undoAndErrOut(err, claims[:i]) + } + + result[claim.GetUID()] = *(claim.GetResult()) + p.claims.add(claim) + } + + return result, nil +} + +func (p *draPlugin) UnprepareResourceClaims(ctx context.Context, claims []kubeletplugin.NamespacedObject) (map[UID]error, error) { + + log.Infof("should un-prepare %d claims:", len(claims)) + + defer func() { + if err := p.saveClaims(); err != nil { + log.Error("failed to save claims: %v", err) + } + }() + p.resmgr.Lock() + defer p.resmgr.Unlock() + + result := make(map[UID]error) + + for _, c := range claims { + log.Infof(" - un-claim %+v", c) + + if claim := p.claims.del(c.UID); claim != nil { + if err := p.resmgr.policy.ReleaseClaim(claim); err != nil { + log.Errorf("failed to release claim %s: %v", claim, err) + } + } + + result[c.UID] = nil + } + + return result, nil +} + +func (p *draPlugin) ErrorHandler(ctx context.Context, err error, msg string) { + log.Errorf("resource slice publishing error: %v (%s)", err, msg) +} + +func (p *draPlugin) saveClaims() error { + p.resmgr.cache.SetEntry("claims", p.claims) + return p.resmgr.cache.Save() +} + +func (p *draPlugin) restoreClaims() { + claims := make(savedClaims) + restored, err := p.resmgr.cache.GetEntry("claims", &claims) + + if err != nil { + if err != cache.ErrNoEntry { + log.Error("failed to restore claims: %v", err) + } + p.claims = make(savedClaims) + } else { + if restored == nil { + p.claims = make(savedClaims) + } else { + p.claims = *restored.(*savedClaims) + } + } +} + +type draClaim struct { + *resapi.ResourceClaim + pods []UID + devs []system.ID +} + +func (c *draClaim) GetUID() UID { + if c == nil || c.ResourceClaim == nil { + return "" + } + return c.UID +} + +func (c *draClaim) GetPods() []UID { + if c == nil || c.ResourceClaim == nil { + return nil + } + if c.pods != nil { + return c.pods + } + + var pods []UID + for _, r := range c.Status.ReservedFor { + if r.Resource == "pods" { + pods = append(pods, r.UID) + } + } + c.pods = pods + + return c.pods +} + +func (c *draClaim) GetDevices() []system.ID { + if c == nil || c.ResourceClaim == nil { + return nil + } + + if c.devs != nil { + return c.devs + } + + var ids []system.ID + for _, r := range c.Status.Allocation.Devices.Results { + num := strings.TrimPrefix(r.Device, "cpu") + i, err := strconv.ParseInt(num, 10, 32) + if err != nil { + log.Errorf("failed to parse CPU ID %q: %v", num, err) + continue + } + ids = append(ids, system.ID(i)) + } + c.devs = ids + + return c.devs +} + +func (c *draClaim) GetResult() *kubeletplugin.PrepareResult { + result := &kubeletplugin.PrepareResult{} + for _, alloc := range c.Status.Allocation.Devices.Results { + result.Devices = append(result.Devices, + kubeletplugin.Device{ + Requests: []string{alloc.Request}, + DeviceName: alloc.Device, + CDIDeviceIDs: []string{driverKind + "=" + alloc.Device}, + }) + } + return result +} + +func (c *draClaim) String() string { + return fmt.Sprintf("<claim %s, devices %v, pods %v>", + c.GetUID(), c.GetDevices(), c.GetPods()) +} + +func (c *draClaim) MarshalJSON() ([]byte, error) { + return json.Marshal(c.ResourceClaim) +} + +func (c *draClaim) UnmarshalJSON(b []byte) error { + 
c.ResourceClaim = &resapi.ResourceClaim{} + return json.Unmarshal(b, c.ResourceClaim) +} + +type savedClaims map[UID]*draClaim + +func (s *savedClaims) add(c *draClaim) { + (*s)[c.UID] = c +} + +func (s *savedClaims) del(uid UID) *draClaim { + c := (*s)[uid] + delete(*s, uid) + return c +} + +func (s *savedClaims) get(uid UID) (*draClaim, bool) { + c, ok := (*s)[uid] + return c, ok +} diff --git a/pkg/resmgr/policy/policy.go b/pkg/resmgr/policy/policy.go index 927005e08..b2b9a83f4 100644 --- a/pkg/resmgr/policy/policy.go +++ b/pkg/resmgr/policy/policy.go @@ -20,6 +20,7 @@ import ( "sort" "k8s.io/apimachinery/pkg/api/resource" + apitypes "k8s.io/apimachinery/pkg/types" "github.com/containers/nri-plugins/pkg/resmgr/cache" "github.com/containers/nri-plugins/pkg/resmgr/events" @@ -56,6 +57,8 @@ type ConstraintSet map[Domain]Constraint type Options struct { // SendEvent is the function for delivering events back to the resource manager. SendEvent SendEventFn + // PublishCPUs is the function for publishing CPUs as DRA devices. + PublishCPUs PublishCPUFn } // BackendOptions describes the options for a policy backend instance @@ -66,6 +69,8 @@ type BackendOptions struct { Cache cache.Cache // SendEvent is the function for delivering events up to the resource manager. SendEvent SendEventFn + // PublishCPUs publishes CPUs as DRA devices. + PublishCPUs PublishCPUFn // Config is the policy-specific configuration. Config interface{} } @@ -76,6 +81,22 @@ type CreateFn func(*BackendOptions) Backend // SendEventFn is the type for a function to send events back to the resource manager. type SendEventFn func(interface{}) error +// PublishCPUFn is the type for a function to publish CPUs as DRA devices. +type PublishCPUFn func([]ID) error + +// Claim represents a DRA Resource Claim, for CPUs. +type Claim interface { + GetUID() UID + GetPods() []UID + GetDevices() []ID + String() string +} + +type ( + UID = apitypes.UID + ID = system.ID +) + const ( // ExportedResources is the basename of the file container resources are exported to. ExportedResources = "resources.sh" @@ -85,6 +106,8 @@ const ( ExportIsolatedCPUs = "ISOLATED_CPUS" // ExportExclusiveCPUs is the shell variable used to export exclusive container CPUs. ExportExclusiveCPUs = "EXCLUSIVE_CPUS" + // ExportClaimedCPUs is the shell variable used to export DRA-claimed container CPUs. + ExportClaimedCPUs = "CLAIMED_CPUS" ) // Backend is the policy (decision making logic) interface exposed by implementations. @@ -110,6 +133,10 @@ type Backend interface { ReleaseResources(cache.Container) error // UpdateResources updates resource allocations of a container. UpdateResources(cache.Container) error + // AllocateClaim allocates CPUs for the claim. + AllocateClaim(Claim) error + // ReleaseClaim releases CPUs of the claim. + ReleaseClaim(Claim) error // HandleEvent processes the given event. The returned boolean indicates whether // changes have been made to any of the containers while handling the event. HandleEvent(*events.Policy) (bool, error) @@ -125,6 +152,8 @@ type Backend interface { type Policy interface { // ActivePolicy returns the name of the policy backend in use. ActivePolicy() string + // System returns the sysfs instance used by the policy. + System() system.System // Start starts up policy, prepare for serving resource management requests. Start(interface{}) error // Reconfigure the policy. @@ -137,6 +166,10 @@ type Policy interface { ReleaseResources(cache.Container) error // UpdateResources updates resource allocations of a container. 
UpdateResources(cache.Container) error + // AllocateClaim allocates CPUs for the claim. + AllocateClaim(Claim) error + // ReleaseClaim releases CPUs of the claim. + ReleaseClaim(Claim) error // HandleEvent passes on the given event to the active policy. The returned boolean // indicates whether changes have been made to any of the containers while handling // the event. @@ -259,15 +292,20 @@ func (p *policy) ActivePolicy() string { return "" } +func (p *policy) System() system.System { + return p.system +} + // Start starts up policy, preparing it for serving requests. func (p *policy) Start(cfg interface{}) error { log.Info("activating '%s' policy...", p.active.Name()) if err := p.active.Setup(&BackendOptions{ - Cache: p.cache, - System: p.system, - SendEvent: p.options.SendEvent, - Config: cfg, + Cache: p.cache, + System: p.system, + SendEvent: p.options.SendEvent, + PublishCPUs: p.options.PublishCPUs, + Config: cfg, }); err != nil { return err } @@ -300,6 +338,16 @@ func (p *policy) UpdateResources(c cache.Container) error { return p.active.UpdateResources(c) } +// AllocateClaim allocates CPUs for the claim. +func (p *policy) AllocateClaim(claim Claim) error { + return p.active.AllocateClaim(claim) +} + +// ReleaseClaim releases CPUs of the claim. +func (p *policy) ReleaseClaim(claim Claim) error { + return p.active.ReleaseClaim(claim) +} + // HandleEvent passes on the given event to the active policy. func (p *policy) HandleEvent(e *events.Policy) (bool, error) { return p.active.HandleEvent(e) diff --git a/pkg/resmgr/resource-manager.go b/pkg/resmgr/resource-manager.go index 41d5c7e5e..844c30f75 100644 --- a/pkg/resmgr/resource-manager.go +++ b/pkg/resmgr/resource-manager.go @@ -53,6 +53,7 @@ type resmgr struct { agent *agent.Agent cfg cfgapi.ResmgrConfig cache cache.Cache // cached state + system sysfs.System // sysfs in use policy policy.Policy // resource manager policy control control.Control // policy controllers/enforcement events chan interface{} // channel for delivering events @@ -60,6 +61,7 @@ type resmgr struct { nri *nriPlugin // NRI plugins, if we're running as such rdt *rdtControl // control for RDT allocation and monitoring blkio *blkioControl // control for block I/O prioritization and throttling + dra *draPlugin // DRA plugin, if we have one running bool } @@ -94,7 +96,7 @@ func NewResourceManager(backend policy.Backend, agt *agent.Agent) (ResourceManag return nil, err } - log.Info("running as an NRI plugin...") + log.Info("creating NRI plugin...") nrip, err := newNRIPlugin(m) if err != nil { return nil, err @@ -105,6 +107,13 @@ func NewResourceManager(backend policy.Backend, agt *agent.Agent) (ResourceManag return nil, err } + log.Info("creating DRA plugin...") + drap, err := newDRAPlugin(m) + if err != nil { + return nil, err + } + m.dra = drap + if err := m.setupEventProcessing(); err != nil { return nil, err } @@ -188,6 +197,10 @@ func (m *resmgr) start(cfg cfgapi.ResmgrConfig) error { return err } + if err := m.dra.start(); err != nil { + return err + } + if err := m.policy.Start(m.cfg.PolicyConfig()); err != nil { return err } @@ -228,6 +241,7 @@ func (m *resmgr) Stop() { defer m.Unlock() m.nri.stop() + m.dra.stop() } // setupCache creates a cache and reloads its last saved state if found. 
@@ -255,10 +269,17 @@ func (m *resmgr) setupPolicy(backend policy.Backend) error { log.Warnf("failed to set active policy: %v", err) } - p, err := policy.NewPolicy(backend, m.cache, &policy.Options{SendEvent: m.SendEvent}) + p, err := policy.NewPolicy(backend, m.cache, + &policy.Options{ + SendEvent: m.SendEvent, + PublishCPUs: m.publishCPUs, + }, + ) if err != nil { return resmgrError("failed to create policy %s: %v", backend.Name(), err) } + + m.system = p.System() m.policy = p return nil diff --git a/pkg/sysfs/dra.go b/pkg/sysfs/dra.go new file mode 100644 index 000000000..4c97bc333 --- /dev/null +++ b/pkg/sysfs/dra.go @@ -0,0 +1,119 @@ +// Copyright The NRI Plugins Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sysfs + +import ( + "fmt" + "strconv" + + idset "github.com/intel/goresctrl/pkg/utils" + resapi "k8s.io/api/resource/v1beta2" +) + +type ( + ID = idset.ID + QualifiedName = resapi.QualifiedName + Attribute = resapi.DeviceAttribute +) + +const ( + // Names for standard CPU device attributes. + AttrPackage = QualifiedName("package") + AttrDie = QualifiedName("die") + AttrCluster = QualifiedName("cluster") + AttrCore = QualifiedName("core") + AttrCoreType = QualifiedName("coreType") + AttrLocalMemory = QualifiedName("localMemory") + AttrIsolated = QualifiedName("isolated") + AttrMinFreq = QualifiedName("minFreq") + AttrMaxFreq = QualifiedName("maxFreq") + AttrBaseFreq = QualifiedName("baseFreq") +) + +// CPUsAsDRADevices returns the given CPUs as DRA devices. +func (sys *system) CPUsAsDRADevices(ids []ID) []resapi.Device { + devices := make([]resapi.Device, 0, len(ids)) + for _, id := range ids { + devices = append(devices, *(sys.CPU(id).DRA())) + } + return devices +} + +// DRA returns the CPU represented as a DRA device. +func (c *cpu) DRA(extras ...map[QualifiedName]Attribute) *resapi.Device { + dra := &resapi.Device{ + Name: "cpu" + strconv.Itoa(c.ID()), + Attributes: map[QualifiedName]Attribute{ + AttrPackage: Attr(c.PackageID()), + AttrDie: Attr(c.DieID()), + AttrCluster: Attr(c.ClusterID()), + AttrCore: Attr(c.CoreID()), + AttrCoreType: Attr(c.CoreKind().String()), + AttrLocalMemory: Attr(c.NodeID()), + AttrIsolated: Attr(c.Isolated()), + }, + } + + if base := c.FrequencyRange().Base; base > 0 { + dra.Attributes[AttrBaseFreq] = Attr(base) + } + if min := c.FrequencyRange().Min; min > 0 { + dra.Attributes[AttrMinFreq] = Attr(min) + } + if max := c.FrequencyRange().Max; max > 0 { + dra.Attributes[AttrMaxFreq] = Attr(max) + } + + for idx, cache := range c.GetCaches() { + dra.Attributes[QualifiedName(fmt.Sprintf("cache%dID", idx))] = Attr(cache.EnumID()) + } + + for _, m := range extras { + for name, value := range m { + if _, ok := dra.Attributes[name]; !ok { + dra.Attributes[name] = value + } + } + } + + return dra +} + +// Attr returns an attribute for the given value. 
+func Attr(value any) Attribute { + switch v := any(value).(type) { + case int64: + return Attribute{IntValue: &v} + case int: + val := int64(v) + return Attribute{IntValue: &val} + case uint64: + val := int64(v) + return Attribute{IntValue: &val} + case int32: + val := int64(v) + return Attribute{IntValue: &val} + case uint32: + val := int64(v) + return Attribute{IntValue: &val} + case string: + return Attribute{StringValue: &v} + case bool: + return Attribute{BoolValue: &v} + default: + val := fmt.Sprintf("<%v>", value) + return Attribute{StringValue: &val} + } +} diff --git a/pkg/sysfs/system.go b/pkg/sysfs/system.go index bb4ff0f4c..02e067bbf 100644 --- a/pkg/sysfs/system.go +++ b/pkg/sysfs/system.go @@ -27,6 +27,7 @@ import ( "strings" "github.com/containers/nri-plugins/pkg/utils/cpuset" + resapi "k8s.io/api/resource/v1beta2" logger "github.com/containers/nri-plugins/pkg/log" "github.com/containers/nri-plugins/pkg/utils" @@ -134,6 +135,8 @@ type System interface { Isolated() cpuset.CPUSet NodeHintToCPUs(string) string + + CPUsAsDRADevices(ids []idset.ID) []resapi.Device } // System devices @@ -230,6 +233,7 @@ type CPU interface { GetLastLevelCaches() []*Cache GetLastLevelCacheCPUSet() cpuset.CPUSet CoreKind() CoreKind + DRA(extras ...map[QualifiedName]Attribute) *resapi.Device } type cpu struct { @@ -313,11 +317,12 @@ const ( // Cache has details about a CPU cache. type Cache struct { - id idset.ID // cache id - level int // cache type - kind CacheType // cache type - size uint64 // cache size - cpus idset.IDSet // CPUs sharing this cache + enumID int // enumerated cache id + id idset.ID // cache id + level int // cache type + kind CacheType // cache type + size uint64 // cache size + cpus idset.IDSet // CPUs sharing this cache } // cacheOverrides is a list of cache overrides for specific CPU sets. 
@@ -392,6 +397,11 @@ func (sys *system) Discover(flags DiscoveryFlag) error { if err := sys.discoverCPUs(); err != nil { return err } + if (flags & DiscoverCache) != 0 { + if err := sys.enumerateCaches(); err != nil { + return fmt.Errorf("failed to enumerate caches: %v", err) + } + } if err := sys.discoverNodes(); err != nil { return err } @@ -489,9 +499,10 @@ func (sys *system) Discover(flags DiscoveryFlag) error { sys.Debug(" base freq: %d", cpu.freq.Base) sys.Debug(" freq: %d - %d", cpu.freq.Min, cpu.freq.Max) sys.Debug(" epp: %d", cpu.epp) + sys.DebugBlock(" ", "%s", logger.AsYaml(cpu.DRA())) for idx, c := range cpu.caches { - sys.Debug(" cache #%d:", idx) + sys.Debug(" cache #%d (enumerated as #%d):", idx, c.enumID) sys.Debug(" id: %d", c.id) sys.Debug(" level: %d", c.level) sys.Debug(" kind: %s", c.kind) @@ -499,6 +510,9 @@ func (sys *system) Discover(flags DiscoveryFlag) error { sys.Debug(" cpus: %s", c.SharedCPUSet().String()) } } + + sys.DebugBlock(" ", "%s", + logger.AsYaml(sys.CPUsAsDRADevices(sys.CPUIDs()))) } return nil @@ -1308,6 +1322,13 @@ func (c *cpu) CoreKind() CoreKind { return c.coreKind } +func (c *Cache) EnumID() int { + if c == nil { + return 0 + } + return c.enumID +} + func (c *Cache) ID() int { if c == nil { return 0 @@ -2064,6 +2085,55 @@ func (sys *system) saveCache(c *Cache) *Cache { return c } +func (sys *system) enumerateCaches() error { + log.Debug("enumerating CPU caches...") + cMap := map[int]map[CacheType]map[idset.ID]*Cache{} + enumerated := []*Cache{} + for _, caches := range sys.caches { + for _, cacheMap := range caches { + for _, c := range cacheMap { + l, ok := cMap[c.level] + if !ok { + l = map[CacheType]map[idset.ID]*Cache{} + cMap[c.level] = l + } + k, ok := l[c.kind] + if !ok { + k = map[idset.ID]*Cache{} + l[c.kind] = k + } + _, ok = k[c.id] + if !ok { + k[c.id] = c + enumerated = append(enumerated, c) + } + } + } + } + + slices.SortFunc(enumerated, func(ci, cj *Cache) int { + if ci.level != cj.level { + return ci.level - cj.level + } + if ci.kind != cj.kind { + return int(ci.kind) - int(cj.kind) + } + return ci.id - cj.id + }) + + for id, c := range enumerated { + c.enumID = id + sys.Debug(" enumerated cache #%d:", c.enumID) + sys.Debug(" id: %d", c.id) + sys.Debug(" level: %d", c.level) + sys.Debug(" kind: %s", c.kind) + sys.Debug(" size: %dK", c.size/1024) + sys.Debug(" cpus: %s", c.SharedCPUSet().String()) + } + + return nil +} + // eppStrings initialized this way to better catch changes in the enum var eppStrings = func() [EPPUnknown]string { var e [EPPUnknown]string