VOL-182: ONU Remote Reboot support for Broadcom ONU
VOL-551: VOLTHA core needs to replay device flows to adapter during reconcile

Change-Id: I23a1ad136e0be3ac6899a7c033df0e1b070f38fb
diff --git a/tests/utests/voltha/core/test_proxy_child_msg_subscription.py b/tests/utests/voltha/core/test_proxy_child_msg_subscription.py
new file mode 100644
index 0000000..299867a
--- /dev/null
+++ b/tests/utests/voltha/core/test_proxy_child_msg_subscription.py
@@ -0,0 +1,144 @@
+from unittest import main, TestCase
+from mock import Mock, patch
+from voltha.main import Main
+import voltha.core.device_agent
+from voltha.core.flow_decomposer import *
+from voltha.core.adapter_agent import AdapterAgent
+from voltha.core.core import VolthaCore
+from voltha.adapters.loader import AdapterLoader
+from voltha.registry import registry, IComponent
+from voltha.protos import third_party
+from voltha.protos.device_pb2 import Device, Port, DeviceType
+from voltha.protos.logical_device_pb2 import LogicalDevice, LogicalPort
+from voltha.protos.openflow_13_pb2 import Flows, FlowGroups
+from voltha.protos.common_pb2 import AdminState, LogLevel
+from twisted.internet import defer
+
+
+class test_proxy_child_msg_subscription(TestCase):
+
+    def setUp(self):
+        registry.register(
+                          'core',
+                             VolthaCore(
+                              instance_id=1,
+                              core_store_id=1,
+                              grpc_port=50060,
+                              version="1",
+                              log_level=LogLevel.INFO
+                             )
+                          ).start()
+
+        self.adapter_agent_ont = AdapterAgent("broadcom_onu", "BroadcomOnuAdapter")
+        self.adapter_agent_olt = AdapterAgent("asfvolt16_olt", "Asfvolt16Adapter")
+
+        # create and update the core with Broadcom ONU device type
+        self.onu_device_type =  DeviceType(
+                                  id='broadcom_onu',
+                                  vendor_id='BRCM',
+                                  adapter='broadcom_onu',
+                                  accepts_bulk_flow_update=True
+                                )
+
+        # create and update the core with Broadcom ONU device type
+        self.olt_device_type =  DeviceType(
+                                  id='asfvolt16_olt',
+                                  vendor_id='Edgecore',
+                                  adapter='asfvolt16_olt',
+                                  accepts_bulk_flow_update=True
+                                )
+
+        self.adapter_agent_ont._make_up_to_date('/device_types', 'broadcom_onu', self.onu_device_type)
+        self.adapter_agent_olt._make_up_to_date('/device_types', 'asfvolt16_olt', self.olt_device_type)
+
+    def tearDown(self):
+        self.adapter_agent_ont._remove_node('/device_types', self.onu_device_type)
+        self.adapter_agent_olt._remove_node('/device_types', self.olt_device_type)
+        del self.onu_device_type
+        del self.olt_device_type
+        del self.adapter_agent_ont
+        del self.adapter_agent_olt
+        registry.unregister('core')
+
+
+    # ~~~~~~~~~~~~~~~~~~~ TEST ~~~~~~~~~~~~~~~~~~~~~~
+
+    @patch('voltha.core.device_agent.DeviceAgent._set_adapter_agent', return_value='adapter_name')
+    @patch('voltha.core.device_agent.DeviceAgent._delete_device', return_value=defer.Deferred())
+    def test_subsribe_to_proxy_child_messages(self, mock_set_adapter_agent, mock_delete_device):
+
+        # Add OLT
+        olt_device = Device(id='olt', root=True, parent_id='id', type='asfvolt16_olt')
+        self.adapter_agent_olt.add_device(olt_device)
+
+        # Initially when no ONUs are attached to the OLT, the tx_event_subscriptions
+        # should be 0
+        self.assertEqual(len(self.adapter_agent_olt._tx_event_subscriptions), 0)
+
+        # Add 1st ONU to the OLT
+        onu1_proxy_address = Device.ProxyAddress(
+                                      device_id='olt',
+                                      channel_id=1,
+                                      onu_id=1,
+                                      onu_session_id=1
+                                     )
+        self.adapter_agent_olt.add_onu_device('olt',
+                                           1,
+                                          "BRCM",
+                                          onu1_proxy_address,
+                                          AdminState.UNKNOWN)
+
+        # The tx_event_subscriptions should increment to 1 after adding 1st ONU
+        self.assertEqual(len(self.adapter_agent_olt._tx_event_subscriptions), 1)
+
+        # Add 2nd ONU to the OLT
+        onu2_proxy_address = Device.ProxyAddress(
+                                      device_id='olt',
+                                      channel_id=2,
+                                      onu_id=2,
+                                      onu_session_id=2
+                                     )
+        self.adapter_agent_olt.add_onu_device('olt',
+                                           1,
+                                          "BRCM",
+                                          onu2_proxy_address,
+                                          AdminState.UNKNOWN)
+
+        # The tx_event_subscriptions should increment to 2 after adding 2nd ONU
+        self.assertEqual(len(self.adapter_agent_olt._tx_event_subscriptions), 2)
+
+        # Remove one ONU
+        children = self.adapter_agent_olt.get_child_devices('olt')
+        self.assertEqual(len(children), 2)
+        for child in children:
+            self.adapter_agent_olt.delete_child_device('olt', child.id)
+            break
+
+        # The tx_event_subscriptions should decrement to 1 after removing one ONU
+        self.assertEqual(len(self.adapter_agent_olt._tx_event_subscriptions), 1)
+
+        # Add new ONU to the OLT. The total ONUs on the OLT are now 2
+        onu3_proxy_address = Device.ProxyAddress(
+                                      device_id='olt',
+                                      channel_id=3,
+                                      onu_id=3,
+                                      onu_session_id=3
+                                     )
+        self.adapter_agent_olt.add_onu_device('olt',
+                                           1,
+                                          "BRCM",
+                                          onu3_proxy_address,
+                                          AdminState.UNKNOWN)
+
+        # The tx_event_subscriptions should increment to 2 after adding another ONU
+        self.assertEqual(len(self.adapter_agent_olt._tx_event_subscriptions), 2)
+
+        # delete all child devices (ONUs)
+        self.adapter_agent_olt.delete_all_child_devices('olt')
+
+        # There should be no tx_event_subscriptions after deleting all child devices (ONUs)
+        self.assertEqual(len(self.adapter_agent_olt._tx_event_subscriptions), 0)
+
+
+if __name__ == '__main__':
+    main()
diff --git a/tests/utests/voltha/extensions/omci/test_omci.py b/tests/utests/voltha/extensions/omci/test_omci.py
index 1fa4397..49fa9b9 100644
--- a/tests/utests/voltha/extensions/omci/test_omci.py
+++ b/tests/utests/voltha/extensions/omci/test_omci.py
@@ -1125,6 +1125,20 @@
                     '%s: %s' % (k, v) for k, v in omci.object_data.items())
             )
 
+    def test_onu_reboot(self):
+        ref = '0016590a01000000000000000000000000000'\
+              '0000000000000000000000000000000000000'\
+              '00000000000028'
+
+        frame = OmciFrame(
+            transaction_id=22,
+            message_type=OmciReboot.message_id,
+            omci_message=OmciReboot(
+                entity_class=OntG.class_id,
+                 entity_id=0
+            )
+        )
+        self.assertGeneratedFrameEquals(frame, ref)
 
 if __name__ == '__main__':
     main()
diff --git a/voltha/adapters/asfvolt16_olt/asfvolt16_device_handler.py b/voltha/adapters/asfvolt16_olt/asfvolt16_device_handler.py
index e0b901d..6b40a5f 100644
--- a/voltha/adapters/asfvolt16_olt/asfvolt16_device_handler.py
+++ b/voltha/adapters/asfvolt16_olt/asfvolt16_device_handler.py
@@ -1235,7 +1235,7 @@
             else:
                 self.log.info('Onu-is-not-configured', olt_id=self.olt_id,
                               intf_id=onu_device.proxy_address.channel_id,
-                              onu_data=onu_device.proxy_address.onu_id)
+                              onu_id=onu_device.proxy_address.onu_id)
             if tcont_data.name in v_ont_ani.tconts:
                 self.log.info('tcont-info-already-present',
                               tcont_info=tcont_data)
