|
| 1 | +# This program is distributed in the hope that it will be useful, |
| 2 | +# but WITHOUT ANY WARRANTY; without even the implied warranty of |
| 3 | +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. |
| 4 | +# |
| 5 | +# See LICENSE for more details. |
| 6 | +# |
| 7 | +# Copyright (c) 2016 ScyllaDB |
| 8 | + |
| 9 | +import time |
| 10 | + |
| 11 | +from longevity_test import LongevityTest |
| 12 | +from sdcm.utils.adaptive_timeouts import adaptive_timeout, Operations |
| 13 | +from sdcm.utils.cluster_tools import group_nodes_by_dc_idx |
| 14 | +from sdcm.sct_events.system import InfoEvent |
| 15 | +from sdcm.cluster import MAX_TIME_WAIT_FOR_NEW_NODE_UP |
| 16 | + |
| 17 | + |
class ScaleClusterTest(LongevityTest):
    """Longevity test that grows and/or shrinks a (possibly multi-DC) cluster.

    Target sizes come from the ``cluster_target_size`` parameter: either a
    space-separated string with one integer per datacenter, or a single int.
    Sizes are always expressed as lists indexed by ``dc_idx``.
    """

    @staticmethod
    def is_target_reached(current: list[int], target: list[int]) -> bool:
        """Return True when every DC's current size is at least its target.

        Extra entries in the longer list are ignored (``zip`` semantics).
        """
        return all(cur >= tgt for cur, tgt in zip(current, target))

    def _current_cluster_size(self) -> list[int]:
        """Return the number of data nodes per datacenter, ordered by dc_idx."""
        nodes_by_dcx = group_nodes_by_dc_idx(self.db_cluster.data_nodes)
        return [len(nodes_by_dcx[dcx]) for dcx in sorted(nodes_by_dcx)]

    def _get_cluster_target_size(self, action: str) -> list[int]:
        """Read, validate and normalize the ``cluster_target_size`` parameter.

        :param action: verb used in the error message ("grow"/"shrink")
        :raises ValueError: when the parameter is not set
        :return: target size per datacenter as a list of ints
        """
        cluster_target_size = self.params.get('cluster_target_size')
        if not cluster_target_size:
            message = f"cluster_target_size param is not set, cannot {action} cluster"
            self.log.error(message)
            raise ValueError(message)
        if isinstance(cluster_target_size, str):
            return list(map(int, cluster_target_size.split()))
        return [cluster_target_size]

    def grow_to_cluster_target_size(self, current_cluster_size: list[int], target_cluster_size: list):
        """Add nodes DC by DC (and rack by rack) until every DC reaches its target.

        :param current_cluster_size: data nodes per dc_idx at start
        :param target_cluster_size: desired data nodes per dc_idx
        """
        InfoEvent(
            message=f"Starting to grow cluster from {current_cluster_size} to {target_cluster_size}").publish()

        add_node_cnt = self.params.get('add_node_cnt')
        try:
            while not self.is_target_reached(current_cluster_size, target_cluster_size):
                for dcx, target in enumerate(target_cluster_size):
                    if current_cluster_size[dcx] >= target:
                        continue
                    # Add at most add_node_cnt nodes per step, capped by the remaining deficit.
                    add_nodes_num = min(add_node_cnt, target - current_cluster_size[dcx])

                    # NOTE(review): add_nodes_num nodes are added to *each* rack, so a DC with
                    # several racks may overshoot its target by up to
                    # (racks_count - 1) * add_nodes_num per pass. is_target_reached() uses >=,
                    # so the loop still terminates — confirm the overshoot is intended
                    # (it keeps racks balanced).
                    for rack in range(self.db_cluster.racks_count):
                        added_nodes = []
                        InfoEvent(
                            message=f"Adding next number of nodes {add_nodes_num} to dc_idx {dcx} and rack {rack}").publish()
                        added_nodes.extend(self.db_cluster.add_nodes(
                            count=add_nodes_num, enable_auto_bootstrap=True, dc_idx=dcx, rack=rack))
                        # New nodes must appear on the monitoring stack before we wait on them.
                        self.monitors.reconfigure_scylla_monitoring()
                        up_timeout = MAX_TIME_WAIT_FOR_NEW_NODE_UP
                        with adaptive_timeout(Operations.NEW_NODE, node=self.db_cluster.data_nodes[0], timeout=up_timeout):
                            self.db_cluster.wait_for_init(
                                node_list=added_nodes, timeout=up_timeout, check_node_health=False)
                        self.db_cluster.wait_for_nodes_up_and_normal(nodes=added_nodes)
                        InfoEvent(f"New nodes up and normal {[node.name for node in added_nodes]}").publish()
                current_cluster_size = self._current_cluster_size()
        finally:
            current_cluster_size = self._current_cluster_size()
            InfoEvent(message=f"Grow cluster finished, cluster size is {current_cluster_size}").publish()

    def shrink_to_cluster_target_size(self, current_cluster_size: list[int], target_cluster_size: list[int]):
        """Decommission nodes, one per rack per pass, until every DC is at its target.

        :param current_cluster_size: data nodes per dc_idx at start
        :param target_cluster_size: desired data nodes per dc_idx
        """
        InfoEvent(
            message=f"Starting to shrink cluster from {current_cluster_size} to {target_cluster_size}").publish()
        try:
            nodes_by_dcx = group_nodes_by_dc_idx(self.db_cluster.data_nodes)
            while not self.is_target_reached(target_cluster_size, current_cluster_size):
                for dcx, _ in enumerate(current_cluster_size):
                    # Bug fix: skip DCs that already reached their target; without this
                    # guard a multi-DC shrink kept removing nodes from every DC until
                    # the *last* DC reached its target, shrinking the others too far.
                    if dcx < len(target_cluster_size) and current_cluster_size[dcx] <= target_cluster_size[dcx]:
                        continue
                    # NOTE(review): one node is decommissioned from *each* rack per pass,
                    # so a DC one node above target may still undershoot — confirm intended.
                    nodes_by_racks = self.db_cluster.get_nodes_per_datacenter_and_rack_idx(nodes_by_dcx[dcx])
                    for nodes in nodes_by_racks.values():
                        decommissioning_node = nodes[-1]
                        # Mark the node so the nemesis machinery doesn't pick it concurrently.
                        decommissioning_node.running_nemesis = "Decommissioning node"
                        self.db_cluster.decommission(node=decommissioning_node, timeout=7200)
                    nodes_by_dcx = group_nodes_by_dc_idx(self.db_cluster.data_nodes)
                    current_cluster_size = [len(nodes_by_dcx[dcx]) for dcx in sorted(nodes_by_dcx)]
        finally:
            current_cluster_size = self._current_cluster_size()
            InfoEvent(
                message=f"Reached cluster size {current_cluster_size}").publish()

    def create_schema(self):
        """Pre-create the user schema: either empty keyspaces/tables or templated profiles."""
        number_of_table = self.params.get(
            'user_profile_table_count') or 0
        cs_user_profiles = self.params.get('cs_user_profiles')
        keyspace_num = self.params.get('keyspace_num')
        if not number_of_table and not cs_user_profiles:
            self.log.debug("User schema will not be created")
            return
        if not cs_user_profiles:
            region_dc_names = self.db_cluster.get_datacenter_name_per_region(self.db_cluster.nodes)
            # RF per DC mirrors the rack count so each rack can hold one replica.
            replication_factor = self.db_cluster.racks_count
            # Fixed message: counts come from params, not a hard-coded "100".
            InfoEvent(f"Create {keyspace_num} keyspaces with {number_of_table} empty tables each").publish()
            for i in range(1, keyspace_num + 1):
                self.create_keyspace(keyspace_name=f"testing_keyspace_{i}", replication_factor={
                    dc_name: replication_factor for dc_name in region_dc_names.values()})
                for j in range(1, number_of_table + 1):
                    self.create_table(name=f"table_{j}", keyspace_name=f"testing_keyspace_{i}")
            InfoEvent(f"{keyspace_num} Keyspaces and {number_of_table} tables were created").publish()
        else:
            self._pre_create_templated_user_schema()

    def test_grow_target_size_of_empty_cluster(self):
        """Grow an empty cluster up to ``cluster_target_size``."""
        self.create_schema()
        InfoEvent(f"Start grow cluster up to {self.params.get('cluster_target_size')}").publish()
        cluster_target_size = self._get_cluster_target_size("grow")
        current_cluster_size = self._current_cluster_size()

        self.grow_to_cluster_target_size(current_cluster_size, cluster_target_size)

    def test_shrink_target_size_of_empty_cluster(self):
        """Shrink an empty cluster down to ``cluster_target_size``."""
        self.create_schema()
        InfoEvent(f"Start shrink cluster to {self.params.get('cluster_target_size')}").publish()
        cluster_target_size = self._get_cluster_target_size("shrink")
        current_cluster_size = self._current_cluster_size()

        self.shrink_to_cluster_target_size(current_cluster_size, cluster_target_size)

    def test_grow_shrink_cluster(self):
        """Grow to ``cluster_target_size``, then shrink back to the initial size."""
        self.create_schema()
        InfoEvent(f"Start grow cluster up to {self.params.get('cluster_target_size')}").publish()
        cluster_target_size = self._get_cluster_target_size("grow")
        init_cluster_size = self._current_cluster_size()

        try:
            self.grow_to_cluster_target_size(init_cluster_size, cluster_target_size)
        except Exception as ex:  # noqa: BLE001
            # Best-effort: even if growing failed we still try to shrink back.
            self.log.error(f"Failed to grow cluster: {ex}")
        current_cluster_size = self._current_cluster_size()
        try:
            InfoEvent(message=f"Cluster size is {current_cluster_size}").publish()
            self.shrink_to_cluster_target_size(current_cluster_size, init_cluster_size)
        except Exception as ex:  # noqa: BLE001
            self.log.error(f"Failed to shrink cluster: {ex}")

    def test_resize_cluster(self):
        """Run the configured nemesis and let it resize the cluster for ``idle_duration`` minutes."""
        self.create_schema()

        self.db_cluster.add_nemesis(nemesis=self.get_nemesis_class(), tester_obj=self)
        self.db_cluster.start_nemesis()
        duration = int(self.params.get('idle_duration'))
        InfoEvent(f"Wait {duration} minutes while cluster resizing").publish()
        time.sleep(duration * 60)
        # Bug fix: the event was constructed but never published.
        InfoEvent("Test done").publish()
0 commit comments