# blob: 9c445762271d22b259a18148c655a11de04bbd6b [file] [log] [blame]
# Copyright 2017-2024 Open Networking Foundation (ONF) and the ONF Contributors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import csv
import glob
import os
import re
# Names of the per-run statistics to aggregate; for each name the script
# looks for files called "<name>.csv" under the source directory.
STATS_TO_AGGREGATE = ["memory", "etcd_stats", "kafka_msg_per_topic", "cpu"]
def data_to_csv(data, output=None):
    """
    Writes a dictionary of lists to a csv file, one row per key.

    Every row starts with the dictionary key, followed by the values of the
    list associated with that key.

    :param data: the input dictionary
    :type data: {metric: []values}
    :param output: the destination file (created, or truncated if it exists)
    :type output: str
    """
    # newline="" lets the csv writer control the line endings itself, as the
    # csv module documentation requires. The context manager guarantees the
    # file is flushed and closed even if writing a row fails.
    with open(output, "w", newline="") as csv_file:
        csv_writer = csv.writer(csv_file, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
        for metric, values in data.items():
            csv_writer.writerow([metric] + list(values))
def aggergateCpu(files):
    """
    Aggregates CPU consumption from multiple runs of the pipeline

    The first column of every row is expected to be a pod name ending with
    two dash-separated random groups (9-10 and 5 lowercase alphanumeric
    characters). That suffix is stripped and the remaining columns are
    concatenated, in file order, to the values already collected for the
    same pod. Empty rows and rows whose first column does not look like a
    pod name are ignored.

    :param files: The list of files to aggregate
    :type files: []str
    :return: a dictionary of aggregate stats
    :rtype: {podName: []values}
    """
    # the pattern does not change between rows, so build it only once
    pod_regex = re.compile(r"(.*)-[a-z0-9]{9,10}-[a-z0-9]{5}$")
    cpu_values = {}
    for path in files:
        # the context manager closes each input file as soon as it is read
        with open(path, newline="") as csv_file:
            for row in csv.reader(csv_file, delimiter=','):
                if not row:
                    # csv.reader yields [] for blank lines
                    continue
                match = pod_regex.search(row[0])
                if match is None:
                    continue
                # remove the random chars from the pod name and concat the
                # values to the existing ones
                cpu_values.setdefault(match.group(1), []).extend(row[1:])
    return cpu_values
def aggregateMemory(files):
    """
    Aggregates memory consumption from multiple runs of the pipeline

    The first column of every row is expected to be a pod name ending with
    two dash-separated random groups (9-10 and 5 lowercase alphanumeric
    characters). That suffix is stripped and the remaining columns are
    concatenated, in file order, to the values already collected for the
    same pod. Empty rows and rows whose first column does not look like a
    pod name are ignored.

    :param files: The list of files to aggregate
    :type files: []str
    :return: a dictionary of aggregate stats
    :rtype: {podName: []values}
    """
    # this function assumes that the files are ordered by time

    # the pattern does not change between rows, so build it only once
    pod_regex = re.compile(r"(.*)-[a-z0-9]{9,10}-[a-z0-9]{5}$")
    mem_values = {}
    for path in files:
        # the context manager closes each input file as soon as it is read
        with open(path, newline="") as csv_file:
            for row in csv.reader(csv_file, delimiter=','):
                if not row:
                    # csv.reader yields [] for blank lines
                    continue
                match = pod_regex.search(row[0])
                if match is None:
                    continue
                # remove the random chars from the pod name and concat the
                # values to the existing ones
                mem_values.setdefault(match.group(1), []).extend(row[1:])
    return mem_values
def aggregateEtcd(files):
    """
    Aggregates etcd stats from multiple runs of the pipeline

    The topology a file belongs to is read from its path: it is the last
    path component made of exactly five digits and/or dashes. For every
    file, the second column of the rows whose first column is "keys" or
    "size" is appended to the list kept for that topology. Rows with fewer
    than two columns and rows with any other first column are ignored.

    :param files: The list of files to aggregate
    :type files: []str
    :return: a two element list, [keys, size]
    :rtype: [{topology: []values}, {topology: []values}]
    :raises ValueError: if a file path contains no topology component
    """
    # raw string: "\/" is not a valid escape sequence in a regular string
    # literal and triggers a warning on recent Python versions
    topology_regex = re.compile(r".*/([0-9-]{5})/.*")
    etcd_size = {}
    etcd_keys = {}
    for path in files:
        match = topology_regex.search(path)
        if match is None:
            # fail with an explicit message instead of an AttributeError on None
            raise ValueError("cannot extract the topology from path '%s'" % path)
        topology = match.group(1)
        # the context manager closes each input file as soon as it is read
        with open(path, newline="") as csv_file:
            for row in csv.reader(csv_file, delimiter=','):
                if len(row) < 2:
                    # blank or incomplete line, there is no value to collect
                    continue
                if row[0] == "keys":
                    etcd_keys.setdefault(topology, []).append(row[1])
                elif row[0] == "size":
                    etcd_size.setdefault(topology, []).append(row[1])
    return [etcd_keys, etcd_size]
def aggregateKafka(files):
    """
    Aggregates the kafka message count per topic from multiple runs

    For every row, the first column is used as the topic name and the
    second column is appended, in file order, to the list kept for that
    topic. Rows with fewer than two columns are ignored.

    :param files: The list of files to aggregate
    :type files: []str
    :return: a dictionary of aggregate stats
    :rtype: {topic: []values}
    """
    kafka = {}
    for path in files:
        # the context manager closes each input file as soon as it is read
        with open(path, newline="") as csv_file:
            for row in csv.reader(csv_file, delimiter=','):
                if len(row) < 2:
                    # blank or incomplete line, there is no count to collect
                    continue
                kafka.setdefault(row[0], []).append(row[1])
    return kafka
def aggregateStats(stat, files, out_dir):
    """
    Aggregates the files of a single stat and saves the result as csv

    Depending on the stat, one or two "aggregated-*.csv" files are written
    in the output directory. Unknown stats are silently ignored.

    :param stat: the stat to aggregate, one of "memory", "cpu",
        "etcd_stats" or "kafka_msg_per_topic"
    :type stat: str
    :param files: The list of files to aggregate
    :type files: []str
    :param out_dir: the directory in which the aggregated files are written
    :type out_dir: str
    """
    # sort files in alphabetical order
    # we assume that we always run the topologies in incremental order
    # (sorted() is used so that the list owned by the caller is not modified)
    ordered = sorted(files)

    if stat == "memory":
        agg = aggregateMemory(ordered)
        data_to_csv(agg, output="%s/aggregated-memory.csv" % out_dir)
    elif stat == "cpu":
        # use the CPU specific aggregator, not the memory one
        agg = aggergateCpu(ordered)
        data_to_csv(agg, output="%s/aggregated-cpu.csv" % out_dir)
    elif stat == "etcd_stats":
        keys, size = aggregateEtcd(ordered)
        data_to_csv(keys, output="%s/aggregated-etcd-keys.csv" % out_dir)
        data_to_csv(size, output="%s/aggregated-etcd-size.csv" % out_dir)
    elif stat == "kafka_msg_per_topic":
        agg = aggregateKafka(ordered)
        data_to_csv(agg, output="%s/aggregated-kafka-msg-count.csv" % out_dir)
def main(source, out_dir):
    """
    Aggregates every stat listed in STATS_TO_AGGREGATE

    For each stat, all the files named "<stat>.csv" found at any depth under
    the source directory are aggregated into the output directory.

    :param source: the directory in which to look for stats
    :type source: str
    :param out_dir: the directory in which the aggregated files are written,
        created if it does not exist
    :type out_dir: str
    """
    # the aggregated files are opened for writing directly in out_dir, which
    # would fail if the directory (by default "plots") is missing
    os.makedirs(out_dir, exist_ok=True)
    for stat in STATS_TO_AGGREGATE:
        files = glob.glob('%s/**/%s.csv' % (source, stat), recursive=True)
        aggregateStats(stat, files, out_dir)
if __name__ == "__main__":
    # Command line entry point: read the options and run the aggregation.
    cli = argparse.ArgumentParser(prog="stats-aggregation")
    cli.add_argument(
        "-o", "--output",
        default="plots",
        help="Where to output the generated files",
    )
    cli.add_argument(
        "-s", "--source",
        required=True,
        help="Directory in which to look for stats",
    )
    options = cli.parse_args()
    main(options.source, options.output)