@@ -1259,6 +1259,8 @@
             # find way to generate uninqe number.
             id = tcont_data.alloc_id
             self.bal.delete_scheduler(id, 'upstream')
+            self.log.info('tcont-deleted-successfully',
+                          tcont_name=tcont_data.name)
             if tcont_data.name in v_ont_ani.tconts:
                 del v_ont_ani.tconts[tcont_data.name]
 
diff --git a/voltha/adapters/broadcom_onu/broadcom_onu.py b/voltha/adapters/broadcom_onu/broadcom_onu.py
index d421690..db5b940 100644
--- a/voltha/adapters/broadcom_onu/broadcom_onu.py
+++ b/voltha/adapters/broadcom_onu/broadcom_onu.py
@@ -21,7 +21,7 @@
 from uuid import uuid4
 import structlog
 from twisted.internet import reactor, task
-from twisted.internet.defer import DeferredQueue, inlineCallbacks
+from twisted.internet.defer import DeferredQueue, inlineCallbacks, returnValue
 from zope.interface import implementer
 
 from voltha.adapters.interface import IAdapterInterface
@@ -37,7 +37,7 @@
 from voltha.protos.logical_device_pb2 import LogicalPort
 from voltha.protos.openflow_13_pb2 import OFPPS_LIVE, OFPPF_FIBER, OFPPF_1GB_FD, OFPPS_LINK_DOWN
 from voltha.protos.openflow_13_pb2 import OFPXMC_OPENFLOW_BASIC, ofp_port
-from voltha.protos.bbf_fiber_base_pb2 import VEnetConfig
+from voltha.protos.bbf_fiber_base_pb2 import VEnetConfig, VOntaniConfig
 from voltha.protos.bbf_fiber_traffic_descriptor_profile_body_pb2 import \
     TrafficDescriptorProfileData
 from voltha.protos.bbf_fiber_tcont_body_pb2 import TcontsConfigData
@@ -74,7 +74,7 @@
         self.descriptor = Adapter(
             id=self.name,
             vendor='Voltha project',
-            version='0.43',
+            version='0.44',
             config=AdapterConfig(log_level=LogLevel.INFO)
         )
         self.devices_handlers = dict()  # device_id -> BroadcomOnuHandler()
@@ -131,7 +131,11 @@
                 handler.reenable(device)
 
     def reboot_device(self, device):
-        raise NotImplementedError()
+        log.info('reboot-device', device_id=device.id)
+        if device.id in self.devices_handlers:
+            handler = self.devices_handlers[device.id]
+            if handler is not None:
+                handler.reboot()
 
     def download_image(self, device, request):
         raise NotImplementedError()
@@ -213,6 +217,8 @@
         if device:
             handler = self.devices_handlers[device.id]
             handler.event_messages.put(msg)
+        else:
+            log.error("device-not-found")
 
     def create_interface(self, device, data):
         log.info('create-interface', device_id=device.id)
@@ -351,11 +357,12 @@
             device.oper_status = OperStatus.DISCOVERED
             self.adapter_agent.update_device(device)
 
-        elif (event_msg['event'] == 'deactivate-onu'):
+        elif event_msg['event'] == 'deactivate-onu':
             device = self.adapter_agent.get_device(self.device_id)
             device.connect_status = ConnectStatus.UNREACHABLE
             device.oper_status = OperStatus.DISCOVERED
             self.adapter_agent.update_device(device)
+            self.disable_ports(device)
 
         elif event_msg['event'] == 'ranging-completed':
 
@@ -421,7 +428,7 @@
 
     def reconcile(self, device):
 
-        log.info('reconciling-broadcom-onu-device-starts')
+        self.log.info('reconciling-broadcom-onu-device-starts')
 
         # first we verify that we got parent reference and proxy info
         assert device.parent_id
@@ -434,7 +441,7 @@
         # TODO: Query ONU current status after reconcile and update.
         #       To be addressed in future commits.
 
