VOL-1374: OLT Activation with Edgecore asfvolt16
Change-Id: I61ce4b0a6a3666070d08a162251d42d90817f409
diff --git a/adaptercore/device_handler.go b/adaptercore/device_handler.go
new file mode 100644
index 0000000..07ae169
--- /dev/null
+++ b/adaptercore/device_handler.go
@@ -0,0 +1,332 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package adaptercore
+
+import (
+ "context"
+ "errors"
+ "io"
+ "strconv"
+ "strings"
+ "sync"
+
+ "github.com/gogo/protobuf/proto"
+ com "github.com/opencord/voltha-go/adapters/common"
+ "github.com/opencord/voltha-go/common/log"
+ "github.com/opencord/voltha-go/protos/common"
+ ic "github.com/opencord/voltha-go/protos/inter_container"
+ of "github.com/opencord/voltha-go/protos/openflow_13"
+ oop "github.com/opencord/voltha-go/protos/openolt"
+ "github.com/opencord/voltha-go/protos/voltha"
+ "google.golang.org/grpc"
+)
+
+//DeviceHandler will interact with the OLT device.
+type DeviceHandler struct {
+ deviceId string
+ deviceType string
+ device *voltha.Device
+ coreProxy *com.CoreProxy
+ openOLT *OpenOLT
+ nniPort *voltha.Port
+ ponPort *voltha.Port
+ exitChannel chan int
+ lockDevice sync.RWMutex
+ client oop.OpenoltClient
+ transitionMap *TransitionMap
+ clientCon *grpc.ClientConn
+}
+
+//NewDeviceHandler creates a new device handler
+func NewDeviceHandler(cp *com.CoreProxy, device *voltha.Device, adapter *OpenOLT) *DeviceHandler {
+ var dh DeviceHandler
+ dh.coreProxy = cp
+ cloned := (proto.Clone(device)).(*voltha.Device)
+ dh.deviceId = cloned.Id
+ dh.deviceType = cloned.Type
+ dh.device = cloned
+ dh.openOLT = adapter
+ dh.exitChannel = make(chan int, 1)
+ dh.lockDevice = sync.RWMutex{}
+
+ //TODO initialize the support classes.
+ return &dh
+}
+
+// start save the device to the data model
+func (dh *DeviceHandler) start(ctx context.Context) {
+ dh.lockDevice.Lock()
+ defer dh.lockDevice.Unlock()
+ log.Debugw("starting-device-agent", log.Fields{"device": dh.device})
+ // Add the initial device to the local model
+ log.Debug("device-agent-started")
+}
+
+// stop stops the device dh. Not much to do for now
+func (dh *DeviceHandler) stop(ctx context.Context) {
+ dh.lockDevice.Lock()
+ defer dh.lockDevice.Unlock()
+ log.Debug("stopping-device-agent")
+ dh.exitChannel <- 1
+ log.Debug("device-agent-stopped")
+}
+
+func macAddressToUint32Array(mac string) []uint32 {
+ slist := strings.Split(mac, ":")
+ result := make([]uint32, len(slist))
+ var err error
+ var tmp int64
+ for index, val := range slist {
+ if tmp, err = strconv.ParseInt(val, 16, 32); err != nil {
+ return []uint32{1, 2, 3, 4, 5, 6}
+ }
+ result[index] = uint32(tmp)
+ }
+ return result
+}
+
+func portName(portNum uint32, portType voltha.Port_PortType, intfId uint32) string {
+
+ if portType == voltha.Port_PON_OLT {
+ return "pon-" + string(portNum)
+ } else if portType == voltha.Port_ETHERNET_NNI {
+ return "nni-" + string(intfId)
+ } else if portType == voltha.Port_ETHERNET_UNI {
+ log.Errorw("local UNI management not supported", log.Fields{})
+ }
+ return ""
+}
+
+func (dh *DeviceHandler) addPort(intfId uint32, portType voltha.Port_PortType, state string) {
+ var operStatus common.OperStatus_OperStatus
+ if state == "up" {
+ operStatus = voltha.OperStatus_ACTIVE
+ } else {
+ operStatus = voltha.OperStatus_DISCOVERED
+ }
+
+ // TODO
+ //portNum := platform.intfIdToPortNo(intfId, portType)
+ portNum := intfId
+
+ label := portName(portNum, portType, intfId)
+ // Now create the PON Port
+ ponPort := &voltha.Port{
+ PortNo: portNum,
+ Label: label,
+ Type: portType,
+ OperStatus: operStatus,
+ }
+
+ // Synchronous call to update device - this method is run in its own go routine
+ if err := dh.coreProxy.PortCreated(nil, dh.device.Id, ponPort); err != nil {
+ log.Errorw("error-creating-nni-port", log.Fields{"deviceId": dh.device.Id, "error": err})
+ }
+}
+
+// readIndications to read the indications from the OLT device
+func (dh *DeviceHandler) readIndications() {
+ indications, err := dh.client.EnableIndication(context.Background(), new(oop.Empty))
+ if err != nil {
+ log.Errorw("Failed to read indications", log.Fields{"err": err})
+ return
+ }
+ if indications == nil {
+ log.Errorw("Indications is nil", log.Fields{})
+ return
+ }
+ for {
+ indication, err := indications.Recv()
+ if err == io.EOF {
+ break
+ }
+ if err != nil {
+ log.Infow("Failed to read from indications", log.Fields{"err": err})
+ continue
+ }
+ switch indication.Data.(type) {
+ case *oop.Indication_OltInd:
+ oltInd := indication.GetOltInd()
+ if oltInd.OperState == "up" {
+ dh.transitionMap.Handle(DeviceUpInd)
+ } else if oltInd.OperState == "down" {
+ dh.transitionMap.Handle(DeviceDownInd)
+ }
+ case *oop.Indication_IntfInd:
+
+ intfInd := indication.GetIntfInd()
+ go dh.addPort(intfInd.GetIntfId(), voltha.Port_PON_OLT, intfInd.GetOperState())
+ log.Infow("Received interface indication ", log.Fields{"InterfaceInd": intfInd})
+ case *oop.Indication_IntfOperInd:
+ intfOperInd := indication.GetIntfOperInd()
+ if intfOperInd.GetType() == "nni" {
+ go dh.addPort(intfOperInd.GetIntfId(), voltha.Port_ETHERNET_NNI, intfOperInd.GetOperState())
+ } else if intfOperInd.GetType() == "pon" {
+ // TODO: Check what needs to be handled here for When PON PORT down, ONU will be down
+ // Handle pon port update
+ }
+ log.Infow("Received interface oper indication ", log.Fields{"InterfaceOperInd": intfOperInd})
+ case *oop.Indication_OnuDiscInd:
+ onuDiscInd := indication.GetOnuDiscInd()
+ log.Infow("Received Onu discovery indication ", log.Fields{"OnuDiscInd": onuDiscInd})
+ // TODO Get onu ID from the resource manager
+ onuId := 0
+ go dh.coreProxy.ChildDeviceDetected(nil, dh.device.Id, int(onuDiscInd.GetIntfId()),
+ "brcm_openomci_onu", int(onuDiscInd.GetIntfId()), string(onuDiscInd.SerialNumber.GetVendorId()),
+ string(onuDiscInd.SerialNumber.GetVendorSpecific()), int64(onuId))
+ case *oop.Indication_OnuInd:
+ onuInd := indication.GetOnuInd()
+ log.Infow("Received Onu indication ", log.Fields{"OnuInd": onuInd})
+ case *oop.Indication_OmciInd:
+ omciInd := indication.GetOmciInd()
+ log.Infow("Received Omci indication ", log.Fields{"OmciInd": omciInd})
+ case *oop.Indication_PktInd:
+ pktInd := indication.GetPktInd()
+ log.Infow("Received pakcet indication ", log.Fields{"PktInd": pktInd})
+ case *oop.Indication_PortStats:
+ portStats := indication.GetPortStats()
+ log.Infow("Received port stats indication", log.Fields{"PortStats": portStats})
+ case *oop.Indication_FlowStats:
+ flowStats := indication.GetFlowStats()
+ log.Infow("Received flow stats", log.Fields{"FlowStats": flowStats})
+ case *oop.Indication_AlarmInd:
+ alarmInd := indication.GetAlarmInd()
+ log.Infow("Received alarm indication ", log.Fields{"AlarmInd": alarmInd})
+ }
+ }
+}
+
+// doStateUp handle the olt up indication and update to voltha core
+func (dh *DeviceHandler) doStateUp() error {
+ // Synchronous call to update device state - this method is run in its own go routine
+ if err := dh.coreProxy.DeviceStateUpdate(context.Background(), dh.device.Id, voltha.ConnectStatus_REACHABLE,
+ voltha.OperStatus_ACTIVE); err != nil {
+ log.Errorw("Failed to update device with OLT UP indication", log.Fields{"deviceId": dh.device.Id, "error": err})
+ return err
+ }
+ return nil
+}
+
+// doStateDown handle the olt down indication
+func (dh *DeviceHandler) doStateDown() error {
+ //TODO Handle oper state down
+ return nil
+}
+
+// doStateInit dial the grpc before going to init state
+func (dh *DeviceHandler) doStateInit() error {
+ var err error
+ dh.clientCon, err = grpc.Dial(dh.device.GetHostAndPort(), grpc.WithInsecure())
+ if err != nil {
+ log.Errorw("Failed to dial device", log.Fields{"DeviceId": dh.deviceId, "HostAndPort": dh.device.GetHostAndPort(), "err": err})
+ return err
+ }
+ return nil
+}
+
+// postInit create olt client instance to invoke RPC on the olt device
+func (dh *DeviceHandler) postInit() error {
+ dh.client = oop.NewOpenoltClient(dh.clientCon)
+ dh.transitionMap.Handle(GrpcConnected)
+ return nil
+}
+
+// doStateConnected get the device info and update to voltha core
+func (dh *DeviceHandler) doStateConnected() error {
+ deviceInfo, err := dh.client.GetDeviceInfo(context.Background(), new(oop.Empty))
+ if err != nil {
+ log.Errorw("Failed to fetch device info", log.Fields{"err": err})
+ return err
+ }
+ if deviceInfo == nil {
+ log.Errorw("Device info is nil", log.Fields{})
+ return errors.New("Failed to get device info from OLT")
+ }
+
+ dh.device.Root = true
+ dh.device.Vendor = deviceInfo.Vendor
+ dh.device.Model = deviceInfo.Model
+ dh.device.ConnectStatus = voltha.ConnectStatus_REACHABLE
+ dh.device.SerialNumber = deviceInfo.DeviceSerialNumber
+ dh.device.HardwareVersion = deviceInfo.HardwareVersion
+ dh.device.FirmwareVersion = deviceInfo.FirmwareVersion
+ // TODO : Check whether this MAC address is learnt from SDPON or need to send from device
+ dh.device.MacAddress = "0a:0b:0c:0d:0e:0f"
+
+ // Synchronous call to update device - this method is run in its own go routine
+ if err := dh.coreProxy.DeviceUpdate(nil, dh.device); err != nil {
+ log.Errorw("error-updating-device", log.Fields{"deviceId": dh.device.Id, "error": err})
+ }
+
+ // Start reading indications
+ go dh.readIndications()
+ return nil
+}
+
+// AdoptDevice adopts the OLT device
+func (dh *DeviceHandler) AdoptDevice(device *voltha.Device) {
+ dh.transitionMap = NewTransitionMap(dh)
+ log.Infow("AdoptDevice", log.Fields{"deviceId": device.Id, "Address": device.GetHostAndPort()})
+ dh.transitionMap.Handle(DeviceInit)
+}
+
+// GetOfpDeviceInfo Get the Ofp device information
+func (dh *DeviceHandler) GetOfpDeviceInfo(device *voltha.Device) (*ic.SwitchCapability, error) {
+ return &ic.SwitchCapability{
+ Desc: &of.OfpDesc{
+ HwDesc: "open_pon",
+ SwDesc: "open_pon",
+ SerialNum: dh.device.SerialNumber,
+ },
+ SwitchFeatures: &of.OfpSwitchFeatures{
+ NBuffers: 256,
+ NTables: 2,
+ Capabilities: uint32(of.OfpCapabilities_OFPC_FLOW_STATS |
+ of.OfpCapabilities_OFPC_TABLE_STATS |
+ of.OfpCapabilities_OFPC_PORT_STATS |
+ of.OfpCapabilities_OFPC_GROUP_STATS),
+ },
+ }, nil
+}
+
+// GetOfpPortInfo Get Ofp port information
+func (dh *DeviceHandler) GetOfpPortInfo(device *voltha.Device, portNo int64) (*ic.PortCapability, error) {
+ cap := uint32(of.OfpPortFeatures_OFPPF_1GB_FD | of.OfpPortFeatures_OFPPF_FIBER)
+ return &ic.PortCapability{
+ Port: &voltha.LogicalPort{
+ OfpPort: &of.OfpPort{
+ HwAddr: macAddressToUint32Array(dh.device.MacAddress),
+ Config: 0,
+ State: uint32(of.OfpPortState_OFPPS_LIVE),
+ Curr: cap,
+ Advertised: cap,
+ Peer: cap,
+ CurrSpeed: uint32(of.OfpPortFeatures_OFPPF_1GB_FD),
+ MaxSpeed: uint32(of.OfpPortFeatures_OFPPF_1GB_FD),
+ },
+ DeviceId: dh.device.Id,
+ DevicePortNo: uint32(portNo),
+ },
+ }, nil
+}
+
+// Process_inter_adapter_message process inter adater message
+func (dh *DeviceHandler) Process_inter_adapter_message(msg *ic.InterAdapterMessage) error {
+ // TODO
+ log.Debugw("Process_inter_adapter_message", log.Fields{"msgId": msg.Header.Id})
+ return nil
+}
+
diff --git a/adaptercore/olt_state_transitions.go b/adaptercore/olt_state_transitions.go
new file mode 100644
index 0000000..45e5f5e
--- /dev/null
+++ b/adaptercore/olt_state_transitions.go
@@ -0,0 +1,184 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package adaptercore
+
+import (
+ "reflect"
+ "runtime"
+
+ "github.com/opencord/voltha-go/common/log"
+)
+
+// DeviceState OLT Device state
+type DeviceState int
+
+const (
+ // deviceStateNull OLT is not instantiated
+ deviceStateNull DeviceState = iota
+ // deviceStateInit OLT is instantiated
+ deviceStateInit
+ // deviceStateConnected Grpc session established with OLT
+ deviceStateConnected
+ // deviceStateUp Admin state of OLT is UP
+ deviceStateUp
+ // deviceStateDown Admin state of OLT is down
+ deviceStateDown
+)
+
+// Trigger for changing the state
+type Trigger int
+
+const (
+ // DeviceInit Go to Device init state
+ DeviceInit Trigger = iota
+ // GrpcConnected Go to connected state
+ GrpcConnected
+ // DeviceUpInd Go to Device up state
+ DeviceUpInd
+ // DeviceDownInd Go to Device down state
+ DeviceDownInd
+ // GrpcDisconnected Go to Device init state
+ GrpcDisconnected
+)
+
+// TransitionHandler function type for handling transition
+type TransitionHandler func() error
+
+// Transition to store state machine
+type Transition struct {
+ previousState []DeviceState
+ currentState DeviceState
+ before []TransitionHandler
+ after []TransitionHandler
+}
+
+// TransitionMap to store all the states and current device state
+type TransitionMap struct {
+ transitions map[Trigger]Transition
+ currentDeviceState DeviceState
+}
+
+// OpenoltDevice state machine:
+//
+// null ----> init ------> connected -----> up -----> down
+// ^ ^ | ^ | |
+// | | | | | |
+// | +-------------+ +---------+ |
+// | |
+// +-----------------------------------------+
+
+// NewTransitionMap create a new state machine with all the transitions
+func NewTransitionMap(dh *DeviceHandler) *TransitionMap {
+ var transitionMap TransitionMap
+ transitionMap.currentDeviceState = deviceStateNull
+ transitionMap.transitions = make(map[Trigger]Transition)
+ // In doInit establish the grpc session
+ transitionMap.transitions[DeviceInit] =
+ Transition{
+ previousState: []DeviceState{deviceStateNull},
+ currentState: deviceStateInit,
+ before: []TransitionHandler{dh.doStateInit},
+ after: []TransitionHandler{dh.postInit}}
+ // If gRpc session fails, re-establish the grpc session
+ transitionMap.transitions[GrpcDisconnected] =
+ Transition{
+ previousState: []DeviceState{deviceStateConnected, deviceStateDown},
+ currentState: deviceStateInit,
+ before: []TransitionHandler{dh.doStateInit},
+ after: []TransitionHandler{dh.postInit}}
+ // in doConnected, create logical device and read the indications
+ transitionMap.transitions[GrpcConnected] =
+ Transition{
+ previousState: []DeviceState{deviceStateInit},
+ currentState: deviceStateConnected,
+ before: []TransitionHandler{dh.doStateConnected}}
+
+ // Once the olt UP is indication received, then do state up
+ transitionMap.transitions[DeviceUpInd] =
+ Transition{
+ previousState: []DeviceState{deviceStateConnected, deviceStateDown},
+ currentState: deviceStateUp,
+ before: []TransitionHandler{dh.doStateUp}}
+ // If olt DOWN indication comes then do sate down
+ transitionMap.transitions[DeviceDownInd] =
+ Transition{
+ previousState: []DeviceState{deviceStateUp},
+ currentState: deviceStateDown,
+ before: []TransitionHandler{dh.doStateDown}}
+
+ return &transitionMap
+}
+
+// funcName gets the handler function name
+func funcName(f interface{}) string {
+ p := reflect.ValueOf(f).Pointer()
+ rf := runtime.FuncForPC(p)
+ return rf.Name()
+}
+
+// isValidTransition checks for the new state transition is valid from current state
+func (tMap *TransitionMap) isValidTransition(trigger Trigger) bool {
+ // Validate the state transition
+ for _, state := range tMap.transitions[trigger].previousState {
+ if tMap.currentDeviceState == state {
+ return true
+ }
+ }
+ return false
+}
+
+// Handle moves the state machine to next state based on the trigger and invokes the before and
+// after handlers
+func (tMap *TransitionMap) Handle(trigger Trigger) {
+
+ // Check whether the transtion is valid from current state
+ if !tMap.isValidTransition(trigger) {
+ log.Errorw("Invalid transition triggered ", log.Fields{"CurrentState": tMap.currentDeviceState, "Trigger": trigger})
+ return
+ }
+
+ // Invoke the before handlers
+ beforeHandlers := tMap.transitions[trigger].before
+ if beforeHandlers != nil {
+ for _, handler := range beforeHandlers {
+ log.Debugw("running-before-handler", log.Fields{"handler": funcName(handler)})
+ if err := handler(); err != nil {
+ // TODO handle error
+ return
+ }
+ }
+ } else {
+ log.Debugw("No handlers for before", log.Fields{"trigger": trigger})
+ }
+
+ // Update the state
+ tMap.currentDeviceState = tMap.transitions[trigger].currentState
+ log.Debugw("Updated device state ", log.Fields{"CurrentDeviceState": tMap.currentDeviceState})
+
+ // Invoke the after handlers
+ afterHandlers := tMap.transitions[trigger].after
+ if afterHandlers != nil {
+ for _, handler := range afterHandlers {
+ log.Debugw("running-after-handler", log.Fields{"handler": funcName(handler)})
+ if err := handler(); err != nil {
+ // TODO handle error
+ return
+ }
+ }
+ } else {
+ log.Debugw("No handlers for after", log.Fields{"trigger": trigger})
+ }
+}
diff --git a/adaptercore/openolt.go b/adaptercore/openolt.go
new file mode 100644
index 0000000..18ef08f
--- /dev/null
+++ b/adaptercore/openolt.go
@@ -0,0 +1,245 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package adaptercore
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "sync"
+
+ com "github.com/opencord/voltha-go/adapters/common"
+ "github.com/opencord/voltha-go/common/log"
+ "github.com/opencord/voltha-go/kafka"
+ ic "github.com/opencord/voltha-go/protos/inter_container"
+ "github.com/opencord/voltha-go/protos/openflow_13"
+ "github.com/opencord/voltha-go/protos/voltha"
+)
+
+type OpenOLT struct {
+ deviceHandlers map[string]*DeviceHandler
+ coreProxy *com.CoreProxy
+ kafkaICProxy *kafka.InterContainerProxy
+ numOnus int
+ exitChannel chan int
+ lockDeviceHandlersMap sync.RWMutex
+}
+
+func NewOpenOLT(ctx context.Context, kafkaICProxy *kafka.InterContainerProxy, coreProxy *com.CoreProxy, onuNumber int) *OpenOLT {
+ var openOLT OpenOLT
+ openOLT.exitChannel = make(chan int, 1)
+ openOLT.deviceHandlers = make(map[string]*DeviceHandler)
+ openOLT.kafkaICProxy = kafkaICProxy
+ openOLT.numOnus = onuNumber
+ openOLT.coreProxy = coreProxy
+ openOLT.lockDeviceHandlersMap = sync.RWMutex{}
+ return &openOLT
+}
+
+func (oo *OpenOLT) Start(ctx context.Context) error {
+ log.Info("starting-device-manager")
+ log.Info("device-manager-started")
+ return nil
+}
+
+func (oo *OpenOLT) Stop(ctx context.Context) error {
+ log.Info("stopping-device-manager")
+ oo.exitChannel <- 1
+ log.Info("device-manager-stopped")
+ return nil
+}
+
+func sendResponse(ctx context.Context, ch chan interface{}, result interface{}) {
+ if ctx.Err() == nil {
+ // Returned response only of the ctx has not been cancelled/timeout/etc
+ // Channel is automatically closed when a context is Done
+ ch <- result
+ log.Debugw("sendResponse", log.Fields{"result": result})
+ } else {
+ // Should the transaction be reverted back?
+ log.Debugw("sendResponse-context-error", log.Fields{"context-error": ctx.Err()})
+ }
+}
+
+func (oo *OpenOLT) addDeviceHandlerToMap(agent *DeviceHandler) {
+ oo.lockDeviceHandlersMap.Lock()
+ defer oo.lockDeviceHandlersMap.Unlock()
+ if _, exist := oo.deviceHandlers[agent.deviceId]; !exist {
+ oo.deviceHandlers[agent.deviceId] = agent
+ }
+}
+
+func (oo *OpenOLT) deleteDeviceHandlerToMap(agent *DeviceHandler) {
+ oo.lockDeviceHandlersMap.Lock()
+ defer oo.lockDeviceHandlersMap.Unlock()
+ delete(oo.deviceHandlers, agent.deviceId)
+}
+
+func (oo *OpenOLT) getDeviceHandler(deviceId string) *DeviceHandler {
+ oo.lockDeviceHandlersMap.Lock()
+ defer oo.lockDeviceHandlersMap.Unlock()
+ if agent, ok := oo.deviceHandlers[deviceId]; ok {
+ return agent
+ }
+ return nil
+}
+
+func (oo *OpenOLT) createDeviceTopic(device *voltha.Device) error {
+ log.Infow("create-device-topic", log.Fields{"deviceId": device.Id})
+ deviceTopic := kafka.Topic{Name: oo.kafkaICProxy.DefaultTopic.Name + "_" + device.Id}
+ // TODO for the offset
+ if err := oo.kafkaICProxy.SubscribeWithDefaultRequestHandler(deviceTopic, 0); err != nil {
+ log.Infow("create-device-topic-failed", log.Fields{"deviceId": device.Id, "error": err})
+ return err
+ }
+ return nil
+}
+
+func (oo *OpenOLT) Adopt_device(device *voltha.Device) error {
+ if device == nil {
+ log.Warn("device-is-nil")
+ return errors.New("nil-device")
+ }
+ log.Infow("adopt-device", log.Fields{"deviceId": device.Id})
+ var handler *DeviceHandler
+ if handler = oo.getDeviceHandler(device.Id); handler == nil {
+ handler := NewDeviceHandler(oo.coreProxy, device, oo)
+ oo.addDeviceHandlerToMap(handler)
+ go handler.AdoptDevice(device)
+ // Launch the creation of the device topic
+ go oo.createDeviceTopic(device)
+ }
+ return nil
+}
+
+func (oo *OpenOLT) Get_ofp_device_info(device *voltha.Device) (*ic.SwitchCapability, error) {
+ log.Infow("Get_ofp_device_info", log.Fields{"deviceId": device.Id})
+ if handler := oo.getDeviceHandler(device.Id); handler != nil {
+ return handler.GetOfpDeviceInfo(device)
+ }
+ log.Errorw("device-handler-not-set", log.Fields{"deviceId": device.Id})
+ return nil, errors.New("device-handler-not-set")
+}
+
+func (oo *OpenOLT) Get_ofp_port_info(device *voltha.Device, port_no int64) (*ic.PortCapability, error) {
+ log.Infow("Get_ofp_port_info", log.Fields{"deviceId": device.Id})
+ if handler := oo.getDeviceHandler(device.Id); handler != nil {
+ return handler.GetOfpPortInfo(device, port_no)
+ }
+ log.Errorw("device-handler-not-set", log.Fields{"deviceId": device.Id})
+ return nil, errors.New("device-handler-not-set")
+}
+
+func (oo *OpenOLT) Process_inter_adapter_message(msg *ic.InterAdapterMessage) error {
+ log.Infow("Process_inter_adapter_message", log.Fields{"msgId": msg.Header.Id})
+ targetDevice := msg.Header.ProxyDeviceId // Request?
+ if targetDevice == "" && msg.Header.ToDeviceId != "" {
+ // Typical response
+ targetDevice = msg.Header.ToDeviceId
+ }
+ if handler := oo.getDeviceHandler(targetDevice); handler != nil {
+ return handler.Process_inter_adapter_message(msg)
+ }
+ return errors.New(fmt.Sprintf("handler-not-found-%s", targetDevice))
+}
+
+func (oo *OpenOLT) Adapter_descriptor() error {
+ return errors.New("UnImplemented")
+}
+
+func (oo *OpenOLT) Device_types() (*voltha.DeviceTypes, error) {
+ return nil, errors.New("UnImplemented")
+}
+
+func (oo *OpenOLT) Health() (*voltha.HealthStatus, error) {
+ return nil, errors.New("UnImplemented")
+}
+
+func (oo *OpenOLT) Reconcile_device(device *voltha.Device) error {
+ return errors.New("UnImplemented")
+}
+
+func (oo *OpenOLT) Abandon_device(device *voltha.Device) error {
+ return errors.New("UnImplemented")
+}
+
+func (oo *OpenOLT) Disable_device(device *voltha.Device) error {
+ return errors.New("UnImplemented")
+}
+
+func (oo *OpenOLT) Reenable_device(device *voltha.Device) error {
+ return errors.New("UnImplemented")
+}
+
+func (oo *OpenOLT) Reboot_device(device *voltha.Device) error {
+ return errors.New("UnImplemented")
+}
+
+func (oo *OpenOLT) Self_test_device(device *voltha.Device) error {
+ return errors.New("UnImplemented")
+}
+
+func (oo *OpenOLT) Gelete_device(device *voltha.Device) error {
+ return errors.New("UnImplemented")
+}
+
+func (oo *OpenOLT) Get_device_details(device *voltha.Device) error {
+ return errors.New("UnImplemented")
+}
+
+func (oo *OpenOLT) Update_flows_bulk(device *voltha.Device, flows *voltha.Flows, groups *voltha.FlowGroups) error {
+ return errors.New("UnImplemented")
+}
+
+func (oo *OpenOLT) Update_flows_incrementally(device *voltha.Device, flows *openflow_13.FlowChanges, groups *openflow_13.FlowGroupChanges) error {
+ return errors.New("UnImplemented")
+}
+
+func (oo *OpenOLT) Update_pm_config(device *voltha.Device, pm_configs *voltha.PmConfigs) error {
+ return errors.New("UnImplemented")
+}
+
+func (oo *OpenOLT) Receive_packet_out(device *voltha.Device, egress_port_no int, msg openflow_13.PacketOut) error {
+ return errors.New("UnImplemented")
+}
+
+func (oo *OpenOLT) Suppress_alarm(filter *voltha.AlarmFilter) error {
+ return errors.New("UnImplemented")
+}
+
+func (oo *OpenOLT) Unsuppress_alarm(filter *voltha.AlarmFilter) error {
+ return errors.New("UnImplemented")
+}
+
+func (oo *OpenOLT) Download_image(device *voltha.Device, request *voltha.ImageDownload) (*voltha.ImageDownload, error) {
+ return nil, errors.New("UnImplemented")
+}
+
+func (oo *OpenOLT) Get_image_download_status(device *voltha.Device, request *voltha.ImageDownload) (*voltha.ImageDownload, error) {
+ return nil, errors.New("UnImplemented")
+}
+
+func (oo *OpenOLT) Cancel_image_download(device *voltha.Device, request *voltha.ImageDownload) (*voltha.ImageDownload, error) {
+ return nil, errors.New("UnImplemented")
+}
+
+func (oo *OpenOLT) Activate_image_update(device *voltha.Device, request *voltha.ImageDownload) (*voltha.ImageDownload, error) {
+ return nil, errors.New("UnImplemented")
+}
+
+func (oo *OpenOLT) Revert_image_update(device *voltha.Device, request *voltha.ImageDownload) (*voltha.ImageDownload, error) {
+ return nil, errors.New("UnImplemented")
+}