VOL-1076: CIG OLT Adapter Integration
Add CIG OLT Adapter with new proto files
Change-Id: Ie3d9d9d7544bd8884585bdf334150c249a4cdff3
diff --git a/docker/Dockerfile.protos b/docker/Dockerfile.protos
index 94bd4ed..db1470d 100644
--- a/docker/Dockerfile.protos
+++ b/docker/Dockerfile.protos
@@ -40,6 +40,11 @@
WORKDIR /protos/openolt
RUN make -f Makefile.protos build
+COPY voltha/adapters/cig_olt/protos/*.proto /protos/cig_olt/
+COPY voltha/adapters/cig_olt/protos/Makefile.protos /protos/cig_olt/Makefile.protos
+WORKDIR /protos/cig_olt
+RUN make -f Makefile.protos build
+
# Copy the files to a scrach based container to minimize its size
FROM ${REGISTRY}scratch
COPY --from=builder /protos/ /protos/
diff --git a/docker/Dockerfile.test_runner b/docker/Dockerfile.test_runner
index 72ea3d2..9743dc4 100644
--- a/docker/Dockerfile.test_runner
+++ b/docker/Dockerfile.test_runner
@@ -32,6 +32,7 @@
COPY --from=protos /protos/google/api /work/voltha/protos/third_party/google/api
COPY --from=protos /protos/asfvolt16_olt /work/voltha/adapters/asfvolt16_olt/protos
+COPY --from=protos /protos/cig_olt /work/voltha/protos
COPY --from=protos /protos/voltha /work/ofagent/protos
COPY --from=protos /protos/google/api /work/ofagent/protos/third_party/google/api
diff --git a/docker/Dockerfile.voltha_d b/docker/Dockerfile.voltha_d
index a8ab938..b65c9af 100644
--- a/docker/Dockerfile.voltha_d
+++ b/docker/Dockerfile.voltha_d
@@ -32,6 +32,7 @@
COPY --from=protos /protos/google/api /voltha/voltha/protos/third_party/google/api
COPY --from=protos /protos/asfvolt16_olt /voltha/voltha/adapters/asfvolt16_olt/protos
COPY --from=protos /protos/openolt /voltha/voltha/adapters/openolt/protos
+COPY --from=protos /protos/cig_olt /voltha/voltha/protos
# Exposing process and default entry point
# EXPOSE 8000
diff --git a/voltha/adapters/cig_olt/README.md b/voltha/adapters/cig_olt/README.md
new file mode 100644
index 0000000..a32ef05
--- /dev/null
+++ b/voltha/adapters/cig_olt/README.md
@@ -0,0 +1,2 @@
+# CIG OLT Device Adapter
+
diff --git a/voltha/adapters/cig_olt/__init__.py b/voltha/adapters/cig_olt/__init__.py
new file mode 100644
index 0000000..b0fb0b2
--- /dev/null
+++ b/voltha/adapters/cig_olt/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/voltha/adapters/cig_olt/cig_olt.py b/voltha/adapters/cig_olt/cig_olt.py
new file mode 100644
index 0000000..37ec5da
--- /dev/null
+++ b/voltha/adapters/cig_olt/cig_olt.py
@@ -0,0 +1,680 @@
+#
+# Copyright 2017-present CIG, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Cig 1-U OLT adapter.
+"""
+import structlog
+from twisted.internet import reactor,defer
+from zope.interface import implementer
+from voltha.adapters.iadapter import OltAdapter
+
+from cig_olt_handler import CigOltHandler
+from voltha.adapters.interface import IAdapterInterface
+from voltha.protos import third_party
+from voltha.protos.adapter_pb2 import Adapter
+from voltha.protos.adapter_pb2 import AdapterConfig
+from voltha.protos.common_pb2 import LogLevel
+from voltha.protos.device_pb2 import DeviceType, DeviceTypes
+from voltha.protos.health_pb2 import HealthStatus
+from voltha.registry import registry
+
+_ = third_party
+log = structlog.get_logger()
+
+
+#@implementer(IAdapterInterface)
+class CigOltAdapter(OltAdapter):
+
+ supported_device_types = [
+ DeviceType(
+ id='cig_olt',
+ adapter='cig_olt',
+ accepts_bulk_flow_update=True
+ )
+ ]
+
+ def __init__(self, adapter_agent, config):
+ super(CigOltAdapter, self).__init__(adapter_agent=adapter_agent,
+ config=config,
+ device_handler_class = CigOltHandler,
+ name='cig_olt',
+ vendor='CIG Tech',
+ version='0.11',
+ device_type='cig_olt')
+
+ #self.adapter_agent = adapter_agent
+ #self.config = config
+ #self.descriptor = Adapter(
+ #id='cig_olt',
+ #vendor='Voltha project',
+ #version='0.4',
+ #config=AdapterConfig(log_level=LogLevel.INFO)
+ #)
+ self.devices_handlers = dict()
+ self.logical_device_id_to_root_device_id = dict()
+
+ # register for adapter messages
+ self.adapter_agent.register_for_inter_adapter_messages()
+
+
+ def start(self):
+ """
+ Called once after adapter instance is loaded. Can be used to async
+ initialization.
+
+ :return: (None or Deferred)
+ """
+ log.debug('cig starting')
+ log.info('cig started')
+
+ def stop(self):
+ """
+ Called once before adapter is unloaded. It can be used to perform
+ any cleanup after the adapter.
+
+ :return: (None or Deferred)
+ """
+ log.info('stopped')
+
+ def adapter_descriptor(self):
+ """
+ Return the adapter descriptor object for this adapter.
+
+ :return: voltha.Adapter grpc object (see voltha/protos/adapter.proto),
+ with adapter-specific information and config extensions.
+ """
+ log.debug('get descriptor')
+ return self.descriptor
+
+ def device_types(self):
+ """
+ Return list of device types supported by the adapter.
+
+ :return: voltha.DeviceTypes protobuf object, with optional type
+ specific extensions.
+ """
+ log.debug('get device_types', items=self.supported_device_types)
+ return DeviceTypes(items=self.supported_device_types)
+
+ def health(self):
+ """
+ Return a 3-state health status using the voltha.HealthStatus message.
+
+ :return: Deferred or direct return with voltha.HealthStatus message
+ """
+ log.debug('get health')
+ return HealthStatus(state=HealthStatus.HealthState.HEALTHY)
+
+ def change_master_state(self, master):
+ """
+ Called to indicate if plugin shall assume or lose master role. The
+ master role can be used to perform functions that must be performed
+ from a single point in the cluster. In single-node deployments of
+ Voltha, the plugins are always in master role.
+
+ :param master: (bool) True to indicate the mastership needs to be
+ assumed; False to indicate that mastership needs to be abandoned.
+ :return: (Deferred) which is fired by the adapter when mastership is
+ assumed/dropped, respectively.
+ """
+ log.debug('change_master_state', master=master)
+ raise NotImplementedError()
+
+ def adopt_device(self, device):
+ """
+ Make sure the adapter looks after given device. Called when a device
+ is provisioned top-down and needs to be activated by the adapter.
+
+ :param device: A voltha.Device object, with possible device-type
+ specific extensions. Such extensions shall be described as part of
+ the device type specification returned by device_types().
+ :return: (Deferred) Shall be fired to acknowledge device ownership.
+ """
+ log.info('cig adopt-device', device=device)
+ self.devices_handlers[device.id] = CigOltHandler(self, device.id)
+ reactor.callLater(0, self.devices_handlers[device.id].activate, device)
+ return device
+ #raise NotImplementedError()
+
+ def reconcile_device(self, device):
+ """
+ Make sure the adapter looks after given device. Called when this device has
+ changed ownership from another Voltha instance to this one (typically, this
+ occurs when the previous voltha instance went down).
+
+ :param device: A voltha.Device object, with possible device-type specific
+ extensions. Such extensions shall be described as part of
+ the device type specification returned by device_types().
+ :return: (Deferred) Shall be fired to acknowledge device ownership.
+ """
+ log.info('reconcile-device', device=device)
+ #self.devices_handlers[device.id] = AdtranOltHandler(self, device.id)
+ #reactor.callLater(0, self.devices_handlers[device.id].activate, device, reconciling=True)
+ #return device
+ raise NotImplementedError()
+
+ def abandon_device(self, device):
+ """
+ Make sure the adapter no longer looks after device. This is called
+ if device ownership is taken over by another Voltha instance.
+
+ :param device: A Voltha.Device object.
+ :return: (Deferred) Shall be fired to acknowledge abandonment.
+ """
+ log.info('abandon-device', device=device)
+ raise NotImplementedError()
+
+ def disable_device(self, device):
+ """
+ This is called when a previously enabled device needs to be disabled
+ based on a NBI call.
+
+ :param device: A Voltha.Device object.
+ :return: (Deferred) Shall be fired to acknowledge disabling the device.
+ """
+ log.info('disable-device', device=device)
+ #raise NotImplementedError()
+ handler = self.devices_handlers.get(device.id)
+ if handler is not None:
+ reactor.callLater(0,handler.disable)
+ return device
+
+ def reenable_device(self, device):
+ """
+ This is called when a previously disabled device needs to be enabled
+ based on a NBI call.
+
+ :param device: A Voltha.Device object.
+ :return: (Deferred) Shall be fired to acknowledge re-enabling the device.
+ """
+ log.info('reenable_device', device=device)
+ #raise NotImplementedError()
+ handler = self.devices_handlers.get(device.id)
+ if handler is not None:
+ d = defer.Deferred()
+ reactor.callLater(0,handler.reenable,done_deferred=d)
+ return d
+
+ def reboot_device(self, device):
+ """
+ This is called to reboot a device based on a NBI call. The admin
+ state of the device will not change after the reboot
+
+ :param device: A Voltha.Device object.
+ :return: (Deferred) Shall be fired to acknowledge the reboot.
+ """
+ log.info('reboot_device', device=device)
+ #raise NotImplementedError()
+ handler = self.devices_handlers.get(device.id)
+ if handler is not None:
+ reactor.callLater(0,handler.reboot)
+ return device
+
+ def download_image(self, device, request):
+ """
+ This is called to request downloading a specified image into the standby partition
+ of a device based on a NBI call.
+
+ :param device: A Voltha.Device object.
+ :param request: A Voltha.ImageDownload object.
+ :return: (Deferred) Shall be fired to acknowledge the download.
+ """
+ log.info('image_download', device=device, request=request)
+ handler = self.devices_handlers.get(device.id)
+ if handler is not None:
+ return handler.start_download(device, request, defer.Deferred())
+
+ def get_image_download_status(self, device, request):
+ """
+ This is called to inquire about a requested image download status based
+ on a NBI call. The adapter is expected to update the DownloadImage DB object
+ with the query result
+
+ :param device: A Voltha.Device object.
+ :param request: A Voltha.ImageDownload object.
+ :return: (Deferred) Shall be fired to acknowledge
+ """
+ log.info('get_image_download', device=device, request=request)
+ handler = self.devices_handlers.get(device.id)
+ if handler is not None:
+ return handler.download_status(device, request, defer.Deferred())
+
+ def cancel_image_download(self, device, request):
+ """
+ This is called to cancel a requested image download
+ based on a NBI call. The admin state of the device will not
+ change after the download.
+ :param device: A Voltha.Device object.
+ :param request: A Voltha.ImageDownload object.
+ :return: (Deferred) Shall be fired to acknowledge
+ """
+ log.info('cancel_image_download', device=device)
+ handler = self.devices_handlers.get(device.id)
+ if handler is not None:
+ return handler.cancel_download(device, request, defer.Deferred())
+
+ def activate_image_update(self, device, request):
+ """
+ This is called to activate a downloaded image from
+ a standby partition into active partition.
+ Depending on the device implementation, this call
+ may or may not cause device reboot.
+ If no reboot, then a reboot is required to make the
+ activated image running on device
+ This call is expected to be non-blocking.
+ :param device: A Voltha.Device object.
+ :param request: A Voltha.ImageDownload object.
+ :return: (Deferred) OperationResponse object.
+ """
+ log.info('activate_image_update', device=device, request=request)
+ handler = self.devices_handlers.get(device.id)
+ if handler is not None:
+ return handler.activate_image(device, request, defer.Deferred())
+
+ def revert_image_update(self, device, request):
+ """
+ This is called to deactivate the specified image at
+ active partition, and revert to previous image at
+ standby partition.
+ Depending on the device implementation, this call
+ may or may not cause device reboot.
+ If no reboot, then a reboot is required to make the
+ previous image running on device
+ This call is expected to be non-blocking.
+ :param device: A Voltha.Device object.
+ :param request: A Voltha.ImageDownload object.
+ :return: (Deferred) OperationResponse object.
+ """
+ log.info('revert_image_update', device=device, request=request)
+ handler = self.devices_handlers.get(device.id)
+ if handler is not None:
+ return handler.revert_image(device, request, defer.Deferred())
+
+
+
+ def self_test_device(self, device):
+ """
+ This is called to Self a device based on a NBI call.
+ :param device: A Voltha.Device object.
+ :return: Will return result of self test
+ """
+ from voltha.protos.voltha_pb2 import SelfTestResponse
+ log.info('self-test-device', device=device.id)
+ #raise NotImplementedError()
+ return SelfTestResponse(result=SelfTestResponse.NOT_SUPPORTED)
+
+ def delete_device(self, device):
+ """
+ This is called to delete a device from the PON based on a NBI call.
+ If the device is an OLT then the whole PON will be deleted.
+
+ :param device: A Voltha.Device object.
+ :return: (Deferred) Shall be fired to acknowledge the deletion.
+ """
+ log.info('delete-device', device=device)
+ handler = self.devices_handlers.get(device.id)
+ if handler is not None:
+ reactor.callLater(0, handler.delete)
+ return device
+
+ def get_device_details(self, device):
+ """
+ This is called to get additional device details based on a NBI call.
+
+ :param device: A Voltha.Device object.
+ :return: (Deferred) Shall be fired to acknowledge the retrieval of
+ additional details.
+ """
+ log.debug('get_device_details', device=device)
+ raise NotImplementedError()
+
+ def update_flows_bulk(self, device, flows, groups):
+ """
+ Called after any flow table change, but only if the device supports
+ bulk mode, which is expressed by the 'accepts_bulk_flow_update'
+ capability attribute of the device type.
+
+ :param device: A Voltha.Device object.
+ :param flows: An openflow_v13.Flows object
+ :param groups: An openflow_v13.Flows object
+ :return: (Deferred or None)
+ """
+ log.info('bulk-flow-update', device_id=device.id, flows=flows,
+ groups=groups)
+ #assert len(groups.items) == 0, "Cannot yet deal with groups"
+ handler = self.devices_handlers[device.id]
+ return handler.update_flow_table(flows.items, device)
+ #raise NotImplementedError()
+
+ def update_flows_incrementally(self, device, flow_changes, group_changes):
+ """
+ [This mode is not supported yet.]
+
+ :param device: A Voltha.Device object.
+ :param flow_changes:
+ :param group_changes:
+ :return:
+ """
+ log.debug('update_flows_incrementally', device=device, flow_changes=flow_changes,
+ group_changes=group_changes)
+ raise NotImplementedError()
+
+ def update_pm_config(self, device, pm_configs):
+ """
+ Called every time a request is made to change pm collection behavior
+ :param device: A Voltha.Device object
+ :param pm_configs: A Pms
+ """
+ log.debug('update_pm_config', device=device, pm_configs=pm_configs)
+ raise NotImplementedError()
+
+ def send_proxied_message(self, proxy_address, msg):
+ """
+ Forward a msg to a child device of device, addressed by the given
+ proxy_address=Device.ProxyAddress().
+
+ :param proxy_address: Address info for the parent device
+ to route the message to the child device. This was given to the
+ child device by the parent device at the creation of the child
+ device.
+ :param msg: (str) The actual message to send.
+ :return: (Deferred(None) or None) The return of this method should
+ indicate that the message was successfully *sent*.
+ """
+ log.info('send-proxied-message', proxy_address=proxy_address, msg=msg)
+ handler = self.devices_handlers[proxy_address.device_id]
+ handler.send_proxied_message(proxy_address, msg)
+
+ def receive_proxied_message(self, proxy_address, msg):
+ """
+ Pass an async message (arrived via a proxy) to this device.
+
+ :param proxy_address: Address info for the parent device
+ to route the message to the child device. This was given to the
+ child device by the parent device at the creation of the child
+ device. Note this is the proxy_address with which the adapter
+ had to register prior to receiving proxied messages.
+ :param msg: (str) The actual message received.
+ :return: None
+ """
+ log.debug('receive_proxied_message', proxy_address=proxy_address, msg=msg)
+ raise NotImplementedError()
+
+ def receive_packet_out(self, logical_device_id, egress_port_no, msg):
+ """
+ Pass a packet_out message content to adapter so that it can forward it
+ out to the device. This is only called on root devices.
+
+ :param logical_device_id:
+ :param egress_port_no: egress logical port number
+ :param msg: actual message
+ :return: None
+ """
+ log.info('packet-out', logical_device_id=logical_device_id,
+ egress_port_no=egress_port_no, msg_len=len(msg))
+ def ldi_to_di(ldi):
+ di = self.logical_device_id_to_root_device_id.get(ldi)
+ if di is None:
+ logical_device = self.adapter_agent.get_logical_device(ldi)
+ di = logical_device.root_device_id
+ self.logical_device_id_to_root_device_id[ldi] = di
+ return di
+
+ device_id = ldi_to_di(logical_device_id)
+ handler = self.devices_handlers[device_id]
+ handler.packet_out(egress_port_no, msg)
+
+ def receive_inter_adapter_message(self, msg):
+ """
+ Called when the adapter recieves a message that was sent to it directly
+ from another adapter. An adapter may register for these messages by calling
+ the register_for_inter_adapter_messages() method in the adapter agent.
+ Note that it is the responsibility of the sending and receiving
+ adapters to properly encode and decode the message.
+ :param msg: The message contents.
+ :return: None
+ """
+ log.info('rx_inter_adapter_msg')
+ raise NotImplementedError()
+
+ def suppress_alarm(self, filter):
+ log.info('suppress_alarm', filter=filter)
+ raise NotImplementedError()
+
+ def unsuppress_alarm(self, filter):
+ log.info('unsuppress_alarm', filter=filter)
+ raise NotImplementedError()
+
+ # PON Mgnt APIs #
+ def create_interface(self, device, data):
+ """
+ API to create various interfaces (only some PON interfaces as of now)
+ in the devices
+ """
+ log.info('create-interface', data=data)
+ handler = self.devices_handlers[device.id]
+ if handler is not None:
+ handler.create_interface(data)
+
+ def update_interface(self, device, data):
+ """
+ API to update various interfaces (only some PON interfaces as of now)
+ in the devices
+ """
+ log.info('update-interface', data=data)
+ handler = self.devices_handlers[device.id]
+ if handler is not None:
+ handler.update_interface(data)
+
+ def remove_interface(self, device, data):
+ """
+ API to delete various interfaces (only some PON interfaces as of now)
+ in the devices
+ """
+ log.info('remove-interface', data=data)
+ handler = self.devices_handlers[device.id]
+ if handler is not None:
+ handler.remove_interface(data)
+
+
+ def receive_onu_detect_state(self, device_id, state):
+ raise NotImplementedError()
+
+ def create_tcont(self, device, tcont_data, traffic_descriptor_data):
+ """
+ API to create tcont object in the devices
+ :param device: device id
+ :tcont_data: tcont data object
+ :traffic_descriptor_data: traffic descriptor data object
+ :return: None
+ """
+ log.info('create-tcont', tcont_data=tcont_data,
+ traffic_descriptor_data=traffic_descriptor_data)
+
+ if device.id in self.devices_handlers:
+ handler = self.devices_handlers[device.id]
+ if handler is not None:
+ handler.create_tcont(tcont_data, traffic_descriptor_data)
+
+ def update_tcont(self, device, tcont_data, traffic_descriptor_data):
+ """
+ API to update tcont object in the devices
+ :param device: device id
+ :tcont_data: tcont data object
+ :traffic_descriptor_data: traffic descriptor data object
+ :return: None
+ """
+ log.info('update-tcont', tcont_data=tcont_data,
+ traffic_descriptor_data=traffic_descriptor_data)
+
+ if device.id in self.devices_handlers:
+ handler = self.devices_handlers[device.id]
+ if handler is not None:
+ handler.update_tcont(tcont_data, traffic_descriptor_data)
+
+ def remove_tcont(self, device, tcont_data, traffic_descriptor_data):
+ """
+ API to delete tcont object in the devices
+ :param device: device id
+ :tcont_data: tcont data object
+ :traffic_descriptor_data: traffic descriptor data object
+ :return: None
+ """
+ log.info('remove-tcont', tcont_data=tcont_data,
+ traffic_descriptor_data=traffic_descriptor_data)
+
+ if device.id in self.devices_handlers:
+ handler = self.devices_handlers[device.id]
+ if handler is not None:
+ handler.remove_tcont(tcont_data, traffic_descriptor_data)
+
+ def create_gemport(self, device, data):
+ """
+ API to create gemport object in the devices
+ :param device: device id
+ :data: gemport data object
+ :return: None
+ """
+ log.info('create-gemport', data=data)
+ if device.id in self.devices_handlers:
+ handler = self.devices_handlers[device.id]
+ if handler is not None:
+ handler.create_gemport(data)
+
+ def update_gemport(self, device, data):
+ """
+ API to update gemport object in the devices
+ :param device: device id
+ :data: gemport data object
+ :return: None
+ """
+ log.info('update-gemport', data=data)
+
+ if device.id in self.devices_handlers:
+ handler = self.devices_handlers[device.id]
+ if handler is not None:
+ handler.update_gemport(data)
+
+ def remove_gemport(self, device, data):
+ """
+ API to delete gemport object in the devices
+ :param device: device id
+ :data: gemport data object
+ :return: None
+ """
+ log.info('remove-gemport', data=data)
+
+ if device.id in self.devices_handlers:
+ handler = self.devices_handlers[device.id]
+ if handler is not None:
+ handler.remove_gemport(data)
+
+ def create_multicast_gemport(self, device, data):
+ """
+ API to create multicast gemport object in the devices
+ :param device: device id
+ :data: multicast gemport data object
+ :return: None
+ """
+ log.info('create-mcast-gemport', data=data)
+
+ raise NotImplementedError()
+ #if device.id in self.devices_handlers:
+ #handler = self.devices_handlers[device.id]
+ #if handler is not None:
+ #handler.create_multicast_gemport(data)
+
+ def update_multicast_gemport(self, device, data):
+ """
+ API to update multicast gemport object in the devices
+ :param device: device id
+ :data: multicast gemport data object
+ :return: None
+ """
+ log.info('update-mcast-gemport', data=data)
+
+ raise NotImplementedError()
+ #if device.id in self.devices_handlers:
+ #handler = self.devices_handlers[device.id]
+ #if handler is not None:
+ #handler.update_multicast_gemport(data)
+
+ def remove_multicast_gemport(self, device, data):
+ """
+ API to delete multicast gemport object in the devices
+ :param device: device id
+ :data: multicast gemport data object
+ :return: None
+ """
+ log.info('remove-mcast-gemport', data=data)
+
+ raise NotImplementedError()
+ #if device.id in self.devices_handlers:
+ #handler = self.devices_handlers[device.id]
+ #if handler is not None:
+ #handler.remove_multicast_gemport(data)
+
+ def create_multicast_distribution_set(self, device, data):
+ """
+ API to create multicast distribution rule to specify
+ the multicast VLANs that ride on the multicast gemport
+ :param device: device id
+ :data: multicast distribution data object
+ :return: None
+ """
+ log.info('create-mcast-distribution-set', data=data)
+
+ raise NotImplementedError()
+ #if device.id in self.devices_handlers:
+ #handler = self.devices_handlers[device.id]
+ #if handler is not None:
+ #handler.create_multicast_distribution_set(data)
+
+ def update_multicast_distribution_set(self, device, data):
+ """
+ API to update multicast distribution rule to specify
+ the multicast VLANs that ride on the multicast gemport
+ :param device: device id
+ :data: multicast distribution data object
+ :return: None
+ """
+ log.info('update-mcast-distribution-set', data=data)
+
+ raise NotImplementedError()
+ #if device.id in self.devices_handlers:
+ #handler = self.devices_handlers[device.id]
+ #if handler is not None:
+ #handler.create_multicast_distribution_set(data)
+
+ def remove_multicast_distribution_set(self, device, data):
+ """
+ API to delete multicast distribution rule to specify
+ the multicast VLANs that ride on the multicast gemport
+ :param device: device id
+ :data: multicast distribution data object
+ :return: None
+ """
+ log.info('remove-mcast-distribution-set', data=data)
+
+ raise NotImplementedError()
+ #if device.id in self.devices_handlers:
+ #handler = self.devices_handlers[device.id]
+ #if handler is not None:
+ #handler.create_multicast_distribution_set(data)
+
+
+
diff --git a/voltha/adapters/cig_olt/cig_olt_device.py b/voltha/adapters/cig_olt/cig_olt_device.py
new file mode 100644
index 0000000..ce754ce
--- /dev/null
+++ b/voltha/adapters/cig_olt/cig_olt_device.py
@@ -0,0 +1,808 @@
+#
+# Copyright 2017-present CIG, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import base64
+import binascii
+import json
+import pprint
+import random
+import os
+import structlog
+
+from enum import Enum
+from twisted.internet import reactor
+from twisted.internet.defer import inlineCallbacks, returnValue, succeed
+
+from voltha.core.logical_device_agent import mac_str_to_tuple
+from voltha.protos.bbf_fiber_types_pb2 import *
+from voltha.protos.common_pb2 import OperStatus, AdminState
+from voltha.protos.device_pb2 import Device
+from voltha.protos.device_pb2 import Port
+from voltha.protos.logical_device_pb2 import *
+from voltha.protos.openflow_13_pb2 import *
+
+from voltha.protos.olt_common_pb2 import *
+from voltha.protos.olt_d_pb2 import *
+from voltha.protos.olt_pon_pb2 import *
+from voltha.protos.olt_switch_pb2 import *
+#from voltha.adapters.cig_olt.protos.olt_common_pb2 import *
+#from voltha.adapters.cig_olt.protos.olt_d_pb2 import *
+#from voltha.adapters.cig_olt.protos.olt_pon_pb2 import *
+#from voltha.adapters.cig_olt.protos.olt_switch_pb2 import *
+
+from cig_olt_zmq import *
+from cig_olt_xpon import *
+
+log = structlog.get_logger()
+
+class NniPort(object):
+ """
+ A class similar to the 'Port' class in the VOLTHA
+
+ TODO: Merge this with the Port class or cleanup where possible
+ so we do not duplicate fields/properties/methods
+ """
+
+ def __init__(self, nni_port, parent_id):
+ self.log = structlog.get_logger(device_id=parent_id)
+ self._parent_id = parent_id
+ self._port_no = nni_port.port_no
+ self._mac_address = nni_port.mac_address
+ if self._mac_address is None:
+ self._mac_address = "11:22:33:44:55:66"
+
+ if nni_port.port_type == OLT_NNI_PORT_TYPE_10G:
+ self._ofp_capabilities = OFPPF_10GB_FD | OFPPF_FIBER
+ self._current_speed = OFPPF_10GB_FD
+ self._max_speed = OFPPF_10GB_FD
+ elif nni_port.port_type == OLT_NNI_PORT_TYPE_40G:
+ self._ofp_capabilities = OFPPF_40GB_FD | OFPPF_FIBER
+ self._current_speed = OFPPF_40GB_FD
+ self._max_speed = OFPPF_40GB_FD
+ elif nni_port.port_type == OLT_NNI_PORT_TYPE_100G or nni_port.port_type == OLT_NNI_PORT_TYPE_80G:
+ self._ofp_capabilities = OFPPF_100GB_FD | OFPPF_FIBER
+ self._current_speed = OFPPF_100GB_FD
+ self._max_speed = OFPPF_100GB_FD
+ else:
+ return error
+
+ self._ofp_state = OFPPS_LIVE
+
+ self.startup = None
+ self._port = None
+ self._logical_port = None
+
+ self._admin_state = AdminState.ENABLED
+ self._oper_status = OperStatus.ACTIVE
+ log.info('Creating NNI Port {}'.format(self._port_no))
+
+ def get_port(self):
+ """
+ Get the VOLTHA PORT object for this port
+ :return: VOLTHA Port object
+ """
+ self.log.info('get nni gort', self._port_no)
+ if self._port is None:
+ self._port = Port(port_no=self._port_no, \
+ label='NNI port', \
+ type=Port.ETHERNET_NNI, \
+ admin_state=self._admin_state, \
+ oper_status=self._oper_status)
+ return self._port
+
+ def get_logical_port(self):
+ """
+ Get the VOLTHA logical port for this port
+ :return: VOLTHA logical port or None if not supported
+ """
+ if self._logical_port is None:
+ openflow_port = ofp_port(port_no=self._port_no, \
+ hw_addr=mac_str_to_tuple(self._mac_address), \
+ name='nni' + str(self._port_no), \
+ config=0, \
+ state=self._ofp_state, \
+ curr=self._ofp_capabilities, \
+ advertised=self._ofp_capabilities, \
+ peer=self._ofp_capabilities, \
+ curr_speed=self._current_speed, \
+ max_speed=self._max_speed)
+
+ self._logical_port = LogicalPort(id='nni{}'.format(self._port_no), \
+ ofp_port=openflow_port, \
+ device_id=self._parent_id, \
+ device_port_no=self._port_no, \
+ root_port=True)
+ return self._logical_port
+
+ def delete(self):
+ """
+ Parent device is being deleted. Do not change any config but
+ stop all polling
+ """
+ self.log.info('Deleting')
+ #self.state = DELETING
+ #self.cancel_deferred()
+
+class PonPort(object):
+ """
+ A class similar to the 'Port' class in the VOLTHA
+
+ TODO: Merge this with the Port class or cleanup where possible
+ so we do not duplicate fields/properties/methods
+ """
+ MAX_ONUS_SUPPORTED = 1024
+ DEFAULT_ENABLED = False
+
+ #def __init__(self, port_no, parent_id):
+ def __init__(self, port_no, parent):
+ # TODO: Weed out those properties supported by common 'Port' object
+ self.log = structlog.get_logger(device_id=parent.device_id)
+ self._parent_id = parent.device_id
+ self._parent = parent
+ self._pon_id = port_no
+ self._port_no = port_no
+ self._name = 'xpon 0/{}'.format(port_no)
+ self._label = 'pon-{}'.format(port_no)
+ self.startup = None
+ self._port = None
+
+ self._discovery_tick = 20.0
+ self._no_onu_discover_tick = self._discovery_tick / 2
+ self._sync_tick = 20.0
+ self._in_sync = False
+ self._expedite_sync = False
+ self._expedite_count = 0
+
+
+
+ self._admin_state = AdminState.ENABLED
+ self._oper_status = OperStatus.ACTIVE
+
+ #self.onus = {}
+ #self._onus = {} # serial_number-base64 -> ONU (allowed list)
+ self._onu_by_id = {} # onu-id -> ONU
+
+ self._deferred = None # General purpose
+ self._discovery_deferred = None # Specifically for ONU discovery
+ self._sync_deferred = None # For sync of PON config to hardware
+
+ self._active_los_alarms = set() # ONU-ID
+
+ # xPON configuration
+
+ self._xpon_name = None
+ self._enabled = False
+ self._downstream_fec_enable = False
+ self._upstream_fec_enable = False
+ self._deployment_range = 25000
+ self._authentication_method = 'serial-number'
+
+
+ log.info('Creating pon Port {}'.format(self._port_no))
+
+ def __del__(self):
+ #self.stop()
+ pass
+
+ def __str__(self):
+ return "PonPort-{}: Admin: {}, Oper: {}, OLT: {}".format(self._label,
+ self._admin_state,
+ self._oper_status,
+ self._parent)
+ def get_port(self):
+ """
+ Get the VOLTHA PORT object for this port
+ :return: VOLTHA Port object
+ """
+ self.log.info('get pon gort', self._port_no)
+ if self._port is None:
+ self._port = Port( \
+ port_no=100 + self._port_no, \
+ #port_no=self._port_no, \
+ label=self._label, \
+ type=Port.PON_OLT, \
+ admin_state=self._admin_state, \
+ oper_status=self._oper_status \
+ )
+
+ return self._port
+
+ @property
+ def port_number(self):
+ return self._port_no
+
+ @property
+ def name(self):
+ return self._name
+
+ @property
+ def pon_id(self):
+ return self._pon_id
+
+ @property
+ def olt(self):
+ return self._parent
+
+ def onu(self, onu_id):
+ return self._onu_by_id.get(onu_id)
+
+ #def onu_add(self, onu_id, sn):
+ def onu_add(self, onu_info):
+
+ sn = onu_info['serial-number']
+ onu_id = onu_info['onu-id']
+ self.log.info('Add ONU: {}'.format(sn))
+
+ if onu_id not in self._onu_by_id:
+ # Newly found and not enabled ONU, enable it now if not at max
+
+ if len(self._onu_by_id) < self.MAX_ONUS_SUPPORTED:
+ # TODO: For now, always allow any ONU to be activated
+
+ #self.on_new_onu_discovered(onu)
+ #self.onus[sn] = str(onu_id)
+ self._onu_by_id[onu_id] = Onu(onu_info)
+ olt = self.olt
+ onu = self.onu(onu_id)
+ if (olt.work_mode == OLT_MODE_AUTO) and (onu._ranging_status == 1) and (onu._config_status == 0):
+ #add onu to oltd
+ self.onu_add_msg_send(onu)
+ onu._status_machine = 'activating'
+
+ elif (olt.work_mode == OLT_MODE_CONFIG) and (onu._config_status == 1):
+ #add onu to oltd
+ self.onu_add_msg_send(onu)
+ onu._status_machine = 'activating'
+ else:
+ pass
+ else:
+ self.log.warning('Maximum number of ONUs already provisioned')
+ else:
+ # ONU has been enabled
+ pass
+
+
+ def onu_update(self, onu_info):
+
+ if onu_info['serial-number'] is not None:
+ sn = onu_info['serial-number']
+ self.log.info('update ONU: {}'.format(sn))
+
+ onu_id = onu_info['onu-id']
+ onu = self.onu(onu_id)
+ if onu is None:
+ return
+
+ olt = self.olt
+ if olt.work_mode == OLT_MODE_AUTO:
+ return
+
+ if onu_info['ranging-status'] is not None:
+ onu._ranging_status = onu_info['ranging-status']
+
+ if onu_info['config-status'] is not None:
+ if (onu_info['config-status'] == 1) and (onu._config_status == 0):
+ #add onu to oltd
+ self.onu_add_msg_send(onu)
+ onu._status_machine = 'activating'
+ onu._config_status = onu_info['config-status']
+
+ if (onu_info['config-status'] == 0) and (onu._config_status == 1):
+ self.onu_delete_msg_send(onu)
+ del self._onu_by_id[onu_id]
+
+
+ if onu_info['channel-id'] is not None:
+ onu._channel_id = onu_info['channel-id']
+
+ if onu_info['status_machine'] is not None:
+ onu._status_machine = onu_info['status_machine']
+
+ if onu_info['upstream-fec'] is not None:
+ if onu._status_machine == 'activating' or onu._status_machine == 'activation_successful':
+ if onu._upstream_fec != onu_info['upstream-fec']:
+ onu._upstream_fec = onu_info['upstream-fec']
+ self.onu_update_msg_send(onu)
+ else :
+ onu._upstream_fec = onu_info['upstream-fec']
+
+
+ def onu_del(self, onu_id):
+ self.log.info('Delete ONU: {}'.format(onu_id))
+
+ if onu_id in self._onu_by_id:
+ onu = self.onu(onu_id)
+ if onu is None:
+ return
+ if onu._status_machine != 'init':
+ self.onu_delete_msg_send(onu)
+
+ del self._onu_by_id[onu_id]
+
+ else:
+ # ONU has been delete
+ pass
+
+
+ def onu_exist_check(self, onu_id):
+ return onu_id in self._onu_by_id
+
+ def onu_add_msg_send(self, onu):
+ try:
+ msg_hdr = OltMsgCommonHdr(
+ type=OLT_D_ADD_ONU,
+ src_appId=OLT_APPID_VOLTHA,
+ sync=0
+ )
+ data=msg_hdr.SerializeToString()
+ self._parent.zmq_client_async.async_send(data,1)
+
+ olt = self.olt
+ if olt.work_mode == OLT_MODE_AUTO:
+ onu_add = OltDAddOnu(
+ pon_port=onu._pon_id,
+ onu_id=onu.onu_id,
+ sn=onu._serial_number_string,
+ registration_id= "",
+ fec_upstream= onu._upstream_fec,
+ authentication_method=SERIAL_NUMBER
+ )
+ else:
+ channel_partition_name = onu._channel_partition
+ cp = olt.channel_partitions.get(channel_partition_name)
+
+ if cp['authentication-method'] == 'serial-number':
+ auth_method = SERIAL_NUMBER
+ elif cp['authentication-method'] == 'loid':
+ auth_method = LOID
+ elif cp['authentication-method'] == 'registration-id':
+ auth_method = REGISTRATION_ID
+ elif cp['authentication-method'] == 'omci':
+ auth_method = OMCI
+ elif cp['authentication-method'] == 'dot1x':
+ auth_method = DOT1X
+ else :
+ auth_method = SERIAL_NUMBER
+
+ #self.log.info('onu_add_msg_send cp', cp)
+ #self.log.info('onu_add_msg_send auth_method', auth_method)
+ #ani = olt.ont_anis.get(onu._vani_name)
+ #if ani is not None:
+ #ups_fec = ani['upstream-fec']
+ #else:
+ #ups_fec = 1
+
+ onu_add = OltDAddOnu(
+ pon_port=onu._pon_id,
+ onu_id=onu.onu_id,
+ sn=onu._serial_number_string,
+ registration_id= onu._registration_id,
+ #fec_upstream=ups_fec,
+ fec_upstream=onu._upstream_fec,
+ authentication_method=auth_method
+ )
+ data=onu_add.SerializeToString()
+ self._parent.zmq_client_async.async_send(data,0)
+
+ except Exception as e:
+ self.log.exception('Exception during add onu', e=e)
+
+ def onu_delete_msg_send(self, onu):
+ try:
+ msg_hdr = OltMsgCommonHdr(
+ type=OLT_D_DELETE_ONU,
+ src_appId=OLT_APPID_VOLTHA,
+ sync=0
+ )
+ data=msg_hdr.SerializeToString()
+ self._parent.zmq_client_async.async_send(data,1)
+
+ onu_delete = OltDDeleteOnu(
+ pon_port=onu._pon_id,
+ onu_id=onu.onu_id,
+ sn=onu._serial_number_string
+ )
+ data=onu_delete.SerializeToString()
+ self._parent.zmq_client_async.async_send(data,0)
+
+ except Exception as e:
+ self.log.exception('Exception during delete onu', e=e)
+
+
+ def onu_update_msg_send(self, onu):
+ try:
+ msg_hdr = OltMsgCommonHdr(
+ type=OLT_D_UPDATE_ONU,
+ src_appId=OLT_APPID_VOLTHA,
+ sync=0
+ )
+ data=msg_hdr.SerializeToString()
+ self._parent.zmq_client_async.async_send(data,1)
+
+ olt = self.olt
+ channel_partition_name = onu._channel_partition
+ cp = olt.channel_partitions.get(channel_partition_name)
+
+ if cp['authentication-method'] == 'serial-number':
+ auth_method = SERIAL_NUMBER
+ elif cp['authentication-method'] == 'loid':
+ auth_method = LOID
+ elif cp['authentication-method'] == 'registration-id':
+ auth_method = REGISTRATION_ID
+ elif cp['authentication-method'] == 'omci':
+ auth_method = OMCI
+ elif cp['authentication-method'] == 'dot1x':
+ auth_method = DOT1X
+ else :
+ auth_method = SERIAL_NUMBER
+
+ onu_update = OltDUpdateOnu(
+ pon_port=onu._pon_id,
+ onu_id=onu.onu_id,
+ sn=onu._serial_number_string,
+ registration_id= onu._registration_id,
+ fec_upstream=onu._upstream_fec,
+ authentication_method=auth_method
+ )
+ data=onu_update.SerializeToString()
+ self._parent.zmq_client_async.async_send(data,0)
+
+ except Exception as e:
+ self.log.exception('Exception during update onu', e=e)
+
+
+ def delete(self):
+ """
+ Parent device is being deleted. Do not change any config but
+ stop all polling
+ """
+ self.log.info('Deleting')
+ #self.state = DELETING
+ #self.cancel_deferred()
+ for onu in self._onu_by_id.itervalues():
+ onu.delete()
+
+ self._onu_by_id.clear()
+
+class Onu(object):
+ """
+ Wraps an ONU
+ """
+ MIN_ONU_ID = 0
+ MAX_ONU_ID = 253 # G.984. 0..253, 254=reserved, 255=broadcast
+ BROADCAST_ONU_ID = 255
+ DEFAULT_PASSWORD = ''
+
+ def __init__(self, onu_info):
+ self._onu_id = onu_info['onu-id']
+ self._ranging_status = onu_info['ranging-status']
+ self._config_status = onu_info['config-status']
+ self._status_machine = onu_info['status_machine']
+
+ if self._onu_id is None:
+ raise ValueError('No ONU ID available')
+
+ pon = onu_info['pon']
+ #self._serial_number_base64 = Onu.string_to_serial_number(onu_info['serial-number'])
+ self._serial_number_string = onu_info['serial-number']
+ self._device_id = onu_info['device-id']
+ self._password = onu_info['password']
+ self._registration_id = onu_info['expected-registration-id']
+ self._channel_partition = onu_info['channel-partition']
+ self._vani_name = onu_info['name']
+ self._upstream_fec = onu_info['upstream-fec']
+
+ #self._upstream_channel_speed = onu_info[upstream-channel-speed]
+
+
+ self._olt = pon.olt
+ self._pon_id = pon.pon_id
+ self._name = '{}@{}'.format(pon.name, self._onu_id)
+ self._xpon_name = onu_info['xpon-name']
+ self._gem_ports = {} # gem-id -> GemPort
+ self._tconts = {} # alloc-id -> TCont
+ self._onu_vid = onu_info['onu-vid']
+ self._uni_ports = [onu_info['onu-vid']]
+ assert len(self._uni_ports) == 1, 'Only one UNI port supported at this time'
+ self._channel_id = onu_info['channel-id']
+ self._enabled = onu_info['enabled']
+ self._vont_ani = onu_info.get('vont-ani')
+ self._rssi = -9999
+ self._equalization_delay = 0
+ self._fiber_length = 0
+ self._valid = True # Set false during delete/cleanup
+ self._proxy_address = None
+
+ self._include_multicast = True # TODO: May need to add multicast on a per-ONU basis
+
+ self._expedite_sync = False
+ self._expedite_count = 0
+ self._resync_flows = False
+ self._sync_deferred = None # For sync of ONT config to hardware
+
+ # TODO: enable and upstream-channel-speed not yet supported
+
+ self.log = structlog.get_logger(pon_id=self._pon_id, onu_id=self._onu_id)
+
+ def __del__(self):
+ # self.stop()
+ pass
+
+ def __str__(self):
+ return "Onu-{}-{}, PON ID: {}".format(self._onu_id, self._serial_number_string, self._pon_id)
+
+ @property
+ def olt(self):
+ return self._olt
+
+ @property
+ def pon(self):
+ return self.olt.southbound_ports[self._pon_id]
+
+ @property
+ def onu_id(self):
+ return self._onu_id
+
+ @property
+ def device_id(self):
+ return self._device_id
+
+ @property
+ def name(self):
+ return self._name
+
+ @property
+ def enabled(self):
+ return self._enabled
+
+ def _get_v_ont_ani(self, olt):
+ onu = None
+ try:
+ vont_ani = olt.v_ont_anis.get(self.vont_ani)
+ ch_pair = olt.channel_pairs.get(vont_ani['preferred-channel-pair'])
+ ch_term = next((term for term in olt.channel_terminations.itervalues()
+ if term['channel-pair'] == ch_pair['name']), None)
+
+ pon = olt.pon(ch_term['xgs-ponid'])
+ onu = pon.onu(vont_ani['onu-id'])
+
+ except Exception:
+ pass
+
+ return onu
+
+ def _cancel_deferred(self):
+ d, self._sync_deferred = self._sync_deferred, None
+
+ if d is not None and not d.called:
+ try:
+ d.cancel()
+ except Exception:
+ pass
+
+ def delete(self):
+ """
+ Clean up ONU (gems/tconts). ONU removal from OLT h/w done by PonPort
+ :return: (deferred)
+ """
+
+ self.log.info('onu Deleting')
+ self._valid = False
+ self._cancel_deferred()
+
+ self._gem_ports.clear()
+ self._tconts.clear()
+ self._olt = None
+ self._channel_id = None
+
+
+ def start(self):
+ self._cancel_deferred()
+ #self._sync_deferred = reactor.callLater(0, self._sync_hardware)
+
+ def stop(self):
+ self._cancel_deferred()
+ #self._sync_deferred = reactor.callLater(0, self._sync_hardware)
+
+
+
+ #@inlineCallbacks
+ def add_tcont(self, tcont, reflow=False):
+ """
+ Creates/ a T-CONT with the given alloc-id
+
+ :param tcont: (TCont) Object that maintains the TCONT properties
+ :param reflow: (boolean) If true, force add (used during h/w resync)
+ :return: (deferred)
+ """
+
+ self._tconts[tcont.alloc_id] = tcont
+ log.info('add_tcont onu_status_machine:', self._status_machine)
+ #log.info('add_tcont:', self._tconts[tcont.alloc_id])
+ #if self._status_machine == 'activation_successful':
+ #pass
+ #self.tcont_add_msg_send(self._olt, self._tconts[tcont.alloc_id])
+ self.tcont_add_msg_send(self._olt, tcont.alloc_id)
+
+
+ def update_tcont(self, alloc_id, new_values):
+ # TODO: If alloc-id in use by a gemport, should we deny request?
+ tcont = self._tconts.get(alloc_id)
+
+ #if tcont is None:
+ #returnValue(succeed('not-found'))
+
+ # del self._tconts[alloc_id]
+ #
+ # try:
+ # results = yield tcont.remove_from_hardware()
+ #
+ # except Exception as e:
+ # self.log.exception('delete', e=e)
+ # raise
+
+ #returnValue(succeed('TODO: Not implemented yet'))
+
+ #@inlineCallbacks
+ def remove_tcont(self, alloc_id):
+ # TODO: If alloc-id in use by a gemport, should we deny request?
+ tcont = self._tconts.get(alloc_id)
+
+ if tcont is None:
+ return
+
+ log.info('remove_tcont alloc_id:', alloc_id)
+ #log.info('remove_tcont:', self._tconts[tcont.alloc_id])
+ #if self._status_machine == 'activation_successful':
+ self.tcont_delete_msg_send(self._olt, alloc_id)
+
+ del self._tconts[alloc_id]
+
+
+ def tcont_add_msg_send(self, olt, alloc_id):
+ tcont = self._tconts.get(alloc_id)
+ try:
+ msg_hdr = OltMsgCommonHdr(
+ type=OLT_PON_ADD_TCONT,
+ src_appId=OLT_APPID_VOLTHA,
+ sync=0
+ )
+ data=msg_hdr.SerializeToString()
+ olt.zmq_client_async.async_send(data,1)
+
+ tcont_add = OltPonAddTcont(
+ #pon_port=tcont.pon_id,
+ pon_port=self._pon_id,
+ onu_id=self._onu_id,
+ alloc_id=tcont.alloc_id,
+ fixed_bandwidth= tcont.traffic_descriptor.fixed_bandwidth,
+ assured_bandwidth= tcont.traffic_descriptor.assured_bandwidth,
+ maximum_bandwidth=tcont.traffic_descriptor.maximum_bandwidth
+ )
+ data=tcont_add.SerializeToString()
+ olt.zmq_client_async.async_send(data,0)
+
+ except Exception as e:
+ self.log.exception('Exception during add tcont', e=e)
+
+ def tcont_delete_msg_send(self, olt, alloc_id):
+ #tcont = self._tconts.get(tcont_id)
+ try:
+ msg_hdr = OltMsgCommonHdr(
+ type=OLT_PON_DELETE_TCONT,
+ src_appId=OLT_APPID_VOLTHA,
+ sync=0
+ )
+ data=msg_hdr.SerializeToString()
+ olt.zmq_client_async.async_send(data,1)
+
+ tcont_delete = OltPonDeleteTcont(
+ #pon_port=tcont.pon_id,
+ pon_port=self._pon_id,
+ alloc_id=alloc_id
+ )
+ data=tcont_delete.SerializeToString()
+ olt.zmq_client_async.async_send(data,0)
+
+ except Exception as e:
+ self.log.exception('Exception during delete tcont', e=e)
+
+
+ def add_gemport(self, gemport, reflow=False):
+ if reflow == False:
+ self._gem_ports[gemport.gem_id] = gemport
+
+ cvlan = gemport.cvlan
+ uni_name = gemport.uni_name
+ olt=self._olt
+ uniLogicalPort = 0
+ items = olt.adapter_agent.root_proxy.get('/logical_devices/{}/ports'.format(olt._logical_device.id))
+ logical_port_items = LogicalPorts(items=items)
+ for port in logical_port_items.items:
+ if uni_name == port.ofp_port.name :
+ uniLogicalPort = port.ofp_port.port_no
+ #onu_device = olt.adapter_agent.get_device(port.device_id)
+ #self.log.debug("gem_port--onu_device", onu_device=onu_device)
+ break
+
+ self.log.debug('gem_port--gemportId', gemportId = gemport.gem_id)
+ self.log.debug('gem_port--cvlan', cvlan = cvlan)
+ self.log.debug('gem_port--uni', uni_name = uni_name)
+
+ self.log.debug('gem_port---uniLogicalPort', uniLogicalPort=uniLogicalPort)
+
+ self.gemport_msg_send(OLT_PON_ADD_GEMPORT, uniLogicalPort, cvlan, gemport.gem_id)
+
+
+ def remove_gemport(self, gemport_id, reflow=False):
+ gemport = self._gem_ports.get(gemport_id)
+
+ if gemport is None:
+ return
+
+ cvlan = gemport.cvlan
+ uni_name = gemport.uni_name
+ uniLogicalPort = 0
+
+ olt=self._olt
+ items = olt.adapter_agent.root_proxy.get('/logical_devices/{}/ports'.format(olt._logical_device.id))
+ logical_port_items = LogicalPorts(items=items)
+ for port in logical_port_items.items:
+ if uni_name == port.ofp_port.name :
+ uniLogicalPort = port.ofp_port.port_no
+ break
+
+ self.log.debug('gem_port--gemportId', gemportId = gemport.gem_id)
+ self.log.debug('gem_port--cvlan', cvlan = cvlan)
+ self.log.debug('gem_port--uni', uni_name = uni_name)
+ self.log.debug('gem_port---uniLogicalPort', uniLogicalPort=uniLogicalPort)
+
+ self.gemport_msg_send(OLT_PON_DELETE_GEMPORT, uniLogicalPort, cvlan, gemport.gem_id)
+
+ del self._gem_ports[gemport_id]
+
+ def gemport_msg_send(self, msg_type, uni_logic_port, vlan, gemport_id):
+
+ gemport = self._gem_ports.get(gemport_id)
+
+ if gemport is None:
+ return
+
+ try:
+ mgr_hdr = OltMsgCommonHdr(
+ type=msg_type,
+ src_appId=OLT_APPID_VOLTHA,
+ sync=0
+ )
+
+ data=mgr_hdr.SerializeToString()
+
+ self._olt.zmq_client_async.async_send(data, 1)
+
+ gemport_data = OltPonGemport(
+ uni_logic_port = uni_logic_port,
+ vlan = vlan,
+ gemport_id = gemport_id,
+ onu_id = gemport.onu_id,
+ pon_port = gemport.pon_id
+ )
+ data=gemport_data.SerializeToString()
+ self._olt.zmq_client_async.async_send(data, 0)
+ except Exception as e:
+ self.log.exception('Exception during gemport add processing', e=e)
+
+
+
+
diff --git a/voltha/adapters/cig_olt/cig_olt_handler.py b/voltha/adapters/cig_olt/cig_olt_handler.py
new file mode 100644
index 0000000..b7c37a7
--- /dev/null
+++ b/voltha/adapters/cig_olt/cig_olt_handler.py
@@ -0,0 +1,2648 @@
+#
+# Copyright 2017-present CIG, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import datetime
+import pprint
+import random
+import argparse
+import datetime
+import shlex
+import time
+
+import arrow
+import structlog
+import json
+
+from twisted.internet import reactor, defer
+from twisted.internet.defer import inlineCallbacks, returnValue, DeferredQueue
+from twisted.internet.task import LoopingCall
+
+#from adapter_alarms import AdapterAlarms
+from cig_olt_zmq import *
+from cig_olt_device import *
+from cig_olt_xpon import *
+from download import *
+from voltha.extensions.omci.omci import *
+
+from voltha.core.logical_device_agent import mac_str_to_tuple
+from voltha.core.flow_decomposer import *
+from voltha.protos import third_party
+from voltha.protos.common_pb2 import OperStatus, AdminState, ConnectStatus
+from voltha.protos.events_pb2 import AlarmEventType, \
+ AlarmEventSeverity, AlarmEventState, AlarmEventCategory
+from voltha.protos.device_pb2 import Device, Image, DeviceType, DeviceTypes, Port, Device, \
+ PmConfigs, PmConfig, PmGroupConfig
+from voltha.protos.health_pb2 import HealthStatus
+from voltha.protos.logical_device_pb2 import LogicalPort, LogicalPorts, LogicalDevice
+from voltha.protos.openflow_13_pb2 import *
+#from voltha.adapters.cig_olt.protos.olt_common_pb2 import *
+#from voltha.adapters.cig_olt.protos.olt_d_pb2 import *
+#from voltha.adapters.cig_olt.protos.olt_pon_pb2 import *
+#from voltha.adapters.cig_olt.protos.olt_switch_pb2 import *
+from voltha.protos.olt_common_pb2 import *
+from voltha.protos.olt_d_pb2 import *
+from voltha.protos.olt_pon_pb2 import *
+from voltha.protos.olt_switch_pb2 import *
+
+from voltha.protos.bbf_fiber_base_pb2 import \
+ ChannelgroupConfig, ChannelpartitionConfig, ChannelpairConfig, ChannelterminationConfig, \
+ OntaniConfig, VOntaniConfig, VEnetConfig
+from voltha.protos.events_pb2 import KpiEvent, KpiEventType, MetricValuePairs
+
+from voltha.registry import registry
+from common.frameio.frameio import BpfProgramFilter, hexify
+from common.utils.asleep import asleep
+from scapy.layers.l2 import Ether, Dot1Q
+from scapy.layers.inet import Raw
+
+from google.protobuf.json_format import MessageToDict
+
+class CigOltHandler(object):
+ """
+ The OLT Handler is used to wrap a single instance of a 10G OLT 1-U pizza-box
+ """
+
+ def __init__(self, adapter, device_id):
+ self.adapter = adapter
+ self.adapter_agent = adapter.adapter_agent
+ self.device_id = device_id
+ self.log = structlog.get_logger(device_id=device_id)
+ self.io_port = None
+ self.logical_device_id = None
+ self._logical_device = None
+ #self.interface = registry('main').get_args().interface
+ self.work_mode = None
+ self.work_status = 0
+ self.reboot_status = 0
+ self.command_timeout = 5
+ self.pm_metrics = None
+ self.default_freq = 150
+ self.lc = None
+ #self.onus = {}
+
+ self.alarms = None
+
+ self.ip_address = None
+ self.olt_mac = None
+ self.startup = None
+ self.zmq_client_echo = None
+ self.zmq_client_sync = None
+ self.zmq_client_omci = None
+ self.zmq_client_of_packet = None
+ self.zmq_client_async = None
+ self.zmq_client_sub = None
+
+ self.echo_incoming_queue = DeferredQueue()
+ self.sync_incoming_queue = DeferredQueue()
+ self.async_incoming_queue = DeferredQueue()
+ self.omci_incoming_queue = DeferredQueue()
+ self.of_pkt_incoming_queue = DeferredQueue()
+
+ self.pon_port = None
+ self.onu_id = None
+
+ #self.channel = None # Proxy messaging channel with 'send' method
+
+ # Northbound and Southbound ports
+ self.northbound_ports = {} # port number -> Port
+ self.southbound_ports = {} # port number -> Port (For PON, use pon-id as key)
+ # self.management_ports = {} # port number -> Port TODO: Not currently supported
+
+ self.num_northbound_ports = None
+ self.num_southbound_ports = None
+ # self.num_management_ports = None
+
+ # Heartbeat support
+ self.heartbeat_miss = 0
+ self.heartbeat_interval = 2 # TODO: Decrease before release or any scale testing
+ self.heartbeat_failed_limit = 3
+ self.heartbrat_status = 0
+ self.reboot_check_times = 0
+ self.seq_no = 0
+
+ self.heartbeat = None
+ self.asyncmsg = None
+ self.omcimsg = None
+ self.packetmsg = None
+
+ #self.io_port = None
+ #self.interface = registry('main').get_args().interface
+
+ # Installed flows
+ #self.flow_entries = {} # Flow ID/name -> FlowEntry
+
+ # xPON config dictionaries
+ self._channel_groups = {} # Name -> dict
+ self._channel_partitions = {} # Name -> dict
+ self._channel_pairs = {} # Name -> dict
+ self._channel_terminations = {} # Name -> dict
+ self._v_ont_anis = {} # Name -> dict
+ self._ont_anis = {} # Name -> dict
+ self._v_enets = {} # Name -> dict
+ self._tconts = {} # Name -> dict
+ self._traffic_descriptors = {} # Name -> dict
+ self._gemports = {} # Name -> dict
+ self._cached_xpon_pon_info = {} # PON-id -> dict
+
+ self._download_protocols = None
+ self._download_deferred = None
+ self._downloads = {} # name -> Download obj
+
+ def __del__(self):
+ if self.io_port is not None:
+ registry('frameio').close_port(self.io_port)
+
+ @inlineCallbacks
+ def activate(self, device):
+ """
+ Activate the OLT device
+
+ :param device: A voltha.Device object, with possible device-type
+ specific extensions.
+ :param reconciling: If True, this adapter is taking over for a previous adapter
+ for an existing OLT
+ """
+ log.debug('cig activate starting')
+ log.info('cig activate started')
+ self.log.info('CigDeviceHandler.activating', device=device)
+
+ self.ip_address = device.ipv4_address
+ self.zmq_client_sync = CigZmqClientSync(self.ip_address, self.sync_incoming_queue)
+
+ olt_info = yield self.get_olt_info_com()
+ if olt_info == None:
+ self.log.info(' get olt info fail.')
+ self.activate_failed(device, "ipc timeout.", reachable=False)
+ return
+ else:
+ reactor.callLater(1, self._olt_activate, device, olt_info)
+
+ def _olt_activate(self, device, olt_info):
+ self.work_mode = olt_info.work_mode
+ self.olt_mac = olt_info.mac_address
+ device.root = True
+ device.vendor = olt_info.vendor
+ device.model = olt_info.model
+ device.hardware_version = olt_info.hardware_version
+ device.firmware_version = olt_info.firmware_version
+ device.serial_number = olt_info.serial_number
+ self.adapter_agent.update_device(device)
+
+ logical_device = self.get_logical_device(device)
+ self._logical_device = self.adapter_agent.create_logical_device(logical_device,dpid=self.olt_mac)
+
+ for m in olt_info.nni_port:
+ self.log.info('nni_port.port_no', m.port_no)
+ self.log.info('nni_port.port_type', m.port_type)
+ self.log.info('nni_port.mac_address', m.mac_address)
+ if m.port_no==1:
+ self.northbound_ports[m.port_no] = NniPort(m, self.device_id)
+ phy_port = self.northbound_ports[m.port_no].get_port()
+ self.adapter_agent.add_port(device.id, phy_port)
+ logical_port = self.northbound_ports[m.port_no].get_logical_port()
+ self.adapter_agent.add_logical_port(self._logical_device.id, logical_port)
+
+ for m in olt_info.pon_port:
+ self.log.info('pon_port.port_no', m.port_no)
+ self.log.info('pon_port.port_type', m.port_type)
+ #if m.port_no==1 or m.port_no==2:
+ #if m.port_no==1:
+ #self.southbound_ports[m.port_no] = PonPort(m.port_no, self.device_id)
+ self.southbound_ports[m.port_no] = PonPort(m.port_no, self)
+ phy_port = self.southbound_ports[m.port_no].get_port()
+ self.adapter_agent.add_port(device.id, phy_port)
+
+ self.log.info('create CigZmqClientAsync socket.')
+ self.zmq_client_async = CigZmqClientAsync(self.ip_address, self.heartbeat_receive)
+ self.log.info('send olt activate_msg.')
+ self.olt_activate_msg_send()
+ self.work_status = 1
+
+ ############################################################################
+ # Setup Alarm handler
+
+ #self.alarms = AdapterAlarms(self.adapter, device)
+
+ ############################################################################
+ self.log.info('get_device.....')
+
+ device = self.adapter_agent.get_device(device.id)
+ device.parent_id = self._logical_device.id
+ #device.connect_status = ConnectStatus.UNREACHABLE
+ #device.oper_status = OperStatus.ACTIVATING
+ #self.adapter_agent.update_device(device)
+ self.logical_device_id = self._logical_device.id
+
+ #device = self.adapter_agent.get_device(device.id)
+ device.connect_status = ConnectStatus.REACHABLE
+ device.oper_status = OperStatus.ACTIVE
+ device.reason = ''
+ self.log.info('update_device.....')
+ self.adapter_agent.update_device(device)
+
+ self.log.info('create others zmq sockets.....')
+ self.zmq_client_sub = CigZmqClientSub(self.ip_address, self.async_incoming_queue)
+ self.zmq_client_omci = CigZmqClientOmci(self.ip_address, self.omci_incoming_queue)
+ self.zmq_client_of_packet = CigZmqClientPacketInOut(self.ip_address, self.of_pkt_incoming_queue)
+
+ # Schedule the heartbeat for the device
+ self.log.debug('Starting heartbeat')
+ self.start_heartbeat(delay=5)
+
+ #self.start_onu_test()
+
+ self.start_poll_async_msg()
+ self.start_poll_omci_msg()
+
+ #self.start_kpi_collection()
+ self.start_poll_packet_in_msg()
+
+ self.log.info('activate over.', device_id=self.device_id)
+
+
+ @inlineCallbacks
+ def collect_gem_metrics(self,prefix):
+ try:
+ # get pon port
+ gem_metrics = {}
+ self.log.info("collect-gem-metrics")
+
+ for m in self.southbound_ports:
+ phy_port = self.southbound_ports[m]._pon_id
+ self.log.info("collect-gem-metrics", m=m, port=phy_port)
+
+ xgem_pm_req_head = OltMsgCommonHdr(
+ type=OLT_PON_GET_XGEM_PM_REQ,
+ src_appId=OLT_APPID_VOLTHA,
+ sync=1
+ )
+
+ data = xgem_pm_req_head.SerializeToString()
+ self.zmq_client_sync.sync_send(data, 1)
+
+ xgem_pm_req_data = OltPonXGemPMReq(
+ pon_slot=0,
+ pon_port=phy_port
+ )
+
+ data = xgem_pm_req_data.SerializeToString()
+ self.zmq_client_sync.sync_send(data, 0)
+
+ #time.sleep(1)
+
+ self.startup = self.get_sync_queue()
+ results = yield self.startup
+
+ self.log.info('get_sync_queue', results=results)
+ if results == "RecvTimeoutErr.":
+ self.log.info('get OLT_PON_GET_XGEM_PM_ACK info timeout.', results=results)
+ self.zmq_client_sync.sync_reconnect()
+ else:
+ gem_pm_info_rep = OltMsgCommonHdr()
+ gem_pm_info_rep.ParseFromString(results[0])
+ self.log.info('gem_pm_info_rep.type', gem_pm_info_rep.type)
+ self.log.info('gem_pm_info_rep.src_appId', gem_pm_info_rep.src_appId)
+ self.log.info('gem_pm_info_rep.sync', gem_pm_info_rep.sync)
+
+ if (gem_pm_info_rep.type != OLT_PON_GET_XGEM_PM_ACK) or (
+ gem_pm_info_rep.src_appId != OLT_APPID_OLTD) \
+ or (gem_pm_info_rep.sync != 1) or (len(results) < 2):
+ self.log.info('get OLT_PON_GET_XGEM_PM_ACK err.')
+ else:
+ gem_pm_info = OltPonXGemPMAck()
+ gem_pm_info.ParseFromString(results[1])
+ self.log.info('gem_pm_info.pon_port', pon_port=gem_pm_info.pon_port)
+ self.log.info('gem_pm_info.pon_slot', pon_slot=gem_pm_info.pon_slot)
+ self.log.info('gem_pm_info.tx_gem_frames', tx_gem_frames=gem_pm_info.tx_gem_frames)
+ self.log.info('gem_pm_info.rx_gem_frames', rx_gem_frames=gem_pm_info.rx_gem_frames)
+ self.log.info('gem_pm_info.tx_nolfbit_count', tx_nolfbit_count=gem_pm_info.tx_nolfbit_count)
+ self.log.info('gem_pm_info.hec_err_count', hec_err_count=gem_pm_info.hec_err_count)
+ self.log.info('gem_pm_info.frame_lost_count', frame_lost_count=gem_pm_info.frame_lost_count)
+ self.log.info('gem_pm_info.key_err_count', key_err_count=gem_pm_info.key_err_count)
+
+ gem_metrics['tx_gem_frames'] = gem_pm_info.tx_gem_frames
+ gem_metrics['rx_gem_frames'] = gem_pm_info.rx_gem_frames
+ gem_metrics['tx_nolfbit_count'] = gem_pm_info.tx_nolfbit_count
+ gem_metrics['hec_err_count'] = gem_pm_info.hec_err_count
+ gem_metrics['frame_lost_count'] = gem_pm_info.frame_lost_count
+ gem_metrics['key_err_count'] = gem_pm_info.key_err_count
+
+ # Step 2: prepare the KpiEvent for submission
+ # we can time-stamp them here (or could use time derived from OLT
+ ts = arrow.utcnow().timestamp
+ prefixstr = "%s pon_port:%d xgem" % (prefix, gem_pm_info.pon_port)
+ self.log.info('prefixstr', prefixstr=prefixstr)
+ kpi_event = KpiEvent(
+ type=KpiEventType.slice,
+ ts=ts,
+ prefixes={
+ # xgem
+ prefixstr: MetricValuePairs(
+ metrics=gem_metrics),
+ }
+ )
+
+ # Step 3: submit
+ self.adapter_agent.submit_kpis(kpi_event)
+ gem_metrics.clear()
+
+
+ except Exception as e:
+ log.exception('failed-to-submit-kpis', e=e)
+
+
+ def start_kpi_collection(self):
+ prefix = 'voltha.{}.{}'.format('cig-olt', self.device_id)
+ self.log.info('start_kpi_collection',prefix=prefix)
+ self.lc = LoopingCall(self.collect_gem_metrics,prefix)
+ self.log.info('start_kpi_collection ',interval=self.default_freq / 10)
+ self.lc.start(interval=self.default_freq / 10)
+ #reactor.run()
+
+ def stop_kpi_collection(self):
+ self.lc.stop()
+
+ #packet_in, strip svlan and cvlan, and send to onos
+ '''
+ def rcv_io(self, port, frame):
+ self.log.info('received', iface_name=port.iface_name,
+ frame_len=len(frame))
+ pkt = Ether(frame)
+ if pkt.haslayer(Dot1Q):
+ outer_shim = pkt.getlayer(Dot1Q)
+ if isinstance(outer_shim.payload, Dot1Q):
+ inner_shim = outer_shim.payload
+ cvid = inner_shim.vlan
+ logical_port = cvid
+ popped_frame = (
+ Ether(src=pkt.src, dst=pkt.dst, type=inner_shim.type) /
+ inner_shim.payload
+ )
+ kw = dict(
+ logical_device_id=self.logical_device_id,
+ logical_port_no=logical_port,
+ )
+ self.log.info('sending-packet-in', **kw)
+ self.adapter_agent.send_packet_in(
+ packet=str(popped_frame), **kw)
+ elif pkt.haslayer(Raw):
+ raw_data = json.loads(pkt.getlayer(Raw).load)
+ self.alarms.send_alarm(self, raw_data)
+ '''
+
+ def activate_failed(self, device, reason, reachable=True):
+ """
+ Activation process (adopt_device) has failed.
+
+ :param device: A voltha.Device object, with possible device-type
+ specific extensions. Such extensions shall be described as part of
+ the device type specification returned by device_types().
+ :param reason: (string) failure reason
+ :param reachable: (boolean) Flag indicating if device may be reachable
+ via RESTConf or NETConf even after this failure.
+ """
+ device.oper_status = OperStatus.FAILED
+ if not reachable:
+ device.connect_status = ConnectStatus.UNREACHABLE
+
+ device.reason = reason
+ self.adapter_agent.update_device(device)
+ raise RuntimeError('Failed to activate OLT: {}'.format(device.reason))
+
+ #@inlineCallbacks
+ def deactivate(self, device):
+ # OLT Specific things here
+
+ d, self.startup = self.startup, None
+ if d is not None:
+ d.cancel()
+
+ self.pons.clear()
+
+ # TODO: Any other? OLT specific deactivate steps
+
+ # Call into base class and have it clean up as well
+ super(AdtranOltHandler, self).deactivate(device)
+
+ #@inlineCallbacks
+ def update_flow_table(self, flows, device):
+ '''
+ for flow in flows:
+ self.log.info('bulk-flow-update1', device_id=device.id, flow=flow)
+ for field in get_ofb_fields(flow):
+ if field.type == IN_PORT:
+ if field.port >= 100 and field.port < 124:
+ field.port = field.port - 100
+ flow.cookie = 1
+ elif field.port >= 0 and field.port < 6:
+ flow.cookie = 2
+ else:
+ pass
+ for action in get_actions(flow):
+ if action.type == OUTPUT:
+ if action.output.port >= 100 and action.output.port < 124:
+ action.output.port = action.output.port - 100
+ self.log.info('bulk-flow-update2', device_id=device.id, flow=flow)
+ '''
+
+ try:
+ msg_hdr = OltMsgCommonHdr(
+ type=OLT_SWITCH_UPDATE_FLOW_TABLE,
+ src_appId=OLT_APPID_VOLTHA,
+ sync=0
+ )
+
+ data=msg_hdr.SerializeToString()
+
+ self.zmq_client_async.async_send(data,1)
+
+ flow_table = OltSwitchFlowTable(
+ flows=flows
+ )
+ data=flow_table.SerializeToString()
+
+ self.zmq_client_async.async_send(data,0)
+
+ except Exception as e:
+ self.log.exception('Exception during update flow table.', e=e)
+
+ #@inlineCallbacks
+ def send_proxied_message(self, proxy_address, msg):
+ if isinstance(msg, Packet):
+ msg = str(msg)
+
+ self.log.info('send-proxied-message',
+ proxy_address=proxy_address,
+ msg=msg)
+
+ if self.zmq_client_omci is not None:
+ pon_id = proxy_address.channel_id - 100
+ onu_id = proxy_address.onu_id
+ self.log.info('send-proxied-message pon_id:',pon_id)
+ self.log.info('send-proxied-message onu_id:',onu_id)
+
+ else:
+ return
+
+
+ try:
+ msg_hdr = OltMsgCommonHdr(
+ type=OLT_PON_SEND_OMCI,
+ src_appId=OLT_APPID_VOLTHA,
+ sync=0
+ )
+ data=msg_hdr.SerializeToString()
+ self.zmq_client_omci.omci_send(data,1)
+
+ omci_msg = OltPonSendOmci(
+ pon_slot=0,
+ pon_port=pon_id,
+ onu_id=onu_id,
+ omci_content=msg
+ )
+
+ #self.log.info('omci_msg.pon_slot', omci_msg.pon_slot)
+ #self.log.info('omci_msg.pon_port', omci_msg.pon_port)
+ self.log.info('omci_msg.onu_id', omci_msg.onu_id, datetime.datetime.now())
+ #self.log.info('omci_msg.omci_content', omci_msg.omci_content)
+
+ data=omci_msg.SerializeToString()
+ self.zmq_client_omci.omci_send(data,0)
+
+ except Exception as e:
+ self.log.info('zmq_client_omci.omci_send exception', exc=str(e))
+ #raise
+
+ def packet_in_msg_proc(self, message):
+ try:
+ self.log.info('packet_in_msg_proc: Message from oltd')
+ sub_msg_header = OltMsgCommonHdr()
+ sub_msg_header.ParseFromString(message[0])
+ #self.log.info('got-response', sub_msg_header.type)
+ #self.log.info('got-response', sub_msg_header.src_appId)
+ #self.log.info('got-response', sub_msg_header.sync)
+
+ if (sub_msg_header.src_appId != OLT_APPID_OLTD) or (sub_msg_header.sync != 0):
+ self.log.exception('Get error msg.')
+ return
+
+ if sub_msg_header.type == OLT_D_PACKET_IN:
+ if len(message) != 2:
+ self.log.exception('Get error packet in msg.')
+ return
+
+ packet_msg = OltDEthPacket()
+ packet_msg.ParseFromString(message[1])
+
+ frame = packet_msg.pkt_buf
+ #self.log.info('received packet_msg.pkt_len', packet_msg.pkt_len)
+ #self.log.info('received packet_msg', hexify(packet_msg.pkt_buf))
+ #self.log.info('received frame:', hexify(frame))
+ #self.packet_out(1,frame)
+ pkt = Ether(frame)
+ if pkt.haslayer(Dot1Q):
+ inner_shim = pkt.getlayer(Dot1Q)
+ #if isinstance(outer_shim.payload, Dot1Q):
+ #inner_shim = outer_shim.payload
+ cvid = inner_shim.vlan
+ logical_port = cvid
+ popped_frame = (
+ Ether(src=pkt.src, dst=pkt.dst, type=inner_shim.type) /
+ inner_shim.payload
+ )
+ kw = dict(
+ logical_device_id=self.logical_device_id,
+ logical_port_no=logical_port,
+ )
+ self.log.info('sending-packet-in', **kw)
+ self.adapter_agent.send_packet_in(
+ packet=str(popped_frame), **kw)
+
+ #elif pkt.haslayer(Raw):
+ #raw_data = json.loads(pkt.getlayer(Raw).load)
+ #self.alarms.send_alarm(self, raw_data)
+ else:
+ self.log.info('No Dot1Q tag.')
+ else:
+ self.log.exception('No support msg type.')
+ return
+ except Exception as e:
+ self.log.exception('Exception during packet_in_msg_proc processing', e=e)
+
+ def packet_out(self, egress_port, msg):
+ self.log.info('sending-packet-out', egress_port=egress_port,
+ msg=hexify(msg))
+ pkt = Ether(msg)
+ out_pkt = (
+ Ether(src=pkt.src, dst=pkt.dst) /
+ Dot1Q(vlan=egress_port, type=pkt.type) /
+ pkt.payload
+ )
+ #self.io_port.send(str(out_pkt))
+
+ if self.zmq_client_of_packet is None:
+ return
+
+ try:
+ msg_hdr = OltMsgCommonHdr(
+ type=OLT_D_PACKET_OUT,
+ src_appId=OLT_APPID_VOLTHA,
+ sync=0
+ )
+ data=msg_hdr.SerializeToString()
+ self.zmq_client_of_packet.packet_send(data,1)
+
+ packet_msg = OltDEthPacket(
+ pkt_len=len(str(out_pkt)),
+ pkt_buf=str(out_pkt)
+ )
+
+ #self.log.info('packet_msg.pkt_len', packet_msg.pkt_len)
+ #self.log.info('packet_msg.pkt_buf', packet_msg.pkt_buf)
+
+ data=packet_msg.SerializeToString()
+ self.zmq_client_of_packet.packet_send(data,0)
+
+ except Exception as e:
+ self.log.info('zmq_client_of_packet.packet_send exception', exc=str(e))
+
+ def poll_metrics_receive(self, message):
+ try:
+ self.log.info('poll_metrics_receive: Message from oltd')
+ self.poll_incoming_queue.put(message)
+
+ except Exception as e:
+ self.log.exception('Exception during poll_metrics_receive processing', e=e)
+
+ def of_packet_receive(self, message):
+ try:
+ self.log.info('of_packet_receive: Message from oltd')
+
+ except Exception as e:
+ self.log.exception('Exception during of_packet_receive processing', e=e)
+
+ def heartbeat_receive(self, message):
+ if self.work_status == 0:
+ return
+ try:
+ #self.log.info('heartbeat_receive: Message from oltd')
+ sub_msg_header = OltMsgCommonHdr()
+ sub_msg_header.ParseFromString(message[0])
+ if sub_msg_header.type == OLT_COMMON_HEART_BEAT_ACK:
+
+ heartbeat_ack = OltCommonHeartBeat()
+ heartbeat_ack.ParseFromString(message[1])
+ #self.log.info('heartbeat_ack.is_active', heartbeat_ack.is_active)
+ if heartbeat_ack.seq_no != self.seq_no:
+ return
+ self.heartbeat_miss = 0
+ #self.log.info("receive-heart-beat-ack heartbeat_ack.seq_no", heartbeat_ack.seq_no)
+
+ if heartbeat_ack.is_active == 0:
+
+ if self.reboot_status == 0:
+ for port in self.southbound_ports.itervalues():
+ port.delete()
+
+ self.olt_activate_msg_send()
+ for channel_term in self._channel_terminations:
+ self.log.info('self._channel_terminations:', channel_term)
+ self._on_channel_termination_create(channel_term)
+ self.configure_pon(channel_term)
+
+ for vont_ani in self._v_ont_anis:
+ self.log.info('self._v_ont_anis:', vont_ani)
+ self._on_vont_ani_create(vont_ani)
+
+ for ont_ani in self._ont_anis:
+ self.log.info('self._ont_anis:', ont_ani)
+ self._on_ont_ani_create(ont_ani)
+
+ #self.log.info('pon and onu config recovery.', device_id=self.device_id)
+
+ except Exception as e:
+ self.log.exception('Exception during of_packet_receive processing', e=e)
+
+ def omci_msg_proc(self, message):
+ try:
+ self.log.info('omci_msg_proc: Message from oltd')
+ sub_msg_header = OltMsgCommonHdr()
+ sub_msg_header.ParseFromString(message[0])
+ #self.log.info('got-response', sub_msg_header.type)
+ #self.log.info('got-response', sub_msg_header.src_appId)
+ #self.log.info('got-response', sub_msg_header.sync)
+
+ if (sub_msg_header.src_appId != OLT_APPID_OLTD) or (sub_msg_header.sync != 0):
+ self.log.exception('Get error async msg.')
+ return
+
+ if sub_msg_header.type == OLT_PON_SEND_OMCI:
+ if len(message) != 2:
+ self.log.exception('Get error omci msg.')
+ return
+
+ omci_msg = OltPonSendOmci()
+ omci_msg.ParseFromString(message[1])
+
+ #self.log.info('omci_msg.pon_slot', omci_msg.pon_slot)
+ #self.log.info('omci_msg.pon_port', omci_msg.pon_port)
+ self.log.info('omci_msg.onu_id', omci_msg.onu_id, datetime.datetime.now())
+ #self.log.info('omci_msg.omci_content', omci_msg.omci_content)
+
+ #proxy_address=Device.ProxyAddress(
+ # device_id=self.device_id,
+ # channel_id=omci_msg.pon_port + 100,
+ # onu_id=omci_msg.onu_id,
+ #)
+ child_device = self.adapter_agent.get_child_device(self.device_id, onu_id=omci_msg.onu_id)
+ self.adapter_agent.receive_proxied_message(child_device.proxy_address, omci_msg.omci_content)
+ #self.adapter_agent.receive_proxied_message(proxy_address, omci_msg.omci_content)
+ else:
+ self.log.exception('No support omci msg type.')
+ return
+ except Exception as e:
+ self.log.exception('Exception during omci_receive processing', e=e)
+
+ def async_msg_proc(self, message):
+ try:
+ self.log.info('sub_receive: Message from oltd')
+ sub_msg_header = OltMsgCommonHdr()
+ sub_msg_header.ParseFromString(message[0])
+ #self.log.info('got-response', sub_msg_header.type)
+ #self.log.info('got-response', sub_msg_header.src_appId)
+ #self.log.info('got-response', sub_msg_header.sync)
+
+ if (sub_msg_header.src_appId != OLT_APPID_OLTD) or (sub_msg_header.sync != 0):
+ self.log.exception('Get error async msg.')
+ return
+
+ if sub_msg_header.type == OLT_PON_ONU_RANGING_EVENT:
+ if len(message) != 2:
+ self.log.exception('Get error async msg(OLT_PON_ONU_RANGING_EVENT).')
+ return
+
+ self.log.info('sub_receive. msg_len', len(message), message[1])
+
+ self.onu_detected(message[1])
+ elif sub_msg_header.type == OLT_PON_ONU_ACTIVATE_COMPLETE:
+ if len(message) != 2:
+ self.log.exception('Get error async msg(OLT_PON_ONU_ACTIVATE_COMPLETE).')
+ return
+
+ self.log.info('sub_receive. msg_len', len(message), message[1])
+ self.onu_activate_complete_msg_proc(message[1])
+
+ else:
+ self.log.exception('No support msg type.')
+ #return
+
+ except Exception as e:
+ self.log.exception('Exception during sub_receive processing', e=e)
+
+
+ def get_echo_queue(self):
+ request = self.echo_incoming_queue.get()
+ return request
+
+
+ def get_sync_queue(self):
+ request = self.sync_incoming_queue.get()
+ return request
+
+ def get_async_queue(self):
+ request = self.async_incoming_queue.get()
+ return request
+
+ def get_omci_queue(self):
+ request = self.omci_incoming_queue.get()
+ return request
+
+ def get_packet_in_queue(self):
+ request = self.of_pkt_incoming_queue.get()
+ return request
+
+ def start_poll_async_msg(self):
+ self.log.info('*** Starting polling async msg ***')
+ self.asyncmsg = reactor.callLater(0, self.poll_async_msg)
+ return self.asyncmsg
+
+ @inlineCallbacks
+ def poll_async_msg(self):
+ try:
+ response = yield self.get_async_queue()
+ self.async_msg_proc(response)
+ except Exception as e:
+ self.log.info('wait-for-async-exception', exc=str(e))
+
+ self.asyncmsg = reactor.callLater(0.07, self.poll_async_msg)
+
+ def start_poll_omci_msg(self):
+ self.log.info('*** Starting polling omci msg ***')
+ self.omcimsg = reactor.callLater(0, self.poll_omci_msg)
+ return self.omcimsg
+
+ @inlineCallbacks
+ def poll_omci_msg(self):
+ try:
+ response = yield self.get_omci_queue()
+ self.omci_msg_proc(response)
+ except Exception as e:
+ self.log.info('wait-for-omci-exception', exc=str(e))
+
+ self.omcimsg = reactor.callLater(0.1, self.poll_omci_msg)
+
+ def start_poll_packet_in_msg(self):
+ self.log.info('*** Starting polling packet in msg ***')
+ self.packetmsg = reactor.callLater(0, self.poll_packet_in_msg)
+ return self.packetmsg
+
+ @inlineCallbacks
+ def poll_packet_in_msg(self):
+ try:
+ response = yield self.get_packet_in_queue()
+ self.packet_in_msg_proc(response)
+ except Exception as e:
+ self.log.info('wait-for-packet-in-exception', exc=str(e))
+
+ self.packetmsg = reactor.callLater(0.1, self.poll_packet_in_msg)
+
+ def start_heartbeat(self, delay=10):
+ assert delay > 1
+ self.log.info('*** Starting Device Heartbeat ***')
+ self.heartbeat = reactor.callLater(delay, self.check_pulse)
+ return self.heartbeat
+
+ def check_pulse(self):
+ if self.work_status == 0:
+ return
+
+ self.heartbeat_check_status()
+
+ try:
+ echo_req = OltMsgCommonHdr(
+ type=1,
+ src_appId=OLT_APPID_VOLTHA,
+ sync=0
+ )
+
+ data=echo_req.SerializeToString()
+ self.zmq_client_async.async_send(data,1)
+
+ self.seq_no += 1
+ heart_beat = OltCommonHeartBeat(
+ seq_no=self.seq_no,
+ is_active=1
+ )
+
+ #self.log.info('send-heartbeat heart_beat.seq_no', heart_beat.seq_no)
+ #self.log.info('send-heartbeat heart_beat.is_active', heart_beat.is_active)
+
+ data=heart_beat.SerializeToString()
+ self.zmq_client_async.async_send(data,0)
+
+ self.heartbeat_miss += 1
+
+ except Exception as e:
+ self.log.exception('Exception during echo processing', e=e)
+
+ # Reschedule next heartbeat
+ #if self.logical_device_id is not None:
+ #if self.startup_heartbeat
+ self.heartbeat = reactor.callLater(self.heartbeat_interval, self.check_pulse)
+
+ def heartbeat_check_status(self):
+ """
+ Check the number of heartbeat failures against the limit and emit an alarm if needed
+ """
+ device = self.adapter_agent.get_device(self.device_id)
+
+ if self.heartbeat_miss >= self.heartbeat_failed_limit and device.connect_status == ConnectStatus.REACHABLE:
+ self.log.warning('olt-heartbeat-failed', count=self.heartbeat_miss)
+ self.heartbrat_status = 0
+ device.connect_status = ConnectStatus.UNREACHABLE
+ device.oper_status = OperStatus.FAILED
+ device.reason = 'heartbeat timeout'
+ self.adapter_agent.update_device(device)
+
+ self.heartbeat_alarm(True, self.heartbeat_miss)
+ else:
+ # Update device states
+ if self.heartbeat_miss == 0 and device.connect_status != ConnectStatus.REACHABLE:
+ self.heartbrat_status = 1
+ self.log.info('heartbeat success')
+ device.connect_status = ConnectStatus.REACHABLE
+ device.oper_status = OperStatus.ACTIVE
+ device.reason = ''
+ self.adapter_agent.update_device(device)
+
+ self.heartbeat_alarm(False)
+
+ def heartbeat_alarm(self, status, heartbeat_misses=0):
+ try:
+ ts = arrow.utcnow().timestamp
+
+ alarm_data = {'heartbeats_missed':str(heartbeat_misses)}
+
+ alarm_event = self.adapter_agent.create_alarm(
+ id='voltha.{}.{}.olt'.format(self.adapter.name, self.device_id),
+ resource_id='olt',
+ type=AlarmEventType.EQUIPMENT,
+ category=AlarmEventCategory.PON,
+ severity=AlarmEventSeverity.CRITICAL,
+ state=AlarmEventState.RAISED if status else
+ AlarmEventState.CLEARED,
+ description='OLT Alarm - Heartbeat - {}'.format('Raised'
+ if status
+ else 'Cleared'),
+ context=alarm_data,
+ raised_ts = ts)
+
+ self.adapter_agent.submit_alarm(self.device_id, alarm_event)
+ self.log.debug('olt-heartbeat alarm sent')
+
+ except Exception as e:
+ log.exception('failed-to-submit-alarm', e=e)
+
+ def onu_detected(self, message):
+
+ onu_ranging_event = OltPonOnuRangingEvent()
+ onu_ranging_event.ParseFromString(message)
+
+ #self.log.info('onu_detected', onu_ranging_event.pon_slot)
+ #self.log.info('onu_detected', onu_ranging_event.pon_port)
+ self.log.info('onu_detected', onu_ranging_event.onu_id, datetime.datetime.now())
+ #self.log.info('onu_detected', onu_ranging_event.sn)
+ #self.log.info('onu_detected', onu_ranging_event.ranging_state)
+
+ if onu_ranging_event.ranging_state == 1:
+ if self.southbound_ports[onu_ranging_event.pon_port].onu_exist_check(onu_ranging_event.onu_id)==False:
+ if self.work_mode == OLT_MODE_AUTO:
+ self.adapter_agent.child_device_detected(
+ parent_device_id=self.device_id,
+ parent_port_no=100 + onu_ranging_event.pon_port,
+ child_device_type='broadcom_onu',
+ proxy_address=Device.ProxyAddress(
+ device_id=self.device_id,
+ channel_id=onu_ranging_event.pon_port + 100,
+ onu_id=onu_ranging_event.onu_id,
+ ),
+ admin_state=AdminState.ENABLED,
+ )
+
+ onu_info = {
+ 'name': None,
+ 'device-id': self.device_id,
+ 'serial-number': onu_ranging_event.sn,
+ 'xpon-name': None,
+ #'pon': onu_ranging_event.pon_port,
+ 'pon': self.southbound_ports[onu_ranging_event.pon_port],
+ 'onu-id': onu_ranging_event.onu_id,
+ 'ranging-status': 1,
+ 'config-status': 0,
+ 'status_machine': 'init',
+ 'enabled': None,
+ 'channel-partition': None,
+ 'expected-registration-id': None,
+ 'upstream-channel-speed': None,
+ 'upstream-fec': True,
+ 'password': Onu.DEFAULT_PASSWORD,
+ 't-conts': None,
+ 'gem-ports': None,
+ 'onu-vid': None,
+ 'channel-id': onu_ranging_event.pon_port + 100,
+ 'vont-ani': None
+ }
+ self.southbound_ports[onu_ranging_event.pon_port].onu_add(onu_info)
+
+ else:
+ if self.work_mode == OLT_MODE_CONFIG:
+ onu_info = {
+ 'name': None,
+ 'device-id': self.device_id,
+ 'serial-number': onu_ranging_event.sn,
+ 'xpon-name': None,
+ 'pon': self.southbound_ports[onu_ranging_event.pon_port],
+ 'onu-id': onu_ranging_event.onu_id,
+ 'ranging-status': 1,
+ 'config-status': None,
+ 'status_machine': None,
+ 'enabled': None,
+ 'channel-partition': None,
+ 'expected-registration-id': None,
+ 'upstream-channel-speed': None,
+ 'upstream-fec': None,
+ 'password': None,
+ 't-conts': None,
+ 'gem-ports': None,
+ 'onu-vid': None,
+ 'channel-id': None,
+ 'vont-ani': None
+ }
+ self.southbound_ports[onu_ranging_event.pon_port].onu_update(onu_info)
+
+ else :
+ if self.southbound_ports[onu_ranging_event.pon_port].onu_exist_check(onu_ranging_event.onu_id)==True:
+ if self.work_mode == OLT_MODE_AUTO:
+ self.southbound_ports[onu_ranging_event.pon_port].onu_del(onu_ranging_event.onu_id)
+ child_device = self.adapter_agent.get_child_device(self.device_id, onu_id=onu_ranging_event.onu_id)
+ if child_device:
+ self.adapter_agent.delete_child_device(self.device_id, child_device.id)
+ elif self.work_mode == OLT_MODE_CONFIG:
+ onu_info = {
+ 'name': None,
+ 'device-id': self.device_id,
+ 'serial-number': onu_ranging_event.sn,
+ 'xpon-name': None,
+ 'pon': self.southbound_ports[onu_ranging_event.pon_port],
+ 'onu-id': onu_ranging_event.onu_id,
+ 'ranging-status': 0,
+ 'config-status': None,
+ 'status_machine': None,
+ 'enabled': None,
+ 'channel-partition': None,
+ 'expected-registration-id': None,
+ 'upstream-channel-speed': None,
+ 'upstream-fec': None,
+ 'password': None,
+ 't-conts': None,
+ 'gem-ports': None,
+ 'onu-vid': None,
+ 'channel-id': None,
+ 'vont-ani': None
+ }
+ self.southbound_ports[onu_ranging_event.pon_port].onu_update(onu_info)
+
+ child_device = self.adapter_agent.get_child_device(self.device_id, onu_id=onu_ranging_event.onu_id)
+
+ self.log.info('onu_activate_ranging_down_msg', child_device)
+ time.sleep(1)
+
+ if child_device is not None:
+ msg = {'proxy_address': child_device.proxy_address,'event': 'deactivation-completed'}
+ self.adapter_agent.publish_inter_adapter_message(child_device.id, msg)
+ else:
+ pass
+
+
+ def onu_activate_complete_msg_proc(self, message):
+ onu_activate_complete_event = OltPonOnuActivateComplete()
+ onu_activate_complete_event.ParseFromString(message)
+
+ #self.log.info('onu_activate_complete_msg_proc', onu_activate_complete_event.pon_port)
+ #self.log.info('onu_activate_complete_msg_proc', onu_activate_complete_event.onu_id)
+ #self.log.info('onu_activate_complete_msg_proc', onu_activate_complete_event.result)
+
+ child_device = self.adapter_agent.get_child_device(self.device_id, onu_id=onu_activate_complete_event.onu_id)
+
+ self.log.info('onu_activate_complete_msg_proc', child_device)
+ time.sleep(1)
+
+ if child_device is not None:
+ if onu_activate_complete_event.result==0:
+ ind_info = {
+ 'activation_successful': True
+ }
+ onu_status = 'activation_successful'
+ else :
+ ind_info = {
+ 'activation_successful': False
+ }
+ onu_status = 'activation_fail'
+
+ msg = {'proxy_address': child_device.proxy_address,'event': 'activation-completed', 'event_data': ind_info}
+ self.adapter_agent.publish_inter_adapter_message(child_device.id, msg)
+ self.log.info('onu_activate_complete_msg_proc', onu_activate_complete_event.pon_port)
+ self.log.info('onu_activate_complete_msg_proc', onu_activate_complete_event.onu_id)
+ self.log.info('onu_activate_complete_msg_proc', onu_activate_complete_event.result)
+
+ #update onu
+ onu_info = {
+ 'name': None,
+ 'device-id': self.device_id,
+ 'serial-number': None,
+ 'xpon-name': None,
+ 'pon': self.southbound_ports[onu_activate_complete_event.pon_port],
+ 'onu-id': onu_activate_complete_event.onu_id,
+ 'ranging-status': None,
+ 'config-status': None,
+ 'status_machine': onu_status,
+ 'enabled': None,
+ 'channel-partition': None,
+ 'expected-registration-id': None,
+ 'upstream-channel-speed': None,
+ 'upstream-fec': None,
+ 'password': None,
+ 't-conts': None,
+ 'gem-ports': None,
+ 'onu-vid': None,
+ 'channel-id':None,
+ 'vont-ani': None
+ }
+ self.southbound_ports[onu_activate_complete_event.pon_port].onu_update(onu_info)
+
+ onu = self.southbound_ports[onu_activate_complete_event.pon_port].onu(onu_activate_complete_event.onu_id)
+ if onu._status_machine == 'activation_successful':
+ for tcont_id in onu._tconts:
+ self.log.info('onu_activate_complete_msg_proc tcont add', tcont_id)
+ onu.tcont_add_msg_send(self, tcont_id)
+ for gemport_id in onu._gem_ports:
+ self.log.info('onu_activate_complete_msg_proc gem add', gemport_id)
+ gemport = onu._gem_ports.get(gemport_id)
+ onu.add_gemport(gemport, True)
+
+
+ def start_onu_test(self):
+ self.log.info('*** Starting onu report test ***')
+ self.test = reactor.callLater(0, self.onu_test, 1, 1, 1)
+ return self.test
+
+ def onu_test(self, port, onuid, state):
+ onu_ranging_event = OltPonOnuRangingEvent(
+ pon_slot=1,
+ pon_port=port,
+ onu_id=onuid,
+ #sn='CIGONU' + str((port << 24) + (onuid << 8)),
+ sn='BRCM00000001',
+ ranging_state=state,
+ eqd=1,
+ distance=1
+ )
+
+ data=onu_ranging_event.SerializeToString()
+ self.onu_detected(data)
+ self.onu_id = onuid
+ self.pon_port = port
+ if self.onu_id < 1 :
+ self.pon_port = port
+ self.onu_id = onuid + 1
+ else:
+ self.pon_port = port + 1
+ self.onu_id = 1
+
+ if self.pon_port < 2:
+ self.test = reactor.callLater(0.07, self.onu_test, self.pon_port, self.onu_id, state)
+ #elif state == 1:
+ #self.pon_port = 1
+ #self.onu_id = 0
+ #state = 0
+ #self.test = reactor.callLater(0.07, self.onu_test, self.pon_port, self.onu_id, state)
+
+ @inlineCallbacks
+ def get_olt_info_com(self):
+
+ while (True):
+ try:
+ get_olt_info_req = OltMsgCommonHdr(
+ type=OLT_D_GET_OLT_INFO_REQ,
+ src_appId=OLT_APPID_VOLTHA,
+ sync=1
+ )
+
+ data=get_olt_info_req.SerializeToString()
+ self.log.info('send get olt info msg.')
+ self.zmq_client_sync.sync_send(data,0)
+ self.startup = self.get_sync_queue()
+ results = yield self.startup
+
+ if results == "RecvTimeoutErr.":
+ self.log.info('get olt info timeout.')
+ self.zmq_client_sync.sync_reconnect()
+ #returnValue (None)
+ else:
+ get_olt_info_rep = OltMsgCommonHdr()
+ get_olt_info_rep.ParseFromString(results[0])
+ self.log.info('get_olt_info_rep.type', get_olt_info_rep.type)
+ self.log.info('get_olt_info_rep.src_appId', get_olt_info_rep.src_appId)
+ self.log.info('get_olt_info_rep.sync', get_olt_info_rep.sync)
+
+ if (get_olt_info_rep.type != OLT_D_GET_OLT_INFO_ACK) or (get_olt_info_rep.src_appId != OLT_APPID_OLTD) \
+ or (get_olt_info_rep.sync != 1) or (len(results) < 2):
+ self.log.info('get OltDGetOltInfoAck err.')
+ returnValue (None)
+ else:
+ olt_info = OltDGetOltInfoAck()
+ olt_info.ParseFromString(results[1])
+ self.log.info('olt_info.olt_state', olt_info.olt_state)
+ self.log.info('olt_info.vendor', olt_info.vendor)
+ self.log.info('olt_info.model', olt_info.model)
+ self.log.info('olt_info.hardware_version', olt_info.hardware_version)
+ self.log.info('olt_info.firmware_version', olt_info.firmware_version)
+ self.log.info('olt_info.software_version', olt_info.software_version)
+ self.log.info('olt_info.serial_number', olt_info.serial_number)
+ #add mac loginfo
+ self.log.info('olt_info.work_mode',olt_info.work_mode)
+ self.log.info('olt_info.mac_address',olt_info.mac_address)
+ returnValue (olt_info)
+
+ except Exception as e:
+ self.log.exception('Exception during activate get olt info processing', e=e)
+ returnValue (None)
+
+
+ def olt_activate_msg_send(self):
+ try:
+ olt_activate_msg = OltMsgCommonHdr(
+ type=OLT_D_ACTIVATE_OLT,
+ src_appId=OLT_APPID_VOLTHA,
+ sync=0
+ )
+
+ data=olt_activate_msg.SerializeToString()
+
+ self.zmq_client_async.async_send(data,0)
+ except Exception as e:
+ self.log.exception('Exception during activate processing', e=e)
+
+ def olt_deactivate_msg_send(self):
+ try:
+ msg_hdr = OltMsgCommonHdr(
+ #type=OLT_D_DISABLE_OLT,
+ type=OLT_D_DEACTIVATE_OLT,
+ src_appId=OLT_APPID_VOLTHA,
+ sync=0
+ )
+ data = msg_hdr.SerializeToString()
+ self.zmq_client_async.async_send(data, 0)
+
+ self.log.info("send-deactivate-olt ok")
+ except Exception as e:
+ self.log.exception('Exception during send deactivate olt', e=e)
+
+ def olt_reboot_msg_send(self):
+ try:
+ msg_hdr = OltMsgCommonHdr(
+ type=OLT_D_REBOOT_OLT,
+ src_appId=OLT_APPID_VOLTHA,
+ sync=0
+ )
+ data = msg_hdr.SerializeToString()
+ self.zmq_client_async.async_send(data, 0)
+
+ self.log.info("send-reboot-olt ok")
+ except Exception as e:
+ self.log.exception('Exception during send deactivate olt', e=e)
+
+
+ def get_logical_device(self, device):
+ """
+ Get the VOLTHA logical device
+ :return: VOLTHA logical device or None
+ """
+ if self._logical_device is None:
+ self._logical_device = LogicalDevice(
+ # not setting id and datapth_id will let the adapter
+ # agent pick id
+ desc=ofp_desc(
+ mfr_desc='cord project',
+ hw_desc='n/a',
+ sw_desc='logical device for Cig-based PON',
+ serial_num=device.serial_number,
+ dp_desc='n/a'
+ ),
+ switch_features=ofp_switch_features(
+ n_buffers=256, # TODO fake for now
+ n_tables=2, # TODO ditto
+ capabilities=( # TODO and ditto
+ OFPC_FLOW_STATS
+ | OFPC_TABLE_STATS
+ | OFPC_PORT_STATS
+ | OFPC_GROUP_STATS
+ )
+ ),
+ root_device_id=device.id
+ )
+ return self._logical_device
+
+
+
+
+ def _get_xpon_collection(self, data):
+ if isinstance(data, ChannelgroupConfig):
+ return self._channel_groups
+ elif isinstance(data, ChannelpartitionConfig):
+ return self._channel_partitions
+ elif isinstance(data, ChannelpairConfig):
+ return self._channel_pairs
+ elif isinstance(data, ChannelterminationConfig):
+ return self._channel_terminations
+ elif isinstance(data, OntaniConfig):
+ return self._ont_anis
+ elif isinstance(data, VOntaniConfig):
+ return self._v_ont_anis
+ elif isinstance(data, VEnetConfig):
+ return self._v_enets
+ return None
+
+ def pon(self, pon_id):
+ return self.southbound_ports.get(pon_id)
+
+ @property
+ def channel_terminations(self):
+ return self._channel_terminations
+
+ @property
+ def channel_pairs(self):
+ return self._channel_pairs
+
+ @property
+ def channel_partitions(self):
+ return self._channel_partitions
+
+ @property
+ def ont_anis(self):
+ return self._ont_anis
+
+ @property
+ def v_ont_anis(self):
+ return self._v_ont_anis
+
+ @property
+ def v_enets(self):
+ return self._v_enets
+
+ @property
+ def tconts(self):
+ return self._tconts
+
+ def _data_to_dict(self, data):
+ name = data.name
+ interface = data.interface
+ inst_data = data.data
+
+ if isinstance(data, ChannelgroupConfig):
+ return 'channel-group', {
+ 'name': name,
+ 'enabled': interface.enabled,
+ 'system-id': inst_data.system_id,
+ 'polling-period': inst_data.polling_period
+ }
+
+ elif isinstance(data, ChannelpartitionConfig):
+ def _auth_method_enum_to_string(value):
+ from voltha.protos.bbf_fiber_types_pb2 import SERIAL_NUMBER, LOID, \
+ REGISTRATION_ID, OMCI, DOT1X
+ return {
+ SERIAL_NUMBER: 'serial-number',
+ LOID: 'loid',
+ REGISTRATION_ID: 'registration-id',
+ OMCI: 'omci',
+ DOT1X: 'dot1x'
+ }.get(value, 'unknown')
+
+ return 'channel-partition', {
+ 'name': name,
+ 'enabled': interface.enabled,
+ 'authentication-method': _auth_method_enum_to_string(inst_data.authentication_method),
+ 'channel-group': inst_data.channelgroup_ref,
+ 'fec-downstream': inst_data.fec_downstream,
+ 'mcast-aes': inst_data.multicast_aes_indicator,
+ 'differential-fiber-distance': inst_data.differential_fiber_distance,
+ 'closest_ont_distance':inst_data.closest_ont_distance
+ }
+
+ elif isinstance(data, ChannelpairConfig):
+ return 'channel-pair', {
+ 'name': name,
+ 'enabled': interface.enabled,
+ 'channel-group': inst_data.channelgroup_ref,
+ 'channel-partition': inst_data.channelpartition_ref,
+ 'line-rate': inst_data.channelpair_linerate
+ }
+
+ elif isinstance(data, ChannelterminationConfig):
+ return 'channel-termination', {
+ 'name': name,
+ 'enabled': interface.enabled,
+ 'xgs-ponid': inst_data.xgs_ponid,
+ 'xgpon-ponid': inst_data.xgpon_ponid,
+ 'channel-pair': inst_data.channelpair_ref,
+ 'ber-calc-period': inst_data.ber_calc_period,
+ 'pon_tag':inst_data.pon_tag
+ }
+
+ elif isinstance(data, OntaniConfig):
+ return 'ont-ani', {
+ 'name': name,
+ 'enabled': interface.enabled,
+ 'upstream-fec': inst_data.upstream_fec_indicator,
+ 'mgnt-gemport-aes': inst_data.mgnt_gemport_aes_indicator
+ }
+
+ elif isinstance(data, VOntaniConfig):
+ return 'vOnt-ani', {
+ 'name': name,
+ 'enabled': interface.enabled,
+ 'onu-id': inst_data.onu_id,
+ 'expected-serial-number': inst_data.expected_serial_number,
+ 'expected-registration-id': inst_data.expected_registration_id,
+ 'preferred-channel-pair': inst_data.preferred_chanpair,
+ 'channel-partition': inst_data.parent_ref,
+ 'upstream-channel-speed': inst_data.upstream_channel_speed,
+ 'data': data
+ }
+
+ elif isinstance(data, VEnetConfig):
+ return 'vEnet', {
+ 'name': name,
+ 'enabled': interface.enabled,
+ 'v-ont-ani': inst_data.v_ontani_ref
+ }
+
+ else:
+ raise NotImplementedError('Unknown data type')
+
+ @staticmethod
+ def _dict_diff(lhs, rhs):
+ """
+ Compare the values of two dictionaries and return the items in 'rhs'
+ that are different than 'lhs. The RHS dictionary keys can be a subset of the
+ LHS dictionary, or the RHS dictionary keys can contain new values.
+
+ :param lhs: (dict) Original dictionary values
+ :param rhs: (dict) New dictionary values to compare to the original (lhs) dict
+ :return: (dict) Dictionary with differences from the RHS dictionary
+ """
+ assert len(lhs.keys()) == len(set(lhs.iterkeys()) & (rhs.iterkeys())), 'Dictionary Keys do not match'
+ return {k: v for k, v in rhs.items() if k not in lhs or lhs[k] != rhs[k]}
+
+ def _valid_to_modify(self, item_type, valid, diffs):
+ bad_keys = [mod_key not in valid for mod_key in diffs]
+ if len(bad_keys) != 0:
+ self.log.warn("{} modification of '{}' not supported").format(item_type, bad_keys[0])
+ return False
+ return True
+
+ def _on_channel_group_modify(self, name, items, diffs):
+ if len(diffs) == 0:
+ return
+
+ valid_keys = ['polling-period'] # Modify of these keys supported
+
+ if self._valid_to_modify('channel-group', valid_keys, diffs.keys()):
+ self.log.info('TODO: Not-Implemented-yet')
+ # for k, v in diffs.items:
+ # items[name][k] = v
+
+ def _on_channel_partition_modify(self, name, items, diffs):
+ if len(diffs) == 0:
+ return
+
+ valid_keys = ['fec-downstream', 'mcast-aes', 'differential-fiber-distance']
+
+ if self._valid_to_modify('channel-partition', valid_keys, diffs.keys()):
+ self.log.info('TODO: Not-Implemented-yet')
+ # for k, v in diffs.items:
+ # items[name][k] = v
+
+ def _on_channel_pair_modify(self, name, items, diffs):
+ if len(diffs) == 0:
+ return
+
+ valid_keys = ['line-rate'] # Modify of these keys supported
+
+ if self._valid_to_modify('channel-pair', valid_keys, diffs.keys()):
+ self.log.info('TODO: Not-Implemented-yet')
+ # for k, v in diffs.items:
+ # items[name][k] = v
+
+
+
+ def _on_channel_termination_create(self, name, pon_type='xgs-ponid'):
+ assert name in self._channel_terminations, \
+ '{} is not a channel-termination'.format(name)
+ ct = self._channel_terminations[name]
+
+ pon_id = ct[pon_type]
+ # Look up the southbound PON port
+
+ pon_port = self.southbound_ports.get(pon_id, None)
+ if pon_port is None:
+ raise ValueError('Unknown PON port. PON-ID: {}'.format(pon_id))
+
+ assert ct['channel-pair'] in self._channel_pairs, \
+ '{} is not a channel-pair'.format(ct['channel-pair'])
+ cpair = self._channel_pairs[ct['channel-pair']]
+
+ assert cpair['channel-group'] in self._channel_groups, \
+ '{} is not a -group'.format(cpair['channel-group'])
+ assert cpair['channel-partition'] in self._channel_partitions, \
+ '{} is not a channel-partition'.format(cpair('channel-partition'))
+ cg = self._channel_groups[cpair['channel-group']]
+ cpart = self._channel_partitions[cpair['channel-partition']]
+
+ enabled = ct['enabled']
+
+ polling_period = cg['polling-period']
+ authentication_method = cpart['authentication-method']
+ # line_rate = cpair['line-rate']
+ downstream_fec = cpart['fec-downstream']
+ deployment_range = cpart['differential-fiber-distance']
+ # mcast_aes = cpart['mcast-aes']
+
+ # TODO: Support BER calculation period
+ # TODO Support setting of line rate
+
+ pon_port.xpon_name = name
+ pon_port.discovery_tick = polling_period
+ pon_port.authentication_method = authentication_method
+ pon_port.deployment_range = deployment_range * 1000 # pon-agent uses meters
+ pon_port.downstream_fec_enable = downstream_fec
+ # TODO: For now, upstream FEC = downstream
+ pon_port.upstream_fec_enable = downstream_fec
+
+ # TODO: pon_port.mcast_aes = mcast_aes
+
+ pon_port.admin_state = AdminState.ENABLED if enabled else AdminState.DISABLED
+
+ def _on_channel_termination_modify(self, name, items, diffs):
+ if len(diffs) == 0:
+ return
+
+ valid_keys = ['enabled'] # Modify of these keys supported
+
+ if self._valid_to_modify('channel-termination', valid_keys, diffs.keys()):
+ self.log.info('TODO: Not-Implemented-yet')
+ # for k, v in diffs.items:
+ # items[name][k] = v
+
+ def _on_channel_termination_delete(self, name, pon_type='xgs-ponid'):
+ assert name in self._channel_terminations, \
+ '{} is not a channel-termination'.format(name)
+ ct = self._channel_terminations[name]
+
+ # Look up the southbound PON port
+ pon_id = ct[pon_type]
+ pon_port = self.southbound_ports.get(pon_id, None)
+ if pon_port is None:
+ raise ValueError('Unknown PON port. PON-ID: {}'.format(pon_id))
+
+ pon_port.admin_state = AdminState.DISABLED
+
+ def _on_vont_ani_create(self, name):
+ assert name in self._v_ont_anis, \
+ '{} is not a v_ont_ani'.format(name)
+
+ vani = self._v_ont_anis[name]
+
+ va_cp_name = vani['preferred-channel-pair']
+
+ if vani['preferred-channel-pair'] in self.channel_pairs:
+ for ct_name in self._channel_terminations:
+ #ct_name = ct['channel-pair']
+ ct_cp_name = self._channel_terminations[ct_name]['channel-pair']
+
+ if va_cp_name == ct_cp_name:
+ ct = self._channel_terminations[ct_name]
+ pon_id = ct['xgs-ponid']
+
+ # Look up the southbound PON port
+ pon_port = self.southbound_ports.get(pon_id, None)
+ if pon_port is not None:
+ sn = vani['expected-serial-number']
+ onu_id = vani['onu-id']
+ if pon_port.onu_exist_check(onu_id)==False:
+ #add onu
+ onu_info = {
+ 'name': vani['name'],
+ 'device-id': self.device_id,
+ 'serial-number': sn,
+ 'xpon-name': None,
+ 'pon': pon_port,
+ 'onu-id': onu_id,
+ 'ranging-status': 0,
+ 'config-status': 1,
+ 'status_machine': 'init',
+ 'enabled': vani['enabled'],
+ 'channel-partition': vani['channel-partition'],
+ 'expected-registration-id': vani['expected-registration-id'],
+ 'upstream-channel-speed': vani['upstream-channel-speed'],
+ 'upstream-fec': True,
+ 'password': Onu.DEFAULT_PASSWORD,
+ 't-conts': None,
+ 'gem-ports': None,
+ 'onu-vid': None,
+ 'channel-id': pon_id + 100,
+ 'vont-ani': vani
+ }
+ pon_port.onu_add(onu_info)
+ else:
+ #update onu
+ onu_info = {
+ 'name': vani['name'],
+ 'device-id': self.device_id,
+ 'serial-number': sn,
+ 'xpon-name': None,
+ 'pon': pon_port,
+ 'onu-id': onu_id,
+ 'ranging-status': None,
+ 'config-status': 1,
+ 'status_machine': None,
+ 'enabled': vani['enabled'],
+ 'channel-partition': vani['channel-partition'],
+ 'expected-registration-id': vani['expected-registration-id'],
+ 'upstream-channel-speed': vani['upstream-channel-speed'],
+ 'upstream-fec': None,
+ 'password': None,
+ 't-conts': None,
+ 'gem-ports': None,
+ 'onu-vid': None,
+ 'channel-id':None,
+ 'vont-ani': vani
+ }
+ pon_port.onu_update(onu_info)
+
+ else:
+ pass
+ else:
+ pass
+ else:
+ pass
+ else:
+ pass
+
+ #if vani['protection-channel-pair'] in self.channel_pairs:
+ #for ct in self._channel_terminations:
+ #if ct['channel-pair']== vani['protection-channel-pair']:
+ #pon_id = ct['xgs-ponid']
+
+ # Look up the southbound PON port
+ #pon_port = self.southbound_ports.get(pon_id, None)
+ #if pon_port is not None:
+ #sn = vani['expected-serial-number']
+ #onu_id = vani['onu-id']
+ #if pon_port.onu_exist_check(onuid)==False:
+ #add onu
+ #onu_info = {
+ #'device-id': self.device_id,
+ #'serial-number': sn,
+ #'xpon-name': None,
+ #'pon': pon_port,
+ #'onu-id': onu_id,
+ #'enabled': enabled,
+ #'upstream-channel-speed': vani['upstream-channel-speed'],
+ #'password': Onu.DEFAULT_PASSWORD,
+ #'t-conts': None,
+ #'gem-ports': None,
+ #'onu-vid': None,
+ #'vont-ani': vani
+ #}
+ #pon_port.onu_add(onu_info)
+ #else:
+ #update onu
+
+
+ def _on_vont_ani_delete(self, name):
+ assert name in self._v_ont_anis, \
+ '{} is not a v_ont_ani'.format(name)
+
+ vani = self._v_ont_anis[name]
+
+ va_cp_name = vani['preferred-channel-pair']
+
+ if vani['preferred-channel-pair'] in self.channel_pairs:
+ for ct_name in self._channel_terminations:
+ ct_cp_name = self._channel_terminations[ct_name]['channel-pair']
+
+ if va_cp_name == ct_cp_name:
+ ct = self._channel_terminations[ct_name]
+ pon_id = ct['xgs-ponid']
+
+ # Look up the southbound PON port
+ pon_port = self.southbound_ports.get(pon_id, None)
+ if pon_port is not None:
+ onu_id = vani['onu-id']
+ onu_info = {
+ 'name': vani['name'],
+ 'device-id': self.device_id,
+ 'serial-number': None,
+ 'xpon-name': None,
+ 'pon': pon_port,
+ 'onu-id': onu_id,
+ 'ranging-status': None,
+ 'config-status': 0,
+ 'status_machine': None,
+ 'enabled': None,
+ 'channel-partition': None,
+ 'expected-registration-id': None,
+ 'upstream-channel-speed': None,
+ 'upstream-fec': None,
+ 'password': None,
+ 't-conts': None,
+ 'gem-ports': None,
+ 'onu-vid': None,
+ 'channel-id':None,
+ 'vont-ani': None
+ }
+ pon_port.onu_update(onu_info)
+ else:
+ pass
+ else:
+ pass
+ else:
+ pass
+ else:
+ pass
+
+ #if vani['protection-channel-pair'] in self.channel_pairs:
+ #for ct in self._channel_terminations:
+ #if ct['channel-pair']== vani['protection-channel-pair']:
+ #pon_id = ct['xgs-ponid']
+
+ # Look up the southbound PON port
+ #pon_port = self.southbound_ports.get(pon_id, None)
+ #if pon_port is not None:
+ #sn = vani['expected-serial-number']
+ #onu_id = vani['onu-id']
+ #if pon_port.onu_exist_check(onuid)==False:
+ #add onu
+ #onu_info = {
+ #'device-id': self.device_id,
+ #'serial-number': sn,
+ #'xpon-name': None,
+ #'pon': pon_port,
+ #'onu-id': onu_id,
+ #'enabled': enabled,
+ #'upstream-channel-speed': vani['upstream-channel-speed'],
+ #'password': Onu.DEFAULT_PASSWORD,
+ #'t-conts': None,
+ #'gem-ports': None,
+ #'onu-vid': None,
+ #'vont-ani': vani
+ #}
+ #pon_port.onu_add(onu_info)
+ #else:
+ #update onu
+
+
+ def _on_ont_ani_create(self, name):
+ assert name in self._ont_anis, \
+ '{} is not a ont_ani'.format(name)
+
+ ani = self._ont_anis[name]
+
+ upstream_fec = ani['upstream-fec']
+
+ #if upstream_fec == 1:
+ #return
+
+ vani = self._v_ont_anis[name]
+
+ if vani is None:
+ return
+
+ va_cp_name = vani['preferred-channel-pair']
+
+ if va_cp_name in self.channel_pairs:
+ for ct_name in self._channel_terminations:
+ ct_cp_name = self._channel_terminations[ct_name]['channel-pair']
+
+ if va_cp_name == ct_cp_name:
+ ct = self._channel_terminations[ct_name]
+ pon_id = ct['xgs-ponid']
+
+ # Look up the southbound PON port
+ pon_port = self.southbound_ports.get(pon_id, None)
+ if pon_port is not None:
+ sn = vani['expected-serial-number']
+ onu_id = vani['onu-id']
+ if pon_port.onu_exist_check(onu_id)==True:
+ #update onu
+ onu_info = {
+ 'name': vani['name'],
+ 'device-id': self.device_id,
+ 'serial-number': sn,
+ 'xpon-name': None,
+ 'pon': pon_port,
+ 'onu-id': onu_id,
+ 'ranging-status': None,
+ 'config-status': None,
+ 'status_machine': None,
+ 'enabled': None,
+ 'channel-partition': None,
+ 'expected-registration-id': None,
+ 'upstream-channel-speed': None,
+ 'upstream-fec': upstream_fec,
+ 'password': None,
+ 't-conts': None,
+ 'gem-ports': None,
+ 'onu-vid': None,
+ 'channel-id':None,
+ 'vont-ani': None
+ }
+ pon_port.onu_update(onu_info)
+
+ else:
+ pass
+ else:
+ pass
+ else:
+ pass
+ else:
+ pass
+
+
+
+ def create_interface(self, data):
+ """
+ Create XPON interfaces
+ :param data: (xpon config info)
+ """
+ self.log.debug('create-interface', interface=data.interface, inst_data=data.data)
+
+ name = data.name
+ items = self._get_xpon_collection(data)
+
+ if items is not None and name not in items:
+ self._cached_xpon_pon_info = {} # Clear cached data
+
+ item_type, new_item = self._data_to_dict(data)
+ #self.log.debug('new-item', item_type=item_type, item=new_item)
+
+ if name not in items:
+ self.log.debug('new-item', item_type=item_type, item=new_item)
+
+ items[name] = new_item
+
+ if isinstance(data, ChannelterminationConfig):
+ self._on_channel_termination_create(name)
+ self.configure_pon(name)
+ elif isinstance(data, VOntaniConfig):
+ self._on_vont_ani_create(name)
+ elif isinstance(data, OntaniConfig):
+ self._on_ont_ani_create(name)
+ else:
+ pass
+
+ def configure_pon(self,name,pon_type='xgs-ponid'):
+ self.log.debug('configure-pon', name=name)
+ try:
+ assert name in self._channel_terminations, \
+ '{} is not a channel-termination'.format(name)
+ ct = self._channel_terminations[name]
+
+ pon_id = ct[pon_type]
+ # Look up the southbound PON port
+ pon_port = pon_id
+
+ assert ct['channel-pair'] in self._channel_pairs, \
+ '{} is not a channel-pair'.format(ct['channel-pair'])
+ cpair = self._channel_pairs[ct['channel-pair']]
+
+ assert cpair['channel-group'] in self._channel_groups, \
+ '{} is not a -group'.format(cpair['channel-group'])
+ assert cpair['channel-partition'] in self._channel_partitions, \
+ '{} is not a channel-partition'.format(cpair('channel-partition'))
+ cg = self._channel_groups[cpair['channel-group']]
+ cpart = self._channel_partitions[cpair['channel-partition']]
+
+ pon_tag = ct['pon_tag']
+ closest_ont_distance = cpart['closest_ont_distance']
+ differential_fiber_distance = cpart['differential-fiber-distance']
+ fec_downstream = cpart['fec-downstream']
+ aes_downstream = 1
+ aes_upstream = 1
+ pon_profile = 0
+ bwmap_cycle = 8
+ discover_period = cg['polling-period']
+
+
+
+
+ msg_hdr = OltMsgCommonHdr(
+ type=OLT_PON_CONFIGURE_PON,
+ src_appId=OLT_APPID_VOLTHA,
+ sync=0
+ )
+ data = msg_hdr.SerializeToString()
+ self.zmq_client_async.async_send(data, 1)
+
+ config_pon = OltPonConfigurePon(
+ pon_port = pon_port,
+ pon_id = pon_id,
+ pon_tag = pon_tag,
+ closest_ont_distance = closest_ont_distance,
+ differential_fiber_distance = differential_fiber_distance,
+ fec_downstream = fec_downstream,
+ aes_downstream = aes_downstream,
+ aes_upstream = aes_upstream,
+ pon_profile = pon_profile,
+ bwmap_cycle = bwmap_cycle,
+ discover_period = discover_period
+ )
+ data = config_pon.SerializeToString()
+ self.zmq_client_async.async_send(data, 0)
+ self.log.debug('send configure-pon message success',config_pon=config_pon)
+
+ except Exception as e:
+ self.log.exception('Exception during configure pon', e=e)
+
+
+ def update_interface(self, data):
+ """
+ Update XPON interfaces
+ :param data: (xpon config info)
+ """
+ self.log.debug('update_interface', interface=data.interface, inst_data=data.data)
+
+ name = data.name
+ items = self._get_xpon_collection(data)
+
+ if items is None:
+ raise ValueError('Unknown data type: {}'.format(type(data)))
+
+ existing_item = items.get(name)
+ if existing_item is None:
+ raise KeyError("'{}' not found. Type: {}".format(name, type(data)))
+
+ item_type, update_item = self._data_to_dict(data)
+ self.log.debug('update-item', item_type=item_type, item=update_item)
+
+ # Calculate the difference
+ diffs = self._dict_diff(existing_item, update_item)
+
+ if len(diffs) == 0:
+ self.log.debug('update-item-no-diffs')
+
+ self._cached_xpon_pon_info = {} # Clear cached data
+
+ # Act on changed items
+ if isinstance(data, ChannelgroupConfig):
+ self._on_channel_group_modify(name, items, diffs)
+ #raise NotImplementedError('TODO: not yet supported')
+
+ elif isinstance(data, ChannelpartitionConfig):
+ self._on_channel_partition_modify(name, items, diffs)
+ #raise NotImplementedError('TODO: not yet supported')
+
+ elif isinstance(data, ChannelpairConfig):
+ self._on_channel_pair_modify(name, items, diffs)
+ #raise NotImplementedError('TODO: not yet supported')
+
+ elif isinstance(data, ChannelterminationConfig):
+ self._on_channel_termination_modify(name, items, diffs)
+ #raise NotImplementedError('TODO: not yet supported')
+
+ elif isinstance(data, OntaniConfig):
+ raise NotImplementedError('TODO: not yet supported')
+
+ elif isinstance(data, VOntaniConfig):
+ raise NotImplementedError('TODO: not yet supported')
+
+ elif isinstance(data, VEnetConfig):
+ raise NotImplementedError('TODO: not yet supported')
+
+ else:
+ raise NotImplementedError('Unknown data type')
+
+
+ def remove_interface(self, data):
+ """
+ Deleete XPON interfaces
+ :param data: (xpon config info)
+ """
+ self.log.debug('remove_interface', interface=data.interface, inst_data=data.data)
+
+ name = data.name
+
+ items = self._get_xpon_collection(data)
+ item = items.get(name)
+ self.log.debug('delete-interface', name=name, data=data)
+ self.log.debug('remove_interface len(items)', len(items))
+
+ if item is not None:
+ self._cached_xpon_pon_info = {} # Clear cached data
+ #del items[name]
+
+ if isinstance(data, ChannelgroupConfig):
+ pass # Rely upon xPON logic to not allow delete of a referenced group
+
+ elif isinstance(data, ChannelpartitionConfig):
+ pass # Rely upon xPON logic to not allow delete of a referenced partition
+
+ elif isinstance(data, ChannelpairConfig):
+ pass # Rely upon xPON logic to not allow delete of a referenced pair
+
+ elif isinstance(data, ChannelterminationConfig):
+ self._on_channel_termination_delete(name)
+
+ elif isinstance(data, OntaniConfig):
+ pass
+
+ elif isinstance(data, VOntaniConfig):
+ self._on_vont_ani_delete(name)
+ #pass
+
+ elif isinstance(data, VEnetConfig):
+ pass
+
+ else:
+ raise NotImplementedError('Unknown data type')
+
+ del items[name]
+ self.log.debug('remove_interface len(items)', len(items))
+ #raise NotImplementedError('TODO: not yet supported')
+
+
+ def create_tcont(self, tcont_data, traffic_descriptor_data):
+ """
+ Create TCONT information
+ :param tcont_data:
+ :param traffic_descriptor_data:
+ """
+ self.log.debug('create-tcont', tcont=tcont_data, td=traffic_descriptor_data)
+ traffic_descriptor = TrafficDescriptor.create(traffic_descriptor_data)
+ tcont = TCont.create(tcont_data, traffic_descriptor)
+
+ if tcont.name not in self._tconts:
+ self._cached_xpon_pon_info = {} # Clear cached data
+ self._tconts[tcont.name] = tcont
+
+ # Update any ONUs referenced
+ tcont.xpon_create(self)
+
+ if traffic_descriptor.name not in self._traffic_descriptors:
+ self._traffic_descriptors[traffic_descriptor.name] = traffic_descriptor
+
+ # Update any ONUs referenced
+ traffic_descriptor.xpon_create(self, tcont)
+
+ def update_tcont(self, tcont_data, traffic_descriptor_data):
+ """
+ Update TCONT information
+ :param tcont_data:
+ :param traffic_descriptor_data:
+ """
+ self.log.debug('update-tcont', tcont=tcont_data, td=traffic_descriptor_data)
+
+ if tcont_data.name not in self._tconts:
+ raise KeyError("TCONT '{}' does not exists".format(tcont_data.name))
+
+ if traffic_descriptor_data.name not in self._traffic_descriptors:
+ raise KeyError("Traffic Descriptor '{}' does not exists".
+ format(traffic_descriptor_data.name))
+
+ self._cached_xpon_pon_info = {} # Clear cached data
+
+ traffic_descriptor = TrafficDescriptor.create(traffic_descriptor_data)
+ tcont = TCont.create(tcont_data, traffic_descriptor)
+ #
+ # Update any ONUs referenced
+ # tcont.xpon_update(self)
+ # traffic_descriptor.xpon_update(self, tcont)
+ pass
+ raise NotImplementedError('TODO: Not yet supported')
+
+ def remove_tcont(self, tcont_data, traffic_descriptor_data):
+ """
+ Remove TCONT information
+ :param tcont_data:
+ :param traffic_descriptor_data:
+ """
+ self.log.debug('remove-tcont', tcont=tcont_data, td=traffic_descriptor_data)
+
+ tcont = self._tconts.get(tcont_data.name)
+ traffic_descriptor = self._traffic_descriptors.get(traffic_descriptor_data.name)
+
+ if traffic_descriptor is not None:
+ del self._traffic_descriptors[traffic_descriptor_data.name]
+
+ self._cached_xpon_pon_info = {} # Clear cached data
+ pass # Perform any needed operations
+ #raise NotImplementedError('TODO: Not yet supported')
+
+ if tcont is not None:
+ #del self._tconts[tcont_data.name]
+
+ self._cached_xpon_pon_info = {} # Clear cached data
+
+ #Update any ONUs referenced
+ tcont.xpon_delete(self)
+ del self._tconts[tcont_data.name]
+
+ pass # Perform any needed operations
+ #raise NotImplementedError('TODO: Not yet supported')
+
+ def create_gemport(self, gemport_data):
+ """
+ Create GEM Port
+ :param data:
+ """
+
+ self.log.debug('create-gemport', gemport=gemport_data)
+ gemport = Gemport.create(gemport_data)
+
+ if gemport.name not in self._gemports:
+ self._cached_xpon_pon_info = {} # Clear cached data
+ self._gemports[gemport.name] = gemport
+
+ # Update any ONUs referenced
+ gemport.xpon_create(self)
+
+ def remove_gemport(self, data):
+ """
+ Delete GEM Port
+ :param data:
+ """
+ self.log.debug('remove-gemport', gem_port=data.name)
+
+ gemport = self._gemports.get(data.name)
+
+ if gemport is not None:
+ #del self._tconts[tcont_data.name]
+
+ self._cached_xpon_pon_info = {} # Clear cached data
+
+ #Update any ONUs referenced
+ gemport.xpon_delete(self)
+ del self._gemports[data.name]
+
+ pass # Perform any needed operations
+
+
+ def update_gemport(self, data):
+ """
+ Update GEM Port
+ :param data:
+ """
+ self.log.debug('update-gemport', gem_port=data)
+ pass
+
+
+
+ def _unregister_for_inter_adapter_messages(self):
+ try:
+ self.adapter_agent.unregister_for_inter_adapter_messages()
+ except:
+ pass
+
+ def _delete_logical_device(self):
+ ldi, self.logical_device_id = self.logical_device_id, None
+
+ if ldi is None:
+ return
+
+ self.log.debug('delete-logical-device', ldi=ldi)
+
+ logical_device = self.adapter_agent.get_logical_device(ldi)
+ self.adapter_agent.delete_logical_device(logical_device)
+
+ device = self.adapter_agent.get_device(self.device_id)
+ device.parent_id = ''
+
+ # Update the logical device mapping
+ if ldi in self.adapter.logical_device_id_to_root_device_id:
+ del self.adapter.logical_device_id_to_root_device_id[ldi]
+
+ def _cancel_deferred(self):
+
+ d1, self.heartbeat = self.heartbeat, None
+ d2, self.asyncmsg = self.asyncmsg, None
+ d3, self.omcimsg = self.omcimsg, None
+ d4, self.packetmsg = self.packetmsg, None
+
+ for d in [d1, d2, d3, d4]:
+ try:
+ if d is not None and not d.called:
+ d.cancel()
+ except:
+ pass
+
+ def _zmq_shutdown(self):
+
+ self.zmq_client_sub.sub_shutdown()
+ self.zmq_client_sync.sync_shutdown()
+ self.zmq_client_async.async_shutdown()
+ self.zmq_client_omci.omci_shutdown()
+ self.zmq_client_of_packet.packet_shutdown()
+
+ def _finish_reboot(self):
+
+ if self.heartbrat_status == 1:
+ self.log.info('reboot self.reboot_check_times:', self.reboot_check_times)
+ self.reboot_check_times = 0
+
+ # Reenable all child devices
+ self.adapter_agent.update_child_devices_state(self.device_id,
+ admin_state=AdminState.ENABLED)
+ #self.olt_activate_msg_send()
+ #self.work_status = 1
+
+ #for channel_term in self._channel_terminations:
+ #self.log.info('self._channel_terminations:', channel_term)
+ #self._on_channel_termination_create(channel_term)
+ #self.configure_pon(channel_term)
+
+ #for vont_ani in self._v_ont_anis:
+ #self.log.info('self._v_ont_anis:', vont_ani)
+ #self._on_vont_ani_create(vont_ani)
+
+ self.reboot_status = 0
+ self.log.info('rebooted', device_id=self.device_id)
+
+ else:
+ self.reboot_check_times += 1
+ if self.reboot_check_times < 20:
+ self.startup = reactor.callLater(10, self._finish_reboot)
+ else:
+ self.log.info('reboot fail. olt is unreachable.', device_id=self.device_id)
+
+ def disable(self):
+ self.log.info('disabling', device_id=self.device_id)
+
+ #send deactivate msg
+ self.olt_deactivate_msg_send()
+ self.work_status = 0
+
+ # Cancel any running enable/disable/... in progress
+ d, self.startup = self.startup, None
+ try:
+ if d is not None and not d.called:
+ d.cancel()
+ except:
+ pass
+
+ self._cancel_deferred()
+
+ self._unregister_for_inter_adapter_messages()
+ device = self.adapter_agent.get_device(self.device_id)
+
+ device.oper_status = OperStatus.UNKNOWN
+ device.connect_status = ConnectStatus.UNREACHABLE
+ #device.admin_state = AdminState.DISABLED
+ self.adapter_agent.update_device(device)
+
+ # Remove the logical device
+ logical_device = self.adapter_agent.get_logical_device(
+ self.logical_device_id)
+ self.adapter_agent.delete_logical_device(logical_device)
+
+ # Disable all child devices first
+ self.adapter_agent.update_child_devices_state(self.device_id,
+ oper_status=OperStatus.UNKNOWN,
+ connect_status=ConnectStatus.UNREACHABLE,
+ admin_state=AdminState.DISABLED)
+
+ # Remove the peer references from this device
+ self.adapter_agent.delete_all_peer_references(self.device_id)
+
+ for port in self.southbound_ports.itervalues():
+ port.delete()
+
+ # Set all ports to disabled
+ self.adapter_agent.disable_all_ports(self.device_id)
+
+ # Update the logice device mapping
+ if self.logical_device_id in \
+ self.adapter.logical_device_id_to_root_device_id:
+ del self.adapter.logical_device_id_to_root_device_id[
+ self.logical_device_id]
+
+ if self.logical_device_id is not None:
+ self.logical_device_id = None
+
+ #self._delete_logical_device()
+ self.log.info('disabled', device_id=device.id)
+
+ #zmq shutdown
+ #self._zmq_shutdown()
+
+
+ def reenable(self,done_deferred=None):
+ """
+ This is called when a previously disabled device needs to be enabled based on a NBI call.
+ :param done_deferred: (Deferred) Deferred to fire when done
+ """
+ self.log.info('re-enabling', device_id=self.device_id)
+
+ # Cancel any running enable/disable/... in progress
+ d, self.startup = self.startup, None
+ try:
+ if d is not None and not d.called:
+ d.cancel()
+ except:
+ pass
+
+ # Get the latest device reference
+ device = self.adapter_agent.get_device(self.device_id)
+
+ # Update the connect status to REACHABLE
+ device.connect_status = ConnectStatus.REACHABLE
+ device.oper_status = OperStatus.ACTIVATING
+ self.adapter_agent.update_device(device)
+
+ # Set all ports to enabled
+ self.adapter_agent.enable_all_ports(self.device_id)
+
+ # Recreate the logical device
+ logical_device = self.get_logical_device(device)
+ self._logical_device = self.adapter_agent.create_logical_device(logical_device,dpid=self.olt_mac)
+
+ # Recreate logical ports for all southbound and northbound interfaces
+ for port_no in self.northbound_ports:
+ logical_port = self.northbound_ports[port_no].get_logical_port()
+ self.adapter_agent.add_logical_port(self._logical_device.id, logical_port)
+
+ # update device active status now
+ device = self.adapter_agent.get_device(device.id)
+ device.parent_id = self._logical_device.id
+ device.oper_status = OperStatus.ACTIVE
+ device.reason = ''
+ self.logical_device_id = self._logical_device.id
+ self.adapter_agent.update_device(device)
+
+
+ # Reenable all child devices
+ self.adapter_agent.update_child_devices_state(device.id,
+ oper_status=OperStatus.DISCOVERED,
+ connect_status=ConnectStatus.REACHABLE,
+ admin_state=AdminState.ENABLED)
+
+ self.olt_activate_msg_send()
+ self.work_status = 1
+
+ for channel_term in self._channel_terminations:
+ self.log.info('self._channel_terminations:', channel_term)
+ self._on_channel_termination_create(channel_term)
+ self.configure_pon(channel_term)
+
+ for vont_ani in self._v_ont_anis:
+ self.log.info('self._v_ont_anis:', vont_ani)
+ self._on_vont_ani_create(vont_ani)
+
+ for ont_ani in self._ont_anis:
+ self.log.info('self._ont_anis:', ont_ani)
+ self._on_ont_ani_create(ont_ani)
+
+ #start heart_beat pm_collect
+ self.log.debug('Starting heartbeat')
+ self.start_heartbeat(delay=5)
+ #self.start_onu_test()
+ self.start_poll_async_msg()
+ self.start_poll_omci_msg()
+ #self.start_kpi_collection()
+ self.start_poll_packet_in_msg()
+
+ self.log.info('re-enabled', device_id=device.id)
+
+
+ def reboot(self):
+ self.log.info('rebooting', device_id=self.device_id)
+
+ # Update the operational status to ACTIVATING and connect status to
+ # UNREACHABLE
+ #device = self.adapter_agent.get_device(self.device_id)
+ #previous_oper_status = device.oper_status
+ #previous_conn_status = device.connect_status
+ #device.oper_status = OperStatus.ACTIVATING
+ #device.connect_status = ConnectStatus.UNREACHABLE
+ #self.adapter_agent.update_device(device)
+
+ # Update the child devices connect state to UNREACHABLE
+ #self.adapter_agent.update_child_devices_state(self.device_id,
+ # connect_status=ConnectStatus.UNREACHABLE)
+
+ # Disable all child devices first
+ self.adapter_agent.update_child_devices_state(self.device_id,
+ admin_state=AdminState.DISABLED)
+
+ # Remove the peer references from this device
+ self.adapter_agent.delete_all_peer_references(self.device_id)
+
+ for port in self.southbound_ports.itervalues():
+ port.delete()
+
+ self.olt_reboot_msg_send()
+
+ self.reboot_status = 1
+ self.heartbrat_status = 0
+ self.reboot_check_times = 0
+ self.startup = reactor.callLater(10, self._finish_reboot)
+
+
+ def delete(self):
+ self.log.info('deleting', device_id=self.device_id)
+
+ #send deactivate msg
+ #self.olt_deactivate_msg_send()
+
+ #self._cancel_deferred()
+
+ # Drop registration for adapter messages
+ #self._unregister_for_inter_adapter_messages()
+
+ # Cancel any outstanding tasks
+
+ #d, self.startup = self.startup, None
+ #try:
+ #if d is not None and not d.called:
+ #d.cancel()
+ #except:
+ #pass
+
+
+
+ # clear xpon config
+ self._channel_groups.clear
+ self._channel_partitions.clear
+ self._channel_pairs.clear
+ self._channel_terminations.clear
+ self._v_ont_anis.clear
+ self._ont_anis.clear
+ self._v_enets.clear
+ self._tconts.clear
+ self._traffic_descriptors.clear
+ self._gemports.clear
+ self._cached_xpon_pon_info.clear
+
+ # Remove all child devices
+ self.adapter_agent.delete_all_child_devices(self.device_id)
+
+ self.log.info("_delete_logical_device")
+ # Remove the logical device (should already be gone if disable came first)
+ if self.logical_device_id is not None:
+ logical_device = self.adapter_agent.get_logical_device(
+ self.logical_device_id)
+ self.adapter_agent.delete_logical_device(logical_device)
+
+ #self.log.info("delete_all_peer_references")
+ # Remove the peer references from this device
+ #self.adapter_agent.delete_all_peer_references(self.device_id)
+
+ # Update the logice device mapping
+ if self.logical_device_id in \
+ self.adapter.logical_device_id_to_root_device_id:
+ del self.adapter.logical_device_id_to_root_device_id[
+ self.logical_device_id]
+
+ if self.logical_device_id is not None:
+ self.logical_device_id = None
+
+ # Tell all ports to stop any background processing
+ for port in self.northbound_ports.itervalues():
+ port.delete()
+
+ for port in self.southbound_ports.itervalues():
+ port.delete()
+
+ self.northbound_ports.clear()
+ self.southbound_ports.clear()
+
+ # Shutdown communications with OLT
+ self._zmq_shutdown()
+
+ self.log.info('deleted', device_id=self.device_id)
+
+ def _update_download_status(self, request, download):
+ if download is not None:
+ request.state = download.download_state
+ request.reason = download.failure_reason
+ request.image_state = download.image_state
+ request.additional_info = download.additional_info
+ request.downloaded_bytes = download.downloaded_bytes
+ else:
+ request.state = ImageDownload.DOWNLOAD_UNKNOWN
+ request.reason = ImageDownload.UNKNOWN_ERROR
+ request.image_state = ImageDownload.IMAGE_UNKNOWN
+ request.additional_info = "Download request '{}' not found".format(request.name)
+ request.downloaded_bytes = 0
+
+ self.adapter_agent.update_image_download(request)
+
+ def start_download(self, device, request, done):
+ """
+ This is called to request downloading a specified image into
+ the standby partition of a device based on a NBI call.
+
+ :param device: A Voltha.Device object.
+ :param request: A Voltha.ImageDownload object.
+ :param done: (Deferred) Deferred to fire when done
+ :return: (Deferred) Shall be fired to acknowledge the download.
+ """
+ log.info('image_download', request=request)
+
+ try:
+ if request.name in self._downloads:
+ raise Exception("Download request with name '{}' already exists".
+ format(request.name))
+ try:
+ download = Download.create(self, request, self._download_protocols)
+
+ except Exception:
+ request.additional_info = 'Download request creation failed due to exception'
+ raise
+
+ try:
+ self._downloads[download.name] = download
+ self._update_download_status(request, download)
+ done.callback('started')
+ return done
+
+ except Exception:
+ request.additional_info = 'Download request startup failed due to exception'
+ del self._downloads[download.name]
+ download.cancel_download(request)
+ raise
+
+ except Exception as e:
+ self.log.exception('create', e=e)
+
+ request.reason = ImageDownload.UNKNOWN_ERROR
+ request.state = ImageDownload.DOWNLOAD_FAILED
+ if not request.additional_info:
+ request.additional_info = e.message
+
+ self.adapter_agent.update_image_download(request)
+
+ # restore admin state to enabled
+ device.admin_state = AdminState.ENABLED
+ self.adapter_agent.update_device(device)
+ raise
+
+
+
+
+
+
+ def download_status(self, device, request, done):
+ """
+ This is called to inquire about a requested image download status based
+ on a NBI call.
+
+ The adapter is expected to update the DownloadImage DB object with the
+ query result
+
+ :param device: A Voltha.Device object.
+ :param request: A Voltha.ImageDownload object.
+ :param done: (Deferred) Deferred to fire when done
+
+ :return: (Deferred) Shall be fired to acknowledge
+ """
+ log.info('download_status', request=request)
+ download = self._downloads.get(request.name)
+
+ self._update_download_status(request, download)
+
+ if request.state != ImageDownload.DOWNLOAD_STARTED:
+ # restore admin state to enabled
+ device.admin_state = AdminState.ENABLED
+ self.adapter_agent.update_device(device)
+
+ done.callback(request.state)
+ return done
+
+
+
+
+
+
+ def cancel_download(self, device, request, done):
+ """
+ This is called to cancel a requested image download based on a NBI
+ call. The admin state of the device will not change after the
+ download.
+
+ :param device: A Voltha.Device object.
+ :param request: A Voltha.ImageDownload object.
+ :param done: (Deferred) Deferred to fire when done
+
+ :return: (Deferred) Shall be fired to acknowledge
+ """
+ log.info('cancel_download', request=request)
+
+ download = self._downloads.get(request.name)
+
+ if download is not None:
+ del self._downloads[request.name]
+ result = download.cancel_download(request)
+ self._update_download_status(request, download)
+ done.callback(result)
+ else:
+ self._update_download_status(request, download)
+ done.errback(KeyError('Download request not found'))
+
+ if device.admin_state == AdminState.DOWNLOADING_IMAGE:
+ device.admin_state = AdminState.ENABLED
+ self.adapter_agent.update_device(device)
+
+ return done
+
+
+ def activate_image(self, device, request, done):
+ """
+ This is called to activate a downloaded image from a standby partition
+ into active partition.
+
+ Depending on the device implementation, this call may or may not
+ cause device reboot. If no reboot, then a reboot is required to make
+ the activated image running on device
+
+ :param device: A Voltha.Device object.
+ :param request: A Voltha.ImageDownload object.
+ :param done: (Deferred) Deferred to fire when done
+
+ :return: (Deferred) OperationResponse object.
+ """
+ log.info('activate_image', request=request)
+
+ download = self._downloads.get(request.name)
+ if download is not None:
+ del self._downloads[request.name]
+ result = download.activate_image()
+ self._update_download_status(request, download)
+ done.callback(result)
+ else:
+ self._update_download_status(request, download)
+ done.errback(KeyError('Download request not found'))
+
+ # restore admin state to enabled
+ device.admin_state = AdminState.ENABLED
+ self.adapter_agent.update_device(device)
+ return done
+
+ def revert_image(self, device, request, done):
+ """
+ This is called to deactivate the specified image at active partition,
+ and revert to previous image at standby partition.
+
+ Depending on the device implementation, this call may or may not
+ cause device reboot. If no reboot, then a reboot is required to
+ make the previous image running on device
+
+ :param device: A Voltha.Device object.
+ :param request: A Voltha.ImageDownload object.
+ :param done: (Deferred) Deferred to fire when done
+
+ :return: (Deferred) OperationResponse object.
+ """
+ log.info('revert_image', request=request)
+ download = self._downloads.get(request.name)
+ if download is not None:
+ del self._downloads[request.name]
+ result = download.revert_image()
+ self._update_download_status(request, download)
+ done.callback(result)
+ else:
+ self._update_download_status(request, download)
+ done.errback(KeyError('Download request not found'))
+
+ # restore admin state to enabled
+ device.admin_state = AdminState.ENABLED
+ self.adapter_agent.update_device(device)
+ return done
+
+
+
+
+
diff --git a/voltha/adapters/cig_olt/cig_olt_xpon.py b/voltha/adapters/cig_olt/cig_olt_xpon.py
new file mode 100644
index 0000000..82fcbaf
--- /dev/null
+++ b/voltha/adapters/cig_olt/cig_olt_xpon.py
@@ -0,0 +1,312 @@
+# Copyright 2017-present CIG, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import structlog
+import json
+from enum import Enum
+from voltha.protos.bbf_fiber_tcont_body_pb2 import TcontsConfigData
+from voltha.protos.bbf_fiber_traffic_descriptor_profile_body_pb2 import TrafficDescriptorProfileData
+from voltha.protos.bbf_fiber_gemport_body_pb2 import GemportsConfigData
+from twisted.internet.defer import succeed, inlineCallbacks, returnValue
+from cig_olt_device import *
+
+log = structlog.get_logger()
+
+class TCont(object):
+ """
+ Class to wrap TCont capabilities
+ """
+ def __init__(self, alloc_id, traffic_descriptor, best_effort=None,
+ name=None, ident=None, vont_ani=None):
+ self.alloc_id = alloc_id
+ self.traffic_descriptor = traffic_descriptor
+ self.best_effort = best_effort
+ self.name = name
+ self.pon_id = None
+ self.onu_id = None
+ self.id = ident
+ self.vont_ani = vont_ani # (string) reference
+
+ def __str__(self):
+ return "TCont: {}, alloc-id: {}".format(self.name, self.alloc_id)
+
+ @staticmethod
+ def create(data, td):
+ assert isinstance(data, TcontsConfigData)
+ assert isinstance(td, TrafficDescriptor)
+
+ return TCont(data.alloc_id, td, best_effort=td.best_effort,
+ name=data.name, ident=data.id, vont_ani=data.interface_reference)
+
+ def _get_onu(self, olt):
+ onu = None
+ log.info('tcont _get_onu.')
+ try:
+ vont_ani = olt.v_ont_anis.get(self.vont_ani)
+ ch_pair = olt.channel_pairs.get(vont_ani['preferred-channel-pair'])
+ ch_term = next((term for term in olt.channel_terminations.itervalues()
+ if term['channel-pair'] == ch_pair['name']), None)
+ log.info('tcont _get_onu pon.')
+ pon = olt.pon(ch_term['xgs-ponid'])
+ log.info('tcont _get_onu pon.', pon._name)
+ onu = pon.onu(vont_ani['onu-id'])
+ log.info('tcont _get_onu onu.', onu._name)
+ except Exception:
+ pass
+
+ return onu
+
+ def xpon_create(self, olt):
+ # Look up any associated ONU. May be None if pre-provisioning
+ onu = self._get_onu(olt)
+
+ if onu is not None:
+ onu.add_tcont(self)
+ #pass
+
+ def xpon_update(self, olt):
+ # Look up any associated ONU. May be None if pre-provisioning
+ onu = self._get_onu(olt)
+
+ if onu is not None:
+ pass # TODO: Not yet supported
+ #pass
+
+ def xpon_delete(self, olt):
+ # Look up any associated ONU. May be None if pre-provisioning
+ onu = self._get_onu(olt)
+
+ if onu is not None:
+ onu.remove_tcont(self.alloc_id)
+ #pass
+
+
+class TrafficDescriptor(object):
+ """
+ Class to wrap the uplink traffic descriptor.
+ """
+ class AdditionalBwEligibility(Enum):
+ NONE = 0
+ BEST_EFFORT_SHARING = 1
+ NON_ASSURED_SHARING = 2 # Should match xpon.py values
+ DEFAULT = NONE
+
+ @staticmethod
+ def to_string(value):
+ return {
+ TrafficDescriptor.AdditionalBwEligibility.NON_ASSURED_SHARING: "non-assured-sharing",
+ TrafficDescriptor.AdditionalBwEligibility.BEST_EFFORT_SHARING: "best-effort-sharing",
+ TrafficDescriptor.AdditionalBwEligibility.NONE: "none"
+ }.get(value, "unknown")
+
+ @staticmethod
+ def from_value(value):
+ """
+ Matches both Adtran and xPON values
+ :param value:
+ :return:
+ """
+ return {
+ 0: TrafficDescriptor.AdditionalBwEligibility.NONE,
+ 1: TrafficDescriptor.AdditionalBwEligibility.BEST_EFFORT_SHARING,
+ 2: TrafficDescriptor.AdditionalBwEligibility.NON_ASSURED_SHARING,
+ }.get(value, TrafficDescriptor.AdditionalBwEligibility.DEFAULT)
+
+ def __init__(self, fixed, assured, maximum,
+ additional=AdditionalBwEligibility.DEFAULT,
+ best_effort=None,
+ name=None,
+ ident=None):
+ self.name = name
+ self.id = ident
+ self.fixed_bandwidth = fixed # bps
+ self.assured_bandwidth = assured # bps
+ self.maximum_bandwidth = maximum # bps
+ self.additional_bandwidth_eligibility = additional
+ self.best_effort = best_effort\
+ if additional == TrafficDescriptor.AdditionalBwEligibility.BEST_EFFORT_SHARING\
+ else None
+
+ def __str__(self):
+ return "TrafficDescriptor: {}, {}/{}/{}".format(self.name,
+ self.fixed_bandwidth,
+ self.assured_bandwidth,
+ self.maximum_bandwidth)
+
+ @staticmethod
+ def create(data):
+ assert isinstance(data, TrafficDescriptorProfileData)
+
+ additional = TrafficDescriptor.AdditionalBwEligibility.from_value(
+ data.additional_bw_eligibility_indicator)
+
+ if additional == TrafficDescriptor.AdditionalBwEligibility.BEST_EFFORT_SHARING:
+ best_effort = BestEffort(data.maximum_bandwidth,
+ data.priority,
+ data.weight)
+ else:
+ best_effort = None
+
+ return TrafficDescriptor(data.fixed_bandwidth, data.assured_bandwidth,
+ data.maximum_bandwidth,
+ name=data.name,
+ ident=data.id,
+ best_effort=best_effort,
+ additional=additional)
+
+ def to_dict(self):
+ val = {
+ 'fixed-bandwidth': self.fixed_bandwidth,
+ 'assured-bandwidth': self.assured_bandwidth,
+ 'maximum-bandwidth': self.maximum_bandwidth,
+ 'additional-bandwidth-eligibility':
+ TrafficDescriptor.AdditionalBwEligibility.to_string(
+ self.additional_bandwidth_eligibility)
+ }
+ return val
+
+ def xpon_create(self, olt, tcont):
+ # Look up any associated ONU. May be None if pre-provisioning
+ pass # TODO
+
+ def xpon_update(self, olt, tcont):
+ # Look up any associated ONU. May be None if pre-provisioning
+ pass # TODO: Not yet supported
+
+
+class BestEffort(object):
+ def __init__(self, bandwidth, priority, weight):
+ self.bandwidth = bandwidth # bps
+ self.priority = priority # 0.255
+ self.weight = weight # 0..100
+
+ def __str__(self):
+ return "BestEffort: {}/p-{}/w-{}".format(self.bandwidth,
+ self.priority,
+ self.weight)
+
+ def to_dict(self):
+ val = {
+ 'bandwidth': self.bandwidth,
+ 'priority': self.priority,
+ 'weight': self.weight
+ }
+ return val
+
+class Gemport(object):
+ """
+ Class to wrap TCont capabilities
+ """
+ def __init__(self, gem_id, traffic_class, name=None, uni_name=None, aes_indicator=None, tcont_name=None):
+ log.info('gemport init.')
+ self.gem_id = gem_id
+ self.traffic_class = traffic_class
+ self.name = name
+ self.uni_name = uni_name
+ self.aes_indicator = aes_indicator
+ self.tcont_name = tcont_name
+
+ self.pon_id = None
+ self.onu_id = None
+ self.tcont_id = None
+ self.tcont = None
+ self.pon = None
+ self.onu = None
+
+ strlist = uni_name.split('.')
+ self.cvlan = int(strlist[len(strlist)-1])
+ log.info('gemport init self.cvlan.', self.cvlan)
+
+ def __str__(self):
+ return "GemPort: {}, Tcont: {}, gem-id: {}".format(self.name,
+ self.tcont_name,
+ self.gem_id)
+
+ @staticmethod
+ def create(data):
+ log.info('gemport create.')
+ assert isinstance(data, GemportsConfigData)
+
+ return Gemport(data.gemport_id, data.traffic_class, data.name, data.itf_ref, data.aes_indicator, data.tcont_ref)
+
+
+ def _get_onu(self, olt):
+ onu = None
+ log.info('gemport _get_onu.')
+
+ tcont = olt._tconts.get(self.tcont_name)
+ log.info('self.tcont_name.',self.tcont_name)
+ if tcont is None:
+ log.info('tcont is None.')
+ return None
+
+ self.tcont = tcont
+ vont_ani = olt.v_ont_anis.get(tcont.vont_ani)
+ if vont_ani is None:
+ log.info('vont_ani is None.')
+ return None
+
+ ch_pair = olt.channel_pairs.get(vont_ani['preferred-channel-pair'])
+ if ch_pair is None:
+ log.info('ch_pair is None.')
+ return None
+ ch_term = next((term for term in olt.channel_terminations.itervalues()
+ if term['channel-pair'] == ch_pair['name']), None)
+ if ch_term is None:
+ log.info('ch_term is None.')
+ return None
+
+ self.pon_id = ch_term['xgs-ponid']
+ self.onu_id = vont_ani['onu-id']
+ self.tcont_id = tcont.alloc_id
+
+ log.info('gemport _get_onu.', self.pon_id)
+ pon = olt.pon(ch_term['xgs-ponid'])
+ log.info('gemport _get_onu pon.', pon._name)
+ self.pon = pon
+ onu = pon.onu(vont_ani['onu-id'])
+ log.info('gemport _get_onu onu.', onu._name)
+ self.onu = onu
+
+ return onu
+
+ def xpon_create(self, olt):
+ # Look up any associated ONU. May be None if pre-provisioning
+ log.info('gemport xpon_create.')
+ onu = self._get_onu(olt)
+
+ if onu is not None:
+ onu.add_gemport(self)
+
+ def xpon_update(self, olt):
+ # Look up any associated ONU. May be None if pre-provisioning
+ log.info('gemport xpon_update.')
+
+ if self.onu is not None:
+ pass # TODO: Not yet supported
+ #pass
+
+ def xpon_delete(self, olt):
+ # Look up any associated ONU. May be None if pre-provisioning
+ log.info('gemport xpon_delete.')
+
+ if self.onu is not None:
+ self.onu.remove_gemport(self.gem_id)
+
+
+
+
+
+
+
diff --git a/voltha/adapters/cig_olt/cig_olt_zmq.py b/voltha/adapters/cig_olt/cig_olt_zmq.py
new file mode 100644
index 0000000..74b400c
--- /dev/null
+++ b/voltha/adapters/cig_olt/cig_olt_zmq.py
@@ -0,0 +1,296 @@
+#
+# Copyright 2017-present CIG, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import binascii
+import struct
+import zmq
+import sys
+import thread
+import time
+import structlog
+
+from twisted.internet import reactor, defer
+from twisted.internet.defer import inlineCallbacks, returnValue, DeferredQueue
+
+log = structlog.get_logger()
+
+zmq_context = zmq.Context()
+
+#_OLTD_ZEROMQ_METRICS_PORT = 5555
+_OLTD_ZEROMQ_SYNC_PORT = 5555
+
+_OLTD_ZEROMQ_PACKET_OUT_PORT = 8889
+_OLTD_ZEROMQ_OMCI_PORT = 8888
+_OLTD_ZEROMQ_ASYNC_PORT = 6666
+
+_OLTD_ZEROMQ_PUB_PACKET_IN_PORT = 7778
+_OLTD_ZEROMQ_PUB_OMCI_PORT = 7777
+_OLTD_ZEROMQ_PUB_OTHER_PORT = 4444
+
+class CigZmqClientPollMetrics(object):
+ """
+ poll metrics ipc.
+ """
+ def __init__(self, ip_address, rx_incoming_queue):
+ raise NotImplementedError(self)
+
+ def poll_metrics_send(self, data):
+ raise NotImplementedError(self)
+
+class CigZmqClientSync(object):
+ """
+ Sync ipc.
+ """
+ def __init__(self, ip_address, rx_incoming_queue):
+ self.external_conn = "tcp://{}:{}".format(ip_address, _OLTD_ZEROMQ_SYNC_PORT)
+ self.socket = zmq_context.socket(zmq.REQ)
+ self.socket.setsockopt(zmq.RCVTIMEO,3000)
+ self.socket.connect(self.external_conn)
+ self.rx_incoming_queue = rx_incoming_queue
+ self.response = None
+
+ def sync_send(self, data, flag=0):
+ try:
+ if flag == 1:
+ self.socket.send(data, flags=zmq.SNDMORE)
+ else:
+ self.socket.send(data)
+ if flag != 1:
+ #try:
+ thread.start_new_thread(self.sync_receive,())
+ #except:
+ # log.exception(e.message)
+
+ except Exception as e:
+ log.exception(e.message)
+
+ def sync_receive(self):
+
+ try:
+ self.response = self.socket.recv_multipart()
+ self.rx_incoming_queue.put(self.response)
+
+ except Exception as e:
+ self.rx_incoming_queue.put("RecvTimeoutErr.")
+ log.exception(e.message)
+
+ def sync_reconnect(self):
+ self.socket.close()
+ self.socket = zmq_context.socket(zmq.REQ)
+ self.socket.setsockopt(zmq.RCVTIMEO,3000)
+ self.socket.connect(self.external_conn)
+
+
+ def sync_shutdown(self):
+ self.socket.close()
+
+
+class CigZmqClientOmci(object):
+ """
+ Omci ipc.
+ """
+ def __init__(self, ip_address, rx_incoming_queue):
+
+ self.external_conn_tx = "tcp://{}:{}".format(ip_address, _OLTD_ZEROMQ_OMCI_PORT)
+ self.socket_tx = zmq_context.socket(zmq.DEALER)
+ self.socket_tx.connect(self.external_conn_tx)
+
+ self.external_conn_rx = "tcp://{}:{}".format(ip_address, _OLTD_ZEROMQ_PUB_OMCI_PORT)
+ self.socket_rx = zmq_context.socket(zmq.SUB)
+ self.socket_rx.setsockopt(zmq.SUBSCRIBE,'')
+ self.socket_rx.setsockopt(zmq.RCVTIMEO,1000)
+ self.socket_rx.connect(self.external_conn_rx)
+ #self.rx_callback = rx_callback
+ self.rx_incoming_queue = rx_incoming_queue
+ self.killflag = 0
+
+ try:
+ thread.start_new_thread(self.omci_receive,())
+ except:
+ log.exception(e.message)
+
+ def omci_send(self, data, flag=0):
+ try:
+ if flag == 1:
+ self.socket_tx.send(data, flags=zmq.SNDMORE)
+ else:
+ self.socket_tx.send(data)
+
+ except Exception as e:
+ log.exception(e.message)
+
+ def omci_receive(self):
+ while True:
+ #time.sleep(0.5)
+ try:
+ self.response = self.socket_rx.recv_multipart()
+ #self.rx_callback(self.response)
+ #log.info('omci_receive', self.response)
+ self.rx_incoming_queue.put(self.response)
+
+ except Exception as e:
+ if self.killflag == 1:
+ thread.exit ()
+ #log.exception('Exception during omci_receive processing', e=e)
+
+ def omci_shutdown(self):
+ self.killflag = 1
+ time.sleep(1)
+ self.socket_tx.close()
+ self.socket_rx.close()
+
+
+
+class CigZmqClientAsync(object):
+ """
+ Async ipc.
+ """
+ def __init__(self, ip_address, rx_callback):
+
+ self.external_conn = "tcp://{}:{}".format(ip_address, _OLTD_ZEROMQ_ASYNC_PORT)
+ self.socket = zmq_context.socket(zmq.DEALER)
+ self.socket.setsockopt(zmq.RCVTIMEO,1000)
+ self.socket.connect(self.external_conn)
+ self.rx_callback = rx_callback
+ #self.rx_incoming_queue = rx_incoming_queue
+ self.killflag = 0
+
+ try:
+ thread.start_new_thread(self.async_receive,())
+ except:
+ log.exception(e.message)
+
+ def async_send(self, data, flag=0):
+ try:
+ if flag == 1:
+ self.socket.send(data, flags=zmq.SNDMORE)
+ else:
+ self.socket.send(data)
+
+ except Exception as e:
+ log.exception(e.message)
+
+ def async_receive(self):
+ while True:
+ try:
+ self.response = self.socket.recv_multipart()
+ #self.rx_incoming_queue.put(self.response)
+ self.rx_callback(self.response)
+ #log.info('async_receive', self.response)
+
+ except Exception as e:
+ if self.killflag == 1:
+ thread.exit ()
+ #log.exception('Exception during sync_receive processing', e=e)
+
+ def async_shutdown(self):
+ self.killflag = 1
+ time.sleep(1)
+ self.socket.close()
+
+
+class CigZmqClientSub(object):
+ """
+ Publish ipc.
+ """
+
+ def __init__(self, ip_address, rx_incoming_queue):
+ self.external_conn = "tcp://{}:{}".format(ip_address, _OLTD_ZEROMQ_PUB_OTHER_PORT)
+ self.socket = zmq_context.socket(zmq.SUB)
+ self.socket.setsockopt(zmq.SUBSCRIBE,'')
+ self.socket.setsockopt(zmq.RCVHWM,10000)
+ self.socket.setsockopt(zmq.RCVTIMEO,1000)
+ self.socket.connect(self.external_conn)
+ self.rx_incoming_queue = rx_incoming_queue
+ self.killflag = 0
+
+ log.info('CigZmqClientSub zmq.RCVHWM', self.socket.getsockopt(zmq.RCVHWM))
+
+ try:
+ thread.start_new_thread(self.sub_receive,())
+ except:
+ log.exception(e.message)
+
+ def sub_receive(self):
+ while True:
+ try:
+ self.response = self.socket.recv_multipart()
+ #log.info('sub_receive', self.response)
+ self.rx_incoming_queue.put(self.response)
+ except Exception as e:
+ if self.killflag == 1:
+ thread.exit ()
+ #log.exception('Exception during sub_receive processing', e=e)
+
+ def sub_shutdown(self):
+ self.killflag = 1
+ time.sleep(1)
+ self.socket.close()
+
+class CigZmqClientPacketInOut(object):
+ """
+ packet in/out ipc.
+ """
+ def __init__(self, ip_address, rx_incoming_queue):
+
+ self.external_conn_tx = "tcp://{}:{}".format(ip_address, _OLTD_ZEROMQ_PACKET_OUT_PORT)
+ self.socket_tx = zmq_context.socket(zmq.DEALER)
+ self.socket_tx.connect(self.external_conn_tx)
+
+ self.external_conn_rx = "tcp://{}:{}".format(ip_address, _OLTD_ZEROMQ_PUB_PACKET_IN_PORT)
+ self.socket_rx = zmq_context.socket(zmq.SUB)
+ self.socket_rx.setsockopt(zmq.SUBSCRIBE,'')
+ self.socket_rx.setsockopt(zmq.RCVTIMEO,1000)
+ self.socket_rx.connect(self.external_conn_rx)
+ #self.rx_callback = rx_callback
+ self.rx_incoming_queue = rx_incoming_queue
+ self.killflag = 0
+
+ try:
+ thread.start_new_thread(self.packet_receive,())
+ except:
+ log.exception(e.message)
+
+ def packet_send(self, data, flag=0):
+ try:
+ if flag == 1:
+ self.socket_tx.send(data, flags=zmq.SNDMORE)
+ else:
+ self.socket_tx.send(data)
+
+ except Exception as e:
+ log.exception(e.message)
+
+ def packet_receive(self):
+ while True:
+ #time.sleep(0.5)
+ try:
+ self.response = self.socket_rx.recv_multipart()
+ #self.rx_callback(self.response)
+ #log.info('packet_receive', self.response)
+ self.rx_incoming_queue.put(self.response)
+
+ except Exception as e:
+ if self.killflag == 1:
+ thread.exit ()
+ #log.exception('Exception during omci_receive processing', e=e)
+
+ def packet_shutdown(self):
+ self.killflag = 1
+ time.sleep(1)
+ self.socket_rx.close()
+ self.socket_tx.close()
+
+
diff --git a/voltha/adapters/cig_olt/download.py b/voltha/adapters/cig_olt/download.py
new file mode 100644
index 0000000..61ee34c
--- /dev/null
+++ b/voltha/adapters/cig_olt/download.py
@@ -0,0 +1,329 @@
+# Copyright 2017-present CIG, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import structlog
+import xmltodict
+from twisted.internet import reactor
+from twisted.internet.defer import returnValue, inlineCallbacks
+from voltha.protos.device_pb2 import ImageDownload
+from voltha.protos.common_pb2 import AdminState
+from cig_olt_zmq import *
+#from voltha.adapters.cig_olt.protos.olt_common_pb2 import *
+#from voltha.adapters.cig_olt.protos.olt_d_pb2 import *
+from voltha.protos.olt_common_pb2 import *
+from voltha.protos.olt_d_pb2 import *
+
+log = structlog.get_logger()
+
+
+class Download(object):
+ """Class to wrap an image download"""
+
+ def __init__(self, handler, request, protocols):
+ self._olt = handler
+ self._deferred = None
+ self.device_id = request.id
+ self._name = request.name
+ self._url = request.url
+ self._crc = request.crc
+ self._version = request.image_version
+ self._local = request.local_dir
+ self._save_config = request.save_config
+ self._supported_protocols = protocols
+
+ self._download_state = ImageDownload.DOWNLOAD_UNKNOWN
+ self._failure_reason = ImageDownload.UNKNOWN_ERROR
+ self._image_state = ImageDownload.IMAGE_UNKNOWN
+ self._additional_info = ''
+ self._downloaded_octets = 0
+
+ # Server profile info
+ self._server_profile_name = None
+ self._scheme = None
+ self._host = ''
+ self._port = None
+ self._path = ''
+ self._auth = None
+
+ # Download job info
+ self._download_job_name = None
+ self._chech_deferred =None
+
+ def __str__(self):
+ return "ImageDownload: {}".format(self.name)
+
+ @staticmethod
+ def create(handler, request, supported_protocols):
+ """
+ Create and start a new image download
+
+ :param handler: (AdtranDeviceHandler) Device download is for
+ :param done_deferred: (Deferred) deferred to fire on completion
+ :param request: (ImageDownload) Request
+ """
+ download = Download(handler, request, supported_protocols)
+ download._deferred = reactor.callLater(0, download.start_download)
+
+ return download
+
+ @property
+ def name(self):
+ return self._name
+
+ @property
+ def download_state(self):
+ return self._download_state
+
+ @property
+ def failure_reason(self):
+ return self._failure_reason
+
+ @property
+ def image_state(self):
+ return self._image_state
+
+ @property
+ def additional_info(self):
+ return self._additional_info
+
+ @property
+ def downloaded_bytes(self):
+ return self._downloaded_octets
+
+ @property
+ def profile_name(self):
+ return self._server_profile_name
+
+ def _cancel_deferred(self):
+ d, self._deferred = self._deferred, None
+ try:
+ if d is not None and not d.called:
+ d.cancel()
+ except Exception as e:
+ pass
+
+ def check_download_status(self):
+ log.info('*** check download status ***')
+ self._download_state = ImageDownload.DOWNLOAD_SUCCEEDED
+ self._failure_reason = ImageDownload.NO_ERROR
+ self._download_complete()
+
+
+ #@inlineCallbacks
+ def start_download(self):
+ import uuid
+ log.info('download-start', name=self.name)
+ if not self.parse_url():
+ self._download_failed()
+ log.info('failed url parsing', name=self.name)
+ return
+ #returnValue('failed url parsing')
+
+ self._download_state = ImageDownload.DOWNLOAD_STARTED
+ self._failure_reason = ImageDownload.NO_ERROR
+
+ #send msg to oltd to start download
+ self.image_cfg_msg_send(IMAGE_CMD_DOWNLOAD)
+
+ log.info('start download *************', name=self.name)
+ self._chech_deferred = reactor.callLater(120, self.check_download_status)
+ return self._chech_deferred
+
+ def parse_url(self):
+ from urllib3 import util, exceptions
+ try:
+ results = util.parse_url(self._url)
+
+ # Server info
+ self._scheme = results.scheme.lower()
+ #if self._scheme not in self._supported_protocols:
+ #self._failure_reason = ImageDownload.INVALID_URL
+ #self._additional_info = "Unsupported file transfer protocol: {}".format(results.scheme)
+ #return False
+
+ self._host = results.host
+ self._port = results.port
+ self._path = results.path
+ self._auth = results.auth
+ return True
+
+ except exceptions.LocationValueError as e:
+ self._failure_reason = ImageDownload.INVALID_URL
+ self._additional_info = e.message
+ return False
+
+ except Exception as e:
+ self._failure_reason = ImageDownload.UNKNOWN_ERROR
+ self._additional_info = e.message
+ return False
+
+ def _download_failed(self):
+ log.info('download-failed', name=self.name)
+
+ self._cancel_deferred()
+ self._download_state = ImageDownload.DOWNLOAD_FAILED
+
+ # Cleanup NETCONF
+
+ #reactor.callLater(0, self._cleanup_download_job, 20)
+ #reactor.callLater(0, self._cleanup_server_profile, 20)
+ # TODO: Do we signal any completion due to failure?
+
+ def _download_complete(self):
+ log.info('download-completed', name=self.name)
+
+ self._cancel_deferred()
+ self._download_state = ImageDownload.DOWNLOAD_SUCCEEDED
+ self._downloaded_octets = 123456
+ self._failure_reason = ImageDownload.NO_ERROR
+
+ #reactor.callLater(0, self._cleanup_download_job, 20)
+ #reactor.callLater(0, self._cleanup_server_profile, 20)
+ # TODO: How do we signal completion?
+
+ device = self._olt.adapter_agent.get_device(self.device_id)
+ if device is not None:
+ # restore admin state to enabled
+ device.admin_state = AdminState.ENABLED
+ self._olt.adapter_agent.update_device(device)
+
+ #@inlineCallbacks
+ def cancel_download(self, request):
+ log.info('cancel-sw-download', name=self.name)
+
+ self._cancel_deferred()
+
+ try:
+ # initiate cancelling software download to device at success
+ # delete image download record
+
+ self._olt.adapter_agent.delete_image_download(request)
+
+ device = self._olt.adapter_agent.get_device(self.device_id)
+ if device is not None:
+ # restore admin state to enabled
+ device.admin_state = AdminState.ENABLED
+ self._olt.adapter_agent.update_device(device)
+
+
+ #send msg to oltd to cancel download
+ self.image_cfg_msg_send(IMAGE_CMD_CANCEL)
+
+ except Exception as e:
+ log.exception(e.message)
+
+ #reactor.callLater(0, self._cleanup_download_job, 20)
+ #reactor.callLater(0, self._cleanup_server_profile, 20)
+
+
+ #@inlineCallbacks
+ def activate_image(self):
+ log.info('download-activate', name=self.name)
+
+ if self._download_state == ImageDownload.DOWNLOAD_SUCCEEDED:
+ pass # TODO: Implement
+ self._image_state = ImageDownload.IMAGE_ACTIVE
+
+ #send msg to oltd to activate image
+ self.image_cfg_msg_send(IMAGE_CMD_ACTIVETE)
+
+ #returnValue('TODO: Implement this')
+
+ #@inlineCallbacks
+ def revert_image(self):
+ log.info('download-revert', name=self.name)
+
+ if self._download_state == ImageDownload.DOWNLOAD_SUCCEEDED:
+ pass # TODO: Implement
+ self._image_state = ImageDownload.IMAGE_INACTIVE
+
+ #send msg to oltd to revert image
+ self.image_cfg_msg_send(IMAGE_CMD_REVERT)
+
+ #returnValue('TODO: Implement this')
+
+ def monitor_state_to_download_state(self, state):
+ if ':' in state:
+ state = state.split(':')[-1]
+ result = {
+ 'downloading-software': ImageDownload.DOWNLOAD_STARTED, # currently downloading software
+ 'storing-software': ImageDownload.DOWNLOAD_STARTED, # successfully downloaded the required software and is storing it to memory
+ 'software-stored': ImageDownload.DOWNLOAD_SUCCEEDED, # successfully downloaded the required software and has stored it successfully to memory
+ 'software-download-failed': ImageDownload.DOWNLOAD_FAILED, # unsuccessfully attemptedto download the required software
+ 'invalid-software': ImageDownload.DOWNLOAD_FAILED, # successfully downloaded the required software but the software was determined to be invalid
+ 'software-storage-failed': ImageDownload.INSUFFICIENT_SPACE, # successfully downloaded the required software but was unable to successfully stored it to memory
+ }.get(state.lower(), None)
+ log.info('download-state', result=result, state=state, name=self.name)
+ assert result is not None, 'Invalid state'
+ return result
+
+ def monitor_state_to_activate_state(self, state):
+ if ':' in state:
+ state = state.split(':')[-1]
+ result = {
+ 'enabling-software': ImageDownload.IMAGE_ACTIVATE, # currently enabling the software
+ 'software-enabled': ImageDownload.IMAGE_ACTIVE, # successfully enabled the required software
+ 'enable-software-failed': ImageDownload.IMAGE_INACTIVE, # unsuccessfully attempted to enable the required software revision
+ 'activating-software': ImageDownload.IMAGE_ACTIVATE, # currently activating the software
+ 'software-activated': ImageDownload.IMAGE_ACTIVE, # successfully activated the required software. The job terminated successfully
+ 'activate-software-failed': ImageDownload.IMAGE_INACTIVE, # unsuccessfully attempted to activate the required software revision
+ 'committing-software': ImageDownload.IMAGE_ACTIVATE, # currently committing the software
+ 'software-committed': ImageDownload.IMAGE_ACTIVATE, # successfully committed the required software. The job terminated successfully
+ 'commit-software-failed': ImageDownload.IMAGE_INACTIVE, # unsuccessfully attempted to commit the required software revision
+ }.get(state.lower(), None)
+ log.info('download-state', result=result, state=state, name=self.name)
+ assert result is not None, 'Invalid state'
+ return result
+
+
+ def image_cfg_msg_send(self, cmd_type):
+
+ try:
+ mgr_hdr = OltMsgCommonHdr(
+ type=OLT_D_IMAGE_CFG,
+ src_appId=OLT_APPID_VOLTHA,
+ sync=0
+ )
+
+ data=mgr_hdr.SerializeToString()
+
+ self._olt.zmq_client_async.async_send(data, 1)
+ if cmd_type == IMAGE_CMD_DOWNLOAD:
+ image_cfg = OltDImageCfg(
+ name = self._name,
+ cmd = cmd_type,
+ url = self._url,
+ crc = self._crc,
+ version = self._version
+ )
+ else:
+ image_cfg = OltDImageCfg(
+ name = self._name,
+ cmd = cmd_type
+ )
+ data=image_cfg.SerializeToString()
+ self._olt.zmq_client_async.async_send(data, 0)
+ except Exception as e:
+ log.exception('Exception during image cfg processing', e=e)
+
+
+
+
+
+
+
+
+
+
diff --git a/voltha/adapters/cig_olt/protos/Makefile b/voltha/adapters/cig_olt/protos/Makefile
new file mode 100644
index 0000000..62eacc8
--- /dev/null
+++ b/voltha/adapters/cig_olt/protos/Makefile
@@ -0,0 +1,85 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Makefile to build all protobuf and gRPC related artifacts
+
+ifeq ($(VOLTHA_BASE)_set,_set)
+ $(error To get started, please source the env.sh file from Voltha top level directory)
+endif
+
+default: build
+
+PROTO_FILES := $(wildcard *.proto) $(wildcard $(VOLTHA_BASE)/voltha/protos/third_party/google/api/*proto)
+PROTO_PB2_FILES := $(foreach f,$(PROTO_FILES),$(subst .proto,_pb2.py,$(f)))
+PROTO_DESC_FILES := $(foreach f,$(PROTO_FILES),$(subst .proto,.desc,$(f)))
+
+PROTOC_PREFIX := /usr/local
+PROTOC_LIBDIR := $(PROTOC_PREFIX)/lib
+
+PROTOC := $(PROTOC_PREFIX)/bin/protoc
+
+PROTOC_VERSION := "3.3.0"
+PROTOC_DOWNLOAD_PREFIX := "https://github.com/google/protobuf/releases/download"
+PROTOC_DIR := protobuf-$(PROTOC_VERSION)
+PROTOC_TARBALL := protobuf-python-$(PROTOC_VERSION).tar.gz
+PROTOC_DOWNLOAD_URI := $(PROTOC_DOWNLOAD_PREFIX)/v$(PROTOC_VERSION)/$(PROTOC_TARBALL)
+PROTOC_BUILD_TMP_DIR := "/tmp/protobuf-build-$(shell uname -s | tr '[:upper:]' '[:lower:]')"
+
+build: $(PROTOC) protos
+
+protos: $(PROTO_PB2_FILES)
+
+%_pb2.py: %.proto Makefile
+ @echo "Building protocol buffer artifacts from $<"
+ env LD_LIBRARY_PATH=$(PROTOC_LIBDIR) python -m grpc.tools.protoc \
+ -I. \
+ -I$(VOLTHA_BASE)/voltha/protos/third_party \
+ --python_out=. \
+ --grpc_python_out=. \
+ --descriptor_set_out=$(basename $<).desc \
+ --include_imports \
+ --include_source_info \
+ $<
+
+clean:
+ rm -f $(PROTO_PB2_FILES) $(PROTO_DESC_FILES)
+
+$(PROTOC):
+ @echo "~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~"
+ @echo "It looks like you don't have protocol buffer tools installed."
+ @echo "To install the protocol buffer toolchain, you can run:"
+ @echo " make install-protoc"
+ @echo "~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~"
+
+install-protoc: $(PROTOC)
+ @echo "Downloading and installing protocol buffer support."
+ @echo "Installation will require sodo priviledges"
+ @echo "This will take a few minutes."
+ mkdir -p $(PROTOC_BUILD_TMP_DIR)
+ @echo "We ask for sudo credentials now so we can install at the end"; \
+ sudo echo "Thanks"; \
+ cd $(PROTOC_BUILD_TMP_DIR); \
+ wget $(PROTOC_DOWNLOAD_URI); \
+ tar xzvf $(PROTOC_TARBALL); \
+ cd $(PROTOC_DIR); \
+ ./configure --prefix=$(PROTOC_PREFIX); \
+ make; \
+ sudo make install
+
+uninstall-protoc:
+ cd $(PROTOC_BUILD_TMP_DIR)/$(PROTOC_DIR); \
+ sudo make uninstall
+
diff --git a/voltha/adapters/cig_olt/protos/Makefile.protos b/voltha/adapters/cig_olt/protos/Makefile.protos
new file mode 100644
index 0000000..031af65
--- /dev/null
+++ b/voltha/adapters/cig_olt/protos/Makefile.protos
@@ -0,0 +1,60 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Makefile to build all protobuf and gRPC related artifacts
+
+default: build
+
+PROTO_FILES := $(wildcard *.proto)
+PROTO_ALL_FILES := $(PROTO_FILES) $(PROTO_GOOGLE_API)
+PROTO_PB2_FILES := $(foreach f,$(PROTO_FILES),$(subst .proto,_pb2.py,$(f)))
+PROTO_PB2_GOOGLE_API := $(foreach f,$(PROTO_GOOGLE_API),$(subst .proto,_pb2.py,$(f)))
+PROTO_All_PB2_C_FILES := $(foreach f,$(PROTO_ALL_FILES),$(subst .proto,_pb2.pyc,$(f)))
+PROTO_ALL_PB2_GPRC_FILES := $(foreach f,$(PROTO_ALL_FILES),$(subst .proto,_pb2_grpc.py,$(f)))
+PROTO_ALL_DESC_FILES := $(foreach f,$(PROTO_ALL_FILES),$(subst .proto,.desc,$(f)))
+
+# Google API needs to be built from within the third party directory
+#
+google_api:
+ python -m grpc.tools.protoc \
+ -I. \
+ --python_out=. \
+ --grpc_python_out=. \
+ --descriptor_set_out=google/api/annotations.desc \
+ --include_imports \
+ --include_source_info \
+ google/api/annotations.proto google/api/http.proto
+
+build: $(PROTO_PB2_FILES)
+
+%_pb2.py: %.proto
+ python -m grpc.tools.protoc \
+ -I. \
+ -I/protos \
+ -I/protos/voltha \
+ --python_out=. \
+ --grpc_python_out=. \
+ --descriptor_set_out=$(basename $<).desc \
+ --include_imports \
+ --include_source_info \
+ $<
+
+clean:
+ rm -f $(PROTO_PB2_FILES) \
+ $(PROTO_ALL_DESC_FILES) \
+ $(PROTO_ALL_PB2_GPRC_FILES) \
+ $(PROTO_All_PB2_C_FILES) \
+ $(PROTO_PB2_GOOGLE_API)
diff --git a/voltha/adapters/cig_olt/protos/__init__.py b/voltha/adapters/cig_olt/protos/__init__.py
new file mode 100644
index 0000000..b0fb0b2
--- /dev/null
+++ b/voltha/adapters/cig_olt/protos/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/voltha/adapters/cig_olt/protos/olt_common.proto b/voltha/adapters/cig_olt/protos/olt_common.proto
new file mode 100644
index 0000000..7e3346e
--- /dev/null
+++ b/voltha/adapters/cig_olt/protos/olt_common.proto
@@ -0,0 +1,172 @@
+syntax = "proto3";
+
+package cig_olt;
+
+
+enum OltAppId
+{
+ OLT_APPID_OLTD = 0;
+ OLT_APPID_SWITCHD = 1;
+ OLT_APPID_VOLTHA = 2;
+ OLT_APPID_POND = 3;
+ OLT_APPID_PROCD = 4;
+ OLT_APPID_ONLPD = 5;
+}
+
+message OltMsgCommonHdr
+{
+ uint32 type = 1;
+ OltAppId src_appId = 2;
+ uint32 sync = 3; //0:aync, 1:sync
+}
+
+//olt common msg type from 1 to 999
+enum OltCommonMsgType
+{
+ OLT_COMMON_MSG_INVALID = 0;
+ OLT_COMMON_HEART_BEAT_REQ = 1;
+ OLT_COMMON_HEART_BEAT_ACK = 2;
+ OLT_COMMON_PM_METRICS_REQ = 3;
+ OLT_COMMON_PM_METRICS_ACK = 4;
+ OLT_COMMON_ALARM_EVENT = 5;
+}
+
+/*
+ * Identify to the area of the system impacted by the alarm
+ */
+enum OltAlarmEventType
+{
+ COMMUNICATION = 0;
+ ENVIRONMENT = 1;
+ EQUIPMENT = 2;
+ SERVICE = 3;
+ PROCESSING = 4;
+ SECURITY = 5;
+}
+
+/*
+ * Identify to the functional category originating the alarm
+ */
+enum OltAlarmEventCategory
+{
+ PON = 0;
+ ETH = 1;
+ MISC = 2;
+}
+
+/*
+ * Active state of the alarm
+ */
+enum OltAlarmEventState
+{
+ RAISED = 0;
+ CLEARED = 1;
+}
+
+/*
+ * Identify the overall impact of the alarm on the system
+ */
+enum OltAlarmEventSeverity
+{
+ INDETERMINATE = 0;
+ WARNING = 1;
+ MINOR = 2;
+ MAJOR = 3;
+ CRITICAL = 4;
+}
+
+/*
+* ON_ALARM_SOFTWARE_ERROR = 0
+* PON_ALARM_LOS = 1
+* PON_ALARM_LOSI = 2
+* PON_ALARM_DOWI = 3
+* PON_ALARM_LOFI = 4
+* PON_ALARM_RDII = 5
+* PON_ALARM_LOAMI = 6
+* PON_ALARM_LCDGI = 7
+* PON_ALARM_LOAI = 8
+* PON_ALARM_SDI = 9
+* PON_ALARM_SFI = 10
+* PON_ALARM_PEE = 11
+* PON_ALARM_DGI = 12
+* PON_ALARM_LOKI = 13
+* PON_ALARM_TIWI = 14
+* PON_ALARM_TIA = 15
+* PON_ALARM_VIRTUAL_SCOPE_ONU_LASER_ALWAYS_ON = 16
+* PON_ALARM_VIRTUAL_SCOPE_ONU_SIGNAL_DEGRADATION = 17
+* PON_ALARM_VIRTUAL_SCOPE_ONU_EOL = 18
+* PON_ALARM_VIRTUAL_SCOPE_ONU_EOL_DATABASE_IS_FULL = 19
+* PON_ALARM_AUTH_FAILED_IN_REGISTRATION_ID_MODE = 20
+* PON_ALARM_SUFI = 21
+*/
+
+/*
+ * MSG: OLT_COMMON_ALARM_EVENT
+ */
+message OltCommonAlarmEvent
+{
+ // Unique ID for this alarm.
+ string id = 1;
+
+ // Refers to the area of the system impacted by the alarm
+ OltAlarmEventType type = 2;
+
+ // Refers to functional category of the alarm
+ OltAlarmEventCategory category = 3;
+
+ // Current active state of the alarm
+ OltAlarmEventState state = 4;
+
+ // Overall impact of the alarm on the system
+ OltAlarmEventSeverity severity = 5;
+
+ // Timestamp at which the alarm was first raised
+ float raised_ts = 6;
+
+ // Timestamp at which the alarm was reported
+ float reported_ts = 7;
+
+ // Timestamp at which the alarm has changed since it was raised
+ float changed_ts = 8;
+
+ // Identifier of the originating resource of the alarm
+ string resource_id = 9;
+
+ // Textual explanation of the alarm
+ string description = 10;
+
+ // Key/Value storage for extra information that may give context to the alarm
+ //map<string, string> context = 11;
+}
+
+message PacketCounter
+{
+ string name = 1;
+ int64 value = 2;
+}
+
+message PortMetrics
+{
+ string port_name = 1;
+ repeated PacketCounter packets = 2;
+}
+
+message OltCommonPmMetrics
+{
+ repeated PortMetrics metrics = 1;
+}
+
+enum OltLogLevel
+{
+ LOG_LEVEL_DEBUG = 0;
+ LOG_LEVEL_INFO = 1;
+ LOG_LEVEL_WARNING = 2;
+ LOG_LEVEL_ERROR = 3;
+ LOG_LEVEL_CRITICAL = 4;
+}
+
+// MSG: OLT_COMMON_HEART_BEAT_REQ/OLT_COMMON_HEART_BEAT_ACK
+message OltCommonHeartBeat {
+ uint32 seq_no = 1;
+ bool is_active = 2; //1: ACTIVE 0:INACTIVE
+}
diff --git a/voltha/adapters/cig_olt/protos/olt_d.proto b/voltha/adapters/cig_olt/protos/olt_d.proto
new file mode 100644
index 0000000..f74b779
--- /dev/null
+++ b/voltha/adapters/cig_olt/protos/olt_d.proto
@@ -0,0 +1,227 @@
+
+syntax = "proto3";
+
+package cig_olt;
+
+//import "google/protobuf/empty.proto";
+import "bbf_fiber_types.proto";
+
+//oltD msg type from 1001 to 1999
+enum OltDMsgType
+{
+ OLT_D_MSG_INVALID = 0;
+ OLT_D_ACTIVATE_OLT = 1001;
+ OLT_D_DEACTIVATE_OLT = 1002;
+ OLT_D_REBOOT_OLT = 1003;
+ OLT_D_GET_OLT_INFO_REQ = 1004;
+ OLT_D_GET_OLT_INFO_ACK = 1005;
+ OLT_D_ADD_ONU = 1006;
+ OLT_D_UPDATE_ONU = 1007;
+ OLT_D_DELETE_ONU = 1008;
+ OLT_D_PACKET_IN = 1009;
+ OLT_D_PACKET_OUT = 1010;
+ OLT_D_IMAGE_CFG = 1011;
+ OLT_D_IMAGE_STATUS_REQ = 1012;
+ OLT_D_IMAGE_STATUS_ACK = 1013;
+ OLT_D_IMAGE_EVENT = 1014;
+}
+
+enum OltPonPortType {
+ OLT_PON_PORT_TYPE_GPON = 0; //1.25-2.5
+ OLT_PON_PORT_TYPE_XGPON = 1; //2.5-10
+ OLT_PON_PORT_TYPE_XGSPON = 2; //10-10
+ OLT_PON_PORT_TYPE_NGPON2 = 3; //40-40
+}
+
+enum OltNniPortType {
+ OLT_NNI_PORT_TYPE_10G = 0;
+ OLT_NNI_PORT_TYPE_25G = 1;
+ OLT_NNI_PORT_TYPE_40G = 2;
+ OLT_NNI_PORT_TYPE_100G = 3;
+}
+
+message PonPortInfo {
+ int32 port_no = 1; //value: 1,2,3...
+ OltPonPortType port_type = 2;
+}
+message NniPortInfo {
+ int32 port_no = 1; //value: 1,2,3...
+ OltNniPortType port_type = 2;
+ string mac_address = 3;
+}
+
+enum OltState
+{
+ OLT_STATE_INVALID = 0;
+ OLT_STATE_INACTIVE = 1;
+ OLT_STATE_ACTIVE = 2;
+}
+enum OltWorkMode
+{
+ OLT_MODE_CONFIG = 0;
+ OLT_MODE_AUTO = 1;
+}
+
+// Describes instance of software image on the device
+message OltImage {
+ string name = 1; // software patch name
+ string version = 2; // version of software
+ string hash = 3; // md5 hash
+ string install_datetime = 4; // combined date and time expressed in UTC.
+ // use ISO 8601 format for date and time
+
+ // The active software image is one that is currently loaded and executing
+ // in the ONU or circuit pack. Under normal operation, one software image
+ // is always active while the other is inactive. Under no circumstances are
+ // both software images allowed to be active at the same time
+ bool is_active = 5; // True if the image is active
+
+ // The committed software image is loaded and executed upon reboot of the
+ // ONU and/or circuit pack. During normal operation, one software image is
+ // always committed, while the other is uncommitted.
+ bool is_committed = 6; // True if the image is committed
+
+ // A software image is valid if it has been verified to be an executable
+ // code image. The verification mechanism is not subject to standardization;
+ // however, it should include at least a data integrity (e.g., CRC) check of
+ // the entire code image.
+ bool is_valid = 7; // True if the image is valid
+}
+
+// List of software on the device
+message OltImages {
+ repeated OltImage image = 1;
+}
+
+// MSG: OLT_D_GET_OLT_INFO_ACK
+message OltDGetOltInfoAck {
+ OltState olt_state = 1;
+ string vendor = 2;
+ string model = 3;
+ string hardware_version = 4;
+ string firmware_version = 5;
+ string software_version = 6;
+ string serial_number = 7;
+ OltImages images = 8;
+
+ // Device contact MAC address (format: "xx:xx:xx:xx:xx:xx")
+ string mac_address = 9;
+
+ // Device contact IPv4 address (format: "a.b.c.d" or can use hostname too)
+ string ipv4_address = 10;
+
+ // Device contact IPv6 address using the canonical string form
+ // ("xxxx:xxxx:xxxx:xxxx:xxxx:xxxx:xxxx:xxxx")
+ string ipv6_address = 11;
+ OltWorkMode work_mode = 12;
+
+ repeated NniPortInfo nni_port = 13;
+ repeated PonPortInfo pon_port = 14;
+}
+// MSG: OLT_D_ADD_ONU
+message OltDAddOnu {
+ uint32 pon_port = 1; // 1~24
+ uint32 onu_id = 2; // 0~255
+ string sn = 3;
+ string registration_id = 4;
+ bool fec_upstream = 5;
+ bbf_fiber_types.AuthMethodType authentication_method = 6;
+}
+// MSG: OLT_D_UPDATE_ONU
+message OltDUpdateOnu {
+ uint32 pon_port = 1; // 1~24
+ uint32 onu_id = 2; // 0~255
+ string sn = 3;
+ string registration_id = 4;
+ bool fec_upstream = 5;
+ bbf_fiber_types.AuthMethodType authentication_method = 6;
+}
+//MSG: OLT_D_PACKET_IN, OLT_D_PACKET_OUT
+message OltDEthPacket
+{
+ uint32 pkt_len = 1;
+ bytes pkt_buf = 2;
+}
+// MSG: OLT_D_DELETE_ONU
+message OltDDeleteOnu {
+ uint32 pon_port = 1; // 1~24
+ uint32 onu_id = 2; // 0~255
+ string sn = 3;
+}
+enum OltImageCfgCmd {
+ IMAGE_CMD_UNKNOWN = 0;
+ IMAGE_CMD_DOWNLOAD = 1;
+ IMAGE_CMD_CANCEL = 2;
+ IMAGE_CMD_ACTIVETE = 3;
+ IMAGE_CMD_REVERT = 4;
+}
+
+enum OltImageDownloadState {
+ DOWNLOAD_UNKNOWN = 0;
+ DOWNLOAD_SUCCEEDED = 1;
+ DOWNLOAD_REQUESTED = 2;
+ DOWNLOAD_STARTED = 3;
+ DOWNLOAD_FAILED = 4;
+ DOWNLOAD_UNSUPPORTED = 5;
+}
+enum OltImageActivateState {
+ IMAGE_UNKNOWN = 0;
+ IMAGE_INACTIVE = 1;
+ IMAGE_ACTIVATE = 2;
+ IMAGE_ACTIVE = 3;
+ IMAGE_REVERT = 4;
+}
+enum OltImageDownloadFailureReason {
+ NO_ERROR = 0;
+ INVALID_URL = 1;
+ DEVICE_BUSY = 2;
+ INSUFFICIENT_SPACE = 3;
+ UNKNOWN_ERROR = 4;
+}
+
+// MSG: OLT_D_IMAGE_DOWNLOAD
+message OltDImageCfg {
+ string name = 1;
+ OltImageCfgCmd cmd = 2; //1: download 2:cancel 3:activate 4:revert
+ string url = 3;
+ uint32 crc = 4;
+ string version = 5;
+}
+
+//OLT_D_IMAGE_EVENT
+message OltDImageEvent {
+ string name = 1;
+ uint32 type = 2; //1: download 2:cancel 3:activate 4:revert
+ // Download state
+ OltImageDownloadState state = 3;
+ // Download failure reason
+ OltImageDownloadFailureReason reason= 4;
+ // Image activation state
+ OltImageActivateState image_state = 5;
+ // Additional info
+ string additional_info = 6;
+
+}
+
+
+//OLT_D_IMAGE_STATUS_REQ
+message OltDImageStatusReq {
+ string name = 1;
+}
+
+//OLT_D_IMAGE_STATUS_ACK
+message OltDImageStatusAck {
+ string name = 1;
+ // Download state
+ OltImageDownloadState state = 2;
+ // Bytes downloaded
+ uint32 downloaded_bytes = 3;
+ // Download failure reason
+ OltImageDownloadFailureReason reason= 4;
+ // Image activation state
+ OltImageActivateState image_state = 5;
+ // Additional info
+ string additional_info = 6;
+}
+
+
diff --git a/voltha/adapters/cig_olt/protos/olt_pon.proto b/voltha/adapters/cig_olt/protos/olt_pon.proto
new file mode 100644
index 0000000..d8e8db2
--- /dev/null
+++ b/voltha/adapters/cig_olt/protos/olt_pon.proto
@@ -0,0 +1,185 @@
+syntax = "proto3";
+
+import "bbf_fiber_types.proto";
+
+//olt pon msg type from 2001 to 2999
+enum OltPonMsgType
+{
+ OLT_PON_MSG_TYPE_INVALID = 0;
+ OLT_PON_ACTIVATE_PON = 2001;
+ OLT_PON_DEACTIVATE_PON = 2002;
+ OLT_PON_CONFIGURE_PON = 2003;
+ OLT_PON_GET_PON_INFO_REQ = 2004;
+ OLT_PON_GET_PON_INFO_ACK = 2005;
+ OLT_PON_ADD_TCONT = 2006;
+ OLT_PON_UPDATE_TCONT = 2007;
+ OLT_PON_DELETE_TCONT = 2008;
+ OLT_PON_CONFIGURE_SERVICE = 2009;
+ OLT_PON_ACTIVATE_ONU = 2010;
+ OLT_PON_DEACTIVATE_ONU = 2011;
+ OLT_PON_ENABLE_ONU = 2012;
+ OLT_PON_DISABLE_ONU = 2013;
+ OLT_PON_GET_ONU_INFO_REQ = 2014;
+ OLT_PON_GET_ONU_INFO_ACK = 2015;
+ OLT_PON_SEND_OMCI = 2016;
+ OLT_PON_ONU_RANGING_EVENT = 2017;
+ OLT_PON_GET_PHY_PM_REQ = 2018;
+ OLT_PON_GET_PHY_PM_ACK = 2019;
+ OLT_PON_GET_XGEM_PM_REQ = 2020;
+ OLT_PON_GET_XGEM_PM_ACK = 2021;
+ OLT_PON_GET_PLOAM_PM_REQ = 2022;
+ OLT_PON_GET_PLOAM_PM_ACK = 2023;
+ OLT_PON_GET_OMCI_PM_REQ = 2024;
+ OLT_PON_GET_OMCI_PM_ACK = 2025;
+ OLT_PON_GET_ETH_PM_REQ = 2026;
+ OLT_PON_GET_ETH_PM_ACK = 2027;
+ OLT_PON_ONU_ACTIVATE_COMPLETE = 2028;
+ OLT_PON_ADD_GEMPORT = 2029;
+ OLT_PON_UPDATE_GEMPORT = 2030;
+ OLT_PON_DELETE_GEMPORT = 2031;
+}
+
+// MSG: OLT_PON_ACTIVATE_PON | OLT_PON_DEACTIVATE_PON
+message OltPonPort
+{
+ uint32 pon_port = 1;
+}
+
+// MSG: OLT_PON_CONFIGURE_PON
+message OltPonConfigurePon
+{
+ uint32 pon_port = 1; // 0~3
+ uint32 pon_id = 2; // 32-bit
+ string pon_tag = 3; // 8-byte
+ uint32 closest_ont_distance = 4; // km
+ uint32 differential_fiber_distance = 5; // km
+ bool fec_downstream = 6;
+ bool aes_downstream = 7;
+ bool aes_upstream = 8;
+ uint32 pon_profile = 9; // 0/1/2/3. (0: 256 x 32, 1: 128 x 64, 2: 64 x 128, 3: 32 x 256)
+ uint32 bwmap_cycle = 10; // 8/16/32 (default 8)
+ uint32 discover_period = 11; // default: 10s
+}
+
+// MSG: OLT_PON_ADD_TCONT
+message OltPonAddTcont
+{
+ uint32 pon_port = 1; // 0~3
+ uint32 onu_id = 2; // 0~255
+ uint32 alloc_id = 3;
+ uint32 fixed_bandwidth = 4; // kbps
+ uint32 assured_bandwidth = 5; // kbps
+ uint32 maximum_bandwidth = 6; // kbps
+}
+
+// MSG: OLT_PON_UPDATE_TCONT
+message OltPonUpdateTcont
+{
+ uint32 pon_port = 1; // 0~3
+ uint32 alloc_id = 2;
+ uint32 fixed_bandwidth = 3; // kbps
+ uint32 assured_bandwidth = 4; // kbps
+ uint32 maximum_bandwidth = 5; // kbps
+}
+
+// MSG: OLT_PON_DELETE_TCONT
+message OltPonDeleteTcont
+{
+ uint32 pon_port = 1; // 0~3
+ uint32 alloc_id = 2;
+}
+
+// MSG: OLT_PON_CONFIGURE_SERVICE
+message OltPonConfigureService
+{
+ uint32 pon_port = 1; // 0~3
+ uint32 onu_id = 2; // 0~255
+ uint32 direction = 3; // 0: downstream, 1: upstream
+ uint32 gem_port = 4;
+ uint32 flow_id = 5;
+ uint32 priority = 6;
+ uint32 action = 7;
+}
+
+// MSG: OLT_PON_ACTIVATE_ONU
+message OltPonActivateOnu
+{
+ uint32 pon_port = 1; // 0~3
+ uint32 onu_id = 2; // 0~255
+ string sn = 3;
+ string registration_id = 4;
+ bool fec_upstream = 5;
+ bbf_fiber_types.AuthMethodType authentication_method = 6;
+}
+
+// MSG: OLT_PON_DEACTIVATE_ONU
+message OltPonDeactivateOnu
+{
+ uint32 pon_port = 1; // 0~3
+ uint32 onu_id = 2; // 0~255
+ string sn = 3;
+}
+
+// MSG: OLT_PON_ENABLE_ONU | OLT_PON_DISABLE_ONU
+message OltPonEnableOnu
+{
+ uint32 pon_port = 1; // 0~3
+ string sn = 2;
+}
+
+// MSG: OLT_PON_SEND_OMCI
+message OltPonSendOmci
+{
+ uint32 pon_slot = 1;
+ uint32 pon_port = 2; // 0~3
+ uint32 onu_id = 3; // 0~255
+ string omci_content = 4;
+}
+
+// MSG: OLT_PON_ONU_RANGING_EVENT
+message OltPonOnuRangingEvent
+{
+ uint32 pon_slot = 1;
+ uint32 pon_port = 2; // 0~3
+ uint32 onu_id = 3; // 0~255
+ string sn = 4;
+ uint32 ranging_state = 5; // 0: ranging down; 1: ranging up
+ uint32 eqd = 6;
+ uint32 distance = 7;
+}
+// MSG: OLT_PON_ONU_ACTIVATE_COMPLETE
+message OltPonOnuActivateComplete {
+ uint32 pon_port = 1; // 1~24
+ uint32 pon_slot = 2;
+ uint32 onu_id = 3; // 0~255
+ uint32 result = 4; //0~1
+}
+
+// MSG: OLT_PON_GET_XGEM_PM_REQ
+message OltPonXGemPMReq
+{
+ uint32 pon_slot = 1;
+ uint32 pon_port = 2;
+}
+
+// MSG: OLT_PON_GET_XGEM_PM_ACK
+message OltPonXGemPMAck
+{
+ uint32 pon_slot = 1;
+ uint32 pon_port = 2;
+ uint64 tx_gem_frames = 3;
+ uint64 rx_gem_frames = 4;
+ uint64 tx_nolfbit_count = 5;
+ uint64 hec_err_count = 6;
+ uint64 frame_lost_count = 7;
+ uint64 key_err_count = 8;
+}
+
+message OltPonGemport
+{
+ uint32 uni_logic_port = 1;
+ uint32 vlan = 2; //pit+vlanId
+ uint32 gemport_id = 3;
+ uint32 onu_id = 4;
+ uint32 pon_port = 5;
+}
\ No newline at end of file
diff --git a/voltha/adapters/cig_olt/protos/olt_switch.proto b/voltha/adapters/cig_olt/protos/olt_switch.proto
new file mode 100644
index 0000000..b546ffc
--- /dev/null
+++ b/voltha/adapters/cig_olt/protos/olt_switch.proto
@@ -0,0 +1,45 @@
+syntax = "proto3";
+
+//import "voltha/openflow_13.proto";
+import "openflow_13.proto";
+
+//olt swtich msg type from 3001 to 3999
+enum OltSwitchMsgType
+{
+ OLT_SWITCH_MSG_INVALID = 0;
+ OLT_SWITCH_UPDATE_FLOW_TABLE = 3001;
+ OLT_SWITCH_CONFIGURE_PORT = 3002;
+ OLT_SWITCH_PORT_STATUS_EVENT = 3003;
+ OLT_SWITCH_GET_PORT_PM_REQ = 3004;
+ OLT_SWITCH_GET_PORT_PM_ACK = 3005;
+}
+
+// MSG: OLT_SWITCH_UPDATE_FLOW_TABLE
+message OltSwitchFlowTable
+{
+ repeated openflow_13.ofp_flow_stats flows = 1;
+}
+
+// MSG: OLT_SWITCH_CONFIGURE_PORT
+message OltSwitchConfigurePort
+{
+ openflow_13.ofp_port_mod port = 1; //Modify behavior of the physical port
+}
+
+// MSG: OLT_SWITCH_PORT_STATUS_EVENT
+message OltSwitchPortStatusEvent
+{
+ openflow_13.ofp_port_status port_status = 1;
+}
+
+// MSG: OLT_SWITCH_GET_PORT_PM_REQ
+message OltSwitchGetPortPmReq
+{
+ openflow_13.ofp_port_stats_request port_no = 1;
+}
+
+// MSG: OLT_SWITCH_GET_PORT_PM_ACK
+message OltSwitchGetPortPmAck
+{
+ openflow_13.ofp_port_stats port_stats = 1;
+}