This commit consists of:
1) Update the voltha.proto to remove duplicates between the voltha message and
the CoreInstance. Since all data will be stored into the clustered KV store
then it makes sense to use a clustered proto message instead of core specific.
Each core will hold a subset of the data, only those it is actively or passively
managing.
2) Add a Makefile into the adapters directory to clearly separate the build of
adapters to the core build. This is work in progress.
3) Add an initial readme.md into the adapters directory to show how to run ponsim
olt and onu adapters in containers
4) Minor cleanup, mostly around name consistency.
Change-Id: I55155c41b56e95877f8735b536873a87d6ca63b1
diff --git a/rw_core/core/core.go b/rw_core/core/core.go
index f9e42ef..06f3ca3 100644
--- a/rw_core/core/core.go
+++ b/rw_core/core/core.go
@@ -63,8 +63,8 @@
log.Info("starting-core")
core.startKafkaMessagingProxy(ctx)
log.Info("values", log.Fields{"kmp": core.kmp})
- core.deviceMgr = NewDeviceManager(core.kmp, core.localDataProxy)
- core.logicalDeviceMgr = NewLogicalDeviceManager(core.deviceMgr, core.kmp, core.localDataProxy)
+ core.deviceMgr = NewDeviceManager(core.kmp, core.clusterDataProxy)
+ core.logicalDeviceMgr = NewLogicalDeviceManager(core.deviceMgr, core.kmp, core.clusterDataProxy)
core.registerAdapterRequestHandler(ctx, core.deviceMgr, core.logicalDeviceMgr, core.localDataProxy, core.clusterDataProxy)
go core.startDeviceManager(ctx)
go core.startLogicalDeviceManager(ctx)
@@ -86,7 +86,7 @@
core.grpcServer = grpcserver.NewGrpcServer(core.config.GrpcHost, core.config.GrpcPort, nil, false)
log.Info("grpc-server-created")
- core.grpcNBIAPIHanfler = NewAPIHandler(core.deviceMgr, core.logicalDeviceMgr, core.clusterDataProxy, core.localDataProxy)
+ core.grpcNBIAPIHanfler = NewAPIHandler(core.deviceMgr, core.logicalDeviceMgr)
// Create a function to register the core GRPC service with the GRPC server
f := func(gs *grpc.Server) {
voltha.RegisterVolthaServiceServer(
diff --git a/rw_core/core/device_agent.go b/rw_core/core/device_agent.go
index aa13748..805dd21 100644
--- a/rw_core/core/device_agent.go
+++ b/rw_core/core/device_agent.go
@@ -28,15 +28,15 @@
)
type DeviceAgent struct {
- deviceId string
- lastData *voltha.Device
- adapterProxy *AdapterProxy
- deviceMgr *DeviceManager
- localDataProxy *model.Proxy
- exitChannel chan int
+ deviceId string
+ lastData *voltha.Device
+ adapterProxy *AdapterProxy
+ deviceMgr *DeviceManager
+ clusterDataProxy *model.Proxy
+ exitChannel chan int
}
-func newDeviceAgent(ap *AdapterProxy, device *voltha.Device, deviceMgr *DeviceManager, ldProxy *model.Proxy) *DeviceAgent {
+func newDeviceAgent(ap *AdapterProxy, device *voltha.Device, deviceMgr *DeviceManager, cdProxy *model.Proxy) *DeviceAgent {
var agent DeviceAgent
device.Id = CreateDeviceId()
agent.deviceId = device.Id
@@ -44,14 +44,14 @@
agent.lastData = device
agent.deviceMgr = deviceMgr
agent.exitChannel = make(chan int, 1)
- agent.localDataProxy = ldProxy
+ agent.clusterDataProxy = cdProxy
return &agent
}
func (agent *DeviceAgent) start(ctx context.Context) {
log.Debugw("starting-device-agent", log.Fields{"device": agent.lastData})
// Add the initial device to the local model
- if added := agent.localDataProxy.Add("/devices", agent.lastData, ""); added == nil {
+ if added := agent.clusterDataProxy.Add("/devices", agent.lastData, ""); added == nil {
log.Errorw("failed-to-add-device", log.Fields{"deviceId": agent.deviceId})
}
log.Debug("device-agent-started")
@@ -72,7 +72,7 @@
cloned := reflect.ValueOf(device).Elem().Interface().(voltha.Device)
cloned.AdminState = voltha.AdminState_ENABLED
cloned.OperStatus = voltha.OperStatus_ACTIVATING
- if afterUpdate := agent.localDataProxy.Update("/devices/"+agent.deviceId, &cloned, false, ""); afterUpdate == nil {
+ if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, &cloned, false, ""); afterUpdate == nil {
return status.Errorf(codes.Internal, "failed-update-device:%s", agent.deviceId)
} else {
if err := agent.adapterProxy.AdoptDevice(ctx, &cloned); err != nil {
@@ -136,7 +136,7 @@
} else {
// store the changed data
cloned := (proto.Clone(device)).(*voltha.Device)
- afterUpdate := agent.localDataProxy.Update("/devices/"+device.Id, cloned, false, "")
+ afterUpdate := agent.clusterDataProxy.Update("/devices/"+device.Id, cloned, false, "")
if afterUpdate == nil {
return status.Errorf(codes.Internal, "%s", device.Id)
}
@@ -164,7 +164,7 @@
}
log.Debugw("DeviceStateUpdate-device", log.Fields{"device": cloned})
// Store the device
- if afterUpdate := agent.localDataProxy.Update("/devices/"+agent.deviceId, &cloned, false, ""); afterUpdate == nil {
+ if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, &cloned, false, ""); afterUpdate == nil {
return status.Errorf(codes.Internal, "%s", agent.deviceId)
}
// Perform the state transition
@@ -187,7 +187,7 @@
cp := proto.Clone(pmConfigs)
cloned.PmConfigs = cp.(*voltha.PmConfigs)
// Store the device
- afterUpdate := agent.localDataProxy.Update("/devices/"+agent.deviceId, &cloned, false, "")
+ afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, &cloned, false, "")
if afterUpdate == nil {
return status.Errorf(codes.Internal, "%s", agent.deviceId)
}
@@ -210,7 +210,7 @@
cp := proto.Clone(port)
cloned.Ports = append(cloned.Ports, cp.(*voltha.Port))
// Store the device
- afterUpdate := agent.localDataProxy.Update("/devices/"+agent.deviceId, &cloned, false, "")
+ afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, &cloned, false, "")
if afterUpdate == nil {
return status.Errorf(codes.Internal, "%s", agent.deviceId)
}
@@ -250,7 +250,7 @@
log.Debugw("update-field-status", log.Fields{"device": storeDevice, "name": name, "updated": updated})
// Save the data
cloned := reflect.ValueOf(storeDevice).Elem().Interface().(voltha.Device)
- if afterUpdate := agent.localDataProxy.Update("/devices/"+agent.deviceId, &cloned, false, ""); afterUpdate == nil {
+ if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, &cloned, false, ""); afterUpdate == nil {
log.Warnw("attribute-update-failed", log.Fields{"attribute": name, "value": value})
}
return
diff --git a/rw_core/core/device_manager.go b/rw_core/core/device_manager.go
index a6c0c8d..fd18c10 100644
--- a/rw_core/core/device_manager.go
+++ b/rw_core/core/device_manager.go
@@ -36,18 +36,18 @@
logicalDeviceMgr *LogicalDeviceManager
kafkaProxy *kafka.KafkaMessagingProxy
stateTransitions *TransitionMap
- localDataProxy *model.Proxy
+ clusterDataProxy *model.Proxy
exitChannel chan int
lockDeviceAgentsMap sync.RWMutex
}
-func NewDeviceManager(kafkaProxy *kafka.KafkaMessagingProxy, ldProxy *model.Proxy) *DeviceManager {
+func NewDeviceManager(kafkaProxy *kafka.KafkaMessagingProxy, cdProxy *model.Proxy) *DeviceManager {
var deviceMgr DeviceManager
deviceMgr.exitChannel = make(chan int, 1)
deviceMgr.deviceAgents = make(map[string]*DeviceAgent)
deviceMgr.adapterProxy = NewAdapterProxy(kafkaProxy)
deviceMgr.kafkaProxy = kafkaProxy
- deviceMgr.localDataProxy = ldProxy
+ deviceMgr.clusterDataProxy = cdProxy
deviceMgr.lockDeviceAgentsMap = sync.RWMutex{}
return &deviceMgr
}
@@ -98,7 +98,7 @@
log.Debugw("createDevice-start", log.Fields{"device": device, "aproxy": dMgr.adapterProxy})
// Create and start a device agent for that device
- agent := newDeviceAgent(dMgr.adapterProxy, device, dMgr, dMgr.localDataProxy)
+ agent := newDeviceAgent(dMgr.adapterProxy, device, dMgr, dMgr.clusterDataProxy)
dMgr.addDeviceAgentToMap(agent)
agent.start(ctx)
@@ -122,7 +122,7 @@
func (dMgr *DeviceManager) getDevice(id string) (*voltha.Device, error) {
log.Debugw("getDevice-start", log.Fields{"deviceid": id})
- if device := dMgr.localDataProxy.Get("/devices/"+id, 1, false, ""); device == nil {
+ if device := dMgr.clusterDataProxy.Get("/devices/"+id, 1, false, ""); device == nil {
return nil, status.Errorf(codes.NotFound, "%s", id)
} else {
cloned := reflect.ValueOf(device).Elem().Interface().(voltha.Device)
@@ -136,7 +136,7 @@
dMgr.lockDeviceAgentsMap.Lock()
defer dMgr.lockDeviceAgentsMap.Unlock()
for _, agent := range dMgr.deviceAgents {
- if device := dMgr.localDataProxy.Get("/devices/"+agent.deviceId, 1, false, ""); device != nil {
+ if device := dMgr.clusterDataProxy.Get("/devices/"+agent.deviceId, 1, false, ""); device != nil {
cloned := reflect.ValueOf(device).Elem().Interface().(voltha.Device)
result.Items = append(result.Items, &cloned)
}
@@ -215,7 +215,7 @@
childDevice.ProxyAddress = &voltha.Device_ProxyAddress{ChannelId: uint32(channelId)}
// Create and start a device agent for that device
- agent := newDeviceAgent(dMgr.adapterProxy, childDevice, dMgr, dMgr.localDataProxy)
+ agent := newDeviceAgent(dMgr.adapterProxy, childDevice, dMgr, dMgr.clusterDataProxy)
dMgr.addDeviceAgentToMap(agent)
agent.start(nil)
diff --git a/rw_core/core/grpc_nbi_api_handler.go b/rw_core/core/grpc_nbi_api_handler.go
index 6af73cd..bd28322 100644
--- a/rw_core/core/grpc_nbi_api_handler.go
+++ b/rw_core/core/grpc_nbi_api_handler.go
@@ -21,7 +21,6 @@
"github.com/golang/protobuf/ptypes/empty"
da "github.com/opencord/voltha-go/common/core/northbound/grpc"
"github.com/opencord/voltha-go/common/log"
- "github.com/opencord/voltha-go/db/model"
"github.com/opencord/voltha-go/protos/common"
"github.com/opencord/voltha-go/protos/openflow_13"
"github.com/opencord/voltha-go/protos/voltha"
@@ -33,16 +32,12 @@
type APIHandler struct {
deviceMgr *DeviceManager
logicalDeviceMgr *LogicalDeviceManager
- clusterDataProxy *model.Proxy
- localDataProxy *model.Proxy
da.DefaultAPIHandler
}
-func NewAPIHandler(deviceMgr *DeviceManager, lDeviceMgr *LogicalDeviceManager, cdProxy *model.Proxy, ldProxy *model.Proxy) *APIHandler {
+func NewAPIHandler(deviceMgr *DeviceManager, lDeviceMgr *LogicalDeviceManager) *APIHandler {
handler := &APIHandler{deviceMgr: deviceMgr,
- logicalDeviceMgr: lDeviceMgr,
- clusterDataProxy: cdProxy,
- localDataProxy: ldProxy}
+ logicalDeviceMgr: lDeviceMgr}
return handler
}
func isTestMode(ctx context.Context) bool {
diff --git a/rw_core/core/id.go b/rw_core/core/id.go
index d5aebd5..b28151f 100644
--- a/rw_core/core/id.go
+++ b/rw_core/core/id.go
@@ -44,7 +44,7 @@
return val
}
-// CreateLogicalPortId produces a random port ID for a logical device.
+// CreateLogicalPortId produces a random port ID for a logical device.
func CreateLogicalPortId() uint32 {
// A logical port is a uint32
return m.Uint32()
diff --git a/rw_core/core/logical_device_agent.go b/rw_core/core/logical_device_agent.go
index 5a9562a..117c869 100644
--- a/rw_core/core/logical_device_agent.go
+++ b/rw_core/core/logical_device_agent.go
@@ -29,23 +29,23 @@
)
type LogicalDeviceAgent struct {
- logicalDeviceId string
- lastData *voltha.LogicalDevice
- rootDeviceId string
- deviceMgr *DeviceManager
- ldeviceMgr *LogicalDeviceManager
- localDataProxy *model.Proxy
- exitChannel chan int
+ logicalDeviceId string
+ lastData *voltha.LogicalDevice
+ rootDeviceId string
+ deviceMgr *DeviceManager
+ ldeviceMgr *LogicalDeviceManager
+ clusterDataProxy *model.Proxy
+ exitChannel chan int
}
func NewLogicalDeviceAgent(id string, device *voltha.Device, ldeviceMgr *LogicalDeviceManager, deviceMgr *DeviceManager,
- ldProxy *model.Proxy) *LogicalDeviceAgent {
+ cdProxy *model.Proxy) *LogicalDeviceAgent {
var agent LogicalDeviceAgent
agent.exitChannel = make(chan int, 1)
agent.logicalDeviceId = id
agent.rootDeviceId = device.Id
agent.deviceMgr = deviceMgr
- agent.localDataProxy = ldProxy
+ agent.clusterDataProxy = cdProxy
agent.ldeviceMgr = ldeviceMgr
return &agent
}
@@ -83,7 +83,7 @@
ld.Ports = append(ld.Ports, lp)
}
// Save the logical device
- if added := agent.localDataProxy.Add("/logical_devices", ld, ""); added == nil {
+ if added := agent.clusterDataProxy.Add("/logical_devices", ld, ""); added == nil {
log.Errorw("failed-to-add-logical-device", log.Fields{"logicaldeviceId": agent.logicalDeviceId})
} else {
log.Debugw("logicaldevice-created", log.Fields{"logicaldeviceId": agent.logicalDeviceId})
@@ -108,7 +108,7 @@
cloned := reflect.ValueOf(ldevice).Elem().Interface().(voltha.LogicalDevice)
lp := (proto.Clone(portCap.Port)).(*voltha.LogicalPort)
cloned.Ports = append(cloned.Ports, lp)
- afterUpdate := agent.localDataProxy.Update("/logical_devices/"+agent.logicalDeviceId, &cloned, false, "")
+ afterUpdate := agent.clusterDataProxy.Update("/logical_devices/"+agent.logicalDeviceId, &cloned, false, "")
if afterUpdate == nil {
return status.Errorf(codes.Internal, "failed-add-UNI-port:%s", agent.logicalDeviceId)
}
diff --git a/rw_core/core/logical_device_manager.go b/rw_core/core/logical_device_manager.go
index 61f96e8..aa22d57 100644
--- a/rw_core/core/logical_device_manager.go
+++ b/rw_core/core/logical_device_manager.go
@@ -34,18 +34,18 @@
deviceMgr *DeviceManager
adapterProxy *AdapterProxy
kafkaProxy *kafka.KafkaMessagingProxy
- localDataProxy *model.Proxy
+ clusterDataProxy *model.Proxy
exitChannel chan int
lockLogicalDeviceAgentsMap sync.RWMutex
}
-func NewLogicalDeviceManager(deviceMgr *DeviceManager, kafkaProxy *kafka.KafkaMessagingProxy, ldProxy *model.Proxy) *LogicalDeviceManager {
+func NewLogicalDeviceManager(deviceMgr *DeviceManager, kafkaProxy *kafka.KafkaMessagingProxy, cdProxy *model.Proxy) *LogicalDeviceManager {
var logicalDeviceMgr LogicalDeviceManager
logicalDeviceMgr.exitChannel = make(chan int, 1)
logicalDeviceMgr.logicalDeviceAgents = make(map[string]*LogicalDeviceAgent)
logicalDeviceMgr.deviceMgr = deviceMgr
logicalDeviceMgr.kafkaProxy = kafkaProxy
- logicalDeviceMgr.localDataProxy = ldProxy
+ logicalDeviceMgr.clusterDataProxy = cdProxy
logicalDeviceMgr.lockLogicalDeviceAgentsMap = sync.RWMutex{}
return &logicalDeviceMgr
}
@@ -80,7 +80,7 @@
func (ldMgr *LogicalDeviceManager) getLogicalDevice(id string) (*voltha.LogicalDevice, error) {
log.Debugw("getlogicalDevice-start", log.Fields{"logicaldeviceid": id})
- logicalDevice := ldMgr.localDataProxy.Get("/logical_devices/"+id, 1, false, "")
+ logicalDevice := ldMgr.clusterDataProxy.Get("/logical_devices/"+id, 1, false, "")
if logicalDevice != nil {
cloned := reflect.ValueOf(logicalDevice).Elem().Interface().(voltha.LogicalDevice)
return &cloned, nil
@@ -94,7 +94,7 @@
ldMgr.lockLogicalDeviceAgentsMap.Lock()
defer ldMgr.lockLogicalDeviceAgentsMap.Unlock()
for _, agent := range ldMgr.logicalDeviceAgents {
- logicalDevice := ldMgr.localDataProxy.Get("/logical_devices/"+agent.logicalDeviceId, 1, false, "")
+ logicalDevice := ldMgr.clusterDataProxy.Get("/logical_devices/"+agent.logicalDeviceId, 1, false, "")
if logicalDevice != nil {
cloned := reflect.ValueOf(logicalDevice).Elem().Interface().(voltha.LogicalDevice)
result.Items = append(result.Items, &cloned)
@@ -118,7 +118,7 @@
id := strings.Replace(macAddress, ":", "", -1)
log.Debugw("setting-logical-device-id", log.Fields{"logicaldeviceId": id})
- agent := NewLogicalDeviceAgent(id, device, ldMgr, ldMgr.deviceMgr, ldMgr.localDataProxy)
+ agent := NewLogicalDeviceAgent(id, device, ldMgr, ldMgr.deviceMgr, ldMgr.clusterDataProxy)
ldMgr.addLogicalDeviceAgentToMap(agent)
go agent.Start(ctx)