[VOL-3069] Pass Context down the execution call hierarchy across the voltha codebase
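
The change is mechanical: every function along the execution path gains a leading
ctx context.Context parameter, callers forward the context they already hold, and
only the outermost entry points (main, unit tests, kafka callbacks) create one with
context.Background(). A minimal sketch of the pattern, assuming a simplified
agent/getAdapter pair (illustrative only, not the actual voltha types):

    package main

    import (
    	"context"
    	"fmt"
    	"time"
    )

    type Adapter struct{ ID string }

    type agent struct{ adapter *Adapter }

    // getAdapter now takes the caller's context first, so cancellation,
    // deadlines and request-scoped metadata reach every layer it calls.
    func (a *agent) getAdapter(ctx context.Context) *Adapter {
    	select {
    	case <-ctx.Done():
    		return nil // caller cancelled or timed out; give up early
    	default:
    		return a.adapter
    	}
    }

    func main() {
    	// Outermost entry points (main, tests, kafka callbacks) own the
    	// root context; everything below just forwards it.
    	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    	defer cancel()

    	a := &agent{adapter: &Adapter{ID: "olt-adapter-mock"}}
    	if ad := a.getAdapter(ctx); ad != nil {
    		fmt.Println("adapter:", ad.ID)
    	}
    }
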
Change-Id: I97a2630d9a4fe5dc3161113539edda476534f186
diff --git a/rw_core/core/adapter/agent.go b/rw_core/core/adapter/agent.go
index 2143a84..f2f0e8a 100644
--- a/rw_core/core/adapter/agent.go
+++ b/rw_core/core/adapter/agent.go
@@ -17,6 +17,7 @@
package adapter
import (
+ "context"
"github.com/golang/protobuf/ptypes"
"github.com/opencord/voltha-lib-go/v3/pkg/log"
"github.com/opencord/voltha-protos/v3/go/voltha"
@@ -30,13 +31,13 @@
lock sync.RWMutex
}
-func newAdapterAgent(adapter *voltha.Adapter) *agent {
+func newAdapterAgent(ctx context.Context, adapter *voltha.Adapter) *agent {
return &agent{
adapter: adapter,
}
}
-func (aa *agent) getAdapter() *voltha.Adapter {
+func (aa *agent) getAdapter(ctx context.Context) *voltha.Adapter {
aa.lock.RLock()
defer aa.lock.RUnlock()
logger.Debugw("getAdapter", log.Fields{"adapter": aa.adapter})
@@ -45,7 +46,7 @@
// updateCommunicationTime updates the adapter's last-communication time to the specified time.
// No attempt is made to save the time to the db, so only recent times are guaranteed to be accurate.
-func (aa *agent) updateCommunicationTime(new time.Time) {
+func (aa *agent) updateCommunicationTime(ctx context.Context, new time.Time) {
// only update if new time is not in the future, and either the old time is invalid or new time > old time
if last, err := ptypes.Timestamp(aa.adapter.LastCommunication); !new.After(time.Now()) && (err != nil || new.After(last)) {
timestamp, err := ptypes.TimestampProto(new)
diff --git a/rw_core/core/adapter/manager.go b/rw_core/core/adapter/manager.go
index b552d8f..99f445b 100644
--- a/rw_core/core/adapter/manager.go
+++ b/rw_core/core/adapter/manager.go
@@ -44,14 +44,14 @@
lockdDeviceTypeToAdapterMap sync.RWMutex
}
-func NewAdapterManager(cdProxy *model.Proxy, coreInstanceID string, kafkaClient kafka.Client) *Manager {
+func NewAdapterManager(ctx context.Context, cdProxy *model.Proxy, coreInstanceID string, kafkaClient kafka.Client) *Manager {
aMgr := &Manager{
coreInstanceID: coreInstanceID,
clusterDataProxy: cdProxy,
deviceTypes: make(map[string]*voltha.DeviceType),
adapterAgents: make(map[string]*agent),
}
- kafkaClient.SubscribeForMetadata(aMgr.updateLastAdapterCommunication)
+ kafkaClient.SubscribeForMetadata(ctx, aMgr.updateLastAdapterCommunication)
return aMgr
}
@@ -59,7 +59,7 @@
// if more than one callback is required, this should be converted to a proper interface
type adapterRestartedHandler func(ctx context.Context, adapter *voltha.Adapter) error
-func (aMgr *Manager) SetAdapterRestartedCallback(onAdapterRestart adapterRestartedHandler) {
+func (aMgr *Manager) SetAdapterRestartedCallback(ctx context.Context, onAdapterRestart adapterRestartedHandler) {
aMgr.onAdapterRestart = onAdapterRestart
}
@@ -69,7 +69,7 @@
// Load the existing adapterAgents and device types - this will also ensure the correct paths have been
// created if there is no data in the DB to start
- err := aMgr.loadAdaptersAndDevicetypesInMemory()
+ err := aMgr.loadAdaptersAndDevicetypesInMemory(ctx)
if err != nil {
logger.Fatalf("failed-to-load-adapters-and-device-types-in-memory: %s", err)
}
@@ -79,7 +79,7 @@
}
//loadAdaptersAndDevicetypesInMemory loads the existing set of adapters and device types in memory
-func (aMgr *Manager) loadAdaptersAndDevicetypesInMemory() error {
+func (aMgr *Manager) loadAdaptersAndDevicetypesInMemory(ctx context.Context) error {
// Load the adapters
var adapters []*voltha.Adapter
if err := aMgr.clusterDataProxy.List(context.Background(), "adapters", &adapters); err != nil {
@@ -88,7 +88,7 @@
}
if len(adapters) != 0 {
for _, adapter := range adapters {
- if err := aMgr.addAdapter(adapter, false); err != nil {
+ if err := aMgr.addAdapter(ctx, adapter, false); err != nil {
logger.Errorw("failed to add adapter", log.Fields{"adapterId": adapter.Id})
} else {
logger.Debugw("adapter added successfully", log.Fields{"adapterId": adapter.Id})
@@ -108,7 +108,7 @@
logger.Debugw("found-existing-device-types", log.Fields{"deviceTypes": dTypes})
dTypes.Items = append(dTypes.Items, dType)
}
- return aMgr.addDeviceTypes(dTypes, false)
+ return aMgr.addDeviceTypes(ctx, dTypes, false)
}
logger.Debug("no-existing-device-type-found")
@@ -116,17 +116,17 @@
return nil
}
-func (aMgr *Manager) updateLastAdapterCommunication(adapterID string, timestamp time.Time) {
+func (aMgr *Manager) updateLastAdapterCommunication(ctx context.Context, adapterID string, timestamp time.Time) {
aMgr.lockAdaptersMap.RLock()
adapterAgent, have := aMgr.adapterAgents[adapterID]
aMgr.lockAdaptersMap.RUnlock()
if have {
- adapterAgent.updateCommunicationTime(timestamp)
+ adapterAgent.updateCommunicationTime(ctx, timestamp)
}
}
-func (aMgr *Manager) addAdapter(adapter *voltha.Adapter, saveToDb bool) error {
+func (aMgr *Manager) addAdapter(ctx context.Context, adapter *voltha.Adapter, saveToDb bool) error {
aMgr.lockAdaptersMap.Lock()
defer aMgr.lockAdaptersMap.Unlock()
logger.Debugw("adding-adapter", log.Fields{"adapterId": adapter.Id, "vendor": adapter.Vendor,
@@ -153,12 +153,12 @@
}
}
clonedAdapter := (proto.Clone(adapter)).(*voltha.Adapter)
- aMgr.adapterAgents[adapter.Id] = newAdapterAgent(clonedAdapter)
+ aMgr.adapterAgents[adapter.Id] = newAdapterAgent(ctx, clonedAdapter)
}
return nil
}
-func (aMgr *Manager) addDeviceTypes(deviceTypes *voltha.DeviceTypes, saveToDb bool) error {
+func (aMgr *Manager) addDeviceTypes(ctx context.Context, deviceTypes *voltha.DeviceTypes, saveToDb bool) error {
if deviceTypes == nil {
return fmt.Errorf("no-device-type")
}
@@ -195,28 +195,28 @@
}
// ListAdapters returns the contents of all adapters known to the system
-func (aMgr *Manager) ListAdapters(_ context.Context, _ *empty.Empty) (*voltha.Adapters, error) {
+func (aMgr *Manager) ListAdapters(ctx context.Context, _ *empty.Empty) (*voltha.Adapters, error) {
result := &voltha.Adapters{Items: []*voltha.Adapter{}}
aMgr.lockAdaptersMap.RLock()
defer aMgr.lockAdaptersMap.RUnlock()
for _, adapterAgent := range aMgr.adapterAgents {
- if a := adapterAgent.getAdapter(); a != nil {
+ if a := adapterAgent.getAdapter(ctx); a != nil {
result.Items = append(result.Items, (proto.Clone(a)).(*voltha.Adapter))
}
}
return result, nil
}
-func (aMgr *Manager) getAdapter(adapterID string) *voltha.Adapter {
+func (aMgr *Manager) getAdapter(ctx context.Context, adapterID string) *voltha.Adapter {
aMgr.lockAdaptersMap.RLock()
defer aMgr.lockAdaptersMap.RUnlock()
if adapterAgent, ok := aMgr.adapterAgents[adapterID]; ok {
- return adapterAgent.getAdapter()
+ return adapterAgent.getAdapter(ctx)
}
return nil
}
-func (aMgr *Manager) RegisterAdapter(adapter *voltha.Adapter, deviceTypes *voltha.DeviceTypes) (*voltha.CoreInstance, error) {
+func (aMgr *Manager) RegisterAdapter(ctx context.Context, adapter *voltha.Adapter, deviceTypes *voltha.DeviceTypes) (*voltha.CoreInstance, error) {
logger.Debugw("RegisterAdapter", log.Fields{"adapterId": adapter.Id, "vendor": adapter.Vendor,
"currentReplica": adapter.CurrentReplica, "totalReplicas": adapter.TotalReplicas, "endpoint": adapter.Endpoint, "deviceTypes": deviceTypes.Items})
@@ -229,7 +229,7 @@
return nil, status.Error(codes.InvalidArgument, "adapter-not-specifying-type")
}
- if aMgr.getAdapter(adapter.Id) != nil {
+ if aMgr.getAdapter(ctx, adapter.Id) != nil {
// Already registered - Adapter may have restarted. Trigger the reconcile process for that adapter
go func() {
err := aMgr.onAdapterRestart(context.Background(), adapter)
@@ -240,11 +240,11 @@
return &voltha.CoreInstance{InstanceId: aMgr.coreInstanceID}, nil
}
// Save the adapter and the device types
- if err := aMgr.addAdapter(adapter, true); err != nil {
+ if err := aMgr.addAdapter(ctx, adapter, true); err != nil {
logger.Errorw("failed-to-add-adapter", log.Fields{"error": err})
return nil, err
}
- if err := aMgr.addDeviceTypes(deviceTypes, true); err != nil {
+ if err := aMgr.addDeviceTypes(ctx, deviceTypes, true); err != nil {
logger.Errorw("failed-to-add-device-types", log.Fields{"error": err})
return nil, err
}
@@ -256,7 +256,7 @@
}
// GetAdapterType returns the name of the device adapter that services this device type
-func (aMgr *Manager) GetAdapterType(deviceType string) (string, error) {
+func (aMgr *Manager) GetAdapterType(ctx context.Context, deviceType string) (string, error) {
aMgr.lockdDeviceTypeToAdapterMap.Lock()
defer aMgr.lockdDeviceTypeToAdapterMap.Unlock()
for _, adapterAgent := range aMgr.adapterAgents {
diff --git a/rw_core/core/api/adapter_request_handler.go b/rw_core/core/api/adapter_request_handler.go
index 4deca75..f49f435 100644
--- a/rw_core/core/api/adapter_request_handler.go
+++ b/rw_core/core/api/adapter_request_handler.go
@@ -36,14 +36,14 @@
}
// NewAdapterRequestHandlerProxy assigns values for adapter request handler proxy attributes and returns the new instance
-func NewAdapterRequestHandlerProxy(dMgr *device.Manager, aMgr *adapter.Manager) *AdapterRequestHandlerProxy {
+func NewAdapterRequestHandlerProxy(ctx context.Context, dMgr *device.Manager, aMgr *adapter.Manager) *AdapterRequestHandlerProxy {
return &AdapterRequestHandlerProxy{
deviceMgr: dMgr,
adapterMgr: aMgr,
}
}
-func (rhp *AdapterRequestHandlerProxy) Register(args []*ic.Argument) (*voltha.CoreInstance, error) {
+func (rhp *AdapterRequestHandlerProxy) Register(ctx context.Context, args []*ic.Argument) (*voltha.CoreInstance, error) {
if len(args) < 3 {
logger.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
@@ -73,11 +73,11 @@
}
logger.Debugw("Register", log.Fields{"adapter": *adapter, "device-types": deviceTypes, "transaction-id": transactionID.Val})
- return rhp.adapterMgr.RegisterAdapter(adapter, deviceTypes)
+ return rhp.adapterMgr.RegisterAdapter(ctx, adapter, deviceTypes)
}
// GetDevice returns device info
-func (rhp *AdapterRequestHandlerProxy) GetDevice(args []*ic.Argument) (*voltha.Device, error) {
+func (rhp *AdapterRequestHandlerProxy) GetDevice(ctx context.Context, args []*ic.Argument) (*voltha.Device, error) {
if len(args) < 2 {
logger.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
@@ -111,7 +111,7 @@
}
// DeviceUpdate updates device using adapter data
-func (rhp *AdapterRequestHandlerProxy) DeviceUpdate(args []*ic.Argument) (*empty.Empty, error) {
+func (rhp *AdapterRequestHandlerProxy) DeviceUpdate(ctx context.Context, args []*ic.Argument) (*empty.Empty, error) {
if len(args) < 2 {
logger.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
@@ -144,7 +144,7 @@
}
// GetChildDevice returns details of child device
-func (rhp *AdapterRequestHandlerProxy) GetChildDevice(args []*ic.Argument) (*voltha.Device, error) {
+func (rhp *AdapterRequestHandlerProxy) GetChildDevice(ctx context.Context, args []*ic.Argument) (*voltha.Device, error) {
if len(args) < 3 {
logger.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
@@ -191,7 +191,7 @@
}
// GetChildDeviceWithProxyAddress returns details of child device with proxy address
-func (rhp *AdapterRequestHandlerProxy) GetChildDeviceWithProxyAddress(args []*ic.Argument) (*voltha.Device, error) {
+func (rhp *AdapterRequestHandlerProxy) GetChildDeviceWithProxyAddress(ctx context.Context, args []*ic.Argument) (*voltha.Device, error) {
if len(args) < 2 {
logger.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
@@ -220,7 +220,7 @@
}
// GetPorts returns the ports information of the device based on the port type.
-func (rhp *AdapterRequestHandlerProxy) GetPorts(args []*ic.Argument) (*voltha.Ports, error) {
+func (rhp *AdapterRequestHandlerProxy) GetPorts(ctx context.Context, args []*ic.Argument) (*voltha.Ports, error) {
if len(args) < 3 {
logger.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
@@ -254,7 +254,7 @@
}
// GetChildDevices gets all the child device IDs from the device passed as parameter
-func (rhp *AdapterRequestHandlerProxy) GetChildDevices(args []*ic.Argument) (*voltha.Devices, error) {
+func (rhp *AdapterRequestHandlerProxy) GetChildDevices(ctx context.Context, args []*ic.Argument) (*voltha.Devices, error) {
if len(args) < 2 {
logger.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
@@ -284,7 +284,7 @@
// ChildDeviceDetected is invoked when a child device is detected. The following parameters are expected:
// (parent_device_id, parent_port_no, child_device_type, channel_id, vendor_id, serial_number)
-func (rhp *AdapterRequestHandlerProxy) ChildDeviceDetected(args []*ic.Argument) (*voltha.Device, error) {
+func (rhp *AdapterRequestHandlerProxy) ChildDeviceDetected(ctx context.Context, args []*ic.Argument) (*voltha.Device, error) {
if len(args) < 5 {
logger.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
@@ -355,7 +355,7 @@
}
// DeviceStateUpdate updates device status
-func (rhp *AdapterRequestHandlerProxy) DeviceStateUpdate(args []*ic.Argument) (*empty.Empty, error) {
+func (rhp *AdapterRequestHandlerProxy) DeviceStateUpdate(ctx context.Context, args []*ic.Argument) (*empty.Empty, error) {
if len(args) < 3 {
logger.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
@@ -401,7 +401,7 @@
}
// ChildrenStateUpdate updates child device status
-func (rhp *AdapterRequestHandlerProxy) ChildrenStateUpdate(args []*ic.Argument) (*empty.Empty, error) {
+func (rhp *AdapterRequestHandlerProxy) ChildrenStateUpdate(ctx context.Context, args []*ic.Argument) (*empty.Empty, error) {
if len(args) < 3 {
logger.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
@@ -448,7 +448,7 @@
}
// PortsStateUpdate updates the ports state related to the device
-func (rhp *AdapterRequestHandlerProxy) PortsStateUpdate(args []*ic.Argument) (*empty.Empty, error) {
+func (rhp *AdapterRequestHandlerProxy) PortsStateUpdate(ctx context.Context, args []*ic.Argument) (*empty.Empty, error) {
if len(args) < 2 {
logger.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
@@ -486,7 +486,7 @@
}
// PortStateUpdate updates the port state of the device
-func (rhp *AdapterRequestHandlerProxy) PortStateUpdate(args []*ic.Argument) (*empty.Empty, error) {
+func (rhp *AdapterRequestHandlerProxy) PortStateUpdate(ctx context.Context, args []*ic.Argument) (*empty.Empty, error) {
if len(args) < 3 {
logger.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
@@ -541,7 +541,7 @@
}
// DeleteAllPorts deletes all ports of device
-func (rhp *AdapterRequestHandlerProxy) DeleteAllPorts(args []*ic.Argument) (*empty.Empty, error) {
+func (rhp *AdapterRequestHandlerProxy) DeleteAllPorts(ctx context.Context, args []*ic.Argument) (*empty.Empty, error) {
if len(args) < 3 {
logger.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
@@ -574,7 +574,7 @@
// ChildDevicesLost indicates that a parent device is in a state (Disabled) where it cannot manage the child devices.
// This will trigger the Core to disable all the child devices.
-func (rhp *AdapterRequestHandlerProxy) ChildDevicesLost(args []*ic.Argument) (*empty.Empty, error) {
+func (rhp *AdapterRequestHandlerProxy) ChildDevicesLost(ctx context.Context, args []*ic.Argument) (*empty.Empty, error) {
if len(args) < 2 {
logger.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
@@ -607,7 +607,7 @@
// ChildDevicesDetected is invoked by an adapter when child devices are found, typically after a disable/enable sequence.
// This will trigger the Core to Enable all the child devices of that parent.
-func (rhp *AdapterRequestHandlerProxy) ChildDevicesDetected(args []*ic.Argument) (*empty.Empty, error) {
+func (rhp *AdapterRequestHandlerProxy) ChildDevicesDetected(ctx context.Context, args []*ic.Argument) (*empty.Empty, error) {
if len(args) < 2 {
logger.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
@@ -639,7 +639,7 @@
}
// PortCreated adds port to device
-func (rhp *AdapterRequestHandlerProxy) PortCreated(args []*ic.Argument) (*empty.Empty, error) {
+func (rhp *AdapterRequestHandlerProxy) PortCreated(ctx context.Context, args []*ic.Argument) (*empty.Empty, error) {
if len(args) < 3 {
logger.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
@@ -677,7 +677,7 @@
}
// DevicePMConfigUpdate initializes the pm configs as defined by the adapter.
-func (rhp *AdapterRequestHandlerProxy) DevicePMConfigUpdate(args []*ic.Argument) (*empty.Empty, error) {
+func (rhp *AdapterRequestHandlerProxy) DevicePMConfigUpdate(ctx context.Context, args []*ic.Argument) (*empty.Empty, error) {
if len(args) < 2 {
logger.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
@@ -710,7 +710,7 @@
}
// PacketIn sends the incoming packet of device
-func (rhp *AdapterRequestHandlerProxy) PacketIn(args []*ic.Argument) (*empty.Empty, error) {
+func (rhp *AdapterRequestHandlerProxy) PacketIn(ctx context.Context, args []*ic.Argument) (*empty.Empty, error) {
if len(args) < 4 {
logger.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
@@ -756,7 +756,7 @@
}
// UpdateImageDownload updates image download
-func (rhp *AdapterRequestHandlerProxy) UpdateImageDownload(args []*ic.Argument) (*empty.Empty, error) {
+func (rhp *AdapterRequestHandlerProxy) UpdateImageDownload(ctx context.Context, args []*ic.Argument) (*empty.Empty, error) {
if len(args) < 2 {
logger.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
@@ -795,7 +795,7 @@
}
// ReconcileChildDevices reconciles child devices
-func (rhp *AdapterRequestHandlerProxy) ReconcileChildDevices(args []*ic.Argument) (*empty.Empty, error) {
+func (rhp *AdapterRequestHandlerProxy) ReconcileChildDevices(ctx context.Context, args []*ic.Argument) (*empty.Empty, error) {
if len(args) < 2 {
logger.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
@@ -827,7 +827,7 @@
}
// DeviceReasonUpdate updates device reason
-func (rhp *AdapterRequestHandlerProxy) DeviceReasonUpdate(args []*ic.Argument) (*empty.Empty, error) {
+func (rhp *AdapterRequestHandlerProxy) DeviceReasonUpdate(ctx context.Context, args []*ic.Argument) (*empty.Empty, error) {
if len(args) < 2 {
logger.Warn("DeviceReasonUpdate: invalid-number-of-args", log.Fields{"args": args})
err := errors.New("DeviceReasonUpdate: invalid-number-of-args")
diff --git a/rw_core/core/api/common_test.go b/rw_core/core/api/common_test.go
index f9bb047..d701984 100644
--- a/rw_core/core/api/common_test.go
+++ b/rw_core/core/api/common_test.go
@@ -84,7 +84,7 @@
if err != nil {
return nil, 0, err
}
- etcdServer := mock_etcd.StartEtcdServer(mock_etcd.MKConfig(configName, kvClientPort, peerPort, storageDir, logLevel))
+ etcdServer := mock_etcd.StartEtcdServer(context.Background(), mock_etcd.MKConfig(context.Background(), configName, kvClientPort, peerPort, storageDir, logLevel))
if etcdServer == nil {
return nil, 0, status.Error(codes.Internal, "Embedded server failed to start")
}
@@ -93,42 +93,43 @@
func stopEmbeddedEtcdServer(server *mock_etcd.EtcdServer) {
if server != nil {
- server.Stop()
+ server.Stop(context.Background())
}
}
func setupKVClient(cf *config.RWCoreFlags, coreInstanceID string) kvstore.Client {
addr := cf.KVStoreHost + ":" + strconv.Itoa(cf.KVStorePort)
- client, err := kvstore.NewEtcdClient(addr, cf.KVStoreTimeout, log.FatalLevel)
+ client, err := kvstore.NewEtcdClient(context.Background(), addr, cf.KVStoreTimeout, log.FatalLevel)
if err != nil {
panic("no kv client")
}
return client
}
-func createMockAdapter(adapterType int, kafkaClient kafka.Client, coreInstanceID string, coreName string, adapterName string) (adapters.IAdapter, error) {
+func createMockAdapter(ctx context.Context, adapterType int, kafkaClient kafka.Client, coreInstanceID string, coreName string, adapterName string) (adapters.IAdapter, error) {
var err error
var adapter adapters.IAdapter
adapterKafkaICProxy := kafka.NewInterContainerProxy(
- kafka.MsgClient(kafkaClient),
- kafka.DefaultTopic(&kafka.Topic{Name: adapterName}))
- adapterCoreProxy := com.NewCoreProxy(adapterKafkaICProxy, adapterName, coreName)
+ ctx,
+ kafka.MsgClient(ctx, kafkaClient),
+ kafka.DefaultTopic(ctx, &kafka.Topic{Name: adapterName}))
+ adapterCoreProxy := com.NewCoreProxy(ctx, adapterKafkaICProxy, adapterName, coreName)
var adapterReqHandler *com.RequestHandlerProxy
switch adapterType {
case OltAdapter:
- adapter = cm.NewOLTAdapter(adapterCoreProxy)
+ adapter = cm.NewOLTAdapter(ctx, adapterCoreProxy)
case OnuAdapter:
- adapter = cm.NewONUAdapter(adapterCoreProxy)
+ adapter = cm.NewONUAdapter(ctx, adapterCoreProxy)
default:
logger.Fatalf("invalid-adapter-type-%d", adapterType)
}
- adapterReqHandler = com.NewRequestHandlerProxy(coreInstanceID, adapter, adapterCoreProxy)
+ adapterReqHandler = com.NewRequestHandlerProxy(ctx, coreInstanceID, adapter, adapterCoreProxy)
- if err = adapterKafkaICProxy.Start(); err != nil {
+ if err = adapterKafkaICProxy.Start(ctx); err != nil {
logger.Errorw("Failure-starting-adapter-intercontainerProxy", log.Fields{"error": err})
return nil, err
}
- if err = adapterKafkaICProxy.SubscribeWithRequestHandlerInterface(kafka.Topic{Name: adapterName}, adapterReqHandler); err != nil {
+ if err = adapterKafkaICProxy.SubscribeWithRequestHandlerInterface(ctx, kafka.Topic{Name: adapterName}, adapterReqHandler); err != nil {
logger.Errorw("Failure-to-subscribe-onu-request-handler", log.Fields{"error": err})
return nil, err
}
diff --git a/rw_core/core/api/grpc_nbi_handler.go b/rw_core/core/api/grpc_nbi_handler.go
index 55117d2..0793994 100755
--- a/rw_core/core/api/grpc_nbi_handler.go
+++ b/rw_core/core/api/grpc_nbi_handler.go
@@ -40,7 +40,7 @@
type adapterManager struct{ *adapter.Manager }
// NewNBIHandler creates API handler instance
-func NewNBIHandler(deviceMgr *device.Manager, logicalDeviceMgr *device.LogicalManager, adapterMgr *adapter.Manager) *NBIHandler {
+func NewNBIHandler(ctx context.Context, deviceMgr *device.Manager, logicalDeviceMgr *device.LogicalManager, adapterMgr *adapter.Manager) *NBIHandler {
return &NBIHandler{
Manager: deviceMgr,
LogicalManager: logicalDeviceMgr,
diff --git a/rw_core/core/api/grpc_nbi_handler_test.go b/rw_core/core/api/grpc_nbi_handler_test.go
index fa1f657..fae4c79 100755
--- a/rw_core/core/api/grpc_nbi_handler_test.go
+++ b/rw_core/core/api/grpc_nbi_handler_test.go
@@ -73,7 +73,7 @@
maxTimeout time.Duration
}
-func newNBTest() *NBTest {
+func newNBTest(ctx context.Context) *NBTest {
test := &NBTest{}
// Start the embedded etcd server
var err error
@@ -82,7 +82,7 @@
logger.Fatal(err)
}
// Create the kafka client
- test.kClient = mock_kafka.NewKafkaClient()
+ test.kClient = mock_kafka.NewKafkaClient(ctx)
test.oltAdapterName = "olt_adapter_mock"
test.onuAdapterName = "onu_adapter_mock"
test.coreInstanceID = "rw-nbi-test"
@@ -117,39 +117,40 @@
LivenessChannelInterval: cfg.LiveProbeInterval / 2,
PathPrefix: cfg.KVStoreDataPrefix}
nb.kmp = kafka.NewInterContainerProxy(
- kafka.InterContainerHost(cfg.KafkaAdapterHost),
- kafka.InterContainerPort(cfg.KafkaAdapterPort),
- kafka.MsgClient(nb.kClient),
- kafka.DefaultTopic(&kafka.Topic{Name: cfg.CoreTopic}),
- kafka.DeviceDiscoveryTopic(&kafka.Topic{Name: cfg.AffinityRouterTopic}))
+ context.Background(),
+ kafka.InterContainerHost(context.Background(), cfg.KafkaAdapterHost),
+ kafka.InterContainerPort(context.Background(), cfg.KafkaAdapterPort),
+ kafka.MsgClient(context.Background(), nb.kClient),
+ kafka.DefaultTopic(context.Background(), &kafka.Topic{Name: cfg.CoreTopic}),
+ kafka.DeviceDiscoveryTopic(context.Background(), &kafka.Topic{Name: cfg.AffinityRouterTopic}))
- endpointMgr := kafka.NewEndpointManager(backend)
- proxy := model.NewProxy(backend, "/")
- nb.adapterMgr = adapter.NewAdapterManager(proxy, nb.coreInstanceID, nb.kClient)
- nb.deviceMgr, nb.logicalDeviceMgr = device.NewManagers(proxy, nb.adapterMgr, nb.kmp, endpointMgr, cfg.CorePairTopic, nb.coreInstanceID, cfg.DefaultCoreTimeout)
+ endpointMgr := kafka.NewEndpointManager(context.Background(), backend)
+ proxy := model.NewProxy(context.Background(), backend, "/")
+ nb.adapterMgr = adapter.NewAdapterManager(context.Background(), proxy, nb.coreInstanceID, nb.kClient)
+ nb.deviceMgr, nb.logicalDeviceMgr = device.NewManagers(context.Background(), proxy, nb.adapterMgr, nb.kmp, endpointMgr, cfg.CorePairTopic, nb.coreInstanceID, cfg.DefaultCoreTimeout)
nb.adapterMgr.Start(ctx)
- if err := nb.kmp.Start(); err != nil {
+ if err := nb.kmp.Start(context.Background()); err != nil {
logger.Fatalf("Cannot start InterContainerProxy: %s", err)
}
- requestProxy := NewAdapterRequestHandlerProxy(nb.deviceMgr, nb.adapterMgr)
- if err := nb.kmp.SubscribeWithRequestHandlerInterface(kafka.Topic{Name: cfg.CoreTopic}, requestProxy); err != nil {
+ requestProxy := NewAdapterRequestHandlerProxy(context.Background(), nb.deviceMgr, nb.adapterMgr)
+ if err := nb.kmp.SubscribeWithRequestHandlerInterface(context.Background(), kafka.Topic{Name: cfg.CoreTopic}, requestProxy); err != nil {
logger.Fatalf("Cannot add request handler: %s", err)
}
- if err := nb.kmp.SubscribeWithDefaultRequestHandler(kafka.Topic{Name: cfg.CorePairTopic}, kafka.OffsetNewest); err != nil {
+ if err := nb.kmp.SubscribeWithDefaultRequestHandler(context.Background(), kafka.Topic{Name: cfg.CorePairTopic}, kafka.OffsetNewest); err != nil {
logger.Fatalf("Cannot add default request handler: %s", err)
}
}
func (nb *NBTest) createAndregisterAdapters(t *testing.T) {
// Setup the mock OLT adapter
- oltAdapter, err := createMockAdapter(OltAdapter, nb.kClient, nb.coreInstanceID, coreName, nb.oltAdapterName)
+ oltAdapter, err := createMockAdapter(context.Background(), OltAdapter, nb.kClient, nb.coreInstanceID, coreName, nb.oltAdapterName)
if err != nil {
logger.Fatalw("setting-mock-olt-adapter-failed", log.Fields{"error": err})
}
nb.oltAdapter = (oltAdapter).(*cm.OLTAdapter)
- nb.numONUPerOLT = nb.oltAdapter.GetNumONUPerOLT()
- nb.startingUNIPortNo = nb.oltAdapter.GetStartingUNIPortNo()
+ nb.numONUPerOLT = nb.oltAdapter.GetNumONUPerOLT(context.Background())
+ nb.startingUNIPortNo = nb.oltAdapter.GetStartingUNIPortNo(context.Background())
// Register the adapter
registrationData := &voltha.Adapter{
@@ -163,13 +164,13 @@
}
types := []*voltha.DeviceType{{Id: nb.oltAdapterName, Adapter: nb.oltAdapterName, AcceptsAddRemoveFlowUpdates: true}}
deviceTypes := &voltha.DeviceTypes{Items: types}
- if _, err := nb.adapterMgr.RegisterAdapter(registrationData, deviceTypes); err != nil {
+ if _, err := nb.adapterMgr.RegisterAdapter(context.Background(), registrationData, deviceTypes); err != nil {
logger.Errorw("failed-to-register-adapter", log.Fields{"error": err})
assert.NotNil(t, err)
}
// Setup the mock ONU adapter
- onuAdapter, err := createMockAdapter(OnuAdapter, nb.kClient, nb.coreInstanceID, coreName, nb.onuAdapterName)
+ onuAdapter, err := createMockAdapter(context.Background(), OnuAdapter, nb.kClient, nb.coreInstanceID, coreName, nb.onuAdapterName)
if err != nil {
logger.Fatalw("setting-mock-onu-adapter-failed", log.Fields{"error": err})
}
@@ -187,7 +188,7 @@
}
types = []*voltha.DeviceType{{Id: nb.onuAdapterName, Adapter: nb.onuAdapterName, AcceptsAddRemoveFlowUpdates: true}}
deviceTypes = &voltha.DeviceTypes{Items: types}
- if _, err := nb.adapterMgr.RegisterAdapter(registrationData, deviceTypes); err != nil {
+ if _, err := nb.adapterMgr.RegisterAdapter(context.Background(), registrationData, deviceTypes); err != nil {
logger.Errorw("failed-to-register-adapter", log.Fields{"error": err})
assert.NotNil(t, err)
}
@@ -195,10 +196,10 @@
func (nb *NBTest) stopAll() {
if nb.kClient != nil {
- nb.kClient.Stop()
+ nb.kClient.Stop(context.Background())
}
if nb.kmp != nil {
- nb.kmp.Stop()
+ nb.kmp.Stop(context.Background())
}
if nb.etcdServer != nil {
stopEmbeddedEtcdServer(nb.etcdServer)
@@ -972,7 +973,7 @@
for _, val := range fa.MatchFields {
matchFields = append(matchFields, &ofp.OfpOxmField{Field: &ofp.OfpOxmField_OfbField{OfbField: val}})
}
- return flows.MkSimpleFlowMod(matchFields, fa.Actions, fa.Command, fa.KV)
+ return flows.MkSimpleFlowMod(context.Background(), matchFields, fa.Actions, fa.Command, fa.KV)
}
func createMetadata(cTag int, techProfile int, port int) uint64 {
@@ -1001,6 +1002,7 @@
// Send flows for the parent device
var nniPorts []*voltha.LogicalPort
var uniPorts []*voltha.LogicalPort
+ ctx := context.Background()
for _, p := range logicalDevice.Ports {
if p.RootPort {
nniPorts = append(nniPorts, p)
@@ -1017,11 +1019,11 @@
fa = &flows.FlowArgs{
KV: flows.OfpFlowModArgs{"priority": 10000, "buffer_id": maxInt32, "out_port": maxInt32, "out_group": maxInt32, "flags": 1},
MatchFields: []*ofp.OfpOxmOfbField{
- flows.InPort(nniPort),
- flows.EthType(35020),
+ flows.InPort(ctx, nniPort),
+ flows.EthType(ctx, 35020),
},
Actions: []*ofp.OfpAction{
- flows.Output(controllerPortMask),
+ flows.Output(ctx, controllerPortMask),
},
}
flowLLDP := ofp.FlowTableUpdate{FlowMod: makeSimpleFlowMod(fa), Id: logicalDevice.Id}
@@ -1031,14 +1033,14 @@
fa = &flows.FlowArgs{
KV: flows.OfpFlowModArgs{"priority": 10000, "buffer_id": maxInt32, "out_port": maxInt32, "out_group": maxInt32, "flags": 1},
MatchFields: []*ofp.OfpOxmOfbField{
- flows.InPort(nniPort),
- flows.EthType(2048),
- flows.IpProto(17),
- flows.UdpSrc(67),
- flows.UdpDst(68),
+ flows.InPort(ctx, nniPort),
+ flows.EthType(ctx, 2048),
+ flows.IpProto(ctx, 17),
+ flows.UdpSrc(ctx, 67),
+ flows.UdpDst(ctx, 68),
},
Actions: []*ofp.OfpAction{
- flows.Output(controllerPortMask),
+ flows.Output(ctx, controllerPortMask),
},
}
flowIPV4 := ofp.FlowTableUpdate{FlowMod: makeSimpleFlowMod(fa), Id: logicalDevice.Id}
@@ -1048,14 +1050,14 @@
fa = &flows.FlowArgs{
KV: flows.OfpFlowModArgs{"priority": 10000, "buffer_id": maxInt32, "out_port": maxInt32, "out_group": maxInt32, "flags": 1},
MatchFields: []*ofp.OfpOxmOfbField{
- flows.InPort(nniPort),
- flows.EthType(34525),
- flows.IpProto(17),
- flows.UdpSrc(546),
- flows.UdpDst(547),
+ flows.InPort(ctx, nniPort),
+ flows.EthType(ctx, 34525),
+ flows.IpProto(ctx, 17),
+ flows.UdpSrc(ctx, 546),
+ flows.UdpDst(ctx, 547),
},
Actions: []*ofp.OfpAction{
- flows.Output(controllerPortMask),
+ flows.Output(ctx, controllerPortMask),
},
}
flowIPV6 := ofp.FlowTableUpdate{FlowMod: makeSimpleFlowMod(fa), Id: logicalDevice.Id}
@@ -1066,17 +1068,18 @@
}
func (nb *NBTest) sendEAPFlows(t *testing.T, nbi *NBIHandler, logicalDeviceID string, port *ofp.OfpPort, vlan int, meterID uint64) {
+ ctx := context.Background()
maxInt32 := uint64(0xFFFFFFFF)
controllerPortMask := uint32(4294967293) // will result in 4294967293&0x7fffffff => 2147483645 which is the actual controller port
fa := &flows.FlowArgs{
KV: flows.OfpFlowModArgs{"priority": 10000, "buffer_id": maxInt32, "out_port": maxInt32, "out_group": maxInt32, "flags": 1, "write_metadata": createMetadata(vlan, 64, 0), "meter_id": meterID},
MatchFields: []*ofp.OfpOxmOfbField{
- flows.InPort(port.PortNo),
- flows.EthType(34958),
- flows.VlanVid(8187),
+ flows.InPort(ctx, port.PortNo),
+ flows.EthType(ctx, 34958),
+ flows.VlanVid(ctx, 8187),
},
Actions: []*ofp.OfpAction{
- flows.Output(controllerPortMask),
+ flows.Output(ctx, controllerPortMask),
},
}
flowEAP := ofp.FlowTableUpdate{FlowMod: makeSimpleFlowMod(fa), Id: logicalDeviceID}
@@ -1088,12 +1091,12 @@
defer wg.Done()
// Clear any existing flows on the adapters
- nb.oltAdapter.ClearFlows()
- nb.onuAdapter.ClearFlows()
+ nb.oltAdapter.ClearFlows(context.Background())
+ nb.onuAdapter.ClearFlows(context.Background())
// Set the adapter actions on flow addition/deletion
- nb.oltAdapter.SetFlowAction(flowAddFail, flowDelete)
- nb.onuAdapter.SetFlowAction(flowAddFail, flowDelete)
+ nb.oltAdapter.SetFlowAction(context.Background(), flowAddFail, flowDelete)
+ nb.onuAdapter.SetFlowAction(context.Background(), flowAddFail, flowDelete)
// Wait until a logical device is ready
var vlFunction isLogicalDevicesConditionSatisfied = func(lds *voltha.LogicalDevices) bool {
@@ -1149,7 +1152,7 @@
processedNniLogicalPorts := 0
processedUniLogicalPorts := 0
- for event := range nbi.GetChangeEventsQueueForTest() {
+ for event := range nbi.GetChangeEventsQueueForTest(context.Background()) {
startingVlan++
if portStatus, ok := (event.Event).(*ofp.ChangeEvent_PortStatus); ok {
ps := portStatus.PortStatus
@@ -1177,7 +1180,7 @@
expectedFlowCount = 0
}
var oltVFunc isConditionSatisfied = func() bool {
- return nb.oltAdapter.GetFlowCount() >= expectedFlowCount
+ return nb.oltAdapter.GetFlowCount(context.Background()) >= expectedFlowCount
}
err = waitUntilCondition(nb.maxTimeout, nbi, oltVFunc)
assert.Nil(t, err)
@@ -1188,7 +1191,7 @@
expectedFlowCount = 0
}
var onuVFunc isConditionSatisfied = func() bool {
- return nb.onuAdapter.GetFlowCount() == expectedFlowCount
+ return nb.onuAdapter.GetFlowCount(context.Background()) == expectedFlowCount
}
err = waitUntilCondition(nb.maxTimeout, nbi, onuVFunc)
assert.Nil(t, err)
@@ -1252,7 +1255,7 @@
//log.SetPackageLogLevel("github.com/opencord/voltha-go/rw_core/core", log.DebugLevel)
- nb := newNBTest()
+ nb := newNBTest(context.Background())
assert.NotNil(t, nb)
defer nb.stopAll()
@@ -1261,7 +1264,7 @@
nb.startCore(false)
// Set the grpc API interface - no grpc server is running in unit test
- nbi := NewNBIHandler(nb.deviceMgr, nb.logicalDeviceMgr, nb.adapterMgr)
+ nbi := NewNBIHandler(context.Background(), nb.deviceMgr, nb.logicalDeviceMgr, nb.adapterMgr)
// 1. Basic test with no data in Core
nb.testCoreWithoutData(t, nbi)
diff --git a/rw_core/core/core.go b/rw_core/core/core.go
index 0dbecc8..f33fba8 100644
--- a/rw_core/core/core.go
+++ b/rw_core/core/core.go
@@ -47,6 +47,7 @@
// If the context has a probe then fetch it and register our services
if p := probe.GetProbeFromContext(ctx); p != nil {
p.RegisterService(
+ ctx,
"message-bus",
"kv-store",
"adapter-manager",
@@ -74,15 +75,15 @@
// setup kv client
logger.Debugw("create-kv-client", log.Fields{"kvstore": cf.KVStoreType})
- kvClient, err := newKVClient(cf.KVStoreType, cf.KVStoreHost+":"+strconv.Itoa(cf.KVStorePort), cf.KVStoreTimeout)
+ kvClient, err := newKVClient(ctx, cf.KVStoreType, cf.KVStoreHost+":"+strconv.Itoa(cf.KVStorePort), cf.KVStoreTimeout)
if err != nil {
logger.Fatal(err)
}
defer stopKVClient(context.Background(), kvClient)
// sync logging config with kv store
- cm := conf.NewConfigManager(kvClient, cf.KVStoreType, cf.KVStoreHost, cf.KVStorePort, cf.KVStoreTimeout)
- go conf.StartLogLevelConfigProcessing(cm, ctx)
+ cm := conf.NewConfigManager(ctx, kvClient, cf.KVStoreType, cf.KVStoreHost, cf.KVStorePort, cf.KVStoreTimeout)
+ go conf.StartLogLevelConfigProcessing(ctx, cm)
backend := &db.Backend{
Client: kvClient,
@@ -104,27 +105,28 @@
// create kafka client
kafkaClient := kafka.NewSaramaClient(
- kafka.Host(cf.KafkaAdapterHost),
- kafka.Port(cf.KafkaAdapterPort),
- kafka.ConsumerType(kafka.GroupCustomer),
- kafka.ProducerReturnOnErrors(true),
- kafka.ProducerReturnOnSuccess(true),
- kafka.ProducerMaxRetries(6),
- kafka.NumPartitions(3),
- kafka.ConsumerGroupName(id),
- kafka.ConsumerGroupPrefix(id),
- kafka.AutoCreateTopic(true),
- kafka.ProducerFlushFrequency(5),
- kafka.ProducerRetryBackoff(time.Millisecond*30),
- kafka.LivenessChannelInterval(cf.LiveProbeInterval/2),
+ ctx,
+ kafka.Host(ctx, cf.KafkaAdapterHost),
+ kafka.Port(ctx, cf.KafkaAdapterPort),
+ kafka.ConsumerType(ctx, kafka.GroupCustomer),
+ kafka.ProducerReturnOnErrors(ctx, true),
+ kafka.ProducerReturnOnSuccess(ctx, true),
+ kafka.ProducerMaxRetries(ctx, 6),
+ kafka.NumPartitions(ctx, 3),
+ kafka.ConsumerGroupName(ctx, id),
+ kafka.ConsumerGroupPrefix(ctx, id),
+ kafka.AutoCreateTopic(ctx, true),
+ kafka.ProducerFlushFrequency(ctx, 5),
+ kafka.ProducerRetryBackoff(ctx, time.Millisecond*30),
+ kafka.LivenessChannelInterval(ctx, cf.LiveProbeInterval/2),
)
// defer kafkaClient.Stop()
// create kv proxy
- proxy := model.NewProxy(backend, "/")
+ proxy := model.NewProxy(ctx, backend, "/")
// load adapters & device types while other things are starting
- adapterMgr := adapter.NewAdapterManager(proxy, id, kafkaClient)
+ adapterMgr := adapter.NewAdapterManager(ctx, proxy, id, kafkaClient)
go adapterMgr.Start(ctx)
// connect to kafka, then wait until reachable and publisher/consumer created
@@ -134,27 +136,27 @@
logger.Warn("Failed to setup kafka connection")
return
}
- defer kmp.Stop()
+ defer kmp.Stop(ctx)
go monitorKafkaLiveness(ctx, kmp, cf.LiveProbeInterval, cf.NotLiveProbeInterval)
// create the core of the system, the device managers
- endpointMgr := kafka.NewEndpointManager(backend)
- deviceMgr, logicalDeviceMgr := device.NewManagers(proxy, adapterMgr, kmp, endpointMgr, cf.CorePairTopic, id, cf.DefaultCoreTimeout)
+ endpointMgr := kafka.NewEndpointManager(ctx, backend)
+ deviceMgr, logicalDeviceMgr := device.NewManagers(ctx, proxy, adapterMgr, kmp, endpointMgr, cf.CorePairTopic, id, cf.DefaultCoreTimeout)
// register kafka RPC handler
- registerAdapterRequestHandlers(kmp, deviceMgr, adapterMgr, cf.CoreTopic, cf.CorePairTopic)
+ registerAdapterRequestHandlers(ctx, kmp, deviceMgr, adapterMgr, cf.CoreTopic, cf.CorePairTopic)
// start gRPC handler
- grpcServer := grpcserver.NewGrpcServer(cf.GrpcHost, cf.GrpcPort, nil, false, probe.GetProbeFromContext(ctx))
- go startGRPCService(ctx, grpcServer, api.NewNBIHandler(deviceMgr, logicalDeviceMgr, adapterMgr))
- defer grpcServer.Stop()
+ grpcServer := grpcserver.NewGrpcServer(ctx, cf.GrpcHost, cf.GrpcPort, nil, false, probe.GetProbeFromContext(ctx))
+ go startGRPCService(ctx, grpcServer, api.NewNBIHandler(ctx, deviceMgr, logicalDeviceMgr, adapterMgr))
+ defer grpcServer.Stop(ctx)
// wait for core to be stopped, via Stop() or context cancellation, before running deferred functions
<-ctx.Done()
}
// Stop brings down core services
-func (core *Core) Stop() {
+func (core *Core) Stop(ctx context.Context) {
core.shutdown()
<-core.stopped
}
diff --git a/rw_core/core/device/agent.go b/rw_core/core/device/agent.go
index 901b27f..6073dc4 100755
--- a/rw_core/core/device/agent.go
+++ b/rw_core/core/device/agent.go
@@ -62,11 +62,11 @@
}
//newAgent creates a new device agent. The device will be initialized when start() is called.
-func newAgent(ap *remote.AdapterProxy, device *voltha.Device, deviceMgr *Manager, cdProxy *model.Proxy, timeout time.Duration) *Agent {
+func newAgent(ctx context.Context, ap *remote.AdapterProxy, device *voltha.Device, deviceMgr *Manager, cdProxy *model.Proxy, timeout time.Duration) *Agent {
var agent Agent
agent.adapterProxy = ap
if device.Id == "" {
- agent.deviceID = coreutils.CreateDeviceID()
+ agent.deviceID = coreutils.CreateDeviceID(ctx)
} else {
agent.deviceID = device.Id
}
@@ -80,7 +80,7 @@
agent.clusterDataProxy = cdProxy
agent.defaultTimeout = timeout
agent.device = proto.Clone(device).(*voltha.Device)
- agent.requestQueue = coreutils.NewRequestQueue()
+ agent.requestQueue = coreutils.NewRequestQueue(ctx)
return &agent
}
@@ -154,7 +154,7 @@
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
}
- defer agent.requestQueue.RequestComplete()
+ defer agent.requestQueue.RequestComplete(ctx)
logger.Infow("stopping-device-agent", log.Fields{"deviceId": agent.deviceID, "parentId": agent.parentID})
@@ -178,7 +178,7 @@
logger.Warnw("request-aborted", log.Fields{"device-id": agent.deviceID, "error": err})
return
}
- defer agent.requestQueue.RequestComplete()
+ defer agent.requestQueue.RequestComplete(ctx)
logger.Debug("reconciling-device-agent-devicetype")
// TODO: context timeout
device := &voltha.Device{}
@@ -196,14 +196,14 @@
// onSuccess is a common callback for scenarios where we receive a nil response following a request to an adapter
// and the only action required is to publish a successful result on kafka
-func (agent *Agent) onSuccess(rpc string, response interface{}, reqArgs ...interface{}) {
+func (agent *Agent) onSuccess(ctx context.Context, rpc string, response interface{}, reqArgs ...interface{}) {
logger.Debugw("response successful", log.Fields{"rpc": rpc, "device-id": agent.deviceID})
// TODO: Post success message onto kafka
}
// onFailure is a common callback for scenarios where we receive an error response following a request to an adapter
// and the only action required is to publish the failed result on kafka
-func (agent *Agent) onFailure(rpc string, response interface{}, reqArgs ...interface{}) {
+func (agent *Agent) onFailure(ctx context.Context, rpc string, response interface{}, reqArgs ...interface{}) {
if res, ok := response.(error); ok {
logger.Errorw("rpc-failed", log.Fields{"rpc": rpc, "device-id": agent.deviceID, "error": res, "args": reqArgs})
} else {
@@ -218,14 +218,14 @@
select {
case rpcResponse, ok := <-ch:
if !ok {
- onFailure(rpc, status.Errorf(codes.Aborted, "channel-closed"), reqArgs)
+ onFailure(ctx, rpc, status.Errorf(codes.Aborted, "channel-closed"), reqArgs)
} else if rpcResponse.Err != nil {
- onFailure(rpc, rpcResponse.Err, reqArgs)
+ onFailure(ctx, rpc, rpcResponse.Err, reqArgs)
} else {
- onSuccess(rpc, rpcResponse.Reply, reqArgs)
+ onSuccess(ctx, rpc, rpcResponse.Reply, reqArgs)
}
case <-ctx.Done():
- onFailure(rpc, ctx.Err(), reqArgs)
+ onFailure(ctx, rpc, ctx.Err(), reqArgs)
}
}
@@ -234,12 +234,12 @@
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return nil, err
}
- defer agent.requestQueue.RequestComplete()
+ defer agent.requestQueue.RequestComplete(ctx)
return proto.Clone(agent.device).(*voltha.Device), nil
}
// getDeviceWithoutLock is a helper function to be used ONLY by any device agent function AFTER it has acquired the device lock.
-func (agent *Agent) getDeviceWithoutLock() *voltha.Device {
+func (agent *Agent) getDeviceWithoutLock(ctx context.Context) *voltha.Device {
return proto.Clone(agent.device).(*voltha.Device)
}
@@ -248,16 +248,16 @@
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
}
- defer agent.requestQueue.RequestComplete()
+ defer agent.requestQueue.RequestComplete(ctx)
logger.Debugw("enableDevice", log.Fields{"device-id": agent.deviceID})
- cloned := agent.getDeviceWithoutLock()
+ cloned := agent.getDeviceWithoutLock(ctx)
// First figure out which adapter will handle this device type. We do it at this stage as allow devices to be
// pre-provisioned with the required adapter not registered. At this stage, since we need to communicate
// with the adapter then we need to know the adapter that will handle this request
- adapterName, err := agent.adapterMgr.GetAdapterType(cloned.Type)
+ adapterName, err := agent.adapterMgr.GetAdapterType(ctx, cloned.Type)
if err != nil {
return err
}
@@ -306,20 +306,20 @@
select {
case rpcResponse, ok := <-ch:
if !ok {
- response.Error(status.Errorf(codes.Aborted, "channel-closed"))
+ response.Error(ctx, status.Errorf(codes.Aborted, "channel-closed"))
} else if rpcResponse.Err != nil {
- response.Error(rpcResponse.Err)
+ response.Error(ctx, rpcResponse.Err)
} else {
- response.Done()
+ response.Done(ctx)
}
case <-ctx.Done():
- response.Error(ctx.Err())
+ response.Error(ctx, ctx.Err())
}
}
//deleteFlowWithoutPreservingOrder removes a flow specified by index from the flows slice. This function will
//panic if the index is out of range.
-func deleteFlowWithoutPreservingOrder(flows []*ofp.OfpFlowStats, index int) []*ofp.OfpFlowStats {
+func deleteFlowWithoutPreservingOrder(ctx context.Context, flows []*ofp.OfpFlowStats, index int) []*ofp.OfpFlowStats {
flows[index] = flows[len(flows)-1]
flows[len(flows)-1] = nil
return flows[:len(flows)-1]
@@ -327,16 +327,16 @@
//deleteGroupWithoutPreservingOrder removes a group specified by index from the groups slice. This function will
//panic if the index is out of range.
-func deleteGroupWithoutPreservingOrder(groups []*ofp.OfpGroupEntry, index int) []*ofp.OfpGroupEntry {
+func deleteGroupWithoutPreservingOrder(ctx context.Context, groups []*ofp.OfpGroupEntry, index int) []*ofp.OfpGroupEntry {
groups[index] = groups[len(groups)-1]
groups[len(groups)-1] = nil
return groups[:len(groups)-1]
}
-func flowsToUpdateToDelete(newFlows, existingFlows []*ofp.OfpFlowStats) (updatedNewFlows, flowsToDelete, updatedAllFlows []*ofp.OfpFlowStats) {
+func flowsToUpdateToDelete(ctx context.Context, newFlows, existingFlows []*ofp.OfpFlowStats) (updatedNewFlows, flowsToDelete, updatedAllFlows []*ofp.OfpFlowStats) {
// Process flows
for _, flow := range existingFlows {
- if idx := fu.FindFlows(newFlows, flow); idx == -1 {
+ if idx := fu.FindFlows(ctx, newFlows, flow); idx == -1 {
updatedAllFlows = append(updatedAllFlows, flow)
} else {
// We have a matching flow (i.e. the following field matches: "TableId", "Priority", "Flags", "Cookie",
@@ -344,7 +344,7 @@
// ignored. Otherwise, the previous flow will be deleted and the new one added
if proto.Equal(newFlows[idx], flow) {
// Flow already exist, remove it from the new flows but keep it in the updated flows slice
- newFlows = deleteFlowWithoutPreservingOrder(newFlows, idx)
+ newFlows = deleteFlowWithoutPreservingOrder(ctx, newFlows, idx)
updatedAllFlows = append(updatedAllFlows, flow)
} else {
// Minor change to flow, delete old and add new one
@@ -356,15 +356,15 @@
return newFlows, flowsToDelete, updatedAllFlows
}
-func groupsToUpdateToDelete(newGroups, existingGroups []*ofp.OfpGroupEntry) (updatedNewGroups, groupsToDelete, updatedAllGroups []*ofp.OfpGroupEntry) {
+func groupsToUpdateToDelete(ctx context.Context, newGroups, existingGroups []*ofp.OfpGroupEntry) (updatedNewGroups, groupsToDelete, updatedAllGroups []*ofp.OfpGroupEntry) {
for _, group := range existingGroups {
- if idx := fu.FindGroup(newGroups, group.Desc.GroupId); idx == -1 { // does not exist now
+ if idx := fu.FindGroup(ctx, newGroups, group.Desc.GroupId); idx == -1 { // does not exist now
updatedAllGroups = append(updatedAllGroups, group)
} else {
// Follow same logic as flows
if proto.Equal(newGroups[idx], group) {
// Group already exist, remove it from the new groups
- newGroups = deleteGroupWithoutPreservingOrder(newGroups, idx)
+ newGroups = deleteGroupWithoutPreservingOrder(ctx, newGroups, idx)
updatedAllGroups = append(updatedAllGroups, group)
} else {
// Minor change to group, delete old and add new one
@@ -381,55 +381,55 @@
if (len(newFlows) | len(newGroups)) == 0 {
logger.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": newFlows, "groups": newGroups})
- return coreutils.DoneResponse(), nil
+ return coreutils.DoneResponse(ctx), nil
}
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
- return coreutils.DoneResponse(), err
+ return coreutils.DoneResponse(ctx), err
}
- defer agent.requestQueue.RequestComplete()
+ defer agent.requestQueue.RequestComplete(ctx)
- device := agent.getDeviceWithoutLock()
+ device := agent.getDeviceWithoutLock(ctx)
dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
if err != nil {
- return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
+ return coreutils.DoneResponse(ctx), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
}
existingFlows := proto.Clone(device.Flows).(*voltha.Flows)
existingGroups := proto.Clone(device.FlowGroups).(*ofp.FlowGroups)
// Process flows
- newFlows, flowsToDelete, updatedAllFlows := flowsToUpdateToDelete(newFlows, existingFlows.Items)
+ newFlows, flowsToDelete, updatedAllFlows := flowsToUpdateToDelete(ctx, newFlows, existingFlows.Items)
// Process groups
- newGroups, groupsToDelete, updatedAllGroups := groupsToUpdateToDelete(newGroups, existingGroups.Items)
+ newGroups, groupsToDelete, updatedAllGroups := groupsToUpdateToDelete(ctx, newGroups, existingGroups.Items)
// Sanity check
if (len(updatedAllFlows) | len(flowsToDelete) | len(updatedAllGroups) | len(groupsToDelete)) == 0 {
logger.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": newFlows, "groups": newGroups})
- return coreutils.DoneResponse(), nil
+ return coreutils.DoneResponse(ctx), nil
}
// store the changed data
device.Flows = &voltha.Flows{Items: updatedAllFlows}
device.FlowGroups = &voltha.FlowGroups{Items: updatedAllGroups}
if err := agent.updateDeviceWithoutLock(ctx, device); err != nil {
- return coreutils.DoneResponse(), status.Errorf(codes.Internal, "failure-updating-device-%s", agent.deviceID)
+ return coreutils.DoneResponse(ctx), status.Errorf(codes.Internal, "failure-updating-device-%s", agent.deviceID)
}
// Send update to adapters
subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
- response := coreutils.NewResponse()
+ response := coreutils.NewResponse(ctx)
if !dType.AcceptsAddRemoveFlowUpdates {
if len(updatedAllGroups) != 0 && reflect.DeepEqual(existingGroups.Items, updatedAllGroups) && len(updatedAllFlows) != 0 && reflect.DeepEqual(existingFlows.Items, updatedAllFlows) {
logger.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": newFlows, "groups": newGroups})
cancel()
- return coreutils.DoneResponse(), nil
+ return coreutils.DoneResponse(ctx), nil
}
rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, &voltha.Flows{Items: updatedAllFlows}, &voltha.FlowGroups{Items: updatedAllGroups}, flowMetadata)
if err != nil {
cancel()
- return coreutils.DoneResponse(), err
+ return coreutils.DoneResponse(ctx), err
}
go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
} else {
@@ -445,7 +445,7 @@
rpcResponse, err := agent.adapterProxy.UpdateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
if err != nil {
cancel()
- return coreutils.DoneResponse(), err
+ return coreutils.DoneResponse(ctx), err
}
go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
}
@@ -459,7 +459,7 @@
if err != nil {
return err
}
- if errs := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, response); errs != nil {
+ if errs := coreutils.WaitForNilOrErrorResponses(ctx, agent.defaultTimeout, response); errs != nil {
logger.Warnw("no-adapter-response", log.Fields{"device-id": agent.deviceID, "result": errs})
return status.Errorf(codes.Aborted, "flow-failure-device-%s", agent.deviceID)
}
@@ -471,18 +471,18 @@
if (len(flowsToDel) | len(groupsToDel)) == 0 {
logger.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": flowsToDel, "groups": groupsToDel})
- return coreutils.DoneResponse(), nil
+ return coreutils.DoneResponse(ctx), nil
}
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
- return coreutils.DoneResponse(), err
+ return coreutils.DoneResponse(ctx), err
}
- defer agent.requestQueue.RequestComplete()
+ defer agent.requestQueue.RequestComplete(ctx)
- device := agent.getDeviceWithoutLock()
+ device := agent.getDeviceWithoutLock(ctx)
dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
if err != nil {
- return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
+ return coreutils.DoneResponse(ctx), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
}
existingFlows := proto.Clone(device.Flows).(*voltha.Flows)
@@ -493,14 +493,14 @@
// Process flows
for _, flow := range existingFlows.Items {
- if idx := fu.FindFlows(flowsToDel, flow); idx == -1 {
+ if idx := fu.FindFlows(ctx, flowsToDel, flow); idx == -1 {
flowsToKeep = append(flowsToKeep, flow)
}
}
// Process groups
for _, group := range existingGroups.Items {
- if fu.FindGroup(groupsToDel, group.Desc.GroupId) == -1 { // does not exist now
+ if fu.FindGroup(ctx, groupsToDel, group.Desc.GroupId) == -1 { // does not exist now
groupsToKeep = append(groupsToKeep, group)
}
}
@@ -517,29 +517,29 @@
// Sanity check
if (len(flowsToKeep) | len(flowsToDel) | len(groupsToKeep) | len(groupsToDel)) == 0 {
logger.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows-to-del": flowsToDel, "groups-to-del": groupsToDel})
- return coreutils.DoneResponse(), nil
+ return coreutils.DoneResponse(ctx), nil
}
// store the changed data
device.Flows = &voltha.Flows{Items: flowsToKeep}
device.FlowGroups = &voltha.FlowGroups{Items: groupsToKeep}
if err := agent.updateDeviceWithoutLock(ctx, device); err != nil {
- return coreutils.DoneResponse(), status.Errorf(codes.Internal, "failure-updating-%s", agent.deviceID)
+ return coreutils.DoneResponse(ctx), status.Errorf(codes.Internal, "failure-updating-%s", agent.deviceID)
}
// Send update to adapters
subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
- response := coreutils.NewResponse()
+ response := coreutils.NewResponse(ctx)
if !dType.AcceptsAddRemoveFlowUpdates {
if len(groupsToKeep) != 0 && reflect.DeepEqual(existingGroups.Items, groupsToKeep) && len(flowsToKeep) != 0 && reflect.DeepEqual(existingFlows.Items, flowsToKeep) {
logger.Debugw("nothing-to-update", log.Fields{"deviceId": agent.deviceID, "flowsToDel": flowsToDel, "groupsToDel": groupsToDel})
cancel()
- return coreutils.DoneResponse(), nil
+ return coreutils.DoneResponse(ctx), nil
}
rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, &voltha.Flows{Items: flowsToKeep}, &voltha.FlowGroups{Items: groupsToKeep}, flowMetadata)
if err != nil {
cancel()
- return coreutils.DoneResponse(), err
+ return coreutils.DoneResponse(ctx), err
}
go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
} else {
@@ -555,7 +555,7 @@
rpcResponse, err := agent.adapterProxy.UpdateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
if err != nil {
cancel()
- return coreutils.DoneResponse(), err
+ return coreutils.DoneResponse(ctx), err
}
go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
}
@@ -569,7 +569,7 @@
if err != nil {
return err
}
- if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, response); res != nil {
+ if res := coreutils.WaitForNilOrErrorResponses(ctx, agent.defaultTimeout, response); res != nil {
return status.Errorf(codes.Aborted, "errors-%s", res)
}
return nil
@@ -586,7 +586,7 @@
// If an existing flow has the uniPort as an InPort or OutPort or as a Tunnel ID then it needs to be removed
for _, flow := range existingFlows.Items {
- if fu.GetInPort(flow) == uniPort || fu.GetOutPort(flow) == uniPort || fu.GetTunnelId(flow) == uint64(uniPort) {
+ if fu.GetInPort(ctx, flow) == uniPort || fu.GetOutPort(ctx, flow) == uniPort || fu.GetTunnelId(ctx, flow) == uint64(uniPort) {
flowsToDelete = append(flowsToDelete, flow)
}
}
@@ -599,7 +599,7 @@
if err != nil {
return err
}
- if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, response); res != nil {
+ if res := coreutils.WaitForNilOrErrorResponses(ctx, agent.defaultTimeout, response); res != nil {
return status.Errorf(codes.Aborted, "errors-%s", res)
}
return nil
@@ -610,21 +610,21 @@
if (len(updatedFlows) | len(updatedGroups)) == 0 {
logger.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": updatedFlows, "groups": updatedGroups})
- return coreutils.DoneResponse(), nil
+ return coreutils.DoneResponse(ctx), nil
}
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
- return coreutils.DoneResponse(), err
+ return coreutils.DoneResponse(ctx), err
}
- defer agent.requestQueue.RequestComplete()
+ defer agent.requestQueue.RequestComplete(ctx)
- device := agent.getDeviceWithoutLock()
+ device := agent.getDeviceWithoutLock(ctx)
if device.OperStatus != voltha.OperStatus_ACTIVE || device.ConnectStatus != voltha.ConnectStatus_REACHABLE || device.AdminState != voltha.AdminState_ENABLED {
- return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "invalid device states")
+ return coreutils.DoneResponse(ctx), status.Errorf(codes.FailedPrecondition, "invalid device states")
}
dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
if err != nil {
- return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
+ return coreutils.DoneResponse(ctx), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
}
existingFlows := proto.Clone(device.Flows).(*voltha.Flows)
@@ -632,7 +632,7 @@
if len(updatedGroups) != 0 && reflect.DeepEqual(existingGroups.Items, updatedGroups) && len(updatedFlows) != 0 && reflect.DeepEqual(existingFlows.Items, updatedFlows) {
logger.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": updatedFlows, "groups": updatedGroups})
- return coreutils.DoneResponse(), nil
+ return coreutils.DoneResponse(ctx), nil
}
logger.Debugw("updating-flows-and-groups",
@@ -646,17 +646,17 @@
device.Flows = &voltha.Flows{Items: updatedFlows}
device.FlowGroups = &voltha.FlowGroups{Items: updatedGroups}
if err := agent.updateDeviceWithoutLock(ctx, device); err != nil {
- return coreutils.DoneResponse(), status.Errorf(codes.Internal, "failure-updating-%s", agent.deviceID)
+ return coreutils.DoneResponse(ctx), status.Errorf(codes.Internal, "failure-updating-%s", agent.deviceID)
}
subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
- response := coreutils.NewResponse()
+ response := coreutils.NewResponse(ctx)
// Process bulk flow update differently than incremental update
if !dType.AcceptsAddRemoveFlowUpdates {
rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, &voltha.Flows{Items: updatedFlows}, &voltha.FlowGroups{Items: updatedGroups}, nil)
if err != nil {
cancel()
- return coreutils.DoneResponse(), err
+ return coreutils.DoneResponse(ctx), err
}
go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
} else {
@@ -667,24 +667,24 @@
// Process flows
for _, flow := range updatedFlows {
- if idx := fu.FindFlows(existingFlows.Items, flow); idx == -1 {
+ if idx := fu.FindFlows(ctx, existingFlows.Items, flow); idx == -1 {
flowsToAdd = append(flowsToAdd, flow)
}
}
for _, flow := range existingFlows.Items {
- if idx := fu.FindFlows(updatedFlows, flow); idx != -1 {
+ if idx := fu.FindFlows(ctx, updatedFlows, flow); idx != -1 {
flowsToDelete = append(flowsToDelete, flow)
}
}
// Process groups
for _, g := range updatedGroups {
- if fu.FindGroup(existingGroups.Items, g.Desc.GroupId) == -1 { // does not exist now
+ if fu.FindGroup(ctx, existingGroups.Items, g.Desc.GroupId) == -1 { // does not exist now
groupsToAdd = append(groupsToAdd, g)
}
}
for _, group := range existingGroups.Items {
- if fu.FindGroup(updatedGroups, group.Desc.GroupId) != -1 { // does not exist now
+ if fu.FindGroup(ctx, updatedGroups, group.Desc.GroupId) != -1 { // does not exist now
groupsToDelete = append(groupsToDelete, group)
}
}
@@ -702,7 +702,7 @@
if (len(flowsToAdd) | len(flowsToDelete) | len(groupsToAdd) | len(groupsToDelete) | len(updatedGroups)) == 0 {
logger.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": updatedFlows, "groups": updatedGroups})
cancel()
- return coreutils.DoneResponse(), nil
+ return coreutils.DoneResponse(ctx), nil
}
flowChanges := &ofp.FlowChanges{
@@ -717,7 +717,7 @@
rpcResponse, err := agent.adapterProxy.UpdateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
if err != nil {
cancel()
- return coreutils.DoneResponse(), err
+ return coreutils.DoneResponse(ctx), err
}
go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
}
@@ -732,7 +732,7 @@
if err != nil {
return err
}
- if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, response); res != nil {
+ if res := coreutils.WaitForNilOrErrorResponses(ctx, agent.defaultTimeout, response); res != nil {
return status.Errorf(codes.Aborted, "errors-%s", res)
}
return nil
@@ -744,9 +744,9 @@
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
}
- defer agent.requestQueue.RequestComplete()
+ defer agent.requestQueue.RequestComplete(ctx)
- device := agent.getDeviceWithoutLock()
+ device := agent.getDeviceWithoutLock(ctx)
// purge all flows on the device by setting it to nil
device.Flows = &ofp.Flows{Items: nil}
if err := agent.updateDeviceWithoutLock(ctx, device); err != nil {
@@ -761,10 +761,10 @@
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
}
- defer agent.requestQueue.RequestComplete()
+ defer agent.requestQueue.RequestComplete(ctx)
logger.Debugw("disableDevice", log.Fields{"device-id": agent.deviceID})
- cloned := agent.getDeviceWithoutLock()
+ cloned := agent.getDeviceWithoutLock(ctx)
if cloned.AdminState == voltha.AdminState_DISABLED {
logger.Debugw("device-already-disabled", log.Fields{"id": agent.deviceID})
@@ -795,10 +795,10 @@
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
}
- defer agent.requestQueue.RequestComplete()
+ defer agent.requestQueue.RequestComplete(ctx)
logger.Debugw("rebootDevice", log.Fields{"device-id": agent.deviceID})
- device := agent.getDeviceWithoutLock()
+ device := agent.getDeviceWithoutLock(ctx)
subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
ch, err := agent.adapterProxy.RebootDevice(subCtx, device)
if err != nil {
@@ -814,9 +814,9 @@
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
}
- defer agent.requestQueue.RequestComplete()
+ defer agent.requestQueue.RequestComplete(ctx)
- cloned := agent.getDeviceWithoutLock()
+ cloned := agent.getDeviceWithoutLock(ctx)
previousState := cloned.AdminState
@@ -844,11 +844,11 @@
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
}
- defer agent.requestQueue.RequestComplete()
+ defer agent.requestQueue.RequestComplete(ctx)
logger.Debugw("setParentId", log.Fields{"device-id": device.Id, "parent-id": parentID})
- cloned := agent.getDeviceWithoutLock()
+ cloned := agent.getDeviceWithoutLock(ctx)
cloned.ParentId = parentID
// Store the device
if err := agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, ""); err != nil {
@@ -862,10 +862,10 @@
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
}
- defer agent.requestQueue.RequestComplete()
+ defer agent.requestQueue.RequestComplete(ctx)
logger.Debugw("updatePmConfigs", log.Fields{"device-id": pmConfigs.Id})
- cloned := agent.getDeviceWithoutLock()
+ cloned := agent.getDeviceWithoutLock(ctx)
cloned.PmConfigs = proto.Clone(pmConfigs).(*voltha.PmConfigs)
// Store the device
if err := agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, ""); err != nil {
@@ -886,10 +886,10 @@
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
}
- defer agent.requestQueue.RequestComplete()
+ defer agent.requestQueue.RequestComplete(ctx)
logger.Debugw("initPmConfigs", log.Fields{"device-id": pmConfigs.Id})
- cloned := agent.getDeviceWithoutLock()
+ cloned := agent.getDeviceWithoutLock(ctx)
cloned.PmConfigs = proto.Clone(pmConfigs).(*voltha.PmConfigs)
updateCtx := context.WithValue(ctx, model.RequestTimestamp, time.Now().UnixNano())
return agent.updateDeviceInStoreWithoutLock(updateCtx, cloned, false, "")
@@ -899,21 +899,21 @@
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return nil, err
}
- defer agent.requestQueue.RequestComplete()
+ defer agent.requestQueue.RequestComplete(ctx)
logger.Debugw("listPmConfigs", log.Fields{"device-id": agent.deviceID})
- return agent.getDeviceWithoutLock().PmConfigs, nil
+ return agent.getDeviceWithoutLock(ctx).PmConfigs, nil
}
func (agent *Agent) downloadImage(ctx context.Context, img *voltha.ImageDownload) (*voltha.OperationResp, error) {
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return nil, err
}
- defer agent.requestQueue.RequestComplete()
+ defer agent.requestQueue.RequestComplete(ctx)
logger.Debugw("downloadImage", log.Fields{"device-id": agent.deviceID})
- device := agent.getDeviceWithoutLock()
+ device := agent.getDeviceWithoutLock(ctx)
if device.AdminState != voltha.AdminState_ENABLED {
return nil, status.Errorf(codes.FailedPrecondition, "device-id:%s, expected-admin-state:%s", agent.deviceID, voltha.AdminState_ENABLED)
@@ -954,7 +954,7 @@
}
// isImageRegistered is a helper method to figure out if an image is already registered
-func isImageRegistered(img *voltha.ImageDownload, device *voltha.Device) bool {
+func isImageRegistered(ctx context.Context, img *voltha.ImageDownload, device *voltha.Device) bool {
for _, image := range device.ImageDownloads {
if image.Id == img.Id && image.Name == img.Name {
return true
@@ -967,14 +967,14 @@
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return nil, err
}
- defer agent.requestQueue.RequestComplete()
+ defer agent.requestQueue.RequestComplete(ctx)
logger.Debugw("cancelImageDownload", log.Fields{"device-id": agent.deviceID})
- device := agent.getDeviceWithoutLock()
+ device := agent.getDeviceWithoutLock(ctx)
// Verify whether the image is in the list of images being downloaded
- if !isImageRegistered(img, device) {
+ if !isImageRegistered(ctx, img, device) {
return nil, status.Errorf(codes.FailedPrecondition, "device-id:%s, image-not-registered:%s", agent.deviceID, img.Name)
}
@@ -1005,12 +1005,12 @@
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return nil, err
}
- defer agent.requestQueue.RequestComplete()
+ defer agent.requestQueue.RequestComplete(ctx)
logger.Debugw("activateImage", log.Fields{"device-id": agent.deviceID})
- cloned := agent.getDeviceWithoutLock()
+ cloned := agent.getDeviceWithoutLock(ctx)
// Verify whether the image is in the list of images being downloaded
- if !isImageRegistered(img, cloned) {
+ if !isImageRegistered(ctx, img, cloned) {
return nil, status.Errorf(codes.FailedPrecondition, "device-id:%s, image-not-registered:%s", agent.deviceID, img.Name)
}
@@ -1045,13 +1045,13 @@
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return nil, err
}
- defer agent.requestQueue.RequestComplete()
+ defer agent.requestQueue.RequestComplete(ctx)
logger.Debugw("revertImage", log.Fields{"device-id": agent.deviceID})
- cloned := agent.getDeviceWithoutLock()
+ cloned := agent.getDeviceWithoutLock(ctx)
// Verify whether the image is in the list of images being downloaded
- if !isImageRegistered(img, cloned) {
+ if !isImageRegistered(ctx, img, cloned) {
return nil, status.Errorf(codes.FailedPrecondition, "deviceId:%s, image-not-registered:%s", agent.deviceID, img.Name)
}
@@ -1086,9 +1086,9 @@
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return nil, err
}
- device := agent.getDeviceWithoutLock()
+ device := agent.getDeviceWithoutLock(ctx)
ch, err := agent.adapterProxy.GetImageDownloadStatus(ctx, device, img)
- agent.requestQueue.RequestComplete()
+ agent.requestQueue.RequestComplete(ctx)
if err != nil {
return nil, err
}
@@ -1112,10 +1112,10 @@
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
}
- defer agent.requestQueue.RequestComplete()
+ defer agent.requestQueue.RequestComplete(ctx)
logger.Debugw("updating-image-download", log.Fields{"device-id": agent.deviceID, "img": img})
- cloned := agent.getDeviceWithoutLock()
+ cloned := agent.getDeviceWithoutLock(ctx)
// Update the image as well as remove it if the download was cancelled
clonedImages := make([]*voltha.ImageDownload, len(cloned.ImageDownloads))
@@ -1140,10 +1140,10 @@
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return nil, err
}
- defer agent.requestQueue.RequestComplete()
+ defer agent.requestQueue.RequestComplete(ctx)
logger.Debugw("getImageDownload", log.Fields{"device-id": agent.deviceID})
- cloned := agent.getDeviceWithoutLock()
+ cloned := agent.getDeviceWithoutLock(ctx)
for _, image := range cloned.ImageDownloads {
if image.Id == img.Id && image.Name == img.Name {
return image, nil
@@ -1156,10 +1156,10 @@
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return nil, err
}
- defer agent.requestQueue.RequestComplete()
+ defer agent.requestQueue.RequestComplete(ctx)
logger.Debugw("listImageDownloads", log.Fields{"device-id": agent.deviceID})
- return &voltha.ImageDownloads{Items: agent.getDeviceWithoutLock().ImageDownloads}, nil
+ return &voltha.ImageDownloads{Items: agent.getDeviceWithoutLock(ctx).ImageDownloads}, nil
}
// getPorts retrieves the ports information of the device based on the port type.
@@ -1232,7 +1232,7 @@
return portCap, nil
}
-func (agent *Agent) onPacketFailure(rpc string, response interface{}, args ...interface{}) {
+func (agent *Agent) onPacketFailure(ctx context.Context, rpc string, response interface{}, args ...interface{}) {
// packet data is encoded in the args param as the first parameter
var packet []byte
if len(args) >= 1 {
@@ -1270,8 +1270,8 @@
// mergeDeviceInfoFromAdapter updates a subset of a device that an adapter can update.
// TODO: May need a specific proto to handle only a subset of a device that can be changed by an adapter
-func (agent *Agent) mergeDeviceInfoFromAdapter(device *voltha.Device) (*voltha.Device, error) {
- cloned := agent.getDeviceWithoutLock()
+func (agent *Agent) mergeDeviceInfoFromAdapter(ctx context.Context, device *voltha.Device) (*voltha.Device, error) {
+ cloned := agent.getDeviceWithoutLock(ctx)
cloned.Root = device.Root
cloned.Vendor = device.Vendor
cloned.Model = device.Model
@@ -1286,10 +1286,10 @@
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
}
- defer agent.requestQueue.RequestComplete()
+ defer agent.requestQueue.RequestComplete(ctx)
logger.Debugw("updateDeviceUsingAdapterData", log.Fields{"device-id": device.Id})
- updatedDevice, err := agent.mergeDeviceInfoFromAdapter(device)
+ updatedDevice, err := agent.mergeDeviceInfoFromAdapter(ctx, device)
if err != nil {
return status.Errorf(codes.Internal, "%s", err.Error())
}
@@ -1308,9 +1308,9 @@
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
}
- defer agent.requestQueue.RequestComplete()
+ defer agent.requestQueue.RequestComplete(ctx)
- cloned := agent.getDeviceWithoutLock()
+ cloned := agent.getDeviceWithoutLock(ctx)
newConnStatus, newOperStatus := cloned.ConnectStatus, cloned.OperStatus
// Ensure the enums passed in are valid - they will be invalid if they are not set when this function is invoked
@@ -1332,8 +1332,8 @@
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
}
- defer agent.requestQueue.RequestComplete()
- cloned := agent.getDeviceWithoutLock()
+ defer agent.requestQueue.RequestComplete(ctx)
+ cloned := agent.getDeviceWithoutLock(ctx)
for _, port := range cloned.Ports {
port.OperStatus = operStatus
}
@@ -1345,10 +1345,10 @@
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
}
- defer agent.requestQueue.RequestComplete()
+ defer agent.requestQueue.RequestComplete(ctx)
// Work only on latest data
// TODO: Get list of ports from device directly instead of the entire device
- cloned := agent.getDeviceWithoutLock()
+ cloned := agent.getDeviceWithoutLock(ctx)
// Ensure the enums passed in are valid - they will be invalid if they are not set when this function is invoked
if _, ok := voltha.Port_PortType_value[portType.String()]; !ok {
@@ -1369,9 +1369,9 @@
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
}
- defer agent.requestQueue.RequestComplete()
+ defer agent.requestQueue.RequestComplete(ctx)
- cloned := agent.getDeviceWithoutLock()
+ cloned := agent.getDeviceWithoutLock(ctx)
if cloned.AdminState != voltha.AdminState_DISABLED && cloned.AdminState != voltha.AdminState_DELETED {
err := status.Error(codes.FailedPrecondition, fmt.Sprintf("invalid-state-%v", cloned.AdminState))
@@ -1393,10 +1393,10 @@
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
}
- defer agent.requestQueue.RequestComplete()
+ defer agent.requestQueue.RequestComplete(ctx)
logger.Debugw("addPort", log.Fields{"deviceId": agent.deviceID})
- cloned := agent.getDeviceWithoutLock()
+ cloned := agent.getDeviceWithoutLock(ctx)
updatePort := false
if cloned.Ports == nil {
// First port
@@ -1432,10 +1432,10 @@
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
}
- defer agent.requestQueue.RequestComplete()
+ defer agent.requestQueue.RequestComplete(ctx)
logger.Debugw("adding-peer-peerPort", log.Fields{"device-id": agent.deviceID, "peer-peerPort": peerPort})
- cloned := agent.getDeviceWithoutLock()
+ cloned := agent.getDeviceWithoutLock(ctx)
// Get the peer port on the device based on the peerPort no
found := false
@@ -1471,12 +1471,12 @@
logger.Warnw("request-aborted", log.Fields{"device-id": agent.deviceID, "name": name, "error": err})
return
}
- defer agent.requestQueue.RequestComplete()
+ defer agent.requestQueue.RequestComplete(ctx)
if value == nil {
return
}
- cloned := agent.getDeviceWithoutLock()
+ cloned := agent.getDeviceWithoutLock(ctx)
updated := false
s := reflect.ValueOf(cloned).Elem()
if s.Kind() == reflect.Struct {
@@ -1508,10 +1508,10 @@
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
}
- defer agent.requestQueue.RequestComplete()
+ defer agent.requestQueue.RequestComplete(ctx)
logger.Debugw("simulateAlarm", log.Fields{"id": agent.deviceID})
- cloned := agent.getDeviceWithoutLock()
+ cloned := agent.getDeviceWithoutLock(ctx)
subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
ch, err := agent.adapterProxy.SimulateAlarm(subCtx, cloned, simulateReq)
@@ -1530,7 +1530,7 @@
connectStatus voltha.ConnectStatus_Types,
operStatus voltha.OperStatus_Types,
) error {
- previousState := getDeviceStates(device)
+ previousState := getDeviceStates(ctx, device)
device.AdminState, device.ConnectStatus, device.OperStatus = adminState, connectStatus, operStatus
if err := agent.updateDeviceInStoreWithoutLock(ctx, device, false, ""); err != nil {
@@ -1567,9 +1567,9 @@
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
}
- defer agent.requestQueue.RequestComplete()
+ defer agent.requestQueue.RequestComplete(ctx)
- cloned := agent.getDeviceWithoutLock()
+ cloned := agent.getDeviceWithoutLock(ctx)
cloned.Reason = reason
logger.Debugw("updateDeviceReason", log.Fields{"deviceId": cloned.Id, "reason": cloned.Reason})
// Store the device
@@ -1580,11 +1580,11 @@
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
}
- defer agent.requestQueue.RequestComplete()
+ defer agent.requestQueue.RequestComplete(ctx)
logger.Debugw("disablePort", log.Fields{"device-id": agent.deviceID, "port-no": Port.PortNo})
var cp *voltha.Port
// Get the most up-to-date device info
- device := agent.getDeviceWithoutLock()
+ device := agent.getDeviceWithoutLock(ctx)
for _, port := range device.Ports {
if port.PortNo == Port.PortNo {
port.AdminState = voltha.AdminState_DISABLED
@@ -1621,12 +1621,12 @@
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
}
- defer agent.requestQueue.RequestComplete()
+ defer agent.requestQueue.RequestComplete(ctx)
logger.Debugw("enablePort", log.Fields{"device-id": agent.deviceID, "port-no": Port.PortNo})
var cp *voltha.Port
// Get the most up-to-date device info
- device := agent.getDeviceWithoutLock()
+ device := agent.getDeviceWithoutLock(ctx)
for _, port := range device.Ports {
if port.PortNo == Port.PortNo {
port.AdminState = voltha.AdminState_ENABLED
@@ -1662,12 +1662,12 @@
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
}
- defer agent.requestQueue.RequestComplete()
+ defer agent.requestQueue.RequestComplete(ctx)
logger.Debugw("childDeviceLost", log.Fields{"child-device-id": device.Id, "parent-device-ud": agent.deviceID})
//Remove the associated peer ports on the parent device
- parentDevice := agent.getDeviceWithoutLock()
+ parentDevice := agent.getDeviceWithoutLock(ctx)
var updatedPeers []*voltha.Port_PeerPort
for _, port := range parentDevice.Ports {
updatedPeers = make([]*voltha.Port_PeerPort, 0)
@@ -1698,12 +1698,12 @@
return nil, err
}
- device := agent.getDeviceWithoutLock()
+ device := agent.getDeviceWithoutLock(ctx)
if device.Adapter == "" {
- adapterName, err := agent.adapterMgr.GetAdapterType(device.Type)
+ adapterName, err := agent.adapterMgr.GetAdapterType(ctx, device.Type)
if err != nil {
- agent.requestQueue.RequestComplete()
+ agent.requestQueue.RequestComplete(ctx)
return nil, err
}
device.Adapter = adapterName
@@ -1711,7 +1711,7 @@
// Send request to the adapter
ch, err := agent.adapterProxy.StartOmciTest(ctx, device, omcitestrequest)
- agent.requestQueue.RequestComplete()
+ agent.requestQueue.RequestComplete(ctx)
if err != nil {
return nil, err
}
@@ -1742,7 +1742,7 @@
//send request to adapter
ch, err := agent.adapterProxy.GetExtValue(ctx, pdevice, cdevice, valueparam.Id, valueparam.Value)
- agent.requestQueue.RequestComplete()
+ agent.requestQueue.RequestComplete(ctx)
if err != nil {
return nil, err
}
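The agent.go hunks above repeat a single pattern: every entry point now takes the caller's ctx, uses it to gate the request queue and release it again, and still derives a separate timeout-bound context for the adapter RPC. Below is a minimal, self-contained sketch of that shape; the requestQueue and agent types here are stand-ins, not the real voltha-go types.

package main

import (
	"context"
	"fmt"
	"time"
)

// requestQueue is a stand-in for the core's request queue; only the two calls
// used by the device agent are modelled.
type requestQueue struct{ sem chan struct{} }

func newRequestQueue() *requestQueue { return &requestQueue{sem: make(chan struct{}, 1)} }

// WaitForGreenLight blocks until it is this request's turn, or until ctx ends.
func (q *requestQueue) WaitForGreenLight(ctx context.Context) error {
	select {
	case q.sem <- struct{}{}:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// RequestComplete releases the queue; the ctx argument mirrors the new signatures.
func (q *requestQueue) RequestComplete(ctx context.Context) { <-q.sem }

type agent struct {
	requestQueue   *requestQueue
	defaultTimeout time.Duration
}

// rebootDevice mirrors the post-change shape of the agent methods: the caller's
// ctx gates the queue, while the adapter RPC gets its own timeout-bound context.
func (a *agent) rebootDevice(ctx context.Context) error {
	if err := a.requestQueue.WaitForGreenLight(ctx); err != nil {
		return err
	}
	defer a.requestQueue.RequestComplete(ctx)

	subCtx, cancel := context.WithTimeout(context.Background(), a.defaultTimeout)
	defer cancel()
	<-subCtx.Done() // the adapter proxy call would use subCtx here; waiting stands in for it
	return nil
}

func main() {
	a := &agent{requestQueue: newRequestQueue(), defaultTimeout: 10 * time.Millisecond}
	fmt.Println(a.rebootDevice(context.Background()))
}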
diff --git a/rw_core/core/device/agent_test.go b/rw_core/core/device/agent_test.go
index 2abfdeb..78353a7 100755
--- a/rw_core/core/device/agent_test.go
+++ b/rw_core/core/device/agent_test.go
@@ -69,7 +69,7 @@
logger.Fatal(err)
}
// Create the kafka client
- test.kClient = mock_kafka.NewKafkaClient()
+ test.kClient = mock_kafka.NewKafkaClient(context.Background())
test.oltAdapterName = "olt_adapter_mock"
test.onuAdapterName = "onu_adapter_mock"
test.coreInstanceID = "rw-da-test"
@@ -114,6 +114,7 @@
cfg.DefaultRequestTimeout = dat.defaultTimeout
cfg.KVStorePort = dat.kvClientPort
cfg.InCompetingMode = inCompeteMode
+ ctx := context.Background()
grpcPort, err := freeport.GetFreePort()
if err != nil {
logger.Fatal("Cannot get a freeport for grpc")
@@ -130,18 +131,19 @@
LivenessChannelInterval: cfg.LiveProbeInterval / 2,
PathPrefix: cfg.KVStoreDataPrefix}
dat.kmp = kafka.NewInterContainerProxy(
- kafka.InterContainerHost(cfg.KafkaAdapterHost),
- kafka.InterContainerPort(cfg.KafkaAdapterPort),
- kafka.MsgClient(dat.kClient),
- kafka.DefaultTopic(&kafka.Topic{Name: cfg.CoreTopic}),
- kafka.DeviceDiscoveryTopic(&kafka.Topic{Name: cfg.AffinityRouterTopic}))
+ ctx,
+ kafka.InterContainerHost(ctx, cfg.KafkaAdapterHost),
+ kafka.InterContainerPort(ctx, cfg.KafkaAdapterPort),
+ kafka.MsgClient(ctx, dat.kClient),
+ kafka.DefaultTopic(ctx, &kafka.Topic{Name: cfg.CoreTopic}),
+ kafka.DeviceDiscoveryTopic(ctx, &kafka.Topic{Name: cfg.AffinityRouterTopic}))
- endpointMgr := kafka.NewEndpointManager(backend)
- proxy := model.NewProxy(backend, "/")
- adapterMgr := adapter.NewAdapterManager(proxy, dat.coreInstanceID, dat.kClient)
+ endpointMgr := kafka.NewEndpointManager(ctx, backend)
+ proxy := model.NewProxy(ctx, backend, "/")
+ adapterMgr := adapter.NewAdapterManager(ctx, proxy, dat.coreInstanceID, dat.kClient)
- dat.deviceMgr, dat.logicalDeviceMgr = NewManagers(proxy, adapterMgr, dat.kmp, endpointMgr, cfg.CorePairTopic, dat.coreInstanceID, cfg.DefaultCoreTimeout)
- if err = dat.kmp.Start(); err != nil {
+ dat.deviceMgr, dat.logicalDeviceMgr = NewManagers(ctx, proxy, adapterMgr, dat.kmp, endpointMgr, cfg.CorePairTopic, dat.coreInstanceID, cfg.DefaultCoreTimeout)
+ if err = dat.kmp.Start(ctx); err != nil {
logger.Fatal("Cannot start InterContainerProxy")
}
adapterMgr.Start(context.Background())
@@ -149,10 +151,10 @@
func (dat *DATest) stopAll() {
if dat.kClient != nil {
- dat.kClient.Stop()
+ dat.kClient.Stop(context.Background())
}
if dat.kmp != nil {
- dat.kmp.Stop()
+ dat.kmp.Stop(context.Background())
}
if dat.etcdServer != nil {
stopEmbeddedEtcdServer(dat.etcdServer)
@@ -169,7 +171,7 @@
if err != nil {
return nil, 0, err
}
- etcdServer := mock_etcd.StartEtcdServer(mock_etcd.MKConfig(configName, kvClientPort, peerPort, storageDir, logLevel))
+ etcdServer := mock_etcd.StartEtcdServer(context.Background(), mock_etcd.MKConfig(context.Background(), configName, kvClientPort, peerPort, storageDir, logLevel))
if etcdServer == nil {
return nil, 0, status.Error(codes.Internal, "Embedded server failed to start")
}
@@ -178,13 +180,13 @@
func stopEmbeddedEtcdServer(server *mock_etcd.EtcdServer) {
if server != nil {
- server.Stop()
+ server.Stop(context.Background())
}
}
func setupKVClient(cf *config.RWCoreFlags, coreInstanceID string) kvstore.Client {
addr := cf.KVStoreHost + ":" + strconv.Itoa(cf.KVStorePort)
- client, err := kvstore.NewEtcdClient(addr, cf.KVStoreTimeout, log.FatalLevel)
+ client, err := kvstore.NewEtcdClient(context.Background(), addr, cf.KVStoreTimeout, log.FatalLevel)
if err != nil {
panic("no kv client")
}
@@ -194,11 +196,11 @@
func (dat *DATest) createDeviceAgent(t *testing.T) *Agent {
deviceMgr := dat.deviceMgr
clonedDevice := proto.Clone(dat.device).(*voltha.Device)
- deviceAgent := newAgent(deviceMgr.adapterProxy, clonedDevice, deviceMgr, deviceMgr.clusterDataProxy, deviceMgr.defaultTimeout)
+ deviceAgent := newAgent(context.Background(), deviceMgr.adapterProxy, clonedDevice, deviceMgr, deviceMgr.clusterDataProxy, deviceMgr.defaultTimeout)
d, err := deviceAgent.start(context.TODO(), clonedDevice)
assert.Nil(t, err)
assert.NotNil(t, d)
- deviceMgr.addDeviceAgentToMap(deviceAgent)
+ deviceMgr.addDeviceAgentToMap(context.Background(), deviceAgent)
return deviceAgent
}
@@ -336,7 +338,7 @@
expectedNewFlows := []*ofp.OfpFlowStats{}
expectedFlowsToDelete := []*ofp.OfpFlowStats{}
expectedUpdatedAllFlows := []*ofp.OfpFlowStats{}
- uNF, fD, uAF := flowsToUpdateToDelete(newFlows, existingFlows)
+ uNF, fD, uAF := flowsToUpdateToDelete(context.Background(), newFlows, existingFlows)
assert.True(t, isFlowSliceEqual(uNF, expectedNewFlows))
assert.True(t, isFlowSliceEqual(fD, expectedFlowsToDelete))
assert.True(t, isFlowSliceEqual(uAF, expectedUpdatedAllFlows))
@@ -360,7 +362,7 @@
{Id: 124, TableId: 1240, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1240000, PacketCount: 0},
{Id: 125, TableId: 1250, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1250000, PacketCount: 0},
}
- uNF, fD, uAF := flowsToUpdateToDelete(newFlows, existingFlows)
+ uNF, fD, uAF := flowsToUpdateToDelete(context.Background(), newFlows, existingFlows)
assert.True(t, isFlowSliceEqual(uNF, expectedNewFlows))
assert.True(t, isFlowSliceEqual(fD, expectedFlowsToDelete))
assert.True(t, isFlowSliceEqual(uAF, expectedUpdatedAllFlows))
@@ -389,7 +391,7 @@
{Id: 121, TableId: 1210, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1210000, PacketCount: 0},
{Id: 122, TableId: 1220, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1220000, PacketCount: 0},
}
- uNF, fD, uAF := flowsToUpdateToDelete(newFlows, existingFlows)
+ uNF, fD, uAF := flowsToUpdateToDelete(context.Background(), newFlows, existingFlows)
assert.True(t, isFlowSliceEqual(uNF, expectedNewFlows))
assert.True(t, isFlowSliceEqual(fD, expectedFlowsToDelete))
assert.True(t, isFlowSliceEqual(uAF, expectedUpdatedAllFlows))
@@ -429,7 +431,7 @@
{Id: 126, TableId: 1260, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1260000, PacketCount: 0},
{Id: 127, TableId: 1270, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1270000, PacketCount: 0},
}
- uNF, fD, uAF := flowsToUpdateToDelete(newFlows, existingFlows)
+ uNF, fD, uAF := flowsToUpdateToDelete(context.Background(), newFlows, existingFlows)
assert.True(t, isFlowSliceEqual(uNF, expectedNewFlows))
assert.True(t, isFlowSliceEqual(fD, expectedFlowsToDelete))
assert.True(t, isFlowSliceEqual(uAF, expectedUpdatedAllFlows))
@@ -441,7 +443,7 @@
expectedNewGroups := []*ofp.OfpGroupEntry{}
expectedGroupsToDelete := []*ofp.OfpGroupEntry{}
expectedUpdatedAllGroups := []*ofp.OfpGroupEntry{}
- uNG, gD, uAG := groupsToUpdateToDelete(newGroups, existingGroups)
+ uNG, gD, uAG := groupsToUpdateToDelete(context.Background(), newGroups, existingGroups)
assert.True(t, isGroupSliceEqual(uNG, expectedNewGroups))
assert.True(t, isGroupSliceEqual(gD, expectedGroupsToDelete))
assert.True(t, isGroupSliceEqual(uAG, expectedUpdatedAllGroups))
@@ -462,7 +464,7 @@
{Desc: &ofp.OfpGroupDesc{Type: 1, GroupId: 10, Buckets: nil}},
{Desc: &ofp.OfpGroupDesc{Type: 2, GroupId: 20, Buckets: nil}},
}
- uNG, gD, uAG := groupsToUpdateToDelete(newGroups, existingGroups)
+ uNG, gD, uAG := groupsToUpdateToDelete(context.Background(), newGroups, existingGroups)
assert.True(t, isGroupSliceEqual(uNG, expectedNewGroups))
assert.True(t, isGroupSliceEqual(gD, expectedGroupsToDelete))
assert.True(t, isGroupSliceEqual(uAG, expectedUpdatedAllGroups))
@@ -488,7 +490,7 @@
{Desc: &ofp.OfpGroupDesc{Type: 3, GroupId: 30, Buckets: nil}},
{Desc: &ofp.OfpGroupDesc{Type: 4, GroupId: 40, Buckets: nil}},
}
- uNG, gD, uAG := groupsToUpdateToDelete(newGroups, existingGroups)
+ uNG, gD, uAG := groupsToUpdateToDelete(context.Background(), newGroups, existingGroups)
assert.True(t, isGroupSliceEqual(uNG, expectedNewGroups))
assert.True(t, isGroupSliceEqual(gD, expectedGroupsToDelete))
assert.True(t, isGroupSliceEqual(uAG, expectedUpdatedAllGroups))
@@ -517,7 +519,7 @@
{Desc: &ofp.OfpGroupDesc{Type: 3, GroupId: 30, Buckets: nil}},
{Desc: &ofp.OfpGroupDesc{Type: 4, GroupId: 40, Buckets: nil}},
}
- uNG, gD, uAG := groupsToUpdateToDelete(newGroups, existingGroups)
+ uNG, gD, uAG := groupsToUpdateToDelete(context.Background(), newGroups, existingGroups)
assert.True(t, isGroupSliceEqual(uNG, expectedNewGroups))
assert.True(t, isGroupSliceEqual(gD, expectedGroupsToDelete))
assert.True(t, isGroupSliceEqual(uAG, expectedUpdatedAllGroups))
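The test changes follow a simpler rule: build one context.Background() near the top of the fixture and hand it to every constructor whose signature gained a ctx parameter. A hedged sketch of that fixture shape; the component type and names are stand-ins for the real managers and proxies.

package device

import (
	"context"
	"testing"
)

// component stands in for the managers, proxies and clients built by the fixture.
type component struct{ name string }

// newComponent mirrors constructors that now take a context as their first argument.
func newComponent(ctx context.Context, name string) *component {
	_ = ctx // tests typically hand in context.Background() here
	return &component{name: name}
}

// newTestFixture shows the post-change setup: one root context threaded through
// every constructor call.
func newTestFixture(t *testing.T) []*component {
	t.Helper()
	ctx := context.Background()
	return []*component{
		newComponent(ctx, "kafka-client"),
		newComponent(ctx, "adapter-manager"),
		newComponent(ctx, "device-manager"),
	}
}

func TestFixtureSetup(t *testing.T) {
	if got := len(newTestFixture(t)); got != 3 {
		t.Fatalf("expected 3 components, got %d", got)
	}
}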
diff --git a/rw_core/core/device/event/event.go b/rw_core/core/device/event/event.go
index c205564..829fc19 100644
--- a/rw_core/core/device/event/event.go
+++ b/rw_core/core/device/event/event.go
@@ -17,6 +17,7 @@
package event
import (
+ "context"
"encoding/hex"
"github.com/golang/protobuf/ptypes/empty"
"github.com/opencord/voltha-lib-go/v3/pkg/log"
@@ -32,7 +33,7 @@
changeEventQueueDone chan bool
}
-func NewManager() *Manager {
+func NewManager(ctx context.Context) *Manager {
return &Manager{
packetInQueue: make(chan openflow_13.PacketIn, 100),
packetInQueueDone: make(chan bool, 1),
@@ -41,7 +42,7 @@
}
}
-func (q *Manager) SendPacketIn(deviceID string, transationID string, packet *openflow_13.OfpPacketIn) {
+func (q *Manager) SendPacketIn(ctx context.Context, deviceID string, transationID string, packet *openflow_13.OfpPacketIn) {
// TODO: Augment the OF PacketIn to include the transactionId
packetIn := openflow_13.PacketIn{Id: deviceID, PacketIn: packet}
logger.Debugw("SendPacketIn", log.Fields{"packetIn": packetIn})
@@ -58,7 +59,7 @@
var streamingTracker = &streamTracker{calls: make(map[string]*callTracker)}
-func (q *Manager) getStreamingTracker(method string, done chan<- bool) *callTracker {
+func (q *Manager) getStreamingTracker(ctx context.Context, method string, done chan<- bool) *callTracker {
streamingTracker.Lock()
defer streamingTracker.Unlock()
if _, ok := streamingTracker.calls[method]; ok {
@@ -72,7 +73,7 @@
return streamingTracker.calls[method]
}
-func (q *Manager) flushFailedPackets(tracker *callTracker) error {
+func (q *Manager) flushFailedPackets(ctx context.Context, tracker *callTracker) error {
if tracker.failedPacket != nil {
switch tracker.failedPacket.(type) {
case openflow_13.PacketIn:
@@ -88,10 +89,11 @@
// ReceivePacketsIn receives packets from adapter
func (q *Manager) ReceivePacketsIn(_ *empty.Empty, packetsIn voltha.VolthaService_ReceivePacketsInServer) error {
- var streamingTracker = q.getStreamingTracker("ReceivePacketsIn", q.packetInQueueDone)
+ ctx := context.Background()
+ var streamingTracker = q.getStreamingTracker(ctx, "ReceivePacketsIn", q.packetInQueueDone)
logger.Debugw("ReceivePacketsIn-request", log.Fields{"packetsIn": packetsIn})
- err := q.flushFailedPackets(streamingTracker)
+ err := q.flushFailedPackets(ctx, streamingTracker)
if err != nil {
logger.Errorw("unable-to-flush-failed-packets", log.Fields{"error": err})
}
@@ -123,7 +125,7 @@
return nil
}
-func (q *Manager) SendChangeEvent(deviceID string, portStatus *openflow_13.OfpPortStatus) {
+func (q *Manager) SendChangeEvent(ctx context.Context, deviceID string, portStatus *openflow_13.OfpPortStatus) {
// TODO: validate the type of portStatus parameter
//if _, ok := portStatus.(*openflow_13.OfpPortStatus); ok {
//}
@@ -134,10 +136,11 @@
// ReceiveChangeEvents receives change events
func (q *Manager) ReceiveChangeEvents(_ *empty.Empty, changeEvents voltha.VolthaService_ReceiveChangeEventsServer) error {
- var streamingTracker = q.getStreamingTracker("ReceiveChangeEvents", q.changeEventQueueDone)
+ ctx := context.Background()
+ var streamingTracker = q.getStreamingTracker(ctx, "ReceiveChangeEvents", q.changeEventQueueDone)
logger.Debugw("ReceiveChangeEvents-request", log.Fields{"changeEvents": changeEvents})
- err := q.flushFailedPackets(streamingTracker)
+ err := q.flushFailedPackets(ctx, streamingTracker)
if err != nil {
logger.Errorw("unable-to-flush-failed-packets", log.Fields{"error": err})
}
@@ -167,6 +170,6 @@
return nil
}
-func (q *Manager) GetChangeEventsQueueForTest() <-chan openflow_13.ChangeEvent {
+func (q *Manager) GetChangeEventsQueueForTest(ctx context.Context) <-chan openflow_13.ChangeEvent {
return q.changeEventQueue
}
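The event manager's streaming handlers have no incoming ctx parameter of their own, so the change creates a root context at the top of each handler and threads it into the helpers (getStreamingTracker, flushFailedPackets). A small stand-in sketch of that handler shape:

package main

import (
	"context"
	"fmt"
)

// stream stands in for the gRPC server-stream argument the real handlers receive.
type stream struct{}

// flushFailedPackets mirrors a helper whose signature gained a context.
func flushFailedPackets(ctx context.Context) error {
	_ = ctx
	return nil
}

// receivePacketsIn mirrors the post-change handler: no ctx arrives on the
// signature, so a root context is created locally and passed downward.
func receivePacketsIn(s stream) error {
	ctx := context.Background()
	if err := flushFailedPackets(ctx); err != nil {
		fmt.Println("unable-to-flush-failed-packets:", err)
	}
	_ = s
	return nil
}

func main() {
	fmt.Println(receivePacketsIn(stream{}))
}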
diff --git a/rw_core/core/device/logical_agent.go b/rw_core/core/device/logical_agent.go
index 15eb677..34bad6b 100644
--- a/rw_core/core/device/logical_agent.go
+++ b/rw_core/core/device/logical_agent.go
@@ -65,7 +65,7 @@
groupLock sync.RWMutex
}
-func newLogicalDeviceAgent(id string, sn string, deviceID string, ldeviceMgr *LogicalManager,
+func newLogicalDeviceAgent(ctx context.Context, id string, sn string, deviceID string, ldeviceMgr *LogicalManager,
deviceMgr *Manager, cdProxy *model.Proxy, timeout time.Duration) *LogicalAgent {
var agent LogicalAgent
agent.logicalDeviceID = id
@@ -74,10 +74,10 @@
agent.deviceMgr = deviceMgr
agent.clusterDataProxy = cdProxy
agent.ldeviceMgr = ldeviceMgr
- agent.flowDecomposer = fd.NewFlowDecomposer(agent.deviceMgr)
+ agent.flowDecomposer = fd.NewFlowDecomposer(ctx, agent.deviceMgr)
agent.logicalPortsNo = make(map[uint32]bool)
agent.defaultTimeout = timeout
- agent.requestQueue = coreutils.NewRequestQueue()
+ agent.requestQueue = coreutils.NewRequestQueue(ctx)
agent.meters = make(map[uint32]*MeterChunk)
agent.flows = make(map[uint64]*FlowChunk)
agent.groups = make(map[uint32]*GroupChunk)
@@ -114,7 +114,7 @@
// Create the datapath ID (uint64) using the logical device ID (based on the MAC Address)
var datapathID uint64
- if datapathID, err = coreutils.CreateDataPathID(agent.serialNumber); err != nil {
+ if datapathID, err = coreutils.CreateDataPathID(ctx, agent.serialNumber); err != nil {
return err
}
ld.DatapathId = datapathID
@@ -159,7 +159,7 @@
agent.logicalDevice = proto.Clone(ld).(*voltha.LogicalDevice)
// Setup the local list of logical ports
- agent.addLogicalPortsToMap(ld.Ports)
+ agent.addLogicalPortsToMap(ctx, ld.Ports)
// load the flows, meters and groups from KV to cache
agent.loadFlows(ctx)
agent.loadMeters(ctx)
@@ -190,7 +190,7 @@
returnErr = err
return
}
- defer agent.requestQueue.RequestComplete()
+ defer agent.requestQueue.RequestComplete(ctx)
//Remove the logical device from the model
if err := agent.clusterDataProxy.Remove(ctx, "logical_devices/"+agent.logicalDeviceID); err != nil {
@@ -211,12 +211,12 @@
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return nil, err
}
- defer agent.requestQueue.RequestComplete()
+ defer agent.requestQueue.RequestComplete(ctx)
return proto.Clone(agent.logicalDevice).(*voltha.LogicalDevice), nil
}
// getLogicalDeviceWithoutLock returns a cloned logical device to a function that already holds the agent lock.
-func (agent *LogicalAgent) getLogicalDeviceWithoutLock() *voltha.LogicalDevice {
+func (agent *LogicalAgent) getLogicalDeviceWithoutLock(ctx context.Context) *voltha.LogicalDevice {
logger.Debug("getLogicalDeviceWithoutLock")
return proto.Clone(agent.logicalDevice).(*voltha.LogicalDevice)
}
@@ -255,17 +255,17 @@
logger.Debugw("send-add-flows-to-device-manager", log.Fields{"logicalDeviceID": agent.logicalDeviceID, "deviceRules": deviceRules, "flowMetadata": flowMetadata})
responses := make([]coreutils.Response, 0)
- for deviceID, value := range deviceRules.GetRules() {
- response := coreutils.NewResponse()
+ for deviceID, value := range deviceRules.GetRules(ctx) {
+ response := coreutils.NewResponse(ctx)
responses = append(responses, response)
go func(deviceId string, value *fu.FlowsAndGroups) {
ctx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
defer cancel()
- if err := agent.deviceMgr.addFlowsAndGroups(ctx, deviceId, value.ListFlows(), value.ListGroups(), flowMetadata); err != nil {
+ if err := agent.deviceMgr.addFlowsAndGroups(ctx, deviceId, value.ListFlows(ctx), value.ListGroups(ctx), flowMetadata); err != nil {
logger.Errorw("flow-add-failed", log.Fields{"deviceID": deviceId, "error": err})
- response.Error(status.Errorf(codes.Internal, "flow-add-failed: %s", deviceId))
+ response.Error(ctx, status.Errorf(codes.Internal, "flow-add-failed: %s", deviceId))
}
- response.Done()
+ response.Done(ctx)
}(deviceID, value)
}
// Return responses (an array of channels) for the caller to wait for a response from the far end.
@@ -276,17 +276,17 @@
logger.Debugw("send-delete-flows-to-device-manager", log.Fields{"logicalDeviceID": agent.logicalDeviceID})
responses := make([]coreutils.Response, 0)
- for deviceID, value := range deviceRules.GetRules() {
- response := coreutils.NewResponse()
+ for deviceID, value := range deviceRules.GetRules(ctx) {
+ response := coreutils.NewResponse(ctx)
responses = append(responses, response)
go func(deviceId string, value *fu.FlowsAndGroups) {
ctx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
defer cancel()
- if err := agent.deviceMgr.deleteFlowsAndGroups(ctx, deviceId, value.ListFlows(), value.ListGroups(), flowMetadata); err != nil {
+ if err := agent.deviceMgr.deleteFlowsAndGroups(ctx, deviceId, value.ListFlows(ctx), value.ListGroups(ctx), flowMetadata); err != nil {
logger.Errorw("flow-delete-failed", log.Fields{"deviceID": deviceId, "error": err})
- response.Error(status.Errorf(codes.Internal, "flow-delete-failed: %s", deviceId))
+ response.Error(ctx, status.Errorf(codes.Internal, "flow-delete-failed: %s", deviceId))
}
- response.Done()
+ response.Done(ctx)
}(deviceID, value)
}
return responses
@@ -296,17 +296,17 @@
logger.Debugw("send-update-flows-to-device-manager", log.Fields{"logicalDeviceID": agent.logicalDeviceID})
responses := make([]coreutils.Response, 0)
- for deviceID, value := range deviceRules.GetRules() {
- response := coreutils.NewResponse()
+ for deviceID, value := range deviceRules.GetRules(ctx) {
+ response := coreutils.NewResponse(ctx)
responses = append(responses, response)
go func(deviceId string, value *fu.FlowsAndGroups) {
ctx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
defer cancel()
- if err := agent.deviceMgr.updateFlowsAndGroups(ctx, deviceId, value.ListFlows(), value.ListGroups(), flowMetadata); err != nil {
+ if err := agent.deviceMgr.updateFlowsAndGroups(ctx, deviceId, value.ListFlows(ctx), value.ListGroups(ctx), flowMetadata); err != nil {
logger.Errorw("flow-update-failed", log.Fields{"deviceID": deviceId, "error": err})
- response.Error(status.Errorf(codes.Internal, "flow-update-failed: %s", deviceId))
+ response.Error(ctx, status.Errorf(codes.Internal, "flow-update-failed: %s", deviceId))
}
- response.Done()
+ response.Done(ctx)
}(deviceID, value)
}
return responses
@@ -316,13 +316,13 @@
logger.Debugw("deleting-flows-from-parent-device", log.Fields{"logical-device-id": agent.logicalDeviceID, "flows": flows})
responses := make([]coreutils.Response, 0)
for _, flow := range flows.Items {
- response := coreutils.NewResponse()
+ response := coreutils.NewResponse(ctx)
responses = append(responses, response)
- uniPort, err := agent.getUNILogicalPortNo(flow)
+ uniPort, err := agent.getUNILogicalPortNo(ctx, flow)
if err != nil {
logger.Error("no-uni-port-in-flow", log.Fields{"deviceID": agent.rootDeviceID, "flow": flow, "error": err})
- response.Error(err)
- response.Done()
+ response.Error(ctx, err)
+ response.Done(ctx)
continue
}
logger.Debugw("uni-port", log.Fields{"flows": flows, "uni-port": uniPort})
@@ -331,9 +331,9 @@
defer cancel()
if err := agent.deviceMgr.deleteParentFlows(ctx, agent.rootDeviceID, uniPort, metadata); err != nil {
logger.Error("flow-delete-failed", log.Fields{"device-id": agent.rootDeviceID, "error": err})
- response.Error(status.Errorf(codes.Internal, "flow-delete-failed: %s %v", agent.rootDeviceID, err))
+ response.Error(ctx, status.Errorf(codes.Internal, "flow-delete-failed: %s %v", agent.rootDeviceID, err))
}
- response.Done()
+ response.Done(ctx)
}(uniPort, metadata)
}
return responses
@@ -344,7 +344,7 @@
"packet": hex.EncodeToString(packet.Data),
"inPort": packet.GetInPort(),
})
- outPort := fu.GetPacketOutPort(packet)
+ outPort := fu.GetPacketOutPort(ctx, packet)
//frame := packet.GetData()
//TODO: Use a channel between the logical agent and the device agent
if err := agent.deviceMgr.packetOut(ctx, agent.rootDeviceID, outPort, packet); err != nil {
@@ -352,13 +352,13 @@
}
}
-func (agent *LogicalAgent) packetIn(port uint32, transactionID string, packet []byte) {
+func (agent *LogicalAgent) packetIn(ctx context.Context, port uint32, transactionID string, packet []byte) {
logger.Debugw("packet-in", log.Fields{
"port": port,
"packet": hex.EncodeToString(packet),
"transactionId": transactionID,
})
- packetIn := fu.MkPacketIn(port, packet)
- agent.ldeviceMgr.SendPacketIn(agent.logicalDeviceID, transactionID, packetIn)
+ packetIn := fu.MkPacketIn(ctx, port, packet)
+ agent.ldeviceMgr.SendPacketIn(ctx, agent.logicalDeviceID, transactionID, packetIn)
logger.Debugw("sending-packet-in", log.Fields{"packet": hex.EncodeToString(packetIn.Data)})
}
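logical_agent.go keeps its existing fan-out: one Response per target device and a goroutine per device bounded by its own timeout context; the change only threads ctx into the Response helpers. A standalone sketch of that pattern with a minimal stand-in response type (the real one lives in rw_core/utils):

package main

import (
	"context"
	"fmt"
	"time"
)

// response is a stand-in for the coreutils Response: a one-shot channel carrying
// either nil (success) or an error.
type response chan error

func newResponse(ctx context.Context) response { return make(response, 1) }

func (r response) done(ctx context.Context)            { r <- nil }
func (r response) fail(ctx context.Context, err error) { r <- err }

// pushToDevice stands in for deviceMgr.addFlowsAndGroups and friends.
func pushToDevice(ctx context.Context, deviceID string) error {
	select {
	case <-time.After(5 * time.Millisecond):
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// fanOut mirrors addFlowsAndGroupsToDevices: one response per device, each
// goroutine bounded by its own timeout context.
func fanOut(ctx context.Context, deviceIDs []string, timeout time.Duration) []response {
	responses := make([]response, 0, len(deviceIDs))
	for _, id := range deviceIDs {
		resp := newResponse(ctx)
		responses = append(responses, resp)
		go func(deviceID string, resp response) {
			subCtx, cancel := context.WithTimeout(context.Background(), timeout)
			defer cancel()
			if err := pushToDevice(subCtx, deviceID); err != nil {
				resp.fail(subCtx, fmt.Errorf("flow-add-failed: %s: %w", deviceID, err))
				return
			}
			resp.done(subCtx)
		}(id, resp)
	}
	return responses
}

func main() {
	for _, r := range fanOut(context.Background(), []string{"olt-1", "onu-1"}, time.Second) {
		fmt.Println(<-r)
	}
}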
diff --git a/rw_core/core/device/logical_agent_flow.go b/rw_core/core/device/logical_agent_flow.go
index 5d35251..03bbbdf 100644
--- a/rw_core/core/device/logical_agent_flow.go
+++ b/rw_core/core/device/logical_agent_flow.go
@@ -51,9 +51,9 @@
case ofp.OfpFlowModCommand_OFPFC_DELETE_STRICT:
return agent.flowDeleteStrict(ctx, flow)
case ofp.OfpFlowModCommand_OFPFC_MODIFY:
- return agent.flowModify(flow)
+ return agent.flowModify(ctx, flow)
case ofp.OfpFlowModCommand_OFPFC_MODIFY_STRICT:
- return agent.flowModifyStrict(flow)
+ return agent.flowModifyStrict(ctx, flow)
}
return status.Errorf(codes.Internal,
"unhandled-command: lDeviceId:%s, command:%s", agent.logicalDeviceID, flow.GetCommand())
@@ -65,7 +65,7 @@
if mod == nil {
return nil
}
- flow, err := fu.FlowStatsEntryFromFlowModMessage(mod)
+ flow, err := fu.FlowStatsEntryFromFlowModMessage(ctx, mod)
if err != nil {
logger.Errorw("flowAdd-failed", log.Fields{"flowMod": mod, "err": err})
return err
@@ -126,7 +126,7 @@
updatedFlows := make([]*ofp.OfpFlowStats, 0)
checkOverlap := (mod.Flags & uint32(ofp.OfpFlowModFlags_OFPFF_CHECK_OVERLAP)) != 0
if checkOverlap {
- if overlapped := fu.FindOverlappingFlows(flows, mod); len(overlapped) != 0 {
+ if overlapped := fu.FindOverlappingFlows(ctx, flows, mod); len(overlapped) != 0 {
// TODO: should this error be notified other than being logged?
logger.Warnw("overlapped-flows", log.Fields{"logicaldeviceId": agent.logicalDeviceID})
} else {
@@ -153,7 +153,7 @@
updatedFlows = append(updatedFlows, flow)
var flowMetadata voltha.FlowMetadata
lMeters, _ := agent.ListLogicalDeviceMeters(ctx)
- if err := agent.GetMeterConfig(updatedFlows, lMeters.Items, &flowMetadata); err != nil {
+ if err := agent.GetMeterConfig(ctx, updatedFlows, lMeters.Items, &flowMetadata); err != nil {
logger.Error("Meter-referred-in-flow-not-present")
return changed, updated, err
}
@@ -163,7 +163,7 @@
return changed, updated, err
}
- logger.Debugw("rules", log.Fields{"rules": deviceRules.String()})
+ logger.Debugw("rules", log.Fields{"rules": deviceRules.String(ctx)})
// Update store and cache
if updated {
if err := agent.updateLogicalDeviceFlow(ctx, flow, flowChunk); err != nil {
@@ -174,7 +174,7 @@
// Create the go routines to wait
go func() {
// Wait for completion
- if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, respChannels...); res != nil {
+ if res := coreutils.WaitForNilOrErrorResponses(ctx, agent.defaultTimeout, respChannels...); res != nil {
logger.Infow("failed-to-add-flows-will-attempt-deletion", log.Fields{"errors": res, "logical-device-id": agent.logicalDeviceID})
// Revert added flows
if err := agent.revertAddedFlows(context.Background(), mod, flow, flowToReplace, deviceRules, &flowMetadata); err != nil {
@@ -223,7 +223,7 @@
// Wait for the responses
go func() {
// Since this action is taken following an add failure, we may also receive a failure for the revert
- if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, respChnls...); res != nil {
+ if res := coreutils.WaitForNilOrErrorResponses(ctx, agent.defaultTimeout, respChnls...); res != nil {
logger.Warnw("failure-reverting-added-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "errors": res})
}
}()
@@ -238,7 +238,7 @@
return nil
}
- fs, err := fu.FlowStatsEntryFromFlowModMessage(mod)
+ fs, err := fu.FlowStatsEntryFromFlowModMessage(ctx, mod)
if err != nil {
return err
}
@@ -249,13 +249,13 @@
//Lock the map to search the matched flows
agent.flowLock.RLock()
for _, f := range agent.flows {
- if fu.FlowMatch(f.flow, fs) {
+ if fu.FlowMatch(ctx, f.flow, fs) {
toDelete = append(toDelete, f.flow)
toDeleteChunks = append(toDeleteChunks, f)
continue
}
// Check wild card match
- if fu.FlowMatchesMod(f.flow, mod) {
+ if fu.FlowMatchesMod(ctx, f.flow, mod) {
toDelete = append(toDelete, f.flow)
toDeleteChunks = append(toDeleteChunks, f)
}
@@ -280,7 +280,7 @@
}
}
var flowMetadata voltha.FlowMetadata
- if err := agent.GetMeterConfig(toDelete, meters, &flowMetadata); err != nil { // This should never happen
+ if err := agent.GetMeterConfig(ctx, toDelete, meters, &flowMetadata); err != nil { // This should never happen
logger.Error("Meter-referred-in-flows-not-present")
return err
}
@@ -308,7 +308,7 @@
// Wait for the responses
go func() {
// Wait for completion
- if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, respChnls...); res != nil {
+ if res := coreutils.WaitForNilOrErrorResponses(ctx, agent.defaultTimeout, respChnls...); res != nil {
logger.Errorw("failure-updating-device-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "errors": res})
// TODO: Revert the flow deletion
}
@@ -325,7 +325,7 @@
return nil
}
- flow, err := fu.FlowStatsEntryFromFlowModMessage(mod)
+ flow, err := fu.FlowStatsEntryFromFlowModMessage(ctx, mod)
if err != nil {
return err
}
@@ -355,7 +355,7 @@
var flowMetadata voltha.FlowMetadata
flowsToDelete := []*ofp.OfpFlowStats{flowChunk.flow}
- if err := agent.GetMeterConfig(flowsToDelete, meters, &flowMetadata); err != nil {
+ if err := agent.GetMeterConfig(ctx, flowsToDelete, meters, &flowMetadata); err != nil {
logger.Error("meter-referred-in-flows-not-present")
return err
}
@@ -385,7 +385,7 @@
// Wait for completion
go func() {
- if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, respChnls...); res != nil {
+ if res := coreutils.WaitForNilOrErrorResponses(ctx, agent.defaultTimeout, respChnls...); res != nil {
logger.Warnw("failure-deleting-device-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "errors": res})
//TODO: Revert flow changes
}
@@ -395,11 +395,11 @@
}
//flowModify modifies a flow from the flow table of that logical device
-func (agent *LogicalAgent) flowModify(mod *ofp.OfpFlowMod) error {
+func (agent *LogicalAgent) flowModify(ctx context.Context, mod *ofp.OfpFlowMod) error {
return errors.New("flowModify not implemented")
}
//flowModifyStrict modifies a flow from the flow table of that logical device using strict matching
-func (agent *LogicalAgent) flowModifyStrict(mod *ofp.OfpFlowMod) error {
+func (agent *LogicalAgent) flowModifyStrict(ctx context.Context, mod *ofp.OfpFlowMod) error {
return errors.New("flowModifyStrict not implemented")
}
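flowAdd keeps its asynchronous wait-and-revert step: once the per-device pushes are launched, a goroutine waits on all responses (now passing ctx to the wait helper) and attempts a revert if any of them failed. A hedged sketch of that control flow with stand-in helpers:

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// waitForAll collects one result per channel or gives up when the timeout
// expires; it stands in for coreutils.WaitForNilOrErrorResponses.
func waitForAll(ctx context.Context, timeout time.Duration, responses ...chan error) []error {
	subCtx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()
	var failures []error
	for _, ch := range responses {
		select {
		case err := <-ch:
			if err != nil {
				failures = append(failures, err)
			}
		case <-subCtx.Done():
			failures = append(failures, subCtx.Err())
		}
	}
	return failures
}

// revertAddedFlows stands in for the agent's revert step.
func revertAddedFlows(ctx context.Context) error {
	fmt.Println("reverting added flows")
	return nil
}

func main() {
	ok := make(chan error, 1)
	bad := make(chan error, 1)
	ok <- nil
	bad <- errors.New("flow-add-failed: device-1")

	// Mirrors the post-change flowAdd: wait asynchronously, revert on failure.
	done := make(chan struct{})
	go func() {
		defer close(done)
		if res := waitForAll(context.Background(), time.Second, ok, bad); len(res) != 0 {
			fmt.Println("failed-to-add-flows-will-attempt-deletion:", res)
			if err := revertAddedFlows(context.Background()); err != nil {
				fmt.Println("failure-reverting-added-flows:", err)
			}
		}
	}()
	<-done
}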
diff --git a/rw_core/core/device/logical_agent_flow_loader.go b/rw_core/core/device/logical_agent_flow_loader.go
index 84d1a47..d5cc54e 100644
--- a/rw_core/core/device/logical_agent_flow_loader.go
+++ b/rw_core/core/device/logical_agent_flow_loader.go
@@ -96,7 +96,7 @@
agent.flowLock.Lock()
defer agent.flowLock.Unlock()
for flowID, flowChunk := range agent.flows {
- if mID := fu.GetMeterIdFromFlow(flowChunk.flow); mID != 0 && mID == meterID {
+ if mID := fu.GetMeterIdFromFlow(ctx, flowChunk.flow); mID != 0 && mID == meterID {
logger.Debugw("Flow-to-be- deleted", log.Fields{"flow": flowChunk.flow})
path := fmt.Sprintf("logical_flows/%s/%d", agent.logicalDeviceID, flowID)
if err := agent.clusterDataProxy.Remove(ctx, path); err != nil {
@@ -117,7 +117,7 @@
agent.flowLock.Lock()
defer agent.flowLock.Unlock()
for flowID, flowChunk := range agent.flows {
- if fu.FlowHasOutGroup(flowChunk.flow, groupID) {
+ if fu.FlowHasOutGroup(ctx, flowChunk.flow, groupID) {
path := fmt.Sprintf("logical_flows/%s/%d", agent.logicalDeviceID, flowID)
if err := agent.clusterDataProxy.Remove(ctx, path); err != nil {
return nil, fmt.Errorf("couldnt-delete-flow-from-store-%s", path)
diff --git a/rw_core/core/device/logical_agent_group.go b/rw_core/core/device/logical_agent_group.go
index a0d6c4a..46855f4 100644
--- a/rw_core/core/device/logical_agent_group.go
+++ b/rw_core/core/device/logical_agent_group.go
@@ -64,7 +64,7 @@
return fmt.Errorf("Group %d already exists", groupMod.GroupId)
}
- groupEntry := fu.GroupEntryFromGroupMod(groupMod)
+ groupEntry := fu.GroupEntryFromGroupMod(ctx, groupMod)
groupChunk := GroupChunk{
group: groupEntry,
}
@@ -83,20 +83,20 @@
agent.groupLock.Unlock()
return err
}
- deviceRules := fu.NewDeviceRules()
- deviceRules.CreateEntryIfNotExist(agent.rootDeviceID)
- fg := fu.NewFlowsAndGroups()
- fg.AddGroup(fu.GroupEntryFromGroupMod(groupMod))
- deviceRules.AddFlowsAndGroup(agent.rootDeviceID, fg)
+ deviceRules := fu.NewDeviceRules(ctx)
+ deviceRules.CreateEntryIfNotExist(ctx, agent.rootDeviceID)
+ fg := fu.NewFlowsAndGroups(ctx)
+ fg.AddGroup(ctx, fu.GroupEntryFromGroupMod(ctx, groupMod))
+ deviceRules.AddFlowsAndGroup(ctx, agent.rootDeviceID, fg)
- logger.Debugw("rules", log.Fields{"rules for group-add": deviceRules.String()})
+ logger.Debugw("rules", log.Fields{"rules for group-add": deviceRules.String(ctx)})
// Update the devices
respChnls := agent.addFlowsAndGroupsToDevices(ctx, deviceRules, &voltha.FlowMetadata{})
// Wait for completion
go func() {
- if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, respChnls...); res != nil {
+ if res := coreutils.WaitForNilOrErrorResponses(ctx, agent.defaultTimeout, respChnls...); res != nil {
logger.Warnw("failure-updating-device-flows-groups", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "errors": res})
//TODO: Revert flow changes
}
@@ -169,14 +169,14 @@
if err != nil {
return err
}
- logger.Debugw("rules", log.Fields{"rules": deviceRules.String()})
+ logger.Debugw("rules", log.Fields{"rules": deviceRules.String(ctx)})
// Update the devices
respChnls := agent.updateFlowsAndGroupsOfDevice(ctx, deviceRules, nil)
// Wait for completion
go func() {
- if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, respChnls...); res != nil {
+ if res := coreutils.WaitForNilOrErrorResponses(ctx, agent.defaultTimeout, respChnls...); res != nil {
logger.Warnw("failure-updating-device-flows-groups", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "errors": res})
//TODO: Revert flow changes
}
@@ -202,14 +202,14 @@
groupChunk.lock.Lock()
defer groupChunk.lock.Unlock()
//replace existing group entry with new group definition
- groupEntry := fu.GroupEntryFromGroupMod(groupMod)
- deviceRules := fu.NewDeviceRules()
- deviceRules.CreateEntryIfNotExist(agent.rootDeviceID)
- fg := fu.NewFlowsAndGroups()
- fg.AddGroup(fu.GroupEntryFromGroupMod(groupMod))
- deviceRules.AddFlowsAndGroup(agent.rootDeviceID, fg)
+ groupEntry := fu.GroupEntryFromGroupMod(ctx, groupMod)
+ deviceRules := fu.NewDeviceRules(ctx)
+ deviceRules.CreateEntryIfNotExist(ctx, agent.rootDeviceID)
+ fg := fu.NewFlowsAndGroups(ctx)
+ fg.AddGroup(ctx, fu.GroupEntryFromGroupMod(ctx, groupMod))
+ deviceRules.AddFlowsAndGroup(ctx, agent.rootDeviceID, fg)
- logger.Debugw("rules", log.Fields{"rules-for-group-modify": deviceRules.String()})
+ logger.Debugw("rules", log.Fields{"rules-for-group-modify": deviceRules.String(ctx)})
//update KV
if err := agent.updateLogicalDeviceFlowGroup(ctx, groupEntry, groupChunk); err != nil {
logger.Errorw("Cannot-update-logical-group", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
@@ -221,7 +221,7 @@
// Wait for completion
go func() {
- if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, respChnls...); res != nil {
+ if res := coreutils.WaitForNilOrErrorResponses(ctx, agent.defaultTimeout, respChnls...); res != nil {
logger.Warnw("failure-updating-device-flows-groups", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "errors": res})
//TODO: Revert flow changes
}
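groupAdd and groupModify both build a one-entry rules map for the root device and then reuse the same fan-out path. A simplified stand-in for that data shape (the real DeviceRules and FlowsAndGroups types come from voltha-lib-go's flow utils, and their methods now take ctx as well):

package main

import (
	"context"
	"fmt"
)

// flowsAndGroups is a stand-in for fu.FlowsAndGroups: the flow and group
// entries destined for one device.
type flowsAndGroups struct {
	flows  []string
	groups []string
}

// deviceRules is a stand-in for fu.DeviceRules: per-device rules keyed by device ID.
type deviceRules map[string]*flowsAndGroups

func newDeviceRules(ctx context.Context) deviceRules { return deviceRules{} }

func (dr deviceRules) createEntryIfNotExist(ctx context.Context, deviceID string) {
	if _, ok := dr[deviceID]; !ok {
		dr[deviceID] = &flowsAndGroups{}
	}
}

func (dr deviceRules) addGroup(ctx context.Context, deviceID, group string) {
	dr.createEntryIfNotExist(ctx, deviceID)
	dr[deviceID].groups = append(dr[deviceID].groups, group)
}

func main() {
	ctx := context.Background()
	rootDeviceID := "olt-1"

	// Mirrors groupAdd: create the root-device entry, attach the group entry,
	// then hand the rules to the device fan-out.
	rules := newDeviceRules(ctx)
	rules.createEntryIfNotExist(ctx, rootDeviceID)
	rules.addGroup(ctx, rootDeviceID, "group-10")
	fmt.Printf("%+v\n", *rules[rootDeviceID])
}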
diff --git a/rw_core/core/device/logical_agent_meter.go b/rw_core/core/device/logical_agent_meter.go
index c211f1e..497c35c 100644
--- a/rw_core/core/device/logical_agent_meter.go
+++ b/rw_core/core/device/logical_agent_meter.go
@@ -52,7 +52,7 @@
return nil
}
- meterEntry := fu.MeterEntryFromMeterMod(meterMod)
+ meterEntry := fu.MeterEntryFromMeterMod(ctx, meterMod)
agent.meterLock.Lock()
//check if the meter already exists or not
_, ok := agent.meters[meterMod.MeterId]
@@ -116,7 +116,7 @@
if meterMod == nil {
return nil
}
- newMeter := fu.MeterEntryFromMeterMod(meterMod)
+ newMeter := fu.MeterEntryFromMeterMod(ctx, meterMod)
agent.meterLock.RLock()
meterChunk, ok := agent.meters[newMeter.Config.MeterId]
agent.meterLock.RUnlock()
diff --git a/rw_core/core/device/logical_agent_meter_loader.go b/rw_core/core/device/logical_agent_meter_loader.go
index 4408bf1..b511b35 100644
--- a/rw_core/core/device/logical_agent_meter_loader.go
+++ b/rw_core/core/device/logical_agent_meter_loader.go
@@ -93,10 +93,10 @@
}
// GetMeterConfig returns meter config
-func (agent *LogicalAgent) GetMeterConfig(flows []*ofp.OfpFlowStats, meters []*ofp.OfpMeterEntry, metadata *voltha.FlowMetadata) error {
+func (agent *LogicalAgent) GetMeterConfig(ctx context.Context, flows []*ofp.OfpFlowStats, meters []*ofp.OfpMeterEntry, metadata *voltha.FlowMetadata) error {
m := make(map[uint32]bool)
for _, flow := range flows {
- if flowMeterID := fu.GetMeterIdFromFlow(flow); flowMeterID != 0 && !m[flowMeterID] {
+ if flowMeterID := fu.GetMeterIdFromFlow(ctx, flow); flowMeterID != 0 && !m[flowMeterID] {
foundMeter := false
// Meter is present in the flow, get it from the logical device
for _, meter := range meters {
@@ -122,7 +122,7 @@
func (agent *LogicalAgent) updateFlowCountOfMeterStats(ctx context.Context, modCommand *ofp.OfpFlowMod, flow *ofp.OfpFlowStats, revertUpdate bool) bool {
flowCommand := modCommand.GetCommand()
- meterID := fu.GetMeterIdFromFlow(flow)
+ meterID := fu.GetMeterIdFromFlow(ctx, flow)
logger.Debugw("Meter-id-in-flow-mod", log.Fields{"meterId": meterID})
if meterID == 0 {
logger.Debugw("No-meter-present-in-the-flow", log.Fields{"flow": *flow})
diff --git a/rw_core/core/device/logical_agent_port.go b/rw_core/core/device/logical_agent_port.go
index 7845ad5..7235791 100644
--- a/rw_core/core/device/logical_agent_port.go
+++ b/rw_core/core/device/logical_agent_port.go
@@ -53,12 +53,12 @@
if _, err = agent.addNNILogicalPort(ctx, device, port); err != nil {
return err
}
- agent.addLogicalPortToMap(port.PortNo, true)
+ agent.addLogicalPortToMap(ctx, port.PortNo, true)
} else if port.Type == voltha.Port_ETHERNET_UNI {
if _, err = agent.addUNILogicalPort(ctx, device, port); err != nil {
return err
}
- agent.addLogicalPortToMap(port.PortNo, false)
+ agent.addLogicalPortToMap(ctx, port.PortNo, false)
} else {
// Update the device routes to ensure all routes on the logical device have been calculated
if err = agent.buildRoutes(ctx); err != nil {
@@ -88,18 +88,18 @@
}
responses := make([]coreutils.Response, 0)
for _, child := range children.Items {
- response := coreutils.NewResponse()
+ response := coreutils.NewResponse(ctx)
responses = append(responses, response)
go func(child *voltha.Device) {
if err = agent.setupUNILogicalPorts(context.Background(), child); err != nil {
logger.Error("setting-up-UNI-ports-failed", log.Fields{"deviceID": child.Id})
- response.Error(status.Errorf(codes.Internal, "UNI-ports-setup-failed: %s", child.Id))
+ response.Error(ctx, status.Errorf(codes.Internal, "UNI-ports-setup-failed: %s", child.Id))
}
- response.Done()
+ response.Done(ctx)
}(child)
}
// Wait for completion
- if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, responses...); res != nil {
+ if res := coreutils.WaitForNilOrErrorResponses(ctx, agent.defaultTimeout, responses...); res != nil {
return status.Errorf(codes.Aborted, "errors-%s", res)
}
return nil
@@ -123,7 +123,7 @@
if _, err = agent.addNNILogicalPort(ctx, device, port); err != nil {
logger.Errorw("error-adding-UNI-port", log.Fields{"error": err})
}
- agent.addLogicalPortToMap(port.PortNo, true)
+ agent.addLogicalPortToMap(ctx, port.PortNo, true)
}
}
return err
@@ -135,10 +135,10 @@
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
}
- defer agent.requestQueue.RequestComplete()
+ defer agent.requestQueue.RequestComplete(ctx)
// Get the latest logical device info
- original := agent.getLogicalDeviceWithoutLock()
- updatedPorts := clonePorts(original.Ports)
+ original := agent.getLogicalDeviceWithoutLock(ctx)
+ updatedPorts := clonePorts(ctx, original.Ports)
for _, port := range updatedPorts {
if port.DeviceId == deviceID && port.DevicePortNo == portNo {
if operStatus == voltha.OperStatus_ACTIVE {
@@ -165,10 +165,10 @@
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
}
- defer agent.requestQueue.RequestComplete()
+ defer agent.requestQueue.RequestComplete(ctx)
// Get the latest logical device info
- original := agent.getLogicalDeviceWithoutLock()
- updatedPorts := clonePorts(original.Ports)
+ original := agent.getLogicalDeviceWithoutLock(ctx)
+ updatedPorts := clonePorts(ctx, original.Ports)
for _, port := range updatedPorts {
if port.DeviceId == device.Id {
if state == voltha.OperStatus_ACTIVE {
@@ -201,7 +201,7 @@
logger.Errorw("error-adding-UNI-port", log.Fields{"error": err})
}
if added {
- agent.addLogicalPortToMap(port.PortNo, false)
+ agent.addLogicalPortToMap(ctx, port.PortNo, false)
}
}
}
@@ -214,9 +214,9 @@
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
}
- defer agent.requestQueue.RequestComplete()
+ defer agent.requestQueue.RequestComplete(ctx)
// Get the latest logical device info
- cloned := agent.getLogicalDeviceWithoutLock()
+ cloned := agent.getLogicalDeviceWithoutLock(ctx)
if err := agent.updateLogicalDevicePortsWithoutLock(ctx, cloned, []*voltha.LogicalPort{}); err != nil {
logger.Warnw("logical-device-update-failed", log.Fields{"ldeviceId": agent.logicalDeviceID, "error": err})
@@ -230,9 +230,9 @@
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
}
- defer agent.requestQueue.RequestComplete()
+ defer agent.requestQueue.RequestComplete(ctx)
- logicalDevice := agent.getLogicalDeviceWithoutLock()
+ logicalDevice := agent.getLogicalDeviceWithoutLock(ctx)
index := -1
for i, logicalPort := range logicalDevice.Ports {
@@ -242,7 +242,7 @@
}
}
if index >= 0 {
- clonedPorts := clonePorts(logicalDevice.Ports)
+ clonedPorts := clonePorts(ctx, logicalDevice.Ports)
if index < len(clonedPorts)-1 {
copy(clonedPorts[index:], clonedPorts[index+1:])
}
@@ -255,7 +255,7 @@
}
// Remove the logical port from cache
- agent.deleteLogicalPortsFromMap([]uint32{lPort.DevicePortNo})
+ agent.deleteLogicalPortsFromMap(ctx, []uint32{lPort.DevicePortNo})
// Reset the logical device routes
go func() {
if err := agent.buildRoutes(context.Background()); err != nil {
@@ -272,9 +272,9 @@
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
}
- defer agent.requestQueue.RequestComplete()
+ defer agent.requestQueue.RequestComplete(ctx)
- logicalDevice := agent.getLogicalDeviceWithoutLock()
+ logicalDevice := agent.getLogicalDeviceWithoutLock(ctx)
lPortstoKeep := []*voltha.LogicalPort{}
lPortsNoToDelete := []uint32{}
for _, logicalPort := range logicalDevice.Ports {
@@ -290,7 +290,7 @@
return err
}
// Remove the port from the cached logical ports set
- agent.deleteLogicalPortsFromMap(lPortsNoToDelete)
+ agent.deleteLogicalPortsFromMap(ctx, lPortsNoToDelete)
// Reset the logical device routes
go func() {
@@ -307,9 +307,9 @@
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
}
- defer agent.requestQueue.RequestComplete()
+ defer agent.requestQueue.RequestComplete(ctx)
- logicalDevice := agent.getLogicalDeviceWithoutLock()
+ logicalDevice := agent.getLogicalDeviceWithoutLock(ctx)
index := -1
for i, logicalPort := range logicalDevice.Ports {
@@ -319,7 +319,7 @@
}
}
if index >= 0 {
- clonedPorts := clonePorts(logicalDevice.Ports)
+ clonedPorts := clonePorts(ctx, logicalDevice.Ports)
clonedPorts[index].OfpPort.Config = clonedPorts[index].OfpPort.Config & ^uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
return agent.updateLogicalDevicePortsWithoutLock(ctx, logicalDevice, clonedPorts)
}
@@ -331,10 +331,10 @@
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
}
- defer agent.requestQueue.RequestComplete()
+ defer agent.requestQueue.RequestComplete(ctx)
// Get the most up to date logical device
- logicalDevice := agent.getLogicalDeviceWithoutLock()
+ logicalDevice := agent.getLogicalDeviceWithoutLock(ctx)
index := -1
for i, logicalPort := range logicalDevice.Ports {
if logicalPort.Id == lPortID {
@@ -343,7 +343,7 @@
}
}
if index >= 0 {
- clonedPorts := clonePorts(logicalDevice.Ports)
+ clonedPorts := clonePorts(ctx, logicalDevice.Ports)
clonedPorts[index].OfpPort.Config = (clonedPorts[index].OfpPort.Config & ^uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)) | uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
return agent.updateLogicalDevicePortsWithoutLock(ctx, logicalDevice, clonedPorts)
}
@@ -360,12 +360,12 @@
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return false, err
}
- if agent.portExist(device, port) {
+ if agent.portExist(ctx, device, port) {
logger.Debugw("port-already-exist", log.Fields{"port": port})
- agent.requestQueue.RequestComplete()
+ agent.requestQueue.RequestComplete(ctx)
return false, nil
}
- agent.requestQueue.RequestComplete()
+ agent.requestQueue.RequestComplete(ctx)
var portCap *ic.PortCapability
var err error
@@ -379,9 +379,9 @@
return false, err
}
- defer agent.requestQueue.RequestComplete()
+ defer agent.requestQueue.RequestComplete(ctx)
// Double-check whether this port has already been added, since getPortCapability could have taken a long time
- if agent.portExist(device, port) {
+ if agent.portExist(ctx, device, port) {
logger.Debugw("port-already-exist", log.Fields{"port": port})
return false, nil
}
@@ -394,9 +394,9 @@
lp.OfpPort.Name = lp.Id
lp.DevicePortNo = port.PortNo
- ld := agent.getLogicalDeviceWithoutLock()
+ ld := agent.getLogicalDeviceWithoutLock(ctx)
- clonedPorts := clonePorts(ld.Ports)
+ clonedPorts := clonePorts(ctx, ld.Ports)
if clonedPorts == nil {
clonedPorts = make([]*voltha.LogicalPort, 0)
}
@@ -418,8 +418,8 @@
return true, nil
}
-func (agent *LogicalAgent) portExist(device *voltha.Device, port *voltha.Port) bool {
- ldevice := agent.getLogicalDeviceWithoutLock()
+func (agent *LogicalAgent) portExist(ctx context.Context, device *voltha.Device, port *voltha.Port) bool {
+ ldevice := agent.getLogicalDeviceWithoutLock(ctx)
for _, lPort := range ldevice.Ports {
if lPort.DeviceId == device.Id && lPort.DevicePortNo == port.PortNo {
return true
@@ -442,12 +442,12 @@
return false, err
}
- if agent.portExist(childDevice, port) {
+ if agent.portExist(ctx, childDevice, port) {
logger.Debugw("port-already-exist", log.Fields{"port": port})
- agent.requestQueue.RequestComplete()
+ agent.requestQueue.RequestComplete(ctx)
return false, nil
}
- agent.requestQueue.RequestComplete()
+ agent.requestQueue.RequestComplete(ctx)
var portCap *ic.PortCapability
var err error
// First get the port capability
@@ -458,14 +458,14 @@
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return false, err
}
- defer agent.requestQueue.RequestComplete()
+ defer agent.requestQueue.RequestComplete(ctx)
// Double-check whether this port has already been added, since getPortCapability could have taken a long time
- if agent.portExist(childDevice, port) {
+ if agent.portExist(ctx, childDevice, port) {
logger.Debugw("port-already-exist", log.Fields{"port": port})
return false, nil
}
// Get stored logical device
- ldevice := agent.getLogicalDeviceWithoutLock()
+ ldevice := agent.getLogicalDeviceWithoutLock(ctx)
logger.Debugw("adding-uni", log.Fields{"deviceId": childDevice.Id})
portCap.Port.RootPort = false
@@ -473,7 +473,7 @@
portCap.Port.OfpPort.PortNo = port.PortNo
portCap.Port.DeviceId = childDevice.Id
portCap.Port.DevicePortNo = port.PortNo
- clonedPorts := clonePorts(ldevice.Ports)
+ clonedPorts := clonePorts(ctx, ldevice.Ports)
if clonedPorts == nil {
clonedPorts = make([]*voltha.LogicalPort, 0)
}
@@ -493,7 +493,7 @@
return true, nil
}
-func clonePorts(ports []*voltha.LogicalPort) []*voltha.LogicalPort {
+func clonePorts(ctx context.Context, ports []*voltha.LogicalPort) []*voltha.LogicalPort {
return proto.Clone(&voltha.LogicalPorts{Items: ports}).(*voltha.LogicalPorts).Items
}
@@ -504,12 +504,12 @@
if err := agent.updateLogicalDeviceWithoutLock(ctx, device); err != nil {
return err
}
- agent.portUpdated(oldPorts, newPorts)
+ agent.portUpdated(ctx, oldPorts, newPorts)
return nil
}
// diff goes over two lists of logical ports and returns what's new, what's changed, and what's removed.
-func diff(oldList, newList []*voltha.LogicalPort) (newPorts, changedPorts, deletedPorts map[string]*voltha.LogicalPort) {
+func diff(ctx context.Context, oldList, newList []*voltha.LogicalPort) (newPorts, changedPorts, deletedPorts map[string]*voltha.LogicalPort) {
newPorts = make(map[string]*voltha.LogicalPort, len(newList))
changedPorts = make(map[string]*voltha.LogicalPort, len(oldList))
deletedPorts = make(map[string]*voltha.LogicalPort, len(oldList))
@@ -533,21 +533,21 @@
}
// portUpdated is invoked when a port is updated on the logical device
-func (agent *LogicalAgent) portUpdated(prevPorts, currPorts []*voltha.LogicalPort) interface{} {
+func (agent *LogicalAgent) portUpdated(ctx context.Context, prevPorts, currPorts []*voltha.LogicalPort) interface{} {
// Get the difference between the two list
- newPorts, changedPorts, deletedPorts := diff(prevPorts, currPorts)
+ newPorts, changedPorts, deletedPorts := diff(ctx, prevPorts, currPorts)
// Send the port change events to the OF controller
for _, newP := range newPorts {
- go agent.ldeviceMgr.SendChangeEvent(agent.logicalDeviceID,
+ go agent.ldeviceMgr.SendChangeEvent(ctx, agent.logicalDeviceID,
&ofp.OfpPortStatus{Reason: ofp.OfpPortReason_OFPPR_ADD, Desc: newP.OfpPort})
}
for _, change := range changedPorts {
- go agent.ldeviceMgr.SendChangeEvent(agent.logicalDeviceID,
+ go agent.ldeviceMgr.SendChangeEvent(ctx, agent.logicalDeviceID,
&ofp.OfpPortStatus{Reason: ofp.OfpPortReason_OFPPR_MODIFY, Desc: change.OfpPort})
}
for _, del := range deletedPorts {
- go agent.ldeviceMgr.SendChangeEvent(agent.logicalDeviceID,
+ go agent.ldeviceMgr.SendChangeEvent(ctx, agent.logicalDeviceID,
&ofp.OfpPortStatus{Reason: ofp.OfpPortReason_OFPPR_DELETE, Desc: del.OfpPort})
}
@@ -557,13 +557,13 @@
//GetWildcardInputPorts returns the port numbers of all logical ports on the device, excluding the optionally
//specified port. This function is invoked only during flow decomposition, where the lock on the logical
//device is already held. Therefore it is safe to retrieve the logical device without lock.
-func (agent *LogicalAgent) GetWildcardInputPorts(excludePort ...uint32) []uint32 {
+func (agent *LogicalAgent) GetWildcardInputPorts(ctx context.Context, excludePort ...uint32) []uint32 {
lPorts := make([]uint32, 0)
var exclPort uint32
if len(excludePort) == 1 {
exclPort = excludePort[0]
}
- lDevice := agent.getLogicalDeviceWithoutLock()
+ lDevice := agent.getLogicalDeviceWithoutLock(ctx)
for _, port := range lDevice.Ports {
if port.OfpPort.PortNo != exclPort {
lPorts = append(lPorts, port.OfpPort.PortNo)
@@ -574,7 +574,7 @@
// helpers for agent.logicalPortsNo
-func (agent *LogicalAgent) addLogicalPortToMap(portNo uint32, nniPort bool) {
+func (agent *LogicalAgent) addLogicalPortToMap(ctx context.Context, portNo uint32, nniPort bool) {
agent.lockLogicalPortsNo.Lock()
defer agent.lockLogicalPortsNo.Unlock()
if exist := agent.logicalPortsNo[portNo]; !exist {
@@ -582,7 +582,7 @@
}
}
-func (agent *LogicalAgent) addLogicalPortsToMap(lps []*voltha.LogicalPort) {
+func (agent *LogicalAgent) addLogicalPortsToMap(ctx context.Context, lps []*voltha.LogicalPort) {
agent.lockLogicalPortsNo.Lock()
defer agent.lockLogicalPortsNo.Unlock()
for _, lp := range lps {
@@ -592,7 +592,7 @@
}
}
-func (agent *LogicalAgent) deleteLogicalPortsFromMap(portsNo []uint32) {
+func (agent *LogicalAgent) deleteLogicalPortsFromMap(ctx context.Context, portsNo []uint32) {
agent.lockLogicalPortsNo.Lock()
defer agent.lockLogicalPortsNo.Unlock()
for _, pNo := range portsNo {
@@ -600,7 +600,7 @@
}
}
-func (agent *LogicalAgent) isNNIPort(portNo uint32) bool {
+func (agent *LogicalAgent) isNNIPort(ctx context.Context, portNo uint32) bool {
agent.lockLogicalPortsNo.RLock()
defer agent.lockLogicalPortsNo.RUnlock()
if exist := agent.logicalPortsNo[portNo]; exist {
@@ -609,7 +609,7 @@
return false
}
-func (agent *LogicalAgent) getFirstNNIPort() (uint32, error) {
+func (agent *LogicalAgent) getFirstNNIPort(ctx context.Context) (uint32, error) {
agent.lockLogicalPortsNo.RLock()
defer agent.lockLogicalPortsNo.RUnlock()
for portNo, nni := range agent.logicalPortsNo {
@@ -621,7 +621,7 @@
}
//GetNNIPorts returns NNI ports.
-func (agent *LogicalAgent) GetNNIPorts() []uint32 {
+func (agent *LogicalAgent) GetNNIPorts(ctx context.Context) []uint32 {
agent.lockLogicalPortsNo.RLock()
defer agent.lockLogicalPortsNo.RUnlock()
nniPorts := make([]uint32, 0)
@@ -634,13 +634,13 @@
}
// getUNILogicalPortNo returns the UNI logical port number specified in the flow
-func (agent *LogicalAgent) getUNILogicalPortNo(flow *ofp.OfpFlowStats) (uint32, error) {
+func (agent *LogicalAgent) getUNILogicalPortNo(ctx context.Context, flow *ofp.OfpFlowStats) (uint32, error) {
var uniPort uint32
- inPortNo := fu.GetInPort(flow)
- outPortNo := fu.GetOutPort(flow)
- if agent.isNNIPort(inPortNo) {
+ inPortNo := fu.GetInPort(ctx, flow)
+ outPortNo := fu.GetOutPort(ctx, flow)
+ if agent.isNNIPort(ctx, inPortNo) {
uniPort = outPortNo
- } else if agent.isNNIPort(outPortNo) {
+ } else if agent.isNNIPort(ctx, outPortNo) {
uniPort = inPortNo
}
if uniPort != 0 {
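
The WaitForGreenLight / RequestComplete pairs throughout this file are the agent's request queue; after this change the release side also receives the context, purely to keep signatures uniform. A minimal sketch of a context-aware, channel-based queue with that shape; this is an assumption-level illustration, not the coreutils implementation:

package main

import (
	"context"
	"fmt"
	"time"
)

// requestQueue is a minimal, channel-based stand-in for the coreutils request
// queue: WaitForGreenLight blocks until the caller may proceed (or the context
// is cancelled), RequestComplete releases the slot.
type requestQueue struct {
	slot chan struct{}
}

func newRequestQueue() *requestQueue {
	q := &requestQueue{slot: make(chan struct{}, 1)}
	q.slot <- struct{}{} // one request at a time
	return q
}

func (q *requestQueue) WaitForGreenLight(ctx context.Context) error {
	select {
	case <-q.slot:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// RequestComplete takes ctx only to mirror the new signature; the release
// itself needs nothing from it.
func (q *requestQueue) RequestComplete(ctx context.Context) {
	q.slot <- struct{}{}
}

func main() {
	q := newRequestQueue()
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()

	if err := q.WaitForGreenLight(ctx); err != nil {
		fmt.Println("not acquired:", err)
		return
	}
	defer q.RequestComplete(ctx)
	fmt.Println("holding the green light")
}

Passing a cancellable context into WaitForGreenLight is what allows a blocked caller to give up cleanly instead of waiting forever.
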
diff --git a/rw_core/core/device/logical_agent_route.go b/rw_core/core/device/logical_agent_route.go
index 6736160..db1c65c 100644
--- a/rw_core/core/device/logical_agent_route.go
+++ b/rw_core/core/device/logical_agent_route.go
@@ -39,7 +39,7 @@
// Consider different possibilities
if egressPortNo != 0 && ((egressPortNo & 0x7fffffff) == uint32(ofp.OfpPortNo_OFPP_CONTROLLER)) {
logger.Debugw("controller-flow", log.Fields{"ingressPortNo": ingressPortNo, "egressPortNo": egressPortNo, "logicalPortsNo": agent.logicalPortsNo})
- if agent.isNNIPort(ingressPortNo) {
+ if agent.isNNIPort(ctx, ingressPortNo) {
//This is a trap on the NNI Port
if len(agent.deviceRoutes.Routes) == 0 {
// If there are no routes set (usually when the logical device has only NNI port(s), then just return an
@@ -51,7 +51,7 @@
}
//Return a 'half' route to make the flow decomposer logic happy
for routeLink, path := range agent.deviceRoutes.Routes {
- if agent.isNNIPort(routeLink.Egress) {
+ if agent.isNNIPort(ctx, routeLink.Egress) {
routes = append(routes, route.Hop{}) // first hop is set to empty
routes = append(routes, path[1])
return routes, nil
@@ -61,7 +61,7 @@
}
//treat it as if the output port is the first NNI of the OLT
var err error
- if egressPortNo, err = agent.getFirstNNIPort(); err != nil {
+ if egressPortNo, err = agent.getFirstNNIPort(ctx); err != nil {
logger.Warnw("no-nni-port", log.Fields{"error": err})
return nil, err
}
@@ -70,10 +70,10 @@
//route if egress port is OFPP_CONTROLLER or a nni logical port,
//in which case we need to create a half-route where only the egress
//hop is filled, the first hop is nil
- if ingressPortNo == 0 && agent.isNNIPort(egressPortNo) {
+ if ingressPortNo == 0 && agent.isNNIPort(ctx, egressPortNo) {
// We can use the 2nd hop of any upstream route, so just find the first upstream:
for routeLink, path := range agent.deviceRoutes.Routes {
- if agent.isNNIPort(routeLink.Egress) {
+ if agent.isNNIPort(ctx, routeLink.Egress) {
routes = append(routes, route.Hop{}) // first hop is set to empty
routes = append(routes, path[1])
return routes, nil
@@ -93,10 +93,10 @@
return nil, fmt.Errorf("no downstream route from:%d to:%d :%w", ingressPortNo, egressPortNo, route.ErrNoRoute)
}
// Return the pre-calculated route
- return agent.getPreCalculatedRoute(ingressPortNo, egressPortNo)
+ return agent.getPreCalculatedRoute(ctx, ingressPortNo, egressPortNo)
}
-func (agent *LogicalAgent) getPreCalculatedRoute(ingress, egress uint32) ([]route.Hop, error) {
+func (agent *LogicalAgent) getPreCalculatedRoute(ctx context.Context, ingress, egress uint32) ([]route.Hop, error) {
logger.Debugw("ROUTE", log.Fields{"len": len(agent.deviceRoutes.Routes)})
for routeLink, route := range agent.deviceRoutes.Routes {
logger.Debugw("ROUTELINKS", log.Fields{"ingress": ingress, "egress": egress, "routelink": routeLink})
@@ -108,7 +108,7 @@
}
// GetDeviceRoutes returns device graph
-func (agent *LogicalAgent) GetDeviceRoutes() *route.DeviceRoutes {
+func (agent *LogicalAgent) GetDeviceRoutes(ctx context.Context) *route.DeviceRoutes {
return agent.deviceRoutes
}
@@ -123,7 +123,7 @@
return err
}
- if agent.deviceRoutes != nil && agent.deviceRoutes.IsUpToDate(ld) {
+ if agent.deviceRoutes != nil && agent.deviceRoutes.IsUpToDate(ctx, ld) {
return nil
}
logger.Debug("Generation of device route required")
@@ -142,18 +142,18 @@
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
}
- defer agent.requestQueue.RequestComplete()
+ defer agent.requestQueue.RequestComplete(ctx)
if agent.deviceRoutes == nil {
- agent.deviceRoutes = route.NewDeviceRoutes(agent.logicalDeviceID, agent.deviceMgr.getDevice)
+ agent.deviceRoutes = route.NewDeviceRoutes(ctx, agent.logicalDeviceID, agent.deviceMgr.getDevice)
}
// Get all the logical ports on that logical device
- lDevice := agent.getLogicalDeviceWithoutLock()
+ lDevice := agent.getLogicalDeviceWithoutLock(ctx)
if err := agent.deviceRoutes.ComputeRoutes(ctx, lDevice.Ports); err != nil {
return err
}
- if err := agent.deviceRoutes.Print(); err != nil {
+ if err := agent.deviceRoutes.Print(ctx); err != nil {
return err
}
return nil
@@ -165,15 +165,15 @@
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
}
- defer agent.requestQueue.RequestComplete()
+ defer agent.requestQueue.RequestComplete(ctx)
if agent.deviceRoutes == nil {
- agent.deviceRoutes = route.NewDeviceRoutes(agent.logicalDeviceID, agent.deviceMgr.getDevice)
+ agent.deviceRoutes = route.NewDeviceRoutes(ctx, agent.logicalDeviceID, agent.deviceMgr.getDevice)
}
if err := agent.deviceRoutes.AddPort(ctx, lp, agent.logicalDevice.Ports); err != nil {
return err
}
- if err := agent.deviceRoutes.Print(); err != nil {
+ if err := agent.deviceRoutes.Print(ctx); err != nil {
return err
}
return nil
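
getPreCalculatedRoute is a straight lookup over the precomputed route table keyed by an ingress/egress pair. A simplified sketch with stand-in types; route.Hop and the voltha route table carry more detail than this:

package main

import (
	"context"
	"fmt"
)

// hop is a simplified stand-in for route.Hop.
type hop struct{ DeviceID string }

// routeLink mirrors the (ingress, egress) key used by the device-route table.
type routeLink struct{ Ingress, Egress uint32 }

// getPreCalculatedRoute mirrors the lookup above: scan the precomputed table
// and return the path whose link matches the requested ingress/egress pair.
func getPreCalculatedRoute(ctx context.Context, routes map[routeLink][]hop, ingress, egress uint32) ([]hop, error) {
	for link, path := range routes {
		if link.Ingress == ingress && link.Egress == egress {
			return path, nil
		}
	}
	return nil, fmt.Errorf("no route from %d to %d", ingress, egress)
}

func main() {
	routes := map[routeLink][]hop{
		{Ingress: 65536, Egress: 100}: {{DeviceID: "olt"}, {DeviceID: "onu-1"}},
	}
	path, err := getPreCalculatedRoute(context.Background(), routes, 65536, 100)
	fmt.Println(path, err)
}
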
diff --git a/rw_core/core/device/logical_agent_test.go b/rw_core/core/device/logical_agent_test.go
index 8ec5454..cad89bf 100644
--- a/rw_core/core/device/logical_agent_test.go
+++ b/rw_core/core/device/logical_agent_test.go
@@ -42,7 +42,7 @@
func TestLogicalDeviceAgent_diff_nochange_1(t *testing.T) {
currentLogicalPorts := []*voltha.LogicalPort{}
updatedLogicalPorts := []*voltha.LogicalPort{}
- newPorts, changedPorts, deletedPorts := diff(currentLogicalPorts, updatedLogicalPorts)
+ newPorts, changedPorts, deletedPorts := diff(context.Background(), currentLogicalPorts, updatedLogicalPorts)
assert.Equal(t, 0, len(newPorts))
assert.Equal(t, 0, len(changedPorts))
assert.Equal(t, 0, len(deletedPorts))
@@ -125,7 +125,7 @@
},
},
}
- newPorts, changedPorts, deletedPorts := diff(currentLogicalPorts, updatedLogicalPorts)
+ newPorts, changedPorts, deletedPorts := diff(context.Background(), currentLogicalPorts, updatedLogicalPorts)
assert.Equal(t, 0, len(newPorts))
assert.Equal(t, 0, len(changedPorts))
assert.Equal(t, 0, len(deletedPorts))
@@ -159,7 +159,7 @@
},
},
}
- newPorts, changedPorts, deletedPorts := diff(currentLogicalPorts, updatedLogicalPorts)
+ newPorts, changedPorts, deletedPorts := diff(context.Background(), currentLogicalPorts, updatedLogicalPorts)
assert.Equal(t, 2, len(newPorts))
assert.Equal(t, 0, len(changedPorts))
assert.Equal(t, 0, len(deletedPorts))
@@ -183,7 +183,7 @@
},
}
updatedLogicalPorts := []*voltha.LogicalPort{}
- newPorts, changedPorts, deletedPorts := diff(currentLogicalPorts, updatedLogicalPorts)
+ newPorts, changedPorts, deletedPorts := diff(context.Background(), currentLogicalPorts, updatedLogicalPorts)
assert.Equal(t, 0, len(newPorts))
assert.Equal(t, 0, len(changedPorts))
assert.Equal(t, 1, len(deletedPorts))
@@ -267,7 +267,7 @@
},
},
}
- newPorts, changedPorts, deletedPorts := diff(currentLogicalPorts, updatedLogicalPorts)
+ newPorts, changedPorts, deletedPorts := diff(context.Background(), currentLogicalPorts, updatedLogicalPorts)
assert.Equal(t, 0, len(newPorts))
assert.Equal(t, 2, len(changedPorts))
assert.Equal(t, 0, len(deletedPorts))
@@ -352,7 +352,7 @@
},
},
}
- newPorts, changedPorts, deletedPorts := diff(currentLogicalPorts, updatedLogicalPorts)
+ newPorts, changedPorts, deletedPorts := diff(context.Background(), currentLogicalPorts, updatedLogicalPorts)
assert.Equal(t, 1, len(newPorts))
assert.Equal(t, 2, len(changedPorts))
assert.Equal(t, 1, len(deletedPorts))
@@ -387,7 +387,7 @@
logger.Fatal(err)
}
// Create the kafka client
- test.kClient = mock_kafka.NewKafkaClient()
+ test.kClient = mock_kafka.NewKafkaClient(context.Background())
test.oltAdapterName = "olt_adapter_mock"
test.onuAdapterName = "onu_adapter_mock"
test.coreInstanceID = "rw-da-test"
@@ -458,6 +458,7 @@
cfg.DefaultRequestTimeout = lda.defaultTimeout
cfg.KVStorePort = lda.kvClientPort
cfg.InCompetingMode = inCompeteMode
+ ctx := context.Background()
grpcPort, err := freeport.GetFreePort()
if err != nil {
logger.Fatal("Cannot get a freeport for grpc")
@@ -474,29 +475,30 @@
LivenessChannelInterval: cfg.LiveProbeInterval / 2,
PathPrefix: cfg.KVStoreDataPrefix}
lda.kmp = kafka.NewInterContainerProxy(
- kafka.InterContainerHost(cfg.KafkaAdapterHost),
- kafka.InterContainerPort(cfg.KafkaAdapterPort),
- kafka.MsgClient(lda.kClient),
- kafka.DefaultTopic(&kafka.Topic{Name: cfg.CoreTopic}),
- kafka.DeviceDiscoveryTopic(&kafka.Topic{Name: cfg.AffinityRouterTopic}))
+ ctx,
+ kafka.InterContainerHost(ctx, cfg.KafkaAdapterHost),
+ kafka.InterContainerPort(ctx, cfg.KafkaAdapterPort),
+ kafka.MsgClient(ctx, lda.kClient),
+ kafka.DefaultTopic(ctx, &kafka.Topic{Name: cfg.CoreTopic}),
+ kafka.DeviceDiscoveryTopic(ctx, &kafka.Topic{Name: cfg.AffinityRouterTopic}))
- endpointMgr := kafka.NewEndpointManager(backend)
- proxy := model.NewProxy(backend, "/")
- adapterMgr := adapter.NewAdapterManager(proxy, lda.coreInstanceID, lda.kClient)
+ endpointMgr := kafka.NewEndpointManager(ctx, backend)
+ proxy := model.NewProxy(ctx, backend, "/")
+ adapterMgr := adapter.NewAdapterManager(ctx, proxy, lda.coreInstanceID, lda.kClient)
- lda.deviceMgr, lda.logicalDeviceMgr = NewManagers(proxy, adapterMgr, lda.kmp, endpointMgr, cfg.CorePairTopic, lda.coreInstanceID, cfg.DefaultCoreTimeout)
- if err = lda.kmp.Start(); err != nil {
+ lda.deviceMgr, lda.logicalDeviceMgr = NewManagers(ctx, proxy, adapterMgr, lda.kmp, endpointMgr, cfg.CorePairTopic, lda.coreInstanceID, cfg.DefaultCoreTimeout)
+ if err = lda.kmp.Start(ctx); err != nil {
logger.Fatal("Cannot start InterContainerProxy")
}
- adapterMgr.Start(context.Background())
+ adapterMgr.Start(ctx)
}
func (lda *LDATest) stopAll() {
if lda.kClient != nil {
- lda.kClient.Stop()
+ lda.kClient.Stop(context.Background())
}
if lda.kmp != nil {
- lda.kmp.Stop()
+ lda.kmp.Stop(context.Background())
}
if lda.etcdServer != nil {
stopEmbeddedEtcdServer(lda.etcdServer)
@@ -509,11 +511,11 @@
clonedLD := proto.Clone(lda.logicalDevice).(*voltha.LogicalDevice)
clonedLD.Id = com.GetRandomString(10)
clonedLD.DatapathId = rand.Uint64()
- lDeviceAgent := newLogicalDeviceAgent(clonedLD.Id, clonedLD.Id, clonedLD.RootDeviceId, lDeviceMgr, deviceMgr, lDeviceMgr.clusterDataProxy, lDeviceMgr.defaultTimeout)
+ lDeviceAgent := newLogicalDeviceAgent(context.Background(), clonedLD.Id, clonedLD.Id, clonedLD.RootDeviceId, lDeviceMgr, deviceMgr, lDeviceMgr.clusterDataProxy, lDeviceMgr.defaultTimeout)
lDeviceAgent.logicalDevice = clonedLD
err := lDeviceAgent.clusterDataProxy.AddWithID(context.Background(), "logical_devices", clonedLD.Id, clonedLD)
assert.Nil(t, err)
- lDeviceMgr.addLogicalDeviceAgentToMap(lDeviceAgent)
+ lDeviceMgr.addLogicalDeviceAgentToMap(context.Background(), lDeviceAgent)
return lDeviceAgent
}
@@ -569,7 +571,7 @@
}()
// wait for go routines to be done
localWG.Wait()
- meterEntry := fu.MeterEntryFromMeterMod(meterMod)
+ meterEntry := fu.MeterEntryFromMeterMod(context.Background(), meterMod)
meterChunk, ok := ldAgent.meters[meterMod.MeterId]
assert.Equal(t, ok, true)
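
On the test side, the new parameter is always satisfied with context.Background(). A short sketch of that pattern; sumPortNos is a hypothetical helper used only to show the shape, assuming the same testify assert package the tests above already use:

package device

import (
	"context"
	"testing"

	"github.com/stretchr/testify/assert"
)

// sumPortNos is a hypothetical ctx-taking helper: production code receives a
// real request context, tests simply pass context.Background().
func sumPortNos(ctx context.Context, portNos []uint32) uint64 {
	var total uint64
	for _, p := range portNos {
		total += uint64(p)
	}
	return total
}

func TestSumPortNos(t *testing.T) {
	got := sumPortNos(context.Background(), []uint32{1, 2, 3})
	assert.Equal(t, uint64(6), got)
}
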
diff --git a/rw_core/core/device/logical_manager.go b/rw_core/core/device/logical_manager.go
index d9b6731..89fdf34 100644
--- a/rw_core/core/device/logical_manager.go
+++ b/rw_core/core/device/logical_manager.go
@@ -48,7 +48,7 @@
logicalDeviceLoadingInProgress map[string][]chan int
}
-func (ldMgr *LogicalManager) addLogicalDeviceAgentToMap(agent *LogicalAgent) {
+func (ldMgr *LogicalManager) addLogicalDeviceAgentToMap(ctx context.Context, agent *LogicalAgent) {
if _, exist := ldMgr.logicalDeviceAgents.Load(agent.logicalDeviceID); !exist {
ldMgr.logicalDeviceAgents.Store(agent.logicalDeviceID, agent)
}
@@ -79,7 +79,7 @@
return nil
}
-func (ldMgr *LogicalManager) deleteLogicalDeviceAgent(logicalDeviceID string) {
+func (ldMgr *LogicalManager) deleteLogicalDeviceAgent(ctx context.Context, logicalDeviceID string) {
ldMgr.logicalDeviceAgents.Delete(logicalDeviceID)
}
@@ -116,7 +116,7 @@
// For now use the serial number - it may contain any combination of alphabetic characters and numbers,
// with length varying from eight characters to a maximum of 14 characters. Mac Address is part of oneof
// in the Device model. May need to be moved out.
- id := utils.CreateLogicalDeviceID()
+ id := utils.CreateLogicalDeviceID(ctx)
sn := strings.Replace(device.MacAddress, ":", "", -1)
if id == "" {
logger.Errorw("mac-address-not-set", log.Fields{"deviceId": device.Id, "serial-number": sn})
@@ -125,8 +125,8 @@
logger.Debugw("logical-device-id", log.Fields{"logicaldeviceId": id})
- agent := newLogicalDeviceAgent(id, sn, device.Id, ldMgr, ldMgr.deviceMgr, ldMgr.clusterDataProxy, ldMgr.defaultTimeout)
- ldMgr.addLogicalDeviceAgentToMap(agent)
+ agent := newLogicalDeviceAgent(ctx, id, sn, device.Id, ldMgr, ldMgr.deviceMgr, ldMgr.clusterDataProxy, ldMgr.defaultTimeout)
+ ldMgr.addLogicalDeviceAgentToMap(ctx, agent)
// Update the root device with the logical device Id reference
if err := ldMgr.deviceMgr.setParentID(ctx, device, id); err != nil {
@@ -139,7 +139,7 @@
err := agent.start(context.Background(), false)
if err != nil {
logger.Errorw("unable-to-create-the-logical-device", log.Fields{"error": err})
- ldMgr.deleteLogicalDeviceAgent(id)
+ ldMgr.deleteLogicalDeviceAgent(ctx, id)
}
}()
@@ -196,7 +196,7 @@
ldMgr.logicalDevicesLoadingLock.Unlock()
if _, err := ldMgr.getLogicalDeviceFromModel(ctx, lDeviceID); err == nil {
logger.Debugw("loading-logical-device", log.Fields{"lDeviceId": lDeviceID})
- agent := newLogicalDeviceAgent(lDeviceID, "", "", ldMgr, ldMgr.deviceMgr, ldMgr.clusterDataProxy, ldMgr.defaultTimeout)
+ agent := newLogicalDeviceAgent(ctx, lDeviceID, "", "", ldMgr, ldMgr.deviceMgr, ldMgr.clusterDataProxy, ldMgr.defaultTimeout)
if err := agent.start(ctx, true); err != nil {
return err
}
@@ -243,7 +243,7 @@
return err
}
//Remove the logical device agent from the Map
- ldMgr.deleteLogicalDeviceAgent(logDeviceID)
+ ldMgr.deleteLogicalDeviceAgent(ctx, logDeviceID)
}
logger.Debug("deleting-logical-device-ends")
@@ -539,7 +539,7 @@
func (ldMgr *LogicalManager) packetIn(ctx context.Context, logicalDeviceID string, port uint32, transactionID string, packet []byte) error {
logger.Debugw("packetIn", log.Fields{"logicalDeviceId": logicalDeviceID, "port": port})
if agent := ldMgr.getLogicalDeviceAgent(ctx, logicalDeviceID); agent != nil {
- agent.packetIn(port, transactionID, packet)
+ agent.packetIn(ctx, port, transactionID, packet)
} else {
logger.Error("logical-device-not-exist", log.Fields{"logicalDeviceId": logicalDeviceID})
}
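
The manager keeps its logical device agents in a sync.Map; addLogicalDeviceAgentToMap and deleteLogicalDeviceAgent now take a context even though the map operations do not use it. A self-contained sketch of that registry shape with illustrative names:

package main

import (
	"context"
	"fmt"
	"sync"
)

// agent is a minimal stand-in for *LogicalAgent.
type agent struct{ logicalDeviceID string }

// manager keeps its agents in a sync.Map keyed by logical device ID, the same
// shape as LogicalManager.logicalDeviceAgents.
type manager struct{ agents sync.Map }

// addAgent mirrors addLogicalDeviceAgentToMap(ctx, agent): ctx is threaded
// through even though the map operation itself needs nothing from it.
func (m *manager) addAgent(ctx context.Context, a *agent) {
	if _, exist := m.agents.Load(a.logicalDeviceID); !exist {
		m.agents.Store(a.logicalDeviceID, a)
	}
}

// deleteAgent mirrors deleteLogicalDeviceAgent(ctx, id).
func (m *manager) deleteAgent(ctx context.Context, logicalDeviceID string) {
	m.agents.Delete(logicalDeviceID)
}

func main() {
	ctx := context.Background()
	m := &manager{}
	m.addAgent(ctx, &agent{logicalDeviceID: "ld-1"})
	_, ok := m.agents.Load("ld-1")
	fmt.Println("stored:", ok)
	m.deleteAgent(ctx, "ld-1")
	_, ok = m.agents.Load("ld-1")
	fmt.Println("still there:", ok)
}
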
diff --git a/rw_core/core/device/manager.go b/rw_core/core/device/manager.go
index 0c4448b..3f88094 100755
--- a/rw_core/core/device/manager.go
+++ b/rw_core/core/device/manager.go
@@ -57,21 +57,21 @@
deviceLoadingInProgress map[string][]chan int
}
-func NewManagers(proxy *model.Proxy, adapterMgr *adapter.Manager, kmp kafka.InterContainerProxy, endpointMgr kafka.EndpointManager, corePairTopic, coreInstanceID string, defaultCoreTimeout time.Duration) (*Manager, *LogicalManager) {
+func NewManagers(ctx context.Context, proxy *model.Proxy, adapterMgr *adapter.Manager, kmp kafka.InterContainerProxy, endpointMgr kafka.EndpointManager, corePairTopic, coreInstanceID string, defaultCoreTimeout time.Duration) (*Manager, *LogicalManager) {
deviceMgr := &Manager{
rootDevices: make(map[string]bool),
kafkaICProxy: kmp,
- adapterProxy: remote.NewAdapterProxy(kmp, corePairTopic, endpointMgr),
+ adapterProxy: remote.NewAdapterProxy(ctx, kmp, corePairTopic, endpointMgr),
coreInstanceID: coreInstanceID,
clusterDataProxy: proxy,
adapterMgr: adapterMgr,
defaultTimeout: defaultCoreTimeout * time.Millisecond,
deviceLoadingInProgress: make(map[string][]chan int),
}
- deviceMgr.stateTransitions = NewTransitionMap(deviceMgr)
+ deviceMgr.stateTransitions = NewTransitionMap(ctx, deviceMgr)
logicalDeviceMgr := &LogicalManager{
- Manager: event.NewManager(),
+ Manager: event.NewManager(ctx),
deviceMgr: deviceMgr,
kafkaICProxy: kmp,
clusterDataProxy: proxy,
@@ -80,12 +80,12 @@
}
deviceMgr.logicalDeviceMgr = logicalDeviceMgr
- adapterMgr.SetAdapterRestartedCallback(deviceMgr.adapterRestarted)
+ adapterMgr.SetAdapterRestartedCallback(ctx, deviceMgr.adapterRestarted)
return deviceMgr, logicalDeviceMgr
}
-func (dMgr *Manager) addDeviceAgentToMap(agent *Agent) {
+func (dMgr *Manager) addDeviceAgentToMap(ctx context.Context, agent *Agent) {
if _, exist := dMgr.deviceAgents.Load(agent.deviceID); !exist {
dMgr.deviceAgents.Store(agent.deviceID, agent)
}
@@ -95,7 +95,7 @@
}
-func (dMgr *Manager) deleteDeviceAgentFromMap(agent *Agent) {
+func (dMgr *Manager) deleteDeviceAgentFromMap(ctx context.Context, agent *Agent) {
dMgr.deviceAgents.Delete(agent.deviceID)
dMgr.lockRootDeviceMap.Lock()
defer dMgr.lockRootDeviceMap.Unlock()
@@ -123,7 +123,7 @@
}
// listDeviceIdsFromMap returns the list of device IDs that are in memory
-func (dMgr *Manager) listDeviceIdsFromMap() *voltha.IDs {
+func (dMgr *Manager) listDeviceIdsFromMap(ctx context.Context) *voltha.IDs {
result := &voltha.IDs{Items: make([]*voltha.ID, 0)}
dMgr.deviceAgents.Range(func(key, value interface{}) bool {
@@ -137,7 +137,6 @@
// CreateDevice creates a new parent device in the data model
func (dMgr *Manager) CreateDevice(ctx context.Context, device *voltha.Device) (*voltha.Device, error) {
if device.MacAddress == "" && device.GetHostAndPort() == "" {
- logger.Errorf("No Device Info Present")
return &voltha.Device{}, errors.New("no-device-info-present; MAC or HOSTIP&PORT")
}
logger.Debugw("create-device", log.Fields{"device": *device})
@@ -156,13 +155,13 @@
// Ensure this device is set as root
device.Root = true
// Create and start a device agent for that device
- agent := newAgent(dMgr.adapterProxy, device, dMgr, dMgr.clusterDataProxy, dMgr.defaultTimeout)
+ agent := newAgent(ctx, dMgr.adapterProxy, device, dMgr, dMgr.clusterDataProxy, dMgr.defaultTimeout)
device, err = agent.start(ctx, device)
if err != nil {
logger.Errorw("Fail-to-start-device", log.Fields{"device-id": agent.deviceID, "error": err})
return nil, err
}
- dMgr.addDeviceAgentToMap(agent)
+ dMgr.addDeviceAgentToMap(ctx, agent)
return device, nil
}
@@ -242,8 +241,8 @@
// deletion also includes removal of any reference to this device.
func (dMgr *Manager) stopManagingDevice(ctx context.Context, id string) {
logger.Infow("stopManagingDevice", log.Fields{"deviceId": id})
- if dMgr.IsDeviceInCache(id) { // Proceed only if an agent is present for this device
- if root, _ := dMgr.IsRootDevice(id); root {
+ if dMgr.IsDeviceInCache(ctx, id) { // Proceed only if an agent is present for this device
+ if root, _ := dMgr.IsRootDevice(ctx, id); root {
// stop managing the logical device
_ = dMgr.logicalDeviceMgr.stopManagingLogicalDeviceWithDeviceID(ctx, id)
}
@@ -251,7 +250,7 @@
if err := agent.stop(ctx); err != nil {
logger.Warnw("unable-to-stop-device-agent", log.Fields{"device-id": agent.deviceID, "error": err})
}
- dMgr.deleteDeviceAgentFromMap(agent)
+ dMgr.deleteDeviceAgentFromMap(ctx, agent)
}
}
}
@@ -287,7 +286,7 @@
return nil, status.Errorf(codes.Aborted, "%s", err.Error())
}
var childDeviceIds []string
- if childDeviceIds, err = dMgr.getAllChildDeviceIds(parentDevice); err != nil {
+ if childDeviceIds, err = dMgr.getAllChildDeviceIds(ctx, parentDevice); err != nil {
return nil, status.Errorf(codes.Aborted, "%s", err.Error())
}
if len(childDeviceIds) == 0 {
@@ -349,7 +348,7 @@
return nil, status.Errorf(codes.Aborted, "%s", err.Error())
}
var childDeviceIds []string
- if childDeviceIds, err = dMgr.getAllChildDeviceIds(parentDevice); err != nil {
+ if childDeviceIds, err = dMgr.getAllChildDeviceIds(ctx, parentDevice); err != nil {
return nil, status.Errorf(codes.Aborted, "%s", err.Error())
}
if len(childDeviceIds) == 0 {
@@ -377,13 +376,13 @@
}
// IsDeviceInCache returns true if device is found in the map
-func (dMgr *Manager) IsDeviceInCache(id string) bool {
+func (dMgr *Manager) IsDeviceInCache(ctx context.Context, id string) bool {
_, exist := dMgr.deviceAgents.Load(id)
return exist
}
// IsRootDevice returns true if root device is found in the map
-func (dMgr *Manager) IsRootDevice(id string) (bool, error) {
+func (dMgr *Manager) IsRootDevice(ctx context.Context, id string) (bool, error) {
dMgr.lockRootDeviceMap.RLock()
defer dMgr.lockRootDeviceMap.RUnlock()
if exist := dMgr.rootDevices[id]; exist {
@@ -405,13 +404,13 @@
for _, device := range devices {
// If device is not in memory then set it up
- if !dMgr.IsDeviceInCache(device.Id) {
+ if !dMgr.IsDeviceInCache(ctx, device.Id) {
logger.Debugw("loading-device-from-Model", log.Fields{"id": device.Id})
- agent := newAgent(dMgr.adapterProxy, device, dMgr, dMgr.clusterDataProxy, dMgr.defaultTimeout)
+ agent := newAgent(ctx, dMgr.adapterProxy, device, dMgr, dMgr.clusterDataProxy, dMgr.defaultTimeout)
if _, err := agent.start(ctx, nil); err != nil {
logger.Warnw("failure-starting-agent", log.Fields{"deviceId": device.Id})
} else {
- dMgr.addDeviceAgentToMap(agent)
+ dMgr.addDeviceAgentToMap(ctx, agent)
}
}
result.Items = append(result.Items, device)
@@ -464,17 +463,17 @@
var device *voltha.Device
dMgr.devicesLoadingLock.Lock()
if _, exist := dMgr.deviceLoadingInProgress[deviceID]; !exist {
- if !dMgr.IsDeviceInCache(deviceID) {
+ if !dMgr.IsDeviceInCache(ctx, deviceID) {
dMgr.deviceLoadingInProgress[deviceID] = []chan int{make(chan int, 1)}
dMgr.devicesLoadingLock.Unlock()
// Proceed with the loading only if the device exist in the Model (could have been deleted)
if device, err = dMgr.getDeviceFromModel(ctx, deviceID); err == nil {
logger.Debugw("loading-device", log.Fields{"deviceId": deviceID})
- agent := newAgent(dMgr.adapterProxy, device, dMgr, dMgr.clusterDataProxy, dMgr.defaultTimeout)
+ agent := newAgent(ctx, dMgr.adapterProxy, device, dMgr, dMgr.clusterDataProxy, dMgr.defaultTimeout)
if _, err = agent.start(ctx, nil); err != nil {
logger.Warnw("Failure loading device", log.Fields{"deviceId": deviceID, "error": err})
} else {
- dMgr.addDeviceAgentToMap(agent)
+ dMgr.addDeviceAgentToMap(ctx, agent)
}
} else {
logger.Debugw("Device not in model", log.Fields{"deviceId": deviceID})
@@ -518,7 +517,7 @@
logger.Debugw("no-parent-to-load", log.Fields{"deviceId": device.Id})
}
// Load all child devices, if needed
- if childDeviceIds, err := dMgr.getAllChildDeviceIds(device); err == nil {
+ if childDeviceIds, err := dMgr.getAllChildDeviceIds(ctx, device); err == nil {
for _, childDeviceID := range childDeviceIds {
if _, err := dMgr.loadDevice(ctx, childDeviceID); err != nil {
logger.Warnw("failure-loading-device", log.Fields{"deviceId": childDeviceID, "error": err})
@@ -574,10 +573,10 @@
}
// ListDeviceIds retrieves the latest device IDs information from the data model (memory data only)
-func (dMgr *Manager) ListDeviceIds(_ context.Context, _ *empty.Empty) (*voltha.IDs, error) {
+func (dMgr *Manager) ListDeviceIds(ctx context.Context, _ *empty.Empty) (*voltha.IDs, error) {
logger.Debug("ListDeviceIDs")
// Report only device IDs that are in the device agent map
- return dMgr.listDeviceIdsFromMap(), nil
+ return dMgr.listDeviceIdsFromMap(ctx), nil
}
// ReconcileDevices is a request to a voltha core to update its list of managed devices. This will
@@ -605,7 +604,7 @@
}
// isOkToReconcile validates whether a device is in the correct status to be reconciled
-func isOkToReconcile(device *voltha.Device) bool {
+func isOkToReconcile(ctx context.Context, device *voltha.Device) bool {
if device == nil {
return false
}
@@ -626,13 +625,13 @@
responses := make([]utils.Response, 0)
for rootDeviceID := range dMgr.rootDevices {
if rootDevice, _ := dMgr.getDeviceFromModel(ctx, rootDeviceID); rootDevice != nil {
- isDeviceOwnedByService, err := dMgr.adapterProxy.IsDeviceOwnedByService(rootDeviceID, adapter.Type, adapter.CurrentReplica)
+ isDeviceOwnedByService, err := dMgr.adapterProxy.IsDeviceOwnedByService(ctx, rootDeviceID, adapter.Type, adapter.CurrentReplica)
if err != nil {
logger.Warnw("is-device-owned-by-service", log.Fields{"error": err, "root-device-id": rootDeviceID, "adapterType": adapter.Type, "replica-number": adapter.CurrentReplica})
continue
}
if isDeviceOwnedByService {
- if isOkToReconcile(rootDevice) {
+ if isOkToReconcile(ctx, rootDevice) {
logger.Debugw("reconciling-root-device", log.Fields{"rootId": rootDevice.Id})
responses = append(responses, dMgr.sendReconcileDeviceRequest(ctx, rootDevice))
} else {
@@ -643,12 +642,12 @@
for _, port := range rootDevice.Ports {
for _, peer := range port.Peers {
if childDevice, _ := dMgr.getDeviceFromModel(ctx, peer.DeviceId); childDevice != nil {
- isDeviceOwnedByService, err := dMgr.adapterProxy.IsDeviceOwnedByService(childDevice.Id, adapter.Type, adapter.CurrentReplica)
+ isDeviceOwnedByService, err := dMgr.adapterProxy.IsDeviceOwnedByService(ctx, childDevice.Id, adapter.Type, adapter.CurrentReplica)
if err != nil {
logger.Warnw("is-device-owned-by-service", log.Fields{"error": err, "child-device-id": childDevice.Id, "adapterType": adapter.Type, "replica-number": adapter.CurrentReplica})
}
if isDeviceOwnedByService {
- if isOkToReconcile(childDevice) {
+ if isOkToReconcile(ctx, childDevice) {
logger.Debugw("reconciling-child-device", log.Fields{"child-device-id": childDevice.Id})
responses = append(responses, dMgr.sendReconcileDeviceRequest(ctx, childDevice))
} else {
@@ -667,7 +666,7 @@
}
if len(responses) > 0 {
// Wait for completion
- if res := utils.WaitForNilOrErrorResponses(dMgr.defaultTimeout, responses...); res != nil {
+ if res := utils.WaitForNilOrErrorResponses(ctx, dMgr.defaultTimeout, responses...); res != nil {
return status.Errorf(codes.Aborted, "errors-%s", res)
}
} else {
@@ -681,20 +680,20 @@
// point of creating a device agent (if the device is not being managed by this Core) before sending the request
// to the adapter. We will therefore bypass the device agent and send the request directly to the adapter via
// the adapter proxy.
- response := utils.NewResponse()
+ response := utils.NewResponse(ctx)
ch, err := dMgr.adapterProxy.ReconcileDevice(ctx, device)
if err != nil {
- response.Error(err)
+ response.Error(ctx, err)
}
// Wait for adapter response in its own routine
go func() {
resp, ok := <-ch
if !ok {
- response.Error(status.Errorf(codes.Aborted, "channel-closed-device: %s", device.Id))
+ response.Error(ctx, status.Errorf(codes.Aborted, "channel-closed-device: %s", device.Id))
} else if resp.Err != nil {
- response.Error(resp.Err)
+ response.Error(ctx, resp.Err)
}
- response.Done()
+ response.Done(ctx)
}()
return response
}
@@ -710,7 +709,7 @@
}
}
// Wait for completion
- if res := utils.WaitForNilOrErrorResponses(dMgr.defaultTimeout, responses...); res != nil {
+ if res := utils.WaitForNilOrErrorResponses(ctx, dMgr.defaultTimeout, responses...); res != nil {
return status.Errorf(codes.Aborted, "errors-%s", res)
}
}
@@ -869,7 +868,7 @@
return status.Errorf(codes.Aborted, "%s", err.Error())
}
var childDeviceIds []string
- if childDeviceIds, err = dMgr.getAllChildDeviceIds(parentDevice); err != nil {
+ if childDeviceIds, err = dMgr.getAllChildDeviceIds(ctx, parentDevice); err != nil {
return status.Errorf(codes.Aborted, "%s", err.Error())
}
if len(childDeviceIds) == 0 {
@@ -1024,13 +1023,13 @@
childDevice.ProxyAddress = &voltha.Device_ProxyAddress{DeviceId: parentDeviceID, DeviceType: pAgent.deviceType, ChannelId: uint32(channelID), OnuId: uint32(onuID)}
// Create and start a device agent for that device
- agent := newAgent(dMgr.adapterProxy, childDevice, dMgr, dMgr.clusterDataProxy, dMgr.defaultTimeout)
+ agent := newAgent(ctx, dMgr.adapterProxy, childDevice, dMgr, dMgr.clusterDataProxy, dMgr.defaultTimeout)
childDevice, err := agent.start(ctx, childDevice)
if err != nil {
logger.Errorw("error-starting-child-device", log.Fields{"parent-device-id": childDevice.ParentId, "child-device-id": agent.deviceID, "error": err})
return nil, err
}
- dMgr.addDeviceAgentToMap(agent)
+ dMgr.addDeviceAgentToMap(ctx, agent)
// Activate the child device
if agent = dMgr.getDeviceAgent(ctx, agent.deviceID); agent != nil {
@@ -1044,7 +1043,7 @@
// Publish on the messaging bus that we have discovered new devices
go func() {
- err := dMgr.kafkaICProxy.DeviceDiscovered(agent.deviceID, deviceType, parentDeviceID, dMgr.coreInstanceID)
+ err := dMgr.kafkaICProxy.DeviceDiscovered(ctx, agent.deviceID, deviceType, parentDeviceID, dMgr.coreInstanceID)
if err != nil {
logger.Errorw("unable-to-discover-the-device", log.Fields{"error": err})
}
@@ -1064,16 +1063,16 @@
"curr-oper-state": device.OperStatus,
"curr-conn-state": device.ConnectStatus,
})
- handlers := dMgr.stateTransitions.GetTransitionHandler(device, previousState)
+ handlers := dMgr.stateTransitions.GetTransitionHandler(ctx, device, previousState)
if handlers == nil {
logger.Debugw("no-op-transition", log.Fields{"deviceId": device.Id})
return nil
}
logger.Debugw("handler-found", log.Fields{"num-expectedHandlers": len(handlers), "isParent": device.Root, "current-data": device, "previous-state": previousState})
for _, handler := range handlers {
- logger.Debugw("running-handler", log.Fields{"handler": funcName(handler)})
+ logger.Debugw("running-handler", log.Fields{"handler": funcName(ctx, handler)})
if err := handler(ctx, device); err != nil {
- logger.Warnw("handler-failed", log.Fields{"handler": funcName(handler), "error": err})
+ logger.Warnw("handler-failed", log.Fields{"handler": funcName(ctx, handler), "error": err})
return err
}
}
@@ -1210,7 +1209,7 @@
return err
}
- if childDeviceIds, err = dMgr.getAllChildDeviceIds(parentDevice); err != nil {
+ if childDeviceIds, err = dMgr.getAllChildDeviceIds(ctx, parentDevice); err != nil {
return status.Errorf(codes.NotFound, "%s", parentDevice.Id)
}
if len(childDeviceIds) == 0 {
@@ -1248,7 +1247,7 @@
logger.Debug("DisableAllChildDevices")
var childDeviceIds []string
var err error
- if childDeviceIds, err = dMgr.getAllChildDeviceIds(parentCurrDevice); err != nil {
+ if childDeviceIds, err = dMgr.getAllChildDeviceIds(ctx, parentCurrDevice); err != nil {
return status.Errorf(codes.NotFound, "%s", parentCurrDevice.Id)
}
if len(childDeviceIds) == 0 {
@@ -1270,7 +1269,7 @@
logger.Debug("DeleteAllChildDevices")
var childDeviceIds []string
var err error
- if childDeviceIds, err = dMgr.getAllChildDeviceIds(parentCurrDevice); err != nil {
+ if childDeviceIds, err = dMgr.getAllChildDeviceIds(ctx, parentCurrDevice); err != nil {
return status.Errorf(codes.NotFound, "%s", parentCurrDevice.Id)
}
if len(childDeviceIds) == 0 {
@@ -1312,7 +1311,7 @@
}
//getAllChildDeviceIds is a helper method to get all the child device IDs from the device passed as parameter
-func (dMgr *Manager) getAllChildDeviceIds(parentDevice *voltha.Device) ([]string, error) {
+func (dMgr *Manager) getAllChildDeviceIds(ctx context.Context, parentDevice *voltha.Device) ([]string, error) {
logger.Debugw("getAllChildDeviceIds", log.Fields{"parentDeviceId": parentDevice.Id})
childDeviceIds := make([]string, 0)
if parentDevice != nil {
@@ -1331,7 +1330,7 @@
logger.Debugw("GetAllChildDevices", log.Fields{"parentDeviceId": parentDeviceID})
if parentDevice, err := dMgr.getDevice(ctx, parentDeviceID); err == nil {
childDevices := make([]*voltha.Device, 0)
- if childDeviceIds, er := dMgr.getAllChildDeviceIds(parentDevice); er == nil {
+ if childDeviceIds, er := dMgr.getAllChildDeviceIds(ctx, parentDevice); er == nil {
for _, deviceID := range childDeviceIds {
if d, e := dMgr.getDevice(ctx, deviceID); e == nil && d != nil {
childDevices = append(childDevices, d)
@@ -1491,7 +1490,7 @@
return nil
}
-func funcName(f interface{}) string {
+func funcName(ctx context.Context, f interface{}) string {
p := reflect.ValueOf(f).Pointer()
rf := runtime.FuncForPC(p)
return rf.Name()
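
sendReconcileDeviceRequest and the reconcile paths above fan work out to goroutines, collect one utils.Response per device, and then block in WaitForNilOrErrorResponses, which now also receives the context. A hedged, minimal sketch of that fan-out/fan-in shape; the Response type below is a stand-in, not the rw_core utils implementation:

package main

import (
	"context"
	"fmt"
	"time"
)

// response is a minimal stand-in for the utils.Response used above: Error
// records a failure, Done signals completion.
type response struct {
	ch chan error
}

func newResponse(ctx context.Context) response {
	return response{ch: make(chan error, 1)}
}

func (r response) Error(ctx context.Context, err error) {
	r.ch <- err
}

func (r response) Done(ctx context.Context) {
	close(r.ch)
}

// waitForNilOrErrorResponses mirrors the shape of
// utils.WaitForNilOrErrorResponses(ctx, timeout, responses...): gather every
// outcome, giving up on a response after the timeout or when ctx is cancelled.
func waitForNilOrErrorResponses(ctx context.Context, timeout time.Duration, responses ...response) []error {
	var errs []error
	for _, r := range responses {
		select {
		case err, ok := <-r.ch:
			if ok && err != nil {
				errs = append(errs, err)
			}
		case <-time.After(timeout):
			errs = append(errs, fmt.Errorf("timeout"))
		case <-ctx.Done():
			errs = append(errs, ctx.Err())
		}
	}
	return errs
}

func main() {
	ctx := context.Background()
	resp := newResponse(ctx)
	go func(r response) {
		time.Sleep(10 * time.Millisecond) // simulate per-device work
		r.Done(ctx)
	}(resp)

	if errs := waitForNilOrErrorResponses(ctx, time.Second, resp); errs != nil {
		fmt.Println("errors:", errs)
		return
	}
	fmt.Println("all responses completed")
}

Threading ctx into the wait is what lets the whole fan-in unblock early when the originating request is cancelled, rather than waiting out the per-response timeout.
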
diff --git a/rw_core/core/device/remote/adapter_proxy.go b/rw_core/core/device/remote/adapter_proxy.go
index f4579ef..eab5aa7 100755
--- a/rw_core/core/device/remote/adapter_proxy.go
+++ b/rw_core/core/device/remote/adapter_proxy.go
@@ -34,7 +34,7 @@
}
// NewAdapterProxy will return adapter proxy instance
-func NewAdapterProxy(kafkaProxy kafka.InterContainerProxy, corePairTopic string, endpointManager kafka.EndpointManager) *AdapterProxy {
+func NewAdapterProxy(ctx context.Context, kafkaProxy kafka.InterContainerProxy, corePairTopic string, endpointManager kafka.EndpointManager) *AdapterProxy {
return &AdapterProxy{
EndpointManager: endpointManager,
kafkaICProxy: kafkaProxy,
@@ -43,13 +43,12 @@
}
}
-func (ap *AdapterProxy) getCoreTopic() kafka.Topic {
+func (ap *AdapterProxy) getCoreTopic(ctx context.Context) kafka.Topic {
return kafka.Topic{Name: ap.corePairTopic}
}
-func (ap *AdapterProxy) getAdapterTopic(deviceID string, adapterType string) (*kafka.Topic, error) {
-
- endpoint, err := ap.GetEndpoint(deviceID, adapterType)
+func (ap *AdapterProxy) getAdapterTopic(ctx context.Context, deviceID string, adapterType string) (*kafka.Topic, error) {
+ endpoint, err := ap.GetEndpoint(ctx, deviceID, adapterType)
if err != nil {
return nil, err
}
@@ -59,7 +58,6 @@
func (ap *AdapterProxy) sendRPC(ctx context.Context, rpc string, toTopic *kafka.Topic, replyToTopic *kafka.Topic,
waitForResponse bool, deviceID string, kvArgs ...*kafka.KVArg) (chan *kafka.RpcResponse, error) {
-
// Sent the request to kafka
respChnl := ap.kafkaICProxy.InvokeAsyncRPC(ctx, rpc, toTopic, replyToTopic, waitForResponse, deviceID, kvArgs...)
@@ -77,14 +75,14 @@
func (ap *AdapterProxy) AdoptDevice(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
logger.Debugw("AdoptDevice", log.Fields{"device-id": device.Id})
rpc := "adopt_device"
- toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+ toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
if err != nil {
return nil, err
}
args := []*kafka.KVArg{
{Key: "device", Value: device},
}
- replyToTopic := ap.getCoreTopic()
+ replyToTopic := ap.getCoreTopic(ctx)
ap.deviceTopicRegistered = true
logger.Debugw("adoptDevice-send-request", log.Fields{"device-id": device.Id, "deviceType": device.Type, "serialNumber": device.SerialNumber})
return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
@@ -94,14 +92,14 @@
func (ap *AdapterProxy) DisableDevice(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
logger.Debugw("DisableDevice", log.Fields{"device-id": device.Id})
rpc := "disable_device"
- toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+ toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
if err != nil {
return nil, err
}
args := []*kafka.KVArg{
{Key: "device", Value: device},
}
- replyToTopic := ap.getCoreTopic()
+ replyToTopic := ap.getCoreTopic(ctx)
return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
}
@@ -109,14 +107,14 @@
func (ap *AdapterProxy) ReEnableDevice(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
logger.Debugw("ReEnableDevice", log.Fields{"device-id": device.Id})
rpc := "reenable_device"
- toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+ toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
if err != nil {
return nil, err
}
args := []*kafka.KVArg{
{Key: "device", Value: device},
}
- replyToTopic := ap.getCoreTopic()
+ replyToTopic := ap.getCoreTopic(ctx)
return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
}
@@ -124,14 +122,14 @@
func (ap *AdapterProxy) RebootDevice(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
logger.Debugw("RebootDevice", log.Fields{"device-id": device.Id})
rpc := "reboot_device"
- toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+ toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
if err != nil {
return nil, err
}
args := []*kafka.KVArg{
{Key: "device", Value: device},
}
- replyToTopic := ap.getCoreTopic()
+ replyToTopic := ap.getCoreTopic(ctx)
return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
}
@@ -139,14 +137,14 @@
func (ap *AdapterProxy) DeleteDevice(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
logger.Debugw("DeleteDevice", log.Fields{"device-id": device.Id})
rpc := "delete_device"
- toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+ toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
if err != nil {
return nil, err
}
args := []*kafka.KVArg{
{Key: "device", Value: device},
}
- replyToTopic := ap.getCoreTopic()
+ replyToTopic := ap.getCoreTopic(ctx)
return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
}
@@ -154,21 +152,21 @@
func (ap *AdapterProxy) GetOfpDeviceInfo(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
logger.Debugw("GetOfpDeviceInfo", log.Fields{"device-id": device.Id})
rpc := "get_ofp_device_info"
- toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+ toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
if err != nil {
return nil, err
}
args := []*kafka.KVArg{
{Key: "device", Value: device},
}
- replyToTopic := ap.getCoreTopic()
+ replyToTopic := ap.getCoreTopic(ctx)
return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
}
// GetOfpPortInfo invokes get ofp port info rpc
func (ap *AdapterProxy) GetOfpPortInfo(ctx context.Context, device *voltha.Device, portNo uint32) (chan *kafka.RpcResponse, error) {
logger.Debugw("GetOfpPortInfo", log.Fields{"device-id": device.Id, "port-no": portNo})
- toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+ toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
if err != nil {
return nil, err
}
@@ -176,7 +174,7 @@
{Key: "device", Value: device},
{Key: "port_no", Value: &ic.IntType{Val: int64(portNo)}},
}
- replyToTopic := ap.getCoreTopic()
+ replyToTopic := ap.getCoreTopic(ctx)
return ap.sendRPC(ctx, "get_ofp_port_info", toTopic, &replyToTopic, true, device.Id, args...)
}
@@ -184,14 +182,14 @@
func (ap *AdapterProxy) ReconcileDevice(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
logger.Debugw("ReconcileDevice", log.Fields{"device-id": device.Id})
rpc := "reconcile_device"
- toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+ toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
if err != nil {
return nil, err
}
args := []*kafka.KVArg{
{Key: "device", Value: device},
}
- replyToTopic := ap.getCoreTopic()
+ replyToTopic := ap.getCoreTopic(ctx)
return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
}
@@ -199,7 +197,7 @@
func (ap *AdapterProxy) DownloadImage(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) (chan *kafka.RpcResponse, error) {
logger.Debugw("DownloadImage", log.Fields{"device-id": device.Id, "image": download.Name})
rpc := "download_image"
- toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+ toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
if err != nil {
return nil, err
}
@@ -207,7 +205,7 @@
{Key: "device", Value: device},
{Key: "request", Value: download},
}
- replyToTopic := ap.getCoreTopic()
+ replyToTopic := ap.getCoreTopic(ctx)
return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
}
@@ -215,7 +213,7 @@
func (ap *AdapterProxy) GetImageDownloadStatus(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) (chan *kafka.RpcResponse, error) {
logger.Debugw("GetImageDownloadStatus", log.Fields{"device-id": device.Id, "image": download.Name})
rpc := "get_image_download_status"
- toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+ toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
if err != nil {
return nil, err
}
@@ -223,7 +221,7 @@
{Key: "device", Value: device},
{Key: "request", Value: download},
}
- replyToTopic := ap.getCoreTopic()
+ replyToTopic := ap.getCoreTopic(ctx)
return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
}
@@ -231,7 +229,7 @@
func (ap *AdapterProxy) CancelImageDownload(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) (chan *kafka.RpcResponse, error) {
logger.Debugw("CancelImageDownload", log.Fields{"device-id": device.Id, "image": download.Name})
rpc := "cancel_image_download"
- toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+ toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
if err != nil {
return nil, err
}
@@ -239,7 +237,7 @@
{Key: "device", Value: device},
{Key: "request", Value: download},
}
- replyToTopic := ap.getCoreTopic()
+ replyToTopic := ap.getCoreTopic(ctx)
return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
}
@@ -247,7 +245,7 @@
func (ap *AdapterProxy) ActivateImageUpdate(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) (chan *kafka.RpcResponse, error) {
logger.Debugw("ActivateImageUpdate", log.Fields{"device-id": device.Id, "image": download.Name})
rpc := "activate_image_update"
- toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+ toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
if err != nil {
return nil, err
}
@@ -255,7 +253,7 @@
{Key: "device", Value: device},
{Key: "request", Value: download},
}
- replyToTopic := ap.getCoreTopic()
+ replyToTopic := ap.getCoreTopic(ctx)
return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
}
@@ -263,7 +261,7 @@
func (ap *AdapterProxy) RevertImageUpdate(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) (chan *kafka.RpcResponse, error) {
logger.Debugw("RevertImageUpdate", log.Fields{"device-id": device.Id, "image": download.Name})
rpc := "revert_image_update"
- toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+ toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
if err != nil {
return nil, err
}
@@ -271,13 +269,13 @@
{Key: "device", Value: device},
{Key: "request", Value: download},
}
- replyToTopic := ap.getCoreTopic()
+ replyToTopic := ap.getCoreTopic(ctx)
return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
}
func (ap *AdapterProxy) PacketOut(ctx context.Context, deviceType string, deviceID string, outPort uint32, packet *openflow_13.OfpPacketOut) (chan *kafka.RpcResponse, error) {
logger.Debugw("PacketOut", log.Fields{"device-id": deviceID, "device-type": deviceType, "out-port": outPort})
- toTopic, err := ap.getAdapterTopic(deviceID, deviceType)
+ toTopic, err := ap.getAdapterTopic(ctx, deviceID, deviceType)
if err != nil {
return nil, err
}
@@ -287,14 +285,14 @@
{Key: "outPort", Value: &ic.IntType{Val: int64(outPort)}},
{Key: "packet", Value: packet},
}
- replyToTopic := ap.getCoreTopic()
+ replyToTopic := ap.getCoreTopic(ctx)
return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, deviceID, args...)
}
// UpdateFlowsBulk invokes update flows bulk rpc
func (ap *AdapterProxy) UpdateFlowsBulk(ctx context.Context, device *voltha.Device, flows *voltha.Flows, groups *voltha.FlowGroups, flowMetadata *voltha.FlowMetadata) (chan *kafka.RpcResponse, error) {
logger.Debugw("UpdateFlowsBulk", log.Fields{"device-id": device.Id, "flow-count": len(flows.Items), "group-count": len(groups.Items), "flow-metadata": flowMetadata})
- toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+ toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
if err != nil {
return nil, err
}
@@ -305,8 +303,8 @@
{Key: "groups", Value: groups},
{Key: "flow_metadata", Value: flowMetadata},
}
- replyToTopic := ap.getCoreTopic()
- return ap.sendRPC(context.TODO(), rpc, toTopic, &replyToTopic, true, device.Id, args...)
+ replyToTopic := ap.getCoreTopic(ctx)
+ return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
}
// UpdateFlowsIncremental invokes update flows incremental rpc
@@ -320,7 +318,7 @@
"group-to-delete-count": len(groupChanges.ToRemove.Items),
"group-to-update-count": len(groupChanges.ToUpdate.Items),
})
- toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+ toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
if err != nil {
return nil, err
}
@@ -331,14 +329,14 @@
{Key: "group_changes", Value: groupChanges},
{Key: "flow_metadata", Value: flowMetadata},
}
- replyToTopic := ap.getCoreTopic()
- return ap.sendRPC(context.TODO(), rpc, toTopic, &replyToTopic, true, device.Id, args...)
+ replyToTopic := ap.getCoreTopic(ctx)
+ return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
}
// UpdatePmConfigs invokes update pm configs rpc
func (ap *AdapterProxy) UpdatePmConfigs(ctx context.Context, device *voltha.Device, pmConfigs *voltha.PmConfigs) (chan *kafka.RpcResponse, error) {
logger.Debugw("UpdatePmConfigs", log.Fields{"device-id": device.Id, "pm-configs-id": pmConfigs.Id})
- toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+ toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
if err != nil {
return nil, err
}
@@ -347,7 +345,7 @@
{Key: "device", Value: device},
{Key: "pm_configs", Value: pmConfigs},
}
- replyToTopic := ap.getCoreTopic()
+ replyToTopic := ap.getCoreTopic(ctx)
return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
}
@@ -355,7 +353,7 @@
func (ap *AdapterProxy) SimulateAlarm(ctx context.Context, device *voltha.Device, simulateReq *voltha.SimulateAlarmRequest) (chan *kafka.RpcResponse, error) {
logger.Debugw("SimulateAlarm", log.Fields{"device-id": device.Id, "simulate-req-id": simulateReq.Id})
rpc := "simulate_alarm"
- toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+ toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
if err != nil {
return nil, err
}
@@ -363,7 +361,7 @@
{Key: "device", Value: device},
{Key: "request", Value: simulateReq},
}
- replyToTopic := ap.getCoreTopic()
+ replyToTopic := ap.getCoreTopic(ctx)
ap.deviceTopicRegistered = true
return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
}
@@ -371,7 +369,7 @@
func (ap *AdapterProxy) DisablePort(ctx context.Context, device *voltha.Device, port *voltha.Port) (chan *kafka.RpcResponse, error) {
logger.Debugw("DisablePort", log.Fields{"device-id": device.Id, "port-no": port.PortNo})
rpc := "disable_port"
- toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+ toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
if err != nil {
return nil, err
}
@@ -379,14 +377,14 @@
{Key: "deviceId", Value: &ic.StrType{Val: device.Id}},
{Key: "port", Value: port},
}
- replyToTopic := ap.getCoreTopic()
+ replyToTopic := ap.getCoreTopic(ctx)
return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
}
func (ap *AdapterProxy) EnablePort(ctx context.Context, device *voltha.Device, port *voltha.Port) (chan *kafka.RpcResponse, error) {
logger.Debugw("EnablePort", log.Fields{"device-id": device.Id, "port-no": port.PortNo})
rpc := "enable_port"
- toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+ toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
if err != nil {
return nil, err
}
@@ -394,7 +392,7 @@
{Key: "deviceId", Value: &ic.StrType{Val: device.Id}},
{Key: "port", Value: port},
}
- replyToTopic := ap.getCoreTopic()
+ replyToTopic := ap.getCoreTopic(ctx)
return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id, args...)
}
@@ -402,7 +400,7 @@
func (ap *AdapterProxy) ChildDeviceLost(ctx context.Context, deviceType string, deviceID string, pPortNo uint32, onuID uint32) (chan *kafka.RpcResponse, error) {
logger.Debugw("ChildDeviceLost", log.Fields{"device-id": deviceID, "parent-port-no": pPortNo, "onu-id": onuID})
rpc := "child_device_lost"
- toTopic, err := ap.getAdapterTopic(deviceID, deviceType)
+ toTopic, err := ap.getAdapterTopic(ctx, deviceID, deviceType)
if err != nil {
return nil, err
}
@@ -411,19 +409,19 @@
{Key: "pPortNo", Value: &ic.IntType{Val: int64(pPortNo)}},
{Key: "onuID", Value: &ic.IntType{Val: int64(onuID)}},
}
- replyToTopic := ap.getCoreTopic()
+ replyToTopic := ap.getCoreTopic(ctx)
return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, deviceID, args...)
}
func (ap *AdapterProxy) StartOmciTest(ctx context.Context, device *voltha.Device, omcitestrequest *voltha.OmciTestRequest) (chan *kafka.RpcResponse, error) {
logger.Debugw("Omci_test_Request_adapter_proxy", log.Fields{"device": device, "omciTestRequest": omcitestrequest})
rpc := "start_omci_test"
- toTopic, err := ap.getAdapterTopic(device.Id, device.Adapter)
+ toTopic, err := ap.getAdapterTopic(ctx, device.Id, device.Adapter)
if err != nil {
return nil, err
}
// Use a device specific topic as we are the only core handling requests for this device
- replyToTopic := ap.getCoreTopic()
+ replyToTopic := ap.getCoreTopic(ctx)
// TODO: Perhaps this should have used omcitestrequest.uuid as the second argument rather
// than including the whole request, which is (deviceid, uuid)
return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, device.Id,
@@ -434,7 +432,7 @@
func (ap *AdapterProxy) GetExtValue(ctx context.Context, pdevice *voltha.Device, cdevice *voltha.Device, id string, valuetype voltha.ValueType_Type) (chan *kafka.RpcResponse, error) {
log.Debugw("GetExtValue", log.Fields{"device-id": pdevice.Id, "onuid": id})
rpc := "get_ext_value"
- toTopic, err := ap.getAdapterTopic(pdevice.Id, pdevice.Adapter)
+ toTopic, err := ap.getAdapterTopic(ctx, pdevice.Id, pdevice.Adapter)
if err != nil {
return nil, err
}
@@ -454,6 +452,6 @@
Value: &ic.IntType{Val: int64(valuetype)},
}}
- replyToTopic := ap.getCoreTopic()
+ replyToTopic := ap.getCoreTopic(ctx)
return ap.sendRPC(ctx, rpc, toTopic, &replyToTopic, true, pdevice.Id, args...)
}
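
The hunks above all apply the same mechanical change: the caller's ctx is threaded through topic lookup and into sendRPC instead of being dropped via context.TODO(), so cancellation and deadlines propagate end to end. A minimal, self-contained sketch of that shape (getTopic, sendRPC and adoptDevice are placeholder names for illustration, not the VOLTHA API):

package main

import (
	"context"
	"fmt"
	"time"
)

// getTopic stands in for getAdapterTopic/getCoreTopic: it honours an already
// cancelled or expired context before doing any work.
func getTopic(ctx context.Context, name string) (string, error) {
	if err := ctx.Err(); err != nil {
		return "", err
	}
	return "topic-" + name, nil
}

// sendRPC stands in for the inter-container RPC send: it returns early if the
// caller's deadline fires before the (simulated) request completes.
func sendRPC(ctx context.Context, rpc, toTopic, replyTo string) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-time.After(10 * time.Millisecond):
		return nil
	}
}

func adoptDevice(ctx context.Context, deviceID string) error {
	toTopic, err := getTopic(ctx, deviceID)
	if err != nil {
		return err
	}
	replyTo, err := getTopic(ctx, "core")
	if err != nil {
		return err
	}
	return sendRPC(ctx, "adopt_device", toTopic, replyTo)
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	fmt.Println(adoptDevice(ctx, "device-1")) // <nil> while the deadline has not expired
}
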
diff --git a/rw_core/core/device/remote/adapter_proxy_test.go b/rw_core/core/device/remote/adapter_proxy_test.go
index 17627dc..d3b86fa 100755
--- a/rw_core/core/device/remote/adapter_proxy_test.go
+++ b/rw_core/core/device/remote/adapter_proxy_test.go
@@ -58,35 +58,38 @@
log.SetAllLogLevel(log.WarnLevel)
var err error
+ ctx := context.Background()
// Create the KV client
- kc = mock_kafka.NewKafkaClient()
+ kc = mock_kafka.NewKafkaClient(ctx)
// Setup core inter-container proxy and core request handler
coreKafkaICProxy = kafka.NewInterContainerProxy(
- kafka.MsgClient(kc),
- kafka.DefaultTopic(&kafka.Topic{Name: coreName}))
+ ctx,
+ kafka.MsgClient(ctx, kc),
+ kafka.DefaultTopic(ctx, &kafka.Topic{Name: coreName}))
- if err = coreKafkaICProxy.Start(); err != nil {
+ if err = coreKafkaICProxy.Start(ctx); err != nil {
logger.Fatalw("Failure-starting-core-kafka-intercontainerProxy", log.Fields{"error": err})
}
- if err = coreKafkaICProxy.SubscribeWithDefaultRequestHandler(kafka.Topic{Name: coreName}, 0); err != nil {
+ if err = coreKafkaICProxy.SubscribeWithDefaultRequestHandler(ctx, kafka.Topic{Name: coreName}, 0); err != nil {
logger.Fatalw("Failure-subscribing-core-request-handler", log.Fields{"error": err})
}
// Setup adapter inter-container proxy and adapter request handler
- adapterCoreProxy := com.NewCoreProxy(nil, adapterName, coreName)
- adapter = cm.NewAdapter(adapterCoreProxy)
- adapterReqHandler = com.NewRequestHandlerProxy(coreInstanceID, adapter, adapterCoreProxy)
+ adapterCoreProxy := com.NewCoreProxy(ctx, nil, adapterName, coreName)
+ adapter = cm.NewAdapter(ctx, adapterCoreProxy)
+ adapterReqHandler = com.NewRequestHandlerProxy(ctx, coreInstanceID, adapter, adapterCoreProxy)
adapterKafkaICProxy = kafka.NewInterContainerProxy(
- kafka.MsgClient(kc),
- kafka.DefaultTopic(&kafka.Topic{Name: adapterName}),
- kafka.RequestHandlerInterface(adapterReqHandler))
+ ctx,
+ kafka.MsgClient(ctx, kc),
+ kafka.DefaultTopic(ctx, &kafka.Topic{Name: adapterName}),
+ kafka.RequestHandlerInterface(ctx, adapterReqHandler))
- if err = adapterKafkaICProxy.Start(); err != nil {
+ if err = adapterKafkaICProxy.Start(ctx); err != nil {
logger.Fatalw("Failure-starting-adapter-kafka-intercontainerProxy", log.Fields{"error": err})
}
- if err = adapterKafkaICProxy.SubscribeWithDefaultRequestHandler(kafka.Topic{Name: adapterName}, 0); err != nil {
+ if err = adapterKafkaICProxy.SubscribeWithDefaultRequestHandler(ctx, kafka.Topic{Name: adapterName}, 0); err != nil {
logger.Fatalw("Failure-subscribing-adapter-request-handler", log.Fields{"error": err})
}
}
@@ -98,7 +101,7 @@
}
func TestCreateAdapterProxy(t *testing.T) {
- ap := NewAdapterProxy(coreKafkaICProxy, coreName, mock_kafka.NewEndpointManager())
+ ap := NewAdapterProxy(context.Background(), coreKafkaICProxy, coreName, mock_kafka.NewEndpointManager(context.Background()))
assert.NotNil(t, ap)
}
@@ -119,7 +122,7 @@
func testSimpleRequests(t *testing.T) {
type simpleRequest func(context.Context, *voltha.Device) (chan *kafka.RpcResponse, error)
- ap := NewAdapterProxy(coreKafkaICProxy, coreName, mock_kafka.NewEndpointManager())
+ ap := NewAdapterProxy(context.Background(), coreKafkaICProxy, coreName, mock_kafka.NewEndpointManager(context.Background()))
simpleRequests := []simpleRequest{
ap.AdoptDevice,
ap.DisableDevice,
@@ -162,7 +165,7 @@
}
func testGetSwitchCapabilityFromAdapter(t *testing.T) {
- ap := NewAdapterProxy(coreKafkaICProxy, coreName, mock_kafka.NewEndpointManager())
+ ap := NewAdapterProxy(context.Background(), coreKafkaICProxy, coreName, mock_kafka.NewEndpointManager(context.Background()))
d := &voltha.Device{Id: "deviceId", Adapter: adapterName}
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
@@ -174,12 +177,12 @@
err = ptypes.UnmarshalAny(response, switchCap)
assert.Nil(t, err)
assert.NotNil(t, switchCap)
- expectedCap, _ := adapter.Get_ofp_device_info(d)
+ expectedCap, _ := adapter.Get_ofp_device_info(ctx, d)
assert.Equal(t, switchCap.String(), expectedCap.String())
}
func testGetPortInfoFromAdapter(t *testing.T) {
- ap := NewAdapterProxy(coreKafkaICProxy, coreName, mock_kafka.NewEndpointManager())
+ ap := NewAdapterProxy(context.Background(), coreKafkaICProxy, coreName, mock_kafka.NewEndpointManager(context.Background()))
d := &voltha.Device{Id: "deviceId", Adapter: adapterName}
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
@@ -192,12 +195,12 @@
err = ptypes.UnmarshalAny(response, portCap)
assert.Nil(t, err)
assert.NotNil(t, portCap)
- expectedPortInfo, _ := adapter.Get_ofp_port_info(d, int64(portNo))
+ expectedPortInfo, _ := adapter.Get_ofp_port_info(context.Background(), d, int64(portNo))
assert.Equal(t, portCap.String(), expectedPortInfo.String())
}
func testPacketOut(t *testing.T) {
- ap := NewAdapterProxy(coreKafkaICProxy, coreName, mock_kafka.NewEndpointManager())
+ ap := NewAdapterProxy(context.Background(), coreKafkaICProxy, coreName, mock_kafka.NewEndpointManager(context.Background()))
d := &voltha.Device{Id: "deviceId", Adapter: adapterName}
outPort := uint32(1)
packet, err := getRandomBytes(50)
@@ -211,7 +214,7 @@
}
func testFlowUpdates(t *testing.T) {
- ap := NewAdapterProxy(coreKafkaICProxy, coreName, mock_kafka.NewEndpointManager())
+ ap := NewAdapterProxy(context.Background(), coreKafkaICProxy, coreName, mock_kafka.NewEndpointManager(context.Background()))
d := &voltha.Device{Id: "deviceId", Adapter: adapterName}
_, err := ap.UpdateFlowsBulk(context.Background(), d, &voltha.Flows{}, &voltha.FlowGroups{}, &voltha.FlowMetadata{})
assert.Nil(t, err)
@@ -226,7 +229,7 @@
}
func testPmUpdates(t *testing.T) {
- ap := NewAdapterProxy(coreKafkaICProxy, coreName, mock_kafka.NewEndpointManager())
+ ap := NewAdapterProxy(context.Background(), coreKafkaICProxy, coreName, mock_kafka.NewEndpointManager(context.Background()))
d := &voltha.Device{Id: "deviceId", Adapter: adapterName}
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
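
The test updates follow one convention worth noting: setup uses context.Background(), while each RPC-style call runs under context.WithTimeout with its cancel deferred, so a hung broker cannot stall the suite. A hedged sketch of that helper shape (testWithDeadline is a hypothetical name, not part of this test file):

package remote_test

import (
	"context"
	"testing"
	"time"
)

// testWithDeadline wraps a context-taking call in a bounded context so the
// test fails fast instead of hanging if the call never returns.
func testWithDeadline(t *testing.T, call func(context.Context) error) {
	t.Helper()
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel() // release the timer even on early return
	if err := call(ctx); err != nil {
		t.Fatalf("call failed: %v", err)
	}
}
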
diff --git a/rw_core/core/device/state_transitions.go b/rw_core/core/device/state_transitions.go
index 15f4c1e..66e7d4d 100644
--- a/rw_core/core/device/state_transitions.go
+++ b/rw_core/core/device/state_transitions.go
@@ -46,18 +46,18 @@
}
// toInt returns an integer representing the matching level of the match (the larger the number the better)
-func (m *match) toInt() int {
+func (m *match) toInt(ctx context.Context) int {
return int(m.admin<<4 | m.oper<<2 | m.conn)
}
// isExactMatch returns true if match is an exact match
-func (m *match) isExactMatch() bool {
+func (m *match) isExactMatch(ctx context.Context) bool {
return m.admin == currPrevStateMatch && m.oper == currPrevStateMatch && m.conn == currPrevStateMatch
}
// isBetterMatch returns true if m is a better match than newMatch
-func (m *match) isBetterMatch(newMatch *match) bool {
- return m.toInt() > newMatch.toInt()
+func (m *match) isBetterMatch(ctx context.Context, newMatch *match) bool {
+ return m.toInt(ctx) > newMatch.toInt(ctx)
}
// deviceState has admin, operational and connection status of device
@@ -85,7 +85,7 @@
}
// NewTransitionMap creates transition map
-func NewTransitionMap(dMgr coreif.DeviceManager) *TransitionMap {
+func NewTransitionMap(ctx context.Context, dMgr coreif.DeviceManager) *TransitionMap {
var transitionMap TransitionMap
transitionMap.dMgr = dMgr
transitionMap.transitions = make([]Transition, 0)
@@ -238,12 +238,12 @@
return &transitionMap
}
-func getDeviceStates(device *voltha.Device) *deviceState {
+func getDeviceStates(ctx context.Context, device *voltha.Device) *deviceState {
return &deviceState{Admin: device.AdminState, Connection: device.ConnectStatus, Operational: device.OperStatus}
}
// getHandler matches a state transition against the given transition entry. It returns the transition's handlers (if any) along with how closely the states matched
-func getHandler(previous *deviceState, current *deviceState, transition *Transition) ([]TransitionHandler, *match) {
+func getHandler(ctx context.Context, previous *deviceState, current *deviceState, transition *Transition) ([]TransitionHandler, *match) {
m := &match{}
// Do we have an exact match?
if *previous == transition.previousState && *current == transition.currentState {
@@ -297,9 +297,9 @@
}
// GetTransitionHandler returns the transition handlers matching the device's state change
-func (tMap *TransitionMap) GetTransitionHandler(device *voltha.Device, pState *deviceState) []TransitionHandler {
+func (tMap *TransitionMap) GetTransitionHandler(ctx context.Context, device *voltha.Device, pState *deviceState) []TransitionHandler {
//1. Get the previous and current set of states
- cState := getDeviceStates(device)
+ cState := getDeviceStates(ctx, device)
// Do nothing if there is no state change
if *pState == *cState {
@@ -324,11 +324,11 @@
if aTransition.deviceType != deviceType && aTransition.deviceType != any {
continue
}
- tempHandler, m = getHandler(pState, cState, &aTransition)
+ tempHandler, m = getHandler(ctx, pState, cState, &aTransition)
if tempHandler != nil {
- if m.isExactMatch() && aTransition.deviceType == deviceType {
+ if m.isExactMatch(ctx) && aTransition.deviceType == deviceType {
return tempHandler
- } else if m.isExactMatch() || m.isBetterMatch(bestMatch) {
+ } else if m.isExactMatch(ctx) || m.isBetterMatch(ctx, bestMatch) {
currentMatch = tempHandler
bestMatch = m
}
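
For readers new to this file, the scoring that the new ctx parameter is threaded through is a simple bit-packing: the admin comparison occupies the highest bits, so an admin-state match always outranks operational or connection matches. A standalone sketch (the constant values below are assumptions for illustration, not the definitions in state_transitions.go):

package main

import "fmt"

// Assumed illustrative values; the real match constants live in
// state_transitions.go and may differ.
const (
	currWildcardMatch  = 1
	currStateOnlyMatch = 2
	currPrevStateMatch = 3
)

type match struct{ admin, oper, conn int }

// toInt packs the three comparison results so that admin dominates oper,
// which in turn dominates conn.
func (m match) toInt() int { return m.admin<<4 | m.oper<<2 | m.conn }

func (m match) isBetterMatch(other match) bool { return m.toInt() > other.toInt() }

func main() {
	exact := match{currPrevStateMatch, currPrevStateMatch, currPrevStateMatch}
	adminOnly := match{currPrevStateMatch, currWildcardMatch, currWildcardMatch}
	operConn := match{currWildcardMatch, currPrevStateMatch, currPrevStateMatch}
	fmt.Println(exact.toInt(), adminOnly.toInt(), operConn.toInt()) // 63 53 31
	fmt.Println(adminOnly.isBetterMatch(operConn))                  // true: admin match wins
}
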
diff --git a/rw_core/core/device/state_transitions_test.go b/rw_core/core/device/state_transitions_test.go
index 41d77dd..1f86e17 100644
--- a/rw_core/core/device/state_transitions_test.go
+++ b/rw_core/core/device/state_transitions_test.go
@@ -16,14 +16,14 @@
package device
import (
+ "context"
"fmt"
- "reflect"
- "testing"
-
"github.com/opencord/voltha-go/rw_core/coreif"
"github.com/opencord/voltha-go/rw_core/mocks"
"github.com/opencord/voltha-protos/v3/go/voltha"
"github.com/stretchr/testify/assert"
+ "reflect"
+ "testing"
)
var transitionMap *TransitionMap
@@ -39,7 +39,7 @@
func init() {
tdm = newTestDeviceManager()
- transitionMap = NewTransitionMap(tdm)
+ transitionMap = NewTransitionMap(context.Background(), tdm)
}
func getDevice(root bool, admin voltha.AdminState_Types, conn voltha.ConnectStatus_Types, oper voltha.OperStatus_Types) *voltha.Device {
@@ -61,122 +61,122 @@
}
func assertInvalidTransition(t *testing.T, device *voltha.Device, previousState *deviceState) {
- handlers := transitionMap.GetTransitionHandler(device, previousState)
+ handlers := transitionMap.GetTransitionHandler(context.Background(), device, previousState)
assert.Equal(t, 1, len(handlers))
assert.True(t, reflect.ValueOf(tdm.NotifyInvalidTransition).Pointer() == reflect.ValueOf(handlers[0]).Pointer())
}
func assertNoOpTransition(t *testing.T, device *voltha.Device, previousState *deviceState) {
- handlers := transitionMap.GetTransitionHandler(device, previousState)
+ handlers := transitionMap.GetTransitionHandler(context.Background(), device, previousState)
assert.Equal(t, 0, len(handlers))
}
func TestValidTransitions(t *testing.T) {
+ ctx := context.Background()
previousState := getDeviceState(voltha.AdminState_ENABLED, voltha.ConnectStatus_UNKNOWN, voltha.OperStatus_ACTIVATING)
device := getDevice(true, voltha.AdminState_ENABLED, voltha.ConnectStatus_UNKNOWN, voltha.OperStatus_ACTIVE)
- handlers := transitionMap.GetTransitionHandler(device, previousState)
+ handlers := transitionMap.GetTransitionHandler(ctx, device, previousState)
assert.Equal(t, 1, len(handlers))
assert.True(t, reflect.ValueOf(tdm.CreateLogicalDevice).Pointer() == reflect.ValueOf(handlers[0]).Pointer())
previousState = getDeviceState(voltha.AdminState_ENABLED, voltha.ConnectStatus_UNKNOWN, voltha.OperStatus_ACTIVATING)
device = getDevice(true, voltha.AdminState_ENABLED, voltha.ConnectStatus_REACHABLE, voltha.OperStatus_ACTIVE)
- handlers = transitionMap.GetTransitionHandler(device, previousState)
+ handlers = transitionMap.GetTransitionHandler(ctx, device, previousState)
assert.Equal(t, 1, len(handlers))
assert.True(t, reflect.ValueOf(tdm.CreateLogicalDevice).Pointer() == reflect.ValueOf(handlers[0]).Pointer())
previousState = getDeviceState(voltha.AdminState_ENABLED, voltha.ConnectStatus_UNKNOWN, voltha.OperStatus_ACTIVATING)
device = getDevice(true, voltha.AdminState_ENABLED, voltha.ConnectStatus_UNREACHABLE, voltha.OperStatus_ACTIVE)
- handlers = transitionMap.GetTransitionHandler(device, previousState)
+ handlers = transitionMap.GetTransitionHandler(ctx, device, previousState)
assert.Equal(t, 1, len(handlers))
assert.True(t, reflect.ValueOf(tdm.CreateLogicalDevice).Pointer() == reflect.ValueOf(handlers[0]).Pointer())
previousState = getDeviceState(voltha.AdminState_ENABLED, voltha.ConnectStatus_UNREACHABLE, voltha.OperStatus_ACTIVATING)
device = getDevice(true, voltha.AdminState_ENABLED, voltha.ConnectStatus_UNREACHABLE, voltha.OperStatus_ACTIVE)
- handlers = transitionMap.GetTransitionHandler(device, previousState)
+ handlers = transitionMap.GetTransitionHandler(ctx, device, previousState)
assert.Equal(t, 1, len(handlers))
assert.True(t, reflect.ValueOf(tdm.CreateLogicalDevice).Pointer() == reflect.ValueOf(handlers[0]).Pointer())
previousState = getDeviceState(voltha.AdminState_ENABLED, voltha.ConnectStatus_UNREACHABLE, voltha.OperStatus_ACTIVATING)
device = getDevice(true, voltha.AdminState_ENABLED, voltha.ConnectStatus_UNKNOWN, voltha.OperStatus_ACTIVE)
- handlers = transitionMap.GetTransitionHandler(device, previousState)
+ handlers = transitionMap.GetTransitionHandler(ctx, device, previousState)
assert.Equal(t, 1, len(handlers))
assert.True(t, reflect.ValueOf(tdm.CreateLogicalDevice).Pointer() == reflect.ValueOf(handlers[0]).Pointer())
-
previousState = getDeviceState(voltha.AdminState_ENABLED, voltha.ConnectStatus_UNKNOWN, voltha.OperStatus_DISCOVERED)
device = getDevice(false, voltha.AdminState_ENABLED, voltha.ConnectStatus_UNKNOWN, voltha.OperStatus_ACTIVE)
- handlers = transitionMap.GetTransitionHandler(device, previousState)
+ handlers = transitionMap.GetTransitionHandler(ctx, device, previousState)
assert.Equal(t, 1, len(handlers))
assert.True(t, reflect.ValueOf(tdm.SetupUNILogicalPorts).Pointer() == reflect.ValueOf(handlers[0]).Pointer())
previousState = getDeviceState(voltha.AdminState_ENABLED, voltha.ConnectStatus_UNKNOWN, voltha.OperStatus_DISCOVERED)
device = getDevice(false, voltha.AdminState_ENABLED, voltha.ConnectStatus_UNREACHABLE, voltha.OperStatus_ACTIVE)
- handlers = transitionMap.GetTransitionHandler(device, previousState)
+ handlers = transitionMap.GetTransitionHandler(ctx, device, previousState)
assert.Equal(t, 1, len(handlers))
assert.True(t, reflect.ValueOf(tdm.SetupUNILogicalPorts).Pointer() == reflect.ValueOf(handlers[0]).Pointer())
previousState = getDeviceState(voltha.AdminState_ENABLED, voltha.ConnectStatus_UNKNOWN, voltha.OperStatus_DISCOVERED)
device = getDevice(false, voltha.AdminState_ENABLED, voltha.ConnectStatus_REACHABLE, voltha.OperStatus_ACTIVE)
- handlers = transitionMap.GetTransitionHandler(device, previousState)
+ handlers = transitionMap.GetTransitionHandler(ctx, device, previousState)
assert.Equal(t, 1, len(handlers))
assert.True(t, reflect.ValueOf(tdm.SetupUNILogicalPorts).Pointer() == reflect.ValueOf(handlers[0]).Pointer())
previousState = getDeviceState(voltha.AdminState_ENABLED, voltha.ConnectStatus_REACHABLE, voltha.OperStatus_DISCOVERED)
device = getDevice(false, voltha.AdminState_ENABLED, voltha.ConnectStatus_REACHABLE, voltha.OperStatus_ACTIVE)
- handlers = transitionMap.GetTransitionHandler(device, previousState)
+ handlers = transitionMap.GetTransitionHandler(ctx, device, previousState)
assert.Equal(t, 1, len(handlers))
assert.True(t, reflect.ValueOf(tdm.SetupUNILogicalPorts).Pointer() == reflect.ValueOf(handlers[0]).Pointer())
previousState = getDeviceState(voltha.AdminState_ENABLED, voltha.ConnectStatus_UNREACHABLE, voltha.OperStatus_DISCOVERED)
device = getDevice(false, voltha.AdminState_ENABLED, voltha.ConnectStatus_REACHABLE, voltha.OperStatus_ACTIVE)
- handlers = transitionMap.GetTransitionHandler(device, previousState)
+ handlers = transitionMap.GetTransitionHandler(ctx, device, previousState)
assert.Equal(t, 1, len(handlers))
assert.True(t, reflect.ValueOf(tdm.SetupUNILogicalPorts).Pointer() == reflect.ValueOf(handlers[0]).Pointer())
previousState = getDeviceState(voltha.AdminState_ENABLED, voltha.ConnectStatus_UNKNOWN, voltha.OperStatus_ACTIVATING)
device = getDevice(false, voltha.AdminState_ENABLED, voltha.ConnectStatus_UNKNOWN, voltha.OperStatus_ACTIVE)
- handlers = transitionMap.GetTransitionHandler(device, previousState)
+ handlers = transitionMap.GetTransitionHandler(ctx, device, previousState)
assert.Equal(t, 1, len(handlers))
assert.True(t, reflect.ValueOf(tdm.SetupUNILogicalPorts).Pointer() == reflect.ValueOf(handlers[0]).Pointer())
previousState = getDeviceState(voltha.AdminState_ENABLED, voltha.ConnectStatus_UNKNOWN, voltha.OperStatus_ACTIVATING)
device = getDevice(false, voltha.AdminState_ENABLED, voltha.ConnectStatus_UNREACHABLE, voltha.OperStatus_ACTIVE)
- handlers = transitionMap.GetTransitionHandler(device, previousState)
+ handlers = transitionMap.GetTransitionHandler(ctx, device, previousState)
assert.Equal(t, 1, len(handlers))
assert.True(t, reflect.ValueOf(tdm.SetupUNILogicalPorts).Pointer() == reflect.ValueOf(handlers[0]).Pointer())
previousState = getDeviceState(voltha.AdminState_ENABLED, voltha.ConnectStatus_UNKNOWN, voltha.OperStatus_ACTIVATING)
device = getDevice(false, voltha.AdminState_ENABLED, voltha.ConnectStatus_REACHABLE, voltha.OperStatus_ACTIVE)
- handlers = transitionMap.GetTransitionHandler(device, previousState)
+ handlers = transitionMap.GetTransitionHandler(ctx, device, previousState)
assert.Equal(t, 1, len(handlers))
assert.True(t, reflect.ValueOf(tdm.SetupUNILogicalPorts).Pointer() == reflect.ValueOf(handlers[0]).Pointer())
previousState = getDeviceState(voltha.AdminState_ENABLED, voltha.ConnectStatus_REACHABLE, voltha.OperStatus_ACTIVATING)
device = getDevice(false, voltha.AdminState_ENABLED, voltha.ConnectStatus_UNREACHABLE, voltha.OperStatus_ACTIVE)
- handlers = transitionMap.GetTransitionHandler(device, previousState)
+ handlers = transitionMap.GetTransitionHandler(ctx, device, previousState)
assert.Equal(t, 1, len(handlers))
assert.True(t, reflect.ValueOf(tdm.SetupUNILogicalPorts).Pointer() == reflect.ValueOf(handlers[0]).Pointer())
previousState = getDeviceState(voltha.AdminState_ENABLED, voltha.ConnectStatus_UNREACHABLE, voltha.OperStatus_ACTIVATING)
device = getDevice(false, voltha.AdminState_ENABLED, voltha.ConnectStatus_UNREACHABLE, voltha.OperStatus_ACTIVE)
- handlers = transitionMap.GetTransitionHandler(device, previousState)
+ handlers = transitionMap.GetTransitionHandler(ctx, device, previousState)
assert.Equal(t, 1, len(handlers))
assert.True(t, reflect.ValueOf(tdm.SetupUNILogicalPorts).Pointer() == reflect.ValueOf(handlers[0]).Pointer())
previousState = getDeviceState(voltha.AdminState_PREPROVISIONED, voltha.ConnectStatus_UNKNOWN, voltha.OperStatus_UNKNOWN)
device = getDevice(true, voltha.AdminState_DELETED, voltha.ConnectStatus_UNKNOWN, voltha.OperStatus_UNKNOWN)
- handlers = transitionMap.GetTransitionHandler(device, previousState)
+ handlers = transitionMap.GetTransitionHandler(ctx, device, previousState)
assert.Equal(t, 1, len(handlers))
assert.True(t, reflect.ValueOf(tdm.RunPostDeviceDelete).Pointer() == reflect.ValueOf(handlers[0]).Pointer())
previousState = getDeviceState(voltha.AdminState_PREPROVISIONED, voltha.ConnectStatus_UNKNOWN, voltha.OperStatus_UNKNOWN)
device = getDevice(false, voltha.AdminState_DELETED, voltha.ConnectStatus_UNKNOWN, voltha.OperStatus_UNKNOWN)
- handlers = transitionMap.GetTransitionHandler(device, previousState)
+ handlers = transitionMap.GetTransitionHandler(ctx, device, previousState)
assert.Equal(t, 1, len(handlers))
assert.True(t, reflect.ValueOf(tdm.RunPostDeviceDelete).Pointer() == reflect.ValueOf(handlers[0]).Pointer())
previousState = getDeviceState(voltha.AdminState_ENABLED, voltha.ConnectStatus_UNKNOWN, voltha.OperStatus_ACTIVE)
device = getDevice(false, voltha.AdminState_DELETED, voltha.ConnectStatus_UNKNOWN, voltha.OperStatus_FAILED)
- handlers = transitionMap.GetTransitionHandler(device, previousState)
+ handlers = transitionMap.GetTransitionHandler(ctx, device, previousState)
assert.Equal(t, 3, len(handlers))
assert.True(t, reflect.ValueOf(tdm.ChildDeviceLost).Pointer() == reflect.ValueOf(handlers[0]).Pointer())
assert.True(t, reflect.ValueOf(tdm.DeleteLogicalPorts).Pointer() == reflect.ValueOf(handlers[1]).Pointer())
@@ -184,7 +184,7 @@
previousState = getDeviceState(voltha.AdminState_ENABLED, voltha.ConnectStatus_REACHABLE, voltha.OperStatus_ACTIVE)
device = getDevice(true, voltha.AdminState_ENABLED, voltha.ConnectStatus_UNREACHABLE, voltha.OperStatus_UNKNOWN)
- handlers = transitionMap.GetTransitionHandler(device, previousState)
+ handlers = transitionMap.GetTransitionHandler(ctx, device, previousState)
assert.Equal(t, 4, len(handlers))
assert.True(t, reflect.ValueOf(tdm.DeleteAllLogicalPorts).Pointer() == reflect.ValueOf(handlers[0]).Pointer())
assert.True(t, reflect.ValueOf(tdm.DeleteAllChildDevices).Pointer() == reflect.ValueOf(handlers[1]).Pointer())
@@ -193,13 +193,13 @@
previousState = getDeviceState(voltha.AdminState_ENABLED, voltha.ConnectStatus_UNREACHABLE, voltha.OperStatus_UNKNOWN)
device = getDevice(true, voltha.AdminState_ENABLED, voltha.ConnectStatus_REACHABLE, voltha.OperStatus_ACTIVE)
- handlers = transitionMap.GetTransitionHandler(device, previousState)
+ handlers = transitionMap.GetTransitionHandler(ctx, device, previousState)
assert.Equal(t, 1, len(handlers))
assert.True(t, reflect.ValueOf(tdm.CreateLogicalDevice).Pointer() == reflect.ValueOf(handlers[0]).Pointer())
previousState = getDeviceState(voltha.AdminState_DISABLED, voltha.ConnectStatus_REACHABLE, voltha.OperStatus_UNKNOWN)
device = getDevice(true, voltha.AdminState_DISABLED, voltha.ConnectStatus_UNREACHABLE, voltha.OperStatus_UNKNOWN)
- handlers = transitionMap.GetTransitionHandler(device, previousState)
+ handlers = transitionMap.GetTransitionHandler(ctx, device, previousState)
assert.Equal(t, 4, len(handlers))
assert.True(t, reflect.ValueOf(tdm.DeleteAllLogicalPorts).Pointer() == reflect.ValueOf(handlers[0]).Pointer())
assert.True(t, reflect.ValueOf(tdm.DeleteAllChildDevices).Pointer() == reflect.ValueOf(handlers[1]).Pointer())
@@ -208,7 +208,7 @@
previousState = getDeviceState(voltha.AdminState_DISABLED, voltha.ConnectStatus_UNREACHABLE, voltha.OperStatus_UNKNOWN)
device = getDevice(true, voltha.AdminState_DISABLED, voltha.ConnectStatus_REACHABLE, voltha.OperStatus_UNKNOWN)
- handlers = transitionMap.GetTransitionHandler(device, previousState)
+ handlers = transitionMap.GetTransitionHandler(ctx, device, previousState)
assert.Equal(t, 1, len(handlers))
assert.True(t, reflect.ValueOf(tdm.CreateLogicalDevice).Pointer() == reflect.ValueOf(handlers[0]).Pointer())
@@ -250,7 +250,7 @@
for _, device := range deleteDeviceTest.devices {
device.Root = true
t.Run(testName, func(t *testing.T) {
- handlers = transitionMap.GetTransitionHandler(device, previousState)
+ handlers = transitionMap.GetTransitionHandler(ctx, device, previousState)
assert.Equal(t, 4, len(handlers))
for idx, expHandler := range deleteDeviceTest.expectedParentHandlers {
assert.True(t, reflect.ValueOf(expHandler).Pointer() == reflect.ValueOf(handlers[idx]).Pointer())
@@ -264,7 +264,7 @@
for _, device := range deleteDeviceTest.devices {
device.Root = false
t.Run(testName, func(t *testing.T) {
- handlers = transitionMap.GetTransitionHandler(device, deviceState)
+ handlers = transitionMap.GetTransitionHandler(ctx, device, deviceState)
assert.Equal(t, 3, len(handlers))
for idx, expHandler := range deleteDeviceTest.expectedChildHandlers {
assert.True(t, reflect.ValueOf(expHandler).Pointer() == reflect.ValueOf(handlers[idx]).Pointer())
@@ -333,5 +333,5 @@
func TestMatch(t *testing.T) {
best := &match{admin: currPrevStateMatch, oper: currPrevStateMatch, conn: currPrevStateMatch}
m := &match{admin: currStateOnlyMatch, oper: currWildcardMatch, conn: currWildcardMatch}
- fmt.Println(m.isBetterMatch(best), m.toInt(), best.toInt())
+ fmt.Println(m.isBetterMatch(context.Background(), best), m.toInt(context.Background()), best.toInt(context.Background()))
}
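
The assertions in this test compare handler identity rather than behaviour: in Go, two function values can be checked for the same code pointer via the reflect package, which is the idiom repeated throughout the file. A tiny standalone illustration:

package main

import (
	"fmt"
	"reflect"
)

func handlerA() {}
func handlerB() {}

// samePointer reports whether two function values refer to the same function.
func samePointer(a, b interface{}) bool {
	return reflect.ValueOf(a).Pointer() == reflect.ValueOf(b).Pointer()
}

func main() {
	fmt.Println(samePointer(handlerA, handlerA)) // true
	fmt.Println(samePointer(handlerA, handlerB)) // false
}
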
diff --git a/rw_core/core/kafka.go b/rw_core/core/kafka.go
index fcdf340..d50274b 100644
--- a/rw_core/core/kafka.go
+++ b/rw_core/core/kafka.go
@@ -36,11 +36,12 @@
// create the kafka RPC proxy
kmp := kafka.NewInterContainerProxy(
- kafka.InterContainerHost(host),
- kafka.InterContainerPort(port),
- kafka.MsgClient(kafkaClient),
- kafka.DefaultTopic(&kafka.Topic{Name: coreTopic}),
- kafka.DeviceDiscoveryTopic(&kafka.Topic{Name: affinityRouterTopic}))
+ ctx,
+ kafka.InterContainerHost(ctx, host),
+ kafka.InterContainerPort(ctx, port),
+ kafka.MsgClient(ctx, kafkaClient),
+ kafka.DefaultTopic(ctx, &kafka.Topic{Name: coreTopic}),
+ kafka.DeviceDiscoveryTopic(ctx, &kafka.Topic{Name: affinityRouterTopic}))
probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusPrepared)
@@ -51,7 +52,7 @@
for {
// If we haven't started yet, then try to start
logger.Infow("starting-kafka-proxy", log.Fields{})
- if err := kmp.Start(); err != nil {
+ if err := kmp.Start(ctx); err != nil {
// We failed to start. Delay and then try again later.
// Don't worry about liveness, as we can't be live until we've started.
probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusNotReady)
@@ -103,7 +104,7 @@
func monitorKafkaLiveness(ctx context.Context, kmp kafka.InterContainerProxy, liveProbeInterval time.Duration, notLiveProbeInterval time.Duration) {
logger.Info("started-kafka-message-proxy")
- livenessChannel := kmp.EnableLivenessChannel(true)
+ livenessChannel := kmp.EnableLivenessChannel(ctx, true)
logger.Info("enabled-kafka-liveness-channel")
@@ -135,7 +136,7 @@
// send the liveness probe in a goroutine; we don't want to deadlock ourselves as
// the liveness probe may wait (and block) writing to our channel.
go func() {
- err := kmp.SendLiveness()
+ err := kmp.SendLiveness(ctx)
if err != nil {
// Catch possible error case if sending liveness after Sarama has been stopped.
logger.Warnw("error-kafka-send-liveness", log.Fields{"error": err})
@@ -147,16 +148,16 @@
}
}
-func registerAdapterRequestHandlers(kmp kafka.InterContainerProxy, dMgr *device.Manager, aMgr *adapter.Manager, coreTopic, corePairTopic string) {
- requestProxy := api.NewAdapterRequestHandlerProxy(dMgr, aMgr)
+func registerAdapterRequestHandlers(ctx context.Context, kmp kafka.InterContainerProxy, dMgr *device.Manager, aMgr *adapter.Manager, coreTopic, corePairTopic string) {
+ requestProxy := api.NewAdapterRequestHandlerProxy(ctx, dMgr, aMgr)
// Register the broadcast topic to handle any core-bound broadcast requests
- if err := kmp.SubscribeWithRequestHandlerInterface(kafka.Topic{Name: coreTopic}, requestProxy); err != nil {
+ if err := kmp.SubscribeWithRequestHandlerInterface(ctx, kafka.Topic{Name: coreTopic}, requestProxy); err != nil {
logger.Fatalw("Failed-registering-broadcast-handler", log.Fields{"topic": coreTopic})
}
// Register the core-pair topic to handle core-bound requests destined to the core pair
- if err := kmp.SubscribeWithDefaultRequestHandler(kafka.Topic{Name: corePairTopic}, kafka.OffsetNewest); err != nil {
+ if err := kmp.SubscribeWithDefaultRequestHandler(ctx, kafka.Topic{Name: corePairTopic}, kafka.OffsetNewest); err != nil {
logger.Fatalw("Failed-registering-pair-handler", log.Fields{"topic": corePairTopic})
}
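
The liveness changes above keep the established shape: a channel reports broker health, and the probe interval tightens while the bus is unhealthy. A hedged sketch of that loop with placeholder names (it simulates the probe rather than calling the voltha-lib-go kafka API):

package main

import (
	"context"
	"fmt"
	"time"
)

// monitorLiveness watches a health channel and periodically triggers a probe,
// probing more often while the bus is reported unhealthy.
func monitorLiveness(ctx context.Context, liveness <-chan bool, liveEvery, notLiveEvery time.Duration) {
	interval := liveEvery
	for {
		select {
		case <-ctx.Done(): // core shutdown
			return
		case alive := <-liveness:
			if alive {
				interval = liveEvery
			} else {
				interval = notLiveEvery
			}
			fmt.Println("liveness update:", alive)
		case <-time.After(interval):
			// in the real code this is where a liveness message would be sent
			fmt.Println("probe tick")
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 60*time.Millisecond)
	defer cancel()
	health := make(chan bool, 1)
	health <- false // start unhealthy so the short interval is used
	monitorLiveness(ctx, health, 20*time.Millisecond, 5*time.Millisecond)
}
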
diff --git a/rw_core/core/kv.go b/rw_core/core/kv.go
index 53db264..1bba04c 100644
--- a/rw_core/core/kv.go
+++ b/rw_core/core/kv.go
@@ -29,13 +29,13 @@
"google.golang.org/grpc/status"
)
-func newKVClient(storeType string, address string, timeout time.Duration) (kvstore.Client, error) {
+func newKVClient(ctx context.Context, storeType string, address string, timeout time.Duration) (kvstore.Client, error) {
logger.Infow("kv-store-type", log.Fields{"store": storeType})
switch storeType {
case "consul":
- return kvstore.NewConsulClient(address, timeout)
+ return kvstore.NewConsulClient(ctx, address, timeout)
case "etcd":
- return kvstore.NewEtcdClient(address, timeout, log.FatalLevel)
+ return kvstore.NewEtcdClient(ctx, address, timeout, log.FatalLevel)
}
return nil, errors.New("unsupported-kv-store")
}
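
newKVClient above is a plain constructor switch; per this change the real constructors now also receive ctx. A minimal sketch of the same shape, with a hypothetical Client interface standing in for voltha-lib-go's kvstore.Client:

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// Client is a stand-in for kvstore.Client; only Close is sketched here.
type Client interface {
	Close(ctx context.Context)
}

type etcdClient struct{ address string }

func (c *etcdClient) Close(ctx context.Context) {}

// newKVClient picks a backend by name, mirroring the switch shown above.
func newKVClient(ctx context.Context, storeType, address string, timeout time.Duration) (Client, error) {
	switch storeType {
	case "etcd":
		// a real constructor would dial the store here, honouring ctx and timeout
		return &etcdClient{address: address}, nil
	}
	return nil, errors.New("unsupported-kv-store")
}

func main() {
	c, err := newKVClient(context.Background(), "etcd", "etcd:2379", 5*time.Second)
	fmt.Println(c != nil, err) // true <nil>
}
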
@@ -46,7 +46,7 @@
logger.Infow("fail-to-release-all-reservations", log.Fields{"error": err})
}
// Close the DB connection
- kvClient.Close()
+ kvClient.Close(ctx)
}
// waitUntilKVStoreReachableOrMaxTries will wait until it can connect to a KV store or until maxtries has been reached
@@ -98,7 +98,7 @@
logger.Info("start-monitoring-kvstore-liveness")
// Instruct backend to create Liveness channel for transporting state updates
- livenessChannel := backend.EnableLivenessChannel()
+ livenessChannel := backend.EnableLivenessChannel(ctx)
logger.Debug("enabled-kvstore-liveness-channel")