from google.protobuf.json_format import MessageToDict
from time import time, sleep
from voltha.core.flow_decomposer import *
from voltha.protos import openflow_13_pb2 as ofp
import simplejson, jsonschema
import os
import subprocess
import select

from tests.itests.docutests.test_utils import \
    run_command_to_completion_with_raw_stdout, \
    run_command_to_completion_with_stdout_in_list

from common.utils.consulhelpers import verify_all_services_healthy

from voltha.protos.device_pb2 import Device
from tests.itests.voltha.rest_base import RestBase
from common.utils.consulhelpers import get_endpoint_from_consul
from voltha.protos.voltha_pb2 import AlarmFilter

LOCAL_CONSUL = "localhost:8500"
DOCKER_COMPOSE_FILE = "compose/docker-compose-system-test.yml"

command_defs = dict(
    docker_ps="docker ps",
    docker_compose_start_all="docker-compose -f {} up -d "
        .format(DOCKER_COMPOSE_FILE),
    docker_stop_and_remove_all_containers="docker-compose -f {} down"
        .format(DOCKER_COMPOSE_FILE),
    docker_compose_start_voltha="docker-compose -f {} up -d voltha "
        .format(DOCKER_COMPOSE_FILE),
    docker_compose_stop_voltha="docker-compose -f {} stop voltha"
        .format(DOCKER_COMPOSE_FILE),
    docker_compose_remove_voltha="docker-compose -f {} rm -f voltha"
        .format(DOCKER_COMPOSE_FILE),
    kafka_topics="kafkacat -b {} -L",
    kafka_alarms="kafkacat -o end -b {} -C -t voltha.alarms -c 2",
    kafka_kpis="kafkacat -o end -b {} -C -t voltha.kpis -c 5"
)

ALARM_SCHEMA = {
    "type": "object",
    "properties": {
        "id": {"type": "string"},
        "type": {"type": "string"},
        "category": {"type": "string"},
        "state": {"type": "string"},
        "severity": {"type": "string"},
        "resource_id": {"type": "string"},
        "raised_ts": {"type": "number"},
        "reported_ts": {"type": "number"},
        "changed_ts": {"type": "number"},
        "description": {"type": "string"},
        "context": {
            "type": "object",
            "additionalProperties": {"type": "string"}
        }
    }
}


