WIP - Suggesting changes (take2)
This is not yet completed, still working on things. Eventually the plan
is to provide the following changes
- restructure repo to be more aligned with https://github.com/golang-standards/project-layout
- add k8s probes
- modifications (golang range loops, etc) to follow some golang
practices
Change-Id: I6922cbc00b5ef17ceab183aba00a7fc59ab46480
diff --git a/internal/pkg/ofagent/ofagent.go b/internal/pkg/ofagent/ofagent.go
new file mode 100644
index 0000000..0ad4cd1
--- /dev/null
+++ b/internal/pkg/ofagent/ofagent.go
@@ -0,0 +1,181 @@
+/*
+ Copyright 2020 the original author or authors.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package ofagent
+
+import (
+ "context"
+ "github.com/opencord/ofagent-go/internal/pkg/openflow"
+ "github.com/opencord/voltha-lib-go/v2/pkg/log"
+ "github.com/opencord/voltha-lib-go/v2/pkg/probe"
+ "github.com/opencord/voltha-protos/v2/go/voltha"
+ "google.golang.org/grpc"
+ "sync"
+ "time"
+)
+
+var logger, _ = log.AddPackage(log.JSON, log.DebugLevel, nil)
+
+type ofaEvent byte
+type ofaState byte
+
+const (
+ ofaEventStart = ofaEvent(iota)
+ ofaEventVolthaConnected
+ ofaEventVolthaDisconnected
+ ofaEventError
+
+ ofaStateConnected = ofaState(iota)
+ ofaStateDisconnected
+)
+
+type OFAgent struct {
+ VolthaApiEndPoint string
+ OFControllerEndPoint string
+ DeviceListRefreshInterval time.Duration
+ ConnectionMaxRetries int
+ ConnectionRetryDelay time.Duration
+
+ volthaConnection *grpc.ClientConn
+ volthaClient voltha.VolthaServiceClient
+ mapLock sync.Mutex
+ clientMap map[string]*openflow.OFClient
+ events chan ofaEvent
+
+ packetInChannel chan *voltha.PacketIn
+ packetOutChannel chan *voltha.PacketOut
+ changeEventChannel chan *voltha.ChangeEvent
+}
+
+func NewOFAgent(config *OFAgent) (*OFAgent, error) {
+ ofa := OFAgent{
+ VolthaApiEndPoint: config.VolthaApiEndPoint,
+ OFControllerEndPoint: config.OFControllerEndPoint,
+ DeviceListRefreshInterval: config.DeviceListRefreshInterval,
+ ConnectionMaxRetries: config.ConnectionMaxRetries,
+ ConnectionRetryDelay: config.ConnectionRetryDelay,
+ packetInChannel: make(chan *voltha.PacketIn),
+ packetOutChannel: make(chan *voltha.PacketOut),
+ changeEventChannel: make(chan *voltha.ChangeEvent),
+ clientMap: make(map[string]*openflow.OFClient),
+ events: make(chan ofaEvent, 100),
+ }
+
+ if ofa.DeviceListRefreshInterval <= 0 {
+ logger.Warnw("device list refresh internal not valid, setting to default",
+ log.Fields{
+ "value": ofa.DeviceListRefreshInterval.String(),
+ "default": (1 * time.Minute).String()})
+ ofa.DeviceListRefreshInterval = 1 * time.Minute
+ }
+
+ if ofa.ConnectionRetryDelay <= 0 {
+ logger.Warnw("connection retry delay not value, setting to default",
+ log.Fields{
+ "value": ofa.ConnectionRetryDelay.String(),
+ "default": (3 * time.Second).String()})
+ ofa.ConnectionRetryDelay = 3 * time.Second
+ }
+
+ return &ofa, nil
+}
+
+// Run - make the inital connection to voltha and kicks off io streams
+func (ofa *OFAgent) Run(ctx context.Context) {
+
+ logger.Debugw("Starting GRPC - VOLTHA client",
+ log.Fields{
+ "voltha-endpoint": ofa.VolthaApiEndPoint,
+ "controller-endpoint": ofa.OFControllerEndPoint})
+
+ // If the context contains a k8s probe then register services
+ p := probe.GetProbeFromContext(ctx)
+ if p != nil {
+ p.RegisterService("voltha")
+ }
+ ofa.events <- ofaEventStart
+
+ /*
+ * Two sub-contexts are created here for different purposes so we can
+ * control the lifecyle of processing loops differently.
+ *
+ * volthaCtx - controls those processes that rely on the GRPC
+ * GRPCconnection to voltha and will be restarted when the
+ * GRPC connection is interrupted.
+ * hdlCtx - controls those processes that listen to channels and
+ * process each message. these will likely never be
+ * stopped until the ofagent is stopped.
+ */
+ var volthaCtx, hdlCtx context.Context
+ var volthaDone, hdlDone func()
+ state := ofaStateDisconnected
+
+ for {
+ select {
+ case <-ctx.Done():
+ if volthaDone != nil {
+ volthaDone()
+ volthaDone = nil
+ }
+ if hdlDone != nil {
+ hdlDone()
+ hdlDone = nil
+ }
+ return
+ case event := <-ofa.events:
+ switch event {
+ case ofaEventStart:
+ logger.Debug("ofagent-voltha-start-event")
+
+ // Start the loops that process messages
+ hdlCtx, hdlDone = context.WithCancel(context.Background())
+ go ofa.handlePacketsIn(hdlCtx)
+ go ofa.handleChangeEvents(hdlCtx)
+
+ // Kick off process to attempt to establish
+ // connection to voltha
+ go ofa.establishConnectionToVoltha(p)
+
+ case ofaEventVolthaConnected:
+ logger.Debug("ofagent-voltha-connect-event")
+
+ // Start the loops that poll from voltha
+ if state != ofaStateConnected {
+ state = ofaStateConnected
+ volthaCtx, volthaDone = context.WithCancel(context.Background())
+ go ofa.receiveChangeEvents(volthaCtx)
+ go ofa.receivePacketsIn(volthaCtx)
+ go ofa.streamPacketOut(volthaCtx)
+ go ofa.synchronizeDeviceList(volthaCtx)
+ }
+
+ case ofaEventVolthaDisconnected:
+ logger.Debug("ofagent-voltha-disconnect-event")
+ if state == ofaStateConnected {
+ state = ofaStateDisconnected
+ volthaDone()
+ volthaDone = nil
+ volthaCtx = nil
+ }
+ case ofaEventError:
+ logger.Debug("ofagent-error-event")
+ default:
+ logger.Fatalw("ofagent-unknown-event",
+ log.Fields{"event": event})
+ }
+ }
+ }
+}