blob: 57a782f52f7670ba407b4e3562faedabea3a258f [file] [log] [blame]
#!/usr/bin/env python
#
# Copyright 2017 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
Alarm filter CLI commands
"""
from optparse import make_option, OptionValueError
from cmd2 import Cmd, options
from google.protobuf.empty_pb2 import Empty
from table import print_pb_list_as_table
from voltha_protos import voltha_pb2
from voltha_protos.events_pb2 import AlarmEventType, AlarmEventSeverity, AlarmEventCategory
class AlarmFiltersCli(Cmd):
def __init__(self, get_stub):
Cmd.__init__(self)
self.get_stub = get_stub
self.prompt = '(' + self.colorize(
self.colorize('alarm_filters', 'red'), 'bold') + ') '
def cmdloop(self):
self._cmdloop()
def help_show(self):
self.poutput(
'''
Display the list of configured filters.
Valid options:
-i FILTER_ID | --filter-id=FILTER_ID Display the filter rules for a specific filter id (OPTIONAL)
'''
)
@options([
make_option('-i', '--filter-id', action="store", dest='filter_id')
])
def do_show(self, line, opts):
stub = self.get_stub()
if not opts.filter_id:
result = stub.ListAlarmFilters(Empty())
print_pb_list_as_table("Alarm Filters:", result.filters, {}, self.poutput)
else:
result = stub.GetAlarmFilter(voltha_pb2.ID(id=opts.filter_id))
print_pb_list_as_table("Rules for Filter ID = {}:".format(opts.filter_id),
result.rules, {}, self.poutput)
@staticmethod
def construct_rule(raw_rule):
rule = dict()
rule_kv = raw_rule.strip().split(':')
if len(rule_kv) == 2:
rule['key'] = rule_kv[0].lower()
rule['value'] = rule_kv[1].lower()
else:
raise OptionValueError("Error: A rule must be a colon separated key/value pair")
return rule
def parse_filter_rules(option, opt_str, value, parser):
rules = getattr(parser.values, option.dest)
if rules is None:
rules = list()
rules.append(AlarmFiltersCli.construct_rule(value))
for arg in parser.rargs:
if (arg[:2] == "--" and len(arg) > 2) or (arg[:1] == "-" and len(arg) > 1 and arg[1] != "-"):
break
else:
rules.append(AlarmFiltersCli.construct_rule(arg))
setattr(parser.values, option.dest, rules)
else:
raise OptionValueError('Warning: The filter rule option can only be specified once')
def help_create(self):
types = list(
k for k, v in
AlarmEventType.DESCRIPTOR.enum_values_by_name.items())
categories = list(
k for k, v in
AlarmEventCategory.DESCRIPTOR.enum_values_by_name.items())
severities = list(
k for k, v in
AlarmEventSeverity.DESCRIPTOR.enum_values_by_name.items())
alarm_types = types
alarm_categories = categories
alarm_severities = severities
usage = '''
Create a new alarm filter.
Valid options:
-r rule:value ... | --filter-rules rule:value ... Specify one or more filter rules as key/value pairs (REQUIRED)
Valid rule keys and expected values:
id : Identifier of an incoming alarm
type : Type of an incoming alarm {}
category : Category of an incoming alarm {}
severity : Severity of an incoming alarm {}
resource_id : Resource identifier of an incoming alarm
device_id : Device identifier of an incoming alarm
Example:
# Filter any alarm that matches the following criteria
create -r type:environment severity:indeterminate
create -r device_id:754f9dcbe4a6
'''.format(alarm_types, alarm_categories, alarm_severities)
self.poutput(usage)
@options([
make_option('-r', '--filter-rules', help='<key>:<value>...', action="callback",
callback=parse_filter_rules, type='string', dest='filter_rules'),
])
def do_create(self, line, opts):
if opts.filter_rules:
stub = self.get_stub()
result = stub.CreateAlarmFilter(voltha_pb2.AlarmFilter(rules=opts.filter_rules))
print_pb_list_as_table("Rules for Filter ID = {}:".format(result.id),
result.rules, {}, self.poutput)
def help_delete(self):
self.poutput(
'''
Delete a specific alarm filter entry.
Valid options:
-i FILTER_ID | --filter-id=FILTER_ID Display the filter rules for a specific filter id (REQUIRED)
'''
)
@options([
make_option('-i', '--filter-id', action="store", dest='filter_id')
])
def do_delete(self, line, opts):
if not opts.filter_id:
self.poutput(self.colorize('Error: ', 'red') + 'Specify ' + \
self.colorize(self.colorize('"filter id"', 'blue'),
'bold') + ' to update')
return
stub = self.get_stub()
stub.DeleteAlarmFilter(voltha_pb2.ID(id=opts.filter_id))
def help_update(self):
types = list(
k for k, v in
AlarmEventType.DESCRIPTOR.enum_values_by_name.items())
categories = list(
k for k, v in
AlarmEventCategory.DESCRIPTOR.enum_values_by_name.items())
severities = list(
k for k, v in
AlarmEventSeverity.DESCRIPTOR.enum_values_by_name.items())
alarm_types = types
alarm_categories = categories
alarm_severities = severities
usage = '''
Update the filter rules for an existing alarm filter.
Valid options:
-i FILTER_ID | --filter-id=FILTER_ID Indicate the alarm filter identifier to update (REQUIRED)
-r rule:value ... | --filter-rules rule:value ... Specify one or more filter rules as key/value pairs (REQUIRED)
Valid rule keys and expected values:
id : Identifier of an incoming alarm
type : Type of an incoming alarm {}
category : Category of an incoming alarm {}
severity : Severity of an incoming alarm {}
resource_id : Resource identifier of an incoming alarm
device_id : Device identifier of an incoming alarm
Example:
# Filter any alarm that matches the following criteria
update -i 9da115b900bc -r type:environment severity:indeterminate resource_id:1554b0517a07
'''.format(alarm_types, alarm_categories, alarm_severities)
self.poutput(usage)
@options([
make_option('-r', '--filter-rules', help='<key>:<value>...', action="callback",
callback=parse_filter_rules, type='string', dest='filter_rules'),
make_option('-i', '--filter-id', action="store", dest='filter_id')
])
def do_update(self, line, opts):
if not opts.filter_id:
self.poutput(self.colorize('Error: ', 'red') + 'Specify ' + \
self.colorize(self.colorize('"filter id"', 'blue'),
'bold') + ' to update')
return
if opts.filter_rules:
stub = self.get_stub()
result = stub.UpdateAlarmFilter(
voltha_pb2.AlarmFilter(id=opts.filter_id, rules=opts.filter_rules)
)
print_pb_list_as_table("Rules for Filter ID = {}:".format(result.id),
result.rules, {}, self.poutput)