VOL-3710 Avoid concurrent access to the flows map in olt.go

Change-Id: I0f7fc493efeb8f2d28c1ee276d24cda348126f09
diff --git a/VERSION b/VERSION
index 347f583..f9f73cc 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-1.4.1
+1.4.2-dev
diff --git a/internal/bbsim/api/onus_handler.go b/internal/bbsim/api/onus_handler.go
index 10d671d..3c84380 100644
--- a/internal/bbsim/api/onus_handler.go
+++ b/internal/bbsim/api/onus_handler.go
@@ -19,6 +19,7 @@
 import (
 	"context"
 	"fmt"
+	"github.com/opencord/voltha-protos/v4/go/openolt"
 
 	"github.com/opencord/bbsim/api/bbsim"
 	"github.com/opencord/bbsim/internal/bbsim/alarmsim"
@@ -406,11 +407,12 @@
 	res := &bbsim.Flows{}
 
 	if req.SerialNumber == "" {
-		for flowKey := range olt.Flows {
-			flow := olt.Flows[flowKey]
-			res.Flows = append(res.Flows, &flow)
-		}
-		res.FlowCount = uint32(len(olt.Flows))
+		olt.Flows.Range(func(flowKey, flow interface{}) bool {
+			flowObj := flow.(openolt.Flow)
+			res.Flows = append(res.Flows, &flowObj)
+			return true
+		})
+		res.FlowCount = uint32(len(res.Flows))
 	} else {
 		onu, err := olt.FindOnuBySn(req.SerialNumber)
 		if err != nil {
@@ -420,8 +422,9 @@
 			return nil, err
 		}
 		for _, flowKey := range onu.Flows {
-			flow := olt.Flows[flowKey]
-			res.Flows = append(res.Flows, &flow)
+			flow, _ := olt.Flows.Load(flowKey)
+			flowObj := flow.(openolt.Flow)
+			res.Flows = append(res.Flows, &flowObj)
 		}
 		res.FlowCount = uint32(len(onu.Flows))
 	}
diff --git a/internal/bbsim/devices/olt.go b/internal/bbsim/devices/olt.go
index 008e3be..9e26ad2 100644
--- a/internal/bbsim/devices/olt.go
+++ b/internal/bbsim/devices/olt.go
@@ -59,7 +59,7 @@
 	InternalState        *fsm.FSM
 	channel              chan Message
 	dhcpServer           dhcp.DHCPServerIf
-	Flows                map[FlowKey]openolt.Flow
+	Flows                sync.Map
 	Delay                int
 	ControlledActivation mode
 	EventChannel         chan common.Event
@@ -105,7 +105,6 @@
 		Pons:              []*PonPort{},
 		Nnis:              []*NniPort{},
 		Delay:             options.BBSim.Delay,
-		Flows:             make(map[FlowKey]openolt.Flow),
 		enablePerf:        options.BBSim.EnablePerf,
 		PublishEvents:     options.BBSim.Events,
 		PortStatsInterval: options.Olt.PortStatsInterval,
@@ -980,7 +979,7 @@
 	flowKey := FlowKey{}
 	if !o.enablePerf {
 		flowKey = FlowKey{ID: flow.FlowId, Direction: flow.FlowType}
-		olt.Flows[flowKey] = *flow
+		olt.Flows.Store(flowKey, *flow)
 	}
 
 	if flow.AccessIntfId == -1 {
@@ -1060,12 +1059,14 @@
 		}
 
 		// Check if flow exists
-		storedFlow, ok := o.Flows[flowKey]
+		storedFlowIntf, ok := o.Flows.Load(flowKey)
 		if !ok {
 			oltLogger.Errorf("Flow %v not found", flow)
 			return new(openolt.Empty), status.Errorf(codes.NotFound, "Flow not found")
 		}
 
+		storedFlow := storedFlowIntf.(openolt.Flow)
+
 		// if its ONU flow remove it from ONU also
 		if storedFlow.AccessIntfId != -1 {
 			pon := o.Pons[uint32(storedFlow.AccessIntfId)]
@@ -1083,7 +1084,7 @@
 		}
 
 		// delete from olt flows
-		delete(o.Flows, flowKey)
+		o.Flows.Delete(flowKey)
 	}
 
 	if flow.AccessIntfId == -1 {