[VOL-1564] Refactor flow deletion

This update consists of the following:
1)  Refactor the flow management around flow deletion and
addition.
2) Update the simulated adapters to receive and do initial
processing of flow updates (bulk and incremental)
3) Add more tests to the flow utils test suite
4) Add a new flow management test for integration test in a
development environment (work in progress)

Change-Id: I9dbb2adf9e600af52ce267b727617be181c8f1ab
diff --git a/adapters/common/request_handler.go b/adapters/common/request_handler.go
index c0c2dee..7ce4414 100644
--- a/adapters/common/request_handler.go
+++ b/adapters/common/request_handler.go
@@ -268,6 +268,45 @@
 }
 
 func (rhp *RequestHandlerProxy) Update_flows_bulk(args []*ic.Argument) (*empty.Empty, error) {
+	log.Debug("Update_flows_bulk")
+	if len(args) < 4 {
+		log.Warn("Update_flows_bulk-invalid-number-of-args", log.Fields{"args": args})
+		err := errors.New("invalid-number-of-args")
+		return nil, err
+	}
+	device := &voltha.Device{}
+	transactionID := &ic.StrType{}
+	flows := &voltha.Flows{}
+	groups := &voltha.FlowGroups{}
+	for _, arg := range args {
+		switch arg.Key {
+		case "device":
+			if err := ptypes.UnmarshalAny(arg.Value, device); err != nil {
+				log.Warnw("cannot-unmarshal-device", log.Fields{"error": err})
+				return nil, err
+			}
+		case "flows":
+			if err := ptypes.UnmarshalAny(arg.Value, flows); err != nil {
+				log.Warnw("cannot-unmarshal-flows", log.Fields{"error": err})
+				return nil, err
+			}
+		case "groups":
+			if err := ptypes.UnmarshalAny(arg.Value, groups); err != nil {
+				log.Warnw("cannot-unmarshal-groups", log.Fields{"error": err})
+				return nil, err
+			}
+		case kafka.TransactionKey:
+			if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
+				log.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+				return nil, err
+			}
+		}
+	}
+	log.Debugw("Update_flows_bulk", log.Fields{"flows": flows, "groups": groups})
+	//Invoke the adopt device on the adapter
+	if err := rhp.adapter.Update_flows_bulk(device, flows, groups); err != nil {
+		return nil, status.Errorf(codes.NotFound, "%s", err.Error())
+	}
 	return new(empty.Empty), nil
 }
 
diff --git a/adapters/simulated_olt/adaptercore/device_handler.go b/adapters/simulated_olt/adaptercore/device_handler.go
index f64f99b..2c1abbf 100644
--- a/adapters/simulated_olt/adaptercore/device_handler.go
+++ b/adapters/simulated_olt/adaptercore/device_handler.go
@@ -266,3 +266,15 @@
 
 	log.Debugw("DeleteDevice-end", log.Fields{"deviceId": device.Id})
 }
+
+func (dh *DeviceHandler) UpdateFlowsBulk(device *voltha.Device, flows *voltha.Flows, groups *voltha.FlowGroups) {
+	log.Debugw("UpdateFlowsBulk", log.Fields{"deviceId": device.Id, "flows": flows, "groups": groups})
+	// For now we do nothing with it
+	return
+}
+
+func (dh *DeviceHandler) UpdateFlowsIncremental(device *voltha.Device, flowChanges *of.FlowChanges, groupChanges *of.FlowGroupChanges) {
+	log.Debugw("UpdateFlowsIncremental", log.Fields{"deviceId": device.Id, "flowChanges": flowChanges, "groupChanges": groupChanges})
+	// For now we do nothing with it
+	return
+}
diff --git a/adapters/simulated_olt/adaptercore/simulated_olt.go b/adapters/simulated_olt/adaptercore/simulated_olt.go
index dd40b27..4349a1b 100644
--- a/adapters/simulated_olt/adaptercore/simulated_olt.go
+++ b/adapters/simulated_olt/adaptercore/simulated_olt.go
@@ -216,11 +216,29 @@
 }
 
 func (so *SimulatedOLT) Update_flows_bulk(device *voltha.Device, flows *voltha.Flows, groups *voltha.FlowGroups) error {
-	return errors.New("UnImplemented")
+	if device == nil {
+		log.Warn("device-is-nil")
+		return errors.New("nil-device")
+	}
+	log.Debugw("bulk-flow-updates", log.Fields{"deviceId": device.Id, "flows": flows, "groups": groups})
+	var handler *DeviceHandler
+	if handler = so.getDeviceHandler(device.Id); handler != nil {
+		go handler.UpdateFlowsBulk(device, flows, groups)
+	}
+	return nil
 }
 
