WIP - Suggesting changes (take2)
This is not yet completed, still working on things. Eventually the plan
is to provide the following changes
- restructure repo to be more aligned with https://github.com/golang-standards/project-layout
- add k8s probes
- modifications (golang range loops, etc) to follow some golang
practices
Change-Id: I6922cbc00b5ef17ceab183aba00a7fc59ab46480
diff --git a/internal/pkg/ofagent/refresh.go b/internal/pkg/ofagent/refresh.go
new file mode 100644
index 0000000..8f68ef0
--- /dev/null
+++ b/internal/pkg/ofagent/refresh.go
@@ -0,0 +1,109 @@
+/*
+ Copyright 2020 the original author or authors.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package ofagent
+
+import (
+ "context"
+ "github.com/golang/protobuf/ptypes/empty"
+ "github.com/opencord/ofagent-go/internal/pkg/openflow"
+ "github.com/opencord/voltha-lib-go/v2/pkg/log"
+ "time"
+)
+
+func (ofa *OFAgent) synchronizeDeviceList(ctx context.Context) {
+ // Refresh once to get everything started
+ ofa.refreshDeviceList()
+
+ tick := time.NewTicker(ofa.DeviceListRefreshInterval)
+loop:
+ for {
+ select {
+ case <-ctx.Done():
+ break loop
+ case <-tick.C:
+ ofa.refreshDeviceList()
+ }
+ }
+ tick.Stop()
+}
+
+func (ofa *OFAgent) refreshDeviceList() {
+ deviceList, err := ofa.volthaClient.ListLogicalDevices(context.Background(), &empty.Empty{})
+ if err != nil {
+ logger.Errorw("ofagent failed to query device list from voltha",
+ log.Fields{"error": err})
+ return
+ }
+ devices := deviceList.GetItems()
+
+ var toAdd []string
+ var toDel []string
+ var deviceIDMap = make(map[string]string)
+ for i := 0; i < len(devices); i++ {
+ deviceID := devices[i].GetId()
+ deviceIDMap[deviceID] = deviceID
+ if ofa.clientMap[deviceID] == nil {
+ toAdd = append(toAdd, deviceID)
+ }
+ }
+ for key := range ofa.clientMap {
+ if deviceIDMap[key] == "" {
+ toDel = append(toDel, key)
+ }
+ }
+ logger.Debugw("GrpcClient refreshDeviceList", log.Fields{"ToAdd": toAdd, "ToDel": toDel})
+ for i := 0; i < len(toAdd); i++ {
+ var client = ofa.addOFClient(toAdd[i])
+ go client.Run(context.Background())
+ }
+ for i := 0; i < len(toDel); i++ {
+ ofa.clientMap[toDel[i]].Stop()
+ ofa.mapLock.Lock()
+ delete(ofa.clientMap, toDel[i])
+ ofa.mapLock.Unlock()
+ }
+}
+
+func (ofa *OFAgent) addOFClient(deviceID string) *openflow.OFClient {
+ logger.Debugw("GrpcClient addClient called ", log.Fields{"device-id": deviceID})
+ ofa.mapLock.Lock()
+ ofc := ofa.clientMap[deviceID]
+ if ofc == nil {
+ ofc = openflow.NewOFClient(&openflow.OFClient{
+ DeviceID: deviceID,
+ OFControllerEndPoint: ofa.OFControllerEndPoint,
+ VolthaClient: ofa.volthaClient,
+ PacketOutChannel: ofa.packetOutChannel,
+ ConnectionMaxRetries: ofa.ConnectionMaxRetries,
+ ConnectionRetryDelay: ofa.ConnectionRetryDelay,
+ KeepRunning: true,
+ })
+ go ofc.Run(context.Background())
+ ofa.clientMap[deviceID] = ofc
+ }
+ ofa.mapLock.Unlock()
+ logger.Debugw("Finished with addClient", log.Fields{"deviceID": deviceID})
+ return ofc
+}
+
+func (ofa *OFAgent) getOFClient(deviceID string) *openflow.OFClient {
+ ofc := ofa.clientMap[deviceID]
+ if ofc == nil {
+ ofc = ofa.addOFClient(deviceID)
+ }
+ return ofc
+}