VOL-2339: DHCP issue on second ONU, when two ONUs exist on same PON

- Process to_remove before to_add items in UpdateFlowsIncrementally
  method
- Sequentially add flows for a given pon, onu, uni inorder to avoid
  PON resource collisions. Used lock per key so that different
  subscribers do not have to wait for each other to add flows.
- Wait for any flow removes for the subscriber to complete before
  adding flows.

Change-Id: I095291b9a53fd0f19dc79f2b44923ec786a26d6e
diff --git a/adaptercore/device_handler.go b/adaptercore/device_handler.go
index 7e09fc8..9c65cee 100644
--- a/adaptercore/device_handler.go
+++ b/adaptercore/device_handler.go
@@ -1081,14 +1081,15 @@
 func (dh *DeviceHandler) UpdateFlowsIncrementally(device *voltha.Device, flows *of.FlowChanges, groups *of.FlowGroupChanges, flowMetadata *voltha.FlowMetadata) error {
 	log.Debugw("Received-incremental-flowupdate-in-device-handler", log.Fields{"deviceID": device.Id, "flows": flows, "groups": groups, "flowMetadata": flowMetadata})
 	if flows != nil {
-		for _, flow := range flows.ToAdd.Items {
-			log.Debug("Adding flow", log.Fields{"deviceId": device.Id, "flowToAdd": flow})
-			dh.flowMgr.AddFlow(flow, flowMetadata)
-		}
 		for _, flow := range flows.ToRemove.Items {
 			log.Debug("Removing flow", log.Fields{"deviceId": device.Id, "flowToRemove": flow})
 			dh.flowMgr.RemoveFlow(flow)
 		}
+
+		for _, flow := range flows.ToAdd.Items {
+			log.Debug("Adding flow", log.Fields{"deviceId": device.Id, "flowToAdd": flow})
+			dh.flowMgr.AddFlow(flow, flowMetadata)
+		}
 	}
 	if groups != nil && flows != nil {
 		for _, flow := range flows.ToRemove.Items {
diff --git a/adaptercore/olt_platform.go b/adaptercore/olt_platform.go
index 0186712..8c7cf8d 100644
--- a/adaptercore/olt_platform.go
+++ b/adaptercore/olt_platform.go
@@ -259,5 +259,9 @@
 	onuID = OnuIDFromUniPortNum(uniPortNo)
 	uniID = UniIDFromPortNum(uniPortNo)
 
+	log.Debugw("flow extract info result",
+		log.Fields{"uniPortNo": uniPortNo, "ponIntf": ponIntf,
+			"onuID": onuID, "uniID": uniID, "inPort": inPort, "ethType": ethType})
+
 	return uniPortNo, ponIntf, onuID, uniID, inPort, ethType, nil
 }
diff --git a/adaptercore/openolt_flowmgr.go b/adaptercore/openolt_flowmgr.go
index 565c8c8..71a0850 100644
--- a/adaptercore/openolt_flowmgr.go
+++ b/adaptercore/openolt_flowmgr.go
@@ -26,6 +26,7 @@
 	"fmt"
 	"math/big"
 	"sync"
+	"time"
 
 	"github.com/opencord/voltha-lib-go/v2/pkg/flows"
 	"github.com/opencord/voltha-lib-go/v2/pkg/log"
@@ -39,6 +40,7 @@
 	"github.com/opencord/voltha-protos/v2/go/voltha"
 
 	//deepcopy "github.com/getlantern/deepcopy"
+	"github.com/EagleChen/mapmutex"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
 )
@@ -152,6 +154,18 @@
 	gemPort uint32
 }
 
+type pendingFlowDeleteKey struct {
+	intfID uint32
+	onuID  uint32
+	uniID  uint32
+}
+
+type tpLockKey struct {
+	intfID uint32
+	onuID  uint32
+	uniID  uint32
+}
+
 type schedQueue struct {
 	direction    tp_pb.Direction
 	intfID       uint32
@@ -174,6 +188,9 @@
 	packetInGemPort    map[rsrcMgr.PacketInInfoKey]uint32 //packet in gem port local cache
 	onuGemInfo         map[uint32][]rsrcMgr.OnuGemInfo    //onu, gem and uni info local cache
 	lockCache          sync.RWMutex
+	pendingFlowDelete  sync.Map
+	// The mapmutex.Mutex can be fine tuned to use mapmutex.NewCustomizedMapMutex
+	perUserFlowHandleLock *mapmutex.Mutex
 }
 
 //NewFlowManager creates OpenOltFlowMgr object and initializes the parameters
