Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 24 additions & 7 deletions pkg/webhooks/pod_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (p *PodWebhook) Default(ctx context.Context, obj runtime.Object) error {
groupUniqueKey = pod.Labels[leaderworkerset.GroupUniqueHashLabelKey]
}
if epKey, foundEpKey := pod.Annotations[leaderworkerset.ExclusiveKeyAnnotationKey]; foundEpKey {
SetExclusiveAffinities(pod, groupUniqueKey, epKey, leaderworkerset.GroupUniqueHashLabelKey)
SetExclusiveAffinities(pod, groupUniqueKey, epKey, leaderworkerset.GroupUniqueHashLabelKey, leaderworkerset.GroupIndexLabelKey, pod.Labels[leaderworkerset.GroupIndexLabelKey])
}
_, foundSubGroupSize := pod.Annotations[leaderworkerset.SubGroupSizeAnnotationKey]
subGroupPolicyType := pod.Annotations[leaderworkerset.SubGroupPolicyTypeAnnotationKey]
Expand All @@ -133,7 +133,7 @@ func (p *PodWebhook) Default(ctx context.Context, obj runtime.Object) error {
subGroupUniqueKey := genGroupUniqueKey(pod.Name, "0")
pod.Labels[leaderworkerset.SubGroupUniqueHashLabelKey] = subGroupUniqueKey
if subEpKey, foundSubEpKey := pod.Annotations[leaderworkerset.SubGroupExclusiveKeyAnnotationKey]; foundSubEpKey {
SetExclusiveAffinities(pod, subGroupUniqueKey, subEpKey, leaderworkerset.SubGroupUniqueHashLabelKey)
SetExclusiveAffinities(pod, subGroupUniqueKey, subEpKey, leaderworkerset.SubGroupUniqueHashLabelKey, "", "")
}
}
} else {
Expand All @@ -154,7 +154,7 @@ func (p *PodWebhook) Default(ctx context.Context, obj runtime.Object) error {
subGroupUniqueKey := genGroupUniqueKey(leaderName, subGroupIndexKey)
pod.Labels[leaderworkerset.SubGroupUniqueHashLabelKey] = subGroupUniqueKey
if subEpKey, foundSubEpKey := pod.Annotations[leaderworkerset.SubGroupExclusiveKeyAnnotationKey]; foundSubEpKey {
SetExclusiveAffinities(pod, subGroupUniqueKey, subEpKey, leaderworkerset.SubGroupUniqueHashLabelKey)
SetExclusiveAffinities(pod, subGroupUniqueKey, subEpKey, leaderworkerset.SubGroupUniqueHashLabelKey, "", "")
}
}
}
Expand All @@ -178,7 +178,7 @@ func genGroupUniqueKey(ns string, podName string) string {
}

