WIP - Suggesting changes (take2)
This is not yet complete; work is still in progress. The plan is to
eventually provide the following changes:
- restructure the repo to align more closely with https://github.com/golang-standards/project-layout
- add k8s probes
- modify the code (Go range loops, etc.) to follow common Go
  practices
Change-Id: I6922cbc00b5ef17ceab183aba00a7fc59ab46480
diff --git a/internal/pkg/ofagent/changeEvent.go b/internal/pkg/ofagent/changeEvent.go
new file mode 100644
index 0000000..90b0fd6
--- /dev/null
+++ b/internal/pkg/ofagent/changeEvent.go
@@ -0,0 +1,110 @@
+/*
+ Copyright 2020 the original author or authors.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package ofagent
+
+import (
+ "context"
+ "encoding/json"
+ ofp "github.com/donNewtonAlpha/goloxi/of13"
+ "github.com/golang/protobuf/ptypes/empty"
+ "github.com/opencord/ofagent-go/internal/pkg/openflow"
+ "github.com/opencord/voltha-lib-go/v2/pkg/log"
+ "google.golang.org/grpc"
+ "net"
+)
+
+// receiveChangeEvents streams change events from voltha into
+// ofa.changeEventChannel until ctx is cancelled. A stream failure queues one
+// ofaEventVolthaDisconnected and returns instead of spinning on a dead stream.
+func (ofa *OFAgent) receiveChangeEvents(ctx context.Context) {
+	logger.Debug("receive-change-events-started")
+	stream, err := ofa.volthaClient.ReceiveChangeEvents(ctx, &empty.Empty{}, grpc.EmptyCallOption{})
+	if err != nil {
+		logger.Errorw("Unable to establish Receive Change Event Stream", log.Fields{"error": err})
+		ofa.events <- ofaEventVolthaDisconnected
+		return // no usable stream to read from
+	}
+	for {
+		ce, err := stream.Recv()
+		if err != nil {
+			if ctx.Err() == nil { // a real failure, not our own cancellation
+				logger.Errorw("error receiving change event", log.Fields{"error": err})
+				ofa.events <- ofaEventVolthaDisconnected
+			}
+			return
+		}
+		select {
+		case <-ctx.Done():
+			return
+		case ofa.changeEventChannel <- ce:
+		}
+	}
+}
+
+// handleChangeEvents drains ofa.changeEventChannel until ctx is cancelled,
+// translating each voltha port-status event into an OpenFlow PortStatus
+// message (wire version 4) that is sent through the OF client of the
+// originating device. Events without a port status are logged and dropped.
+func (ofa *OFAgent) handleChangeEvents(ctx context.Context) {
+	logger.Debugln("handle-change-event-started")
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case changeEvent := <-ofa.changeEventChannel:
+			deviceID := changeEvent.GetId()
+			portStatus := changeEvent.GetPortStatus()
+			logger.Debugw("received-change-event",
+				log.Fields{"device-id": deviceID, "port-status": portStatus})
+			if portStatus == nil {
+				if logger.V(log.WarnLevel) {
+					js, _ := json.Marshal(changeEvent.GetEvent())
+					logger.Warnw("Received change event that was not port status",
+						log.Fields{"ChangeEvent": js})
+				}
+				continue
+			}
+
+			ofPortStatus := ofp.NewPortStatus()
+			ofPortStatus.SetXid(openflow.GetXid())
+			ofPortStatus.SetVersion(4)
+			ofPortStatus.SetReason(ofp.PortReason(portStatus.GetReason()))
+
+			desc := portStatus.GetDesc()
+			ofDesc := ofp.NewPortDesc()
+			ofDesc.SetAdvertised(ofp.PortFeatures(desc.GetAdvertised()))
+			ofDesc.SetConfig(ofp.PortConfig(0))
+			// Curr mirrors the port's current features, not the advertised ones.
+			ofDesc.SetCurr(ofp.PortFeatures(desc.GetCurr()))
+			ofDesc.SetCurrSpeed(desc.GetCurrSpeed())
+			// The hardware address arrives as one integer per octet.
+			var octets []byte
+			for _, val := range desc.GetHwAddr() {
+				octets = append(octets, byte(val))
+			}
+			ofDesc.SetHwAddr(net.HardwareAddr(octets))
+			ofDesc.SetMaxSpeed(desc.GetMaxSpeed())
+			ofDesc.SetName(openflow.PadString(desc.GetName(), 16))
+			ofDesc.SetPeer(ofp.PortFeatures(desc.GetPeer()))
+			ofDesc.SetPortNo(ofp.Port(desc.GetPortNo()))
+			ofDesc.SetState(ofp.PortState(desc.GetState()))
+			ofDesc.SetSupported(ofp.PortFeatures(desc.GetSupported()))
+			ofPortStatus.SetDesc(*ofDesc)
+			ofa.getOFClient(deviceID).SendMessage(ofPortStatus)
+		}
+	}
+}
diff --git a/internal/pkg/ofagent/connection.go b/internal/pkg/ofagent/connection.go
new file mode 100644
index 0000000..5fd8a3f
--- /dev/null
+++ b/internal/pkg/ofagent/connection.go
@@ -0,0 +1,77 @@
+/*
+ Copyright 2020 the original author or authors.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+package ofagent
+
+import (
+ "context"
+ "errors"
+ "github.com/golang/protobuf/ptypes/empty"
+ "github.com/opencord/voltha-lib-go/v2/pkg/log"
+ "github.com/opencord/voltha-lib-go/v2/pkg/probe"
+ "github.com/opencord/voltha-protos/v2/go/voltha"
+ "google.golang.org/grpc"
+ "time"
+)
+
+// establishConnectionToVoltha (re)dials the voltha API endpoint and checks it
+// with a GetVoltha call, sleeping ConnectionRetryDelay between attempts. With
+// ConnectionMaxRetries == 0 it retries forever. On success the connection and
+// client are stored on ofa, ofaEventVolthaConnected is queued and nil is
+// returned. The optional probe p tracks the "voltha" service status.
+func (ofa *OFAgent) establishConnectionToVoltha(p *probe.Probe) error {
+	if p != nil {
+		p.UpdateStatus("voltha", probe.ServiceStatusPreparing)
+	}
+	if ofa.volthaConnection != nil {
+		ofa.volthaConnection.Close()
+	}
+	ofa.volthaConnection = nil
+	ofa.volthaClient = nil
+
+	try := 1
+	for ofa.ConnectionMaxRetries == 0 || try < ofa.ConnectionMaxRetries {
+		conn, err := grpc.Dial(ofa.VolthaApiEndPoint, grpc.WithInsecure())
+		if err == nil {
+			svc := voltha.NewVolthaServiceClient(conn)
+			if svc != nil {
+				if _, err = svc.GetVoltha(context.Background(), &empty.Empty{}); err == nil {
+					logger.Debugw("Established connection to Voltha",
+						log.Fields{"VolthaApiEndPoint": ofa.VolthaApiEndPoint})
+					ofa.volthaConnection = conn
+					ofa.volthaClient = svc
+					if p != nil {
+						p.UpdateStatus("voltha", probe.ServiceStatusRunning)
+					}
+					ofa.events <- ofaEventVolthaConnected
+					return nil
+				}
+			}
+			// This attempt failed after dialling: release the connection.
+			conn.Close()
+		}
+		logger.Warnw("Failed to connect to voltha",
+			log.Fields{"VolthaApiEndPoint": ofa.VolthaApiEndPoint, "error": err})
+		// Attempts are only counted when a retry limit is configured.
+		if ofa.ConnectionMaxRetries != 0 {
+			try++
+		}
+		time.Sleep(ofa.ConnectionRetryDelay)
+	}
+	if p != nil {
+		p.UpdateStatus("voltha", probe.ServiceStatusFailed)
+	}
+	return errors.New("failed-to-connect-to-voltha")
+}
diff --git a/internal/pkg/ofagent/ofagent.go b/internal/pkg/ofagent/ofagent.go
new file mode 100644
index 0000000..0ad4cd1
--- /dev/null
+++ b/internal/pkg/ofagent/ofagent.go
@@ -0,0 +1,181 @@
+/*
+ Copyright 2020 the original author or authors.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package ofagent
+
+import (
+ "context"
+ "github.com/opencord/ofagent-go/internal/pkg/openflow"
+ "github.com/opencord/voltha-lib-go/v2/pkg/log"
+ "github.com/opencord/voltha-lib-go/v2/pkg/probe"
+ "github.com/opencord/voltha-protos/v2/go/voltha"
+ "google.golang.org/grpc"
+ "sync"
+ "time"
+)
+
+var logger, _ = log.AddPackage(log.JSON, log.DebugLevel, nil) // package logger; AddPackage's second result is discarded
+
+type ofaEvent byte // event value sent on OFAgent.events and dispatched in Run
+type ofaState byte // voltha connection state tracked by Run
+
+const (
+ ofaEventStart = ofaEvent(iota)
+ ofaEventVolthaConnected
+ ofaEventVolthaDisconnected
+ ofaEventError
+ // iota keeps counting within this const block, so the states below are 4 and 5.
+ ofaStateConnected = ofaState(iota)
+ ofaStateDisconnected
+)
+
+type OFAgent struct { // OFAgent holds the agent's settings plus its voltha connection, OF clients and channels
+ VolthaApiEndPoint string // address dialled with grpc.Dial to reach voltha
+ OFControllerEndPoint string // handed to every OFClient the agent creates
+ DeviceListRefreshInterval time.Duration // period of the device list refresh ticker
+ ConnectionMaxRetries int // voltha connect retry limit; 0 retries forever
+ ConnectionRetryDelay time.Duration // sleep between voltha connection attempts
+
+ volthaConnection *grpc.ClientConn // current gRPC connection, nil until established
+ volthaClient voltha.VolthaServiceClient // client bound to volthaConnection
+ mapLock sync.Mutex // taken around clientMap inserts and deletes
+ clientMap map[string]*openflow.OFClient // OF clients keyed by device ID
+ events chan ofaEvent // event queue consumed by Run
+
+ packetInChannel chan *voltha.PacketIn // filled by receivePacketsIn, drained by handlePacketsIn
+ packetOutChannel chan *voltha.PacketOut // drained by streamPacketOut; also handed to each OFClient
+ changeEventChannel chan *voltha.ChangeEvent // filled by receiveChangeEvents, drained by handleChangeEvents
+}
+
+// NewOFAgent builds an OFAgent from the exported settings in config and
+// allocates the agent's internal channels and client map. A non-positive
+// DeviceListRefreshInterval or ConnectionRetryDelay is replaced by its default
+// (1 minute and 3 seconds). The returned error is currently always nil.
+func NewOFAgent(config *OFAgent) (*OFAgent, error) {
+	ofa := OFAgent{
+		VolthaApiEndPoint:         config.VolthaApiEndPoint,
+		OFControllerEndPoint:      config.OFControllerEndPoint,
+		DeviceListRefreshInterval: config.DeviceListRefreshInterval,
+		ConnectionMaxRetries:      config.ConnectionMaxRetries,
+		ConnectionRetryDelay:      config.ConnectionRetryDelay,
+		packetInChannel:           make(chan *voltha.PacketIn),
+		packetOutChannel:          make(chan *voltha.PacketOut),
+		changeEventChannel:        make(chan *voltha.ChangeEvent),
+		clientMap:                 make(map[string]*openflow.OFClient),
+		events:                    make(chan ofaEvent, 100),
+	}
+
+	if ofa.DeviceListRefreshInterval <= 0 {
+		logger.Warnw("device list refresh interval not valid, setting to default",
+			log.Fields{"value": ofa.DeviceListRefreshInterval.String(), "default": (1 * time.Minute).String()})
+		ofa.DeviceListRefreshInterval = 1 * time.Minute
+	}
+
+	if ofa.ConnectionRetryDelay <= 0 {
+		logger.Warnw("connection retry delay not valid, setting to default",
+			log.Fields{"value": ofa.ConnectionRetryDelay.String(), "default": (3 * time.Second).String()})
+		ofa.ConnectionRetryDelay = 3 * time.Second
+	}
+
+	return &ofa, nil
+}
+
+// Run queues the start event and then dispatches agent events (voltha connect/disconnect, loop start/stop) until ctx is done.
+func (ofa *OFAgent) Run(ctx context.Context) {
+
+ logger.Debugw("Starting GRPC - VOLTHA client",
+ log.Fields{
+ "voltha-endpoint": ofa.VolthaApiEndPoint,
+ "controller-endpoint": ofa.OFControllerEndPoint})
+
+ // If the context contains a k8s probe then register services
+ p := probe.GetProbeFromContext(ctx)
+ if p != nil {
+ p.RegisterService("voltha")
+ }
+ ofa.events <- ofaEventStart
+
+ /*
+ * Two sub-contexts are created here for different purposes so we can
+ * control the lifecycle of processing loops differently.
+ *
+ * volthaCtx - controls those processes that rely on the GRPC
+ * connection to voltha and will be restarted when the
+ * GRPC connection is interrupted.
+ * hdlCtx - controls those processes that listen to channels and
+ * process each message. These will likely never be
+ * stopped until the ofagent is stopped.
+ */
+ var volthaCtx, hdlCtx context.Context
+ var volthaDone, hdlDone func()
+ state := ofaStateDisconnected
+
+ for {
+ select {
+ case <-ctx.Done():
+ if volthaDone != nil {
+ volthaDone()
+ volthaDone = nil
+ }
+ if hdlDone != nil {
+ hdlDone()
+ hdlDone = nil
+ }
+ return
+ case event := <-ofa.events:
+ switch event {
+ case ofaEventStart:
+ logger.Debug("ofagent-voltha-start-event")
+
+ // Start the loops that process messages
+ hdlCtx, hdlDone = context.WithCancel(context.Background()) // not derived from ctx; cancelled explicitly through hdlDone
+ go ofa.handlePacketsIn(hdlCtx)
+ go ofa.handleChangeEvents(hdlCtx)
+
+ // Kick off process to attempt to establish
+ // connection to voltha
+ go ofa.establishConnectionToVoltha(p)
+
+ case ofaEventVolthaConnected:
+ logger.Debug("ofagent-voltha-connect-event")
+
+ // Start the loops that poll from voltha
+ if state != ofaStateConnected {
+ state = ofaStateConnected
+ volthaCtx, volthaDone = context.WithCancel(context.Background()) // not derived from ctx; cancelled explicitly through volthaDone
+ go ofa.receiveChangeEvents(volthaCtx)
+ go ofa.receivePacketsIn(volthaCtx)
+ go ofa.streamPacketOut(volthaCtx)
+ go ofa.synchronizeDeviceList(volthaCtx)
+ }
+
+ case ofaEventVolthaDisconnected:
+ logger.Debug("ofagent-voltha-disconnect-event") // NOTE(review): nothing in this case restarts establishConnectionToVoltha - confirm reconnect is handled elsewhere
+ if state == ofaStateConnected {
+ state = ofaStateDisconnected
+ volthaDone()
+ volthaDone = nil
+ volthaCtx = nil
+ }
+ case ofaEventError:
+ logger.Debug("ofagent-error-event")
+ default:
+ logger.Fatalw("ofagent-unknown-event",
+ log.Fields{"event": event})
+ }
+ }
+ }
+}
diff --git a/internal/pkg/ofagent/packetIn.go b/internal/pkg/ofagent/packetIn.go
new file mode 100644
index 0000000..721d1d7
--- /dev/null
+++ b/internal/pkg/ofagent/packetIn.go
@@ -0,0 +1,160 @@
+/*
+ Copyright 2020 the original author or authors.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package ofagent
+
+import (
+ "context"
+ "encoding/json"
+ "github.com/donNewtonAlpha/goloxi"
+ ofp "github.com/donNewtonAlpha/goloxi/of13"
+ "github.com/golang/protobuf/ptypes/empty"
+ "github.com/opencord/ofagent-go/internal/pkg/openflow"
+ "github.com/opencord/voltha-lib-go/v2/pkg/log"
+ "github.com/opencord/voltha-protos/v2/go/openflow_13"
+ "github.com/opencord/voltha-protos/v2/go/voltha"
+ "google.golang.org/grpc"
+)
+
+// receivePacketsIn streams packet-ins from voltha into ofa.packetInChannel
+// until ctx is cancelled; a stream failure queues ofaEventVolthaDisconnected.
+func (ofa *OFAgent) receivePacketsIn(ctx context.Context) {
+	logger.Debug("receive-packets-in-started")
+	stream, err := ofa.volthaClient.ReceivePacketsIn(ctx, &empty.Empty{}, grpc.EmptyCallOption{})
+	if err != nil {
+		logger.Errorw("Unable to establish Receive PacketIn Stream", log.Fields{"error": err})
+		ofa.events <- ofaEventVolthaDisconnected
+		return // no usable stream to read from
+	}
+	for {
+		pkt, err := stream.Recv()
+		if err != nil {
+			if ctx.Err() == nil { // a real failure, not our own cancellation
+				logger.Errorw("error receiving packet", log.Fields{"error": err})
+				ofa.events <- ofaEventVolthaDisconnected
+			}
+			return
+		}
+		select {
+		case <-ctx.Done():
+			return
+		case ofa.packetInChannel <- pkt:
+		}
+	}
+}
+
+// handlePacketsIn drains ofa.packetInChannel until ctx is cancelled. Each
+// voltha packet-in is converted to an OpenFlow PacketIn message (version 4),
+// including its OXM match fields, and sent via the source device's OF client.
+func (ofa *OFAgent) handlePacketsIn(ctx context.Context) {
+	logger.Debug("handle-packets-in-started")
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case packet := <-ofa.packetInChannel:
+			packetIn := packet.GetPacketIn()
+
+			if logger.V(log.DebugLevel) {
+				js, _ := json.Marshal(packetIn)
+				logger.Debugw("packet-in recieved", log.Fields{"packet-in": js})
+			}
+			deviceID := packet.GetId()
+			ofPacketIn := ofp.NewPacketIn()
+			ofPacketIn.SetVersion(uint8(4))
+			ofPacketIn.SetXid(openflow.GetXid())
+			ofPacketIn.SetBufferId(packetIn.GetBufferId())
+			ofPacketIn.SetCookie(packetIn.GetCookie())
+			ofPacketIn.SetData(packetIn.GetData())
+			match := ofp.NewMatchV3()
+			inMatch := packetIn.GetMatch()
+			match.SetType(uint16(inMatch.GetType()))
+			var fields []goloxi.IOxm
+			size := uint16(4) // running total handed to match.SetLength below
+			for _, oxmField := range inMatch.GetOxmFields() {
+				// Skip non-OFB fields instead of panicking on the type assertion.
+				ofb, ok := oxmField.GetField().(*openflow_13.OfpOxmField_OfbField)
+				if !ok {
+					logger.Warnw("receive-packet-in:unhandled-oxm-field",
+						log.Fields{"field": oxmField.GetField()})
+					continue
+				}
+				ofbField := ofb.OfbField
+				size += 4 //header for oxm
+				switch ofbField.Type {
+				case voltha.OxmOfbFieldTypes_OFPXMT_OFB_IN_PORT:
+					ofpInPort := ofp.NewOxmInPort()
+					val := ofbField.GetValue().(*openflow_13.OfpOxmOfbField_Port)
+					ofpInPort.Value = ofp.Port(val.Port)
+					size += 4
+					fields = append(fields, ofpInPort)
+				case voltha.OxmOfbFieldTypes_OFPXMT_OFB_ETH_TYPE:
+					ofpEthType := ofp.NewOxmEthType()
+					val := ofbField.GetValue().(*openflow_13.OfpOxmOfbField_EthType)
+					ofpEthType.Value = ofp.EthernetType(val.EthType)
+					size += 2
+					fields = append(fields, ofpEthType)
+				case voltha.OxmOfbFieldTypes_OFPXMT_OFB_IN_PHY_PORT:
+					ofpInPhyPort := ofp.NewOxmInPhyPort()
+					val := ofbField.GetValue().(*openflow_13.OfpOxmOfbField_PhysicalPort)
+					ofpInPhyPort.Value = ofp.Port(val.PhysicalPort)
+					size += 4
+					fields = append(fields, ofpInPhyPort)
+				case voltha.OxmOfbFieldTypes_OFPXMT_OFB_IP_PROTO:
+					ofpIpProto := ofp.NewOxmIpProto()
+					val := ofbField.GetValue().(*openflow_13.OfpOxmOfbField_IpProto)
+					ofpIpProto.Value = ofp.IpPrototype(val.IpProto)
+					size += 1
+					fields = append(fields, ofpIpProto)
+				case voltha.OxmOfbFieldTypes_OFPXMT_OFB_UDP_SRC:
+					ofpUdpSrc := ofp.NewOxmUdpSrc()
+					val := ofbField.GetValue().(*openflow_13.OfpOxmOfbField_UdpSrc)
+					ofpUdpSrc.Value = uint16(val.UdpSrc)
+					size += 2
+					fields = append(fields, ofpUdpSrc)
+				case voltha.OxmOfbFieldTypes_OFPXMT_OFB_UDP_DST:
+					ofpUdpDst := ofp.NewOxmUdpDst()
+					val := ofbField.GetValue().(*openflow_13.OfpOxmOfbField_UdpDst)
+					ofpUdpDst.Value = uint16(val.UdpDst)
+					size += 2
+					fields = append(fields, ofpUdpDst)
+				case voltha.OxmOfbFieldTypes_OFPXMT_OFB_VLAN_VID:
+					ofpVlanVid := ofp.NewOxmVlanVid()
+					val := ofbField.GetValue()
+					if val != nil {
+						vlanId := val.(*openflow_13.OfpOxmOfbField_VlanVid)
+						ofpVlanVid.Value = uint16(vlanId.VlanVid) + 0x1000
+						size += 2
+					} else {
+						ofpVlanVid.Value = uint16(0)
+					}
+					fields = append(fields, ofpVlanVid)
+				default:
+					logger.Warnw("receive-packet-in:unhandled-oxm-field",
+						log.Fields{"field": ofbField.Type})
+				}
+			}
+			match.SetLength(size)
+			match.SetOxmList(fields)
+			ofPacketIn.SetMatch(*match)
+			ofPacketIn.SetReason(uint8(packetIn.GetReason()))
+			ofPacketIn.SetTableId(uint8(packetIn.GetTableId()))
+			ofPacketIn.SetTotalLen(uint16(len(ofPacketIn.GetData())))
+			ofc := ofa.getOFClient(deviceID)
+			ofc.SendMessage(ofPacketIn)
+		}
+	}
+}
diff --git a/internal/pkg/ofagent/packetOut.go b/internal/pkg/ofagent/packetOut.go
new file mode 100644
index 0000000..3a43335
--- /dev/null
+++ b/internal/pkg/ofagent/packetOut.go
@@ -0,0 +1,50 @@
+/*
+ Copyright 2020 the original author or authors.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package ofagent
+
+import (
+ "context"
+ "encoding/json"
+ "github.com/opencord/voltha-lib-go/v2/pkg/log"
+ "google.golang.org/grpc"
+)
+
+// streamPacketOut forwards every message from ofa.packetOutChannel onto a
+// StreamPacketsOut stream to voltha until ctx, which also bounds the stream, is done.
+func (ofa *OFAgent) streamPacketOut(ctx context.Context) {
+	logger.Debug("GrpcClient streamPacketOut called")
+	outClient, err := ofa.volthaClient.StreamPacketsOut(ctx, grpc.EmptyCallOption{})
+	if err != nil {
+		logger.Errorw("streamPacketOut Error creating packetout stream ", log.Fields{"error": err})
+		ofa.events <- ofaEventVolthaDisconnected
+		return // outClient is not usable after a failed open
+	}
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case ofPacketOut := <-ofa.packetOutChannel:
+			if logger.V(log.DebugLevel) {
+				js, _ := json.Marshal(ofPacketOut)
+				logger.Debugw("streamPacketOut Receive PacketOut from Channel", log.Fields{"PacketOut": js})
+			}
+			if err := outClient.Send(ofPacketOut); err != nil {
+				logger.Errorw("streamPacketOut Error sending packetout", log.Fields{"error": err})
+			}
+		}
+	}
+}
diff --git a/internal/pkg/ofagent/refresh.go b/internal/pkg/ofagent/refresh.go
new file mode 100644
index 0000000..8f68ef0
--- /dev/null
+++ b/internal/pkg/ofagent/refresh.go
@@ -0,0 +1,109 @@
+/*
+ Copyright 2020 the original author or authors.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package ofagent
+
+import (
+ "context"
+ "github.com/golang/protobuf/ptypes/empty"
+ "github.com/opencord/ofagent-go/internal/pkg/openflow"
+ "github.com/opencord/voltha-lib-go/v2/pkg/log"
+ "time"
+)
+
+// synchronizeDeviceList refreshes the device list once immediately and then
+// again on every DeviceListRefreshInterval tick until ctx is cancelled.
+func (ofa *OFAgent) synchronizeDeviceList(ctx context.Context) {
+	ofa.refreshDeviceList()
+
+	ticker := time.NewTicker(ofa.DeviceListRefreshInterval)
+	defer ticker.Stop()
+	for {
+		select {
+		case <-ticker.C:
+			ofa.refreshDeviceList()
+		case <-ctx.Done():
+			return
+		}
+	}
+}
+
+// refreshDeviceList reconciles ofa.clientMap with voltha's logical devices:
+// new devices get an OF client, clients of vanished devices are stopped and removed.
+func (ofa *OFAgent) refreshDeviceList() {
+	deviceList, err := ofa.volthaClient.ListLogicalDevices(context.Background(), &empty.Empty{})
+	if err != nil {
+		logger.Errorw("ofagent failed to query device list from voltha", log.Fields{"error": err})
+		return
+	}
+	var toAdd, toDel []string
+	known := make(map[string]bool)
+	ofa.mapLock.Lock() // clientMap is also written by addOFClient from other goroutines
+	for _, device := range deviceList.GetItems() {
+		id := device.GetId()
+		known[id] = true
+		if ofa.clientMap[id] == nil {
+			toAdd = append(toAdd, id)
+		}
+	}
+	for id := range ofa.clientMap {
+		if !known[id] {
+			toDel = append(toDel, id)
+		}
+	}
+	ofa.mapLock.Unlock()
+	logger.Debugw("GrpcClient refreshDeviceList", log.Fields{"ToAdd": toAdd, "ToDel": toDel})
+	for _, id := range toAdd {
+		ofa.addOFClient(id) // addOFClient already starts the client's Run loop
+	}
+	for _, id := range toDel {
+		ofa.mapLock.Lock()
+		ofc := ofa.clientMap[id]
+		delete(ofa.clientMap, id)
+		ofa.mapLock.Unlock()
+		ofc.Stop()
+	}
+}
+
+// addOFClient returns the OF client registered for deviceID. When there is
+// none yet, a new client is built from the agent's settings, stored in
+// clientMap and its Run loop is started, all while holding mapLock.
+func (ofa *OFAgent) addOFClient(deviceID string) *openflow.OFClient {
+	logger.Debugw("GrpcClient addClient called ", log.Fields{"device-id": deviceID})
+	ofa.mapLock.Lock()
+	client := ofa.clientMap[deviceID]
+	if client == nil {
+		client = openflow.NewOFClient(&openflow.OFClient{
+			DeviceID: deviceID, OFControllerEndPoint: ofa.OFControllerEndPoint,
+			VolthaClient: ofa.volthaClient, PacketOutChannel: ofa.packetOutChannel,
+			ConnectionMaxRetries: ofa.ConnectionMaxRetries, ConnectionRetryDelay: ofa.ConnectionRetryDelay,
+			KeepRunning: true,
+		})
+		ofa.clientMap[deviceID] = client
+		go client.Run(context.Background())
+	}
+	ofa.mapLock.Unlock()
+	logger.Debugw("Finished with addClient", log.Fields{"deviceID": deviceID})
+	return client
+}
+
+// getOFClient returns the OF client for deviceID, creating it on demand.
+// The lookup is delegated to addOFClient, which reads clientMap while holding
+// mapLock; reading the map here without the lock raced with concurrent
+// inserts and deletes.
+func (ofa *OFAgent) getOFClient(deviceID string) *openflow.OFClient {
+	return ofa.addOFClient(deviceID)
+}