-func (so *SimulatedOLT) Update_flows_incrementally(device *voltha.Device, flows *openflow_13.FlowChanges, groups *openflow_13.FlowGroupChanges) error {
-	return errors.New("UnImplemented")
+func (so *SimulatedOLT) Update_flows_incrementally(device *voltha.Device, flowChanges *openflow_13.FlowChanges, groupChanges *openflow_13.FlowGroupChanges) error {
+	if device == nil {
+		log.Warn("device-is-nil")
+		return errors.New("nil-device")
+	}
+	log.Debugw("incremental-flow-update", log.Fields{"deviceId": device.Id, "flowChanges": flowChanges, "groupChanges": groupChanges})
+	var handler *DeviceHandler
+	if handler = so.getDeviceHandler(device.Id); handler != nil {
+		go handler.UpdateFlowsIncremental(device, flowChanges, groupChanges)
+	}
+	return nil
 }
 
 func (so *SimulatedOLT) Update_pm_config(device *voltha.Device, pm_configs *voltha.PmConfigs) error {
diff --git a/adapters/simulated_olt/main.go b/adapters/simulated_olt/main.go
index 4a18d10..ba9b9ed 100644
--- a/adapters/simulated_olt/main.go
+++ b/adapters/simulated_olt/main.go
@@ -233,7 +233,7 @@
 func (a *adapter) registerWithCore(retries int) error {
 	log.Info("registering-with-core")
 	adapterDescription := &voltha.Adapter{Id: "simulated_olt", Vendor: "simulation Enterprise Inc"}
-	types := []*voltha.DeviceType{{Id: "simulated_olt", Adapter: "simulated_olt"}}
+	types := []*voltha.DeviceType{{Id: "simulated_olt", Adapter: "simulated_olt", AcceptsAddRemoveFlowUpdates: true}}
 	deviceTypes := &voltha.DeviceTypes{Items: types}
 	count := 0
 	for {
diff --git a/adapters/simulated_onu/adaptercore/device_handler.go b/adapters/simulated_onu/adaptercore/device_handler.go
index 96cb3d8..61b1c0b 100644
--- a/adapters/simulated_onu/adaptercore/device_handler.go
+++ b/adapters/simulated_onu/adaptercore/device_handler.go
@@ -256,3 +256,15 @@
 
 	log.Debugw("DeleteDevice-end", log.Fields{"deviceId": device.Id})
 }
+
+func (dh *DeviceHandler) UpdateFlowsBulk(device *voltha.Device, flows *voltha.Flows, groups *voltha.FlowGroups) {
+	log.Debugw("UpdateFlowsBulk", log.Fields{"deviceId": device.Id, "flows": flows, "groups": groups})
+	// For now we do nothing with it
+	return
+}
+
+func (dh *DeviceHandler) UpdateFlowsIncremental(device *voltha.Device, flowChanges *of.FlowChanges, groupChanges *of.FlowGroupChanges) {
+	log.Debugw("UpdateFlowsIncremental", log.Fields{"deviceId": device.Id, "flowChanges": flowChanges, "groupChanges": groupChanges})
+	// For now we do nothing with it
+	return
+}
diff --git a/adapters/simulated_onu/adaptercore/simulated_onu.go b/adapters/simulated_onu/adaptercore/simulated_onu.go
index e9fa82e..d8d4127 100644
--- a/adapters/simulated_onu/adaptercore/simulated_onu.go
+++ b/adapters/simulated_onu/adaptercore/simulated_onu.go
@@ -208,11 +208,29 @@
 }
 
 func (so *SimulatedONU) Update_flows_bulk(device *voltha.Device, flows *voltha.Flows, groups *voltha.FlowGroups) error {
-	return errors.New("UnImplemented")
+	if device == nil {
+		log.Warn("device-is-nil")
+		return errors.New("nil-device")
+	}
+	log.Debugw("bulk-flow-updates", log.Fields{"deviceId": device.Id, "flows": flows, "groups": groups})
+	var handler *DeviceHandler
+	if handler = so.getDeviceHandler(device.Id); handler != nil {
+		go handler.UpdateFlowsBulk(device, flows, groups)
+	}
+	return nil
 }
 
-func (so *SimulatedONU) Update_flows_incrementally(device *voltha.Device, flows *openflow_13.FlowChanges, groups *openflow_13.FlowGroupChanges) error {
-	return errors.New("UnImplemented")
+func (so *SimulatedONU) Update_flows_incrementally(device *voltha.Device, flowChanges *openflow_13.FlowChanges, groupChanges *openflow_13.FlowGroupChanges) error {
+	if device == nil {
+		log.Warn("device-is-nil")
+		return errors.New("nil-device")
+	}
+	log.Debugw("incremental-flow-update", log.Fields{"deviceId": device.Id, "flowChanges": flowChanges, "groupChanges": groupChanges})
+	var handler *DeviceHandler
+	if handler = so.getDeviceHandler(device.Id); handler != nil {
+		go handler.UpdateFlowsIncremental(device, flowChanges, groupChanges)
+	}
+	return nil
 }
 
 func (so *SimulatedONU) Update_pm_config(device *voltha.Device, pm_configs *voltha.PmConfigs) error {