@@ -204,6 +221,8 @@
 		flowMgr.loadFlowIDlistForGem(idx)
 	}
 	flowMgr.lockCache = sync.RWMutex{}
+	flowMgr.pendingFlowDelete = sync.Map{}
+	flowMgr.perUserFlowHandleLock = mapmutex.NewMapMutex()
 	log.Info("Initialization of  flow manager success!!")
 	return &flowMgr
 }
@@ -254,24 +273,32 @@
 
 	uni := getUniPortPath(intfID, int32(onuID), int32(uniID))
 	log.Debugw("Uni port name", log.Fields{"uni": uni})
-	allocID, gemPorts, TpInst = f.createTcontGemports(intfID, onuID, uniID, uni, portNo, TpID, UsMeterID, DsMeterID, flowMetadata)
-	if allocID == 0 || gemPorts == nil || TpInst == nil {
-		log.Error("alloc-id-gem-ports-tp-unavailable")
+
+	tpLockMapKey := tpLockKey{intfID, onuID, uniID}
+	if f.perUserFlowHandleLock.TryLock(tpLockMapKey) {
+		allocID, gemPorts, TpInst = f.createTcontGemports(intfID, onuID, uniID, uni, portNo, TpID, UsMeterID, DsMeterID, flowMetadata)
+		if allocID == 0 || gemPorts == nil || TpInst == nil {
+			log.Error("alloc-id-gem-ports-tp-unavailable")
+			f.perUserFlowHandleLock.Unlock(tpLockMapKey)
+			return
+		}
+		args := make(map[string]uint32)
+		args[IntfID] = intfID
+		args[OnuID] = onuID
+		args[UniID] = uniID
+		args[PortNo] = portNo
+		args[AllocID] = allocID
+
+		/* Flows can be added specific to gemport if p-bits are received.
+		 * If no pbit mentioned then adding flows for all gemports
+		 */
+		f.checkAndAddFlow(args, classifierInfo, actionInfo, flow, TpInst, gemPorts, TpID, uni)
+		f.perUserFlowHandleLock.Unlock(tpLockMapKey)
+	} else {
+		log.Errorw("failed to acquire per user flow handle lock",
+			log.Fields{"intfId": intfID, "onuId": onuID, "uniId": uniID})
 		return
 	}
-
-	/* Flows can be added specific to gemport if p-bits are received.
-	 * If no pbit mentioned then adding flows for all gemports
-	 */
-
-	args := make(map[string]uint32)
-	args[IntfID] = intfID
-	args[OnuID] = onuID
-	args[UniID] = uniID
-	args[PortNo] = portNo
-	args[AllocID] = allocID
-
-	f.checkAndAddFlow(args, classifierInfo, actionInfo, flow, TpInst, gemPorts, TpID, uni)
 }
 
 // CreateSchedulerQueues creates traffic schedulers on the device with the given scheduler configuration and traffic shaping info
@@ -445,6 +472,7 @@
 	var allocIDs []uint32
 	var allgemPortIDs []uint32
 	var gemPortIDs []uint32
+	tpInstanceExists := false
 
 	allocIDs = f.resourceMgr.GetCurrentAllocIDsForOnu(intfID, onuID, uniID)
 	allgemPortIDs = f.resourceMgr.GetCurrentGEMPortIDsForOnu(intfID, onuID, uniID)