-        log.info('reconciling-broadcom-onu-device-ends')
+        self.log.info('reconciling-broadcom-onu-device-ends')
 
     def update_logical_port(self, logical_device_id, port_id, state):
         self.log.info('updating-logical-port', logical_port_id=port_id,
@@ -445,24 +452,10 @@
         self.adapter_agent.update_logical_port(logical_device_id,
                                                logical_port)
 
-    @inlineCallbacks
     def delete(self, device):
         self.log.info('delete-onu')
-
-        # construct message
-        # MIB Reset - OntData - 0
-        self.send_mib_reset()
-        yield self.wait_for_response()
-
-        self.proxy_address = device.proxy_address
-        self.adapter_agent.unregister_for_proxied_messages(device.proxy_address)
-
-        ports = self.adapter_agent.get_ports(self.device_id, Port.PON_ONU)
-        if ports is not None:
-            for port in ports:
-                if port.label == 'PON port':
-                    self.adapter_agent.delete_port(self.device_id, port)
-                    break
+        # The device is already deleted in delete_v_ont_ani(). No more
+        # handling needed here
 
     @inlineCallbacks
     def update_flow_table(self, device, flows):
@@ -574,7 +567,7 @@
 
                     elif action.type == fd.PUSH_VLAN:
                         _push_tpid = action.push.ethertype
-                        log.info('action-type-push-vlan',
+                        self.log.info('action-type-push-vlan',
                                  push_tpid=_push_tpid, in_port=_in_port)
                         if action.push.ethertype != 0x8100:
                             self.log.error('unhandled-tpid',
@@ -593,7 +586,7 @@
                             self.log.error('unsupported-action-set-field-type',
                                            field_type=_field.type)
                     else:
-                        log.error('unsupported-action-type',
+                        self.log.error('unsupported-action-type',
                                   action_type=action.type, in_port=_in_port)
 
                 #
@@ -627,7 +620,7 @@
                     '''
 
             except Exception as e:
-                log.exception('failed-to-install-flow', e=e, flow=flow)
+                self.log.exception('failed-to-install-flow', e=e, flow=flow)
 
     def get_tx_id(self):
         self.tx_id += 1
@@ -881,7 +874,7 @@
                 entity_class=MacBridgePortConfigurationData.class_id,
                 entity_id=entity_id,
                 data=dict(
-                    bridge_id_pointer = bridge_id,
+                    bridge_id_pointer=bridge_id,
                     port_num=port_id,
                     tp_type=tp_type,
                     tp_pointer=tp_id
@@ -1294,20 +1287,33 @@
         )
         self.send_omci_message(frame)
 
+    def send_reboot(self):
+        frame = OmciFrame(
+            transaction_id=self.get_tx_id(),
+            message_type=OmciReboot.message_id,
+            omci_message=OmciReboot(
+                entity_class=OntG.class_id,
+                entity_id=0
+            )
+        )
+        self.send_omci_message(frame)
+
     @inlineCallbacks
     def wait_for_response(self):
-        log.info('wait-for-response')
+        self.log.info('wait-for-response')
         try:
             response = yield self.incoming_messages.get()
-            log.info('got-response')
-            # resp = OmciFrame(response)
-            # resp.show()
+            self.log.info('got-response')
+            resp = OmciFrame(response)
+            resp.show()
+            returnValue(resp)
         except Exception as e:
+            returnValue(None)
             self.log.info('wait-for-response-exception', exc=str(e))
 
     @inlineCallbacks
     def message_exchange(self, onu, gem, cvid):
-        log.info('message_exchange', onu=onu, gem=gem, cvid=cvid)
+        self.log.info('message_exchange', onu=onu, gem=gem, cvid=cvid)
         # reset incoming message queue
         while self.incoming_messages.pending:
             _ = yield self.incoming_messages.get()
@@ -1506,6 +1512,30 @@
         self.adapter_agent.delete_logical_port_by_id(parent_logical_device_id,
                                                      'uni-{}'.format(port_no))
 
+
+    @inlineCallbacks
+    def delete_v_ont_ani(self, data):
+        self.log.info('deleting-v_ont_ani')
+
+        device = self.adapter_agent.get_device(self.device_id)
+        # construct message
+        # MIB Reset - OntData - 0
+        if device.connect_status != ConnectStatus.REACHABLE:
+            self.log.error('device-unreachable')
+            returnValue(None)
+
+        self.send_mib_reset()
+        yield self.wait_for_response()
+        self.proxy_address = device.proxy_address
+        self.adapter_agent.unregister_for_proxied_messages(device.proxy_address)
+
+        ports = self.adapter_agent.get_ports(self.device_id, Port.PON_ONU)
+        if ports is not None:
+            for port in ports:
+                if port.label == 'PON port':
+                    self.adapter_agent.delete_port(self.device_id, port)
+                    break
+
     def create_interface(self, data):
         if isinstance(data, VEnetConfig):
             parent_port_num = None
@@ -1520,7 +1550,7 @@
             parent_device = self.adapter_agent.get_device(onu_device.parent_id)
             logical_device_id = parent_device.parent_id
             assert logical_device_id
-            self.add_uni_port(onu_device, logical_device_id, 
+            self.add_uni_port(onu_device, logical_device_id,
                               data.name, parent_port_num)
 
             if parent_port_num is None:
@@ -1544,11 +1574,11 @@
             self.adapter_agent.add_port_reference_to_parent(self.device_id,
                                                             pon_port)
         else:
-            self.log.info('Not handled Yet')
+            self.log.info('Not-handled-Yet')
         return
 
     def update_interface(self, data):
-        self.log.info('Not Implemented yet')
+        self.log.info('Not-Implemented-yet')
         return
 
     def remove_interface(self, data):
@@ -1567,17 +1597,19 @@
             assert logical_device_id
             self.del_uni_port(onu_device, logical_device_id,
                               data.name, parent_port_num)
+        if isinstance(data, VOntaniConfig):
+            self.delete_v_ont_ani(data)
         else:
-            self.log.info('Not handled Yet')
+            self.log.info('not-handled-yet')
         return
 
     @inlineCallbacks
     def create_gemport(self, data):
-        log.info('create-gemport')
-	gem_port= GemportsConfigData()
-	gem_port.CopyFrom(data)
+        self.log.info('create-gemport')
+        gem_port = GemportsConfigData()
+        gem_port.CopyFrom(data)
         if gem_port.tcont_ref is None:
-            self.log.info('Recevied NULL Gem Port Data')
+            self.log.error('recevied-null-gem-port-data')
         else:
             #To-Do Need to see how the valuse 0x8001 is derived
             self.send_create_gem_port_network_ctp(gem_port.gemport_id,
@@ -1603,37 +1635,47 @@
 
     @inlineCallbacks
     def remove_gemport(self, data):
-        log.info('remove-gemport')
-        gem_port= GemportsConfigData()
+        self.log.info('remove-gemport')
+        gem_port = GemportsConfigData()
         gem_port.CopyFrom(data)
-        self.send_set_8021p_mapper_service_profile(0x8001, 0xFFFF)
+        device = self.adapter_agent.get_device(self.device_id)
+        if device.connect_status != ConnectStatus.REACHABLE:
+            self.log.error('device-unreachable')
+            returnValue(None)
 
+        self.send_set_8021p_mapper_service_profile(0x8001,
+                                                  0xFFFF)
         yield self.wait_for_response()
 
         self.send_delete_omci_mesage(GemInterworkingTp.class_id,
-                                     gem_port.gemport_id)
+                                    gem_port.gemport_id)
         yield self.wait_for_response()
 
         #To-Do Need to see how the valuse 0x8001 is derived
         self.send_delete_omci_mesage(GemPortNetworkCtp.class_id,
-                                     gem_port.gemport_id)
+                                gem_port.gemport_id)
         yield self.wait_for_response()
 
     @inlineCallbacks
     def create_tcont(self, tcont_data, traffic_descriptor_data):
-        log.info('create-tcont')
-	tcont = TcontsConfigData()
+        self.log.info('create-tcont')
+        tcont = TcontsConfigData()
         tcont.CopyFrom(tcont_data)
-        if (tcont.interface_reference is not None):
-                self.log.info('tcont created is', tcont= tcont.alloc_id)
-                self.send_set_tcont(0x8001, tcont.alloc_id)
-                yield self.wait_for_response()
-	else:
-            self.log.info('Recevied NULL tcont Data', tcont= tcont.alloc_id)
+        if tcont.interface_reference is not None:
+            self.log.debug('tcont', tcont=tcont.alloc_id)
+            self.send_set_tcont(0x8001, tcont.alloc_id)
+            yield self.wait_for_response()
+        else:
+            self.log.info('recevied-null-tcont-data', tcont=tcont.alloc_id)
 
     @inlineCallbacks
     def remove_tcont(self, tcont_data, traffic_descriptor_data):
-        log.info('remove-tcont')
+        self.log.info('remove-tcont')
+        device = self.adapter_agent.get_device(self.device_id)
+        if device.connect_status != ConnectStatus.REACHABLE:
+            self.log.error('device-unreachable')
+            returnValue(None)
+
         self.send_set_tcont(0x8001, 0xFFFF)
         yield self.wait_for_response()
 
@@ -1644,6 +1686,7 @@
     def disable(self, device):
         try:
             self.log.info('sending-admin-state-lock-towards-device', device=device)
+
             self.send_set_admin_state(0x0000, ADMIN_STATE_LOCK)
             yield self.wait_for_response()
             device = self.adapter_agent.get_device(device.id)
@@ -1688,3 +1731,43 @@
         except Exception as e:
             log.exception('exception-in-onu-reenable', exception=e)
 
+    @inlineCallbacks
+    def reboot(self):
+        self.log.info('reboot-device')
+        device = self.adapter_agent.get_device(self.device_id)
+        if device.connect_status != ConnectStatus.REACHABLE:
+            self.log.error("device-unreacable")
+            returnValue(None)
+
+        self.send_reboot()
+        response = yield self.wait_for_response()
+        if response is not None:
+            omci_response = response.getfieldval("omci_message")
+            success_code = omci_response.getfieldval("success_code")
+            if success_code == 0:
+                self.log.info("reboot-command-processed-successfully")
+                # Update the device connection and operation status
+                device = self.adapter_agent.get_device(self.device_id)
+                device.connect_status = ConnectStatus.UNREACHABLE
+                device.oper_status = OperStatus.DISCOVERED
+                self.adapter_agent.update_device(device)
+                self.disable_ports(device)
+            else:
+                self.log.info("reboot-failed", success_code=success_code)
+        else:
+            self.log.info("error-in-processing-reboot-response")
+
+    def disable_ports(self, onu_device):
+        self.log.info('disable-ports', device_id=self.device_id)
+
+        # Disable all ports on that device
+        self.adapter_agent.disable_all_ports(self.device_id)
+
+        parent_device = self.adapter_agent.get_device(onu_device.parent_id)
+        assert parent_device
+        logical_device_id = parent_device.parent_id
+        assert logical_device_id
+        ports = self.adapter_agent.get_ports(onu_device.id, Port.ETHERNET_UNI)
+        for port in ports:
+            port_id = 'uni-{}'.format(port.port_no)
+            self.update_logical_port(logical_device_id, port_id, OFPPS_LINK_DOWN)
diff --git a/voltha/core/adapter_agent.py b/voltha/core/adapter_agent.py
index bd10a18..21dddec 100644
--- a/voltha/core/adapter_agent.py
+++ b/voltha/core/adapter_agent.py
@@ -735,7 +735,14 @@
     def delete_all_child_devices(self, parent_device_id):
         """ Remove all ONUs from a given OLT """
         devices = self.root_proxy.get('/devices')
-        children_ids = set(d.id for d in devices if d.parent_id == parent_device_id)
+        children_ids = set()
+        for device in devices:
+            if device.parent_id == parent_device_id:
+                children_ids.add(device.id)
+                topic = self._gen_tx_proxy_address_topic(device.proxy_address)
+                self.event_bus.unsubscribe(self._tx_event_subscriptions[topic])
+                del self._tx_event_subscriptions[topic]
+
         self.log.debug('devices-to-delete',
                        parent_id=parent_device_id,
                        children_ids=children_ids)
@@ -772,7 +779,12 @@
         onu_device = self.root_proxy.get('/devices/{}'.format(child_device_id))
         if onu_device is not None:
             if onu_device.parent_id == parent_device_id:
-                self.log.debug('deleting-child-device', parent_device_id=parent_device_id, child_device_id=child_device_id)
+                self.log.debug('deleting-child-device',
+                   parent_device_id=parent_device_id,
+                   child_device_id=child_device_id)
+                topic = self._gen_tx_proxy_address_topic(onu_device.proxy_address)
+                self.event_bus.unsubscribe(self._tx_event_subscriptions[topic])
+                del self._tx_event_subscriptions[topic]
                 self._remove_node('/devices', child_device_id)
 
     def _gen_rx_proxy_address_topic(self, proxy_address):
diff --git a/voltha/core/core.py b/voltha/core/core.py
index 1b01f62..47cb4d1 100644
--- a/voltha/core/core.py
+++ b/voltha/core/core.py
@@ -222,7 +222,7 @@
             if self.alarm_filter_agent is not None:
                 self.alarm_filter_agent.remove_device_filters(device)
 
-            log.debug('removed-device-filter', device)
+            log.debug('removed-device-filter', device=device)
 
             yield self.device_agents[device.id].stop(device)
             del self.device_agents[device.id]
diff --git a/voltha/core/device_agent.py b/voltha/core/device_agent.py
index 224e8ec..129390b 100644
--- a/voltha/core/device_agent.py
+++ b/voltha/core/device_agent.py
@@ -303,12 +303,23 @@
             self.update_device(device)
             yield self.adapter_agent.adopt_device(device)
 
+    @inlineCallbacks
     def update_device(self, device):
         self.last_data = device  # so that we don't propagate back
         self.proxy.update('/', device)
         if device.oper_status == OperStatus.ACTIVE and device.connect_status == ConnectStatus.REACHABLE:
             self.log.info('replay-create-interfaces ', device=device.id)
             self.core.xpon_agent.replay_interface(device.id)
+            # if device accepts bulk flow update, lets just call that
+            if self.device_type.accepts_bulk_flow_update:
+                flows = self.flows_proxy.get('/')  # gather flows
+                groups = self.groups_proxy.get('/') # gather flow groups
+                self.log.info('replay-flows ', device=device.id)
+                yield self.adapter_agent.update_flows_bulk(
+                    device=device,
+                    flows=flows,
+                    groups=groups)
+
 
     def update_device_pm_config(self, device_pm_config, init=False):
         self.callback_data = init# so that we don't push init data
diff --git a/voltha/extensions/omci/omci_frame.py b/voltha/extensions/omci/omci_frame.py
index 28dc298..139a862 100644
--- a/voltha/extensions/omci/omci_frame.py
+++ b/voltha/extensions/omci/omci_frame.py
@@ -24,7 +24,8 @@
     OmciMibResetResponse, OmciMibReset, OmciMibUploadNextResponse, \
     OmciMibUploadNext, OmciMibUploadResponse, OmciMibUpload, \
     OmciGetAllAlarmsNextResponse, OmciAttributeValueChange, \
-    OmciTestResult, OmciAlarmNotification
+    OmciTestResult, OmciAlarmNotification, \
+    OmciReboot, OmciRebootResponse
 from voltha.extensions.omci.omci_messages import OmciCreateResponse
 
 
@@ -106,8 +107,14 @@
             PacketField("omci_message", None, OmciTestResult), align=36),
             lambda pkt: pkt.message_type == OmciTestResult.message_id),
 
-        # TODO add entries for remaining OMCI message types
+        ConditionalField(FixedLenField(
+            PacketField("omci_message", None, OmciReboot), align=36),
+            lambda pkt: pkt.message_type == OmciReboot.message_id),
+        ConditionalField(FixedLenField(
+            PacketField("omci_message", None, OmciRebootResponse), align=36),
+            lambda pkt: pkt.message_type == OmciRebootResponse.message_id),
 
+        # TODO add entries for remaining OMCI message types
 
         IntField("omci_trailer", 0x00000028)
     ]
diff --git a/voltha/extensions/omci/omci_messages.py b/voltha/extensions/omci/omci_messages.py
index e728bff..65e291a 100644
--- a/voltha/extensions/omci/omci_messages.py
+++ b/voltha/extensions/omci/omci_messages.py
@@ -331,3 +331,23 @@
         # ME Test specific message contents starts here
         # TODO: Can this be coded easily with scapy?
     ]
+
+
+class OmciReboot(OmciMessage):
+    name = "OmciOnuReboot"
+    message_id = 0x59
+    fields_desc = [
+        ShortField("entity_class", None),
+        ShortField("entity_id", 0),
+        ByteField("reboot_code", 0)
+    ]
+
+
+class OmciRebootResponse(OmciMessage):
+    name = "OmciOnuRebootResponse"
+    message_id = 0x39
+    fields_desc = [
+        ShortField("entity_class", None),
+        ShortField("entity_id", 0),
+        ByteField("success_code", 0)
+    ]