First Commit of Voltha-Go-Controller from Radisys
Change-Id: I8e2e908e7ab09a4fe3d86849da18b6d69dcf4ab0
diff --git a/internal/pkg/vpagent/changeEvent.go b/internal/pkg/vpagent/changeEvent.go
new file mode 100644
index 0000000..3d86675
--- /dev/null
+++ b/internal/pkg/vpagent/changeEvent.go
@@ -0,0 +1,106 @@
+/*
+* Copyright 2022-present Open Networking Foundation
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+ */
+
+package vpagent
+
+import (
+ "context"
+ "io"
+
+ "github.com/golang/protobuf/ptypes/empty"
+ "github.com/opencord/voltha-lib-go/v7/pkg/log"
+ "google.golang.org/grpc"
+)
+
+func (vpa *VPAgent) receiveChangeEvents(ctx context.Context) {
+ logger.Debug(ctx, "receive-change-events-started")
+ // If we exit, assume disconnected
+ defer func() {
+ vpa.events <- vpaEventVolthaDisconnected
+ logger.Debug(ctx, "receive-change-events-finished")
+ }()
+ if vpa.volthaClient == nil {
+ logger.Error(ctx, "no-voltha-connection")
+ return
+ }
+ opt := grpc.EmptyCallOption{}
+ streamCtx, streamDone := context.WithCancel(context.Background())
+ defer streamDone()
+ vServiceClient := vpa.volthaClient.Get()
+ if vServiceClient == nil {
+ logger.Error(ctx, "Failed to get Voltha Service Client")
+ return
+ }
+
+ stream, err := vServiceClient.ReceiveChangeEvents(streamCtx, &empty.Empty{}, opt)
+ if err != nil {
+ logger.Errorw(ctx, "Unable to establish Receive Change Event Stream",
+ log.Fields{"error": err})
+ return
+ }
+
+top:
+ for {
+ select {
+ case <-ctx.Done():
+ logger.Errorw(ctx, "Context Done", log.Fields{"Context": ctx})
+ break top
+ default:
+ ce, err := stream.Recv()
+ if err == io.EOF {
+ logger.Infow(ctx, "EOF for receiveChangeEvents stream, reconnecting", log.Fields{"err": err})
+ stream, err = vServiceClient.ReceiveChangeEvents(streamCtx, &empty.Empty{}, opt)
+ if err != nil {
+ logger.Errorw(ctx, "Unable to establish Receive Change Event Stream",
+ log.Fields{"error": err})
+ return
+ }
+ continue
+ }
+ if isConnCanceled(err) {
+ logger.Errorw(ctx, "error receiving change event",
+ log.Fields{"error": err})
+ break top
+ } else if err != nil {
+ logger.Infow(ctx, "Ignoring unhandled error", log.Fields{"err": err})
+ continue
+ }
+ vpa.changeEventChannel <- ce
+ logger.Debug(ctx, "receive-change-event-queued")
+ }
+ }
+}
+
+func (vpa *VPAgent) handleChangeEvents(ctx context.Context) {
+ logger.Debug(ctx, "handle-change-event-started")
+
+top:
+ for {
+ select {
+ case <-ctx.Done():
+ logger.Errorw(ctx, "Context Done", log.Fields{"Context": ctx})
+ break top
+ case changeEvent := <-vpa.changeEventChannel:
+ logger.Debugw(ctx, "Change Event", log.Fields{"Device": changeEvent.Id})
+ if vpc := vpa.getVPClient(changeEvent.Id); vpc != nil {
+ if err:= vpc.ChangeEvent(changeEvent); err != nil {
+ logger.Errorw(ctx, "error handling Change Event", log.Fields{"Error": err, "Device": changeEvent.Id})
+ }
+ }
+ }
+ }
+
+ logger.Debug(ctx, "handle-change-event-finsihed")
+}
diff --git a/internal/pkg/vpagent/common.go b/internal/pkg/vpagent/common.go
new file mode 100644
index 0000000..d32e0a6
--- /dev/null
+++ b/internal/pkg/vpagent/common.go
@@ -0,0 +1,63 @@
+/*
+* Copyright 2022-present Open Networking Foundation
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+ */
+
+// Package vpagent Common Logger initialization
+package vpagent
+
+import (
+ "context"
+ "strings"
+
+ "github.com/opencord/voltha-lib-go/v7/pkg/log"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+)
+
+func init() {
+ // Setup this package so that it's log level can be modified at run time
+ var err error
+ logger, err = log.RegisterPackage(log.JSON, log.ErrorLevel, log.Fields{})
+ if err != nil {
+ panic(err)
+ }
+}
+
+// IsConnCanceled returns true, if error is from a closed gRPC connection.
+// ref. https://github.com/grpc/grpc-go/pull/1854
+func isConnCanceled(err error) bool {
+ if err == nil {
+ return false
+ }
+ // >= gRPC v1.23.x
+ s, ok := status.FromError(err)
+ if ok {
+ // connection is canceled or server has already closed the connection
+ return s.Code() == codes.Canceled || s.Message() == "transport is closing"
+ }
+
+ e, ok := status.FromError(err)
+ if ok {
+ // connection is canceled or server has already closed the connection
+ return e.Code() == codes.Canceled || e.Message() == "all SubConns are in TransientFailure"
+ }
+
+ // >= gRPC v1.10.x
+ if err == context.Canceled {
+ return true
+ }
+
+ // <= gRPC v1.7.x returns 'errors.New("grpc: the client connection is closing")'
+ return strings.Contains(err.Error(), "grpc: the client connection is closing")
+}
diff --git a/internal/pkg/vpagent/connection.go b/internal/pkg/vpagent/connection.go
new file mode 100644
index 0000000..607f7f3
--- /dev/null
+++ b/internal/pkg/vpagent/connection.go
@@ -0,0 +1,88 @@
+/*
+* Copyright 2022-present Open Networking Foundation
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+ */
+
+package vpagent
+
+import (
+ "context"
+ "errors"
+ "time"
+
+ "github.com/golang/protobuf/ptypes/empty"
+ "github.com/opencord/voltha-lib-go/v7/pkg/log"
+ "github.com/opencord/voltha-lib-go/v7/pkg/probe"
+ "github.com/opencord/voltha-protos/v5/go/voltha"
+ "google.golang.org/grpc"
+)
+
+//GrpcMaxSize Max size of grpc message
+const GrpcMaxSize int = 17455678
+
+func (vpa *VPAgent) establishConnectionToVoltha(ctx context.Context, p *probe.Probe) error {
+ if p != nil {
+ p.UpdateStatus(ctx, "voltha", probe.ServiceStatusPreparing)
+ }
+
+ if vpa.volthaConnection != nil {
+ vpa.volthaConnection.Close()
+ }
+
+ vpa.volthaConnection = nil
+ vpa.volthaClient.Clear()
+ try := 1
+ for vpa.ConnectionMaxRetries == 0 || try < vpa.ConnectionMaxRetries {
+ conn, err := grpc.Dial(vpa.VolthaAPIEndPoint, grpc.WithInsecure(), grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(GrpcMaxSize)))
+ if err == nil {
+ svc := voltha.NewVolthaServiceClient(conn)
+ if svc != nil {
+ if _, err = svc.GetVoltha(context.Background(), &empty.Empty{}); err == nil {
+ logger.Debugw(ctx, "Established connection to Voltha",
+ log.Fields{
+ "VolthaApiEndPoint": vpa.VolthaAPIEndPoint,
+ })
+ vpa.volthaConnection = conn
+ vpa.volthaClient.Set(svc)
+ if p != nil {
+ p.UpdateStatus(ctx, "voltha", probe.ServiceStatusRunning)
+ }
+ vpa.events <- vpaEventVolthaConnected
+ return nil
+ }
+ }
+ }
+ logger.Warnw(ctx, "Failed to connect to voltha",
+ log.Fields{
+ "VolthaApiEndPoint": vpa.VolthaAPIEndPoint,
+ "error": err.Error(),
+ })
+ if vpa.ConnectionMaxRetries == 0 || try < vpa.ConnectionMaxRetries {
+ if vpa.ConnectionMaxRetries != 0 {
+ try++
+ }
+ time.Sleep(vpa.ConnectionRetryDelay)
+ }
+ }
+ if p != nil {
+ p.UpdateStatus(ctx, "voltha", probe.ServiceStatusFailed)
+ }
+ return errors.New("failed-to-connect-to-voltha")
+}
+
+// CloseConnectionToVoltha closes the grpc connection to VOLTHA
+func (vpa *VPAgent) CloseConnectionToVoltha() {
+ //Close the grpc connection to voltha
+ logger.Debug(ctx, "Closing voltha grpc connection")
+ vpa.volthaConnection.Close()
+}
diff --git a/internal/pkg/vpagent/packetIn.go b/internal/pkg/vpagent/packetIn.go
new file mode 100644
index 0000000..4a2f7c3
--- /dev/null
+++ b/internal/pkg/vpagent/packetIn.go
@@ -0,0 +1,96 @@
+/*
+* Copyright 2022-present Open Networking Foundation
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+ */
+
+package vpagent
+
+import (
+ "context"
+ "io"
+
+ "github.com/golang/protobuf/ptypes/empty"
+ "github.com/opencord/voltha-lib-go/v7/pkg/log"
+ "google.golang.org/grpc"
+)
+
+func (vpa *VPAgent) receivePacketsIn(ctx context.Context) {
+ logger.Debug(ctx, "receive-packets-in-started")
+ // If we exit, assume disconnected
+ defer func() {
+ vpa.events <- vpaEventVolthaDisconnected
+ logger.Debug(ctx, "receive-packets-in-finished")
+ }()
+ if vpa.volthaClient == nil {
+ logger.Error(ctx, "no-voltha-connection")
+ return
+ }
+ opt := grpc.EmptyCallOption{}
+ streamCtx, streamDone := context.WithCancel(context.Background())
+ defer streamDone()
+ stream, err := vpa.volthaClient.Get().ReceivePacketsIn(streamCtx, &empty.Empty{}, opt)
+ if err != nil {
+ logger.Errorw(ctx, "Unable to establish Receive PacketIn Stream",
+ log.Fields{"error": err})
+ return
+ }
+
+top:
+
+ for {
+ select {
+ case <-ctx.Done():
+ logger.Errorw(ctx, "Context Done", log.Fields{"Context": ctx})
+ break top
+ default:
+ pkt, err := stream.Recv()
+ if err == io.EOF {
+ logger.Infow(ctx, "EOF for receivePacketsIn stream, reconnecting", log.Fields{"err": err})
+ stream, err = vpa.volthaClient.Get().ReceivePacketsIn(streamCtx, &empty.Empty{}, opt)
+ if err != nil {
+ logger.Errorw(ctx, "Unable to establish Receive PacketIn Stream",
+ log.Fields{"error": err})
+ return
+ }
+ continue
+ }
+
+ if isConnCanceled(err) {
+ logger.Errorw(ctx, "error receiving packet",
+ log.Fields{"error": err})
+ break top
+ } else if err != nil {
+ logger.Infow(ctx, "Ignoring unhandled error", log.Fields{"err": err})
+ continue
+ }
+ vpa.packetInChannel <- pkt
+ }
+ }
+}
+
+func (vpa *VPAgent) handlePacketsIn(ctx context.Context) {
+ logger.Debug(ctx, "handle-packets-in-started")
+top:
+ for {
+ select {
+ case <-ctx.Done():
+ logger.Errorw(ctx, "Context Done", log.Fields{"Context": ctx})
+ break top
+ case packet := <-vpa.packetInChannel:
+ if vpc := vpa.getVPClient(packet.Id); vpc != nil {
+ vpc.PacketIn(packet)
+ }
+ }
+ }
+ logger.Debug(ctx, "handle-packets-in-finished")
+}
diff --git a/internal/pkg/vpagent/packetOut.go b/internal/pkg/vpagent/packetOut.go
new file mode 100644
index 0000000..501bdea
--- /dev/null
+++ b/internal/pkg/vpagent/packetOut.go
@@ -0,0 +1,61 @@
+/*
+* Copyright 2022-present Open Networking Foundation
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+ */
+
+package vpagent
+
+import (
+ "context"
+
+ "github.com/opencord/voltha-lib-go/v7/pkg/log"
+ "google.golang.org/grpc"
+)
+
+func (vpa *VPAgent) streamPacketOut(ctx context.Context) {
+ logger.Debug(ctx, "packet-out-started")
+ // If we exit, assume disconnected
+ defer func() {
+ vpa.events <- vpaEventVolthaDisconnected
+ logger.Debug(ctx, "packet-out-finished")
+ }()
+ if vpa.volthaClient == nil {
+ logger.Error(ctx, "no-voltha-connection")
+ return
+ }
+ opt := grpc.EmptyCallOption{}
+ streamCtx, streamDone := context.WithCancel(context.Background())
+ outClient, err := vpa.volthaClient.Get().StreamPacketsOut(streamCtx, opt)
+ defer streamDone()
+ if err != nil {
+ logger.Errorw(ctx, "streamPacketOut Error creating packetout stream ", log.Fields{"error": err})
+ return
+ }
+top:
+ for {
+ select {
+ case <-ctx.Done():
+ break top
+ case ofPacketOut := <-vpa.packetOutChannel:
+ if logger.V(log.DebugLevel) {
+ logger.Debug(ctx, "streamPacketOut Receive PacketOut from Channel")
+ }
+ if err := outClient.Send(ofPacketOut); err != nil {
+ logger.Errorw(ctx, "packet-out-send-error",
+ log.Fields{"error": err.Error()})
+ break top
+ }
+ logger.Debug(ctx, "packet-out-send")
+ }
+ }
+}
diff --git a/internal/pkg/vpagent/refresh.go b/internal/pkg/vpagent/refresh.go
new file mode 100644
index 0000000..c06e5cd
--- /dev/null
+++ b/internal/pkg/vpagent/refresh.go
@@ -0,0 +1,141 @@
+/*
+* Copyright 2022-present Open Networking Foundation
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+ */
+
+package vpagent
+
+import (
+ "context"
+ "time"
+
+ "voltha-go-controller/internal/pkg/intf"
+
+ "github.com/golang/protobuf/ptypes/empty"
+ "github.com/opencord/voltha-lib-go/v7/pkg/log"
+ "github.com/opencord/voltha-protos/v5/go/voltha"
+)
+
+func (vpa *VPAgent) synchronizeDeviceList(ctx context.Context) {
+ // Send reconnection indication to the devices already known
+ for _, vpc := range vpa.clientMap {
+ vpc.ConnectInd(context.TODO(), intf.DeviceReDisc)
+ }
+
+ // Refresh once to get everything started
+ vpa.refreshDeviceList()
+
+ tick := time.NewTicker(vpa.DeviceListRefreshInterval)
+loop:
+ for {
+ select {
+ case <-ctx.Done():
+ logger.Errorw(ctx, "Context Done", log.Fields{"Context": ctx})
+ break loop
+ case <-tick.C:
+ vpa.refreshDeviceList()
+ }
+ }
+ tick.Stop()
+}
+
+func (vpa *VPAgent) refreshDeviceList() {
+ // If we exit, assume disconnected
+ if vpa.volthaClient == nil {
+ logger.Error(ctx, "no-voltha-connection")
+ vpa.events <- vpaEventVolthaDisconnected
+ return
+ }
+ deviceList, err := vpa.volthaClient.Get().ListLogicalDevices(context.Background(), &empty.Empty{})
+ if err != nil {
+ logger.Errorw(ctx, "vpagent failed to query device list from voltha",
+ log.Fields{"error": err})
+ vpa.events <- vpaEventVolthaDisconnected
+ return
+ }
+
+ var toAdd []int
+ var toDel []string
+ var deviceIDMap = make(map[string]string)
+ for index, d := range deviceList.Items {
+ deviceID := d.Id
+ deviceIDMap[deviceID] = deviceID
+ if vpa.clientMap[deviceID] == nil {
+ toAdd = append(toAdd, index)
+ }
+ }
+ for key := range vpa.clientMap {
+ deviceID, ok := deviceIDMap[key]
+ if !ok || (ok && deviceID == "") {
+ toDel = append(toDel, key)
+ }
+ }
+ logger.Debugw(ctx, "Device Refresh", log.Fields{"ToAdd": toAdd, "ToDel": toDel})
+ for i := 0; i < len(toAdd); i++ {
+ device := deviceList.Items[toAdd[i]]
+ serialNum := device.Desc.SerialNum
+ // If the blocked device list contain device serial number, do not add VPClient.
+ if vpa.VPClientAgent.IsBlockedDevice(serialNum) {
+ logger.Debugw(ctx, "Device Serial Number is present in the blocked device list", log.Fields{"device-serial-number": serialNum})
+ } else {
+ vpa.addVPClient(device) // client is started in addVPClient
+ }
+ }
+
+ for i := 0; i < len(toDel); i++ {
+ vpa.VPClientAgent.DelDevice(toDel[i])
+ vpa.mapLock.Lock()
+ delete(vpa.clientMap, toDel[i])
+ vpa.mapLock.Unlock()
+ }
+}
+
+func (vpa *VPAgent) addVPClient(device *voltha.LogicalDevice) intf.IVPClient {
+ logger.Warnw(ctx, "GrpcClient addClient called ", log.Fields{"device-id": device.Id})
+ vpa.mapLock.Lock()
+ defer vpa.mapLock.Unlock()
+ var serialNum = "Unknown"
+ if device.Desc != nil {
+ serialNum = device.Desc.SerialNum
+ }
+ vpc := vpa.clientMap[device.Id]
+ if vpc == nil {
+ vpa.VPClientAgent.AddNewDevice(&intf.VPClientCfg{
+ DeviceID: device.Id,
+ SerialNum: serialNum,
+ SouthBoundID: device.RootDeviceId,
+ VolthaClient: vpa.volthaClient,
+ PacketOutChannel: vpa.packetOutChannel,
+ })
+
+ }
+ logger.Debugw(ctx, "Finished with addClient", log.Fields{"deviceID": device.Id})
+ return vpc
+}
+
+//AddClientToClientMap - called by controller once device obj is created
+func (vpa *VPAgent) AddClientToClientMap(deviceID string, vpc intf.IVPClient) {
+ vpa.mapLock.Lock()
+ defer vpa.mapLock.Unlock()
+
+ if vpc != nil {
+ vpa.clientMap[deviceID] = vpc
+ }
+}
+
+func (vpa *VPAgent) getVPClient(deviceID string) intf.IVPClient {
+ if vpc, ok := vpa.clientMap[deviceID]; ok {
+ return vpc
+ }
+ return nil
+}
diff --git a/internal/pkg/vpagent/volthaprotoagent.go b/internal/pkg/vpagent/volthaprotoagent.go
new file mode 100644
index 0000000..04e3d08
--- /dev/null
+++ b/internal/pkg/vpagent/volthaprotoagent.go
@@ -0,0 +1,237 @@
+/*
+* Copyright 2022-present Open Networking Foundation
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+ */
+
+package vpagent
+
+import (
+ "context"
+ "sync"
+ "time"
+
+ "voltha-go-controller/database"
+ "voltha-go-controller/internal/pkg/holder"
+ "voltha-go-controller/internal/pkg/intf"
+
+ "github.com/opencord/voltha-lib-go/v7/pkg/log"
+ "github.com/opencord/voltha-lib-go/v7/pkg/probe"
+ ofp "github.com/opencord/voltha-protos/v5/go/openflow_13"
+ "github.com/opencord/voltha-protos/v5/go/voltha"
+ "google.golang.org/grpc"
+)
+
+var logger log.CLogger
+var ctx = context.TODO()
+
+func init() {
+ // Setup this package so that it's log level can be modified at run time
+ var err error
+ logger, err = log.RegisterPackage(log.JSON, log.ErrorLevel, log.Fields{})
+ if err != nil {
+ panic(err)
+ }
+}
+
+type vpaEvent byte
+type vpaState byte
+
+var db database.DBIntf
+
+const (
+ vpaEventStart = vpaEvent(iota)
+ vpaEventVolthaConnected
+ vpaEventVolthaDisconnected
+ vpaEventError
+
+ vpaStateConnected = vpaState(iota)
+ vpaStateConnecting
+ vpaStateDisconnected
+)
+
+var vpAgent *VPAgent
+
+// VPAgent structure
+type VPAgent struct {
+ VolthaAPIEndPoint string
+ DeviceListRefreshInterval time.Duration
+ ConnectionMaxRetries int
+ ConnectionRetryDelay time.Duration
+
+ volthaConnection *grpc.ClientConn
+ volthaClient *holder.VolthaServiceClientHolder
+ mapLock sync.Mutex
+ clientMap map[string]intf.IVPClient
+ events chan vpaEvent
+
+ packetInChannel chan *ofp.PacketIn
+ packetOutChannel chan *ofp.PacketOut
+ changeEventChannel chan *ofp.ChangeEvent
+ VPClientAgent intf.IVPClientAgent
+}
+
+// NewVPAgent is constructor for VPAgent
+func NewVPAgent(config *VPAgent) (*VPAgent, error) {
+ vpa := VPAgent{
+ VolthaAPIEndPoint: config.VolthaAPIEndPoint,
+ DeviceListRefreshInterval: config.DeviceListRefreshInterval,
+ ConnectionMaxRetries: config.ConnectionMaxRetries,
+ ConnectionRetryDelay: config.ConnectionRetryDelay,
+ VPClientAgent: config.VPClientAgent,
+ volthaClient: &holder.VolthaServiceClientHolder{},
+ packetInChannel: make(chan *ofp.PacketIn),
+ // customPacketIndChannel: make(chan *voltha.CustomPacketIn),
+ packetOutChannel: make(chan *ofp.PacketOut),
+ changeEventChannel: make(chan *ofp.ChangeEvent),
+ // ofpCommandNotiChannel: make(chan *voltha.OfpCmdRespNotification),
+ // oltRebootNotiChannel: make(chan *voltha.OltRebootNotification),
+ clientMap: make(map[string]intf.IVPClient),
+ events: make(chan vpaEvent, 100),
+ }
+
+ if vpa.DeviceListRefreshInterval <= 0 {
+ logger.Warnw(ctx, "device list refresh internal not valid, setting to default",
+ log.Fields{
+ "value": vpa.DeviceListRefreshInterval.String(),
+ "default": (10 * time.Second).String()})
+ vpa.DeviceListRefreshInterval = 1 * time.Minute
+ }
+
+ if vpa.ConnectionRetryDelay <= 0 {
+ logger.Warnw(ctx, "connection retry delay not value, setting to default",
+ log.Fields{
+ "value": vpa.ConnectionRetryDelay.String(),
+ "default": (3 * time.Second).String()})
+ vpa.ConnectionRetryDelay = 3 * time.Second
+ }
+
+ if db == nil {
+ db = database.GetDatabase()
+ }
+ vpAgent = &vpa
+ return &vpa, nil
+}
+
+//GetVPAgent - returns vpAgent object
+func GetVPAgent() *VPAgent {
+ return vpAgent
+}
+
+// VolthaSvcClient for Voltha Svc client
+func (vpa *VPAgent) VolthaSvcClient() voltha.VolthaServiceClient {
+ return vpa.volthaClient.Get()
+}
+
+// Run - make the inital connection to voltha and kicks off io streams
+func (vpa *VPAgent) Run(ctx context.Context) {
+
+ logger.Debugw(ctx, "Starting GRPC - VOLTHA client",
+ log.Fields{
+ "voltha-endpoint": vpa.VolthaAPIEndPoint})
+
+ // If the context contains a k8s probe then register services
+ p := probe.GetProbeFromContext(ctx)
+ if p != nil {
+ p.RegisterService(ctx, "voltha")
+ }
+
+ vpa.events <- vpaEventStart
+
+ /*
+ * Two sub-contexts are created here for different purposes so we can
+ * control the lifecyle of processing loops differently.
+ *
+ * volthaCtx - controls those processes that rely on the GRPC
+ * GRPCconnection to voltha and will be restarted when the
+ * GRPC connection is interrupted.
+ * hdlCtx - controls those processes that listen to channels and
+ * process each message. these will likely never be
+ * stopped until the vpagent is stopped.
+ */
+ var volthaCtx, hdlCtx context.Context
+ var volthaDone, hdlDone func()
+ state := vpaStateDisconnected
+
+ for {
+ select {
+ case <-ctx.Done():
+ logger.Errorw(ctx, "Context Done", log.Fields{"Context": ctx})
+ if volthaDone != nil {
+ volthaDone()
+ }
+ if hdlDone != nil {
+ hdlDone()
+ }
+ return
+ case event := <-vpa.events:
+ switch event {
+ case vpaEventStart:
+ logger.Debug(ctx, "vpagent-voltha-start-event")
+
+ // Start the loops that process messages
+ hdlCtx, hdlDone = context.WithCancel(context.Background())
+ go vpa.handlePacketsIn(hdlCtx)
+ go vpa.handleChangeEvents(hdlCtx)
+
+ // Kick off process to attempt to establish
+ // connection to voltha
+ state = vpaStateConnecting
+ go func() {
+ if err := vpa.establishConnectionToVoltha(hdlCtx, p); err != nil {
+ logger.Fatalw(ctx, "voltha-connection-failed", log.Fields{"error": err})
+ }
+ }()
+
+ case vpaEventVolthaConnected:
+ logger.Debug(ctx, "vpagent-voltha-connect-event")
+
+ // Start the loops that poll from voltha
+ if state != vpaStateConnected {
+ state = vpaStateConnected
+ volthaCtx, volthaDone = context.WithCancel(context.Background())
+ go vpa.receiveChangeEvents(volthaCtx)
+ go vpa.receivePacketsIn(volthaCtx)
+ go vpa.streamPacketOut(volthaCtx)
+ go vpa.synchronizeDeviceList(volthaCtx)
+ }
+
+ case vpaEventVolthaDisconnected:
+ if p != nil {
+ p.UpdateStatus(ctx, "voltha", probe.ServiceStatusNotReady)
+ }
+ logger.Debug(ctx, "vpagent-voltha-disconnect-event")
+ if state == vpaStateConnected {
+ state = vpaStateDisconnected
+ vpa.volthaClient.Clear()
+ volthaDone()
+ volthaDone = nil
+ }
+ if state != vpaStateConnecting {
+ state = vpaStateConnecting
+ go func() {
+ hdlCtx, hdlDone = context.WithCancel(context.Background())
+ if err := vpa.establishConnectionToVoltha(hdlCtx, p); err != nil {
+ logger.Fatalw(ctx, "voltha-connection-failed", log.Fields{"error": err})
+ }
+ }()
+ }
+
+ case vpaEventError:
+ logger.Debug(ctx, "vpagent-error-event")
+ default:
+ logger.Fatalw(ctx, "vpagent-unknown-event",
+ log.Fields{"event": event})
+ }
+ }
+ }
+}