VOL-1377: 802.1x EAPOL flow addition - updated code changes after some cleanup

Change-Id: Ie15675d9e2b3cc7c594edf0626702db264bb584a
diff --git a/adaptercore/device_handler.go b/adaptercore/device_handler.go
index e8d618c..b626242 100644
--- a/adaptercore/device_handler.go
+++ b/adaptercore/device_handler.go
@@ -29,11 +29,12 @@
 	"github.com/golang/protobuf/ptypes"
 	com "github.com/opencord/voltha-go/adapters/common"
 	"github.com/opencord/voltha-go/common/log"
-	"github.com/opencord/voltha-go/protos/common"
-	ic "github.com/opencord/voltha-go/protos/inter_container"
-	of "github.com/opencord/voltha-go/protos/openflow_13"
-	oop "github.com/opencord/voltha-go/protos/openolt"
-	"github.com/opencord/voltha-go/protos/voltha"
+	"github.com/opencord/voltha-protos/go/common"
+	ic "github.com/opencord/voltha-protos/go/inter_container"
+	of "github.com/opencord/voltha-protos/go/openflow_13"
+	oop "github.com/opencord/voltha-protos/go/openolt"
+        rsrcMgr "github.com/opencord/voltha-go/adapters/openolt/adaptercore/resourcemanager"
+	"github.com/opencord/voltha-protos/go/voltha"
 	"google.golang.org/grpc"
 )
 
@@ -43,22 +44,24 @@
 	deviceType    string
 	device        *voltha.Device
 	coreProxy     *com.CoreProxy
-	adapterProxy  *com.AdapterProxy
+	AdapterProxy  *com.AdapterProxy
 	openOLT       *OpenOLT
 	nniPort       *voltha.Port
 	ponPort       *voltha.Port
 	exitChannel   chan int
 	lockDevice    sync.RWMutex
-	client        oop.OpenoltClient
+	Client        oop.OpenoltClient
 	transitionMap *TransitionMap
 	clientCon     *grpc.ClientConn
+	flowMgr       *OpenOltFlowMgr
+	resourceMgr   *rsrcMgr.OpenOltResourceMgr
 }
 
 //NewDeviceHandler creates a new device handler
 func NewDeviceHandler(cp *com.CoreProxy, ap *com.AdapterProxy, device *voltha.Device, adapter *OpenOLT) *DeviceHandler {
 	var dh DeviceHandler
 	dh.coreProxy = cp
-	dh.adapterProxy = ap
+        dh.AdapterProxy = ap
 	cloned := (proto.Clone(device)).(*voltha.Device)
 	dh.deviceId = cloned.Id
 	dh.deviceType = cloned.Type
@@ -103,16 +106,15 @@
 	return result
 }
 
