blob: 6a3dcd46b977e1b61a5cfc8a2e2a14b9ce998bcf [file] [log] [blame]
Naveen Sampath04696f72022-06-13 15:19:14 +05301/*
2* Copyright 2022-present Open Networking Foundation
3* Licensed under the Apache License, Version 2.0 (the "License");
4* you may not use this file except in compliance with the License.
5* You may obtain a copy of the License at
6*
7* http://www.apache.org/licenses/LICENSE-2.0
8*
9* Unless required by applicable law or agreed to in writing, software
10* distributed under the License is distributed on an "AS IS" BASIS,
11* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12* See the License for the specific language governing permissions and
13* limitations under the License.
14 */
15
16package vpagent
17
18import (
19 "context"
20 "sync"
21 "time"
22
23 "voltha-go-controller/database"
24 "voltha-go-controller/internal/pkg/holder"
25 "voltha-go-controller/internal/pkg/intf"
26
Tinoj Joseph1d108322022-07-13 10:07:39 +053027 "voltha-go-controller/log"
Naveen Sampath04696f72022-06-13 15:19:14 +053028 "github.com/opencord/voltha-lib-go/v7/pkg/probe"
29 ofp "github.com/opencord/voltha-protos/v5/go/openflow_13"
30 "github.com/opencord/voltha-protos/v5/go/voltha"
31 "google.golang.org/grpc"
32)
33
34var logger log.CLogger
35var ctx = context.TODO()
36
37func init() {
38 // Setup this package so that it's log level can be modified at run time
39 var err error
Tinoj Joseph1d108322022-07-13 10:07:39 +053040 logger, err = log.AddPackageWithDefaultParam()
Naveen Sampath04696f72022-06-13 15:19:14 +053041 if err != nil {
42 panic(err)
43 }
44}
45
46type vpaEvent byte
47type vpaState byte
48
49var db database.DBIntf
50
51const (
52 vpaEventStart = vpaEvent(iota)
53 vpaEventVolthaConnected
54 vpaEventVolthaDisconnected
55 vpaEventError
56
57 vpaStateConnected = vpaState(iota)
58 vpaStateConnecting
59 vpaStateDisconnected
60)
61
62var vpAgent *VPAgent
63
64// VPAgent structure
65type VPAgent struct {
66 VolthaAPIEndPoint string
67 DeviceListRefreshInterval time.Duration
68 ConnectionMaxRetries int
69 ConnectionRetryDelay time.Duration
70
71 volthaConnection *grpc.ClientConn
72 volthaClient *holder.VolthaServiceClientHolder
73 mapLock sync.Mutex
74 clientMap map[string]intf.IVPClient
75 events chan vpaEvent
76
77 packetInChannel chan *ofp.PacketIn
78 packetOutChannel chan *ofp.PacketOut
79 changeEventChannel chan *ofp.ChangeEvent
80 VPClientAgent intf.IVPClientAgent
81}
82
83// NewVPAgent is constructor for VPAgent
84func NewVPAgent(config *VPAgent) (*VPAgent, error) {
85 vpa := VPAgent{
86 VolthaAPIEndPoint: config.VolthaAPIEndPoint,
87 DeviceListRefreshInterval: config.DeviceListRefreshInterval,
88 ConnectionMaxRetries: config.ConnectionMaxRetries,
89 ConnectionRetryDelay: config.ConnectionRetryDelay,
90 VPClientAgent: config.VPClientAgent,
91 volthaClient: &holder.VolthaServiceClientHolder{},
92 packetInChannel: make(chan *ofp.PacketIn),
93 // customPacketIndChannel: make(chan *voltha.CustomPacketIn),
94 packetOutChannel: make(chan *ofp.PacketOut),
95 changeEventChannel: make(chan *ofp.ChangeEvent),
96 // ofpCommandNotiChannel: make(chan *voltha.OfpCmdRespNotification),
97 // oltRebootNotiChannel: make(chan *voltha.OltRebootNotification),
98 clientMap: make(map[string]intf.IVPClient),
99 events: make(chan vpaEvent, 100),
100 }
101
102 if vpa.DeviceListRefreshInterval <= 0 {
103 logger.Warnw(ctx, "device list refresh internal not valid, setting to default",
104 log.Fields{
105 "value": vpa.DeviceListRefreshInterval.String(),
106 "default": (10 * time.Second).String()})
107 vpa.DeviceListRefreshInterval = 1 * time.Minute
108 }
109
110 if vpa.ConnectionRetryDelay <= 0 {
111 logger.Warnw(ctx, "connection retry delay not value, setting to default",
112 log.Fields{
113 "value": vpa.ConnectionRetryDelay.String(),
114 "default": (3 * time.Second).String()})
115 vpa.ConnectionRetryDelay = 3 * time.Second
116 }
117
118 if db == nil {
119 db = database.GetDatabase()
120 }
121 vpAgent = &vpa
122 return &vpa, nil
123}
124
125//GetVPAgent - returns vpAgent object
126func GetVPAgent() *VPAgent {
127 return vpAgent
128}
129
130// VolthaSvcClient for Voltha Svc client
131func (vpa *VPAgent) VolthaSvcClient() voltha.VolthaServiceClient {
132 return vpa.volthaClient.Get()
133}
134
135// Run - make the inital connection to voltha and kicks off io streams
136func (vpa *VPAgent) Run(ctx context.Context) {
137
138 logger.Debugw(ctx, "Starting GRPC - VOLTHA client",
139 log.Fields{
140 "voltha-endpoint": vpa.VolthaAPIEndPoint})
141
142 // If the context contains a k8s probe then register services
143 p := probe.GetProbeFromContext(ctx)
144 if p != nil {
145 p.RegisterService(ctx, "voltha")
146 }
147
148 vpa.events <- vpaEventStart
149
150 /*
151 * Two sub-contexts are created here for different purposes so we can
152 * control the lifecyle of processing loops differently.
153 *
154 * volthaCtx - controls those processes that rely on the GRPC
155 * GRPCconnection to voltha and will be restarted when the
156 * GRPC connection is interrupted.
157 * hdlCtx - controls those processes that listen to channels and
158 * process each message. these will likely never be
159 * stopped until the vpagent is stopped.
160 */
161 var volthaCtx, hdlCtx context.Context
162 var volthaDone, hdlDone func()
163 state := vpaStateDisconnected
164
165 for {
166 select {
167 case <-ctx.Done():
168 logger.Errorw(ctx, "Context Done", log.Fields{"Context": ctx})
169 if volthaDone != nil {
170 volthaDone()
171 }
172 if hdlDone != nil {
173 hdlDone()
174 }
175 return
176 case event := <-vpa.events:
177 switch event {
178 case vpaEventStart:
179 logger.Debug(ctx, "vpagent-voltha-start-event")
180
181 // Start the loops that process messages
182 hdlCtx, hdlDone = context.WithCancel(context.Background())
183 go vpa.handlePacketsIn(hdlCtx)
184 go vpa.handleChangeEvents(hdlCtx)
185
186 // Kick off process to attempt to establish
187 // connection to voltha
188 state = vpaStateConnecting
189 go func() {
190 if err := vpa.establishConnectionToVoltha(hdlCtx, p); err != nil {
191 logger.Fatalw(ctx, "voltha-connection-failed", log.Fields{"error": err})
192 }
193 }()
194
195 case vpaEventVolthaConnected:
196 logger.Debug(ctx, "vpagent-voltha-connect-event")
197
198 // Start the loops that poll from voltha
199 if state != vpaStateConnected {
200 state = vpaStateConnected
201 volthaCtx, volthaDone = context.WithCancel(context.Background())
202 go vpa.receiveChangeEvents(volthaCtx)
203 go vpa.receivePacketsIn(volthaCtx)
204 go vpa.streamPacketOut(volthaCtx)
205 go vpa.synchronizeDeviceList(volthaCtx)
206 }
207
208 case vpaEventVolthaDisconnected:
209 if p != nil {
210 p.UpdateStatus(ctx, "voltha", probe.ServiceStatusNotReady)
211 }
212 logger.Debug(ctx, "vpagent-voltha-disconnect-event")
213 if state == vpaStateConnected {
214 state = vpaStateDisconnected
215 vpa.volthaClient.Clear()
216 volthaDone()
217 volthaDone = nil
218 }
219 if state != vpaStateConnecting {
220 state = vpaStateConnecting
221 go func() {
222 hdlCtx, hdlDone = context.WithCancel(context.Background())
223 if err := vpa.establishConnectionToVoltha(hdlCtx, p); err != nil {
224 logger.Fatalw(ctx, "voltha-connection-failed", log.Fields{"error": err})
225 }
226 }()
227 }
228
229 case vpaEventError:
230 logger.Debug(ctx, "vpagent-error-event")
231 default:
232 logger.Fatalw(ctx, "vpagent-unknown-event",
233 log.Fields{"event": event})
234 }
235 }
236 }
237}