blob: de649d636f7a75fafa8e67eddca0065881a05c23 [file] [log] [blame]
khenaidoo106c61a2021-08-11 18:05:46 -04001/*
2 * Copyright 2021-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package grpc
17
18import (
19 "context"
20 "fmt"
21 "reflect"
22 "strings"
23 "sync"
24 "time"
25
26 grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
27 grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
28 "github.com/opencord/voltha-lib-go/v7/pkg/log"
29 "github.com/opencord/voltha-lib-go/v7/pkg/probe"
30 "github.com/opencord/voltha-protos/v5/go/adapter_services"
31 "github.com/opencord/voltha-protos/v5/go/core"
32 "google.golang.org/grpc"
33 "google.golang.org/grpc/keepalive"
34)
35
36type event byte
37type state byte
38type SetAndTestServiceHandler func(context.Context, *grpc.ClientConn) interface{}
39type RestartedHandler func(ctx context.Context, endPoint string) error
40
// contextKey is the package-private type used for values this package stores in a context.Context.
type contextKey string

// String returns the key as a plain string.
func (k contextKey) String() string {
	return string(k)
}

// grpcMonitorContextKey is the context key under which WithGrpcMonitorContext stores the monitor name.
var grpcMonitorContextKey contextKey = "grpc-monitor"
50
// Names of the environment variables that NewClient passes to SetFromEnvVariable together with the
// matching Client duration field.
const (
	// grpcBackoffInitialInterval is paired with Client.backoffInitialInterval.
	grpcBackoffInitialInterval = "GRPC_BACKOFF_INITIAL_INTERVAL"
	// grpcBackoffMaxInterval is paired with Client.backoffMaxInterval.
	grpcBackoffMaxInterval = "GRPC_BACKOFF_MAX_INTERVAL"
	// grpcBackoffMaxElapsedTime is paired with Client.backoffMaxElapsedTime.
	grpcBackoffMaxElapsedTime = "GRPC_BACKOFF_MAX_ELAPSED_TIME"
	// grpcMonitorInterval is paired with Client.monitorInterval.
	grpcMonitorInterval = "GRPC_MONITOR_INTERVAL"
)
57
// Default timing values that NewClient assigns to a new Client before options and environment
// variables are applied.
const (
	// DefaultBackoffInitialInterval is the default first retry delay.
	DefaultBackoffInitialInterval = 100 * time.Millisecond
	// DefaultBackoffMaxInterval is the default upper bound on the retry delay.
	DefaultBackoffMaxInterval = 5 * time.Second
	// DefaultBackoffMaxElapsedTime is the default overall retry budget; zero means no time limit.
	DefaultBackoffMaxElapsedTime = time.Duration(0)
	// DefaultGRPCMonitorInterval is the default idle period used by the activity monitor.
	DefaultGRPCMonitorInterval = 5 * time.Second
)
64
// Substrings that clientInterceptor looks for in the text of an error returned by a gRPC call.
const (
	// connectionErrorSubString, connectionError and connectionSystemNotReady each cause
	// clientInterceptor to raise a disconnect event when the client is in the connected state.
	connectionErrorSubString = "SubConns are in TransientFailure"
	// connectionClosedSubstring is only logged by clientInterceptor; it does not raise an event.
	connectionClosedSubstring = "client connection is closing"
	connectionError           = "connection error"
	connectionSystemNotReady  = "system is not ready"
)
71
72const (
73 eventConnecting = event(iota)
74 eventConnected
75 eventDisconnected
76 eventStopped
77 eventError
78
79 stateConnected = state(iota)
80 stateConnecting
81 stateDisconnected
82)
83
// Client wraps a gRPC connection to a single endpoint and keeps it alive: it connects with backoff,
// tracks the connection state, and reconnects when calls fail with a connection-related error.
type Client struct {
	// apiEndPoint is the address handed to grpc.Dial.
	apiEndPoint string
	// connection is the current gRPC connection; nil when there is none. Accessed under connectionLock.
	connection *grpc.ClientConn
	// connectionLock is held around accesses to connection and service.
	connectionLock sync.RWMutex
	// stateLock is held around accesses to state.
	stateLock sync.RWMutex
	// state is the current connection state (stateConnected, stateConnecting or stateDisconnected).
	state state
	// service is the value returned by the SetAndTestServiceHandler for the current connection.
	service interface{}
	// events carries notifications to the event loop in Start; NewClient creates it with capacity 1.
	events chan event
	// onRestart is invoked by Start when the endpoint is reconnected after the initial connection.
	onRestart RestartedHandler
	// backoffInitialInterval, backoffMaxInterval and backoffMaxElapsedTime are passed to NewBackoff in
	// Start. backoffMaxInterval is also used as the timeout for handler calls and as the keepalive timeout.
	backoffInitialInterval time.Duration
	backoffMaxInterval     time.Duration
	backoffMaxElapsedTime  time.Duration
	// activityCheck makes Start launch monitorActivity when true.
	activityCheck bool
	// monitorInterval is the idle period used by monitorActivity and the keepalive ping interval.
	monitorInterval time.Duration
	// activeCh receives an item from updateActivity for each successful call; monitorActivity creates it.
	activeCh chan struct{}
	// activeChMutex is held around accesses to the activeCh field.
	activeChMutex sync.RWMutex
	// done is set by Stop once the stop event has been sent and events has been closed.
	done bool
	// livenessCallback, when set through SubscribeForLiveness, is called by updateActivity with the
	// current time while the client is in the connected state.
	livenessCallback func(timestamp time.Time)
}
103
104type ClientOption func(*Client)
105
106func ActivityCheck(enable bool) ClientOption {
107 return func(args *Client) {
108 args.activityCheck = enable
109 }
110}
111
112func NewClient(endpoint string, onRestart RestartedHandler, opts ...ClientOption) (*Client, error) {
113 c := &Client{
114 apiEndPoint: endpoint,
115 onRestart: onRestart,
116 events: make(chan event, 1),
117 state: stateDisconnected,
118 backoffInitialInterval: DefaultBackoffInitialInterval,
119 backoffMaxInterval: DefaultBackoffMaxInterval,
120 backoffMaxElapsedTime: DefaultBackoffMaxElapsedTime,
121 monitorInterval: DefaultGRPCMonitorInterval,
122 }
123 for _, option := range opts {
124 option(c)
125 }
126
127 // Check for environment variables
128 if err := SetFromEnvVariable(grpcBackoffInitialInterval, &c.backoffInitialInterval); err != nil {
129 logger.Warnw(context.Background(), "failure-reading-env-variable", log.Fields{"error": err, "variable": grpcBackoffInitialInterval})
130 }
131
132 if err := SetFromEnvVariable(grpcBackoffMaxInterval, &c.backoffMaxInterval); err != nil {
133 logger.Warnw(context.Background(), "failure-reading-env-variable", log.Fields{"error": err, "variable": grpcBackoffMaxInterval})
134 }
135
136 if err := SetFromEnvVariable(grpcBackoffMaxElapsedTime, &c.backoffMaxElapsedTime); err != nil {
137 logger.Warnw(context.Background(), "failure-reading-env-variable", log.Fields{"error": err, "variable": grpcBackoffMaxElapsedTime})
138 }
139
140 if err := SetFromEnvVariable(grpcMonitorInterval, &c.monitorInterval); err != nil {
141 logger.Warnw(context.Background(), "failure-reading-env-variable", log.Fields{"error": err, "variable": grpcMonitorInterval})
142 }
143
144 logger.Infow(context.Background(), "initialized-client", log.Fields{"client": c})
145
146 // Sanity check
147 if c.backoffInitialInterval > c.backoffMaxInterval {
148 return nil, fmt.Errorf("initial retry delay %v is greater than maximum retry delay %v", c.backoffInitialInterval, c.backoffMaxInterval)
149 }
150
151 return c, nil
152}
153
154func (c *Client) GetClient() (interface{}, error) {
155 c.connectionLock.RLock()
156 defer c.connectionLock.RUnlock()
157 if c.service == nil {
158 return nil, fmt.Errorf("no connection to %s", c.apiEndPoint)
159 }
160 return c.service, nil
161}
162
163// GetCoreServiceClient is a helper function that returns a concrete service instead of the GetClient() API
164// which returns an interface
165func (c *Client) GetCoreServiceClient() (core.CoreServiceClient, error) {
166 c.connectionLock.RLock()
167 defer c.connectionLock.RUnlock()
168 if c.service == nil {
169 return nil, fmt.Errorf("no core connection to %s", c.apiEndPoint)
170 }
171 client, ok := c.service.(core.CoreServiceClient)
172 if ok {
173 return client, nil
174 }
175 return nil, fmt.Errorf("invalid-service-%s", reflect.TypeOf(c.service))
176}
177
178// GetOnuAdapterServiceClient is a helper function that returns a concrete service instead of the GetClient() API
179// which returns an interface
180func (c *Client) GetOnuInterAdapterServiceClient() (adapter_services.OnuInterAdapterServiceClient, error) {
181 c.connectionLock.RLock()
182 defer c.connectionLock.RUnlock()
183 if c.service == nil {
184 return nil, fmt.Errorf("no child adapter connection to %s", c.apiEndPoint)
185 }
186 client, ok := c.service.(adapter_services.OnuInterAdapterServiceClient)
187 if ok {
188 return client, nil
189 }
190 return nil, fmt.Errorf("invalid-service-%s", reflect.TypeOf(c.service))
191}
192
193// GetOltAdapterServiceClient is a helper function that returns a concrete service instead of the GetClient() API
194// which returns an interface
195func (c *Client) GetOltInterAdapterServiceClient() (adapter_services.OltInterAdapterServiceClient, error) {
196 c.connectionLock.RLock()
197 defer c.connectionLock.RUnlock()
198 if c.service == nil {
199 return nil, fmt.Errorf("no parent adapter connection to %s", c.apiEndPoint)
200 }
201 client, ok := c.service.(adapter_services.OltInterAdapterServiceClient)
202 if ok {
203 return client, nil
204 }
205 return nil, fmt.Errorf("invalid-service-%s", reflect.TypeOf(c.service))
206}
207
208func (c *Client) Reset(ctx context.Context) {
209 logger.Debugw(ctx, "resetting-client-connection", log.Fields{"endpoint": c.apiEndPoint})
210 c.stateLock.Lock()
211 defer c.stateLock.Unlock()
212 if c.state == stateConnected {
213 c.state = stateDisconnected
214 c.events <- eventDisconnected
215 }
216}
217
218func (c *Client) clientInterceptor(ctx context.Context, method string, req interface{}, reply interface{},
219 cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
220 // Nothing to do before intercepting the call
221 err := invoker(ctx, method, req, reply, cc, opts...)
222 // On connection failure, start the reconnect process depending on the error response
223 if err != nil {
224 logger.Errorw(ctx, "received-error", log.Fields{"error": err, "context": ctx, "endpoint": c.apiEndPoint})
225 if strings.Contains(err.Error(), connectionErrorSubString) ||
226 strings.Contains(err.Error(), connectionError) ||
227 strings.Contains(err.Error(), connectionSystemNotReady) ||
228 isGrpcMonitorKeyPresentInContext(ctx) {
229 c.stateLock.Lock()
230 if c.state == stateConnected {
231 logger.Warnw(context.Background(), "sending-disconnect-event", log.Fields{"endpoint": c.apiEndPoint, "error": err})
232 c.state = stateDisconnected
233 c.events <- eventDisconnected
234 }
235 c.stateLock.Unlock()
236 } else if strings.Contains(err.Error(), connectionClosedSubstring) {
237 logger.Errorw(context.Background(), "invalid-client-connection-closed", log.Fields{"endpoint": c.apiEndPoint, "error": err})
238 }
239 return err
240 }
241 // Update activity on success only
242 c.updateActivity(ctx)
243 return nil
244}
245
246// updateActivity pushes an activity indication on the channel so that the monitoring routine does not validate
247// the gRPC connection when the connection is being used. Note that this update is done both when the connection
248// is alive or a connection error is returned. A separate routine takes care of doing the re-connect.
249func (c *Client) updateActivity(ctx context.Context) {
250 c.activeChMutex.RLock()
251 defer c.activeChMutex.RUnlock()
252 if c.activeCh != nil {
253 logger.Debugw(ctx, "update-activity", log.Fields{"api-endpoint": c.apiEndPoint})
254 c.activeCh <- struct{}{}
255
256 // Update liveness only in connected state
257 if c.livenessCallback != nil {
258 c.stateLock.RLock()
259 if c.state == stateConnected {
260 c.livenessCallback(time.Now())
261 }
262 c.stateLock.RUnlock()
263 }
264 }
265}
266
267func WithGrpcMonitorContext(ctx context.Context, name string) context.Context {
268 ctx = context.WithValue(ctx, grpcMonitorContextKey, name)
269 return ctx
270}
271
272func isGrpcMonitorKeyPresentInContext(ctx context.Context) bool {
273 if ctx != nil {
274 _, present := ctx.Value(grpcMonitorContextKey).(string)
275 return present
276 }
277 return false
278}
279
280// monitorActivity monitors the activity on the gRPC connection. If there are no activity after a specified
281// timeout, it will send a default API request on that connection. If the connection is good then nothing
282// happens. If it's bad this will trigger reconnection attempts.
283func (c *Client) monitorActivity(ctx context.Context, handler SetAndTestServiceHandler) {
284 logger.Infow(ctx, "start-activity-monitor", log.Fields{"endpoint": c.apiEndPoint})
285
286 // Create an activity monitor channel. Unbuffered channel works well. However, we use a buffered
287 // channel here as a safeguard of having the grpc interceptor publishing too many events that can be
288 // consumed by this monitoring thread
289 c.activeChMutex.Lock()
290 c.activeCh = make(chan struct{}, 10)
291 c.activeChMutex.Unlock()
292
293 // Interval to wait for no activity before probing the connection
294 timeout := c.monitorInterval
295loop:
296 for {
297 timeoutTimer := time.NewTimer(timeout)
298 select {
299
300 case <-c.activeCh:
301 logger.Debugw(ctx, "received-active-notification", log.Fields{"endpoint": c.apiEndPoint})
302
303 // Reset timer
304 if !timeoutTimer.Stop() {
305 <-timeoutTimer.C
306 }
307
308 case <-ctx.Done():
309 break loop
310
311 case <-timeoutTimer.C:
312 // Trigger an activity check if the state is connected. If the state is not connected then there is already
313 // a backoff retry mechanism in place to retry establishing connection.
314 c.stateLock.RLock()
315 runCheck := c.state == stateConnected
316 c.stateLock.RUnlock()
317 if runCheck {
318 go func() {
319 logger.Debugw(ctx, "connection-check-start", log.Fields{"api-endpoint": c.apiEndPoint})
320 subCtx, cancel := context.WithTimeout(ctx, c.backoffMaxInterval)
321 defer cancel()
322 subCtx = WithGrpcMonitorContext(subCtx, "grpc-monitor")
323 c.connectionLock.RLock()
324 defer c.connectionLock.RUnlock()
325 if c.connection != nil {
326 response := handler(subCtx, c.connection)
327 logger.Debugw(ctx, "connection-check-response", log.Fields{"api-endpoint": c.apiEndPoint, "up": response != nil})
328 }
329 }()
330 }
331 }
332 }
333 logger.Infow(ctx, "activity-monitor-stopping", log.Fields{"endpoint": c.apiEndPoint})
334}
335
336// Start kicks off the adapter agent by trying to connect to the adapter
337func (c *Client) Start(ctx context.Context, handler SetAndTestServiceHandler) {
338 logger.Debugw(ctx, "Starting GRPC - Client", log.Fields{"api-endpoint": c.apiEndPoint})
339
340 // If the context contains a k8s probe then register services
341 p := probe.GetProbeFromContext(ctx)
342 if p != nil {
343 p.RegisterService(ctx, c.apiEndPoint)
344 }
345
346 // Enable activity check, if required
347 if c.activityCheck {
348 go c.monitorActivity(ctx, handler)
349 }
350
351 initialConnection := true
352 c.events <- eventConnecting
353 backoff := NewBackoff(c.backoffInitialInterval, c.backoffMaxInterval, c.backoffMaxElapsedTime)
354 attempt := 1
355loop:
356 for {
357 select {
358 case <-ctx.Done():
359 logger.Debugw(ctx, "context-closing", log.Fields{"endpoint": c.apiEndPoint})
360 return
361 case event := <-c.events:
362 logger.Debugw(ctx, "received-event", log.Fields{"event": event, "endpoint": c.apiEndPoint})
363 switch event {
364 case eventConnecting:
365 logger.Debugw(ctx, "connection-start", log.Fields{"endpoint": c.apiEndPoint, "attempts": attempt})
366
367 c.stateLock.Lock()
368 if c.state == stateConnected {
369 c.state = stateDisconnected
370 }
371 if c.state != stateConnecting {
372 c.state = stateConnecting
373 go func() {
374 if err := c.connectToEndpoint(ctx, handler, p); err != nil {
375 c.stateLock.Lock()
376 c.state = stateDisconnected
377 c.stateLock.Unlock()
378 logger.Errorw(ctx, "connection-failed", log.Fields{"endpoint": c.apiEndPoint, "attempt": attempt, "error": err})
379
380 // Retry connection after a delay
381 if err = backoff.Backoff(ctx); err != nil {
382 // Context has closed or reached maximum elapsed time, if set
383 logger.Errorw(ctx, "retry-aborted", log.Fields{"endpoint": c.apiEndPoint, "error": err})
384 return
385 }
386 attempt += 1
387 c.events <- eventConnecting
388 } else {
389 backoff.Reset()
390 }
391 }()
392 }
393 c.stateLock.Unlock()
394
395 case eventConnected:
396 logger.Debugw(ctx, "endpoint-connected", log.Fields{"endpoint": c.apiEndPoint})
397 attempt = 1
398 c.stateLock.Lock()
399 if c.state != stateConnected {
400 c.state = stateConnected
401 if initialConnection {
402 logger.Debugw(ctx, "initial-endpoint-connection", log.Fields{"endpoint": c.apiEndPoint})
403 initialConnection = false
404 } else {
405 logger.Debugw(ctx, "endpoint-reconnection", log.Fields{"endpoint": c.apiEndPoint})
406 // Trigger any callback on a restart
407 go func() {
408 err := c.onRestart(log.WithSpanFromContext(context.Background(), ctx), c.apiEndPoint)
409 if err != nil {
410 logger.Errorw(ctx, "unable-to-restart-endpoint", log.Fields{"error": err, "endpoint": c.apiEndPoint})
411 }
412 }()
413 }
414 }
415 c.stateLock.Unlock()
416
417 case eventDisconnected:
418 if p != nil {
419 p.UpdateStatus(ctx, c.apiEndPoint, probe.ServiceStatusNotReady)
420 }
421 logger.Debugw(ctx, "endpoint-disconnected", log.Fields{"endpoint": c.apiEndPoint, "status": c.state})
422
423 // Try to connect again
424 c.events <- eventConnecting
425
426 case eventStopped:
427 logger.Debugw(ctx, "endPoint-stopped", log.Fields{"adapter": c.apiEndPoint})
428 go func() {
429 if err := c.closeConnection(ctx, p); err != nil {
430 logger.Errorw(ctx, "endpoint-closing-connection-failed", log.Fields{"endpoint": c.apiEndPoint, "error": err})
431 }
432 }()
433 break loop
434 case eventError:
435 logger.Errorw(ctx, "endpoint-error-event", log.Fields{"endpoint": c.apiEndPoint})
436 default:
437 logger.Errorw(ctx, "endpoint-unknown-event", log.Fields{"endpoint": c.apiEndPoint, "error": event})
438 }
439 }
440 }
441 logger.Infow(ctx, "endpoint-stopped", log.Fields{"endpoint": c.apiEndPoint})
442}
443
444func (c *Client) connectToEndpoint(ctx context.Context, handler SetAndTestServiceHandler, p *probe.Probe) error {
445 if p != nil {
446 p.UpdateStatus(ctx, c.apiEndPoint, probe.ServiceStatusPreparing)
447 }
448
449 c.connectionLock.Lock()
450 defer c.connectionLock.Unlock()
451
452 if c.connection != nil {
453 _ = c.connection.Close()
454 c.connection = nil
455 }
456
457 c.service = nil
458
459 // Use Interceptors to:
460 // 1. automatically inject
461 // 2. publish Open Tracing Spans by this GRPC Client
462 // 3. detect connection failure on client calls such that the reconnection process can begin
463 conn, err := grpc.Dial(c.apiEndPoint,
464 grpc.WithInsecure(),
465 grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient(
466 grpc_opentracing.StreamClientInterceptor(grpc_opentracing.WithTracer(log.ActiveTracerProxy{})),
467 )),
468 grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(
469 grpc_opentracing.UnaryClientInterceptor(grpc_opentracing.WithTracer(log.ActiveTracerProxy{})),
470 )),
471 grpc.WithUnaryInterceptor(c.clientInterceptor),
472 // Set keealive parameter - use default grpc values
473 grpc.WithKeepaliveParams(keepalive.ClientParameters{
474 Time: c.monitorInterval,
475 Timeout: c.backoffMaxInterval,
476 PermitWithoutStream: true,
477 }),
478 )
479
480 if err == nil {
481 subCtx, cancel := context.WithTimeout(ctx, c.backoffMaxInterval)
482 defer cancel()
483 svc := handler(subCtx, conn)
484 if svc != nil {
485 c.connection = conn
486 c.service = svc
487 if p != nil {
488 p.UpdateStatus(ctx, c.apiEndPoint, probe.ServiceStatusRunning)
489 }
490 logger.Infow(ctx, "connected-to-endpoint", log.Fields{"endpoint": c.apiEndPoint})
491 c.events <- eventConnected
492 return nil
493 }
494 }
495 logger.Warnw(ctx, "Failed to connect to endpoint",
496 log.Fields{
497 "endpoint": c.apiEndPoint,
498 "error": err,
499 })
500
501 if p != nil {
502 p.UpdateStatus(ctx, c.apiEndPoint, probe.ServiceStatusFailed)
503 }
504 return fmt.Errorf("no connection to endpoint %s", c.apiEndPoint)
505}
506
507func (c *Client) closeConnection(ctx context.Context, p *probe.Probe) error {
508 if p != nil {
509 p.UpdateStatus(ctx, c.apiEndPoint, probe.ServiceStatusStopped)
510 }
511
512 c.connectionLock.Lock()
513 defer c.connectionLock.Unlock()
514
515 if c.connection != nil {
516 err := c.connection.Close()
517 c.connection = nil
518 return err
519 }
520
521 return nil
522}
523
524func (c *Client) Stop(ctx context.Context) {
525 if !c.done {
526 c.events <- eventStopped
527 close(c.events)
528 c.done = true
529 }
530}
531
532// SetService is used for testing only
533func (c *Client) SetService(srv interface{}) {
534 c.connectionLock.Lock()
535 defer c.connectionLock.Unlock()
536 c.service = srv
537}
538
// SubscribeForLiveness registers callback as the client's liveness callback, replacing any previous
// one. updateActivity invokes it with the current time after a successful call while the client is
// in the connected state. The assignment is made without taking a lock.
func (c *Client) SubscribeForLiveness(callback func(timestamp time.Time)) {
	c.livenessCallback = callback
}