@@ -466,6 +494,7 @@
 		f.resourceMgr.UpdateTechProfileIDForOnu(intfID, onuID, uniID, TpID)
 	} else {
 		log.Debugw("Tech-profile-instance-already-exist-for-given port-name", log.Fields{"uni": uni})
+		tpInstanceExists = true
 	}
 	if UsMeterID != 0 {
 		sq := schedQueue{direction: tp_pb.Direction_UPSTREAM, intfID: intfID, onuID: onuID, uniID: uniID, tpID: TpID,
@@ -485,13 +514,19 @@
 	}
 
 	allocID := techProfileInstance.UsScheduler.AllocID
-	allocIDs = appendUnique(allocIDs, allocID)
-
 	for _, gem := range techProfileInstance.UpstreamGemPortAttributeList {
-		allgemPortIDs = appendUnique(allgemPortIDs, gem.GemportID)
 		gemPortIDs = append(gemPortIDs, gem.GemportID)
 	}
 
+	if tpInstanceExists {
+		return allocID, gemPortIDs, techProfileInstance
+	}
+
+	allocIDs = appendUnique(allocIDs, allocID)
+	for _, gemPortID := range gemPortIDs {
+		allgemPortIDs = appendUnique(allgemPortIDs, gemPortID)
+	}
+
 	log.Debugw("Allocated Tcont and GEM ports", log.Fields{"allocIDs": allocIDs, "gemports": allgemPortIDs})
 	// Send Tconts and GEM ports to KV store
 	f.storeTcontsGEMPortsIntoKVStore(intfID, onuID, uniID, allocIDs, allgemPortIDs)
@@ -603,6 +638,10 @@
 		log.Debugw("Found pbit in the flow", log.Fields{"VlanPbit": vlanPbit})
 	}
 	flowStoreCookie := getFlowStoreCookie(classifier, gemPortID)
+	if present := f.resourceMgr.IsFlowCookieOnKVStore(uint32(intfID), int32(onuID), int32(uniID), flowStoreCookie); present {
+		log.Debug("Flow-exists--not-re-adding")
+		return
+	}
 	flowID, err := f.resourceMgr.GetFlowID(intfID, int32(onuID), int32(uniID), gemPortID, flowStoreCookie, HsiaFlow, vlanPbit)
 	if err != nil {
 		log.Errorw("Flow id unavailable for HSIA flow", log.Fields{"direction": direction})
@@ -674,11 +713,15 @@
 	delete(classifier, VlanVid)
 
 	flowStoreCookie := getFlowStoreCookie(classifier, gemPortID)
+	if present := f.resourceMgr.IsFlowCookieOnKVStore(uint32(intfID), int32(onuID), int32(uniID), flowStoreCookie); present {
+		log.Debug("Flow-exists--not-re-adding")
+		return
+	}
 
-	flowID, err = f.resourceMgr.GetFlowID(intfID, int32(onuID), int32(uniID), gemPortID, flowStoreCookie, "", 0 /*classifier[VLAN_PCP].(uint32)*/)
+	flowID, err = f.resourceMgr.GetFlowID(intfID, int32(onuID), int32(uniID), gemPortID, flowStoreCookie, DhcpFlow, 0 /*classifier[VLAN_PCP].(uint32)*/)
 
 	if err != nil {
-		log.Errorw("flowId unavailable for UL EAPOL", log.Fields{"intfId": intfID, "onuId": onuID, "flowStoreCookie": flowStoreCookie})
+		log.Errorw("flowId unavailable for UL DHCP", log.Fields{"intfId": intfID, "onuId": onuID, "flowStoreCookie": flowStoreCookie})
 		return
 	}
 
@@ -729,10 +772,8 @@
 
 	uplinkClassifier := make(map[string]interface{})
 	uplinkAction := make(map[string]interface{})
-	downlinkClassifier := make(map[string]interface{})
-	downlinkAction := make(map[string]interface{})
+
 	var upstreamFlow openoltpb2.Flow
-	var downstreamFlow openoltpb2.Flow
 	var networkIntfID uint32
 
 	// Fill Classfier
@@ -742,6 +783,10 @@
 	// Fill action
 	uplinkAction[TrapToHost] = true
 	flowStoreCookie := getFlowStoreCookie(uplinkClassifier, gemPortID)
+	if present := f.resourceMgr.IsFlowCookieOnKVStore(uint32(intfID), int32(onuID), int32(uniID), flowStoreCookie); present {
+		log.Debug("Flow-exists--not-re-adding")
+		return
+	}
 	//Add Uplink EAPOL Flow
 	uplinkFlowID, err := f.resourceMgr.GetFlowID(intfID, int32(onuID), int32(uniID), gemPortID, flowStoreCookie, "", 0)
 	if err != nil {
@@ -795,78 +840,7 @@
 			return
 		}
 	}
