WIP - Suggesting changes (take2)

    This change is not yet complete and is still a work in progress. The
    eventual plan is to provide the following changes:

    - restructure repo to be more aligned with https://github.com/golang-standards/project-layout
    - add k8s probes
    - modifications (range loops, etc.) to follow common Go
    practices

Change-Id: I6922cbc00b5ef17ceab183aba00a7fc59ab46480
diff --git a/internal/pkg/ofagent/changeEvent.go b/internal/pkg/ofagent/changeEvent.go
new file mode 100644
index 0000000..90b0fd6
--- /dev/null
+++ b/internal/pkg/ofagent/changeEvent.go
@@ -0,0 +1,110 @@
+/*
+   Copyright 2020 the original author or authors.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+        http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+*/
+
+package ofagent
+
+import (
+	"context"
+	"encoding/json"
+	ofp "github.com/donNewtonAlpha/goloxi/of13"
+	"github.com/golang/protobuf/ptypes/empty"
+	"github.com/opencord/ofagent-go/internal/pkg/openflow"
+	"github.com/opencord/voltha-lib-go/v2/pkg/log"
+	"google.golang.org/grpc"
+	"net"
+)
+
+// receiveChangeEvents reads change events from voltha's stream and forwards them
+// to changeEventChannel. It returns when ctx is cancelled or the stream fails.
+func (ofa *OFAgent) receiveChangeEvents(ctx context.Context) {
+	logger.Debug("receive-change-events-started")
+	// The stream is bound to ctx, so cancelling ctx unblocks a pending Recv.
+	stream, err := ofa.volthaClient.ReceiveChangeEvents(ctx, &empty.Empty{}, grpc.EmptyCallOption{})
+	if err != nil {
+		logger.Errorw("Unable to establish Receive Change Event Stream", log.Fields{"error": err})
+		ofa.events <- ofaEventVolthaDisconnected
+		return // the stream is unusable after an error; calling Recv on it would panic
+	}
+	for {
+		ce, err := stream.Recv()
+		if err != nil {
+			if ctx.Err() == nil { // cancellation is a normal shutdown, not a disconnect
+				logger.Errorw("error receiving change event", log.Fields{"error": err})
+				ofa.events <- ofaEventVolthaDisconnected
+			}
+			return // a failed stream does not recover; looping would only spin
+		}
+		select {
+		case <-ctx.Done():
+			return
+		case ofa.changeEventChannel <- ce:
+		}
+	}
+}
+
+// handleChangeEvents turns each port-status change event read from
+// changeEventChannel into an OpenFlow 1.3 port-status message and sends it via
+// the device's OFClient; it returns when ctx is cancelled.
+func (ofa *OFAgent) handleChangeEvents(ctx context.Context) {
+	logger.Debugln("handle-change-event-started")
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case changeEvent := <-ofa.changeEventChannel:
+			deviceID := changeEvent.GetId()
+			portStatus := changeEvent.GetPortStatus()
+			logger.Debugw("received-change-event",
+				log.Fields{"device-id": deviceID, "port-status": portStatus})
+			if portStatus == nil {
+				if logger.V(log.WarnLevel) {
+					js, _ := json.Marshal(changeEvent.GetEvent())
+					logger.Warnw("Received change event that was not port status",
+						log.Fields{"ChangeEvent": string(js)})
+				}
+				continue
+			}
+
+			desc := portStatus.GetDesc()
+			hwAddr := desc.GetHwAddr()
+			octets := make([]byte, 0, len(hwAddr))
+			for _, val := range hwAddr {
+				octets = append(octets, byte(val))
+			}
+
+			ofDesc := ofp.NewPortDesc()
+			ofDesc.SetAdvertised(ofp.PortFeatures(desc.GetAdvertised()))
+			ofDesc.SetConfig(ofp.PortConfig(0))
+			// Report the port's current features; the advertised set is a separate field.
+			ofDesc.SetCurr(ofp.PortFeatures(desc.GetCurr()))
+			ofDesc.SetCurrSpeed(desc.GetCurrSpeed())
+			ofDesc.SetHwAddr(net.HardwareAddr(octets))
+			ofDesc.SetMaxSpeed(desc.GetMaxSpeed())
+			ofDesc.SetName(openflow.PadString(desc.GetName(), 16))
+			ofDesc.SetPeer(ofp.PortFeatures(desc.GetPeer()))
+			ofDesc.SetPortNo(ofp.Port(desc.GetPortNo()))
+			ofDesc.SetState(ofp.PortState(desc.GetState()))
+			ofDesc.SetSupported(ofp.PortFeatures(desc.GetSupported()))
+
+			ofPortStatus := ofp.NewPortStatus()
+			ofPortStatus.SetXid(openflow.GetXid())
+			ofPortStatus.SetVersion(4)
+			ofPortStatus.SetReason(ofp.PortReason(portStatus.GetReason()))
+			ofPortStatus.SetDesc(*ofDesc)
+			ofa.getOFClient(deviceID).SendMessage(ofPortStatus)
+		}
+	}
+}
diff --git a/internal/pkg/ofagent/connection.go b/internal/pkg/ofagent/connection.go
new file mode 100644
index 0000000..5fd8a3f
--- /dev/null
+++ b/internal/pkg/ofagent/connection.go
@@ -0,0 +1,77 @@
+/*
+   Copyright 2020 the original author or authors.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+        http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+*/
+package ofagent
+
+import (
+	"context"
+	"errors"
+	"github.com/golang/protobuf/ptypes/empty"
+	"github.com/opencord/voltha-lib-go/v2/pkg/log"
+	"github.com/opencord/voltha-lib-go/v2/pkg/probe"
+	"github.com/opencord/voltha-protos/v2/go/voltha"
+	"google.golang.org/grpc"
+	"time"
+)
+
+// establishConnectionToVoltha dials VolthaApiEndPoint and verifies the link with
+// a GetVoltha call, pausing ConnectionRetryDelay between attempts.
+// ConnectionMaxRetries bounds the number of attempts; 0 means retry forever. On
+// success the connection and client are stored and ofaEventVolthaConnected is
+// emitted. When p is non-nil its "voltha" service status tracks the outcome.
+func (ofa *OFAgent) establishConnectionToVoltha(p *probe.Probe) error {
+	if p != nil {
+		p.UpdateStatus("voltha", probe.ServiceStatusPreparing)
+	}
+	if ofa.volthaConnection != nil {
+		ofa.volthaConnection.Close()
+	}
+	ofa.volthaConnection = nil
+	ofa.volthaClient = nil
+
+	limit := ofa.ConnectionMaxRetries
+	for try := 1; limit == 0 || try <= limit; try++ {
+		conn, err := grpc.Dial(ofa.VolthaApiEndPoint, grpc.WithInsecure())
+		if err == nil {
+			svc := voltha.NewVolthaServiceClient(conn)
+			if _, err = svc.GetVoltha(context.Background(), &empty.Empty{}); err == nil {
+				logger.Debugw("Established connection to Voltha",
+					log.Fields{
+						"VolthaApiEndPoint": ofa.VolthaApiEndPoint,
+					})
+				ofa.volthaConnection = conn
+				ofa.volthaClient = svc
+				if p != nil {
+					p.UpdateStatus("voltha", probe.ServiceStatusRunning)
+				}
+				ofa.events <- ofaEventVolthaConnected
+				return nil
+			}
+			conn.Close() // do not leak the connection of a failed attempt
+		}
+		logger.Warnw("Failed to connect to voltha",
+			log.Fields{
+				"VolthaApiEndPoint": ofa.VolthaApiEndPoint,
+				"error":             err.Error(),
+			})
+		if limit == 0 || try < limit {
+			time.Sleep(ofa.ConnectionRetryDelay) // wait only when another attempt follows
+		}
+	}
+	if p != nil {
+		p.UpdateStatus("voltha", probe.ServiceStatusFailed)
+	}
+	return errors.New("failed-to-connect-to-voltha")
+}
diff --git a/internal/pkg/ofagent/ofagent.go b/internal/pkg/ofagent/ofagent.go
new file mode 100644
index 0000000..0ad4cd1
--- /dev/null
+++ b/internal/pkg/ofagent/ofagent.go
@@ -0,0 +1,181 @@
+/*
+   Copyright 2020 the original author or authors.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+        http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+*/
+
+package ofagent
+
+import (
+	"context"
+	"github.com/opencord/ofagent-go/internal/pkg/openflow"
+	"github.com/opencord/voltha-lib-go/v2/pkg/log"
+	"github.com/opencord/voltha-lib-go/v2/pkg/probe"
+	"github.com/opencord/voltha-protos/v2/go/voltha"
+	"google.golang.org/grpc"
+	"sync"
+	"time"
+)
+
+var logger, _ = log.AddPackage(log.JSON, log.DebugLevel, nil) // package-wide logger; the error from AddPackage is discarded
+
+type ofaEvent byte // kind of event delivered on OFAgent.events
+type ofaState byte // voltha connection state tracked by Run
+
+const (
+	ofaEventStart = ofaEvent(iota)
+	ofaEventVolthaConnected
+	ofaEventVolthaDisconnected
+	ofaEventError
+	// iota keeps counting within one const block, so the two states below are 4 and 5, not 0 and 1.
+	ofaStateConnected = ofaState(iota)
+	ofaStateDisconnected
+)
+// OFAgent holds the agent's settings (exported fields) and its runtime state (unexported fields).
+type OFAgent struct {
+	VolthaApiEndPoint         string        // address passed to grpc.Dial
+	OFControllerEndPoint      string        // handed to every OFClient that is created
+	DeviceListRefreshInterval time.Duration // period of the device list refresh ticker
+	ConnectionMaxRetries      int           // bound used by the voltha connect loop; 0 retries forever
+	ConnectionRetryDelay      time.Duration // pause between connection attempts
+
+	volthaConnection *grpc.ClientConn              // set by establishConnectionToVoltha
+	volthaClient     voltha.VolthaServiceClient    // set by establishConnectionToVoltha
+	mapLock          sync.Mutex                    // locked around the clientMap writes
+	clientMap        map[string]*openflow.OFClient // keyed by device ID
+	events           chan ofaEvent                 // consumed by Run
+
+	packetInChannel    chan *voltha.PacketIn    // filled by receivePacketsIn, drained by handlePacketsIn
+	packetOutChannel   chan *voltha.PacketOut   // drained by streamPacketOut
+	changeEventChannel chan *voltha.ChangeEvent // filled by receiveChangeEvents, drained by handleChangeEvents
+}
+
+// NewOFAgent builds an OFAgent from the exported settings of config and creates
+// its channels and client map. A non-positive DeviceListRefreshInterval or
+// ConnectionRetryDelay is replaced by a default (1 minute / 3 seconds). A nil
+// config is treated as an empty one. The returned error is currently always nil.
+func NewOFAgent(config *OFAgent) (*OFAgent, error) {
+	if config == nil {
+		config = &OFAgent{}
+	}
+	ofa := OFAgent{
+		VolthaApiEndPoint:         config.VolthaApiEndPoint,
+		OFControllerEndPoint:      config.OFControllerEndPoint,
+		DeviceListRefreshInterval: config.DeviceListRefreshInterval,
+		ConnectionMaxRetries:      config.ConnectionMaxRetries,
+		ConnectionRetryDelay:      config.ConnectionRetryDelay,
+		packetInChannel:           make(chan *voltha.PacketIn),
+		packetOutChannel:          make(chan *voltha.PacketOut),
+		changeEventChannel:        make(chan *voltha.ChangeEvent),
+		clientMap:                 make(map[string]*openflow.OFClient),
+		events:                    make(chan ofaEvent, 100),
+	}
+	if ofa.DeviceListRefreshInterval <= 0 {
+		logger.Warnw("device list refresh interval not valid, setting to default",
+			log.Fields{"value": ofa.DeviceListRefreshInterval.String(), "default": (1 * time.Minute).String()})
+		ofa.DeviceListRefreshInterval = 1 * time.Minute
+	}
+	if ofa.ConnectionRetryDelay <= 0 {
+		logger.Warnw("connection retry delay not valid, setting to default",
+			log.Fields{"value": ofa.ConnectionRetryDelay.String(), "default": (3 * time.Second).String()})
+		ofa.ConnectionRetryDelay = 3 * time.Second
+	}
+	return &ofa, nil
+}
+
+// Run starts the message handlers and the first voltha connection attempt, then processes agent events until ctx is done.
+func (ofa *OFAgent) Run(ctx context.Context) {
+
+	logger.Debugw("Starting GRPC - VOLTHA client",
+		log.Fields{
+			"voltha-endpoint":     ofa.VolthaApiEndPoint,
+			"controller-endpoint": ofa.OFControllerEndPoint})
+
+	// If the context contains a k8s probe then register services
+	p := probe.GetProbeFromContext(ctx)
+	if p != nil {
+		p.RegisterService("voltha")
+	}
+	ofa.events <- ofaEventStart
+
+	/*
+	 * Two sub-contexts are created here for different purposes so we can
+	 * control the lifecycle of processing loops differently.
+	 *
+	 * volthaCtx -  controls those processes that rely on the GRPC
+	 *              connection to voltha; they are cancelled on a disconnect
+	 *              event and started again on the next connect event.
+	 * hdlCtx    -  controls those processes that listen to channels and
+	 *              process each message. these will likely never be
+	 *              stopped until the ofagent is stopped.
+	 */
+	var volthaCtx, hdlCtx context.Context
+	var volthaDone, hdlDone func()
+	state := ofaStateDisconnected
+
+	for {
+		select {
+		case <-ctx.Done():
+			if volthaDone != nil {
+				volthaDone()
+				volthaDone = nil
+			}
+			if hdlDone != nil {
+				hdlDone()
+				hdlDone = nil
+			}
+			return
+		case event := <-ofa.events:
+			switch event {
+			case ofaEventStart:
+				logger.Debug("ofagent-voltha-start-event")
+
+				// Start the loops that process messages
+				hdlCtx, hdlDone = context.WithCancel(context.Background())
+				go ofa.handlePacketsIn(hdlCtx)
+				go ofa.handleChangeEvents(hdlCtx)
+
+				// Kick off process to attempt to establish
+				// connection to voltha
+				go ofa.establishConnectionToVoltha(p)
+
+			case ofaEventVolthaConnected:
+				logger.Debug("ofagent-voltha-connect-event")
+
+				// Start the loops that poll from voltha
+				if state != ofaStateConnected {
+					state = ofaStateConnected
+					volthaCtx, volthaDone = context.WithCancel(context.Background())
+					go ofa.receiveChangeEvents(volthaCtx)
+					go ofa.receivePacketsIn(volthaCtx)
+					go ofa.streamPacketOut(volthaCtx)
+					go ofa.synchronizeDeviceList(volthaCtx)
+				}
+
+			case ofaEventVolthaDisconnected:
+				logger.Debug("ofagent-voltha-disconnect-event")
+				if state == ofaStateConnected { // repeated disconnect events are ignored
+					state = ofaStateDisconnected
+					volthaDone() // cancels every loop started on the connect event
+					volthaDone = nil
+					volthaCtx = nil
+				}
+			case ofaEventError:
+				logger.Debug("ofagent-error-event")
+			default:
+				logger.Fatalw("ofagent-unknown-event",
+					log.Fields{"event": event})
+			}
+		}
+	}
+}
diff --git a/internal/pkg/ofagent/packetIn.go b/internal/pkg/ofagent/packetIn.go
new file mode 100644
index 0000000..721d1d7
--- /dev/null
+++ b/internal/pkg/ofagent/packetIn.go
@@ -0,0 +1,160 @@
+/*
+   Copyright 2020 the original author or authors.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+        http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+*/
+
+package ofagent
+
+import (
+	"context"
+	"encoding/json"
+	"github.com/donNewtonAlpha/goloxi"
+	ofp "github.com/donNewtonAlpha/goloxi/of13"
+	"github.com/golang/protobuf/ptypes/empty"
+	"github.com/opencord/ofagent-go/internal/pkg/openflow"
+	"github.com/opencord/voltha-lib-go/v2/pkg/log"
+	"github.com/opencord/voltha-protos/v2/go/openflow_13"
+	"github.com/opencord/voltha-protos/v2/go/voltha"
+	"google.golang.org/grpc"
+)
+
+// receivePacketsIn reads packet-in messages from voltha's stream and forwards
+// them to packetInChannel. It returns when ctx is cancelled or the stream fails.
+func (ofa *OFAgent) receivePacketsIn(ctx context.Context) {
+	logger.Debug("receive-packets-in-started")
+	stream, err := ofa.volthaClient.ReceivePacketsIn(ctx, &empty.Empty{}, grpc.EmptyCallOption{}) // bound to ctx
+	if err != nil {
+		logger.Errorw("Unable to establish Receive PacketIn Stream", log.Fields{"error": err})
+		ofa.events <- ofaEventVolthaDisconnected
+		return // the stream is unusable after an error; calling Recv on it would panic
+	}
+	for {
+		pkt, err := stream.Recv()
+		if err != nil {
+			if ctx.Err() == nil { // cancellation is a normal shutdown, not a disconnect
+				logger.Errorw("error receiving packet", log.Fields{"error": err})
+				ofa.events <- ofaEventVolthaDisconnected
+			}
+			return // a failed stream does not recover; looping would only spin
+		}
+		select {
+		case <-ctx.Done():
+			return
+		case ofa.packetInChannel <- pkt:
+		}
+	}
+}
+
+// handlePacketsIn converts each voltha PacketIn read from packetInChannel into an
+// OpenFlow 1.3 packet-in message and sends it through the device's OFClient.
+// It returns when ctx is cancelled.
+func (ofa *OFAgent) handlePacketsIn(ctx context.Context) {
+	logger.Debug("handle-packets-in-started")
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case packet := <-ofa.packetInChannel:
+			packetIn := packet.GetPacketIn()
+
+			if logger.V(log.DebugLevel) {
+				js, _ := json.Marshal(packetIn)
+				logger.Debugw("packet-in recieved", log.Fields{"packet-in": string(js)})
+			}
+			deviceID := packet.GetId()
+			ofPacketIn := ofp.NewPacketIn()
+			ofPacketIn.SetVersion(uint8(4))
+			ofPacketIn.SetXid(openflow.GetXid())
+			ofPacketIn.SetBufferId(packetIn.GetBufferId())
+			ofPacketIn.SetCookie(packetIn.GetCookie())
+			ofPacketIn.SetData(packetIn.GetData())
+			match := ofp.NewMatchV3()
+			inMatch := packetIn.GetMatch()
+			match.SetType(uint16(inMatch.GetType()))
+
+			// size is the match length: 4 bytes of match header plus, for every
+			// OXM that is actually emitted, a 4 byte OXM header and its payload.
+			// Fields that are skipped must not be counted.
+			var fields []goloxi.IOxm
+			size := uint16(4)
+			for _, oxmField := range inMatch.GetOxmFields() {
+				ofb, ok := oxmField.GetField().(*openflow_13.OfpOxmField_OfbField)
+				if !ok || ofb.OfbField == nil {
+					// Not an OpenFlow-basic field; skip it instead of panicking.
+					logger.Warnw("receive-packet-in:unhandled-oxm-field",
+						log.Fields{"field": oxmField.GetField()})
+					continue
+				}
+				ofbField := ofb.OfbField
+				var oxm goloxi.IOxm
+				var payload uint16 // payload length of oxm in bytes
+				switch ofbField.Type {
+				case voltha.OxmOfbFieldTypes_OFPXMT_OFB_IN_PORT:
+					ofpInPort := ofp.NewOxmInPort()
+					val := ofbField.GetValue().(*openflow_13.OfpOxmOfbField_Port)
+					ofpInPort.Value = ofp.Port(val.Port)
+					oxm, payload = ofpInPort, 4
+				case voltha.OxmOfbFieldTypes_OFPXMT_OFB_ETH_TYPE:
+					ofpEthType := ofp.NewOxmEthType()
+					val := ofbField.GetValue().(*openflow_13.OfpOxmOfbField_EthType)
+					ofpEthType.Value = ofp.EthernetType(val.EthType)
+					oxm, payload = ofpEthType, 2
+				case voltha.OxmOfbFieldTypes_OFPXMT_OFB_IN_PHY_PORT:
+					ofpInPhyPort := ofp.NewOxmInPhyPort()
+					val := ofbField.GetValue().(*openflow_13.OfpOxmOfbField_PhysicalPort)
+					ofpInPhyPort.Value = ofp.Port(val.PhysicalPort)
+					oxm, payload = ofpInPhyPort, 4
+				case voltha.OxmOfbFieldTypes_OFPXMT_OFB_IP_PROTO:
+					ofpIpProto := ofp.NewOxmIpProto()
+					val := ofbField.GetValue().(*openflow_13.OfpOxmOfbField_IpProto)
+					ofpIpProto.Value = ofp.IpPrototype(val.IpProto)
+					oxm, payload = ofpIpProto, 1
+				case voltha.OxmOfbFieldTypes_OFPXMT_OFB_UDP_SRC:
+					ofpUdpSrc := ofp.NewOxmUdpSrc()
+					val := ofbField.GetValue().(*openflow_13.OfpOxmOfbField_UdpSrc)
+					ofpUdpSrc.Value = uint16(val.UdpSrc)
+					oxm, payload = ofpUdpSrc, 2
+				case voltha.OxmOfbFieldTypes_OFPXMT_OFB_UDP_DST:
+					ofpUdpDst := ofp.NewOxmUdpDst()
+					val := ofbField.GetValue().(*openflow_13.OfpOxmOfbField_UdpDst)
+					ofpUdpDst.Value = uint16(val.UdpDst)
+					oxm, payload = ofpUdpDst, 2
+				case voltha.OxmOfbFieldTypes_OFPXMT_OFB_VLAN_VID:
+					ofpVlanVid := ofp.NewOxmVlanVid()
+					ofpVlanVid.Value = uint16(0)
+					if val := ofbField.GetValue(); val != nil {
+						vlanId := val.(*openflow_13.OfpOxmOfbField_VlanVid)
+						ofpVlanVid.Value = uint16(vlanId.VlanVid) + 0x1000
+					}
+					// Value is 16 bits wide whether or not a VLAN was supplied, so the payload is always 2.
+					oxm, payload = ofpVlanVid, 2
+				default:
+					logger.Warnw("receive-packet-in:unhandled-oxm-field",
+						log.Fields{"field": ofbField.Type})
+					continue
+				}
+				fields = append(fields, oxm)
+				size += 4 + payload
+			}
+			match.SetLength(size)
+			match.SetOxmList(fields)
+
+			ofPacketIn.SetMatch(*match)
+			ofPacketIn.SetReason(uint8(packetIn.GetReason()))
+			ofPacketIn.SetTableId(uint8(packetIn.GetTableId()))
+			ofPacketIn.SetTotalLen(uint16(len(ofPacketIn.GetData())))
+			ofa.getOFClient(deviceID).SendMessage(ofPacketIn)
+		}
+	}
+}
diff --git a/internal/pkg/ofagent/packetOut.go b/internal/pkg/ofagent/packetOut.go
new file mode 100644
index 0000000..3a43335
--- /dev/null
+++ b/internal/pkg/ofagent/packetOut.go
@@ -0,0 +1,50 @@
+/*
+   Copyright 2020 the original author or authors.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+        http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+*/
+
+package ofagent
+
+import (
+	"context"
+	"encoding/json"
+	"github.com/opencord/voltha-lib-go/v2/pkg/log"
+	"google.golang.org/grpc"
+)
+
+// streamPacketOut sends packets from packetOutChannel to voltha until ctx is cancelled or the stream fails.
+func (ofa *OFAgent) streamPacketOut(ctx context.Context) {
+	logger.Debug("GrpcClient streamPacketOut called")
+	outClient, err := ofa.volthaClient.StreamPacketsOut(ctx, grpc.EmptyCallOption{}) // stream is bound to ctx
+	if err != nil {
+		logger.Errorw("streamPacketOut Error creating packetout stream ", log.Fields{"error": err})
+	}
+	for err == nil { // skipped entirely when the stream could not be created
+		select {
+		case <-ctx.Done():
+			return
+		case ofPacketOut := <-ofa.packetOutChannel:
+			if logger.V(log.DebugLevel) {
+				js, _ := json.Marshal(ofPacketOut)
+				logger.Debugw("streamPacketOut Receive PacketOut from Channel", log.Fields{"PacketOut": string(js)})
+			}
+			if err = outClient.Send(ofPacketOut); err != nil {
+				logger.Errorw("streamPacketOut error sending packet out", log.Fields{"error": err})
+			}
+		}
+	}
+	if ctx.Err() == nil { // a failure caused by cancellation is a shutdown, not a disconnect
+		ofa.events <- ofaEventVolthaDisconnected
+	}
+}
diff --git a/internal/pkg/ofagent/refresh.go b/internal/pkg/ofagent/refresh.go
new file mode 100644
index 0000000..8f68ef0
--- /dev/null
+++ b/internal/pkg/ofagent/refresh.go
@@ -0,0 +1,109 @@
+/*
+   Copyright 2020 the original author or authors.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+        http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+*/
+
+package ofagent
+
+import (
+	"context"
+	"github.com/golang/protobuf/ptypes/empty"
+	"github.com/opencord/ofagent-go/internal/pkg/openflow"
+	"github.com/opencord/voltha-lib-go/v2/pkg/log"
+	"time"
+)
+
+// synchronizeDeviceList refreshes the device list once immediately and then on
+// every DeviceListRefreshInterval tick until ctx is cancelled.
+func (ofa *OFAgent) synchronizeDeviceList(ctx context.Context) {
+	ofa.refreshDeviceList()
+
+	ticker := time.NewTicker(ofa.DeviceListRefreshInterval)
+	defer ticker.Stop()
+	for {
+		select {
+		case <-ticker.C:
+			ofa.refreshDeviceList()
+		case <-ctx.Done():
+			return
+		}
+	}
+}
+
+// refreshDeviceList reconciles clientMap with voltha's logical device list: it
+// starts a client for every new device and stops the client of every removed one.
+func (ofa *OFAgent) refreshDeviceList() {
+	deviceList, err := ofa.volthaClient.ListLogicalDevices(context.Background(), &empty.Empty{})
+	if err != nil {
+		logger.Errorw("ofagent failed to query device list from voltha", log.Fields{"error": err})
+		return
+	}
+
+	var toAdd, toDel []string
+	var toStop []*openflow.OFClient
+	current := make(map[string]struct{})
+	ofa.mapLock.Lock() // clientMap is shared with the goroutines that call addOFClient
+	for _, device := range deviceList.GetItems() {
+		deviceID := device.GetId()
+		current[deviceID] = struct{}{}
+		if ofa.clientMap[deviceID] == nil {
+			toAdd = append(toAdd, deviceID)
+		}
+	}
+	for deviceID, ofc := range ofa.clientMap {
+		if _, ok := current[deviceID]; !ok {
+			toDel = append(toDel, deviceID)
+			toStop = append(toStop, ofc)
+			delete(ofa.clientMap, deviceID)
+		}
+	}
+	ofa.mapLock.Unlock()
+	logger.Debugw("GrpcClient refreshDeviceList", log.Fields{"ToAdd": toAdd, "ToDel": toDel})
+	for _, deviceID := range toAdd {
+		ofa.addOFClient(deviceID) // addOFClient already starts Run; a second Run would duplicate the client
+	}
+	for _, ofc := range toStop {
+		ofc.Stop() // called outside the lock
+	}
+}
+// addOFClient returns the client registered for deviceID, creating, registering and starting one if none exists.
+func (ofa *OFAgent) addOFClient(deviceID string) *openflow.OFClient {
+	logger.Debugw("GrpcClient addClient called ", log.Fields{"device-id": deviceID})
+	ofa.mapLock.Lock()
+	client, known := ofa.clientMap[deviceID]
+	if !known || client == nil {
+		client = openflow.NewOFClient(&openflow.OFClient{
+			DeviceID:             deviceID,
+			OFControllerEndPoint: ofa.OFControllerEndPoint,
+			VolthaClient:         ofa.volthaClient,
+			PacketOutChannel:     ofa.packetOutChannel,
+			ConnectionMaxRetries: ofa.ConnectionMaxRetries,
+			ConnectionRetryDelay: ofa.ConnectionRetryDelay,
+			KeepRunning:          true,
+		})
+		ofa.clientMap[deviceID] = client
+		go client.Run(context.Background())
+	}
+	ofa.mapLock.Unlock()
+	logger.Debugw("Finished with addClient", log.Fields{"deviceID": deviceID})
+	return client
+}
+
+// getOFClient returns the OFClient for deviceID, creating and starting one on
+// first use. It delegates to addOFClient, which looks the client up while
+// holding mapLock; reading clientMap here without the lock would race with the
+// map writes in addOFClient and refreshDeviceList.
+func (ofa *OFAgent) getOFClient(deviceID string) *openflow.OFClient {
+	return ofa.addOFClient(deviceID)
+}