-func portName(portNum uint32, portType voltha.Port_PortType, intfId uint32) string {
+func GetportLabel(portNum uint32, portType voltha.Port_PortType) string {
 
-	if portType == voltha.Port_PON_OLT {
-		//return "pon-" + string(portNum)
-		return "pon-" + strconv.FormatInt(int64(intfId), 10)
-	} else if portType == voltha.Port_ETHERNET_NNI {
-		//return "nni-" + string(intfId)
-		return "nni-" + strconv.FormatInt(int64(intfId), 10)
+    if portType == voltha.Port_ETHERNET_NNI {
+        return fmt.Sprintf("nni-%d",portNum)
+    } else if portType == voltha.Port_PON_OLT{
+        return fmt.Sprintf("pon-%d",portNum)
 	} else if portType == voltha.Port_ETHERNET_UNI {
 		log.Errorw("local UNI management not supported", log.Fields{})
+        return ""
 	}
 	return ""
 }
@@ -124,30 +126,30 @@
 	} else {
 		operStatus = voltha.OperStatus_DISCOVERED
 	}
-
-	// TODO
-	//portNum := platform.intfIdToPortNo(intfId, portType)
-	//portNum := intfIdToPortNo(intfId, portType)
+    // portNum := IntfIdToPortNo(intfId,portType)
 	portNum := intfId
-
-	label := portName(portNum, portType, intfId)
-	//    Now create the PON Port
-	ponPort := &voltha.Port{
+        label := GetportLabel(portNum, portType)
+        if len(label) == 0 {
+        log.Errorw("Invalid-port-label",log.Fields{"portNum":portNum,"portType":portType})
+        return
+    }
+    //    Now create  Port
+    port := &voltha.Port{
 		PortNo:     portNum,
 		Label:      label,
 		Type:       portType,
 		OperStatus: operStatus,
 	}
-
+    log.Debugw("Sending port update to core",log.Fields{"port":port})
 	// Synchronous call to update device - this method is run in its own go routine
-	if err := dh.coreProxy.PortCreated(nil, dh.device.Id, ponPort); err != nil {
+    if err := dh.coreProxy.PortCreated(nil, dh.device.Id, port); err != nil {
 		log.Errorw("error-creating-nni-port", log.Fields{"deviceId": dh.device.Id, "error": err})
 	}
 }
 
 // readIndications to read the indications from the OLT device
 func (dh *DeviceHandler) readIndications() {
-	indications, err := dh.client.EnableIndication(context.Background(), new(oop.Empty))
+    indications, err := dh.Client.EnableIndication(context.Background(), new(oop.Empty))
 	if err != nil {
 		log.Errorw("Failed to read indications", log.Fields{"err": err})
 		return
@@ -190,10 +192,18 @@
 		case *oop.Indication_OnuDiscInd:
 			onuDiscInd := indication.GetOnuDiscInd()
 			log.Infow("Received Onu discovery indication ", log.Fields{"OnuDiscInd": onuDiscInd})
-			// TODO Get onu ID from the resource manager
+            //onuId,err := dh.resourceMgr.GetONUID(onuDiscInd.GetIntfId())
+            //onuId,err := dh.resourceMgr.GetONUID(onuDiscInd.GetIntfId())
+            // TODO Get onu ID from the resource manager
 			var onuId uint32 = 1
+            /*if err != nil{
+                log.Errorw("onu-id-unavailable",log.Fields{"intfId":onuDiscInd.GetIntfId()})
+                return
+            }*/
+
 			sn := dh.stringifySerialNumber(onuDiscInd.SerialNumber)
-			go dh.onuDiscIndication(onuDiscInd, onuId, sn)
+            //FIXME: Duplicate child devices being create in go routine 
+                        dh.onuDiscIndication(onuDiscInd, onuId, sn)
 		case *oop.Indication_OnuInd:
 			onuInd := indication.GetOnuInd()
 			log.Infow("Received Onu indication ", log.Fields{"OnuInd": onuInd})
@@ -222,13 +232,13 @@
 
 // doStateUp handle the olt up indication and update to voltha core
 func (dh *DeviceHandler) doStateUp() error {
-	// Synchronous call to update device state - this method is run in its own go routine
+    // Synchronous call to update device state - this method is run in its own go routine
 	if err := dh.coreProxy.DeviceStateUpdate(context.Background(), dh.device.Id, voltha.ConnectStatus_REACHABLE,
-		voltha.OperStatus_ACTIVE); err != nil {
-		log.Errorw("Failed to update device with OLT UP indication", log.Fields{"deviceId": dh.device.Id, "error": err})
-		return err
-	}
-	return nil
+        voltha.OperStatus_ACTIVE); err != nil {
+            log.Errorw("Failed to update device with OLT UP indication", log.Fields{"deviceId": dh.device.Id, "error": err})
+            return err
+    }
+    return nil
 }
 
 // doStateDown handle the olt down indication
@@ -239,25 +249,26 @@
 
 // doStateInit dial the grpc before going to init state
 func (dh *DeviceHandler) doStateInit() error {
-	var err error
-	dh.clientCon, err = grpc.Dial(dh.device.GetHostAndPort(), grpc.WithInsecure())
-	if err != nil {
+    var err error
+    dh.clientCon, err = grpc.Dial(dh.device.GetHostAndPort(), grpc.WithInsecure())
+    if err != nil {
 		log.Errorw("Failed to dial device", log.Fields{"DeviceId": dh.deviceId, "HostAndPort": dh.device.GetHostAndPort(), "err": err})
-		return err
-	}
-	return nil
+        return err
+    }
+    return nil
 }
 
 // postInit create olt client instance to invoke RPC on the olt device
 func (dh *DeviceHandler) postInit() error {
-	dh.client = oop.NewOpenoltClient(dh.clientCon)
-	dh.transitionMap.Handle(GrpcConnected)
-	return nil
+    dh.Client = oop.NewOpenoltClient(dh.clientCon)
+    dh.transitionMap.Handle(GrpcConnected)
+    return nil
 }
 
 // doStateConnected get the device info and update to voltha core
 func (dh *DeviceHandler) doStateConnected() error {
-	deviceInfo, err := dh.client.GetDeviceInfo(context.Background(), new(oop.Empty))
+    log.Debug("OLT device has been connected")
+    deviceInfo, err := dh.Client.GetDeviceInfo(context.Background(), new(oop.Empty))
 	if err != nil {
 		log.Errorw("Failed to fetch device info", log.Fields{"err": err})
 		return err
@@ -281,6 +292,18 @@
 	if err := dh.coreProxy.DeviceUpdate(nil, dh.device); err != nil {
 		log.Errorw("error-updating-device", log.Fields{"deviceId": dh.device.Id, "error": err})
 	}
+    KVStoreHostPort := fmt.Sprintf("%s:%d",dh.openOLT.KVStoreHost,dh.openOLT.KVStorePort)
+    // Instantiate resource manager
+    if dh.resourceMgr = rsrcMgr.NewResourceMgr(dh.deviceId, KVStoreHostPort,  dh.deviceType, deviceInfo); dh.resourceMgr == nil{
+        log.Error("Error while instantiating resource manager")
+        return errors.New("Instantiating resource manager failed")
+    }
+    // Instantiate flow manager
+    if dh.flowMgr = NewFlowManager(dh, dh.resourceMgr); dh.flowMgr == nil{
+        log.Error("Error while instantiating flow manager")
+        return errors.New("Instantiating flow manager failed")
+    }
+    /* TODO: Instantiate Alarm , stats , BW managers */
 
 	// Start reading indications
 	go dh.readIndications()
@@ -289,9 +312,9 @@
 
 // AdoptDevice adopts the OLT device
 func (dh *DeviceHandler) AdoptDevice(device *voltha.Device) {
-	dh.transitionMap = NewTransitionMap(dh)
+    dh.transitionMap = NewTransitionMap(dh)
 	log.Infow("AdoptDevice", log.Fields{"deviceId": device.Id, "Address": device.GetHostAndPort()})
-	dh.transitionMap.Handle(DeviceInit)
+    dh.transitionMap.Handle(DeviceInit)
 }
 
 // GetOfpDeviceInfo Get the Ofp device information
@@ -337,6 +360,7 @@
 func (dh *DeviceHandler) omciIndication(omciInd *oop.OmciIndication) error {
 	log.Debugw("omci indication", log.Fields{"intfId": omciInd.IntfId, "onuId": omciInd.OnuId})
 
+//        ponPort := IntfIdToPortNo(omciInd.GetIntfId(),voltha.Port_PON_OLT)
 	kwargs := make(map[string]interface{})
 	kwargs["onu_id"] = omciInd.OnuId
 	kwargs["parent_port_no"] = omciInd.GetIntfId()
@@ -346,7 +370,7 @@
 		return err
 	} else {
 		omciMsg := &ic.InterAdapterOmciMessage{Message: omciInd.Pkt}
-		if sendErr := dh.adapterProxy.SendInterAdapterMessage(context.Background(), omciMsg,
+		if sendErr := dh.AdapterProxy.SendInterAdapterMessage(context.Background(), omciMsg,
 			ic.InterAdapterMessageType_OMCI_REQUEST, dh.deviceType, onuDevice.Type,
 			onuDevice.Id, onuDevice.ProxyAddress.DeviceId, ""); sendErr != nil {
 			log.Errorw("send omci request error", log.Fields{"fromAdapter": dh.deviceType, "toAdapter": onuDevice.Type, "onuId": onuDevice.Id, "proxyDeviceId": onuDevice.ProxyAddress.DeviceId})
@@ -398,7 +422,7 @@
 
 	omciMessage := &oop.OmciMsg{IntfId: onuDevice.ProxyAddress.GetChannelId(), OnuId: onuDevice.ProxyAddress.GetOnuId(), Pkt: omciMsg.Message}
 
-	dh.client.OmciMsgOut(context.Background(), omciMessage)
+	dh.Client.OmciMsgOut(context.Background(), omciMessage)
 	log.Debugw("omci-message-sent", log.Fields{"serialNumber": onuDevice.SerialNumber, "intfId": onuDevice.ProxyAddress.GetChannelId(), "omciMsg": string(omciMsg.Message)})
 }
 
@@ -407,7 +431,7 @@
 	// TODO: need resource manager
 	var pir uint32 = 1000000
 	Onu := oop.Onu{IntfId: intfId, OnuId: uint32(onuId), SerialNumber: serialNum, Pir: pir}
-	if _, err := dh.client.ActivateOnu(context.Background(), &Onu); err != nil {
+	if _, err := dh.Client.ActivateOnu(context.Background(), &Onu); err != nil {
 		log.Errorw("activate-onu-failed", log.Fields{"Onu": Onu})
 	} else {
 		log.Infow("activated-onu", log.Fields{"SerialNumber": serialNumber})
@@ -415,7 +439,11 @@
 }
 
 func (dh *DeviceHandler) onuDiscIndication(onuDiscInd *oop.OnuDiscIndication, onuId uint32, sn string) error {
-	if err := dh.coreProxy.ChildDeviceDetected(nil, dh.device.Id, int(onuDiscInd.GetIntfId()), "brcm_openomci_onu", int(onuDiscInd.GetIntfId()), string(onuDiscInd.SerialNumber.GetVendorId()), sn, int64(onuId)); err != nil {
+        //channelId := MkUniPortNum(onuDiscInd.GetIntfId(), onuId, uint32(0))
+        //parentPortNo := IntfIdToPortNo(onuDiscInd.GetIntfId(),voltha.Port_PON_OLT)
+        channelId := onuDiscInd.GetIntfId()
+        parentPortNo := onuDiscInd.GetIntfId()
+	if err := dh.coreProxy.ChildDeviceDetected(nil, dh.device.Id, int(parentPortNo), "brcm_openomci_onu", int(channelId), string(onuDiscInd.SerialNumber.GetVendorId()), sn, int64(onuId)); err != nil {
 		log.Errorw("Create onu error", log.Fields{"parent_id": dh.device.Id, "ponPort": onuDiscInd.GetIntfId(), "onuId": onuId, "sn": sn, "error": err})
 		return err
 	}
@@ -441,6 +469,8 @@
 	serialNumber := dh.stringifySerialNumber(onuInd.SerialNumber)
 
 	kwargs := make(map[string]interface{})
+//        ponPort := IntfIdToPortNo(onuInd.GetIntfId(),voltha.Port_PON_OLT)
+
 	if serialNumber != "" {
 		kwargs["serial_number"] = serialNumber
 	} else {
@@ -485,7 +515,7 @@
 			log.Debugw("inter-adapter-send-onu-ind", log.Fields{"onuIndication": onuInd})
 
 			// TODO NEW CORE do not hardcode adapter name. Handler needs Adapter reference
-			dh.adapterProxy.SendInterAdapterMessage(nil, onuInd, ic.InterAdapterMessageType_ONU_IND_REQUEST, "openolt", onuDevice.Type, onuDevice.Id, onuDevice.ProxyAddress.DeviceId, "")
+			dh.AdapterProxy.SendInterAdapterMessage(nil, onuInd, ic.InterAdapterMessageType_ONU_IND_REQUEST, "openolt", onuDevice.Type, onuDevice.Id, onuDevice.ProxyAddress.DeviceId, "")
 		} else if onuInd.OperState == "up" {
 			if onuDevice.ConnectStatus != common.ConnectStatus_REACHABLE {
 				dh.coreProxy.DeviceStateUpdate(nil, onuDevice.Id, common.ConnectStatus_REACHABLE, onuDevice.OperStatus)
@@ -495,7 +525,7 @@
 				log.Warnw("ignore onu indication", log.Fields{"intfId": onuInd.IntfId, "onuId": onuInd.OnuId, "operStatus": onuDevice.OperStatus, "msgOperStatus": onuInd.OperState})
 				return
 			}
-			dh.adapterProxy.SendInterAdapterMessage(nil, onuInd, ic.InterAdapterMessageType_ONU_IND_REQUEST, "openolt", onuDevice.Type, onuDevice.Id, onuDevice.ProxyAddress.DeviceId, "")
+			dh.AdapterProxy.SendInterAdapterMessage(nil, onuInd, ic.InterAdapterMessageType_ONU_IND_REQUEST, "openolt", onuDevice.Type, onuDevice.Id, onuDevice.ProxyAddress.DeviceId, "")
 		} else {
 			log.Warnw("Not-implemented-or-invalid-value-of-oper-state", log.Fields{"operState": onuInd.OperState})
 		}
@@ -530,3 +560,32 @@
 func (dh *DeviceHandler) Update_flows_bulk() error {
 	return errors.New("UnImplemented")
 }
+// GetChildDevice asks the core for the child (ONU) device of this OLT matching parentPort and onuId; it returns nil when the lookup fails.
+func (dh *DeviceHandler) GetChildDevice(parentPort uint32, onuId uint32) *voltha.Device {
+	log.Debugw("GetChildDevice", log.Fields{"pon port": parentPort, "onuId": onuId})
+	kwargs := map[string]interface{}{"onu_id": onuId, "parent_port_no": parentPort}
+	onuDevice, err := dh.coreProxy.GetChildDevice(nil, dh.device.Id, kwargs)
+	// Treat a nil device like an error so the dereference in the debug log below cannot panic.
+	if err != nil || onuDevice == nil {
+		log.Errorw("onu not found", log.Fields{"intfId": parentPort, "onuId": onuId, "error": err})
+		return nil
+	}
+	log.Debugw("Successfully received child device from core", log.Fields{"child_device": *onuDevice})
+	return onuDevice
+}
+
+// UpdateFlowsIncrementally adds flows.ToAdd through the flow manager and logs flows.ToRemove (flow removal and group changes are not implemented yet).
+func (dh *DeviceHandler) UpdateFlowsIncrementally(device *voltha.Device, flows *of.FlowChanges, groups *of.FlowGroupChanges) error {
+	log.Debugw("In UpdateFlowsIncrementally", log.Fields{"deviceId": device.Id, "flows": flows, "groups": groups})
+	if flows == nil {
+		return nil // nothing to add or remove
+	}
+	for _, flow := range flows.ToAdd.GetItems() { // protobuf-generated getters tolerate a nil ToAdd/ToRemove
+		dh.flowMgr.AddFlow(flow)
+	}
+	for _, flow := range flows.ToRemove.GetItems() {
+		// TODO: call dh.flowMgr.RemoveFlow(flow) once flow removal is implemented.
+		log.Debugw("Removing flow", log.Fields{"deviceId": device.Id, "flowToRemove": flow})
+	}
+	return nil
+}