blob: 893f69df56193e2f30822a97bbc17766c614e3bf [file] [log] [blame]
Naveen Sampath04696f72022-06-13 15:19:14 +05301/*
2* Copyright 2022-present Open Networking Foundation
3* Licensed under the Apache License, Version 2.0 (the "License");
4* you may not use this file except in compliance with the License.
5* You may obtain a copy of the License at
6*
7* http://www.apache.org/licenses/LICENSE-2.0
8*
9* Unless required by applicable law or agreed to in writing, software
10* distributed under the License is distributed on an "AS IS" BASIS,
11* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12* See the License for the specific language governing permissions and
13* limitations under the License.
14 */
15
16package vpagent
17
18import (
19 "context"
20 "sync"
21 "time"
22
23 "voltha-go-controller/database"
24 "voltha-go-controller/internal/pkg/holder"
25 "voltha-go-controller/internal/pkg/intf"
26
Tinoj Joseph1d108322022-07-13 10:07:39 +053027 "voltha-go-controller/log"
vinokuma926cb3e2023-03-29 11:41:06 +053028
Naveen Sampath04696f72022-06-13 15:19:14 +053029 ofp "github.com/opencord/voltha-protos/v5/go/openflow_13"
30 "github.com/opencord/voltha-protos/v5/go/voltha"
31 "google.golang.org/grpc"
32)
33
34var logger log.CLogger
35var ctx = context.TODO()
36
37func init() {
38 // Setup this package so that it's log level can be modified at run time
39 var err error
Tinoj Joseph1d108322022-07-13 10:07:39 +053040 logger, err = log.AddPackageWithDefaultParam()
Naveen Sampath04696f72022-06-13 15:19:14 +053041 if err != nil {
42 panic(err)
43 }
44}
45
46type vpaEvent byte
47type vpaState byte
48
49var db database.DBIntf
50
51const (
52 vpaEventStart = vpaEvent(iota)
53 vpaEventVolthaConnected
54 vpaEventVolthaDisconnected
55 vpaEventError
56
57 vpaStateConnected = vpaState(iota)
58 vpaStateConnecting
59 vpaStateDisconnected
60)
61
62var vpAgent *VPAgent
63
64// VPAgent structure
65type VPAgent struct {
vinokuma926cb3e2023-03-29 11:41:06 +053066 VPClientAgent intf.IVPClientAgent
67 clientMap map[string]intf.IVPClient
68 packetInChannel chan *ofp.PacketIn
69 packetOutChannel chan *ofp.PacketOut
70 changeEventChannel chan *ofp.ChangeEvent
71 volthaClient *holder.VolthaServiceClientHolder
72 volthaConnection *grpc.ClientConn
73 events chan vpaEvent
Naveen Sampath04696f72022-06-13 15:19:14 +053074 VolthaAPIEndPoint string
vinokuma926cb3e2023-03-29 11:41:06 +053075 mapLock sync.Mutex
Naveen Sampath04696f72022-06-13 15:19:14 +053076 DeviceListRefreshInterval time.Duration
Naveen Sampath04696f72022-06-13 15:19:14 +053077 ConnectionRetryDelay time.Duration
vinokuma926cb3e2023-03-29 11:41:06 +053078 ConnectionMaxRetries int
Naveen Sampath04696f72022-06-13 15:19:14 +053079}
80
81// NewVPAgent is constructor for VPAgent
82func NewVPAgent(config *VPAgent) (*VPAgent, error) {
83 vpa := VPAgent{
84 VolthaAPIEndPoint: config.VolthaAPIEndPoint,
85 DeviceListRefreshInterval: config.DeviceListRefreshInterval,
86 ConnectionMaxRetries: config.ConnectionMaxRetries,
87 ConnectionRetryDelay: config.ConnectionRetryDelay,
88 VPClientAgent: config.VPClientAgent,
89 volthaClient: &holder.VolthaServiceClientHolder{},
90 packetInChannel: make(chan *ofp.PacketIn),
91 // customPacketIndChannel: make(chan *voltha.CustomPacketIn),
92 packetOutChannel: make(chan *ofp.PacketOut),
93 changeEventChannel: make(chan *ofp.ChangeEvent),
94 // ofpCommandNotiChannel: make(chan *voltha.OfpCmdRespNotification),
95 // oltRebootNotiChannel: make(chan *voltha.OltRebootNotification),
96 clientMap: make(map[string]intf.IVPClient),
97 events: make(chan vpaEvent, 100),
98 }
99
100 if vpa.DeviceListRefreshInterval <= 0 {
101 logger.Warnw(ctx, "device list refresh internal not valid, setting to default",
102 log.Fields{
103 "value": vpa.DeviceListRefreshInterval.String(),
104 "default": (10 * time.Second).String()})
105 vpa.DeviceListRefreshInterval = 1 * time.Minute
106 }
107
108 if vpa.ConnectionRetryDelay <= 0 {
109 logger.Warnw(ctx, "connection retry delay not value, setting to default",
110 log.Fields{
111 "value": vpa.ConnectionRetryDelay.String(),
112 "default": (3 * time.Second).String()})
113 vpa.ConnectionRetryDelay = 3 * time.Second
114 }
115
116 if db == nil {
117 db = database.GetDatabase()
118 }
119 vpAgent = &vpa
120 return &vpa, nil
121}
122
vinokuma926cb3e2023-03-29 11:41:06 +0530123// GetVPAgent - returns vpAgent object
Naveen Sampath04696f72022-06-13 15:19:14 +0530124func GetVPAgent() *VPAgent {
125 return vpAgent
126}
127
128// VolthaSvcClient for Voltha Svc client
129func (vpa *VPAgent) VolthaSvcClient() voltha.VolthaServiceClient {
130 return vpa.volthaClient.Get()
131}
132
vinokuma926cb3e2023-03-29 11:41:06 +0530133// Run - make the initial connection to voltha and kicks off io streams
Naveen Sampath04696f72022-06-13 15:19:14 +0530134func (vpa *VPAgent) Run(ctx context.Context) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530135 logger.Debugw(ctx, "Starting GRPC - VOLTHA client",
136 log.Fields{
137 "voltha-endpoint": vpa.VolthaAPIEndPoint})
138
Naveen Sampath04696f72022-06-13 15:19:14 +0530139 vpa.events <- vpaEventStart
140
141 /*
142 * Two sub-contexts are created here for different purposes so we can
143 * control the lifecyle of processing loops differently.
144 *
145 * volthaCtx - controls those processes that rely on the GRPC
146 * GRPCconnection to voltha and will be restarted when the
147 * GRPC connection is interrupted.
148 * hdlCtx - controls those processes that listen to channels and
149 * process each message. these will likely never be
150 * stopped until the vpagent is stopped.
151 */
152 var volthaCtx, hdlCtx context.Context
153 var volthaDone, hdlDone func()
154 state := vpaStateDisconnected
155
156 for {
157 select {
158 case <-ctx.Done():
159 logger.Errorw(ctx, "Context Done", log.Fields{"Context": ctx})
160 if volthaDone != nil {
161 volthaDone()
162 }
163 if hdlDone != nil {
164 hdlDone()
165 }
166 return
167 case event := <-vpa.events:
168 switch event {
169 case vpaEventStart:
170 logger.Debug(ctx, "vpagent-voltha-start-event")
171
172 // Start the loops that process messages
173 hdlCtx, hdlDone = context.WithCancel(context.Background())
174 go vpa.handlePacketsIn(hdlCtx)
175 go vpa.handleChangeEvents(hdlCtx)
176
177 // Kick off process to attempt to establish
178 // connection to voltha
179 state = vpaStateConnecting
180 go func() {
Akash Sonief452f12024-12-12 18:20:28 +0530181 if err := vpa.establishConnectionToVoltha(hdlCtx); err != nil {
Naveen Sampath04696f72022-06-13 15:19:14 +0530182 logger.Fatalw(ctx, "voltha-connection-failed", log.Fields{"error": err})
183 }
184 }()
185
186 case vpaEventVolthaConnected:
187 logger.Debug(ctx, "vpagent-voltha-connect-event")
188
189 // Start the loops that poll from voltha
190 if state != vpaStateConnected {
191 state = vpaStateConnected
192 volthaCtx, volthaDone = context.WithCancel(context.Background())
193 go vpa.receiveChangeEvents(volthaCtx)
194 go vpa.receivePacketsIn(volthaCtx)
195 go vpa.streamPacketOut(volthaCtx)
196 go vpa.synchronizeDeviceList(volthaCtx)
197 }
198
199 case vpaEventVolthaDisconnected:
Naveen Sampath04696f72022-06-13 15:19:14 +0530200 logger.Debug(ctx, "vpagent-voltha-disconnect-event")
201 if state == vpaStateConnected {
202 state = vpaStateDisconnected
203 vpa.volthaClient.Clear()
204 volthaDone()
205 volthaDone = nil
206 }
207 if state != vpaStateConnecting {
208 state = vpaStateConnecting
209 go func() {
210 hdlCtx, hdlDone = context.WithCancel(context.Background())
Akash Sonief452f12024-12-12 18:20:28 +0530211 if err := vpa.establishConnectionToVoltha(hdlCtx); err != nil {
Naveen Sampath04696f72022-06-13 15:19:14 +0530212 logger.Fatalw(ctx, "voltha-connection-failed", log.Fields{"error": err})
213 }
214 }()
215 }
216
217 case vpaEventError:
218 logger.Debug(ctx, "vpagent-error-event")
219 default:
220 logger.Fatalw(ctx, "vpagent-unknown-event",
221 log.Fields{"event": event})
222 }
223 }
224 }
225}