[VOL-1564] Refactor flow deletion
This update consists of the following:
1) Refactor the flow management around flow deletion and
addition.
2) Update the simulated adapters to receive and do initial
processing of flow updates (bulk and incremental)
3) Add more tests to the flow utils test suite
4) Add a new flow management integration test for use in a
development environment (work in progress)
Change-Id: I9dbb2adf9e600af52ce267b727617be181c8f1ab
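
For orientation before the diffs: the central change is that the device agent now decides, per device type,
between pushing a full bulk flow snapshot and sending an incremental add/remove message, based on the
AcceptsAddRemoveFlowUpdates capability. Below is a minimal sketch of that decision; chooseFlowUpdate is a
hypothetical helper name used only for illustration, while the message shapes (voltha.Flows, ofp.FlowChanges)
are the ones used in rw_core/core/device_agent.go in this change.

package sketch

import (
    ofp "github.com/opencord/voltha-protos/go/openflow_13"
    "github.com/opencord/voltha-protos/go/voltha"
)

// chooseFlowUpdate is an illustrative, hypothetical helper (not part of this
// change): bulk-only adapters receive the full resulting flow table, while
// adapters that accept add/remove updates receive only the delta.
func chooseFlowUpdate(acceptsAddRemove bool, resultingFlows, flowsToAdd, flowsToDelete []*ofp.OfpFlowStats) (*voltha.Flows, *ofp.FlowChanges) {
    if !acceptsAddRemove {
        return &voltha.Flows{Items: resultingFlows}, nil
    }
    return nil, &ofp.FlowChanges{
        ToAdd:    &voltha.Flows{Items: flowsToAdd},
        ToRemove: &voltha.Flows{Items: flowsToDelete},
    }
}
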
diff --git a/adapters/common/request_handler.go b/adapters/common/request_handler.go
index c0c2dee..7ce4414 100644
--- a/adapters/common/request_handler.go
+++ b/adapters/common/request_handler.go
@@ -268,6 +268,45 @@
}
func (rhp *RequestHandlerProxy) Update_flows_bulk(args []*ic.Argument) (*empty.Empty, error) {
+ log.Debug("Update_flows_bulk")
+ if len(args) < 4 {
+ log.Warn("Update_flows_bulk-invalid-number-of-args", log.Fields{"args": args})
+ err := errors.New("invalid-number-of-args")
+ return nil, err
+ }
+ device := &voltha.Device{}
+ transactionID := &ic.StrType{}
+ flows := &voltha.Flows{}
+ groups := &voltha.FlowGroups{}
+ for _, arg := range args {
+ switch arg.Key {
+ case "device":
+ if err := ptypes.UnmarshalAny(arg.Value, device); err != nil {
+ log.Warnw("cannot-unmarshal-device", log.Fields{"error": err})
+ return nil, err
+ }
+ case "flows":
+ if err := ptypes.UnmarshalAny(arg.Value, flows); err != nil {
+ log.Warnw("cannot-unmarshal-flows", log.Fields{"error": err})
+ return nil, err
+ }
+ case "groups":
+ if err := ptypes.UnmarshalAny(arg.Value, groups); err != nil {
+ log.Warnw("cannot-unmarshal-groups", log.Fields{"error": err})
+ return nil, err
+ }
+ case kafka.TransactionKey:
+ if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
+ log.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+ return nil, err
+ }
+ }
+ }
+ log.Debugw("Update_flows_bulk", log.Fields{"flows": flows, "groups": groups})
+ //Invoke the bulk flow update on the adapter
+ if err := rhp.adapter.Update_flows_bulk(device, flows, groups); err != nil {
+ return nil, status.Errorf(codes.NotFound, "%s", err.Error())
+ }
return new(empty.Empty), nil
}
diff --git a/adapters/simulated_olt/adaptercore/device_handler.go b/adapters/simulated_olt/adaptercore/device_handler.go
index f64f99b..2c1abbf 100644
--- a/adapters/simulated_olt/adaptercore/device_handler.go
+++ b/adapters/simulated_olt/adaptercore/device_handler.go
@@ -266,3 +266,15 @@
log.Debugw("DeleteDevice-end", log.Fields{"deviceId": device.Id})
}
+
+func (dh *DeviceHandler) UpdateFlowsBulk(device *voltha.Device, flows *voltha.Flows, groups *voltha.FlowGroups) {
+ log.Debugw("UpdateFlowsBulk", log.Fields{"deviceId": device.Id, "flows": flows, "groups": groups})
+ // For now we do nothing with it
+ return
+}
+
+func (dh *DeviceHandler) UpdateFlowsIncremental(device *voltha.Device, flowChanges *of.FlowChanges, groupChanges *of.FlowGroupChanges) {
+ log.Debugw("UpdateFlowsIncremental", log.Fields{"deviceId": device.Id, "flowChanges": flowChanges, "groupChanges": groupChanges})
+ // For now we do nothing with it
+ return
+}
diff --git a/adapters/simulated_olt/adaptercore/simulated_olt.go b/adapters/simulated_olt/adaptercore/simulated_olt.go
index dd40b27..4349a1b 100644
--- a/adapters/simulated_olt/adaptercore/simulated_olt.go
+++ b/adapters/simulated_olt/adaptercore/simulated_olt.go
@@ -216,11 +216,29 @@
}
func (so *SimulatedOLT) Update_flows_bulk(device *voltha.Device, flows *voltha.Flows, groups *voltha.FlowGroups) error {
- return errors.New("UnImplemented")
+ if device == nil {
+ log.Warn("device-is-nil")
+ return errors.New("nil-device")
+ }
+ log.Debugw("bulk-flow-updates", log.Fields{"deviceId": device.Id, "flows": flows, "groups": groups})
+ var handler *DeviceHandler
+ if handler = so.getDeviceHandler(device.Id); handler != nil {
+ go handler.UpdateFlowsBulk(device, flows, groups)
+ }
+ return nil
}
-func (so *SimulatedOLT) Update_flows_incrementally(device *voltha.Device, flows *openflow_13.FlowChanges, groups *openflow_13.FlowGroupChanges) error {
- return errors.New("UnImplemented")
+func (so *SimulatedOLT) Update_flows_incrementally(device *voltha.Device, flowChanges *openflow_13.FlowChanges, groupChanges *openflow_13.FlowGroupChanges) error {
+ if device == nil {
+ log.Warn("device-is-nil")
+ return errors.New("nil-device")
+ }
+ log.Debugw("incremental-flow-update", log.Fields{"deviceId": device.Id, "flowChanges": flowChanges, "groupChanges": groupChanges})
+ var handler *DeviceHandler
+ if handler = so.getDeviceHandler(device.Id); handler != nil {
+ go handler.UpdateFlowsIncremental(device, flowChanges, groupChanges)
+ }
+ return nil
}
func (so *SimulatedOLT) Update_pm_config(device *voltha.Device, pm_configs *voltha.PmConfigs) error {
diff --git a/adapters/simulated_olt/main.go b/adapters/simulated_olt/main.go
index 4a18d10..ba9b9ed 100644
--- a/adapters/simulated_olt/main.go
+++ b/adapters/simulated_olt/main.go
@@ -233,7 +233,7 @@
func (a *adapter) registerWithCore(retries int) error {
log.Info("registering-with-core")
adapterDescription := &voltha.Adapter{Id: "simulated_olt", Vendor: "simulation Enterprise Inc"}
- types := []*voltha.DeviceType{{Id: "simulated_olt", Adapter: "simulated_olt"}}
+ types := []*voltha.DeviceType{{Id: "simulated_olt", Adapter: "simulated_olt", AcceptsAddRemoveFlowUpdates: true}}
deviceTypes := &voltha.DeviceTypes{Items: types}
count := 0
for {
diff --git a/adapters/simulated_onu/adaptercore/device_handler.go b/adapters/simulated_onu/adaptercore/device_handler.go
index 96cb3d8..61b1c0b 100644
--- a/adapters/simulated_onu/adaptercore/device_handler.go
+++ b/adapters/simulated_onu/adaptercore/device_handler.go
@@ -256,3 +256,15 @@
log.Debugw("DeleteDevice-end", log.Fields{"deviceId": device.Id})
}
+
+func (dh *DeviceHandler) UpdateFlowsBulk(device *voltha.Device, flows *voltha.Flows, groups *voltha.FlowGroups) {
+ log.Debugw("UpdateFlowsBulk", log.Fields{"deviceId": device.Id, "flows": flows, "groups": groups})
+ // For now we do nothing with it
+ return
+}
+
+func (dh *DeviceHandler) UpdateFlowsIncremental(device *voltha.Device, flowChanges *of.FlowChanges, groupChanges *of.FlowGroupChanges) {
+ log.Debugw("UpdateFlowsIncremental", log.Fields{"deviceId": device.Id, "flowChanges": flowChanges, "groupChanges": groupChanges})
+ // For now we do nothing with it
+ return
+}
diff --git a/adapters/simulated_onu/adaptercore/simulated_onu.go b/adapters/simulated_onu/adaptercore/simulated_onu.go
index e9fa82e..d8d4127 100644
--- a/adapters/simulated_onu/adaptercore/simulated_onu.go
+++ b/adapters/simulated_onu/adaptercore/simulated_onu.go
@@ -208,11 +208,29 @@
}
func (so *SimulatedONU) Update_flows_bulk(device *voltha.Device, flows *voltha.Flows, groups *voltha.FlowGroups) error {
- return errors.New("UnImplemented")
+ if device == nil {
+ log.Warn("device-is-nil")
+ return errors.New("nil-device")
+ }
+ log.Debugw("bulk-flow-updates", log.Fields{"deviceId": device.Id, "flows": flows, "groups": groups})
+ var handler *DeviceHandler
+ if handler = so.getDeviceHandler(device.Id); handler != nil {
+ go handler.UpdateFlowsBulk(device, flows, groups)
+ }
+ return nil
}
-func (so *SimulatedONU) Update_flows_incrementally(device *voltha.Device, flows *openflow_13.FlowChanges, groups *openflow_13.FlowGroupChanges) error {
- return errors.New("UnImplemented")
+func (so *SimulatedONU) Update_flows_incrementally(device *voltha.Device, flowChanges *openflow_13.FlowChanges, groupChanges *openflow_13.FlowGroupChanges) error {
+ if device == nil {
+ log.Warn("device-is-nil")
+ return errors.New("nil-device")
+ }
+ log.Debugw("incremental-flow-update", log.Fields{"deviceId": device.Id, "flowChanges": flowChanges, "groupChanges": groupChanges})
+ var handler *DeviceHandler
+ if handler = so.getDeviceHandler(device.Id); handler != nil {
+ go handler.UpdateFlowsIncremental(device, flowChanges, groupChanges)
+ }
+ return nil
}
func (so *SimulatedONU) Update_pm_config(device *voltha.Device, pm_configs *voltha.PmConfigs) error {
diff --git a/compose/adapters-simulated.yml b/compose/adapters-simulated.yml
index 74ff99c..4d2a3b7 100644
--- a/compose/adapters-simulated.yml
+++ b/compose/adapters-simulated.yml
@@ -32,7 +32,7 @@
"--simulator_topic=simulated_olt",
"--kv_store_host=${DOCKER_HOST_IP}",
"--kv_store_port=2379",
- "--onu_number=1"
+ "--onu_number=4"
]
networks:
- default
diff --git a/compose/rw_core.yml b/compose/rw_core.yml
index 4600b1a..49d24f9 100644
--- a/compose/rw_core.yml
+++ b/compose/rw_core.yml
@@ -31,10 +31,10 @@
- -rw_core_topic=rwcore
- -kv_store_data_prefix=service/voltha
- -in_competing_mode=false
- - -timeout_long_request=3000
- - -timeout_request=500
- - -core_timeout=500
- - -log_level=2
+ - -timeout_long_request=6000
+ - -timeout_request=3000
+ - -core_timeout=3000
+ - -log_level=0
ports:
- 50057:50057
volumes:
diff --git a/rw_core/core/adapter_proxy.go b/rw_core/core/adapter_proxy.go
index 541b502..23b2073 100755
--- a/rw_core/core/adapter_proxy.go
+++ b/rw_core/core/adapter_proxy.go
@@ -415,7 +415,7 @@
}
func (ap *AdapterProxy) UpdateFlowsBulk(device *voltha.Device, flows *voltha.Flows, groups *voltha.FlowGroups) error {
- log.Debugw("UpdateFlowsBulk", log.Fields{"deviceId": device.Id})
+ log.Debugw("UpdateFlowsBulk", log.Fields{"deviceId": device.Id, "flowsInUpdate": len(flows.Items), "groupsToUpdate": len(groups.Items)})
toTopic := ap.getAdapterTopic(device.Adapter)
rpc := "update_flows_bulk"
args := make([]*kafka.KVArg, 3)
@@ -440,7 +440,15 @@
}
func (ap *AdapterProxy) UpdateFlowsIncremental(device *voltha.Device, flowChanges *openflow_13.FlowChanges, groupChanges *openflow_13.FlowGroupChanges) error {
- log.Debugw("UpdateFlowsIncremental", log.Fields{"deviceId": device.Id})
+ log.Debugw("UpdateFlowsIncremental",
+ log.Fields{
+ "deviceId": device.Id,
+ "flowsToAdd": len(flowChanges.ToAdd.Items),
+ "flowsToDelete": len(flowChanges.ToRemove.Items),
+ "groupsToAdd": len(groupChanges.ToAdd.Items),
+ "groupsToDelete": len(groupChanges.ToRemove.Items),
+ "groupsToUpdate": len(groupChanges.ToUpdate.Items),
+ })
toTopic := ap.getAdapterTopic(device.Adapter)
rpc := "update_flows_incrementally"
args := make([]*kafka.KVArg, 3)
diff --git a/rw_core/core/device_agent.go b/rw_core/core/device_agent.go
index 4a88779..5718614 100755
--- a/rw_core/core/device_agent.go
+++ b/rw_core/core/device_agent.go
@@ -237,7 +237,11 @@
ch <- nil
}
+//addFlowsAndGroups adds the "newFlows" and "newGroups" to the existing flows/groups and sends the update to the
+//adapters
func (agent *DeviceAgent) addFlowsAndGroups(newFlows []*ofp.OfpFlowStats, newGroups []*ofp.OfpGroupEntry) error {
+ log.Debugw("addFlowsAndGroups", log.Fields{"deviceId": agent.deviceId, "flows": newFlows, "groups": newGroups})
+
if (len(newFlows) | len(newGroups)) == 0 {
log.Debugw("nothing-to-update", log.Fields{"deviceId": agent.deviceId, "flows": newFlows, "groups": newGroups})
return nil
@@ -245,93 +249,287 @@
agent.lockDevice.Lock()
defer agent.lockDevice.Unlock()
- log.Debugw("addFlowsAndGroups", log.Fields{"deviceId": agent.deviceId, "flows": newFlows, "groups": newGroups})
- var existingFlows *voltha.Flows
- if device, err := agent.getDeviceWithoutLock(); err != nil {
- return status.Errorf(codes.NotFound, "%s", agent.deviceId)
- } else {
- existingFlows = proto.Clone(device.Flows).(*voltha.Flows)
- existingGroups := proto.Clone(device.FlowGroups).(*ofp.FlowGroups)
- log.Debugw("addFlows", log.Fields{"deviceId": agent.deviceId, "flows": newFlows, "existingFlows": existingFlows, "groups": newGroups, "existingGroups": existingGroups})
- var updatedFlows []*ofp.OfpFlowStats
+ var device *voltha.Device
+ var err error
+ if device, err = agent.getDeviceWithoutLock(); err != nil {
+ return status.Errorf(codes.NotFound, "%s", agent.deviceId)
+ }
+
+ existingFlows := proto.Clone(device.Flows).(*voltha.Flows)
+ existingGroups := proto.Clone(device.FlowGroups).(*ofp.FlowGroups)
+
+ var updatedFlows []*ofp.OfpFlowStats
+ var flowsToDelete []*ofp.OfpFlowStats
+ var groupsToDelete []*ofp.OfpGroupEntry
+ var updatedGroups []*ofp.OfpGroupEntry
+
+ // Process flows
+ for _, flow := range newFlows {
+ updatedFlows = append(updatedFlows, flow)
+ }
+ for _, flow := range existingFlows.Items {
+ if idx := fu.FindFlows(newFlows, flow); idx == -1 {
+ updatedFlows = append(updatedFlows, flow)
+ } else {
+ flowsToDelete = append(flowsToDelete, flow)
+ }
+ }
+
+ // Process groups
+ for _, g := range newGroups {
+ updatedGroups = append(updatedGroups, g)
+ }
+ for _, group := range existingGroups.Items {
+ if fu.FindGroup(newGroups, group.Desc.GroupId) == -1 { // does not exist now
+ updatedGroups = append(updatedGroups, group)
+ } else {
+ groupsToDelete = append(groupsToDelete, group)
+ }
+ }
+
+ // Sanity check
+ if (len(updatedFlows) | len(flowsToDelete) | len(updatedGroups) | len(groupsToDelete)) == 0 {
+ log.Debugw("nothing-to-update", log.Fields{"deviceId": agent.deviceId, "flows": newFlows, "groups": newGroups})
+ return nil
+ }
+
+ // Send update to adapters
+ // Create two channels to receive responses from the dB and from the adapters.
+ // Do not close these channels as this function may exit on timeout before the dB or adapters get a chance
+ // to send their responses. These channels will be garbage collected once all the responses are
+ // received
+ chAdapters := make(chan interface{})
+ chdB := make(chan interface{})
+ dType := agent.adapterMgr.getDeviceType(device.Type)
+ if !dType.AcceptsAddRemoveFlowUpdates {
+
+ if len(updatedGroups) != 0 && reflect.DeepEqual(existingGroups.Items, updatedGroups) && len(updatedFlows) != 0 && reflect.DeepEqual(existingFlows.Items, updatedFlows) {
+ log.Debugw("nothing-to-update", log.Fields{"deviceId": agent.deviceId, "flows": newFlows, "groups": newGroups})
+ return nil
+ }
+ go agent.sendBulkFlowsToAdapters(device, &voltha.Flows{Items: updatedFlows}, &voltha.FlowGroups{Items: updatedGroups}, chAdapters)
+
+ } else {
+ flowChanges := &ofp.FlowChanges{
+ ToAdd: &voltha.Flows{Items: newFlows},
+ ToRemove: &voltha.Flows{Items: flowsToDelete},
+ }
+ groupChanges := &ofp.FlowGroupChanges{
+ ToAdd: &voltha.FlowGroups{Items: newGroups},
+ ToRemove: &voltha.FlowGroups{Items: groupsToDelete},
+ ToUpdate: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
+ }
+ go agent.sendIncrementalFlowsToAdapters(device, flowChanges, groupChanges, chAdapters)
+ }
+
+ // store the changed data
+ device.Flows = &voltha.Flows{Items: updatedFlows}
+ device.FlowGroups = &voltha.FlowGroups{Items: updatedGroups}
+ go agent.updateDeviceWithoutLockAsync(device, chdB)
+
+ if res := fu.WaitForNilOrErrorResponses(agent.defaultTimeout, chAdapters, chdB); res != nil {
+ return status.Errorf(codes.Aborted, "errors-%s", res)
+ }
+
+ return nil
+}
+
+//deleteFlowsAndGroups removes the "flowsToDel" and "groupsToDel" from the existing flows/groups and sends the update to the
+//adapters
+func (agent *DeviceAgent) deleteFlowsAndGroups(flowsToDel []*ofp.OfpFlowStats, groupsToDel []*ofp.OfpGroupEntry) error {
+ log.Debugw("deleteFlowsAndGroups", log.Fields{"deviceId": agent.deviceId, "flows": flowsToDel, "groups": groupsToDel})
+
+ if (len(flowsToDel) | len(groupsToDel)) == 0 {
+ log.Debugw("nothing-to-update", log.Fields{"deviceId": agent.deviceId, "flows": flowsToDel, "groups": groupsToDel})
+ return nil
+ }
+
+ agent.lockDevice.Lock()
+ defer agent.lockDevice.Unlock()
+
+ var device *voltha.Device
+ var err error
+
+ if device, err = agent.getDeviceWithoutLock(); err != nil {
+ return status.Errorf(codes.NotFound, "%s", agent.deviceId)
+ }
+
+ existingFlows := proto.Clone(device.Flows).(*voltha.Flows)
+ existingGroups := proto.Clone(device.FlowGroups).(*ofp.FlowGroups)
+
+ var flowsToKeep []*ofp.OfpFlowStats
+ var groupsToKeep []*ofp.OfpGroupEntry
+
+ // Process flows
+ for _, flow := range existingFlows.Items {
+ if idx := fu.FindFlows(flowsToDel, flow); idx == -1 {
+ flowsToKeep = append(flowsToKeep, flow)
+ }
+ }
+
+ // Process groups
+ for _, group := range existingGroups.Items {
+ if fu.FindGroup(groupsToDel, group.Desc.GroupId) == -1 { // does not exist now
+ groupsToKeep = append(groupsToKeep, group)
+ }
+ }
+
+ log.Debugw("deleteFlowsAndGroups",
+ log.Fields{
+ "deviceId": agent.deviceId,
+ "flowsToDel": len(flowsToDel),
+ "flowsToKeep": len(flowsToKeep),
+ "groupsToDel": len(groupsToDel),
+ "groupsToKeep": len(groupsToKeep),
+ })
+
+ // Sanity check
+ if (len(flowsToKeep) | len(flowsToDel) | len(groupsToKeep) | len(groupsToDel)) == 0 {
+ log.Debugw("nothing-to-update", log.Fields{"deviceId": agent.deviceId, "flowsToDel": flowsToDel, "groupsToDel": groupsToDel})
+ return nil
+ }
+
+ // Send update to adapters
+ chAdapters := make(chan interface{})
+ chdB := make(chan interface{})
+ dType := agent.adapterMgr.getDeviceType(device.Type)
+ if !dType.AcceptsAddRemoveFlowUpdates {
+ if len(groupsToKeep) != 0 && reflect.DeepEqual(existingGroups.Items, groupsToKeep) && len(flowsToKeep) != 0 && reflect.DeepEqual(existingFlows.Items, flowsToKeep) {
+ log.Debugw("nothing-to-update", log.Fields{"deviceId": agent.deviceId, "flowsToDel": flowsToDel, "groupsToDel": groupsToDel})
+ return nil
+ }
+ go agent.sendBulkFlowsToAdapters(device, &voltha.Flows{Items: flowsToKeep}, &voltha.FlowGroups{Items: groupsToKeep}, chAdapters)
+ } else {
+ flowChanges := &ofp.FlowChanges{
+ ToAdd: &voltha.Flows{Items: []*ofp.OfpFlowStats{}},
+ ToRemove: &voltha.Flows{Items: flowsToDel},
+ }
+ groupChanges := &ofp.FlowGroupChanges{
+ ToAdd: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
+ ToRemove: &voltha.FlowGroups{Items: groupsToDel},
+ ToUpdate: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
+ }
+ go agent.sendIncrementalFlowsToAdapters(device, flowChanges, groupChanges, chAdapters)
+ }
+
+ // store the changed data
+ device.Flows = &voltha.Flows{Items: flowsToKeep}
+ device.FlowGroups = &voltha.FlowGroups{Items: groupsToKeep}
+ go agent.updateDeviceWithoutLockAsync(device, chdB)
+
+ if res := fu.WaitForNilOrErrorResponses(agent.defaultTimeout, chAdapters, chdB); res != nil {
+ return status.Errorf(codes.Aborted, "errors-%s", res)
+ }
+ return nil
+
+}
+
+//updateFlowsAndGroups replaces the existing flows and groups with "updatedFlows" and "updatedGroups" respectively. It
+//also sends the updates to the adapters
+func (agent *DeviceAgent) updateFlowsAndGroups(updatedFlows []*ofp.OfpFlowStats, updatedGroups []*ofp.OfpGroupEntry) error {
+ log.Debugw("updateFlowsAndGroups", log.Fields{"deviceId": agent.deviceId, "flows": updatedFlows, "groups": updatedGroups})
+
+ if (len(updatedFlows) | len(updatedGroups)) == 0 {
+ log.Debugw("nothing-to-update", log.Fields{"deviceId": agent.deviceId, "flows": updatedFlows, "groups": updatedGroups})
+ return nil
+ }
+
+ agent.lockDevice.Lock()
+ defer agent.lockDevice.Unlock()
+ var device *voltha.Device
+ var err error
+ if device, err = agent.getDeviceWithoutLock(); err != nil {
+ return status.Errorf(codes.NotFound, "%s", agent.deviceId)
+ }
+ existingFlows := proto.Clone(device.Flows).(*voltha.Flows)
+ existingGroups := proto.Clone(device.FlowGroups).(*ofp.FlowGroups)
+
+ if len(updatedGroups) != 0 && reflect.DeepEqual(existingGroups.Items, updatedGroups) && len(updatedFlows) != 0 && reflect.DeepEqual(existingFlows.Items, updatedFlows) {
+ log.Debugw("nothing-to-update", log.Fields{"deviceId": agent.deviceId, "flows": updatedFlows, "groups": updatedGroups})
+ return nil
+ }
+
+ log.Debugw("updating-flows-and-groups",
+ log.Fields{
+ "deviceId": agent.deviceId,
+ "updatedFlows": updatedFlows,
+ "updatedGroups": updatedGroups,
+ })
+
+ chAdapters := make(chan interface{})
+ chdB := make(chan interface{})
+ dType := agent.adapterMgr.getDeviceType(device.Type)
+
+ // Process bulk flow update differently than incremental update
+ if !dType.AcceptsAddRemoveFlowUpdates {
+ go agent.sendBulkFlowsToAdapters(device, &voltha.Flows{Items: updatedFlows}, &voltha.FlowGroups{Items: updatedGroups}, chAdapters)
+ } else {
+ var flowsToAdd []*ofp.OfpFlowStats
var flowsToDelete []*ofp.OfpFlowStats
+ var groupsToAdd []*ofp.OfpGroupEntry
var groupsToDelete []*ofp.OfpGroupEntry
- var updatedGroups []*ofp.OfpGroupEntry
// Process flows
- for _, flow := range newFlows {
- updatedFlows = append(updatedFlows, flow)
+ for _, flow := range updatedFlows {
+ if idx := fu.FindFlows(existingFlows.Items, flow); idx == -1 {
+ flowsToAdd = append(flowsToAdd, flow)
+ }
}
-
for _, flow := range existingFlows.Items {
- if idx := fu.FindFlows(newFlows, flow); idx == -1 {
- updatedFlows = append(updatedFlows, flow)
- } else {
+ if idx := fu.FindFlows(updatedFlows, flow); idx != -1 {
flowsToDelete = append(flowsToDelete, flow)
}
}
// Process groups
- for _, g := range newGroups {
- updatedGroups = append(updatedGroups, g)
+ for _, g := range updatedGroups {
+ if fu.FindGroup(existingGroups.Items, g.Desc.GroupId) == -1 { // does not exist now
+ groupsToAdd = append(groupsToAdd, g)
+ }
}
-
for _, group := range existingGroups.Items {
- if fu.FindGroup(newGroups, group.Desc.GroupId) == -1 { // does not exist now
- updatedGroups = append(updatedGroups, group)
- } else {
+ if fu.FindGroup(updatedGroups, group.Desc.GroupId) != -1 { // also in the updated set - remove the old entry, the new one goes in ToUpdate
groupsToDelete = append(groupsToDelete, group)
}
}
+ log.Debugw("updating-flows-and-groups",
+ log.Fields{
+ "deviceId": agent.deviceId,
+ "flowsToAdd": flowsToAdd,
+ "flowsToDelete": flowsToDelete,
+ "groupsToAdd": groupsToAdd,
+ "groupsToDelete": groupsToDelete,
+ })
+
// Sanity check
- if (len(updatedFlows) | len(flowsToDelete) | len(updatedGroups) | len(groupsToDelete)) == 0 {
- log.Debugw("nothing-to-update", log.Fields{"deviceId": agent.deviceId, "flows": newFlows, "groups": newGroups})
+ if (len(flowsToAdd) | len(flowsToDelete) | len(groupsToAdd) | len(groupsToDelete) | len(updatedGroups)) == 0 {
+ log.Debugw("nothing-to-update", log.Fields{"deviceId": agent.deviceId, "flows": updatedFlows, "groups": updatedGroups})
return nil
-
- }
- // Send update to adapters
-
- // Create two channels to receive responses from the dB and from the adapters.
- // Do not close these channels as this function may exit on timeout before the dB or adapters get a chance
- // to send their responses. These channels will be garbage collected once all the responses are
- // received
- chAdapters := make(chan interface{})
- chdB := make(chan interface{})
- dType := agent.adapterMgr.getDeviceType(device.Type)
- if !dType.AcceptsAddRemoveFlowUpdates {
-
- if len(updatedGroups) != 0 && reflect.DeepEqual(existingGroups.Items, updatedGroups) && len(updatedFlows) != 0 && reflect.DeepEqual(existingFlows.Items, updatedFlows) {
- log.Debugw("nothing-to-update", log.Fields{"deviceId": agent.deviceId, "flows": newFlows, "groups": newGroups})
- return nil
- }
- go agent.sendBulkFlowsToAdapters(device, &voltha.Flows{Items: updatedFlows}, &voltha.FlowGroups{Items: updatedGroups}, chAdapters)
-
- } else {
- flowChanges := &ofp.FlowChanges{
- ToAdd: &voltha.Flows{Items: newFlows},
- ToRemove: &voltha.Flows{Items: flowsToDelete},
- }
- groupChanges := &ofp.FlowGroupChanges{
- ToAdd: &voltha.FlowGroups{Items: newGroups},
- ToRemove: &voltha.FlowGroups{Items: groupsToDelete},
- ToUpdate: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
- }
- go agent.sendIncrementalFlowsToAdapters(device, flowChanges, groupChanges, chAdapters)
}
- // store the changed data
- device.Flows = &voltha.Flows{Items: updatedFlows}
- device.FlowGroups = &voltha.FlowGroups{Items: updatedGroups}
- go agent.updateDeviceWithoutLockAsync(device, chdB)
-
- if res := fu.WaitForNilOrErrorResponses(agent.defaultTimeout, chAdapters, chdB); res != nil {
- return status.Errorf(codes.Aborted, "errors-%s", res)
+ flowChanges := &ofp.FlowChanges{
+ ToAdd: &voltha.Flows{Items: flowsToAdd},
+ ToRemove: &voltha.Flows{Items: flowsToDelete},
}
-
- return nil
+ groupChanges := &ofp.FlowGroupChanges{
+ ToAdd: &voltha.FlowGroups{Items: groupsToAdd},
+ ToRemove: &voltha.FlowGroups{Items: groupsToDelete},
+ ToUpdate: &voltha.FlowGroups{Items: updatedGroups},
+ }
+ go agent.sendIncrementalFlowsToAdapters(device, flowChanges, groupChanges, chAdapters)
}
+
+ // store the updated data
+ device.Flows = &voltha.Flows{Items: updatedFlows}
+ device.FlowGroups = &voltha.FlowGroups{Items: updatedGroups}
+ go agent.updateDeviceWithoutLockAsync(device, chdB)
+
+ if res := fu.WaitForNilOrErrorResponses(agent.defaultTimeout, chAdapters, chdB); res != nil {
+ return status.Errorf(codes.Aborted, "errors-%s", res)
+ }
+ return nil
}
//disableDevice disable a device
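
The three flow-update paths above fan work out to the adapters and the dB over channels and then block on
fu.WaitForNilOrErrorResponses. The sketch below is only a rough model of that wait pattern, under the assumption
that the helper treats a nil value on every channel as success and an error value or a timeout as failure; it is
not the real helper in rw_core/utils.

package sketch

import (
    "errors"
    "time"
)

// waitForNilOrErrors is an illustrative stand-in for fu.WaitForNilOrErrorResponses
// (assumed behaviour only): read one value from each channel, record error values
// as failures, and stop waiting once the overall timeout has elapsed.
func waitForNilOrErrors(timeoutMs int64, chnls ...chan interface{}) []error {
    var errs []error
    deadline := time.Now().Add(time.Duration(timeoutMs) * time.Millisecond)
    for _, ch := range chnls {
        select {
        case res := <-ch:
            if err, ok := res.(error); ok {
                errs = append(errs, err)
            }
        case <-time.After(time.Until(deadline)):
            errs = append(errs, errors.New("timeout"))
        }
    }
    if len(errs) == 0 {
        return nil // every responder answered with nil
    }
    return errs
}
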
diff --git a/rw_core/core/device_manager.go b/rw_core/core/device_manager.go
index faf8da8..f52345e 100755
--- a/rw_core/core/device_manager.go
+++ b/rw_core/core/device_manager.go
@@ -592,8 +592,22 @@
log.Debugw("addFlowsAndGroups", log.Fields{"deviceid": deviceId})
if agent := dMgr.getDeviceAgent(deviceId); agent != nil {
return agent.addFlowsAndGroups(flows, groups)
- //go agent.addFlowsAndGroups(flows, groups)
- //return nil
+ }
+ return status.Errorf(codes.NotFound, "%s", deviceId)
+}
+
+func (dMgr *DeviceManager) deleteFlowsAndGroups(deviceId string, flows []*ofp.OfpFlowStats, groups []*ofp.OfpGroupEntry) error {
+ log.Debugw("deleteFlowsAndGroups", log.Fields{"deviceid": deviceId})
+ if agent := dMgr.getDeviceAgent(deviceId); agent != nil {
+ return agent.deleteFlowsAndGroups(flows, groups)
+ }
+ return status.Errorf(codes.NotFound, "%s", deviceId)
+}
+
+func (dMgr *DeviceManager) updateFlowsAndGroups(deviceId string, flows []*ofp.OfpFlowStats, groups []*ofp.OfpGroupEntry) error {
+ log.Debugw("updateFlowsAndGroups", log.Fields{"deviceid": deviceId})
+ if agent := dMgr.getDeviceAgent(deviceId); agent != nil {
+ return agent.updateFlowsAndGroups(flows, groups)
}
return status.Errorf(codes.NotFound, "%s", deviceId)
}
diff --git a/rw_core/core/device_state_transitions.go b/rw_core/core/device_state_transitions.go
index 906c4e7..88cc5fc 100644
--- a/rw_core/core/device_state_transitions.go
+++ b/rw_core/core/device_state_transitions.go
@@ -111,8 +111,8 @@
Transition{
deviceType: child,
previousState: DeviceState{Admin: voltha.AdminState_PREPROVISIONED, Connection: voltha.ConnectStatus_REACHABLE, Operational: voltha.OperStatus_DISCOVERED},
- currentState: DeviceState{Admin: voltha.AdminState_PREPROVISIONED, Connection: voltha.ConnectStatus_REACHABLE, Operational: voltha.OperStatus_ACTIVATING},
- handlers: []TransitionHandler{}})
+ currentState: DeviceState{Admin: voltha.AdminState_PREPROVISIONED, Connection: voltha.ConnectStatus_REACHABLE, Operational: voltha.OperStatus_ACTIVE},
+ handlers: []TransitionHandler{dMgr.SetAdminStateToEnable}})
transitionMap.transitions = append(transitionMap.transitions,
Transition{
deviceType: child,
diff --git a/rw_core/core/device_state_transitions_test.go b/rw_core/core/device_state_transitions_test.go
index a9ec326..c89c684 100644
--- a/rw_core/core/device_state_transitions_test.go
+++ b/rw_core/core/device_state_transitions_test.go
@@ -126,6 +126,12 @@
assert.Equal(t, 1, len(handlers))
assert.True(t, reflect.ValueOf(tdm.SetAdminStateToEnable).Pointer() == reflect.ValueOf(handlers[0]).Pointer())
+ from = getDevice(false, voltha.AdminState_PREPROVISIONED, voltha.ConnectStatus_REACHABLE, voltha.OperStatus_DISCOVERED)
+ to = getDevice(false, voltha.AdminState_PREPROVISIONED, voltha.ConnectStatus_REACHABLE, voltha.OperStatus_ACTIVE)
+ handlers = transitionMap.GetTransitionHandler(from, to)
+ assert.Equal(t, 1, len(handlers))
+ assert.True(t, reflect.ValueOf(tdm.SetAdminStateToEnable).Pointer() == reflect.ValueOf(handlers[0]).Pointer())
+
from = getDevice(false, voltha.AdminState_PREPROVISIONED, voltha.ConnectStatus_UNKNOWN, voltha.OperStatus_ACTIVE)
to = getDevice(false, voltha.AdminState_ENABLED, voltha.ConnectStatus_UNKNOWN, voltha.OperStatus_ACTIVE)
handlers = transitionMap.GetTransitionHandler(from, to)
diff --git a/rw_core/core/logical_device_agent.go b/rw_core/core/logical_device_agent.go
index 489c79f..cae40e1 100644
--- a/rw_core/core/logical_device_agent.go
+++ b/rw_core/core/logical_device_agent.go
@@ -533,7 +533,6 @@
}
updatedFlows := make([]*ofp.OfpFlowStats, 0)
- //oldData := proto.Clone(lDevice.Flows).(*voltha.Flows)
changed := false
checkOverlap := (mod.Flags & uint32(ofp.OfpFlowModFlags_OFPFF_CHECK_OVERLAP)) != 0
if checkOverlap {
@@ -568,18 +567,16 @@
}
}
if changed {
- // Launch a routine to decompose the flows
- if err := agent.decomposeAndSendFlows(&ofp.Flows{Items: updatedFlows}, lDevice.FlowGroups); err != nil {
- log.Errorw("decomposing-and-sending-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
+ deviceRules := agent.flowDecomposer.DecomposeRules(agent, ofp.Flows{Items: updatedFlows}, *lDevice.FlowGroups)
+ log.Debugw("rules", log.Fields{"rules": deviceRules.String()})
+
+ if err := agent.addDeviceFlowsAndGroups(deviceRules); err != nil {
+ log.Errorw("failure-updating-device-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceId, "error": err})
return err
}
// Update model
- flowsToUpdate := &ofp.Flows{}
- if lDevice.Flows != nil {
- flowsToUpdate = &ofp.Flows{Items: flows}
- }
- if err := agent.updateLogicalDeviceFlowsWithoutLock(flowsToUpdate); err != nil {
+ if err := agent.updateLogicalDeviceFlowsWithoutLock(&ofp.Flows{Items: flows}); err != nil {
log.Errorw("db-flow-update-failed", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
return err
}
@@ -587,31 +584,6 @@
return nil
}
-func (agent *LogicalDeviceAgent) decomposeAndSendFlows(flows *ofp.Flows, groups *ofp.FlowGroups) error {
- log.Debugw("decomposeAndSendFlows", log.Fields{"logicalDeviceID": agent.logicalDeviceId})
-
- deviceRules := agent.flowDecomposer.DecomposeRules(agent, *flows, *groups)
- log.Debugw("rules", log.Fields{"rules": deviceRules.String()})
-
- chnlsList := make([]chan interface{}, 0)
- for deviceId, value := range deviceRules.GetRules() {
- ch := make(chan interface{})
- chnlsList = append(chnlsList, ch)
- go func(deviceId string, flows []*ofp.OfpFlowStats, groups []*ofp.OfpGroupEntry) {
- if err := agent.deviceMgr.addFlowsAndGroups(deviceId, flows, groups); err != nil {
- log.Error("flow-update-failed", log.Fields{"deviceID": deviceId})
- ch <- status.Errorf(codes.Internal, "flow-update-failed: %s", deviceId)
- }
- ch <- nil
- }(deviceId, value.ListFlows(), value.ListGroups())
- }
- // Wait for completion
- if res := fu.WaitForNilOrErrorResponses(agent.defaultTimeout, chnlsList...); res != nil {
- return status.Errorf(codes.Aborted, "errors-%s", res)
- }
- return nil
-}
-
//flowDelete deletes a flow from the flow table of that logical device
func (agent *LogicalDeviceAgent) flowDelete(mod *ofp.OfpFlowMod) error {
log.Debug("flowDelete")
@@ -631,14 +603,33 @@
//build a list of what to keep vs what to delete
toKeep := make([]*ofp.OfpFlowStats, 0)
+ toDelete := make([]*ofp.OfpFlowStats, 0)
for _, f := range flows {
+ // Check whether the flow and the flowmod matches
+ if fu.FlowMatch(f, fu.FlowStatsEntryFromFlowModMessage(mod)) {
+ toDelete = append(toDelete, f)
+ continue
+ }
+ // Check wild card match
if !fu.FlowMatchesMod(f, mod) {
toKeep = append(toKeep, f)
+ } else {
+ toDelete = append(toDelete, f)
}
}
+ log.Debugw("flowDelete", log.Fields{"logicalDeviceId": agent.logicalDeviceId, "toKeep": len(toKeep), "toDelete": toDelete})
+
//Update flows
- if len(toKeep) < len(flows) {
+ if len(toDelete) > 0 {
+ deviceRules := agent.flowDecomposer.DecomposeRules(agent, ofp.Flows{Items: toDelete}, ofp.FlowGroups{})
+ log.Debugw("rules", log.Fields{"rules": deviceRules.String()})
+
+ if err := agent.deleteDeviceFlowsAndGroups(deviceRules); err != nil {
+ log.Errorw("failure-updating-device-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceId, "error": err})
+ return err
+ }
+
if err := agent.updateLogicalDeviceFlowsWithoutLock(&ofp.Flows{Items: toKeep}); err != nil {
log.Errorw("Cannot-update-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
return err
@@ -649,37 +640,68 @@
return nil
}
-//flowStatsDelete deletes a flow from the flow table of that logical device
-func (agent *LogicalDeviceAgent) flowStatsDelete(flow *ofp.OfpFlowStats) error {
- log.Debug("flowStatsDelete")
- if flow == nil {
- return nil
- }
- agent.lockLogicalDevice.Lock()
- defer agent.lockLogicalDevice.Unlock()
+func (agent *LogicalDeviceAgent) addDeviceFlowsAndGroups(deviceRules *fu.DeviceRules) error {
+ log.Debugw("addDeviceFlowsAndGroups", log.Fields{"logicalDeviceID": agent.logicalDeviceId})
- var lDevice *voltha.LogicalDevice
- var err error
- if lDevice, err = agent.getLogicalDeviceWithoutLock(); err != nil {
- log.Errorw("no-logical-device-present", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
- return errors.New(fmt.Sprintf("no-logical-device-present:%s", agent.logicalDeviceId))
+ chnlsList := make([]chan interface{}, 0)
+ for deviceId, value := range deviceRules.GetRules() {
+ ch := make(chan interface{})
+ chnlsList = append(chnlsList, ch)
+ go func(deviceId string, flows []*ofp.OfpFlowStats, groups []*ofp.OfpGroupEntry) {
+ if err := agent.deviceMgr.addFlowsAndGroups(deviceId, flows, groups); err != nil {
+ log.Errorw("flow-add-failed", log.Fields{"deviceID": deviceId, "error": err})
+ ch <- status.Errorf(codes.Internal, "flow-add-failed: %s", deviceId)
+ }
+ ch <- nil
+ }(deviceId, value.ListFlows(), value.ListGroups())
}
- flows := lDevice.Flows.Items
-
- //build a list of what to keep vs what to delete
- toKeep := make([]*ofp.OfpFlowStats, 0)
- for _, f := range flows {
- if !fu.FlowMatch(f, flow) {
- toKeep = append(toKeep, f)
- }
+ // Wait for completion
+ if res := fu.WaitForNilOrErrorResponses(agent.defaultTimeout, chnlsList...); res != nil {
+ return status.Errorf(codes.Aborted, "errors-%s", res)
}
+ return nil
+}
- //Update flows
- if len(toKeep) < len(flows) {
- if err := agent.updateLogicalDeviceFlowsWithoutLock(&ofp.Flows{Items: toKeep}); err != nil {
- log.Errorw("Cannot-update-logical-group", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
- return err
- }
+func (agent *LogicalDeviceAgent) deleteDeviceFlowsAndGroups(deviceRules *fu.DeviceRules) error {
+ log.Debugw("deleteDeviceFlowsAndGroups", log.Fields{"logicalDeviceID": agent.logicalDeviceId})
+
+ chnlsList := make([]chan interface{}, 0)
+ for deviceId, value := range deviceRules.GetRules() {
+ ch := make(chan interface{})
+ chnlsList = append(chnlsList, ch)
+ go func(deviceId string, flows []*ofp.OfpFlowStats, groups []*ofp.OfpGroupEntry) {
+ if err := agent.deviceMgr.deleteFlowsAndGroups(deviceId, flows, groups); err != nil {
+ log.Error("flow-delete-failed", log.Fields{"deviceID": deviceId, "error": err})
+ ch <- status.Errorf(codes.Internal, "flow-delete-failed: %s", deviceId)
+ }
+ ch <- nil
+ }(deviceId, value.ListFlows(), value.ListGroups())
+ }
+ // Wait for completion
+ if res := fu.WaitForNilOrErrorResponses(agent.defaultTimeout, chnlsList...); res != nil {
+ return status.Errorf(codes.Aborted, "errors-%s", res)
+ }
+ return nil
+}
+
+func (agent *LogicalDeviceAgent) updateDeviceFlowsAndGroups(deviceRules *fu.DeviceRules) error {
+ log.Debugw("updateDeviceFlowsAndGroups", log.Fields{"logicalDeviceID": agent.logicalDeviceId})
+
+ chnlsList := make([]chan interface{}, 0)
+ for deviceId, value := range deviceRules.GetRules() {
+ ch := make(chan interface{})
+ chnlsList = append(chnlsList, ch)
+ go func(deviceId string, flows []*ofp.OfpFlowStats, groups []*ofp.OfpGroupEntry) {
+ if err := agent.deviceMgr.updateFlowsAndGroups(deviceId, flows, groups); err != nil {
+ log.Error("flow-update-failed", log.Fields{"deviceID": deviceId, "error": err})
+ ch <- status.Errorf(codes.Internal, "flow-update-failed: %s", deviceId)
+ }
+ ch <- nil
+ }(deviceId, value.ListFlows(), value.ListGroups())
+ }
+ // Wait for completion
+ if res := fu.WaitForNilOrErrorResponses(agent.defaultTimeout, chnlsList...); res != nil {
+ return status.Errorf(codes.Aborted, "errors-%s", res)
}
return nil
}
@@ -711,12 +733,19 @@
}
if changed {
+ deviceRules := agent.flowDecomposer.DecomposeRules(agent, ofp.Flows{Items: []*ofp.OfpFlowStats{flow}}, ofp.FlowGroups{})
+ log.Debugw("rules", log.Fields{"rules": deviceRules.String()})
+
+ if err := agent.deleteDeviceFlowsAndGroups(deviceRules); err != nil {
+ log.Errorw("failure-deleting-device-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceId, "error": err})
+ return err
+ }
+
if err := agent.updateLogicalDeviceFlowsWithoutLock(&ofp.Flows{Items: flows}); err != nil {
- log.Errorw("Cannot-update-logical-group", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
+ log.Errorw("Cannot-update-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
return err
}
}
-
return nil
}
@@ -747,6 +776,15 @@
groups := lDevice.FlowGroups.Items
if fu.FindGroup(groups, groupMod.GroupId) == -1 {
groups = append(groups, fu.GroupEntryFromGroupMod(groupMod))
+
+ deviceRules := agent.flowDecomposer.DecomposeRules(agent, *lDevice.Flows, ofp.FlowGroups{Items: groups})
+ log.Debugw("rules", log.Fields{"rules": deviceRules.String()})
+
+ if err := agent.addDeviceFlowsAndGroups(deviceRules); err != nil {
+ log.Errorw("failure-updating-device-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceId, "error": err})
+ return err
+ }
+
if err := agent.updateLogicalDeviceFlowGroupsWithoutLock(&ofp.FlowGroups{Items: groups}); err != nil {
log.Errorw("Cannot-update-group", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
return err
@@ -790,6 +828,16 @@
groupsChanged = true
}
}
+ if flowsChanged || groupsChanged {
+ deviceRules := agent.flowDecomposer.DecomposeRules(agent, ofp.Flows{Items: flows}, ofp.FlowGroups{Items: groups})
+ log.Debugw("rules", log.Fields{"rules": deviceRules.String()})
+
+ if err := agent.updateDeviceFlowsAndGroups(deviceRules); err != nil {
+ log.Errorw("failure-updating-device-flows-groups", log.Fields{"logicalDeviceId": agent.logicalDeviceId, "error": err})
+ return err
+ }
+ }
+
if groupsChanged {
if err := agent.updateLogicalDeviceFlowGroupsWithoutLock(&ofp.FlowGroups{Items: groups}); err != nil {
log.Errorw("Cannot-update-group", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
@@ -802,7 +850,6 @@
return err
}
}
-
return nil
}
@@ -832,6 +879,14 @@
groupsChanged = true
}
if groupsChanged {
+ deviceRules := agent.flowDecomposer.DecomposeRules(agent, ofp.Flows{Items: lDevice.Flows.Items}, ofp.FlowGroups{Items: groups})
+ log.Debugw("rules", log.Fields{"rules": deviceRules.String()})
+
+ if err := agent.updateDeviceFlowsAndGroups(deviceRules); err != nil {
+ log.Errorw("failure-updating-device-flows-groups", log.Fields{"logicalDeviceId": agent.logicalDeviceId, "error": err})
+ return err
+ }
+
//lDevice.FlowGroups.Items = groups
if err := agent.updateLogicalDeviceFlowGroupsWithoutLock(&ofp.FlowGroups{Items: groups}); err != nil {
log.Errorw("Cannot-update-logical-group", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
diff --git a/rw_core/utils/flow_utils.go b/rw_core/utils/flow_utils.go
index c1ca18d..3828b39 100644
--- a/rw_core/utils/flow_utils.go
+++ b/rw_core/utils/flow_utils.go
@@ -1082,7 +1082,7 @@
//Check match condition
//If the flow_mod match field is empty, that is a special case and indicates the flow entry matches
- if (mod.Match == nil) || (mod.Match.OxmFields == nil) {
+ if (mod.Match == nil) || (mod.Match.OxmFields == nil) || (len(mod.Match.OxmFields) == 0) {
//If we got this far and the match is empty in the flow spec, then the flow matches
return true
} // TODO : implement the flow match analysis
@@ -1156,3 +1156,11 @@
}
return len(toKeep) < len(flows), toKeep
}
+
+func ToOfpOxmField(from []*ofp.OfpOxmOfbField) []*ofp.OfpOxmField {
+ matchFields := make([]*ofp.OfpOxmField, 0)
+ for _, val := range from {
+ matchFields = append(matchFields, &ofp.OfpOxmField{Field: &ofp.OfpOxmField_OfbField{OfbField: val}})
+ }
+ return matchFields
+}
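
The new ToOfpOxmField helper above wraps OFB match fields into the OfpOxmField form that MkSimpleFlowMod
expects; the unit tests and the flow management integration test below use it exactly that way. Here is a
minimal usage sketch, written as if it lived inside this utils package, with arbitrary example field values
and an illustrative function name.

// exampleFlowModFromOfbFields is an illustrative usage of ToOfpOxmField: build
// OFB match fields with the existing helpers, wrap them, and hand them to
// MkSimpleFlowMod. The values are arbitrary examples.
func exampleFlowModFromOfbFields() *ofp.OfpFlowMod {
    fa := &FlowArgs{
        KV: OfpFlowModArgs{"priority": 1000, "table_id": 1},
        MatchFields: []*ofp.OfpOxmOfbField{
            InPort(2),
            EthType(0x888e),
        },
        Actions: []*ofp.OfpAction{
            Output(uint32(ofp.OfpPortNo_OFPP_CONTROLLER)),
        },
    }
    return MkSimpleFlowMod(ToOfpOxmField(fa.MatchFields), fa.Actions, fa.Command, fa.KV)
}
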
diff --git a/rw_core/utils/flow_utils_test.go b/rw_core/utils/flow_utils_test.go
index 48a1b75..c1b1da9 100644
--- a/rw_core/utils/flow_utils_test.go
+++ b/rw_core/utils/flow_utils_test.go
@@ -462,3 +462,228 @@
flow = MkFlowStat(fa)
assert.False(t, FlowHasOutGroup(flow, 1))
}
+
+func TestMatchFlow(t *testing.T) {
+ assert.False(t, FlowMatch(nil, nil))
+ fa := &FlowArgs{
+ KV: OfpFlowModArgs{"priority": 500, "table_id": 1, "cookie": 38268468, "flags": 12},
+ MatchFields: []*ofp.OfpOxmOfbField{
+ InPort(2),
+ VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0),
+ VlanPcp(0),
+ EthType(0x800),
+ Ipv4Dst(0xe00a0a0a),
+ },
+ Actions: []*ofp.OfpAction{
+ Group(10),
+ },
+ }
+ flow1 := MkFlowStat(fa)
+ assert.False(t, FlowMatch(flow1, nil))
+
+ fa = &FlowArgs{
+ KV: OfpFlowModArgs{"priority": 500},
+ MatchFields: []*ofp.OfpOxmOfbField{
+ InPort(2),
+ VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0),
+ VlanPcp(0),
+ EthType(0x800),
+ Ipv4Dst(0xe00a0a0a),
+ },
+ Actions: []*ofp.OfpAction{
+ Group(10),
+ },
+ }
+ flow2 := MkFlowStat(fa)
+ assert.False(t, FlowMatch(flow1, flow2))
+ assert.False(t, FlowMatch(nil, flow2))
+
+ fa = &FlowArgs{
+ KV: OfpFlowModArgs{"priority": 500, "table_id": 1, "cookie": 38268468, "flags": 12},
+ MatchFields: []*ofp.OfpOxmOfbField{
+ InPort(2),
+ VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0),
+ VlanPcp(0),
+ EthType(0x800),
+ Ipv4Dst(0xe00a0a0a),
+ },
+ }
+ flow2 = MkFlowStat(fa)
+ assert.True(t, FlowMatch(flow1, flow2))
+
+ fa = &FlowArgs{
+ KV: OfpFlowModArgs{"priority": 501, "table_id": 1, "cookie": 38268468, "flags": 12},
+ MatchFields: []*ofp.OfpOxmOfbField{
+ InPort(2),
+ VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0),
+ VlanPcp(0),
+ EthType(0x800),
+ Ipv4Dst(0xe00a0a0a),
+ },
+ Actions: []*ofp.OfpAction{
+ Group(10),
+ },
+ }
+ flow2 = MkFlowStat(fa)
+ assert.False(t, FlowMatch(flow1, flow2))
+
+ fa = &FlowArgs{
+ KV: OfpFlowModArgs{"priority": 500, "table_id": 2, "cookie": 38268468, "flags": 12},
+ MatchFields: []*ofp.OfpOxmOfbField{
+ InPort(2),
+ VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0),
+ VlanPcp(0),
+ EthType(0x800),
+ Ipv4Dst(0xe00a0a0a),
+ },
+ }
+ flow2 = MkFlowStat(fa)
+ assert.False(t, FlowMatch(flow1, flow2))
+
+ fa = &FlowArgs{
+ KV: OfpFlowModArgs{"priority": 500, "table_id": 1, "cookie": 38268467, "flags": 12},
+ MatchFields: []*ofp.OfpOxmOfbField{
+ InPort(2),
+ VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0),
+ VlanPcp(0),
+ EthType(0x800),
+ Ipv4Dst(0xe00a0a0a),
+ },
+ }
+ flow2 = MkFlowStat(fa)
+ assert.False(t, FlowMatch(flow1, flow2))
+
+ fa = &FlowArgs{
+ KV: OfpFlowModArgs{"priority": 500, "table_id": 1, "cookie": 38268468, "flags": 14},
+ MatchFields: []*ofp.OfpOxmOfbField{
+ InPort(2),
+ VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0),
+ VlanPcp(0),
+ EthType(0x800),
+ Ipv4Dst(0xe00a0a0a),
+ },
+ }
+ flow2 = MkFlowStat(fa)
+ assert.False(t, FlowMatch(flow1, flow2))
+
+ fa = &FlowArgs{
+ KV: OfpFlowModArgs{"priority": 500, "table_id": 1, "cookie": 38268468, "flags": 12},
+ MatchFields: []*ofp.OfpOxmOfbField{
+ InPort(4),
+ VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0),
+ VlanPcp(0),
+ EthType(0x800),
+ Ipv4Dst(0xe00a0a0a),
+ },
+ }
+ flow2 = MkFlowStat(fa)
+ assert.False(t, FlowMatch(flow1, flow2))
+
+ fa = &FlowArgs{
+ KV: OfpFlowModArgs{"priority": 500, "table_id": 1, "cookie": 38268468, "flags": 12},
+ MatchFields: []*ofp.OfpOxmOfbField{
+ InPort(2),
+ VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0),
+ VlanPcp(0),
+ EthType(0x800),
+ },
+ }
+ flow2 = MkFlowStat(fa)
+ assert.False(t, FlowMatch(flow1, flow2))
+
+ fa = &FlowArgs{
+ KV: OfpFlowModArgs{"priority": 500, "table_id": 1, "cookie": 38268468, "flags": 12},
+ MatchFields: []*ofp.OfpOxmOfbField{
+ InPort(2),
+ VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0),
+ VlanPcp(0),
+ EthType(0x800),
+ Ipv4Dst(0xe00a0a0a),
+ },
+ Actions: []*ofp.OfpAction{
+ PopVlan(),
+ Output(1),
+ },
+ }
+ flow2 = MkFlowStat(fa)
+ assert.True(t, FlowMatch(flow1, flow2))
+}
+
+func TestFlowMatchesMod(t *testing.T) {
+ assert.False(t, FlowMatchesMod(nil, nil))
+ fa := &FlowArgs{
+ KV: OfpFlowModArgs{"priority": 500, "table_id": 1},
+ MatchFields: []*ofp.OfpOxmOfbField{
+ InPort(2),
+ VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0),
+ VlanPcp(0),
+ EthType(0x800),
+ Ipv4Dst(0xe00a0a0a),
+ },
+ Actions: []*ofp.OfpAction{
+ Output(1),
+ Group(10),
+ },
+ }
+ flow := MkFlowStat(fa)
+ assert.False(t, FlowMatchesMod(flow, nil))
+
+ fa = &FlowArgs{
+ KV: OfpFlowModArgs{"priority": 500, "table_id": 1},
+ MatchFields: []*ofp.OfpOxmOfbField{
+ InPort(2),
+ VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 0),
+ VlanPcp(0),
+ EthType(0x800),
+ Ipv4Dst(0xe00a0a0a),
+ },
+ Actions: []*ofp.OfpAction{
+ PopVlan(),
+ Output(1),
+ },
+ }
+ flowMod := MkSimpleFlowMod(ToOfpOxmField(fa.MatchFields), fa.Actions, fa.Command, fa.KV)
+ assert.False(t, FlowMatchesMod(nil, flowMod))
+ assert.False(t, FlowMatchesMod(flow, flowMod))
+ assert.True(t, FlowMatch(flow, FlowStatsEntryFromFlowModMessage(flowMod)))
+
+ fa = &FlowArgs{
+ KV: OfpFlowModArgs{"table_id": uint64(ofp.OfpTable_OFPTT_ALL),
+ "cookie_mask": 0,
+ "out_port": uint64(ofp.OfpPortNo_OFPP_ANY),
+ "out_group": uint64(ofp.OfpGroup_OFPG_ANY),
+ },
+ }
+ flowMod = MkSimpleFlowMod(ToOfpOxmField(fa.MatchFields), fa.Actions, fa.Command, fa.KV)
+ assert.True(t, FlowMatchesMod(flow, flowMod))
+
+ fa = &FlowArgs{
+ KV: OfpFlowModArgs{"table_id": 1,
+ "cookie_mask": 0,
+ "out_port": uint64(ofp.OfpPortNo_OFPP_ANY),
+ "out_group": uint64(ofp.OfpGroup_OFPG_ANY),
+ },
+ }
+ flowMod = MkSimpleFlowMod(ToOfpOxmField(fa.MatchFields), fa.Actions, fa.Command, fa.KV)
+ assert.True(t, FlowMatchesMod(flow, flowMod))
+
+ fa = &FlowArgs{
+ KV: OfpFlowModArgs{"table_id": 1,
+ "cookie_mask": 0,
+ "out_port": 1,
+ "out_group": uint64(ofp.OfpGroup_OFPG_ANY),
+ },
+ }
+ flowMod = MkSimpleFlowMod(ToOfpOxmField(fa.MatchFields), fa.Actions, fa.Command, fa.KV)
+ assert.True(t, FlowMatchesMod(flow, flowMod))
+
+ fa = &FlowArgs{
+ KV: OfpFlowModArgs{"table_id": 1,
+ "cookie_mask": 0,
+ "out_port": 1,
+ "out_group": 10,
+ },
+ }
+ flowMod = MkSimpleFlowMod(ToOfpOxmField(fa.MatchFields), fa.Actions, fa.Command, fa.KV)
+ assert.True(t, FlowMatchesMod(flow, flowMod))
+}
diff --git a/tests/core/flow_management_test.go b/tests/core/flow_management_test.go
new file mode 100644
index 0000000..ebda7de
--- /dev/null
+++ b/tests/core/flow_management_test.go
@@ -0,0 +1,521 @@
+// +build integration
+
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package core
+
+import (
+ "context"
+ "fmt"
+ "github.com/google/uuid"
+ "github.com/opencord/voltha-go/common/log"
+ fu "github.com/opencord/voltha-go/rw_core/utils"
+ tu "github.com/opencord/voltha-go/tests/utils"
+ "github.com/opencord/voltha-protos/go/common"
+ ofp "github.com/opencord/voltha-protos/go/openflow_13"
+ "github.com/opencord/voltha-protos/go/voltha"
+ "github.com/stretchr/testify/assert"
+ "google.golang.org/grpc/metadata"
+ "math"
+ "os"
+ "testing"
+ "time"
+)
+
+var stub voltha.VolthaServiceClient
+var volthaSerialNumberKey string
+
+/*
+ This local "integration" test uses one RW-Core, one simulated_olt and one simulated_onu adapter to test flows
+(add/delete), in a development environment. It uses docker-compose to set up the local environment. However, it can
+easily be extended to run in k8s environment.
+
+The compose files used are located under $GOPATH/src/github.com/opencord/voltha-go/compose. If GOPATH is not set,
+then the compose file location can be specified via the COMPOSE_PATH environment variable.
+
+To run this test: DOCKER_HOST_IP=<local IP> go test -v
+
+NOTE: Since this is an integration test that involves several containers and features (device creation, device
+activation, validation of parent and discovered devices, validation of the logical device, as well as flow
+add/delete), a failure can occur anywhere, not just when testing flows.
+
+*/
+
+var allDevices map[string]*voltha.Device
+var allLogicalDevices map[string]*voltha.LogicalDevice
+
+var composePath string
+
+const (
+ GRPC_PORT = 50057
+ NUM_OLTS = 1
+ NUM_ONUS_PER_OLT = 4 // This should coincide with the number of onus per olt in adapters-simulated.yml file
+)
+
+func setup() {
+ var err error
+
+ if _, err = log.AddPackage(log.JSON, log.WarnLevel, log.Fields{"instanceId": "testing"}); err != nil {
+ log.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
+ }
+ log.UpdateAllLoggers(log.Fields{"instanceId": "testing"})
+ log.SetAllLogLevel(log.ErrorLevel)
+
+ volthaSerialNumberKey = "voltha_serial_number"
+ allDevices = make(map[string]*voltha.Device)
+ allLogicalDevices = make(map[string]*voltha.LogicalDevice)
+
+ grpcHostIP := os.Getenv("DOCKER_HOST_IP")
+ goPath := os.Getenv("GOPATH")
+ if goPath != "" {
+ composePath = fmt.Sprintf("%s/src/github.com/opencord/voltha-go/compose", goPath)
+ } else {
+ composePath = os.Getenv("COMPOSE_PATH")
+ }
+
+ fmt.Println("Using compose path:", composePath)
+
+ //Start the simulated environment
+ if err = tu.StartSimulatedEnv(composePath); err != nil {
+ fmt.Println("Failure starting simulated environment:", err)
+ os.Exit(10)
+ }
+
+ stub, err = tu.SetupGrpcConnectionToCore(grpcHostIP, GRPC_PORT)
+ if err != nil {
+ fmt.Println("Failure connecting to Voltha Core:", err)
+ os.Exit(11)
+ }
+
+ // Wait for the simulated devices to be registered in the Voltha Core
+ adapters := []string{"simulated_olt", "simulated_onu"}
+ if _, err = tu.WaitForAdapterRegistration(stub, adapters, 20); err != nil {
+ fmt.Println("Failure retrieving adapters:", err)
+ os.Exit(12)
+ }
+}
+
+func shutdown() {
+ err := tu.StopSimulatedEnv(composePath)
+ if err != nil {
+ fmt.Println("Failure stop simulated environment:", err)
+ }
+}
+
+func refreshLocalDeviceCache(stub voltha.VolthaServiceClient) error {
+ retrievedDevices, err := tu.ListDevices(stub)
+ if err != nil {
+ return err
+ }
+ for _, d := range retrievedDevices.Items {
+ allDevices[d.Id] = d
+ }
+
+ retrievedLogicalDevices, err := tu.ListLogicalDevices(stub)
+ if err != nil {
+ return err
+ }
+
+ for _, ld := range retrievedLogicalDevices.Items {
+ allLogicalDevices[ld.Id] = ld
+ }
+ return nil
+}
+
+func makeSimpleFlowMod(fa *fu.FlowArgs) *ofp.OfpFlowMod {
+ matchFields := make([]*ofp.OfpOxmField, 0)
+ for _, val := range fa.MatchFields {
+ matchFields = append(matchFields, &ofp.OfpOxmField{Field: &ofp.OfpOxmField_OfbField{OfbField: val}})
+ }
+ return fu.MkSimpleFlowMod(matchFields, fa.Actions, fa.Command, fa.KV)
+}
+
+func addEAPOLFlow(stub voltha.VolthaServiceClient, ld *voltha.LogicalDevice, port *voltha.LogicalPort, ch chan interface{}) {
+ var fa *fu.FlowArgs
+ fa = &fu.FlowArgs{
+ KV: fu.OfpFlowModArgs{"priority": 2000},
+ MatchFields: []*ofp.OfpOxmOfbField{
+ fu.InPort(port.OfpPort.PortNo),
+ fu.EthType(0x888e),
+ },
+ Actions: []*ofp.OfpAction{
+ fu.Output(uint32(ofp.OfpPortNo_OFPP_CONTROLLER)),
+ },
+ }
+ matchFields := make([]*ofp.OfpOxmField, 0)
+ for _, val := range fa.MatchFields {
+ matchFields = append(matchFields, &ofp.OfpOxmField{Field: &ofp.OfpOxmField_OfbField{OfbField: val}})
+ }
+ f := ofp.FlowTableUpdate{FlowMod: makeSimpleFlowMod(fa), Id: ld.Id}
+
+ ui := uuid.New()
+ ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(volthaSerialNumberKey, ui.String()))
+ if response, err := stub.UpdateLogicalDeviceFlowTable(ctx, &f); err != nil {
+ ch <- err
+ } else {
+ ch <- response
+ }
+}
+
+func getNumUniPort(ld *voltha.LogicalDevice, lPortNos ...uint32) int {
+ num := 0
+ if len(lPortNos) > 0 {
+ for _, pNo := range lPortNos {
+ for _, lPort := range ld.Ports {
+ if !lPort.RootPort && lPort.OfpPort.PortNo == pNo {
+ num += 1
+ }
+ }
+ }
+ } else {
+ for _, port := range ld.Ports {
+ if !port.RootPort {
+ num += 1
+ }
+ }
+ }
+ return num
+}
+
+func filterOutPort(lPort *voltha.LogicalPort, lPortNos ...uint32) bool {
+ if len(lPortNos) == 0 {
+ return false
+ }
+ for _, pNo := range lPortNos {
+ if lPort.OfpPort.PortNo == pNo {
+ return false
+ }
+ }
+ return true
+}
+
+func verifyEAPOLFlows(t *testing.T, ld *voltha.LogicalDevice, lPortNos ...uint32) {
+ // First get the flows from the logical device
+ lFlows := ld.Flows
+ assert.Equal(t, getNumUniPort(ld, lPortNos...), len(lFlows.Items))
+
+ onuDeviceId := ""
+
+ // Verify that the flows in the logical device is what was pushed
+ for _, lPort := range ld.Ports {
+ if lPort.RootPort {
+ continue
+ }
+ if filterOutPort(lPort, lPortNos...) {
+ continue
+ }
+ onuDeviceId = lPort.DeviceId
+ var fa *fu.FlowArgs
+ fa = &fu.FlowArgs{
+ KV: fu.OfpFlowModArgs{"priority": 2000},
+ MatchFields: []*ofp.OfpOxmOfbField{
+ fu.InPort(lPort.OfpPort.PortNo),
+ fu.EthType(0x888e),
+ },
+ Actions: []*ofp.OfpAction{
+ fu.Output(uint32(ofp.OfpPortNo_OFPP_CONTROLLER)),
+ },
+ }
+ expectedLdFlow := fu.MkFlowStat(fa)
+ assert.Equal(t, true, tu.IsFlowPresent(expectedLdFlow, lFlows.Items))
+ }
+
+ // Verify the OLT flows
+ retrievedOltFlows := allDevices[ld.RootDeviceId].Flows.Items
+ assert.Equal(t, NUM_OLTS*getNumUniPort(ld, lPortNos...)*2, len(retrievedOltFlows))
+ for _, lPort := range ld.Ports {
+ if lPort.RootPort {
+ continue
+ }
+ if filterOutPort(lPort, lPortNos...) {
+ continue
+ }
+
+ fa := &fu.FlowArgs{
+ KV: fu.OfpFlowModArgs{"priority": 2000},
+ MatchFields: []*ofp.OfpOxmOfbField{
+ fu.InPort(1),
+ fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | lPort.OfpPort.PortNo),
+ fu.TunnelId(uint64(lPort.OfpPort.PortNo)),
+ fu.EthType(0x888e),
+ },
+ Actions: []*ofp.OfpAction{
+ fu.PushVlan(0x8100),
+ fu.SetField(fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 4000)),
+ fu.Output(uint32(ofp.OfpPortNo_OFPP_CONTROLLER)),
+ },
+ }
+ expectedOltFlow := fu.MkFlowStat(fa)
+ assert.Equal(t, true, tu.IsFlowPresent(expectedOltFlow, retrievedOltFlows))
+
+ fa = &fu.FlowArgs{
+ KV: fu.OfpFlowModArgs{"priority": 2000},
+ MatchFields: []*ofp.OfpOxmOfbField{
+ fu.InPort(2),
+ fu.VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 4000),
+ fu.VlanPcp(0),
+ fu.Metadata_ofp(uint64(lPort.OfpPort.PortNo)),
+ fu.TunnelId(uint64(lPort.OfpPort.PortNo)),
+ },
+ Actions: []*ofp.OfpAction{
+ fu.PopVlan(),
+ fu.Output(1),
+ },
+ }
+ expectedOltFlow = fu.MkFlowStat(fa)
+ assert.Equal(t, true, tu.IsFlowPresent(expectedOltFlow, retrievedOltFlows))
+ }
+ // Verify the ONU flows
+ retrievedOnuFlows := allDevices[onuDeviceId].Flows.Items
+ assert.Equal(t, 0, len(retrievedOnuFlows))
+}
+
+func verifyNOFlows(t *testing.T, ld *voltha.LogicalDevice, lPortNos ...uint32) {
+ if len(lPortNos) == 0 {
+ assert.Equal(t, 0, len(ld.Flows.Items))
+ for _, d := range allDevices {
+ if d.ParentId == ld.Id {
+ assert.Equal(t, 0, len(d.Flows.Items))
+ }
+ }
+ return
+ }
+ for _, p := range lPortNos {
+ // Check absence of flows in logical device for that port
+ for _, f := range ld.Flows.Items {
+ assert.NotEqual(t, p, fu.GetInPort(f))
+ }
+ // Check absence of flows in the parent device for that port
+ for _, d := range allDevices {
+ if d.ParentId == ld.Id {
+ for _, f := range d.Flows.Items {
+ assert.NotEqual(t, p, fu.GetTunnelId(f))
+ }
+ }
+ }
+ // TODO: check flows in child device. Not required for the use cases being tested
+ }
+
+}
+
+func installEapolFlows(stub voltha.VolthaServiceClient, lDevice *voltha.LogicalDevice, lPortNos ...uint32) error {
+ requestNum := 0
+ combineCh := make(chan interface{})
+ if len(lPortNos) > 0 {
+ fmt.Println("Installing EAPOL flows on ports:", lPortNos)
+ for _, p := range lPortNos {
+ for _, lport := range lDevice.Ports {
+ if !lport.RootPort && lport.OfpPort.PortNo == p {
+ go addEAPOLFlow(stub, lDevice, lport, combineCh)
+ requestNum += 1
+ }
+ }
+ }
+ } else {
+ fmt.Println("Installing EAPOL flows on logical device ", lDevice.Id)
+ for _, lport := range lDevice.Ports {
+ if !lport.RootPort {
+ go addEAPOLFlow(stub, lDevice, lport, combineCh)
+ requestNum += 1
+ }
+ }
+
+ }
+ receivedResponse := 0
+ var err error
+ for {
+ select {
+ case res, ok := <-combineCh:
+ receivedResponse += 1
+ if !ok {
+ } else if er, ok := res.(error); ok {
+ err = er
+ }
+ }
+ if receivedResponse == requestNum {
+ break
+ }
+ }
+ return err
+}
+
+func deleteAllFlows(stub voltha.VolthaServiceClient, lDevice *voltha.LogicalDevice) error {
+ fmt.Println("Deleting all flows for logical device:", lDevice.Id)
+ ui := uuid.New()
+ ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(volthaSerialNumberKey, ui.String()))
+ ch := make(chan interface{})
+ defer close(ch)
+ fa := &fu.FlowArgs{
+ KV: fu.OfpFlowModArgs{"table_id": uint64(ofp.OfpTable_OFPTT_ALL),
+ "cookie_mask": 0,
+ "out_port": uint64(ofp.OfpPortNo_OFPP_ANY),
+ "out_group": uint64(ofp.OfpGroup_OFPG_ANY),
+ },
+ }
+ cmd := ofp.OfpFlowModCommand_OFPFC_DELETE
+ fa.Command = &cmd
+ flowMod := fu.MkSimpleFlowMod(fu.ToOfpOxmField(fa.MatchFields), fa.Actions, fa.Command, fa.KV)
+ f := ofp.FlowTableUpdate{FlowMod: flowMod, Id: lDevice.Id}
+ _, err := stub.UpdateLogicalDeviceFlowTable(ctx, &f)
+ return err
+}
+
+func deleteEapolFlow(stub voltha.VolthaServiceClient, lDevice *voltha.LogicalDevice, lPortNo uint32) error {
+ fmt.Println("Deleting flows from port ", lPortNo, " of logical device ", lDevice.Id)
+ ui := uuid.New()
+ var fa *fu.FlowArgs
+ ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(volthaSerialNumberKey, ui.String()))
+ fa = &fu.FlowArgs{
+ KV: fu.OfpFlowModArgs{"priority": 2000},
+ MatchFields: []*ofp.OfpOxmOfbField{
+ fu.InPort(lPortNo),
+ fu.EthType(0x888e),
+ },
+ Actions: []*ofp.OfpAction{
+ fu.Output(uint32(ofp.OfpPortNo_OFPP_CONTROLLER)),
+ },
+ }
+ cmd := ofp.OfpFlowModCommand_OFPFC_DELETE
+ fa.Command = &cmd
+ f := ofp.FlowTableUpdate{FlowMod: makeSimpleFlowMod(fa), Id: lDevice.Id}
+ _, err := stub.UpdateLogicalDeviceFlowTable(ctx, &f)
+ return err
+}
+
+func runInstallEapolFlows(t *testing.T, stub voltha.VolthaServiceClient, lPortNos ...uint32) {
+ err := refreshLocalDeviceCache(stub)
+ assert.Nil(t, err)
+
+ for _, ld := range allLogicalDevices {
+ err = installEapolFlows(stub, ld, lPortNos...)
+ assert.Nil(t, err)
+ }
+
+ err = refreshLocalDeviceCache(stub)
+ assert.Nil(t, err)
+
+ for _, ld := range allLogicalDevices {
+ verifyEAPOLFlows(t, ld, lPortNos...)
+ }
+}
+
+func runDeleteAllFlows(t *testing.T, stub voltha.VolthaServiceClient) {
+ fmt.Println("Removing ALL flows ...")
+ err := refreshLocalDeviceCache(stub)
+ assert.Nil(t, err)
+
+ for _, ld := range allLogicalDevices {
+ err = deleteAllFlows(stub, ld)
+ assert.Nil(t, err)
+ }
+
+ err = refreshLocalDeviceCache(stub)
+ assert.Nil(t, err)
+
+ for _, ld := range allLogicalDevices {
+ verifyNOFlows(t, ld)
+ }
+}
+
+func runDeleteEapolFlows(t *testing.T, stub voltha.VolthaServiceClient, ld *voltha.LogicalDevice, lPortNos ...uint32) {
+ err := refreshLocalDeviceCache(stub)
+ assert.Nil(t, err)
+
+ if len(lPortNos) == 0 {
+ err = deleteAllFlows(stub, ld)
+ assert.Nil(t, err)
+ } else {
+ for _, lPortNo := range lPortNos {
+ err = deleteEapolFlow(stub, ld, lPortNo)
+ assert.Nil(t, err)
+ }
+ }
+
+ err = refreshLocalDeviceCache(stub)
+ assert.Nil(t, err)
+
+ for _, lde := range allLogicalDevices {
+ if lde.Id == ld.Id {
+ verifyNOFlows(t, lde, lPortNos...)
+ break
+ }
+ }
+}
+
+func createAndEnableDevices(t *testing.T) {
+ err := tu.SetAllLogLevel(stub, voltha.Logging{Level: common.LogLevel_WARNING})
+ assert.Nil(t, err)
+
+ err = tu.SetLogLevel(stub, voltha.Logging{Level: common.LogLevel_DEBUG, PackageName: "github.com/opencord/voltha-go/rw_core/core"})
+ assert.Nil(t, err)
+
+ startTime := time.Now()
+
+ //Pre-provision the parent device
+ oltDevice, err := tu.PreProvisionDevice(stub)
+ assert.Nil(t, err)
+
+ fmt.Println("Creation of ", NUM_OLTS, " OLT devices took:", time.Since(startTime))
+
+ startTime = time.Now()
+
+ //Enable the parent device - this will enable and validate the child devices as well
+ err = tu.EnableDevice(stub, oltDevice, NUM_ONUS_PER_OLT)
+ assert.Nil(t, err)
+
+ fmt.Println("Enabling of OLT device took:", time.Since(startTime))
+
+ // Wait for the core and the adapters to sync up after the enable
+ time.Sleep(time.Duration(math.Max(10, float64(NUM_OLTS*NUM_ONUS_PER_OLT)/2)) * time.Second)
+
+ err = tu.VerifyDevices(stub, NUM_ONUS_PER_OLT)
+ assert.Nil(t, err)
+
+ lds, err := tu.VerifyLogicalDevices(stub, oltDevice, NUM_ONUS_PER_OLT)
+ assert.Nil(t, err)
+ assert.Equal(t, 1, len(lds.Items))
+}
+
+func TestFlowManagement(t *testing.T) {
+ //1. Test creation and activation of the devices. This will validate the devices as well as the logical device created.
+ createAndEnableDevices(t)
+
+ //2. Test installation of EAPOL flows
+ runInstallEapolFlows(t, stub)
+
+ //3. Test deletion of all EAPOL flows
+ runDeleteAllFlows(t, stub)
+
+ //4. Test installation of EAPOL flows on specific ports
+ runInstallEapolFlows(t, stub, 101, 102)
+
+ lds, err := tu.ListLogicalDevices(stub)
+ assert.Nil(t, err)
+
+ //5. Test deletion of the EAPOL flow on a specific port of a given logical device
+ runDeleteEapolFlows(t, stub, lds.Items[0], 101)
+}
+
+func TestMain(m *testing.M) {
+ setup()
+ code := m.Run()
+ shutdown()
+ os.Exit(code)
+}
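+
+// Note (assumption): this suite is expected to be guarded by the same "integration"
+// build tag as tests/utils/test_utils.go below, so it would typically be run with
+// something like:
+//
+//   go test -tags integration
+//
+// from the test's directory, against an environment brought up via StartSimulatedEnv.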
diff --git a/tests/utils/test_utils.go b/tests/utils/test_utils.go
new file mode 100644
index 0000000..52bdfbc
--- /dev/null
+++ b/tests/utils/test_utils.go
@@ -0,0 +1,499 @@
+// "build integration
+
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package core
+
+import (
+ "context"
+ "fmt"
+ "os/exec"
+ "strings"
+ "time"
+
+ "github.com/golang/protobuf/ptypes/empty"
+ "github.com/google/uuid"
+ com "github.com/opencord/voltha-go/adapters/common"
+ "github.com/opencord/voltha-protos/go/common"
+ ofp "github.com/opencord/voltha-protos/go/openflow_13"
+ "github.com/opencord/voltha-protos/go/voltha"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/metadata"
+ "google.golang.org/grpc/status"
+)
+
+const (
+ VOLTHA_SERIAL_NUMBER_KEY = "voltha_serial_number"
+)
+
+func startKafka(composePath string) error {
+ fmt.Println("Starting Kafka and Etcd ...")
+ command := "docker-compose"
+ fileName := fmt.Sprintf("%s/docker-compose-zk-kafka-test.yml", composePath)
+ cmd := exec.Command(command, "-f", fileName, "up", "-d")
+ if err := cmd.Run(); err != nil {
+ return err
+ }
+ return nil
+}
+
+func startEtcd(composePath string) error {
+ fmt.Println("Starting Etcd ...")
+ command := "docker-compose"
+ fileName := fmt.Sprintf("%s/docker-compose-etcd.yml", composePath)
+ cmd := exec.Command(command, "-f", fileName, "up", "-d")
+ if err := cmd.Run(); err != nil {
+ return err
+ }
+ return nil
+}
+
+func stopKafka(composePath string) error {
+ fmt.Println("Stopping Kafka and Etcd ...")
+ command := "docker-compose"
+ fileName := fmt.Sprintf("%s/docker-compose-zk-kafka-test.yml", composePath)
+ cmd := exec.Command(command, "-f", fileName, "down")
+ if err := cmd.Run(); err != nil {
+ return err
+ }
+ return nil
+}
+
+func stopEtcd(composePath string) error {
+ fmt.Println("Stopping Etcd ...")
+ command := "docker-compose"
+ fileName := fmt.Sprintf("%s/docker-compose-etcd.yml", composePath)
+ cmd := exec.Command(command, "-f", fileName, "down")
+ if err := cmd.Run(); err != nil {
+ return err
+ }
+ return nil
+}
+
+func startCore(composePath string) error {
+ fmt.Println("Starting voltha core ...")
+ command := "docker-compose"
+ fileName := fmt.Sprintf("%s/rw_core.yml", composePath)
+ cmd := exec.Command(command, "-f", fileName, "up", "-d")
+ if err := cmd.Run(); err != nil {
+ return err
+ }
+ return nil
+}
+
+func stopCore(composePath string) error {
+ fmt.Println("Stopping voltha core ...")
+ command := "docker-compose"
+ fileName := fmt.Sprintf("%s/rw_core.yml", composePath)
+ cmd := exec.Command(command, "-f", fileName, "down")
+ if err := cmd.Run(); err != nil {
+ return err
+ }
+ return nil
+}
+
+func startSimulatedOLTAndONUAdapters(composePath string) error {
+ fmt.Println("Starting simulated OLT and ONU adapters ...")
+ command := "docker-compose"
+ fileName := fmt.Sprintf("%s/adapters-simulated.yml", composePath)
+ cmd := exec.Command(command, "-f", fileName, "up", "-d")
+ if err := cmd.Run(); err != nil {
+ return err
+ }
+ return nil
+}
+
+func stopSimulatedOLTAndONUAdapters(composePath string) error {
+ fmt.Println("Stopping simulated OLT and ONU adapters ...")
+ command := "docker-compose"
+ fileName := fmt.Sprintf("%s/adapters-simulated.yml", composePath)
+ cmd := exec.Command(command, "-f", fileName, "down")
+ if err := cmd.Run(); err != nil {
+ return err
+ }
+ return nil
+}
+
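+// ListLogicalDevices returns all logical devices known to the voltha core.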
+func ListLogicalDevices(stub voltha.VolthaServiceClient) (*voltha.LogicalDevices, error) {
+ ui := uuid.New()
+ ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(VOLTHA_SERIAL_NUMBER_KEY, ui.String()))
+ if response, err := stub.ListLogicalDevices(ctx, &empty.Empty{}); err != nil {
+ return nil, err
+ } else {
+ return response, nil
+ }
+}
+
+func getNumUniPort(ld *voltha.LogicalDevice) int {
+ num := 0
+ for _, port := range ld.Ports {
+ if !port.RootPort {
+ num += 1
+ }
+ }
+ return num
+}
+
+func sendCreateDeviceRequest(ctx context.Context, stub voltha.VolthaServiceClient, device *voltha.Device, ch chan interface{}) {
+ if response, err := stub.CreateDevice(ctx, device); err != nil {
+ ch <- err
+ } else {
+ ch <- response
+ }
+}
+
+func sendListAdapters(ctx context.Context, stub voltha.VolthaServiceClient, ch chan interface{}) {
+ if response, err := stub.ListAdapters(ctx, &empty.Empty{}); err != nil {
+ ch <- err
+ } else {
+ ch <- response
+ }
+}
+
+func sendEnableDeviceRequest(ctx context.Context, stub voltha.VolthaServiceClient, deviceId string, ch chan interface{}) {
+ if response, err := stub.EnableDevice(ctx, &common.ID{Id: deviceId}); err != nil {
+ ch <- err
+ } else {
+ ch <- response
+ }
+}
+
+func sendDisableDeviceRequest(ctx context.Context, stub voltha.VolthaServiceClient, deviceId string, ch chan interface{}) {
+ if response, err := stub.DisableDevice(ctx, &common.ID{Id: deviceId}); err != nil {
+ ch <- err
+ } else {
+ ch <- response
+ }
+}
+
+func sendDeleteDeviceRequest(ctx context.Context, stub voltha.VolthaServiceClient, deviceId string, ch chan interface{}) {
+ if response, err := stub.DeleteDevice(ctx, &common.ID{Id: deviceId}); err != nil {
+ ch <- err
+ } else {
+ ch <- response
+ }
+}
+
+func getDevices(ctx context.Context, stub voltha.VolthaServiceClient) (*voltha.Devices, error) {
+ if response, err := stub.ListDevices(ctx, &empty.Empty{}); err != nil {
+ return nil, err
+ } else {
+ return response, nil
+ }
+}
+
+func getLogicalDevices(ctx context.Context, stub voltha.VolthaServiceClient) (*voltha.LogicalDevices, error) {
+ if response, err := stub.ListLogicalDevices(ctx, &empty.Empty{}); err != nil {
+ return nil, err
+ } else {
+ return response, nil
+ }
+}
+
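+// IsFlowPresent returns true if lookingFor matches one of the flows in the provided list.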
+func IsFlowPresent(lookingFor *voltha.OfpFlowStats, flows []*voltha.OfpFlowStats) bool {
+ for _, f := range flows {
+ if f.String() == lookingFor.String() {
+ return true
+ }
+ }
+ return false
+}
+
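+// ListDevices returns all devices known to the voltha core.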
+func ListDevices(stub voltha.VolthaServiceClient) (*voltha.Devices, error) {
+ ui := uuid.New()
+ ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(VOLTHA_SERIAL_NUMBER_KEY, ui.String()))
+ if devices, err := getDevices(ctx, stub); err == nil {
+ return devices, nil
+ } else {
+ return nil, err
+ }
+}
+
+func sendFlow(ctx context.Context, stub voltha.VolthaServiceClient, flow *ofp.FlowTableUpdate, ch chan interface{}) {
+ if response, err := stub.UpdateLogicalDeviceFlowTable(ctx, flow); err != nil {
+ ch <- err
+ } else {
+ ch <- response
+ }
+}
+
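+// SetLogLevel sets the log level for the package identified in l.PackageName.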
+func SetLogLevel(stub voltha.VolthaServiceClient, l voltha.Logging) error {
+ ui := uuid.New()
+ ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(VOLTHA_SERIAL_NUMBER_KEY, ui.String()))
+ _, err := stub.UpdateLogLevel(ctx, &l)
+ return err
+}
+
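+// SetAllLogLevel sets the default log level across all packages (typically called with an empty l.PackageName).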
+func SetAllLogLevel(stub voltha.VolthaServiceClient, l voltha.Logging) error {
+ ui := uuid.New()
+ ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(VOLTHA_SERIAL_NUMBER_KEY, ui.String()))
+ _, err := stub.UpdateLogLevel(ctx, &l)
+ return err
+}
+
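+// SetupGrpcConnectionToCore opens an insecure gRPC connection to the voltha core and returns a service client.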
+func SetupGrpcConnectionToCore(grpcHostIP string, grpcPort int) (voltha.VolthaServiceClient, error) {
+ grpcHost := fmt.Sprintf("%s:%d", grpcHostIP, grpcPort)
+ fmt.Println("Connecting to voltha using:", grpcHost)
+ conn, err := grpc.Dial(grpcHost, grpc.WithInsecure())
+ if err != nil {
+ return nil, err
+ }
+ return voltha.NewVolthaServiceClient(conn), nil
+}
+
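+// VerifyLogicalDevices checks that exactly one logical device exists for the parent device and that
+// its attributes and ports are consistent with the simulated adapters.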
+func VerifyLogicalDevices(stub voltha.VolthaServiceClient, parentDevice *voltha.Device, numONUsPerOLT int) (*voltha.LogicalDevices, error) {
+ ui := uuid.New()
+ ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(VOLTHA_SERIAL_NUMBER_KEY, ui.String()))
+ retrievedLogicalDevices, err := getLogicalDevices(ctx, stub)
+ if err != nil {
+ return nil, err
+ }
+ if len(retrievedLogicalDevices.Items) != 1 {
+ return nil, status.Errorf(codes.Internal, "Logical device number incorrect. Expected:{%d}, Created:{%d}", 1, len(retrievedLogicalDevices.Items))
+ }
+
+ // Verify the attributes and ports of each logical device
+ for _, ld := range retrievedLogicalDevices.Items {
+ if ld.Id == "" ||
+ ld.DatapathId == uint64(0) ||
+ ld.Desc.HwDesc != "simulated_pon" ||
+ ld.Desc.SwDesc != "simulated_pon" ||
+ ld.RootDeviceId == "" ||
+ ld.Desc.SerialNum == "" ||
+ ld.SwitchFeatures.NBuffers != uint32(256) ||
+ ld.SwitchFeatures.NTables != uint32(2) ||
+ ld.SwitchFeatures.Capabilities != uint32(15) ||
+ len(ld.Ports) != 1+numONUsPerOLT ||
+ ld.RootDeviceId != parentDevice.Id {
+ return nil, status.Errorf(codes.Internal, "incorrect logical device status:{%v}", ld)
+ }
+ for _, p := range ld.Ports {
+ if p.DevicePortNo != p.OfpPort.PortNo ||
+ p.OfpPort.State != uint32(4) {
+ return nil, status.Errorf(codes.Internal, "incorrect logical ports status:{%v}", p)
+ }
+ if strings.HasPrefix(p.Id, "nni") {
+ if !p.RootPort || fmt.Sprintf("nni-%d", p.DevicePortNo) != p.Id {
+ return nil, status.Errorf(codes.Internal, "incorrect nni port status:{%v}", p)
+ }
+ } else {
+ if p.RootPort || fmt.Sprintf("uni-%d", p.DevicePortNo) != p.Id {
+ return nil, status.Errorf(codes.Internal, "incorrect uni port status:{%v}", p)
+ }
+ }
+ }
+ }
+ return retrievedLogicalDevices, nil
+}
+
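+// VerifyDevices checks that the expected number of devices (one OLT plus numONUsPerOLT ONUs) exist
+// and that their states and ports are consistent.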
+func VerifyDevices(stub voltha.VolthaServiceClient, numONUsPerOLT int) error {
+ ui := uuid.New()
+ ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(VOLTHA_SERIAL_NUMBER_KEY, ui.String()))
+ retrievedDevices, err := getDevices(ctx, stub)
+ if err != nil {
+ return err
+ }
+ if len(retrievedDevices.Items) != 1+numONUsPerOLT {
+ return status.Errorf(codes.Internal, "Device number incorrect. Expected:{%d}, Created:{%d}", 1, len(retrievedDevices.Items))
+ }
+ // Verify that each device has two ports
+ for _, d := range retrievedDevices.Items {
+ if d.AdminState != voltha.AdminState_ENABLED ||
+ d.ConnectStatus != voltha.ConnectStatus_REACHABLE ||
+ d.OperStatus != voltha.OperStatus_ACTIVE ||
+ d.Type != d.Adapter ||
+ d.Id == "" ||
+ d.MacAddress == "" ||
+ d.SerialNumber == "" {
+ return status.Errorf(codes.Internal, "incorrect device state - %s", d.Id)
+ }
+
+ if d.Type == "simulated_olt" && (!d.Root || d.ProxyAddress != nil) {
+ return status.Errorf(codes.Internal, "invalid olt status:{%v}", d)
+ } else if d.Type == "simulated_onu" && (d.Root ||
+ d.Vlan == uint32(0) ||
+ d.ParentId == "" ||
+ d.ProxyAddress.DeviceId == "" ||
+ d.ProxyAddress.DeviceType != "simulated_olt") {
+ return status.Errorf(codes.Internal, "invalid onu status:{%s}", d.Id)
+ }
+
+ if len(d.Ports) != 2 {
+ return status.Errorf(codes.Internal, "invalid number of ports:{%s, %v}", d.Id, d.Ports)
+ }
+
+ for _, p := range d.Ports {
+ if p.AdminState != voltha.AdminState_ENABLED ||
+ p.OperStatus != voltha.OperStatus_ACTIVE {
+ return status.Errorf(codes.Internal, "invalid port state:{%s, %v}", d.Id, p)
+ }
+
+ if p.Type == voltha.Port_ETHERNET_NNI || p.Type == voltha.Port_ETHERNET_UNI {
+ if len(p.Peers) != 0 {
+ return status.Errorf(codes.Internal, "invalid length of peers:{%s, %d}", d.Id, p.Type)
+ }
+ } else if p.Type == voltha.Port_PON_OLT {
+ if len(p.Peers) != numONUsPerOLT ||
+ p.PortNo != uint32(1) {
+ return status.Errorf(codes.Internal, "invalid length of peers for PON OLT port:{%s, %v}", d.Id, p)
+ }
+ } else if p.Type == voltha.Port_PON_ONU {
+ if len(p.Peers) != 1 ||
+ p.PortNo != uint32(1) {
+ return status.Errorf(codes.Internal, "invalid length of peers for PON ONU port:{%s, %v}", d.Id, p)
+ }
+ }
+ }
+ }
+ return nil
+}
+
+func areAdaptersPresent(requiredAdapterNames []string, retrievedAdapters *voltha.Adapters) bool {
+ if len(requiredAdapterNames) == 0 {
+ return true
+ }
+ for _, nAName := range requiredAdapterNames {
+ found := false
+ for _, rA := range retrievedAdapters.Items {
+ if nAName == rA.Id {
+ found = true
+ break
+ }
+ }
+ if !found {
+ return false
+ }
+ }
+ return true
+}
+
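+// WaitForAdapterRegistration polls the core until all requiredAdapterNames are registered or the
+// timeout (in seconds) expires.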
+func WaitForAdapterRegistration(stub voltha.VolthaServiceClient, requiredAdapterNames []string, timeout int) (*voltha.Adapters, error) {
+ fmt.Println("Waiting for adapter registration ...")
+ ui := uuid.New()
+ ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(VOLTHA_SERIAL_NUMBER_KEY, ui.String()))
+ // Buffered channel, never closed: a response arriving after a timeout can still be
+ // delivered without blocking or panicking the sending goroutine.
+ ch := make(chan interface{}, 1)
+ for {
+ go sendListAdapters(ctx, stub, ch)
+ select {
+ case res, ok := <-ch:
+ if !ok {
+ return nil, status.Error(codes.Aborted, "channel closed")
+ } else if er, ok := res.(error); ok {
+ return nil, er
+ } else if a, ok := res.(*voltha.Adapters); ok {
+ if areAdaptersPresent(requiredAdapterNames, a) {
+ fmt.Println("All adapters registered:", a.Items)
+ return a, nil
+ }
+ }
+ case <-time.After(time.Duration(timeout) * time.Second):
+ return nil, status.Error(codes.Aborted, "timeout while waiting for adapter registration")
+ }
+ time.Sleep(1 * time.Second)
+ }
+}
+
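+// PreProvisionDevice creates a simulated OLT device with a random MAC address and returns it in the
+// pre-provisioned state.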
+func PreProvisionDevice(stub voltha.VolthaServiceClient) (*voltha.Device, error) {
+ ui := uuid.New()
+ ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(VOLTHA_SERIAL_NUMBER_KEY, ui.String()))
+ randomMacAddress := strings.ToUpper(com.GetRandomMacAddress())
+ device := &voltha.Device{Type: "simulated_olt", MacAddress: randomMacAddress}
+ ch := make(chan interface{})
+ defer close(ch)
+ go sendCreateDeviceRequest(ctx, stub, device, ch)
+ res, ok := <-ch
+ if !ok {
+ return nil, status.Error(codes.Aborted, "channel closed")
+ } else if er, ok := res.(error); ok {
+ return nil, er
+ } else if d, ok := res.(*voltha.Device); ok {
+ return d, nil
+ }
+ return nil, status.Errorf(codes.Unknown, "cannot provision device:{%v}", device)
+}
+
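+// EnableDevice enables a pre-provisioned device; enabling the parent also brings up its child devices.
+// The numONUs parameter is not used by this helper.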
+func EnableDevice(stub voltha.VolthaServiceClient, device *voltha.Device, numONUs int) error {
+ if device.AdminState == voltha.AdminState_PREPROVISIONED {
+ ui := uuid.New()
+ ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(VOLTHA_SERIAL_NUMBER_KEY, ui.String()))
+ ch := make(chan interface{})
+ defer close(ch)
+ go sendEnableDeviceRequest(ctx, stub, device.Id, ch)
+ res, ok := <-ch
+ if !ok {
+ return status.Error(codes.Aborted, "channel closed")
+ } else if er, ok := res.(error); ok {
+ return er
+ } else if _, ok := res.(*empty.Empty); ok {
+ return nil
+ }
+ }
+ return status.Errorf(codes.Unknown, "cannot enable device:{%s}", device.Id)
+}
+
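+// UpdateFlow pushes a flow table update to the logical device identified in the request.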
+func UpdateFlow(stub voltha.VolthaServiceClient, flow *ofp.FlowTableUpdate) error {
+ ui := uuid.New()
+ ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(VOLTHA_SERIAL_NUMBER_KEY, ui.String()))
+ ch := make(chan interface{})
+ defer close(ch)
+ go sendFlow(ctx, stub, flow, ch)
+ res, ok := <-ch
+ if !ok {
+ return status.Error(codes.Aborted, "channel closed")
+ } else if er, ok := res.(error); ok {
+ return er
+ } else if _, ok := res.(*empty.Empty); ok {
+ return nil
+ }
+ return status.Errorf(codes.Unknown, "cannot add flow:{%v}", flow)
+}
+
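+// StartSimulatedEnv brings up Zookeeper/Kafka, Etcd, the simulated adapters and the voltha core using
+// the docker-compose files under composePath.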
+func StartSimulatedEnv(composePath string) error {
+ fmt.Println("Starting simulated environment ...")
+ // Start kafka and Etcd
+ if err := startKafka(composePath); err != nil {
+ return err
+ }
+ if err := startEtcd(composePath); err != nil {
+ return err
+ }
+ time.Sleep(5 * time.Second)
+
+ //Start the simulated adapters
+ if err := startSimulatedOLTAndONUAdapters(composePath); err != nil {
+ return err
+ }
+
+ //Start the core
+ if err := startCore(composePath); err != nil {
+ return err
+ }
+
+ time.Sleep(10 * time.Second)
+
+ fmt.Println("Simulated environment started.")
+ return nil
+}
+
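+// StopSimulatedEnv tears down the containers started by StartSimulatedEnv. Errors are ignored so the
+// teardown always runs to completion.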
+func StopSimulatedEnv(composePath string) error {
+ stopSimulatedOLTAndONUAdapters(composePath)
+ stopCore(composePath)
+ stopKafka(composePath)
+ stopEtcd(composePath)
+ return nil
+}
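+
+// The sketch below is illustrative only and not part of the utilities: it shows, under assumed
+// values for the compose path, gRPC endpoint and adapter IDs, how a test's setup might wire these
+// helpers together.
+//
+//   func exampleSetup() (voltha.VolthaServiceClient, error) {
+//       if err := StartSimulatedEnv("../../compose"); err != nil {
+//           return nil, err
+//       }
+//       stub, err := SetupGrpcConnectionToCore("127.0.0.1", 50057)
+//       if err != nil {
+//           return nil, err
+//       }
+//       if _, err := WaitForAdapterRegistration(stub, []string{"simulated_olt", "simulated_onu"}, 30); err != nil {
+//           return nil, err
+//       }
+//       return stub, nil
+//   }
+//
+// Teardown would then call StopSimulatedEnv with the same compose path.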