[VOL-1037, VOL-1035] This commit adds flow and group handling
(from the NBI to the adapters, including decomposition).
Change-Id: I4f6d9ecd3dee8a9b161708b20b0a68d030c0cb23
diff --git a/rw_core/core/adapter_proxy.go b/rw_core/core/adapter_proxy.go
index 82c6dec..643c9de 100644
--- a/rw_core/core/adapter_proxy.go
+++ b/rw_core/core/adapter_proxy.go
@@ -22,6 +22,7 @@
"github.com/opencord/voltha-go/common/log"
"github.com/opencord/voltha-go/kafka"
ca "github.com/opencord/voltha-go/protos/core_adapter"
+ "github.com/opencord/voltha-go/protos/openflow_13"
"github.com/opencord/voltha-go/protos/voltha"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
@@ -249,14 +250,50 @@
return nil, nil
}
-func (ap *AdapterProxy) UpdateFlowsBulk(device voltha.Device, flows voltha.Flows, groups voltha.FlowGroups) error {
- log.Debug("UpdateFlowsBulk")
- return nil
+// UpdateFlowsBulk invokes the "update_flows_bulk" RPC on the Kafka topic
+// named after the device type. The device, flows and groups are sent as
+// keyed arguments ("device", "flows", "groups"). The value returned is
+// whatever unPackResponse derives from the success flag and result that
+// InvokeRPC hands back.
+func (ap *AdapterProxy) UpdateFlowsBulk(device *voltha.Device, flows *voltha.Flows, groups *voltha.FlowGroups) error {
+	const rpcName = "update_flows_bulk"
+	log.Debugw("UpdateFlowsBulk", log.Fields{"deviceId": device.Id})
+
+	adapterTopic := &kafka.Topic{Name: device.Type}
+	kvArgs := []*kafka.KVArg{
+		{Key: "device", Value: device},
+		{Key: "flows", Value: flows},
+		{Key: "groups", Value: groups},
+	}
+
+	// NOTE(review): the nil first argument and the `true` flag are passed as-is;
+	// `true` presumably asks InvokeRPC to wait for the reply — confirm.
+	ok, reply := ap.kafkaProxy.InvokeRPC(nil, rpcName, adapterTopic, true, kvArgs...)
+	log.Debugw("UpdateFlowsBulk-response", log.Fields{"deviceid": device.Id, "success": ok})
+	return unPackResponse(rpcName, device.Id, ok, reply)
}
-func (ap *AdapterProxy) UpdateFlowsIncremental(device voltha.Device, flowChanges voltha.Flows, groupChanges voltha.FlowGroups) error {
- log.Debug("UpdateFlowsIncremental")
- return nil
+// UpdateFlowsIncremental invokes the "update_flows_incremental" RPC on the
+// Kafka topic named after the device type, passing the device together with
+// the flow and group changes as keyed arguments ("device", "flow_changes",
+// "group_changes"). It returns whatever unPackResponse derives from the reply.
+func (ap *AdapterProxy) UpdateFlowsIncremental(device *voltha.Device, flowChanges *openflow_13.FlowChanges, groupChanges *openflow_13.FlowGroupChanges) error {
+	log.Debugw("UpdateFlowsIncremental", log.Fields{"deviceId": device.Id})
+	topic := kafka.Topic{Name: device.Type}
+	// The RPC name has to agree with the argument keys sent below; this used to
+	// be "update_flows_bulk", the same name UpdateFlowsBulk sends.
+	rpc := "update_flows_incremental"
+	args := []*kafka.KVArg{
+		{Key: "device", Value: device},
+		{Key: "flow_changes", Value: flowChanges},
+		{Key: "group_changes", Value: groupChanges},
+	}
+
+	// NOTE(review): the nil first argument and the `true` flag are passed as-is;
+	// `true` presumably asks InvokeRPC to wait for the reply — confirm.
+	success, result := ap.kafkaProxy.InvokeRPC(nil, rpc, &topic, true, args...)
+	log.Debugw("UpdateFlowsIncremental-response", log.Fields{"deviceid": device.Id, "success": success})
+	return unPackResponse(rpc, device.Id, success, result)
}
func (ap *AdapterProxy) UpdatePmConfig(device voltha.Device, pmConfigs voltha.PmConfigs) error {