-	// Dummy Downstream flow due to BAL 2.6 limitation
-	{
-		/* Add Downstream EAPOL Flow, Only for first EAP flow (BAL
-		# requirement)
-		# On one of the platforms (Broadcom BAL), when same DL classifier
-		# vlan was used across multiple ONUs, eapol flow re-adds after
-		# flow delete (cases of onu reboot/disable) fails.
-		# In order to generate unique vlan, a combination of intf_id
-		# onu_id and uniId is used.
-		# uniId defaults to 0, so add 1 to it.
-		*/
-		log.Debugw("Creating DL EAPOL flow with default vlan", log.Fields{"vlan": vlanID})
-		specialVlanDlFlow := 4090 - intfID*onuID*(uniID+1)
-		// Assert that we do not generate invalid vlans under no condition
-		if specialVlanDlFlow <= 2 {
-			log.Fatalw("invalid-vlan-generated", log.Fields{"vlan": specialVlanDlFlow})
-			return
-		}
-		log.Debugw("specialVlanEAPOLDlFlow:", log.Fields{"dl_vlan": specialVlanDlFlow})
-		// Fill Classfier
-		downlinkClassifier[PacketTagType] = SingleTag
-		downlinkClassifier[EthType] = uint32(EapEthType)
-		downlinkClassifier[VlanVid] = uint32(specialVlanDlFlow)
-		// Fill action
-		downlinkAction[PushVlan] = true
-		downlinkAction[VlanVid] = vlanID
-		flowStoreCookie := getFlowStoreCookie(downlinkClassifier, gemPortID)
-		downlinkFlowID, err := f.resourceMgr.GetFlowID(intfID, int32(onuID), int32(uniID), gemPortID, flowStoreCookie, "", 0)
-		if err != nil {
-			log.Errorw("flowId unavailable for DL EAPOL",
-				log.Fields{"intfId": intfID, "onuId": onuID, "flowStoreCookie": flowStoreCookie})
-			return
-		}
-		log.Debugw("Creating DL EAPOL flow",
-			log.Fields{"dl_classifier": downlinkClassifier, "dl_action": downlinkAction, "downlinkFlowID": downlinkFlowID})
-		if classifierProto = makeOpenOltClassifierField(downlinkClassifier); classifierProto == nil {
-			log.Error("Error in making classifier protobuf for downlink flow")
-			return
-		}
-		if actionProto = makeOpenOltActionField(downlinkAction); actionProto == nil {
-			log.Error("Error in making action protobuf for dl flow")
-			return
-		}
-		// Downstream flow in grpc protobuf
-		downstreamFlow = openoltpb2.Flow{AccessIntfId: int32(intfID),
-			OnuId:         int32(onuID),
-			UniId:         int32(uniID),
-			FlowId:        downlinkFlowID,
-			FlowType:      Downstream,
-			AllocId:       int32(allocID),
-			NetworkIntfId: int32(networkIntfID),
-			GemportId:     int32(gemPortID),
-			Classifier:    classifierProto,
-			Action:        actionProto,
-			Priority:      int32(logicalFlow.Priority),
-			Cookie:        logicalFlow.Cookie,
-			PortNo:        portNo}
-		if ok := f.addFlowToDevice(logicalFlow, &downstreamFlow); ok {
-			log.Debug("EAPOL DL flow added to device successfully")
-			flowCategory := ""
-			flowsToKVStore := f.getUpdatedFlowInfo(&downstreamFlow, flowStoreCookie, flowCategory, downlinkFlowID, logicalFlow.Id)
-			if err := f.updateFlowInfoToKVStore(downstreamFlow.AccessIntfId,
-				downstreamFlow.OnuId,
-				downstreamFlow.UniId,
-				downstreamFlow.FlowId,
-				/* flowCategory, */
-				flowsToKVStore); err != nil {
-				log.Errorw("Error uploading EAPOL DL flow into KV store", log.Fields{"flow": upstreamFlow, "error": err})
-				return
-			}
-		}
-	}
+
 	log.Debugw("Added EAPOL flows to device successfully", log.Fields{"flow": logicalFlow})
 }
 
