blob: 3d962efde1086dee323d42467482f28eca3a7ee5 [file] [log] [blame]
Naveen Sampath04696f72022-06-13 15:19:14 +05301/*
2* Copyright 2022-present Open Networking Foundation
3* Licensed under the Apache License, Version 2.0 (the "License");
4* you may not use this file except in compliance with the License.
5* You may obtain a copy of the License at
6*
7* http://www.apache.org/licenses/LICENSE-2.0
8*
9* Unless required by applicable law or agreed to in writing, software
10* distributed under the License is distributed on an "AS IS" BASIS,
11* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12* See the License for the specific language governing permissions and
13* limitations under the License.
14 */
15
16package vpagent
17
18import (
19 "context"
20 "sync"
21 "time"
22
23 "voltha-go-controller/database"
24 "voltha-go-controller/internal/pkg/holder"
25 "voltha-go-controller/internal/pkg/intf"
26
Tinoj Joseph1d108322022-07-13 10:07:39 +053027 "voltha-go-controller/log"
vinokuma926cb3e2023-03-29 11:41:06 +053028
Naveen Sampath04696f72022-06-13 15:19:14 +053029 "github.com/opencord/voltha-lib-go/v7/pkg/probe"
30 ofp "github.com/opencord/voltha-protos/v5/go/openflow_13"
31 "github.com/opencord/voltha-protos/v5/go/voltha"
32 "google.golang.org/grpc"
33)
34
35var logger log.CLogger
36var ctx = context.TODO()
37
38func init() {
39 // Setup this package so that it's log level can be modified at run time
40 var err error
Tinoj Joseph1d108322022-07-13 10:07:39 +053041 logger, err = log.AddPackageWithDefaultParam()
Naveen Sampath04696f72022-06-13 15:19:14 +053042 if err != nil {
43 panic(err)
44 }
45}
46
type (
	// vpaEvent identifies an event delivered on the VPAgent events channel.
	vpaEvent byte
	// vpaState identifies the connection state tracked by VPAgent.Run.
	vpaState byte
)
49
50var db database.DBIntf
51
52const (
53 vpaEventStart = vpaEvent(iota)
54 vpaEventVolthaConnected
55 vpaEventVolthaDisconnected
56 vpaEventError
57
58 vpaStateConnected = vpaState(iota)
59 vpaStateConnecting
60 vpaStateDisconnected
61)
62
63var vpAgent *VPAgent
64
65// VPAgent structure
66type VPAgent struct {
vinokuma926cb3e2023-03-29 11:41:06 +053067 VPClientAgent intf.IVPClientAgent
68 clientMap map[string]intf.IVPClient
69 packetInChannel chan *ofp.PacketIn
70 packetOutChannel chan *ofp.PacketOut
71 changeEventChannel chan *ofp.ChangeEvent
72 volthaClient *holder.VolthaServiceClientHolder
73 volthaConnection *grpc.ClientConn
74 events chan vpaEvent
Naveen Sampath04696f72022-06-13 15:19:14 +053075 VolthaAPIEndPoint string
vinokuma926cb3e2023-03-29 11:41:06 +053076 mapLock sync.Mutex
Naveen Sampath04696f72022-06-13 15:19:14 +053077 DeviceListRefreshInterval time.Duration
Naveen Sampath04696f72022-06-13 15:19:14 +053078 ConnectionRetryDelay time.Duration
vinokuma926cb3e2023-03-29 11:41:06 +053079 ConnectionMaxRetries int
Naveen Sampath04696f72022-06-13 15:19:14 +053080}
81
82// NewVPAgent is constructor for VPAgent
83func NewVPAgent(config *VPAgent) (*VPAgent, error) {
84 vpa := VPAgent{
85 VolthaAPIEndPoint: config.VolthaAPIEndPoint,
86 DeviceListRefreshInterval: config.DeviceListRefreshInterval,
87 ConnectionMaxRetries: config.ConnectionMaxRetries,
88 ConnectionRetryDelay: config.ConnectionRetryDelay,
89 VPClientAgent: config.VPClientAgent,
90 volthaClient: &holder.VolthaServiceClientHolder{},
91 packetInChannel: make(chan *ofp.PacketIn),
92 // customPacketIndChannel: make(chan *voltha.CustomPacketIn),
93 packetOutChannel: make(chan *ofp.PacketOut),
94 changeEventChannel: make(chan *ofp.ChangeEvent),
95 // ofpCommandNotiChannel: make(chan *voltha.OfpCmdRespNotification),
96 // oltRebootNotiChannel: make(chan *voltha.OltRebootNotification),
97 clientMap: make(map[string]intf.IVPClient),
98 events: make(chan vpaEvent, 100),
99 }
100
101 if vpa.DeviceListRefreshInterval <= 0 {
102 logger.Warnw(ctx, "device list refresh internal not valid, setting to default",
103 log.Fields{
104 "value": vpa.DeviceListRefreshInterval.String(),
105 "default": (10 * time.Second).String()})
106 vpa.DeviceListRefreshInterval = 1 * time.Minute
107 }
108
109 if vpa.ConnectionRetryDelay <= 0 {
110 logger.Warnw(ctx, "connection retry delay not value, setting to default",
111 log.Fields{
112 "value": vpa.ConnectionRetryDelay.String(),
113 "default": (3 * time.Second).String()})
114 vpa.ConnectionRetryDelay = 3 * time.Second
115 }
116
117 if db == nil {
118 db = database.GetDatabase()
119 }
120 vpAgent = &vpa
121 return &vpa, nil
122}
123
vinokuma926cb3e2023-03-29 11:41:06 +0530124// GetVPAgent - returns vpAgent object
Naveen Sampath04696f72022-06-13 15:19:14 +0530125func GetVPAgent() *VPAgent {
126 return vpAgent
127}
128
129// VolthaSvcClient for Voltha Svc client
130func (vpa *VPAgent) VolthaSvcClient() voltha.VolthaServiceClient {
131 return vpa.volthaClient.Get()
132}
133
vinokuma926cb3e2023-03-29 11:41:06 +0530134// Run - make the initial connection to voltha and kicks off io streams
Naveen Sampath04696f72022-06-13 15:19:14 +0530135func (vpa *VPAgent) Run(ctx context.Context) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530136 logger.Debugw(ctx, "Starting GRPC - VOLTHA client",
137 log.Fields{
138 "voltha-endpoint": vpa.VolthaAPIEndPoint})
139
140 // If the context contains a k8s probe then register services
141 p := probe.GetProbeFromContext(ctx)
142 if p != nil {
143 p.RegisterService(ctx, "voltha")
144 }
145
146 vpa.events <- vpaEventStart
147
148 /*
149 * Two sub-contexts are created here for different purposes so we can
150 * control the lifecyle of processing loops differently.
151 *
152 * volthaCtx - controls those processes that rely on the GRPC
153 * GRPCconnection to voltha and will be restarted when the
154 * GRPC connection is interrupted.
155 * hdlCtx - controls those processes that listen to channels and
156 * process each message. these will likely never be
157 * stopped until the vpagent is stopped.
158 */
159 var volthaCtx, hdlCtx context.Context
160 var volthaDone, hdlDone func()
161 state := vpaStateDisconnected
162
163 for {
164 select {
165 case <-ctx.Done():
166 logger.Errorw(ctx, "Context Done", log.Fields{"Context": ctx})
167 if volthaDone != nil {
168 volthaDone()
169 }
170 if hdlDone != nil {
171 hdlDone()
172 }
173 return
174 case event := <-vpa.events:
175 switch event {
176 case vpaEventStart:
177 logger.Debug(ctx, "vpagent-voltha-start-event")
178
179 // Start the loops that process messages
180 hdlCtx, hdlDone = context.WithCancel(context.Background())
181 go vpa.handlePacketsIn(hdlCtx)
182 go vpa.handleChangeEvents(hdlCtx)
183
184 // Kick off process to attempt to establish
185 // connection to voltha
186 state = vpaStateConnecting
187 go func() {
188 if err := vpa.establishConnectionToVoltha(hdlCtx, p); err != nil {
189 logger.Fatalw(ctx, "voltha-connection-failed", log.Fields{"error": err})
190 }
191 }()
192
193 case vpaEventVolthaConnected:
194 logger.Debug(ctx, "vpagent-voltha-connect-event")
195
196 // Start the loops that poll from voltha
197 if state != vpaStateConnected {
198 state = vpaStateConnected
199 volthaCtx, volthaDone = context.WithCancel(context.Background())
200 go vpa.receiveChangeEvents(volthaCtx)
201 go vpa.receivePacketsIn(volthaCtx)
202 go vpa.streamPacketOut(volthaCtx)
203 go vpa.synchronizeDeviceList(volthaCtx)
204 }
205
206 case vpaEventVolthaDisconnected:
207 if p != nil {
208 p.UpdateStatus(ctx, "voltha", probe.ServiceStatusNotReady)
209 }
210 logger.Debug(ctx, "vpagent-voltha-disconnect-event")
211 if state == vpaStateConnected {
212 state = vpaStateDisconnected
213 vpa.volthaClient.Clear()
214 volthaDone()
215 volthaDone = nil
216 }
217 if state != vpaStateConnecting {
218 state = vpaStateConnecting
219 go func() {
220 hdlCtx, hdlDone = context.WithCancel(context.Background())
221 if err := vpa.establishConnectionToVoltha(hdlCtx, p); err != nil {
222 logger.Fatalw(ctx, "voltha-connection-failed", log.Fields{"error": err})
223 }
224 }()
225 }
226
227 case vpaEventError:
228 logger.Debug(ctx, "vpagent-error-event")
229 default:
230 logger.Fatalw(ctx, "vpagent-unknown-event",
231 log.Fields{"event": event})
232 }
233 }
234 }
235}