VOL-3419: OpenOLT adapter at scale constantly takes more than 10 seconds to react to flows
- Pass information to the agent to do the flow replication
- Consolidate various locks in the adapter and remove redundant locks
- Use voltha-protos version 4.0.2 and voltha-lib-go version 4.0.0
- Bump adapter version to 3.0.0
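
As an illustration of the first bullet, a minimal sketch (using the oop/log
aliases already imported in device_handler.go) of handing replication details
to the agent so it, rather than the adapter, expands the flow per p-bit. The
ReplicateFlow/PbitToGemport field names and all values below are assumptions
for illustration, not taken from this patch:

    // Illustrative only: intfID, onuID, uniID, flowID and the gem-port
    // values are placeholders; ReplicateFlow/PbitToGemport are the assumed
    // proto fields carrying the replication information to the agent.
    flow := &oop.Flow{
        AccessIntfId:  int32(intfID),
        OnuId:         int32(onuID),
        UniId:         int32(uniID),
        FlowId:        flowID,
        ReplicateFlow: true,
        PbitToGemport: map[uint32]uint32{0: 1024, 1: 1025},
    }
    if _, err := dh.Client.FlowAdd(ctx, flow); err != nil {
        logger.Errorw(ctx, "flow-add-failed", log.Fields{"err": err})
    }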

Change-Id: Ic053c54e5319bb1736ec74facfc79dd10058ecf5
diff --git a/internal/pkg/core/device_handler.go b/internal/pkg/core/device_handler.go
index f342bcf..3be60ec 100644
--- a/internal/pkg/core/device_handler.go
+++ b/internal/pkg/core/device_handler.go
@@ -34,17 +34,17 @@
 	"github.com/golang/protobuf/ptypes"
 	grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
 	grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
-	"github.com/opencord/voltha-lib-go/v3/pkg/adapters/adapterif"
-	"github.com/opencord/voltha-lib-go/v3/pkg/flows"
-	"github.com/opencord/voltha-lib-go/v3/pkg/log"
-	"github.com/opencord/voltha-lib-go/v3/pkg/pmmetrics"
+	"github.com/opencord/voltha-lib-go/v4/pkg/adapters/adapterif"
+	"github.com/opencord/voltha-lib-go/v4/pkg/flows"
+	"github.com/opencord/voltha-lib-go/v4/pkg/log"
+	"github.com/opencord/voltha-lib-go/v4/pkg/pmmetrics"
 	"github.com/opencord/voltha-openolt-adapter/internal/pkg/olterrors"
 	rsrcMgr "github.com/opencord/voltha-openolt-adapter/internal/pkg/resourcemanager"
-	"github.com/opencord/voltha-protos/v3/go/common"
-	ic "github.com/opencord/voltha-protos/v3/go/inter_container"
-	of "github.com/opencord/voltha-protos/v3/go/openflow_13"
-	oop "github.com/opencord/voltha-protos/v3/go/openolt"
-	"github.com/opencord/voltha-protos/v3/go/voltha"
+	"github.com/opencord/voltha-protos/v4/go/common"
+	ic "github.com/opencord/voltha-protos/v4/go/inter_container"
+	of "github.com/opencord/voltha-protos/v4/go/openflow_13"
+	oop "github.com/opencord/voltha-protos/v4/go/openolt"
+	"github.com/opencord/voltha-protos/v4/go/voltha"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
@@ -52,9 +52,7 @@
 
 // Constants for number of retries and for timeout
 const (
-	MaxRetry       = 10
-	MaxTimeOutInMs = 500
-	InvalidPort    = 0xffffffff
+	InvalidPort = 0xffffffff
 )
 
 //DeviceHandler will interact with the OLT device.
@@ -356,16 +354,16 @@
 
 				// On failure process a backoff timer while watching for stopIndications
 				// events
-				backoff := time.NewTimer(indicationBackoff.NextBackOff())
+				backoffTimer := time.NewTimer(indicationBackoff.NextBackOff())
 				select {
 				case <-dh.stopIndications:
 					logger.Debugw(ctx, "stopping-collecting-indications-for-olt", log.Fields{"device-id": dh.device.Id})
-					if !backoff.Stop() {
-						<-backoff.C
+					if !backoffTimer.Stop() {
+						<-backoffTimer.C
 					}
 					break Loop
-				case <-backoff.C:
-					// backoff expired continue
+				case <-backoffTimer.C:
+					// backoffTimer expired, continue
 				}
 				if indications, err = dh.startOpenOltIndicationStream(ctx); err != nil {
 					return err
@@ -720,6 +718,9 @@
 				_ = olterrors.NewErrAdapter("indication-read-failure", log.Fields{"device-id": dh.device.Id}, err).LogAt(log.ErrorLevel)
 			}
 		}()
+
+		go startHeartbeatCheck(ctx, dh)
+
 		return nil
 	}
 
@@ -748,6 +749,9 @@
 	if device.PmConfigs != nil {
 		dh.UpdatePmConfig(ctx, device.PmConfigs)
 	}
+
+	go startHeartbeatCheck(ctx, dh)
+
 	return nil
 }
 
@@ -769,7 +773,7 @@
 	dh.flowMgr = make([]*OpenOltFlowMgr, dh.totalPonPorts)
 	for i := range dh.flowMgr {
 		// Instantiate flow manager
-		if dh.flowMgr[i] = NewFlowManager(ctx, dh, dh.resourceMgr, dh.groupMgr); dh.flowMgr[i] == nil {
+		if dh.flowMgr[i] = NewFlowManager(ctx, dh, dh.resourceMgr, dh.groupMgr, uint32(i)); dh.flowMgr[i] == nil {
 			return olterrors.ErrResourceManagerInstantiating
 		}
 	}
@@ -875,8 +879,6 @@
 	if err := dh.coreProxy.DevicePMConfigUpdate(ctx, dh.metrics.ToPmConfigs()); err != nil {
 		_ = olterrors.NewErrAdapter("error-updating-performance-metrics", log.Fields{"device-id": device.Id}, err).LogAt(log.ErrorLevel)
 	}
-
-	go startHeartbeatCheck(ctx, dh)
 }
 
 //GetOfpDeviceInfo Gets the Ofp information of the given device
@@ -886,7 +888,7 @@
 			MfrDesc:   "VOLTHA Project",
 			HwDesc:    "open_pon",
 			SwDesc:    "open_pon",
-			SerialNum: dh.device.SerialNumber,
+			SerialNum: device.SerialNumber,
 		},
 		SwitchFeatures: &of.OfpSwitchFeatures{
 			NBuffers: 256,
@@ -1328,7 +1330,7 @@
 	}
 	return &oop.SerialNumber{
 		VendorId:       []byte(serialNum[:4]),
-		VendorSpecific: []byte(decodedStr),
+		VendorSpecific: decodedStr,
 	}, nil
 }
 
@@ -1578,17 +1580,13 @@
 	var uniID uint32
 	var err error
 	for _, port := range onu.UniPorts {
-		uniID = UniIDFromPortNum(uint32(port))
+		uniID = UniIDFromPortNum(port)
 		logger.Debugw(ctx, "clearing-resource-data-for-uni-port", log.Fields{"port": port, "uni-id": uniID})
 		/* Delete tech-profile instance from the KV store */
-		if err = dh.flowMgr[onu.IntfID].DeleteTechProfileInstances(ctx, onu.IntfID, onu.OnuID, uniID, onu.SerialNumber); err != nil {
+		if err = dh.flowMgr[onu.IntfID].DeleteTechProfileInstances(ctx, onu.IntfID, onu.OnuID, uniID); err != nil {
 			logger.Debugw(ctx, "failed-to-remove-tech-profile-instance-for-onu", log.Fields{"onu-id": onu.OnuID})
 		}
 		logger.Debugw(ctx, "deleted-tech-profile-instance-for-onu", log.Fields{"onu-id": onu.OnuID})
-		flowIDs := dh.resourceMgr.GetCurrentFlowIDsForOnu(ctx, onu.IntfID, int32(onu.OnuID), int32(uniID))
-		for _, flowID := range flowIDs {
-			dh.resourceMgr.FreeFlowID(ctx, onu.IntfID, int32(onu.OnuID), int32(uniID), flowID)
-		}
 		tpIDList := dh.resourceMgr.GetTechProfileIDForOnu(ctx, onu.IntfID, onu.OnuID, uniID)
 		for _, tpID := range tpIDList {
 			if err = dh.resourceMgr.RemoveMeterIDForOnu(ctx, "upstream", onu.IntfID, onu.OnuID, uniID, tpID); err != nil {
@@ -1605,9 +1603,12 @@
 			logger.Debugw(ctx, "failed-to-remove-tech-profile-id-for-onu", log.Fields{"onu-id": onu.OnuID})
 		}
 		logger.Debugw(ctx, "removed-tech-profile-id-for-onu", log.Fields{"onu-id": onu.OnuID})
-		if err = dh.resourceMgr.DelGemPortPktInOfAllServices(ctx, onu.IntfID, onu.OnuID, uint32(port)); err != nil {
+		if err = dh.resourceMgr.DeletePacketInGemPortForOnu(ctx, onu.IntfID, onu.OnuID, port); err != nil {
 			logger.Debugw(ctx, "failed-to-remove-gemport-pkt-in", log.Fields{"intfid": onu.IntfID, "onuid": onu.OnuID, "uniId": uniID})
 		}
+		if err = dh.resourceMgr.RemoveAllFlowsForIntfOnuUniKey(ctx, onu.IntfID, int32(onu.OnuID), int32(uniID)); err != nil {
+			logger.Debugw(ctx, "failed-to-remove-flow-for", log.Fields{"intfid": onu.IntfID, "onuid": onu.OnuID, "uniId": uniID})
+		}
 	}
 	return nil
 }
@@ -1626,16 +1627,14 @@
 	}
 	logger.Debugw(ctx, "nni-", log.Fields{"nni": nni})
 	for _, nniIntfID := range nni {
-		flowIDs := dh.resourceMgr.GetCurrentFlowIDsForOnu(ctx, uint32(nniIntfID), int32(nniOnuID), int32(nniUniID))
-		logger.Debugw(ctx, "current-flow-ids-for-nni", log.Fields{"flow-ids": flowIDs})
-		for _, flowID := range flowIDs {
-			dh.resourceMgr.FreeFlowID(ctx, uint32(nniIntfID), -1, -1, uint32(flowID))
-		}
 		dh.resourceMgr.RemoveResourceMap(ctx, nniIntfID, int32(nniOnuID), int32(nniUniID))
+		_ = dh.resourceMgr.RemoveAllFlowsForIntfOnuUniKey(ctx, nniIntfID, -1, -1)
+
 	}
 	if err = dh.resourceMgr.DelNNiFromKVStore(ctx); err != nil {
 		return olterrors.NewErrPersistence("clear", "nni", 0, nil, err)
 	}
+
 	return nil
 }
 
@@ -2248,7 +2247,6 @@
 	// Default to PON0
 	var intfID uint32
 	inPort, outPort := getPorts(flow)
-	logger.Debugw(ctx, "increment-flow-remove-count-for-inPort-out-port", log.Fields{"inPort": inPort, "out-port": outPort})
 	if inPort != InvalidPort && outPort != InvalidPort {
 		_, intfID, _, _ = ExtractAccessFromFlow(inPort, outPort)
 	}