// SetExclusiveAffinities sets the pod affinity/anti-affinity.
func SetExclusiveAffinities(pod *corev1.Pod, groupUniqueKey string, topologyKey string, podAffinityKey string) {
func SetExclusiveAffinities(pod *corev1.Pod, groupUniqueKey string, topologyKey string, podAffinityKey string, groupIndexKey string, groupIndex string) {
if exclusiveAffinityApplied(*pod, topologyKey) {
return
}
Expand All @@ -192,6 +192,9 @@ func SetExclusiveAffinities(pod *corev1.Pod, groupUniqueKey string, topologyKey
pod.Spec.Affinity.PodAntiAffinity = &corev1.PodAntiAffinity{}
}

// Get the LWS name from pod labels
lwsName := pod.Labels[leaderworkerset.SetNameLabelKey]

// Pod affinity ensures the pods of this set land on the same topology domain.
pod.Spec.Affinity.PodAffinity.RequiredDuringSchedulingIgnoredDuringExecution = append(pod.Spec.Affinity.PodAffinity.RequiredDuringSchedulingIgnoredDuringExecution,
corev1.PodAffinityTerm{
Expand All @@ -204,20 +207,34 @@ func SetExclusiveAffinities(pod *corev1.Pod, groupUniqueKey string, topologyKey
}},
TopologyKey: topologyKey,
})
// Pod anti-affinity ensures exclusively this set lands on the topology, preventing multiple sets per topology domain.

// Pod anti-affinity ensures that only one replica/group of this set lands on a topology domain, preventing multiple replicas per topology domain.
// The mutual exclusion applies only among pods of the same LWS instance (matching set name label, in the pod's own namespace).
antiAffinityNotInKey := groupIndexKey
antiAffinityNotInValue := groupIndex
if groupIndexKey == "" || groupIndex == "" {
antiAffinityNotInKey = podAffinityKey
antiAffinityNotInValue = groupUniqueKey
}
pod.Spec.Affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution = append(pod.Spec.Affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution,
corev1.PodAffinityTerm{
LabelSelector: &metav1.LabelSelector{MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: leaderworkerset.SetNameLabelKey,
Operator: metav1.LabelSelectorOpIn,
Values: []string{lwsName},
},
{
Key: podAffinityKey,
Operator: metav1.LabelSelectorOpExists,
},
{
Key: podAffinityKey,
Key: antiAffinityNotInKey,
Operator: metav1.LabelSelectorOpNotIn,
Values: []string{groupUniqueKey},
Values: []string{antiAffinityNotInValue},
},
}},
Namespaces: []string{pod.Namespace},
TopologyKey: topologyKey,
})
}
Expand Down
24 changes: 21 additions & 3 deletions pkg/webhooks/pod_webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,21 +70,29 @@ func TestSetExclusiveAffinities(t *testing.T) {
groupUniqueKey string
topologyKey string
podAffinityKey string
groupIndexKey string
groupIndex string
expectedPod *corev1.Pod
}{
{
name: "Pod with only Exclusive Topology Annotation",
pod: &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{"leaderworkerset.sigs.k8s.io/exclusive-topology": "topologyKey"},
Labels: map[string]string{"leaderworkerset.sigs.k8s.io/name": "test-lws"},
Namespace: "default",
},
},
groupUniqueKey: "test-key",
topologyKey: "topologyKey",
podAffinityKey: leaderworkerset.GroupUniqueHashLabelKey,
groupIndexKey: leaderworkerset.GroupIndexLabelKey,
groupIndex: "0",
expectedPod: &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{"leaderworkerset.sigs.k8s.io/exclusive-topology": "topologyKey"},
Labels: map[string]string{"leaderworkerset.sigs.k8s.io/name": "test-lws"},
Namespace: "default",
},
Spec: corev1.PodSpec{
Affinity: &corev1.Affinity{
Expand All @@ -102,16 +110,22 @@ func TestSetExclusiveAffinities(t *testing.T) {
},
PodAntiAffinity: &corev1.PodAntiAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: []corev1.PodAffinityTerm{{
Namespaces: []string{"default"},
TopologyKey: "topologyKey",
LabelSelector: &metav1.LabelSelector{MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: "leaderworkerset.sigs.k8s.io/name",
Operator: "In",
Values: []string{"test-lws"},
},
{
Key: "leaderworkerset.sigs.k8s.io/group-key",
Operator: "Exists",
},
{
Key: "leaderworkerset.sigs.k8s.io/group-key",
Key: "leaderworkerset.sigs.k8s.io/group-index",
Operator: "NotIn",
Values: []string{"test-key"},
Values: []string{"0"},
},
}},
}},
Expand All @@ -125,6 +139,7 @@ func TestSetExclusiveAffinities(t *testing.T) {
pod: &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{"leaderworkerset.sigs.k8s.io/exclusive-topology": "topologyKey"},
Labels: map[string]string{"leaderworkerset.sigs.k8s.io/name": "test-lws"},
},
Spec: corev1.PodSpec{
Affinity: &corev1.Affinity{
Expand All @@ -140,9 +155,12 @@ func TestSetExclusiveAffinities(t *testing.T) {
groupUniqueKey: "test-key",
topologyKey: "topologyKey",
podAffinityKey: leaderworkerset.GroupUniqueHashLabelKey,
groupIndexKey: leaderworkerset.GroupIndexLabelKey,
groupIndex: "",
expectedPod: &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{"leaderworkerset.sigs.k8s.io/exclusive-topology": "topologyKey"},
Labels: map[string]string{"leaderworkerset.sigs.k8s.io/name": "test-lws"},
},
Spec: corev1.PodSpec{
Affinity: &corev1.Affinity{
Expand All @@ -160,7 +178,7 @@ func TestSetExclusiveAffinities(t *testing.T) {

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
SetExclusiveAffinities(tc.pod, tc.groupUniqueKey, tc.topologyKey, tc.podAffinityKey)
SetExclusiveAffinities(tc.pod, tc.groupUniqueKey, tc.topologyKey, tc.podAffinityKey, tc.groupIndexKey, tc.groupIndex)
if diff := cmp.Diff(tc.pod, tc.expectedPod); diff != "" {
t.Errorf("unexpected set exclusive affinities operation: %s", diff)
}
Expand Down
2 changes: 1 addition & 1 deletion test/integration/webhooks/pod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -705,7 +705,7 @@ var _ = ginkgo.Describe("leaderworkerset pod defaulting, creation and update", f
},
Spec: wrappers.MakeLeaderPodSpecWithTPUResource(),
}
webhooks.SetExclusiveAffinities(pod, "uniquehash", "topologyKey", leaderworkerset.GroupUniqueHashLabelKey)
webhooks.SetExclusiveAffinities(pod, "uniquehash", "topologyKey", leaderworkerset.GroupUniqueHashLabelKey, leaderworkerset.GroupIndexLabelKey, "4")
return *pod
},
checkExpectedPod: func(expected corev1.Pod, got corev1.Pod) error {
Expand Down