@@ -967,6 +941,7 @@
 		log.Error("Invalid classfier object")
 		return 0
 	}
+	log.Debugw("generating flow store cookie", log.Fields{"classifier": classifier, "gemPortID": gemPortID})
 	var jsonData []byte
 	var flowString string
 	var err error
@@ -983,7 +958,9 @@
 	_, _ = h.Write([]byte(flowString))
 	hash := big.NewInt(0)
 	hash.SetBytes(h.Sum(nil))
-	return hash.Uint64()
+	generatedHash := hash.Uint64()
+	log.Debugw("hash generated", log.Fields{"hash": generatedHash})
+	return generatedHash
 }
 
 func (f *OpenOltFlowMgr) getUpdatedFlowInfo(flow *openoltpb2.Flow, flowStoreCookie uint64, flowCategory string, deviceFlowID uint32, logicalFlowID uint64) *[]rsrcMgr.FlowInfo {
@@ -1062,7 +1039,7 @@
 	st, _ := status.FromError(err)
 	if st.Code() == codes.AlreadyExists {
 		log.Debug("Flow already exists", log.Fields{"err": err, "deviceFlow": deviceFlow})
-		return false
+		return true
 	}
 
 	if err != nil {
@@ -1270,6 +1247,29 @@
 	return nil
 }
 
+func (f *OpenOltFlowMgr) deletePendingFlows(Intf uint32, onuID int32, uniID int32) {
+	pnFlDelKey := pendingFlowDeleteKey{Intf, uint32(onuID), uint32(uniID)}
+	if val, ok := f.pendingFlowDelete.Load(pnFlDelKey); ok {
+		if val.(int) > 0 {
+			pnFlDels := val.(int) - 1
+			if pnFlDels > 0 {
+				log.Debugw("flow delete succeeded, more pending",
+					log.Fields{"intf": Intf, "onuID": onuID, "uniID": uniID, "currPendingFlowCnt": pnFlDels})
+				f.pendingFlowDelete.Store(pnFlDelKey, pnFlDels)
+			} else {
+				log.Debugw("all pending flow deletes handled, removing entry from map",
+					log.Fields{"intf": Intf, "onuID": onuID, "uniID": uniID})
+				f.pendingFlowDelete.Delete(pnFlDelKey)
+			}
+		}
+	} else {
+		log.Debugw("no pending delete flows found",
+			log.Fields{"intf": Intf, "onuID": onuID, "uniID": uniID})
+
+	}
+
+}
+
 //clearResources clears pon resources in kv store and the device
 func (f *OpenOltFlowMgr) clearResources(flow *ofp.OfpFlowStats, Intf uint32, onuID int32, uniID int32,
 	gemPortID int32, flowID uint32, flowDirection string,
@@ -1288,6 +1288,23 @@
 		// between DS and US.
 		f.updateFlowInfoToKVStore(int32(Intf), int32(onuID), int32(uniID), flowID, &updatedFlows)
 		if len(updatedFlows) == 0 {
+			// Do this for subscriber flows only (not trap from NNI flows)
+			if onuID != -1 && uniID != -1 {
+				pnFlDelKey := pendingFlowDeleteKey{Intf, uint32(onuID), uint32(uniID)}
+				if val, ok := f.pendingFlowDelete.Load(pnFlDelKey); !ok {
+					log.Debugw("creating entry for pending flow delete",
+						log.Fields{"intf": Intf, "onuID": onuID, "uniID": uniID})
+					f.pendingFlowDelete.Store(pnFlDelKey, 1)
+				} else {
+					pnFlDels := val.(int) + 1
+					log.Debugw("updating flow delete entry",
+						log.Fields{"intf": Intf, "onuID": onuID, "uniID": uniID, "currPendingFlowCnt": pnFlDels})
+					f.pendingFlowDelete.Store(pnFlDelKey, pnFlDels)
+				}
+
+				defer f.deletePendingFlows(Intf, onuID, uniID)
+			}
+
 			log.Debugw("Releasing flow Id to resource manager", log.Fields{"Intf": Intf, "onuId": onuID, "uniId": uniID, "flowId": flowID})
 			f.resourceMgr.FreeFlowID(Intf, int32(onuID), int32(uniID), flowID)
 
@@ -1369,6 +1386,7 @@
 		log.Error(err)
 		return
 	}
+
 	onuID = int32(onu)
 	uniID = int32(uni)
 
@@ -1451,6 +1469,24 @@
 	return
 }
 
+func (f *OpenOltFlowMgr) waitForFlowDeletesToCompleteForOnu(ctx context.Context, intfID uint32, onuID uint32,
+	uniID uint32, ch chan bool) {
+	pnFlDelKey := pendingFlowDeleteKey{intfID, onuID, uniID}
+	for {
+		select {
+		case <-time.After(20 * time.Millisecond):
+			if flowDelRefCnt, ok := f.pendingFlowDelete.Load(pnFlDelKey); !ok || flowDelRefCnt == 0 {
+				log.Debug("pending flow deletes completed")
+				ch <- true
+				return
+			}
+		case <-ctx.Done():
+			log.Error("flow delete wait handler routine canceled")
+			return
+		}
+	}
+}
+
 // AddFlow add flow to device
 func (f *OpenOltFlowMgr) AddFlow(flow *ofp.OfpFlowStats, flowMetadata *voltha.FlowMetadata) {
 	classifierInfo := make(map[string]interface{})
@@ -1514,7 +1550,26 @@
 		log.Debugw("Downstream-flow-meter-id", log.Fields{"DsMeterID": DsMeterID})
 
 	}
-	f.divideAndAddFlow(intfID, onuID, uniID, portNo, classifierInfo, actionInfo, flow, uint32(TpID), UsMeterID, DsMeterID, flowMetadata)
+
+	pnFlDelKey := pendingFlowDeleteKey{intfID, onuID, uniID}
+	if _, ok := f.pendingFlowDelete.Load(pnFlDelKey); !ok {
+		log.Debugw("no pending flows found, going ahead with flow install", log.Fields{"pon": intfID, "onuID": onuID, "uniID": uniID})
+		f.divideAndAddFlow(intfID, onuID, uniID, portNo, classifierInfo, actionInfo, flow, uint32(TpID), UsMeterID, DsMeterID, flowMetadata)
+	} else {
+		ctx := context.Background()
+		ctx, cancel := context.WithCancel(ctx)
+		defer cancel()
+		pendingFlowDelComplete := make(chan bool)
+		go f.waitForFlowDeletesToCompleteForOnu(ctx, intfID, onuID, uniID, pendingFlowDelComplete)
+		select {
+		case <-pendingFlowDelComplete:
+			log.Debugw("all pending flow deletes completed", log.Fields{"pon": intfID, "onuID": onuID, "uniID": uniID})
+			f.divideAndAddFlow(intfID, onuID, uniID, portNo, classifierInfo, actionInfo, flow, uint32(TpID), UsMeterID, DsMeterID, flowMetadata)
+
+		case <-time.After(10 * time.Second):
+			log.Errorw("pending flow deletes not completed after timeout", log.Fields{"pon": intfID, "onuID": onuID, "uniID": uniID})
+		}
+	}
 }
 
 //sendTPDownloadMsgToChild send payload
