diff --git a/pkg/webhooks/pod_webhook.go b/pkg/webhooks/pod_webhook.go index 860ca83f2..2fd26336f 100644 --- a/pkg/webhooks/pod_webhook.go +++ b/pkg/webhooks/pod_webhook.go @@ -123,7 +123,7 @@ func (p *PodWebhook) Default(ctx context.Context, obj runtime.Object) error { groupUniqueKey = pod.Labels[leaderworkerset.GroupUniqueHashLabelKey] } if epKey, foundEpKey := pod.Annotations[leaderworkerset.ExclusiveKeyAnnotationKey]; foundEpKey { - SetExclusiveAffinities(pod, groupUniqueKey, epKey, leaderworkerset.GroupUniqueHashLabelKey) + SetExclusiveAffinities(pod, groupUniqueKey, epKey, leaderworkerset.GroupUniqueHashLabelKey, leaderworkerset.GroupIndexLabelKey, pod.Labels[leaderworkerset.GroupIndexLabelKey]) } _, foundSubGroupSize := pod.Annotations[leaderworkerset.SubGroupSizeAnnotationKey] subGroupPolicyType := pod.Annotations[leaderworkerset.SubGroupPolicyTypeAnnotationKey] @@ -133,7 +133,7 @@ func (p *PodWebhook) Default(ctx context.Context, obj runtime.Object) error { subGroupUniqueKey := genGroupUniqueKey(pod.Name, "0") pod.Labels[leaderworkerset.SubGroupUniqueHashLabelKey] = subGroupUniqueKey if subEpKey, foundSubEpKey := pod.Annotations[leaderworkerset.SubGroupExclusiveKeyAnnotationKey]; foundSubEpKey { - SetExclusiveAffinities(pod, subGroupUniqueKey, subEpKey, leaderworkerset.SubGroupUniqueHashLabelKey) + SetExclusiveAffinities(pod, subGroupUniqueKey, subEpKey, leaderworkerset.SubGroupUniqueHashLabelKey, "", "") } } } else { @@ -154,7 +154,7 @@ func (p *PodWebhook) Default(ctx context.Context, obj runtime.Object) error { subGroupUniqueKey := genGroupUniqueKey(leaderName, subGroupIndexKey) pod.Labels[leaderworkerset.SubGroupUniqueHashLabelKey] = subGroupUniqueKey if subEpKey, foundSubEpKey := pod.Annotations[leaderworkerset.SubGroupExclusiveKeyAnnotationKey]; foundSubEpKey { - SetExclusiveAffinities(pod, subGroupUniqueKey, subEpKey, leaderworkerset.SubGroupUniqueHashLabelKey) + SetExclusiveAffinities(pod, subGroupUniqueKey, subEpKey, leaderworkerset.SubGroupUniqueHashLabelKey, "", "") } } } @@ -178,7 +178,7 @@ func genGroupUniqueKey(ns string, podName string) string { } // SetExclusiveAffinities set the pod affinity/anti-affinity -func SetExclusiveAffinities(pod *corev1.Pod, groupUniqueKey string, topologyKey string, podAffinityKey string) { +func SetExclusiveAffinities(pod *corev1.Pod, groupUniqueKey string, topologyKey string, podAffinityKey string, groupIndexKey string, groupIndex string) { if exclusiveAffinityApplied(*pod, topologyKey) { return } @@ -192,6 +192,9 @@ func SetExclusiveAffinities(pod *corev1.Pod, groupUniqueKey string, topologyKey pod.Spec.Affinity.PodAntiAffinity = &corev1.PodAntiAffinity{} } + // Get the LWS name from pod labels + lwsName := pod.Labels[leaderworkerset.SetNameLabelKey] + // Pod affinity ensures the pods of this set land on the same topology domain. pod.Spec.Affinity.PodAffinity.RequiredDuringSchedulingIgnoredDuringExecution = append(pod.Spec.Affinity.PodAffinity.RequiredDuringSchedulingIgnoredDuringExecution, corev1.PodAffinityTerm{ @@ -204,20 +207,34 @@ func SetExclusiveAffinities(pod *corev1.Pod, groupUniqueKey string, topologyKey }}, TopologyKey: topologyKey, }) - // Pod anti-affinity ensures exclusively this set lands on the topology, preventing multiple sets per topology domain. + + // Pod anti-affinity ensures exclusively one replica/group of this set lands on the topology, preventing multiple replicas per topology domain. + // and only apply mutual exclusion within the same LWS instance + antiAffinityNotInKey := groupIndexKey + antiAffinityNotInValue := groupIndex + if groupIndexKey == "" || groupIndex == "" { + antiAffinityNotInKey = podAffinityKey + antiAffinityNotInValue = groupUniqueKey + } pod.Spec.Affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution = append(pod.Spec.Affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution, corev1.PodAffinityTerm{ LabelSelector: &metav1.LabelSelector{MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: leaderworkerset.SetNameLabelKey, + Operator: metav1.LabelSelectorOpIn, + Values: []string{lwsName}, + }, { Key: podAffinityKey, Operator: metav1.LabelSelectorOpExists, }, { - Key: podAffinityKey, + Key: antiAffinityNotInKey, Operator: metav1.LabelSelectorOpNotIn, - Values: []string{groupUniqueKey}, + Values: []string{antiAffinityNotInValue}, }, }}, + Namespaces: []string{pod.Namespace}, TopologyKey: topologyKey, }) } diff --git a/pkg/webhooks/pod_webhook_test.go b/pkg/webhooks/pod_webhook_test.go index af404fa85..95cac9867 100644 --- a/pkg/webhooks/pod_webhook_test.go +++ b/pkg/webhooks/pod_webhook_test.go @@ -70,6 +70,8 @@ func TestSetExclusiveAffinities(t *testing.T) { groupUniqueKey string topologyKey string podAffinityKey string + groupIndexKey string + groupIndex string expectedPod *corev1.Pod }{ { @@ -77,14 +79,20 @@ func TestSetExclusiveAffinities(t *testing.T) { pod: &corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ Annotations: map[string]string{"leaderworkerset.sigs.k8s.io/exclusive-topology": "topologyKey"}, + Labels: map[string]string{"leaderworkerset.sigs.k8s.io/name": "test-lws"}, + Namespace: "default", }, }, groupUniqueKey: "test-key", topologyKey: "topologyKey", podAffinityKey: leaderworkerset.GroupUniqueHashLabelKey, + groupIndexKey: leaderworkerset.GroupIndexLabelKey, + groupIndex: "0", expectedPod: &corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ Annotations: map[string]string{"leaderworkerset.sigs.k8s.io/exclusive-topology": "topologyKey"}, + Labels: map[string]string{"leaderworkerset.sigs.k8s.io/name": "test-lws"}, + Namespace: "default", }, Spec: corev1.PodSpec{ Affinity: &corev1.Affinity{ @@ -102,16 +110,22 @@ func TestSetExclusiveAffinities(t *testing.T) { }, PodAntiAffinity: &corev1.PodAntiAffinity{ RequiredDuringSchedulingIgnoredDuringExecution: []corev1.PodAffinityTerm{{ + Namespaces: []string{"default"}, TopologyKey: "topologyKey", LabelSelector: &metav1.LabelSelector{MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: "leaderworkerset.sigs.k8s.io/name", + Operator: "In", + Values: []string{"test-lws"}, + }, { Key: "leaderworkerset.sigs.k8s.io/group-key", Operator: "Exists", }, { - Key: "leaderworkerset.sigs.k8s.io/group-key", + Key: "leaderworkerset.sigs.k8s.io/group-index", Operator: "NotIn", - Values: []string{"test-key"}, + Values: []string{"0"}, }, }}, }}, @@ -125,6 +139,7 @@ func TestSetExclusiveAffinities(t *testing.T) { pod: &corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ Annotations: map[string]string{"leaderworkerset.sigs.k8s.io/exclusive-topology": "topologyKey"}, + Labels: map[string]string{"leaderworkerset.sigs.k8s.io/name": "test-lws"}, }, Spec: corev1.PodSpec{ Affinity: &corev1.Affinity{ @@ -140,9 +155,12 @@ func TestSetExclusiveAffinities(t *testing.T) { groupUniqueKey: "test-key", topologyKey: "topologyKey", podAffinityKey: leaderworkerset.GroupUniqueHashLabelKey, + groupIndexKey: leaderworkerset.GroupIndexLabelKey, + groupIndex: "", expectedPod: &corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ Annotations: map[string]string{"leaderworkerset.sigs.k8s.io/exclusive-topology": "topologyKey"}, + Labels: map[string]string{"leaderworkerset.sigs.k8s.io/name": "test-lws"}, }, Spec: corev1.PodSpec{ Affinity: &corev1.Affinity{ @@ -160,7 +178,7 @@ func TestSetExclusiveAffinities(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - SetExclusiveAffinities(tc.pod, tc.groupUniqueKey, tc.topologyKey, tc.podAffinityKey) + SetExclusiveAffinities(tc.pod, tc.groupUniqueKey, tc.topologyKey, tc.podAffinityKey, tc.groupIndexKey, tc.groupIndex) if diff := cmp.Diff(tc.pod, tc.expectedPod); diff != "" { t.Errorf("unexpected set exclusive affinities operation: %s", diff) } diff --git a/test/integration/webhooks/pod_test.go b/test/integration/webhooks/pod_test.go index cc45d2fa5..fff8a99f8 100644 --- a/test/integration/webhooks/pod_test.go +++ b/test/integration/webhooks/pod_test.go @@ -705,7 +705,7 @@ var _ = ginkgo.Describe("leaderworkerset pod defaulting, creation and update", f }, Spec: wrappers.MakeLeaderPodSpecWithTPUResource(), } - webhooks.SetExclusiveAffinities(pod, "uniquehash", "topologyKey", leaderworkerset.GroupUniqueHashLabelKey) + webhooks.SetExclusiveAffinities(pod, "uniquehash", "topologyKey", leaderworkerset.GroupUniqueHashLabelKey, leaderworkerset.GroupIndexLabelKey, "4") return *pod }, checkExpectedPod: func(expected corev1.Pod, got corev1.Pod) error {