|
1 | 1 | package pilot
|
2 | 2 |
|
3 | 3 | import (
|
4 |
| - "k8s.io/api/core/v1" |
| 4 | + "fmt" |
| 5 | + |
| 6 | + "k8s.io/api/apps/v1beta1" |
5 | 7 | k8sErrors "k8s.io/apimachinery/pkg/api/errors"
|
6 | 8 | metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
7 |
| - "k8s.io/apimachinery/pkg/labels" |
| 9 | + "k8s.io/apimachinery/pkg/util/sets" |
8 | 10 | appslisters "k8s.io/client-go/listers/apps/v1beta1"
|
9 | 11 | corelisters "k8s.io/client-go/listers/core/v1"
|
10 | 12 | "k8s.io/client-go/tools/record"
|
11 | 13 |
|
| 14 | + "github.com/golang/glog" |
| 15 | + "github.com/pkg/errors" |
| 16 | + |
12 | 17 | "github.com/jetstack/navigator/pkg/apis/navigator/v1alpha1"
|
13 | 18 | navigator "github.com/jetstack/navigator/pkg/client/clientset/versioned"
|
14 | 19 | navlisters "github.com/jetstack/navigator/pkg/client/listers/navigator/v1alpha1"
|
15 |
| - "github.com/jetstack/navigator/pkg/controllers" |
16 | 20 | "github.com/jetstack/navigator/pkg/controllers/cassandra/util"
|
17 | 21 | )
|
18 | 22 |
|
@@ -51,76 +55,143 @@ func NewControl(
|
51 | 55 |
|
52 | 56 | }
|
53 | 57 |
|
54 |
| -func (c *pilotControl) clusterPods(cluster *v1alpha1.CassandraCluster) ([]*v1.Pod, error) { |
55 |
| - var clusterPods []*v1.Pod |
56 |
| - allPods, err := c.pods.Pods(cluster.Namespace).List(labels.Everything()) |
| 58 | +func (c *pilotControl) pilotsForSet( |
| 59 | + cluster *v1alpha1.CassandraCluster, |
| 60 | + ss *v1beta1.StatefulSet, |
| 61 | +) []*v1alpha1.Pilot { |
| 62 | + pilots := make([]*v1alpha1.Pilot, *ss.Spec.Replicas) |
| 63 | + for i := int32(0); i < *ss.Spec.Replicas; i++ { |
| 64 | + pilots[i] = PilotForCluster(cluster, ss, i) |
| 65 | + } |
| 66 | + return pilots |
| 67 | +} |
| 68 | + |
| 69 | +func (c *pilotControl) createPilot(pilot *v1alpha1.Pilot) error { |
| 70 | + _, err := c.naviClient.NavigatorV1alpha1().Pilots(pilot.Namespace).Create(pilot) |
| 71 | + if k8sErrors.IsAlreadyExists(err) { |
| 72 | + glog.Warning("Pilot already exists %s/%s.", pilot.Namespace, pilot.Name) |
| 73 | + return nil |
| 74 | + } |
57 | 75 | if err != nil {
|
58 |
| - return clusterPods, err |
| 76 | + return errors.Wrap(err, "unable to create pilot") |
59 | 77 | }
|
60 |
| - for _, pod := range allPods { |
61 |
| - podControlledByCluster, err := controllers.PodControlledByCluster( |
62 |
| - cluster, |
63 |
| - pod, |
64 |
| - c.statefulSets, |
| 78 | + glog.V(4).Infof("Created pilot %s/%s.", pilot.Namespace, pilot.Name) |
| 79 | + return nil |
| 80 | +} |
| 81 | + |
| 82 | +func (c *pilotControl) deletePilot(cluster *v1alpha1.CassandraCluster, pilot *v1alpha1.Pilot) error { |
| 83 | + err := util.OwnerCheck(pilot, cluster) |
| 84 | + if err != nil { |
| 85 | + glog.Errorf( |
| 86 | + "Skipping deletion of foreign owned pilot %s/%s: %s.", |
| 87 | + pilot.Namespace, pilot.Name, err, |
65 | 88 | )
|
66 |
| - if err != nil { |
67 |
| - return clusterPods, err |
68 |
| - } |
69 |
| - if !podControlledByCluster { |
70 |
| - continue |
71 |
| - } |
72 |
| - clusterPods = append(clusterPods, pod) |
| 89 | + return nil |
73 | 90 | }
|
74 |
| - return clusterPods, nil |
75 |
| -} |
76 | 91 |
|
77 |
| -func (c *pilotControl) createPilot(cluster *v1alpha1.CassandraCluster, pod *v1.Pod) error { |
78 |
| - desiredPilot := PilotForCluster(cluster, pod) |
79 |
| - client := c.naviClient.NavigatorV1alpha1().Pilots(desiredPilot.GetNamespace()) |
80 |
| - lister := c.pilots.Pilots(desiredPilot.GetNamespace()) |
81 |
| - existingPilot, err := lister.Get(desiredPilot.GetName()) |
82 |
| - // Pilot already exists |
| 92 | + _, err = c.pods.Pods(cluster.Namespace).Get(pilot.Name) |
83 | 93 | if err == nil {
|
84 |
| - return util.OwnerCheck(existingPilot, cluster) |
| 94 | + glog.V(4).Infof( |
| 95 | + "Skipping deletion of pilot %s/%s because its pod still exists.", |
| 96 | + pilot.Namespace, pilot.Name, |
| 97 | + ) |
| 98 | + return nil |
85 | 99 | }
|
86 |
| - // The only error we expect is that the pilot does not exist. |
87 | 100 | if !k8sErrors.IsNotFound(err) {
|
88 |
| - return err |
| 101 | + return errors.Wrap(err, "unable to get pod for pilot") |
89 | 102 | }
|
90 |
| - _, err = client.Create(desiredPilot) |
91 |
| - return err |
| 103 | + err = c.naviClient.NavigatorV1alpha1(). |
| 104 | + Pilots(cluster.Namespace).Delete(pilot.Name, &metav1.DeleteOptions{}) |
| 105 | + if k8sErrors.IsNotFound(err) { |
| 106 | + glog.Warning("Pilot already deleted %s/%s.", pilot.Namespace, pilot.Name) |
| 107 | + return nil |
| 108 | + } |
| 109 | + if err != nil { |
| 110 | + return errors.Wrapf( |
| 111 | + err, "unable to delete pilot %s/%s", pilot.Namespace, pilot.Name, |
| 112 | + ) |
| 113 | + } |
| 114 | + glog.V(4).Infof("Deleted pilot %s/%s.", pilot.Namespace, pilot.Name) |
| 115 | + return nil |
92 | 116 | }
|
93 | 117 |
|
94 |
| -func (c *pilotControl) syncPilots(cluster *v1alpha1.CassandraCluster) error { |
95 |
| - pods, err := c.clusterPods(cluster) |
| 118 | +// Sync ensures the correct number of Pilots for each nodepool. |
| 119 | +// |
| 120 | +// For each nodepool StatefulSet: |
| 121 | +// * Create a number of pilots to match the number of pods that will be created for the StatefulSet. |
| 122 | +// * Delete higher index pilots which have been left behind after the statefulset has been scaled in. |
| 123 | +// * Do not delete a pilot if there is a pod with a matching name, |
| 124 | +// (that pod won't be able to decommission its self |
| 125 | +// unless it can read its desired configuration from its pilot) |
| 126 | +// * Do not delete a pilot unless it is owned by the cluster that is being synchronised. |
| 127 | +// (this is not an expected state, |
| 128 | +// but we don't want to delete anything unless it was created by us) |
| 129 | +func (c *pilotControl) Sync(cluster *v1alpha1.CassandraCluster) error { |
| 130 | + selector, err := util.SelectorForClusterNodePools(cluster) |
| 131 | + if err != nil { |
| 132 | + return errors.Wrap(err, "unable to create cluster nodepools selector") |
| 133 | + } |
| 134 | + statefulSets, err := c.statefulSets.StatefulSets(cluster.Namespace).List(selector) |
| 135 | + if err != nil { |
| 136 | + return errors.Wrap(err, "unable to list statefulsets") |
| 137 | + } |
| 138 | + |
| 139 | + actualPilots, err := c.pilots.Pilots(cluster.Namespace).List(selector) |
96 | 140 | if err != nil {
|
97 |
| - return err |
| 141 | + return errors.Wrap(err, "unable to list pilots") |
98 | 142 | }
|
99 |
| - for _, pod := range pods { |
100 |
| - err = c.createPilot(cluster, pod) |
| 143 | + actualPilotNames := setOfPilotNames(actualPilots) |
| 144 | + |
| 145 | + expectedPilots := []*v1alpha1.Pilot{} |
| 146 | + for _, set := range statefulSets { |
| 147 | + expectedPilots = append(expectedPilots, c.pilotsForSet(cluster, set)...) |
| 148 | + } |
| 149 | + expectedPilotNames := setOfPilotNames(expectedPilots) |
| 150 | + |
| 151 | + pilotsToCreate := expectedPilotNames.Difference(actualPilotNames) |
| 152 | + glog.V(4).Infof("Creating pilots: %v", pilotsToCreate.List()) |
| 153 | + for _, pilot := range expectedPilots { |
| 154 | + if !pilotsToCreate.Has(pilot.Name) { |
| 155 | + continue |
| 156 | + } |
| 157 | + err := c.createPilot(pilot) |
101 | 158 | if err != nil {
|
102 |
| - return err |
| 159 | + return errors.Wrap(err, "error in createPilot") |
103 | 160 | }
|
104 | 161 | }
|
105 |
| - return err |
106 |
| -} |
107 | 162 |
|
108 |
| -func (c *pilotControl) Sync(cluster *v1alpha1.CassandraCluster) error { |
109 |
| - err := c.syncPilots(cluster) |
110 |
| - if err != nil { |
111 |
| - return err |
| 163 | + pilotsToDelete := actualPilotNames.Difference(expectedPilotNames) |
| 164 | + glog.V(4).Infof("Deleting pilots: %v", pilotsToDelete.List()) |
| 165 | + for _, pilot := range actualPilots { |
| 166 | + if !pilotsToDelete.Has(pilot.Name) { |
| 167 | + continue |
| 168 | + } |
| 169 | + err := c.deletePilot(cluster, pilot) |
| 170 | + if err != nil { |
| 171 | + return errors.Wrap(err, "error in deletePilot") |
| 172 | + } |
112 | 173 | }
|
113 |
| - // TODO: Housekeeping. Remove pilots that don't have a corresponding pod. |
114 | 174 | return nil
|
115 | 175 | }
|
116 | 176 |
|
117 |
| -func PilotForCluster(cluster *v1alpha1.CassandraCluster, pod *v1.Pod) *v1alpha1.Pilot { |
118 |
| - return &v1alpha1.Pilot{ |
| 177 | +func PilotForCluster(cluster *v1alpha1.CassandraCluster, ss *v1beta1.StatefulSet, index int32) *v1alpha1.Pilot { |
| 178 | + o := &v1alpha1.Pilot{ |
119 | 179 | ObjectMeta: metav1.ObjectMeta{
|
120 |
| - Name: pod.Name, |
121 |
| - Namespace: pod.Namespace, |
| 180 | + Name: fmt.Sprintf("%s-%d", ss.Name, index), |
| 181 | + Namespace: ss.Namespace, |
122 | 182 | Labels: util.ClusterLabels(cluster),
|
123 | 183 | OwnerReferences: []metav1.OwnerReference{util.NewControllerRef(cluster)},
|
124 | 184 | },
|
125 | 185 | }
|
| 186 | + o.Labels[v1alpha1.CassandraNodePoolNameLabel] = ss.Labels[v1alpha1.CassandraNodePoolNameLabel] |
| 187 | + o.Labels[v1alpha1.CassandraNodePoolIndexLabel] = fmt.Sprintf("%d", index) |
| 188 | + return o |
| 189 | +} |
| 190 | + |
| 191 | +func setOfPilotNames(pilots []*v1alpha1.Pilot) sets.String { |
| 192 | + names := sets.NewString() |
| 193 | + for _, pilot := range pilots { |
| 194 | + names.Insert(pilot.Name) |
| 195 | + } |
| 196 | + return names |
126 | 197 | }
|
0 commit comments