GRPC Server implmentation in ASFVOLT16 adapter to handle indications from EdgeCore Device
Change-Id: Idadb581cc2a37af2d54118422fac7d2fad1f366d
diff --git a/compose/docker-compose-system-test.yml b/compose/docker-compose-system-test.yml
index 57bd1f2..0ceee1d 100644
--- a/compose/docker-compose-system-test.yml
+++ b/compose/docker-compose-system-test.yml
@@ -131,6 +131,7 @@
- 8880
- 50555
- 18880
+ - "60001:60001"
depends_on:
- consul
links:
diff --git a/ponsim/grpc_server.py b/ponsim/grpc_server.py
index 85f12f6..9165433 100644
--- a/ponsim/grpc_server.py
+++ b/ponsim/grpc_server.py
@@ -62,8 +62,12 @@
log.error('failed-to-read-cert-keys', reason=e)
# create server credentials
- server_credentials = grpc.ssl_server_credentials(((private_key, certificate_chain,),))
- self.server.add_secure_port('[::]:%s' % self.port, server_credentials)
+ if self.device_type == 'ponsim':
+ server_credentials = grpc.ssl_server_credentials(((private_key, certificate_chain,),))
+ self.server.add_secure_port('[::]:%s' % self.port, server_credentials)
+ else:
+ self.server.add_insecure_port('[::]:%s' % self.port)
+
self.server.start()
log.info('started')
diff --git a/voltha/adapters/asfvolt16_olt/asfvolt16_device_handler.py b/voltha/adapters/asfvolt16_olt/asfvolt16_device_handler.py
index e0563e7..ee0a9f4 100644
--- a/voltha/adapters/asfvolt16_olt/asfvolt16_device_handler.py
+++ b/voltha/adapters/asfvolt16_olt/asfvolt16_device_handler.py
@@ -46,8 +46,9 @@
def __init__(self, adapter, device_id):
super(Asfvolt16Handler, self).__init__(adapter, device_id)
self.filter = is_inband_frame
- self.bal = Bal(self.log)
+ self.bal = Bal(self, self.log)
self.host_and_port = None
+ self.olt_id = 0
def __del__(self):
super(Asfvolt16Handler, self).__del__()
@@ -59,32 +60,31 @@
self.log.info('activating-asfvolt16-olt', device=device)
- if self.logical_device_id is not None:
- return
+ if self.logical_device_id is None:
- if not device.host_and_port:
- device.oper_status = OperStatus.FAILED
- device.reason = 'No host_and_port field provided'
+ if not device.host_and_port:
+ device.oper_status = OperStatus.FAILED
+ device.reason = 'No host_and_port field provided'
+ self.adapter_agent.update_device(device)
+ return
+
+ self.host_and_port = device.host_and_port
+ device.root = True
+ device.vendor = 'Edgecore'
+ device.model = 'ASFvOLT16'
+ device.serial_number = device.host_and_port
self.adapter_agent.update_device(device)
- return
+
+ self.add_port(port_no=1, port_type=Port.ETHERNET_NNI)
+ self.logical_device_id = self.add_logical_device(device_id=device.id)
+ self.add_logical_port(port_no=1,
+ port_type=Port.ETHERNET_NNI,
+ device_id=device.id,
+ logical_device_id=self.logical_device_id)
- self.host_and_port = device.host_and_port
+ self.bal.connect_olt(device.host_and_port, self.device_id)
- device.root = True
- device.vendor = 'Edgecore'
- device.model = 'ASFvOLT16'
- device.serial_number = device.host_and_port
- self.adapter_agent.update_device(device)
-
- self.add_port(port_no=nni_port_no, port_type=Port.ETHERNET_NNI)
- self.logical_device_id = self.add_logical_device(device_id=device.id)
- self.add_logical_port(port_no=nni_port_no,
- port_type=Port.ETHERNET_NNI,
- device_id=device.id,
- logical_device_id=self.logical_device_id)
-
- self.bal.connect_olt(device.host_and_port)
- self.bal.activate_olt(olt_id)
+ self.bal.activate_olt()
device = self.adapter_agent.get_device(device.id)
device.parent_id = self.logical_device_id
@@ -99,7 +99,13 @@
self.log.info('adding-port', port_no=port_no, port_type=port_type)
if port_type is Port.ETHERNET_NNI:
label='NNI facing Ethernet port'
- else:
+ oper_status=OperStatus.ACTIVE
+ elif port_type is Port.PON_OLT:
+ label='PON port'
+ #To-Do The pon port status should be ACTIVATING.
+ #For now make the status as Active.
+ oper_status=OperStatus.ACTIVE
+ else :
self.log.erro('invalid-port-type', port_type=port_type)
return
@@ -108,7 +114,7 @@
label=label,
type=port_type,
admin_state=AdminState.ENABLED,
- oper_status=OperStatus.ACTIVE
+ oper_status=oper_status
)
self.adapter_agent.add_port(self.device_id, port)
@@ -172,6 +178,40 @@
self.adapter_agent.add_logical_port(logical_device_id, logical_port)
+ def handle_access_term_ind(self, ind_info):
+ #import pdb; pdb.set_trace()
+ device = self.adapter_agent.get_device(self.device_id)
+ if ind_info['actv_status'] == 'success':
+ self.log.info('successful access terminal Indication',
+ olt_id=self.olt_id)
+ device.connect_status = ConnectStatus.REACHABLE
+ device.oper_status = OperStatus.ACTIVE
+ device.reason = 'OLT activated successfully'
+ status = self.adapter_agent.update_device(device)
+ self.log.info('OLT activation complete')
+ try:
+ #Here we have to add pon_port to OLT device.
+ #Since the create_interface is not called, below code is
+ #added to achive functionality.
+ #self.send_connect_olt(self.olt_id)
+ port_no = 100
+ self.add_port(port_no, port_type=Port.PON_OLT)
+ #import pdb; pdb.set_trace()
+ self.bal.activate_pon_port(self.olt_id, port_no);
+ except Exception as e:
+ return
+ else:
+ device.oper_status = OperStatus.FAILED
+ device.reason = 'Failed to Intialize OLT'
+ self.adapter_agent.update_device(device)
+ reactor.callLater(5, self.activate, device)
+ return
+
+ def handle_subscriber_term_ind(self, ind_info):
+ #import pdb; pdb.set_trace()
+ self.log.info('To-DO Need to handle ONU Indication')
+
+
def disable(self):
super(Asfvolt16Handler, self).disable()
diff --git a/voltha/adapters/asfvolt16_olt/asfvolt16_olt.py b/voltha/adapters/asfvolt16_olt/asfvolt16_olt.py
index 7ea8ddb..25b1b02 100644
--- a/voltha/adapters/asfvolt16_olt/asfvolt16_olt.py
+++ b/voltha/adapters/asfvolt16_olt/asfvolt16_olt.py
@@ -21,6 +21,7 @@
import structlog
from voltha.adapters.iadapter import OltAdapter
from voltha.adapters.asfvolt16_olt.asfvolt16_device_handler import Asfvolt16Handler
+from voltha.adapters.asfvolt16_olt.asfvolt16_rx_handler import Asfvolt16RxHandler
log = structlog.get_logger()
@@ -34,4 +35,10 @@
version='0.1',
device_type='asfvolt16_olt')
# register for adapter messages
+ self.port = 60001
+ self.rx_handler = Asfvolt16RxHandler(self, self.port, log)
+ self.rx_handler.start()
self.adapter_agent.register_for_inter_adapter_messages()
+
+ def stop(self):
+ self.rx_handler.stop()
diff --git a/voltha/adapters/asfvolt16_olt/asfvolt16_rx_handler.py b/voltha/adapters/asfvolt16_olt/asfvolt16_rx_handler.py
index e7c912d..f23e66b 100644
--- a/voltha/adapters/asfvolt16_olt/asfvolt16_rx_handler.py
+++ b/voltha/adapters/asfvolt16_olt/asfvolt16_rx_handler.py
@@ -22,212 +22,124 @@
from twisted.internet.defer import DeferredQueue
import arrow
import binascii
-from common.frameio.frameio import hexify
-from voltha.protos.events_pb2 import KpiEvent, MetricValuePairs
-from voltha.protos.events_pb2 import KpiEventType
-from voltha.protos.events_pb2 import AlarmEventType, \
- AlarmEventSeverity, AlarmEventState, AlarmEventCategory
+from twisted.internet import reactor
+from common.utils.grpc_utils import twisted_async
+from google.protobuf import empty_pb2
+from voltha.adapters.asfvolt16_olt.protos import bal_indications_pb2
+from voltha.adapters.asfvolt16_olt.protos import bal_msg_type_pb2, \
+ bal_osmsg_pb2, bal_model_ids_pb2, bal_obj_pb2, bal_model_types_pb2, \
+ bal_errno_pb2, bal_pb2
+from voltha.adapters.device_handler import OltDeviceHandler
+from voltha.adapters.asfvolt16_olt.grpc_server import GrpcServer
-log = structlog.get_logger()
+
+#log = structlog.get_logger()
class Asfvolt16RxHandler(object):
- def __init__(self, device_id, adapter, onu_queue):
- self.device_id = device_id
+ def __init__(self, adapter, port, log):
self.adapter = adapter
- self.onu_discovered_queue = onu_queue
self.adapter_agent = adapter.adapter_agent
self.adapter_name = adapter.name
- self.omci_rx_queue = DeferredQueue()
+ self.grpc_server = None
+ self.grpc_server_port = port
+ self.log = log
- def remote_echo(self, pkt_type, pon, onu, port, crc_ok, msg_size, msg_data):
- log.info('received-omci-msg',
- pkt_type=pkt_type,
- pon_id=pon,
- onu_id=onu,
- port_id=port,
- crc_ok=crc_ok,
- msg_size=msg_size,
- msg_data=hexify(msg_data))
- self.omci_rx_queue.put((onu, msg_data))
+ def start(self):
+ self.grpc_server = GrpcServer(self.grpc_server_port, self, self.log)
+ self.grpc_server.start(bal_indications_pb2.add_BalIndServicer_to_server, self)
- def receive_omci_msg(self):
- return self.omci_rx_queue.get()
+ def stop(self):
+ self.grpc_server.stop()
- def remote_report_stats(self, _object, key, stats_data):
- log.info('received-stats-msg',
- object=_object,
- key=key,
- stats=stats_data)
+ def process_flow_ind(self, bal_indication, device_id):
+ self.log.info('Flow indication is not implemented', device_id=device_id)
- prefix = 'voltha.{}.{}'.format(self.adapter_name, self.device_id)
+ def process_group_ind(self, bal_indication, device_id):
+ self.log.info('Group indication is not implemented', device_id=device_id)
- try:
- ts = arrow.utcnow().timestamp
+ def process_interface_ind(self, bal_indication, device_id):
+ self.log.info('Inteface Ind received', intf_id=bal_indication.balObjInfo.keyStr)
+ self.log.info('Awaiting ONU discovery')
+ return
- prefixes = {
- prefix + '.nni': MetricValuePairs(metrics=stats_data)
- }
+ def process_packet_ind(self, bal_indication, device_id):
+ self.log.info('packet indication is not implemented', device_id=device_id)
- kpi_event = KpiEvent(
- type=KpiEventType.slice,
- ts=ts,
- prefixes=prefixes
- )
+ def process_subscriber_term_ind(self, bal_indication, device_id):
+ onu_data = bal_indication.balObjInfo.onuDiscoveryInfo
+ self.log.info('Subscriber termination message received',
+ admin_state=onu_data.data.admin_state)
+ # ind_info: {'object_type': <str>
+ # '_device_id': <str>
+ # '_pon_id' : <int>
+ # 'onu_id' : <int>
+ # '_vendor_id' : <str>
+ # '__vendor_specific' : <str>
+ # 'activation_successful':[True or False]}
- self.adapter_agent.submit_kpis(kpi_event)
+ ind_info = dict()
+ ind_info['object_type'] = 'subscriber_terminal'
+ ind_info['_device_id'] = device_id
+ ind_info['_pon_id'] = onu_data.key.intf_id
+ ind_info['onu_id'] = onu_data.key.sub_term_id
+ ind_info['_vendor_id'] = '4252434D'
+ ind_info['_vendor_specific'] = onu_data.data.serial_number.vendor_specific
- except Exception as e:
- log.exception('failed-to-submit-kpis', e=e)
+ if(bal_model_types_pb2.BAL_STATE_DOWN == onu_data.data.admin_state):
+ ind_info['activation_successful']=False
+ elif(bal_model_types_pb2.BAL_STATE_UP == onu_data.data.admin_state):
+ ind_info['activation_successful']=True
+ reactor.callLater(0,
+ self.adapter.devices_handlers[device_id].handle_subscriber_term_ind,
+ ind_info)
- def remote_report_event(self, _object, key, event, event_data=None):
- def _convert_serial_data(data):
- b = bytearray()
- b.extend(data)
+ def process_queue_ind(self, bal_indication, device_id):
+ self.log.info('activating-olt', device_id=device_id)
- return binascii.hexlify(b)
+ def process_sched_ind(self, bal_indication, device_id):
+ self.log.info('activating-olt', device_id=device_id)
- log.info('received-event-msg',
- object=_object,
- key=key,
- event_str=event,
- event_data=event_data)
-
- if _object == 'device':
- # key: {'device_id': <int>}
- # event: 'state-changed'
- # event_data: {'state_change_successful': <False|True>,
- # 'new_state': <str> ('active-working'|'inactive')}
- pass
- elif _object == 'nni':
- # key: {'device_id': <int>, 'nni': <int>}
- pass
- elif _object == 'pon_ni':
- # key: {'device_id': <int>, 'pon_ni': <int>}
- # event: 'state-changed'
- # event_data: {'state_change_successful': <False|True>,
- # 'new_state': <str> ('active-working'|'inactive')}
- #
- # event: 'onu-discovered'
- # event_data: {'serial_num_vendor_id': <str>
- # 'serial_num_vendor_specific': <str>
- # 'ranging_time': <int>
- # 'onu_id': <int>
- # 'us_line_rate': <int> (0=2.5G, 1=10G)
- # 'ds_pon_id': <int>
- # 'us_pon_id': <int>
- # 'tuning_granularity': <int>
- # 'step_tuning_time': <int>
- # 'attenuation': <int>
- # 'power_levelling_caps': <int>}
- if 'onu-discovered' == event and event_data is not None:
- event_data['_device_id'] = key['device_id'] if 'device_id' in key else None
- event_data['_pon_id'] = key['pon_id'] if 'pon_id' in key else None
- event_data['_vendor_id'] = _convert_serial_data(event_data['serial_num_vendor_id']) \
- if 'serial_num_vendor_id' in event_data else None
- event_data['_vendor_specific'] = _convert_serial_data(event_data['serial_num_vendor_specific']) \
- if 'serial_num_vendor_specific' in event_data else None
-
- self.onu_discovered_queue.put(event_data)
- log.info('onu-discovered-event-added-to-queue', event_data=event_data)
-
- elif _object == 'onu':
- # key: {'device_id': <int>, 'pon_ni': <int>, 'onu_id': <int>}
- # event: 'activation-completed'
- # event_data: {'activation_successful': <False|True>,
- # act_fail_reason': <str>}
- #
- # event: 'deactivation-completed'
- # event_data: {'deactivation_successful': <False|True>}
- #
- # event: 'ranging-completed'
- # event_data: {'ranging_successful': <False|True>,
- # 'ranging_fail_reason': <str>,
- # 'eqd': <int>,
- # 'number_of_ploams': <int>,
- # 'power_level': <int>}
- #
- # event: 'enable-completed'
- # event_data: {'serial_num-vendor_id': <str>
- # 'serial_num-vendor_specific: <str>}
- #
- # event: 'disable-completed'
- # event_data: {'serial_num-vendor_id': <str>
- # 'serial_num-vendor_specific: <str>}
-
- # Get child_device from onu_id
- child_device = self.adapter_agent.get_child_device(self.device_id, onu_id=key['onu_id'])
- assert child_device is not None
-
- # Build the message, the ONU adapter uses the proxy_address
- # to uniquely identify a specific ONU
- msg = {'proxy_address':child_device.proxy_address, 'event':event, 'event_data':event_data}
-
- # Send the event message to the ONU adapter
- self.adapter_agent.publish_inter_adapter_message(child_device.id, msg)
-
- elif _object == 'alloc_id':
- # key: {'device_id': <int>, 'pon_ni': <int>, 'onu_id': <int>, 'alloc_id': ,<int>}
- pass
- elif _object == 'gem_port':
- # key: {'device_id': <int>, 'pon_ni': <int>, 'onu_id': <int>, 'gem_port': ,<int>}
- pass
- elif _object == 'trx':
- # key: {'device_id': <int>, 'pon_ni': <int>}
- pass
- elif _object == 'flow_map':
- # key: {'device_id': <int>, 'pon_ni': <int>}
- pass
-
- def remote_report_alarm(self, _object, key, alarm, status, priority,
- alarm_data=None):
- log.info('received-alarm-msg',
- object=_object,
- key=key,
- alarm=alarm,
- status=status,
- priority=priority,
- alarm_data=alarm_data)
-
- id = 'voltha.{}.{}.{}'.format(self.adapter_name, self.device_id, _object)
- description = '{} Alarm - {} - {}'.format(_object.upper(), alarm.upper(),
- 'Raised' if status else 'Cleared')
-
- if priority == 'low':
- severity = AlarmEventSeverity.MINOR
- elif priority == 'medium':
- severity = AlarmEventSeverity.MAJOR
- elif priority == 'high':
- severity = AlarmEventSeverity.CRITICAL
+ def process_access_term_ind(self, bal_indication, device_id):
+ self.log.info('Received access terminal Indication',
+ device_id=device_id)
+ # ind_info: {'object_type': <str>
+ # 'actv_status': <str>}
+ ind_info = dict()
+ ind_info['object_type'] = 'access_terminal'
+ if bal_indication.balObjInfo.status != bal_errno_pb2.BAL_ERR_OK:
+ ind_info['actv_status'] = 'failed'
else:
- severity = AlarmEventSeverity.INDETERMINATE
+ ind_info['actv_status'] = 'success'
+ reactor.callLater(0,
+ self.adapter.devices_handlers[device_id].handle_access_term_ind,
+ ind_info)
+
+ ind_handlers = {
+ bal_model_ids_pb2.BAL_OBJ_ID_ACCESS_TERMINAL : process_access_term_ind,
+ bal_model_ids_pb2.BAL_OBJ_ID_FLOW : process_flow_ind,
+ bal_model_ids_pb2.BAL_OBJ_ID_GROUP : process_group_ind,
+ bal_model_ids_pb2.BAL_OBJ_ID_INTERFACE : process_interface_ind,
+ bal_model_ids_pb2.BAL_OBJ_ID_PACKET : process_packet_ind,
+ bal_model_ids_pb2.BAL_OBJ_ID_SUBSCRIBER_TERMINAL : process_subscriber_term_ind,
+ bal_model_ids_pb2.BAL_OBJ_ID_TM_QUEUE : process_queue_ind,
+ bal_model_ids_pb2.BAL_OBJ_ID_TM_SCHED : process_sched_ind,
+ }
+
+ @twisted_async
+ def BalIndInfo(self, request, context):
+ self.log.info('get-device-info')
+ self.log.info('received indication for object type',obj_type=request.balObjInfo.objType)
+ #import pdb; pdb.set_trace()
+ #reactor.callFromThread(self.Baltest, request)
+ device_id = request.device_id.decode('unicode-escape')
try:
- ts = arrow.utcnow().timestamp
-
- alarm_event = self.adapter_agent.create_alarm(
- id=id,
- resource_id=str(key),
- type=AlarmEventType.EQUIPMENT,
- category=AlarmEventCategory.PON,
- severity=severity,
- state=AlarmEventState.RAISED if status else AlarmEventState.CLEARED,
- description=description,
- context=alarm_data,
- raised_ts = ts)
-
- self.adapter_agent.submit_alarm(self.device_id, alarm_event)
-
+ handler = self.ind_handlers.get(request.balObjInfo.objType)
+ if handler:
+ handler(self, request, device_id)
except Exception as e:
- log.exception('failed-to-submit-alarm', e=e)
+ self.log.exception('Invalid object type', e=e)
- # take action based on alarm type, only pon_ni and onu objects report alarms
- if object == 'pon_ni':
- # key: {'device_id': <int>, 'pon_ni': <int>}
- # alarm: 'los'
- # status: <False|True>
- pass
- elif object == 'onu':
- # key: {'device_id': <int>, 'pon_ni': <int>, 'onu_id': <int>}
- # alarm: <'los'|'lob'|'lopc_miss'|'los_mic_err'|'dow'|'sf'|'sd'|'suf'|'df'|'tiw'|'looc'|'dg'>
- # status: <False|True>
- pass
+ bal_err = bal_pb2.BalErr()
+ bal_err.err = bal_errno_pb2.BAL_ERR_OK
+ return bal_err
diff --git a/voltha/adapters/asfvolt16_olt/bal.py b/voltha/adapters/asfvolt16_olt/bal.py
index 3ce9ceb..25e3426 100644
--- a/voltha/adapters/asfvolt16_olt/bal.py
+++ b/voltha/adapters/asfvolt16_olt/bal.py
@@ -16,18 +16,23 @@
from twisted.internet.defer import inlineCallbacks
+from voltha.protos.common_pb2 import OperStatus, ConnectStatus
from voltha.adapters.asfvolt16_olt.protos import bal_pb2, bal_obj_pb2, \
- bal_model_types_pb2
+ bal_model_types_pb2, bal_model_ids_pb2
from voltha.adapters.asfvolt16_olt.grpc_client import GrpcClient
class Bal(object):
- def __init__(self, log):
+ def __init__(self, olt, log):
self.log = log
self.grpc_client = GrpcClient(self.log)
+ self.stub = None
+ self.device_id = None
+ self.olt = olt
@inlineCallbacks
- def connect_olt(self, host_and_port):
+ def connect_olt(self, host_and_port, device_id):
self.log.info('connecting-olt', host_and_port=host_and_port)
+ self.device_id = device_id
self.grpc_client.connect(host_and_port)
self.stub = bal_pb2.BalStub(self.grpc_client.channel)
init = bal_pb2.BalInit()
@@ -37,15 +42,34 @@
'''
yield self.stub.BalApiInit(init)
- def activate_olt(self, olt_id):
+ def activate_olt(self):
self.log.info('activating-olt')
- self.set_access_terminal_admin_state(bal_model_types_pb2.BAL_STATE_UP, olt_id)
+ self.set_access_terminal_admin_state(bal_model_types_pb2.BAL_STATE_UP)
@inlineCallbacks
- def set_access_terminal_admin_state(self, admin_state, olt_id):
- self.log.info('setting-admin-state', admin_state=admin_state, olt_id=olt_id)
- cfg = bal_pb2.BalCfg()
- cfg.hdr.type = bal_obj_pb2.BAL_OBJ_MSG_TYPE_SET
- cfg.cfg.key.access_term_id = olt_id
- cfg.cfg.data.admin_state = admin_state
- yield self.stub.BalCfgSet(cfg)
+ def set_access_terminal_admin_state(self, admin_state):
+ #import pdb; pdb.set_trace()
+ self.log.info('setting-admin-state', admin_state=admin_state, device_id=self.device_id)
+ obj = bal_pb2.BalCfg()
+ obj.device_id = self.device_id.encode('ascii', 'ignore')
+ obj.hdr.obj_type = bal_model_ids_pb2.BAL_OBJ_ID_ACCESS_TERMINAL
+ obj.cfg.key.access_term_id = 0
+ obj.cfg.data.admin_state = admin_state
+ yield self.stub.BalCfgSet(obj)
+
+ @inlineCallbacks
+ def activate_pon_port(self, olt_no, pon_port):
+ self.log.info('activating-pon-port in olt', olt=olt_no, pon_port=pon_port)
+ try:
+ obj = bal_pb2.BalCfg()
+ #Fill Header details
+ obj.device_id = self.device_id.encode('ascii', 'ignore')
+ obj.hdr.obj_type = bal_model_ids_pb2.BAL_OBJ_ID_INTERFACE
+ #Fill Access Terminal Details
+ obj.interface.key.intf_id = pon_port
+ obj.interface.key.intf_type = bal_model_types_pb2.BAL_INTF_TYPE_PON
+ obj.interface.data.admin_state = bal_model_types_pb2.BAL_STATE_UP
+ yield self.stub.BalCfgSet(obj)
+ except Exception as e:
+ self.log.info('activating-pon-port in olt-exception', exc=str(e))
+ return
diff --git a/voltha/adapters/asfvolt16_olt/grpc_server.py b/voltha/adapters/asfvolt16_olt/grpc_server.py
new file mode 100644
index 0000000..e9f1006
--- /dev/null
+++ b/voltha/adapters/asfvolt16_olt/grpc_server.py
@@ -0,0 +1,52 @@
+#
+# Copyright 2016 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from Queue import Queue, Empty
+import os
+
+import grpc
+from concurrent import futures
+from grpc import StatusCode
+from grpc._channel import _Rendezvous
+from structlog import get_logger
+from common.utils.grpc_utils import twisted_async
+from twisted.internet import threads
+
+#log = get_logger()
+
+class GrpcServer(object):
+
+ def __init__(self, port, adapter, log):
+ self.port = port
+ self.thread_pool = futures.ThreadPoolExecutor(max_workers=10)
+ self.server = grpc.server(self.thread_pool)
+ self.services = []
+ self.adapter = adapter
+ self.log = log
+
+ def start(self, activation_func, service):
+ self.log.debug('Edge core GRPC server starting')
+ self.services.append(service)
+ activation_func(service, self.server)
+ self.server.add_insecure_port('[::]:%s' % self.port)
+ self.server.start()
+ self.log.info('Edge core GRPC server started')
+
+ def stop(self, grace=0):
+ self.log.info('Asfvolt16-stopping GRPC Server')
+ self.server.stop(grace)
+ self.thread_pool.shutdown(False)
+ self.log.debug('Asfvolt16-stopped GRPC Server')
diff --git a/voltha/adapters/asfvolt16_olt/protos/bal.proto b/voltha/adapters/asfvolt16_olt/protos/bal.proto
index 4c4601b..f2091f8 100644
--- a/voltha/adapters/asfvolt16_olt/protos/bal.proto
+++ b/voltha/adapters/asfvolt16_olt/protos/bal.proto
@@ -16,7 +16,7 @@
syntax = "proto3";
-import "google/protobuf/empty.proto";
+//import "google/protobuf/empty.proto";
import "bal_obj.proto";
import "bal_model_types.proto";
import "bal_errno.proto";
@@ -40,6 +40,7 @@
BalTmQueueCfg tm_queue_cfg = 8;
BalTmSchedCfg tm_sched_cfg = 9;
}
+ string device_id = 10;
}
message BalInit {
@@ -74,7 +75,7 @@
/**
* Un-initialize the BAL Public API internal data structures
**/
- rpc BalApiFinish(google.protobuf.Empty) returns(BalErr) {}
+ rpc BalApiFinish(BalCfg) returns(BalErr) {}
/**
* BAL Public API Set (or modify) command.
*
diff --git a/voltha/adapters/asfvolt16_olt/protos/bal_indications.proto b/voltha/adapters/asfvolt16_olt/protos/bal_indications.proto
index 3494faa..b4ee69b 100644
--- a/voltha/adapters/asfvolt16_olt/protos/bal_indications.proto
+++ b/voltha/adapters/asfvolt16_olt/protos/bal_indications.proto
@@ -172,9 +172,9 @@
BalErrno status = 2; //status of the indication received from BAL utilities
string keyStr = 3; //key string containing additional data
oneof u {
- BalSubscriberTerminalCfg onuDiscoveryInfo = 4; // ONU discovery info
- bytes pktData = 5; //raw packet in case of object type is PACKET
- }
+ BalSubscriberTerminalCfg onuDiscoveryInfo = 4; // ONU discovery info
+ bytes pktData = 5; //raw packet in case of object type is PACKET
+ }
}
message BalIndications{
@@ -184,6 +184,7 @@
BalIndStatsInfo balStatsInfo = 3; //statistics info from BAL
BalOmciRespInfo balOmciRespInfo = 4; // OMCI response
}
+ string device_id = 5; //Deviced Id
}
service BalInd {
diff --git a/voltha/adapters/asfvolt16_olt/protos/bal_model_ids.proto b/voltha/adapters/asfvolt16_olt/protos/bal_model_ids.proto
index 3e1817d..ca87dcd 100644
--- a/voltha/adapters/asfvolt16_olt/protos/bal_model_ids.proto
+++ b/voltha/adapters/asfvolt16_olt/protos/bal_model_ids.proto
@@ -327,21 +327,22 @@
BAL_TM_SCHED_KEY_ID_ID = 1; /**< id. */
}
-/** Identifiers for all objects in the system.
+/** Identifiers for all objects in the system.
*/
enum BalObjId
{
- BAL_OBJ_ID_ACCESS_TERMINAL = 0; /**< BAL Access Terminal */
- BAL_OBJ_ID_FLOW = 1; /**< BAL Flow */
- BAL_OBJ_ID_GROUP = 2; /**< BAL Group */
- BAL_OBJ_ID_INTERFACE = 3; /**< BAL Interface */
- BAL_OBJ_ID_PACKET = 4; /**< packet */
- BAL_OBJ_ID_SUBSCRIBER_TERMINAL = 5; /**< BAL Subscriber Terminal */
- BAL_OBJ_ID_TM_QUEUE = 6; /**< tm_queue */
- BAL_OBJ_ID_TM_SCHED = 7; /**< tm_sched */
+ BAL_OBJ_ID_INVALID = 0; /**< INVALID */
+ BAL_OBJ_ID_ACCESS_TERMINAL = 1; /**< BAL Access Terminal */
+ BAL_OBJ_ID_FLOW = 2; /**< BAL Flow */
+ BAL_OBJ_ID_GROUP = 3; /**< BAL Group */
+ BAL_OBJ_ID_INTERFACE = 4; /**< BAL Interface */
+ BAL_OBJ_ID_PACKET = 5; /**< packet */
+ BAL_OBJ_ID_SUBSCRIBER_TERMINAL = 6; /**< BAL Subscriber Terminal */
+ BAL_OBJ_ID_TM_QUEUE = 7; /**< tm_queue */
+ BAL_OBJ_ID_TM_SCHED = 8; /**< tm_sched */
}
-/** Identifiers for all possible groups under all objects in the system.
+/** Identifiers for all possible groups under all objects in the system.
*/
enum BalObjGroupId
{
diff --git a/voltha/adapters/asfvolt16_olt/protos/bal_model_types.proto b/voltha/adapters/asfvolt16_olt/protos/bal_model_types.proto
index 5adb020..1a6da61 100644
--- a/voltha/adapters/asfvolt16_olt/protos/bal_model_types.proto
+++ b/voltha/adapters/asfvolt16_olt/protos/bal_model_types.proto
@@ -151,9 +151,10 @@
/** Interface type.
*/
enum BalIntfType {
- BAL_INTF_TYPE_NNI = 0; /**< NNI Interface. */
- BAL_INTF_TYPE_PON = 1; /**< POIN Interface. */
- BAL_INTF_TYPE__NUM_OF = 2; /**< Number of enum entries, not an entry itself. */
+ BAL_INTF_TYPE_INVALID = 0; /**< INVALID.*/
+ BAL_INTF_TYPE_NNI = 1; /**< NNI Interface. */
+ BAL_INTF_TYPE_PON = 2; /**< POIN Interface. */
+ BAL_INTF_TYPE__NUM_OF = 3; /**< Number of enum entries, not an entry itself. */
}
/** Interworking Function Mode.
diff --git a/voltha/adapters/device_handler.py b/voltha/adapters/device_handler.py
index 33bbd35..54f1106 100644
--- a/voltha/adapters/device_handler.py
+++ b/voltha/adapters/device_handler.py
@@ -44,6 +44,7 @@
def __init__(self, adapter, device_id):
super(OltDeviceHandler, self).__init__(adapter, device_id)
self.filter = None
+ self.io_port = None
def __del__(self):
if self.io_port is not None: