[VOL-5374] - Upgrade go version to v1.23
Change-Id: I86c21c482e61b358023119620b87032f2ea04c6d
Signed-off-by: Akash Reddy Kankanala <akash.kankanala@radisys.com>
[VOL-5374] - Upgrade go version to v1.23
Change-Id: Ie653d5c992aa3ff6624916d65009e2efbe0ed3f5
Signed-off-by: Akash Reddy Kankanala <akash.kankanala@radisys.com>
diff --git a/rw_core/config/config.go b/rw_core/config/config.go
index 7cb644d..8c53006 100644
--- a/rw_core/config/config.go
+++ b/rw_core/config/config.go
@@ -34,17 +34,18 @@
GrpcSBIAddress string
KafkaClusterAddress string
KVStoreType string
- KVStoreTimeout time.Duration
KVStoreAddress string
EventTopic string
- EventTopicPartitions int
- EventTopicReplicas int
LogLevel string
- Banner bool
- DisplayVersionOnly bool
RWCoreKey string
RWCoreCert string
RWCoreCA string
+ ProbeAddress string
+ TraceAgentAddress string
+ VolthaStackID string
+ KVStoreTimeout time.Duration
+ EventTopicPartitions int
+ EventTopicReplicas int
InternalTimeout time.Duration
RPCTimeout time.Duration
FlowTimeout time.Duration
@@ -52,16 +53,15 @@
ConnectionRetryInterval time.Duration
LiveProbeInterval time.Duration
NotLiveProbeInterval time.Duration
- ProbeAddress string
- TraceEnabled bool
- TraceAgentAddress string
- LogCorrelationEnabled bool
- VolthaStackID string
BackoffRetryInitialInterval time.Duration
BackoffRetryMaxElapsedTime time.Duration
BackoffRetryMaxInterval time.Duration
PerRPCRetryTimeout time.Duration
MaxRetries uint
+ Banner bool
+ DisplayVersionOnly bool
+ TraceEnabled bool
+ LogCorrelationEnabled bool
}
// ParseCommandArguments parses the arguments when running read-write core service
@@ -129,7 +129,7 @@
5*time.Second,
"RPC timeout")
- fs.DurationVar(&(cf.FlowTimeout), //Note flow time out will be considered for flows related rpc's not rpc timeout
+ fs.DurationVar(&(cf.FlowTimeout), // Note flow time out will be considered for flows related rpc's not rpc timeout
"flow_timeout",
30*time.Second,
"Flow timeout")
@@ -197,7 +197,7 @@
fs.DurationVar(&cf.BackoffRetryMaxElapsedTime,
"backoff_retry_max_elapsed_time",
0*time.Second,
- "The maximum number of milliseconds an exponential backoff can elasped")
+ "The maximum number of milliseconds an exponential backoff can elapsed")
fs.DurationVar(&cf.BackoffRetryMaxInterval,
"backoff_retry_max_interval",
diff --git a/rw_core/core/adapter/agent.go b/rw_core/core/adapter/agent.go
index 12c75ad..91c9c45 100644
--- a/rw_core/core/adapter/agent.go
+++ b/rw_core/core/adapter/agent.go
@@ -19,6 +19,9 @@
import (
"context"
"errors"
+ "sync"
+ "time"
+
grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
vgrpc "github.com/opencord/voltha-lib-go/v7/pkg/grpc"
"github.com/opencord/voltha-lib-go/v7/pkg/log"
@@ -26,22 +29,20 @@
"github.com/opencord/voltha-protos/v5/go/voltha"
"google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
- "sync"
- "time"
)
// agent represents adapter agent
type agent struct {
adapter *voltha.Adapter
- lock sync.RWMutex
- adapterAPIEndPoint string
vClient *vgrpc.Client
- adapterLock sync.RWMutex
onAdapterRestart vgrpc.RestartedHandler
- liveProbeInterval time.Duration
+ adapterAPIEndPoint string
coreEndpoint string
+ liveProbeInterval time.Duration
maxRetries uint
perRPCRetryTimeout time.Duration
+ lock sync.RWMutex
+ adapterLock sync.RWMutex
}
func getAdapterServiceClientHandler(ctx context.Context, conn *grpc.ClientConn) interface{} {
@@ -96,7 +97,7 @@
}
}
-func (aa *agent) getAdapter(ctx context.Context) *voltha.Adapter {
+func (aa *agent) getAdapter() *voltha.Adapter {
aa.adapterLock.RLock()
defer aa.adapterLock.RUnlock()
return aa.adapter
diff --git a/rw_core/core/adapter/endpoint_manager.go b/rw_core/core/adapter/endpoint_manager.go
index 6a16332..f209f93 100644
--- a/rw_core/core/adapter/endpoint_manager.go
+++ b/rw_core/core/adapter/endpoint_manager.go
@@ -66,20 +66,20 @@
}
type adapterService struct {
- adapterType string // Type of the adapter. The same type applies for all replicas of that adapter
- totalReplicas int32
replicas map[ReplicaID]Endpoint
consistentRing *consistent.Consistent
+ adapterType string // Type of the adapter. The same type applies for all replicas of that adapter
+ totalReplicas int32
}
type endpointManager struct {
+ backend *db.Backend
+ adapterServices map[string]*adapterService
+ deviceTypeToAdapterServiceMap map[string]string
partitionCount int
replicationFactor int
load float64
- backend *db.Backend
- adapterServices map[string]*adapterService
adapterServicesLock sync.RWMutex
- deviceTypeToAdapterServiceMap map[string]string
deviceTypeToAdapterServiceMapLock sync.RWMutex
}
@@ -307,15 +307,15 @@
return err
}
- // Data is marshalled as proto bytes in the data store
+ // Data is marshaled as proto bytes in the data store
for _, blob := range blobs {
data := blob.Value.([]byte)
adapter := &voltha.Adapter{}
- if err := proto.Unmarshal(data, adapter); err != nil {
+ if err = proto.Unmarshal(data, adapter); err != nil {
return err
}
// A valid adapter should have the vendorID set
- if err := ep.setupAdapterWithLock(ctx, adapter); err != nil {
+ if err = ep.setupAdapterWithLock(ctx, adapter); err != nil {
logger.Errorw(ctx, "missing vendor id", log.Fields{"adapter": adapter})
}
}
@@ -327,7 +327,7 @@
for _, blob := range blobs {
data := blob.Value.([]byte)
deviceType := &voltha.DeviceType{}
- if err := proto.Unmarshal(data, deviceType); err != nil {
+ if err = proto.Unmarshal(data, deviceType); err != nil {
return err
}
ep.addDeviceTypeWithLock(deviceType)
@@ -434,8 +434,8 @@
adapterType string
vendor string
version string
- replica ReplicaID
endpoint Endpoint
+ replica ReplicaID
}
func newMember(id string, adapterType string, vendor string, endPoint Endpoint, version string, replica ReplicaID) Member {
diff --git a/rw_core/core/adapter/manager.go b/rw_core/core/adapter/manager.go
index 88b755c..2788a3b 100644
--- a/rw_core/core/adapter/manager.go
+++ b/rw_core/core/adapter/manager.go
@@ -41,23 +41,23 @@
// Manager represents adapter manager attributes
type Manager struct {
+ endpointMgr EndpointManager
adapterAgents map[string]*agent
adapterEndpoints map[Endpoint]*agent
deviceTypes map[string]*voltha.DeviceType
adapterDbProxy *model.Proxy
deviceTypeDbProxy *model.Proxy
onAdapterRestart vgrpc.RestartedHandler
- endpointMgr EndpointManager
- lockAdapterAgentsMap sync.RWMutex
- lockDeviceTypesMap sync.RWMutex
- lockAdapterEndPointsMap sync.RWMutex
+ rollingUpdateMap map[string]bool
+ rxStreamCloseChMap map[string]chan bool
+ coreEndpoint string
liveProbeInterval time.Duration
PerRPCRetryTimeout time.Duration
MaxRetries uint
- coreEndpoint string
- rollingUpdateMap map[string]bool
+ lockAdapterAgentsMap sync.RWMutex
+ lockDeviceTypesMap sync.RWMutex
+ lockAdapterEndPointsMap sync.RWMutex
rollingUpdateLock sync.RWMutex
- rxStreamCloseChMap map[string]chan bool
rxStreamCloseChLock sync.RWMutex
}
@@ -138,10 +138,10 @@
aMgr.lockAdapterEndPointsMap.RUnlock()
if have {
- return agent.getAdapter(ctx), nil
+ return agent.getAdapter(), nil
}
- return nil, errors.New("Not found")
+ return nil, fmt.Errorf("%v: Not found", ctx)
}
func (aMgr *Manager) GetAdapterNameWithEndpoint(ctx context.Context, endPoint string) (string, error) {
@@ -153,7 +153,7 @@
return agent.adapter.Id, nil
}
- return "", errors.New("Not found")
+ return "", fmt.Errorf("%v: Not found", ctx)
}
func (aMgr *Manager) GetAdapterClient(_ context.Context, endpoint string) (adapter_service.AdapterServiceClient, error) {
@@ -398,7 +398,7 @@
// Start adapter instance - this will trigger the connection to the adapter
if agent, err := aMgr.getAgent(ctx, adapter.Id); agent != nil {
subCtx := log.WithSpanFromContext(context.Background(), ctx)
- if err := agent.start(subCtx); err != nil {
+ if err = agent.start(subCtx); err != nil {
logger.Errorw(ctx, "failed-to-start-adapter", log.Fields{"error": err})
return nil, err
}
@@ -484,7 +484,7 @@
aMgr.lockAdapterAgentsMap.RLock()
defer aMgr.lockAdapterAgentsMap.RUnlock()
for _, adapterAgent := range aMgr.adapterAgents {
- if a := adapterAgent.getAdapter(ctx); a != nil {
+ if a := adapterAgent.getAdapter(); a != nil {
result.Items = append(result.Items, (proto.Clone(a)).(*voltha.Adapter))
}
}
@@ -569,16 +569,16 @@
if adapterAgent, ok := aMgr.adapterAgents[adapterID]; ok {
return adapterAgent, nil
}
- return nil, errors.New("Not found")
+ return nil, fmt.Errorf("%v: Not found", ctx)
}
func (aMgr *Manager) getAdapter(ctx context.Context, adapterID string) (*voltha.Adapter, error) {
aMgr.lockAdapterAgentsMap.RLock()
defer aMgr.lockAdapterAgentsMap.RUnlock()
if adapterAgent, ok := aMgr.adapterAgents[adapterID]; ok {
- return adapterAgent.getAdapter(ctx), nil
+ return adapterAgent.getAdapter(), nil
}
- return nil, errors.New("Not found")
+ return nil, fmt.Errorf("%v: Not found", ctx)
}
// mutedAdapterRestartedHandler will be invoked by the grpc client on an adapter restart.
diff --git a/rw_core/core/core.go b/rw_core/core/core.go
index 1fb8d79..a8490bf 100644
--- a/rw_core/core/core.go
+++ b/rw_core/core/core.go
@@ -108,20 +108,20 @@
backend.LivenessChannelInterval = cf.LiveProbeInterval / 2
// wait until connection to KV Store is up
- if err := waitUntilKVStoreReachableOrMaxTries(ctx, kvClient, cf.MaxConnectionRetries, cf.ConnectionRetryInterval, kvService); err != nil {
+ if err = waitUntilKVStoreReachableOrMaxTries(ctx, kvClient, cf.MaxConnectionRetries, cf.ConnectionRetryInterval, kvService); err != nil {
logger.Fatal(ctx, "unable-to-connect-to-kv-store")
}
go monitorKVStoreLiveness(ctx, backend, kvService, cf.LiveProbeInterval, cf.NotLiveProbeInterval)
// Start kafka communications and artefacts
- if err := kafka.StartAndWaitUntilKafkaConnectionIsUp(ctx, core.KafkaClient, cf.ConnectionRetryInterval, clusterMessagingService); err != nil {
+ if err = kafka.StartAndWaitUntilKafkaConnectionIsUp(ctx, core.KafkaClient, cf.ConnectionRetryInterval, clusterMessagingService); err != nil {
logger.Fatal(ctx, "unable-to-connect-to-kafka")
}
defer core.KafkaClient.Stop(ctx)
// create the voltha.events topic
topic := &kafka.Topic{Name: cf.EventTopic}
- if err := core.KafkaClient.CreateTopic(ctx, topic, cf.EventTopicPartitions, cf.EventTopicReplicas); err != nil {
+ if err = core.KafkaClient.CreateTopic(ctx, topic, cf.EventTopicPartitions, cf.EventTopicReplicas); err != nil {
if err != nil {
logger.Fatal(ctx, "unable-to create topic", log.Fields{"topic": cf.EventTopic, "error": err})
}
@@ -130,7 +130,7 @@
// Create the event proxy to post events to KAFKA
eventProxy := events.NewEventProxy(events.MsgClient(core.KafkaClient), events.MsgTopic(kafka.Topic{Name: cf.EventTopic}))
go func() {
- if err := eventProxy.Start(); err != nil {
+ if err = eventProxy.Start(); err != nil {
logger.Fatalw(ctx, "event-proxy-cannot-start", log.Fields{"error": err})
}
}()
@@ -179,7 +179,7 @@
// Create the NBI gRPC server
grpcNBIServer := grpcserver.NewGrpcServer(cf.GrpcNBIAddress, nil, false, probe.GetProbeFromContext(ctx))
- //Register the 'Extension' service on this gRPC server
+ // Register the 'Extension' service on this gRPC server
addGRPCExtensionService(ctx, grpcNBIServer, device.GetNewExtensionManager(deviceMgr))
go startGrpcNbiService(ctx, grpcNBIServer, grpcNBIService, api.NewAPIHandler(deviceMgr, logicalDeviceMgr, adapterMgr))
diff --git a/rw_core/core/device/agent.go b/rw_core/core/device/agent.go
index c6afce5..649eb3f 100755
--- a/rw_core/core/device/agent.go
+++ b/rw_core/core/device/agent.go
@@ -58,31 +58,31 @@
// Agent represents device agent attributes
type Agent struct {
- deviceID string
- parentID string
- deviceType string
- adapterEndpoint string
- isRootDevice bool
- adapterMgr *adapter.Manager
- deviceMgr *Manager
- dbProxy *model.Proxy
- exitChannel chan int
- device *voltha.Device
- requestQueue *coreutils.RequestQueue
- internalTimeout time.Duration
- rpcTimeout time.Duration
- flowTimeout time.Duration
- startOnce sync.Once
- stopOnce sync.Once
- stopped bool
- stopReconciling chan int
- stopReconcilingMutex sync.RWMutex
- config *config.RWCoreFlags
+ adapterMgr *adapter.Manager
+ deviceMgr *Manager
+ dbProxy *model.Proxy
+ exitChannel chan int
+ device *voltha.Device
+ requestQueue *coreutils.RequestQueue
+ stopReconciling chan int
+ config *config.RWCoreFlags
flowCache *flow.Cache
groupCache *group.Cache
portLoader *port.Loader
transientStateLoader *transientstate.Loader
+ deviceID string
+ parentID string
+ deviceType string
+ adapterEndpoint string
+ internalTimeout time.Duration
+ rpcTimeout time.Duration
+ flowTimeout time.Duration
+ stopReconcilingMutex sync.RWMutex
+ startOnce sync.Once
+ stopOnce sync.Once
+ isRootDevice bool
+ stopped bool
}
// newAgent creates a new device agent. The device will be initialized when start() is called.
@@ -205,11 +205,11 @@
if err := agent.deleteTransientState(ctx); err != nil {
return err
}
- // Remove the device from the KV store
+ // Remove the device from the KV store
if err := agent.dbProxy.Remove(ctx, agent.deviceID); err != nil {
return err
}
- //send the device event to the message bus
+ // Send the device event to the message bus
_ = agent.deviceMgr.Agent.SendDeviceDeletedEvent(ctx, agent.device, time.Now().Unix())
close(agent.exitChannel)
@@ -248,6 +248,7 @@
}
// onSuccess is a common callback for scenarios where we receive a nil response following a request to an adapter
+// nolint: unparam
func (agent *Agent) onSuccess(ctx context.Context, prevState, currState *common.AdminState_Types, deviceUpdateLog bool) {
if deviceUpdateLog {
requestStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_SUCCESS}
@@ -260,6 +261,7 @@
// onFailure is a common callback for scenarios where we receive an error response following a request to an adapter
// and the only action required is to publish the failed result on kafka
+// nolint: unparam
func (agent *Agent) onFailure(ctx context.Context, err error, prevState, currState *common.AdminState_Types, deviceUpdateLog bool) {
// Send an event on kafka
rpce := agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, err.Error(), nil)
@@ -277,6 +279,7 @@
}
// onForceDeleteResponse is invoked following a force delete request to an adapter.
+// nolint: unparam
func (agent *Agent) onForceDeleteResponse(ctx context.Context, prevState, currState *common.AdminState_Types, dErr error) {
// Log the status
requestStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_SUCCESS}
@@ -324,7 +327,7 @@
func (agent *Agent) onDeleteFailure(ctx context.Context, err error, prevState, currState *common.AdminState_Types) {
logger.Errorw(ctx, "rpc-failed", log.Fields{"rpc": coreutils.GetRPCMetadataFromContext(ctx), "device-id": agent.deviceID, "error": err})
- //Only updating of transient state is required, no transition.
+ // Only updating of transient state is required, no transition.
if er := agent.updateTransientState(ctx, core.DeviceTransientState_DELETE_FAILED); er != nil {
logger.Errorw(ctx, "failed-to-update-transient-state-as-delete-failed", log.Fields{"device-id": agent.deviceID, "error": er})
}
@@ -394,7 +397,7 @@
// enableDevice activates a preprovisioned or a disable device
func (agent *Agent) enableDevice(ctx context.Context) error {
- //To preserve and use oldDevice state as prev state in new device
+ // To preserve and use oldDevice state as prev state in new device
var err error
var desc string
var prevAdminState, currAdminState common.AdminState_Types
@@ -415,12 +418,12 @@
err = status.Errorf(codes.FailedPrecondition, "cannot complete operation as device deletion is in progress or reconciling is in progress/failed: %s", agent.deviceID)
return err
}
- //vol-4275 TST meeting 08/04/2021: Let EnableDevice to be called again if device is in FAILED operational state,
- //even the admin state is ENABLED.
+ // vol-4275 TST meeting 08/04/2021: Allow EnableDevice to be called again if the device is in FAILED operational state,
+ // even if the admin state is ENABLED.
if oldDevice.AdminState == voltha.AdminState_ENABLED && oldDevice.OperStatus != voltha.OperStatus_FAILED {
logger.Warnw(ctx, "device-already-enabled", log.Fields{"device-id": agent.deviceID})
agent.requestQueue.RequestComplete()
- err = status.Errorf(codes.FailedPrecondition, fmt.Sprintf("cannot-enable-an-already-enabled-device: %s", oldDevice.Id))
+ err = status.Errorf(codes.FailedPrecondition, "cannot-enable-an-already-enabled-device: %s", oldDevice.Id)
return err
}
@@ -463,7 +466,6 @@
requestStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS
go func() {
defer cancel()
- var err error
if oldDevice.AdminState == voltha.AdminState_PREPROVISIONED {
_, err = client.AdoptDevice(subCtx, newDevice)
} else {
@@ -489,11 +491,11 @@
func (agent *Agent) addFlowsAndGroups(ctx context.Context, newFlows []*ofp.OfpFlowStats, newGroups []*ofp.OfpGroupEntry, flowMetadata *ofp.FlowMetadata) error {
var flwResponse, grpResponse coreutils.Response
var err error
- //if new flow list is empty then the called function returns quickly
+ // if new flow list is empty then the called function returns quickly
if flwResponse, err = agent.addFlowsToAdapter(ctx, newFlows, flowMetadata); err != nil {
return err
}
- //if new group list is empty then the called function returns quickly
+ // if new group list is empty then the called function returns quickly
if grpResponse, err = agent.addGroupsToAdapter(ctx, newGroups, flowMetadata); err != nil {
return err
}
@@ -577,7 +579,8 @@
cloned.AdminState = voltha.AdminState_DISABLED
cloned.OperStatus = voltha.OperStatus_UNKNOWN
- client, err := agent.adapterMgr.GetAdapterClient(ctx, agent.adapterEndpoint)
+ var client adapter_service.AdapterServiceClient
+ client, err = agent.adapterMgr.GetAdapterClient(ctx, agent.adapterEndpoint)
if err != nil {
logger.Errorw(ctx, "grpc-client-nil",
log.Fields{
@@ -593,7 +596,7 @@
requestStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS
go func() {
defer cancel()
- _, err := client.DisableDevice(subCtx, cloned)
+ _, err = client.DisableDevice(subCtx, cloned)
if err == nil {
agent.onSuccess(subCtx, nil, nil, true)
} else {
@@ -669,7 +672,7 @@
// Get the device Transient state, return err if it is DELETING
previousDeviceTransientState := agent.getTransientState()
device := agent.cloneDeviceWithoutLock()
- if !agent.isForceDeletingAllowed(previousDeviceTransientState, device) {
+ if !agent.isForceDeletingAllowed(previousDeviceTransientState) {
agent.requestQueue.RequestComplete()
err = status.Error(codes.FailedPrecondition, fmt.Sprintf("deviceId:%s, force deletion is in progress", agent.deviceID))
return err
@@ -694,7 +697,7 @@
requestStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS
go func() {
defer cancel()
- _, err := client.DeleteDevice(subCtx, device)
+ _, err = client.DeleteDevice(subCtx, device)
if err == nil {
agent.onSuccess(subCtx, nil, nil, true)
} else {
@@ -821,7 +824,7 @@
if agent.deviceType == "" {
agent.reconcileWithKVStore(ctx)
}
- // Send packet to adapter
+ // Send packet to adapter
client, err := agent.adapterMgr.GetAdapterClient(ctx, agent.adapterEndpoint)
if err != nil {
logger.Errorw(ctx, "grpc-client-nil",
@@ -1002,7 +1005,7 @@
prevDevice := agent.device
// update the device
agent.device = device
- //If any of the states has chenged, send the change event.
+ // If any of the states has changed, send the change event.
if prevDevice.OperStatus != device.OperStatus || prevDevice.ConnectStatus != device.ConnectStatus || prevDevice.AdminState != device.AdminState {
_ = agent.deviceMgr.Agent.SendDeviceStateChangeEvent(ctx, prevDevice.OperStatus, prevDevice.ConnectStatus, prevDevice.AdminState, device, time.Now().Unix())
}
@@ -1033,14 +1036,14 @@
agent.requestQueue.RequestComplete()
return errors.New("device-agent-stopped")
}
- //update device TransientState
+ // update device TransientState
if err := agent.updateTransientState(ctx, transientState); err != nil {
agent.requestQueue.RequestComplete()
return err
}
// update in db
if err := agent.dbProxy.Set(ctx, agent.deviceID, device); err != nil {
- //Reverting TransientState update
+ // Reverting TransientState update
if errTransient := agent.updateTransientState(ctx, prevTransientState); errTransient != nil {
logger.Errorw(ctx, "failed-to-revert-transient-state-update-on-error", log.Fields{"device-id": device.Id,
"previous-transient-state": prevTransientState, "current-transient-state": transientState, "error": errTransient})
@@ -1054,7 +1057,7 @@
prevDevice := agent.device
// update the device
agent.device = device
- //If any of the states has chenged, send the change event.
+ // If any of the states has changed, send the change event.
if prevDevice.OperStatus != device.OperStatus || prevDevice.ConnectStatus != device.ConnectStatus || prevDevice.AdminState != device.AdminState {
_ = agent.deviceMgr.Agent.SendDeviceStateChangeEvent(ctx, prevDevice.OperStatus, prevDevice.ConnectStatus, prevDevice.AdminState, device, time.Now().Unix())
}
@@ -1113,7 +1116,7 @@
}
newPort := *oldPort
newPort.Peers = updatedPeers
- if err := portHandle.Update(ctx, &newPort); err != nil {
+ if err = portHandle.Update(ctx, &newPort); err != nil {
portHandle.Unlock()
return nil
}
@@ -1124,7 +1127,7 @@
logger.Errorw(ctx, "adapter-request-cannot-proceed", log.Fields{"device-id": agent.deviceID, "error": err})
return err
}
- //send request to adapter
+ // send request to adapter
client, err := agent.adapterMgr.GetAdapterClient(ctx, agent.adapterEndpoint)
if err != nil {
logger.Errorw(ctx, "grpc-client-nil",
@@ -1207,7 +1210,7 @@
return nil, err
}
- //send request to adapter synchronously
+ // send request to adapter synchronously
client, err := agent.adapterMgr.GetAdapterClient(ctx, pdevice.AdapterEndpoint)
if err != nil {
logger.Errorw(ctx, "grpc-client-nil",
@@ -1247,8 +1250,8 @@
return nil, err
}
- //send request to adapter
- //send request to adapter synchronously
+ // send request to adapter
+ // send request to adapter synchronously
client, err := agent.adapterMgr.GetAdapterClient(ctx, agent.adapterEndpoint)
if err != nil {
logger.Errorw(ctx, "grpc-client-nil",
@@ -1288,7 +1291,7 @@
cloned := agent.cloneDeviceWithoutLock()
- //send request to adapter
+ // send request to adapter
client, err := agent.adapterMgr.GetAdapterClient(ctx, agent.adapterEndpoint)
if err != nil {
logger.Errorw(ctx, "grpc-client-nil",
@@ -1318,13 +1321,13 @@
requestStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
defer func() { agent.logDeviceUpdate(ctx, nil, nil, requestStatus, err, desc) }()
- if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ if err = agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return nil, err
}
cloned := agent.cloneDeviceWithoutLock()
- //send request to adapter
+ // send request to adapter
client, err := agent.adapterMgr.GetAdapterClient(ctx, agent.adapterEndpoint)
if err != nil {
logger.Errorw(ctx, "grpc-client-nil",
@@ -1471,7 +1474,7 @@
requestStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
var desc string
- //set transient state to RECONCILE IN PROGRESS
+ // set transient state to RECONCILE IN PROGRESS
err := agent.updateTransientState(ctx, core.DeviceTransientState_RECONCILE_IN_PROGRESS)
if err != nil {
logger.Errorw(ctx, "setting-transient-state-failed", log.Fields{"error": err})
@@ -1514,7 +1517,7 @@
reconcilingBackoff.MaxElapsedTime = agent.config.BackoffRetryMaxElapsedTime
reconcilingBackoff.MaxInterval = agent.config.BackoffRetryMaxInterval
- //making here to keep lifecycle of this channel within the scope of retryReconcile
+ // Create the channel here to keep its lifecycle within the scope of retryReconcile
agent.stopReconcilingMutex.Lock()
if agent.stopReconciling != nil {
logger.Warnw(ctx, "Reconciling with retries is already in progress, don't proceed further", log.Fields{"device-id": device.Id})
@@ -1536,7 +1539,7 @@
// Use an exponential back off to prevent getting into a tight loop
duration := reconcilingBackoff.NextBackOff()
- //This case should never occur in default case as max elapsed time for backoff is 0(by default) , so it will never return stop
+ // This case should never occur by default, as the max elapsed time for backoff is 0 (by default), so it will never return stop
if duration == backoff.Stop {
// If we have received device reconciled error and the retry intervals have elapsed
// clean up the reconcile and break the retry loop
@@ -1610,7 +1613,7 @@
case <-backoffTimer.C:
// backoffTimer expired continue
// Take lock back before retrying
- if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ if err = agent.requestQueue.WaitForGreenLight(ctx); err != nil {
desc = "failed-to-acquire-lock"
agent.logDeviceUpdate(ctx, nil, nil, requestStatus, err, desc)
break retry
@@ -1656,7 +1659,7 @@
func (agent *Agent) ReconcileDevice(ctx context.Context) {
- //set transient state to RECONCILE IN PROGRESS
+ // set transient state to RECONCILE IN PROGRESS
err := agent.UpdateTransientStateToReconcile(ctx)
if err != nil {
logger.Errorw(ctx, "check-and-update-transient-state-failed", log.Fields{"error": err})
@@ -1674,28 +1677,28 @@
}
adapterResponse := make(chan error)
go func() {
- _, err := client.ReconcileDevice(ctx, device)
+ _, err = client.ReconcileDevice(ctx, device)
adapterResponse <- err
}()
select {
// wait for response
- case err := <-adapterResponse:
+ case err = <-adapterResponse:
if err != nil {
return err
}
- //In case of success quit retrying and wait for adapter to reset operation state of device
+ // In case of success quit retrying and wait for adapter to reset operation state of device
agent.stopReconcilingMutex.Lock()
agent.stopReconciling = nil
agent.stopReconcilingMutex.Unlock()
return nil
- //if reconciling need to be stopped
+ // If reconciling needs to be stopped
case _, ok := <-agent.stopReconciling:
agent.stopReconcilingMutex.Lock()
agent.stopReconciling = nil
agent.stopReconcilingMutex.Unlock()
if !ok {
- //channel-closed
+ // channel-closed
return fmt.Errorf("reconcile channel closed:%w", errReconcileAborted)
}
return fmt.Errorf("reconciling aborted:%w", errReconcileAborted)
diff --git a/rw_core/core/device/agent_flow.go b/rw_core/core/device/agent_flow.go
index 474c453..c491699 100644
--- a/rw_core/core/device/agent_flow.go
+++ b/rw_core/core/device/agent_flow.go
@@ -86,7 +86,7 @@
} else {
flowToReplace := flowHandle.GetReadOnly()
if !proto.Equal(flowToReplace, flow) {
- //Flow needs to be updated.
+ // Flow needs to be updated.
if err := flowHandle.Update(ctx, flow); err != nil {
flowHandle.Unlock()
return coreutils.DoneResponse(), err
@@ -94,7 +94,7 @@
flowsToDelete = append(flowsToDelete, flowToReplace)
flowsToAdd = append(flowsToAdd, flow)
} else {
- //No need to change the flow. It is already exist.
+ // No need to change the flow. It already exists.
logger.Debugw(ctx, "no-need-to-change-already-existing-flow", log.Fields{"device-id": agent.deviceID, "flows": newFlows, "flow-metadata": flowMetadata})
}
}
diff --git a/rw_core/core/device/agent_group.go b/rw_core/core/device/agent_group.go
index 1a5b938..72e9bd7 100644
--- a/rw_core/core/device/agent_group.go
+++ b/rw_core/core/device/agent_group.go
@@ -84,7 +84,7 @@
} else {
groupToChange := groupHandle.GetReadOnly()
if !proto.Equal(groupToChange, group) {
- //Group needs to be updated.
+ // Group needs to be updated.
if err = groupHandle.Update(ctx, group); err != nil {
groupHandle.Unlock()
return coreutils.DoneResponse(), err
@@ -92,7 +92,7 @@
groupsToDelete = append(groupsToDelete, groupToChange)
groupsToAdd = append(groupsToAdd, group)
} else {
- //No need to change the group. It is already exist.
+ // No need to change the group. It already exists.
logger.Debugw(ctx, "no-need-to-change-already-existing-group", log.Fields{"device-id": agent.deviceID, "group": newGroups, "flow-metadata": flowMetadata})
}
}
diff --git a/rw_core/core/device/agent_image.go b/rw_core/core/device/agent_image.go
index 2a1cb50..e71ea07 100644
--- a/rw_core/core/device/agent_image.go
+++ b/rw_core/core/device/agent_image.go
@@ -21,9 +21,9 @@
"errors"
"time"
- ca "github.com/opencord/voltha-protos/v5/go/core_adapter"
-
+ "github.com/opencord/voltha-protos/v5/go/adapter_service"
"github.com/opencord/voltha-protos/v5/go/common"
+ ca "github.com/opencord/voltha-protos/v5/go/core_adapter"
"github.com/gogo/protobuf/proto"
coreutils "github.com/opencord/voltha-go/rw_core/utils"
@@ -40,7 +40,7 @@
operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
defer func() { agent.logDeviceUpdate(ctx, &prevAdminState, &currAdminState, operStatus, err, desc) }()
- if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ if err = agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return nil, err
}
logger.Debugw(ctx, "download-image", log.Fields{"device-id": agent.deviceID})
@@ -83,7 +83,8 @@
}
// Send the request to the adapter
- client, err := agent.adapterMgr.GetAdapterClient(ctx, agent.adapterEndpoint)
+ var client adapter_service.AdapterServiceClient
+ client, err = agent.adapterMgr.GetAdapterClient(ctx, agent.adapterEndpoint)
if err != nil {
logger.Errorw(ctx, "grpc-client-nil",
log.Fields{
@@ -100,7 +101,8 @@
operStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS
go func() {
defer cancel()
- response, err := client.DownloadImage(subCtx, &ca.ImageDownloadMessage{
+ var response *voltha.ImageDownload
+ response, err = client.DownloadImage(subCtx, &ca.ImageDownloadMessage{
Device: cloned,
Image: clonedImg,
})
@@ -234,7 +236,7 @@
return nil, err
}
- //TODO does this need to be removed ?
+ // TODO does this need to be removed ?
if cloned.AdminState == voltha.AdminState_DOWNLOADING_IMAGE {
agent.requestQueue.RequestComplete()
err = status.Errorf(codes.FailedPrecondition, "device-id:%s, device-in-downloading-state:%s", agent.deviceID, img.Name)
@@ -445,7 +447,7 @@
device, err := agent.getDeviceReadOnly(ctx)
if err != nil {
- return nil, status.Errorf(codes.Aborted, "%s", err)
+ return nil, status.Errorf(codes.Aborted, "deviceID: %s, Err:%s", deviceID, err)
}
return &voltha.ImageDownloads{Items: device.ImageDownloads}, nil
}
@@ -484,7 +486,7 @@
if imgErr != nil {
logger.Errorw(subCtx, "rpc-failed", log.Fields{"rpc": rpc, "device-id": agent.deviceID, "error": imgErr})
cloned := agent.cloneDeviceWithoutLock()
- //TODO base this on IMAGE ID when created
+ // TODO base this on IMAGE ID when created
var imageFailed *voltha.ImageDownload
var index int
if cloned.ImageDownloads != nil {
@@ -502,13 +504,13 @@
return
}
- //update image state on failure
+ // update image state on failure
if imageFailed.DownloadState == voltha.ImageDownload_DOWNLOAD_REQUESTED {
cloned.ImageDownloads[index].DownloadState = voltha.ImageDownload_DOWNLOAD_FAILED
} else if imageFailed.ImageState == voltha.ImageDownload_IMAGE_ACTIVATING {
cloned.ImageDownloads[index].ImageState = voltha.ImageDownload_IMAGE_INACTIVE
}
- //Enabled is the only state we can go back to.
+ // Enabled is the only state we can go back to.
cloned.AdminState = voltha.AdminState_ENABLED
if err := agent.updateDeviceAndReleaseLock(subCtx, cloned); err != nil {
logger.Errorw(subCtx, "failed-enable-device-after-image-failure",
@@ -541,7 +543,7 @@
return
}
logger.Infow(ctx, "rpc-successful", log.Fields{"rpc": rpc, "device-id": agent.deviceID, "response": response})
- //TODO base this on IMAGE ID when created
+ // TODO base this on IMAGE ID when created
var imageSucceeded *voltha.ImageDownload
var index int
if cloned.ImageDownloads != nil {
@@ -558,13 +560,13 @@
err = errors.New("can't find image")
return
}
- //update image state on success
+ // update image state on success
if imageSucceeded.DownloadState == voltha.ImageDownload_DOWNLOAD_REQUESTED {
cloned.ImageDownloads[index].DownloadState = voltha.ImageDownload_DOWNLOAD_SUCCEEDED
} else if imageSucceeded.ImageState == voltha.ImageDownload_IMAGE_ACTIVATING {
cloned.ImageDownloads[index].ImageState = voltha.ImageDownload_IMAGE_ACTIVE
}
- //Enabled is the only state we can go back to.
+ // Enabled is the only state we can go back to.
cloned.AdminState = voltha.AdminState_ENABLED
if err = agent.updateDeviceAndReleaseLock(ctx, cloned); err != nil {
logger.Errorw(ctx, "failed-enable-device-after-image-download-success",
diff --git a/rw_core/core/device/agent_port.go b/rw_core/core/device/agent_port.go
index fe122bd..61bb451 100644
--- a/rw_core/core/device/agent_port.go
+++ b/rw_core/core/device/agent_port.go
@@ -20,6 +20,7 @@
"context"
"fmt"
+ "github.com/opencord/voltha-protos/v5/go/adapter_service"
"github.com/opencord/voltha-protos/v5/go/common"
"github.com/gogo/protobuf/proto"
@@ -258,18 +259,20 @@
newPort := *oldPort
newPort.AdminState = voltha.AdminState_DISABLED
- if err := portHandle.Update(ctx, &newPort); err != nil {
+ if err = portHandle.Update(ctx, &newPort); err != nil {
return err
}
- //send request to adapter
- device, err := agent.getDeviceReadOnly(ctx)
+ // Send request to adapter
+ var device *voltha.Device
+ device, err = agent.getDeviceReadOnly(ctx)
if err != nil {
return err
}
// Send the request to the adapter
- client, err := agent.adapterMgr.GetAdapterClient(ctx, agent.adapterEndpoint)
+ var client adapter_service.AdapterServiceClient
+ client, err = agent.adapterMgr.GetAdapterClient(ctx, agent.adapterEndpoint)
if err != nil {
logger.Errorw(ctx, "grpc-client-nil",
log.Fields{
@@ -284,7 +287,7 @@
operStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS
go func() {
defer cancel()
- _, err := client.DisablePort(subCtx, &newPort)
+ _, err = client.DisablePort(subCtx, &newPort)
if err == nil {
agent.onSuccess(subCtx, nil, nil, true)
} else {
@@ -322,7 +325,7 @@
return err
}
- //send request to adapter
+ // Send request to adapter
device, err := agent.getDeviceReadOnly(ctx)
if err != nil {
return err
diff --git a/rw_core/core/device/agent_transient_state.go b/rw_core/core/device/agent_transient_state.go
index 8af99a2..2867fef 100644
--- a/rw_core/core/device/agent_transient_state.go
+++ b/rw_core/core/device/agent_transient_state.go
@@ -61,7 +61,7 @@
deviceTransientState == core.DeviceTransientState_DELETING_POST_ADAPTER_RESPONSE
}
-func (agent *Agent) isForceDeletingAllowed(deviceTransientState core.DeviceTransientState_Types, device *voltha.Device) bool {
+func (agent *Agent) isForceDeletingAllowed(deviceTransientState core.DeviceTransientState_Types) bool {
return deviceTransientState != core.DeviceTransientState_FORCE_DELETING
}
diff --git a/rw_core/core/device/event/event.go b/rw_core/core/device/event/event.go
index 7d5db66..e07990c 100644
--- a/rw_core/core/device/event/event.go
+++ b/rw_core/core/device/event/event.go
@@ -98,7 +98,7 @@
return streamingTracker.calls[method]
}
-func (q *Manager) flushFailedPackets(ctx context.Context, tracker *callTracker) error {
+func (q *Manager) flushFailedPackets(ctx context.Context, tracker *callTracker) {
if tracker.failedPacket != nil {
switch failedPacket := tracker.failedPacket.(type) {
case openflow_13.PacketIn:
@@ -109,7 +109,6 @@
q.changeEventQueue <- failedPacket
}
}
- return nil
}
// ReceivePacketsIn receives packets from adapter
@@ -119,10 +118,7 @@
var streamingTracker = q.getStreamingTracker(ctx, "ReceivePacketsIn", q.packetInQueueDone)
logger.Debugw(ctx, "receive-packets-in-request", log.Fields{"packets-in": packetsIn})
- err := q.flushFailedPackets(ctx, streamingTracker)
- if err != nil {
- logger.Errorw(ctx, "unable-to-flush-failed-packets", log.Fields{"error": err})
- }
+ q.flushFailedPackets(ctx, streamingTracker)
loop:
for {
@@ -169,19 +165,19 @@
logger.Debugw(ctx, "send-change-event", log.Fields{"device-id": deviceID,
"flow-id": xid, "flow-cookie": flowCookie, "errors": res})
errorType := openflow_13.OfpErrorType_OFPET_FLOW_MOD_FAILED
- //Manually creating the data payload for the flow error message
+ // Manually creating the data payload for the flow error message
bs := make([]byte, 2)
- //OF 1.3
+ // OF 1.3
bs[0] = byte(4)
- //Flow Mod
+ // Flow Mod
bs[1] = byte(14)
- //Length of the message
+ // Length of the message
length := make([]byte, 2)
binary.BigEndian.PutUint16(length, 56)
bs = append(bs, length...)
emptyArr := []byte{0, 0, 0, 0}
bs = append(bs, emptyArr...)
- //Cookie of the Flow
+ // Cookie of the Flow
cookie := make([]byte, 52)
binary.BigEndian.PutUint64(cookie, flowCookie)
bs = append(bs, cookie...)
@@ -221,10 +217,7 @@
var streamingTracker = q.getStreamingTracker(ctx, "ReceiveChangeEvents", q.changeEventQueueDone)
logger.Debugw(ctx, "receive-change-events-request", log.Fields{"change-events": changeEvents})
- err := q.flushFailedPackets(ctx, streamingTracker)
- if err != nil {
- logger.Errorw(ctx, "unable-to-flush-failed-packets", log.Fields{"error": err})
- }
+ q.flushFailedPackets(ctx, streamingTracker)
loop:
for {
@@ -283,7 +276,7 @@
}
func (q *Agent) SendRPCEvent(ctx context.Context, id string, rpcEvent *voltha.RPCEvent, category voltha.EventCategory_Types, subCategory *voltha.EventSubCategory_Types, raisedTs int64) {
- //TODO Instead of directly sending to the kafka bus, queue the message and send it asynchronously
+ // TODO Instead of directly sending to the kafka bus, queue the message and send it asynchronously
if rpcEvent.Rpc != "" {
if err := q.eventProxy.SendRPCEvent(ctx, id, rpcEvent, category, subCategory, raisedTs); err != nil {
logger.Errorw(ctx, "failed-to-send-rpc-event", log.Fields{"resource-id": id, "error": err})
diff --git a/rw_core/core/device/flow/cache.go b/rw_core/core/device/flow/cache.go
index 3e39575..dfbff4b 100644
--- a/rw_core/core/device/flow/cache.go
+++ b/rw_core/core/device/flow/cache.go
@@ -25,18 +25,17 @@
// Cache hides all low-level locking & synchronization related to flow state updates
type Cache struct {
- // this lock protects the flows map, it does not protect individual flows
- lock sync.RWMutex
flows map[uint64]*chunk
+ // this lock protects the flows map, it does not protect individual flows
+ lock sync.RWMutex
}
// chunk keeps a flow and the lock for this flow
type chunk struct {
+ flow *ofp.OfpFlowStats
// this lock is used to synchronize all access to the flow, and also to the "deleted" variable
lock sync.Mutex
deleted bool
-
- flow *ofp.OfpFlowStats
}
func NewCache() *Cache {
@@ -56,7 +55,7 @@
cache.lock.Lock()
entry, have := cache.flows[flow.Id]
if !have {
- entry := &chunk{flow: flow}
+ entry = &chunk{flow: flow}
cache.flows[flow.Id] = entry
entry.lock.Lock()
cache.lock.Unlock()
diff --git a/rw_core/core/device/group/cache.go b/rw_core/core/device/group/cache.go
index 6d9a336..15c3cc9 100644
--- a/rw_core/core/device/group/cache.go
+++ b/rw_core/core/device/group/cache.go
@@ -25,18 +25,17 @@
// Cache hides all low-level locking & synchronization related to group state updates
type Cache struct {
- // this lock protects the groups map, it does not protect individual groups
- lock sync.RWMutex
groups map[uint32]*chunk
+ // this lock protects the groups map, it does not protect individual groups
+ lock sync.RWMutex
}
// chunk keeps a group and the lock for this group
type chunk struct {
+ group *ofp.OfpGroupEntry
// this lock is used to synchronize all access to the group, and also to the "deleted" variable
lock sync.Mutex
deleted bool
-
- group *ofp.OfpGroupEntry
}
func NewCache() *Cache {
@@ -56,7 +55,7 @@
cache.lock.Lock()
entry, have := cache.groups[group.Desc.GroupId]
if !have {
- entry := &chunk{group: group}
+ entry = &chunk{group: group}
cache.groups[group.Desc.GroupId] = entry
entry.lock.Lock()
cache.lock.Unlock()
diff --git a/rw_core/core/device/logical_agent.go b/rw_core/core/device/logical_agent.go
index 253a523..adcaba3 100644
--- a/rw_core/core/device/logical_agent.go
+++ b/rw_core/core/device/logical_agent.go
@@ -41,31 +41,34 @@
)
// LogicalAgent represent attributes of logical device agent
+//
+//nolint:govet
type LogicalAgent struct {
+ orderedEvents orderedEvents
+ deviceMgr *Manager
+ ldeviceMgr *LogicalManager
+ ldProxy *model.Proxy
+ deviceRoutes *route.DeviceRoutes
+ flowDecomposer *fd.FlowDecomposer
+ logicalDevice *voltha.LogicalDevice
+ requestQueue *coreutils.RequestQueue
+
+ flowCache *flow.Cache
+ meterLoader *meter.Loader
+ groupCache *group.Cache
+ portLoader *lp.Loader
logicalDeviceID string
serialNumber string
rootDeviceID string
- deviceMgr *Manager
- ldeviceMgr *LogicalManager
- ldProxy *model.Proxy
- stopped bool
- deviceRoutes *route.DeviceRoutes
- flowDecomposer *fd.FlowDecomposer
internalTimeout time.Duration
- logicalDevice *voltha.LogicalDevice
- requestQueue *coreutils.RequestQueue
- orderedEvents orderedEvents
startOnce sync.Once
stopOnce sync.Once
exitChannel chan int
- flowCache *flow.Cache
- meterLoader *meter.Loader
- groupCache *group.Cache
- portLoader *lp.Loader
+ stopped bool
}
-func newLogicalAgent(ctx context.Context, id string, sn string, deviceID string, ldeviceMgr *LogicalManager,
+func newLogicalAgent(id string, sn string, deviceID string, ldeviceMgr *LogicalManager,
deviceMgr *Manager, dbProxy *model.Path, ldProxy *model.Proxy, internalTimeout time.Duration) *LogicalAgent {
return &LogicalAgent{
logicalDeviceID: id,
@@ -108,7 +111,7 @@
var ld *voltha.LogicalDevice
if !logicalDeviceExist {
- //Build the logical device based on information retrieved from the device adapter
+ // Build the logical device based on information retrieved from the device adapter
var switchCap *ca.SwitchCapability
var err error
@@ -152,7 +155,7 @@
ld.SwitchFeatures = (proto.Clone(switchCap.SwitchFeatures)).(*ofp.OfpSwitchFeatures)
// Save the logical device
- if err := agent.ldProxy.Set(ctx, ld.Id, ld); err != nil {
+ if err = agent.ldProxy.Set(ctx, ld.Id, ld); err != nil {
logger.Errorw(ctx, "failed-to-add-logical-device", log.Fields{"logical-device-id": agent.logicalDeviceID})
return
}
@@ -227,7 +230,7 @@
subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.internalTimeout)
// Before deletion of the logical agent, make sure all events for ldagent are sent to avoid race conditions
if err := agent.orderedEvents.waitForAllEventsToBeSent(subCtx, cancel); err != nil {
- //Log the error here
+ // Log the error here
logger.Errorw(ctx, "failed-to-send-all-events-on-the-logical-device-before-deletion",
log.Fields{"error": err, "logical-device-id": agent.logicalDeviceID})
}
@@ -237,7 +240,7 @@
logger.Warnw(ctx, "delete-logical-meters-error", log.Fields{"device-id": agent.logicalDeviceID, "error": err})
}
- //Remove the logical device from the model
+ // Remove the logical device from the model
if err := agent.ldProxy.Remove(ctx, agent.logicalDeviceID); err != nil {
returnErr = err
} else {
@@ -404,8 +407,8 @@
})
}
outPort := fu.GetPacketOutPort(packet)
- //frame := packet.GetData()
- //TODO: Use a channel between the logical agent and the device agent
+ // frame := packet.GetData()
+ // TODO: Use a channel between the logical agent and the device agent
if err := agent.deviceMgr.packetOut(ctx, agent.rootDeviceID, outPort, packet); err != nil {
logger.Error(ctx, "packet-out-failed", log.Fields{"logical-device-id": agent.rootDeviceID})
}
diff --git a/rw_core/core/device/logical_agent_flow.go b/rw_core/core/device/logical_agent_flow.go
index 0e997ec..642b3d9 100644
--- a/rw_core/core/device/logical_agent_flow.go
+++ b/rw_core/core/device/logical_agent_flow.go
@@ -104,7 +104,7 @@
mod := flowUpdate.FlowMod
var flowToReplace *ofp.OfpFlowStats
- //if flow is not found in the map, create a new entry, otherwise get the existing one.
+ // if flow is not found in the map, create a new entry, otherwise get the existing one.
flowHandle, flowCreated, err := agent.flowCache.LockOrCreate(ctx, flow)
if err != nil {
return changed, updated, err
@@ -119,7 +119,7 @@
// TODO: should this error be notified other than being logged?
logger.Warnw(ctx, "overlapped-flows", log.Fields{"logical-device-id": agent.logicalDeviceID})
} else {
- // Add flow
+ // Add flow
changed = true
}
} else {
@@ -177,7 +177,7 @@
}
logger.Debugw(ctx, "rules", log.Fields{"rules": deviceRules.String()})
- // Update store and cache
+ // Update store and cache
if updated {
if err := flowHandle.Update(ctx, flow); err != nil {
return changed, updated, err
@@ -279,10 +279,10 @@
return nil
}
- //build a list of what to delete
+ // Build a list of what to delete
toDelete := make(map[uint64]*ofp.OfpFlowStats)
- // add perfectly matching entry if exists
+ // Add perfectly matching entry if exists
fs, err := fu.FlowStatsEntryFromFlowModMessage(mod)
if err != nil {
return err
@@ -292,7 +292,7 @@
handle.Unlock()
}
- // search through all the flows
+ // Search through all the flows
for flowID := range agent.flowCache.ListIDs() {
if flowHandle, have := agent.flowCache.Lock(flowID); have {
if flow := flowHandle.GetReadOnly(); fu.FlowMatchesMod(flow, mod) {
@@ -302,7 +302,7 @@
}
}
- //Delete the matched flows
+ // Delete the matched flows
if len(toDelete) > 0 {
logger.Debugw(ctx, "flow-delete", log.Fields{"logical-device-id": agent.logicalDeviceID, "to-delete": len(toDelete)})
@@ -316,7 +316,7 @@
return fmt.Errorf("cannot-delete-flow-%d. Meter-update-failed", flow.Id)
}
// Update store and cache
- if err := flowHandle.Delete(ctx); err != nil {
+ if err = flowHandle.Delete(ctx); err != nil {
flowHandle.Unlock()
return fmt.Errorf("cannot-delete-flows-%d. Delete-from-store-failed", flow.Id)
}
@@ -357,8 +357,8 @@
}
for _, deviceID := range devicesInFlows {
- if err := agent.deviceMgr.canAdapterRequestProceed(ctx, deviceID); err != nil {
- //If the error has code.NotFound the device is not there anymore, there is no need to delete flows, just ignore it
+ if err = agent.deviceMgr.canAdapterRequestProceed(ctx, deviceID); err != nil {
+ // If the error code is codes.NotFound, the device is not there anymore, so there is no need to delete flows; just ignore it
if status.Code(err) != codes.NotFound {
logger.Warnw(ctx, "adapters-not-ready", log.Fields{"logical-device-id": agent.logicalDeviceID, "flow": toDelete, "error": err})
return err
@@ -402,7 +402,7 @@
}
}()
}
- //TODO: send announcement on delete
+ // TODO: send announcement on delete
return nil
}
@@ -469,7 +469,7 @@
for _, deviceID := range devicesInFlows {
if err := agent.deviceMgr.canAdapterRequestProceed(ctx, deviceID); err != nil {
- //If the error has code.NotFound the device is not there anymore, there is no need to delete flows, just ignore it
+ // If the error code is codes.NotFound, the device is not there anymore, so there is no need to delete flows; just ignore it
if status.Code(err) != codes.NotFound {
logger.Warnw(ctx, "adapters-not-ready", log.Fields{"logical-device-id": agent.logicalDeviceID, "flow": flowsToDelete, "error": err})
return err
@@ -554,9 +554,9 @@
if flowHandle, have := agent.flowCache.Lock(flowID); have {
if flowMeterID := fu.GetMeterIdFromFlow(flowHandle.GetReadOnly()); flowMeterID != 0 && flowMeterID == meterID {
if err := flowHandle.Delete(ctx); err != nil {
- //TODO: Think on carrying on and deleting the remaining flows, instead of returning.
- //Anyways this returns an error to controller which possibly results with a re-deletion.
- //Then how can we handle the new deletion request(Same for group deletion)?
+ // TODO: Think on carrying on and deleting the remaining flows, instead of returning.
+ // Anyways this returns an error to controller which possibly results with a re-deletion.
+ // Then how can we handle the new deletion request(Same for group deletion)?
return err
}
}
diff --git a/rw_core/core/device/logical_agent_group.go b/rw_core/core/device/logical_agent_group.go
index aeda40a..9a9d528 100644
--- a/rw_core/core/device/logical_agent_group.go
+++ b/rw_core/core/device/logical_agent_group.go
@@ -103,7 +103,7 @@
agent.ldeviceMgr.SendRPCEvent(ctx,
agent.logicalDeviceID, "failed-to-update-device-flows-groups", context, "RPC_ERROR_RAISE_EVENT",
voltha.EventCategory_COMMUNICATION, nil, time.Now().Unix())
- //TODO: Revert flow changes
+ // TODO: Revert flow changes
}
}()
return nil
@@ -131,7 +131,7 @@
}
groupHandle.Unlock()
- //TODO: this is another case where ordering guarantees are not being made,
+ // TODO: this is another case where ordering guarantees are not being made,
// group deletion does not guarantee deletion of corresponding flows.
// an error while deleting flows can cause inconsistent state.
flows, err := agent.deleteFlowsHavingGroup(ctx, groupID)
@@ -159,12 +159,12 @@
return err
}
} else {
- //no flow is affected, just remove the groups
+ // No flow is affected, just remove the groups
deviceRules = fu.NewDeviceRules()
deviceRules.CreateEntryIfNotExist(agent.rootDeviceID)
}
- //add groups to deviceRules
+ // Add groups to deviceRules
for _, groupEntry := range affectedGroups {
fg := fu.NewFlowsAndGroups()
fg.AddGroup(groupEntry)
@@ -177,7 +177,7 @@
return err
}
- // delete groups and related flows, if any
+ // Delete groups and related flows, if any
respChnls := agent.deleteFlowsAndGroupsFromDevices(ctx, deviceRules, &ofp.OfpFlowMod{})
// Wait for completion
@@ -194,7 +194,7 @@
agent.ldeviceMgr.SendRPCEvent(ctx,
agent.logicalDeviceID, "failed-to-update-device-flows-groups", context, "RPC_ERROR_RAISE_EVENT",
voltha.EventCategory_COMMUNICATION, nil, time.Now().Unix())
- //TODO: Revert flow changes
+ // TODO: Revert flow changes
}
}()
return nil
@@ -214,7 +214,7 @@
}
defer groupHandle.Unlock()
- //replace existing group entry with new group definition
+ // Replace existing group entry with new group definition
groupEntry := fu.GroupEntryFromGroupMod(groupMod)
deviceRules := fu.NewDeviceRules()
deviceRules.CreateEntryIfNotExist(agent.rootDeviceID)
@@ -224,7 +224,7 @@
logger.Debugw(ctx, "rules", log.Fields{"rules-for-group-modify": deviceRules.String()})
- //update KV
+ // Update KV
if err := groupHandle.Update(ctx, groupEntry); err != nil {
logger.Errorw(ctx, "cannot-update-logical-group", log.Fields{"logical-device-id": agent.logicalDeviceID})
return err
@@ -247,7 +247,7 @@
agent.ldeviceMgr.SendRPCEvent(ctx,
agent.logicalDeviceID, "failed-to-update-device-flows-groups", context, "RPC_ERROR_RAISE_EVENT",
voltha.EventCategory_COMMUNICATION, nil, time.Now().Unix())
- //TODO: Revert flow changes
+ // TODO: Revert flow changes
}
}()
return nil
diff --git a/rw_core/core/device/logical_agent_port.go b/rw_core/core/device/logical_agent_port.go
index cd173f2..e06cb6f 100644
--- a/rw_core/core/device/logical_agent_port.go
+++ b/rw_core/core/device/logical_agent_port.go
@@ -63,7 +63,7 @@
logger.Infow(ctx, "failed-to-update-routes-after-adding-parent-pon-port", log.Fields{"device-id": device.Id, "port": port, "ports-count": len(devicePorts), "error": err})
}
- //fallthrough
+ // Fallthrough
case voltha.Port_PON_ONU:
// Add the routes corresponding to that child device
@@ -134,7 +134,7 @@
return err
}
- //Get UNI port number
+ // Get UNI port number
for _, port := range devicePorts {
if port.Type == voltha.Port_ETHERNET_NNI {
if err = agent.addNNILogicalPort(ctx, deviceID, devicePorts, port); err != nil {
@@ -183,7 +183,7 @@
logger.Infow(ctx, "setup-uni-logical-ports", log.Fields{"logical-device-id": agent.logicalDeviceID})
// Build the logical device based on information retrieved from the device adapter
var err error
- //Get UNI port number
+ // Get UNI port number
for _, port := range childDevicePorts {
if port.Type == voltha.Port_ETHERNET_UNI {
if err = agent.addUNILogicalPort(ctx, childDevice.Id, childDevice.AdminState, childDevice.OperStatus, childDevicePorts, port); err != nil {
@@ -444,8 +444,8 @@
// orderedEvents guarantees the order that events are sent, while allowing events to back up.
type orderedEvents struct {
- mutex sync.Mutex
last <-chan struct{}
+ mutex sync.Mutex
}
type queuePosition struct {
diff --git a/rw_core/core/device/logical_agent_route.go b/rw_core/core/device/logical_agent_route.go
index 33bd412..c1172d0 100644
--- a/rw_core/core/device/logical_agent_route.go
+++ b/rw_core/core/device/logical_agent_route.go
@@ -37,7 +37,7 @@
if egressPortNo != 0 && ((egressPortNo & 0x7fffffff) == uint32(ofp.OfpPortNo_OFPP_CONTROLLER)) {
logger.Debugw(ctx, "controller-flow", log.Fields{"ingressPortNo": ingressPortNo, "egressPortNo": egressPortNo})
if agent.isNNIPort(ingressPortNo) {
- //This is a trap on the NNI Port
+ // This is a trap on the NNI Port
if agent.deviceRoutes.IsRoutesEmpty() {
// If there are no routes set (usually when the logical device has only NNI port(s), then just return an
// route with same IngressHop and EgressHop
@@ -62,7 +62,7 @@
}
}
- //If ingress port is not specified (nil), it may be a wildcarded route if egress port is OFPP_CONTROLLER or a nni
+ // If ingress port is not specified (nil), it may be a wildcarded route if egress port is OFPP_CONTROLLER or a nni
// logical port, in which case we need to create a half-route where only the egress hop is filled, the first hop is nil
if ingressPortNo == 0 && agent.isNNIPort(egressPortNo) {
routes, err := agent.deviceRoutes.GetHalfRoute(true, ingressPortNo, egressPortNo)
@@ -72,7 +72,7 @@
return routes, nil
}
- //If egress port is not specified (nil), we can also can return a "half" route
+ // If egress port is not specified (nil), we can also return a "half" route
if egressPortNo == 0 {
routes, err := agent.deviceRoutes.GetHalfRoute(false, ingressPortNo, egressPortNo)
if err != nil {
@@ -81,7 +81,7 @@
return routes, nil
}
- // Return the pre-calculated route
+ // Return the pre-calculated route
return agent.deviceRoutes.GetRoute(ctx, ingressPortNo, egressPortNo)
}
diff --git a/rw_core/core/device/logical_agent_test.go b/rw_core/core/device/logical_agent_test.go
index b673195..7eba187 100644
--- a/rw_core/core/device/logical_agent_test.go
+++ b/rw_core/core/device/logical_agent_test.go
@@ -134,7 +134,7 @@
return test
}
-func (lda *LDATest) startCore(ctx context.Context, inCompeteMode bool) {
+func (lda *LDATest) startCore(ctx context.Context) {
cfg := &config.RWCoreFlags{}
cfg.ParseCommandArguments([]string{})
cfg.EventTopic = "voltha.events"
@@ -178,7 +178,7 @@
clonedLD := proto.Clone(lda.logicalDevice).(*voltha.LogicalDevice)
clonedLD.Id = com.GetRandomString(10)
clonedLD.DatapathId = rand.Uint64()
- lDeviceAgent := newLogicalAgent(context.Background(), clonedLD.Id, clonedLD.Id, clonedLD.RootDeviceId, lDeviceMgr, deviceMgr, lDeviceMgr.dbPath, lDeviceMgr.ldProxy, lDeviceMgr.internalTimeout)
+ lDeviceAgent := newLogicalAgent(clonedLD.Id, clonedLD.Id, clonedLD.RootDeviceId, lDeviceMgr, deviceMgr, lDeviceMgr.dbPath, lDeviceMgr.ldProxy, lDeviceMgr.internalTimeout)
lDeviceAgent.logicalDevice = clonedLD
for _, port := range lda.logicalPorts {
clonedPort := proto.Clone(port).(*voltha.LogicalPort)
@@ -330,7 +330,7 @@
defer lda.stopAll(ctx)
// Start the Core
- lda.startCore(ctx, false)
+ lda.startCore(ctx)
var wg sync.WaitGroup
numConCurrentLogicalDeviceAgents := 3
@@ -350,7 +350,7 @@
defer lda.stopAll(ctx)
// Start the Core
- lda.startCore(ctx, false)
+ lda.startCore(ctx)
a := lda.createLogicalDeviceAgent(t)
lda.updateLogicalDevice(t, a)
diff --git a/rw_core/core/device/logical_manager.go b/rw_core/core/device/logical_manager.go
index 3812e2f..0d89e8e 100644
--- a/rw_core/core/device/logical_manager.go
+++ b/rw_core/core/device/logical_manager.go
@@ -40,13 +40,13 @@
// LogicalManager represent logical device manager attributes
type LogicalManager struct {
*event.Manager
- logicalDeviceAgents sync.Map
deviceMgr *Manager
dbPath *model.Path
ldProxy *model.Proxy
+ logicalDeviceLoadingInProgress map[string][]chan int
+ logicalDeviceAgents sync.Map
internalTimeout time.Duration
logicalDevicesLoadingLock sync.RWMutex
- logicalDeviceLoadingInProgress map[string][]chan int
}
func (ldMgr *LogicalManager) Start(ctx context.Context, serviceName string) {
@@ -60,7 +60,7 @@
}
for _, lDevice := range logicalDevices {
// Create an agent for each device
- agent := newLogicalAgent(ctx, lDevice.Id, "", "", ldMgr, ldMgr.deviceMgr, ldMgr.dbPath, ldMgr.ldProxy, ldMgr.internalTimeout)
+ agent := newLogicalAgent(lDevice.Id, "", "", ldMgr, ldMgr.deviceMgr, ldMgr.dbPath, ldMgr.ldProxy, ldMgr.internalTimeout)
go agent.start(ctx, true, lDevice)
}
@@ -90,7 +90,7 @@
}
return lda
}
- // Try to load into memory - loading will also create the logical device agent
+ // Try to load into memory - loading will also create the logical device agent
if err := ldMgr.load(ctx, logicalDeviceID); err == nil {
if agent, ok = ldMgr.logicalDeviceAgents.Load(logicalDeviceID); ok {
return agent.(*LogicalAgent)
@@ -148,7 +148,7 @@
// with length varying from eight characters to a maximum of 14 characters. Mac Address is part of oneof
// in the Device model. May need to be moved out.
id := utils.CreateLogicalDeviceID()
- sn := strings.Replace(device.MacAddress, ":", "", -1)
+ sn := strings.ReplaceAll(device.MacAddress, ":", "")
if id == "" {
logger.Errorw(ctx, "mac-address-not-set", log.Fields{"device-id": device.Id, "serial-number": sn})
return nil, errors.New("mac-address-not-set")
@@ -156,7 +156,7 @@
logger.Debugw(ctx, "logical-device-id", log.Fields{"logical-device-id": id})
- agent := newLogicalAgent(ctx, id, sn, device.Id, ldMgr, ldMgr.deviceMgr, ldMgr.dbPath, ldMgr.ldProxy, ldMgr.internalTimeout)
+ agent := newLogicalAgent(id, sn, device.Id, ldMgr, ldMgr.deviceMgr, ldMgr.dbPath, ldMgr.ldProxy, ldMgr.internalTimeout)
// Update the root device with the logical device Id reference
if err := ldMgr.deviceMgr.setParentID(ctx, device, id); err != nil {
@@ -194,6 +194,8 @@
}
// getLogicalDeviceFromModel retrieves the logical device data from the model.
+//
+//nolint:unparam
func (ldMgr *LogicalManager) getLogicalDeviceFromModel(ctx context.Context, lDeviceID string) (*voltha.LogicalDevice, error) {
logicalDevice := &voltha.LogicalDevice{}
if have, err := ldMgr.ldProxy.Get(ctx, lDeviceID, logicalDevice); err != nil {
@@ -219,7 +221,7 @@
ldMgr.logicalDevicesLoadingLock.Unlock()
if _, err := ldMgr.getLogicalDeviceFromModel(ctx, lDeviceID); err == nil {
logger.Debugw(ctx, "loading-logical-device", log.Fields{"lDeviceId": lDeviceID})
- agent := newLogicalAgent(ctx, lDeviceID, "", "", ldMgr, ldMgr.deviceMgr, ldMgr.dbPath, ldMgr.ldProxy, ldMgr.internalTimeout)
+ agent := newLogicalAgent(lDeviceID, "", "", ldMgr, ldMgr.deviceMgr, ldMgr.dbPath, ldMgr.ldProxy, ldMgr.internalTimeout)
go agent.start(ctx, true, nil)
} else {
logger.Debugw(ctx, "logical-device-not-in-model", log.Fields{"logical-device-id": lDeviceID})
@@ -240,7 +242,7 @@
ch := make(chan int, 1)
ldMgr.logicalDeviceLoadingInProgress[lDeviceID] = append(ldMgr.logicalDeviceLoadingInProgress[lDeviceID], ch)
ldMgr.logicalDevicesLoadingLock.Unlock()
- // Wait for the channel to be closed, implying the process loading this device is done.
+ // Wait for the channel to be closed, implying the process loading this device is done.
<-ch
}
if _, exist := ldMgr.logicalDeviceAgents.Load(lDeviceID); exist {
@@ -262,7 +264,7 @@
logger.Errorw(ctx, "failed-to-stop-agent", log.Fields{"error": err})
return err
}
- //Remove the logical device agent from the Map
+ // Remove the logical device agent from the Map
ldMgr.deleteLogicalDeviceAgent(logDeviceID)
ldMgr.SendDeviceDeletionEvent(ctx, logDeviceID)
} else {
@@ -408,6 +410,7 @@
}
// deleteAllLogicalMetersForLogicalDevice removes the logical meters associated with a the Logical Device ID
+// nolint:unparam
func (ldMgr *LogicalManager) deleteAllLogicalMetersForLogicalDevice(ctx context.Context, ldID string) error {
logger.Debugw(ctx, "delete-logical-meters", log.Fields{"logical-device-id": ldID})
if agent := ldMgr.getLogicalDeviceAgent(ctx, ldID); agent != nil {
@@ -456,7 +459,7 @@
var ldID *string
var err error
- //Get the logical device Id for this device
+ // Get the logical device Id for this device
if ldID, err = ldMgr.getLogicalDeviceID(ctx, device); err != nil {
logger.Warnw(ctx, "no-logical-device-found", log.Fields{"device-id": device.Id, "error": err})
return err
@@ -474,7 +477,7 @@
var ldID *string
var err error
- //Get the logical device Id for this device
+ // Get the logical device Id for this device
if ldID, err = ldMgr.getLogicalDeviceIDFromDeviceID(ctx, deviceID); err != nil {
logger.Warnw(ctx, "no-logical-device-found", log.Fields{"device-id": deviceID, "error": err})
return err
diff --git a/rw_core/core/device/logical_port/loader.go b/rw_core/core/device/logical_port/loader.go
index 9083e06..ffaad12 100644
--- a/rw_core/core/device/logical_port/loader.go
+++ b/rw_core/core/device/logical_port/loader.go
@@ -30,20 +30,19 @@
// Loader hides all low-level locking & synchronization related to port state updates
type Loader struct {
- dbProxy *model.Proxy
- // this lock protects the ports map, it does not protect individual ports
- lock sync.RWMutex
+ dbProxy *model.Proxy
ports map[uint32]*chunk
deviceLookup map[string]map[uint32]struct{}
+ // this lock protects the ports map, it does not protect individual ports
+ lock sync.RWMutex
}
// chunk keeps a port and the lock for this port
type chunk struct {
+ port *voltha.LogicalPort
// this lock is used to synchronize all access to the port, and also to the "deleted" variable
lock sync.Mutex
deleted bool
-
- port *voltha.LogicalPort
}
func NewLoader(dbProxy *model.Proxy) *Loader {
@@ -82,7 +81,7 @@
loader.lock.Lock()
entry, have := loader.ports[port.OfpPort.PortNo]
if !have {
- entry := &chunk{port: port}
+ entry = &chunk{port: port}
loader.ports[port.OfpPort.PortNo] = entry
loader.addLookup(port.DeviceId, port.OfpPort.PortNo)
entry.lock.Lock()
diff --git a/rw_core/core/device/manager.go b/rw_core/core/device/manager.go
index f09ddb3..bf68509 100755
--- a/rw_core/core/device/manager.go
+++ b/rw_core/core/device/manager.go
@@ -44,23 +44,23 @@
// Manager represent device manager attributes
type Manager struct {
- deviceAgents sync.Map
- rootDevices map[string]bool
- lockRootDeviceMap sync.RWMutex
+ rootDevices map[string]bool
*event.Agent
adapterMgr *adapter.Manager
logicalDeviceMgr *LogicalManager
stateTransitions *state.TransitionMap
dbPath *model.Path
dProxy *model.Proxy
+ deviceLoadingInProgress map[string][]chan int
+ config *config.RWCoreFlags
+ doneCh chan struct{}
+ deviceAgents sync.Map
coreInstanceID string
internalTimeout time.Duration
rpcTimeout time.Duration
flowTimeout time.Duration
+ lockRootDeviceMap sync.RWMutex
devicesLoadingLock sync.RWMutex
- deviceLoadingInProgress map[string][]chan int
- config *config.RWCoreFlags
- doneCh chan struct{}
}
// NewManagers creates the Manager and the Logical Manager.
@@ -171,7 +171,7 @@
}
return agent.(*Agent)
}
- //TODO: Change the return params to return an error as well
+ // TODO: Change the return params to return an error as well
logger.Errorw(ctx, "loading-device-failed", log.Fields{"device-id": deviceID, "error": err})
return nil
}
@@ -307,7 +307,7 @@
ch := make(chan int, 1)
dMgr.deviceLoadingInProgress[deviceID] = append(dMgr.deviceLoadingInProgress[deviceID], ch)
dMgr.devicesLoadingLock.Unlock()
- // Wait for the channel to be closed, implying the process loading this device is done.
+ // Wait for the channel to be closed, implying the process loading this device is done.
<-ch
}
if agent, ok := dMgr.deviceAgents.Load(deviceID); ok {
@@ -375,7 +375,7 @@
}
logger.Debugw(ctx, "successfully-loaded-parent-and-children", log.Fields{"device-id": deviceID})
} else if device.ParentId != "" {
- // Scenario B - use the parentId of that device (root device) to trigger the loading
+ // Scenario B - use the parentId of that device (root device) to trigger the loading
return dMgr.load(ctx, device.ParentId)
}
@@ -393,7 +393,7 @@
if ok && deviceAgent.adapterEndpoint == adapter.Endpoint {
// Before reconciling, abort in-process request
if err := deviceAgent.abortAllProcessing(utils.WithNewSpanAndRPCMetadataContext(ctx, "AbortProcessingOnRestart")); err == nil {
- logger.Debugw(ctx, "setting transiet state",
+ logger.Debugw(ctx, "setting transient state",
log.Fields{
"device-id": deviceAgent.deviceID,
"root-device": deviceAgent.isRootDevice,
@@ -401,8 +401,8 @@
"device-type": deviceAgent.deviceType,
"adapter-type": adapter.Type,
})
- //set transient state to RECONCILE IN PROGRESS
- err := deviceAgent.UpdateTransientStateToReconcile(ctx)
+ // set transient state to RECONCILE IN PROGRESS
+ err = deviceAgent.UpdateTransientStateToReconcile(ctx)
if err != nil {
logger.Errorw(ctx, "setting-transient-state-failed", log.Fields{"error": err})
}
@@ -479,7 +479,7 @@
if err := agent.addPort(ctx, port); err != nil {
return err
}
- // Setup peer ports in its own routine
+ // Setup peer ports in its own routine
if err := dMgr.addPeerPort(ctx, deviceID, port); err != nil {
logger.Errorw(ctx, "unable-to-add-peer-port", log.Fields{"error": err, "device-id": deviceID})
}
@@ -746,7 +746,7 @@
"curr-oper-state": device.OperStatus,
"curr-conn-state": device.ConnectStatus,
})
- //TODO: notify over kafka?
+ // TODO: notify over kafka?
return nil
}
@@ -776,7 +776,7 @@
func (dMgr *Manager) SendRPCEvent(ctx context.Context, id string, rpcEvent *voltha.RPCEvent,
category voltha.EventCategory_Types, subCategory *voltha.EventSubCategory_Types, raisedTs int64) {
- //TODO Instead of directly sending to the kafka bus, queue the message and send it asynchronously
+ // TODO Instead of directly sending to the kafka bus, queue the message and send it asynchronously
dMgr.Agent.SendRPCEvent(ctx, id, rpcEvent, category, subCategory, raisedTs)
}
@@ -841,7 +841,7 @@
respCount++
- //check whether all responses received, if so, sent back the collated response
+ // Check whether all responses have been received; if so, send back the collated response
if respCount == expectedResps {
return response, nil
}
@@ -861,7 +861,7 @@
err := agent.reconcilingCleanup(ctx)
if err != nil {
logger.Errorf(ctx, err.Error())
- return status.Errorf(codes.Internal, err.Error())
+ return status.Errorf(codes.Internal, "%s", err.Error())
}
return nil
}
diff --git a/rw_core/core/device/manager_nbi.go b/rw_core/core/device/manager_nbi.go
index e2c848d..95cc648 100644
--- a/rw_core/core/device/manager_nbi.go
+++ b/rw_core/core/device/manager_nbi.go
@@ -282,7 +282,7 @@
CommitOnSuccess: request.CommitOnSuccess,
}
- //slice-out only single deviceID from the request
+ // slice-out only single deviceID from the request
downloadReq.DeviceId = request.DeviceId[index : index+1]
go func(deviceID string, req *voltha.DeviceImageDownloadRequest, ch chan []*voltha.DeviceImageState) {
@@ -348,7 +348,7 @@
respCh := make(chan []*voltha.DeviceImageState, len(request.GetDeviceId()))
if request.DeviceId == nil {
- //Reply for every ONU
+ // Reply for every ONU
dMgr.deviceAgents.Range(func(key, value interface{}) bool {
device := value.(*Agent).device
if !device.Root {
@@ -365,7 +365,7 @@
CommitOnSuccess: request.CommitOnSuccess,
}
- //slice-out only single deviceID from the request
+ // slice-out only single deviceID from the request
imageStatusReq.DeviceId = request.DeviceId[index : index+1]
go func(deviceID string, req *voltha.DeviceImageRequest, ch chan []*voltha.DeviceImageState) {
@@ -436,7 +436,7 @@
CommitOnSuccess: request.CommitOnSuccess,
}
- //slice-out only single deviceID from the request
+ // slice-out only single deviceID from the request
abortImageReq.DeviceId = request.DeviceId[index : index+1]
go func(deviceID string, req *voltha.DeviceImageRequest, ch chan []*voltha.DeviceImageState) {
@@ -529,7 +529,7 @@
CommitOnSuccess: request.CommitOnSuccess,
}
- //slice-out only single deviceID from the request
+ // slice-out only single deviceID from the request
activateImageReq.DeviceId = request.DeviceId[index : index+1]
go func(deviceID string, req *voltha.DeviceImageRequest, ch chan []*voltha.DeviceImageState) {
@@ -600,7 +600,7 @@
Version: request.Version,
CommitOnSuccess: request.CommitOnSuccess,
}
- //slice-out only single deviceID from the request
+ // slice-out only single deviceID from the request
commitImageReq.DeviceId = request.DeviceId[index : index+1]
go func(deviceID string, req *voltha.DeviceImageRequest, ch chan []*voltha.DeviceImageState) {
diff --git a/rw_core/core/device/manager_sbi.go b/rw_core/core/device/manager_sbi.go
index 4d154eb..8af1d15 100644
--- a/rw_core/core/device/manager_sbi.go
+++ b/rw_core/core/device/manager_sbi.go
@@ -42,7 +42,7 @@
if err := agent.addPort(ctx, port); err != nil {
return nil, err
}
- // Setup peer ports in its own routine
+ // Setup peer ports in its own routine
if err := dMgr.addPeerPort(ctx, port.DeviceId, port); err != nil {
logger.Errorw(ctx, "unable-to-add-peer-port", log.Fields{"error": err, "device-id": port.DeviceId})
@@ -80,6 +80,9 @@
}
func (dMgr *Manager) ChildDeviceDetected(ctx context.Context, dd *ca.DeviceDiscovery) (*voltha.Device, error) {
+ var err error
+ var pDevice *voltha.Device
+
ctx = utils.WithNewSpanAndRPCMetadataContext(ctx, "ChildDeviceDetected")
logger.Debugw(ctx, "child-device-detected",
log.Fields{
@@ -91,15 +94,13 @@
"serialNumber": dd.SerialNumber,
"onuId": dd.OnuId,
})
-
- var err error
if dd.ChildDeviceType == "" && dd.VendorId != "" {
logger.Debug(ctx, "device-type-is-nil-fetching-device-type")
if dd.ChildDeviceType, err = dMgr.adapterMgr.GetAdapterTypeByVendorID(dd.VendorId); err != nil {
return nil, err
}
}
- //if no match found for the vendorid,report adapter with the custom error message
+ // If no match is found for the vendor ID, report back to the adapter with a custom error message
if dd.ChildDeviceType == "" {
logger.Errorw(ctx, "failed-to-fetch-adapter-name ", log.Fields{"vendorId": dd.VendorId})
return nil, status.Errorf(codes.NotFound, "%s", dd.VendorId)
@@ -120,21 +121,21 @@
return nil, status.Errorf(codes.NotFound, "%s", dd.ParentId)
}
if pAgent.deviceType == "" {
- pDevice, err := pAgent.getDeviceReadOnly(ctx)
+ pDevice, err = pAgent.getDeviceReadOnly(ctx)
logger.Errorw(ctx, "device-type-not-set", log.Fields{"parent-device": pDevice, "error": err})
return nil, status.Errorf(codes.FailedPrecondition, "device Type not set %s", dd.ParentId)
}
- if device, err := dMgr.GetChildDevice(ctx, &ca.ChildDeviceFilter{
+ if pDevice, err = dMgr.GetChildDevice(ctx, &ca.ChildDeviceFilter{
ParentId: dd.ParentId,
SerialNumber: dd.SerialNumber,
OnuId: dd.OnuId,
ParentPortNo: dd.ParentPortNo}); err == nil {
logger.Warnw(ctx, "child-device-exists", log.Fields{"parent-device-id": dd.ParentId, "serialNumber": dd.SerialNumber})
- return device, status.Errorf(codes.AlreadyExists, "%s", dd.SerialNumber)
+ return pDevice, status.Errorf(codes.AlreadyExists, "%s", dd.SerialNumber)
}
- //Get parent endpoint
+ // Get parent endpoint
pEndPoint, err := dMgr.adapterMgr.GetAdapterEndpoint(ctx, pAgent.deviceID, pAgent.deviceType)
if err != nil {
logger.Errorw(ctx, "endpoint-error", log.Fields{"error": err, "parent-id": pAgent.deviceID, "parent-device-type": pAgent.deviceType})
@@ -404,7 +405,7 @@
if device, err := dMgr.getDeviceReadOnly(ctx, deviceID.Id); err == nil {
go func() {
subCtx := utils.WithSpanAndRPCMetadataFromContext(ctx)
- if err := dMgr.logicalDeviceMgr.deleteAllLogicalPorts(subCtx, device); err != nil {
+ if err = dMgr.logicalDeviceMgr.deleteAllLogicalPorts(subCtx, device); err != nil {
logger.Errorw(ctx, "unable-to-delete-logical-ports", log.Fields{"error": err})
}
}()
diff --git a/rw_core/core/device/manager_state_callback.go b/rw_core/core/device/manager_state_callback.go
index b64c10a..f4f6754 100644
--- a/rw_core/core/device/manager_state_callback.go
+++ b/rw_core/core/device/manager_state_callback.go
@@ -80,7 +80,7 @@
// RunPostDeviceDelete removes any reference of this device
func (dMgr *Manager) RunPostDeviceDelete(ctx context.Context, cDevice *voltha.Device) error {
logger.Infow(ctx, "run-post-device-delete", log.Fields{"device-id": cDevice.Id})
- //deleting the logical device
+ // Deleting the logical device
logger.Debugw(ctx, "delete-logical-device", log.Fields{"device-id": cDevice.Id})
if dMgr.logicalDeviceMgr != nil && cDevice.Root {
if err := dMgr.logicalDeviceMgr.deleteLogicalDevice(ctx, cDevice); err != nil {
@@ -92,7 +92,7 @@
}
if agent := dMgr.getDeviceAgent(ctx, cDevice.Id); agent != nil {
logger.Debugw(ctx, "invoking-delete-device-and-ports", log.Fields{"device-id": cDevice.Id})
- //delete ports
+ // Delete ports
if err := agent.deleteAllPorts(ctx); err != nil {
logger.Warnw(ctx, "failure-delete-device-ports", log.Fields{"device-id": cDevice.Id, "error": err.Error()})
}
diff --git a/rw_core/core/device/meter/loader.go b/rw_core/core/device/meter/loader.go
index 1cf8809..ad5aca7 100644
--- a/rw_core/core/device/meter/loader.go
+++ b/rw_core/core/device/meter/loader.go
@@ -9,8 +9,7 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- */
-package meter
+ */package meter
import (
"context"
@@ -27,17 +26,17 @@
// Loader hides all low-level locking & synchronization related to meter state updates
type Loader struct {
dbProxy *model.Proxy
+ meters map[uint32]*chunk
// this lock protects the meters map, it does not protect individual meters
- lock sync.RWMutex
- meters map[uint32]*chunk
+ lock sync.RWMutex
}
// chunk keeps a meter and the lock for this meter
type chunk struct {
+ meter *ofp.OfpMeterEntry
// this lock is used to synchronize all access to the meter, and also to the "deleted" variable
lock sync.Mutex
deleted bool
- meter *ofp.OfpMeterEntry
}
func NewLoader(dbProxy *model.Proxy) *Loader {
@@ -72,7 +71,7 @@
loader.lock.Lock()
entry, have := loader.meters[meter.Config.MeterId]
if !have {
- entry := &chunk{meter: meter}
+ entry = &chunk{meter: meter}
loader.meters[meter.Config.MeterId] = entry
entry.lock.Lock()
loader.lock.Unlock()
diff --git a/rw_core/core/device/port/loader.go b/rw_core/core/device/port/loader.go
index 61adeae..30cb69d 100644
--- a/rw_core/core/device/port/loader.go
+++ b/rw_core/core/device/port/loader.go
@@ -31,18 +31,17 @@
// Loader hides all low-level locking & synchronization related to port state updates
type Loader struct {
dbProxy *model.Proxy
+ ports map[uint32]*chunk
// this lock protects the ports map, it does not protect individual ports
- lock sync.RWMutex
- ports map[uint32]*chunk
+ lock sync.RWMutex
}
// chunk keeps a port and the lock for this port
type chunk struct {
+ port *voltha.Port
// this lock is used to synchronize all access to the port, and also to the "deleted" variable
lock sync.Mutex
deleted bool
-
- port *voltha.Port
}
func NewLoader(dbProxy *model.Proxy) *Loader {
@@ -79,7 +78,7 @@
loader.lock.Lock()
entry, have := loader.ports[port.PortNo]
if !have {
- entry := &chunk{port: port}
+ entry = &chunk{port: port}
loader.ports[port.PortNo] = entry
entry.lock.Lock()
loader.lock.Unlock()
diff --git a/rw_core/core/device/state/transitions.go b/rw_core/core/device/state/transitions.go
index a2117df..ebc720c 100644
--- a/rw_core/core/device/state/transitions.go
+++ b/rw_core/core/device/state/transitions.go
@@ -78,16 +78,16 @@
// transition represent transition related attributes
type transition struct {
- deviceType deviceType
+ handlers []transitionHandler
previousState deviceState
currentState deviceState
- handlers []transitionHandler
+ deviceType deviceType
}
// TransitionMap represent map of transitions and device manager
type TransitionMap struct {
- transitions []transition
dMgr DeviceManager
+ transitions []transition
}
// DeviceManager represents a generic device manager
@@ -143,7 +143,7 @@
currentState: deviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_ACTIVE, Transient: core.DeviceTransientState_NONE},
handlers: []transitionHandler{dMgr.SetupUNILogicalPorts}})
transitionMap.transitions = append(transitionMap.transitions,
- transition{ //DELETE PRE PROVISIONED State device forcefully
+ transition{ // DELETE PRE PROVISIONED State device forcefully
deviceType: any,
previousState: deviceState{Admin: voltha.AdminState_PREPROVISIONED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN, Transient: core.DeviceTransientState_ANY},
currentState: deviceState{Admin: voltha.AdminState_PREPROVISIONED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN, Transient: core.DeviceTransientState_FORCE_DELETING},
@@ -155,19 +155,19 @@
currentState: deviceState{Admin: voltha.AdminState_PREPROVISIONED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN, Transient: core.DeviceTransientState_DELETING_POST_ADAPTER_RESPONSE},
handlers: []transitionHandler{dMgr.RunPostDeviceDelete}})
transitionMap.transitions = append(transitionMap.transitions,
- transition{ //DELETE device forcefully
+ transition{ // DELETE device forcefully
deviceType: parent,
previousState: deviceState{Admin: voltha.AdminState_UNKNOWN, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN, Transient: core.DeviceTransientState_ANY},
currentState: deviceState{Admin: voltha.AdminState_UNKNOWN, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN, Transient: core.DeviceTransientState_FORCE_DELETING},
handlers: []transitionHandler{dMgr.DeleteAllLogicalPorts, dMgr.DeleteAllChildDevices, dMgr.DeleteAllLogicalMeters, dMgr.RunPostDeviceDelete}})
transitionMap.transitions = append(transitionMap.transitions,
- transition{ //DELETE device after adapter response
+ transition{ // DELETE device after adapter response
deviceType: parent,
previousState: deviceState{Admin: voltha.AdminState_UNKNOWN, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN, Transient: core.DeviceTransientState_ANY},
currentState: deviceState{Admin: voltha.AdminState_UNKNOWN, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN, Transient: core.DeviceTransientState_DELETING_POST_ADAPTER_RESPONSE},
handlers: []transitionHandler{dMgr.DeleteAllLogicalPorts, dMgr.DeleteAllChildDevices, dMgr.DeleteAllLogicalMeters, dMgr.RunPostDeviceDelete}})
transitionMap.transitions = append(transitionMap.transitions,
- transition{ //DELETE no operation transition
+ transition{ // DELETE no operation transition
deviceType: parent,
previousState: deviceState{Admin: voltha.AdminState_UNKNOWN, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN, Transient: core.DeviceTransientState_ANY},
currentState: deviceState{Admin: voltha.AdminState_UNKNOWN, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN, Transient: core.DeviceTransientState_DELETING_FROM_ADAPTER},
@@ -209,19 +209,19 @@
currentState: deviceState{Admin: voltha.AdminState_DISABLED, Connection: voltha.ConnectStatus_REACHABLE, Operational: voltha.OperStatus_UNKNOWN, Transient: core.DeviceTransientState_NONE},
handlers: []transitionHandler{dMgr.CreateLogicalDevice}})
transitionMap.transitions = append(transitionMap.transitions,
- transition{ //DELETE force case
+ transition{ // DELETE force case
deviceType: child,
previousState: deviceState{Admin: voltha.AdminState_UNKNOWN, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN, Transient: core.DeviceTransientState_ANY},
currentState: deviceState{Admin: voltha.AdminState_UNKNOWN, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN, Transient: core.DeviceTransientState_FORCE_DELETING},
handlers: []transitionHandler{dMgr.DeleteAllDeviceFlows, dMgr.ChildDeviceLost, dMgr.DeleteLogicalPorts, dMgr.RunPostDeviceDelete}})
transitionMap.transitions = append(transitionMap.transitions,
- transition{ //DELETE after adapter response case
+ transition{ // DELETE after adapter response case
deviceType: child,
previousState: deviceState{Admin: voltha.AdminState_UNKNOWN, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN, Transient: core.DeviceTransientState_ANY},
currentState: deviceState{Admin: voltha.AdminState_UNKNOWN, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN, Transient: core.DeviceTransientState_DELETING_POST_ADAPTER_RESPONSE},
handlers: []transitionHandler{dMgr.DeleteAllDeviceFlows, dMgr.ChildDeviceLost, dMgr.DeleteLogicalPorts, dMgr.RunPostDeviceDelete}})
transitionMap.transitions = append(transitionMap.transitions,
- transition{ //DELETE wait for adapter response(no operation)
+ transition{ // DELETE wait for adapter response(no operation)
deviceType: child,
previousState: deviceState{Admin: voltha.AdminState_UNKNOWN, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN, Transient: core.DeviceTransientState_ANY},
currentState: deviceState{Admin: voltha.AdminState_UNKNOWN, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN, Transient: core.DeviceTransientState_DELETING_FROM_ADAPTER},
@@ -398,7 +398,7 @@
Transient: transientState}
}
-// isMatched matches a state transition. It returns whether there is a match and if there is whether it is an exact match
+// getHandler matches a state transition. It returns the transition handlers together with a match that tells whether there is a match and, if there is, whether it is an exact match
func getHandler(previous deviceState, current deviceState, transition *transition) ([]transitionHandler, *match) {
m := &match{}
var waitForOtherStatesMatch bool
@@ -473,7 +473,7 @@
// getTransitionHandler returns transition handler & a flag that's set if the transition is invalid
func (tMap *TransitionMap) getTransitionHandler(ctx context.Context, cDevice, pDevice *voltha.Device,
cTransientState, pTransientState core.DeviceTransientState_Types) []transitionHandler {
- //1. Get the previous and current set of states
+ // Get the previous and current set of states
cState := getDeviceStates(cDevice, cTransientState)
pState := getDeviceStates(pDevice, pTransientState)
@@ -482,7 +482,6 @@
return nil
}
- //logger.Infow(ctx, "deviceType", log.Fields{"device": pDevice})
deviceType := parent
if !cDevice.Root {
logger.Info(ctx, "device is child")
@@ -490,13 +489,13 @@
}
logger.Infof(ctx, "deviceType:%d-deviceId:%s-previous:%v-current:%v", deviceType, cDevice.Id, pState, cState)
- //2. Go over transition array to get the right transition
+ // Go over transition array to get the right transition
var currentMatch []transitionHandler
var tempHandler []transitionHandler
var m *match
bestMatch := &match{}
for i := range tMap.transitions {
- // consider transition only if it matches deviceType or is a wild card - any
+ // Consider transition only if it matches deviceType or is a wild card - any
if tMap.transitions[i].deviceType != deviceType && tMap.transitions[i].deviceType != any {
continue
}
diff --git a/rw_core/core/device/state/transitions_test.go b/rw_core/core/device/state/transitions_test.go
index a5470a1..b674799 100644
--- a/rw_core/core/device/state/transitions_test.go
+++ b/rw_core/core/device/state/transitions_test.go
@@ -66,6 +66,7 @@
assert.Equal(t, 0, len(handlers))
}
+// nolint: staticcheck,ineffassign
func TestValidTransitions(t *testing.T) {
ctx := context.Background()
diff --git a/rw_core/core/device/transientstate/loader.go b/rw_core/core/device/transientstate/loader.go
index bae1099..0855ad5 100644
--- a/rw_core/core/device/transientstate/loader.go
+++ b/rw_core/core/device/transientstate/loader.go
@@ -31,15 +31,15 @@
// Loader hides all low-level locking & synchronization related to device transient state updates
type Loader struct {
- dbProxy *model.Proxy
- // this lock protects the device transient state
- lock sync.RWMutex
+ dbProxy *model.Proxy
deviceTransientState *data
+ // this lock protects the device transient state
+ lock sync.RWMutex
}
type data struct {
- transientState core.DeviceTransientState_Types
deviceID string
+ transientState core.DeviceTransientState_Types
}
func NewLoader(dbProxy *model.Proxy, deviceID string) *Loader {
diff --git a/rw_core/core/kv.go b/rw_core/core/kv.go
index 2df1187..a430e99 100644
--- a/rw_core/core/kv.go
+++ b/rw_core/core/kv.go
@@ -68,7 +68,7 @@
// Take a nap before retrying
select {
case <-ctx.Done():
- //ctx canceled
+ // ctx canceled
return ctx.Err()
case <-time.After(retryInterval):
}
diff --git a/rw_core/flowdecomposition/flow_decomposer.go b/rw_core/flowdecomposition/flow_decomposer.go
index 6b5755a..58fcb83 100644
--- a/rw_core/flowdecomposition/flow_decomposer.go
+++ b/rw_core/flowdecomposition/flow_decomposer.go
@@ -70,6 +70,7 @@
}
// Handles special case of any controller-bound flow for a parent device
+// nolint: unparam
func (fd *FlowDecomposer) updateOutputPortForControllerBoundFlowForParentDevice(ctx context.Context, dr *fu.DeviceRules) (*fu.DeviceRules, error) {
EAPOL := fu.EthType(0x888e)
PPPoED := fu.EthType(0x8863)
@@ -77,7 +78,7 @@
UDP := fu.IpProto(17)
newDeviceRules := dr.Copy()
- // Check whether we are dealing with a parent device
+ // Check whether we are dealing with a parent device
for deviceID, fg := range dr.GetRules() {
if device, err := fd.getDevice(ctx, deviceID); err == nil && device.Root {
newDeviceRules.ClearFlows(deviceID)
@@ -97,11 +98,6 @@
f = fu.UpdateOutputPortByActionType(f, uint32(ofp.OfpInstructionType_OFPIT_APPLY_ACTIONS),
uint32(ofp.OfpPortNo_OFPP_CONTROLLER))
}
- // Update flow Id as a change in the instruction field will result in a new flow ID
- //var err error
- //if f.Id, err = fu.HashFlowStats(f); err != nil {
- //return nil, err
- //}
newDeviceRules.AddFlow(deviceID, (proto.Clone(f)).(*ofp.OfpFlowStats))
}
}
@@ -122,7 +118,7 @@
ingressHop := path[0]
egressHop := path[1]
- //case of packet_in from NNI port rule
+ // case of packet_in from NNI port rule
if agent.GetDeviceRoutes().IsRootPort(inPortNo) {
// Trap flow for NNI port
logger.Debug(ctx, "trap-nni")
@@ -148,7 +144,7 @@
logger.Debug(ctx, "trap-uni")
var setVid, setPcp uint32
var setVidOk, setPcpOk bool
- //inPortNo is 0 for wildcard input case, do not include upstream port for controller bound flow in input
+ // inPortNo is 0 for wildcard input case, do not include upstream port for controller bound flow in input
var inPorts = map[uint32]struct{}{inPortNo: {}}
if inPortNo == 0 {
inPorts = agent.GetWildcardInputPorts(ctx, egressHop.Egress) // exclude egress_hop.egress_port.port_no
@@ -291,7 +287,7 @@
// Augment the matchfields with the ofpfields from the flow
fa.MatchFields = append(fa.MatchFields, fu.GetOfbFields(flow, fu.IN_PORT)...)
- //Augment the actions
+ // Augment the actions
filteredAction := fu.GetActions(flow, fu.OUTPUT)
filteredAction = append(filteredAction, fu.Output(egressHop.Egress))
fa.Actions = filteredAction
@@ -443,7 +439,7 @@
logger.Debugw(ctx, "multicast-flow", log.Fields{"inPortNo": inPortNo, "outPortNo": outPortNo})
deviceRules := fu.NewDeviceRules()
- //having no Group yet is the same as having a Group with no buckets
+ // having no Group yet is the same as having a Group with no buckets
var grp *ofp.OfpGroupEntry
var ok bool
if grp, ok = groupMap[grpID]; !ok {
@@ -458,7 +454,7 @@
deviceRules.CreateEntryIfNotExist(path[0].DeviceID)
fg := fu.NewFlowsAndGroups()
fg.AddFlow(flow)
- //return the multicast flow without decomposing it
+ // return the multicast flow without decomposing it
deviceRules.AddFlowsAndGroup(path[0].DeviceID, fg)
return deviceRules
}
@@ -469,8 +465,8 @@
inPortNo := fu.GetInPort(flow)
if fu.HasGroup(flow) && inPortNo == 0 {
- //if no in-port specified for a multicast flow, put NNI port as in-port
- //so that a valid path can be found for the flow
+ // if no in-port specified for a multicast flow, put NNI port as in-port
+ // so that a valid path can be found for the flow
nniPorts := agent.GetNNIPorts()
if len(nniPorts) > 0 {
for port := range nniPorts {
@@ -504,7 +500,6 @@
}
} else {
var ingressDevice *voltha.Device
- var err error
if ingressDevice, err = fd.getDevice(ctx, path[0].DeviceID); err != nil {
// This can happen in a race condition where a device is deleted right after we obtain a
// route involving the device (GetRoute() above). Handle it as a no route event as well.
@@ -535,7 +530,7 @@
if err != nil {
return nil, err
}
- } else if grpID := fu.GetGroup(flow); grpID != 0 && flow.TableId == 0 { //Multicast
+ } else if grpID := fu.GetGroup(flow); grpID != 0 && flow.TableId == 0 { // Multicast
logger.Debugw(ctx, "process-multicast-flow", log.Fields{"flows": flow})
deviceRules = fd.processMulticastFlow(ctx, path, inPortNo, outPortNo, flow, grpID, groupMap)
} else {
diff --git a/rw_core/main.go b/rw_core/main.go
index 0a0e4fb..70c4b35 100644
--- a/rw_core/main.go
+++ b/rw_core/main.go
@@ -19,6 +19,7 @@
import (
"context"
"fmt"
+ "io"
"os"
"time"
@@ -69,13 +70,13 @@
panic(err)
}
- //Setup default logger - applies for packages that do not have specific logger set
- if _, err := log.SetDefaultLogger(log.JSON, logLevel, log.Fields{"instanceId": instanceID}); err != nil {
+ // Setup default logger - applies for packages that do not have specific logger set
+ if _, err = log.SetDefaultLogger(log.JSON, logLevel, log.Fields{"instanceId": instanceID}); err != nil {
logger.With(log.Fields{"error": err}).Fatal(ctx, "Cannot setup logging")
}
// Update all loggers (provisioned via init) with a common field
- if err := log.UpdateAllLoggers(log.Fields{"instanceId": instanceID}); err != nil {
+ if err = log.UpdateAllLoggers(log.Fields{"instanceId": instanceID}); err != nil {
logger.With(log.Fields{"error": err}).Fatal(ctx, "Cannot setup logging")
}
@@ -83,7 +84,7 @@
log.SetAllLogLevel(logLevel)
defer func() {
- err := log.CleanUp()
+ err = log.CleanUp()
if err != nil {
logger.Errorw(ctx, "unable-to-flush-any-buffered-log-entries", log.Fields{"error": err})
}
@@ -117,7 +118,8 @@
// Add the probe to the context to pass to all the services started
probeCtx := context.WithValue(ctx, probe.ProbeContextKey, p)
- closer, err := log.GetGlobalLFM().InitTracingAndLogCorrelation(cf.TraceEnabled, cf.TraceAgentAddress, cf.LogCorrelationEnabled)
+ var closer io.Closer
+ closer, err = log.GetGlobalLFM().InitTracingAndLogCorrelation(cf.TraceEnabled, cf.TraceAgentAddress, cf.LogCorrelationEnabled)
if err != nil {
logger.Warnw(ctx, "unable-to-initialize-tracing-and-log-correlation-module", log.Fields{"error": err})
} else {
diff --git a/rw_core/mocks/adapter.go b/rw_core/mocks/adapter.go
index 11987be..80465c8 100644
--- a/rw_core/mocks/adapter.go
+++ b/rw_core/mocks/adapter.go
@@ -73,21 +73,21 @@
// Adapter represents adapter attributes
type Adapter struct {
flows map[string]map[uint64]*openflow_13.OfpFlowStats
- flowLock sync.RWMutex
devices map[string]*voltha.Device
- deviceLock sync.RWMutex
failFlowAdd map[string]bool
- failFlowAddLock sync.RWMutex
failFlowDelete map[string]bool
- failFlowDeleteLock sync.RWMutex
failDeleteDevice map[string]bool
- failDeleteDeviceLock sync.RWMutex
- coreEnpoint string
coreClient *vgrpc.Client
+ Probe *probe.Probe
+ coreEnpoint string
serviceEndpoint string
DeviceType string
vendor string
- Probe *probe.Probe
+ flowLock sync.RWMutex
+ deviceLock sync.RWMutex
+ failFlowAddLock sync.RWMutex
+ failFlowDeleteLock sync.RWMutex
+ failDeleteDeviceLock sync.RWMutex
}
// NewAdapter creates adapter instance
diff --git a/rw_core/mocks/adapter_olt.go b/rw_core/mocks/adapter_olt.go
index 5a9d779..65c5503 100644
--- a/rw_core/mocks/adapter_olt.go
+++ b/rw_core/mocks/adapter_olt.go
@@ -45,9 +45,9 @@
// OLTAdapter represent OLT adapter
type OLTAdapter struct {
*Adapter
+ grpcServer *vgrpc.GrpcServer
ChildDeviceType string
childVendor string
- grpcServer *vgrpc.GrpcServer
}
// NewOLTAdapter - creates OLT adapter instance
@@ -140,7 +140,7 @@
if err != nil {
return
}
- if _, err := c.DeviceUpdate(context.TODO(), d); err != nil {
+ if _, err = c.DeviceUpdate(context.TODO(), d); err != nil {
logger.Fatalf(ctx, "deviceUpdate-failed-%s", err)
}
@@ -184,7 +184,7 @@
logger.Fatalf(ctx, "PortCreated-failed-%s", err)
}
- //Get the latest device data from the Core
+ // Get the latest device data from the Core
if d, err = c.GetDevice(context.TODO(), &common.ID{Id: d.Id}); err != nil {
logger.Fatalf(ctx, "getting-device-failed-%s", err)
}
@@ -195,7 +195,7 @@
initialUniPortNo := startingUNIPortNo
for i := 0; i < numONUPerOLT; i++ {
go func(seqNo int) {
- if _, err := c.ChildDeviceDetected(context.TODO(),
+ if _, err := c.ChildDeviceDetected(context.TODO(), // keep err goroutine-local; assigning the enclosing err would race across the per-ONU goroutines
&ca.DeviceDiscovery{
ParentId: d.Id,
ParentPortNo: 1,
@@ -265,7 +265,7 @@
logger.Warnw(ctx, "updating-ports-failed", log.Fields{"device-id": device.Id, "error": err})
}
- //Update the device operational state
+ // Update the device operational state
cloned.OperStatus = common.OperStatus_UNKNOWN
// The device is still reachable after it has been disabled, so the connection status should not be changed.
@@ -314,7 +314,7 @@
logger.Warnw(ctx, "updating-ports-failed", log.Fields{"device-id": device.Id, "error": err})
}
- //Update the device state
+ // Update the device state
cloned.OperStatus = common.OperStatus_ACTIVE
if _, err := c.DeviceStateUpdate(context.TODO(), &ca.DeviceStateFilter{
diff --git a/rw_core/mocks/adapter_onu.go b/rw_core/mocks/adapter_onu.go
index 6087422..b7309f6 100644
--- a/rw_core/mocks/adapter_onu.go
+++ b/rw_core/mocks/adapter_onu.go
@@ -130,7 +130,7 @@
if err != nil {
return
}
- if _, err := c.DeviceUpdate(context.TODO(), d); err != nil {
+ if _, err = c.DeviceUpdate(context.TODO(), d); err != nil {
logger.Fatalf(ctx, "deviceUpdate-failed-%s", err)
}
@@ -197,7 +197,7 @@
logger.Fatalf(ctx, "PortCreated-failed-%s", err)
}
- //Get the latest device data from the Core
+ // Get the latest device data from the Core
if d, err = c.GetDevice(context.TODO(), &common.ID{Id: d.Id}); err != nil {
logger.Fatalf(ctx, "getting-device-failed-%s", err)
}
@@ -237,7 +237,7 @@
logger.Warnw(ctx, "updating-ports-failed", log.Fields{"device-id": device.Id, "error": err})
}
- //Update the device operational state
+ // Update the device operational state
cloned.ConnectStatus = common.ConnectStatus_UNREACHABLE
cloned.OperStatus = common.OperStatus_UNKNOWN
@@ -280,7 +280,7 @@
logger.Warnw(ctx, "updating-ports-failed", log.Fields{"device-id": device.Id, "error": err})
}
- //Update the device state
+ // Update the device state
cloned.ConnectStatus = common.ConnectStatus_REACHABLE
cloned.OperStatus = common.OperStatus_ACTIVE
diff --git a/rw_core/route/device_route.go b/rw_core/route/device_route.go
index 26a775d..d773649 100644
--- a/rw_core/route/device_route.go
+++ b/rw_core/route/device_route.go
@@ -53,16 +53,16 @@
// DeviceRoutes represent the set of routes between logical ports of a logical device
type DeviceRoutes struct {
- logicalDeviceID string
- rootDeviceID string
listDevicePorts listDevicePortsFunc
logicalPorts map[uint32]*voltha.LogicalPort
RootPorts map[uint32]uint32
- rootPortsLock sync.RWMutex
Routes map[PathID][]Hop
- routeBuildLock sync.RWMutex
devicesPonPorts map[string][]*voltha.Port
childConnectionPort map[string]uint32
+ logicalDeviceID string
+ rootDeviceID string
+ rootPortsLock sync.RWMutex
+ routeBuildLock sync.RWMutex
}
// NewDeviceRoutes creates device graph instance
diff --git a/rw_core/test/core_nbi_handler_multi_test.go b/rw_core/test/core_nbi_handler_multi_test.go
index f823f0f..471bc2e 100755
--- a/rw_core/test/core_nbi_handler_multi_test.go
+++ b/rw_core/test/core_nbi_handler_multi_test.go
@@ -141,7 +141,7 @@
return test
}
-func (nb *NBTest) startGRPCCore(ctx context.Context, t *testing.T) (coreEndpoint, nbiEndpoint string) {
+func (nb *NBTest) startGRPCCore(ctx context.Context) (coreEndpoint, nbiEndpoint string) {
// Setup the configs
cfg := &config.RWCoreFlags{}
cfg.ParseCommandArguments([]string{})
@@ -511,9 +511,7 @@
// Create a logical device monitor will automatically send trap and eapol flows to the devices being enables
var wg sync.WaitGroup
wg.Add(1)
- subCtx, cancel := context.WithCancel(context.Background())
- defer cancel()
- go nb.monitorLogicalDevices(subCtx, t, nbi, 1, nb.numONUPerOLT, &wg, false, false, oltDevice.Id, eventCh)
+ go nb.monitorLogicalDevices(t, nbi, 1, nb.numONUPerOLT, &wg, false, false, oltDevice.Id, eventCh)
// Wait for the logical device to be in the ready state
var vldFunction = func(ports []*voltha.LogicalPort) bool {
@@ -781,7 +779,7 @@
return oltDevice, err
}
-func (nb *NBTest) testEnableDeviceFailed(t *testing.T, nbi voltha.VolthaServiceClient, oltDeviceType string) {
+func (nb *NBTest) testEnableDeviceFailed(t *testing.T, nbi voltha.VolthaServiceClient) {
//Create a device that has no adapter registered
macAddress := getRandomMacAddress()
oltDeviceNoAdapter, err := nbi.CreateDevice(getContext(), &voltha.Device{Type: "noAdapterRegistered", MacAddress: macAddress})
@@ -816,9 +814,7 @@
//Create a logical device monitor will automatically send trap and eapol flows to the devices being enables
var wg sync.WaitGroup
wg.Add(1)
- subCtx, cancel := context.WithCancel(context.Background())
- defer cancel()
- go nb.monitorLogicalDevices(subCtx, t, nbi, 1, nb.numONUPerOLT, &wg, false, false, oltDevice.Id, eventCh)
+ go nb.monitorLogicalDevices(t, nbi, 1, nb.numONUPerOLT, &wg, false, false, oltDevice.Id, eventCh)
// Wait for the logical device to be in the ready state
var vldFunction = func(ports []*voltha.LogicalPort) bool {
@@ -1339,7 +1335,7 @@
assert.Nil(t, err)
}
-func (nb *NBTest) sendTrapFlows(t *testing.T, nbi voltha.VolthaServiceClient, logicalDeviceID string, ports []*voltha.LogicalPort, meterID uint64, startingVlan int) (numNNIPorts, numUNIPorts int) {
+func (nb *NBTest) sendTrapFlows(t *testing.T, nbi voltha.VolthaServiceClient, logicalDeviceID string, ports []*voltha.LogicalPort) (numNNIPorts, numUNIPorts int) {
// Send flows for the parent device
var nniPorts []*voltha.LogicalPort
var uniPorts []*voltha.LogicalPort
@@ -1526,7 +1522,6 @@
}
func (nb *NBTest) monitorLogicalDevices(
- ctx context.Context,
t *testing.T,
nbi voltha.VolthaServiceClient,
numNNIPorts int,
@@ -1615,7 +1610,7 @@
// Send initial set of Trap flows
startingVlan := 4091
- nb.sendTrapFlows(t, nbi, logicalDeviceID, ports.Items, uint64(meterID), startingVlan)
+ nb.sendTrapFlows(t, nbi, logicalDeviceID, ports.Items)
//Listen for port events
processedNniLogicalPorts := 0
@@ -1689,9 +1684,7 @@
// Create a logical device monitor will automatically send trap and eapol flows to the devices being enables
var wg sync.WaitGroup
wg.Add(1)
- subCtx, cancel := context.WithCancel(context.Background())
- defer cancel()
- go nb.monitorLogicalDevices(subCtx, t, nbi, 1, nb.numONUPerOLT, &wg, true, false, oltDevice.Id, eventCh)
+ go nb.monitorLogicalDevices(t, nbi, 1, nb.numONUPerOLT, &wg, true, false, oltDevice.Id, eventCh)
// Wait for the logical device to be in the ready state
var vldFunction = func(ports []*voltha.LogicalPort) bool {
@@ -1946,7 +1939,7 @@
nb.testDeleteDeviceFailure(t, nbi, oltDeviceType)
////Test failed enable device
- nb.testEnableDeviceFailed(t, nbi, oltDeviceType)
+ nb.testEnableDeviceFailed(t, nbi)
//Test Enable a device
nb.testEnableDevice(t, nbi, oltDeviceType)
@@ -2008,7 +2001,7 @@
func setUpCore(ctx context.Context, t *testing.T, nb *NBTest) (voltha.VolthaServiceClient, string) {
// Start the Core
- coreAPIEndpoint, nbiEndpoint := nb.startGRPCCore(ctx, t)
+ coreAPIEndpoint, nbiEndpoint := nb.startGRPCCore(ctx)
// Wait until the core is ready
start := time.Now()
diff --git a/rw_core/test/utils.go b/rw_core/test/utils.go
index dfd28da..db1b28b 100644
--- a/rw_core/test/utils.go
+++ b/rw_core/test/utils.go
@@ -20,18 +20,17 @@
import (
"bufio"
"context"
+ "crypto/rand"
"encoding/json"
"fmt"
+ "io"
"os"
"path/filepath"
"strings"
"testing"
- "time"
ca "github.com/opencord/voltha-protos/v5/go/core_adapter"
- "math/rand"
-
"github.com/opencord/voltha-go/rw_core/config"
cm "github.com/opencord/voltha-go/rw_core/mocks"
"github.com/opencord/voltha-lib-go/v7/pkg/db/kvstore"
@@ -51,11 +50,11 @@
)
type AdapterInfo struct {
- TotalReplica int32
Vendor string
DeviceType string
ChildDeviceType string
ChildVendor string
+ TotalReplica int32
}
// prettyPrintDeviceUpdateLog is used just for debugging and exploring the Core logs
@@ -100,16 +99,14 @@
if err := json.Unmarshal([]byte(input), &logEntry); err != nil {
logger.Fatal(context.Background(), err)
}
- fmt.Println(
- fmt.Sprintf(
- "%s\t%s\t%s\t%-30.30q\t%-16.16s\t%-25.25s\t%s",
- logEntry.Ts,
- logEntry.DeviceID,
- logEntry.Status,
- logEntry.RPC,
- logEntry.RequestedBy,
- logEntry.StateChange,
- logEntry.Description))
+ fmt.Printf("%s\t%s\t%s\t%-30.30q\t%-16.16s\t%-25.25s\t%s\n",
+ logEntry.Ts,
+ logEntry.DeviceID,
+ logEntry.Status,
+ logEntry.RPC,
+ logEntry.RequestedBy,
+ logEntry.StateChange,
+ logEntry.Description)
}
}
@@ -317,13 +314,14 @@
// getRandomMacAddress returns a random mac address
func getRandomMacAddress() string {
- rand.Seed(time.Now().UnixNano() / int64(rand.Intn(255)+1))
+ mac := make([]byte, 6)
+ if _, err := io.ReadFull(rand.Reader, mac); err != nil {
+ fmt.Println("Error", err)
+ }
+
+ // Set the locally administered bit and clear the multicast bit so the result is a unicast address
+ mac[0] = (mac[0] & 0xfe) | 0x02
+
return fmt.Sprintf("%02x:%02x:%02x:%02x:%02x:%02x",
- rand.Intn(255),
- rand.Intn(255),
- rand.Intn(255),
- rand.Intn(255),
- rand.Intn(255),
- rand.Intn(255),
- )
+ mac[0], mac[1], mac[2], mac[3], mac[4], mac[5])
}
diff --git a/rw_core/utils/request_queue.go b/rw_core/utils/request_queue.go
index 336362c..c232b6d 100644
--- a/rw_core/utils/request_queue.go
+++ b/rw_core/utils/request_queue.go
@@ -29,10 +29,9 @@
// RequestQueue represents a request processing queue where each request is processed to completion before another
// request is given the green light to proceed.
type RequestQueue struct {
- mutex sync.Mutex
-
last, current *request
lastCompleteCh <-chan struct{}
+ mutex sync.Mutex
}
// NewRequestQueue creates a new request queue