[VOL-1442] This commit handles adapter registration end to end.
It introduces an adapter manager to handle all adapter admin
requests. Some protos have been cleaned as well.
Change-Id: If75d4f7665c03e841d57f5621c30301940d04d93
diff --git a/rw_core/core/adapter_manager.go b/rw_core/core/adapter_manager.go
new file mode 100644
index 0000000..c0ec1e9
--- /dev/null
+++ b/rw_core/core/adapter_manager.go
@@ -0,0 +1,370 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package core
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "github.com/gogo/protobuf/proto"
+ "github.com/opencord/voltha-go/common/log"
+ "github.com/opencord/voltha-go/db/model"
+ "github.com/opencord/voltha-go/protos/voltha"
+ "reflect"
+ "sync"
+)
+
+// Sentinel identifiers for the placeholder adapter and device type that are
+// created when the KV store holds no adapter or device type data.
+const (
+ // SENTINEL_ADAPTER_ID is the id of the placeholder adapter.
+ SENTINEL_ADAPTER_ID = "adapter_sentinel"
+ // SENTINEL_DEVICETYPE_ID is the id of the placeholder device type.
+ SENTINEL_DEVICETYPE_ID = "device_type_sentinel"
+
+)
+
+// AdapterAgent holds the in-memory view of one adapter together with the
+// device types registered under it.
+type AdapterAgent struct {
+ // adapter is the adapter descriptor this agent represents.
+ adapter *voltha.Adapter
+ // deviceTypes indexes the adapter's device types by device type id.
+ deviceTypes map[string]*voltha.DeviceType
+ // lock guards adapter and deviceTypes.
+ lock sync.RWMutex
+}
+
+func newAdapterAgent(adapter *voltha.Adapter, deviceTypes *voltha.DeviceTypes) *AdapterAgent {
+ var adapterAgent AdapterAgent
+ adapterAgent.adapter = adapter
+ adapterAgent.lock = sync.RWMutex{}
+ adapterAgent.deviceTypes = make(map[string]*voltha.DeviceType)
+ if deviceTypes != nil {
+ for _, dType := range deviceTypes.Items {
+ adapterAgent.deviceTypes[dType.Id] = dType
+ }
+ }
+ return &adapterAgent
+}
+
+// handlesDeviceType reports whether the given device type id is registered
+// with this adapter agent.
+func (aa *AdapterAgent) handlesDeviceType(deviceType string) bool {
+	aa.lock.RLock()
+	_, found := aa.deviceTypes[deviceType]
+	aa.lock.RUnlock()
+	return found
+}
+
+// getDeviceType returns the device type registered under the given id, or nil
+// when this agent has no such device type.
+func (aa *AdapterAgent) getDeviceType(deviceType string) *voltha.DeviceType {
+	aa.lock.RLock()
+	defer aa.lock.RUnlock()
+	// A missing key yields the map's zero value, which is a nil pointer.
+	return aa.deviceTypes[deviceType]
+}
+
+// getAdapter returns the adapter currently held by this agent. The stored
+// pointer is returned as is, not a copy.
+func (aa *AdapterAgent) getAdapter() *voltha.Adapter {
+	aa.lock.RLock()
+	defer aa.lock.RUnlock()
+	current := aa.adapter
+	log.Debugw("getAdapter", log.Fields{"adapter": current})
+	return current
+}
+
+// updateAdapter replaces the adapter held by this agent.
+func (aa *AdapterAgent) updateAdapter(adapter *voltha.Adapter) {
+	// This is a write, so the exclusive lock is required; a read lock would
+	// let concurrent readers and writers race on aa.adapter.
+	aa.lock.Lock()
+	defer aa.lock.Unlock()
+	aa.adapter = adapter
+}
+
+// updateDeviceType adds the device type to this agent, or replaces the entry
+// already stored under the same device type id.
+func (aa *AdapterAgent) updateDeviceType(deviceType *voltha.DeviceType) {
+	// Map writes must hold the exclusive lock; concurrent map writes under a
+	// shared read lock are a data race.
+	aa.lock.Lock()
+	defer aa.lock.Unlock()
+	aa.deviceTypes[deviceType.Id] = deviceType
+}
+
+// AdapterManager keeps the in-memory registry of adapters and device types and
+// persists newly registered ones through the cluster data proxy.
+type AdapterManager struct {
+ // adapterAgents maps an adapter id to its agent; guarded by lockAdaptersMap.
+ adapterAgents map[string]*AdapterAgent
+ // deviceTypeToAdapterMap maps a device type id to the id of the adapter that
+ // serves it; guarded by lockdDeviceTypeToAdapterMap.
+ deviceTypeToAdapterMap map[string]string
+ // clusterDataProxy is used to read and add "/adapters" and "/device_types" entries.
+ clusterDataProxy *model.Proxy
+ // adapterProxy is the proxy created on "/adapters" by start.
+ adapterProxy *model.Proxy
+ // deviceTypeProxy is the proxy created on "/device_types" by start.
+ deviceTypeProxy *model.Proxy
+ // coreInstanceId is returned to adapters in the registration response.
+ coreInstanceId string
+ // exitChannel receives a value when stop is called.
+ exitChannel chan int
+ lockAdaptersMap sync.RWMutex
+ lockdDeviceTypeToAdapterMap sync.RWMutex
+}
+
+func newAdapterManager(cdProxy *model.Proxy, coreInstanceId string) *AdapterManager {
+ var adapterMgr AdapterManager
+ adapterMgr.exitChannel = make(chan int, 1)
+ adapterMgr.coreInstanceId = coreInstanceId
+ adapterMgr.clusterDataProxy = cdProxy
+ adapterMgr.adapterAgents = make(map[string]*AdapterAgent)
+ adapterMgr.deviceTypeToAdapterMap = make(map[string]string)
+ adapterMgr.lockAdaptersMap = sync.RWMutex{}
+ adapterMgr.lockdDeviceTypeToAdapterMap = sync.RWMutex{}
+ return &adapterMgr
+}
+
+// start loads the stored adapters and device types into memory, then creates
+// the "/adapters" and "/device_types" proxies and registers the POST_UPDATE
+// callbacks on them. ctx is not used by this method.
+func (aMgr *AdapterManager) start(ctx context.Context) {
+ log.Info("starting-adapter-manager")
+
+ // Load the existing adapters and device types first - when the store is empty this writes the
+ // sentinel entries, so the paths exist before the proxies below are created
+ aMgr.loadAdaptersAndDevicetypesInMemory()
+
+ // Create the proxies
+ aMgr.adapterProxy = aMgr.clusterDataProxy.Root.CreateProxy("/adapters", false)
+ aMgr.deviceTypeProxy = aMgr.clusterDataProxy.Root.CreateProxy("/device_types", false)
+
+ // Register the callbacks
+ aMgr.adapterProxy.RegisterCallback(model.POST_UPDATE, aMgr.adapterUpdated)
+ aMgr.deviceTypeProxy.RegisterCallback(model.POST_UPDATE, aMgr.deviceTypesUpdated)
+
+ log.Info("adapter-manager-started")
+}
+
+// stop signals shutdown by sending a value on exitChannel. ctx is not used by
+// this method. The send blocks if the channel buffer is already full.
+func (aMgr *AdapterManager) stop(ctx context.Context) {
+	// These messages previously named the device manager (copy-paste), which
+	// made shutdown logs misleading.
+	log.Info("stopping-adapter-manager")
+	aMgr.exitChannel <- 1
+	log.Info("adapter-manager-stopped")
+}
+
+// loadAdaptersAndDevicetypesInMemory loads the adapters and device types found
+// under "/adapters" and "/device_types" into memory. When a path returns no
+// data, a sentinel entry is added and saved so that the path exists.
+func (aMgr *AdapterManager) loadAdaptersAndDevicetypesInMemory() {
+	// Load the adapters
+	if adaptersIf := aMgr.clusterDataProxy.Get("/adapters", 0, false, ""); adaptersIf != nil {
+		// Checked assertion: an unexpected payload is logged instead of panicking.
+		adapters, ok := adaptersIf.([]interface{})
+		if !ok {
+			log.Errorw("unexpected-adapters-data", log.Fields{"adapters": adaptersIf})
+		}
+		for _, adapterIf := range adapters {
+			if adapter, ok := adapterIf.(*voltha.Adapter); ok {
+				log.Debugw("found-existing-adapter", log.Fields{"adapterId": adapter.Id})
+				aMgr.addAdapter(adapter, false)
+			}
+		}
+	} else {
+		log.Debug("no-existing-adapter-found")
+		// No adapter data. In order to have a proxy setup for that path let's create a fake adapter
+		aMgr.addAdapter(&voltha.Adapter{Id: SENTINEL_ADAPTER_ID}, true)
+	}
+
+	// Load the device types
+	if deviceTypesIf := aMgr.clusterDataProxy.Get("/device_types", 0, false, ""); deviceTypesIf != nil {
+		deviceTypes, ok := deviceTypesIf.([]interface{})
+		if !ok {
+			log.Errorw("unexpected-device-types-data", log.Fields{"deviceTypes": deviceTypesIf})
+		}
+		dTypes := &voltha.DeviceTypes{Items: []*voltha.DeviceType{}}
+		for _, deviceTypeIf := range deviceTypes {
+			if dType, ok := deviceTypeIf.(*voltha.DeviceType); ok {
+				// Log the entry just found, not the list accumulated so far.
+				log.Debugw("found-existing-device-types", log.Fields{"deviceTypeId": dType.Id})
+				dTypes.Items = append(dTypes.Items, dType)
+			}
+		}
+		aMgr.addDeviceTypes(dTypes, false)
+	} else {
+		log.Debug("no-existing-device-type-found")
+		// No device types data. In order to have a proxy setup for that path let's create a fake device type
+		aMgr.addDeviceTypes(&voltha.DeviceTypes{Items: []*voltha.DeviceType{{Id: SENTINEL_DEVICETYPE_ID, Adapter: SENTINEL_ADAPTER_ID}}}, true)
+	}
+}
+
+// addAdapter registers a clone of the adapter in memory if its id is not yet
+// known. With saveToDb set, the clone is also added under "/adapters" unless
+// an entry for that id already exists there. Known adapters are left untouched.
+func (aMgr *AdapterManager) addAdapter(adapter *voltha.Adapter, saveToDb bool) {
+	aMgr.lockAdaptersMap.Lock()
+	defer aMgr.lockAdaptersMap.Unlock()
+	log.Debugw("adding-adapter", log.Fields{"adapter": adapter})
+
+	if _, known := aMgr.adapterAgents[adapter.Id]; known {
+		return
+	}
+	cloned := (proto.Clone(adapter)).(*voltha.Adapter)
+	aMgr.adapterAgents[adapter.Id] = newAdapterAgent(cloned, nil)
+	if !saveToDb {
+		return
+	}
+
+	// Save the adapter to the KV store - first check if it already exist
+	if stored := aMgr.clusterDataProxy.Get("/adapters/"+adapter.Id, 0, false, ""); stored != nil {
+		return
+	}
+	if added := aMgr.clusterDataProxy.AddWithID("/adapters", adapter.Id, cloned, ""); added == nil {
+		//TODO: Errors when saving to KV would require a separate go routine to be launched and try the saving again
+		log.Errorw("failed-to-save-adapter", log.Fields{"adapter": adapter})
+		return
+	}
+	log.Debugw("adapter-saved-to-KV-Store", log.Fields{"adapter": adapter})
+}
+
+
+// addDeviceTypes records each device type under its owning adapter agent and
+// in the device-type-to-adapter map. An agent with a bare adapter descriptor
+// is created when the owning adapter is not yet known. With saveToDb set, each
+// device type not already under "/device_types" is added there. A nil
+// deviceTypes is a no-op. Both map locks are held for the whole call.
+func (aMgr *AdapterManager) addDeviceTypes(deviceTypes *voltha.DeviceTypes, saveToDb bool) {
+	if deviceTypes == nil {
+		return
+	}
+	log.Debugw("adding-device-types", log.Fields{"deviceTypes": deviceTypes})
+	aMgr.lockAdaptersMap.Lock()
+	defer aMgr.lockAdaptersMap.Unlock()
+	aMgr.lockdDeviceTypeToAdapterMap.Lock()
+	defer aMgr.lockdDeviceTypeToAdapterMap.Unlock()
+	for _, deviceType := range deviceTypes.Items {
+		clonedDType := (proto.Clone(deviceType)).(*voltha.DeviceType)
+		if adapterAgent, exist := aMgr.adapterAgents[clonedDType.Adapter]; exist {
+			adapterAgent.updateDeviceType(clonedDType)
+		} else {
+			log.Debugw("adapter-not-exist", log.Fields{"deviceTypes": deviceTypes, "adapterId": clonedDType.Adapter})
+			// Seed the new agent with this device type only. Passing the whole
+			// input list would attach device types owned by other adapters and
+			// would store the caller's uncloned objects.
+			aMgr.adapterAgents[clonedDType.Adapter] = newAdapterAgent(
+				&voltha.Adapter{Id: clonedDType.Adapter},
+				&voltha.DeviceTypes{Items: []*voltha.DeviceType{clonedDType}})
+		}
+		aMgr.deviceTypeToAdapterMap[clonedDType.Id] = clonedDType.Adapter
+	}
+	if !saveToDb {
+		return
+	}
+	// Save the device types to the KV store as well
+	for _, deviceType := range deviceTypes.Items {
+		if dType := aMgr.clusterDataProxy.Get("/device_types/"+deviceType.Id, 0, false, ""); dType != nil {
+			// Already stored - nothing to do
+			continue
+		}
+		clonedDType := (proto.Clone(deviceType)).(*voltha.DeviceType)
+		if added := aMgr.clusterDataProxy.AddWithID("/device_types", deviceType.Id, clonedDType, ""); added == nil {
+			log.Errorw("failed-to-save-deviceType", log.Fields{"deviceType": deviceType})
+		} else {
+			log.Debugw("device-type-saved-to-KV-Store", log.Fields{"deviceType": deviceType})
+		}
+	}
+}
+
+// listAdapters returns clones of all adapters currently held in memory. The
+// returned error is always nil; ctx is not used by this method.
+func (aMgr *AdapterManager) listAdapters(ctx context.Context) (*voltha.Adapters, error) {
+	// Read-only access: a shared lock lets concurrent listings proceed.
+	aMgr.lockAdaptersMap.RLock()
+	defer aMgr.lockAdaptersMap.RUnlock()
+	result := &voltha.Adapters{Items: make([]*voltha.Adapter, 0, len(aMgr.adapterAgents))}
+	for _, adapterAgent := range aMgr.adapterAgents {
+		if a := adapterAgent.getAdapter(); a != nil {
+			result.Items = append(result.Items, (proto.Clone(a)).(*voltha.Adapter))
+		}
+	}
+	return result, nil
+}
+
+// deleteAdapter removes the agent registered under adapterId from the
+// in-memory adapter map. Unknown ids are ignored.
+func (aMgr *AdapterManager) deleteAdapter(adapterId string) {
+	aMgr.lockAdaptersMap.Lock()
+	delete(aMgr.adapterAgents, adapterId)
+	aMgr.lockAdaptersMap.Unlock()
+}
+
+// getAdapter returns the adapter registered under adapterId, or nil when no
+// agent exists for that id.
+func (aMgr *AdapterManager) getAdapter(adapterId string) *voltha.Adapter {
+	// Read-only lookup: the shared lock is sufficient.
+	aMgr.lockAdaptersMap.RLock()
+	defer aMgr.lockAdaptersMap.RUnlock()
+	if adapterAgent, ok := aMgr.adapterAgents[adapterId]; ok {
+		return adapterAgent.getAdapter()
+	}
+	return nil
+}
+
+// updateAdapter replaces the adapter held by the agent registered under
+// adapter.Id, or creates a new agent for it when none exists.
+func (aMgr *AdapterManager) updateAdapter(adapter *voltha.Adapter) {
+	aMgr.lockAdaptersMap.Lock()
+	defer aMgr.lockAdaptersMap.Unlock()
+
+	agent, known := aMgr.adapterAgents[adapter.Id]
+	if !known {
+		aMgr.adapterAgents[adapter.Id] = newAdapterAgent(adapter, nil)
+		return
+	}
+	agent.updateAdapter(adapter)
+}
+
+// updateDeviceType stores the device type under the agent of its owning
+// adapter and refreshes the device-type-to-adapter mapping. When the owning
+// adapter is not yet known, an agent with a bare adapter descriptor is created
+// and seeded with this device type.
+func (aMgr *AdapterManager) updateDeviceType(deviceType *voltha.DeviceType) {
+	aMgr.lockAdaptersMap.Lock()
+	defer aMgr.lockAdaptersMap.Unlock()
+	aMgr.lockdDeviceTypeToAdapterMap.Lock()
+	defer aMgr.lockdDeviceTypeToAdapterMap.Unlock()
+	// The test used to be inverted (!exist): it called a method on a nil agent
+	// for unknown adapters and replaced the whole agent for known ones.
+	if adapterAgent, exist := aMgr.adapterAgents[deviceType.Adapter]; exist {
+		adapterAgent.updateDeviceType(deviceType)
+	} else {
+		aMgr.adapterAgents[deviceType.Adapter] = newAdapterAgent(&voltha.Adapter{Id: deviceType.Adapter},
+			&voltha.DeviceTypes{Items: []*voltha.DeviceType{deviceType}})
+	}
+	aMgr.deviceTypeToAdapterMap[deviceType.Id] = deviceType.Adapter
+}
+
+func (aMgr *AdapterManager) registerAdapter(adapter *voltha.Adapter, deviceTypes *voltha.DeviceTypes) *voltha.CoreInstance {
+ log.Debugw("registerAdapter", log.Fields{"adapter": adapter, "deviceTypes": deviceTypes.Items})
+
+ if aMgr.getAdapter(adapter.Id) != nil {
+ // Already registered
+ return &voltha.CoreInstance{InstanceId:aMgr.coreInstanceId}
+ }
+ // Save the adapter and the device types
+ aMgr.addAdapter(adapter, true)
+ aMgr.addDeviceTypes(deviceTypes, true)
+
+ log.Debugw("adapter-registered", log.Fields{"adapter": adapter.Id})
+
+ return &voltha.CoreInstance{InstanceId:aMgr.coreInstanceId}
+}
+
+// getAdapterName returns the id of the adapter that services the given device
+// type, or an error when no adapter is mapped to that device type.
+func (aMgr *AdapterManager) getAdapterName(deviceType string) (string, error) {
+	// Read-only lookup: the shared lock is sufficient.
+	aMgr.lockdDeviceTypeToAdapterMap.RLock()
+	defer aMgr.lockdDeviceTypeToAdapterMap.RUnlock()
+	if adapterId, exist := aMgr.deviceTypeToAdapterMap[deviceType]; exist {
+		return adapterId, nil
+	}
+	return "", errors.New(fmt.Sprintf("Adapter-not-registered-for-device-type %s", deviceType))
+}
+
+// getDeviceType returns the device type definition for the given device type
+// id, or nil when the device type is not mapped to an adapter or the mapped
+// adapter has no agent.
+func (aMgr *AdapterManager) getDeviceType(deviceType string) *voltha.DeviceType {
+	// Both maps are read here, so both locks are needed. They are taken in the
+	// same order as the writers (adapters map first) to avoid lock inversion.
+	aMgr.lockAdaptersMap.RLock()
+	defer aMgr.lockAdaptersMap.RUnlock()
+	aMgr.lockdDeviceTypeToAdapterMap.RLock()
+	defer aMgr.lockdDeviceTypeToAdapterMap.RUnlock()
+	adapterId, exist := aMgr.deviceTypeToAdapterMap[deviceType]
+	if !exist {
+		return nil
+	}
+	if adapterAgent := aMgr.adapterAgents[adapterId]; adapterAgent != nil {
+		return adapterAgent.getDeviceType(deviceType)
+	}
+	return nil
+}
+
+//adapterUpdated is a callback invoked when an adapter change has been noticed
+func (aMgr *AdapterManager) adapterUpdated(args ...interface{}) interface{} {
+ log.Debugw("updateAdapter-callback", log.Fields{"argsLen": len(args)})
+
+ var previousData *voltha.Adapters
+ var latestData *voltha.Adapters
+
+ var ok bool
+ if previousData, ok = args[0].(*voltha.Adapters); !ok {
+ log.Errorw("invalid-args", log.Fields{"args0": args[0]})
+ }
+ if latestData, ok = args[1].(*voltha.Adapters); !ok {
+ log.Errorw("invalid-args", log.Fields{"args1": args[1]})
+ }
+
+ if reflect.DeepEqual(previousData.Items, latestData.Items) {
+ log.Debug("update-not-required")
+ return nil
+ }
+
+ for _, adapter := range latestData.Items {
+ aMgr.updateAdapter(adapter)
+ }
+ return nil
+}
+
+//deviceTypesUpdated is a callback invoked when a device type change has been noticed
+func (aMgr *AdapterManager) deviceTypesUpdated(args ...interface{}) interface{} {
+ log.Debugw("deviceTypesUpdated-callback", log.Fields{"argsLen": len(args)})
+
+ var previousData *voltha.DeviceTypes
+ var latestData *voltha.DeviceTypes
+
+ var ok bool
+ if previousData, ok = args[0].(*voltha.DeviceTypes); !ok {
+ log.Errorw("invalid-args", log.Fields{"args0": args[0]})
+ }
+ if latestData, ok = args[1].(*voltha.DeviceTypes); !ok {
+ log.Errorw("invalid-args", log.Fields{"args1": args[1]})
+ }
+
+ if reflect.DeepEqual(previousData.Items, latestData.Items) {
+ log.Debug("update-not-required")
+ return nil
+ }
+
+ for _, dType := range latestData.Items {
+ aMgr.updateDeviceType(dType)
+ }
+ return nil
+}
\ No newline at end of file
diff --git a/rw_core/core/adapter_proxy.go b/rw_core/core/adapter_proxy.go
index 9d029fc..9ddda85 100644
--- a/rw_core/core/adapter_proxy.go
+++ b/rw_core/core/adapter_proxy.go
@@ -54,22 +54,10 @@
}
}
-//func kafka.CreateSubTopic(args ...string) kafka.Topic{
-// topic := ""
-// for index , arg := range args {
-// if index == 0 {
-// topic = arg
-// } else {
-// topic = fmt.Sprintf("%s_%s", topic, arg)
-// }
-// }
-// return kafka.Topic{Name:topic}
-//}
-
func (ap *AdapterProxy) AdoptDevice(ctx context.Context, device *voltha.Device) error {
log.Debugw("AdoptDevice", log.Fields{"device": device})
rpc := "adopt_device"
- topic := kafka.Topic{Name: device.Type}
+ topic := kafka.Topic{Name: device.Adapter}
args := make([]*kafka.KVArg, 1)
args[0] = &kafka.KVArg{
Key: "device",
@@ -83,14 +71,6 @@
}
success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &topic, &replyToTopic, true, args...)
log.Debugw("AdoptDevice-response", log.Fields{"replyTopic": replyToTopic, "deviceid": device.Id, "success": success})
- //if success {
- // // From now on, any unsolicited requests from the adapters for this device will come over the device topic.
- // // We should therefore include the replyToTopic as part of the target when unsolicited messages come in.
- // if err := ap.kafkaICProxy.SubscribeWithDefaultRequestHandler(replyToTopic); err != nil {
- // log.Errorw("Unable-to-subscribe-new-topic", log.Fields{"topic": replyToTopic, "error": err})
- // return err
- // }
- //}
return unPackResponse(rpc, device.Id, success, result)
}
@@ -99,7 +79,7 @@
rpc := "disable_device"
// Use a device specific topic to send the request. The adapter handling the device creates a device
// specific topic
- toTopic := kafka.CreateSubTopic(device.Type, device.Id)
+ toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
args := make([]*kafka.KVArg, 1)
args[0] = &kafka.KVArg{
Key: "device",
@@ -115,7 +95,7 @@
func (ap *AdapterProxy) ReEnableDevice(ctx context.Context, device *voltha.Device) error {
log.Debugw("ReEnableDevice", log.Fields{"deviceId": device.Id})
rpc := "reenable_device"
- toTopic := kafka.CreateSubTopic(device.Type, device.Id)
+ toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
args := make([]*kafka.KVArg, 1)
args[0] = &kafka.KVArg{
Key: "device",
@@ -131,7 +111,7 @@
func (ap *AdapterProxy) RebootDevice(ctx context.Context, device *voltha.Device) error {
log.Debugw("RebootDevice", log.Fields{"deviceId": device.Id})
rpc := "reboot_device"
- toTopic := kafka.CreateSubTopic(device.Type, device.Id)
+ toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
args := make([]*kafka.KVArg, 1)
args[0] = &kafka.KVArg{
Key: "device",
@@ -147,7 +127,7 @@
func (ap *AdapterProxy) DeleteDevice(ctx context.Context, device *voltha.Device) error {
log.Debugw("DeleteDevice", log.Fields{"deviceId": device.Id})
rpc := "delete_device"
- toTopic := kafka.CreateSubTopic(device.Type, device.Id)
+ toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
args := make([]*kafka.KVArg, 1)
args[0] = &kafka.KVArg{
Key: "device",
@@ -169,7 +149,7 @@
func (ap *AdapterProxy) GetOfpDeviceInfo(ctx context.Context, device *voltha.Device) (*ic.SwitchCapability, error) {
log.Debugw("GetOfpDeviceInfo", log.Fields{"deviceId": device.Id})
- toTopic := kafka.CreateSubTopic(device.Type, device.Id)
+ toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
args := make([]*kafka.KVArg, 1)
args[0] = &kafka.KVArg{
Key: "device",
@@ -200,7 +180,7 @@
func (ap *AdapterProxy) GetOfpPortInfo(ctx context.Context, device *voltha.Device, portNo uint32) (*ic.PortCapability, error) {
log.Debugw("GetOfpPortInfo", log.Fields{"deviceId": device.Id})
- toTopic := kafka.CreateSubTopic(device.Type, device.Id)
+ toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
args := make([]*kafka.KVArg, 2)
args[0] = &kafka.KVArg{
Key: "device",
@@ -269,7 +249,7 @@
func (ap *AdapterProxy) DownloadImage(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) error {
log.Debugw("DownloadImage", log.Fields{"deviceId": device.Id, "image": download.Name})
rpc := "download_image"
- toTopic := kafka.CreateSubTopic(device.Type, device.Id)
+ toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
args := make([]*kafka.KVArg, 2)
args[0] = &kafka.KVArg{
Key: "device",
@@ -290,7 +270,7 @@
func (ap *AdapterProxy) GetImageDownloadStatus(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) (*voltha.ImageDownload, error) {
log.Debugw("GetImageDownloadStatus", log.Fields{"deviceId": device.Id, "image": download.Name})
rpc := "get_image_download_status"
- toTopic := kafka.CreateSubTopic(device.Type, device.Id)
+ toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
args := make([]*kafka.KVArg, 2)
args[0] = &kafka.KVArg{
Key: "device",
@@ -327,7 +307,7 @@
func (ap *AdapterProxy) CancelImageDownload(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) error {
log.Debugw("CancelImageDownload", log.Fields{"deviceId": device.Id, "image": download.Name})
rpc := "cancel_image_download"
- toTopic := kafka.CreateSubTopic(device.Type, device.Id)
+ toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
args := make([]*kafka.KVArg, 2)
args[0] = &kafka.KVArg{
Key: "device",
@@ -348,7 +328,7 @@
func (ap *AdapterProxy) ActivateImageUpdate(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) error {
log.Debugw("ActivateImageUpdate", log.Fields{"deviceId": device.Id, "image": download.Name})
rpc := "activate_image_update"
- toTopic := kafka.CreateSubTopic(device.Type, device.Id)
+ toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
args := make([]*kafka.KVArg, 2)
args[0] = &kafka.KVArg{
Key: "device",
@@ -369,7 +349,7 @@
func (ap *AdapterProxy) RevertImageUpdate(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) error {
log.Debugw("RevertImageUpdate", log.Fields{"deviceId": device.Id, "image": download.Name})
rpc := "revert_image_update"
- toTopic := kafka.CreateSubTopic(device.Type, device.Id)
+ toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
args := make([]*kafka.KVArg, 2)
args[0] = &kafka.KVArg{
Key: "device",
@@ -422,7 +402,7 @@
func (ap *AdapterProxy) UpdateFlowsBulk(device *voltha.Device, flows *voltha.Flows, groups *voltha.FlowGroups) error {
log.Debugw("UpdateFlowsBulk", log.Fields{"deviceId": device.Id})
- toTopic := kafka.CreateSubTopic(device.Type, device.Id)
+ toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
rpc := "update_flows_bulk"
args := make([]*kafka.KVArg, 3)
args[0] = &kafka.KVArg{
@@ -447,7 +427,7 @@
func (ap *AdapterProxy) UpdateFlowsIncremental(device *voltha.Device, flowChanges *openflow_13.FlowChanges, groupChanges *openflow_13.FlowGroupChanges) error {
log.Debugw("UpdateFlowsIncremental", log.Fields{"deviceId": device.Id})
- toTopic := kafka.CreateSubTopic(device.Type, device.Id)
+ toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
rpc := "update_flows_bulk"
args := make([]*kafka.KVArg, 3)
args[0] = &kafka.KVArg{
diff --git a/rw_core/core/adapter_request_handler.go b/rw_core/core/adapter_request_handler.go
index 6893179..98cc688 100644
--- a/rw_core/core/adapter_request_handler.go
+++ b/rw_core/core/adapter_request_handler.go
@@ -33,17 +33,19 @@
coreInstanceId string
deviceMgr *DeviceManager
lDeviceMgr *LogicalDeviceManager
+ adapterMgr *AdapterManager
localDataProxy *model.Proxy
clusterDataProxy *model.Proxy
}
-func NewAdapterRequestHandlerProxy(coreInstanceId string, dMgr *DeviceManager, ldMgr *LogicalDeviceManager, cdProxy *model.Proxy, ldProxy *model.Proxy) *AdapterRequestHandlerProxy {
+func NewAdapterRequestHandlerProxy(coreInstanceId string, dMgr *DeviceManager, ldMgr *LogicalDeviceManager, aMgr *AdapterManager, cdProxy *model.Proxy, ldProxy *model.Proxy) *AdapterRequestHandlerProxy {
var proxy AdapterRequestHandlerProxy
proxy.coreInstanceId = coreInstanceId
proxy.deviceMgr = dMgr
proxy.lDeviceMgr = ldMgr
proxy.clusterDataProxy = cdProxy
proxy.localDataProxy = ldProxy
+ proxy.adapterMgr = aMgr
return &proxy
}
@@ -70,12 +72,11 @@
}
}
log.Debugw("Register", log.Fields{"Adapter": *adapter, "DeviceTypes": deviceTypes, "coreId": rhp.coreInstanceId})
- // TODO process the request and store the data in the KV store
if rhp.TestMode { // Execute only for test cases
return &voltha.CoreInstance{InstanceId: "CoreInstance"}, nil
}
- return &voltha.CoreInstance{InstanceId: rhp.coreInstanceId}, nil
+ return rhp.adapterMgr.registerAdapter(adapter, deviceTypes), nil
}
func (rhp *AdapterRequestHandlerProxy) GetDevice(args []*ic.Argument) (*voltha.Device, error) {
diff --git a/rw_core/core/core.go b/rw_core/core/core.go
index 1496200..c81141b 100644
--- a/rw_core/core/core.go
+++ b/rw_core/core/core.go
@@ -33,6 +33,7 @@
logicalDeviceMgr *LogicalDeviceManager
grpcServer *grpcserver.GrpcServer
grpcNBIAPIHandler *APIHandler
+ adapterMgr *AdapterManager
config *config.RWCoreFlags
kmp *kafka.InterContainerProxy
clusterDataRoot model.Root
@@ -80,14 +81,16 @@
log.Fatal("Failure-starting-kafkaMessagingProxy")
}
log.Info("values", log.Fields{"kmp": core.kmp})
- core.deviceMgr = newDeviceManager(core.kmp, core.clusterDataProxy, core.instanceId)
+ core.adapterMgr = newAdapterManager(core.clusterDataProxy, core.instanceId)
+ core.deviceMgr = newDeviceManager(core.kmp, core.clusterDataProxy, core.adapterMgr, core.instanceId)
core.logicalDeviceMgr = newLogicalDeviceManager(core.deviceMgr, core.kmp, core.clusterDataProxy)
- if err := core.registerAdapterRequestHandler(ctx, core.instanceId, core.deviceMgr, core.logicalDeviceMgr, core.clusterDataProxy, core.localDataProxy); err != nil {
+ if err := core.registerAdapterRequestHandler(ctx, core.instanceId, core.deviceMgr, core.logicalDeviceMgr, core.adapterMgr, core.clusterDataProxy, core.localDataProxy); err != nil {
log.Fatal("Failure-registering-adapterRequestHandler")
}
go core.startDeviceManager(ctx)
go core.startLogicalDeviceManager(ctx)
go core.startGRPCService(ctx)
+ go core.startAdapterManager(ctx)
log.Info("adaptercore-started")
}
@@ -110,7 +113,7 @@
core.grpcServer = grpcserver.NewGrpcServer(core.config.GrpcHost, core.config.GrpcPort, nil, false)
log.Info("grpc-server-created")
- core.grpcNBIAPIHandler = NewAPIHandler(core.deviceMgr, core.logicalDeviceMgr, core.config.InCompetingMode, core.config.LongRunningRequestTimeout, core.config.DefaultRequestTimeout)
+ core.grpcNBIAPIHandler = NewAPIHandler(core.deviceMgr, core.logicalDeviceMgr, core.adapterMgr, core.config.InCompetingMode, core.config.LongRunningRequestTimeout, core.config.DefaultRequestTimeout)
core.logicalDeviceMgr.setGrpcNbiHandler(core.grpcNBIAPIHandler)
// Create a function to register the core GRPC service with the GRPC server
f := func(gs *grpc.Server) {
@@ -152,8 +155,8 @@
}
func (core *Core) registerAdapterRequestHandler(ctx context.Context, coreInstanceId string, dMgr *DeviceManager, ldMgr *LogicalDeviceManager,
- cdProxy *model.Proxy, ldProxy *model.Proxy) error {
- requestProxy := NewAdapterRequestHandlerProxy(coreInstanceId, dMgr, ldMgr, cdProxy, ldProxy)
+ aMgr *AdapterManager, cdProxy *model.Proxy, ldProxy *model.Proxy) error {
+ requestProxy := NewAdapterRequestHandlerProxy(coreInstanceId, dMgr, ldMgr, aMgr, cdProxy, ldProxy)
core.kmp.SubscribeWithRequestHandlerInterface(kafka.Topic{Name: core.config.CoreTopic}, requestProxy)
log.Info("request-handlers")
@@ -164,13 +167,19 @@
// TODO: Interaction between the logicaldevicemanager and devicemanager should mostly occur via
// callbacks. For now, until the model is ready, devicemanager will keep a reference to the
// logicaldevicemanager to initiate the creation of logical devices
- log.Info("starting-DeviceManager")
+ log.Info("DeviceManager-Starting...")
core.deviceMgr.start(ctx, core.logicalDeviceMgr)
- log.Info("started-DeviceManager")
+ log.Info("DeviceManager-Started")
}
func (core *Core) startLogicalDeviceManager(ctx context.Context) {
- log.Info("starting-Logical-DeviceManager")
+ log.Info("Logical-DeviceManager-Starting...")
core.logicalDeviceMgr.start(ctx)
- log.Info("started-Logical-DeviceManager")
+ log.Info("Logical-DeviceManager-Started")
}
+
+// startAdapterManager starts the core's adapter manager, logging before and
+// after the call to its start method.
+func (core *Core) startAdapterManager(ctx context.Context) {
+ log.Info("Adapter-Manager-Starting...")
+ core.adapterMgr.start(ctx)
+ log.Info("Adapter-Manager-Started")
+}
\ No newline at end of file
diff --git a/rw_core/core/device_agent.go b/rw_core/core/device_agent.go
index 31720c1..8bf8664 100644
--- a/rw_core/core/device_agent.go
+++ b/rw_core/core/device_agent.go
@@ -36,6 +36,7 @@
deviceType string
lastData *voltha.Device
adapterProxy *AdapterProxy
+ adapterMgr *AdapterManager
deviceMgr *DeviceManager
clusterDataProxy *model.Proxy
deviceProxy *model.Proxy
@@ -66,6 +67,7 @@
agent.deviceType = cloned.Type
agent.lastData = cloned
agent.deviceMgr = deviceMgr
+ agent.adapterMgr = deviceMgr.adapterMgr
agent.exitChannel = make(chan int, 1)
agent.clusterDataProxy = cdProxy
agent.lockDevice = sync.RWMutex{}
@@ -136,9 +138,20 @@
agent.lockDevice.Lock()
defer agent.lockDevice.Unlock()
log.Debugw("enableDevice", log.Fields{"id": agent.deviceId})
+
if device, err := agent.getDeviceWithoutLock(); err != nil {
return status.Errorf(codes.NotFound, "%s", agent.deviceId)
} else {
+ // First figure out which adapter will handle this device type. We do it at this stage because devices are
+ // allowed to be pre-provisioned before the required adapter is registered. Since we now need to communicate
+ // with the adapter, we need to know which adapter will handle this request.
+ if adapterName, err := agent.adapterMgr.getAdapterName(device.Type); err != nil {
+ log.Warnw("no-adapter-registered-for-device-type", log.Fields{"deviceType": device.Type, "deviceAdapter": device.Adapter})
+ return err
+ } else {
+ device.Adapter = adapterName
+ }
+
if device.AdminState == voltha.AdminState_ENABLED {
log.Debugw("device-already-enabled", log.Fields{"id": agent.deviceId})
//TODO: Needs customized error message
@@ -795,7 +808,7 @@
}
//flowTableUpdated is the callback after flows have been updated in the model to push them
-//to the adapters
+//to the adapterAgents
func (agent *DeviceAgent) flowTableUpdated(args ...interface{}) interface{} {
log.Debugw("flowTableUpdated-callback", log.Fields{"argsLen": len(args)})
@@ -829,11 +842,9 @@
}
groups := device.FlowGroups
- // Send update to adapters
- // TODO: Check whether the device supports incremental flow changes
- // Assume false for test
- acceptsAddRemoveFlowUpdates := false
- if !acceptsAddRemoveFlowUpdates {
+ // Send update to adapterAgents
+ dType := agent.adapterMgr.getDeviceType(device.Type)
+ if !dType.AcceptsAddRemoveFlowUpdates {
if err := agent.adapterProxy.UpdateFlowsBulk(device, latestData, groups); err != nil {
log.Debugw("update-flow-bulk-error", log.Fields{"id": agent.lastData.Id, "error": err})
return err
@@ -871,7 +882,7 @@
}
//groupTableUpdated is the callback after group table has been updated in the model to push them
-//to the adapters
+//to the adapterAgents
func (agent *DeviceAgent) groupTableUpdated(args ...interface{}) interface{} {
log.Debugw("groupTableUpdated-callback", log.Fields{"argsLen": len(args)})
@@ -905,7 +916,7 @@
}
flows := device.Flows
- // Send update to adapters
+ // Send update to adapterAgents
// TODO: Check whether the device supports incremental flow changes
// Assume false for test
acceptsAddRemoveFlowUpdates := false
diff --git a/rw_core/core/device_manager.go b/rw_core/core/device_manager.go
index 76475f9..376433b 100644
--- a/rw_core/core/device_manager.go
+++ b/rw_core/core/device_manager.go
@@ -34,6 +34,7 @@
type DeviceManager struct {
deviceAgents map[string]*DeviceAgent
adapterProxy *AdapterProxy
+ adapterMgr *AdapterManager
logicalDeviceMgr *LogicalDeviceManager
kafkaICProxy *kafka.InterContainerProxy
stateTransitions *TransitionMap
@@ -43,7 +44,7 @@
lockDeviceAgentsMap sync.RWMutex
}
-func newDeviceManager(kafkaICProxy *kafka.InterContainerProxy, cdProxy *model.Proxy, coreInstanceId string) *DeviceManager {
+func newDeviceManager(kafkaICProxy *kafka.InterContainerProxy, cdProxy *model.Proxy, adapterMgr *AdapterManager, coreInstanceId string) *DeviceManager {
var deviceMgr DeviceManager
deviceMgr.exitChannel = make(chan int, 1)
deviceMgr.deviceAgents = make(map[string]*DeviceAgent)
@@ -51,6 +52,7 @@
deviceMgr.kafkaICProxy = kafkaICProxy
deviceMgr.coreInstanceId = coreInstanceId
deviceMgr.clusterDataProxy = cdProxy
+ deviceMgr.adapterMgr= adapterMgr
deviceMgr.lockDeviceAgentsMap = sync.RWMutex{}
return &deviceMgr
}
diff --git a/rw_core/core/grpc_nbi_api_handler.go b/rw_core/core/grpc_nbi_api_handler.go
index c65178d..877fcb4 100644
--- a/rw_core/core/grpc_nbi_api_handler.go
+++ b/rw_core/core/grpc_nbi_api_handler.go
@@ -47,6 +47,7 @@
type APIHandler struct {
deviceMgr *DeviceManager
logicalDeviceMgr *LogicalDeviceManager
+ adapterMgr *AdapterManager
packetInQueue *queue.Queue
coreInCompetingMode bool
longRunningRequestTimeout int64
@@ -54,10 +55,11 @@
da.DefaultAPIHandler
}
-func NewAPIHandler(deviceMgr *DeviceManager, lDeviceMgr *LogicalDeviceManager, inCompetingMode bool, longRunningRequestTimeout int64, defaultRequestTimeout int64 ) *APIHandler {
+func NewAPIHandler(deviceMgr *DeviceManager, lDeviceMgr *LogicalDeviceManager, adapterMgr *AdapterManager, inCompetingMode bool, longRunningRequestTimeout int64, defaultRequestTimeout int64 ) *APIHandler {
handler := &APIHandler{
deviceMgr: deviceMgr,
logicalDeviceMgr: lDeviceMgr,
+ adapterMgr:adapterMgr,
coreInCompetingMode:inCompetingMode,
longRunningRequestTimeout:longRunningRequestTimeout,
defaultRequestTimeout:defaultRequestTimeout,
@@ -301,6 +303,13 @@
return handler.logicalDeviceMgr.listLogicalDevices()
}
+
+// ListAdapters returns the contents of all adapters known to the system, as
+// reported by the adapter manager.
+func (handler *APIHandler) ListAdapters(ctx context.Context, empty *empty.Empty) (*voltha.Adapters, error) {
+	// This previously logged "ListDevices" (copy-paste), which mislabelled the call in logs.
+	log.Debug("ListAdapters")
+	return handler.adapterMgr.listAdapters(ctx)
+}
+
// ListLogicalDevicePorts must be implemented in the read-only containers - should it also be implemented here?
func (handler *APIHandler) ListLogicalDevicePorts(ctx context.Context, id *voltha.ID) (*voltha.LogicalPorts, error) {
log.Debugw("ListLogicalDevicePorts", log.Fields{"logicaldeviceid": id})
diff --git a/rw_core/core/logical_device_manager.go b/rw_core/core/logical_device_manager.go
index 19eb6a4..18bc30a 100644
--- a/rw_core/core/logical_device_manager.go
+++ b/rw_core/core/logical_device_manager.go
@@ -291,7 +291,7 @@
// Update the device routes - let it run in its own go routine as it can take time
go agent.updateRoutes()
}
- return status.Errorf(codes.NotFound, "%s", childDevice.Id)
+ return nil
}
func (ldMgr *LogicalDeviceManager) updateFlowTable(ctx context.Context, id string, flow *openflow_13.OfpFlowMod, ch chan interface{}) {