Skip to content

Commit 8b0f42c

Browse files
committed
Python script to query Prometheus for CPU and memory data
1 parent bd357f9 commit 8b0f42c

File tree

1 file changed

+249
-0
lines changed

1 file changed

+249
-0
lines changed

tools/run_tests/performance/prom.py

Lines changed: 249 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,249 @@
1+
#!/usr/bin/env python3
2+
3+
# Copyright 2022 The gRPC Authors
4+
#
5+
# Licensed under the Apache License, Version 2.0 (the "License");
6+
# you may not use this file except in compliance with the License.
7+
# You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
17+
18+
# example usage: python3 prom.py --url=http://10.108.0.60:9090 --pod_type=driver --pod_type=client --container_name=main --container_name=sidecar --debug=true
19+
import argparse
20+
import json
21+
import requests
22+
import statistics
23+
import argparse
24+
from typing import Any, Dict, List
25+
26+
27+
class Prom:
    """Client that pulls per-container CPU and memory usage timeseries
    from a Prometheus server over a fixed time window."""

    def __init__(self, url, start, end, debug=False):
        # Base URL of the Prometheus server (e.g. http://10.108.0.60:9090).
        self.url = url
        # Start/end of the query window (epoch seconds from the scenario result).
        self.start = start
        self.end = end
        # When truthy, echo each query and its raw response to stdout.
        self.debug = debug

    def fetch_by_query(self, query: str) -> Any:
        """Fetch the given query with time range"""
        response = requests.get(
            self.url + "/api/v1/query_range",
            {
                "query": query,
                "start": self.start,
                "end": self.end,
                "step": 5,
            },
        )
        if not response.ok:
            print(response.content)
            raise Exception(str(response))
        return response.json()

    def fetch_cpu_for_pod(self, container_matcher: str,
                          pod_name: str) -> Dict[str, List[float]]:
        """Fetch the cpu data for each pod and construct the container
        name to cpu data list Dict"""
        # irate over 100s turns the cumulative cpu-seconds counter into a rate.
        query = (f'irate(container_cpu_usage_seconds_total'
                 f'{{job="kubernetes-cadvisor",pod="{pod_name}"'
                 f',container={container_matcher}}}[100s])')
        if self.debug:
            print("running prometheus query for cpu:" + query)

        cpu_data = self.fetch_by_query(query)

        if self.debug:
            print("raw cpu data:")
            print(cpu_data)

        return get_data_list_from_timeseries(cpu_data)

    def fetch_memory_for_pod(self, container_matcher: str,
                             pod_name: str) -> Dict[str, List[float]]:
        """Fetch the memory data for each pod and construct the
        container name to memory data list Dict"""
        query = (f'container_memory_usage_bytes'
                 f'{{job="kubernetes-cadvisor",pod="{pod_name}"'
                 f',container={container_matcher}}}')
        if self.debug:
            print("running prometheus query for memory:" + query)

        memory_data = self.fetch_by_query(query)

        if self.debug:
            print("raw memory data:")
            print(memory_data)

        return get_data_list_from_timeseries(memory_data)

    def fetch_cpu_and_memory_data(
        self, container_list: List[str], pod_list: List[str]
    ) -> Dict[str, Dict[str, Dict[str, Dict[str, Any]]]]:
        """Fetch and process min, max, mean, std for the memory and cpu
        data for each container in the container_list for each pod in
        the pod_list and construct processed data group first by metric
        type (cpu or memory) and then by pod name and container name.
        If a given container does not exist on a pod, it is ignored."""
        container_matcher = construct_container_matcher(container_list)

        raw_cpu_data = {}
        raw_memory_data = {}
        for pod in pod_list:
            raw_cpu_data[pod] = self.fetch_cpu_for_pod(container_matcher, pod)
            raw_memory_data[pod] = self.fetch_memory_for_pod(
                container_matcher, pod)

        return {
            'cpu': compute_min_max_mean_std(raw_cpu_data),
            'memory': compute_min_max_mean_std(raw_memory_data),
        }
111+
112+
113+
def construct_container_matcher(container_list: List[str]) -> str:
    """Construct the container matching string used in the prometheus
    query to match container names based on the given list of containers.

    A single name yields an exact-match value ('"name"'); multiple names
    yield a regex matcher ('~"a|b|..."'). The caller prepends
    'container=' when assembling the PromQL query.

    Raises:
        Exception: if the list is empty.
    """
    if len(container_list) == 0:
        raise Exception("no container name provided")

    if len(container_list) == 1:
        # Bug fix: the single-container case previously returned the bare
        # name, producing invalid PromQL (container=main); label values
        # must be quoted.
        return '"' + container_list[0] + '"'

    return '~"' + "|".join(container_list) + '"'
129+
130+
131+
def get_data_list_from_timeseries(data: Any) -> Dict[str, List[float]]:
    """Construct a Dict keyed by container name whose values are the
    float samples extracted from the given Prometheus matrix payload."""
    if data["status"] != "success":
        raise Exception("command failed: " + data["status"] + str(data))
    if data["data"]["resultType"] != "matrix":
        raise Exception("resultType is not matrix: " +
                        data["data"]["resultType"])

    # Each result entry is one container's timeseries; each sample is a
    # [timestamp, value-as-string] pair — keep only the numeric value.
    return {
        series["metric"]["container"]:
        [float(sample[1]) for sample in series["values"]]
        for series in data["data"]["result"]
    }
151+
152+
153+
def compute_min_max_mean_std_for_each(data: List[float]) -> Dict[str, Any]:
    """Summarize the given list of samples as a Dict keyed by min, max,
    mean and std (population standard deviation)."""
    return {
        "min": min(data),
        "max": max(data),
        "mean": statistics.mean(data),
        "std": statistics.pstdev(data),
    }
164+
165+
166+
def compute_min_max_mean_std(
    cpu_data_dicts: Dict[str, Dict[str, List[float]]]
) -> Dict[str, Dict[str, Dict[str, Any]]]:
    """Summarize (min/max/mean/std) every sample list, preserving the
    pod-name -> container-name grouping of the input."""
    return {
        pod_name: {
            container_name: compute_min_max_mean_std_for_each(samples)
            for container_name, samples in per_container.items()
        } for pod_name, per_container in cpu_data_dicts.items()
    }
178+
179+
180+
def construct_pod_list(node_info_file: str, pod_types: List[str]) -> List[str]:
    """Construct a list of pod names to be queried, reading the node-info
    JSON file and keeping only pods of the requested types."""
    with open(node_info_file, "r") as f:
        node_info = json.load(f)

    # Group pod names by role, mirroring the node-info file's layout.
    pod_type_to_name = {
        "client": [client["Name"] for client in node_info["Clients"]],
        "server": [server["Name"] for server in node_info["Servers"]],
        "driver": [node_info["Driver"]["Name"]],
    }

    # Flatten in the order the pod types were requested.
    return [
        name for pod_type in pod_types for name in pod_type_to_name[pod_type]
    ]
200+
201+
202+
def _parse_bool(value) -> bool:
    """Interpret a --debug flag value, accepting common true/false strings."""
    if isinstance(value, bool):
        return value
    return str(value).lower() in ("true", "1", "yes", "y")


def main() -> None:
    """Parse CLI args, query Prometheus for CPU/memory stats over the
    scenario's time window, and export the processed data as JSON."""
    argp = argparse.ArgumentParser(
        description='Fetch cpu and memory stats from prometheus')
    argp.add_argument('--url',
                      help='Prometheus base url',
                      required=True)
    argp.add_argument("--scenario_result_file",
                      default='scenario_result.json',
                      type=str,
                      help="File contains epoch seconds for start and end time")
    argp.add_argument("--node_info_file",
                      default='/var/data/qps_workers/node_info.json',
                      help="File contains pod name to query the metrics for")
    argp.add_argument("--pod_type", action='append',
                      help="Pod_type to query the metrics for",
                      choices=['driver', 'client', 'server'],
                      required=True)
    argp.add_argument("--container_name", action='append',
                      help="The container names to query the metrics for",
                      required=True)
    # Bug fix: without a type= converter, any non-empty string (including
    # "false") parsed as truthy, so --debug=false enabled debug output.
    argp.add_argument('--debug',
                      default=False,
                      type=_parse_bool,
                      help='Print debug messages.')
    argp.add_argument("--export_file_name",
                      default='prometheus_query_result.json',
                      type=str,
                      help="Name of exported JSON file.")

    args = argp.parse_args()

    # The scenario result file supplies the query window (epoch seconds).
    with open(args.scenario_result_file, "r") as q:
        scenario_result = json.load(q)
    p = Prom(url=args.url,
             start=scenario_result["summary"]["start"],
             end=scenario_result["summary"]["end"],
             debug=args.debug)

    pod_list = construct_pod_list(args.node_info_file, args.pod_type)
    processed_data = p.fetch_cpu_and_memory_data(
        container_list=args.container_name, pod_list=pod_list)

    if args.debug:
        print(json.dumps(processed_data, sort_keys=True, indent=4))

    with open(args.export_file_name, 'w') as export_file:
        json.dump(processed_data, export_file, ensure_ascii=False, indent=4)


if __name__ == "__main__":
    main()

0 commit comments

Comments
 (0)