class TestConsulPersistence(RestBase):
    t0 = [time()]

    def pt(self, msg=''):
        t1 = time()
        print '%20.8f ms - %s' % (1000 * (t1 - TestConsulPersistence.t0[0]),
                                  msg)
        TestConsulPersistence.t0[0] = t1

    def test_all_scenarios(self):
        self.basic_scenario()
        self.data_integrity()

    def basic_scenario(self):
        # 1. Setup the test
        #       A. Stop and restart all containers (to start from clean)
        #       B. Setup the REST endpoint
        self.pt('Test setup - starts')
        self._stop_and_remove_all_containers()
        sleep(5)  # A small wait for the system to settle down
        self.start_all_containers()
        self.set_rest_endpoint()
        self.set_kafka_endpoint()
        self.pt('Test setup - ends')

        # 2. Test 1 - Verify no data is present in voltha
        self.pt('Test 1 - starts')
        self.verify_instance_has_no_data()
        self.pt('Test 1 - ends')

        # 3. Test 2 - Verify voltha data is preserved after a restart
        #       A. Add data to voltha
        #       B. Stop voltha instance only (data is in consul)
        #       C. Start a new voltha instance
        #       D. Verify the data is previoulsy set data is in the new voltha
        #           instance
        self.pt('Test 2 - starts')
        self.add_data_to_voltha_instance()
        instance_data_before = self.get_voltha_instance_data()
        self.stop_remove_start_voltha()
        instance_data_after = self.get_voltha_instance_data()
        self.assertEqual(instance_data_before, instance_data_after)
        self.pt('Test 2 - ends')

    def data_integrity(self):
        """
        This test goes through several voltha restarts along with variations 
        of configurations in between to ensure data integrity is preserved.
        
        During this test, the user will be prompted to start ponsim.  Use 
        these commands to run ponsim with 1 OLT and 4 ONUs. THis will also 
        enable alarm at a frequency of 5 seconds:
            sudo -s
            . ./env.sh
            ./ponsim/main.py -v -o 4 -a -f 5

         The user will also be prompted to enable port forwarding on ponmgmt 
         bridge. Use these commands:
            sudo -s
            echo 8 > /sys/class/net/ponmgmt/bridge/group_fwd_mask            
         """

        def prompt(input_func, text):
            val = input_func(text)
            return val

        def prompt_for_return(text):
            return raw_input(text)

        # 1. Setup the test
        #       A. Stop and restart all containers (to start from clean)
        #       B. Setup the REST endpoint
        self.pt('Test setup - starts')
        self._stop_and_remove_all_containers()
        sleep(5)  # A small wait for the system to settle down
        self.start_all_containers()
        self.consume_kafka_message_starting_at = time()
        self.set_rest_endpoint()
        self.set_kafka_endpoint()
        self.pt('Test setup - ends')

        # Get the user to start PONSIM as root
        prompt(prompt_for_return,
               '\nStart PONSIM as root with alarms enabled in another window ...')

        prompt(prompt_for_return,
               '\nEnsure port forwarding is set on ponmgnt ...')

        # 2. Configure some data on the volthainstance
        self.assert_no_device_present()
        olt = self.add_olt_device()
        olt_id = olt['id']
        self.pt(olt_id)
        self.verify_device_preprovisioned_state(olt_id)
        self.enable_device(olt_id)
        ldev_id = self.wait_for_logical_device(olt_id)
        onu_ids = self.wait_for_onu_discovery(olt_id)
        self.verify_logical_ports(ldev_id, 5)
        self.simulate_eapol_flow_install(ldev_id, olt_id, onu_ids)
        self.verify_olt_eapol_flow(olt_id)
        self.assert_kpis_event(olt_id)
        self.assert_alarm_generation(olt_id)
        alarm_filter = self.create_device_filter(olt_id)
        self.consume_kafka_message_starting_at = time()
        self.assert_alarm_generation(olt_id, event_present=False)

        # 3. Kill and restart the voltha instance
        self.assert_restart_voltha_successful()
        self.assert_kpis_event(olt_id)
        self.remove_device_filter(alarm_filter['id'])
        self.assert_alarm_generation(olt_id)

        self.pt('Voltha restart with initial set of data - successful')

        # 4. Ensure we can keep doing operation on the new voltha instance
        # as if nothing happened
        olt_ids, onu_ids = self.get_olt_onu_devices()
        self.disable_device(onu_ids[0])
        self.verify_logical_ports(ldev_id, 4)

        # 5. Kill and restart the voltha instance
        self.assert_restart_voltha_successful()
        self.assert_kpis_event(olt_id)
        alarm_filter = self.create_device_filter(olt_id)
        self.consume_kafka_message_starting_at = time()
        self.assert_alarm_generation(olt_id, event_present=False)
        self.remove_device_filter(alarm_filter['id'])
        self.assert_alarm_generation(olt_id)

        self.pt('Voltha restart with disabled ONU - successful')

        # 6. Do some more operations
        self.enable_device(onu_ids[0])
        self.verify_logical_ports(ldev_id, 5)
        self.simulate_eapol_flow_install(ldev_id, olt_id, onu_ids)
        self.verify_olt_eapol_flow(olt_id)
        self.disable_device(olt_ids[0])
        self.assert_all_onus_state(olt_ids[0], 'DISABLED', 'UNKNOWN')
        self.assert_no_logical_device()

        # 6. Kill and restart the voltha instance
        self.assert_restart_voltha_successful()
        self.assert_kpis_event(olt_id, event_present=False)
        self.assert_alarm_generation(olt_id, event_present=False)

        self.pt('Voltha restart with disabled OLT - successful')

        # 7. Enable OLT and very states of ONUs
        self.enable_device(olt_ids[0])
        self.assert_all_onus_state(olt_ids[0], 'ENABLED', 'ACTIVE')
        self.wait_for_logical_device(olt_ids[0])

        # 8. Kill and restart the voltha instance
        self.assert_restart_voltha_successful()
        self.assert_kpis_event(olt_id)
        self.assert_alarm_generation(olt_id)

        self.pt('Voltha restart with re-enabled OLT - successful')

        # 9. Install EAPOL and disable ONU
        self.simulate_eapol_flow_install(ldev_id, olt_id, onu_ids)
        self.verify_olt_eapol_flow(olt_id)
        self.disable_device(onu_ids[0])

        # 10. Kill and restart the voltha instance
        self.assert_restart_voltha_successful()
        self.assert_kpis_event(olt_id)
        self.assert_alarm_generation(olt_id)

        self.pt('Voltha restart with EAPOL and disabled ONU - successful')

        # 11. Delete the OLT and ONU
        self.delete_device(onu_ids[0])
        self.verify_logical_ports(ldev_id, 4)
        self.disable_device(olt_ids[0])
        self.delete_device(olt_ids[0])
        self.assert_no_device_present()

        # 13. Kill and restart the voltha instance
        self.assert_restart_voltha_successful()
        self.assert_kpis_event(olt_id, event_present=False)
        self.assert_alarm_generation(olt_id, event_present=False)

        self.pt('Voltha restart with no data - successful')

        # 14.  Verify no device present
        self.assert_no_device_present()

    def set_rest_endpoint(self):
        self.rest_endpoint = get_endpoint_from_consul(LOCAL_CONSUL,
                                                      'chameleon-rest')
        self.base_url = 'http://' + self.rest_endpoint

    def set_kafka_endpoint(self):
        self.kafka_endpoint = get_endpoint_from_consul(LOCAL_CONSUL, 'kafka')

    def assert_restart_voltha_successful(self):
        self.maxDiff = None
        instance_data_before = self.get_voltha_instance_data()
        self.stop_remove_start_voltha()
        instance_data_after = self.get_voltha_instance_data()
        self.assertEqual(instance_data_before, instance_data_after)

    def stop_remove_start_voltha(self):
        self.stop_voltha(remove=True)
        self.consume_kafka_message_starting_at = time()
        self.start_voltha()
        # REST endpoint may have changed after a voltha restart
        # Send a basic command to trigger the REST endpoint to refresh itself
        try:
            self.get_devices()
        except Exception as e:
            self.pt('get-devices-fail expected')
        # Wait for everything to settle
        self.wait_till('chameleon service HEALTHY',
                       lambda: verify_all_services_healthy(LOCAL_CONSUL,
                                                           service_name='chameleon-rest') == True,
                       timeout=30)
        # Chameleon takes some time to compile the protos and make them
        # available.  So let's wait 10 seconds
        sleep(10)
        # Update the REST endpoint info
        self.set_rest_endpoint()

    def wait_till(self, msg, predicate, interval=0.1, timeout=5.0):
        deadline = time() + timeout
        while time() < deadline:
            if predicate():
                return
            sleep(interval)
        self.fail('Timed out while waiting for condition: {}'.format(msg))

    def _stop_and_remove_all_containers(self):
        # check if there are any running containers first
        cmd = command_defs['docker_ps']
        out, err, rc = run_command_to_completion_with_stdout_in_list(cmd)
        self.assertEqual(rc, 0)
        if len(out) > 1:  # not counting docker ps header
            cmd = command_defs['docker_stop_and_remove_all_containers']
            out, err, rc = run_command_to_completion_with_raw_stdout(cmd)
            self.assertEqual(rc, 0)

    def start_all_containers(self):
        t0 = time()

        # start all the containers
        self.pt("Starting all containers ...")
        cmd = command_defs['docker_compose_start_all']
        out, err, rc = run_command_to_completion_with_raw_stdout(cmd)
        self.assertEqual(rc, 0)

        self.pt("Waiting for voltha and chameleon containers to be ready ...")
        self.wait_till('voltha services HEALTHY',
                       lambda: verify_all_services_healthy(LOCAL_CONSUL,
                                                           service_name='voltha-grpc') == True,
                       timeout=10)
        self.wait_till('chameleon services HEALTHY',
                       lambda: verify_all_services_healthy(LOCAL_CONSUL,
                                                           service_name='chameleon-rest') == True,
                       timeout=10)

        # Chameleon takes some time to compile the protos and make them
        # available.  So let's wait 10 seconds
        sleep(10)

    def start_voltha(self):
        t0 = time()
        self.pt("Starting voltha ...")
        cmd = command_defs['docker_compose_start_voltha']
        out, err, rc = run_command_to_completion_with_raw_stdout(cmd)
        self.assertEqual(rc, 0)

        self.pt("Waiting for voltha to be ready ...")
        self.wait_till('voltha service HEALTHY',
                       lambda: verify_all_services_healthy(LOCAL_CONSUL,
                                                           service_name='voltha-grpc') == True,
                       timeout=30)
        self.pt("Voltha is ready ...")

    def stop_voltha(self, remove=False):
        t0 = time()
        self.pt("Stopping voltha ...")
        cmd = command_defs['docker_compose_stop_voltha']
        out, err, rc = run_command_to_completion_with_raw_stdout(cmd)
        self.assertEqual(rc, 0)

        if remove:
            cmd = command_defs['docker_compose_remove_voltha']
            out, err, rc = run_command_to_completion_with_raw_stdout(cmd)
            self.assertEqual(rc, 0)

    def get_devices(self):
        devices = self.get('/api/v1/devices')['items']
        return devices

    def get_logical_devices(self):
        ldevices = self.get('/api/v1/logical_devices')['items']
        return ldevices

    def get_adapters(self):
        adapters = self.get('/api/v1/local/adapters')['items']
        return adapters

    def verify_instance_has_no_data(self):
        data = self.get_voltha_instance_data()
        self.assertEqual(data['logical_devices'], [])
        self.assertEqual(data['devices'], [])

    def add_data_to_voltha_instance(self):
        # Preprovision a bunch of ponsim devices
        self.n_olts = 100
        self.olt_devices = {}
        for i in xrange(self.n_olts):
            d = self.add_olt_device()
            self.olt_devices[d['id']] = d

    def get_voltha_instance_data(self):
        return self.get('/api/v1/local', headers={'get-depth': '-1'})

    def add_olt_device(self):
        device = Device(
            type='ponsim_olt',
            host_and_port='172.17.0.1:50060'
        )
        device = self.post('/api/v1/devices', MessageToDict(device),
                           expected_code=200)
        return device

    def get_olt_onu_devices(self):
        devices = self.get('/api/v1/devices')['items']
        olt_ids = []
        onu_ids = []
        for d in devices:
            if d['adapter'] == 'ponsim_olt':
                olt_ids.append(d['id'])
            elif d['adapter'] == 'ponsim_onu':
                onu_ids.append(d['id'])
            else:
                onu_ids.append(d['id'])
        return olt_ids, onu_ids

    def verify_device_preprovisioned_state(self, olt_id):
        # we also check that so far what we read back is same as what we get
        # back on create
        device = self.get('/api/v1/devices/{}'.format(olt_id))
        self.assertNotEqual(device['id'], '')
        self.assertEqual(device['adapter'], 'ponsim_olt')
        self.assertEqual(device['admin_state'], 'PREPROVISIONED')
        self.assertEqual(device['oper_status'], 'UNKNOWN')

    def enable_device(self, olt_id):
        path = '/api/v1/devices/{}'.format(olt_id)
        self.post(path + '/enable', expected_code=200)
        device = self.get(path)
        self.assertEqual(device['admin_state'], 'ENABLED')

        self.wait_till(
            'admin state moves to ACTIVATING or ACTIVE',
            lambda: self.get(path)['oper_status'] in ('ACTIVATING', 'ACTIVE'),
            timeout=0.5)

        # eventually, it shall move to active state and by then we shall have
        # device details filled, connect_state set, and device ports created
        self.wait_till(
            'admin state ACTIVE',
            lambda: self.get(path)['oper_status'] == 'ACTIVE',
            timeout=0.5)
        device = self.get(path)
        self.assertEqual(device['connect_status'], 'REACHABLE')

        ports = self.get(path + '/ports')['items']
        self.assertEqual(len(ports), 2)

    def wait_for_logical_device(self, olt_id):
        # we shall find the logical device id from the parent_id of the olt
        # (root) device
        device = self.get(
            '/api/v1/devices/{}'.format(olt_id))
        self.assertNotEqual(device['parent_id'], '')
        logical_device = self.get(
            '/api/v1/logical_devices/{}'.format(device['parent_id']))

        # the logical device shall be linked back to the hard device,
        # its ports too
        self.assertEqual(logical_device['root_device_id'], device['id'])

        logical_ports = self.get(
            '/api/v1/logical_devices/{}/ports'.format(
                logical_device['id'])
        )['items']
        self.assertGreaterEqual(len(logical_ports), 1)
        logical_port = logical_ports[0]
        self.assertEqual(logical_port['id'], 'nni')
        self.assertEqual(logical_port['ofp_port']['name'], 'nni')
        self.assertEqual(logical_port['ofp_port']['port_no'], 0)
        self.assertEqual(logical_port['device_id'], device['id'])
        self.assertEqual(logical_port['device_port_no'], 2)
        return logical_device['id']

    def find_onus(self, olt_id):
        devices = self.get('/api/v1/devices')['items']
        return [
            d for d in devices
            if d['parent_id'] == olt_id
        ]

    def wait_for_onu_discovery(self, olt_id):
        # shortly after we shall see the discovery of four new onus, linked to
        # the olt device
        self.wait_till(
            'find four ONUs linked to the olt device',
            lambda: len(self.find_onus(olt_id)) >= 4,
            2
        )
        # verify that they are properly set
        onus = self.find_onus(olt_id)
        for onu in onus:
            self.assertEqual(onu['admin_state'], 'ENABLED')
            self.assertEqual(onu['oper_status'], 'ACTIVE')

        return [onu['id'] for onu in onus]

    def assert_all_onus_state(self, olt_id, admin_state, oper_state):
        # verify all onus are in a given state
        onus = self.find_onus(olt_id)
        for onu in onus:
            self.assertEqual(onu['admin_state'], admin_state)
            self.assertEqual(onu['oper_status'], oper_state)

        return [onu['id'] for onu in onus]

    def assert_onu_state(self, onu_id, admin_state, oper_state):
        # Verify the onu states are correctly set
        onu = self.get('/api/v1/devices/{}'.format(onu_id))
        self.assertEqual(onu['admin_state'], admin_state)
        self.assertEqual(onu['oper_status'], oper_state)

    def verify_logical_ports(self, ldev_id, num_ports):

        # at this point we shall see num_ports logical ports on the
        # logical device
        logical_ports = self.get(
            '/api/v1/logical_devices/{}/ports'.format(ldev_id)
        )['items']
        self.assertGreaterEqual(len(logical_ports), num_ports)

        # verify that all logical ports are LIVE (state=4)
        for lport in logical_ports:
            self.assertEqual(lport['ofp_port']['state'], 4)

    def simulate_eapol_flow_install(self, ldev_id, olt_id, onu_ids):

        # emulate the flow mod requests that shall arrive from the SDN
        # controller, one for each ONU
        lports = self.get(
            '/api/v1/logical_devices/{}/ports'.format(ldev_id)
        )['items']

        # device_id -> logical port map, which we will use to construct
        # our flows
        lport_map = dict((lp['device_id'], lp) for lp in lports)
        for onu_id in onu_ids:
            # if eth_type == 0x888e => send to controller
            _in_port = lport_map[onu_id]['ofp_port']['port_no']
            req = ofp.FlowTableUpdate(
                id='ponsim1',
                flow_mod=mk_simple_flow_mod(
                    match_fields=[
                        in_port(_in_port),
                        vlan_vid(ofp.OFPVID_PRESENT | 0),
                        eth_type(0x888e)],
                    actions=[
                        output(ofp.OFPP_CONTROLLER)
                    ],
                    priority=1000
                )
            )
            res = self.post('/api/v1/logical_devices/{}/flows'.format(ldev_id),
                            MessageToDict(req,
                                          preserving_proto_field_name=True),
                            expected_code=200)

        # for sanity, verify that flows are in flow table of logical device
        flows = self.get(
            '/api/v1/logical_devices/{}/flows'.format(ldev_id))['items']
        self.assertGreaterEqual(len(flows), 4)

    def verify_olt_eapol_flow(self, olt_id):
        # olt shall have two flow rules, one is the default and the
        # second is the result of eapol forwarding with rule:
        # if eth_type == 0x888e => push vlan(1000); out_port=nni_port
        flows = self.get('/api/v1/devices/{}/flows'.format(olt_id))['items']
        self.assertEqual(len(flows), 2)
        flow = flows[1]
        self.assertEqual(flow['table_id'], 0)
        self.assertEqual(flow['priority'], 1000)

        # TODO refine this
        # self.assertEqual(flow['match'], {})
        # self.assertEqual(flow['instructions'], [])

    def disable_device(self, id):
        path = '/api/v1/devices/{}'.format(id)
        self.post(path + '/disable', expected_code=200)
        device = self.get(path)
        self.assertEqual(device['admin_state'], 'DISABLED')

        self.wait_till(
            'operational state moves to UNKNOWN',
            lambda: self.get(path)['oper_status'] == 'UNKNOWN',
            timeout=0.5)

        # eventually, the connect_state should be UNREACHABLE
        self.wait_till(
            'connest status UNREACHABLE',
            lambda: self.get(path)['connect_status'] == 'UNREACHABLE',
            timeout=0.5)

        # Device's ports should be INACTIVE
        ports = self.get(path + '/ports')['items']
        self.assertEqual(len(ports), 2)
        for p in ports:
            self.assertEqual(p['admin_state'], 'DISABLED')
            self.assertEqual(p['oper_status'], 'UNKNOWN')

    def delete_device(self, id):
        path = '/api/v1/devices/{}'.format(id)
        self.delete(path + '/delete', expected_code=200)
        device = self.get(path, expected_code=404)
        self.assertIsNone(device)

    def assert_no_device_present(self):
        path = '/api/v1/devices'
        devices = self.get(path)['items']
        self.assertEqual(devices, [])

    def assert_no_logical_device(self):
        path = '/api/v1/logical_devices'
        ld = self.get(path)['items']
        self.assertEqual(ld, [])

    def delete_device_incorrect_state(self, id):
        path = '/api/v1/devices/{}'.format(id)
        self.delete(path + '/delete', expected_code=400)

    def enable_unknown_device(self, id):
        path = '/api/v1/devices/{}'.format(id)
        self.post(path + '/enable', expected_code=404)

    def disable_unknown_device(self, id):
        path = '/api/v1/devices/{}'.format(id)
        self.post(path + '/disable', expected_code=404)

    def delete_unknown_device(self, id):
        path = '/api/v1/devices/{}'.format(id)
        self.delete(path + '/delete', expected_code=404)

    def assert_alarm_generation(self, device_id, event_present=True):
        # The olt device should start generating alarms periodically
        alarm = self.assert_alarm_event(device_id, event_present)

        if event_present:
            self.assertIsNotNone(alarm)
            # Make sure that the schema is valid
            self.assert_alarm_event_schema(alarm)

    # Validate a sample alarm for a specific device
    def assert_alarm_event(self, device_id, event_present=True):
        self.alarm_data = None

        def validate_output(data):
            alarm = simplejson.loads(data)

            if not alarm or \
                            'resource_id' not in alarm or \
                            'reported_ts' not in alarm:
                return False

            # Check for new alarms only
            if alarm['reported_ts'] > self.consume_kafka_message_starting_at:
                if alarm['resource_id'] == device_id:
                    self.alarm_data = alarm
                    return True

        cmd = command_defs['kafka_alarms'].format(self.kafka_endpoint)

        self.run_command_and_wait_until(cmd, validate_output, 30, 'alarms',
                                        expected_predicate_result=event_present)

        return self.alarm_data

    # Validate a sample kpi for a specific device
    def assert_kpis_event(self, device_id, event_present=True):

        def validate_output(data):
            kpis_data = simplejson.loads(data)

            if not kpis_data or \
                            'ts' not in kpis_data or \
                            'prefixes' not in kpis_data:
                return False

            # Check only new kpis
            if kpis_data['ts'] > self.consume_kafka_message_starting_at:
                for key, value in kpis_data['prefixes'].items():
                    if device_id in key:
                        return True
            return False

        cmd = command_defs['kafka_kpis'].format(self.kafka_endpoint)

        self.run_command_and_wait_until(cmd, validate_output, 60, 'kpis',
                                        expected_predicate_result=event_present)

    # Verify that the alarm follows the proper schema structure
    def assert_alarm_event_schema(self, alarm):
        try:
            jsonschema.validate(alarm, ALARM_SCHEMA)
        except Exception as e:
            self.assertTrue(
                False, 'Validation failed for alarm : {}'.format(e.message))

    def create_device_filter(self, device_id):
        rules = list()
        rule = dict()

        # Create a filter with a single rule
        rule['key'] = 'device_id'
        rule['value'] = device_id
        rules.append(rule)

        alarm_filter = AlarmFilter(rules=rules)
        alarm_filter = self.post('/api/v1/local/alarm_filters',
                                 MessageToDict(alarm_filter),
                                 expected_code=200)
        self.assertIsNotNone(alarm_filter)
        return alarm_filter

    def remove_device_filter(self, alarm_filter_id):
        path = '/api/v1/local/alarm_filters/{}'.format(alarm_filter_id)
        self.delete(path, expected_code=200)
        alarm_filter = self.get(path, expected_code=404)
        self.assertIsNone(alarm_filter)

    def run_command_and_wait_until(self, cmd, predicate, timeout, msg,
                                   expected_predicate_result=True):
        # Run until the predicate is True or timeout
        try:
            deadline = time() + timeout
            env = os.environ.copy()
            proc = subprocess.Popen(
                cmd,
                env=env,
                shell=True,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                bufsize=1
            )
            poll_obj = select.poll()
            poll_obj.register(proc.stdout, select.POLLIN)
            while time() < deadline:
                poll_result = poll_obj.poll(0)
                if poll_result:
                    line = proc.stdout.readline()
                    if predicate(line):
                        try:
                            proc.terminate()
                            proc.wait()
                            subprocess.Popen(['reset']).wait()
                        except Exception as e:
                            print "Received exception {} when killing process " \
                                  "started with {}".format(repr(e), cmd)
                        if not expected_predicate_result:
                            self.fail(
                                'Predicate is True but is should be false:{}'.
                                    format(msg))
                        else:
                            return
            try:
                proc.terminate()
                proc.wait()
                subprocess.Popen(['reset']).wait()
            except Exception as e:
                print "Received exception {} when killing process " \
                      "started with {}".format(repr(e), cmd)

            if expected_predicate_result:
                self.fail(
                    'Timed out while waiting for condition: {}'.format(msg))

        except Exception as e:
            print 'Exception {} when running command:{}'.format(repr(e), cmd)
        return
