# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import argparse
import csv
import glob
import re

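# Each entry names a per-run CSV file ("<stat>.csv") that main() looks up
# under the source directory and aggregates across runs.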
STATS_TO_AGGREGATE = [
    'memory',
    'etcd_stats',
    'kafka_msg_per_topic',
    'cpu',
]


def data_to_csv(data, output=None):
    """
    Takes a dictionary of lists and saves it as a CSV file
    :param data: the input dictionary
    :type data: {metric: []values}
    :param output: the destination file
    :type output: str
    """

    # newline='' is the csv-module recommendation, and the context manager
    # guarantees the file is flushed and closed
    with open(output, "w+", newline='') as csv_file:
        csv_writer = csv.writer(csv_file, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)

        for k, v in data.items():
            csv_writer.writerow([k] + v)
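
# Illustrative call (hypothetical data):
#   data_to_csv({"onos": ["1", "2", "3"]}, output="out.csv")
# writes a single CSV row: onos,1,2,3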


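# Both pod-level aggregators below fold the Kubernetes pod name down to its
# deployment name before merging values across runs, e.g. a (hypothetical)
# pod "voltha-rw-core-6f8c9d7b45-x2vqk" is recorded under "voltha-rw-core".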
def aggregateCpu(files):
    """
    Aggregates CPU consumption from multiple runs of the pipeline
    :param files: The list of files to aggregate
    :type files: []str
    :return: a dictionary of aggregate stats
    :rtype: {podName: []values}
    """
    cpu_values = {}

    for file in files:
        with open(file) as csv_file:
            csv_reader = csv.reader(csv_file, delimiter=',')

            for row in csv_reader:
                # for each row remove the random chars from the pod name
                # and concat the values to the existing ones
                regex = r"(.*)-[a-z0-9]{9,10}-[a-z0-9]{5}$"
                x = re.search(regex, row[0])

                if x is not None:
                    podName = x.group(1)
                    if podName not in cpu_values:
                        cpu_values[podName] = []

                    cpu_values[podName] = cpu_values[podName] + row[1:]

    return cpu_values


def aggregateMemory(files):
    """
    Aggregates memory consumption from multiple runs of the pipeline
    :param files: The list of files to aggregate
    :type files: []str
    :return: a dictionary of aggregate stats
    :rtype: {podName: []values}
    """
    # this function assumes that the files are ordered by time

    mem_values = {}

    for file in files:
        with open(file) as csv_file:
            csv_reader = csv.reader(csv_file, delimiter=',')

            for row in csv_reader:
                # for each row remove the random chars from the pod name
                # and concat the values to the existing ones
                regex = r"(.*)-[a-z0-9]{9,10}-[a-z0-9]{5}$"
                x = re.search(regex, row[0])

                if x is not None:
                    podName = x.group(1)
                    if podName not in mem_values:
                        mem_values[podName] = []

                    mem_values[podName] = mem_values[podName] + row[1:]

    return mem_values


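# etcd stats are grouped by topology rather than by pod; the topology label
# is taken from the directory the file sits in (e.g. a hypothetical
# ".../16-32/etcd_stats.csv" yields the label "16-32").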
def aggregateEtcd(files):
    etcd_size = {}
    etcd_keys = {}

    for file in files:
        with open(file) as csv_file:
            csv_reader = csv.reader(csv_file, delimiter=',')

            # derive the topology label from the directory in the file path
            regex = r".*/([0-9-]{5})/.*"
            topology = re.search(regex, file).group(1)

            for row in csv_reader:
                if row[0] == "keys":
                    if topology not in etcd_keys:
                        etcd_keys[topology] = []
                    etcd_keys[topology].append(row[1])
                if row[0] == "size":
                    if topology not in etcd_size:
                        etcd_size[topology] = []
                    etcd_size[topology].append(row[1])
    return [etcd_keys, etcd_size]

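# Kafka rows are already keyed by a stable topic name, so no renaming is
# needed: each topic simply accumulates one message count per input file.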
def aggregateKafka(files):
    kafka = {}

    for file in files:
        with open(file) as csv_file:
            csv_reader = csv.reader(csv_file, delimiter=',')

            for row in csv_reader:
                topic = row[0]
                count = row[1]

                if topic not in kafka:
                    kafka[topic] = []
                kafka[topic].append(count)
    return kafka

def aggregateStats(stat, files, out_dir):
    # sort files in alphabetical order
    # we assume that we always run the topologies in incremental order
    files.sort()
    if stat == "memory":
        agg = aggregateMemory(files)
        data_to_csv(agg, output="%s/aggregated-memory.csv" % out_dir)
    if stat == "cpu":
        agg = aggregateCpu(files)
        data_to_csv(agg, output="%s/aggregated-cpu.csv" % out_dir)
    if stat == "etcd_stats":
        [keys, size] = aggregateEtcd(files)
        data_to_csv(keys, output="%s/aggregated-etcd-keys.csv" % out_dir)
        data_to_csv(size, output="%s/aggregated-etcd-size.csv" % out_dir)
    if stat == "kafka_msg_per_topic":
        agg = aggregateKafka(files)
        data_to_csv(agg, output="%s/aggregated-kafka-msg-count.csv" % out_dir)


def main(source, out_dir):
    for stat in STATS_TO_AGGREGATE:
        files = list(glob.iglob('%s/**/%s.csv' % (source, stat), recursive=True))
        aggregateStats(stat, files, out_dir)


if __name__ == "__main__":
    parser = argparse.ArgumentParser(prog="stats-aggregation")
    parser.add_argument("-o", "--output", help="Where to output the generated files", default="plots")
    parser.add_argument("-s", "--source", help="Directory in which to look for stats", required=True)

    args = parser.parse_args()
    main(args.source, args.output)
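
# Example invocation (assuming the script is saved as stats-aggregation.py):
#   python stats-aggregation.py -s ./stats -o ./plots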