VOL-1377-802.1x EAPOL flow addition updated code changes after some cleanup
Change-Id: Ie15675d9e2b3cc7c594edf0626702db264bb584a
diff --git a/adaptercore/device_handler.go b/adaptercore/device_handler.go
index e8d618c..b626242 100644
--- a/adaptercore/device_handler.go
+++ b/adaptercore/device_handler.go
@@ -29,11 +29,12 @@
"github.com/golang/protobuf/ptypes"
com "github.com/opencord/voltha-go/adapters/common"
"github.com/opencord/voltha-go/common/log"
- "github.com/opencord/voltha-go/protos/common"
- ic "github.com/opencord/voltha-go/protos/inter_container"
- of "github.com/opencord/voltha-go/protos/openflow_13"
- oop "github.com/opencord/voltha-go/protos/openolt"
- "github.com/opencord/voltha-go/protos/voltha"
+ "github.com/opencord/voltha-protos/go/common"
+ ic "github.com/opencord/voltha-protos/go/inter_container"
+ of "github.com/opencord/voltha-protos/go/openflow_13"
+ oop "github.com/opencord/voltha-protos/go/openolt"
+ rsrcMgr "github.com/opencord/voltha-go/adapters/openolt/adaptercore/resourcemanager"
+ "github.com/opencord/voltha-protos/go/voltha"
"google.golang.org/grpc"
)
@@ -43,22 +44,24 @@
deviceType string
device *voltha.Device
coreProxy *com.CoreProxy
- adapterProxy *com.AdapterProxy
+ AdapterProxy *com.AdapterProxy
openOLT *OpenOLT
nniPort *voltha.Port
ponPort *voltha.Port
exitChannel chan int
lockDevice sync.RWMutex
- client oop.OpenoltClient
+ Client oop.OpenoltClient
transitionMap *TransitionMap
clientCon *grpc.ClientConn
+ flowMgr *OpenOltFlowMgr
+ resourceMgr *rsrcMgr.OpenOltResourceMgr
}
//NewDeviceHandler creates a new device handler
func NewDeviceHandler(cp *com.CoreProxy, ap *com.AdapterProxy, device *voltha.Device, adapter *OpenOLT) *DeviceHandler {
var dh DeviceHandler
dh.coreProxy = cp
- dh.adapterProxy = ap
+ dh.AdapterProxy = ap
cloned := (proto.Clone(device)).(*voltha.Device)
dh.deviceId = cloned.Id
dh.deviceType = cloned.Type
@@ -103,16 +106,15 @@
return result
}
-func portName(portNum uint32, portType voltha.Port_PortType, intfId uint32) string {
+func GetportLabel(portNum uint32, portType voltha.Port_PortType) string {
- if portType == voltha.Port_PON_OLT {
- //return "pon-" + string(portNum)
- return "pon-" + strconv.FormatInt(int64(intfId), 10)
- } else if portType == voltha.Port_ETHERNET_NNI {
- //return "nni-" + string(intfId)
- return "nni-" + strconv.FormatInt(int64(intfId), 10)
+ if portType == voltha.Port_ETHERNET_NNI {
+ return fmt.Sprintf("nni-%d",portNum)
+ } else if portType == voltha.Port_PON_OLT{
+ return fmt.Sprintf("pon-%d",portNum)
} else if portType == voltha.Port_ETHERNET_UNI {
log.Errorw("local UNI management not supported", log.Fields{})
+ return ""
}
return ""
}
@@ -124,30 +126,30 @@
} else {
operStatus = voltha.OperStatus_DISCOVERED
}
-
- // TODO
- //portNum := platform.intfIdToPortNo(intfId, portType)
- //portNum := intfIdToPortNo(intfId, portType)
+ // portNum := IntfIdToPortNo(intfId,portType)
portNum := intfId
-
- label := portName(portNum, portType, intfId)
- // Now create the PON Port
- ponPort := &voltha.Port{
+ label := GetportLabel(portNum, portType)
+ if len(label) == 0 {
+ log.Errorw("Invalid-port-label",log.Fields{"portNum":portNum,"portType":portType})
+ return
+ }
+ // Now create Port
+ port := &voltha.Port{
PortNo: portNum,
Label: label,
Type: portType,
OperStatus: operStatus,
}
-
+ log.Debugw("Sending port update to core",log.Fields{"port":port})
// Synchronous call to update device - this method is run in its own go routine
- if err := dh.coreProxy.PortCreated(nil, dh.device.Id, ponPort); err != nil {
+ if err := dh.coreProxy.PortCreated(nil, dh.device.Id, port); err != nil {
log.Errorw("error-creating-nni-port", log.Fields{"deviceId": dh.device.Id, "error": err})
}
}
// readIndications to read the indications from the OLT device
func (dh *DeviceHandler) readIndications() {
- indications, err := dh.client.EnableIndication(context.Background(), new(oop.Empty))
+ indications, err := dh.Client.EnableIndication(context.Background(), new(oop.Empty))
if err != nil {
log.Errorw("Failed to read indications", log.Fields{"err": err})
return
@@ -190,10 +192,18 @@
case *oop.Indication_OnuDiscInd:
onuDiscInd := indication.GetOnuDiscInd()
log.Infow("Received Onu discovery indication ", log.Fields{"OnuDiscInd": onuDiscInd})
- // TODO Get onu ID from the resource manager
+ //onuId,err := dh.resourceMgr.GetONUID(onuDiscInd.GetIntfId())
+ //onuId,err := dh.resourceMgr.GetONUID(onuDiscInd.GetIntfId())
+ // TODO Get onu ID from the resource manager
var onuId uint32 = 1
+ /*if err != nil{
+ log.Errorw("onu-id-unavailable",log.Fields{"intfId":onuDiscInd.GetIntfId()})
+ return
+ }*/
+
sn := dh.stringifySerialNumber(onuDiscInd.SerialNumber)
- go dh.onuDiscIndication(onuDiscInd, onuId, sn)
+ //FIXME: Duplicate child devices being create in go routine
+ dh.onuDiscIndication(onuDiscInd, onuId, sn)
case *oop.Indication_OnuInd:
onuInd := indication.GetOnuInd()
log.Infow("Received Onu indication ", log.Fields{"OnuInd": onuInd})
@@ -222,13 +232,13 @@
// doStateUp handle the olt up indication and update to voltha core
func (dh *DeviceHandler) doStateUp() error {
- // Synchronous call to update device state - this method is run in its own go routine
+ // Synchronous call to update device state - this method is run in its own go routine
if err := dh.coreProxy.DeviceStateUpdate(context.Background(), dh.device.Id, voltha.ConnectStatus_REACHABLE,
- voltha.OperStatus_ACTIVE); err != nil {
- log.Errorw("Failed to update device with OLT UP indication", log.Fields{"deviceId": dh.device.Id, "error": err})
- return err
- }
- return nil
+ voltha.OperStatus_ACTIVE); err != nil {
+ log.Errorw("Failed to update device with OLT UP indication", log.Fields{"deviceId": dh.device.Id, "error": err})
+ return err
+ }
+ return nil
}
// doStateDown handle the olt down indication
@@ -239,25 +249,26 @@
// doStateInit dial the grpc before going to init state
func (dh *DeviceHandler) doStateInit() error {
- var err error
- dh.clientCon, err = grpc.Dial(dh.device.GetHostAndPort(), grpc.WithInsecure())
- if err != nil {
+ var err error
+ dh.clientCon, err = grpc.Dial(dh.device.GetHostAndPort(), grpc.WithInsecure())
+ if err != nil {
log.Errorw("Failed to dial device", log.Fields{"DeviceId": dh.deviceId, "HostAndPort": dh.device.GetHostAndPort(), "err": err})
- return err
- }
- return nil
+ return err
+ }
+ return nil
}
// postInit create olt client instance to invoke RPC on the olt device
func (dh *DeviceHandler) postInit() error {
- dh.client = oop.NewOpenoltClient(dh.clientCon)
- dh.transitionMap.Handle(GrpcConnected)
- return nil
+ dh.Client = oop.NewOpenoltClient(dh.clientCon)
+ dh.transitionMap.Handle(GrpcConnected)
+ return nil
}
// doStateConnected get the device info and update to voltha core
func (dh *DeviceHandler) doStateConnected() error {
- deviceInfo, err := dh.client.GetDeviceInfo(context.Background(), new(oop.Empty))
+ log.Debug("OLT device has been connected")
+ deviceInfo, err := dh.Client.GetDeviceInfo(context.Background(), new(oop.Empty))
if err != nil {
log.Errorw("Failed to fetch device info", log.Fields{"err": err})
return err
@@ -281,6 +292,18 @@
if err := dh.coreProxy.DeviceUpdate(nil, dh.device); err != nil {
log.Errorw("error-updating-device", log.Fields{"deviceId": dh.device.Id, "error": err})
}
+ KVStoreHostPort := fmt.Sprintf("%s:%d",dh.openOLT.KVStoreHost,dh.openOLT.KVStorePort)
+ // Instantiate resource manager
+ if dh.resourceMgr = rsrcMgr.NewResourceMgr(dh.deviceId, KVStoreHostPort, dh.deviceType, deviceInfo); dh.resourceMgr == nil{
+ log.Error("Error while instantiating resource manager")
+ return errors.New("Instantiating resource manager failed")
+ }
+ // Instantiate flow manager
+ if dh.flowMgr = NewFlowManager(dh, dh.resourceMgr); dh.flowMgr == nil{
+ log.Error("Error while instantiating flow manager")
+ return errors.New("Instantiating flow manager failed")
+ }
+ /* TODO: Instantiate Alarm , stats , BW managers */
// Start reading indications
go dh.readIndications()
@@ -289,9 +312,9 @@
// AdoptDevice adopts the OLT device
func (dh *DeviceHandler) AdoptDevice(device *voltha.Device) {
- dh.transitionMap = NewTransitionMap(dh)
+ dh.transitionMap = NewTransitionMap(dh)
log.Infow("AdoptDevice", log.Fields{"deviceId": device.Id, "Address": device.GetHostAndPort()})
- dh.transitionMap.Handle(DeviceInit)
+ dh.transitionMap.Handle(DeviceInit)
}
// GetOfpDeviceInfo Get the Ofp device information
@@ -337,6 +360,7 @@
func (dh *DeviceHandler) omciIndication(omciInd *oop.OmciIndication) error {
log.Debugw("omci indication", log.Fields{"intfId": omciInd.IntfId, "onuId": omciInd.OnuId})
+// ponPort := IntfIdToPortNo(omciInd.GetIntfId(),voltha.Port_PON_OLT)
kwargs := make(map[string]interface{})
kwargs["onu_id"] = omciInd.OnuId
kwargs["parent_port_no"] = omciInd.GetIntfId()
@@ -346,7 +370,7 @@
return err
} else {
omciMsg := &ic.InterAdapterOmciMessage{Message: omciInd.Pkt}
- if sendErr := dh.adapterProxy.SendInterAdapterMessage(context.Background(), omciMsg,
+ if sendErr := dh.AdapterProxy.SendInterAdapterMessage(context.Background(), omciMsg,
ic.InterAdapterMessageType_OMCI_REQUEST, dh.deviceType, onuDevice.Type,
onuDevice.Id, onuDevice.ProxyAddress.DeviceId, ""); sendErr != nil {
log.Errorw("send omci request error", log.Fields{"fromAdapter": dh.deviceType, "toAdapter": onuDevice.Type, "onuId": onuDevice.Id, "proxyDeviceId": onuDevice.ProxyAddress.DeviceId})
@@ -398,7 +422,7 @@
omciMessage := &oop.OmciMsg{IntfId: onuDevice.ProxyAddress.GetChannelId(), OnuId: onuDevice.ProxyAddress.GetOnuId(), Pkt: omciMsg.Message}
- dh.client.OmciMsgOut(context.Background(), omciMessage)
+ dh.Client.OmciMsgOut(context.Background(), omciMessage)
log.Debugw("omci-message-sent", log.Fields{"serialNumber": onuDevice.SerialNumber, "intfId": onuDevice.ProxyAddress.GetChannelId(), "omciMsg": string(omciMsg.Message)})
}
@@ -407,7 +431,7 @@
// TODO: need resource manager
var pir uint32 = 1000000
Onu := oop.Onu{IntfId: intfId, OnuId: uint32(onuId), SerialNumber: serialNum, Pir: pir}
- if _, err := dh.client.ActivateOnu(context.Background(), &Onu); err != nil {
+ if _, err := dh.Client.ActivateOnu(context.Background(), &Onu); err != nil {
log.Errorw("activate-onu-failed", log.Fields{"Onu": Onu})
} else {
log.Infow("activated-onu", log.Fields{"SerialNumber": serialNumber})
@@ -415,7 +439,11 @@
}
func (dh *DeviceHandler) onuDiscIndication(onuDiscInd *oop.OnuDiscIndication, onuId uint32, sn string) error {
- if err := dh.coreProxy.ChildDeviceDetected(nil, dh.device.Id, int(onuDiscInd.GetIntfId()), "brcm_openomci_onu", int(onuDiscInd.GetIntfId()), string(onuDiscInd.SerialNumber.GetVendorId()), sn, int64(onuId)); err != nil {
+ //channelId := MkUniPortNum(onuDiscInd.GetIntfId(), onuId, uint32(0))
+ //parentPortNo := IntfIdToPortNo(onuDiscInd.GetIntfId(),voltha.Port_PON_OLT)
+ channelId := onuDiscInd.GetIntfId()
+ parentPortNo := onuDiscInd.GetIntfId()
+ if err := dh.coreProxy.ChildDeviceDetected(nil, dh.device.Id, int(parentPortNo), "brcm_openomci_onu", int(channelId), string(onuDiscInd.SerialNumber.GetVendorId()), sn, int64(onuId)); err != nil {
log.Errorw("Create onu error", log.Fields{"parent_id": dh.device.Id, "ponPort": onuDiscInd.GetIntfId(), "onuId": onuId, "sn": sn, "error": err})
return err
}
@@ -441,6 +469,8 @@
serialNumber := dh.stringifySerialNumber(onuInd.SerialNumber)
kwargs := make(map[string]interface{})
+// ponPort := IntfIdToPortNo(onuInd.GetIntfId(),voltha.Port_PON_OLT)
+
if serialNumber != "" {
kwargs["serial_number"] = serialNumber
} else {
@@ -485,7 +515,7 @@
log.Debugw("inter-adapter-send-onu-ind", log.Fields{"onuIndication": onuInd})
// TODO NEW CORE do not hardcode adapter name. Handler needs Adapter reference
- dh.adapterProxy.SendInterAdapterMessage(nil, onuInd, ic.InterAdapterMessageType_ONU_IND_REQUEST, "openolt", onuDevice.Type, onuDevice.Id, onuDevice.ProxyAddress.DeviceId, "")
+ dh.AdapterProxy.SendInterAdapterMessage(nil, onuInd, ic.InterAdapterMessageType_ONU_IND_REQUEST, "openolt", onuDevice.Type, onuDevice.Id, onuDevice.ProxyAddress.DeviceId, "")
} else if onuInd.OperState == "up" {
if onuDevice.ConnectStatus != common.ConnectStatus_REACHABLE {
dh.coreProxy.DeviceStateUpdate(nil, onuDevice.Id, common.ConnectStatus_REACHABLE, onuDevice.OperStatus)
@@ -495,7 +525,7 @@
log.Warnw("ignore onu indication", log.Fields{"intfId": onuInd.IntfId, "onuId": onuInd.OnuId, "operStatus": onuDevice.OperStatus, "msgOperStatus": onuInd.OperState})
return
}
- dh.adapterProxy.SendInterAdapterMessage(nil, onuInd, ic.InterAdapterMessageType_ONU_IND_REQUEST, "openolt", onuDevice.Type, onuDevice.Id, onuDevice.ProxyAddress.DeviceId, "")
+ dh.AdapterProxy.SendInterAdapterMessage(nil, onuInd, ic.InterAdapterMessageType_ONU_IND_REQUEST, "openolt", onuDevice.Type, onuDevice.Id, onuDevice.ProxyAddress.DeviceId, "")
} else {
log.Warnw("Not-implemented-or-invalid-value-of-oper-state", log.Fields{"operState": onuInd.OperState})
}
@@ -530,3 +560,32 @@
func (dh *DeviceHandler) Update_flows_bulk() error {
return errors.New("UnImplemented")
}
+func (dh *DeviceHandler) GetChildDevice(parentPort uint32, onuId uint32)*voltha.Device{
+ log.Debugw("GetChildDevice",log.Fields{"pon port": parentPort,"onuId": onuId})
+ kwargs := make(map[string]interface{})
+ kwargs["onu_id"] = onuId
+ kwargs["parent_port_no"] = parentPort
+ onuDevice, err := dh.coreProxy.GetChildDevice(nil, dh.device.Id, kwargs)
+ if err != nil {
+ log.Errorw("onu not found", log.Fields{"intfId": parentPort, "onuId": onuId})
+ return nil
+ }
+ log.Debugw("Successfully received child device from core",log.Fields{"child_device":*onuDevice})
+ return onuDevice
+}
+
+func (dh *DeviceHandler) UpdateFlowsIncrementally(device *voltha.Device, flows *of.FlowChanges, groups *of.FlowGroupChanges) error {
+ log.Debugw("In UpdateFlowsIncrementally",log.Fields{"deviceId":device.Id,"flows":flows,"groups":groups})
+ if flows != nil{
+ for _,flow := range flows.ToAdd.Items{
+ dh.flowMgr.AddFlow(flow)
+ }
+ }
+ if groups != nil{
+ for _,flow := range flows.ToRemove.Items{
+ log.Debug("Removing flow",log.Fields{"deviceId":device.Id,"flowToRemove":flow})
+ // dh.flowMgr.RemoveFlow(flow)
+ }
+ }
+ return nil
+}