diff --git a/go.mod b/go.mod
index 9d21cb0..85e32cd 100644
--- a/go.mod
+++ b/go.mod
@@ -3,6 +3,7 @@
 go 1.12
 
 require (
+	github.com/EagleChen/mapmutex v0.0.0-20180418073615-e1a5ae258d8d
 	github.com/cenkalti/backoff/v3 v3.1.1
 	github.com/gogo/protobuf v1.3.1
 	github.com/golang/protobuf v1.3.2
diff --git a/go.sum b/go.sum
index 79e15c5..2e05514 100644
--- a/go.sum
+++ b/go.sum
@@ -4,6 +4,8 @@
 github.com/DataDog/zstd v1.3.6-0.20190409195224-796139022798/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo=
 github.com/DataDog/zstd v1.4.1 h1:3oxKN3wbHibqx897utPC2LTQU4J+IHWWJO+glkAkpFM=
 github.com/DataDog/zstd v1.4.1/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo=
+github.com/EagleChen/mapmutex v0.0.0-20180418073615-e1a5ae258d8d h1:j5hduAppx4gHqltfZ1cm7jHbXR0LuQulnF4VkBU8esw=
+github.com/EagleChen/mapmutex v0.0.0-20180418073615-e1a5ae258d8d/go.mod h1:H87WPRkM4YDLkW5tC6biLEzWaKtNse5xL1AR91FXC74=
 github.com/Shopify/sarama v1.23.1 h1:XxJBCZEoWJtoWjf/xRbmGUpAmTZGnuuF0ON0EvxxBrs=
 github.com/Shopify/sarama v1.23.1/go.mod h1:XLH1GYJnLVE0XCr6KdJGVJRTwY30moWNJ4sERjXX6fs=
 github.com/Shopify/toxiproxy v2.1.4+incompatible h1:TKdv8HiTLgE5wdJuEML90aBgNWsokNbMijUGhmcoBJc=
diff --git a/vendor/github.com/EagleChen/mapmutex/README.md b/vendor/github.com/EagleChen/mapmutex/README.md
new file mode 100644
index 0000000..9ee890d
--- /dev/null
+++ b/vendor/github.com/EagleChen/mapmutex/README.md
@@ -0,0 +1,73 @@
+# mapmutex
+
+mapmutex is a simple implementation to act as a group of mutex.
+
+## What's it for?
+Synchronization is needed in many cases. But in some cases, you don't want a gaint lock to block totally irrelevant actions. Instead, you need many fine-grained tiny locks to only block on same resource.
+
+Take an example. A website have many users. Each user has a different counter. While one user want to increment the counter at the same time in different devices(say, from a pad and a phone), these increments need to happen one by one. But user A's incremntation has nothing to do with user B's incrementation, they don't have to affect each other.
+This is where this package comes in. You can lock for each user (by using user id as key) without blocking other users.
+
+## Performance
+As shown by the result of benchmark(in `mutex_test.go`), it's several times faster than one giant mutex.
+```
+(11 times faster)
+BenchmarkMutex1000_100_20_20-4          	       1	20164937908 ns/op
+BenchmarkMapMutex1000_100_20_20-4       	       1	1821899222 ns/op 
+
+(7 times faster)
+BenchmarkMutex1000_20_20_20-4           	       1	19726327623 ns/op
+BenchmarkMapMutex1000_20_20_20-4        	       1	2759654813 ns/op
+
+(11 times faster)
+BenchmarkMutex1000_20_40_20-4           	       1	20380128848 ns/op
+BenchmarkMapMutex1000_20_40_20-4        	       1	1828899343 ns/op
+
+(only 2 keys in map, 2 times faster)
+(in case of only one key in map, it's the same as one gaint lock)
+BenchmarkMutex1000_2_40_20-4            	       1	20721092007 ns/op
+BenchmarkMapMutex1000_2_40_20-4         	       1	10818512020 ns/op (989 of 1000 success)
+
+(9 times faster)
+BenchmarkMutex1000_20_40_60-4           	       1	60341833247 ns/op
+BenchmarkMapMutex1000_20_40_60-4        	       1	6240238975 ns/op
+
+(11 times faster)
+BenchmarkMutex10000_20_40_20-4          	       1	205493472245 ns/op
+BenchmarkMapMutex10000_20_40_20-4       	       1	18677416055 ns/op
+```
+
+## How to get
+```
+go get github.com/EagleChen/mapmutex
+```
+
+## How to use
+```
+mutex := mapmutex.NewMapMutex()
+if mutex.TryLock(key) { // for example, key can be user id
+    // do the real job here
+
+    mutex.Unlock(key)
+}
+```
+
+TryLock itself will retry several times to aquire the lock. But in the application level, you can also try several times when the lock cannot be got.
+```
+got := false
+for i := 0; && i < retryTimes; i++ {
+    if got = mutex.TryLock(key); got {
+        break
+    }
+}
+if got {
+    // do the real job here
+
+    mutex.Unlock(key)
+}
+```
+
+## How to tune
+1. Use `NewCustomizedMapMutex` to customize how hard 'TryLock' will try to get the lock. The parameters controls how many times to try, how long to wait before another try when failing to aquire the lock, etc. They may be very different for various use cases.
+
+2. Change some source code for your use case. For general use, `map[interface{}]interface{}` is used for storing 'locks'. But it can be changed to `map[int]bool` if your `key` is `int` and `map[string]bool` if you `key` is `string`. As far as i know, this trick will improve the performance, a little bit.
\ No newline at end of file
diff --git a/vendor/github.com/EagleChen/mapmutex/mutex.go b/vendor/github.com/EagleChen/mapmutex/mutex.go
new file mode 100644
index 0000000..2555e55
--- /dev/null
+++ b/vendor/github.com/EagleChen/mapmutex/mutex.go
@@ -0,0 +1,90 @@
+package mapmutex
+
+import (
+	"math/rand"
+	"sync"
+	"time"
+)
+
+// Mutex is the mutex with synchronized map
+// it's for reducing unnecessary locks among different keys
+type Mutex struct {
+	locks     map[interface{}]interface{}
+	m         *sync.Mutex
+	maxRetry  int
+	maxDelay  float64 // in nanosend
+	baseDelay float64 // in nanosecond
+	factor    float64
+	jitter    float64
+}
+
+// TryLock tries to aquire the lock.
+func (m *Mutex) TryLock(key interface{}) (gotLock bool) {
+	for i := 0; i < m.maxRetry; i++ {
+		m.m.Lock()
+		if _, ok := m.locks[key]; ok { // if locked
+			m.m.Unlock()
+			time.Sleep(m.backoff(i))
+		} else { // if unlock, lockit
+			m.locks[key] = struct{}{}
+			m.m.Unlock()
+			return true
+		}
+	}
+
+	return false
+}
+
+// Unlock unlocks for the key
+// please call Unlock only after having aquired the lock
+func (m *Mutex) Unlock(key interface{}) {
+	m.m.Lock()
+	delete(m.locks, key)
+	m.m.Unlock()
+}
+
+// borrowed from grpc
+func (m *Mutex) backoff(retries int) time.Duration {
+	if retries == 0 {
+		return time.Duration(m.baseDelay) * time.Nanosecond
+	}
+	backoff, max := m.baseDelay, m.maxDelay
+	for backoff < max && retries > 0 {
+		backoff *= m.factor
+		retries--
+	}
+	if backoff > max {
+		backoff = max
+	}
+	backoff *= 1 + m.jitter*(rand.Float64()*2-1)
+	if backoff < 0 {
+		return 0
+	}
+	return time.Duration(backoff) * time.Nanosecond
+}
+
+// NewMapMutex returns a mapmutex with default configs
+func NewMapMutex() *Mutex {
+	return &Mutex{
+		locks:     make(map[interface{}]interface{}),
+		m:         &sync.Mutex{},
+		maxRetry:  200,
+		maxDelay:  100000000, // 0.1 second
+		baseDelay: 10,        // 10 nanosecond
+		factor:    1.1,
+		jitter:    0.2,
+	}
+}
+
+// NewCustomizedMapMutex returns a customized mapmutex
+func NewCustomizedMapMutex(mRetry int, mDelay, bDelay, factor, jitter float64) *Mutex {
+	return &Mutex{
+		locks:     make(map[interface{}]interface{}),
+		m:         &sync.Mutex{},
+		maxRetry:  mRetry,
+		maxDelay:  mDelay,
+		baseDelay: bDelay,
+		factor:    factor,
+		jitter:    jitter,
+	}
+}
diff --git a/vendor/modules.txt b/vendor/modules.txt
index ea3c340..8fe9dde 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -1,5 +1,7 @@
 # github.com/DataDog/zstd v1.4.1
 github.com/DataDog/zstd
+# github.com/EagleChen/mapmutex v0.0.0-20180418073615-e1a5ae258d8d
+github.com/EagleChen/mapmutex
 # github.com/Shopify/sarama v1.23.1
 github.com/Shopify/sarama
 # github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878