First Commit of Voltha-Go-Controller from Radisys
Change-Id: I8e2e908e7ab09a4fe3d86849da18b6d69dcf4ab0
diff --git a/internal/pkg/vpagent/volthaprotoagent.go b/internal/pkg/vpagent/volthaprotoagent.go
new file mode 100644
index 0000000..04e3d08
--- /dev/null
+++ b/internal/pkg/vpagent/volthaprotoagent.go
@@ -0,0 +1,237 @@
+/*
+* Copyright 2022-present Open Networking Foundation
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+ */
+
+package vpagent
+
+import (
+ "context"
+ "sync"
+ "time"
+
+ "voltha-go-controller/database"
+ "voltha-go-controller/internal/pkg/holder"
+ "voltha-go-controller/internal/pkg/intf"
+
+ "github.com/opencord/voltha-lib-go/v7/pkg/log"
+ "github.com/opencord/voltha-lib-go/v7/pkg/probe"
+ ofp "github.com/opencord/voltha-protos/v5/go/openflow_13"
+ "github.com/opencord/voltha-protos/v5/go/voltha"
+ "google.golang.org/grpc"
+)
+
+var logger log.CLogger
+var ctx = context.TODO()
+
+func init() {
+ // Setup this package so that it's log level can be modified at run time
+ var err error
+ logger, err = log.RegisterPackage(log.JSON, log.ErrorLevel, log.Fields{})
+ if err != nil {
+ panic(err)
+ }
+}
+
+type vpaEvent byte
+type vpaState byte
+
+var db database.DBIntf
+
+const (
+ vpaEventStart = vpaEvent(iota)
+ vpaEventVolthaConnected
+ vpaEventVolthaDisconnected
+ vpaEventError
+
+ vpaStateConnected = vpaState(iota)
+ vpaStateConnecting
+ vpaStateDisconnected
+)
+
+var vpAgent *VPAgent
+
+// VPAgent structure
+type VPAgent struct {
+ VolthaAPIEndPoint string
+ DeviceListRefreshInterval time.Duration
+ ConnectionMaxRetries int
+ ConnectionRetryDelay time.Duration
+
+ volthaConnection *grpc.ClientConn
+ volthaClient *holder.VolthaServiceClientHolder
+ mapLock sync.Mutex
+ clientMap map[string]intf.IVPClient
+ events chan vpaEvent
+
+ packetInChannel chan *ofp.PacketIn
+ packetOutChannel chan *ofp.PacketOut
+ changeEventChannel chan *ofp.ChangeEvent
+ VPClientAgent intf.IVPClientAgent
+}
+
+// NewVPAgent is constructor for VPAgent
+func NewVPAgent(config *VPAgent) (*VPAgent, error) {
+ vpa := VPAgent{
+ VolthaAPIEndPoint: config.VolthaAPIEndPoint,
+ DeviceListRefreshInterval: config.DeviceListRefreshInterval,
+ ConnectionMaxRetries: config.ConnectionMaxRetries,
+ ConnectionRetryDelay: config.ConnectionRetryDelay,
+ VPClientAgent: config.VPClientAgent,
+ volthaClient: &holder.VolthaServiceClientHolder{},
+ packetInChannel: make(chan *ofp.PacketIn),
+ // customPacketIndChannel: make(chan *voltha.CustomPacketIn),
+ packetOutChannel: make(chan *ofp.PacketOut),
+ changeEventChannel: make(chan *ofp.ChangeEvent),
+ // ofpCommandNotiChannel: make(chan *voltha.OfpCmdRespNotification),
+ // oltRebootNotiChannel: make(chan *voltha.OltRebootNotification),
+ clientMap: make(map[string]intf.IVPClient),
+ events: make(chan vpaEvent, 100),
+ }
+
+ if vpa.DeviceListRefreshInterval <= 0 {
+ logger.Warnw(ctx, "device list refresh internal not valid, setting to default",
+ log.Fields{
+ "value": vpa.DeviceListRefreshInterval.String(),
+ "default": (10 * time.Second).String()})
+ vpa.DeviceListRefreshInterval = 1 * time.Minute
+ }
+
+ if vpa.ConnectionRetryDelay <= 0 {
+ logger.Warnw(ctx, "connection retry delay not value, setting to default",
+ log.Fields{
+ "value": vpa.ConnectionRetryDelay.String(),
+ "default": (3 * time.Second).String()})
+ vpa.ConnectionRetryDelay = 3 * time.Second
+ }
+
+ if db == nil {
+ db = database.GetDatabase()
+ }
+ vpAgent = &vpa
+ return &vpa, nil
+}
+
+//GetVPAgent - returns vpAgent object
+func GetVPAgent() *VPAgent {
+ return vpAgent
+}
+
+// VolthaSvcClient for Voltha Svc client
+func (vpa *VPAgent) VolthaSvcClient() voltha.VolthaServiceClient {
+ return vpa.volthaClient.Get()
+}
+
+// Run - make the inital connection to voltha and kicks off io streams
+func (vpa *VPAgent) Run(ctx context.Context) {
+
+ logger.Debugw(ctx, "Starting GRPC - VOLTHA client",
+ log.Fields{
+ "voltha-endpoint": vpa.VolthaAPIEndPoint})
+
+ // If the context contains a k8s probe then register services
+ p := probe.GetProbeFromContext(ctx)
+ if p != nil {
+ p.RegisterService(ctx, "voltha")
+ }
+
+ vpa.events <- vpaEventStart
+
+ /*
+ * Two sub-contexts are created here for different purposes so we can
+ * control the lifecyle of processing loops differently.
+ *
+ * volthaCtx - controls those processes that rely on the GRPC
+ * GRPCconnection to voltha and will be restarted when the
+ * GRPC connection is interrupted.
+ * hdlCtx - controls those processes that listen to channels and
+ * process each message. these will likely never be
+ * stopped until the vpagent is stopped.
+ */
+ var volthaCtx, hdlCtx context.Context
+ var volthaDone, hdlDone func()
+ state := vpaStateDisconnected
+
+ for {
+ select {
+ case <-ctx.Done():
+ logger.Errorw(ctx, "Context Done", log.Fields{"Context": ctx})
+ if volthaDone != nil {
+ volthaDone()
+ }
+ if hdlDone != nil {
+ hdlDone()
+ }
+ return
+ case event := <-vpa.events:
+ switch event {
+ case vpaEventStart:
+ logger.Debug(ctx, "vpagent-voltha-start-event")
+
+ // Start the loops that process messages
+ hdlCtx, hdlDone = context.WithCancel(context.Background())
+ go vpa.handlePacketsIn(hdlCtx)
+ go vpa.handleChangeEvents(hdlCtx)
+
+ // Kick off process to attempt to establish
+ // connection to voltha
+ state = vpaStateConnecting
+ go func() {
+ if err := vpa.establishConnectionToVoltha(hdlCtx, p); err != nil {
+ logger.Fatalw(ctx, "voltha-connection-failed", log.Fields{"error": err})
+ }
+ }()
+
+ case vpaEventVolthaConnected:
+ logger.Debug(ctx, "vpagent-voltha-connect-event")
+
+ // Start the loops that poll from voltha
+ if state != vpaStateConnected {
+ state = vpaStateConnected
+ volthaCtx, volthaDone = context.WithCancel(context.Background())
+ go vpa.receiveChangeEvents(volthaCtx)
+ go vpa.receivePacketsIn(volthaCtx)
+ go vpa.streamPacketOut(volthaCtx)
+ go vpa.synchronizeDeviceList(volthaCtx)
+ }
+
+ case vpaEventVolthaDisconnected:
+ if p != nil {
+ p.UpdateStatus(ctx, "voltha", probe.ServiceStatusNotReady)
+ }
+ logger.Debug(ctx, "vpagent-voltha-disconnect-event")
+ if state == vpaStateConnected {
+ state = vpaStateDisconnected
+ vpa.volthaClient.Clear()
+ volthaDone()
+ volthaDone = nil
+ }
+ if state != vpaStateConnecting {
+ state = vpaStateConnecting
+ go func() {
+ hdlCtx, hdlDone = context.WithCancel(context.Background())
+ if err := vpa.establishConnectionToVoltha(hdlCtx, p); err != nil {
+ logger.Fatalw(ctx, "voltha-connection-failed", log.Fields{"error": err})
+ }
+ }()
+ }
+
+ case vpaEventError:
+ logger.Debug(ctx, "vpagent-error-event")
+ default:
+ logger.Fatalw(ctx, "vpagent-unknown-event",
+ log.Fields{"event": event})
+ }
+ }
+ }
+}